Streams and Data Formats

crabka-client-streams is Crabka's Rust Streams client. It joins a KIP-1071 Streams rebalance group, runs a processing topology, and reads/writes Kafka topics through pluggable serdes.

Use this page when you are building stream-processing applications in Rust. Use Schema Registry Deployment when you need the registry service those schema-aware serdes talk to.

The client offers two processing models:

  • Row model — the Processor API and the high-level DSL (StreamsApp / streams_builder), one record at a time, with TopologyTestDriver for broker-free tests.
  • Columnar model — a ColumnarTopology whose edges are Polars DataFrames, for vectorized aggregation, with ColumnarTestDriver for broker-free tests.
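
To make the difference between the two models concrete, here is a minimal sketch in plain Rust that uses only the standard library. It does not use the crabka-client-streams API: Order, OrderColumns, total_row_model, total_columnar_model, and sample_orders are hypothetical names that exist only for this illustration, and the sample amounts are made up.

```rust
/// One record, as a row-model processor would receive it.
#[derive(Clone, Debug)]
struct Order {
    user: String,
    amount_cents: i64,
}

/// The same data laid out column by column, as a columnar batch would hold it.
#[derive(Debug, Default)]
struct OrderColumns {
    user: Vec<String>,
    amount_cents: Vec<i64>,
}

impl OrderColumns {
    /// Pivot a slice of rows into parallel columns of equal length.
    fn from_rows(rows: &[Order]) -> Self {
        let mut cols = OrderColumns::default();
        for row in rows {
            cols.user.push(row.user.clone());
            cols.amount_cents.push(row.amount_cents);
        }
        cols
    }
}

/// Row model: visit records one at a time and update running state.
fn total_row_model(rows: &[Order]) -> i64 {
    let mut total = 0;
    for row in rows {
        total += row.amount_cents;
    }
    total
}

/// Columnar model: operate on a whole column in a single pass.
fn total_columnar_model(cols: &OrderColumns) -> i64 {
    cols.amount_cents.iter().sum()
}

/// Hypothetical sample rows, used only for this illustration.
fn sample_orders() -> Vec<Order> {
    vec![
        Order { user: "alice".to_string(), amount_cents: 350 },
        Order { user: "bob".to_string(), amount_cents: 900 },
        Order { user: "alice".to_string(), amount_cents: 500 },
    ]
}
```

Both functions compute the same total. The difference is the unit of work: the row version sees one Order per step, while the columnar version receives a whole batch and can run a single operation over one column.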

Data formats

Serde / codec                        Rust type                         Cargo feature  Use it for
StringSerde / I64Serde / BytesSerde  String / i64 / Bytes              (built-in)     primitive keys/values
SchemaSerde<T, JsonSerde<T>>         any serde + schemars::JsonSchema  (built-in)     Confluent JSON Schema
SchemaSerde<T, ProtobufSerde<T>>     a prost Message                   (built-in)     Confluent Protobuf (dynamic via prost-reflect)
SchemaSerde<T, AvroSerde<T>>         apache_avro::AvroSchema           (built-in)     Confluent Avro
PolarsIpcSerde                       polars::DataFrame                 polars         columnar values (Arrow IPC)
ArrowIpcSerde                        arrow::RecordBatch                arrow          arrow-rs interchange
ColumnarSerde<T>                     columnar::Columnar                columnar       zero-copy native columnar

Schema serdes resolve schema IDs against a Confluent-compatible registry (crabka-schema-registry); the columnar serdes are self-describing Arrow IPC.
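
For orientation, the Confluent wire format prefixes each encoded payload with a magic byte (0) and a 4-byte big-endian schema ID. The sketch below shows that framing using only the standard library. That the crabka schema serdes emit exactly this layout is an assumption based on "Confluent-compatible"; frame and unframe are hypothetical helpers, not part of the client API. Confluent Protobuf payloads also carry message-index bytes after the ID, which this sketch ignores.

```rust
/// Magic byte that starts every Confluent-framed payload.
const MAGIC_BYTE: u8 = 0;
/// Header size: 1 magic byte plus a 4-byte big-endian schema ID.
const HEADER_LEN: usize = 5;

/// Prefix an already-encoded payload with the magic byte and schema ID.
/// (Hypothetical helper for illustration only.)
fn frame(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.push(MAGIC_BYTE);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split a framed message into its schema ID and payload.
/// Returns `None` if the header is too short or the magic byte is wrong.
fn unframe(bytes: &[u8]) -> Option<(u32, &[u8])> {
    if bytes.len() < HEADER_LEN || bytes[0] != MAGIC_BYTE {
        return None;
    }
    let id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Some((id, &bytes[HEADER_LEN..]))
}
```

A consumer would read the ID from the header, look up the matching schema (from a cache or the registry), and only then decode the remaining bytes.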

Getting started

Add the client and pick the columnar features you need:

[dependencies]
crabka-client-streams = { version = "0.3.7", features = ["polars", "arrow"] }

Define a type, then round-trip it through a schema serde. Any type that implements serde's Serialize and Deserialize and also derives schemars::JsonSchema works as a JSON Schema value:

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, schemars::JsonSchema)]
struct OrderEvent {
    order_id: String,
    user: String,
    amount: f64,
    currency: String,
    ts_ms: i64,
}
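
// Seed the cache with a schema ID for the value subject so this round-trip
// resolves the ID locally; the registry URL is only a placeholder here.
let cache = SchemaCache::new(RegistryClient::new("http://unused"), CacheConfig::default());
cache.seed_subject_id("orders.json-value", 1);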
let serde = SchemaSerde::new(JsonSerde::<OrderEvent>::value(&cache, false));

let event = OrderEvent {
    order_id: "o-1".into(),
    user: "alice".into(),
    amount: 5.0,
    currency: "USD".into(),
    ts_ms: 1,
};
let bytes = serde.serialize("orders.json", &event);
let back: OrderEvent = serde.deserialize("orders.json", &bytes).unwrap();

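The high-level DSL resolves serdes from each type's DefaultSerde implementation, so the topology names only the key and value types. OrderProto is a prost message whose definition is not shown here: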

impl DefaultSerde for OrderEvent {
    type Serde = SchemaSerde<OrderEvent, JsonSerde<OrderEvent>>;
}
impl DefaultSerde for OrderProto {
    type Serde = SchemaSerde<OrderProto, ProtobufSerde<OrderProto>>;
}
let app = StreamsApp::builder()
    .bootstrap("127.0.0.1:9092")
    .application_id("orders-formats")
    .schema_registry("http://127.0.0.1:8081")
    .build();

let topology = app.streams_builder();
topology
    .stream::<String, OrderEvent>(["orders.json"])
    .map_values(|e: &OrderEvent| OrderProto {
        order_id: e.order_id.clone(),
        user: e.user.clone(),
        amount_cents: (e.amount * 100.0).round() as i64,
        currency: e.currency.to_uppercase(),
        ts_ms: e.ts_ms,
    })
    .to("orders.proto");
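
The pattern behind DefaultSerde, a trait with an associated type that lets generic code pick a serde from the value type alone, can be sketched with the standard library. The traits below are hypothetical stand-ins and are not the crate's actual definitions; SketchSerde, HasDefaultSerde, BigEndianI64, to_bytes, and from_bytes are names invented for this example.

```rust
/// Hypothetical stand-in for a serde: turns a value into bytes and back.
trait SketchSerde<T> {
    fn encode(value: &T) -> Vec<u8>;
    fn decode(bytes: &[u8]) -> Option<T>;
}

/// Hypothetical stand-in for `DefaultSerde`: each type names its own serde.
trait HasDefaultSerde: Sized {
    type Serde: SketchSerde<Self>;
}

/// A serde for `i64` that uses 8 big-endian bytes.
struct BigEndianI64;

impl SketchSerde<i64> for BigEndianI64 {
    fn encode(value: &i64) -> Vec<u8> {
        value.to_be_bytes().to_vec()
    }

    fn decode(bytes: &[u8]) -> Option<i64> {
        if bytes.len() != 8 {
            return None;
        }
        let mut arr = [0u8; 8];
        arr.copy_from_slice(bytes);
        Some(i64::from_be_bytes(arr))
    }
}

impl HasDefaultSerde for i64 {
    type Serde = BigEndianI64;
}

/// Generic code never names a serde; it is resolved from the value type.
fn to_bytes<T: HasDefaultSerde>(value: &T) -> Vec<u8> {
    <T::Serde as SketchSerde<T>>::encode(value)
}

/// Decode with whichever serde the target type declares as its default.
fn from_bytes<T: HasDefaultSerde>(bytes: &[u8]) -> Option<T> {
    <T::Serde as SketchSerde<T>>::decode(bytes)
}
```

With this shape, a call such as to_bytes(&850_i64) compiles without mentioning BigEndianI64, which is the same convenience the DSL gets from stream::<String, OrderEvent>(...).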

Columnar formats

For vectorized workloads, values are self-describing Arrow IPC. ArrowIpcSerde round-trips an arrow-rs RecordBatch (and PolarsIpcSerde does the same for a Polars DataFrame):

let schema = Arc::new(Schema::new(vec![
    Field::new("user", DataType::Utf8, false),
    Field::new("amount_cents", DataType::Int64, false),
]));
let batch = RecordBatch::try_new(
    schema,
    vec![
        Arc::new(StringArray::from(vec!["alice", "bob"])),
        Arc::new(Int64Array::from(vec![850_i64, 900])),
    ],
)
.unwrap();
let bytes = ArrowIpcSerde.serialize("orders.arrow", &batch);
let back = ArrowIpcSerde.deserialize("orders.arrow", &bytes).unwrap();

Worked pipeline: JSON → Protobuf → Arrow → Polars → summary Protobuf

This pipeline ingests order events as JSON, normalizes them to a Protobuf canonical form, batches them into Arrow, aggregates per user with the Polars columnar engine, and emits a Protobuf summary — one format at each topic hop:

orders.json   --JSON Schema-->  Stage A  --Protobuf-->   orders.proto
orders.proto  --Protobuf----->  Stage B  --Arrow IPC-->  orders.arrow
orders.arrow  --Arrow IPC---->  Stage C (Polars group-by)
(agg rows)    --------------->  Stage D  --Protobuf-->   orders.summary

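The full source is crates/client-streams/examples/format_pipeline.rs; it boots an in-process broker and Schema Registry and asserts the result, so it runs in CI as a test. The excerpts below rely on items from the full source that are not reproduced on this page: the drain, send_record, extract_str, and extract_i64 helpers, the OrderProto and OrderSummary message types, the input events, the producer, and the json_serde, proto_serde, and summary_serde instances.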

The harness starts an in-process broker and a Schema Registry that listens on a real HTTP port:

async fn boot() -> Boot {
    let dir = tempfile::tempdir().expect("tempdir");
    let broker = Broker::start(BrokerConfig::for_tests(dir.path().to_path_buf()))
        .await
        .expect("broker start");
    let bootstrap = broker.listen_addr().to_string();

    // In-process Schema Registry over a real HTTP port.
    let cancel = CancellationToken::new();
    let cfg = RegistryConfig {
        bootstrap: bootstrap.clone(),
        schemas_topic: "_schemas".into(),
        schemas_topic_rf: 1,
        client_id: "format-pipeline-sr".into(),
        advertised_url: "http://127.0.0.1:0".into(),
        group_id: "schema-registry".into(),
        leader_eligibility: true,
        security: SecurityConfig::default(),
    };
    let store = KafkaStore::start(&cfg, cancel.clone())
        .await
        .expect("sr start");
    let app = rest::router(AppState { store });
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await
        .expect("bind sr");
    let sr_addr = listener.local_addr().expect("sr addr");
    let serve_cancel = cancel.clone();
    tokio::spawn(async move {
        let _ = rest::serve::serve_http(listener, app, serve_cancel).await;
    });

    Boot {
        _broker: broker,
        bootstrap,
        registry_url: format!("http://{sr_addr}"),
        cancel,
        _dir: dir,
    }
}
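
The harness binds the registry to port 0 and then reads back the address the OS actually assigned. A minimal sketch of that pattern follows, using std::net::TcpListener instead of the tokio listener in the example so that it stays self-contained; bind_ephemeral and base_url are hypothetical helper names.

```rust
use std::net::{SocketAddr, TcpListener};

/// Bind to an OS-assigned loopback port and return the listener
/// together with the address it ended up on.
fn bind_ephemeral() -> std::io::Result<(TcpListener, SocketAddr)> {
    // Port 0 asks the OS to pick any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    Ok((listener, addr))
}

/// Build the base URL a client would use, mirroring how the harness
/// derives `registry_url` from the bound address.
fn base_url(addr: SocketAddr) -> String {
    format!("http://{}", addr)
}
```

Because the port is only known after binding, the URL passed to clients has to be built from local_addr() rather than from the string that was passed to bind. Keeping the listener alive for the duration of the test also keeps the port reserved.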

The shared event type and the Arrow→Polars bridge codec:

/// Raw order, ingested as JSON (JSON-Schema serde).
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
struct OrderEvent {
    order_id: String,
    user: String,
    amount: f64,
    currency: String,
    ts_ms: i64,
}
/// Source codec: each Kafka record value is an Arrow-IPC `RecordBatch`; decode
/// them into one Polars `DataFrame` the columnar engine can process. Bridges
/// arrow-rs -> polars explicitly (different Arrow memory libraries).
struct ArrowBlobCodec;

impl BatchCodec for ArrowBlobCodec {
    fn decode(&self, records: &[ConsumedRecord]) -> Result<DataFrame, BatchError> {
        let mut users: Vec<String> = Vec::new();
        let mut cents: Vec<i64> = Vec::new();
        for (i, rec) in records.iter().enumerate() {
            let batch = ArrowIpcSerde
                .deserialize("", &rec.value)
                .map_err(|e| BatchError(format!("arrow decode rec {i}: {e}")))?;
            let user_col = batch
                .column_by_name("user")
                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
                .ok_or_else(|| BatchError("missing user column".into()))?;
            let cent_col = batch
                .column_by_name("amount_cents")
                .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
                .ok_or_else(|| BatchError("missing amount_cents column".into()))?;
            for row in 0..batch.num_rows() {
                users.push(user_col.value(row).to_string());
                cents.push(cent_col.value(row));
            }
        }
        df!("user" => users, "amount_cents" => cents).map_err(|e| BatchError(e.to_string()))
    }

    fn encode(&self, _df: &DataFrame) -> Result<Vec<ProduceRecord>, BatchError> {
        Err(BatchError("ArrowBlobCodec is source-only".into()))
    }
}

Stage A — JSON → Protobuf

// Stage A — JSON -> Protobuf: deserialize JSON, normalize, emit OrderProto.
for v in drain(&bootstrap, "orders.json", "stage-a", events.len()).await {
    let ev: OrderEvent = json_serde
        .deserialize("orders.json", &v)
        .expect("json decode");
    let proto = OrderProto {
        order_id: ev.order_id,
        user: ev.user,
        amount_cents: (ev.amount * 100.0).round() as i64,
        currency: ev.currency.to_uppercase(),
        ts_ms: ev.ts_ms,
    };
    let bytes = proto_serde.serialize("orders.proto", &proto);
    send_record(&producer, "orders.proto", bytes).await;
}
producer.flush().await.expect("flush proto");
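
The call to round() in the amount_cents conversion matters because an `as i64` cast truncates toward zero, and many decimal amounts are not exactly representable as f64. The sketch below isolates that step; to_cents_rounded and to_cents_truncated are hypothetical names, and 19.99 is an illustrative input that does not come from the example.

```rust
/// Convert a decimal amount to integer cents, rounding to the nearest cent.
/// This mirrors the expression used in Stage A.
fn to_cents_rounded(amount: f64) -> i64 {
    (amount * 100.0).round() as i64
}

/// The same conversion without `round()`: the cast truncates toward zero,
/// so a product that lands just below a whole number loses a cent.
fn to_cents_truncated(amount: f64) -> i64 {
    (amount * 100.0) as i64
}
```

For 19.99 the product is slightly below 1999 in f64 arithmetic, so the truncating version yields 1998 while the rounded version yields 1999. Exactly representable inputs such as 5.0 give 500 either way.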

Stage B — Protobuf → Arrow

// Stage B — Protobuf -> Arrow: collect rows into one arrow-rs RecordBatch.
let mut users = Vec::new();
let mut cents = Vec::new();
for v in drain(&bootstrap, "orders.proto", "stage-b", events.len()).await {
    let p: OrderProto = proto_serde
        .deserialize("orders.proto", &v)
        .expect("proto decode");
    users.push(p.user);
    cents.push(p.amount_cents);
}
let schema = Arc::new(ArrowSchema::new(vec![
    Field::new("user", ArrowDataType::Utf8, false),
    Field::new("amount_cents", ArrowDataType::Int64, false),
]));
let batch = ::arrow::array::RecordBatch::try_new(
    schema,
    vec![
        Arc::new(StringArray::from(users)),
        Arc::new(Int64Array::from(cents)),
    ],
)
.expect("record batch");
send_record(
    &producer,
    "orders.arrow",
    ArrowIpcSerde.serialize("orders.arrow", &batch),
)
.await;
producer.flush().await.expect("flush arrow");

Stage C — Arrow → Polars (columnar group-by)

// Stage C — Arrow -> Polars: group-by-user sum + count in the columnar engine.
let consumed: Vec<ConsumedRecord> = drain(&bootstrap, "orders.arrow", "stage-c", 1)
    .await
    .into_iter()
    .enumerate()
    .map(|(i, v)| ConsumedRecord {
        key: None,
        value: v,
        timestamp: 0,
        partition: 0,
        offset: i as i64,
    })
    .collect();

let mut topo = ColumnarTopology::new();
let src = topo.add_source("src", ["orders.arrow"], ArrowBlobCodec);
let agg = topo.add_operator(
    "agg",
    BuiltinOp::GroupByAgg {
        keys: vec![col("user")],
        aggs: vec![
            col("amount_cents").sum().alias("total_cents"),
            col("amount_cents").count().alias("order_count"),
        ],
    },
    src,
);
topo.add_sink("out", "orders.summary.df", BlobCodec::default(), agg);
let built = topo.build().expect("build columnar");
let produced = built
    .run_batch("orders.arrow", &consumed)
    .expect("run_batch");
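
As a reference for what the GroupByAgg operator is expected to compute, here is the same group-by-user sum and count written against the standard library only. It does not use Polars or the columnar engine. The group_by_user function is hypothetical, and the input rows used with it below are made up to be consistent with the totals asserted in the verification step, since the example's input events are not shown on this page.

```rust
use std::collections::BTreeMap;

/// Per-user rollup over two parallel columns.
/// The map value is (total_cents, order_count).
fn group_by_user(users: &[&str], cents: &[i64]) -> BTreeMap<String, (i64, i64)> {
    assert_eq!(users.len(), cents.len(), "columns must have equal length");
    let mut out: BTreeMap<String, (i64, i64)> = BTreeMap::new();
    for (user, amount) in users.iter().zip(cents) {
        let entry = out.entry((*user).to_string()).or_insert((0, 0));
        entry.0 += *amount; // sum of amount_cents for this user
        entry.1 += 1; // number of orders for this user
    }
    out
}
```

Calling group_by_user(&["alice", "bob", "alice"], &[350, 900, 500]) yields one entry per user: alice with (850, 2) and bob with (900, 1).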

Stage D — Polars → summary Protobuf

// Stage D — Polars -> Protobuf: each aggregated row becomes an OrderSummary.
for (_topic, rec) in produced {
    let df = PolarsIpcSerde
        .deserialize("orders.summary.df", &rec.value)
        .expect("polars decode");
    let user_col = df.column("user").expect("user");
    let total_col = df.column("total_cents").expect("total_cents");
    let count_col = df
        .column("order_count")
        .expect("order_count")
        .cast(&DataType::Int64)
        .expect("cast count");
    for i in 0..df.height() {
        let summary = OrderSummary {
            user: extract_str(user_col, i),
            total_cents: extract_i64(total_col, i),
            order_count: extract_i64(&count_col, i),
        };
        let bytes = summary_serde.serialize("orders.summary", &summary);
        send_record(&producer, "orders.summary", bytes).await;
    }
}
producer.flush().await.expect("flush summary");

Verifying the rollup

// Verify the per-user rollup off the wire.
let mut by_user = BTreeMap::new();
for v in drain(&bootstrap, "orders.summary", "verify", 2).await {
    let s: OrderSummary = summary_serde
        .deserialize("orders.summary", &v)
        .expect("summary decode");
    by_user.insert(s.user.clone(), s);
}
let alice = by_user.get("alice").expect("alice summary");
assert_eq!(alice.total_cents, 850, "alice total_cents");
assert_eq!(alice.order_count, 2, "alice order_count");
let bob = by_user.get("bob").expect("bob summary");
assert_eq!(bob.total_cents, 900, "bob total_cents");
assert_eq!(bob.order_count, 1, "bob order_count");