Azure Event Hubs Connectors
Fluxion Connect includes Event Hubs source and sink connectors so pipelines can
ingest from and publish back to Azure Event Hubs without extra glue code.
1. Prerequisites
| Requirement |
Notes |
| Dependency |
ai.fluxion:fluxion-connect plus com.azure:azure-messaging-eventhubs. |
| Azure namespace |
Event Hubs namespace reachable from the Fluxion service. |
| Access |
SAS connection string with Listen/Send permissions as needed. |
| Checkpoint store |
JDBC/Redis/custom store when running streaming pipelines. |
2. Source configuration (type: eventhub)
YAML snippet
source:
type: eventhub
options:
connectionString: "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=..."
eventHubName: orders
consumerGroup: $Default
queueCapacity: 64
maxBatchSize: 128
maxWaitTime: PT5S
startPosition: earliest
prefetchCount: 300
Options
| Option |
Description |
Default |
connectionString |
Azure Event Hubs connection string (can include entity path). |
Required |
eventHubName |
Event hub to read from (omit if set in the connection string). |
null |
consumerGroup |
Consumer group used by the receiver. |
$Default |
queueCapacity |
Internal queue size between Event Hubs client and pipeline. |
32 |
maxBatchSize |
Maximum events delivered per pipeline batch. |
128 |
maxWaitTime |
Time to wait before emitting an empty batch (ISO-8601). |
PT5S |
startPosition |
earliest or latest. |
earliest |
prefetchCount |
Client prefetch size. |
null |
3. Sink configuration (type: eventhub)
YAML snippet
sink:
type: eventhub
options:
connectionString: "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=..."
eventHubName: orders-out
batchSize: 100
flushTimeout: PT10S
partitionId: "0"
partitionKey: customer-id
Options
| Option |
Description |
Default |
connectionString |
Event Hubs connection string. |
Required |
eventHubName |
Event hub to publish to. |
null |
batchSize |
Maximum documents batched per send. |
100 |
flushTimeout |
Wait for send completion (ISO-8601). |
PT10S |
partitionId |
Fixed partition id. Mutually exclusive with partitionKey. |
null |
partitionKey |
Partition key applied to the batch. |
null |
4. Stream catalogs (for UIs/CLIs)
- Source (
discoverStreams): name=eventHubName, namespace=eventhub, supportedSyncModes=[FULL_REFRESH, INCREMENTAL], cursorFields=["sequenceNumber"], sourceDefinedCursor=true (connector-owned).
- Sink (
destinationStreams): name=eventHubName, namespace=eventhub, supportedSyncModes=[FULL_REFRESH].
Fetch via /api/connectors/discovery/sources|sinks to keep pipeline specs connector-agnostic.
5. Troubleshooting
| Symptom |
Possible cause |
Remedy |
IllegalArgumentException: connectionString missing |
Config validation failure. |
Provide a connection string with Endpoint, SharedAccessKeyName, SharedAccessKey. |
com.azure.messaging.eventhubs.EventHubsException$ResourceNotFound |
Wrong eventHubName or insufficient permissions. |
Verify event hub exists and credentials have access. |
| Events stalled |
Consumer group checkpoint ahead of stream. |
Change consumerGroup or reset checkpoints. |
| Sink retries indefinitely |
Namespace unreachable or throttled. |
Adjust StreamingErrorPolicy (dead-letter/skip) and review Azure throttling limits. |
6. Testing
- Run Event Hubs connector tests:
bash
mvn -pl fluxion-core -am test -Dtest=*EventHub*
- Local testing requires an Azure Event Hubs namespace (no official local emulator). Use a dev namespace with shared access keys.
7. References
| Path |
Description |
fluxion-core/src/main/java/.../EventHubSourceConnectorProvider.java |
Source provider implementation. |
fluxion-core/src/main/java/.../EventHubSinkConnectorProvider.java |
Sink provider implementation. |
| Azure docs |
Event Hubs connection strings |
Use these templates to bootstrap your streaming pipelines; tweak batching and
queue sizes based on throughput and latency requirements.