Skip to content

DataLogger API

Records experiment data from callable sources to CSV or JSON files.

Channels can be registered before or after calling `start()`. The logger automatically determines the file format from the extension: .json, .ndjson or .jsonl → NDJSON; any other extension (including .csv) → CSV.

Parameters:

Name Type Description Default
include_timestamp bool

If True, prepend a "t" column with the elapsed time in seconds since `start()` was called (or the override passed to `step()`), rounded to 6 decimal places. Defaults to True.

True
include_step bool

If True, prepend a "step" column with the step index. Defaults to True.

True

Example::

logger = DataLogger()
logger.watch("speed", lambda: robot.speed)
logger.start("run1.csv")
for _ in range(1000):
    sim.step()
    logger.step()
logger.stop()
Source code in bulletlab/logging/logger.py
class DataLogger:
    """Records experiment data from callable sources to CSV or JSON files.

    Channels can be registered before or after calling :meth:`start`.
    The logger automatically determines the file format from the extension:
    ``.csv`` → CSV, ``.json`` or ``.ndjson`` → NDJSON.

    Args:
        include_timestamp: If ``True``, prepend a ``"t"`` column with the
            wall-clock time of each step. Defaults to ``True``.
        include_step: If ``True``, prepend a ``"step"`` column with the step
            index. Defaults to ``True``.

    Example::

        logger = DataLogger()
        logger.watch("speed", lambda: robot.speed)
        logger.start("run1.csv")
        for _ in range(1000):
            sim.step()
            logger.step()
        logger.stop()
    """

    def __init__(
        self,
        include_timestamp: bool = True,
        include_step: bool = True,
    ) -> None:
        self._channels: dict[str, Callable[[], Any]] = {}
        self._writer: CsvWriter | JsonWriter | None = None
        self._step_count: int = 0
        self._include_timestamp = include_timestamp
        self._include_step = include_step
        self._start_time: float = 0.0
        self._running: bool = False

    # ------------------------------------------------------------------
    # Channel registration
    # ------------------------------------------------------------------

    def watch(self, name: str, source: Callable[[], Any]) -> "DataLogger":
        """Register a data source to log.

        Args:
            name: Column name in the output file.
            source: Callable returning the current value.

        Returns:
            self, for method chaining.

        Example::

            logger.watch("speed", lambda: robot.speed)
            logger.watch("roll",  lambda: robot.roll)
        """
        self._channels[name] = source
        return self

    def unwatch(self, name: str) -> None:
        """Remove a previously registered channel.

        Args:
            name: Channel name to remove.
        """
        self._channels.pop(name, None)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self, filepath: str | Path) -> "DataLogger":
        """Open the output file and begin logging.

        Args:
            filepath: Path to the output file. Extension determines format:
                ``.csv`` for CSV, ``.json`` / ``.ndjson`` for NDJSON.

        Returns:
            self, for method chaining.

        Raises:
            RuntimeError: If the logger is already running.

        Example::

            logger.start("run1.csv")
        """
        if self._running:
            raise RuntimeError("DataLogger is already running. Call stop() first.")

        filepath = Path(filepath)
        filepath.parent.mkdir(parents=True, exist_ok=True)
        ext = filepath.suffix.lower()

        fieldnames = self._build_fieldnames()

        if ext in (".json", ".ndjson", ".jsonl"):
            self._writer = JsonWriter(filepath)
        else:
            # Default to CSV
            self._writer = CsvWriter(filepath, fieldnames)

        self._step_count = 0
        self._start_time = time.monotonic()
        self._running = True
        return self

    def stop(self) -> None:
        """Flush and close the output file.

        Example::

            logger.stop()
        """
        if self._writer is not None:
            self._writer.close()
            self._writer = None
        self._running = False

    # ------------------------------------------------------------------
    # Data recording
    # ------------------------------------------------------------------

    def step(self, t: float | None = None) -> dict[str, Any]:
        """Record one row of data from all watched sources.

        Call this once per simulation step. If not started, this is a no-op.

        Args:
            t: Timestamp override. If ``None``, uses wall-clock elapsed time
                since :meth:`start` was called.

        Returns:
            The recorded row as a dictionary.

        Example::

            for _ in range(1000):
                sim.step()
                logger.step()
        """
        if not self._running or self._writer is None:
            return {}

        timestamp = t if t is not None else (time.monotonic() - self._start_time)
        row: dict[str, Any] = {}

        if self._include_step:
            row["step"] = self._step_count
        if self._include_timestamp:
            row["t"] = round(timestamp, 6)

        for name, source in self._channels.items():
            try:
                val = source()
            except Exception:
                val = float("nan")
            row[name] = val

        self._writer.write_row(row)
        self._step_count += 1
        return row

    def flush(self) -> None:
        """Manually flush buffered data to disk."""
        if self._writer is not None:
            self._writer.flush()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _build_fieldnames(self) -> list[str]:
        """Build the ordered list of field names including meta-columns."""
        fields: list[str] = []
        if self._include_step:
            fields.append("step")
        if self._include_timestamp:
            fields.append("t")
        fields.extend(self._channels.keys())
        return fields

    # ------------------------------------------------------------------
    # State
    # ------------------------------------------------------------------

    @property
    def is_running(self) -> bool:
        """``True`` if the logger is currently recording."""
        return self._running

    @property
    def step_count(self) -> int:
        """Number of rows recorded since :meth:`start` was last called."""
        return self._step_count

    @property
    def channel_names(self) -> list[str]:
        """Names of all registered channels."""
        return list(self._channels.keys())

    # ------------------------------------------------------------------
    # Context manager
    # ------------------------------------------------------------------

    def __enter__(self) -> "DataLogger":
        return self

    def __exit__(self, *_: object) -> None:
        self.stop()

    def __repr__(self) -> str:
        status = "running" if self._running else "stopped"
        return f"DataLogger({status}, channels={list(self._channels.keys())}, steps={self._step_count})"

channel_names: list[str] property

Names of all registered channels.

is_running: bool property

True if the logger is currently recording.

step_count: int property

Number of rows recorded since `start()` was last called.

flush() -> None

Manually flush buffered data to disk.

Source code in bulletlab/logging/logger.py
def flush(self) -> None:
    """Push any buffered rows to disk; does nothing when no file is open."""
    writer = self._writer
    if writer is None:
        return
    writer.flush()

start(filepath: str | Path) -> 'DataLogger'

Open the output file and begin logging.

Parameters:

Name Type Description Default
filepath str | Path

Path to the output file. Extension determines format: .json / .ndjson / .jsonl for NDJSON; any other extension (including .csv) for CSV.

required

Returns:

Type Description
'DataLogger'

self, for method chaining.

Raises:

Type Description
RuntimeError

If the logger is already running.

Example::

logger.start("run1.csv")
Source code in bulletlab/logging/logger.py
def start(self, filepath: str | Path) -> "DataLogger":
    """Open the output file and begin logging.

    Args:
        filepath: Path to the output file. Extension determines format:
            ``.csv`` for CSV, ``.json`` / ``.ndjson`` for NDJSON.

    Returns:
        self, for method chaining.

    Raises:
        RuntimeError: If the logger is already running.

    Example::

        logger.start("run1.csv")
    """
    if self._running:
        raise RuntimeError("DataLogger is already running. Call stop() first.")

    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)
    ext = filepath.suffix.lower()

    fieldnames = self._build_fieldnames()

    if ext in (".json", ".ndjson", ".jsonl"):
        self._writer = JsonWriter(filepath)
    else:
        # Default to CSV
        self._writer = CsvWriter(filepath, fieldnames)

    self._step_count = 0
    self._start_time = time.monotonic()
    self._running = True
    return self

step(t: float | None = None) -> dict[str, Any]

Record one row of data from all watched sources.

Call this once per simulation step. If not started, this is a no-op.

Parameters:

Name Type Description Default
t float | None

Timestamp override. If None, uses wall-clock elapsed time since `start()` was called.

None

Returns:

Type Description
dict[str, Any]

The recorded row as a dictionary.

Example::

for _ in range(1000):
    sim.step()
    logger.step()
Source code in bulletlab/logging/logger.py
def step(self, t: float | None = None) -> dict[str, Any]:
    """Record one row of data from all watched sources.

    Call this once per simulation step. If not started, this is a no-op.

    Args:
        t: Timestamp override. If ``None``, uses wall-clock elapsed time
            since :meth:`start` was called.

    Returns:
        The recorded row as a dictionary.

    Example::

        for _ in range(1000):
            sim.step()
            logger.step()
    """
    if not self._running or self._writer is None:
        return {}

    timestamp = t if t is not None else (time.monotonic() - self._start_time)
    row: dict[str, Any] = {}

    if self._include_step:
        row["step"] = self._step_count
    if self._include_timestamp:
        row["t"] = round(timestamp, 6)

    for name, source in self._channels.items():
        try:
            val = source()
        except Exception:
            val = float("nan")
        row[name] = val

    self._writer.write_row(row)
    self._step_count += 1
    return row

stop() -> None

Flush and close the output file.

Example::

logger.stop()
Source code in bulletlab/logging/logger.py
def stop(self) -> None:
    """Flush and close the output file.

    The logger is marked as stopped *before* the writer is closed, so a
    failing ``close()`` still leaves the logger in a restartable state. Any
    exception raised by the writer propagates to the caller. Calling this on
    a stopped logger is a no-op.

    Example::

        logger.stop()
    """
    writer, self._writer = self._writer, None
    self._running = False
    if writer is not None:
        writer.close()

unwatch(name: str) -> None

Remove a previously registered channel.

Parameters:

Name Type Description Default
name str

Channel name to remove.

required
Source code in bulletlab/logging/logger.py
def unwatch(self, name: str) -> None:
    """Drop a registered channel; unknown names are silently ignored.

    Args:
        name: Name of the channel to stop logging.
    """
    if name in self._channels:
        del self._channels[name]

watch(name: str, source: Callable[[], Any]) -> 'DataLogger'

Register a data source to log.

Parameters:

Name Type Description Default
name str

Column name in the output file.

required
source Callable[[], Any]

Callable returning the current value.

required

Returns:

Type Description
'DataLogger'

self, for method chaining.

Example::

logger.watch("speed", lambda: robot.speed)
logger.watch("roll",  lambda: robot.roll)
Source code in bulletlab/logging/logger.py
def watch(self, name: str, source: Callable[[], Any]) -> "DataLogger":
    """Add (or replace) a named channel that is sampled on every step.

    Args:
        name: Column name used for this channel in the output file.
        source: Zero-argument callable that returns the current value.

    Returns:
        The logger itself, so calls can be chained.

    Example::

        logger.watch("speed", lambda: robot.speed)
        logger.watch("roll",  lambda: robot.roll)
    """
    registry = self._channels
    registry[name] = source
    return self