Arrow Flight Streaming Design

End-to-end Arrow RecordBatch streaming from DuckDB to the client over gRPC Flight, with 1:1 API compatibility with the current REST endpoints plus MandoDataPointConfig metadata.

Base Branch

feature/BE-2023 — uses centralized error logging and error code enforcement patterns.

Design Decisions

Decision | Choice | Rationale
--- | --- | ---
Streaming core | DataPointStream wrapping async Stream<Item=Result<RecordBatch, StreamError>> | Async-native, composable
DuckDB bridge | spawn_blocking + bounded mpsc(8), using stream_arrow() | ArrowStream borrows stmt, proven in railscale
Calculated DPs | Selective materialization: pure DPs pass through | Railscale ParsedData::Passthrough vs Parsed pattern
Metadata | MandoDataPointConfig + update_info as JSON in FlightData.app_metadata | Single do_get() call
Parallelism | Server-side fan-out, multiplexed stream with dp_id column | Client sends one request
Auth | None initially | Simplify v1
Port | 50051 (configurable FLIGHT_PORT), tokio::join! with Axum on 8080 | Standard gRPC convention
Feature gate | #[cfg(feature = "flight")] | Default build unchanged
Error pattern | thiserror + mando_core::error!() | Matches BE-2023 patterns
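
The Port row can be illustrated with a small sketch of how a configurable FLIGHT_PORT with a 50051 default might be resolved. The helper names parse_flight_port and flight_port_from_env are hypothetical, and silently falling back to the default on an invalid value is an assumption, not a decision recorded in this design.

```rust
use std::env;

/// Default gRPC Flight port from the design decisions table.
pub const DEFAULT_FLIGHT_PORT: u16 = 50051;

/// Hypothetical helper: parse an optional port string, falling back to the
/// default when the value is missing or is not a valid u16.
pub fn parse_flight_port(raw: Option<&str>) -> u16 {
    raw.and_then(|s| s.trim().parse::<u16>().ok())
        .unwrap_or(DEFAULT_FLIGHT_PORT)
}

/// Hypothetical helper: read FLIGHT_PORT from the process environment.
pub fn flight_port_from_env() -> u16 {
    parse_flight_port(env::var("FLIGHT_PORT").ok().as_deref())
}
```

A stricter variant could return an error on an unparsable value so that a typo in the environment fails at startup instead of binding the default port.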

Architecture

Current Flow (Before)

DuckDB stmt.query_arrow()
    -> collect into Vec<DataFrame> -> single DataFrame
    -> convert_output_fields()
    -> handle_data_point_types()
    -> REST formatters: HashMap<DataPointId, DataFrame>

Problem

Every query materializes the entire result before responding. Peak memory is roughly 2x the full result (Arrow batches plus the DataFrame copy). MandoDataPointConfig is computed but discarded before the response is built.
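
To make the memory claim concrete, here is a rough back-of-the-envelope sketch. Both functions are hypothetical helpers, and the streamed bound assumes a bounded channel of 8 batches (the capacity from the design decisions) plus one batch in flight on each side, which is an illustrative simplification and not a measurement.

```rust
/// Rough peak for the fully materialized flow: the Arrow batches and the
/// DataFrame copy are alive at the same time, hence the factor of two.
pub fn peak_bytes_materialized(batch_count: u64, bytes_per_batch: u64) -> u64 {
    2 * batch_count * bytes_per_batch
}

/// Rough upper bound for a streamed result, assuming `channel_capacity`
/// queued batches plus one batch held by the producer and one by the
/// consumer. This is an assumption for illustration only.
pub fn peak_bytes_streamed(channel_capacity: u64, bytes_per_batch: u64) -> u64 {
    (channel_capacity + 2) * bytes_per_batch
}
```

With 1,000 batches of 1 MiB each, the first estimate is about 2 GB while the second stays near 10 MiB regardless of result size. Real numbers depend on batch sizes and on how many streams run concurrently.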

Target Flow (After)

DuckDB stmt.stream_arrow()
    -> DataPointStream (async RecordBatch stream via bounded mpsc)
    -> batch transforms: strip columns, convert timezone
    -> StreamingMandoService.retrieve_stream()
        Normal DPs:      zero-copy streaming, yield after topo sort
        Virtual DPs:     materialize -> resample -> re-stream
        Calculated DPs:  materialize deps -> eval -> re-stream
    -> Flight do_get(): yield as FlightData
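
The first two steps of the target flow (a borrowing stream bridged through a bounded channel) can be sketched with the standard library alone. This is an analogue, not the planned implementation: std::thread::spawn stands in for spawn_blocking, std::sync::mpsc::sync_channel(8) stands in for the bounded tokio mpsc, and FakeStatement, Batch, and spawn_bridge are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Stand-in for an Arrow RecordBatch.
pub type Batch = Vec<i64>;

/// Hypothetical stand-in for a DuckDB statement. `stream` borrows `self`,
/// mirroring how ArrowStream borrows stmt.
pub struct FakeStatement {
    rows: Vec<i64>,
}

impl FakeStatement {
    pub fn new(rows: Vec<i64>) -> Self {
        FakeStatement { rows }
    }

    pub fn stream<'a>(&'a self, batch_size: usize) -> Box<dyn Iterator<Item = Batch> + 'a> {
        Box::new(self.rows.chunks(batch_size.max(1)).map(|c| c.to_vec()))
    }
}

/// Run the borrowing stream on its own thread and hand batches out through
/// a bounded channel, so the borrow never leaves the producing thread.
pub fn spawn_bridge(rows: Vec<i64>, batch_size: usize) -> Receiver<Result<Batch, String>> {
    let (tx, rx) = sync_channel(8);
    thread::spawn(move || {
        // The statement and the iterator that borrows it both live here.
        let stmt = FakeStatement::new(rows);
        for batch in stmt.stream(batch_size) {
            // `send` blocks once 8 batches are queued (backpressure).
            // An Err means the receiver was dropped, so stop producing.
            if tx.send(Ok(batch)).is_err() {
                break;
            }
        }
    });
    rx
}
```

The bounded capacity is what keeps a slow client from forcing the whole result into memory: the producer simply blocks until the consumer catches up.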

Components

DataPointStream (mando-lib/src/stream.rs)

Core streaming abstraction. See Railscale Pattern.

Constructors: from_channel, from_duckdb, from_batches, empty
Combinators: map_batches, chain, collect_dataframe
Deps: tokio-stream, tokio-util
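
As a rough illustration of how these constructors and combinators compose, the sketch below models the stream with a synchronous boxed iterator. It is only an analogue: the planned type wraps an async Stream, Batch stands in for RecordBatch, the error type is a plain String, and collect_rows is a hypothetical stand-in for collect_dataframe. from_channel and from_duckdb are omitted.

```rust
/// Stand-in for an Arrow RecordBatch.
pub type Batch = Vec<f64>;

/// Synchronous, iterator-backed analogue of the async DataPointStream.
pub struct DataPointStream {
    inner: Box<dyn Iterator<Item = Result<Batch, String>>>,
}

impl DataPointStream {
    pub fn from_batches(batches: Vec<Batch>) -> Self {
        DataPointStream {
            inner: Box::new(batches.into_iter().map(Ok::<Batch, String>)),
        }
    }

    pub fn empty() -> Self {
        DataPointStream {
            inner: Box::new(std::iter::empty::<Result<Batch, String>>()),
        }
    }

    /// Apply `f` to every batch lazily; errors pass through untouched.
    pub fn map_batches<F>(self, f: F) -> Self
    where
        F: Fn(Batch) -> Batch + 'static,
    {
        DataPointStream {
            inner: Box::new(self.inner.map(move |r| r.map(|b| f(b)))),
        }
    }

    /// Yield all batches of `self`, then all batches of `other`.
    pub fn chain(self, other: Self) -> Self {
        DataPointStream {
            inner: Box::new(self.inner.chain(other.inner)),
        }
    }

    /// Drain the stream into one flat vector; the first error aborts.
    pub fn collect_rows(self) -> Result<Vec<f64>, String> {
        let mut out = Vec::new();
        for batch in self.inner {
            out.extend(batch?);
        }
        Ok(out)
    }
}
```

Because map_batches and chain are lazy, nothing is materialized until a consumer pulls batches or calls the collecting method.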

StreamingRepository (mando-lib/src/repo.rs)

Parallel to existing Repository trait:

  • retrieve_stream / retrieve_original_stream / retrieve_override_stream
  • Returns DataPointStream instead of DataFrame
  • DuckDB impl reuses existing SQL-building, swaps query_arrow for stream_arrow

StreamingMandoService (mando-lib/src/service.rs)

  • retrieve_catalog() + retrieve_stream()
  • retrieve_stream() returns Stream<Item = ResolvedDataPointStream> following the three-tier streaming model: Normal DPs yield zero-copy after topo sort, Virtual DPs materialize then re-stream after resampling, Calculated DPs materialize all deps then eval and re-stream
  • ResolvedDataPointStream bundles DataPointId, MandoDataPointConfig, DataPointStream, Vec<DataPointUpdateInfo>
  • Existing MandoService stays untouched

MandoFlightService (mando-bess/src/flight.rs)

Implements arrow_flight::FlightService:

  • do_get: deserialize ticket, flat-map outer stream, inject dp_id, encode. Data flows while later DPs are still resolving
  • list_flights: catalog as FlightInfo entries
  • get_flight_info: schema + metadata per DP
  • All others: Unimplemented
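
The flat-map and dp_id injection steps of do_get can be sketched with plain iterators. This is a simplified analogue under stated assumptions: TaggedBatch and multiplex are hypothetical names, a u32 stands in for DataPointId, and ticket decoding and FlightData encoding are left out entirely.

```rust
/// Stand-in for an Arrow RecordBatch holding one data point's values.
pub type Batch = Vec<f64>;

/// Stand-in for a batch after the dp_id column has been injected.
#[derive(Debug, PartialEq)]
pub struct TaggedBatch {
    pub dp_id: u32,
    pub values: Batch,
}

/// Flatten an outer sequence of per-DP batch sequences into one multiplexed
/// sequence, tagging every batch with the id of its data point.
pub fn multiplex<I, B>(resolved: I) -> impl Iterator<Item = TaggedBatch>
where
    I: IntoIterator<Item = (u32, B)>,
    B: IntoIterator<Item = Batch>,
{
    resolved.into_iter().flat_map(|(dp_id, batches)| {
        batches
            .into_iter()
            .map(move |values| TaggedBatch { dp_id, values })
    })
}
```

flat_map only asks the outer sequence for the next data point once the current one is exhausted, which is the property that lets early batches reach the client before later DPs have resolved.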

StreamError

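use thiserror::Error;

// thiserror needs an #[error] message per variant to derive Display.
#[derive(Error, Debug)]
pub enum StreamError {
    #[error("repository error: {0}")]
    Repo(#[from] RepoError),
    #[error("arrow error: {0}")]
    Arrow(#[from] ArrowError),
    #[error("polars error: {0}")]
    Polars(#[from] PolarsError),
    #[error("channel error: {0}")]
    Channel(String),
}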

Railscale Pattern

Borrowed Pattern

Taken from the railscale HTTP proxy's ParsedData::Passthrough vs ParsedData::Parsed enum. Pure data flows zero-copy; only frames that need transformation get materialized.

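Applied here: Normal DPs stream through combinators without materialization. Virtual DPs collect_dataframe() their own stream, resample, then re-stream the result with from_batches(). Calculated DPs collect_dataframe() their dependency streams, evaluate expressions, then re-stream the result with from_batches().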
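
A minimal sketch of the passthrough-versus-materialized split, loosely modelled on the pattern described above. Payload, materialize, into_batches, and eval_sum are hypothetical names, Batch stands in for RecordBatch, and the positional sum is only a placeholder for real expression evaluation, which this design says uses joins and unions.

```rust
/// Stand-in for an Arrow RecordBatch.
pub type Batch = Vec<f64>;

pub enum Payload {
    /// Batches forwarded untouched, one at a time.
    Passthrough(Box<dyn Iterator<Item = Batch>>),
    /// Fully collected rows for a transform that must see all the data.
    Materialized(Vec<f64>),
}

impl Payload {
    /// Collect the rows; a no-op if the payload is already materialized.
    pub fn materialize(self) -> Vec<f64> {
        match self {
            Payload::Passthrough(it) => it.flatten().collect(),
            Payload::Materialized(rows) => rows,
        }
    }

    /// Turn the payload back into batches of at most `size` rows.
    pub fn into_batches(self, size: usize) -> Box<dyn Iterator<Item = Batch>> {
        match self {
            Payload::Passthrough(it) => it,
            Payload::Materialized(rows) => {
                let batches: Vec<Batch> =
                    rows.chunks(size.max(1)).map(|c| c.to_vec()).collect();
                Box::new(batches.into_iter())
            }
        }
    }
}

/// Calculated-DP style step: both dependencies are needed in full first.
pub fn eval_sum(a: Payload, b: Payload) -> Payload {
    let (a, b) = (a.materialize(), b.materialize());
    Payload::Materialized(a.iter().zip(b.iter()).map(|(x, y)| x + y).collect())
}
```

The passthrough arm of into_batches returns the original iterator unchanged, so data that needs no transform never pays for a collect.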

Files

New: mando-lib/src/stream.rs, mando-bess/src/flight.rs
Modified: workspace Cargo.toml, mando-lib, mando-bess, repo.rs, service.rs
Unchanged: All REST endpoints, MandoService, Repository, data models, DuckDB schema, formatters

Testing

  • Unit: DataPointStream ops, StreamingRepository vs in-memory DuckDB, ticket/metadata serialization
  • Integration: FlightServiceClient end-to-end, multi-DP batch, calculated DP, all query modes

Previous Work

  • Branch andras/BE-1595, commit 31f50b3e (abandoned)
  • Design docs: docs/plans/2026-03-05-arrow-streaming-migration-design.md
  • Key lesson: ArrowStream borrows stmt, so streaming needs spawn_blocking plus a channel

Validation Findings (2026-04-15)

Architecture validation revealed important constraints:

Three-Tier Streaming Model

Not all DP types stream end-to-end

Normal DPs get true zero-copy streaming. Virtual and Calculated DPs must materialize then re-stream.

DP Type | RecordBatch streaming | Per-DP streaming | Why
--- | --- | --- | ---
Normal | Full zero-copy from DuckDB | Yields after topo sort | No transforms need full data
Virtual | Materialize -> resample -> re-stream | After materialization | convert_to_metadata() needs full value_time range for resampling
Calculated | Materialize deps -> eval -> re-stream | After all deps complete | Expression eval uses joins/unions requiring all deps

Confirmed Clean

  • Arrow types: DuckDB and arrow-flight use same arrow::RecordBatch from apache-arrow-rs 56.2.0 — zero conversion
  • Repository: SQL builders shared, convert_output_fields per-batch compatible, same generics work
  • Service: MandoService is Clone + Arc-safe for sharing between Axum and Flight

Risks

Tonic Version Conflict

opentelemetry-otlp 0.30.0 pulls tonic transitively. May conflict with tonic 0.13.1. Must verify cargo resolution before implementation.

Design Corrections

  • Topological sort must complete before any DP retrieval (Kahn’s algorithm needs full graph)
  • Virtual DPs need full DataFrame for resolution conversion — changed from “map_batches streaming” to materialize-then-restream
  • Decided against partial buffering for Virtual/Calculated DPs in v1 — complexity too high for marginal gains on minority of queries
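
The first correction is easier to see with Kahn's algorithm written out: in-degrees can only be computed once every edge is known, so no data point can be safely emitted before the whole dependency graph has been read. The sketch below is a generic version of the algorithm. The u32 ids and the topo_sort signature are assumptions for illustration, not the service's actual types.

```rust
use std::collections::{BTreeMap, VecDeque};

/// `deps` maps each data point id to the ids it depends on.
/// Returns ids with dependencies first, or None if the graph has a cycle.
pub fn topo_sort(deps: &BTreeMap<u32, Vec<u32>>) -> Option<Vec<u32>> {
    // Pass 1: read the whole graph to count unresolved dependencies.
    let mut indegree: BTreeMap<u32, usize> = BTreeMap::new();
    let mut dependents: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
    for (&node, node_deps) in deps {
        indegree.entry(node).or_insert(0);
        for &dep in node_deps {
            indegree.entry(dep).or_insert(0);
            *indegree.entry(node).or_insert(0) += 1;
            dependents.entry(dep).or_default().push(node);
        }
    }

    // Pass 2: repeatedly emit nodes whose dependencies are all resolved.
    let mut ready: VecDeque<u32> = indegree
        .iter()
        .filter(|&(_, &d)| d == 0)
        .map(|(&n, _)| n)
        .collect();
    let mut order = Vec::with_capacity(indegree.len());
    while let Some(node) = ready.pop_front() {
        order.push(node);
        if let Some(nexts) = dependents.get(&node) {
            for &next in nexts {
                if let Some(d) = indegree.get_mut(&next) {
                    *d -= 1;
                    if *d == 0 {
                        ready.push_back(next);
                    }
                }
            }
        }
    }

    // Nodes left with a non-zero in-degree are part of a cycle.
    if order.len() == indegree.len() {
        Some(order)
    } else {
        None
    }
}
```

For a graph where DP 3 depends on 1 and 2, and DP 2 depends on 1, the order comes out as 1, 2, 3. A cyclic graph yields None, which a caller would presumably surface as an error before any retrieval starts.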

See Also