This module contains functions that allow sending type-checked RunInput data
to flows at runtime. Flows can send back responses, establishing two-way
channels with senders. The module is designed for dynamic communication within
distributed or microservices-oriented systems. It is particularly useful for
systems that need continuous data synchronization and processing, or that must
react to input quickly.
The following is an example of two flows. One sends a random number to the
other and waits for a response. The other receives the number, squares it, and
sends the result back. The sender flow then prints the result.
Sender flow:
    import random
    from uuid import UUID
    from prefect import flow, get_run_logger
    from prefect.input import RunInput


    class NumberData(RunInput):
        number: int


    @flow
    async def sender_flow(receiver_flow_run_id: UUID):
        logger = get_run_logger()

        the_number = random.randint(1, 100)

        await NumberData(number=the_number).send_to(receiver_flow_run_id)

        receiver = NumberData.receive(flow_run_id=receiver_flow_run_id)
        squared = await receiver.next()

        logger.info(f"{the_number} squared is {squared.number}")
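The example description also mentions a receiver that squares the number and sends it back. As a rough, stdlib-only illustration of that round trip, the exchange can be simulated in a single process. This is not Prefect code: the queues and the function names `receiver`, `sender`, and `round_trip` are hypothetical stand-ins for the roles played by `send_to`, `receive`, and `respond`.

```python
import asyncio
import random


async def receiver(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    # Wait for one number, square it, and send the result back.
    number = await inbox.get()
    await outbox.put(number ** 2)


async def sender(inbox: asyncio.Queue, outbox: asyncio.Queue) -> tuple:
    # Send a random number, then wait for the squared response.
    the_number = random.randint(1, 100)
    await outbox.put(the_number)
    squared = await inbox.get()
    return the_number, squared


async def round_trip() -> tuple:
    # One queue per direction gives a two-way channel.
    to_receiver: asyncio.Queue = asyncio.Queue()
    to_sender: asyncio.Queue = asyncio.Queue()
    receiver_task = asyncio.create_task(receiver(to_receiver, to_sender))
    result = await sender(to_sender, to_receiver)
    await receiver_task
    return result


the_number, squared = asyncio.run(round_trip())
print(f"{the_number} squared is {squared}")
```

In the real module the two sides are separate flow runs and the messages are persisted flow run inputs, so this sketch only mirrors the message order, not the transport.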
    class AutomaticRunInput(RunInput, Generic[T]):
        value: T

        @classmethod
        @sync_compatible
        async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> T:
            """
            Load the run input response from the given key.

            Args:
                - keyset (Keyset): the keyset to load the input for
                - flow_run_id (UUID, optional): the flow run ID to load the input for
            """
            instance = await super().load(keyset, flow_run_id=flow_run_id)
            return instance.value

        @classmethod
        def subclass_from_type(cls, _type: Type[T]) -> Type["AutomaticRunInput[T]"]:
            """
            Create a new `AutomaticRunInput` subclass from the given type.

            This method uses the type's name as a key prefix to identify related
            flow run inputs. This helps in ensuring that values saved under a type
            (like List[int]) are retrievable under the generic type name (like "list").
            """
            fields: Dict[str, Any] = {"value": (_type, ...)}

            # Explanation for using getattr for type name extraction:
            # - "__name__": This is the usual attribute for getting the name of
            #   most types.
            # - "_name": Used as a fallback, some type annotations in Python 3.9
            #   and earlier might only have this attribute instead of __name__.
            # - If neither is available, defaults to an empty string to prevent
            #   errors, but typically we should find at least one valid name
            #   attribute. This will match all automatic inputs sent to the flow
            #   run, rather than a specific type.
            #
            # This approach ensures compatibility across Python versions and
            # handles various edge cases in type annotations.
            type_prefix: str = getattr(
                _type, "__name__", getattr(_type, "_name", "")
            ).lower()

            class_name = f"{type_prefix}AutomaticRunInput"

            # Creating a new Pydantic model class dynamically with the name based
            # on the type prefix.
            new_cls: Type["AutomaticRunInput"] = pydantic.create_model(
                class_name, **fields, __base__=AutomaticRunInput
            )
            return new_cls

        @classmethod
        def receive(cls, *args, **kwargs):
            if kwargs.get("key_prefix") is None:
                kwargs["key_prefix"] = f"{cls.__name__.lower()}-auto"

            return GetAutomaticInputHandler(run_input_cls=cls, *args, **kwargs)
Create a new AutomaticRunInput subclass from the given type.
This method uses the type's name as a key prefix to identify related
flow run inputs. This helps in ensuring that values saved under a type
(like List[int]) are retrievable under the generic type name (like "list").
    @classmethod
    def subclass_from_type(cls, _type: Type[T]) -> Type["AutomaticRunInput[T]"]:
        """
        Create a new `AutomaticRunInput` subclass from the given type.

        This method uses the type's name as a key prefix to identify related
        flow run inputs. This helps in ensuring that values saved under a type
        (like List[int]) are retrievable under the generic type name (like "list").
        """
        fields: Dict[str, Any] = {"value": (_type, ...)}

        # Explanation for using getattr for type name extraction:
        # - "__name__": This is the usual attribute for getting the name of
        #   most types.
        # - "_name": Used as a fallback, some type annotations in Python 3.9
        #   and earlier might only have this attribute instead of __name__.
        # - If neither is available, defaults to an empty string to prevent
        #   errors, but typically we should find at least one valid name
        #   attribute. This will match all automatic inputs sent to the flow
        #   run, rather than a specific type.
        #
        # This approach ensures compatibility across Python versions and
        # handles various edge cases in type annotations.
        type_prefix: str = getattr(
            _type, "__name__", getattr(_type, "_name", "")
        ).lower()

        class_name = f"{type_prefix}AutomaticRunInput"

        # Creating a new Pydantic model class dynamically with the name based
        # on the type prefix.
        new_cls: Type["AutomaticRunInput"] = pydantic.create_model(
            class_name, **fields, __base__=AutomaticRunInput
        )
        return new_cls
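To make the name lookup concrete, here is a small standalone sketch of just the prefix step, using only the standard library. The helper name `type_prefix` is introduced here for illustration. It repeats the `getattr` chain from the listing, and the final line only builds the class name string rather than calling `pydantic.create_model`.

```python
from typing import Any, Dict, List


def type_prefix(_type: Any) -> str:
    # Same lookup order as the listing: "__name__", then "_name", then "".
    return getattr(_type, "__name__", getattr(_type, "_name", "")).lower()


print(type_prefix(int))
print(type_prefix(List[int]))
print(type_prefix(Dict[str, int]))

# The class name is the prefix followed by a fixed suffix.
print(f"{type_prefix(List[int])}AutomaticRunInput")
```

Because the prefix is lowercased and drops the type arguments, `List[int]` and `List[str]` produce the same prefix, which matches the docstring's note about retrieving values under the generic type name.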
    class RunInput(pydantic.BaseModel):
        class Config:
            extra = "forbid"

        _description: Optional[str] = pydantic.PrivateAttr(default=None)
        _metadata: RunInputMetadata = pydantic.PrivateAttr()

        @property
        def metadata(self) -> RunInputMetadata:
            return self._metadata

        @classmethod
        def keyset_from_type(cls) -> Keyset:
            return keyset_from_base_key(cls.__name__.lower())

        @classmethod
        @sync_compatible
        async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
            """
            Save the run input response to the given key.

            Args:
                - keyset (Keyset): the keyset to save the input for
                - flow_run_id (UUID, optional): the flow run ID to save the input for
            """

            if HAS_PYDANTIC_V2:
                schema = create_v2_schema(cls.__name__, model_base=cls)
            else:
                schema = cls.schema(by_alias=True)

            await create_flow_run_input(
                key=keyset["schema"], value=schema, flow_run_id=flow_run_id
            )

            description = cls._description if isinstance(cls._description, str) else None
            if description:
                await create_flow_run_input(
                    key=keyset["description"],
                    value=description,
                    flow_run_id=flow_run_id,
                )

        @classmethod
        @sync_compatible
        async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
            """
            Load the run input response from the given key.

            Args:
                - keyset (Keyset): the keyset to load the input for
                - flow_run_id (UUID, optional): the flow run ID to load the input for
            """
            flow_run_id = ensure_flow_run_id(flow_run_id)
            value = await read_flow_run_input(keyset["response"], flow_run_id=flow_run_id)
            if value:
                instance = cls(**value)
            else:
                instance = cls()
            instance._metadata = RunInputMetadata(
                key=keyset["response"], sender=None, receiver=flow_run_id
            )
            return instance

        @classmethod
        def load_from_flow_run_input(cls, flow_run_input: "FlowRunInput"):
            """
            Load the run input from a FlowRunInput object.

            Args:
                - flow_run_input (FlowRunInput): the flow run input to load the input for
            """
            instance = cls(**flow_run_input.decoded_value)
            instance._metadata = RunInputMetadata(
                key=flow_run_input.key,
                sender=flow_run_input.sender,
                receiver=flow_run_input.flow_run_id,
            )
            return instance

        @classmethod
        def with_initial_data(
            cls: Type[R], description: Optional[str] = None, **kwargs: Any
        ) -> Type[R]:
            """
            Create a new `RunInput` subclass with the given initial data as field
            defaults.

            Args:
                - description (str, optional): a description to show when resuming
                    a flow run that requires input
                - kwargs (Any): the initial data to populate the subclass
            """
            fields: Dict[str, Any] = {}
            for key, value in kwargs.items():
                fields[key] = (type(value), value)
            model = pydantic.create_model(cls.__name__, **fields, __base__=cls)

            if description is not None:
                model._description = description

            return model

        @sync_compatible
        async def respond(
            self,
            run_input: "RunInput",
            sender: Optional[str] = None,
            key_prefix: Optional[str] = None,
        ):
            flow_run_id = None
            if self.metadata.sender and self.metadata.sender.startswith(
                "prefect.flow-run"
            ):
                _, _, id = self.metadata.sender.rpartition(".")
                flow_run_id = UUID(id)

            if not flow_run_id:
                raise RuntimeError(
                    "Cannot respond to an input that was not sent by a flow run."
                )

            await _send_input(
                flow_run_id=flow_run_id,
                run_input=run_input,
                sender=sender,
                key_prefix=key_prefix,
            )

        @sync_compatible
        async def send_to(
            self,
            flow_run_id: UUID,
            sender: Optional[str] = None,
            key_prefix: Optional[str] = None,
        ):
            await _send_input(
                flow_run_id=flow_run_id,
                run_input=self,
                sender=sender,
                key_prefix=key_prefix,
            )

        @classmethod
        def receive(
            cls,
            timeout: Optional[float] = 3600,
            poll_interval: float = 10,
            raise_timeout_error: bool = False,
            exclude_keys: Optional[Set[str]] = None,
            key_prefix: Optional[str] = None,
            flow_run_id: Optional[UUID] = None,
        ):
            if key_prefix is None:
                key_prefix = f"{cls.__name__.lower()}-auto"

            return GetInputHandler(
                run_input_cls=cls,
                key_prefix=key_prefix,
                timeout=timeout,
                poll_interval=poll_interval,
                raise_timeout_error=raise_timeout_error,
                exclude_keys=exclude_keys,
                flow_run_id=flow_run_id,
            )

        @classmethod
        def subclass_from_base_model_type(
            cls, model_cls: Type[pydantic.BaseModel]
        ) -> Type["RunInput"]:
            """
            Create a new `RunInput` subclass from the given `pydantic.BaseModel`
            subclass.

            Args:
                - model_cls (pydantic.BaseModel subclass): the class from which
                    to create the new `RunInput` subclass
            """
            return type(f"{model_cls.__name__}RunInput", (RunInput, model_cls), {})  # type: ignore
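The `respond` method only works when the input's sender is a flow run, and it recovers the flow run ID from the sender string. The sketch below isolates that parsing step with the standard library. The helper name `flow_run_id_from_sender` is hypothetical, and the sender format used in the example (the `prefect.flow-run` prefix, a dot, then a UUID) is an assumption inferred from the `startswith` and `rpartition` calls in the listing.

```python
from typing import Optional
from uuid import UUID, uuid4


def flow_run_id_from_sender(sender: Optional[str]) -> Optional[UUID]:
    # Mirrors the check in `respond`: only senders that start with
    # "prefect.flow-run" are treated as flow runs.
    if sender and sender.startswith("prefect.flow-run"):
        # Everything after the last "." is parsed as the flow run's UUID.
        _, _, raw_id = sender.rpartition(".")
        return UUID(raw_id)
    return None


run_id = uuid4()
print(flow_run_id_from_sender(f"prefect.flow-run.{run_id}"))
print(flow_run_id_from_sender("some-other-sender"))
print(flow_run_id_from_sender(None))
```

Where this helper returns `None`, the real method raises `RuntimeError` instead, since there is no flow run to respond to.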
Create a new RunInput subclass from the given pydantic.BaseModel subclass.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model_cls | pydantic.BaseModel subclass | the class from which to create the new RunInput subclass | required |
Source code in src/prefect/input/run_input.py
    @classmethod
    def subclass_from_base_model_type(
        cls, model_cls: Type[pydantic.BaseModel]
    ) -> Type["RunInput"]:
        """
        Create a new `RunInput` subclass from the given `pydantic.BaseModel`
        subclass.

        Args:
            - model_cls (pydantic.BaseModel subclass): the class from which
                to create the new `RunInput` subclass
        """
        return type(f"{model_cls.__name__}RunInput", (RunInput, model_cls), {})  # type: ignore
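The three-argument `type(...)` call is what builds the combined class. The following sketch shows the same call shape with plain Python classes so it runs without pydantic. `BaseModelStandIn`, `RunInputStandIn`, and `Address` are hypothetical stand-ins, not part of the library.

```python
class BaseModelStandIn:
    """Hypothetical stand-in for pydantic.BaseModel."""


class RunInputStandIn(BaseModelStandIn):
    """Hypothetical stand-in for RunInput."""

    def respond(self) -> str:
        return "responding"


class Address(BaseModelStandIn):
    """Hypothetical user-defined model."""

    def label(self) -> str:
        return "an address"


# Same call shape as the listing: a name derived from the model class and a
# bases tuple that lists the run-input class first.
AddressRunInput = type(f"{Address.__name__}RunInput", (RunInputStandIn, Address), {})

instance = AddressRunInput()
print(AddressRunInput.__name__)
print(instance.respond(), "/", instance.label())
```

Listing the run-input class first in the bases tuple means its attributes take precedence in the method resolution order when both bases define the same name.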
    def keyset_from_base_key(base_key: str) -> Keyset:
        """
        Get the keyset for the given base key.

        Args:
            - base_key (str): the base key to get the keyset for

        Returns:
            - Dict[str, str]: the keyset
        """
        return {
            "description": f"{base_key}-description",
            "response": f"{base_key}-response",
            "schema": f"{base_key}-schema",
        }
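As a quick illustration of the keys this produces, the sketch below copies the function body (with `Dict[str, str]` in place of the `Keyset` alias so it runs on its own) and applies it to a lowercased class name, which is how `keyset_from_type` derives its base key. `NumberData` here is a plain hypothetical class, not a real `RunInput` subclass.

```python
from typing import Dict


def keyset_from_base_key(base_key: str) -> Dict[str, str]:
    # Copied from the listing so the example is self-contained.
    return {
        "description": f"{base_key}-description",
        "response": f"{base_key}-response",
        "schema": f"{base_key}-schema",
    }


class NumberData:
    """Hypothetical stand-in for a RunInput subclass."""


# `keyset_from_type` in the listing uses the lowercased class name as base key.
keyset = keyset_from_base_key(NumberData.__name__.lower())
for name, key in keyset.items():
    print(f"{name}: {key}")
```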
    def keyset_from_paused_state(state: "State") -> Keyset:
        """
        Get the keyset for the given Paused state.

        Args:
            - state (State): the state to get the keyset for
        """

        if not state.is_paused():
            raise RuntimeError(f"{state.type.value!r} is unsupported.")

        state_name = state.name or ""
        base_key = f"{state_name.lower()}-{str(state.state_details.pause_key)}"
        return keyset_from_base_key(base_key)
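The base key for a paused state combines the lowercased state name with the pause key. A minimal sketch of that composition follows, using a hypothetical `FakePausedState` dataclass that flattens `state.state_details.pause_key` into a single `pause_key` field. The `is_paused()` check from the listing is omitted here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakePausedState:
    """Hypothetical stand-in exposing only the values the listing reads."""

    name: Optional[str]
    pause_key: str


def base_key_for(state: FakePausedState) -> str:
    # A missing name becomes an empty string, as in the listing.
    state_name = state.name or ""
    return f"{state_name.lower()}-{state.pause_key}"


print(base_key_for(FakePausedState(name="Paused", pause_key="abc123")))
print(base_key_for(FakePausedState(name=None, pause_key="abc123")))
```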
    def run_input_subclass_from_type(
        _type: Union[Type[R], Type[T], pydantic.BaseModel],
    ) -> Union[Type[AutomaticRunInput[T]], Type[R]]:
        """
        Create a new `RunInput` subclass from the given type.
        """
        if isclass(_type):
            if issubclass(_type, RunInput):
                return cast(Type[R], _type)
            elif issubclass(_type, pydantic.BaseModel):
                return cast(Type[R], RunInput.subclass_from_base_model_type(_type))

        # Could be something like a typing._GenericAlias or any other type that
        # isn't a `RunInput` subclass or `pydantic.BaseModel` subclass. Try passing
        # it to AutomaticRunInput to see if we can create a model from it.
        return cast(
            Type[AutomaticRunInput[T]],
            AutomaticRunInput.subclass_from_type(cast(Type[T], _type)),
        )
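The function above is a three-way dispatch on the kind of type it receives. The sketch below reproduces only the branching, with hypothetical stand-in classes in place of `pydantic.BaseModel` and `RunInput`, and returns a label for the chosen branch instead of building a class. The helper name `classify` is introduced for illustration.

```python
from inspect import isclass
from typing import Any, List


class BaseModelStandIn:
    """Hypothetical stand-in for pydantic.BaseModel."""


class RunInputStandIn(BaseModelStandIn):
    """Hypothetical stand-in for RunInput."""


class UserModel(BaseModelStandIn):
    """Hypothetical plain model."""


class NumberData(RunInputStandIn):
    """Hypothetical run input."""


def classify(_type: Any) -> str:
    # Same branch order as the listing: the run-input check comes first
    # because run inputs are also models.
    if isclass(_type):
        if issubclass(_type, RunInputStandIn):
            return "used as-is"
        elif issubclass(_type, BaseModelStandIn):
            return "wrapped via subclass_from_base_model_type"
    # Anything else, including non-class annotations such as List[int],
    # falls through to the automatic wrapper.
    return "wrapped via AutomaticRunInput.subclass_from_type"


for candidate in (NumberData, UserModel, int, List[int]):
    print(candidate, "->", classify(candidate))
```

Note that a class such as `int` passes the `isclass` check but matches neither inner branch, so it falls through to the automatic wrapper just like a generic alias.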