def stream_file_to_cos():
    """Build a watsonx.ai deployable ``score`` function that streams files
    from HTTP(S) URLs into IBM Cloud Object Storage (COS).

    Everything defined in this outer function (imports, helpers, constants)
    is set up once when it is called; the returned ``score`` closure is what
    handles each scoring request.
    """
    # Imports live inside the outer function so they are bound in the
    # closure that ``score`` uses.
    import mimetypes
    import re
    from urllib.parse import unquote, urlsplit

    import ibm_boto3
    import requests
    from ibm_boto3.s3.transfer import TransferConfig
    from ibm_botocore.client import Config

    # Common MIME types that may be missing from the platform's mimetypes
    # database; registered once here instead of on every request.
    extra_mime_types = (
        ('application/x-jsonlines', '.jsonl'),
        ('application/parquet', '.parquet'),
        ('application/x-ipynb+json', '.ipynb'),
        ('application/yaml', '.yaml'),
        ('text/yaml', '.yaml'),
        ('application/toml', '.toml'),
    )
    mimetypes.init()
    for mime_type, extension in extra_mime_types:
        if not mimetypes.guess_extension(mime_type):
            mimetypes.add_type(mime_type, extension)

    # RFC 5987 extended form: filename*=charset'language'percent-encoded-value
    # (the language tag may be empty). Parameter names are case-insensitive.
    ext_filename_re = re.compile(
        r"\bfilename\*\s*=\s*([^';\s]*)'[^']*'([^;]+)", re.IGNORECASE
    )
    # Plain form: filename="quoted \"string\"" or filename=token.
    # `filename\s*=` cannot match `filename*=` because '*' is not '='.
    plain_filename_re = re.compile(
        r'\bfilename\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^;]+))', re.IGNORECASE
    )

    # Default (connect, read) timeouts in seconds for each download request;
    # without a timeout a stalled server would block the scoring call forever.
    default_timeout = (30, 300)

    def _safe_basename(name):
        """Return the last '/'- or '\\'-separated segment of *name*, or ''.

        Server-supplied names may contain directory components or '..';
        RFC 6266 advises recipients to ignore path information, and keeping
        it would put extra path segments into the COS object key.
        """
        name = re.split(r'[\\/]', name.strip().strip('"\''))[-1].strip()
        return '' if name in ('.', '..') else name

    def _filename_from_content_disposition(header):
        """Return the filename carried by a Content-Disposition value, or ''.

        The RFC 5987 ``filename*`` form is preferred over the plain
        ``filename`` form when both are present (RFC 6266, section 4.3).
        """
        match = ext_filename_re.search(header)
        if match:
            charset = match.group(1) or 'utf-8'
            value = match.group(2).strip()
            try:
                name = unquote(value, encoding=charset, errors='replace')
            except LookupError:
                # Unknown charset label: fall back to UTF-8 decoding.
                name = unquote(value, errors='replace')
            name = _safe_basename(name)
            if name:
                return name
        match = plain_filename_re.search(header)
        if match:
            if match.group(1) is not None:
                # quoted-string: undo backslash escapes such as \" and \\
                name = re.sub(r'\\(.)', r'\1', match.group(1))
            else:
                name = match.group(2)
            return _safe_basename(name)
        return ''

    def extract_filename_from_headers(response):
        """Work out the object name to use for a downloaded *response*.

        Order of preference:
        1. the filename from the Content-Disposition header;
        2. the last segment of ``response.url``'s path (percent-decoded, query
           and fragment removed); if it has no '.', an extension guessed from
           the Content-Type via ``mimetypes`` is appended;
        3. ``'download'`` (plus guessed extension) when neither yields a name.
        """
        content_disposition = response.headers.get('Content-Disposition')
        if content_disposition:
            filename = _filename_from_content_disposition(content_disposition)
            if filename:
                return filename

        url_name = _safe_basename(unquote(urlsplit(response.url).path))

        content_type = (
            response.headers.get('Content-Type', '').split(';')[0].strip().lower()
        )
        if content_type and '.' not in url_name:
            extension = mimetypes.guess_extension(content_type)
            if extension:
                return f"{url_name or 'download'}{extension}"

        return url_name or 'download'

    def _error_response(message):
        """Wrap *message* in the watsonx.ai predictions error envelope."""
        return {
            'predictions': [{
                'fields': ['status', 'message'],
                'values': [['error', message]],
            }]
        }

    def score(payload):  ### or def score(payload, token=None) if you want to add authentication
        """
        WatsonX.ai deployable function to stream files from HTTP to Cloud Object Storage

        Expected simplified format:
        [
            {
                "cos_config": {
                    "bucket_name": "my-bucket",
                    "api_key": "my-api-key",
                    "instance_id": "my-instance-id",
                    "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
                    "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
                },
                "source_urls": ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
                "prefix": "my/prefix",
                "http_method": "GET",
                "timeout": [30, 300]
            }
        ]

        "prefix", "http_method" (default "GET") and "timeout" (seconds, a
        number or a [connect, read] pair; default [30, 300]) are optional.
        Only the first row of ``input_data[0]["values"]`` is processed.

        Which you can run through this kind of helper function:

        ### --- --- ---
        def reformat_for_wxai_scoring(input_data):
            '''Converts input data to WatsonX.ai scoring payload format.'''
            # Convert single dict to list
            inputs = [input_data] if isinstance(input_data, dict) else input_data
            if not inputs:
                return {"input_data": [{"fields": [], "values": [[]]}]}
            # Extract fields from first object
            fields = list(inputs[0].keys())
            # Build values array
            values = [[obj.get(field, None) for field in fields] for obj in inputs]
            return {"input_data": [{"fields": fields, "values": values}]}
        ### --- --- ---

        Returns a watsonx.ai predictions dict. On success (at least one upload
        succeeded) the row is ['success', data]; data lists successful and
        failed uploads with counts. Failures of individual URLs are recorded
        in data["failed_uploads"] rather than aborting the whole request.
        """
        try:
            # Extract the actual payload from input_data format
            fields = payload["input_data"][0]["fields"]
            values = payload["input_data"][0]["values"][0]
            params = dict(zip(fields, values))

            # Extract COS configuration (a null value counts as missing)
            cos_config = params.get('cos_config') or {}
            required_configs = ['bucket_name', 'api_key', 'instance_id',
                                'auth_endpoint', 'endpoint_url']
            missing_configs = [k for k in required_configs if not cos_config.get(k)]
            if missing_configs:
                return _error_response(
                    f"Missing required configuration: {', '.join(missing_configs)}"
                )

            source_urls = params.get('source_urls') or []
            if not source_urls:
                return _error_response("Missing required parameter: source_urls")
            # Convert single URL to list if necessary
            if isinstance(source_urls, str):
                source_urls = [source_urls]

            # Normalize prefix to either '' or 'a/b/' (exactly one trailing slash)
            prefix = (params.get('prefix') or '').strip('/')
            if prefix:
                prefix = f"{prefix}/"
            http_method = params.get('http_method') or 'GET'
            timeout = params.get('timeout') or default_timeout
            if isinstance(timeout, list):
                # JSON has no tuples; requests only treats a tuple as (connect, read).
                timeout = tuple(timeout)

            # Initialize COS client
            cos_client = ibm_boto3.client(
                "s3",
                ibm_api_key_id=cos_config['api_key'],
                ibm_service_instance_id=cos_config['instance_id'],
                ibm_auth_endpoint=cos_config['auth_endpoint'],
                config=Config(signature_version="oauth"),
                endpoint_url=cos_config['endpoint_url'],
            )
            bucket = cos_config['bucket_name']
            transfer_config = TransferConfig(
                multipart_threshold=1024 ** 2,  # 1MB
                max_concurrency=100,
            )

            results = []
            errors = []
            # One session for all URLs (connection reuse); closed on exit.
            with requests.Session() as session:
                for source_url in source_urls:
                    try:
                        # `with` guarantees the streamed connection is released
                        # even when the upload fails part-way.
                        with session.request(http_method, source_url,
                                             stream=True, timeout=timeout) as response:
                            response.raise_for_status()

                            filename = extract_filename_from_headers(response)
                            target_key = f"{prefix}{filename}"

                            # Undo any Content-Encoding (gzip/deflate) so COS
                            # receives the file bytes, not the transfer encoding.
                            response.raw.decode_content = True
                            cos_client.upload_fileobj(
                                response.raw,
                                bucket,
                                target_key,
                                Config=transfer_config,
                            )

                        results.append({
                            "source_url": source_url,
                            "bucket": bucket,
                            "key": target_key,
                            "filename": filename,
                            "status": "success",
                        })
                    except Exception as e:
                        # Best-effort per URL: record the failure and continue.
                        errors.append({
                            "source_url": source_url,
                            "error": str(e),
                        })

            # Prepare response in watsonx.ai format
            response_data = {
                "successful_uploads": results,
                "failed_uploads": errors,
                "total_processed": len(source_urls),
                "successful_count": len(results),
                "failed_count": len(errors),
            }
            return {
                'predictions': [{
                    'fields': ['status', 'data'],
                    'values': [['success' if results else 'error', response_data]],
                }]
            }

        except Exception as e:
            return _error_response(f"Error processing request: {str(e)}")

    return score


score = stream_file_to_cos()