项目地址:spline-spark-agent
案例分析:za.co.absa.spline.harvester.postprocessing.PostProcessor
/**
 * A post-processing filter with one callback per entity type (execution event,
 * execution plan, and read/write/data operation).
 *
 * Every callback takes the entity together with a `HarvestingContext` and returns an
 * entity of the same type, which is what lets `PostProcessor` chain several filters.
 */
trait PostProcessingFilter extends NamedEntity {
  def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent
  def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan
  def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation
  def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation
  def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation
}

/**
 * Runs an entity through every filter in `filters`, in sequence order, passing the same
 * `ctx` to each filter call.
 *
 * @param filters filters to apply; an empty sequence leaves the entity unchanged
 * @param ctx     context handed as the second argument to every filter callback
 */
class PostProcessor(filters: Seq[PostProcessingFilter], ctx: HarvestingContext) {

  /**
   * Binds `ctx` to a binary callback, turning `(E, HarvestingContext) => E` into `E => E`.
   * The returned function closes over this instance's `ctx`.
   */
  private def provideCtx[E](f: (E, HarvestingContext) => E): E => E =
    (e: E) => f(e, ctx)

  /**
   * Builds a single `E => E` that applies, for every filter, the callback selected by `f`.
   *
   * @param f given a filter, returns the callback to run (e.g. `_.processExecutionEvent`)
   * @tparam E type of the entity being processed
   * @return the composition of all selected callbacks, or `identity` when `filters` is empty
   */
  private def chainFilters[E](f: PostProcessingFilter => (E, HarvestingContext) => E): E => E =
    filters
      // Seq[PostProcessingFilter] -> Seq[(E, HarvestingContext) => E]: pick each filter's callback.
      .map(f(_))
      // -> Seq[E => E]: bind ctx so that every step takes only the entity.
      .map(provideCtx)
      // -> Option[E => E]: compose left to right, so the first filter runs first;
      // yields None when there are no filters.
      .reduceOption(_.andThen(_))
      // -> E => E: with no filters the entity is returned as is.
      .getOrElse(identity)

  /**
   * Applies every filter's `processExecutionEvent` to `event` and returns the result.
   *
   * `_.processExecutionEvent` is placeholder syntax for
   * `filter => filter.processExecutionEvent`; the two-argument method is converted to a
   * function value of type `(ExecutionEvent, HarvestingContext) => ExecutionEvent`, which
   * fixes `E` to `ExecutionEvent`. `chainFilters` then returns an
   * `ExecutionEvent => ExecutionEvent` that is applied to `event` right away.
   */
  def process(event: ExecutionEvent): ExecutionEvent =
    chainFilters(_.processExecutionEvent)(event)
}