532 lines
19 KiB
Python
532 lines
19 KiB
Python
import requests
|
||
from bs4 import BeautifulSoup
|
||
import time
|
||
import json
|
||
import csv
|
||
import os
|
||
from urllib.parse import urljoin, urlparse
|
||
import logging
|
||
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
|
||
import asyncio
|
||
|
||
class WebScraper:
    """Scrape rendered pages with Playwright and extract data with BeautifulSoup.

    The scraper lazily owns one Playwright driver, one Chromium browser and one
    browser context.  Pages are fetched either anonymously
    (``get_page_with_playwright`` / ``scrape_page``) or inside a session the
    user logged into by hand (``manual_login_and_scrape``).  Helper methods
    persist results as JSON, CSV or raw HTML.
    """

    # Extra request headers applied to every page opened by
    # get_page_with_playwright.
    _EXTRA_HEADERS = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }

    # JavaScript global names looked for inside <script> tags.
    _SCRIPT_PATTERNS = (
        'window.__pageStartTime',
        'window.serverInjectRes',
        'window.ENV',
        'window.locales',
        'window.deviceInfoBySSO',
    )

    # Maps every character treated as unsafe in a file name to an underscore.
    _FILENAME_TRANSLATION = str.maketrans(dict.fromkeys('<>:"|?*/\\&=#', '_'))

    def __init__(self, delay=1, timeout=30000, headless=True):
        """
        Initialise the scraper without starting a browser.

        :param delay: pause between requests, in seconds
        :param timeout: page navigation timeout, in milliseconds
        :param headless: whether the browser runs in headless mode
        """
        self.delay = delay
        self.timeout = timeout
        self.headless = headless
        self.playwright = None
        self.browser = None
        self.context = None

        # Configure logging (basicConfig only has an effect the first time the
        # root logger is configured).
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    def start_browser(self):
        """
        Start Playwright, launch Chromium and create the browser context.

        :raises Exception: re-raises whatever start-up error occurred, after
            logging it and releasing any partially created resources
        """
        try:
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=self.headless,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                ],
            )
            self.context = self.browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                viewport={'width': 1920, 'height': 1080},
            )
            self.logger.info("浏览器启动成功")
        except Exception as e:
            self.logger.error(f"启动浏览器失败: {e}")
            # Do not leave a half-started driver/browser running.
            self._release_resources()
            raise

    def stop_browser(self):
        """Close the context, the browser and the Playwright driver.

        Cleanup is best-effort: a failure while closing one resource is logged
        and the remaining resources are still released.  All three attributes
        are reset to ``None`` so a later fetch starts a fresh browser.
        """
        self._release_resources()
        self.logger.info("浏览器已关闭")

    def _release_resources(self):
        """Release context, browser and driver in that order, resetting each to None."""
        for attr, closer in (('context', 'close'), ('browser', 'close'), ('playwright', 'stop')):
            resource = getattr(self, attr, None)
            setattr(self, attr, None)
            if resource is None:
                continue
            try:
                getattr(resource, closer)()
            except Exception as e:
                self.logger.warning(f"关闭 {attr} 时出错: {e}")

    def _close_page(self, page):
        """Close *page*, logging instead of raising if the close fails."""
        try:
            page.close()
        except Exception as e:
            self.logger.warning(f"关闭页面失败: {e}")

    def _wait_until_settled(self, page, url, idle_timeout, settle_ms):
        """Wait for network idle plus a fixed settle time; a timeout is only logged.

        :param page: Playwright page object
        :param url: URL being loaded (used in the log message only)
        :param idle_timeout: network-idle timeout, in milliseconds
        :param settle_ms: additional fixed wait for JavaScript, in milliseconds
        """
        try:
            page.wait_for_load_state('networkidle', timeout=idle_timeout)
            page.wait_for_timeout(settle_ms)
        except PlaywrightTimeoutError:
            self.logger.warning(f"页面加载超时,但继续处理: {url}")

    def get_page_with_playwright(self, url, max_retries=3, wait_for_load=True):
        """
        Fetch the rendered HTML of a page with Playwright.

        Each attempt opens a new page in the shared context and closes it
        exactly once.  Failed attempts are retried with exponential back-off
        (1s, 2s, 4s, ...).  Only an HTTP 200 response counts as success.

        :param url: target URL
        :param max_retries: maximum number of attempts
        :param wait_for_load: whether to wait for network idle before reading
        :return: HTML content, or None when every attempt failed
        """
        if not self.context:
            self.start_browser()

        for attempt in range(max_retries):
            page = None
            html_content = None
            try:
                page = self.context.new_page()
                page.set_extra_http_headers(dict(self._EXTRA_HEADERS))

                response = page.goto(url, timeout=self.timeout, wait_until='domcontentloaded')

                if response and response.status == 200:
                    if wait_for_load:
                        self._wait_until_settled(page, url, idle_timeout=10000, settle_ms=2000)
                    html_content = page.content()
                else:
                    self.logger.warning(f"页面响应状态异常: {response.status if response else 'None'}")
            except PlaywrightTimeoutError:
                self.logger.warning(f"页面加载超时 (尝试 {attempt + 1}/{max_retries}): {url}")
            except Exception as e:
                self.logger.warning(f"获取页面失败 (尝试 {attempt + 1}/{max_retries}): {e}")
            finally:
                # Single place where the page is closed, on success and failure.
                if page:
                    self._close_page(page)

            if html_content is not None:
                self.logger.info(f"成功获取页面: {url}")
                time.sleep(self.delay)
                return html_content

            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential back-off

        self.logger.error(f"无法获取页面: {url}")
        return None

    def get_page(self, url, max_retries=3):
        """
        Fetch page HTML (compatibility wrapper around the Playwright fetcher).

        :param url: target URL
        :param max_retries: maximum number of attempts
        :return: HTML content or None
        """
        return self.get_page_with_playwright(url, max_retries)

    def parse_html(self, html_content, parser='html.parser'):
        """
        Parse HTML text.

        :param html_content: HTML string
        :param parser: BeautifulSoup parser name
        :return: BeautifulSoup object
        """
        return BeautifulSoup(html_content, parser)

    def extract_links(self, soup, base_url):
        """
        Collect every ``<a>`` element that has an ``href`` attribute.

        :param soup: BeautifulSoup object
        :param base_url: base URL used to resolve relative links
        :return: list of dicts with keys ``url``, ``text`` and ``title``
        """
        return [
            {
                'url': urljoin(base_url, anchor['href']),
                'text': anchor.get_text(strip=True),
                'title': anchor.get('title', ''),
            }
            for anchor in soup.find_all('a', href=True)
        ]

    def extract_images(self, soup, base_url):
        """
        Collect every ``<img>`` element that has a non-empty ``src``.

        :param soup: BeautifulSoup object
        :param base_url: base URL used to resolve relative sources
        :return: list of dicts with keys ``url``, ``alt`` and ``title``
        """
        images = []
        for img in soup.find_all('img'):
            src = img.get('src')
            if not src:
                continue
            images.append({
                'url': urljoin(base_url, src),
                'alt': img.get('alt', ''),
                'title': img.get('title', ''),
            })
        return images

    @staticmethod
    def _ensure_parent_dir(filename):
        """Create the parent directory of *filename* if it names one.

        ``os.path.dirname`` returns ``''`` for a bare file name and
        ``os.makedirs('')`` raises, so that case is skipped.
        """
        parent = os.path.dirname(filename)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def save_to_json(self, data, filename):
        """
        Write *data* to *filename* as UTF-8 JSON (non-ASCII kept, indent 2).

        :param data: JSON-serialisable data
        :param filename: output path; missing parent directories are created
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        self.logger.info(f"数据已保存到: {filename}")

    def save_to_csv(self, data, filename, fieldnames):
        """
        Write a list of row dicts to *filename* as UTF-8 CSV with a header row.

        :param data: list of dicts, one per row
        :param filename: output path; missing parent directories are created
        :param fieldnames: CSV column names, in output order
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        self.logger.info(f"数据已保存到: {filename}")

    def save_to_html(self, html_content, filename):
        """
        Write raw HTML text to *filename* as UTF-8.

        :param html_content: HTML string
        :param filename: output path; missing parent directories are created
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)
        self.logger.info(f"HTML已保存到: {filename}")

    def _build_page_data(self, url, html_content):
        """Parse *html_content* and build the result dict shared by both scrape paths.

        :param url: URL the HTML was loaded from (also the base for relative links)
        :param html_content: HTML string
        :return: dict with keys ``url``, ``title``, ``description``, ``links``,
            ``images``, ``text_length``, ``html_content``, ``html_length``,
            ``script_data`` and ``timestamp`` (local time of extraction)
        """
        soup = self.parse_html(html_content)
        title_tag = soup.find('title')
        meta_tag = soup.find('meta', attrs={'name': 'description'})
        return {
            'url': url,
            'title': title_tag.get_text(strip=True) if title_tag else '',
            'description': meta_tag.get('content', '') if meta_tag else '',
            'links': self.extract_links(soup, url),
            'images': self.extract_images(soup, url),
            'text_length': len(soup.get_text(strip=True)),
            'html_content': html_content,
            'html_length': len(html_content),
            'script_data': self.extract_script_data(soup),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        }

    def scrape_page(self, url):
        """
        Fetch one page in a fresh Playwright page and extract its data.

        :param url: target URL
        :return: page data dict, or None when the page could not be fetched
        """
        html_content = self.get_page_with_playwright(url)
        if not html_content:
            return None
        return self._build_page_data(url, html_content)

    def extract_script_data(self, soup):
        """
        Pull assignments to known JavaScript globals out of ``<script>`` tags.

        For each known name, every script line that contains both the name and
        an ``=`` is a match; the stripped text of the last matching line wins.

        :param soup: BeautifulSoup object
        :return: dict mapping the global's name to the matching source line
        """
        script_data = {}
        for script in soup.find_all('script'):
            script_text = script.get_text()
            for pattern in self._SCRIPT_PATTERNS:
                # Cheap whole-script check before scanning line by line.
                if pattern not in script_text:
                    continue
                for line in script_text.split('\n'):
                    if pattern in line and '=' in line:
                        script_data[pattern] = line.strip()
        return script_data

    def create_safe_filename(self, url, max_length=100):
        """
        Derive a file-system-safe name from a URL.

        :param url: original URL
        :param max_length: maximum length of the returned name
        :return: sanitised name, or ``'scraped_page'`` if nothing remains
        """
        # Drop the scheme.
        safe_name = url.replace('https://', '').replace('http://', '')

        # Replace characters that are invalid or awkward in file names.
        safe_name = safe_name.translate(self._FILENAME_TRANSLATION)

        # Collapse runs of underscores.
        while '__' in safe_name:
            safe_name = safe_name.replace('__', '_')

        # Trim underscores at both ends, then enforce the length limit.
        safe_name = safe_name.strip('_')[:max_length]

        # Never return an empty name.
        return safe_name or 'scraped_page'

    def manual_login_and_scrape(self, login_url, target_urls):
        """
        Let the user log in by hand, then scrape pages inside that session.

        Opens *login_url*, blocks on ``input()`` until the user confirms the
        login on the console, and then visits each target URL in the same page.

        :param login_url: URL of the login page
        :param target_urls: list of URLs to scrape after login
        :return: list of page data dicts (empty if the workflow failed)
        """
        if not self.context:
            self.start_browser()

        page = self.context.new_page()
        try:
            # Open the login page.
            self.logger.info(f"正在打开登录页面: {login_url}")
            page.goto(login_url, timeout=self.timeout)

            # Wait for the user to finish logging in.
            print("=" * 60)
            print("浏览器已打开,请在浏览器中完成登录操作")
            print("登录完成后,请在此控制台按回车键继续...")
            print("=" * 60)
            input()

            # Report where the browser ended up and how many cookies exist.
            self.logger.info(f"当前页面URL: {page.url}")
            self.logger.info(f"获取到 {len(self.context.cookies())} 个cookies")

            # Scrape the target pages, skipping the ones that failed.
            results = []
            for url in target_urls:
                print(f"正在爬取: {url}")
                data = self.scrape_page_with_session(page, url)
                if data:
                    results.append(data)
            return results
        except Exception as e:
            self.logger.error(f"手动登录过程中发生错误: {e}")
            return []
        finally:
            self._close_page(page)

    def scrape_page_with_session(self, page, url):
        """
        Scrape a page by navigating an existing (logged-in) page object.

        :param page: Playwright page object; it is navigated but not closed
        :param url: target URL
        :return: page data dict, or None on a non-200 response or any error
        """
        try:
            response = page.goto(url, timeout=self.timeout, wait_until='domcontentloaded')
            if not (response and response.status == 200):
                self.logger.warning(f"页面响应状态异常: {response.status if response else 'None'}")
                return None

            # Give the page time to finish loading dynamic content.
            self._wait_until_settled(page, url, idle_timeout=15000, settle_ms=3000)

            data = self._build_page_data(url, page.content())
            self.logger.info(f"成功获取页面: {url}")
            time.sleep(self.delay)
            return data
        except Exception as e:
            self.logger.error(f"爬取页面失败: {e}")
            return None

    def wait_for_manual_action(self, page, message="请完成操作后按回车继续..."):
        """
        Print the current page URL and a prompt, then block until Enter is pressed.

        :param page: Playwright page object
        :param message: prompt shown to the user
        """
        print("=" * 60)
        print(f"当前页面: {page.url}")
        print(message)
        print("=" * 60)
        input()
|
||
|
||
def main():
    """
    Example usage: manual-login mode.

    Opens a visible browser so the user can log in by hand, scrapes the
    configured target pages inside that session, and writes the results as
    per-page HTML files, a full JSON dump, a JSON dump without the HTML, and a
    CSV summary.  The browser is always shut down at the end.
    """
    # Output directory, login page and pages to scrape.
    output_dir = 'd:/Github/devops/output'
    login_url = 'https://yixj5m42od.feishu.cn/sheets/VlAIsKxYchNABztr3RGcBnrEnYM?table=tblNs3625a9MAc89&view=vewkRuPWCL&sheet=y1FRE9'
    target_urls = [
        'https://yixj5m42od.feishu.cn/sheets/VlAIsKxYchNABztr3RGcBnrEnYM?table=tblNs3625a9MAc89&view=vewkRuPWCL&sheet=y1FRE9',
        # Add more pages to scrape here.
    ]

    # Non-headless so the user can interact with the login page.
    scraper = WebScraper(delay=2, timeout=30000, headless=False)

    try:
        scraper.start_browser()

        print("开始手动登录流程...")
        results = scraper.manual_login_and_scrape(login_url, target_urls)

        if not results:
            print("未获取到任何数据")
            return

        # One HTML file per scraped page.
        for page_data in results:
            base_name = scraper.create_safe_filename(page_data['url'])
            scraper.save_to_html(
                page_data['html_content'],
                f'{output_dir}/html/{base_name}_logged_in.html',
            )

        # Full results, raw HTML included.
        scraper.save_to_json(results, f'{output_dir}/scrape_results_full_logged_in.json')

        # Slimmed-down results without the raw HTML.
        without_html = [
            {key: value for key, value in page_data.items() if key != 'html_content'}
            for page_data in results
        ]
        scraper.save_to_json(without_html, f'{output_dir}/scrape_results_logged_in.json')

        # One CSV summary row per page.
        summary_rows = [
            {
                'url': page_data['url'],
                'title': page_data['title'],
                'description': page_data['description'],
                'links_count': len(page_data['links']),
                'images_count': len(page_data['images']),
                'html_length': page_data['html_length'],
                'timestamp': page_data['timestamp'],
            }
            for page_data in results
        ]
        columns = ['url', 'title', 'description', 'links_count', 'images_count', 'html_length', 'timestamp']
        scraper.save_to_csv(summary_rows, f'{output_dir}/scrape_results_logged_in.csv', columns)

        print(f"爬取完成,共处理 {len(results)} 个页面")
        print(f"HTML文件保存在: {output_dir}/html/")
        print(f"完整数据保存在: {output_dir}/scrape_results_full_logged_in.json")

    except KeyboardInterrupt:
        print("\n用户中断操作")
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        # Make sure the browser is closed, including on the early return above.
        print("正在关闭浏览器...")
        scraper.stop_browser()
|
||
|
||
# Run the manual-login demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|