Refactor exporter (#3)

* refactor exporter code for better readability and structure
* modified startup log output a little
This commit is contained in:
T.v.Dein
2025-10-23 12:14:33 +02:00
committed by GitHub
parent 5184c3a03e
commit 99222b1cae
4 changed files with 107 additions and 75 deletions

View File

@@ -15,7 +15,7 @@ import (
) )
const ( const (
Version = `v0.0.5` Version = `v0.0.6`
SLEEP = 5 SLEEP = 5
Usage = `io-exporter [options] <file> Usage = `io-exporter [options] <file>
Options: Options:

View File

@@ -11,33 +11,85 @@ import (
"github.com/ncw/directio" "github.com/ncw/directio"
) )
func report(err error, fd *os.File) bool { // our primary container for the io checks
slog.Debug("failed to check io", "error", err) type Exporter struct {
conf *Config
if fd != nil { alloc *Alloc
if err := fd.Close(); err != nil { metrics *Metrics
slog.Debug("failed to close filehandle", "error", err)
}
}
return false
} }
// Calls runcheck* with timeout type Result struct {
func runExporter(file string, alloc *Alloc, timeout time.Duration, op int) bool { result bool
elapsed float64
}
func NewExporter(conf *Config, alloc *Alloc, metrics *Metrics) *Exporter {
return &Exporter{
conf: conf,
alloc: alloc,
metrics: metrics,
}
}
// starts the primary go-routine, which will run the io checks for ever
func (exp *Exporter) RunIOchecks() {
go func() {
for {
var res_r, res_w Result
exp.alloc.Clean()
if exp.conf.WriteMode {
res_w = exp.measure(O_W)
slog.Debug("elapsed write time", "elapsed", res_w.elapsed, "result", res_w.result)
}
if exp.conf.ReadMode {
res_r = exp.measure(O_R)
slog.Debug("elapsed read time", "elapsed", res_r.elapsed, "result", res_r.result)
}
if exp.conf.WriteMode && exp.conf.ReadMode {
if !exp.alloc.Compare() {
res_r.result = false
}
}
exp.metrics.Set(res_r, res_w)
time.Sleep(time.Duration(exp.conf.Sleeptime) * time.Second)
}
}()
}
// call an io measurement and collect time needed
func (exp *Exporter) measure(mode int) Result {
start := time.Now()
result := exp.runExporter(mode)
// ns => s
now := time.Now()
elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000
return Result{elapsed: elapsed, result: result}
}
// Calls runcheck's with context timeout
func (exp *Exporter) runExporter(mode int) bool {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, timeout) ctx, cancel := context.WithTimeout(ctx, time.Duration(exp.conf.Timeout)*time.Second)
defer cancel() defer cancel()
run := make(chan struct{}, 1) run := make(chan struct{}, 1)
var res bool var res bool
go func() { go func() {
switch op { switch mode {
case O_R: case O_R:
res = runcheck_r(file, alloc) res = exp.runcheck_r()
case O_W: case O_W:
res = runcheck_w(file, alloc) res = exp.runcheck_w()
} }
run <- struct{}{} run <- struct{}{}
}() }()
@@ -60,19 +112,19 @@ func runExporter(file string, alloc *Alloc, timeout time.Duration, op int) bool
// //
// Returns false if anything failed during that sequence, // Returns false if anything failed during that sequence,
// true otherwise. // true otherwise.
func runcheck_r(file string, alloc *Alloc) bool { func (exp *Exporter) runcheck_r() bool {
// read // read
in, err := directio.OpenFile(file, os.O_RDONLY, 0640) in, err := directio.OpenFile(exp.conf.File, os.O_RDONLY, 0640)
if err != nil { if err != nil {
report(err, nil) report(err, nil)
} }
n, err := io.ReadFull(in, alloc.readBlock) n, err := io.ReadFull(in, exp.alloc.readBlock)
if err != nil { if err != nil {
return report(err, in) return report(err, in)
} }
if n != len(alloc.writeBlock) { if n != len(exp.alloc.writeBlock) {
return report(errors.New("failed to read block"), in) return report(errors.New("failed to read block"), in)
} }
@@ -92,23 +144,23 @@ func runcheck_r(file string, alloc *Alloc) bool {
// //
// Returns false if anything failed during that sequence, // Returns false if anything failed during that sequence,
// true otherwise. // true otherwise.
func runcheck_w(file string, alloc *Alloc) bool { func (exp *Exporter) runcheck_w() bool {
// write // write
fd, err := directio.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640) fd, err := directio.OpenFile(exp.conf.File, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil { if err != nil {
report(err, nil) report(err, nil)
} }
for i := 0; i < len(alloc.writeBlock); i++ { for i := 0; i < len(exp.alloc.writeBlock); i++ {
alloc.writeBlock[i] = 'A' exp.alloc.writeBlock[i] = 'A'
} }
n, err := fd.Write(alloc.writeBlock) n, err := fd.Write(exp.alloc.writeBlock)
if err != nil { if err != nil {
return report(err, fd) return report(err, fd)
} }
if n != len(alloc.writeBlock) { if n != len(exp.alloc.writeBlock) {
return report(errors.New("failed to write block"), fd) return report(errors.New("failed to write block"), fd)
} }

View File

@@ -101,25 +101,25 @@ func NewMetrics(conf *Config) *Metrics {
return metrics return metrics
} }
func (metrics *Metrics) Set(result_r, result_w bool, elapsed_r, elapsed_w float64) { func (metrics *Metrics) Set(result_r, result_w Result) {
var res float64 var res float64
switch metrics.mode { switch metrics.mode {
case O_RW: case O_RW:
if result_r && result_w { if result_r.result && result_w.result {
res = 1 res = 1
} }
case O_R: case O_R:
if result_r { if result_r.result {
res = 1 res = 1
} }
case O_W: case O_W:
if result_w { if result_w.result {
res = 1 res = 1
} }
} }
metrics.run.WithLabelValues(metrics.values...).Set(res) metrics.run.WithLabelValues(metrics.values...).Set(res)
metrics.latency_r.WithLabelValues(metrics.values...).Set(elapsed_r) metrics.latency_r.WithLabelValues(metrics.values...).Set(result_r.elapsed)
metrics.latency_w.WithLabelValues(metrics.values...).Set(elapsed_w) metrics.latency_w.WithLabelValues(metrics.values...).Set(result_w.elapsed)
} }

View File

@@ -7,11 +7,13 @@ import (
"net/http" "net/http"
"os" "os"
"strings" "strings"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
) )
// Main program. starts 2 goroutines: our exporter and the http server
// for the prometheus metrics. The exporter reports measurement
// results to prometheus metrics directly
func Run() { func Run() {
conf, err := InitConfig(os.Stdout) conf, err := InitConfig(os.Stdout)
if err != nil { if err != nil {
@@ -23,62 +25,40 @@ func Run() {
os.Exit(0) os.Exit(0)
} }
metrics := NewMetrics(conf)
alloc := NewAlloc()
setLogger(os.Stdout, conf.Debug) setLogger(os.Stdout, conf.Debug)
go func() { metrics := NewMetrics(conf)
for { alloc := NewAlloc()
var result_r, result_w bool exporter := NewExporter(conf, alloc, metrics)
var elapsed_w, elapsed_r float64
alloc.Clean() exporter.RunIOchecks()
if conf.WriteMode {
elapsed_w, result_w = measure(conf.File, alloc, conf.Timeout, O_W)
slog.Debug("elapsed write time", "elapsed", elapsed_w, "result", result_w)
}
if conf.ReadMode {
elapsed_r, result_r = measure(conf.File, alloc, conf.Timeout, O_R)
slog.Debug("elapsed read time", "elapsed", elapsed_r, "result", result_r)
}
if conf.WriteMode && conf.ReadMode {
if !alloc.Compare() {
result_r = false
}
}
metrics.Set(result_r, result_w, elapsed_r, elapsed_w)
time.Sleep(time.Duration(conf.Sleeptime) * time.Second)
}
}()
http.Handle("/metrics", promhttp.HandlerFor( http.Handle("/metrics", promhttp.HandlerFor(
metrics.registry, metrics.registry,
promhttp.HandlerOpts{}, promhttp.HandlerOpts{},
)) ))
slog.Info("start testing and serving metrics on localhost", "port", conf.Port) slog.Info(" ╭──")
slog.Info("test setup", "file", conf.File, "labels", strings.Join(conf.Label, ",")) slog.Info(" │ io-exporter starting up", "version", Version)
slog.Info("measuring", "read", conf.ReadMode, "write", conf.WriteMode, "timeout(s)", conf.Timeout) slog.Info(" │ serving metrics", "host", "localhost", "port", conf.Port)
slog.Info(" │ test setup", "file", conf.File, "labels", strings.Join(conf.Label, ","))
slog.Info(" │ measuring", "read", conf.ReadMode, "write", conf.WriteMode, "timeout(s)", conf.Timeout)
slog.Info(" │ debugging", "enabled", conf.Debug)
slog.Info(" ╰──")
if err := http.ListenAndServe(fmt.Sprintf(":%d", conf.Port), nil); err != nil { if err := http.ListenAndServe(fmt.Sprintf(":%d", conf.Port), nil); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
func measure(file string, alloc *Alloc, timeout int, mode int) (float64, bool) { func report(err error, fd *os.File) bool {
start := time.Now() slog.Debug("failed to check io", "error", err)
result := runExporter(file, alloc, time.Duration(timeout)*time.Second, mode) if fd != nil {
if err := fd.Close(); err != nil {
slog.Debug("failed to close filehandle", "error", err)
}
}
// ns => s return false
now := time.Now()
elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000
return elapsed, result
} }