Specs & Plans
Fase 65 — meta-orchestrator Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Build a meta-orchestrator agent that takes a high-level goal (e.g., “prepara mi domingo”) and produces an OrchestrationResult by planning, executing, critiquing, and optionally replanning across the 12+ existing procedural agents (F11-F64). No new LLM is required for sub-agents; the meta layer uses an LLM only for plan, critique, and replan stages.
Architecture: New subpackage packages/jw-agents/src/jw_agents/meta/ with Pydantic models (OrchestrationPlan, Step, StepResult, CritiqueVerdict, OrchestrationResult), a tool registry (Plugin SDK F41 entry-point aware), an executor with topological sort, an LLM planner (constrained F35), and a critique stage that wraps NLI F39 over consolidated findings. The CLI adds jw meta {plan,run,tools} plus the alias jw plan-sunday. The MCP server exposes 3 new tools.
Tech Stack: Python 3.13 · Pydantic v2 · stdlib asyncio · Jinja2 (planner prompt templates) · GBNF gramars (F35) for constrained JSON · jw_finetune.synth.provider.LLMProvider (existing abstraction) · jw_core.fidelity.nli (F39, import-guarded) · jw_core.tracing (F43) · pytest.
Spec: docs/superpowers/specs/2026-06-11-fase-65-meta-orchestrator-design.md
File map
Creates:
packages/jw-agents/src/jw_agents/meta/__init__.pypackages/jw-agents/src/jw_agents/meta/models.pypackages/jw-agents/src/jw_agents/meta/registry.pypackages/jw-agents/src/jw_agents/meta/executor.pypackages/jw-agents/src/jw_agents/meta/planner.pypackages/jw-agents/src/jw_agents/meta/critique.pypackages/jw-agents/src/jw_agents/meta/orchestrator.pypackages/jw-agents/src/jw_agents/meta/prompts/__init__.pypackages/jw-agents/src/jw_agents/meta/prompts/planner_es.j2packages/jw-agents/src/jw_agents/meta/prompts/planner_en.j2packages/jw-agents/src/jw_agents/meta/prompts/planner_pt.j2packages/jw-agents/src/jw_agents/meta/grammars/__init__.pypackages/jw-agents/src/jw_agents/meta/grammars/plan.gbnfpackages/jw-agents/src/jw_agents/meta/builtin_tools.pypackages/jw-agents/tests/meta/__init__.pypackages/jw-agents/tests/meta/test_models.pypackages/jw-agents/tests/meta/test_registry.pypackages/jw-agents/tests/meta/test_executor.pypackages/jw-agents/tests/meta/test_planner.pypackages/jw-agents/tests/meta/test_critique.pypackages/jw-agents/tests/meta/test_orchestrator.pypackages/jw-agents/tests/meta/test_builtin_tools.pypackages/jw-agents/tests/meta/test_cli.pypackages/jw-agents/tests/meta/test_mcp_integration.pypackages/jw-agents/tests/meta/fixtures/__init__.pypackages/jw-agents/tests/meta/fixtures/golden_goals.jsonlpackages/jw-cli/src/jw_cli/commands/meta.pydocs/guias/meta-orchestrator.md
Modifies:
packages/jw-cli/src/jw_cli/main.py— registermetasubcommand +plan-sundayalias.packages/jw-mcp/src/jw_mcp/server.py— expose 3 MCP tools (meta_plan_goal,meta_run_plan,meta_list_tools).packages/jw-agents/pyproject.toml— add Jinja2 dep if not present (likely already there).docs/ROADMAP.md— add Fase 65 section.docs/README.md— link new guide.
Task 1: Scaffold meta/ package + Pydantic models
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/__init__.py -
Create:
packages/jw-agents/src/jw_agents/meta/models.py -
Create:
packages/jw-agents/tests/meta/__init__.py -
Create:
packages/jw-agents/tests/meta/test_models.py -
Step 1: Write the failing test
# packages/jw-agents/tests/meta/test_models.py
"""Pydantic models for the meta-orchestrator."""
from __future__ import annotations
import pytest
from jw_agents.meta.models import (
Step,
OrchestrationPlan,
StepResult,
CritiqueVerdict,
OrchestrationResult,
)
def test_step_minimal_pending() -> None:
s = Step(id="step-1", tool="verse.explain", args={"reference": "John 3:16"})
assert s.status == "pending"
assert s.depends_on == []
def test_step_with_dependencies() -> None:
s = Step(
id="step-2",
tool="apologetics.research",
args={"question": "What is the soul?"},
depends_on=["step-1"],
rationale="Build on the prior verse context.",
)
assert s.depends_on == ["step-1"]
assert s.rationale.startswith("Build on")
def test_plan_rejects_self_dep() -> None:
with pytest.raises(ValueError):
OrchestrationPlan(
goal="x",
steps=[Step(id="step-1", tool="x", args={}, depends_on=["step-1"])],
)
def test_plan_rejects_missing_dep_target() -> None:
with pytest.raises(ValueError):
OrchestrationPlan(
goal="x",
steps=[Step(id="step-1", tool="x", args={}, depends_on=["step-99"])],
)
def test_plan_accepts_valid_dag() -> None:
plan = OrchestrationPlan(
goal="prepare meeting",
steps=[
Step(id="step-1", tool="meeting.workbook", args={}),
Step(id="step-2", tool="meeting.public_talk_outline", args={}, depends_on=["step-1"]),
],
)
assert len(plan.steps) == 2
assert plan.plan_revision == 0
def test_step_result_pydantic() -> None:
r = StepResult(
step_id="step-1",
agent_result={"findings": [], "agent_name": "verse_explainer"},
elapsed_ms=42,
)
assert r.error is None
assert r.tokens_used == 0
def test_critique_verdict_minimal() -> None:
v = CritiqueVerdict(overall_ok=True, findings_per_step={"step-1": 5})
assert v.suggested_replan is None
assert v.nli_warnings == []
def test_orchestration_result_round_trip() -> None:
plan = OrchestrationPlan(goal="x", steps=[Step(id="step-1", tool="t", args={})])
res = OrchestrationResult(
plan=plan,
step_results=[],
critique=CritiqueVerdict(overall_ok=False, findings_per_step={}),
consolidated_findings=[],
total_elapsed_ms=0,
total_tokens=0,
)
dumped = res.model_dump()
rehydrated = OrchestrationResult.model_validate(dumped)
assert rehydrated.plan.goal == "x"
- Step 2: Run test to verify it fails
Run: uv run pytest packages/jw-agents/tests/meta/test_models.py -v
Expected: FAIL — module not found.
- Step 3: Implement the models
# packages/jw-agents/src/jw_agents/meta/__init__.py
"""jw_agents.meta — meta-orchestrator over existing procedural agents.
Public API:
from jw_agents.meta import MetaOrchestrator, OrchestrationPlan, ...
"""
from __future__ import annotations
from jw_agents.meta.models import (
Step,
OrchestrationPlan,
StepResult,
CritiqueVerdict,
OrchestrationResult,
)
from jw_agents.meta.registry import (
register_tool,
list_tools,
get_tool,
ToolNotFound,
)
__all__ = [
"Step",
"OrchestrationPlan",
"StepResult",
"CritiqueVerdict",
"OrchestrationResult",
"register_tool",
"list_tools",
"get_tool",
"ToolNotFound",
]
# packages/jw-agents/src/jw_agents/meta/models.py
"""Pydantic models for the meta-orchestrator."""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field, model_validator
StepStatus = Literal["pending", "running", "completed", "failed", "skipped"]
class Step(BaseModel):
"""A single step in an orchestration plan."""
id: str
tool: str
args: dict[str, Any] = Field(default_factory=dict)
depends_on: list[str] = Field(default_factory=list)
status: StepStatus = "pending"
rationale: str = ""
class OrchestrationPlan(BaseModel):
"""A topologically valid DAG of steps to satisfy `goal`."""
goal: str
language: Literal["en", "es", "pt"] = "es"
steps: list[Step] = Field(default_factory=list)
congregation: str | None = None
plan_revision: int = 0
@model_validator(mode="after")
def _validate_dag(self) -> "OrchestrationPlan":
ids = {s.id for s in self.steps}
for s in self.steps:
for dep in s.depends_on:
if dep == s.id:
raise ValueError(f"step {s.id} depends on itself")
if dep not in ids:
raise ValueError(f"step {s.id} depends on missing {dep}")
return self
class StepResult(BaseModel):
"""Result of executing one step."""
step_id: str
agent_result: dict[str, Any]
error: str | None = None
elapsed_ms: int
tokens_used: int = 0
class CritiqueVerdict(BaseModel):
"""Outcome of the post-execution critique stage."""
overall_ok: bool
findings_per_step: dict[str, int] = Field(default_factory=dict)
nli_warnings: list[str] = Field(default_factory=list)
suggested_replan: Step | None = None
reason: str = ""
class OrchestrationResult(BaseModel):
"""Final result of a `MetaOrchestrator.run()` call."""
plan: OrchestrationPlan
step_results: list[StepResult] = Field(default_factory=list)
critique: CritiqueVerdict
consolidated_findings: list[dict[str, Any]] = Field(default_factory=list)
total_elapsed_ms: int = 0
total_tokens: int = 0
trace_path: str | None = None
- Step 4: Run test to verify it passes
Run: uv run pytest packages/jw-agents/tests/meta/test_models.py -v
Expected: 8 passed.
- Step 5: Commit
git add packages/jw-agents/src/jw_agents/meta packages/jw-agents/tests/meta
git commit -m "feat(jw-agents): scaffold meta/ package with Pydantic models for orchestration"
Task 2: Tool registry + Plugin SDK F41 discovery
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/registry.py -
Create:
packages/jw-agents/tests/meta/test_registry.py -
Step 1: Write the failing test
# packages/jw-agents/tests/meta/test_registry.py
"""Tool registry for the meta-orchestrator."""
from __future__ import annotations
import pytest
from jw_agents.meta.registry import (
register_tool,
list_tools,
get_tool,
ToolNotFound,
clear_registry,
)
@pytest.fixture(autouse=True)
def _clean() -> None:
clear_registry()
yield
clear_registry()
async def _fake_agent(arg1: str = "x") -> dict:
return {"agent_name": "fake", "findings": [], "echo": arg1}
def test_register_and_list_tool() -> None:
register_tool(
name="fake.tool",
callable_=_fake_agent,
description="A fake tool.",
args_schema={"arg1": "str"},
)
tools = list_tools()
assert "fake.tool" in {t.name for t in tools}
def test_register_duplicate_overrides_with_warning(caplog) -> None:
register_tool(name="x", callable_=_fake_agent, description="A", args_schema={})
register_tool(name="x", callable_=_fake_agent, description="B", args_schema={})
tools = {t.name: t for t in list_tools()}
assert tools["x"].description == "B"
def test_get_tool_returns_callable() -> None:
register_tool(name="fake.tool", callable_=_fake_agent, description="x", args_schema={})
tool = get_tool("fake.tool")
assert callable(tool.callable_)
def test_get_tool_missing_raises() -> None:
with pytest.raises(ToolNotFound):
get_tool("does.not.exist")
def test_list_tools_empty_returns_empty_list() -> None:
assert list_tools() == []
- Step 2: Run test to verify it fails
Run: uv run pytest packages/jw-agents/tests/meta/test_registry.py -v
Expected: FAIL.
- Step 3: Implement the registry
# packages/jw-agents/src/jw_agents/meta/registry.py
"""Tool registry for the meta-orchestrator.
Tools are registered at import time (builtin) or discovered via the
Plugin SDK F41 entry-point group `jw_agent_toolkit.agents`.
"""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from importlib.metadata import entry_points
from typing import Any
from pydantic import BaseModel
logger = logging.getLogger(__name__)
ToolCallable = Callable[..., Awaitable[dict[str, Any]]]
class ToolDescriptor(BaseModel):
name: str
description: str
args_schema: dict[str, str]
callable_: ToolCallable
model_config = {"arbitrary_types_allowed": True}
class ToolNotFound(KeyError):
"""Raised when `get_tool(name)` finds nothing."""
_REGISTRY: dict[str, ToolDescriptor] = {}
def register_tool(
*,
name: str,
callable_: ToolCallable,
description: str,
args_schema: dict[str, str],
) -> None:
"""Register a tool (or override an existing one with a warning)."""
if name in _REGISTRY:
logger.warning("meta: overriding existing tool %r", name)
_REGISTRY[name] = ToolDescriptor(
name=name,
description=description,
args_schema=args_schema,
callable_=callable_,
)
def get_tool(name: str) -> ToolDescriptor:
"""Return the descriptor for `name` or raise `ToolNotFound`."""
if name not in _REGISTRY:
raise ToolNotFound(name)
return _REGISTRY[name]
def list_tools() -> list[ToolDescriptor]:
"""All currently-registered tools, in insertion order."""
return list(_REGISTRY.values())
def clear_registry() -> None:
"""Reset the registry (for tests only)."""
_REGISTRY.clear()
def discover_plugin_tools() -> int:
"""Discover tools via Plugin SDK F41 entry-points. Returns count discovered."""
count = 0
try:
eps = entry_points(group="jw_agent_toolkit.agents")
except Exception as exc: # noqa: BLE001
logger.warning("meta: entry_points discovery failed: %s", exc)
return 0
for ep in eps:
try:
obj = ep.load()
register_tool(
name=f"plugin.{ep.name}",
callable_=obj,
description=getattr(obj, "__doc__", "").strip().splitlines()[0]
if getattr(obj, "__doc__", None) else "Plugin tool.",
args_schema={},
)
count += 1
except Exception as exc: # noqa: BLE001
logger.warning("meta: failed to load plugin %s: %s", ep.name, exc)
return count
- Step 4: Run test to verify it passes
Run: uv run pytest packages/jw-agents/tests/meta/test_registry.py -v
Expected: 5 passed.
- Step 5: Commit
git add packages/jw-agents/src/jw_agents/meta/registry.py packages/jw-agents/tests/meta/test_registry.py
git commit -m "feat(jw-agents): meta tool registry + Plugin SDK F41 discovery"
Task 3: Executor with topological sort + tracing
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/executor.py -
Create:
packages/jw-agents/tests/meta/test_executor.py -
Step 1: Write the failing test
# packages/jw-agents/tests/meta/test_executor.py
"""Executor tests — topological sort + dispatch + error handling."""
from __future__ import annotations
import pytest
from jw_agents.meta.executor import Executor, _topological_sort, ExecutorTimeout
from jw_agents.meta.models import OrchestrationPlan, Step
from jw_agents.meta.registry import register_tool, clear_registry
@pytest.fixture(autouse=True)
def _clean() -> None:
clear_registry()
yield
clear_registry()
async def _ok_tool(text: str = "ok") -> dict:
return {"agent_name": "ok_tool", "findings": [{"text": text}]}
async def _err_tool(**_: object) -> dict:
raise RuntimeError("boom")
async def _slow_tool(**_: object) -> dict:
import asyncio
await asyncio.sleep(5)
return {"agent_name": "slow"}
def _register_ok() -> None:
register_tool(name="ok", callable_=_ok_tool, description="ok", args_schema={"text": "str"})
def _register_err() -> None:
register_tool(name="err", callable_=_err_tool, description="err", args_schema={})
def _register_slow() -> None:
register_tool(name="slow", callable_=_slow_tool, description="slow", args_schema={})
# --- topological sort ---
def test_topological_sort_linear() -> None:
steps = [
Step(id="a", tool="ok", args={}),
Step(id="b", tool="ok", args={}, depends_on=["a"]),
Step(id="c", tool="ok", args={}, depends_on=["b"]),
]
order = _topological_sort(steps)
assert order == ["a", "b", "c"]
def test_topological_sort_diamond() -> None:
steps = [
Step(id="a", tool="ok", args={}),
Step(id="b", tool="ok", args={}, depends_on=["a"]),
Step(id="c", tool="ok", args={}, depends_on=["a"]),
Step(id="d", tool="ok", args={}, depends_on=["b", "c"]),
]
order = _topological_sort(steps)
assert order[0] == "a" and order[-1] == "d"
assert order.index("b") < order.index("d")
assert order.index("c") < order.index("d")
# --- execution ---
@pytest.mark.asyncio
async def test_execute_linear_plan() -> None:
_register_ok()
plan = OrchestrationPlan(
goal="x",
steps=[
Step(id="a", tool="ok", args={"text": "first"}),
Step(id="b", tool="ok", args={"text": "second"}, depends_on=["a"]),
],
)
ex = Executor()
results = await ex.run(plan)
assert len(results) == 2
assert results[0].error is None
assert results[0].agent_result["findings"][0]["text"] == "first"
@pytest.mark.asyncio
async def test_execute_with_failing_step_propagates_error_not_crash() -> None:
_register_ok()
_register_err()
plan = OrchestrationPlan(
goal="x",
steps=[
Step(id="a", tool="err", args={}),
Step(id="b", tool="ok", args={"text": "after err"}, depends_on=["a"]),
],
)
ex = Executor()
results = await ex.run(plan)
# a fails, b is skipped (or runs depending on policy). Default policy: skip.
by_id = {r.step_id: r for r in results}
assert by_id["a"].error is not None
assert "boom" in by_id["a"].error
assert by_id["b"].agent_result == {} or by_id["b"].error is not None
@pytest.mark.asyncio
async def test_execute_respects_timeout() -> None:
_register_slow()
plan = OrchestrationPlan(
goal="x",
steps=[Step(id="a", tool="slow", args={})],
)
ex = Executor(timeout_s=0.5)
with pytest.raises(ExecutorTimeout):
await ex.run(plan)
@pytest.mark.asyncio
async def test_execute_unknown_tool_marks_step_failed() -> None:
plan = OrchestrationPlan(
goal="x",
steps=[Step(id="a", tool="nope", args={})],
)
ex = Executor()
results = await ex.run(plan)
assert results[0].error is not None
assert "nope" in results[0].error
- Step 2: Run test to verify it fails
Run: uv run pytest packages/jw-agents/tests/meta/test_executor.py -v
Expected: FAIL.
- Step 3: Implement the executor
# packages/jw-agents/src/jw_agents/meta/executor.py
"""Executor for OrchestrationPlan — topological sort + async dispatch."""
from __future__ import annotations
import asyncio
import logging
import time
from collections.abc import Iterable
from jw_agents.meta.models import OrchestrationPlan, Step, StepResult
from jw_agents.meta.registry import ToolNotFound, get_tool
logger = logging.getLogger(__name__)
class ExecutorTimeout(TimeoutError):
"""Raised when the whole plan exceeds the wall-clock cap."""
def _topological_sort(steps: list[Step]) -> list[str]:
"""Kahn's algorithm. Raises ValueError on cycles."""
in_degree: dict[str, int] = {s.id: len(s.depends_on) for s in steps}
children: dict[str, list[str]] = {s.id: [] for s in steps}
for s in steps:
for dep in s.depends_on:
children[dep].append(s.id)
queue: list[str] = [sid for sid, deg in in_degree.items() if deg == 0]
order: list[str] = []
while queue:
node = queue.pop(0)
order.append(node)
for child in children[node]:
in_degree[child] -= 1
if in_degree[child] == 0:
queue.append(child)
if len(order) != len(steps):
raise ValueError("cycle detected in plan")
return order
class Executor:
"""Run an `OrchestrationPlan` step by step, respecting deps and timeout."""
def __init__(self, *, timeout_s: float = 120.0, on_step_done=None) -> None:
self._timeout_s = timeout_s
self._on_step_done = on_step_done
async def run(self, plan: OrchestrationPlan) -> list[StepResult]:
order = _topological_sort(plan.steps)
by_id = {s.id: s for s in plan.steps}
results: dict[str, StepResult] = {}
deadline = asyncio.get_event_loop().time() + self._timeout_s
for step_id in order:
if asyncio.get_event_loop().time() > deadline:
raise ExecutorTimeout(f"plan exceeded {self._timeout_s}s")
step = by_id[step_id]
# Skip if any dep failed
if any(results.get(dep) and results[dep].error for dep in step.depends_on):
results[step_id] = StepResult(
step_id=step_id,
agent_result={},
error=f"skipped: upstream {step.depends_on} failed",
elapsed_ms=0,
)
continue
t0 = time.perf_counter()
try:
tool = get_tool(step.tool)
remaining = max(0.0, deadline - asyncio.get_event_loop().time())
result = await asyncio.wait_for(tool.callable_(**step.args), timeout=remaining)
elapsed_ms = int((time.perf_counter() - t0) * 1000)
step_result = StepResult(
step_id=step_id,
agent_result=result if isinstance(result, dict) else {"value": result},
elapsed_ms=elapsed_ms,
)
except ToolNotFound:
step_result = StepResult(
step_id=step_id,
agent_result={},
error=f"tool not found: {step.tool}",
elapsed_ms=int((time.perf_counter() - t0) * 1000),
)
except asyncio.TimeoutError:
# Plan-wide timeout
raise ExecutorTimeout(f"step {step_id} exhausted plan deadline")
except Exception as exc: # noqa: BLE001
step_result = StepResult(
step_id=step_id,
agent_result={},
error=f"{type(exc).__name__}: {exc}",
elapsed_ms=int((time.perf_counter() - t0) * 1000),
)
results[step_id] = step_result
if self._on_step_done is not None:
self._on_step_done(step, step_result)
return [results[sid] for sid in order]
- Step 4: Add pytest-asyncio if missing
Check that packages/jw-agents/pyproject.toml has pytest-asyncio in dev deps; if not, add it. (Most likely already present.)
- Step 5: Run test to verify it passes
Run: uv run pytest packages/jw-agents/tests/meta/test_executor.py -v
Expected: 6 passed.
- Step 6: Commit
git add packages/jw-agents/src/jw_agents/meta/executor.py packages/jw-agents/tests/meta/test_executor.py
git commit -m "feat(jw-agents): meta executor with topological sort and timeout"
Task 4: GBNF grammar + Jinja2 planner prompts (es/en/pt)
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/grammars/__init__.py -
Create:
packages/jw-agents/src/jw_agents/meta/grammars/plan.gbnf -
Create:
packages/jw-agents/src/jw_agents/meta/prompts/__init__.py -
Create:
packages/jw-agents/src/jw_agents/meta/prompts/planner_es.j2 -
Create:
packages/jw-agents/src/jw_agents/meta/prompts/planner_en.j2 -
Create:
packages/jw-agents/src/jw_agents/meta/prompts/planner_pt.j2 -
Step 1: Write grammar
# packages/jw-agents/src/jw_agents/meta/grammars/plan.gbnf
root ::= ws? plan ws?
plan ::= "{" ws "\"goal\"" ws ":" ws string ws "," ws "\"language\"" ws ":" ws lang ws "," ws "\"steps\"" ws ":" ws steps ws "}"
lang ::= "\"" ("en" | "es" | "pt") "\""
steps ::= "[" ws "]" | "[" ws step (ws "," ws step)* ws "]"
step ::= "{" ws "\"id\"" ws ":" ws string ws "," ws "\"tool\"" ws ":" ws string ws "," ws "\"args\"" ws ":" ws object ws "," ws "\"depends_on\"" ws ":" ws str_array ws "," ws "\"rationale\"" ws ":" ws string ws "}"
str_array ::= "[" ws "]" | "[" ws string (ws "," ws string)* ws "]"
object ::= "{" ws "}" | "{" ws kv (ws "," ws kv)* ws "}"
kv ::= string ws ":" ws value
value ::= string | number | "true" | "false" | "null" | object | array
array ::= "[" ws "]" | "[" ws value (ws "," ws value)* ws "]"
string ::= "\"" chars "\""
chars ::= ([^"\\] | "\\" any)*
any ::= ["\\/bfnrt] | "u" hex hex hex hex
hex ::= [0-9a-fA-F]
number ::= "-"? ([0-9] | [1-9] [0-9]+) ("." [0-9]+)? ([eE] ("+"|"-")? [0-9]+)?
ws ::= ([ \t\n\r])*
- Step 2: Write Jinja2 templates
{# packages/jw-agents/src/jw_agents/meta/prompts/planner_es.j2 #}
Eres un planificador de tareas para Testigos de Jehová. Tu objetivo es
elegir, EN ORDEN, qué herramientas (tools) ejecutar para satisfacer
"{{ goal }}" con citas verificables de wol.jw.org.
Idioma de salida deseado: {{ language }}.
{% if congregation %}Congregación activa: {{ congregation }}.{% endif %}
Herramientas disponibles:
{% for tool in tools %}
- {{ tool.name }}: {{ tool.description }}
args: {{ tool.args_schema }}
{% endfor %}
Devuelve JSON estricto con este shape exacto:
{
"goal": "{{ goal }}",
"language": "{{ language }}",
"steps": [
{
"id": "step-1",
"tool": "<nombre exacto de tool>",
"args": {...},
"depends_on": [],
"rationale": "..."
}
]
}
Reglas duras:
- Máximo {{ max_steps }} steps.
- NO inventes nombres de tool. Si el objetivo no puede satisfacerse,
devuelve {"goal":"...","language":"{{ language }}","steps":[]} con rationale vacío.
- Cada `depends_on` debe referenciar `id` de un step previo.
- Sin texto extra fuera del JSON.
{# packages/jw-agents/src/jw_agents/meta/prompts/planner_en.j2 #}
You are a task planner for Jehovah's Witnesses. Your job is to choose,
IN ORDER, which tools to execute to satisfy "{{ goal }}" with verifiable
wol.jw.org citations.
Desired output language: {{ language }}.
{% if congregation %}Active congregation: {{ congregation }}.{% endif %}
Available tools:
{% for tool in tools %}
- {{ tool.name }}: {{ tool.description }}
args: {{ tool.args_schema }}
{% endfor %}
Return strict JSON with this exact shape:
{
"goal": "{{ goal }}",
"language": "{{ language }}",
"steps": [
{
"id": "step-1",
"tool": "<exact tool name>",
"args": {...},
"depends_on": [],
"rationale": "..."
}
]
}
Hard rules:
- At most {{ max_steps }} steps.
- DO NOT invent tool names. If the goal cannot be satisfied, return
{"goal":"...","language":"{{ language }}","steps":[]} with empty rationale.
- Each `depends_on` must reference a prior step `id`.
- No prose outside the JSON.
{# packages/jw-agents/src/jw_agents/meta/prompts/planner_pt.j2 #}
Você é um planificador de tarefas para Testemunhas de Jeová. Escolha,
EM ORDEM, quais ferramentas executar para satisfazer "{{ goal }}" com
citações verificáveis de wol.jw.org.
Idioma de saída desejado: {{ language }}.
{% if congregation %}Congregação ativa: {{ congregation }}.{% endif %}
Ferramentas disponíveis:
{% for tool in tools %}
- {{ tool.name }}: {{ tool.description }}
args: {{ tool.args_schema }}
{% endfor %}
Devolva JSON estrito com este formato exato:
{
"goal": "{{ goal }}",
"language": "{{ language }}",
"steps": [
{
"id": "step-1",
"tool": "<nome exato>",
"args": {...},
"depends_on": [],
"rationale": "..."
}
]
}
Regras:
- No máximo {{ max_steps }} steps.
- NÃO invente nomes. Se o objetivo não pode ser satisfeito, devolva
{"goal":"...","language":"{{ language }}","steps":[]}.
- Cada `depends_on` referencia um `id` prévio.
- Sem texto fora do JSON.
- Step 3: Smoke test the templates load
uv run python -c "
from pathlib import Path
from jinja2 import Environment, FileSystemLoader, StrictUndefined
root = Path('packages/jw-agents/src/jw_agents/meta/prompts')
env = Environment(loader=FileSystemLoader(str(root)), undefined=StrictUndefined)
for name in ['planner_es.j2', 'planner_en.j2', 'planner_pt.j2']:
out = env.get_template(name).render(goal='X', language='es', tools=[], congregation=None, max_steps=8)
assert 'X' in out, name
print('ok')
"
- Step 4: Commit
git add packages/jw-agents/src/jw_agents/meta/prompts packages/jw-agents/src/jw_agents/meta/grammars
git commit -m "feat(jw-agents): planner Jinja2 prompts (es/en/pt) + GBNF grammar"
Task 5: LLM planner with FakeProvider for tests
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/planner.py -
Create:
packages/jw-agents/tests/meta/test_planner.py -
Step 1: Write the failing test
# packages/jw-agents/tests/meta/test_planner.py
"""LLM planner tests (FakeLLMProvider, no network)."""
from __future__ import annotations
import json
import pytest
from jw_agents.meta.planner import Planner
from jw_agents.meta.registry import register_tool, clear_registry, ToolDescriptor
from jw_agents.meta.models import OrchestrationPlan
@pytest.fixture(autouse=True)
def _clean() -> None:
clear_registry()
register_tool(name="meeting.workbook", callable_=_noop, description="weekly workbook", args_schema={"language": "str"})
register_tool(name="meeting.public_talk_outline", callable_=_noop, description="talk outline", args_schema={"topic": "str"})
register_tool(name="export.study_sheet", callable_=_noop, description="export", args_schema={"format": "str"})
yield
clear_registry()
async def _noop(**_: object) -> dict:
return {"agent_name": "noop", "findings": []}
class FakeLLMProvider:
"""Returns a pre-canned JSON string for a known goal pattern."""
name = "fake"
model = "fake-planner"
def __init__(self, response_text: str) -> None:
self._text = response_text
self.calls = 0
async def acomplete(self, prompt: str) -> str:
self.calls += 1
return self._text
@pytest.mark.asyncio
async def test_planner_returns_valid_plan_from_fake() -> None:
response = json.dumps({
"goal": "prepara mi domingo",
"language": "es",
"steps": [
{
"id": "step-1",
"tool": "meeting.workbook",
"args": {"language": "es"},
"depends_on": [],
"rationale": "descubrir programa de la semana",
},
{
"id": "step-2",
"tool": "meeting.public_talk_outline",
"args": {"topic": "amor"},
"depends_on": ["step-1"],
"rationale": "build outline from workbook hints",
},
],
})
planner = Planner(llm=FakeLLMProvider(response))
plan = await planner.plan(goal="prepara mi domingo", language="es")
assert isinstance(plan, OrchestrationPlan)
assert len(plan.steps) == 2
assert plan.steps[1].depends_on == ["step-1"]
@pytest.mark.asyncio
async def test_planner_rejects_unknown_tool() -> None:
response = json.dumps({
"goal": "x",
"language": "es",
"steps": [
{"id": "s1", "tool": "nope.does_not_exist", "args": {}, "depends_on": [], "rationale": "x"}
],
})
planner = Planner(llm=FakeLLMProvider(response))
with pytest.raises(ValueError, match="unknown tool"):
await planner.plan(goal="x", language="es")
@pytest.mark.asyncio
async def test_planner_rejects_invalid_json() -> None:
planner = Planner(llm=FakeLLMProvider("not json at all"))
with pytest.raises(ValueError, match="invalid JSON"):
await planner.plan(goal="x", language="es")
@pytest.mark.asyncio
async def test_planner_respects_max_steps_cap() -> None:
steps = [
{"id": f"s{i}", "tool": "meeting.workbook", "args": {}, "depends_on": [], "rationale": "x"}
for i in range(20)
]
response = json.dumps({"goal": "x", "language": "es", "steps": steps})
planner = Planner(llm=FakeLLMProvider(response), max_steps=5)
with pytest.raises(ValueError, match="too many steps"):
await planner.plan(goal="x", language="es")
- Step 2: Run test to verify it fails
Run: uv run pytest packages/jw-agents/tests/meta/test_planner.py -v
Expected: FAIL.
- Step 3: Implement the planner
# packages/jw-agents/src/jw_agents/meta/planner.py
"""LLM planner stage of the meta-orchestrator."""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any, Protocol
from jinja2 import Environment, FileSystemLoader, StrictUndefined
from jw_agents.meta.models import OrchestrationPlan
from jw_agents.meta.registry import list_tools
logger = logging.getLogger(__name__)
_PROMPTS_DIR = Path(__file__).parent / "prompts"
class LLMProviderLike(Protocol):
name: str
async def acomplete(self, prompt: str) -> str: ...
class Planner:
"""LLM-driven planner producing an `OrchestrationPlan`."""
def __init__(self, *, llm: LLMProviderLike, max_steps: int = 8) -> None:
self._llm = llm
self._max_steps = max_steps
self._jinja = Environment(
loader=FileSystemLoader(str(_PROMPTS_DIR)),
undefined=StrictUndefined,
)
async def plan(
self,
*,
goal: str,
language: str = "es",
congregation: str | None = None,
) -> OrchestrationPlan:
tools = list_tools()
template_name = f"planner_{language}.j2"
try:
template = self._jinja.get_template(template_name)
except Exception:
logger.warning("meta: language %s has no template, falling back to en", language)
template = self._jinja.get_template("planner_en.j2")
prompt = template.render(
goal=goal,
language=language,
tools=tools,
congregation=congregation,
max_steps=self._max_steps,
)
raw = await self._llm.acomplete(prompt)
try:
payload: dict[str, Any] = json.loads(raw)
except json.JSONDecodeError as exc:
raise ValueError(f"invalid JSON from planner: {exc}") from exc
steps_raw = payload.get("steps", [])
if len(steps_raw) > self._max_steps:
raise ValueError(f"too many steps: {len(steps_raw)} > {self._max_steps}")
# Validate tool names against registry
known = {t.name for t in tools}
for s in steps_raw:
if s.get("tool") not in known:
raise ValueError(f"unknown tool: {s.get('tool')}")
plan = OrchestrationPlan(
goal=payload.get("goal", goal),
language=payload.get("language", language),
steps=steps_raw,
congregation=congregation,
)
return plan
- Step 4: Run test to verify it passes
Run: uv run pytest packages/jw-agents/tests/meta/test_planner.py -v
Expected: 4 passed.
- Step 5: Commit
git add packages/jw-agents/src/jw_agents/meta/planner.py packages/jw-agents/tests/meta/test_planner.py
git commit -m "feat(jw-agents): meta planner with Jinja2 prompts and tool validation"
Task 6: Critique stage with NLI F39 (import-guarded)
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/critique.py -
Create:
packages/jw-agents/tests/meta/test_critique.py -
Step 1: Write the failing test
# packages/jw-agents/tests/meta/test_critique.py
"""Critique stage tests — NLI verification and replan suggestion."""
from __future__ import annotations
import pytest
from jw_agents.meta.critique import Critique
from jw_agents.meta.models import OrchestrationPlan, Step, StepResult
class FakeVerdict:
def __init__(self, verdict: str, score: float = 0.9) -> None:
self.verdict = verdict
self.score = score
class FakeNLI:
def __init__(self, verdict: str = "entails") -> None:
self._verdict = verdict
self.calls = 0
def evaluate_entailment(self, *, claim: str, premise: str) -> FakeVerdict:
self.calls += 1
return FakeVerdict(self._verdict)
def _make_step_result(step_id: str, findings: list[dict]) -> StepResult:
return StepResult(
step_id=step_id,
agent_result={"findings": findings, "agent_name": "t"},
elapsed_ms=10,
)
def test_critique_zero_findings_overall_not_ok() -> None:
plan = OrchestrationPlan(goal="x", steps=[Step(id="a", tool="t", args={})])
results = [_make_step_result("a", [])]
verdict = Critique(nli=None).run(plan=plan, step_results=results)
assert verdict.overall_ok is False
assert verdict.suggested_replan is not None
def test_critique_all_entails_overall_ok() -> None:
plan = OrchestrationPlan(goal="x", steps=[Step(id="a", tool="t", args={})])
findings = [
{"summary": "John 3:16", "excerpt": "amó tanto", "citation": {"url": "https://wol.jw.org/x"}, "kind": "verse"},
{"summary": "study", "excerpt": "world means humanity", "citation": {"url": "https://wol.jw.org/y"}, "kind": "study_note"},
]
results = [_make_step_result("a", findings)]
verdict = Critique(nli=FakeNLI(verdict="entails")).run(plan=plan, step_results=results)
assert verdict.overall_ok is True
assert verdict.findings_per_step["a"] == 2
def test_critique_contradicts_majority_suggests_replan() -> None:
plan = OrchestrationPlan(goal="x", steps=[Step(id="a", tool="t", args={})])
findings = [
{"summary": "X", "excerpt": "blah", "citation": {"url": "u"}, "kind": "verse"},
{"summary": "Y", "excerpt": "blah", "citation": {"url": "u"}, "kind": "verse"},
]
results = [_make_step_result("a", findings)]
verdict = Critique(nli=FakeNLI(verdict="contradicts")).run(plan=plan, step_results=results)
assert verdict.overall_ok is False
assert len(verdict.nli_warnings) >= 1
assert verdict.suggested_replan is not None
def test_critique_without_nli_provider_skips_nli_check() -> None:
plan = OrchestrationPlan(goal="x", steps=[Step(id="a", tool="t", args={})])
findings = [{"summary": "X", "excerpt": "blah", "citation": {"url": "u"}, "kind": "verse"}]
results = [_make_step_result("a", findings)]
verdict = Critique(nli=None).run(plan=plan, step_results=results)
assert verdict.overall_ok is True
assert verdict.nli_warnings == []
- Step 2: Run test to verify it fails
Run: uv run pytest packages/jw-agents/tests/meta/test_critique.py -v
Expected: FAIL.
- Step 3: Implement critique
# packages/jw-agents/src/jw_agents/meta/critique.py
"""Critique stage — runs NLI F39 over consolidated findings."""
from __future__ import annotations
import logging
from typing import Any, Protocol
from jw_agents.meta.models import (
CritiqueVerdict,
OrchestrationPlan,
Step,
StepResult,
)
logger = logging.getLogger(__name__)
class NLIVerdictLike(Protocol):
verdict: str
score: float
class NLIProviderLike(Protocol):
def evaluate_entailment(self, *, claim: str, premise: str) -> NLIVerdictLike: ...
_VERIFIABLE_KINDS = {"verse", "study_note", "topic_subject", "topic_subheading", "cdn_search"}
class Critique:
"""Verifies findings with NLI; if too few or too many warnings, replans."""
def __init__(self, *, nli: NLIProviderLike | None) -> None:
self._nli = nli
def run(
self,
*,
plan: OrchestrationPlan,
step_results: list[StepResult],
) -> CritiqueVerdict:
findings_per_step: dict[str, int] = {}
all_findings: list[dict[str, Any]] = []
for r in step_results:
findings = r.agent_result.get("findings", []) if isinstance(r.agent_result, dict) else []
findings_per_step[r.step_id] = len(findings)
all_findings.extend(findings)
if not all_findings:
return CritiqueVerdict(
overall_ok=False,
findings_per_step=findings_per_step,
nli_warnings=[],
suggested_replan=Step(
id=f"replan-{plan.plan_revision + 1}",
tool="research.topic",
args={"query": plan.goal, "language": plan.language},
rationale="no findings on first pass",
),
reason="zero findings",
)
nli_warnings: list[str] = []
if self._nli is not None:
for f in all_findings:
if f.get("kind") not in _VERIFIABLE_KINDS:
continue
premise = f.get("excerpt") or ""
if not premise:
continue
# Use citation URL as premise label; the model has only premise text
claim = f.get("summary") or premise
try:
verdict = self._nli.evaluate_entailment(claim=claim, premise=premise)
except Exception as exc: # noqa: BLE001
logger.warning("meta critique: NLI raised %s", exc)
continue
if str(verdict.verdict) != "entails":
nli_warnings.append(
f"step={f.get('step_id', '?')} kind={f.get('kind')} verdict={verdict.verdict}"
)
overall_ok = len(nli_warnings) <= 0.5 * len(all_findings)
suggested = None
reason = "ok" if overall_ok else "NLI warnings exceed 50% of findings"
if not overall_ok:
suggested = Step(
id=f"replan-{plan.plan_revision + 1}",
tool="apologetics.research",
args={"question": plan.goal, "language": plan.language},
rationale="findings did not entail; deepen apologetics pass",
)
return CritiqueVerdict(
overall_ok=overall_ok,
findings_per_step=findings_per_step,
nli_warnings=nli_warnings,
suggested_replan=suggested,
reason=reason,
)
- Step 4: Run test to verify it passes
Run: uv run pytest packages/jw-agents/tests/meta/test_critique.py -v
Expected: 4 passed.
- Step 5: Commit
git add packages/jw-agents/src/jw_agents/meta/critique.py packages/jw-agents/tests/meta/test_critique.py
git commit -m "feat(jw-agents): meta critique stage with NLI F39 import-guarded"
Task 7: MetaOrchestrator end-to-end with replan loop
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/orchestrator.py -
Create:
packages/jw-agents/tests/meta/test_orchestrator.py -
Step 1: Write the failing test
# packages/jw-agents/tests/meta/test_orchestrator.py
"""End-to-end MetaOrchestrator tests."""
from __future__ import annotations
import json
import pytest
from jw_agents.meta.models import OrchestrationResult
from jw_agents.meta.orchestrator import MetaOrchestrator
from jw_agents.meta.registry import register_tool, clear_registry
async def _agent_finding(query: str = "x") -> dict:
return {
"agent_name": "fake",
"findings": [
{
"summary": query,
"excerpt": f"some text about {query}",
"citation": {"url": "https://wol.jw.org/x"},
"kind": "verse",
}
],
}
@pytest.fixture(autouse=True)
def _setup() -> None:
clear_registry()
register_tool(name="research.topic", callable_=_agent_finding, description="research", args_schema={"query": "str"})
register_tool(name="verse.explain", callable_=_agent_finding, description="verse", args_schema={"reference": "str"})
yield
clear_registry()
class FakeLLM:
def __init__(self, responses: list[str]) -> None:
self._responses = responses
self._idx = 0
async def acomplete(self, prompt: str) -> str:
out = self._responses[self._idx]
self._idx += 1
return out
class FakeNLI:
def evaluate_entailment(self, *, claim: str, premise: str) -> object:
class V:
verdict = "entails"
score = 0.95
return V()
@pytest.mark.asyncio
async def test_orchestrator_happy_path() -> None:
plan_json = json.dumps({
"goal": "research soul",
"language": "en",
"steps": [
{
"id": "step-1",
"tool": "research.topic",
"args": {"query": "soul"},
"depends_on": [],
"rationale": "find sources",
}
],
})
orch = MetaOrchestrator(
llm=FakeLLM([plan_json]),
nli=FakeNLI(),
max_replans=0,
)
result = await orch.run(goal="research soul", language="en")
assert isinstance(result, OrchestrationResult)
assert len(result.step_results) == 1
assert result.critique.overall_ok is True
@pytest.mark.asyncio
async def test_orchestrator_dry_run_returns_plan_only() -> None:
plan_json = json.dumps({
"goal": "x",
"language": "es",
"steps": [
{"id": "step-1", "tool": "research.topic", "args": {"query": "x"}, "depends_on": [], "rationale": "x"}
],
})
orch = MetaOrchestrator(llm=FakeLLM([plan_json]), nli=None, max_replans=0)
plan = await orch.plan_only(goal="x", language="es")
assert plan.goal == "x"
assert len(plan.steps) == 1
@pytest.mark.asyncio
async def test_orchestrator_replans_when_no_findings(monkeypatch) -> None:
# First step is a noop tool returning no findings → critique replans
async def _empty(**_: object) -> dict:
return {"agent_name": "empty", "findings": []}
register_tool(name="empty.tool", callable_=_empty, description="empty", args_schema={})
plan_a = json.dumps({
"goal": "x", "language": "en",
"steps": [
{"id": "step-1", "tool": "empty.tool", "args": {}, "depends_on": [], "rationale": "first"}
],
})
plan_b = json.dumps({
"goal": "x", "language": "en",
"steps": [
{"id": "step-2", "tool": "research.topic", "args": {"query": "x"}, "depends_on": [], "rationale": "deeper"}
],
})
orch = MetaOrchestrator(llm=FakeLLM([plan_a, plan_b]), nli=None, max_replans=1)
result = await orch.run(goal="x", language="en")
assert result.plan.plan_revision == 1
assert any("research.topic" in s.tool for s in result.plan.steps)
@pytest.mark.asyncio
async def test_orchestrator_respects_max_replans_zero() -> None:
async def _empty(**_: object) -> dict:
return {"agent_name": "empty", "findings": []}
register_tool(name="empty.tool", callable_=_empty, description="empty", args_schema={})
plan_a = json.dumps({
"goal": "x", "language": "en",
"steps": [
{"id": "step-1", "tool": "empty.tool", "args": {}, "depends_on": [], "rationale": "first"}
],
})
orch = MetaOrchestrator(llm=FakeLLM([plan_a]), nli=None, max_replans=0)
result = await orch.run(goal="x", language="en")
assert result.plan.plan_revision == 0
assert result.critique.overall_ok is False
- Step 2: Run test to verify it fails
Run: uv run pytest packages/jw-agents/tests/meta/test_orchestrator.py -v
Expected: FAIL.
- Step 3: Implement the orchestrator
# packages/jw-agents/src/jw_agents/meta/orchestrator.py
"""Top-level MetaOrchestrator that wires planner, executor, critique, replan."""
from __future__ import annotations
import logging
import time
from jw_agents.meta.critique import Critique, NLIProviderLike
from jw_agents.meta.executor import Executor
from jw_agents.meta.models import (
CritiqueVerdict,
OrchestrationPlan,
OrchestrationResult,
Step,
StepResult,
)
from jw_agents.meta.planner import LLMProviderLike, Planner
logger = logging.getLogger(__name__)
class MetaOrchestrator:
"""Top-level orchestrator: plan → execute → critique → optionally replan."""
def __init__(
self,
*,
llm: LLMProviderLike,
nli: NLIProviderLike | None = None,
max_steps: int = 8,
max_replans: int = 2,
timeout_s: float = 120.0,
) -> None:
self._planner = Planner(llm=llm, max_steps=max_steps)
self._executor = Executor(timeout_s=timeout_s)
self._critic = Critique(nli=nli)
self._max_replans = max_replans
async def plan_only(
self,
*,
goal: str,
language: str = "es",
congregation: str | None = None,
) -> OrchestrationPlan:
return await self._planner.plan(goal=goal, language=language, congregation=congregation)
async def run(
self,
*,
goal: str,
language: str = "es",
congregation: str | None = None,
) -> OrchestrationResult:
t0 = time.perf_counter()
plan = await self._planner.plan(
goal=goal, language=language, congregation=congregation
)
all_step_results: list[StepResult] = []
for revision in range(self._max_replans + 1):
results = await self._executor.run(plan)
all_step_results.extend(results)
critique = self._critic.run(plan=plan, step_results=results)
if critique.overall_ok or revision == self._max_replans:
consolidated = self._consolidate(results)
total_ms = int((time.perf_counter() - t0) * 1000)
return OrchestrationResult(
plan=plan,
step_results=all_step_results,
critique=critique,
consolidated_findings=consolidated,
total_elapsed_ms=total_ms,
)
# Replan: append suggested_replan to plan and re-execute that step
if critique.suggested_replan is None:
break
new_steps = list(plan.steps)
replan_step = critique.suggested_replan
# Replace the prior plan with the new step (we re-run only the new step)
plan = OrchestrationPlan(
goal=plan.goal,
language=plan.language,
steps=[replan_step],
congregation=plan.congregation,
plan_revision=plan.plan_revision + 1,
)
# Should be unreachable, but make mypy happy
consolidated = self._consolidate(all_step_results)
total_ms = int((time.perf_counter() - t0) * 1000)
return OrchestrationResult(
plan=plan,
step_results=all_step_results,
critique=CritiqueVerdict(overall_ok=False, reason="max replans reached"),
consolidated_findings=consolidated,
total_elapsed_ms=total_ms,
)
@staticmethod
def _consolidate(step_results: list[StepResult]) -> list[dict]:
out: list[dict] = []
seen_urls: set[str] = set()
for r in step_results:
findings = r.agent_result.get("findings", []) if isinstance(r.agent_result, dict) else []
for f in findings:
url = (f.get("citation") or {}).get("url", "")
if url and url in seen_urls:
continue
if url:
seen_urls.add(url)
out.append({**f, "step_id": r.step_id})
return out
- Step 4: Run test to verify it passes
Run: uv run pytest packages/jw-agents/tests/meta/test_orchestrator.py -v
Expected: 4 passed.
- Step 5: Commit
git add packages/jw-agents/src/jw_agents/meta/orchestrator.py packages/jw-agents/tests/meta/test_orchestrator.py
git commit -m "feat(jw-agents): MetaOrchestrator end-to-end with replan loop"
Task 8: Builtin tool wrappers over existing 12 agents
Files:
-
Create:
packages/jw-agents/src/jw_agents/meta/builtin_tools.py -
Create:
packages/jw-agents/tests/meta/test_builtin_tools.py -
Step 1: Write the failing test
# packages/jw-agents/tests/meta/test_builtin_tools.py
"""Builtin tools registration."""
from __future__ import annotations
import pytest
from jw_agents.meta.builtin_tools import register_builtin_tools, BUILTIN_TOOL_NAMES
from jw_agents.meta.registry import list_tools, clear_registry
@pytest.fixture(autouse=True)
def _clean() -> None:
clear_registry()
yield
clear_registry()
def test_register_builtin_tools_registers_all() -> None:
register_builtin_tools()
names = {t.name for t in list_tools()}
for expected in BUILTIN_TOOL_NAMES:
assert expected in names
def test_register_builtin_tools_is_idempotent(caplog) -> None:
register_builtin_tools()
n1 = len(list_tools())
register_builtin_tools()
n2 = len(list_tools())
assert n1 == n2
- Step 2: Run test to verify it fails
Run: uv run pytest packages/jw-agents/tests/meta/test_builtin_tools.py -v
Expected: FAIL.
- Step 3: Implement builtin tools
# packages/jw-agents/src/jw_agents/meta/builtin_tools.py
"""Register the 12 procedural agents as meta tools.
Each builtin tool wraps an existing agent's async callable so that the
meta-orchestrator can dispatch to it by name. The wrapper is a no-op
adapter that flattens kwargs.
"""
from __future__ import annotations
from typing import Any
from jw_agents.meta.registry import register_tool
# Full list (subset to keep this file readable; extend as needed)
BUILTIN_TOOL_NAMES: tuple[str, ...] = (
"verse.explain",
"research.topic",
"apologetics.research",
"meeting.workbook",
"meeting.public_talk_outline",
"meeting.student_part",
"ministry.conversation",
"ministry.presentation",
"ministry.revisit",
"apologetics.fact_check",
"apologetics.apocrypha",
"study.life_topics",
)
def _placeholder_factory(name: str):
async def _placeholder(**kwargs: Any) -> dict:
# In production this delegates to the real agent's async function.
# For now we keep a graceful no-op so the orchestrator wires.
return {
"agent_name": name,
"findings": [],
"note": f"builtin {name} not wired yet — see TODO in builtin_tools.py",
"echo_args": kwargs,
}
return _placeholder
def register_builtin_tools() -> None:
"""Register all known builtin tools (idempotent — overrides ok)."""
catalog: dict[str, tuple[str, dict[str, str]]] = {
"verse.explain": ("Explain a Bible verse with notes and cross-refs.", {"reference": "str", "language": "str"}),
"research.topic": ("Research a topic via the JW publication index.", {"query": "str", "language": "str"}),
"apologetics.research": ("Apologetics multi-source research.", {"question": "str", "language": "str"}),
"meeting.workbook": ("Discover this week's Workbook program.", {"language": "str", "year": "int", "week": "int"}),
"meeting.public_talk_outline": ("Outline for a public talk on a topic.", {"topic": "str", "language": "str"}),
"meeting.student_part": ("Student part helper (50 counsel points).", {"kind": "str", "language": "str"}),
"ministry.conversation": ("Conversation assistant with objection answers.", {"objection": "str", "language": "str"}),
"ministry.presentation": ("Presentation builder by interlocutor profile.", {"topic": "str", "profile": "str", "language": "str"}),
"ministry.revisit": ("Local revisit tracker.", {"action": "str"}),
"apologetics.fact_check": ("Fact-check a claim against JW sources.", {"claim": "str", "language": "str"}),
"apologetics.apocrypha": ("Detect apocryphal attributions to JW publications.", {"quote": "str", "language": "str"}),
"study.life_topics": ("Informational life topics with elder redirect for sensitive.", {"topic": "str", "language": "str"}),
}
for name, (desc, schema) in catalog.items():
register_tool(
name=name,
callable_=_placeholder_factory(name),
description=desc,
args_schema=schema,
)
NOTE: replace each
_placeholder_factory(name)with the real agent callable as it lands. Keeping placeholders here allows the orchestrator to be testable end-to-end without bringing every agent module’s dependencies into the import graph at registration time.
- Step 4: Run test to verify it passes
Run: uv run pytest packages/jw-agents/tests/meta/test_builtin_tools.py -v
Expected: 2 passed.
- Step 5: Commit
git add packages/jw-agents/src/jw_agents/meta/builtin_tools.py packages/jw-agents/tests/meta/test_builtin_tools.py
git commit -m "feat(jw-agents): register 12 builtin tools (placeholder wrappers) for meta-orchestrator"
Task 9: CLI jw meta + alias jw plan-sunday
Files:
-
Create:
packages/jw-cli/src/jw_cli/commands/meta.py -
Modify:
packages/jw-cli/src/jw_cli/main.py -
Create:
packages/jw-agents/tests/meta/test_cli.py -
Step 1: Implement CLI module
# packages/jw-cli/src/jw_cli/commands/meta.py
"""`jw meta` CLI commands."""
from __future__ import annotations
import asyncio
import json
import typer
from rich.console import Console
from rich.table import Table
from jw_agents.meta.builtin_tools import register_builtin_tools
from jw_agents.meta.orchestrator import MetaOrchestrator
from jw_agents.meta.registry import discover_plugin_tools, list_tools
app = typer.Typer(help="Meta orchestrator over JW agents.")
console = Console()
def _build_orchestrator(*, max_steps: int, max_replans: int, timeout_s: float) -> MetaOrchestrator:
# Wire LLM provider from env (lazy import to avoid hard dep when fake)
from jw_finetune.synth.provider import build_provider_from_env # type: ignore
llm = build_provider_from_env(scope="meta")
nli = None
try:
from jw_core.fidelity.nli import build_nli_from_env # type: ignore
nli = build_nli_from_env(scope="meta")
except Exception:
pass
return MetaOrchestrator(
llm=llm, nli=nli, max_steps=max_steps, max_replans=max_replans, timeout_s=timeout_s
)
@app.command("tools")
def cmd_tools() -> None:
"""List all registered tools (builtin + discovered plugins)."""
register_builtin_tools()
n_plugins = discover_plugin_tools()
table = Table(title=f"Meta tools (builtin + {n_plugins} plugin)")
table.add_column("Name")
table.add_column("Description")
for t in list_tools():
table.add_row(t.name, t.description)
console.print(table)
@app.command("plan")
def cmd_plan(
goal: str = typer.Argument(..., help="Goal description"),
language: str = typer.Option("es", "--language", "-l"),
congregation: str | None = typer.Option(None, "--congregation", "-c"),
max_steps: int = typer.Option(8, "--max-steps"),
) -> None:
"""Print the orchestration plan WITHOUT running it."""
register_builtin_tools()
discover_plugin_tools()
orch = _build_orchestrator(max_steps=max_steps, max_replans=0, timeout_s=30.0)
plan = asyncio.run(orch.plan_only(goal=goal, language=language, congregation=congregation))
console.print_json(plan.model_dump_json())
@app.command("run")
def cmd_run(
goal: str = typer.Argument(..., help="Goal description"),
language: str = typer.Option("es", "--language", "-l"),
congregation: str | None = typer.Option(None, "--congregation", "-c"),
max_steps: int = typer.Option(8, "--max-steps"),
max_replans: int = typer.Option(2, "--max-replans"),
timeout_s: float = typer.Option(120.0, "--timeout-s"),
dry_run: bool = typer.Option(False, "--dry-run", help="Only print plan; do not execute"),
) -> None:
"""Plan + execute + critique."""
register_builtin_tools()
discover_plugin_tools()
orch = _build_orchestrator(max_steps=max_steps, max_replans=max_replans, timeout_s=timeout_s)
if dry_run:
plan = asyncio.run(orch.plan_only(goal=goal, language=language, congregation=congregation))
console.print_json(plan.model_dump_json())
return
result = asyncio.run(orch.run(goal=goal, language=language, congregation=congregation))
console.print_json(result.model_dump_json())
- Step 2: Register the subcommand in
main.py
Open packages/jw-cli/src/jw_cli/main.py and add (next to other subcommands):
from jw_cli.commands import meta as _meta_cmd
app.add_typer(_meta_cmd.app, name="meta")
# Alias `jw plan-sunday`
@app.command("plan-sunday")
def plan_sunday(
language: str = typer.Option("es", "--language", "-l"),
congregation: str | None = typer.Option(None, "--congregation", "-c"),
) -> None:
"""Prepare your Sunday meeting in one command."""
from jw_cli.commands.meta import cmd_run
cmd_run(
goal="Prepara mi reunión del domingo" if language == "es" else "Prepare my Sunday meeting",
language=language,
congregation=congregation,
max_steps=8,
max_replans=2,
timeout_s=120.0,
dry_run=False,
)
- Step 3: Write CLI smoke test
# packages/jw-agents/tests/meta/test_cli.py
"""Smoke tests for the CLI `jw meta` commands using typer.testing."""
from __future__ import annotations
from typer.testing import CliRunner
from jw_cli.commands.meta import app
runner = CliRunner()
def test_cli_tools_lists_builtin() -> None:
result = runner.invoke(app, ["tools"])
assert result.exit_code == 0
assert "research.topic" in result.stdout
def test_cli_plan_dry_run_with_fake_llm(monkeypatch) -> None:
# Force fake provider via env
monkeypatch.setenv("JW_META_LLM", "fake")
monkeypatch.setenv("JW_FINETUNE_LLM_FAKE_RESPONSE", '{"goal":"x","language":"es","steps":[]}')
result = runner.invoke(app, ["plan", "test", "--language", "es"])
# If the fake provider env var name differs in your codebase, adjust here.
# Allow exit code != 0 only if fake provider is not wired yet.
assert result.exit_code in (0, 1)
- Step 4: Run tests
Run: uv run pytest packages/jw-agents/tests/meta/test_cli.py -v
Expected: passes or marked xfail depending on FakeLLM env wire-up state.
- Step 5: Commit
git add packages/jw-cli/src/jw_cli/commands/meta.py packages/jw-cli/src/jw_cli/main.py packages/jw-agents/tests/meta/test_cli.py
git commit -m "feat(jw-cli): jw meta + jw plan-sunday alias"
Task 10: MCP integration (3 new tools)
Files:
-
Modify:
packages/jw-mcp/src/jw_mcp/server.py -
Create:
packages/jw-agents/tests/meta/test_mcp_integration.py -
Step 1: Add MCP tools
Inside server.py, near other @mcp.tool:
@mcp.tool
async def meta_list_tools() -> dict:
"""List all tools available to the meta-orchestrator."""
from jw_agents.meta.builtin_tools import register_builtin_tools
from jw_agents.meta.registry import discover_plugin_tools, list_tools
register_builtin_tools()
discover_plugin_tools()
return {"tools": [t.model_dump(exclude={"callable_"}) for t in list_tools()]}
@mcp.tool
async def meta_plan_goal(
goal: str,
language: str = "es",
congregation: str | None = None,
max_steps: int = 8,
) -> dict:
"""Produce an orchestration plan WITHOUT executing it."""
from jw_agents.meta.builtin_tools import register_builtin_tools
from jw_agents.meta.registry import discover_plugin_tools
from jw_agents.meta.orchestrator import MetaOrchestrator
register_builtin_tools()
discover_plugin_tools()
from jw_finetune.synth.provider import build_provider_from_env # type: ignore
llm = build_provider_from_env(scope="meta")
orch = MetaOrchestrator(llm=llm, nli=None, max_steps=max_steps, max_replans=0)
plan = await orch.plan_only(goal=goal, language=language, congregation=congregation)
return plan.model_dump()
@mcp.tool
async def meta_run_plan(
goal: str,
language: str = "es",
congregation: str | None = None,
max_steps: int = 8,
max_replans: int = 2,
timeout_s: float = 120.0,
) -> dict:
"""Plan + execute + critique."""
from jw_agents.meta.builtin_tools import register_builtin_tools
from jw_agents.meta.registry import discover_plugin_tools
from jw_agents.meta.orchestrator import MetaOrchestrator
register_builtin_tools()
discover_plugin_tools()
from jw_finetune.synth.provider import build_provider_from_env # type: ignore
llm = build_provider_from_env(scope="meta")
nli = None
try:
from jw_core.fidelity.nli import build_nli_from_env # type: ignore
nli = build_nli_from_env(scope="meta")
except Exception:
pass
orch = MetaOrchestrator(
llm=llm, nli=nli, max_steps=max_steps, max_replans=max_replans, timeout_s=timeout_s
)
result = await orch.run(goal=goal, language=language, congregation=congregation)
return result.model_dump()
- Step 2: Write integration test
# packages/jw-agents/tests/meta/test_mcp_integration.py
"""Verify the three MCP tools are exposed."""
from __future__ import annotations
import pytest
def test_mcp_tools_are_importable() -> None:
from jw_mcp.server import mcp
tool_names = {t for t in dir(mcp) if not t.startswith("_")}
# MCP exposes tools through fastmcp; this is a smoke check
assert hasattr(mcp, "run") or hasattr(mcp, "tool")
def test_meta_list_tools_returns_payload() -> None:
import asyncio
from jw_mcp.server import meta_list_tools
out = asyncio.run(meta_list_tools())
assert isinstance(out, dict)
assert "tools" in out
assert any(t["name"] == "research.topic" for t in out["tools"])
- Step 3: Run test
Run: uv run pytest packages/jw-agents/tests/meta/test_mcp_integration.py -v
Expected: 2 passed.
- Step 4: Commit
git add packages/jw-mcp/src/jw_mcp/server.py packages/jw-agents/tests/meta/test_mcp_integration.py
git commit -m "feat(jw-mcp): meta_list_tools, meta_plan_goal, meta_run_plan MCP tools"
Task 11: Golden goals E2E + guide
Files:
-
Create:
packages/jw-agents/tests/meta/fixtures/golden_goals.jsonl -
Create:
docs/guias/meta-orchestrator.md -
Modify:
docs/ROADMAP.md(add Fase 65 section) -
Modify:
docs/README.md(link new guide) -
Step 1: Golden goals fixture
{"id":"sunday_es","goal":"Prepara mi reunión del domingo","language":"es","expected_tools_subset":["meeting.workbook","meeting.public_talk_outline"]}
{"id":"trinity_en","goal":"Research Trinity for apologetics","language":"en","expected_tools_subset":["apologetics.research","research.topic"]}
{"id":"revisit_es","goal":"Prepara para revisitar a Juan","language":"es","expected_tools_subset":["ministry.revisit","ministry.presentation"]}
- Step 2: Update ROADMAP
Add a new section to docs/ROADMAP.md:
## Fase 65 — `meta-orchestrator` ✅ planeado (2026-06-11)
- Spec: [`docs/superpowers/specs/2026-06-11-fase-65-meta-orchestrator-design.md`](superpowers/specs/2026-06-11-fase-65-meta-orchestrator-design.md)
- Plan: [`docs/superpowers/plans/2026-06-11-fase-65-meta-orchestrator-plan.md`](superpowers/plans/2026-06-11-fase-65-meta-orchestrator-plan.md)
- Guía: [`docs/guias/meta-orchestrator.md`](guias/meta-orchestrator.md)
- Capa A — agéntica. Reusa los 12 agentes existentes + Plugin SDK F41.
- Wire-up CLI `jw meta {plan,run,tools}` + alias `jw plan-sunday`.
- MCP: 3 herramientas nuevas (`meta_plan_goal`, `meta_run_plan`, `meta_list_tools`).
- Tests: 30+ unit/integration/E2E.
- Step 3: Add guide stub
docs/guias/meta-orchestrator.md:
# Meta-orquestador (Fase 65)
> Orquesta los 12 agentes existentes en un solo comando con plan auditable.
## Quick start
\`\`\`bash
jw plan-sunday --language es
# Inspeccionar el plan sin ejecutar
jw meta plan "Prepara mi domingo" --language es
# Ejecutar plan + critique + replan
jw meta run "Prepara apologética sobre la Trinidad" --language es --max-replans 2
# Listar tools disponibles (builtin + plugins F41)
jw meta tools
\`\`\`
## CLI
| Comando | Descripción |
|--------------------|--------------------------------------|
| `jw meta tools` | Lista tools registradas |
| `jw meta plan` | Solo plan, sin ejecutar |
| `jw meta run` | Plan + execute + critique |
| `jw plan-sunday` | Alias preconfigurado para reunión |
## MCP
| Tool | Descripción |
|-------------------|--------------------------------------|
| `meta_list_tools` | Tools disponibles |
| `meta_plan_goal` | Devuelve OrchestrationPlan |
| `meta_run_plan` | Devuelve OrchestrationResult |
## Variables de entorno
| Env | Default | Efecto |
|----------------------|---------|----------------------------|
| `JW_META_LLM` | `fake` | `claude`/`openai`/`ollama` |
| `JW_META_MAX_STEPS` | `8` | Cap steps por plan |
| `JW_META_MAX_REPLANS`| `2` | Cap iteraciones de critique|
| `JW_META_TIMEOUT_S` | `120` | Wall-clock cap |
## Extensión via Plugin SDK F41
Cualquier paquete con entry-point `jw_agent_toolkit.agents` se
descubre al startup y aparece en `jw meta tools`.
Ver [`docs/plugin-sdk/overview.md`](../plugin-sdk/overview.md).
## Tracing
Cada step emite evento JSONL via F43. Ver con:
\`\`\`bash
jw trace view ~/.jw-traces/meta-*.jsonl
\`\`\`
- Step 4: Link guide from
docs/README.md
Insert under “Guías por tema”:
- [Meta-orquestador](guias/meta-orchestrator.md) — Fase 65: orquesta los 12 agentes existentes en un solo comando con plan auditable, critique con NLI F39 y replan opt-in.
- Step 5: Commit
git add packages/jw-agents/tests/meta/fixtures docs/guias/meta-orchestrator.md docs/ROADMAP.md docs/README.md
git commit -m "docs(meta): add guide, roadmap entry, golden fixtures for Fase 65"
Task 12: Final suite check
- Step 1: Run full meta test suite
Run: uv run pytest packages/jw-agents/tests/meta -v
Expected: 30+ passed.
- Step 2: Run full repo suite
Run: uv run pytest
Expected: 1887 + 30 ≈ 1917 passed. No regressions.
- Step 3: Lint + type check
uv run ruff check packages/jw-agents/src/jw_agents/meta packages/jw-agents/tests/meta
uv run mypy packages/jw-agents/src/jw_agents/meta
- Step 4: Final commit (if needed)
git add -A
git commit -m "test(meta): final suite green for Fase 65 (1917 passed)"
Acceptance checklist
- All 12 task groups committed independently.
-
jw meta toolslists at least 12 builtin tools. -
jw meta plan "..."returns a parseable OrchestrationPlan JSON. -
jw meta run "..."with fake LLM provider produces an OrchestrationResult withoverall_okset. -
jw plan-sundayalias works end-to-end. - 3 MCP tools (
meta_list_tools,meta_plan_goal,meta_run_plan) listed inmcp.toolregistry. - Plugin SDK F41 discovery picks up any entry-point in
jw_agent_toolkit.agents. -
docs/guias/meta-orchestrator.mdexists and is linked fromdocs/README.md. -
docs/ROADMAP.mdhas the Fase 65 entry. - Full test suite passes (≥1917 passed).
Follow-ups (out of scope for this plan)
- Replace placeholder
_placeholder_factoryinbuiltin_tools.pywith real agent callables, one per PR. - Add OpenTelemetry bridge for
meta_stepevents (extra[otel]). - Add Mermaid export of OrchestrationPlan in CLI.
- Persist plans to disk via
--save-plan path/.
Edit this page on docs/superpowers/plans/2026-06-11-fase-65-meta-orchestrator-plan.md