Connector Manifest & SDK
This page documents the connector manifest format and how the dispatcher/SDK load it. For building custom providers/runtimes, see the “Build Custom Connector” pages in the sidebar.
- Thin, reusable integrations for external systems (HTTP, DBs, Kafka, S3, etc.).
- Multi-tenant aware: never leak data/secrets across tenants.
- Declarative manifests (JSON/YAML) plus optional Java handlers.
- A fixed set of execution types—no connector-specific executors.
Execution types (required)
Every operation declares an execution.type (do not invent new types):
http— generic HTTP/REST callsjavaBean— call a Spring bean implementing connector logic (SDKs, DB drivers, etc.)pipelineCall— invoke another SrotaX pipeline or rulesetwebhook— inbound HTTP/webhook triggerspolling— periodic polling of external systemsstreaming— streaming sources/sinks (Kafka, HTTP sink, or custom)timer— cron triggers, delays, scheduled tasks
LLM-friendly recipe
- Pick
execution.type(http | javaBean | pipelineCall | webhook | polling | streaming | timer). - Write a manifest with
id,version,operations,operationDefs[*].kind(action/trigger), andexecution.type. - Implement handlers (if using SDK):
- Actions:
ConnectorActionHandleror extendAbstractActionHandler. - Triggers:
ConnectorTriggerHandlerreturningFlux<O>. - Enforce secrets/multi-tenancy via
ConnectorContext.resolveSecret, keep handlers stateless.
Sample manifests by execution.type
http (action)
{
"operations": { "echo": "echo" },
"operationDefs": {
"echo": {
"operationId": "echo",
"kind": "action",
"execution": {
"type": "http",
"method": "POST",
"urlTemplate": "https://api.example.com/echo",
"timeoutMs": 5000
}
}
}
}
javaBean (action)
{
"operations": { "add": "add" },
"operationDefs": {
"add": {
"operationId": "add",
"kind": "action",
"execution": {
"type": "javaBean",
"beanName": "addHandler"
}
}
}
}
pipelineCall (action)
{
"operations": { "invoke": "invoke" },
"operationDefs": {
"invoke": {
"operationId": "invoke",
"kind": "action",
"execution": {
"type": "pipelineCall",
"targetPipeline": "downstream.orders",
"version": "1.0.0"
}
}
}
}
webhook (trigger)
{
"operations": { "ingest": "ingest" },
"operationDefs": {
"ingest": {
"operationId": "ingest",
"kind": "trigger",
"execution": {
"type": "webhook",
"path": "/webhook",
"method": "POST"
}
}
}
}
polling (trigger)
{
"operations": { "poll": "poll" },
"operationDefs": {
"poll": {
"operationId": "poll",
"kind": "trigger",
"execution": {
"type": "polling",
"intervalMillis": 30000
}
}
}
}
streaming (trigger)
{
"operations": { "stream": "stream" },
"operationDefs": {
"stream": {
"operationId": "stream",
"kind": "trigger",
"execution": {
"type": "streaming",
"stream": "orders",
"source": { "type": "kafka", "connectionRef": "kafka-orders" },
"sink": { "type": "http", "connectionRef": "http-ingest" }
}
}
}
}
streaming with pipeline process + pipeline sink (trigger)
{
"operations": { "stream": "stream" },
"operationDefs": {
"stream": {
"operationId": "stream",
"kind": "trigger",
"execution": {
"type": "streaming",
"stream": "orders",
"process": {
"type": "pipeline",
"pipeline": "normalize",
"version": "1.0.0"
},
"sink": {
"type": "pipeline",
"pipeline": "enrich-orders",
"next": [
{ "type": "http", "connectionRef": "http-notify" },
{ "type": "kafka", "connectionRef": "kafka-out" }
]
}
}
}
}
}
End-to-end: Kafka source → process → HTTP sink (templates + connections + manifest)
templates.yaml
- id: kafka.source
displayName: Kafka Source
category: kafka
executionType: STREAMING
sourceSchema:
type: object
required: [bootstrapServers, topic, groupId]
properties:
bootstrapServers: { type: string }
topic: { type: string }
groupId: { type: string }
keyDeserializer: { type: string, default: org.apache.kafka.common.serialization.StringDeserializer }
valueDeserializer: { type: string, default: org.apache.kafka.common.serialization.StringDeserializer }
- id: http.sink
displayName: HTTP Sink
category: http
executionType: STREAMING
sinkSchema:
type: object
required: [urlTemplate]
properties:
urlTemplate: { type: string }
headers: { type: object, additionalProperties: { type: string } }
retryInstance: { type: string }
circuitBreakerInstance: { type: string }
allowEmpty: { type: boolean, default: false }
connections.yaml
- connectionRef: kafka.orders
connectorId: kafka.source
config:
type: kafka
bootstrapServers: broker1:9092,broker2:9092
topic: orders
groupId: orders-stream
- connectionRef: http.orders
connectorId: http.sink
config:
type: http
urlTemplate: https://api.example.com/orders
headers:
Authorization: Bearer TOKEN
retryInstance: retry-a
circuitBreakerInstance: cb-a
manifest (JSON)
{
"schemaVersion": "1.0.0",
"id": "orders-streaming",
"version": "1.0.0",
"displayName": "Orders Stream",
"operations": { "orders-stream": "orders-stream" },
"operationDefs": {
"orders-stream": {
"operationId": "orders-stream",
"kind": "trigger",
"execution": {
"type": "streaming",
"stream": "orders",
"source": { "type": "kafka", "connectionRef": "kafka.orders" },
"process": { "type": "pipeline", "pipeline": "transform-orders", "version": "1.0.0" },
"sink": { "type": "http", "connectionRef": "http.orders", "allowEmpty": false }
}
}
}
}
Notes:
- pipeline/pipelineCall sinks forward their output into next sinks; next
accepts a single sink or a list for fan-out.
- Top-level process is a single object; supported types: pipeline / pipelineCall.
- fanout lives on the sink block (or via next list on a pipeline sink) and
delivers synchronously/serially today.
Define connectors programmatically (no manifest files)
Build the manifest objects in code and register them with the ConnectorRegistry
so they show up in discovery endpoints and run through the same dispatcher as
file-based manifests.
ConnectorOperationDefinition echoOp =
ConnectorOperationDefinition.builder("echo", ConnectorOperationDefinition.Kind.ACTION)
.execution(HttpExecution.builder()
.method("POST")
.urlTemplate("https://api.example.com/echo")
.build())
.inputSchema(Map.of("type", "object"))
.outputSchema(Map.of("type", "object"))
.build();
ConnectorManifest manifest = ConnectorManifest.builder("demo.echo", "1.0.0", "Demo Echo")
.operations(Map.of("echo", "echo"))
.operationDefs(Map.of("echo", echoOp))
.build();
ConnectorRegistry.getInstance().registerManifest(manifest);
For streaming triggers, build a ReactiveStreamExecution with source/sink
maps (including a single process object and sink next/fanout if needed)
and attach it to the operation definition. Registering the manifest makes it
discoverable and executable without shipping JSON/YAML files.
connections (file → registry)
- You can register connection instances (like ADF “linked services”) from a YAML/JSON file at startup and they’ll be available via connectionRef:
- connectionRef: demo.http
connectorId: demo.http # ties to the connector template
scope: { tenantId: acme, pipelineId: orders }
config:
type: http
urlTemplate: https://api.example.com/ship
headers:
Authorization: ${SECRET_TOKEN}
timeoutMs: 8000
- connectionRef: orders-db
connectorId: sql.connector
scope: { tenantId: acme, pipelineId: orders }
config:
type: sql
jdbcUrl: jdbc:postgresql://db:5432/orders
username: orders
password: ${DB_PASSWORD}
- A bootstrapper loads these and registers them into the connector registry per scope so
$enrich/$httpCall/pipeline sinks can resolveconnectionRef. - Runtime loader: point
fluxion.connect.templates-path/fluxion.connect.connections-pathsystem properties at your YAML/JSON files and initialize viaConnectorRegistryInitializer.loadFromSystemProperties(). The HTTP and Kafka sinks and enrichment operators then resolveconnectionReffrom the in-memory registry. - HTTP and Kafka sinks also accept
retryInstance/circuitBreakerInstance(Resilience4j names) when usingconnectionRefor inline config; populate the Resilience4j registries and the sink will wrap calls accordingly.
connections (programmatic registration) - Build connections at runtime and register them directly with the in-memory registry; useful for SDK-only flows or when pulling config from a service:
InMemoryConnectorRegistry registry = new InMemoryConnectorRegistry();
registry.registerConnection(
ConnectionScope.defaultScope(),
"orders-api",
Map.of(
"type", "http",
"endpoint", "https://api.example.com/orders",
"headers", Map.of("Authorization", "Bearer TOKEN"),
"retryInstance", "http-retry"
)
);
HttpSinkProvider.RegistryHolder.registry = registry; // make sinks/operators see it
- You can also register templates in code if you need to hydrate multiple
connectionRefs from the same base template.
timer (trigger)
{
"operations": { "tick": "tick" },
"operationDefs": {
"tick": {
"operationId": "tick",
"kind": "trigger",
"execution": {
"type": "timer",
"cron": "PT1M"
}
}
}
}
For custom providers/runtimes and per-type recipes, see: - Build custom connector: HTTP - Build custom connector: JavaBean - Build custom connector: Pipeline call - Build custom connector: Webhook - Build custom connector: Polling - Build custom connector: Streaming - Build custom connector: Timer
@Configuration
public class HeartbeatTimerConfig {
@Bean
CommandLineRunner startTimer(PipelineExecutor pipelineExecutor) {
return args -> {
List<Stage> stages = DocumentParser.getStagesFromJsonArray(
Files.readString(Path.of("pipelines/heartbeat.json"))
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Runnable runPipeline = () -> {
List<Document> input = DocumentParser.getDocumentsFromJsonArray("[{}]"); // pipeline adds fields
pipelineExecutor.run(input, stages, Map.of("tenantId", "acme")); // globals hold tenant
};
scheduler.scheduleAtFixedRate(runPipeline, 0, 5, TimeUnit.MINUTES);
};
}
}
Swap out the scheduler for your trigger runtime so that the timer manifest drives scheduling automatically and the emitted document (firedAt, message) becomes the first input into your pipeline.
Pipeline invoker + resilience (starter defaults)
- The Spring Boot starter ships a default
PipelineCallInvokerthat POSTs to/api/pipelines/{name}/{version}:runon the pipeline-service. - Configure target URL + timeout and bind retry/circuit-breaker via Resilience4j registries:
fluxion:
connect:
pipeline:
base-url: http://fluxion-pipeline-service:8085
timeout-ms: 10000
headers:
Authorization: "Bearer <token>" # optional default headers (e.g., auth)
retry-instance: pipeline-service # optional override for retry instance name
circuit-breaker-instance: pipeline-service
resilience4j:
retry:
instances:
pipeline-service:
max-attempts: 3
wait-duration: 500ms
circuitbreaker:
instances:
pipeline-service:
failure-rate-threshold: 50
wait-duration-in-open-state: 5s
permitted-number-of-calls-in-half-open-state: 3
sliding-window-size: 10
- This invoker is used for
execution.type=pipelineCall, pipeline sinks (withnext), and top-level streamingprocess(single object). Inputs are sent asdocument; outputs flow to the sink chain or back to the caller. Default headers are applied to the POST. - Connections loaded via the registry loader become available to any connector/operator that uses
connectionRef(e.g.,$httpCall,$enrich, streaming sources/sinks).
Manifest shape (JSON/YAML)
{
"schemaVersion": "1.0.0",
"id": "slack.webapi",
"version": "1.0.0",
"displayName": "Slack Web API",
"category": "messaging",
"auth": {
"type": "bearerToken",
"configSchema": { "type": "object" }
},
"configSchema": { "type": "object" },
"operations": {
"postMessage": { "$ref": "#/operationDefs/postMessage" }
},
"operationDefs": {
"postMessage": {
"operationId": "postMessage",
"kind": "action",
"inputSchema": { "type": "object" },
"outputSchema": { "type": "object" },
"execution": {
"type": "http",
"method": "POST",
"urlTemplate": "{{config.baseUrl}}/chat.postMessage",
"authStyle": "bearerTokenFromConfig.botTokenRef",
"timeoutMs": 10000
}
}
}
}
Fields:
id,version,displayName,categoryauth:type, optionalconfigSchemaconfigSchema: connector-level JSON Schemaoperations: map of operation name → ref intooperationDefsoperationDefs[*]:operationId,kind(actionortrigger),inputSchema,outputSchema,execution(must includeexecution.type)
How connectors are resolved at runtime
- Manifest drives dispatch:
connectionRef+connectorId+operationIdselect the connector and operation. Enrichment operators ($httpCall,$sqlQuery,$enrich) pass these into the Connector Dispatcher; streaming connectors useSourceConnectorProvider/SinkConnectorProvider. - Scopes: the connector registry is scoped by tenant/pipeline. Set the scope (
tenant::pipeline) before registering connectors to avoid cross-tenant leakage. - ServiceLoader: source/sink providers are discovered via
META-INF/services/ai.fluxion.core.engine.connectors.SourceConnectorProvider(or sink). JavaBean executions are resolved from your DI container bybeanName. - Auth/config:
ConnectorContextsupplies tenant/pipeline IDs, config, secret resolution, logger, metrics, and tracing; never cache per-tenant secrets statically.
Dispatcher vs. providers
- Built-in in the dispatcher:
webhook,polling,timer,javaBean,pipelineCall,http. With a manifest that declares these types, the dispatcher spins up the webhook server, polling interval, timer ticks, or invokes the registered bean/pipeline/HTTP call. No provider SPI is needed—just register your action/trigger handlers (for javaBean/polling) and optional pipeline invoker/HTTP headers, etc. - Invocation paths: use
ManifestConnectorDispatcherdirectly, or invoke actions from pipelines via$enrich(or the specialized operators like$httpCall/$sqlQuerywhere available). - Provider SPI required:
streamingsources/sinks. You must haveSourceConnectorProvider/SinkConnectorProviderimplementations for thesource.type/sink.type(Kafka/custom) discovered via ServiceLoader. - Manifest loading: ServiceLoader only discovers providers. Manifests must be loaded separately—either via the Spring Boot starter classpath scan (e.g.,
classpath*:manifests/*.json) or manually withConnectorManifestLoader.load(...)before using the dispatcher/$enrich.
For per-type recipes and custom connector creation, see the “Build Custom Connector” pages in the sidebar (HTTP, JavaBean, pipeline call, webhook, polling, streaming, timer).
Execution-type examples at a glance
Use these patterns per execution.type when authoring manifests.
http (action)
{
"execution": {
"type": "http",
"method": "POST",
"urlTemplate": "https://api.example.com/echo",
"headers": { "X-Tenant": "{{context.tenantId}}" },
"timeoutMs": 5000
}
}
Per-operation schemas (input/output)
- Define payload shapes per operation via operationDefs[*].inputSchema / outputSchema (separate from connector config in templates.yaml).
- Example (Slack postMessage):
{
"operations": { "postMessage": "postMessage" },
"operationDefs": {
"postMessage": {
"operationId": "postMessage",
"kind": "action",
"inputSchema": {
"type": "object",
"required": ["channel", "text"],
"properties": {
"channel": { "type": "string" },
"text": { "type": "string" },
"thread_ts": { "type": "string" },
"attachments": { "type": "array", "items": { "type": "object" } }
}
},
"outputSchema": { "type": "object" },
"execution": {
"type": "http",
"method": "POST",
"urlTemplate": "{{config.baseUrl}}/chat.postMessage",
"headers": { "Authorization": "Bearer {{config.tokenRef}}" }
}
}
}
}
Each operation can have its own schema; reuse the connector template/config for shared settings like base URL, auth, and timeouts.
javaBean (action)
{
"execution": {
"type": "javaBean",
"beanName": "helloConnector" // Spring/Micronaut bean implementing ConnectorActionHandler
}
}
pipelineCall (action)
{
"execution": {
"type": "pipelineCall",
"targetPipeline": "downstream.orders",
"version": "1.0.0"
}
}
webhook (trigger)
{
"execution": {
"type": "webhook",
"path": "/webhook",
"method": "POST"
}
}
polling (trigger)
{
"execution": {
"type": "polling",
"intervalMillis": 30000,
"beanName": "pollerHandler" // implements ConnectorTriggerHandler
}
}
streaming (trigger)
{
"execution": {
"type": "streaming",
"stream": "orders",
"source": { "type": "kafka", "connectionRef": "kafka-orders" },
"sink": { "type": "http", "connectionRef": "http-sink" }
}
}
timer (trigger)
{
"execution": {
"type": "timer",
"cron": "PT5M"
}
}
Java Connector SDK
Use these interfaces when custom logic is needed (typically for javaBean, polling,
streaming implementations):
ConnectorActionHandler<I, O>— one-shot actions (send email/HTTP/db write).ConnectorTriggerHandler<I, O>— triggers/streams; returnsFlux<O>.ConnectorContext— tenant/pipeline metadata, config, secret resolver, logger, metrics, tracing; multi-tenant safe.AbstractActionHandler<I, O>— convenience wrapper to record latency and log failures.
Example action handler:
@Component("smtpEmailConnector")
public class SmtpEmailConnector extends AbstractActionHandler<SendEmailInput, SendEmailOutput> {
private final JavaMailSender mailSender;
public SmtpEmailConnector(JavaMailSender mailSender) { this.mailSender = mailSender; }
@Override
protected SendEmailOutput doExecute(ConnectorContext ctx, SendEmailInput input) throws Exception {
SmtpConfig cfg = ctx.getConnectorConfig(SmtpConfig.class);
ctx.getLogger().info("Sending email for tenant {} to {}", ctx.getTenantId(), input.to);
// ... send ...
return new SendEmailOutput("sent");
}
}
Secrets and multi-tenancy
- Store only secret references (e.g.,
secret://vault/t-123/slack/botToken) in config. - Mark secret fields in schemas with
"x-secret": true. - Resolve secrets only via
ConnectorContext.resolveSecret(secretRef). - Never log or emit secret values.
Orchestration boundary
- Pipeline orchestration lives in the engine (
ConnectorEngine,PipelineEngine). - Handlers should perform external I/O only; do not embed orchestration logic.
- Triggers/streams are started by the trigger runtime; actions are invoked by pipeline connector nodes.
Catalogs (streams/sinks)
ConnectorStreamDescriptorexposes stream metadata (name, namespace, schema, sync modes, cursor/PK hints) forstreaming/pollingconnectors and sinks.- Use discovery endpoints (
/api/connectors/discovery/sources|sinksin pipeline-service) to fetch catalogs; pipeline specs remain connector-agnostic.
Checklist for new connectors
- Pick an
execution.typefrom the fixed list. - Provide a manifest with
id/version/displayName/category,auth,configSchema,operations+operationDefs(withexecution.type). - For custom logic, implement
ConnectorActionHandler/ConnectorTriggerHandleror extendAbstractActionHandler. - Respect multi-tenancy and secret handling via
ConnectorContext. - (Streaming) Expose
ConnectorStreamDescriptorif applicable for catalogs.