
Async API

The async API provides a modern async/await interface for Bundlebase operations.

Factory Functions

create

create

create(path: str = '', config: Optional[Dict[str, Any]] = None) -> CreateChain

Create a new Bundle with fluent chaining support.

Returns an awaitable chain that can queue operations before execution.

PARAMETER DESCRIPTION
path

Optional path for bundle storage

TYPE: str DEFAULT: ''

config

Optional configuration dict for cloud storage settings

TYPE: Optional[Dict[str, Any]] DEFAULT: None

RETURNS DESCRIPTION
CreateChain

CreateChain that can be chained with operations

Example

c = await (create(path)
    .attach("data.parquet")
    .drop_column("unwanted")
    .rename_column("old", "new"))

With config:

config = {"region": "us-west-2", "s3://bucket/": {"endpoint": "http://localhost:9000"}}
c = await create("s3://my-bucket/", config=config)
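The layering implied by this config — plain top-level keys as global settings, URL-prefix keys holding per-location overrides — can be sketched with a small standalone helper. `resolve_storage_settings` is hypothetical and illustrative only, not part of Bundlebase's API:

```python
# Hypothetical helper (not part of Bundlebase) illustrating the config
# shape shown above: plain top-level keys are global settings, and
# URL-prefix keys ("s3://bucket/") hold per-location overrides.
from typing import Any, Dict


def resolve_storage_settings(config: Dict[str, Any], url: str) -> Dict[str, Any]:
    """Merge global settings with any prefix-specific overrides for url."""
    settings = {k: v for k, v in config.items() if not isinstance(v, dict)}
    for prefix, overrides in config.items():
        if isinstance(overrides, dict) and url.startswith(prefix):
            settings.update(overrides)
    return settings


config = {
    "region": "us-west-2",
    "s3://bucket/": {"endpoint": "http://localhost:9000"},
}
print(resolve_storage_settings(config, "s3://bucket/data.parquet"))
# → {'region': 'us-west-2', 'endpoint': 'http://localhost:9000'}
```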

open

open async

open(path: str, config: Optional[Dict[str, Any]] = None) -> PyBundle

Load a bundle definition from a saved file.

PARAMETER DESCRIPTION
path

Path to the saved bundle file (YAML format)

TYPE: str

config

Optional configuration dict for cloud storage settings

TYPE: Optional[Dict[str, Any]] DEFAULT: None

RETURNS DESCRIPTION
PyBundle

A PyBundle with the loaded operations (read-only)

RAISES DESCRIPTION
ValueError

If the file cannot be loaded

Example

bundle = await open("s3://my-bucket/container")

With config:

config = {"region": "us-west-2"}
bundle = await open("s3://my-bucket/container", config=config)

Core Classes

PyBundle

Read-only bundle class returned by open().

PyBundle

Read-only Bundle class for data processing operations.

Provides a lazy evaluation pipeline for loading, transforming, and querying data from various sources using Apache Arrow and DataFusion.

Note: This class is read-only. Use PyBundleBuilder for mutations.

name property

name: Optional[str]

Get the bundle name.

RETURNS DESCRIPTION
Optional[str]

Bundle name or None if not set

description property

description: Optional[str]

Get the bundle description.

RETURNS DESCRIPTION
Optional[str]

Bundle description or None if not set

num_rows property

num_rows: int

Get the number of rows in the bundle.

RETURNS DESCRIPTION
int

Number of rows based on the attached data sources

extend

extend(data_dir: str) -> ExtendChain

Extend this bundle to a new directory with chainable operations.

Creates a BundleBuilder in the specified directory, copying the existing bundle's state and allowing new operations to be chained.

PARAMETER DESCRIPTION
data_dir

Path to the new directory for the extended bundle

TYPE: str

RETURNS DESCRIPTION
ExtendChain

ExtendChain that can be chained with operations

Example

c = await bundlebase.open(path)
extended = await c.extend(new_path).attach("data.parquet").drop_column("col")

schema async

schema() -> PySchema

Get the current schema of the bundle.

RETURNS DESCRIPTION
PySchema

PySchema object representing the current column structure

to_pandas async

to_pandas() -> Any

Convert the bundle's data to a pandas DataFrame.

RETURNS DESCRIPTION
Any

pandas.DataFrame with the results

RAISES DESCRIPTION
ImportError

If pandas is not installed

ValueError

If conversion fails or bundle has no data

Example

df = await bundle.to_pandas()

to_polars async

to_polars() -> Any

Convert the bundle's data to a Polars DataFrame.

RETURNS DESCRIPTION
Any

polars.DataFrame with the results

RAISES DESCRIPTION
ImportError

If polars is not installed

ValueError

If conversion fails or bundle has no data

Example

df = await bundle.to_polars()

to_numpy async

to_numpy() -> Dict[str, Any]

Convert the bundle's data to a dictionary of numpy arrays.

RETURNS DESCRIPTION
Dict[str, Any]

Dictionary mapping column names to numpy arrays

RAISES DESCRIPTION
ImportError

If numpy is not installed

ValueError

If conversion fails or bundle has no data

Example

arrays = await bundle.to_numpy()

to_dict async

to_dict() -> Dict[str, List[Any]]

Convert the bundle's data to a dictionary of lists.

RETURNS DESCRIPTION
Dict[str, List[Any]]

Dictionary mapping column names to lists of values

RAISES DESCRIPTION
ValueError

If conversion fails or bundle has no data

Example

data = await bundle.to_dict()

explain async

explain(verbose: bool = False, analyze: bool = False, format: Optional[str] = None, sql: Optional[str] = None) -> RecordBatchStream

Get the query execution plan as a stream.

Generates and returns the logical and physical query plan that DataFusion will use to execute the operation pipeline.

PARAMETER DESCRIPTION
verbose

If True, show more detailed plan information

TYPE: bool DEFAULT: False

analyze

If True, run the plan and show actual execution statistics

TYPE: bool DEFAULT: False

format

Output format - "indent" (default), "tree", or "graphviz"

TYPE: Optional[str] DEFAULT: None

sql

Optional SQL statement to explain instead of the bundle's dataframe

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
RecordBatchStream

RecordBatchStream with plan_type and plan columns

RAISES DESCRIPTION
ValueError

If plan generation fails

Example

stream = await bundle.explain()
batch = await stream.next()  # Contains plan_type and plan columns

PyBundleBuilder

Mutable bundle class returned by create() and transformation methods.

PyBundleBuilder

Mutable Bundle class for data processing operations.

Provides a lazy evaluation pipeline for loading, transforming, and querying data from various sources using Apache Arrow and DataFusion. Supports fluent operation chaining with a single await.

All mutation methods return an OperationChain that queues operations and can be awaited to execute them sequentially.

Example

c = await (bundlebase.create(path)
    .attach("data.parquet")
    .drop_column("unwanted")
    .rename_column("old_name", "new_name"))
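The "queue now, execute on a single await" pattern described above can be pictured with a minimal standalone sketch. `MiniChain` is a toy stand-in for OperationChain and shares none of its internals:

```python
# Minimal standalone sketch of fluent chaining with a single await.
# MiniChain is illustrative only, not Bundlebase's OperationChain.
import asyncio


class MiniChain:
    def __init__(self):
        self._ops = []  # queued operations, executed only when awaited

    def drop_column(self, name):
        self._ops.append(("drop_column", name))
        return self  # each method returns the chain for fluent calls

    def rename_column(self, old, new):
        self._ops.append(("rename_column", old, new))
        return self

    def __await__(self):
        # Awaiting the chain runs the queued operations in order.
        async def run():
            return [f"executed {op[0]}" for op in self._ops]
        return run().__await__()


async def main():
    result = await MiniChain().drop_column("unwanted").rename_column("a", "b")
    print(result)
    # → ['executed drop_column', 'executed rename_column']

asyncio.run(main())
```

Each intermediate method call only appends to the queue; nothing touches data until the whole chain is awaited once.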

name property

name: Optional[str]

Get the bundle name.

RETURNS DESCRIPTION
Optional[str]

Bundle name or None if not set

description property

description: Optional[str]

Get the bundle description.

RETURNS DESCRIPTION
Optional[str]

Bundle description or None if not set

num_rows property

num_rows: int

Get the number of rows in the bundle.

RETURNS DESCRIPTION
int

Number of rows based on the attached data sources

attach

attach(location: str, pack: str = 'base') -> OperationChain

Attach data from a file location.

Queues an attach operation that will be executed when the chain is awaited. Supports CSV, JSON, Parquet files.

PARAMETER DESCRIPTION
location

Data file location (e.g., "data.csv", "data.parquet")

TYPE: str

pack

Pack to attach to - "base" for the base pack, or a join name for joined data

TYPE: str DEFAULT: 'base'

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

RAISES DESCRIPTION
ValueError

If the location is invalid or data cannot be loaded

Example

c = await c.attach("data.parquet")
c = await c.attach("extra_users.csv", pack="users")  # attach to a join

add_column

add_column(name: str, expression: str) -> OperationChain

Queue an add_column operation.

Adds a computed column to the bundle using a SQL expression.

PARAMETER DESCRIPTION
name

Name for the new column

TYPE: str

expression

SQL expression to compute the column value

TYPE: str

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

RAISES DESCRIPTION
ValueError

If the column already exists

Example

c = await c.add_column("full_name", "first_name || ' ' || last_name")

cast_column

cast_column(name: str, new_type: str) -> OperationChain

Queue a cast_column operation.

Casts a column to a different data type.

PARAMETER DESCRIPTION
name

Name of the column to cast

TYPE: str

new_type

Target type (e.g., "integer", "float", "string")

TYPE: str

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

RAISES DESCRIPTION
ValueError

If the column doesn't exist

Example

c = await c.cast_column("price", "integer")

drop_column

drop_column(name: str) -> OperationChain

Queue a drop_column operation.

PARAMETER DESCRIPTION
name

Name of the column to remove

TYPE: str

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

RAISES DESCRIPTION
ValueError

If the column doesn't exist

Example

c = await c.drop_column("unwanted_col")

rename_column

rename_column(old_name: str, new_name: str) -> OperationChain

Queue a rename_column operation.

PARAMETER DESCRIPTION
old_name

Current column name

TYPE: str

new_name

New column name

TYPE: str

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

RAISES DESCRIPTION
ValueError

If the column doesn't exist

Example

c = await c.rename_column("old_name", "new_name")

filter

filter(query: str, params: Optional[List[Any]] = None) -> OperationChain

Queue a filter operation.

PARAMETER DESCRIPTION
query

SQL SELECT query (e.g., "SELECT * FROM bundle WHERE salary > $1")

TYPE: str

params

Optional list of parameters for parameterized queries

TYPE: Optional[List[Any]] DEFAULT: None

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Example

c = await c.filter("SELECT * FROM bundle WHERE salary > $1", [50000])
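To make the `$1`-style placeholder numbering concrete, here is a naive standalone illustration. The real engine binds parameters inside the query plan rather than substituting strings; the hypothetical `show_bound_query` exists only to show how placeholders map to positions in `params`:

```python
# Illustration only: shows how $1, $2 placeholders map positionally to
# the params list ($1 -> params[0], $2 -> params[1], ...). Bundlebase's
# engine binds parameters safely; it does not do string substitution.
import re
from typing import Any, List


def show_bound_query(query: str, params: List[Any]) -> str:
    def repl(m: re.Match) -> str:
        value = params[int(m.group(1)) - 1]  # $1 is params[0]
        return repr(value)
    return re.sub(r"\$(\d+)", repl, query)


print(show_bound_query("SELECT * FROM bundle WHERE salary > $1", [50000]))
# → SELECT * FROM bundle WHERE salary > 50000
```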

select

select(sql: str, params: Optional[List[Any]] = None) -> OperationChain

Queue a select operation.

PARAMETER DESCRIPTION
sql

SQL query string (e.g., "SELECT * FROM bundle LIMIT 10")

TYPE: str

params

Optional list of parameters for parameterized queries

TYPE: Optional[List[Any]] DEFAULT: None

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Example

c = await c.select("SELECT * FROM bundle LIMIT 10")

join

join(name: str, expression: str, location: Optional[str] = None, join_type: Optional[str] = None) -> OperationChain

Queue a join operation.

PARAMETER DESCRIPTION
name

Name for the joined data (used to reference in expressions)

TYPE: str

expression

Join condition expression

TYPE: str

location

Optional data file location to join with (can attach data later)

TYPE: Optional[str] DEFAULT: None

join_type

Type of join ("Inner", "Left", "Right", "Full")

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Example

c = await c.join("users", 'bundle.id = users.user_id', "users.csv")
c = await c.join("regions", 'bundle.country = regions.country')  # attach data later

set_name

set_name(name: str) -> OperationChain

Queue a set_name operation.

PARAMETER DESCRIPTION
name

Bundle name

TYPE: str

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Example

c = await c.set_name("My Bundle")

set_description

set_description(description: str) -> OperationChain

Queue a set_description operation.

PARAMETER DESCRIPTION
description

Bundle description

TYPE: str

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Example

c = await c.set_description("A description")

import_function

import_function(name: str, from_: str, platform: str = '*/*') -> OperationChain

Load a named SQL function (persisted).

Types and kind are auto-detected from the function's manifest.

PARAMETER DESCRIPTION
name

Dotted function name (e.g., "acme.double_val") or wildcard (e.g., "acme.*")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:func")

TYPE: str

platform

Platform in os/arch format (default "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining
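The `from_` string packs the runtime, module, and symbol into one value. A hypothetical parser (not part of Bundlebase) makes the `runtime::module:symbol` shape explicit:

```python
# Hypothetical parser (not part of Bundlebase) for the from_ string
# format "runtime::module:symbol", e.g. "python::mod:func".
from typing import Tuple


def parse_from(spec: str) -> Tuple[str, str, str]:
    runtime, _, logic = spec.partition("::")   # "python" | "mod:func"
    module, _, symbol = logic.rpartition(":")  # "mod" | "func"
    return runtime, module, symbol


print(parse_from("python::mypkg.funcs:double_val"))
# → ('python', 'mypkg.funcs', 'double_val')
```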

create_source

create_source(connector: str, args: Dict[str, str], pack: str = 'base') -> OperationChain

Create a data source for a pack.

Queues an operation to define a source from which files can be automatically attached via fetch().

PARAMETER DESCRIPTION
connector

Connector name. Available connectors:

"remote_dir" - List files from a local or cloud directory:
- "url" (required): Directory URL (e.g., "s3://bucket/data/", "file:///path/to/data/")
- "patterns" (optional): Comma-separated glob patterns (e.g., "**/*.parquet,**/*.csv")
- "copy" (optional): "true" to copy files into the bundle (default), "false" to reference in place

"ftp_directory" - List files from a remote FTP directory:
- "url" (required): FTP URL (e.g., "ftp://user:pass@ftp.example.com:21/path/to/data")
- "patterns" (optional): Comma-separated glob patterns (defaults to "**/*")
Supports anonymous FTP (omit user/pass), custom ports, and authenticated access.
Note: Files are always copied into the bundle (remote files cannot be directly referenced)

"sftp_directory" - List files from a remote directory via SFTP:
- "url" (required): SFTP URL (e.g., "sftp://user@host:22/path/to/data")
- "key_path" (required): Path to SSH private key file (e.g., "~/.ssh/id_rsa")
- "patterns" (optional): Comma-separated glob patterns (defaults to "**/*")
Note: Files are always copied into the bundle (remote files cannot be directly referenced)

TYPE: str

args

Connector-specific configuration arguments as described above.

TYPE: Dict[str, str]

pack

Which pack to define the source for:
- "base" (default): The base pack
- A join name: A joined pack by its join name

TYPE: str DEFAULT: 'base'

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Examples:

# Local/cloud directory (base pack)
c = await c.create_source("remote_dir", {"url": "s3://bucket/data/", "patterns": "**/*.parquet"})

# FTP directory (anonymous)
c = await c.create_source("ftp_directory", {"url": "ftp://ftp.example.com/pub/data/"})

# FTP directory (authenticated)
c = await c.create_source("ftp_directory", {"url": "ftp://user:pass@ftp.example.com/data/"})

# Remote directory via SFTP
c = await c.create_source("sftp_directory", {"url": "sftp://user@host/data/", "key_path": "~/.ssh/id_rsa"})

# Define a source for a joined pack
c = await c.create_source("remote_dir", {"url": "s3://bucket/customers/"}, pack="customers")

import_connector

import_connector(name: str, from_: str, platform: str = '*/*') -> OperationChain

Load a named connector with logic (persisted).

PARAMETER DESCRIPTION
name

Dot-separated connector name (e.g., "acme.weather")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:Class")

TYPE: str

platform

Docker-style platform string (default: "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

import_temp_connector

import_temp_connector(name: str, from_: str, platform: str = '*/*') -> OperationChain

Load a temporary connector with runtime-only logic (not persisted).

PARAMETER DESCRIPTION
name

Dot-separated connector name (e.g., "acme.weather")

TYPE: str

from_

Runtime and logic string (e.g., "python::mod:Class")

TYPE: str

platform

Docker-style platform string (default: "*/*")

TYPE: str DEFAULT: '*/*'

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

drop_connector

drop_connector(name: str, platform: Optional[str] = None) -> OperationChain

Drop a connector. Without a platform, removes the entire definition. With a platform, removes only the logic for that platform.

PARAMETER DESCRIPTION
name

The dotted connector name (e.g., "acme.weather")

TYPE: str

platform

Optional platform filter (e.g., "linux/amd64"). None drops entire connector.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

drop_temp_connector

drop_temp_connector(name: str, platform: Optional[str] = None) -> OperationChain

Drop a runtime-only connector (session-only; no operation is created).

PARAMETER DESCRIPTION
name

The dotted connector name (e.g., "acme.weather")

TYPE: str

platform

Optional platform filter. None drops all.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

fetch async

fetch(pack: str, mode: str) -> List[FetchResults]

Fetch data from sources for a pack.

Compares files in the pack's sources with already-attached files and auto-attaches any new files found.

PARAMETER DESCRIPTION
pack

Which pack to fetch sources for (e.g. "base" or a join name)

TYPE: str

mode

Sync mode: "add", "update", or "sync".
- "add": Only attach new files
- "update": Add new files and replace changed files
- "sync": Add new, replace changed, and remove deleted files

TYPE: str

RETURNS DESCRIPTION
List[FetchResults]

List of FetchResults, one for each source in the pack. Each result contains details about blocks added, replaced, and removed.

Example

results = await c.fetch("base", "add")
for result in results:
    print(f"{result.connector}: {len(result.added)} added")
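The three sync modes can be pictured as a pure-Python comparison of a source listing against the already-attached files. `plan_fetch` is a hypothetical sketch; the actual fetch also copies data and records operations:

```python
# Standalone sketch of the "add" / "update" / "sync" modes: compare the
# source listing with attached files and decide what to do. Illustrative
# only; not Bundlebase's implementation.
from typing import Dict


def plan_fetch(source: Dict[str, str], attached: Dict[str, str], mode: str):
    """source/attached map file name -> content hash; returns the plan."""
    added = [f for f in source if f not in attached]
    changed = [f for f in source if f in attached and source[f] != attached[f]]
    removed = [f for f in attached if f not in source]
    if mode == "add":       # only attach new files
        return {"add": added, "replace": [], "remove": []}
    if mode == "update":    # also replace changed files
        return {"add": added, "replace": changed, "remove": []}
    if mode == "sync":      # also remove files deleted at the source
        return {"add": added, "replace": changed, "remove": removed}
    raise ValueError(f"unknown mode: {mode}")


source = {"a.parquet": "h1", "b.parquet": "h2-new"}
attached = {"b.parquet": "h2-old", "c.parquet": "h3"}
print(plan_fetch(source, attached, "sync"))
# → {'add': ['a.parquet'], 'replace': ['b.parquet'], 'remove': ['c.parquet']}
```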

fetch_all async

fetch_all(mode: str) -> List[FetchResults]

Fetch data from all defined sources.

Compares files in each source with already-attached files and auto-attaches any new files found.

PARAMETER DESCRIPTION
mode

Sync mode: "add", "update", or "sync".
- "add": Only attach new files
- "update": Add new files and replace changed files
- "sync": Add new, replace changed, and remove deleted files

TYPE: str

RETURNS DESCRIPTION
List[FetchResults]

List of FetchResults, one for each source across all packs. Includes results for sources with no changes (empty results).

Example

results = await c.fetch_all("add")
for result in results:
    print(f"{result.connector}: {result.total_count()} changes")

create_index

create_index(columns: Union[str, List[str]], index_type: str, args: Optional[Dict[str, str]] = None, name: Optional[str] = None) -> OperationChain

Create an index on one or more columns.

PARAMETER DESCRIPTION
columns

Column name (str) or list of column names (list[str])

TYPE: Union[str, List[str]]

index_type

Index type - "column" or "text"

TYPE: str

args

Optional index-specific arguments (e.g., {"tokenizer": "en_stem"})

TYPE: Optional[Dict[str, str]] DEFAULT: None

name

Optional index name (for text indexes). Auto-generated if not provided.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Example

c = await c.create_index("user_id", "column")
c = await c.create_index("description", "text")
c = await c.create_index(["title", "description"], "text")
c = await c.create_index(["title", "description"], "text", name="product_search")

rebuild_index

rebuild_index(column: str) -> OperationChain

Rebuild an index on a column.

PARAMETER DESCRIPTION
column

Name of the column whose index to rebuild

TYPE: str

RETURNS DESCRIPTION
OperationChain

OperationChain for fluent chaining

Example

c = await c.rebuild_index("user_id")

reindex async

reindex() -> PyBundleBuilder

Create indexes for columns that don't have them yet.

Iterates through all defined indexes and creates index files for any blocks that don't have indexes yet. This is useful after attaching new data or recovering from partial index creation failures.

RETURNS DESCRIPTION
PyBundleBuilder

The bundle after reindexing

RAISES DESCRIPTION
ValueError

If reindexing fails

commit async

commit(message: str) -> PyBundleBuilder

Commit the current state of the bundle.

PARAMETER DESCRIPTION
message

Commit message describing the changes

TYPE: str

RETURNS DESCRIPTION
PyBundleBuilder

The bundle after committing

RAISES DESCRIPTION
ValueError

If commit fails

schema async

schema() -> PySchema

Get the current schema of the bundle.

RETURNS DESCRIPTION
PySchema

PySchema object representing the current column structure

to_pandas async

to_pandas() -> Any

Convert the bundle's data to a pandas DataFrame.

RETURNS DESCRIPTION
Any

pandas.DataFrame with the results

RAISES DESCRIPTION
ImportError

If pandas is not installed

ValueError

If conversion fails or bundle has no data

Example

df = await bundle.to_pandas()

to_polars async

to_polars() -> Any

Convert the bundle's data to a Polars DataFrame.

RETURNS DESCRIPTION
Any

polars.DataFrame with the results

RAISES DESCRIPTION
ImportError

If polars is not installed

ValueError

If conversion fails or bundle has no data

Example

df = await bundle.to_polars()

to_numpy async

to_numpy() -> Dict[str, Any]

Convert the bundle's data to a dictionary of numpy arrays.

RETURNS DESCRIPTION
Dict[str, Any]

Dictionary mapping column names to numpy arrays

RAISES DESCRIPTION
ImportError

If numpy is not installed

ValueError

If conversion fails or bundle has no data

Example

arrays = await bundle.to_numpy()

to_dict async

to_dict() -> Dict[str, List[Any]]

Convert the bundle's data to a dictionary of lists.

RETURNS DESCRIPTION
Dict[str, List[Any]]

Dictionary mapping column names to lists of values

RAISES DESCRIPTION
ValueError

If conversion fails or bundle has no data

Example

data = await bundle.to_dict()

explain async

explain(verbose: bool = False, analyze: bool = False, format: Optional[str] = None, sql: Optional[str] = None) -> RecordBatchStream

Get the query execution plan as a stream.

Generates and returns the logical and physical query plan that DataFusion will use to execute the operation pipeline.

PARAMETER DESCRIPTION
verbose

If True, show more detailed plan information

TYPE: bool DEFAULT: False

analyze

If True, run the plan and show actual execution statistics

TYPE: bool DEFAULT: False

format

Output format - "indent" (default), "tree", or "graphviz"

TYPE: Optional[str] DEFAULT: None

sql

Optional SQL statement to explain instead of the bundle's dataframe

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
RecordBatchStream

RecordBatchStream with plan_type and plan columns

RAISES DESCRIPTION
ValueError

If plan generation fails

Example

stream = await bundle.explain()
batch = await stream.next()  # Contains plan_type and plan columns

Supporting Classes

PyBundleStatus

PyBundleStatus

Bundle status showing uncommitted changes.

changes property

changes: List[PyChange]

The changes that represent changes since creation/extension.

total_operations property

total_operations: int

Total number of individual operations across all changes.

is_empty

is_empty() -> bool

Check if there are any uncommitted changes.

PyChange

PyChange

Information about a logical, user-level change.

id property

id: str

Unique identifier for the change.

description property

description: str

Human-readable description of what operations were performed.

operation_count property

operation_count: int

Number of individual operations in this change.

Utility Functions

set_rust_log_level

set_rust_log_level

set_rust_log_level(level: int) -> None

Set the logging level for Rust components.

PARAMETER DESCRIPTION
level

logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR

TYPE: int

Example

import logging

bundlebase.set_rust_log_level(logging.DEBUG)

test_datafile

test_datafile builtin

test_datafile(name)

Get the memory URL for a test data file.

random_memory_url

random_memory_url builtin

random_memory_url()

Get a random memory URL for a test bundle.

Examples

Basic Usage

import bundlebase

# Create a new bundle
c = await bundlebase.create()

# Attach data
c = await c.attach("data.parquet")

# Transform
c = await c.filter("SELECT * FROM bundle WHERE age >= 18")
c = await c.drop_column("ssn")

# Export
df = await c.to_pandas()

Method Chaining

import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("SELECT * FROM bundle WHERE active = true")
    .drop_column("temp")
    .rename_column("old", "new"))

df = await c.to_pandas()

Opening Saved Bundles

import bundlebase

# Open existing bundle
c = await bundlebase.open("/path/to/bundle")

# Extend it into a new directory (creates a mutable copy)
c = await c.extend("/path/to/extended")

# Add more operations
c = await c.filter("year >= 2020")

# Commit changes
await c.commit("Filtered to 2020+")

Error Handling

import bundlebase

try:
    c = await bundlebase.create()
    c = await c.attach("nonexistent.parquet")
except ValueError as e:
    print(f"Error loading data: {e}")

See Also