import warnings
import pandas as pd
from pathlib import Path
import csv
import simply.config as cfg
from simply.actor import Order
from simply.defaults import MARKETID
LARGE_ORDER_THRESHOLD = 2**32
MARKET_MAKER_THRESHOLD = 2**63-1
ASK = +1
BID = -1
[docs]class Market:
"""
Representation of a market. Collects orders, implements a matching strategy for clearing,
finalizes post-matching.
If a grid_fee_matrix parameter is not given, the default grid fee will be used.
This class provides a basic matching strategy which may be overridden.
"""
def __init__(self, network=None, grid_fee_matrix=None, time_step=None, name=None):
self.orders = pd.DataFrame(columns=Order._fields)
if isinstance(name, str):
if not name == MARKETID:
self.name = name
else:
raise ValueError("Explicit Market definition is not allowed to have the default Market ID.")
else:
self.name = MARKETID
warnings.warn(f'No Market name specified. Using default name {MARKETID}.')
self.trades = None
self.cleared_volume = {}
self.matches = []
self.t_step = time_step
self.step = cfg.config.start
self.actor_callback = {}
self.network = network
self.save_csv = cfg.config.save_csv
try:
self.csv_path = Path(cfg.config.results_path)
# if the market_results directory does not already exist, create it
if not self.csv_path.exists() and self.save_csv:
self.csv_path.mkdir()
except (FileNotFoundError, TypeError) as e:
raise FileNotFoundError(f"Market is unable to save_csv to path: "
f"{cfg.config.results_path}: {e}")
except AttributeError as e:
# A missing path is only problematic, if files should be saved
if self.save_csv:
raise FileNotFoundError(f"Market is unable to save_csv to path: "
f"{cfg.config.results_path}: {e}")
self.grid_fee_matrix = grid_fee_matrix
if grid_fee_matrix is None:
warnings.warn("Pay-As-Bid market was generated without a grid_fee_matrix "
"in its constructor. The market will use the grid fee from the "
f"configuration for all trades.\n Grid Fee = "
f"{cfg.config.default_grid_fee}")
if self.save_csv:
match_header = ["time", "bid_id", "ask_id", "bid_actor", "ask_actor", "bid_cluster",
"ask_cluster", "energy", "price", 'included_grid_fee', 'market_name']
self.create_csv('matches.csv', match_header)
self.create_csv('orders.csv', list(Order._fields)+["market_name"])
[docs] def get_bids(self):
# Get all open bids in market. Returns dataframe.
return self.orders[self.orders["type"] == -1]
[docs] def get_asks(self):
# Get all open asks in market. Returns dataframe.
return self.orders[self.orders["type"] == 1]
[docs] def print(self):
# Debug: print bids and asks to terminal.
print(self.get_bids())
print(self.get_asks())
[docs] def reset(self):
self.matches = []
self.trades = None
self.actor_callback = {}
[docs] def accept_order(self, order, order_id=None, callback=None):
"""
Handle new order.
Order must have same time step as market, type must be -1 or +1.
Energy is quantized according to the market's energy unit (round down).
Signature of callback function: matching time, sign for energy direction
(opposite of order type), matched energy, matching price.
:param order: Order (type, time, actor_id, energy, price)
:param callback: callback function (called when order is successfully matched)
:param order_id: (optional) define order ID of the order to be inserted, otherwise
consecutive numbers are used (if this leads to overriding indices, an IndexError is
raised)
:return:
"""
if order is None:
return
if order.time != self.t_step:
raise ValueError("Wrong order time ({}), market is at time {}".format(order.time,
self.t_step))
# Ignore Orders without energy volume
if order.energy == 0:
return
if order.price is None:
raise ValueError("Wrong order price ({})".format(order.price))
if order.type not in [-1, 1]:
raise ValueError("Wrong order type ({})".format(order.type))
# look up cluster
if order.cluster is None and self.network is not None:
cluster = self.network.node_to_cluster.get(order.actor_id)
if cfg.config.verbose and cluster is not None:
warnings.warn(f"Order has cluster None. Found actor_id {order.actor_id} in network,"
f" new cluster: {cluster}")
order = order._replace(cluster=cluster)
# make certain energy has step size of energy_unit
energy = (
(order.energy + cfg.config.EPS) // cfg.config.energy_unit) * cfg.config.energy_unit
# make certain enough energy is traded
if energy < cfg.config.energy_unit:
return
order = order._replace(energy=energy)
# If an order ID parameter is not set,
# - raise error if current consecuitve number does not equal the total number of orders
# - otherwise ignore index -> consecutive numbers are intact
# otherwise adopt the ID, while checking it is not already used
if order_id is None:
if len(self.orders) != 0 and len(self.orders) - 1 != self.orders.index.max():
raise IndexError("Previous order IDs were defined externally and reset when "
"inserting orders without predefined order_id.")
self.orders = pd.concat(
[self.orders, pd.DataFrame([order], dtype=object)],
ignore_index=True
)
else:
if order_id in self.orders.index:
raise ValueError("Order ID ({}) already exists".format(order_id))
new_order = pd.DataFrame([order], dtype=object, index=[order_id])
self.orders = pd.concat([self.orders, new_order], ignore_index=False)
self.actor_callback[order.actor_id] = callback
self.append_to_csv([order], 'orders.csv')
[docs] def clear(self, reset=True):
"""
Clear market. Match orders, call callbacks of matched orders, reset/tidy up dataframes.
:param reset: not retaining orders for next market cycle
:return: None
"""
# TODO match bids
matches = self.match(show=cfg.config.show_prints)
self.matches.append(matches)
self.cleared_volume[self.t_step] = sum([m["energy"] for m in matches])
print(f"Market '{self.name}' cleared for time {self.t_step}"
f" ({self.step}/{cfg.config.start + cfg.config.nb_ts - 1}):")
for match in matches:
bid_actor_callback = self.actor_callback[match["bid_actor"]]
ask_actor_callback = self.actor_callback[match["ask_actor"]]
energy = match["energy"]
price = match["price"]
if bid_actor_callback is not None:
bid_actor_callback(self.t_step, 1, energy, price)
if ask_actor_callback is not None:
ask_actor_callback(self.t_step, -1, energy, price-match["included_grid_fee"])
if reset:
# don't retain orders for next cycle
self.orders = pd.DataFrame(columns=Order._fields)
else:
# remove fully matched orders
self.orders = self.orders[self.orders.energy >= cfg.config.energy_unit]
[docs] def match(self, show=False):
"""
Example matching algorithm: pay as bid, first come, first served.
Return structure: each match is a dict and has the following items:
time: current market time
bid_id: ID of bid order
ask_id: ID of ask order
bid_actor: ID of bidding actor
ask_actor: ID of asking actor
bid_cluster: cluster of bidding actor
ask_cluster: cluster of asking actor
energy: matched energy (multiple of market's energy unit)
price: matching price
This is meant to be replaced in subclasses.
:param show: show or print plots (mainly for debugging)
:return: list of dictionaries with matches
"""
# order by price (while previously original ordering is reversed for equal prices)
# i.e. higher probability of matching for higher ask prices or lower bid prices
bids = self.get_bids().iloc[::-1].sort_values(["price"], ascending=False)
asks = self.get_asks().iloc[::-1].sort_values(["price"], ascending=True)
matches = []
for ask_id, ask in asks.iterrows():
for bid_id, bid in bids.iterrows():
if ask.actor_id == bid.actor_id:
continue
if ask.energy >= cfg.config.energy_unit and bid.energy >= cfg.config.energy_unit \
and ask.price + cfg.config.default_grid_fee <= bid.price:
# match ask and bid
energy = min(ask.energy, bid.energy)
ask.energy -= energy
bid.energy -= energy
self.orders.loc[ask_id] = ask
self.orders.loc[bid_id] = bid
matches.append({
"time": self.t_step,
"bid_id": bid_id,
"ask_id": ask_id,
"bid_actor": bid.actor_id,
"ask_actor": ask.actor_id,
"bid_cluster": bid.cluster,
"ask_cluster": ask.cluster,
"energy": energy,
"price": bid.price
})
if show:
print(matches)
output = self.add_grid_fee_info(matches)
self.append_to_csv(output, 'matches.csv')
return matches
[docs] def append_to_csv(self, data, filename):
"""
append_to_csv() appends the given data to the specified CSV file. Extends data by market_name.
:param data: the data to be appended to the file, as a Pandas DataFrame
:param filename: the name of the file to which data should be appended
:return: None
"""
if self.save_csv:
saved_data = pd.DataFrame(data, dtype=object)
saved_data['market_name'] = self.name
saved_data.to_csv(self.csv_path / filename, mode='a', index=False, header=False)
[docs] def create_csv(self, filename, headers):
"""
create_csv() creates a new CSV file with the given filename at the given path with the given
headers.
:param filename: the name of the file to be created
:param headers: a list of strings representing the headers to be written to the file
:return: None
"""
with open(self.csv_path / filename, 'w') as f:
writer = csv.writer(f)
writer.writerow(headers)
[docs] def get_grid_fee(self, match=None, bid_cluster=None, ask_cluster=None):
"""
Returns the grid fee associated with the bid and ask clusters of a given match.
:param match: a dictionary representing a match, with keys 'bid_cluster' and 'ask_cluster'
:param bid_cluster: cluster id of ask
:param ask_cluster: cluster id of bid
:return: the grid fee associated with the given bid and ask clusters
"""
if match or match is not None:
if bid_cluster or ask_cluster:
warnings.warn('Either pass match OR ("bid_cluster" and "ask_cluster"),'
'otherwise only match information is considered')
# if match is given, data from the match is used. In other cases bid
bid_cluster = match['bid_cluster']
ask_cluster = match['ask_cluster']
if not self.grid_fee_matrix:
return cfg.config.default_grid_fee
else:
if bid_cluster is None or ask_cluster is None:
if cfg.config.verbose:
warnings.warn("At least one cluster is 'None', returning default grid fee.")
# default grid fee
return cfg.config.default_grid_fee
else:
return self.grid_fee_matrix[bid_cluster][ask_cluster]
[docs] def add_grid_fee_info(self, matches):
"""
Takes in a list of matches and returns the same list with an additional field 'grid_fee'
added to each match dictionary.
:param matches: a list of dictionaries representing matches, with keys 'bid_cluster' and
'ask_cluster'
:return: the input list of matches with the additional field 'grid_fee'
"""
output = []
for match in matches:
match['included_grid_fee'] = self.get_grid_fee(match)
output.append(match)
return output
[docs] def apply_grid_fee(self, ask, bid):
"""
Updates the given ask price by adding the grid fee associated with the given bid and ask
clusters.
:param ask: the ask price to be updated
:param bid: the bid used to determine the grid fee to be applied
:return: None
"""
try:
ask.price += self.grid_fee_matrix[bid.cluster][ask.cluster]
except TypeError:
# if an actor has none as cluster, e.g. the market maker, a TypeError will be thrown.
# use default grid fee in this case.
ask.price += cfg.config.default_grid_fee
[docs]def filter_orders(asks, bids):
large_asks_mask = asks.energy >= LARGE_ORDER_THRESHOLD
large_asks = asks[large_asks_mask]
asks_mm = large_asks[large_asks.energy >= MARKET_MAKER_THRESHOLD]
if len(asks_mm) > 1:
print(f"WARNING! More than one ask market maker:{len(asks_mm)}")
asks = asks[~large_asks_mask]
if len(large_asks) > len(asks_mm):
print("WARNING! {} large asks filtered".format(len(large_asks) - len(asks_mm)))
large_bids_mask = bids.energy >= LARGE_ORDER_THRESHOLD
large_bids = bids[large_bids_mask]
bids_mm = large_bids[large_bids.energy >= MARKET_MAKER_THRESHOLD]
if len(bids_mm) > 1:
print(f"WARNING! More than one bid market maker: {len(bids_mm)}")
bids = bids[~large_bids_mask]
if len(large_bids) > len(bids_mm):
print("WARNING! {} large bids filtered".format(len(large_bids) - len(bids_mm)))
return asks, asks_mm, bids, bids_mm, large_asks, large_bids