MilanM committed · verified · commit 1602ff5 · 1 Parent(s): 8d8c41d

Upload 2 files

cos_stream_schema_examples.py ADDED
@@ -0,0 +1,101 @@
# Define input data schema as a proper Python list (not a string)
input_schema = [
    {
        'id': '1',
        'type': 'struct',
        'fields': [
            {
                'name': 'cos_config',
                'type': 'object',
                'nullable': False,
                'metadata': {
                    'description': 'Cloud Object Storage configuration'
                }
            },
            {
                'name': 'source_urls',
                'type': 'array',
                'nullable': False,
                'metadata': {
                    'description': 'URLs of files to download and upload to COS'
                }
            },
            {
                'name': 'prefix',
                'type': 'string',
                'nullable': True,
                'metadata': {
                    'description': 'Optional prefix to add to the file names in COS'
                }
            },
            {
                'name': 'http_method',
                'type': 'string',
                'nullable': True,
                'metadata': {
                    'description': 'HTTP method to use for downloading files'
                }
            }
        ]
    }
]

# Define output data schema as a proper Python list (not a string)
output_schema = [
    {
        'id': '1',
        'type': 'struct',
        'fields': [
            {
                'name': 'status',
                'type': 'string',
                'nullable': False,
                'metadata': {
                    'description': 'Status of the operation (success or error)'
                }
            },
            {
                'name': 'data',
                'type': 'object',
                'nullable': True,
                'metadata': {
                    'description': 'Response data containing upload details'
                }
            },
            {
                'name': 'message',
                'type': 'string',
                'nullable': True,
                'metadata': {
                    'description': 'Error message if status is error'
                }
            }
        ]
    }
]

# Define sample scoring input
sample_input = {
    'input_data': [
        {
            'fields': ['cos_config', 'source_urls', 'prefix', 'http_method'],
            'values': [
                [
                    {
                        'bucket_name': 'my-bucket',
                        'api_key': '<your-cos-api-key>',
                        'instance_id': '<your-cos-instance-id>',
                        'auth_endpoint': 'https://iam.cloud.ibm.com/identity/token',
                        'endpoint_url': 'https://s3.eu-de.cloud-object-storage.appdomain.cloud'
                    },
                    [
                        'https://example.com/sample-file.pdf',
                        'https://example.com/another-file.csv'
                    ],
                    'uploads/files',
                    'GET'
                ]
            ]
        }
    ]
}
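These schema and sample-input definitions are presumably meant to be attached when the deployable function from stream_files_to_cos.py is stored in a watsonx.ai repository. A minimal sketch of that wiring follows; it is not part of the committed files and assumes the ibm_watsonx_ai SDK, with credentials, space id, and runtime name as placeholders. Exact meta-prop and method names can differ between SDK versions, so check your installed version.

# Sketch only (assumption): store the deployable function with the schemas above attached.
from ibm_watsonx_ai import APIClient, Credentials
from stream_files_to_cos import stream_file_to_cos  # the second file in this commit

client = APIClient(Credentials(url='https://us-south.ml.cloud.ibm.com',
                               api_key='<your-watsonx-api-key>'))
client.set.default_space('<your-space-id>')

meta_props = {
    client.repository.FunctionMetaNames.NAME: 'stream_files_to_cos',
    # SOFTWARE_SPEC_ID / get_id_by_name may be named differently in older SDK releases
    client.repository.FunctionMetaNames.SOFTWARE_SPEC_ID:
        client.software_specifications.get_id_by_name('runtime-24.1-py3.11'),
    client.repository.FunctionMetaNames.INPUT_DATA_SCHEMAS: input_schema,
    client.repository.FunctionMetaNames.OUTPUT_DATA_SCHEMAS: output_schema,
    client.repository.FunctionMetaNames.SAMPLE_SCORING_INPUT: sample_input,
}

stored_fn = client.repository.store_function(stream_file_to_cos, meta_props)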
stream_files_to_cos.py ADDED
@@ -0,0 +1,300 @@
def stream_file_to_cos():
    # # Install required dependencies
    # import subprocess
    # subprocess.check_output('pip install ibm-cos-sdk requests', shell=True)
    ### ^^^ Not necessary in this case since both are part of the default Python 'runtime-24.1-py3.11' environment on watsonx.ai

    # Import dependencies
    import ibm_boto3
    import requests
    from ibm_botocore.client import Config
    import json
    import os
    import re
    from urllib.parse import unquote

    def extract_filename_from_headers(response):
        """
        Extract the actual filename from response headers.
        Checks Content-Disposition and falls back to other methods if needed.
        """
        # Try Content-Disposition header first
        content_disposition = response.headers.get('Content-Disposition')
        if content_disposition:
            # Look for filename= or filename*= parameters
            matches = re.findall(r'filename\*?=(?:([^\']*\'\')?([^;\n]*))', content_disposition)
            if matches:
                # Take the last match and handle encoded filenames
                encoding, filename = matches[-1]
                if encoding:
                    filename = unquote(filename)
                return filename.strip('"\'')

        # Try Content-Type for file extension
        content_type = response.headers.get('Content-Type', '').split(';')[0]
        extension_map = {
            # Documents
            'application/pdf': '.pdf',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
            'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
            'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
            'text/csv': '.csv',
            'application/xml': '.xml',
            'text/xml': '.xml',
            'application/yaml': '.yaml',
            'text/yaml': '.yaml',
            'application/toml': '.toml',
            'text/plain': '.txt',

            # Archives
            'application/x-rar-compressed': '.rar',
            'application/x-7z-compressed': '.7z',
            'application/zip': '.zip',
            'application/x-tar': '.tar',
            'application/gzip': '.gz',
            'application/x-gzip': '.gz',

            # Executables
            'application/x-msdownload': '.exe',
            'application/x-apple-diskimage': '.dmg',

            # Data formats
            'application/json': '.json',
            'application/x-jsonlines': '.jsonl',
            'application/parquet': '.parquet',

            # Images
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'image/tiff': '.tiff',
            'image/gif': '.gif',

            # Code and notebooks
            'application/x-ipynb+json': '.ipynb',
            'text/x-python': '.py',
            'application/x-python-code': '.py'
        }

        # If we have a valid content type with extension mapping
        if content_type in extension_map:
            # Try to find a filename in the URL path
            url_path = response.url.split('/')[-1]
            # Remove query parameters if any
            url_path = url_path.split('?')[0]
            # If the URL path has no extension, add the appropriate one
            if '.' not in url_path:
                return f"{url_path}{extension_map[content_type]}"

        # Fallback to URL filename
        return response.url.split('/')[-1].split('?')[0]

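    # Illustrative behavior of extract_filename_from_headers (hypothetical examples,
    # not part of the original commit): a response carrying
    #   Content-Disposition: attachment; filename*=UTF-8''report%202024.pdf
    # yields 'report 2024.pdf', while a 'text/csv' response fetched from
    # https://example.com/export?id=1 falls back to 'export.csv'.
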
    def score(payload, token=None):
        """
        watsonx.ai deployable function to stream files from HTTP to Cloud Object Storage

        Expected payload format:
        {
            "input_data": [{
                "fields": ["cos_config", "source_urls", "prefix", "http_method"],
                "values": [[{
                    "bucket_name": "my-bucket",
                    "api_key": "my-api-key",
                    "instance_id": "my-instance-id",
                    "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
                    "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
                },
                ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
                "my/prefix",
                "GET"]]
            }]
        }
        """
        try:
            # Extract input parameters from payload
            input_data = payload.get("input_data")[0]
            fields = input_data.get("fields")
            values = input_data.get("values")[0]

            # Map fields to values
            params = dict(zip(fields, values))

            # Extract COS configuration
            cos_config = params.get('cos_config', {})

            # Verify all required config values are present (and not empty)
            required_keys = ['bucket_name', 'api_key', 'instance_id', 'auth_endpoint', 'endpoint_url']
            missing_configs = [k for k in required_keys if not cos_config.get(k)]
            if missing_configs:
                return {
                    'predictions': [{
                        'fields': ['status', 'message'],
                        'values': [['error', f"Missing required configuration: {', '.join(missing_configs)}"]]
                    }]
                }

            # Get function parameters
            source_urls = params.get('source_urls', [])
            if not source_urls:
                return {
                    'predictions': [{
                        'fields': ['status', 'message'],
                        'values': [['error', "Missing required parameter: source_urls"]]
                    }]
                }

            # Convert single URL to list if necessary
            if isinstance(source_urls, str):
                source_urls = [source_urls]

            prefix = params.get('prefix', '')
            http_method = params.get('http_method', 'GET')

            # Initialize COS client
            cos_client = ibm_boto3.client(
                "s3",
                ibm_api_key_id=cos_config['api_key'],
                ibm_service_instance_id=cos_config['instance_id'],
                ibm_auth_endpoint=cos_config['auth_endpoint'],
                config=Config(signature_version="oauth"),
                endpoint_url=cos_config['endpoint_url']
            )

            # Normalize prefix
            if prefix:
                prefix = prefix.strip('/')
            if prefix:
                prefix = f"{prefix}/"

            # Track results for each URL
            results = []
            errors = []

            for source_url in source_urls:
                try:
                    # Setup download stream
                    session = requests.Session()
                    response = session.request(http_method, source_url, stream=True)
                    response.raise_for_status()

                    # Extract actual filename from response
                    filename = extract_filename_from_headers(response)

                    # Combine prefix with filename for the full COS key
                    target_key = f"{prefix}{filename}" if prefix else filename

                    # Upload file to COS
                    conf = ibm_boto3.s3.transfer.TransferConfig(
                        multipart_threshold=1024**2,  # 1MB
                        max_concurrency=100
                    )

                    cos_client.upload_fileobj(
                        response.raw,
                        cos_config['bucket_name'],
                        target_key,
                        Config=conf
                    )

                    results.append({
                        "source_url": source_url,
                        "bucket": cos_config['bucket_name'],
                        "key": target_key,
                        "filename": filename,
                        "status": "success"
                    })

                except Exception as e:
                    errors.append({
                        "source_url": source_url,
                        "error": str(e)
                    })

            # Prepare response in watsonx.ai format
            response_data = {
                "successful_uploads": results,
                "failed_uploads": errors,
                "total_processed": len(source_urls),
                "successful_count": len(results),
                "failed_count": len(errors)
            }

            return {
                'predictions': [{
                    'fields': ['status', 'data'],
                    'values': [['success' if results else 'error', response_data]]
                }]
            }

        except Exception as e:
            return {
                'predictions': [{
                    'fields': ['status', 'message'],
                    'values': [['error', f"Error processing request: {str(e)}"]]
                }]
            }

    return score

# For testing in notebook
score = stream_file_to_cos()

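# Local smoke test (a commented sketch, not part of the original commit; it assumes the
# placeholders in cos_stream_schema_examples.py have been replaced with real COS credentials):
# from cos_stream_schema_examples import sample_input
# print(score(sample_input))
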

# ------------------------------------------------------------------------------------------------------------
### Example Usage:
# import requests
# import json

# wx_api_key = ""
# wx_region = "us-south"  ### watsonx.ai region
# serving_name = ""       ### Serving name or id of your deployment


# ## Retrieve a bearer token
# token_response = requests.post('https://iam.cloud.ibm.com/identity/token',
#     data={
#         "apikey": wx_api_key,
#         "grant_type": 'urn:ibm:params:oauth:grant-type:apikey'
#     }
# )
# bearer_tk = token_response.json()["access_token"]


# # Example run of the function
# scoring_inputs = {
#     "input_data": [
#         {
#             "fields": [
#                 "cos_config",
#                 "source_urls",
#                 "prefix",
#                 "http_method"],
#             "values": [
#                 [
#                     {
#                         "api_key": "<insert_api_key>",
#                         "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
#                         "bucket_name": "<target_bucket_name>",
#                         "endpoint_url": "https://s3.eu-de.cloud-object-storage.appdomain.cloud",  ### preset for the Frankfurt region here
#                         "instance_id": "<resource_instance_id starts with crn:...>"
#                     },
#                     [
#                         "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8145e2c0-83f8-4367-87d7-6778a7bc2e5f/file_downloaded",  ### Example data links
#                         "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/136853fb-52b3-457f-94cf-c79821ed5145/file_downloaded",
#                         "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8be42620-b4c2-4535-b9ce-e9b62190202f/file_downloaded",
#                         "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/f88087d7-4d29-444a-b9ec-e203c41ec52b/file_downloaded"
#                     ],
#                     "cos_stream_test_run_batch",  ### Folder path (prefix) to save to
#                     "GET"
#                 ]
#             ]
#         }
#     ]
# }

# function_run = requests.post(
#     url=f'https://{wx_region}.ml.cloud.ibm.com/ml/v4/deployments/{serving_name}/predictions?version=2021-05-01',
#     json=scoring_inputs,
#     headers={'Authorization': 'Bearer ' + bearer_tk}
# )
# print(function_run.json())
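# The deployment's JSON reply mirrors the return value of score(); an illustrative shape
# (values below are hypothetical, not actual output):
# {
#     "predictions": [{
#         "fields": ["status", "data"],
#         "values": [["success", {"successful_uploads": [...], "failed_uploads": [],
#                                 "total_processed": 4, "successful_count": 4, "failed_count": 0}]]
#     }]
# }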