# Python SDK

Build custom Bundlebase connectors in Python.
## Installation

Requires `pyarrow` as a peer dependency.

> **Required Configuration:** Python connectors require the `allow_external_code` configuration setting. See Configuration for details.
## Quick Start

```python
# example_connector.py
import pyarrow as pa
from bundlebase_sdk import Connector, Location, serve


class ExampleConnector(Connector):
    def discover(self, attached_locations, **kwargs):
        return [Location("data.parquet", format="parquet", version="v1")]

    def data(self, location, **kwargs):
        return pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})


if __name__ == "__main__":
    serve(ExampleConnector())
```

Use with:

```python
b.import_temp_connector('example.connector', 'ipc::example_connector.py')
b.create_source('example.connector')
```
## API Reference

### Connector

Abstract base class. Subclass it and implement `discover()` and `data()`. Optionally override `stable_url()` and `schema()`.
#### `discover(attached_locations, **kwargs) -> list[Location]`

Return the available data locations.

| Parameter | Type | Description |
|---|---|---|
| `attached_locations` | `list[str]` | Locations already attached to the bundle |
| `**kwargs` | `str` | Extra arguments from the source configuration |

Returns: a list of `Location` objects.
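The `attached_locations` argument lets an incremental connector skip files the bundle already has. A minimal filtering sketch (`new_locations` is a hypothetical helper, shown without the `Location` wrapper so it runs standalone; a real `discover()` would wrap each surviving name in `Location(...)`):

```python
def new_locations(available, attached_locations):
    # Keep only identifiers that are not already attached to the bundle
    attached = set(attached_locations)
    return [name for name in available if name not in attached]

print(new_locations(["a.parquet", "b.parquet"], ["a.parquet"]))  # ['b.parquet']
```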
#### `data(location, **kwargs) -> data`

Return data for the given location.

| Parameter | Type | Description |
|---|---|---|
| `location` | `Location` | The location to fetch data for |
| `**kwargs` | `str` | Extra arguments from the source configuration |

Returns: one of the supported data return types (see Data Return Types below).
#### `stable_url(location, **kwargs) -> StableUrl | None`

Return a stable URL for the given location, if available. The default implementation returns `None`.

| Parameter | Type | Description |
|---|---|---|
| `location` | `Location` | The location to get a URL for |
| `**kwargs` | `str` | Extra arguments from the source configuration |

Returns: a `StableUrl` or `None`.
### Location

```python
Location(
    location="path/to/file.parquet",  # identifier (required)
    must_copy=True,                   # copy into bundle? (default: True)
    format="parquet",                 # file format (default: "parquet")
    version="v1",                     # for change detection (default: "")
)
```

| Field | Type | Default | Description |
|---|---|---|---|
| `location` | `str` | (required) | Identifier for this data file |
| `must_copy` | `bool` | `True` | Whether the data must be copied into the bundle |
| `format` | `str` | `"parquet"` | File format |
| `version` | `str` | `""` | Version string for change detection |
### StableUrl

| Field | Type | Description |
|---|---|---|
| `url` | `str` | The stable URL string |
### serve()

Run the connector as a JSON-RPC subprocess. Reads requests from stdin and writes responses to stdout. This is the main entry point for IPC connector scripts.
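The wire format, as exercised in the Testing section below, is one JSON-RPC request per line on stdin; a data response carries a JSON header line followed by a 4-byte big-endian length prefix and an Arrow IPC payload on stdout. A reader-side sketch of splitting such a response buffer (`split_response` is illustrative only, not SDK code):

```python
import json
import struct

def split_response(buf):
    # First line: the JSON-RPC response header
    newline = buf.index(b"\n")
    header = json.loads(buf[:newline])
    rest = buf[newline + 1:]
    payload = b""
    if len(rest) >= 4:
        # Data responses append a 4-byte big-endian length, then an
        # Arrow IPC stream of exactly that many bytes
        (length,) = struct.unpack(">I", rest[:4])
        payload = rest[4:4 + length]
    return header, payload

buf = b'{"jsonrpc": "2.0", "id": 1, "result": null}\n' + struct.pack(">I", 3) + b"abc"
header, payload = split_response(buf)  # payload == b"abc"
```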
## Data Return Types

The `data()` method supports several return types:

| Return Type | Description |
|---|---|
| `pa.Table` | PyArrow Table (most common) |
| `pa.RecordBatch` | Single record batch |
| `list[pa.RecordBatch]` | Multiple batches (streaming) |
| `list[dict]` | List of row dicts (requires `schema()`) |
| `dict[str, list]` | Column-oriented dict (requires `schema()`) |
| `Iterator[dict]` | Lazy iterator of dicts (requires `schema()`) |
| `None` | No data for this location |
## Schema-Driven Connectors

Instead of constructing PyArrow objects directly, you can define a `schema()` method with simple Python type strings and return plain Python data structures. The SDK handles Arrow conversion automatically.

### Defining a Schema

Override `schema()` to return a dict mapping column names to type strings:

```python
class MyConnector(Connector):
    def schema(self):
        return {"name": "Utf8", "age": "Int32", "score": "Float64", "active": "Boolean"}
```
Supported type strings:

| Type String | Arrow Type | Aliases |
|---|---|---|
| `Utf8` | `pa.string()` | `string` |
| `Int64` | `pa.int64()` | `int` |
| `Int8`, `Int16`, `Int32` | `pa.int8()`, etc. | |
| `UInt8`, `UInt16`, `UInt32`, `UInt64` | `pa.uint8()`, etc. | |
| `Float64` | `pa.float64()` | `float`, `double` |
| `Float16`, `Float32` | `pa.float16()`, etc. | |
| `Boolean` | `pa.bool_()` | `bool` |
| `Date32` | `pa.date32()` | `date` |
| `Date64` | `pa.date64()` | |
| `Timestamp` | `pa.timestamp("us")` | |
| `Binary` | `pa.binary()` | `bytes` |
### Returning Column-Oriented Dicts

With a schema defined, you can return data as `dict[str, list]`:

```python
class MyConnector(Connector):
    def schema(self):
        return {"name": "Utf8", "age": "Int32"}

    def discover(self, attached_locations, **kwargs):
        return [Location("people")]

    def data(self, location, **kwargs):
        return {"name": ["alice", "bob"], "age": [30, 25]}
```
### Returning Row-Oriented Dicts

You can also return `list[dict]`; the schema ensures correct types:

```python
def data(self, location, **kwargs):
    return [{"name": "alice", "age": 30}, {"name": "bob", "age": 25}]
```

A `schema()` method is required when returning dict data (`list[dict]`, `dict[str, list]`, or an iterator of dicts). Returning dict data without a schema raises a `ValueError`.
### Full Schema Example

```python
from bundlebase_sdk import Connector, Location, serve


class SensorConnector(Connector):
    def schema(self):
        return {"sensor_id": "Utf8", "temperature": "Float32", "reading_count": "Int32"}

    def discover(self, attached_locations, **kwargs):
        return [Location("readings")]

    def data(self, location, **kwargs):
        return {
            "sensor_id": ["s1", "s2", "s3"],
            "temperature": [22.5, 19.8, 25.1],
            "reading_count": [100, 85, 120],
        }


if __name__ == "__main__":
    serve(SensorConnector())
```
## Complete Example

A source that discovers multiple locations, returns multi-batch data, provides stable URLs, and handles extra arguments:

```python
import pyarrow as pa
from bundlebase_sdk import Connector, Location, StableUrl, serve


class DatabaseSource(Connector):
    def discover(self, attached_locations, **kwargs):
        # Extra source arguments arrive via **kwargs
        db_host = kwargs.get("db_host", "localhost")
        return [
            Location("users.parquet", must_copy=True, format="parquet", version="v2"),
            Location("orders.parquet", must_copy=True, format="parquet", version="v1"),
        ]

    def data(self, location, **kwargs):
        if location.location == "users.parquet":
            # Return multiple batches for large datasets
            batch1 = pa.record_batch(
                {"id": [1, 2], "name": ["alice", "bob"]},
                schema=pa.schema([("id", pa.int64()), ("name", pa.string())]),
            )
            batch2 = pa.record_batch(
                {"id": [3], "name": ["charlie"]},
                schema=pa.schema([("id", pa.int64()), ("name", pa.string())]),
            )
            return [batch1, batch2]
        elif location.location == "orders.parquet":
            return pa.table({
                "order_id": [101, 102],
                "user_id": [1, 2],
                "amount": [29.99, 49.99],
            })
        return None

    def stable_url(self, location, **kwargs):
        if location.location == "users.parquet":
            return StableUrl("https://db.example.com/exports/users-v2.parquet")
        return None


if __name__ == "__main__":
    serve(DatabaseSource())
```
## Testing

The SDK exposes `_serve()`, which accepts explicit IO streams, letting you test your connector without launching a subprocess:

```python
import io
import json
import struct

import pyarrow as pa
from bundlebase_sdk import Connector, Location
from bundlebase_sdk.serve import _serve


class ExampleConnector(Connector):
    def discover(self, attached_locations, **kwargs):
        return [Location("test.parquet")]

    def data(self, location, **kwargs):
        return pa.table({"x": [1, 2, 3]})


def make_request(method, params=None, req_id=1):
    req = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params or {}}
    return json.dumps(req).encode() + b"\n"


def test_discover():
    stdin = io.BytesIO(
        make_request("discover", {"attached_locations": []}, req_id=1)
        + make_request("shutdown", req_id=2)
    )
    stdout = io.BytesIO()
    _serve(ExampleConnector(), stdin, stdout)
    resp = json.loads(stdout.getvalue().split(b"\n")[0])
    assert len(resp["result"]["locations"]) == 1
    assert resp["result"]["locations"][0]["location"] == "test.parquet"


def test_data():
    stdin = io.BytesIO(
        make_request("data", {"location": {"location": "test.parquet"}}, req_id=1)
        + make_request("shutdown", req_id=2)
    )
    stdout = io.BytesIO()
    _serve(ExampleConnector(), stdin, stdout)
    out = stdout.getvalue()
    # Skip the JSON-RPC response line, then read the 4-byte big-endian
    # length prefix that precedes the Arrow IPC payload
    newline_idx = out.index(b"\n") + 1
    length = struct.unpack(">I", out[newline_idx:newline_idx + 4])[0]
    assert length > 0
    ipc_data = out[newline_idx + 4:newline_idx + 4 + length]
    table = pa.ipc.open_stream(ipc_data).read_all()
    assert table.num_rows == 3
```
## Native Mode

Python sources can run in-process for zero-copy Arrow transfer, eliminating subprocess overhead:

```python
import bundlebase.sync as bb
from example_connector import ExampleConnector

bundle = bb.create("my/data")
bundle.import_temp_connector('example.connector', 'python::example_connector:ExampleConnector')
bundle.create_source('example.connector')
bundle.fetch("base", "add")
```
The same Connector class works for both native and IPC mode; no code changes are needed. The only difference is how you create the connector:

| Mode | Registration | Data Transfer |
|---|---|---|
| Native | `import_temp_connector(..., 'python::module:Class')` | Zero-copy via PyO3 |
| IPC | `import_temp_connector(..., 'ipc::script.py')` | Serialized Arrow IPC over pipes |
### Extra Arguments

Extra arguments passed when creating the source are forwarded to your `discover()` and `data()` methods as `**kwargs`, just like in IPC mode.
### When to Use Native vs IPC

Use native (`runtime='python'`) when:

- Your source is part of the same Python project
- You need maximum performance for large datasets
- You want the simplest possible setup

Use IPC (`runtime='ipc'`) when:

- Your source runs as a standalone script
- You want process isolation (source crashes don't affect Bundlebase)
- You're packaging your connector as a Docker image

See Native Mode for the full overview.
## Error Handling

Exceptions raised in your `discover()`, `data()`, or `stable_url()` methods are caught by the SDK and returned as JSON-RPC error responses with code `-32000`. The exception message is included in the error:

```python
class ExampleConnector(Connector):
    def data(self, location, **kwargs):
        raise ValueError("Database connection failed")

# Bundlebase receives: {"error": {"code": -32000, "message": "Database connection failed"}}
```

Bundlebase surfaces these errors to the user as connector failures during FETCH.