refactor exporter code for better readability and structure

This commit is contained in:
2025-10-23 11:28:43 +02:00
parent 5184c3a03e
commit 3be1a6be5d
3 changed files with 103 additions and 74 deletions

View File

@@ -11,33 +11,85 @@ import (
"github.com/ncw/directio" "github.com/ncw/directio"
) )
func report(err error, fd *os.File) bool { // our primary container for the io checks
slog.Debug("failed to check io", "error", err) type Exporter struct {
conf *Config
if fd != nil { alloc *Alloc
if err := fd.Close(); err != nil { metrics *Metrics
slog.Debug("failed to close filehandle", "error", err)
}
}
return false
} }
// Calls runcheck* with timeout type Result struct {
func runExporter(file string, alloc *Alloc, timeout time.Duration, op int) bool { result bool
elapsed float64
}
func NewExporter(conf *Config, alloc *Alloc, metrics *Metrics) *Exporter {
return &Exporter{
conf: conf,
alloc: alloc,
metrics: metrics,
}
}
// starts the primary go-routine, which will run the io checks for ever
func (exp *Exporter) RunIOchecks() {
go func() {
for {
var res_r, res_w Result
exp.alloc.Clean()
if exp.conf.WriteMode {
res_w = exp.measure(O_W)
slog.Debug("elapsed write time", "elapsed", res_w.elapsed, "result", res_w.result)
}
if exp.conf.ReadMode {
res_r = exp.measure(O_R)
slog.Debug("elapsed read time", "elapsed", res_r.elapsed, "result", res_r.result)
}
if exp.conf.WriteMode && exp.conf.ReadMode {
if !exp.alloc.Compare() {
res_r.result = false
}
}
exp.metrics.Set(res_r, res_w)
time.Sleep(time.Duration(exp.conf.Sleeptime) * time.Second)
}
}()
}
// call an io measurement and collect time needed
func (exp *Exporter) measure(mode int) Result {
start := time.Now()
result := exp.runExporter(mode)
// ns => s
now := time.Now()
elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000
return Result{elapsed: elapsed, result: result}
}
// Calls runcheck's with context timeout
func (exp *Exporter) runExporter(mode int) bool {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, timeout) ctx, cancel := context.WithTimeout(ctx, time.Duration(exp.conf.Timeout)*time.Second)
defer cancel() defer cancel()
run := make(chan struct{}, 1) run := make(chan struct{}, 1)
var res bool var res bool
go func() { go func() {
switch op { switch mode {
case O_R: case O_R:
res = runcheck_r(file, alloc) res = exp.runcheck_r()
case O_W: case O_W:
res = runcheck_w(file, alloc) res = exp.runcheck_w()
} }
run <- struct{}{} run <- struct{}{}
}() }()
@@ -60,19 +112,19 @@ func runExporter(file string, alloc *Alloc, timeout time.Duration, op int) bool
// //
// Returns false if anything failed during that sequence, // Returns false if anything failed during that sequence,
// true otherwise. // true otherwise.
func runcheck_r(file string, alloc *Alloc) bool { func (exp *Exporter) runcheck_r() bool {
// read // read
in, err := directio.OpenFile(file, os.O_RDONLY, 0640) in, err := directio.OpenFile(exp.conf.File, os.O_RDONLY, 0640)
if err != nil { if err != nil {
report(err, nil) report(err, nil)
} }
n, err := io.ReadFull(in, alloc.readBlock) n, err := io.ReadFull(in, exp.alloc.readBlock)
if err != nil { if err != nil {
return report(err, in) return report(err, in)
} }
if n != len(alloc.writeBlock) { if n != len(exp.alloc.writeBlock) {
return report(errors.New("failed to read block"), in) return report(errors.New("failed to read block"), in)
} }
@@ -92,23 +144,23 @@ func runcheck_r(file string, alloc *Alloc) bool {
// //
// Returns false if anything failed during that sequence, // Returns false if anything failed during that sequence,
// true otherwise. // true otherwise.
func runcheck_w(file string, alloc *Alloc) bool { func (exp *Exporter) runcheck_w() bool {
// write // write
fd, err := directio.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640) fd, err := directio.OpenFile(exp.conf.File, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil { if err != nil {
report(err, nil) report(err, nil)
} }
for i := 0; i < len(alloc.writeBlock); i++ { for i := 0; i < len(exp.alloc.writeBlock); i++ {
alloc.writeBlock[i] = 'A' exp.alloc.writeBlock[i] = 'A'
} }
n, err := fd.Write(alloc.writeBlock) n, err := fd.Write(exp.alloc.writeBlock)
if err != nil { if err != nil {
return report(err, fd) return report(err, fd)
} }
if n != len(alloc.writeBlock) { if n != len(exp.alloc.writeBlock) {
return report(errors.New("failed to write block"), fd) return report(errors.New("failed to write block"), fd)
} }

View File

@@ -101,25 +101,25 @@ func NewMetrics(conf *Config) *Metrics {
return metrics return metrics
} }
func (metrics *Metrics) Set(result_r, result_w bool, elapsed_r, elapsed_w float64) { func (metrics *Metrics) Set(result_r, result_w Result) {
var res float64 var res float64
switch metrics.mode { switch metrics.mode {
case O_RW: case O_RW:
if result_r && result_w { if result_r.result && result_w.result {
res = 1 res = 1
} }
case O_R: case O_R:
if result_r { if result_r.result {
res = 1 res = 1
} }
case O_W: case O_W:
if result_w { if result_w.result {
res = 1 res = 1
} }
} }
metrics.run.WithLabelValues(metrics.values...).Set(res) metrics.run.WithLabelValues(metrics.values...).Set(res)
metrics.latency_r.WithLabelValues(metrics.values...).Set(elapsed_r) metrics.latency_r.WithLabelValues(metrics.values...).Set(result_r.elapsed)
metrics.latency_w.WithLabelValues(metrics.values...).Set(elapsed_w) metrics.latency_w.WithLabelValues(metrics.values...).Set(result_w.elapsed)
} }

View File

@@ -7,11 +7,13 @@ import (
"net/http" "net/http"
"os" "os"
"strings" "strings"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
) )
// Main program. starts 2 goroutines: our exporter and the http server
// for the prometheus metrics. The exporter reports measurement
// results to prometheus metrics directly
func Run() { func Run() {
conf, err := InitConfig(os.Stdout) conf, err := InitConfig(os.Stdout)
if err != nil { if err != nil {
@@ -23,62 +25,37 @@ func Run() {
os.Exit(0) os.Exit(0)
} }
metrics := NewMetrics(conf)
alloc := NewAlloc()
setLogger(os.Stdout, conf.Debug) setLogger(os.Stdout, conf.Debug)
go func() { metrics := NewMetrics(conf)
for { alloc := NewAlloc()
var result_r, result_w bool exporter := NewExporter(conf, alloc, metrics)
var elapsed_w, elapsed_r float64
alloc.Clean() exporter.RunIOchecks()
if conf.WriteMode {
elapsed_w, result_w = measure(conf.File, alloc, conf.Timeout, O_W)
slog.Debug("elapsed write time", "elapsed", elapsed_w, "result", result_w)
}
if conf.ReadMode {
elapsed_r, result_r = measure(conf.File, alloc, conf.Timeout, O_R)
slog.Debug("elapsed read time", "elapsed", elapsed_r, "result", result_r)
}
if conf.WriteMode && conf.ReadMode {
if !alloc.Compare() {
result_r = false
}
}
metrics.Set(result_r, result_w, elapsed_r, elapsed_w)
time.Sleep(time.Duration(conf.Sleeptime) * time.Second)
}
}()
http.Handle("/metrics", promhttp.HandlerFor( http.Handle("/metrics", promhttp.HandlerFor(
metrics.registry, metrics.registry,
promhttp.HandlerOpts{}, promhttp.HandlerOpts{},
)) ))
slog.Info("start testing and serving metrics on localhost", "port", conf.Port) slog.Info("io-exporter starting up", "version", Version)
slog.Info("test setup", "file", conf.File, "labels", strings.Join(conf.Label, ",")) slog.Info(" serving metrics", "host", "localhost", "port", conf.Port)
slog.Info("measuring", "read", conf.ReadMode, "write", conf.WriteMode, "timeout(s)", conf.Timeout) slog.Info(" test setup", "file", conf.File, "labels", strings.Join(conf.Label, ","))
slog.Info(" measuring", "read", conf.ReadMode, "write", conf.WriteMode, "timeout(s)", conf.Timeout)
if err := http.ListenAndServe(fmt.Sprintf(":%d", conf.Port), nil); err != nil { if err := http.ListenAndServe(fmt.Sprintf(":%d", conf.Port), nil); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
func measure(file string, alloc *Alloc, timeout int, mode int) (float64, bool) { func report(err error, fd *os.File) bool {
start := time.Now() slog.Debug("failed to check io", "error", err)
result := runExporter(file, alloc, time.Duration(timeout)*time.Second, mode) if fd != nil {
if err := fd.Close(); err != nil {
slog.Debug("failed to close filehandle", "error", err)
}
}
// ns => s return false
now := time.Now()
elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000
return elapsed, result
} }