Connectors at a Glance
An overview of how Fluxion connectors are modeled, packaged, and executed—aimed to mirror the clarity of the Camunda connector docs.
1) What a connector is
- Inbound runtimes (triggers): polling, webhook, streaming, timer. They emit events into pipelines.
- Outbound runtimes (actions): HTTP/JavaBean/pipeline-call actions push data out or invoke downstream work.
- Transports vs. runtimes: transports encapsulate I/O (Kafka, HTTP, custom). Runtimes orchestrate how that transport is executed.
2) Authoring models
| Mode | When to use | How it works |
|---|---|---|
| Manifest-first | Declarative/config-as-code; classpath manifests | JSON manifest (operations, execution blocks). ManifestConnectorDispatcher runs actions/triggers from the manifest. |
| SDK/SPI-first | Java services needing programmatic control | Implement SourceConnectorProvider / SinkConnectorProvider, register via ServiceLoader, then instantiate via ConnectorRegistry/ConnectorFactory. |
Both resolve to the same providers; you can ship default manifests inside the connector jar and still expose the SPI.
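The SPI-first mode leans on Java's standard `ServiceLoader` discovery: a provider-configuration file named after the interface's fully qualified name, containing the implementation's class name. A minimal, self-contained sketch of that mechanism — using a hypothetical `GreeterProvider` in place of `SourceConnectorProvider`, and a temp directory standing in for `src/main/resources`:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;

public class SpiDemo {
    // Stand-in for SourceConnectorProvider.
    public interface GreeterProvider { String greet(); }

    // Stand-in for a connector's provider implementation.
    public static class HelloProvider implements GreeterProvider {
        public String greet() { return "hello"; }
    }

    public static void main(String[] args) throws Exception {
        // Simulate src/main/resources/META-INF/services/<interface FQN>.
        Path dir = Files.createTempDirectory("spi");
        Path services = dir.resolve("META-INF/services");
        Files.createDirectories(services);
        Files.writeString(services.resolve(GreeterProvider.class.getName()),
                HelloProvider.class.getName());

        // ServiceLoader finds the config file on the classpath and
        // instantiates the listed provider — the same pattern a registry
        // like ConnectorRegistry uses to discover providers.
        try (URLClassLoader loader = new URLClassLoader(
                new URL[]{dir.toUri().toURL()}, SpiDemo.class.getClassLoader())) {
            for (GreeterProvider p : ServiceLoader.load(GreeterProvider.class, loader)) {
                System.out.println(p.greet()); // prints "hello"
            }
        }
    }
}
```

In a real connector jar, the configuration file is shipped under `META-INF/services/` (as in the package layout below) rather than written at runtime.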
3) Execution types
| execution.type | Kind | Use |
|---|---|---|
| http | action | Call external HTTP APIs with retry/circuit-breaker support. |
| javaBean | action/trigger | Invoke app-provided beans for custom logic. |
| pipelineCall | action | Call another pipeline/version. |
| polling | trigger | Scheduled pulls from systems that don't push. |
| webhook | trigger | Listen on an HTTP endpoint and emit payloads. |
| streaming | trigger | Long-lived source→sink streaming (e.g., Kafka → HTTP sink). |
| timer | trigger | Cron/interval ticks. |
4) Package layout (example)
my-connector/
src/main/java/.../MySourceProvider.java # implements SourceConnectorProvider
src/main/java/.../MySinkProvider.java # implements SinkConnectorProvider
src/main/resources/META-INF/services/
ai.fluxion.core.engine.connectors.SourceConnectorProvider
ai.fluxion.core.engine.connectors.SinkConnectorProvider
src/main/resources/manifests/
my-connector.json # optional manifest(s)
- Providers define the descriptor + option schema.
- Options are validated by `ConnectorRegistry` and exposed to UIs/CLIs.
- Manifests can be bundled for drop-in defaults.
5) Runtime flow
- Discovery: `ConnectorRegistry` loads providers via `ServiceLoader`.
- Validation: `SourceConnectorConfig`/`ConnectorConfig` validate options against provider schemas.
- Execution: streaming uses `StreamingPipelineExecutor` (source→stages→sink with backpressure); actions use `ManifestConnectorDispatcher` (HTTP, bean, pipelineCall, webhook, polling, streaming triggers).
- Operators: enrichment operators (`$httpCall`, `$sqlQuery`, etc.) resolve connection refs to these connectors; streaming pipelines wire sources/sinks by `type`.
6) Resilience, security, observability
- Resilience: HTTP/Kafka sinks accept retry/circuit-breaker instance names (`retry`/`circuitBreaker`, or via `connectionRef`). Registries come from the Spring Boot starter or manual wiring.
- Security: keep credentials out of manifests; inject them via connection instances and `ConnectorContext::resolveSecret`.
- Observability: `ConnectorLogger`, `ConnectorMeterRegistry`, and `ConnectorTracer` flow through contexts—wire them to your logging/metrics/tracing backends.
7) Streaming shape and chaining rules
- `connectionRef` is required on streaming `source` and `sink` blocks (they pull transport config + resilience from the registry).
- Process block (supported): a single top-level `process` object (`type: pipeline|pipelineCall`) runs before the sink chain. Its output flows into the sink/fanout.
- Fanout (parallel delivery) uses `fanout` on the sink block; calls are synchronous/serial today.
- Pipeline chaining uses `type: "pipeline"` sinks with `next` (a single sink, or a list for fanout after the pipeline). `next` is honored only on pipeline sinks; use `fanout` for non-pipeline sinks.
Examples:

Fanout without a pipeline:

```json
"sink": {
  "fanout": [
    { "type": "http", "connectionRef": "http-a" },
    { "type": "kafka", "connectionRef": "kafka-b" }
  ]
}
```

Pipeline, then fanout to HTTP/Kafka:

```json
"sink": {
  "type": "pipeline",
  "pipeline": "enrich-orders",
  "next": [
    { "type": "http", "connectionRef": "http-a" },
    { "type": "kafka", "connectionRef": "kafka-b" }
  ]
}
```

Top-level process (single object), then pipeline sink with next:

```json
"process": { "type": "pipeline", "pipeline": "normalize", "version": "1.0.0" },
"sink": {
  "type": "pipeline",
  "pipeline": "route",
  "next": { "fanout": [
    { "type": "http", "connectionRef": "http-a" },
    { "type": "kafka", "connectionRef": "kafka-b" }
  ] }
}
```
8) Quick starts
- Manifest action (HTTP): place a manifest under `resources/manifests`, then call `new ManifestConnectorDispatcher().executeAction(manifest, "op", ctx, body)`.
- Manifest streaming: `execution.type=streaming` with `connectionRef` on source/sink (e.g., Kafka → HTTP sink). `startTrigger(...)` returns a `Flux`.
- SDK streaming: build configs, then:

```java
StreamingSource source = ConnectorFactory.createSource(sourceCfg, SourceConnectorContext.from(new StreamingContext()));
StreamingSink sink = ConnectorFactory.createSink(sinkCfg);
new StreamingPipelineExecutor().processStream(source, List.of(), sink, new StreamingContext());
```
9) Choosing a model
- Pick manifest-first for low-code/operator consoles and tenant-level customization without rebuilds.
- Pick SDK-first when embedding in services, needing dynamic secrets, or building bespoke transports.
- Mix them when you want default manifests shipped with your connector jar but still expose the SPI for power users.
10) Execution examples (manifest + SDK)
HTTP action (manifest)

```json
{
  "operations": { "call": "call" },
  "operationDefs": {
    "call": {
      "kind": "action",
      "execution": { "type": "http", "method": "POST", "urlTemplate": "https://api.example.com/orders" }
    }
  }
}
```

SDK

```java
Object out = dispatcher.executeAction(manifest, "call", ctx, Map.of("orderId", "A-1"));
```
JavaBean action/trigger (manifest)

```json
"execution": { "type": "javaBean", "beanName": "myHandler" }
```

SDK

```java
dispatcher.registerActionHandler("myHandler", (c, in) -> Map.of("ok", true));
dispatcher.executeAction(manifest, "myOp", ctx, Map.of());
```
Pipeline call (manifest)

```json
"execution": { "type": "pipelineCall", "targetPipeline": "orders", "version": "v1" }
```
Polling trigger (manifest)

```json
"execution": { "type": "polling", "intervalMillis": 5000, "handlerBean": "poller" }
```

SDK

```java
dispatcher.registerTriggerHandler("poller", (c, cfg) -> Flux.interval(Duration.ofSeconds(5)).map(i -> Map.of("tick", i)));
```
Webhook trigger (manifest)

```json
"execution": { "type": "webhook", "path": "/events", "method": "POST" }
```

SDK

```java
Flux<Map<String, Object>> flux = dispatcher.startTrigger(manifest, "hookOp", ctx, Map.of("port", 8080));
```
Streaming trigger (manifest)

```json
"execution": {
  "type": "streaming",
  "stream": "orders",
  "source": { "type": "kafka", "connectionRef": "kafka-orders" },
  "sink": { "type": "http", "connectionRef": "http-sink" }
}
```
Streaming with a pre-sink process (pipeline sink)

```json
"execution": {
  "type": "streaming",
  "stream": "orders",
  "sink": {
    "type": "pipeline",
    "pipeline": "enrich-orders",
    "next": {
      "fanout": [
        { "type": "http", "connectionRef": "http-a" },
        { "type": "kafka", "connectionRef": "kafka-b" }
      ]
    }
  }
}
```
SDK

```java
SourceConnectorConfig src = SourceConnectorConfig.builder("kafka")
    .option("bootstrapServers", "localhost:9092").option("topic", "orders").build();
ConnectorConfig sink = ConnectorConfig.builder("http", ConnectorConfig.Kind.SINK)
    .option("endpoint", "http://localhost:8081/ingest").build();
StreamingSource source = ConnectorFactory.createSource(src, SourceConnectorContext.from(new StreamingContext()));
StreamingSink httpSink = ConnectorFactory.createSink(sink);
new StreamingPipelineExecutor().processStream(source, List.of(), httpSink, new StreamingContext());
```
Timer trigger (manifest)

```json
"execution": { "type": "timer", "cron": "PT30S" }
```
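The `PT30S` value above is an ISO-8601 duration (30 seconds), which the JDK can parse directly; a quick check of what such a string resolves to:

```java
import java.time.Duration;

public class TimerIntervalCheck {
    public static void main(String[] args) {
        // ISO-8601 duration syntax: "PT30S" = a period of 30 seconds.
        Duration interval = Duration.parse("PT30S");
        System.out.println(interval.toMillis()); // prints 30000
    }
}
```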
Pipeline invoker & resilience (Spring Boot starter)
- The starter auto-registers a `PipelineCallInvoker` that POSTs to the pipeline-service endpoint `/api/pipelines/{name}/{version}:run`. It is used by:
  - `execution.type=pipelineCall` actions
  - Streaming sinks with `type=pipeline` / `pipelineCall`
  - Top-level streaming `process` (single object: `type=pipeline|pipelineCall`)
- Configure the target service, timeout, and Resilience4j policies:

```yaml
fluxion:
  connect:
    pipeline:
      base-url: http://fluxion-pipeline-service:8085
      timeout-ms: 10000
resilience4j:
  retry:
    instances:
      pipeline-service:
        max-attempts: 3
        wait-duration: 500ms
  circuitbreaker:
    instances:
      pipeline-service:
        failure-rate-threshold: 50
        wait-duration-in-open-state: 5s
        permitted-number-of-calls-in-half-open-state: 3
        sliding-window-size: 10
```
- Inputs are sent as `document` in the POST body; the pipeline response (Map/Document/list) is forwarded to the next sink or returned to the caller.
- You can also load connection instances (YAML/JSON) into the registry at startup so `connectionRef` values resolve without code changes; use connector-specific config such as `urlTemplate`, `headers`, `bootstrapServers`, `topic`, scoped by tenant/pipeline.
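A connection-instance file for the registry might look like the sketch below. The layout is illustrative only: the option names (`urlTemplate`, `headers`, `bootstrapServers`, `topic`) and tenant scoping come from the text above, but the surrounding structure and the secret-placeholder syntax are assumptions.

```yaml
# Illustrative shape only — field layout and placeholder syntax are assumptions.
connections:
  http-a:
    type: http
    urlTemplate: https://api.example.com/orders
    headers:
      Authorization: "Bearer ${secret:orders-token}"   # hypothetical secret reference
  kafka-orders:
    type: kafka
    bootstrapServers: localhost:9092
    topic: orders
    tenant: acme   # scoped by tenant/pipeline
```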
11) Built-in connectors & supported executions
| Connector type | Module | Supports | Execution usage |
|---|---|---|---|
| HTTP sink (type=http) | fluxion-connect | Action (http), streaming sink | execution.type=http for actions; in streaming manifests use sink.type=http to post batches. |
| Kafka (type=kafka) | fluxion-connect-kafka | Streaming source & sink | execution.type=streaming with source.type=kafka and/or sink.type=kafka. |
| JavaBean | dispatcher (handlers) | Action/trigger | execution.type=javaBean with beanName; register handlers in the dispatcher. |
| Pipeline call | dispatcher | Action | execution.type=pipelineCall targeting another pipeline. |
| Webhook | dispatcher | Trigger | execution.type=webhook to start an HTTP listener and emit payloads. |
| Polling | dispatcher | Trigger | execution.type=polling with an interval or handler bean. |
| Timer | dispatcher | Trigger | execution.type=timer with a cron expression or ISO-8601 duration. |
12) Next steps
- Module overview
- Manifest + SDK format
- Connector specifics: Kafka
- Developer guide