Spaces:
Running
on
Zero
Running
on
Zero
File size: 5,467 Bytes
b0120da bc42ceb b0120da bc42ceb b0120da bc42ceb b0120da bc42ceb b0120da bc42ceb b0120da bc42ceb b0120da |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
import os
import fsspec
import pyarrow as pa
# pyarrow needs the initialization from this import
import pyarrow.dataset # pyright: ignore
import typer
from pyarrow.lib import ArrowInvalid
from rich.progress import track
def is_valid_arrow_file(path: str) -> bool:
    """Return True if *path* can be opened as an Arrow-format pyarrow dataset.

    Only ``pyarrow.lib.ArrowInvalid`` is treated as "not a valid Arrow file";
    any other exception raised while opening *path* propagates to the caller.
    """
    try:
        # Building the dataset is the validity probe; the returned object is
        # not needed, so it is not bound to a name.
        pa.dataset.dataset(path, format="arrow")
    except ArrowInvalid:
        return False
    return True
# Protocol prefix of S3 URLs. The commands below strip it before slicing the
# paths returned by fsspec's find(), which come back without it.
S3_PREFIX = "s3://"
# Typer application; each function decorated with ``@app.command()`` is
# registered on it as a CLI command.
app = typer.Typer()
def get_fs(path: str, s3_profile: str | None = None) -> fsspec.AbstractFileSystem:
    """Return an fsspec filesystem appropriate for *path*.

    Args:
        path: a path or URL; it selects S3 when it starts with ``S3_PREFIX``.
        s3_profile: when given (and *path* is on S3), forwarded to the S3
            filesystem as its ``profile`` option.

    Returns:
        An ``"s3"`` filesystem for S3 paths, otherwise a local ``"file"``
        filesystem.
    """
    # Reuse the module-level constant rather than repeating the literal.
    if not path.startswith(S3_PREFIX):
        return fsspec.filesystem("file")
    if s3_profile is None:
        return fsspec.filesystem("s3")
    return fsspec.filesystem("s3", profile=s3_profile)
@app.command()
def print_local_to_delete(
    blob_dir: str,
    local_dirs: list[str],
    s3_profile: str = "blt",
    output_path: str = "/tmp/files_to_delete.txt",
):
    """List local files whose relative path also exists under *blob_dir*.

    Every object under *blob_dir* is listed. Each object whose name does not
    end in ``.complete`` (case-insensitive) is asserted to be non-empty. Then
    every non-symlink file under each of *local_dirs* is collected when its
    path relative to that directory matches an object's path relative to
    *blob_dir*. The number of matches is printed, and the matching local paths
    are written one per line to *output_path*. Nothing is deleted here.

    Args:
        blob_dir: S3 directory ending with "/" (with or without "s3://").
        local_dirs: local directories, each ending with "/"; they may be
            relative to the current working directory.
        s3_profile: forwarded to the S3 filesystem as its ``profile`` option.
        output_path: file that receives the candidate paths (overwritten).
    """
    for s in local_dirs:
        assert s.endswith("/"), "Dirs must end with /"
    assert blob_dir.endswith("/"), "Dirs must end with /"

    blob_fs = fsspec.filesystem("s3", profile=s3_profile)
    # One listing with detail=True yields {path: info}. The sizes then come
    # from the standard fsspec "size" key instead of one info() call per
    # object reading the S3-listing-only "Size" key.
    blob_infos = blob_fs.find(blob_dir, detail=True)
    for path, info in track(blob_infos.items()):
        if not path.lower().endswith(".complete"):
            assert info["size"] != 0, f"Size was invalidly zero for {path}"

    # Paths from find() carry no "s3://". Strip it from blob_dir as well, so
    # the slice offset is right whether or not the caller wrote the prefix.
    blob_root = blob_dir.removeprefix(S3_PREFIX)
    blob_relative_paths = {path[len(blob_root):] for path in blob_infos}

    local_fs = fsspec.filesystem("file")
    files_to_delete = []
    for local_dir in local_dirs:
        # LocalFileSystem.find returns absolute paths, so relativize against
        # the absolute form of local_dir (the caller may pass a relative dir).
        local_root = os.path.abspath(os.path.expanduser(local_dir))
        for path in local_fs.find(local_dir):
            relative_path = os.path.relpath(path, local_root)
            if relative_path in blob_relative_paths and not os.path.islink(path):
                files_to_delete.append(path)

    print(len(files_to_delete))
    with open(output_path, "w") as out:
        out.writelines(f"{path}\n" for path in files_to_delete)
@app.command()
def compare_local_to_blob(
    source_dirs: list[str],
    dst_dir: str,
    s3_profile: str = "blt",
    print_sizes: bool = False,
):
    """Check that the union of *source_dirs* matches *dst_dir* on S3.

    Files are compared by path relative to their root directory and by size.

    Local files are gathered from every source dir, with symlinks ignored.
    ``.COMPLETE`` / ``.complete`` marker files must be empty and are kept.
    Other empty files, and ``.arrow`` files rejected by
    ``is_valid_arrow_file``, are skipped and reported. When one relative path
    occurs in several source dirs, the largest local size is used. Every
    relative path found on either side is then checked, and missing files and
    size mismatches are printed.

    Args:
        source_dirs: local directories, each ending with "/"; they may be
            relative to the current working directory.
        dst_dir: S3 directory starting with "s3://" and ending with "/".
        s3_profile: forwarded to the S3 filesystem as its ``profile`` option.
        print_sizes: also print a line for every matching file.

    Raises:
        ValueError: if any file is missing on one side or differs in size.
    """
    for s in source_dirs:
        assert s.endswith("/"), "Dirs must end with /"
    assert dst_dir.endswith("/"), "Dirs must end with /"
    assert len(source_dirs) != 0
    assert dst_dir.startswith(S3_PREFIX)
    local_fs = fsspec.filesystem("file")
    dst_fs = fsspec.filesystem("s3", profile=s3_profile)

    # Relative path -> local size; the max wins across duplicate source dirs.
    source_file_to_size: dict[str, int] = {}
    source_to_files: dict[str, list[str]] = {}
    for s in source_dirs:
        skipped = []
        kept = source_to_files.setdefault(s, [])
        # LocalFileSystem.find returns absolute paths, so relativize against
        # the absolute form of s (the caller may pass a relative directory).
        local_root = os.path.abspath(os.path.expanduser(s))
        for f in local_fs.find(s):
            if os.path.islink(f):
                continue
            size = os.path.getsize(f)
            if f.endswith((".COMPLETE", ".complete")):
                assert size == 0, ".COMPLETE files should be empty"
            elif size == 0 or (f.endswith(".arrow") and not is_valid_arrow_file(f)):
                skipped.append(f)
                continue
            relative_path = os.path.relpath(f, local_root)
            source_file_to_size[relative_path] = max(
                size, source_file_to_size.get(relative_path, 0)
            )
            kept.append(f)
        print(s, len(kept), "skipped", len(skipped), skipped[:10])
    all_local_files = set(source_file_to_size)

    # One listing with detail=True returns {path: info}, avoiding a size()
    # call per object. The returned paths carry no "s3://" prefix.
    dst_infos = dst_fs.find(dst_dir, detail=True)
    print(dst_dir, len(dst_infos))
    dst_root = dst_dir.removeprefix(S3_PREFIX)
    dst_file_to_size = {
        path[len(dst_root):]: info["size"] for path, info in dst_infos.items()
    }
    dst_file_set = set(dst_file_to_size)

    diff = all_local_files ^ dst_file_set
    print("Local files", len(all_local_files))
    print("DST Files", len(dst_file_set))
    print("Symmetric difference", len(diff))
    dst_only_files = dst_file_set - all_local_files
    # Sorted so the sample of ten is deterministic between runs.
    print("DST only", len(dst_only_files), sorted(dst_only_files)[:10])

    print("Check that files match")
    size_success = True
    # Every path comes from the union, so it is present on at least one side.
    for f in sorted(dst_file_set | all_local_files):
        if f not in source_file_to_size:
            size_success = False
            print(f"Missing file in source: {f}")
        elif f not in dst_file_to_size:
            size_success = False
            print(f"missing file in dst: {f}")
        elif source_file_to_size[f] != dst_file_to_size[f]:
            size_success = False
            print(
                f"Mismatch file size for {f}, Local: {source_file_to_size[f]} Blob: {dst_file_to_size[f]}"
            )
        elif print_sizes:
            print(f"Matching file size: {dst_file_to_size[f]} for {f}")

    if not size_success:
        raise ValueError("At least one file failed size comparison check")
    print("All files pass size check")
# Script entry point: hand control to the Typer app, which dispatches to one
# of the functions registered with ``@app.command()`` based on argv.
if __name__ == "__main__":
    app()
|