Kafka介绍

Kafka是由Apache软件基金会开发的统一、高吞吐、低延迟的一个开源流处理平台。在游戏中,一些日志会用到它,游戏服务器是消息的生产者产生日志消息;日志服务器是消息的消费者,处理游戏的日志并存数据库。

Kafka基本使用

消息生产者

import (
	"strings"
	"github.com/IBM/sarama"
	"github.com/zngw/log"
)

var producer sarama.AsyncProducer

// 初始化生产者
func InitProducer(hosts string) {
	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(hosts, ","), config)
	if err != nil {
		log.Error("unable to create kafka client: ", err)
	}
	producer, err = sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		log.Error(err)
	}
}

// 发送消息
func Send(topic, data string) {
	producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(data)}
	log.Trace("kafka", "Produced message: ["+ data+"]")
}

func Close() {
	if producer != nil {
		producer.Close()
	}
}

消息的消费者

import (
	"strings"
	"github.com/IBM/sarama"
	"github.com/zngw/log"
)

var consumer sarama.Consumer

// 消费者回调函数
type ConsumerCallback func(data []byte)

// 初始化消费者
func InitConsumer(hosts string) {
	config := sarama.NewConfig()
	client, err := sarama.NewClient(strings.Split(hosts, ","), config)
	if err != nil {
		log.Error("unable to create kafka client: ", err)
	}

	consumer, err = sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Error(err)
	}
}

// 消费者循环
func LoopConsumer(topic string, callback ConsumerCallback) {
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		log.Error(err)
		return
	}
	defer partitionConsumer.Close()

	for {
		msg := <-partitionConsumer.Messages()
		if callback != nil {
			callback(msg.Value)
		}
	}
}

func Close() {
	if consumer!= nil {
		consumer.Close()
	}
}

完整的测试实例

  • 生产者测试producer.go
package main

import (
	"github.com/zngw/kafka"
	"github.com/zngw/log"
	"os/signal"
	"runtime"
	"syscall"
)

func main() {
	// 初始化日志
	err := log.Init(nil)
	if err != nil {
		panic(err)
	}

	// 初始化生产生
	err = kafka.InitProducer("192.168.1.29:9092")
	if err != nil {
		panic(err)
	}

	// 关闭
	defer kafka.Close()

	// 发送测试消息
	kafka.Send("Test","This is Test Msg")
	kafka.Send("Test","Hello Guoke")

	signal.Ignore(syscall.SIGHUP)
	runtime.Goexit()
}

  • 消费者测试consumer.go
package main

import (
	"github.com/zngw/kafka"
	"github.com/zngw/log"
	"os/signal"
	"runtime"
	"syscall"
)

func main() {
	// 初始化日志
	err := log.Init(nil)
	if err != nil {
		panic(err)
	}

	// 初始化消费者
	err = kafka.InitConsumer("192.168.1.29:9092")
	if err != nil {
		panic(err)
	}

	// 监听
	go func() {
		err = kafka.LoopConsumer("Test", TopicCallBack)
		if err != nil {
			panic(err)
		}
	}()

	signal.Ignore(syscall.SIGHUP)
	runtime.Goexit()
}

func TopicCallBack(data []byte) {
	log.Trace("kafka", "Test:"+string(data))
}
  • 执行结果