import gc
import os
import sys
import time
from pathlib import Path
from typing import List

import pandas as pd
from tqdm import tqdm

from tools import MechEventName

REDUCE_FACTOR = 0.25
SLEEP = 0.5
REQUEST_ID_FIELD = "request_id"
BLOCK_FIELD = "block"

SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"


def parse_args() -> str:
    """Parse the arguments and return the RPC."""
    if len(sys.argv) != 2:
        raise ValueError("Expected the RPC as a positional argument.")
    return sys.argv[1]


def read_abi(abi_path: str) -> str:
    """Read and return the wxDAI contract's ABI."""
    with open(abi_path) as abi_file:
        return abi_file.read()


def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
    """Dynamically reduce the batch size window."""
    keep_fraction = 1 - REDUCE_FACTOR
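    # Rebuild the event filter with a smaller block window, so that the next
    # request is more likely to stay within the RPC provider's limits.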
    events_filter = contract_instance.events[event].build_filter()
    events_filter.fromBlock = from_block
    batch_size = int(batch_size * keep_fraction)
    events_filter.toBlock = min(from_block + batch_size, latest_block)
    tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
    time.sleep(SLEEP)
    return events_filter, batch_size


def limit_text(text: str, limit: int = 200) -> str:
    """Limit the given text to `limit` characters, appending an ellipsis if truncated."""
    if len(text) > limit:
        return f"{text[:limit]}..."
    return text


def check_for_dicts(df: pd.DataFrame) -> List[str]:
    """Check for columns that contain dictionaries."""
    dict_columns = []
    for column in df.columns:
        if df[column].apply(lambda x: isinstance(x, dict)).any():
            dict_columns.append(column)
    return dict_columns


def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
    """Drop rows that contain dictionaries."""
    for column in dict_columns:
        df = df[~df[column].apply(lambda x: isinstance(x, dict))]
    return df


def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe."""
    dict_columns = check_for_dicts(df)
    df = drop_dict_rows(df, dict_columns)
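    # Deduplicate the remaining rows and cast the request ids to strings.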
    cleaned = df.drop_duplicates()
    cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
    return cleaned


def gen_event_filename(event_name: MechEventName) -> str:
    """Generate the filename of an event."""
    return f"{event_name.value.lower()}s.parquet"


def read_n_last_lines(filename: str, n: int = 1) -> str:
    """Return the content of the `n`-th to last line of a file."""
    num_newlines = 0
    with open(filename, "rb") as f:
        try:
            f.seek(-2, os.SEEK_END)
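            # Step backwards through the file, counting newlines until the
            # `n`-th one from the end has been passed.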
            while num_newlines < n:
                f.seek(-2, os.SEEK_CUR)
                if f.read(1) == b"\n":
                    num_newlines += 1
        except OSError:
            # The file is too short to seek back that far; read from the start.
            f.seek(0)
        last_line = f.readline().decode()
    return last_line


def get_earliest_block(event_name: MechEventName) -> int:
    """Get the earliest block number to use when filtering for events."""
    filename = gen_event_filename(event_name)
    if not os.path.exists(DATA_DIR / filename):
        return 0
    df = pd.read_parquet(DATA_DIR / filename)
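    # Resume from the latest block already stored for this event, so that only
    # newer events are fetched on the next run.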
    block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
    earliest_block = int(df[block_field].max())
    # Release the dataframe's memory before returning.
    del df
    gc.collect()
    return earliest_block