
Commit cd61115

CaptainSame and Sameer Sharma authored
MLCOMPUTE-595 | remove DRA disable suggestion for notebooks (#103)
* MLCOMPUTE-595 | remove DRA disable suggestion for notebooks, fix cachedExecutorIdleTimeout warning
* MLCOMPUTE-595 | added cost warnings
* MLCOMPUTE-595 | added nicer formatting
* MLCOMPUTE-595 | changed srv config keys to replicate spark configs
* MLCOMPUTE-595 | moved cost factor constants to srv configs
* MLCOMPUTE-595 | add unit tests, deprecated mesos tests
---------
Co-authored-by: Sameer Sharma <[email protected]>
1 parent db83140 commit cd61115

File tree

3 files changed: +282 −138 lines changed


service_configuration_lib/spark_config.py

Lines changed: 111 additions & 42 deletions
@@ -22,6 +22,8 @@
 import yaml
 from boto3 import Session
 
+from service_configuration_lib.text_colors import TextColors
+
 AWS_CREDENTIALS_DIR = '/etc/boto_cfg/'
 AWS_ENV_CREDENTIALS_PROVIDER = 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider'
 GPU_POOLS_YAML_FILE_PATH = '/nail/srv/configs/gpu_pools.yaml'
@@ -79,15 +81,19 @@
 # Spark srv-configs
 spark_srv_conf = dict()
 spark_constants = dict()
+default_spark_conf = dict()
+spark_costs = dict()
 
 
 def _load_spark_srv_conf(preset_values: Dict[str, Any] = dict()):
-    global spark_srv_conf, spark_constants
+    global spark_srv_conf, spark_constants, default_spark_conf, spark_costs
     try:
         with open(DEFAULT_SPARK_RUN_CONFIG) as fp:
             loaded_values = yaml.safe_load(fp.read())
             spark_srv_conf = {**preset_values, **loaded_values}
             spark_constants = spark_srv_conf.get('spark_constants', dict())
+            default_spark_conf = spark_constants.get('defaults', dict())
+            spark_costs = spark_constants.get('cost_factor', dict())
     except Exception as e:
         log.warning(f'Failed to load {DEFAULT_SPARK_RUN_CONFIG}: {e}')
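The two new globals assume the Spark srv-config now nests per-key defaults and per-cluster cost factors under spark_constants, keyed by the real Spark config names. A minimal sketch of the structure _load_spark_srv_conf would read; the key names mirror the lookups in this diff, while the cluster name and exact values are illustrative:

# Hypothetical result of yaml.safe_load(DEFAULT_SPARK_RUN_CONFIG); only the keys the
# diff actually reads via spark_constants.get('defaults') / .get('cost_factor') are shown.
loaded_values = {
    'spark_constants': {
        'defaults': {
            'spark.executor.cores': 2,
            'spark.executor.instances': 2,
            'spark.executor.memory': 28,
            'spark.task.cpus': 1,
            'spark.cores.max': 4,
            'spark.sql.shuffle.partitions': 128,
            'spark.dynamicAllocation.executorAllocationRatio': 0.8,
            'spark.dynamicAllocation.cachedExecutorIdleTimeout': '1500s',
            'spark.yelp.dra.minExecutorRatio': 0.25,
            'spark.kubernetes.allocation.batch.size': 512,
        },
        'cost_factor': {
            # paasta_cluster -> paasta_pool -> dollars per core-hour (made-up numbers)
            'example-cluster': {'batch': 0.041, 'stable': 0.142},
        },
        'high_cost_threshold_daily': 500,
    },
}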

@@ -136,11 +142,11 @@ def get_aws_credentials(
     elif aws_credentials_json:
         with open(aws_credentials_json, 'r') as f:
             creds = json.load(f)
-        return (creds.get('accessKeyId'), creds.get('secretAccessKey'), None)
+        return creds.get('accessKeyId'), creds.get('secretAccessKey'), None
     elif assume_web_identity:
         credentials = assume_aws_role_web_identity(session_duration)
         if credentials:
-            return (credentials['AccessKeyId'], credentials['SecretAccessKey'], credentials['SessionToken'])
+            return credentials['AccessKeyId'], credentials['SecretAccessKey'], credentials['SessionToken']
         else:
             log.warning(
                 'Tried to assume role with web identity but neither '
@@ -280,7 +286,7 @@ def _append_sql_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str, str]:
     num_partitions = 3 * (
         int(spark_opts.get('spark.cores.max', 0)) or
         int(spark_opts.get('spark.executor.instances', 0)) *
-        int(spark_opts.get('spark.executor.cores', spark_constants.get('default_executor_cores', 2)))
+        int(spark_opts.get('spark.executor.cores', default_spark_conf.get('spark.executor.cores', 2)))
     )
 
     if (
@@ -292,18 +298,23 @@ def _append_sql_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str, str]:
 
         num_partitions_dra = (
             int(spark_opts.get('spark.dynamicAllocation.maxExecutors', 0)) *
-            int(spark_opts.get('spark.executor.cores', spark_constants.get('default_executor_cores', 2)))
+            int(spark_opts.get('spark.executor.cores', default_spark_conf.get('spark.executor.cores', 2)))
         )
         num_partitions = max(num_partitions, num_partitions_dra)
 
-    num_partitions = num_partitions or spark_constants.get('default_sql_shuffle_partitions', 128)
+    num_partitions = num_partitions or default_spark_conf.get('spark.sql.shuffle.partitions', 128)
     _append_spark_config(spark_opts, 'spark.sql.shuffle.partitions', str(num_partitions))
     _append_spark_config(spark_opts, 'spark.sql.files.minPartitionNum', str(num_partitions))
     _append_spark_config(spark_opts, 'spark.default.parallelism', str(num_partitions))
 
     return spark_opts
 
 
+def _is_jupyterhub_job(spark_app_name: str) -> bool:
+    # TODO: add regex to better match Jupyterhub Spark session app name
+    return 'jupyterhub' in spark_app_name
+
+
 def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
     # don't enable DRA if it is explicitly disabled
     if (
@@ -312,43 +323,49 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
     ):
         return spark_opts
 
+    spark_app_name = spark_opts.get('spark.app.name', '')
+
     log.warning(
-        'Spark Dynamic Resource Allocation (DRA) enabled for this batch. More info: y/spark-dra. '
-        'If your job is performing worse because of DRA, consider disabling DRA. To disable, '
-        'please provide spark.dynamicAllocation.enabled=false in your spark args\n',
+        TextColors.yellow(
+            '\nSpark Dynamic Resource Allocation (DRA) enabled for this batch. More info: y/spark-dra.\n',
+        ),
     )
 
-    spark_app_name = spark_opts.get('spark.app.name', '')
     # set defaults if not provided already
     _append_spark_config(spark_opts, 'spark.dynamicAllocation.enabled', 'true')
     _append_spark_config(spark_opts, 'spark.dynamicAllocation.shuffleTracking.enabled', 'true')
     _append_spark_config(
         spark_opts, 'spark.dynamicAllocation.executorAllocationRatio',
-        str(spark_constants.get('default_dra_executor_allocation_ratio', 0.8)),
+        str(default_spark_conf.get('spark.dynamicAllocation.executorAllocationRatio', 0.8)),
     )
-    cached_executor_idle_timeout = spark_constants.get('default_dra_cached_executor_idle_timeout', '900s')
+    cached_executor_idle_timeout = default_spark_conf.get('spark.dynamicAllocation.cachedExecutorIdleTimeout', '1500s')
+    if 'spark.dynamicAllocation.cachedExecutorIdleTimeout' not in spark_opts:
+        if _is_jupyterhub_job(spark_app_name):
+            # increase cachedExecutorIdleTimeout by 15 minutes in case of Jupyterhub
+            cached_executor_idle_timeout = str(int(cached_executor_idle_timeout[:-1]) + 900) + 's'
+        log.warning(
+            f'\nSetting {TextColors.yellow("spark.dynamicAllocation.cachedExecutorIdleTimeout")} as '
+            f'{cached_executor_idle_timeout}. Executor with cached data block will be released '
+            f'if it has been idle for this duration. If you wish to change the value of cachedExecutorIdleTimeout, '
+            f'please provide the exact value of spark.dynamicAllocation.cachedExecutorIdleTimeout '
+            f'in your spark args. If your job is performing bad because the cached data was lost, '
+            f'please consider increasing this value.\n',
+        )
     _append_spark_config(
         spark_opts, 'spark.dynamicAllocation.cachedExecutorIdleTimeout',
         cached_executor_idle_timeout,
     )
-    log.warning(
-        f'\nSetting spark.dynamicAllocation.cachedExecutorIdleTimeout as {cached_executor_idle_timeout}. '
-        f'Executor with cached data block will be released if it has been idle for this duration. '
-        f'If you wish to change the value of cachedExecutorIdleTimeout, please provide the exact value of '
-        f'spark.dynamicAllocation.cachedExecutorIdleTimeout in your spark args. If your job is performing bad because '
-        f'the cached data was lost, please consider increasing this value.\n',
-    )
 
     min_ratio_executors = None
-    default_dra_min_executor_ratio = spark_constants.get('default_dra_min_executor_ratio', 0.25)
+    default_dra_min_executor_ratio = default_spark_conf.get('spark.yelp.dra.minExecutorRatio', 0.25)
     if 'spark.dynamicAllocation.minExecutors' not in spark_opts:
         # the ratio of total executors to be used as minExecutors
         min_executor_ratio = spark_opts.get('spark.yelp.dra.minExecutorRatio', default_dra_min_executor_ratio)
         # set minExecutors default as a ratio of spark.executor.instances
         num_instances = int(
             spark_opts.get(
                 'spark.executor.instances',
-                spark_constants.get('default_executor_instances', 2),
+                default_spark_conf.get('spark.executor.instances', 2),
             ),
         )
         min_executors = int(num_instances * float(min_executor_ratio))
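For notebook sessions the timeout bump is plain string arithmetic on the trailing-'s' value. A quick sketch, assuming the '1500s' default from the srv-config:

# Illustrative only: how the Jupyterhub branch rewrites the timeout string.
cached_executor_idle_timeout = '1500s'                              # srv-config default
bumped = str(int(cached_executor_idle_timeout[:-1]) + 900) + 's'    # add 15 minutes
print(bumped)  # '2400s' -> notebooks keep idle executors with cached data a bit longer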
@@ -364,11 +381,10 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
 
         min_ratio_executors = min_executors
 
-        warn_msg = '\nSetting spark.dynamicAllocation.minExecutors as'
+        warn_msg = f'\nSetting {TextColors.yellow("spark.dynamicAllocation.minExecutors")} as'
 
         # set minExecutors equal to 0 for Jupyter Spark sessions
-        # TODO: add regex to better match Jupyterhub Spark session app name
-        if 'jupyterhub' in spark_app_name:
+        if _is_jupyterhub_job(spark_app_name):
             min_executors = 0
             warn_msg = (
                 f'Looks like you are launching Spark session from a Jupyter notebook. '
@@ -383,8 +399,7 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
                 f'the exact value of spark.dynamicAllocation.minExecutors in your spark args\n',
             )
 
-    # TODO: add regex to better match Jupyterhub Spark session app name
-    if 'jupyterhub' not in spark_app_name and 'spark.yelp.dra.minExecutorRatio' not in spark_opts:
+    if not _is_jupyterhub_job(spark_app_name) and 'spark.yelp.dra.minExecutorRatio' not in spark_opts:
         log.debug(
             f'\nspark.yelp.dra.minExecutorRatio not provided. This specifies the ratio of total executors '
             f'to be used as minimum executors for Dynamic Resource Allocation. More info: y/spark-dra. Using '
@@ -397,7 +412,7 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
         max_executors = int(
             spark_opts.get(
                 'spark.executor.instances',
-                spark_constants.get('default_executor_instances', 2),
+                default_spark_conf.get('spark.executor.instances', 2),
             ),
         )
         # maxExecutors should not be less than initialExecutors
@@ -406,8 +421,8 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
 
         spark_opts['spark.dynamicAllocation.maxExecutors'] = str(max_executors)
         log.warning(
-            f'\nSetting spark.dynamicAllocation.maxExecutors as {max_executors}. If you wish to '
-            f'change the value of maximum executors, please provide the exact value of '
+            f'\nSetting {TextColors.yellow("spark.dynamicAllocation.maxExecutors")} as {max_executors}. '
+            f'If you wish to change the value of maximum executors, please provide the exact value of '
             f'spark.dynamicAllocation.maxExecutors in your spark args\n',
         )
 
@@ -423,8 +438,8 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
 
         spark_opts['spark.dynamicAllocation.initialExecutors'] = str(initial_executors)
         log.warning(
-            f'\nSetting spark.dynamicAllocation.initialExecutors as {initial_executors}. If you wish to '
-            f'change the value of initial executors, please provide the exact value of '
+            f'\nSetting {TextColors.yellow("spark.dynamicAllocation.initialExecutors")} as {initial_executors}. '
+            f'If you wish to change the value of initial executors, please provide the exact value of '
             f'spark.dynamicAllocation.initialExecutors in your spark args\n',
         )
 
@@ -501,7 +516,7 @@ def compute_executor_instances_k8s(user_spark_opts: Dict[str, str]) -> int:
     executor_cores = int(
         user_spark_opts.get(
             'spark.executor.cores',
-            spark_constants.get('default_executor_cores', 2),
+            default_spark_conf.get('spark.executor.cores', 2),
        ),
     )
 
@@ -514,8 +529,8 @@ def compute_executor_instances_k8s(user_spark_opts: Dict[str, str]) -> int:
         # spark.executor.instances and spark.cores.max not provided, the executor instances should at least
         # be equal to `default_executor_instances`.
         executor_instances = max(
-            spark_constants.get('default_max_cores', 4) // executor_cores,
-            spark_constants.get('default_executor_instances', 2),
+            default_spark_conf.get('spark.cores.max', 4) // executor_cores,
+            default_spark_conf.get('spark.executor.instances', 2),
         )
 
     # Deprecation message
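A worked sketch of that fallback, assuming the illustrative defaults above (spark.cores.max 4, spark.executor.cores 2, spark.executor.instances 2):

# Illustrative: neither spark.executor.instances nor spark.cores.max was provided.
executor_cores = 2
executor_instances = max(
    4 // executor_cores,   # default spark.cores.max spread across executor cores -> 2
    2,                     # default spark.executor.instances
)
print(executor_instances)  # 2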
@@ -567,23 +582,23 @@ def _recalculate_executor_resources(
     executor_cores = int(
         user_spark_opts.get(
             'spark.executor.cores',
-            spark_constants.get('default_executor_cores', 2),
+            default_spark_conf.get('spark.executor.cores', 2),
         ),
     )
     executor_memory = user_spark_opts.get(
         'spark.executor.memory',
-        f'{spark_constants.get("default_executor_memory", 28)}g',
+        f'{default_spark_conf.get("spark.executor.memory", 28)}g',
     )
     executor_instances = int(
         user_spark_opts.get(
             'spark.executor.instances',
-            spark_constants.get('default_executor_instances', 2),
+            default_spark_conf.get('spark.executor.instances', 2),
         ),
     )
     task_cpus = int(
         user_spark_opts.get(
             'spark.task.cpus',
-            spark_constants.get('default_task_cpus', 1),
+            default_spark_conf.get('spark.task.cpus', 1),
         ),
     )
 
@@ -701,14 +716,14 @@ def _adjust_spark_requested_resources(
     executor_cores = int(
         user_spark_opts.setdefault(
             'spark.executor.cores',
-            str(spark_constants.get('default_executor_cores', 2)),
+            str(default_spark_conf.get('spark.executor.cores', 2)),
         ),
     )
     if cluster_manager == 'mesos':
         max_cores = int(
             user_spark_opts.setdefault(
                 'spark.cores.max',
-                str(spark_constants.get('default_max_cores', 4)),
+                str(default_spark_conf.get('spark.cores.max', 4)),
             ),
         )
         executor_instances = max_cores / executor_cores
@@ -723,7 +738,7 @@ def _adjust_spark_requested_resources(
             user_spark_opts['spark.executor.memoryOverhead'] = user_spark_opts['spark.mesos.executor.memoryOverhead']
         user_spark_opts.setdefault(
             'spark.kubernetes.allocation.batch.size',
-            str(spark_constants.get('default_k8s_batch_size', 512)),
+            str(default_spark_conf.get('spark.kubernetes.allocation.batch.size', 512)),
         )
         user_spark_opts.setdefault('spark.kubernetes.executor.limit.cores', str(executor_cores))
         waiting_time = (
@@ -738,7 +753,7 @@ def _adjust_spark_requested_resources(
         executor_instances = int(
             user_spark_opts.setdefault(
                 'spark.executor.instances',
-                str(spark_constants.get('default_executor_instances', 2)),
+                str(default_spark_conf.get('spark.executor.instances', 2)),
             ),
         )
         max_cores = executor_instances * executor_cores
@@ -1021,6 +1036,57 @@ def _convert_user_spark_opts_value_to_str(user_spark_opts: Mapping[str, Any]) ->
     return output
 
 
+def compute_approx_hourly_cost_dollars(spark_conf, paasta_cluster, paasta_pool):
+    per_executor_cores = int(spark_conf.get(
+        'spark.executor.cores',
+        default_spark_conf.get('spark.executor.cores', 2),
+    ))
+    max_cores = per_executor_cores * (int(spark_conf.get(
+        'spark.executor.instances',
+        default_spark_conf.get('spark.executor.instances', 2),
+    )))
+    min_cores = max_cores
+    if 'spark.dynamicAllocation.enabled' in spark_conf and spark_conf['spark.dynamicAllocation.enabled'] == 'true':
+        max_cores = per_executor_cores * (int(
+            spark_conf.get('spark.dynamicAllocation.maxExecutors', max_cores),
+        ))
+        min_cores = per_executor_cores * (int(
+            spark_conf.get('spark.dynamicAllocation.minExecutors', min_cores),
+        ))
+
+    if paasta_pool == 'batch':
+        default_cost_factor = 0.041
+    else:
+        default_cost_factor = 0.142
+    cost_factor = spark_costs.get(paasta_cluster, dict()).get(paasta_pool, default_cost_factor)
+
+    min_dollars = round(min_cores * cost_factor, 5)
+    max_dollars = round(max_cores * cost_factor, 5)
+    if max_dollars * 24 > spark_constants.get('high_cost_threshold_daily', 500):
+        log.warning(
+            TextColors.red(
+                TextColors.bold(
+                    '\n!!!!! HIGH COST ALERT !!!!!',
+                ),
+            ),
+        )
+    if min_dollars != max_dollars:
+        log.warning(
+            TextColors.magenta(
+                f'\nThe requested resources are expected to cost a maximum of $ {TextColors.bold(str(max_dollars))} '
+                f'every hour and $ {TextColors.bold(str(max_dollars * 24))} in a day.\n',
+            ),
+        )
+    else:
+        log.warning(
+            TextColors.magenta(
+                f'\nThe requested resources are expected to cost $ {max_dollars} every hour and $ {max_dollars * 24} '
+                f'in a day.\n',
+            ),
+        )
+    return min_dollars, max_dollars
+
+
 def get_spark_conf(
     cluster_manager: str,
     spark_app_base_name: str,
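A sketch of how the new helper behaves for a hypothetical DRA-enabled job. The cluster and pool names are made up and the dollar figures use the in-code fallback factor; real factors come from the cost_factor srv-config map:

# Hypothetical invocation; 'example-cluster'/'batch' and the resulting figures are illustrative.
spark_conf = {
    'spark.executor.cores': '4',
    'spark.executor.instances': '10',
    'spark.dynamicAllocation.enabled': 'true',
    'spark.dynamicAllocation.minExecutors': '2',
    'spark.dynamicAllocation.maxExecutors': '20',
}
min_dollars, max_dollars = compute_approx_hourly_cost_dollars(spark_conf, 'example-cluster', 'batch')
# With the fallback batch cost factor of 0.041 $/core-hour:
#   min = 2 executors * 4 cores * 0.041 = 0.328 $/hour
#   max = 20 executors * 4 cores * 0.041 = 3.28 $/hour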
@@ -1154,6 +1220,9 @@ def get_spark_conf(
     if cluster_manager != 'mesos':
         spark_conf = get_dra_configs(spark_conf)
 
+    # generate cost warnings
+    compute_approx_hourly_cost_dollars(spark_conf, paasta_cluster, paasta_pool)
+
     # configure spark_event_log
     spark_conf = _append_event_log_conf(spark_conf, *aws_creds)
 