-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathclient.py
More file actions
271 lines (223 loc) · 8.46 KB
/
client.py
File metadata and controls
271 lines (223 loc) · 8.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import enum
import re
import typing
import httpx
from ..core.client_wrapper import AsyncClientWrapper, SyncClientWrapper
from ..core.request_options import RequestOptions
# Sentinel (Ellipsis typed as Any) used as the default value for optional
# parameters, so that "not provided" can be told apart from an explicit None.
OMIT = typing.cast(typing.Any, ...)
# Domain used when none is passed to the workflow clients; endpoint IDs are
# expanded to "https://<endpoint-id>.<domain>".
_DEFAULT_WORKFLOW_DOMAIN = "m.pipedream.net"
# Endpoint IDs start with "en" or "eo", followed by one or more lowercase
# letters, digits or hyphens.
# NOTE: "$" also matches just before a trailing newline, so pair this pattern
# with fullmatch() (or strip the input) when a strict check is required.
_ENDPOINT_ID_RE = re.compile(r"^e(n|o)[a-z0-9-]+$")
class HTTPAuthType(enum.Enum):
    """How a workflow invocation request is authenticated.

    The member values are the wire/config strings for each mode; the member
    order is part of the enum's iteration order and must be kept.
    """

    # No credentials: the Authorization header is sent empty.
    NONE = "none"

    # A fixed bearer token.
    # NOTE(review): presumably supplied by the caller via ``headers`` — confirm.
    STATIC_BEARER = "static_bearer_token"

    # OAuth credentials; this is the mode used for external-user invocations.
    OAUTH = "oauth"
def _is_endpoint_id(url_or_endpoint: str) -> bool:
    """Return True when the whole string is a bare workflow endpoint ID.

    Parameters
    ----------
    url_or_endpoint : str
        Candidate string to test against the endpoint ID pattern.

    Returns
    -------
    bool
        True only if the entire string matches the pattern.
    """
    # fullmatch() rather than match(): the pattern's "$" anchor also matches
    # just before a trailing newline, so match() would accept an ID followed
    # by a line break, which must never end up inside a hostname.
    return _ENDPOINT_ID_RE.fullmatch(url_or_endpoint) is not None
def _build_workflow_url(url_or_endpoint: str, *, workflow_domain: str) -> str:
if not url_or_endpoint:
raise ValueError("URL or endpoint ID is required")
if url_or_endpoint.startswith(("http://", "https://")):
return url_or_endpoint
if "." in url_or_endpoint and not _is_endpoint_id(url_or_endpoint):
return f"https://{url_or_endpoint}"
if not _is_endpoint_id(url_or_endpoint):
raise ValueError(f"Invalid endpoint ID format: {url_or_endpoint}")
return f"https://{url_or_endpoint}.{workflow_domain}"
def _prepare_headers(
    headers: typing.Optional[typing.Dict[str, str]],
    auth_type: HTTPAuthType,
) -> typing.Dict[str, str]:
    """Build the header dict for a workflow request.

    Parameters
    ----------
    headers : typing.Optional[typing.Dict[str, str]]
        Caller-supplied headers; copied, never mutated.
    auth_type : HTTPAuthType
        Authentication mode for the request.

    Returns
    -------
    typing.Dict[str, str]
        A new dict. For ``HTTPAuthType.NONE`` the ``Authorization`` entry is
        set to an empty string; for every other mode the copy is returned
        unchanged.
    """
    prepared = dict(headers or {})
    if auth_type == HTTPAuthType.NONE:
        # An empty value overrides the bearer token the client wrapper would
        # otherwise add by default.
        prepared.update({"Authorization": ""})
    return prepared
def _add_external_user_id_header(
headers: typing.Optional[typing.Dict[str, str]],
external_user_id: str,
) -> typing.Dict[str, str]:
if not external_user_id:
raise ValueError("external_user_id is required")
request_headers = dict(headers) if headers else {}
request_headers["X-PD-External-User-ID"] = external_user_id
return request_headers
class WorkflowsClient:
    """Synchronous client for invoking workflows over their HTTP interface."""

    def __init__(
        self,
        *,
        client_wrapper: SyncClientWrapper,
        workflow_domain: str = _DEFAULT_WORKFLOW_DOMAIN,
    ) -> None:
        """
        Parameters
        ----------
        client_wrapper : SyncClientWrapper
            Wrapper whose ``httpx_client`` performs the requests.

        workflow_domain : str
            Domain used to expand bare endpoint IDs into full URLs.
        """
        self._client_wrapper = client_wrapper
        self._workflow_domain = workflow_domain

    def invoke(
        self,
        url_or_endpoint: str,
        *,
        method: str = "POST",
        body: typing.Optional[typing.Any] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        auth_type: HTTPAuthType = HTTPAuthType.NONE,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> httpx.Response:
        """
        Invoke a workflow via its HTTP interface URL or endpoint ID.

        Parameters
        ----------
        url_or_endpoint : str
            The full URL of the workflow's HTTP interface or the endpoint ID.

        method : str
            HTTP method to use (default: "POST").

        body : typing.Optional[typing.Any]
            Request body. Dict/list values are serialized as JSON; other types are sent as raw form data.

        headers : typing.Optional[typing.Dict[str, str]]
            Additional HTTP headers to include.

        auth_type : HTTPAuthType
            Authentication mode (default: HTTPAuthType.NONE). With
            ``HTTPAuthType.NONE`` the ``Authorization`` header is sent empty;
            for other modes the supplied headers are passed through as given.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        httpx.Response
            Response returned by the invoked workflow.

        Raises
        ------
        ValueError
            If ``url_or_endpoint`` is empty or not a valid URL / endpoint ID.

        httpx.HTTPStatusError
            If the workflow responds with a 4xx or 5xx status.

        Examples
        --------
        from pipedream import Pipedream, PipedreamEnvironment
        from pipedream.workflows.client import HTTPAuthType

        client = Pipedream(
            client_id="<clientId>",
            client_secret="<clientSecret>",
            environment=PipedreamEnvironment.PROD,
        )
        response = client.workflows.invoke(
            url_or_endpoint="https://your-workflow.m.pipedream.net",
            method="POST",
            body={"key": "value"},
            headers={"Content-Type": "application/json"},
            auth_type=HTTPAuthType.NONE,
        )
        """
        workflow_url = _build_workflow_url(url_or_endpoint, workflow_domain=self._workflow_domain)
        request_headers = _prepare_headers(headers, auth_type)

        # Hand the body to exactly one of ``json`` / ``data``. Supplying the
        # same object as both leaves the encoding to whichever argument the
        # HTTP layer happens to prefer, which would not honour the documented
        # "dict/list -> JSON, anything else -> raw data" contract.
        send_as_json = isinstance(body, (dict, list))
        response = self._client_wrapper.httpx_client.request(
            workflow_url,
            method=method,
            json=body if send_as_json else None,
            data=None if send_as_json else body,
            headers=request_headers,
            request_options=request_options,
        )
        # Surface HTTP error statuses as exceptions instead of returning them.
        response.raise_for_status()
        return response

    def invoke_for_external_user(
        self,
        url_or_endpoint: str,
        *,
        external_user_id: str,
        method: str = "POST",
        body: typing.Optional[typing.Any] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> httpx.Response:
        """
        Invoke a workflow on behalf of a Pipedream Connect end-user.

        Forces OAuth authentication and adds the `X-PD-External-User-ID` header.

        Parameters
        ----------
        url_or_endpoint : str
            The full URL of the workflow's HTTP interface or the endpoint ID.

        external_user_id : str
            The external user ID for whom the workflow is being invoked.

        method : str
            HTTP method to use (default: "POST").

        body : typing.Optional[typing.Any]
            Request body. Dict/list values are serialized as JSON; other types are sent as raw form data.

        headers : typing.Optional[typing.Dict[str, str]]
            Additional HTTP headers to include.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        httpx.Response
            Response returned by the invoked workflow.

        Raises
        ------
        ValueError
            If ``external_user_id`` is empty, or ``url_or_endpoint`` is empty
            or not a valid URL / endpoint ID.

        httpx.HTTPStatusError
            If the workflow responds with a 4xx or 5xx status.

        Examples
        --------
        from pipedream import Pipedream, PipedreamEnvironment

        client = Pipedream(
            client_id="<clientId>",
            client_secret="<clientSecret>",
            environment=PipedreamEnvironment.PROD,
        )
        response = client.workflows.invoke_for_external_user(
            url_or_endpoint="en2r1n8a98np7",
            external_user_id="user_123",
            method="POST",
            body={"message": "Hello from external user"},
            headers={"Content-Type": "application/json"},
        )
        """
        return self.invoke(
            url_or_endpoint,
            method=method,
            body=body,
            headers=_add_external_user_id_header(headers, external_user_id),
            auth_type=HTTPAuthType.OAUTH,
            request_options=request_options,
        )
class AsyncWorkflowsClient:
    """Asynchronous client for invoking workflows over their HTTP interface."""

    def __init__(
        self,
        *,
        client_wrapper: AsyncClientWrapper,
        workflow_domain: str = _DEFAULT_WORKFLOW_DOMAIN,
    ) -> None:
        """
        Parameters
        ----------
        client_wrapper : AsyncClientWrapper
            Wrapper whose ``httpx_client`` performs the requests.

        workflow_domain : str
            Domain used to expand bare endpoint IDs into full URLs.
        """
        self._client_wrapper = client_wrapper
        self._workflow_domain = workflow_domain

    async def invoke(
        self,
        url_or_endpoint: str,
        *,
        method: str = "POST",
        body: typing.Optional[typing.Any] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        auth_type: HTTPAuthType = HTTPAuthType.NONE,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> httpx.Response:
        """
        Invoke a workflow via its HTTP interface URL or endpoint ID.

        Async counterpart of `WorkflowsClient.invoke`.

        Parameters
        ----------
        url_or_endpoint : str
            The full URL of the workflow's HTTP interface or the endpoint ID.

        method : str
            HTTP method to use (default: "POST").

        body : typing.Optional[typing.Any]
            Request body. Dict/list values are serialized as JSON; other types are sent as raw form data.

        headers : typing.Optional[typing.Dict[str, str]]
            Additional HTTP headers to include.

        auth_type : HTTPAuthType
            Authentication mode (default: HTTPAuthType.NONE). With
            ``HTTPAuthType.NONE`` the ``Authorization`` header is sent empty;
            for other modes the supplied headers are passed through as given.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        httpx.Response
            Response returned by the invoked workflow.

        Raises
        ------
        ValueError
            If ``url_or_endpoint`` is empty or not a valid URL / endpoint ID.

        httpx.HTTPStatusError
            If the workflow responds with a 4xx or 5xx status.
        """
        workflow_url = _build_workflow_url(url_or_endpoint, workflow_domain=self._workflow_domain)
        request_headers = _prepare_headers(headers, auth_type)

        # Hand the body to exactly one of ``json`` / ``data``. Supplying the
        # same object as both leaves the encoding to whichever argument the
        # HTTP layer happens to prefer, which would not honour the documented
        # "dict/list -> JSON, anything else -> raw data" contract.
        send_as_json = isinstance(body, (dict, list))
        response = await self._client_wrapper.httpx_client.request(
            workflow_url,
            method=method,
            json=body if send_as_json else None,
            data=None if send_as_json else body,
            headers=request_headers,
            request_options=request_options,
        )
        # Surface HTTP error statuses as exceptions instead of returning them.
        response.raise_for_status()
        return response

    async def invoke_for_external_user(
        self,
        url_or_endpoint: str,
        *,
        external_user_id: str,
        method: str = "POST",
        body: typing.Optional[typing.Any] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> httpx.Response:
        """
        Invoke a workflow on behalf of a Pipedream Connect end-user.

        Async counterpart of `WorkflowsClient.invoke_for_external_user`.
        Forces OAuth authentication and adds the `X-PD-External-User-ID` header.

        Parameters
        ----------
        url_or_endpoint : str
            The full URL of the workflow's HTTP interface or the endpoint ID.

        external_user_id : str
            The external user ID for whom the workflow is being invoked.

        method : str
            HTTP method to use (default: "POST").

        body : typing.Optional[typing.Any]
            Request body. Dict/list values are serialized as JSON; other types are sent as raw form data.

        headers : typing.Optional[typing.Dict[str, str]]
            Additional HTTP headers to include.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        httpx.Response
            Response returned by the invoked workflow.

        Raises
        ------
        ValueError
            If ``external_user_id`` is empty, or ``url_or_endpoint`` is empty
            or not a valid URL / endpoint ID.

        httpx.HTTPStatusError
            If the workflow responds with a 4xx or 5xx status.
        """
        return await self.invoke(
            url_or_endpoint,
            method=method,
            body=body,
            headers=_add_external_user_id_header(headers, external_user_id),
            auth_type=HTTPAuthType.OAUTH,
            request_options=request_options,
        )