Go 中使用Kafka

Kafka介绍

Kafka是由Apache软件基金会开发的统一、高吞吐、低延迟的一个开源流处理平台。在游戏中,一些日志会用到它,游戏服务器是消息的生产者产生日志消息;日志服务器是消息的消费者,处理游戏的日志并存数据库。

Kafka基本使用

消息生产者

import (
    "strings"
    "github.com/Shopify/sarama"
    "github.com/zngw/log"
)

var producer sarama.AsyncProducer

// 初始化生产者
func InitProducer(hosts string) {
    config := sarama.NewConfig()
    client, err := sarama.NewClient(strings.Split(hosts, ","), config)
    if err != nil {
        log.Error("unable to create kafka client: ", err)
    }
    producer, err = sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        log.Error(err)
    }
}

// 发送消息
func Send(topic, data string) {
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(data)}
    log.Trace("kafka", "Produced message: ["+ data+"]")
}

func Close() {
    if producer != nil {
        producer.Close()
    }
}

消息的消费者

import (
    "strings"
    "github.com/Shopify/sarama"
    "github.com/zngw/log"
)

var consumer sarama.Consumer

// 消费者回调函数
type ConsumerCallback func(data []byte)

// 初始化消费者
func InitConsumer(hosts string) {
    config := sarama.NewConfig()
    client, err := sarama.NewClient(strings.Split(hosts, ","), config)
    if err != nil {
        log.Error("unable to create kafka client: ", err)
    }

    consumer, err = sarama.NewConsumerFromClient(client)
    if err != nil {
        log.Error(err)
    }
}

// 消费者循环
func LoopConsumer(topic string, callback ConsumerCallback) {
    partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
    if err != nil {
        log.Error(err)
        return
    }
    defer partitionConsumer.Close()

    for {
        msg := <-partitionConsumer.Messages()
        if callback != nil {
            callback(msg.Value)
        }
    }
}

func Close() {
    if consumer!= nil {
        consumer.Close()
    }
}

完整的测试实例

  • 生产者测试producer.go
package main

import (
    "github.com/zngw/kafka"
    "github.com/zngw/log"
    "os/signal"
    "runtime"
    "syscall"
)

func main() {
    // 初始化日志
    err := log.Init(nil)
    if err != nil {
        panic(err)
    }

    // 初始化生产生
    err = kafka.InitProducer("192.168.1.29:9092")
    if err != nil {
        panic(err)
    }

    // 关闭
    defer kafka.Close()

    // 发送测试消息
    kafka.Send("Test","This is Test Msg")
    kafka.Send("Test","Hello Guoke")

    signal.Ignore(syscall.SIGHUP)
    runtime.Goexit()
}
  • 消费者测试consumer.go
package main

import (
    "github.com/zngw/kafka"
    "github.com/zngw/log"
    "os/signal"
    "runtime"
    "syscall"
)

func main() {
    // 初始化日志
    err := log.Init(nil)
    if err != nil {
        panic(err)
    }

    // 初始化消费者
    err = kafka.InitConsumer("192.168.1.29:9092")
    if err != nil {
        panic(err)
    }

    // 监听
    go func() {
        err = kafka.LoopConsumer("Test", TopicCallBack)
        if err != nil {
            panic(err)
        }
    }()

    signal.Ignore(syscall.SIGHUP)
    runtime.Goexit()
}

func TopicCallBack(data []byte) {
    log.Trace("kafka", "Test:"+string(data))
}
  • 执行结果
0%