需求

最近在弄一个游戏的gate网关转发服务器,服务器之间使用的是nats通讯,gate的作用是接收客户端发来的消息转发到对应的服务器上,并从nats上获取游戏服务器发送给客户端的消息并转发给客户端。前面接收还好处理,因为都是发布订阅模式的消息,收到消息直接向nats上扔就行了。但转发服务器来的消息就不一样了,从nats上取的速度远大于gate转发给客户端的速度,会有数据囤积在nats中。为了解决这个问题,可以一个协和去nats中取数据,用多个协程并行转发给客户端,因为现在cpu都是n核的,多协程转发肯定会快的不止一点点,这里要注意一点的是,同个玩家的消息转发的顺序不能变,就是按一定的规则把同一个玩家的所有消息在同一个协程上转发就可以了。
为了现实这个转发,需要一个无限缓存的channel,先装nats中的数据读出来分别放到各转发协程的channal中,转发协程只从自己的channel取数据一条一条慢慢转发给客户端就行了。

设计

先说一下设计无限缓存channel的大至思路,然后直接上完整代码。

分析

要达到前面的需求,我们需要设计的无限缓存channel应该满足几个要求:

  1. 缓存无限。因为不知道nats那边会接收到多少消息,而转发可能会因网络波动阻塞。
  2. 不能阻塞写。要保证接收nats消息的协程能及时处理所有的消息,并写入转发协程的channel中。
  3. 无数据时阻塞读,此特性保持和普通channle一样。没数据时转发协程处理阻塞等待。
  4. 读写都应通过channle操作,和普通channel的操作一样。
  5. channle被关闭后,未读取的数据应该仍然可读,此特性和普通channle保持一致。

针对上面的要求,设计如下:

  1. 因为go暂不支持操作符重载,所以封装一个结构体,包含二个channel来分离读(Out Channel)和写(In Channel)。
  2. 因为channel的缓存大小是有限的,需要一个可无限扩容缓存多于的数据,这里可以使用ringbuffer来实现。
  3. 无限缓存内部数据FIFO实现

当Out Channel还没有满时,并且buf中没有数据,读取In中数据,将其放入Out,直到Out满
当Buf中有数据时,无论Out是否满,都将将In中读到的数据,直接写入到Buf中,目的就是为了保证数据的FIFO原则

实现代码

完整工程参见:https://github.com/zngw/zchan

1、双向环形链表

package ringbuffer

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type T interface{}

var ErrIsEmpty = errors.New("ringbuffer is empty")

type cell struct {
	Data     []T   // 数据部分
	fullFlag bool  // cell满的标志
	next     *cell // 指向后一个cellBuffer
	pre      *cell // 指向前一个cellBuffer

	r int // 下一个要读的指针
	w int // 下一个要下的指针
}

type RingBuffer struct {
	cellSize  int   // cell大小
	cellCount int   // cell数量
	count     int32 // 有效元素个数

	readCell  *cell // 下一个要读的cell
	writeCell *cell // 下一个要写的cell
}

// NewRingBuffer 新建一个RingBuffer,包含两个cell
func NewRingBuffer(cellSize int) (buf *RingBuffer, err error) {
	if cellSize <= 0 || cellSize&(cellSize-1) != 0 {
		err = fmt.Errorf("初始大小必须是 2 的幂")
		return
	}

	rootCell := &cell{
		Data: make([]T, cellSize),
	}
	lastCell := &cell{
		Data: make([]T, cellSize),
	}
	rootCell.pre = lastCell
	lastCell.pre = rootCell
	rootCell.next = lastCell
	lastCell.next = rootCell

	buf = &RingBuffer{
		cellSize:  cellSize,
		cellCount: 2,
		count:     0,
		readCell:  rootCell,
		writeCell: rootCell,
	}

	return
}

// Read 读取数据
func (r *RingBuffer) Read() (data T, err error) {
	// 无数据
	if r.IsEmpty() {
		err = ErrIsEmpty
		return
	}

	// 读取数据,并将读指针向右移动一位
	data = r.readCell.Data[r.readCell.r]
	r.readCell.r++
	atomic.AddInt32(&r.count, -1)

	// 此cell已经读完
	if r.readCell.r == r.cellSize {
		// 读指针归零,并将该cell状态置为非满
		r.readCell.r = 0
		r.readCell.fullFlag = false
		// 将readCell指向下一个cell
		r.readCell = r.readCell.next
	}

	return
}

// Pop 读一个元素,读完后移动指针
func (r *RingBuffer) Pop() (data T) {
	data, err := r.Read()
	if errors.Is(err, ErrIsEmpty) {
		panic(ErrIsEmpty.Error())
	}
	return
}

// Peek 窥视 读一个元素,仅读但不移动指针
func (r *RingBuffer) Peek() (data T) {
	if r.IsEmpty() {
		panic(ErrIsEmpty.Error())
	}

	// 仅读
	data = r.readCell.Data[r.readCell.r]
	return
}

// Write 写入数据
func (r *RingBuffer) Write(value T) {
	// 在 r.writeCell.w 位置写入数据,指针向右移动一位
	r.writeCell.Data[r.writeCell.w] = value
	r.writeCell.w++
	atomic.AddInt32(&r.count, 1)

	// 当前cell写满了
	if r.writeCell.w == r.cellSize {
		// 指针置0,将该cell标记为已满,并指向下一个cell
		r.writeCell.w = 0
		r.writeCell.fullFlag = true
		r.writeCell = r.writeCell.next
	}

	// 下一个cell也已满,扩容
	if r.writeCell.fullFlag == true {
		r.grow()
	}
}

// grow 扩容
func (r *RingBuffer) grow() {
	// 新建一个cell
	newCell := &cell{
		Data: make([]T, r.cellSize),
	}

	// 总共三个cell,writeCell,preCell,newCell
	// 本来关系: preCell <===> writeCell
	// 现在将newcell插入:preCell <===> newCell <===> writeCell
	pre := r.writeCell.pre
	pre.next = newCell
	newCell.pre = pre
	newCell.next = r.writeCell
	r.writeCell.pre = newCell

	// 将writeCell指向新建的cell
	r.writeCell = r.writeCell.pre

	// cell 数量加一
	r.cellCount++
}

// IsEmpty 判断RingBuffer是否为空
func (r *RingBuffer) IsEmpty() bool {
	return r.Len() == 0
}

// Capacity RingBuffer容量
func (r *RingBuffer) Capacity() int {
	return r.cellCount * r.cellSize
}

// Len RingBuffer数据长度
func (r *RingBuffer) Len() (count int) {
	count = int(r.count)
	return
}

// Reset 重置为仅指向两个cell的ring
func (r *RingBuffer) Reset() {
	// 没有数据切cellCount只有两个时,无需重置
	if r.count == 0 && r.cellCount == 2 {
		return
	}

	lastCell := r.readCell.next

	lastCell.w = 0
	lastCell.r = 0
	r.readCell.r = 0
	r.readCell.w = 0
	r.cellCount = 2
	r.count = 0

	lastCell.next = r.readCell
}

2. 无限缓存channel

package zchan

import (
	"github.com/zngw/ringbuffer"
)

type T interface{}

type ZChan struct {
	In     chan<- T               // 写入channel
	Out    <-chan T               // 读取channel
	buffer *ringbuffer.RingBuffer // 双向环形链表
}

// Len uc中总共的元素数量
func (uc *ZChan) Len() int {
	return len(uc.In) + uc.BufLen() + len(uc.Out)
}

// BufLen uc的buf中的元素数量
func (uc *ZChan) BufLen() int {
	return uc.buffer.Len()
}

// New 新建一个无限缓存的Channel,并指定In和Out大小(In和Out设置得一样大)
func New(initCapacity int) (ch *ZChan, err error) {
	rb, err := ringbuffer.NewRingBuffer(512)
	if err != nil {
		return
	}

	in := make(chan T, initCapacity)
	out := make(chan T, initCapacity)
	ch = &ZChan{In: in, Out: out, buffer: rb}

	go process(in, out, ch)

	return
}

// 内部Worker Goroutine实现
func process(in, out chan T, ch *ZChan) {
	defer close(out) // in 关闭,数据读取后也把out关闭

	// 不断从in中读取数据放入到out或者buf中
loop:
	for {
		// 第一步:从in中读取数据
		value, ok := <-in
		if !ok {
			// in 关闭了,退出loop
			break loop
		}

		// 第二步:将数据存储到out或者buf中
		if ch.buffer.Len() > 0 {
			// 当buf中有数据时,新数据优先存放到buf中,确保数据FIFO原则
			ch.buffer.Write(value)

		} else {
			// out 没有满,数据放入out中
			select {
			case out <- value:
				continue
			default:
			}

			// out 满了,数据放入buf中
			ch.buffer.Write(value)
		}

		// 第三步:处理buf,一直尝试把buf中的数据放入到out中,直到buf中没有数据
		for ch.buffer.Len() > 0 {
			select {
			// 为了避免阻塞in,还要尝试从in中读取数据
			case val, ok := <-in:
				if !ok {
					// in 关闭了,退出loop
					break loop
				}
				// 因为这个时候out是满的,新数据直接放入buf中
				ch.buffer.Write(val)

			// 将buf中数据放入out
			case out <- ch.buffer.Peek():
				ch.buffer.Pop()

				if ch.buffer.IsEmpty() {
					ch.buffer.Reset()
				}
			}
		}
	}

	// in被关闭退出loop后,buf中还有可能有未处理的数据,将他们塞入out中,并重置buf
	for ch.buffer.Len() > 0 {
		out <- ch.buffer.Pop()
	}
}

3. 使用测试

package main

import (
	"fmt"
	"github.com/zngw/zchan"
	"time"
)

func main() {
	zc, err := zchan.New(4)
	if err != nil {
		panic(err.Error())
	}

	go func() {
		// 写入channel数据
		// 10毫秒写入1次
		for i := 0; i < 55; i++ {
			zc.In <- i
			fmt.Printf("写入数据:%v, chan长度:%v, Buf长度: %v \n", i, zc.Len(), zc.BufLen())
			time.Sleep(10 * time.Millisecond)
		}

		close(zc.In)
	}()

	for v := range zc.Out {
		// 20 毫毛读取一次数据
		fmt.Printf("读取入数据:%v, chan长度:%v, Buf长度:%v \n", v, zc.Len(), zc.BufLen())
		time.Sleep(20 * time.Millisecond)
	}
}

参考文献:
[1] https://blog.csdn.net/qq_39382769/article/details/122423070
[2] https://colobu.com/2021/05/11/unbounded-channel-in-go/