Basics

Agent API basics
Written by Philipp
Updated 1 week ago
Advanced topic

The Architect Agent comes with an integrated WebSocket API. The API is used by Architect Manager internally and may be accessed by you through custom scripts.

To access the API, open a WebSocket connection to your Agent by connecting through `ws` or `wss` to your Agent's public IP address on the configured listening port.

Whether you need to connect through `ws` or `wss` (SSL) depends on your Agent's SSL setting. By default, SSL is turned on with self-signed certificates.

Connections through common WebSocket libraries require you to explicitly accept self-signed certificates.

To programmatically authenticate against an Agent, you need to initiate the connection with the following header:

Sec-Websocket-Protocol: SMP1.0, {username}, {password}

Any connection that does not provide the authentication header will be blocked.

Once connected, commands can be sent as JSON-formatted payloads. Each payload follows this structure:

{
   action: <str>
   parameters: <map>
}

You can retrieve all commands supported by your Agent version by sending the following payload:

{
   "action": "list_actions",
   "parameters": {}
}

You will receive a list of supported actions, their required parameters and data types.

Examples

Python 3

#!/usr/bin/env python3
"""
This is a minimal class to interface with the Architect Agent programmatically.
"""
from __future__ import annotations

import re
import ssl
import json
import uuid
import types
import asyncio
import contextlib
from typing import Callable, Awaitable, Any, Dict, Optional

import websockets
from websockets import WebSocketClientProtocol


class CFToolsArchitectAgentClient:
	"""
	Minimal asyncio client for the Architect Agent WebSocket API.

	The client authenticates through the WebSocket sub-protocol list
	(`SMP1.0`, username, password), sends JSON requests tagged with an
	`idempotence` token and matches replies to requests by that token.
	Frames that do not answer a pending request are handed to `on_event`.

	After connecting, the action catalogue is fetched via `list_actions`
	and one coroutine method per action is attached to the instance.
	"""

	def __init__(
		self,
		host: str,
		port: int,
		*,
		use_tls: bool,
		username: str,
		password: str,
		on_event: Optional[Callable[[dict], Awaitable[None]]] = None,
		on_connected: Optional[Callable[[], Awaitable[None]]] = None,
	) -> None:
		"""
		Store connection settings; no network I/O happens here.

		:param host: Hostname or IP address of the Agent.
		:param port: Listening port of the Agent.
		:param use_tls: Connect through `wss` when true, `ws` otherwise.
		:param username: Username sent in the sub-protocol list.
		:param password: Password sent in the sub-protocol list.
		:param on_event: Async callback receiving every frame that is not
			a reply to a pending request. Defaults to a no-op.
		:param on_connected: Async callback, called without arguments once
			the action stubs have been attached to the instance.
		"""
		self._host = host
		self._port = port
		self._use_tls = use_tls
		self._username = username
		self._password = password
		# The default must be a coroutine function: its result is scheduled
		# as a task by the receiver.
		self._on_event = on_event or self._ignore_event
		self._on_connected = on_connected

		proto = "wss" if self._use_tls else "ws"
		self._uri = f"{proto}://{self._host}:{self._port}/ws"
		self._subprotocols = ["SMP1.0", self._username, self._password]

		self._ws: WebSocketClientProtocol | None = None
		self._receiver_task: asyncio.Task[None] | None = None
		# idempotence token -> future resolved with the matching reply
		self._pending: Dict[str, asyncio.Future[dict]] = dict()
		# Strong references to fire-and-forget tasks; the event loop itself
		# only keeps weak references to tasks.
		self._background_tasks: set = set()

		self._actions: list = list()
		self._populated_actions: bool = False

	@staticmethod
	async def _ignore_event(_event: dict) -> None:
		"""Default `on_event` handler: discard the event."""

	def _spawn(self, awaitable: Awaitable[Any]) -> asyncio.Future:
		"""Schedule `awaitable` in the background and keep it referenced until done."""
		task = asyncio.ensure_future(awaitable)
		self._background_tasks.add(task)
		task.add_done_callback(self._background_tasks.discard)
		return task

	def _fail_pending(self, exc: BaseException) -> None:
		"""Wake every request that is still waiting by raising `exc` in it."""
		for fut in list(self._pending.values()):
			if not fut.done():
				fut.set_exception(exc)

	async def connect(self) -> None:
		"""Establish the WebSocket and launch the passive receiver."""
		connect_kwargs: Dict[str, Any] = {"subprotocols": self._subprotocols}
		if self._use_tls:
			# Agents use self-signed certificates by default, so certificate
			# and hostname verification are disabled.
			ssl_ctx = ssl.create_default_context()
			ssl_ctx.check_hostname = False
			ssl_ctx.verify_mode = ssl.CERT_NONE
			# Only pass `ssl` for wss:// - websockets rejects an SSL
			# context on a plain ws:// URI.
			connect_kwargs["ssl"] = ssl_ctx

		self._ws = await websockets.connect(self._uri, **connect_kwargs)
		print(f"Connected to {self._uri} (sub-protocol: {self._ws.subprotocol!r})")

		# Start the background listener
		self._receiver_task = asyncio.create_task(self._receiver())

	async def close(self) -> None:
		"""Stop the receiver task and close the WebSocket."""
		try:
			if self._receiver_task:
				self._receiver_task.cancel()
				with contextlib.suppress(asyncio.CancelledError):
					await self._receiver_task
		finally:
			# Close the socket even if the receiver had died with an error.
			if self._ws:
				await self._ws.close()

		print("Connection closed")

	async def send_request(
		self, payload: dict, *, timeout: Optional[float] = 10.0
	) -> dict:
		"""
		Fire-and-await request/response conversation.

		- Adds an `idempotence` token.
		- Returns the first message that echoes that token.

		:param payload: Request body, e.g. `{"action": ..., "parameters": ...}`.
		:param timeout: Seconds to wait for the reply; `None` waits forever.
		:raises RuntimeError: If `connect()` has not been called.
		:raises asyncio.TimeoutError: If no reply arrives within `timeout`.
		"""

		if not self._ws:
			raise RuntimeError("Not connected")

		token = uuid.uuid4().hex
		outgoing = dict(payload, idempotence=token)

		fut: asyncio.Future[dict] = asyncio.get_running_loop().create_future()
		# Register before sending so a fast reply cannot be missed.
		self._pending[token] = fut

		try:
			await self._ws.send(json.dumps(outgoing))
			return await asyncio.wait_for(fut, timeout=timeout)

		finally:
			# Clean up even on send failure/timeout/cancel
			self._pending.pop(token, None)

	async def _receiver(self) -> None:
		"""Runs until the connection ends, routing every incoming frame."""
		if not self._ws:
			raise RuntimeError("Not connected")
		self._spawn(self._populate_actions())

		try:
			async for raw in self._ws:
				try:
					msg = json.loads(raw)
				except ValueError:
					# Covers JSONDecodeError and undecodable binary frames.
					continue  # ignore non-JSON frames

				if not isinstance(msg, dict):
					continue  # requests and events are always JSON objects

				token = msg.get("idempotence")
				waiter = self._pending.get(token) if isinstance(token, str) else None
				if waiter is not None:
					# Wake the awaiting send_request(); a waiter that already
					# timed out or was cancelled is left alone.
					if not waiter.done():
						waiter.set_result(msg)

				else:
					# Unsolicited event -> callback
					self._spawn(self._on_event(msg))

		except websockets.ConnectionClosedOK:
			pass

		except Exception as exc:
			# Fail all waiters on fatal connection error
			self._fail_pending(exc)
			raise

		# Clean shutdown: no reply can arrive any more, so release the
		# waiters instead of letting them run into their timeout.
		self._fail_pending(ConnectionError("Connection closed"))

	async def _populate_actions(self) -> None:
		"""
		Fetch the action catalogue and attach one coroutine method per action.

		Actions flagged `cloud_controller_only` are skipped. Afterwards
		`on_connected` (if any) is scheduled.
		"""
		# NOTE(review): presumably gives the Agent time to finish its
		# handshake before the first request - confirm whether it is needed.
		await asyncio.sleep(1)
		catalogue: dict = await self.list_actions()

		def _make_stub(action: str, meta: dict, py_name: str):
			"""Factory: returns an unbound coroutine function for an action."""
			async def _stub(self, **params):
				required = meta.get("required_parameters") or []

				missing = [p for p in required if p not in params]
				if missing:
					raise TypeError(
						f"Missing {missing} for action '{action}'"
					)

				payload = {"action": action, "parameters": params}
				return await self.send_request(payload)

			_stub.__name__ = py_name
			_stub.__doc__ = (
				f"Dynamically generated wrapper for remote action '{action}'.\n"
				f"Remote description: {meta.get('description', 'n/a')}"
			)
			return _stub

		for action_name, meta in catalogue.items():
			if meta.get("cloud_controller_only"):
				continue

			py_name = action_name

			stub = _make_stub(action_name, meta, py_name)
			setattr(self, py_name, types.MethodType(stub, self))
			self._actions.append(action_name)

		self._populated_actions = True
		if self._on_connected:
			self._spawn(self._on_connected())

	async def list_actions(self) -> dict:
		"""Utility wrapper around the `list_actions` request."""
		reply = await self.send_request({"action": "list_actions", "parameters": {}})
		return reply["parameters"]["actions"]


async def main() -> None:
	"""
	Example entry point: connect to an Agent, print every event, and print
	the result of the `servers` action once the client reports it is ready.
	"""
	# Bound before the callbacks are defined; they only read it after the
	# client has been created below.
	client: CFToolsArchitectAgentClient = None

	async def handle_event(event: dict) -> None:
		# Called for every frame that is not a reply to a request.
		print("event:", event)

	async def handle_connected() -> None:
		# `servers` is one of the methods generated from the action catalogue.
		print('Connected.')
		server_reply = await client.servers()
		print(server_reply)

	# Connection settings - replace the placeholders with real values.
	settings = {
		"host": "your ip / hostname",
		"port": 8090,
		"use_tls": True,
		"username": "username",
		"password": "password",
	}

	client = CFToolsArchitectAgentClient(
		**settings,
		on_event=handle_event,
		on_connected=handle_connected,
	)
	await client.connect()

	# Keep running; Ctrl-C to exit
	try:
		while True:
			await asyncio.sleep(3600)
	finally:
		await client.close()


if __name__ == "__main__":
	# Bound at module scope on purpose: CFToolsArchitectAgentClient.close()
	# looks up `contextlib` as a module global.
	import contextlib

	# Ctrl-C ends the program quietly instead of printing a traceback.
	with contextlib.suppress(KeyboardInterrupt):
		asyncio.run(main())

Did this answer your question?