Files
mattermost/plugin/rpcplugin/io.go
Chris 402491b7e5 PLT-7407: Back-end plugins (#7409)
* tie back-end plugins together

* fix comment typo

* add tests and a bit of polish

* tests and polish

* add test, don't let backend executable paths escape the plugin directory
2017-09-11 10:02:02 -05:00

211 lines
3.7 KiB
Go

package rpcplugin
import (
"bufio"
"bytes"
"encoding/binary"
"io"
"sync"
)
// asyncRead carries the outcome of a single Read on the wrapped reader from
// the background goroutine back to the caller of asyncReadCloser.Read.
type asyncRead struct {
	b   []byte // bytes produced by the read; aliases loop's scratch buffer
	err error  // error reported by the wrapped reader, if any
}

// asyncReadCloser wraps an io.ReadCloser so that a Read which is blocked on
// the wrapped reader returns as soon as Close is called. Every read of the
// wrapped reader is performed by the goroutine running loop.
type asyncReadCloser struct {
	io.ReadCloser
	buffer    bytes.Buffer   // bytes received from loop that did not fit in the caller's slice
	read      chan struct{}  // Read -> loop: request one read of the wrapped reader
	reads     chan asyncRead // loop -> Read: result of the requested read
	close     chan struct{}  // closed by Close; releases Read and stops loop
	closeOnce sync.Once      // guards close against being closed twice
}

// NewAsyncReadCloser returns a ReadCloser that supports Close during Read.
//
// It starts a goroutine that performs the reads on r. That goroutine closes r
// and exits once Close has been called and any read it has in flight on r has
// returned.
func NewAsyncReadCloser(r io.ReadCloser) io.ReadCloser {
	ret := &asyncReadCloser{
		ReadCloser: r,
		read:       make(chan struct{}),
		reads:      make(chan asyncRead),
		close:      make(chan struct{}),
	}
	go ret.loop()
	return ret
}

// loop serves read requests until the close channel is closed, at which point
// it closes the wrapped reader (discarding the error from that Close) and
// returns.
func (r *asyncReadCloser) loop() {
	buf := make([]byte, 1024*8)
	var n int
	var err error
	for {
		select {
		case <-r.read:
			n = 0
			// err is sticky: after the wrapped reader has failed once it is
			// never read again, and every later request reports (0 bytes, err).
			if err == nil {
				n, err = r.ReadCloser.Read(buf)
			}
			select {
			case r.reads <- asyncRead{buf[:n], err}:
			case <-r.close:
				// Nobody is waiting for the result any more.
			}
		case <-r.close:
			r.ReadCloser.Close()
			return
		}
	}
}

// Read implements io.Reader. Bytes left over from an earlier read are served
// first; otherwise one read of the wrapped reader is requested from loop.
// Read returns (0, io.EOF) if Close is called before a result arrives.
//
// If the wrapped reader returns data together with an error (for example the
// final chunk along with io.EOF), the data is delivered by this call with a
// nil error and the error is reported by a subsequent call.
func (r *asyncReadCloser) Read(b []byte) (int, error) {
	if r.buffer.Len() > 0 {
		return r.buffer.Read(b)
	}
	select {
	case r.read <- struct{}{}:
	case <-r.close:
	}
	select {
	case read := <-r.reads:
		if len(read.b) == 0 {
			return 0, read.err
		}
		// Hand over the data before surfacing any error. loop keeps the error
		// and reports it again, without touching the wrapped reader, on the
		// next request.
		n := copy(b, read.b)
		if n < len(read.b) {
			// read.b aliases loop's scratch buffer, so the overflow has to be
			// copied out before loop is asked to read again.
			r.buffer.Write(read.b[n:])
		}
		return n, nil
	case <-r.close:
		return 0, io.EOF
	}
}

// Close releases any blocked Read and tells loop to close the wrapped reader.
// It is safe to call more than once and always returns nil; the error from
// closing the wrapped reader is not reported.
func (r *asyncReadCloser) Close() error {
	r.closeOnce.Do(func() {
		close(r.close)
	})
	return nil
}
// asyncWrite is the result of one flush of the pending buffer to the wrapped
// writer, as reported by the background goroutine.
type asyncWrite struct {
	n   int   // bytes the flush moved to the wrapped writer
	err error // error reported by the flush, if any
}

// asyncWriteCloser wraps an io.WriteCloser so that a Write which is blocked
// on the wrapped writer returns as soon as Close is called. The wrapped
// writer is only ever written to by the goroutine running loop.
type asyncWriteCloser struct {
	io.WriteCloser
	writeBuffer bytes.Buffer    // bytes accepted by Write and not yet flushed
	write       chan struct{}   // Write -> loop: flush writeBuffer
	writes      chan asyncWrite // loop -> Write: outcome of the flush
	close       chan struct{}   // closed by Close; releases Write and stops loop
	closeOnce   sync.Once       // guards close against being closed twice
}

// NewAsyncWriteCloser returns a WriteCloser that supports Close during Write.
//
// A goroutine is started to perform the writes on w; it closes w and exits
// after Close has been called and any flush it has in progress has returned.
func NewAsyncWriteCloser(w io.WriteCloser) io.WriteCloser {
	wrapped := &asyncWriteCloser{WriteCloser: w}
	wrapped.write = make(chan struct{})
	wrapped.writes = make(chan asyncWrite)
	wrapped.close = make(chan struct{})
	go wrapped.loop()
	return wrapped
}

// loop flushes writeBuffer to the wrapped writer each time Write asks for it.
// When the close channel is closed it closes the wrapped writer, ignoring the
// error from that Close, and returns.
func (w *asyncWriteCloser) loop() {
	var failure error
	for {
		select {
		case <-w.close:
			w.WriteCloser.Close()
			return
		case <-w.write:
		}

		// A flush was requested. Once a flush has failed the wrapped writer
		// is left alone and every later request reports (0, failure).
		var flushed int64
		if failure == nil {
			flushed, failure = w.writeBuffer.WriteTo(w.WriteCloser)
		}

		select {
		case w.writes <- asyncWrite{n: int(flushed), err: failure}:
		case <-w.close:
			// The writer stopped waiting; drop the result.
		}
	}
}

// Write implements io.Writer. It appends b to the pending buffer, asks loop
// to flush the buffer to the wrapped writer and returns the outcome of that
// flush. If Close is called before the outcome arrives, Write returns
// (0, io.EOF).
func (w *asyncWriteCloser) Write(b []byte) (int, error) {
	queued, err := w.writeBuffer.Write(b)
	if err != nil {
		return queued, err
	}

	select {
	case <-w.close:
	case w.write <- struct{}{}:
	}

	select {
	case <-w.close:
		return 0, io.EOF
	case outcome := <-w.writes:
		return outcome.n, outcome.err
	}
}

// Close releases any blocked Write and tells loop to close the wrapped
// writer. Calling it repeatedly is safe; the result is always nil.
func (w *asyncWriteCloser) Close() error {
	w.closeOnce.Do(func() { close(w.close) })
	return nil
}
// rwc glues an independent read side and write side into a single
// io.ReadWriteCloser. Read and Write are promoted from the embedded values.
type rwc struct {
	io.ReadCloser
	io.WriteCloser
}

// Close closes the write side and then the read side. Both are always
// closed. The write side's error is returned if it is non-nil; otherwise the
// read side's error is returned.
func (c *rwc) Close() error {
	writeErr := c.WriteCloser.Close()
	readErr := c.ReadCloser.Close()
	if writeErr != nil {
		return writeErr
	}
	return readErr
}

// NewReadWriteCloser combines r and w into one io.ReadWriteCloser whose Close
// closes both of them.
func NewReadWriteCloser(r io.ReadCloser, w io.WriteCloser) io.ReadWriteCloser {
	return &rwc{ReadCloser: r, WriteCloser: w}
}
// RemoteIOReader is an io.ReadCloser whose data is pulled from the other end
// of a connection. Each Read sends the number of bytes wanted as a signed
// varint and then reads the reply from the same connection. ServeIOReader in
// this file reads requests in that format.
type RemoteIOReader struct {
	conn io.ReadWriteCloser
}

// Read requests up to len(b) bytes from the peer and reads the reply into b.
// It returns whatever the connection's Read returns, or (0, err) if sending
// the request fails.
//
// A zero-length b returns (0, nil) without touching the connection: a request
// for zero bytes gets no reply, so waiting for one could block forever.
func (r *RemoteIOReader) Read(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutVarint(buf[:], int64(len(b)))
	if _, err := r.conn.Write(buf[:n]); err != nil {
		return 0, err
	}
	return r.conn.Read(b)
}

// Close closes the underlying connection and returns its error.
func (r *RemoteIOReader) Close() error {
	return r.conn.Close()
}

// ConnectIOReader returns an io.ReadCloser that reads through conn by sending
// a size request before every read. Closing the result closes conn.
func ConnectIOReader(conn io.ReadWriteCloser) io.ReadCloser {
	return &RemoteIOReader{conn}
}
// ServeIOReader answers read requests arriving on conn with data from r.
//
// Each request is a signed varint giving the number of bytes wanted; the
// reply is up to that many bytes copied from r to conn. Serving stops, and
// conn is closed, as soon as a request cannot be decoded, the copy fails, or
// fewer bytes than requested were copied.
func ServeIOReader(r io.Reader, conn io.ReadWriteCloser) {
	defer conn.Close()

	requests := bufio.NewReader(conn)
	scratch := make([]byte, 32*1024)
	for {
		want, err := binary.ReadVarint(requests)
		if err != nil {
			return
		}
		sent, err := io.CopyBuffer(conn, io.LimitReader(r, want), scratch)
		if err != nil || sent < want {
			return
		}
	}
}