workflow-orchestrator

star 391

Module Loop and Iteration Skill for orchestrating multi-phase penetration testing workflows. Use when coordinating sequential tool execution, managing dependencies between reconnaissance and vulnerability scanning modules, implementing adaptive fallback strategies, or managing workflow state across iterations. Triggers on tasks requiring dependency chaining (e.g., Nmap results feeding into Nuclei), adaptive workflow adjustments based on results, or stateful iteration management to avoid redundant scans.

SHAdd0WTAka By SHAdd0WTAka schedule Updated 6/6/2026

name: workflow-orchestrator description: Module Loop and Iteration Skill for orchestrating multi-phase penetration testing workflows. Use when coordinating sequential tool execution, managing dependencies between reconnaissance and vulnerability scanning modules, implementing adaptive fallback strategies, or managing workflow state across iterations. Triggers on tasks requiring dependency chaining (e.g., Nmap results feeding into Nuclei), adaptive workflow adjustments based on results, or stateful iteration management to avoid redundant scans.

Workflow Orchestrator Skill

Manages cyclic testing workflows with dependency chaining, adaptive re-evaluation, and intelligent state management.

Quick Start

from scripts.workflow_engine import WorkflowEngine
from scripts.dependency_chain import DependencyChain
from scripts.state_manager import StateManager

# Initialize workflow
state = StateManager(target="example.com")
engine = WorkflowEngine(state)

# Define dependency chain
chain = DependencyChain()
chain.add_step("recon", NmapModule(), required=True)
chain.add_step("vuln_scan", NucleiModule(),
               depends_on=["recon"],
               input_mapper=lambda r: {"ports": r.open_ports})
chain.add_step("exploit", ExploitModule(),
               depends_on=["vuln_scan"],
               fallback=EnumModule())  # Fallback if no vulns

# Execute with iterations
results = await engine.execute(chain, max_iterations=3)

Workflow Architecture

┌─────────────────────────────────────────────────────────────┐
│                    WORKFLOW ITERATION                        │
├─────────────────────────────────────────────────────────────┤
│  1. CHECK STATE → Skip if already tested                     │
│  2. EXECUTE MODULE → Run with dependencies                   │
│  3. EVALUATE RESULTS → Success / Empty / Error               │
│  4. ADAPT STRATEGY → Fallback if empty                       │
│  5. CHAIN OUTPUTS → Feed into next modules                   │
│  6. UPDATE STATE → Mark tested, store findings               │
│  7. ITERATE → Continue until max_iterations or complete      │
└─────────────────────────────────────────────────────────────┘

Core Components

1. Dependency Chaining (scripts/dependency_chain.py)

Manages execution order and data flow between modules:

chain = DependencyChain()

# Reconnaissance phase
chain.add_step("port_scan", NmapScanner(), required=True)
chain.add_step("service_enum", ServiceEnumerator(),
               depends_on=["port_scan"])

# Vulnerability phase  
chain.add_step("web_scan", NucleiScanner(),
               depends_on=["port_scan"],
               condition=lambda ctx: 80 in ctx.get("port_scan", {}).ports or
                                    443 in ctx.get("port_scan", {}).ports)

# Exploitation phase
chain.add_step("sql_exploit", SQLMapModule(),
               depends_on=["web_scan"],
               condition=lambda ctx: "sql_injection" in ctx.findings)

See references/dependency_patterns.md for complete patterns.

2. Adaptive Re-evaluation (scripts/adaptive_strategy.py)

Switches strategies based on results:

Result Action
Success Continue to dependent modules
Empty Trigger fallback module
Error Retry with reduced intensity
Timeout Queue for later retry
# Define fallback strategies
fallbacks = {
    "web_scan": {
        "empty_result": "service_deep_scan",
        "error": "reduced_web_scan",
        "timeout": "queue_retry"
    },
    "vuln_scan": {
        "empty_result": "full_port_scan",
        "blocked": "stealth_scan"
    }
}

strategy = AdaptiveStrategy(fallbacks)
new_module = strategy.decide("web_scan", result)

3. State Management (scripts/state_manager.py)

Prevents redundant scans using Smart Memory:

state = StateManager(target="example.com")

# Check before scan
if state.is_tested("nmap", ports="80,443"):
    return state.get_result("nmap")

# Execute and store
result = await nmap.scan("example.com", ports="80,443")
state.store("nmap", result, metadata={"ports": "80,443", "timestamp": now()})

# Track tested paths
state.add_tested_path("/admin", method="GET", params={"id": "1"})

Workflow Patterns

Pattern 1: Standard Pentest Flow

Nmap (ports) → Nuclei (web vulns) → SQLMap (exploit) → Report
     ↓              ↓ (if empty)           ↓
  Service      Fallback to:          Fallback to:
  Enum         Dirbusting            Manual check

Pattern 2: Deep Dive Iteration

Iteration 1: Quick scan (top 100 ports)
Iteration 2: Full scan (all ports) - only if no services found
Iteration 3: Service-specific tools based on banners

Pattern 3: Parallel Branches

            ┌→ Web Path (FFuF)
Nmap Ports ─┤
            └→ Service Exploit (Metasploit)

See references/workflow_templates/ for YAML definitions.

Integration with Zen-AI-Pentest

# Integration with ZenOrchestrator
from core.orchestrator import ZenOrchestrator

class WorkflowOrchestrator(ZenOrchestrator):
    def __init__(self):
        super().__init__()
        self.workflow_engine = WorkflowEngine()
        self.state_manager = StateManager()

    async def run_workflow(self, target: str, strategy: str = "standard"):
        # Load workflow template
        template = WorkflowTemplate.load(f"assets/workflows/{strategy}.yaml")

        # Initialize state
        state = self.state_manager.for_target(target)

        # Execute with iterations
        return await self.workflow_engine.execute(
            template=template,
            state=state,
            max_iterations=template.max_iterations
        )

State Storage Format

{
  "target": "example.com",
  "workflow_id": "uuid",
  "iterations": [
    {
      "iteration": 1,
      "modules": {
        "nmap": {
          "status": "completed",
          "result_hash": "sha256:abc123",
          "timestamp": "2026-03-20T14:46:00Z",
          "inputs": {"ports": "top-100"},
          "outputs": {"open_ports": [80, 443]}
        },
        "nuclei": {
          "status": "completed",
          "depends_on": ["nmap"],
          "inputs": {"ports": [80, 443]}
        }
      }
    }
  ],
  "tested_paths": [
    {"path": "/admin", "method": "GET", "params": {}},
    {"path": "/api/users", "method": "POST", "params": {"id": "1"}}
  ],
  "findings": ["CVE-2021-44228", "sql_injection_login"],
  "adaptations": [
    {"module": "web_scan", "trigger": "empty", "fallback": "dirbusting"}
  ]
}

Error Handling

Error Type Handler Action
Module Crash Restart with reduced threads Log, retry once
Dependency Missing Skip dependent modules Mark as blocked
State Corruption Rebuild from last good Alert operator
Max Iterations Halt workflow Return partial results

Performance Optimization

  • Caching: Store module results keyed by inputs
  • Parallelization: Run independent modules concurrently
  • Early Termination: Stop if critical finding detected
  • Incremental Scans: Only test new paths in iterations

See references/optimization_guide.md.

References

Scripts

  • scripts/workflow_engine.py - Core workflow execution
  • scripts/dependency_chain.py - Module dependency management
  • scripts/adaptive_strategy.py - Fallback and adaptation logic
  • scripts/state_manager.py - Smart Memory integration
  • scripts/parallel_executor.py - Concurrent module execution
Install via CLI
npx skills add https://github.com/SHAdd0WTAka/Zen-Ai-Pentest --skill workflow-orchestrator
Repository Details
star Stars 391
call_split Forks 67
navigation Branch main
article Path SKILL.md
More from Creator