Streaming

Streams provide continuous data flow from Rust to target languages. Unlike async functions that return a single value, streams deliver multiple values over time. BoltFFI generates native stream types that integrate with each platform’s concurrency model, and handles the underlying buffering and synchronization.

The ffi_stream Attribute

Mark a method with #[ffi_stream] to expose it as a stream. The attribute requires an item parameter specifying the type of data that flows through the stream, and accepts an optional mode parameter to control how consumers receive events.

#[ffi_stream(item = Reading)]                    // item type is Reading, async mode (default)
#[ffi_stream(item = Reading, mode = "async")]    // explicit async mode
#[ffi_stream(item = Reading, mode = "callback")] // callback mode
#[ffi_stream(item = Reading, mode = "batch")]    // batch mode

The item type must be able to cross the FFI boundary: primitives, records marked with #[data], or other supported types. The method must return Arc&lt;EventSubscription&lt;T&gt;&gt;, where T matches the item type.

Stream Modes

BoltFFI supports three consumption modes. Each mode generates different bindings suited to different use cases.

Async Mode

The default mode. Generates AsyncStream in Swift, Flow in Kotlin, and a callback-driven StreamSubscription<T> in Java. Cancellation propagates automatically.

Rust:

#[ffi_stream(item = Reading)]
pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
  Arc::clone(&self.subscription)
}

Generated Swift:

public func readings() -> AsyncStream<Reading>

Swift usage:

for await reading in sensor.readings() {
  print("Value: \(reading.value)")
}

Callback Mode

Generates a method that takes a callback and returns a cancellable handle.

Rust:

#[ffi_stream(item = Reading, mode = "callback")]
pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
  Arc::clone(&self.subscription)
}

Generated Swift:

public func readings(
  callback: @escaping (Reading) -> Void
) -> StreamSubscription<Reading>

Swift usage:

let sub = sensor.readings { reading in
  print("Value: \(reading.value)")
}
// later
sub.cancel()

Batch Mode

Generates a subscription object that lets consumers pull batches of events on their own schedule.

Rust:

#[ffi_stream(item = Reading, mode = "batch")]
pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
  Arc::clone(&self.subscription)
}

Generated Swift:

public func readings() -> StreamSubscription<Reading>

Swift usage:

let sub = sensor.readings()
let batch = sub.popBatch(maxCount: 100)
for reading in batch {
  process(reading)
}
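To make the pull model concrete, here is a minimal Rust sketch of a batch buffer with the same shape as the generated popBatch. The BatchBuffer type and pop_batch method are illustrative names, not BoltFFI API, and a plain Mutex stands in for the real lock-free buffer:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Illustrative stand-in for a batch-mode subscription buffer.
struct BatchBuffer<T> {
    queue: Mutex<VecDeque<T>>,
}

impl<T> BatchBuffer<T> {
    fn new() -> Self {
        BatchBuffer { queue: Mutex::new(VecDeque::new()) }
    }

    /// Producer side: append one event.
    fn push(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
    }

    /// Consumer side: drain up to `max_count` events in one pull,
    /// mirroring the generated `popBatch(maxCount:)`.
    fn pop_batch(&self, max_count: usize) -> Vec<T> {
        let mut q = self.queue.lock().unwrap();
        let n = max_count.min(q.len());
        q.drain(..n).collect()
    }
}

fn main() {
    let buf = BatchBuffer::new();
    for v in 0..5 {
        buf.push(v);
    }
    println!("{:?}", buf.pop_batch(3));   // [0, 1, 2]
    println!("{:?}", buf.pop_batch(100)); // [3, 4]
}
```

Pulling in bounded batches keeps the consumer in control of pacing, at the cost of events accumulating in the buffer between pulls.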

Creating Streams

Streams are created using EventSubscription or StreamProducer. The choice depends on whether you need a single subscriber or multiple subscribers.

EventSubscription

EventSubscription represents a single subscription with its own buffer. In the example below, every call to the exposed method hands back the same underlying subscription via Arc::clone, so it suits a single consumer; events pushed after subscribing are buffered until they are consumed.

Rust:

use boltffi::EventSubscription;
use std::sync::Arc;

pub struct Sensor {
    subscription: Arc<EventSubscription<Reading>>,
}

#[export]
impl Sensor {
    pub fn new() -> Self {
        Sensor {
            subscription: Arc::new(EventSubscription::new(256)),
        }
    }

    #[ffi_stream(item = Reading)]
    pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
        Arc::clone(&self.subscription)
    }

    pub fn emit(&self, value: f64) {
        self.subscription.push_event(Reading {
            value,
            timestamp: current_time_ms(),
        });
    }
}

Generated Swift:

public class Sensor {
    public init()
    public func readings() -> AsyncStream<Reading>
}

Swift usage:

let sensor = Sensor()
for await reading in sensor.readings() {
    print(reading.value)
}

StreamProducer

StreamProducer broadcasts events to multiple subscribers. Each subscriber gets its own buffer, and pushing an event delivers it to all active subscribers.

Rust:

use boltffi::{EventSubscription, StreamProducer};
use std::sync::Arc;

pub struct EventBus {
    producer: StreamProducer<Event>,
}

#[export]
impl EventBus {
    pub fn new() -> Self {
        EventBus {
            producer: StreamProducer::new(256),
        }
    }

    #[ffi_stream(item = Event)]
    pub fn events(&self) -> Arc<EventSubscription<Event>> {
        self.producer.subscribe()
    }

    pub fn emit(&self, event: Event) {
        self.producer.push(event);
    }
}

Swift usage:

let bus = EventBus()

Task {
    for await event in bus.events() {
        print("Sub 1: \(event)")
    }
}

Task {
    for await event in bus.events() {
        print("Sub 2: \(event)")
    }
}
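To illustrate the broadcast semantics (one queue per subscriber, each push delivered to every live subscriber), here is a self-contained sketch using std::sync::mpsc channels. The Broadcast type is a toy model, not StreamProducer's actual implementation:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Toy broadcast model: each subscriber owns a queue, and a push
/// delivers a clone of the event to every live subscriber.
struct Broadcast<T: Clone> {
    senders: Mutex<Vec<Sender<T>>>,
}

impl<T: Clone> Broadcast<T> {
    fn new() -> Self {
        Broadcast { senders: Mutex::new(Vec::new()) }
    }

    /// Each call creates an independent subscription with its own queue.
    fn subscribe(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.senders.lock().unwrap().push(tx);
        rx
    }

    /// Deliver to all subscribers, pruning any that were cancelled
    /// (a dropped Receiver makes `send` fail).
    fn push(&self, event: T) {
        self.senders
            .lock()
            .unwrap()
            .retain(|tx| tx.send(event.clone()).is_ok());
    }
}

fn main() {
    let bus = Broadcast::new();
    let a = bus.subscribe();
    let b = bus.subscribe();
    bus.push("tick");
    // Both subscribers receive the same event.
    println!("a: {:?}, b: {:?}", a.recv().unwrap(), b.recv().unwrap());
}
```

The design choice to clone per subscriber is what lets each consumer proceed at its own pace without blocking the others.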

Buffer Capacity

Each subscription has a ring buffer that holds events until the consumer processes them. The default capacity is 256 items. For high-frequency streams, increase the capacity to avoid dropping events when the consumer falls behind.

// Default capacity (256)
EventSubscription::new(256)

// Larger buffer for high-frequency data
EventSubscription::new(4096)

// StreamProducer with custom capacity
StreamProducer::new(4096)

When the buffer is full, new events are dropped. The producer continues without blocking.
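A bounded std::sync::mpsc channel models this drop-on-full, never-block behavior, with try_send standing in for the ring buffer's push. This is an illustration of the policy, not BoltFFI's code:

```rust
use std::sync::mpsc::sync_channel;

fn main() {
    // A bounded channel stands in for the subscription's ring buffer.
    let (tx, rx) = sync_channel(2);

    let mut dropped = 0;
    for v in 0..5 {
        // try_send never blocks; when the buffer is full the new
        // event is simply dropped and the producer moves on.
        if tx.try_send(v).is_err() {
            dropped += 1;
        }
    }
    println!("dropped {} events", dropped); // capacity 2, so 3 of 5 dropped

    // The consumer still sees the events that fit.
    println!("{} {}", rx.recv().unwrap(), rx.recv().unwrap()); // 0 1
}
```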

Stopping Streams

Streams can be stopped from either side. The producer can complete the stream, or the consumer can cancel their subscription.

Producer-side completion

Call unsubscribe() on the subscription to signal that no more events will be sent. Active consumers receive a completion signal.

impl Sensor {
    pub fn stop(&self) {
        self.subscription.unsubscribe();
    }
}
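Conceptually, completion is a closed flag that consumers observe only after draining any buffered events. Below is a toy model of that behavior, with hypothetical names and a Mutex/Condvar in place of BoltFFI's lock-free internals:

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

/// Toy model of a completable subscription: `unsubscribe` marks the
/// stream closed, and consumers see `None` once the buffer drains.
struct Subscription<T> {
    state: Mutex<(VecDeque<T>, bool)>, // (buffered events, closed?)
    cond: Condvar,
}

impl<T> Subscription<T> {
    fn new() -> Self {
        Subscription {
            state: Mutex::new((VecDeque::new(), false)),
            cond: Condvar::new(),
        }
    }

    fn push_event(&self, item: T) {
        let mut s = self.state.lock().unwrap();
        if !s.1 {
            s.0.push_back(item);
            self.cond.notify_one();
        }
    }

    /// Producer-side completion: no more events will be sent.
    fn unsubscribe(&self) {
        self.state.lock().unwrap().1 = true;
        self.cond.notify_all();
    }

    /// Blocks until an event arrives or the stream completes.
    fn next(&self) -> Option<T> {
        let mut s = self.state.lock().unwrap();
        loop {
            if let Some(item) = s.0.pop_front() {
                return Some(item);
            }
            if s.1 {
                return None; // completion signal
            }
            s = self.cond.wait(s).unwrap();
        }
    }
}

fn main() {
    let sub = Subscription::new();
    sub.push_event(1);
    sub.unsubscribe();
    println!("{:?}", sub.next()); // Some(1): buffered event still delivered
    println!("{:?}", sub.next()); // None: stream has completed
}
```

Note that events already buffered before completion are still delivered; only then does the consumer observe the end of the stream.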

Consumer-side cancellation

In async mode, cancelling the task or breaking out of the loop cancels the subscription. In callback mode, call cancel() on the returned handle.

Swift:
// Async mode - cancel the task
let task = Task {
  for await reading in sensor.readings() {
      if reading.value > threshold {
          break
      }
  }
}
task.cancel()

// Callback mode
let sub = sensor.readings { reading in
  process(reading)
}
sub.cancel()

How It Works

Streams use a continuation-based polling mechanism similar to async functions. Each subscription has a lock-free ring buffer for events and a scheduler that coordinates between the producer and consumer. When events are pushed, the scheduler wakes any parked continuation. The consumer polls the subscription, and when events are available, they’re delivered in batches for efficiency. The entire hot path is lock-free, using atomic operations for state management.
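As a rough illustration of that design, the sketch below implements a single-producer/single-consumer ring buffer over u64 events, coordinated only by atomic head and tail counters. BoltFFI's real internals also manage continuations and wake-ups, which are omitted here:

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Sketch of a lock-free SPSC ring buffer: atomic counters let the
/// producer and consumer coordinate without taking any locks.
struct Ring {
    slots: Vec<AtomicU64>,
    capacity: usize,
    head: AtomicUsize, // total events written (producer-owned)
    tail: AtomicUsize, // total events read (consumer-owned)
}

impl Ring {
    fn new(capacity: usize) -> Self {
        Ring {
            slots: (0..capacity).map(|_| AtomicU64::new(0)).collect(),
            capacity,
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
        }
    }

    /// Producer side: returns false (dropping the event) when full.
    fn push(&self, value: u64) -> bool {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);
        if head - tail == self.capacity {
            return false; // buffer full: drop, never block
        }
        self.slots[head % self.capacity].store(value, Ordering::Relaxed);
        self.head.store(head + 1, Ordering::Release); // publish the write
        true
    }

    /// Consumer side: drains up to `max` events in one batch.
    fn pop_batch(&self, max: usize) -> Vec<u64> {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.head.load(Ordering::Acquire);
        let n = (head - tail).min(max);
        let batch = (0..n)
            .map(|i| self.slots[(tail + i) % self.capacity].load(Ordering::Relaxed))
            .collect();
        self.tail.store(tail + n, Ordering::Release); // free the slots
        batch
    }
}

fn main() {
    let ring = Ring::new(4);
    for v in 1..=6 {
        ring.push(v); // 5 and 6 are dropped: the buffer holds 4
    }
    println!("{:?}", ring.pop_batch(100)); // [1, 2, 3, 4]
}
```

The Acquire/Release pairing on head and tail is what makes the slot writes visible to the other side without a lock, matching the lock-free hot path described above.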