Skip to content

Commit 2249f3e

Browse files
+ Processor decorator
1 parent 20d8729 commit 2249f3e

File tree

2 files changed

+141
-1
lines changed

2 files changed

+141
-1
lines changed

src/thread/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333

3434
# Export decorators
3535
from .decorators import (
36-
threaded
36+
threaded,
37+
processor
3738
)
3839

3940

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""
2+
## Processor
3+
4+
Documentation: https://thread.ngjx.org
5+
"""
6+
7+
from functools import wraps
8+
from ..thread import ParallelProcessing
9+
10+
from .._types import Overflow_In, Data_In
11+
from typing import Callable, Mapping, Sequence, Optional, Union, overload
12+
from typing_extensions import ParamSpec, TypeVar, Concatenate
13+
14+
15+
_TargetT = TypeVar('_TargetT')
16+
_TargetP = ParamSpec('_TargetP')
17+
_DataT = TypeVar('_DataT')
18+
TargetFunction = Callable[Concatenate[_DataT, _TargetP], _TargetT]
19+
20+
21+
NoParamReturn = Callable[Concatenate[Sequence[_DataT], _TargetP], ParallelProcessing[_TargetP, _TargetT, _DataT]]
22+
WithParamReturn = Callable[[TargetFunction[_DataT, _TargetP, _TargetT]], NoParamReturn[_DataT, _TargetP, _TargetT]]
23+
FullParamReturn = Callable[Concatenate[Sequence[_DataT], _TargetP], ParallelProcessing[_TargetP, _TargetT, _DataT]]
24+
WrappedWithParamReturn = Callable[[TargetFunction[_DataT, _TargetP, _TargetT]], WithParamReturn[_DataT, _TargetP, _TargetT]]
25+
26+
27+
28+
29+
@overload
30+
def processor(__function: TargetFunction[_DataT, _TargetP, _TargetT]) -> NoParamReturn[_DataT, _TargetP, _TargetT]: ...
31+
32+
@overload
33+
def processor(
34+
*,
35+
args: Sequence[Data_In] = (),
36+
kwargs: Mapping[str, Data_In] = {},
37+
ignore_errors: Sequence[type[Exception]] = (),
38+
suppress_errors: bool = False,
39+
**overflow_kwargs: Overflow_In
40+
) -> WithParamReturn[_DataT, _TargetP, _TargetT]: ...
41+
42+
@overload
43+
def processor(
44+
__function: TargetFunction[_DataT, _TargetP, _TargetT],
45+
*,
46+
args: Sequence[Data_In] = (),
47+
kwargs: Mapping[str, Data_In] = {},
48+
ignore_errors: Sequence[type[Exception]] = (),
49+
suppress_errors: bool = False,
50+
**overflow_kwargs: Overflow_In
51+
) -> FullParamReturn[_DataT, _TargetP, _TargetT]: ...
52+
53+
54+
def processor(
55+
__function: Optional[TargetFunction[_DataT, _TargetP, _TargetT]] = None,
56+
*,
57+
args: Sequence[Data_In] = (),
58+
kwargs: Mapping[str, Data_In] = {},
59+
ignore_errors: Sequence[type[Exception]] = (),
60+
suppress_errors: bool = False,
61+
**overflow_kwargs: Overflow_In
62+
) -> Union[NoParamReturn[_DataT, _TargetP, _TargetT], WithParamReturn[_DataT, _TargetP, _TargetT], FullParamReturn[_DataT, _TargetP, _TargetT]]:
63+
"""
64+
Decorate a function to run it in a thread
65+
66+
Parameters
67+
----------
68+
:param __function: The function to run in a thread
69+
:param args: Keyword-Only arguments to pass into `thread.Thread`
70+
:param kwargs: Keyword-Only keyword arguments to pass into `thread.Thread`
71+
:param ignore_errors: Keyword-Only arguments to pass into `thread.Thread`
72+
:param suppress_errors: Keyword-Only arguments to pass into `thread.Thread`
73+
:param **: Keyword-Only arguments to pass into `thread.Thread`
74+
75+
Returns
76+
-------
77+
:return decorator:
78+
79+
Use Case
80+
--------
81+
Now whenever `myfunction` is invoked, it will be executed in a thread and the `Thread` object will be returned
82+
83+
>>> @thread.threaded
84+
>>> def myfunction(*args, **kwargs): ...
85+
86+
>>> myJob = myfunction(1, 2)
87+
>>> type(myjob)
88+
> Thread
89+
90+
You can also pass keyword arguments to change the thread behaviour, it otherwise follows the defaults of `thread.Thread`
91+
>>> @thread.threaded(daemon = True)
92+
>>> def myfunction(): ...
93+
94+
Args will be ordered infront of function-parsed args parsed into `thread.Thread.args`
95+
>>> @thread.threaded(args = (1))
96+
>>> def myfunction(*args):
97+
>>> print(args)
98+
>>>
99+
>>> myfunction(4, 6).get_return_value()
100+
1, 4, 6
101+
"""
102+
103+
if not callable(__function):
104+
def wrapper(func: TargetFunction[_DataT, _TargetP, _TargetT]) -> FullParamReturn[_DataT, _TargetP, _TargetT]:
105+
return processor(
106+
func,
107+
args = args,
108+
kwargs = kwargs,
109+
ignore_errors = ignore_errors,
110+
suppress_errors = suppress_errors,
111+
**overflow_kwargs
112+
)
113+
return wrapper
114+
115+
overflow_kwargs.update({
116+
'ignore_errors': ignore_errors,
117+
'suppress_errors': suppress_errors
118+
})
119+
120+
kwargs = dict(kwargs)
121+
122+
@wraps(__function)
123+
def wrapped(data: Sequence[_DataT], *parsed_args: _TargetP.args, **parsed_kwargs: _TargetP.kwargs) -> ParallelProcessing[_TargetP, _TargetT, _DataT]:
124+
kwargs.update(parsed_kwargs)
125+
126+
processed_args = ( *args, *parsed_args )
127+
processed_kwargs = { i:v for i,v in kwargs.items() if i not in ['args', 'kwargs'] }
128+
129+
job = ParallelProcessing(
130+
target = __function,
131+
dataset = data,
132+
args = processed_args,
133+
kwargs = processed_kwargs,
134+
**overflow_kwargs
135+
)
136+
job.start()
137+
return job
138+
139+
return wrapped

0 commit comments

Comments
 (0)