Skip to content

Executor Test Suite – Scenario Guide

Friendly breakdown of the unit tests in fluxion-core/src/test/java/.../PipelineExecutorTest.java covering $match, $project, system variables, and computed fields.


How to run the tests

mvn -pl fluxion-core test -Dtest=PipelineExecutorTest

Or run the entire module:

mvn -pl fluxion-core -am test

Scenario matrix

Test Purpose Key stages/operators
test_simple_match_pipeline Filter documents where value >= 20. $match, comparison operators
test_project_include_fields Return only name and age. $project (inclusion)
test_project_exclude_fields Remove city field. $project (exclusion)
test_project_computed_fields Add total via $multiply. $project, $multiply
test_project_exclude_id Remove _id while keeping name. $project, _id handling
test_project_nested_fields Project nested subfields (user.profile.first_name). $project, dotted paths

Each scenario below lists the input, pipeline, and expected output so you can replicate the behaviour in your own tests or services.


1. test_simple_match_pipeline

Input:
[
  { "value": 10 },
  { "value": 20 },
  { "value": 30 }
]

Pipeline:
[
  { "$match": { "value": { "$gte": 20 } } }
]

Output:
[
  { "value": 20 },
  { "value": 30 }
]

Tip: $match supports the full MongoDB query syntax, including compound conditions ($and, $or).


2. test_project_include_fields

Input:
[
  { "name": "Alice", "age": 30, "city": "New York" }
]

Pipeline:
[
  { "$project": { "name": 1, "age": 1 } }
]

Output:
[
  { "name": "Alice", "age": 30 }
]

Tip: When using inclusion, _id is present unless explicitly excluded.


3. test_project_exclude_fields

Input:
[
  { "name": "Bob", "age": 25, "city": "Los Angeles" }
]

Pipeline:
[
  { "$project": { "city": 0 } }
]

Output:
[
  { "name": "Bob", "age": 25 }
]

Tip: Exclusion-style projections cannot mix inclusion (other than _id).


4. test_project_computed_fields

Input:
[
  { "item": "A", "price": 10, "qty": 2 }
]

Pipeline:
[
  {
    "$project": {
      "item": 1,
      "total": { "$multiply": ["$price", "$qty"] }
    }
  }
]

Output:
[
  { "item": "A", "total": 20 }
]

Tip: Combine $multiply with $add/$divide for discount or tax calculations.


5. test_project_exclude_id

Input:
[
  { "_id": 123, "name": "Charlie", "city": "Miami" }
]

Pipeline:
[
  { "$project": { "_id": 0, "name": 1 } }
]

Output:
[
  { "name": "Charlie" }
]

Tip: This is a common pattern when returning documents over REST APIs.


6. test_project_nested_fields

Input:
[
  {
    "user": {
      "profile": {
        "first_name": "Dana",
        "last_name": "Smith"
      }
    },
    "age": 40
  }
]

Pipeline:
[
  { "$project": { "user.profile.first_name": 1, "age": 1, "_id": 0 } }
]

Output:
[
  {
    "user": { "profile": { "first_name": "Dana" } },
    "age": 40
  }
]

Tip: Use dotted paths to include/exclude nested fields. Combine with $map/$filter when the nesting involves arrays.


Feel free to extend the suite with additional cases (renamed fields, computed aliases, system variable usage). The structure above keeps everything explicit for humans and assistants alike.

📌 Pipeline

[
  { "$project": { "user.profile.first_name": 1, "age": 1 } }
]

📤 Output

[
  {
    "user": { "profile": { "first_name": "Dana" } },
    "age": 40
  }
]

✅ test_project_renamed_fields_with_expression

📖 Description

Uses computed aliases for fields using $project.

📥 Input Document

[
  { "name": "Eve", "visits": 5 }
]

📌 Pipeline

[
  {
    "$project": {
      "username": "$name",
      "visit_count": "$visits"
    }
  }
]

📤 Output

[
  { "username": "Eve", "visit_count": 5 }
]

✅ test_replace_with_simple_field

📖 Description

Replaces full document with a subdocument field using $replaceWith.

📥 Input Document

[
  {
    "product": "Widget",
    "details": {
      "name": "Widget A",
      "price": 100
    }
  }
]

📌 Pipeline

[
  { "$replaceWith": "$details" }
]

📤 Output

[
  { "name": "Widget A", "price": 100 }
]

✅ test_replace_with_merge_objects_executor_style

📖 Description

Merges conditional object into the root using $mergeObjects and $cond.

📥 Input Document

[
  { "product": "Widget", "price": 400 }
]

📌 Pipeline

[
  {
    "$replaceWith": {
      "$mergeObjects": [
        { "type": "standard" },
        {
          "$cond": {
            "if": { "$gte": ["$price", 500] },
            "then": { "type": "premium" },
            "else": {}
          }
        },
        "$$ROOT"
      ]
    }
  }
]

📤 Output

[
  { "type": "standard", "product": "Widget", "price": 400 }
]

✅ test_root_variable_deep_nested_merge_cond

📖 Description

Demonstrates conditional field injection and full merge with root document.

📥 Input Document

[
  { "product": "Gadget", "price": 450 }
]

📌 Pipeline

[
  {
    "$replaceWith": {
      "$mergeObjects": [
        { "default": "basic" },
        {
          "$cond": {
            "if": { "$gte": ["$price", 500] },
            "then": { "premiumField": true },
            "else": {}
          }
        },
        "$$ROOT"
      ]
    }
  }
]

📤 Output

[
  { "default": "basic", "product": "Gadget", "price": 450 }
]


✅ test_now_variable_deep_nested

📖 Description

Adds createdAt using the system variable $$NOW and copies user.

📥 Input Document

[
  { "user": "Alice" }
]

📌 Pipeline

[
  {
    "$addFields": {
      "audit": {
        "createdAt": "$$NOW",
        "createdBy": "$user"
      }
    }
  }
]

📤 Output

[
  {
    "user": "Alice",
    "audit": {
      "createdAt": "<timestamp>",
      "createdBy": "Alice"
    }
  }
]

✅ test_remove_variable_nested_project

📖 Description

Uses $$REMOVE to eliminate a field dynamically during $project.

📥 Input Document

[
  {
    "name": "Alice",
    "privateField": "secret",
    "publicField": "info"
  }
]

📌 Pipeline

[
  {
    "$project": {
      "publicField": 1,
      "privateField": "$$REMOVE",
      "profile": {
        "name": "$name"
      }
    }
  }
]

📤 Output

[
  {
    "publicField": "info",
    "profile": {
      "name": "Alice"
    }
  }
]

✅ test_cluster_time_variable_nested

📖 Description

Adds cluster time using $$CLUSTER_TIME.

📥 Input Document

[
  { "device": "IoT Sensor" }
]

📌 Pipeline

[
  {
    "$addFields": {
      "metadata": {
        "clusterCapturedAt": "$$CLUSTER_TIME",
        "deviceName": "$device"
      }
    }
  }
]

📤 Output

[
  {
    "device": "IoT Sensor",
    "metadata": {
      "clusterCapturedAt": "<cluster_time>",
      "deviceName": "IoT Sensor"
    }
  }
]

✅ test_expression_remove_variable

📖 Description

Removes status field using $$REMOVE during $addFields.

📥 Input Document

[
  { "name": "Charlie", "status": "active" }
]

📌 Pipeline

[
  {
    "$addFields": {
      "status": "$$REMOVE"
    }
  }
]

📤 Output

[
  { "name": "Charlie" }
]

✅ test_expression_now_variable

📖 Description

Projects current timestamp using $$NOW.

📥 Input Document

[
  { "user": "Bob" }
]

📌 Pipeline

[
  { "$project": { "createdAt": "$$NOW" } }
]

📤 Output

[
  { "createdAt": "<timestamp>" }
]


✅ test_function_double_price

📖 Description

Doubles the price using $function operator with a custom lambda.

📥 Input Document

[
  { "price": 100 }
]

📌 Pipeline

[
  {
    "$addFields": {
      "doublePrice": {
        "$function": {
          "body": "lambda price: price * 2",
          "args": ["$price"],
          "lang": "js"
        }
      }
    }
  }
]

📤 Output

[
  { "price": 100, "doublePrice": 200 }
]

✅ test_function_add_two_fields

📖 Description

Adds two fields together using a custom lambda function.

📥 Input Document

[
  { "a": 5, "b": 7 }
]

📌 Pipeline

[
  {
    "$addFields": {
      "sumAB": {
        "$function": {
          "body": "lambda a, b: a + b",
          "args": ["$a", "$b"],
          "lang": "js"
        }
      }
    }
  }
]

📤 Output

[
  { "a": 5, "b": 7, "sumAB": 12 }
]

✅ test_function_pass_fail_status

📖 Description

Evaluates whether a score passes or fails based on threshold logic.

📥 Input Document

[
  { "score": 45 },
  { "score": 75 }
]

📌 Pipeline

[
  {
    "$addFields": {
      "status": {
        "$function": {
          "body": "lambda score: 'pass' if score >= 50 else 'fail'",
          "args": ["$score"],
          "lang": "js"
        }
      }
    }
  }
]

📤 Output

[
  { "score": 45, "status": "fail" },
  { "score": 75, "status": "pass" }
]

✅ test_function_complex_logic

📖 Description

Applies tiered pricing logic based on product category.

📥 Input Document

[
  { "category": "A", "price": 100 },
  { "category": "B", "price": 200 }
]

📌 Pipeline

[
  {
    "$addFields": {
      "adjustedPrice": {
        "$function": {
          "body": "lambda category, price: price * 0.9 if category == 'A' else price * 1.1",
          "args": ["$category", "$price"],
          "lang": "js"
        }
      }
    }
  }
]

📤 Output

[
  { "category": "A", "price": 100, "adjustedPrice": 90.0 },
  { "category": "B", "price": 200, "adjustedPrice": 220.0 }
]


✅ test_expr_inside_match

📖 Description

Uses $expr inside $match to dynamically filter documents based on field values.

📥 Input Document

[
  { "_id": 1, "price": 100 },
  { "_id": 2, "price": 600 },
  { "_id": 3, "price": 400 }
]

📌 Pipeline

[
  {
    "$match": {
      "$expr": { "$gt": ["$price", 500] }
    }
  }
]

📤 Output

[
  { "_id": 2, "price": 600 }
]

✅ test_extra_logic_pipeline

📖 Description

Classifies IoT sensor risk level using $switch, $and, $or, and $lt conditions.

📥 Input Document

{
  "device_id": "sensor-500",
  "temperature": 42,
  "humidity": 82,
  "battery": 18,
  "device_status": "online",
  "signal_strength": -95,
  "location": "ZoneA",
  "uptime_hours": 400
}

📌 Pipeline

[
  {
    "$addFields": {
      "risk_level": {
        "$switch": {
          "branches": [
            {
              "case": {
                "$and": [
                  { "$gte": ["$temperature", 40] },
                  { "$gte": ["$humidity", 80] },
                  { "$lt": ["$battery", 20] }
                ]
              },
              "then": "Critical"
            },
            {
              "case": {
                "$or": [
                  { "$lt": ["$signal_strength", -90] },
                  { "$eq": ["$device_status", "offline"] }
                ]
              },
              "then": "Warning"
            },
            {
              "case": {
                "$and": [
                  { "$gt": ["$uptime_hours", 300] },
                  { "$eq": ["$location", "ZoneA"] }
                ]
              },
              "then": "Maintenance Due"
            }
          ],
          "default": "Normal"
        }
      }
    }
  },
  {
    "$project": {
      "device_id": 1,
      "risk_level": 1,
      "battery": 1,
      "signal_strength": 1,
      "uptime_hours": 1
    }
  }
]

📤 Output

[
  {
    "device_id": "sensor-500",
    "risk_level": "Critical",
    "battery": 18,
    "signal_strength": -95,
    "uptime_hours": 400
  }
]

✅ test_extra_deep_pipeline

📖 Description

This test: - Computes total payment using $reduce - Extracts last tracking event for each shipment using $arrayElemAt and $size - Projects only essential order and customer fields

📥 Input Document

{
  "order_id": "A1001",
  "customer": {
    "name": "Alice Wonderland",
    "loyalty": "gold"
  },
  "shipments": [
    {
      "shipment_id": "S1",
      "tracking_events": [
        { "status": "picked_up", "timestamp": "2023-01-10T10:00:00" },
        { "status": "in_transit", "timestamp": "2023-01-11T12:00:00" }
      ]
    },
    {
      "shipment_id": "S2",
      "tracking_events": [
        { "status": "picked_up", "timestamp": "2023-01-12T09:00:00" },
        { "status": "delivered", "timestamp": "2023-01-14T16:00:00" }
      ]
    }
  ],
  "payment": {
    "transactions": [
      { "txn_id": "T1", "amount": 120 },
      { "txn_id": "T2", "amount": 30 }
    ]
  }
}

📌 Pipeline

[
  {
    "$addFields": {
      "total_payment_amount": {
        "$reduce": {
          "input": "$payment.transactions",
          "initialValue": 0,
          "in": { "$add": ["$$value", "$$this.amount"] }
        }
      }
    }
  },
  {
    "$addFields": {
      "last_status_per_shipment": {
        "$map": {
          "input": "$shipments",
          "as": "shipment",
          "in": {
            "shipment_id": "$$shipment.shipment_id",
            "last_event": {
              "$arrayElemAt": [
                "$$shipment.tracking_events",
                { "$subtract": [ { "$size": "$$shipment.tracking_events" }, 1 ] }
              ]
            }
          }
        }
      }
    }
  },
  {
    "$project": {
      "order_id": 1,
      "customer_name": "$customer.name",
      "customer_loyalty": "$customer.loyalty",
      "total_payment_amount": 1,
      "last_status_per_shipment": 1
    }
  }
]

📤 Output

[
  {
    "order_id": "A1001",
    "customer_name": "Alice Wonderland",
    "customer_loyalty": "gold",
    "total_payment_amount": 150,
    "last_status_per_shipment": [
      {
        "shipment_id": "S1",
        "last_event": {
          "status": "in_transit",
          "timestamp": "2023-01-11T12:00:00"
        }
      },
      {
        "shipment_id": "S2",
        "last_event": {
          "status": "delivered",
          "timestamp": "2023-01-14T16:00:00"
        }
      }
    ]
  }
]