If you are using Prefect Cloud and would like to pass your Prefect Cloud API key to
created jobs via a Kubernetes secret, set the
PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET environment variable before
starting your worker:
Each value enclosed in {{ }} is a placeholder that will be replaced with
a value at runtime. The values that can be used as placeholders are defined
by the variables schema defined in the base job template.
The default job manifest and available variables can be customized on a work pool
by work pool basis. These customizations can be made via the Prefect UI when
creating or editing a work pool.
For example, if you wanted to allow custom memory requests for a Kubernetes work
pool you could update the job manifest template to look like this:
In this new template, the memory placeholder allows customization of the memory
allocated to Kubernetes jobs created by workers in this work pool, but the limit
is hard-coded and cannot be changed by deployments.
For more information about work pools and workers,
check out the Prefect docs.
Enum representing the image pull policy options for a Kubernetes job.
Source code in prefect_kubernetes/worker.py
Lines 232–237
class KubernetesImagePullPolicy(enum.Enum):
    """Image pull policies that can be applied to the container of a Kubernetes job.

    Each member's value is the policy string as it is written into a
    Kubernetes manifest.
    """

    # Pull the image only when it is not already present
    IF_NOT_PRESENT = "IfNotPresent"
    # Pull the image every time
    ALWAYS = "Always"
    # Never pull the image
    NEVER = "Never"
class KubernetesWorker(BaseWorker):
    """Prefect worker that executes flow runs within Kubernetes Jobs."""

    type = "kubernetes"
    job_configuration = KubernetesWorkerJobConfiguration
    job_configuration_variables = KubernetesWorkerVariables
    _description = (
        "Execute flow runs within jobs scheduled on a Kubernetes cluster. Requires a "
        "Kubernetes cluster."
    )
    _display_name = "Kubernetes"
    _documentation_url = "https://prefecthq.github.io/prefect-kubernetes/worker/"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/2d0b896006ad463b49c28aaac14f31e00e32cfab-250x250.png"  # noqa

    def __init__(self, *args, **kwargs):
        """Initialize the worker and the registry of secrets it creates."""
        super().__init__(*args, **kwargs)
        # Maps (secret name, secret namespace) -> job configuration used when the
        # secret was created, so the secret can be deleted during teardown.
        self._created_secrets = {}

    async def run(
        self,
        flow_run: "FlowRun",
        configuration: KubernetesWorkerJobConfiguration,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> KubernetesWorkerResult:
        """
        Executes a flow run within a Kubernetes Job and waits for the flow run
        to complete.

        Args:
            flow_run: The flow run to execute
            configuration: The configuration to use when executing the flow run.
            task_status: The task status object for the current flow run. If provided,
                the task will be marked as started.

        Returns:
            KubernetesWorkerResult: A result object containing information about the
                final state of the flow run
        """
        logger = self.get_flow_run_logger(flow_run)

        with self._get_configured_kubernetes_client(configuration) as client:
            logger.info("Creating Kubernetes job...")
            job = await run_sync_in_worker_thread(
                self._create_job, configuration, client
            )
            pid = await run_sync_in_worker_thread(
                self._get_infrastructure_pid, job, client
            )
            # Indicate that the job has started
            if task_status is not None:
                task_status.started(pid)

            # Monitor the job until completion
            events_replicator = KubernetesEventsReplicator(
                client=client,
                job_name=job.metadata.name,
                namespace=configuration.namespace,
                worker_resource=self._event_resource(),
                related_resources=self._event_related_resources(
                    configuration=configuration
                ),
                timeout_seconds=configuration.pod_watch_timeout_seconds,
            )

            with events_replicator:
                status_code = await run_sync_in_worker_thread(
                    self._watch_job, logger, job.metadata.name, configuration, client
                )
            return KubernetesWorkerResult(identifier=pid, status_code=status_code)

    async def kill_infrastructure(
        self,
        infrastructure_pid: str,
        configuration: KubernetesWorkerJobConfiguration,
        grace_seconds: int = 30,
    ):
        """
        Stops a job for a cancelled flow run based on the provided infrastructure PID
        and run configuration.

        Args:
            infrastructure_pid: PID in the "<cluster uid>:<namespace>:<job name>" format.
            configuration: The job configuration of the flow run being cancelled.
            grace_seconds: Grace period, in seconds, passed to the job deletion call.
        """
        await run_sync_in_worker_thread(
            self._stop_job, infrastructure_pid, configuration, grace_seconds
        )

    async def teardown(self, *exc_info):
        """Run the base teardown, then delete the secrets this worker created."""
        await super().teardown(*exc_info)

        await self._clean_up_created_secrets()

    async def _clean_up_created_secrets(self):
        """Deletes any secrets created during the worker's operation."""
        coros = []
        for key, configuration in self._created_secrets.items():
            with self._get_configured_kubernetes_client(configuration) as client:
                with self._get_core_client(client) as core_client:
                    # key is the (name, namespace) tuple recorded when the secret
                    # was created. The delete is only scheduled here; it runs when
                    # the coroutines are gathered below.
                    coros.append(
                        run_sync_in_worker_thread(
                            core_client.delete_namespaced_secret,
                            name=key[0],
                            namespace=key[1],
                        )
                    )

        # Best effort: collect failures instead of aborting on the first one.
        results = await asyncio.gather(*coros, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                self._logger.warning(
                    "Failed to delete created secret with exception: %s", result
                )

    def _stop_job(
        self,
        infrastructure_pid: str,
        configuration: KubernetesWorkerJobConfiguration,
        grace_seconds: int = 30,
    ):
        """Removes the given Job from the Kubernetes cluster.

        Raises:
            InfrastructureNotAvailable: If the PID's namespace or cluster UID does
                not match the namespace/cluster this worker is configured for.
            InfrastructureNotFound: If the job deletion call returns a 404.
        """
        with self._get_configured_kubernetes_client(configuration) as client:
            job_cluster_uid, job_namespace, job_name = self._parse_infrastructure_pid(
                infrastructure_pid
            )

            if job_namespace != configuration.namespace:
                raise InfrastructureNotAvailable(
                    f"Unable to kill job {job_name!r}: The job is running in namespace "
                    f"{job_namespace!r} but this worker expected jobs to be running in "
                    f"namespace {configuration.namespace!r} based on the work pool and "
                    "deployment configuration."
                )

            current_cluster_uid = self._get_cluster_uid(client)
            if job_cluster_uid != current_cluster_uid:
                raise InfrastructureNotAvailable(
                    f"Unable to kill job {job_name!r}: The job is running on another "
                    "cluster than the one specified by the infrastructure PID."
                )

            with self._get_batch_client(client) as batch_client:
                try:
                    batch_client.delete_namespaced_job(
                        name=job_name,
                        namespace=job_namespace,
                        grace_period_seconds=grace_seconds,
                        # Foreground propagation deletes dependent objects before deleting # noqa
                        # owner objects. This ensures that the pods are cleaned up before # noqa
                        # the job is marked as deleted.
                        # See: https://kubernetes.io/docs/concepts/architecture/garbage-collection/#foreground-deletion # noqa
                        propagation_policy="Foreground",
                    )
                except kubernetes.client.exceptions.ApiException as exc:
                    if exc.status == 404:
                        raise InfrastructureNotFound(
                            f"Unable to kill job {job_name!r}: The job was not found."
                        ) from exc
                    else:
                        raise

    @contextmanager
    def _get_configured_kubernetes_client(
        self, configuration: KubernetesWorkerJobConfiguration
    ) -> Generator["ApiClient", None, None]:
        """
        Returns a configured Kubernetes client.

        The client is built from `configuration.cluster_config` when given, otherwise
        from the in-cluster configuration, otherwise from the local environment. The
        client's connection pool is cleared when the context exits.
        """
        # Bound up front so that the `finally` clause below cannot raise an
        # UnboundLocalError (masking the real error) when client creation fails.
        client = None
        try:
            if configuration.cluster_config:
                client = kubernetes.config.new_client_from_config_dict(
                    config_dict=configuration.cluster_config.config,
                    context=configuration.cluster_config.context_name,
                )
            else:
                # If no hardcoded config specified, try to load Kubernetes configuration
                # within a cluster. If that doesn't work, try to load the configuration
                # from the local environment, allowing any further ConfigExceptions to
                # bubble up.
                try:
                    kubernetes.config.load_incluster_config()
                    config = kubernetes.client.Configuration.get_default_copy()
                    client = kubernetes.client.ApiClient(configuration=config)
                except kubernetes.config.ConfigException:
                    client = kubernetes.config.new_client_from_config()

            # TCP keep-alive is on unless the environment variable disables it.
            if os.environ.get(
                "PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE", "TRUE"
            ).strip().lower() in ("true", "1"):
                enable_socket_keep_alive(client)

            yield client
        finally:
            if client is not None:
                client.rest_client.pool_manager.clear()

    def _replace_api_key_with_secret(
        self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
    ):
        """Replaces the PREFECT_API_KEY environment variable with a Kubernetes secret.

        If the first container's `env` holds a PREFECT_API_KEY entry with a literal
        value, that value is stored in a secret and the entry is rewritten to
        reference the secret. The job manifest on `configuration` is mutated in
        place.
        """
        container = configuration.job_manifest["spec"]["template"]["spec"][
            "containers"
        ][0]
        # `env` may be absent from the manifest; treat that as "no variables".
        manifest_env = container.get("env") or []
        manifest_api_key_env = next(
            (
                env_entry
                for env_entry in manifest_env
                if env_entry.get("name") == "PREFECT_API_KEY"
            ),
            {},
        )
        api_key = manifest_api_key_env.get("value")
        if api_key:
            secret_name = f"prefect-{_slugify_name(self.name)}-api-key"
            secret = self._upsert_secret(
                name=secret_name,
                value=api_key,
                namespace=configuration.namespace,
                client=client,
            )
            # Store configuration so that we can delete the secret when the worker shuts
            # down
            self._created_secrets[
                (secret.metadata.name, secret.metadata.namespace)
            ] = configuration
            new_api_env_entry = {
                "name": "PREFECT_API_KEY",
                "valueFrom": {"secretKeyRef": {"name": secret_name, "key": "value"}},
            }
            manifest_env = [
                entry if entry.get("name") != "PREFECT_API_KEY" else new_api_env_entry
                for entry in manifest_env
            ]
            container["env"] = manifest_env

    @retry(
        stop=stop_after_attempt(MAX_ATTEMPTS),
        wait=wait_fixed(RETRY_MIN_DELAY_SECONDS)
        + wait_random(
            RETRY_MIN_DELAY_JITTER_SECONDS,
            RETRY_MAX_DELAY_JITTER_SECONDS,
        ),
        reraise=True,
    )
    def _create_job(
        self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
    ) -> "V1Job":
        """
        Creates a Kubernetes job from a job manifest.

        Raises:
            InfrastructureError: If the Kubernetes API rejects the job creation.
        """
        if os.environ.get(
            "PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", ""
        ).strip().lower() in ("true", "1"):
            self._replace_api_key_with_secret(
                configuration=configuration, client=client
            )
        try:
            with self._get_batch_client(client) as batch_client:
                job = batch_client.create_namespaced_job(
                    configuration.namespace, configuration.job_manifest
                )
        except kubernetes.client.exceptions.ApiException as exc:
            # Parse the reason and message from the response if feasible
            message = ""
            if exc.reason:
                message += ": " + exc.reason
            if exc.body:
                # The body is not guaranteed to be a JSON object; a parsing problem
                # must not replace the InfrastructureError raised below.
                try:
                    body = json.loads(exc.body)
                except (ValueError, TypeError):
                    body = None
                if isinstance(body, dict) and "message" in body:
                    message += f": {body['message']}"

            raise InfrastructureError(
                f"Unable to create Kubernetes job{message}"
            ) from exc

        return job

    def _upsert_secret(
        self, name: str, value: str, namespace: str, client: "ApiClient"
    ):
        """Create or update a secret holding `value` under the key "value".

        The value is base64 encoded before being stored. An existing secret is
        replaced; a 404 on read causes a new secret to be created. Any other API
        error is re-raised.

        Returns:
            The secret object returned by the replace or create call.
        """
        encoded_value = base64.b64encode(value.encode("utf-8")).decode("utf-8")
        with self._get_core_client(client) as core_client:
            try:
                # Get the current version of the Secret and update it with the
                # new value
                current_secret = core_client.read_namespaced_secret(
                    name=name, namespace=namespace
                )
                current_secret.data = {"value": encoded_value}
                secret = core_client.replace_namespaced_secret(
                    name=name, namespace=namespace, body=current_secret
                )
            except ApiException as exc:
                if exc.status != 404:
                    raise
                # Create the secret if it doesn't already exist
                metadata = V1ObjectMeta(name=name, namespace=namespace)
                secret = V1Secret(
                    api_version="v1",
                    kind="Secret",
                    metadata=metadata,
                    data={"value": encoded_value},
                )
                secret = core_client.create_namespaced_secret(
                    namespace=namespace, body=secret
                )
            return secret

    @contextmanager
    def _get_batch_client(
        self, client: "ApiClient"
    ) -> Generator["BatchV1Api", None, None]:
        """
        Context manager for retrieving a Kubernetes batch client.

        The underlying client's connection pool is cleared on exit.
        """
        try:
            yield kubernetes.client.BatchV1Api(api_client=client)
        finally:
            client.rest_client.pool_manager.clear()

    def _get_infrastructure_pid(self, job: "V1Job", client: "ApiClient") -> str:
        """
        Generates a Kubernetes infrastructure PID.

        The PID is in the format: "<cluster uid>:<namespace>:<job name>".
        """
        cluster_uid = self._get_cluster_uid(client)
        pid = f"{cluster_uid}:{job.metadata.namespace}:{job.metadata.name}"
        return pid

    def _parse_infrastructure_pid(
        self, infrastructure_pid: str
    ) -> Tuple[str, str, str]:
        """
        Parse a Kubernetes infrastructure PID into its component parts.

        Returns a cluster UID, namespace, and job name.
        """
        # maxsplit=2: any further ":" characters stay in the job name component.
        cluster_uid, namespace, job_name = infrastructure_pid.split(":", 2)
        return cluster_uid, namespace, job_name

    @contextmanager
    def _get_core_client(
        self, client: "ApiClient"
    ) -> Generator["CoreV1Api", None, None]:
        """
        Context manager for retrieving a Kubernetes core client.

        The underlying client's connection pool is cleared on exit.
        """
        try:
            yield kubernetes.client.CoreV1Api(api_client=client)
        finally:
            client.rest_client.pool_manager.clear()

    def _get_cluster_uid(self, client: "ApiClient") -> str:
        """
        Gets a unique id for the current cluster being used.

        There is no real unique identifier for a cluster. However, the `kube-system`
        namespace is immutable and has a persistence UID that we use instead.

        PREFECT_KUBERNETES_CLUSTER_UID can be set in cases where the `kube-system`
        namespace cannot be read e.g. when a cluster role cannot be created. If set,
        this variable will be used and we will not attempt to read the `kube-system`
        namespace.

        See https://github.com/kubernetes/kubernetes/issues/44954
        """
        # Default to an environment variable
        env_cluster_uid = os.environ.get("PREFECT_KUBERNETES_CLUSTER_UID")
        if env_cluster_uid:
            return env_cluster_uid

        # Read the UID from the cluster namespace
        with self._get_core_client(client) as core_client:
            namespace = core_client.read_namespace("kube-system")
        cluster_uid = namespace.metadata.uid

        return cluster_uid

    def _job_events(
        self,
        watch: kubernetes.watch.Watch,
        batch_client: kubernetes.client.BatchV1Api,
        job_name: str,
        namespace: str,
        watch_kwargs: dict,
    ) -> Generator[Union[Any, dict, str], Any, None]:
        """
        Stream job events.

        Pick up from the current resource version returned by the API
        in the case of a 410.

        See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes  # noqa
        """
        # NOTE(review): this returns the result of `watch.stream(...)` directly. If
        # that call is lazy (i.e. the API request only happens once the caller
        # iterates), an ApiException raised during iteration would not be caught by
        # the handler below - confirm against the kubernetes client.
        while True:
            try:
                return watch.stream(
                    func=batch_client.list_namespaced_job,
                    namespace=namespace,
                    field_selector=f"metadata.name={job_name}",
                    **watch_kwargs,
                )
            except ApiException as e:
                if e.status == 410:
                    job_list = batch_client.list_namespaced_job(
                        namespace=namespace, field_selector=f"metadata.name={job_name}"
                    )
                    resource_version = job_list.metadata.resource_version
                    watch_kwargs["resource_version"] = resource_version
                else:
                    raise

    def _watch_job(
        self,
        logger: logging.Logger,
        job_name: str,
        configuration: KubernetesWorkerJobConfiguration,
        client: "ApiClient",
    ) -> int:
        """
        Watch a job.

        Return the final status code of the first container. -1 is returned when
        the job or its pod cannot be found, when the job does not complete before
        `configuration.job_watch_timeout_seconds` elapses, or when no exit code
        can be determined.
        """
        logger.debug(f"Job {job_name!r}: Monitoring job...")

        job = self._get_job(logger, job_name, configuration, client)
        if not job:
            return -1

        pod = self._get_job_pod(logger, job_name, configuration, client)
        if not pod:
            return -1

        # Calculate the deadline before streaming output
        deadline = (
            (time.monotonic() + configuration.job_watch_timeout_seconds)
            if configuration.job_watch_timeout_seconds is not None
            else None
        )

        if configuration.stream_output:
            with self._get_core_client(client) as core_client:
                logs = core_client.read_namespaced_pod_log(
                    pod.metadata.name,
                    configuration.namespace,
                    follow=True,
                    _preload_content=False,
                    container="prefect-job",
                )
                try:
                    for log in logs.stream():
                        print(log.decode().rstrip())

                        # Check if we have passed the deadline and should stop streaming
                        # logs
                        remaining_time = (
                            deadline - time.monotonic() if deadline else None
                        )
                        if deadline and remaining_time <= 0:
                            break

                except Exception:
                    # Deliberately broad: log streaming is best effort and must
                    # not stop the job from being monitored.
                    logger.warning(
                        (
                            "Error occurred while streaming logs - "
                            "Job will continue to run but logs will "
                            "no longer be streamed to stdout."
                        ),
                        exc_info=True,
                    )

        with self._get_batch_client(client) as batch_client:
            # Check if the job is completed before beginning a watch
            job = batch_client.read_namespaced_job(
                name=job_name, namespace=configuration.namespace
            )
            completed = job.status.completion_time is not None

            while not completed:
                remaining_time = (
                    math.ceil(deadline - time.monotonic()) if deadline else None
                )
                if deadline and remaining_time <= 0:
                    logger.error(
                        f"Job {job_name!r}: Job did not complete within "
                        f"timeout of {configuration.job_watch_timeout_seconds}s."
                    )
                    return -1

                watch = kubernetes.watch.Watch()
                # The kubernetes library will disable retries if the timeout kwarg is
                # present regardless of the value so we do not pass it unless given
                # https://github.com/kubernetes-client/python/blob/84f5fea2a3e4b161917aa597bf5e5a1d95e24f5a/kubernetes/base/watch/watch.py#LL160
                watch_kwargs = {"timeout_seconds": remaining_time} if deadline else {}

                for event in self._job_events(
                    watch,
                    batch_client,
                    job_name,
                    configuration.namespace,
                    watch_kwargs,
                ):
                    if event["type"] == "DELETED":
                        logger.error(f"Job {job_name!r}: Job has been deleted.")
                        completed = True
                    elif event["object"].status.completion_time:
                        if not event["object"].status.succeeded:
                            # Job failed, exit while loop and return pod exit code
                            logger.error(f"Job {job_name!r}: Job failed.")
                        completed = True
                    # Check if the job has reached its backoff limit
                    # and stop watching if it has
                    elif (
                        event["object"].spec.backoff_limit is not None
                        and event["object"].status.failed is not None
                        and event["object"].status.failed
                        > event["object"].spec.backoff_limit
                    ):
                        logger.error(f"Job {job_name!r}: Job reached backoff limit.")
                        completed = True
                    # If the job has no backoff limit, check if it has failed
                    # and stop watching if it has
                    elif (
                        not event["object"].spec.backoff_limit
                        and event["object"].status.failed
                    ):
                        completed = True

                    if completed:
                        watch.stop()
                        break

        with self._get_core_client(client) as core_client:
            # Get all pods for the job
            pods = core_client.list_namespaced_pod(
                namespace=configuration.namespace, label_selector=f"job-name={job_name}"
            )
            # Get the status for only the most recently used pod
            pods.items.sort(
                key=lambda pod: pod.metadata.creation_timestamp, reverse=True
            )
            most_recent_pod = pods.items[0] if pods.items else None
            # `container_statuses` can be missing or empty on a pod; guard the
            # index instead of raising a TypeError/IndexError here.
            container_statuses = (
                most_recent_pod.status.container_statuses if most_recent_pod else None
            ) or []
            first_container_status = (
                container_statuses[0] if container_statuses else None
            )
            if not first_container_status:
                logger.error(f"Job {job_name!r}: No pods found for job.")
                return -1

            # In some cases, such as spot instance evictions, the pod will be forcibly
            # terminated and not report a status correctly.
            elif (
                first_container_status.state is None
                or first_container_status.state.terminated is None
                or first_container_status.state.terminated.exit_code is None
            ):
                logger.error(
                    f"Could not determine exit code for {job_name!r}. "
                    "Exit code will be reported as -1. "
                    "First container status info did not report an exit code. "
                    f"First container info: {first_container_status}."
                )
                return -1

        return first_container_status.state.terminated.exit_code

    def _get_job(
        self,
        logger: logging.Logger,
        job_id: str,
        configuration: KubernetesWorkerJobConfiguration,
        client: "ApiClient",
    ) -> Optional["V1Job"]:
        """Get a Kubernetes job by id.

        Returns None (after logging an error) if the API call fails.
        """
        with self._get_batch_client(client) as batch_client:
            try:
                job = batch_client.read_namespaced_job(
                    name=job_id, namespace=configuration.namespace
                )
            except kubernetes.client.exceptions.ApiException:
                logger.error(f"Job {job_id!r} was removed.", exc_info=True)
                return None
            return job

    def _get_job_pod(
        self,
        logger: logging.Logger,
        job_name: str,
        configuration: KubernetesWorkerJobConfiguration,
        client: "ApiClient",
    ) -> Optional["V1Pod"]:
        """Get the first running pod for a job.

        Watches pods labelled `job-name=<job_name>` for up to
        `configuration.pod_watch_timeout_seconds` and returns the first pod seen
        in a phase other than "Pending". Returns None if no such pod is seen.
        """
        from kubernetes.client.models import V1Pod

        watch = kubernetes.watch.Watch()
        logger.debug(f"Job {job_name!r}: Starting watch for pod start...")
        last_phase = None
        last_pod_name: Optional[str] = None
        with self._get_core_client(client) as core_client:
            for event in watch.stream(
                func=core_client.list_namespaced_pod,
                namespace=configuration.namespace,
                label_selector=f"job-name={job_name}",
                timeout_seconds=configuration.pod_watch_timeout_seconds,
            ):
                pod: V1Pod = event["object"]
                last_pod_name = pod.metadata.name

                phase = pod.status.phase
                # Only log when the phase changes to avoid repeating the same line.
                if phase != last_phase:
                    logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")

                if phase != "Pending":
                    watch.stop()
                    return pod

                last_phase = phase

        # If we've gotten here, we never found the Pod that was created for the flow run
        # Job, so let's inspect the situation and log what we can find. It's possible
        # that the Job ran into scheduling constraints it couldn't satisfy, like
        # memory/CPU requests, or a volume that wasn't available, or a node with an
        # available GPU.
        logger.error(f"Job {job_name!r}: Pod never started.")
        self._log_recent_events(logger, job_name, last_pod_name, configuration, client)
        return None

    def _log_recent_events(
        self,
        logger: logging.Logger,
        job_name: str,
        pod_name: Optional[str],
        configuration: KubernetesWorkerJobConfiguration,
        client: "ApiClient",
    ) -> None:
        """Look for reasons why a Job may not have been able to schedule a Pod, or why
        a Pod may not have been able to start and log them to the provided logger."""
        from kubernetes.client.models import CoreV1Event, CoreV1EventList

        def best_event_time(event: CoreV1Event) -> datetime:
            """Choose the best timestamp from a Kubernetes event"""
            return event.event_time or event.last_timestamp

        def log_event(event: CoreV1Event):
            """Log an event in one of a few formats to the provided logger"""
            if event.count and event.count > 1:
                logger.info(
                    "%s event %r (%s times) at %s: %s",
                    event.involved_object.kind,
                    event.reason,
                    event.count,
                    best_event_time(event),
                    event.message,
                )
            else:
                logger.info(
                    "%s event %r at %s: %s",
                    event.involved_object.kind,
                    event.reason,
                    best_event_time(event),
                    event.message,
                )

        with self._get_core_client(client) as core_client:
            events: CoreV1EventList = core_client.list_namespaced_event(
                configuration.namespace
            )
            event: CoreV1Event
            for event in sorted(events.items, key=best_event_time):
                # Events that concern the Job itself
                if (
                    event.involved_object.api_version == "batch/v1"
                    and event.involved_object.kind == "Job"
                    and event.involved_object.namespace == configuration.namespace
                    and event.involved_object.name == job_name
                ):
                    log_event(event)

                # Events that concern the last Pod seen for the Job, if any
                if (
                    pod_name
                    and event.involved_object.api_version == "v1"
                    and event.involved_object.kind == "Pod"
                    and event.involved_object.namespace == configuration.namespace
                    and event.involved_object.name == pod_name
                ):
                    log_event(event)
Stops a job for a cancelled flow run based on the provided infrastructure PID
and run configuration.
Source code in prefect_kubernetes/worker.py
Lines 607–619
async def kill_infrastructure(
    self,
    infrastructure_pid: str,
    configuration: KubernetesWorkerJobConfiguration,
    grace_seconds: int = 30,
):
    """
    Stop the job of a cancelled flow run.

    The job to stop is identified by the infrastructure PID together with the
    run configuration. The work is handed to `self._stop_job` through
    `run_sync_in_worker_thread`.

    Args:
        infrastructure_pid: The infrastructure PID of the job to stop.
        configuration: The job configuration of the flow run.
        grace_seconds: Grace period, in seconds, forwarded to `_stop_job`.
    """
    stop_job_args = (infrastructure_pid, configuration, grace_seconds)
    await run_sync_in_worker_thread(self._stop_job, *stop_job_args)
async def run(
    self,
    flow_run: "FlowRun",
    configuration: KubernetesWorkerJobConfiguration,
    task_status: Optional[anyio.abc.TaskStatus] = None,
) -> KubernetesWorkerResult:
    """
    Run a flow run inside a Kubernetes Job and wait until the job is done.

    Args:
        flow_run: The flow run to execute.
        configuration: The configuration to use when executing the flow run.
        task_status: Optional task status object for the current flow run. When
            given, it is marked as started with the infrastructure PID.

    Returns:
        KubernetesWorkerResult: Carries the infrastructure PID as identifier and
            the status code reported by the job watch.
    """
    run_logger = self.get_flow_run_logger(flow_run)

    with self._get_configured_kubernetes_client(configuration) as k8s_client:
        run_logger.info("Creating Kubernetes job...")
        created_job = await run_sync_in_worker_thread(
            self._create_job, configuration, k8s_client
        )
        infrastructure_pid = await run_sync_in_worker_thread(
            self._get_infrastructure_pid, created_job, k8s_client
        )

        # Report the start of the job to the caller, if it asked for it
        if task_status is not None:
            task_status.started(infrastructure_pid)

        # Replicate Kubernetes events for as long as the job is being watched
        job_name = created_job.metadata.name
        replicator = KubernetesEventsReplicator(
            client=k8s_client,
            job_name=job_name,
            namespace=configuration.namespace,
            worker_resource=self._event_resource(),
            related_resources=self._event_related_resources(
                configuration=configuration
            ),
            timeout_seconds=configuration.pod_watch_timeout_seconds,
        )
        with replicator:
            exit_code = await run_sync_in_worker_thread(
                self._watch_job, run_logger, job_name, configuration, k8s_client
            )

        return KubernetesWorkerResult(
            identifier=infrastructure_pid, status_code=exit_code
        )
Configuration class used by the Kubernetes worker.
An instance of this class is passed to the Kubernetes worker's run method
for each flow run. It contains all of the information necessary to execute
the flow run as a Kubernetes job.
Attributes:
Name
Type
Description
name
The name to give to created Kubernetes job.
command
The command executed in created Kubernetes jobs to kick off
flow run execution.
env
The environment variables to set in created Kubernetes jobs.
labels
The labels to set on created Kubernetes jobs.
namespace
str
The Kubernetes namespace to create Kubernetes jobs in.
job_manifest
Dict[str, Any]
The Kubernetes job manifest to use to create Kubernetes jobs.
class KubernetesWorkerJobConfiguration(BaseJobConfiguration):
    """
    Configuration class used by the Kubernetes worker.

    An instance of this class is passed to the Kubernetes worker's `run` method
    for each flow run. It contains all of the information necessary to execute
    the flow run as a Kubernetes job.

    Attributes:
        name: The name to give to created Kubernetes job.
        command: The command executed in created Kubernetes jobs to kick off
            flow run execution.
        env: The environment variables to set in created Kubernetes jobs.
        labels: The labels to set on created Kubernetes jobs.
        namespace: The Kubernetes namespace to create Kubernetes jobs in.
        job_manifest: The Kubernetes job manifest to use to create Kubernetes jobs.
        cluster_config: The Kubernetes cluster configuration to use for authentication
            to a Kubernetes cluster.
        job_watch_timeout_seconds: The number of seconds to wait for the job to
            complete before timing out. If `None`, the worker will wait indefinitely.
        pod_watch_timeout_seconds: The number of seconds to watch for the job's pod
            before timing out.
        stream_output: Whether or not to stream the job's output.
    """

    namespace: str = Field(default="default")
    job_manifest: Dict[str, Any] = Field(template=_get_default_job_manifest_template())
    cluster_config: Optional[KubernetesClusterConfig] = Field(default=None)
    job_watch_timeout_seconds: Optional[int] = Field(default=None)
    pod_watch_timeout_seconds: int = Field(default=60)
    stream_output: bool = Field(default=True)

    # internal-use only
    _api_dns_name: Optional[str] = None  # Replaces 'localhost' in API URL

    @validator("job_manifest")
    def _ensure_metadata_is_present(cls, value: Dict[str, Any]):
        """Ensures that the metadata is present in the job manifest."""
        if "metadata" not in value:
            value["metadata"] = {}
        return value

    @validator("job_manifest")
    def _ensure_labels_is_present(cls, value: Dict[str, Any]):
        """Ensures that the metadata labels are present in the job manifest."""
        if "labels" not in value["metadata"]:
            value["metadata"]["labels"] = {}
        return value

    @validator("job_manifest")
    def _ensure_namespace_is_present(cls, value: Dict[str, Any], values):
        """Ensures that the namespace is present in the job manifest."""
        # `values` only holds fields that validated successfully. When `namespace`
        # itself failed validation it is absent, and indexing it would raise a
        # KeyError instead of letting the validation error for `namespace` surface.
        if "namespace" not in value["metadata"] and "namespace" in values:
            value["metadata"]["namespace"] = values["namespace"]
        return value

    @validator("job_manifest")
    def _ensure_job_includes_all_required_components(cls, value: Dict[str, Any]):
        """
        Ensures that the job manifest includes all required components.

        Raises:
            ValueError: If attributes of the base job manifest are missing from
                the given manifest.
        """
        # Every "add" operation in the diff towards the base manifest is a path
        # that the given manifest lacks.
        patch = JsonPatch.from_diff(value, _get_base_job_manifest())
        missing_paths = sorted([op["path"] for op in patch if op["op"] == "add"])
        if missing_paths:
            raise ValueError(
                "Job is missing required attributes at the following paths: "
                f"{', '.join(missing_paths)}"
            )
        return value

    @validator("job_manifest")
    def _ensure_job_has_compatible_values(cls, value: Dict[str, Any]):
        """
        Ensures that the job manifest agrees with the values of the base manifest.

        Raises:
            ValueError: If attributes of the given manifest differ from the value
                the base job manifest has at the same path.
        """
        # Every "replace" operation in the diff towards the base manifest is a
        # path whose value differs from the base manifest.
        patch = JsonPatch.from_diff(value, _get_base_job_manifest())
        incompatible = sorted(
            [
                f"{op['path']} must have value {op['value']!r}"
                for op in patch
                if op["op"] == "replace"
            ]
        )
        if incompatible:
            raise ValueError(
                "Job has incompatible values for the following attributes: "
                f"{', '.join(incompatible)}"
            )
        return value

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
        flow: Optional["Flow"] = None,
    ):
        """
        Prepares the job configuration for a flow run.

        Ensures that necessary values are present in the job manifest and that the
        job manifest is valid.

        Args:
            flow_run: The flow run to prepare the job configuration for
            deployment: The deployment associated with the flow run used for
                preparation.
            flow: The flow associated with the flow run used for preparation.
        """
        super().prepare_for_flow_run(flow_run, deployment, flow)
        # Update configuration env and job manifest env
        self._update_prefect_api_url_if_local_server()
        self._populate_env_in_manifest()
        # Update labels in job manifest
        self._slugify_labels()
        # Add defaults to job manifest if necessary
        self._populate_image_if_not_present()
        self._populate_command_if_not_present()
        self._populate_generate_name_if_not_present()

    def _populate_env_in_manifest(self):
        """
        Populates environment variables in the job manifest.

        When `env` is templated as a variable in the job manifest it comes in as a
        dictionary. We need to convert it to a list of dictionaries to conform to the
        Kubernetes job manifest schema.

        This function also handles the case where the user has removed the `{{ env }}`
        placeholder and hard coded a value for `env`. In this case, we need to prepend
        our environment variables to the list to ensure Prefect setting propagation.
        An example reason a user would remove the `{{ env }}` placeholder is to
        hardcode Kubernetes secrets in the base job template.
        """
        transformed_env = [{"name": k, "value": v} for k, v in self.env.items()]

        template_env = self.job_manifest["spec"]["template"]["spec"]["containers"][
            0
        ].get("env")

        # If user has removed `{{ env }}` placeholder and hard coded a value for `env`,
        # we need to prepend our environment variables to the list to ensure Prefect
        # setting propagation.
        if isinstance(template_env, list):
            self.job_manifest["spec"]["template"]["spec"]["containers"][0]["env"] = [
                *transformed_env,
                *template_env,
            ]
        # Current templating adds `env` as a dict when the kubernetes manifest requires
        # a list of dicts. Might be able to improve this in the future with a better
        # default `env` value and better typing.
        else:
            self.job_manifest["spec"]["template"]["spec"]["containers"][0][
                "env"
            ] = transformed_env

    def _update_prefect_api_url_if_local_server(self):
        """If the API URL has been set by the base environment rather than by the
        user, update the value to ensure connectivity when using a bridge network by
        updating local connections to use the internal host
        """
        if self.env.get("PREFECT_API_URL") and self._api_dns_name:
            self.env["PREFECT_API_URL"] = (
                self.env["PREFECT_API_URL"]
                .replace("localhost", self._api_dns_name)
                .replace("127.0.0.1", self._api_dns_name)
            )

    def _slugify_labels(self):
        """Slugifies the labels in the job manifest."""
        # Labels set on the configuration take precedence over manifest labels.
        all_labels = {**self.job_manifest["metadata"].get("labels", {}), **self.labels}
        self.job_manifest["metadata"]["labels"] = {
            _slugify_label_key(k): _slugify_label_value(v)
            for k, v in all_labels.items()
        }

    def _populate_image_if_not_present(self):
        """Ensures that the image is present in the job manifest. Populates the image
        with the default Prefect image if it is not present.

        Raises:
            ValueError: If the manifest lacks the keys leading to the first container.
        """
        try:
            if (
                "image"
                not in self.job_manifest["spec"]["template"]["spec"]["containers"][0]
            ):
                self.job_manifest["spec"]["template"]["spec"]["containers"][0][
                    "image"
                ] = get_prefect_image_name()
        except KeyError as exc:
            raise ValueError(
                "Unable to verify image due to invalid job manifest template."
            ) from exc

    def _populate_command_if_not_present(self):
        """
        Ensures that the command is present in the job manifest. Populates the `args`
        of the first container with the base flow run command if no `args` are
        present. String `args` are split into a list.

        Raises:
            ValueError: If `args` is neither a string nor a list, or if the manifest
                lacks the keys leading to the first container.
        """
        try:
            command = self.job_manifest["spec"]["template"]["spec"]["containers"][
                0
            ].get("args")
            if command is None:
                self.job_manifest["spec"]["template"]["spec"]["containers"][0][
                    "args"
                ] = shlex.split(self._base_flow_run_command())
            elif isinstance(command, str):
                self.job_manifest["spec"]["template"]["spec"]["containers"][0][
                    "args"
                ] = shlex.split(command)
            elif not isinstance(command, list):
                raise ValueError(
                    "Invalid job manifest template: 'command' must be a string or list."
                )
        except KeyError as exc:
            raise ValueError(
                "Unable to verify command due to invalid job manifest template."
            ) from exc

    def _populate_generate_name_if_not_present(self):
        """Ensures that the generateName is present in the job manifest."""
        manifest_generate_name = self.job_manifest["metadata"].get("generateName", "")
        has_placeholder = len(find_placeholders(manifest_generate_name)) > 0
        # if name wasn't present during template rendering, generateName will be
        # just a hyphen
        manifest_generate_name_templated_with_empty_string = (
            manifest_generate_name == "-"
        )
        if (
            not manifest_generate_name
            or has_placeholder
            or manifest_generate_name_templated_with_empty_string
        ):
            generate_name = None
            if self.name:
                generate_name = _slugify_name(self.name)
            # Fall back to a fixed prefix when there is no name or when slugifying
            # the name yields nothing usable.
            if not generate_name:
                generate_name = "prefect-job"
            self.job_manifest["metadata"]["generateName"] = f"{generate_name}-"
def prepare_for_flow_run(
    self,
    flow_run: "FlowRun",
    deployment: Optional["DeploymentResponse"] = None,
    flow: Optional["Flow"] = None,
):
    """
    Get the job configuration ready for a flow run.

    After the base class preparation, the environment, the labels and the
    defaults of the job manifest are filled in, in that order.

    Args:
        flow_run: The flow run to prepare the job configuration for.
        deployment: The deployment associated with the flow run, used for
            preparation.
        flow: The flow associated with the flow run, used for preparation.
    """
    super().prepare_for_flow_run(flow_run, deployment, flow)

    # The steps run strictly in the listed order.
    preparation_steps = (
        # Configuration env and job manifest env
        self._update_prefect_api_url_if_local_server,
        self._populate_env_in_manifest,
        # Labels in the job manifest
        self._slugify_labels,
        # Defaults for the job manifest, where values are missing
        self._populate_image_if_not_present,
        self._populate_command_if_not_present,
        self._populate_generate_name_if_not_present,
    )
    for apply_step in preparation_steps:
        apply_step()
class KubernetesWorkerVariables(BaseVariables):
    """
    Default variables for the Kubernetes worker.

    The schema for this class is used to populate the `variables` section of the
    default base job template.
    """

    namespace: str = Field(
        default="default", description="The Kubernetes namespace to create jobs within."
    )
    image: Optional[str] = Field(
        default=None,
        description="The image reference of a container image to use for created jobs. "
        "If not set, the latest Prefect image will be used.",
        example="docker.io/prefecthq/prefect:2-latest",
    )
    service_account_name: Optional[str] = Field(
        default=None,
        description="The Kubernetes service account to use for job creation.",
    )
    image_pull_policy: Literal["IfNotPresent", "Always", "Never"] = Field(
        # Use the member's string value so that the default is one of the
        # literals the annotation allows, rather than an enum member.
        default=KubernetesImagePullPolicy.IF_NOT_PRESENT.value,
        description="The Kubernetes image pull policy to use for job containers.",
    )
    finished_job_ttl: Optional[int] = Field(
        default=None,
        title="Finished Job TTL",
        description="The number of seconds to retain jobs after completion. If set, "
        "finished jobs will be cleaned up by Kubernetes after the given delay. If not "
        "set, jobs will be retained indefinitely.",
    )
    # NOTE(review): the description below speaks of a per-event timeout, while the
    # job configuration documents this value as the time to wait for the job to
    # complete - confirm which wording is intended.
    job_watch_timeout_seconds: Optional[int] = Field(
        default=None,
        description=(
            "Number of seconds to wait for each event emitted by a job before "
            "timing out. If not set, the worker will wait for each event indefinitely."
        ),
    )
    pod_watch_timeout_seconds: int = Field(
        default=60,
        description="Number of seconds to watch for pod creation before timing out.",
    )
    stream_output: bool = Field(
        default=True,
        description=(
            "If set, output will be streamed from the job to local standard output."
        ),
    )
    cluster_config: Optional[KubernetesClusterConfig] = Field(
        default=None,
        description="The Kubernetes cluster config to use for job creation.",
    )