MongoDB Connectors
Fluxion Connect ships MongoDB source and sink connectors so pipelines can watch
change streams and persist results back to collections.
1. Prerequisites
| Requirement |
Notes |
| Dependency |
ai.fluxion:fluxion-connect plus MongoDB driver (org.mongodb:mongodb-driver-sync). |
| Cluster type |
Replica set or sharded cluster (change streams are not supported on stand‑alone instances). |
| Permissions |
User with change-stream privileges (read, readWrite, changeStream). |
| Checkpoint store |
JDBC/Redis/custom store for streaming offsets. |
2. Source configuration (type: mongodb)
YAML snippet
source:
type: mongodb
options:
connectionString: "mongodb://mongo0:27017,mongo1:27017/?replicaSet=rs0"
database: orders
collection: events
queueCapacity: 64
maxBatchSize: 128
maxAwaitTime: PT5S
fullDocument: update_lookup
pipeline:
- { $match: { operationType: { $in: ["insert", "replace", "update"] } } }
Options
| Option |
Description |
Default |
connectionString |
MongoDB connection string pointing to replica set / sharded cluster. |
Required |
database |
Database to monitor. |
Required |
collection |
Collection to monitor. |
Required |
pipeline |
Additional aggregation stages applied to the change stream. |
[] |
queueCapacity |
Internal queue size feeding the pipeline. |
64 |
maxBatchSize |
Maximum events per batch. |
128 |
maxAwaitTime |
Wait before emitting empty batches (ISO-8601). |
PT5S |
fullDocument |
default, update_lookup, when_available, or required. |
default |
3. Sink configuration (type: mongodb)
YAML snippet
sink:
type: mongodb
options:
connectionString: "mongodb://mongo0:27017,mongo1:27017/?replicaSet=rs0"
database: analytics
collection: enriched_orders
mode: upsert
keyField: _id
writeConcern: MAJORITY
Options
| Option |
Description |
Default |
connectionString |
MongoDB connection string. |
Required |
database |
Database to write to. |
Required |
collection |
Collection to write to. |
Required |
mode |
insert, replace, or upsert. |
insert |
keyField |
Field used to identify documents for replace/upsert. |
_id |
writeConcern |
Write concern (ACKNOWLEDGED, MAJORITY, etc.). |
ACKNOWLEDGED |
4. Stream catalogs (for UIs/CLIs)
- Source (
discoverStreams): name=collection, namespace=database, supportedSyncModes=[FULL_REFRESH, INCREMENTAL], primaryKeys=[["_id"]], cursorFields=["resumeToken"], sourceDefinedCursor=true (connector-owned).
- Sink (
destinationStreams): name=collection, namespace=database, primaryKeys=[keyField], supportedSyncModes=[FULL_REFRESH].
Fetch via /api/connectors/discovery/sources|sinks to keep pipeline specs connector-agnostic.
5. Troubleshooting
| Symptom |
Possible cause |
Remedy |
IllegalArgumentException: connectionString missing |
Config validation failure. |
Supply a replica-set/sharded connection string. |
MongoCommandException: Change streams not enabled |
Connecting to standalone server. |
Configure a replica set or sharded cluster. |
| Events lagging |
Change stream backlog or slow consumer. |
Increase queueCapacity/maxBatchSize and inspect MongoDB performance metrics. |
| Sink duplicate key errors |
Upsert/replace without matching keyField. |
Set mode appropriately and ensure documents contain the key field. |
6. Testing
- Run MongoDB connector tests:
bash
mvn -pl fluxion-core -am test -Dtest=*Mongo*
- Integration tests require access to a MongoDB replica set; for local use, spin
up
docker compose with --replSet enabled or use MongoDB’s test containers.
7. References
| Path |
Description |
fluxion-core/src/main/java/.../MongoSourceConnectorProvider.java |
Source provider implementation. |
fluxion-core/src/main/java/.../MongoSinkConnectorProvider.java |
Sink provider implementation. |
| MongoDB docs |
Change streams, write concern. |
Use these templates to wire MongoDB into streaming pipelines, adjusting
queue/batch settings to balance latency versus throughput.