Sync API

The sync API provides a synchronous interface perfect for scripts and Jupyter notebooks. No async/await required!

Overview

The sync API is available in the bundlebase.sync module and wraps the async API with automatic event loop management:

import bundlebase.sync as dc

# No await needed!
c = dc.create()
c.attach("data.parquet")
df = c.to_pandas()

Factory Functions

create

create(path: str = '', config: Optional[Any] = None) -> SyncBundleBuilder

Create a new Bundle synchronously.

Creates an empty bundle at the specified path. Use attach() to add data.

PARAMETER DESCRIPTION
path

Optional path for bundle storage (default: random memory location)

TYPE: str DEFAULT: ''

config

Optional configuration dict for cloud storage settings

TYPE: Optional[Any] DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

SyncBundleBuilder ready for immediate use

Example

import bundlebase.sync as bb

c = bb.create()
c.attach("data.parquet")
df = c.to_pandas()

With config:

config = {"region": "us-west-2"}
c = bb.create("s3://bucket/", config=config)

RAISES DESCRIPTION
ValueError

If path is invalid

open

open(path: str, config: Optional[Any] = None) -> SyncBundle

Open an existing Bundle synchronously.

Loads a previously saved bundle from disk.

PARAMETER DESCRIPTION
path

Path to the saved bundle

TYPE: str

config

Optional configuration dict for cloud storage settings

TYPE: Optional[Any] DEFAULT: None

RETURNS DESCRIPTION
SyncBundle

SyncBundle (read-only) with the loaded operations

Example

import bundlebase.sync as bb

c = bb.open("/path/to/bundle")
df = c.to_pandas()

With config:

config = {"region": "us-west-2"}
c = bb.open("s3://bucket/container", config=config)

RAISES DESCRIPTION
ValueError

If bundle cannot be loaded

Core Classes

SyncBundle

Read-only bundle class returned by bundlebase.sync.open().

SyncBundle

SyncBundle(async_bundle: Any)

Synchronous wrapper for PyBundle (read-only).

Provides a synchronous interface to immutable Bundle operations. All async operations are automatically executed synchronously.

PARAMETER DESCRIPTION
async_bundle

The underlying PyBundle instance

TYPE: Any

schema property

schema: Any

Get the schema of the bundle.

RETURNS DESCRIPTION
Any

PySchema object representing the current column structure

name property

name: Optional[str]

Get the bundle name.

RETURNS DESCRIPTION
Optional[str]

Bundle name or None if not set

description property

description: Optional[str]

Get the bundle description.

RETURNS DESCRIPTION
Optional[str]

Bundle description or None if not set

version property

version: str

Get the bundle version.

RETURNS DESCRIPTION
str

12-character hex version string

url property

url: str

Get the bundle URL/path.

RETURNS DESCRIPTION
str

Bundle storage location

history

history() -> List[Any]

Get the commit history of the bundle.

RETURNS DESCRIPTION
List[Any]

List of commit objects with metadata

status

status() -> List[Any]

Get the list of changes added since bundle creation/extension.

RETURNS DESCRIPTION
List[Any]

List of PyChange objects representing uncommitted operations
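Together, history() and status() distinguish committed work from pending changes. A minimal sketch (the bundle path is a placeholder):

```python
import bundlebase.sync as dc

c = dc.open("/tmp/my_bundle")  # placeholder path

for commit in c.history():
    print(commit)      # committed operations with metadata

print(c.status())      # uncommitted changes since open/extend, if any
```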

num_rows

num_rows() -> int

Get the number of rows in the bundle.

explain

explain(verbose: bool = False, analyze: bool = False, format: str = None, sql: str = None) -> ExplainResult

Get the query execution plan.

PARAMETER DESCRIPTION
verbose

If True, show more detailed plan information

TYPE: bool DEFAULT: False

analyze

If True, run the plan and show actual execution statistics

TYPE: bool DEFAULT: False

format

Output format - "indent" (default), "tree", or "graphviz"

TYPE: str DEFAULT: None

sql

Optional SQL statement to explain instead of the bundle's dataframe

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
ExplainResult

ExplainResult with readable string representation
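A short sketch of explain() usage (the file name is a placeholder):

```python
import bundlebase.sync as dc

c = dc.create()
c.attach("data.parquet")  # placeholder file

# Logical and physical plan in the default indented format
print(c.explain())

# Execute the plan and report actual statistics, rendered as a tree
print(c.explain(analyze=True, format="tree"))
```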

to_pandas

to_pandas() -> Any

Convert bundle data to pandas DataFrame.

to_polars

to_polars() -> Any

Convert bundle data to Polars DataFrame.

to_numpy

to_numpy() -> Dict[str, Any]

Convert bundle data to dict of numpy arrays.

to_dict

to_dict() -> Dict[str, List[Any]]

Convert bundle data to dict of lists.

as_pyarrow

as_pyarrow() -> Any

Get all data as PyArrow Table.

extend

extend(data_dir: Optional[str] = None) -> SyncBundleBuilder

Extend this bundle to create a new BundleBuilder.

This is the primary way to create a new BundleBuilder from an existing bundle. The new builder can optionally have a different data directory.

PARAMETER DESCRIPTION
data_dir

Optional new data directory. If None, uses the current bundle's data_dir.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

New SyncBundleBuilder

Example

Extend with just a new data directory

builder = bundle.extend(data_dir="s3://bucket/new")

Extend and then filter

builder = bundle.extend()
builder.filter("active = true", [])

RAISES DESCRIPTION
ValueError

If data_dir is invalid

query

query(sql: str, params: Optional[List[Any]] = None, hard_limit: Optional[int] = None) -> SyncQueryResult

Execute a SQL query and return streaming results.

Unlike extend() with SQL, this does NOT create a new BundleBuilder. It directly executes the query and returns the results.

PARAMETER DESCRIPTION
sql

SQL query string

TYPE: str

params

Optional list of parameters for parameterized queries. If None, defaults to empty list.

TYPE: Optional[List[Any]] DEFAULT: None

hard_limit

Optional maximum number of rows to return. Applied at the DataFrame level for efficient execution. If None, no limit is applied.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult that can be converted to pandas/polars.
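A sketch of a parameterized query with a row cap (the file name and column names are placeholders; "bundle" is the table name used throughout these examples):

```python
import bundlebase.sync as dc

c = dc.create()
c.attach("events.parquet")  # placeholder file

# $1 is bound from params; hard_limit caps the result set
result = c.query(
    "SELECT user_id, count(*) AS n FROM bundle GROUP BY user_id HAVING count(*) > $1",
    params=[10],
    hard_limit=1000,
)
df = result.to_pandas()
```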

describe_function

describe_function(name: str) -> SyncQueryResult

Describe a registered function's metadata.

Returns a table with columns: name, kind, input_types, return_type, runner, logic, platform, temporary.

PARAMETER DESCRIPTION
name

Dotted function name (e.g., "acme.double_val")

TYPE: str

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with function metadata

describe_data

describe_data(columns) -> SyncQueryResult

Describe data quality and statistics for specified columns.

Returns per-column stats: min, max, avg, null counts, top 10 values, and top 10 invalid values (when expected types are specified).

PARAMETER DESCRIPTION
columns

List of column names (str) or tuples of (column_name, expected_type). Example: ["age", "salary"] or [("price", "Float64"), "name"]

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with column statistics
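A sketch mixing plain column names with an expected-type tuple (file and column names are placeholders):

```python
import bundlebase.sync as dc

c = dc.create()
c.attach("people.parquet")  # placeholder file

# The ("salary", "Float64") tuple also surfaces values that fail
# to parse as Float64 in the top-10 invalid values section
stats = c.describe_data(["age", ("salary", "Float64")])
print(stats.to_pandas())
```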

test_connector

test_connector(name: str, **kwargs) -> SyncQueryResult

Test an already-imported connector by name.

Calls discover() then data() to validate the integration without modifying the bundle.

PARAMETER DESCRIPTION
name

Connector name (e.g., "http", "acme.weather")

TYPE: str

**kwargs

Connector arguments (e.g., url="https://example.com/data.csv")

DEFAULT: {}

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with sections: discover, schema, sample, result
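A sketch exercising a connector without modifying the bundle (the URL is a placeholder):

```python
import bundlebase.sync as dc

c = dc.create()

# Runs discover() then data() on the "http" connector; the bundle is untouched
report = c.test_connector("http", url="https://example.com/data.csv")
print(report.to_pandas())  # sections: discover, schema, sample, result
```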

test_temp_connector

test_temp_connector(from_: str, **kwargs) -> SyncQueryResult

Test a connector inline without importing it first.

Calls discover() then data() to validate the integration without modifying the bundle.

PARAMETER DESCRIPTION
from_

Runtime and entrypoint string (e.g., "python::my_module:MyConnector")

TYPE: str

**kwargs

Connector arguments

DEFAULT: {}

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with sections: discover, schema, sample, result

import_temp_connector

import_temp_connector(name: str, from_: str, platform: str = '*/*') -> SyncBundle

Load a temporary connector with runtime-only logic (not persisted).

Temporary connectors are session-scoped and support all runners including python. They are not saved on commit and must be re-registered each session.

PARAMETER DESCRIPTION
name

Dot-separated connector name (e.g., "acme.weather")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:Class")

TYPE: str

platform

Docker-style platform string (default: "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
SyncBundle

Self for fluent chaining

drop_temp_connector

drop_temp_connector(name: str, platform: str = None) -> str

Drop temporary (runtime-only) connector for a connector definition.

PARAMETER DESCRIPTION
name

The defined connector name

TYPE: str

platform

Optional platform filter (e.g., "linux/amd64"). None drops all.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

Message describing what was dropped

import_temp_function

import_temp_function(name: str, from_: str, platform: str = '*/*') -> SyncBundle

Load a temporary SQL function (not persisted).

Types and kind are auto-detected from the function's manifest (bundlebase_metadata()).

PARAMETER DESCRIPTION
name

Dotted function name (e.g., "acme.double_val")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:func")

TYPE: str

platform

Platform in os/arch format (default "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
SyncBundle

Self for fluent chaining

drop_temp_function

drop_temp_function(name: str, platform: str = None) -> str

Drop a temporary function.

PARAMETER DESCRIPTION
name

The dotted function name (e.g., "acme.double_val")

TYPE: str

platform

Optional platform filter. None drops all.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

Message describing what was dropped

SyncBundleBuilder

Mutable bundle class returned by bundlebase.sync.create() and transformation methods.

SyncBundleBuilder

SyncBundleBuilder(async_bundle: Any)

Bases: SyncBundle

Synchronous wrapper for PyBundleBuilder (mutable).

Provides a synchronous interface to mutable Bundle operations with fluent chaining support (no await needed).

Persistent vs Temporary imports

Functions and connectors can be imported as persistent or temporary (temp):

  • Temporary (import_temp_function / import_temp_connector): Session-scoped, supports all runners including python. Not saved on commit; must be re-registered each session. Corresponds to IMPORT TEMP FUNCTION / IMPORT TEMP CONNECTOR in SQL.

  • Persistent (import_function / import_connector): Bundled into the data package and replayed on open. Requires a serializable runner (not python). Corresponds to IMPORT FUNCTION / IMPORT CONNECTOR in SQL.

Temp definitions override persistent ones at resolution time.
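A sketch of the two import flavors. The module path and the non-python runner string are hypothetical placeholders:

```python
import bundlebase.sync as dc

c = dc.create()

# Session-scoped: the python runner is allowed, but the function is gone
# once the session ends and is never saved on commit
c.import_temp_function("acme.double_val", "python::mymod:double_val")

# Persisted into the bundle and replayed on open: requires a serializable
# runner (shown here as a hypothetical non-python entrypoint)
c.import_function("acme.triple_val", "wasm::funcs.wasm:triple_val")
```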

Example

c = dc.create()
c.attach("data.parquet").filter("active = true").drop_column("email")
df = c.to_pandas()

attach

attach(location: str, pack: str = 'base') -> SyncBundleBuilder

Attach a data source to the bundle.

PARAMETER DESCRIPTION
location

The URL/path of the data to attach

TYPE: str

pack

The pack to attach to. Use "base" for the base pack, or a join name to attach to that join's pack.

TYPE: str DEFAULT: 'base'

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

detach_block

detach_block(location: str) -> SyncBundleBuilder

Detach a data block from the bundle by its location.

PARAMETER DESCRIPTION
location

The location (URL) of the block to detach

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

replace_block

replace_block(old_location: str, new_location: str) -> SyncBundleBuilder

Replace a block's data location in the bundle.

Changes where a block's data is read from without changing the block's identity. Useful when data files are moved to a new location.

PARAMETER DESCRIPTION
old_location

The current location (URL) of the block

TYPE: str

new_location

The new location (URL) to read data from

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

add_column

add_column(name: str, expression: str) -> SyncBundleBuilder

Add a computed column to the bundle.

PARAMETER DESCRIPTION
name

Name for the new column

TYPE: str

expression

SQL expression to compute the column value

TYPE: str

drop_column

drop_column(name: str) -> SyncBundleBuilder

Remove a column from the bundle.

cast_column

cast_column(name: str, new_type: str) -> SyncBundleBuilder

Cast a column to a different data type.

PARAMETER DESCRIPTION
name

Name of the column to cast

TYPE: str

new_type

Target type (e.g., "integer", "float", "string")

TYPE: str

normalize_column_names

normalize_column_names() -> SyncBundleBuilder

Normalize all column names to lowercase+underscore identifiers.

rename_column

rename_column(old_name: str, new_name: str) -> SyncBundleBuilder

Rename a column.

PARAMETER DESCRIPTION
old_name

Current column name

TYPE: str

new_name

New column name

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

filter

filter(query: str, params: Optional[List[Any]] = None) -> SyncBundleBuilder

Filter rows based on a SQL SELECT query.

PARAMETER DESCRIPTION
query

SQL SELECT query (e.g., "SELECT * FROM bundle WHERE salary > $1")

TYPE: str

params

Optional list of parameters for parameterized queries ($1, $2, etc.). If None, defaults to empty list.

TYPE: Optional[List[Any]] DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

Example

c.filter("SELECT * FROM bundle WHERE salary > $1", [50000.0])
c.filter("SELECT * FROM bundle WHERE active = true")

delete

delete(where_clause: str) -> SyncBundleBuilder

Delete rows matching a WHERE condition.

PARAMETER DESCRIPTION
where_clause

SQL WHERE condition (e.g., "salary < 0")

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

Example

c.delete("salary < 0")
c.delete("status = 'inactive' AND last_login < '2020-01-01'")

update

update(set_where: str) -> SyncBundleBuilder

Update rows matching a WHERE condition with new values.

PARAMETER DESCRIPTION
set_where

SET and WHERE clause (e.g., "SET salary = 100 WHERE id = 1")

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

always_delete

always_delete(where_clause: str) -> SyncBundleBuilder

Register a persistent always-delete rule and immediately delete matching rows.

PARAMETER DESCRIPTION
where_clause

SQL WHERE condition (e.g., "salary < 0")

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining
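A sketch of registering and removing always-delete rules (the file name is a placeholder):

```python
import bundlebase.sync as dc

c = dc.create()
c.attach("data.parquet")  # placeholder file

c.always_delete("salary < 0")       # delete now, and for future data too
c.drop_always_delete("salary < 0")  # remove that specific rule
c.drop_always_delete()              # or remove all always-delete rules
```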

drop_always_delete

drop_always_delete(where_clause: Optional[str] = None) -> SyncBundleBuilder

Remove always-delete rules.

PARAMETER DESCRIPTION
where_clause

Specific rule to remove, or None to remove all rules

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

always_update

always_update(set_where: str) -> SyncBundleBuilder

Register a persistent always-update rule and immediately update matching rows.

PARAMETER DESCRIPTION
set_where

SET and WHERE clause (e.g., "SET salary = 0 WHERE salary < 0")

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

drop_always_update

drop_always_update(rule_text: Optional[str] = None) -> SyncBundleBuilder

Remove always-update rules.

PARAMETER DESCRIPTION
rule_text

Specific rule to remove ("SET ... WHERE ..."), or None to remove all

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

join

join(name: str, on: str, location: Optional[str] = None, how: str = 'inner') -> SyncBundleBuilder

Join with another data source.

If location is None, the join point is created without any initial data. Data can be attached later using attach(location, pack=name) or create_source(pack=name).

drop_join

drop_join(join_name: str) -> SyncBundleBuilder

Drop an existing join.

PARAMETER DESCRIPTION
join_name

Name of the join to drop

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

create_source

create_source(connector: str, args: Dict[str, str], pack: str = 'base') -> SyncBundleBuilder

Create a data source for a pack.

PARAMETER DESCRIPTION
connector

Connector name (e.g., "remote_dir" for built-in, "acme.weather" for custom)

TYPE: str

args

Dictionary of connector-specific arguments

TYPE: Dict[str, str]

pack

Which pack to define the source for:

  • "base" (default): The base pack
  • A join name: A joined pack by its join name

TYPE: str DEFAULT: 'base'

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

import_connector

import_connector(name: str, from_: str, platform: str = '*/*') -> SyncBundleBuilder

Load a named connector with logic (persisted).

Persistent connectors are bundled into the data package and replayed on open. The python runner cannot be persisted -- use import_temp_connector for in-process Python connectors.

PARAMETER DESCRIPTION
name

Dot-separated connector name (e.g., "acme.weather")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:Class")

TYPE: str

platform

Docker-style platform string (default: "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

describe_function

describe_function(name: str) -> SyncQueryResult

Describe a registered function's metadata.

Returns a table with columns: name, kind, input_types, return_type, runner, logic, platform, temporary.

PARAMETER DESCRIPTION
name

Dotted function name (e.g., "acme.double_val")

TYPE: str

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with function metadata

describe_data

describe_data(columns) -> SyncQueryResult

Describe data quality and statistics for specified columns.

Returns per-column stats: min, max, avg, null counts, top 10 values, and top 10 invalid values (when expected types are specified).

PARAMETER DESCRIPTION
columns

List of column names (str) or tuples of (column_name, expected_type). Example: ["age", "salary"] or [("price", "Float64"), "name"]

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with column statistics

test_connector

test_connector(name: str, **kwargs) -> SyncQueryResult

Test an already-imported connector by name.

Calls discover() then data() to validate the integration without modifying the bundle.

PARAMETER DESCRIPTION
name

Connector name (e.g., "http", "acme.weather")

TYPE: str

**kwargs

Connector arguments (e.g., url="https://example.com/data.csv")

DEFAULT: {}

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with sections: discover, schema, sample, result

test_temp_connector

test_temp_connector(from_: str, **kwargs) -> SyncQueryResult

Test a connector inline without importing it first.

Calls discover() then data() to validate the integration without modifying the bundle.

PARAMETER DESCRIPTION
from_

Runtime and entrypoint string (e.g., "python::my_module:MyConnector")

TYPE: str

**kwargs

Connector arguments

DEFAULT: {}

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult with sections: discover, schema, sample, result

import_temp_connector

import_temp_connector(name: str, from_: str, platform: str = '*/*') -> SyncBundleBuilder

Load a temporary connector with runtime-only logic (not persisted).

Temporary connectors are session-scoped and support all runners including python. They are not saved on commit and must be re-registered each session.

PARAMETER DESCRIPTION
name

Dot-separated connector name (e.g., "acme.weather")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:Class")

TYPE: str

platform

Docker-style platform string (default: "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

rename_connector

rename_connector(old_name: str, new_name: str) -> SyncBundleBuilder

Rename a connector to a new dotted name.

Renames all entries and updates sources referencing the old connector name.

PARAMETER DESCRIPTION
old_name

Current connector name (e.g., "acme.weather")

TYPE: str

new_name

New connector name (e.g., "acme.weather_v2")

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

drop_connector

drop_connector(name: str, platform: str = None) -> SyncBundleBuilder

Drop a connector. Without a platform, removes the entire definition. With a platform, removes only the logic for that platform.

PARAMETER DESCRIPTION
name

The dotted connector name (e.g., "acme.weather")

TYPE: str

platform

Optional platform filter (e.g., "linux/amd64"). None drops entire connector.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

drop_temp_connector

drop_temp_connector(name: str, platform: str = None) -> str

Drop temporary (runtime-only) connector for a connector definition.

PARAMETER DESCRIPTION
name

The defined connector name

TYPE: str

platform

Optional platform filter (e.g., "linux/amd64"). None drops all.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

Message describing what was dropped

drop_temp_function

drop_temp_function(name: str, platform: str = None) -> str

Drop a temporary function.

PARAMETER DESCRIPTION
name

The dotted function name (e.g., "acme.double_val")

TYPE: str

platform

Optional platform filter. None drops all.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
str

Message describing what was dropped

fetch

fetch(pack: str = 'base', mode: str = 'add') -> List[FetchResults]

Fetch data from sources for a pack.

Checks the pack's sources for new files and attaches them to the bundle.

PARAMETER DESCRIPTION
pack

Which pack to fetch sources for:

  • "base" (default): The base pack
  • A join name: A joined pack by its join name

TYPE: str DEFAULT: 'base'

mode

Sync mode (default: "add"):

  • "add": Only attach new files
  • "update": Add new files and replace changed files
  • "sync": Add new, replace changed, and remove deleted files

TYPE: str DEFAULT: 'add'

RETURNS DESCRIPTION
List[FetchResults]

List of FetchResults, one for each source in the pack.

List[FetchResults]

Each result contains details about blocks added, replaced, and removed.
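A sketch pairing create_source() with fetch(); the connector argument key and the bucket path are placeholders:

```python
import bundlebase.sync as dc

c = dc.create()

# "remote_dir" is the built-in connector named in create_source() above;
# the args dict is connector-specific (key shown here is assumed)
c.create_source("remote_dir", {"path": "s3://bucket/incoming/"})

# Attach new files and replace changed ones for the base pack
for result in c.fetch(mode="update"):
    print(result)  # blocks added, replaced, removed per source
```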

fetch_all

fetch_all(mode: str = 'add') -> List[FetchResults]

Fetch data from all defined sources.

Checks all defined sources for new files and attaches them to the bundle.

PARAMETER DESCRIPTION
mode

Sync mode (default: "add"):

  • "add": Only attach new files
  • "update": Add new files and replace changed files
  • "sync": Add new, replace changed, and remove deleted files

TYPE: str DEFAULT: 'add'

RETURNS DESCRIPTION
List[FetchResults]

List of FetchResults, one for each source across all packs.

List[FetchResults]

Includes results for sources with no changes (empty results).

extend

extend(data_dir: Optional[str] = None) -> SyncBundleBuilder

Extend this bundle to create a new BundleBuilder.

PARAMETER DESCRIPTION
data_dir

Optional new data directory. If None, uses the current bundle's data_dir.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

New SyncBundleBuilder

Example

Extend and then filter

extended = c.extend()
extended.filter("active = true", [])

query

query(sql: str, params: Optional[List[Any]] = None, hard_limit: Optional[int] = None) -> SyncQueryResult

Execute a SQL query and return streaming results.

Unlike extend() with SQL, this does NOT create a new BundleBuilder. It directly executes the query and returns the results.

PARAMETER DESCRIPTION
sql

SQL query string

TYPE: str

params

Optional list of parameters for parameterized queries. If None, defaults to empty list.

TYPE: Optional[List[Any]] DEFAULT: None

hard_limit

Optional maximum number of rows to return. Applied at the DataFrame level for efficient execution. If None, no limit is applied.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
SyncQueryResult

SyncQueryResult that can be converted to pandas/polars.

create_view

create_view(name: str, sql: str) -> SyncBundleBuilder

Create a view from a SQL query.

PARAMETER DESCRIPTION
name

Name for the new view

TYPE: str

sql

SQL query defining the view contents

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

SyncBundleBuilder for the new view
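A sketch of creating and reading a view (the file and column names are placeholders):

```python
import bundlebase.sync as dc

c = dc.create()
c.attach("sales.parquet")  # placeholder file

# The returned builder represents the view's contents
top = c.create_view("top_sales", "SELECT * FROM bundle WHERE amount > 1000")
df = top.to_pandas()
```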

set_name

set_name(name: str) -> SyncBundleBuilder

Set the bundle name.

set_description

set_description(desc: str) -> SyncBundleBuilder

Set the bundle description.

import_function

import_function(name: str, from_: str, platform: str = '*/*') -> SyncBundleBuilder

Load a named SQL function (persisted).

Persistent functions are bundled into the data package and replayed on open. The python runner cannot be persisted -- use import_temp_function for in-process Python functions.

Types and kind are auto-detected from the function's manifest.

PARAMETER DESCRIPTION
name

Dotted function name (e.g., "acme.double_val") or wildcard (e.g., "acme.*")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:func")

TYPE: str

platform

Platform in os/arch format (default "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
SyncBundleBuilder

SyncBundleBuilder for chaining

import_temp_function

import_temp_function(name: str, from_: str, platform: str = '*/*') -> SyncBundleBuilder

Load a temporary SQL function (not persisted).

Types and kind are auto-detected from the function's manifest (bundlebase_metadata()).

PARAMETER DESCRIPTION
name

Dotted function name (e.g., "acme.double_val")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:func")

TYPE: str

platform

Platform in os/arch format (default "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
SyncBundleBuilder

SyncBundleBuilder for chaining

rename_function

rename_function(old_name: str, new_name: str) -> SyncBundleBuilder

Rename a function to a new dotted name.

Renames all entries, deregisters old UDFs, and re-registers under the new name.

PARAMETER DESCRIPTION
old_name

Current function name (e.g., "acme.double_val")

TYPE: str

new_name

New function name (e.g., "acme.double_val_v2")

TYPE: str

RETURNS DESCRIPTION
SyncBundleBuilder

SyncBundleBuilder for chaining

drop_function

drop_function(name: str, platform: str = None, input_types: list = None) -> SyncBundleBuilder

Drop a function. Without a platform or input_types, removes the entire definition. With a platform, removes only the logic for that platform. With input_types, removes only the overload matching that signature.

PARAMETER DESCRIPTION
name

The dotted function name (e.g., "acme.double_val")

TYPE: str

platform

Optional platform filter (e.g., "linux/amd64"). None drops entire function.

TYPE: str DEFAULT: None

input_types

Optional list of Arrow type names to drop a specific overload.

TYPE: list DEFAULT: None

RETURNS DESCRIPTION
SyncBundleBuilder

SyncBundleBuilder for chaining

create_index

create_index(columns: Union[str, List[str]], index_type: str, args: Optional[Dict[str, str]] = None, name: Optional[str] = None) -> SyncBundleBuilder

Create an index on one or more columns.

PARAMETER DESCRIPTION
columns

Column name (str) or list of column names (list[str])

TYPE: Union[str, List[str]]

index_type

Index type - "column" or "text"

TYPE: str

args

Optional index-specific arguments (e.g., {"tokenizer": "en_stem"})

TYPE: Optional[Dict[str, str]] DEFAULT: None

name

Optional index name (for text indexes). Auto-generated if not provided.

TYPE: Optional[str] DEFAULT: None
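A sketch of both index types (the file and column names are placeholders; the tokenizer value comes from the args example above):

```python
import bundlebase.sync as dc

c = dc.create()
c.attach("articles.parquet")  # placeholder file

# Column index for exact-match lookups
c.create_index("user_id", "column")

# Full-text index with an explicit name and tokenizer
c.create_index("body", "text",
               args={"tokenizer": "en_stem"},
               name="body_fts")
```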

drop_index

drop_index(column: str) -> SyncBundleBuilder

Drop an index from a column.

rebuild_index

rebuild_index(column: str) -> SyncBundleBuilder

Rebuild an existing index on a column.

reindex

reindex() -> SyncBundleBuilder

Create indexes for columns that don't have them yet.

Iterates through all defined indexes and creates index files for any blocks that don't have indexes yet. This is useful after attaching new data or recovering from partial index creation failures.

RETURNS DESCRIPTION
SyncBundleBuilder

Self for fluent chaining

commit

commit(message: str) -> Any

Commit changes to persistent storage.

Utility Functions

stream_batches

stream_batches(bundle: SyncBundle) -> Any

Stream RecordBatches from a bundle synchronously.

WARNING: This function materializes ALL batches in memory first, then yields them. This is a limitation of the synchronous API due to Python's threading model. For true streaming with constant memory usage, use the async API:

async for batch in bundlebase.stream_batches(bundle):
    process(batch)

For better memory efficiency with the sync API, consider:

  1. Using pandas/polars conversion instead of streaming
  2. Processing smaller subsets of data (using filter operations)
  3. Using the async API instead

PARAMETER DESCRIPTION
bundle

SyncBundle to stream from

TYPE: SyncBundle

YIELDS DESCRIPTION
Any

pyarrow.RecordBatch objects (all loaded into memory first)

Example

import bundlebase.sync as bb

c = bb.create().attach("data.parquet")
for batch in bb.stream_batches(c):
    print(f"Processing {batch.num_rows} rows")

RAISES DESCRIPTION
ValueError

If streaming fails

Examples

Simple Script

import bundlebase.sync as dc

# Create and process data
c = dc.create()
c.attach("userdata.parquet")
c.filter("salary > 50000")
c.drop_column("email")

# Export
df = c.to_pandas()
print(f"Found {len(df)} high earners")

Method Chaining

import bundlebase.sync as dc

df = (dc.create()
      .attach("data.parquet")
      .drop_column("email")
      .filter("active = true")
      .rename_column("fname", "first_name")
      .to_pandas())

Jupyter Notebook

First, install the jupyter extra:

pip install "bundlebase[jupyter]"

Then in your notebook:

import bundlebase.sync as dc

c = dc.create().attach("data.parquet")
display(c.to_pandas())  # Nice table in notebook

Streaming Large Datasets

import bundlebase.sync as dc

c = dc.create().attach("huge_dataset.parquet")

total_rows = 0
for batch in dc.stream_batches(c):
    # Process batch (~100MB)
    total_rows += batch.num_rows
    print(f"Processed {batch.num_rows} rows")

print(f"Total: {total_rows}")

Saving and Loading

import bundlebase.sync as dc

# Create and save
c = dc.create("/tmp/my_bundle")
c.attach("data.parquet")
c.filter("year >= 2020")
c.commit("Filtered to 2020+")

# Later, load
c = dc.open("/tmp/my_bundle")
df = c.to_pandas()

Async vs Sync Comparison

Async API

import bundlebase
import asyncio

async def process():
    c = await bundlebase.create()
    c = await c.attach("data.parquet")
    df = await c.to_pandas()
    return df

df = asyncio.run(process())

Sync API

import bundlebase.sync as dc

c = dc.create()
c.attach("data.parquet")
df = c.to_pandas()

Performance Notes

Overhead

The sync API adds minimal overhead:

  • Scripts: ~0.1ms per operation (persistent event loop)
  • Jupyter: ~0.2ms per operation (nested asyncio)

This is negligible compared to data I/O time.

Optimization

Chaining operations reduces overhead:

# Good: One event loop call
df = (dc.create()
      .attach("data.parquet")
      .filter("x > 10")
      .to_pandas())

# Less optimal: Multiple event loop calls
c = dc.create()
c.attach("data.parquet")
c.filter("x > 10")
df = c.to_pandas()

Error Handling

Handle errors like regular Python code:

import bundlebase.sync as dc

try:
    c = dc.create()
    c.attach("nonexistent.parquet")
except ValueError as e:
    print(f"Failed to load data: {e}")

Migration from Async

Migration is straightforward:

Before (Async)

import bundlebase

async def process():
    c = await bundlebase.create()
    c = await c.attach("data.parquet")
    c = await c.filter("x > 10")
    return await c.to_pandas()

After (Sync)

import bundlebase.sync as dc

c = dc.create()
c.attach("data.parquet")
c.filter("x > 10")
df = c.to_pandas()

Just remove await and import bundlebase.sync!

Troubleshooting

ImportError: nest_asyncio required

Install Jupyter support:

pip install "bundlebase[jupyter]"

"No event loop running" in Jupyter

Make sure you've imported from bundlebase.sync:

import bundlebase.sync as dc  # Not bundlebase!
