新聞中心
本文轉(zhuǎn)載自微信公眾號(hào)「五分鐘學(xué)大數(shù)據(jù)」,作者五分鐘學(xué)大數(shù)據(jù)。轉(zhuǎn)載本文請(qǐng)聯(lián)系五分鐘學(xué)大數(shù)據(jù)公眾號(hào)。

網(wǎng)站建設(shè)哪家好,找成都創(chuàng)新互聯(lián)!專(zhuān)注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、微信平臺(tái)小程序開(kāi)發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶(hù)創(chuàng)新互聯(lián)還提供了江夏免費(fèi)建站歡迎大家使用!
我們?cè)诳粗辈サ臅r(shí)候,不管對(duì)于主播還是用戶(hù)來(lái)說(shuō),非常重要的一項(xiàng)就是彈幕文化。為了增加直播趣味性和互動(dòng)性, 各大網(wǎng)絡(luò)直播平臺(tái)紛紛采用彈窗彈幕作為用戶(hù)實(shí)時(shí)交流的方式,內(nèi)容豐富且形式多樣的彈幕數(shù)據(jù)中隱含著復(fù)雜的用戶(hù)屬性與用戶(hù)行為, 研究并理解在線直播平臺(tái)用戶(hù)具有彈幕內(nèi)容審核與監(jiān)控、輿論熱點(diǎn)預(yù)測(cè)、個(gè)性化摘要標(biāo)注等多方面的應(yīng)用價(jià)值。
本文不分析彈幕數(shù)據(jù)的應(yīng)用價(jià)值,只通過(guò)彈幕內(nèi)容審核與監(jiān)控案例來(lái)了解下Flink CEP的概念及功能。
在用戶(hù)發(fā)彈幕時(shí),直播平臺(tái)主要實(shí)時(shí)監(jiān)控識(shí)別兩類(lèi)彈幕內(nèi)容:一類(lèi)是發(fā)布不友善彈幕的用戶(hù) ;一類(lèi)是刷屏的用戶(hù)。
我們先記住上述需要實(shí)時(shí)監(jiān)控識(shí)別的兩類(lèi)用戶(hù),接下來(lái)介紹Flink CEP的API,然后使用CEP解決上述問(wèn)題。
Flink CEP
Flink CEP 是什么
Flink CEP是一個(gè)基于Flink的復(fù)雜事件處理庫(kù),可以從多個(gè)數(shù)據(jù)流中發(fā)現(xiàn)復(fù)雜事件,識(shí)別有意義的事件(例如機(jī)會(huì)或者威脅),并盡快的做出響應(yīng),而不是需要等待幾天或則幾個(gè)月相當(dāng)長(zhǎng)的時(shí)間,才發(fā)現(xiàn)問(wèn)題。
Flink CEP API
CEP API的核心是Pattern(模式) API,它允許你快速定義復(fù)雜的事件模式。每個(gè)模式包含多個(gè)階段(stage)或者我們也可稱(chēng)為狀態(tài)(state)。從一個(gè)狀態(tài)切換到另一個(gè)狀態(tài),用戶(hù)可以指定條件,這些條件可以作用在鄰近的事件或獨(dú)立事件上。
介紹API之前先來(lái)理解幾個(gè)概念:
1. 模式與模式序列
- 簡(jiǎn)單模式稱(chēng)為模式,將最終在數(shù)據(jù)流中進(jìn)行搜索匹配的復(fù)雜模式序列稱(chēng)為模式序列,每個(gè)復(fù)雜模式序列是由多個(gè)簡(jiǎn)單模式組成。
- 匹配是一系列輸入事件,這些事件通過(guò)一系列有效的模式轉(zhuǎn)換,能夠訪問(wèn)復(fù)雜模式圖的所有模式。
- 每個(gè)模式必須具有唯一的名稱(chēng),我們可以使用模式名稱(chēng)來(lái)標(biāo)識(shí)該模式匹配到的事件。
2. 單個(gè)模式
一個(gè)模式既可以是單例的,也可以是循環(huán)的。單例模式接受單個(gè)事件,循環(huán)模式可以接受多個(gè)事件。
3. 模式示例:
有如下模式:a b+ c?d
其中a,b,c,d這些字母代表的是模式,+代表循環(huán),b+就是循環(huán)模式;?代表可選,c?就是可選模式;
所以上述模式的意思就是:a后面可以跟一個(gè)或多個(gè)b,后面再可選的跟c,最后跟d。
其中a、c? 、d是單例模式,b+是循環(huán)模式。
一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。
每個(gè)模式可以帶有一個(gè)或多個(gè)條件,這些條件是基于事件接收進(jìn)行定義的。或者說(shuō),每個(gè)模式通過(guò)一個(gè)或多個(gè)條件來(lái)匹配和接收事件。
了解完上述概念后,接下來(lái)介紹下案例中需要用到的幾個(gè)CEP API:
案例中用到的CEP API:
- Begin:定義一個(gè)起始模式狀態(tài)
用法:start = Pattern.<:Event>begin("start");
- Next:附加一個(gè)新的模式狀態(tài)。匹配事件必須直接接續(xù)上一個(gè)匹配事件
用法:next = start.next("next");
- Where:定義當(dāng)前模式狀態(tài)的過(guò)濾條件。僅當(dāng)事件通過(guò)過(guò)濾器時(shí),它才能與狀態(tài)匹配
用法:patternState.where(_.message == "TMD");
- Within: 定義事件序列與模式匹配的最大時(shí)間間隔。如果未完成的事件序列超過(guò)此時(shí)間,則將其丟棄
用法:patternState.within(Time.seconds(10));
- Times:一個(gè)給定類(lèi)型的事件出現(xiàn)了指定次數(shù)
用法:patternState.times(5);
API 先介紹以上這幾個(gè),接下來(lái)我們解決下文章開(kāi)頭提到的案例:
監(jiān)測(cè)用戶(hù)彈幕行為案例
案例一:監(jiān)測(cè)惡意用戶(hù)
規(guī)則:用戶(hù)如果在10s內(nèi),同時(shí)輸入 TMD 超過(guò)5次,就認(rèn)為用戶(hù)為惡意攻擊,識(shí)別出該用戶(hù)。
使用 Flink CEP 檢測(cè)惡意用戶(hù):
- import org.apache.flink.api.scala._
- import org.apache.flink.cep.PatternSelectFunction
- import org.apache.flink.cep.scala.{CEP, PatternStream}
- import org.apache.flink.cep.scala.pattern.Pattern
- import org.apache.flink.streaming.api.TimeCharacteristic
- import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
- import org.apache.flink.streaming.api.windowing.time.Time
- object BarrageBehavior01 {
- case class LoginEvent(userId:String, message:String, timestamp:Long){
- override def toString: String = userId
- }
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 使用IngestionTime作為EventTime
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 用于觀察測(cè)試數(shù)據(jù)處理順序
- env.setParallelism(1)
- // 模擬數(shù)據(jù)源
- val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
- List(
- LoginEvent("1", "TMD", 1618498576),
- LoginEvent("1", "TMD", 1618498577),
- LoginEvent("1", "TMD", 1618498579),
- LoginEvent("1", "TMD", 1618498582),
- LoginEvent("2", "TMD", 1618498583),
- LoginEvent("1", "TMD", 1618498585)
- )
- ).assignAscendingTimestamps(_.timestamp * 1000)
- //定義模式
- val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
- .where(_.message == "TMD")
- .times(5)
- .within(Time.seconds(10))
- //匹配模式
- val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)
- import scala.collection.Map
- val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> {
- val first = pattern.getOrElse("begin", null).iterator.next()
- (first.userId, first.timestamp)
- })
- //惡意用戶(hù),實(shí)際處理可將按用戶(hù)進(jìn)行禁言等處理,為簡(jiǎn)化此處僅打印出該用戶(hù)
- result.print("惡意用戶(hù)>>>")
- env.execute("BarrageBehavior01")
- }
- }
實(shí)例二:監(jiān)測(cè)刷屏用戶(hù)
規(guī)則:用戶(hù)如果在10s內(nèi),同時(shí)連續(xù)輸入同樣一句話超過(guò)5次,就認(rèn)為是惡意刷屏。
使用 Flink CEP檢測(cè)刷屏用戶(hù)
- object BarrageBehavior02 {
- case class Message(userId: String, ip: String, msg: String)
- def main(args: Array[String]): Unit = {
- //初始化運(yùn)行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- //設(shè)置并行度
- env.setParallelism(1)
- // 模擬數(shù)據(jù)源
- val loginEventStream: DataStream[Message] = env.fromCollection(
- List(
- Message("1", "192.168.0.1", "beijing"),
- Message("1", "192.168.0.2", "beijing"),
- Message("1", "192.168.0.3", "beijing"),
- Message("1", "192.168.0.4", "beijing"),
- Message("2", "192.168.10.10", "shanghai"),
- Message("3", "192.168.10.10", "beijing"),
- Message("3", "192.168.10.11", "beijing"),
- Message("4", "192.168.10.10", "beijing"),
- Message("5", "192.168.10.11", "shanghai"),
- Message("4", "192.168.10.12", "beijing"),
- Message("5", "192.168.10.13", "shanghai"),
- Message("5", "192.168.10.14", "shanghai"),
- Message("5", "192.168.10.15", "beijing"),
- Message("6", "192.168.10.16", "beijing"),
- Message("6", "192.168.10.17", "beijing"),
- Message("6", "192.168.10.18", "beijing"),
- Message("5", "192.168.10.18", "shanghai"),
- Message("6", "192.168.10.19", "beijing"),
- Message("6", "192.168.10.19", "beijing"),
- Message("5", "192.168.10.18", "shanghai")
- )
- )
- //定義模式
- val loginbeijingPattern = Pattern.begin[Message]("start")
- .where(_.msg != null) //一條登錄失敗
- .times(5).optional //將滿足五次的數(shù)據(jù)配對(duì)打印
- .within(Time.seconds(10))
- //進(jìn)行分組匹配
- val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)
- //查找符合規(guī)則的數(shù)據(jù)
- val loginbeijingResult: DataStream[Option[Iterable[Message]]] = loginbeijingDataPattern.select(patternSelectFun = (pattern: collection.Map[String, Iterable[Message]]) => {
- var loginEventList: Option[Iterable[Message]] = null
- loginEventList = pattern.get("start") match {
- case Some(value) => {
- if (value.toList.map(x => (x.userId, x.msg)).distinct.size == 1) {
- Some(value)
- } else {
- None
- }
- }
- }
- loginEventList
- })
- //打印測(cè)試
- loginbeijingResult.filter(x=>x!=None).map(x=>{
- x match {
- case Some(value)=> value
- }
- }).print()
- env.execute("BarrageBehavior02)
- }
- }
Flink CEP API
除了案例中介紹的幾個(gè)API外,我們?cè)诮榻B下其他的常用API:
1. 條件 API
為了讓傳入事件被模式所接受,給模式指定傳入事件必須滿足的條件,這些條件由事件本身的屬性或者前面匹配過(guò)的事件的屬性統(tǒng)計(jì)量等來(lái)設(shè)定。比如,事件的某個(gè)值大于5,或者大于先前接受事件的某個(gè)值的平均值。
可以使用pattern.where()、pattern.or()、pattern.until()方法來(lái)指定條件。條件既可以是迭代條件IterativeConditions,也可以是簡(jiǎn)單條件SimpleConditions。
FlinkCEP支持事件之間的三種臨近條件:
- next():嚴(yán)格的滿足條件
示例:模式為begin("first").where(_.name='a').next("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b時(shí),模式才會(huì)被命中。如果數(shù)據(jù)為a,c,b,由于a的后面跟了c,所以a會(huì)被直接丟棄,模式不會(huì)命中。
- followedBy():松散的滿足條件
示例:模式為begin("first").where(_.name='a').followedBy("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b或者為a,c,b,模式均被命中,中間的c會(huì)被忽略掉。
- followedByAny():非確定的松散滿足條件
示例:模式為begin("first").where(_.name='a').followedByAny("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,c,b,b時(shí),對(duì)于followedBy模式而言命中的為{a,b},對(duì)于followedByAny而言會(huì)有兩次命中{a,b},{a,b}。
2. 量詞 API
還記得我們?cè)谏厦嬷v解模式概念時(shí)說(shuō)過(guò)的一句話:一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。這里的量詞就是指的量詞API。
以下這幾個(gè)量詞API,可以將模式指定為循環(huán)模式:
- pattern.oneOrMore():一個(gè)給定的事件有一次或多次出現(xiàn),例如上面提到的b+。
- pattern.times(#ofTimes):一個(gè)給定類(lèi)型的事件出現(xiàn)了指定次數(shù),例如4次。
- pattern.times(#fromTimes, #toTimes):一個(gè)給定類(lèi)型的事件出現(xiàn)的次數(shù)在指定次數(shù)范圍內(nèi),例如2~4次。
- 可以使用pattern.greedy()方法將模式變成循環(huán)模式,但是不能讓一組模式都變成循環(huán)模式。greedy:就是盡可能的重復(fù)。
- 使用pattern.optional()方法將循環(huán)模式變成可選的,即可以是循環(huán)模式也可以是單個(gè)模式。
3. 匹配后的跳過(guò)策略
所謂的匹配跳過(guò)策略,是對(duì)多個(gè)成功匹配的模式進(jìn)行篩選。也就是說(shuō)如果多個(gè)匹配成功,可能我不需要這么多,按照匹配策略,過(guò)濾下就可以。
Flink中有五種跳過(guò)策略:
- NO_SKIP: 不過(guò)濾,所有可能的匹配都會(huì)被發(fā)出。
- SKIP_TO_NEXT: 丟棄與開(kāi)始匹配到的事件相同的事件,發(fā)出開(kāi)始匹配到的事件,即直接跳到下一個(gè)模式匹配到的事件,以此類(lèi)推。
- SKIP_PAST_LAST_EVENT: 丟棄匹配開(kāi)始后但結(jié)束之前匹配到的事件。
- SKIP_TO_FIRST[PatternName]: 丟棄匹配開(kāi)始后但在PatternName模式匹配到的第一個(gè)事件之前匹配到的事件。
- SKIP_TO_LAST[PatternName]: 丟棄匹配開(kāi)始后但在PatternName模式匹配到的最后一個(gè)事件之前匹配到的事件。
怎么理解上述策略,我們以NO_SKIP和SKIP_PAST_LAST_EVENT為例講解下:
在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b')中,我們輸入數(shù)據(jù):a,a,a,a,b ,如果是NO_SKIP策略,即不過(guò)濾策略,模式匹配到的是:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b};如果是SKIP_PAST_LAST_EVENT策略,即丟棄匹配開(kāi)始后但結(jié)束之前匹配到的事件,模式匹配到的是:{a,a,a,a,b}。
Flink CEP 的使用場(chǎng)景
除上述案例場(chǎng)景外,F(xiàn)link CEP 還廣泛用于網(wǎng)絡(luò)欺詐,故障檢測(cè),風(fēng)險(xiǎn)規(guī)避,智能營(yíng)銷(xiāo)等領(lǐng)域。
1. 實(shí)時(shí)反作弊和風(fēng)控
對(duì)于電商來(lái)說(shuō),羊毛黨是必不可少的,國(guó)內(nèi)拼多多曾爆出 100 元的無(wú)門(mén)檻券隨便領(lǐng),當(dāng)晚被人褥幾百億,對(duì)于這種情況肯定是沒(méi)有做好及時(shí)的風(fēng)控。另外還有就是商家上架商品時(shí)通過(guò)頻繁修改商品的名稱(chēng)和濫用標(biāo)題來(lái)提高搜索關(guān)鍵字的排名、批量注冊(cè)一批機(jī)器賬號(hào)快速刷單來(lái)提高商品的銷(xiāo)售量等作弊行為,各種各樣的作弊手法也是需要不斷的去制定規(guī)則去匹配這種行為。
2. 實(shí)時(shí)營(yíng)銷(xiāo)
分析用戶(hù)在手機(jī) APP 的實(shí)時(shí)行為,統(tǒng)計(jì)用戶(hù)的活動(dòng)周期,通過(guò)為用戶(hù)畫(huà)像來(lái)給用戶(hù)進(jìn)行推薦。比如用戶(hù)在登錄 APP 后 1 分鐘內(nèi)只瀏覽了商品沒(méi)有下單;用戶(hù)在瀏覽一個(gè)商品后,3 分鐘內(nèi)又去查看其他同類(lèi)的商品,進(jìn)行比價(jià)行為;用戶(hù)商品下單后 1 分鐘內(nèi)是否支付了該訂單。如果這些數(shù)據(jù)都可以很好的利用起來(lái),那么就可以給用戶(hù)推薦瀏覽過(guò)的類(lèi)似商品,這樣可以大大提高購(gòu)買(mǎi)率。
3. 實(shí)時(shí)網(wǎng)絡(luò)攻擊檢測(cè)
當(dāng)下互聯(lián)網(wǎng)安全形勢(shì)仍然嚴(yán)峻,網(wǎng)絡(luò)攻擊屢見(jiàn)不鮮且花樣眾多,這里我們以 DDOS(分布式拒絕服務(wù)攻擊)產(chǎn)生的流入流量來(lái)作為遭受攻擊的判斷依據(jù)。對(duì)網(wǎng)絡(luò)遭受的潛在攻擊進(jìn)行實(shí)時(shí)檢測(cè)并給出預(yù)警,云服務(wù)廠商的多個(gè)數(shù)據(jù)中心會(huì)定時(shí)向監(jiān)控中心上報(bào)其瞬時(shí)流量,如果流量在預(yù)設(shè)的正常范圍內(nèi)則認(rèn)為是正?,F(xiàn)象,不做任何操作;如果某數(shù)據(jù)中心在 10 秒內(nèi)連續(xù) 5 次上報(bào)的流量超過(guò)正常范圍的閾值,則觸發(fā)一條警告的事件;如果某數(shù)據(jù)中心 30 秒內(nèi)連續(xù)出現(xiàn) 30 次上報(bào)的流量超過(guò)正常范圍的閾值,則觸發(fā)嚴(yán)重的告警。
Flink CEP 的原理簡(jiǎn)單介紹
Apache Flink在實(shí)現(xiàn)CEP時(shí)借鑒了Efficient Pattern Matching over Event Streams論文中NFA的模型,在這篇論文中,還提到了一些優(yōu)化,我們?cè)谶@里先跳過(guò),只說(shuō)下NFA的概念。
在這篇論文中,提到了NFA,也就是Non-determined Finite Automaton,叫做不確定的有限狀態(tài)機(jī),指的是狀態(tài)有限,但是每個(gè)狀態(tài)可能被轉(zhuǎn)換成多個(gè)狀態(tài)(不確定)。
非確定有限自動(dòng)狀態(tài)機(jī):
先介紹兩個(gè)概念:
- 狀態(tài):狀態(tài)分為三類(lèi),起始狀態(tài)、中間狀態(tài)和最終狀態(tài)。
- 轉(zhuǎn)換:take/ignore/proceed都是轉(zhuǎn)換的名稱(chēng)。
在NFA匹配規(guī)則里,本質(zhì)上是一個(gè)狀態(tài)轉(zhuǎn)換的過(guò)程。三種轉(zhuǎn)換的含義如下所示:
- Take: 主要是條件的判斷,當(dāng)過(guò)來(lái)一條數(shù)據(jù)進(jìn)行判斷,一旦滿足條件,獲取當(dāng)前元素,放入到結(jié)果集中,然后將當(dāng)前狀態(tài)轉(zhuǎn)移到下一個(gè)的狀態(tài)。
- Proceed:當(dāng)前的狀態(tài)可以不依賴(lài)任何的事件轉(zhuǎn)移到下一個(gè)狀態(tài),比如說(shuō)透?jìng)鞯囊馑肌?/li>
- Ignore:當(dāng)一條數(shù)據(jù)到來(lái)的時(shí)候,可以忽略這個(gè)消息事件,當(dāng)前的狀態(tài)保持不變,相當(dāng)于自己到自己的一個(gè)狀態(tài)。
NFA的特點(diǎn):在NFA中,給定當(dāng)前狀態(tài),可能有多個(gè)下一個(gè)狀態(tài)??梢噪S機(jī)選擇下一個(gè)狀態(tài),也可以并行(同時(shí))選擇下一個(gè)狀態(tài)。輸入符號(hào)可以為空。
規(guī)則引擎
規(guī)則引擎:將業(yè)務(wù)決策從應(yīng)用程序代碼中分離出來(lái),并使用預(yù)定義的語(yǔ)義模塊編寫(xiě)業(yè)務(wù)決策。接受數(shù)據(jù)輸入,解釋業(yè)務(wù)規(guī)則,并根據(jù)業(yè)務(wù)規(guī)則做出業(yè)務(wù)決策。
使用規(guī)則引擎可以通過(guò)降低實(shí)現(xiàn)復(fù)雜業(yè)務(wù)邏輯的組件的復(fù)雜性,降低應(yīng)用程序的維護(hù)和可擴(kuò)展性成本。
1. Drools
Drools 是一款使用 Java 編寫(xiě)的開(kāi)源規(guī)則引擎,通常用來(lái)解決業(yè)務(wù)代碼與業(yè)務(wù)規(guī)則的分離,它內(nèi)置的 Drools Fusion 模塊也提供 CEP 的功能。
優(yōu)勢(shì):
- 功能較為完善,具有如系統(tǒng)監(jiān)控、操作平臺(tái)等功能。
- 規(guī)則支持動(dòng)態(tài)更新。
劣勢(shì):
- 以?xún)?nèi)存實(shí)現(xiàn)時(shí)間窗功能,無(wú)法支持較長(zhǎng)跨度的時(shí)間窗。
- 無(wú)法有效支持定時(shí)觸達(dá)(如用戶(hù)在瀏覽發(fā)生一段時(shí)間后觸達(dá)條件判斷)。
2. Aviator
Aviator 是一個(gè)高性能、輕量級(jí)的 Java 語(yǔ)言實(shí)現(xiàn)的表達(dá)式求值引擎,主要用于各種表達(dá)式的動(dòng)態(tài)求值。
優(yōu)勢(shì):
- 支持大部分運(yùn)算操作符。
- 支持函數(shù)調(diào)用和自定義函數(shù)。
- 支持正則表達(dá)式匹配。
- 支持傳入變量并且性能優(yōu)秀。
劣勢(shì):
沒(méi)有 if else、do while 等語(yǔ)句,沒(méi)有賦值語(yǔ)句,沒(méi)有位運(yùn)算符。
3. EasyRules
EasyRules 集成了 MVEL 和 SpEL 表達(dá)式的一款輕量級(jí)規(guī)則引擎。
優(yōu)勢(shì):
- 輕量級(jí)框架,學(xué)習(xí)成本低。
- 基于 POJO。
- 為定義業(yè)務(wù)引擎提供有用的抽象和簡(jiǎn)便的應(yīng)用。
- 支持從簡(jiǎn)單的規(guī)則組建成復(fù)雜規(guī)則。
4. Esper
Esper 設(shè)計(jì)目標(biāo)為 CEP 的輕量級(jí)解決方案,可以方便的嵌入服務(wù)中,提供 CEP 功能。
優(yōu)勢(shì):
- 輕量級(jí)可嵌入開(kāi)發(fā),常用的 CEP 功能簡(jiǎn)單好用。
- EPL 語(yǔ)法與 SQL 類(lèi)似,學(xué)習(xí)成本較低。
劣勢(shì):
- 單機(jī)全內(nèi)存方案,需要整合其他分布式和存儲(chǔ)。
- 以?xún)?nèi)存實(shí)現(xiàn)時(shí)間窗功能,無(wú)法支持較長(zhǎng)跨度的時(shí)間窗。
- 無(wú)法有效支持定時(shí)觸達(dá)(如用戶(hù)在瀏覽發(fā)生一段時(shí)間后觸達(dá)條件判斷)。
5. Flink CEP
Flink 是一個(gè)流式系統(tǒng),具有高吞吐低延遲的特點(diǎn),F(xiàn)link CEP 是一套極具通用性、易于使用的實(shí)時(shí)流式事件處理方案。
優(yōu)勢(shì):
- 繼承了 Flink 高吞吐的特點(diǎn)。
- 事件支持存儲(chǔ)到外部,可以支持較長(zhǎng)跨度的時(shí)間窗。
- 可以支持定時(shí)觸達(dá)(用 followedBy + PartternTimeoutFunction 實(shí)現(xiàn))。
網(wǎng)頁(yè)標(biāo)題:以直播平臺(tái)監(jiān)控用戶(hù)彈幕為例詳解FlinkCEP
文章分享:http://www.fisionsoft.com.cn/article/djijddj.html


咨詢(xún)
建站咨詢(xún)
