circuit_init.py 7.49 KiB
# @Author: Felix Kramer <kramer>
# @Date: 2021-05-08T20:34:30+02:00
# @Email: kramer@mpi-cbg.de
# @Project: go-with-the-flow
# @Last modified by: Felix Kramer
# @Last modified time: 2021-09-13T22:58:03+02:00
# @License: MIT
# standard types
import networkx as nx
import numpy as np
import pandas as pd
import sys
# custom embeddings/architectures
import kirchhoff.init_crystal as init_crystal
import kirchhoff.init_random as init_random
# custom output functions
import kirchhoff.draw_networkx as dx
def initialize_circuit_from_networkx(input_graph):
kirchhoff_graph=circuit()
kirchhoff_graph.default_init(input_graph)
return kirchhoff_graph
def initialize_circuit_from_crystal(crystal_type='default',periods=1):
kirchhoff_graph=circuit()
input_graph=init_crystal.init_graph_from_crystal(crystal_type,periods)
kirchhoff_graph.default_init(input_graph)
return kirchhoff_graph
def initialize_circuit_from_random(random_type='default',periods=10,sidelength=1):
kirchhoff_graph=circuit()
input_graph=init_random.init_graph_from_random(random_type,periods,sidelength)
kirchhoff_graph.default_init(input_graph)
return kirchhoff_graph
class circuit:
def __init__(self):
self.scales={
'conductance':1,
'flow':1,
'length':1
}
self.graph={
'source_mode':'',
'plexus_mode':'',
'threshold':0.001,
'num_sources':1
}
self.nodes=pd.DataFrame(
{
'source':[],
'potential':[],
'label':[],
}
)
self.edges=pd.DataFrame(
{
'conductivity':[],
'flow_rate':[],
}
)
self.set_graph_containers()
self.draw_weight_scaling=1.
def set_graph_containers(self):
self.G=nx.DiGraph()
self.H=nx.Graph()
self.H_C=[]
self.H_J=[]
self.list_graph_nodes=[]
self.list_graph_edges=[]
def default_init(self, input_graph):
self.G=nx.convert_node_labels_to_integers(input_graph, first_label=0, ordering='default')
self.initialize_circuit()
self.list_graph_nodes=list(self.G.nodes())
self.list_graph_edges=list(self.G.edges())
def initialize_circuit(self):
e=self.G.number_of_edges()
n=self.G.number_of_nodes()
init_val=['#269ab3',0,0,5]
init_attributes=['color','source','potential','conductivity']
for i,val in enumerate(init_val):
nx.set_node_attributes(self.G, val , name=init_attributes[i])
for k in self.nodes:
self.nodes[k]=np.zeros(n)
for k in self.edges:
self.edges[k]=np.ones(e)
self.set_network_attributes()
print('circuit(): initialized and ready for (some) action :)')
#get incidence atrix and its transpose
def get_incidence_matrices(self):
B=nx.incidence_matrix(self.G,nodelist=self.list_graph_nodes,edgelist=self.list_graph_edges,oriented=True).toarray()
BT=np.transpose(B)
return B,BT
# update network traits from dynamic data
def set_network_attributes(self):
#set potential node values
for i,n in enumerate(self.list_graph_nodes):
self.G.nodes[n]['potential']=self.nodes['potential'][i]
self.G.nodes[n]['label']=i
#set conductivity matrix
for j,e in enumerate(self.list_graph_edges):
self.G.edges[e]['conductivity']=self.edges['conductivity'][j]
self.G.edges[e]['label']=j
# clipp small edges & translate conductance into general edge weight
def clipp_graph(self):
#cut out edges which lie beneath a certain threshold value and export this clipped structure
self.set_network_attributes()
for e in self.list_graph_edges:
if self.G.edges[e]['conductivity'] > self.threshold:
self.H.add_edge(*e)
for k in self.G.edges[e].keys():
self.H.edges[e][k]=self.G.edges[e][k]
self.list_pruned_nodes=list(self.H.nodes())
self.list_pruned_edges=list(self.H.edges())
for n in list_pruned_nodes:
for k in self.G.nodes[n].keys():
self.H.nodes[n][k]=self.G.nodes[n][k]
self.H_J.append(self.G.nodes[n]['source'])
for e in list_pruned_edges:
self.H_C.append(self.H.edges[e]['conductivity'])
self.H_C=np.asarray(self.H_C)
self.H_J=np.asarray(self.H_J)
if len(list(self.H.nodes()))==0:
sys.exit('FAILED PRUNING')
def calc_root_incidence(self):
root=0
sink=0
for i,n in enumerate(self.list_graph_nodes):
if self.G.nodes[n]['source'] > 0:
root=n
if K.G.nodes[n]['source'] < 0:
sink=n
E_1=list(self.G.edges(root))
E_2=list(self.G.edges(sink))
E_ROOT=[]
E_SINK=[]
for e in E_1:
if e[0]!=root:
E_ROOT+=list(self.G.edges(e[0]))
else:
E_ROOT+=list(self.G.edges(e[1]))
for e in E_2:
if e[0]!=sink:
E_SINK+=list(self.G.edges(e[0]))
else:
E_SINK+=list(self.edges(e[1]))
return E_ROOT,E_SINK
# test consistency of conductancies & sources
def test_source_consistency(self):
self.set_network_attributes()
tolerance=0.000001
# check value consistency
sources=np.fromiter(nx.get_node_attributes(self.G, 'source').values(),float)
if np.sum(sources) > tolerance:
sys.exit('Error, input and ouput flows not balanced!')
else:
print('set_source_landscape(): '+self.graph['source_mode']+' is set and consistent :)')
def test_conductance_consistency(self):
self.set_network_attributes()
tolerance=0.000001
# check value consistency
conductivities=np.fromiter(nx.get_edge_attributes(self.G, 'conductivity').values(),float)
if len(np.where(conductivities <=0 )[0]) !=0:
sys.exit('Error, conductivities negaitve/zero!')
else:
print('set_plexus_landscape(): '+self.graph['plexus_mode']+' is set and consistent :)')
def get_pos(self):
pos_key='pos'
reset_layout=False
for j,n in enumerate(self.G.nodes()):
if pos_key not in self.G.nodes[n]:
reset_layout=True
if reset_layout:
print('set networkx.spring_layout()')
pos = nx.spring_layout(self.G)
else:
pos = nx.get_node_attributes(self.G,'pos')
return pos
def set_pos(self,pos_data={}):
pos_key='pos'
reset_layout=False
nodata=False
if len(pos_data.values())==0:
nodata=True
for j,n in enumerate(self.G.nodes()):
if pos_key not in self.G.nodes[n]:
reset_layout=True
if reset_layout and nodata:
print('set networkx.spring_layout()')
pos = nx.spring_layout(self.G)
nx.set_node_attributes(self.G, pos, 'pos')
else:
nx.set_node_attributes(self.G, pos_data, 'pos')
def set_scale_pars(self, new_parameters):
self.scales=new_parameters
def set_graph_pars(self, new_parameters):
self.graph=new_parameters
# output
def plot_circuit(self):
self.set_pos()
fig=dx.plot_networkx( self.G, edge_list=self.list_graph_edges, node_list=self.list_graph_nodes )
return fig