refactor(matches): split the external program handling code into more functions

This commit is contained in:
Emmanuel BENOîT 2024-07-20 21:51:01 +02:00
parent 09df5aa44e
commit 2b58a27cc0
Signed by: Emmanuel BENOîT
SSH key fingerprint: SHA256:l7PFUUF5TCDsvYeQC9OnTNz08dFY7Fvf4Hv3neIqYpg
4 changed files with 336 additions and 239 deletions

116
cmd/matches/extprogram.go Normal file
View file

@ -0,0 +1,116 @@
package matches // import nocternity.net/gomonop/pkg/matches
import (
"bufio"
"context"
"io"
"os/exec"
)
// streamObtainer refers to functions that may return either an input stream or
// an error.
type streamObtainer func() (io.ReadCloser, error)
// pipeCopy encapsulates the structures that are used to transfer data from a
// single stream to the reader.
type pipeCopy struct {
reader io.ReadCloser // The input stream being read from
dataPipe chan string // Channel that receives the data we read
aborter chan struct{} // Channel that causes the copy to abort
readResult chan error // Channel that returns results from reading
}
// startPipeCopy starts a goroutine that copies data from the reader to the
// dataPipe.
func startPipeCopy(obtainer streamObtainer, pipes *readerPipes) *pipeCopy {
stream, err := obtainer()
if err != nil {
pipes.done <- err
return nil
}
pipeCopyData := &pipeCopy{
reader: stream,
dataPipe: pipes.data,
aborter: make(chan struct{}),
readResult: make(chan error),
}
go pipeCopyData.run()
return pipeCopyData
}
// run runs the copy operation from the input stream to the data pipe. It is
// meant to be executed as a goroutine.
func (pc *pipeCopy) run() {
scanner := bufio.NewScanner(pc.reader)
for scanner.Scan() {
select {
case <-pc.aborter:
return
case pc.dataPipe <- scanner.Text():
}
}
pc.readResult <- scanner.Err()
}
// close closes the stream and channels used by a copy pipe.
func (pc *pipeCopy) close() {
pc.abort()
_ = pc.reader.Close()
close(pc.aborter)
close(pc.readResult)
}
// abort causes the pipe copy to be aborted.
func (pc *pipeCopy) abort() {
pc.aborter <- struct{}{}
}
// readFromProgram starts a goroutine that controls the program, sending lines
// from both stderr and stdout to the dataPipe.
func (pluginInst *matchesPlugin) readFromProgram(ctx context.Context, pipes *readerPipes) {
go func() {
cmd := exec.Command(pluginInst.dataSource) //nolint:gosec // Command is in fact user-provided
outs := startPipeCopy(func() (io.ReadCloser, error) { return cmd.StdoutPipe() }, pipes)
defer outs.close()
errs := startPipeCopy(func() (io.ReadCloser, error) { return cmd.StderrPipe() }, pipes)
defer errs.close()
if err := cmd.Start(); err != nil {
pipes.done <- err
return
}
abort := func(err error) {
_ = cmd.Process.Kill()
pipes.done <- err
}
errComplete := false
outComplete := false
for !(errComplete && outComplete) {
select {
case <-ctx.Done():
abort(ctx.Err())
return
case err := <-outs.readResult:
if err != nil {
abort(err)
return
}
outComplete = true
case err := <-errs.readResult:
if err != nil {
abort(err)
return
}
errComplete = true
}
}
pipes.done <- cmd.Wait()
}()
}

View file

@ -5,17 +5,8 @@
package matches // import nocternity.net/gomonop/cmd/matches package matches // import nocternity.net/gomonop/cmd/matches
import ( import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"regexp" "regexp"
"strconv"
"strings"
"nocternity.net/gomonop/pkg/perfdata"
"nocternity.net/gomonop/pkg/plugin" "nocternity.net/gomonop/pkg/plugin"
"nocternity.net/gomonop/pkg/results" "nocternity.net/gomonop/pkg/results"
"nocternity.net/gomonop/pkg/status" "nocternity.net/gomonop/pkg/status"
@ -77,236 +68,6 @@ func (pluginInst *matchesPlugin) CheckArguments() bool {
return true return true
} }
// readFromFile starts a goroutine that reads from the file and sends each line
// to the dataPipe.
func (pluginInst *matchesPlugin) readFromFile(ctx context.Context, donePipe chan error, dataPipe chan string) {
go func() {
file, err := os.Open(pluginInst.dataSource)
if err != nil {
donePipe <- err
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
select {
case <-ctx.Done():
donePipe <- ctx.Err()
return
case dataPipe <- scanner.Text():
}
}
donePipe <- scanner.Err()
}()
}
// startPipeCopy starts a goroutine that copies data from the reader to the
// dataPipe.
func startPipeCopy(reader io.Reader, dataPipe chan string) (aborter chan struct{}, readError chan error) {
aborter = make(chan struct{})
readError = make(chan error)
go func() {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
select {
case <-aborter:
return
case dataPipe <- scanner.Text():
}
}
readError <- scanner.Err()
}()
return
}
// readFromProgram starts a goroutine that controls the program, sending lines
// from both stderr and stdout to the dataPipe.
func (pluginInst *matchesPlugin) readFromProgram(ctx context.Context, donePipe chan error, dataPipe chan string) {
go func() {
cmd := exec.Command(pluginInst.dataSource) //nolint:gosec // Command is in fact user-provided
stdout, err := cmd.StdoutPipe()
if err != nil {
donePipe <- err
return
}
stderr, err := cmd.StderrPipe()
if err != nil {
donePipe <- err
return
}
if err := cmd.Start(); err != nil {
donePipe <- err
return
}
outAborter, outReadError := startPipeCopy(stdout, dataPipe)
errAborter, errReadError := startPipeCopy(stderr, dataPipe)
defer func() {
_ = stdout.Close()
_ = stderr.Close()
close(outAborter)
close(errAborter)
close(outReadError)
close(errReadError)
}()
abort := func(err error) {
outAborter <- struct{}{}
errAborter <- struct{}{}
_ = cmd.Process.Kill()
donePipe <- err
}
errComplete := false
outComplete := false
for !(errComplete && outComplete) {
select {
case <-ctx.Done():
abort(ctx.Err())
return
case err := <-outReadError:
if err != nil {
abort(err)
return
}
outComplete = true
case err := <-errReadError:
if err != nil {
abort(err)
return
}
errComplete = true
}
}
donePipe <- cmd.Wait()
}()
}
// startReading starts reading data from either a file or a program.
func (pluginInst *matchesPlugin) startReading(ctx context.Context, donePipe chan error, dataPipe chan string) {
if pluginInst.isFile {
pluginInst.readFromFile(ctx, donePipe, dataPipe)
} else {
pluginInst.readFromProgram(ctx, donePipe, dataPipe)
}
}
// processLine processes a single line obtained from the program or file.
func (pluginInst *matchesPlugin) processLine(line string) {
hadMatch := false
for index := range pluginInst.matches {
if pluginInst.matches[index].matches(line) {
pluginInst.counters[index]++
hadMatch = true
}
}
if !hadMatch {
pluginInst.unmatchedLines++
}
}
// processData processes data from either the program or the file.
func (pluginInst *matchesPlugin) processData() error {
ctx, cancel := pluginInst.makeContext()
defer cancel()
donePipe := make(chan error)
dataPipe := make(chan string)
pluginInst.startReading(ctx, donePipe, dataPipe)
defer func() {
close(donePipe)
close(dataPipe)
}()
for {
select {
case line := <-dataPipe:
pluginInst.processLine(line)
case err := <-donePipe:
return err
}
}
}
// checkResults checks the various counters against their configured thresholds,
// and the strict mode failure count.
func (pluginInst *matchesPlugin) checkResults(readError error) {
nWarns, nCrits, nUnknowns := 0, 0, 0
for index := range pluginInst.counters {
config := &pluginInst.matches[index]
var nature string
if config.isRegexp {
nature = "regexp"
} else {
nature = "substring"
}
label := fmt.Sprintf("#%2d : %s %s", index+1, nature, config.matchString)
value := strconv.Itoa(pluginInst.counters[index])
pdat := perfdata.New(label, perfdata.UomNone, value)
pdat.SetWarn(config.warn)
pdat.SetCrit(config.crit)
pluginInst.results.AddPerfData(pdat)
switch pdat.GetStatus() {
case status.StatusCritical:
nCrits++
case status.StatusWarning:
nWarns++
case status.StatusUnknown:
nUnknowns++
}
}
umlPdat := perfdata.New("unmatched", perfdata.UomNone, strconv.Itoa(pluginInst.unmatchedLines))
if pluginInst.strict {
umlPdat.SetCrit(perfdata.RangeMinMax("~", "0"))
}
switch umlPdat.GetStatus() {
case status.StatusCritical:
nCrits++
}
pluginInst.results.AddPerfData(umlPdat)
problems := []string{}
if nCrits > 0 {
problems = append(problems, fmt.Sprintf("%d value(s) critical", nCrits))
}
if nWarns > 0 {
problems = append(problems, fmt.Sprintf("%d value(s) warning", nWarns))
}
if nUnknowns > 0 {
problems = append(problems, fmt.Sprintf("%d value(s) unknown", nUnknowns))
}
if len(problems) == 0 {
problems = append(problems, "No match errors")
}
if readError != nil {
pluginInst.results.AddLinef("Error while reading data: %s", readError.Error())
nUnknowns++
}
var finalStatus status.Status
switch {
case nCrits > 0:
finalStatus = status.StatusCritical
case nWarns > 0:
finalStatus = status.StatusWarning
case nUnknowns > 0:
finalStatus = status.StatusUnknown
default:
finalStatus = status.StatusOK
}
pluginInst.results.SetState(finalStatus, strings.Join(problems, ", "))
}
// Run the check. // Run the check.
func (pluginInst *matchesPlugin) RunCheck() { func (pluginInst *matchesPlugin) RunCheck() {
pluginInst.counters = make([]int, len(pluginInst.matches)) pluginInst.counters = make([]int, len(pluginInst.matches))

95
cmd/matches/reader.go Normal file
View file

@ -0,0 +1,95 @@
package matches // import nocternity.net/gomonop/pkg/matches
import (
"bufio"
"context"
"os"
)
// readerPipes encapsulates the two channels used to communicate data and state
// from the functions that acquire the data.
type readerPipes struct {
done chan error
data chan string
}
// makeReaderPipes initializes the channels.
func makeReaderPipes() *readerPipes {
return &readerPipes{
done: make(chan error),
data: make(chan string),
}
}
// close closes the channels.
func (rdp *readerPipes) close() {
close(rdp.done)
close(rdp.data)
}
// readFromFile starts a goroutine that reads from the file and sends each line
// to the dataPipe.
func (pluginInst *matchesPlugin) readFromFile(ctx context.Context, pipes *readerPipes) {
go func() {
file, err := os.Open(pluginInst.dataSource)
if err != nil {
pipes.done <- err
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
select {
case <-ctx.Done():
pipes.done <- ctx.Err()
return
case pipes.data <- scanner.Text():
}
}
pipes.done <- scanner.Err()
}()
}
// startReading starts reading data from either a file or a program.
func (pluginInst *matchesPlugin) startReading(ctx context.Context, pipes *readerPipes) {
if pluginInst.isFile {
pluginInst.readFromFile(ctx, pipes)
} else {
pluginInst.readFromProgram(ctx, pipes)
}
}
// processLine processes a single line obtained from the program or file.
func (pluginInst *matchesPlugin) processLine(line string) {
hadMatch := false
for index := range pluginInst.matches {
if pluginInst.matches[index].matches(line) {
pluginInst.counters[index]++
hadMatch = true
}
}
if !hadMatch {
pluginInst.unmatchedLines++
}
}
// processData processes data from either the program or the file.
func (pluginInst *matchesPlugin) processData() error {
ctx, cancel := pluginInst.makeContext()
defer cancel()
pipes := makeReaderPipes()
defer pipes.close()
pluginInst.startReading(ctx, pipes)
for {
select {
case line := <-pipes.data:
pluginInst.processLine(line)
case err := <-pipes.done:
return err
}
}
}

125
cmd/matches/results.go Normal file
View file

@ -0,0 +1,125 @@
package matches // import nocternity.net/gomonop/cmd/matches
import (
"fmt"
"strconv"
"strings"
"nocternity.net/gomonop/pkg/perfdata"
"nocternity.net/gomonop/pkg/status"
)
// resultsInfo contains information about results being processed.
type resultsInfo struct {
nWarns, nCrits, nUnknowns uint
}
// updateResults updates the result counters based on the specified status.
func (info *resultsInfo) updateFrom(itemStatus status.Status) {
switch itemStatus {
case status.StatusCritical:
info.nCrits++
case status.StatusWarning:
info.nWarns++
case status.StatusUnknown:
info.nUnknowns++
case status.StatusOK:
// do nothing
}
}
// toProblems generate the list of problems found while running the plugin,
// as a string which can be returned to the monitoring server.
func (info *resultsInfo) toProblems() string {
problems := []string{}
if info.nCrits > 0 {
problems = append(problems,
fmt.Sprintf("%d value(s) critical", info.nCrits),
)
}
if info.nWarns > 0 {
problems = append(problems,
fmt.Sprintf("%d value(s) warning", info.nWarns),
)
}
if info.nUnknowns > 0 {
problems = append(problems,
fmt.Sprintf("%d value(s) unknown", info.nUnknowns),
)
}
if len(problems) == 0 {
problems = append(problems, "No match errors")
}
return strings.Join(problems, ", ")
}
// toStatus computes the final status of the plugin based on the status
// counters.
func (info *resultsInfo) toStatus() status.Status {
switch {
case info.nCrits > 0:
return status.StatusCritical
case info.nWarns > 0:
return status.StatusWarning
case info.nUnknowns > 0:
return status.StatusUnknown
default:
return status.StatusOK
}
}
// checkCounters checks the match counters against their configured thresholds,
// if any, and adds the corresponding perf data to the result. It initializes a
// resultsInfo structure containing the counts of critical, warning and unknown
// statuses gathered so far.
func (pluginInst *matchesPlugin) checkCounters() resultsInfo {
info := resultsInfo{}
for index := range pluginInst.counters {
config := &pluginInst.matches[index]
var nature string
if config.isRegexp {
nature = "regexp"
} else {
nature = "substring"
}
label := fmt.Sprintf("#%2d : %s %s", index+1, nature, config.matchString)
value := strconv.Itoa(pluginInst.counters[index])
pdat := perfdata.New(label, perfdata.UomNone, value)
pdat.SetWarn(config.warn)
pdat.SetCrit(config.crit)
pluginInst.results.AddPerfData(pdat)
info.updateFrom(pdat.GetStatus())
}
return info
}
// checkUnmatched checks the unmatched lines counters, applying a limit if
// strict mode is enabled. The corresponding performance data is added to the
// result.
func (pluginInst *matchesPlugin) checkUnmatched(info *resultsInfo) {
umlPdat := perfdata.New("unmatched", perfdata.UomNone, strconv.Itoa(pluginInst.unmatchedLines))
if pluginInst.strict {
umlPdat.SetCrit(perfdata.RangeMinMax("~", "0"))
}
info.updateFrom(umlPdat.GetStatus())
pluginInst.results.AddPerfData(umlPdat)
}
// checkResults checks the various counters against their configured thresholds,
// and the strict mode failure count.
func (pluginInst *matchesPlugin) checkResults(readError error) {
info := pluginInst.checkCounters()
pluginInst.checkUnmatched(&info)
if readError != nil {
pluginInst.results.AddLinef("Error while reading data: %s", readError.Error())
info.nUnknowns++
}
pluginInst.results.SetState(info.toStatus(), info.toProblems())
}