
Introduction: Challenges and Opportunities in Collecting Design Assets

In the age of digital design, high-quality assets are the foundation of a designer's work. Faced with a large number of design material sites, however, downloading by hand is slow and hard to do in bulk. Python crawling offers an automated way to collect these assets. This article takes an in-depth look at how to use modern asynchronous Python crawling techniques to harvest design material sites efficiently and in a compliant way.

Technology Stack Overview

  • Python 3.9+: core programming language (asyncio.to_thread, used below, requires 3.9)

  • aiohttp: asynchronous HTTP client/server framework

  • asyncio: Python's built-in asynchronous I/O framework

  • BeautifulSoup4: HTML parsing library

  • aiofiles: asynchronous file operations

  • Redis: distributed cache and task queue (optional)

  • Proxy IP pool: for coping with anti-crawling measures
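
Before diving into the full project, here is a minimal sketch of how the core pieces fit together: asyncio drives the event loop, aiohttp fetches pages concurrently, BeautifulSoup4 parses the HTML, and aiofiles writes the result without blocking. The URLs and output file name are placeholders for illustration only.

python

import asyncio

import aiofiles
import aiohttp
from bs4 import BeautifulSoup


async def fetch_title(session: aiohttp.ClientSession, url: str) -> str:
    """Fetch one page and pull out its <title> text."""
    async with session.get(url) as response:
        html = await response.text()
    title_tag = BeautifulSoup(html, "html.parser").title
    return title_tag.get_text(strip=True) if title_tag else ""


async def main() -> None:
    urls = ["https://example.com", "https://example.org"]  # placeholder URLs
    async with aiohttp.ClientSession() as session:
        # asyncio.gather runs both requests concurrently on one event loop
        titles = await asyncio.gather(*(fetch_title(session, u) for u in urls))
    # aiofiles keeps the file write from blocking the event loop
    async with aiofiles.open("titles.txt", "w", encoding="utf-8") as f:
        await f.write("\n".join(titles))


if __name__ == "__main__":
    asyncio.run(main())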

Project Structure

text

design-material-crawler/
├── src/
│   ├── crawler.py          # main crawler logic
│   ├── parser.py           # page parser
│   ├── storage.py          # data storage module
│   ├── proxy_manager.py    # proxy manager
│   └── utils.py            # utility functions
├── config/
│   └── settings.py         # configuration
├── data/                   # collected data
├── logs/                   # log files
└── requirements.txt        # dependency list

Complete Code Implementation

1. Environment Setup and Dependencies

requirements.txt:

txt

aiohttp==3.9.1
beautifulsoup4==4.12.2
aiofiles==23.2.1
redis==5.0.1
asyncio-throttle==1.0.2
fake-useragent==1.4.0
python-dotenv==1.0.0
pillow==10.1.0
opencv-python==4.9.0.80

2. Configuration File

config/settings.py:

python

import os
from dotenv import load_dotenv

load_dotenv()


class Config:
    # Crawler settings
    MAX_CONCURRENT_REQUESTS = 10
    REQUEST_TIMEOUT = 30
    RETRY_ATTEMPTS = 3
    DELAY_BETWEEN_REQUESTS = 1.0

    # Target site settings
    TARGET_SITES = {
        'unsplash': {
            'base_url': 'https://unsplash.com',
            'search_url': 'https://unsplash.com/napi/search/photos',
            'per_page': 20
        },
        'pexels': {
            'base_url': 'https://www.pexels.com',
            'search_url': 'https://www.pexels.com/api/v3/search',
            'api_key': os.getenv('PEXELS_API_KEY', '')
        }
    }

    # Storage settings
    STORAGE_PATH = './data'
    IMAGE_FORMATS = ['jpg', 'png', 'webp', 'svg']
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB

    # Proxy settings
    USE_PROXY = True
    PROXY_POOL_URL = os.getenv('PROXY_POOL_URL', '')

    # Redis settings
    REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')

    # Request headers
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }

3. The Main Crawler Class

src/crawler.py:

python

import asyncio
import aiohttp
import aiofiles
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
from urllib.parse import urljoin, urlparse
import hashlib
import json
from datetime import datetime

from config.settings import Config
from src.proxy_manager import ProxyManager
from src.parser import Parser
from src.storage import StorageManager
from src.utils import rate_limiter, retry_handler, generate_file_hash

logger = logging.getLogger(__name__)


class DesignMaterialCrawler:
    """Asynchronous crawler for design material sites"""

    def __init__(self, config: Config):
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None
        self.proxy_manager = ProxyManager(config) if config.USE_PROXY else None
        self.parser = Parser()
        self.storage = StorageManager(config)
        self.semaphore = asyncio.Semaphore(config.MAX_CONCURRENT_REQUESTS)
        self.visited_urls = set()

    async def __aenter__(self):
        """Async context manager entry"""
        timeout = aiohttp.ClientTimeout(total=self.config.REQUEST_TIMEOUT)
        connector = aiohttp.TCPConnector(limit=100, force_close=True)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            headers=self.config.HEADERS
        )
        await self.storage.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()
        await self.storage.close()

    @retry_handler(max_retries=3, delay=1.0)
    @rate_limiter(max_calls=10, period=1.0)
    async def fetch_page(self, url: str, params: Dict = None) -> str:
        """Fetch page content"""
        # Include the query parameters in the dedup key so paginated API calls
        # to the same endpoint are not treated as duplicates.
        cache_key = f"{url}?{sorted(params.items())}" if params else url
        if cache_key in self.visited_urls:
            return ""

        proxy = None
        if self.proxy_manager:
            proxy = await self.proxy_manager.get_proxy()

        try:
            async with self.semaphore:
                async with self.session.get(
                    url, params=params, proxy=proxy, ssl=False
                ) as response:
                    response.raise_for_status()
                    content = await response.text()
                    self.visited_urls.add(cache_key)

                    # Log the successful request
                    logger.info(f"Fetched page: {url}, status: {response.status}")
                    return content
        except aiohttp.ClientError as e:
            logger.error(f"Request failed {url}: {str(e)}")
            # Mark the proxy as bad
            if proxy and self.proxy_manager:
                await self.proxy_manager.mark_proxy_failed(proxy)
            raise
        except asyncio.TimeoutError:
            logger.error(f"Request timed out: {url}")
            raise

    async def download_file(self, url: str, filepath: Path) -> bool:
        """Download a file to local storage"""
        if filepath.exists():
            logger.info(f"File already exists: {filepath}")
            return True

        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    # Check file type and size
                    content_type = response.headers.get('Content-Type', '')
                    content_length = int(response.headers.get('Content-Length', 0))

                    if not self._is_valid_file(content_type, content_length):
                        logger.warning(f"Invalid file type or size: {url}")
                        return False

                    # Write the file asynchronously
                    async with aiofiles.open(filepath, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)

                    logger.info(f"File downloaded: {filepath}")
                    return True
                else:
                    logger.error(f"Download failed {url}: status {response.status}")
                    return False
        except Exception as e:
            logger.error(f"Error downloading {url}: {str(e)}")
            return False

    def _is_valid_file(self, content_type: str, content_length: int) -> bool:
        """Validate file type and size"""
        # Check the file size
        if content_length > self.config.MAX_FILE_SIZE:
            return False

        # Check the file type
        valid_types = [f'image/{fmt}' for fmt in self.config.IMAGE_FORMATS]
        valid_types.extend(['application/zip', 'application/x-rar-compressed'])

        return any(valid_type in content_type for valid_type in valid_types)

    async def crawl_site(self, site_name: str, keywords: List[str], max_items: int = 100):
        """Crawl design materials from the specified site"""
        site_config = self.config.TARGET_SITES.get(site_name)
        if not site_config:
            logger.error(f"No configuration found for site: {site_name}")
            return

        logger.info(f"Start crawling {site_name}, keywords: {keywords}")

        all_items = []
        for keyword in keywords:
            logger.info(f"Searching keyword: {keyword}")

            page = 1
            collected_items = 0

            while collected_items < max_items:
                try:
                    # Build request parameters
                    params = self._build_search_params(site_name, keyword, page)

                    # Fetch search results
                    if site_name == 'unsplash':
                        search_url = site_config['search_url']
                        content = await self.fetch_page(search_url, params)
                        data = json.loads(content)
                        items = self.parser.parse_unsplash_results(data)
                    elif site_name == 'pexels':
                        if not site_config.get('api_key'):
                            logger.error("Pexels requires an API key")
                            break
                        search_url = site_config['search_url']
                        headers = {'Authorization': site_config['api_key']}
                        async with self.session.get(search_url, params=params, headers=headers) as response:
                            data = await response.json()
                        items = self.parser.parse_pexels_results(data)
                    else:
                        logger.error(f"Unsupported site: {site_name}")
                        break

                    if not items:
                        break

                    # Process each material item
                    tasks = []
                    for item in items:
                        if collected_items >= max_items:
                            break
                        tasks.append(self._process_material_item(item, site_name, keyword))
                        collected_items += 1

                    # Process items concurrently
                    results = await asyncio.gather(*tasks, return_exceptions=True)

                    # Collect successful results
                    for result in results:
                        if isinstance(result, dict):
                            all_items.append(result)

                    page += 1
                    await asyncio.sleep(1)  # polite delay

                except Exception as e:
                    logger.error(f"Error during crawl: {str(e)}")
                    break

        # Save metadata
        if all_items:
            metadata_file = self.storage.save_metadata(all_items, site_name)
            logger.info(f"Crawl finished, collected {len(all_items)} items, metadata saved to: {metadata_file}")

    def _build_search_params(self, site_name: str, keyword: str, page: int) -> Dict:
        """Build search parameters"""
        if site_name == 'unsplash':
            return {
                'query': keyword,
                'page': page,
                'per_page': min(20, self.config.TARGET_SITES['unsplash']['per_page'])
            }
        elif site_name == 'pexels':
            return {
                'query': keyword,
                'page': page,
                'per_page': 15
            }
        return {}

    async def _process_material_item(self, item: Dict, site_name: str, keyword: str) -> Dict:
        """Process a single material item"""
        try:
            # Download the main image
            image_url = item.get('image_url')
            if not image_url:
                return {}

            # Generate the file name
            filename = self._generate_filename(item, site_name)
            filepath = self.storage.get_filepath(site_name, 'images', filename)

            # Download the file
            success = await self.download_file(image_url, filepath)

            if success:
                # Compute the file hash
                file_hash = await generate_file_hash(filepath)

                # Build the metadata record
                metadata = {
                    'id': item.get('id', ''),
                    'title': item.get('title', ''),
                    'description': item.get('description', ''),
                    'image_url': image_url,
                    'download_url': item.get('download_url', ''),
                    'author': item.get('author', {}),
                    'tags': item.get('tags', []),
                    'keywords': [keyword],
                    'site': site_name,
                    'file_path': str(filepath),
                    'file_hash': file_hash,
                    'file_size': filepath.stat().st_size,
                    'crawled_at': datetime.now().isoformat(),
                    'metadata': item
                }

                # Save a thumbnail
                await self.storage.create_thumbnail(filepath)

                return metadata
        except Exception as e:
            logger.error(f"Failed to process item: {str(e)}")

        return {}

    def _generate_filename(self, item: Dict, site_name: str) -> str:
        """Generate a file name"""
        item_id = item.get('id', '')
        title = item.get('title', 'untitled').lower()

        # Sanitize the title
        import re
        title_clean = re.sub(r'[^\w\-_\. ]', '_', title)
        title_clean = re.sub(r'\s+', '_', title_clean)

        # Derive the extension from the image URL
        image_url = item.get('image_url', '')
        ext = Path(urlparse(image_url).path).suffix if image_url else '.jpg'

        return f"{site_name}_{item_id}_{title_clean}{ext}"

    async def crawl_batch(self, sites_keywords: Dict[str, List[str]], max_items_per_site: int = 50):
        """Crawl multiple sites in one batch"""
        tasks = []
        for site_name, keywords in sites_keywords.items():
            task = self.crawl_site(site_name, keywords, max_items_per_site)
            tasks.append(task)

        # Run all site tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Report the outcome per site
        for site_name, result in zip(sites_keywords.keys(), results):
            if isinstance(result, Exception):
                logger.error(f"Crawling {site_name} failed: {str(result)}")
            else:
                logger.info(f"Crawling {site_name} finished")

4. The Page Parser

src/parser.py:

python

from bs4 import BeautifulSoup
import json
from typing import List, Dict, Any
import re


class Parser:
    """Page parser"""

    def parse_unsplash_results(self, data: Dict) -> List[Dict]:
        """Parse data returned by the Unsplash API"""
        results = []

        if 'results' in data:
            for item in data['results']:
                result = {
                    'id': item.get('id', ''),
                    'title': item.get('description', 'Untitled'),
                    'description': item.get('alt_description', ''),
                    'image_url': item.get('urls', {}).get('regular', ''),
                    'download_url': item.get('links', {}).get('download', ''),
                    'author': {
                        'name': item.get('user', {}).get('name', ''),
                        'username': item.get('user', {}).get('username', ''),
                        'profile_url': item.get('user', {}).get('links', {}).get('html', '')
                    },
                    'tags': [tag.get('title', '') for tag in item.get('tags', [])],
                    'width': item.get('width', 0),
                    'height': item.get('height', 0),
                    'color': item.get('color', ''),
                    'likes': item.get('likes', 0)
                }
                results.append(result)

        return results

    def parse_pexels_results(self, data: Dict) -> List[Dict]:
        """Parse data returned by the Pexels API"""
        results = []

        if 'photos' in data:
            for item in data['photos']:
                result = {
                    'id': str(item.get('id', '')),
                    'title': item.get('alt', 'Untitled'),
                    'description': '',
                    'image_url': item.get('src', {}).get('large', ''),
                    'download_url': item.get('url', ''),
                    'author': {
                        'name': item.get('photographer', ''),
                        'profile_url': item.get('photographer_url', '')
                    },
                    'tags': [],
                    'width': item.get('width', 0),
                    'height': item.get('height', 0),
                    'avg_color': item.get('avg_color', ''),
                    'liked': item.get('liked', False)
                }
                results.append(result)

        return results

    def parse_html_page(self, html: str, site_type: str) -> List[Dict]:
        """Parse an HTML page (fallback method)"""
        soup = BeautifulSoup(html, 'html.parser')
        results = []

        if site_type == 'unsplash':
            # Parse the Unsplash page structure
            image_elements = soup.select('figure img')
            for img in image_elements:
                src = img.get('src') or img.get('data-src')
                if src and 'images.unsplash.com' in src:
                    result = {
                        'image_url': src,
                        'title': img.get('alt', ''),
                        'author': self._extract_author_unsplash(img)
                    }
                    results.append(result)

        return results

    def _extract_author_unsplash(self, element) -> Dict:
        """Extract author information from an Unsplash element"""
        # Adjust according to the actual page structure
        author_info = {
            'name': '',
            'username': '',
            'profile_url': ''
        }

        # Try to locate the author link
        parent = element.find_parent('a', href=re.compile(r'/@'))
        if parent and parent.get('href'):
            author_info['profile_url'] = 'https://unsplash.com' + parent['href']
            author_info['username'] = parent['href'].split('/@')[-1]

        return author_info

5. The Storage Manager

src/storage.py:

python

import json
import csv
import sqlite3
from pathlib import Path
from typing import List, Dict, Any, Optional
import aiofiles
import asyncio
from datetime import datetime
import hashlib
from PIL import Image
import io


class StorageManager:
    """Storage manager"""

    def __init__(self, config):
        self.config = config
        self.base_path = Path(config.STORAGE_PATH)
        self.db_conn: Optional[sqlite3.Connection] = None

    async def initialize(self):
        """Initialize storage directories and the database"""
        # Create the directory structure
        directories = ['images', 'thumbnails', 'metadata', 'logs']
        for dir_name in directories:
            (self.base_path / dir_name).mkdir(parents=True, exist_ok=True)

        # Initialize the database
        self._init_database()

    def _init_database(self):
        """Initialize the SQLite database"""
        db_path = self.base_path / 'materials.db'
        self.db_conn = sqlite3.connect(str(db_path))

        cursor = self.db_conn.cursor()

        # Materials table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS materials (
                id TEXT PRIMARY KEY,
                title TEXT,
                description TEXT,
                image_url TEXT,
                download_url TEXT,
                author_name TEXT,
                author_url TEXT,
                tags TEXT,
                keywords TEXT,
                site TEXT,
                file_path TEXT,
                file_hash TEXT UNIQUE,
                file_size INTEGER,
                width INTEGER,
                height INTEGER,
                color TEXT,
                likes INTEGER,
                crawled_at TEXT,
                metadata TEXT
            )
        ''')

        # Download log table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS download_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                material_id TEXT,
                file_path TEXT,
                download_time TEXT,
                status TEXT,
                error_message TEXT,
                FOREIGN KEY (material_id) REFERENCES materials (id)
            )
        ''')

        self.db_conn.commit()

    def get_filepath(self, site_name: str, file_type: str, filename: str) -> Path:
        """Build the file path for a download"""
        date_str = datetime.now().strftime('%Y/%m/%d')
        filepath = self.base_path / site_name / file_type / date_str / filename

        # Make sure the directory exists
        filepath.parent.mkdir(parents=True, exist_ok=True)

        return filepath

    def save_metadata(self, items: List[Dict], site_name: str) -> Path:
        """Save metadata"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        metadata_file = self.base_path / 'metadata' / f'{site_name}_{timestamp}.json'

        # Save as JSON
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(items, f, ensure_ascii=False, indent=2)

        # Also persist to the database
        self._save_to_database(items)

        # Save as CSV (optional)
        csv_file = metadata_file.with_suffix('.csv')
        self._save_to_csv(items, csv_file)

        return metadata_file

    def _save_to_database(self, items: List[Dict]):
        """Persist items to the database"""
        cursor = self.db_conn.cursor()

        for item in items:
            cursor.execute('''
                INSERT OR REPLACE INTO materials (
                    id, title, description, image_url, download_url,
                    author_name, author_url, tags, keywords, site,
                    file_path, file_hash, file_size, width, height,
                    color, likes, crawled_at, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                item.get('id'),
                item.get('title'),
                item.get('description'),
                item.get('image_url'),
                item.get('download_url'),
                item.get('author', {}).get('name'),
                item.get('author', {}).get('profile_url'),
                json.dumps(item.get('tags', [])),
                json.dumps(item.get('keywords', [])),
                item.get('site'),
                item.get('file_path'),
                item.get('file_hash'),
                item.get('file_size'),
                item.get('metadata', {}).get('width'),
                item.get('metadata', {}).get('height'),
                item.get('metadata', {}).get('color'),
                item.get('metadata', {}).get('likes'),
                item.get('crawled_at'),
                json.dumps(item.get('metadata', {}))
            ))

        self.db_conn.commit()

    def _save_to_csv(self, items: List[Dict], csv_file: Path):
        """Save items to CSV"""
        if not items:
            return

        # Columns to export
        fieldnames = [
            'id', 'title', 'description', 'site', 'author_name',
            'keywords', 'tags', 'file_path', 'file_size', 'crawled_at'
        ]

        with open(csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            for item in items:
                row = {
                    'id': item.get('id', ''),
                    'title': item.get('title', ''),
                    'description': item.get('description', ''),
                    'site': item.get('site', ''),
                    'author_name': item.get('author', {}).get('name', ''),
                    'keywords': ';'.join(item.get('keywords', [])),
                    'tags': ';'.join(item.get('tags', [])),
                    'file_path': item.get('file_path', ''),
                    'file_size': item.get('file_size', 0),
                    'crawled_at': item.get('crawled_at', '')
                }
                writer.writerow(row)

    async def create_thumbnail(self, image_path: Path, size: tuple = (200, 200)):
        """Create a thumbnail"""
        try:
            thumb_path = self.base_path / 'thumbnails' / image_path.name

            # Run the image processing in a worker thread
            await asyncio.to_thread(self._generate_thumbnail, image_path, thumb_path, size)
        except Exception as e:
            print(f"Failed to create thumbnail for {image_path}: {str(e)}")

    def _generate_thumbnail(self, src_path: Path, dst_path: Path, size: tuple):
        """Generate a thumbnail (runs in a separate thread)"""
        with Image.open(src_path) as img:
            # Convert to RGB so PNG/WebP sources with alpha can be saved as JPEG
            img = img.convert('RGB')
            img.thumbnail(size, Image.Resampling.LANCZOS)

            # Make sure the thumbnail directory exists
            dst_path.parent.mkdir(parents=True, exist_ok=True)

            # Save the thumbnail
            img.save(dst_path, 'JPEG', quality=85)

    async def close(self):
        """Release storage resources"""
        if self.db_conn:
            self.db_conn.close()

6. Utility Functions

src/utils.py:

python

import asyncio
import hashlib
import time
from functools import wraps
from pathlib import Path
from typing import Callable, Any
import aiofiles


def rate_limiter(max_calls: int, period: float):
    """Rate-limiting decorator"""
    def decorator(func):
        last_reset = time.time()
        call_count = 0

        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal last_reset, call_count

            current_time = time.time()
            if current_time - last_reset > period:
                last_reset = current_time
                call_count = 0

            if call_count >= max_calls:
                wait_time = period - (current_time - last_reset)
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
                    last_reset = time.time()
                    call_count = 0

            call_count += 1
            return await func(*args, **kwargs)

        return wrapper
    return decorator


def retry_handler(max_retries: int = 3, delay: float = 1.0):
    """Retry decorator"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries:
                        wait_time = delay * (2 ** attempt)  # exponential backoff
                        await asyncio.sleep(wait_time)

            raise last_exception

        return wrapper
    return decorator


async def generate_file_hash(filepath: Path, algorithm: str = 'sha256') -> str:
    """Compute a file hash"""
    hash_func = hashlib.new(algorithm)

    async with aiofiles.open(filepath, 'rb') as f:
        while chunk := await f.read(8192):
            hash_func.update(chunk)

    return hash_func.hexdigest()


def validate_url(url: str) -> bool:
    """Validate a URL"""
    from urllib.parse import urlparse
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except ValueError:
        return False


class ProgressTracker:
    """Progress tracker"""

    def __init__(self, total: int):
        self.total = total
        self.completed = 0
        self.start_time = time.time()

    def update(self, increment: int = 1):
        """Update progress"""
        self.completed += increment

        # Progress percentage
        percentage = (self.completed / self.total) * 100

        # Estimated remaining time
        elapsed = time.time() - self.start_time
        if self.completed > 0:
            estimated_total = elapsed * (self.total / self.completed)
            remaining = estimated_total - elapsed
        else:
            remaining = 0

        print(f"Progress: {percentage:.1f}% | completed: {self.completed}/{self.total} | "
              f"remaining: {remaining:.0f}s")

7. Program Entry Point

main.py:

python

import asyncio
import logging
from pathlib import Path
import sys
from datetime import datetime

from config.settings import Config
from src.crawler import DesignMaterialCrawler


def setup_logging():
    """Configure logging"""
    log_dir = Path('logs')
    log_dir.mkdir(exist_ok=True)

    # Timestamped log file name
    log_file = log_dir / f'crawler_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(sys.stdout)
        ]
    )


async def main():
    """Main entry point"""
    # Configure logging
    setup_logging()
    logger = logging.getLogger(__name__)

    # Load configuration
    config = Config()

    # Define crawl tasks
    sites_keywords = {
        'unsplash': ['design', 'background', 'texture', 'pattern', 'minimal'],
        # 'pexels': ['design', 'creative', 'art', 'graphic']  # requires an API key
    }

    try:
        async with DesignMaterialCrawler(config) as crawler:
            # Batch crawl
            await crawler.crawl_batch(sites_keywords, max_items_per_site=20)

            # Or crawl a single site
            # await crawler.crawl_site('unsplash', ['nature', 'technology'], max_items=50)

    except KeyboardInterrupt:
        logger.info("Crawl interrupted by user")
    except Exception as e:
        logger.error(f"Error during crawl: {str(e)}", exc_info=True)
    finally:
        logger.info("Crawler finished")


if __name__ == '__main__':
    # Set the event loop policy (Windows compatibility)
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    # Run the main program
    asyncio.run(main())

Advanced Features and Optimizations

1. Distributed Crawler Extension

python

# src/distributed.py
import asyncio
import redis.asyncio as redis
from typing import List, Dict
import json


class DistributedCrawler:
    """Distributed crawler manager"""

    def __init__(self, config):
        self.config = config
        self.redis_client = None
        self.queue_name = 'crawler:tasks'
        self.result_name = 'crawler:results'

    async def initialize(self):
        """Initialize the Redis connection"""
        # redis.asyncio.from_url() returns the client synchronously;
        # connections are opened lazily on first command
        self.redis_client = redis.from_url(
            self.config.REDIS_URL,
            decode_responses=True
        )

    async def push_tasks(self, tasks: List[Dict]):
        """Push tasks onto the queue"""
        for task in tasks:
            await self.redis_client.lpush(
                self.queue_name,
                json.dumps(task)
            )

    async def get_results(self, count: int = 100) -> List[Dict]:
        """Fetch processed results"""
        results = []
        for _ in range(count):
            result = await self.redis_client.rpop(self.result_name)
            if result:
                results.append(json.loads(result))
        return results

2. Image Content Analysis

python

# src/image_analyzer.py
import cv2
import numpy as np
from PIL import Image
from pathlib import Path
import asyncio
from typing import Dict


class ImageAnalyzer:
    """Image content analyzer"""

    @staticmethod
    async def analyze_image(image_path: Path) -> Dict:
        """Analyze image features"""
        # Run the CPU-bound work in a separate thread
        return await asyncio.to_thread(
            ImageAnalyzer._analyze_image_sync, image_path
        )

    @staticmethod
    def _analyze_image_sync(image_path: Path) -> Dict:
        """Synchronous image analysis"""
        try:
            # Analyze with OpenCV
            image = cv2.imread(str(image_path))
            if image is None:
                return {}

            # Color histogram
            hist = cv2.calcHist([image], [0, 1, 2], None, [8, 8, 8],
                                [0, 256, 0, 256, 0, 256])
            hist = cv2.normalize(hist, hist).flatten()

            # Average color
            avg_color = cv2.mean(image)[:3]

            # Edge detection
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 100, 200)
            edge_density = np.sum(edges > 0) / edges.size

            return {
                'dominant_colors': hist.tolist(),
                'average_color': avg_color,
                'edge_density': float(edge_density),
                'resolution': f"{image.shape[1]}x{image.shape[0]}"
            }
        except Exception as e:
            print(f"Image analysis failed: {str(e)}")
            return {}

Crawler Ethics and Compliance

1. Honoring robots.txt

python

# src/robots_checker.py
import urllib.robotparser
import aiohttp


class RobotsChecker:
    """Robots.txt checker"""

    def __init__(self):
        self.parser = urllib.robotparser.RobotFileParser()

    async def can_fetch(self, url: str, user_agent: str = '*') -> bool:
        """Check whether crawling the URL is allowed"""
        base_url = self._extract_base_url(url)
        robots_url = f"{base_url}/robots.txt"

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(robots_url) as response:
                    if response.status == 200:
                        content = await response.text()
                        self.parser.parse(content.splitlines())
                        return self.parser.can_fetch(user_agent, url)
        except Exception:
            pass

        return True  # default to allowing if robots.txt cannot be fetched

    def _extract_base_url(self, url: str) -> str:
        """Extract the base URL"""
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"

2. Usage Guidelines

  1. Respect copyright: only download assets whose licenses allow your intended (including commercial) use

  2. Follow the terms: read the target site's Terms of Service carefully

  3. Throttle your requests: add reasonable delays so you do not put pressure on the servers

  4. Set a User-Agent: identify your crawler clearly

  5. Handle errors: deal gracefully with HTTP status codes such as 404 and 429 (see the sketch after this list)

  6. Deduplicate: avoid downloading the same content more than once
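
As a concrete example of point 5, the sketch below shows one way to honor HTTP 429 responses and the Retry-After header. It is an illustrative helper, not part of the project code above; the function name and retry policy are assumptions.

python

import asyncio

import aiohttp


async def fetch_with_backoff(session: aiohttp.ClientSession, url: str,
                             max_retries: int = 3) -> str:
    """Illustrative helper: back off politely when the server returns 429."""
    for attempt in range(max_retries + 1):
        async with session.get(url) as response:
            if response.status == 429:
                # Honor Retry-After when it is a number of seconds,
                # otherwise fall back to exponential backoff
                retry_after = response.headers.get("Retry-After", "")
                delay = float(retry_after) if retry_after.isdigit() else 2 ** attempt
                await asyncio.sleep(delay)
                continue
            response.raise_for_status()  # surfaces 404 and other error statuses
            return await response.text()
    raise RuntimeError(f"Gave up on {url}: still rate limited after {max_retries} retries")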

Performance Optimization Tips

  1. Connection pooling: reuse HTTP connections to cut TCP handshake overhead

  2. Asynchronous file I/O: use aiofiles so file operations do not block the event loop

  3. Memory management: release large objects promptly and use generators when processing large volumes of data

  4. Error recovery: implement a checkpoint mechanism so an interrupted crawl can be resumed (see the sketch after this list)

  5. Caching: cache pages and data that have already been parsed
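
For point 4, the following is a minimal sketch of a file-based checkpoint that records which URLs have already been processed, so a restarted crawl can skip them. The class name and checkpoint path are illustrative and not part of the project above.

python

import json
from pathlib import Path


class Checkpoint:
    """Illustrative file-based checkpoint: remembers which URLs are done."""

    def __init__(self, path: str = "data/checkpoint.json"):
        self.path = Path(path)
        self.done = set()
        if self.path.exists():
            self.done = set(json.loads(self.path.read_text(encoding="utf-8")))

    def is_done(self, url: str) -> bool:
        return url in self.done

    def mark_done(self, url: str) -> None:
        self.done.add(url)
        # Persist after every item so a crash loses at most the current one
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(json.dumps(sorted(self.done)), encoding="utf-8")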
