devops/script/automate.py
2025-12-12 11:40:38 +08:00

532 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from bs4 import BeautifulSoup
import time
import json
import csv
import os
from urllib.parse import urljoin, urlparse
import logging
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
import asyncio
class WebScraper:
    """Scrape rendered web pages with Playwright and parse them with BeautifulSoup.

    The scraper owns a single Chromium browser and browser context, created
    lazily by ``start_browser`` and released by ``stop_browser``.  Pages can be
    fetched anonymously (``scrape_page``) or after an interactive, manual login
    (``manual_login_and_scrape``).  Results can be exported as JSON, CSV or raw
    HTML files.
    """

    # Substrings searched for in inline <script> text by ``extract_script_data``.
    _SCRIPT_PATTERNS = (
        'window.__pageStartTime',
        'window.serverInjectRes',
        'window.ENV',
        'window.locales',
        'window.deviceInfoBySSO',
    )

    # Characters replaced by '_' when a URL is turned into a file name.
    _INVALID_FILENAME_CHARS = '<>:"|?*/\\&=#'

    def __init__(self, delay=1, timeout=30000, headless=True):
        """Initialise the scraper without starting a browser.

        :param delay: pause after each successful page fetch, in seconds
        :param timeout: page navigation timeout, in milliseconds
        :param headless: whether Chromium is launched without a visible window
        """
        self.delay = delay
        self.timeout = timeout
        self.headless = headless
        self.playwright = None
        self.browser = None
        self.context = None
        # Configure root logging (a no-op if the root logger already has handlers).
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    def start_browser(self):
        """Start Playwright, launch Chromium and create the browser context.

        :raises Exception: whatever Playwright raised; the error is logged
            first and then re-raised unchanged
        """
        try:
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=self.headless,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                ],
            )
            self.context = self.browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                viewport={'width': 1920, 'height': 1080},
            )
            self.logger.info("浏览器启动成功")
        except Exception as e:
            self.logger.error(f"启动浏览器失败: {e}")
            raise

    def stop_browser(self):
        """Close the context, the browser and Playwright, in that order.

        The references are cleared before closing so the scraper never keeps a
        handle to a closed object; a later fetch will start a fresh browser.
        Every shutdown step is attempted even if an earlier one raises, and the
        exception still propagates to the caller.  Safe to call repeatedly.
        """
        context, browser, playwright = self.context, self.browser, self.playwright
        self.context = self.browser = self.playwright = None
        try:
            if context:
                context.close()
        finally:
            try:
                if browser:
                    browser.close()
            finally:
                if playwright:
                    playwright.stop()
        self.logger.info("浏览器已关闭")

    def get_page_with_playwright(self, url, max_retries=3, wait_for_load=True):
        """Fetch the rendered HTML of a page, retrying on failure.

        A new page is opened for every attempt and is always closed again.
        Only an HTTP 200 response counts as success.  Failed attempts are
        followed by an exponential back-off (1s, 2s, 4s, ...).

        :param url: target URL
        :param max_retries: maximum number of attempts
        :param wait_for_load: wait for network idle plus a short grace period
            before reading the page content
        :return: HTML string, or None if every attempt failed
        """
        if not self.context:
            self.start_browser()
        for attempt in range(max_retries):
            page = None
            html_content = None
            try:
                page = self.context.new_page()
                # Extra request headers sent with every request from this page.
                page.set_extra_http_headers({
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'DNT': '1',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                })
                response = page.goto(url, timeout=self.timeout, wait_until='domcontentloaded')
                if response and response.status == 200:
                    if wait_for_load:
                        try:
                            page.wait_for_load_state('networkidle', timeout=10000)
                            # Grace period for scripts that run after the network goes idle.
                            page.wait_for_timeout(2000)
                        except PlaywrightTimeoutError:
                            # A page that never goes idle is still worth reading.
                            self.logger.warning(f"页面加载超时,但继续处理: {url}")
                    html_content = page.content()
                else:
                    self.logger.warning(f"页面响应状态异常: {response.status if response else 'None'}")
            except PlaywrightTimeoutError:
                self.logger.warning(f"页面加载超时 (尝试 {attempt + 1}/{max_retries}): {url}")
            except Exception as e:
                self.logger.warning(f"获取页面失败 (尝试 {attempt + 1}/{max_retries}): {e}")
            finally:
                # Single place where the page is closed, on success and on failure.
                if page:
                    page.close()
            if html_content is not None:
                self.logger.info(f"成功获取页面: {url}")
                time.sleep(self.delay)
                return html_content
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential back-off
        self.logger.error(f"无法获取页面: {url}")
        return None

    def get_page(self, url, max_retries=3):
        """Fetch a page; compatibility wrapper around ``get_page_with_playwright``.

        :param url: target URL
        :param max_retries: maximum number of attempts
        :return: HTML string, or None on failure
        """
        return self.get_page_with_playwright(url, max_retries)

    def parse_html(self, html_content, parser='html.parser'):
        """Parse an HTML string.

        :param html_content: HTML string
        :param parser: BeautifulSoup parser name
        :return: BeautifulSoup object
        """
        return BeautifulSoup(html_content, parser)

    def extract_links(self, soup, base_url):
        """Collect every ``<a href>`` on the page.

        :param soup: BeautifulSoup object
        :param base_url: URL used to resolve relative links
        :return: list of dicts with the keys ``url``, ``text`` and ``title``
        """
        return [
            {
                'url': urljoin(base_url, link['href']),
                'text': link.get_text(strip=True),
                'title': link.get('title', ''),
            }
            for link in soup.find_all('a', href=True)
        ]

    def extract_images(self, soup, base_url):
        """Collect every ``<img>`` that has a non-empty ``src``.

        :param soup: BeautifulSoup object
        :param base_url: URL used to resolve relative image sources
        :return: list of dicts with the keys ``url``, ``alt`` and ``title``
        """
        images = []
        for img in soup.find_all('img'):
            src = img.get('src')
            if not src:
                continue
            images.append({
                'url': urljoin(base_url, src),
                'alt': img.get('alt', ''),
                'title': img.get('title', ''),
            })
        return images

    @staticmethod
    def _ensure_parent_dir(filename):
        """Create the directory that will contain *filename*, if it names one."""
        parent = os.path.dirname(filename)
        # dirname() is '' for a bare file name and os.makedirs('') raises
        # FileNotFoundError, so only create a directory when one is given.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def save_to_json(self, data, filename):
        """Write *data* to *filename* as indented UTF-8 JSON.

        :param data: JSON-serialisable data
        :param filename: output path; missing parent directories are created
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        self.logger.info(f"数据已保存到: {filename}")

    def save_to_csv(self, data, filename, fieldnames):
        """Write a list of row dicts to *filename* as UTF-8 CSV with a header.

        :param data: list of dicts, one per row
        :param filename: output path; missing parent directories are created
        :param fieldnames: CSV column names, in output order
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        self.logger.info(f"数据已保存到: {filename}")

    def save_to_html(self, html_content, filename):
        """Write an HTML string to *filename* as UTF-8.

        :param html_content: HTML string
        :param filename: output path; missing parent directories are created
        """
        self._ensure_parent_dir(filename)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)
        self.logger.info(f"HTML已保存到: {filename}")

    def _build_page_data(self, html_content, url):
        """Parse *html_content* and assemble the result record for *url*.

        Shared by ``scrape_page`` and ``scrape_page_with_session`` so both
        produce records with exactly the same keys.
        """
        soup = self.parse_html(html_content)
        title = soup.find('title')
        meta_description = soup.find('meta', attrs={'name': 'description'})
        text_content = soup.get_text(strip=True)
        return {
            'url': url,
            'title': title.get_text(strip=True) if title else '',
            'description': meta_description.get('content', '') if meta_description else '',
            'links': self.extract_links(soup, url),
            'images': self.extract_images(soup, url),
            'text_length': len(text_content),
            'html_content': html_content,
            'html_length': len(html_content),
            # Inline JavaScript assignments, useful for single-page applications.
            'script_data': self.extract_script_data(soup),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        }

    def scrape_page(self, url):
        """Fetch a page in a fresh browser page and extract its data.

        :param url: target URL
        :return: page data dict, or None if the page could not be fetched
        """
        html_content = self.get_page_with_playwright(url)
        if not html_content:
            return None
        return self._build_page_data(html_content, url)

    def extract_script_data(self, soup):
        """Extract selected JavaScript assignment lines from ``<script>`` tags.

        For each entry of ``_SCRIPT_PATTERNS``, every script line that contains
        both the pattern and an ``=`` is recorded, stripped of surrounding
        whitespace.  Matching is by substring, and a later matching line
        replaces an earlier one for the same pattern.

        :param soup: BeautifulSoup object
        :return: dict mapping pattern to the last matching source line
        """
        script_data = {}
        for script in soup.find_all('script'):
            script_text = script.get_text()
            matching = [p for p in self._SCRIPT_PATTERNS if p in script_text]
            if not matching:
                continue
            # Split once per script instead of once per matching pattern.
            lines = script_text.split('\n')
            for pattern in matching:
                for line in lines:
                    if pattern in line and '=' in line:
                        script_data[pattern] = line.strip()
        return script_data

    def create_safe_filename(self, url, max_length=100):
        """Derive a file-system-safe name from a URL.

        :param url: original URL
        :param max_length: maximum length of the returned name
        :return: sanitised name, or ``'scraped_page'`` if nothing is left
        """
        # Drop the protocol.
        safe_name = url.replace('https://', '').replace('http://', '')
        # Replace every invalid character with '_' in a single pass.
        safe_name = safe_name.translate(
            str.maketrans({char: '_' for char in self._INVALID_FILENAME_CHARS})
        )
        # Collapse runs of underscores.
        while '__' in safe_name:
            safe_name = safe_name.replace('__', '_')
        safe_name = safe_name.strip('_')
        if len(safe_name) > max_length:
            safe_name = safe_name[:max_length]
        # Never return an empty name.
        return safe_name or 'scraped_page'

    def manual_login_and_scrape(self, login_url, target_urls):
        """Open the login page, wait for a manual login, then scrape the targets.

        The method blocks on ``input()`` until the user confirms on the console
        that the login is complete.  All target URLs are then loaded in the same
        page, so they share the logged-in session.

        :param login_url: URL of the login page
        :param target_urls: list of URLs to scrape after login
        :return: list of page data dicts; an empty list if an error occurred
        """
        if not self.context:
            self.start_browser()
        page = self.context.new_page()
        try:
            self.logger.info(f"正在打开登录页面: {login_url}")
            page.goto(login_url, timeout=self.timeout)
            # Hand control to the user until they confirm the login is complete.
            print("=" * 60)
            print("浏览器已打开,请在浏览器中完成登录操作")
            print("登录完成后,请在此控制台按回车键继续...")
            print("=" * 60)
            input()
            current_url = page.url
            self.logger.info(f"当前页面URL: {current_url}")
            # The cookie count is only logged, as a hint that the login took effect.
            cookies = self.context.cookies()
            self.logger.info(f"获取到 {len(cookies)} 个cookies")
            results = []
            for url in target_urls:
                print(f"正在爬取: {url}")
                data = self.scrape_page_with_session(page, url)
                if data:
                    results.append(data)
            return results
        except Exception as e:
            self.logger.error(f"手动登录过程中发生错误: {e}")
            return []
        finally:
            page.close()

    def scrape_page_with_session(self, page, url):
        """Scrape a URL in an existing page, reusing its session.

        :param page: Playwright page object
        :param url: target URL
        :return: page data dict, or None on a non-200 response or any error
        """
        try:
            response = page.goto(url, timeout=self.timeout, wait_until='domcontentloaded')
            if not (response and response.status == 200):
                self.logger.warning(f"页面响应状态异常: {response.status if response else 'None'}")
                return None
            try:
                page.wait_for_load_state('networkidle', timeout=15000)
                page.wait_for_timeout(3000)  # extra time for late content
            except PlaywrightTimeoutError:
                # A page that never goes idle is still worth reading.
                self.logger.warning(f"页面加载超时,但继续处理: {url}")
            data = self._build_page_data(page.content(), url)
            self.logger.info(f"成功获取页面: {url}")
            time.sleep(self.delay)
            return data
        except Exception as e:
            self.logger.error(f"爬取页面失败: {e}")
            return None

    def wait_for_manual_action(self, page, message="请完成操作后按回车继续..."):
        """Print the current page URL and a prompt, then block until Enter is pressed.

        :param page: Playwright page object
        :param message: prompt shown to the user
        """
        print("=" * 60)
        print(f"当前页面: {page.url}")
        print(message)
        print("=" * 60)
        input()
def main():
    """Example usage: log in by hand, scrape the target pages and export them.

    For every scraped page the raw HTML is written to its own file.  The full
    result set is then saved as JSON (with and without the HTML payload) and as
    a CSV summary.  The browser is always closed on exit.
    """
    # Headed (non-headless) browser so the user can complete the login.
    scraper = WebScraper(delay=2, timeout=30000, headless=False)
    try:
        scraper.start_browser()

        output_dir = 'd:/Github/devops/output'
        login_url = 'https://yixj5m42od.feishu.cn/sheets/VlAIsKxYchNABztr3RGcBnrEnYM?table=tblNs3625a9MAc89&view=vewkRuPWCL&sheet=y1FRE9'
        target_urls = [
            'https://yixj5m42od.feishu.cn/sheets/VlAIsKxYchNABztr3RGcBnrEnYM?table=tblNs3625a9MAc89&view=vewkRuPWCL&sheet=y1FRE9',
            # Add further pages to scrape here.
        ]

        print("开始手动登录流程...")
        results = scraper.manual_login_and_scrape(login_url, target_urls)
        if not results:
            print("未获取到任何数据")
            return

        # One raw HTML file per scraped page.
        for page_data in results:
            base_name = scraper.create_safe_filename(page_data['url'])
            scraper.save_to_html(
                page_data['html_content'],
                f'{output_dir}/html/{base_name}_logged_in.html',
            )

        # Full records, HTML included.
        scraper.save_to_json(results, f'{output_dir}/scrape_results_full_logged_in.json')

        # Same records without the bulky HTML payload.
        slim_results = [
            {key: value for key, value in page_data.items() if key != 'html_content'}
            for page_data in results
        ]
        scraper.save_to_json(slim_results, f'{output_dir}/scrape_results_logged_in.json')

        # One summary row per page for the CSV export.
        fieldnames = ['url', 'title', 'description', 'links_count', 'images_count', 'html_length', 'timestamp']
        csv_rows = [
            {
                'url': page_data['url'],
                'title': page_data['title'],
                'description': page_data['description'],
                'links_count': len(page_data['links']),
                'images_count': len(page_data['images']),
                'html_length': page_data['html_length'],
                'timestamp': page_data['timestamp'],
            }
            for page_data in results
        ]
        scraper.save_to_csv(csv_rows, f'{output_dir}/scrape_results_logged_in.csv', fieldnames)

        print(f"爬取完成,共处理 {len(results)} 个页面")
        print(f"HTML文件保存在: {output_dir}/html/")
        print(f"完整数据保存在: {output_dir}/scrape_results_full_logged_in.json")
    except KeyboardInterrupt:
        print("\n用户中断操作")
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        # Always release the browser, including on the early return above.
        print("正在关闭浏览器...")
        scraper.stop_browser()


if __name__ == "__main__":
    main()