眉山市网站建设_网站建设公司_Bootstrap_seo优化
2026/1/11 22:02:45 网站建设 项目流程

1)如何准备 Python 虚拟环境(venv.zip)

场景

你本地跑 PyFlink 没问题,但一提交到远程集群就报:

  • ModuleNotFoundError
  • Python 版本不对
  • pandas/pyarrow/apache-beam 版本不匹配

根因几乎都是:集群机器上 Python 环境与你本地不一致。最佳做法是把可运行的 Python 环境“打包随任务走”。

官方便捷脚本(Linux/macOS)

shsetup-pyflink-virtual-env.sh2.2.0

含义:按指定 PyFlink 版本,准备一套可用的 Python 虚拟环境压缩包(通常输出venv.zip)。

本地执行(Local)

sourcevenv/bin/activate python xxx.py

集群执行(Cluster:核心是 add_python_archive + set_python_executable)

# 1) 上传/分发 venv.zip(会在 worker 端解压到工作目录)table_env.add_python_archive("venv.zip")# 2) 指定 worker 端用哪个 python 解释器跑 UDFtable_env.get_config().set_python_executable("venv.zip/venv/bin/python")

易错点(一定写进博客)

  • add_python_archive("venv.zip")解压后的目录名通常就是venv.zip/...(除非你指定了 target_dir)
  • set_python_executable(...)必须用相对路径指向 worker 工作目录下的 python
  • 如果你的集群是 Linux,venv 也必须在 Linux 上构建;不要在 Windows 打包 venv.zip 给 Linux 用

2)如何添加 Jar(Connector / Java UDF 等)

什么时候需要

只要你用了任何 Java/Scala 侧实现的东西,基本都要 jar,例如:

  • Kafka / JDBC / Elasticsearch / Hudi / Iceberg 连接器
  • 各种 format(json、avro、protobuf…)
  • Java UDF、catalog 实现等

pipeline.jars:上传到集群

# 仅支持本地 file:// URL;多个 jar 用 ; 分隔table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

特点:

  • 会把 jar 作为 job 依赖上传/分发(更适合“任务自带依赖”)

pipeline.classpaths:加入 classpath(需客户端与集群都能访问)

table_env.get_config().set("pipeline.classpaths","file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

特点:

  • 更像“引用外部位置的 jar”
  • 要求 URL 在 client 和 cluster 都可访问,否则运行时找不到

推荐策略

  • 初学/本地/临时任务:优先pipeline.jars
  • 企业集群统一部署 jar:用pipeline.classpaths或集群侧统一配置(但要保证路径一致)

3)如何添加 Python 依赖文件(python.files / add_python_file)

场景

你的 UDF 在my_udf.py或者工具函数在某个目录myDir/utils/...,远程执行时找不到模块。

目录结构:

myDir ├── utils │ ├── __init__.py │ └── my_util.py

添加依赖:

table_env.add_python_file("myDir")defmy_udf():fromutilsimportmy_util

关键原则

  • 只要不是“main.py 同文件定义的函数”,就强烈建议用python.files/add_python_file进行分发
  • 避免远程 worker 报ModuleNotFoundError

4)Mini Cluster/IDE 本地运行为什么“没输出”?

根因

很多 API 是异步提交

  • Table API:execute_sql(...)StatementSet.execute()
  • DataStream:execute_async(...)

如果你在 IDE/mini cluster 里运行,主进程提前退出,任务还没跑完,就看不到结果。

Table API:必须 wait

t_result=table_env.execute_sql("INSERT INTO ...")t_result.wait()

DataStream:必须 result()

job_client=stream_execution_env.execute_async("My DataStream Job")job_client.get_job_execution_result().result()

非常重要的提醒

  • 远程集群(YARN / K8s / standalone detach)通常不需要 wait
  • 你如果保留.wait(),可能会导致客户端一直阻塞,看起来像“卡住”

一页速记(放文末)

  • 打包 Python 环境:add_python_archive(venv.zip)+set_python_executable(venv.zip/venv/bin/python)
  • 带 jar 依赖:pipeline.jars(上传)优先,pipeline.classpaths(引用)谨慎
  • 带 Python 代码:add_python_file(dir_or_file),否则远程很容易 ModuleNotFound
  • IDE/mini cluster 没输出:异步 API 要.wait()/.result();远程提交记得删掉等待逻辑

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询