|
def stream_file_to_cos():
    """Create the WatsonX.ai deployable ``score`` function.

    Imports, constants and helpers are defined inside this factory so that the
    returned ``score`` closure carries everything it needs with it.

    Returns:
        The ``score(payload, token=None)`` callable, which streams each
        requested HTTP(S) resource into an IBM Cloud Object Storage bucket.
    """
    import re
    from urllib.parse import unquote, urlsplit

    import ibm_boto3
    import requests
    from ibm_boto3.s3.transfer import TransferConfig
    from ibm_botocore.client import Config

    # Keys that must be present and non-empty in the ``cos_config`` input.
    REQUIRED_COS_KEYS = (
        'bucket_name',
        'api_key',
        'instance_id',
        'auth_endpoint',
        'endpoint_url',
    )

    # (connect, read) timeout in seconds used when the payload supplies none.
    # For requests the read timeout bounds the wait for the next bytes on the
    # socket, not the total download time, so large files are not cut off.
    DEFAULT_TIMEOUT = (10, 300)

    # Object name used when neither the headers nor the URL provide one.
    DEFAULT_FILENAME = 'download'

    # Built once and shared by every upload: multipart above 1 MiB.
    TRANSFER_CONFIG = TransferConfig(
        multipart_threshold=1024 ** 2,
        max_concurrency=100,
    )

    # Matches ``filename=`` and ``filename*=`` parameters of a
    # Content-Disposition header. Parameter names are case-insensitive and a
    # quoted value may itself contain ';'.
    FILENAME_PARAM_RE = re.compile(
        r'\bfilename(\*?)\s*=\s*("[^"]*"|[^;]*)', re.IGNORECASE
    )

    # Extension appended to URL-derived names that have none, keyed by the
    # lower-cased media type without parameters.
    EXTENSION_MAP = {
        # Documents and structured text
        'application/pdf': '.pdf',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
        'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
        'text/csv': '.csv',
        'application/xml': '.xml',
        'text/xml': '.xml',
        'application/yaml': '.yaml',
        'text/yaml': '.yaml',
        'application/toml': '.toml',
        'text/plain': '.txt',
        # Archives
        'application/x-rar-compressed': '.rar',
        'application/x-7z-compressed': '.7z',
        'application/zip': '.zip',
        'application/x-tar': '.tar',
        'application/gzip': '.gz',
        'application/x-gzip': '.gz',
        # Installers / disk images
        'application/x-msdownload': '.exe',
        'application/x-apple-diskimage': '.dmg',
        # Data files
        'application/json': '.json',
        'application/x-jsonlines': '.jsonl',
        'application/parquet': '.parquet',
        # Images
        'image/jpeg': '.jpg',
        'image/png': '.png',
        'image/tiff': '.tiff',
        'image/gif': '.gif',
        # Notebooks and source code
        'application/x-ipynb+json': '.ipynb',
        'text/x-python': '.py',
        'application/x-python-code': '.py',
    }

    def _safe_basename(name):
        """Return the final path segment of *name*, or '' if none is usable.

        Names taken from headers and URLs are untrusted; dropping any
        directory part (forward or back slash) and the special segments
        '.' / '..' keeps the object key directly under the requested prefix.
        """
        segment = re.split(r'[\\/]', (name or '').strip())[-1].strip()
        return '' if segment in ('.', '..') else segment

    def _filename_from_content_disposition(header):
        """Return the filename advertised by a Content-Disposition header, or ''.

        ``filename*`` (RFC 5987 ``charset'language'percent-encoded-value``) is
        preferred over a plain ``filename`` when both are present, as RFC 6266
        recommends. Among repeated parameters of the same kind the last wins.
        """
        if not header:
            return ''
        plain = extended = ''
        for star, raw_value in FILENAME_PARAM_RE.findall(header):
            value = raw_value.strip()
            if not star:
                plain = value.strip('"\'')
                continue
            encoded, charset = value.strip('"'), ''
            parts = encoded.split("'", 2)
            if len(parts) == 3:
                # The language tag (middle part) may be empty or e.g. "en".
                charset, _language, encoded = parts
            try:
                extended = unquote(
                    encoded, encoding=charset or 'utf-8', errors='replace'
                )
            except LookupError:
                # Unknown charset label: decode as UTF-8 instead of failing.
                extended = unquote(encoded, errors='replace')
        return _safe_basename(extended) or _safe_basename(plain)

    def _filename_from_url(url):
        """Return the percent-decoded last path segment of *url*, or ''.

        The query string and fragment are ignored, so a '/' inside them
        cannot influence the result.
        """
        return _safe_basename(unquote(urlsplit(url or '').path))

    def extract_filename_from_headers(response):
        """Choose the object name for a downloaded *response*.

        Order of preference:

        1. ``filename*`` / ``filename`` from the Content-Disposition header.
        2. The last segment of the response URL's path.
        3. ``DEFAULT_FILENAME`` when the URL path has no usable segment.

        For cases 2 and 3 an extension derived from Content-Type is appended
        when the name contains no '.' and the media type is in
        ``EXTENSION_MAP``.
        """
        filename = _filename_from_content_disposition(
            response.headers.get('Content-Disposition')
        )
        if filename:
            return filename

        # Media types are case-insensitive and may carry parameters
        # (e.g. "text/csv; charset=utf-8").
        content_type = (
            response.headers.get('Content-Type', '').split(';')[0].strip().lower()
        )
        extension = EXTENSION_MAP.get(content_type, '')
        filename = _filename_from_url(response.url) or DEFAULT_FILENAME
        if extension and '.' not in filename:
            filename += extension
        return filename

    def _error(message):
        """Wrap *message* in the prediction envelope used for error responses."""
        return {
            'predictions': [{
                'fields': ['status', 'message'],
                'values': [['error', message]],
            }]
        }

    def score(payload, token=None):
        """
        WatsonX.ai deployable function to stream files from HTTP to Cloud Object Storage

        Expected payload format:
        {
            "input_data": [{
                "fields": ["cos_config", "source_urls", "prefix", "http_method"],
                "values": [[{
                    "bucket_name": "my-bucket",
                    "api_key": "my-api-key",
                    "instance_id": "my-instance-id",
                    "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
                    "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
                },
                ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
                "my/prefix",
                "GET"]]
            }]
        }

        ``source_urls`` may also be a single URL string. ``prefix`` and
        ``http_method`` (default "GET") are optional. An optional ``timeout``
        field (seconds, or a [connect, read] pair) overrides
        ``DEFAULT_TIMEOUT``.

        Returns:
            A ``predictions`` envelope. On a processed request the fields are
            ``['status', 'data']``, with status 'success' if at least one
            upload succeeded and 'error' otherwise; ``data`` lists successful
            and failed uploads. Validation and unexpected errors return fields
            ``['status', 'message']``.
        """
        try:
            input_data = payload.get("input_data")[0]
            fields = input_data.get("fields")
            values = input_data.get("values")[0]
            params = dict(zip(fields, values))

            # Check every key the client needs, so an absent key is reported
            # clearly instead of surfacing later as a KeyError.
            cos_config = params.get('cos_config') or {}
            missing_configs = [
                key for key in REQUIRED_COS_KEYS if not cos_config.get(key)
            ]
            if missing_configs:
                return _error(
                    f"Missing required configuration: {', '.join(missing_configs)}"
                )

            source_urls = params.get('source_urls') or []
            if not source_urls:
                return _error("Missing required parameter: source_urls")
            if isinstance(source_urls, str):
                source_urls = [source_urls]

            # Normalise the prefix to "a/b/" (or "" for none).
            prefix = str(params.get('prefix') or '').strip('/')
            if prefix:
                prefix = f"{prefix}/"
            http_method = params.get('http_method') or 'GET'
            timeout = params.get('timeout') or DEFAULT_TIMEOUT
            if isinstance(timeout, list):
                # JSON has no tuple; requests expects (connect, read) as a tuple.
                timeout = tuple(timeout)

            bucket_name = cos_config['bucket_name']
            cos_client = ibm_boto3.client(
                "s3",
                ibm_api_key_id=cos_config['api_key'],
                ibm_service_instance_id=cos_config['instance_id'],
                ibm_auth_endpoint=cos_config['auth_endpoint'],
                config=Config(signature_version="oauth"),
                endpoint_url=cos_config['endpoint_url'],
            )

            results = []
            errors = []

            # One session for all URLs (connection reuse); closed on exit.
            with requests.Session() as session:
                for source_url in source_urls:
                    try:
                        with session.request(
                            http_method, source_url, stream=True, timeout=timeout
                        ) as response:
                            response.raise_for_status()
                            # requests does not undo Content-Encoding on .raw;
                            # without this a gzip-encoded response would be
                            # stored still compressed.
                            response.raw.decode_content = True

                            filename = extract_filename_from_headers(response)
                            # NOTE(review): URLs resolving to the same filename
                            # share a key, so a later upload replaces an
                            # earlier one.
                            target_key = f"{prefix}{filename}"

                            cos_client.upload_fileobj(
                                response.raw,
                                bucket_name,
                                target_key,
                                Config=TRANSFER_CONFIG,
                            )

                        results.append({
                            "source_url": source_url,
                            "bucket": bucket_name,
                            "key": target_key,
                            "filename": filename,
                            "status": "success",
                        })
                    except Exception as e:
                        # Best effort per URL: record the failure and continue.
                        errors.append({
                            "source_url": source_url,
                            "error": str(e),
                        })

            response_data = {
                "successful_uploads": results,
                "failed_uploads": errors,
                "total_processed": len(source_urls),
                "successful_count": len(results),
                "failed_count": len(errors),
            }

            return {
                'predictions': [{
                    'fields': ['status', 'data'],
                    'values': [['success' if results else 'error', response_data]],
                }]
            }

        except Exception as e:
            return _error(f"Error processing request: {str(e)}")

    return score
|
|
|
|
|
# Invoke the factory once so the ``score`` callable it builds is exposed at
# module level.
score = stream_file_to_cos()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|