pipe
-
class pemi.pipe.Pipe(*, name='self', **params)
A pipe is a parameterized collection of sources and targets that can be executed by calling its flow method.
- Parameters
name (str) – Assign a name to the pipe
**params – Additional keyword parameters
-
name
The name of the pipe.
- Type
str
-
sources
A dictionary where the keys are the names of source data subjects and the values are instances of a data subject class.
- Type
dict
-
targets
A dictionary where the keys are the names of target data subjects and the values are instances of a data subject class.
- Type
dict
-
pipes
A dictionary referencing nested pipes where the keys are the names of the nested pipes and the values are the nested pipe instances. All pipes come with at least one nested pipe called ‘self’.
- Type
dict
-
connections
The pemi connection object.
- Type
PipeConnection
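To make the attribute layout concrete, here is a minimal stand-in written in plain Python. It is not pemi's implementation: `SketchSubject` and `SketchPipe` are hypothetical names, the `source`, `target`, and `pipe` methods are simplified to take only a name, and registering the pipe itself under 'self' is an assumption based on the description of pipes above.

```python
class SketchSubject:
    """Hypothetical stand-in for a data subject; it only holds a name."""

    def __init__(self, name):
        self.name = name


class SketchPipe:
    """Hypothetical stand-in mirroring the attributes documented above."""

    def __init__(self, *, name='self', **params):
        self.name = name
        self.params = params
        self.sources = {}
        self.targets = {}
        # Assumption: a pipe registers itself as the nested pipe 'self'.
        self.pipes = {'self': self}

    def source(self, name):
        self.sources[name] = SketchSubject(name)

    def target(self, name):
        self.targets[name] = SketchSubject(name)

    def pipe(self, name, pipe):
        self.pipes[name] = pipe


outer = SketchPipe(name='outer')
outer.source('main')
outer.target('main')
outer.pipe('nested', SketchPipe(name='nested'))

print(list(outer.sources))  # ['main']
print(list(outer.pipes))    # ['self', 'nested']
```

Each dictionary is keyed by name, so a data subject or nested pipe is always looked up by the name it was registered under.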
-
connect(from_pipe_name, from_subject_name)
Connect one nested pipe to another.
- Parameters
from_pipe_name (str) – Name of the nested pipe that contains the source of the connection.
from_subject_name (str) – Name of the data subject in the nested pipe that contains the source of the connection. This data subject needs to be a target of the pipe referenced by from_pipe_name.
- Returns
The PipeConnection object.
- Return type
PipeConnection
Example
Connecting the target of one pipe to the source of another:
class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.pipe(
            name='get_awesome_data',
            pipe=GetAwesomeDataPipe()
        )
        self.connect('get_awesome_data', 'main').to('load_awesome_data', 'main')

        self.pipe(
            name='load_awesome_data',
            pipe=LoadAwesomeDataPipe()
        )
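The connect(...).to(...) call in the example reads as a fluent chain: connect records where the data comes from and returns an object whose to method records where it goes. The sketch below imitates that pattern in plain Python. `SketchConnection` and `SketchParentPipe` are hypothetical stand-ins, and the `to` signature is inferred from the example rather than from documented API. In this sketch a connection stores names only, so it can be declared before the destination pipe exists; whether pemi resolves names the same way is an assumption.

```python
class SketchConnection:
    """Hypothetical stand-in for a connection between two nested pipes."""

    def __init__(self, from_pipe_name, from_subject_name):
        self.from_pipe_name = from_pipe_name
        self.from_subject_name = from_subject_name
        self.to_pipe_name = None
        self.to_subject_name = None

    def to(self, to_pipe_name, to_subject_name):
        # Record the destination and return self so the call can be chained.
        self.to_pipe_name = to_pipe_name
        self.to_subject_name = to_subject_name
        return self


class SketchParentPipe:
    """Hypothetical parent pipe that only keeps a list of connections."""

    def __init__(self):
        self.connections = []

    def connect(self, from_pipe_name, from_subject_name):
        conn = SketchConnection(from_pipe_name, from_subject_name)
        self.connections.append(conn)
        return conn


parent = SketchParentPipe()
conn = parent.connect('get_awesome_data', 'main').to('load_awesome_data', 'main')
print(conn.from_pipe_name, '->', conn.to_pipe_name)
```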
-
flow()
Execute this pipe. This method is meant to be defined in a child class.
Example
A simple hello-world pipe:
class MyPipe(pemi.Pipe):
    def flow(self):
        print('hello world')

>>> MyPipe().flow()
hello world
-
from_pickle(picklepipe=None)
Recursively load all data subjects in all nested pipes from a pickled bytes object created by to_pickle.
- Parameters
picklepipe – The bytes object created by to_pickle
- Returns
This pipe (self), with its data subjects loaded from the pickle.
- Return type
Pipe
Example
De-pickling a pickled pipe:
my_pipe = MyPipe()
pickled = my_pipe.to_pickle()
my_other_pipe = MyPipe().from_pickle(pickled)
-
pipe(name, pipe)
Defines a named pipe nested in this pipe.
- Parameters
name (str) – Name of the nested pipe.
pipe (Pipe) – The nested pipe instance.
Example
Creating a nested pipe:
class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.pipe(
            name='nested',
            pipe=pemi.Pipe()
        )

>>> list(MyPipe().pipes.keys())
['self', 'nested']
-
source(subject_class, name, schema=None, **kwargs)
Define a source data subject for this pipe.
- Parameters
subject_class (class) – The DataSubject class this source uses.
name (str) – Name of this data subject.
schema (schema) – Schema associated with this source.
Example
Creating a source:
class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.source(
            pemi.PdDataSubject,
            name='main'
        )

>>> list(MyPipe().sources.keys())
['main']
-
target(subject_class, name, schema=None, **kwargs)
Define a target data subject for this pipe.
- Parameters
subject_class (class) – The DataSubject class this target uses.
name (str) – Name of this data subject.
schema (schema) – Schema associated with this target.
Example
Creating a target:
class MyPipe(pemi.Pipe):
    def __init__(self):
        super().__init__()

        self.target(
            pemi.PdDataSubject,
            name='main'
        )

>>> list(MyPipe().targets.keys())
['main']
-
to_pickle(picklepipe=None)
Recursively pickle all of the data subjects in this pipe and in all nested pipes.
- Parameters
picklepipe – A pickled representation of a pipe. Used only for recursion; not meant to be set by the user.
- Returns
A bytes object containing the pickled pipe.
- Return type
bytes
Example
Pickling a pipe:
>>> MyPipe().to_pickle()
b'.. <bytes> ..'
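The recursive round trip described by to_pickle and from_pickle can be sketched with the standard library alone. This is an illustration of the idea, not pemi's implementation: `SketchPicklePipe`, its `subjects` and `nested` attributes, and the `_collect` and `_restore` helpers are all hypothetical, and data subjects are represented by plain Python lists.

```python
import pickle


class SketchPicklePipe:
    """Hypothetical stand-in: data subjects are plain Python values here."""

    def __init__(self, name='self'):
        self.name = name
        self.subjects = {}  # subject name -> data
        self.nested = {}    # nested pipe name -> SketchPicklePipe

    def _collect(self):
        # Gather this pipe's data, then recurse into every nested pipe.
        return {
            'subjects': dict(self.subjects),
            'nested': {n: p._collect() for n, p in self.nested.items()},
        }

    def _restore(self, state):
        # Load this pipe's data, then recurse into every nested pipe.
        self.subjects.update(state['subjects'])
        for n, nested_state in state['nested'].items():
            self.nested[n]._restore(nested_state)

    def to_pickle(self):
        return pickle.dumps(self._collect())

    def from_pickle(self, pickled):
        self._restore(pickle.loads(pickled))
        return self  # returning self allows build().from_pickle(...)


def build():
    """Build an empty pipe with one nested pipe named 'child'."""
    pipe = SketchPicklePipe()
    pipe.nested['child'] = SketchPicklePipe('child')
    return pipe


original = build()
original.subjects['main'] = [1, 2, 3]
original.nested['child'].subjects['main'] = ['a', 'b']

restored = build().from_pickle(original.to_pickle())
print(restored.nested['child'].subjects['main'])  # ['a', 'b']
```

The receiving pipe is built with the same structure before loading, which mirrors the MyPipe().from_pickle(pickled) usage in the from_pickle example: the pickle carries the data, while the pipe definition supplies the layout.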