Functions (FFI & IPC Runtimes)¶
Custom functions let you extend bundlebase's SQL with high-performance scalar and aggregate functions written in any language that can produce a shared library (.so/.dylib) or standalone executable.
Runtimes¶
| Runtime | How it works | Best for | Persistent? |
|---|---|---|---|
| `python` | In-process via PyO3 | Prototyping, scripts | Temp only |
| `ipc` | Subprocess via SDK, communicates via Arrow IPC | Go, Python, any language | Yes |
| `ffi` | Loads a shared library (.so/.dylib) via FFI | Rust, C, C++ — zero IPC overhead | Yes |
| `java` | JVM subprocess via `java -jar` | Java | Yes |
| `docker` | Containerized subprocess | Any language, most isolated | Yes |
Persistent vs Temporary¶
- **Temporary** (`IMPORT TEMP FUNCTION`): Session-scoped, supports all runtimes including `python`. Great for iterative development.
- **Persistent** (`IMPORT FUNCTION`): Bundled into the data package, survives commits. Requires a serializable runtime (`ipc`, `ffi`, `java`, `docker` — not `python`).
Development workflow: Start temporary for fast iteration, then promote to persistent when ready:
# 1. Develop interactively with temporary
c.import_temp_function("acme.transform", "python::my_module:transform")
c.query("SELECT acme.transform(col) FROM data") # test it
# 2. When ready, package as IPC and make persistent
c.import_function("acme.transform", "ipc::python:my_module_server.py")
Auto-Detection¶
Functions can declare their signatures via a bundlebase_metadata() convention, eliminating the need to specify types manually:
```python
def bundlebase_metadata():
    return {
        "functions": [
            {"name": "double_val", "input_types": ["Int64"], "return_type": "Int64", "kind": "scalar"},
            {"name": "my_sum", "input_types": ["Int64"], "return_type": "Int64", "kind": "aggregate"},
        ]
    }
```
With auto-detection, you can import without specifying types:
-- Import a single function (types auto-detected from metadata)
IMPORT TEMP FUNCTION acme.double_val FROM 'ipc::python:my_module_server.py'
-- Import all functions in a namespace
IMPORT TEMP FUNCTION acme.* FROM 'ipc::python:my_module_server.py'
This is the recommended approach for registering functions.
Writing an FFI Function (Rust/C)¶
Scalar Function C ABI¶
A scalar function receives Arrow arrays and produces an Arrow array:
int32_t <symbol_name>(
const FFI_ArrowArray** args, // Input arrays
const FFI_ArrowSchema** arg_schemas, // Input schemas
int64_t n_args, // Number of arguments
FFI_ArrowArray* out_array, // Output array (caller-allocated)
FFI_ArrowSchema* out_schema, // Output schema (caller-allocated)
char* err_buf, // Error message buffer
int64_t err_buf_len // Error buffer length
);
// Returns 0 on success, non-zero on error
Ownership convention: The function consumes input arrays (takes ownership via the FFI release callback). Output arrays are written to caller-provided buffers; the caller takes ownership.
Aggregate Function C ABI¶
Aggregate functions use a state-based protocol with five symbols:
// Create initial accumulator state
void* <symbol>_create_state();
// Accumulate a batch of values into state
int32_t <symbol>_accumulate(
void* state,
const FFI_ArrowArray** args,
const FFI_ArrowSchema** arg_schemas,
int64_t n_args,
char* err_buf, int64_t err_buf_len
);
// Produce final result (single-element array)
int32_t <symbol>_evaluate(
void* state,
FFI_ArrowArray* out_array,
FFI_ArrowSchema* out_schema,
char* err_buf, int64_t err_buf_len
);
// Free state when done
void <symbol>_free_state(void* state);
Rust cdylib Example¶
Create a new Rust library with crate-type = ["cdylib"]:
# Cargo.toml
[lib]
crate-type = ["cdylib"]
[dependencies]
arrow = { version = "57", features = ["ffi"] }
```rust
// src/lib.rs
use arrow::array::{ArrayRef, Int64Array};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use std::sync::Arc;

#[no_mangle]
pub unsafe extern "C" fn double_val(
    args: *const *const FFI_ArrowArray,
    arg_schemas: *const *const FFI_ArrowSchema,
    n_args: i64,
    out_array: *mut FFI_ArrowArray,
    out_schema: *mut FFI_ArrowSchema,
    err_buf: *mut u8,
    err_buf_len: i64,
) -> i32 {
    // Import the first input array (takes ownership per the FFI convention).
    // For brevity this example uses unwrap(); production code should validate
    // n_args, and on failure write a message into err_buf and return non-zero
    // instead of panicking across the FFI boundary.
    let arg_array = std::ptr::read(*args.offset(0));
    let arg_schema = &**arg_schemas.offset(0);
    let data = arrow::ffi::from_ffi(arg_array, arg_schema).unwrap();
    let array = arrow::array::make_array(data);
    let input = array.as_any().downcast_ref::<Int64Array>().unwrap();

    // Compute the result (null-preserving doubling)
    let result: Int64Array = input.iter().map(|v| v.map(|x| x * 2)).collect();
    let result_ref: ArrayRef = Arc::new(result);

    // Export the output array into the caller-provided buffers
    let (ffi_arr, ffi_schema) = arrow::ffi::to_ffi(&result_ref.to_data()).unwrap();
    std::ptr::write(out_array, ffi_arr);
    std::ptr::write(out_schema, ffi_schema);
    0
}
```
Build and use:
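A plausible follow-on, assuming an `ffi::` locator analogous to the `ipc::` form used elsewhere on this page — the locator format, symbol suffix, and library path are assumptions, not confirmed syntax:

```sql
-- After building with `cargo build --release`
-- (ffi:: locator shape and path are illustrative assumptions)
IMPORT TEMP FUNCTION acme.double_val FROM 'ffi::target/release/libmy_funcs.so:double_val'
```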
Manifest Function (Bulk Discovery)¶
For libraries with multiple functions, export a manifest function:
```rust
use std::ffi::{c_char, CString};

const MANIFEST: &str = r#"{"functions": [
  {"name": "double_val", "symbol": "double_val",
   "input_types": ["Int64"], "return_type": "Int64", "kind": "scalar"},
  {"name": "int_sum", "symbol": "int_sum",
   "input_types": ["Int64"], "return_type": "Int64", "kind": "aggregate"}
]}"#;

#[no_mangle]
pub extern "C" fn bundlebase_functions() -> *const c_char {
    CString::new(MANIFEST).unwrap().into_raw()
}

#[no_mangle]
pub unsafe extern "C" fn bundlebase_free_manifest(ptr: *const c_char) {
    if !ptr.is_null() {
        drop(CString::from_raw(ptr as *mut c_char));
    }
}
```
Then register all functions at once:
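One plausible form, reusing the wildcard syntax from the Auto-Detection section — the `ffi::` locator shape is an assumption:

```sql
-- The wildcard import reads the bundlebase_functions() manifest
-- (locator format assumed)
IMPORT TEMP FUNCTION acme.* FROM 'ffi::target/release/libmy_funcs.so'
```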
IPC Functions¶
IPC functions run as separate processes. Bundlebase communicates with them via Arrow IPC over stdin/stdout.
IPC Discovery Protocol¶
IPC executables support bulk discovery via the --bundlebase-functions flag:
$ ./my_func --bundlebase-functions
{"functions": [
{"name": "double_val", "input_types": ["Int64"], "return_type": "Int64", "kind": "scalar"}
]}
Register all discovered functions:
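One plausible form, reusing the wildcard syntax shown in the Auto-Detection section (the executable path and `ipc::` locator shape for a compiled binary are illustrative assumptions):

```sql
IMPORT TEMP FUNCTION acme.* FROM 'ipc::./my_func'
```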
Manifest JSON Format¶
Both FFI and IPC runtimes use the same JSON manifest format:
```json
{
  "functions": [
    {
      "name": "double_val",
      "symbol": "double_val",
      "input_types": ["Int64"],
      "return_type": "Int64",
      "kind": "scalar"
    },
    {
      "name": "my_sum",
      "input_types": ["Int64"],
      "return_type": "Int64",
      "kind": "aggregate"
    }
  ]
}
```
| Field | Required | Description |
|---|---|---|
| `name` | Yes | Function name (used for SQL registration) |
| `symbol` | No | C symbol name (defaults to `name`) |
| `input_types` | Yes | Arrow type names for parameters |
| `return_type` | Yes | Arrow type name for return value |
| `kind` | No | `scalar` (default) or `aggregate` |
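As a sketch of how a consumer might apply these defaulting rules (plain Python; the `normalize` helper is hypothetical, not part of any SDK):

```python
import json

# Example manifest using the format described above
manifest_json = """
{"functions": [
  {"name": "double_val", "input_types": ["Int64"], "return_type": "Int64"},
  {"name": "my_sum", "symbol": "sum_impl", "input_types": ["Int64"],
   "return_type": "Int64", "kind": "aggregate"}
]}
"""

def normalize(entry):
    # Apply the documented defaults: "symbol" falls back to "name",
    # "kind" falls back to "scalar"
    return {
        "name": entry["name"],
        "symbol": entry.get("symbol", entry["name"]),
        "input_types": entry["input_types"],
        "return_type": entry["return_type"],
        "kind": entry.get("kind", "scalar"),
    }

functions = [normalize(f) for f in json.loads(manifest_json)["functions"]]
print(functions[0]["symbol"], functions[0]["kind"])  # double_val scalar
print(functions[1]["symbol"], functions[1]["kind"])  # sum_impl aggregate
```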
IPC Scalar Protocol¶
When Bundlebase invokes a scalar IPC function, the exchange is:
- **JSON-RPC request** — `invoke` method with the function name
- **Arrow IPC input** — length-prefixed (4-byte big-endian u32) Arrow IPC stream containing one RecordBatch with one column per argument
- **Arrow IPC output** — length-prefixed Arrow IPC stream containing a single-column RecordBatch with the result
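The length-prefix framing can be sketched in a few lines of plain Python; the payload bytes here are a stand-in for a real serialized Arrow IPC stream (which a client would produce with an Arrow IPC writer):

```python
import struct

# Stand-in bytes for a serialized Arrow IPC stream
payload = b"\xffARROW-IPC-STREAM"

# Frame: 4-byte big-endian u32 length prefix, then the IPC payload
frame = struct.pack(">I", len(payload)) + payload

# Reading side: parse the prefix, then slice out exactly that many bytes
(length,) = struct.unpack(">I", frame[:4])
body = frame[4:4 + length]
assert body == payload
```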
IPC Aggregate Protocol¶
IPC aggregate functions use four JSON-RPC methods. Aggregate state is opaque and server-side — only string state IDs cross the wire.
| Step | Method | Params | Arrow IPC | Response |
|---|---|---|---|---|
| 1 | `create_state` | `function` | — | `{"state_id": "0"}` |
| 2 | `accumulate` | `function`, `state_id` | Input batch (one column per arg) | `{"ok": true}` (ack) |
| 3 | `merge` | `function`, `state_id1`, `state_id2` | — | `{"state_id": "2"}` |
| 4 | `evaluate` | `function`, `state_id` | — | `{"ok": true}` (ack), then Arrow IPC output (single-row, single-column) |
Lifecycle:

- `create_state` — allocates a fresh accumulator on the server, returns an opaque state ID.
- `accumulate` — called once per batch. After the JSON-RPC ack, Bundlebase sends a length-prefixed Arrow IPC stream with the batch data. The server updates its internal state; nothing is returned.
- `merge` — combines two states (used during parallel/partitioned execution). Returns the merged state ID.
- `evaluate` — finalizes the aggregate. After the JSON-RPC ack, the server sends a length-prefixed Arrow IPC stream containing a single-row, single-column RecordBatch with the result.
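The lifecycle above can be sketched as a minimal server-side state store in plain Python. This is an illustration of the protocol's state handling, not the SDK's actual internals; only the opaque string IDs would cross the wire:

```python
# Minimal sketch of a server-side aggregate state store (sum/count for an average)
class StateStore:
    def __init__(self):
        self._states = {}
        self._next_id = 0

    def create_state(self):
        state_id = str(self._next_id)   # opaque ID is all the client ever sees
        self._next_id += 1
        self._states[state_id] = {"sum": 0, "count": 0}
        return state_id

    def accumulate(self, state_id, values):
        st = self._states[state_id]     # state stays server-side
        st["sum"] += sum(values)
        st["count"] += len(values)

    def merge(self, id1, id2):
        s1 = self._states.pop(id1)
        s2 = self._states.pop(id2)
        merged_id = self.create_state()
        self._states[merged_id] = {
            "sum": s1["sum"] + s2["sum"],
            "count": s1["count"] + s2["count"],
        }
        return merged_id

    def evaluate(self, state_id):
        st = self._states.pop(state_id)  # state is cleaned up after evaluate
        return st["sum"] / st["count"]

store = StateStore()
a, b = store.create_state(), store.create_state()
store.accumulate(a, [1, 2, 3])
store.accumulate(b, [4, 5])
result = store.evaluate(store.merge(a, b))
print(result)  # 3.0
```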
To declare an aggregate function in the manifest, set "kind": "aggregate":
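For example, matching the manifest format above:

```json
{"name": "my_sum", "input_types": ["Int64"], "return_type": "Int64", "kind": "aggregate"}
```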
Python SDK Functions¶
The Python SDK provides a Function base class (bundlebase_sdk.function) for writing IPC function providers. Call serve_function(instance) to start the JSON-RPC serve loop.
Scalar Example¶
```python
from bundlebase_sdk.function import Function
from bundlebase_sdk.function_serve import serve_function
import pyarrow as pa
import pyarrow.compute as pc

class MyFunctions(Function):
    def functions(self):
        return [
            {"name": "double_val", "input_types": ["Int64"],
             "return_type": "Int64", "kind": "scalar"},
        ]

    def invoke(self, name, batch):
        if name == "double_val":
            col = batch.column(0)
            result = pc.multiply(col, 2)
            return pa.record_batch({"result": result})

if __name__ == "__main__":
    serve_function(MyFunctions())
```
Register and use:
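For example, mirroring the wildcard import style used elsewhere on this page (the script name and column name are illustrative):

```sql
IMPORT TEMP FUNCTION acme.* FROM 'ipc::python:my_functions.py'
SELECT acme.double_val(value) FROM data
```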
Aggregate Example¶
For aggregate functions, implement four additional methods: create_state, accumulate, merge, and evaluate. The SDK's _AggregateStateStore manages state lifecycle automatically — your methods just work with plain Python objects.
```python
from bundlebase_sdk.function import Function
from bundlebase_sdk.function_serve import serve_function
import pyarrow as pa
import pyarrow.compute as pc

class MyFunctions(Function):
    def functions(self):
        return [
            {"name": "my_avg", "input_types": ["Float64"],
             "return_type": "Float64", "kind": "aggregate"},
        ]

    def invoke(self, name, batch):
        raise NotImplementedError("No scalar functions")

    def create_state(self, name):
        # Return any Python object — it stays server-side
        return {"sum": 0.0, "count": 0}

    def accumulate(self, name, state, batch):
        col = batch.column(0)
        state["sum"] += pc.sum(col).as_py()
        state["count"] += len(col)
        return state

    def merge(self, name, state1, state2):
        return {
            "sum": state1["sum"] + state2["sum"],
            "count": state1["count"] + state2["count"],
        }

    def evaluate(self, name, state):
        if state["count"] == 0:
            return pa.scalar(None, type=pa.float64())
        return pa.scalar(state["sum"] / state["count"], type=pa.float64())

if __name__ == "__main__":
    serve_function(MyFunctions())
```
Register and use:
IMPORT FUNCTION stats.* FROM 'ipc::python:my_functions.py'
SELECT category, stats.my_avg(amount) FROM bundle GROUP BY category
Key points:
- State objects are arbitrary Python objects — they never cross the wire
- `accumulate` receives a `pa.RecordBatch` with one column per function argument
- `evaluate` must return a `pa.Scalar` (or a plain Python value that PyArrow can convert)
- After `evaluate`, the state is automatically cleaned up
Function Kinds¶
The `kind` field in a function manifest can be one of:

- `scalar` (default) — row-by-row transformation
- `aggregate` — many rows to one result per group
- `table_valued` — returns a table of rows (registration supported; execution is planned for a future release)
Supported Arrow Types¶
Primitive types:
Boolean, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float16, Float32, Float64, Utf8, LargeUtf8, Binary, LargeBinary, Date32, Date64
Complex types:
| Type | Syntax | Example |
|---|---|---|
| List | `List<T>` | `List<Int64>` |
| Struct | `Struct<name:type,...>` | `Struct<x:Int64,y:Float64>` |
| Map | `Map<K,V>` | `Map<Utf8,Int64>` |
| Decimal | `Decimal128(precision,scale)` | `Decimal128(38,10)` |
Complex types can be nested (e.g., List<Struct<name:Utf8,score:Float64>>).
Renaming Functions¶
RENAME FUNCTION¶
Renames a function definition to a new dotted name. All overloads are renamed. Old UDFs are deregistered and re-registered under the new name.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `old_name` | `str` | (required) | Current dot-separated function name |
| `new_name` | `str` | (required) | New dot-separated function name |
RENAME TEMP FUNCTION¶
Renames runtime-only function entries to a new name. Only temporary entries are renamed; persistent entries are not affected.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `old_name` | `str` | (required) | Current dot-separated function name |
| `new_name` | `str` | (required) | New dot-separated function name |
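A sketch of the statement shape — the `TO` keyword is an assumption inferred from the `old_name`/`new_name` parameters, not confirmed syntax:

```sql
-- Assumed syntax (TO keyword inferred from the parameter tables above)
RENAME FUNCTION acme.transform TO acme.transform_v2
```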
Troubleshooting¶
"External code execution is not allowed"¶
External code execution is disabled by default. Enable it in your bundle's configuration, or with the equivalent SQL setting.
Subprocess not found or not executable¶
If you get a "No such file or directory" or "Permission denied" error:
- Verify the path in your `FROM` clause is correct and accessible
- For Python scripts, ensure the shebang line is present (`#!/usr/bin/env python3`) and the file is executable (`chmod +x`)
- For `ipc::python:script.py`, ensure `python` is on your PATH
Protocol version mismatch¶
If you see "unsupported protocol version" errors, update your SDK to match the bundlebase version. The current protocol version is 1.
Subprocess crashes during execution¶
If a subprocess crashes mid-operation, bundlebase will report a pipe/IO error. The subprocess is automatically removed from the cache and will be restarted on the next call. Check the subprocess's stderr output for crash details.
Type mismatches¶
If your function returns data that doesn't match the declared schema:
- Verify `input_types` and `return_type` in your manifest match the actual Arrow types
- Use `DESCRIBE FUNCTION namespace.name` to inspect the registered signature
- Common mistake: returning `Int32` when `Int64` was declared
Function returns empty results¶
- Ensure your function returns data for all input rows (scalar functions must return an array the same length as the input)
- Check that your `--bundlebase-functions` manifest is valid JSON and lists all functions