-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnode.h
177 lines (139 loc) · 5.01 KB
/
node.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Copyright 2016 Dino Wernli. All Rights Reserved. See LICENSE for licensing terms.
#ifndef NODE_H_
#define NODE_H_
#include <future>
#include <iostream>
#include <set>
#include <string>
#include "error.h"
#include "input.h"
#include "output.h"
namespace ccproducers {
// The various states a node can be in during execution of a graph.
enum class NodeState { BLOCKED, RUNNING, FINISHED };
// Common base type for all node handles. Carry things around like an id used
// for retrieval of the real node within a producer graph.
class NodeHandleBase{
public:
NodeHandleBase(int node_id) : node_id_(node_id) {}
virtual ~NodeHandleBase() {}
// The id of the node this handle points to.
int NodeId() const {
return node_id_;
}
private:
int node_id_;
};
// A handle to a node with a specific output type.
template<class T>
class NodeHandle : public NodeHandleBase {
public:
NodeHandle(int node_id) : NodeHandleBase(node_id) {}
virtual ~NodeHandle() {}
};
// Base type for all nodes in the graph. Exists mainly because "Node" has a
// template parameter and we need a way to store pointers to nodes regardless
// of their exact type parameters.
class NodeBase {
public:
NodeBase(int id, std::string name, std::set<NodeBase*> deps);
const std::string& name() const { return name_; }
void Run();
bool IsDone() const;
void SetFinished();
void AddReverseDep(NodeBase* rdep);
// Attempts to set the node's state to running. Returns true if the node
// has transitioned to RUNNING as a result of this call.
bool TrySetRunning();
// Starts the execution of this node asynchronously. Does not block.
// Eventually, this node's result promise will be fulfilled.
void Start();
// Returns the transitive set of nodes which need to run in order for this
// node to have produced a result. In particular, the returned set contains
// this node.
std::set<NodeBase*> TransitiveDeps();
// Informs this node that another node has finished execution. The supplied
// node must be a dependency of this node.
void ReportFinished(NodeBase* node);
void DumpState() const {
std::cout << DebugPrefix() << std::endl;
}
protected:
virtual void RunProducer() = 0;
void TransitiveDepsInternal(std::set<NodeBase*>* result);
std::string DebugPrefix() const;
std::string DebugState() const;
private:
bool CanRun() const;
// The id of this node. Unique withing a producer graph.
int id_;
std::string name_;
// Holds the future used to track the async producer run.
std::future<void> async_future_;
// Deps (need to run before) and rdeps (can only run after) of this node.
// Don't need to be lock guarded because configuration happens before the
// execution of the graph.
std::set<NodeBase*> deps_;
std::set<NodeBase*> rdeps_;
// Holds the state of the node.
NodeState state_;
mutable std::recursive_mutex state_lock_;
// Holds the list of rdeps which have completed.
std::set<NodeBase*> finished_deps_;
mutable std::recursive_mutex finished_deps_lock_;
};
// Represents a node in the graph with an output of a specific type.
template<class T>
class Node : public NodeBase {
public:
Node(
int id,
std::string name,
std::function<Output<T>()> producer,
std::set<NodeBase*> deps)
: NodeBase(id, name, deps), producer_(producer) { }
// Returns a future which gets resolved once the producer for this node has
// been executed.
std::future<const T&> ResultFuture() {
return result_promise_.get_future();
}
// Returns nullptr until the producer of this node hass been executed.
const Output<T>* GetOutput() {
return result_.get();
}
protected:
void RunProducer() {
std::cout << DebugPrefix() << "Running producer" << std::endl;
// Try running the producer, making sure we recover from any exceptions.
try {
result_ = std::make_unique<Output<T>>(std::move(producer_()));
} catch (std::exception&) {
result_ = std::make_unique<Output<T>>(
Error("Exception while running producer"));
}
// Resolve the promise for the produced result.
try {
if (result_->IsError()) {
throw std::runtime_error("Producer ran and produced an error");
} else {
result_promise_.set_value(result_->get());
}
} catch (std::exception&) {
std::cout << DebugPrefix()
<< "Recovering from exception, setting failed" << std::endl;
result_promise_.set_exception(std::current_exception());
}
std::cout << DebugPrefix() << "Running producer finished" << std::endl;
}
private:
// This points to nullptr until RunProducer() is called.
std::unique_ptr<Output<T>> result_;
// A producer with all inputs bound to the results of other producers. This
// must only be executed if all dependency producers have already been run.
std::function<Output<T>()> producer_;
// A promise for the result. This is resolved once the result_ field above
// gets populated with a value or an error.
std::promise<const T&> result_promise_;
};
} // namespace ccproducers
#endif // NODE_H