
1. Overview: Why Collecting Industry Data Reports Matters, and Why It Is Hard

In today's data-driven business environment, industry data reports are critical inputs for market analysis, competitor research, and strategic planning. Yet this valuable data is usually scattered across many websites, platforms, and PDF documents, and collecting it by hand is slow and labor-intensive. This article shows how to build an efficient, reliable industry-report collection system using modern Python crawling techniques.

The article focuses on:

  1. Modern web data collection with Playwright

  2. Asynchronous programming to improve collection throughput

  3. Intelligent parsing of PDF report content

  4. Data cleaning and storage options

  5. Anti-crawling countermeasures and ethical considerations

2. Choosing the Technology Stack: Why These Tools?

2.1 Playwright vs Selenium vs Requests

Playwright is a modern browser-automation tool developed by Microsoft. Compared with traditional Selenium- or Requests-based scraping, it offers the following advantages (a minimal usage sketch follows this list):

  • Supports all modern browser engines (Chromium, Firefox, WebKit)

  • Auto-waits for elements to be ready, reducing manual waits and flaky timing code

  • More powerful selectors and event handling

  • Built-in screenshot and video recording

  • First-class TypeScript and Python support
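
To give a feel for the API, here is a minimal, self-contained sketch; the URL and selector are placeholders chosen for illustration, not taken from the project code later in this article.

python

# Minimal Playwright sketch: open a page headlessly and read one element's text.
# The URL and the "a" selector are illustrative placeholders.
import asyncio
from playwright.async_api import async_playwright


async def fetch_first_link_text(url: str, selector: str = "a") -> str:
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url, timeout=30000)  # Playwright waits for navigation automatically
        element = await page.query_selector(selector)
        text = await element.text_content() if element else ""
        await browser.close()
        return (text or "").strip()


if __name__ == "__main__":
    print(asyncio.run(fetch_first_link_text("https://example.com")))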

2.2 Asynchronous Programming (asyncio)

Asynchronous I/O lets the crawler issue many page requests concurrently instead of one at a time, which significantly improves throughput when large numbers of pages must be processed, as the short sketch below illustrates.
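
A rough sketch of the pattern, using asyncio together with aiohttp (the URLs are placeholders and error handling is kept to a minimum):

python

# Sketch: fetch several pages concurrently with asyncio + aiohttp.
# The URLs are placeholders; failed requests come back as exception objects.
import asyncio
from typing import List

import aiohttp


async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
        return await resp.text()


async def fetch_all(urls: List[str]):
    async with aiohttp.ClientSession() as session:
        # gather() runs all requests concurrently instead of one after another
        return await asyncio.gather(*(fetch(session, u) for u in urls), return_exceptions=True)


if __name__ == "__main__":
    results = asyncio.run(fetch_all(["https://example.com", "https://example.org"]))
    print(f"fetched {len(results)} responses")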

2.3 Other Key Technologies

  • Pandas: data processing and analysis

  • PyPDF2 / pdfplumber: PDF content extraction (see the short pdfplumber sketch after this list)

  • MongoDB / PostgreSQL: data storage

  • FastAPI: building a data API service
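
As a quick illustration of the PDF side, here is a minimal pdfplumber sketch. The file path is a placeholder; the full crawler below parses PDFs from in-memory bytes instead of a local file.

python

# Sketch: extract text from the first few pages of a local PDF with pdfplumber.
# "report.pdf" is an illustrative placeholder path.
import pdfplumber


def extract_pdf_text(path: str = "report.pdf", max_pages: int = 5) -> str:
    parts = []
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages[:max_pages]:  # cap the page count to keep it fast
            page_text = page.extract_text()
            if page_text:
                parts.append(page_text)
    return "\n".join(parts)


if __name__ == "__main__":
    print(extract_pdf_text()[:500])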

3. Complete Crawler System Architecture and Implementation

python

""" 行业数据报告采集系统 - 完整实现 支持:网页爬取、PDF解析、数据清洗、持久化存储 """ import asyncio import re import json import pandas as pd from datetime import datetime from typing import List, Dict, Optional, Any from dataclasses import dataclass, asdict import logging from urllib.parse import urljoin, urlparse import hashlib # PDF处理相关 import pdfplumber from PyPDF2 import PdfReader # 数据库相关 from sqlalchemy import create_engine, Column, String, Text, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker # 异步爬虫核心 from playwright.async_api import async_playwright, Browser, Page, Response import aiohttp from aiohttp import ClientSession, TCPConnector import aiofiles # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('industry_data_crawler.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 数据模型定义 Base = declarative_base() class IndustryReport(Base): """行业报告数据模型""" __tablename__ = 'industry_reports' id = Column(String(64), primary_key=True) title = Column(String(500), nullable=False) source_url = Column(String(1000), nullable=False) publish_date = Column(DateTime) industry_category = Column(String(200)) data_source = Column(String(200)) file_url = Column(String(1000)) file_type = Column(String(50)) # pdf, doc, html等 content_summary = Column(Text) full_content = Column(Text) keywords = Column(JSON) metadata = Column(JSON) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) def __repr__(self): return f"<IndustryReport(title='{self.title[:50]}...', source='{self.data_source}')>" @dataclass class ReportData: """报告数据结构类""" title: str source_url: str publish_date: Optional[datetime] industry_category: str data_source: str file_url: Optional[str] file_type: str content_summary: str full_content: str keywords: List[str] metadata: Dict[str, Any] def generate_id(self): """生成唯一ID""" content = f"{self.source_url}{self.title}{self.publish_date}" return hashlib.sha256(content.encode()).hexdigest() def to_dict(self): """转换为字典""" data = asdict(self) data['id'] = self.generate_id() data['publish_date'] = self.publish_date.isoformat() if self.publish_date else None return data class IndustryDataCrawler: """行业数据报告采集器主类""" def __init__(self, config_path: str = "config.json"): """ 初始化采集器 Args: config_path: 配置文件路径 """ self.config = self._load_config(config_path) self.db_engine = None self.db_session = None self._init_database() self.user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", ] def _load_config(self, config_path: str) -> Dict: """加载配置文件""" default_config = { "database": { "url": "sqlite:///industry_reports.db", "echo": False }, "crawler": { "max_concurrent": 5, "request_timeout": 30, "retry_times": 3, "delay_range": [1, 3] }, "target_sites": { "艾瑞咨询": "https://www.iresearch.com.cn", "易观分析": "https://www.analysys.cn", "艾媒网": "https://www.iimedia.cn", "199IT": "http://www.199it.com", "数据局": "https://www.shujuju.cn" }, "proxies": [], # 代理配置 "keywords": ["行业报告", "白皮书", "市场分析", "趋势报告"] } try: with open(config_path, 'r', encoding='utf-8') as f: user_config = json.load(f) # 合并配置 for key in default_config: if key in user_config: if isinstance(default_config[key], dict) and isinstance(user_config[key], dict): default_config[key].update(user_config[key]) 
else: default_config[key] = user_config[key] except FileNotFoundError: logger.warning(f"配置文件 {config_path} 不存在,使用默认配置") return default_config def _init_database(self): """初始化数据库连接""" db_url = self.config["database"]["url"] self.db_engine = create_engine( db_url, echo=self.config["database"]["echo"] ) Base.metadata.create_all(self.db_engine) Session = sessionmaker(bind=self.db_engine) self.db_session = Session() logger.info(f"数据库初始化完成: {db_url}") async def crawl_site(self, site_name: str, site_url: str): """ 爬取指定网站的报告 Args: site_name: 网站名称 site_url: 网站URL """ logger.info(f"开始爬取网站: {site_name} ({site_url})") async with async_playwright() as p: # 启动浏览器 browser = await p.chromium.launch( headless=True, args=['--disable-blink-features=AutomationControlled'] ) # 创建上下文 context = await browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent=self.user_agents[0] ) # 创建页面 page = await context.new_page() try: # 访问网站 await page.goto(site_url, timeout=60000) # 根据不同网站采取不同的爬取策略 if "iresearch" in site_url: reports = await self._crawl_iresearch(page, site_url) elif "analysys" in site_url: reports = await self._crawl_analysys(page, site_url) elif "iimedia" in site_url: reports = await self._crawl_iimedia(page, site_url) else: reports = await self._crawl_general_site(page, site_url) # 处理获取到的报告 for report_info in reports: try: report_data = await self._process_report(report_info) if report_data: self._save_to_database(report_data) logger.info(f"成功保存报告: {report_data.title}") except Exception as e: logger.error(f"处理报告失败: {report_info.get('url', '未知')}, 错误: {str(e)}") logger.info(f"网站 {site_name} 爬取完成,共获取 {len(reports)} 个报告") except Exception as e: logger.error(f"爬取网站 {site_name} 失败: {str(e)}") finally: await browser.close() async def _crawl_iresearch(self, page: Page, base_url: str) -> List[Dict]: """爬取艾瑞咨询网站""" reports = [] try: # 查找报告链接 report_links = await page.query_selector_all( "a[href*='report'], a:has-text('报告'), a:has-text('白皮书')" ) for link in report_links[:20]: # 限制数量 href = await link.get_attribute('href') title = await link.text_content() or await link.get_attribute('title') or "" if href and any(kw in title for kw in self.config["keywords"]): full_url = urljoin(base_url, href) reports.append({ 'url': full_url, 'title': title.strip(), 'source': '艾瑞咨询' }) # 尝试搜索报告列表页 await page.goto(f"{base_url}/report.shtml", timeout=30000) # 等待内容加载 await page.wait_for_selector(".report-list, .list-content", timeout=10000) # 提取更多报告 items = await page.query_selector_all(".report-item, .list-item") for item in items[:30]: try: link = await item.query_selector("a") if link: href = await link.get_attribute('href') title = await link.text_content() or "" if href and any(kw in title for kw in self.config["keywords"]): full_url = urljoin(base_url, href) # 提取日期 date_elem = await item.query_selector(".date, .time") date_str = await date_elem.text_content() if date_elem else "" reports.append({ 'url': full_url, 'title': title.strip(), 'date': self._parse_date(date_str.strip()), 'source': '艾瑞咨询' }) except Exception as e: logger.debug(f"提取报告项失败: {str(e)}") continue except Exception as e: logger.error(f"爬取艾瑞咨询失败: {str(e)}") return reports async def _crawl_analysys(self, page: Page, base_url: str) -> List[Dict]: """爬取易观分析网站""" reports = [] try: # 导航到报告页面 await page.goto(f"{base_url}/report", timeout=30000) await page.wait_for_load_state('networkidle') # 提取报告信息 report_elements = await page.query_selector_all(".report-card, .article-item") for elem in report_elements[:25]: try: # 提取标题和链接 title_elem = await 
elem.query_selector("h3, h4, .title") link_elem = await elem.query_selector("a") if not link_elem: continue href = await link_elem.get_attribute('href') title = await title_elem.text_content() if title_elem else await link_elem.text_content() if href and title: full_url = urljoin(base_url, href) # 提取日期和摘要 date_elem = await elem.query_selector(".date, .time, .publish-date") date_str = await date_elem.text_content() if date_elem else "" summary_elem = await elem.query_selector(".summary, .description, .abstract") summary = await summary_elem.text_content() if summary_elem else "" reports.append({ 'url': full_url, 'title': title.strip(), 'date': self._parse_date(date_str.strip()), 'summary': summary.strip(), 'source': '易观分析' }) except Exception as e: logger.debug(f"提取报告元素失败: {str(e)}") continue except Exception as e: logger.error(f"爬取易观分析失败: {str(e)}") return reports async def _crawl_iimedia(self, page: Page, base_url: str) -> List[Dict]: """爬取艾媒网""" reports = [] try: # 直接访问报告列表页 await page.goto(f"{base_url}/report", timeout=30000) # 使用更通用的选择器 await page.wait_for_selector("a[href*='report']:visible", timeout=10000) # 查找所有包含报告的链接 links = await page.query_selector_all( "a[href*='report']:visible, a:has-text('报告'):visible, a:has-text('Research'):visible" ) seen_urls = set() for link in links: try: href = await link.get_attribute('href') if not href or href in seen_urls: continue title = await link.text_content() or await link.get_attribute('title') or "" # 过滤非报告链接 if not any(kw in title for kw in self.config["keywords"]): continue full_url = urljoin(base_url, href) seen_urls.add(href) reports.append({ 'url': full_url, 'title': title.strip()[:200], 'source': '艾媒网' }) except Exception as e: continue except Exception as e: logger.error(f"爬取艾媒网失败: {str(e)}") return reports async def _crawl_general_site(self, page: Page, base_url: str) -> List[Dict]: """通用网站爬取方法""" reports = [] try: # 搜索报告相关页面 search_selectors = [ "a[href*='report']", "a[href*='white-paper']", "a[href*='research']", "a:has-text('报告')", "a:has-text('白皮书')", "a:has-text('研究')" ] for selector in search_selectors: try: links = await page.query_selector_all(selector) for link in links: try: href = await link.get_attribute('href') title = await link.text_content() or "" if href and title: full_url = urljoin(base_url, href) # 检查是否已存在 if not any(r['url'] == full_url for r in reports): reports.append({ 'url': full_url, 'title': title.strip()[:300], 'source': urlparse(base_url).netloc }) except: continue except: continue # 去重 unique_reports = [] seen_urls = set() for report in reports: if report['url'] not in seen_urls: seen_urls.add(report['url']) unique_reports.append(report) except Exception as e: logger.error(f"通用爬取失败: {str(e)}") return unique_reports[:50] # 限制数量 async def _process_report(self, report_info: Dict) -> Optional[ReportData]: """ 处理单个报告,提取详细信息 Args: report_info: 报告基本信息 Returns: ReportData对象或None """ try: url = report_info['url'] # 判断是否是PDF文件 if url.lower().endswith('.pdf'): return await self._process_pdf_report(url, report_info) else: return await self._process_html_report(url, report_info) except Exception as e: logger.error(f"处理报告失败 {report_info.get('url', '未知')}: {str(e)}") return None async def _process_pdf_report(self, pdf_url: str, report_info: Dict) -> Optional[ReportData]: """处理PDF报告""" try: logger.info(f"处理PDF报告: {pdf_url}") # 下载PDF文件 pdf_content = await self._download_file(pdf_url) if not pdf_content: return None # 解析PDF内容 pdf_info = await self._parse_pdf_content(pdf_content, pdf_url) # 构建报告数据 report_data = 
ReportData( title=report_info.get('title') or pdf_info.get('title', '未知标题'), source_url=pdf_url, publish_date=report_info.get('date') or pdf_info.get('date'), industry_category=self._categorize_industry(report_info.get('title', '')), data_source=report_info.get('source', '未知来源'), file_url=pdf_url, file_type='pdf', content_summary=pdf_info.get('summary', '')[:500], full_content=pdf_info.get('full_text', '')[:10000], # 限制长度 keywords=self._extract_keywords(pdf_info.get('full_text', '')), metadata={ 'page_count': pdf_info.get('page_count', 0), 'file_size': len(pdf_content), 'extracted_time': datetime.now().isoformat() } ) return report_data except Exception as e: logger.error(f"处理PDF报告失败 {pdf_url}: {str(e)}") return None async def _process_html_report(self, url: str, report_info: Dict) -> Optional[ReportData]: """处理HTML报告页面""" try: async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( user_agent=self.user_agents[1] ) page = await context.new_page() await page.goto(url, timeout=60000) await page.wait_for_load_state('networkidle') # 提取页面内容 title = await self._extract_page_title(page) content = await self._extract_main_content(page) date = await self._extract_publish_date(page) # 查找PDF下载链接 pdf_links = await page.query_selector_all("a[href$='.pdf']") pdf_url = None if pdf_links: pdf_href = await pdf_links[0].get_attribute('href') pdf_url = urljoin(url, pdf_href) if pdf_href else None await browser.close() # 构建报告数据 report_data = ReportData( title=title or report_info.get('title', '未知标题'), source_url=url, publish_date=date or report_info.get('date'), industry_category=self._categorize_industry(title or ''), data_source=report_info.get('source', '未知来源'), file_url=pdf_url, file_type='html' if not pdf_url else 'pdf', content_summary=self._generate_summary(content)[:500], full_content=content[:15000], # 限制长度 keywords=self._extract_keywords(content), metadata={ 'has_pdf': bool(pdf_url), 'word_count': len(content), 'extracted_time': datetime.now().isoformat() } ) return report_data except Exception as e: logger.error(f"处理HTML报告失败 {url}: {str(e)}") return None async def _download_file(self, url: str) -> Optional[bytes]: """下载文件""" try: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=30) as response: if response.status == 200: return await response.read() else: logger.warning(f"下载文件失败 {url}: HTTP {response.status}") return None except Exception as e: logger.error(f"下载文件失败 {url}: {str(e)}") return None async def _parse_pdf_content(self, pdf_content: bytes, pdf_url: str) -> Dict: """解析PDF内容""" pdf_info = { 'title': '', 'date': None, 'summary': '', 'full_text': '', 'page_count': 0 } try: # 使用pdfplumber提取文本 with pdfplumber.open(io.BytesIO(pdf_content)) as pdf: pdf_info['page_count'] = len(pdf.pages) # 提取所有页面文本 full_text = "" for i, page in enumerate(pdf.pages[:50]): # 限制前50页 text = page.extract_text() if text: full_text += text + "\n" pdf_info['full_text'] = full_text # 尝试从第一页提取标题 if pdf.pages: first_page_text = pdf.pages[0].extract_text() or "" lines = first_page_text.split('\n') if lines: pdf_info['title'] = lines[0][:200] # 生成摘要 if full_text: pdf_info['summary'] = self._generate_summary(full_text[:5000]) except Exception as e: logger.error(f"解析PDF失败 {pdf_url}: {str(e)}") # 回退到PyPDF2 try: reader = PdfReader(io.BytesIO(pdf_content)) pdf_info['page_count'] = len(reader.pages) full_text = "" for page in reader.pages[:10]: text = page.extract_text() if text: full_text += text pdf_info['full_text'] = full_text 
except Exception as e2: logger.error(f"PyPDF2解析也失败 {pdf_url}: {str(e2)}") return pdf_info async def _extract_page_title(self, page: Page) -> str: """提取页面标题""" try: # 尝试多种选择器 selectors = [ "h1", ".title", ".article-title", ".report-title", "title" ] for selector in selectors: try: element = await page.query_selector(selector) if element: title = await element.text_content() if title and len(title) > 10: return title.strip() except: continue # 回退到页面标题 return (await page.title())[:200] except: return "" async def _extract_main_content(self, page: Page) -> str: """提取主要内容""" try: # 尝试常见的内容选择器 content_selectors = [ ".article-content", ".report-content", ".content", ".main-content", "article", ".details", ".body" ] for selector in content_selectors: try: element = await page.query_selector(selector) if element: text = await element.text_content() if text and len(text) > 200: return self._clean_text(text) except: continue # 回退到body提取 body = await page.query_selector("body") if body: text = await body.text_content() return self._clean_text(text)[:10000] return "" except Exception as e: logger.error(f"提取内容失败: {str(e)}") return "" async def _extract_publish_date(self, page: Page) -> Optional[datetime]: """提取发布日期""" try: date_selectors = [ ".publish-date", ".date", ".time", "[itemprop='datePublished']", "meta[property='article:published_time']", "meta[name='publish_date']" ] for selector in date_selectors: try: if selector.startswith("meta"): element = await page.query_selector(selector) if element: date_str = await element.get_attribute('content') if date_str: return self._parse_date(date_str) else: element = await page.query_selector(selector) if element: date_text = await element.text_content() if date_text: return self._parse_date(date_text.strip()) except: continue return None except: return None def _parse_date(self, date_str: str) -> Optional[datetime]: """解析日期字符串""" if not date_str: return None # 常见日期格式 date_patterns = [ r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})[日]?', r'(\d{4})\.(\d{1,2})\.(\d{1,2})', r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', ] for pattern in date_patterns: match = re.search(pattern, date_str) if match: try: groups = match.groups() if len(groups) == 3: # 统一转换为YYYY-MM-DD格式 if len(groups[0]) == 4: # YYYY-MM-DD year, month, day = groups else: # DD-MM-YYYY day, month, year = groups return datetime(int(year), int(month), int(day)) except: continue return None def _categorize_industry(self, text: str) -> str: """行业分类""" industry_keywords = { "互联网": ["互联网", "电商", "社交", "游戏", "在线教育", "短视频", "直播"], "金融": ["金融", "银行", "保险", "证券", "支付", "区块链", "数字货币"], "科技": ["科技", "人工智能", "AI", "大数据", "云计算", "5G", "物联网"], "消费": ["消费", "零售", "餐饮", "食品", "饮料", "美妆", "服装"], "医疗": ["医疗", "医药", "健康", "医院", "疫苗", "生物"], "汽车": ["汽车", "新能源", "自动驾驶", "电动车", "造车"], "房地产": ["房地产", "房产", "楼市", "房价", "住宅"], "教育": ["教育", "培训", "学校", "在线教育", "K12"], "旅游": ["旅游", "酒店", "航空", "景区", "出行"] } text_lower = text.lower() for industry, keywords in industry_keywords.items(): for keyword in keywords: if keyword.lower() in text_lower: return industry return "其他" def _extract_keywords(self, text: str, top_n: int = 10) -> List[str]: """提取关键词""" if not text: return [] # 简单的关键词提取(实际项目建议使用jieba等库) words = re.findall(r'[\u4e00-\u9fa5]{2,6}', text) # 过滤停用词 stop_words = {"的", "了", "在", "是", "和", "与", "及", "或", "等", "有", "这个", "这些"} filtered_words = [w for w in words if w not in stop_words] # 词频统计 from collections import Counter word_counts = Counter(filtered_words) return [word for word, count in word_counts.most_common(top_n)] def 
_generate_summary(self, text: str, max_length: int = 500) -> str: """生成摘要""" if not text: return "" # 简单的摘要生成(实际项目建议使用文本摘要算法) sentences = re.split(r'[。!?!?]', text) # 取前几个句子作为摘要 summary = "" for sentence in sentences: if len(summary) + len(sentence) < max_length: summary += sentence + "。" else: break return summary.strip() or text[:max_length] def _clean_text(self, text: str) -> str: """清洗文本""" if not text: return "" # 去除多余空白字符 text = re.sub(r'\s+', ' ', text) # 去除特殊字符 text = re.sub(r'[^\w\u4e00-\u9fa5\s.,!?;:,。!?;:、()()【】\[\]《》<>]', '', text) return text.strip() def _save_to_database(self, report_data: ReportData): """保存到数据库""" try: # 检查是否已存在 existing = self.db_session.query(IndustryReport).filter_by( id=report_data.generate_id() ).first() if existing: # 更新现有记录 existing.title = report_data.title existing.content_summary = report_data.content_summary existing.full_content = report_data.full_content existing.updated_at = datetime.now() logger.info(f"更新报告: {report_data.title}") else: # 创建新记录 report_dict = report_data.to_dict() db_report = IndustryReport(**report_dict) self.db_session.add(db_report) logger.info(f"新增报告: {report_data.title}") self.db_session.commit() except Exception as e: logger.error(f"保存到数据库失败: {str(e)}") self.db_session.rollback() async def run(self): """运行爬虫""" logger.info("开始行业数据报告采集任务") start_time = datetime.now() sites = self.config["target_sites"] # 创建异步任务 tasks = [] for site_name, site_url in sites.items(): task = asyncio.create_task(self.crawl_site(site_name, site_url)) tasks.append(task) # 等待所有任务完成 await asyncio.gather(*tasks, return_exceptions=True) # 统计结果 total_reports = self.db_session.query(IndustryReport).count() end_time = datetime.now() duration = (end_time - start_time).total_seconds() logger.info(f"采集任务完成!") logger.info(f"总耗时: {duration:.2f}秒") logger.info(f"数据库中共有报告: {total_reports}份") # 生成统计报告 self._generate_statistics_report() def _generate_statistics_report(self): """生成统计报告""" try: # 查询统计数据 reports = self.db_session.query(IndustryReport).all() if not reports: logger.warning("没有找到报告数据") return # 创建DataFrame data = [] for report in reports: data.append({ '标题': report.title, '来源': report.data_source, '行业分类': report.industry_category, '发布日期': report.publish_date.strftime('%Y-%m-%d') if report.publish_date else '未知', '文件类型': report.file_type, '关键词': ', '.join(report.keywords) if report.keywords else '' }) df = pd.DataFrame(data) # 保存为Excel excel_path = f"industry_reports_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" df.to_excel(excel_path, index=False) # 生成统计信息 stats = { '报告总数': len(df), '来源分布': df['来源'].value_counts().to_dict(), '行业分布': df['行业分类'].value_counts().to_dict(), '文件类型分布': df['文件类型'].value_counts().to_dict() } # 保存统计信息 stats_path = f"crawler_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(stats_path, 'w', encoding='utf-8') as f: json.dump(stats, f, ensure_ascii=False, indent=2) logger.info(f"统计报告已保存: {excel_path}") logger.info(f"统计信息已保存: {stats_path}") # 打印摘要 print("\n" + "="*50) print("行业数据报告采集统计摘要") print("="*50) print(f"报告总数: {stats['报告总数']}") print("\n来源分布:") for source, count in stats['来源分布'].items(): print(f" {source}: {count}") print("\n行业分布:") for industry, count in stats['行业分布'].items(): print(f" {industry}: {count}") except Exception as e: logger.error(f"生成统计报告失败: {str(e)}") def export_to_csv(self, output_path: str = "industry_reports.csv"): """导出数据到CSV""" try: reports = self.db_session.query(IndustryReport).all() data = [] for report in reports: data.append({ 'id': report.id, 'title': report.title, 'source': 
report.data_source, 'url': report.source_url, 'publish_date': report.publish_date.isoformat() if report.publish_date else '', 'industry': report.industry_category, 'file_type': report.file_type, 'summary': report.content_summary, 'keywords': json.dumps(report.keywords, ensure_ascii=False) if report.keywords else '', 'created_at': report.created_at.isoformat() }) df = pd.DataFrame(data) df.to_csv(output_path, index=False, encoding='utf-8-sig') logger.info(f"数据已导出到: {output_path}") return df except Exception as e: logger.error(f"导出CSV失败: {str(e)}") return None # 使用示例 async def main(): """主函数""" # 创建爬虫实例 crawler = IndustryDataCrawler() # 运行爬虫 await crawler.run() # 导出数据 crawler.export_to_csv() # 关闭数据库连接 if crawler.db_session: crawler.db_session.close() print("\n采集任务已完成!") # 异步运行 if __name__ == "__main__": import io # 用于pdfplumber的BytesIO # 运行主函数 asyncio.run(main())
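A note on running the listing: the third-party packages implied by the imports (playwright, aiohttp, pandas, pdfplumber, PyPDF2, SQLAlchemy, plus openpyxl for the Excel export) need to be installed first, and Playwright additionally requires a one-time browser download (for example via its `playwright install chromium` command) before chromium.launch() can start a browser.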
