Skip to content

Telemetry API

Aggregates multiple :class:~bulletlab.telemetry.channel.TelemetryChannel instances.

Register channels with :meth:watch, then call :meth:update every simulation step. Retrieve the latest values via :meth:get or :meth:snapshot.

Parameters:

Name Type Description Default
history_len int

Default history buffer length for new channels.

1000

Example::

telemetry = TelemetryManager()
telemetry.watch("Roll", lambda: robot.roll, unit="rad")
telemetry.watch("Speed", lambda: robot.speed, unit="m/s")

for _ in range(1000):
    sim.step()
    telemetry.update(t=sim.elapsed_time)

print(telemetry.snapshot())
Source code in bulletlab/telemetry/manager.py
class TelemetryManager:
    """Aggregates multiple :class:`~bulletlab.telemetry.channel.TelemetryChannel` instances.

    Register channels with :meth:`watch`, then call :meth:`update` every
    simulation step. Retrieve the latest values via :meth:`get` or
    :meth:`snapshot`.

    Args:
        history_len: Default history buffer length for new channels.

    Example::

        telemetry = TelemetryManager()
        telemetry.watch("Roll", lambda: robot.roll, unit="rad")
        telemetry.watch("Speed", lambda: robot.speed, unit="m/s")

        for _ in range(1000):
            sim.step()
            telemetry.update(t=sim.elapsed_time)

        print(telemetry.snapshot())
    """

    def __init__(self, history_len: int = 1000) -> None:
        self._channels: dict[str, TelemetryChannel] = {}
        self._history_len = history_len
        self._last_update_time: float = 0.0

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def watch(
        self,
        name: str,
        source: Callable[[], Any],
        unit: str = "",
        history_len: int | None = None,
    ) -> TelemetryChannel:
        """Register a new channel to monitor.

        Args:
            name: Human-readable channel name.
            source: A callable (lambda, function, or method) that returns
                the current value when called.
            unit: Optional unit string (e.g. ``"m/s"``).
            history_len: History buffer size. Defaults to manager's default.

        Returns:
            The created :class:`~bulletlab.telemetry.channel.TelemetryChannel`.

        Example::

            telemetry.watch("Speed", lambda: robot.speed, unit="m/s")
            telemetry.watch("Joint_1", robot.joints["iiwa_joint_1"].position)
        """
        if callable(source):
            source_fn = source
        else:
            # Allow passing a property value directly as a lambda
            _val = source
            source_fn = lambda: _val  # noqa: E731

        hlen = history_len if history_len is not None else self._history_len
        channel = TelemetryChannel(name=name, source=source_fn, history_len=hlen, unit=unit)
        self._channels[name] = channel
        return channel

    def unwatch(self, name: str) -> None:
        """Remove a channel by name.

        Args:
            name: Channel name to remove.
        """
        self._channels.pop(name, None)

    # ------------------------------------------------------------------
    # Update
    # ------------------------------------------------------------------

    def update(self, t: float | None = None) -> dict[str, Any]:
        """Poll all channels and return a snapshot of current values.

        Args:
            t: Timestamp for this update cycle. If ``None``, uses
                :func:`time.monotonic`.

        Returns:
            Dictionary mapping channel names to their current values.

        Example::

            values = telemetry.update(t=sim.elapsed_time)
        """
        timestamp = t if t is not None else time.monotonic()
        self._last_update_time = timestamp
        result = {}
        for name, channel in self._channels.items():
            result[name] = channel.poll(t=timestamp)
        return result

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------

    def get(self, name: str, default: Any = None) -> Any:
        """Return the latest value for a channel.

        Args:
            name: Channel name.
            default: Value to return if channel does not exist.

        Example::

            speed = telemetry.get("Speed")
        """
        channel = self._channels.get(name)
        return channel.latest if channel is not None else default

    def snapshot(self) -> dict[str, Any]:
        """Return a dictionary of all channel names to their latest values.

        Example::

            data = telemetry.snapshot()
            print(data["Speed"])
        """
        return {name: ch.latest for name, ch in self._channels.items()}

    def history(self, name: str) -> list[tuple[float, Any]]:
        """Return the full history for a channel.

        Args:
            name: Channel name.

        Returns:
            List of ``(timestamp, value)`` tuples.

        Example::

            times_and_vals = telemetry.history("Speed")
        """
        channel = self._channels.get(name)
        if channel is None:
            return []
        return list(channel.history)

    def values_array(self, name: str) -> list[Any]:
        """Return only the values (no timestamps) for a channel.

        Args:
            name: Channel name.

        Returns:
            List of values in chronological order.
        """
        channel = self._channels.get(name)
        if channel is None:
            return []
        return channel.values

    # ------------------------------------------------------------------
    # Reset
    # ------------------------------------------------------------------

    def clear_history(self) -> None:
        """Clear history buffers for all channels."""
        for channel in self._channels.values():
            channel.clear()

    def clear_all(self) -> None:
        """Remove all registered channels."""
        self._channels.clear()

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    @property
    def channels(self) -> dict[str, TelemetryChannel]:
        """Dictionary of all registered channels."""
        return dict(self._channels)

    @property
    def channel_names(self) -> list[str]:
        """List of all registered channel names."""
        return list(self._channels.keys())

    def __len__(self) -> int:
        return len(self._channels)

    def __contains__(self, name: str) -> bool:
        return name in self._channels

    def __repr__(self) -> str:
        return f"TelemetryManager(channels={list(self._channels.keys())})"

channel_names: list[str] property

List of all registered channel names.

channels: dict[str, TelemetryChannel] property

Dictionary of all registered channels.

clear_all() -> None

Remove all registered channels.

Source code in bulletlab/telemetry/manager.py
def clear_all(self) -> None:
    """Remove all registered channels."""
    self._channels.clear()

clear_history() -> None

Clear history buffers for all channels.

Source code in bulletlab/telemetry/manager.py
def clear_history(self) -> None:
    """Clear history buffers for all channels."""
    for channel in self._channels.values():
        channel.clear()

get(name: str, default: Any = None) -> Any

Return the latest value for a channel.

Parameters:

Name Type Description Default
name str

Channel name.

required
default Any

Value to return if channel does not exist.

None

Example::

speed = telemetry.get("Speed")
Source code in bulletlab/telemetry/manager.py
def get(self, name: str, default: Any = None) -> Any:
    """Return the latest value for a channel.

    Args:
        name: Channel name.
        default: Value to return if channel does not exist.

    Example::

        speed = telemetry.get("Speed")
    """
    channel = self._channels.get(name)
    return channel.latest if channel is not None else default

history(name: str) -> list[tuple[float, Any]]

Return the full history for a channel.

Parameters:

Name Type Description Default
name str

Channel name.

required

Returns:

Type Description
list[tuple[float, Any]]

List of (timestamp, value) tuples.

Example::

times_and_vals = telemetry.history("Speed")
Source code in bulletlab/telemetry/manager.py
def history(self, name: str) -> list[tuple[float, Any]]:
    """Return the full history for a channel.

    Args:
        name: Channel name.

    Returns:
        List of ``(timestamp, value)`` tuples.

    Example::

        times_and_vals = telemetry.history("Speed")
    """
    channel = self._channels.get(name)
    if channel is None:
        return []
    return list(channel.history)

snapshot() -> dict[str, Any]

Return a dictionary of all channel names to their latest values.

Example::

data = telemetry.snapshot()
print(data["Speed"])
Source code in bulletlab/telemetry/manager.py
def snapshot(self) -> dict[str, Any]:
    """Return a dictionary of all channel names to their latest values.

    Example::

        data = telemetry.snapshot()
        print(data["Speed"])
    """
    return {name: ch.latest for name, ch in self._channels.items()}

unwatch(name: str) -> None

Remove a channel by name.

Parameters:

Name Type Description Default
name str

Channel name to remove.

required
Source code in bulletlab/telemetry/manager.py
def unwatch(self, name: str) -> None:
    """Remove a channel by name.

    Args:
        name: Channel name to remove.
    """
    self._channels.pop(name, None)

update(t: float | None = None) -> dict[str, Any]

Poll all channels and return a snapshot of current values.

Parameters:

Name Type Description Default
t float | None

Timestamp for this update cycle. If None, uses :func:time.monotonic.

None

Returns:

Type Description
dict[str, Any]

Dictionary mapping channel names to their current values.

Example::

values = telemetry.update(t=sim.elapsed_time)
Source code in bulletlab/telemetry/manager.py
def update(self, t: float | None = None) -> dict[str, Any]:
    """Poll all channels and return a snapshot of current values.

    Args:
        t: Timestamp for this update cycle. If ``None``, uses
            :func:`time.monotonic`.

    Returns:
        Dictionary mapping channel names to their current values.

    Example::

        values = telemetry.update(t=sim.elapsed_time)
    """
    timestamp = t if t is not None else time.monotonic()
    self._last_update_time = timestamp
    result = {}
    for name, channel in self._channels.items():
        result[name] = channel.poll(t=timestamp)
    return result

values_array(name: str) -> list[Any]

Return only the values (no timestamps) for a channel.

Parameters:

Name Type Description Default
name str

Channel name.

required

Returns:

Type Description
list[Any]

List of values in chronological order.

Source code in bulletlab/telemetry/manager.py
def values_array(self, name: str) -> list[Any]:
    """Return only the values (no timestamps) for a channel.

    Args:
        name: Channel name.

    Returns:
        List of values in chronological order.
    """
    channel = self._channels.get(name)
    if channel is None:
        return []
    return channel.values

watch(name: str, source: Callable[[], Any], unit: str = '', history_len: int | None = None) -> TelemetryChannel

Register a new channel to monitor.

Parameters:

Name Type Description Default
name str

Human-readable channel name.

required
source Callable[[], Any]

A callable (lambda, function, or method) that returns the current value when called.

required
unit str

Optional unit string (e.g. "m/s").

''
history_len int | None

History buffer size. Defaults to manager's default.

None

Returns:

Type Description
TelemetryChannel

The created :class:~bulletlab.telemetry.channel.TelemetryChannel.

Example::

telemetry.watch("Speed", lambda: robot.speed, unit="m/s")
telemetry.watch("Joint_1", robot.joints["iiwa_joint_1"].position)
Source code in bulletlab/telemetry/manager.py
def watch(
    self,
    name: str,
    source: Callable[[], Any],
    unit: str = "",
    history_len: int | None = None,
) -> TelemetryChannel:
    """Register a new channel to monitor.

    Args:
        name: Human-readable channel name.
        source: A callable (lambda, function, or method) that returns
            the current value when called.
        unit: Optional unit string (e.g. ``"m/s"``).
        history_len: History buffer size. Defaults to manager's default.

    Returns:
        The created :class:`~bulletlab.telemetry.channel.TelemetryChannel`.

    Example::

        telemetry.watch("Speed", lambda: robot.speed, unit="m/s")
        telemetry.watch("Joint_1", robot.joints["iiwa_joint_1"].position)
    """
    if callable(source):
        source_fn = source
    else:
        # Allow passing a property value directly as a lambda
        _val = source
        source_fn = lambda: _val  # noqa: E731

    hlen = history_len if history_len is not None else self._history_len
    channel = TelemetryChannel(name=name, source=source_fn, history_len=hlen, unit=unit)
    self._channels[name] = channel
    return channel

A single named data stream with a rolling history buffer.

Parameters:

Name Type Description Default
name str

Human-readable channel name.

required
source Callable[[], Any]

Callable that returns the current value when called.

required
history_len int

Maximum number of samples to retain in history. Defaults to 1000.

1000
unit str

Optional unit string (e.g. "m/s", "rad").

''

Example::

ch = TelemetryChannel("Roll", lambda: robot.roll, unit="rad")
ch.poll(0.0)
print(ch.latest)
print(ch.history[-1])   # (timestamp, value)
Source code in bulletlab/telemetry/channel.py
class TelemetryChannel:
    """A single named data stream with a rolling history buffer.

    Args:
        name: Human-readable channel name.
        source: Callable that returns the current value when called.
        history_len: Maximum number of samples to retain in history.
            Defaults to 1000.
        unit: Optional unit string (e.g. ``"m/s"``, ``"rad"``).

    Example::

        ch = TelemetryChannel("Roll", lambda: robot.roll, unit="rad")
        ch.poll(0.0)
        print(ch.latest)
        print(ch.history[-1])   # (timestamp, value)
    """

    def __init__(
        self,
        name: str,
        source: Callable[[], Any],
        history_len: int = 1000,
        unit: str = "",
    ) -> None:
        self._name = name
        self._source = source
        self._history: deque[tuple[float, Any]] = deque(maxlen=history_len)
        self._unit = unit
        self._latest: Any = None
        self._last_poll_time: float = 0.0

    # ------------------------------------------------------------------
    # Polling
    # ------------------------------------------------------------------

    def poll(self, t: float | None = None) -> Any:
        """Sample the source and store the result in history.

        Args:
            t: Timestamp for this sample. If ``None``, uses
                :func:`time.monotonic`.

        Returns:
            The sampled value.

        Example::

            value = channel.poll(t=sim.elapsed_time)
        """
        timestamp = t if t is not None else time.monotonic()
        try:
            value = self._source()
        except Exception as exc:
            value = float("nan")

        self._latest = value
        self._last_poll_time = timestamp
        self._history.append((timestamp, value))
        return value

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Channel name."""
        return self._name

    @property
    def unit(self) -> str:
        """Measurement unit string (e.g. ``"m/s"``)."""
        return self._unit

    @property
    def latest(self) -> Any:
        """Most recently polled value, or ``None`` if never polled."""
        return self._latest

    @property
    def history(self) -> deque[tuple[float, Any]]:
        """Rolling history as a deque of ``(timestamp, value)`` pairs."""
        return self._history

    @property
    def timestamps(self) -> list[float]:
        """List of all stored timestamps."""
        return [t for t, _ in self._history]

    @property
    def values(self) -> list[Any]:
        """List of all stored values."""
        return [v for _, v in self._history]

    def clear(self) -> None:
        """Clear the history buffer."""
        self._history.clear()
        self._latest = None

    def __repr__(self) -> str:
        unit_str = f" {self._unit}" if self._unit else ""
        return f"TelemetryChannel({self._name!r}, latest={self._latest}{unit_str})"

history: deque[tuple[float, Any]] property

Rolling history as a deque of (timestamp, value) pairs.

latest: Any property

Most recently polled value, or None if never polled.

name: str property

Channel name.

timestamps: list[float] property

List of all stored timestamps.

unit: str property

Measurement unit string (e.g. "m/s").

values: list[Any] property

List of all stored values.

clear() -> None

Clear the history buffer.

Source code in bulletlab/telemetry/channel.py
def clear(self) -> None:
    """Clear the history buffer."""
    self._history.clear()
    self._latest = None

poll(t: float | None = None) -> Any

Sample the source and store the result in history.

Parameters:

Name Type Description Default
t float | None

Timestamp for this sample. If None, uses :func:time.monotonic.

None

Returns:

Type Description
Any

The sampled value.

Example::

value = channel.poll(t=sim.elapsed_time)
Source code in bulletlab/telemetry/channel.py
def poll(self, t: float | None = None) -> Any:
    """Sample the source and store the result in history.

    Args:
        t: Timestamp for this sample. If ``None``, uses
            :func:`time.monotonic`.

    Returns:
        The sampled value.

    Example::

        value = channel.poll(t=sim.elapsed_time)
    """
    timestamp = t if t is not None else time.monotonic()
    try:
        value = self._source()
    except Exception as exc:
        value = float("nan")

    self._latest = value
    self._last_poll_time = timestamp
    self._history.append((timestamp, value))
    return value