Streaming Engine Quick Start
This walkthrough demonstrates how to stand up a simple streaming pipeline that ingests events from Kafka, applies a SrotaX Core aggregation, and delivers the results to an HTTP endpoint.
Prerequisites
| Requirement | Notes |
|---|---|
| SrotaX modules | fluxion-core, fluxion-connect (includes enrichment operators). |
| Kafka cluster | Bootstrap servers + topic for ingest. |
| Runtime | Java 21+ (examples use records, builders, switch expressions). |
| State store | Implementation of StateStore for offsets (examples use the in-memory store). |
1. Define the aggregation stages
Create the same JSON/DSL snippet you would run inside the Rule Engine. The
Streaming Engine executes identical stages, so behavioural parity is guaranteed.
Persisting or forwarding the results happens through the sink, not a $merge
stage.
{
  "pipeline": [
    {"$match": {"status": "PAID"}},
    {"$group": {
      "_id": "$customerId",
      "orderCount": {"$count": {}},
      "lifetimeValue": {"$sum": "$orderValue"}
    }}
  ]
}
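To make the intent of the two stages concrete, here is a minimal plain-Java sketch of the same filter-then-group logic applied to a finite list of events. It does not use any SrotaX API: the `Order` and `CustomerTotals` records and the `aggregate` method are hypothetical names introduced only for illustration, and the engine's actual evaluation strategy may differ.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the $match + $group semantics; not SrotaX code.
class OrdersLtvSketch {

    // Hypothetical event shape; field names mirror the JSON pipeline above.
    record Order(String customerId, String status, double orderValue) {}

    // One result per customer, mirroring the two $group accumulators.
    record CustomerTotals(long orderCount, double lifetimeValue) {}

    static Map<String, CustomerTotals> aggregate(List<Order> orders) {
        Map<String, CustomerTotals> totals = new TreeMap<>();
        for (Order order : orders) {
            // {"$match": {"status": "PAID"}}: drop everything that is not PAID.
            if (!"PAID".equals(order.status())) {
                continue;
            }
            // {"$group": {"_id": "$customerId", ...}}: fold each order into its customer's totals.
            totals.merge(
                    order.customerId(),
                    new CustomerTotals(1, order.orderValue()),
                    (a, b) -> new CustomerTotals(
                            a.orderCount() + b.orderCount(),
                            a.lifetimeValue() + b.lifetimeValue()));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order("c1", "PAID", 20.0),
                new Order("c1", "PAID", 5.5),
                new Order("c1", "CANCELLED", 99.0),
                new Order("c2", "PAID", 10.0));
        // Prints one entry per customer with its order count and summed value.
        System.out.println(aggregate(orders));
    }
}
```

With the sample data, customer `c1` ends up with two counted orders because the cancelled one is filtered out before grouping.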
2. Wire up source and sink connectors
Using built-in providers (Kafka → HTTP)
SourceConnectorConfig sourceConfig = SourceConnectorConfig.builder("kafka")
    .option("topic", "orders.v1")
    .option("bootstrapServers", System.getenv("KAFKA_BOOTSTRAP"))
    .option("groupId", "orders-ltv")
    .build();

ConnectorConfig sinkConfig = ConnectorConfig.builder("http", ConnectorConfig.Kind.SINK)
    .option("endpoint", System.getenv("ORDERS_HTTP_ENDPOINT"))
    .option("allowEmpty", false)
    .build();
The endpoint option can be replaced with connectionRef if you preload HTTP
connections via ConnectorRegistryInitializer.
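The connector options above read `KAFKA_BOOTSTRAP` and `ORDERS_HTTP_ENDPOINT` with `System.getenv`, which returns `null` when a variable is not set. How the config builders treat a `null` option is not covered here, so one cautious approach is to fail fast before building anything. The helper below is a hypothetical sketch (`EnvSketch` and `requireEnv` are not part of SrotaX); it takes the environment as a map so it can be exercised without touching the real process environment.

```java
import java.util.Map;

// Hypothetical helper, not part of SrotaX: validates required settings up front.
class EnvSketch {

    // Returns the value for name, or throws with a descriptive message
    // when the variable is missing or blank.
    static String requireEnv(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException(
                    "Missing required environment variable: " + name);
        }
        return value;
    }
}
```

In the snippet above, `System.getenv("KAFKA_BOOTSTRAP")` could then be written as `EnvSketch.requireEnv(System.getenv(), "KAFKA_BOOTSTRAP")`, so a missing variable surfaces at startup rather than as a connection error later.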
Using manifest/SDK-driven connectors (custom source/sink)
Implement SourceConnectorProvider/SinkConnectorProvider, register via ServiceLoader, and reference by type in YAML/JSON:
source:
  type: myCustomSource
  options:
    apiKey: ${API_KEY}
    endpoint: https://api.example.com/events
sink:
  type: myCustomSink
  options:
    queue: payments
In code, build the equivalent configs and let the registry resolve them to connectors:
SourceConnectorConfig sourceConfig =
    SourceConnectorConfig.builder("myCustomSource")
        .option("apiKey", System.getenv("API_KEY"))
        .option("endpoint", "https://api.example.com/events")
        .build();

ConnectorConfig sinkConfig =
    ConnectorConfig.builder("myCustomSink", ConnectorConfig.Kind.SINK)
        .option("queue", "payments")
        .build();
Connector providers in fluxion-connect resolve these configs into runtime
StreamingSource and StreamingSink instances. Swapping connectors only requires changing the config payload.
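For readers new to `java.util.ServiceLoader`, the sketch below shows the general discovery pattern that type-based lookup relies on. `DemoSourceProvider` and `find` are hypothetical stand-ins, not the real SourceConnectorProvider contract, whose methods are not shown in this guide.

```java
import java.util.Optional;
import java.util.ServiceLoader;

// Illustrates ServiceLoader-based lookup by type string; not the SrotaX registry.
class ProviderLookupSketch {

    // Hypothetical SPI: each provider advertises the type it handles.
    public interface DemoSourceProvider {
        String type();
    }

    // Scans every provider ServiceLoader can discover and returns the first
    // one whose type() matches the requested connector type.
    static Optional<DemoSourceProvider> find(String type) {
        return ServiceLoader.load(DemoSourceProvider.class).stream()
                .map(ServiceLoader.Provider::get)
                .filter(provider -> provider.type().equals(type))
                .findFirst();
    }

    public static void main(String[] args) {
        // Empty unless an implementation has been registered on the classpath.
        System.out.println(find("myCustomSource").isPresent());
    }
}
```

On the classpath, ServiceLoader discovers implementations listed in a file under `META-INF/services/` named after the fully qualified interface name, with one implementation class name per line. If a custom connector type cannot be resolved, a missing or misnamed registration file is a common cause worth checking first.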
3. Configure the orchestrator
StageMetrics metrics = new StageMetrics();

StreamingRuntimeConfig runtimeConfig =
    StreamingRuntimeConfig.builder()
        .microBatchSize(500)
        .queueCapacity(2_048)
        .sourceQueueCapacity(32)
        .workerThreadPoolSize(8)
        .build();
StageMetrics captures per-stage timings and counters. The runtime config tunes
batching, threading, and queue capacities.
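As a rough mental model for the batching and queue settings, the sketch below uses only JDK classes to show a bounded queue feeding fixed-size micro-batches. It is an assumption about the general pattern, not a description of how the Streaming Engine implements `microBatchSize` or `queueCapacity` internally; `MicroBatchSketch` and `drainInBatches` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Generic bounded-queue micro-batching pattern; not SrotaX internals.
class MicroBatchSketch {

    // Empties the queue into batches holding at most microBatchSize events each.
    static <T> List<List<T>> drainInBatches(BlockingQueue<T> queue, int microBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        while (true) {
            List<T> batch = new ArrayList<>(microBatchSize);
            // drainTo moves up to microBatchSize elements without blocking.
            if (queue.drainTo(batch, microBatchSize) == 0) {
                return batches;
            }
            batches.add(batch);
        }
    }

    public static void main(String[] args) {
        // Capacity bounds memory use; offer() returns false once the queue is full.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        for (int i = 1; i <= 5; i++) {
            queue.offer(i);
        }
        System.out.println(drainInBatches(queue, 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```

In this model, a larger batch size means fewer, bigger units of work per stage invocation, while queue capacity bounds how many events can wait in memory before producers are rejected or have to wait.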
4. Run the pipeline
StreamingPipelineDefinition definition =
    StreamingPipelineDefinition.builder(sourceConfig)
        .stages(JsonStageLoader.load("rules/orders-ltv.json"))
        .sinkConfig(sinkConfig)
        .pipelineId("orders-ltv")
        .runtimeConfig(runtimeConfig)
        .metrics(metrics)
        .stateStore(new InMemoryStateStore())
        .build();

StreamingPipelineHandle handle = new StreamingPipelineOrchestrator().run(definition);
The orchestrator handles the execution loop until the source signals completion
or a fatal error policy triggers a shutdown. handle.metrics() returns the same
StageMetrics instance registered in MetricsRegistry.
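The definition above passes an InMemoryStateStore for offsets. The actual StateStore interface is not shown in this guide, so the sketch below uses a hypothetical `OffsetStore` contract purely to illustrate the idea of resuming after the last committed offset. Note that any in-memory store loses its contents when the process exits, so resuming across process restarts needs a durable implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrates offset-based resume; the interface here is hypothetical.
class OffsetResumeSketch {

    // Assumed minimal contract; the real StateStore may look different.
    interface OffsetStore {
        long lastCommitted(String pipelineId); // -1 when nothing is committed yet
        void commit(String pipelineId, long offset);
    }

    static final class InMemoryOffsetStore implements OffsetStore {
        private final Map<String, Long> offsets = new ConcurrentHashMap<>();

        @Override
        public long lastCommitted(String pipelineId) {
            return offsets.getOrDefault(pipelineId, -1L);
        }

        @Override
        public void commit(String pipelineId, long offset) {
            offsets.put(pipelineId, offset);
        }
    }

    // Handles only records beyond the last committed offset, committing each
    // one after it is handled. Returns how many records were processed.
    static int runOnce(OffsetStore store, String pipelineId, long[] recordOffsets) {
        long resumeAfter = store.lastCommitted(pipelineId);
        int processed = 0;
        for (long offset : recordOffsets) {
            if (offset <= resumeAfter) {
                continue; // already handled in an earlier run
            }
            processed++;
            store.commit(pipelineId, offset);
        }
        return processed;
    }
}
```

Running `runOnce` twice against the same store with overlapping offsets processes each record once: the second run skips everything at or below the committed offset.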
5. Observe and iterate
- Use the built-in stage metrics (StageMetrics + MetricsRegistry) to track lag, throughput, and error rates.
- State stores allow controlled restarts by persisting offsets/cursors.
- Adjust error policies to determine whether a failure retries, skips, or routes events to a dead-letter queue.
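The three failure outcomes named in the last bullet (retry, skip, dead-letter) can be sketched in plain Java as follows. The `Policy` enum, its constant names, and the `process` method are hypothetical; the Streaming Engine's own error-policy types and configuration are documented elsewhere and may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Generic retry / skip / dead-letter handling; not the SrotaX error-policy API.
class ErrorPolicySketch {

    // Hypothetical policy names chosen for this illustration.
    enum Policy { RETRY, SKIP, DEAD_LETTER }

    // Runs handler on every event and returns the events routed to the
    // dead-letter list. Under RETRY, a failure is rethrown once maxAttempts
    // is exhausted.
    static <T> List<T> process(List<T> events, Consumer<T> handler,
                               Policy policy, int maxAttempts) {
        List<T> deadLetters = new ArrayList<>();
        for (T event : events) {
            int attempts = policy == Policy.RETRY ? Math.max(1, maxAttempts) : 1;
            RuntimeException last = null;
            for (int i = 0; i < attempts; i++) {
                try {
                    handler.accept(event);
                    last = null;
                    break; // success, stop retrying
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            if (last == null) {
                continue;
            }
            switch (policy) {
                case RETRY -> throw last;                   // retries exhausted
                case SKIP -> { }                            // drop and move on
                case DEAD_LETTER -> deadLetters.add(event); // park for inspection
            }
        }
        return deadLetters;
    }
}
```

The trade-off this makes visible: retry favors completeness at the cost of latency and a possible shutdown, skip favors throughput at the cost of silently lost events, and dead-lettering keeps the pipeline moving while preserving failed events for later inspection.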
Next steps
- Explore additional sink/source combos in the SrotaX Connect section.
- Add enrichment by dropping $httpCall or $sqlQuery operators (packaged in fluxion-connect) into your stages.
- Harden the pipeline with the operational guides on resilience, metrics, and deployment in production environments.
- Dive into advanced topics for error policies, observability, and deployment in the Streaming Engine overview.
- Review which aggregation stages fit streaming versus batch workloads in the Stage Support Matrix.
- Run the streaming module tests with mvn -pl fluxion-connect -am test to validate connectors and executors before deploying.
- Try the runnable demo in fluxion-samples (streaming-kafka for Kafka source + HTTP sink).