|
def stream_file_to_cos():
    """
    Factory for a WatsonX.ai-deployable `score` function that streams files
    from HTTP(S) URLs into IBM Cloud Object Storage.

    Returns:
        The nested `score(payload)` callable, suitable for deployment.
    """
    # Imports are function-scoped on purpose: WatsonX.ai serializes the
    # returned closure, and dependencies must resolve when the deployed
    # function is initialized, not when this module is authored.
    import ibm_boto3
    import requests
    from ibm_botocore.client import Config
    import json
    import os
    import re
    from urllib.parse import unquote
|
|
|
def extract_filename_from_headers(response):
    """
    Resolve a filename for a downloaded file.

    Preference order:
      1. `Content-Disposition` header (both plain filename= and the
         RFC 5987 filename*= form, whose percent-encoding is decoded).
      2. The last path segment of the request URL, with the query
         string removed.
      3. If that segment lacks an extension, one guessed from the
         `Content-Type` header via the mimetypes registry.
    """
    import mimetypes

    mimetypes.init()

    # Register mappings the default mimetypes table may lack.
    # Each add is guarded so user/site customizations are not clobbered.
    extra_types = {
        'application/x-jsonlines': '.jsonl',
        'application/parquet': '.parquet',
        'application/x-ipynb+json': '.ipynb',
        'application/yaml': '.yaml',
        'text/yaml': '.yaml',
        'application/toml': '.toml',
    }
    for mime, ext in extra_types.items():
        if not mimetypes.guess_extension(mime):
            mimetypes.add_type(mime, ext)

    # 1) Content-Disposition header, when present.
    disposition = response.headers.get('Content-Disposition')
    if disposition:
        found = re.findall(r'filename\*?=(?:([^\']*\'\')?([^;\n]*))', disposition)
        if found:
            # Prefer the last (most specific) occurrence, e.g. filename*=.
            charset, name = found[-1]
            if charset:
                # RFC 5987 form: "UTF-8''percent%20encoded" — decode it.
                name = unquote(name)
            return name.strip('"\'')

    # 2) Last URL path segment, query string stripped.
    tail = response.url.split('/')[-1].split('?')[0]

    # 3) No extension in the segment? Derive one from Content-Type
    #    (parameters such as "; charset=utf-8" are dropped first).
    media_type = response.headers.get('Content-Type', '').split(';')[0]
    if media_type and '.' not in tail:
        guessed = mimetypes.guess_extension(media_type)
        if guessed:
            return f"{tail}{guessed}"

    return tail
|
|
|
def score(payload):
    """
    WatsonX.ai deployable function to stream files from HTTP to Cloud Object Storage

    Expected simplified format:
    [
        {
            "cos_config": {
                "bucket_name": "my-bucket",
                "api_key": "my-api-key",
                "instance_id": "my-instance-id",
                "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
                "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
            },
            "source_urls": ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
            "prefix": "my/prefix",
            "http_method": "GET"
        }
    ]

    Which you can run through this kind of helper function:
    ### --- --- ---

    def reformat_for_wxai_scoring(input_data):
        '''Converts input data to WatsonX.ai scoring payload format.'''
        # Convert single dict to list
        inputs = [input_data] if isinstance(input_data, dict) else input_data

        if not inputs:
            return {"input_data": [{"fields": [], "values": [[]]}]}

        # Extract fields from first object
        fields = list(inputs[0].keys())

        # Build values array
        values = [[obj.get(field, None) for field in fields] for obj in inputs]

        return {"input_data": [{"fields": fields, "values": values}]}

    ### --- --- ---

    Returns:
        A WatsonX.ai predictions dict. Validation failures use fields
        ['status', 'message']; otherwise fields are ['status', 'data'] where
        data summarizes successful and failed uploads per URL.
    """
    try:
        # Unpack the single scoring row into a {field: value} mapping.
        fields = payload["input_data"][0]["fields"]
        values = payload["input_data"][0]["values"][0]
        params = dict(zip(fields, values))

        cos_config = params.get('cos_config', {})

        # Validate COS connection settings before doing any network work.
        required_configs = ['bucket_name', 'api_key', 'instance_id', 'auth_endpoint', 'endpoint_url']
        missing_configs = [k for k in required_configs if not cos_config.get(k)]
        if missing_configs:
            return {
                'predictions': [{
                    'fields': ['status', 'message'],
                    'values': [['error', f"Missing required configuration: {', '.join(missing_configs)}"]]
                }]
            }

        source_urls = params.get('source_urls', [])
        if not source_urls:
            return {
                'predictions': [{
                    'fields': ['status', 'message'],
                    'values': [['error', "Missing required parameter: source_urls"]]
                }]
            }

        # Accept a single URL string as a convenience.
        if isinstance(source_urls, str):
            source_urls = [source_urls]

        prefix = params.get('prefix', '')
        http_method = params.get('http_method', 'GET')

        cos_client = ibm_boto3.client(
            "s3",
            ibm_api_key_id=cos_config['api_key'],
            ibm_service_instance_id=cos_config['instance_id'],
            ibm_auth_endpoint=cos_config['auth_endpoint'],
            config=Config(signature_version="oauth"),
            endpoint_url=cos_config['endpoint_url']
        )

        # Normalize the prefix to "segment/segment/" form (or empty).
        if prefix:
            prefix = prefix.strip('/')
            if prefix:
                prefix = f"{prefix}/"

        results = []
        errors = []

        for source_url in source_urls:
            try:
                # Close the session (and its pooled connections) per URL;
                # stream=True keeps large bodies out of memory.
                with requests.Session() as session:
                    response = session.request(http_method, source_url, stream=True)
                    response.raise_for_status()

                    filename = extract_filename_from_headers(response)

                    # BUG FIX: the key previously embedded a literal
                    # "(unknown)" placeholder instead of the resolved
                    # filename, so every prefixed upload collided on the
                    # same object key.
                    target_key = f"{prefix}{filename}" if prefix else filename

                    # Multipart upload from 1 MiB upward, high parallelism.
                    conf = ibm_boto3.s3.transfer.TransferConfig(
                        multipart_threshold=1024**2,
                        max_concurrency=100
                    )

                    # response.raw is the undecoded byte stream; uploaded
                    # as-is without buffering the whole file.
                    cos_client.upload_fileobj(
                        response.raw,
                        cos_config['bucket_name'],
                        target_key,
                        Config=conf
                    )

                results.append({
                    "source_url": source_url,
                    "bucket": cos_config['bucket_name'],
                    "key": target_key,
                    "filename": filename,
                    "status": "success"
                })

            except Exception as e:
                # Best-effort per URL: record the failure and continue.
                errors.append({
                    "source_url": source_url,
                    "error": str(e)
                })

        response_data = {
            "successful_uploads": results,
            "failed_uploads": errors,
            "total_processed": len(source_urls),
            "successful_count": len(results),
            "failed_count": len(errors)
        }

        return {
            'predictions': [{
                'fields': ['status', 'data'],
                'values': [['success' if results else 'error', response_data]]
            }]
        }

    except Exception as e:
        # Top-level boundary: surface any unexpected error as a scoring result.
        return {
            'predictions': [{
                'fields': ['status', 'message'],
                'values': [['error', f"Error processing request: {str(e)}"]]
            }]
        }
|
|
|
return score |
|
|
|
|
|
score = stream_file_to_cos() |