
Reference

I am pypely.

Welcome to the py pipeline abstraction language.

pipeline

pipeline(*funcs)

I chain functions together.

I can deal with any number of provided functions, but I need at least one. Functions that are fed into me need to be typed, and the return type of each function must match the parameter type of the next.

Example
from pypely import pipeline

def open_favourite_ide():
    ...

def create_new_conda_environment():
    ...

...

use_pypely = pipeline(
    open_favourite_ide,
    create_new_conda_environment,
    activate_environment,
    install_pypely,
    have_fun_building_pipelines
)

use_pypely() # -> 🥳

Parameters:

    funcs (Callable): The functions that will be chained to form the pipeline. Default: ().

Returns:

    Callable[P, Output]: A callable that forwards the input P to the first function. The output of the first function is passed to the second function, and so on.
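
To make the type-matching requirement concrete, here is a minimal, runnable sketch; the function names are illustrative and not part of pypely:

```python
from pypely import pipeline


def double(x: int) -> int:
    return x * 2


def describe(x: int) -> str:
    return f"Result: {x}"


# double returns an int and describe expects an int,
# so the two functions can be chained.
process = pipeline(double, describe)

print(process(21))  # -> "Result: 42"
```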

Source code in pypely/core/_functions.py
def pipeline(*funcs: Unpack[Tuple[Callable[P, Any], Unpack[Tuple[Callable, ...]], Callable[..., Output]]]) -> Callable[P, Output]:  # type: ignore
    """I chain functions together.

    I can deal with any number of provided functions. But I need at least one function.
    Functions that are fed into me need to be typed.
    The types of the provided functions need to match.

    Example:
        ```python
        from pypely import pipeline

        def open_favourite_ide():
            ...

        def create_new_conda_environment():
            ...

        ...

        use_pypely = pipeline(
            open_favourite_ide,
            create_new_conda_environment,
            activate_environment,
            install_pypely,
            have_fun_building_pipelines
        )

        use_pypely() # -> 🥳
        ```

    Args:
        funcs (Callable): The functions that will be chained to form the pipeline.

    Returns:
        Callable[P, Output]: A callable that forwards the input `P` to the first function. The output of the first function is passed to the second function, etc.
    """
    first, *remaining = funcs
    initial = _wrap_with_error_handling(
        first
    )  # Only the second function is wrapped with error handling in check_and_compose
    _pipeline = reduce(check_and_compose, remaining, initial)

    @memorizable
    def _call(*args: P.args, **kwargs: P.kwargs) -> Output:
        with PipelineMemoryContext() as _:
            return _pipeline(*args, **kwargs)

    _call = define_annotation(_call, funcs[0], funcs[-1].__annotations__["return"])
    _call = define_signature(_call, funcs[0], funcs[-1].__annotations__["return"])

    return _call

fork

fork(*funcs)

I split the output into multiple parallel branches.

Each branch receives the same input: the output of the function preceding fork.

Parameters:

    funcs (Callable): The functions that consume the output of the previous function in parallel. Default: ().

Returns:

    Callable[P, PypelyTuple]: A function that provides the outputs of all provided functions as a tuple.
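
A minimal, runnable sketch of this behaviour (function names are illustrative): both branches receive the output of the step before fork, and the results are collected in branch order.

```python
from pypely import pipeline, fork


def parse(raw: str) -> int:
    return int(raw)


def add_one(x: int) -> int:
    return x + 1


def square(x: int) -> int:
    return x * x


# Both branches receive the output of parse; the results are
# returned in branch order as a PypelyTuple.
branched = pipeline(parse, fork(add_one, square))

print(branched("3"))  # -> PypelyTuple containing (4, 9)
```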

Source code in pypely/core/_functions.py
def fork(*funcs: Callable[P, Any]) -> Callable[P, PypelyTuple]:
    """I split the output into multiple parallel branches.

    Each branch receives the same input: the output of the function preceding `fork`.

    Args:
        funcs (Callable): The functions that consume the output of the previous function in parallel.

    Returns:
        Callable[P, PypelyTuple]: A function that provides the output of all provided functions as a tuple
    """

    @memorizable(allow_ingest=False)  # type: ignore
    def _fork(*args: P.args, **kwargs: P.kwargs) -> PypelyTuple:
        return PypelyTuple(*(func(*args, **kwargs) for func in funcs))

    _fork_annotated = define_annotation(_fork, funcs[0], _fork.__annotations__["return"])
    _fork_signed = define_signature(_fork_annotated, funcs[0], _fork.__annotations__["return"])

    return _fork_signed

to

to(cls, *set_fields)

I convert multiple branches into an object.

I can only be used after fork. My purpose is to bring the created branches back together. The output of each branch is used to instantiate the object: the output of the first branch becomes the first constructor argument, the output of the second branch the second argument, and so on.

set_fields can be used to adjust the order in which the outputs are used for instantiation. The following example demonstrates how to can be used with set_fields to specify the order in which the outputs of fork are used.

Example
@dataclass
class Table:
    tea: Tea
    plate: Plate
    bread: Bread
    eggs: Eggs


morning_routine = pipeline(
    wake_up,
    go_to_kitchen,
    fork(
        make_tea,
        fry_eggs,
        cut_bread,
        get_plate
    ),
    to(Table, "tea", "eggs", "bread", "plate")
)

Parameters:

    cls (Type[T]): A class that will be instantiated with the outputs of the previous fork. Required.
    set_fields (str): Defines the order in which the fields are set; see the example above. Default: ().

Returns:

    Callable[[PypelyTuple], T]: A function that will instantiate the object when called.
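
The example above uses undefined helper functions, so here is a self-contained sketch (class and function names are illustrative, assuming fork and to are exported at the package top level like pipeline). Without set_fields, the branch outputs fill the constructor arguments in branch order:

```python
from dataclasses import dataclass

from pypely import fork, pipeline, to


@dataclass
class Stats:
    doubled: int
    squared: int


def double(x: int) -> int:
    return x * 2


def square(x: int) -> int:
    return x * x


# The first branch fills `doubled`, the second fills `squared`.
stats = pipeline(fork(double, square), to(Stats))

print(stats(3))  # -> Stats(doubled=6, squared=9)
```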

Source code in pypely/core/_functions.py
def to(cls: Type[T], *set_fields: str) -> Callable[[PypelyTuple], T]:
    """I convert multiple branches into an object.

    I can only be used after fork.
    My purpose is to bring the created branches back together.
    The output of each branch will be used to instantiate the object.
    The output of the first branch will be set as the first attribute of `obj` etc.

    `set_fields` can be used to adjust the order in which the outputs are used for instantiation.
    The following example demonstrates how `to` can be used with `set_fields` to specify the order in which the outputs of `fork` are used.

    Example:
        ```python
        @dataclass
        class Table:
            tea: Tea
            plate: Plate
            bread: Bread
            eggs: Eggs


        morning_routine = pipeline(
            wake_up,
            go_to_kitchen,
            fork(
                make_tea,
                fry_eggs,
                cut_bread,
                get_plate
            ),
            to(Table, "tea", "eggs", "bread", "plate")
        )
        ```

    Args:
        cls (Type[T]): A class that will be instantiated by the outputs of the previous `fork`
        set_fields (str): This can be used to define the order in which the fields are set.
            Please check out the example for a better explanation.

    Returns:
        Callable[[PypelyTuple], T]: A function that will instantiate the object when called
    """

    @memorizable(allow_ingest=False)  # type: ignore
    def _to(vals: PypelyTuple) -> T:
        vals_flattened = _flatten(vals)
        if not set_fields == ():
            assert len(vals_flattened) == len(set_fields)
            fields_named = {field_name: val for field_name, val in zip(set_fields, vals_flattened)}
            return cls(**fields_named)
        else:
            return cls(*vals_flattened)

    def _mock_function(p: PypelyTuple) -> None:
        pass

    _to_annotated = define_annotation(_to, _mock_function, _to.__annotations__["return"])
    _to_signed = define_signature(_to_annotated, _mock_function, _to.__annotations__["return"])

    return _to_signed

merge

merge(func)

I merge multiple branches.

I can only run after fork. I bring multiple branches back together. The given function func receives all outputs of the previous fork, in order, and needs to take as many arguments as there are branches.

Parameters:

    func (Callable[P, T]): The function that defines how the branches will be merged. Required.

Returns:

    Callable[[PypelyTuple], T]: A function that will apply func to the outputs of the previous fork.
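
A minimal sketch (function names are illustrative, assuming fork and merge are exported at the package top level like pipeline): the merging function takes one argument per branch, in branch order.

```python
from pypely import fork, merge, pipeline


def add_one(x: int) -> int:
    return x + 1


def square(x: int) -> int:
    return x * x


def total(a: int, b: int) -> int:
    return a + b


# total takes two arguments because fork created two branches:
# `a` receives add_one's output, `b` receives square's output.
summed = pipeline(fork(add_one, square), merge(total))

print(summed(3))  # -> 13 (= 4 + 9)
```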

Source code in pypely/core/_functions.py
def merge(func: Callable[P, T]) -> Callable[[PypelyTuple], T]:
    """I merge multiple branches.

    I can only run after `fork`. I bring multiple branches back together.
    The given function `func` will receive all outputs of the previous `fork`.
    `func` will receive the outputs in order.
    `func` needs to take as many arguments as there are branches.

    Args:
        func (Callable[P, T]): The function that defines the logic for how the branches will be merged.

    Returns:
        Callable[[PypelyTuple], T]: A function that will apply `func` to the outputs of the previous `fork`
    """

    @memorizable(allow_ingest=False)  # type: ignore
    def _merge(branches: PypelyTuple) -> T:
        flat_branches = _flatten(branches)
        return func(*flat_branches)

    def _mock_function(p: PypelyTuple) -> None:
        pass

    _merge_annotated = define_annotation(_merge, _mock_function, _merge.__annotations__["return"])
    _merge_signed = define_signature(_merge_annotated, _mock_function, _merge.__annotations__["return"])

    return _merge_signed

identity

identity(x)

I forward the given input untouched.

This can be useful if you want to forward a result to a later step. As this approach can make the pipeline hard to understand, it is advised to use pypely.memory.memorizable instead.

Parameters:

    x (T): Any input. Required.

Returns:

    T: The input is unchanged.
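
A minimal sketch of the forwarding pattern described above (function names are illustrative): one branch passes the input through untouched so a later merge step still sees the original value.

```python
from pypely import fork, identity, merge, pipeline


def double(x: int) -> int:
    return x * 2


def report(original: int, doubled: int) -> str:
    return f"{original} doubled is {doubled}"


# identity forwards the input unchanged, double transforms it,
# and report receives both values.
keep_and_double = pipeline(fork(identity, double), merge(report))

print(keep_and_double(21))  # -> "21 doubled is 42"
```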

Source code in pypely/core/_functions.py
def identity(x: T) -> T:
    """I forward the given input untouched.

    This can be useful if you want to forward a result for a later step.
    As this approach can also make the pipeline hard to understand, it is advised to use [pypely.memory.memorizable][]

    Args:
        x (T): Any input

    Returns:
        T: The input is unchanged.
    """
    return x

components

Ignore for now.

core

I provide the core functionalities of pypely.

errors

I provide all errors that can occur when interacting with the core of pypely.

PipelineStepError
PipelineStepError(func, exception)

Bases: PypelyError

I will be raised when a step inside a pipeline fails.

Source code in pypely/core/errors/_pipeline.py
def __init__(self, func: Callable, exception: Exception):
    message = self.__error_message(func, exception)
    super(PipelineStepError, self).__init__(message)
ParameterAnnotationsMissingError
ParameterAnnotationsMissingError(func)

Bases: PypelyError

I will be raised if at least one parameter of the function is not type annotated.

Source code in pypely/core/errors/_pipeline.py
def __init__(self, func: Callable):
    message = self.__error_message(func)
    super().__init__(message)
ReturnTypeAnnotationMissingError
ReturnTypeAnnotationMissingError(func)

Bases: PypelyError

I will be raised if a function has no return type annotation.

Source code in pypely/core/errors/_pipeline.py
def __init__(self, func: Callable):
    message = self.__error_message(func)
    super().__init__(message)
InvalidParameterAnnotationError
InvalidParameterAnnotationError(func, param)

Bases: PypelyError

I will be raised when an invalid parameter annotation is used.

Source code in pypely/core/errors/_pipeline.py
def __init__(self, func: Callable, param: inspect.Parameter):
    message = self.__error_message(func, param)
    super().__init__(message)
OutputInputDoNotMatchError
OutputInputDoNotMatchError(
    func1, func2, inner_exception=None
)

Bases: PypelyError

I will be raised when the output does not match the input of the following function.
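
A hedged sketch of when this error appears, assuming it is importable from pypely.core.errors and that the check happens while the pipeline is composed, as the composition code above suggests (function names are illustrative):

```python
from pypely import pipeline
from pypely.core.errors import OutputInputDoNotMatchError


def produce(x: int) -> str:
    return str(x)


def consume(x: int) -> int:
    return x + 1


# produce returns a str, but consume expects an int.
try:
    broken = pipeline(produce, consume)
except OutputInputDoNotMatchError as error:
    print(error)
```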

Source code in pypely/core/errors/_pipeline.py
def __init__(self, func1: Callable, func2: Callable, inner_exception: Optional[Exception] = None):
    message = self.__error_message(func1, func2, inner_exception)
    super().__init__(message)

memory

I expose functionality to persist step outputs and use them in later steps.

This functionality leverages the shift operators >> and <<. This allows you to store the output of a step and use it later in another step. You can use either a string or pypely.memory.MemoryEntry to define a memory entry.

Example:

pipeline(
    func1 >> "result",
    func2,
    func3,
    func4 << "result"
)

result = MemoryEntry()

pipeline(
    func1 >> result,
    func2,
    func3,
    func4 << result
)

The function that consumes a memory entry also consumes the output of the previous step. So in the example above, func4 receives the output of func3 as well as the memory entry result.

The types of the memory entries are stored, and whether a function is capable of consuming a memory entry is checked based on this type information.

Memory entry names must be unique

When using strings, the names of memory entries must be unique throughout the code base. This limitation exists because there is no way to identify the context in which the memory is used at build time, which is when the type information is stored. The simplest way to work around this limitation is to use pypely.memory.MemoryEntry, as shown in the second example.

The memory is context sensitive

The usage of memory entries is context sensitive: you can only consume memory entries that have been created in the same pipeline. Consequently, memory entries cannot be shared between sub- and parent pipelines. This restriction exists to prevent overly complex memory usage.

Due to the context sensitivity, the following example will fail:

pipeline(
    func1 >> "result",
    pipeline(
        func2 << "result",
        func3,
        func4
    ),
    func5
)

MemoryEntry

MemoryEntry()

I can be used to describe a memory entry.

This allows you to use a clear name that is internally handled as a UUID. In the future, memorizable might only support me instead of strings. If you run into naming conflicts, use me instead of a string to reference the memory entry.

Example
from pypely import pipeline
from pypely.memory import memorizable, MemoryEntry

@memorizable
def some_func() -> int:
    return 42

intermediate_result = MemoryEntry()
pipeline(
    ...,
    some_func >> intermediate_result
)
Source code in pypely/memory/_wrappers.py
def __init__(self):
    self.id = str(uuid.uuid4())

memorizable

memorizable(func=None, allow_ingest=True)

I enable a function to interact with the memory.

The func parameter is usually not given directly but via decorator syntax, so memorizable can be used in two ways:

Example
from pypely.memory import memorizable


@memorizable
def some_func_that_allows_ingest(arg1: int) -> int:
    ...


@memorizable(allow_ingest=False)
def some_func_that_does_not_allow_ingest(arg1: int) -> int:
    ...

Parameters:

    func (Optional[Callable[P, T]]): The function that wants to interact with the memory. Default: None.
    allow_ingest (Optional[bool]): Whether data from the memory can be ingested into the function. Default: True.

Returns:

    Union[Callable[[Callable[P, T]], Callable[P, T]], Callable[P, T]]: A callable that allows the use of the shift operators (<<, >>).
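
Building on the memory examples above, a minimal sketch of what the returned callable enables (function names are illustrative; this assumes the remembered value fills the consuming function's remaining free parameter, as described in the memory section):

```python
from pypely import pipeline
from pypely.memory import memorizable


@memorizable
def start(x: int) -> int:
    return x + 1


def scale(x: int) -> int:
    return x * 10


@memorizable
def combine(current: int, remembered: int) -> int:
    return current + remembered


# start's output (2) is stored under "first"; combine receives
# scale's output (20) plus the remembered value (2).
flow = pipeline(start >> "first", scale, combine << "first")

print(flow(1))  # -> 22
```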

Source code in pypely/memory/_wrappers.py
def memorizable(
    func: Optional[Callable[P, T]] = None, allow_ingest: Optional[bool] = True
) -> Union[Callable[[Callable[P, T]], Callable[P, T]], Callable[P, T]]:
    """I enable a function to interact with the memory.

    The `func` parameter is usually not given directly. This is done via decorators. So memorizable can be used in two ways:

    Example:
        ```python
        from pypely.memory import memorizable


        @memorizable
        def some_func_that_allows_ingest(arg1: int) -> int:
            ...


        @memorizable(allow_ingest=False)
        def some_func_that_does_not_allow_ingest(arg1: int) -> int:
            ...
        ```

    Args:
        func (Optional[Callable[P, T]], optional): The function that wants to interact with the memory. Defaults to None.
        allow_ingest (Optional[bool], optional): This describes if data from the memory can be ingested into the function. Defaults to True.

    Returns:
        Union[Callable[[Callable[P, T]], Memorizable], Callable[P, T]]: A callable that allows to use the shift operators (`<<`, `>>`)
    """
    if func is None:

        def _wrap(func: Callable[P, T]) -> Callable[P, T]:
            return Memorizable(func, allow_ingest)

        return _wrap
    else:
        return Memorizable(func, allow_ingest)

errors

I describe all errors that can occur during memory interaction.

MemoryAttributeExistsError
MemoryAttributeExistsError(message)

Bases: AttributeError, PypelyError

I will be raised if a memory entry already exists.

Source code in pypely/memory/errors.py
def __init__(self, message):
    super(AttributeError, self).__init__(message)
MemoryAttributeNotFoundError
MemoryAttributeNotFoundError(message)

Bases: AttributeError, PypelyError

I will be raised if no memory entry with a given name exists.

Source code in pypely/memory/errors.py
def __init__(self, message):
    super(AttributeError, self).__init__(message)
MemoryIngestNotAllowedError
MemoryIngestNotAllowedError(message)

Bases: RuntimeError, PypelyError

I will be raised if a memory entry is ingested into a function that does not allow this.

Source code in pypely/memory/errors.py
def __init__(self, message):
    super(RuntimeError, self).__init__(message)
MemoryTypeDoesNotMatchError
MemoryTypeDoesNotMatchError(message)

Bases: RuntimeError, PypelyError

I will be raised if the type of a memory entry does not match the parameter type it should be ingested into.

Source code in pypely/memory/errors.py
def __init__(self, message):
    super(RuntimeError, self).__init__(message)
InvalidMemoryAttributeError
InvalidMemoryAttributeError(message)

Bases: AttributeError, PypelyError

I will be raised if a value that is ingested into the memory has an invalid type.

Source code in pypely/memory/errors.py
def __init__(self, message: str) -> None:
    super().__init__(message)
NoFreeParameterFound
NoFreeParameterFound(message)

Bases: AttributeError, PypelyError

I am raised when there are too many ingests into a function.

Source code in pypely/memory/errors.py
def __init__(self, message: str) -> None:
    super().__init__(message)