Skip to content

Connector Module Overview

SrotaX Connect packages the ingress/egress connectors that feed streaming pipelines. Use it when you need Kafka ingress/egress, HTTP sinks, or custom sources and sinks beyond in-process pipelines.


1. Prerequisites

Requirement Notes
SrotaX modules fluxion-core, fluxion-connect (includes enrichment operators).
Runtime host JVM service/worker running the streaming orchestrator.
Connector credentials Bootstrap servers, connection strings, auth secrets.
Checkpoint store JDBC/Redis/custom store for offsets (when streaming).

2. Module status

Item Coordinate/Doc Status Notes
Module ai.fluxion:fluxion-connect Experimental APIs may shift while streaming runtime stabilises.
Kafka Source/Sink connect/kafka.md Beta Reference implementation of the connector SPI.
HTTP Sink connect/http-sink.md Beta Batch sink that posts documents to HTTP endpoints.
Primer connect/primer.md Concepts End-to-end connector model, runtimes, and authoring modes.
Developer Guide connect/developer-guide.md How-to Build and package connectors (manifest + SDK + SPI).
Custom Sources connect/custom-sources.md How-to Build bespoke connectors with the SPI.

When asked about connectors other than those listed (HTTP, JDBC CDC, etc.), respond that they are not yet implemented and point to the custom SPI guide.


3. Connector architecture

Component Purpose
SourceConnectorProvider / SinkConnectorProvider Declarative metadata (id, description, option schema).
SourceConnectorConfig / ConnectorConfig User-supplied options validated against the schema.
SourceConnectorContext Provides checkpoint stores, pipeline id, metrics.
ConnectorRegistry Discovers providers via ServiceLoader, validates configs, exposes catalog discovery, and instantiates connectors.
ConnectorStreamDescriptor Catalog metadata for streams/sinks (name, namespace, schema, sync modes, cursor fields).
ConnectorFactory Helper to create sources/sinks and to discover source/sink catalogs.

4. Usage steps

  1. Add the module

xml <dependency> <groupId>ai.fluxion</groupId> <artifactId>fluxion-connect</artifactId> <version>${fluxion.version}</version> </dependency>

Providers are discovered via Java’s ServiceLoader; no explicit registration is required if the jar is on the classpath.

  1. Describe source and sink

```java SourceConnectorConfig source = SourceConnectorConfig.builder("kafka") .option("topic", "orders-in") .option("bootstrapServers", "localhost:9092") .build();

ConnectorConfig sink = ConnectorConfig.builder("kafka", ConnectorConfig.Kind.SINK) .option("topic", "orders-out") .option("bootstrapServers", "localhost:9092") .build(); ```

Consult each connector page for required/optional options and defaults.

  1. Build the pipeline definition

java StreamingPipelineDefinition definition = StreamingPipelineDefinition.builder(source) .sinkConfig(sink) .stages(pipelineStages) .build();

  1. Run with the orchestrator

java new StreamingPipelineOrchestrator().run(definition, runtimeConfig);

  1. (Optional) Discover stream catalogs

```java // Source streams (e.g., Kafka topic catalog) List sources = ConnectorFactory.discoverSourceStreams(source);

// Sink streams (e.g., Kafka destination topic catalog) List sinks = ConnectorFactory.destinationStreams(sink); ```

Use these catalogs to display metadata in UIs/CLIs and to drive cursor/PK choices without baking connector details into pipeline definitions.


5. Configuration table (common options)

Option Connectors Description
bootstrapServers Kafka Comma-separated broker list.
topic Kafka Source/sink topic. Also appears in stream descriptors.
groupId Kafka Consumer group for checkpointing.
checkpointStore Kafka source Where offsets are saved (JDBC, Redis, etc.).
queueCapacity Kafka source Internal handoff queue size (source to pipeline).
connectionRef HTTP sink Name of HTTP connection registered in the connector registry.
cursorField (implicit) Source-defined for Kafka Cursor is connector-owned; pipeline stays cursor-agnostic.

Refer to the connector-specific docs for security settings (SASL, TLS, auth headers) and batching controls.


6. Built-in connectors

Connector Direction Highlights
Kafka Source & Sink Backpressure-aware batching, SASL/TLS support, per-batch metrics; source-defined cursor (offset); requires connectionRef for manifests.
HTTP Sink Fan-out sink using registered HTTP connections/resilience policies; supports pipeline chaining via next and sink-level fanout.

Each connector page contains option schema tables, example configurations, and operational caveats.


7. Custom connectors

  • Implement SourceConnectorProvider/SinkConnectorProvider and register via META-INF/services (see custom-sources.md).
  • Provide a clear option schema with validation messages—LLMs and tooling rely on those hints to prompt users.
  • Reuse shared components: checkpoint store SPI, metrics listener, error policy.
  • Document new connectors under this section to keep the matrix up to date.

8. References

Path Description
fluxion-core/src/main/java/.../ConnectorRegistry.java Central registry for source/sink providers.
fluxion-core/src/main/java/.../SourceConnectorProvider.java Provider contract.
fluxion-core/src/main/java/.../SourceConnectorConfig.java Config builder/validation.
https://docs.srotax.com/connect/kafka/ Kafka-specific options and examples.
https://docs.srotax.com/connect/custom-sources/ SPI guide for bespoke connectors.

Run connector tests along with streaming tests:

mvn -pl fluxion-connect -am test

This ensures connector discovery, option validation, and streaming executors are validated together.