def stream_file_to_cos():
    # # Install required dependencies
    # import subprocess
    # subprocess.check_output('pip install ibm-cos-sdk requests', shell=True)
    ### ^^^ Not necessary in this case since both are part of the default python 'runtime-24.1-py3.11' environment on watsonx.ai

    # Import dependencies
    import ibm_boto3
    import requests
    from ibm_botocore.client import Config
    import json
    import os
    import re
    from urllib.parse import unquote

    def extract_filename_from_headers(response):
        """
        Extract the actual filename from response headers.
        Checks Content-Disposition and falls back to other methods if needed.
        """
        # Try Content-Disposition header first
        content_disposition = response.headers.get('Content-Disposition')
        if content_disposition:
            # Look for filename= or filename*= parameters
            matches = re.findall(r'filename\*?=(?:([^\']*\'\')?([^;\n]*))', content_disposition)
            if matches:
                # Take the last match and handle encoded filenames
                encoding, filename = matches[-1]
                if encoding:
                    filename = unquote(filename)
                return filename.strip('"\'')

        # Try Content-Type for file extension
        content_type = response.headers.get('Content-Type', '').split(';')[0]
        extension_map = {
            # Documents
            'application/pdf': '.pdf',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
            'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
            'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
            'text/csv': '.csv',
            'application/xml': '.xml',
            'text/xml': '.xml',
            'application/yaml': '.yaml',
            'text/yaml': '.yaml',
            'application/toml': '.toml',
            'text/plain': '.txt',
            # Archives
            'application/x-rar-compressed': '.rar',
            'application/x-7z-compressed': '.7z',
            'application/zip': '.zip',
            'application/x-tar': '.tar',
            'application/gzip': '.gz',
            'application/x-gzip': '.gz',
            # Executables
            'application/x-msdownload': '.exe',
            'application/x-apple-diskimage': '.dmg',
            # Data formats
            'application/json': '.json',
            'application/x-jsonlines': '.jsonl',
            'application/parquet': '.parquet',
            # Images
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'image/tiff': '.tiff',
            'image/gif': '.gif',
            # Code and notebooks
            'application/x-ipynb+json': '.ipynb',
            'text/x-python': '.py',
            'application/x-python-code': '.py'
        }

        # If we have a valid content type with extension mapping
        if content_type in extension_map:
            # Try to find a filename in the URL path
            url_path = response.url.split('/')[-1]
            # Remove query parameters if any
            url_path = url_path.split('?')[0]
            # If the URL path has no extension, add the appropriate one
            if '.' not in url_path:
                return f"{url_path}{extension_map[content_type]}"

        # Fallback to URL filename
        return response.url.split('/')[-1].split('?')[0]

    def score(payload, token=None):
        """
        watsonx.ai deployable function to stream files from HTTP to Cloud Object Storage

        Expected payload format:
        {
            "input_data": [{
                "fields": ["cos_config", "source_urls", "prefix", "http_method"],
                "values": [[{
                    "bucket_name": "my-bucket",
                    "api_key": "my-api-key",
                    "instance_id": "my-instance-id",
                    "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
                    "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
                },
                ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
                "my/prefix",
                "GET"]]
            }]
        }
        """
        try:
            # Extract input parameters from payload
            input_data = payload.get("input_data")[0]
            fields = input_data.get("fields")
            values = input_data.get("values")[0]

            # Map fields to values
            params = dict(zip(fields, values))

            # Extract COS configuration
            cos_config = params.get('cos_config', {})

            # Verify all required config values are present
            required_configs = ['bucket_name', 'api_key', 'instance_id', 'auth_endpoint', 'endpoint_url']
            missing_configs = [k for k in required_configs if not cos_config.get(k)]
            if missing_configs:
                return {
                    'predictions': [{
                        'fields': ['status', 'message'],
                        'values': [['error', f"Missing required configuration: {', '.join(missing_configs)}"]]
                    }]
                }

            # Get function parameters
            source_urls = params.get('source_urls', [])
            if not source_urls:
                return {
                    'predictions': [{
                        'fields': ['status', 'message'],
                        'values': [['error', "Missing required parameter: source_urls"]]
                    }]
                }

            # Convert single URL to list if necessary
            if isinstance(source_urls, str):
                source_urls = [source_urls]

            prefix = params.get('prefix', '')
            http_method = params.get('http_method', 'GET')

            # Initialize COS client
            cos_client = ibm_boto3.client(
                "s3",
                ibm_api_key_id=cos_config['api_key'],
                ibm_service_instance_id=cos_config['instance_id'],
                ibm_auth_endpoint=cos_config['auth_endpoint'],
                config=Config(signature_version="oauth"),
                endpoint_url=cos_config['endpoint_url']
            )

            # Normalize prefix
            if prefix:
                prefix = prefix.strip('/')
                if prefix:
                    prefix = f"{prefix}/"

            # Track results for each URL
            results = []
            errors = []

            for source_url in source_urls:
                try:
                    # Setup download stream
                    session = requests.Session()
                    response = session.request(http_method, source_url, stream=True)
                    response.raise_for_status()

                    # Extract actual filename from response
                    filename = extract_filename_from_headers(response)

                    # Combine prefix with filename for the full COS key
                    target_key = f"{prefix}{filename}" if prefix else filename

                    # Upload file to COS
                    conf = ibm_boto3.s3.transfer.TransferConfig(
                        multipart_threshold=1024**2,  # 1MB
                        max_concurrency=100
                    )

                    cos_client.upload_fileobj(
                        response.raw,
                        cos_config['bucket_name'],
                        target_key,
                        Config=conf
                    )

                    results.append({
                        "source_url": source_url,
                        "bucket": cos_config['bucket_name'],
                        "key": target_key,
                        "filename": filename,
                        "status": "success"
                    })

                except Exception as e:
                    errors.append({
                        "source_url": source_url,
                        "error": str(e)
                    })

            # Prepare response in watsonx.ai format
            response_data = {
                "successful_uploads": results,
                "failed_uploads": errors,
                "total_processed": len(source_urls),
                "successful_count": len(results),
                "failed_count": len(errors)
            }

            return {
                'predictions': [{
                    'fields': ['status', 'data'],
                    'values': [['success' if results else 'error', response_data]]
                }]
            }

        except Exception as e:
            return {
                'predictions': [{
                    'fields': ['status', 'message'],
                    'values': [['error', f"Error processing request: {str(e)}"]]
                }]
            }

    return score


# For testing in notebook
score = stream_file_to_cos()
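# ------------------------------------------------------------------------------------------------------------
### Local smoke test (optional):
### A minimal sketch for exercising score() directly in the notebook before deploying.
### The bucket_name, api_key, and instance_id below are placeholders you must fill in, and the
### prefix "cos_stream_local_test" is just an illustrative value; the field names and payload
### shape match the format documented in score()'s docstring.
# test_payload = {
#     "input_data": [{
#         "fields": ["cos_config", "source_urls", "prefix", "http_method"],
#         "values": [[
#             {
#                 "bucket_name": "",
#                 "api_key": "",
#                 "instance_id": "",
#                 "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
#                 "endpoint_url": "https://s3.eu-de.cloud-object-storage.appdomain.cloud"
#             },
#             ["https://example.com/file1.pdf"],
#             "cos_stream_local_test",
#             "GET"
#         ]]
#     }]
# }
# print(json.dumps(score(test_payload), indent=2))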
# ------------------------------------------------------------------------------------------------------------
### Example Usage:
# try:
#     import requests
#     import json

#     wx_api_key = ""
#     wx_region = "us-south"      ### watsonx.ai region
#     serving_name = ""           ### Serving name or id of your deployment

#     ## Retrieve a bearer token
#     token_response = requests.post('https://iam.cloud.ibm.com/identity/token',
#                                    data={
#                                        "apikey": wx_api_key,
#                                        "grant_type": 'urn:ibm:params:oauth:grant-type:apikey'
#                                    }
#                                    )
#     bearer_tk = token_response.json()["access_token"]

#     # Example run of function
#     scoring_inputs = {
#         "input_data": [
#             {
#                 "fields": [
#                     "cos_config",
#                     "source_urls",
#                     "prefix",
#                     "http_method"],
#                 "values": [
#                     [
#                         {
#                             "api_key": "",
#                             "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
#                             "bucket_name": "",
#                             "endpoint_url": "https://s3.eu-de.cloud-object-storage.appdomain.cloud",   ### preset for the eu-de (Frankfurt) regional endpoint
#                             "instance_id": ""
#                         },
#                         [
#                             "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8145e2c0-83f8-4367-87d7-6778a7bc2e5f/file_downloaded",   ### Example data links
#                             "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/136853fb-52b3-457f-94cf-c79821ed5145/file_downloaded",
#                             "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8be42620-b4c2-4535-b9ce-e9b62190202f/file_downloaded",
#                             "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/f88087d7-4d29-444a-b9ec-e203c41ec52b/file_downloaded"
#                         ],
#                         "cos_stream_test_run_batch",   ### Folder path (prefix) to save to
#                         "GET"
#                     ]
#                 ]
#             }
#         ]
#     }

#     function_run = requests.post(
#         url=f'https://{wx_region}.ml.cloud.ibm.com/ml/v4/deployments/{serving_name}/predictions?version=2021-05-01',
#         json=scoring_inputs,
#         headers={'Authorization': 'Bearer ' + bearer_tk}
#     )
# finally:
#     print(function_run.json())
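# ------------------------------------------------------------------------------------------------------------
### Verifying the uploads (optional):
### A minimal sketch, assuming the same COS credentials and bucket used in the scoring payload above.
### It lists the objects under the chosen prefix with the standard S3 list_objects_v2 call so you can
### confirm the streamed files actually landed in the bucket; the empty strings are placeholders.
# import ibm_boto3
# from ibm_botocore.client import Config
# cos_client = ibm_boto3.client(
#     "s3",
#     ibm_api_key_id="",                 ### same api_key as in cos_config
#     ibm_service_instance_id="",        ### same instance_id as in cos_config
#     ibm_auth_endpoint="https://iam.cloud.ibm.com/identity/token",
#     config=Config(signature_version="oauth"),
#     endpoint_url="https://s3.eu-de.cloud-object-storage.appdomain.cloud"
# )
# listing = cos_client.list_objects_v2(Bucket="", Prefix="cos_stream_test_run_batch/")
# for obj in listing.get("Contents", []):
#     print(obj["Key"], obj["Size"])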