orchestrate.py — Reference Event Loop สำหรับ Cross-Agent Handoffs
หน้าที่ (Purpose)
Script นี้เป็น reference implementation ของ event loop ที่ catch handoff requests ระหว่าง managed agents เช่น agent A อาจขอให้ agent B ทำการประเมินค่า (valuation) โดยการส่ง handoff message ผ่าน output text
⚠️ หมายเหตุ: นี่คือ reference code ที่ใช้เพื่อแสดง pattern เท่านั้น ในผลิตชั้นจริง ควรแทนที่ด้วย workflow engine ที่เหมาะสม เช่น Temporal, Airflow, หรือ Guidewire event bus
เมื่อควรใช้:
- เข้าใจวิธี orchestration pattern ของ managed agents ทำงาน
- ทดสอบ handoff logic ในสภาวะ non-production
- สร้าง orchestrator ที่ปรับตัวตามความต้องการของบริษัทของคุณ
Usage
python3 scripts/orchestrate.py
Environment variables:
SOURCE_SESSION_ID(required): ID ของ agent session ที่เราต้องการ listen สำหรับ handoff requestsAGENT_IDS(optional): JSON object ที่ map agent slug → deployed agent_id
export SOURCE_SESSION_ID="agent-sess-abc123def456"
export AGENT_IDS='{"pitch-agent":"agent-789xyz","earnings-reviewer":"agent-456uvw"}'
python3 scripts/orchestrate.py
ข้อกำหนด:
- Python 3.7+
anthropicSDK (pip install anthropic)jsonschemalibrary (pip install jsonschema)
ภายในทำอะไร (How It Works)
1. Handoff Request Pattern (บรรทัด 40-42)
Script ค้นหา handoff request ที่ฝังอยู่ใน agent’s text output ด้วย regex pattern:
HANDOFF_RE = re.compile(r'\{"type":\s*"handoff_request".*?\}', re.DOTALL)
Handoff blob ต้องมี JSON object เช่น:
{
"type": "handoff_request",
"target_agent": "earnings-reviewer",
"payload": {
"event": "Please analyze the Q4 earnings report",
"context_ref": "doc/earnings-2025-q4.pdf"
}
}
2. Security Allowlist (บรรทัด 23-27)
ก่อนส่ง handoff ไปยัง target agent script จะตรวจสอบว่า target_agent อยู่ใน hardcoded allowlist:
ALLOWED_TARGETS = {
"pitch-agent", "market-researcher", "earnings-reviewer",
"model-builder", "gl-reconciler", "kyc-screener",
"valuation-reviewer", "month-end-closer", "statement-auditor",
}
หากเอา agent ใหม่เข้าระบบ ต้องเพิ่มชื่อใน ALLOWED_TARGETS
3. Schema Validation (บรรทัด 29-38)
Payload ต้องตรงกับ schema นี้:
HANDOFF_PAYLOAD_SCHEMA = {
"type": "object",
"additionalProperties": False,
"required": ["event"],
"properties": {
"event": {"type": "string", "maxLength": 2000},
"context_ref": {"type": "string", "maxLength": 256,
"pattern": r"^[A-Za-z0-9 ._/:#-]+$"},
},
}
eventต้องเป็น string ≤ 2000 ตัวอักษร (required)context_refเป็น optional reference ชี้ไปยังเอกสารหรือบริบท
4. Extract & Validate (บรรทัด 45-61)
ฟังก์ชัน extract_handoff() ดึง handoff จาก text ด้วยขั้นตอน:
def extract_handoff(text: str) -> dict | None:
m = HANDOFF_RE.search(text) # Find the blob
if not m: return None
obj = json.loads(m.group(0)) # Parse JSON
target = obj.get("target_agent")
if target not in ALLOWED_TARGETS: # Allowlist check
return None
jsonschema.validate(instance=payload, schema=HANDOFF_PAYLOAD_SCHEMA) # Schema check
return {"target_agent": target, "payload": payload}
5. Event Loop (บรรทัด 64-82)
ฟังก์ชัน run() คือ main orchestration loop:
def run(source_session_id: str, agent_ids: dict[str, str]) -> None:
client = anthropic.Anthropic()
with client.beta.agents.sessions.stream(session_id=source_session_id) as stream:
for event in stream:
if event.type != "message_delta" or not getattr(event, "text", None):
continue
handoff = extract_handoff(event.text)
if not handoff: continue
target_id = agent_ids.get(handoff["target_agent"])
if not target_id: continue
client.beta.agents.sessions.steer( # Steer the target agent
agent_id=target_id,
input=handoff["payload"]["event"],
)
Loop นี้:
- Stream message deltas จาก source session
- ค้นหา handoff blob ในแต่ละ text chunk
- Validate ด้วย allowlist + schema
- ค้นหา target agent ID จาก
agent_idsmap - Steer (ส่ง input ไป) target agent session
Output
Script ไม่ print อะไร — มันเพียงแค่ listen และ forward handoffs ไปยัง target agents ตามความเงียบ ๆ
หากเกิด error:
- Invalid JSON ในผลลัพธ์ของ agent: silently skipped
- Target agent not in allowlist: silently skipped
- Target agent_id not found: silently skipped
- Missing env var: Python exception จะ exit
ตัวอย่าง (Example)
# Setup: deploy 2 agents first
$ bash scripts/deploy-managed-agent.sh pitch-agent
deployed: pitch-agent
agent id: agent-123abc
$ bash scripts/deploy-managed-agent.sh earnings-reviewer
deployed: earnings-reviewer
agent id: agent-456def
# Start a session with pitch agent
$ AGENT_ID="agent-123abc"
curl -X POST https://api.anthropic.com/v1/agents/$AGENT_ID/sessions \
-H "x-api-key: $ANTHROPIC_API_KEY" \
-d '{}'
# Returns: {"id": "agent-sess-xyz789"}
# In another terminal, run orchestrator
$ export SOURCE_SESSION_ID="agent-sess-xyz789"
$ export AGENT_IDS='{"earnings-reviewer":"agent-456def"}'
$ python3 scripts/orchestrate.py
# (Now listening for handoffs...)
# Agent output contains:
# {"type":"handoff_request","target_agent":"earnings-reviewer","payload":{"event":"Analyze Q4 earnings"}}
# → orchestrate.py extracts it, validates it, then calls:
# client.beta.agents.sessions.steer(agent_id="agent-456def", input="Analyze Q4 earnings")
เกี่ยวข้องกับอะไรอีกบ้าง
- deploy-managed-agent.sh: Deploy agents ที่ต้องการจะ orchestrate
- Managed Agents API (
/v1/agents): ใช้ session.stream() และ session.steer() - Workflow engines: Temporal, Airflow ควรใช้ pattern นี้ เพื่อ implement in production
Security Notes
- Handoff requests ที่ฝังในเอกสาร untrusted อาจถูก exploit ดังนั้น allowlist + schema validation เป็น minimum mitigation
- ในผลิตชั้นจริง ใช้ dedicated tool call หรือ typed SSE event แทน text parsing
- ตรวจสอบให้แน่ใจ AGENT_IDS env var ถูก set ถูกต้องสำหรับ prod