Skip to content

HTTP Sink Connector

Posts batches of documents to an HTTP endpoint. Discovered via type=http and implemented by HttpSinkProvider (module: fluxion-connect). You can either point directly at an endpoint or reuse a named HTTP connection from the connector registry (connectionRef) with shared headers/timeouts/resilience.

Options

Option Type Required Default Description
endpoint string No (yes if no connectionRef) Target HTTP endpoint to post batches to.
connectionRef string No Name of a registered HTTP connection (registry stores base URL, headers, resilience).
connectionScope string No default::default Optional scope for resolving connectionRef (e.g., tenant::pipeline).
retryInstance string No Resilience4j retry instance name (overrides connection defaults).
circuitBreakerInstance string No Resilience4j circuit breaker instance name (overrides connection defaults).
allowEmpty boolean No false Whether to send empty batches.
fanout array No List of additional sinks to call after this sink completes (synchronous).

Manifest usage (streaming sink)

"execution": {
  "type": "streaming",
  "source": { "type": "kafka", "bootstrapServers": "localhost:9092", "topic": "orders", "groupId": "g1" },
  "sink":   { "type": "http",  "connectionRef": "orders-api", "allowEmpty": false }
}

Register orders-api via ConnectorRegistryInitializer.loadFromSystemProperties(...) or programmatically (see Manifest & SDK guide).

SDK usage

ConnectorConfig sink = ConnectorConfig.builder("http", ConnectorConfig.Kind.SINK)
    .option("connectionRef", "orders-api")
    .option("allowEmpty", false)
    .build();
StreamingSink httpSink = ConnectorFactory.createSink(sink);

Registry-backed connections (optional)

Load HTTP connections at startup so multiple sinks/operators can reuse them:

ConnectorRegistryInitializer.loadFromSystemProperties(); // reads connections/templates paths
// or registry.registerConnection(scope, "orders-api", Map.of("endpoint", "https://api.example.com/ingest"));

retryInstance/circuitBreakerInstance resolve from the shared Resilience4j registries (populated by the Spring Boot starter or manually).

Use alongside a streaming source (Kafka/custom) with StreamingPipelineExecutor to deliver batches to your HTTP endpoint.

Chaining and fanout examples

// Pipeline sink feeding two HTTP sinks
"sink": {
  "type": "pipeline",
  "pipeline": "enrich-orders",
  "next": [
    { "type": "http", "connectionRef": "http-a" },
    { "type": "http", "connectionRef": "http-b" }
  ]
}

// Fanout directly from the HTTP sink (synchronous)
"sink": {
  "type": "http",
  "connectionRef": "http-a",
  "fanout": [
    { "type": "http", "connectionRef": "http-b" },
    { "type": "kafka", "connectionRef": "kafka-out" }
  ]
}