Module mls_lib.orchestration.step

Step

Classes

class Step (**inputs)

Abstract Step

Expand source code
class Step:
    """
    Abstract Step
    """
    def __init__(self, **inputs) -> None:
        self.inputs = inputs
        self.outputs = {}
        self.finished = False
        self.step_category = "abstract_step"

    def execute(self):
        """ Execute the step. """    
    def get_output(self, port):
        """ Get the output of the step. """
        return self.outputs[port]
    def _get_input_step(self, port):
        """ Get the input step of the step. """
        return self.inputs[port]
    def is_ready(self):
        """ Check if the step is ready. """
        for _, step in self.inputs.items():
            if not step[0].is_finished():
                return False
        return True

    def is_finished(self):
        """ Check if the step is finished. """
        return self.finished
    def _get_input(self, port) -> Object:
        """ Get the input of the step by getting the output of the input step. """
        origin, origin_port = self.inputs[port]
        return origin.get_output(origin_port)

    def _set_output(self, port, value):
        """ Set the output of the step. """
        self.outputs[port] = value
    def _finish_execution(self):
        """ Finish the execution of the step. """
        self.finished = True

Subclasses

Methods

def execute(self)

Execute the step.

def get_output(self, port)

Get the output of the step.

def is_finished(self)

Check if the step is finished.

def is_ready(self)

Check if the step is ready.