海口市网站建设_网站建设公司_内容更新_seo优化
2026/1/10 2:32:03 网站建设 项目流程

目录

1. 项目背景与目标

2. 核心技术演进

2.1 方案对比:为什么旧方案慢?

2.2 关键代码优化点

3. 存储策略深度解析 (HDFS Block vs Spark Partition)

3.1 核心结论

3.2 各表最佳配置

4. 最终落地代码 (极速版)

4.1 通用 Python 提交脚本 (submit_job.py)

4.2 Dim 表 (4000万) & Info 表 (2000万) 通用逻辑

4.3 Related 表 (5000万) & Content 表 (1600万) 通用逻辑

5. 常见问题排查 (Troubleshooting)

6. 验证结果

7. 生产环境资源规划与分区策略 (基于集群实况)

7.1 集群现状与风险分析

7.2 资源隔离策略:弃帅保车

7.3 并行度计算 (The Math)

7.4 分区策略:速度与存储的动态平衡

7.5 最终落地配置

A. 提交脚本 (spark-submit 参数)

B. 代码逻辑 (PARTITION_CONF)


1. 项目背景与目标

之前生成千万级测试数据使用的Pyhive实现,生成数据耗时大,因此想测试Pyspark生成同数据量耗时多少。

在 CDH 集群(10个节点)上,基于现有的小样本数据,快速生成千万级测试数据,并确保数据分布均匀、写入高效。

集群配置如下:

表名目标数据量逻辑大小(估算)优化前耗时优化后耗时
Dim 表(企业基本信息)4000 万~30 GB~40 分钟< 5 分钟
Info 表(招标信息)2000 万~10 GB~20 分钟< 3 分钟
Related 表(关联关系)5000 万~3 GB~12 分钟< 2 分钟
Content 表(正文内容)1600 万~0.5 GB~15 分钟< 1 分钟

环境配置:

  • CDH 版本: 6.3.2

  • Spark 模式: YARN Client

  • 特殊限制: 必须避开节点cdh245(10.x.xx.245)。


2. 核心技术演进

2.1 方案对比:为什么旧方案慢?

特性旧方案 (循环膨胀法)新方案 (极速骨架法)
算法逻辑插入 -> 读取 -> 翻倍写入 -> 读取 -> ... (循环约20次)spark.range()内存生成骨架 ->CrossJoin模板 -> 一次写入
I/O 开销极高(反复读写磁盘)极低(几乎纯内存计算,仅一次写入)
任务调度启动 20+ 个 Spark Job,调度延迟大仅启动 1 个 Job,立即执行
文件特征灾难级:同时包含 KB 级小文件和 300MB+ 大文件完美级:文件大小均匀,数量可控
适用场景小数据量翻倍千万/亿级大规模造数

2.2 关键代码优化点

  1. 废弃While循环:改用spark.range(0, TARGET_ROWS)直接生成 ID 骨架。

  2. 动态分区探测:增加find_first_valid_partition函数,自动寻找源表最新分区作为“种子数据”,避免全表扫描。

  3. 精准控制并行度:使用repartition(N)强制控制写入文件数量,解决小文件问题。

  4. 节点黑名单:在提交脚本中配置spark.yarn.excludeNodes=cdh245


3. 存储策略深度解析 (HDFS Block vs Spark Partition)

3.1 核心结论

  • 不要强求 128MB:对于几十 GB 的数据量,为了保证计算时的并行度 (Concurrency),文件大小可以小于 128MB。

  • 最佳区间30MB - 100MB是当前数据量下的最佳平衡点。

  • 计算公式repartition的数量 = 逻辑数据总量 / 目标文件大小(忽略副本)。

3.2 各表最佳配置

  • Dim 表 (30GB):repartition(400)-> 单文件 ~75MB。

  • Info 表 (10GB):repartition(100)-> 单文件 ~100MB。

  • Related 表 (3GB):repartition(100)-> 单文件 ~30MB (为了高并发读取,牺牲少量 NameNode 内存是值得的)。

  • Content 表 (0.5GB):repartition(50)-> 单文件 ~10MB (数据量太小,为了保证有 50 个 Task 并发跑,必须切这么细)。


4. 最终落地代码 (极速版)

以下是经过所有优化后的最终版本代码逻辑。

4.1 通用 Python 提交脚本 (submit_job.py)

核心功能:自动上传代码,配置 YARN 资源,避开故障节点

# 关键配置片段 shell_content = f""" {SPARK_SUBMIT_BIN} \\ --master yarn \\ --deploy-mode client \\ --name 'Optimized_Job' \\ --conf "spark.yarn.excludeNodes=cdh245,10.8.15.245" \\ # <--- 核心配置 --driver-memory 4g \\ --executor-memory 4g \\ --num-executors 10 \\ --conf "spark.sql.shuffle.partitions=400" \\ {self.remote_py_path} """

4.2 Dim 表 (4000万) & Info 表 (2000万) 通用逻辑

核心功能:一次性生成,无循环,分区检测。

def explode_data_fast(spark): # 1. 动态寻找源表有效分区(种子) valid_part = find_first_valid_partition(spark, SOURCE_TABLE) df_template = spark.table(SOURCE_TABLE).filter(col("partition_date") == valid_part).limit(1) # 2. 内存生成骨架 (速度极快) df_skeleton = spark.range(0, TARGET_ROWS) # 3. 关联并生成数据 (使用广播 Join) df_final = df_skeleton.crossJoin(broadcast(df_template)) \ .withColumn("u_id", expr("uuid()")) \ .withColumn("partition_date", lit(FAKE_PART)) \ .drop("id") # 4. 并行写入 (Dim表设为400,Info表设为100) # 这一步保证了文件大小均匀且适中 df_final.repartition(400).write.mode("overwrite").insertInto(TARGET_TABLE_NAME)

4.3 Related 表 (5000万) & Content 表 (1600万) 通用逻辑

核心功能:取模关联算法,保证数据逻辑一致性。

# 骨架与种子数据的关联逻辑 df_joined = df_skeleton.crossJoin(broadcast(df_template)) \ .withColumn("uid_join_key", (col("row_idx") % 20000000).cast(LongType())) # 循环引用 Info 表 ID ​ # 写入时 Content 表因为数据量小,repartition(50) 即可 df_to_write.repartition(50).write.mode("overwrite").insertInto(TARGET_TABLE)

5. 常见问题排查 (Troubleshooting)

在执行过程中出现的日志现象及解释:

  1. HDFS 文件分布不均 / 耗时过长

    • 原因: 使用了旧的while循环逻辑。

    • 解决: 切换到上述“极速版”代码。

  2. SparkRackResolver: Got an error when resolving hostNames

    • 原因: 集群未配置机架感知脚本。

    • 影响: 无影响,会自动回退到/default-rack,可忽略。

  3. Trying to remove executor ... Asked to remove non-existent executor

    • 原因: Spark 的动态资源分配 (Dynamic Allocation) 或 容器被抢占。

    • 判断: 只要任务没 Fail,这是正常的资源调度行为。

  4. KeyboardInterrupt

    • 原因: 用户在 Driver 等待 YARN 资源分配时(SchedulerBackend is ready...)手动中断了脚本。

    • 解决: 耐心等待 1-2 分钟,或去 YARN 界面查看任务状态。建议脚本增加心跳保持机制。

6. 验证结果

任务执行完成后,通过以下命令验证,看到了完美的分布式存储状态:

hdfs dfs -ls /user/hive/warehouse/.../partition_date=20991231/
  • Info/Related 表: 生成 100 个文件,大小高度一致(约 30MB - 100MB)。

  • Content 表: 生成 50 个文件,大小高度一致(约 10MB)。

  • Dim 表: 生成 400 个文件,大小高度一致(约 75MB)。

结论:这就标志着造数任务不仅完成,而且达到了生产环境的高质量存储标准。

7. 生产环境资源规划与分区策略 (基于集群实况)

通过对 Cloudera Manager 集群资源的详细审计,重新制定了更符合生产环境现状的资源调度策略。下面详细阐述如何根据硬件规格计算并行度,以及如何保护集群核心节点。

7.1 集群现状与风险分析

  • 硬件规格:集群共 9 个节点 (cdh240-cdh249),配置高度统一,单节点物理内存31 GiB,磁盘约 1.4 TiB。

  • 角色分布不均

    • Master 节点 (cdh240):运行了 NameNode, ResourceManager 等16 个角色,内存仅剩约 15GB 可用。

    • Worker 节点 (cdh241等):仅运行 DataNode 等少数角色,内存极其空闲。

风险判断:如果按照全集群平均分配资源,cdh240会因资源耗尽而导致 NameNode 响应变慢,甚至引发整个集群的元数据服务卡顿。

7.2 资源隔离策略:弃帅保车

为了保障造数任务不影响集群稳定性,采取“主节点避让”策略:

  • 计算节点池:仅使用8 个Worker 节点 (cdh241-cdh249),彻底排除cdh240

  • 单节点资源计算

    • 物理内存:31 GB

    • 系统/Hadoop预留:-7 GB

    • YARN 安全可用24 GB

7.3 并行度计算 (The Math)

基于 8 个 Worker 节点进行算力规划:

  1. Executor 规格:采用标准容器规格,避免大内存 GC 卡顿。

    • 内存:4 GB(24GB / 6 = 4GB,整除且高效)

    • 核心:2 Cores(兼顾并发与调度开销)

  2. 集群总容量

    • 单节点 Executor 数:$24 \text{ GB} / 4 \text{ GB} = 6 \text{ 个}$

    • 集群总 Executor 数:$8 \text{ Nodes} \times 6 = 48 \text{ 个}$

    • 安全申请数45 个(预留 3 个余量给 Driver 和临时任务)

  3. 总并行度 (Total Cores)

    • $45 \text{ Executors} \times 2 \text{ Cores} = \mathbf{90 \text{ 并发核心}}$

7.4 分区策略:速度与存储的动态平衡

基于90 个并发核心HDFS 128MB Block两个基准,重新计算各表分区数。

表名数据量 (逻辑)策略分析最终分区数 (Repartition)单文件大小预期效果
Dim30 GB存储优先:$30GB/128MB \approx 240$。 且 $240 > 90$,CPU 吃饱。240~128 MBHDFS 完美存储并发度极高
Info10 GB计算优先:$10GB/128MB \approx 80$。 80 < 90,为跑满 CPU,强行提至 100。100~100 MB计算速度最大化无 CPU 空转
Related3 GB计算优先:数据较小。 为跑满 CPU,强制设为 90。90~33 MB极速完成瞬间利用全集群算力
Content0.5 GB避免碎片:数据极小。 仅使用一半算力,防止生成 KB 级小文件。45~11 MB平衡拒绝小文件风暴

7.5 最终落地配置

A. 提交脚本 (spark-submit参数)

显式排除 Master 节点,申请 45 个 Executor 榨干 Worker 节点性能。

spark-submit \ --master yarn \ --deploy-mode client \ --name 'Data_Gen_Production' \ --driver-memory 4g \ --executor-memory 4g \ --executor-cores 2 \ --num-executors 45 \ --conf spark.default.parallelism=180 \ --conf spark.sql.shuffle.partitions=180 \ --conf "spark.yarn.excludeNodes=cdh240,10.8.15.240" \ your_script.py
B. 代码逻辑 (PARTITION_CONF)

将上述计算的分区数值固化到代码中。

# 基于 90 Core 并发算力的生产环境配置 PARTITION_CONF = { "dim_enterprise": 240, # 存储对齐 (128MB) "info_bidding": 100, # 算力对齐 (跑满 CPU) "related_company": 90, # 算力对齐 (跑满 CPU) "content_table": 45 # 碎片控制 (半数并发) } ​ def write_optimized(df, table_name): # 获取科学计算后的分区数,默认为 200 target_p = PARTITION_CONF.get(table_name, 200) print(f">>> [生产模式] 表 {table_name} -> 重分区数: {target_p}") df.repartition(target_p) \ .write \ .mode("overwrite") \ .insertInto(table_name)

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询