The Architect Agent comes with an integrated WebSocket API. The API is used by Architect Manager internally and may be accessed by you through custom scripts.
To access the API, you need to create a WebSocket connection to your Agent by connecting through `ws` or `wss` to your Agents public ip, on the configured listening port.
Whether you need to connect to `ws` or `wss` (SSL) depends on your Agents SSL setting. By default, SSL is turned on with self signed certificates.
To programmatically authenticate against an Agent, you need to initiate the connection with a header
Sec-Websocket-Protocol: SMP1.0, {username}, {password}
Any connection, without providing the authentication header will be blocked.
Once connected, commands can be sent as JSON formatted payload. It is following this struct pattern:
{
action: <str>
parameters: <map>
}
You are able to retrieve all commands supported by your Agent version by sending the following
{
action: "list_actions",
parameters: {}
}
You will receive a list of supported actions, their required parameters and data types.
Examples
Python 3
#!/usr/bin/env python3
"""
This is a minimal class to interface with the Architect Agent programmatically.
"""
from __future__ import annotations
import re
import ssl
import json
import uuid
import types
import asyncio
from typing import Callable, Awaitable, Any, Dict, Optional
import websockets
from websockets import WebSocketClientProtocol
class CFToolsArchitectAgentClient:
def __init__(
self,
host: str,
port: int,
*,
use_tls: bool,
username: str,
password: str,
on_event: Optional[Callable[[dict], Awaitable[None]]] = None,
on_connected: Optional[Callable[[dict], Awaitable[None]]] = None,
) -> None:
self._host = host
self._port = port
self._use_tls = use_tls
self._username = username
self._password = password
self._on_event = on_event or (lambda _: asyncio.create_task(asyncio.sleep(0)))
self._on_connected = on_connected
proto = "wss" if self._use_tls else "ws"
self._uri = f"{proto}://{self._host}:{self._port}/ws"
self._subprotocols = ["SMP1.0", self._username, self._password]
self._ws: WebSocketClientProtocol | None = None
self._receiver_task: asyncio.Task[None] | None = None
self._pending: Dict[str, asyncio.Future[dict]] = dict()
self._actions: list = list()
self._populated_actions: bool = False
async def connect(self) -> None:
"""Establish the WebSocket and launch the passive receiver."""
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
self._ws = await websockets.connect(
self._uri,
ssl=ssl_ctx,
subprotocols=self._subprotocols,
)
print(f"Connected to {self._uri} (sub-protocol: {self._ws.subprotocol!r})")
# Start the background listener
self._receiver_task = asyncio.create_task(self._receiver())
async def close(self) -> None:
if self._receiver_task:
self._receiver_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._receiver_task
if self._ws:
await self._ws.close()
print("Connection closed")
async def send_request(
self, payload: dict, *, timeout: float | None = 10.0
) -> dict:
"""
Fire-and-await request/response conversation.
- Adds an `idempotence` token.
- Returns the first message that echoes that token.
"""
if not self._ws:
raise RuntimeError("Not connected")
token = str(uuid.uuid4()).replace('-', '')
outgoing = dict(payload, idempotence=token)
fut: asyncio.Future[dict] = asyncio.get_running_loop().create_future()
self._pending[token] = fut
await self._ws.send(json.dumps(outgoing))
try:
return await asyncio.wait_for(fut, timeout=timeout)
finally:
# Clean up even on timeout/cancel
self._pending.pop(token, None)
async def _receiver(self) -> None:
"""Runs forever, routing every incoming frame."""
assert self._ws # no-connect guard
asyncio.create_task(self._populate_actions())
try:
async for raw in self._ws:
try:
msg = json.loads(raw)
except json.JSONDecodeError:
continue # ignore non-JSON frames
token = msg.get("idempotence")
if token and token in self._pending:
# Wake the awaiting send_request()
self._pending[token].set_result(msg)
else:
# Unsolicited event → callback
asyncio.create_task(self._on_event(msg))
except websockets.ConnectionClosedOK:
pass
except Exception as exc:
# Cancel all waiters on fatal connection error
for fut in self._pending.values():
if not fut.done():
fut.set_exception(exc)
raise
async def _populate_actions(self) -> dict:
await asyncio.sleep(1)
catalogue: dict = await self.list_actions()
def _make_stub(action: str, meta: dict, py_name: str):
"""Factory: returns a bound coroutine for an action."""
async def _stub(self, **params):
required = meta.get("required_parameters", [])
if required is None:
required = []
missing = [p for p in required if p not in params]
if missing:
raise TypeError(
f"Missing {missing} for action '{action}'"
)
payload = {"action": action, "parameters": params}
return await self.send_request(payload)
_stub.__name__ = py_name
_stub.__doc__ = (
f"Dynamically generated wrapper for remote action '{action}'.\n"
f"Remote description: {meta.get('description', 'n/a')}"
)
return _stub
for action_name, meta in catalogue.items():
if meta.get("cloud_controller_only"):
continue
py_name = action_name
stub = _make_stub(action_name, meta, py_name)
setattr(self, py_name, types.MethodType(stub, self))
self._actions.append(action_name)
self._populated_actions = True
if self._on_connected:
asyncio.create_task(self._on_connected())
async def list_actions(self) -> dict:
"""Utility wrapper around the `list_actions` request."""
reply = await self.send_request({"action": "list_actions", "parameters": {}})
return reply["parameters"]["actions"]
async def main() -> None:
client: CFToolsArchitectAgentClient = None
async def handle_event(event: dict) -> None:
print("event:", event)
async def handle_connected() -> None:
print('Connected.')
print(await client.servers())
HOST = "your ip / hostname"
PORT = 8090
USE_TLS = True
USERNAME = "username"
PASSWORD = "password"
client = CFToolsArchitectAgentClient(
host = HOST,
port = PORT,
use_tls = USE_TLS,
username = USERNAME,
password = PASSWORD,
on_event=handle_event,
on_connected=handle_connected
)
await client.connect()
# Keep running; Ctrl-C to exit
try:
while True:
await asyncio.sleep(3600)
finally:
await client.close()
if __name__ == "__main__":
import contextlib
try:
asyncio.run(main())
except KeyboardInterrupt:
pass