Connector Manifest & SDK
This page documents the connector manifest format and the Java SDK used for custom handlers. The goals:
- Thin, reusable integrations for external systems (HTTP, DBs, Kafka, S3, etc.).
- Multi-tenant aware: never leak data/secrets across tenants.
- Declarative manifests (JSON/YAML) plus optional Java handlers.
- A fixed set of execution types—no connector-specific executors.
Execution types (required)
Every operation declares an execution.type (do not invent new types):
http— generic HTTP/REST callsjavaBean— call a Spring bean implementing connector logic (SDKs, DB drivers, etc.)pipelineCall— invoke another Fluxion pipeline or rulesethttpServer— inbound HTTP/webhook triggerspolling— periodic polling of external systemsreactiveStream— streaming sources (Kafka, Pub/Sub, CDC, etc.)timer— cron triggers, delays, scheduled tasks
LLM-friendly recipe
- Pick
execution.type(http | javaBean | pipelineCall | httpServer | polling | reactiveStream | timer). - Write a manifest with
id,version,operations,operationDefs[*].kind(action/trigger), andexecution.type. - Implement handlers:
- Actions:
ConnectorActionHandleror extendAbstractActionHandler. - Triggers:
ConnectorTriggerHandlerreturningFlux<O>. - Streaming:
SourceConnectorProvider/SinkConnectorProvider+discoverStreamsreturningConnectorStreamDescriptor. - Package:
- Put manifest under
src/main/resources/manifests/. - Add ServiceLoader files:
META-INF/services/ai.fluxion.core.engine.connectors.SourceConnectorProvider/...SinkConnectorProvider. - Register Spring beans for
javaBean/httpServer/pipelineCallexecutions. - Enforce secrets/multi-tenancy via
ConnectorContext.resolveSecret, keep handlers stateless, use discovery endpoints for catalogs.
Sample manifests by execution.type
http (action)
{
"operations": { "echo": "echo" },
"operationDefs": {
"echo": {
"operationId": "echo",
"kind": "action",
"execution": {
"type": "http",
"method": "POST",
"urlTemplate": "https://api.example.com/echo",
"timeoutMs": 5000
}
}
}
}
javaBean (action)
{
"operations": { "add": "add" },
"operationDefs": {
"add": {
"operationId": "add",
"kind": "action",
"execution": {
"type": "javaBean",
"beanName": "addHandler"
}
}
}
}
pipelineCall (action)
{
"operations": { "invoke": "invoke" },
"operationDefs": {
"invoke": {
"operationId": "invoke",
"kind": "action",
"execution": {
"type": "pipelineCall",
"targetPipeline": "downstream.orders",
"version": "1.0.0"
}
}
}
}
httpServer (trigger/webhook)
{
"operations": { "ingest": "ingest" },
"operationDefs": {
"ingest": {
"operationId": "ingest",
"kind": "trigger",
"execution": {
"type": "httpServer",
"path": "/webhook",
"method": "POST"
}
}
}
}
polling (trigger)
{
"operations": { "poll": "poll" },
"operationDefs": {
"poll": {
"operationId": "poll",
"kind": "trigger",
"execution": {
"type": "polling",
"intervalMillis": 30000
}
}
}
}
reactiveStream (trigger)
{
"operations": { "stream": "stream" },
"operationDefs": {
"stream": {
"operationId": "stream",
"kind": "trigger",
"execution": {
"type": "reactiveStream",
"stream": "orders",
"group": "connectors"
}
}
}
}
timer (trigger)
{
"operations": { "tick": "tick" },
"operationDefs": {
"tick": {
"operationId": "tick",
"kind": "trigger",
"execution": {
"type": "timer",
"cron": "PT1M"
}
}
}
}
Manifest shape (JSON/YAML)
{
"schemaVersion": "1.0.0",
"id": "slack.webapi",
"version": "1.0.0",
"displayName": "Slack Web API",
"category": "messaging",
"auth": {
"type": "bearerToken",
"configSchema": { "type": "object" }
},
"configSchema": { "type": "object" },
"operations": {
"postMessage": { "$ref": "#/operationDefs/postMessage" }
},
"operationDefs": {
"postMessage": {
"operationId": "postMessage",
"kind": "action",
"inputSchema": { "type": "object" },
"outputSchema": { "type": "object" },
"execution": {
"type": "http",
"method": "POST",
"urlTemplate": "{{config.baseUrl}}/chat.postMessage",
"authStyle": "bearerTokenFromConfig.botTokenRef",
"timeoutMs": 10000
}
}
}
}
Fields:
id,version,displayName,categoryauth:type, optionalconfigSchemaconfigSchema: connector-level JSON Schemaoperations: map of operation name → ref intooperationDefsoperationDefs[*]:operationId,kind(actionortrigger),inputSchema,outputSchema,execution(must includeexecution.type)
Java Connector SDK
Use these interfaces when custom logic is needed (typically for javaBean, polling,
reactiveStream):
ConnectorActionHandler<I, O>— one-shot actions (send email/HTTP/db write).ConnectorTriggerHandler<I, O>— triggers/streams; returnsFlux<O>.ConnectorContext— tenant/pipeline metadata, config, secret resolver, logger, metrics, tracing; multi-tenant safe.AbstractActionHandler<I, O>— convenience wrapper to record latency and log failures.
Example action handler:
@Component("smtpEmailConnector")
public class SmtpEmailConnector extends AbstractActionHandler<SendEmailInput, SendEmailOutput> {
private final JavaMailSender mailSender;
public SmtpEmailConnector(JavaMailSender mailSender) { this.mailSender = mailSender; }
@Override
protected SendEmailOutput doExecute(ConnectorContext ctx, SendEmailInput input) throws Exception {
SmtpConfig cfg = ctx.getConnectorConfig(SmtpConfig.class);
ctx.getLogger().info("Sending email for tenant {} to {}", ctx.getTenantId(), input.to);
// ... send ...
return new SendEmailOutput("sent");
}
}
Secrets and multi-tenancy
- Store only secret references (e.g.,
secret://vault/t-123/slack/botToken) in config. - Mark secret fields in schemas with
"x-secret": true. - Resolve secrets only via
ConnectorContext.resolveSecret(secretRef). - Never log or emit secret values.
Orchestration boundary
- Pipeline orchestration lives in the engine (
ConnectorEngine,PipelineEngine). - Handlers should perform external I/O only; do not embed orchestration logic.
- Triggers/streams are started by the trigger runtime; actions are invoked by pipeline connector nodes.
Catalogs (streams/sinks)
ConnectorStreamDescriptorexposes stream metadata (name, namespace, schema, sync modes, cursor/PK hints) forreactiveStream/pollingconnectors and sinks.- Use discovery endpoints (
/api/connectors/discovery/sources|sinksin pipeline-service) to fetch catalogs; pipeline specs remain connector-agnostic.
Checklist for new connectors
- Pick an
execution.typefrom the fixed list. - Provide a manifest with
id/version/displayName/category,auth,configSchema,operations+operationDefs(withexecution.type). - For custom logic, implement
ConnectorActionHandler/ConnectorTriggerHandleror extendAbstractActionHandler. - Respect multi-tenancy and secret handling via
ConnectorContext. - (Streaming) Expose
ConnectorStreamDescriptorif applicable for catalogs.