1. Flink UDF 一共有哪几种?
Flink 当前主要区分这些函数类型:
- ScalarFunction:一行进,一值出
- AsyncScalarFunction:一行进,一值出(异步)
- TableFunction(UDTF):一行进,多行出
- AsyncTableFunction:一行进,多行出(异步)
- AggregateFunction(UDAGG):多行进,一值出(聚合)
- TableAggregateFunction(UDTAGG):多行进,多行出(表聚合)
- Process Table Function(PTF):表进表出,带状态/定时器/变更流能力的“自定义算子”
你可以把它理解成两条主线:
- 行级:Scalar / UDTF(输入是“行/字段”)
- 聚合级:UDAGG / UDTAGG(输入是“多行”)
- 算子级:PTF(直接造一个“可持久化状态 + 定时器”的表算子)
2. 先把共性讲清楚:UDF 的基本实现原则
2.1 函数类要求
- 必须继承对应基类(如
ScalarFunction、AggregateFunction等) - 类必须
public、不能是匿名类/非 static 内部类 - 如果要放到持久化 catalog 中:需要默认构造器,运行期可实例化
2.2 运行时真正会被调用的是“评价方法”
不同函数类型方法名不同:
- Scalar:
eval(...) - UDAGG:
createAccumulator()+accumulate(...)+getValue(...) - UDTAGG:
createAccumulator()+accumulate(...)+emitValue(...)或emitUpdateWithRetract(...) - Async:
eval(CompletableFuture<...>, ...)(第一个参数永远是 future)
这些方法必须:
public- 非
static - 名字必须严格匹配(因为运行时是 codegen 调用)
2.3 类型推断(Type Inference)
默认通过反射从方法签名推断参数/返回类型;不够时用:
@DataTypeHint@FunctionHint
或直接 overridegetTypeInference()做高级逻辑(例如“用字符串字面量决定返回类型”)。
2.4 可重载、可变参、继承入参
你可以写多个eval/accumulate重载,也可以 varargs,但注意:
- Scala 写 varargs 要加
@scala.annotation.varargs - 推荐用 boxed primitives(
Integer而不是int)以支持 NULL
3. ScalarFunction:最常用的“字段加工厂”
3.1 适用场景
- 清洗字段、格式化、截断、脱敏、哈希、拼接、规则映射
- 只依赖当前行,不需要跨行状态
3.2 示例:Substring
importorg.apache.flink.table.functions.ScalarFunction;publicstaticclassSubstringFunctionextendsScalarFunction{publicStringeval(Strings,Integerbegin,Integerend){returns.substring(begin,end);}}调用方式(Table API & SQL):
// inlineenv.from("MyTable").select(call(SubstringFunction.class,$("myField"),5,12));// registerenv.createTemporarySystemFunction("SubstringFunction",SubstringFunction.class);env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");3.3 小技巧:通配符参数 $(“*”)
当你想把整行所有字段都塞给函数:
publicstaticclassMyConcatFunctionextendsScalarFunction{publicStringeval(Object...fields){returnArrays.stream(fields).map(Object::toString).collect(Collectors.joining(","));}}env.from("MyTable").select(call(MyConcatFunction.class,$("*")));4. AsyncScalarFunction:把“慢 I/O”从主线程里挪走
4.1 适用场景
- 外部维表查询、HTTP/RPC、数据库查询、缓存 miss 回源
- 典型瓶颈是等待(而不是 CPU)
4.2 核心点
eval第一个参数必须是CompletableFuture<R>- 通过线程池/回调完成 future
- Flink 保证输出顺序与输入顺序一致(即使完成顺序不同)
importorg.apache.flink.table.functions.AsyncScalarFunction;importjava.util.concurrent.*;publicstaticclassBeverageNameLookupFunctionextendsAsyncScalarFunction{privatetransientExecutorexecutor;@Overridepublicvoidopen(FunctionContextcontext){executor=Executors.newFixedThreadPool(10);}publicvoideval(CompletableFuture<String>future,IntegerbeverageId){executor.execute(()->{try{Thread.sleep(1000);}catch(InterruptedExceptionignored){}switch(beverageId){case0:future.complete("Latte");break;case1:future.complete("Cappuccino");break;default:future.completeExceptionally(newIllegalArgumentException("Bad id"));}});}}常用配置:
table.exec.async-scalar.max-concurrent-operationstable.exec.async-scalar.timeouttable.exec.async-scalar.retry-strategy / retry-delay / max-attempts
5. TableFunction(UDTF):一行拆成多行
5.1 适用场景
- 分词、拆分数组/字符串、解析 JSON 数组、生成多条明细
- 类似 SQL 的“explode / lateral view”
5.2 示例:SplitFunction
importorg.apache.flink.table.functions.TableFunction;importorg.apache.flink.types.Row;publicstaticclassSplitFunctionextendsTableFunction<Row>{publicvoideval(Stringstr){for(Strings:str.split(" ")){collect(Row.of(s,s.length()));}}}Table API 调用(LATERAL JOIN):
env.from("MyTable").joinLateral(call(SplitFunction.class,$("myField"))).select($("myField"),$("word"),$("length"));SQL 调用:
SELECTmyField,word,lengthFROMMyTable,LATERALTABLE(SplitFunction(myField));6. AsyncTableFunction:异步的“多行输出”
它和 AsyncScalarFunction 的差异只有一个:结果是Collection<T>(多条),其余理念一致:
importorg.apache.flink.table.functions.AsyncTableFunction;importjava.util.concurrent.*;publicstaticclassBackgroundFunctionextendsAsyncTableFunction<Long>{privateExecutorexecutor;@Overridepublicvoidopen(FunctionContextcontext){executor=Executors.newFixedThreadPool(10);}publicvoideval(CompletableFuture<Collection<Long>>future,IntegerwaitMax){executor.execute(()->{ArrayList<Long>result=newArrayList<>();result.add(1L);result.add(2L);future.complete(result);});}}7. UDAGG(AggregateFunction):多行进,一值出,核心是 accumulator
7.1 为什么一定要 accumulator?
因为聚合不是一次算完,而是“边来边算”。Flink 会为每个 group/window 创建一个 accumulator,输入每来一行就accumulate(...)更新它,最终getValue(...)产出结果。
用 MAX(price) 来理解最直观(你给的图就是这个过程):
createAccumulator()造一个空状态- 每条 price 调用
accumulate(acc, price) - 最后
getValue(acc)返回最大值
7.2 示例:WeightedAvg(包含 retract/merge/reset)
importorg.apache.flink.table.functions.AggregateFunction;publicstaticclassWeightedAvgAccumulator{publiclongsum=0;publicintcount=0;}publicstaticclassWeightedAvgextendsAggregateFunction<Long,WeightedAvgAccumulator>{@OverridepublicWeightedAvgAccumulatorcreateAccumulator(){returnnewWeightedAvgAccumulator();}@OverridepublicLonggetValue(WeightedAvgAccumulatoracc){returnacc.count==0?null:acc.sum/acc.count;}publicvoidaccumulate(WeightedAvgAccumulatoracc,Longvalue,Integerweight){acc.sum+=value*weight;acc.count+=weight;}publicvoidretract(WeightedAvgAccumulatoracc,Longvalue,Integerweight){acc.sum-=value*weight;acc.count-=weight;}publicvoidmerge(WeightedAvgAccumulatoracc,Iterable<WeightedAvgAccumulator>it){for(WeightedAvgAccumulatora:it){acc.sum+=a.sum;acc.count+=a.count;}}publicvoidresetAccumulator(WeightedAvgAccumulatoracc){acc.sum=0L;acc.count=0;}}7.3 什么时候必须实现 retract / merge?
retract(...):OVER 聚合、需要更新撤回语义时经常需要merge(...):两阶段聚合优化、session window 等场景常常是必须项
一句话:你不实现,很多算子/优化就用不了或直接报错。
8. UDTAGG(TableAggregateFunction):多行进,多行出,比如 TOP2 / TOPN
UDTAGG 的 accumulator 一样是状态核心,只是输出不是单值,而是通过emitValue(...)(或更高效的emitUpdateWithRetract(...))把结果“吐”出去。
你给的 TOP2 图非常典型:
- accumulate 过程中不断维护 first/second
- emitValue 时输出两行: (max, rank=1)、(second, rank=2)
8.1 TOP2 示例
importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.table.functions.TableAggregateFunction;importorg.apache.flink.util.Collector;publicstaticclassTop2Accumulator{publicIntegerfirst;publicIntegersecond;}publicstaticclassTop2extendsTableAggregateFunction<Tuple2<Integer,Integer>,Top2Accumulator>{@OverridepublicTop2AccumulatorcreateAccumulator(){Top2Accumulatoracc=newTop2Accumulator();acc.first=Integer.MIN_VALUE;acc.second=Integer.MIN_VALUE;returnacc;}publicvoidaccumulate(Top2Accumulatoracc,Integervalue){if(value>acc.first){acc.second=acc.first;acc.first=value;}elseif(value>acc.second){acc.second=value;}}publicvoidmerge(Top2Accumulatoracc,Iterable<Top2Accumulator>it){for(Top2Accumulatorother:it){accumulate(acc,other.first);accumulate(acc,other.second);}}publicvoidemitValue(Top2Accumulatoracc,Collector<Tuple2<Integer,Integer>>out){if(acc.first!=Integer.MIN_VALUE)out.collect(Tuple2.of(acc.first,1));if(acc.second!=Integer.MIN_VALUE)out.collect(Tuple2.of(acc.second,2));}}8.2 为什么 emitUpdateWithRetract 更适合流式 TOPN?
emitValue()每次都把 TOPN 全量输出一遍,流式场景更新频繁时会很“吵”。emitUpdateWithRetract()可以:
- 先 retract 老值(update-before)
- 再 collect 新值(update-after)
只输出增量更新,吞吐和下游压力都更好。
重要提醒:不要在emitUpdateWithRetract里修改 accumulator(这是很多人踩坑点)。
9. Determinism、Constant Folding 与 Named Parameters:容易忽略但很关键
9.1 isDeterministic()
- 默认 true
- 不是纯函数(随机、时间、外部状态)就返回 false
它会影响:是否允许规划期做常量折叠(constant folding),以及是否每条记录都要算。
9.2 supportsConstantFolding()
- 默认允许常量折叠
- 有副作用的函数建议关闭,否则你以为“运行时会执行”,结果在 plan 阶段就被算成常量了。
9.3 Named Parameters(命名参数)
可以用@ArgumentHint定义参数名、可选性、类型;SQL 里用param1 => xxx调用更易读。
限制:类里不能有重载方法/可变参,否则命名参数会报错(这点很容易踩雷)。
10. PTF(Process Table Function):UDF 的“终极形态”
如果你觉得:
- Scalar/UDTF 只能处理单行
- UDAGG/UDTAGG 只能做聚合
但你想做的是: - 带状态的规则引擎
- 复杂事件处理(CEP-like)
- 等待/超时/同步(需要 timer)
- 自定义窗口/缓冲/回溯读取
那 PTF 就是你的舞台。
PTF 的定位可以理解为:在 Table 世界里实现“自定义算子”,它能访问:
- Flink managed state
- event-time / processing-time timers
- 表的 changelog 语义(insert/update/delete)
一句话总结:PTF 是把 DataStream 的 RichFunction + Keyed State + TimerService 能力“搬进”Table 生态的方案。
11. 选型速查:到底用哪一个?
- 只是字段转换:ScalarFunction
- 慢 I/O enrich:AsyncScalarFunction
- 一行拆多行:TableFunction(UDTF)
- 异步拆多行:AsyncTableFunction
- 多行聚合成一个值:UDAGG
- 多行聚合成多行(TOPN、分位数输出多段等):UDTAGG
- 需要状态 + 定时器 + 更复杂的表级处理:PTF
12. 实战建议:写 UDF 更“稳”的几个原则
- 先想清楚你需要的语义:append-only 还是 retract/update?决定你要不要实现 retract / emitUpdateWithRetract
- accumulator 结构要小而稳:别把大集合直接塞普通对象字段,必要时用 DataView(ListView/MapView)
- Async 函数要控制并发与超时:并发太大 = 外部系统雪崩,超时不控 = 任务卡死
- 对外部依赖要做降级:completeExceptionally + 合理 retry 策略
- 注册方式优先用 class(无参构造),实例方式适合交互调试或需要参数化构造的场景