utensil.loopflow package
Subpackages
Submodules
utensil.loopflow.loopflow module
- class utensil.loopflow.loopflow.BaseNode(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Bases: multiprocessing.context.Process
- class utensil.loopflow.loopflow.BaseNodeProcess(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Bases: multiprocessing.context.Process
- class utensil.loopflow.loopflow.NodeProcessFunction
Bases: object
- classmethod parse(o) → List[utensil.loopflow.loopflow.NodeProcessFunction]
- class utensil.loopflow.loopflow.NodeProcessMeta(node_name: 'str', triggerings: 'List[Node]', children: 'List[Node]', process_funcs: 'List[NodeProcessFunction]', export: 'Tuple[str]', result_q: 'SimpleQueue', end_q: 'SimpleQueue')
Bases: object
- node_name: str
- triggerings: List[utensil.loopflow.loopflow.Node]
- children: List[utensil.loopflow.loopflow.Node]
- process_funcs: List[utensil.loopflow.loopflow.NodeProcessFunction]
- export: Tuple[str]
- result_q: multiprocessing.context.BaseContext.SimpleQueue
- end_q: multiprocessing.context.BaseContext.SimpleQueue
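The field list above can be mirrored with a small standard-library stand-in, which may help when reading the constructor signature. This is only a sketch: `MetaSketch` is a hypothetical class, not part of utensil, and treating `NodeProcessMeta` as a dataclass is an assumption suggested by the quoted annotations in its signature. The `Node` and `NodeProcessFunction` element types are replaced with `Any`, and the field values are made up for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from multiprocessing import SimpleQueue
from typing import Any, List, Tuple


@dataclass
class MetaSketch:
    """Hypothetical stand-in mirroring the documented NodeProcessMeta fields."""

    node_name: str
    triggerings: List[Any]    # List[Node] in the documented class
    children: List[Any]       # List[Node] in the documented class
    process_funcs: List[Any]  # List[NodeProcessFunction] in the documented class
    export: Tuple[str, ...]
    result_q: SimpleQueue
    end_q: SimpleQueue


# Illustrative values only; the real semantics of each field are not
# described in this reference.
meta = MetaSketch(
    node_name="load",
    triggerings=[],
    children=[],
    process_funcs=[],
    export=("result",),
    result_q=SimpleQueue(),
    end_q=SimpleQueue(),
)

# Field order matches the positional order of the documented signature.
field_names = [f.name for f in fields(meta)]
```

The two queue fields are created with `multiprocessing.SimpleQueue()`, which is the type named in the attribute listing.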
- class utensil.loopflow.loopflow.NodeProcess(meta: utensil.loopflow.loopflow.NodeProcessMeta, *args, **kwargs)
- class utensil.loopflow.loopflow.ParentSpecifier(node_name: 'str', flow_condition: 'Tuple[str]', flows: 'Tuple[Any]', flow_use: 'Tuple[str]')
Bases: object
- node_name: str
- flow_condition: Tuple[str]
- flows: Tuple[Any]
- flow_use: Tuple[str]
- class utensil.loopflow.loopflow.Node(name: str, end: bool, proc_funcs: List[utensil.loopflow.loopflow.NodeProcessFunction], end_q: multiprocessing.context.BaseContext.SimpleQueue, result_q: multiprocessing.context.BaseContext.SimpleQueue, triggers: Union[None, utensil.loopflow.loopflow.Triggers] = None, parents: Union[None, utensil.loopflow.loopflow.Parents] = None, export: Union[None, str, List[str]] = None)
- class utensil.loopflow.loopflow.Flow(nodes: List[utensil.loopflow.loopflow.Node], result_q: multiprocessing.context.BaseContext.SimpleQueue)
Bases: object
- utensil.loopflow.loopflow.default_node_process_function_name(c: Type[utensil.loopflow.loopflow.NodeProcessFunction])
- utensil.loopflow.loopflow.register_node_process_functions(proc_funcs: Optional[List[Type[utensil.loopflow.loopflow.NodeProcessFunction]]] = None, proc_func_map: Optional[Dict[str, Type[utensil.loopflow.loopflow.NodeProcessFunction]]] = None, proc_func_module: Optional[module] = None, raise_on_exist: bool = True)