Progress Tracking

Monitor the progress of long-running Bundlebase operations.

Overview

Bundlebase provides a progress tracking system for monitoring data processing operations. This is particularly useful when working with large datasets or long-running transformations.

Progress Module

progress

Progress tracking for long-running Bundle operations.

This module provides integration with progress bar libraries like tqdm, allowing users to see visual progress feedback for operations like indexing, attaching data, and querying large datasets.

By default, if tqdm is installed, progress bars will appear automatically. Users can also provide custom callbacks for progress tracking.

Example

Auto-enabled if tqdm installed

import bundlebase

c = await bundlebase.create("/path/to/bundle")
await c.attach("large_data.parquet")  # Progress bar appears automatically

Custom callback

def my_progress(event, operation, id, current, total, message):
    if event == 'start':
        print(f"Starting: {operation}")
    elif event == 'update':
        print(f"Progress: {current}/{total or '?'}")
    elif event == 'finish':
        print(f"Finished: {operation}")

bundlebase.progress.set_callback(my_progress)

_current_tracker module-attribute

_current_tracker: Optional[Callable] = None

__all__ module-attribute

__all__ = ['set_callback', 'enable_tqdm', 'disable', 'install_default_tracker', 'TqdmProgressTracker', 'CustomCallbackTracker']

TqdmProgressTracker

TqdmProgressTracker()

Progress tracker that uses tqdm for visual progress bars.

This tracker creates and manages tqdm progress bars for Bundle operations. It supports both determinate (known total) and indeterminate (unknown total) progress.

__call__

__call__(event: str, operation: str, id: int, current: int, total: Optional[int], message: Optional[str])

Handle a progress event.

Parameters:

  • event (str) - Type of event ('start', 'update', 'finish')
  • operation (str) - Human-readable operation name
  • id (int) - Unique identifier for this progress operation
  • current (int) - Current progress value
  • total (Optional[int]) - Total expected value (None for indeterminate)
  • message (Optional[str]) - Optional status message
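
The event protocol above can be illustrated with a small tracker that needs no third-party packages. This is a minimal sketch, not Bundlebase's own implementation: the class name TextProgressTracker and its line format are hypothetical, and it only assumes that events arrive with the six arguments documented above.

```python
import sys
from typing import Dict, Optional, TextIO


class TextProgressTracker:
    """Hypothetical tracker: prints one line per event, state keyed by id."""

    def __init__(self, stream: Optional[TextIO] = None) -> None:
        self.stream = stream if stream is not None else sys.stderr
        self.active: Dict[int, str] = {}  # operation id -> operation name

    def __call__(self, event, operation, id, current, total, message):
        if event == "start":
            self.active[id] = operation
            self._emit(f"[{id}] start {operation}")
        elif event == "update":
            name = self.active.get(id, operation)
            if total:
                # Determinate: the total is known, so a ratio can be shown.
                progress = f"{current}/{total} ({current / total:.0%})"
            else:
                # Indeterminate: only the running count is available.
                progress = f"{current} so far"
            suffix = f" - {message}" if message else ""
            self._emit(f"[{id}] {name}: {progress}{suffix}")
        elif event == "finish":
            self.active.pop(id, None)
            self._emit(f"[{id}] finish {operation}")

    def _emit(self, line: str) -> None:
        print(line, file=self.stream)
```

Keying state on id rather than on the operation name lets two operations with the same name run at once without their output being confused. An instance could be installed with bundlebase.progress.set_callback(TextProgressTracker()).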

CustomCallbackTracker

CustomCallbackTracker(callback: Callable)

Progress tracker that forwards events to a custom callback.

This tracker allows users to provide their own progress handling logic.

Parameters:

  • callback (Callable) - Function with signature: fn(event, operation, id, current, total, message)

__call__

__call__(event: str, operation: str, id: int, current: int, total: Optional[int], message: Optional[str])

Forward the event to the custom callback.

_has_tqdm

_has_tqdm() -> bool

Check if tqdm is available.
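
For readers who want the same kind of check in their own code, one common approach is to ask the import system whether a module can be found without importing it. The sketch below is an assumption about how such a check could be written, not the body of _has_tqdm; the helper name has_module is hypothetical.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if the top-level module `name` is importable.

    find_spec() locates the module without executing it, so the check
    stays cheap even for large packages.
    """
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    print("tqdm available:", has_module("tqdm"))
```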

set_callback

set_callback(callback: Optional[Callable] = None)

Set a custom progress callback.

Parameters:

  • callback (Optional[Callable], default: None) - Function with signature: fn(event: str, operation: str, id: int, current: int, total: Optional[int], message: Optional[str]). If None, disables progress tracking.

Example

def my_callback(event, operation, id, current, total, message):
    if event == 'start':
        print(f"Starting: {operation}")
    elif event == 'update':
        pct = (current / total * 100) if total else 0
        print(f"Progress: {pct:.1f}%")
    elif event == 'finish':
        print(f"Finished: {operation}")

bundlebase.progress.set_callback(my_callback)
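
A callback that prints on every update can flood the console when updates arrive quickly. The sketch below wraps any callback with the documented six-argument signature so that 'update' events are forwarded at most once per interval, while 'start' and 'finish' always pass through. The helper name throttled and its parameters are hypothetical and not part of Bundlebase.

```python
import time
from typing import Callable, Dict


def throttled(callback: Callable, min_interval: float = 0.5,
              clock: Callable[[], float] = time.monotonic) -> Callable:
    """Forward 'update' events at most once per `min_interval` seconds
    for each operation id; 'start' and 'finish' are never dropped."""
    last_sent: Dict[int, float] = {}  # id -> time of last forwarded update

    def wrapper(event, operation, id, current, total, message):
        if event == "update":
            now = clock()
            last = last_sent.get(id)
            if last is not None and now - last < min_interval:
                return  # too soon since the last forwarded update
            last_sent[id] = now
        elif event == "finish":
            last_sent.pop(id, None)  # forget state for finished operations
        callback(event, operation, id, current, total, message)

    return wrapper
```

Under these assumptions it could be installed with bundlebase.progress.set_callback(throttled(my_callback)). The clock parameter exists only so the wrapper can be tested with a fake time source.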

enable_tqdm

enable_tqdm()

Explicitly enable tqdm progress bars.

This is called automatically on module import if tqdm is available. Users only need to call this if they disabled progress and want to re-enable it.

Raises:

  • RuntimeError - If tqdm is not installed
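
Because enable_tqdm raises RuntimeError when tqdm is missing, code that runs in environments with and without tqdm may want a fallback. The following is a minimal sketch of that pattern. The helper enable_bars_or_fallback is hypothetical, and it takes the two Bundlebase functions as arguments so the logic can be exercised without the library installed.

```python
from typing import Callable


def enable_bars_or_fallback(enable_tqdm: Callable[[], None],
                            set_callback: Callable[[Callable], None],
                            fallback: Callable) -> str:
    """Try tqdm progress bars; install `fallback` if tqdm is unavailable.

    Returns "tqdm" or "callback" to say which tracker ended up active.
    """
    try:
        enable_tqdm()
        return "tqdm"
    except RuntimeError:
        # Documented failure mode: tqdm is not installed.
        set_callback(fallback)
        return "callback"
```

With Bundlebase imported, a call might look like enable_bars_or_fallback(bundlebase.progress.enable_tqdm, bundlebase.progress.set_callback, my_callback).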

disable

disable()

Disable all progress tracking.

Operations will run silently without any progress feedback.

install_default_tracker

install_default_tracker()

Install the default progress tracker.

If tqdm is available, enables tqdm progress bars. Otherwise, does nothing (silent operation).

This is called automatically when the bundlebase module is imported.

Examples

Basic Progress Tracking

import bundlebase
from bundlebase.progress import StreamProgress

# Create a progress tracker
progress = StreamProgress()

c = await bundlebase.create()
c = await c.attach("large_dataset.parquet")

# Stream with progress tracking
async for batch in bundlebase.stream_batches(c, progress=progress):
    # Show progress
    print(f"Progress: {progress.percentage:.1f}%")
    print(f"Rows processed: {progress.rows_processed}")

    # Process batch
    process_batch(batch)

Custom Progress Reporting

import bundlebase
from bundlebase.progress import StreamProgress

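class CustomProgress(StreamProgress):
    """Custom progress tracker with logging."""

    REPORT_EVERY = 100_000  # rows between progress messages

    def update(self, rows: int):
        """Called when progress updates."""
        before = self.rows_processed
        super().update(rows)
        # Report whenever a 100,000-row boundary is crossed; batch sizes
        # rarely make the running total an exact multiple.
        if self.rows_processed // self.REPORT_EVERY > before // self.REPORT_EVERY:
            print(f"Processed {self.rows_processed:,} rows...")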

progress = CustomProgress()

c = await bundlebase.create().attach("data.parquet")
async for batch in bundlebase.stream_batches(c, progress=progress):
    process(batch)

Progress with Time Estimates

import bundlebase
from bundlebase.progress import StreamProgress
import time

progress = StreamProgress()
start_time = time.time()

c = await bundlebase.open("large_dataset.parquet")

async for batch in bundlebase.stream_batches(c, progress=progress):
    # Calculate ETA
    elapsed = time.time() - start_time
    if progress.percentage > 0:
        total_time = elapsed / (progress.percentage / 100)
        remaining = total_time - elapsed
        print(f"Progress: {progress.percentage:.1f}% (ETA: {remaining:.0f}s)")

    process(batch)
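
The ETA arithmetic in the loop above can be pulled into a pair of pure functions, which makes it easier to test and reuse. This is a sketch of the same linear extrapolation (total time = elapsed / fraction done). The function names are hypothetical, and the estimate is only as good as the assumption that work proceeds at a steady rate.

```python
from typing import Optional


def estimate_remaining(elapsed: float, percentage: float) -> Optional[float]:
    """Seconds remaining, extrapolated linearly from progress so far.

    Returns None while no progress has been made, since no rate is known.
    """
    if percentage <= 0:
        return None
    fraction = min(percentage, 100.0) / 100.0
    return elapsed / fraction - elapsed


def format_eta(seconds: Optional[float]) -> str:
    """Render seconds as m:ss, or '?' when no estimate is available."""
    if seconds is None:
        return "?"
    minutes, secs = divmod(int(round(seconds)), 60)
    return f"{minutes}:{secs:02d}"


if __name__ == "__main__":
    # 30 s elapsed at 25% done implies 120 s total, so 90 s remain.
    print(format_eta(estimate_remaining(30.0, 25.0)))
```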

Integration with tqdm

import bundlebase
from bundlebase.progress import StreamProgress
from tqdm import tqdm

progress = StreamProgress()
c = await bundlebase.open("large_dataset.parquet")

# Use tqdm for nice progress bar
with tqdm(total=100, desc="Processing") as pbar:
    last_percentage = 0

    async for batch in bundlebase.stream_batches(c, progress=progress):
        # Update tqdm
        current = progress.percentage
        pbar.update(current - last_percentage)
        last_percentage = current

        process(batch)

Batch Size Control

import bundlebase
from bundlebase.progress import StreamProgress

# Control batch size (in bytes)
progress = StreamProgress(batch_size=50_000_000)  # 50MB batches

c = await bundlebase.create().attach("data.parquet")
async for batch in bundlebase.stream_batches(c, progress=progress):
    # Smaller batches for memory-constrained environments
    print(f"Batch size: {batch.nbytes / 1024 / 1024:.1f} MB")
    process(batch)

Progress Properties

Progress trackers typically provide:

  • rows_processed - Total number of rows processed so far
  • percentage - Completion percentage (0-100)
  • batch_count - Number of batches processed
  • batch_size - Target size of each batch in bytes
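
To show how these properties relate to each other, here is a hypothetical stand-in object that can be used to unit-test progress-handling code without Bundlebase. It is not the library's StreamProgress class. In particular, the total_rows field and the way percentage is derived from it are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeProgress:
    """Stand-in exposing the four properties listed above (hypothetical)."""

    total_rows: Optional[int] = None  # assumption: total known up front
    batch_size: int = 100_000_000     # target batch size in bytes
    rows_processed: int = 0
    batch_count: int = 0

    def update(self, rows: int) -> None:
        """Record one finished batch containing `rows` rows."""
        self.rows_processed += rows
        self.batch_count += 1

    @property
    def percentage(self) -> float:
        """Completion in the range 0-100; 0 when the total is unknown."""
        if not self.total_rows:
            return 0.0
        return min(100.0, self.rows_processed / self.total_rows * 100)
```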

Implementing Custom Trackers

Create custom progress trackers by subclassing StreamProgress:

import bundlebase
from bundlebase.progress import StreamProgress
import logging

class LoggingProgress(StreamProgress):
    """Progress tracker that logs to Python logging."""

    def __init__(self, batch_size: int = 100_000_000):
        super().__init__(batch_size)
        self.logger = logging.getLogger(__name__)

    def update(self, rows: int):
        """Called when new rows are processed."""
        super().update(rows)
        self.logger.info(
            f"Processed {self.rows_processed:,} rows "
            f"({self.percentage:.1f}%)"
        )

    def complete(self):
        """Called when processing completes."""
        self.logger.info(
            f"Processing complete! Total rows: {self.rows_processed:,}"
        )

# Use custom tracker (c is a bundle created or opened as in the examples above)
progress = LoggingProgress()
async for batch in bundlebase.stream_batches(c, progress=progress):
    process(batch)
progress.complete()

Performance Considerations

Progress tracking adds minimal overhead, roughly 0.05 ms per batch:

  • With progress: ~0.1ms per batch
  • Without progress: ~0.05ms per batch

For most use cases, this overhead is negligible compared to data processing time.
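
Figures like these depend on the machine and on what the tracker does, so it can be worth measuring a specific callback directly. The sketch below times repeated calls with the documented six-argument signature. The helper name per_call_overhead_ms is hypothetical, and it measures only the callback itself, not Bundlebase's internal bookkeeping.

```python
import time
from typing import Callable


def per_call_overhead_ms(callback: Callable, calls: int = 10_000) -> float:
    """Average wall-clock cost of one 'update' call, in milliseconds."""
    start = time.perf_counter()
    for i in range(calls):
        callback("update", "benchmark", 0, i, calls, None)
    elapsed = time.perf_counter() - start
    return elapsed / calls * 1000


if __name__ == "__main__":
    def noop(event, operation, id, current, total, message):
        pass  # baseline: cost of the call itself

    print(f"no-op callback: {per_call_overhead_ms(noop):.4f} ms per call")
```

Comparing a real callback against the no-op baseline shows how much of the cost comes from the callback's own work, such as printing or logging.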

Integration with UI Frameworks

Streamlit

import asyncio

import bundlebase
from bundlebase.progress import StreamProgress
import streamlit as st

progress = StreamProgress()
progress_bar = st.progress(0)
status_text = st.empty()

async def main():
    c = await bundlebase.open("large_dataset.parquet")

    async for batch in bundlebase.stream_batches(c, progress=progress):
        # Update Streamlit UI (floats passed to progress() must stay within 0.0-1.0)
        progress_bar.progress(min(progress.percentage / 100, 1.0))
        status_text.text(f"Processed {progress.rows_processed:,} rows")

        process(batch)

    progress_bar.progress(100)
    status_text.text("Processing complete!")

# Streamlit runs scripts synchronously, so top-level await is not available
asyncio.run(main())

Jupyter Widgets

import bundlebase
from bundlebase.progress import StreamProgress
from ipywidgets import IntProgress, HTML, VBox
from IPython.display import display

progress = StreamProgress()
progress_widget = IntProgress(min=0, max=100, description='Progress:')
status_widget = HTML()
display(VBox([progress_widget, status_widget]))

c = await bundlebase.open("large_dataset.parquet")

async for batch in bundlebase.stream_batches(c, progress=progress):
    # Update widgets
    progress_widget.value = int(progress.percentage)
    status_widget.value = f"<p>Processed {progress.rows_processed:,} rows</p>"

    process(batch)

See Also