"""
Agentpy Output Module
Content: DataDict class for output data
"""
import pandas as pd
import os
from os import listdir, makedirs
from os.path import getmtime, join
from SALib.analyze import sobol
from .tools import AttrDict, make_list, AgentpyError
import json
import numpy as np
class NpEncoder(json.JSONEncoder):
""" Adds support for numpy number formats to json. """
# By Jie Yang https://stackoverflow.com/a/57915246
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.bool_):
return bool(obj)
else:
return super(NpEncoder, self).default(obj)
def _last_exp_id(name, path):
""" Identifies existing experiment data and return highest id. """
output_dirs = listdir(path)
exp_dirs = [s for s in output_dirs if name in s]
if exp_dirs:
ids = [int(s.split('_')[-1]) for s in exp_dirs]
return max(ids)
else:
return None
# TODO Create DataSubDict without methods
[docs]class DataDict(AttrDict):
""" Nested dictionary for output data of simulations.
Items can be accessed like attributes.
Attributes can differ from the standard ones listed below.
Attributes:
info (dict):
Metadata of the simulation.
parameters (DataDict):
Simulation parameters.
variables (DataDict):
Recorded variables, separatedper object type.
reporters (pandas.DataFrame):
Reported outcomes of the simulation.
sensitivity (DataDict):
Sensitivity data, if calculated.
"""
def __repr__(self, indent=False):
rep = ""
if not indent:
rep += "DataDict {"
i = ' ' if indent else ''
for k, v in self.items():
rep += f"\n{i}'{k}': "
if isinstance(v, (int, float, np.integer, np.floating)):
rep += f"{v} {type(v)}"
elif isinstance(v, str):
x0 = f"(length {len(v)})"
x = f"...' {x0}" if len(v) > 20 else "'"
rep += f"'{v[:30]}{x} {type(v)}"
elif isinstance(v, pd.DataFrame):
lv = len(list(v.columns))
rv = len(list(v.index))
rep += f"DataFrame with {lv} " \
f"variable{'s' if lv != 1 else ''} " \
f"and {rv} row{'s' if rv != 1 else ''}"
elif isinstance(v, DataDict):
rep += f"{v.__repr__(indent=True)}"
elif isinstance(v, dict):
lv = len(list(v.keys()))
rep += f"Dictionary with {lv} key{'s' if lv != 1 else ''}"
elif isinstance(v, list):
lv = len(v)
rep += f"List with {lv} entr{'ies' if lv != 1 else 'y'}"
else:
rep += f"Object of type {type(v)}"
if not indent:
rep += "\n}"
return rep
def _short_repr(self):
len_ = len(self.keys())
return f"DataDict {{{len_} entr{'y' if len_ == 1 else 'ies'}}}"
def __eq__(self, other):
""" Check equivalence of two DataDicts."""
if not isinstance(other, DataDict):
return False
for key, item in self.items():
if key not in other:
return False
if isinstance(item, pd.DataFrame):
if not self[key].equals(other[key]):
return False
elif not self[key] == other[key]:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
# Data analysis --------------------------------------------------------- #
@staticmethod
def _sobol_set_df_index(df, p_keys, reporter):
df['parameter'] = p_keys
df['reporter'] = reporter
df.set_index(['reporter', 'parameter'], inplace=True)
[docs] def calc_sobol(self, reporters=None, **kwargs):
""" Calculates Sobol Sensitivity Indices
using :func:`SALib.analyze.sobol.analyze`.
Data must be from an :class:`Experiment` with a :class:`Sample`
that was generated with the method 'saltelli'.
If the experiment had more than one iteration,
the mean value between iterations will be taken.
Arguments:
reporters (str or list of str, optional): The reporters that should
be used for the analysis. If none are passed,
all existing reporters except 'seed' are used.
**kwargs: Will be forwarded to :func:`SALib.analyze.sobol.analyze`.
Returns:
DataDict: The DataDict itself with an added category 'sensitivity'.
"""
if not self.parameters.log['type'] == 'saltelli':
raise AgentpyError("Sampling method must be 'saltelli'.")
if self.info['iterations'] == 1:
reporters_df = self.reporters
else:
reporters_df = self.reporters.groupby('sample_id').mean()
# STEP 1 - Load salib problem from parameter log
param_ranges_salib = self.parameters.log['salib_problem']
calc_second_order = self.parameters.log['calc_second_order']
# STEP 2 - Calculate Sobol Sensitivity Indices
if reporters is None:
reporters = reporters_df.columns
if 'seed' in reporters:
reporters = reporters.drop('seed')
elif isinstance(reporters, str):
reporters = [reporters]
p_keys = self._combine_pars(sample=True, constants=False).keys()
dfs_list = [[] for _ in range(4 if calc_second_order else 2)]
for reporter in reporters:
y = np.array(reporters_df[reporter])
si = sobol.analyze(param_ranges_salib, y, calc_second_order, **kwargs)
# Make dataframes out of S1 and ST sensitivities
keyss = [['S1', 'ST'], ['S1_conf', 'ST_conf']]
for keys, dfs in zip(keyss, dfs_list[0:2]):
s = {k[0:2]: v for k, v in si.items() if k in keys}
df = pd.DataFrame(s)
self._sobol_set_df_index(df, p_keys, reporter)
dfs.append(df)
# Make dataframes out S2 sensitivities
if calc_second_order:
for key, dfs in zip(['S2', 'S2_conf'], dfs_list[2:4]):
df = pd.DataFrame(si[key])
self._sobol_set_df_index(df, p_keys, reporter)
dfs.append(df)
# Combine dataframes for each reporter
self['sensitivity'] = sdict = DataDict()
sdict['sobol'] = pd.concat(dfs_list[0])
sdict['sobol_conf'] = pd.concat(dfs_list[1])
if calc_second_order:
# Add Second-Order to self
dfs_si = [sdict['sobol'], pd.concat(dfs_list[2])]
dfs_si_conf = [sdict['sobol_conf'], pd.concat(dfs_list[3])]
sdict['sobol'] = pd.concat(dfs_si, axis=1)
sdict['sobol_conf'] = pd.concat(dfs_si_conf, axis=1)
# Create Multi-Index for Columns
arrays = [["S1", "ST"] + ["S2"] * len(p_keys), [""] * 2 + list(p_keys)]
tuples = list(zip(*arrays))
index = pd.MultiIndex.from_tuples(tuples, names=["order", "parameter"])
sdict['sobol'].columns = index
sdict['sobol_conf'].columns = index.copy()
return self
# Data arrangement ------------------------------------------------------ #
def _combine_vars(self, obj_types=True, var_keys=True):
""" Returns pandas dataframe with combined variables """
# Retrieve variables
if 'variables' in self:
vs = self['variables']
else:
return None
if len(vs.keys()) == 1:
return list(vs.values())[0] # Return df if vs has only one entry
elif isinstance(vs, DataDict):
df_dict = dict(vs) # Convert to dict if vs is DataDict
# Remove dataframes that don't include any of the selected var_keys
if var_keys is not True:
df_dict = {k: v for k, v in df_dict.items()
if any(x in v.columns for x in make_list(var_keys))}
# Select object types
if obj_types is not True:
df_dict = {k: v for k, v in df_dict.items()
if k in make_list(obj_types)}
# Add 'obj_id' before 't' for model df
model_type = self.info['model_type']
if model_type in list(df_dict.keys()):
df = df_dict[model_type]
df['obj_id'] = 0
indexes = list(df.index.names)
indexes.insert(-1, 'obj_id')
df = df.reset_index()
df = df.set_index(indexes)
df_dict[model_type] = df
# Return none if empty
if df_dict == {}:
return None
# Create dataframe
df = pd.concat(df_dict) # Dict keys (obj_type) will be added to index
df.index = df.index.set_names('obj_type', level=0) # Rename new index
# Select var_keys
if var_keys is not True:
# make_list prevents conversion to pd.Series for single value
df = df[make_list(var_keys)]
return df
def _dict_pars_to_df(self, dict_pars):
n = self.info['sample_size'] if 'sample_size' in self.info else 1
d = {k: [v] * n for k, v in dict_pars.items()}
i = pd.Index(list(range(n)), name='sample_id')
return pd.DataFrame(d, index=i)
def _combine_pars(self, sample=True, constants=True):
""" Returns pandas dataframe with parameters and sample_id """
# Cancel if there are no parameters
if 'parameters' not in self:
return None
dfp = pd.DataFrame()
if sample and 'sample' in self.parameters:
dfp = self.parameters.sample.copy()
if constants and 'constants' in self.parameters:
for k, v in self.parameters.constants.items():
dfp[k] = v
elif constants and 'constants' in self.parameters:
dfp = self._dict_pars_to_df(self.parameters.constants)
# Cancel if no parameters have been selected
if dfp is None or dfp.empty is True:
return None
# Remove seed parameter as the actually used seed is reported per run
if 'seed' in dfp:
del dfp['seed']
return dfp
[docs] def arrange(self, variables=False, reporters=False, parameters=False,
constants=False, obj_types=True, index=False):
""" Combines and/or filters data based on passed arguments.
Arguments:
variables (bool or str or list of str, optional):
Key or list of keys of variables to include in the dataframe.
If True, all available variables are selected.
If False (default), no variables are selected.
reporters (bool or str or list of str, optional):
Key or list of keys of reporters to include in the dataframe.
If True, all available reporters are selected.
If False (default), no reporters are selected.
parameters (bool or str or list of str, optional):
Key or list of keys of parameters to include in the dataframe.
If True, all non-constant parameters are selected.
If False (default), no parameters are selected.
constants (bool, optional):
Include constants if 'parameters' is True (default False).
obj_types (str or list of str, optional):
Agent and/or environment types to include in the dataframe.
If True (default), all objects are selected.
If False, no objects are selected.
index (bool, optional):
Whether to keep original multi-index structure (default False).
Returns:
pandas.DataFrame: The newly arranged dataframe.
"""
dfv = dfm = dfp = df = None
# Step 1: Variables
if variables is not False:
dfv = self._combine_vars(obj_types, variables)
# Step 2: Measures
if reporters is not False:
dfm = self.reporters
if reporters is not True: # Select reporter keys
# make_list prevents conversion to pd.Series for single value
dfm = dfm[make_list(reporters)]
# Step 3: Parameters
if parameters is True:
dfp = self._combine_pars(constants=constants)
elif parameters is not False:
dfp = self._combine_pars()
dfp = dfp[make_list(parameters)]
# Step 4: Combine dataframes
if dfv is not None and dfm is not None:
# Combine variables & measures
index_keys = dfv.index.names
dfm = dfm.reset_index()
dfv = dfv.reset_index()
df = pd.concat([dfm, dfv])
df = df.set_index(index_keys)
elif dfv is not None:
df = dfv
elif dfm is not None:
df = dfm
if dfp is not None:
if df is None:
df = dfp
else: # Combine df with parameters
if df is not None and isinstance(df.index, pd.MultiIndex):
dfp = dfp.reindex(df.index, level='sample_id')
df = pd.concat([df, dfp], axis=1)
if df is None:
return pd.DataFrame()
# Step 6: Reset index
if not index:
df = df.reset_index()
return df
[docs] def arrange_reporters(self):
""" Common use case of :obj:`DataDict.arrange`
with `reporters=True` and `parameters=True`. """
return self.arrange(variables=False, reporters=True, parameters=True)
[docs] def arrange_variables(self):
""" Common use case of :obj:`DataDict.arrange`
with `variables=True` and `parameters=True`. """
return self.arrange(variables=True, reporters=False, parameters=True)
# Saving and loading data ----------------------------------------------- #
[docs] def save(self, exp_name=None, exp_id=None, path='ap_output', display=True):
""" Writes data to directory `{path}/{exp_name}_{exp_id}/`.
Works only for entries that are of type :class:`DataDict`,
:class:`pandas.DataFrame`, or serializable with JSON
(int, float, str, dict, list). Numpy objects will be converted
to standard objects, if possible.
Arguments:
exp_name (str, optional): Name of the experiment to be saved.
If none is passed, `self.info['model_type']` is used.
exp_id (int, optional): Number of the experiment.
Note that passing an existing id can overwrite existing data.
If none is passed, a new id is generated.
path (str, optional): Target directory (default 'ap_output').
display (bool, optional): Display saving progress (default True).
"""
# Create output directory if it doesn't exist
if path not in listdir():
makedirs(path)
# Set exp_name
if exp_name is None:
if 'info' in self and 'model_type' in self.info:
exp_name = self.info['model_type']
else:
exp_name = 'Unnamed'
exp_name = exp_name.replace(" ", "_")
# Set exp_id
if exp_id is None:
exp_id = _last_exp_id(exp_name, path)
if exp_id is None:
exp_id = 1
else:
exp_id += 1
# Create new directory for output
directory = f'{exp_name}_{exp_id}'
path_dir = f'{path}/{directory}'
if directory not in listdir(path):
makedirs(path_dir)
# Save experiment data
for key, output in self.items():
if isinstance(output, pd.DataFrame):
output.to_csv(f'{path_dir}/{key}.csv')
elif isinstance(output, DataDict):
for k, o in output.items():
if isinstance(o, pd.DataFrame):
o.to_csv(f'{path_dir}/{key}_{k}.csv')
elif isinstance(o, dict):
with open(f'{path_dir}/{key}_{k}.json', 'w') as fp:
json.dump(o, fp, cls=NpEncoder)
else: # Use JSON for other object types
try:
with open(f'{path_dir}/{key}.json', 'w') as fp:
json.dump(output, fp, cls=NpEncoder)
except TypeError as e:
print(f"Warning: Object '{key}' could not be saved. "
f"(Reason: {e})")
os.remove(f'{path_dir}/{key}.json')
# TODO Support grids & graphs
# elif t == nx.Graph:
# nx.write_graphml(output, f'{path}/{key}.graphml')
if display:
print(f"Data saved to {path_dir}")
def _load(self, exp_name=None, exp_id=None,
path='ap_output', display=True):
def load_file(path, file, display):
if display:
print(f'Loading {file} - ', end='')
i_cols = ['sample_id', 'iteration', 'obj_id', 't']
ext = file.split(".")[-1]
path = path + file
try:
if ext == 'csv':
obj = pd.read_csv(path) # Convert .csv into DataFrane
index = [i for i in i_cols if i in obj.columns]
if index: # Set potential index columns
obj = obj.set_index(index)
elif ext == 'json':
# Convert .json with json decoder
with open(path, 'r') as fp:
obj = json.load(fp)
# Convert dict to AttrDict
if isinstance(obj, dict):
obj = AttrDict(obj)
# TODO Support grids & graphs
# elif ext == 'graphml':
# self[key] = nx.read_graphml(path)
else:
raise ValueError(f"File type '{ext}' not supported")
if display:
print('Successful')
return obj
except Exception as e:
print(f'Error: {e}')
# Prepare for loading
if exp_name is None:
# Choose latest modified experiment
exp_names = listdir(path)
paths = [join(path, d) for d in exp_names]
latest_exp = exp_names[paths.index(max(paths, key=getmtime))]
exp_name = latest_exp.rsplit('_', 1)[0]
exp_name = exp_name.replace(" ", "_")
if exp_id is None:
exp_id = _last_exp_id(exp_name, path)
if exp_id is None:
raise FileNotFoundError(f"No experiment found with "
f"name '{exp_name}' in path '{path}'")
path = f'{path}/{exp_name}_{exp_id}/'
if display:
print(f'Loading from directory {path}')
# Loading data
for file in listdir(path):
if 'variables_' in file:
if 'variables' not in self:
self['variables'] = DataDict()
ext = file.split(".")[-1]
key = file[:-(len(ext) + 1)].replace('variables_', '')
self['variables'][key] = load_file(path, file, display)
elif 'parameters_' in file:
ext = file.split(".")[-1]
key = file[:-(len(ext) + 1)].replace('parameters_', '')
if 'parameters' not in self:
self['parameters'] = DataDict()
self['parameters'][key] = load_file(path, file, display)
else:
ext = file.split(".")[-1]
key = file[:-(len(ext) + 1)]
self[key] = load_file(path, file, display)
return self
[docs] @classmethod
def load(cls, exp_name=None, exp_id=None, path='ap_output', display=True):
""" Reads data from directory `{path}/{exp_name}_{exp_id}/`.
Arguments:
exp_name (str, optional): Experiment name.
If none is passed, the most recent experiment is chosen.
exp_id (int, optional): Id number of the experiment.
If none is passed, the highest available id used.
path (str, optional): Target directory (default 'ap_output').
display (bool, optional): Display loading progress (default True).
Returns:
DataDict: The loaded data from the chosen experiment.
"""
return cls()._load(exp_name, exp_id, path, display)