Sync API¶
The sync API provides a synchronous interface perfect for scripts and Jupyter notebooks. No async/await required!
Overview¶
The sync API is available in the bundlebase.sync module and wraps the async API with automatic event loop management:
```python
import bundlebase.sync as dc

# No await needed!
c = dc.create()
c.attach("data.parquet")
df = c.to_pandas()
```
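Under the hood, this kind of wrapper can be sketched in plain Python. The following is a simplified illustration of the event-loop-management pattern only, not bundlebase's actual implementation; `SyncWrapper` and `AsyncCounter` are hypothetical names:

```python
import asyncio

class SyncWrapper:
    """Run an async object's coroutine methods on a private event loop."""

    def __init__(self, async_obj):
        self._obj = async_obj
        self._loop = asyncio.new_event_loop()

    def __getattr__(self, name):
        attr = getattr(self._obj, name)
        if asyncio.iscoroutinefunction(attr):
            # Block until the coroutine completes, hiding the await
            return lambda *args, **kw: self._loop.run_until_complete(attr(*args, **kw))
        return attr

# Toy async API to demonstrate the pattern
class AsyncCounter:
    def __init__(self):
        self.total = 0

    async def add(self, n):
        self.total += n
        return self.total

counter = SyncWrapper(AsyncCounter())
print(counter.add(5))  # no await needed
```

Real implementations also have to handle nested loops (e.g., inside Jupyter), which is why bundlebase's Jupyter support needs `nest_asyncio` (see Troubleshooting below).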
Factory Functions¶
create ¶
Create a new Bundle synchronously.
Creates an empty bundle at the specified path. Use attach() to add data.
| PARAMETER | DESCRIPTION |
|---|---|
| `path` | Optional path for bundle storage (default: random memory location) |
| `config` | Optional configuration dict for cloud storage settings |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | SyncBundleBuilder ready for immediate use |
Example

```python
import bundlebase.sync as bb

c = bb.create()
c.attach("data.parquet")
df = c.to_pandas()
```

With config:¶

```python
config = {"region": "us-west-2"}
c = bb.create("s3://bucket/", config=config)
```
| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If path is invalid |
open ¶
Open an existing Bundle synchronously.
Loads a previously saved bundle from local or cloud storage.
| PARAMETER | DESCRIPTION |
|---|---|
| `path` | Path to the saved bundle |
| `config` | Optional configuration dict for cloud storage settings |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundle` | SyncBundle (read-only) with the loaded operations |
Example

```python
import bundlebase.sync as bb

c = bb.open("/path/to/bundle")
df = c.to_pandas()
```

With config:¶

```python
config = {"region": "us-west-2"}
c = bb.open("s3://bucket/container", config=config)
```
| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If bundle cannot be loaded |
Core Classes¶
SyncBundle¶
Read-only bundle class returned by bundlebase.sync.open().
SyncBundle ¶
Synchronous wrapper for PyBundle (read-only).
Provides a synchronous interface to immutable Bundle operations. All async operations are automatically executed synchronously.
| PARAMETER | DESCRIPTION |
|---|---|
| `async_bundle` | The underlying PyBundle instance |
schema property ¶
Get the schema of the bundle.

| RETURNS | DESCRIPTION |
|---|---|
| `Any` | PySchema object representing the current column structure |
name property ¶
Get the bundle name.

| RETURNS | DESCRIPTION |
|---|---|
| `Optional[str]` | Bundle name or None if not set |
description property ¶
Get the bundle description.

| RETURNS | DESCRIPTION |
|---|---|
| `Optional[str]` | Bundle description or None if not set |
version property ¶
Get the bundle version.

| RETURNS | DESCRIPTION |
|---|---|
| `str` | 12-character hex version string |
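For intuition, a 12-character hex string is the shape of a truncated content hash. The sketch below is a hypothetical illustration of producing such a string; bundlebase's actual versioning scheme is not specified here, and `short_version` is an invented helper name:

```python
import hashlib

def short_version(payload: bytes) -> str:
    # Hypothetical: truncate a SHA-256 digest to 12 hex characters
    return hashlib.sha256(payload).hexdigest()[:12]

v = short_version(b"example bundle state")
print(v, len(v))  # prints a 12-char hex string and 12
```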
history ¶
Get the commit history of the bundle.
| RETURNS | DESCRIPTION |
|---|---|
| `List[Any]` | List of commit objects with metadata |
status ¶
Get the list of changes added since bundle creation/extension.
| RETURNS | DESCRIPTION |
|---|---|
| `List[Any]` | List of PyChange objects representing uncommitted operations |
explain ¶
```python
explain(verbose: bool = False, analyze: bool = False, format: str = None, sql: str = None) -> ExplainResult
```
Get the query execution plan.
| PARAMETER | DESCRIPTION |
|---|---|
| `verbose` | If True, show more detailed plan information |
| `analyze` | If True, run the plan and show actual execution statistics |
| `format` | Output format - "indent" (default), "tree", or "graphviz" |
| `sql` | Optional SQL statement to explain instead of the bundle's dataframe |

| RETURNS | DESCRIPTION |
|---|---|
| `ExplainResult` | ExplainResult with readable string representation |
extend ¶
Extend this bundle to create a new BundleBuilder.
This is the primary way to create a new BundleBuilder from an existing bundle. The new builder can optionally have a different data directory.
| PARAMETER | DESCRIPTION |
|---|---|
| `data_dir` | Optional new data directory. If None, uses the current bundle's data_dir. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | New SyncBundleBuilder |
Example

```python
# Extend with just a new data directory
builder = bundle.extend(data_dir="s3://bucket/new")

# Extend and then filter
builder = bundle.extend()
builder.filter("active = true", [])
```
| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If data_dir is invalid |
query ¶
```python
query(sql: str, params: Optional[List[Any]] = None, hard_limit: Optional[int] = None) -> SyncQueryResult
```
Execute a SQL query and return streaming results.
Unlike extend() with SQL, this does NOT create a new BundleBuilder. It directly executes the query and returns the results.
| PARAMETER | DESCRIPTION |
|---|---|
| `sql` | SQL query string |
| `params` | Optional list of parameters for parameterized queries. If None, defaults to empty list. |
| `hard_limit` | Optional maximum number of rows to return. Applied at the DataFrame level for efficient execution. If None, no limit is applied. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult that can be converted to pandas/polars. |
describe_function ¶
Describe a registered function's metadata.
Returns a table with columns: name, kind, input_types, return_type, runner, logic, platform, temporary.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dotted function name (e.g., "acme.double_val") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with function metadata |
describe_data ¶
Describe data quality and statistics for specified columns.
Returns per-column stats: min, max, avg, null counts, top 10 values, and top 10 invalid values (when expected types are specified).
| PARAMETER | DESCRIPTION |
|---|---|
| `columns` | List of column names (str) or tuples of (column_name, expected_type). Example: `["age", "salary"]` or `[("price", "Float64"), "name"]` |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with column statistics |
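The mixed columns spec (plain names alongside (name, expected_type) tuples) can be normalized to one uniform shape before use. A small sketch of that parsing, where `normalize_columns` is a hypothetical helper, not part of the bundlebase API:

```python
def normalize_columns(columns):
    """Turn ["age", ("price", "Float64")] into [("age", None), ("price", "Float64")]."""
    out = []
    for col in columns:
        if isinstance(col, str):
            # Bare name: no expected type, so no invalid-value stats
            out.append((col, None))
        else:
            name, expected_type = col
            out.append((name, expected_type))
    return out

print(normalize_columns(["age", ("price", "Float64"), "name"]))
```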
test_connector ¶
Test an already-imported connector by name.
Calls discover() then data() to validate the integration without modifying the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Connector name (e.g., "http", "acme.weather") |
| `**kwargs` | Connector arguments (e.g., url="https://example.com/data.csv") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with sections: discover, schema, sample, result |
test_temp_connector ¶
Test a connector inline without importing it first.
Calls discover() then data() to validate the integration without modifying the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `from_` | Runtime and entrypoint string (e.g., "python::my_module:MyConnector") |
| `**kwargs` | Connector arguments |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with sections: discover, schema, sample, result |
import_temp_connector ¶
Load a temporary connector with runtime-only logic (not persisted).
Temporary connectors are session-scoped and support all runners
including python. They are not saved on commit and must be
re-registered each session.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dot-separated connector name (e.g., "acme.weather") |
| `from_` | Runtime and logic string (e.g., "python::mod:Class") |
| `platform` | Docker-style platform string (default: "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundle` | Self for fluent chaining |
drop_temp_connector ¶
Drop temporary (runtime-only) connector for a connector definition.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The defined connector name |
| `platform` | Optional platform filter (e.g., "linux/amd64"). None drops all. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | Message describing what was dropped |
import_temp_function ¶
Load a temporary SQL function (not persisted).
Types and kind are auto-detected from the function's manifest
(bundlebase_metadata()).
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dotted function name (e.g., "acme.double_val") |
| `from_` | Runtime and logic string (e.g., "python::mod:func") |
| `platform` | Platform in os/arch format (default "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundle` | Self for fluent chaining |
drop_temp_function ¶
Drop a temporary function.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The dotted function name (e.g., "acme.double_val") |
| `platform` | Optional platform filter. None drops all. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | Message describing what was dropped |
SyncBundleBuilder¶
Mutable bundle class returned by bundlebase.sync.create() and transformation methods.
SyncBundleBuilder ¶
Bases: SyncBundle
Synchronous wrapper for PyBundleBuilder (mutable).
Provides a synchronous interface to mutable Bundle operations with fluent chaining support (no await needed).
Persistent vs Temporary imports
Functions and connectors can be imported as persistent or temporary (temp):
- Temporary (`import_temp_function`/`import_temp_connector`): Session-scoped, supports all runners including `python`. Not saved on commit; must be re-registered each session. Corresponds to `IMPORT TEMP FUNCTION`/`IMPORT TEMP CONNECTOR` in SQL.
- Persistent (`import_function`/`import_connector`): Bundled into the data package and replayed on open. Requires a serializable runner (not `python`). Corresponds to `IMPORT FUNCTION`/`IMPORT CONNECTOR` in SQL.
Temp definitions override persistent ones at resolution time.
Example

```python
c = dc.create()
c.attach("data.parquet").filter("active = true").drop_column("email")
df = c.to_pandas()
```
attach ¶
Attach a data source to the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `location` | The URL/path of the data to attach |
| `pack` | The pack to attach to. Use "base" for the base pack, or a join name to attach to that join's pack. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
detach_block ¶
Detach a data block from the bundle by its location.
| PARAMETER | DESCRIPTION |
|---|---|
| `location` | The location (URL) of the block to detach |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
replace_block ¶
Replace a block's data location in the bundle.
Changes where a block's data is read from without changing the block's identity. Useful when data files are moved to a new location.
| PARAMETER | DESCRIPTION |
|---|---|
| `old_location` | The current location (URL) of the block |
| `new_location` | The new location (URL) to read data from |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
add_column ¶
Add a computed column to the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name for the new column |
| `expression` | SQL expression to compute the column value |
cast_column ¶
Cast a column to a different data type.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name of the column to cast |
| `new_type` | Target type (e.g., "integer", "float", "string") |
normalize_column_names ¶
Normalize all column names to lowercase+underscore identifiers.
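As a rough illustration of what lowercase+underscore normalization typically means (an assumption about the exact rule, not bundlebase's implementation; `normalize_name` is an invented helper):

```python
import re

def normalize_name(name: str) -> str:
    # Collapse runs of non-alphanumerics to "_", trim edge underscores, lowercase
    return re.sub(r"[^0-9a-zA-Z]+", "_", name).strip("_").lower()

print(normalize_name("First Name"))    # first_name
print(normalize_name("Price ($USD)"))  # price_usd
```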
rename_column ¶
Rename a column.
| PARAMETER | DESCRIPTION |
|---|---|
| `old_name` | Current column name |
| `new_name` | New column name |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
filter ¶
Filter rows based on a SQL SELECT query.
| PARAMETER | DESCRIPTION |
|---|---|
| `query` | SQL SELECT query (e.g., "SELECT * FROM bundle WHERE salary > $1") |
| `params` | Optional list of parameters for parameterized queries ($1, $2, etc.). If None, defaults to empty list. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
Example

```python
c.filter("SELECT * FROM bundle WHERE salary > $1", [50000.0])
c.filter("SELECT * FROM bundle WHERE active = true")
```
delete ¶
Delete rows matching a WHERE condition.
| PARAMETER | DESCRIPTION |
|---|---|
| `where_clause` | SQL WHERE condition (e.g., "salary < 0") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
Example

```python
c.delete("salary < 0")
c.delete("status = 'inactive' AND last_login < '2020-01-01'")
```
update ¶
Update rows matching a WHERE condition with new values.
| PARAMETER | DESCRIPTION |
|---|---|
| `set_where` | SET and WHERE clause (e.g., "SET salary = 100 WHERE id = 1") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
always_delete ¶
Register a persistent always-delete rule and immediately delete matching rows.
| PARAMETER | DESCRIPTION |
|---|---|
| `where_clause` | SQL WHERE condition (e.g., "salary < 0") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
drop_always_delete ¶
Remove always-delete rules.
| PARAMETER | DESCRIPTION |
|---|---|
| `where_clause` | Specific rule to remove, or None to remove all rules |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
always_update ¶
Register a persistent always-update rule and immediately update matching rows.
| PARAMETER | DESCRIPTION |
|---|---|
| `set_where` | SET and WHERE clause (e.g., "SET salary = 0 WHERE salary < 0") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
drop_always_update ¶
Remove always-update rules.
| PARAMETER | DESCRIPTION |
|---|---|
| `rule_text` | Specific rule to remove ("SET ... WHERE ..."), or None to remove all |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
join ¶
Join with another data source.
If location is None, the join point is created without any initial data. Data can be attached later using attach(location, pack=name) or create_source(pack=name).
drop_join ¶
Drop an existing join.
| PARAMETER | DESCRIPTION |
|---|---|
| `join_name` | Name of the join to drop |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
create_source ¶
Create a data source for a pack.
| PARAMETER | DESCRIPTION |
|---|---|
| `connector` | Connector name (e.g., "remote_dir" for built-in, "acme.weather" for custom) |
| `args` | Dictionary of connector-specific arguments |
| `pack` | Which pack to define the source for: "base" (default) for the base pack, or a join name for a joined pack |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
import_connector ¶
Load a named connector with logic (persisted).
Persistent connectors are bundled into the data package and replayed
on open. The python runner cannot be persisted -- use
import_temp_connector for in-process Python connectors.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dot-separated connector name (e.g., "acme.weather") |
| `from_` | Runtime and logic string (e.g., "python::mod:Class") |
| `platform` | Docker-style platform string (default: "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
describe_function ¶
Describe a registered function's metadata.
Returns a table with columns: name, kind, input_types, return_type, runner, logic, platform, temporary.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dotted function name (e.g., "acme.double_val") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with function metadata |
describe_data ¶
Describe data quality and statistics for specified columns.
Returns per-column stats: min, max, avg, null counts, top 10 values, and top 10 invalid values (when expected types are specified).
| PARAMETER | DESCRIPTION |
|---|---|
| `columns` | List of column names (str) or tuples of (column_name, expected_type). Example: `["age", "salary"]` or `[("price", "Float64"), "name"]` |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with column statistics |
test_connector ¶
Test an already-imported connector by name.
Calls discover() then data() to validate the integration without modifying the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Connector name (e.g., "http", "acme.weather") |
| `**kwargs` | Connector arguments (e.g., url="https://example.com/data.csv") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with sections: discover, schema, sample, result |
test_temp_connector ¶
Test a connector inline without importing it first.
Calls discover() then data() to validate the integration without modifying the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `from_` | Runtime and entrypoint string (e.g., "python::my_module:MyConnector") |
| `**kwargs` | Connector arguments |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult with sections: discover, schema, sample, result |
import_temp_connector ¶
Load a temporary connector with runtime-only logic (not persisted).
Temporary connectors are session-scoped and support all runners
including python. They are not saved on commit and must be
re-registered each session.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dot-separated connector name (e.g., "acme.weather") |
| `from_` | Runtime and logic string (e.g., "python::mod:Class") |
| `platform` | Docker-style platform string (default: "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
rename_connector ¶
Rename a connector to a new dotted name.
Renames all entries and updates sources referencing the old connector name.
| PARAMETER | DESCRIPTION |
|---|---|
| `old_name` | Current connector name (e.g., "acme.weather") |
| `new_name` | New connector name (e.g., "acme.weather_v2") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
drop_connector ¶
Drop a connector. Without a platform, removes the entire definition. With a platform, removes only the logic for that platform.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The dotted connector name (e.g., "acme.weather") |
| `platform` | Optional platform filter (e.g., "linux/amd64"). None drops entire connector. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
drop_temp_connector ¶
Drop temporary (runtime-only) connector for a connector definition.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The defined connector name |
| `platform` | Optional platform filter (e.g., "linux/amd64"). None drops all. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | Message describing what was dropped |
drop_temp_function ¶
Drop a temporary function.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The dotted function name (e.g., "acme.double_val") |
| `platform` | Optional platform filter. None drops all. |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | Message describing what was dropped |
fetch ¶
Fetch data from sources for a pack.
Checks the pack's sources for new files and attaches them to the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `pack` | Which pack to fetch sources for: "base" (default) for the base pack, or a join name for a joined pack |
| `mode` | Sync mode (default: "add"): "add" only attaches new files; "update" adds new files and replaces changed files; "sync" adds new, replaces changed, and removes deleted files |

| RETURNS | DESCRIPTION |
|---|---|
| `List[FetchResults]` | List of FetchResults, one for each source in the pack. Each result contains details about blocks added, replaced, and removed. |
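The three modes amount to a three-way file diff against the source. A sketch of the decision logic under assumed semantics (`plan_sync` and the filename-to-hash maps are illustrative, not bundlebase internals):

```python
def plan_sync(local: dict, remote: dict, mode: str = "add"):
    """local/remote map filename -> content hash. Returns (add, replace, remove) lists."""
    add = [f for f in remote if f not in local]
    replace = [f for f in remote if f in local and remote[f] != local[f]]
    remove = [f for f in local if f not in remote]
    if mode == "add":
        return add, [], []          # only attach new files
    if mode == "update":
        return add, replace, []     # also replace changed files
    if mode == "sync":
        return add, replace, remove # also remove deleted files
    raise ValueError(f"unknown mode: {mode}")

print(plan_sync({"a.parquet": "1"}, {"a.parquet": "2", "b.parquet": "3"}, "sync"))
```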
fetch_all ¶
Fetch data from all defined sources.
Checks all defined sources for new files and attaches them to the bundle.
| PARAMETER | DESCRIPTION |
|---|---|
| `mode` | Sync mode (default: "add"): "add" only attaches new files; "update" adds new files and replaces changed files; "sync" adds new, replaces changed, and removes deleted files |

| RETURNS | DESCRIPTION |
|---|---|
| `List[FetchResults]` | List of FetchResults, one for each source across all packs. Includes results for sources with no changes (empty results). |
extend ¶
Extend this bundle to create a new BundleBuilder.
| PARAMETER | DESCRIPTION |
|---|---|
| `data_dir` | Optional new data directory. If None, uses the current bundle's data_dir. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | New SyncBundleBuilder |
query ¶
```python
query(sql: str, params: Optional[List[Any]] = None, hard_limit: Optional[int] = None) -> SyncQueryResult
```
Execute a SQL query and return streaming results.
Unlike extend() with SQL, this does NOT create a new BundleBuilder. It directly executes the query and returns the results.
| PARAMETER | DESCRIPTION |
|---|---|
| `sql` | SQL query string |
| `params` | Optional list of parameters for parameterized queries. If None, defaults to empty list. |
| `hard_limit` | Optional maximum number of rows to return. Applied at the DataFrame level for efficient execution. If None, no limit is applied. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncQueryResult` | SyncQueryResult that can be converted to pandas/polars. |
create_view ¶
Create a view from a SQL query.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name for the new view |
| `sql` | SQL query defining the view contents |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | SyncBundleBuilder for the new view |
import_function ¶
Load a named SQL function (persisted).
Persistent functions are bundled into the data package and replayed on
open. The python runner cannot be persisted -- use
import_temp_function for in-process Python functions.
Types and kind are auto-detected from the function's manifest.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dotted function name (e.g., "acme.double_val") or wildcard (e.g., "acme.*") |
| `from_` | Runtime and logic string (e.g., "python::mod:func") |
| `platform` | Platform in os/arch format (default "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | SyncBundleBuilder for chaining |
import_temp_function ¶
Load a temporary SQL function (not persisted).
Types and kind are auto-detected from the function's manifest
(bundlebase_metadata()).
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Dotted function name (e.g., "acme.double_val") |
| `from_` | Runtime and logic string (e.g., "python::mod:func") |
| `platform` | Platform in os/arch format (default "/") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | SyncBundleBuilder for chaining |
rename_function ¶
Rename a function to a new dotted name.
Renames all entries, deregisters old UDFs, and re-registers under the new name.
| PARAMETER | DESCRIPTION |
|---|---|
| `old_name` | Current function name (e.g., "acme.double_val") |
| `new_name` | New function name (e.g., "acme.double_val_v2") |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | SyncBundleBuilder for chaining |
drop_function ¶
Drop a function. Without a platform or input_types, removes the entire definition. With a platform, removes only the logic for that platform. With input_types, removes only the overload matching that signature.
| PARAMETER | DESCRIPTION |
|---|---|
| `name` | The dotted function name (e.g., "acme.double_val") |
| `platform` | Optional platform filter (e.g., "linux/amd64"). None drops entire function. |
| `input_types` | Optional list of Arrow type names to drop a specific overload. |

| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | SyncBundleBuilder for chaining |
create_index ¶
```python
create_index(columns: Union[str, List[str]], index_type: str, args: Optional[Dict[str, str]] = None, name: Optional[str] = None) -> SyncBundleBuilder
```
Create an index on one or more columns.
| PARAMETER | DESCRIPTION |
|---|---|
| `columns` | Column name (str) or list of column names (list[str]) |
| `index_type` | Index type - "column" or "text" |
| `args` | Optional index-specific arguments (e.g., {"tokenizer": "en_stem"}) |
| `name` | Optional index name (for text indexes). Auto-generated if not provided. |
rebuild_index ¶
Rebuild an existing index on a column.
reindex ¶
Create indexes for columns that don't have them yet.
Iterates through all defined indexes and creates index files for any blocks that don't have indexes yet. This is useful after attaching new data or recovering from partial index creation failures.
| RETURNS | DESCRIPTION |
|---|---|
| `SyncBundleBuilder` | Self for fluent chaining |
Utility Functions¶
stream_batches ¶
Stream RecordBatches from a bundle synchronously.
WARNING: This function materializes ALL batches in memory first, then yields them. This is a limitation of the synchronous API due to Python's threading model. For true streaming with constant memory usage, use the async API:

```python
async for batch in bundlebase.stream_batches(bundle):
    process(batch)
```

For better memory efficiency with the sync API, consider:

1. Using pandas/polars conversion instead of streaming
2. Processing smaller subsets of data (using filter operations)
3. Using the async API instead
| PARAMETER | DESCRIPTION |
|---|---|
| `bundle` | SyncBundle to stream from |

| YIELDS | DESCRIPTION |
|---|---|
| `Any` | pyarrow.RecordBatch objects (all loaded into memory first) |
Example

```python
import bundlebase.sync as bb

c = bb.create().attach("data.parquet")
for batch in bb.stream_batches(c):
    print(f"Processing {batch.num_rows} rows")
```
| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If streaming fails |
Examples¶
Simple Script¶
```python
import bundlebase.sync as dc

# Create and process data
c = dc.create()
c.attach("userdata.parquet")
c.filter("salary > 50000")
c.drop_column("email")

# Export
df = c.to_pandas()
print(f"Found {len(df)} high earners")
```
Method Chaining¶
```python
import bundlebase.sync as dc

df = (dc.create()
      .attach("data.parquet")
      .drop_column("email")
      .filter("active = true")
      .rename_column("fname", "first_name")
      .to_pandas())
```
Jupyter Notebook¶
First, install the jupyter extra:
Then in your notebook:
```python
import bundlebase.sync as dc

c = dc.create().attach("data.parquet")
display(c.to_pandas())  # Nice table in notebook
```
Streaming Large Datasets¶
```python
import bundlebase.sync as dc

c = dc.create().attach("huge_dataset.parquet")

total_rows = 0
for batch in dc.stream_batches(c):
    # Process batch (~100MB)
    total_rows += batch.num_rows
    print(f"Processed {batch.num_rows} rows")

print(f"Total: {total_rows}")
```
Saving and Loading¶
```python
import bundlebase.sync as dc

# Create and save
c = dc.create("/tmp/my_bundle")
c.attach("data.parquet")
c.filter("year >= 2020")
c.commit("Filtered to 2020+")

# Later, load
c = dc.open("/tmp/my_bundle")
df = c.to_pandas()
```
Async vs Sync Comparison¶
Async API¶
```python
import bundlebase
import asyncio

async def process():
    c = await bundlebase.create()
    c = await c.attach("data.parquet")
    df = await c.to_pandas()
    return df

df = asyncio.run(process())
```
Sync API¶
Performance Notes¶
Overhead¶
The sync API adds minimal overhead:
- Scripts: ~0.1ms per operation (persistent event loop)
- Jupyter: ~0.2ms per operation (nested asyncio)
This is negligible compared to data I/O time.
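The per-call overhead is easy to measure on your own machine with a no-op coroutine; this times raw event-loop round-trips, not bundlebase itself, and the numbers will vary by environment:

```python
import asyncio
import time

async def noop():
    return None

loop = asyncio.new_event_loop()
n = 1000
start = time.perf_counter()
for _ in range(n):
    # Each call pays the loop scheduling cost the sync API hides
    loop.run_until_complete(noop())
elapsed = time.perf_counter() - start
loop.close()

print(f"~{elapsed / n * 1000:.3f} ms per loop round-trip")
```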
Optimization¶
Chaining operations reduces overhead:
```python
# Good: One event loop call
df = (dc.create()
      .attach("data.parquet")
      .filter("x > 10")
      .to_pandas())

# Less optimal: Multiple event loop calls
c = dc.create()
c.attach("data.parquet")
c.filter("x > 10")
df = c.to_pandas()
```
Error Handling¶
Handle errors like regular Python code:
```python
import bundlebase.sync as dc

try:
    c = dc.create()
    c.attach("nonexistent.parquet")
except ValueError as e:
    print(f"Failed to load data: {e}")
```
Migration from Async¶
Migration is straightforward:
Before (Async)¶
```python
import bundlebase

async def process():
    c = await bundlebase.create()
    c = await c.attach("data.parquet")
    c = await c.filter("x > 10")
    return await c.to_pandas()
```
After (Sync)¶
```python
import bundlebase.sync as dc

c = dc.create()
c.attach("data.parquet")
c.filter("x > 10")
df = c.to_pandas()
```
Just remove await and import bundlebase.sync!
Troubleshooting¶
ImportError: nest_asyncio required¶
Install Jupyter support:
"No event loop running" in Jupyter¶
Make sure you've imported from bundlebase.sync:
See Also¶
- Async API - Async/await interface
- Quick Start Guide - Side-by-side examples
- Examples - Practical code examples