Go io 流管道连接

Go 一系列的 io 操作的接口非常棒,可以将一切东西归为 Read()Write(),以及其他少数几个接口。完美诠释了 Linux 万物皆文件的思想
(对比其他语言,虽然也是同样的思路,但是还暴露的更多的接口,反而增大了使用复杂度)

在一些场景下,需要实现将两个流拼接起来,比如提供一个服务器上的 gRPC 服务,可以通过 K8S 或是 Docker 的接口,连接到容器内执行命令(exec)。Docker 本身提供的是一个 Reader & Writer 的接口,CRI 接口则要求的是 Reader、Writer 流。

而 gRPC 流接口则给出的是被阻塞的 Message。
因此需要将 gRPC 的数据转换为流,目前内置的各种相关功能都无法满足需要:

  • bytes.Buffer: 读完缓冲区就会直接结束(这时可能 gRPC 还没开始写入)
  • io.Copy: 复制到 EOF 就会结束
  • io.Pipe: 返回的是新的 Reader 和 Writer,无法直接连接 Reader 和 Writer

因此,应该仿照 io.Pipe 实现一个可以将 Write()Read() 函数连接起来,同时可以通过 ReadFrom()WriteTo() 将其连接到其他 Reader 和 Writer 中

使用样例

两个流直接连接

将 stdin 连接到 stdout

pipe.RWPipe(stdin, stdout)

将输入流同步到其他流中

stdin := pipe.New()
go stdin.WriteTo(os.Stdout)

将非流数据转换为流

in := pipe.New()
go in.WriteTo(os.Stdout)
for {
    msg := getMessage()
    in.Write(msg)
}

将流数据转换为非流

out := pipe.New()
go out.ReadFrom(os.Stdin)
reader := bufio.NewReader(out)
for {
    b, err := reader.ReadBytes('\n')
    send(b)
}

代码

代码中 WriteTo()ReadFrom() 需要在单独的 goroutine 中运行

package pipe

import (
	"io"
	"sync"
)

func RWPipe(r io.Reader, w io.Writer) (n int64, err error) {
	buf := make([]byte, 512)
	var n1, n2 int
	for {
		n1, err = r.Read(buf)
		if err != nil {
			break
		}
		n2, err = w.Write(buf[:n1])
		if err != nil {
			break
		}
		n += int64(n2)
	}
	return
}

type Pipe struct {
	wrMu sync.Mutex
	wrCh chan []byte
	rdCh chan int

	once sync.Once
	done chan struct{}
}

func New() *Pipe {
	return &Pipe{
		wrMu: sync.Mutex{},
		wrCh: make(chan []byte),
		rdCh: make(chan int),
		once: sync.Once{},
		done: make(chan struct{}),
	}
}

func (p *Pipe) Read(b []byte) (int, error) {
	select {
	case bw := <-p.wrCh:
		nr := copy(b, bw)
		p.rdCh <- nr
		return nr, nil
	case <-p.done:
		return 0, io.EOF
	}
}

func (p *Pipe) Write(b []byte) (int, error) {
	select {
	case <-p.done:
		return 0, io.EOF
	default:
		p.wrMu.Lock()
		defer p.wrMu.Unlock()
	}

	n := 0
	for once := true; once || len(b) > 0; once = false {
		select {
		case p.wrCh <- b:
			nw := <-p.rdCh
			b = b[nw:]
			n += nw
		case <-p.done:
			return n, io.EOF
		}
	}
	return n, nil
}

func (p *Pipe) ReadFrom(r io.Reader) (n int64, err error) {
	defer p.Close()
	return RWPipe(r, p)
}

func (p *Pipe) WriteTo(w io.Writer) (n int64, err error) {
	defer p.Close()
	return RWPipe(p, w)
}

func (p *Pipe) Close() { p.once.Do(func() { close(p.done) }) }