Kafka介绍
Kafka是由Apache软件基金会开发的统一、高吞吐、低延迟的一个开源流处理平台。在游戏中,一些日志会用到它,游戏服务器是消息的生产者产生日志消息;日志服务器是消息的消费者,处理游戏的日志并存数据库。
Kafka基本使用
消息生产者
import (
"strings"
"github.com/Shopify/sarama"
"github.com/zngw/log"
)
var producer sarama.AsyncProducer
// 初始化生产者
func InitProducer(hosts string) {
config := sarama.NewConfig()
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if err != nil {
log.Error("unable to create kafka client: ", err)
}
producer, err = sarama.NewAsyncProducerFromClient(client)
if err != nil {
log.Error(err)
}
}
// 发送消息
func Send(topic, data string) {
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(data)}
log.Trace("kafka", "Produced message: ["+ data+"]")
}
func Close() {
if producer != nil {
producer.Close()
}
}
消息的消费者
import (
"strings"
"github.com/Shopify/sarama"
"github.com/zngw/log"
)
var consumer sarama.Consumer
// 消费者回调函数
type ConsumerCallback func(data []byte)
// 初始化消费者
func InitConsumer(hosts string) {
config := sarama.NewConfig()
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if err != nil {
log.Error("unable to create kafka client: ", err)
}
consumer, err = sarama.NewConsumerFromClient(client)
if err != nil {
log.Error(err)
}
}
// 消费者循环
func LoopConsumer(topic string, callback ConsumerCallback) {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
log.Error(err)
return
}
defer partitionConsumer.Close()
for {
msg := <-partitionConsumer.Messages()
if callback != nil {
callback(msg.Value)
}
}
}
func Close() {
if consumer!= nil {
consumer.Close()
}
}
完整的测试实例
- 生产者测试producer.go
package main
import (
"github.com/zngw/kafka"
"github.com/zngw/log"
"os/signal"
"runtime"
"syscall"
)
func main() {
// 初始化日志
err := log.Init(nil)
if err != nil {
panic(err)
}
// 初始化生产生
err = kafka.InitProducer("192.168.1.29:9092")
if err != nil {
panic(err)
}
// 关闭
defer kafka.Close()
// 发送测试消息
kafka.Send("Test","This is Test Msg")
kafka.Send("Test","Hello Guoke")
signal.Ignore(syscall.SIGHUP)
runtime.Goexit()
}
- 消费者测试consumer.go
package main
import (
"github.com/zngw/kafka"
"github.com/zngw/log"
"os/signal"
"runtime"
"syscall"
)
func main() {
// 初始化日志
err := log.Init(nil)
if err != nil {
panic(err)
}
// 初始化消费者
err = kafka.InitConsumer("192.168.1.29:9092")
if err != nil {
panic(err)
}
// 监听
go func() {
err = kafka.LoopConsumer("Test", TopicCallBack)
if err != nil {
panic(err)
}
}()
signal.Ignore(syscall.SIGHUP)
runtime.Goexit()
}
func TopicCallBack(data []byte) {
log.Trace("kafka", "Test:"+string(data))
}
- 执行结果