Custom Connectors¶
Custom connectors let you write data providers in any language. The runtime parameter determines how Bundlebase loads and communicates with your connector:
| Type | How It Works | Performance | Languages |
|---|---|---|---|
| `python` | In-process via PyO3 | Zero-copy Arrow | Python |
| `ffi` | In-process via `dlopen` of a shared library | Zero-copy Arrow | Rust, Go, Java |
| `java` | Subprocess via `java -jar` | Serialized Arrow IPC | Java |
| `docker` | Subprocess via `docker run` | Serialized Arrow IPC | Any language |
| `ipc` | Subprocess via direct command execution | Serialized Arrow IPC | Any language |
Internally, python and ffi run in-process (native mode) for zero-copy Arrow transfer. java, docker, and ipc run as subprocesses communicating over stdin/stdout.
Your source code is the same regardless of type — only the entry point differs. SDKs for Python, Go, Java, and Rust handle the protocol automatically.
Runtime URI Format Reference¶
When importing a connector, the FROM clause uses a runtime::entrypoint URI. This table shows the format for each runtime:
| Runtime | URI Format | Example |
|---|---|---|
| `ipc` | `ipc::command` | `ipc::python my_module.py` |
| `ffi` | `ffi::path[:symbol]` | `ffi::libexample.so:my_func` |
| `python` | `python::module:Class` | `python::my_source:MyConnector` |
| `java` | `java::path.jar` | `java::connectors/my.jar` |
| `docker` | `docker::image:tag` | `docker::myorg/myconnector:latest` |
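The `runtime::entrypoint` split can be illustrated with a small helper. This is a sketch for clarity, not an SDK function — only the first `::` separates runtime from entrypoint, since the entrypoint may itself contain `:` (e.g., `module:Class` or `image:tag`):

```python
def parse_runtime_uri(uri: str) -> tuple[str, str]:
    """Split a URI like 'docker::myorg/img:latest' into (runtime, entrypoint).

    Illustrative only -- not part of any Bundlebase SDK.
    """
    runtime, sep, entrypoint = uri.partition("::")
    if not sep or not runtime or not entrypoint:
        raise ValueError(f"expected runtime::entrypoint, got {uri!r}")
    return runtime, entrypoint
```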
Built-in Connectors¶
Bundlebase includes several built-in connectors that don't need to be imported:
| Connector | Description |
|---|---|
| `remote_dir` | Files from a remote directory (S3, HTTP, etc.) |
| `ftp_directory` | Files from an FTP server |
| `sftp_directory` | Files from an SFTP server |
| `kaggle` | Datasets from Kaggle |
| `web_scrape` | Data scraped from web pages |
| `postgres` | Data from a PostgreSQL database |
Use these directly with CREATE SOURCE — no IMPORT CONNECTOR step needed.
Overview¶
Custom connectors use a simple workflow:
- Load the connector — defines a named connector
- Create a source — creates a source instance from the connector
- Fetch data — discovers and attaches data from the source
To remove or rename connectors:
- Drop connector — removes the connector (or just a specific platform's entry)
- Drop temp connector — removes runtime-only connector entries
- Rename connector — renames a connector and updates associated sources
- Rename temp connector — renames runtime-only connector entries
Choosing a Runtime¶
Use `python` when:
- Your source is a Python class in the same project
- You need maximum performance with zero serialization overhead
- Note: requires IMPORT TEMP CONNECTOR, since Python code can't be bundled
Use `ffi` when:
- You have a compiled shared library (`.so`/`.dylib`/`.dll`) from Rust, Go, or Java
- You need zero-copy performance with a portable, bundled source
Use `java`, `docker`, or `ipc` when:
- You want process isolation (source crashes don't affect Bundlebase)
- You're packaging your connector as a Docker image (`docker`)
- You're running a Java JAR (`java`)
- You're running any other executable (`ipc`)
Configuration¶
External Code Execution
Custom connectors that execute external code (Python native connectors, shared libraries, IPC subprocesses) require the allow_external_code configuration setting:
Without this, CREATE SOURCE will fail with "External code execution is disabled".
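The exact configuration surface isn't shown in this section; only the key name is documented. As a hypothetical sketch, the setting is a boolean flag:

```python
# Hypothetical sketch: only the key name allow_external_code is documented
# here; where exactly it is set depends on your deployment.
config = {"allow_external_code": True}
```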
Commands¶
IMPORT CONNECTOR¶
Creates a named connector. The connector definition is persisted into the bundle's commit history, making the bundle portable.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | (required) | Dot-separated connector name (e.g., `"acme.weather"`) |
| `from_` | `str` | (required) | Runtime URI in `runtime::entrypoint` format (e.g., `"ipc::my_connector"`, `"python::my_module:MyConnector"`) |
| `platform` | `str` | `"*/*"` | Target platform (e.g., `"linux/amd64"`, `"darwin/arm64"`, `"*/*"` for all) |
Note
IMPORT CONNECTOR rejects runtime='python' because Python code cannot be serialized into the bundle. Use IMPORT TEMP CONNECTOR for Python connectors.
Connector names use a dot-separated namespace format. The part before the first dot is the namespace (e.g., acme in acme.weather). Choose a namespace that is unique to you or your organization — this prevents naming collisions when sharing bundles. For example:
- `mycompany.sales.crm` — "mycompany" namespace
- `jdoe.weather.noaa` — personal namespace
- `acme.weather` — organization namespace
The name must contain at least one dot.
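The namespace rule can be sketched as a small helper (illustrative only, not an SDK function):

```python
def connector_namespace(name: str) -> str:
    """Return the namespace (the part before the first dot) of a
    dot-separated connector name, raising if there is no dot.
    Illustrative only -- not part of any Bundlebase SDK."""
    namespace, sep, rest = name.partition(".")
    if not sep or not namespace or not rest:
        raise ValueError(f"connector name must be dot-separated, got {name!r}")
    return namespace
```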
You can call IMPORT CONNECTOR multiple times for different platforms on the same connector — the last call for a given platform wins. At runtime, Bundlebase selects the best match for the current OS/architecture.
IMPORT TEMP CONNECTOR¶
Creates a connector at runtime only — nothing is persisted into the bundle. Use this for runtime='python' in-process connectors. Works on both Bundle (read-only) and BundleBuilder.
Parameters: Same as IMPORT CONNECTOR, but the from_ parameter accepts all runtimes including python.
Temporary connectors take precedence over persisted connectors when both exist for the same platform. This is useful for development workflows where you want to test a Python connector locally before packaging it as a shared library or Docker image.
CREATE SOURCE¶
Creates a source instance from a connector. For built-in connectors (remote_dir, kaggle, etc.), this is a single step. For custom connectors, you must first IMPORT CONNECTOR or IMPORT TEMP CONNECTOR.
```python
# Custom connector (no extra args)
bundle = await bundle.create_source('acme.weather')

# Custom connector with extra args forwarded to discover/data
bundle = await bundle.create_source('acme.weather', {
    'region': 'us-east'
})

# Built-in connector
bundle = await bundle.create_source('remote_dir', {
    'url': 's3://bucket/data/',
    'patterns': '**/*.parquet'
})

# Source on a specific pack
bundle = await bundle.create_source('remote_dir', {
    'url': 's3://bucket/customers/'
}, pack='customers')
```
```python
# Custom connector (no extra args)
bundle.create_source('acme.weather')

# Custom connector with extra args forwarded to discover/data
bundle.create_source('acme.weather', {
    'region': 'us-east'
})

# Built-in connector
bundle.create_source('remote_dir', {
    'url': 's3://bucket/data/',
    'patterns': '**/*.parquet'
})

# Source on a specific pack
bundle.create_source('remote_dir', {
    'url': 's3://bucket/customers/'
}, pack='customers')
```
```sql
-- Custom connector (no extra args)
CREATE SOURCE USING acme.weather

-- Custom connector with extra args
CREATE SOURCE USING acme.weather WITH (region = 'us-east')

-- Built-in connector
CREATE SOURCE USING remote_dir WITH (url = 's3://bucket/data/', patterns = '**/*.parquet')

-- Source on a specific pack
CREATE SOURCE FOR customers USING remote_dir WITH (url = 's3://bucket/customers/')
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `connector` | `str` | (required) | Connector name — either a built-in name or a dot-separated custom connector name |
| `args` | `dict` | `{}` | Key-value arguments passed to the connector. For custom connectors, these are forwarded as extra arguments to `discover()`, `data()`, and `stable_url()` |
| `pack` | `str` | `"base"` | Which pack to attach discovered files to |
FETCH¶
Discovers and attaches new files from sources. Returns a list of FetchResults, one per source. See Data Sources for full details on fetch modes and results.
Fetch modes: add (only add new files), update (add new + update changed), sync (add + update + remove missing).
DROP CONNECTOR¶
Removes a connector. Without a platform, removes the entire connector definition and all its entries (persisted and temporary) and source instances. With a platform, removes only the connector for that platform.
Like other drop operations, the bundled artifacts are not deleted — a DropConnectorOp is added to the history.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | (required) | Dot-separated connector name |
| `platform` | `str` | `None` | If specified, only drop the connector for this platform. If `None`, drops the entire connector. |
DROP TEMP CONNECTOR¶
Removes runtime-only connector entries. Works on both Bundle and BundleBuilder.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | (required) | Dot-separated connector name |
| `platform` | `str` | `None` | If specified, only drop the connector for this platform. If `None`, drops all platforms. |
Returns: The number of connector entries removed.
RENAME CONNECTOR¶
Renames a connector definition to a new dotted name. All platform entries are renamed, and any sources referencing the old connector name are automatically updated.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `old_name` | `str` | (required) | Current dot-separated connector name |
| `new_name` | `str` | (required) | New dot-separated connector name |
RENAME TEMP CONNECTOR¶
Renames runtime-only connector entries to a new name. Only temporary entries are renamed; persistent entries are not affected.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `old_name` | `str` | (required) | Current dot-separated connector name |
| `new_name` | `str` | (required) | New dot-separated connector name |
How It Works¶
Native Mode¶
Python: Source objects are called directly in-process via PyO3 — no subprocess, no serialization.
Compiled languages: Build a shared library (.so/.dylib/.dll) exporting the C ABI. Bundlebase dlopens it and uses the Arrow C Data Interface.
IPC Mode¶
An IPC connector runs as a subprocess that Bundlebase launches and communicates with over stdin/stdout:
- Discover — Bundlebase sends a `discover` call. Your source returns a list of available data locations.
- Data — For each location, Bundlebase sends a `data` call. Your source returns Arrow record batches.
- Stable URL (optional) — Bundlebase may send a `stable_url` call to check if a location has a cached URL.
- Shutdown — Bundlebase sends a `shutdown` call and the subprocess exits.
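The subprocess side of this cycle can be sketched as a minimal dispatcher for the line-delimited JSON-RPC protocol. This is an illustration, not SDK code: real SDKs also handle `data`/`stable_url` and the Arrow IPC framing described in the protocol reference, and `Demo` is a made-up connector used only to exercise the dispatcher:

```python
import json

def handle_request(line: str, connector) -> str:
    """Dispatch one line-delimited JSON-RPC 2.0 request to the connector.
    Minimal sketch: data, stable_url, and error responses are omitted."""
    req = json.loads(line)
    method = req["method"]
    params = req.get("params", {})
    if method == "ping":
        result = "pong"
    elif method == "discover":
        result = {"locations": connector.discover(params.get("attached_locations", []))}
    elif method == "shutdown":
        result = {"ok": True}
    else:
        raise ValueError(f"unhandled method {method!r}")
    return json.dumps({"jsonrpc": "2.0", "id": req["id"], "result": result})

class Demo:
    # Hypothetical connector used only to exercise the dispatcher.
    def discover(self, attached_locations):
        return [{"location": "data.parquet", "must_copy": True,
                 "format": "parquet", "version": "v1"}]
```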
Key Concepts¶
Location¶
A Location represents a discovered data file. Every SDK provides this type with the same fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `location` | string | (required) | Identifier for the data file (e.g., "data/file1.parquet") |
| `must_copy` | bool | `true` | Whether the data must be copied into the bundle |
| `format` | string | `"parquet"` | File format hint |
| `version` | string | `""` | Version string for change detection |
StableUrl¶
A StableUrl contains a single url field. When provided, Bundlebase can cache the data at that URL and skip re-fetching on subsequent runs if the URL hasn't changed.
Extra Arguments¶
Any extra key-value arguments passed in the CREATE SOURCE configuration are forwarded to your discover, data, and stable_url methods. This lets you parameterize your connector without changing code.
Runtime Values¶
The runtime parameter determines how Bundlebase loads and runs the connector:
| Runtime | Mode | `entrypoint` value | What happens |
|---|---|---|---|
| `python` | Native (in-process) | `module:Class` | Imports the Python class via PyO3 and calls it directly |
| `ffi` | Native (in-process) | Path to `.so`/`.dylib`/`.dll` | `dlopen`s the shared library and uses the Arrow C Data Interface |
| `java` | IPC (subprocess) | Path to JAR file | Runs `java -jar <entrypoint>` as a subprocess |
| `docker` | IPC (subprocess) | Docker image name | Runs `docker run -i --rm <entrypoint>` as a subprocess |
| `ipc` | IPC (subprocess) | Command to run | Executes `<entrypoint>` directly (whitespace-split) as a subprocess |
Docker Connectors¶
Package any connector as a Docker image:
```dockerfile
FROM python:3.13-slim
RUN pip install bundlebase-sdk pyarrow
COPY example_connector.py /app/example_connector.py
CMD ["python", "/app/example_connector.py"]
```
Use with IMPORT CONNECTOR:
```python
bundle.import_connector('example.connector', 'docker::myorg/example-connector:latest')
bundle.create_source('example.connector')
```
The container receives JSON-RPC on stdin and writes responses to stdout.
Getting Started¶
Scaffold a new project
Use the bundlebase init-sdk command to generate a new connector or function project with all the boilerplate:
```shell
# Scaffold a Python connector project
bundlebase init-sdk python my_connector --type connector

# Scaffold a Go function project
bundlebase init-sdk go my_functions --type function

# Scaffold both connector and function
bundlebase init-sdk rust my_project --type both
```
Supported languages: python, go, java, rust.
SDK Quick Start¶
Each SDK handles the protocol for you. Implement the connector interface and choose your entry point — serve() for IPC mode or the native export for zero-copy mode.
```python
from bundlebase_sdk import Connector, Location, serve
import pyarrow as pa

class ExampleConnector(Connector):
    def discover(self, attached_locations, **kwargs):
        return [Location("data.parquet", format="parquet", version="v1")]

    def data(self, location, **kwargs):
        return pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

if __name__ == "__main__":
    serve(ExampleConnector())
```
See the Python SDK reference for full API details.
```go
type ExampleConnector struct{}

func (s *ExampleConnector) Discover(attached []string, args map[string]string) ([]sdk.Location, error) {
	return []sdk.Location{
		{Location: "data.parquet", MustCopy: true, Format: "parquet", Version: "v1"},
	}, nil
}

func (s *ExampleConnector) Data(loc sdk.Location, args map[string]string) ([]arrow.Record, error) {
	// Build and return Arrow records
}

func main() { sdk.Serve(&ExampleConnector{}) }
```
See the Go SDK reference for full API details.
```java
public class ExampleConnector implements Connector {
    public List<Location> discover(List<String> attached, Map<String, String> args) {
        return List.of(new Location("data.parquet", true, "parquet", "v1"));
    }

    public VectorSchemaRoot data(Location loc, Map<String, String> args) {
        // Build and return Arrow VectorSchemaRoot
    }

    public static void main(String[] args) { Serve.run(new ExampleConnector()); }
}
```
See the Java SDK reference for full API details.
```rust
struct ExampleConnector;

impl Connector for ExampleConnector {
    fn discover(&self, _attached: &[String], _args: &HashMap<String, String>)
        -> Result<Vec<Location>, Box<dyn std::error::Error>> {
        Ok(vec![Location::new("data.parquet")])
    }

    fn data(&self, _location: &Location, _args: &HashMap<String, String>)
        -> Result<Option<Vec<RecordBatch>>, Box<dyn std::error::Error>> {
        // Build and return Arrow RecordBatches
    }
}

fn main() { bundlebase_sdk::serve(&ExampleConnector); }
```
See the Rust SDK reference for full API details.
Protocol Reference¶
For implementing connectors in languages without an SDK.
Transport: Line-delimited JSON-RPC 2.0 on stdin/stdout.
Health Check¶
ping — Returns "pong". Used by Bundlebase to verify the subprocess is alive and responsive. All SDKs handle this automatically.
Reserved Argument Keys¶
The args map passed to discover, data, and stable_url may contain reserved keys prefixed with _:
- `_columns` — A comma-separated list of column names that the caller wants. Connectors that support column pushdown can parse this key to return only the requested columns, reducing data transfer. It is safe to ignore this key.
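Parsing the `_columns` hint can be sketched as follows (illustrative helper, not part of any SDK):

```python
def requested_columns(args: dict):
    """Return the list of column names from the _columns pushdown hint,
    or None when the caller wants all columns. Illustrative only."""
    cols = args.get("_columns")
    if cols is None:
        return None
    return [c.strip() for c in cols.split(",") if c.strip()]
```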
Methods¶
discover — Returns available locations.
Request params: {"attached_locations": ["loc1", ...], ...extra_args}
Response: {"locations": [{"location": "...", "must_copy": true, "format": "parquet", "version": "v1"}, ...]}
data — Returns data for a location.
Request params: {"location": {"location": "...", "must_copy": true, "format": "...", "version": "..."}, ...extra_args}
Response: {"ok": true} followed by a length-prefixed Arrow IPC frame.
stable_url — Returns a stable URL (optional).
Response: {"url": "https://..."} or null.
shutdown — Clean exit.
Response: {"ok": true}, then exit.
Arrow IPC Framing¶
After the data JSON response line, write:
- 4 bytes: big-endian `u32` length of the IPC data
- N bytes: Arrow IPC stream bytes
Write a zero-length prefix (\x00\x00\x00\x00) for no data.
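In Python, this framing can be sketched with the standard `struct` module (a minimal sketch of the wire format, not an SDK function):

```python
import io
import struct

def write_arrow_frame(stream, ipc_bytes: bytes) -> None:
    # 4-byte big-endian u32 length prefix, then the Arrow IPC stream bytes.
    # An empty payload yields just the zero-length prefix b"\x00\x00\x00\x00".
    stream.write(struct.pack(">I", len(ipc_bytes)))
    stream.write(ipc_bytes)

empty = io.BytesIO()
write_arrow_frame(empty, b"")  # the "no data" case
```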
Error Handling¶
Return JSON-RPC errors for failures:
Standard codes: -32601 (method not found), -32000 (application error).
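For illustration, an error response can be serialized as one line following the JSON-RPC 2.0 error-object layout (sketch only):

```python
import json

def rpc_error(req_id, code: int, message: str) -> str:
    """Serialize a JSON-RPC 2.0 error response as a single line.
    Illustrative only -- SDKs emit these automatically."""
    return json.dumps({"jsonrpc": "2.0", "id": req_id,
                       "error": {"code": code, "message": message}})
```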