@@ -413,6 +413,25 @@ def _append_aws_credentials_conf(
     return spark_opts


+def compute_executor_instances_k8s(user_spark_opts: Dict[str, str]) -> int:
+    if 'spark.executor.instances' in user_spark_opts:
+        return int(user_spark_opts['spark.executor.instances'])
+
+    executor_cores = int(user_spark_opts.get('spark.executor.cores', DEFAULT_EXECUTOR_CORES))
+    if 'spark.cores.max' in user_spark_opts:
+        # spark.cores.max provided: derive the instance count as max cores // per-executor cores
+        executor_instances = int(user_spark_opts['spark.cores.max']) // executor_cores
+        log.warning(
+            'spark.cores.max should no longer be provided; set the exact value of '
+            'spark.executor.instances in --spark-args instead',
+        )
+    else:
+        # Neither spark.executor.instances nor spark.cores.max provided: fall back to
+        # DEFAULT_MAX_CORES // executor_cores, but never fewer than DEFAULT_EXECUTOR_INSTANCES.
+        executor_instances = max(DEFAULT_MAX_CORES // executor_cores, DEFAULT_EXECUTOR_INSTANCES)
+    return executor_instances
+
+
 def _adjust_spark_requested_resources(
     user_spark_opts: Dict[str, str],
     cluster_manager: str,
@@ -424,33 +443,14 @@ def _adjust_spark_requested_resources(
         max_cores = int(user_spark_opts.setdefault('spark.cores.max', str(DEFAULT_MAX_CORES)))
         executor_instances = max_cores / executor_cores
     elif cluster_manager == 'kubernetes':
-        # TODO(gcoll|COREML-2697): Consider cleaning this part of the code up
-        # once mesos is no longer around at Yelp.
-        if 'spark.executor.instances' not in user_spark_opts:
-
-            executor_instances = int(user_spark_opts.get(
-                'spark.cores.max',
-                str(DEFAULT_MAX_CORES),
-            )) // executor_cores
-            user_spark_opts['spark.executor.instances'] = str(executor_instances)
-
-            if user_spark_opts['spark.executor.instances'] == str(DEFAULT_EXECUTOR_INSTANCES):
-                log.warning(
-                    f'spark.executor.instances not provided. Setting spark.executor.instances as '
-                    f'{executor_instances}. If you wish to change the number of executors, please '
-                    f'provide the exact value of spark.executor.instances in --spark-args',
-                )
-
+        executor_instances = compute_executor_instances_k8s(user_spark_opts)
+        user_spark_opts.setdefault('spark.executor.instances', str(executor_instances))
+        max_cores = executor_instances * executor_cores
         if (
             'spark.mesos.executor.memoryOverhead' in user_spark_opts and
             'spark.executor.memoryOverhead' not in user_spark_opts
         ):
             user_spark_opts['spark.executor.memoryOverhead'] = user_spark_opts['spark.mesos.executor.memoryOverhead']
-
-        executor_instances = int(
-            user_spark_opts.setdefault('spark.executor.instances', str(DEFAULT_EXECUTOR_INSTANCES)),
-        )
-        max_cores = executor_instances * executor_cores
         user_spark_opts.setdefault(
             'spark.kubernetes.allocation.batch.size',
             str(DEFAULT_K8S_BATCH_SIZE),
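
For context, below is a self-contained sketch of the precedence the new helper implements: an explicit spark.executor.instances always wins, a legacy spark.cores.max is converted to an instance count, and otherwise the default is used with a floor of DEFAULT_EXECUTOR_INSTANCES. The DEFAULT_* values here are stand-in numbers for illustration only (the real constants live elsewhere in the module and may differ), and the log.warning call is omitted.

# Sketch of compute_executor_instances_k8s with assumed stand-in constants.
from typing import Dict

DEFAULT_EXECUTOR_CORES = 2      # assumed value, for illustration only
DEFAULT_MAX_CORES = 4           # assumed value, for illustration only
DEFAULT_EXECUTOR_INSTANCES = 2  # assumed value, for illustration only


def compute_executor_instances_k8s(user_spark_opts: Dict[str, str]) -> int:
    # 1. An explicit spark.executor.instances always wins.
    if 'spark.executor.instances' in user_spark_opts:
        return int(user_spark_opts['spark.executor.instances'])

    executor_cores = int(user_spark_opts.get('spark.executor.cores', DEFAULT_EXECUTOR_CORES))
    # 2. A legacy spark.cores.max is converted into an instance count.
    if 'spark.cores.max' in user_spark_opts:
        return int(user_spark_opts['spark.cores.max']) // executor_cores
    # 3. Otherwise fall back to the default, floored at DEFAULT_EXECUTOR_INSTANCES.
    return max(DEFAULT_MAX_CORES // executor_cores, DEFAULT_EXECUTOR_INSTANCES)


assert compute_executor_instances_k8s({'spark.executor.instances': '7'}) == 7
assert compute_executor_instances_k8s({'spark.cores.max': '12', 'spark.executor.cores': '3'}) == 4
assert compute_executor_instances_k8s({}) == 2  # max(4 // 2, 2) under the assumed defaults

Note the related change in _adjust_spark_requested_resources: on Kubernetes, max_cores is now derived from the computed result (executor_instances * executor_cores) instead of being read from spark.cores.max, so the two settings can no longer disagree.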