MilanM committed on
Commit
696e220
·
verified ·
1 Parent(s): 65ce127

Update stream_files_to_cos.py

Browse files
Files changed (1) hide show
  1. stream_files_to_cos.py +72 -143
stream_files_to_cos.py CHANGED
@@ -1,9 +1,4 @@
1
  def stream_file_to_cos():
2
- # # Install required dependencies
3
- # import subprocess
4
- # subprocess.check_output('pip install ibm-cos-sdk requests', shell=True)
5
- ### ^^^ Not necessary in this case since both are part of the default python 'runtime-24.1-py3.11' environment on watsonx.ai
6
-
7
  # Import dependencies
8
  import ibm_boto3
9
  import requests
@@ -17,7 +12,27 @@ def stream_file_to_cos():
17
  """
18
  Extract the actual filename from response headers.
19
  Checks Content-Disposition and falls back to other methods if needed.
 
20
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  # Try Content-Disposition header first
22
  content_disposition = response.headers.get('Content-Disposition')
23
  if content_disposition:
@@ -30,99 +45,75 @@ def stream_file_to_cos():
30
  filename = unquote(filename)
31
  return filename.strip('"\'')
32
 
 
 
 
33
  # Try Content-Type for file extension
34
  content_type = response.headers.get('Content-Type', '').split(';')[0]
35
- extension_map = {
36
- # Documents
37
- 'application/pdf': '.pdf',
38
- 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
39
- 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
40
- 'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
41
- 'text/csv': '.csv',
42
- 'application/xml': '.xml',
43
- 'text/xml': '.xml',
44
- 'application/yaml': '.yaml',
45
- 'text/yaml': '.yaml',
46
- 'application/toml': '.toml',
47
- 'text/plain': '.txt',
48
-
49
- # Archives
50
- 'application/x-rar-compressed': '.rar',
51
- 'application/x-7z-compressed': '.7z',
52
- 'application/zip': '.zip',
53
- 'application/x-tar': '.tar',
54
- 'application/gzip': '.gz',
55
- 'application/x-gzip': '.gz',
56
-
57
- # Executables
58
- 'application/x-msdownload': '.exe',
59
- 'application/x-apple-diskimage': '.dmg',
60
-
61
- # Data formats
62
- 'application/json': '.json',
63
- 'application/x-jsonlines': '.jsonl',
64
- 'application/parquet': '.parquet',
65
-
66
- # Images
67
- 'image/jpeg': '.jpg',
68
- 'image/png': '.png',
69
- 'image/tiff': '.tiff',
70
- 'image/gif': '.gif',
71
-
72
- # Code and notebooks
73
- 'application/x-ipynb+json': '.ipynb',
74
- 'text/x-python': '.py',
75
- 'application/x-python-code': '.py'
76
- }
77
-
78
- # If we have a valid content type with extension mapping
79
- if content_type in extension_map:
80
- # Try to find a filename in the URL path
81
- url_path = response.url.split('/')[-1]
82
- # Remove query parameters if any
83
- url_path = url_path.split('?')[0]
84
- # If the URL path has no extension, add the appropriate one
85
- if '.' not in url_path:
86
- return f"{url_path}{extension_map[content_type]}"
87
 
88
  # Fallback to URL filename
89
- return response.url.split('/')[-1].split('?')[0]
90
 
91
- def score(payload, token=None):
92
  """
93
  WatsonX.ai deployable function to stream files from HTTP to Cloud Object Storage
94
 
95
- Expected payload format:
 
96
  {
97
- "input_data": [{
98
- "fields": ["cos_config", "source_urls", "prefix", "http_method"],
99
- "values": [[{
100
- "bucket_name": "my-bucket",
101
- "api_key": "my-api-key",
102
- "instance_id": "my-instance-id",
103
- "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
104
- "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
105
- },
106
- ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
107
- "my/prefix",
108
- "GET"]]
109
- }]
110
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
111
  """
112
  try:
113
- # Extract input parameters from payload
114
- input_data = payload.get("input_data")[0]
115
- fields = input_data.get("fields")
116
- values = input_data.get("values")[0]
117
-
118
- # Map fields to values
119
  params = dict(zip(fields, values))
120
 
121
  # Extract COS configuration
122
  cos_config = params.get('cos_config', {})
123
 
124
  # Verify all required config values are present
125
- missing_configs = [k for k, v in cos_config.items() if not v]
 
126
  if missing_configs:
127
  return {
128
  'predictions': [{
@@ -234,67 +225,5 @@ def stream_file_to_cos():
234
 
235
  return score
236
 
237
- # For testing in notebook
238
- score = stream_file_to_cos()
239
-
240
-
241
- # ------------------------------------------------------------------------------------------------------------
242
- ### Example Usage:
243
- # try:
244
- # import requests
245
- # import json
246
-
247
- # wx_api_key = ""
248
- # wx_region = "us-south" ### watsonx.ai region
249
- # serving_name = "" ### Serving name or id of your deployment
250
-
251
-
252
- # ## Retrieve a bearer token
253
- # token_response = requests.post('https://iam.cloud.ibm.com/identity/token',
254
- # data={
255
- # "apikey": wx_api_key,
256
- # "grant_type": 'urn:ibm:params:oauth:grant-type:apikey'
257
- # }
258
- # )
259
- # bearer_tk = token_response.json()["access_token"]
260
-
261
-
262
- # # Example run of function
263
- # scoring_inputs = {
264
- # "input_data": [
265
- # {
266
- # "fields": [
267
- # "cos_config",
268
- # "source_urls",
269
- # "prefix",
270
- # "http_method"],
271
- # "values": [
272
- # [
273
- # {
274
- # "api_key": "<insert_api_key>",
275
- # "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
276
- # "bucket_name": "<target_bucket_name>",
277
- # "endpoint_url": "https://s3.eu-de.cloud-object-storage.appdomain.cloud", ### preset for Frankfurt Regional Here
278
- # "instance_id": "<resource_instance_id starts with crn:...>"
279
- # },
280
- # [
281
- # "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8145e2c0-83f8-4367-87d7-6778a7bc2e5f/file_downloaded", ### Example Data Links
282
- # "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/136853fb-52b3-457f-94cf-c79821ed5145/file_downloaded",
283
- # "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8be42620-b4c2-4535-b9ce-e9b62190202f/file_downloaded",
284
- # "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/f88087d7-4d29-444a-b9ec-e203c41ec52b/file_downloaded"
285
- # ],
286
- # "cos_stream_test_run_batch", ### "Folder path to save to"
287
- # "GET"
288
- # ]
289
- # ]
290
- # }
291
- # ]
292
- # }
293
 
294
- # function_run = requests.post(
295
- # url = f'https://{wx_region}.ml.cloud.ibm.com/ml/v4/deployments/{serving_name}/predictions?version=2021-05-01',
296
- # json = scoring_inputs,
297
- # headers = {'Authorization': 'Bearer ' + bearer_tk}
298
- # )
299
- # finally:
300
- # print(function_run.json())
 
1
  def stream_file_to_cos():
 
 
 
 
 
2
  # Import dependencies
3
  import ibm_boto3
4
  import requests
 
12
  """
13
  Extract the actual filename from response headers.
14
  Checks Content-Disposition and falls back to other methods if needed.
15
+ Uses mimetypes library for extension mapping.
16
  """
17
+ import mimetypes
18
+
19
+ # Ensure mimetypes database is initialized with common types
20
+ mimetypes.init()
21
+
22
+ # Add any missing but common MIME types that might not be in the default database
23
+ if not mimetypes.guess_extension('application/x-jsonlines'):
24
+ mimetypes.add_type('application/x-jsonlines', '.jsonl')
25
+ if not mimetypes.guess_extension('application/parquet'):
26
+ mimetypes.add_type('application/parquet', '.parquet')
27
+ if not mimetypes.guess_extension('application/x-ipynb+json'):
28
+ mimetypes.add_type('application/x-ipynb+json', '.ipynb')
29
+ if not mimetypes.guess_extension('application/yaml'):
30
+ mimetypes.add_type('application/yaml', '.yaml')
31
+ if not mimetypes.guess_extension('text/yaml'):
32
+ mimetypes.add_type('text/yaml', '.yaml')
33
+ if not mimetypes.guess_extension('application/toml'):
34
+ mimetypes.add_type('application/toml', '.toml')
35
+
36
  # Try Content-Disposition header first
37
  content_disposition = response.headers.get('Content-Disposition')
38
  if content_disposition:
 
45
  filename = unquote(filename)
46
  return filename.strip('"\'')
47
 
48
+ # Get the URL path as fallback filename
49
+ url_path = response.url.split('/')[-1].split('?')[0]
50
+
51
  # Try Content-Type for file extension
52
  content_type = response.headers.get('Content-Type', '').split(';')[0]
53
+ if content_type and '.' not in url_path:
54
+ # Get extension from mimetype
55
+ extension = mimetypes.guess_extension(content_type)
56
+ if extension:
57
+ return f"{url_path}{extension}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
 
59
  # Fallback to URL filename
60
+ return url_path
61
 
62
+ def score(payload): ### or def score(payload, token=None) if you want to add authentication
63
  """
64
  WatsonX.ai deployable function to stream files from HTTP to Cloud Object Storage
65
 
66
+ Expected simplified format:
67
+ [
68
  {
69
+ "cos_config": {
70
+ "bucket_name": "my-bucket",
71
+ "api_key": "my-api-key",
72
+ "instance_id": "my-instance-id",
73
+ "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
74
+ "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
75
+ },
76
+ "source_urls": ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
77
+ "prefix": "my/prefix",
78
+ "http_method": "GET"
 
 
 
79
  }
80
+ ]
81
+
82
+ Which you can run through this kind of helper function:
83
+ ### --- --- ---
84
+
85
+ def reformat_for_wxai_scoring(input_data):
86
+ '''Converts input data to WatsonX.ai scoring payload format.'''
87
+ # Convert single dict to list
88
+ inputs = [input_data] if isinstance(input_data, dict) else input_data
89
+
90
+ if not inputs:
91
+ return {"input_data": [{"fields": [], "values": [[]]}]}
92
+
93
+ # Extract fields from first object
94
+ fields = list(inputs[0].keys())
95
+
96
+ # Build values array
97
+ values = [[obj.get(field, None) for field in fields] for obj in inputs]
98
+
99
+ return {"input_data": [{"fields": fields, "values": values}]}
100
+
101
+ ### --- --- ---
102
  """
103
  try:
104
+ # Extract the actual payload from input_data format
105
+ fields = payload["input_data"][0]["fields"]
106
+ values = payload["input_data"][0]["values"][0]
107
+
108
+ # Create a dictionary from fields and values
 
109
  params = dict(zip(fields, values))
110
 
111
  # Extract COS configuration
112
  cos_config = params.get('cos_config', {})
113
 
114
  # Verify all required config values are present
115
+ required_configs = ['bucket_name', 'api_key', 'instance_id', 'auth_endpoint', 'endpoint_url']
116
+ missing_configs = [k for k in required_configs if k not in cos_config or not cos_config[k]]
117
  if missing_configs:
118
  return {
119
  'predictions': [{
 
225
 
226
  return score
227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
228
 
229
+ score = stream_file_to_cos()