Source code for simply.power_network

import inspect
import json
import random
from collections import deque
from pathlib import Path

import matplotlib.pyplot as plt
import networkx as nx
from networkx.readwrite import json_graph

import simply.config as cfg


class PowerNetwork:
    """
    Representation of energy grid and associated grid fees.
    """

    def __init__(self, name, network, weight_factor=None):
        """
        New network model.

        Stores the graph, calculates shortest paths (as the network is
        unlikely to change again) and generates the grid fee matrix between
        clusters of nodes connected by zero-weight edges.

        Note: edge weights are used as given; this constructor does not
        modify them.

        :param name: name of network
        :type name: string
        :param network: graph representation
        :type network: networkx graph
        :param weight_factor: scale graph edge weights to power transmission
            cost. Can be set in config: network->weight_factor. Default 1
        :type weight_factor: float
        """
        self.name = name
        # all leaf nodes are potential connection points for actors
        self.leaf_nodes = [n for n, d in network.degree() if d == 1]
        # holds shortest paths from each node to each other node
        self.short_paths = None
        # list of sets with node IDs, filled by generate_grid_fee_matrix
        self.clusters = []
        # reverse lookup: node ID -> cluster index
        self.node_to_cluster = {}
        # matrix with (scaled) weights between clusters
        self.grid_fee_matrix = []

        if weight_factor is None:
            weight_factor = cfg.parser.getfloat("network", "weight_factor", fallback=1)

        self.network = network
        self.update_shortest_paths()
        self.generate_grid_fee_matrix(weight_factor)

    def update_shortest_paths(self):
        """
        Recompute and cache the weighted shortest paths between all nodes.
        """
        self.short_paths = dict(nx.shortest_path(self.network, weight="weight"))

    def generate_grid_fee_matrix(self, weight_factor=1):
        """
        Cluster nodes by zero-weight edges and compute fees between clusters.

        Nodes joined by edges of weight 0 (or without a weight) end up in the
        same cluster. The fee between two clusters is the weight of the
        shortest path between one representative node of each, multiplied by
        ``weight_factor``. The diagonal holds ``cfg.config.local_grid_fee``.

        The search starts at the first node of the graph, so only nodes
        reachable from it are clustered. An empty graph yields an empty
        matrix.

        :param weight_factor: factor applied to accumulated path weights
        :type weight_factor: float
        """
        # Reset cluster list, lookup and fee matrix
        self.clusters = []
        self.node_to_cluster = {}
        self.grid_fee_matrix = []

        # BFS over clusters: start with the first node (if there is one)
        pending = deque(list(self.network.nodes)[:1])
        while pending:
            seed = pending.popleft()
            if seed in self.node_to_cluster:
                # Queued through a weighted edge, but meanwhile absorbed into
                # a cluster through a zero-weight edge (graphs with cycles).
                # Opening a new cluster here would duplicate the node.
                continue

            # start new cluster with this node
            cluster_index = len(self.clusters)
            members = {seed}
            self.clusters.append(members)
            self.node_to_cluster[seed] = cluster_index

            # BFS within the cluster along zero-weight edges
            frontier = deque([seed])
            while frontier:
                node = frontier.popleft()
                for _, neighbor, data in self.network.edges(node, data=True):
                    if neighbor in self.node_to_cluster:
                        # already visited
                        continue
                    if data.get("weight", 0) == 0:
                        # weight zero: part of this cluster
                        members.add(neighbor)
                        self.node_to_cluster[neighbor] = cluster_index
                        frontier.append(neighbor)
                    else:
                        # weighted edge: neighbor may seed a new cluster
                        pending.append(neighbor)

        # One representative node per cluster, indexed by cluster number
        root_nodes = [next(iter(members)) for members in self.clusters]
        num_root_nodes = len(root_nodes)
        self.grid_fee_matrix = [[0] * num_root_nodes for _ in range(num_root_nodes)]

        # Matrix is symmetric: compute the lower triangle and mirror it
        for i, n1 in enumerate(root_nodes):
            self.grid_fee_matrix[i][i] = cfg.config.local_grid_fee
            for j in range(i):
                w = self.get_path_weight(n1, root_nodes[j]) * weight_factor
                self.grid_fee_matrix[i][j] = w
                self.grid_fee_matrix[j][i] = w
        print(f"Generated grid fee matrix: {self.grid_fee_matrix}")

    def to_image(self, dirpath=Path("./")):
        """
        Plot the network and save it as ``<name>.png`` in ``dirpath``.

        :param dirpath: target directory
        :type dirpath: pathlib.Path or string
        """
        fig = self.plot(False)
        # Path() lets callers pass a plain string as well
        fig.savefig(Path(dirpath) / f"{self.name}.png")
        plt.close(fig)

    def plot(self, show=True):
        """
        Draw the network into a new matplotlib figure.

        :param show: display the figure via ``plt.show()``
        :type show: bool
        :return: the created figure
        """
        fig = plt.figure()
        # TODO: improved plot with or without Graphvis
        # from simply.plotting import plot_hierarchical
        # plot_hierarchical(self.network)
        try:
            plot_topology_graphvis(self.network)
        except (ImportError, FileNotFoundError):
            # In case dot/Graphviz is not installed
            nx.draw(self.network, with_labels=True, font_weight="bold", node_size=50)
        if show:
            plt.show()
        return fig

    def to_dict(self):
        """
        Return the network as node-link data.
        """
        return json_graph.node_link_data(self.network)

    def to_json(self):
        """
        Write the network as node-link JSON to ``<name>.json``.

        The file name is relative to the current working directory.
        """
        filename = self.name + ".json"
        net_json = json_graph.node_link_data(self.network)
        # context manager so the file is flushed and closed deterministically
        with open(filename, "w") as json_file:
            json.dump(net_json, json_file, indent=2)

    def add_actors_random(self, actors):
        """
        Connect every actor to a randomly chosen leaf node.

        Adds a zero-weight edge per actor and sets ``grid_id`` on each actor.
        Leaf nodes are drawn with replacement, so several actors can share
        one. Cached shortest paths and the grid fee matrix are not refreshed
        here.

        :param actors: actors to connect; each needs an ``id`` attribute
        :return: mapping of actor ID to connection node ID
        :rtype: dict
        """
        actor_ids = [a.id for a in actors]
        random.shuffle(actor_ids)
        connections = random.choices(self.leaf_nodes, k=len(actors))
        actor_to_node = {}
        # pair up from the back of both lists
        for actor_id, node in zip(reversed(actor_ids), reversed(connections)):
            actor_to_node[actor_id] = node
            self.network.add_edge(node, actor_id, weight=0)

        # The Actor knows its grid connection node ID
        for a in actors:
            a.grid_id = actor_to_node[a.id]

        return actor_to_node

    def add_actors_map(self, map):
        """
        Connect actors to nodes according to a given mapping.

        Adds a zero-weight edge for every entry. Cached shortest paths and
        the grid fee matrix are not refreshed here.

        :param map: mapping of actor ID to connection node ID
        :type map: dict
        :return: the mapping that was passed in
        :rtype: dict
        """
        for actor_id, node in map.items():
            self.network.add_edge(node, actor_id, weight=0)
        return map

    def get_path_weight(self, u, v):
        """
        Return the sum of edge weights on the cached shortest path u -> v.

        Edges without a weight count as 0. Computes the shortest paths first
        if none are cached yet.

        :param u: start node ID
        :param v: target node ID
        :return: accumulated weight, 0 if u equals v
        :raises KeyError: if the cache holds no path between u and v
        """
        if u == v:
            return 0
        if self.short_paths is None:
            self.update_shortest_paths()
        path = self.short_paths[u][v]
        # walk consecutive node pairs along the path
        return sum(
            self.network[a][b].get("weight", 0) for a, b in zip(path, path[1:])
        )

    def get_cluster_weights(self, c1, c2):
        """
        Return the path weights between all nodes of two clusters.

        :param c1: node IDs of the first cluster
        :param c2: node IDs of the second cluster
        :return: dict with nodes from c1 -> nodes from c2 -> weight
        :rtype: dict
        """
        weights = {u: {} for u in c1}
        for u in c1:
            for v in c2:
                weights[u][v] = self.get_path_weight(u, v)
        return weights
def create_random(nodes):
    """
    Create a PowerNetwork on a random tree with random edge weights.

    :param nodes: number of nodes of the tree
    :type nodes: int
    :return: network named "random"
    :rtype: PowerNetwork
    """
    # nx.random_tree is not available in recent NetworkX releases, where
    # random_labeled_tree takes its place. Prefer the old name when it exists
    # so results on older installations stay the same.
    tree_factory = getattr(nx, "random_tree", None) or nx.random_labeled_tree
    nw = tree_factory(nodes)

    # Add random weights in [0, 1] with 0.1 resolution
    for u, v in nw.edges:
        nw[u][v]["weight"] = random.randint(0, 10) * 0.1
    return PowerNetwork("random", nw)
def create_random2(nodes):
    """
    Create a PowerNetwork on a random tree whose leaves get extra children.

    Every leaf of the initial tree receives between 2 and 5 additional
    neighbor nodes, numbered upwards from ``nodes + 1``. All edges then get a
    random weight.

    :param nodes: number of nodes of the initial tree
    :type nodes: int
    :return: network named "random"
    :rtype: PowerNetwork
    """
    # nx.random_tree is not available in recent NetworkX releases, where
    # random_labeled_tree takes its place. Prefer the old name when it exists
    # so results on older installations stay the same.
    tree_factory = getattr(nx, "random_tree", None) or nx.random_labeled_tree
    nw = tree_factory(nodes)
    # TODO check also: nx.balanced_tree(branches, height)

    # Leaves are collected before edges are added, so new nodes are not extended
    leaf_nodes = sorted(n for n, d in nw.degree() if d == 1)
    offset = 0
    for leaf in leaf_nodes:
        for _ in range(random.randint(2, 5)):
            offset += 1
            nw.add_edge(leaf, nodes + offset)

    # Add random weights in [0, 1] with 0.1 resolution
    for u, v in nw.edges:
        nw[u][v]["weight"] = random.randint(0, 10) * 0.1
    return PowerNetwork("random", nw)
def load_network():
    """
    Create a PowerNetwork on a random lobster graph with random edge weights.

    Despite its name, nothing is read from disk: the graph comes from
    ``nx.random_lobster(3, 0.5, 0.2)``.

    :return: network named "random"
    :rtype: PowerNetwork
    """
    lobster = nx.random_lobster(3, 0.5, 0.2)

    # Each edge gets one of the weights 0, 0.1, ..., 1
    for edge in lobster.edges:
        source, target = edge[0], edge[1]
        tenths = random.randint(0, 10)
        lobster[source][target]["weight"] = tenths * 0.1

    return PowerNetwork("random", lobster)
def create_power_network_from_config(network_path, weight_factor=1):
    """
    Load a network from a node-link JSON file and wrap it in a PowerNetwork.

    The file has to contain a JSON object. Its first key is used as network
    name, the matching value as node-link data. The edge list may be stored
    under either "edges" or "links". The call to ``node_link_graph`` is
    adapted to the parameters the installed NetworkX version offers.

    :param network_path: path of the JSON file
    :type network_path: string or path-like
    :param weight_factor: passed on to PowerNetwork
    :type weight_factor: float
    :return: network built from the file
    :rtype: PowerNetwork
    :raises ValueError: if the file is not a non-empty JSON object holding
        node-link data with an "edges" or "links" entry
    :raises TypeError: if the signature of ``node_link_graph`` is unknown
    """
    # JSON text is UTF-8; do not depend on the locale's default encoding
    with open(network_path, encoding="utf-8") as user_file:
        document = json.load(user_file)

    # Fail with a clear message instead of an IndexError/AttributeError below
    if not isinstance(document, dict) or not document:
        raise ValueError(
            "Network file must contain a non-empty JSON object mapping the "
            "network name to its node-link data"
        )
    network_name, network_json = next(iter(document.items()))
    if not isinstance(network_json, dict):
        raise ValueError(
            f"Node-link data of network '{network_name}' must be a JSON object"
        )

    if "edges" in network_json:
        edge_key = "edges"
    elif "links" in network_json:
        edge_key = "links"
    else:
        raise ValueError(
            "Unsupported node-link JSON format. Expected 'edges' or 'links', "
            f"got keys: {list(network_json.keys())}"
        )

    # The keyword naming the edge list differs between NetworkX versions
    sig = inspect.signature(json_graph.node_link_graph)
    params = sig.parameters

    common_kwargs = {
        "directed": network_json.get("directed", False),
        "multigraph": network_json.get("multigraph", False),
    }

    if "edges" in params:
        # newer NetworkX
        network = json_graph.node_link_graph(
            network_json,
            edges=edge_key,
            **common_kwargs,
        )
    elif "link" in params:
        # intermediate NetworkX 3.x
        network = json_graph.node_link_graph(
            network_json,
            link=edge_key,
            **common_kwargs,
        )
    elif "attrs" in params:
        # older NetworkX 2.x
        network = json_graph.node_link_graph(
            network_json,
            attrs={
                "source": "source",
                "target": "target",
                "name": "id",
                "key": "key",
                "link": edge_key,
            },
            **common_kwargs,
        )
    else:
        raise TypeError(
            f"Unsupported networkx.node_link_graph signature: {sig}"
        )

    return PowerNetwork(network_name, network, weight_factor)
def remove_weights_from_leef_nodes(network):
    """
    Set the weight of every edge attached to a leaf node to 0.

    A leaf is a node of degree 1. The graph is modified in place and nothing
    is returned.

    :param network: graph to modify
    :type network: networkx graph
    """
    # Collect the leaves up front, then zero the edge to each leaf's parent
    leaves = [node for node, degree in network.degree() if degree == 1]
    for leaf in leaves:
        for _source, _target, attributes in network.edges(leaf, data=True):
            attributes["weight"] = 0
def plot_topology_graphvis(G):
    """
    Draw a graph using the Graphviz "dot" layout.

    The layout helper is imported inside the function, so a missing optional
    dependency only surfaces when this function is called.

    :param G: graph to draw
    :type G: networkx graph
    """
    from networkx.drawing.nx_pydot import graphviz_layout

    positions = graphviz_layout(G, prog="dot")
    nx.draw(
        G,
        positions,
        with_labels=True,
        font_weight='bold',
        node_size=50,
    )