pipe

class pemi.pipe.Pipe(*, name='self', **params)[source]

A pipe is a parameterized collection of sources and targets which can be executed (flow).

Parameters
  • name (str) – Assign a name to the pipe

  • **params – Additional keyword parameters

name

The name of the pipe.

Type

str

sources

A dictionary where the keys are the names of source data subjects and the values are instances of a data subject class.

Type

dict

targets

A dictionary where the keys are the names of target data subjects and the values are instances of a data subject class.

Type

dict

pipes

A dictionary referencing nested pipes where the keys are the names of the nested pipes and the values are the nested pipe instances. All pipes come with at least one nested pipe called ‘self’.

Type

dict

connections

Pemi connection object.

Type

PipeConnection

connect(from_pipe_name, from_subject_name)[source]

Connect one nested pipe to another

Parameters
  • from_pipe_name (str) – Name of the nested pipe that contains the source of the connection.

  • from_subject_name (str) – Name of the data subject in the nested pipe that contains the source of the connection. This data subject needs to be a target of the pipe referenced by from_pipe_name.

Returns

the PipeConnection object.

Return type

PipeConnection

Example

Connecting the target of one pipe to the source of another:

class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.pipe(
            name='get_awesome_data',
            pipe=GetAwesomeDataPipe()
        )
        self.connect('get_awesome_data', 'main').to('load_awesome_data', 'main')

        self.pipe(
            name='load_awesome_data',
            pipe=LoadAwesomeDataPipe()
        )
flow()[source]

Execute this pipe. This method is meant to be defined in a child class.

Example

A simple hello-world pipe:

class MyPipe(pemi.Pipe):
    def flow(self):
        print('hello world')
>>> MyPipe().flow()
'hello world'
from_pickle(picklepipe=None)[source]

Recursively load all data subjects in all nested pipes from a pickled bytes object created by to_pickle.

Parameters

picklepipe – The bytes object created by to_pickle

Returns

Return type

self

Example

De-pickling a pickled pipe:

my_pipe = MyPipe()
pickled = my_pipe.to_pickle()

my_other_pipe = MyPipe().from_pickle(pickled)
pipe(name, pipe)[source]

Defines a named pipe nested in this pipe.

Parameters
  • name (str) – Name of the nested pipe.

  • pipe (pipe) – The nested pipe instance.

Example

Creating a target:

class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.pipe(
            name='nested',
            pipe=pemi.Pipe()
        )
>>> MyPipe().pipes.keys()
['self', 'nested']
source(subject_class, name, schema=None, **kwargs)[source]

Define a source data subject for this pipe.

Parameters
  • subject_class (class) – The DataSubject class this source uses.

  • name (str) – Name of this data subject.

  • schema (schema) – Schema associated with this source.

Example

Creating a source:

class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.source(
            pemi.PdDataSubject,
            name='main'
        )
>>> MyPipe().sources.keys()
['main']
target(subject_class, name, schema=None, **kwargs)[source]

Define a target data subject for this pipe.

Parameters
  • subject_class (class) – The DataSubject class this target uses.

  • name (str) – Name of this data subject.

  • schema (schema) – Schema associated with this target.

Example

Creating a target:

class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.target(
            pemi.PdDataSubject,
            name='main'
        )
>>> MyPipe().targets.keys()
['main']
to_pickle(picklepipe=None)[source]

Recursively pickle all of the data subjects in this and all nested pipes

Parameters

picklepipe – A pickled representation of a pipe. Only used for recursion not meant to be set by user.

Returns

A bytes object containing the pickled pipe.

Return type

bytes

Example

Pickling a pipe:

>>> MyPipe.to_pickle()
b'.. <bytes> ..'