Home Assistant Components

This section documents the backend pieces that power the RaspyRFM Home Assistant integration. Each subsection links to the relevant source files and explains how the pieces collaborate to monitor radio traffic, learn payloads, and expose entities inside Home Assistant.

Configuration flow and setup

RaspyRFM is installed as a config entry. The integration registers a config flow that resolves the gateway host name and persists the chosen UDP port. Once an entry is created, async_setup_entry instantiates the hub, forwards platform setups, and makes the management panel, websocket commands, and the raspyrfm.send_action service available so payloads can be replayed from automations.

"""Config flow for the RaspyRFM integration."""

from __future__ import annotations

import socket
from typing import Any, Dict

import voluptuous as vol

from homeassistant import config_entries
from homeassistant.core import HomeAssistant

from .const import CONF_HOST, CONF_PORT, DEFAULT_PORT, DOMAIN


async def _validate_input(hass: HomeAssistant, data: Dict[str, Any]) -> Dict[str, Any]:
    host = data[CONF_HOST]
    port = data[CONF_PORT]

    def _resolve() -> str:
        return socket.gethostbyname(host)

    await hass.async_add_executor_job(_resolve)
    return {"title": f"RaspyRFM ({host})", "host": host, "port": port}


class RaspyRFMConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
    """Handle a config flow for RaspyRFM."""

    VERSION = 1

    async def async_step_user(self, user_input: Dict[str, Any] | None = None):
        errors: Dict[str, str] = {}

        if user_input is not None:
            try:
                info = await _validate_input(self.hass, user_input)
            except OSError:
                errors["base"] = "cannot_connect"
            else:
                await self.async_set_unique_id(user_input[CONF_HOST])
                self._abort_if_unique_id_configured()
                return self.async_create_entry(title=info["title"], data=user_input)

        schema = vol.Schema({
            vol.Required(CONF_HOST): str,
            vol.Optional(CONF_PORT, default=DEFAULT_PORT): int,
        })
        return self.async_show_form(step_id="user", data_schema=schema, errors=errors)

    async def async_step_import(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """Handle YAML import."""

        return await self.async_step_user(user_input)

    async def async_get_options_flow(self, entry: config_entries.ConfigEntry):
        return RaspyRFMOptionsFlow(entry)


class RaspyRFMOptionsFlow(config_entries.OptionsFlow):
    """Handle options flow for RaspyRFM."""

    def __init__(self, entry: config_entries.ConfigEntry) -> None:
        self._entry = entry

    async def async_step_init(self, user_input: Dict[str, Any] | None = None):
        errors: Dict[str, str] = {}

        if user_input is not None:
            new_data = {
                CONF_HOST: user_input[CONF_HOST],
                CONF_PORT: user_input[CONF_PORT],
            }
            self.hass.config_entries.async_update_entry(self._entry, data=new_data)
            return self.async_create_entry(title="", data={})

        schema = vol.Schema({
            vol.Required(CONF_HOST, default=self._entry.data.get(CONF_HOST)): str,
            vol.Required(CONF_PORT, default=self._entry.data.get(CONF_PORT, DEFAULT_PORT)): int,
        })
        return self.async_show_form(step_id="init", data_schema=schema, errors=errors)
"""Home Assistant integration for the RaspyRFM gateway."""

from __future__ import annotations

import logging

import voluptuous as vol

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv

from .const import (
    ATTR_ACTION,
    ATTR_DEVICE_ID,
    DATA_SERVICE_REGISTERED,
    DOMAIN,
    PLATFORMS,
    SERVICE_SEND_ACTION,
)
from .hub import RaspyRFMHub
from .panel import async_register_panel, async_unregister_panel
from .websocket import async_register_websocket_handlers

_LOGGER = logging.getLogger(__name__)

SEND_ACTION_SCHEMA = vol.Schema({
    vol.Required(ATTR_DEVICE_ID): cv.string,
    vol.Required(ATTR_ACTION): cv.string,
})


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up RaspyRFM from a config entry."""
    hub = RaspyRFMHub(hass, entry)

    try:
        await hub.async_setup()
    except OSError as err:
        raise ConfigEntryNotReady("Unable to initialise RaspyRFM hub") from err

    hass.data.setdefault(DOMAIN, {})[entry.entry_id] = hub

    if not hass.data[DOMAIN].get(DATA_SERVICE_REGISTERED):

        async def _async_handle_send_action(call) -> None:
            device_id: str = call.data[ATTR_DEVICE_ID]
            action: str = call.data[ATTR_ACTION]
            for key, candidate in hass.data[DOMAIN].items():
                if key.startswith("_"):
                    continue
                if candidate.storage.get_device(device_id):

Gateway and hub orchestration

The RaspyRFMGateway wraps the UDP socket used by the hardware bridge. The hub owns a gateway instance and coordinates persistent storage, signal learning, and entity updates via Home Assistant dispatcher signals.

"""Helpers for interacting with the RaspyRFM UDP gateway."""

from __future__ import annotations
import logging
import socket

from homeassistant.core import HomeAssistant

_LOGGER = logging.getLogger(__name__)


class RaspyRFMGateway:
    """Abstraction around the UDP based RaspyRFM gateway."""

    def __init__(self, hass: HomeAssistant, host: str, port: int) -> None:
        self._hass = hass
        self._host = host
        self._port = port

    @property
    def host(self) -> str:
        """Return the configured host."""

        return self._host

    @property
    def port(self) -> int:
        """Return the configured port."""

        return self._port

    async def async_update(self, host: str, port: int) -> None:
        """Update connection details."""

        self._host = host
        self._port = port

    async def async_send_raw(self, payload: str) -> None:
        """Send a raw payload to the gateway via UDP."""

        if not self._host:
            raise OSError("Gateway host not configured")

        def _send() -> None:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.settimeout(2)
                sock.sendto(payload.encode("utf-8"), (self._host, self._port))

        await self._hass.async_add_executor_job(_send)

    async def async_ping(self) -> None:
        """Attempt to contact the gateway."""

        await self.async_send_raw("PING")
"""Core hub object for the RaspyRFM integration."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import datetime
import logging
import socket
import uuid
from typing import Any, Dict, Iterable, List, Optional

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.dispatcher import async_dispatcher_send

from .const import (
    CONF_HOST,
    CONF_PORT,
    DEFAULT_PORT,
    SIGNAL_DEVICE_REGISTRY_UPDATED,
    SIGNAL_DEVICE_REMOVED,
    SIGNAL_LEARNING_STATE,
    SIGNAL_SIGNAL_RECEIVED,
)
from .storage import (
    RaspyRFMDeviceEntry,
    RaspyRFMDeviceStorage,
    RaspyRFMSignalMapStorage,
    RaspyRFMSignalMapping,
)
from .gateway import RaspyRFMGateway
from .learn import LearnManager, LearnedSignal

_LOGGER = logging.getLogger(__name__)


@dataclass(slots=True)
class ActiveSignal:
    """Representation of a signal currently known to the hub."""

    signal: LearnedSignal
    received_at: datetime
    source: tuple[str, int]


class RaspyRFMHub:
    """Bridge between Home Assistant and a RaspyRFM gateway."""

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        self._hass = hass
        self._entry = entry
        self._gateway = RaspyRFMGateway(
            hass,
            entry.data.get(CONF_HOST, ""),
            entry.data.get(CONF_PORT, DEFAULT_PORT),
        )
        self._storage = RaspyRFMDeviceStorage(hass)
        self._map_storage = RaspyRFMSignalMapStorage(hass)
        self._learn_manager = LearnManager(hass, self)
        self._active_signals: Dict[str, ActiveSignal] = {}
        self._signals_lock = asyncio.Lock()

    @property
    def gateway(self) -> RaspyRFMGateway:
        """Return the gateway helper object."""

        return self._gateway

    @property
    def storage(self) -> RaspyRFMDeviceStorage:
        """Return the persistent storage helper."""

        return self._storage

    @property
    def learn_manager(self) -> LearnManager:
        """Return the signal learning manager."""

        return self._learn_manager

    async def async_setup(self) -> None:
        """Initialise hub resources."""

        await self._storage.async_load()
        await self._map_storage.async_load()

    async def async_unload(self) -> None:
        """Unload hub resources."""

        await self._learn_manager.async_stop()
        await self._storage.async_unload()
        await self._map_storage.async_unload()

    async def async_update_entry(self, entry: ConfigEntry) -> None:
        """Handle entry updates."""

        self._entry = entry
        await self._gateway.async_update(  # type: ignore[no-untyped-call]
            entry.data.get(CONF_HOST, ""), entry.data.get(CONF_PORT, DEFAULT_PORT)
        )

    async def async_start_learning(self) -> None:
        """Start a learning session."""

        async with self._signals_lock:
            self._active_signals.clear()
        await self._learn_manager.async_start()

    async def async_stop_learning(self) -> None:
        """Stop an active learning session."""

        await self._learn_manager.async_stop()

    def iter_devices(self) -> Iterable[RaspyRFMDeviceEntry]:
        """Return all configured devices."""

        return self._storage.iter_devices()

    def iter_signal_mappings(self) -> Iterable[RaspyRFMSignalMapping]:
        """Return all stored signal mappings."""

        return self._map_storage.iter_mappings()

    def get_device(self, device_id: str) -> Optional[RaspyRFMDeviceEntry]:
        """Return a device by id."""

        return self._storage.get_device(device_id)

    async def async_create_device(
        self,
        name: str,
        device_type: str,
        signals: Dict[str, str],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> RaspyRFMDeviceEntry:
        """Create a new device entry from learned signals."""

        device = RaspyRFMDeviceEntry(
            device_id=str(uuid.uuid4()),
            name=name,
            device_type=device_type,
            signals=signals,
            metadata=metadata or {},
        )
        await self._storage.async_add_or_update(device)
        async_dispatcher_send(self._hass, SIGNAL_DEVICE_REGISTRY_UPDATED, device.device_id)
        return device

    async def async_remove_device(self, device_id: str) -> None:
        """Remove a device entry."""

        await self._storage.async_remove(device_id)
        async_dispatcher_send(self._hass, SIGNAL_DEVICE_REMOVED, device_id)

    async def async_set_signal_mapping(
        self,
        payload: str,
        category: str,
        label: str,
        linked_devices: Optional[List[str]] = None,
    ) -> RaspyRFMSignalMapping:
        """Store metadata describing a learned payload."""

        mapping = RaspyRFMSignalMapping(
            payload=payload,
            category=category,
            label=label or payload,
            linked_devices=list(linked_devices or []),
        )
        await self._map_storage.async_set(mapping)
        return mapping

    async def async_remove_signal_mapping(self, payload: str) -> None:
        """Delete mapping metadata for a payload."""

        await self._map_storage.async_remove(payload)

    async def async_list_signal_mappings(self) -> List[Dict[str, Any]]:
        """Return a serialisable mapping overview."""

        return [mapping.to_dict() for mapping in self._map_storage.iter_mappings()]

    async def async_handle_learned_signal(self, signal: LearnedSignal, addr: tuple[str, int]) -> None:
        """Handle a signal received during a learning session."""

        async with self._signals_lock:
            self._active_signals[signal.uid] = ActiveSignal(
                signal=signal, received_at=datetime.utcnow(), source=addr
            )
        async_dispatcher_send(
            self._hass,
            SIGNAL_SIGNAL_RECEIVED,
            signal.to_dict(),
        )
        await self._maybe_match_devices(signal)

    async def _maybe_match_devices(self, signal: LearnedSignal) -> None:
        """Match a learned signal with configured devices and fire updates."""

        matched: List[str] = []
        for device in self._storage.iter_devices():
            if device.matches_signal(signal.payload):
                matched.append(device.device_id)

        if not matched:
            return

        for device_id in matched:
            async_dispatcher_send(self._hass, SIGNAL_DEVICE_REGISTRY_UPDATED, device_id)

    async def async_list_active_signals(self) -> List[Dict[str, Any]]:
        """Return a snapshot of all active signals."""

        async with self._signals_lock:
            return [signal.signal.to_dict() for signal in self._active_signals.values()]

    async def async_reload_devices(self) -> None:
        """Reload device information from disk."""

        await self._storage.async_load()
        async_dispatcher_send(self._hass, SIGNAL_DEVICE_REGISTRY_UPDATED, None)

    async def async_record_learning_state(self, active: bool) -> None:
        """Announce a change of the learning state."""

        async_dispatcher_send(self._hass, SIGNAL_LEARNING_STATE, {"active": active})

    async def async_send_raw(self, payload: str) -> None:
        """Send a raw UDP payload to the gateway."""

        await self._gateway.async_send_raw(payload)

    async def async_send_device_action(self, device_id: str, action: str) -> None:
        """Send a stored signal for the given device."""

        device = self._storage.get_device(device_id)
        if device is None:
            raise ValueError(f"Unknown device: {device_id}")

Signal learning pipeline

The LearnManager binds a UDP listener to DEFAULT_LISTEN_PORT (49881) and streams payloads into the hub. Incoming packets are normalised into LearnedSignal dataclasses and broadcast over dispatcher events so that the UI and entity platforms receive live updates. Each payload is fingerprinted against the raspyrfm-client device library using the classifier helper, which allows the UI to suggest an entity type even when users have not labelled the signal yet.

"""Signal learning helper for the RaspyRFM integration."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from datetime import datetime
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from homeassistant.core import HomeAssistant

from .classifier import classify_payload
from .const import DEFAULT_LISTEN_PORT

if TYPE_CHECKING:
    from .hub import RaspyRFMHub

_LOGGER = logging.getLogger(__name__)


@dataclass(slots=True)
class LearnedSignal:
    """Representation of a learned radio signal."""

    uid: str
    payload: str
    received: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a serialisable representation."""

        return {
            "uid": self.uid,
            "payload": self.payload,
            "received": self.received.isoformat(),
            "metadata": self.metadata,
        }


class RaspyRFMLearnProtocol(asyncio.DatagramProtocol):
    """Asyncio datagram protocol for capturing signals."""

    def __init__(self, manager: LearnManager) -> None:
        self._manager = manager

    def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
        payload = data.decode("utf-8", errors="ignore").strip()
        if not payload:
            return

        asyncio.create_task(self._manager.async_process_datagram(payload, addr))


class LearnManager:
    """Manage RaspyRFM learning sessions."""

    def __init__(self, hass: HomeAssistant, hub: RaspyRFMHub) -> None:
        self._hass = hass
        self._hub = hub
        self._transport: Optional[asyncio.transports.DatagramTransport] = None
        self._active = False
        self._listen_port = DEFAULT_LISTEN_PORT
        self._signals: List[LearnedSignal] = []
        self._lock = asyncio.Lock()

    @property
    def is_active(self) -> bool:
        """Return whether learning is active."""

        return self._active

    async def async_start(self) -> None:
        """Start listening for incoming datagrams."""

        if self._active:
            return

        loop = asyncio.get_running_loop()
        self._transport, _ = await loop.create_datagram_endpoint(
            lambda: RaspyRFMLearnProtocol(self), local_addr=("0.0.0.0", self._listen_port)
        )
        self._signals.clear()
        self._active = True
        try:
            await self._hub.async_send_raw("RXSTART")
        except OSError:
            _LOGGER.debug("Unable to send RXSTART command to gateway")
        await self._hub.async_record_learning_state(True)

    async def async_stop(self) -> None:
        """Stop listening for signals."""

        if not self._active:
            return

        if self._transport is not None:
            self._transport.close()
            self._transport = None
        self._active = False
        try:
            await self._hub.async_send_raw("RXSTOP")
        except OSError:
            _LOGGER.debug("Unable to send RXSTOP command to gateway")
        await self._hub.async_record_learning_state(False)

    async def async_process_datagram(self, payload: str, addr: Tuple[str, int]) -> None:
        """Process an incoming UDP datagram."""

        if not payload:
            return

        signal = LearnedSignal(
            uid=f"sig_{len(self._signals)+1}",
            payload=payload,
            received=datetime.utcnow(),
            metadata={"source": addr[0], "port": addr[1]},
        )
        classification = classify_payload(payload)
        if classification:
            signal.metadata["classification"] = classification.to_dict()
        async with self._lock:
            self._signals.append(signal)
        await self._hub.async_handle_learned_signal(signal, addr)

    async def async_list_signals(self) -> List[Dict[str, Any]]:
        """Return a list of captured signals."""

        async with self._lock:
            return [signal.to_dict() for signal in self._signals]

    async def async_clear_signals(self) -> None:
"""Signal classification helpers for RaspyRFM payloads."""

from __future__ import annotations

from dataclasses import dataclass
from functools import lru_cache
import logging
from typing import Dict, Iterable, List, Optional, Set, Tuple

from raspyrfm_client import RaspyRFMClient
from raspyrfm_client.device_implementations.controlunit.actions import Action

LOGGER = logging.getLogger(__name__)


@dataclass(frozen=True)
class SignalFingerprint:
    """Fingerprint describing a payload independent of the channel config."""

    repetitions: int
    gap: int
    timebase: int
    pulse_count: int
    min_pulse: int
    max_pulse: int


@dataclass(slots=True)
class SignalClassification:
    """Classification result for a payload."""

    actions: Set[Action]
    suggested_type: str

    def to_dict(self) -> Dict[str, object]:
        """Return a serialisable representation."""

        return {
            "actions": sorted(action.name.lower() for action in self.actions),
            "suggested_type": self.suggested_type,
        }


def classify_payload(payload: str) -> Optional[SignalClassification]:
    """Return a best-effort classification for a payload string."""

    fingerprint = _fingerprint_from_payload(payload)
    if fingerprint is None:
        return None

    action_candidates = _fingerprint_action_map().get(fingerprint)
    if not action_candidates:
        return None

    suggested_type = _actions_to_device_type(action_candidates)
    return SignalClassification(actions=set(action_candidates), suggested_type=suggested_type)


def _actions_to_device_type(actions: Iterable[Action]) -> str:
    """Translate a collection of supported actions into a RaspyRFM device type."""

    action_set = set(actions)
    if {Action.BRIGHT, Action.DIMM} & action_set:
        return "light"
    if Action.ON in action_set and Action.OFF in action_set:
        return "switch"
    if action_set == {Action.ON}:
        return "button"
    if action_set >= {Action.PAIR} or action_set >= {Action.UNPAIR}:
        # Pairing remotes act as buttons in the Home Assistant context.
        return "button"
    return "universal"


def _fingerprint_from_payload(payload: str) -> Optional[SignalFingerprint]:
    """Parse a payload into a fingerprint for comparison with the library table."""

    if not payload:
        return None

    body = payload.strip()
    if ":" in body:
        # Strip transport prefix (e.g. ``TXP:`` or ``RXP:``)
        try:
            _, body = body.split(":", 1)
        except ValueError:
            return None

    tokens = [token for token in body.split(",") if token]
    if len(tokens) < 6:
        return None

    try:
        repetitions = int(tokens[2])
        gap = int(tokens[3])
        timebase = int(tokens[4])
        pair_count = int(tokens[5])
    except ValueError:
        return None

    pulses: List[int] = []
    for token in tokens[6:6 + pair_count * 2]:
        try:
            pulses.append(int(token))
        except ValueError:
            return None

    if len(pulses) < 2:
        return None

    min_pulse = min(pulses)
    max_pulse = max(pulses)

    return SignalFingerprint(
        repetitions=repetitions,
        gap=gap,
        timebase=timebase,
        pulse_count=int(len(pulses) / 2),
        min_pulse=min_pulse,
        max_pulse=max_pulse,
    )


@lru_cache(maxsize=1)
def _fingerprint_action_map() -> Dict[SignalFingerprint, Set[Action]]:
    """Build a lookup table for the available payload fingerprints."""

    client = RaspyRFMClient()
    mapping: Dict[SignalFingerprint, Set[Action]] = {}

    for manufacturer in client.get_supported_controlunit_manufacturers():
        for model in client.get_supported_controlunit_models(manufacturer):
            device = client.get_controlunit(manufacturer, model)
            try:
                default_config = {
                    key: _default_value(regex)
                    for key, regex in device.get_channel_config_args().items()
                }
                device.set_channel_config(**default_config)
            except Exception as err:  # pragma: no cover - defensive fallback
                LOGGER.debug(
                    "Unable to build default configuration for %s %s: %s",
                    manufacturer,
                    model,
                    err,
                )
                continue

            for action in device.get_supported_actions():
                try:
                    pulses, repetitions, timebase = device.get_pulse_data(action)
                except Exception as err:  # pragma: no cover - defensive fallback
                    LOGGER.debug(
                        "Unable to build fingerprint for %s %s action %s: %s",
                        manufacturer,
                        model,
                        action,
                        err,
                    )
                    continue

                gap = 5600  # The RaspyRFM gateways always inject this constant gap.
                min_pulse, max_pulse = _pulse_bounds(pulses)
                fingerprint = SignalFingerprint(
                    repetitions=repetitions,
                    gap=gap,
                    timebase=timebase,
                    pulse_count=len(pulses),
                    min_pulse=min_pulse,
                    max_pulse=max_pulse,
                )
                mapping.setdefault(fingerprint, set()).add(action)

    return mapping


def _pulse_bounds(pulses: Iterable[Tuple[int, int]]) -> Tuple[int, int]:
    """Return the minimum and maximum pulse length from a sequence."""

    minima: List[int] = []
    maxima: List[int] = []
    for first, second in pulses:
        minima.append(min(first, second))
        maxima.append(max(first, second))
    return (min(minima), max(maxima))


def _default_value(regex: str) -> str:
    """Return a deterministic default value that satisfies a configuration regex."""

    pattern = regex.strip().lstrip("^").rstrip("$")
    if pattern == "[01]":
        return "0"
    if pattern == "[1-4]":
        return "1"
    if pattern == "[1-3]":
        return "1"
    if pattern == "[A-D]":
        return "A"
    if pattern == "[A-C]":
        return "A"
    if pattern == "[A-E]":
        return "A"
    if pattern == "[A-P]":
        return "A"
    if pattern == "[01fF]":
        return "0"
    if pattern == "[01]{12}":
        return "0" * 12
    if pattern == "[01]{26}":
        return "0" * 26
    if pattern == "[0-9A-F]{5}":
        return "00000"
    if pattern == "([1-9]|0[1-9]|1[0-6])":
        return "01"

    # Fall back to the first character when the pattern is a simple character set.
    if pattern.startswith("[") and pattern.endswith("]"):
        characters = pattern[1:-1]
        if characters and characters[0] != "^":

Persistent storage and device registry

Two storage helpers manage device definitions and optional metadata about captured payloads. The RaspyRFMDeviceStorage class keeps track of all entity types created from learned signals, including switches, lights, button groups, and universal listeners, while RaspyRFMSignalMapStorage stores labels, semantic categories, and links between payloads and devices.

"""Persistent storage helpers for RaspyRFM devices."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional

from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store

from .const import (
    MAPPING_CATEGORIES,
    MAPPING_STORAGE_KEY,
    MAPPING_STORAGE_VERSION,
    STORAGE_KEY,
    STORAGE_VERSION,
)


@dataclass(slots=True)
class RaspyRFMDeviceEntry:
    """Representation of a configured device."""

    device_id: str
    name: str
    device_type: str
    signals: Dict[str, str]
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary representation."""

        return {
            "device_id": self.device_id,
            "name": self.name,
            "device_type": self.device_type,
            "signals": self.signals,
            "metadata": self.metadata,
        }

    def matches_signal(self, payload: str) -> bool:
        """Return True if the payload matches this device."""

        return payload in self.signals.values()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "RaspyRFMDeviceEntry":
        """Create an instance from stored data."""

        return cls(
            device_id=data["device_id"],
            name=data["name"],
            device_type=data.get("device_type", "switch"),
            signals=data.get("signals", {}),
            metadata=data.get("metadata", {}),
        )


@dataclass(slots=True)
class RaspyRFMSignalMapping:
    """Representation of a learned signal mapping."""

    payload: str
    category: str
    label: str
    linked_devices: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        if self.category not in MAPPING_CATEGORIES:
            raise ValueError(f"Unknown mapping category: {self.category}")

    def to_dict(self) -> Dict[str, Any]:
        """Return a serialisable form."""

        return {
            "payload": self.payload,
            "category": self.category,
            "label": self.label,
            "linked_devices": self.linked_devices,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "RaspyRFMSignalMapping":
        """Create an instance from stored data."""

        return cls(
            payload=data["payload"],
            category=data.get("category", "other"),
            label=data.get("label", data.get("payload", "")),
            linked_devices=list(data.get("linked_devices", [])),
        )


class RaspyRFMDeviceStorage:
    """Storage helper managing device persistence."""

    def __init__(self, hass: HomeAssistant) -> None:
        self._hass = hass
        self._store: Store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
        self._devices: Dict[str, RaspyRFMDeviceEntry] = {}

    async def async_load(self) -> None:
        """Load device information from disk."""

        data = await self._store.async_load()
        if not data:
            self._devices = {}
            return

        devices: List[RaspyRFMDeviceEntry] = [
            RaspyRFMDeviceEntry.from_dict(item) for item in data.get("devices", [])
        ]
        self._devices = {device.device_id: device for device in devices}

    async def async_unload(self) -> None:
        """Flush changes to disk."""

        await self._store.async_save({"devices": [device.to_dict() for device in self._devices.values()]})

    def iter_devices(self) -> Iterable[RaspyRFMDeviceEntry]:
        """Iterate over all devices."""

        return list(self._devices.values())

    def iter_devices_by_type(self, device_type: str) -> Iterable[RaspyRFMDeviceEntry]:
        """Iterate over devices of a specific type."""

        return [device for device in self._devices.values() if device.device_type == device_type]

    def get_device(self, device_id: str) -> Optional[RaspyRFMDeviceEntry]:
        """Return a device by id."""

        return self._devices.get(device_id)

    async def async_add_or_update(self, device: RaspyRFMDeviceEntry) -> None:
        """Persist a device entry."""

        self._devices[device.device_id] = device
        await self._store.async_save({"devices": [d.to_dict() for d in self._devices.values()]})

    async def async_remove(self, device_id: str) -> None:
        """Remove a device entry."""

        if device_id in self._devices:
            self._devices.pop(device_id)
            await self._store.async_save({"devices": [d.to_dict() for d in self._devices.values()]})


class RaspyRFMSignalMapStorage:
    """Storage helper for signal mapping metadata."""

    def __init__(self, hass: HomeAssistant) -> None:
        self._store = Store(hass, MAPPING_STORAGE_VERSION, MAPPING_STORAGE_KEY)
        self._mappings: Dict[str, RaspyRFMSignalMapping] = {}

    async def async_load(self) -> None:
        """Load mapping state from disk."""

        data = await self._store.async_load()
        if not data:
            self._mappings = {}
            return

        entries = [RaspyRFMSignalMapping.from_dict(item) for item in data.get("mappings", [])]
        self._mappings = {entry.payload: entry for entry in entries}

    async def async_unload(self) -> None:
        """Persist mapping state to disk."""

        await self._store.async_save({"mappings": [entry.to_dict() for entry in self._mappings.values()]})

    def iter_mappings(self) -> Iterable[RaspyRFMSignalMapping]:
        """Iterate over all known mappings."""

        return list(self._mappings.values())

    def get_mapping(self, payload: str) -> Optional[RaspyRFMSignalMapping]:
        """Return mapping information for a payload if present."""

        return self._mappings.get(payload)

    async def async_set(self, mapping: RaspyRFMSignalMapping) -> None:
        """Store or update a mapping entry."""

        self._mappings[mapping.payload] = mapping
        await self._store.async_save({"mappings": [entry.to_dict() for entry in self._mappings.values()]})

    async def async_remove(self, payload: str) -> None:
        """Remove a mapping entry."""

        if payload in self._mappings:
            self._mappings.pop(payload)
            await self._store.async_save({"mappings": [entry.to_dict() for entry in self._mappings.values()]})

Entity platforms

All entities share a base class that listens for dispatcher updates when devices change. The entity platforms iterate over stored devices, create entities on demand, and react to live signal messages from the learning pipeline. Besides switches and binary sensors the integration now exposes a light platform for dimmable actuators, a button platform that creates one entity per stored action, and a universal sensor that records and replays raw payloads when no fingerprint matches.

"""Base entity classes for the RaspyRFM integration."""

from __future__ import annotations

from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity

from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry


class RaspyRFMEntity(Entity):
    """Common representation of a RaspyRFM entity."""

    _attr_should_poll = False

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        self._hub = hub
        self._device = device
        self._unsub_dispatcher = None

    async def async_added_to_hass(self) -> None:
        await super().async_added_to_hass()

        def _handle_update(device_id: str | None) -> None:
            if device_id is None or device_id == self._device.device_id:
                new_device = self._hub.get_device(self._device.device_id)
                if new_device:
                    self._device = new_device
                self.async_write_ha_state()

        self._unsub_dispatcher = async_dispatcher_connect(
            self.hass, SIGNAL_DEVICE_REGISTRY_UPDATED, _handle_update
        )

    async def async_will_remove_from_hass(self) -> None:
        if self._unsub_dispatcher is not None:
            self._unsub_dispatcher()
            self._unsub_dispatcher = None
        await super().async_will_remove_from_hass()

    @property
    def device_info(self) -> dict:
        return {
            "identifiers": {(DOMAIN, self._hub.gateway.host)},
            "manufacturer": "Seegel Systeme",
            "name": "RaspyRFM",
        }

    @property
    def unique_id(self) -> str:
        return self._device.device_id

    @property
    def name(self) -> str:
        return self._device.name
"""Switch platform for RaspyRFM."""

from __future__ import annotations

import logging
from typing import Any, Dict

from homeassistant.components.switch import SwitchEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect

from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry


_LOGGER = logging.getLogger(__name__)


async def async_setup_entry(hass, entry, async_add_entities):
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    entities: Dict[str, RaspyRFMSwitch] = {}

    @callback
    def _ensure_entities() -> None:
        new_entities = []
        for device in hub.storage.iter_devices_by_type("switch"):
            if device.device_id in entities:
                continue
            entity = RaspyRFMSwitch(hub, device)
            entities[device.device_id] = entity
            new_entities.append(entity)
        if new_entities:
            async_add_entities(new_entities)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        if device_id is None or device_id not in entities:
            _ensure_entities()

    entry.async_on_unload(
        async_dispatcher_connect(hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update)
    )


class RaspyRFMSwitch(RaspyRFMEntity, SwitchEntity):
    """Representation of a learned switch."""

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        super().__init__(hub, device)
        self._attr_is_on = False
        self._signal_unsub = None

    async def async_added_to_hass(self) -> None:
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            if payload == self._device.signals.get("on"):
                self._attr_is_on = True
                self.async_write_ha_state()
            elif payload == self._device.signals.get("off"):
                self._attr_is_on = False
                self.async_write_ha_state()

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        if self._signal_unsub is not None:
            self._signal_unsub()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    async def async_turn_on(self, **kwargs: Any) -> None:
        signal = self._device.signals.get("on")
        if signal is None:
            raise ValueError("No ON signal stored for this device")
        await self._hub.async_send_raw(signal)
        self._attr_is_on = True
        self.async_write_ha_state()

    async def async_turn_off(self, **kwargs: Any) -> None:
        signal = self._device.signals.get("off")
        if signal is None:
            _LOGGER.warning(
                "Device %s has no OFF signal stored; ignoring turn_off request",
                self._device.device_id,
            )
            return
        await self._hub.async_send_raw(signal)
        self._attr_is_on = False
        self.async_write_ha_state()
"""Binary sensor platform for RaspyRFM."""

from __future__ import annotations

import asyncio
from typing import Any, Dict

from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect

from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry

RESET_TIMEOUT = 5


async def async_setup_entry(hass, entry, async_add_entities):
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    entities: Dict[str, RaspyRFMBinarySensor] = {}

    @callback
    def _ensure_entities() -> None:
        new_entities = []
        for device in hub.storage.iter_devices_by_type("binary_sensor"):
            if device.device_id in entities:
                continue
            entity = RaspyRFMBinarySensor(hub, device)
            entities[device.device_id] = entity
            new_entities.append(entity)
        if new_entities:
            async_add_entities(new_entities)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        if device_id is None or device_id not in entities:
            _ensure_entities()

    entry.async_on_unload(
        async_dispatcher_connect(hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update)
    )


class RaspyRFMBinarySensor(RaspyRFMEntity, BinarySensorEntity):
    """Representation of a learned binary sensor."""

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        super().__init__(hub, device)
        self._reset_handle: asyncio.TimerHandle | None = None
        self._signal_unsub = None
        self._attr_is_on = False

    async def async_added_to_hass(self) -> None:
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            if payload not in self._device.signals.values():
                return
            self._attr_is_on = True
            self.async_write_ha_state()
            if self._reset_handle is not None:
                self._reset_handle.cancel()
            self._reset_handle = self.hass.loop.call_later(RESET_TIMEOUT, self._reset_state)

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        if self._signal_unsub is not None:
            self._signal_unsub()
            self._signal_unsub = None
        if self._reset_handle is not None:
            self._reset_handle.cancel()
            self._reset_handle = None
        await super().async_will_remove_from_hass()

    @callback
    def _reset_state(self) -> None:
        self._reset_handle = None
        self._attr_is_on = False
        self.async_write_ha_state()
"""Light platform for RaspyRFM."""

from __future__ import annotations

from typing import Any, Dict

from homeassistant.components.light import ColorMode, LightEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect

from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry


async def async_setup_entry(hass, entry, async_add_entities):
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    entities: Dict[str, RaspyRFMLight] = {}

    @callback
    def _ensure_entities() -> None:
        new_entities = []
        for device in hub.storage.iter_devices_by_type("light"):
            if device.device_id in entities:
                continue
            entity = RaspyRFMLight(hub, device)
            entities[device.device_id] = entity
            new_entities.append(entity)
        if new_entities:
            async_add_entities(new_entities)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        if device_id is None or device_id not in entities:
            _ensure_entities()

    entry.async_on_unload(
        async_dispatcher_connect(hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update)
    )


class RaspyRFMLight(RaspyRFMEntity, LightEntity):
    """Representation of a dimmable RaspyRFM light."""

    _attr_supported_color_modes = {ColorMode.ONOFF}
    _attr_color_mode = ColorMode.ONOFF

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        super().__init__(hub, device)
        self._attr_is_on = False
        self._signal_unsub = None

    async def async_added_to_hass(self) -> None:
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            if payload == self._device.signals.get("on"):
                self._attr_is_on = True
            elif payload == self._device.signals.get("off"):
                self._attr_is_on = False
            elif payload == self._device.signals.get("bright"):
                self._attr_is_on = True
            elif payload == self._device.signals.get("dim") and "off" not in self._device.signals:
                # Devices without an explicit OFF signal often dim to turn off.
                self._attr_is_on = False
            else:
                return
            self.async_write_ha_state()

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        if self._signal_unsub is not None:
            self._signal_unsub()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    @property
    def extra_state_attributes(self) -> Dict[str, Any]:
        return {"available_signals": list(self._device.signals.keys())}

    async def async_turn_on(self, **kwargs: Any) -> None:
        payload = self._device.signals.get("on") or self._device.signals.get("bright")
        if payload is None:
            raise ValueError("No ON or BRIGHT signal stored for this device")
        await self._hub.async_send_raw(payload)
        self._attr_is_on = True
        self.async_write_ha_state()

    async def async_turn_off(self, **kwargs: Any) -> None:
        payload = self._device.signals.get("off") or self._device.signals.get("dim")
        if payload is None:
            raise ValueError("No OFF or DIM signal stored for this device")
        await self._hub.async_send_raw(payload)
        self._attr_is_on = False
        self.async_write_ha_state()
"""Button platform for RaspyRFM."""

from __future__ import annotations

from datetime import datetime
from typing import Any, Dict

from homeassistant.components.button import ButtonEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.util import dt as dt_util

from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry


async def async_setup_entry(hass, entry, async_add_entities):
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    entities: Dict[str, RaspyRFMButton] = {}

    @callback
    def _ensure_entities() -> None:
        new_entities = []
        for device in hub.storage.iter_devices_by_type("button"):
            for action in sorted(device.signals.keys()):
                key = f"{device.device_id}:{action}"
                if key in entities:
                    continue
                entity = RaspyRFMButton(hub, device, action)
                entities[key] = entity
                new_entities.append(entity)
        if new_entities:
            async_add_entities(new_entities)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        if device_id is None:
            _ensure_entities()
            return
        device = hub.storage.get_device(device_id)
        if device is None:
            _ensure_entities()
            return
        stale_keys = [
            key
            for key in list(entities)
            if key.startswith(f"{device_id}:")
            and key.split(":", 1)[1] not in device.signals
        ]
        for key in stale_keys:
            entity = entities.pop(key)
            if entity.hass is not None:
                entity.hass.async_create_task(entity.async_remove())
        for action in device.signals.keys():
            key = f"{device_id}:{action}"
            if key not in entities:
                _ensure_entities()
                return
        # Existing entities will refresh their state on next dispatcher tick.

    entry.async_on_unload(
        async_dispatcher_connect(hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update)
    )


class RaspyRFMButton(RaspyRFMEntity, ButtonEntity):
    """Representation of a RaspyRFM button action."""

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry, action: str) -> None:
        super().__init__(hub, device)
        self._action = action
        self._last_triggered: datetime | None = None
        self._signal_unsub = None

    @property
    def name(self) -> str:
        base = super().name
        label = self._action.replace("_", " ").title()
        return f"{base} {label}"

    @property
    def unique_id(self) -> str:
        return f"{super().unique_id}:{self._action}"

    async def async_added_to_hass(self) -> None:
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            if payload == self._device.signals.get(self._action):
                self._last_triggered = dt_util.utcnow()
                self.async_write_ha_state()

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        if self._signal_unsub is not None:
            self._signal_unsub()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    @property
    def extra_state_attributes(self) -> Dict[str, Any]:
        attrs: Dict[str, Any] = {
            "action": self._action,
        }
        if self._last_triggered is not None:
            attrs["last_triggered"] = self._last_triggered.isoformat()
        return attrs

    async def async_press(self, **kwargs: Any) -> None:
        payload = self._device.signals.get(self._action)
        if payload is None:
            raise ValueError(f"No signal stored for action {self._action}")
        await self._hub.async_send_raw(payload)
        self._last_triggered = dt_util.utcnow()
        self.async_write_ha_state()
"""Sensor platform for universal RaspyRFM devices."""

from __future__ import annotations

from typing import Any, Dict

from homeassistant.components.sensor import SensorEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect

from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry


async def async_setup_entry(hass, entry, async_add_entities):
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    entities: Dict[str, RaspyRFMUniversalSensor] = {}

    @callback
    def _ensure_entities() -> None:
        new_entities = []
        for device in hub.storage.iter_devices_by_type("universal"):
            if device.device_id in entities:
                continue
            entity = RaspyRFMUniversalSensor(hub, device)
            entities[device.device_id] = entity
            new_entities.append(entity)
        if new_entities:
            async_add_entities(new_entities)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        if device_id is None or device_id not in entities:
            _ensure_entities()

    entry.async_on_unload(
        async_dispatcher_connect(hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update)
    )


class RaspyRFMUniversalSensor(RaspyRFMEntity, SensorEntity):
    """A best-effort entity for devices without a dedicated platform."""

    _attr_icon = "mdi:radio-tower"

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        super().__init__(hub, device)
        self._attr_native_value = None
        self._signal_unsub = None

    async def async_added_to_hass(self) -> None:
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            for action, stored_payload in self._device.signals.items():
                if payload == stored_payload:
                    self._attr_native_value = action
                    self.async_write_ha_state()
                    break

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        if self._signal_unsub is not None:
            self._signal_unsub()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    @property
    def extra_state_attributes(self) -> Dict[str, Any]:
        return {
            "available_actions": list(self._device.signals.keys()),
        }

Websocket API surface

The integration exposes a websocket namespace under raspyrfm/. The commands cover the full lifecycle: starting and stopping capture, listing signals, creating or deleting devices, triggering stored actions, and maintaining the optional signal mapping metadata.

"""Websocket commands exposed by the RaspyRFM integration."""

from __future__ import annotations

import logging
from typing import Any, Dict, Set

import voluptuous as vol

from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.components import websocket_api

from .const import (
    DOMAIN,
    MAPPING_CATEGORIES,
    SIGNAL_LEARNING_STATE,
    SIGNAL_SIGNAL_RECEIVED,
    WS_TYPE_DEVICE_CREATE,
    WS_TYPE_DEVICE_DELETE,
    WS_TYPE_DEVICE_LIST,
    WS_TYPE_DEVICE_RELOAD,
    WS_TYPE_DEVICE_SEND,
    WS_TYPE_LEARNING_START,
    WS_TYPE_LEARNING_STATUS,
    WS_TYPE_LEARNING_STOP,
    WS_TYPE_LEARNING_SUBSCRIBE,
    WS_TYPE_SIGNAL_MAP_DELETE,
    WS_TYPE_SIGNAL_MAP_LIST,
    WS_TYPE_SIGNAL_MAP_UPDATE,
    WS_TYPE_SIGNALS_LIST,
    WS_TYPE_SIGNALS_SUBSCRIBE,
)
from .hub import RaspyRFMHub

_LOGGER = logging.getLogger(__name__)

HANDLERS_REGISTERED = "_raspyrfm_ws_handlers"


def async_register_websocket_handlers(hass: HomeAssistant) -> None:
    """Register websocket commands."""

    if hass.data.get(HANDLERS_REGISTERED):
        return

    websocket_api.async_register_command(hass, handle_learning_start)
    websocket_api.async_register_command(hass, handle_learning_stop)
    websocket_api.async_register_command(hass, handle_learning_status)
    websocket_api.async_register_command(hass, handle_learning_subscribe)
    websocket_api.async_register_command(hass, handle_signals_list)
    websocket_api.async_register_command(hass, handle_signals_subscribe)
    websocket_api.async_register_command(hass, handle_device_create)
    websocket_api.async_register_command(hass, handle_device_delete)
    websocket_api.async_register_command(hass, handle_device_list)
    websocket_api.async_register_command(hass, handle_device_reload)
    websocket_api.async_register_command(hass, handle_device_send_action)
    websocket_api.async_register_command(hass, handle_signal_map_list)
    websocket_api.async_register_command(hass, handle_signal_map_update)
    websocket_api.async_register_command(hass, handle_signal_map_delete)

    hass.data[HANDLERS_REGISTERED] = True


def _get_hub(hass: HomeAssistant, msg: Dict[str, Any]) -> RaspyRFMHub:
    entry_id = msg.get("entry_id")
    if entry_id is None:
        # fallback to first entry
        if DOMAIN not in hass.data or not hass.data[DOMAIN]:
            raise websocket_api.HomeAssistantWebSocketError("No RaspyRFM entries configured")
        candidates = [
            key for key in hass.data[DOMAIN] if not key.startswith("_")
        ]
        if not candidates:
            raise websocket_api.HomeAssistantWebSocketError("No RaspyRFM entries configured")
        entry_id = candidates[0]

    hub = hass.data[DOMAIN].get(entry_id)
    if hub is None:
        raise websocket_api.HomeAssistantWebSocketError("Unknown RaspyRFM entry")
    return hub


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_LEARNING_START, vol.Optional("entry_id"): str})
@websocket_api.async_response
async def handle_learning_start(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Start capturing signals."""

    hub = _get_hub(hass, msg)
    await hub.async_start_learning()
    connection.send_result(msg["id"], {"active": True})


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_LEARNING_STOP, vol.Optional("entry_id"): str})
@websocket_api.async_response
async def handle_learning_stop(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Stop capturing signals."""

    hub = _get_hub(hass, msg)
    await hub.async_stop_learning()
    connection.send_result(msg["id"], {"active": False})


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_LEARNING_STATUS, vol.Optional("entry_id"): str})
@websocket_api.async_response
async def handle_learning_status(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Return the current learning state."""

    hub = _get_hub(hass, msg)
    connection.send_result(msg["id"], {"active": hub.learn_manager.is_active})


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_LEARNING_SUBSCRIBE, vol.Optional("entry_id"): str})
@callback
def handle_learning_subscribe(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Subscribe to learning state updates."""

    @callback
    def forward(payload: Dict[str, Any]) -> None:
        connection.send_message(websocket_api.event_message(msg["id"], payload))

    connection.subscriptions[msg["id"]] = async_dispatcher_connect(
        hass, SIGNAL_LEARNING_STATE, forward
    )
    connection.send_result(msg["id"])


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_SIGNALS_LIST, vol.Optional("entry_id"): str})
@websocket_api.async_response
async def handle_signals_list(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Return a list of currently captured signals."""

    hub = _get_hub(hass, msg)
    signals = await hub.learn_manager.async_list_signals()
    connection.send_result(msg["id"], {"signals": signals})


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_SIGNALS_SUBSCRIBE, vol.Optional("entry_id"): str})
@callback
def handle_signals_subscribe(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Subscribe to incoming signals."""

    @callback
    def forward(payload: Dict[str, Any]) -> None:
        connection.send_message(websocket_api.event_message(msg["id"], payload))

    connection.subscriptions[msg["id"]] = async_dispatcher_connect(
        hass, SIGNAL_SIGNAL_RECEIVED, forward
    )
    connection.send_result(msg["id"])


SUPPORTED_DEVICE_TYPES = ["switch", "binary_sensor", "light", "button", "universal"]


_DEVICE_CREATE_SCHEMA = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
    {
        vol.Required("type"): WS_TYPE_DEVICE_CREATE,
        vol.Required("name"): cv.string,
        vol.Required("device_type"): vol.In(SUPPORTED_DEVICE_TYPES),
        vol.Required("signals"): {cv.string: cv.string},
        vol.Optional("metadata"): {cv.string: cv.Any()},
        vol.Optional("entry_id"): str,
    }
)


@websocket_api.websocket_command(_DEVICE_CREATE_SCHEMA)
@websocket_api.async_response
async def handle_device_create(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Create a device from captured signals."""

    hub = _get_hub(hass, msg)
    _validate_device_payload(msg["device_type"], msg["signals"])
    device = await hub.async_create_device(
        msg["name"], msg["device_type"], msg["signals"], msg.get("metadata")
    )
    connection.send_result(msg["id"], {"device": device.to_dict()})


@websocket_api.websocket_command({
    vol.Required("type"): WS_TYPE_DEVICE_DELETE,
    vol.Required("device_id"): cv.string,
    vol.Optional("entry_id"): str,
})
@websocket_api.async_response
async def handle_device_delete(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Delete a stored device."""

    hub = _get_hub(hass, msg)
    await hub.async_remove_device(msg["device_id"])
    connection.send_result(msg["id"], {})


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_DEVICE_LIST, vol.Optional("entry_id"): str})
@websocket_api.async_response
async def handle_device_list(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Return the list of stored devices."""

    hub = _get_hub(hass, msg)
    devices = [device.to_dict() for device in hub.iter_devices()]
    connection.send_result(msg["id"], {"devices": devices})


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_DEVICE_RELOAD, vol.Optional("entry_id"): str})
@websocket_api.async_response
async def handle_device_reload(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Reload devices from persistent storage."""

    hub = _get_hub(hass, msg)
    await hub.async_reload_devices()
    connection.send_result(msg["id"], {})


@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_DEVICE_SEND,
        vol.Required("device_id"): cv.string,
        vol.Required("action"): cv.string,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_device_send_action(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Trigger a stored signal for a RaspyRFM device."""

    hub = _get_hub(hass, msg)
    try:
        await hub.async_send_device_action(msg["device_id"], msg["action"])
    except ValueError as err:
        raise websocket_api.HomeAssistantWebSocketError(str(err)) from err
    connection.send_result(msg["id"], {})


def _validate_device_payload(device_type: str, signals: Dict[str, str]) -> None:
    """Ensure the provided payload mapping matches the expected schema."""

    required: Dict[str, str] = {}
    optional: Set[str] = set()

    if device_type == "switch":
        required = {"on": "ON payload", "off": "OFF payload"}
    elif device_type == "binary_sensor":
        required = {"trigger": "Trigger payload"}
    elif device_type == "light":
        required = {"on": "ON payload"}
        optional = {"off", "bright", "dim"}
    elif device_type in {"button", "universal"}:
        if not signals:
            raise websocket_api.HomeAssistantWebSocketError(
                "Provide at least one signal mapping"
            )
    else:
        raise websocket_api.HomeAssistantWebSocketError("Unsupported device type")

    missing = [key for key in required if key not in signals or not signals[key]]
    if missing:
        raise websocket_api.HomeAssistantWebSocketError(
            f"Missing {', '.join(missing)} for {device_type}"
        )

    if device_type == "light" and not any(
        signals.get(key) for key in ("off", "dim")
    ):
        raise websocket_api.HomeAssistantWebSocketError(
            "Provide either an OFF or DIM payload for light devices"
        )

    unexpected = [
        key
        for key in signals
        if key not in required and key not in optional and device_type not in {"button", "universal"}
    ]
    if unexpected:
        raise websocket_api.HomeAssistantWebSocketError(
            f"Unexpected signals for {device_type}: {', '.join(unexpected)}"
        )


@websocket_api.websocket_command({vol.Required("type"): WS_TYPE_SIGNAL_MAP_LIST, vol.Optional("entry_id"): str})
@websocket_api.async_response
async def handle_signal_map_list(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Return the stored signal mapping metadata."""

    hub = _get_hub(hass, msg)
    mappings = await hub.async_list_signal_mappings()
    connection.send_result(msg["id"], {"mappings": mappings})


_SIGNAL_MAP_UPDATE_SCHEMA = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
    {
        vol.Required("type"): WS_TYPE_SIGNAL_MAP_UPDATE,
        vol.Required("payload"): cv.string,
        vol.Required("category"): vol.In(MAPPING_CATEGORIES),
        vol.Required("label"): cv.string,
        vol.Optional("linked_devices"): [cv.string],
        vol.Optional("entry_id"): str,
    }
)


@websocket_api.websocket_command(_SIGNAL_MAP_UPDATE_SCHEMA)
@websocket_api.async_response
async def handle_signal_map_update(hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: Dict[str, Any]) -> None:
    """Store mapping information for a payload."""

    hub = _get_hub(hass, msg)
    mapping = await hub.async_set_signal_mapping(
        msg["payload"],
        msg["category"],
        msg["label"],
        msg.get("linked_devices"),
    )
    connection.send_result(msg["id"], {"mapping": mapping.to_dict()})

Panel registration and static assets

Every config entry registers a static path and serves a custom panel that is restricted to administrators. The panel is implemented as a LitElement module that runs entirely inside the Home Assistant frontend.

"""Frontend panel registration for RaspyRFM."""

from __future__ import annotations

from importlib import resources
from typing import Set

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant

from .const import PANEL_ICON, PANEL_TITLE, PANEL_URL_PATH

STATIC_PATH = "/raspyrfm_static"


async def async_register_panel(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Register the RaspyRFM panel."""

    entries: Set[str] = hass.data.setdefault("_raspyrfm_panel_entries", set())
    if entry.entry_id in entries:
        return

    if not entries:
        panel_path = resources.files(__package__).joinpath("frontend")
        hass.http.register_static_path(STATIC_PATH, str(panel_path), cache_headers=False)
        hass.components.frontend.async_register_panel(
            component_name="custom",
            frontend_url_path=PANEL_URL_PATH,
            webcomponent_path=f"{STATIC_PATH}/raspyrfm-panel.js",
            config={"name": PANEL_TITLE, "icon": PANEL_ICON},
            require_admin=True,
        )

    entries.add(entry.entry_id)


async def async_unregister_panel(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Unregister panel for a given entry."""

    entries: Set[str] = hass.data.get("_raspyrfm_panel_entries", set())
    entries.discard(entry.entry_id)

    if entries:
        return

    hass.components.frontend.async_remove_panel(PANEL_URL_PATH)
    hass.http.unregister_static_path(STATIC_PATH)
    hass.data.pop("_raspyrfm_panel_entries", None)

Frontend component

The raspyrfm-panel web component drives the onboarding experience for RaspyRFM users. It exposes learning controls, renders cards summarising captured payloads, fingerprints signals against the Python library to suggest device types, provides a flexible creation form, and lets users map payloads to semantic categories with optional device links. The component relies exclusively on the websocket commands described above, so no additional backend endpoints are required.

const { LitElement, html, css } = window;

const MAX_SIGNAL_HISTORY = 200;

class RaspyRFMPanel extends LitElement {
  static get properties() {
    return {
      hass: {},
      learning: { type: Boolean },
      signals: { state: true },
      devices: { state: true },
      signalMappings: { state: true },
      formType: { state: true },
      formName: { state: true },
      formSignals: { state: true },
      formActions: { state: true },
      formCustomAction: { state: true },
      error: { state: true },
      successMessage: { state: true },
      infoMessage: { state: true },
    };
  }

  static get styles() {
    return css`
      :host {
        display: block;
        min-height: 100%;
        padding: 24px;
        background: linear-gradient(135deg, var(--primary-background-color), rgba(0, 0, 0, 0))
          no-repeat;
        --success-color-fallback: #4caf50;
        --info-color-fallback: #2196f3;
      }

      h2.section-title {
        font-size: 1.4rem;
        font-weight: 600;
        margin: 0 0 12px;
        display: flex;
        align-items: center;
        gap: 8px;
      }

      .layout {
        display: grid;
        gap: 24px;
      }

      .split-columns {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(320px, 1fr));
        gap: 24px;
      }

      .actions {
        display: flex;
        flex-wrap: wrap;
        gap: 12px;
        margin-bottom: 24px;
      }

      ha-card {
        border-radius: 18px;
        box-shadow: var(--ha-card-box-shadow, 0 10px 30px rgba(0, 0, 0, 0.12));
        overflow: hidden;
      }

      ha-card .card-content {
        padding: 20px;
      }

      table {
        width: 100%;
        border-collapse: collapse;
      }

      th,
      td {
        padding: 10px;
        border-bottom: 1px solid var(--divider-color);
        font-size: 0.95rem;
      }

      tbody tr:last-child td {
        border-bottom: none;
      }

      .signal-list {
        max-height: 320px;
        overflow-y: auto;
        display: grid;
        gap: 12px;
      }

      .signal-entry {
        display: flex;
        justify-content: space-between;
        align-items: center;
        padding: 14px;
        border-radius: 12px;
        background: rgba(0, 0, 0, 0.04);
      }

      .meta-block {
        display: flex;
        flex-direction: column;
        gap: 4px;
      }

      .signal-meta {
        font-size: 12px;
        color: var(--secondary-text-color);
      }

      .form-grid {
        display: grid;
        gap: 16px;
      }

      .form-row {
        display: flex;
        gap: 12px;
        align-items: center;
        flex-wrap: wrap;
      }

      .pill {
        padding: 2px 8px;
        border-radius: 12px;
        background-color: var(--accent-color);
        color: var(--text-primary-color);
        font-size: 12px;
      }

      .chip {
        display: inline-flex;
        align-items: center;
        gap: 6px;
        padding: 4px 10px;
        border-radius: 14px;
        background: rgba(0, 0, 0, 0.08);
        font-size: 0.75rem;
        font-weight: 600;
        text-transform: uppercase;
        letter-spacing: 0.04em;
      }

      .chip.primary {
        background: rgba(25, 118, 210, 0.16);
        color: var(--primary-color);
      }

      .signal-actions {
        display: flex;
        flex-wrap: wrap;
        gap: 8px;
      }

      .signal-chips {
        display: flex;
        flex-wrap: wrap;
        gap: 8px;
        margin-top: 6px;
      }

      .signal-chip {
        display: flex;
        flex-direction: column;
        gap: 4px;
        padding: 10px;
        border-radius: 12px;
        background: rgba(0, 0, 0, 0.03);
      }

      .error {
        color: var(--error-color);
        margin-bottom: 12px;
        padding: 12px 16px;
        background: rgba(244, 67, 54, 0.1);
        border-radius: 8px;
        border-left: 4px solid var(--error-color);
      }

      .success {
        color: var(--success-color, var(--success-color-fallback));
        margin-bottom: 12px;
        padding: 12px 16px;
        background: rgba(76, 175, 80, 0.1);
        border-radius: 8px;
        border-left: 4px solid var(--success-color, var(--success-color-fallback));
      }

      .info {
        color: var(--info-color, var(--info-color-fallback));
        margin-bottom: 12px;
        padding: 12px 16px;
        background: rgba(33, 150, 243, 0.1);
        border-radius: 8px;
        border-left: 4px solid var(--info-color, var(--info-color-fallback));
      }

      .required {
        margin-left: 4px;
        color: var(--error-color);
      }

      .mapping-grid {
        display: grid;
        gap: 18px;
      }

      .mapping-item {
        display: grid;
        gap: 12px;
        padding: 16px;
        border-radius: 12px;
        background: rgba(0, 0, 0, 0.04);
      }

      .mapping-actions {
        display: flex;
        gap: 12px;
        flex-wrap: wrap;
      }

      mwc-button.danger,
      mwc-button.danger[unelevated] {
        --mdc-theme-primary: var(--error-color);
      }

      .device-checkboxes {
        display: flex;
        flex-wrap: wrap;
        gap: 8px;
      }

      .map-canvas {
        position: relative;
        border-radius: 16px;
        background: linear-gradient(135deg, rgba(0, 150, 136, 0.12), rgba(30, 136, 229, 0.12));
        padding: 12px;
        overflow: hidden;
      }

      svg.mapping {
        width: 100%;
        min-height: 260px;
      }

      .category-label {
        font-size: 0.8rem;
        font-weight: 600;
        fill: var(--primary-text-color);
      }

      .category-column {
        fill: rgba(255, 255, 255, 0.35);
      }

      .node-label {
        font-size: 0.75rem;
        fill: var(--primary-text-color);
      }

      .node-circle {
        fill: var(--accent-color);
        stroke: rgba(0, 0, 0, 0.2);
        stroke-width: 1;
      }

      @media (max-width: 768px) {
        :host {
          padding: 16px;
        }

        .actions {
          justify-content: stretch;
          flex-direction: column;
        }

        .actions mwc-button {
          width: 100%;
        }

        .signal-entry {
          flex-direction: column;
          align-items: stretch;
          gap: 12px;
        }

        .split-columns {
          grid-template-columns: 1fr;
        }

        h2.section-title {
          font-size: 1.2rem;
        }
      }

      .help-text {
        font-size: 0.85rem;
        color: var(--secondary-text-color);
        margin-top: 8px;
        padding: 8px;
        background: rgba(0, 0, 0, 0.02);
        border-radius: 6px;
      }

      .help-icon {
        cursor: help;
        opacity: 0.6;
        transition: opacity 0.2s;
      }

      .help-icon:hover {
        opacity: 1;
      }

      .button-group {
        display: flex;
        gap: 8px;
        flex-wrap: wrap;
      }

      @media (prefers-reduced-motion: reduce) {
        * {
          animation: none !important;
          transition: none !important;
        }
      }
    `;
  }

  constructor() {
    super();
    this.learning = false;
    this.signals = [];
    this.devices = [];
    this.signalMappings = {};
    this.formType = "switch";
    this.formName = "";
    this.formSignals = {};
    this.formActions = [];
    this._configureActionsForType(this.formType);
    this.formCustomAction = "";
    this.error = null;
    this.successMessage = null;
    this.infoMessage = null;
    this._signalUnsub = null;
    this._learningUnsub = null;
    this._persistedMappings = {};
  }

  connectedCallback() {
    super.connectedCallback();
    this._initialize();
  }

  disconnectedCallback() {
    super.disconnectedCallback();
    if (this._signalUnsub) {
      this._signalUnsub();
      this._signalUnsub = null;
    }
    if (this._learningUnsub) {
      this._learningUnsub();
      this._learningUnsub = null;
    }
  }

  async _initialize() {
    await this._loadState();
    await this._subscribeSignals();
    await this._subscribeLearning();
    await this._refreshDevices();
  }

  async _loadState() {
    const status = await this.hass.callWS({ type: "raspyrfm/learning/status" });
    this.learning = status.active;
    const signals = await this.hass.callWS({ type: "raspyrfm/signals/list" });
    this.signals = this._normaliseSignals(signals.signals || []);
  }

  async _loadMappings() {
    try {
      const response = await this.hass.callWS({ type: "raspyrfm/signals/map/list" });
      const next = {};
      (response.mappings || []).forEach((entry) => {
        next[entry.payload] = entry;
      });
      this.signalMappings = next;
      this._persistedMappings = JSON.parse(JSON.stringify(next));
      if (this.error === "Signal mapping is temporarily unavailable.") {
        this.error = null;
      }
    } catch (err) {
      // Older backends might not expose the mapping API yet – degrade gracefully.
      console.warn("Unable to load RaspyRFM signal mappings", err);
      if (Object.keys(this.signalMappings || {}).length) {
        this.signalMappings = {};
      }
      if (!this.error) {
        this.error = "Signal mapping is temporarily unavailable.";
      }
    }
  }

  async _subscribeSignals() {
    this._signalUnsub = await this.hass.connection.subscribeMessage((message) => {
      if (message.type !== "event") {
        return;
      }
      const signal = message.event;
      this.signals = this._normaliseSignals([...this.signals, signal]);
    }, {
      type: "raspyrfm/signals/subscribe"
    });
  }

  async _subscribeLearning() {
    this._learningUnsub = await this.hass.connection.subscribeMessage((message) => {
      if (message.type !== "event") {
        return;
      }
      this.learning = message.event.active;
    }, {
      type: "raspyrfm/learning/subscribe"
    });
  }

  async _refreshDevices() {
    const response = await this.hass.callWS({ type: "raspyrfm/devices/list" });
    this.devices = response.devices || [];
    await this._loadMappings();
  }

  render() {
    return html`
      <div class="layout">
        <div class="actions">
          <mwc-button raised icon="mdi:play" @click=${this._handleStartLearning} ?disabled=${this.learning}
            >Start learning</mwc-button
          >
          <mwc-button icon="mdi:stop" @click=${this._handleStopLearning} ?disabled=${!this.learning}
            >Stop learning</mwc-button
          >
          <mwc-button icon="mdi:refresh" @click=${this._refreshDevices}>Refresh devices</mwc-button>
          <mwc-button icon="mdi:chart-bubble" @click=${this._loadMappings}>Reload mapping</mwc-button>
        </div>
        ${this.error ? html`<div class="error">⚠️ ${this.error}</div>` : ""}
        ${this.successMessage ? html`<div class="success">✓ ${this.successMessage}</div>` : ""}
        ${this.infoMessage ? html`<div class="info">ℹ️ ${this.infoMessage}</div>` : ""}
        ${!this.learning && !this.signals.length ? html`
          <div class="info">
            <strong>Getting Started:</strong> Click "Start learning" to begin capturing RF signals from your remotes and devices. 
            For Raspberry Pi 4/5 with HAOS, ensure your RaspyRFM gateway is connected and accessible on the network.
          </div>
        ` : ""}
        <div class="split-columns">
          ${this._renderSignals()} ${this._renderForm()}
        </div>
        ${this._renderMappingWorkspace()}
        ${this._renderDevices()}
      </div>
    `;
  }

  _renderSignals() {
    if (!this.signals.length) {
      return html`
        <ha-card header="Captured signals">
          <div class="card-content">No signals received yet. Use the start learning button and trigger your remotes or sensors.</div>
        </ha-card>
      `;
    }
    return html`
      <ha-card header="Captured signals">
        <div class="card-content signal-list">
          ${this.signals.map((signal) => this._renderSignal(signal))}
        </div>
      </ha-card>
    `;
  }

  _renderSignal(signal) {
    const actions = Array.isArray(this.formActions) ? this.formActions : [];
    const classification = signal?.metadata?.classification;
    const classificationActions = Array.isArray(classification?.actions)
      ? classification.actions.map((action) => this._labelForAction(this._normaliseActionKey(action)))
      : [];

    return html`
      <div class="signal-entry">
        <div class="meta-block">
          <div>${signal.payload}</div>
          <div class="signal-meta">${signal.received}</div>
          ${classification
            ? html`
                <div class="signal-chips">
                  <span class="chip primary">${classification.suggested_type}</span>
                  ${classificationActions.map((label) => html`<span class="chip">${label}</span>`) }
                </div>
              `
            : ""}
        </div>
        <div class="signal-actions">
          ${actions.length
            ? actions.map(
                (action) => html`
                  <mwc-button
                    dense
                    outlined
                    @click=${() => this._assignSignal(action.key, signal.payload)}
                    >Set as ${action.label}</mwc-button
                  >
                `,
              )
            : html`<span class="signal-meta">Add an action to the form to assign this payload.</span>`}

Home Assistant integration checklist

To use the integration in your own Home Assistant instance:

  1. Copy the custom_components/raspyrfm directory into your Home Assistant config/custom_components folder.

  2. Restart Home Assistant so the new integration is discovered.

  3. Navigate to Settings → Devices & Services and add the RaspyRFM integration. Provide the host name or IP address of the RaspyRFM gateway and the UDP port it listens on (49880 by default).

  4. Open the RaspyRFM panel from the sidebar to start a learning session and capture payloads. Use the Create Home Assistant device card to turn them into entities, then link payloads to devices in the mapping workspace.

  5. Optional: update the labels, categories, and device associations for each payload in the mapping workspace. The data is persisted through .storage/raspyrfm_signal_map and is restored after Home Assistant restarts.