"""
Agentpy Grid Module
Content: Class for discrete spatial environments
"""
import itertools
import numpy as np
import random as rd
import collections.abc as abc
import numpy.lib.recfunctions as rfs
from .objects import SpatialEnvironment
from .tools import make_list, make_matrix, AgentpyError, ListDict
from .sequences import AgentSet, AgentIter, AgentList
class _IterArea:
""" Iteratable object that takes either a numpy matrix or an iterable
as an input. If the object is an ndarray, it is flattened and iterated
over the contents of each element chained together. Otherwise, it is
simply iterated over the object.
Arguments:
area: Area of sets of elements.
exclude: Element to exclude. Assumes that element is in area.
"""
def __init__(self, area, exclude=None):
self.area = area
self.exclude = exclude
def __len__(self):
if isinstance(self.area, np.ndarray):
len_ = sum([len(s) for s in self.area.flat])
else:
len_ = len(self.area)
if self.exclude:
len_ -= 1 # Assumes that exclude is in Area
return len_
def __iter__(self):
if self.exclude:
if isinstance(self.area, np.ndarray):
return itertools.filterfalse(
lambda x: x is self.exclude,
itertools.chain.from_iterable(self.area.flat)
)
else:
return itertools.filterfalse(
lambda x: x is self.exclude, self.area)
else:
if isinstance(self.area, np.ndarray):
return itertools.chain.from_iterable(self.area.flat)
else:
return iter(self.area)
[docs]class GridIter(AgentIter):
""" Iterator over objects in :class:`Grid` that supports slicing.
Examples:
Create a model with a 10 by 10 grid
with one agent in each position::
model = ap.Model()
agents = ap.AgentList(model, 100)
grid = ap.Grid(model, (10, 10))
grid.add_agents(agents)
The following returns an iterator over the agents in all position::
>>> grid.agents
GridIter (100 objects)
The following returns an iterator over the agents
in the top-left quarter of the grid::
>>> grid.agents[0:5, 0:5]
GridIter (25 objects)
"""
def __init__(self, model, iter_, items):
super().__init__(model, iter_)
object.__setattr__(self, '_items', items)
def __getitem__(self, item):
sub_area = self._items[item]
return GridIter(self._model, _IterArea(sub_area), sub_area)
[docs]class Grid(SpatialEnvironment):
""" Environment that contains agents with a discrete spatial topology,
supporting multiple agents and attribute fields per cell.
For a continuous spatial topology, see :class:`Space`.
This class can be used as a parent class for custom grid types.
All agentpy model objects call the method :func:`setup` after creation,
and can access class attributes like dictionary items.
Arguments:
model (Model):
The model instance.
shape (tuple of int):
Size of the grid.
The length of the tuple defines the number of dimensions,
and the values in the tuple define the length of each dimension.
torus (bool, optional):
Whether to connect borders (default False).
If True, the grid will be toroidal, meaning that agents who
move over a border will re-appear on the opposite side.
If False, they will remain at the edge of the border.
track_empty (bool, optional):
Whether to keep track of empty cells (default False).
If true, empty cells can be accessed via :obj:`Grid.empty`.
check_border (bool, optional):
Ensure that agents stay within border (default True).
Can be set to False for faster performance.
**kwargs: Will be forwarded to :func:`Grid.setup`.
Attributes:
agents (GridIter):
Iterator over all agents in the grid.
positions (dict of Agent):
Dictionary linking each agent instance to its position.
grid (numpy.rec.array):
Structured numpy record array with a field 'agents'
that holds an :class:`AgentSet` in each position.
shape (tuple of int):
Length of each dimension.
ndim (int):
Number of dimensions.
all (list):
List of all positions in the grid.
empty (ListDict):
List of unoccupied positions, only available
if the Grid was initiated with `track_empty=True`.
"""
@staticmethod
def _agent_field(field_name, shape, model):
# Prepare structured array filled with empty agent sets
array = np.empty(shape, dtype=[(field_name, object)])
it = np.nditer(array, flags=['refs_ok', 'multi_index'])
for _ in it:
array[it.multi_index] = AgentSet(model)
return array
def __init__(self, model, shape, torus=False,
track_empty=False, check_border=True, **kwargs):
super().__init__(model)
self._track_empty = track_empty
self._check_border = check_border
self._torus = torus
self.positions = {}
self.grid = np.rec.array(self._agent_field('agents', shape, model))
self.shape = tuple(shape)
self.ndim = len(self.shape)
self.all = list(itertools.product(*[range(x) for x in shape]))
self.empty = ListDict(self.all) if track_empty else None
self._set_var_ignore()
self.setup(**kwargs)
@property
def agents(self):
return GridIter(self.model, self.positions.keys(), self.grid.agents)
# Add and remove agents ------------------------------------------------- #
def _add_agent(self, agent, position, field):
position = tuple(position)
self.grid[field][position].add(agent) # Add agent to grid
self.positions[agent] = position # Add agent position to dict
[docs] def add_agents(self, agents, positions=None, random=False, empty=False):
""" Adds agents to the grid environment.
Arguments:
agents (Sequence of Agent):
Iterable of agents to be added.
positions (Sequence of positions, optional):
The positions of the agents.
Must have the same length as 'agents',
with each entry being a tuple of integers.
If none is passed, positions will be chosen automatically
based on the arguments 'random' and 'empty':
- random and empty:
Random selection without repetition from `Grid.empty`.
- random and not empty:
Random selection with repetition from `Grid.all`.
- not random and empty:
Iterative selection from `Grid.empty`.
- not random and not empty:
Iterative selection from `Grid.all`.
random (bool, optional):
Whether to choose random positions (default False).
empty (bool, optional):
Whether to choose only empty cells (default False).
Can only be True if Grid was initiated with `track_empty=True`.
"""
field = 'agents'
if empty and not self._track_empty:
raise AgentpyError(
"To use 'Grid.add_agents()' with 'empty=True', "
"Grid must be iniated with 'track_empty=True'.")
# Choose positions
if positions:
pass
elif random:
n = len(agents)
if empty:
positions = self.model.random.sample(self.empty, k=n)
else:
positions = self.model.random.choices(self.all, k=n)
else:
if empty:
positions = list(self.empty) # Soft copy
else:
positions = itertools.cycle(self.all)
if empty and len(positions) < len(agents):
raise AgentpyError("Cannot add more agents than empty positions.")
if self._track_empty:
for agent, position in zip(agents, positions):
self._add_agent(agent, position, field)
if position in self.empty:
self.empty.remove(position)
else:
for agent, position in zip(agents, positions):
self._add_agent(agent, position, field)
[docs] def remove_agents(self, agents):
""" Removes agents from the environment. """
for agent in make_list(agents):
pos = self.positions[agent] # Get position
self.grid.agents[pos].remove(agent) # Remove agent from grid
del self.positions[agent] # Remove agent from position dict
if self._track_empty:
self.empty.append(pos) # Add position to free spots
# Move and select agents ------------------------------------------------ #
@staticmethod
def _border_behavior(position, shape, torus):
# Connected - Jump to other side
if torus:
new_position = tuple(x % x_max for x, x_max
in zip(position, shape))
# Not connected - Stop at border
else:
new_position = tuple(np.clip(position, 0,
np.array(shape)-1))
return new_position
[docs] def move_to(self, agent, pos):
""" Moves agent to new position.
Arguments:
agent (Agent): Instance of the agent.
pos (tuple of int): New position of the agent.
"""
pos_old = self.positions[agent]
if pos != pos_old:
# Grid options
if self._check_border:
pos = self._border_behavior(pos, self.shape, self._torus)
if self._track_empty:
if len(self.grid.agents[pos_old]) == 1:
if pos in self.empty:
self.empty.replace(pos, pos_old)
else:
self.empty.append(pos_old)
elif pos in self.empty:
self.empty.remove(pos)
self.grid.agents[pos_old].remove(agent)
self.grid.agents[pos].add(agent)
self.positions[agent] = pos
[docs] def move_by(self, agent, path):
""" Moves agent to new position, relative to current position.
Arguments:
agent (Agent): Instance of the agent.
path (tuple of int): Relative change of position.
"""
pos = [p + c for p, c in zip(self.positions[agent], path)]
self.move_to(agent, tuple(pos))
[docs] def neighbors(self, agent, distance=1):
""" Select neighbors of an agent within a given distance.
Arguments:
agent (Agent): Instance of the agent.
distance (int, optional):
Number of cells to cover in each direction,
including diagonally connected cells (default 1).
Returns:
AgentIter: Iterator over the selected neighbors.
"""
pos = self.positions[agent]
# TODO Change method upon initiation
# Case 1: Toroidal
if self._torus:
slices = [(p-distance, p+distance+1) for p in pos]
new_slices = []
for (x_from, x_to), x_max in zip(slices, self.shape):
if distance >= x_max//2 :
sl_tupl = [(0, x_max)]
elif x_to > x_max:
sl_tupl = [(x_from, x_max), (0, x_to - x_max)]
elif x_from < 0:
sl_tupl = [(x_max + x_from, x_max), (0, x_to)]
else:
sl_tupl = [(x_from, x_to)]
new_slices.append(sl_tupl)
areas = []
for slices in itertools.product(*new_slices):
slices = tuple(slice(*sl) for sl in slices)
areas.append(self.grid.agents[slices])
# TODO Exclude in every area inefficient
area_iters = [_IterArea(area, exclude=agent) for area in areas]
# TODO Can only be iterated on once
return AgentIter(self.model,
itertools.chain.from_iterable(area_iters))
# Case 2: Non-toroidal
else:
slices = tuple(slice(p-distance if p-distance >= 0 else 0,
p+distance+1) for p in pos)
area = self.grid.agents[slices]
# Iterator over all agents in area, exclude original agent
return AgentIter(self.model, _IterArea(area, exclude=agent))
# Fields and attributes ------------------------------------------------- #
[docs] def apply(self, func, field='agents'):
""" Applies a function to each grid position,
end returns an `numpy.ndarray` of return values.
Arguments:
func (function): Function that takes cell content as input.
field (str, optional): Field to use (default 'agents').
"""
return np.vectorize(func)(self.grid[field])
[docs] def attr_grid(self, attr_key, otypes='f', field='agents'):
""" Returns a grid with the value of the attribute of the agent
in each position, using :class:`numpy.vectorize`.
Positions with no agent will contain `numpy.nan`.
Should only be used for grids with zero or one agents per cell.
Other kinds of attribute grids can be created with :func:`Grid.apply`.
Arguments:
attr_key (str): Name of the attribute.
otypes (str or list of dtypes, optional):
Data type of returned grid (default float).
For more information, see :class:`numpy.vectorize`.
field (str, optional): Field to use (default 'agents').
"""
f = np.vectorize(
lambda x: getattr(next(iter(x)), attr_key) if x else np.nan,
otypes=otypes)
return f(self.grid[field])
[docs] def add_field(self, key, values=None):
"""
Add an attribute field to the grid.
Arguments:
key (str):
Name of the field.
values (optional):
Single value or :class:`numpy.ndarray`
of values (default None).
"""
if not isinstance(values, (np.ndarray, list)):
values = np.full(np.product(self.shape), fill_value=values)
if len(values.shape) > 1:
values = values.reshape(-1)
# Create attribute as a numpy field
self.grid = rfs.append_fields(
self.grid, key, values, usemask=False, asrecarray=True
).reshape(self.grid.shape)
# Create attribute as reference to field
setattr(self, key, self.grid[key])
[docs] def del_field(self, key):
"""
Delete a attribute field from the grid.
Arguments:
key (str): Name of the field.
"""
self.grid = rfs.drop_fields(
self.grid, key, usemask=False, asrecarray=True)
delattr(self, key)