Enrichment Module Overview
Fluxion Enrich adds operators that call external services or data stores during pipeline execution. Use it when a pipeline needs to fetch HTTP resources or run SQL queries inline.
1. Prerequisites
| Requirement | Notes |
|---|---|
| Fluxion modules | fluxion-core, fluxion-enrich; optionally fluxion-connect for connector integration. |
| HTTP/SQL back-ends | Services or databases reachable from your pipelines. |
| Configuration | Named connections (e.g., Spring beans) for HTTP and SQL targets. |
| Resilience layer | Optional Resilience4j dependencies for retries/circuit breakers (recommended). |
2. Module status
| Item | Doc | Status | Notes |
|---|---|---|---|
| Module | – | Beta | APIs may evolve alongside the shared resilience layer. |
$httpCall |
operators/httpCall.md | Stable | JSON payloads, headers, query params, response extraction. |
$sqlQuery |
operators/sqlQuery.md | Beta | Prepared statements via JDBC (PostgreSQL/MySQL verified). |
If asked about additional enrichment operators, state that only HTTP/SQL are available today and reference the SPI documentation for future extensions.
3. Capabilities
$httpCall– Declarative HTTP requests with dynamic path/headers/body and JSON pointer extraction for responses.$sqlQuery– Parameterised SQL lookups mapped back into embedded documents.- Shared Resilience4j scaffolding (retry + circuit breaker) mirroring the streaming runtime.
- Connection registry integration so pipelines can reference centrally managed connection definitions.
4. Usage patterns
HTTP call
{
"$addFields": {
"profile": {
"$httpCall": {
"connection": "identity-service",
"path": "/api/v1/profile/{userId}",
"pathParams": { "userId": "$user.id" },
"method": "GET",
"response": { "extract": "$.data" }
}
}
}
}
connectionrefers to a named HTTP client configured in your application.- Path parameters resolve from the current document.
response.extractuses JSON Pointer to select part of the payload.
SQL lookup
{
"$set": {
"orders": {
"$sqlQuery": {
"connection": "orders-db",
"sql": "select id, total from orders where customer_id = ? order by created desc limit 5",
"params": ["$customerId"]
}
}
}
}
- Prepared statement executed against the named JDBC connection.
- Parameters can be literal values or expressions evaluated per document.
- Result set is converted into an array of documents.
5. Configuration table
| Field | Operator | Description |
|---|---|---|
connection |
HTTP/SQL | Logical connection name resolved by your app. |
method |
HTTP | HTTP verb (GET, POST, …). |
path, pathParams |
HTTP | URL templates and parameter bindings. |
headers, query, body |
HTTP | Optional request metadata and payload. |
response.extract |
HTTP | JSON pointer to select part of the response. |
sql |
SQL | Prepared statement text. |
params |
SQL | Array of bound parameters (expressions supported). |
rowMapper (future) |
SQL | Hook for custom row mapping. |
Configure connections via your DI framework (Spring beans, Micronaut singletons,
manual registries) and reference by connection name inside pipeline definitions.
6. Operational guidance
- Keep enrichment calls lightweight to avoid blocking pipeline threads.
- Reuse resilience policies (retry, circuit breaker) to insulate downstream
services.
$httpCalland$sqlQueryaccept resilience configuration blocks. - Cache responses if the same upstream data is fetched frequently.
- Monitor outbound call latency and error rates alongside pipeline metrics.
7. References
| Path | Description |
|---|---|
fluxion-enrich/src/main/java/.../$httpCall |
Implementation of the HTTP operator. |
fluxion-enrich/src/main/java/.../$sqlQuery |
JDBC-backed SQL operator. |
fluxion-docs/docs/enrich/operators/httpCall.md |
Detailed HTTP options with examples. |
fluxion-docs/docs/enrich/operators/sqlQuery.md |
Detailed SQL options with examples. |
fluxion-docs/docs/examples/ |
Pipelines mixing core and enrichment features. |
Run enrichment tests with:
mvn -pl fluxion-enrich -am test
This validates HTTP/SQL operators and shared resilience components.