Staging

Decorators that help orchestrate caching and the passing of inputs and outputs through record state between stages.

Exceptions:

CachersMismatchError

EmptyCachersError

ExecutingWithSkippedInputError

InputSignatureError

OutputSignatureError

Classes:

SkippedOutput(record, stage_name, artifact_rep)

An object placed into record state when a stage’s execution is skipped based on the DAG.

Functions:

aggregate([inputs, outputs, cachers])

Decorator to wrap around a function that represents some step that must operate across multiple different parameter sets or “execution chains” within an experiment.

stage([inputs, outputs, cachers, …])

Decorator to wrap around a function that represents a single step in an experiment, a block with inputs and outputs pertaining to the remainder of that experiment.

exception curifactory.staging.CachersMismatchError
exception curifactory.staging.EmptyCachersError
exception curifactory.staging.ExecutingWithSkippedInputError
exception curifactory.staging.InputSignatureError
exception curifactory.staging.OutputSignatureError
class curifactory.staging.SkippedOutput(record: curifactory.record.Record, stage_name: str, artifact_rep: curifactory.record.ArtifactRepresentation)

An object placed into record state when a stage’s execution is skipped based on the DAG. This is so that the inputs check at the beginning of the stage (prior to DAG execution check) does not fail.

curifactory.staging.aggregate(inputs: Optional[list] = None, outputs: Optional[list] = None, cachers: Optional[list] = None)

Decorator to wrap around a function that represents some step that must operate across multiple different parameter sets or “execution chains” within an experiment. This is normally used to run final analyses and comparisons of results across all passed parameter sets.

Important

Any function wrapped with the aggregate decorator must take a Record instance as the first argument and a list of Record instances as the second. The former is the record that applies to this function, and the latter is the set of other records from elsewhere in the experiment that this function needs to aggregate across.
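The calling convention described above can be pictured with a small stand-alone sketch. It does not use curifactory itself: ToyRecord and gather_inputs are hypothetical names invented for illustration, and the way the per-record dictionaries are built here is an assumption about the general shape of the arguments, not the library's actual implementation.

```python
# Toy illustration only: ToyRecord and gather_inputs are hypothetical names,
# not part of curifactory.
class ToyRecord:
    def __init__(self, name, state):
        self.name = name
        self.state = state


def gather_inputs(records, input_names):
    """Build one dict per input name, keyed by the record each value came from."""
    gathered = {}
    for input_name in input_names:
        gathered[input_name] = {
            r: r.state[input_name] for r in records if input_name in r.state
        }
    return gathered


def compile_results(record, records, results):
    # `record` is the aggregate's own record, `records` are the ones aggregated over,
    # and `results` maps each contributing record to its "results" artifact.
    return {r.name: value for r, value in results.items()}


records = [ToyRecord("small", {"results": 0.81}), ToyRecord("large", {"results": 0.93})]
aggregate_record = ToyRecord("aggregate", {})
kwargs = gather_inputs(records, ["results"])
final_results = compile_results(aggregate_record, records, **kwargs)
```

The wrapped function receives its own record first, the list of records second, and then one keyword argument per requested input name.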

Parameters
  • inputs (list[str]) – A list of variable names this stage expects to find in the state of each record passed into it. A warning will be thrown on any records that do not have the requested variable in state. Variables listed here are used for the DAG/map calculation to determine which stages are actually required to run for this stage to have everything it needs. Note that all inputs listed here must have a corresponding input parameter in the function definition line, each with the exact same name as in this list. These arguments are each dictionaries of the requested state artifacts, keyed by the record they come from.

  • outputs (list[str]) – A list of variable names that this stage will return and store in the record state. These represent, in order, the tuple of returned values from the function being wrapped.

  • cachers (list[Cacheable]) – An optional list of Cacheable instances (“strategies”) to apply to each of the returned outputs. If specified, for each output, an instance of the corresponding cacher is initialized, and the save() function is called. Before the wrapped function is called, the output path is first checked, and if it exists and the current record parameters are not set to overwrite, the load() function is called and the wrapped function does not execute. Note that caching is all or nothing for a single function: you cannot cache only one returned value out of several.
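The all-or-nothing caching behaviour described for cachers can be sketched in plain Python. This is a minimal illustration under stated assumptions, not curifactory's code: ToyJsonCacher and run_with_cache are hypothetical names, and the rule "load only if every output path exists and overwrite is off" is one reading of the description above.

```python
import json
import os
import tempfile


class ToyJsonCacher:
    """Hypothetical stand-in for a caching strategy with save() and load()."""

    def __init__(self, path):
        self.path = path

    def save(self, obj):
        with open(self.path, "w") as f:
            json.dump(obj, f)

    def load(self):
        with open(self.path) as f:
            return json.load(f)


def run_with_cache(func, cachers, overwrite=False):
    """Return (outputs, executed): load every output, or run func and save every output."""
    all_cached = all(os.path.exists(c.path) for c in cachers)
    if all_cached and not overwrite:
        return tuple(c.load() for c in cachers), False
    outputs = func()
    if not isinstance(outputs, tuple):
        outputs = (outputs,)
    for cacher, output in zip(cachers, outputs):
        cacher.save(output)
    return outputs, True


calls = []


def expensive_step():
    calls.append(1)  # track how many times the step really ran
    return {"accuracy": 0.9}, [1, 2, 3]


with tempfile.TemporaryDirectory() as tmp:
    cachers = [
        ToyJsonCacher(os.path.join(tmp, "metrics.json")),
        ToyJsonCacher(os.path.join(tmp, "values.json")),
    ]
    first, first_executed = run_with_cache(expensive_step, cachers)
    second, second_executed = run_with_cache(expensive_step, cachers)
    third, third_executed = run_with_cache(expensive_step, cachers, overwrite=True)
```

In this sketch the second call skips execution because both cached files exist, while the third call runs again because overwrite is requested.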

Example

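from typing import List

from curifactory.caching import JsonCacher
from curifactory.record import Record
from curifactory.staging import aggregate

@aggregate(["results"], ["final_results"], [JsonCacher])
def compile_results(record: Record, records: List[Record], results: dict[Record, float]):
    # Collect each record's result under the name of its parameter set.
    final_results = {}
    for in_record, result in results.items():
        final_results[in_record.params.name] = result
    return final_results

curifactory.staging.stage(inputs: Optional[list] = None, outputs: Optional[list] = None, cachers: Optional[list] = None, suppress_missing_inputs: bool = False)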

Decorator to wrap around a function that represents a single step in an experiment, a block with inputs and outputs pertaining to the remainder of that experiment.

Important

Any function wrapped with the stage decorator must take a Record instance as the first parameter, followed by the input parameters corresponding to the inputs list.

Parameters
  • inputs (list[str]) – A list of variable names that this stage will need from the record state. Note that all inputs listed here must have a corresponding input argument in the function definition line, each with the exact same name as in this list.

  • outputs (list[Union[str, Lazy]]) – A list of variable names that this stage will return and store in the record state. These represent, in order, the tuple of returned values from the function being wrapped.

  • cachers (list[Cacheable]) – An optional list of Cacheable instances (“strategies”) to apply to each of the returned outputs. If specified, for each output, an instance of the corresponding cacher is initialized, and the save() function is called. Before the wrapped function is called, the output path is first checked, and if it exists and the current record parameter set is not configured to overwrite, the load() function is called and the wrapped function does not execute. Note that caching is all or nothing for a single function: you cannot cache only one returned value out of several.

  • suppress_missing_inputs (bool) – If true, any stage inputs that are not found in the record’s state will be passed in as None rather than raising an exception. This can be used to make all inputs optional, such as if a stage will be used after different sets of previous stages and not all values are necessarily required.
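The effect of suppress_missing_inputs can be illustrated with a short stand-alone sketch. The function resolve_inputs is a hypothetical helper written for this illustration, the record state is modelled as a plain dict, and KeyError merely stands in for whatever exception the library actually raises.

```python
def resolve_inputs(state, inputs, suppress_missing_inputs=False):
    """Look up each requested input name in a dict standing in for record state."""
    resolved = {}
    for name in inputs:
        if name in state:
            resolved[name] = state[name]
        elif suppress_missing_inputs:
            # Missing inputs are passed through as None instead of failing.
            resolved[name] = None
        else:
            # Assumption: KeyError is a placeholder for the library's own exception.
            raise KeyError(f"stage input '{name}' not found in record state")
    return resolved


state = {"data": [1, 2, 3]}  # "model" was never produced by an earlier stage

lenient = resolve_inputs(state, ["data", "model"], suppress_missing_inputs=True)

try:
    resolve_inputs(state, ["data", "model"])
    strict_failed = False
except KeyError:
    strict_failed = True
```

A stage written for the lenient case would then need to handle None for any input it treats as optional.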

Example

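import pandas as pd

from curifactory.caching import JsonCacher
from curifactory.record import Record
from curifactory.staging import stage

@stage(inputs=["data", "model"], outputs=["results"], cachers=[JsonCacher])
def test_model(record: Record, data: pd.DataFrame, model):
    # ...
    return results_dictionary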

Note that this example assumes earlier stages have already stored "data" and "model" in the record state.
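How one stage's outputs become a later stage's inputs can be pictured with a toy decorator. This is only a sketch: toy_stage is a hypothetical name, the record state is modelled as a plain dict, and caching, DAG handling, and the Record class are all left out.

```python
import functools


def toy_stage(inputs=None, outputs=None):
    """Hypothetical, heavily simplified stand-in for a stage decorator."""
    inputs = inputs or []
    outputs = outputs or []

    def decorator(func):
        @functools.wraps(func)
        def wrapper(state):
            # Pull each named input out of state, in the order listed.
            args = [state[name] for name in inputs]
            returned = func(state, *args)
            if outputs:
                # Returned values are matched to output names by position.
                values = (returned,) if len(outputs) == 1 else returned
                state.update(zip(outputs, values))
            return state

        return wrapper

    return decorator


@toy_stage(outputs=["data", "model"])
def prepare(state):
    return [1.0, 2.0, 3.0], {"scale": 2.0}


@toy_stage(inputs=["data", "model"], outputs=["results"])
def evaluate_model(state, data, model):
    return {"scaled_sum": sum(data) * model["scale"]}


state = evaluate_model(prepare({}))
```

Calling evaluate_model on an empty state would fail in this sketch, which mirrors the assumption that some earlier stage has produced "data" and "model".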