Skip to content

Commit 43b97ff

Browse files
committed
adding the read_state functionality and recreating the demo
1 parent 31364b8 commit 43b97ff

File tree

7 files changed

+712
-192
lines changed

7 files changed

+712
-192
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
__pycache__/
2+
.vscode/

.ipynb_checkpoints/demo-checkpoint.ipynb

+344-74
Large diffs are not rendered by default.

PandiNetwork.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ def get_vertices_schema(self):
3535
)
3636
return vertices_schema
3737

38+
def get_edges_schema(self):
39+
edges_schema = T.StructType(
40+
[
41+
T.StructField(name = "src", dataType=T.IntegerType(), nullable = False),
42+
T.StructField(name = "dst", dataType=T.IntegerType(), nullable = False),
43+
]
44+
)
45+
return edges_schema
46+
47+
3848
def toVertices(self, sdv):
3949
return sdv.rdd.toDF(['id', 'score'])
4050

@@ -73,11 +83,12 @@ def interact(self):
7383
self.edges.show()
7484
self.vertices.show()
7585
edges = self.edges.rdd.map(lambda x: (x.src, x.dst)).collect()
76-
vertices = self.vertices.rdd.map(lambda x: (x.id, x.score)).collect()
86+
keys,values = tuple(zip(*self.vertices.rdd.map(lambda x: (x.id, x.score)).collect()))
87+
print(keys, values, edges)
7788
G = nx.Graph()
78-
G.add_nodes_from([key for key,val in vertices])
89+
G.add_nodes_from(keys)
7990
G.add_edges_from(edges)
8091
plt.figure(figsize = (15,10))
81-
nx.draw(G, pos = nx.spring_layout(G,scale=10), node_size = 800, cmap=plt.get_cmap('viridis'), node_color=[val for key,val in vertices], with_labels=True, font_color='white')
92+
nx.draw(G, pos = nx.spring_layout(G,scale=10), node_size = 800, cmap=plt.get_cmap('brg'), node_color=values, with_labels=True, font_color='white')
8293
plt.show()
8394

PandiSim.py

+20-4
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
11

2+
from pyspark import StorageLevel
3+
24
import sys
35
import os
46
sys.path.insert(1, './utils')
57

68
import PandiSimConfigInjection as config
9+
import SparkDependencyInjection as sdi
710

811

912

1013

11-
class PandiSim(config.PandiSimConfigInjection):
14+
class PandiSim(sdi.SparkDependencyInjection, config.PandiSimConfigInjection):
1215

13-
def __init__(self, network, epi_model, scoring_model, edge_model, params = {'take_screenshots':False}):
16+
def __init__(self, network, epi_model, scoring_model, edge_model, params = {'take_screenshots':False, 'destroy':False}):
1417
self.network = network
1518
self.epi_model = epi_model
1619
self.scoring_model = scoring_model
1720
self.edge_model = edge_model
1821
self.params = params
19-
self.params['t_end'] = epi_model.params['t_end']
22+
# self.params['t_end'] = epi_model.params['t_end']
2023

2124
def move(self):
2225
sotw = self.epi_model.next_sotw()[1]
@@ -31,6 +34,8 @@ def run(self, perc = 0.1):
3134
stopAt = self._perc_to_steps(perc)
3235

3336
for _ in range(stopAt):
37+
if self.epi_model.step >= 2 and self.params['destroy']:
38+
self.read_state()
3439
self.move()
3540
self.take_screenshot()
3641

@@ -46,4 +51,15 @@ def take_screenshot(self):
4651
.option('header', False).mode('overwrite').save(vertices_fil)
4752
self.network.edges\
4853
.write.format("csv").option("delimiter", ',')\
49-
.option('header', False).mode('overwrite').save(edges_fil)
54+
.option('header', False).mode('overwrite').save(edges_fil)
55+
56+
def read_state(self):
57+
hdfs = "hdfs://namenode:9000/"
58+
edges_fil = os.path.join(hdfs, self.read_from, f"step_{self.epi_model.step}", "edges.csv")
59+
vertices_fil = os.path.join(hdfs, self.read_from, f"step_{self.epi_model.step}", "vertices.csv")
60+
self.network.vertices = self.spark.read.format("csv").option("delimiter", ',')\
61+
.option('header', False).option('inferSchema', True).load(vertices_fil).toDF('id', 'score', 'health_status')\
62+
.sort('id').cache()
63+
self.network.edges = self.spark.read.format("csv").option("delimiter", ',')\
64+
.option('header', False).option('inferSchema', True).load(edges_fil).toDF('src', 'dst')\
65+
.cache()

demo.ipynb

+280-78
Large diffs are not rendered by default.

scoring_models/ScoringWalker.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,12 @@ def run(self):
2525
se.rdd = se.rdd.persist(StorageLevel.MEMORY_AND_DISK)
2626
D = sdm.SparseDistributedMatrix.diag(se)
2727
M = A.dot(D)
28-
C = A.dot(se).apply(lambda x: 1/x).outer(sdv.SparseDistributedVector.repeat(1, A.numRows()))
28+
b = A.dot(se).apply(lambda x: 1/x)
29+
print(b.rdd.collect())
30+
C = b.outer(sdv.SparseDistributedVector.repeat(1, A.numRows()))
31+
# print(C.entries.collect())
2932
P = M.multiply(C).transpose()
33+
# print(P.entries.collect())
3034
P.entries = P.entries.persist(StorageLevel.MEMORY_AND_DISK)
3135

3236
# running the walker:

test.py

+48-32
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
4545

4646
sdi.SparkDependencyInjection.set_spark(spark).set_spark_context(sc)
47-
pci.PandiSimConfigInjection.set_write_to("d_pandisim")
47+
pci.PandiSimConfigInjection.set_write_to("d_pandisim").set_read_from("d_pandisim")
4848

4949
# a = SparseDistributedMatrix(sc, sc.parallelize([MatrixEntry(0, 0, 1),MatrixEntry(2, 0, 3),MatrixEntry(4, 0, 1)]), 4, 1).transpose()
5050
# o = SparseDistributedMatrix.ones(sc, 4).transpose()
@@ -74,7 +74,7 @@
7474

7575
# print(u.dot(a).rdd.collect())
7676
# print(u.dot(a).rdd.collect())
77-
# print(a.dot(a.dot(a.dot(u))).rdd.collect())
77+
# print(a.dot(u).rdd.collect())
7878
# print(v.dot(u))
7979
# print(v.outer(u).entries.collect())
8080
# print(u.op(v).rdd.collect())
@@ -110,60 +110,76 @@
110110
# print(v.rdd.collect())
111111
# print(v.op(ns, 'add').rdd.collect())
112112

113-
sir = ssir.Simple_SIR(
114-
inits = {'S':0.9, 'I':0.1, 'R':0},
115-
params = {'beta':0.35, 'gamma':0.07, 'N':6, 't_end':20, 'step_size':1}
116-
)
117-
sir.run()
118-
dr = sir.current_sotw()[1]
119-
120-
init = Initializer101(
121-
nbr_vertices = 6,
122-
nbr_edges = 2,
123-
nbr_infected = int(dr[0]),
124-
nbr_recovered = int(dr[1])
125-
)
113+
# sir = ssir.Simple_SIR(
114+
# inits = {'S':0.9, 'I':0.1, 'R':0},
115+
# params = {'beta':0.35, 'gamma':0.07, 'N':6, 't_end':20, 'step_size':1}
116+
# )
117+
# sir.run()
118+
# dr = sir.current_sotw()[1]
119+
120+
# init = Initializer101(
121+
# nbr_vertices = 6,
122+
# nbr_edges = 2,
123+
# nbr_infected = int(dr[0]),
124+
# nbr_recovered = int(dr[1])
125+
# )
126126
# init = Initializer101(
127127
# nbr_vertices = 20,
128128
# nbr_edges = 4,
129129
# nbr_infected = 3,
130130
# nbr_recovered = 2
131131
# )
132-
init.initialize_vertices()
133-
init.initialize_edges(init.vertices)
132+
# init.initialize_vertices()
133+
# init.initialize_edges(init.vertices)
134134

135135
# network = pn.PandiNetwork(init.vertices, init.edges, init.nbr_vertices)
136-
network = init.toPandiNetwork()
136+
# network = init.toPandiNetwork()
137137

138-
walker = sw.ScoringWalker(
139-
network,
140-
params = {'alpha-scaler':-2, 'walker-steps':3}
141-
)
138+
# walker = sw.ScoringWalker(
139+
# network,
140+
# params = {'alpha-scaler':-2, 'walker-steps':3}
141+
# )
142142

143143
# walker.run()
144144
# walker.annotate((2,1))
145145
# network.vertices.show()
146146
# network.edges.show()
147147

148-
edge_est = see.StochasticEdgeEstimator(
149-
network,
150-
params = {'SDF': 100, 'alpha': 80, 'beta': 100}
151-
)
148+
# edge_est = see.StochasticEdgeEstimator(
149+
# network,
150+
# params = {'SDF': 100, 'alpha': 80, 'beta': 100}
151+
# )
152152

153153
# edge_est.run()
154154
# network.vertices.show()
155155
# network.edges.show(50, False)
156156

157+
# pandisim = ps.PandiSim(
158+
# network = network,
159+
# epi_model = sir,
160+
# scoring_model = walker,
161+
# edge_model = edge_est,
162+
# params = {'take_screenshots':False}
163+
# )
164+
165+
# pandisim.move()
166+
# pandisim.take_screenshot()
167+
168+
# network.vertices.show()
169+
# network.edges.show()
170+
171+
172+
173+
network = pn.PandiNetwork(None,None,6)
174+
157175
pandisim = ps.PandiSim(
158176
network = network,
159-
epi_model = sir,
160-
scoring_model = walker,
161-
edge_model = edge_est,
177+
epi_model = None,
178+
scoring_model = None,
179+
edge_model = None,
162180
params = {'take_screenshots':False}
163181
)
164182

165-
pandisim.move()
166-
pandisim.take_screenshot()
167-
183+
pandisim.read_state(1)
168184
network.vertices.show()
169185
network.edges.show()

0 commit comments

Comments
 (0)