1+ import base64
12import functools
23import hashlib
34import itertools
3435DEFAULT_EXECUTOR_CORES = 2
3536DEFAULT_EXECUTOR_INSTANCES = 2
3637DEFAULT_EXECUTOR_MEMORY = '4g'
38+ DEFAULT_K8S_LABEL_LENGTH = 63
3739
3840
3941NON_CONFIGURABLE_SPARK_OPTS = {
@@ -446,6 +448,12 @@ def _get_k8s_spark_env(
446448 volumes : Optional [List [Mapping [str , str ]]],
447449 paasta_pool : str ,
448450) -> Dict [str , str ]:
451+ # RFC 1123: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
452+ # technically only paasta instance can be longer than 63 chars. But we apply the normalization regardless.
453+ # NOTE: this affects only k8s labels, not the pod names.
454+ _paasta_cluster = _get_k8s_resource_name_limit_size_with_hash (paasta_cluster )
455+ _paasta_service = _get_k8s_resource_name_limit_size_with_hash (paasta_service )
456+ _paasta_instance = _get_k8s_resource_name_limit_size_with_hash (paasta_instance )
449457 spark_env = {
450458 'spark.master' : f'k8s://https://k8s.{ paasta_cluster } .paasta:6443' ,
451459 'spark.executorEnv.PAASTA_SERVICE' : paasta_service ,
@@ -460,19 +468,37 @@ def _get_k8s_spark_env(
460468 'spark.kubernetes.authenticate.clientKeyFile' : f'{ K8S_AUTH_FOLDER } /{ paasta_cluster } -client.key' ,
461469 'spark.kubernetes.authenticate.clientCertFile' : f'{ K8S_AUTH_FOLDER } /{ paasta_cluster } -client.crt' ,
462470 'spark.kubernetes.container.image.pullPolicy' : 'Always' ,
463- 'spark.kubernetes.executor.label.yelp.com/paasta_service' : paasta_service ,
464- 'spark.kubernetes.executor.label.yelp.com/paasta_instance' : paasta_instance ,
465- 'spark.kubernetes.executor.label.yelp.com/paasta_cluster' : paasta_cluster ,
466- 'spark.kubernetes.executor.label.paasta.yelp.com/service' : paasta_service ,
467- 'spark.kubernetes.executor.label.paasta.yelp.com/instance' : paasta_instance ,
468- 'spark.kubernetes.executor.label.paasta.yelp.com/cluster' : paasta_cluster ,
471+ 'spark.kubernetes.executor.label.yelp.com/paasta_service' : _paasta_service ,
472+ 'spark.kubernetes.executor.label.yelp.com/paasta_instance' : _paasta_instance ,
473+ 'spark.kubernetes.executor.label.yelp.com/paasta_cluster' : _paasta_cluster ,
474+ 'spark.kubernetes.executor.label.paasta.yelp.com/service' : _paasta_service ,
475+ 'spark.kubernetes.executor.label.paasta.yelp.com/instance' : _paasta_instance ,
476+ 'spark.kubernetes.executor.label.paasta.yelp.com/cluster' : _paasta_cluster ,
469477 'spark.kubernetes.node.selector.yelp.com/pool' : paasta_pool ,
470478 'spark.kubernetes.executor.label.yelp.com/pool' : paasta_pool ,
471479 ** _get_k8s_docker_volumes_conf (volumes ),
472480 }
473481 return spark_env
474482
475483
484+ def _get_k8s_resource_name_limit_size_with_hash (name : str , limit : int = 63 , suffix : int = 4 ) -> str :
485+ """ Returns `name` unchanged if it's length does not exceed the `limit`.
486+ Otherwise, returns truncated `name` with it's hash of size `suffix`
487+ appended.
488+
489+ base32 encoding is chosen as it satisfies the common requirement in
490+ various k8s names to be alphanumeric.
491+
492+ NOTE: This function is the same as paasta/paasta_tools/kubernetes_tools.py
493+ """
494+ if len (name ) > limit :
495+ digest = hashlib .md5 (name .encode ()).digest ()
496+ hash = base64 .b32encode (digest ).decode ().replace ('=' , '' ).lower ()
497+ return f'{ name [:(limit - suffix - 1 )]} -{ hash [:suffix ]} '
498+ else :
499+ return name
500+
501+
def stringify_spark_env(spark_env: Mapping[str, str]) -> str:
    """Render a spark config mapping as space-separated `--conf key=value` flags."""
    flags = []
    for key, value in spark_env.items():
        flags.append(f'--conf {key}={value}')
    return ' '.join(flags)
478504
0 commit comments