
Conversion Functions

Convert Bundlebase bundles to various data formats.

Overview

Bundlebase provides utilities to convert bundles to pandas, Polars, NumPy, and Python dictionaries. All conversion functions use streaming internally to handle datasets larger than RAM.

Functions

to_pandas

to_pandas async

to_pandas(bundle: PyBundle | PyBundleBuilder) -> pd.DataFrame

Convert the bundle's data to a pandas DataFrame using streaming.

This function uses streaming internally to handle large datasets efficiently. Data is processed in batches to avoid loading everything into memory at once.

PARAMETER DESCRIPTION
bundle

The Bundle to convert

TYPE: PyBundle | PyBundleBuilder

RETURNS DESCRIPTION
DataFrame

pd.DataFrame: The data as a pandas DataFrame with proper column types

RAISES DESCRIPTION
ImportError

If pandas is not installed

ValueError

If query execution fails

RuntimeError

If conversion fails

Example

c = await bundlebase.create()
c = await c.attach("data.parquet")
df = await c.to_pandas()  # Using instance method - streams internally
df.head()

to_polars

to_polars async

to_polars(bundle: PyBundle | PyBundleBuilder) -> pl.DataFrame

Convert the bundle's data to a Polars DataFrame using streaming.

This function uses streaming internally to handle large datasets efficiently. Polars constructs the DataFrame directly from the streamed batches.

PARAMETER DESCRIPTION
bundle

The Bundle to convert

TYPE: PyBundle | PyBundleBuilder

RETURNS DESCRIPTION
DataFrame

pl.DataFrame: The data as a Polars DataFrame

RAISES DESCRIPTION
ImportError

If polars is not installed

ValueError

If query execution fails

RuntimeError

If conversion fails

Example

c = await bundlebase.create()
c = await c.attach("data.parquet")
df = await c.to_polars()  # Using instance method - streams internally
df.head()

to_numpy

to_numpy async

to_numpy(bundle: PyBundle) -> Dict[str, np.ndarray]

Convert the bundle's data to a dictionary of numpy arrays.

Each column becomes a key in the returned dictionary with a numpy array as its value. Null values are preserved (as NaN for float columns, None for object columns).

PARAMETER DESCRIPTION
bundle

The Bundle to convert

TYPE: PyBundle

RETURNS DESCRIPTION
dict

Dictionary mapping column names to numpy arrays

TYPE: Dict[str, ndarray]

RAISES DESCRIPTION
ImportError

If numpy is not installed (required by PyArrow)

ValueError

If query execution fails

RuntimeError

If conversion fails

Example

c = await bundlebase.create()
c = await c.attach("data.parquet")
arrays = await c.to_numpy()  # Using instance method
arrays['column_name']

to_dict

to_dict async

to_dict(bundle: PyBundle) -> Dict[str, list]

Convert the bundle's data to a dictionary of lists.

Each column becomes a key in the returned dictionary with a list of values. This is useful for JSON serialization or working with generic Python structures.

PARAMETER DESCRIPTION
bundle

The Bundle to convert

TYPE: PyBundle

RETURNS DESCRIPTION
dict

Dictionary mapping column names to lists of values

TYPE: Dict[str, list]

RAISES DESCRIPTION
ValueError

If query execution fails

RuntimeError

If conversion fails

Example

c = await bundlebase.create()
c = await c.attach("data.parquet")
data = await c.to_dict()  # Using instance method
import json
json.dumps(data)

stream_batches

stream_batches async

stream_batches(bundle: PyBundle | PyBundleBuilder) -> AsyncIterator[pa.RecordBatch]

Iterate over RecordBatches without materializing the full dataset.

This function provides streaming access to data, processing one batch at a time. This is memory-efficient for large datasets that don't fit in RAM.

PARAMETER DESCRIPTION
bundle

The Bundle to stream from

TYPE: PyBundle | PyBundleBuilder

YIELDS DESCRIPTION
AsyncIterator[RecordBatch]

pa.RecordBatch: PyArrow RecordBatch objects, one at a time

RAISES DESCRIPTION
ValueError

If streaming fails or bundle is invalid

TypeError

If bundle doesn't support streaming

Example

c = await bundlebase.open("large_file.parquet")
total_rows = 0
async for batch in stream_batches(c):
    # Process each batch independently
    df = batch.to_pandas()
    total_rows += len(df)
    # Memory is freed after each iteration
print(f"Processed {total_rows} rows")

Examples

Converting to pandas

import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("age >= 18"))

# Convert to pandas DataFrame
df = await c.to_pandas()
print(df.head())

Converting to Polars

import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet"))

# Convert to Polars DataFrame
df = await c.to_polars()
print(df.describe())

Converting to NumPy

import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet")
    .query("SELECT x, y, z FROM bundle"))

# Convert to NumPy arrays (dict of column name -> array)
arrays = await c.to_numpy()
print(arrays["x"].mean())

Converting to Dictionary

import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet"))

# Convert to Python dict (column name -> list of values)
data = await c.to_dict()
print(len(data["id"]))

Streaming Large Datasets

For datasets larger than RAM, use stream_batches():

import bundlebase

c = await bundlebase.open("huge_dataset.parquet")

# Process in batches
total_rows = 0
async for batch in bundlebase.stream_batches(c):
    # Each batch is a PyArrow RecordBatch (~100MB)
    total_rows += batch.num_rows

    # Convert batch to pandas if needed
    batch_df = batch.to_pandas()
    process(batch_df)

print(f"Processed {total_rows} rows")

Memory Efficiency

All conversion functions use streaming internally:

# These all stream internally (constant memory):
df = await c.to_pandas()   # ✓ Good
df = await c.to_polars()   # ✓ Good
arrays = await c.to_numpy()  # ✓ Good

# For large datasets, use custom streaming:
async for batch in bundlebase.stream_batches(c):
    process(batch)  # ✓ Best for custom processing

Avoid for Large Datasets

Do NOT use as_pyarrow() for large datasets - it materializes the entire dataset in memory. Use stream_batches() instead.

Batch Size

stream_batches() uses a default batch size of ~100MB. You can customize this by implementing a custom progress tracker:

from bundlebase.progress import StreamProgress

progress = StreamProgress(batch_size=50_000_000)  # 50MB batches

async for batch in bundlebase.stream_batches(c, progress=progress):
    process(batch)

Integration Examples

With pandas

import bundlebase
import pandas as pd

c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("active = true"))

# Convert and continue with pandas
df = await c.to_pandas()
df = df.sort_values("date")
df.to_csv("output.csv")

With Polars

import bundlebase
import polars as pl

c = await (bundlebase.create()
    .attach("data.parquet"))

# Convert to Polars for further processing
df = await c.to_polars()
result = df.group_by("category").agg(pl.col("value").sum())

With NumPy

import bundlebase
import numpy as np

c = await (bundlebase.create()
    .attach("data.parquet")
    .query("SELECT x, y, z FROM bundle"))

# Convert to NumPy for numerical operations
arrays = await c.to_numpy()
x = arrays["x"]
y = arrays["y"]
correlation = np.corrcoef(x, y)[0, 1]

Custom Streaming Processing

import bundlebase

c = await bundlebase.open("large_dataset.parquet")

# Custom incremental processing
results = []
async for batch in bundlebase.stream_batches(c):
    # Process each batch independently
    batch_result = analyze_batch(batch)
    results.append(batch_result)

# Combine results
final_result = combine_results(results)

See Also