Source code for agentpy.experiment

"""
Agentpy Experiment Module
Content: Experiment class
"""

import warnings
import pandas as pd
import random as rd

from os import sys

from .version import __version__
from datetime import datetime, timedelta
from .tools import make_list
from .datadict import DataDict
from .sample import Sample, Range, IntRange, Values
from joblib import Parallel,delayed


[docs]class Experiment: """ Experiment that can run an agent-based model over for multiple iterations and parameter combinations and generate combined output data. Arguments: model (type): The model class for the experiment to use. sample (dict or list of dict or Sample, optional): Parameter combination(s) to test in the experiment (default None). iterations (int, optional): How often to repeat every parameter combination (default 1). record (bool, optional): Keep the record of dynamic variables (default False). randomize (bool, optional): Generate different random seeds for every iteration (default True). If True, the parameter 'seed' will be used to initialize a random seed generator for every parameter combination in the sample. If False, the same seed will be used for every iteration. If no parameter 'seed' is defined, this option has no effect. For more information, see :doc:`guide_random` . **kwargs: Will be forwarded to all model instances created by the experiment. Attributes: output(DataDict): Recorded experiment data """ def __init__(self, model_class, sample=None, iterations=1, record=False, randomize=True, **kwargs): self.model = model_class self.output = DataDict() self.iterations = iterations self.record = record self._model_kwargs = kwargs self.name = model_class.__name__ # Prepare sample if isinstance(sample, Sample): self.sample = list(sample) self._sample_log = sample._log else: self.sample = make_list(sample, keep_none=True) self._sample_log = None # Prepare runs len_sample = len(self.sample) iter_range = range(iterations) if iterations > 1 else [None] sample_range = range(len_sample) if len_sample > 1 else [None] self.run_ids = [(sample_id, iteration) for sample_id in sample_range for iteration in iter_range] self.n_runs = len(self.run_ids) # Prepare seeds if randomize and sample is not None \ and any(['seed' in p for p in self.sample]): if len_sample > 1: rngs = [rd.Random(p['seed']) if 'seed' in p else rd.Random() for p in self.sample] self._random = { (sample_id, iteration): rngs[sample_id].getrandbits(128) for sample_id in sample_range for iteration in iter_range } else: p = list(self.sample)[0] seed = p['seed'] ranges = (Range, IntRange, Values) if isinstance(seed, ranges): seed = seed.vdef rng = rd.Random(seed) self._random = { (None, iteration): rng.getrandbits(128) for iteration in iter_range } else: self._random = None # Prepare output self.output.info = { 'model_type': model_class.__name__, 'time_stamp': str(datetime.now()), 'agentpy_version': __version__, 'python_version': sys.version[:5], 'experiment': True, 'scheduled_runs': self.n_runs, 'completed': False, 'random': randomize, 'record': record, 'sample_size': len(self.sample), 'iterations': iterations } self._parameters_to_output() def _parameters_to_output(self): """ Document parameters (separately for fixed & variable). """ df = pd.DataFrame(self.sample) df.index.rename('sample_id', inplace=True) fixed_pars = {} for col in df.columns: s = df[col] if len(s.unique()) == 1: fixed_pars[s.name] = df[col][0] df.drop(col, inplace=True, axis=1) self.output['parameters'] = DataDict() if fixed_pars: self.output['parameters']['constants'] = fixed_pars if not df.empty: self.output['parameters']['sample'] = df if self._sample_log: self.output['parameters']['log'] = self._sample_log @staticmethod def _add_single_output_to_combined(single_output, combined_output): """Append results from single run to combined output. Each key in single_output becomes a key in combined_output. DataDicts entries become dicts with lists of values. Other entries become lists of values. """ for key, value in single_output.items(): if key in ['parameters', 'info']: # Skip parameters & info continue if isinstance(value, DataDict): # Handle subdicts if key not in combined_output: # New key combined_output[key] = {} # as dict for obj_type, obj_df in single_output[key].items(): if obj_type not in combined_output[key]: # New subkey combined_output[key][obj_type] = [] # as list combined_output[key][obj_type].append(obj_df) else: # Handle other output types if key not in combined_output: # New key combined_output[key] = [] # as list combined_output[key].append(value) def _combine_dataframes(self, combined_output): """ Combines data from combined output. Dataframes are combined with concat. Dicts are transformed to DataDict. Other objects are kept as original. Combined data is written to self.output. """ for key, values in combined_output.items(): if values and all([isinstance(value, pd.DataFrame) for value in values]): self.output[key] = pd.concat(values) # Df are combined elif isinstance(values, dict): # Dict is transformed to DataDict self.output[key] = DataDict() for sk, sv in values.items(): if all([isinstance(v, pd.DataFrame) for v in sv]): self.output[key][sk] = pd.concat(sv) # Df are combined else: # Other objects are kept as original TODO TESTS self.output[key][sk] = sv elif key != 'info': # Other objects are kept as original TODO TESTS self.output[key] = values def _single_sim(self, run_id): """ Perform a single simulation.""" sample_id = 0 if run_id[0] is None else run_id[0] parameters = self.sample[sample_id] model = self.model(parameters, _run_id=run_id, **self._model_kwargs) if self._random: results = model.run(display=False, seed=self._random[run_id]) else: results = model.run(display=False) if 'variables' in results and self.record is False: del results['variables'] # Remove dynamic variables from record return results # TODO AgentPy 0.2.0 - Remove pool argument
[docs] def run(self, n_jobs=1, pool=None, display=True, **kwargs): """ Perform the experiment. The simulation will run the model once for each set of parameters and will repeat this process for the set number of iterations. Simulation results will be stored in `Experiment.output`. Parallel processing is supported based on :func:`joblib.Parallel`. Arguments: n_jobs (int, optional): Number of processes to run in parallel (default 1). If 1, no parallel processing is used. If -1, all CPUs are used. Will be forwarded to :func:`joblib.Parallel`. pool (multiprocessing.Pool, optional): [This argument is depreciated. Please use 'n_jobs' instead.] Pool of active processes for parallel processing. If none is passed, normal processing is used. display (bool, optional): Display simulation progress (default True). **kwargs: Additional keyword arguments for :func:`joblib.Parallel`. Returns: DataDict: Recorded experiment data. Examples: To run a normal experiment:: exp = ap.Experiment(MyModel, parameters) results = exp.run() To use parallel processing on all CPUs with status updates:: exp = ap.Experiment(MyModel, parameters) results = exp.run(n_jobs=-1, verbose=10) """ if display: n_runs = self.n_runs print(f"Scheduled runs: {n_runs}") t0 = datetime.now() # Time-Stamp Start combined_output = {} # Parallel processing with joblib if n_jobs != 1: # output_list = pool.map(self._single_sim, self.run_ids) output_list = Parallel(n_jobs=n_jobs, **kwargs)( delayed(self._single_sim)(i) for i in self.run_ids) for single_output in output_list: self._add_single_output_to_combined( single_output, combined_output) # Normal processing elif pool is None: i = -1 for run_id in self.run_ids: self._add_single_output_to_combined( self._single_sim(run_id), combined_output) if display: i += 1 td = (datetime.now() - t0).total_seconds() te = timedelta(seconds=int(td / (i + 1) * (n_runs - i - 1))) print(f"\rCompleted: {i + 1}, " f"estimated time remaining: {te}", end='') if display: print("") # Because the last print ended without a line-break # Parallel processing with multiprocessing (TODO to depreciate) else: warnings.warn( "The argument 'pool' in Experiment.run() is depreciated. " "Please use 'n_jobs' instead.") if display: print(f"Using parallel processing.") print(f"Active processes: {pool._processes}") output_list = pool.map(self._single_sim, self.run_ids) #Parallel(n_jobs=num_cores)(delayed(job)(BoidsModel,5,i) for i in tqdm(sample)) #Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) for single_output in output_list: self._add_single_output_to_combined( single_output, combined_output) self._combine_dataframes(combined_output) self.end() self.output.info['completed'] = True self.output.info['run_time'] = ct = str(datetime.now() - t0) if display: print(f"Experiment finished\nRun time: {ct}") return self.output
[docs] def end(self): """ Defines the experiment's actions after the last simulation. Can be overwritten for final calculations and reporting.""" pass