05 — Workflow Engine
🇧🇷 Motor de Workflows
🇬🇧 Workflow Engine
The Problem
You need to run a sequence of steps: validate data, call an API, transform the result, send an email. What happens if a step fails? What if the server dies halfway through?
A workflow engine solves this. You define the steps as nodes in a graph, and the engine executes them in the right order, with retries, queues, and persisted state.
This is how n8n, Zapier, and Power Automate work under the hood.
Architecture
POST /api/v1/workflows → Create workflow
GET /api/v1/workflows/:id → Get workflow
POST /api/v1/workflows/:id/execute → Execute workflow
GET /api/v1/workflows/:id/runs → Execution history
TypeScript Implementation
Workflow definition
typescript
/** Every node kind the engine knows about, written as `<category>:<name>`. */
type NodeType =
  | 'trigger:webhook'
  | 'action:http'
  | 'action:transform'
  | 'condition:ifelse'
  | 'action:email'
  | 'action:log';

/** A single step (vertex) of a workflow graph. */
interface Node {
  /** Identifier that edges point at through their `from` / `to` fields. */
  id: string;
  /** Kind of step; decides how the node is run. */
  type: NodeType;
  /** Free-form settings whose meaning depends on `type`. */
  config: Record<string, any>;
}
/** A directed connection between two nodes of a workflow. */
interface Edge {
  /** Id of the node the edge leaves. */
  from: string;
  /** Id of the node the edge enters. */
  to: string;
  /** Optional guard on the edge: 'success' | 'error' | expression. */
  condition?: string;
}
/** A workflow definition: its steps plus the directed edges that connect them. */
interface Workflow {
  /** Identifier of the workflow. */
  id: string;
  /** Steps of the workflow. */
  nodes: Array<Node>;
  /** Connections between steps, referring to nodes by id. */
  edges: Array<Edge>;
}
DAG Executor
typescript
/**
 * Runs a workflow by walking its graph depth-first, starting at the trigger node.
 *
 * Nodes are executed one after another. A node that is reachable through
 * several paths is executed once per path.
 *
 * NOTE(review): `evalCondition` and `evalField` are called below but are not
 * defined in this excerpt; they must be supplied on the class for it to compile.
 */
class WorkflowExecutor {
  /**
   * Results of the current run, keyed by node id. The raw trigger payload is
   * additionally stored under the key 'trigger'.
   */
  private state: Map<string, any> = new Map();

  /**
   * Executes `workflow` once.
   *
   * @param workflow - Definition to run; must contain a node whose type starts with 'trigger:'.
   * @param triggerData - Payload that started the run.
   * @throws Error when the workflow has no trigger node, when the graph
   *   contains a cycle, or when a node fails.
   */
  async execute(workflow: Workflow, triggerData: any): Promise<void> {
    const trigger = workflow.nodes.find(n => n.type.startsWith('trigger:'));
    if (!trigger) throw new Error('Workflow without trigger');

    // Start every run from a clean slate so results of a previous run on the
    // same executor cannot leak into this one.
    this.state.clear();
    this.state.set('trigger', triggerData);
    await this.executeNode(workflow, trigger.id);
  }

  /**
   * Runs one node, stores its result, then follows its outgoing edges in order.
   *
   * @param path - Ids of the nodes on the current root-to-node path; used only
   *   to detect cycles, which would otherwise make the walk run forever.
   */
  private async executeNode(
    workflow: Workflow,
    nodeId: string,
    path: ReadonlySet<string> = new Set(),
  ): Promise<void> {
    if (path.has(nodeId)) {
      throw new Error(`Workflow cycle detected at node '${nodeId}'`);
    }

    const node = workflow.nodes.find(n => n.id === nodeId);
    // An edge that points at an unknown node is ignored.
    if (!node) return;

    const result = await this.runNode(node);
    this.state.set(nodeId, result);

    const nextPath = new Set(path).add(nodeId);
    const nextEdges = workflow.edges.filter(e => e.from === nodeId);
    for (const edge of nextEdges) {
      if (edge.condition && !this.evalCondition(edge.condition, result)) {
        continue;
      }
      await this.executeNode(workflow, edge.to, nextPath);
    }
  }

  /**
   * Performs the work of a single node and returns its result.
   *
   * @throws Error when an HTTP node receives a non-2xx response.
   */
  private async runNode(node: Node): Promise<any> {
    // A trigger yields the payload that started the run. Returning it (instead
    // of null) keeps the payload intact when the trigger node's id is 'trigger'.
    if (node.type.startsWith('trigger:')) {
      return this.state.get('trigger');
    }

    switch (node.type) {
      case 'action:http': {
        const method = String(node.config.method || 'POST');
        // fetch rejects GET/HEAD requests that carry a body.
        const sendsBody = !['GET', 'HEAD'].includes(method.toUpperCase());
        const response = await fetch(node.config.url, {
          method,
          headers: { 'Content-Type': 'application/json' },
          body: sendsBody ? JSON.stringify(node.config.body) : undefined,
        });
        if (!response.ok) {
          throw new Error(`HTTP ${response.status} from ${node.config.url}`);
        }
        return response.json();
      }
      case 'action:transform': {
        // SECURITY: this executes `config.script` as arbitrary JavaScript with
        // full access to the process. Only run workflows from trusted authors,
        // or replace this with a sandboxed evaluator.
        const fn = new Function('data', node.config.script);
        return fn(this.state);
      }
      case 'condition:ifelse':
        return this.evalField(node.config);
      default:
        // Node types without an implementation here produce no result.
        return null;
    }
  }
}
Workflow example
json
{
"nodes": [
{ "id": "trigger", "type": "trigger:webhook", "config": {} },
{ "id": "validate", "type": "action:http", "config": {
"url": "http://validator/api",
"method": "POST"
}},
{ "id": "check", "type": "condition:ifelse", "config": {
"field": "body.valid", "operator": "equals", "value": "true"
}},
{ "id": "process", "type": "action:http", "config": {
"url": "http://processor/api", "method": "POST"
}},
{ "id": "notify", "type": "action:email", "config": {
"to": "admin@bank.com", "subject": "Processed"
}}
],
"edges": [
{ "from": "trigger", "to": "validate" },
{ "from": "validate", "to": "check" },
{ "from": "check", "to": "process", "condition": "success" },
{ "from": "check", "to": "notify", "condition": "error" }
]
}
Go Implementation
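The Go version below mirrors the TypeScript executor: it walks the graph recursively from the trigger node. Goroutines and channels would let independent branches run in parallel, but this example executes nodes one at a time: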
go
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
)
// Node is one step of a workflow graph.
type Node struct {
	// ID identifies the node; edges refer to it through From and To.
	ID string `json:"id"`
	// Type selects how the node is run, e.g. "action:http".
	Type string `json:"type"`
	// Config holds the settings of the node; its keys depend on Type.
	Config map[string]interface{} `json:"config"`
}
// Edge is a directed connection between two nodes.
type Edge struct {
	// From is the ID of the node the edge leaves.
	From string `json:"from"`
	// To is the ID of the node the edge enters.
	To string `json:"to"`
	// Condition optionally guards the edge; it is omitted from JSON when empty.
	Condition string `json:"condition,omitempty"`
}
// Workflow is a graph of nodes connected by directed edges.
type Workflow struct {
	// Nodes are the steps of the workflow.
	Nodes []Node `json:"nodes"`
	// Edges connect the steps, referring to nodes by ID.
	Edges []Edge `json:"edges"`
}
// Executor runs workflows and keeps the results of the nodes it has executed.
type Executor struct {
	// state maps a node ID to the result of that node; the trigger payload is
	// stored under the key "trigger".
	state map[string]interface{}
	// done is allocated by NewExecutor.
	// NOTE(review): no code in view reads or closes it; confirm it is still needed.
	done chan struct{}
}
// NewExecutor returns an Executor with an empty state map and a fresh,
// unbuffered done channel.
func NewExecutor() *Executor {
	ex := new(Executor)
	ex.state = map[string]interface{}{}
	ex.done = make(chan struct{})
	return ex
}
// Execute runs wf once, starting at its first trigger node (a node whose Type
// begins with "trigger:") and following edges from there.
//
// triggerData is stored in the executor state under the key "trigger". The
// state is reset at the start of every call, so results of an earlier run on
// the same Executor are not visible to this one.
//
// It returns an error when wf is nil, when wf has no trigger node, or when
// executing a node fails.
func (e *Executor) Execute(ctx context.Context, wf *Workflow, triggerData interface{}) error {
	const triggerPrefix = "trigger:"

	if wf == nil {
		return fmt.Errorf("workflow is nil")
	}

	// Find trigger node. Index into the slice rather than taking the address
	// of the loop variable, so the pointer refers to the workflow's own node.
	var trigger *Node
	for i := range wf.Nodes {
		t := wf.Nodes[i].Type
		if len(t) >= len(triggerPrefix) && t[:len(triggerPrefix)] == triggerPrefix {
			trigger = &wf.Nodes[i]
			break
		}
	}
	if trigger == nil {
		return fmt.Errorf("no trigger node found")
	}

	// Fresh state per run; this also makes a zero-value Executor usable.
	e.state = make(map[string]interface{})
	e.state["trigger"] = triggerData

	// Build adjacency list: source node ID -> outgoing edges, in definition order.
	edges := make(map[string][]Edge, len(wf.Edges))
	for _, edge := range wf.Edges {
		edges[edge.From] = append(edges[edge.From], edge)
	}

	// Execute DAG
	return e.executeNode(ctx, wf, edges, trigger.ID)
}
// executeNode runs the node with the given ID, stores its result in the
// executor state, and then follows the node's outgoing edges depth-first.
//
// An unknown nodeID is ignored. An edge with a non-empty Condition is followed
// only when evaluateCondition accepts the node's result. The first error stops
// the walk and is returned.
func (e *Executor) executeNode(ctx context.Context, wf *Workflow,
	edges map[string][]Edge, nodeID string) error {
	return e.executeNodeOnPath(ctx, wf, edges, nodeID, make(map[string]bool))
}

// executeNodeOnPath is executeNode plus cycle detection: onPath holds the IDs
// of the nodes between the start of the walk and the current node.
func (e *Executor) executeNodeOnPath(ctx context.Context, wf *Workflow,
	edges map[string][]Edge, nodeID string, onPath map[string]bool) error {
	// Stop early when the caller cancelled the run or its deadline passed.
	if ctx != nil {
		if err := ctx.Err(); err != nil {
			return err
		}
	}
	// Without this guard a cyclic graph would recurse without bound.
	if onPath[nodeID] {
		return fmt.Errorf("workflow cycle detected at node %q", nodeID)
	}

	// Find node
	var node *Node
	for i := range wf.Nodes {
		if wf.Nodes[i].ID == nodeID {
			node = &wf.Nodes[i]
			break
		}
	}
	if node == nil {
		return nil
	}

	// Execute node
	result, err := e.runNode(ctx, node)
	if err != nil {
		return err
	}
	e.state[nodeID] = result

	// Execute children
	onPath[nodeID] = true
	defer delete(onPath, nodeID)
	for _, edge := range edges[nodeID] {
		if edge.Condition != "" && !e.evaluateCondition(edge.Condition, result) {
			continue
		}
		if err := e.executeNodeOnPath(ctx, wf, edges, edge.To, onPath); err != nil {
			return err
		}
	}
	return nil
}
func (e *Executor) runNode(ctx context.Context, node *Node) (interface{}, error) {
switch node.Type {
case "action:http":
url, _ := node.Config["url"].(string)
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result interface{}
json.NewDecoder(resp.Body).Decode(&result)
return result, nil
case "condition:ifelse":
field, _ := node.Config["field"].(string)
value, _ := node.Config["value"].(string)
if data, ok := e.state[field]; ok {
return fmt.Sprintf("%v", data) == value, nil
}
return false, nil
default:
return nil, nil
}
}Key difference: In Go, the executor is truly concurrent with goroutines. Each branch of the workflow can run in parallel. In TypeScript, it's sequential with async/await — simpler to reason about, but less efficient.
Testing
bash
pnpm --filter @banking/workflow-engine dev
curl -X POST http://localhost:3005/api/v1/workflows \
-H "Content-Type: application/json" \
-d '{"nodes":[...],"edges":[...]}'
curl -X POST http://localhost:3005/api/v1/workflows/wf_1/executeLessons Learned
- DAG is the core — A workflow engine without a DAG is just a task queue.
- State must be external — If the server restarts, the workflow needs to pick up where it left off. Redis or PostgreSQL.
- Retry is not optional — HTTP calls fail. Your workflow needs to retry, with backoff.
- Observability — Every execution needs logging, tracing, and visible status. Without it, debugging is impossible.