Skip to content

API reference

amr_hub_abm package.

agent

Module to represent an agent in the AMR Hub ABM simulation.

Agent dataclass

Representation of an agent in the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/agent.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
@dataclass
class Agent:
    """Representation of an agent in the AMR Hub ABM simulation."""

    idx: int
    location: Location
    heading_rad: float
    space: list[Building]

    interaction_radius: float = field(default=0.05)
    tasks: list[Task] = field(default_factory=list)
    agent_type: AgentType = field(default=AgentType.GENERIC)
    infection_status: InfectionStatus = field(default=InfectionStatus.SUSCEPTIBLE)
    infection_details: dict = field(default_factory=dict)

    movement_speed: float = field(default=0.1)  # units per time step

    @property
    def heading_degrees(self) -> float:
        """Get the agent's heading in degrees."""
        return math.degrees(self.heading_rad)

    @heading_degrees.setter
    def heading_degrees(self, value: float) -> None:
        self.heading_rad = math.radians(value) % (2 * math.pi)

    def __post_init__(self) -> None:
        """Post-initialization to log agent creation."""
        # Ensure heading is between 0 and 360 degrees
        self.heading_rad = self.heading_rad % (2 * math.pi)

        logger.debug(
            "Created Agent id %s of type %s at location %s with heading %s",
            self.idx,
            self.agent_type,
            self.location,
            self.heading_rad,
        )

    def get_room(self) -> Room | None:
        """Get the room the agent is currently located in, if any."""
        for building in self.space:
            if building.name != self.location.building:
                continue
            for floor in building.floors:
                if floor.floor_number != self.location.floor:
                    continue
                room = floor.find_room_by_location((self.location.x, self.location.y))
                if room:
                    return room
        return None

    def check_intersection_with_walls(self, walls: list[Wall]) -> bool:
        """Check if the agent intersects with any walls."""
        for wall in walls:
            if (
                wall.polygon.distance(
                    shapely.geometry.Point(self.location.x, self.location.y)
                )
                < self.interaction_radius
            ):
                return True
        return False

    def check_if_location_reached(self, target_location: Location) -> bool:
        """Check if the agent has reached the target location."""
        if self.location.building != target_location.building:
            return False
        if self.location.floor != target_location.floor:
            return False

        distance = self.location.distance_to(target_location)
        return distance <= self.interaction_radius

    def move_to_location(self, new_location: Location) -> None:
        """Move the agent to a new location and log the movement."""
        msg = f"Moving Agent id {self.idx} from {self.location} to {new_location}"
        logger.info(msg)
        self.location = new_location

    def plot_agent(self, ax: Axes, *, show_tags: bool = False) -> None:
        """Plot the agent on the given axes."""
        ax.plot(
            self.location.x,
            self.location.y,
            marker="o",
            markersize=5,
            color=ROLE_COLOUR_MAP[self.agent_type],
        )

        if show_tags:
            ax.text(
                self.location.x + 0.05,
                self.location.y + 0.05,
                f"{self.agent_type.value} {self.idx}",
                fontsize=8,
                ha="left",
                va="bottom",
            )

    def __repr__(self) -> str:
        """Return a string representation of the agent."""
        return (
            f"Agent(idx={self.idx}, {self.location}, "
            f"{math.degrees(self.heading_rad):.2f}°, "
            f"{self.interaction_radius}, {self.agent_type.value}, "
            f"{self.infection_status.value})"
        )

    def add_task(
        self,
        time: int,
        location: Location,
        event_type: str,
        additional_info: dict | None = None,
    ) -> None:
        """Add a task to the agent's task list and log the addition."""
        if event_type not in TASK_TYPES:
            msg = f"Invalid task type: {event_type}. Must be one of {TASK_TYPES}."
            raise SimulationModeError(msg)
        task_type = TaskType(event_type)

        task: Task

        if task_type == TaskType.ATTEND_PATIENT:
            if additional_info is None or "patient" not in additional_info:
                msg = "Patient ID must be provided for attend_patient tasks."
                raise SimulationModeError(msg)

            patient = additional_info["patient"]
            if not isinstance(patient, Agent):
                msg = "Patient must be an instance of Agent."
                raise SimulationModeError(msg)

            task = TaskAttendPatient(
                time_needed=15,
                time_due=time,
                patient=patient,
            )

        elif task_type == TaskType.DOOR_ACCESS:
            if location.building is None or location.floor is None:
                msg = "Building and floor must be provided for door access tasks."
                raise SimulationModeError(msg)

            if (
                additional_info is None
                or "door" not in additional_info
                or not isinstance(additional_info["door"], Door)
            ):
                msg = "Door must be provided in additional_info for door access tasks."
                raise SimulationModeError(msg)

            task = TaskDoorAccess(
                door=additional_info["door"],
                time_needed=1,
                time_due=time,
                building=location.building,
                floor=location.floor,
            )

        elif task_type == TaskType.WORKSTATION:
            task = TaskWorkstation(
                workstation_location=location,
                time_needed=30,
                time_due=time,
            )

        else:
            msg = f"Task type {task_type} not implemented yet."
            raise NotImplementedError(msg)

        self.tasks.append(task)

    def head_to_point(self, point: tuple[float, float]) -> None:
        """Set the agent's heading to face a specific point."""
        delta_x = point[0] - self.location.x
        delta_y = point[1] - self.location.y

        self.heading_rad = math.atan2(delta_y, delta_x) % (2 * math.pi)

    def move_one_step(self) -> None:
        """Move the agent one step in the direction of its heading."""
        delta_x = self.movement_speed * math.cos(self.heading_rad)
        delta_y = self.movement_speed * math.sin(self.heading_rad)

        new_x = self.location.x + delta_x
        new_y = self.location.y + delta_y

        new_location = replace(
            self.location,
            x=new_x,
            y=new_y,
        )

        self.move_to_location(new_location)

    def perform_in_progress_task(self, current_time: int) -> bool:
        """Perform an in-progress task and return True if a task was performed."""
        in_progress_tasks = [
            task for task in self.tasks if task.progress == TaskProgress.IN_PROGRESS
        ]

        if not in_progress_tasks:
            return False

        if len(in_progress_tasks) > 1:
            msg = f"Agent id {self.idx} has multiple ongoing tasks."
            logger.error(msg)
            raise RuntimeError(msg)

        task = in_progress_tasks[0]
        task.update_progress(current_time=current_time, agent=self)
        return True

    def perform_moving_to_task_location(self, current_time: int) -> bool:
        """Move the agent towards the location of its next task."""
        moving_to_location_tasks = [
            task
            for task in self.tasks
            if task.progress == TaskProgress.MOVING_TO_LOCATION
        ]

        if not moving_to_location_tasks:
            return False

        if len(moving_to_location_tasks) > 1:
            msg = f"Agent id {self.idx} has multiple tasks to start."
            logger.error(msg)
            raise RuntimeError(msg)

        next_task = moving_to_location_tasks[0]
        next_task.update_progress(current_time=current_time, agent=self)
        return True

    def perform_suspended_task(self, current_time: int) -> bool:
        """Perform a suspended task and return True if a task was performed."""
        suspended_tasks = [
            task for task in self.tasks if task.progress == TaskProgress.SUSPENDED
        ]

        if not suspended_tasks:
            return False

        suspended_tasks.sort(key=lambda t: t.priority.value, reverse=True)
        task = suspended_tasks[0]
        task.update_progress(current_time=current_time, agent=self)
        return True

    def perform_to_be_started_task(self, current_time: int) -> bool:
        """Perform a to-be-started task and return True if a task was performed."""
        to_be_started_tasks = [
            task for task in self.tasks if task.progress == TaskProgress.NOT_STARTED
        ]

        if not to_be_started_tasks:
            return False

        to_be_started_tasks.sort(key=lambda t: t.priority.value, reverse=True)
        task = to_be_started_tasks[0]

        task_move_time = (
            task.time_due
            - task.time_needed
            - self.estimate_time_to_reach_location(task.location)
        )
        logger.info(
            "Agent id %s next task move time: %s, current time: %s",
            self.idx,
            task_move_time,
            current_time,
        )

        if current_time < task_move_time:
            return False

        task.update_progress(current_time=current_time, agent=self)
        return True

    def perform_task(self, current_time: int, rooms: list[Room]) -> None:
        """Perform the agent's current task if it's due."""
        if not self.tasks:
            return

        logger.debug(
            "Agent id %s has %s tasks to perform.",
            self.idx,
            len(self.tasks),
        )

        if self.perform_in_progress_task(current_time=current_time):
            return

        logger.debug(
            "No in-progress tasks for Agent id %s.",
            self.idx,
        )

        if self.perform_moving_to_task_location(current_time=current_time):
            return

        logger.debug(
            "No tasks to move to for Agent id %s.",
            self.idx,
        )

        if self.perform_suspended_task(current_time=current_time):
            return

        logger.debug(
            "No suspended tasks for Agent id %s.",
            self.idx,
        )

        if self.perform_to_be_started_task(current_time=current_time):
            return

        logger.debug(
            "No to-be-started tasks for Agent id %s.",
            self.idx,
        )

        logger.debug(
            "Number of rooms available for Agent id %s: %s",
            self.idx,
            len(rooms),
        )

    def estimate_time_to_reach_location(self, target_location: Location) -> float:
        """Estimate the time required to reach a target location."""
        distance = self.location.distance_to(target_location)
        return distance / self.movement_speed

heading_degrees property writable

Get the agent's heading in degrees.

__post_init__()

Post-initialization to log agent creation.

Source code in src/amr_hub_abm/agent.py
82
83
84
85
86
87
88
89
90
91
92
93
def __post_init__(self) -> None:
    """Post-initialization to log agent creation."""
    # Ensure heading is between 0 and 360 degrees
    self.heading_rad = self.heading_rad % (2 * math.pi)

    logger.debug(
        "Created Agent id %s of type %s at location %s with heading %s",
        self.idx,
        self.agent_type,
        self.location,
        self.heading_rad,
    )

__repr__()

Return a string representation of the agent.

Source code in src/amr_hub_abm/agent.py
156
157
158
159
160
161
162
163
def __repr__(self) -> str:
    """Return a string representation of the agent."""
    return (
        f"Agent(idx={self.idx}, {self.location}, "
        f"{math.degrees(self.heading_rad):.2f}°, "
        f"{self.interaction_radius}, {self.agent_type.value}, "
        f"{self.infection_status.value})"
    )

add_task(time, location, event_type, additional_info=None)

Add a task to the agent's task list and log the addition.

Source code in src/amr_hub_abm/agent.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def add_task(
    self,
    time: int,
    location: Location,
    event_type: str,
    additional_info: dict | None = None,
) -> None:
    """Add a task to the agent's task list and log the addition."""
    if event_type not in TASK_TYPES:
        msg = f"Invalid task type: {event_type}. Must be one of {TASK_TYPES}."
        raise SimulationModeError(msg)
    task_type = TaskType(event_type)

    task: Task

    if task_type == TaskType.ATTEND_PATIENT:
        if additional_info is None or "patient" not in additional_info:
            msg = "Patient ID must be provided for attend_patient tasks."
            raise SimulationModeError(msg)

        patient = additional_info["patient"]
        if not isinstance(patient, Agent):
            msg = "Patient must be an instance of Agent."
            raise SimulationModeError(msg)

        task = TaskAttendPatient(
            time_needed=15,
            time_due=time,
            patient=patient,
        )

    elif task_type == TaskType.DOOR_ACCESS:
        if location.building is None or location.floor is None:
            msg = "Building and floor must be provided for door access tasks."
            raise SimulationModeError(msg)

        if (
            additional_info is None
            or "door" not in additional_info
            or not isinstance(additional_info["door"], Door)
        ):
            msg = "Door must be provided in additional_info for door access tasks."
            raise SimulationModeError(msg)

        task = TaskDoorAccess(
            door=additional_info["door"],
            time_needed=1,
            time_due=time,
            building=location.building,
            floor=location.floor,
        )

    elif task_type == TaskType.WORKSTATION:
        task = TaskWorkstation(
            workstation_location=location,
            time_needed=30,
            time_due=time,
        )

    else:
        msg = f"Task type {task_type} not implemented yet."
        raise NotImplementedError(msg)

    self.tasks.append(task)

check_if_location_reached(target_location)

Check if the agent has reached the target location.

Source code in src/amr_hub_abm/agent.py
120
121
122
123
124
125
126
127
128
def check_if_location_reached(self, target_location: Location) -> bool:
    """Check if the agent has reached the target location."""
    if self.location.building != target_location.building:
        return False
    if self.location.floor != target_location.floor:
        return False

    distance = self.location.distance_to(target_location)
    return distance <= self.interaction_radius

check_intersection_with_walls(walls)

Check if the agent intersects with any walls.

Source code in src/amr_hub_abm/agent.py
108
109
110
111
112
113
114
115
116
117
118
def check_intersection_with_walls(self, walls: list[Wall]) -> bool:
    """Check if the agent intersects with any walls."""
    for wall in walls:
        if (
            wall.polygon.distance(
                shapely.geometry.Point(self.location.x, self.location.y)
            )
            < self.interaction_radius
        ):
            return True
    return False

estimate_time_to_reach_location(target_location)

Estimate the time required to reach a target location.

Source code in src/amr_hub_abm/agent.py
384
385
386
387
def estimate_time_to_reach_location(self, target_location: Location) -> float:
    """Estimate the time required to reach a target location."""
    distance = self.location.distance_to(target_location)
    return distance / self.movement_speed

get_room()

Get the room the agent is currently located in, if any.

Source code in src/amr_hub_abm/agent.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def get_room(self) -> Room | None:
    """Get the room the agent is currently located in, if any."""
    for building in self.space:
        if building.name != self.location.building:
            continue
        for floor in building.floors:
            if floor.floor_number != self.location.floor:
                continue
            room = floor.find_room_by_location((self.location.x, self.location.y))
            if room:
                return room
    return None

head_to_point(point)

Set the agent's heading to face a specific point.

Source code in src/amr_hub_abm/agent.py
230
231
232
233
234
235
def head_to_point(self, point: tuple[float, float]) -> None:
    """Set the agent's heading to face a specific point."""
    delta_x = point[0] - self.location.x
    delta_y = point[1] - self.location.y

    self.heading_rad = math.atan2(delta_y, delta_x) % (2 * math.pi)

move_one_step()

Move the agent one step in the direction of its heading.

Source code in src/amr_hub_abm/agent.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def move_one_step(self) -> None:
    """Move the agent one step in the direction of its heading."""
    delta_x = self.movement_speed * math.cos(self.heading_rad)
    delta_y = self.movement_speed * math.sin(self.heading_rad)

    new_x = self.location.x + delta_x
    new_y = self.location.y + delta_y

    new_location = replace(
        self.location,
        x=new_x,
        y=new_y,
    )

    self.move_to_location(new_location)

move_to_location(new_location)

Move the agent to a new location and log the movement.

Source code in src/amr_hub_abm/agent.py
130
131
132
133
134
def move_to_location(self, new_location: Location) -> None:
    """Move the agent to a new location and log the movement."""
    msg = f"Moving Agent id {self.idx} from {self.location} to {new_location}"
    logger.info(msg)
    self.location = new_location

perform_in_progress_task(current_time)

Perform an in-progress task and return True if a task was performed.

Source code in src/amr_hub_abm/agent.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def perform_in_progress_task(self, current_time: int) -> bool:
    """Perform an in-progress task and return True if a task was performed."""
    in_progress_tasks = [
        task for task in self.tasks if task.progress == TaskProgress.IN_PROGRESS
    ]

    if not in_progress_tasks:
        return False

    if len(in_progress_tasks) > 1:
        msg = f"Agent id {self.idx} has multiple ongoing tasks."
        logger.error(msg)
        raise RuntimeError(msg)

    task = in_progress_tasks[0]
    task.update_progress(current_time=current_time, agent=self)
    return True

perform_moving_to_task_location(current_time)

Move the agent towards the location of its next task.

Source code in src/amr_hub_abm/agent.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def perform_moving_to_task_location(self, current_time: int) -> bool:
    """Move the agent towards the location of its next task."""
    moving_to_location_tasks = [
        task
        for task in self.tasks
        if task.progress == TaskProgress.MOVING_TO_LOCATION
    ]

    if not moving_to_location_tasks:
        return False

    if len(moving_to_location_tasks) > 1:
        msg = f"Agent id {self.idx} has multiple tasks to start."
        logger.error(msg)
        raise RuntimeError(msg)

    next_task = moving_to_location_tasks[0]
    next_task.update_progress(current_time=current_time, agent=self)
    return True

perform_suspended_task(current_time)

Perform a suspended task and return True if a task was performed.

Source code in src/amr_hub_abm/agent.py
291
292
293
294
295
296
297
298
299
300
301
302
303
def perform_suspended_task(self, current_time: int) -> bool:
    """Perform a suspended task and return True if a task was performed."""
    suspended_tasks = [
        task for task in self.tasks if task.progress == TaskProgress.SUSPENDED
    ]

    if not suspended_tasks:
        return False

    suspended_tasks.sort(key=lambda t: t.priority.value, reverse=True)
    task = suspended_tasks[0]
    task.update_progress(current_time=current_time, agent=self)
    return True

perform_task(current_time, rooms)

Perform the agent's current task if it's due.

Source code in src/amr_hub_abm/agent.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def perform_task(self, current_time: int, rooms: list[Room]) -> None:
    """Perform the agent's current task if it's due."""
    if not self.tasks:
        return

    logger.debug(
        "Agent id %s has %s tasks to perform.",
        self.idx,
        len(self.tasks),
    )

    if self.perform_in_progress_task(current_time=current_time):
        return

    logger.debug(
        "No in-progress tasks for Agent id %s.",
        self.idx,
    )

    if self.perform_moving_to_task_location(current_time=current_time):
        return

    logger.debug(
        "No tasks to move to for Agent id %s.",
        self.idx,
    )

    if self.perform_suspended_task(current_time=current_time):
        return

    logger.debug(
        "No suspended tasks for Agent id %s.",
        self.idx,
    )

    if self.perform_to_be_started_task(current_time=current_time):
        return

    logger.debug(
        "No to-be-started tasks for Agent id %s.",
        self.idx,
    )

    logger.debug(
        "Number of rooms available for Agent id %s: %s",
        self.idx,
        len(rooms),
    )

perform_to_be_started_task(current_time)

Perform a to-be-started task and return True if a task was performed.

Source code in src/amr_hub_abm/agent.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def perform_to_be_started_task(self, current_time: int) -> bool:
    """Perform a to-be-started task and return True if a task was performed."""
    to_be_started_tasks = [
        task for task in self.tasks if task.progress == TaskProgress.NOT_STARTED
    ]

    if not to_be_started_tasks:
        return False

    to_be_started_tasks.sort(key=lambda t: t.priority.value, reverse=True)
    task = to_be_started_tasks[0]

    task_move_time = (
        task.time_due
        - task.time_needed
        - self.estimate_time_to_reach_location(task.location)
    )
    logger.info(
        "Agent id %s next task move time: %s, current time: %s",
        self.idx,
        task_move_time,
        current_time,
    )

    if current_time < task_move_time:
        return False

    task.update_progress(current_time=current_time, agent=self)
    return True

plot_agent(ax, *, show_tags=False)

Plot the agent on the given axes.

Source code in src/amr_hub_abm/agent.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def plot_agent(self, ax: Axes, *, show_tags: bool = False) -> None:
    """Plot the agent on the given axes."""
    ax.plot(
        self.location.x,
        self.location.y,
        marker="o",
        markersize=5,
        color=ROLE_COLOUR_MAP[self.agent_type],
    )

    if show_tags:
        ax.text(
            self.location.x + 0.05,
            self.location.y + 0.05,
            f"{self.agent_type.value} {self.idx}",
            fontsize=8,
            ha="left",
            va="bottom",
        )

AgentType

Bases: Enum

Enumeration of possible agent types.

Source code in src/amr_hub_abm/agent.py
32
33
34
35
36
37
class AgentType(Enum):
    """Enumeration of possible agent types."""

    GENERIC = "generic"
    PATIENT = "patient"
    HEALTHCARE_WORKER = "healthcare_worker"

InfectionStatus

Bases: Enum

Enumeration of possible infection statuses.

Source code in src/amr_hub_abm/agent.py
47
48
49
50
51
52
53
class InfectionStatus(Enum):
    """Enumeration of possible infection statuses."""

    SUSCEPTIBLE = "susceptible"
    EXPOSED = "exposed"
    INFECTED = "infected"
    RECOVERED = "recovered"

exceptions

Module defining all custom exceptions for the AMR Hub ABM simulation.

InvalidDefinitionError

Bases: Exception

Exception raised when a general invalid definition is encountered.

Source code in src/amr_hub_abm/exceptions.py
53
54
55
56
57
58
class InvalidDefinitionError(Exception):
    """Exception raised when a general invalid definition is encountered."""

    def __init__(self, message: str) -> None:
        """Initialize the InvalidDefinitionError."""
        super().__init__(f"Invalid definition: {message}.")

__init__(message)

Initialize the InvalidDefinitionError.

Source code in src/amr_hub_abm/exceptions.py
56
57
58
def __init__(self, message: str) -> None:
    """Initialize the InvalidDefinitionError."""
    super().__init__(f"Invalid definition: {message}.")

InvalidDistanceError

Bases: Exception

Exception raised when an invalid distance calculation is attempted.

Source code in src/amr_hub_abm/exceptions.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class InvalidDistanceError(Exception):
    """Exception raised when an invalid distance calculation is attempted."""

    def __init__(self, locations: tuple, building: bool) -> None:  # noqa: FBT001
        """Initialize the InvalidDistanceError."""
        initial_string = "Invalid distance calculation between"

        if building:
            super().__init__(
                f"{initial_string} buildings: {locations[0]} and {locations[1]}."
            )
        else:
            super().__init__(
                f"{initial_string} floors: {locations[0]} and {locations[1]}."
            )

__init__(locations, building)

Initialize the InvalidDistanceError.

Source code in src/amr_hub_abm/exceptions.py
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, locations: tuple, building: bool) -> None:  # noqa: FBT001
    """Initialize the InvalidDistanceError."""
    initial_string = "Invalid distance calculation between"

    if building:
        super().__init__(
            f"{initial_string} buildings: {locations[0]} and {locations[1]}."
        )
    else:
        super().__init__(
            f"{initial_string} floors: {locations[0]} and {locations[1]}."
        )

InvalidDoorError

Bases: Exception

Exception raised when a door is defined with invalid parameters.

Source code in src/amr_hub_abm/exceptions.py
45
46
47
48
49
50
class InvalidDoorError(Exception):
    """Exception raised when a door is defined with invalid parameters."""

    def __init__(self, message: str) -> None:
        """Initialize the InvalidDoorError."""
        super().__init__(f"Invalid door definition: {message}.")

__init__(message)

Initialize the InvalidDoorError.

Source code in src/amr_hub_abm/exceptions.py
48
49
50
def __init__(self, message: str) -> None:
    """Initialize the InvalidDoorError."""
    super().__init__(f"Invalid door definition: {message}.")

InvalidRoomError

Bases: Exception

Exception raised when a room is defined with invalid parameters.

Source code in src/amr_hub_abm/exceptions.py
37
38
39
40
41
42
class InvalidRoomError(Exception):
    """Exception raised when a room is defined with invalid parameters."""

    def __init__(self, message: str) -> None:
        """Initialize the InvalidRoomError."""
        super().__init__(f"Invalid room definition: {message}.")

__init__(message)

Initialize the InvalidRoomError.

Source code in src/amr_hub_abm/exceptions.py
40
41
42
def __init__(self, message: str) -> None:
    """Initialize the InvalidRoomError."""
    super().__init__(f"Invalid room definition: {message}.")

SimulationModeError

Bases: Exception

Exception raised when an operation is invalid for the current simulation mode.

Source code in src/amr_hub_abm/exceptions.py
4
5
6
7
8
9
class SimulationModeError(Exception):
    """Exception raised when an operation is invalid for the current simulation mode."""

    def __init__(self, message: str) -> None:
        """Initialize the SimulationModeError."""
        super().__init__(f"Simulation mode error: {message}.")

__init__(message)

Initialize the SimulationModeError.

Source code in src/amr_hub_abm/exceptions.py
7
8
9
def __init__(self, message: str) -> None:
    """Initialize the SimulationModeError."""
    super().__init__(f"Simulation mode error: {message}.")

TimeError

Bases: Exception

Exception raised when a negative time value is encountered.

Source code in src/amr_hub_abm/exceptions.py
29
30
31
32
33
34
class TimeError(Exception):
    """Exception raised when a negative time value is encountered."""

    def __init__(self, message: str) -> None:
        """Initialize the TimeError."""
        super().__init__(f"Invalid time value encountered: {message}.")

__init__(message)

Initialize the TimeError.

Source code in src/amr_hub_abm/exceptions.py
32
33
34
def __init__(self, message: str) -> None:
    """Initialize the TimeError."""
    super().__init__(f"Invalid time value encountered: {message}.")

read_space_input

Module to import space input data for the AMR Hub ABM simulation.

SpaceInputReader dataclass

Class to read space input data from a YAML file.

Source code in src/amr_hub_abm/read_space_input.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
@dataclass
class SpaceInputReader:
    """Class to read space input data from a YAML file."""

    input_path: Path
    data: dict = field(init=False)

    door_list: list[Door] = field(init=False, default_factory=list)
    wall_list: list[Wall] = field(init=False, default_factory=list)

    room_door_dict: dict[str, list[DetachedDoor]] = field(
        init=False, default_factory=dict
    )

    rooms: list[Room] = field(init=False, default_factory=list)
    buildings: list[Building] = field(init=False, default_factory=list)

    topological: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        """Post-initialization to read and validate the YAML file."""
        self.validation()
        self.get_room_name_dict()
        self.create_doors_from_detatched_doors()
        self.create_rooms_from_data()

        for room in self.rooms:
            msg = f"Room '{room.name} (id {room.room_id})' created."
            logger.info(msg)

        self.buildings = self.organise_rooms_into_floors_and_buildings(self.rooms)

    def get_room_name_dict(self) -> None:
        """Extract room names and their corresponding detatched doors."""
        for building_data in [self.data["building"]]:
            for floor_data in building_data["floors"]:
                for room_data in floor_data["rooms"]:
                    self.topological = "area" in room_data
                    room_name = room_data["name"]
                    doors: list[DetachedDoor] = []

                    if not self.topological:
                        for door_data in room_data.get("doors", []):
                            door = DetachedDoor(
                                is_open=False,
                                access_control=(False, False),
                                start=(door_data[0], door_data[1]),
                                end=(door_data[2], door_data[3]),
                            )
                            doors.append(door)
                        self.room_door_dict[room_name] = doors

                    else:
                        for door_name in room_data.get("doors", []):
                            door = DetachedDoor(
                                is_open=False,
                                access_control=(False, False),
                                name=door_name,
                            )
                            doors.append(door)
                        self.room_door_dict[room_name] = doors

    def create_doors_from_detatched_doors(self) -> None:
        """Create Door instances from detatched doors."""
        detatced_door_list = [
            door for doors in self.room_door_dict.values() for door in doors
        ]
        unique_detatched_doors = set(detatced_door_list)
        sorted_unique_detatched_doors = sorted(
            unique_detatched_doors, key=lambda d: (d.start, d.end)
        )

        sorted_room_names = sorted(room_name for room_name in self.room_door_dict)

        for detatched_door in sorted_unique_detatched_doors:
            connecting_rooms = [
                sorted_room_names.index(room_name)
                for room_name, doors in self.room_door_dict.items()
                if detatched_door in doors
            ]

            if len(connecting_rooms) != 2:
                msg = (
                    f"Door at {detatched_door.start}-{detatched_door.end} must connect "
                    f"exactly two rooms. Found {len(connecting_rooms)}."
                )
                logger.error(msg)
                raise InvalidDoorError(msg)

            if self.topological:
                door = Door(
                    is_open=detatched_door.is_open,
                    access_control=detatched_door.access_control,
                    name=detatched_door.name,
                    connecting_rooms=(connecting_rooms[0], connecting_rooms[1]),
                    door_id=sorted_unique_detatched_doors.index(detatched_door),
                )
            else:
                door = Door(
                    is_open=detatched_door.is_open,
                    access_control=detatched_door.access_control,
                    start=detatched_door.start,
                    end=detatched_door.end,
                    connecting_rooms=(connecting_rooms[0], connecting_rooms[1]),
                    door_id=sorted_unique_detatched_doors.index(detatched_door),
                )

            self.door_list.append(door)

    @staticmethod
    def organise_rooms_into_floors_and_buildings(rooms: list[Room]) -> list[Building]:
        """Organize rooms into floors and buildings."""
        all_buildings = {room.building for room in rooms}
        buildings: list[Building] = []
        for building_name in all_buildings:
            building_rooms = [room for room in rooms if room.building == building_name]
            all_floors = {room.floor for room in building_rooms}
            floors: list[Floor] = []
            for floor_number in all_floors:
                floor_rooms = [
                    room for room in building_rooms if room.floor == floor_number
                ]
                floor = Floor(floor_number=floor_number, rooms=floor_rooms)
                msg = (
                    f"Floor {floor.floor_number} created with {len(floor.rooms)} rooms."
                )
                logger.info(msg)
                floors.append(floor)
            building = Building(name=building_name, floors=floors)
            msg = f"Building '{building.name}' created with {len(floors)} floors."
            logger.info(msg)
            buildings.append(building)

        return buildings

    def create_rooms_from_data(self) -> None:
        """Create Room instances from the validated data."""
        sorted_room_names = sorted(room_name for room_name in self.room_door_dict)

        for index, name in enumerate(sorted_room_names):
            for building_data in [self.data["building"]]:
                for floor_data in building_data["floors"]:
                    floor_level = floor_data["level"]
                    for room_data in floor_data["rooms"]:
                        if room_data["name"] == name:
                            room = self.create_room(
                                room_data, index, building_data["name"], floor_level
                            )
                            self.rooms.append(room)

    def validation(self) -> None:
        """Validate the space input data from the YAML file."""
        with self.input_path.open("r", encoding="utf-8") as file:
            self.data = yaml.safe_load(file)

        msg = f"Loaded space input data from {self.input_path}"
        logger.info(msg)

        if "building" not in self.data:
            msg = "The input data must contain a 'building' key."
            logger.error(msg)
            raise KeyError(msg)

        building_data = self.data["building"]
        self.validate_building_data(building_data)

        floors_data = building_data["floors"]
        for floor_data in floors_data:
            self.validate_floor_data(floor_data)
            rooms_data = floor_data["rooms"]
            for room_data in rooms_data:
                self.validate_room_data(room_data)

    def create_room(
        self, room_data: dict, room_id: int, building_name: str, floor_level: int
    ) -> Room:
        """Create a Room instance from room data."""
        if self.topological:
            return self.create_topological_room(
                room_data, room_id, building_name, floor_level
            )
        return self.create_spatial_room(room_data, room_id, building_name, floor_level)

    def create_topological_room(
        self, room_data: dict, room_id: int, building_name: str, floor_level: int
    ) -> Room:
        """Create a topological Room instance from room data."""
        room_doors = [
            door for door in self.door_list if door.name in room_data["doors"]
        ]

        return Room(
            room_id=room_id,
            name=room_data["name"],
            building=building_name,
            floor=floor_level,
            walls=None,
            doors=room_doors,
            contents=room_data.get("contents", []),
            area=room_data["area"],
        )

    def create_spatial_room(
        self, room_data: dict, room_id: int, building_name: str, floor_level: int
    ) -> Room:
        """Create a spatial Room instance from room data."""
        room_doors = [
            door for door in self.door_list if room_id in door.connecting_rooms
        ]

        room_walls: list[Wall] = []
        for wall_data in room_data.get("walls", []):
            wall = Wall(
                start=(wall_data[0], wall_data[1]),
                end=(wall_data[2], wall_data[3]),
            )
            self.wall_list.append(wall)
            room_walls.append(wall)

        return Room(
            room_id=room_id,
            name=room_data["name"],
            building=building_name,
            floor=floor_level,
            walls=room_walls,
            doors=room_doors,
            contents=room_data.get("contents", []),
        )

    @staticmethod
    def validate_building_data(building_data: dict) -> None:
        """Validate the building data structure."""
        if "name" not in building_data:
            msg = "The 'building' data must contain a 'name' key."
            logger.error(msg)
            raise KeyError(msg)

        if "address" not in building_data:
            msg = "The 'building' data must contain an 'address' key."
            logger.error(msg)
            raise KeyError(msg)

        if "floors" not in building_data:
            msg = "The 'building' data must contain a 'floors' key."
            logger.error(msg)
            raise KeyError(msg)

    @staticmethod
    def validate_floor_data(floor_data: dict) -> None:
        """Validate the floor data structure."""
        if "level" not in floor_data:
            msg = "Each floor must have a 'level' defined."
            logger.error(msg)
            raise KeyError(msg)

        if "rooms" not in floor_data:
            msg = "Each floor must contain a 'rooms' key."
            logger.error(msg)
            raise KeyError(msg)

    @staticmethod
    def validate_room_data(room_data: dict) -> None:
        """Validate the room data structure."""
        if "name" not in room_data:
            msg = "Each room must have a 'name' defined."
            logger.error(msg)
            raise KeyError(msg)

        if "doors" not in room_data:
            msg = "Each room must have a 'doors' key."
            logger.error(msg)
            raise KeyError(msg)

        if "walls" not in room_data and "area" not in room_data:
            msg = "Each room must have either 'walls' or 'area' defined."
            logger.error(msg)
            raise KeyError(msg)

        topological = room_data.get("area", False)

        if topological and "walls" in room_data:
            msg = "A topological room cannot have walls defined."
            logger.error(msg)
            raise KeyError(msg)

        if not topological:
            walls: list[Wall] = []
            walls_data: list[list[float]] = room_data["walls"]
            for wall in walls_data:
                SpaceInputReader.check_tuple_length(wall, 4, "wall")
                walls.append(Wall(start=(wall[0], wall[1]), end=(wall[2], wall[3])))

            msg = f"Room '{room_data['name']}' walls validated successfully."
            logger.info(msg)

            doors_data: list[list[float]] = room_data["doors"]
            for door in doors_data:
                SpaceInputReader.check_tuple_length(door, 4, "door")

        else:
            doors_names: list[str] = room_data["doors"]
            for name in doors_names:
                if not isinstance(name, str):
                    msg = "In topological mode, doors must be defined by their names."
                    logger.error(msg)
                    raise InvalidDoorError(msg)

    @staticmethod
    def check_tuple_length(
        data_tuple: list[float], expected_length: int, data_type: str
    ) -> None:
        """Check if a data tuple has the expected length."""
        if data_type not in {"wall", "door"}:
            msg = f"data_type must be either 'wall' or 'door'. Got '{data_type}'."
            raise InvalidDefinitionError(msg)

        if len(data_tuple) != expected_length:
            msg = f"Each {data_type} must be defined by {expected_length} values."
            logger.error(msg)
            if data_type == "wall":
                raise InvalidRoomError(msg)
            raise InvalidDoorError(msg)

__post_init__()

Post-initialization to read and validate the YAML file.

Source code in src/amr_hub_abm/read_space_input.py
43
44
45
46
47
48
49
50
51
52
53
54
def __post_init__(self) -> None:
    """Post-initialization to read and validate the YAML file."""
    self.validation()
    self.get_room_name_dict()
    self.create_doors_from_detatched_doors()
    self.create_rooms_from_data()

    for room in self.rooms:
        msg = f"Room '{room.name} (id {room.room_id})' created."
        logger.info(msg)

    self.buildings = self.organise_rooms_into_floors_and_buildings(self.rooms)

check_tuple_length(data_tuple, expected_length, data_type) staticmethod

Check if a data tuple has the expected length.

Source code in src/amr_hub_abm/read_space_input.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
@staticmethod
def check_tuple_length(
    data_tuple: list[float], expected_length: int, data_type: str
) -> None:
    """Check if a data tuple has the expected length."""
    if data_type not in {"wall", "door"}:
        msg = f"data_type must be either 'wall' or 'door'. Got '{data_type}'."
        raise InvalidDefinitionError(msg)

    if len(data_tuple) != expected_length:
        msg = f"Each {data_type} must be defined by {expected_length} values."
        logger.error(msg)
        if data_type == "wall":
            raise InvalidRoomError(msg)
        raise InvalidDoorError(msg)

create_doors_from_detatched_doors()

Create Door instances from detatched doors.

Source code in src/amr_hub_abm/read_space_input.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def create_doors_from_detatched_doors(self) -> None:
    """Create Door instances from detatched doors."""
    detatced_door_list = [
        door for doors in self.room_door_dict.values() for door in doors
    ]
    unique_detatched_doors = set(detatced_door_list)
    sorted_unique_detatched_doors = sorted(
        unique_detatched_doors, key=lambda d: (d.start, d.end)
    )

    sorted_room_names = sorted(room_name for room_name in self.room_door_dict)

    for detatched_door in sorted_unique_detatched_doors:
        connecting_rooms = [
            sorted_room_names.index(room_name)
            for room_name, doors in self.room_door_dict.items()
            if detatched_door in doors
        ]

        if len(connecting_rooms) != 2:
            msg = (
                f"Door at {detatched_door.start}-{detatched_door.end} must connect "
                f"exactly two rooms. Found {len(connecting_rooms)}."
            )
            logger.error(msg)
            raise InvalidDoorError(msg)

        if self.topological:
            door = Door(
                is_open=detatched_door.is_open,
                access_control=detatched_door.access_control,
                name=detatched_door.name,
                connecting_rooms=(connecting_rooms[0], connecting_rooms[1]),
                door_id=sorted_unique_detatched_doors.index(detatched_door),
            )
        else:
            door = Door(
                is_open=detatched_door.is_open,
                access_control=detatched_door.access_control,
                start=detatched_door.start,
                end=detatched_door.end,
                connecting_rooms=(connecting_rooms[0], connecting_rooms[1]),
                door_id=sorted_unique_detatched_doors.index(detatched_door),
            )

        self.door_list.append(door)

create_room(room_data, room_id, building_name, floor_level)

Create a Room instance from room data.

Source code in src/amr_hub_abm/read_space_input.py
197
198
199
200
201
202
203
204
205
def create_room(
    self, room_data: dict, room_id: int, building_name: str, floor_level: int
) -> Room:
    """Create a Room instance from room data."""
    if self.topological:
        return self.create_topological_room(
            room_data, room_id, building_name, floor_level
        )
    return self.create_spatial_room(room_data, room_id, building_name, floor_level)

create_rooms_from_data()

Create Room instances from the validated data.

Source code in src/amr_hub_abm/read_space_input.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def create_rooms_from_data(self) -> None:
    """Create Room instances from the validated data."""
    sorted_room_names = sorted(room_name for room_name in self.room_door_dict)

    for index, name in enumerate(sorted_room_names):
        for building_data in [self.data["building"]]:
            for floor_data in building_data["floors"]:
                floor_level = floor_data["level"]
                for room_data in floor_data["rooms"]:
                    if room_data["name"] == name:
                        room = self.create_room(
                            room_data, index, building_data["name"], floor_level
                        )
                        self.rooms.append(room)

create_spatial_room(room_data, room_id, building_name, floor_level)

Create a spatial Room instance from room data.

Source code in src/amr_hub_abm/read_space_input.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def create_spatial_room(
    self, room_data: dict, room_id: int, building_name: str, floor_level: int
) -> Room:
    """Create a spatial Room instance from room data."""
    room_doors = [
        door for door in self.door_list if room_id in door.connecting_rooms
    ]

    room_walls: list[Wall] = []
    for wall_data in room_data.get("walls", []):
        wall = Wall(
            start=(wall_data[0], wall_data[1]),
            end=(wall_data[2], wall_data[3]),
        )
        self.wall_list.append(wall)
        room_walls.append(wall)

    return Room(
        room_id=room_id,
        name=room_data["name"],
        building=building_name,
        floor=floor_level,
        walls=room_walls,
        doors=room_doors,
        contents=room_data.get("contents", []),
    )

create_topological_room(room_data, room_id, building_name, floor_level)

Create a topological Room instance from room data.

Source code in src/amr_hub_abm/read_space_input.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def create_topological_room(
    self, room_data: dict, room_id: int, building_name: str, floor_level: int
) -> Room:
    """Create a topological Room instance from room data."""
    room_doors = [
        door for door in self.door_list if door.name in room_data["doors"]
    ]

    return Room(
        room_id=room_id,
        name=room_data["name"],
        building=building_name,
        floor=floor_level,
        walls=None,
        doors=room_doors,
        contents=room_data.get("contents", []),
        area=room_data["area"],
    )

get_room_name_dict()

Extract room names and their corresponding detatched doors.

Source code in src/amr_hub_abm/read_space_input.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def get_room_name_dict(self) -> None:
    """Extract room names and their corresponding detatched doors."""
    for building_data in [self.data["building"]]:
        for floor_data in building_data["floors"]:
            for room_data in floor_data["rooms"]:
                self.topological = "area" in room_data
                room_name = room_data["name"]
                doors: list[DetachedDoor] = []

                if not self.topological:
                    for door_data in room_data.get("doors", []):
                        door = DetachedDoor(
                            is_open=False,
                            access_control=(False, False),
                            start=(door_data[0], door_data[1]),
                            end=(door_data[2], door_data[3]),
                        )
                        doors.append(door)
                    self.room_door_dict[room_name] = doors

                else:
                    for door_name in room_data.get("doors", []):
                        door = DetachedDoor(
                            is_open=False,
                            access_control=(False, False),
                            name=door_name,
                        )
                        doors.append(door)
                    self.room_door_dict[room_name] = doors

organise_rooms_into_floors_and_buildings(rooms) staticmethod

Organize rooms into floors and buildings.

Source code in src/amr_hub_abm/read_space_input.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
@staticmethod
def organise_rooms_into_floors_and_buildings(rooms: list[Room]) -> list[Building]:
    """Organize rooms into floors and buildings."""
    all_buildings = {room.building for room in rooms}
    buildings: list[Building] = []
    for building_name in all_buildings:
        building_rooms = [room for room in rooms if room.building == building_name]
        all_floors = {room.floor for room in building_rooms}
        floors: list[Floor] = []
        for floor_number in all_floors:
            floor_rooms = [
                room for room in building_rooms if room.floor == floor_number
            ]
            floor = Floor(floor_number=floor_number, rooms=floor_rooms)
            msg = (
                f"Floor {floor.floor_number} created with {len(floor.rooms)} rooms."
            )
            logger.info(msg)
            floors.append(floor)
        building = Building(name=building_name, floors=floors)
        msg = f"Building '{building.name}' created with {len(floors)} floors."
        logger.info(msg)
        buildings.append(building)

    return buildings

validate_building_data(building_data) staticmethod

Validate the building data structure.

Source code in src/amr_hub_abm/read_space_input.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
@staticmethod
def validate_building_data(building_data: dict) -> None:
    """Validate the building data structure."""
    if "name" not in building_data:
        msg = "The 'building' data must contain a 'name' key."
        logger.error(msg)
        raise KeyError(msg)

    if "address" not in building_data:
        msg = "The 'building' data must contain an 'address' key."
        logger.error(msg)
        raise KeyError(msg)

    if "floors" not in building_data:
        msg = "The 'building' data must contain a 'floors' key."
        logger.error(msg)
        raise KeyError(msg)

validate_floor_data(floor_data) staticmethod

Validate the floor data structure.

Source code in src/amr_hub_abm/read_space_input.py
271
272
273
274
275
276
277
278
279
280
281
282
@staticmethod
def validate_floor_data(floor_data: dict) -> None:
    """Validate the floor data structure."""
    if "level" not in floor_data:
        msg = "Each floor must have a 'level' defined."
        logger.error(msg)
        raise KeyError(msg)

    if "rooms" not in floor_data:
        msg = "Each floor must contain a 'rooms' key."
        logger.error(msg)
        raise KeyError(msg)

validate_room_data(room_data) staticmethod

Validate the room data structure.

Source code in src/amr_hub_abm/read_space_input.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
@staticmethod
def validate_room_data(room_data: dict) -> None:
    """Validate the room data structure."""
    if "name" not in room_data:
        msg = "Each room must have a 'name' defined."
        logger.error(msg)
        raise KeyError(msg)

    if "doors" not in room_data:
        msg = "Each room must have a 'doors' key."
        logger.error(msg)
        raise KeyError(msg)

    if "walls" not in room_data and "area" not in room_data:
        msg = "Each room must have either 'walls' or 'area' defined."
        logger.error(msg)
        raise KeyError(msg)

    topological = room_data.get("area", False)

    if topological and "walls" in room_data:
        msg = "A topological room cannot have walls defined."
        logger.error(msg)
        raise KeyError(msg)

    if not topological:
        walls: list[Wall] = []
        walls_data: list[list[float]] = room_data["walls"]
        for wall in walls_data:
            SpaceInputReader.check_tuple_length(wall, 4, "wall")
            walls.append(Wall(start=(wall[0], wall[1]), end=(wall[2], wall[3])))

        msg = f"Room '{room_data['name']}' walls validated successfully."
        logger.info(msg)

        doors_data: list[list[float]] = room_data["doors"]
        for door in doors_data:
            SpaceInputReader.check_tuple_length(door, 4, "door")

    else:
        doors_names: list[str] = room_data["doors"]
        for name in doors_names:
            if not isinstance(name, str):
                msg = "In topological mode, doors must be defined by their names."
                logger.error(msg)
                raise InvalidDoorError(msg)

validation()

Validate the space input data from the YAML file.

Source code in src/amr_hub_abm/read_space_input.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def validation(self) -> None:
    """Validate the space input data from the YAML file."""
    with self.input_path.open("r", encoding="utf-8") as file:
        self.data = yaml.safe_load(file)

    msg = f"Loaded space input data from {self.input_path}"
    logger.info(msg)

    if "building" not in self.data:
        msg = "The input data must contain a 'building' key."
        logger.error(msg)
        raise KeyError(msg)

    building_data = self.data["building"]
    self.validate_building_data(building_data)

    floors_data = building_data["floors"]
    for floor_data in floors_data:
        self.validate_floor_data(floor_data)
        rooms_data = floor_data["rooms"]
        for room_data in rooms_data:
            self.validate_room_data(room_data)

simulation

The main simulation module for the AMR Hub ABM.

Simulation dataclass

Representation of the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/simulation.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@dataclass
class Simulation:
    """Representation of the AMR Hub ABM simulation."""

    name: str
    description: str
    mode: SimulationMode

    space: list[Building]
    agents: list[Agent]

    total_simulation_time: int

    time: int = field(default=0, init=False)

    def step(self, plot_path: Path | None = None) -> None:
        """Advance the simulation by one time step."""
        if self.time >= self.total_simulation_time:
            msg = "Simulation has already reached its total simulation time."
            raise TimeError(msg)

        # randomize agent order each step to avoid bias
        random.shuffle(self.agents)

        for agent in self.agents:
            agent.perform_task(current_time=self.time, rooms=self.rooms)

        if plot_path is not None:
            self.plot_current_state(directory_path=plot_path)

        self.time += 1

    def plot_current_state(self, directory_path: Path) -> None:
        """Plot the current state of the simulation."""
        if directory_path.suffix != "":
            msg = f"The path {directory_path} is not a directory."
            raise NotADirectoryError(msg)
        directory_path.mkdir(parents=True, exist_ok=True)

        for building in self.space:
            axes: list[Axes] = [plt.subplots(nrows=len(building.floors), ncols=1)[1]]
            building.plot_building(axes=axes, agents=self.agents)
            simulation_name = f"Simulation: {self.name}"
            time = f"Time: {self.time}/{self.total_simulation_time}"
            plt.suptitle(f"{simulation_name} | {time}")
            plt.savefig(
                directory_path
                / f"plot_{self.name}_building_{building.name}_time_{self.time}.png"
            )
            plt.close()

    def __repr__(self) -> str:
        """Representation of the simulation."""
        header = f"Simulation: {self.name}\nDescription: {self.description}\n"
        header += f"Mode: {self.mode.value}\n"
        header += f"Total Simulation Time: {self.total_simulation_time}\n"
        header += f"Current Time: {self.time}\n"
        header += f"Number of Buildings: {len(self.space)}\n"
        header += f"Number of Agents: {len(self.agents)}\n"

        buildings_repr = "\n".join([repr(building) for building in self.space])
        agents_repr = "\n".join([repr(agent) for agent in self.agents])

        return f"{header}\nBuildings:\n{buildings_repr}\n\nAgents:\n{agents_repr}"

    @property
    def rooms(self) -> list[Room]:
        """Get all rooms in the simulation space."""
        all_rooms: list[Room] = []
        for building in self.space:
            for floor in building.floors:
                all_rooms.extend(floor.rooms)
        return all_rooms

rooms property

Get all rooms in the simulation space.

__repr__()

Representation of the simulation.

Source code in src/amr_hub_abm/simulation.py
82
83
84
85
86
87
88
89
90
91
92
93
94
def __repr__(self) -> str:
    """Representation of the simulation."""
    header = f"Simulation: {self.name}\nDescription: {self.description}\n"
    header += f"Mode: {self.mode.value}\n"
    header += f"Total Simulation Time: {self.total_simulation_time}\n"
    header += f"Current Time: {self.time}\n"
    header += f"Number of Buildings: {len(self.space)}\n"
    header += f"Number of Agents: {len(self.agents)}\n"

    buildings_repr = "\n".join([repr(building) for building in self.space])
    agents_repr = "\n".join([repr(agent) for agent in self.agents])

    return f"{header}\nBuildings:\n{buildings_repr}\n\nAgents:\n{agents_repr}"

plot_current_state(directory_path)

Plot the current state of the simulation.

Source code in src/amr_hub_abm/simulation.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def plot_current_state(self, directory_path: Path) -> None:
    """Plot the current state of the simulation."""
    if directory_path.suffix != "":
        msg = f"The path {directory_path} is not a directory."
        raise NotADirectoryError(msg)
    directory_path.mkdir(parents=True, exist_ok=True)

    for building in self.space:
        axes: list[Axes] = [plt.subplots(nrows=len(building.floors), ncols=1)[1]]
        building.plot_building(axes=axes, agents=self.agents)
        simulation_name = f"Simulation: {self.name}"
        time = f"Time: {self.time}/{self.total_simulation_time}"
        plt.suptitle(f"{simulation_name} | {time}")
        plt.savefig(
            directory_path
            / f"plot_{self.name}_building_{building.name}_time_{self.time}.png"
        )
        plt.close()

step(plot_path=None)

Advance the simulation by one time step.

Source code in src/amr_hub_abm/simulation.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def step(self, plot_path: Path | None = None) -> None:
    """Advance the simulation by one time step."""
    if self.time >= self.total_simulation_time:
        msg = "Simulation has already reached its total simulation time."
        raise TimeError(msg)

    # randomize agent order each step to avoid bias
    random.shuffle(self.agents)

    for agent in self.agents:
        agent.perform_task(current_time=self.time, rooms=self.rooms)

    if plot_path is not None:
        self.plot_current_state(directory_path=plot_path)

    self.time += 1

SimulationMode

Bases: Enum

Enumeration of simulation modes.

Source code in src/amr_hub_abm/simulation.py
24
25
26
27
28
class SimulationMode(Enum):
    """Enumeration of simulation modes."""

    SPATIAL = "spatial"
    TOPOLOGICAL = "topological"

simulation_factory

Module for creating simulation instances.

create_simulation(config_file)

Create a simulation instance from a configuration file.

Parameters:

Name Type Description Default
config_file Path

Path to the configuration file.

required

Returns:

Name Type Description
Simulation Simulation

An instance of the Simulation class.

Source code in src/amr_hub_abm/simulation_factory.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def create_simulation(config_file: Path) -> Simulation:
    """
    Create a simulation instance from a configuration file.

    Args:
        config_file (Path): Path to the configuration file.

    Returns:
        Simulation: An instance of the Simulation class.

    """
    if not config_file.exists():
        msg = f"Configuration file not found: {config_file}"
        raise FileNotFoundError(msg)

    with config_file.open(encoding="utf-8") as file:
        config_data = yaml.safe_load(file)

    buildings_path = Path(config_data["buildings_path"])
    msg = f"Buildings path from config: {buildings_path}"
    logger.debug(msg)
    space_reader = SpaceInputReader(buildings_path)
    logger.debug("Buildings loaded successfully.")
    logger.debug(space_reader.buildings)

    start_time = pd.to_datetime(config_data["start_time"])
    time_step_minutes = config_data["time_step_minutes"]
    end_time = pd.to_datetime(config_data["end_time"])
    total_minutes = (end_time - start_time).total_seconds() / 60
    total_steps = int(total_minutes // time_step_minutes)
    logger.info("Total simulation time steps: %d", total_steps)

    timeseries_data = read_location_timeseries(
        file_path=Path(config_data["location_timeseries_path"])
    )

    agents = parse_location_timeseries(
        timeseries_data=timeseries_data,
        rooms=space_reader.rooms,
        start_time=start_time,
        time_step_minutes=time_step_minutes,
    )
    msg = f"Parsed {len(agents)} agents from location time series."
    logger.info(msg)
    logger.info("Simulation creation complete.")

    return Simulation(
        name="AMR Hub ABM Simulation",
        description="A simulation instance created from configuration.",
        mode=SimulationMode.SPATIAL,
        space=space_reader.buildings,
        agents=agents,
        total_simulation_time=total_steps,
    )

create_space_from_rooms(rooms)

Create a list of Building instances from a list of Room instances.

Source code in src/amr_hub_abm/simulation_factory.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def create_space_from_rooms(rooms: list[Room]) -> list[Building]:
    """Create a list of Building instances from a list of Room instances."""
    building_dict: dict[str, Building] = {}

    for room in rooms:
        if room.building not in building_dict:
            building_dict[room.building] = Building(name=room.building, floors=[])

        if room.floor not in [
            f.floor_number for f in building_dict[room.building].floors
        ]:
            building_dict[room.building].floors.append(
                Floor(floor_number=room.floor, rooms=[room])
            )
        else:
            for floor in building_dict[room.building].floors:
                if floor.floor_number == room.floor:
                    floor.rooms.append(room)
                    break

    return list(building_dict.values())

get_random_location(room, building, floor)

Get a random location within a room.

Source code in src/amr_hub_abm/simulation_factory.py
118
119
120
121
122
123
124
125
126
def get_random_location(room: Room, building: str, floor: int) -> Location:
    """Get a random location within a room."""
    point = room.get_random_point()
    return Location(
        building=building,
        floor=floor,
        x=point[0],
        y=point[1],
    )

parse_location_string(location_str)

Parse a location string into its components.

Parameters:

Name Type Description Default
location_str str

The location string in the format "BuildingName:x,y".

required

Returns:

Type Description
str

tuple[str, int, str]: A tuple containing the building name, floor number,

int

and room name.

Source code in src/amr_hub_abm/simulation_factory.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def parse_location_string(location_str: str) -> tuple[str, int, str]:
    """
    Parse a location string into its components.

    Args:
        location_str (str): The location string in the format "BuildingName:x,y".

    Returns:
        tuple[str, int, str]: A tuple containing the building name, floor number,
        and room name.

    """
    building_part, floor, room = location_str.split(":")
    return building_part, int(floor), room

parse_location_timeseries(timeseries_data, rooms, start_time, time_step_minutes)

Parse a CSV file containing location time series data for agents.

Parameters:

Name Type Description Default
timeseries_data DataFrame

DataFrame containing the location time series.

required

Returns:

Type Description
list[Agent]

list[Agent]: A list of Agent instances with populated location time series.

Source code in src/amr_hub_abm/simulation_factory.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def parse_location_timeseries(
    timeseries_data: pd.DataFrame,
    rooms: list[Room],
    start_time: pd.Timestamp,
    time_step_minutes: int,
) -> list[Agent]:
    """
    Parse a CSV file containing location time series data for agents.

    Args:
        timeseries_data (pd.DataFrame): DataFrame containing the location time series.

    Returns:
        list[Agent]: A list of Agent instances with populated location time series.

    """
    hcw_dict: dict[int, Agent] = {}
    patient_dict: dict[int, Agent] = {}

    for _, row in timeseries_data.iterrows():
        hcw_id = int(row["hcw_id"])
        timestamp = row["timestamp"]
        location_str = row["location"]
        patient_id = int(row["patient_id"]) if row["patient_id"] != "-" else None
        event_type = row["event_type"]
        door_id = int(row["door_id"]) if row["door_id"] != "-" else None

        timestep = pd.to_datetime(timestamp)
        timestep_index = timestamp_to_timestep(timestep, start_time, time_step_minutes)
        building, floor, room_str = parse_location_string(location_str)
        additional_info: dict[Any, Any] = {}

        room = next(
            (
                r
                for r in rooms
                if r.name == room_str and r.building == building and r.floor == floor
            ),
            None,
        )

        if room is None:
            msg = f"Room not found: {room_str} in building {building} on floor {floor}"
            raise SimulationModeError(msg)

        if event_type == "attend_patient" and patient_id is None:
            msg = f"Patient ID must be provided for 'attend' events. Row: {row}"
            raise SimulationModeError(msg)

        if patient_id:
            update_patient(
                patient_id=patient_id,
                space_tuple=(building, floor, room),
                patient_dict=patient_dict,
                space=create_space_from_rooms(rooms),
            )
            patient = patient_dict[patient_id]

        if event_type == "attend_patient" and patient_id is not None:
            additional_info["patient"] = patient
            location = patient.location

        elif event_type == "door_access":
            if door_id is None:
                msg = f"Door ID must be provided for 'door_access' events. Row: {row}"
                raise SimulationModeError(msg)

            door = next(
                (d for d in room.doors if d.door_id == door_id),
                None,
            )

            if door is None:
                msg = f"Door ID {door_id} not found in room {room.name}. Row: {row}"
                msg += f" Available doors: {[d.door_id for d in room.doors]}"
                raise SimulationModeError(msg)

            midpoint = door.line.interpolate(0.5, normalized=True)
            point = (midpoint.x, midpoint.y)
            additional_info["door"] = door

            location = Location(
                building=building,
                floor=floor,
                x=point[0],
                y=point[1],
            )
        elif event_type == "workstation":
            location = get_random_location(room, building, floor)

        else:
            msg = f"Unknown event type: {event_type} in row: {row}"
            raise SimulationModeError(msg)

        update_hcw(
            hcw_id=hcw_id,
            space_tuple=(building, floor, room),
            event_tuple=(location, timestep_index, event_type),
            hcw_dict=hcw_dict,
            additional_info=additional_info or None,
            space=create_space_from_rooms(rooms),
        )

    return list(hcw_dict.values()) + list(patient_dict.values())

read_location_timeseries(file_path)

Read a CSV file containing location time series data for agents.

Parameters:

Name Type Description Default
file_path Path

Path to the CSV file.

required

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the location time series data.

Source code in src/amr_hub_abm/simulation_factory.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def read_location_timeseries(
    file_path: Path,
) -> pd.DataFrame:
    """
    Read a CSV file containing location time series data for agents.

    Args:
        file_path (Path): Path to the CSV file.

    Returns:
        pd.DataFrame: DataFrame containing the location time series data.

    """
    if not file_path.exists():
        msg = f"Location time series file not found: {file_path}"
        raise FileNotFoundError(msg)

    return pd.read_csv(file_path)

timestamp_to_timestep(timestamp, start_time, time_step_minutes)

Convert a timestamp to a simulation time step index.

Parameters:

Name Type Description Default
timestamp Timestamp

The timestamp to convert.

required
start_time Timestamp

The simulation start time.

required
time_step_minutes int

The duration of each time step in minutes.

required

Returns:

Name Type Description
int int

The corresponding time step index.

Source code in src/amr_hub_abm/simulation_factory.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def timestamp_to_timestep(
    timestamp: pd.Timestamp,
    start_time: pd.Timestamp,
    time_step_minutes: int,
) -> int:
    """
    Convert a timestamp to a simulation time step index.

    Args:
        timestamp (pd.Timestamp): The timestamp to convert.
        start_time (pd.Timestamp): The simulation start time.
        time_step_minutes (int): The duration of each time step in minutes.

    Returns:
        int: The corresponding time step index.

    """
    delta = timestamp - start_time
    total_minutes = delta.total_seconds() / 60
    return int(total_minutes // time_step_minutes)

update_hcw(hcw_id, space_tuple, event_tuple, hcw_dict, space, additional_info=None)

Update healthcare worker information from data.

Source code in src/amr_hub_abm/simulation_factory.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def update_hcw(  # noqa: PLR0913
    hcw_id: int,
    space_tuple: tuple[str, int, Room],
    event_tuple: tuple[Location, int, str],
    hcw_dict: dict[int, Agent],
    space: list[Building],
    additional_info: dict | None = None,
) -> None:
    """Update healthcare worker information from data."""
    building, floor, room = space_tuple
    location, timestep_index, event_type = event_tuple

    if hcw_id not in hcw_dict:
        hcw_location = get_random_location(room, building, floor)
        hcw_dict[hcw_id] = Agent(
            idx=hcw_id,
            location=hcw_location,
            heading_rad=0.0,
            agent_type=AgentType.HEALTHCARE_WORKER,
            space=space,
        )

    hcw_dict[hcw_id].add_task(timestep_index, location, event_type, additional_info)

update_patient(patient_id, space_tuple, patient_dict, space)

Update patient information from data.

Source code in src/amr_hub_abm/simulation_factory.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def update_patient(
    patient_id: int,
    space_tuple: tuple[str, int, Room],
    patient_dict: dict[int, Agent],
    space: list[Building],
) -> None:
    """Update patient information from data."""
    building, floor, room = space_tuple

    if patient_id is not None and patient_id not in patient_dict:
        location = get_random_location(room, building, floor)

        patient_dict[patient_id] = Agent(
            idx=patient_id,
            location=location,
            heading_rad=0.0,
            agent_type=AgentType.PATIENT,
            space=space,
        )

space

Space module for the AMR Hub ABM simulation.

building

Module containing building representation for the AMR Hub ABM simulation.

Building dataclass

Representation of a building in the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/space/building.py
15
16
17
18
19
20
21
22
23
24
25
26
27
@dataclass
class Building:
    """Representation of a building in the AMR Hub ABM simulation."""

    name: str
    floors: list[Floor]

    def plot_building(
        self, axes: list[Axes], agents: list[Agent] | None = None
    ) -> None:
        """Plot the building layout."""
        for floor, ax in zip(self.floors, axes, strict=True):
            floor.plot(ax=ax, agents=agents)
plot_building(axes, agents=None)

Plot the building layout.

Source code in src/amr_hub_abm/space/building.py
22
23
24
25
26
27
def plot_building(
    self, axes: list[Axes], agents: list[Agent] | None = None
) -> None:
    """Plot the building layout."""
    for floor, ax in zip(self.floors, axes, strict=True):
        floor.plot(ax=ax, agents=agents)

content

Module defining room content types for the rooms of the AMR Hub ABM simulation.

Content dataclass

Enumeration of possible room contents.

Source code in src/amr_hub_abm/space/content.py
14
15
16
17
18
19
@dataclass
class Content:
    """Enumeration of possible room contents."""

    content_id: int
    content_type: ContentType

ContentType

Bases: Enum

Enumeration of possible room content types.

Source code in src/amr_hub_abm/space/content.py
 7
 8
 9
10
11
class ContentType(Enum):
    """Enumeration of possible room content types."""

    BED = "bed"
    WORKSTATION = "workstation"

door

Module defining door representation for the AMR Hub ABM simulation.

DetachedDoor dataclass

Representation of a detached door in the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/space/door.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@dataclass(kw_only=True, frozen=True)
class DetachedDoor:
    """Representation of a detached door in the AMR Hub ABM simulation."""

    is_open: bool
    access_control: tuple[bool, bool]
    name: str | None = field(default=None)
    start: tuple[float, float] | None = field(default=None)
    end: tuple[float, float] | None = field(default=None)

    def _identity_key(self) -> tuple[object, ...]:
        """Key used for equality + hashing. Ignores mutable state."""
        if self.name is not None:
            return ("name", self.name)
        # at this point start/end are both not None due to validation

        if self.start is None or self.end is None:
            msg = "Cannot create identity key from door without name or coordinates."
            raise InvalidDoorError(msg)
        return ("coords", self.start, self.end)

    def __eq__(self, other: object) -> bool:
        """Define equality comparison for DetachedDoor instances."""
        if not isinstance(other, DetachedDoor):
            return NotImplemented
        return self._identity_key() == other._identity_key()

    def __hash__(self) -> int:
        """Define hash for DetachedDoor instances."""
        return hash(self._identity_key())

    def check_for_start_end_consistency(self) -> None:
        """Check that start and end points are consistent."""
        if (self.start is None) != (self.end is None):
            msg = "Both start and end points must be None or both must be defined."
            raise InvalidDoorError(msg)

        if (self.start is not None and self.end is not None) and self.start == self.end:
            msg = "Door start and end points cannot be the same."
            raise InvalidDoorError(msg)

    def __post_init__(self) -> None:
        """Post-initialization to validate door coordinates."""
        if self.start is None and self.end is None:
            # If both start and end are None, we are in topological mode
            # and don't need to check coordinates. We just need to ensure that the door
            # has a name for identity purposes.
            if self.name is None:
                msg = "Door must have a name if start and end points are not defined."
                raise InvalidDoorError(msg)
            return

        self.check_for_start_end_consistency()

        if self.start is None or self.end is None:
            # This should never happen due to the consistency check, but we check again
            # to address mypy's anger issues.
            msg = "Both start and end points must be defined when in spatial mode."
            raise InvalidDoorError(msg)

        if self.start > self.end:
            temp = self.start
            object.__setattr__(self, "start", self.end)
            object.__setattr__(self, "end", temp)
__eq__(other)

Define equality comparison for DetachedDoor instances.

Source code in src/amr_hub_abm/space/door.py
31
32
33
34
35
def __eq__(self, other: object) -> bool:
    """Define equality comparison for DetachedDoor instances."""
    if not isinstance(other, DetachedDoor):
        return NotImplemented
    return self._identity_key() == other._identity_key()
__hash__()

Define hash for DetachedDoor instances.

Source code in src/amr_hub_abm/space/door.py
37
38
39
def __hash__(self) -> int:
    """Define hash for DetachedDoor instances."""
    return hash(self._identity_key())
__post_init__()

Post-initialization to validate door coordinates.

Source code in src/amr_hub_abm/space/door.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __post_init__(self) -> None:
    """Post-initialization to validate door coordinates."""
    if self.start is None and self.end is None:
        # If both start and end are None, we are in topological mode
        # and don't need to check coordinates. We just need to ensure that the door
        # has a name for identity purposes.
        if self.name is None:
            msg = "Door must have a name if start and end points are not defined."
            raise InvalidDoorError(msg)
        return

    self.check_for_start_end_consistency()

    if self.start is None or self.end is None:
        # This should never happen due to the consistency check, but we check again
        # to address mypy's anger issues.
        msg = "Both start and end points must be defined when in spatial mode."
        raise InvalidDoorError(msg)

    if self.start > self.end:
        temp = self.start
        object.__setattr__(self, "start", self.end)
        object.__setattr__(self, "end", temp)
check_for_start_end_consistency()

Check that start and end points are consistent.

Source code in src/amr_hub_abm/space/door.py
41
42
43
44
45
46
47
48
49
def check_for_start_end_consistency(self) -> None:
    """Check that start and end points are consistent."""
    if (self.start is None) != (self.end is None):
        msg = "Both start and end points must be None or both must be defined."
        raise InvalidDoorError(msg)

    if (self.start is not None and self.end is not None) and self.start == self.end:
        msg = "Door start and end points cannot be the same."
        raise InvalidDoorError(msg)

Door dataclass

Bases: DetachedDoor

Representation of a door in the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/space/door.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@dataclass(eq=False, kw_only=True, frozen=True)
class Door(DetachedDoor):
    """Representation of a door in the AMR Hub ABM simulation."""

    connecting_rooms: tuple[int, int]
    door_id: int

    def __lt__(self, other: object) -> bool:
        """Define less-than comparison for Door instances."""
        if not isinstance(other, Door):
            return NotImplemented
        return self._identity_key() < other._identity_key()

    def __post_init__(self) -> None:
        """Post-initialization to validate door coordinates and create hash."""
        super().__post_init__()

    @property
    def line(self) -> shapely.geometry.LineString:
        """Get the line representation of the door."""
        if self.start is None or self.end is None:
            msg = "Door start and end must be defined when not in topological mode."
            raise InvalidDoorError(msg)
        return shapely.geometry.LineString([self.start, self.end])
line property

Get the line representation of the door.

__lt__(other)

Define less-than comparison for Door instances.

Source code in src/amr_hub_abm/space/door.py
83
84
85
86
87
def __lt__(self, other: object) -> bool:
    """Define less-than comparison for Door instances."""
    if not isinstance(other, Door):
        return NotImplemented
    return self._identity_key() < other._identity_key()
__post_init__()

Post-initialization to validate door coordinates and create hash.

Source code in src/amr_hub_abm/space/door.py
89
90
91
def __post_init__(self) -> None:
    """Post-initialization to validate door coordinates and create hash."""
    super().__post_init__()

floor

Module for Floor class.

Floor dataclass

Representation of a floor in a building.

Source code in src/amr_hub_abm/space/floor.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@dataclass
class Floor:
    """Representation of a floor in a building."""

    floor_number: int
    rooms: list[Room]
    pseudo_rooms: list[Room] = field(init=False, default_factory=list)

    def __post_init__(self) -> None:
        """Post-initialization to validate floor attributes."""
        room_ids = [room.room_id for room in self.rooms]
        if len(room_ids) != len(set(room_ids)):
            msg = f"Duplicate room IDs found on floor {self.floor_number}."
            raise InvalidRoomError(msg)

    @property
    def room_ids(self) -> list[int]:
        """Get a list of room IDs on the floor."""
        return sorted([room.room_id for room in self.rooms])

    @property
    def room_names(self) -> list[str]:
        """Get a list of room names on the floor."""
        id_name_map = {room.room_id: room.name for room in self.rooms}
        return [id_name_map[room_id] for room_id in self.room_ids]

    @property
    def edge_set(self) -> set[tuple[int, int]]:
        """Get a list of all wall edges on the floor."""
        edges = set()
        for room in self.rooms:
            for door in room.doors:
                edges.add(door.connecting_rooms)
                edges.add((door.connecting_rooms[1], door.connecting_rooms[0]))
        return edges

    @property
    def adjacency_matrix(self) -> np.ndarray:
        """Get the adjacency matrix representing room connections on the floor."""
        n = len(self.rooms)
        room_id_to_index = {
            room_id: index for index, room_id in enumerate(self.room_ids)
        }
        adjacency_matrix = np.zeros((n, n), dtype=int)

        for edge in self.edge_set:
            room1, room2 = edge
            index1 = room_id_to_index[room1]
            index2 = room_id_to_index[room2]
            adjacency_matrix[index1, index2] = 1

        return adjacency_matrix

    def plot(self, ax: Axes, agents: list[Agent] | None = None) -> None:
        """Plot the floor layout including rooms and doors."""
        for room in self.rooms:
            room.plot(ax=ax, agents=agents)

    def add_pseudo_rooms(self) -> None:
        """Add pseudo-rooms to the floor."""
        for existing_room in self.rooms:
            if not existing_room.walls:
                pseudo_room = Floor.create_spatial_room_from_pseudo_room(existing_room)
                self.pseudo_rooms.append(pseudo_room)

    @staticmethod
    def create_spatial_room_from_pseudo_room(room: Room) -> Room:
        """
        Create a spatial room from a pseudo-room based on area.

        Args:
            room (Room): The pseudo-room to convert.

        Returns:
            Room: A spatial room with walls and doors based on the pseudo-room's area.

        Raises:
            InvalidRoomError: If the pseudo-room does not have a valid positive area.

        Note:
            This method generates a rectangular room layout based on the area of the
            pseudo-room. The doors are positioned along one side of the rectangle. This
            function is not complete. Currently it only creates single simple
            rectangular rooms. Due to possibly complex topology, rectangular rooms may
            not always be a valid representation. Currently, no topology connections
            are considered. This function will be improved in future versions.

        """
        if not room.area or room.area <= 0:
            msg = "Pseudo-room must have a valid positive area."
            raise InvalidRoomError(msg)
        length = max(room.area**0.5, 2 * len(room.doors) + 2)
        width = room.area / length

        pseudo_doors = room.doors.copy()
        for count, door in enumerate(pseudo_doors):
            object.__setattr__(door, "start", (2 * count + 1, width))
            object.__setattr__(door, "end", (2 * count + 2, width))

        pseudo_walls = [
            Wall((0, width), (0, 0)),
            Wall((0, 0), (length, 0)),
            Wall((length, 0), (length, width)),
        ]

        doorside_walls = [
            Wall((2 * count, width), (2 * count + 1, width))
            for count in range(len(pseudo_doors))
        ]
        doorside_walls.append(Wall((len(pseudo_doors) * 2, width), (length, width)))
        pseudo_walls.extend(doorside_walls)

        return Room(
            room_id=room.room_id,
            name=room.name,
            building=room.building,
            floor=room.floor,
            walls=pseudo_walls,
            doors=pseudo_doors,
            contents=room.contents,
        )

    def find_room_by_location(self, location: tuple[float, float]) -> Room | None:
        """Find the room that contains the given location."""
        for room in self.rooms:
            if room.walls and room.contains_point(location):
                return room
        return None
adjacency_matrix property

Get the adjacency matrix representing room connections on the floor.

edge_set property

Get a list of all wall edges on the floor.

room_ids property

Get a list of room IDs on the floor.

room_names property

Get a list of room names on the floor.

__post_init__()

Post-initialization to validate floor attributes.

Source code in src/amr_hub_abm/space/floor.py
28
29
30
31
32
33
def __post_init__(self) -> None:
    """Post-initialization to validate floor attributes."""
    room_ids = [room.room_id for room in self.rooms]
    if len(room_ids) != len(set(room_ids)):
        msg = f"Duplicate room IDs found on floor {self.floor_number}."
        raise InvalidRoomError(msg)
add_pseudo_rooms()

Add pseudo-rooms to the floor.

Source code in src/amr_hub_abm/space/floor.py
78
79
80
81
82
83
def add_pseudo_rooms(self) -> None:
    """Add pseudo-rooms to the floor."""
    for existing_room in self.rooms:
        if not existing_room.walls:
            pseudo_room = Floor.create_spatial_room_from_pseudo_room(existing_room)
            self.pseudo_rooms.append(pseudo_room)
create_spatial_room_from_pseudo_room(room) staticmethod

Create a spatial room from a pseudo-room based on area.

Parameters:

Name Type Description Default
room Room

The pseudo-room to convert.

required

Returns:

Name Type Description
Room Room

A spatial room with walls and doors based on the pseudo-room's area.

Raises:

Type Description
InvalidRoomError

If the pseudo-room does not have a valid positive area.

Note

This method generates a rectangular room layout based on the area of the pseudo-room. The doors are positioned along one side of the rectangle. This function is not complete. Currently it only creates single simple rectangular rooms. Due to possibly complex topology, rectangular rooms may not always be a valid representation. Currently, no topology connections are considered. This function will be improved in future versions.

Source code in src/amr_hub_abm/space/floor.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@staticmethod
def create_spatial_room_from_pseudo_room(room: Room) -> Room:
    """
    Create a spatial room from a pseudo-room based on area.

    Args:
        room (Room): The pseudo-room to convert.

    Returns:
        Room: A spatial room with walls and doors based on the pseudo-room's area.

    Raises:
        InvalidRoomError: If the pseudo-room does not have a valid positive area.

    Note:
        This method generates a rectangular room layout based on the area of the
        pseudo-room. The doors are positioned along one side of the rectangle. This
        function is not complete. Currently it only creates single simple
        rectangular rooms. Due to possibly complex topology, rectangular rooms may
        not always be a valid representation. Currently, no topology connections
        are considered. This function will be improved in future versions.

    """
    if not room.area or room.area <= 0:
        msg = "Pseudo-room must have a valid positive area."
        raise InvalidRoomError(msg)
    length = max(room.area**0.5, 2 * len(room.doors) + 2)
    width = room.area / length

    pseudo_doors = room.doors.copy()
    for count, door in enumerate(pseudo_doors):
        object.__setattr__(door, "start", (2 * count + 1, width))
        object.__setattr__(door, "end", (2 * count + 2, width))

    pseudo_walls = [
        Wall((0, width), (0, 0)),
        Wall((0, 0), (length, 0)),
        Wall((length, 0), (length, width)),
    ]

    doorside_walls = [
        Wall((2 * count, width), (2 * count + 1, width))
        for count in range(len(pseudo_doors))
    ]
    doorside_walls.append(Wall((len(pseudo_doors) * 2, width), (length, width)))
    pseudo_walls.extend(doorside_walls)

    return Room(
        room_id=room.room_id,
        name=room.name,
        building=room.building,
        floor=room.floor,
        walls=pseudo_walls,
        doors=pseudo_doors,
        contents=room.contents,
    )
find_room_by_location(location)

Find the room that contains the given location.

Source code in src/amr_hub_abm/space/floor.py
142
143
144
145
146
147
def find_room_by_location(self, location: tuple[float, float]) -> Room | None:
    """Find the room that contains the given location."""
    for room in self.rooms:
        if room.walls and room.contains_point(location):
            return room
    return None
plot(ax, agents=None)

Plot the floor layout including rooms and doors.

Source code in src/amr_hub_abm/space/floor.py
73
74
75
76
def plot(self, ax: Axes, agents: list[Agent] | None = None) -> None:
    """Plot the floor layout including rooms and doors."""
    for room in self.rooms:
        room.plot(ax=ax, agents=agents)

location

Module containing location representation for the AMR Hub ABM simulation.

Location dataclass

Representation of a location in the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/space/location.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class Location:
    """Representation of a location in the AMR Hub ABM simulation."""

    x: float
    y: float
    floor: int
    building: str | None = None

    def move(self, new_x: float, new_y: float, new_floor: int) -> None:
        """Move the location to new coordinates."""
        self.x = new_x
        self.y = new_y
        self.floor = new_floor

    def distance_to(self, other: Location) -> float:
        """Calculate the Euclidean distance to another location."""
        if self.building != other.building:
            raise InvalidDistanceError((self.building, other.building), building=True)
        if self.floor != other.floor:
            raise InvalidDistanceError((self.floor, other.floor), building=False)

        return math.sqrt((self.x - other.x) ** 2 + (self.y - other.y) ** 2)

    def __repr__(self) -> str:
        """Return a string representation of the location."""
        return (
            f"Location(x={self.x:.2f}, y={self.y:.2f}, {self.floor}, {self.building})"
        )

    def which_room(self, rooms: list[Room]) -> Room | None:
        """Determine which room the location is in, if any."""
        for room in rooms:
            if room.building != self.building or room.floor != self.floor:
                continue
            if room.region.contains(shapely.geometry.Point(self.x, self.y)):
                return room
        return None

    def check_line_of_sight(self, other: Location, walls: list[Wall]) -> bool:
        """Check if there is a line of sight to another location, considering walls."""
        if self.building != other.building:
            return False

        if self.floor != other.floor:
            return False

        line_of_sight = shapely.geometry.LineString(
            [(self.x, self.y), (other.x, other.y)]
        )

        return not any(line_of_sight.crosses(wall.line) for wall in walls)
__repr__()

Return a string representation of the location.

Source code in src/amr_hub_abm/space/location.py
42
43
44
45
46
def __repr__(self) -> str:
    """Return a string representation of the location."""
    return (
        f"Location(x={self.x:.2f}, y={self.y:.2f}, {self.floor}, {self.building})"
    )
check_line_of_sight(other, walls)

Check if there is a line of sight to another location, considering walls.

Source code in src/amr_hub_abm/space/location.py
57
58
59
60
61
62
63
64
65
66
67
68
69
def check_line_of_sight(self, other: Location, walls: list[Wall]) -> bool:
    """Check if there is a line of sight to another location, considering walls."""
    if self.building != other.building:
        return False

    if self.floor != other.floor:
        return False

    line_of_sight = shapely.geometry.LineString(
        [(self.x, self.y), (other.x, other.y)]
    )

    return not any(line_of_sight.crosses(wall.line) for wall in walls)
distance_to(other)

Calculate the Euclidean distance to another location.

Source code in src/amr_hub_abm/space/location.py
33
34
35
36
37
38
39
40
def distance_to(self, other: Location) -> float:
    """Calculate the Euclidean distance to another location."""
    if self.building != other.building:
        raise InvalidDistanceError((self.building, other.building), building=True)
    if self.floor != other.floor:
        raise InvalidDistanceError((self.floor, other.floor), building=False)

    return math.sqrt((self.x - other.x) ** 2 + (self.y - other.y) ** 2)
move(new_x, new_y, new_floor)

Move the location to new coordinates.

Source code in src/amr_hub_abm/space/location.py
27
28
29
30
31
def move(self, new_x: float, new_y: float, new_floor: int) -> None:
    """Move the location to new coordinates."""
    self.x = new_x
    self.y = new_y
    self.floor = new_floor
which_room(rooms)

Determine which room the location is in, if any.

Source code in src/amr_hub_abm/space/location.py
48
49
50
51
52
53
54
55
def which_room(self, rooms: list[Room]) -> Room | None:
    """Determine which room the location is in, if any."""
    for room in rooms:
        if room.building != self.building or room.floor != self.floor:
            continue
        if room.region.contains(shapely.geometry.Point(self.x, self.y)):
            return room
    return None

room

Module defining room-related classes for the AMR Hub ABM simulation.

Room dataclass

Representation of a room in the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/space/room.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
@dataclass
class Room:
    """Representation of a room in the AMR Hub ABM simulation."""

    room_id: int
    name: str
    building: str
    floor: int
    contents: list[Content]
    doors: list[Door]
    walls: list[Wall] | None = field(default=None)
    area: float | None = field(default=None)
    region: shapely.geometry.Polygon = field(init=False)
    room_hash: str = field(init=False)

    def __post_init__(self) -> None:
        """Post-initialization to validate room attributes."""
        if not self.walls and not self.area:
            msg = "Either walls or area must be provided to define a room."
            raise SimulationModeError(msg)

        if self.walls and len(self.walls) < 3:
            msg = "A room must have at least 3 walls to form a closed region."
            raise InvalidRoomError(msg)

        if self.walls and self.area:
            msg = "Provide either walls or area, not both, to define a room."
            raise SimulationModeError(msg)

        if not self.area:
            self.area = self.form_region().area

        if self.area <= 0:
            msg = f"Room area must be positive. Got {self.area}."
            raise InvalidRoomError(msg)

        if self.walls:
            self.region = self.form_region()
        else:
            self.region = shapely.geometry.Polygon()
            logger.warning(
                "Room %s has no walls; region is set to an empty polygon.", self.name
            )

        self.room_hash = (
            self.create_polygon_hash() if self.walls else self.create_name_hash()
        )

    def __hash__(self) -> int:
        """Generate a hash for the room based on its unique hash string."""
        return hash(self.room_hash)

    def __eq__(self, other: object) -> bool:
        """Check equality of two rooms based on their unique hash strings."""
        if not isinstance(other, Room):
            return NotImplemented
        return self.room_hash == other.room_hash

    def create_polygon_hash(self) -> str:
        """Create a unique hash for the room based on its polygonal region."""
        if not self.walls:
            msg = "Cannot create polygon hash without walls."
            raise SimulationModeError(msg)

        return hashlib.sha256(shapely.ops.orient(self.region).wkb).hexdigest()

    def create_name_hash(self) -> str:
        """Create a unique hash for the room based on its name."""
        return hashlib.sha256(self.name.encode("utf-8")).hexdigest()

    def form_region(self) -> shapely.geometry.Polygon:
        """Get the polygonal region of the room based on its walls."""
        if self.walls is None:
            msg = "Cannot form region without walls."
            raise InvalidRoomError(msg)

        merged_lines = shapely.ops.linemerge(
            [wall.line for wall in self.walls] + [door.line for door in self.doors]
        )

        polygon = shapely.ops.polygonize(merged_lines)

        if len(polygon) == 0:
            msg = "The walls do not form a valid closed region."
            raise InvalidRoomError(msg)

        return polygon[0]

    def plot(self, ax: Axes, agents: list[Agent] | None = None, **kwargs: dict) -> None:
        """Plot the room on a given matplotlib axis."""
        if not self.walls:
            msg = "Cannot plot room without walls."
            raise SimulationModeError(msg)

        for wall in self.walls:
            wall.plot(ax, color="black")  # type: ignore  # noqa: PGH003

        for door in self.doors:
            x, y = door.line.xy
            ax.plot(
                x,
                y,
                color=kwargs.get("door_color", "brown"),
                linewidth=kwargs.get("door_width", 2),
            )

        if agents is None:
            return

        for agent in agents:
            if (
                agent.location.building == self.building
                and agent.location.floor == self.floor
            ) and self.contains_point((agent.location.x, agent.location.y)):
                agent.plot_agent(ax)

    def contains_point(self, point: tuple[float, float]) -> bool:
        """Check if a given point is inside the room."""
        if not self.walls:
            msg = "Cannot check point containment without walls."
            raise SimulationModeError(msg)

        return self.region.contains(shapely.geometry.Point(point))

    def get_random_point(
        self, rng: np.random.Generator | None = None, max_attempts: int = 1000
    ) -> tuple[float, float]:
        """Get a random point within the room."""
        if not self.walls:
            msg = "Cannot get random point without walls."
            raise SimulationModeError(msg)

        if rng is None:
            rng = np.random.default_rng()

        minx, miny, maxx, maxy = self.region.bounds

        for _ in range(max_attempts):
            # If required later... Improve efficiency using batching or spatial indexing
            random_point = shapely.geometry.Point(
                rng.uniform(minx, maxx), rng.uniform(miny, maxy)
            )
            if self.region.contains(random_point):
                return (random_point.x, random_point.y)

        msg = f"""
        Failed to find a random point within the room after {max_attempts} attempts.
        Consider increasing max_attempts or checking room geometry.
        """
        raise SimulationModeError(msg)

    def get_door_access_point(self) -> tuple[Door, tuple[float, float]]:
        """Get a point near one of the room's doors for access."""
        if not self.doors:
            msg = f"Room {self.name} has no doors for access."
            raise InvalidRoomError(msg)

        if len(self.doors) > 1:
            msg = f"Room {self.name} has multiple doors; \
            This functionality is not supported for now."
            raise InvalidRoomError(msg)

        door = self.doors[0]
        midpoint = door.line.interpolate(0.5, normalized=True)
        return (door, (midpoint.x, midpoint.y))
__eq__(other)

Check equality of two rooms based on their unique hash strings.

Source code in src/amr_hub_abm/space/room.py
79
80
81
82
83
def __eq__(self, other: object) -> bool:
    """Check equality of two rooms based on their unique hash strings."""
    if not isinstance(other, Room):
        return NotImplemented
    return self.room_hash == other.room_hash
__hash__()

Generate a hash for the room based on its unique hash string.

Source code in src/amr_hub_abm/space/room.py
75
76
77
def __hash__(self) -> int:
    """Generate a hash for the room based on its unique hash string."""
    return hash(self.room_hash)
__post_init__()

Post-initialization to validate room attributes.

Source code in src/amr_hub_abm/space/room.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __post_init__(self) -> None:
    """Post-initialization to validate room attributes."""
    if not self.walls and not self.area:
        msg = "Either walls or area must be provided to define a room."
        raise SimulationModeError(msg)

    if self.walls and len(self.walls) < 3:
        msg = "A room must have at least 3 walls to form a closed region."
        raise InvalidRoomError(msg)

    if self.walls and self.area:
        msg = "Provide either walls or area, not both, to define a room."
        raise SimulationModeError(msg)

    if not self.area:
        self.area = self.form_region().area

    if self.area <= 0:
        msg = f"Room area must be positive. Got {self.area}."
        raise InvalidRoomError(msg)

    if self.walls:
        self.region = self.form_region()
    else:
        self.region = shapely.geometry.Polygon()
        logger.warning(
            "Room %s has no walls; region is set to an empty polygon.", self.name
        )

    self.room_hash = (
        self.create_polygon_hash() if self.walls else self.create_name_hash()
    )
contains_point(point)

Check if a given point is inside the room.

Source code in src/amr_hub_abm/space/room.py
143
144
145
146
147
148
149
def contains_point(self, point: tuple[float, float]) -> bool:
    """Check if a given point is inside the room."""
    if not self.walls:
        msg = "Cannot check point containment without walls."
        raise SimulationModeError(msg)

    return self.region.contains(shapely.geometry.Point(point))
create_name_hash()

Create a unique hash for the room based on its name.

Source code in src/amr_hub_abm/space/room.py
93
94
95
def create_name_hash(self) -> str:
    """Create a unique hash for the room based on its name."""
    return hashlib.sha256(self.name.encode("utf-8")).hexdigest()
create_polygon_hash()

Create a unique hash for the room based on its polygonal region.

Source code in src/amr_hub_abm/space/room.py
85
86
87
88
89
90
91
def create_polygon_hash(self) -> str:
    """Create a unique hash for the room based on its polygonal region."""
    if not self.walls:
        msg = "Cannot create polygon hash without walls."
        raise SimulationModeError(msg)

    return hashlib.sha256(shapely.ops.orient(self.region).wkb).hexdigest()
form_region()

Get the polygonal region of the room based on its walls.

Source code in src/amr_hub_abm/space/room.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def form_region(self) -> shapely.geometry.Polygon:
    """Get the polygonal region of the room based on its walls."""
    if self.walls is None:
        msg = "Cannot form region without walls."
        raise InvalidRoomError(msg)

    merged_lines = shapely.ops.linemerge(
        [wall.line for wall in self.walls] + [door.line for door in self.doors]
    )

    polygon = shapely.ops.polygonize(merged_lines)

    if len(polygon) == 0:
        msg = "The walls do not form a valid closed region."
        raise InvalidRoomError(msg)

    return polygon[0]
get_door_access_point()

Get a point near one of the room's doors for access.

Source code in src/amr_hub_abm/space/room.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def get_door_access_point(self) -> tuple[Door, tuple[float, float]]:
    """Get a point near one of the room's doors for access."""
    if not self.doors:
        msg = f"Room {self.name} has no doors for access."
        raise InvalidRoomError(msg)

    if len(self.doors) > 1:
        msg = f"Room {self.name} has multiple doors; \
        This functionality is not supported for now."
        raise InvalidRoomError(msg)

    door = self.doors[0]
    midpoint = door.line.interpolate(0.5, normalized=True)
    return (door, (midpoint.x, midpoint.y))
get_random_point(rng=None, max_attempts=1000)

Get a random point within the room.

Source code in src/amr_hub_abm/space/room.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def get_random_point(
    self, rng: np.random.Generator | None = None, max_attempts: int = 1000
) -> tuple[float, float]:
    """Get a random point within the room."""
    if not self.walls:
        msg = "Cannot get random point without walls."
        raise SimulationModeError(msg)

    if rng is None:
        rng = np.random.default_rng()

    minx, miny, maxx, maxy = self.region.bounds

    for _ in range(max_attempts):
        # If required later... Improve efficiency using batching or spatial indexing
        random_point = shapely.geometry.Point(
            rng.uniform(minx, maxx), rng.uniform(miny, maxy)
        )
        if self.region.contains(random_point):
            return (random_point.x, random_point.y)

    msg = f"""
    Failed to find a random point within the room after {max_attempts} attempts.
    Consider increasing max_attempts or checking room geometry.
    """
    raise SimulationModeError(msg)
plot(ax, agents=None, **kwargs)

Plot the room on a given matplotlib axis.

Source code in src/amr_hub_abm/space/room.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def plot(self, ax: Axes, agents: list[Agent] | None = None, **kwargs: dict) -> None:
    """Plot the room on a given matplotlib axis."""
    if not self.walls:
        msg = "Cannot plot room without walls."
        raise SimulationModeError(msg)

    for wall in self.walls:
        wall.plot(ax, color="black")  # type: ignore  # noqa: PGH003

    for door in self.doors:
        x, y = door.line.xy
        ax.plot(
            x,
            y,
            color=kwargs.get("door_color", "brown"),
            linewidth=kwargs.get("door_width", 2),
        )

    if agents is None:
        return

    for agent in agents:
        if (
            agent.location.building == self.building
            and agent.location.floor == self.floor
        ) and self.contains_point((agent.location.x, agent.location.y)):
            agent.plot_agent(ax)

wall

Module defining wall representation for the AMR Hub ABM simulation.

Wall dataclass

Representation of a wall in the AMR Hub ABM simulation.

Source code in src/amr_hub_abm/space/wall.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@dataclass
class Wall:
    """Representation of a wall in the AMR Hub ABM simulation."""

    start: tuple[float, float]
    end: tuple[float, float]
    thickness: float = 0.2

    @property
    def line(self) -> shapely.geometry.LineString:
        """Get the line representation of the wall."""
        return shapely.geometry.LineString([self.start, self.end])

    @property
    def polygon(self) -> shapely.geometry.Polygon:
        """Get the polygon representation of the wall based on its thickness."""
        line = self.line
        return line.buffer(self.thickness / 2, cap_style="square")

    def plot(self, ax: Axes, **kwargs: dict) -> None:
        """Plot the wall on a given matplotlib axis."""
        x, y = self.polygon.exterior.xy
        ax.fill(x, y, **kwargs)  # pyright: ignore[reportArgumentType]
line property

Get the line representation of the wall.

polygon property

Get the polygon representation of the wall based on its thickness.

plot(ax, **kwargs)

Plot the wall on a given matplotlib axis.

Source code in src/amr_hub_abm/space/wall.py
28
29
30
31
def plot(self, ax: Axes, **kwargs: dict) -> None:
    """Plot the wall on a given matplotlib axis."""
    x, y = self.polygon.exterior.xy
    ax.fill(x, y, **kwargs)  # pyright: ignore[reportArgumentType]

task

Module for AMR Hub ABM tasks.

Task dataclass

Representation of a task assigned to an agent.

Source code in src/amr_hub_abm/task.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@dataclass
class Task:
    """Representation of a task assigned to an agent."""

    task_type: ClassVar[TaskType] = TaskType.GENERIC

    time_needed: int
    time_due: int

    progress: TaskProgress = field(default=TaskProgress.NOT_STARTED, kw_only=True)
    priority: TaskPriority = field(default=TaskPriority.MEDIUM, kw_only=True)

    location: Location = field(init=False)

    time_started: int = field(init=False)
    time_completed: int = field(init=False)

    def time_spent(self, current_time: int) -> int:
        """Calculate the time spent on the task so far."""
        if self.progress == TaskProgress.COMPLETED:
            return self.time_completed - self.time_started

        if self.progress == TaskProgress.IN_PROGRESS:
            return current_time - self.time_started

        return 0

    def __post_init__(self) -> None:
        """Post-initialization to validate task attributes."""
        if self.time_needed < 0:
            msg = "Time needed for a task cannot be negative."
            raise TimeError(msg)

        if self.time_due < 0:
            msg = "Time due for a task cannot be negative."
            raise TimeError(msg)

    def update_progress(self, current_time: int, agent: Agent) -> None:
        """Update the progress of the task based on time spent."""
        if self.progress == TaskProgress.COMPLETED:
            return

        time_spent = self.time_spent(current_time=current_time)

        if time_spent >= self.time_needed:
            self.progress = TaskProgress.COMPLETED
            self.time_completed = current_time
            return

        if self.progress == TaskProgress.IN_PROGRESS:
            logger.info(
                "Agent id %s performing task %s at location %s.",
                agent.idx,
                self.task_type,
                self.location,
            )
            return

        if not agent.check_if_location_reached(self.location):
            self.progress = TaskProgress.MOVING_TO_LOCATION
            logger.info(
                "Agent id %s moving to task location %s.", agent.idx, self.location
            )
            agent.head_to_point((self.location.x, self.location.y))
            agent.move_one_step()
            return

        if self.progress == TaskProgress.MOVING_TO_LOCATION:
            self.progress = TaskProgress.IN_PROGRESS
            self.time_started = current_time

    def __repr__(self) -> str:
        """Representation of the task."""
        return (
            f"Task(type={self.task_type}, priority={self.priority}, "
            f"progress={self.progress}, time_needed={self.time_needed}, "
            f"time_due={self.time_due})"
        )

__post_init__()

Post-initialization to validate task attributes.

Source code in src/amr_hub_abm/task.py
82
83
84
85
86
87
88
89
90
def __post_init__(self) -> None:
    """Post-initialization to validate task attributes."""
    if self.time_needed < 0:
        msg = "Time needed for a task cannot be negative."
        raise TimeError(msg)

    if self.time_due < 0:
        msg = "Time due for a task cannot be negative."
        raise TimeError(msg)

__repr__()

Representation of the task.

Source code in src/amr_hub_abm/task.py
126
127
128
129
130
131
132
def __repr__(self) -> str:
    """Representation of the task."""
    return (
        f"Task(type={self.task_type}, priority={self.priority}, "
        f"progress={self.progress}, time_needed={self.time_needed}, "
        f"time_due={self.time_due})"
    )

time_spent(current_time)

Calculate the time spent on the task so far.

Source code in src/amr_hub_abm/task.py
72
73
74
75
76
77
78
79
80
def time_spent(self, current_time: int) -> int:
    """Calculate the time spent on the task so far."""
    if self.progress == TaskProgress.COMPLETED:
        return self.time_completed - self.time_started

    if self.progress == TaskProgress.IN_PROGRESS:
        return current_time - self.time_started

    return 0

update_progress(current_time, agent)

Update the progress of the task based on time spent.

Source code in src/amr_hub_abm/task.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def update_progress(self, current_time: int, agent: Agent) -> None:
    """Update the progress of the task based on time spent."""
    if self.progress == TaskProgress.COMPLETED:
        return

    time_spent = self.time_spent(current_time=current_time)

    if time_spent >= self.time_needed:
        self.progress = TaskProgress.COMPLETED
        self.time_completed = current_time
        return

    if self.progress == TaskProgress.IN_PROGRESS:
        logger.info(
            "Agent id %s performing task %s at location %s.",
            agent.idx,
            self.task_type,
            self.location,
        )
        return

    if not agent.check_if_location_reached(self.location):
        self.progress = TaskProgress.MOVING_TO_LOCATION
        logger.info(
            "Agent id %s moving to task location %s.", agent.idx, self.location
        )
        agent.head_to_point((self.location.x, self.location.y))
        agent.move_one_step()
        return

    if self.progress == TaskProgress.MOVING_TO_LOCATION:
        self.progress = TaskProgress.IN_PROGRESS
        self.time_started = current_time

TaskAttendPatient dataclass

Bases: Task

Representation of an 'attend patient' task.

Source code in src/amr_hub_abm/task.py
148
149
150
151
152
153
154
155
156
157
158
@dataclass
class TaskAttendPatient(Task):
    """Representation of an 'attend patient' task."""

    task_type: ClassVar[TaskType] = TaskType.ATTEND_PATIENT
    patient: Agent

    def __post_init__(self) -> None:
        """Post-initialization to set the task location."""
        super().__post_init__()
        self.location = self.patient.location

__post_init__()

Post-initialization to set the task location.

Source code in src/amr_hub_abm/task.py
155
156
157
158
def __post_init__(self) -> None:
    """Post-initialization to set the task location."""
    super().__post_init__()
    self.location = self.patient.location

TaskDoorAccess dataclass

Bases: Task

Representation of a 'door access' task.

Source code in src/amr_hub_abm/task.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@dataclass
class TaskDoorAccess(Task):
    """Representation of a 'door access' task."""

    task_type: ClassVar[TaskType] = TaskType.DOOR_ACCESS
    door: Door
    building: str
    floor: int

    def __post_init__(self) -> None:
        """Post-initialization to set the task location."""
        super().__post_init__()

        if self.door.start is None or self.door.end is None:
            msg = "Door must have defined start and end points to set task location."
            raise SimulationModeError(msg)

        self.location = Location(
            building=self.building,
            floor=self.floor,
            x=(self.door.start[0] + self.door.end[0]) / 2,
            y=(self.door.start[1] + self.door.end[1]) / 2,
        )

__post_init__()

Post-initialization to set the task location.

Source code in src/amr_hub_abm/task.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def __post_init__(self) -> None:
    """Post-initialization to set the task location."""
    super().__post_init__()

    if self.door.start is None or self.door.end is None:
        msg = "Door must have defined start and end points to set task location."
        raise SimulationModeError(msg)

    self.location = Location(
        building=self.building,
        floor=self.floor,
        x=(self.door.start[0] + self.door.end[0]) / 2,
        y=(self.door.start[1] + self.door.end[1]) / 2,
    )

TaskGotoLocation dataclass

Bases: Task

Representation of a 'goto location' task.

Source code in src/amr_hub_abm/task.py
135
136
137
138
139
140
141
142
143
144
145
@dataclass
class TaskGotoLocation(Task):
    """Representation of a 'goto location' task."""

    task_type: ClassVar[TaskType] = TaskType.GOTO_LOCATION
    destination_location: Location

    def __post_init__(self) -> None:
        """Post-initialization to set the task location."""
        super().__post_init__()
        self.location = self.destination_location

__post_init__()

Post-initialization to set the task location.

Source code in src/amr_hub_abm/task.py
142
143
144
145
def __post_init__(self) -> None:
    """Post-initialization to set the task location."""
    super().__post_init__()
    self.location = self.destination_location

TaskPriority

Bases: Enum

Enumeration of possible task priority levels.

Source code in src/amr_hub_abm/task.py
47
48
49
50
51
52
class TaskPriority(Enum):
    """Enumeration of possible task priority levels."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3

TaskProgress

Bases: Enum

Enumeration of possible task progress states.

Source code in src/amr_hub_abm/task.py
20
21
22
23
24
25
26
27
class TaskProgress(Enum):
    """Enumeration of possible task progress states."""

    NOT_STARTED = "not_started"
    MOVING_TO_LOCATION = "moving_to_location"
    SUSPENDED = "suspended"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"

TaskType

Bases: Enum

Enumeration of possible task types.

Source code in src/amr_hub_abm/task.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class TaskType(Enum):
    """Enumeration of possible task types."""

    GENERIC = "generic"
    OFFICE_WORK = "office_work"
    NURSE_ROUND = "nurse_round"
    ATTEND_PATIENT = "attend_patient"
    GOTO_LOCATION = "goto_location"
    GOTO_AGENT = "goto_agent"
    ATTEND_BELL = "attend_bell"
    STAY_IN_BED = "stay_in_bed"
    STAY_IN_ROOM = "stay_in_room"
    INTERACT_WITH_AGENT = "interact_with_agent"
    DOOR_ACCESS = "door_access"
    WORKSTATION = "workstation"

TaskWorkstation dataclass

Bases: Task

Representation of a 'workstation' task.

Source code in src/amr_hub_abm/task.py
186
187
188
189
190
191
192
193
194
195
196
@dataclass
class TaskWorkstation(Task):
    """Representation of a 'workstation' task."""

    task_type: ClassVar[TaskType] = TaskType.WORKSTATION
    workstation_location: Location

    def __post_init__(self) -> None:
        """Post-initialization to set the task location."""
        super().__post_init__()
        self.location = self.workstation_location

__post_init__()

Post-initialization to set the task location.

Source code in src/amr_hub_abm/task.py
193
194
195
196
def __post_init__(self) -> None:
    """Post-initialization to set the task location."""
    super().__post_init__()
    self.location = self.workstation_location