Use Case: ETL / Periodic Data Collection¶
The problem¶
You need to pull data from an external source regularly — a vendor drops CSV files on an SFTP server every night, a government API publishes a monthly export, a partner sends Parquet files to an S3 bucket. You write a script: connect to SFTP, list new files, download them, normalize the columns, load into your system. It works.
Then three months later the vendor changes their column names. Or adds a subfolder. Or the SFTP credentials rotate. Or you need to add a second vendor with a slightly different format. The custom logic accumulates.
Bundlebase flips the model. Instead of custom logic that knows about SFTP internals and then hands data to downstream consumers, you define the source once in a bundle. The bundle knows how to fetch; downstream systems just call bb.open(). The upstream plumbing is isolated in one place and the downstream interface never changes.
The scenario¶
A vendor delivers daily sales CSVs to an SFTP server. The files land in /outbound/sales/ with names like sales_2026-04-04.csv. Your data science team and a reporting dashboard both need this data. You want:
- Automatic pickup of new files each day
- Consistent column names regardless of what the vendor sends
- Downstream consumers that don't know or care that SFTP is involved
Step 1: Define the bundle and its source (once)¶
```python
import bundlebase.sync as bb

bundle = (bb.create("s3://team-data/vendor-sales")
    .set_name("Vendor Sales — Daily Feed")
    .set_description(
        "Daily CSV feed from Vendor A via SFTP. "
        "Normalized columns, filtered to completed transactions. "
        "Downstream consumers use bb.open() — SFTP details are internal."
    )
    .create_source("sftp_directory", {
        "url": "sftp://vendor.example.com/outbound/sales/",
        "patterns": "sales_*.csv",
        "username": "pipeline-user",
        "key_path": "/etc/keys/vendor_rsa"
    })
    .normalize_column_names()
    .filter("status = 'completed'")
    .drop_column("internal_vendor_code"))

# First fetch — picks up all existing files
bundle.fetch("base", "add")
bundle.commit("Initial load — all historical files")

print(f"Loaded: {bundle.num_rows:,} rows")
```
The same definition in the CLI SQL dialect:

```sql
CREATE 's3://team-data/vendor-sales';
SET NAME 'Vendor Sales — Daily Feed';
SET DESCRIPTION 'Daily CSV feed from Vendor A via SFTP. Normalized columns, filtered to completed transactions. Downstream consumers use OPEN — SFTP details are internal.';
CREATE SOURCE FOR base USING sftp_directory WITH (url = 'sftp://vendor.example.com/outbound/sales/', patterns = 'sales_*.csv', username = 'pipeline-user', key_path = '/etc/keys/vendor_rsa');
NORMALIZE COLUMN NAMES;
FILTER WITH SELECT * FROM bundle WHERE status = 'completed';
DROP COLUMN internal_vendor_code;
FETCH base ADD;
COMMIT 'Initial load — all historical files';
SHOW STATUS;
```
The normalize_column_names() and filter() calls are part of the bundle definition — they run every time data is fetched. If the vendor changes Status to status or STATUS, it doesn't matter. The downstream schema stays consistent.
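To make the idea concrete, here is a minimal sketch of the *kind* of rule a column-name normalizer applies. This is illustrative only; the `normalize` function below is a hypothetical stand-in, not Bundlebase's actual implementation:

```python
import re

def normalize(name: str) -> str:
    """Lowercase, trim, and collapse non-alphanumeric runs to underscores."""
    name = name.strip().lower()
    name = re.sub(r"[^a-z0-9]+", "_", name)
    return name.strip("_")

# "Status", "STATUS", and " status " all map to the same column name,
# so a vendor-side rename never reaches downstream consumers.
print(normalize("Status"))      # status
print(normalize(" STATUS "))    # status
print(normalize("Order-Date"))  # order_date
```

Whatever the exact rule, the point is that it is committed into the bundle definition rather than scattered across scripts.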
Step 2: Daily fetch (the only recurring step)¶
The daily job is essentially three lines — open, fetch, commit:

```python
#!/usr/bin/env python3
# scripts/fetch_vendor_sales.py
import bundlebase.sync as bb
from datetime import date

bundle = bb.open("s3://team-data/vendor-sales").extend()
bundle.fetch("base", "add")  # only pulls files not already in the bundle
bundle.commit(f"Daily fetch — {date.today()}")

print(f"Done: {bundle.num_rows:,} total rows at version {bundle.version}")
```
fetch("base", "add") is idempotent: it checks which files are already attached and skips them. Run it twice and nothing changes. If today's file hasn't arrived yet, nothing happens — run it again later.
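The "add" strategy boils down to a set difference between the remote listing and what the bundle already holds. The sketch below illustrates that pattern in plain Python; `plan_fetch` is a hypothetical helper for exposition, not Bundlebase internals:

```python
def plan_fetch(remote_files: set[str], attached_files: set[str]) -> set[str]:
    """Return the files to download: remote files not yet in the bundle."""
    return remote_files - attached_files

attached = {"sales_2026-04-02.csv", "sales_2026-04-03.csv"}
remote = attached | {"sales_2026-04-04.csv"}

print(plan_fetch(remote, attached))    # only the new file is fetched
print(plan_fetch(attached, attached))  # empty set: re-running is a no-op
```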
Step 3: Downstream consumers don't change¶
Your data science team opens the bundle directly with bb.open() and works with it from pandas or polars. Your reporting dashboard connects through the CLI SQL server, and a BI tool connecting over SQL gets the same data the same way. Neither consumer knows the data comes from SFTP. If you switch vendors, update the source definition in the bundle; the downstream interface is unchanged.
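The consumer side is a single open call. The conversion accessor in the comment below is a hypothetical name for illustration; check the API reference for the exact methods your version exposes:

```python
import bundlebase.sync as bb

# Read access only: no SFTP credentials or vendor details needed here.
bundle = bb.open("s3://team-data/vendor-sales")
print(f"{bundle.num_rows:,} rows at version {bundle.version}")

# Hypothetical pandas accessor; the real method name may differ:
# df = bundle.to_pandas()
```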
Handling multiple upstream sources¶
When you add a second vendor with a different format, you extend the same bundle rather than creating a parallel pipeline:
```python
import bundlebase.sync as bb

bundle = bb.open("s3://team-data/vendor-sales").extend()

# Add a second vendor source — different SFTP, different format
bundle.create_source("sftp_directory", {
    "url": "sftp://vendor-b.example.com/exports/",
    "patterns": "*.parquet",
    "username": "pipeline-b",
    "key_path": "/etc/keys/vendor_b_rsa"
}, name="vendor_b")

bundle.fetch("vendor_b", "add")
bundle.commit("Added Vendor B source")
```
Both sources feed the same bundle. Downstream consumers still call bb.open("s3://team-data/vendor-sales") and get unified data — they don't need to know a second vendor was added.
The abstraction it provides¶
```
                 ┌─────────────────────────────────────┐
Upstream sources │  SFTP (Vendor A)    S3 (Vendor B)   │
                 │  HTTP API           FTP archive     │
                 └──────────────┬──────────────────────┘
                                │  bundle.fetch()
                 ┌──────────────▼──────────────────────┐
                 │               Bundle                │
                 │ normalized schema · version history │
                 │ transformation record · metadata    │
                 └──────────────┬──────────────────────┘
                                │  bb.open()
                 ┌──────────────▼──────────────────────┐
Downstream       │ pandas  polars  SQL  Flight  numpy  │
consumers        └─────────────────────────────────────┘
```
Upstream complexity is encapsulated in the bundle. Downstream simplicity is guaranteed by the interface.
What you get that custom ETL scripts don't¶
| Concern | Custom scripts | Bundlebase |
|---|---|---|
| Idempotent re-runs | Write your own dedup logic | fetch(..., "add") skips known files |
| Schema normalization | Code in the script | Committed into the bundle definition |
| Audit log | Log files (if they exist) | Commit history on the bundle |
| Downstream interface | Changes when upstream changes | Always bb.open() |
| Adding a new source | New script, new pipeline | Add a source, extend the bundle |
| Consumer stack lock-in | Consumers depend on your script's output format | pandas, polars, SQL, Flight, numpy |
Scheduling the fetch¶
```bash
# crontab — run every day at 6am
0 6 * * * /usr/bin/python3 /opt/pipeline/scripts/fetch_vendor_sales.py >> /var/log/vendor-sales.log 2>&1
```
Or in whatever orchestrator you already use — the script is simple enough that it doesn't need a framework.
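If you prefer systemd timers to cron, a minimal unit pair might look like this. Unit names and paths here are illustrative (they mirror the cron example above); adjust to your layout:

```ini
# /etc/systemd/system/vendor-sales-fetch.service
[Unit]
Description=Daily vendor sales fetch

[Service]
Type=oneshot
ExecStart=/usr/bin/python3 /opt/pipeline/scripts/fetch_vendor_sales.py

# /etc/systemd/system/vendor-sales-fetch.timer
[Unit]
Description=Run the vendor sales fetch daily at 6am

[Timer]
OnCalendar=*-*-* 06:00:00
Persistent=true

[Install]
WantedBy=timers.target
```

`Persistent=true` makes systemd run a missed fetch at the next boot, which matters less here because the fetch is idempotent anyway.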
Next steps¶
- Sources — full connector reference: SFTP, S3 remote_dir, FTP, HTTP, Kaggle, custom
- Data Engineer use case — multi-source pipelines with multiple fetch strategies
- Backend Developer use case — publishing API data the same way
- Versioning — reset, undo, and auditing what changed