1. Vectorized UDF 是什么:Arrow 列式批传输 + Pandas 计算
向量化 UDF 的执行方式是:
1)Flink 把输入数据按 batch 切分
2)每个 batch 转为 Arrow columnar format 在 JVM 与 Python VM 之间传递
3)Python 侧把 batch 转为pandas.Series(标量函数)或pandas.Series列集合(聚合函数)
4)你的函数对整批数据向量化计算,返回结果
因此相对逐行 UDF,向量化 UDF 通常更快,原因是:
- 批量传输:减少 JVM/Python 往返次数
- 列式传输:减少反序列化成本
- Pandas/Numpy:底层实现优化,向量化运算效率高
前置要求(文档强调):
- Python 版本:3.9 / 3.10 / 3.11 / 3.12
- 客户端与集群侧都要安装 PyFlink(否则 Python UDF 无法执行)
2. 向量化标量函数:pandas.Series → pandas.Series(长度必须一致)
2.1 规则
- 输入:一个或多个
pandas.Series - 输出:一个
pandas.Series,长度必须与输入 batch 一致 - 使用方式:与普通 scalar UDF 一样,只要在 decorator 里加
func_type="pandas"
2.2 示例:两列相加(Table API 与 SQL 都能用)
frompyflink.tableimportTableEnvironment,EnvironmentSettingsfrompyflink.table.expressionsimportcolfrompyflink.table.udfimportudf@udf(result_type='BIGINT',func_type="pandas")defadd(i,j):returni+j settings=EnvironmentSettings.in_batch_mode()table_env=TableEnvironment.create(settings)# Table APImy_table.select(add(col("bigint"),col("bigint")))# SQLtable_env.create_temporary_function("add",add)table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")2.3 batch 大小怎么调:python.fn-execution.arrow.batch.size
Flink 会把输入切成 batch 再调用 UDF,batch size 由配置项控制:
python.fn-execution.arrow.batch.size
经验建议(不写玄学参数,只讲原则):
- batch 太小:函数调用次数多,开销大
- batch 太大:单次内存占用变大,容易 GC 或 OOM(尤其是字符串/复杂类型)
3. 向量化聚合函数(Pandas UDAF):pandas.Series → 单个标量
3.1 规则与限制(这是生产最容易踩坑的地方)
输入:一列或多列
pandas.Series输出:单个标量值
重要限制:
- 返回类型暂不支持
RowType和MapType - 不支持 partial aggregation(部分聚合)
- 执行时一个 group/window 的数据会一次性加载到内存:
必须确保单个 group/window 数据能放进内存
- 返回类型暂不支持
适用范围(文档列出):
- GroupBy Aggregation(Batch)
- GroupBy Window Aggregation(Batch + Stream)
- Over Window Aggregation(Batch + Stream 的 bounded over window)
3.2 示例:mean_udaf(GroupBy / Window / Over)
frompyflink.tableimportTableEnvironment,EnvironmentSettingsfrompyflink.table.expressionsimportcol,litfrompyflink.table.udfimportudaffrompyflink.table.windowimportTumble@udaf(result_type='FLOAT',func_type="pandas")defmean_udaf(v):returnv.mean()settings=EnvironmentSettings.in_batch_mode()table_env=TableEnvironment.create(settings)# my_table schema: [a: String, b: BigInt, c: BigInt, rowtime: ...]my_table=...# 1) GroupBymy_table.group_by(col('a')).select(col('a'),mean_udaf(col('b')))# 2) Tumble Windowtumble_window=(Tumble.over(lit(1).hours).on(col("rowtime")).alias("w"))my_table.window(tumble_window)\.group_by(col("w"))\.select(col('w').start,col('w').end,mean_udaf(col('b')))# 3) Over Window(bounded)table_env.create_temporary_function("mean_udaf",mean_udaf)table_env.sql_query(""" SELECT a, mean_udaf(b) OVER (PARTITION BY a ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM MyTable """)4. 五种定义 Pandas UDAF 的方式:从最简单到最工程化
文档给了一个统一目标:输入两列 bigint,返回i.max() + j.max()。下面是五种常见写法。
4.1 方式 1:继承 AggregateFunction(可在 open() 里加 metrics 等)
适合:你要做指标、缓存、参数读取、复杂逻辑封装。
frompyflink.table.udfimportAggregateFunction,udafclassMaxAdd(AggregateFunction):defopen(self,function_context):mg=function_context.get_metric_group()self.counter=mg.add_group("key","value").counter("my_counter")self.counter_sum=0defcreate_accumulator(self):return[]defaccumulate(self,accumulator,*args):result=0forarginargs:result+=arg.max()accumulator.append(result)defget_value(self,accumulator):self.counter.inc(10)self.counter_sum+=10returnaccumulator[0]max_add=udaf(MaxAdd(),result_type='BIGINT',func_type="pandas")4.2 方式 2:装饰器函数(最常用、最清爽)
frompyflink.table.udfimportudaf@udaf(result_type='BIGINT',func_type="pandas")defmax_add(i,j):returni.max()+j.max()4.3 方式 3:lambda(小 demo 可用,生产不建议写复杂逻辑)
max_add=udaf(lambdai,j:i.max()+j.max(),result_type='BIGINT',func_type="pandas")4.4 方式 4:callable 对象(适合带状态但又不想继承基类)
classCallableMaxAdd(object):def__call__(self,i,j):returni.max()+j.max()max_add=udaf(CallableMaxAdd(),result_type='BIGINT',func_type="pandas")4.5 方式 5:partial(把常量参数“固化”进去)
importfunctoolsfrompyflink.table.udfimportudafdefpartial_max_add(i,j,k):returni.max()+j.max()+k max_add=udaf(functools.partial(partial_max_add,k=1),result_type='BIGINT',func_type="pandas")5. 生产落地的“关键避坑点”
5.1 Pandas UDAF 的内存风险:group/window 太大会炸
因为:
- 不支持 partial aggregation
- 一个 group/window 的数据会一次性加载到内存
所以如果你的 key 高基数但存在“超级大 key”(热点 key),Pandas UDAF 很容易把某个 Task 的内存顶爆。
应对策略(原则级):
- 避免在 Pandas UDAF 上做可能出现超大分组的计算
- 对热点 key 做预聚合/分桶(如果业务允许)
- 对窗口长度、数据倾斜要有监控与保护(例如先做过滤、采样评估)
5.2 返回类型限制:暂不支持 RowType / MapType
很多人想让 Pandas UDAF 返回多个指标(例如 mean+max+min),但文档明确说return type 不支持 RowType/MapType(至少“目前”不支持)。这种情况通常有两种做法:
- 拆成多个 UDAF(mean_udaf、max_udaf…)
- 或者先 pandas 侧算出多个标量,再在 Table/SQL 层组合(视版本能力而定)
5.3 标量函数必须返回等长 Series
向量化标量 UDF 的输出 Series 长度必须与输入 batch 一致,否则结果对不齐,会直接报错或产生不可用结果。
6. 最佳实践:什么时候该用向量化 UDF?
优先用 Pandas UDF 的场景:
- 纯计算/数值处理明显多于 JVM↔Python 往返开销
- 适合向量化(Series 级别运算能替代 for 循环)
- 需要利用 Pandas/Numpy 的生态能力(rolling、统计、向量操作等)
慎用或避免的场景:
- 超大 group/window 的聚合(Pandas UDAF 内存压力)
- 需要返回复杂结构(Row/Map)作为聚合结果
- 逻辑高度分支、逐行差异巨大,向量化收益不明显