Home Assistant Components
This section documents the backend pieces that power the RaspyRFM Home Assistant integration. Each subsection links to the relevant source files and explains how the pieces collaborate to monitor radio traffic, learn payloads, and expose entities inside Home Assistant.
Configuration flow and setup
RaspyRFM is installed as a config entry. The integration registers a
config flow that resolves the gateway host name and persists the chosen
UDP port. Once an entry is created, async_setup_entry instantiates
the hub, forwards platform setups, and makes the management panel,
websocket commands, and the raspyrfm.send_action service available so
payloads can be replayed from automations.
"""Config flow for the RaspyRFM integration."""
from __future__ import annotations
import socket
from typing import Any, Dict
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.core import HomeAssistant
from .const import CONF_HOST, CONF_PORT, DEFAULT_PORT, DOMAIN
async def _validate_input(hass: HomeAssistant, data: Dict[str, Any]) -> Dict[str, Any]:
    """Check that the configured host resolves and describe the new entry.

    The blocking DNS lookup is handed to the executor so the event loop is
    not stalled.  A failed lookup surfaces as ``OSError`` (or a subclass)
    raised by ``socket.gethostbyname`` and is left for the caller to handle.

    Returns a dict with the entry ``title`` plus the ``host`` and ``port``
    that were supplied.
    """
    hostname = data[CONF_HOST]
    udp_port = data[CONF_PORT]

    # Only success or failure of the lookup matters; the address is discarded.
    await hass.async_add_executor_job(socket.gethostbyname, hostname)

    summary: Dict[str, Any] = {"title": f"RaspyRFM ({hostname})"}
    summary["host"] = hostname
    summary["port"] = udp_port
    return summary
class RaspyRFMConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
    """Handle a config flow for RaspyRFM."""

    VERSION = 1

    async def async_step_user(self, user_input: Dict[str, Any] | None = None):
        """Ask for host and port, validate them and create the config entry.

        Without ``user_input`` the form is shown.  With input, the host is
        resolved; an ``OSError`` from the lookup is reported on the form as
        ``cannot_connect``.  On success the host becomes the unique id, so a
        second entry for the same host aborts the flow.
        """
        errors: Dict[str, str] = {}
        if user_input is not None:
            try:
                info = await _validate_input(self.hass, user_input)
            except OSError:
                errors["base"] = "cannot_connect"
            else:
                await self.async_set_unique_id(user_input[CONF_HOST])
                self._abort_if_unique_id_configured()
                return self.async_create_entry(title=info["title"], data=user_input)

        schema = vol.Schema({
            vol.Required(CONF_HOST): str,
            vol.Optional(CONF_PORT, default=DEFAULT_PORT): int,
        })
        return self.async_show_form(step_id="user", data_schema=schema, errors=errors)

    async def async_step_import(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """Handle YAML import by reusing the interactive user step."""
        return await self.async_step_user(user_input)

    @staticmethod
    def async_get_options_flow(entry: config_entries.ConfigEntry) -> RaspyRFMOptionsFlow:
        """Return the options flow handler for ``entry``.

        Home Assistant invokes this hook on the flow *class* with only the
        config entry and uses the return value directly, so it has to be a
        synchronous static method.  An async instance method would fail with
        a missing-argument error before any coroutine was produced.
        """
        return RaspyRFMOptionsFlow(entry)
class RaspyRFMOptionsFlow(config_entries.OptionsFlow):
    """Options flow that edits the host and port stored on an existing entry."""

    def __init__(self, entry: config_entries.ConfigEntry) -> None:
        """Remember the config entry whose connection data is being edited."""
        self._entry = entry

    async def async_step_init(self, user_input: Dict[str, Any] | None = None):
        """Show the edit form, or write the submitted host/port to the entry.

        The submitted values replace the entry's ``data``; the options
        payload that is created stays empty.
        """
        if user_input is None:
            current = self._entry.data
            form = vol.Schema({
                vol.Required(CONF_HOST, default=current.get(CONF_HOST)): str,
                vol.Required(CONF_PORT, default=current.get(CONF_PORT, DEFAULT_PORT)): int,
            })
            no_errors: Dict[str, str] = {}
            return self.async_show_form(step_id="init", data_schema=form, errors=no_errors)

        updated = {key: user_input[key] for key in (CONF_HOST, CONF_PORT)}
        self.hass.config_entries.async_update_entry(self._entry, data=updated)
        return self.async_create_entry(title="", data={})
"""Home Assistant integration for the RaspyRFM gateway."""
from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from .const import (
ATTR_ACTION,
ATTR_DEVICE_ID,
DATA_SERVICE_REGISTERED,
DOMAIN,
PLATFORMS,
SERVICE_SEND_ACTION,
)
from .hub import RaspyRFMHub
from .panel import async_register_panel, async_unregister_panel
from .websocket import async_register_websocket_handlers
_LOGGER = logging.getLogger(__name__)
# Validation schema for a send-action call: both the device id and the action
# name are required and validated as strings.
SEND_ACTION_SCHEMA = vol.Schema({
vol.Required(ATTR_DEVICE_ID): cv.string,
vol.Required(ATTR_ACTION): cv.string,
})
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up RaspyRFM from a config entry."""
hub = RaspyRFMHub(hass, entry)
try:
await hub.async_setup()
except OSError as err:
raise ConfigEntryNotReady("Unable to initialise RaspyRFM hub") from err
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = hub
if not hass.data[DOMAIN].get(DATA_SERVICE_REGISTERED):
async def _async_handle_send_action(call) -> None:
device_id: str = call.data[ATTR_DEVICE_ID]
action: str = call.data[ATTR_ACTION]
for key, candidate in hass.data[DOMAIN].items():
if key.startswith("_"):
continue
if candidate.storage.get_device(device_id):
Gateway and hub orchestration
The RaspyRFMGateway wraps the
UDP socket used by the hardware bridge. The hub owns a gateway instance
and coordinates persistent storage, signal learning, and entity updates
via Home Assistant dispatcher signals.
"""Helpers for interacting with the RaspyRFM UDP gateway."""
from __future__ import annotations
import logging
import socket
from homeassistant.core import HomeAssistant
_LOGGER = logging.getLogger(__name__)
class RaspyRFMGateway:
    """Small helper that pushes raw payloads to a RaspyRFM gateway over UDP."""

    def __init__(self, hass: HomeAssistant, host: str, port: int) -> None:
        """Store the Home Assistant instance and the gateway address."""
        self._hass = hass
        self._host, self._port = host, port

    @property
    def host(self) -> str:
        """Host name or address currently targeted."""
        return self._host

    @property
    def port(self) -> int:
        """UDP port currently targeted."""
        return self._port

    async def async_update(self, host: str, port: int) -> None:
        """Replace the stored host and port."""
        self._host, self._port = host, port

    def _send_datagram(self, payload: str) -> None:
        """Blocking send of one UTF-8 encoded datagram; runs in the executor."""
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(2)
            sock.sendto(payload.encode("utf-8"), (self._host, self._port))

    async def async_send_raw(self, payload: str) -> None:
        """Send ``payload`` to the gateway.

        Raises ``OSError`` when no host is configured; socket errors from the
        send itself propagate unchanged.
        """
        if not self._host:
            raise OSError("Gateway host not configured")
        await self._hass.async_add_executor_job(self._send_datagram, payload)

    async def async_ping(self) -> None:
        """Send the literal ``PING`` payload to the gateway."""
        await self.async_send_raw("PING")
"""Core hub object for the RaspyRFM integration."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from datetime import datetime
import logging
import socket
import uuid
from typing import Any, Dict, Iterable, List, Optional
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.dispatcher import async_dispatcher_send
from .const import (
CONF_HOST,
CONF_PORT,
DEFAULT_PORT,
SIGNAL_DEVICE_REGISTRY_UPDATED,
SIGNAL_DEVICE_REMOVED,
SIGNAL_LEARNING_STATE,
SIGNAL_SIGNAL_RECEIVED,
)
from .storage import (
RaspyRFMDeviceEntry,
RaspyRFMDeviceStorage,
RaspyRFMSignalMapStorage,
RaspyRFMSignalMapping,
)
from .gateway import RaspyRFMGateway
from .learn import LearnManager, LearnedSignal
_LOGGER = logging.getLogger(__name__)
@dataclass(slots=True)
class ActiveSignal:
"""Representation of a signal currently known to the hub."""
# The learned signal itself.
signal: LearnedSignal
# Timestamp recorded when the hub stored the signal.
received_at: datetime
# Address tuple (host, port) the datagram was received from.
source: tuple[str, int]
class RaspyRFMHub:
"""Bridge between Home Assistant and a RaspyRFM gateway."""
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
self._hass = hass
self._entry = entry
self._gateway = RaspyRFMGateway(
hass,
entry.data.get(CONF_HOST, ""),
entry.data.get(CONF_PORT, DEFAULT_PORT),
)
self._storage = RaspyRFMDeviceStorage(hass)
self._map_storage = RaspyRFMSignalMapStorage(hass)
self._learn_manager = LearnManager(hass, self)
self._active_signals: Dict[str, ActiveSignal] = {}
self._signals_lock = asyncio.Lock()
@property
def gateway(self) -> RaspyRFMGateway:
"""Return the gateway helper object."""
return self._gateway
@property
def storage(self) -> RaspyRFMDeviceStorage:
"""Return the persistent storage helper."""
return self._storage
@property
def learn_manager(self) -> LearnManager:
"""Return the signal learning manager."""
return self._learn_manager
async def async_setup(self) -> None:
"""Initialise hub resources."""
await self._storage.async_load()
await self._map_storage.async_load()
async def async_unload(self) -> None:
"""Unload hub resources."""
await self._learn_manager.async_stop()
await self._storage.async_unload()
await self._map_storage.async_unload()
async def async_update_entry(self, entry: ConfigEntry) -> None:
"""Handle entry updates."""
self._entry = entry
await self._gateway.async_update( # type: ignore[no-untyped-call]
entry.data.get(CONF_HOST, ""), entry.data.get(CONF_PORT, DEFAULT_PORT)
)
async def async_start_learning(self) -> None:
"""Start a learning session."""
async with self._signals_lock:
self._active_signals.clear()
await self._learn_manager.async_start()
async def async_stop_learning(self) -> None:
"""Stop an active learning session."""
await self._learn_manager.async_stop()
def iter_devices(self) -> Iterable[RaspyRFMDeviceEntry]:
"""Return all configured devices."""
return self._storage.iter_devices()
def iter_signal_mappings(self) -> Iterable[RaspyRFMSignalMapping]:
"""Return all stored signal mappings."""
return self._map_storage.iter_mappings()
def get_device(self, device_id: str) -> Optional[RaspyRFMDeviceEntry]:
"""Return a device by id."""
return self._storage.get_device(device_id)
async def async_create_device(
self,
name: str,
device_type: str,
signals: Dict[str, str],
metadata: Optional[Dict[str, Any]] = None,
) -> RaspyRFMDeviceEntry:
"""Create a new device entry from learned signals."""
device = RaspyRFMDeviceEntry(
device_id=str(uuid.uuid4()),
name=name,
device_type=device_type,
signals=signals,
metadata=metadata or {},
)
await self._storage.async_add_or_update(device)
async_dispatcher_send(self._hass, SIGNAL_DEVICE_REGISTRY_UPDATED, device.device_id)
return device
async def async_remove_device(self, device_id: str) -> None:
"""Remove a device entry."""
await self._storage.async_remove(device_id)
async_dispatcher_send(self._hass, SIGNAL_DEVICE_REMOVED, device_id)
async def async_set_signal_mapping(
self,
payload: str,
category: str,
label: str,
linked_devices: Optional[List[str]] = None,
) -> RaspyRFMSignalMapping:
"""Store metadata describing a learned payload."""
mapping = RaspyRFMSignalMapping(
payload=payload,
category=category,
label=label or payload,
linked_devices=list(linked_devices or []),
)
await self._map_storage.async_set(mapping)
return mapping
async def async_remove_signal_mapping(self, payload: str) -> None:
"""Delete mapping metadata for a payload."""
await self._map_storage.async_remove(payload)
async def async_list_signal_mappings(self) -> List[Dict[str, Any]]:
"""Return a serialisable mapping overview."""
return [mapping.to_dict() for mapping in self._map_storage.iter_mappings()]
async def async_handle_learned_signal(self, signal: LearnedSignal, addr: tuple[str, int]) -> None:
"""Handle a signal received during a learning session."""
async with self._signals_lock:
self._active_signals[signal.uid] = ActiveSignal(
signal=signal, received_at=datetime.utcnow(), source=addr
)
async_dispatcher_send(
self._hass,
SIGNAL_SIGNAL_RECEIVED,
signal.to_dict(),
)
await self._maybe_match_devices(signal)
async def _maybe_match_devices(self, signal: LearnedSignal) -> None:
"""Match a learned signal with configured devices and fire updates."""
matched: List[str] = []
for device in self._storage.iter_devices():
if device.matches_signal(signal.payload):
matched.append(device.device_id)
if not matched:
return
for device_id in matched:
async_dispatcher_send(self._hass, SIGNAL_DEVICE_REGISTRY_UPDATED, device_id)
async def async_list_active_signals(self) -> List[Dict[str, Any]]:
"""Return a snapshot of all active signals."""
async with self._signals_lock:
return [signal.signal.to_dict() for signal in self._active_signals.values()]
async def async_reload_devices(self) -> None:
"""Reload device information from disk."""
await self._storage.async_load()
async_dispatcher_send(self._hass, SIGNAL_DEVICE_REGISTRY_UPDATED, None)
async def async_record_learning_state(self, active: bool) -> None:
"""Announce a change of the learning state."""
async_dispatcher_send(self._hass, SIGNAL_LEARNING_STATE, {"active": active})
async def async_send_raw(self, payload: str) -> None:
"""Send a raw UDP payload to the gateway."""
await self._gateway.async_send_raw(payload)
async def async_send_device_action(self, device_id: str, action: str) -> None:
"""Send a stored signal for the given device."""
device = self._storage.get_device(device_id)
if device is None:
raise ValueError(f"Unknown device: {device_id}")
Signal learning pipeline
The LearnManager binds a UDP
listener to DEFAULT_LISTEN_PORT (49881) and streams payloads into the
hub. Incoming packets are normalised into LearnedSignal dataclasses
and broadcast over dispatcher events so that the UI and entity platforms
receive live updates. Each payload is fingerprinted against the
raspyrfm-client device library using the classifier helper, which
allows the UI to suggest an entity type even when users have not labelled
the signal yet.
"""Signal learning helper for the RaspyRFM integration."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from homeassistant.core import HomeAssistant
from .classifier import classify_payload
from .const import DEFAULT_LISTEN_PORT
if TYPE_CHECKING:
from .hub import RaspyRFMHub
_LOGGER = logging.getLogger(__name__)
@dataclass(slots=True)
class LearnedSignal:
"""Representation of a learned radio signal."""
uid: str
payload: str
received: datetime
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Return a serialisable representation."""
return {
"uid": self.uid,
"payload": self.payload,
"received": self.received.isoformat(),
"metadata": self.metadata,
}
class RaspyRFMLearnProtocol(asyncio.DatagramProtocol):
    """Asyncio datagram protocol that hands received payloads to the manager."""

    def __init__(self, manager: LearnManager) -> None:
        """Remember the manager that processes decoded payloads."""
        self._manager = manager
        # The event loop only keeps weak references to tasks.  Holding them
        # here stops an in-flight processing task from being garbage
        # collected before it finishes.
        self._tasks: set[asyncio.Task[None]] = set()

    def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
        """Decode a datagram and schedule its processing.

        Bytes that are not valid UTF-8 are dropped and surrounding whitespace
        is stripped; datagrams that end up empty are ignored.
        """
        payload = data.decode("utf-8", errors="ignore").strip()
        if not payload:
            return
        task = asyncio.create_task(self._manager.async_process_datagram(payload, addr))
        self._tasks.add(task)
        # Drop the reference once the task has completed.
        task.add_done_callback(self._tasks.discard)
class LearnManager:
    """Run learning sessions: listen on UDP and collect the captured signals."""

    def __init__(self, hass: HomeAssistant, hub: RaspyRFMHub) -> None:
        """Set up an idle manager bound to ``hub``."""
        self._hass = hass
        self._hub = hub
        self._listen_port = DEFAULT_LISTEN_PORT
        self._transport: Optional[asyncio.transports.DatagramTransport] = None
        self._active = False
        self._lock = asyncio.Lock()
        self._signals: List[LearnedSignal] = []

    @property
    def is_active(self) -> bool:
        """True while a learning session is running."""
        return self._active

    async def async_start(self) -> None:
        """Open the UDP listener and ask the gateway to start receiving.

        Does nothing when a session is already active.  A failure to send
        ``RXSTART`` is logged at debug level only; the session still counts
        as active and the state change is announced through the hub.
        """
        if self._active:
            return

        endpoint = await asyncio.get_running_loop().create_datagram_endpoint(
            lambda: RaspyRFMLearnProtocol(self),
            local_addr=("0.0.0.0", self._listen_port),
        )
        self._transport = endpoint[0]
        self._signals.clear()
        self._active = True

        try:
            await self._hub.async_send_raw("RXSTART")
        except OSError:
            _LOGGER.debug("Unable to send RXSTART command to gateway")
        await self._hub.async_record_learning_state(True)

    async def async_stop(self) -> None:
        """Close the listener and ask the gateway to stop receiving.

        Does nothing when no session is active.  A failure to send
        ``RXSTOP`` is logged at debug level only.
        """
        if not self._active:
            return

        transport = self._transport
        if transport is not None:
            transport.close()
            self._transport = None
        self._active = False

        try:
            await self._hub.async_send_raw("RXSTOP")
        except OSError:
            _LOGGER.debug("Unable to send RXSTOP command to gateway")
        await self._hub.async_record_learning_state(False)

    async def async_process_datagram(self, payload: str, addr: Tuple[str, int]) -> None:
        """Wrap a payload in a ``LearnedSignal``, store it and pass it to the hub.

        The uid is ``sig_<n>`` where ``n`` is one more than the number of
        signals captured so far.  When the classifier returns a result it is
        attached to the metadata under ``classification``.
        """
        if not payload:
            return

        sender_host, sender_port = addr[0], addr[1]
        signal = LearnedSignal(
            uid=f"sig_{len(self._signals)+1}",
            payload=payload,
            received=datetime.utcnow(),
            metadata={"source": sender_host, "port": sender_port},
        )

        classification = classify_payload(payload)
        if classification:
            signal.metadata["classification"] = classification.to_dict()

        async with self._lock:
            self._signals.append(signal)
        await self._hub.async_handle_learned_signal(signal, addr)

    async def async_list_signals(self) -> List[Dict[str, Any]]:
        """Return the captured signals converted with ``to_dict``."""
        listing: List[Dict[str, Any]] = []
        async with self._lock:
            for captured in self._signals:
                listing.append(captured.to_dict())
        return listing
async def async_clear_signals(self) -> None:
"""Signal classification helpers for RaspyRFM payloads."""
from __future__ import annotations
from dataclasses import dataclass
from functools import lru_cache
import logging
from typing import Dict, Iterable, List, Optional, Set, Tuple
from raspyrfm_client import RaspyRFMClient
from raspyrfm_client.device_implementations.controlunit.actions import Action
LOGGER = logging.getLogger(__name__)
# Frozen, so instances are hashable and can be used as dictionary keys in the
# fingerprint lookup table.
@dataclass(frozen=True)
class SignalFingerprint:
"""Fingerprint describing a payload independent of the channel config."""
# Header values taken from the payload or from the device library.
repetitions: int
gap: int
timebase: int
# Number of pulse pairs.
pulse_count: int
# Smallest and largest pulse value seen.
min_pulse: int
max_pulse: int
@dataclass(slots=True)
class SignalClassification:
"""Classification result for a payload."""
actions: Set[Action]
suggested_type: str
def to_dict(self) -> Dict[str, object]:
"""Return a serialisable representation."""
return {
"actions": sorted(action.name.lower() for action in self.actions),
"suggested_type": self.suggested_type,
}
def classify_payload(payload: str) -> Optional[SignalClassification]:
    """Classify a payload string on a best-effort basis.

    Returns ``None`` when the payload cannot be fingerprinted or when the
    fingerprint has no actions in the lookup table.
    """
    if (fingerprint := _fingerprint_from_payload(payload)) is None:
        return None

    candidates = _fingerprint_action_map().get(fingerprint)
    if candidates:
        device_type = _actions_to_device_type(candidates)
        # Copy the set so the cached lookup table is never handed out directly.
        return SignalClassification(actions=set(candidates), suggested_type=device_type)
    return None
def _actions_to_device_type(actions: Iterable[Action]) -> str:
    """Map a collection of supported actions to a RaspyRFM device type.

    Checks are ordered: any dimming action means ``light``; ON together with
    OFF means ``switch``; ON alone, or any pairing action, means ``button``;
    everything else is ``universal``.
    """
    supported = frozenset(actions)

    if Action.BRIGHT in supported or Action.DIMM in supported:
        return "light"
    if {Action.ON, Action.OFF} <= supported:
        return "switch"
    # Pairing remotes act as buttons in the Home Assistant context.
    is_button = (
        supported == {Action.ON}
        or Action.PAIR in supported
        or Action.UNPAIR in supported
    )
    return "button" if is_button else "universal"
def _fingerprint_from_payload(payload: str) -> Optional[SignalFingerprint]:
"""Parse a payload into a fingerprint for comparison with the library table."""
if not payload:
return None
body = payload.strip()
if ":" in body:
# Strip transport prefix (e.g. ``TXP:`` or ``RXP:``)
try:
_, body = body.split(":", 1)
except ValueError:
return None
tokens = [token for token in body.split(",") if token]
if len(tokens) < 6:
return None
try:
repetitions = int(tokens[2])
gap = int(tokens[3])
timebase = int(tokens[4])
pair_count = int(tokens[5])
except ValueError:
return None
pulses: List[int] = []
for token in tokens[6:6 + pair_count * 2]:
try:
pulses.append(int(token))
except ValueError:
return None
if len(pulses) < 2:
return None
min_pulse = min(pulses)
max_pulse = max(pulses)
return SignalFingerprint(
repetitions=repetitions,
gap=gap,
timebase=timebase,
pulse_count=int(len(pulses) / 2),
min_pulse=min_pulse,
max_pulse=max_pulse,
)
@lru_cache(maxsize=1)
def _fingerprint_action_map() -> Dict[SignalFingerprint, Set[Action]]:
    """Build the fingerprint-to-actions lookup table from the device library.

    Every supported control unit is given a default channel configuration and
    each of its actions is fingerprinted.  Units or actions that fail are
    logged at debug level and skipped.  The result is cached, so the table is
    built once per process.
    """
    gateway_gap = 5600  # The RaspyRFM gateways always inject this constant gap.
    library = RaspyRFMClient()
    table: Dict[SignalFingerprint, Set[Action]] = {}

    for manufacturer in library.get_supported_controlunit_manufacturers():
        for model in library.get_supported_controlunit_models(manufacturer):
            unit = library.get_controlunit(manufacturer, model)
            try:
                defaults = {}
                for key, regex in unit.get_channel_config_args().items():
                    defaults[key] = _default_value(regex)
                unit.set_channel_config(**defaults)
            except Exception as err:  # pragma: no cover - defensive fallback
                LOGGER.debug(
                    "Unable to build default configuration for %s %s: %s",
                    manufacturer,
                    model,
                    err,
                )
                continue

            for action in unit.get_supported_actions():
                try:
                    pulses, repetitions, timebase = unit.get_pulse_data(action)
                except Exception as err:  # pragma: no cover - defensive fallback
                    LOGGER.debug(
                        "Unable to build fingerprint for %s %s action %s: %s",
                        manufacturer,
                        model,
                        action,
                        err,
                    )
                    continue

                shortest, longest = _pulse_bounds(pulses)
                signature = SignalFingerprint(
                    repetitions=repetitions,
                    gap=gateway_gap,
                    timebase=timebase,
                    pulse_count=len(pulses),
                    min_pulse=shortest,
                    max_pulse=longest,
                )
                table.setdefault(signature, set()).add(action)

    return table
def _pulse_bounds(pulses: Iterable[Tuple[int, int]]) -> Tuple[int, int]:
"""Return the minimum and maximum pulse length from a sequence."""
minima: List[int] = []
maxima: List[int] = []
for first, second in pulses:
minima.append(min(first, second))
maxima.append(max(first, second))
return (min(minima), max(maxima))
def _default_value(regex: str) -> str:
"""Return a deterministic default value that satisfies a configuration regex."""
pattern = regex.strip().lstrip("^").rstrip("$")
if pattern == "[01]":
return "0"
if pattern == "[1-4]":
return "1"
if pattern == "[1-3]":
return "1"
if pattern == "[A-D]":
return "A"
if pattern == "[A-C]":
return "A"
if pattern == "[A-E]":
return "A"
if pattern == "[A-P]":
return "A"
if pattern == "[01fF]":
return "0"
if pattern == "[01]{12}":
return "0" * 12
if pattern == "[01]{26}":
return "0" * 26
if pattern == "[0-9A-F]{5}":
return "00000"
if pattern == "([1-9]|0[1-9]|1[0-6])":
return "01"
# Fall back to the first character when the pattern is a simple character set.
if pattern.startswith("[") and pattern.endswith("]"):
characters = pattern[1:-1]
if characters and characters[0] != "^":
Persistent storage and device registry
Two storage helpers manage device definitions and optional metadata about
captured payloads. The RaspyRFMDeviceStorage class keeps track of
all entity types created from learned signals, including switches, lights,
button groups, and universal listeners, while RaspyRFMSignalMapStorage
stores labels, semantic categories, and links between payloads and devices.
"""Persistent storage helpers for RaspyRFM devices."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
from .const import (
MAPPING_CATEGORIES,
MAPPING_STORAGE_KEY,
MAPPING_STORAGE_VERSION,
STORAGE_KEY,
STORAGE_VERSION,
)
@dataclass(slots=True)
class RaspyRFMDeviceEntry:
"""Representation of a configured device."""
device_id: str
name: str
device_type: str
signals: Dict[str, str]
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Return a dictionary representation."""
return {
"device_id": self.device_id,
"name": self.name,
"device_type": self.device_type,
"signals": self.signals,
"metadata": self.metadata,
}
def matches_signal(self, payload: str) -> bool:
"""Return True if the payload matches this device."""
return payload in self.signals.values()
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "RaspyRFMDeviceEntry":
"""Create an instance from stored data."""
return cls(
device_id=data["device_id"],
name=data["name"],
device_type=data.get("device_type", "switch"),
signals=data.get("signals", {}),
metadata=data.get("metadata", {}),
)
@dataclass(slots=True)
class RaspyRFMSignalMapping:
    """Label, category and device links recorded for one learned payload."""

    payload: str
    category: str
    label: str
    linked_devices: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Reject categories that are not listed in ``MAPPING_CATEGORIES``."""
        if self.category in MAPPING_CATEGORIES:
            return
        raise ValueError(f"Unknown mapping category: {self.category}")

    def to_dict(self) -> Dict[str, Any]:
        """Return the mapping as a plain dict."""
        return dict(
            payload=self.payload,
            category=self.category,
            label=self.label,
            linked_devices=self.linked_devices,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "RaspyRFMSignalMapping":
        """Build a mapping from stored data.

        ``payload`` is required.  A missing category defaults to ``"other"``,
        a missing label falls back to the payload, and the linked device ids
        are copied into a new list.
        """
        return cls(
            data["payload"],
            data.get("category", "other"),
            data.get("label", data.get("payload", "")),
            list(data.get("linked_devices", [])),
        )
class RaspyRFMDeviceStorage:
    """Keep device entries in memory and persist them through a ``Store``."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Create the backing store; no data is read yet."""
        self._hass = hass
        self._store: Store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
        self._devices: Dict[str, RaspyRFMDeviceEntry] = {}

    def _serialise(self) -> Dict[str, Any]:
        """Build the document that is written to the store."""
        return {"devices": [device.to_dict() for device in self._devices.values()]}

    async def async_load(self) -> None:
        """Replace the in-memory devices with the stored ones.

        An empty or missing store leaves the registry empty.
        """
        raw = await self._store.async_load()
        stored_items = raw.get("devices", []) if raw else []
        loaded: List[RaspyRFMDeviceEntry] = [
            RaspyRFMDeviceEntry.from_dict(item) for item in stored_items
        ]
        self._devices = {entry.device_id: entry for entry in loaded}

    async def async_unload(self) -> None:
        """Write the current devices to the store."""
        await self._store.async_save(self._serialise())

    def iter_devices(self) -> Iterable[RaspyRFMDeviceEntry]:
        """Return a list snapshot of all devices."""
        return [*self._devices.values()]

    def iter_devices_by_type(self, device_type: str) -> Iterable[RaspyRFMDeviceEntry]:
        """Return a list of the devices whose ``device_type`` matches."""
        selected: List[RaspyRFMDeviceEntry] = []
        for candidate in self._devices.values():
            if candidate.device_type == device_type:
                selected.append(candidate)
        return selected

    def get_device(self, device_id: str) -> Optional[RaspyRFMDeviceEntry]:
        """Return the device stored under ``device_id``, or ``None``."""
        return self._devices.get(device_id)

    async def async_add_or_update(self, device: RaspyRFMDeviceEntry) -> None:
        """Insert or replace a device, then save."""
        self._devices[device.device_id] = device
        await self._store.async_save(self._serialise())

    async def async_remove(self, device_id: str) -> None:
        """Drop a device if present, then save in either case."""
        self._devices.pop(device_id, None)
        await self._store.async_save(self._serialise())
class RaspyRFMSignalMapStorage:
    """Keep signal mappings in memory and persist them through a ``Store``."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Create the backing store; no data is read yet."""
        self._store = Store(hass, MAPPING_STORAGE_VERSION, MAPPING_STORAGE_KEY)
        self._mappings: Dict[str, RaspyRFMSignalMapping] = {}

    def _serialise(self) -> Dict[str, Any]:
        """Build the document that is written to the store."""
        return {"mappings": [entry.to_dict() for entry in self._mappings.values()]}

    async def async_load(self) -> None:
        """Replace the in-memory mappings with the stored ones.

        An empty or missing store leaves the mapping table empty.
        """
        raw = await self._store.async_load()
        stored_items = raw.get("mappings", []) if raw else []
        loaded = [RaspyRFMSignalMapping.from_dict(item) for item in stored_items]
        self._mappings = {mapping.payload: mapping for mapping in loaded}

    async def async_unload(self) -> None:
        """Write the current mappings to the store."""
        await self._store.async_save(self._serialise())

    def iter_mappings(self) -> Iterable[RaspyRFMSignalMapping]:
        """Return a list snapshot of all mappings."""
        return [*self._mappings.values()]

    def get_mapping(self, payload: str) -> Optional[RaspyRFMSignalMapping]:
        """Return the mapping stored for ``payload``, or ``None``."""
        return self._mappings.get(payload)

    async def async_set(self, mapping: RaspyRFMSignalMapping) -> None:
        """Insert or replace a mapping, then save."""
        self._mappings[mapping.payload] = mapping
        await self._store.async_save(self._serialise())

    async def async_remove(self, payload: str) -> None:
        """Drop a mapping if present, then save in either case."""
        self._mappings.pop(payload, None)
        await self._store.async_save(self._serialise())
Entity platforms
All entities share a base class that listens for dispatcher updates when devices change. The entity platforms iterate over stored devices, create entities on demand, and react to live signal messages from the learning pipeline. Besides switches and binary sensors, the integration now exposes a light platform for dimmable actuators, a button platform that creates one entity per stored action, and a universal sensor that records and replays raw payloads when no fingerprint matches.
"""Base entity classes for the RaspyRFM integration."""
from __future__ import annotations
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry
class RaspyRFMEntity(Entity):
    """Common representation of a RaspyRFM entity."""

    _attr_should_poll = False

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        """Bind the entity to its hub and to the stored device it represents."""
        self._hub = hub
        self._device = device
        self._unsub_dispatcher = None

    async def async_added_to_hass(self) -> None:
        """Subscribe to device registry updates once the entity is added."""
        # Imported here so the method does not depend on the module's
        # import list.
        from homeassistant.core import callback

        await super().async_added_to_hass()

        # Without ``@callback`` the dispatcher treats a plain function as
        # blocking work and runs it in the executor, where calling
        # ``async_write_ha_state`` is not allowed.
        @callback
        def _handle_update(device_id: str | None) -> None:
            # ``None`` means "everything may have changed".
            if device_id is None or device_id == self._device.device_id:
                new_device = self._hub.get_device(self._device.device_id)
                if new_device:
                    self._device = new_device
                self.async_write_ha_state()

        self._unsub_dispatcher = async_dispatcher_connect(
            self.hass, SIGNAL_DEVICE_REGISTRY_UPDATED, _handle_update
        )

    async def async_will_remove_from_hass(self) -> None:
        """Drop the dispatcher subscription before the entity is removed."""
        if self._unsub_dispatcher is not None:
            self._unsub_dispatcher()
            self._unsub_dispatcher = None
        await super().async_will_remove_from_hass()

    @property
    def device_info(self) -> dict:
        """Describe the device, identified by the gateway host."""
        return {
            "identifiers": {(DOMAIN, self._hub.gateway.host)},
            "manufacturer": "Seegel Systeme",
            "name": "RaspyRFM",
        }

    @property
    def unique_id(self) -> str:
        """Use the stored device id as the entity's unique id."""
        return self._device.device_id

    @property
    def name(self) -> str:
        """Use the stored device name as the entity name."""
        return self._device.name
"""Switch platform for RaspyRFM."""
from __future__ import annotations
import logging
from typing import Any, Dict
from homeassistant.components.switch import SwitchEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, entry, async_add_entities):
    """Create switch entities for stored devices and follow registry updates.

    Entities are created once per device id.  A registry update naming an
    unknown device, or carrying ``None``, triggers another scan of the
    stored switch devices.
    """
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    known: Dict[str, RaspyRFMSwitch] = {}

    @callback
    def _ensure_entities() -> None:
        created = []
        for device in hub.storage.iter_devices_by_type("switch"):
            if device.device_id not in known:
                switch = RaspyRFMSwitch(hub, device)
                known[device.device_id] = switch
                created.append(switch)
        if created:
            async_add_entities(created)

    @callback
    def handle_device_update(device_id: str | None) -> None:
        if device_id is not None and device_id in known:
            return
        _ensure_entities()

    _ensure_entities()
    unsubscribe = async_dispatcher_connect(
        hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update
    )
    entry.async_on_unload(unsubscribe)
class RaspyRFMSwitch(RaspyRFMEntity, SwitchEntity):
    """Switch entity backed by learned ``on`` and ``off`` payloads."""

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        """Start in the off state with no signal subscription."""
        super().__init__(hub, device)
        self._signal_unsub = None
        self._attr_is_on = False

    async def async_added_to_hass(self) -> None:
        """Track received signals so the state follows the stored payloads."""
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            stored = self._device.signals
            if payload == stored.get("on"):
                new_state = True
            elif payload == stored.get("off"):
                new_state = False
            else:
                return
            self._attr_is_on = new_state
            self.async_write_ha_state()

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        """Drop the signal subscription before the entity is removed."""
        unsubscribe = self._signal_unsub
        if unsubscribe is not None:
            unsubscribe()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    async def async_turn_on(self, **kwargs: Any) -> None:
        """Send the stored ON payload; raise ``ValueError`` if none is stored."""
        on_payload = self._device.signals.get("on")
        if on_payload is None:
            raise ValueError("No ON signal stored for this device")
        await self._hub.async_send_raw(on_payload)
        self._attr_is_on = True
        self.async_write_ha_state()

    async def async_turn_off(self, **kwargs: Any) -> None:
        """Send the stored OFF payload; log a warning and return if none is stored."""
        off_payload = self._device.signals.get("off")
        if off_payload is not None:
            await self._hub.async_send_raw(off_payload)
            self._attr_is_on = False
            self.async_write_ha_state()
            return
        _LOGGER.warning(
            "Device %s has no OFF signal stored; ignoring turn_off request",
            self._device.device_id,
        )
"""Binary sensor platform for RaspyRFM."""
from __future__ import annotations
import asyncio
from typing import Any, Dict
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry
RESET_TIMEOUT = 5
async def async_setup_entry(hass, entry, async_add_entities):
    """Set up binary sensor entities for a RaspyRFM config entry.

    Entities are created for every stored device of type "binary_sensor"
    and again whenever the device registry signal reports an unknown device.
    """
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    # Entities already handed to Home Assistant, keyed by device id.
    entities: Dict[str, RaspyRFMBinarySensor] = {}

    @callback
    def _ensure_entities() -> None:
        added = []
        for device in hub.storage.iter_devices_by_type("binary_sensor"):
            if device.device_id not in entities:
                sensor = RaspyRFMBinarySensor(hub, device)
                entities[device.device_id] = sensor
                added.append(sensor)
        if added:
            async_add_entities(added)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        # A known device needs no new entity; anything else triggers a rescan.
        if device_id is not None and device_id in entities:
            return
        _ensure_entities()

    unsubscribe = async_dispatcher_connect(
        hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update
    )
    entry.async_on_unload(unsubscribe)
class RaspyRFMBinarySensor(RaspyRFMEntity, BinarySensorEntity):
    """Binary sensor that turns on when one of its stored payloads is seen.

    The sensor switches back off ``RESET_TIMEOUT`` seconds after the most
    recent matching payload.
    """

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        """Initialise the sensor in the OFF state."""
        super().__init__(hub, device)
        # Pending auto-reset timer, if any.
        self._reset_handle: asyncio.TimerHandle | None = None
        # Unsubscribe callable returned by the dispatcher; None while detached.
        self._signal_unsub = None
        self._attr_is_on = False

    def _cancel_reset(self) -> None:
        """Cancel and forget a pending auto-reset timer."""
        pending = self._reset_handle
        if pending is not None:
            pending.cancel()
            self._reset_handle = None

    async def async_added_to_hass(self) -> None:
        """Subscribe to received signals once the entity is added."""
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            received = event.get("payload")
            if received not in self._device.signals.values():
                return
            self._attr_is_on = True
            self.async_write_ha_state()
            # Restart the countdown so repeated triggers keep the sensor on.
            self._cancel_reset()
            self._reset_handle = self.hass.loop.call_later(
                RESET_TIMEOUT, self._reset_state
            )

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        """Drop the signal listener and any pending reset timer."""
        unsubscribe = self._signal_unsub
        if unsubscribe is not None:
            unsubscribe()
            self._signal_unsub = None
        self._cancel_reset()
        await super().async_will_remove_from_hass()

    @callback
    def _reset_state(self) -> None:
        """Timer callback: return the sensor to OFF and publish the state."""
        self._reset_handle = None
        self._attr_is_on = False
        self.async_write_ha_state()
"""Light platform for RaspyRFM."""
from __future__ import annotations
from typing import Any, Dict
from homeassistant.components.light import ColorMode, LightEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry
async def async_setup_entry(hass, entry, async_add_entities):
    """Set up light entities for a RaspyRFM config entry.

    Entities are created for every stored device of type "light" and again
    whenever the device registry signal reports an unknown device.
    """
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    # Entities already handed to Home Assistant, keyed by device id.
    entities: Dict[str, RaspyRFMLight] = {}

    @callback
    def _ensure_entities() -> None:
        added = []
        for device in hub.storage.iter_devices_by_type("light"):
            if device.device_id not in entities:
                light = RaspyRFMLight(hub, device)
                entities[device.device_id] = light
                added.append(light)
        if added:
            async_add_entities(added)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        # A known device needs no new entity; anything else triggers a rescan.
        if device_id is not None and device_id in entities:
            return
        _ensure_entities()

    unsubscribe = async_dispatcher_connect(
        hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update
    )
    entry.async_on_unload(unsubscribe)
class RaspyRFMLight(RaspyRFMEntity, LightEntity):
    """Representation of an on/off RaspyRFM light.

    Only ``ColorMode.ONOFF`` is exposed. The optional "bright" and "dim"
    signals are used as fallbacks for turning the light on and off; no
    brightness level is tracked.
    """

    _attr_supported_color_modes = {ColorMode.ONOFF}
    _attr_color_mode = ColorMode.ONOFF

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        """Initialise the light in the OFF state with no signal listener."""
        super().__init__(hub, device)
        self._attr_is_on = False
        # Unsubscribe callable returned by the dispatcher; None while detached.
        self._signal_unsub = None

    async def async_added_to_hass(self) -> None:
        """Start listening for received signals once the entity is added."""
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            if payload is None:
                # "off", "bright" and "dim" are optional, so a payload-less
                # event would otherwise equal ``signals.get(...)`` (None) and
                # change the state by accident.
                return
            signals = self._device.signals
            if payload == signals.get("on"):
                self._attr_is_on = True
            elif payload == signals.get("off"):
                self._attr_is_on = False
            elif payload == signals.get("bright"):
                self._attr_is_on = True
            elif payload == signals.get("dim") and "off" not in signals:
                # Devices without an explicit OFF signal often dim to turn off.
                self._attr_is_on = False
            else:
                return
            self.async_write_ha_state()

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        """Detach the signal listener before the entity is removed."""
        if self._signal_unsub is not None:
            self._signal_unsub()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    @property
    def extra_state_attributes(self) -> Dict[str, Any]:
        """Expose the names of the signals stored for this device."""
        return {"available_signals": list(self._device.signals.keys())}

    async def async_turn_on(self, **kwargs: Any) -> None:
        """Send the ON payload, falling back to BRIGHT, and mark the light on.

        Raises:
            ValueError: if neither an "on" nor a "bright" signal is stored.
        """
        payload = self._device.signals.get("on") or self._device.signals.get("bright")
        if payload is None:
            raise ValueError("No ON or BRIGHT signal stored for this device")
        await self._hub.async_send_raw(payload)
        self._attr_is_on = True
        self.async_write_ha_state()

    async def async_turn_off(self, **kwargs: Any) -> None:
        """Send the OFF payload, falling back to DIM, and mark the light off.

        Raises:
            ValueError: if neither an "off" nor a "dim" signal is stored.
        """
        payload = self._device.signals.get("off") or self._device.signals.get("dim")
        if payload is None:
            raise ValueError("No OFF or DIM signal stored for this device")
        await self._hub.async_send_raw(payload)
        self._attr_is_on = False
        self.async_write_ha_state()
"""Button platform for RaspyRFM."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict
from homeassistant.components.button import ButtonEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.util import dt as dt_util
from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry
async def async_setup_entry(hass, entry, async_add_entities):
    """Set up one button entity per stored action of every "button" device.

    Entities are created at setup and kept in sync with the device registry
    signal: buttons whose action disappeared from the device are removed and
    buttons for new actions are added.
    """
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    # Keyed by (device_id, action). A tuple is used instead of a
    # "device_id:action" string so that ids or actions containing ":" can
    # neither be split at the wrong place nor be mistaken for another
    # device's prefix.
    entities: Dict[tuple[str, str], RaspyRFMButton] = {}

    @callback
    def _ensure_entities() -> None:
        new_entities = []
        for device in hub.storage.iter_devices_by_type("button"):
            for action in sorted(device.signals.keys()):
                key = (device.device_id, action)
                if key in entities:
                    continue
                entity = RaspyRFMButton(hub, device, action)
                entities[key] = entity
                new_entities.append(entity)
        if new_entities:
            async_add_entities(new_entities)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        device = None if device_id is None else hub.storage.get_device(device_id)
        if device is None:
            # No specific (or no longer stored) device: just rescan.
            _ensure_entities()
            return
        stale_keys = [
            key
            for key in entities
            if key[0] == device_id and key[1] not in device.signals
        ]
        for key in stale_keys:
            entity = entities.pop(key)
            # Entities that were never added have no hass and need no removal.
            if entity.hass is not None:
                entity.hass.async_create_task(entity.async_remove())
        if any((device_id, action) not in entities for action in device.signals):
            _ensure_entities()

    entry.async_on_unload(
        async_dispatcher_connect(hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update)
    )
class RaspyRFMButton(RaspyRFMEntity, ButtonEntity):
    """Representation of a RaspyRFM button action.

    Each entity wraps one named action of a device. Pressing it sends the
    stored payload; receiving that payload updates ``last_triggered``.
    """

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry, action: str) -> None:
        """Bind the entity to ``action`` of ``device``."""
        super().__init__(hub, device)
        self._action = action
        # UTC time of the last press or matching received payload.
        self._last_triggered: datetime | None = None
        # Unsubscribe callable returned by the dispatcher; None while detached.
        self._signal_unsub = None

    @property
    def name(self) -> str:
        """Return the base entity name followed by the title-cased action."""
        base = super().name
        label = self._action.replace("_", " ").title()
        return f"{base} {label}"

    @property
    def unique_id(self) -> str:
        """Return the base unique id suffixed with the action name."""
        return f"{super().unique_id}:{self._action}"

    async def async_added_to_hass(self) -> None:
        """Start listening for received signals once the entity is added."""
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            payload = event.get("payload")
            if payload is None:
                # A payload-less event must not match an action whose signal
                # is no longer stored (``None == signals.get(...)``).
                return
            if payload == self._device.signals.get(self._action):
                self._last_triggered = dt_util.utcnow()
                self.async_write_ha_state()

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        """Detach the signal listener before the entity is removed."""
        if self._signal_unsub is not None:
            self._signal_unsub()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    @property
    def extra_state_attributes(self) -> Dict[str, Any]:
        """Expose the action name and, once known, the last trigger time."""
        attrs: Dict[str, Any] = {
            "action": self._action,
        }
        if self._last_triggered is not None:
            attrs["last_triggered"] = self._last_triggered.isoformat()
        return attrs

    async def async_press(self, **kwargs: Any) -> None:
        """Send the payload stored for this action.

        Raises:
            ValueError: if the device has no signal stored for the action.
        """
        payload = self._device.signals.get(self._action)
        if payload is None:
            raise ValueError(f"No signal stored for action {self._action}")
        await self._hub.async_send_raw(payload)
        self._last_triggered = dt_util.utcnow()
        self.async_write_ha_state()
"""Sensor platform for universal RaspyRFM devices."""
from __future__ import annotations
from typing import Any, Dict
from homeassistant.components.sensor import SensorEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import DOMAIN, SIGNAL_DEVICE_REGISTRY_UPDATED, SIGNAL_SIGNAL_RECEIVED
from .entity import RaspyRFMEntity
from .hub import RaspyRFMHub
from .storage import RaspyRFMDeviceEntry
async def async_setup_entry(hass, entry, async_add_entities):
    """Set up sensor entities for a RaspyRFM config entry.

    Entities are created for every stored device of type "universal" and
    again whenever the device registry signal reports an unknown device.
    """
    hub: RaspyRFMHub = hass.data[DOMAIN][entry.entry_id]
    # Entities already handed to Home Assistant, keyed by device id.
    entities: Dict[str, RaspyRFMUniversalSensor] = {}

    @callback
    def _ensure_entities() -> None:
        added = []
        for device in hub.storage.iter_devices_by_type("universal"):
            if device.device_id not in entities:
                sensor = RaspyRFMUniversalSensor(hub, device)
                entities[device.device_id] = sensor
                added.append(sensor)
        if added:
            async_add_entities(added)

    _ensure_entities()

    @callback
    def handle_device_update(device_id: str | None) -> None:
        # A known device needs no new entity; anything else triggers a rescan.
        if device_id is not None and device_id in entities:
            return
        _ensure_entities()

    unsubscribe = async_dispatcher_connect(
        hass, SIGNAL_DEVICE_REGISTRY_UPDATED, handle_device_update
    )
    entry.async_on_unload(unsubscribe)
class RaspyRFMUniversalSensor(RaspyRFMEntity, SensorEntity):
    """Sensor for "universal" devices that have no dedicated platform.

    The native value is the name of the most recently received action.
    """

    _attr_icon = "mdi:radio-tower"

    def __init__(self, hub: RaspyRFMHub, device: RaspyRFMDeviceEntry) -> None:
        """Initialise the sensor with no value and no signal listener."""
        super().__init__(hub, device)
        self._attr_native_value = None
        # Unsubscribe callable returned by the dispatcher; None while detached.
        self._signal_unsub = None

    async def async_added_to_hass(self) -> None:
        """Subscribe to received signals once the entity is added."""
        await super().async_added_to_hass()

        @callback
        def handle_signal(event: Dict[str, Any]) -> None:
            received = event.get("payload")
            # Report the first stored action whose payload equals the event's.
            for action, stored in self._device.signals.items():
                if stored != received:
                    continue
                self._attr_native_value = action
                self.async_write_ha_state()
                return

        self._signal_unsub = async_dispatcher_connect(
            self.hass, SIGNAL_SIGNAL_RECEIVED, handle_signal
        )

    async def async_will_remove_from_hass(self) -> None:
        """Drop the signal listener before the entity is removed."""
        unsubscribe = self._signal_unsub
        if unsubscribe is not None:
            unsubscribe()
            self._signal_unsub = None
        await super().async_will_remove_from_hass()

    @property
    def extra_state_attributes(self) -> Dict[str, Any]:
        """Expose the names of the actions stored for this device."""
        action_names = list(self._device.signals.keys())
        return {"available_actions": action_names}
Websocket API surface¶
The integration exposes a websocket namespace under raspyrfm/. The
commands cover the full lifecycle: starting and stopping capture, listing
signals, creating or deleting devices, triggering stored actions, and
maintaining the optional signal mapping metadata.
"""Websocket commands exposed by the RaspyRFM integration."""
from __future__ import annotations
import logging
from typing import Any, Dict, Set
import voluptuous as vol
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.components import websocket_api
from .const import (
DOMAIN,
MAPPING_CATEGORIES,
SIGNAL_LEARNING_STATE,
SIGNAL_SIGNAL_RECEIVED,
WS_TYPE_DEVICE_CREATE,
WS_TYPE_DEVICE_DELETE,
WS_TYPE_DEVICE_LIST,
WS_TYPE_DEVICE_RELOAD,
WS_TYPE_DEVICE_SEND,
WS_TYPE_LEARNING_START,
WS_TYPE_LEARNING_STATUS,
WS_TYPE_LEARNING_STOP,
WS_TYPE_LEARNING_SUBSCRIBE,
WS_TYPE_SIGNAL_MAP_DELETE,
WS_TYPE_SIGNAL_MAP_LIST,
WS_TYPE_SIGNAL_MAP_UPDATE,
WS_TYPE_SIGNALS_LIST,
WS_TYPE_SIGNALS_SUBSCRIBE,
)
from .hub import RaspyRFMHub
_LOGGER = logging.getLogger(__name__)
HANDLERS_REGISTERED = "_raspyrfm_ws_handlers"
def async_register_websocket_handlers(hass: HomeAssistant) -> None:
    """Register every RaspyRFM websocket command once per Home Assistant run.

    A flag stored in ``hass.data`` makes repeated calls a no-op.
    """
    if hass.data.get(HANDLERS_REGISTERED):
        return
    # NOTE(review): handle_signal_map_delete is not defined in the portion of
    # the module visible here - confirm it exists before release.
    handlers = (
        handle_learning_start,
        handle_learning_stop,
        handle_learning_status,
        handle_learning_subscribe,
        handle_signals_list,
        handle_signals_subscribe,
        handle_device_create,
        handle_device_delete,
        handle_device_list,
        handle_device_reload,
        handle_device_send_action,
        handle_signal_map_list,
        handle_signal_map_update,
        handle_signal_map_delete,
    )
    for handler in handlers:
        websocket_api.async_register_command(hass, handler)
    hass.data[HANDLERS_REGISTERED] = True
def _get_hub(hass: HomeAssistant, msg: Dict[str, Any]) -> RaspyRFMHub:
    """Resolve the hub a websocket message refers to.

    Args:
        hass: The Home Assistant instance holding the integration data.
        msg: The incoming websocket message. When it carries no "entry_id"
            the first configured entry is used.

    Returns:
        The hub stored under the resolved entry id.

    Raises:
        websocket_api.HomeAssistantWebSocketError: if no entry is configured
            or the requested entry id is unknown.
    """
    # NOTE(review): confirm that websocket_api really exports
    # HomeAssistantWebSocketError in the targeted Home Assistant version.
    # A missing or empty domain bucket is treated like "nothing configured"
    # instead of letting an explicit entry_id fail with a KeyError.
    domain_data = hass.data.get(DOMAIN) or {}
    entry_id = msg.get("entry_id")
    if entry_id is None:
        # Fall back to the first entry. Keys starting with "_" are skipped;
        # presumably they hold internal bookkeeping rather than hubs.
        entry_id = next(
            (key for key in domain_data if not key.startswith("_")), None
        )
        if entry_id is None:
            raise websocket_api.HomeAssistantWebSocketError("No RaspyRFM entries configured")
    elif isinstance(entry_id, str) and entry_id.startswith("_"):
        # Never hand a non-hub internal object to a websocket client.
        raise websocket_api.HomeAssistantWebSocketError("Unknown RaspyRFM entry")
    hub = domain_data.get(entry_id)
    if hub is None:
        raise websocket_api.HomeAssistantWebSocketError("Unknown RaspyRFM entry")
    return hub
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_LEARNING_START,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_learning_start(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Ask the hub to start capturing signals and report the active state."""
    await _get_hub(hass, msg).async_start_learning()
    connection.send_result(msg["id"], {"active": True})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_LEARNING_STOP,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_learning_stop(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Ask the hub to stop capturing signals and report the inactive state."""
    await _get_hub(hass, msg).async_stop_learning()
    connection.send_result(msg["id"], {"active": False})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_LEARNING_STATUS,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_learning_status(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Reply with whether the hub's learn manager is currently active."""
    is_active = _get_hub(hass, msg).learn_manager.is_active
    connection.send_result(msg["id"], {"active": is_active})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_LEARNING_SUBSCRIBE,
        vol.Optional("entry_id"): str,
    }
)
@callback
def handle_learning_subscribe(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Forward learning state changes to the client as websocket events.

    The dispatcher unsubscribe callable is stored on the connection under
    the message id.
    """
    message_id = msg["id"]

    @callback
    def forward(payload: Dict[str, Any]) -> None:
        connection.send_message(websocket_api.event_message(message_id, payload))

    unsubscribe = async_dispatcher_connect(hass, SIGNAL_LEARNING_STATE, forward)
    connection.subscriptions[message_id] = unsubscribe
    connection.send_result(message_id)
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_SIGNALS_LIST,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_signals_list(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Reply with the signals the learn manager currently holds."""
    learn_manager = _get_hub(hass, msg).learn_manager
    captured = await learn_manager.async_list_signals()
    connection.send_result(msg["id"], {"signals": captured})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_SIGNALS_SUBSCRIBE,
        vol.Optional("entry_id"): str,
    }
)
@callback
def handle_signals_subscribe(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Forward every received signal to the client as a websocket event.

    The dispatcher unsubscribe callable is stored on the connection under
    the message id.
    """
    message_id = msg["id"]

    @callback
    def forward(payload: Dict[str, Any]) -> None:
        connection.send_message(websocket_api.event_message(message_id, payload))

    unsubscribe = async_dispatcher_connect(hass, SIGNAL_SIGNAL_RECEIVED, forward)
    connection.subscriptions[message_id] = unsubscribe
    connection.send_result(message_id)
# Device types that can be created through the websocket API.
SUPPORTED_DEVICE_TYPES = ["switch", "binary_sensor", "light", "button", "universal"]

# Message schema for the device-create command.
_DEVICE_CREATE_SCHEMA = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
    {
        vol.Required("type"): WS_TYPE_DEVICE_CREATE,
        vol.Required("name"): cv.string,
        vol.Required("device_type"): vol.In(SUPPORTED_DEVICE_TYPES),
        vol.Required("signals"): {cv.string: cv.string},
        # Metadata values are free-form. ``cv.match_all`` accepts any value;
        # the previous ``cv.Any()`` resolved to ``typing.Any`` re-exported by
        # the helper module, which cannot be instantiated and raised a
        # TypeError while this module was being imported.
        vol.Optional("metadata"): {cv.string: cv.match_all},
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.websocket_command(_DEVICE_CREATE_SCHEMA)
@websocket_api.async_response
async def handle_device_create(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Validate the signal mapping, create the device and return it."""
    hub = _get_hub(hass, msg)
    device_type = msg["device_type"]
    signals = msg["signals"]
    # Reject mappings that do not fit the device type before touching storage.
    _validate_device_payload(device_type, signals)
    created = await hub.async_create_device(
        msg["name"], device_type, signals, msg.get("metadata")
    )
    connection.send_result(msg["id"], {"device": created.to_dict()})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_DEVICE_DELETE,
        vol.Required("device_id"): cv.string,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_device_delete(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Remove the device named by ``device_id`` and reply with an empty result."""
    target_id = msg["device_id"]
    await _get_hub(hass, msg).async_remove_device(target_id)
    connection.send_result(msg["id"], {})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_DEVICE_LIST,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_device_list(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Reply with the dictionary form of every device the hub knows."""
    hub = _get_hub(hass, msg)
    serialised = [stored.to_dict() for stored in hub.iter_devices()]
    connection.send_result(msg["id"], {"devices": serialised})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_DEVICE_RELOAD,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_device_reload(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Ask the hub to reload its devices and reply with an empty result."""
    await _get_hub(hass, msg).async_reload_devices()
    connection.send_result(msg["id"], {})
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_DEVICE_SEND,
        vol.Required("device_id"): cv.string,
        vol.Required("action"): cv.string,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_device_send_action(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Send the payload stored for ``action`` of the given device.

    A ``ValueError`` from the hub is re-raised as a websocket error carrying
    the same message.
    """
    hub = _get_hub(hass, msg)
    device_id, action = msg["device_id"], msg["action"]
    try:
        await hub.async_send_device_action(device_id, action)
    except ValueError as err:
        raise websocket_api.HomeAssistantWebSocketError(str(err)) from err
    connection.send_result(msg["id"], {})
def _validate_device_payload(device_type: str, signals: Dict[str, str]) -> None:
"""Ensure the provided payload mapping matches the expected schema."""
required: Dict[str, str] = {}
optional: Set[str] = set()
if device_type == "switch":
required = {"on": "ON payload", "off": "OFF payload"}
elif device_type == "binary_sensor":
required = {"trigger": "Trigger payload"}
elif device_type == "light":
required = {"on": "ON payload"}
optional = {"off", "bright", "dim"}
elif device_type in {"button", "universal"}:
if not signals:
raise websocket_api.HomeAssistantWebSocketError(
"Provide at least one signal mapping"
)
else:
raise websocket_api.HomeAssistantWebSocketError("Unsupported device type")
missing = [key for key in required if key not in signals or not signals[key]]
if missing:
raise websocket_api.HomeAssistantWebSocketError(
f"Missing {', '.join(missing)} for {device_type}"
)
if device_type == "light" and not any(
signals.get(key) for key in ("off", "dim")
):
raise websocket_api.HomeAssistantWebSocketError(
"Provide either an OFF or DIM payload for light devices"
)
unexpected = [
key
for key in signals
if key not in required and key not in optional and device_type not in {"button", "universal"}
]
if unexpected:
raise websocket_api.HomeAssistantWebSocketError(
f"Unexpected signals for {device_type}: {', '.join(unexpected)}"
)
@websocket_api.websocket_command(
    {
        vol.Required("type"): WS_TYPE_SIGNAL_MAP_LIST,
        vol.Optional("entry_id"): str,
    }
)
@websocket_api.async_response
async def handle_signal_map_list(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Reply with the signal mappings returned by the hub."""
    stored = await _get_hub(hass, msg).async_list_signal_mappings()
    connection.send_result(msg["id"], {"mappings": stored})
# Message schema for the signal-map update command: a payload is assigned a
# category (one of MAPPING_CATEGORIES) and a label, optionally with a list of
# linked device ids.
_SIGNAL_MAP_UPDATE_SCHEMA = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
{
vol.Required("type"): WS_TYPE_SIGNAL_MAP_UPDATE,
vol.Required("payload"): cv.string,
vol.Required("category"): vol.In(MAPPING_CATEGORIES),
vol.Required("label"): cv.string,
vol.Optional("linked_devices"): [cv.string],
vol.Optional("entry_id"): str,
}
)
@websocket_api.websocket_command(_SIGNAL_MAP_UPDATE_SCHEMA)
@websocket_api.async_response
async def handle_signal_map_update(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: Dict[str, Any],
) -> None:
    """Store category, label and linked devices for a payload and return it."""
    hub = _get_hub(hass, msg)
    linked = msg.get("linked_devices")
    stored = await hub.async_set_signal_mapping(
        msg["payload"], msg["category"], msg["label"], linked
    )
    connection.send_result(msg["id"], {"mapping": stored.to_dict()})
Panel registration and static assets¶
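The first config entry to load registers a static path and a custom panel that is restricted to administrators; later entries reuse that registration, and the panel is removed again once the last entry is unloaded. The panel is implemented as a LitElement module that runs entirely inside the Home Assistant frontend.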
"""Frontend panel registration for RaspyRFM."""
from __future__ import annotations
from importlib import resources
from typing import Set
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import PANEL_ICON, PANEL_TITLE, PANEL_URL_PATH
STATIC_PATH = "/raspyrfm_static"
async def async_register_panel(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Register the RaspyRFM panel on behalf of a config entry.

    The static path and the panel are registered only for the first entry;
    every entry id is recorded in a set kept in ``hass.data`` so the panel
    can be shared between entries.
    """
    # NOTE(review): register_static_path and hass.components are legacy
    # Home Assistant APIs - confirm they exist in the targeted release.
    registered: Set[str] = hass.data.setdefault("_raspyrfm_panel_entries", set())
    entry_id = entry.entry_id
    if entry_id in registered:
        return
    first_entry = not registered
    if first_entry:
        frontend_dir = resources.files(__package__).joinpath("frontend")
        hass.http.register_static_path(
            STATIC_PATH, str(frontend_dir), cache_headers=False
        )
        hass.components.frontend.async_register_panel(
            component_name="custom",
            frontend_url_path=PANEL_URL_PATH,
            webcomponent_path=f"{STATIC_PATH}/raspyrfm-panel.js",
            config={"name": PANEL_TITLE, "icon": PANEL_ICON},
            require_admin=True,
        )
    registered.add(entry_id)
async def async_unregister_panel(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Forget a config entry and tear the panel down with the last one.

    The panel, the static path and the bookkeeping set are removed only once
    no entry id remains registered.
    """
    # NOTE(review): confirm that hass.http provides unregister_static_path in
    # the targeted Home Assistant release.
    remaining: Set[str] = hass.data.get("_raspyrfm_panel_entries", set())
    remaining.discard(entry.entry_id)
    if not remaining:
        hass.components.frontend.async_remove_panel(PANEL_URL_PATH)
        hass.http.unregister_static_path(STATIC_PATH)
        hass.data.pop("_raspyrfm_panel_entries", None)
Frontend component¶
The raspyrfm-panel web component drives the onboarding experience for
RaspyRFM users. It exposes learning controls, renders cards summarising
captured payloads, fingerprints signals against the Python library to
suggest device types, provides a flexible creation form, and lets users map
payloads to semantic categories with optional device links. The component
relies exclusively on the websocket commands described above, so no
additional backend endpoints are required.
// Lit primitives are read from globals on `window`.
// NOTE(review): this assumes the host page exposes LitElement, html and css
// on window - confirm, otherwise the class declaration below will throw.
const { LitElement, html, css } = window;
// NOTE(review): presumably the cap on retained captured signals; its use is
// not visible in this part of the file - confirm.
const MAX_SIGNAL_HISTORY = 200;
class RaspyRFMPanel extends LitElement {
static get properties() {
return {
hass: {},
learning: { type: Boolean },
signals: { state: true },
devices: { state: true },
signalMappings: { state: true },
formType: { state: true },
formName: { state: true },
formSignals: { state: true },
formActions: { state: true },
formCustomAction: { state: true },
error: { state: true },
successMessage: { state: true },
infoMessage: { state: true },
};
}
static get styles() {
return css`
:host {
display: block;
min-height: 100%;
padding: 24px;
background: linear-gradient(135deg, var(--primary-background-color), rgba(0, 0, 0, 0))
no-repeat;
--success-color-fallback: #4caf50;
--info-color-fallback: #2196f3;
}
h2.section-title {
font-size: 1.4rem;
font-weight: 600;
margin: 0 0 12px;
display: flex;
align-items: center;
gap: 8px;
}
.layout {
display: grid;
gap: 24px;
}
.split-columns {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(320px, 1fr));
gap: 24px;
}
.actions {
display: flex;
flex-wrap: wrap;
gap: 12px;
margin-bottom: 24px;
}
ha-card {
border-radius: 18px;
box-shadow: var(--ha-card-box-shadow, 0 10px 30px rgba(0, 0, 0, 0.12));
overflow: hidden;
}
ha-card .card-content {
padding: 20px;
}
table {
width: 100%;
border-collapse: collapse;
}
th,
td {
padding: 10px;
border-bottom: 1px solid var(--divider-color);
font-size: 0.95rem;
}
tbody tr:last-child td {
border-bottom: none;
}
.signal-list {
max-height: 320px;
overflow-y: auto;
display: grid;
gap: 12px;
}
.signal-entry {
display: flex;
justify-content: space-between;
align-items: center;
padding: 14px;
border-radius: 12px;
background: rgba(0, 0, 0, 0.04);
}
.meta-block {
display: flex;
flex-direction: column;
gap: 4px;
}
.signal-meta {
font-size: 12px;
color: var(--secondary-text-color);
}
.form-grid {
display: grid;
gap: 16px;
}
.form-row {
display: flex;
gap: 12px;
align-items: center;
flex-wrap: wrap;
}
.pill {
padding: 2px 8px;
border-radius: 12px;
background-color: var(--accent-color);
color: var(--text-primary-color);
font-size: 12px;
}
.chip {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 4px 10px;
border-radius: 14px;
background: rgba(0, 0, 0, 0.08);
font-size: 0.75rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.04em;
}
.chip.primary {
background: rgba(25, 118, 210, 0.16);
color: var(--primary-color);
}
.signal-actions {
display: flex;
flex-wrap: wrap;
gap: 8px;
}
.signal-chips {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-top: 6px;
}
.signal-chip {
display: flex;
flex-direction: column;
gap: 4px;
padding: 10px;
border-radius: 12px;
background: rgba(0, 0, 0, 0.03);
}
.error {
color: var(--error-color);
margin-bottom: 12px;
padding: 12px 16px;
background: rgba(244, 67, 54, 0.1);
border-radius: 8px;
border-left: 4px solid var(--error-color);
}
.success {
color: var(--success-color, var(--success-color-fallback));
margin-bottom: 12px;
padding: 12px 16px;
background: rgba(76, 175, 80, 0.1);
border-radius: 8px;
border-left: 4px solid var(--success-color, var(--success-color-fallback));
}
.info {
color: var(--info-color, var(--info-color-fallback));
margin-bottom: 12px;
padding: 12px 16px;
background: rgba(33, 150, 243, 0.1);
border-radius: 8px;
border-left: 4px solid var(--info-color, var(--info-color-fallback));
}
.required {
margin-left: 4px;
color: var(--error-color);
}
.mapping-grid {
display: grid;
gap: 18px;
}
.mapping-item {
display: grid;
gap: 12px;
padding: 16px;
border-radius: 12px;
background: rgba(0, 0, 0, 0.04);
}
.mapping-actions {
display: flex;
gap: 12px;
flex-wrap: wrap;
}
mwc-button.danger,
mwc-button.danger[unelevated] {
--mdc-theme-primary: var(--error-color);
}
.device-checkboxes {
display: flex;
flex-wrap: wrap;
gap: 8px;
}
.map-canvas {
position: relative;
border-radius: 16px;
background: linear-gradient(135deg, rgba(0, 150, 136, 0.12), rgba(30, 136, 229, 0.12));
padding: 12px;
overflow: hidden;
}
svg.mapping {
width: 100%;
min-height: 260px;
}
.category-label {
font-size: 0.8rem;
font-weight: 600;
fill: var(--primary-text-color);
}
.category-column {
fill: rgba(255, 255, 255, 0.35);
}
.node-label {
font-size: 0.75rem;
fill: var(--primary-text-color);
}
.node-circle {
fill: var(--accent-color);
stroke: rgba(0, 0, 0, 0.2);
stroke-width: 1;
}
@media (max-width: 768px) {
:host {
padding: 16px;
}
.actions {
justify-content: stretch;
flex-direction: column;
}
.actions mwc-button {
width: 100%;
}
.signal-entry {
flex-direction: column;
align-items: stretch;
gap: 12px;
}
.split-columns {
grid-template-columns: 1fr;
}
h2.section-title {
font-size: 1.2rem;
}
}
.help-text {
font-size: 0.85rem;
color: var(--secondary-text-color);
margin-top: 8px;
padding: 8px;
background: rgba(0, 0, 0, 0.02);
border-radius: 6px;
}
.help-icon {
cursor: help;
opacity: 0.6;
transition: opacity 0.2s;
}
.help-icon:hover {
opacity: 1;
}
.button-group {
display: flex;
gap: 8px;
flex-wrap: wrap;
}
@media (prefers-reduced-motion: reduce) {
* {
animation: none !important;
transition: none !important;
}
}
`;
}
constructor() {
super();
this.learning = false;
this.signals = [];
this.devices = [];
this.signalMappings = {};
this.formType = "switch";
this.formName = "";
this.formSignals = {};
this.formActions = [];
this._configureActionsForType(this.formType);
this.formCustomAction = "";
this.error = null;
this.successMessage = null;
this.infoMessage = null;
this._signalUnsub = null;
this._learningUnsub = null;
this._persistedMappings = {};
}
connectedCallback() {
super.connectedCallback();
this._initialize();
}
disconnectedCallback() {
super.disconnectedCallback();
if (this._signalUnsub) {
this._signalUnsub();
this._signalUnsub = null;
}
if (this._learningUnsub) {
this._learningUnsub();
this._learningUnsub = null;
}
}
async _initialize() {
await this._loadState();
await this._subscribeSignals();
await this._subscribeLearning();
await this._refreshDevices();
}
async _loadState() {
const status = await this.hass.callWS({ type: "raspyrfm/learning/status" });
this.learning = status.active;
const signals = await this.hass.callWS({ type: "raspyrfm/signals/list" });
this.signals = this._normaliseSignals(signals.signals || []);
}
async _loadMappings() {
try {
const response = await this.hass.callWS({ type: "raspyrfm/signals/map/list" });
const next = {};
(response.mappings || []).forEach((entry) => {
next[entry.payload] = entry;
});
this.signalMappings = next;
this._persistedMappings = JSON.parse(JSON.stringify(next));
if (this.error === "Signal mapping is temporarily unavailable.") {
this.error = null;
}
} catch (err) {
// Older backends might not expose the mapping API yet – degrade gracefully.
console.warn("Unable to load RaspyRFM signal mappings", err);
if (Object.keys(this.signalMappings || {}).length) {
this.signalMappings = {};
}
if (!this.error) {
this.error = "Signal mapping is temporarily unavailable.";
}
}
}
async _subscribeSignals() {
this._signalUnsub = await this.hass.connection.subscribeMessage((message) => {
if (message.type !== "event") {
return;
}
const signal = message.event;
this.signals = this._normaliseSignals([...this.signals, signal]);
}, {
type: "raspyrfm/signals/subscribe"
});
}
async _subscribeLearning() {
this._learningUnsub = await this.hass.connection.subscribeMessage((message) => {
if (message.type !== "event") {
return;
}
this.learning = message.event.active;
}, {
type: "raspyrfm/learning/subscribe"
});
}
async _refreshDevices() {
const response = await this.hass.callWS({ type: "raspyrfm/devices/list" });
this.devices = response.devices || [];
await this._loadMappings();
}
/**
 * Build the panel template.
 *
 * Layout, top to bottom:
 *  - action bar: start/stop learning (each disabled according to
 *    `this.learning`), refresh devices, reload mapping;
 *  - optional error, success and info banners, shown when the matching
 *    property is truthy;
 *  - a "Getting Started" hint while learning is off and no signals exist;
 *  - the captured-signals card next to the device form;
 *  - the mapping workspace and the device list.
 */
render() {
return html`
<div class="layout">
<div class="actions">
<mwc-button raised icon="mdi:play" @click=${this._handleStartLearning} ?disabled=${this.learning}
>Start learning</mwc-button
>
<mwc-button icon="mdi:stop" @click=${this._handleStopLearning} ?disabled=${!this.learning}
>Stop learning</mwc-button
>
<mwc-button icon="mdi:refresh" @click=${this._refreshDevices}>Refresh devices</mwc-button>
<mwc-button icon="mdi:chart-bubble" @click=${this._loadMappings}>Reload mapping</mwc-button>
</div>
${this.error ? html`<div class="error">⚠️ ${this.error}</div>` : ""}
${this.successMessage ? html`<div class="success">✓ ${this.successMessage}</div>` : ""}
${this.infoMessage ? html`<div class="info">ℹ️ ${this.infoMessage}</div>` : ""}
${!this.learning && !this.signals.length ? html`
<div class="info">
<strong>Getting Started:</strong> Click "Start learning" to begin capturing RF signals from your remotes and devices.
For Raspberry Pi 4/5 with HAOS, ensure your RaspyRFM gateway is connected and accessible on the network.
</div>
` : ""}
<div class="split-columns">
${this._renderSignals()} ${this._renderForm()}
</div>
${this._renderMappingWorkspace()}
${this._renderDevices()}
</div>
`;
}
/**
 * Build the "Captured signals" card: a placeholder message when
 * `this.signals` is empty, otherwise one entry per signal rendered by
 * `_renderSignal`.
 */
_renderSignals() {
// Empty state: explain how to capture the first signal.
if (!this.signals.length) {
return html`
<ha-card header="Captured signals">
<div class="card-content">No signals received yet. Use the start learning button and trigger your remotes or sensors.</div>
</ha-card>
`;
}
// Populated state: list every captured signal in order.
return html`
<ha-card header="Captured signals">
<div class="card-content signal-list">
${this.signals.map((signal) => this._renderSignal(signal))}
</div>
</ha-card>
`;
}
_renderSignal(signal) {
const actions = Array.isArray(this.formActions) ? this.formActions : [];
const classification = signal?.metadata?.classification;
const classificationActions = Array.isArray(classification?.actions)
? classification.actions.map((action) => this._labelForAction(this._normaliseActionKey(action)))
: [];
return html`
<div class="signal-entry">
<div class="meta-block">
<div>${signal.payload}</div>
<div class="signal-meta">${signal.received}</div>
${classification
? html`
<div class="signal-chips">
<span class="chip primary">${classification.suggested_type}</span>
${classificationActions.map((label) => html`<span class="chip">${label}</span>`) }
</div>
`
: ""}
</div>
<div class="signal-actions">
${actions.length
? actions.map(
(action) => html`
<mwc-button
dense
outlined
@click=${() => this._assignSignal(action.key, signal.payload)}
>Set as ${action.label}</mwc-button
>
`,
)
: html`<span class="signal-meta">Add an action to the form to assign this payload.</span>`}
Home Assistant integration checklist¶
To use the integration in your own Home Assistant instance:
Copy the custom_components/raspyrfm directory into your Home Assistant config/custom_components folder.
Restart Home Assistant so the new integration is discovered.
Navigate to Settings → Devices & Services and add the RaspyRFM integration. Provide the host name or IP address of the RaspyRFM gateway and the UDP port it listens on (49880 by default).
Open the RaspyRFM panel from the sidebar to start a learning session and capture payloads. Use the Create Home Assistant device card to turn them into entities, then link payloads to devices in the mapping workspace.
Optional: update the labels, categories, and device associations for each payload in the mapping workspace. The data is persisted through .storage/raspyrfm_signal_map and is restored after Home Assistant restarts.