# 🤖 nanobot: A Comprehensive Build-Your-Own Guide 📚

*Truong Phung · DEV Community*

A deep, actionable breakdown of HKUDS/nanobot, the ~4k-line ultra-lightweight personal AI agent, distilled into principles, techniques, and a step-by-step blueprint you can use to build a similar system.

**Contents**

1. 🧩 What nanobot is (and why it matters)
2. ⚙️ Core design principles
3. 🏗️ Architecture at a glance
4. 📁 Repo structure (the map you'll keep open)
5. 🔄 The Agent Loop (the heart)
6. 🚌 The Message Bus (the seam)
7. 📡 Channel adapter pattern
8. 🤖 Provider abstraction (LLMs)
9. 🛠️ Tools (the LLM's hands)
10. 🔗 MCP integration (free tools from the ecosystem)
11. 🧠 Memory: two stages and a "dream"
12. 🤝 Sub-agents (the cheap version)
13. 🔧 Slash commands
14. 📚 Skills (markdown as a plug-in format)
15. 🧱 Context assembly
16. 💾 Sessions, checkpoints, and graceful interruption
17. 🔐 Security model
18. ⚙️ Configuration model
19. 🚀 Deployment paths
20. 🗺️ Step-by-step blueprint to build your own
21. 💡 Lessons & non-obvious wins
22. 🔮 Where it can go next

📚 Sources

## 1. 🧩 What nanobot is (and why it matters)

nanobot is an open-source AI agent framework whose entire core agent loop fits in a few thousand lines of clean Python. It is positioned in the same lineage as Claude Code, Codex, and OpenClaw, but consciously stripped down to the smallest readable kernel that still supports:

- Multiple chat channels (Telegram, Discord, Slack, Feishu, WeChat, Email, Matrix, MS Teams, WhatsApp, …)
- 25+ LLM providers (OpenAI, Anthropic, Gemini, DeepSeek, Qwen, Ollama, vLLM, OpenRouter…)
- MCP (Model Context Protocol) tool servers
- Long-running memory with a "dream" consolidation phase
- Sub-agents, sandboxed tool execution, cron jobs, slash commands
- An OpenAI-compatible HTTP API + Python SDK + WebUI

Why study it? Because it inverts the usual "framework" instinct. Instead of orchestration layers, plugins, DAGs, and graph schedulers, it centers everything on one small agent loop and lets memory, skills, and tools flow in as context, not as machinery. That's the trick worth copying.

## 2. ⚙️ Core design principles

These five principles are the entire reason the codebase stays small. Internalize them before writing any code.

1. **One loop, no graph.** A single async function consumes a message, asks the LLM, runs tools, repeats. There is no DAG, no planner-executor split, no "agent graph". Anything fancier (sub-agents, dream memory) is implemented as a tool the loop calls, not as a parallel control flow.
2. **The bus is the only seam.** Channels (Telegram, CLI, Slack…) never call the agent directly. They put an `InboundMessage` on a queue; the agent reads from the queue. That single seam is what makes adding a 14th channel a one-file PR.
3. **One lock per session.** Each chat (`session_key`) gets a lock. Within a session everything is strictly serial (no race on history). Across sessions, work runs in parallel (one user's slow tool call doesn't block another's). It's the simplest correct concurrency model for a multi-tenant chat agent.
4. **Context, not orchestration.** Skills are markdown files. Memory is `MEMORY.md`, `SOUL.md`, `USER.md`, `history.jsonl`. They get injected into the prompt, not loaded into a vector DB or a state machine. The LLM does the routing.
5. **Registries, not frameworks.** Three plug-in surfaces, each backed by a dataclass-driven registry: `ProviderSpec` for LLMs, `BaseChannel` for chat platforms, `Tool` for capabilities. Adding a new integration almost always means adding one entry to one registry plus one file (see the registry sketch just below).
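To make that last principle concrete, here is a minimal sketch of a dataclass-driven registry in the spirit of `ProviderSpec`. The field names echo the spec fields described later in this guide; the `register()` helper and its defaults are illustrative, not nanobot's actual API.

```python
# Minimal sketch of a dataclass-driven registry (illustrative, not nanobot's exact API).
from dataclasses import dataclass

@dataclass(frozen=True)
class ProviderSpec:
    name: str                       # registry key, e.g. "openai"
    env_key: str                    # where the API key lives, e.g. "OPENAI_API_KEY"
    display_name: str = ""
    default_api_base: str = ""
    keywords: tuple[str, ...] = ()  # model-name hints used for auto-detection
    is_gateway: bool = False        # OpenRouter-style aggregators

PROVIDERS: dict[str, ProviderSpec] = {}

def register(spec: ProviderSpec) -> None:
    PROVIDERS[spec.name] = spec

# Adding a new OpenAI-compatible gateway is one declarative row, no new parsing code:
register(ProviderSpec(
    name="openrouter",
    env_key="OPENROUTER_API_KEY",
    display_name="OpenRouter",
    default_api_base="https://openrouter.ai/api/v1",
    is_gateway=True,
))
```

The channel and tool registries follow the same shape: declarative rows, one generic lookup path.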
## 3. 🏗️ Architecture at a glance

```
┌───────────────┐     ┌──────────────┐     ┌──────────────────┐
│ Chat platforms│ ──► │   Channels   │ ──► │    MessageBus    │
│ (Telegram…)   │     │  (adapters)  │     │  inbound queue   │
└───────────────┘     └──────────────┘     └────────┬─────────┘
                                                    │
                                                    ▼
┌─────────────────────────────────────────────────────────────┐
│ AgentLoop (run)                                             │
│  ├── per-session lock + pending queue (mid-turn injection)  │
│  ├── auto-compact / consolidate by tokens                   │
│  ├── slash commands router                                  │
│  └── _run_agent_loop ──► Runner.run                         │
│        ├── build messages                                   │
│        ├── call Provider.chat                               │
│        ├── execute Tools (concurrent)                       │
│        └── repeat until stop_reason                         │
└─────────────────────────────────────────────────────────────┘
                             │
                             ▼
                  ┌────────────────────┐
                  │     MessageBus     │
                  │   outbound queue   │
                  └─────────┬──────────┘
                            ▼
                   Channels.send → user
```

Side systems hanging off this spine:

- **Providers**: pluggable LLM backends behind one `chat()` method
- **Tools**: Python `Tool` subclasses + MCP wrappers + sub-agent spawn
- **Memory**: file-based (`MEMORY.md`, `SOUL.md`, `USER.md`, `history.jsonl`) and git-versioned
- **Sessions**: JSON files persisting per-chat history and checkpoints
- **Cron / Heartbeat**: scheduled triggers fed back as inbound messages
- **Security**: sandboxing (bwrap), workspace confinement, allowFrom whitelists
- **API / WebUI**: OpenAI-compatible HTTP layer and dev gateway

## 4. 📁 Repo structure (the map you'll keep open)

```
nanobot/
├── agent/
│   ├── loop.py         # the AgentLoop class - top-level dispatcher
│   ├── runner.py       # iteration loop: LLM → tools → repeat
│   ├── context.py      # builds system prompt + messages
│   ├── memory.py       # MemoryStore, Consolidator, Dream
│   ├── skills.py       # SkillsLoader (SKILL.md files)
│   ├── subagent.py     # spawn isolated agents as asyncio Tasks
│   ├── autocompact.py  # idle-time history compression
│   ├── hook.py         # AgentHook lifecycle (streaming, progress)
│   └── tools/          # built-in tools
│       ├── base.py     # Tool ABC + JSON schema generation
│       ├── registry.py
│       ├── filesystem.py shell.py web.py search.py notebook.py
│       ├── ask.py      # ask-user with buttons
│       ├── spawn.py    # sub-agent spawn tool
│       ├── message.py  # send-to-channel tool
│       └── cron.py self.py sandbox.py file_state.py mcp.py
├── bus/
│   ├── events.py       # InboundMessage / OutboundMessage dataclasses
│   └── queue.py        # MessageBus (two asyncio.Queues)
├── channels/
│   ├── base.py         # BaseChannel ABC
│   ├── registry.py manager.py
│   └── telegram.py discord.py slack.py feishu.py wecom.py weixin.py
│       qq.py dingtalk.py matrix.py msteams.py whatsapp.py
│       email.py mochat.py websocket.py
├── providers/
│   ├── base.py         # LLMProvider ABC + LLMResponse
│   ├── registry.py factory.py
│   ├── openai_compat_provider.py  # covers most providers
│   ├── anthropic_provider.py azure_openai_provider.py
│   ├── github_copilot_provider.py openai_codex_provider.py
│   └── transcription.py
├── command/            # slash commands (/help /stop /memory …)
├── config/             # JSON config loading and validation
├── session/            # per-chat persistence (JSON files)
├── cron/ heartbeat/    # scheduled triggers
├── security/           # sandbox, allowFrom, SSRF guards
├── api/                # OpenAI-compatible HTTP server
├── cli/                # `nanobot onboard | agent | gateway`
├── templates/          # prompt templates (dream_phase1.md etc.)
└── nanobot.py          # top-level wiring
```
Read in this order to learn fastest: `bus/events.py` → `channels/base.py` → `agent/loop.py` → `agent/runner.py` → `agent/context.py` → `agent/memory.py`.

## 5. 🔄 The Agent Loop (the heart)

This is the smallest interesting piece in the project: three stacked async functions, ~300 lines in total.

**Layer 1: AgentLoop.run**

```python
async def run(self) -> None:
    self._running = True
    await self._connect_mcp()
    while self._running:
        try:
            msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
        except asyncio.TimeoutError:
            self.auto_compact.check_expired(...)  # housekeeping tick
            continue

        if self.commands.is_priority(msg.content.strip()):
            await self._dispatch_command_inline(...)  # /stop etc. preempt
            continue

        key = self._effective_session_key(msg)
        if key in self._pending_queues:
            # session is mid-turn → inject as follow-up, don't start a 2nd task
            self._pending_queues[key].put_nowait(msg)
            continue

        task = asyncio.create_task(self._dispatch(msg))
        self._active_tasks.setdefault(key, []).append(task)
```

Three things make this worth copying:

1. **A 1-second timeout** on the queue read, so housekeeping can run between messages.
2. **A priority-command shortcut** for things that must work even when a turn is busy (`/stop`).
3. **Pending queues for mid-turn injection**: the trick that lets users send a follow-up while a turn is still running.

**Layer 2: _dispatch**

```python
async def _dispatch(self, msg):
    key = self._effective_session_key(msg)
    lock = self._session_locks.setdefault(key, asyncio.Lock())
    pending = asyncio.Queue(maxsize=20)
    self._pending_queues[key] = pending
    try:
        # nullcontext() is contextlib.nullcontext, used when no global gate is set
        async with lock, self._concurrency_gate or nullcontext():
            response = await self._process_message(msg, pending_queue=pending, ...)
            if response:
                await self.bus.publish_outbound(response)
    finally:
        # if more messages arrived, push them back to inbound for the next turn
        queue = self._pending_queues.pop(key, None)
        while queue and not queue.empty():
            await self.bus.publish_inbound(queue.get_nowait())
```

The lock-per-session is the entire concurrency model. No threads, no actors, no Redis.

**Layer 3: Runner.run**

```python
# inside Runner.run
for iteration in range(spec.max_iterations):
    msgs = self._drop_orphan_tool_results(messages)
    msgs = self._backfill_missing_tool_results(msgs)
    msgs = self._microcompact(msgs)
    response = await self._request_model(spec, msgs, hook, ctx)
    if response.should_execute_tools:
        results, events, fatal = await self._execute_tools(spec, response.tool_calls, ...)
        # append assistant + tool messages, possibly inject pending user messages
        if fatal:
            stop_reason = "tool_error"
            break
    else:
        stop_reason = "completed"
        break
else:
    stop_reason = "max_iterations"
```

Stop reasons drive the public contract: `completed`, `ask_user`, `tool_error`, `error`, `empty_final_response`, `max_iterations`. Each one renders differently for the user.

Key invariants the runner enforces every iteration (a sketch of the first two follows this list):

- **Drop orphan tool results**: if the LLM forgot to emit a `tool_use` for a `tool_result`, strip it (some providers will 400 otherwise).
- **Backfill missing tool results**: if the LLM emitted a `tool_use` with no matching result, synthesize an error placeholder so the trace is well-formed.
- **Microcompact**: fast pre-call truncation of large blobs.
- **Concurrent tools**: `asyncio.gather` over a batch when `concurrent_tools=True`.
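Those first two hygiene passes are easy to get subtly wrong, so here is a self-contained sketch of them, assuming the OpenAI-style message shape (assistant messages carry a `tool_calls` list; results are `role="tool"` entries with a `tool_call_id`). nanobot's real helpers live in `agent/runner.py` and handle more provider formats than this.

```python
# Sketch of the two trace-hygiene passes, assuming OpenAI-style messages.
# Not nanobot's actual code - an illustration of the invariant.

def drop_orphan_tool_results(messages: list[dict]) -> list[dict]:
    """Drop role='tool' results whose id was never requested by an assistant turn."""
    requested = {
        tc["id"]
        for m in messages if m.get("role") == "assistant"
        for tc in m.get("tool_calls") or []
    }
    return [m for m in messages
            if m.get("role") != "tool" or m.get("tool_call_id") in requested]

def backfill_missing_tool_results(messages: list[dict]) -> list[dict]:
    """Synthesize an error placeholder for every tool_call that never got a result."""
    answered = {m.get("tool_call_id") for m in messages if m.get("role") == "tool"}
    out: list[dict] = []
    for m in messages:
        out.append(m)
        if m.get("role") == "assistant":
            for tc in m.get("tool_calls") or []:
                if tc["id"] not in answered:
                    out.append({"role": "tool", "tool_call_id": tc["id"],
                                "content": "error: tool result missing (turn interrupted)"})
    return out
```

Run both before every model call; many providers reject traces that violate either invariant.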
## 6. 🚌 The Message Bus (the seam)

Two queues. That's it.

```python
# bus/events.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class InboundMessage:
    channel: str
    sender_id: str
    chat_id: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    media: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    session_key_override: str | None = None

@dataclass
class OutboundMessage:
    channel: str
    chat_id: str
    content: str
    reply_to: str | None = None
    media: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    buttons: list[list[str]] = field(default_factory=list)
```

```python
# bus/queue.py
import asyncio

class MessageBus:
    def __init__(self):
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()

    async def publish_inbound(self, m: InboundMessage) -> None:
        await self.inbound.put(m)

    async def publish_outbound(self, m: OutboundMessage) -> None:
        await self.outbound.put(m)

    async def consume_inbound(self) -> InboundMessage:
        return await self.inbound.get()

    async def consume_outbound(self) -> OutboundMessage:
        return await self.outbound.get()
```

**Why this is enough.** Every channel just needs to (a) translate platform events into `InboundMessage` and `publish_inbound`, and (b) listen on `consume_outbound` for messages addressed to its channel name and translate back. No shared state, no events, no observers, no callbacks.

Side effect: cron jobs, heartbeat triggers, sub-agent results, and inter-agent messages all use the same bus. They're just synthetic `InboundMessage` events with `channel="system"`. Uniformity = small code.
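To see why that uniformity keeps the code small, here is what a cron-style trigger reduces to under this design: a coroutine that publishes a synthetic `InboundMessage` on the same bus. The metadata key below is illustrative; nanobot's cron module has its own schema.

```python
import asyncio

# A scheduled trigger is just another producer on the bus - no special code path.
async def cron_tick(bus: MessageBus, prompt: str, every_seconds: float) -> None:
    while True:
        await asyncio.sleep(every_seconds)
        await bus.publish_inbound(InboundMessage(
            channel="system",      # the agent loop treats it like any chat message
            sender_id="cron",
            chat_id="cron",
            content=prompt,        # e.g. "check the RSS feeds and summarize"
            metadata={"injected_event": "cron"},  # illustrative metadata key
        ))
```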
## 7. 📡 Channel adapter pattern

```python
from abc import ABC, abstractmethod

class BaseChannel(ABC):
    name: str = "base"

    def __init__(self, config, bus):
        self.config = config
        self.bus = bus

    @abstractmethod
    async def start(self): ...
    @abstractmethod
    async def stop(self): ...
    @abstractmethod
    async def send(self, msg: OutboundMessage): ...

    async def _handle_message(self, sender_id, chat_id, content,
                              media=None, metadata=None, session_key=None):
        if not self.is_allowed(sender_id):  # allowFrom check
            return
        await self.bus.publish_inbound(InboundMessage(
            channel=self.name,
            sender_id=str(sender_id),
            chat_id=str(chat_id),
            content=content,
            media=media or [],
            metadata=metadata or {},
            session_key_override=session_key,
        ))
```

A new channel implementation is roughly:

1. Subclass `BaseChannel`.
2. In `start()`, open the platform's SDK / long-poll / websocket and call `_handle_message(...)` per inbound event.
3. In `send()`, look up the platform handle from `msg.chat_id` and post `msg.content` (plus buttons / media if supported).
4. In `stop()`, drain and close.
5. Add an entry to `channels/registry.py` so config can refer to it by name.

That's it: roughly 150 LOC for a Telegram or Slack channel.

## 8. 🤖 Provider abstraction (LLMs)

```python
@dataclass
class LLMResponse:
    content: str | None
    tool_calls: list[ToolCallRequest] = field(default_factory=list)
    finish_reason: str = "stop"
    usage: dict[str, int] = field(default_factory=dict)
    reasoning_content: str | None = None
    thinking_blocks: list[dict] | None = None
    error_status_code: int | None = None
    error_should_retry: bool | None = None
    retry_after: float | None = None

class LLMProvider(ABC):
    @abstractmethod
    async def chat(self, messages, tools=None, model=None, max_tokens=4096,
                   temperature=0.7, reasoning_effort=None,
                   tool_choice=None) -> LLMResponse: ...

    @abstractmethod
    def get_default_model(self) -> str: ...

    # provided by the base class
    async def chat_with_retry(self, ...): ...         # exponential backoff
    async def chat_stream_with_retry(self, ...): ...
```

The provider registry uses a lightweight `ProviderSpec` dataclass: `name`, `keywords`, `env_key`, `display_name`, `default_api_base`, `is_gateway`, `detect_by_key_prefix`, `supports_max_completion_tokens`, `model_overrides`. Around 75% of providers reuse `openai_compat_provider.py` and differ only by spec; new gateways like OpenRouter cost ~10 lines of config.

One hard-won detail to copy: capture rich error metadata in `LLMResponse` (`error_status_code`, `error_kind`, `error_should_retry`, `retry_after`). The retry layer then becomes a one-page, provider-agnostic policy instead of a forest of `except` clauses.
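As a rough illustration of why one implementation covers so many providers, here is a hedged sketch of an OpenAI-compatible `chat()` built on httpx: only the base URL, key, and default model vary per spec. Tool-call parsing is elided, and the `retry-after` handling assumes the seconds form of the header; nanobot's `openai_compat_provider.py` is the real thing.

```python
import httpx

# Sketch of an OpenAI-compatible provider. Everything provider-specific
# (base URL, key, model) comes from the spec; the wire format is shared.
class OpenAICompatProvider(LLMProvider):
    def __init__(self, api_key: str, api_base: str, default_model: str):
        self.api_key = api_key
        self.api_base = api_base.rstrip("/")
        self.default_model = default_model

    def get_default_model(self) -> str:
        return self.default_model

    async def chat(self, messages, tools=None, model=None, max_tokens=4096,
                   temperature=0.7, reasoning_effort=None,
                   tool_choice=None) -> LLMResponse:
        payload = {"model": model or self.default_model, "messages": messages,
                   "max_tokens": max_tokens, "temperature": temperature}
        if tools:
            payload["tools"] = tools
        async with httpx.AsyncClient(timeout=120.0) as client:
            r = await client.post(f"{self.api_base}/chat/completions", json=payload,
                                  headers={"Authorization": f"Bearer {self.api_key}"})
        if r.status_code != 200:
            # capture error metadata instead of raising - the retry layer decides
            retry_after = r.headers.get("retry-after")
            return LLMResponse(
                content=None,
                error_status_code=r.status_code,
                error_should_retry=r.status_code in (408, 429, 500, 502, 503),
                retry_after=float(retry_after) if retry_after else None)
        data = r.json()
        choice = data["choices"][0]
        return LLMResponse(content=choice["message"].get("content"),
                           finish_reason=choice.get("finish_reason") or "stop",
                           usage=data.get("usage") or {})  # tool_call parsing elided
```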
## 9. 🛠️ Tools (the LLM's hands)

```python
class Tool(ABC):
    name: str
    description: str
    parameters: dict            # JSON Schema
    read_only: bool = False
    concurrency_safe: bool = False
    exclusive: bool = False     # blocks other tools from running in parallel

    def to_schema(self) -> dict:
        return {"type": "function", "function": {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters,
        }}

    def validate_params(self, params: dict) -> dict: ...
    def cast_params(self, params: dict) -> dict: ...

    @abstractmethod
    async def execute(self, **kwargs) -> Any: ...  # str | content blocks
```

Built-in tools (`agent/tools/`):

- `filesystem.py`: read/write/edit/list, with workspace confinement
- `shell.py`: exec with optional bwrap sandbox
- `search.py` / `web.py`: DuckDuckGo, Brave, Tavily, Jina, Kagi, SearXNG; fetch URLs
- `notebook.py`: Jupyter cell editing
- `ask.py`: `ask_user` raises a "stop and wait for human" with optional buttons
- `spawn.py`: fire-and-forget sub-agent
- `message.py`: proactively send to a channel (`channel`, `chat_id`)
- `cron.py` / `self.py`: schedule and modify own behavior
- `mcp.py`: bridge to MCP servers (next section)

Three properties matter for correctness:

- `read_only` → safe to ignore for state checkpointing
- `concurrency_safe` → can be batched in `asyncio.gather`
- `exclusive` → must be the only tool in its batch

The runner partitions a turn's tool calls into batches honoring these flags (sketched below). That's how you get fast parallel reads without races on writes.
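The batching logic is small enough to sketch. Under one reading of the stated semantics (adjacent `concurrency_safe` calls may share a gather; everything else runs alone, which also satisfies `exclusive`), a minimal partitioner looks like this. The function names and registry shape are mine, not nanobot's:

```python
import asyncio

# Sketch: split a turn's tool calls into ordered batches. Adjacent
# concurrency_safe calls share a batch (run via asyncio.gather);
# any non-safe or exclusive call gets a batch of its own.
def partition_tool_calls(calls, tools_by_name) -> list[list]:
    batches: list[list] = []
    current: list = []
    for call in calls:
        tool = tools_by_name[call.name]
        if tool.concurrency_safe and not tool.exclusive:
            current.append(call)
        else:
            if current:
                batches.append(current)
                current = []
            batches.append([call])   # serial or exclusive call runs alone
    if current:
        batches.append(current)
    return batches

async def run_batches(batches, execute):
    for batch in batches:
        # parallel reads, serial writes - the whole "scheduler" in one gather
        await asyncio.gather(*(execute(c) for c in batch))
```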
## 10. 🔗 MCP integration (free tools from the ecosystem)

```python
# pseudo-summary of agent/tools/mcp.py
from contextlib import AsyncExitStack

async def connect_mcp_servers(specs, registry):
    for spec in specs:
        async with AsyncExitStack() as stack:   # one stack per server task
            session = await open_session(spec)  # stdio | sse | streamableHttp
            tools   = await session.list_tools()
            res     = await session.list_resources()
            prompts = await session.list_prompts()
            for t in filter_by_enabled(tools, spec.enabled_tools):
                registry.register(MCPToolWrapper(session, t))
            for r in res:
                registry.register(MCPResourceWrapper(session, r))
            for p in prompts:
                registry.register(MCPPromptWrapper(session, p))
```

Notes worth stealing:

- Sanitize names as `mcp_{server}_{tool}` to avoid collisions.
- Wrap each server in its own `AsyncExitStack` inside its own task; this prevents asyncio cancel-scope leakage when one server dies.
- Detect a small set of transient exception names (connection reset, broken pipe…) and retry once. Everything else fails fast.
- Surface a hint about "stdio protocol pollution" when JSON-RPC parsing fails. It saves hours of debugging when an MCP server prints to stdout.

## 11. 🧠 Memory: two stages and a "dream"

```
workspace/
├── MEMORY.md        # long-term facts (git-tracked, line-age annotated)
├── SOUL.md          # agent identity / persona (git-tracked)
├── USER.md          # user profile (git-tracked)
└── sessions/<session>/history.jsonl   # append-only event log
```

**Stage 1: consolidation.** Triggered every turn. If the prompt would exceed `context_window_tokens − max_completion_tokens − 1024`, evict the oldest user-turn boundaries and summarize the chunk via the LLM (capped at 8000 chars). Append the summary to `history.jsonl` and advance `session.last_consolidated`. Result: the prompt always fits.

**Stage 2: the dream.** Runs as a cron-like background pass, in two phases:

```python
# phase 1: ANALYZE - read unprocessed history + current MEMORY/SOUL/USER
phase1 = await self.provider.chat_with_retry(
    messages=[
        {"role": "system", "content": render_template("agent/dream_phase1.md")},
        {"role": "user",   "content": history + file_context},
    ])

# phase 2: EXECUTE - give the analysis to a runner with file-edit tools
result = await self._runner.run(AgentRunSpec(
    initial_messages=messages,
    tools=[ReadFileTool, EditFileTool, WriteFileTool],
    model=self.model,
    max_iterations=self.max_iterations))
sha = self.store.git.auto_commit("dream consolidation")
```

The dream phase is the agent literally writing notes to itself, then committing them with git. Because everything is files-on-disk + git, the entire memory state is recoverable, diffable, auditable, and human-editable. There is no vector DB. Per-line age suffixes (`← 30d`) are computed from `git blame` so the LLM can naturally deprioritize stale entries.

**Idle compaction.** After `idleCompactAfterMinutes` of silence, the older context of a session is summarized in place. The original structured tool-call trail in that session file is not recoverable afterward, but the summarized form is small enough that the next turn starts cheaply.

## 12. 🤝 Sub-agents (the cheap version)

```python
bg_task = asyncio.create_task(
    self._run_subagent(task_id, task, display_label, origin, status))
self._running_tasks[task_id] = bg_task
```

- Asyncio task isolation, not process isolation. Same memory; different tool registry.
- Reduced toolset: filesystem, shell (if enabled), web. No `message`, no `spawn` (no recursion, no broadcasting).
- A status object tracks `phase`, `iteration`, `tool_events`, and `error` for live progress.
- Result reporting goes via the bus: when done, the sub-agent publishes a synthetic `InboundMessage(channel="system", sender_id="subagent", metadata={"injected_event": "subagent_result", "subagent_task_id": task_id})`. The main loop picks it up like any other message; the bus is the universal IPC.
- The main loop's pending-queue logic blocks for up to 5 minutes if sub-agents from this turn are still running before completing the turn, so sub-agents never silently leak into a future turn.

## 13. 🔧 Slash commands (`/stop`, `/memory`, `/help` …)

Located in `command/`. A small `Router` matches the first token of `msg.content`. Two tiers:

- **Priority commands** (`/stop`) preempt. They're handled before the per-session lock, so they always work even mid-turn.
- **Normal commands** run inside the turn, after history restore but before the LLM call.

The `CommandContext` carries `(msg, session, key, raw, loop)` so commands can inspect or mutate session state. If you want a command surface in your own clone: parse the leading token of `content`, dispatch via a `dict[str, Callable]`, and return an `OutboundMessage` to short-circuit. ~60 LOC.

## 14. 📚 Skills (markdown as a plug-in format)

A skill is just a directory with a `SKILL.md` and any helper files. The frontmatter looks like:

```markdown
---
name: code-review
description: Review changed code for quality and correctness
requires:
  bins: [git, rg]
  env: [GITHUB_TOKEN]
---

# Code review skill
You are reviewing code. Read the diff with `git diff main...HEAD`, then …
```

The `SkillsLoader` (a minimal sketch follows this list):

- Scans `workspace/skills/` (user) and the packaged `nanobot/skills/` (built-in); user overrides built-in.
- Filters by requirements (`shutil.which(bin)`, `os.environ.get(env)`).
- Builds a summary (one line per skill: name + description + availability) that always lives in the system prompt.
- Loads the full body on demand when the model invokes the skill.
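A minimal sketch of that loader split (naive frontmatter parsing to stay dependency-free; real code would use a YAML parser and also check the `bins`/`env` requirements):

```python
from pathlib import Path

def parse_frontmatter(text: str) -> dict:
    # naive flat "key: value" parser; a real loader would use YAML
    if not text.startswith("---"):
        return {}
    block = text.split("---", 2)[1]
    return {key.strip(): val.strip()
            for key, _, val in (line.partition(":") for line in block.splitlines())
            if key.strip() and val.strip()}

def skill_summaries(skills_dir: Path) -> str:
    """The cheap catalog that is always in the system prompt."""
    lines = []
    for skill_md in sorted(skills_dir.glob("*/SKILL.md")):
        meta = parse_frontmatter(skill_md.read_text(encoding="utf-8"))
        name = meta.get("name", skill_md.parent.name)
        lines.append(f"- {name}: {meta.get('description', '')}")
    return "Available skills:\n" + "\n".join(lines)

def load_skill_body(skills_dir: Path, name: str) -> str:
    """The full body, loaded only when the model invokes the skill."""
    text = (skills_dir / name / "SKILL.md").read_text(encoding="utf-8")
    return text.split("---", 2)[2] if text.startswith("---") else text
```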
That progressive-disclosure pattern (summary always in the prompt; body on demand) is the entire reason 50+ skills don't blow the context window. Steal it.

## 15. 🧱 Context assembly

`ContextBuilder.build_system_prompt()` concatenates:

1. Identity block (workspace path, OS, Python version, current time)
2. `AGENTS.md`, `SOUL.md`, `USER.md`, `TOOLS.md` (if present)
3. Memory context from `MemoryStore.get_memory_context()`
4. Always-active skill bodies
5. Skill summaries (the lightweight catalog)
6. Recent history (last 50 entries, hard-capped at 32k chars)

`build_messages()` then prepends a runtime context block (current channel, chat_id, time, session summaries) right before the user's current turn. It also merges consecutive same-role messages so providers like Anthropic don't reject the request. The two constants `_MAX_RECENT_HISTORY = 50` and `_MAX_HISTORY_CHARS = 32_000` are the only magic numbers; everything else is dynamic.

## 16. 💾 Sessions, checkpoints, and graceful interruption

A session is one JSON file keyed by `(channel, chat_id)` (or an explicit override). It holds history + metadata. Before each turn the loop saves a runtime checkpoint containing intermediate tool messages; on `/stop` or a crash, the next turn restores those messages so a half-finished tool sequence isn't lost.

An `ask_user` tool call sets a different kind of pending state: the next user message is routed as the result of the `ask_user` call rather than as a new user turn.

This combination (checkpoint + `ask_user` pending + pending queue + per-session lock) is what makes the agent feel "alive across restarts" without a real state machine.

## 17. 🔐 Security model

Five layers, each minimal:

1. **allowFrom whitelist per channel**: an empty list denies everyone (fail-closed).
2. **tools.restrictToWorkspace**: every filesystem tool has a path-prefix check; an absolute path outside the workspace is rejected.
3. **tools.exec.sandbox = "bwrap"**: Linux bubblewrap wrapper around shell; user namespace + read-only mounts.
4. **SSRF guard for web.fetch**: configurable private-range whitelist; rejects `169.254.*`, `10.*`, etc. by default.
5. **Secrets via env**: config supports `${VAR}` interpolation; for systemd, `EnvironmentFile=` keeps them out of plaintext on disk.

For your own build: even if you skip bwrap, do implement #1, #2, and #5 from day one. They cost almost nothing and cover the realistic attack surface.

## 18. ⚙️ Configuration model

A single JSON file at `~/.nanobot/config.json`. The top-level blocks worth understanding:

```json
{
  "providers": { "openai": { "apiKey": "${OPENAI_API_KEY}" } },
  "agent": {
    "model": "gpt-5",
    "provider": "openai",
    "timezone": "America/Los_Angeles",
    "idleCompactAfterMinutes": 30,
    "unifiedSession": true,
    "disabledSkills": [],
    "contextWindowTokens": 128000
  },
  "channels": {
    "telegram": { "enabled": true, "token": "${TG_TOKEN}", "allowFrom": ["12345"] },
    "discord": { "enabled": false }
  },
  "tools": {
    "restrictToWorkspace": true,
    "exec": { "enable": true, "sandbox": "bwrap" }
  },
  "search": { "provider": "duckduckgo", "maxResults": 5 },
  "mcp": {
    "filesystem": { "command": "npx", "args": ["@mcp/filesystem", "/work"] }
  }
}
```

Design rule: **the registry is the single source of truth.** The provider/channel registries declare fields and defaults; config validation just checks against the spec. No bespoke per-provider parsing code.

## 19. 🚀 Deployment paths

Three pre-baked deployment recipes, each ~30 lines:

- **Docker / docker-compose**: non-root `nanobot` user (UID 1000), bwrap pre-installed; mount `~/.nanobot` and a `workspace/` dir. The gateway exposes port 18790.
- **systemd user service**: `Restart=always`, `ProtectSystem=strict`, `NoNewPrivileges=yes`. Use `loginctl enable-linger` so it survives logout.
- **macOS LaunchAgent**: plist with `RunAtLoad=true`, `launchctl bootstrap gui/$(id -u) ...`.

Three CLI entry points:

- `nanobot onboard`: interactive setup; writes `~/.nanobot/config.json`.
- `nanobot agent`: single-process CLI chat (development).
- `nanobot gateway`: long-running daemon serving channels + HTTP API + WebUI.

## 20. 🗺️ Step-by-step blueprint to build your own

If your goal is "build a similar one", here's the smallest path to feature parity with the core. Each step is doable in one sitting (a runnable milestone-1 skeleton follows the list).

1. **Bus + CLI channel.** `bus/events.py`, `bus/queue.py`: `InboundMessage`, `OutboundMessage`, `MessageBus` (asyncio.Queue × 2). `channels/base.py`: abstract start/stop/send + the `_handle_message` helper. `channels/cli.py`: read stdin, write stdout. Done.
2. **Provider layer.** `providers/base.py`: `LLMProvider` ABC + `LLMResponse` dataclass + `chat_with_retry`. `providers/openai_compat.py`: one implementation that covers OpenAI, DeepSeek, Qwen, Groq, OpenRouter, vLLM, LM Studio, Ollama (they all speak OpenAI's wire format). Add a `ProviderSpec` registry for keywords/env keys.
3. **Tools.** `tools/base.py`: `Tool` ABC, `to_schema()`, `validate_params()`, `read_only`/`concurrency_safe`/`exclusive` flags. `tools/registry.py`: register/unregister/list/`to_schemas()` and partitioning into batches. One built-in tool: `read_file`. Test that the LLM calls it.
4. **Runner.** `agent/runner.py`: implement the `for iteration in range(max_iterations):` loop. Call the provider, dispatch tools (sequential first, parallel later), append messages, return stop reasons. Add the three message-list hygiene helpers: `drop_orphan_tool_results`, `backfill_missing_tool_results`, `microcompact`.
5. **Loop.** `agent/loop.py`: `run()` reads the bus → per-session lock → `_dispatch` → `_process_message` → `runner.run` → publish outbound. Pending queue + mid-turn injection. Save/load session JSON.
6. **Context.** `agent/context.py`: `build_system_prompt` (identity, files, history) + `build_messages` (merge same-role). Constants `_MAX_RECENT_HISTORY=50`, `_MAX_HISTORY_CHARS=32_000`.
7. **First real channel.** Pick Telegram (cleanest API). Long-poll → `_handle_message`; `send()` posts back. Done.
8. **Memory.** File-based: `MEMORY.md`, `SOUL.md`, `USER.md`, `history.jsonl`. Inject `MEMORY.md` into the system prompt; append turn summaries to `history.jsonl`. (Skip dream/git for v1; add them later.)
9. **Consolidation.** Estimate prompt tokens (use tiktoken or provider token counts). When over budget, summarize the oldest chunk via the LLM and replace it with the summary in `history.jsonl`.
10. **Skills + MCP.** `SkillsLoader`: scan `workspace/skills/`, parse frontmatter, build the summary. MCP wrapper: connect via stdio, `list_tools`, register as `Tool` subclasses with a name prefix.
11. **Slash commands.** Trivial dispatcher on the first token.
12. **ask_user.** A tool that returns a sentinel; the loop emits buttons in `OutboundMessage`; the next inbound message is treated as the tool result.
13. **Sub-agents + cron.** `subagent.spawn(task)` creates an asyncio task with a reduced `ToolRegistry`; the result is emitted as a system `InboundMessage`. Cron writes synthetic `InboundMessage`s on schedule.
14. **Security.** allowFrom whitelist per channel (fail-closed). Workspace path check in every filesystem tool. SSRF guard in `web.fetch`. Env var interpolation in config.

That's the whole thing: ~2500-3500 LOC if you stay disciplined. The moment you reach for a "framework abstraction", stop and ask "would the nanobot author write this?"
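To make milestone 1 concrete, here is a runnable skeleton of the bus + a CLI "channel" + a stub loop; the echo reply stands in for the provider call you'd add in milestone 2. The class names mirror this article; everything else is illustrative.

```python
import asyncio
from dataclasses import dataclass

# Milestone 1, runnable as-is: two queues, a CLI "channel", a stub agent loop.

@dataclass
class InboundMessage:
    channel: str
    sender_id: str
    chat_id: str
    content: str

@dataclass
class OutboundMessage:
    channel: str
    chat_id: str
    content: str

class MessageBus:
    def __init__(self):
        self.inbound: asyncio.Queue = asyncio.Queue()
        self.outbound: asyncio.Queue = asyncio.Queue()

async def cli_channel(bus: MessageBus) -> None:
    loop = asyncio.get_running_loop()
    while True:
        text = await loop.run_in_executor(None, input, "> ")  # stdin without blocking the loop
        await bus.inbound.put(InboundMessage("cli", "user", "cli", text))
        out = await bus.outbound.get()
        print(out.content)

async def agent_loop(bus: MessageBus) -> None:
    while True:
        msg = await bus.inbound.get()
        reply = f"echo: {msg.content}"   # milestone 2: provider.chat(...) goes here
        await bus.outbound.put(OutboundMessage(msg.channel, msg.chat_id, reply))

async def main() -> None:
    bus = MessageBus()
    await asyncio.gather(cli_channel(bus), agent_loop(bus))

if __name__ == "__main__":
    asyncio.run(main())
```

Every later milestone (locks, pending queues, real providers, real channels) slots into this shape without changing the seam.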
## 21. 💡 Lessons & non-obvious wins

A short list of things that look small but pay enormous dividends. Copy these even if you copy nothing else.

- **The bus is the universal IPC.** Cron, sub-agents, and inter-agent messages all become `InboundMessage(channel="system", …)`. One queue → one reader → one mental model.
- **Per-session lock + pending queue** is the single trick that makes mid-turn follow-ups feel natural. You will be tempted to use a state machine. Don't.
- **Stop reasons are a real ABI.** Treat them like HTTP status codes; never let them leak as exceptions.
- **Files + git beat a vector DB** for an agent's memory of itself: diffable, recoverable, human-editable, free.
- **Progressive skill loading** (catalog summaries always; bodies on demand) keeps context cheap as your skill library grows.
- **Three concurrency flags** (`read_only`, `concurrency_safe`, `exclusive`) on tools are enough to parallelize safely. You don't need a real scheduler.
- **Rich LLMResponse error fields** (`error_status_code`, `error_should_retry`, `retry_after`) turn provider error handling from N×provider matrices into a single retry policy.
- **One JSON config, registry-validated.** Don't write per-provider parsers; add a `ProviderSpec` row and you're done.
- **allowFrom empty = deny all** is the right default. Many "personal" agents accidentally ship open.
- **Session checkpoint on every iteration** is cheap, and it makes `/stop` and crashes feel free instead of catastrophic.

## 22. 🔮 Where it can go next

The roadmap targets multi-modal I/O, long-term memory beyond `MEMORY.md`, multi-step planning, calendar-class integrations, and self-improvement loops. The interesting community direction is the Native Agent Swarm proposal (#1495): moving from asyncio-task sub-agents to true process-isolated parallel agents with message-passing, while keeping the same bus contract. If you're building on top, picking a process boundary that matches that proposal will let you ride future upstream work.

## 📚 Sources

- HKUDS/nanobot (main repo): `README.md`, `agent/loop.py`, `agent/runner.py`, `agent/context.py`, `agent/memory.py`, `agent/skills.py`, `agent/subagent.py`, `agent/tools/base.py`, `agent/tools/mcp.py`, `bus/events.py`, `bus/queue.py`, `channels/base.py`, `providers/base.py`
- nanobot docs: `docs/configuration.md`, `docs/chat-apps.md`, `docs/deployment.md`
- Native Agent Swarm Architecture RFC (#1495)
- nanobot Roadmap discussion (#431)
- nanobot.wiki
- nanobot-study (3-day learning plan)

If you found this helpful, let me know by leaving a 👍 or a comment, and if you think this post could help someone, feel free to share it! Thank you very much! 😃