Operation Chains¶
Fluent method chaining for Bundlebase operations.
Overview¶
Bundlebase uses operation chains to enable fluent method chaining while maintaining clean async/await syntax. This allows you to queue multiple operations before execution.
Chain Classes¶
OperationChain¶
Chains multiple async operations on a Bundle with a single await.
This class queues up method calls and executes them all when awaited, allowing for fluent chaining with a single await at the end.
Example
c = await (c.attach("data.parquet")
           .drop_column("unwanted")
           .rename_column("old_name", "new_name"))
| PARAMETER | DESCRIPTION |
|---|---|
| bundle | PyBundleBuilder or PyBundle instance |
__getattr__ ¶
Intercept method calls to queue operations or handle final conversions.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Method name |

| RETURNS | DESCRIPTION |
|---|---|
| Any | A callable that either queues the operation or executes the chain |

| RAISES | DESCRIPTION |
|---|---|
| AttributeError | If the method name is not a valid operation |
__await__ ¶
Make this chain awaitable.
Executes all queued operations in order and returns the final bundle.
__del__ ¶
Detect if operations were never executed.
Logs a warning if the chain had queued operations that were never executed (i.e., the chain was never awaited).
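The never-awaited check can be pictured with a small, self-contained sketch. It is not Bundlebase's code: the class `WarnIfDropped`, the logger name `chain_sketch`, the `_executed` flag, and the `ListHandler` helper are all hypothetical, and the immediate timing relies on CPython running `__del__` as soon as the last reference goes away.

```python
import gc
import logging

logger = logging.getLogger("chain_sketch")  # hypothetical logger name


class ListHandler(logging.Handler):
    """Collects log messages in a list so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


class WarnIfDropped:
    """Hypothetical chain that only tracks whether its queue was ever run."""

    def __init__(self):
        self._queue = []
        self._executed = False

    def queue(self, name):
        self._queue.append(name)
        return self

    def mark_executed(self):
        # Stand-in for what an __await__ implementation would do after running.
        self._executed = True

    def __del__(self):
        if self._queue and not self._executed:
            logger.warning(
                "chain with %d queued operation(s) was never awaited",
                len(self._queue),
            )


handler = ListHandler()
logger.addHandler(handler)

forgotten = WarnIfDropped().queue("attach").queue("drop_column")
del forgotten  # dropped without executing: __del__ logs a warning

finished = WarnIfDropped().queue("attach")
finished.mark_executed()
del finished  # executed first: no warning

gc.collect()  # only matters on interpreters without reference counting
messages = handler.messages
```

Because `__del__` timing is interpreter-dependent, a warning like this is best treated as a debugging aid and not as a guarantee that every forgotten chain is reported.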
CreateChain¶
Awaitable chain for fluent creation and chaining in one go.
Handles the special case of creating a bundle first, then chaining operations. Unlike OperationChain, this starts without a bundle and creates one first.
Example
c = await (create(path)
           .attach("data.parquet")
           .drop_column("unwanted"))
| PARAMETER | DESCRIPTION |
|---|---|
| create_func | The async create or open function from Rust bindings |
| *create_args | Arguments to pass to the create function |
__getattr__ ¶
Intercept method calls to queue operations or handle final conversions.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Method name |

| RETURNS | DESCRIPTION |
|---|---|
| Any | A callable that either queues the operation or executes the chain |
__await__ ¶
Make this chain awaitable.
Executes all queued operations and returns the final bundle.
__del__ ¶
Detect if create/open was never executed.
Logs a warning if the creation chain was never awaited.
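As a rough illustration of the create-first behaviour described for CreateChain, the sketch below awaits a create function and only then replays the queued calls. Everything in it is a hypothetical stand-in (`FakeBundle`, `fake_create`, `CreateFirstChain`, the `/tmp/demo` path); the real class wraps the Rust bindings and may differ in detail.

```python
import asyncio


class FakeBundle:
    """Hypothetical bundle; records the operations applied to it."""

    def __init__(self, path):
        self.path = path
        self.ops = []

    async def attach(self, source):
        self.ops.append(f"attach:{source}")
        return self


async def fake_create(path):
    """Hypothetical stand-in for the async create function."""
    return FakeBundle(path)


class CreateFirstChain:
    """Starts without a bundle: creation is the first step of the await."""

    def __init__(self, create_func, *create_args):
        self._create_func = create_func
        self._create_args = create_args
        self._queue = []

    def __getattr__(self, name):
        # Private names are never operations; this also avoids recursion
        # if an internal attribute is looked up before __init__ has set it.
        if name.startswith("_"):
            raise AttributeError(name)

        def queue_call(*args, **kwargs):
            self._queue.append((name, args, kwargs))
            return self

        return queue_call

    def __await__(self):
        async def run():
            # Create the bundle first, then apply the queue in order.
            bundle = await self._create_func(*self._create_args)
            for name, args, kwargs in self._queue:
                bundle = await getattr(bundle, name)(*args, **kwargs)
            return bundle

        return run().__await__()


async def main():
    return await CreateFirstChain(fake_create, "/tmp/demo").attach("data.parquet")


bundle = asyncio.run(main())
```

Note that in this sketch an invalid method name only fails at await time, when `getattr(bundle, name)` runs; the documented OperationChain raises AttributeError earlier, when the attribute is accessed.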
ExtendChain¶
Awaitable chain for extending an existing bundle to a new directory.
Handles the special case of extending an existing bundle, then chaining operations. Because extend() is synchronous and returns immediately, chaining can begin without awaiting first.
Example
c = await (existing_c.extend("/new/path")
           .attach("new_data.parquet")
           .drop_column("unwanted"))
| PARAMETER | DESCRIPTION |
|---|---|
| original_extend_method | The original (unwrapped) extend method from Rust |
| existing_bundle | The PyBundle to extend |
| data_dir | The directory path for the new extended bundle |
__getattr__ ¶
Intercept method calls to queue operations or handle final conversions.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Method name |

| RETURNS | DESCRIPTION |
|---|---|
| Any | A callable that either queues the operation or executes the chain |
__await__ ¶
Make this chain awaitable.
Executes all queued operations and returns the final bundle.
__del__ ¶
Detect if extend chain was never executed.
Logs a warning if the extend chain was never awaited.
How It Works¶
When you call a mutation method on a bundle, it returns an OperationChain that queues the operation:
import bundlebase
# Each call returns a chain; awaiting it executes that chain and yields the bundle
c = await bundlebase.create()           # CreateChain
c = await c.attach("data.parquet")      # OperationChain
c = await c.filter("age >= 18")         # OperationChain
c = await c.drop_column("ssn")          # OperationChain
# Every chain above has already executed; this await fetches the result as a DataFrame
df = await c.to_pandas()
Examples¶
Basic Chaining¶
import bundlebase
# Queue three operations on a new bundle and run them with a single await
c = await (bundlebase.create()
           .attach("data.parquet")
           .filter("active = true")
           .drop_column("temp"))
# Convert the resulting bundle to a pandas DataFrame
df = await c.to_pandas()
Create Chain¶
The create() function returns a CreateChain that can queue operations before the bundle is created:
import bundlebase
# All operations are queued
chain = (bundlebase.create()
         .attach("data.parquet")
         .filter("age >= 18")
         .rename_column("fname", "first_name"))
# Single await executes creation + all operations
c = await chain
df = await c.to_pandas()
Extend Chain¶
When extending a bundle, you get an ExtendChain:
import bundlebase
# Open existing bundle
original = await bundlebase.open("/path/to/bundle")
# Extend with chained operations
extended = await (original.extend("/path/to/new/bundle")
                  .attach("new_data.parquet")
                  .filter("year >= 2020"))
# Commit the extended bundle
await extended.commit("Added 2020+ data")
Mixed Chaining and Direct Calls¶
You can mix chaining with direct method calls:
import bundlebase
# Start with a chain
c = await (bundlebase.create()
           .attach("data.parquet")
           .filter("age >= 18"))
# Continue with direct calls
c = await c.drop_column("ssn")
c = await c.rename_column("fname", "first_name")
# Or resume chaining
c = await (c
           .query("SELECT id, first_name, last_name FROM bundle")
           .filter("active = true"))
df = await c.to_pandas()
Benefits of Operation Chains¶
1. Clean Syntax¶
Chaining enables clean, readable code:
# With chaining
c = await (bundlebase.create()
           .attach("data.parquet")
           .filter("age >= 18")
           .drop_column("ssn"))
# Without chaining (more verbose)
c = await bundlebase.create()
c = await c.attach("data.parquet")
c = await c.filter("age >= 18")
c = await c.drop_column("ssn")
2. Performance¶
Operations are queued and executed together, so the whole sequence costs one await instead of one per operation:
# Good: Single async call
c = await (bundlebase.create()
           .attach("data.parquet")
           .filter("age >= 18")
           .drop_column("ssn"))  # All operations execute together
# Less optimal: Multiple async calls
c = await bundlebase.create()
c = await c.attach("data.parquet") # Separate async call
c = await c.filter("age >= 18") # Separate async call
c = await c.drop_column("ssn") # Separate async call
3. Type Safety¶
Chains maintain type information for IDE autocomplete:
import bundlebase
# IDE knows c is PyBundleBuilder
c = await (bundlebase.create()
           .attach("data.parquet")  # Autocomplete available
           .filter("age >= 18")     # Autocomplete available
           .drop_column("ssn"))     # Autocomplete available
Advanced Usage¶
Conditional Chaining¶
Build chains conditionally:
import bundlebase
# Start with base chain
chain = bundlebase.create().attach("data.parquet")
# Conditionally add operations
if filter_active:
    chain = chain.filter("active = true")
if remove_pii:
    chain = chain.drop_column("email")
    chain = chain.drop_column("phone")
# Execute final chain
c = await chain
Reusable Chain Builders¶
Create functions that return chains:
import bundlebase
from bundlebase.chain import OperationChain
def clean_customer_data(chain: OperationChain) -> OperationChain:
    """Standard customer data cleaning."""
    return (chain
            .filter("active = true")
            .drop_column("email")
            .drop_column("phone")
            .rename_column("fname", "first_name")
            .rename_column("lname", "last_name"))

# Use the builder
c = await clean_customer_data(
    bundlebase.create().attach("customers.parquet")
)
df = await c.to_pandas()
Programmatic Chain Building¶
Build chains programmatically:
import bundlebase
# Start with create
chain = bundlebase.create().attach("data.parquet")
# Add operations from a list
operations = [
    ("filter", ["age >= 18"], {}),
    ("drop_column", ["ssn"], {}),
    ("rename_column", ["fname", "first_name"], {}),
]
for method, args, kwargs in operations:
    chain = getattr(chain, method)(*args, **kwargs)
# Execute
c = await chain
Implementation Details¶
Operation chains work by:
- Queueing Operations: Each method call adds to an operation queue
- Deferring Execution: No operations execute until the chain is awaited
- Batch Execution: All queued operations execute together when awaited
This provides both clean syntax and good performance.
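The three steps above can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the library's implementation: `FakeBundle` and `SketchChain` are hypothetical names, and the only ideas taken from this page are that `__getattr__` queues calls and `__await__` replays them in order.

```python
import asyncio


class FakeBundle:
    """Hypothetical stand-in for a bundle; each async method returns self."""

    def __init__(self):
        self.log = []

    async def attach(self, path):
        self.log.append(("attach", path))
        return self

    async def drop_column(self, name):
        self.log.append(("drop_column", name))
        return self


class SketchChain:
    """Queues method calls and runs them in order on a single await."""

    def __init__(self, bundle):
        self._bundle = bundle
        self._queue = []  # (method name, args, kwargs) tuples

    def __getattr__(self, name):
        # Only called for attributes not found normally, so _bundle and
        # _queue never reach this method once __init__ has run.
        if not hasattr(self._bundle, name):
            raise AttributeError(f"{name!r} is not a valid operation")

        def queue_call(*args, **kwargs):
            self._queue.append((name, args, kwargs))
            return self  # returning self is what makes chaining possible

        return queue_call

    def __await__(self):
        async def run():
            bundle = self._bundle
            for name, args, kwargs in self._queue:
                bundle = await getattr(bundle, name)(*args, **kwargs)
            self._queue.clear()
            return bundle

        return run().__await__()


async def main():
    chain = SketchChain(FakeBundle()).attach("data.parquet").drop_column("unwanted")
    assert chain._bundle.log == []  # deferred: nothing has executed yet
    bundle = await chain            # batch: both operations run here
    return bundle.log


result = asyncio.run(main())
```

Checking the name inside `__getattr__` means a typo such as `.drop_colum(...)` fails when the chain is built, before anything runs, which matches the AttributeError documented for OperationChain.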
Sync API Chains¶
The sync API also supports chaining:
import bundlebase.sync as dc
# Chains work in sync API too
c = (dc.create()
     .attach("data.parquet")
     .filter("age >= 18")
     .drop_column("ssn"))
df = c.to_pandas() # No await needed
See Also¶
- Async API - Main API documentation
- Quick Start - Chaining examples
- Basic Concepts - Architecture overview