Introduction: The Technical Challenges Behind Food Data
In the era of digital food culture, recipe websites aggregate a huge body of cooking knowledge, user reviews, and nutrition information. For food app development, nutrition research, or personalized recommendation systems, obtaining this data in structured form is essential. Crawling recipe data at scale, however, runs into several obstacles: anti-bot mechanisms, data scattered across many pages, dynamically loaded content, and more. This article walks through how to build an efficient, stable recipe data collection system on a modern Python stack.
Technology Stack: A Modern Crawler Toolkit
Core Tools
aiohttp + asyncio: asynchronous HTTP client for high-concurrency requests
Playwright: modern browser automation for JavaScript-rendered pages
BeautifulSoup4 + lxml: the go-to combination for HTML parsing
Redis: distributed task queue and cache management
MongoDB: storage for loosely structured recipe documents
Pydantic: data validation and serialization
System Architecture
Three-Layer Crawler Architecture
Scheduling layer: URL management and task dispatch
Fetching layer: asynchronous requests and dynamic rendering
Processing layer: data cleaning and persistent storage (a minimal sketch of how these layers connect follows below)
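Before the full implementation, here is a minimal conceptual sketch of the three layers wired together, with an in-memory asyncio.Queue standing in for the Redis-backed scheduler. The names scheduler, fetch_page, parse_and_store and worker are placeholders for illustration and are not part of the final system.
python
import asyncio

async def scheduler(queue: asyncio.Queue, seed_urls):
    # Scheduling layer: URL management, here just seeding a shared queue
    for url in seed_urls:
        await queue.put(url)

async def fetch_page(url: str) -> str:
    # Fetching layer: stand-in for an aiohttp request or Playwright rendering
    await asyncio.sleep(0.1)
    return f"<html>placeholder content for {url}</html>"

async def parse_and_store(html: str) -> None:
    # Processing layer: stand-in for parsing, validation and persistence
    print(f"processed {len(html)} characters")

async def worker(queue: asyncio.Queue) -> None:
    while True:
        try:
            url = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        html = await fetch_page(url)
        await parse_and_store(html)
        queue.task_done()

async def main() -> None:
    queue = asyncio.Queue()
    await scheduler(queue, [f"https://example.com/recipes?page={i}" for i in range(1, 4)])
    # Three concurrent workers drain the queue
    await asyncio.gather(*(worker(queue) for _ in range(3)))

asyncio.run(main())
The full implementation below replaces the in-memory queue with Redis, so multiple processes can share it, and fills in real fetching, parsing, and storage logic.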
python
import asyncio
import aiohttp
from typing import Optional, Dict, List, Any
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import json
from motor.motor_asyncio import AsyncIOMotorClient  # async MongoDB driver, so collection calls can be awaited
from redis.asyncio import Redis                     # async Redis client
from pydantic import BaseModel, Field, validator
from datetime import datetime
import hashlib
import logging
from playwright.async_api import async_playwright
import re

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# Data model definition
class RecipeModel(BaseModel):
    """Recipe data model"""
    recipe_id: str = Field(..., description="Unique recipe ID")
    title: str = Field(..., description="Recipe title")
    author: Optional[str] = Field(None, description="Author")
    cook_time: Optional[str] = Field(None, description="Cooking time")
    difficulty: Optional[str] = Field(None, description="Difficulty level")
    rating: Optional[float] = Field(None, ge=0, le=5, description="Rating")
    rating_count: Optional[int] = Field(None, ge=0, description="Number of ratings")
    ingredients: List[str] = Field(default_factory=list, description="Ingredient list")
    steps: List[str] = Field(default_factory=list, description="Step-by-step instructions")
    tags: List[str] = Field(default_factory=list, description="Tags")
    category: List[str] = Field(default_factory=list, description="Categories")
    nutrition_info: Optional[Dict[str, Any]] = Field(None, description="Nutrition info")
    image_urls: List[str] = Field(default_factory=list, description="Image URLs")
    source_url: str = Field(..., description="Source URL")
    crawl_time: datetime = Field(default_factory=datetime.now, description="Crawl time")

    @validator('recipe_id')
    def validate_recipe_id(cls, v):
        if not v or len(v) < 3:
            raise ValueError('Invalid recipe ID')
        return v


# Distributed task queue
class TaskQueue:
    def __init__(self, redis_url='redis://localhost:6379'):
        self.redis = Redis.from_url(redis_url, decode_responses=True)
        self.queue_key = 'recipe:crawl:queue'
        self.visited_key = 'recipe:visited:urls'

    async def add_url(self, url: str, priority: int = 1):
        """Add a URL to the task queue"""
        if not await self.is_visited(url):
            await self.redis.zadd(self.queue_key, {url: priority})
            return True
        return False

    async def get_next_url(self) -> Optional[str]:
        """Fetch the next URL to crawl"""
        result = await self.redis.zpopmax(self.queue_key, count=1)
        if result:
            url, _ = result[0]
            await self.mark_visited(url)
            return url
        return None

    async def mark_visited(self, url: str):
        """Mark a URL as visited"""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        await self.redis.sadd(self.visited_key, url_hash)

    async def is_visited(self, url: str) -> bool:
        """Check whether a URL has already been visited"""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return await self.redis.sismember(self.visited_key, url_hash)


# Core asynchronous spider class
class RecipeSpider:
    def __init__(self, base_url: str, concurrency: int = 10, use_browser: bool = False):
        self.base_url = base_url
        self.concurrency = concurrency
        self.use_browser = use_browser
        self.task_queue = TaskQueue()
        self.session = None
        self.playwright = None
        self.browser = None
        self.context = None

        # MongoDB connection
        self.mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
        self.db = self.mongo_client['recipe_database']
        self.collection = self.db['recipes']

        # Request header configuration
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

        # Extraction rules (configurable per target site)
        self.extraction_rules = {
            'title': [
                {'selector': 'h1.recipe-title', 'attr': 'text'},
                {'selector': 'meta[property="og:title"]', 'attr': 'content'}
            ],
            'ingredients': [
                {'selector': '.ingredients li', 'attr': 'text'},
                {'selector': '[itemprop="recipeIngredient"]', 'attr': 'text'}
            ],
            'steps': [
                {'selector': '.steps li', 'attr': 'text'},
                {'selector': '[itemprop="recipeInstructions"] li', 'attr': 'text'}
            ],
            'rating': [
                {'selector': '.rating-value', 'attr': 'text', 'type': 'float'},
                {'selector': 'meta[property="og:rating"]', 'attr': 'content', 'type': 'float'}
            ]
        }

    async def init_session(self):
        """Initialize the HTTP session"""
        timeout = aiohttp.ClientTimeout(total=30)
        connector = aiohttp.TCPConnector(limit=self.concurrency, ssl=False)
        self.session = aiohttp.ClientSession(
            headers=self.headers,
            timeout=timeout,
            connector=connector
        )

    async def init_browser(self):
        """Initialize the Playwright browser"""
        if self.use_browser:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(
                headless=True,
                args=['--disable-blink-features=AutomationControlled']
            )
            self.context = await self.browser.new_context(
                user_agent=self.headers['User-Agent']
            )

    async def fetch_with_retry(self, url: str, max_retries: int = 3) -> Optional[str]:
        """Request a page with retry logic"""
        for attempt in range(max_retries):
            try:
                if self.use_browser and self.is_dynamic_page(url):
                    return await self.fetch_with_browser(url)
                else:
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            return await response.text()
                        elif response.status == 429:  # Too many requests
                            await asyncio.sleep(2 ** attempt)  # Exponential backoff
                            continue
                        else:
                            logger.warning(f"Request failed: {url}, status code: {response.status}")
                            return None
            except Exception as e:
                logger.error(f"Request error {url} (attempt {attempt + 1}/{max_retries}): {e}")
                await asyncio.sleep(1)
        logger.error(f"Request failed, max retries reached: {url}")
        return None

    async def fetch_with_browser(self, url: str) -> Optional[str]:
        """Render a dynamic page with a headless browser"""
        try:
            page = await self.context.new_page()

            # Block static assets to speed up rendering
            async def block_static(route):
                await route.abort()

            await page.route(
                "**/*.{png,jpg,jpeg,gif,svg,woff,woff2,eot,ttf}", block_static
            )

            # Set extra request headers
            await page.set_extra_http_headers({
                'Referer': self.base_url,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
            })

            await page.goto(url, wait_until='networkidle', timeout=30000)
            # Wait for the key content to load
            await page.wait_for_selector('body', timeout=10000)
            # Grab the rendered HTML
            content = await page.content()
            await page.close()
            return content
        except Exception as e:
            logger.error(f"Browser rendering failed {url}: {e}")
            return None

    def is_dynamic_page(self, url: str) -> bool:
        """Heuristic check for pages that need JavaScript rendering"""
        dynamic_patterns = [
            '#', 'javascript:', 'single-page-app', 'react', 'vue', 'angular'
        ]
        return any(pattern in url.lower() for pattern in dynamic_patterns)

    def extract_data(self, html: str, url: str) -> Optional[RecipeModel]:
        """Extract recipe data from HTML"""
        try:
            soup = BeautifulSoup(html, 'lxml')

            # Generate a unique ID
            recipe_id = hashlib.md5(url.encode()).hexdigest()[:12]

            # Title
            title = self._extract_by_rules(soup, 'title')
            # Ingredients
            ingredients = self._extract_list(soup, 'ingredients')
            # Steps
            steps = self._extract_list(soup, 'steps')
            # Rating
            rating_str = self._extract_by_rules(soup, 'rating')
            rating = float(rating_str) if rating_str else None
            # Tags and categories
            tags = self._extract_tags(soup)
            categories = self._extract_categories(soup)
            # Image URLs
            image_urls = self._extract_images(soup)

            # Build the data model
            recipe_data = RecipeModel(
                recipe_id=recipe_id,
                title=title or "Unknown recipe",
                ingredients=ingredients,
                steps=steps,
                rating=rating,
                tags=tags,
                category=categories,
                image_urls=image_urls,
                source_url=url
            )
            return recipe_data
        except Exception as e:
            logger.error(f"Data extraction failed {url}: {e}")
            return None

    def _extract_by_rules(self, soup: BeautifulSoup, field: str) -> Optional[str]:
        """Extract a single field according to the extraction rules"""
        if field not in self.extraction_rules:
            return None
        for rule in self.extraction_rules[field]:
            elements = soup.select(rule['selector'])
            if elements:
                element = elements[0]
                if rule['attr'] == 'text':
                    text = element.get_text(strip=True)
                else:
                    text = element.get(rule['attr'], '')
                if text and 'type' in rule:
                    if rule['type'] == 'float':
                        # Pull out the first numeric value
                        numbers = re.findall(r'\d+\.?\d*', text)
                        return numbers[0] if numbers else None
                return text if text else None
        return None

    def _extract_list(self, soup: BeautifulSoup, field: str) -> List[str]:
        """Extract list-type data (ingredients, steps, ...)"""
        items = []
        if field in self.extraction_rules:
            for rule in self.extraction_rules[field]:
                elements = soup.select(rule['selector'])
                for element in elements:
                    if rule['attr'] == 'text':
                        text = element.get_text(strip=True)
                    else:
                        text = element.get(rule['attr'], '')
                    if text and len(text) > 1:
                        items.append(text)
                if items:
                    # Stop at the first rule that matches
                    break
        return items

    def _extract_tags(self, soup: BeautifulSoup) -> List[str]:
        """Extract tags"""
        tags = []
        # Try several selectors
        selectors = ['.tags a', '.tag-item', '[rel="tag"]', 'meta[name="keywords"]']
        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                if selector == 'meta[name="keywords"]':
                    content = element.get('content', '')
                    if content:
                        tags.extend([t.strip() for t in content.split(',')])
                else:
                    text = element.get_text(strip=True)
                    if text and len(text) < 20:  # Tags are usually short
                        tags.append(text)
            if tags:
                break
        return list(set(tags))[:10]  # Deduplicate and cap the count

    def _extract_categories(self, soup: BeautifulSoup) -> List[str]:
        """Extract categories"""
        categories = []
        breadcrumb_selectors = ['.breadcrumb a', '.category a', '.nav-path a']
        for selector in breadcrumb_selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text(strip=True)
                if text and len(text) < 30 and text.lower() not in ['home', '首页', '菜谱']:
                    categories.append(text)
        return categories[:5]

    def _extract_images(self, soup: BeautifulSoup) -> List[str]:
        """Extract image URLs"""
        images = []
        img_selectors = [
            'img[itemprop="image"]', '.recipe-img img',
            'meta[property="og:image"]', 'meta[name="twitter:image"]'
        ]
        for selector in img_selectors:
            elements = soup.select(selector)
            for element in elements:
                if selector.startswith('meta'):
                    url = element.get('content', '')
                else:
                    url = element.get('src', '') or element.get('data-src', '')
                if url and url.startswith(('http://', 'https://', '//')):
                    if url.startswith('//'):
                        url = 'https:' + url
                    images.append(url)
        return list(set(images))[:5]  # Deduplicate and cap the count

    async def discover_links(self, html: str, base_url: str):
        """Discover new links on a page"""
        soup = BeautifulSoup(html, 'lxml')
        links = set()

        # Patterns that are likely to be recipe links
        recipe_patterns = [
            'recipe', 'cookbook', 'menu', 'dish',
            '/p/', '/recipe/', '/cook/', '/food/'
        ]

        for a in soup.find_all('a', href=True):
            href = a['href']
            full_url = urljoin(base_url, href)

            # Filter out invalid links
            if not self._is_valid_url(full_url):
                continue

            # Recipe detail pages
            is_recipe_page = any(pattern in full_url.lower() for pattern in recipe_patterns)
            # List pages (pagination)
            is_list_page = re.search(r'page=\d+|p=\d+', full_url.lower())

            if is_recipe_page:
                priority = 10  # Recipe detail pages get the highest priority
            elif is_list_page:
                priority = 5   # List pages get medium priority
            else:
                priority = 1   # Everything else gets low priority

            await self.task_queue.add_url(full_url, priority)
            links.add(full_url)

        logger.info(f"Discovered {len(links)} new links")
        return links

    def _is_valid_url(self, url: str) -> bool:
        """Validate a URL before queueing it"""
        parsed = urlparse(url)

        # Stay within the same domain
        base_netloc = urlparse(self.base_url).netloc
        if parsed.netloc and parsed.netloc != base_netloc:
            return False

        # Skip file types we do not want to crawl
        invalid_extensions = ['.pdf', '.jpg', '.png', '.gif', '.zip', '.rar']
        if any(url.lower().endswith(ext) for ext in invalid_extensions):
            return False

        # Skip paths we do not want to crawl
        invalid_paths = ['/login', '/register', '/logout', '/cart', '/checkout']
        if any(path in url.lower() for path in invalid_paths):
            return False

        return True

    async def save_recipe(self, recipe: RecipeModel):
        """Persist a recipe to MongoDB"""
        try:
            # Check whether the record already exists
            existing = await self.collection.find_one({'recipe_id': recipe.recipe_id})
            if existing:
                # Update the existing record
                await self.collection.update_one(
                    {'recipe_id': recipe.recipe_id},
                    {'$set': recipe.dict()}
                )
                logger.info(f"Updated recipe: {recipe.title}")
            else:
                # Insert a new record
                await self.collection.insert_one(recipe.dict())
                logger.info(f"Saved new recipe: {recipe.title}")
        except Exception as e:
            logger.error(f"Failed to save recipe {recipe.recipe_id}: {e}")

    async def worker(self, worker_id: int):
        """Crawler worker coroutine"""
        logger.info(f"Worker {worker_id} started")
        while True:
            url = await self.task_queue.get_next_url()
            if not url:
                logger.info(f"Worker {worker_id} waiting for new tasks...")
                await asyncio.sleep(5)
                continue

            logger.info(f"Worker {worker_id} processing: {url}")

            # Fetch the page
            html = await self.fetch_with_retry(url)
            if not html:
                continue

            # Extract data
            recipe = self.extract_data(html, url)
            if recipe:
                await self.save_recipe(recipe)

            # Discover new links
            await self.discover_links(html, url)

            # Politeness delay
            await asyncio.sleep(0.5)

    async def start(self, start_urls: List[str]):
        """Start the crawler"""
        logger.info("Starting the recipe crawler...")

        # Initialization
        await self.init_session()
        if self.use_browser:
            await self.init_browser()

        # Seed the queue with the start URLs
        for url in start_urls:
            await self.task_queue.add_url(url, priority=10)

        # Launch the worker coroutines
        workers = [
            self.worker(i) for i in range(self.concurrency)
        ]

        try:
            await asyncio.gather(*workers)
        except KeyboardInterrupt:
            logger.info("Stop signal received, shutting down the crawler...")
        finally:
            # Clean up resources
            if self.session:
                await self.session.close()
            if self.browser:
                await self.browser.close()
            if self.playwright:
                await self.playwright.stop()
            self.mongo_client.close()
            logger.info("Crawler shut down")


# Configuration manager
class ConfigManager:
    @staticmethod
    def load_config(config_file='config.json'):
        """Load the configuration file"""
        default_config = {
            "base_url": "https://www.example-recipes.com",
            "start_urls": [
                "https://www.example-recipes.com/recipes",
                "https://www.example-recipes.com/categories"
            ],
            "concurrency": 8,
            "use_browser": True,
            "redis_url": "redis://localhost:6379",
            "mongodb_url": "mongodb://localhost:27017",
            "max_pages": 10000,
            "request_delay": 0.5,
            "user_agents": [
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15"
            ]
        }
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
                default_config.update(user_config)
        except FileNotFoundError:
            logger.warning(f"Config file {config_file} not found, using defaults")
        return default_config


# Proxy and anti-anti-crawling strategies
class AntiAntiCrawler:
    """Counter-measures against anti-bot systems"""

    @staticmethod
    async def rotate_user_agent(session: aiohttp.ClientSession):
        """Rotate the User-Agent header"""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15'
        ]
        import random
        session.headers['User-Agent'] = random.choice(user_agents)

    @staticmethod
    async def simulate_human_delay():
        """Simulate human-like delays"""
        import random
        await asyncio.sleep(random.uniform(1, 3))

    @staticmethod
    def generate_fingerprint():
        """Generate a pseudo browser fingerprint"""
        import random
        import string
        return ''.join(random.choices(string.ascii_letters + string.digits, k=16))


# Program entry point
async def main():
    """Main function"""
    # Load configuration
    config = ConfigManager.load_config()

    # Create the spider instance
    spider = RecipeSpider(
        base_url=config['base_url'],
        concurrency=config['concurrency'],
        use_browser=config['use_browser']
    )

    try:
        # Start crawling
        await spider.start(config['start_urls'])
    except Exception as e:
        logger.error(f"Crawler runtime error: {e}")
        raise


if __name__ == "__main__":
    # Run the crawler
    asyncio.run(main())
Advanced Features and Optimization Strategies
1. Smart Rate Limiting and Polite Crawling
python
class RateLimiter:
    """Caps the crawl rate at roughly requests_per_second requests per second."""

    def __init__(self, requests_per_second: int = 2):
        self.requests_per_second = requests_per_second
        # A single slot lets callers through one at a time, spaced by the
        # minimum interval enforced below.
        self.semaphore = asyncio.Semaphore(1)

    async def acquire(self):
        await self.semaphore.acquire()
        try:
            # Minimum interval between consecutive requests
            await asyncio.sleep(1 / self.requests_per_second)
        finally:
            self.semaphore.release()
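To wire the limiter into the spider, each worker would await it before issuing a request. A minimal sketch, assuming a shared RateLimiter instance; the polite_fetch helper is illustrative and not part of the classes above:
python
# Hypothetical integration: a shared limiter consulted before every request
rate_limiter = RateLimiter(requests_per_second=2)

async def polite_fetch(spider, url: str):
    await rate_limiter.acquire()            # wait for a slot
    return await spider.fetch_with_retry(url)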
2. Distributed Crawler Scaling
python
from celery import Celery

app = Celery(
    'recipe_crawler',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)


@app.task
def crawl_recipe_task(url):
    # Distributed task processing
    pass
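A minimal sketch of how discovered URLs could be dispatched to Celery workers; the enqueue_recipe_urls helper is illustrative and not part of the task module above:
python
# Hypothetical producer: hand discovered URLs to any available Celery worker
def enqueue_recipe_urls(urls):
    for url in urls:
        crawl_recipe_task.delay(url)  # serialized and sent via the Redis broker
Workers are then started with Celery's usual `celery -A <module> worker` command, pointed at the module that defines `app`.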
3. Data Quality Monitoring
python
class DataQualityMonitor:
    def __init__(self):
        self.metrics = {
            'total_crawled': 0,
            'success_rate': 0.0,
            'avg_recipe_completeness': 0.0
        }

    def assess_recipe_quality(self, recipe: RecipeModel) -> float:
        """Score how complete a crawled recipe is (0 to 100)"""
        completeness_score = 0
        if recipe.title and len(recipe.title) > 2:
            completeness_score += 20
        if len(recipe.ingredients) >= 3:
            completeness_score += 30
        if len(recipe.steps) >= 3:
            completeness_score += 30
        if recipe.rating:
            completeness_score += 10
        if recipe.image_urls:
            completeness_score += 10
        return completeness_score
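One way to use the monitor is as a quality gate in front of the save path, so mostly empty records never reach MongoDB. A minimal sketch; the save_if_complete helper and the 60-point threshold are assumptions for illustration:
python
# Hypothetical quality gate in front of the save path
monitor = DataQualityMonitor()

async def save_if_complete(spider, recipe: RecipeModel) -> None:
    if monitor.assess_recipe_quality(recipe) >= 60:
        await spider.save_recipe(recipe)
    else:
        logger.info(f"Skipping incomplete recipe: {recipe.source_url}")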
Deployment and Monitoring
Docker Deployment Configuration
dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "main.py"]
Prometheus Monitoring Metrics
python
from prometheus_client import Counter, Gauge, Histogram

REQUEST_COUNT = Counter('recipe_crawler_requests_total', 'Total requests')
RECIPE_COUNT = Counter('recipe_crawler_recipes_total', 'Total recipes crawled')
ERROR_COUNT = Counter('recipe_crawler_errors_total', 'Total errors')
REQUEST_DURATION = Histogram('recipe_crawler_request_duration_seconds', 'Request duration')
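A brief sketch of how these metrics might be exposed and updated around each fetch. start_http_server is the standard prometheus_client exporter; placing the instrumentation in a wrapper around fetch_with_retry is an assumption:
python
from prometheus_client import start_http_server

# Expose the metrics on port 8000 so a Prometheus server can scrape them
start_http_server(8000)

async def instrumented_fetch(spider, url: str):
    REQUEST_COUNT.inc()
    with REQUEST_DURATION.time():      # observe the request latency
        html = await spider.fetch_with_retry(url)
    if html is None:
        ERROR_COUNT.inc()
    return html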
Legal and Ethical Considerations
Respect robots.txt: always honor the site's crawling policy (see the sketch after this list)
Rate limiting: avoid putting load pressure on the target site
Data usage: use the data only for lawful purposes and comply with data protection regulations
Copyright: credit the data source and respect original content
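As a starting point for the robots.txt rule above, the standard library's urllib.robotparser can gate URLs before they are queued. A minimal sketch; hooking allowed_to_crawl into TaskQueue.add_url is an assumption, and the robots.txt URL is the placeholder domain used throughout this article:
python
from urllib.robotparser import RobotFileParser

robots = RobotFileParser()
robots.set_url("https://www.example-recipes.com/robots.txt")
robots.read()  # downloads and parses the file

def allowed_to_crawl(url: str, user_agent: str = "*") -> bool:
    # True only if robots.txt permits this user agent to fetch the URL
    return robots.can_fetch(user_agent, url)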
Summary
This article has walked through a complete technical blueprint for a modern recipe crawler. By combining asynchronous programming, dynamic rendering, a distributed task queue, and data validation, we can collect recipe data at scale, efficiently and reliably. The key techniques are:
Asynchronous concurrency: high-throughput requests with asyncio and aiohttp
Smart rendering: static parsing combined with Playwright for dynamic pages
Data validation: Pydantic models to guarantee data quality
Fault tolerance: thorough error handling and retry logic
Extensible architecture: support for distributed deployment and horizontal scaling