
Connectors at a Glance

An overview of how Fluxion connectors are modeled, packaged, and executed, written to mirror the clarity of the Camunda connector docs.


1) What a connector is

  • Inbound runtimes (triggers): polling, webhook, streaming, timer. They emit events into pipelines.
  • Outbound runtimes (actions): HTTP/JavaBean/pipeline-call actions push data out or invoke downstream work.
  • Transports vs. runtimes: transports encapsulate I/O (Kafka, HTTP, custom). Runtimes orchestrate how that transport is executed.

2) Authoring models

| Mode | When to use | How it works |
|---|---|---|
| Manifest-first | Declarative/config-as-code; classpath manifests | JSON manifest (operations, execution blocks). ManifestConnectorDispatcher runs actions/triggers from the manifest. |
| SDK/SPI-first | Java services needing programmatic control | Implement SourceConnectorProvider / SinkConnectorProvider, register via ServiceLoader, then instantiate via ConnectorRegistry/ConnectorFactory. |

Both resolve to the same providers; you can ship default manifests inside the connector jar and still expose the SPI.


3) Execution types (renamed)

| execution.type | Kind | Use |
|---|---|---|
| http | action | Call external HTTP APIs with retry/circuit breaker. |
| javaBean | action/trigger | Invoke app-provided beans for custom logic. |
| pipelineCall | action | Call another pipeline/version. |
| polling | trigger | Scheduled pulls from systems that don’t push. |
| webhook | trigger | Listen on an HTTP endpoint and emit payloads. |
| streaming | trigger | Long-lived source→sink streaming (e.g., Kafka→HTTP sink). |
| timer | trigger | Cron/interval ticks. |
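
The table above can be modeled as a small lookup, which is handy when validating a manifest before dispatch. This is a minimal sketch: ExecutionKind and ExecutionType are hypothetical names chosen for illustration and are not Fluxion classes; only the execution.type strings and their kinds come from the table.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the execution.type table; not part of the Fluxion API.
enum ExecutionKind { ACTION, TRIGGER }

enum ExecutionType {
    HTTP("http", EnumSet.of(ExecutionKind.ACTION)),
    JAVA_BEAN("javaBean", EnumSet.of(ExecutionKind.ACTION, ExecutionKind.TRIGGER)),
    PIPELINE_CALL("pipelineCall", EnumSet.of(ExecutionKind.ACTION)),
    POLLING("polling", EnumSet.of(ExecutionKind.TRIGGER)),
    WEBHOOK("webhook", EnumSet.of(ExecutionKind.TRIGGER)),
    STREAMING("streaming", EnumSet.of(ExecutionKind.TRIGGER)),
    TIMER("timer", EnumSet.of(ExecutionKind.TRIGGER));

    private final String manifestValue;
    private final Set<ExecutionKind> kinds;

    ExecutionType(String manifestValue, Set<ExecutionKind> kinds) {
        this.manifestValue = manifestValue;
        this.kinds = kinds;
    }

    // True when this execution type may be used for the given operation kind.
    boolean supports(ExecutionKind kind) {
        return kinds.contains(kind);
    }

    // Maps the manifest string (case-sensitive) to an enum constant.
    static ExecutionType fromManifest(String value) {
        return Arrays.stream(values())
                .filter(t -> t.manifestValue.equals(value))
                .findFirst()
                .orElseThrow(() ->
                        new IllegalArgumentException("Unknown execution.type: " + value));
    }
}
```

With this shape, a check such as ExecutionType.fromManifest("javaBean").supports(ExecutionKind.TRIGGER) passes, while the same check for "http" fails.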

4) Package layout (example)

my-connector/
  src/main/java/.../MySourceProvider.java      # implements SourceConnectorProvider
  src/main/java/.../MySinkProvider.java        # implements SinkConnectorProvider
  src/main/resources/META-INF/services/
    ai.fluxion.core.engine.connectors.SourceConnectorProvider
    ai.fluxion.core.engine.connectors.SinkConnectorProvider
  src/main/resources/manifests/
    my-connector.json                          # optional manifest(s)
  • Providers define descriptor + option schema.
  • Options are validated by ConnectorRegistry and exposed to UIs/CLIs.
  • Manifests can be bundled for drop-in defaults.
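
The META-INF/services entries in the layout are what java.util.ServiceLoader reads at startup. The sketch below shows the general discovery pattern only: DemoSourceProvider and DemoProviderIndex are hypothetical stand-ins, and the real SourceConnectorProvider interface and ConnectorRegistry internals may look different.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical stand-in for a provider SPI; the real interface is not shown here.
interface DemoSourceProvider {
    String type(); // e.g. "kafka"
}

final class DemoProviderIndex {
    // Indexes providers by type and rejects two providers claiming the same type.
    static Map<String, DemoSourceProvider> index(Iterable<DemoSourceProvider> providers) {
        Map<String, DemoSourceProvider> byType = new LinkedHashMap<>();
        for (DemoSourceProvider provider : providers) {
            if (byType.putIfAbsent(provider.type(), provider) != null) {
                throw new IllegalStateException(
                        "Duplicate provider for type: " + provider.type());
            }
        }
        return byType;
    }

    // ServiceLoader looks for a classpath file named
    // META-INF/services/<fully qualified interface name> that lists implementations.
    static Map<String, DemoSourceProvider> discover() {
        return index(ServiceLoader.load(DemoSourceProvider.class));
    }
}
```

Keeping index separate from discover makes the lookup logic testable with a plain list, without packaging a services file.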

5) Runtime flow

  1. Discovery: ConnectorRegistry loads providers via ServiceLoader.
  2. Validation: SourceConnectorConfig / ConnectorConfig validate options against provider schemas.
  3. Execution: streaming pipelines run on StreamingPipelineExecutor (source→stages→sink with backpressure); manifest actions and triggers run through ManifestConnectorDispatcher (HTTP, bean, and pipelineCall actions; webhook, polling, and streaming triggers).
  4. Operators: enrichment operators ($httpCall, $sqlQuery, etc.) resolve connection refs to these connectors; streaming pipelines wire sources/sinks by type.
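
To make the validation step concrete, here is a minimal sketch of a required-option check. DemoOptionValidator is a hypothetical helper; how SourceConnectorConfig / ConnectorConfig actually represent provider schemas is not shown in this overview, so the list-of-required-names schema is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the real schema model used by ConnectorRegistry may differ.
final class DemoOptionValidator {
    // Returns the required option names that are absent or blank, in schema order.
    static List<String> missingRequired(List<String> required, Map<String, ?> options) {
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            Object value = options.get(name);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

For a Kafka-style source that requires bootstrapServers and topic, passing only topic would report bootstrapServers as missing, so the config can be rejected before any transport is opened.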

6) Resilience, security, observability

  • Resilience: HTTP/Kafka sinks accept retry/circuit-breaker instance names (retry / circuitBreaker or via connectionRef). Registries come from the Spring Boot starter or manual wiring.
  • Security: Keep credentials out of manifests; inject them via connection instances and ConnectorContext::resolveSecret.
  • Observability: ConnectorLogger, ConnectorMeterRegistry, and ConnectorTracer flow through contexts; wire them to your logging, metrics, and tracing backends.
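
One way to keep credentials out of manifests is to store only a reference in the connection options and resolve it at runtime. The sketch below is illustrative only: the "secret:" prefix and the DemoSecretInjection class are invented for this example, and the Function parameter merely stands in for ConnectorContext::resolveSecret, whose real signature is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; the "secret:" marker is an assumption, not a Fluxion convention.
final class DemoSecretInjection {
    static final String PREFIX = "secret:";

    // Replaces values such as "secret:orders-key" with the resolved secret;
    // other values pass through unchanged.
    static Map<String, String> resolve(Map<String, String> options,
                                       Function<String, String> resolver) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String value = entry.getValue();
            if (value.startsWith(PREFIX)) {
                String name = value.substring(PREFIX.length());
                String secret = resolver.apply(name);
                if (secret == null) {
                    throw new IllegalStateException("Unresolved secret: " + name);
                }
                resolved.put(entry.getKey(), secret);
            } else {
                resolved.put(entry.getKey(), value);
            }
        }
        return resolved;
    }
}
```

The manifest or connection file then carries only the reference name, and the plaintext value exists solely in the resolved map at runtime.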

7) Streaming shape and chaining rules

  • connectionRef is required on streaming source and sink blocks (they pull transport config + resilience from the registry).
  • Process block (supported): a single top-level process object (type: pipeline|pipelineCall) runs before the sink chain. Its output flows into the sink/fanout.
  • Fanout (delivery to multiple sinks) uses fanout on the sink block; despite the name, calls currently run synchronously and serially, one sink after another, not in parallel.
  • Pipeline chaining uses type:"pipeline" sinks with next (single sink or list for fanout after the pipeline). next is honored only on pipeline sinks; use fanout for non-pipeline sinks.

Examples:

// Fanout without a pipeline
"sink": {
  "fanout": [
    { "type": "http", "connectionRef": "http-a" },
    { "type": "kafka", "connectionRef": "kafka-b" }
  ]
}

// Pipeline, then fanout to HTTP/Kafka
"sink": {
  "type": "pipeline",
  "pipeline": "enrich-orders",
  "next": [
    { "type": "http", "connectionRef": "http-a" },
    { "type": "kafka", "connectionRef": "kafka-b" }
  ]
}

// Top-level process (single object), then pipeline sink with next
"process": { "type": "pipeline", "pipeline": "normalize", "version": "1.0.0" },
"sink": {
  "type": "pipeline",
  "pipeline": "route",
  "next": { "fanout": [
    { "type": "http", "connectionRef": "http-a" },
    { "type": "kafka", "connectionRef": "kafka-b" }
  ] }
}
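
The ordering implied by these rules (process, then pipeline sink, then serial fanout) can be sketched with plain functions. DemoSinkChain is a hypothetical illustration of the data flow, not the executor's actual implementation.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical model of the chaining order described above.
final class DemoSinkChain {
    // Runs the top-level process, then the pipeline sink, then delivers the
    // pipeline output to each fanout target one after another in declared order.
    static <T> void deliver(T event,
                            UnaryOperator<T> process,
                            UnaryOperator<T> pipelineSink,
                            List<Consumer<T>> fanout) {
        T processed = process.apply(event);
        T routed = pipelineSink.apply(processed);
        for (Consumer<T> target : fanout) {
            target.accept(routed); // serial: the next target waits for this one
        }
    }
}
```

Because delivery is serial in this model, a slow or failing first target delays or blocks the ones after it, which is worth keeping in mind when ordering fanout entries.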

8) Quick starts

  • Manifest action (HTTP): place a manifest under resources/manifests, then new ManifestConnectorDispatcher().executeAction(manifest, "op", ctx, body).
  • Manifest streaming: execution.type=streaming with connectionRef on source/sink (e.g., Kafka → HTTP sink). startTrigger(...) returns a Flux.
  • SDK streaming: build configs, then:
StreamingSource source = ConnectorFactory.createSource(sourceCfg, SourceConnectorContext.from(new StreamingContext()));
StreamingSink sink = ConnectorFactory.createSink(sinkCfg);
new StreamingPipelineExecutor().processStream(source, List.of(), sink, new StreamingContext());

9) Choosing a model

  • Pick manifest-first for low-code/operator consoles and tenant-level customization without rebuilds.
  • Pick SDK-first when embedding in services, needing dynamic secrets, or building bespoke transports.
  • Mix them when you want default manifests shipped with your connector jar but still expose the SPI for power users.

10) Execution examples (manifest + SDK)

HTTP action (manifest)

{
  "operations": { "call": "call" },
  "operationDefs": {
    "call": {
      "kind": "action",
      "execution": { "type": "http", "method": "POST", "urlTemplate": "https://api.example.com/orders" }
    }
  }
}

SDK

Object out = dispatcher.executeAction(manifest, "call", ctx, Map.of("orderId","A-1"));

JavaBean action/trigger (manifest)

"execution": { "type": "javaBean", "beanName": "myHandler" }

SDK

dispatcher.registerActionHandler("myHandler", (c, in) -> Map.of("ok", true));
dispatcher.executeAction(manifest, "myOp", ctx, Map.of());

Pipeline call (manifest)

"execution": { "type": "pipelineCall", "targetPipeline": "orders", "version": "v1" }

Polling trigger (manifest)

"execution": { "type": "polling", "intervalMillis": 5000, "handlerBean": "poller" }

SDK

dispatcher.registerTriggerHandler("poller", (c, cfg) -> Flux.interval(Duration.ofSeconds(5)).map(i -> Map.of("tick", i)));

Webhook trigger (manifest)

"execution": { "type": "webhook", "path": "/events", "method": "POST" }

SDK

Flux<Map<String,Object>> flux = dispatcher.startTrigger(manifest, "hookOp", ctx, Map.of("port", 8080));

Streaming trigger (manifest)

"execution": {
  "type": "streaming",
  "stream": "orders",
  "source": { "type": "kafka", "connectionRef": "kafka-orders" },
  "sink":   { "type": "http",  "connectionRef": "http-sink" }
}

Streaming with a pipeline sink, then fanout (source block not shown)

"execution": {
  "type": "streaming",
  "stream": "orders",
  "sink": {
    "type": "pipeline",
    "pipeline": "enrich-orders",
    "next": {
      "fanout": [
        { "type": "http", "connectionRef": "http-a" },
        { "type": "kafka", "connectionRef": "kafka-b" }
      ]
    }
  }
}

SDK

SourceConnectorConfig src = SourceConnectorConfig.builder("kafka")
    .option("bootstrapServers","localhost:9092").option("topic","orders").build();
ConnectorConfig sink = ConnectorConfig.builder("http", ConnectorConfig.Kind.SINK)
    .option("endpoint","http://localhost:8081/ingest").build();
StreamingSource source = ConnectorFactory.createSource(src, SourceConnectorContext.from(new StreamingContext()));
StreamingSink httpSink = ConnectorFactory.createSink(sink);
new StreamingPipelineExecutor().processStream(source, List.of(), httpSink, new StreamingContext());

Timer trigger (manifest)

"execution": { "type": "timer", "cron": "PT30S" }

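The value PT30S in the timer example is an ISO-8601 duration rather than a cron expression. As a sketch of how the two forms could be told apart, the helper below tries java.time.Duration first; DemoTimerSpec is hypothetical, and how the Fluxion timer actually parses the cron field is not documented here.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Optional;

// Hypothetical helper; not the timer runtime's real parsing logic.
final class DemoTimerSpec {
    // Returns the interval when the spec is an ISO-8601 duration such as "PT30S";
    // returns empty otherwise, so the caller can treat the spec as a cron expression.
    static Optional<Duration> asInterval(String spec) {
        try {
            return Optional.of(Duration.parse(spec));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }
}
```

Here asInterval("PT30S") yields a 30-second duration, while a cron-style string such as "0 */5 * * * *" yields an empty result.
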
Pipeline invoker & resilience (Spring Boot starter)

  • The starter auto-registers a PipelineCallInvoker that POSTs to the pipeline-service endpoint /api/pipelines/{name}/{version}:run. It is used by:
      • execution.type=pipelineCall actions
      • Streaming sinks with type=pipeline / pipelineCall
      • Top-level streaming process (single object: type=pipeline|pipelineCall)
  • Configure target service, timeout, and Resilience4j policies:

fluxion:
  connect:
    pipeline:
      base-url: http://fluxion-pipeline-service:8085
      timeout-ms: 10000
resilience4j:
  retry:
    instances:
      pipeline-service:
        max-attempts: 3
        wait-duration: 500ms
  circuitbreaker:
    instances:
      pipeline-service:
        failure-rate-threshold: 50
        wait-duration-in-open-state: 5s
        permitted-number-of-calls-in-half-open-state: 3
        sliding-window-size: 10
  • Inputs are sent as document in the POST body; the pipeline response (Map/Document/list) is forwarded to the next sink or returned to the caller.
  • You can also load connection instances (YAML/JSON) into the registry at startup so connectionRef values resolve without code changes; use connector-specific config such as urlTemplate, headers, bootstrapServers, topic scoped by tenant/pipeline.
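
For orientation, the request that such an invoker sends could look roughly like the following, built with the JDK's java.net.http API (Java 11+). This is a sketch under assumptions: DemoPipelineRunRequest is hypothetical, the {"document": ...} wrapper is a guess at the body shape, and name and version are assumed to be URL-safe. The starter's PipelineCallInvoker may build its request differently.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical request builder; not the starter's PipelineCallInvoker.
final class DemoPipelineRunRequest {
    // Builds (but does not send) a POST to {baseUrl}/api/pipelines/{name}/{version}:run.
    static HttpRequest build(String baseUrl, String name, String version,
                             String documentJson, Duration timeout) {
        URI uri = URI.create(baseUrl + "/api/pipelines/" + name + "/" + version + ":run");
        // Assumed body shape: the input wrapped under a "document" key.
        String body = "{\"document\":" + documentJson + "}";
        return HttpRequest.newBuilder(uri)
                .timeout(timeout)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

The base URL and timeout would come from fluxion.connect.pipeline.base-url and timeout-ms; retries and circuit breaking would wrap the send call rather than live in the request itself.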

11) Built-in connectors & supported executions

| Connector type | Module | Supports | Execution usage |
|---|---|---|---|
| HTTP sink (type=http) | fluxion-connect | Action (http), streaming sink | execution.type=http for actions; in streaming manifests use sink.type=http to post batches. |
| Kafka (type=kafka) | fluxion-connect-kafka | Streaming source & sink | execution.type=streaming with source.type=kafka and/or sink.type=kafka. |
| JavaBean | dispatcher (handlers) | Action/trigger | execution.type=javaBean with beanName; register handlers in the dispatcher. |
| Pipeline call | dispatcher | Action | execution.type=pipelineCall targeting another pipeline. |
| Webhook | dispatcher | Trigger | execution.type=webhook to start an HTTP listener and emit payloads. |
| Polling | dispatcher | Trigger | execution.type=polling with an interval or handler bean. |
| Timer | dispatcher | Trigger | execution.type=timer with cron/ISO-8601 duration. |

12) Next steps