import requests
from bs4 import BeautifulSoup
import time
import json
import csv
import os
from urllib.parse import urljoin, urlparse
import logging
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
import asyncio


class WebScraper:
    """Scrape pages through a Playwright-driven Chromium and export the results.

    The browser is started lazily by the fetching methods (or explicitly via
    ``start_browser``) and must be released with ``stop_browser``.
    """

    # Substrings looked for in inline <script> text by extract_script_data.
    _SCRIPT_PATTERNS = (
        'window.__pageStartTime',
        'window.serverInjectRes',
        'window.ENV',
        'window.locales',
        'window.deviceInfoBySSO',
    )

    def __init__(self, delay=1, timeout=30000, headless=True):
        """
        Initialise the scraper.

        :param delay: pause after each successful page fetch (seconds)
        :param timeout: page navigation timeout (milliseconds)
        :param headless: whether to run the browser in headless mode
        """
        self.delay = delay
        self.timeout = timeout
        self.headless = headless
        self.playwright = None
        self.browser = None
        self.context = None

        # Configure logging.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def start_browser(self):
        """
        Start Playwright, launch Chromium and create a browser context.

        :raises Exception: re-raises whatever the start-up step raised, after
            releasing any resource that had already been acquired
        """
        try:
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=self.headless,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage'
                ]
            )
            self.context = self.browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
            self.logger.info("浏览器启动成功")
        except Exception as e:
            self.logger.error(f"启动浏览器失败: {e}")
            # Do not leak a half-started driver/browser when a later step fails.
            self._release_browser()
            raise

    def _release_browser(self):
        """Close context, browser and driver (in that order) and reset the handles."""
        for attr_name, closer_name in (
            ('context', 'close'),
            ('browser', 'close'),
            ('playwright', 'stop'),
        ):
            resource = getattr(self, attr_name)
            # Reset first so a failing close cannot leave a stale handle that
            # later "is the browser running?" checks would mistake for a live one.
            setattr(self, attr_name, None)
            if not resource:
                continue
            try:
                getattr(resource, closer_name)()
            except Exception as e:
                # Keep going: one failing close must not leak the remaining resources.
                self.logger.warning(f"释放浏览器资源失败 ({attr_name}): {e}")

    def stop_browser(self):
        """Close the browser and release every Playwright resource held by this instance."""
        self._release_browser()
        self.logger.info("浏览器已关闭")

    def _wait_for_render(self, page, url, idle_timeout, settle_ms):
        """
        Wait for network idle, then an extra fixed pause for JavaScript to run.

        A timeout while waiting is logged and otherwise ignored, so the caller
        continues with whatever has been rendered so far.

        :param page: Playwright page object
        :param url: URL being loaded (used in the log message only)
        :param idle_timeout: maximum wait for the 'networkidle' state (milliseconds)
        :param settle_ms: additional fixed wait after network idle (milliseconds)
        """
        try:
            page.wait_for_load_state('networkidle', timeout=idle_timeout)
            page.wait_for_timeout(settle_ms)
        except PlaywrightTimeoutError:
            self.logger.warning(f"页面加载超时,但继续处理: {url}")

    def get_page_with_playwright(self, url, max_retries=3, wait_for_load=True):
        """
        Fetch a page's HTML with Playwright, retrying with exponential backoff.

        :param url: target URL
        :param max_retries: maximum number of attempts
        :param wait_for_load: whether to wait for the page to finish loading
        :return: HTML content, or None when every attempt failed
        """
        if not self.context:
            self.start_browser()

        for attempt in range(max_retries):
            page = None
            html_content = None
            try:
                page = self.context.new_page()

                # Send additional request headers.
                page.set_extra_http_headers({
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'DNT': '1',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                })

                response = page.goto(url, timeout=self.timeout, wait_until='domcontentloaded')

                if response and response.status == 200:
                    if wait_for_load:
                        self._wait_for_render(page, url, idle_timeout=10000, settle_ms=2000)
                    html_content = page.content()
                else:
                    self.logger.warning(f"页面响应状态异常: {response.status if response else 'None'}")

            except PlaywrightTimeoutError:
                self.logger.warning(f"页面加载超时 (尝试 {attempt + 1}/{max_retries}): {url}")
            except Exception as e:
                self.logger.warning(f"获取页面失败 (尝试 {attempt + 1}/{max_retries}): {e}")
            finally:
                # Single place where the page is closed, on success and failure alike.
                if page:
                    page.close()

            if html_content is not None:
                self.logger.info(f"成功获取页面: {url}")
                time.sleep(self.delay)
                return html_content

            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff

        self.logger.error(f"无法获取页面: {url}")
        return None

    def get_page(self, url, max_retries=3):
        """
        Fetch a page's HTML (compatibility wrapper around the Playwright fetcher).

        :param url: target URL
        :param max_retries: maximum number of attempts
        :return: HTML content or None
        """
        return self.get_page_with_playwright(url, max_retries)

    def parse_html(self, html_content, parser='html.parser'):
        """
        Parse HTML content.

        :param html_content: HTML string
        :param parser: BeautifulSoup parser name
        :return: BeautifulSoup object
        """
        return BeautifulSoup(html_content, parser)

    def extract_links(self, soup, base_url):
        """
        Collect every <a> element that has an href attribute.

        :param soup: BeautifulSoup object
        :param base_url: base URL used to resolve relative hrefs
        :return: list of dicts with 'url', 'text' and 'title'
        """
        return [
            {
                'url': urljoin(base_url, link['href']),
                'text': link.get_text(strip=True),
                'title': link.get('title', '')
            }
            for link in soup.find_all('a', href=True)
        ]

    def extract_images(self, soup, base_url):
        """
        Collect every <img> element that has a non-empty src attribute.

        :param soup: BeautifulSoup object
        :param base_url: base URL used to resolve relative sources
        :return: list of dicts with 'url', 'alt' and 'title'
        """
        images = []
        for img in soup.find_all('img'):
            src = img.get('src')
            if not src:
                continue
            images.append({
                'url': urljoin(base_url, src),
                'alt': img.get('alt', ''),
                'title': img.get('title', '')
            })
        return images

    @staticmethod
    def _ensure_parent_dir(filename):
        """Create the parent directory of *filename* when it has one."""
        parent = os.path.dirname(filename)
        # dirname() is '' for a bare file name and os.makedirs('') raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def save_to_json(self, data, filename):
        """
        Save data as UTF-8 JSON.

        :param data: data to save
        :param filename: output file name
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        self.logger.info(f"数据已保存到: {filename}")

    def save_to_csv(self, data, filename, fieldnames):
        """
        Save a list of dicts as UTF-8 CSV with a header row.

        :param data: list of row dicts to save
        :param filename: output file name
        :param fieldnames: CSV column names
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        self.logger.info(f"数据已保存到: {filename}")

    def save_to_html(self, html_content, filename):
        """
        Save HTML content to a UTF-8 file.

        :param html_content: HTML content
        :param filename: output file name
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)
        self.logger.info(f"HTML已保存到: {filename}")

    def _build_page_data(self, html_content, url):
        """
        Parse *html_content* and assemble the page-data dictionary.

        :param html_content: raw HTML of the page
        :param url: URL the HTML was fetched from (also used to resolve relative links)
        :return: dict with url, title, description, links, images, text/html
            lengths, the raw HTML, extracted script data and a local timestamp
        """
        soup = self.parse_html(html_content)

        title = soup.find('title')
        meta_description = soup.find('meta', attrs={'name': 'description'})
        text_content = soup.get_text(strip=True)

        return {
            'url': url,
            'title': title.get_text(strip=True) if title else '',
            'description': meta_description.get('content', '') if meta_description else '',
            'links': self.extract_links(soup, url),
            'images': self.extract_images(soup, url),
            'text_length': len(text_content),
            'html_content': html_content,
            'html_length': len(html_content),
            'script_data': self.extract_script_data(soup),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }

    def scrape_page(self, url):
        """
        Fetch a single page in a fresh tab and extract its information.

        :param url: target URL
        :return: page data dict, or None when the page could not be fetched
        """
        html_content = self.get_page_with_playwright(url)
        if not html_content:
            return None
        return self._build_page_data(html_content, url)

    def extract_script_data(self, soup):
        """
        Extract selected JavaScript variable assignments from <script> tags.

        For each known pattern, the stripped text of the last line that
        contains both the pattern and '=' is stored under the pattern name.

        :param soup: BeautifulSoup object
        :return: dict mapping pattern -> matching source line
        """
        script_data = {}

        for script in soup.find_all('script'):
            script_text = script.get_text()
            lines = None  # split lazily, only when some pattern is present

            for pattern in self._SCRIPT_PATTERNS:
                if pattern not in script_text:
                    continue
                if lines is None:
                    lines = script_text.split('\n')
                for line in lines:
                    if pattern in line and '=' in line:
                        script_data[pattern] = line.strip()

        return script_data

    def create_safe_filename(self, url, max_length=100):
        """
        Build a filesystem-safe file name from a URL.

        :param url: original URL
        :param max_length: maximum length of the returned name
        :return: sanitised file name ('scraped_page' when nothing is left)
        """
        # Drop the protocol.
        safe_name = url.replace('https://', '').replace('http://', '')

        # Replace every invalid character with an underscore in one pass.
        invalid_chars = '<>:"|?*/\\&=#'
        safe_name = safe_name.translate(str.maketrans(invalid_chars, '_' * len(invalid_chars)))

        # Collapse runs of underscores.
        while '__' in safe_name:
            safe_name = safe_name.replace('__', '_')

        # Trim leading/trailing underscores, then enforce the length limit.
        safe_name = safe_name.strip('_')
        if len(safe_name) > max_length:
            safe_name = safe_name[:max_length]

        # Never return an empty name.
        return safe_name or 'scraped_page'

    def manual_login_and_scrape(self, login_url, target_urls):
        """
        Open the login page, wait for the user to log in, then scrape targets.

        Blocks on console input until the user presses Enter.

        :param login_url: login page URL
        :param target_urls: list of target URLs to scrape after login
        :return: list of page data dicts ([] when an error interrupts the flow)
        """
        if not self.context:
            self.start_browser()

        page = self.context.new_page()

        try:
            # Open the login page.
            self.logger.info(f"正在打开登录页面: {login_url}")
            page.goto(login_url, timeout=self.timeout)

            # Wait for the user to log in manually.
            print("=" * 60)
            print("浏览器已打开,请在浏览器中完成登录操作")
            print("登录完成后,请在此控制台按回车键继续...")
            print("=" * 60)
            input()

            # Report the post-login state.
            current_url = page.url
            self.logger.info(f"当前页面URL: {current_url}")

            cookies = self.context.cookies()
            self.logger.info(f"获取到 {len(cookies)} 个cookies")

            # Scrape the target pages in the logged-in tab.
            results = []
            for url in target_urls:
                print(f"正在爬取: {url}")
                data = self.scrape_page_with_session(page, url)
                if data:
                    results.append(data)

            return results

        except Exception as e:
            self.logger.error(f"手动登录过程中发生错误: {e}")
            return []
        finally:
            if page:
                page.close()

    def scrape_page_with_session(self, page, url):
        """
        Scrape a page by navigating an existing (already logged-in) tab.

        :param page: Playwright page object
        :param url: target URL
        :return: page data dict, or None on a non-200 response or any error
        """
        try:
            response = page.goto(url, timeout=self.timeout, wait_until='domcontentloaded')

            if not (response and response.status == 200):
                self.logger.warning(f"页面响应状态异常: {response.status if response else 'None'}")
                return None

            self._wait_for_render(page, url, idle_timeout=15000, settle_ms=3000)

            # The timestamp inside the data reflects capture time, i.e. it is
            # taken before the politeness delay below.
            data = self._build_page_data(page.content(), url)

            self.logger.info(f"成功获取页面: {url}")
            time.sleep(self.delay)
            return data

        except Exception as e:
            self.logger.error(f"爬取页面失败: {e}")
            return None

    def wait_for_manual_action(self, page, message="请完成操作后按回车继续..."):
        """
        Print the current page URL and a prompt, then block until Enter is pressed.

        :param page: Playwright page object
        :param message: prompt text
        """
        print("=" * 60)
        print(f"当前页面: {page.url}")
        print(message)
        print("=" * 60)
        input()


def main():
    """
    Example usage: manual-login mode.

    Opens a visible browser, waits for a manual login, scrapes the target
    pages and writes HTML, JSON and CSV output under the output directory.
    """
    # Non-headless so the user can log in by hand.
    scraper = WebScraper(delay=2, timeout=30000, headless=False)

    try:
        scraper.start_browser()

        # Output directory.
        output_dir = 'd:/Github/devops/output'

        # Login page and target pages.
        login_url = 'https://yixj5m42od.feishu.cn/sheets/VlAIsKxYchNABztr3RGcBnrEnYM?table=tblNs3625a9MAc89&view=vewkRuPWCL&sheet=y1FRE9'
        target_urls = [
            'https://yixj5m42od.feishu.cn/sheets/VlAIsKxYchNABztr3RGcBnrEnYM?table=tblNs3625a9MAc89&view=vewkRuPWCL&sheet=y1FRE9',
            # More pages to scrape can be added here.
        ]

        # Log in manually, then scrape.
        print("开始手动登录流程...")
        results = scraper.manual_login_and_scrape(login_url, target_urls)

        if results:
            # Raw HTML, one file per page.
            for result in results:
                safe_filename = scraper.create_safe_filename(result['url'])
                html_filename = f'{output_dir}/html/{safe_filename}_logged_in.html'
                scraper.save_to_html(result['html_content'], html_filename)

            # Full data (including HTML) as JSON.
            scraper.save_to_json(results, f'{output_dir}/scrape_results_full_logged_in.json')

            # Slim JSON without the HTML payload.
            simplified_results = [
                {key: value for key, value in result.items() if key != 'html_content'}
                for result in results
            ]
            scraper.save_to_json(simplified_results, f'{output_dir}/scrape_results_logged_in.json')

            # CSV summary.
            csv_data = [
                {
                    'url': result['url'],
                    'title': result['title'],
                    'description': result['description'],
                    'links_count': len(result['links']),
                    'images_count': len(result['images']),
                    'html_length': result['html_length'],
                    'timestamp': result['timestamp']
                }
                for result in results
            ]
            fieldnames = ['url', 'title', 'description', 'links_count', 'images_count', 'html_length', 'timestamp']
            scraper.save_to_csv(csv_data, f'{output_dir}/scrape_results_logged_in.csv', fieldnames)

            print(f"爬取完成,共处理 {len(results)} 个页面")
            print(f"HTML文件保存在: {output_dir}/html/")
            print(f"完整数据保存在: {output_dir}/scrape_results_full_logged_in.json")
        else:
            print("未获取到任何数据")

    except KeyboardInterrupt:
        print("\n用户中断操作")
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        # Always release the browser.
        print("正在关闭浏览器...")
        scraper.stop_browser()


if __name__ == "__main__":
    main()