Operation Chains

Fluent method chaining for Bundlebase operations.

Overview

Bundlebase uses operation chains to combine fluent method chaining with async/await syntax. Each chained call queues an operation, and a single await at the end executes the queued operations in order.

Chain Classes

OperationChain

OperationChain(bundle: Any)

Chains multiple async operations on a Bundle with a single await.

This class queues up method calls and executes them all when awaited, allowing for fluent chaining with a single await at the end.

Example

c = await (c.attach("data.parquet")
    .drop_column("unwanted")
    .rename_column("old_name", "new_name"))

PARAMETER DESCRIPTION
bundle

PyBundleBuilder or PyBundle instance

TYPE: Any

__getattr__

__getattr__(name: str) -> Any

Intercept method calls to queue operations or handle final conversions.

PARAMETER DESCRIPTION
name

Method name

TYPE: str

RETURNS DESCRIPTION
Any

A callable that either queues the operation or executes the chain

RAISES DESCRIPTION
AttributeError

If the method name is not a valid operation
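
The interception described above can be illustrated with a minimal sketch. It is not Bundlebase's implementation: the class name `QueueingChain`, the `VALID_OPERATIONS` set, and the `queued` attribute are hypothetical names chosen for this example. It shows how `__getattr__` can return a callable that queues a call, and raise AttributeError for an unknown name.

```python
class QueueingChain:
    """Hypothetical sketch; not Bundlebase's actual OperationChain."""

    # Assumed set of valid operation names, for illustration only.
    VALID_OPERATIONS = frozenset({"attach", "drop_column", "rename_column"})

    def __init__(self):
        self.queued = []

    def __getattr__(self, name):
        # __getattr__ runs only when normal attribute lookup fails,
        # so real attributes such as `queued` never reach this point.
        if name not in self.VALID_OPERATIONS:
            raise AttributeError(f"{name!r} is not a valid operation")

        def queue_operation(*args, **kwargs):
            # Record the call instead of running it.
            self.queued.append((name, args, kwargs))
            return self  # returning the chain lets the next call be chained

        return queue_operation


chain = QueueingChain().attach("data.parquet").drop_column("unwanted")
print(chain.queued)

try:
    chain.explode()  # not in VALID_OPERATIONS
except AttributeError as exc:
    print(exc)
```

Returning `self` from the queued callable is what keeps the chain fluent; nothing is executed at this stage.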

__await__

__await__()

Make this chain awaitable.

Executes all queued operations in order and returns the final bundle.

__del__

__del__()

Detect if operations were never executed.

Logs a warning if the chain had queued operations that were never executed (i.e., the chain was never awaited).
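
A never-awaited check of this kind could look roughly like the sketch below. The class `WarnOnDropChain`, its `add` and `mark_executed` methods, and the logger name are assumptions made for illustration, not Bundlebase names. Note that the timing of `__del__` depends on the interpreter; CPython usually runs it as soon as the last reference disappears.

```python
import logging

logger = logging.getLogger("chain_sketch")  # hypothetical logger name


class WarnOnDropChain:
    """Hypothetical sketch of a chain that warns if it is discarded unexecuted."""

    def __init__(self):
        self._queue = []
        self._executed = False

    def add(self, name):
        # Stand-in for queueing an operation.
        self._queue.append(name)
        return self

    def mark_executed(self):
        # A real chain would set this flag once its awaited execution finishes.
        self._executed = True

    def __del__(self):
        # Warn only when work was queued but never run.
        if self._queue and not self._executed:
            logger.warning(
                "chain with %d queued operation(s) was never awaited",
                len(self._queue),
            )


forgotten = WarnOnDropChain().add("attach").add("drop_column")
del forgotten  # in CPython this typically triggers the warning immediately
```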

CreateChain

CreateChain(create_func: Callable, *create_args: Any)

Awaitable chain for fluent creation and chaining in one go.

Handles the special case of creating a bundle first, then chaining operations. Unlike OperationChain, this starts without a bundle and creates one first.

Example

c = await (create(path)
    .attach("data.parquet")
    .drop_column("unwanted"))

PARAMETER DESCRIPTION
create_func

The async create or open function from Rust bindings

TYPE: Callable

*create_args

Arguments to pass to the create function

TYPE: Any DEFAULT: ()
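
To make the create-first behavior concrete, here is a minimal sketch under stated assumptions. `StubBundle`, `stub_create`, and `SketchCreateChain` are hypothetical stand-ins; only the constructor shape (`create_func`, `*create_args`) follows the signature documented above. The sketch awaits the create function first and then applies the queued operations to the bundle it returns.

```python
import asyncio


class StubBundle:
    """Hypothetical bundle stand-in that records the operations applied to it."""

    def __init__(self, path):
        self.path = path
        self.steps = []

    async def attach(self, source):
        self.steps.append(("attach", source))
        return self

    async def drop_column(self, name):
        self.steps.append(("drop_column", name))
        return self


async def stub_create(path):
    # Stand-in for an async create/open function.
    return StubBundle(path)


class SketchCreateChain:
    def __init__(self, create_func, *create_args):
        self._create_func = create_func
        self._create_args = create_args
        self._queue = []

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)  # never treat private names as operations

        def queue_call(*args, **kwargs):
            self._queue.append((name, args, kwargs))
            return self

        return queue_call

    def __await__(self):
        return self._execute().__await__()

    async def _execute(self):
        # Create the bundle first, then replay the queued operations on it.
        bundle = await self._create_func(*self._create_args)
        for name, args, kwargs in self._queue:
            bundle = await getattr(bundle, name)(*args, **kwargs)
        return bundle


async def main():
    chain = SketchCreateChain(stub_create, "/tmp/example")
    return await chain.attach("data.parquet").drop_column("unwanted")


bundle = asyncio.run(main())
print(bundle.path, bundle.steps)
```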

__getattr__

__getattr__(name: str) -> Any

Intercept method calls to queue operations or handle final conversions.

PARAMETER DESCRIPTION
name

Method name

TYPE: str

RETURNS DESCRIPTION
Any

A callable that either queues the operation or executes the chain

__await__

__await__()

Make this chain awaitable.

Executes all queued operations and returns the final bundle.

__del__

__del__()

Detect if create/open was never executed.

Logs a warning if the creation chain was never awaited.

ExtendChain

ExtendChain(original_extend_method: Callable, existing_bundle: Any, data_dir: str)

Awaitable chain for extending an existing bundle to a new directory.

Handles the special case of extending an existing bundle, then chaining operations. Unlike OperationChain, extend() is synchronous and returns immediately, allowing chaining to begin without awaiting first.

Example

c = await (existing_c.extend("/new/path")
    .attach("new_data.parquet")
    .drop_column("unwanted"))

PARAMETER DESCRIPTION
original_extend_method

The original (unwrapped) extend method from Rust

TYPE: Callable

existing_bundle

The PyBundle to extend

TYPE: Any

data_dir

The directory path for the new extended bundle

TYPE: str

__getattr__

__getattr__(name: str) -> Any

Intercept method calls to queue operations or handle final conversions.

PARAMETER DESCRIPTION
name

Method name

TYPE: str

RETURNS DESCRIPTION
Any

A callable that either queues the operation or executes the chain

__await__

__await__()

Make this chain awaitable.

Executes all queued operations and returns the final bundle.

__del__

__del__()

Detect if extend chain was never executed.

Logs a warning if the extend chain was never awaited.

How It Works

When you call a mutation method on a bundle, it returns an OperationChain that queues the operation:

import bundlebase

# Each method call returns a chain; awaiting it executes that chain right away
c = await bundlebase.create()  # CreateChain
c = await c.attach("data.parquet")  # OperationChain
c = await c.filter("age >= 18")  # OperationChain
c = await c.drop_column("ssn")  # OperationChain

# Every chain above was already executed by its own await; this converts the result
df = await c.to_pandas()

Examples

Basic Chaining

import bundlebase

# Chain operations before execution
c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("active = true")
    .drop_column("temp"))

# The chain already ran when it was awaited; convert the result when needed
df = await c.to_pandas()

Create Chain

The create() function returns a CreateChain that can queue operations before the bundle is created:

import bundlebase

# All operations are queued
chain = (bundlebase.create()
    .attach("data.parquet")
    .filter("age >= 18")
    .rename_column("fname", "first_name"))

# Single await executes creation + all operations
c = await chain
df = await c.to_pandas()

Extend Chain

When extending a bundle, you get an ExtendChain:

import bundlebase

# Open existing bundle
original = await bundlebase.open("/path/to/bundle")

# Extend with chained operations
extended = await (original.extend("/path/to/new/bundle")
    .attach("new_data.parquet")
    .filter("year >= 2020"))

# Commit the extended bundle
await extended.commit("Added 2020+ data")

Mixed Chaining and Direct Calls

You can mix chaining with direct method calls:

import bundlebase

# Start with a chain
c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("age >= 18"))

# Continue with direct calls
c = await c.drop_column("ssn")
c = await c.rename_column("fname", "first_name")

# Or resume chaining
c = await (c
    .query("SELECT id, first_name, last_name FROM bundle")
    .filter("active = true"))

df = await c.to_pandas()

Benefits of Operation Chains

1. Clean Syntax

Chaining enables clean, readable code:

# With chaining
c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("age >= 18")
    .drop_column("ssn"))

# Without chaining (more verbose)
c = await bundlebase.create()
c = await c.attach("data.parquet")
c = await c.filter("age >= 18")
c = await c.drop_column("ssn")

2. Performance

A chain queues its operations and runs them under a single await, so the caller awaits once per chain instead of once per operation:

# Good: Single async call
c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("age >= 18")
    .drop_column("ssn"))  # All operations execute together

# Less optimal: Multiple async calls
c = await bundlebase.create()
c = await c.attach("data.parquet")  # Separate async call
c = await c.filter("age >= 18")  # Separate async call
c = await c.drop_column("ssn")  # Separate async call

3. Type Safety

Chains maintain type information for IDE autocomplete:

import bundlebase

# IDE knows c is PyBundleBuilder
c = await (bundlebase.create()
    .attach("data.parquet")  # Autocomplete available
    .filter("age >= 18")  # Autocomplete available
    .drop_column("ssn"))  # Autocomplete available

Advanced Usage

Conditional Chaining

Build chains conditionally:

import bundlebase

# Start with base chain
chain = bundlebase.create().attach("data.parquet")

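# Example flags; in practice these would come from configuration or arguments
filter_active = True
remove_pii = True

# Conditionally add operations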
if filter_active:
    chain = chain.filter("active = true")

if remove_pii:
    chain = chain.drop_column("email")
    chain = chain.drop_column("phone")

# Execute final chain
c = await chain

Reusable Chain Builders

Create functions that return chains:

import bundlebase
from bundlebase.chain import OperationChain

def clean_customer_data(chain: OperationChain) -> OperationChain:
    """Standard customer data cleaning."""
    return (chain
        .filter("active = true")
        .drop_column("email")
        .drop_column("phone")
        .rename_column("fname", "first_name")
        .rename_column("lname", "last_name"))

# Use the builder
c = await clean_customer_data(
    bundlebase.create().attach("customers.parquet")
)

df = await c.to_pandas()

Programmatic Chain Building

Build chains programmatically:

import bundlebase

# Start with create
chain = bundlebase.create().attach("data.parquet")

# Add operations from a list
operations = [
    ("filter", ["age >= 18"], {}),
    ("drop_column", ["ssn"], {}),
    ("rename_column", ["fname", "first_name"], {}),
]

for method, args, kwargs in operations:
    chain = getattr(chain, method)(*args, **kwargs)

# Execute
c = await chain

Implementation Details

Operation chains work by:

  1. Queueing Operations: Each method call adds to an operation queue
  2. Deferring Execution: No operations execute until the chain is awaited
  3. Batch Execution: All queued operations execute together when awaited

This provides both clean syntax and good performance.
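
The three steps above can be sketched in plain Python. This is a simplified illustration, not Bundlebase's code: `RecordingBundle` and `SketchChain` are hypothetical names, and the real chain classes may validate names, handle conversions, and report errors differently. The demo checks that nothing runs before the chain is awaited and that operations then run in the order they were queued.

```python
import asyncio


class RecordingBundle:
    """Hypothetical bundle stand-in that records each operation it runs."""

    def __init__(self):
        self.executed = []

    async def filter(self, expr):
        self.executed.append(f"filter({expr})")
        return self

    async def drop_column(self, name):
        self.executed.append(f"drop_column({name})")
        return self


class SketchChain:
    def __init__(self, bundle):
        self._bundle = bundle
        self._queue = []  # step 1: method calls accumulate here

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)

        def queue_call(*args, **kwargs):
            self._queue.append((name, args, kwargs))
            return self

        return queue_call

    def __await__(self):
        # step 3: awaiting the chain drains the queue in insertion order
        return self._execute().__await__()

    async def _execute(self):
        bundle = self._bundle
        for name, args, kwargs in self._queue:
            bundle = await getattr(bundle, name)(*args, **kwargs)
        self._queue = []
        return bundle


async def demo():
    bundle = RecordingBundle()
    chain = SketchChain(bundle).filter("age >= 18").drop_column("ssn")
    before = list(bundle.executed)  # step 2: nothing has executed yet
    result = await chain
    return before, list(result.executed)


before, after = asyncio.run(demo())
print(before)
print(after)
```

In this sketch the queued operations still run one after another inside the single await; the chain changes where the awaiting happens, not the order of the work.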

Sync API Chains

The sync API also supports chaining:

import bundlebase.sync as dc

# Chains work in sync API too
c = (dc.create()
    .attach("data.parquet")
    .filter("age >= 18")
    .drop_column("ssn"))

df = c.to_pandas()  # No await needed

See Also