1. 配置入口:DataStream vs Table API
1.1 DataStream API:用 Configuration 创建 env
frompyflink.commonimportConfigurationfrompyflink.datastreamimportStreamExecutionEnvironment config=Configuration()config.set_integer("python.fn-execution.bundle.size",1000)config.set_integer("python.fn-execution.arrow.batch.size",1000)env=StreamExecutionEnvironment.get_execution_environment(config)特点:
- 更推荐用于混用 DataStream + Table 的场景(官方也强调:混用时优先用 DataStream API 的方式配置依赖/参数,覆盖面更完整)
1.2 Table API:t_env.get_config().set 或 EnvironmentSettings.with_configuration
frompyflink.tableimportTableEnvironment,EnvironmentSettings t_env=TableEnvironment.create(EnvironmentSettings.in_streaming_mode())t_env.get_config().set("python.fn-execution.bundle.size","1000")或:
frompyflink.commonimportConfigurationfrompyflink.tableimportTableEnvironment,EnvironmentSettings config=Configuration()config.set_string("python.fn-execution.bundle.size","1000")env_settings=EnvironmentSettings.new_instance()\.in_streaming_mode()\.with_configuration(config)\.build()t_env=TableEnvironment.create(env_settings)2. 你最该关注的 8 个配置项(调优优先级从高到低)
下面是 PyFlink 里“最常用、最有效、最容易踩坑”的配置项组合。
2.1 bundle.size / bundle.time:吞吐 vs 延迟的总开关
python.fn-execution.bundle.size(默认 1000)- 越大:吞吐更高(函数调用次数更少),但占用更多内存、延迟更高
python.fn-execution.bundle.time(默认 1000ms)- 越小:尾延迟更低(更快 flush),但吞吐可能下降
经验建议:
- 偏吞吐(批处理/离线):bundle.size 2000~10000,bundle.time 1000~5000ms
- 偏低延迟(实时):bundle.size 200~1000,bundle.time 50~300ms
2.2 arrow.batch.size:Pandas/Arrow 向量化的核心旋钮
python.fn-execution.arrow.batch.size(默认 1000)- 文档明确:arrow.batch.size 不应超过 bundle.size,否则会被 bundle.size “压住”。
经验建议:
- 你用了 Pandas UDF/向量化:arrow.batch.size 512/1000/2048 逐级试
- 你没用 Pandas:这个影响不大,保持默认即可
2.3 python.execution-mode:PROCESS vs THREAD(性能与兼容性)
python.execution-mode:process(默认) /threadTHREAD 目的是减少进程间通信与序列化开销,但:
- 会受 GIL 影响
- 很多场景会自动回退到 process
- Table API 中:Python UDAF / Pandas UDF&UDAF 等不支持 THREAD(你前面贴过支持矩阵)
经验建议:
- 追求“稳”:先 process
- 明确知道自己只是用基础 Map/FlatMap/Filter(DataStream)或普通 Python UDF(Table),再试 thread
- 线上一定要通过日志/metrics确认是否回退
2.4 python.fn-execution.memory.managed:Python Worker 用哪块内存
python.fn-execution.memory.managed(默认 true)- true:Python worker 使用 task slot 的managed memory 预算
- false:走 task off-heap,需要你配置
taskmanager.memory.task.off-heap.size
经验建议:
- 没特殊理由,保持 true
- 你遇到 Python worker 内存被挤爆或 OOM,才考虑配合 slot 资源与 off-heap 做更细粒度隔离
2.5 python.operator-chaining.enabled:算子链(性能常用大招)
- 默认 true:非 shuffle 的 Python 算子会链起来,减少序列化/反序列化
- 关闭 chaining:通常用于某个算子输出爆炸(flat_map)导致链路不均衡,或需要不同并行度/slot group
经验建议:
默认开
出现:
- 某个 flat_map 产出极多、导致下游算子背压异常
- 或者你想让某段逻辑独立扩容
再考虑关 chaining 或用start_new_chain/disable_chaining
2.6 python.metric.enabled:指标开关(极端性能场景用)
- 默认 true
- 关掉可以减轻一些开销(通常不是第一优先级)
经验建议:
- 正常保持 true
- 你在极限压测、且 Python 指标采集确实成为瓶颈时再关
2.7 python.profile.enabled:Python worker profiling
- 默认 false
- 打开后会周期性输出 profiling 结果到 TaskManager 日志,周期受 bundle.size/time 影响
经验建议:
- 调优/排障期打开
- 生产长期打开要谨慎(日志量 + 一定开销)
2.8 python.systemenv.enabled:是否加载系统环境变量
- 默认 true
- 你需要更“干净”的 worker 环境(避免系统 env 干扰)时可关
3. 依赖类配置:python.files / python.archives / python.requirements / python.executable
这 4 个经常一起用,作用完全不同:
python.files:把.py/.zip/.whl/目录加到 worker 的 PYTHONPATH(常用于你自己的代码包)python.archives:上传并解压归档(zip/tar),常用于模型文件/数据文件/虚拟环境python.requirements:requirements.txt(可加离线 wheel 缓存目录),worker 侧 pip installpython.executable:指定 worker 使用哪个 Python(支持指向 archive 内的解释器路径)python.client.executable:客户端(提交端)用于解析 Python UDF 的解释器
线上强烈建议的组合:
- 业务代码:python.files
- 第三方包:python.requirements + cached_dir(离线部署时尤其重要)
- venv/模型:python.archives
- worker python:python.executable 指向 venv 的 python
4. 三套“可直接抄”的配置模板
4.1 实时低延迟(更快 flush、较小批)
适合:在线计算、延迟敏感、单条处理快
frompyflink.commonimportConfigurationfrompyflink.datastreamimportStreamExecutionEnvironment config=Configuration()config.set_string("python.execution-mode","process")config.set_integer("python.fn-execution.bundle.size",300)config.set_integer("python.fn-execution.bundle.time",100)# msconfig.set_integer("python.fn-execution.arrow.batch.size",300)# <= bundle.sizeconfig.set_boolean("python.operator-chaining.enabled",True)env=StreamExecutionEnvironment.get_execution_environment(config)4.2 高吞吐批处理(更大 bundle/批)
适合:离线、吞吐优先、可接受更高延迟
frompyflink.commonimportConfigurationfrompyflink.datastreamimportStreamExecutionEnvironment config=Configuration()config.set_string("python.execution-mode","process")config.set_integer("python.fn-execution.bundle.size",5000)config.set_integer("python.fn-execution.bundle.time",2000)config.set_integer("python.fn-execution.arrow.batch.size",2048)config.set_boolean("python.fn-execution.memory.managed",True)env=StreamExecutionEnvironment.get_execution_environment(config)4.3 THREAD 模式尝鲜(只推荐在“确定支持”的作业)
适合:DataStream 里基础算子为主、UDF 逻辑不重、瓶颈在进程通信
frompyflink.commonimportConfigurationfrompyflink.datastreamimportStreamExecutionEnvironment config=Configuration()config.set_string("python.execution-mode","thread")config.set_integer("python.fn-execution.bundle.size",1000)config.set_integer("python.fn-execution.bundle.time",500)env=StreamExecutionEnvironment.get_execution_environment(config)注意:如果你用到了 THREAD 不支持的点,最终会回退到 process(务必验证)。
5. 一个很实用的调参顺序(不走弯路)
- 先确保类型信息/序列化没坑(DataStream 的 output_type、Table 的 changelog sink 能接住)
- 调
bundle.size(吞吐) +bundle.time(延迟) - 如果用 Pandas/Arrow:再调
arrow.batch.size - 确认内存是否稳定:必要时考虑 managed/off-heap 预算
- 最后再考虑
thread、metric.enabled、chaining 等“更偏工程化”的选项