Claude Code Transcript History
This example builds an empty Bundlebase bundle that anyone with a Claude Code installation can run FETCH against to load their own transcript history into a queryable schema.
The bundle ships:
- A native Go connector (FFI shared library) that walks Claude's ~/.claude/projects JSONL files and flattens them into one row per transcript event.
- A CREATE SOURCE definition that points the connector at the local ~/.claude/projects directory.
- B-tree indexes on project_id, session_id, event_type, timestamp and an inverted text index on search_text.
- Two views (message_events, tool_events) for the most common slice queries.
- The Go source for the connector, attached as a WITH (src = '…') archive so a recipient can audit, fork, or rebuild the binary.
No data ships in the bundle. The build fetches the author's own transcripts only to establish the schema and then strips the rows, so the result is a structure-only (empty) bundle. Recipients run FETCH base ADD to populate it from their own machine.
Files
| File | Description | Download |
|---|---|---|
| claude-history-bundle.tar.gz | Prebuilt empty bundle (gzipped tar). Extract, open with bundlebase, and FETCH. | Download |
| bundlebase.yaml | Config file enabling system.allow_external_code so the bundled FFI connector loads. | Download |
| create_claude_history_bundle.py | Python script that builds the connector, defines the bundle, and writes the .tar. | Download |
| claude_history_connector/main.go | Go connector source. Implements the bundlebase FFI ABI to walk *.jsonl files. | Download |
| claude_history_connector/go.mod | Go module manifest. | Download |
| claude_history_connector/go.sum | Go module checksums. | Download |
Using the Prebuilt Bundle
Download claude-history-bundle.tar.gz and extract it before use, for example with tar -xf claude-history-bundle.tar.gz. Extraction is required because FFI shared libraries can't be dlopen-ed from inside a tar.
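On a machine without a tar command, the same extraction can be sketched with Python's standard tarfile module. The helper name extract_bundle and the destination path are illustrative assumptions, not part of bundlebase.

```python
import tarfile
from pathlib import Path


def extract_bundle(archive: Path, dest: Path) -> Path:
    """Extract a gzipped bundle tar into dest and return dest."""
    dest.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Newer Python versions: reject absolute paths and members
            # that would land outside dest.
            tar.extractall(dest, filter="data")
        else:
            # Older Python versions have no extraction filters.
            tar.extractall(dest)
    return dest
```

Calling extract_bundle(Path("claude-history-bundle.tar.gz"), Path(".")) unpacks the archive into the current directory. The name of the directory it produces depends on how the tar was packed, so check the result before pointing --bundle at it.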
Then FETCH to populate the bundle from your local ~/.claude/projects directory and start querying. Pick whichever interface you prefer:
The connector is a custom FFI shared library, so bundlebase has to be told it's allowed to load external code by setting system.allow_external_code = 'true'. The CLI picks that up from a YAML config file (bundlebase.yaml) via --config; SQL uses SET CONFIG; Python passes a config={...} dict to open(). Without it, FETCH refuses to load the connector.
Download bundlebase.yaml into the same directory as the extracted bundle and pass it with --config:
# Pull data from your local ~/.claude/projects into the bundle.
# `extend` is the mutating subcommand; `--config` enables the FFI connector.
bundlebase extend --config bundlebase.yaml \
--bundle ./claude-history-bundle \
"FETCH base ADD"
# Same --config is needed on every read that touches the connector.
bundlebase query --config bundlebase.yaml \
--bundle ./claude-history-bundle \
"SELECT COUNT(*) FROM bundle"
# Recent assistant replies in one project (uses the project_id index)
bundlebase query --config bundlebase.yaml \
--bundle ./claude-history-bundle "
SELECT timestamp, content_text
FROM bundle
WHERE project_id = '/Users/me/src/myproj'
AND message_role = 'assistant'
ORDER BY timestamp DESC
LIMIT 10
"
The config file is just two lines of YAML that enable system.allow_external_code.
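As a sketch only, assuming the YAML nests the setting the same way the Python config dict does (a system section containing allow_external_code), the two lines can be generated as shown here. render_config_yaml is a hypothetical helper, and the exact spelling bundlebase expects should be checked against the downloadable bundlebase.yaml.

```python
def render_config_yaml(config: dict[str, dict[str, str]]) -> str:
    """Render a two-level config dict as YAML: a section, then key: value."""
    lines = []
    for section, settings in config.items():
        lines.append(f"{section}:")
        for key, value in settings.items():
            lines.append(f"  {key}: {value}")
    return "\n".join(lines) + "\n"


# Same shape as the dict passed to open() in the Python examples.
CONFIG = {"system": {"allow_external_code": "true"}}
yaml_text = render_config_yaml(CONFIG)
```

Writing yaml_text to bundlebase.yaml next to the extracted bundle gives a file that can be passed with --config, under the assumption stated above.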
Run these from the REPL (bundlebase repl --bundle ./claude-history-bundle) or any tool that submits SQL to the bundle. The SET CONFIG is session-scoped, so it only needs to run once per connection.
-- Allow loading the bundled FFI connector. Runtime-only (session scope).
SET CONFIG allow_external_code = 'true' FOR 'system';
-- Pull data from your local ~/.claude/projects into the bundle
FETCH base ADD;
-- Quick sanity-check
SELECT COUNT(*) FROM bundle;
-- Recent assistant replies in one project (uses the project_id index)
SELECT timestamp, content_text
FROM bundle
WHERE project_id = '/Users/me/src/myproj'
AND message_role = 'assistant'
ORDER BY timestamp DESC
LIMIT 10;
import bundlebase.sync as bb
# `config={"system": {"allow_external_code": "true"}}` is required so
# bundlebase will load the bundled FFI connector. extend() turns the
# read-only Bundle into a BundleBuilder so we can fetch and commit.
bundle = bb.open(
"./claude-history-bundle",
config={"system": {"allow_external_code": "true"}},
).extend()
# Pull data from your local ~/.claude/projects into the bundle
bundle.fetch("base", "add")
bundle.commit("Loaded transcripts")
# Quick sanity-check
print(bundle.query("SELECT COUNT(*) FROM bundle"))
# Recent assistant replies in one project (uses the project_id index)
print(bundle.query(
"""
SELECT timestamp, content_text
FROM bundle
WHERE project_id = '/Users/me/src/myproj'
AND message_role = 'assistant'
ORDER BY timestamp DESC
LIMIT 10
"""
))
The bundle already carries the source definition, the connector binary, and the indexes and views — so as soon as FETCH finishes, your data is queryable through message_events, tool_events, and the indexed columns.
The connector binary is platform-specific — the prebuilt tar ships only for the platform it was built on. To target other platforms, run the script yourself.
Querying the Bundle
Queries always run against the table named bundle — that's the full row set. The b-tree indexes on project_id, session_id, event_type, and timestamp make filters on those columns cheap, and the inverted text index on search_text powers full-text search. Every session that does a fetch needs SET CONFIG allow_external_code = 'true' FOR 'system' first (or the equivalent Python config=... arg).
-- All assistant replies in one project, newest first.
-- `event_type` mirrors the JSONL `type` field — assistant/user messages
-- have event_type = 'assistant' or 'user' (there is no 'message').
SELECT timestamp, content_text
FROM bundle
WHERE project_id = '/Users/me/src/myproj'
AND event_type = 'assistant'
ORDER BY timestamp DESC
LIMIT 50;
-- Distribution of event types across the whole bundle
SELECT event_type, COUNT(*) AS n
FROM bundle
GROUP BY event_type
ORDER BY n DESC;
-- Sessions ranked by how many tool calls they made
-- (tool calls live on assistant rows; tool_names is non-null only for them).
SELECT session_id, COUNT(*) AS tool_calls
FROM bundle
WHERE event_type = 'assistant' AND tool_names IS NOT NULL
GROUP BY session_id
ORDER BY tool_calls DESC
LIMIT 20;
-- Token spend per day across every project
SELECT date_trunc('day', timestamp) AS day,
SUM(message_usage_input_tokens) AS input_tokens,
SUM(message_usage_output_tokens) AS output_tokens
FROM bundle
WHERE event_type = 'assistant'
GROUP BY 1
ORDER BY 1 DESC;
-- Full-text search uses the `search()` table function, which replaces
-- `FROM bundle` and exposes a BM25 `_score` column. Single-arg form is
-- enough here because the bundle has exactly one inverted index.
SELECT _score, timestamp, project_id, event_type, tool_names, content_text
FROM search('web_search')
ORDER BY _score DESC
LIMIT 100;
-- Full-text search combined with an indexed filter (search() is just a
-- table source — normal WHERE/ORDER BY clauses compose with it).
SELECT _score, timestamp, content_text
FROM search('pg_dump')
WHERE project_id = '/Users/me/src/myproj'
ORDER BY _score DESC
LIMIT 50;
-- Two-arg form names the index explicitly and accepts BM25 query syntax
-- (field:term, AND/OR, etc.) — useful when a bundle has multiple text indexes.
SELECT _score, timestamp, content_text
FROM search('search_text_idx', 'web_search AND timeout')
ORDER BY _score DESC
LIMIT 20;
-- Errors surfaced from tool results
SELECT timestamp, session_id, tool_names, tool_result_error
FROM bundle
WHERE tool_result_error IS NOT NULL
ORDER BY timestamp DESC
LIMIT 50;
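To give a feel for what an inverted index and a BM25 _score involve, here is a toy scorer in plain Python. It is a teaching sketch, not bundlebase's implementation: the whitespace tokenizer, the k1 and b defaults, and the function names are all assumptions, and it expects a non-empty document list.

```python
import math
from collections import Counter


def tokenize(text: str) -> list[str]:
    """Lowercase and split on whitespace (a deliberately naive tokenizer)."""
    return text.lower().split()


def bm25_scores(docs: list[str], query: str, k1: float = 1.2, b: float = 0.75) -> list[float]:
    """Score every document against the query with the Okapi BM25 formula."""
    tokenized = [tokenize(doc) for doc in docs]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs

    # Inverted index: term -> {document position: term frequency}.
    index: dict[str, dict[int, int]] = {}
    for pos, tokens in enumerate(tokenized):
        for term, tf in Counter(tokens).items():
            index.setdefault(term, {})[pos] = tf

    scores = [0.0] * n_docs
    for term in tokenize(query):
        postings = index.get(term, {})
        # Rare terms get a higher inverse document frequency.
        idf = math.log(1 + (n_docs - len(postings) + 0.5) / (len(postings) + 0.5))
        for pos, tf in postings.items():
            # Longer-than-average documents are penalised through b.
            norm = tf + k1 * (1 - b + b * len(tokenized[pos]) / avg_len)
            scores[pos] += idf * tf * (k1 + 1) / norm
    return scores


docs = ["web_search timed out", "ran pg_dump", "web_search web_search ok"]
scores = bm25_scores(docs, "web_search")
```

Only documents that contain the term receive a non-zero score, and repeating the term raises the score with diminishing returns, which is why ordering by _score DESC surfaces the most relevant rows first.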
Using the bundled views
message_events and tool_events are predefined views, but you don't SELECT FROM message_events — you scope the bundle to the view first, then query bundle against that scope. From Python:
import bundlebase.sync as bb
bundle = bb.open("./claude-history-bundle", config={"system": {"allow_external_code": "true"}})
messages = bundle.view("message_events")
print(messages.query("""
SELECT timestamp, message_role, content_text
FROM bundle
WHERE project_id = '/Users/me/src/myproj'
ORDER BY timestamp DESC
LIMIT 50
"""))
tools = bundle.view("tool_events")
print(tools.query("SELECT tool_names, COUNT(*) FROM bundle GROUP BY tool_names ORDER BY 2 DESC"))
bundle.view(name) returns a read-only sub-bundle that exposes the view's filtered/projected rows under the same bundle table name, so the same SQL idioms apply.
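To see which rows each view keeps, the two view predicates from the build script can be tried against a few made-up rows in SQLite. This is a stand-in only: SQLite queries a view by name, whereas bundlebase scopes the bundle with view(name) as described above. The sample rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE bundle ("
    "event_type TEXT, message_role TEXT, tool_names TEXT, tool_result_text TEXT)"
)
conn.executemany(
    "INSERT INTO bundle VALUES (?, ?, ?, ?)",
    [
        ("user", "user", None, None),             # plain prompt
        ("assistant", "assistant", "Bash", None),  # reply with a tool call
        ("user", "user", None, "exit 0"),          # tool result
        ("summary", None, None, None),             # non-message event
    ],
)

# Same predicates the build script passes to create_view().
conn.execute(
    "CREATE VIEW message_events AS "
    "SELECT * FROM bundle WHERE message_role IS NOT NULL"
)
conn.execute(
    "CREATE VIEW tool_events AS "
    "SELECT * FROM bundle WHERE tool_names IS NOT NULL OR tool_result_text IS NOT NULL"
)

message_count = conn.execute("SELECT COUNT(*) FROM message_events").fetchone()[0]
tool_count = conn.execute("SELECT COUNT(*) FROM tool_events").fetchone()[0]
```

With these rows, message_events keeps the three rows that have a role, and tool_events keeps the tool call and the tool result.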
Building the Bundle Yourself
From this example's scripts/claude_history/ directory:
# Build into the default ./claude-history-bundle directory and tar
python create_claude_history_bundle.py
The script needs two things: the Go toolchain on your PATH to compile the connector, and bundlebase (with allow_external_code = true) to register it.
The script writes a directory bundle and then calls bundle.export_tar(...) to produce a single-file copy for distribution. The directory bundle is what you actually use locally — the tar exists so you can hand a single artifact to someone else.
The Build Script
The Python script runs in three phases:
- Build a full bundle locally. CREATE SOURCE auto-fetches your own transcripts so the bundle has a real schema. CREATE INDEX and CREATE VIEW are then applied against that schema (they need to resolve column references, which an empty bundle can't do).
- EXPORT EMPTY to a sibling path. The export walks the full bundle's history and re-applies only the structural operations (CREATE SOURCE, CREATE INDEX, CREATE VIEW, IMPORT CONNECTOR, EXPECTED SCHEMA, etc.) into a fresh bundle. ATTACH/DETACH/REPLACE/DELETE/UPDATE are stripped, so the result has no rows but knows how to fetch them.
- export_tar packages the empty bundle as a single .tar for download.
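The second phase amounts to replaying an operation log through a filter. The sketch below models that idea with plain tuples. The operation names come from the lists above, but the log format and the export_empty_ops helper are hypothetical and say nothing about how bundlebase stores its history.

```python
# Operations that describe structure and survive the export.
STRUCTURAL_OPS = {
    "IMPORT CONNECTOR",
    "CREATE SOURCE",
    "CREATE INDEX",
    "CREATE VIEW",
    "EXPECTED SCHEMA",
}

# Operations that add or change rows and are dropped.
DATA_OPS = {"ATTACH", "DETACH", "REPLACE", "DELETE", "UPDATE"}


def export_empty_ops(history: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Keep only structural operations, preserving their original order."""
    return [op for op in history if op[0] in STRUCTURAL_OPS]


full_history = [
    ("IMPORT CONNECTOR", "local.claude_history"),
    ("CREATE SOURCE", "local.claude_history"),
    ("ATTACH", "project-a"),
    ("ATTACH", "project-b"),
    ("CREATE INDEX", "project_id"),
    ("CREATE VIEW", "message_events"),
]
empty_history = export_empty_ops(full_history)
```

Because order is preserved, the index and view definitions still follow the source they depend on, which is what lets a recipient's FETCH base ADD slot data into an already-defined shape.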
"""Create an empty Bundlebase bundle for Claude Code transcript history.

The shipped bundle holds:

* A Go connector compiled to a shared library, registered with `IMPORT
  CONNECTOR ... WITH (src = '...')` so a recipient can recover the source.
* A `CREATE SOURCE` definition that points the connector at `~/.claude/projects`
  (or any directory passed via `--source-dir`).
* B-tree indexes on `project_id`, `session_id`, `event_type`, `timestamp` and
  an inverted text index on `search_text`.
* Two views (`message_events`, `tool_events`).

To get the indexes and views into the empty bundle, the script first builds a
*full* working bundle locally (with `fetch=True` against your own transcripts,
which gives the bundle a real schema to resolve indexes / views against),
then runs `EXPORT EMPTY` to strip the data while preserving every structural
op. Recipients of the empty bundle run `FETCH base ADD` to populate it from
their own transcript history under the same shape.

The script also packages the empty bundle as a single ``.tar`` for
distribution. (FFI shared libraries can't be ``dlopen``-ed from inside a tar,
so recipients of the tar must extract it before running ``FETCH``.)

Run from this directory:

    python create_claude_history_bundle.py [--bundle-dir DIR] [--tar PATH]
"""

from __future__ import annotations

import argparse
import shutil
import subprocess
import sys
import zipfile
from pathlib import Path

import bundlebase.sync as bb

SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_BUNDLE_DIR = SCRIPT_DIR / "claude-history-bundle"
DEFAULT_TAR = SCRIPT_DIR / "claude-history-bundle.tar.gz"
DEFAULT_SOURCE_DIR = Path.home() / ".claude" / "projects"
DEFAULT_PATTERNS = "*.jsonl,**/*.jsonl"
CONNECTOR_NAME = "local.claude_history"
CONNECTOR_SOURCE = SCRIPT_DIR / "claude_history_connector"


def library_filename() -> str:
    """Return the platform-appropriate shared-library filename."""
    suffix = {"darwin": ".dylib", "win32": ".dll"}.get(sys.platform, ".so")
    return f"libclaude_history_connector{suffix}"


def build_connector(output_dir: Path) -> Path:
    """Compile the sibling Go connector to a shared library."""
    output_dir.mkdir(parents=True, exist_ok=True)
    library_path = output_dir / library_filename()
    subprocess.run(
        ["go", "build", "-buildmode=c-shared", "-o", str(library_path), "."],
        cwd=CONNECTOR_SOURCE,
        check=True,
    )
    return library_path.resolve()


def zip_connector_source(output_path: Path) -> Path:
    """Bundle the Go source into a zip the script attaches with WITH (src = ...)."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for file in CONNECTOR_SOURCE.rglob("*"):
            if file.is_file():
                zf.write(file, file.relative_to(CONNECTOR_SOURCE.parent))
    return output_path


def create_bundle(
    bundle_dir: Path,
    source_dir: Path,
    patterns: str,
    tar_path: Path | None,
) -> None:
    if bundle_dir.exists():
        raise SystemExit(
            f"Bundle directory already exists: {bundle_dir}\n"
            "Remove it first if you want to recreate."
        )
    if tar_path is not None and tar_path.exists():
        raise SystemExit(
            f"Tar file already exists: {tar_path}\nRemove it first if you want to recreate."
        )

    build_dir = bundle_dir.parent / ".build"
    full_bundle_dir = build_dir / "full-bundle"
    if full_bundle_dir.exists():
        shutil.rmtree(full_bundle_dir)

    library_path = build_connector(build_dir)
    source_zip = zip_connector_source(build_dir / "claude_history_connector_source.zip")

    # ----- Phase 1: build a full working bundle (with data) -----
    #
    # CREATE SOURCE auto-fetches by default, which populates the schema.
    # CREATE INDEX / CREATE VIEW need that schema to resolve column references,
    # so they have to run on the populated bundle. The data we ingest here
    # gets stripped in phase 2; only the structural ops survive.
    full = bb.create(
        str(full_bundle_dir),
        config={"system": {"allow_external_code": "true"}},
    )
    full.set_name("Claude History")
    full.set_description(
        "Flattened Claude Code transcript history with one row per transcript event. "
        "Optimized for querying sessions, prompts, replies, tool calls, and tool results."
    )

    # Register the compiled connector and ship the Go source alongside it so
    # any recipient can audit, fork, or rebuild via `EXPORT SOURCE`.
    full.import_connector(
        CONNECTOR_NAME,
        f"ffi::{library_path}",
        src=str(source_zip),
    )

    source_args = {"patterns": patterns}
    if source_dir.resolve() != DEFAULT_SOURCE_DIR.resolve():
        source_args["source_dir"] = str(source_dir.resolve())

    # Default fetch=True so the bundle has a real schema for the indexes
    # and views below to resolve against.
    full.create_source(CONNECTOR_NAME, source_args)

    full.create_index("project_id", "btree")
    full.create_index("session_id", "btree")
    full.create_index("event_type", "btree")
    full.create_index("timestamp", "btree")
    full.create_index(["search_text"], "text", name="search_text_idx")

    full.create_view(
        "message_events",
        "SELECT * FROM bundle WHERE message_role IS NOT NULL",
    )
    full.create_view(
        "tool_events",
        "SELECT * FROM bundle WHERE tool_names IS NOT NULL OR tool_result_text IS NOT NULL",
    )
    full.commit("Built full Claude transcript history bundle")

    # ----- Phase 2: export EMPTY to strip data, keep structure -----
    #
    # `EXPORT EMPTY` walks the full bundle's history and re-applies only the
    # structural ops (CREATE SOURCE, CREATE INDEX, CREATE VIEW, column ops,
    # always-update / always-delete rules, EXPECTED SCHEMA) to a fresh bundle
    # at `bundle_dir`. ATTACH/DETACH/REPLACE/DELETE/UPDATE are dropped, so
    # the resulting bundle has no rows but knows how to fetch them.
    full.export_empty(str(bundle_dir))
    print(f"Created empty bundle at {bundle_dir}")

    # ----- Phase 3 (optional): package the empty bundle as a tar -----
    if tar_path is not None:
        empty = bb.open(
            str(bundle_dir),
            config={"system": {"allow_external_code": "true"}},
        ).extend()
        # gzip=True writes a .tar.gz so the downloadable artifact is small.
        # Recipients still need to extract before FETCH because dlopen can't
        # read shared libs from inside an archive.
        empty.export_tar(str(tar_path), gzip=True)
        print(f"Packaged empty bundle as tar at {tar_path}")
        print("Recipients of the tar must extract it (`tar -xf …`) before running FETCH.")

    print(f"Default source directory: {source_dir}")
    print("Run `FETCH base ADD` against the empty bundle to pull in transcript data.")


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument(
        "--bundle-dir",
        type=Path,
        default=DEFAULT_BUNDLE_DIR,
        help=f"Output directory bundle path (default: {DEFAULT_BUNDLE_DIR}).",
    )
    parser.add_argument(
        "--tar",
        type=Path,
        nargs="?",
        const=DEFAULT_TAR,
        default=DEFAULT_TAR,
        help=(
            "Also package the bundle as a single .tar archive at this path "
            f"(default: {DEFAULT_TAR}). Pass --no-tar to skip."
        ),
    )
    parser.add_argument(
        "--no-tar",
        dest="tar",
        action="store_const",
        const=None,
        help="Skip the .tar packaging step.",
    )
    parser.add_argument(
        "--source-dir",
        type=Path,
        default=DEFAULT_SOURCE_DIR,
        help=f"Claude transcript directory baked into the source (default: {DEFAULT_SOURCE_DIR}).",
    )
    parser.add_argument(
        "--patterns",
        default=DEFAULT_PATTERNS,
        help=f"Comma-separated glob patterns to include (default: {DEFAULT_PATTERNS}).",
    )
    args = parser.parse_args()
    create_bundle(
        args.bundle_dir.resolve(),
        args.source_dir.expanduser().resolve(),
        args.patterns,
        args.tar.resolve() if args.tar is not None else None,
    )


if __name__ == "__main__":
    main()
The Go Connector
The connector implements the native FFI ABI: bundlebase_discover enumerates the project directories that contain matching JSONL files and returns one location per project, and bundlebase_data reads every matching file in the requested project, normalizing rows into Arrow records in chunks of up to 1,000 rows. Transcript events are flattened: one row per JSON line, with nested message / tool / usage fields hoisted into top-level columns matching the indexes the bundle defines.
package main
import "C"
import (
"bufio"
"encoding/json"
"fmt"
"hash/fnv"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/bmatcuk/doublestar/v4"
sdk "github.com/nvoxland/bundlebase/sdk/go/bundlebasesdk"
)
const (
defaultSourceDir = "~/.claude/projects"
defaultPatterns = "*.jsonl,**/*.jsonl"
batchSize = 1000
)
var outputSchema = map[string]string{
"agent_id": "Utf8",
"content_text": "Utf8",
"content_types": "Utf8",
"cwd": "Utf8",
"data_json": "Utf8",
"entrypoint": "Utf8",
"event_subtype": "Utf8",
"event_type": "Utf8",
"file_is_subagent": "Boolean",
"git_branch": "Utf8",
"is_sidechain": "Boolean",
"line_number": "Int64",
"message_id": "Utf8",
"message_model": "Utf8",
"message_role": "Utf8",
"message_stop_reason": "Utf8",
"message_type": "Utf8",
"message_usage_cache_creation_ephemeral_1h_input_tokens": "Int64",
"message_usage_cache_creation_ephemeral_5m_input_tokens": "Int64",
"message_usage_cache_creation_input_tokens": "Int64",
"message_usage_cache_read_input_tokens": "Int64",
"message_usage_inference_geo": "Utf8",
"message_usage_input_tokens": "Int64",
"message_usage_iterations_cache_creation_ephemeral_1h_input_tokens": "Int64",
"message_usage_iterations_cache_creation_ephemeral_5m_input_tokens": "Int64",
"message_usage_iterations_cache_creation_input_tokens": "Int64",
"message_usage_iterations_cache_read_input_tokens": "Int64",
"message_usage_iterations_count": "Int64",
"message_usage_iterations_input_tokens": "Int64",
"message_usage_iterations_output_tokens": "Int64",
"message_usage_iterations_types": "Utf8",
"message_usage_output_tokens": "Int64",
"message_usage_server_tool_use_web_fetch_requests": "Int64",
"message_usage_server_tool_use_web_search_requests": "Int64",
"message_usage_service_tier": "Utf8",
"message_usage_speed": "Utf8",
"parent_tool_use_id": "Utf8",
"parent_uuid": "Utf8",
"permission_mode": "Utf8",
"project_id": "Utf8",
"prompt_id": "Utf8",
"request_id": "Utf8",
"search_text": "Utf8",
"session_id": "Utf8",
"slug": "Utf8",
"snapshot_json": "Utf8",
"source_file": "Utf8",
"source_tool_assistant_uuid": "Utf8",
"thinking_text": "Utf8",
"timestamp": "Timestamp",
"tool_call_ids": "Utf8",
"tool_input_json": "Utf8",
"tool_names": "Utf8",
"tool_result_error": "Boolean",
"tool_result_text": "Utf8",
"tool_use_id": "Utf8",
"user_type": "Utf8",
"uuid": "Utf8",
"version": "Utf8",
}
type ClaudeHistoryConnector struct{}
func init() {
sdk.ExportConnector(&ClaudeHistoryConnector{})
}
func main() {}
func (c *ClaudeHistoryConnector) Discover(_ []string, args map[string]string) ([]sdk.Location, error) {
sourceDir, err := resolveSourceDir(args)
if err != nil {
return nil, err
}
patterns := parsePatterns(args)
projectFiles, err := collectProjectFiles(sourceDir, patterns)
if err != nil {
return nil, err
}
projects := make([]string, 0, len(projectFiles))
for projectID := range projectFiles {
projects = append(projects, projectID)
}
sort.Strings(projects)
locations := make([]sdk.Location, 0, len(projects))
for _, projectID := range projects {
locations = append(locations, sdk.Location{
Location: projectID,
MustCopy: true,
Format: "parquet",
Version: projectVersion(sourceDir, projectFiles[projectID]),
})
}
sort.Slice(locations, func(i, j int) bool {
return locations[i].Location < locations[j].Location
})
return locations, nil
}
func (c *ClaudeHistoryConnector) Data(location sdk.Location, args map[string]string) ([]arrow.Record, error) {
sourceDir, err := resolveSourceDir(args)
if err != nil {
return nil, err
}
projectFiles, err := collectProjectFiles(sourceDir, parsePatterns(args))
if err != nil {
return nil, err
}
files, ok := projectFiles[location.Location]
if !ok {
return nil, fmt.Errorf("project batch not found: %s", location.Location)
}
rows := make([]map[string]interface{}, 0, batchSize)
records := make([]arrow.Record, 0)
for _, relPath := range files {
path := filepath.Join(sourceDir, filepath.FromSlash(relPath))
file, err := os.Open(path)
if err != nil {
return nil, err
}
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 1024*1024), 20*1024*1024)
var lineNumber int64
for scanner.Scan() {
lineNumber++
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
var event map[string]interface{}
if err := json.Unmarshal([]byte(line), &event); err != nil {
file.Close()
return nil, fmt.Errorf("parse %s line %d: %w", relPath, lineNumber, err)
}
rows = append(rows, buildRow(relPath, lineNumber, event))
if len(rows) >= batchSize {
batchRecords, err := sdk.NormalizeToRecords(rows, outputSchema)
if err != nil {
file.Close()
return nil, err
}
records = append(records, batchRecords...)
rows = rows[:0]
}
}
if err := scanner.Err(); err != nil {
file.Close()
return nil, err
}
file.Close()
}
if len(rows) > 0 {
batchRecords, err := sdk.NormalizeToRecords(rows, outputSchema)
if err != nil {
return nil, err
}
records = append(records, batchRecords...)
}
return records, nil
}
func buildRow(sourceFile string, lineNumber int64, event map[string]interface{}) map[string]interface{} {
projectID := projectIDForSourceFile(sourceFile)
row := map[string]interface{}{
"event_subtype": stringValue(event["subtype"]),
"event_type": stringValue(event["type"]),
"file_is_subagent": strings.Contains(sourceFile, "/subagents/"),
"line_number": lineNumber,
"parent_tool_use_id": stringValue(event["parentToolUseID"]),
"parent_uuid": stringValue(event["parentUuid"]),
"permission_mode": stringValue(event["permissionMode"]),
"project_id": projectID,
"prompt_id": stringValue(event["promptId"]),
"request_id": stringValue(event["requestId"]),
"session_id": stringValue(event["sessionId"]),
"slug": stringValue(event["slug"]),
"source_file": sourceFile,
"source_tool_assistant_uuid": stringValue(event["sourceToolAssistantUUID"]),
"timestamp": parseTimestampMicros(stringValue(event["timestamp"])),
"tool_use_id": stringValue(event["toolUseID"]),
"user_type": stringValue(event["userType"]),
"uuid": stringValue(event["uuid"]),
"version": stringValue(event["version"]),
}
if value, ok := boolValue(event["isSidechain"]); ok {
row["is_sidechain"] = value
}
if value := stringValue(event["cwd"]); value != "" {
row["cwd"] = value
}
if value := stringValue(event["gitBranch"]); value != "" {
row["git_branch"] = value
}
if value := stringValue(event["entrypoint"]); value != "" {
row["entrypoint"] = value
}
if value := jsonString(event["data"]); value != "" {
row["data_json"] = value
}
if value := jsonString(event["snapshot"]); value != "" {
row["snapshot_json"] = value
}
if value := stringOrStructuredText(event["content"]); value != "" {
row["content_text"] = value
}
textParts := []string{stringValue(row["content_text"])}
thinkingParts := []string{}
contentTypes := []string{}
toolNames := []string{}
toolCallIDs := []string{}
toolInputs := []interface{}{}
toolResultParts := []string{}
toolResultUseIDs := []string{}
var toolResultError *bool
if message, ok := mapValue(event["message"]); ok {
row["message_id"] = stringValue(message["id"])
row["message_model"] = stringValue(message["model"])
row["message_role"] = stringValue(message["role"])
row["message_stop_reason"] = stringValue(message["stop_reason"])
row["message_type"] = stringValue(message["type"])
if usage, ok := mapValue(message["usage"]); ok {
addUsageFields(row, usage)
}
switch content := message["content"].(type) {
case string:
appendIfPresent(&textParts, content)
appendIfPresent(&contentTypes, "text")
case []interface{}:
for _, item := range content {
itemMap, ok := mapValue(item)
if !ok {
appendIfPresent(&textParts, stringOrStructuredText(item))
continue
}
itemType := stringValue(itemMap["type"])
appendIfPresent(&contentTypes, itemType)
switch itemType {
case "text":
appendIfPresent(&textParts, stringValue(itemMap["text"]))
case "thinking":
appendIfPresent(&thinkingParts, firstNonEmpty(
stringValue(itemMap["thinking"]),
stringValue(itemMap["text"]),
))
case "tool_use":
appendIfPresent(&toolNames, stringValue(itemMap["name"]))
appendIfPresent(&toolCallIDs, stringValue(itemMap["id"]))
if input := itemMap["input"]; input != nil {
toolInputs = append(toolInputs, input)
}
case "tool_result":
appendIfPresent(&toolResultParts, stringOrStructuredText(itemMap["content"]))
appendIfPresent(&toolResultUseIDs, stringValue(itemMap["tool_use_id"]))
if value, ok := boolValue(itemMap["is_error"]); ok {
if toolResultError == nil {
toolResultError = new(bool)
}
*toolResultError = *toolResultError || value
}
default:
appendIfPresent(&textParts, fallbackText(itemMap))
}
}
default:
appendIfPresent(&textParts, stringOrStructuredText(content))
}
}
row["content_text"] = joined(textParts)
row["thinking_text"] = joined(thinkingParts)
row["content_types"] = joinedWithComma(contentTypes)
row["tool_names"] = joinedWithComma(toolNames)
row["tool_call_ids"] = joinedWithComma(toolCallIDs)
row["tool_result_text"] = joined(toolResultParts)
if len(toolInputs) > 0 {
row["tool_input_json"] = jsonString(toolInputs)
}
if toolResultError != nil {
row["tool_result_error"] = *toolResultError
}
// Claude Code nests `tool_use_id` inside each `tool_result` content block;
// the top-level `event["toolUseID"]` only appears on a few sub-agent
// metadata events. Prefer the nested ID(s) so result rows can be joined
// back to the assistant's tool_use via the same id.
if len(toolResultUseIDs) > 0 {
row["tool_use_id"] = joinedWithComma(toolResultUseIDs)
}
searchParts := []string{
stringValue(row["content_text"]),
stringValue(row["thinking_text"]),
stringValue(row["tool_result_text"]),
strings.Join(toolNames, " "),
stringValue(row["event_type"]),
stringValue(row["message_role"]),
stringValue(row["message_model"]),
}
row["search_text"] = joined(searchParts)
if value := stringValue(event["agentId"]); value != "" {
row["agent_id"] = value
}
return row
}
func collectProjectFiles(sourceDir string, patterns []string) (map[string][]string, error) {
projectFiles := make(map[string][]string)
err := filepath.WalkDir(sourceDir, func(path string, d os.DirEntry, walkErr error) error {
if walkErr != nil {
return walkErr
}
if d.IsDir() {
return nil
}
relPath, err := filepath.Rel(sourceDir, path)
if err != nil {
return err
}
relPath = filepath.ToSlash(relPath)
if !matchesPatterns(relPath, patterns) {
return nil
}
projectID := projectIDForSourceFile(relPath)
projectFiles[projectID] = append(projectFiles[projectID], relPath)
return nil
})
if err != nil {
return nil, err
}
for projectID := range projectFiles {
sort.Strings(projectFiles[projectID])
}
return projectFiles, nil
}
func projectVersion(sourceDir string, files []string) string {
hasher := fnv.New64a()
for _, relPath := range files {
_, _ = hasher.Write([]byte(relPath))
_, _ = hasher.Write([]byte{0})
fullPath := filepath.Join(sourceDir, filepath.FromSlash(relPath))
if info, err := os.Stat(fullPath); err == nil {
_, _ = hasher.Write([]byte(fileVersion(info)))
}
_, _ = hasher.Write([]byte{'\n'})
}
return fmt.Sprintf("%d-%x", len(files), hasher.Sum64())
}
func projectIDForSourceFile(sourceFile string) string {
projectID := sourceFile
if slash := strings.IndexByte(sourceFile, '/'); slash >= 0 {
projectID = sourceFile[:slash]
}
return projectID
}
func resolveSourceDir(args map[string]string) (string, error) {
candidate := firstNonEmpty(args["source_dir"], args["url"], defaultSourceDir)
if strings.HasPrefix(candidate, "file://") {
candidate = strings.TrimPrefix(candidate, "file://")
}
if strings.HasPrefix(candidate, "~/") {
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
candidate = filepath.Join(home, candidate[2:])
}
path, err := filepath.Abs(candidate)
if err != nil {
return "", err
}
info, err := os.Stat(path)
if err != nil {
return "", err
}
if !info.IsDir() {
return "", fmt.Errorf("source directory is not a directory: %s", path)
}
return path, nil
}
func parsePatterns(args map[string]string) []string {
patterns := firstNonEmpty(args["patterns"], defaultPatterns)
parts := strings.Split(patterns, ",")
normalized := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
normalized = append(normalized, part)
}
}
return normalized
}
func matchesPatterns(path string, patterns []string) bool {
for _, pattern := range patterns {
matched, err := doublestar.Match(pattern, path)
if err == nil && matched {
return true
}
}
return false
}
func fileVersion(info os.FileInfo) string {
return fmt.Sprintf("%d-%d", info.Size(), info.ModTime().UnixNano())
}
func mapValue(value interface{}) (map[string]interface{}, bool) {
asMap, ok := value.(map[string]interface{})
return asMap, ok
}
func boolValue(value interface{}) (bool, bool) {
asBool, ok := value.(bool)
return asBool, ok
}
func int64Value(value interface{}) (int64, bool) {
switch typed := value.(type) {
case int:
return int64(typed), true
case int64:
return typed, true
case float64:
return int64(typed), true
case json.Number:
parsed, err := typed.Int64()
if err == nil {
return parsed, true
}
return 0, false
default:
return 0, false
}
}
// parseTimestampMicros converts an ISO-8601 string (the format Claude Code
// writes for `timestamp` fields) to microseconds since the Unix epoch — the
// units the SDK's Timestamp_us builder expects. Returns nil for empty or
// unparseable inputs so the row gets a real null instead of a sentinel zero.
func parseTimestampMicros(s string) interface{} {
if s == "" {
return nil
}
t, err := time.Parse(time.RFC3339Nano, s)
if err != nil {
return nil
}
return t.UnixMicro()
}
func stringValue(value interface{}) string {
if value == nil {
return ""
}
switch typed := value.(type) {
case string:
return typed
case json.Number:
return typed.String()
case float64:
return fmt.Sprintf("%.0f", typed)
case bool:
if typed {
return "true"
}
return "false"
default:
return ""
}
}
func stringOrStructuredText(value interface{}) string {
switch typed := value.(type) {
case nil:
return ""
case string:
return typed
case []interface{}:
parts := make([]string, 0, len(typed))
for _, item := range typed {
appendIfPresent(&parts, stringOrStructuredText(item))
}
return joined(parts)
case map[string]interface{}:
return fallbackText(typed)
default:
return fmt.Sprint(typed)
}
}
func fallbackText(value map[string]interface{}) string {
parts := []string{
stringValue(value["text"]),
stringValue(value["thinking"]),
stringOrStructuredText(value["content"]),
}
if text := joined(parts); text != "" {
return text
}
return jsonString(value)
}
func jsonString(value interface{}) string {
if value == nil {
return ""
}
bytes, err := json.Marshal(value)
if err != nil {
return ""
}
return string(bytes)
}
func appendIfPresent(dest *[]string, value string) {
value = strings.TrimSpace(value)
if value != "" {
*dest = append(*dest, value)
}
}
func joined(parts []string) string {
trimmed := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
trimmed = append(trimmed, part)
}
}
return strings.Join(trimmed, "\n\n")
}
func joinedWithComma(parts []string) string {
trimmed := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
trimmed = append(trimmed, part)
}
}
return strings.Join(trimmed, ", ")
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {
return value
}
}
return ""
}
func addUsageFields(row map[string]interface{}, usage map[string]interface{}) {
setInt64Field(row, "message_usage_input_tokens", usage["input_tokens"])
setInt64Field(row, "message_usage_output_tokens", usage["output_tokens"])
setInt64Field(row, "message_usage_cache_creation_input_tokens", usage["cache_creation_input_tokens"])
setInt64Field(row, "message_usage_cache_read_input_tokens", usage["cache_read_input_tokens"])
setStringField(row, "message_usage_inference_geo", usage["inference_geo"])
setStringField(row, "message_usage_service_tier", usage["service_tier"])
setStringField(row, "message_usage_speed", usage["speed"])
if cacheCreation, ok := mapValue(usage["cache_creation"]); ok {
setInt64Field(row, "message_usage_cache_creation_ephemeral_1h_input_tokens", cacheCreation["ephemeral_1h_input_tokens"])
setInt64Field(row, "message_usage_cache_creation_ephemeral_5m_input_tokens", cacheCreation["ephemeral_5m_input_tokens"])
}
if serverToolUse, ok := mapValue(usage["server_tool_use"]); ok {
setInt64Field(row, "message_usage_server_tool_use_web_fetch_requests", serverToolUse["web_fetch_requests"])
setInt64Field(row, "message_usage_server_tool_use_web_search_requests", serverToolUse["web_search_requests"])
}
iterations, ok := usage["iterations"].([]interface{})
if !ok || len(iterations) == 0 {
return
}
row["message_usage_iterations_count"] = int64(len(iterations))
iterationTypes := make([]string, 0, len(iterations))
var inputTokens int64
var outputTokens int64
var cacheCreationInputTokens int64
var cacheReadInputTokens int64
var ephemeral1hInputTokens int64
var ephemeral5mInputTokens int64
for _, iteration := range iterations {
iterationMap, ok := mapValue(iteration)
if !ok {
continue
}
appendIfPresent(&iterationTypes, stringValue(iterationMap["type"]))
inputTokens += int64OrZero(iterationMap["input_tokens"])
outputTokens += int64OrZero(iterationMap["output_tokens"])
cacheCreationInputTokens += int64OrZero(iterationMap["cache_creation_input_tokens"])
cacheReadInputTokens += int64OrZero(iterationMap["cache_read_input_tokens"])
if cacheCreation, ok := mapValue(iterationMap["cache_creation"]); ok {
ephemeral1hInputTokens += int64OrZero(cacheCreation["ephemeral_1h_input_tokens"])
ephemeral5mInputTokens += int64OrZero(cacheCreation["ephemeral_5m_input_tokens"])
}
}
row["message_usage_iterations_types"] = joinedWithComma(iterationTypes)
row["message_usage_iterations_input_tokens"] = inputTokens
row["message_usage_iterations_output_tokens"] = outputTokens
row["message_usage_iterations_cache_creation_input_tokens"] = cacheCreationInputTokens
row["message_usage_iterations_cache_read_input_tokens"] = cacheReadInputTokens
row["message_usage_iterations_cache_creation_ephemeral_1h_input_tokens"] = ephemeral1hInputTokens
row["message_usage_iterations_cache_creation_ephemeral_5m_input_tokens"] = ephemeral5mInputTokens
}
func setInt64Field(row map[string]interface{}, key string, value interface{}) {
if parsed, ok := int64Value(value); ok {
row[key] = parsed
}
}
func setStringField(row map[string]interface{}, key string, value interface{}) {
if parsed := stringValue(value); parsed != "" {
row[key] = parsed
}
}
func int64OrZero(value interface{}) int64 {
if parsed, ok := int64Value(value); ok {
return parsed
}
return 0
}
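For readers who find Python easier to scan than Go, the core of buildRow can be approximated in a few lines. This is a simplified sketch covering only a handful of columns, not a port of the connector. The JSON field names follow the Go source above, while the sample event and the flatten_event name are made up for illustration.

```python
import json


def flatten_event(line: str) -> dict[str, str]:
    """Flatten one transcript JSON line into a row with top-level columns."""
    event = json.loads(line)
    message = event.get("message")
    if not isinstance(message, dict):
        message = {}

    text_parts: list[str] = []
    tool_names: list[str] = []
    tool_call_ids: list[str] = []

    content = message.get("content")
    if isinstance(content, str):
        text_parts.append(content)
    elif isinstance(content, list):
        for item in content:
            if not isinstance(item, dict):
                continue
            if item.get("type") == "text":
                text_parts.append(str(item.get("text", "")))
            elif item.get("type") == "tool_use":
                tool_names.append(str(item.get("name", "")))
                tool_call_ids.append(str(item.get("id", "")))

    return {
        "event_type": str(event.get("type", "")),
        "session_id": str(event.get("sessionId", "")),
        "message_role": str(message.get("role", "")),
        # Text blocks are joined with blank lines, lists with ", ".
        "content_text": "\n\n".join(p.strip() for p in text_parts if p.strip()),
        "tool_names": ", ".join(n for n in tool_names if n),
        "tool_call_ids": ", ".join(i for i in tool_call_ids if i),
    }


# A made-up assistant event with one text block and one tool call.
sample = json.dumps({
    "type": "assistant",
    "sessionId": "s-1",
    "message": {
        "role": "assistant",
        "content": [
            {"type": "text", "text": "Listing files."},
            {"type": "tool_use", "id": "toolu_1", "name": "Bash", "input": {"command": "ls"}},
        ],
    },
})
row = flatten_event(sample)
```

The Go connector does considerably more (thinking blocks, tool results, usage counters, search_text), but the shape is the same: each JSON line yields one flat row, and list-like values are joined into a single string column.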