Source Connectors
Connect external CRMs, SQL warehouses, and cloud storage to the Identity Store.
The Identity Store ingests records from 22 external source types. Each connector is a registered reader on the backend (`backend/app/services/readers.py`) plus a UI form block on the frontend (`components/sources/connector-registry.tsx` or `file-source-registry.tsx`). Configure a source once with credentials, then trigger ingest from the Sources API or the Sources page.
Connector Categories
| Category | Auth | source_type values | Notes |
|---|---|---|---|
| SQL databases | Connection string | postgres_identity, mysql_identity, snowflake_identity, bigquery_identity | Per-driver identifier regex guards against SQL injection on the user-supplied table name |
| File / cloud storage | Cloud SDK creds or signed URL | csv_url, s3_csv, gcs_csv, azure_blob_csv, google_sheets, sftp_csv | CSV files are streamed; csv_url capped at 50 MB |
| OAuth REST | OAuth 2.0 (refresh token) | salesforce_contacts, microsoft_contacts, google_contacts | Tokens refresh on-demand via oauth_flows.ensure_fresh_credential() |
| Bearer-token REST | API key | hubspot_contacts, airtable_records, stripe_customers, intercom_contacts, pipedrive_persons, zendesk_users, shopify_customers, klaviyo_profiles, mailchimp_members | Token stored encrypted; included as Authorization: Bearer … |
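The bearer-token connectors in the last row all follow the same request shape: attach the decrypted token as an `Authorization: Bearer …` header and walk the API's pagination cursor. A minimal sketch of that pattern (illustrative only; `build_bearer_headers`, `iter_records`, and the page shape are assumed names, not the actual reader internals):

```python
from typing import Callable, Iterator, Optional

def build_bearer_headers(token: str) -> dict:
    """Send the decrypted API key as an Authorization: Bearer header."""
    return {"Authorization": f"Bearer {token}", "Accept": "application/json"}

def iter_records(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Walk a cursor-paginated REST API until next_cursor runs out.

    fetch_page takes the opaque page cursor (None for the first page) and
    returns a dict like {"records": [...], "next_cursor": str | None}.
    """
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield from page.get("records", [])
        cursor = page.get("next_cursor")
        if not cursor:
            break
```

Each real connector differs only in its endpoint URL and in how the page cursor and record list are named in the response body.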
Cursor-based Incremental Ingest
Most connectors accept a cursor configuration on the source row to skip already-ingested data on re-runs:
| Field | Example | Behavior |
|---|---|---|
| `cursor_column` | `updated_at` | Column or field name to track |
| `cursor_type` | `int` / `timestamp` / `text` | Drives the comparison rule |
| `cursor_after` | `2026-04-13T00:00:00Z` | High-water mark from the last successful run |
Note:
`csv_url` and `pipedrive_persons` skip the shared cursor helper because their timestamp formats need bespoke handling. Every other reader funnels through `_apply_cursor_filter()` for consistent behavior.
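The cursor rules above amount to "parse both sides according to `cursor_type`, then keep rows strictly above the high-water mark." A rough sketch of that logic (an illustrative re-creation, not the actual `_apply_cursor_filter()` source):

```python
from datetime import datetime

def apply_cursor_filter(rows, cursor_column, cursor_type, cursor_after):
    """Drop rows at or below the high-water mark from the last run."""
    if not cursor_column or cursor_after is None:
        return rows  # no cursor configured: full ingest

    def parse(value):
        if cursor_type == "int":
            return int(value)
        if cursor_type == "timestamp":
            # Accept trailing "Z" as UTC for ISO-8601 values.
            return datetime.fromisoformat(str(value).replace("Z", "+00:00"))
        return str(value)  # "text": plain lexicographic comparison

    mark = parse(cursor_after)
    return [r for r in rows if parse(r[cursor_column]) > mark]
```

On a successful run the ingest pipeline would then advance `cursor_after` to the largest value it saw, so the next run skips everything already stored.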
Adding a Connector
If you're extending the suite, the registration pattern is:
- Backend reader — implement a function with the `Reader` signature in `readers.py`, then `register("your_source_type", _your_reader)`. End the function with `_apply_cursor_filter()` and `_cast_all_utf8()` so cursors and the JSONB store stay consistent.
- CI invariant — add `your_source_type` to the `expected` set in `tests/test_readers.py::test_all_phase1_connectors_registered`. CI fails fast if the registration is missing.
- Frontend form block — add an entry to `components/sources/connector-registry.tsx` (bearer-token style) or `file-source-registry.tsx` (cloud/file style). Avoid inlining new branches into `sources/page.tsx`.
- SQL connectors only — define a `_safe_<driver>_identifier()` regex guard and a `test_rejects_unsafe_table_name` test, mirroring the MySQL / Snowflake / BigQuery implementations.
Tip: SQL, OAuth, and BigQuery flows stay inline in `sources/page.tsx` because their UX (Test Connection button, OAuth redirect, project + dataset + JSON hybrid) doesn't fit the registry shape. Everything else belongs in a registry.
OAuth Credential Refresh
OAuth connectors store the refresh token at credential creation time and exchange it for a fresh access token on every ingest run via `ensure_fresh_credential()`. If the refresh fails — typically because the user revoked access — the backend translates the `invalid_grant` body into an actionable error pointing the user back to the credentials panel rather than leaking the raw HTTP error.
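That `invalid_grant` translation might look roughly like the following. The exception type and message wording here are illustrative assumptions, not the backend's actual classes:

```python
class CredentialRevokedError(Exception):
    """Raised when the user has revoked OAuth access for this source."""

def translate_refresh_failure(status: int, body: dict) -> Exception:
    """Turn a raw token-refresh failure into an actionable error.

    OAuth 2.0 servers report a revoked or expired refresh token as an
    HTTP 400 with {"error": "invalid_grant"} (RFC 6749 section 5.2).
    """
    if status == 400 and body.get("error") == "invalid_grant":
        return CredentialRevokedError(
            "OAuth access was revoked. Reconnect this source from the "
            "credentials panel to issue a new refresh token."
        )
    # Anything else is surfaced as a generic refresh failure.
    return RuntimeError(f"Token refresh failed with HTTP {status}: {body}")
```

The point of the mapping is that the user sees "reconnect from the credentials panel" instead of an opaque HTTP 400 body.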
Related
- Sources API — REST endpoints for source CRUD and ingest
- Workbench — where ingested records flow into the AI-driven MDM surface
- Entity Resolution Workflow — end-to-end pipeline once your sources are connected