from bs4 import BeautifulSoup
import re
import requests as r
from html2text import html2text
import tqdm
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import multiprocessing
# def from_desktop_to_mobile_version(url):
# """Convert a desktop URL to its mobile version."""
# return url.replace("https://kin.naver.com", "https://m.kin.naver.com")

def initialize_webdriver():
    """Initialize and return a WebDriver instance with headless options."""
    options = webdriver.ChromeOptions()
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--headless=new")
    options.add_argument("--disable-gpu")
    service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(options=options, service=service)
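
# A quick smoke test for the driver setup (a sketch to run manually, not on
# import; the target URL is just an example):
# driver = initialize_webdriver()
# driver.get("https://kin.naver.com")
# print(driver.title)
# driver.quit()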

def process_url(url):
    driver = initialize_webdriver()
    try:
        print("Processing URL:", url)
        driver.get(url)
        # The promotion popup does not always appear, so closing it is best-effort
        # and must not fail the whole URL.
        try:
            closeBtn = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, ".layer_promotion_choice_inner > .ico_close_layer")),
                message="Close button not found."
            )
            print("Closing the popup")
            closeBtn.click()
            time.sleep(0.2)
            print("CLOSED")
        except Exception:
            pass
        expandBtn = driver.find_element(By.ID, 'nextPageButton')
        print("Expand button: ", expandBtn)
        if expandBtn.is_displayed():
            WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable(expandBtn),
                message="Expand button wasn't loaded in time."
            )
            expandBtn.click()
            print("Clicked the expand button")
            time.sleep(0.5)
        html_content = driver.page_source
        soup = BeautifulSoup(html_content, "html.parser")
        answers = soup.find_all('div', {'class': 'answerDetail'})
        answers = [html2text(answer.prettify()) for answer in answers]
        title = soup.find('div', {'class': 'endTitleSection'}).text.strip()
        questionDetails = soup.find('div', {'class': 'questionDetail'}).text.strip()
        title = title.replace("질문", '').strip()  # Drop the "질문" ("question") label from the title.
        print("Answers extracted from: \n", url)
        print(len(answers))
        print('-' * 60)
        return {
            "title": title,
            "questionDetails": questionDetails,
            "url": url,
            "answers": answers
        }
    except Exception as e:
        print(f"Error processing URL {url} \n\n\n{e}")
        # Append so earlier failed URLs are not overwritten.
        with open('error_urls.txt', 'a') as f:
            f.write(url + '\n')
        return {"title": '', "questionDetails": '', "url": url, "answers": []}
    finally:
        driver.quit()
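
# Shape of the dict process_url returns (the success and error paths agree on keys):
# {"title": str, "questionDetails": str, "url": str, "answers": list[str]}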

def get_answers(results_a_elements, query):
    """Fetch answers for all the extracted result links."""
    if not results_a_elements:
        print("No results found.")
        return []
    print("Result links extracted: ", len(results_a_elements))
    # Limit the number of parallel processes for better resource management
    # max_processes = max(1, int(multiprocessing.cpu_count() * 0.5))
    # with multiprocessing.Pool(processes=max_processes) as pool:
    #     results = pool.map(process_url, results_a_elements)
    results = []
    answer_count = 0  # Used in the loop below; leaving it commented out raised a NameError.
    for url in tqdm.tqdm(results_a_elements):
        res = process_url(url)
        results.append(res)
        answer_count += len(res['answers'])
    return results
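
# A sketch of re-enabling the pooled variant commented out above while keeping
# the tqdm progress bar (assumes the machine can afford one Chrome per worker):
# max_processes = max(1, int(multiprocessing.cpu_count() * 0.5))
# with multiprocessing.Pool(processes=max_processes) as pool:
#     results = list(tqdm.tqdm(pool.imap(process_url, results_a_elements),
#                              total=len(results_a_elements)))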

def get_search_results(query, num_pages):
    """Fetch search results for the given query from Naver 지식in (Naver's Q&A service)."""
    results = []
    for page in range(1, num_pages + 1):
        url = f"https://kin.naver.com/search/list.naver?query={query}&page={page}"
        print("Starting the scraping process for:\n", url)
        try:
            response = r.get(url)
            soup = BeautifulSoup(response.text, "html.parser")
            results_a_elements = soup.find("ul", {"class": "basic1"}).find_all("a", {"class": "_searchListTitleAnchor"})
            results_a_elements = [a.get('href') for a in results_a_elements if a.get("href")]
            results += results_a_elements
        except Exception as e:
            print(f"Error while fetching search results: {e}")
    return results
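
# Note: the f-string above interpolates the raw query into the URL; letting
# requests build and percent-encode the query string instead would look like
# this sketch:
# response = r.get("https://kin.naver.com/search/list.naver",
#                  params={"query": query, "page": page})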

def extract_data(query, num_pages=150) -> list[dict[str, object]]:
    """Scrape question/answer data for a query and return one dict per result link."""
    results_a_elements = get_search_results(query, num_pages)
    print(results_a_elements)
    answers = get_answers(results_a_elements, query)
    print("Total answers collected:", len(answers))
    return answers

# if __name__ == "__main__":
#     process_url("https://kin.naver.com/qna/detail.naver?d1id=4&dirId=401030203&docId=478845808&qb=67O07ZeYIOyImOyIoOu5hA==&enc=utf8&section=kin.qna_ency&rank=1&search_sort=0&spq=0")
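
# A minimal usage sketch (assumptions: run as a script; "보험" ("insurance") is an
# arbitrary example query and results.json is a hypothetical output path):
if __name__ == "__main__":
    import json

    data = extract_data("보험", num_pages=2)
    with open("results.json", "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)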