【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现
- 开源代码
- 2025-09-11 12:15:02

【flink应用系列】1.Flink银行反欺诈系统设计方案 1. 经典案例:短时间内多次大额交易1.1 场景描述1.2 风险判定逻辑 2. 使用Flink实现2.1 实现思路2.2 代码实现2.3 使用Flink流处理 3. 使用Flink CEP实现3.1 实现思路3.2 代码实现 4. 总结 1. 经典案例:短时间内多次大额交易 1.1 场景描述
规则1:单笔交易金额超过10,000元。
规则2:同一用户在10分钟内进行了3次或更多次交易。
风险行为:同时满足规则1和规则2的交易行为。
1.2 风险判定逻辑检测每笔交易是否满足“单笔交易金额超过10,000元”。
对同一用户,统计10分钟内的交易次数。
如果交易次数达到3次或更多,则判定为风险行为。
2. 使用Flink实现 2.1 实现思路使用Flink的KeyedStream按用户分组。
使用ProcessFunction实现自定义窗口逻辑,统计10分钟内的交易次数。
结合规则1和规则2,判断是否为风险行为。
2.2 代码实现 // 定义交易数据POJO public class Transaction { private String transactionId; private String userId; private Double amount; private Long timestamp; // getters and setters } // 定义风控结果POJO public class RiskResult { private String userId; private String transactionId; private String riskLevel; private String actionTaken; private Long createTime; // getters and setters } // 实现风控逻辑 public class FraudDetectionProcessFunction extends KeyedProcessFunction<String, Transaction, RiskResult> { private transient ValueState<Integer> transactionCountState; private transient ValueState<Long> timerState; @Override public void open(Configuration parameters) { // 初始化状态 ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>( "transactionCount", Types.INT ); transactionCountState = getRuntimeContext().getState(countDescriptor); ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>( "timerState", Types.LONG ); timerState = getRuntimeContext().getState(timerDescriptor); } @Override public void processElement( Transaction transaction, Context ctx, Collector<RiskResult> out) throws Exception { // 规则1:单笔交易金额超过10,000元 if (transaction.getAmount() > 10000) { // 更新交易次数 Integer count = transactionCountState.value(); if (count == null) { count = 0; } count += 1; transactionCountState.update(count); // 如果是第一次满足规则1,设置10分钟的定时器 if (count == 1) { long timer = ctx.timestamp() + 10 * 60 * 1000; // 10分钟 ctx.timerService().registerEventTimeTimer(timer); timerState.update(timer); } // 规则2:10分钟内交易次数达到3次 if (count >= 3) { RiskResult result = new RiskResult(); result.setUserId(transaction.getUserId()); result.setTransactionId(transaction.getTransactionId()); result.setRiskLevel("HIGH"); result.setActionTaken("ALERT"); result.setCreateTime(System.currentTimeMillis()); out.collect(result); } } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<RiskResult> out) throws Exception { // 定时器触发时,重置状态 transactionCountState.clear(); timerState.clear(); } } 2.3 使用Flink流处理java
DataStream<Transaction> transactionStream = env.addSource(transactionSource); DataStream<RiskResult> riskResultStream = transactionStream .keyBy(Transaction::getUserId) .process(new FraudDetectionProcessFunction()); riskResultStream.addSink(new AlertSink()); 3. 使用Flink CEP实现Flink CEP(Complex Event Processing)是Flink提供的复杂事件处理库,适合处理基于时间序列的模式匹配。以下是使用Flink CEP实现上述风控规则的示例。
3.1 实现思路定义模式:检测10分钟内3次或更多次大额交易。
使用Flink CEP的模式匹配功能,匹配符合条件的事件序列。
3.2 代码实现java
// 定义交易数据POJO public class Transaction { private String transactionId; private String userId; private Double amount; private Long timestamp; // getters and setters } // 定义风控结果POJO public class RiskResult { private String userId; private List<String> transactionIds; private String riskLevel; private String actionTaken; private Long createTime; // getters and setters } // 实现风控逻辑 public class FraudDetectionCEP { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 交易数据流 DataStream<Transaction> transactionStream = env.addSource(transactionSource) .assignTimestampsAndWatermarks( WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()) ); // 按用户分组 KeyedStream<Transaction, String> keyedStream = transactionStream .keyBy(Transaction::getUserId); // 定义CEP模式:10分钟内3次或更多次大额交易 Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first") .where(new SimpleCondition<Transaction>() { @Override public boolean filter(Transaction transaction) { return transaction.getAmount() > 10000; } }) .next("second") .where(new SimpleCondition<Transaction>() { @Override public boolean filter(Transaction transaction) { return transaction.getAmount() > 10000; } }) .next("third") .where(new SimpleCondition<Transaction>() { @Override public boolean filter(Transaction transaction) { return transaction.getAmount() > 10000; } }) .within(Time.minutes(10)); // 应用模式 PatternStream<Transaction> patternStream = CEP.pattern(keyedStream, pattern); // 生成风控结果 DataStream<RiskResult> riskResultStream = patternStream.process( new PatternProcessFunction<Transaction, RiskResult>() { @Override public void processMatch( Map<String, List<Transaction>> match, Context ctx, Collector<RiskResult> out) throws Exception { RiskResult result = new RiskResult(); result.setUserId(match.get("first").get(0).getUserId()); result.setTransactionIds( match.values().stream() .flatMap(List::stream) .map(Transaction::getTransactionId) .collect(Collectors.toList()) ); result.setRiskLevel("HIGH"); result.setActionTaken("ALERT"); result.setCreateTime(System.currentTimeMillis()); out.collect(result); } } ); // 输出结果 riskResultStream.addSink(new AlertSink()); env.execute("Fraud Detection with Flink CEP"); } } 4. 总结Flink实现:通过KeyedProcessFunction和状态管理实现多规则匹配。
Flink CEP实现:通过定义复杂事件模式,简化多规则匹配的逻辑。
适用场景:
Flink适合需要自定义逻辑的场景。
Flink CEP适合基于时间序列的模式匹配场景。
通过以上实现,可以高效检测银行交易中的风险行为,并根据需要扩展更多规则
【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现由讯客互联开源代码栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【Flink银行反欺诈系统设计方案】1.短时间内多次大额交易场景的flink与cep的实现”
下一篇
策略模式的C++实现示例