forked from pytorch/pytorch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_fx_passes.py
276 lines (225 loc) · 9.01 KB
/
test_fx_passes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# Owner(s): ["oncall: fx"]
import operator
import logging
import torch
from torch.fx._symbolic_trace import symbolic_trace
from torch.fx.passes.infra.partitioner import CapabilityBasedPartitioner
from torch.fx.passes.operator_support import OperatorSupport
from torch.fx.passes.utils.fuser_utils import fuse_by_partitions
from torch.testing._internal.common_utils import run_tests, parametrize, instantiate_parametrized_tests
from torch.testing._internal.jit_utils import JitTestCase
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
class TestModule(torch.nn.Module):
def __init__(self):
super().__init__()
self.linear = torch.nn.Linear(4, 4)
self.linear2 = torch.nn.Linear(4, 4)
self.param = torch.nn.Parameter(torch.rand(4, 4))
def forward(self, a, b, c):
add = a + b
linear_1 = self.linear(add)
add_1 = add + c
add_2 = add_1 + self.param
add_3 = add_1 + linear_1
add_4 = add_2 + add_3
linear_2 = self.linear2(add_4)
add_5 = linear_2 + add_4
add_6 = add_5 + a
relu = add_6.relu()
return add_4, add_6, relu
class TestPartitionFunctions:
@staticmethod
def forward1(a, b, c):
add = a + b
add_1 = add + b
add_2 = add_1 + c
relu_1 = add_2.relu()
add_3 = add_1 + add_2
add_4 = add_1 + relu_1 + add_3
relu_2 = add_4.relu()
add_5 = relu_2 + add_4
add_6 = add_5 + add_4
return add_4, add_6
@staticmethod
def forward2(a, b, _):
add = a + b
add_1 = add + b
relu_1 = add_1.relu() # blocked by this
add_3 = add_1 + relu_1
add_4 = add_1 + add_3
return add_4, add_1
@staticmethod
def forward3(a, b, c):
add = a + b
add_1 = a + c
add_2 = b + c
return add, add_1, add_2
@staticmethod
def forward4(a, b, c):
add = a + b
add_1 = a + c
add_2 = b + c
return torch.where(add > 0, add_1, add_2)
@staticmethod
def forward5(a, b, c):
# add should be fused right branch, as left branch is not supported
add = a + 1
# left branch
relu = add.relu()
# right branch
add_1 = add + 2
return relu, add_1
@staticmethod
def forward6(a, b, c):
# add should have its own partition, as neither branchs are supported
add = a + 1
# left branch
relu = add.relu()
# right branch
relu_1 = add.relu()
return relu, relu_1
@staticmethod
def forward7(a, b, c):
# both branches are supported, all adds should be fused together
add = a + 1
# left branch
add_1 = add + 2
# right branch is larger
add_2 = add + 1
add_3 = add_2 + 1
return add_3, add_1
@staticmethod
def forward8(a, b, c):
# both branches are in the same partition, add should join the same partition
add = a + 1
# left branch
add_1 = add + 2
# right branch
add_2 = add + 1
# left and right branch merges
add_3 = add_2 + add_1
return add_3
@staticmethod
def forward9(a, b, c):
add = a + 1
# branch 1
add_1 = add + 1
# branch 2
add_2 = add + 1
# branch_3
add_3 = add + 1
out = torch.stack([add_1, add_2, add_3])
return out
@staticmethod
def forward10(a, b, c):
add = a + 1
# branch 1
add_1 = add + 1
# branch 2
add_2 = add + 1
# branch 3: depends on branch 2
add_3 = add + add_2
out = torch.stack([add_1, add_2, add_3])
return out
@staticmethod
def forward11(a, b, c):
add = a + 1
# branch 1
add_1 = add.relu()
# branch 2 depends on branch 1
add_2 = add + add_1
# branch 3
add_3 = add.relu()
out = torch.stack([add_1, add_2, add_3])
return out
# A mock OperatorSupport class, where only operator.add is supported
class MockOperatorSupport(OperatorSupport):
def is_node_supported(self, submodules, node: torch.fx.Node) -> bool:
return node.op == "call_function" and node.target in {operator.add}
class TestFXGraphPasses(JitTestCase):
@parametrize("fn, expected_partition", [
(TestPartitionFunctions.forward1, [["add_7", "add_6"], ["add_5", "add_4", "add_3"], ["add_2", "add_1", "add"]]),
(TestPartitionFunctions.forward2, [["add_3", "add_2"], ["add_1", "add"]]),
# 2 branches cases
(TestPartitionFunctions.forward5, [["add_1", "add"]]),
(TestPartitionFunctions.forward6, [["add"]]),
(TestPartitionFunctions.forward7, [["add_3", "add_2", "add", "add_1"]]),
(TestPartitionFunctions.forward8, [["add_3", "add_2", "add", "add_1"]]),
# 3 branch cases
(TestPartitionFunctions.forward9, [['add_3', 'add_2', 'add_1', 'add']]),
(TestPartitionFunctions.forward10, [['add_3', 'add_2', 'add', 'add_1']]),
(TestPartitionFunctions.forward11, [['add_1'], ['add']]),
])
def test_partitioner(self, fn, expected_partition):
traced = symbolic_trace(fn)
supported_ops = MockOperatorSupport()
partitioner = CapabilityBasedPartitioner(traced, supported_ops, allows_single_node_partition=True)
partitions = partitioner.propose_partitions()
partitions_name = [[node.name for node in partition.nodes] for partition in partitions]
assert len(partitions_name) == len(expected_partition)
for i in range(len(partitions_name)):
assert set(partitions_name[i]) == set(expected_partition[i])
fused_graph = partitioner.fuse_partitions(partitions)
a, b, c = torch.rand(4), torch.rand(4), torch.rand(4)
expected = fn(a, b, c)
result = fused_graph(a, b, c)
torch.testing.assert_close(expected, result)
@parametrize("fn, expected_partition", [
# horizontal fusion without a common downstream node, not supported yet
(TestPartitionFunctions.forward3, [["add_2", "add_1", "add"]]),
# horizontal fusion with a common downstream node, not supported yet
(TestPartitionFunctions.forward4, [["add_2", "add_1", "add"]]),
])
def test_partitioner_xfail(self, fn, expected_partition):
traced = symbolic_trace(fn)
supported_ops = MockOperatorSupport()
partitioner = CapabilityBasedPartitioner(traced, supported_ops, allows_single_node_partition=True)
partitions = partitioner.propose_partitions()
partitions_name = [[node.name for node in partition.nodes] for partition in partitions]
with self.assertRaises(Exception):
assert len(partitions_name) == len(expected_partition)
@parametrize("partition", [
[['add', 'add_1'], ['add_5', 'add_6']],
[['add', 'add_1', 'add_2']], # vertical fusion
[['add_2', 'add_3']], # horizontal fusion
[['add_3', 'add_4']],
[['add_6', 'add_5']], # arbitray node order
[['add_4', 'add_1', 'add_3', 'add_2']], # arbitray node order
[['add_5', 'add_6'], ['add_1', 'add_2', 'add_3', 'add_4']], # arbitray partition order
[['add_5', 'linear2']], # includes call_function + call_module node
[['add_6', 'relu']], # includes call_function + call_module node
[['param', 'add_2']], # includes get_attr + call_module nodes
[['param', 'add_1', 'linear']], # includes get_attr + call_function + call_module nodes
[["add", "linear", "add_1", "param", "add_2", "add_3", "add_4", "linear2", "add_5", "add_6", "relu"]], # full graph
])
def test_fuser_util(self, partition):
m = TestModule()
gm = symbolic_trace(m)
nodes_by_name = {node.name : node for node in gm.graph.nodes}
partitions = []
for node_names in partition:
partitions.append([nodes_by_name[name] for name in node_names])
fused_graph = fuse_by_partitions(gm, partitions)
a, b, c = torch.rand(4), torch.rand(4), torch.rand(4)
expected = m(a, b, c)
result = fused_graph(a, b, c)
torch.testing.assert_close(expected, result)
@parametrize("partition", [
[['add', 'add_1'], ['add_1', 'add_5', 'add_6']], # add_1 exists in multiple partitions
[['add', 'add_1', 'add_3']], # invalid partition: circular dependency
[['add_4', 'add_5']], # invalid partition: circular dependency
[['relu', 'add_5']], # invalid partition: circular dependency
])
def test_fuser_util_xfail(self, partition):
m = TestModule()
gm = symbolic_trace(m)
nodes_by_name = {node.name : node for node in gm.graph.nodes}
partitions = []
for node_names in partition:
partitions.append([nodes_by_name[name] for name in node_names])
with self.assertRaises(Exception):
fuse_by_partitions(gm, partitions)
instantiate_parametrized_tests(TestFXGraphPasses)
if __name__ == "__main__":
run_tests()