Async API¶
The async API provides a modern async/await interface for Bundlebase operations.
Factory Functions¶
create¶
Create a new Bundle with fluent chaining support.
Returns an awaitable chain that can queue operations before execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `path` | Optional path for bundle storage |
| `config` | Optional configuration dict for cloud storage settings |

| RETURNS | DESCRIPTION |
|---|---|
| `CreateChain` | CreateChain that can be chained with operations |

Example

```python
c = await (create(path)
    .attach("data.parquet")
    .drop_column("unwanted")
    .rename_column("old", "new"))
```

With config:¶

```python
config = {"region": "us-west-2", "s3://bucket/": {"endpoint": "http://localhost:9000"}}
c = await create("s3://my-bucket/", config=config)
```
open (async)¶
Load a bundle definition from a saved file.
| PARAMETER | DESCRIPTION |
|---|---|
| `path` | Path to the saved bundle file (YAML format) |
| `config` | Optional configuration dict for cloud storage settings |

| RETURNS | DESCRIPTION |
|---|---|
| `PyBundle` | A PyBundle with the loaded operations (read-only) |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If the file cannot be loaded |

Example

```python
bundle = await open("s3://my-bucket/container")
```

With config:¶

```python
config = {"region": "us-west-2"}
bundle = await open("s3://my-bucket/container", config=config)
```
Core Classes¶
PyBundle¶
Read-only bundle class returned by open().
Read-only Bundle class for data processing operations.
Provides a lazy evaluation pipeline for loading, transforming, and querying data from various sources using Apache Arrow and DataFusion.
Note: This class is read-only. Use PyBundleBuilder for mutations.
name (property)¶
Get the bundle name.

| RETURNS | DESCRIPTION |
|---|---|
| `Optional[str]` | Bundle name or None if not set |

description (property)¶
Get the bundle description.

| RETURNS | DESCRIPTION |
|---|---|
| `Optional[str]` | Bundle description or None if not set |

num_rows (property)¶
Get the number of rows in the bundle.

| RETURNS | DESCRIPTION |
|---|---|
| `int` | Number of rows based on the attached data sources |
extend ¶
Extend this bundle to a new directory with chainable operations.
Creates a BundleBuilder in the specified directory, copying the existing bundle's state and allowing new operations to be chained.
| PARAMETER | DESCRIPTION |
|---|---|
| `data_dir` | Path to the new directory for the extended bundle |

| RETURNS | DESCRIPTION |
|---|---|
| `ExtendChain` | ExtendChain that can be chained with operations |

Example

```python
c = await bundlebase.open(path)
extended = await c.extend(new_path).attach("data.parquet").drop_column("col")
```
schema (async)¶
Get the current schema of the bundle.

| RETURNS | DESCRIPTION |
|---|---|
| `PySchema` | PySchema object representing the current column structure |
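No example accompanies `schema`; here is a minimal hedged sketch of loading a bundle and inspecting its columns. It assumes `bundlebase` is installed and that `path` points at a saved bundle; the helper name `show_schema` is illustrative, not part of the library.

```python
import asyncio

async def show_schema(path):
    # Assumed usage: open() and schema() follow the signatures documented above.
    import bundlebase
    bundle = await bundlebase.open(path)
    schema = await bundle.schema()  # PySchema describing the current column structure
    print(schema)

# Run with: asyncio.run(show_schema("/path/to/bundle"))
```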
to_pandas (async)¶
Convert the bundle's data to a pandas DataFrame.

| RETURNS | DESCRIPTION |
|---|---|
| `Any` | pandas.DataFrame with the results |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If pandas is not installed |
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
df = await bundle.to_pandas()
```
to_polars (async)¶
Convert the bundle's data to a Polars DataFrame.

| RETURNS | DESCRIPTION |
|---|---|
| `Any` | polars.DataFrame with the results |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If polars is not installed |
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
df = await bundle.to_polars()
```
to_numpy (async)¶
Convert the bundle's data to a dictionary of numpy arrays.

| RETURNS | DESCRIPTION |
|---|---|
| `Dict[str, Any]` | Dictionary mapping column names to numpy arrays |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If numpy is not installed |
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
arrays = await bundle.to_numpy()
```
to_dict (async)¶
Convert the bundle's data to a dictionary of lists.

| RETURNS | DESCRIPTION |
|---|---|
| `Dict[str, List[Any]]` | Dictionary mapping column names to lists of values |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
data = await bundle.to_dict()
```
explain (async)¶

```python
explain(verbose: bool = False, analyze: bool = False, format: Optional[str] = None, sql: Optional[str] = None) -> RecordBatchStream
```

Get the query execution plan as a stream.
Generates and returns the logical and physical query plan that DataFusion will use to execute the operation pipeline.

| PARAMETER | DESCRIPTION |
|---|---|
| `verbose` | If True, show more detailed plan information |
| `analyze` | If True, run the plan and show actual execution statistics |
| `format` | Output format - "indent" (default), "tree", or "graphviz" |
| `sql` | Optional SQL statement to explain instead of the bundle's dataframe |

| RETURNS | DESCRIPTION |
|---|---|
| `RecordBatchStream` | RecordBatchStream with plan_type and plan columns |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If plan generation fails |

Example

```python
stream = await bundle.explain()
batch = await stream.next()  # Contains plan_type and plan columns
```
PyBundleBuilder¶
Mutable bundle class returned by create() and transformation methods.
Mutable Bundle class for data processing operations.
Provides a lazy evaluation pipeline for loading, transforming, and querying data from various sources using Apache Arrow and DataFusion. Supports fluent operation chaining with a single await.
All mutation methods return an OperationChain that queues operations and can be awaited to execute them sequentially.
Example

```python
c = await (await bundlebase.create(path)
    .attach("data.parquet")
    .drop_column("unwanted")
    .rename_column("old_name", "new_name"))
```
name (property)¶
Get the bundle name.

| RETURNS | DESCRIPTION |
|---|---|
| `Optional[str]` | Bundle name or None if not set |

description (property)¶
Get the bundle description.

| RETURNS | DESCRIPTION |
|---|---|
| `Optional[str]` | Bundle description or None if not set |

num_rows (property)¶
Get the number of rows in the bundle.

| RETURNS | DESCRIPTION |
|---|---|
| `int` | Number of rows based on the attached data sources |
attach ¶
Attach data from a file location.
Queues an attach operation that will be executed when the chain is awaited. Supports CSV, JSON, Parquet files.
| PARAMETER | DESCRIPTION |
|---|---|
| `location` | Data file location (e.g., "data.csv", "data.parquet") |
| `pack` | Pack to attach to - "base" for the base pack, or a join name for joined data |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If the location is invalid or data cannot be loaded |

Example

```python
c = await c.attach("data.parquet")
c = await c.attach("extra_users.csv", pack="users")  # attach to a join
```
add_column ¶
Queue an add_column operation.
Adds a computed column to the bundle using a SQL expression.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name for the new column |
| `expression` | SQL expression to compute the column value |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If the column already exists |

Example

```python
c = await c.add_column("full_name", "first_name || ' ' || last_name")
```
cast_column ¶
Queue a cast_column operation.
Casts a column to a different data type.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name of the column to cast |
| `new_type` | Target type (e.g., "integer", "float", "string") |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If the column doesn't exist |

Example

```python
c = await c.cast_column("price", "integer")
```
drop_column ¶
Queue a drop_column operation.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name of the column to remove |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If the column doesn't exist |

Example

```python
c = await c.drop_column("unwanted_col")
```
rename_column ¶
Queue a rename_column operation.
| PARAMETER | DESCRIPTION |
|---|---|
| `old_name` | Current column name |
| `new_name` | New column name |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If the column doesn't exist |

Example

```python
c = await c.rename_column("old_name", "new_name")
```
filter ¶
Queue a filter operation.
| PARAMETER | DESCRIPTION |
|---|---|
| `query` | SQL SELECT query (e.g., "SELECT * FROM bundle WHERE salary > $1") |
| `params` | Optional list of parameters for parameterized queries |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Example

```python
c = await c.filter("SELECT * FROM bundle WHERE salary > $1", [50000])
```
select ¶
Queue a select operation.
| PARAMETER | DESCRIPTION |
|---|---|
| `sql` | SQL query string (e.g., "SELECT * FROM bundle LIMIT 10") |
| `params` | Optional list of parameters for parameterized queries |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Example

```python
c = await c.select("SELECT * FROM bundle LIMIT 10")
```
join ¶
```python
join(name: str, expression: str, location: Optional[str] = None, join_type: Optional[str] = None) -> OperationChain
```

Queue a join operation.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name for the joined data (used to reference in expressions) |
| `expression` | Join condition expression |
| `location` | Optional data file location to join with (can attach data later) |
| `join_type` | Type of join ("Inner", "Left", "Right", "Full") |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Example

```python
c = await c.join("users", 'bundle.id = users.user_id', "users.csv")
c = await c.join("regions", 'bundle.country = regions.country')  # attach data later
```
set_name ¶
Queue a set_name operation.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Bundle name |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Example

```python
c = await c.set_name("My Bundle")
```
set_description ¶
Queue a set_description operation.
| PARAMETER | DESCRIPTION |
|---|---|
| `description` | Bundle description |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Example

```python
c = await c.set_description("A description")
```
import_function ¶
Load a named SQL function (persisted).
Types and kind are auto-detected from the function's manifest.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dotted function name (e.g., "acme.double_val") or wildcard (e.g., "acme.*") |
| `from_` | Runtime and logic string (e.g., "python::mod:func") |
| `platform` | Platform in os/arch format (default "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |
create_source ¶
Create a data source for a pack.
Queues an operation to define a source from which files can be automatically attached via fetch().
| PARAMETER | DESCRIPTION |
|---|---|
| `connector` | Connector name (see available connectors below) |
| `args` | Connector-specific configuration arguments as described below |
| `pack` | Which pack to define the source for: "base" (default) for the base pack, or a join name for a joined pack |

Available connectors:

- `remote_dir` - List files from a local or cloud directory:
    - `url` (required): Directory URL (e.g., `s3://bucket/data/`, `file:///path/to/data/`)
    - `patterns` (optional): Comma-separated glob patterns (e.g., `**/*.parquet,**/*.csv`)
    - `copy` (optional): "true" to copy files into the bundle (default), "false" to reference them in place
- `ftp_directory` - List files from a remote FTP directory:
    - `url` (required): FTP URL (e.g., `ftp://user:pass@ftp.example.com:21/path/to/data`)
    - `patterns` (optional): Comma-separated glob patterns (defaults to `**/*`)
    - Supports anonymous FTP (just omit user/pass), custom ports, and authenticated access. Note: files are always copied into the bundle (remote files cannot be directly referenced).
- `sftp_directory` - List files from a remote directory via SFTP:
    - `url` (required): SFTP URL (e.g., `sftp://user@host:22/path/to/data`)
    - `key_path` (required): Path to SSH private key file (e.g., `~/.ssh/id_rsa`)
    - `patterns` (optional): Comma-separated glob patterns (defaults to `**/*`)
    - Note: files are always copied into the bundle (remote files cannot be directly referenced).

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Examples:

Local/cloud directory (base pack)¶

```python
c = await c.create_source("remote_dir", {"url": "s3://bucket/data/", "patterns": "**/*.parquet"})
```

FTP directory (anonymous)¶

```python
c = await c.create_source("ftp_directory", {"url": "ftp://ftp.example.com/pub/data/"})
```

FTP directory (authenticated)¶

```python
c = await c.create_source("ftp_directory", {"url": "ftp://user:pass@ftp.example.com/data/"})
```

Remote directory via SFTP¶

```python
c = await c.create_source("sftp_directory", {"url": "sftp://user@host/data/", "key_path": "~/.ssh/id_rsa"})
```

Define source for a joined pack¶

```python
c = await c.create_source("remote_dir", {"url": "s3://bucket/customers/"}, pack="customers")
```
import_connector ¶
Load a named connector with logic (persisted).
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dot-separated connector name (e.g., "acme.weather") |
| `from_` | Runtime and logic string (e.g., "python::mod:Class") |
| `platform` | Docker-style platform string (default: "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |
import_temp_connector ¶
Load a temporary connector with runtime-only logic (not persisted).
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dot-separated connector name (e.g., "acme.weather") |
| `from_` | Runtime and logic string (e.g., "python::mod:Class") |
| `platform` | Docker-style platform string (default: "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |
drop_connector ¶
Drop a connector. Without a platform, removes the entire definition. With a platform, removes only the logic for that platform.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The dotted connector name (e.g., "acme.weather") |
| `platform` | Optional platform filter (e.g., "linux/amd64"). None drops the entire connector. |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |
drop_temp_connector ¶
Drop runtime-only connector (session-only, no operation created).
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The dotted connector name (e.g., "acme.weather") |
| `platform` | Optional platform filter. None drops all. |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |
fetch (async)¶
Fetch data from sources for a pack.
Compares files in the pack's sources with already-attached files and auto-attaches any new files found.

| PARAMETER | DESCRIPTION |
|---|---|
| `pack` | Which pack to fetch sources for (e.g. "base" or a join name) |
| `mode` | Sync mode: "add" (only attach new files), "update" (add new files and replace changed files), or "sync" (add new, replace changed, and remove deleted files) |

| RETURNS | DESCRIPTION |
|---|---|
| `List[FetchResults]` | List of FetchResults, one for each source in the pack. Each result contains details about blocks added, replaced, and removed. |

Example

```python
results = await c.fetch("base", "add")
for result in results:
    print(f"{result.connector}: {len(result.added)} added")
```
fetch_all (async)¶
Fetch data from all defined sources.
Compares files in each source with already-attached files and auto-attaches any new files found.

| PARAMETER | DESCRIPTION |
|---|---|
| `mode` | Sync mode: "add" (only attach new files), "update" (add new files and replace changed files), or "sync" (add new, replace changed, and remove deleted files) |

| RETURNS | DESCRIPTION |
|---|---|
| `List[FetchResults]` | List of FetchResults, one for each source across all packs. Includes results for sources with no changes (empty results). |

Example

```python
results = await c.fetch_all("add")
for result in results:
    print(f"{result.connector}: {result.total_count()} changes")
```
create_index ¶
```python
create_index(columns: Union[str, List[str]], index_type: str, args: Optional[Dict[str, str]] = None, name: Optional[str] = None) -> OperationChain
```

Create an index on one or more columns.

| PARAMETER | DESCRIPTION |
|---|---|
| `columns` | Column name (str) or list of column names (list[str]) |
| `index_type` | Index type - "column" or "text" |
| `args` | Optional index-specific arguments (e.g., {"tokenizer": "en_stem"}) |
| `name` | Optional index name (for text indexes). Auto-generated if not provided. |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Example

```python
c = await c.create_index("user_id", "column")
c = await c.create_index("description", "text")
c = await c.create_index(["title", "description"], "text")
c = await c.create_index(["title", "description"], "text", name="product_search")
```
rebuild_index ¶
Rebuild an index on a column.
| PARAMETER | DESCRIPTION |
|---|---|
| `column` | Name of the column whose index to rebuild |

| RETURNS | DESCRIPTION |
|---|---|
| `OperationChain` | OperationChain for fluent chaining |

Example

```python
c = await c.rebuild_index("user_id")
```
reindex (async)¶
Create indexes for columns that don't have them yet.
Iterates through all defined indexes and creates index files for any blocks that don't have indexes yet. This is useful after attaching new data or recovering from partial index creation failures.

| RETURNS | DESCRIPTION |
|---|---|
| `PyBundleBuilder` | The bundle after reindexing |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If reindexing fails |
commit (async)¶
Commit the current state of the bundle.

| PARAMETER | DESCRIPTION |
|---|---|
| `message` | Commit message describing the changes |

| RETURNS | DESCRIPTION |
|---|---|
| `PyBundleBuilder` | The bundle after committing |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If commit fails |
schema (async)¶
Get the current schema of the bundle.

| RETURNS | DESCRIPTION |
|---|---|
| `PySchema` | PySchema object representing the current column structure |
to_pandas (async)¶
Convert the bundle's data to a pandas DataFrame.

| RETURNS | DESCRIPTION |
|---|---|
| `Any` | pandas.DataFrame with the results |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If pandas is not installed |
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
df = await bundle.to_pandas()
```
to_polars (async)¶
Convert the bundle's data to a Polars DataFrame.

| RETURNS | DESCRIPTION |
|---|---|
| `Any` | polars.DataFrame with the results |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If polars is not installed |
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
df = await bundle.to_polars()
```
to_numpy (async)¶
Convert the bundle's data to a dictionary of numpy arrays.

| RETURNS | DESCRIPTION |
|---|---|
| `Dict[str, Any]` | Dictionary mapping column names to numpy arrays |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If numpy is not installed |
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
arrays = await bundle.to_numpy()
```
to_dict (async)¶
Convert the bundle's data to a dictionary of lists.

| RETURNS | DESCRIPTION |
|---|---|
| `Dict[str, List[Any]]` | Dictionary mapping column names to lists of values |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If conversion fails or bundle has no data |

Example

```python
data = await bundle.to_dict()
```
explain (async)¶

```python
explain(verbose: bool = False, analyze: bool = False, format: Optional[str] = None, sql: Optional[str] = None) -> RecordBatchStream
```

Get the query execution plan as a stream.
Generates and returns the logical and physical query plan that DataFusion will use to execute the operation pipeline.

| PARAMETER | DESCRIPTION |
|---|---|
| `verbose` | If True, show more detailed plan information |
| `analyze` | If True, run the plan and show actual execution statistics |
| `format` | Output format - "indent" (default), "tree", or "graphviz" |
| `sql` | Optional SQL statement to explain instead of the bundle's dataframe |

| RETURNS | DESCRIPTION |
|---|---|
| `RecordBatchStream` | RecordBatchStream with plan_type and plan columns |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If plan generation fails |

Example

```python
stream = await bundle.explain()
batch = await stream.next()  # Contains plan_type and plan columns
```
Supporting Classes¶
PyBundleStatus¶
Bundle status showing uncommitted changes.
PyChange¶
Information about a logical, user-level change.
Utility Functions¶
set_rust_log_level¶
Set the logging level for Rust components.

| PARAMETER | DESCRIPTION |
|---|---|
| `level` | logging.DEBUG, logging.INFO, logging.WARNING, or logging.ERROR |

Example

```python
import logging
bundlebase.set_rust_log_level(logging.DEBUG)
```
test_datafile¶
random_memory_url¶
Examples¶
Basic Usage¶
```python
import bundlebase

# Create a new bundle
c = await bundlebase.create()

# Attach data
c = await c.attach("data.parquet")

# Transform (filter takes a SQL SELECT query, per its documentation above)
c = await c.filter("SELECT * FROM bundle WHERE age >= 18")
c = await c.drop_column("ssn")

# Export
df = await c.to_pandas()
```
Method Chaining¶
```python
import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("SELECT * FROM bundle WHERE active = true")
    .drop_column("temp")
    .rename_column("old", "new"))

df = await c.to_pandas()
```
Opening Saved Bundles¶
```python
import bundlebase

# Open existing bundle
c = await bundlebase.open("/path/to/bundle")

# Extend it into a new directory (creates mutable copy)
c = await c.extend("/path/to/extended")

# Add more operations
c = await c.filter("SELECT * FROM bundle WHERE year >= 2020")

# Commit changes
await c.commit("Filtered to 2020+")
```
Error Handling¶
```python
import bundlebase

try:
    c = await bundlebase.create()
    c = await c.attach("nonexistent.parquet")
except ValueError as e:
    print(f"Error loading data: {e}")
```
See Also¶
- Sync API - Synchronous interface
- Conversion Functions - Data export utilities
- Operation Chains - Fluent chaining implementation