@@ -268,13 +268,22 @@ def _append_sql_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str, str]:
 
 
 def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
+    # don't enable DRA if it is explicitly disabled
     if (
-        'spark.dynamicAllocation.enabled' not in spark_opts or
+        'spark.dynamicAllocation.enabled' in spark_opts and
         str(spark_opts['spark.dynamicAllocation.enabled']) != 'true'
     ):
         return spark_opts
 
+    log.warning(
+        'Spark Dynamic Resource Allocation (DRA) enabled for this batch. More info: y/spark-dra. '
+        'If your job is performing worse because of DRA, consider disabling DRA. To disable, '
+        'please provide spark.dynamicAllocation.enabled=false in your spark args\n',
+    )
+
+    spark_app_name = spark_opts.get('spark.app.name', '')
     # set defaults if not provided already
+    _append_spark_config(spark_opts, 'spark.dynamicAllocation.enabled', 'true')
     _append_spark_config(spark_opts, 'spark.dynamicAllocation.shuffleTracking.enabled', 'true')
     _append_spark_config(
         spark_opts, 'spark.dynamicAllocation.executorAllocationRatio',
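
The guard here is inverted: previously the function configured DRA only when the caller had already set spark.dynamicAllocation.enabled=true; with this change the defaults are applied unless the user explicitly opts out. A minimal standalone sketch of the new condition, using only the dict shape visible in the diff (the helper name is illustrative, not part of the patch):

    from typing import Dict

    def _dra_explicitly_disabled(spark_opts: Dict[str, str]) -> bool:
        # True only when the caller set the flag to something other than 'true'
        return (
            'spark.dynamicAllocation.enabled' in spark_opts and
            str(spark_opts['spark.dynamicAllocation.enabled']) != 'true'
        )

    assert _dra_explicitly_disabled({'spark.dynamicAllocation.enabled': 'false'})
    assert not _dra_explicitly_disabled({})  # an absent flag now falls through to the DRA defaults
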
@@ -288,10 +297,11 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
         f'\nSetting spark.dynamicAllocation.cachedExecutorIdleTimeout as {DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT}. '
         f'Executor with cached data block will be released if it has been idle for this duration. '
         f'If you wish to change the value of cachedExecutorIdleTimeout, please provide the exact value of '
-        f'spark.dynamicAllocation.cachedExecutorIdleTimeout in --spark-args. If your job is performing bad because '
+        f'spark.dynamicAllocation.cachedExecutorIdleTimeout in your spark args. If your job is performing bad because '
         f'the cached data was lost, please consider increasing this value.\n',
     )
 
+    min_ratio_executors = None
     if 'spark.dynamicAllocation.minExecutors' not in spark_opts:
         # the ratio of total executors to be used as minExecutors
         min_executor_ratio = spark_opts.get('spark.yelp.dra.minExecutorRatio', DEFAULT_DRA_MIN_EXECUTOR_RATIO)
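
The cachedExecutorIdleTimeout default follows the same set-if-absent pattern as the other _append_spark_config calls. A sketch of that pattern, assuming _append_spark_config only fills keys the user did not provide (its body is not shown in this diff, and '900s' is an illustrative value standing in for DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT):

    from typing import Dict

    def append_spark_config(spark_opts: Dict[str, str], key: str, value: str) -> Dict[str, str]:
        # assumed behavior: respect a user-supplied value, otherwise set the default
        spark_opts.setdefault(key, value)
        return spark_opts

    opts = append_spark_config({}, 'spark.dynamicAllocation.cachedExecutorIdleTimeout', '900s')
    assert opts['spark.dynamicAllocation.cachedExecutorIdleTimeout'] == '900s'
    opts = append_spark_config({'spark.dynamicAllocation.cachedExecutorIdleTimeout': '120s'}, 'spark.dynamicAllocation.cachedExecutorIdleTimeout', '900s')
    assert opts['spark.dynamicAllocation.cachedExecutorIdleTimeout'] == '120s'  # user value wins
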
@@ -308,19 +318,34 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
                 float(min_executor_ratio)),
         )
 
+        min_ratio_executors = min_executors
+
+        warn_msg = '\nSetting spark.dynamicAllocation.minExecutors as'
+
+        # set minExecutors equal to 0 for Jupyter Spark sessions
+        # TODO: add regex to better match Jupyterhub Spark session app name
+        if 'jupyterhub' in spark_app_name:
+            min_executors = 0
+            warn_msg = (
+                f'Looks like you are launching Spark session from a Jupyter notebook. '
+                f'{warn_msg} {min_executors} to save spark costs when any spark action is not running'
+            )
+        else:
+            warn_msg = f'{warn_msg} {min_executors}'
+
         spark_opts['spark.dynamicAllocation.minExecutors'] = str(min_executors)
         log.warning(
-            f'\nSetting spark.dynamicAllocation.minExecutors as {min_executors}. If you wish to '
-            f'change the value of minimum executors, please provide the exact value of '
-            f'spark.dynamicAllocation.minExecutors in --spark-args\n',
+            f'\n{warn_msg}. If you wish to change the value of minimum executors, please provide '
+            f'the exact value of spark.dynamicAllocation.minExecutors in your spark args\n',
         )
 
-    if 'spark.yelp.dra.minExecutorRatio' not in spark_opts:
+    # TODO: add regex to better match Jupyterhub Spark session app name
+    if 'jupyterhub' not in spark_app_name and 'spark.yelp.dra.minExecutorRatio' not in spark_opts:
         log.debug(
             f'\nspark.yelp.dra.minExecutorRatio not provided. This specifies the ratio of total executors '
             f'to be used as minimum executors for Dynamic Resource Allocation. More info: y/spark-dra. Using '
             f'default ratio: {DEFAULT_DRA_MIN_EXECUTOR_RATIO}. If you wish to change this value, please provide '
-            f'the desired spark.yelp.dra.minExecutorRatio in -- spark- args\n',
+            f'the desired spark.yelp.dra.minExecutorRatio in your spark args\n',
         )
 
     if 'spark.dynamicAllocation.maxExecutors' not in spark_opts:
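
The substring test on spark.app.name is the notebook heuristic that both TODOs want to harden into a regex. Its effect on the executor floor, as a standalone sketch with illustrative app names:

    def pick_min_executors(spark_app_name: str, ratio_based_min: int) -> int:
        # mirrors the branch above: notebook sessions idle at zero executors,
        # batch jobs keep the ratio-derived floor
        return 0 if 'jupyterhub' in spark_app_name else ratio_based_min

    assert pick_min_executors('jupyterhub_user42_notebook', 2) == 0
    assert pick_min_executors('nightly_etl_batch', 2) == 2

Zeroing minExecutors only lowers the floor; a notebook session still scales up normally while an action is running.
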
@@ -334,7 +359,24 @@ def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
         log.warning(
             f'\nSetting spark.dynamicAllocation.maxExecutors as {max_executors}. If you wish to '
             f'change the value of maximum executors, please provide the exact value of '
-            f'spark.dynamicAllocation.maxExecutors in --spark-args\n',
+            f'spark.dynamicAllocation.maxExecutors in your spark args\n',
+        )
+
+    # TODO: add regex to better match Jupyterhub Spark session app name
+    if 'jupyterhub' in spark_app_name and 'spark.dynamicAllocation.initialExecutors' not in spark_opts:
+        if min_ratio_executors is not None:
+            # set initialExecutors default equal to minimum executors calculated above using
+            # 'spark.yelp.dra.minExecutorRatio' and DEFAULT_DRA_MIN_EXECUTOR_RATIO for Jupyter Spark sessions
+            initial_executors = min_ratio_executors
+        else:
+            # otherwise set initial executors equal to minimum executors
+            initial_executors = int(spark_opts['spark.dynamicAllocation.minExecutors'])
+
+        spark_opts['spark.dynamicAllocation.initialExecutors'] = str(initial_executors)
+        log.warning(
+            f'\nSetting spark.dynamicAllocation.initialExecutors as {initial_executors}. If you wish to '
+            f'change the value of initial executors, please provide the exact value of '
+            f'spark.dynamicAllocation.initialExecutors in your spark args\n',
         )
 
     spark_opts['spark.executor.instances'] = spark_opts['spark.dynamicAllocation.minExecutors']
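
Seeding initialExecutors matters because minExecutors was just forced to 0 for notebooks: without it, a session would cold-start from zero executors on its first action. min_ratio_executors carries the ratio-derived count computed before the zeroing, and stays None when the user supplied minExecutors themselves, in which case the fallback simply reuses their value. A standalone sketch of the fallback:

    from typing import Dict, Optional

    def pick_initial_executors(spark_opts: Dict[str, str], min_ratio_executors: Optional[int]) -> int:
        # mirrors the fallback above: prefer the ratio-derived count,
        # otherwise reuse whatever minExecutors ended up as
        if min_ratio_executors is not None:
            return min_ratio_executors
        return int(spark_opts['spark.dynamicAllocation.minExecutors'])

    # notebook whose ratio-derived floor was 2 before being zeroed: start with 2, idle down to 0
    assert pick_initial_executors({'spark.dynamicAllocation.minExecutors': '0'}, 2) == 2
    # user pinned minExecutors: initialExecutors follows it
    assert pick_initial_executors({'spark.dynamicAllocation.minExecutors': '5'}, None) == 5
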
@@ -896,7 +938,8 @@ def get_spark_conf(
         raise UnsupportedClusterManagerException(cluster_manager)
 
     # configure dynamic resource allocation configs
-    spark_conf = get_dra_configs(spark_conf)
+    if cluster_manager != 'mesos':
+        spark_conf = get_dra_configs(spark_conf)
 
     # configure spark_event_log
     spark_conf = _append_event_log_conf(spark_conf, *aws_creds)
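
Finally, the call site in get_spark_conf gates the new on-by-default DRA behavior on the cluster manager, so Mesos jobs are left untouched. A tiny sketch of the gate ('kubernetes' is just an example value):

    def should_apply_dra_defaults(cluster_manager: str) -> bool:
        # mirrors the new gate in get_spark_conf
        return cluster_manager != 'mesos'

    assert should_apply_dra_defaults('kubernetes')
    assert not should_apply_dra_defaults('mesos')
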