
Kafka Connectors

Fluxion Connect exposes Kafka as both a streaming source and sink. The connectors share the same SPI used across the platform, so the configuration maps cleanly onto streaming pipelines, error policies, and metrics.


1. Prerequisites

| Requirement | Notes |
| --- | --- |
| Dependency | Add `ai.fluxion:fluxion-connect` plus the Kafka client (`org.apache.kafka:kafka-clients`). |
| Kafka cluster | Brokers reachable from the Fluxion service. Tested on Kafka 2.8+. |
| Credentials (optional) | SASL/TLS material if connecting to secure clusters. |
| Checkpoint store | JDBC/Redis/custom store for offsets when streaming. |
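The dependency row maps onto a Maven declaration along these lines (the version properties are placeholders; pin `kafka-clients` to a version compatible with your cluster):

```xml
<!-- Illustrative coordinates only; versions are placeholders. -->
<dependency>
  <groupId>ai.fluxion</groupId>
  <artifactId>fluxion-connect</artifactId>
  <version>${fluxion.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>
```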

2. Source configuration (type: kafka)

YAML snippet

```yaml
source:
  type: kafka
  options:
    topic: orders
    bootstrapServers: localhost:9092
    pollTimeout: PT0.5S
    queueCapacity: 64
    keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
    valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
    securityProtocol: SASL_SSL
    saslMechanism: PLAIN
    saslJaasConfig: >
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="user" password="pass";
    enable.auto.commit: false   # passes through to Kafka consumer properties
```

Options

| Option | Description | Default |
| --- | --- | --- |
| `topic` | Kafka topic to subscribe to. | Required |
| `bootstrapServers` | Comma-separated list of brokers. | Required |
| `pollTimeout` | Consumer poll wait (ISO-8601 duration). | `PT0.5S` |
| `queueCapacity` | Internal queue size feeding the pipeline. | `16` |
| `keyDeserializer` | Kafka key deserializer class. | `StringDeserializer` |
| `valueDeserializer` | Kafka value deserializer class. | `StringDeserializer` |
| `groupId` | Consumer group id override. | `fluxion-<topic>` |
| `securityProtocol` | Kafka `security.protocol` (e.g., `SSL`, `SASL_SSL`). | `null` |
| `saslMechanism` | Kafka `sasl.mechanism` (e.g., `SCRAM-SHA-512`). | `null` |
| `saslJaasConfig` | JAAS config string for SASL. | `null` |
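Durations such as `pollTimeout` and `flushTimeout` use ISO-8601 notation (the same grammar `java.time.Duration.parse` accepts). For reference, a minimal sketch of the time-component grammar — this parser is illustrative, not part of Fluxion:

```python
import re

def parse_iso_duration_seconds(value: str) -> float:
    """Parse a simple ISO-8601 time duration (PT0.5S, PT2M, PT1H) into seconds.

    Covers only the time component (hours/minutes/seconds), which is all
    these connector options use.
    """
    m = re.fullmatch(
        r"PT(?:(\d+(?:\.\d+)?)H)?(?:(\d+(?:\.\d+)?)M)?(?:(\d+(?:\.\d+)?)S)?",
        value.upper(),
    )
    if not m or not any(m.groups()):
        raise ValueError(f"unsupported duration: {value}")
    hours, minutes, seconds = (float(g) if g else 0.0 for g in m.groups())
    return hours * 3600 + minutes * 60 + seconds

parse_iso_duration_seconds("PT0.5S")  # the source default: half a second
```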

All additional keys under `options` are copied into the consumer `Properties` object, so you can tune offset reset behavior, partition assignment strategies, or custom timeouts without connector support for each property.
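The pass-through behavior can be modeled as follows. This is a hypothetical sketch, not the actual provider code (which lives in `KafkaSourceConnectorProvider.java`): connector-owned keys are consumed, everything else is copied verbatim.

```python
# Hypothetical model of the options pass-through; field names mirror the
# source option table, but the real implementation may differ in detail.
RESERVED = {
    "topic", "bootstrapServers", "pollTimeout", "queueCapacity",
    "keyDeserializer", "valueDeserializer", "groupId",
    "securityProtocol", "saslMechanism", "saslJaasConfig",
}

def build_consumer_properties(options: dict) -> dict:
    """Map connector options onto Kafka consumer properties."""
    props = {
        "bootstrap.servers": options["bootstrapServers"],
        # Default group id follows the fluxion-<topic> convention.
        "group.id": options.get("groupId", f"fluxion-{options['topic']}"),
    }
    # Keys the connector does not claim are passed through untouched.
    props.update({k: v for k, v in options.items() if k not in RESERVED})
    return props

props = build_consumer_properties({
    "topic": "orders",
    "bootstrapServers": "localhost:9092",
    "enable.auto.commit": "false",  # lands directly in the consumer config
})
```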


3. Sink configuration (type: kafka)

YAML snippet

```yaml
sink:
  type: kafka
  options:
    topic: orders-out
    bootstrapServers: localhost:9092
    batchSize: 200
    flushTimeout: PT5S
    acks: all
    keySerializer: org.apache.kafka.common.serialization.StringSerializer
    valueSerializer: org.apache.kafka.common.serialization.StringSerializer
    compression.type: gzip
```

Options

| Option | Description | Default |
| --- | --- | --- |
| `topic` | Kafka topic to publish to. | Required |
| `bootstrapServers` | Comma-separated broker list. | Required |
| `batchSize` | Records per send batch. | `100` |
| `flushTimeout` | Wait for acknowledgements (ISO-8601). | `PT10S` |
| `acks` | Producer acknowledgement level (`acks`). | `1` |
| `keySerializer` | Kafka key serializer class. | `StringSerializer` |
| `valueSerializer` | Kafka value serializer class. | `StringSerializer` |
| `securityProtocol` / `saslMechanism` / `saslJaasConfig` | Same as source. | `null` |

Additional producer options (`linger.ms`, `retries`, `enable.idempotence`, `compression.type`) are passed through untouched.
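The batching described above amounts to chunking the outgoing record stream into groups of at most `batchSize`, each flushed within `flushTimeout`. A simplified model of the chunking (not the actual sink code):

```python
from itertools import islice

def send_batches(records, batch_size=100):
    """Yield successive send batches of at most batch_size records.

    batch_size=100 mirrors the sink's documented default; the real sink
    additionally bounds each flush by flushTimeout.
    """
    it = iter(records)
    while chunk := list(islice(it, batch_size)):
        yield chunk

# 250 records with the default batchSize produce batches of 100, 100, 50.
sizes = [len(b) for b in send_batches(range(250))]
```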


4. Stream catalogs (for UIs/CLIs)

Kafka connectors expose catalogs for discovery endpoints:

  • Source (discoverStreams): name=topic, namespace=groupId or "kafka", supportedSyncModes=[FULL_REFRESH, INCREMENTAL], cursorFields=["offset"], sourceDefinedCursor=true (offsets are connector-owned).
  • Sink (destinationStreams): name=topic, namespace=bootstrapServers, supportedSyncModes=[FULL_REFRESH].

Fetch these via /api/connectors/discovery/sources|sinks so pipeline specs stay connector-agnostic.
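Based on the fields listed above, a discovered source stream for the `orders` topic might take a shape like this (illustrative, not a verbatim API response; the namespace assumes a `fluxion-orders` group id):

```json
{
  "name": "orders",
  "namespace": "fluxion-orders",
  "supportedSyncModes": ["FULL_REFRESH", "INCREMENTAL"],
  "cursorFields": ["offset"],
  "sourceDefinedCursor": true
}
```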


5. Security options

| Scenario | Required settings |
| --- | --- |
| TLS without auth | `securityProtocol: SSL` plus truststore configuration (system properties or Kafka props). |
| SASL/PLAIN | `securityProtocol: SASL_SSL`, `saslMechanism: PLAIN`, `saslJaasConfig: ...`. |
| SCRAM | `securityProtocol: SASL_SSL`, `saslMechanism: SCRAM-SHA-512`, JAAS string with username/password. |
| Kerberos | `securityProtocol: SASL_PLAINTEXT`, `saslMechanism: GSSAPI`, JAAS config referencing keytab/krb5. |

Consult Kafka’s security docs for the exact property names; any additional keys (e.g., ssl.truststore.location) can be added directly to options.
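JAAS strings are easy to get wrong (quoting, the trailing semicolon). A small helper sketch for assembling the SASL/PLAIN variant used in the source snippet — a hypothetical utility, not part of Fluxion:

```python
def plain_jaas_config(username: str, password: str) -> str:
    """Build a SASL/PLAIN JAAS config string, escaping embedded quotes."""
    def quote(value: str) -> str:
        return '"' + value.replace('\\', '\\\\').replace('"', '\\"') + '"'
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username={quote(username)} password={quote(password)};"
    )

cfg = plain_jaas_config("user", "pass")
```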


6. Troubleshooting

| Symptom | Possible cause | Remedy |
| --- | --- | --- |
| `NoSuchMethodError` on Kafka classes | Version mismatch between `fluxion-connect` and your Kafka client. | Align the Kafka client version with the cluster (2.8+ recommended). |
| Consumer stuck at oldest offset | `groupId` shared across environments. | Set a unique group id per environment or use `auto.offset.reset`. |
| `TopicAuthorizationFailed` | Missing ACLs or credentials. | Verify SASL/TLS settings and broker ACLs. |
| Sink retries endlessly | Downstream brokers unavailable. | Adjust `StreamingErrorPolicy` (dead-letter/skip) or configure Kafka producer retries. |

7. Testing

  • Run connector tests alongside streaming tests: `mvn -pl fluxion-core -am test -Dtest=*Kafka*`
  • Local broker for manual testing: `docker run --rm -p 9092:9092 confluentinc/cp-kafka`.

8. References

| Path | Description |
| --- | --- |
| `fluxion-core/src/main/java/.../KafkaSourceConnectorProvider.java` | Source provider implementation. |
| `fluxion-core/src/main/java/.../KafkaSinkConnectorProvider.java` | Sink provider implementation. |
| `fluxion-docs/docs/streaming/quickstart.md` | Kafka → HTTP streaming tutorial. |

Use these snippets as templates for your own pipeline definitions and adjust properties according to your cluster’s configuration.