Skip to content

Commit b794777

Browse files
authored
Merge pull request grpc#16513 from ericgribkoff/python_unit_fork_tests
Add fork tests as Python unit tests
2 parents 3dacd1a + ff0d219 commit b794777

File tree

7 files changed

+242
-72
lines changed

7 files changed

+242
-72
lines changed

Diff for: src/python/grpcio/grpc/_channel.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,7 @@ def stream_stream(self,
10331033

10341034
def _close(self):
10351035
self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
1036+
cygrpc.fork_unregister_channel(self)
10361037
_moot(self._connectivity_state)
10371038

10381039
def _close_on_fork(self):
@@ -1060,8 +1061,6 @@ def __del__(self):
10601061
# for as long as they are in use and to close them after using them,
10611062
# then deletion of this grpc._channel.Channel instance can be made to
10621063
# effect closure of the underlying cygrpc.Channel instance.
1063-
if cygrpc is not None: # Globals may have already been collected.
1064-
cygrpc.fork_unregister_channel(self)
10651064
# This prevent the failed-at-initializing object removal from failing.
10661065
# Though the __init__ failed, the removal will still trigger __del__.
10671066
if _moot is not None and hasattr(self, '_connectivity_state'):

Diff for: src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi

+28-16
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
1615
import logging
1716
import os
1817
import threading
@@ -37,15 +36,20 @@ _GRPC_ENABLE_FORK_SUPPORT = (
3736
os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0')
3837
.lower() in _TRUE_VALUES)
3938

39+
_fork_handler_failed = False
40+
4041
cdef void __prefork() nogil:
4142
with gil:
43+
global _fork_handler_failed
44+
_fork_handler_failed = False
4245
with _fork_state.fork_in_progress_condition:
4346
_fork_state.fork_in_progress = True
4447
if not _fork_state.active_thread_count.await_zero_threads(
4548
_AWAIT_THREADS_TIMEOUT_SECONDS):
4649
_LOGGER.error(
4750
'Failed to shutdown gRPC Python threads prior to fork. '
4851
'Behavior after fork will be undefined.')
52+
_fork_handler_failed = True
4953

5054

5155
cdef void __postfork_parent() nogil:
@@ -57,20 +61,28 @@ cdef void __postfork_parent() nogil:
5761

5862
cdef void __postfork_child() nogil:
5963
with gil:
60-
# Thread could be holding the fork_in_progress_condition inside of
61-
# block_if_fork_in_progress() when fork occurs. Reset the lock here.
62-
_fork_state.fork_in_progress_condition = threading.Condition()
63-
# A thread in return_from_user_request_generator() may hold this lock
64-
# when fork occurs.
65-
_fork_state.active_thread_count = _ActiveThreadCount()
66-
for state_to_reset in _fork_state.postfork_states_to_reset:
67-
state_to_reset.reset_postfork_child()
68-
_fork_state.fork_epoch += 1
69-
for channel in _fork_state.channels:
70-
channel._close_on_fork()
71-
# TODO(ericgribkoff) Check and abort if core is not shutdown
72-
with _fork_state.fork_in_progress_condition:
73-
_fork_state.fork_in_progress = False
64+
try:
65+
if _fork_handler_failed:
66+
return
67+
# Thread could be holding the fork_in_progress_condition inside of
68+
# block_if_fork_in_progress() when fork occurs. Reset the lock here.
69+
_fork_state.fork_in_progress_condition = threading.Condition()
70+
# A thread in return_from_user_request_generator() may hold this lock
71+
# when fork occurs.
72+
_fork_state.active_thread_count = _ActiveThreadCount()
73+
for state_to_reset in _fork_state.postfork_states_to_reset:
74+
state_to_reset.reset_postfork_child()
75+
_fork_state.postfork_states_to_reset = []
76+
_fork_state.fork_epoch += 1
77+
for channel in _fork_state.channels:
78+
channel._close_on_fork()
79+
with _fork_state.fork_in_progress_condition:
80+
_fork_state.fork_in_progress = False
81+
except:
82+
_LOGGER.error('Exiting child due to raised exception')
83+
_LOGGER.error(sys.exc_info()[0])
84+
os._exit(os.EX_USAGE)
85+
7486
if grpc_is_initialized() > 0:
7587
with gil:
7688
_LOGGER.error('Failed to shutdown gRPC Core after fork()')
@@ -148,7 +160,7 @@ def fork_register_channel(channel):
148160

149161
def fork_unregister_channel(channel):
150162
if _GRPC_ENABLE_FORK_SUPPORT:
151-
_fork_state.channels.remove(channel)
163+
_fork_state.channels.discard(channel)
152164

153165

154166
class _ActiveThreadCount(object):

Diff for: src/python/grpcio_tests/commands.py

+2
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ class TestGevent(setuptools.Command):
111111
"""Command to run tests w/gevent."""
112112

113113
BANNED_TESTS = (
114+
# Fork support is not compatible with gevent
115+
'fork._fork_interop_test.ForkInteropTest',
114116
# These tests send a lot of RPCs and are really slow on gevent. They will
115117
# eventually succeed, but need to dig into performance issues.
116118
'unit._cython._no_messages_server_completion_queue_per_call_test.Test.test_rpcs',
+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
# Copyright 2019 gRPC authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Client-side fork interop tests as a unit test."""
15+
16+
import six
17+
import subprocess
18+
import sys
19+
import threading
20+
import unittest
21+
from grpc._cython import cygrpc
22+
from tests.fork import methods
23+
24+
# New instance of multiprocessing.Process using fork without exec can and will
25+
# hang if the Python process has any other threads running. This includes the
26+
# additional thread spawned by our _runner.py class. So in order to test our
27+
# compatibility with multiprocessing, we first fork+exec a new process to ensure
28+
# we don't have any conflicting background threads.
29+
_CLIENT_FORK_SCRIPT_TEMPLATE = """if True:
30+
import os
31+
import sys
32+
from grpc._cython import cygrpc
33+
from tests.fork import methods
34+
35+
cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
36+
os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'
37+
methods.TestCase.%s.run_test({
38+
'server_host': 'localhost',
39+
'server_port': %d,
40+
'use_tls': False
41+
})
42+
"""
43+
_SUBPROCESS_TIMEOUT_S = 30
44+
45+
46+
@unittest.skipUnless(
47+
sys.platform.startswith("linux"),
48+
"not supported on windows, and fork+exec networking blocked on mac")
49+
@unittest.skipUnless(six.PY2, "https://github.com/grpc/grpc/issues/18075")
50+
class ForkInteropTest(unittest.TestCase):
51+
52+
def setUp(self):
53+
start_server_script = """if True:
54+
import sys
55+
import time
56+
57+
import grpc
58+
from src.proto.grpc.testing import test_pb2_grpc
59+
from tests.interop import methods as interop_methods
60+
from tests.unit import test_common
61+
62+
server = test_common.test_server()
63+
test_pb2_grpc.add_TestServiceServicer_to_server(
64+
interop_methods.TestService(), server)
65+
port = server.add_insecure_port('[::]:0')
66+
server.start()
67+
print(port)
68+
sys.stdout.flush()
69+
while True:
70+
time.sleep(1)
71+
"""
72+
self._server_process = subprocess.Popen(
73+
[sys.executable, '-c', start_server_script],
74+
stdout=subprocess.PIPE,
75+
stderr=subprocess.PIPE)
76+
timer = threading.Timer(_SUBPROCESS_TIMEOUT_S,
77+
self._server_process.kill)
78+
try:
79+
timer.start()
80+
self._port = int(self._server_process.stdout.readline())
81+
except ValueError:
82+
raise Exception('Failed to get port from server')
83+
finally:
84+
timer.cancel()
85+
86+
def testConnectivityWatch(self):
87+
self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH)
88+
89+
def testCloseChannelBeforeFork(self):
90+
self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK)
91+
92+
def testAsyncUnarySameChannel(self):
93+
self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL)
94+
95+
def testAsyncUnaryNewChannel(self):
96+
self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL)
97+
98+
def testBlockingUnarySameChannel(self):
99+
self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL)
100+
101+
def testBlockingUnaryNewChannel(self):
102+
self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL)
103+
104+
def testInProgressBidiContinueCall(self):
105+
self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL)
106+
107+
def testInProgressBidiSameChannelAsyncCall(self):
108+
self._verifyTestCase(
109+
methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL)
110+
111+
def testInProgressBidiSameChannelBlockingCall(self):
112+
self._verifyTestCase(
113+
methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL)
114+
115+
def testInProgressBidiNewChannelAsyncCall(self):
116+
self._verifyTestCase(
117+
methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL)
118+
119+
def testInProgressBidiNewChannelBlockingCall(self):
120+
self._verifyTestCase(
121+
methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL)
122+
123+
def tearDown(self):
124+
self._server_process.kill()
125+
126+
def _verifyTestCase(self, test_case):
127+
script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
128+
process = subprocess.Popen(
129+
[sys.executable, '-c', script],
130+
stdout=subprocess.PIPE,
131+
stderr=subprocess.PIPE)
132+
timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, process.kill)
133+
try:
134+
timer.start()
135+
try:
136+
out, err = process.communicate(timeout=_SUBPROCESS_TIMEOUT_S)
137+
except TypeError:
138+
# The timeout parameter was added in Python 3.3.
139+
out, err = process.communicate()
140+
except subprocess.TimeoutExpired:
141+
process.kill()
142+
raise RuntimeError('Process failed to terminate')
143+
finally:
144+
timer.cancel()
145+
self.assertEqual(
146+
0, process.returncode,
147+
'process failed with exit code %d (stdout: %s, stderr: %s)' %
148+
(process.returncode, out, err))
149+
150+
151+
if __name__ == '__main__':
152+
unittest.main(verbosity=2)

Diff for: src/python/grpcio_tests/tests/fork/client.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,12 @@ def _test_case_from_arg(test_case_arg):
6363

6464
def test_fork():
6565
logging.basicConfig(level=logging.INFO)
66-
args = _args()
67-
if args.test_case == "all":
66+
args = vars(_args())
67+
if args['test_case'] == "all":
6868
for test_case in methods.TestCase:
6969
test_case.run_test(args)
7070
else:
71-
test_case = _test_case_from_arg(args.test_case)
71+
test_case = _test_case_from_arg(args['test_case'])
7272
test_case.run_test(args)
7373

7474

0 commit comments

Comments
 (0)