import gc
import os
import sys
import time
from pathlib import Path
from typing import List

import pandas as pd
from tqdm import tqdm

from tools import MechEventName

REDUCE_FACTOR = 0.25  # fraction of the batch size to drop on each RPC timeout
SLEEP = 0.5  # seconds to wait before retrying after resizing the window
REQUEST_ID_FIELD = "request_id"
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"
BLOCK_FIELD = "block"


def parse_args() -> str:
    """Parse the arguments and return the RPC."""
    if len(sys.argv) != 2:
        raise ValueError("Expected the RPC as a positional argument.")
    return sys.argv[1]


def read_abi(abi_path: str) -> str:
    """Read and return the wxDAI contract's ABI."""
    with open(abi_path) as abi_file:
        return abi_file.read()
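

# Illustrative only (not called by this module): wiring the parsed RPC and ABI
# into a web3.py contract instance. The address below is an assumption, namely
# the canonical wxDAI token contract on Gnosis Chain.
def _example_wxdai_contract(rpc: str, abi_path: str):
    """A minimal sketch, assuming web3.py is available."""
    import json

    from web3 import Web3

    w3 = Web3(Web3.HTTPProvider(rpc))
    # assumed address; adjust it to whatever contract the ABI belongs to
    wxdai_address = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
    return w3.eth.contract(address=wxdai_address, abi=json.loads(read_abi(abi_path)))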


def reduce_window(
    contract_instance, event: str, from_block: int, batch_size: int, latest_block: int
):
    """Rebuild the events filter with a smaller block window after an RPC timeout."""
    keep_fraction = 1 - REDUCE_FACTOR
    events_filter = contract_instance.events[event].build_filter()
    events_filter.fromBlock = from_block
    batch_size = int(batch_size * keep_fraction)
    events_filter.toBlock = min(from_block + batch_size, latest_block)
    tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
    time.sleep(SLEEP)
    return events_filter, batch_size
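

# Illustrative only (not called by this module): the retry loop that
# `reduce_window` is designed for, assuming a web3.py contract instance.
# Which exception a timed-out RPC raises depends on the provider;
# `requests.exceptions.ReadTimeout` is an assumption here.
def _example_fetch_entries(
    w3, contract_instance, event: str, from_block: int, latest_block: int
) -> List:
    """A minimal sketch of batched event fetching with dynamic window resizing."""
    import requests

    batch_size = 5000  # assumed initial window size, in blocks
    entries: List = []
    while from_block <= latest_block:
        to_block = min(from_block + batch_size, latest_block)
        events_filter = contract_instance.events[event].build_filter()
        events_filter.fromBlock = from_block
        events_filter.toBlock = to_block
        try:
            entries.extend(events_filter.deploy(w3).get_all_entries())
        except requests.exceptions.ReadTimeout:
            # shrink the window and retry; the rebuilt filter returned by
            # `reduce_window` is superseded at the top of the next iteration
            _, batch_size = reduce_window(
                contract_instance, event, from_block, batch_size, latest_block
            )
            continue
        from_block = to_block + 1
    return entries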


def limit_text(text: str, limit: int = 200) -> str:
    """Limit the given text"""
    if len(text) > limit:
        return f"{text[:limit]}..."
    return text


def check_for_dicts(df: pd.DataFrame) -> List[str]:
    """Check for columns that contain dictionaries."""
    dict_columns = []
    for column in df.columns:
        if df[column].apply(lambda x: isinstance(x, dict)).any():
            dict_columns.append(column)
    return dict_columns


def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
    """Drop rows that contain dictionaries."""
    for column in dict_columns:
        df = df[~df[column].apply(lambda x: isinstance(x, dict))]
    return df


def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe."""
    dict_columns = check_for_dicts(df)
    df = drop_dict_rows(df, dict_columns)
    cleaned = df.drop_duplicates()
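    # store the request ids as strings; on-chain ids can exceed pandas' int64 range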
    cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
    return cleaned


def gen_event_filename(event_name: MechEventName) -> str:
    """Generate the filename of an event."""
    return f"{event_name.value.lower()}s.parquet"


def read_n_last_lines(filename: str, n: int = 1) -> str:
    """Return the `n` last lines' content of a file."""
    num_newlines = 0
    with open(filename, "rb") as f:
        try:
            # start just before the trailing newline and scan backwards,
            # one byte per step, counting the newlines encountered
            f.seek(-2, os.SEEK_END)
            while num_newlines < n:
                if f.read(1) == b"\n":
                    num_newlines += 1
                if num_newlines < n:
                    # step back past the byte just read, plus one more
                    f.seek(-2, os.SEEK_CUR)
        except OSError:
            # the file holds at most `n` lines; return it whole
            f.seek(0)
        last_lines = f.read().decode()
    return last_lines
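

# Illustrative usage (hypothetical filename): peek at a CSV's tail to decide
# where to resume, e.g. `read_n_last_lines("data/tools.csv", n=1)`.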


def get_earliest_block(event_name: MechEventName) -> int:
    """Get the earliest block number to use when filtering for events."""
    filename = gen_event_filename(event_name)
    if not os.path.exists(DATA_DIR / filename):
        return 0

    df = pd.read_parquet(DATA_DIR / filename)
    block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
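    # the highest block already stored becomes the starting point of the next fetch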
    earliest_block = int(df[block_field].max())
    # explicitly release the dataframe's memory
    del df
    gc.collect()
    return earliest_block
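

# Illustrative sketch of the end-to-end flow (not executed on import): resume
# an event's dataset from the last stored block, fetch the new entries, and
# persist the cleaned result. `entries_to_df` is a hypothetical helper that
# would flatten the raw RPC entries into a dataframe with this module's schema.
def _example_update(event_name: MechEventName, contract_instance, w3) -> None:
    """A minimal sketch combining this module's helpers."""
    from_block = get_earliest_block(event_name)
    latest_block = w3.eth.block_number
    entries = _example_fetch_entries(
        w3, contract_instance, event_name.value, from_block, latest_block
    )
    df = clean(entries_to_df(entries))  # hypothetical helper, not defined here
    df.to_parquet(DATA_DIR / gen_event_filename(event_name), index=False)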