Diode WebSocket API Python Example (subscribe_channel)

This is an example python script demonstrating how to use the Diode API with websockets:

import asyncio
import json
import os
import ssl
import sys
import websockets
import websockets.exceptions

bearer_token = "REPLACE"
zone_id = "REPLACE"
channel_id = "REPLACE"
device_id = "REPLACE"
diode_url = f"https://{device_id}.diode.link/api/json_rpc"
diode_wss_uri = f"wss://{device_id}.diode.link/api/json_rpc/ws"
method_send_msg = "send_message"
method_auth = "authenticate"
method_subscribe = "subscribe_channel"
method = method_send_msg


def _ssl_context():
    """SSL context for wss: use certifi CA bundle if available; set DIODE_VERIFY_SSL=0 to disable verification."""
    verify = os.environ.get("DIODE_VERIFY_SSL", "1").strip().lower() not in ("0", "false", "no")
    if verify:
        ctx = ssl.create_default_context()
        try:
            import certifi
            ctx.load_verify_locations(certifi.where())
        except ImportError:
            pass
        return ctx
    return ssl._create_unverified_context()


def _open_timeout():
    """Seconds to wait for opening handshake. Set DIODE_OPEN_TIMEOUT (default 60)."""
    try:
        t = int(os.environ.get("DIODE_OPEN_TIMEOUT", "60").strip())
        return max(5, min(300, t))
    except ValueError:
        return 60


def base_payload():
    return {"jsonrpc": "2.0", "id": 1}


def handle_exception(e):
    if isinstance(e, websockets.exceptions.WebSocketException):
        print(f"WebSocket error: {e}")
    elif isinstance(e, websockets.exceptions.ConnectionClosedError):
        print(f"Conn closed unexpectedly: {e}")
    elif isinstance(e, json.JSONDecodeError):
        print(f"JSON decoding err: {e}")
    elif isinstance(e, asyncio.TimeoutError):
        print(f"Timed out request: {e}")
    else:
        print(f"An unexpected error occurred: {e}")


async def send_auth(websocket):
    # API v3.2 requires params as a JSON object (not array)
    params = {"token": bearer_token}
    payload = base_payload()
    payload["method"] = method_auth
    payload["params"] = params
    message = json.dumps(payload)
    await websocket.send(message)
    response = await websocket.recv()
    return json.loads(response)


async def subscribe_channel(websocket):
    # API v3.2 requires params as a JSON object (not array)
    params = {"zone_id": zone_id, "channel_id": channel_id}
    payload = base_payload()
    payload["method"] = method_subscribe
    payload["params"] = params
    message = json.dumps(payload)
    await websocket.send(message)
    response = await websocket.recv()
    return json.loads(response)


async def send_message(websocket, text):
    # API v3.2 requires params as a JSON object; key is message_text not message
    params = {"zone_id": zone_id, "channel_id": channel_id, "message_text": text}
    payload = {"jsonrpc": "2.0", "id": 2, "method": method_send_msg, "params": params}
    await websocket.send(json.dumps(payload))
    response = await websocket.recv()
    return json.loads(response)


# message receiver
async def receive_messages():
    try:
        print(diode_wss_uri)
        ssl_ctx = _ssl_context()
        open_timeout = _open_timeout()
        print(f"open_timeout={open_timeout}s")
        async with websockets.connect(diode_wss_uri, ssl=ssl_ctx, open_timeout=open_timeout) as websocket:
            try:
                # initial authentication
                resp = await send_auth(websocket)
                print(resp)

                # subscribe to the diode channel
                resp = await subscribe_channel(websocket)
                print(resp)

                # send a hello message
                resp = await send_message(websocket, "hello")
                print("send_message:", resp)

            except Exception as e:
                handle_exception(e)
                sys.exit(1)

            while True:
                message = await websocket.recv()
                print(f"Received message: {message}")

    except KeyboardInterrupt:
        print("Program exit. Interrupted by user")
    except Exception as e:
        print(f"An error occurred upon connection attempt: {e}")


# main loop
asyncio.run(receive_messages())