From navan-pack
Syncs Navan BOOKING and TRANSACTION data using full-refresh ETL, incremental watermarks, webhooks, and SQL merge-upserts to warehouses like PostgreSQL.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin navan-packThis skill is limited to using the following tools:
This skill provides production-grade sync strategies for Navan data. The two primary tables have fundamentally different sync models: BOOKING requires weekly full-refresh with merge-upsert logic (every record is re-imported, keyed by UUID), while TRANSACTION is incremental and append-only. Real-time use cases require webhook callbacks for event-driven processing. This skill covers all three tie...
Extracts Navan booking/transaction data via paginated REST API or connectors (Fivetran/Airbyte) with filtering/deduplication for warehouses, dashboards, data quality debugging.
Guides connecting data warehouse sources like Postgres, MySQL, Stripe, HubSpot, MongoDB, Salesforce, BigQuery, Snowflake to PostHog via wizard, credential validation, table discovery, sync selection, and creation.
Builds reliable third-party API integrations with OAuth, webhooks, rate limiting, error handling, and data sync. Use for external services like Slack, Stripe, Gmail.
Share bugs, ideas, or general feedback.
This skill provides production-grade sync strategies for Navan data. The two primary tables have fundamentally different sync models: BOOKING requires weekly full-refresh with merge-upsert logic (every record is re-imported, keyed by UUID), while TRANSACTION is incremental and append-only. Real-time use cases require webhook callbacks for event-driven processing. This skill covers all three tiers — scheduled full-refresh, incremental watermark-based sync, and real-time webhooks — along with Airbyte connector configuration and idempotent SQL upsert patterns.
navan-install-auth)NAVAN_CLIENT_ID, NAVAN_CLIENT_SECRET, NAVAN_BASE_URLThe BOOKING table is re-imported weekly by Navan. Every record is refreshed, so your sync must use merge-upsert logic to avoid duplicates while capturing updates.
const tokenRes = await fetch(`${process.env.NAVAN_BASE_URL}/ta-auth/oauth/token`, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.NAVAN_CLIENT_ID!,
client_secret: process.env.NAVAN_CLIENT_SECRET!,
}),
});
const { access_token } = await tokenRes.json();
const headers = { Authorization: `Bearer ${access_token}` };
// Full extraction — paginate through all bookings for weekly refresh
let allBookings: any[] = [];
let page = 0;
const size = 50;
while (true) {
const res = await fetch(
`${process.env.NAVAN_BASE_URL}/v1/bookings?page=${page}&size=${size}`,
{ headers }
);
const { data } = await res.json();
if (!data || !data.length) break;
allBookings.push(...data);
if (data.length < size) break;
page++;
}
console.log(`Extracted ${allBookings.length} bookings for full refresh`);
SQL merge-upsert pattern (PostgreSQL):
-- Staging table receives raw API data
CREATE TABLE IF NOT EXISTS navan_booking_staging (
uuid TEXT PRIMARY KEY,
traveler_email TEXT,
origin TEXT,
destination TEXT,
start_date DATE,
end_date DATE,
total_cost NUMERIC(12,2),
currency TEXT DEFAULT 'USD',
department TEXT,
cost_center TEXT,
status TEXT,
in_policy BOOLEAN,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
synced_at TIMESTAMPTZ DEFAULT NOW()
);
-- Merge-upsert: insert new records, update changed records
INSERT INTO navan_booking AS b
SELECT * FROM navan_booking_staging s
ON CONFLICT (uuid) DO UPDATE SET
traveler_email = EXCLUDED.traveler_email,
origin = EXCLUDED.origin,
destination = EXCLUDED.destination,
start_date = EXCLUDED.start_date,
end_date = EXCLUDED.end_date,
total_cost = EXCLUDED.total_cost,
status = EXCLUDED.status,
in_policy = EXCLUDED.in_policy,
updated_at = EXCLUDED.updated_at,
synced_at = NOW()
WHERE b.updated_at < EXCLUDED.updated_at;
TRANSACTION data is append-only. Use watermark-based sync to pull only new records.
// Track high-watermark for incremental pulls
interface SyncState {
lastSyncDate: string; // ISO date of last successful sync
lastTransactionId: string;
}
async function loadSyncState(): Promise<SyncState> {
const fs = await import('fs');
try {
return JSON.parse(fs.readFileSync('.navan-sync-state.json', 'utf-8'));
} catch {
return { lastSyncDate: '2025-01-01', lastTransactionId: '' };
}
}
async function saveSyncState(state: SyncState) {
const fs = await import('fs');
fs.writeFileSync('.navan-sync-state.json', JSON.stringify(state, null, 2));
}
// Pull bookings since last watermark
const state = await loadSyncState();
const today = new Date().toISOString().split('T')[0];
const txnRes = await fetch(
`${process.env.NAVAN_BASE_URL}/v1/bookings` +
`?createdFrom=${state.lastSyncDate}&createdTo=${today}&page=0&size=50`,
{ headers }
);
const { data: transactions } = await txnRes.json();
// Filter out already-seen transactions
const newTxns = transactions.filter(
(t: any) => t.transaction_id > state.lastTransactionId
);
console.log(`New transactions since ${state.lastSyncDate}: ${newTxns.length}`);
// Update watermark after successful load
if (newTxns.length > 0) {
await saveSyncState({
lastSyncDate: today,
lastTransactionId: newTxns[newTxns.length - 1].transaction_id,
});
}
import { createServer } from 'http';
import { createHmac } from 'crypto';
// Webhook handler for real-time Navan events
const server = createServer(async (req, res) => {
if (req.method !== 'POST' || req.url !== '/navan/webhook') {
res.writeHead(404);
res.end();
return;
}
const chunks: Buffer[] = [];
for await (const chunk of req) chunks.push(chunk as Buffer);
const body = Buffer.concat(chunks).toString();
// Verify webhook signature
const signature = req.headers['x-navan-signature'] as string;
const expected = createHmac('sha256', process.env.NAVAN_WEBHOOK_SECRET!)
.update(body)
.digest('hex');
if (signature !== expected) {
console.error('Invalid webhook signature');
res.writeHead(401);
res.end('Unauthorized');
return;
}
const event = JSON.parse(body);
console.log(`Webhook event: ${event.type}`);
switch (event.type) {
case 'booking.created':
console.log(`New booking: ${event.data.uuid}`);
break;
case 'booking.updated':
console.log(`Booking updated: ${event.data.uuid}`);
break;
case 'booking.cancelled':
console.log(`Booking cancelled: ${event.data.uuid}`);
break;
case 'expense.submitted':
console.log(`Expense submitted: ${event.data.transaction_id}`);
break;
case 'expense.approved':
console.log(`Expense approved: ${event.data.transaction_id}`);
break;
default:
console.log(`Unknown event type: ${event.type}`);
}
res.writeHead(200);
res.end('OK');
});
server.listen(3000, () => console.log('Webhook listener on :3000'));
# Airbyte source-navan connector (v0.0.42)
# Production sync mode configuration
source:
sourceDefinitionId: source-navan
connectionConfiguration:
client_id: "${NAVAN_CLIENT_ID}"
client_secret: "${NAVAN_CLIENT_SECRET}"
# Sync catalog — bookings stream
syncCatalog:
streams:
- stream:
name: bookings
jsonSchema: {}
config:
# Full Refresh for BOOKING (weekly re-import model)
syncMode: full_refresh
destinationSyncMode: overwrite
# Alternative: append_dedup with uuid as primary key
# syncMode: full_refresh
# destinationSyncMode: append_dedup
# primaryKey: [["uuid"]]
# Schedule: run weekly to match Navan's BOOKING refresh cycle
schedule:
scheduleType: cron
cronExpression: "0 2 * * 0" # Sunday 2am UTC
// Monitor sync health and detect drift
interface SyncMetrics {
tableName: string;
lastSyncTime: Date;
recordCount: number;
expectedFrequency: string;
isStale: boolean;
}
async function checkSyncHealth(): Promise<SyncMetrics[]> {
const state = await loadSyncState();
const lastSync = new Date(state.lastSyncDate);
const hoursSinceSync = (Date.now() - lastSync.getTime()) / (1000 * 60 * 60);
return [
{
tableName: 'BOOKING',
lastSyncTime: lastSync,
recordCount: 0, // populated from warehouse query
expectedFrequency: 'weekly',
isStale: hoursSinceSync > 7 * 24 + 6, // alert if > 7.25 days
},
{
tableName: 'TRANSACTION',
lastSyncTime: lastSync,
recordCount: 0,
expectedFrequency: 'daily',
isStale: hoursSinceSync > 25, // alert if > 25 hours
},
];
}
const health = await checkSyncHealth();
health.forEach(m => {
const status = m.isStale ? 'STALE' : 'OK';
console.log(`${m.tableName}: ${status} (last sync: ${m.lastSyncTime.toISOString()})`);
});
// Ensure loads are idempotent — safe to re-run without side effects
async function idempotentLoad(records: any[], tableName: string) {
const batchId = `${tableName}-${new Date().toISOString()}`;
// 1. Write to staging with batch ID
console.log(`Loading ${records.length} records to ${tableName}_staging (batch: ${batchId})`);
// 2. Merge-upsert from staging to target
// Uses UUID as natural key — same record always resolves to same row
console.log(`Merging ${tableName}_staging -> ${tableName}`);
// 3. Record batch metadata for audit
console.log(`Batch ${batchId} complete: ${records.length} records processed`);
return { batchId, recordCount: records.length, status: 'complete' };
}
Successful execution produces:
| Error | HTTP Code | Cause | Solution |
|---|---|---|---|
| Unauthorized | 401 | Expired or invalid bearer token | Re-authenticate via POST /ta-auth/oauth/token |
| Rate Limited | 429 | Too many API requests | Use exponential backoff; increase sync interval |
| Timeout | 504 | Full refresh too large | Chunk by date range (30-day windows) |
| Webhook Sig Invalid | 401 | Tampered or replayed event | Verify NAVAN_WEBHOOK_SECRET; check clock skew |
| Duplicate Records | N/A | Missing UUID dedup in BOOKING sync | Apply merge-upsert with ON CONFLICT (uuid) |
| Sync Drift | N/A | Missed incremental window | Fall back to full refresh; reset watermark |
Python — Incremental TRANSACTION sync with watermark:
import requests
import json
import os
from datetime import datetime
base_url = os.environ.get('NAVAN_BASE_URL', 'https://api.navan.com')
auth = requests.post(f'{base_url}/ta-auth/oauth/token', data={
'grant_type': 'client_credentials',
'client_id': os.environ['NAVAN_CLIENT_ID'],
'client_secret': os.environ['NAVAN_CLIENT_SECRET'],
})
headers = {'Authorization': f'Bearer {auth.json()["access_token"]}'}
# Load watermark
try:
with open('.navan-sync-state.json') as f:
state = json.load(f)
except FileNotFoundError:
state = {'last_sync_date': '2025-01-01'}
today = datetime.now().strftime('%Y-%m-%d')
resp = requests.get(
f'{base_url}/v1/bookings',
params={'createdFrom': state['last_sync_date'], 'createdTo': today, 'page': 0, 'size': 50},
headers=headers,
).json()
txns = resp['data']
print(f'Fetched {len(txns)} records since {state["last_sync_date"]}')
# Save updated watermark
with open('.navan-sync-state.json', 'w') as f:
json.dump({'last_sync_date': today}, f)
After configuring data sync, proceed to navan-observability for pipeline monitoring or navan-performance-tuning for optimizing large-volume syncs.