# Java SDK
Build custom Bundlebase connectors in Java.
## Installation

Add the SDK to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>com.bundlebase</groupId>
    <artifactId>bundlebase-sdk</artifactId>
    <version>0.1.0</version>
</dependency>
```

Requires Apache Arrow Java (`org.apache.arrow:arrow-vector` and `org.apache.arrow:arrow-memory-netty`).
## Quick Start

```java
import com.bundlebase.sdk.*;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.types.pojo.*;

import java.nio.charset.StandardCharsets;
import java.util.*;

public class ExampleConnector implements Connector {
    private final RootAllocator allocator = new RootAllocator();

    @Override
    public List<Location> discover(List<String> attached, Map<String, String> args) {
        return List.of(new Location("data.parquet", true, "parquet", "v1"));
    }

    @Override
    public VectorSchemaRoot data(Location loc, Map<String, String> args) {
        Schema schema = new Schema(List.of(
            Field.nullable("id", new ArrowType.Int(64, true)),
            Field.nullable("value", new ArrowType.Utf8())
        ));
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
        root.allocateNew();
        ((BigIntVector) root.getVector("id")).setSafe(0, 1);
        ((VarCharVector) root.getVector("value")).setSafe(0, "a".getBytes(StandardCharsets.UTF_8));
        root.setRowCount(1);
        return root;
    }

    public static void main(String[] args) { Serve.run(new ExampleConnector()); }
}
```
Build a JAR and use it with:

```python
bundle.import_connector('example.connector', 'java::target/example-connector.jar')
bundle.create_source('example.connector')
```
## API Reference

### Connector Interface

```java
public interface Connector {
    List<Location> discover(List<String> attachedLocations, Map<String, String> args);

    Object data(Location location, Map<String, String> args);

    default Map<String, String> schema() { return null; }

    default StableUrl stableUrl(Location location, Map<String, String> args) { return null; }
}
```
#### `discover(attachedLocations, args)`

Return the available data locations.

| Parameter | Type | Description |
|---|---|---|
| `attachedLocations` | `List<String>` | Locations already attached to the bundle |
| `args` | `Map<String, String>` | Extra arguments from the source configuration |

Returns: `List<Location>`
#### `data(location, args)`

Return data for the given location. Return `null` when there is no data.

| Parameter | Type | Description |
|---|---|---|
| `location` | `Location` | The location to fetch data for |
| `args` | `Map<String, String>` | Extra arguments from the source configuration |

Returns: One of the supported data return types.
#### `schema()`

Optional schema for map-to-Arrow conversion. Required when `data()` returns `List<Map>` or `Map<String, List>`. Return `null` (the default) when `data()` returns `VectorSchemaRoot`.

Returns: `Map<String, String>` mapping column names to type strings, or `null`
#### `stableUrl(location, args)`

Return a stable URL for the given location. The default implementation returns `null`.

| Parameter | Type | Description |
|---|---|---|
| `location` | `Location` | The location to get a URL for |
| `args` | `Map<String, String>` | Extra arguments from the source configuration |

Returns: `StableUrl` or `null`
### Location

```java
public record Location(String location, boolean mustCopy, String format, String version) {
    public Location(String location) { this(location, true, "parquet", ""); }
}
```
| Field | Type | Default | Description |
|---|---|---|---|
| `location` | `String` | (required) | Identifier for this data file |
| `mustCopy` | `boolean` | `true` | Whether the data must be copied into the bundle |
| `format` | `String` | `"parquet"` | File format |
| `version` | `String` | `""` | Version string for change detection |
The single-argument constructor `new Location("path")` defaults to `mustCopy=true`, `format="parquet"`, `version=""`.
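The defaulting can be verified with a standalone copy of the record (an illustrative sketch only; the real type is `com.bundlebase.sdk.Location`):

```java
// Standalone sketch mirroring the SDK's Location record, for illustration
// only; the real type lives in com.bundlebase.sdk.
record Location(String location, boolean mustCopy, String format, String version) {
    // Single-argument constructor: copy into the bundle, assume Parquet,
    // empty version string.
    Location(String location) {
        this(location, true, "parquet", "");
    }
}

class LocationDefaultsDemo {
    public static void main(String[] args) {
        Location loc = new Location("data.parquet");
        System.out.println(loc.mustCopy()); // true
        System.out.println(loc.format());   // parquet
    }
}
```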
### StableUrl

| Field | Type | Description |
|---|---|---|
| `url` | `String` | The stable URL string |
### Serve

```java
public class Serve {
    public static void run(Connector source);
    public static void run(Connector source, InputStream in, OutputStream out);
}
```

`run(source)` reads from stdin and writes to stdout. The two-argument overload accepts explicit streams for testing.
## Data Return Types

The `data()` method supports several return types:

| Return Type | Description |
|---|---|
| `VectorSchemaRoot` | Arrow data directly (most control) |
| `List<Map<String, Object>>` | Row-oriented maps (requires `schema()`) |
| `Map<String, List<?>>` | Column-oriented map (requires `schema()`) |
| `null` | No data for this location |
A `schema()` method is required when returning map-based data (`List<Map>` or `Map<String, List>`). Returning map-based data without a schema throws an `IllegalArgumentException`.
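The rule amounts to a null-check on the declared schema before conversion. A standalone sketch of that guard (an illustration, not the SDK's actual internals):

```java
import java.util.List;
import java.util.Map;

class SchemaGuard {
    // Sketch of the documented rule: map-shaped results need a declared
    // schema, while Arrow results (or null) do not. Illustration only;
    // this is not the SDK's real implementation.
    static void requireSchemaFor(Object result, Map<String, String> schema) {
        boolean isMapShaped = result instanceof List<?> || result instanceof Map<?, ?>;
        if (isMapShaped && schema == null) {
            throw new IllegalArgumentException(
                "data() returned map-based rows but schema() returned null");
        }
    }

    public static void main(String[] args) {
        requireSchemaFor(null, null);  // fine: no data, no schema needed
        try {
            requireSchemaFor(List.of(Map.of("a", 1)), null);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```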
## Schema-Driven Connectors

Instead of constructing a `VectorSchemaRoot` manually, you can define a `schema()` method with simple type strings and return plain Java collections. The SDK handles the Arrow conversion automatically.
### Defining a Schema

Override `schema()` to return a map of column names to type strings:

```java
@Override
public Map<String, String> schema() {
    Map<String, String> s = new LinkedHashMap<>();
    s.put("name", "Utf8");
    s.put("age", "Int32");
    s.put("score", "Float64");
    return s;
}
```
Supported type strings:
| Type String | Arrow Type | Aliases |
|---|---|---|
| `Utf8` | Utf8 | `string` |
| `Int64` | Int(64, signed) | `int` |
| `Int8`, `Int16`, `Int32` | Int(8/16/32, signed) | |
| `UInt8`, `UInt16`, `UInt32`, `UInt64` | Int(8-64, unsigned) | |
| `Float64` | FloatingPoint(DOUBLE) | `float`, `double` |
| `Float16`, `Float32` | FloatingPoint(HALF/SINGLE) | |
| `Boolean` | Bool | `bool` |
| `Date32` | Date(DAY) | `date` |
| `Date64` | Date(MILLISECOND) | |
| `Timestamp` | Timestamp(MICROSECOND) | |
| `Binary` | Binary | `bytes` |
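Alias resolution can be pictured as a lookup table from the alias column above to the canonical type string. A standalone sketch (a hypothetical helper; the SDK performs this mapping internally during conversion):

```java
import java.util.Map;

class TypeAliases {
    // Hypothetical alias table reproducing the documented mappings;
    // the SDK resolves these internally during map-to-Arrow conversion.
    static final Map<String, String> ALIASES = Map.of(
        "string", "Utf8",
        "int", "Int64",
        "float", "Float64",
        "double", "Float64",
        "bool", "Boolean",
        "date", "Date32",
        "bytes", "Binary"
    );

    // Canonical type strings pass through unchanged.
    static String normalize(String typeString) {
        return ALIASES.getOrDefault(typeString, typeString);
    }

    public static void main(String[] args) {
        System.out.println(normalize("string")); // Utf8
        System.out.println(normalize("Int32"));  // Int32
    }
}
```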
### Returning Column-Oriented Data

With a schema defined, return data as `Map<String, List<?>>`:

```java
@Override
public Object data(Location location, Map<String, String> args) {
    Map<String, List<?>> cols = new LinkedHashMap<>();
    cols.put("name", List.of("alice", "bob"));
    cols.put("age", List.of(30, 25));
    return cols;
}
```
### Returning Row-Oriented Data

Return `List<Map<String, Object>>`:

```java
@Override
public Object data(Location location, Map<String, String> args) {
    return List.of(
        Map.of("name", "alice", "age", 30),
        Map.of("name", "bob", "age", 25)
    );
}
```
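The two shapes carry the same data; a small standalone helper shows the pivot from rows to columns (an illustration, not SDK code — here the schema's column order drives the output order):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RowsToColumns {
    // Pivot row-oriented maps into column-oriented lists, keyed by the
    // given column order. Illustrative only; the SDK accepts both shapes.
    static Map<String, List<Object>> pivot(List<Map<String, Object>> rows,
                                           List<String> columns) {
        Map<String, List<Object>> cols = new LinkedHashMap<>();
        for (String name : columns) {
            List<Object> values = new ArrayList<>();
            for (Map<String, Object> row : rows) {
                values.add(row.get(name)); // null when a row omits the column
            }
            cols.put(name, values);
        }
        return cols;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
            Map.of("name", "alice", "age", 30),
            Map.of("name", "bob", "age", 25)
        );
        Map<String, List<Object>> cols = pivot(rows, List.of("name", "age"));
        System.out.println(cols.get("name")); // [alice, bob]
        System.out.println(cols.get("age"));  // [30, 25]
    }
}
```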
### Full Schema Example

```java
import com.bundlebase.sdk.*;

import java.util.*;

public class SensorConnector implements Connector {
    @Override
    public Map<String, String> schema() {
        Map<String, String> s = new LinkedHashMap<>();
        s.put("sensor_id", "Utf8");
        s.put("temperature", "Float32");
        s.put("reading_count", "Int32");
        return s;
    }

    @Override
    public List<Location> discover(List<String> attached, Map<String, String> args) {
        return List.of(new Location("readings"));
    }

    @Override
    public Object data(Location location, Map<String, String> args) {
        Map<String, List<?>> cols = new LinkedHashMap<>();
        cols.put("sensor_id", List.of("s1", "s2", "s3"));
        cols.put("temperature", List.of(22.5, 19.8, 25.1));
        cols.put("reading_count", List.of(100, 85, 120));
        return cols;
    }

    public static void main(String[] args) { Serve.run(new SensorConnector()); }
}
```
## Complete Example
A source with multiple locations, stable URL support, and extra argument handling:
```java
import com.bundlebase.sdk.*;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.*;

import java.nio.charset.StandardCharsets;
import java.util.*;

public class DatabaseSource implements Connector {
    private final BufferAllocator allocator = new RootAllocator();

    @Override
    public List<Location> discover(List<String> attachedLocations, Map<String, String> args) {
        return List.of(
            new Location("users.parquet", true, "parquet", "v2"),
            new Location("orders.parquet", true, "parquet", "v1")
        );
    }

    @Override
    public VectorSchemaRoot data(Location location, Map<String, String> args) {
        switch (location.location()) {
            case "users.parquet" -> {
                Schema schema = new Schema(Arrays.asList(
                    Field.nullable("id", new ArrowType.Int(64, true)),
                    Field.nullable("name", new ArrowType.Utf8())
                ));
                VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
                root.allocateNew();
                BigIntVector idVec = (BigIntVector) root.getVector("id");
                VarCharVector nameVec = (VarCharVector) root.getVector("name");
                idVec.setSafe(0, 1); nameVec.setSafe(0, "alice".getBytes(StandardCharsets.UTF_8));
                idVec.setSafe(1, 2); nameVec.setSafe(1, "bob".getBytes(StandardCharsets.UTF_8));
                idVec.setSafe(2, 3); nameVec.setSafe(2, "charlie".getBytes(StandardCharsets.UTF_8));
                root.setRowCount(3);
                return root;
            }
            case "orders.parquet" -> {
                Schema schema = new Schema(Arrays.asList(
                    Field.nullable("order_id", new ArrowType.Int(64, true)),
                    Field.nullable("user_id", new ArrowType.Int(64, true)),
                    Field.nullable("amount", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
                ));
                VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
                root.allocateNew();
                ((BigIntVector) root.getVector("order_id")).setSafe(0, 101);
                ((BigIntVector) root.getVector("user_id")).setSafe(0, 1);
                ((Float8Vector) root.getVector("amount")).setSafe(0, 29.99);
                root.setRowCount(1);
                return root;
            }
            default -> { return null; }
        }
    }

    @Override
    public StableUrl stableUrl(Location location, Map<String, String> args) {
        if ("users.parquet".equals(location.location())) {
            return new StableUrl("https://db.example.com/exports/users-v2.parquet");
        }
        return null;
    }

    public static void main(String[] args) { Serve.run(new DatabaseSource()); }
}
```
## Testing

The `Serve.run()` method accepts an explicit `InputStream`/`OutputStream` pair for testing without launching a subprocess:
```java
import com.bundlebase.sdk.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;

import java.io.*;
import java.util.*;

import static org.junit.Assert.*;

public class ExampleConnectorTest {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static byte[] makeRequest(String method, Map<String, Object> params, int id)
            throws Exception {
        Map<String, Object> req = new LinkedHashMap<>();
        req.put("jsonrpc", "2.0");
        req.put("id", id);
        req.put("method", method);
        req.put("params", params != null ? params : Map.of());
        return (MAPPER.writeValueAsString(req) + "\n").getBytes();
    }

    @Test
    public void testDiscover() throws Exception {
        ByteArrayOutputStream input = new ByteArrayOutputStream();
        input.write(makeRequest("discover", Map.of("attached_locations", List.of()), 1));
        input.write(makeRequest("shutdown", null, 2));

        ByteArrayOutputStream output = new ByteArrayOutputStream();
        Serve.run(new ExampleConnector(), new ByteArrayInputStream(input.toByteArray()), output);

        String[] lines = output.toString().split("\n");
        JsonNode resp = MAPPER.readTree(lines[0]);
        assertEquals(1, resp.get("id").asInt());
        JsonNode locations = resp.get("result").get("locations");
        assertEquals(1, locations.size());
        assertEquals("data.parquet", locations.get(0).get("location").asText());
    }

    @Test
    public void testDataReturnsArrow() throws Exception {
        ByteArrayOutputStream input = new ByteArrayOutputStream();
        input.write(makeRequest("data", Map.of(
            "location", Map.of("location", "data.parquet")
        ), 1));
        input.write(makeRequest("shutdown", null, 2));

        ByteArrayOutputStream output = new ByteArrayOutputStream();
        Serve.run(new ExampleConnector(), new ByteArrayInputStream(input.toByteArray()), output);

        byte[] out = output.toByteArray();
        int newlineIdx = 0;
        while (newlineIdx < out.length && out[newlineIdx] != '\n') newlineIdx++;
        newlineIdx++;
        int length = java.nio.ByteBuffer.wrap(out, newlineIdx, 4).getInt();
        assertTrue("Expected non-zero Arrow IPC data", length > 0);
    }
}
```
## Native Mode
Build your connector as a shared library using Project Panama (Java 22+) for zero-copy in-process loading.
### Requirements
- Java 22+ — uses the Foreign Function & Memory API (JEP 454)
- GCC or Clang for compiling the thin C bootstrap
### Setup

Register your connector with `PluginExport`:
```java
import com.bundlebase.sdk.*;

public class ExampleFfiConnector implements Connector {
    // ... implement discover() and data() as usual ...

    static {
        PluginExport.register(new ExampleFfiConnector());
    }
}
```
### Build

Build the Panama bridge and your Java code into a shared library. This produces `libbundlebase_plugin.so` in `target/`.
### Use

```python
bundle.import_connector('example.connector', 'ffi::./libbundlebase_plugin.so')
bundle.create_source('example.connector')
```
### How It Works

The native bridge uses Project Panama's Foreign Function & Memory API for high-performance Java↔C interop:

- A thin C bootstrap (`bundlebase_plugin.c`) starts the JVM once at library load
- It calls `PluginExport.initialize()` — a single JNI call that registers Panama upcall stubs
- All subsequent `bundlebase_discover`, `bundlebase_data`, and `bundlebase_stable_url` calls route through Panama function pointers — no JNI method dispatch on the hot path
- Strings are allocated via `malloc()` in Java (using Panama downcalls) and freed by Bundlebase via `bundlebase_free()`
- Arrow data is transferred via the Arrow C Data Interface (`ArrowArrayStream`)
This architecture means JNI is used only for the one-time JVM bootstrap. All data-path calls use Panama upcalls, which avoids JNI overhead (method ID lookups, string conversions in C, etc.).
The same `Connector` interface works in both native and IPC mode — switch between them by changing only the entry point (`PluginExport.register()` plus a native build versus `Serve.run()` plus a JAR).
See Native Mode for the full C ABI reference.
## Error Handling

Exceptions thrown in your `discover()`, `data()`, or `stableUrl()` methods are caught by the SDK and returned as JSON-RPC error responses with code `-32000`. The exception message is included:
```java
public VectorSchemaRoot data(Location location, Map<String, String> args) {
    // Bundlebase receives:
    // {"error": {"code": -32000, "message": "Database connection failed"}}
    throw new RuntimeException("Database connection failed");
}
```
The SDK automatically closes `VectorSchemaRoot` instances after sending the Arrow IPC data.
Bundlebase surfaces these errors to the user as connector failures during FETCH.
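The wire shape of such an error response can be sketched with plain string formatting (an illustration of the JSON-RPC error envelope only; the SDK builds and escapes this itself):

```java
class ErrorEnvelope {
    // Sketch of the JSON-RPC 2.0 error envelope the SDK emits when a
    // connector method throws. Illustration only: no escaping is done here,
    // so the message must not contain quotes or backslashes.
    static String errorResponse(int id, String message) {
        return String.format(
            "{\"jsonrpc\": \"2.0\", \"id\": %d, " +
            "\"error\": {\"code\": -32000, \"message\": \"%s\"}}",
            id, message);
    }

    public static void main(String[] args) {
        System.out.println(errorResponse(1, "Database connection failed"));
    }
}
```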