Flink——项目:实时计算PVUV
主要内容:
- 介绍实时计算PVUV项目
需求
根据埋点信息,包括移动端埋点和Web端埋点,实时统计T系统的PVUV变化情况。
- 实时显示:当天总PVUV变化
- 实时显示:当前每一个小时PVUV的变化
- 数据不能保证有序,需要做延时处理
- 数据源是kafka,PVUV数据来自两个topic,日志格式不同
- 数据量:移动端+Web 端 28-32G/天
分析
先分析可确定项:
- 时间语义选择EventTime,利用数据自带的时间戳信息:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
提取时间戳字段并生成watermark(尽量放在靠近数据源的位置):
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LogMsg](Time.seconds(2)) {
override def extractTimestamp(element: LogMsg): Long = {
element.time
}
})
- 数据源DataSource使用KafkaSource,Consumer使用FlinkKafkaConsumer,topic参数使用List
:
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
- 根据不同的日志格式编写正则,从多个字段中匹配出子系统编码、用户ID、时间戳等有用字段封装成LogMag对象:
case class LogMsg(subSysCode:String = "", userId:String = "", time:Long = 0L, enabledflag: Int = 0)
kafkaData.filter(new FilterFunction[String] {
override def filter(value: String): Boolean = {
val subSysCode: Try[String] = getSubSysCode(value)
if (subSysCode.isSuccess) {
if (ICENTER_SUB_SYS_CODE == subSysCode.get) true else false
} else false
}
}).map(line => {
if (mapToLogMsg(line).isSuccess) {
mapToLogMsg(line).get
} else {
LogMsg()
}
}).filter(_.enabledflag == 1)
在将数据封装成LogMsg时,使用了Try来处理异常,返回结果需要调用isSuccess
方法判断是否存在异常,获取具体值时需要调用get
方法。
匹配不同日志格式中的子系统编码信息:
def getSubSysCode(value: String): Try[String] = {
Try {
val logSplit: Array[String] = value.split("\\|")
if (logSplit.length > 2) { // web端埋点
logSplit(5)
} else { // 移动端埋点
(subSysCodeMobilePattern findFirstIn value).get.split("\"")(3)
}
}
}
匹配不同日志格式中的时间戳和用户ID信息:
def mapToLogMsg(value: String): Try[LogMsg] = {
val subSysCode: String = T_SUB_SYS_CODE
Try {
val logSplit: Array[String] = value.split("\\|")
if (logSplit.length > 2) { // web端埋点
val time: Long = dateFormat.parse(logSplit(0)).getTime
val userId: String = (userIdWebPattern findFirstIn value).get.split("\\^")(1)
LogMsg(subSysCode, userId, time, 1)
} else { // 移动端埋点
val time: Long = dateFormat.parse(logSplit(0)).getTime
val userId: String = (userIdMobilePattern findFirstIn value).get.split("\"")(3)
LogMsg(subSysCode, userId, time, 1)
}
}
}
以上工作均为准备工作,重点在于选择哪种方式统计PVUV,以及数据存储位置。
方案
根据统计方式和数据存储位置的不同,可供选择的方案:
方案1
- 统计方式:使用windowAll,窗口关闭时统计整个窗口内数据,PV使用计数器统计,UV使用Set去重。windowAll:Non-Keyed Window,不分组,将数据流中的所有元素分配到相应的窗口中,不可设置并行度,始终为1。
- 存储位置:内存
该方案以空间换时间,处理速度相对较快,但处理过程中会将数据暂存在内存中,若是数据量特别大或者内存大小有限的情况下会造成OOM问题。若数据量不大时,也是一个不错的选择。
该方案若不设置触发器,则等待的过程中没有数据输出,即在计算一个小时内的PVUV时,只有到达整点时才有数据。所以无法满足需求1和需求2。但可以通过设置Trigger来触发计算,如可以设置固定时间间隔触发或者数据到达一定量时触发计算,如:ContinuousEventTimeTrigger
和CountTrigger
。
另外,该方案在计算PVUV时,若不借助外界存储,是不能统计全天的PVUV累积量的,只能统计每个小时的变化情况。
.timeWindowAll(Time.hours(24))
.apply(new AllWindowFunction[LogMsg, PvUvCount, TimeWindow] {
override def apply(window: TimeWindow, input: Iterable[LogMsg], out: Collector[PvUvCount]): Unit = {
// 定义一个set 保存userId
val userSet: Set[String] = Set[String]()
// 定义一个计数器
var userCount: Long = 0
// 把当前所有userId收集到set,最后输出set大小
for (eachLog <- input) {
userSet += eachLog.userId
userCount += 1
}
val windowStart: String = DATE_FORMAT.format(window.getStart)
val windowEnd: String = DATE_FORMAT.format(window.getEnd)
out.collect(PvUvCount(windowStart, windowEnd, userCount, userSet.size))
}
}).print("output data ")
方案2
- 统计方式:使用timeWindow,可以设置多并行度,可以设置数据来一条处理一条,或处理多条。使用process函数时借助
ProcessWindowFunction
抽象类,该抽象类继承了RichFunction的一些方法,可选择使用open或者close来创建和关闭Redis连接。也可以通过addSink写入Redis。 - 存储位置:Reids或者Hbase,这里选择redis。计算PV使用Hashmap数据结构的hincrBy方法执行+1操作,计算UV使用Set数据结构去重。
该方案可借助Redis计算PVUV。当天的总PV可以按照每小时的PV统计量进行累加,但是UV不能累加,可将每个小时的userId都放在同一个Set里。当窗口触发时,先执行PV+1操作,再将userId加入到Set,最后执行读取Set长度的操作,并记录到另一个hashmap里。
该方案可以满足需求上述全部需求。但是该方案在数据量特别大的特殊情况下,比如每秒几亿条数据,userId会撑爆Redis服务器的内存。但是在本需求提供的数据量下是没有问题的。
.keyBy(_.subSysCode)
.timeWindow(Time.hours(24))
.process(new ProcessWindowFunction[LogMsg, PvUvCount, String, TimeWindow] {
var jedis: Jedis = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
this.jedis = new Jedis("", 6379)
}
override def close(): Unit = {
super.close()
if (this.jedis != null) {
this.jedis.close()
}
}
override def process(key: String, context: Context, elements: Iterable[LogMsg], out: Collector[PvUvCount]): Unit = {
val userId: String = elements.last.userId
jedis.hincrBy("key", context.window.getEnd.toString, 1)
for (element <- elements) {
// pv
jedis.hincrBy("pvkey", context.window.getEnd.toString, 1)
// uv
jedis.sadd("uvkey", userId)
}
val uvCount: lang.Long = jedis.scard("uvkey")
jedis.hset("uvkey-hours", context.window.getEnd.toString, uvCount.toString)
}
})
方案3
在方案2的基础上,使用布隆过滤器压缩处理userId,存储在位图,用一个位来表示userId。