|
import sys |
|
import os |
|
import time |
|
from tqdm import tqdm |
|
from tools import MechEventName |
|
from typing import List |
|
import pandas as pd |
|
import gc |
|
from pathlib import Path |
|
|
|
REDUCE_FACTOR = 0.25 |
|
SLEEP = 0.5 |
|
REQUEST_ID_FIELD = "request_id" |
|
SCRIPTS_DIR = Path(__file__).parent |
|
ROOT_DIR = SCRIPTS_DIR.parent |
|
DATA_DIR = ROOT_DIR / "data" |
|
BLOCK_FIELD = "block" |
|
|
|
|
|
def parse_args() -> str:
    """Parse the command line arguments and return the RPC endpoint."""
    args = sys.argv
    if len(args) == 2:
        return args[1]
    raise ValueError("Expected the RPC as a positional argument.")
|
|
|
|
|
def read_abi(abi_path: str) -> str:
    """Read and return the wxDAI contract's ABI.

    :param abi_path: path to the ABI file (JSON text).
    :return: the raw file contents.
    """
    # ABI files are JSON, which is UTF-8 by spec; relying on the platform's
    # locale default encoding (the previous behavior) breaks on e.g. Windows.
    with open(abi_path, encoding="utf-8") as abi_file:
        return abi_file.read()
|
|
|
|
|
def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
    """Shrink the batch-size window after an RPC timeout and rebuild the event filter.

    :param contract_instance: the web3-style contract object exposing `events`.
    :param event: name of the event to filter for.
    :param from_block: first block of the window.
    :param batch_size: current window size, to be reduced by ``REDUCE_FACTOR``.
    :param latest_block: upper bound for the window's end block.
    :return: a tuple of (rebuilt event filter, reduced batch size).
    """
    # Keep the complementary fraction of the current window.
    shrunk_size = int(batch_size * (1 - REDUCE_FACTOR))
    events_filter = contract_instance.events[event].build_filter()
    events_filter.fromBlock = from_block
    # Never ask for blocks past the chain head.
    events_filter.toBlock = min(from_block + shrunk_size, latest_block)
    tqdm.write(f"RPC timed out! Resizing batch size to {shrunk_size}.")
    time.sleep(SLEEP)
    return events_filter, shrunk_size
|
|
|
|
|
def limit_text(text: str, limit: int = 200) -> str:
    """Return *text* unchanged if it fits in *limit* chars, else truncate with an ellipsis."""
    return text if len(text) <= limit else f"{text[:limit]}..."
|
|
|
|
|
def check_for_dicts(df: pd.DataFrame) -> List[str]:
    """Return the names of the columns that contain at least one dict value."""
    return [
        col
        for col in df.columns
        if df[col].map(lambda value: isinstance(value, dict)).any()
    ]
|
|
|
|
|
def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
    """Return *df* with every row removed where one of *dict_columns* holds a dict."""
    for col in dict_columns:
        is_dict = df[col].map(lambda value: isinstance(value, dict))
        df = df.loc[~is_dict]
    return df
|
|
|
|
|
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe.

    Removes rows containing dict values, drops duplicate rows, and
    normalizes the request id column to strings.

    :param df: the raw events dataframe; must contain a ``request_id`` column.
    :return: the cleaned dataframe.
    """
    dict_columns = check_for_dicts(df)
    df = drop_dict_rows(df, dict_columns)
    # `.copy()` detaches the result from the boolean-mask slices produced in
    # `drop_dict_rows`, so the column assignment below cannot raise pandas'
    # SettingWithCopyWarning or silently fail to take effect.
    cleaned = df.drop_duplicates().copy()
    cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
    return cleaned
|
|
|
|
|
def gen_event_filename(event_name: MechEventName) -> str:
    """Build the parquet filename under which the given event type is stored."""
    lowered = event_name.value.lower()
    return f"{lowered}s.parquet"
|
|
|
|
|
def read_n_last_lines(filename: str, n: int = 1) -> str:
    """Return the `n` last lines' content of a file.

    Scans backwards byte-by-byte from the end of the file until `n`
    newlines have been passed, then reads forward one line from that
    position. NOTE(review): despite the name, this returns a single
    line's content (the n-th line from the end), not `n` lines.

    :param filename: path of the file to read.
    :param n: how many newlines to skip back over from the end of the file.
    :return: the line located `n` newlines before the end, decoded with the
        default encoding.
    """
    num_newlines = 0
    with open(filename, "rb") as f:
        try:
            # Start 2 bytes before EOF so the file's trailing newline
            # (if present) is not counted as a line boundary.
            f.seek(-2, os.SEEK_END)
            while num_newlines < n:
                # Net movement is -1 byte per iteration: seek back 2,
                # then read(1) advances the position by 1.
                f.seek(-2, os.SEEK_CUR)
                if f.read(1) == b"\n":
                    num_newlines += 1
        except OSError:
            # Seeking before the start of the file (file shorter than the
            # requested number of lines): fall back to the very beginning.
            f.seek(0)
        last_line = f.readline().decode()
    return last_line
|
|
|
|
|
def get_earliest_block(event_name: MechEventName) -> int:
    """Get the earliest block number to use when filtering for events.

    This is the highest block already persisted for the given event type,
    i.e. the resume point for the next fetch; 0 when nothing is stored yet.

    :param event_name: the event whose stored data should be inspected.
    :return: the max stored block number for the event, or 0 if no file exists.
    """
    filename = gen_event_filename(event_name)
    if not os.path.exists(DATA_DIR / filename):
        return 0

    block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
    # Only load the block column — the full event frame can be large and
    # everything else is discarded immediately.
    df = pd.read_parquet(DATA_DIR / filename, columns=[block_field])
    earliest_block = int(df[block_field].max())

    # Proactively release the frame to keep peak memory down between events.
    del df
    gc.collect()
    return earliest_block
|
|