Skip to content

Commit 44cbfb2

Browse files
authored
Add cluster-manager=local support. (#83)
Add cluster-manager=local support.
1 parent 672798b commit 44cbfb2

File tree

3 files changed

+154
-16
lines changed

3 files changed

+154
-16
lines changed

service_configuration_lib/spark_config.py

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,26 @@
7171
'spark.kubernetes.executor.label.paasta.yelp.com/cluster',
7272
}
7373
K8S_AUTH_FOLDER = '/etc/pki/spark'
74+
K8S_BASE_VOLUMES: List[Dict[str, str]] = [
75+
{'containerPath': K8S_AUTH_FOLDER, 'hostPath': K8S_AUTH_FOLDER, 'mode': 'RO'},
76+
{'containerPath': '/etc/passwd', 'hostPath': '/etc/passwd', 'mode': 'RO'},
77+
{'containerPath': '/etc/group', 'hostPath': '/etc/group', 'mode': 'RO'},
78+
]
7479

7580
log = logging.Logger(__name__)
7681
log.setLevel(logging.INFO)
7782

7883

84+
SUPPORTED_CLUSTER_MANAGERS = ['mesos', 'kubernetes', 'local']
85+
86+
87+
class UnsupportedClusterManagerException(Exception):
88+
89+
def __init__(self, manager: str):
90+
msg = f'Unsupported cluster manager {manager}, must be one of {SUPPORTED_CLUSTER_MANAGERS}'
91+
super().__init__(msg)
92+
93+
7994
def _load_aws_credentials_from_yaml(yaml_file_path) -> Tuple[str, str, Optional[str]]:
8095
with open(yaml_file_path, 'r') as yaml_file:
8196
try:
@@ -179,9 +194,7 @@ def _get_k8s_docker_volumes_conf(
179194
env = {}
180195
mounted_volumes = set()
181196
k8s_volumes = volumes or []
182-
k8s_volumes.append({'containerPath': K8S_AUTH_FOLDER, 'hostPath': K8S_AUTH_FOLDER, 'mode': 'RO'})
183-
k8s_volumes.append({'containerPath': '/etc/passwd', 'hostPath': '/etc/passwd', 'mode': 'RO'})
184-
k8s_volumes.append({'containerPath': '/etc/group', 'hostPath': '/etc/group', 'mode': 'RO'})
197+
k8s_volumes.extend(K8S_BASE_VOLUMES)
185198
_get_k8s_volume = functools.partial(_get_k8s_volume_hostpath_dict, count=itertools.count())
186199

187200
for volume in k8s_volumes:
@@ -454,6 +467,13 @@ def _adjust_spark_requested_resources(
454467
'spark.scheduler.maxRegisteredResourcesWaitingTime',
455468
str(waiting_time) + 'min',
456469
)
470+
elif cluster_manager == 'local':
471+
executor_instances = int(
472+
user_spark_opts.setdefault('spark.executor.instances', str(DEFAULT_EXECUTOR_INSTANCES)),
473+
)
474+
max_cores = executor_instances * executor_cores
475+
else:
476+
raise UnsupportedClusterManagerException(cluster_manager)
457477

458478
if max_cores < executor_cores:
459479
raise ValueError(f'Total number of cores {max_cores} is less than per-executor cores {executor_cores}')
@@ -649,6 +669,27 @@ def _get_k8s_spark_env(
649669
return spark_env
650670

651671

672+
def _get_local_spark_env(
673+
paasta_cluster: str,
674+
paasta_service: str,
675+
paasta_instance: str,
676+
volumes: Optional[List[Mapping[str, str]]],
677+
num_threads: int = 4,
678+
) -> Dict[str, str]:
679+
return {
680+
'spark.master': f'local[{num_threads}]',
681+
'spark.executorEnv.PAASTA_SERVICE': paasta_service,
682+
'spark.executorEnv.PAASTA_INSTANCE': paasta_instance,
683+
'spark.executorEnv.PAASTA_CLUSTER': paasta_cluster,
684+
'spark.executorEnv.PAASTA_INSTANCE_TYPE': 'spark',
685+
'spark.executorEnv.SPARK_EXECUTOR_DIRS': '/tmp',
686+
# Adding k8s docker volume params is a bit of a hack. PaaSTA
687+
# looks at the spark config to find the volumes it needs to mount
688+
# and so we are borrowing the spark k8s configs.
689+
**_get_k8s_docker_volumes_conf(volumes),
690+
}
691+
692+
652693
def _get_k8s_resource_name_limit_size_with_hash(name: str, limit: int = 63, suffix: int = 4) -> str:
653694
""" Returns `name` unchanged if its length does not exceed the `limit`.
654695
Otherwise, returns truncated `name` with its hash of size `suffix`
@@ -721,7 +762,7 @@ def get_spark_conf(
721762
) -> Dict[str, str]:
722763
"""Build spark config dict to run with spark on paasta
723764
724-
:param cluster_manager: which manager to use, value supported: [`mesos`, `kubernetes`]
765+
:param cluster_manager: which manager to use, must be in SUPPORTED_CLUSTER_MANAGERS
725766
:param spark_app_base_name: the base name to create spark app, we will append port
726767
and time to make the app name unique for easier to separate the output. Note that
727768
this is noop if `spark_opts_from_env` have `spark.app.name` configured.
@@ -812,8 +853,15 @@ def get_spark_conf(
812853
extra_volumes,
813854
paasta_pool,
814855
))
856+
elif cluster_manager == 'local':
857+
spark_conf.update(_get_local_spark_env(
858+
paasta_cluster,
859+
paasta_service,
860+
paasta_instance,
861+
extra_volumes,
862+
))
815863
else:
816-
raise ValueError('Unknown resource_manager, should be either [mesos,kubernetes]')
864+
raise UnsupportedClusterManagerException(cluster_manager)
817865

818866
# configure dynamic resource allocation configs
819867
spark_conf = get_dra_configs(spark_conf)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
setup(
1919
name='service-configuration-lib',
20-
version='2.10.8',
20+
version='2.11.0',
2121
provides=['service_configuration_lib'],
2222
description='Start, stop, and inspect Yelp SOA services',
2323
url='https://github.com/Yelp/service_configuration_lib',

tests/spark_config_test.py

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,13 @@ class TestGetSparkConf:
149149
docker_image = 'docker-dev.yelp.com/test-image'
150150
executor_cores = '10'
151151
spark_app_base_name = 'test_app_base_name'
152-
extra_volumes = [{'hostPath': '/tmp', 'containerPath': '/tmp', 'mode': 'RO'}]
153152
default_mesos_leader = 'mesos://some-url.yelp.com:5050'
154153
aws_provider_key = 'spark.hadoop.fs.s3a.aws.credentials.provider'
155154

155+
@pytest.fixture
156+
def base_volumes(self):
157+
return [{'hostPath': '/tmp', 'containerPath': '/tmp', 'mode': 'RO'}]
158+
156159
@pytest.fixture
157160
def mock_paasta_volumes(self, monkeypatch, tmpdir):
158161
files = [f'paasta{i + 1}' for i in range(2)]
@@ -934,7 +937,7 @@ def verify(output):
934937
return verify
935938

936939
@pytest.mark.parametrize('use_temp_provider', [True, False])
937-
def test_get_spark_conf_aws_session(self, use_temp_provider):
940+
def test_get_spark_conf_aws_session(self, use_temp_provider, base_volumes):
938941
other_spark_opts = {'spark.driver.memory': '2g', 'spark.executor.memoryOverhead': '1024'}
939942
not_allowed_opts = {'spark.executorEnv.PAASTA_SERVICE': 'random-service'}
940943
user_spark_opts = {
@@ -954,7 +957,7 @@ def test_get_spark_conf_aws_session(self, use_temp_provider):
954957
paasta_service=self.service,
955958
paasta_instance=self.instance,
956959
docker_img=self.docker_image,
957-
extra_volumes=self.extra_volumes,
960+
extra_volumes=base_volumes,
958961
aws_creds=aws_creds,
959962
auto_set_temporary_credentials_provider=use_temp_provider,
960963
)
@@ -968,6 +971,7 @@ def test_get_spark_conf_mesos(
968971
self,
969972
user_spark_opts,
970973
spark_opts_from_env,
974+
base_volumes,
971975
ui_port,
972976
with_secret,
973977
mesos_leader,
@@ -1009,7 +1013,7 @@ def test_get_spark_conf_mesos(
10091013
paasta_service=self.service,
10101014
paasta_instance=self.instance,
10111015
docker_img=self.docker_image,
1012-
extra_volumes=self.extra_volumes,
1016+
extra_volumes=base_volumes,
10131017
aws_creds=aws_creds,
10141018
extra_docker_params=extra_docker_params,
10151019
with_secret=with_secret,
@@ -1038,7 +1042,7 @@ def test_get_spark_conf_mesos(
10381042
)
10391043
assert len(set(output.keys()) - verified_keys) == 0
10401044
mock_get_mesos_docker_volumes_conf.mocker.assert_called_once_with(
1041-
mock.ANY, self.extra_volumes, True,
1045+
mock.ANY, base_volumes, True,
10421046
)
10431047
mock_adjust_spark_requested_resources_mesos.mocker.assert_called_once_with(
10441048
mock.ANY, 'mesos', self.pool,
@@ -1053,8 +1057,15 @@ def test_get_spark_conf_mesos(
10531057
(warning_msg,), _ = mock_log.warning.call_args
10541058
assert next(iter(not_allowed_opts.keys())) in warning_msg
10551059

1060+
def _get_k8s_base_volumes(self):
1061+
"""Helper needed to allow tests to pass in github CI checks."""
1062+
return [
1063+
volume for volume in spark_config.K8S_BASE_VOLUMES
1064+
if os.path.exists(volume['containerPath'])
1065+
]
1066+
10561067
@pytest.fixture
1057-
def assert_kubernetes_conf(self):
1068+
def assert_kubernetes_conf(self, base_volumes):
10581069
expected_output = {
10591070
'spark.master': f'k8s://https://k8s.{self.cluster}.paasta:6443',
10601071
'spark.executorEnv.PAASTA_SERVICE': self.service,
@@ -1079,8 +1090,10 @@ def assert_kubernetes_conf(self):
10791090
'spark.kubernetes.executor.label.paasta.yelp.com/cluster': self.cluster,
10801091
'spark.kubernetes.node.selector.yelp.com/pool': self.pool,
10811092
'spark.kubernetes.executor.label.yelp.com/pool': self.pool,
1093+
'spark.logConf': 'true',
1094+
'spark.ui.showConsoleProgress': 'true',
10821095
}
1083-
for i, volume in enumerate(self.extra_volumes):
1096+
for i, volume in enumerate(base_volumes + self._get_k8s_base_volumes()):
10841097
expected_output[f'spark.kubernetes.executor.volumes.hostPath.{i}.mount.path'] = volume['containerPath']
10851098
expected_output[f'spark.kubernetes.executor.volumes.hostPath.{i}.mount.readOnly'] = str(
10861099
volume['mode'] == 'RO',
@@ -1093,11 +1106,12 @@ def verify(output):
10931106
return list(expected_output.keys())
10941107
return verify
10951108

1096-
def tes_leaderst_get_spark_conf_kubernetes(
1109+
def test_leaders_get_spark_conf_kubernetes(
10971110
self,
10981111
user_spark_opts,
10991112
spark_opts_from_env,
11001113
ui_port,
1114+
base_volumes,
11011115
mock_append_event_log_conf,
11021116
mock_append_aws_credentials_conf,
11031117
mock_append_sql_shuffle_partitions_conf,
@@ -1125,7 +1139,7 @@ def tes_leaderst_get_spark_conf_kubernetes(
11251139
paasta_service=self.service,
11261140
paasta_instance=self.instance,
11271141
docker_img=self.docker_image,
1128-
extra_volumes=self.extra_volumes,
1142+
extra_volumes=base_volumes,
11291143
aws_creds=aws_creds,
11301144
spark_opts_from_env=spark_opts_from_env,
11311145
)
@@ -1140,7 +1154,7 @@ def tes_leaderst_get_spark_conf_kubernetes(
11401154
list(mock_append_aws_credentials_conf.return_value.keys()) +
11411155
list(mock_append_sql_shuffle_partitions_conf.return_value.keys()),
11421156
)
1143-
assert len(set(output.keys()) - verified_keys) == 0
1157+
assert set(output.keys()) == verified_keys
11441158
mock_adjust_spark_requested_resources_kubernetes.mocker.assert_called_once_with(
11451159
mock.ANY, 'kubernetes', self.pool,
11461160
)
@@ -1152,6 +1166,82 @@ def tes_leaderst_get_spark_conf_kubernetes(
11521166
mock.ANY,
11531167
)
11541168

1169+
@pytest.fixture
1170+
def assert_local_conf(self, base_volumes):
1171+
expected_output = {
1172+
'spark.master': 'local[4]',
1173+
'spark.executorEnv.PAASTA_SERVICE': self.service,
1174+
'spark.executorEnv.PAASTA_INSTANCE': self.instance,
1175+
'spark.executorEnv.PAASTA_CLUSTER': self.cluster,
1176+
'spark.executorEnv.PAASTA_INSTANCE_TYPE': 'spark',
1177+
'spark.executorEnv.SPARK_EXECUTOR_DIRS': '/tmp',
1178+
'spark.logConf': 'true',
1179+
'spark.ui.showConsoleProgress': 'true',
1180+
}
1181+
for i, volume in enumerate(base_volumes + self._get_k8s_base_volumes()):
1182+
expected_output[f'spark.kubernetes.executor.volumes.hostPath.{i}.mount.path'] = volume['containerPath']
1183+
expected_output[f'spark.kubernetes.executor.volumes.hostPath.{i}.mount.readOnly'] = str(
1184+
volume['mode'] == 'RO',
1185+
).lower()
1186+
expected_output[f'spark.kubernetes.executor.volumes.hostPath.{i}.options.path'] = volume['hostPath']
1187+
1188+
def verify(output):
1189+
for key, value in expected_output.items():
1190+
assert output[key] == value
1191+
return list(expected_output.keys())
1192+
return verify
1193+
1194+
def test_local_spark(
1195+
self,
1196+
user_spark_opts,
1197+
spark_opts_from_env,
1198+
ui_port,
1199+
base_volumes,
1200+
mock_append_event_log_conf,
1201+
mock_append_aws_credentials_conf,
1202+
mock_append_sql_shuffle_partitions_conf,
1203+
mock_adjust_spark_requested_resources_kubernetes,
1204+
mock_time,
1205+
assert_ui_port,
1206+
assert_app_name,
1207+
assert_local_conf,
1208+
mock_log,
1209+
):
1210+
aws_creds = (None, None, None)
1211+
output = spark_config.get_spark_conf(
1212+
cluster_manager='local',
1213+
spark_app_base_name=self.spark_app_base_name,
1214+
user_spark_opts=user_spark_opts or {},
1215+
paasta_cluster=self.cluster,
1216+
paasta_pool=self.pool,
1217+
paasta_service=self.service,
1218+
paasta_instance=self.instance,
1219+
docker_img=self.docker_image,
1220+
extra_volumes=base_volumes,
1221+
aws_creds=aws_creds,
1222+
spark_opts_from_env=spark_opts_from_env,
1223+
)
1224+
verified_keys = set(
1225+
assert_ui_port(output) +
1226+
assert_app_name(output) +
1227+
assert_local_conf(output) +
1228+
list(mock_append_event_log_conf.return_value.keys()) +
1229+
list(mock_adjust_spark_requested_resources_kubernetes.return_value.keys()) +
1230+
list(mock_append_aws_credentials_conf.return_value.keys()) +
1231+
list(mock_append_sql_shuffle_partitions_conf.return_value.keys()),
1232+
)
1233+
assert set(output.keys()) == verified_keys
1234+
mock_append_event_log_conf.mocker.assert_called_once_with(
1235+
mock.ANY, *aws_creds,
1236+
)
1237+
mock_adjust_spark_requested_resources_kubernetes.mocker.assert_called_once_with(
1238+
mock.ANY, 'local', self.pool,
1239+
)
1240+
mock_append_aws_credentials_conf.mocker.assert_called_once_with(mock.ANY, *aws_creds)
1241+
mock_append_sql_shuffle_partitions_conf.mocker.assert_called_once_with(
1242+
mock.ANY,
1243+
)
1244+
11551245
@pytest.mark.parametrize('reason', ['mesos_leader', 'mesos_secret'])
11561246
def test_get_spark_conf_mesos_error(self, reason, monkeypatch, mock_request_mesos_leader):
11571247
if reason == 'mesos_leader':

0 commit comments

Comments
 (0)