Kafka——Consumer提交偏移量
Kafka提交偏移量的方式:
- 自动提交
- 手动提交——同步
- 手动提交——异步
自动提交
package test.kafka
import java.util.Properties
import org.apache.kafka.common.serialization.StringSerializer
class kafkaProperities {
private val KFK_BROKERS = "host1:port1,host2:port2,post3:port3"
def getKakConPros:Properties = {
val kfkConProps: Properties = new Properties()
kfkConProps.setProperty("bootstrap.servers", KFK_BROKERS)
kfkConProps.setProperty("group.id", "kafka_consumer")
kfkConProps.setProperty("enable.auto.commit", "true")
kfkConProps.setProperty("auto.commit.interval.ms", "5000")
kfkConProps.setProperty("key.deserializer", classOf[StringSerializer].getName)
kfkConProps.setProperty("value.deserializer", classOf[StringSerializer].getName)
kfkConProps
}
}
package test.kafka.kafkaconsumer
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import test.kafka.kafkaProperities
object autoCommitOffsets {
private val KAFKA_PROPERITIES = new kafkaProperities()
def main(args: Array[String]): Unit = {
val kfkConProps: Properties = KAFKA_PROPERITIES.getKakConPros
val consumer: KafkaConsumer[String, String] = new KafkaConsumer(kfkConProps)
consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC))
try {
while(true) {
val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
while (records.hasNext){
val record: ConsumerRecord[String, String] = records.next()
println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
}
}
} finally {
consumer.close()
}
}
}
手动提交——同步
修改getKakConPros的参数:
kfkConProps.setProperty("enable.auto.commit", "false")
//kfkConProps.setProperty("auto.commit.interval.ms", "5000")
package test.kafka.kafkaconsumer
import java.util
import java.util.Properties
import java.util.logging.Logger
import org.apache.kafka.clients.consumer.{CommitFailedException, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import test.kafka.kafkaProperities
object syncCommitOffsets {
private val KAFKA_PROPERITIES = new kafkaProperities()
def main(args: Array[String]): Unit = {
val kfkConProps: Properties = KAFKA_PROPERITIES.getKakConPros
val consumer: KafkaConsumer[String, String] = new KafkaConsumer(kfkConProps)
consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC))
while(true) {
val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
while (records.hasNext) {
val record: ConsumerRecord[String, String] = records.next()
println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
}
try {
consumer.commitSync()
} catch {
case e: CommitFailedException => {
println("commit failed!" + e)
}
}
}
}
}
手动提交——异步提交
package test.kafka.kafkaconsumer
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import test.kafka.kafkaProperities
object asyncCommitOffsets {
private val KAFKA_PROPERITIES = new kafkaProperities()
def main(args: Array[String]): Unit = {
val kfkConProps: Properties = KAFKA_PROPERITIES.getKakConPros
val consumer: KafkaConsumer[String, String] = new KafkaConsumer(kfkConProps)
consumer.subscribe(util.Arrays.asList(KAFKA_PROPERITIES.TOPIC))
while(true) {
val consumerRecords: ConsumerRecords[String, String] = consumer.poll(100)
val records: util.Iterator[ConsumerRecord[String, String]] = consumerRecords.iterator()
while (records.hasNext){
val record: ConsumerRecord[String, String] = records.next()
println("topic: " + record.topic() + " partition: " + record.partition() + " offset: " + record.offset())
}
//consumer.commitAsync()
//也支持回调
consumer.commitAsync(new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
if (exception != null) {
println("Commit failed for offset " + offsets.values().iterator().next().offset())
} else {
println("Commit succeeded!")
}
}
})
}
}
}