Spaces:
Running
Running
File size: 1,918 Bytes
81e13bb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import requests
import concurrent.futures
import time
# Define the API endpoint
#http://43.204.234.114:8000/api/aadhar_ocr
# API_URL = "http://127.0.0.1:8000/api/aadhar_ocr"
API_URL = "http://localhost:8000/api/aadhar_ocr"
# Define the file paths
FILE_PATHS = {
"aadhar_file": "uploads/aadhar/test_one.jpg",
# "pan_file": "test_images_pan/6ea33087.jpeg",
# "cheque_file": "test_images_cheque/0f81678a.jpeg",
# "gst_file": "test_images_gst/0a52fbcb_page3_image_0.jpg",
}
# Function to send a single POST request
def send_request():
try:
start_time = time.time()
# Open files dynamically for each request
files = {key: open(path, "rb") for key, path in FILE_PATHS.items()}
response = requests.post(API_URL, files=files)
print("this is response\n\n",response)
end_time = time.time()
print(f"\nTime taken for one request: {end_time - start_time:.2f} seconds")
# Close the files after the request
for file in files.values():
file.close()
return response.status_code, response.text
except requests.exceptions.RequestException as e:
return "Error", str(e)
# Main function to send multiple concurrent requests
def test_api_concurrency(num_requests):
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
# Launch multiple requests concurrently
results = list(executor.map(lambda _: send_request(), range(num_requests)))
end_time = time.time()
# Print results
for idx, (status, text) in enumerate(results):
print(f"Request {idx + 1}: Status Code: {status}, Response: {text}")
print(f"\nTotal time taken: {end_time - start_time:.2f} seconds")
# Number of concurrent requests
NUM_REQUESTS = 8 # Adjust this number based on your testing needs
if __name__ == "__main__":
test_api_concurrency(NUM_REQUESTS)
|