package matches // import nocternity.net/gomonop/pkg/matches

import (
	"bufio"
	"context"
	"io"
	"os/exec"
)

// streamObtainer refers to functions that may return either an input stream or
// an error.
type streamObtainer func() (io.ReadCloser, error)

// pipeCopy encapsulates the structures that are used to transfer data from a
// single stream to the reader.
type pipeCopy struct {
	reader     io.ReadCloser // The input stream being read from
	dataPipe   chan string   // Channel that receives the data we read
	aborter    chan struct{} // Channel that causes the copy to abort (buffered, capacity 1)
	readResult chan error    // Channel that returns results from reading (buffered, capacity 1)
}

// startPipeCopy obtains an input stream from the obtainer and starts a
// goroutine that copies its lines to pipes.data.
//
// If the obtainer fails, the error is sent to pipes.done and nil is returned;
// no goroutine is started in that case. The close and abort methods accept a
// nil receiver, so the result may be closed without checking it first.
func startPipeCopy(obtainer streamObtainer, pipes *readerPipes) *pipeCopy {
	stream, err := obtainer()
	if err != nil {
		pipes.done <- err
		return nil
	}

	pipeCopyData := &pipeCopy{
		reader:   stream,
		dataPipe: pipes.data,
		// Both channels hold one element so that neither side ever has to
		// wait for the other: abort can leave its signal behind even if run
		// has already finished, and run can leave its result behind even if
		// nobody is listening for it anymore.
		aborter:    make(chan struct{}, 1),
		readResult: make(chan error, 1),
	}
	go pipeCopyData.run()
	return pipeCopyData
}

// run runs the copy operation from the input stream to the data pipe. It is
// meant to be executed as a goroutine.
//
// Each line read from the stream is sent to dataPipe. When the stream ends,
// the scanner's error (nil on a clean end of stream) is sent to readResult.
// If an abort is signalled while a line is waiting to be delivered, run
// returns immediately without sending anything to readResult.
func (pc *pipeCopy) run() {
	scanner := bufio.NewScanner(pc.reader)
	for scanner.Scan() {
		select {
		case <-pc.aborter:
			return
		case pc.dataPipe <- scanner.Text():
		}
	}
	// Never blocks: readResult has room for the single value sent here.
	pc.readResult <- scanner.Err()
}

// close aborts the copy and closes the input stream. It is safe to call on a
// nil receiver and may be called more than once.
//
// The channels are deliberately left open: run may still be about to send its
// result, and sending on a closed channel would panic.
func (pc *pipeCopy) close() {
	if pc == nil {
		return
	}
	pc.abort()
	// Closing the stream also unblocks a run goroutine that is waiting inside
	// a read; the error is irrelevant here (the stream may already be closed).
	_ = pc.reader.Close()
}

// abort causes the pipe copy to be aborted. It never blocks: if an abort
// signal is already pending, or run has already finished, the call is a
// no-op. It is safe to call on a nil receiver.
func (pc *pipeCopy) abort() {
	if pc == nil {
		return
	}
	select {
	case pc.aborter <- struct{}{}:
	default:
	}
}

// readFromProgram starts a goroutine that controls the program, sending lines
// from both stderr and stdout to the dataPipe.
func (pluginInst *matchesPlugin) readFromProgram(ctx context.Context, pipes *readerPipes) { go func() { cmd := exec.Command(pluginInst.dataSource) //nolint:gosec // Command is in fact user-provided outs := startPipeCopy(func() (io.ReadCloser, error) { return cmd.StdoutPipe() }, pipes) defer outs.close() errs := startPipeCopy(func() (io.ReadCloser, error) { return cmd.StderrPipe() }, pipes) defer errs.close() if err := cmd.Start(); err != nil { pipes.done <- err return } abort := func(err error) { _ = cmd.Process.Kill() pipes.done <- err } errComplete := false outComplete := false for !(errComplete && outComplete) { select { case <-ctx.Done(): abort(ctx.Err()) return case err := <-outs.readResult: if err != nil { abort(err) return } outComplete = true case err := <-errs.readResult: if err != nil { abort(err) return } errComplete = true } } pipes.done <- cmd.Wait() }() }