package matches // import nocternity.net/gomonop/pkg/matches import ( "bufio" "context" "os" ) // readerPipes encapsulates the two channels used to communicate data and state // from the functions that acquire the data. type readerPipes struct { done chan error data chan string } // makeReaderPipes initializes the channels. func makeReaderPipes() *readerPipes { return &readerPipes{ done: make(chan error), data: make(chan string), } } // close closes the channels. func (rdp *readerPipes) close() { close(rdp.done) close(rdp.data) } // readFromFile starts a goroutine that reads from the file and sends each line // to the dataPipe. func (pluginInst *matchesPlugin) readFromFile(ctx context.Context, pipes *readerPipes) { go func() { file, err := os.Open(pluginInst.dataSource) if err != nil { pipes.done <- err return } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { select { case <-ctx.Done(): pipes.done <- ctx.Err() return case pipes.data <- scanner.Text(): } } pipes.done <- scanner.Err() }() } // startReading starts reading data from either a file or a program. func (pluginInst *matchesPlugin) startReading(ctx context.Context, pipes *readerPipes) { if pluginInst.isFile { pluginInst.readFromFile(ctx, pipes) } else { pluginInst.readFromProgram(ctx, pipes) } } // processLine processes a single line obtained from the program or file. func (pluginInst *matchesPlugin) processLine(line string) { hadMatch := false for index := range pluginInst.matches { if pluginInst.matches[index].matches(line) { pluginInst.counters[index]++ hadMatch = true } } if !hadMatch { pluginInst.unmatchedLines++ } } // processData processes data from either the program or the file. func (pluginInst *matchesPlugin) processData() error { ctx, cancel := pluginInst.makeContext() defer cancel() pipes := makeReaderPipes() defer pipes.close() pluginInst.startReading(ctx, pipes) for { select { case line := <-pipes.data: pluginInst.processLine(line) case err := <-pipes.done: return err } } }