Simulation

Full source code

This tutorial shows how to drive a simulator with Rosia by building a closed loop between an Atari Skiing environment and a heuristic agent that steers through the gates. It puts together everything from the previous tutorials: ports, reactions, logical time, STAT, and Rerun visualization.

Pipeline

Two nodes form a tight feedback loop:

  • Environment wraps gymnasium's ALE/Skiing-v5. It emits the current frame on observation and consumes the next action on action_in.
  • Agent reads each frame, finds the player and the next gate by color, and emits a steering action on action_out (0=NOOP, 1=RIGHT, 2=LEFT).

Skiing Pipeline Diagram

The cycle is: Environment.observation → Agent.observation_in → Agent.action_out → Environment.action_in → next frame.

Requirements

pip install 'gymnasium[atari]' 'ale-py' 'autorom[accept-rom-license]'

Nodes

import numpy as np
import gymnasium as gym
import ale_py

import rerun as rr
import rerun.blueprint as rrb

from rosia import InputPort, OutputPort, reaction, Node, Application, request_shutdown, log
from rosia.config import RerunConfig
from rosia.time import s

gym.register_envs(ale_py)

# Colors in RGB
PLAYER_COLOR = np.array([214, 92, 92])
RED_FLAG_COLOR = np.array([184, 50, 50])
BLUE_FLAG_COLOR = np.array([66, 72, 200])
THETA_DIFF_THRESHOLD = 0.015
SEED = 42


@Node
class Environment:
    observation = OutputPort[np.ndarray]()
    action_in = InputPort[int]()

    def __init__(self, render: bool = True):
        self.render = render
        self.observation.set_STAT(0 * s)
        self.dt = 1 * s / 15
        render_mode = "rgb_array" if self.render else None
        self.env = gym.make("ALE/Skiing-v5", render_mode=render_mode)
        self.env.action_space.seed(SEED)

    def start(self):
        frame, _ = self.env.reset(seed=SEED)
        log.info("Game started")
        self.observation(frame, STAT=self.dt)

    @reaction([action_in], eager=True)
    def on_action(self):
        frame, _, terminated, truncated, _ = self.env.step(self.action_in)
        done = terminated or truncated
        if done:
            log.info("Game over!")
            request_shutdown()
        else:
            yield self.dt
            self.observation(frame, STAT=self.dt)

    def shutdown(self):
        self.env.close()

A few things to note about Environment:

  • self.dt = 1 * s / 15 is the simulator step duration — one Atari frame at 15 Hz.
  • self.observation.set_STAT(0 * s) declares that the very first frame is sent at logical time 0s. Without this, the downstream agent would not know it is safe to advance.
  • In start(), the initial frame is sent with STAT=self.dt, telling the agent: "the next observation will be at most dt from now". The agent can then advance its logical time up to (but not past) dt while waiting.
  • In on_action(), the reaction uses yield self.dt to advance logical time by one frame, then sends the new observation with STAT=self.dt. This keeps the loop running at a steady 15 Hz in logical time, regardless of how fast the wall clock is.
  • When the episode ends, request_shutdown() is called from inside the reaction, which tears down the application.
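To make the pacing concrete, here is a small stand-alone sketch (plain Python, no Rosia — the framework's internal time representation may differ) of how logical time advances in exact 1/15 s increments, so frame k is stamped at k·dt with no floating-point drift:

```python
from fractions import Fraction

# One Atari frame at 15 Hz, kept as an exact rational to avoid drift.
dt = Fraction(1, 15)  # seconds

# Logical timestamps of the first few frames: 0, dt, 2*dt, ...
timestamps = [k * dt for k in range(5)]

print(timestamps[1])  # 1/15
print(timestamps[3] == Fraction(1, 5))  # frame 3 lands exactly at 0.2 s
```

However fast or slow the wall clock runs, these logical timestamps are the only clock the two nodes agree on.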

Why eager=True?

Without eager=True, yield self.dt would deadlock in this feedback loop:

@reaction([action_in])           # ❌ deadlocks without eager=True
def on_action(self):
    frame, _, terminated, truncated, _ = self.env.step(self.action_in)
    yield self.dt
    self.observation(frame, STAT=self.dt)

Here's why. Normally, yield self.dt suspends the reaction and waits until STAT has advanced strictly past logical_time + dt before resuming. STAT for Environment is determined by action_in — the framework waits for the next action message before it considers it safe to advance. But the next action cannot be produced until Agent receives the next observation, which is only sent after the yield returns. The two nodes are stuck waiting for each other: a causality loop.

eager=True resolves this by relaxing the STAT guard from strictly past (t < STAT) to at or past (t <= STAT). When the Agent's action arrives with STAT=dt, the Environment's yield target is exactly dt. Without eager, the guard requires dt < dt, which is false, so the reaction stays blocked. With eager, it only requires dt <= dt, which holds, so the reaction resumes.

This one-tick relaxation is safe for feedback loops because the message at exactly STAT is the one this reaction itself will produce — it cannot arrive from upstream before we send it.
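The guard logic can be sketched as a pure function (hypothetical names — Rosia's internals will differ), showing why a yield target of exactly dt blocks without eager and passes with it:

```python
def may_resume(target: float, stat: float, eager: bool) -> bool:
    """May a suspended reaction resume, given the current STAT?

    Non-eager: STAT must be strictly past the target (target < STAT).
    Eager:     STAT at the target is enough    (target <= STAT).
    """
    return target <= stat if eager else target < stat

dt = 1 / 15  # yield target after one frame

# The Agent's action arrives carrying STAT = dt:
assert may_resume(dt, dt, eager=False) is False  # blocked -> deadlock
assert may_resume(dt, dt, eager=True) is True    # resumes -> loop runs
```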

@Node
class Agent:
    observation_in = InputPort[np.ndarray]()
    action_out = OutputPort[int]()

    def __init__(self):
        self.prev_theta = 0.0

    def find_position(
        self, frame: np.ndarray, color: np.ndarray
    ) -> tuple[float, float] | None:
        """Find mean position of pixels matching color. Returns (row, col) or None."""
        mask = np.all(frame == color, axis=2)
        positions = np.argwhere(mask)
        if len(positions) == 0:
            return None
        return float(positions[:, 0].mean()), float(positions[:, 1].mean())

    @reaction([observation_in])
    def decide(self):
        frame = self.observation_in
        player_pos = self.find_position(frame, PLAYER_COLOR)
        cropped = frame[:200]
        flag_pos = self.find_position(cropped, RED_FLAG_COLOR) or self.find_position(
            cropped, BLUE_FLAG_COLOR
        )

        if player_pos is None or flag_pos is None:
            # Can't see player or flag, go straight
            self.action_out(0)
            return

        player_row, player_col = player_pos
        flag_row, flag_col = flag_pos

        theta = np.arctan2(flag_row - player_row, flag_col - player_col)
        theta_diff = theta - self.prev_theta
        self.prev_theta = theta

        if theta_diff > THETA_DIFF_THRESHOLD:
            action = 2  # LEFT
        elif theta_diff < -THETA_DIFF_THRESHOLD:
            action = 1  # RIGHT
        else:
            action = 0  # NOOP

        log.rerun(rr.Image(frame), rerun_subpath="game")
        self.action_out(action)

The Agent is purely reactive: every time a new frame arrives on observation_in, it computes a steering action and sends it back. The heuristic locates the player and the next gate (red, falling back to blue) by averaging the positions of pixels matching their respective colors, then steers based on the change in angle to the gate. Each frame is also forwarded to Rerun via log.rerun(...) so the run can be inspected visually.
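The color-mask trick can be exercised outside the game with a synthetic frame. The helper below mirrors find_position but is self-contained, and the frame contents are made up for illustration:

```python
import numpy as np

def find_position(frame, color):
    """Mean (row, col) of pixels exactly matching `color`, or None."""
    mask = np.all(frame == color, axis=2)
    positions = np.argwhere(mask)
    if len(positions) == 0:
        return None
    return float(positions[:, 0].mean()), float(positions[:, 1].mean())

PLAYER_COLOR = np.array([214, 92, 92])

# A black 10x10 RGB frame with a 2-pixel "player" at rows 4-5, column 7.
frame = np.zeros((10, 10, 3), dtype=np.uint8)
frame[4, 7] = PLAYER_COLOR
frame[5, 7] = PLAYER_COLOR

print(find_position(frame, PLAYER_COLOR))          # (4.5, 7.0)
print(find_position(frame, np.array([1, 2, 3])))   # None
```

Exact equality against the sprite colors works here because ALE frames are palette-rendered; on real camera images you would match within a tolerance instead.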

Wiring

if __name__ == "__main__":
    app = Application()
    env = app.create_node(Environment(render=True))
    agent = app.create_node(Agent())

    env.observation >>= agent.observation_in
    agent.action_out >>= env.action_in

    app.diagram()
    app.execute(
        rerun_config=RerunConfig(
            name="skiing",
            blueprint=rrb.Blueprint(rrb.Spatial2DView()),
        )
    )

The two >>= lines close the feedback loop. app.diagram() writes the pipeline diagram shown above, and app.execute(...) starts the application with a Rerun viewer configured to show a single 2D spatial view.

Running

python examples/skiing.py

You should see a Rerun window pop up, with the skier descending the slope and steering toward the gates:

Skiing Rerun View

Because the environment is seeded with SEED = 42 (both env.reset(seed=...) and env.action_space.seed(...)) and the agent is fully deterministic, the entire run is reproducible from one execution to the next.
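The reproducibility here is just the usual seeded-RNG property, shown in miniature with Python's random module (standing in for the environment's internal generator):

```python
import random

def rollout(seed: int, n: int = 5) -> list[int]:
    """A toy 'run': n actions drawn from {0, 1, 2} with a seeded generator."""
    rng = random.Random(seed)  # analogous to env.reset(seed=...)
    return [rng.randrange(3) for _ in range(n)]

# Same seed -> identical action sequence, run after run.
assert rollout(42) == rollout(42)
assert rollout(42) != rollout(7)  # a different seed gives a different run
```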

Why use Rosia for this?

A single-process loop would also work for this toy example, but expressing it as two Rosia nodes gives you a few things for free:

  • Process isolation. Environment and Agent run in separate processes, so a heavyweight learner or perception model can sit on the agent side without blocking simulation.
  • Logical-time pacing. The simulator advances in 1/15 s steps in logical time, independent of how long each step actually takes to compute. STAT keeps the agent and environment in lockstep without any explicit synchronization.
  • Observability. Rerun integration is one line per frame (log.rerun(rr.Image(frame), ...)), and the same logging that drives Rerun in development can be turned off in production.
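For contrast, the single-process loop mentioned above would look roughly like this. The stub environment and placeholder policy are made up so the sketch runs without gymnasium; a real run would use gym.make("ALE/Skiing-v5") and the Agent's heuristic:

```python
class StubEnv:
    """Stand-in for a gymnasium env: ends after a fixed number of steps."""

    def __init__(self, max_steps: int = 10):
        self.max_steps = max_steps
        self.steps = 0

    def reset(self, seed=None):
        self.steps = 0
        return "frame-0", {}

    def step(self, action):
        self.steps += 1
        terminated = self.steps >= self.max_steps
        return f"frame-{self.steps}", 0.0, terminated, False, {}

def decide(frame) -> int:
    return 0  # placeholder policy: always NOOP

env = StubEnv()
frame, _ = env.reset(seed=42)
done = False
while not done:
    action = decide(frame)                                  # agent step
    frame, _, terminated, truncated, _ = env.step(action)   # env step
    done = terminated or truncated

print(frame)  # frame-10
```

Everything happens in one process and one thread; the Rosia version trades this simplicity for the isolation, pacing, and observability listed above.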

Key points

  • Closed-loop simulators map naturally onto two reacting nodes connected in a cycle.
  • Use set_STAT and the STAT= argument when calling an output port to advertise when the next message will arrive, so the downstream node can advance logical time safely.
  • Use @reaction([...], eager=True) for reactions in feedback loops where a yield would otherwise deadlock due to a causality loop. eager=True relaxes the STAT guard from t < STAT to t <= STAT.
  • Seeding both env.reset(seed=...) and env.action_space.seed(...) makes the run deterministic.