Introduction: The Strategic Value of Competitor Monitoring in Digital Competition
In today's intensely competitive markets, monitoring competitor website content has become an important part of strategic decision-making. By tracking competitors' product updates, price adjustments, marketing campaigns, and technology moves in near real time, a company can respond quickly to market changes and sharpen its own strategy. Manual monitoring is slow and prone to missing key information, whereas modern Python crawling techniques combined with current AI capabilities can power an efficient, intelligent competitor-monitoring system.
This article walks through building a fully functional competitor content-monitoring crawler on a modern Python stack, covering asynchronous requests, dynamic-rendering support, AI-assisted content parsing, intelligent change detection, and automated report generation.
Technical Architecture Overview
Our monitoring system uses the following modern technology stack:
Asynchronous requests: aiohttp + asyncio for high-concurrency collection (see the sketch after this list)
Dynamic rendering: Playwright for JavaScript-rendered pages
Intelligent parsing: BeautifulSoup4 + custom rules + AI-assisted analysis
Content change detection: difflib + semantic similarity scoring
Data storage: SQLAlchemy + PostgreSQL for structured storage
Scheduling and monitoring: APScheduler + health checks
AI enhancement: OpenAI API or a local LLM for content analysis
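To make the high-concurrency claim concrete, here is a minimal, self-contained sketch of the aiohttp + asyncio collection pattern the stack relies on. The URLs are placeholders and the semaphore limit is an assumed politeness setting, not a value mandated by the full implementation below.
python
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> str:
    # The semaphore caps concurrent requests per crawl (assumed limit of 10).
    async with sem:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
            resp.raise_for_status()
            return await resp.text()

async def crawl(urls: list) -> list:
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, sem, url) for url in urls]
        # return_exceptions=True keeps one failed page from aborting the whole batch
        return await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    # Placeholder URLs purely for illustration
    pages = asyncio.run(crawl(["https://example.com", "https://example.org"]))
    print(len(pages), "pages fetched")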
Complete Code Implementation
1. Environment Setup and Dependencies
python
# requirements.txt
aiohttp>=3.9.0
playwright>=1.40.0
beautifulsoup4>=4.12.0
sqlalchemy>=2.0.0
apscheduler>=3.10.0
openai>=1.0.0
python-dotenv>=1.0.0
lxml>=4.9.0
pandas>=2.0.0
numpy>=1.24.0
scikit-learn>=1.3.0
pydantic>=2.0.0
redis>=5.0.0
celery>=5.3.0
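Since Playwright downloads its own browser binaries, one extra step is usually needed after `pip install -r requirements.txt`: run `python -m playwright install chromium` once so the headless Chromium used below is available.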
2. Core Monitoring Crawler Implementation
python
import asyncio
import aiohttp
from aiohttp import TCPConnector
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
import hashlib
from urllib.parse import urljoin, urlparse
import json
import os
from datetime import datetime
from typing import List, Dict, Optional, Set, Tuple
import logging
from dataclasses import dataclass
from pydantic import BaseModel, Field
from sqlalchemy import create_engine, Column, String, Text, DateTime, JSON, Integer
from sqlalchemy.orm import declarative_base, sessionmaker
import difflib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import re

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Data model definitions
Base = declarative_base()


class CompetitorContent(Base):
    """Storage model for competitor content"""
    __tablename__ = 'competitor_content'

    id = Column(Integer, primary_key=True)
    competitor_name = Column(String(200), nullable=False, index=True)
    url = Column(String(500), nullable=False, unique=True)
    title = Column(String(500))
    content_hash = Column(String(64), nullable=False, index=True)
    content_text = Column(Text)
    content_html = Column(Text)
    # 'metadata' is reserved on SQLAlchemy Declarative models, so the attribute is
    # renamed while the underlying column keeps the original name.
    page_metadata = Column('metadata', JSON)   # structured data such as price, date, author
    screenshot_path = Column(String(500))
    detected_changes = Column(JSON)            # detected changes
    similarity_score = Column(Integer)         # similarity to the previous version
    category = Column(String(100))             # content category
    extracted_data = Column(JSON)              # extracted domain-specific data
    crawled_at = Column(DateTime, default=datetime.utcnow)
    first_seen_at = Column(DateTime, default=datetime.utcnow)
    last_updated_at = Column(DateTime, default=datetime.utcnow)


@dataclass
class CrawlResult:
    """Data class for a single crawl result"""
    url: str
    competitor: str
    title: str
    content: str
    html: str
    metadata: Dict
    screenshot: Optional[bytes] = None
    content_hash: str = ""


class CompetitorMonitorConfig(BaseModel):
    """Monitoring configuration model"""
    competitor_name: str
    base_urls: List[str]
    crawl_depth: int = Field(default=2, ge=1, le=5)
    check_interval_minutes: int = Field(default=60, ge=5)
    css_selectors: Dict[str, str] = Field(default_factory=dict)
    exclusion_patterns: List[str] = Field(default_factory=list)
    dynamic_content: bool = Field(default=False)
    enable_ai_analysis: bool = Field(default=True)
    change_threshold: float = Field(default=0.3, ge=0, le=1)


class AsyncCompetitorMonitor:
    """Asynchronous competitor monitoring crawler"""

    def __init__(self, config: CompetitorMonitorConfig, db_url: str):
        self.config = config
        self.db_url = db_url
        self.visited_urls: Set[str] = set()
        self.session = None
        self.db_session = None
        self.setup_database()

    def setup_database(self):
        """Initialize the database connection"""
        engine = create_engine(self.db_url)
        Base.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        self.db_session = Session()

    async def initialize(self):
        """Initialize the asynchronous HTTP session"""
        timeout = aiohttp.ClientTimeout(total=30)
        connector = TCPConnector(limit=100, limit_per_host=20)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate',
            }
        )

    def should_crawl(self, url: str) -> bool:
        """Decide whether a URL should be crawled"""
        # Skip URLs that match an exclusion pattern
        for pattern in self.config.exclusion_patterns:
            if re.search(pattern, url):
                return False
        # Skip URLs that have already been visited
        if url in self.visited_urls:
            return False
        # Only crawl URLs that belong to the competitor's domains
        parsed_url = urlparse(url)
        base_domains = {urlparse(base).netloc for base in self.config.base_urls}
        return parsed_url.netloc in base_domains

    async def crawl_with_playwright(self, url: str) -> Optional[Tuple[str, bytes]]:
        """Use Playwright for dynamically rendered content"""
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                context = await browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                )
                page = await context.new_page()
                # Intercept static asset requests to improve performance
                await page.route(
                    "**/*.{png,jpg,jpeg,gif,svg,woff,woff2,eot,ttf}",
                    lambda route: route.abort()
                )
                await page.goto(url, wait_until='networkidle')
                # Wait for content that may still load dynamically
                await page.wait_for_timeout(2000)
                # Take a screenshot for visual monitoring
                screenshot = await page.screenshot(full_page=True)
                # Get the rendered HTML
                content = await page.content()
                await browser.close()
                return content, screenshot
        except Exception as e:
            logger.error(f"Playwright crawl failed {url}: {e}")
            return None

    async def fetch_url(self, url: str, depth: int = 0) -> Optional[CrawlResult]:
        """Fetch the content of a single URL"""
        if depth > self.config.crawl_depth or not self.should_crawl(url):
            return None
        self.visited_urls.add(url)
        try:
            if self.config.dynamic_content:
                # Use Playwright for dynamic content
                result = await self.crawl_with_playwright(url)
                if result:
                    html, screenshot = result
                else:
                    return None
            else:
                # Use aiohttp for static content
                async with self.session.get(url) as response:
                    if response.status != 200:
                        logger.warning(f"Request failed {url}: {response.status}")
                        return None
                    html = await response.text()
                    screenshot = None

            # Parse the HTML content
            soup = BeautifulSoup(html, 'lxml')
            # Extract the title
            title = (soup.title.string or "") if soup.title else ""
            # Clean the content by removing scripts, styles and page chrome
            for script in soup(["script", "style", "nav", "footer"]):
                script.decompose()
            # Extract targeted content based on the configured CSS selectors
            extracted_content = self.extract_with_selectors(soup)
            # Fall back to the main text content
            main_content = extracted_content or soup.get_text(separator=' ', strip=True)
            # Extract metadata
            metadata = self.extract_metadata(soup, url)
            # Generate a content hash
            content_hash = hashlib.sha256(main_content.encode()).hexdigest()

            return CrawlResult(
                url=url,
                competitor=self.config.competitor_name,
                title=title,
                content=main_content,
                html=html,
                metadata=metadata,
                screenshot=screenshot,
                content_hash=content_hash
            )
        except Exception as e:
            logger.error(f"Crawl failed {url}: {e}")
            return None

    def extract_with_selectors(self, soup: BeautifulSoup) -> str:
        """Extract targeted content using CSS selectors"""
        extracted_parts = []
        # Price selector
        if 'price' in self.config.css_selectors:
            price_elements = soup.select(self.config.css_selectors['price'])
            for elem in price_elements:
                extracted_parts.append(f"Price: {elem.get_text(strip=True)}")
        # Product name selector
        if 'product_name' in self.config.css_selectors:
            name_elements = soup.select(self.config.css_selectors['product_name'])
            for elem in name_elements:
                extracted_parts.append(f"Product: {elem.get_text(strip=True)}")
        # Date selector
        if 'date' in self.config.css_selectors:
            date_elements = soup.select(self.config.css_selectors['date'])
            for elem in date_elements:
                extracted_parts.append(f"Date: {elem.get_text(strip=True)}")
        return " | ".join(extracted_parts) if extracted_parts else ""

    def extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract page metadata"""
        metadata = {
            'url': url,
            'timestamp': datetime.utcnow().isoformat(),
            'meta_tags': {}
        }
        # Extract meta tags
        for meta in soup.find_all('meta'):
            if meta.get('name'):
                metadata['meta_tags'][meta.get('name')] = meta.get('content', '')
            elif meta.get('property'):
                metadata['meta_tags'][meta.get('property')] = meta.get('content', '')
        # Extract Open Graph data
        og_data = {}
        for meta in soup.find_all('meta', property=re.compile(r'^og:')):
            og_data[meta['property']] = meta.get('content', '')
        metadata['og_data'] = og_data
        # Extract structured data (JSON-LD)
        json_ld_scripts = soup.find_all('script', type='application/ld+json')
        structured_data = []
        for script in json_ld_scripts:
            try:
                data = json.loads(script.string)
                structured_data.append(data)
            except Exception:
                pass
        metadata['structured_data'] = structured_data
        return metadata

    def calculate_content_similarity(self, text1: str, text2: str) -> float:
        """Compute content similarity"""
        if not text1 or not text2:
            return 0.0
        # Use TF-IDF vectorization and cosine similarity
        vectorizer = TfidfVectorizer(stop_words='english')
        try:
            tfidf_matrix = vectorizer.fit_transform([text1, text2])
            similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
            return float(similarity)
        except Exception:
            # Fall back to a simple string similarity
            return difflib.SequenceMatcher(None, text1[:1000], text2[:1000]).ratio()

    def detect_changes(self, old_content: str, new_content: str,
                       old_metadata: Dict, new_metadata: Dict) -> Dict:
        """Detect content changes"""
        changes = {
            'content_changed': False,
            'metadata_changed': False,
            'specific_changes': [],
            'similarity_score': 0.0
        }
        # Compute similarity
        similarity = self.calculate_content_similarity(old_content, new_content)
        changes['similarity_score'] = similarity
        # Detect content changes
        if similarity < (1 - self.config.change_threshold):
            changes['content_changed'] = True
            # Generate diff details with difflib
            diff = list(difflib.unified_diff(
                old_content.splitlines()[:50],  # limit the number of lines
                new_content.splitlines()[:50],
                lineterm=''
            ))
            changes['diff'] = diff[:20]  # keep the first 20 diff lines
        # Detect metadata changes
        if old_metadata != new_metadata:
            changes['metadata_changed'] = True
            # Find the specific metadata fields that changed
            for key in set(old_metadata.keys()) | set(new_metadata.keys()):
                old_val = old_metadata.get(key)
                new_val = new_metadata.get(key)
                if old_val != new_val:
                    changes['specific_changes'].append({
                        'field': key,
                        'old': str(old_val),
                        'new': str(new_val)
                    })
        return changes

    async def extract_urls_from_page(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Extract all URLs from a page"""
        urls = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            full_url = urljoin(base_url, href)
            # Filter out non-HTTP links and anchors
            if not full_url.startswith('http'):
                continue
            # Deduplicate and filter
            if full_url not in urls and self.should_crawl(full_url):
                urls.append(full_url)
        return urls

    async def crawl_competitor_site(self, start_url: str) -> List[CrawlResult]:
        """Recursively crawl a competitor site"""
        results = []

        async def recursive_crawl(url: str, current_depth: int):
            if current_depth > self.config.crawl_depth:
                return
            logger.info(f"Crawling: {url} (depth: {current_depth})")
            result = await self.fetch_url(url, current_depth)
            if result:
                results.append(result)
                # Persist to the database
                await self.save_to_database(result)
                # Keep extracting links unless this is the deepest level
                if current_depth < self.config.crawl_depth:
                    try:
                        soup = BeautifulSoup(result.html, 'lxml')
                        child_urls = await self.extract_urls_from_page(soup, url)
                        # Limit concurrency to avoid getting blocked
                        batch_size = 5
                        for i in range(0, len(child_urls), batch_size):
                            batch = child_urls[i:i + batch_size]
                            tasks = [recursive_crawl(child_url, current_depth + 1)
                                     for child_url in batch]
                            await asyncio.gather(*tasks, return_exceptions=True)
                            # Politeness delay
                            await asyncio.sleep(1)
                    except Exception as e:
                        logger.error(f"Link extraction failed {url}: {e}")

        await recursive_crawl(start_url, 0)
        return results

    async def save_to_database(self, result: CrawlResult):
        """Persist a crawl result to the database"""
        try:
            # Check whether the URL already exists
            existing = self.db_session.query(CompetitorContent).filter_by(url=result.url).first()
            if existing:
                # Detect changes
                changes = self.detect_changes(
                    existing.content_text,
                    result.content,
                    existing.page_metadata or {},
                    result.metadata
                )
                # Update the record if the changes are significant
                if changes['content_changed'] or changes['metadata_changed']:
                    existing.content_text = result.content
                    existing.content_html = result.html
                    existing.page_metadata = result.metadata
                    existing.content_hash = result.content_hash
                    existing.detected_changes = changes
                    existing.similarity_score = int(changes['similarity_score'] * 100)
                    existing.last_updated_at = datetime.utcnow()
                    logger.info(f"Content updated: {result.url} "
                                f"(similarity: {changes['similarity_score']:.2%})")
                    # Run further AI analysis if enabled
                    if self.config.enable_ai_analysis and changes['content_changed']:
                        await self.perform_ai_analysis(existing, result)
            else:
                # New content
                new_content = CompetitorContent(
                    competitor_name=result.competitor,
                    url=result.url,
                    title=result.title[:500],
                    content_text=result.content,
                    content_html=result.html,
                    content_hash=result.content_hash,
                    page_metadata=result.metadata,
                    detected_changes={},
                    similarity_score=100,
                    crawled_at=datetime.utcnow(),
                    first_seen_at=datetime.utcnow(),
                    last_updated_at=datetime.utcnow()
                )
                self.db_session.add(new_content)
                logger.info(f"New content found: {result.url}")
            self.db_session.commit()
        except Exception as e:
            logger.error(f"Database save failed {result.url}: {e}")
            self.db_session.rollback()

    async def perform_ai_analysis(self, content_record: CompetitorContent, new_result: CrawlResult):
        """Run AI analysis"""
        # Integrate the OpenAI API or another AI service here,
        # e.g. automatic classification, sentiment analysis, key-information extraction.
        try:
            # Example: rule-based detection of price changes
            price_patterns = [
                r'¥(\d+(?:\.\d{2})?)',   # half-width yen/yuan sign
                r'¥(\d+(?:\.\d{2})?)',   # full-width yen/yuan sign
                r'价格[::]\s*(\d+(?:\.\d{2})?)',
                r'price[::]\s*\$?(\d+(?:\.\d{2})?)'
            ]
            prices = []
            for pattern in price_patterns:
                matches = re.findall(pattern, new_result.content, re.IGNORECASE)
                prices.extend(matches)
            if prices:
                content_record.extracted_data = {
                    'prices': prices,
                    'analysis_timestamp': datetime.utcnow().isoformat()
                }
        except Exception as e:
            logger.error(f"AI analysis failed: {e}")

    async def monitor_all_competitors(self):
        """Monitor every configured competitor site"""
        all_results = []
        for base_url in self.config.base_urls:
            logger.info(f"Starting monitoring: {self.config.competitor_name} - {base_url}")
            try:
                results = await self.crawl_competitor_site(base_url)
                all_results.extend(results)
                # Generate a monitoring report
                await self.generate_monitoring_report(results)
            except Exception as e:
                logger.error(f"Monitoring failed {base_url}: {e}")
        return all_results

    async def generate_monitoring_report(self, results: List[CrawlResult]):
        """Generate a monitoring report"""
        if not results:
            return
        report = {
            'competitor': self.config.competitor_name,
            'crawl_time': datetime.utcnow().isoformat(),
            'total_pages': len(results),
            'new_pages': 0,
            'updated_pages': 0,
            'significant_changes': [],
            'summary': {}
        }
        for result in results:
            # Detailed reporting logic can be added here
            pass
        # Save the report to a file or send a notification
        os.makedirs('reports', exist_ok=True)
        report_path = f"reports/{self.config.competitor_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        logger.info(f"Report generated: {report_path}")

    async def close(self):
        """Clean up resources"""
        if self.session:
            await self.session.close()
        if self.db_session:
            self.db_session.close()


# Configuration manager
class MonitorConfigManager:
    """Monitoring configuration manager"""

    def __init__(self, config_file: str = 'monitor_configs.json'):
        self.config_file = config_file
        self.configs = self.load_configs()

    def load_configs(self) -> List[CompetitorMonitorConfig]:
        """Load the monitoring configurations"""
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                config_data = json.load(f)
            return [CompetitorMonitorConfig(**data) for data in config_data]
        except FileNotFoundError:
            logger.warning(f"Configuration file not found: {self.config_file}")
            return []

    def get_competitor_config(self, name: str) -> Optional[CompetitorMonitorConfig]:
        """Get the configuration for a specific competitor"""
        for config in self.configs:
            if config.competitor_name == name:
                return config
        return None


# Main monitoring service
class CompetitorMonitoringService:
    """Competitor monitoring service"""

    def __init__(self, db_url: str, config_manager: MonitorConfigManager):
        self.db_url = db_url
        self.config_manager = config_manager
        self.monitors = {}

    async def initialize_monitors(self):
        """Initialize all monitors"""
        for config in self.config_manager.configs:
            monitor = AsyncCompetitorMonitor(config, self.db_url)
            await monitor.initialize()
            self.monitors[config.competitor_name] = monitor

    async def run_monitoring_cycle(self):
        """Run one monitoring cycle"""
        logger.info("Starting competitor monitoring cycle")
        all_results = []
        for name, monitor in self.monitors.items():
            try:
                logger.info(f"Monitoring competitor: {name}")
                results = await monitor.monitor_all_competitors()
                all_results.extend(results)
                # Wait a while to avoid overly frequent requests
                await asyncio.sleep(10)
            except Exception as e:
                logger.error(f"Monitoring failed {name}: {e}")
        logger.info(f"Monitoring cycle finished, {len(all_results)} pages processed")
        return all_results

    async def close_all(self):
        """Close all monitors"""
        for monitor in self.monitors.values():
            await monitor.close()


# Example configuration
def create_example_config():
    """Create an example configuration file"""
    configs = [
        {
            "competitor_name": "ExampleTech",
            "base_urls": [
                "https://example-tech.com/products",
                "https://blog.example-tech.com"
            ],
            "crawl_depth": 2,
            "check_interval_minutes": 120,
            "css_selectors": {
                "price": ".price, .product-price, [itemprop='price']",
                "product_name": ".product-title, h1.product-name",
                "date": ".post-date, .article-date, time"
            },
            "exclusion_patterns": [
                ".*/cart/.*",
                ".*/checkout/.*",
                ".*\\.pdf$",
                ".*\\.zip$"
            ],
            "dynamic_content": True,
            "enable_ai_analysis": True,
            "change_threshold": 0.25
        }
    ]
    with open('monitor_configs.json', 'w', encoding='utf-8') as f:
        json.dump(configs, f, indent=2, ensure_ascii=False)
    print("Example configuration file created: monitor_configs.json")


# Main program
async def main():
    """Entry point"""
    # Create the example configuration (first run only)
    # create_example_config()

    # Initialize the configuration manager
    config_manager = MonitorConfigManager('monitor_configs.json')
    if not config_manager.configs:
        logger.error("No monitoring configuration found")
        return

    # Database connection (SQLite for the example; PostgreSQL is recommended in production)
    db_url = "sqlite:///competitor_monitor.db"

    # Initialize the monitoring service
    service = CompetitorMonitoringService(db_url, config_manager)
    await service.initialize_monitors()
    try:
        # Run one monitoring cycle
        results = await service.run_monitoring_cycle()
        # Scheduled execution can be added here,
        # e.g. periodic monitoring with APScheduler
    finally:
        await service.close_all()


if __name__ == "__main__":
    # Run the main program
    asyncio.run(main())

3. Advanced Feature Extensions
python
# advanced_features.py
import asyncio
import difflib
import json
import logging
import re
from collections import Counter
from typing import List, Dict

import numpy as np
import pandas as pd
from openai import AsyncOpenAI

logger = logging.getLogger(__name__)


class AdvancedCompetitorAnalyzer:
    """Advanced competitor analyzer"""

    def __init__(self, openai_api_key: str = None):
        self.openai_client = AsyncOpenAI(api_key=openai_api_key) if openai_api_key else None

    async def analyze_content_with_ai(self, content: str, previous_content: str = None) -> Dict:
        """Analyze content with an LLM"""
        if not self.openai_client:
            return {}
        try:
            prompt = f"""
            Analyze the following competitor content and provide:
            1. Main topics
            2. Key products or services
            3. Pricing information (if any)
            4. Publication date or recency
            5. Sentiment
            6. Unique selling points compared with competitors

            Content (truncated):
            {content[:3000]}

            {f"Summary of the previous version: {previous_content[:1000]}" if previous_content else ""}

            Return the analysis as JSON.
            """
            response = await self.openai_client.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[
                    {"role": "system", "content": "You are a professional competitive-analysis expert."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=1000
            )
            analysis_text = response.choices[0].message.content
            # Try to parse the response as JSON
            try:
                return json.loads(analysis_text)
            except json.JSONDecodeError:
                return {"analysis": analysis_text}
        except Exception as e:
            logger.error(f"AI analysis failed: {e}")
            return {}

    def extract_price_changes(self, content_history: List[Dict]) -> List[Dict]:
        """Extract price-change trends"""
        price_changes = []
        for i in range(1, len(content_history)):
            old_prices = self._extract_prices(content_history[i - 1]['content'])
            new_prices = self._extract_prices(content_history[i]['content'])
            if old_prices and new_prices:
                for product, old_price in old_prices.items():
                    if product in new_prices:
                        new_price = new_prices[product]
                        if old_price != new_price:
                            price_changes.append({
                                'product': product,
                                'old_price': old_price,
                                'new_price': new_price,
                                'change_percentage': ((new_price - old_price) / old_price) * 100,
                                'date': content_history[i]['date']
                            })
        return price_changes

    def _extract_prices(self, text: str) -> Dict[str, float]:
        """Extract prices from text"""
        price_patterns = [
            (r'¥(\d+(?:\.\d{2})?)', 'CNY'),   # half-width yen/yuan sign
            (r'¥(\d+(?:\.\d{2})?)', 'CNY'),   # full-width yen/yuan sign
            (r'\$(\d+(?:\.\d{2})?)', 'USD'),
            (r'€(\d+(?:\.\d{2})?)', 'EUR'),
            (r'价格[::]\s*(\d+(?:\.\d{2})?)', 'CNY'),
            (r'price[::]\s*\$?(\d+(?:\.\d{2})?)', 'USD')
        ]
        prices = {}
        for pattern, currency in price_patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                price_value = float(match.group(1))
                product_context = text[max(0, match.start() - 50):match.start()]
                # Naive product-name extraction (real applications need more robust logic)
                product_name = "product"
                if ':' in product_context:
                    product_name = product_context.split(':')[-1].strip()[:50]
                prices[f"{product_name} ({currency})"] = price_value
        return prices

    def generate_competitive_analysis_report(self, competitors_data: Dict[str, List]) -> pd.DataFrame:
        """Generate a competitive analysis report"""
        reports = []
        for competitor, data_list in competitors_data.items():
            if not data_list:
                continue
            latest_data = data_list[-1]
            report = {
                'competitor': competitor,
                'pages_monitored': len(data_list),
                'last_updated': latest_data.get('date', ''),
                'change_frequency': self._calculate_change_frequency(data_list),
                'avg_content_length': np.mean([len(d.get('content', '')) for d in data_list]),
                'price_count': len(self._extract_prices(latest_data.get('content', ''))),
                'top_keywords': self._extract_top_keywords(latest_data.get('content', ''), 10)
            }
            reports.append(report)
        return pd.DataFrame(reports)

    def _calculate_change_frequency(self, data_list: List[Dict]) -> float:
        """Compute how often the content changes"""
        if len(data_list) < 2:
            return 0.0
        changes = 0
        for i in range(1, len(data_list)):
            # Simple string similarity; the crawler's TF-IDF similarity could be reused here
            similarity = difflib.SequenceMatcher(
                None,
                data_list[i - 1].get('content', '')[:1000],
                data_list[i].get('content', '')[:1000]
            ).ratio()
            if similarity < 0.8:  # below 80% similarity counts as a change
                changes += 1
        return changes / (len(data_list) - 1)

    def _extract_top_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Extract the top keywords"""
        # A more sophisticated keyword-extraction algorithm can be plugged in here
        words = re.findall(r'\b\w{3,}\b', text.lower())
        stop_words = {'the', 'and', 'for', 'with', 'this', 'that', 'are', 'was', 'were'}
        filtered_words = [w for w in words if w not in stop_words]
        word_counts = Counter(filtered_words)
        return [word for word, _ in word_counts.most_common(top_n)]


# Scheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

# CompetitorMonitoringService is defined in the core crawler module above;
# import it from wherever that module lives in your project.


class ScheduledMonitor:
    """Scheduled monitoring runner"""

    def __init__(self, service: "CompetitorMonitoringService"):
        self.service = service
        self.scheduler = AsyncIOScheduler()

    async def start(self):
        """Start scheduled monitoring"""
        # Run once immediately
        asyncio.create_task(self.service.run_monitoring_cycle())
        # Configure a scheduled job for each competitor
        for config in self.service.config_manager.configs:
            trigger = IntervalTrigger(minutes=config.check_interval_minutes)
            self.scheduler.add_job(
                self.run_competitor_monitor,
                trigger,
                args=[config.competitor_name],
                id=f"monitor_{config.competitor_name}",
                name=f"Monitor {config.competitor_name}",
                max_instances=1
            )
        self.scheduler.start()
        logger.info("Monitoring scheduler started")

    async def run_competitor_monitor(self, competitor_name: str):
        """Run monitoring for a single competitor"""
        logger.info(f"Scheduled job: monitoring {competitor_name}")
        monitor = self.service.monitors.get(competitor_name)
        if monitor:
            try:
                await monitor.monitor_all_competitors()
            except Exception as e:
                logger.error(f"Scheduled monitoring failed {competitor_name}: {e}")

Deployment and Optimization Recommendations
1. Deployment Architecture
text
Competitor monitoring system architecture:
├── Collection layer (Crawlers)
│   ├── Static content crawler (aiohttp)
│   ├── Dynamic content crawler (Playwright)
│   └── API data collection (async requests)
├── Processing layer (Processors)
│   ├── Content parser (BeautifulSoup)
│   ├── Change detector (similarity algorithms)
│   └── AI analyzer (OpenAI integration)
├── Storage layer (Storage)
│   ├── PostgreSQL (structured data)
│   ├── Redis (cache and queues)
│   └── File system (screenshots and raw HTML)
├── Scheduling layer (Scheduler)
│   └── APScheduler (scheduled jobs)
└── Presentation layer (Dashboard)
    ├── Flask/FastAPI (API service)
    └── Vue/React (frontend)
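The Redis and Celery entries in the requirements file suggest the collection layer can also be distributed across workers. The sketch below is one possible way to wire that up and is not part of the code above; the broker URL, the task name, and the `run_single_crawl` helper are assumptions for illustration only.
python
# distributed_tasks.py - a minimal sketch, assuming a local Redis broker
import asyncio
from celery import Celery

app = Celery("competitor_monitor",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task(name="monitor.crawl_competitor", bind=True, max_retries=2)
def crawl_competitor(self, competitor_name: str, db_url: str):
    """Run one crawl for a single competitor inside a Celery worker.

    run_single_crawl is a hypothetical async helper that would build an
    AsyncCompetitorMonitor for competitor_name and call
    monitor_all_competitors(); it is not defined in this article.
    """
    from monitor_core import run_single_crawl  # hypothetical module and helper
    try:
        return asyncio.run(run_single_crawl(competitor_name, db_url))
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

Workers would then be started with the standard `celery -A distributed_tasks worker` command, while the scheduler enqueues `crawl_competitor` tasks instead of awaiting each monitor directly.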
2. Anti-Bot Evasion Strategies
python
# anti_anti_scraping.py
import asyncio
import aiohttp
import numpy as np


class AntiAntiScraping:
    """Countermeasures against anti-bot defenses"""

    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        ]
        self.proxy_pool = [
            # list of proxy servers
        ]

    async def rotate_user_agent(self, session: aiohttp.ClientSession):
        """Rotate the User-Agent header"""
        session.headers.update({
            'User-Agent': np.random.choice(self.user_agents)
        })

    async def add_random_delay(self, min_delay: float = 1.0, max_delay: float = 3.0):
        """Add a random delay between requests"""
        delay = np.random.uniform(min_delay, max_delay)
        await asyncio.sleep(delay)

    def use_proxy(self):
        """Pick a proxy from the pool"""
        if self.proxy_pool:
            proxy = np.random.choice(self.proxy_pool)
            return f"http://{proxy}"
        return None

3. Monitoring and Alerting
python
# monitoring_alert.py
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)

# CrawlResult and the EmailAlert / SlackAlert / WebhookAlert channel classes are
# assumed to be defined elsewhere in the project.


class MonitoringAlert:
    """Monitoring and alerting system"""

    def __init__(self):
        self.alert_channels = []

    def add_channel(self, channel_type: str, config: Dict):
        """Register an alert channel"""
        if channel_type == "email":
            self.alert_channels.append(EmailAlert(config))
        elif channel_type == "slack":
            self.alert_channels.append(SlackAlert(config))
        elif channel_type == "webhook":
            self.alert_channels.append(WebhookAlert(config))

    async def send_alert(self, title: str, message: str, level: str = "info"):
        """Send an alert through every registered channel"""
        for channel in self.alert_channels:
            try:
                await channel.send(title, message, level)
            except Exception as e:
                logger.error(f"Alert delivery failed {channel}: {e}")

    async def check_and_alert(self, monitor_results: List["CrawlResult"]):
        """Check results and send alerts when needed"""
        significant_changes = []
        for result in monitor_results:
            # Detect significant changes
            # (is_significant_change / format_alert_message are left to the reader)
            if self.is_significant_change(result):
                significant_changes.append(result)
        if significant_changes:
            alert_msg = self.format_alert_message(significant_changes)
            await self.send_alert(
                "Major competitor content change",
                alert_msg,
                "warning"
            )

Summary and Best Practices
Key takeaways:
Asynchronous performance: aiohttp and asyncio provide high concurrency and significantly faster crawling
Dynamic rendering: Playwright handles modern JavaScript-rendered pages
Intelligent parsing: CSS selectors, regular expressions, and AI analysis combine to extract structured information
Change detection: TF-IDF vectorization and cosine similarity pinpoint content changes accurately
Extensible architecture: the modular design makes it easy to add new competitor sites and monitoring rules
Compliance: respect robots.txt, use reasonable delays, and avoid putting pressure on target sites (see the sketch after this list)
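The crawler above does not actually consult robots.txt, so here is a small sketch of how such a check could be bolted onto `should_crawl` using only the standard library; the per-domain cache and the bot user-agent string are assumptions for illustration.
python
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

# Simple per-domain cache so robots.txt is fetched only once per host (assumed design)
_robots_cache = {}

def allowed_by_robots(url: str, user_agent: str = "CompetitorMonitorBot") -> bool:
    """Return True if robots.txt for the URL's host permits fetching it."""
    origin = "{0.scheme}://{0.netloc}".format(urlparse(url))
    parser = _robots_cache.get(origin)
    if parser is None:
        parser = RobotFileParser(origin + "/robots.txt")
        try:
            parser.read()  # synchronous fetch; run it off the event loop in async code
        except OSError:
            return True  # unreachable robots.txt: fail open, but log it in real use
        _robots_cache[origin] = parser
    return parser.can_fetch(user_agent, url)

# Example: gate the crawl inside should_crawl()
# if not allowed_by_robots(url):
#     return False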
Extension suggestions:
Data visualization: integrate Grafana or a custom dashboard to display monitoring data
Machine learning: train models to recognize product categories and price-change patterns automatically
Multilingual support: add multilingual content-processing capabilities
API extension: expose a RESTful API so other systems can consume the monitoring results (see the sketch below)
Cloud-native deployment: containerize the services and scale them elastically on Kubernetes
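As a closing illustration of the API extension point, the following is a minimal read-only sketch built with FastAPI; the endpoint path, the query parameters, and the import of `CompetitorContent` from a hypothetical module are assumptions rather than part of the system above.
python
# api_service.py - hedged sketch of a read-only monitoring API
from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# CompetitorContent is the ORM model defined in the core crawler module above.
from monitor_core import CompetitorContent  # hypothetical module name

engine = create_engine("sqlite:///competitor_monitor.db")
SessionLocal = sessionmaker(bind=engine)
app = FastAPI(title="Competitor Monitor API")

@app.get("/changes")
def recent_changes(competitor: str = None, limit: int = 20):
    """Return the most recently updated pages, optionally filtered by competitor."""
    session = SessionLocal()
    try:
        query = session.query(CompetitorContent).order_by(
            CompetitorContent.last_updated_at.desc()
        )
        if competitor:
            query = query.filter(CompetitorContent.competitor_name == competitor)
        rows = query.limit(limit).all()
        return [
            {
                "competitor": r.competitor_name,
                "url": r.url,
                "title": r.title,
                "similarity_score": r.similarity_score,
                "last_updated_at": r.last_updated_at.isoformat() if r.last_updated_at else None,
            }
            for r in rows
        ]
    finally:
        session.close()

Served with `uvicorn api_service:app`, this would let dashboards or downstream systems poll for changes without touching the database directly.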