Arrow Flight Streaming Design
End-to-end Arrow RecordBatch streaming from DuckDB to the client over Arrow Flight (gRPC), with 1:1 API compatibility with the current REST endpoints, plus MandoDataPointConfig metadata.
Base Branch
feature/BE-2023: uses the centralized error-logging and error-code enforcement patterns.
Design Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Streaming core | DataPointStream wrapping async Stream<Item=Result<RecordBatch, StreamError>> | Async-native, composable |
| DuckDB bridge | spawn_blocking + bounded mpsc(8), using stream_arrow() | ArrowStream borrows stmt, so stmt and stream must live inside one blocking task; proven in railscale |
| Calculated DPs | Selective materialization: pure DPs pass through | Railscale ParsedData::Passthrough vs Parsed pattern |
| Metadata | MandoDataPointConfig + update_info as JSON in FlightData.app_metadata | Single do_get() call |
| Parallelism | Server-side fan-out, multiplexed stream with dp_id column | Client sends one request |
| Auth | None initially | Simplify v1 |
| Port | 50051 (configurable FLIGHT_PORT), tokio::join! with Axum on 8080 | Standard gRPC convention |
| Feature gate | #[cfg(feature = "flight")] | Default build unchanged |
| Error pattern | thiserror + mando_core::error!() | Matches BE-2023 patterns |
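The DuckDB bridge decision can be sketched with a std-only analogue, assuming `std::thread::spawn` in place of `tokio::task::spawn_blocking` and `std::sync::mpsc::sync_channel` in place of a bounded tokio mpsc. `Batch` and `spawn_bridge` are hypothetical names; in the real bridge the producer closure would prepare the statement and iterate `stmt.stream_arrow()`, so the borrowed ArrowStream never leaves the blocking thread.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for arrow::record_batch::RecordBatch.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub rows: Vec<i64>,
}

/// Capacity of the bounded channel (matches the mpsc(8) choice above).
pub const CHANNEL_CAPACITY: usize = 8;

/// Runs `produce` on a dedicated thread and returns the receiving end.
/// Everything that borrows the statement is created inside the thread,
/// so only owned batches cross the channel. `send` blocks once
/// CHANNEL_CAPACITY batches are queued: that is the backpressure.
pub fn spawn_bridge<F, I>(produce: F) -> Receiver<Result<Batch, String>>
where
    F: FnOnce() -> I + Send + 'static,
    I: Iterator<Item = Result<Batch, String>>,
{
    let (tx, rx) = sync_channel(CHANNEL_CAPACITY);
    thread::spawn(move || {
        for item in produce() {
            // A send error means the consumer hung up (e.g. the client
            // cancelled), so stop pulling from the statement.
            if tx.send(item).is_err() {
                break;
            }
        }
        // Dropping `tx` here closes the channel and ends the consumer loop.
    });
    rx
}
```

The bounded capacity is what caps memory: once eight batches are queued, the producer blocks until the consumer (the Flight encoder, in the real design) drains one.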
Architecture
Current Flow (Before)
```text
DuckDB stmt.query_arrow()
  -> collect into Vec<DataFrame> -> single DataFrame
  -> convert_output_fields()
  -> handle_data_point_types()
  -> REST formatters: HashMap<DataPointId, DataFrame>
```
Problem
Every query materializes the entire result, so peak memory is roughly twice the result size (Arrow batches plus the DataFrame copy).
MandoDataPointConfig is computed but discarded before the response is built.
Target Flow (After)
```text
DuckDB stmt.stream_arrow()
  -> DataPointStream (async RecordBatch stream via bounded mpsc)
  -> batch transforms: strip columns, convert timezone
  -> StreamingMandoService.retrieve_stream()
       Normal DPs:     zero-copy streaming, yield after topo sort
       Virtual DPs:    materialize -> resample -> re-stream
       Calculated DPs: materialize deps -> eval -> re-stream
  -> Flight do_get(): yield as FlightData
```
Components
DataPointStream (mando-lib/src/stream.rs)
Core streaming abstraction (see Railscale Pattern below).
Constructors: from_channel, from_duckdb, from_batches, empty
Combinators: map_batches, chain, collect_dataframe
Deps: tokio-stream, tokio-util
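A synchronous, std-only sketch of the DataPointStream surface follows. It assumes a boxed `Iterator` in place of the async Stream, a toy `Batch` in place of RecordBatch, a `String` error alias in place of StreamError, and a hypothetical `collect_batches` standing in for collect_dataframe; from_duckdb is omitted because it amounts to from_channel fed by the DuckDB bridge.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for arrow::record_batch::RecordBatch.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub values: Vec<f64>,
}

/// Hypothetical stand-in for the StreamError enum.
pub type StreamError = String;

/// Synchronous analogue of DataPointStream: an owned, boxed iterator of
/// fallible batches. The real type wraps an async Stream instead.
pub struct DataPointStream {
    inner: Box<dyn Iterator<Item = Result<Batch, StreamError>> + Send>,
}

impl DataPointStream {
    /// Re-streams already materialized batches (used after Calculated DP eval).
    pub fn from_batches(batches: Vec<Batch>) -> Self {
        Self { inner: Box::new(batches.into_iter().map(Ok::<Batch, StreamError>)) }
    }

    /// A stream that yields nothing, e.g. for a DP with no rows in range.
    pub fn empty() -> Self {
        Self { inner: Box::new(std::iter::empty::<Result<Batch, StreamError>>()) }
    }

    /// Drains a channel fed by the DuckDB producer thread.
    pub fn from_channel(rx: Receiver<Result<Batch, StreamError>>) -> Self {
        Self { inner: Box::new(rx.into_iter()) }
    }

    /// Applies a per-batch transform lazily; errors pass through untouched.
    pub fn map_batches<F>(self, mut f: F) -> Self
    where
        F: FnMut(Batch) -> Result<Batch, StreamError> + Send + 'static,
    {
        Self { inner: Box::new(self.inner.map(move |r| r.and_then(|b| f(b)))) }
    }

    /// Yields all batches of `self`, then all batches of `other`.
    pub fn chain(self, other: DataPointStream) -> Self {
        Self { inner: Box::new(self.inner.chain(other.inner)) }
    }

    /// Materializes the whole stream, stopping at the first error.
    pub fn collect_batches(self) -> Result<Vec<Batch>, StreamError> {
        self.inner.collect()
    }
}
```

Every combinator returns a new lazy stream, so nothing is pulled from the source until `collect_batches` (or the Flight encoder) drives it.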
StreamingRepository (mando-lib/src/repo.rs)
Parallel to the existing Repository trait:
- retrieve_stream / retrieve_original_stream / retrieve_override_stream
- Returns DataPointStream instead of DataFrame
- DuckDB impl reuses the existing SQL building and swaps query_arrow for stream_arrow
StreamingMandoService (mando-lib/src/service.rs)
- retrieve_catalog() + retrieve_stream()
- retrieve_stream() returns Stream<Item = ResolvedDataPointStream>, following the three-tier streaming model: Normal DPs yield zero-copy after topo sort, Virtual DPs materialize, resample, then re-stream, and Calculated DPs materialize all deps, evaluate, then re-stream
- ResolvedDataPointStream bundles DataPointId, MandoDataPointConfig, DataPointStream, Vec<DataPointUpdateInfo>
- Existing MandoService stays untouched
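The shape of retrieve_stream() can be sketched as a lazy outer iterator. Placeholder types stand in for the real mando types, and the `resolve` callback is a hypothetical hook for the per-tier retrieval; the point is that each ResolvedDataPointStream is produced only when the consumer asks for it.

```rust
/// Hypothetical placeholders for the real mando types.
pub type DataPointId = u32;

#[derive(Debug, Clone, PartialEq)]
pub struct MandoDataPointConfig {
    pub name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct DataPointUpdateInfo {
    pub updated_by: String,
}

/// Placeholder for DataPointStream: already-resolved batches of f64 values.
pub type DataPointStream = Vec<Vec<f64>>;

/// Everything the Flight layer needs for one DP, bundled so the config
/// and update info can travel in FlightData.app_metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedDataPointStream {
    pub dp_id: DataPointId,
    pub config: MandoDataPointConfig,
    pub stream: DataPointStream,
    pub update_info: Vec<DataPointUpdateInfo>,
}

/// Lazily resolves DPs in the given (already topologically sorted) order.
/// Nothing is resolved until the consumer pulls the next item, so the
/// first DP can be sent while later ones are still pending.
pub fn retrieve_stream<'a, R>(
    order: &'a [DataPointId],
    mut resolve: R,
) -> impl Iterator<Item = ResolvedDataPointStream> + 'a
where
    R: FnMut(DataPointId) -> ResolvedDataPointStream + 'a,
{
    order.iter().map(move |&id| resolve(id))
}
```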
MandoFlightService (mando-bess/src/flight.rs)
Implements arrow_flight::FlightService:
- do_get: deserialize the ticket, flat-map the outer stream, inject the dp_id column, encode; data flows while later DPs are still resolving
- list_flights: catalog as FlightInfo entries
- get_flight_info: schema + metadata per DP
- All others: Unimplemented
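The do_get flat-map step can be sketched without Flight types. `Batch`, `TaggedBatch`, and `multiplex` are hypothetical; the real code would append a dp_id array to each RecordBatch and then encode it as FlightData.

```rust
/// Hypothetical stand-in for a RecordBatch with a single value column.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub value: Vec<f64>,
}

/// A batch after the dp_id column has been injected, so the client can
/// demultiplex the single Flight stream back into per-DP series.
#[derive(Debug, Clone, PartialEq)]
pub struct TaggedBatch {
    pub dp_id: Vec<u32>,
    pub value: Vec<f64>,
}

/// Flattens the outer per-DP stream into one multiplexed stream, adding a
/// constant dp_id column to every batch. Inner batches are pulled lazily,
/// so DP n's data is emitted before DP n+1 is even requested.
pub fn multiplex<O, I>(outer: O) -> impl Iterator<Item = TaggedBatch>
where
    O: IntoIterator<Item = (u32, I)>,
    I: IntoIterator<Item = Batch>,
{
    outer.into_iter().flat_map(|(dp_id, batches)| {
        batches.into_iter().map(move |b| TaggedBatch {
            // One dp_id entry per row, matching the batch length.
            dp_id: vec![dp_id; b.value.len()],
            value: b.value,
        })
    })
}
```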
StreamError
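```rust
use arrow::error::ArrowError;
use polars::prelude::PolarsError;
use thiserror::Error;
use crate::repo::RepoError; // assumed path
#[derive(Error, Debug)]
pub enum StreamError {
    #[error("repository error: {0}")]
    Repo(#[from] RepoError),
    #[error("arrow error: {0}")]
    Arrow(#[from] ArrowError),
    #[error("polars error: {0}")]
    Polars(#[from] PolarsError),
    #[error("channel closed: {0}")]
    Channel(String),
}
```
Railscale Pattern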
Borrowed Pattern
From the railscale HTTP proxy’s ParsedData::Passthrough vs ParsedData::Parsed enum: pure data flows through zero-copy, and only frames that need transformation are materialized.
Applied here: Normal DPs stream through combinators without allocation. Virtual DPs collect their stream, resample, and re-stream. Calculated DPs collect_dataframe() their dependency streams, evaluate expressions, then from_batches() re-streams the result.
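A minimal sketch of the Passthrough/Materialized split, assuming a toy `Batch` with one f64 column and a hypothetical calculated DP `a + b` whose two dependencies are already aligned row for row. It also shows why Calculated DPs cannot stay batch by batch: the dependencies arrive with different batch boundaries, so rows only line up after both streams are collected.

```rust
/// Hypothetical stand-in for a RecordBatch holding one f64 column.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub values: Vec<f64>,
}

pub type BatchIter = Box<dyn Iterator<Item = Batch>>;

/// Mirrors railscale's ParsedData::Passthrough vs ParsedData::Parsed split.
pub enum Resolved {
    /// Forwarded batch by batch; never collected.
    Passthrough(BatchIter),
    /// Needed the full input; holds the evaluated result.
    Materialized(Vec<Batch>),
}

impl Resolved {
    /// Both variants become a plain stream for the Flight layer
    /// (the materialized case is the from_batches() re-stream).
    pub fn into_stream(self) -> BatchIter {
        match self {
            Resolved::Passthrough(it) => it,
            Resolved::Materialized(batches) => Box::new(batches.into_iter()),
        }
    }
}

/// Hypothetical calculated DP `a + b`: both dependency streams are fully
/// collected, flattened into columns, combined element-wise, and
/// re-emitted as one batch.
pub fn eval_sum(a: BatchIter, b: BatchIter) -> Resolved {
    let a: Vec<f64> = a.flat_map(|batch| batch.values).collect();
    let b: Vec<f64> = b.flat_map(|batch| batch.values).collect();
    let sum: Vec<f64> = a.iter().zip(&b).map(|(x, y)| x + y).collect();
    Resolved::Materialized(vec![Batch { values: sum }])
}
```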
Files
New: mando-lib/src/stream.rs, mando-bess/src/flight.rs
Modified: workspace Cargo.toml, mando-lib, mando-bess, repo.rs, service.rs
Unchanged: All REST endpoints, MandoService, Repository, data models, DuckDB schema, formatters
Testing
- Unit: DataPointStream ops, StreamingRepository against in-memory DuckDB, ticket/metadata serialization
- Integration: FlightServiceClient end-to-end, multi-DP batch, calculated DP, all query modes
Previous Work
- Branch andras/BE-1595, commit 31f50b3e (abandoned)
- Design docs: docs/plans/2026-03-05-arrow-streaming-migration-design.md
- Key lesson: ArrowStream borrows stmt → needs spawn_blocking + channel
Validation Findings (2026-04-15)
Architecture validation revealed important constraints:
Three-Tier Streaming Model
Not all DP types stream end-to-end
Normal DPs get true zero-copy streaming. Virtual and Calculated DPs must materialize then re-stream.
| DP Type | RecordBatch streaming | Per-DP streaming | Why |
|---|---|---|---|
| Normal | Full zero-copy from DuckDB | Yields after topo sort | No transforms need full data |
| Virtual | Materialize → resample → re-stream | After materialization | convert_to_metadata() needs full value_time range for resampling |
| Calculated | Materialize deps → eval → re-stream | After all deps complete | Expression eval uses joins/unions requiring all deps |
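Why Virtual DPs cannot resample batch by batch can be shown with a hypothetical fixed-interval mean resampler. It assumes buckets anchored at the earliest value_time; the real convert_to_metadata() logic may differ, and the sketch only illustrates how the output depends on the full value_time range.

```rust
use std::collections::BTreeMap;

/// One (value_time, value) sample; value_time in seconds (hypothetical unit).
pub type Sample = (i64, f64);

/// Hypothetical fixed-interval mean resampler. Buckets are anchored at the
/// earliest value_time, so the grid depends on the whole input, and a
/// bucket can span rows from several input batches.
/// Returns (bucket_start, mean) pairs in time order.
pub fn resample_mean(samples: &[Sample], interval: i64) -> Vec<(i64, f64)> {
    assert!(interval > 0, "interval must be positive");
    let Some(anchor) = samples.iter().map(|&(t, _)| t).min() else {
        return Vec::new();
    };
    // BTreeMap keeps buckets sorted by start time; value is (sum, count).
    let mut buckets: BTreeMap<i64, (f64, usize)> = BTreeMap::new();
    for &(t, v) in samples {
        // t >= anchor, so integer division floors to the bucket start.
        let start = anchor + (t - anchor) / interval * interval;
        let entry = buckets.entry(start).or_insert((0.0, 0));
        entry.0 += v;
        entry.1 += 1;
    }
    buckets
        .into_iter()
        .map(|(start, (sum, n))| (start, sum / n as f64))
        .collect()
}
```

Resampling two batches separately (first covering t = 0 and 5, second t = 7 and 12) anchors the second batch at t = 7 and splits the first bucket across both batches, so the piecewise result differs from resampling the collected data.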
Confirmed Clean
- Arrow types: DuckDB and arrow-flight use the same arrow::RecordBatch from apache-arrow-rs 56.2.0, so no conversion is needed
- Repository: SQL builders are shared, convert_output_fields is per-batch compatible, and the same generics work
- Service: MandoService is Clone and Arc-safe, so it can be shared between Axum and Flight
Risks
Tonic Version Conflict
opentelemetry-otlp 0.30.0 pulls in tonic transitively, which may conflict with tonic 0.13.1. Cargo's dependency resolution must be verified before implementation.
Design Corrections
- Topological sort must complete before any DP retrieval (Kahn’s algorithm needs the full graph)
- Virtual DPs need the full DataFrame for resolution conversion; changed from “map_batches streaming” to materialize-then-restream
- Decided against partial buffering for Virtual/Calculated DPs in v1: the added complexity is too high for marginal gains on a minority of queries
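A sketch of the Kahn’s algorithm step, assuming a hypothetical dependency map from each DP id to the ids it depends on. In-degrees are counted over every edge before the first id is emitted, which is why retrieval cannot start until the whole graph is known; leftover ids signal a cycle.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Returns a retrieval order in which every dependency precedes its
/// dependents, or Err with the ids that could not be ordered (a cycle).
pub fn topo_sort(deps: &BTreeMap<u32, Vec<u32>>) -> Result<Vec<u32>, Vec<u32>> {
    // in_degree[x] = number of unresolved dependencies of x.
    let mut in_degree: BTreeMap<u32, usize> = BTreeMap::new();
    // dependents[d] = DPs waiting on d.
    let mut dependents: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
    for (&dp, ds) in deps {
        *in_degree.entry(dp).or_insert(0) += ds.len();
        for &d in ds {
            in_degree.entry(d).or_insert(0);
            dependents.entry(d).or_default().push(dp);
        }
    }
    // Seed with DPs that have no dependencies.
    let mut ready: VecDeque<u32> = in_degree
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(&id, _)| id)
        .collect();
    let mut order = Vec::with_capacity(in_degree.len());
    while let Some(id) = ready.pop_front() {
        order.push(id);
        for &next in dependents.get(&id).map(Vec::as_slice).unwrap_or(&[]) {
            let n = in_degree.get_mut(&next).expect("dependent was registered");
            *n -= 1;
            if *n == 0 {
                ready.push_back(next);
            }
        }
    }
    if order.len() == in_degree.len() {
        Ok(order)
    } else {
        // Anything still waiting on a dependency is part of (or behind) a cycle.
        let stuck: Vec<u32> = in_degree
            .into_iter()
            .filter(|&(_, n)| n > 0)
            .map(|(id, _)| id)
            .collect();
        Err(stuck)
    }
}
```

For example, with DPs 1 and 2 as Normal inputs, 3 calculated from both, and 4 calculated from 3, the order is `[1, 2, 3, 4]`.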