Conversion Functions¶
Convert Bundlebase bundles to various data formats.
Overview¶
Bundlebase provides utilities to convert bundles to pandas, Polars, NumPy, and Python dictionaries. All conversion functions use streaming internally to handle datasets larger than RAM.
Functions¶
to_pandas¶
`async to_pandas(bundle)`
Convert the bundle's data to a pandas DataFrame using streaming.
This function uses streaming internally to handle large datasets efficiently. Data is processed in batches to avoid loading everything into memory at once.
| PARAMETER | DESCRIPTION |
|---|---|
| `bundle` | The Bundle to convert. TYPE: `Bundle` |

| RETURNS | DESCRIPTION |
|---|---|
| `DataFrame` | `pd.DataFrame`: The data as a pandas DataFrame with proper column types |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If pandas is not installed |
| `ValueError` | If query execution fails |
| `RuntimeError` | If conversion fails |

Example

```python
c = await bundlebase.create()
c = await c.attach("data.parquet")
df = await c.to_pandas()  # Using instance method - streams internally
df.head()
```
to_polars¶
`async to_polars(bundle)`
Convert the bundle's data to a Polars DataFrame using streaming.
This function uses streaming internally to handle large datasets efficiently. Polars can efficiently handle batched data construction.
| PARAMETER | DESCRIPTION |
|---|---|
| `bundle` | The Bundle to convert. TYPE: `Bundle` |

| RETURNS | DESCRIPTION |
|---|---|
| `DataFrame` | `pl.DataFrame`: The data as a Polars DataFrame |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If polars is not installed |
| `ValueError` | If query execution fails |
| `RuntimeError` | If conversion fails |

Example

```python
c = await bundlebase.create()
c = await c.attach("data.parquet")
df = await c.to_polars()  # Using instance method - streams internally
df.head()
```
to_numpy¶
`async to_numpy(bundle)`
Convert the bundle's data to a dictionary of numpy arrays.
Each column becomes a key in the returned dictionary with a numpy array as its value. Null values are preserved (as NaN for float columns, None for object columns).
| PARAMETER | DESCRIPTION |
|---|---|
| `bundle` | The Bundle to convert. TYPE: `Bundle` |

| RETURNS | DESCRIPTION |
|---|---|
| `dict` | Dictionary mapping column names to numpy arrays |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If numpy is not installed (required by PyArrow) |
| `ValueError` | If query execution fails |
| `RuntimeError` | If conversion fails |

Example

```python
c = await bundlebase.create()
c = await c.attach("data.parquet")
arrays = await c.to_numpy()  # Using instance method
arrays['column_name']
```
to_dict¶
`async to_dict(bundle)`
Convert the bundle's data to a dictionary of lists.
Each column becomes a key in the returned dictionary with a list of values. This is useful for JSON serialization or working with generic Python structures.
| PARAMETER | DESCRIPTION |
|---|---|
| `bundle` | The Bundle to convert. TYPE: `Bundle` |

| RETURNS | DESCRIPTION |
|---|---|
| `dict` | Dictionary mapping column names to lists of values |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If query execution fails |
| `RuntimeError` | If conversion fails |

Example

```python
import json

c = await bundlebase.create()
c = await c.attach("data.parquet")
data = await c.to_dict()  # Using instance method
json.dumps(data)
```
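Because the dict-of-lists shape holds only plain Python values, it round-trips cleanly through the standard `json` module. A standalone sketch (the sample data here is illustrative, not from any real bundle):

```python
import json

# Shape returned by to_dict(): column name -> list of values
data = {"id": [1, 2, 3], "name": ["a", "b", None]}

encoded = json.dumps(data)
decoded = json.loads(encoded)
print(decoded == data)  # True: None survives as JSON null
```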
stream_batches¶
`async stream_batches(bundle)`
Iterate over RecordBatches without materializing the full dataset.
This function provides streaming access to data, processing one batch at a time. This is memory-efficient for large datasets that don't fit in RAM.
| PARAMETER | DESCRIPTION |
|---|---|
| `bundle` | The Bundle to stream from. TYPE: `Bundle` |

| YIELDS | DESCRIPTION |
|---|---|
| `AsyncIterator[RecordBatch]` | `pa.RecordBatch`: PyArrow RecordBatch objects, one at a time |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If streaming fails or bundle is invalid |
| `TypeError` | If bundle doesn't support streaming |

Example

```python
c = await bundlebase.open("large_file.parquet")
total_rows = 0
async for batch in bundlebase.stream_batches(c):
    # Process each batch independently
    df = batch.to_pandas()
    total_rows += len(df)
    # Memory is freed after each iteration
print(f"Processed {total_rows} rows")
```
Examples¶
Converting to pandas¶
```python
import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("age >= 18"))

# Convert to pandas DataFrame
df = await c.to_pandas()
print(df.head())
```
Converting to Polars¶
```python
import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet"))

# Convert to Polars DataFrame
df = await c.to_polars()
print(df.describe())
```
Converting to NumPy¶
```python
import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet")
    .query("SELECT x, y, z FROM bundle"))

# Convert to NumPy arrays (dict of column name -> array)
arrays = await c.to_numpy()
print(arrays["x"].mean())
```
Converting to Dictionary¶
```python
import bundlebase

c = await (bundlebase.create()
    .attach("data.parquet"))

# Convert to Python dict (column name -> list of values)
data = await c.to_dict()
print(len(data["id"]))
```
Streaming Large Datasets¶
For datasets larger than RAM, use `stream_batches()`:
```python
import bundlebase

c = await bundlebase.open("huge_dataset.parquet")

# Process in batches
total_rows = 0
async for batch in bundlebase.stream_batches(c):
    # Each batch is a PyArrow RecordBatch (~100MB)
    total_rows += batch.num_rows
    # Convert batch to pandas if needed
    batch_df = batch.to_pandas()
    process(batch_df)

print(f"Processed {total_rows} rows")
```
Memory Efficiency¶
All conversion functions use streaming internally:
```python
# These all stream internally (constant memory):
df = await c.to_pandas()     # ✓ Good
df = await c.to_polars()     # ✓ Good
arrays = await c.to_numpy()  # ✓ Good

# For large datasets, use custom streaming:
async for batch in bundlebase.stream_batches(c):
    process(batch)           # ✓ Best for custom processing
```
**Avoid for Large Datasets**

Do NOT use `as_pyarrow()` for large datasets - it materializes the entire dataset in memory. Use `stream_batches()` instead.
Batch Size¶
`stream_batches()` uses a default batch size of ~100MB. You can customize this by supplying a custom progress tracker:
```python
from bundlebase.progress import StreamProgress

progress = StreamProgress(batch_size=50_000_000)  # 50MB batches

async for batch in bundlebase.stream_batches(c, progress=progress):
    process(batch)
```
Integration Examples¶
With pandas¶
```python
import bundlebase
import pandas as pd

c = await (bundlebase.create()
    .attach("data.parquet")
    .filter("active = true"))

# Convert and continue with pandas
df = await c.to_pandas()
df = df.sort_values("date")
df.to_csv("output.csv")
```
With Polars¶
```python
import bundlebase
import polars as pl

c = await (bundlebase.create()
    .attach("data.parquet"))

# Convert to Polars for further processing
df = await c.to_polars()
result = df.group_by("category").agg(pl.col("value").sum())
```
With NumPy¶
```python
import bundlebase
import numpy as np

c = await (bundlebase.create()
    .attach("data.parquet")
    .query("SELECT x, y, z FROM bundle"))

# Convert to NumPy for numerical operations
arrays = await c.to_numpy()
x = arrays["x"]
y = arrays["y"]
correlation = np.corrcoef(x, y)[0, 1]
```
Custom Streaming Processing¶
```python
import bundlebase

c = await bundlebase.open("large_dataset.parquet")

# Custom incremental processing
results = []
async for batch in bundlebase.stream_batches(c):
    # Process each batch independently
    batch_result = analyze_batch(batch)
    results.append(batch_result)

# Combine results
final_result = combine_results(results)
```
See Also¶
- Async API - Bundle operations
- Progress Tracking - Monitor streaming operations