Source code for simply.scenario

import os
import json
import warnings
from typing import Sized, Iterable
from networkx.readwrite import json_graph
import pandas as pd
import numpy as np
import random
import matplotlib
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor, as_completed

import simply.config as cfg
from simply import actor, market_maker
from simply import power_network
# from simply.battery import Battery
from simply.util import get_all_data, timeit
from simply.market_maker import MarketMaker
from simply.actor import Actor
from simply.market import Market
from simply.util import run_obj_method
from simply.defaults import MARKETID

try:
    matplotlib.use('TkAgg')
except ImportError as e:
    print(e)
debug_actor = None  # 'residential_3'


[docs]class Environment: """Representation of the environment which is visible to all actors. Decouples scenario information from actors. :param iterable buy_prices: iterable of prices the market maker would buy energy for :param int steps_per_hour: amount of simulation steps per hour :param function add_actor_to_scenario: Function which adds the actor to py:attr:`~simply.scenario.actors` :param dict kwargs: kwargs for MarketMaker generation Members: time_step : int current time_step of the environment steps_per_hour : int amount of simulation steps per hour add_actor_to_scenario : function Function which adds the actor to py:attr:`~simply.scenario.actors` time_range : pd.DatetimeIndex / int index that is used for all outputs and data indexing; if it is not provided an int range is used based on the config get_grid_fee : method getter function of grid_fee of the Market market_maker : py:class:`~simply.market_maker.MarketMaker` market_maker in this environment (deprecated) market_makers : list(py:class:`~simply.market_maker.MarketMaker`) list of market_maker objects of this environment """ def __init__(self, steps_per_hour, add_actor_to_scenario, time_range=None, **kwargs): self.time_step = cfg.config.start self.steps_per_hour = steps_per_hour if steps_per_hour is not None else cfg.config.ts_per_hour if time_range is None: self.time_range = range(cfg.config.start + cfg.config.nb_ts + 1) elif isinstance(time_range, pd.DatetimeIndex): # Check for correct datetime frequency if time_range.freq is None: time_range.freq = pd.infer_freq(time_range) print(f"Found date time index starting at {time_range[0]} " f"with freq {time_range.freq}") if time_range.freq != "{}T".format(60 / self.steps_per_hour): warnings.warn(f"Time Index of data frequency {str(time_range.freq)} " f"does not match the configured steps_per_hour: {self.steps_per_hour}") self.time_range = time_range else: self.time_range = time_range self.add_actor_to_scenario = add_actor_to_scenario # Get grid fee method of market to make grid fees accessible for actors. Will be overwritten # when market is added to scenario self.get_grid_fee = None # is instance of Market().get_grid_fee self.get_grid_fee_dict = {} self.market_makers = {}
[docs]def is_scenario_participant(obj): if isinstance(obj, Actor): return True if isinstance(obj, MarketMaker): return True return False
[docs]class Scenario: """ Representation of the world state: who is present (actors) and how everything is connected (power_network). RNG (random number generator) seed is preserved so results can be reproduced. """ def __init__(self, network, map_actors=None, buy_prices: np.array = None, rng_seed=None, steps_per_hour=None, time_range=None, **kwargs): self.rng_seed = rng_seed if rng_seed is not None else random.getrandbits(32) random.seed(self.rng_seed) self._market = None self.market_dict = {} self.power_network: power_network.PowerNetwork = network self.market_participants = list() # maps node ids to actors if map_actors is None: map_actors = {} self.map_actors: dict = map_actors self.kwargs = kwargs self.environment = Environment(steps_per_hour, self.add_participant, time_range, **kwargs) if buy_prices is None: buy_prices = np.array(()) else: buy_prices = np.array(buy_prices) self.add_market_maker(buy_prices, **kwargs) if "market" in kwargs.keys(): self.set_market(kwargs["market"])
[docs] def add_market_maker(self, buy_prices: Sized, **kwargs): if len(buy_prices) == 0: warnings.warn("Environment was created without a market maker since no buy_prices, " "were provided.") else: # Create the Market maker. Since the environment is passed the MarketMaker automatically # adds itself to the environment and also to the scenario participants MarketMaker(buy_prices=buy_prices, environment=self.environment, **kwargs)
[docs] def get_market(self): return self._market
[docs] def set_market(self, market): assert isinstance(market, Market), "Scenario.market can only be changed to type 'Market'" self._market = market self._market.t_step = self.environment.time_step self.environment.get_grid_fee = self._market.get_grid_fee
# creating a property object. This way changing markets also leads to changes in grid fee market = property(get_market, set_market)
[docs] def add_participants(self, participants: Iterable, map_actors=None, add_to_network=False): """Add participants to the scenario. If added to the network nodes, this is either done using the provided mapping dictionary or randomly. :param participants: participants to be added :type participants: simply.actor.Actor or simply.scenario.MarketMaker :param map_actors: dictionary with actor_ids as key and node_id as value :param add_to_network: should the actors be added to the power network :type add_to_network: bool :return: """ actors = list(filter(lambda x: isinstance(x, Actor), participants)) if map_actors is None: if add_to_network: # Add the only actors randomly to the power network map_actors = self.power_network.add_actors_random(actors) else: map_actors = {} else: # Make sure there are as many unique actor_ids as actors actor_ids = [actor_.id for actor_ in actors] assert len(actors) == len(set(actor_ids)) # Make sure every actor is found exactly once in map_actors and map_actors does not # contain not used actors assert set(actor_ids) == set(map_actors.keys()) if add_to_network: self.power_network.add_actors_map(map_actors) # Only update the node mapping of the provided actors self.map_actors.update(map_actors) for participant in participants: self._add_participant(participant) mm_list = [x for x in self.market_participants if isinstance(x, MarketMaker)] if mm_list != 0: print(f" + Added {len(mm_list)} MarketMakers to the Scenario.") print(f" + Added {len(actors)} Actors to the Scenario.")
[docs] def add_participant(self, participant, map_node=None, add_to_network=False): self._add_participant(participant) map_actors = {} if map_node is None: if add_to_network: map_actors = self.power_network.add_actors_random([participant]) else: map_actors = {participant.id: map_node} if add_to_network: _ = self.power_network.add_actors_map(map_actors) self.map_actors.update(map_actors)
def _add_participant(self, participant): assert is_scenario_participant(participant) if participant not in self.market_participants: if isinstance(participant, MarketMaker): self.environment.market_makers[participant.id] = participant self.market_participants.append(participant) else: warnings.warn(f"Participant {participant} is already part of the scenario, and was " f"not added again.") participant.environment = self.environment participant.create_prediction() @timeit def create_strategies(self, max_workers=None, update_step=1): # only actors create strategies (in parallel) actors = [p for p in self.market_participants if isinstance(p, Actor)] if not actors: return if self.environment.time_step % update_step != 0: for a in actors: a.shift_market_schedule() return if max_workers is None: max_workers = os.cpu_count() or 1 # Parallel: create schedule per actor and update object in main process # - cbc is running as external program which is why ThreadPoolExecutor is sufficient # (the use of ProcessPoolExecutor even leads to longer execution time due to pickling # overhead) # if process pool is used the values have to be updated due to separate memory process_execution = False with ThreadPoolExecutor(max_workers=max_workers) as ex: futs = {ex.submit(run_obj_method, a, "get_market_schedule"): a for a in actors} if process_execution: raise NotImplementedError for fut in as_completed(futs): actor = futs[fut] market_schedule = fut.result() # writing back the result due to mutability # (only necessary for Process Execution) # TODO check for missing updates other than market_schedule actor.market_schedule = market_schedule @timeit def create_strategies_sequential(self, update_step=1): # sequential execution of market_schedule creation actors = [p for p in self.market_participants if isinstance(p, Actor)] if self.environment.time_step % update_step != 0: for a in actors: a.shift_market_schedule() return else: for a in actors: a.get_market_schedule()
[docs] def add_market(self, market): self.market = market self.sync_market_time()
[docs] def add_to_market_dict(self, market): assert isinstance(market, Market), "Only Instances of class 'Market' can be added to Scenario.market_dict" if market.name in self.market_dict.keys(): warnings.warn(f"Market named {market.name} already exists.") self.market_dict[market.name] = market self.market_dict[market.name].t_step = self.environment.time_step self.market_dict[market.name].t_step = self.environment.time_range[self.environment.time_step] self.market_dict[market.name].step = self.environment.time_step self.environment.get_grid_fee_dict[market.name] = self.market_dict[market.name].get_grid_fee
[docs] def sync_market_time(self): for market in self.market_dict.values(): market.t_step = self.environment.time_range[self.environment.time_step] market.step = self.environment.time_step
@timeit def market_step(self): for participant in self.market_participants: orders = participant.generate_orders() for order in orders: if isinstance(participant, Actor): self.market_dict[participant.assigned_market].accept_order( order, callback=participant.receive_market_results) else: # MarketMaker for market in participant.assigned_market: self.market_dict[market].accept_order(order, callback=participant.receive_market_results) if debug_actor: print([order for order in orders if "MarketMaker" != order.actor_id]) for m in self.market_dict.values(): print(m.orders) for market in self.market_dict.values(): market.clear(reset=cfg.config.reset_market) if debug_actor: for mark in self.market_dict.values(): print([m for ma in mark.matches for m in ma if m["time"] == self.environment.time_step]) print([m for matches in mark.matches for m in matches if m["time"] == self.environment.time_step and (m["bid_actor"] == debug_actor or m["ask_actor"] == debug_actor)])
[docs] def next_time_step(self): for participant in self.market_participants: participant.prepare_next_time_step() if isinstance(self.environment.time_step, int): self.environment.time_step += 1 else: raise TypeError("Expect time_step to be of type: int") for participant in self.market_participants: participant.create_prediction() self.sync_market_time()
[docs] def from_config(self): pass
def __str__(self): return "Scenario(network: {}, actors: {}, map_actors: {})".format( self.power_network, self.market_participants, self.map_actors )
[docs] def to_dict(self): return { "rng_seed": self.rng_seed, "power_network": {self.power_network.name: self.power_network.to_dict()}, "actors": {a.id: a.to_dict() for a in self.market_participants}, "map_actors": self.map_actors, }
[docs] def save(self, dirpath, data_format): """ Save scenario files to directory dirpath: Path object """ # create target directory dirpath.mkdir(parents=True, exist_ok=True) # save meta information dirpath.joinpath('_meta.inf').write_text(json.dumps({"rng_seed": self.rng_seed}, indent=2)) # save power network dirpath.joinpath('network.json').write_text( json.dumps( {self.power_network.name: self.power_network.to_dict()}, indent=2, default=serialize_int64 ) ) # save actors if data_format == "csv": # Save data in separate csv file and all actors in one config file a_dict = {} m_dict = {} for participant in self.market_participants: if isinstance(participant, Actor): a_dict[participant.id] = participant.to_dict(external_data=True) if isinstance(participant, MarketMaker): m_dict[participant.id] = participant.to_dict(external_data=True) participant.save_csv(dirpath) dirpath.joinpath('actors.json').write_text( json.dumps({"actors": a_dict, "marketMakers": m_dict}, indent=2, default=serialize_int64)) else: # Save config and data per actor in a single file for participant in self.market_participants: dirpath.joinpath(f'actor_{participant.id}.{data_format}').write_text( json.dumps(participant.to_dict(external_data=False), indent=2, default=serialize_int64) ) # save map_actors dirpath.joinpath('map_actors.json').write_text(json.dumps(self.map_actors, indent=2)) self.power_network.to_image(dirpath)
[docs] def save_additional_results(self, dirpath): for a in list(filter(lambda x: isinstance(x, Actor), self.market_participants)): a.save_actor_result(dirpath / f"actor_{a.id}.csv") print("Additional actor results saved.")
[docs] def track_actor_schedule(self, dirpath, actor_id): for a in list(filter(lambda x: isinstance(x, Actor), self.market_participants)): if a.id == actor_id: a.save_actor_schedule(dirpath / f"actor_{a.id}_schedule.csv") print(f"Tracking actor {actor_id} schedule (saved)") return
[docs] def concat_actors_data(self): """ Create a list of all actor data DataFrames and concatenate them using multi-column keys :return: DataFrame with multi-column-index (actor-level, asset-level) """ actors = list(filter(lambda x: isinstance(x, Actor), self.market_participants)) data = [a.data for a in actors] return pd.concat(data, keys=[actor_.id for actor_ in actors], axis=1)
[docs] def plot_participant_data(self): """ Extracts asset data from all actors of the scenario and plots all time series per asset type as well as the aggregated sum per asset. """ actor_data = self.concat_actors_data() fig, ax = plt.subplots(3, sharex=True) ax[0].set_title("PV") ax[1].set_title("Load") ax[2].set_title("Sum") pv_df = get_all_data(actor_data, "pv") load_df = get_all_data(actor_data, "load") mask = load_df.apply(lambda x: abs(x) == 2 ** 63 - 1) pv_df.plot(ax=ax[0], legend=False) load_df[~mask].plot(ax=ax[1], legend=False) pv_df.sum(axis=1).plot(ax=ax[2]) load_df[~mask].sum(axis=1).plot(ax=ax[2]) ax[2].legend(["pv", "load"]) plt.show()
[docs] def plot_prices(self, market_name=MARKETID): for mm_name, mm in self.environment.market_makers.items(): fig, ax = plt.subplots(1, sharex=True) ax = [ax] ax[0].plot([p + cfg.config.default_grid_fee for p in mm.all_sell_prices]) ax[0].plot(mm.all_buy_prices) ax[0].set_title(mm_name) plt.show()
[docs] def reset(self): """ Reset the scenario after a simulation is run""" # Reset the time step for the environment and all markets self.environment.time_step = cfg.config.start for m in self.market_dict.values(): m.t_step = self.environment.time_step m.reset() # Remove previous participants self.market_participants = [] # Store the old market maker # adaptation for multi market makers market_makers = self.environment.market_makers for mm in market_makers.values(): mm.reset() self.add_participant(mm)
[docs]def serialize_int64(obj): if isinstance(obj, np.int64): return int(obj) raise TypeError("Type %s is not serializable" % type(obj))
[docs]def from_dict(scenario_dict): pn_name, pn_dict = scenario_dict["power_network"].popitem() assert len(scenario_dict["power_network"]) == 0, "Multiple power networks in scenario" network = json_graph.node_link_graph(pn_dict, directed=pn_dict.get("directed", False), multigraph=pn_dict.get("multigraph", False)) pn = power_network.PowerNetwork(pn_name, network) scen = Scenario(pn, scenario_dict["map_actors"], scenario_dict["rng_seed"]) for actor_id, ai in scenario_dict["actors"].items(): actor_ = actor.Actor(actor_id, pd.read_json(ai["df"]), ai["ls"], ai["ps"], ai["pm"]) scen.add_participant(actor_) return scen
[docs]def load(dirpath, data_format): """ Create scenario from files that were generated by Scenario.save() :param dirpath: Path object :param data_format: File ending of actor data e.g. `csv` """ # read meta info meta_text = dirpath.joinpath('_meta.inf').read_text() meta = json.loads(meta_text) rng_seed = meta.get("rng_seed", None) pn = power_network.create_power_network_from_config( next(dirpath.glob('network.*')), weight_factor=cfg.config.weight_factor) markets_json = dirpath / "markets.json" if markets_json.is_file(): existing_markets = [] with open(markets_json) as f: for mc in json.load(f): existing_markets.append(mc["market_name"]) else: warnings.warn(f"No markets file found. Using default market: {MARKETID}.") existing_markets = [MARKETID] assert len(existing_markets) != 0, "At least one market has to be defined" # read actors participants = [] time_range = None market_list = [] mm_markets = {} # per maket maker: aqssigned_market from datetime import datetime if data_format == "csv": actors_file = next(dirpath.glob("actors.*")) at = actors_file.read_text() actors_j = json.loads(at) for aj in actors_j["actors"].values(): if aj["assignedMarket"] in existing_markets: # market.json is not empty aj["df"] = pd.read_csv(dirpath / aj["csv"], parse_dates=['Time'], dayfirst=False, index_col='Time') assert datetime.strptime(cfg.config.start_date, "%Y-%m-%d") in aj["df"].index time_range = aj["df"].index participant = actor.Actor(**aj) participants.append(participant) for mj in actors_j["marketMakers"].values(): # filter the assigned markets with the available markets i.e. from market.json filtered_market = list(set(existing_markets) & set(mj["assignedMarket"])) if len(filtered_market) > 0: mj["assignedMarket"] = filtered_market m_df = pd.read_csv(dirpath / mj["csv"]) mj["buy_prices"] = list(m_df["all_buy_prices"]) if "all_sell_prices" in m_df.columns: mj["sell_prices"] = m_df["all_sell_prices"] participant = market_maker.MarketMaker(**mj) market_list += participant.assigned_market mm_markets[participant.id] = participant.assigned_market participants.append(participant) # check that every market has only one market maker assigned assert len(set(market_list)) == len(market_list), 'markets with more than one assigned market maker' for p in participants: # check for every Actor, that its assigned market_maker actually trades in its assigned market if isinstance(p, Actor): assert p.assigned_market in mm_markets[p.assigned_mm], ( f"{p.id} is assigned to {p.assigned_market} and {p.assigned_mm}. " f"But this market maker does not trade on this market.") else: actor_files = dirpath.glob(f"actor_*.{data_format}") for f in sorted(actor_files): at = f.read_text() aj = json.loads(at) if aj["id"] == market_maker.MARKETMAKERID or 'market_maker' in aj["id"]: participant = market_maker.MarketMaker(**aj) else: aj["df"] = pd.read_json(aj["df"]) aj["df"] = aj["df"].set_index("Time") aj["df"].index = pd.to_datetime(aj["df"].index) assert datetime.strptime(cfg.config.start_date, "%Y-%m-%d") in aj["df"].index time_range = aj["df"].index participant = actor.Actor(**aj) participants.append(participant) # Give actors knowledge of the cluster they belong to for aj in participants: if aj.id in pn.node_to_cluster: aj.cluster = pn.node_to_cluster[aj.id] # read map_actors map_actor_text = next(dirpath.glob('map_actors.*')).read_text() map_actors = json.loads(map_actor_text) # Take last actor data index as representative time step index scenario = Scenario(pn, map_actors, rng_seed=rng_seed, time_range=time_range) scenario.add_participants(participants) # save applied grid fee matrix results_path = cfg.config.results_path # if the market_results directory does not already exist, create it if not results_path.exists() and results_path: results_path.mkdir() results_path.joinpath('used_grid_fee_matrix.inf').write_text(json.dumps(pn.grid_fee_matrix)) return scenario
[docs]def create_random(num_nodes, num_actors, weight_factor, nb_ts=100, horizon=24): # Create random nodes pn = power_network.create_random(num_nodes) # Update the shortest paths and the grid fee matrix pn.update_shortest_paths() pn.generate_grid_fee_matrix(weight_factor) mm_buy_prices = np.random.random(nb_ts+horizon) scenario = Scenario(pn, None, buy_prices=mm_buy_prices) actors = [actor.create_random("H" + str(i), nb_ts=nb_ts, horizon=horizon) for i in range(num_actors)] # Quick fix set Timestemps range scenario.environment.time_range = actors[0].data.index # Add actor nodes at random position (leaf node) in the network # One network node can contain several actors (using random.choices method) scenario.map_actors = pn.add_actors_random(actors) scenario.add_participants(actors) return scenario
[docs]def create_random2(num_nodes, num_actors, nb_ts=100, horizon=24): assert num_actors < num_nodes # num_actors has to be much smaller than num_nodes # Create random nodes pn = power_network.create_random(num_nodes) # Create random actors actors = [actor.create_random("H" + str(i), nb_ts=nb_ts, horizon=horizon) for i in range(num_actors)] # Add actor nodes at random position (leaf node) in the network # One selected network node (using random.sample method), directly represents a single actor # No nodes and edges are added to the network actor_nodes = random.sample(pn.leaf_nodes, num_actors) map_actors = {actor.id: node_id for actor, node_id in zip(actors, actor_nodes)} mm_buy_prices = np.random.random(nb_ts+horizon) scenario = Scenario(pn, map_actors, mm_buy_prices) scenario.add_participants(actors) return scenario
[docs]def create_scenario_from_csv(dirpath, num_nodes, num_actors, weight_factor, ts_hour=4, nb_ts=None, horizon=24): """ Load csv files from path and randomly select num_actors to be randomly :param dirpath: Path object :param num_nodes: number of nodes in the network :param num_actors: number of actors in the network :param weight_factor: weight factor used to derive grid fees :param ts_hour: number of time slot of equal length within one hour :param nb_ts: number of time slots to be generated :param horizon: number of time slots to look into future to make the prediction for actor strategy """ # Create random nodes in the power network pn = power_network.create_random(num_nodes) # Read all filenames from given directory filenames = dirpath.glob("*.csv") # Choose a random sample of files to read filenames = random.sample(list(filenames), num_actors) # Assign csv file to actor and save dictionary household_type = {} # create initial list of actors actors = [] # iterate over list of files to be read to update actors for i, filename in enumerate(filenames): # save actor_id and data description in list household_type.update({i: filename.stem}) print('actor_id: {} - household: {}'.format(i, household_type[i])) # read file a = actor.create_from_csv("H_" + str(i), asset_dict={ "load": {"csv": filename, "col_index": 1}, "pv": {} }, start_date="2016-01-01", nb_ts=nb_ts, horizon=horizon, ts_hour=ts_hour) actors.append(a) # Add actor nodes at random position (leaf node) in the network # One network node can contain several actors (using random.choices method) map_actors = pn.add_actors_random(actors) # Update the shortest paths and the grid fee matrix pn.update_shortest_paths() pn.generate_grid_fee_matrix(weight_factor) scenario = Scenario(pn, map_actors, steps_per_hour=ts_hour) scenario.add_participants(actors) return