watsonx.ai_Function_Deployment_MNB / stream_files_to_cos.py
MilanM's picture
Update stream_files_to_cos.py
696e220 verified
def stream_file_to_cos():
    """Build the ``score`` function that streams HTTP(S) files into IBM Cloud Object Storage.

    One-time setup (imports, MIME-type registration, compiled regexes, helper
    functions) happens here; the inner ``score`` closure is returned.

    Returns:
        callable: ``score(payload)``, documented below.
    """
    # Dependencies are imported inside the outer function so it is self-contained.
    import mimetypes
    import re
    from urllib.parse import unquote, urlsplit

    import ibm_boto3
    import requests
    from ibm_boto3.s3.transfer import TransferConfig
    from ibm_botocore.client import Config

    # Default requests timeout in seconds as (connect, read). The read timeout is
    # the maximum wait between bytes from the server, not a cap on the whole
    # download, so large files are not cut off by it.
    default_http_timeout = (10, 300)

    # Register common data-file MIME types the platform's mimetypes database may
    # lack. Done once here instead of on every download (mimetypes.init() re-reads
    # the system MIME files each time it is called).
    mimetypes.init()
    for mime_type, ext in (
        ('application/x-jsonlines', '.jsonl'),
        ('application/parquet', '.parquet'),
        ('application/x-ipynb+json', '.ipynb'),
        ('application/yaml', '.yaml'),
        ('text/yaml', '.yaml'),
        ('application/toml', '.toml'),
    ):
        if not mimetypes.guess_extension(mime_type):
            mimetypes.add_type(mime_type, ext)

    # Content-Disposition filename parameters; parameter names are case-insensitive.
    # Extended RFC 5987 form: filename*=charset'language'percent-encoded-value
    ext_filename_re = re.compile(r"\bfilename\*\s*=\s*([^';]*)'[^';]*'([^;]*)", re.IGNORECASE)
    # Plain form: filename="quoted name" or filename=token
    plain_filename_re = re.compile(r'\bfilename\s*=\s*("[^"]*"|[^;]*)', re.IGNORECASE)

    def _error_response(message):
        """Wrap ``message`` in the single-row watsonx.ai ``predictions`` error shape."""
        return {
            'predictions': [{
                'fields': ['status', 'message'],
                'values': [['error', message]]
            }]
        }

    def _safe_basename(name):
        """Return the last path segment of ``name``, without surrounding whitespace/quotes.

        Path components are dropped so a server-supplied name cannot add extra
        "directories" to the COS key (RFC 6266, section 4.3). Returns '' when
        nothing usable remains (empty, '.' or '..').
        """
        base = re.split(r'[\\/]', name)[-1].strip().strip('"\'').strip()
        return '' if base in ('.', '..') else base

    def extract_filename_from_headers(response):
        """Derive the object name for a downloaded file.

        Resolution order:
        1. ``Content-Disposition``: the RFC 5987 ``filename*=`` form (decoded with
           its declared charset) is preferred over plain ``filename=``, as
           RFC 6266 recommends.
        2. The last segment of the final (post-redirect) URL path, percent-decoded;
           ``index`` when the path ends in ``/`` or is empty.
        3. If that URL-derived name contains no ``.``, an extension guessed from
           ``Content-Type`` via ``mimetypes`` is appended.

        Args:
            response: the ``requests.Response`` for the download.

        Returns:
            str: a non-empty file name containing no path separators.
        """
        content_disposition = response.headers.get('Content-Disposition')
        if content_disposition:
            filename = ''
            extended = ext_filename_re.search(content_disposition)
            if extended:
                charset, encoded = extended.groups()
                encoded = encoded.strip().strip('"')
                try:
                    filename = unquote(encoded, encoding=charset.strip() or 'utf-8', errors='replace')
                except LookupError:
                    # Unknown charset label: fall back to UTF-8 decoding.
                    filename = unquote(encoded, errors='replace')
                filename = _safe_basename(filename)
            if not filename:
                plain = plain_filename_re.search(content_disposition)
                if plain:
                    filename = _safe_basename(plain.group(1))
            if filename:
                return filename

        # Fall back to the last segment of the URL path (query/fragment excluded).
        url_name = _safe_basename(unquote(urlsplit(response.url).path))
        if not url_name:
            # A directory-style URL has no file name; use "index" so the key is never empty.
            url_name = 'index'

        # Use Content-Type to supply a missing extension.
        content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
        if content_type and '.' not in url_name:
            extension = mimetypes.guess_extension(content_type)
            if extension:
                return f"{url_name}{extension}"
        return url_name

    def score(payload): ### or def score(payload, token=None) if you want to add authentication
        """
        WatsonX.ai deployable function to stream files from HTTP to Cloud Object Storage

        Expected simplified format:
        [
            {
                "cos_config": {
                    "bucket_name": "my-bucket",
                    "api_key": "my-api-key",
                    "instance_id": "my-instance-id",
                    "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
                    "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
                },
                "source_urls": ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
                "prefix": "my/prefix",
                "http_method": "GET",
                "timeout": [10, 300]
            }
        ]

        ``source_urls`` may also be a single string. ``prefix``, ``http_method``
        (default "GET") and ``timeout`` (seconds, or [connect, read]; default
        [10, 300]) are optional. Only the first row of ``input_data[0]["values"]``
        is processed.

        Which you can run through this kind of helper function:
        ### --- --- ---
        def reformat_for_wxai_scoring(input_data):
            '''Converts input data to WatsonX.ai scoring payload format.'''
            # Convert single dict to list
            inputs = [input_data] if isinstance(input_data, dict) else input_data
            if not inputs:
                return {"input_data": [{"fields": [], "values": [[]]}]}
            # Extract fields from first object
            fields = list(inputs[0].keys())
            # Build values array
            values = [[obj.get(field, None) for field in fields] for obj in inputs]
            return {"input_data": [{"fields": fields, "values": values}]}
        ### --- --- ---

        Returns:
            dict: ``{'predictions': [{'fields': ['status', 'data'], 'values': [[status, data]]}]}``
            where ``data`` holds ``successful_uploads``, ``failed_uploads``,
            ``total_processed``, ``successful_count`` and ``failed_count``, and
            ``status`` is 'success' if at least one upload succeeded. Request-level
            problems return ``fields ['status', 'message']`` with status 'error'.
        """
        try:
            # Unpack the watsonx.ai ``input_data`` envelope (first row only).
            fields = payload["input_data"][0]["fields"]
            values = payload["input_data"][0]["values"][0]
            params = dict(zip(fields, values))

            # `or {}` also covers an explicit None (the helper fills absent fields with None).
            cos_config = params.get('cos_config') or {}
            required_configs = ['bucket_name', 'api_key', 'instance_id', 'auth_endpoint', 'endpoint_url']
            missing_configs = [k for k in required_configs if k not in cos_config or not cos_config[k]]
            if missing_configs:
                return _error_response(f"Missing required configuration: {', '.join(missing_configs)}")

            source_urls = params.get('source_urls', [])
            if not source_urls:
                return _error_response("Missing required parameter: source_urls")
            if isinstance(source_urls, str):
                source_urls = [source_urls]

            # Normalize prefix to "" or "a/b/" so it can be prepended directly.
            prefix = (params.get('prefix') or '').strip('/')
            if prefix:
                prefix = f"{prefix}/"

            http_method = params.get('http_method') or 'GET'
            timeout = params.get('timeout') or default_http_timeout
            if isinstance(timeout, list):
                # JSON cannot carry tuples; requests expects a (connect, read) tuple.
                timeout = tuple(timeout)

            cos_client = ibm_boto3.client(
                "s3",
                ibm_api_key_id=cos_config['api_key'],
                ibm_service_instance_id=cos_config['instance_id'],
                ibm_auth_endpoint=cos_config['auth_endpoint'],
                config=Config(signature_version="oauth"),
                endpoint_url=cos_config['endpoint_url']
            )
            bucket_name = cos_config['bucket_name']

            # Multipart above 1 MiB; built once instead of per URL.
            transfer_config = TransferConfig(
                multipart_threshold=1024**2,
                max_concurrency=100
            )

            results = []
            errors = []
            # One session for all URLs; closing it releases its pooled connections.
            with requests.Session() as session:
                for source_url in source_urls:
                    try:
                        # `with` closes the streamed response even if the upload fails.
                        with session.request(http_method, source_url, stream=True, timeout=timeout) as response:
                            response.raise_for_status()
                            # response.raw yields bytes exactly as sent on the wire; enable
                            # decoding so a gzip/deflate Content-Encoding is not stored compressed.
                            response.raw.decode_content = True
                            filename = extract_filename_from_headers(response)
                            target_key = f"{prefix}{filename}"
                            cos_client.upload_fileobj(
                                response.raw,
                                bucket_name,
                                target_key,
                                Config=transfer_config
                            )
                    except Exception as e:
                        # Best-effort per URL: record the failure and continue with the rest.
                        errors.append({
                            "source_url": source_url,
                            "error": str(e)
                        })
                        continue
                    results.append({
                        "source_url": source_url,
                        "bucket": bucket_name,
                        "key": target_key,
                        "filename": filename,
                        "status": "success"
                    })

            response_data = {
                "successful_uploads": results,
                "failed_uploads": errors,
                "total_processed": len(source_urls),
                "successful_count": len(results),
                "failed_count": len(errors)
            }
            return {
                'predictions': [{
                    'fields': ['status', 'data'],
                    'values': [['success' if results else 'error', response_data]]
                }]
            }
        except Exception as e:
            # Top-level boundary: report malformed payloads or client setup errors to the caller.
            return _error_response(f"Error processing request: {str(e)}")

    return score
# Instantiate the closure once so the inner ``score(payload)`` function is
# available as a module-level name.
score = stream_file_to_cos()