Sources
Sources allow you to define where data files come from and automatically discover and attach new files as they become available. This is useful for working with directories of files that grow over time, such as daily data exports or streaming data partitions.
Overview
The source workflow has two steps:
- Define a source with CREATE SOURCE: specifies where to look for files.
- Fetch new files with FETCH: discovers and attaches any new files found.
CREATE SOURCE runs an implicit FETCH base ADD by default, so the new source is populated immediately. Pass fetch=False (Python) or add NO FETCH (SQL) to skip that step. This is useful when shipping an empty bundle whose recipients fetch their own data.
Basic Usage
import bundlebase as bb
# Create a bundle with a source
bundle = await (bb.create("my/data")
.create_source("remote_dir", {
"url": "s3://my-bucket/data/",
"patterns": "**/*.parquet"
}))
# create_source already ran an implicit fetch; fetch() attaches any matching files discovered since
await bundle.fetch("base", "add")
# Later, fetch again to get any new files
await bundle.fetch("base", "add")
import bundlebase.sync as bb
# Create a bundle with a source
bundle = (bb.create("my/data")
.create_source("remote_dir", {
"url": "s3://my-bucket/data/",
"patterns": "**/*.parquet"
}))
# create_source already ran an implicit fetch; fetch() attaches any matching files discovered since
bundle.fetch("base", "add")
# Later, fetch again to get any new files
bundle.fetch("base", "add")
Batching Small Files
When a source produces many small files, you can use MIN BATCH to merge them into larger Parquet batches and reduce query overhead.
Specify the batch size as a human-readable value such as 500K, 15M, or 3G. MIN BATCH only applies when fetched data is stored as Parquet (SAVE AS AUTO or SAVE AS PARQUET).
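To make the threshold concrete, here is a minimal sketch of one way small files could be grouped into batches of at least MIN BATCH bytes. It is not bundlebase's implementation: the helpers parse_size and group_into_batches, the greedy strategy, and the binary reading of K/M/G (1K = 1024 bytes) are all assumptions.

```python
import re

# Assumption: K/M/G are binary multiples (1K = 1024 bytes); bundlebase may
# interpret the suffixes differently.
UNITS = {"": 1, "K": 1024, "M": 1024**2, "G": 1024**3}


def parse_size(text: str) -> int:
    """Convert a size such as '500K', '15M', or '3G' into bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?)\s*", text.upper())
    if match is None:
        raise ValueError(f"unrecognized size: {text!r}")
    number, unit = match.groups()
    return int(number) * UNITS[unit]


def group_into_batches(file_sizes: list[int], min_batch: str) -> list[list[int]]:
    """Greedily group file sizes until each group reaches min_batch bytes.

    The last group may stay below the threshold when the files run out.
    """
    threshold = parse_size(min_batch)
    batches: list[list[int]] = []
    current: list[int] = []
    current_total = 0
    for size in file_sizes:
        current.append(size)
        current_total += size
        if current_total >= threshold:
            batches.append(current)
            current, current_total = [], 0
    if current:
        batches.append(current)
    return batches


# Five 200 KiB files with a 500K threshold: one group of three, then the remaining two.
batches = group_into_batches([200 * 1024] * 5, "500K")
```

Because a group only closes once it reaches the threshold, several small source files end up in one batch, which is why the added and replaced lists in FetchResults can be longer than the number of resulting bundle blocks.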
Connectors
Connectors define how to pull data from an external source into your bundle.
Bundlebase ships with several common connectors, and you can also build and plug in custom connectors to support any external data you need.
http
Downloads data from a single HTTP(S) URL. Supports GET, POST, and PUT methods, custom request headers, and any URL that returns a data file (CSV, TSV, JSON, Parquet, Excel, etc.).
Arguments:
| Argument | Required | Default | Description |
|---|---|---|---|
| url | Yes | (none) | The HTTP(S) URL to download |
| method | No | GET | HTTP method: GET, POST, or PUT |
| body | No | (none) | Request body string, for POST/PUT |
| headers | No | (none) | Additional request headers, one Name: Value per line |
| format | No | auto | File format: csv, json, jsonl, parquet, tsv, or auto |
| head_supported | No | true | Set to false if the server rejects HEAD requests |
Format auto-detection: When format is auto (the default), the connector detects the data format using this priority:
- Content-Type header from the HTTP response (e.g., text/csv, application/json)
- URL file extension (recognized: .csv, .json, .jsonl, .parquet, .tsv, .xlsx)
- Content inspection of the downloaded bytes
GET requests (default):
-- Download a CSV directly
CREATE SOURCE USING http WITH (url = 'https://data.mn.gov/api/lake_quality.csv')
-- API endpoint, format auto-detected from Content-Type header
CREATE SOURCE USING http WITH (url = 'https://api.example.com/data?query=results')
-- Force format when auto-detection doesn't match
CREATE SOURCE USING http WITH (url = 'https://api.example.com/data', format = 'csv')
-- Server doesn't support HEAD requests
CREATE SOURCE USING http WITH (url = 'https://data.example.gov/export.csv', head_supported = 'false')
POST and PUT requests:
Some APIs require POST or PUT to retrieve data, for example query APIs that accept filter parameters as a JSON body. Use the method and body arguments.
For multi-line JSON bodies, use dollar-quoting ($$...$$), which avoids the need for escaping:
```sql
-- POST with a JSON body (dollar-quoting avoids escaping issues)
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/data/query',
    method = 'POST',
    body = $${
        "filters": {"state": "MN", "type": "Lake"},
        "format": "csv"
    }$$,
    headers = 'Content-Type: application/json
Accept: text/csv'
)

-- POST with a form-encoded body
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/export',
    method = 'POST',
    body = 'statecode=US%3A27&mimeType=csv',
    headers = 'Content-Type: application/x-www-form-urlencoded'
)
```
Custom headers (GET with authentication):
Use headers to add authorization tokens or request specific response formats from GET endpoints:
```sql
CREATE SOURCE USING http WITH (
    url = 'https://api.example.com/export',
    headers = 'Authorization: Bearer my-api-token
Accept: text/csv'
)
```
Note
POST and PUT requests skip the HEAD probe entirely, so head_supported has no effect for non-GET methods.
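The sketch below uses Python's standard urllib.request to build, without sending, the requests that the arguments above describe. It assumes the behavior stated on this page: headers holds one Name: Value pair per line, body becomes the request payload, and a HEAD probe precedes only GET requests when head_supported is true. The helpers parse_headers and plan_requests are hypothetical and not part of bundlebase.

```python
import urllib.request
from typing import Optional


def parse_headers(headers: str) -> dict[str, str]:
    """Parse a headers string with one 'Name: Value' pair per line."""
    parsed: dict[str, str] = {}
    for line in headers.splitlines():
        if not line.strip():
            continue  # ignore blank lines
        name, sep, value = line.partition(":")  # split at the first colon only
        if not sep:
            raise ValueError(f"header line has no ':': {line!r}")
        parsed[name.strip()] = value.strip()
    return parsed


def plan_requests(url: str, method: str = "GET", body: Optional[str] = None,
                  headers: str = "", head_supported: bool = True) -> list[urllib.request.Request]:
    """Build (but do not send) the requests described above, in order."""
    method = method.upper()
    header_dict = parse_headers(headers)
    payload = body.encode("utf-8") if body is not None else None
    planned = []
    # Only GET requests are preceded by a HEAD probe, and only if head_supported.
    if method == "GET" and head_supported:
        planned.append(urllib.request.Request(url, headers=header_dict, method="HEAD"))
    planned.append(urllib.request.Request(url, data=payload, headers=header_dict, method=method))
    return planned
```

Splitting each header line at its first colon keeps values that themselves contain a colon intact.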
JSON Options
When a connector fetches a JSON file, you can control how records are extracted and nested fields are flattened. The connector transforms the JSON and copies the result into the bundle as Parquet. These options work with any connector (http, remote_dir, ftp_directory, etc.).
| Option | Default | Description |
|---|---|---|
| json_record_path | (none) | Dot-notation path to the array of records (e.g., "data", "results.items"). Setting this option activates JSON extraction. An empty string ("") means the top-level value is a bare JSON array. |
| json_sep | _ | Separator used when flattening nested field names. With the default, user.name becomes the column user_name. |
| json_meta | (none) | Comma-separated dot-notation paths to fields in the outer object to include as extra columns on every row (e.g., "total,page"). |
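Here is a minimal pure-Python sketch of what these three options do to a decoded JSON document. The function names and the naming of meta columns (dots replaced by json_sep) are assumptions for illustration; pandas.json_normalize offers comparable record_path, meta, and sep parameters if you want to experiment with real data.

```python
from typing import Any, Optional


def get_path(value: Any, path: str) -> Any:
    """Follow a dot-notation path; an empty path returns the value itself."""
    if path == "":
        return value
    for key in path.split("."):
        value = value[key]
    return value


def flatten(record: dict, sep: str = "_", prefix: str = "") -> dict:
    """Flatten nested objects, joining field names with sep."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, sep, name))
        else:
            flat[name] = value  # lists and scalars are kept as-is
    return flat


def extract_records(doc: Any, record_path: str, sep: str = "_",
                    meta: Optional[str] = None) -> list[dict]:
    """Mimic json_record_path / json_sep / json_meta on an already-parsed document."""
    records = get_path(doc, record_path)
    # Assumption: a meta column is named after its path, with dots replaced by sep.
    meta_columns = {}
    if meta:
        for path in meta.split(","):
            path = path.strip()
            meta_columns[path.replace(".", sep)] = get_path(doc, path)
    return [{**flatten(r, sep), **meta_columns} for r in records]


response = {
    "total": 2,
    "page": 1,
    "data": [{"id": 1, "user": {"name": "ana"}}, {"id": 2, "user": {"name": "bo"}}],
}
# json_record_path = 'data', json_meta = 'total,page'
rows = extract_records(response, "data", meta="total,page")
```

Each row ends up with the flattened record fields (id, user_name) plus the outer total and page values repeated on every row.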
Common pattern: API with response wrapper
Many REST APIs return a wrapper such as {"total": 847, "data": [{...}, ...]} rather than a bare array. Use json_record_path to reach the array and json_meta to capture outer fields:
Deeply nested path:
Directory of JSON files with consistent structure:
remote_dir
Lists files from a local or cloud directory. Supports any URL scheme supported by the IO registry (S3, GCS, Azure, file://, etc.).
Arguments:
| Argument | Required | Description |
|---|---|---|
| url | Yes | Directory URL (e.g., s3://bucket/data/, file:///path/to/data/) |
| patterns | No | Comma-separated glob patterns (default: **/*) |
| copy | No | "true" to copy files into the bundle (default), "false" to reference them in place |
# S3 bucket
bundle = await bundle.create_source("remote_dir", {
"url": "s3://my-bucket/data/",
"patterns": "**/*.parquet"
})
# Local directory
bundle = await bundle.create_source("remote_dir", {
"url": "file:///data/exports/",
"patterns": "**/*.csv,**/*.parquet"
})
# Reference files in place instead of copying
bundle = await bundle.create_source("remote_dir", {
"url": "s3://my-bucket/data/",
"copy": "false"
})
# S3 bucket
bundle = bundle.create_source("remote_dir", {
"url": "s3://my-bucket/data/",
"patterns": "**/*.parquet"
})
# Local directory
bundle = bundle.create_source("remote_dir", {
"url": "file:///data/exports/",
"patterns": "**/*.csv,**/*.parquet"
})
# Reference files in place instead of copying
bundle = bundle.create_source("remote_dir", {
"url": "s3://my-bucket/data/",
"copy": "false"
})
ftp_directory
Lists files from an FTP server. Supports anonymous and authenticated access.
Arguments:
| Argument | Required | Description |
|---|---|---|
| url | Yes | FTP URL (e.g., ftp://user:pass@host:21/path/) |
| patterns | No | Comma-separated glob patterns (default: **/*) |
Note
Files are always copied into the bundle since FTP URLs cannot be directly referenced during query execution.
sftp_directory
Lists files from a remote directory via SFTP. Requires an SSH private key for authentication.
Arguments:
| Argument | Required | Description |
|---|---|---|
| url | Yes | SFTP URL (e.g., sftp://user@host:22/path/) |
| key_path | Yes | Path to SSH private key file (e.g., ~/.ssh/id_rsa) |
| patterns | No | Comma-separated glob patterns (default: **/*) |
Note
Files are always copied into the bundle since SFTP URLs cannot be directly referenced during query execution.
kaggle
Downloads dataset files from Kaggle via the Kaggle REST API. Discovers individual files within a dataset, downloads them as ZIP archives, and extracts the contents automatically.
Arguments:
| Argument | Required | Description |
|---|---|---|
| dataset | Yes | Dataset identifier in owner/dataset-name format (e.g., zillow/zecon) |
| patterns | No | Comma-separated glob patterns (default: **/*) |
| mode | No | Sync mode: add (default), update, or sync |
| version | No | Dataset version number to download (default: latest) |
Authentication
Kaggle credentials are configured via bundlebase configuration. Available config keys for the kaggle scope:
| Key | Description | Default |
|---|---|---|
| username | Kaggle username | from ~/.kaggle/kaggle.json |
| key | Kaggle API key | from ~/.kaggle/kaggle.json |
| url | Kaggle API base URL | https://www.kaggle.com |
If username and key are not set via bundlebase config, they fall back to the standard Kaggle credentials file at ~/.kaggle/kaggle.json. You can create this file by setting up the Kaggle CLI or by generating an API token from your Kaggle account settings.
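The lookup order can be sketched as follows, assuming that a value set in bundlebase config wins and that each missing value is filled from kaggle.json individually (this page does not say whether the fallback is per key). load_kaggle_json and resolve_kaggle_credentials are hypothetical helpers. The file itself holds a JSON object with username and key fields.

```python
import json
from pathlib import Path
from typing import Optional


def load_kaggle_json(path: Optional[Path] = None) -> dict:
    """Read ~/.kaggle/kaggle.json (or a given path); return {} if it is missing."""
    path = path or Path.home() / ".kaggle" / "kaggle.json"
    return json.loads(path.read_text()) if path.exists() else {}


def resolve_kaggle_credentials(config: dict, stored: Optional[dict] = None) -> tuple[str, str]:
    """Prefer values from bundlebase config, else fall back to kaggle.json."""
    if config.get("username") and config.get("key"):
        return config["username"], config["key"]  # no need to touch the file
    if stored is None:
        stored = load_kaggle_json()
    # Assumption: the fallback is applied per key.
    username = config.get("username") or stored.get("username")
    key = config.get("key") or stored.get("key")
    if not (username and key):
        raise LookupError("Kaggle username/key not found in config or kaggle.json")
    return username, key
```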
Note
Files are always copied into the bundle since Kaggle files are downloaded from a remote API.
# All files from a dataset
bundle = await bundle.create_source("kaggle", {
"dataset": "zillow/zecon"
})
# Only CSV files
bundle = await bundle.create_source("kaggle", {
"dataset": "zillow/zecon",
"patterns": "*.csv"
})
# Set mode to "update" to detect updates
bundle = await bundle.create_source("kaggle", {
"dataset": "zillow/zecon",
"mode": "update"
})
# All files from a dataset
bundle = bundle.create_source("kaggle", {
"dataset": "zillow/zecon"
})
# Only CSV files
bundle = bundle.create_source("kaggle", {
"dataset": "zillow/zecon",
"patterns": "*.csv"
})
# Set mode to "update" to detect updates
bundle = bundle.create_source("kaggle", {
"dataset": "zillow/zecon",
"mode": "update"
})
Custom Connectors
Bundlebase supports custom connectors in two modes: native (in-process, zero-copy) and IPC (subprocess). Custom connectors use a two-step workflow: IMPORT CONNECTOR or IMPORT TEMP CONNECTOR, then CREATE SOURCE.
native (In-Process, Zero-Copy)
bundle.import_temp_connector('example.connector', 'python::example_connector:ExampleConnector')
bundle.create_source('example.connector')
IPC (Subprocess)
bundle.import_connector('example.connector', 'ipc::./example_connector')
bundle.create_source('example.connector')
See Custom Connectors for the full command reference, runtime values, SDKs, and protocol details.
Dropping Connectors
To remove custom connectors, see the full command reference:
- DROP CONNECTOR: removes the connector (or just a specific platform's entry)
- DROP TEMP CONNECTOR: removes runtime-only connector entries
Fetching Data
fetch()
Discovers and attaches new files from a specific pack's sources. Returns a list of FetchResults, one for each source.
Dry Run
Use DRY RUN (SQL) to preview what a fetch would do without actually attaching or removing any files. The returned FetchResults show what would be added, replaced, or removed.
fetch_all()
Discovers and attaches new files from all defined sources across all packs. Returns a list of FetchResults, one for each source (including sources with no changes).
FetchResults
Each FetchResults object contains:
| Property | Type | Description |
|---|---|---|
| connector | str | Connector name (e.g., "remote_dir") |
| source_id | str | Stable identifier for this source. Pass to DESCRIBE SOURCE for full configuration |
| pack | str | Pack name ("base" or join name) |
| added | list[FetchedSource] | Source locations newly added (one entry per DiscoveredLocation, not per bundle block) |
| replaced | list[FetchedSource] | Source locations whose content changed and were re-attached |
| removed | list[str] | Source locations no longer reported by the connector and detached |
These lists are an upper bound on the resulting block delta; MIN BATCH may merge several added/replaced source locations into a single bundle block.
Methods:
- total_count(): total number of source-location changes (added + replaced + removed)
- is_empty(): returns True if no changes were made
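If you want to report on fetch() or fetch_all() output, a summary helper only needs the documented total_count() and is_empty() methods. FetchResultsSketch below is a hypothetical stand-in with the same fields, used so the example runs without bundlebase; summarize should work on real FetchResults objects, assuming they behave as the table describes.

```python
from dataclasses import dataclass, field


@dataclass
class FetchResultsSketch:
    """Hypothetical stand-in mirroring the FetchResults fields above."""
    connector: str
    source_id: str
    pack: str
    added: list = field(default_factory=list)
    replaced: list = field(default_factory=list)
    removed: list = field(default_factory=list)

    def total_count(self) -> int:
        return len(self.added) + len(self.replaced) + len(self.removed)

    def is_empty(self) -> bool:
        return self.total_count() == 0


def summarize(results) -> str:
    """One-line summary across all sources, counting only sources that changed."""
    changed = [r for r in results if not r.is_empty()]
    total = sum(r.total_count() for r in results)
    return f"{total} change(s) across {len(changed)} of {len(results)} source(s)"


results = [
    FetchResultsSketch("remote_dir", "src-1", "base",
                       added=["a.parquet", "b.parquet"], removed=["old.parquet"]),
    FetchResultsSketch("kaggle", "src-2", "base"),  # fetch_all() also reports unchanged sources
]
print(summarize(results))
```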
Sources with Joins
You can define sources for joined packs by specifying the pack parameter.
import bundlebase as bb
# Create bundle with base data
bundle = await bb.create("my/data").attach("orders.parquet")
# Create a join for customer data
bundle = await bundle.join("customers", "bundle.customer_id = customers.id")
# Define a source for the customers pack; fetch=False skips the implicit fetch
bundle = await bundle.create_source("remote_dir", {
"url": "s3://bucket/customers/",
"patterns": "**/*.parquet"
}, pack="customers", fetch=False)
# Fetch will attach files to the customers join
results = await bundle.fetch("customers", "add")
print(f"Added {len(results[0].added)} customer files")
import bundlebase.sync as bb
# Create bundle with base data
bundle = bb.create("my/data").attach("orders.parquet")
# Create a join for customer data
bundle = bundle.join("customers", "bundle.customer_id = customers.id")
# Define a source for the customers pack; fetch=False skips the implicit fetch
bundle = bundle.create_source("remote_dir", {
"url": "s3://bucket/customers/",
"patterns": "**/*.parquet"
}, pack="customers", fetch=False)
# Fetch will attach files to the customers join
results = bundle.fetch("customers", "add")
print(f"Added {len(results[0].added)} customer files")
Pattern Matching
The patterns argument accepts comma-separated glob patterns:
| Pattern | Matches |
|---|---|
| **/* | All files recursively (default) |
| *.parquet | Parquet files in the root directory |
| **/*.parquet | Parquet files in any subdirectory |
| **/*.csv,**/*.parquet | CSV and Parquet files |
| 2024/**/*.parquet | Parquet files under the 2024 directory |
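To check which paths a pattern list would select, here is a sketch that translates globs into regular expressions. It assumes common glob semantics: * and ? stay within one path segment, **/ spans zero or more directories (so **/*.parquet also matches root-level files), and commas separate alternatives. glob_to_regex and matches are hypothetical helpers; bundlebase's own matcher may differ in edge cases.

```python
import re


def glob_to_regex(pattern: str) -> re.Pattern:
    """Translate a glob into a regex: '**/' spans directories, '*' does not."""
    parts = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**/", i):
            parts.append("(?:.*/)?")  # zero or more directory levels
            i += 3
        elif pattern.startswith("**", i):
            parts.append(".*")
            i += 2
        elif pattern[i] == "*":
            parts.append("[^/]*")  # stays within one path segment
            i += 1
        elif pattern[i] == "?":
            parts.append("[^/]")
            i += 1
        else:
            parts.append(re.escape(pattern[i]))
            i += 1
    return re.compile("".join(parts))


def matches(path: str, patterns: str = "**/*") -> bool:
    """True if path (relative to the source URL) matches any comma-separated pattern."""
    return any(
        glob_to_regex(p.strip()).fullmatch(path)
        for p in patterns.split(",")
        if p.strip()
    )
```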
Workflow Example
A typical workflow for incrementally loading data:
import bundlebase as bb
# Initial setup; fetch=False defers the first fetch to the initial load step
bundle = await (bb.create("sales/data")
.create_source("remote_dir", {
"url": "s3://company/sales/",
"patterns": "**/*.parquet"
}, fetch=False))
# Initial load
results = await bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
print(f"Initial load: {total_added} files")
await bundle.commit("Initial data load")
# ... time passes, new files appear in S3 ...
# Incremental load (only attaches new files)
bundle = (await bb.open("sales/data")).extend()
results = await bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
if total_added > 0:
    print(f"Loaded {total_added} new files")
    await bundle.commit("Incremental data load")
import bundlebase.sync as bb
# Initial setup; fetch=False defers the first fetch to the initial load step
bundle = (bb.create("sales/data")
.create_source("remote_dir", {
"url": "s3://company/sales/",
"patterns": "**/*.parquet"
}, fetch=False))
# Initial load
results = bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
print(f"Initial load: {total_added} files")
bundle.commit("Initial data load")
# ... time passes, new files appear in S3 ...
# Incremental load (only attaches new files)
bundle = bb.open("sales/data").extend()
results = bundle.fetch("base", "add")
total_added = sum(len(r.added) for r in results)
if total_added > 0:
    print(f"Loaded {total_added} new files")
    bundle.commit("Incremental data load")