Sources¶
Sources allow you to define where data files come from and automatically discover and attach new files as they become available. This is useful for working with directories of files that grow over time, such as daily data exports or streaming data partitions.
Overview¶
The source workflow has two steps:
- Define a source with `CREATE SOURCE` — specifies where to look for files
- Fetch new files with `FETCH` — discovers and attaches any new files found
Basic Usage¶
import bundlebase as bb
# Create a bundle with a source
bundle = await (bb.create("my/data")
    .create_source("remote_dir", {
        "url": "s3://my-bucket/data/",
        "patterns": "**/*.parquet"
    }))
# Fetch discovers and attaches all matching files
await bundle.fetch("base", "add")
# Later, fetch again to get any new files
await bundle.fetch("base", "add")
import bundlebase.sync as bb
# Create a bundle with a source
bundle = (bb.create("my/data")
    .create_source("remote_dir", {
        "url": "s3://my-bucket/data/",
        "patterns": "**/*.parquet"
    }))
# Fetch discovers and attaches all matching files
bundle.fetch("base", "add")
# Later, fetch again to get any new files
bundle.fetch("base", "add")
Connectors¶
Connectors define how to pull data from an external source into your bundle.
Bundlebase ships with several common connectors, and you can also write and plug in custom connectors to support any external data source you need.
http¶
Downloads data from a single HTTP(S) URL. Supports GET, POST, and PUT methods, custom request headers, and any URL that returns a data file (CSV, TSV, JSON, Parquet, Excel, etc.).
Arguments:
| Argument | Required | Default | Description |
|---|---|---|---|
| `url` | Yes | — | The HTTP(S) URL to download |
| `method` | No | `GET` | HTTP method: GET, POST, or PUT |
| `body` | No | — | Request body string, for POST/PUT |
| `headers` | No | — | Additional request headers, one `Name: Value` per line |
| `format` | No | `auto` | File format: `csv`, `json`, `jsonl`, `parquet`, `tsv`, or `auto` |
| `head_supported` | No | `true` | Set to `false` if the server rejects HEAD requests |
Format auto-detection: When format is auto (the default), the connector detects the data format using this priority:
1. Content-Type header — from the HTTP response (e.g., `text/csv`, `application/json`)
2. URL file extension — recognized extensions: `.csv`, `.json`, `.jsonl`, `.parquet`, `.tsv`, `.xlsx`
3. Content inspection — examines the downloaded bytes
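The priority order above can be sketched in plain Python. This is an illustrative, simplified model — the mappings are assumptions, and the real connector also performs the content-inspection step, which is not shown:

```python
import posixpath
from urllib.parse import urlparse

# Simplified, hypothetical mappings for illustration only.
CONTENT_TYPES = {"text/csv": "csv", "application/json": "json"}
EXTENSIONS = {".csv", ".json", ".jsonl", ".parquet", ".tsv", ".xlsx"}

def detect_format(content_type=None, url=""):
    # 1. Content-Type header from the HTTP response takes priority
    if content_type:
        base = content_type.split(";")[0].strip()
        if base in CONTENT_TYPES:
            return CONTENT_TYPES[base]
    # 2. Fall back to a recognized file extension in the URL path
    ext = posixpath.splitext(urlparse(url).path)[1]
    if ext in EXTENSIONS:
        return ext.lstrip(".")
    # 3. Otherwise content inspection would examine the bytes (not shown)
    return None

detect_format("text/csv; charset=utf-8", "https://x.example/data")  # 'csv'
detect_format(None, "https://x.example/export.parquet")             # 'parquet'
```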
GET requests (default):
-- Download a CSV directly
CREATE SOURCE USING http WITH (url = 'https://data.mn.gov/api/lake_quality.csv')
-- API endpoint — format auto-detected from Content-Type header
CREATE SOURCE USING http WITH (url = 'https://api.example.com/data?query=results')
-- Force format when auto-detection doesn't match
CREATE SOURCE USING http WITH (url = 'https://api.example.com/data', format = 'csv')
-- Server doesn't support HEAD requests
CREATE SOURCE USING http WITH (url = 'https://data.example.gov/export.csv', head_supported = 'false')
POST and PUT requests:
Some APIs require POST or PUT to retrieve data — for example, query APIs that accept filter parameters as a JSON body. Use the method and body arguments.
For multi-line JSON bodies, use dollar-quoting ($$...$$) — no escaping needed:
```sql
-- POST with a JSON body (dollar-quoting avoids escaping issues)
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/data/query',
    method = 'POST',
    body = $${
        "filters": {"state": "MN", "type": "Lake"},
        "format": "csv"
    }$$,
    headers = 'Content-Type: application/json
Accept: text/csv'
)

-- POST with a form-encoded body
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/export',
    method = 'POST',
    body = 'statecode=US%3A27&mimeType=csv',
    headers = 'Content-Type: application/x-www-form-urlencoded'
)
```
Custom headers (GET with authentication):
Use headers to add authorization tokens or request specific response formats from GET endpoints:
```sql
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/export',
    headers = 'Authorization: Bearer my-api-token
Accept: text/csv'
)
```
Note
POST and PUT requests skip the HEAD probe entirely — head_supported has no effect for non-GET methods.
JSON Options¶
When a connector fetches a JSON file, you can control how records are extracted and nested fields are flattened. The connector transforms the JSON and copies the result into the bundle as Parquet. These options work with any connector (http, remote_dir, ftp_directory, etc.).
| Option | Default | Description |
|---|---|---|
| `json_record_path` | (required to activate) | Dot-notation path to the array of records (e.g., `"data"`, `"results.items"`). Empty string (`""`) means the top-level value is a bare JSON array. |
| `json_sep` | `_` | Separator used when flattening nested field names. With the default, `user.name` becomes the column `user_name`. |
| `json_meta` | (none) | Comma-separated dot-notation paths to fields in the outer object to include as extra columns on every row (e.g., `"total,page"`). |
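The flattening behavior controlled by `json_sep` can be sketched in plain Python. This is an illustrative model of the semantics, not the connector's actual implementation:

```python
# Nested keys are joined with the separator; scalar values pass through.
def flatten(record, sep="_", prefix=""):
    out = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            out.update(flatten(value, sep, name))
        else:
            out[name] = value
    return out

flatten({"user": {"name": "ada"}, "score": 9})
# {'user_name': 'ada', 'score': 9}
```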
Common pattern — API with response wrapper:
Most REST APIs return a wrapper like {"total": 847, "data": [{...}, ...]} rather than a bare array. Use json_record_path to reach the array and json_meta to capture outer fields:
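For example (the endpoint URL and field names below are placeholders, not a real API):

```sql
-- Records live in the "data" array; "total" and "page" from the wrapper
-- are repeated as extra columns on every row
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/lakes',
    json_record_path = 'data',
    json_meta = 'total,page'
)
```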
Deeply nested path:
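An illustrative sketch (placeholder URL), where the record array sits two levels down at `results.items`:

```sql
-- Response shape: {"results": {"items": [{...}, ...]}}
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/search',
    json_record_path = 'results.items'
)
```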
Directory of JSON files with consistent structure:
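A sketch combining `remote_dir` with the JSON options (bucket URL is a placeholder); here each file is assumed to contain a bare top-level array, so `json_record_path` is the empty string:

```sql
-- Apply the same extraction options to every JSON file in the directory
CREATE SOURCE USING remote_dir WITH (
    url = 's3://my-bucket/events/',
    patterns = '**/*.json',
    json_record_path = ''
)
```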
remote_dir¶
Lists files from a local or cloud directory. Supports any URL scheme supported by the IO registry (S3, GCS, Azure, file://, etc.).
Arguments:
| Argument | Required | Description |
|---|---|---|
| `url` | Yes | Directory URL (e.g., `s3://bucket/data/`, `file:///path/to/data/`) |
| `patterns` | No | Comma-separated glob patterns (default: `**/*`) |
| `copy` | No | `"true"` to copy files into the bundle (default), `"false"` to reference in place |
# S3 bucket
bundle = await bundle.create_source("remote_dir", {
    "url": "s3://my-bucket/data/",
    "patterns": "**/*.parquet"
})
# Local directory
bundle = await bundle.create_source("remote_dir", {
    "url": "file:///data/exports/",
    "patterns": "**/*.csv,**/*.parquet"
})
# Reference files in place instead of copying
bundle = await bundle.create_source("remote_dir", {
    "url": "s3://my-bucket/data/",
    "copy": "false"
})
# S3 bucket
bundle = bundle.create_source("remote_dir", {
    "url": "s3://my-bucket/data/",
    "patterns": "**/*.parquet"
})
# Local directory
bundle = bundle.create_source("remote_dir", {
    "url": "file:///data/exports/",
    "patterns": "**/*.csv,**/*.parquet"
})
# Reference files in place instead of copying
bundle = bundle.create_source("remote_dir", {
    "url": "s3://my-bucket/data/",
    "copy": "false"
})
ftp_directory¶
Lists files from an FTP server. Supports anonymous and authenticated access.
Arguments:
| Argument | Required | Description |
|---|---|---|
| `url` | Yes | FTP URL (e.g., `ftp://user:pass@host:21/path/`) |
| `patterns` | No | Comma-separated glob patterns (default: `**/*`) |
Note
Files are always copied into the bundle since FTP URLs cannot be directly referenced during query execution.
sftp_directory¶
Lists files from a remote directory via SFTP. Requires an SSH private key for authentication.
Arguments:
| Argument | Required | Description |
|---|---|---|
| `url` | Yes | SFTP URL (e.g., `sftp://user@host:22/path/`) |
| `key_path` | Yes | Path to SSH private key file (e.g., `~/.ssh/id_rsa`) |
| `patterns` | No | Comma-separated glob patterns (default: `**/*`) |
Note
Files are always copied into the bundle since SFTP URLs cannot be directly referenced during query execution.
kaggle¶
Downloads dataset files from Kaggle via the Kaggle REST API. Discovers individual files within a dataset, downloads them as ZIP archives, and extracts the contents automatically.
Arguments:
| Argument | Required | Description |
|---|---|---|
| `dataset` | Yes | Dataset identifier in `owner/dataset-name` format (e.g., `zillow/zecon`) |
| `patterns` | No | Comma-separated glob patterns (default: `**/*`) |
| `mode` | No | Sync mode: `add` (default), `update`, or `sync` |
| `version` | No | Dataset version number to download (default: latest) |
Authentication
Kaggle credentials are configured via bundlebase configuration. Available config keys for the kaggle scope:
| Key | Description | Default |
|---|---|---|
| `username` | Kaggle username | from `~/.kaggle/kaggle.json` |
| `key` | Kaggle API key | from `~/.kaggle/kaggle.json` |
| `url` | Kaggle API base URL | `https://www.kaggle.com` |
If username and key are not set via bundlebase config, they fall back to the standard Kaggle credentials file at ~/.kaggle/kaggle.json. You can create this file by running kaggle CLI setup or by generating an API token from your Kaggle account settings.
Note
Files are always copied into the bundle since Kaggle files are downloaded from a remote API.
# All files from a dataset
bundle = await bundle.create_source("kaggle", {
    "dataset": "zillow/zecon"
})
# Only CSV files
bundle = await bundle.create_source("kaggle", {
    "dataset": "zillow/zecon",
    "patterns": "*.csv"
})
# With sync mode to detect updates
bundle = await bundle.create_source("kaggle", {
    "dataset": "zillow/zecon",
    "mode": "update"
})
# All files from a dataset
bundle = bundle.create_source("kaggle", {
    "dataset": "zillow/zecon"
})
# Only CSV files
bundle = bundle.create_source("kaggle", {
    "dataset": "zillow/zecon",
    "patterns": "*.csv"
})
# With sync mode to detect updates
bundle = bundle.create_source("kaggle", {
    "dataset": "zillow/zecon",
    "mode": "update"
})
Custom Connectors¶
Bundlebase supports custom connectors in two modes — native (in-process, zero-copy) and IPC (subprocess). Custom connectors use a two-step workflow: IMPORT CONNECTOR or IMPORT TEMP CONNECTOR, then CREATE SOURCE.
native (In-Process, Zero-Copy)¶
bundle.import_temp_connector('example.connector', 'python::example_connector:ExampleConnector')
bundle.create_source('example.connector')
IPC (Subprocess)¶
bundle.import_connector('example.connector', 'ipc::./example_connector')
bundle.create_source('example.connector')
See Custom Connectors for full command reference, runtime values, SDKs, and protocol details.
Dropping Connectors¶
To remove custom connectors, see the full command reference:
- `DROP CONNECTOR` — removes the connector (or just a specific platform's entry)
- `DROP TEMP CONNECTOR` — removes runtime-only connector entries
Fetching Data¶
fetch()¶
Discovers and attaches new files from a specific pack's sources. Returns a list of FetchResults, one for each source.
Dry Run¶
Use DRY RUN (SQL) to preview what a fetch would do without actually attaching or removing any files. The returned FetchResults show what would be added, replaced, or removed.
fetch_all()¶
Discovers and attaches new files from all defined sources across all packs. Returns a list of FetchResults, one for each source (including sources with no changes).
FetchResults¶
Each FetchResults object contains:
| Property | Type | Description |
|---|---|---|
| `connector` | `str` | Connector name (e.g., `"remote_dir"`) |
| `source_url` | `str` | Source URL |
| `pack` | `str` | Pack name (`"base"` or join name) |
| `added` | `list[FetchedBlock]` | Blocks that were newly added |
| `replaced` | `list[FetchedBlock]` | Blocks that were replaced (updated) |
| `removed` | `list[str]` | Source locations of blocks that were removed |
Methods:
- `total_count()` - Total number of changes (added + replaced + removed)
- `is_empty()` - Returns `True` if no changes were made
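The shape described above can be sketched as a dataclass. This is an illustrative model only — the real class lives in bundlebase, and `FetchedBlock` entries are simplified to strings here:

```python
from dataclasses import dataclass, field

@dataclass
class FetchResults:
    connector: str
    source_url: str
    pack: str
    added: list = field(default_factory=list)      # newly added blocks
    replaced: list = field(default_factory=list)   # updated blocks
    removed: list = field(default_factory=list)    # removed source locations

    def total_count(self) -> int:
        # Total number of changes across all three lists
        return len(self.added) + len(self.replaced) + len(self.removed)

    def is_empty(self) -> bool:
        return self.total_count() == 0

result = FetchResults("remote_dir", "s3://bucket/data/", "base", added=["a.parquet"])
result.total_count()  # 1
result.is_empty()     # False
```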
Sources with Joins¶
You can define sources for joined packs by specifying the pack parameter.
import bundlebase as bb
# Create bundle with base data
bundle = await bb.create("my/data").attach("orders.parquet")
# Create a join for customer data
bundle = await bundle.join("customers", "bundle.customer_id = customers.id")
# Define a source for the customers pack
bundle = await bundle.create_source("remote_dir", {
    "url": "s3://bucket/customers/",
    "patterns": "**/*.parquet"
}, pack="customers")
# Fetch will attach files to the customers join
results = await bundle.fetch("customers", "add")
print(f"Added {len(results[0].added)} customer files")
import bundlebase.sync as bb
# Create bundle with base data
bundle = bb.create("my/data").attach("orders.parquet")
# Create a join for customer data
bundle = bundle.join("customers", "bundle.customer_id = customers.id")
# Define a source for the customers pack
bundle = bundle.create_source("remote_dir", {
    "url": "s3://bucket/customers/",
    "patterns": "**/*.parquet"
}, pack="customers")
# Fetch will attach files to the customers join
results = bundle.fetch("customers", "add")
print(f"Added {len(results[0].added)} customer files")
Pattern Matching¶
The patterns argument accepts comma-separated glob patterns:
| Pattern | Matches |
|---|---|
| `**/*` | All files recursively (default) |
| `*.parquet` | Parquet files in the root directory |
| `**/*.parquet` | Parquet files in any subdirectory |
| `**/*.csv,**/*.parquet` | CSV and Parquet files |
| `2024/**/*.parquet` | Parquet files under the `2024` directory |
Workflow Example¶
A typical workflow for incrementally loading data:
import bundlebase as bb
# Initial setup
bundle = await (bb.create("sales/data")
    .create_source("remote_dir", {
        "url": "s3://company/sales/",
        "patterns": "**/*.parquet"
    }))
# Initial load
results = await bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
print(f"Initial load: {total_added} files")
await bundle.commit("Initial data load")
# ... time passes, new files appear in S3 ...
# Incremental load (only attaches new files)
bundle = (await bb.open("sales/data")).extend()
results = await bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
if total_added > 0:
    print(f"Loaded {total_added} new files")
    await bundle.commit("Incremental data load")
import bundlebase.sync as bb
# Initial setup
bundle = (bb.create("sales/data")
    .create_source("remote_dir", {
        "url": "s3://company/sales/",
        "patterns": "**/*.parquet"
    }))
# Initial load
results = bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
print(f"Initial load: {total_added} files")
bundle.commit("Initial data load")
# ... time passes, new files appear in S3 ...
# Incremental load (only attaches new files)
bundle = bb.open("sales/data").extend()
results = bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
if total_added > 0:
    print(f"Loaded {total_added} new files")
    bundle.commit("Incremental data load")