Kafka——再均衡监听器
在为消费者分配新分区或者移除旧分区时,可以通过消费者API执行一些应用程序代码,在调用subscribe方法时,传进去一个ConsumerRebalanceListener实例就可以了。ConsumerRebalanceListener有两个需要实现的方法。
- onPartitionsRevoked方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量时已经处理过的,所以不会有什么问题。调用commitSync方法,确保在再均衡发生之前提交偏移量。
- onPartitionsAssigned方法会在重新分配分区之后和消费者开始读取消息之前被调用。
package test.kafka.kafkaconsumer
import java.util
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
class HandleRebalance(
consumer:KafkaConsumer[String, String],
currentOffsets: util.HashMap[TopicPartition, OffsetAndMetadata]
) extends ConsumerRebalanceListener {
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
// println("Lost partitions in rebalance. Committing current offsets: " + currentOffsets)
consumer.commitSync(currentOffsets)
}
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {}
}
package test.kafka.kafkaconsumer
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException
import test.kafka.kafkaProperities
object RebalanceListener {
private val KAFKA_PROPERITIES = new kafkaProperities()
def main(args: Array[String]): Unit = {
val kfkConProps: Properties = KAFKA_PROPERITIES.getKakConPros
val currentOffsets: util.HashMap[TopicPartition, OffsetAndMetadata] = new util.HashMap[TopicPartition, OffsetAndMetadata]()
val consumer: KafkaConsumer[String, String] = new KafkaConsumer(kfkConProps)
try {
consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC), new HandleRebalance(consumer, currentOffsets))
while (true) {
val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
while (records.hasNext){
val record: ConsumerRecord[String, String] = records.next()
println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, ""))
}
consumer.commitAsync(currentOffsets, null)
}
} catch {
case e: WakeupException => println(e)
case e: Exception => println("Unexpected error:" + e)
} finally {
try {
consumer.commitSync(currentOffsets)
} finally {
consumer.close()
}
}
}
}