File size: 1,690 Bytes
94e514b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import csv
import os
from decimal import Decimal

import boto3
from boto3.dynamodb.conditions import Key

from tools.config import AWS_REGION, ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME, OUTPUT_FOLDER

# Table name and region come from project configuration (tools.config).
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME  # Choose as appropriate
REGION = AWS_REGION
# os.path.join avoids producing a broken path when OUTPUT_FOLDER
# lacks a trailing separator (plain '+' would silently fuse the names).
CSV_OUTPUT = os.path.join(OUTPUT_FOLDER, 'dynamodb_logs_export.csv')

# Create DynamoDB resource and bind a handle to the target table.
# NOTE(review): this opens an AWS session at import time — acceptable for a
# one-off export script.
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table(TABLE_NAME)

# Helper function to convert Decimal to float or int
def _convert_value(value):
    """Recursively convert DynamoDB Decimal values to int/float.

    DynamoDB returns every number as Decimal, including numbers nested
    inside lists, maps, and number sets; without recursion those leak
    into the CSV as Decimal('...') reprs.
    """
    if isinstance(value, Decimal):
        # Whole numbers become int, everything else float
        return int(value) if value % 1 == 0 else float(value)
    if isinstance(value, list):
        return [_convert_value(v) for v in value]
    if isinstance(value, dict):
        return {k: _convert_value(v) for k, v in value.items()}
    if isinstance(value, set):
        return {_convert_value(v) for v in value}
    return value


def convert_types(item):
    """Convert all Decimal values in *item* (in place) and return it.

    Mutates the given dict, replacing Decimals — including those nested
    in lists/maps/sets — with native int or float, and returns the same
    dict for convenient chaining.
    """
    for key, value in item.items():
        item[key] = _convert_value(value)
    return item

# Paginated scan
def scan_table():
    """Return every item in the table, following scan pagination.

    DynamoDB caps each scan page at 1 MB; when a page is truncated the
    response carries LastEvaluatedKey, which is fed back as
    ExclusiveStartKey until the table is exhausted.
    """
    all_items = []
    scan_kwargs = {}
    while True:
        page = table.scan(**scan_kwargs)
        all_items.extend(page['Items'])
        last_key = page.get('LastEvaluatedKey')
        if last_key is None:
            break
        scan_kwargs = {'ExclusiveStartKey': last_key}
    return all_items

# Export to CSV
def export_to_csv(items, output_path):
    """Write *items* (list of dicts) to a CSV file at *output_path*.

    DynamoDB tables are schemaless, so items may carry different
    attribute sets. The header is therefore the sorted union of keys
    across ALL items — taking only the first item's keys would make
    DictWriter raise ValueError on any item with an extra attribute.
    Missing attributes are written as empty cells via restval.

    Prints a summary line; returns None. No-ops (with a message) when
    *items* is empty.
    """
    if not items:
        print("No items found.")
        return

    # Union of every item's keys, sorted for a stable column order
    fieldnames = sorted({key for item in items for key in item})

    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, restval='')
        writer.writeheader()

        for item in items:
            # convert_types mutates the item in place; acceptable for
            # this one-shot export script
            writer.writerow(convert_types(item))

    print(f"Exported {len(items)} items to {output_path}")

# Run export
def main():
    """Scan the configured table and export all items to CSV_OUTPUT."""
    items = scan_table()
    export_to_csv(items, CSV_OUTPUT)


if __name__ == "__main__":
    # Guarded so importing this module does not trigger a full table
    # scan and file write as a side effect.
    main()