# watsonx.ai_prompting_examples / stream_files_to_cos.py
def stream_file_to_cos():
# # Install required dependencies
# import subprocess
# subprocess.check_output('pip install ibm-cos-sdk requests', shell=True)
### ^^^ Not necessary in this case since both are part of the default python 'runtime-24.1-py3.11' environment on watsonx.ai
# Import dependencies at call time so the deployable function is fully
# self-contained when serialized to a watsonx.ai deployment.
import ibm_boto3
import requests
from ibm_botocore.client import Config
import json
import os
import re
from urllib.parse import unquote
def extract_filename_from_headers(response):
    """
    Derive a filename for a downloaded file from an HTTP response.

    Resolution order:
      1. ``Content-Disposition`` header — handles both plain ``filename=``
         and the RFC 5987/6266 ``filename*=<charset>''<pct-encoded>`` form.
      2. ``Content-Type`` header mapped to a known file extension, appended
         to the last URL path segment when that segment has no extension.
      3. The last URL path segment as-is (query string stripped).

    :param response: a ``requests.Response``-like object exposing
        ``headers`` (mapping) and ``url`` (str).
    :return: best-effort filename string.
    """
    # 1) Content-Disposition: the server's explicit filename wins.
    content_disposition = response.headers.get('Content-Disposition')
    if content_disposition:
        # Capture filename= / filename*= values; group 1 holds the optional
        # "<charset>''" prefix of the RFC 5987 encoded form.
        matches = re.findall(r'filename\*?=(?:([^\']*\'\')?([^;\n]*))', content_disposition)
        if matches:
            # Take the last match and handle encoded filenames
            encoding, filename = matches[-1]
            if encoding:
                # filename*= values are percent-encoded
                filename = unquote(filename)
            filename = filename.strip('"\'')
            # Bug fix: an empty filename (e.g. filename="") used to be
            # returned as '' — now fall through to the heuristics below.
            if filename:
                return filename
    # 2) Map the declared MIME type (parameters stripped) to an extension.
    content_type = response.headers.get('Content-Type', '').split(';')[0]
    extension_map = {
        # Documents
        'application/pdf': '.pdf',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
        'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
        'text/csv': '.csv',
        'application/xml': '.xml',
        'text/xml': '.xml',
        'application/yaml': '.yaml',
        'text/yaml': '.yaml',
        'application/toml': '.toml',
        'text/plain': '.txt',
        # Archives
        'application/x-rar-compressed': '.rar',
        'application/x-7z-compressed': '.7z',
        'application/zip': '.zip',
        'application/x-tar': '.tar',
        'application/gzip': '.gz',
        'application/x-gzip': '.gz',
        # Executables
        'application/x-msdownload': '.exe',
        'application/x-apple-diskimage': '.dmg',
        # Data formats
        'application/json': '.json',
        'application/x-jsonlines': '.jsonl',
        'application/parquet': '.parquet',
        # Images
        'image/jpeg': '.jpg',
        'image/png': '.png',
        'image/tiff': '.tiff',
        'image/gif': '.gif',
        # Code and notebooks
        'application/x-ipynb+json': '.ipynb',
        'text/x-python': '.py',
        'application/x-python-code': '.py'
    }
    # If we have a valid content type with an extension mapping
    if content_type in extension_map:
        # Last URL path segment, query parameters removed
        url_path = response.url.split('/')[-1]
        url_path = url_path.split('?')[0]
        # If the URL path has no extension, add the appropriate one
        if '.' not in url_path:
            return f"{url_path}{extension_map[content_type]}"
    # 3) Fallback: last URL path segment with the query string stripped.
    return response.url.split('/')[-1].split('?')[0]
def score(payload, token=None):
    """
    watsonx.ai deployable function to stream files from HTTP to Cloud Object Storage.

    Each source URL is fetched with ``stream=True`` and its raw body is piped
    into a multipart COS upload, so whole files are never buffered in memory.

    Expected payload format:
    {
        "input_data": [{
            "fields": ["cos_config", "source_urls", "prefix", "http_method"],
            "values": [[{
                "bucket_name": "my-bucket",
                "api_key": "my-api-key",
                "instance_id": "my-instance-id",
                "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
                "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
            },
            ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
            "my/prefix",
            "GET"]]
        }]
    }

    Returns a watsonx.ai prediction dict: fields ['status', 'message'] on
    validation/processing errors, or ['status', 'data'] with per-URL
    success/failure details after a run.
    """
    try:
        # Extract input parameters from payload
        input_data = payload.get("input_data")[0]
        fields = input_data.get("fields")
        values = input_data.get("values")[0]
        # Map positional values onto their field names
        params = dict(zip(fields, values))
        # Extract COS configuration
        cos_config = params.get('cos_config', {})
        # Verify all required config values are present.
        # Bug fix: the original only inspected keys already present in
        # cos_config, so a missing key (or an empty dict) slipped through
        # validation and surfaced later as an opaque KeyError.
        required_keys = ['bucket_name', 'api_key', 'instance_id',
                         'auth_endpoint', 'endpoint_url']
        missing_configs = [k for k in required_keys if not cos_config.get(k)]
        if missing_configs:
            return {
                'predictions': [{
                    'fields': ['status', 'message'],
                    'values': [['error', f"Missing required configuration: {', '.join(missing_configs)}"]]
                }]
            }
        # Get function parameters
        source_urls = params.get('source_urls', [])
        if not source_urls:
            return {
                'predictions': [{
                    'fields': ['status', 'message'],
                    'values': [['error', "Missing required parameter: source_urls"]]
                }]
            }
        # Convert single URL to list if necessary
        if isinstance(source_urls, str):
            source_urls = [source_urls]
        prefix = params.get('prefix', '')
        http_method = params.get('http_method', 'GET')
        # Initialize COS client
        cos_client = ibm_boto3.client(
            "s3",
            ibm_api_key_id=cos_config['api_key'],
            ibm_service_instance_id=cos_config['instance_id'],
            ibm_auth_endpoint=cos_config['auth_endpoint'],
            config=Config(signature_version="oauth"),
            endpoint_url=cos_config['endpoint_url']
        )
        # Normalize prefix: strip surrounding slashes, then ensure exactly
        # one trailing '/' so keys become "<prefix>/<filename>".
        if prefix:
            prefix = prefix.strip('/')
            if prefix:
                prefix = f"{prefix}/"
        # Track results for each URL
        results = []
        errors = []
        for source_url in source_urls:
            try:
                # Setup download stream (stream=True avoids buffering the body)
                session = requests.Session()
                response = session.request(http_method, source_url, stream=True)
                response.raise_for_status()
                # Extract actual filename from response
                filename = extract_filename_from_headers(response)
                # Combine prefix with filename for the full COS key.
                # Bug fix: this previously interpolated the literal string
                # "(unknown)" instead of the extracted filename, so every
                # prefixed upload collided on the same key.
                target_key = f"{prefix}{filename}" if prefix else filename
                # Multipart upload settings: low threshold, high concurrency
                conf = ibm_boto3.s3.transfer.TransferConfig(
                    multipart_threshold=1024**2,  # 1MB
                    max_concurrency=100
                )
                cos_client.upload_fileobj(
                    response.raw,
                    cos_config['bucket_name'],
                    target_key,
                    Config=conf
                )
                results.append({
                    "source_url": source_url,
                    "bucket": cos_config['bucket_name'],
                    "key": target_key,
                    "filename": filename,
                    "status": "success"
                })
            except Exception as e:
                # Record the per-URL failure and keep processing the rest
                errors.append({
                    "source_url": source_url,
                    "error": str(e)
                })
        # Prepare response in watsonx.ai format
        response_data = {
            "successful_uploads": results,
            "failed_uploads": errors,
            "total_processed": len(source_urls),
            "successful_count": len(results),
            "failed_count": len(errors)
        }
        return {
            'predictions': [{
                'fields': ['status', 'data'],
                'values': [['success' if results else 'error', response_data]]
            }]
        }
    except Exception as e:
        # Top-level guard: report any unexpected failure in-band so the
        # deployment always returns a well-formed prediction payload.
        return {
            'predictions': [{
                'fields': ['status', 'message'],
                'values': [['error', f"Error processing request: {str(e)}"]]
            }]
        }
# Expose the nested score() closure as the deployable function object
return score
# Build the deployable score() function locally (e.g. for notebook testing)
score = stream_file_to_cos()
# ------------------------------------------------------------------------------------------------------------
### Example Usage:
# try:
# import requests
# import json
# wx_api_key = ""
# wx_region = "us-south" ### watsonx.ai region
# serving_name = "" ### Serving name or id of your deployment
# ## Retrieve a bearer token
# token_response = requests.post('https://iam.cloud.ibm.com/identity/token',
# data={
# "apikey": wx_api_key,
# "grant_type": 'urn:ibm:params:oauth:grant-type:apikey'
# }
# )
# bearer_tk = token_response.json()["access_token"]
# # Example run of function
# scoring_inputs = {
# "input_data": [
# {
# "fields": [
# "cos_config",
# "source_urls",
# "prefix",
# "http_method"],
# "values": [
# [
# {
# "api_key": "<insert_api_key>",
# "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
# "bucket_name": "<target_bucket_name>",
# "endpoint_url": "https://s3.eu-de.cloud-object-storage.appdomain.cloud", ### preset for Frankfurt Regional Here
# "instance_id": "<resource_instance_id starts with crn:...>"
# },
# [
# "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8145e2c0-83f8-4367-87d7-6778a7bc2e5f/file_downloaded", ### Example Data Links
# "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/136853fb-52b3-457f-94cf-c79821ed5145/file_downloaded",
# "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/8be42620-b4c2-4535-b9ce-e9b62190202f/file_downloaded",
# "https://data.mendeley.com/public-files/datasets/27c8pwsd6v/files/f88087d7-4d29-444a-b9ec-e203c41ec52b/file_downloaded"
# ],
# "cos_stream_test_run_batch", ### "Folder path to save to"
# "GET"
# ]
# ]
# }
# ]
# }
# function_run = requests.post(
# url = f'https://{wx_region}.ml.cloud.ibm.com/ml/v4/deployments/{serving_name}/predictions?version=2021-05-01',
# json = scoring_inputs,
# headers = {'Authorization': 'Bearer ' + bearer_tk}
# )
# finally:
#     print(function_run.json())  # NOTE: function_run is unbound here if the POST above raised; prefer an `else:` block