Source Connectors

Connect external CRMs, SQL warehouses, and cloud storage to the Identity Store.

The Identity Store ingests records from 22 external source types. Each connector is a registered reader on the backend (backend/app/services/readers.py) plus a UI form block on the frontend (components/sources/connector-registry.tsx or file-source-registry.tsx; SQL and OAuth flows are handled inline in sources/page.tsx). Configure a source once with credentials, then trigger an ingest from the Sources API or the Sources page.

Connector Categories

| Category | Auth | source_type values | Notes |
| --- | --- | --- | --- |
| SQL databases | Connection string | postgres_identity, mysql_identity, snowflake_identity, bigquery_identity | Per-driver identifier regex guards against SQL injection on the user-supplied table name |
| File / cloud storage | Cloud SDK creds or signed URL | csv_url, s3_csv, gcs_csv, azure_blob_csv, google_sheets, sftp_csv | CSV files are streamed; csv_url capped at 50 MB |
| OAuth REST | OAuth 2.0 (refresh token) | salesforce_contacts, microsoft_contacts, google_contacts | Tokens refresh on-demand via oauth_flows.ensure_fresh_credential() |
| Bearer-token REST | API key | hubspot_contacts, airtable_records, stripe_customers, intercom_contacts, pipedrive_persons, zendesk_users, shopify_customers, klaviyo_profiles, mailchimp_members | Token stored encrypted; included as Authorization: Bearer … |
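
The identifier guard mentioned for SQL databases can be illustrated with a minimal sketch. It assumes a conservative allow-list (letters, digits, underscores, and an optional schema prefix); the function names and the exact pattern are hypothetical and are not taken from readers.py, where each driver has its own rule.

```python
import re

# Assumed allow-list: a plain identifier, optionally schema-qualified.
# The real per-driver patterns may be stricter or looser than this.
PG_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?")


def safe_postgres_identifier(name: str) -> str:
    """Return the name unchanged if it matches the allow-list, else raise."""
    if not PG_IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe table name: {name!r}")
    return name


def build_select(table: str) -> str:
    # Table names cannot be passed as bound query parameters,
    # so the name is validated before it is interpolated.
    return f"SELECT * FROM {safe_postgres_identifier(table)}"
```

Rejecting anything outside the allow-list is simpler to reason about than trying to escape quotes or comment markers after the fact.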

Cursor-based Incremental Ingest

Most connectors accept a cursor configuration on the source row to skip already-ingested data on re-runs:

| Field | Example | Behavior |
| --- | --- | --- |
| cursor_column | updated_at | Column or field name to track |
| cursor_type | int / timestamp / text | Drives the comparison rule |
| cursor_after | 2026-04-13T00:00:00Z | High-water mark from the last successful run |

Note: csv_url and pipedrive_persons skip the shared cursor helper because their timestamp formats need bespoke handling. Every other reader funnels through _apply_cursor_filter() for consistent behavior.
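
To make the three cursor fields concrete, here is a rough sketch of how a shared filter could behave. It is not the actual _apply_cursor_filter() implementation: the function names, the list-of-dicts row shape, and the strictly-greater-than comparison are all assumptions made for illustration.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional

Row = Dict[str, Any]


def parse_cursor_value(value: Any, cursor_type: str) -> Any:
    """Convert a raw value into something comparable for the given cursor_type."""
    if cursor_type == "int":
        return int(value)
    if cursor_type == "timestamp":
        # Older Python versions reject a trailing "Z", so normalise it first.
        return datetime.fromisoformat(str(value).replace("Z", "+00:00"))
    if cursor_type == "text":
        return str(value)
    raise ValueError(f"unknown cursor_type: {cursor_type!r}")


def apply_cursor_filter_sketch(
    rows: Iterable[Row],
    cursor_column: Optional[str],
    cursor_type: str,
    cursor_after: Optional[Any],
) -> List[Row]:
    """Keep only rows whose cursor value is past the high-water mark."""
    if not cursor_column or cursor_after is None:
        return list(rows)  # no cursor configured: full ingest
    mark = parse_cursor_value(cursor_after, cursor_type)
    return [
        row
        for row in rows
        if row.get(cursor_column) is not None
        # Assumption: rows equal to the mark were ingested by the previous run.
        and parse_cursor_value(row[cursor_column], cursor_type) > mark
    ]
```

With cursor_after set to 2026-04-13T00:00:00Z, a row stamped 2026-04-14T08:30:00Z passes this filter, while rows stamped at or before the mark are dropped.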

Adding a Connector

If you're extending the suite, the registration pattern is:

  1. Backend reader — implement a function with the Reader signature in readers.py, then call register("your_source_type", _your_reader) to register it. End the function with _apply_cursor_filter() and _cast_all_utf8() so cursors and the JSONB store stay consistent.
  2. CI invariant — add your_source_type to the expected set in tests/test_readers.py::test_all_phase1_connectors_registered. CI fails fast if the registration is missing.
  3. Frontend form block — add an entry to components/sources/connector-registry.tsx (bearer-token style) or file-source-registry.tsx (cloud/file style). Avoid inlining new branches into sources/page.tsx.
  4. SQL connectors only — define a _safe_<driver>_identifier() regex guard and a test_rejects_unsafe_table_name test, mirroring the MySQL / Snowflake / BigQuery implementations.
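
The registration steps above can be sketched as a small registry plus an invariant check. Everything here is a stand-in: the Reader signature, the READERS dict, the example source type, and the stringify-everything behavior assumed for _cast_all_utf8() are illustrative guesses, not the definitions in readers.py or tests/test_readers.py.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
# Hypothetical signature: a reader takes a source config and returns rows.
Reader = Callable[[Dict[str, Any]], List[Row]]

READERS: Dict[str, Reader] = {}


def register(source_type: str, reader: Reader) -> None:
    """Add a reader to the registry, refusing duplicate source types."""
    if source_type in READERS:
        raise ValueError(f"reader already registered: {source_type}")
    READERS[source_type] = reader


def cast_all_utf8(rows: List[Row]) -> List[Row]:
    # Assumed behavior: every non-null value becomes a string
    # so the stored records have a uniform type.
    return [
        {key: None if value is None else str(value) for key, value in row.items()}
        for row in rows
    ]


def example_reader(config: Dict[str, Any]) -> List[Row]:
    rows = config["rows"]  # stand-in for calling the external API
    # A real reader would apply the cursor filter here before casting.
    return cast_all_utf8(rows)


register("example_source", example_reader)


def test_all_connectors_registered() -> None:
    """Fail fast if an expected source type has no registered reader."""
    expected = {"example_source"}
    missing = expected - set(READERS)
    assert not missing, f"unregistered connectors: {sorted(missing)}"
```

Keeping the expected set in a test, separate from the registry itself, means a deleted or mistyped register() call shows up as a CI failure rather than as a missing option at runtime.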

Tip: SQL, OAuth, and BigQuery flows stay inline in sources/page.tsx because their UX (Test Connection button, OAuth redirect, project + dataset + JSON hybrid) doesn't fit the registry shape. Everything else belongs in a registry.

OAuth Credential Refresh

OAuth connectors store the refresh token at credential creation time and exchange it for a fresh access token on every ingest run via ensure_fresh_credential(). If the refresh fails, typically because the user revoked access, the backend translates the invalid_grant response body into an actionable error that points the user back to the credentials panel instead of leaking the raw HTTP error.
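
The error translation described above might look roughly like the following. This is a sketch only: CredentialExpiredError, translate_refresh_error(), and the message text are hypothetical names, and the only detail taken as given is that OAuth 2.0 token endpoints report a revoked or expired refresh token as "error": "invalid_grant" in a JSON body.

```python
import json


class CredentialExpiredError(Exception):
    """Hypothetical error type: the user must reconnect the account."""


def translate_refresh_error(status_code: int, body: str) -> Exception:
    """Map a failed token-refresh response to an error safe to show users."""
    try:
        payload = json.loads(body)
    except ValueError:
        payload = {}  # non-JSON body, e.g. an HTML error page from a proxy
    error = payload.get("error") if isinstance(payload, dict) else None
    if error == "invalid_grant":
        return CredentialExpiredError(
            "Access was revoked or the refresh token expired. "
            "Reconnect this account from the credentials panel."
        )
    # Anything else stays generic so the raw response body is not exposed.
    return RuntimeError(f"Token refresh failed (HTTP {status_code}).")
```

A caller would raise the returned exception, so the ingest run fails with a message the user can act on.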