Data Ingestion¶
Attach vs. source¶
Use attach when you know the exact file to add right now.
Use create_source + fetch when you want a bundle to stay in sync with a location over time — the source definition is stored in the bundle and fetch discovers what's new.
HTTP sources¶
# Create a bundle backed by S3 and register an HTTP JSON source,
# then fetch and commit. The source definition is stored in the
# bundle, so later runs only need fetch + commit.
import bundlebase.sync as bb
bundle = bb.create("s3://my-bucket/api-data")
# JSON API
bundle.create_source("http", {
"url": "https://api.example.com/v2/records",
"format": "json",
"json_record_path": "data" # path to the records array in the response
})
bundle.fetch("base", "add")  # "base": default source name; "add": only ingest files not already in the bundle
bundle.commit("Initial fetch")
HTTP POST with authentication¶
# HTTP source using POST with an authenticated request; the method,
# body, and headers are part of the stored source definition.
import json
bundle.create_source("http", {
"url": "https://api.example.com/v2/export",
"method": "POST",
"body": json.dumps({"since": "2026-01-01", "format": "json"}),  # POST payload, serialized to a JSON string
"headers": "Authorization: Bearer TOKEN\nContent-Type: application/json",  # newline-separated "Name: value" lines
"json_record_path": "results"  # path to the records array in the response
})
bundle.fetch("base", "add")
S3 sources with glob patterns¶
SFTP sources¶
Fetch modes¶
| Mode | What it does |
|---|---|
| `add` | Add files not already in the bundle — idempotent, safe to run repeatedly |
| `update` | Add new files and refresh files that have changed |
| `sync` | Full replacement — remove files no longer in the source, add new ones |
Preview what a fetch would do before running it:
Multiple named sources¶
# One bundle fed by three independently named sources, each fetched
# with the mode that fits its data: incremental S3 events, a fully
# replaced reference taxonomy, and incremental vendor drops.
import bundlebase.sync as bb
bundle = bb.create("s3://analytics/pipeline")
# Primary event data from S3
bundle.create_source("remote_dir", {
"url": "s3://raw-data/events/",
"patterns": "**/*.parquet"
}) # defaults to name="base"
# Reference taxonomy from internal API
bundle.create_source("http", {
"url": "https://api.internal/taxonomy",
"format": "json",
"json_record_path": "types"
}, name="ref")
# Vendor supplement from SFTP
bundle.create_source("sftp_directory", {
"url": "sftp://vendor.example.com/outbound/",
"patterns": "*.csv",
"username": "pipeline-user",
"key_path": "/etc/keys/vendor_rsa"  # private key for SFTP auth; no password stored
}, name="vendor")
bundle.fetch("base", "add")     # add: only files not already in the bundle (idempotent)
bundle.fetch("ref", "sync")     # sync: full replacement — taxonomy should exactly mirror the API
bundle.fetch("vendor", "add")   # add: keep previously fetched vendor files even if removed upstream
bundle.commit("Initial pipeline load")
-- The same pipeline expressed in the SQL dialect: one CREATE SOURCE
-- per named source, one FETCH per source, and a single COMMIT.
CREATE 's3://analytics/pipeline';
CREATE SOURCE FOR base USING remote_dir WITH (
url = 's3://raw-data/events/',
patterns = '**/*.parquet'
);
CREATE SOURCE FOR ref USING http WITH (
url = 'https://api.internal/taxonomy',
format = 'json',
json_record_path = 'types'
);
CREATE SOURCE FOR vendor USING sftp_directory WITH (
url = 'sftp://vendor.example.com/outbound/',
patterns = '*.csv',
username = 'pipeline-user',
key_path = '/etc/keys/vendor_rsa'
);
FETCH base ADD;    -- incremental: add files not already in the bundle
FETCH ref SYNC;    -- full replacement of the taxonomy
FETCH vendor ADD;
COMMIT 'Initial pipeline load';
Incremental daily run¶
Once sources are defined, subsequent runs are just fetch + commit: