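"""Build the six DeepfakeTextDetect testbeds from the yaful/DeepfakeTextDetect
dataset on the Hugging Face Hub.

Usage (the file name is an assumption; use whatever this script is saved as):

    python prepare_testbeds.py DATA_DIR

DATA_DIR is the output directory; it is created if it does not exist.
"""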
import csv
import os
import random
import sys
from collections import defaultdict

from datasets import load_dataset
set_names = [
    "cmv",
    "yelp",
    "xsum",
    "tldr",
    "eli5",
    "wp",
    "roct",
    "hswag",
    "squad",
    "sci_gen",
]
oai_list = [
    # OpenAI models; "gpt-3.5-trubo" is kept verbatim (typo and all) so it
    # matches the src labels used in the dataset
    "gpt-3.5-trubo",
    "text-davinci-003",
    "text-davinci-002",
]
llama_list = ["_7B", "_13B", "_30B", "_65B"]
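# LLaMA variants are identified by size suffix only; every pattern in these
# lists is matched as a substring of each row's src field.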
glm_list = [
    "GLM130B",
]
flan_list = [
    # FLAN-T5
    "flan_t5_small",
    "flan_t5_base",
    "flan_t5_large",
    "flan_t5_xl",
    "flan_t5_xxl",
]
opt_list = [
    # OPT
    "opt_125m",
    "opt_350m",
    "opt_1.3b",
    "opt_2.7b",
    "opt_6.7b",
    "opt_13b",
    "opt_30b",
    "opt_iml_30b",
    "opt_iml_max_1.3b",
]
bigscience_list = [
    "bloom_7b",
    "t0_3b",
    "t0_11b",
]
eleuther_list = [
    "gpt_j",
    "gpt_neox",
]
model_sets = [
    oai_list,
    llama_list,
    glm_list,
    flan_list,
    opt_list,
    bigscience_list,
    eleuther_list,
]
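# Each sub-list is one model family; its first entry also names the output
# directory for the model-specific and unseen-model testbeds below.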
data_dir = sys.argv[1]
dataset = load_dataset("yaful/DeepfakeTextDetect")
os.makedirs(data_dir, exist_ok=True)

# Legacy path: load the three splits from local CSV exports instead of the hub.
"""
csv_path = f"{data_dir}/train.csv"
train_results = list(csv.reader(open(csv_path, encoding="utf-8-sig")))[1:]
csv_path = f"{data_dir}/valid.csv"
valid_results = list(csv.reader(open(csv_path, encoding="utf-8-sig")))[1:]
csv_path = f"{data_dir}/test.csv"
test_results = list(csv.reader(open(csv_path, encoding="utf-8-sig")))[1:]
"""
train_results = [
    (row["text"], str(row["label"]), row["src"]) for row in dataset["train"]
]
valid_results = [
    (row["text"], str(row["label"]), row["src"]) for row in dataset["validation"]
]
test_results = [
    (row["text"], str(row["label"]), row["src"]) for row in dataset["test"]
]
merge_dict = {
    "train": (train_results, 800),
    "valid": (valid_results, 100),
    "test": (test_results, 100),
}
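# Each value is (rows, cap): rows are (text, label, src) tuples, and cap is the
# per-domain limit on human-written samples used by the domain-specific &
# model-specific testbed below.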
test_ood_gpt = dataset["test_ood_gpt"]
test_ood_gpt_para = dataset["test_ood_gpt_para"]
test_ood_gpt.to_csv(os.path.join(data_dir, "test_ood_gpt.csv"))
test_ood_gpt_para.to_csv(os.path.join(data_dir, "test_ood_gpt_para.csv"))
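# The two GPT OOD test sets are exported unchanged; the unseen_* testbeds
# below are derived from the standard train/valid/test splits instead.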
# make domain-specific & model-specific (gpt_j)
def prepare_domain_specific_model_specific():
    tgt_model = "gpt_j"
    testbed_dir = f"{data_dir}/domain_specific_model_specific"
    sub_results = defaultdict(lambda: defaultdict(list))
    print("# preparing domain-specific & model-specific ...")
    for name in set_names:
        print(f"## preparing {name} ...")
        for split in ["train", "valid", "test"]:
            split_results, split_count = merge_dict[split]
            count = 0
            for res in split_results:
                info = res[2]
                res = res[:2]
                if name in info:
                    # human-written, capped at split_count per domain
                    # (the counter is incremented here so the cap takes effect)
                    if res[1] == "1" and count <= split_count:
                        sub_results[name][split].append(res)
                        count += 1
                    # machine-generated by the target model
                    if tgt_model in info:
                        assert res[1] == "0"
                        sub_results[name][split].append(res)
        sub_dir = f"{testbed_dir}/{name}"
        os.makedirs(sub_dir, exist_ok=True)
        for split in ["train", "valid", "test"]:
            print(f"{split} set: {len(sub_results[name][split])}")
            rows = [["text", "label"]] + sub_results[name][split]
            tmp_path = f"{sub_dir}/{split}.csv"
            with open(tmp_path, "w", newline="", encoding="utf-8-sig") as f:
                csvw = csv.writer(f)
                csvw.writerows(rows)
# make domain_specific_cross_models
def prepare_domain_specific_cross_models():
    testbed_dir = f"{data_dir}/domain_specific_cross_models"
    sub_results = defaultdict(lambda: defaultdict(list))
    print("# preparing domain_specific_cross_models ...")
    for name in set_names:
        print(f"## preparing {name} ...")
        for split in ["train", "valid", "test"]:
            split_results, _ = merge_dict[split]
            for res in split_results:
                info = res[2]
                res = res[:2]
                # keep every row in this domain, human-written and
                # machine-generated alike
                if name in info:
                    sub_results[name][split].append(res)
        sub_dir = f"{testbed_dir}/{name}"
        os.makedirs(sub_dir, exist_ok=True)
        for split in ["train", "valid", "test"]:
            print(f"{split} set: {len(sub_results[name][split])}")
            rows = [["text", "label"]] + sub_results[name][split]
            tmp_path = f"{sub_dir}/{split}.csv"
            with open(tmp_path, "w", newline="", encoding="utf-8-sig") as f:
                csvw = csv.writer(f)
                csvw.writerows(rows)
# make cross_domains_model_specific
def prepare_cross_domains_model_specific():
    print("# preparing cross_domains_model_specific ...")
    for model_patterns in model_sets:
        sub_dir = f"{data_dir}/cross_domains_model_specific/model_{model_patterns[0]}"
        os.makedirs(sub_dir, exist_ok=True)
        print(f"## preparing {' '.join(model_patterns)} ...")
        out_split_samples = defaultdict(list)
        for split in ["train", "valid", "test"]:
            rows = merge_dict[split][0]
            out_rows = []
            for row in rows:
                valid = False
                srcinfo = row[2]
                if row[1] == "1":  # keep all human-written samples
                    valid = True
                for pattern in model_patterns:
                    if pattern in srcinfo:
                        valid = True
                        break
                if valid:
                    out_rows.append(row)
            out_split_samples[split] = out_rows
        # downsample human-written rows to match the machine-generated count
        for split in ["train", "valid", "test"]:
            random.seed(1)
            rows = out_split_samples[split]
            pos_rows = [r for r in rows if r[1] == "1"]
            neg_rows = [r for r in rows if r[1] == "0"]
            len_neg = len(neg_rows)
            random.shuffle(pos_rows)
            out_split_samples[split] = pos_rows[:len_neg] + neg_rows
        for split in ["train", "valid", "test"]:
            out_rows = [e[:-1] for e in out_split_samples[split]]
            print(f"{split} set: {len(out_rows)} ...")
            tgt_path = f"{sub_dir}/{split}.csv"
            with open(tgt_path, "w", newline="", encoding="utf-8-sig") as f:
                csvw = csv.writer(f)
                csvw.writerows([["text", "label"]] + out_rows)
# make cross_domains_cross_models
def prepare_cross_domains_cross_models():
    print("# preparing cross_domains_cross_models ...")
    testbed_dir = f"{data_dir}/cross_domains_cross_models"
    os.makedirs(testbed_dir, exist_ok=True)
    for split in ["train", "valid", "test"]:
        csv_path = f"{testbed_dir}/{split}.csv"
        with open(csv_path, "w", newline="", encoding="utf-8-sig") as f:
            rows = [row[:-1] for row in merge_dict[split][0]]
            print(f"{split} set: {len(rows)} ...")
            csvw = csv.writer(f)
            csvw.writerows([["text", "label"]] + rows)
# make unseen_models
def prepare_unseen_models():
    print("# preparing unseen_models ...")
    for model_patterns in model_sets:
        sub_dir = f"{data_dir}/unseen_models/unseen_model_{model_patterns[0]}"
        os.makedirs(sub_dir, exist_ok=True)
        print(f"## preparing ood-models {' '.join(model_patterns)} ...")
        out_split_samples = defaultdict(list)
        for split in ["train", "valid", "test", "test_ood"]:
            data_name = split if split != "test_ood" else "test"
            rows = merge_dict[data_name][0]
            out_rows = []
            for row in rows:
                valid = False
                srcinfo = row[2]
                for pattern in model_patterns:
                    if split != "test_ood":
                        # in-distribution splits: drop rows matching the
                        # held-out family
                        if pattern in srcinfo:
                            valid = False
                            break
                        valid = True
                    else:
                        # OOD test: keep only rows matching the held-out family
                        if pattern in srcinfo:
                            valid = True
                            break
                if valid:
                    out_rows.append(row)
            out_split_samples[split] = out_rows
        # top up the OOD test set with an equal number of human-written samples
        random.seed(1)
        test_rows = out_split_samples["test"]
        test_pos_rows = [r for r in test_rows if r[1] == "1"]
        len_aug = len(out_split_samples["test_ood"])
        random.shuffle(test_pos_rows)
        out_split_samples["test_ood"] = (
            test_pos_rows[:len_aug] + out_split_samples["test_ood"]
        )
        for split in ["train", "valid", "test", "test_ood"]:
            out_rows = [e[:-1] for e in out_split_samples[split]]
            print(f"{split} set: {len(out_rows)}")
            tgt_path = f"{sub_dir}/{split}.csv"
            with open(tgt_path, "w", newline="", encoding="utf-8-sig") as f:
                csvw = csv.writer(f)
                csvw.writerows([["text", "label"]] + out_rows)
# make unseen_domains
def prepare_unseen_domains():
    print("# preparing unseen_domains ...")
    testbed_dir = f"{data_dir}/unseen_domains"
    for name in set_names:
        sub_dir = f"{testbed_dir}/unseen_domain_{name}"
        os.makedirs(sub_dir, exist_ok=True)
        print(f"## preparing ood-domains {name} ...")
        out_split_samples = defaultdict(list)
        for split in ["train", "valid", "test", "test_ood"]:
            data_name = split if split != "test_ood" else "test"
            rows = merge_dict[data_name][0]
            out_rows = []
            for row in rows:
                srcinfo = row[2]
                in_domain = name in srcinfo
                # hold the target domain out of train/valid/test;
                # keep only that domain for the OOD test set
                valid = in_domain if split == "test_ood" else not in_domain
                if valid:
                    out_rows.append(row)
            out_split_samples[split] = out_rows
        for split in ["train", "valid", "test", "test_ood"]:
            out_rows = [e[:-1] for e in out_split_samples[split]]
            print(f"{split} set: {len(out_rows)}")
            tgt_path = f"{sub_dir}/{split}.csv"
            with open(tgt_path, "w", newline="", encoding="utf-8-sig") as f:
                csvw = csv.writer(f)
                csvw.writerows([["text", "label"]] + out_rows)
# prepare the 6 testbeds
prepare_domain_specific_model_specific()
print("-" * 100)
prepare_domain_specific_cross_models()
print("-" * 100)
prepare_cross_domains_model_specific()
print("-" * 100)
prepare_cross_domains_cross_models()
print("-" * 100)
prepare_unseen_models()
print("-" * 100)
prepare_unseen_domains()
print("-" * 100)