4 Commits

Author SHA1 Message Date
ad80542619 return wg pointer 2025-10-23 12:41:34 +02:00
62f5b51be8 add waitgroup for goroutine 1 2025-10-23 12:39:09 +02:00
T.v.Dein
99222b1cae Refactor exporter (#3)
* refactor exporter code for better readability and structure
* modified startup log output a little
2025-10-23 12:14:33 +02:00
T.v.Dein
5184c3a03e Separate io tests to read and write mode with separate latencies (#2) 2025-10-22 18:02:26 +02:00
6 changed files with 250 additions and 106 deletions

View File

@@ -15,7 +15,12 @@ specified via commandline.
io-exporter [options] <file>
Options:
-t --timeout <int> When should the operation timeout in seconds
-s --sleeptime <int> Time to sleep between checks (default: 5s)
-l --label <label=value> Add label to exported metric
-i --internals Also add labels about resource usage
-r --read Only execute the read test
-w --write Only execute the write test
-d --debug Enable debug log level
-h --help Show help
-v --version Show program version
```
@@ -31,14 +36,20 @@ io-exporter -l foo=bar -l blah=blubb t/blah
You'll get such metrics:
```default
# HELP io_exporter_io_latency how long does the operation take in seconds
# TYPE io_exporter_io_latency gauge
io_exporter_io_latency{file="/tmp/blah",maxwait="1",namespace="debug",pod="foo1"} 0.0001142815
# HELP io_exporter_io_operation whether io is working on the pvc, 1=ok, 0=fail
# TYPE io_exporter_io_operation gauge
io_exporter_io_operation{file="/tmp/blah",maxwait="1",namespace="debug",pod="foo1"} 1
io_exporter_io_operation{blah="blubb",exectime="1761148383705",file="t/blah",foo="bar",maxwait="1"} 1
# HELP io_exporter_io_read_latency how long does the read operation take in seconds
# TYPE io_exporter_io_read_latency gauge
io_exporter_io_read_latency{blah="blubb",exectime="1761148383705",file="t/blah",foo="bar",maxwait="1"} 0.0040411716
# HELP io_exporter_io_write_latency how long does the write operation take in seconds
# TYPE io_exporter_io_write_latency gauge
io_exporter_io_write_latency{blah="blubb",exectime="1761148383705",file="t/blah",foo="bar",maxwait="1"} 0
```
You may also restrict the exporter to only test read (`-r` flag) or
write (`-w` flag) operation.
## Installation
There are no released binaries yet.

View File

@@ -1,6 +1,11 @@
package cmd
import "github.com/ncw/directio"
import (
"bytes"
"errors"
"github.com/ncw/directio"
)
// aligned allocs used for testing
type Alloc struct {
@@ -25,3 +30,12 @@ func NewAlloc() *Alloc {
readBlock: directio.AlignedBlock(directio.BlockSize),
}
}
func (alloc *Alloc) Compare() bool {
// compare
if !bytes.Equal(alloc.writeBlock, alloc.readBlock) {
return report(errors.New("read not the same as written"), nil)
}
return true
}

View File

@@ -15,7 +15,7 @@ import (
)
const (
Version = `v0.0.4`
Version = `v0.0.6`
SLEEP = 5
Usage = `io-exporter [options] <file>
Options:
@@ -23,9 +23,15 @@ Options:
-s --sleeptime <int> Time to sleep between checks (default: 5s)
-l --label <label=value> Add label to exported metric
-i --internals Also add labels about resource usage
-r --read Only execute the read test
-w --write Only execute the write test
-d --debug Enable debug log level
-h --help Show help
-v --version Show program version`
O_R = iota
O_W
O_RW
)
// config via commandline flags
@@ -34,6 +40,8 @@ type Config struct {
Showhelp bool `koanf:"help"` // -h
Internals bool `koanf:"internals"` // -i
Debug bool `koanf:"debug"` // -d
ReadMode bool `koanf:"read"` // -r
WriteMode bool `koanf:"write"` // -w
Label []string `koanf:"label"` // -v
Timeout int `koanf:"timeout"` // -t
Port int `koanf:"port"` // -p
@@ -60,6 +68,8 @@ func InitConfig(output io.Writer) (*Config, error) {
flagset.BoolP("help", "h", false, "show help")
flagset.BoolP("debug", "d", false, "enable debug logs")
flagset.BoolP("internals", "i", false, "add internal metrics")
flagset.BoolP("read", "r", false, "only execute the read test")
flagset.BoolP("write", "w", false, "only execute the write test")
flagset.StringArrayP("label", "l", nil, "additional labels")
flagset.IntP("timeout", "t", 1, "timeout for file operation in seconds")
flagset.IntP("port", "p", 9187, "prometheus metrics port to listen to")
@@ -103,5 +113,10 @@ func InitConfig(output io.Writer) (*Config, error) {
conf.Labels = append(conf.Labels, Label{Name: parts[0], Value: parts[1]})
}
if !conf.ReadMode && !conf.WriteMode {
conf.ReadMode = true
conf.WriteMode = true
}
return conf, nil
}

View File

@@ -1,47 +1,109 @@
package cmd
import (
"bytes"
"context"
"errors"
"io"
"log/slog"
"os"
"sync"
"time"
"github.com/ncw/directio"
)
func die(err error, fd *os.File) bool {
slog.Debug("failed to check io", "error", err)
// our primary container for the io checks
type Exporter struct {
conf *Config
alloc *Alloc
metrics *Metrics
}
if fd != nil {
if err := fd.Close(); err != nil {
slog.Debug("failed to close filehandle", "error", err)
type Result struct {
result bool
elapsed float64
}
func NewExporter(conf *Config, alloc *Alloc, metrics *Metrics) *Exporter {
return &Exporter{
conf: conf,
alloc: alloc,
metrics: metrics,
}
}
return false
// starts the primary go-routine, which will run the io checks for ever
func (exp *Exporter) RunIOchecks() *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
var res_r, res_w Result
exp.alloc.Clean()
if exp.conf.WriteMode {
res_w = exp.measure(O_W)
slog.Debug("elapsed write time", "elapsed", res_w.elapsed, "result", res_w.result)
}
// Calls runcheck() with timeout
func runExporter(file string, alloc *Alloc, timeout time.Duration) bool {
if exp.conf.ReadMode {
res_r = exp.measure(O_R)
slog.Debug("elapsed read time", "elapsed", res_r.elapsed, "result", res_r.result)
}
if exp.conf.WriteMode && exp.conf.ReadMode {
if !exp.alloc.Compare() {
res_r.result = false
}
}
exp.metrics.Set(res_r, res_w)
time.Sleep(time.Duration(exp.conf.Sleeptime) * time.Second)
}
}()
return &wg
}
// call an io measurement and collect time needed
func (exp *Exporter) measure(mode int) Result {
start := time.Now()
result := exp.runExporter(mode)
// ns => s
now := time.Now()
elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000
return Result{elapsed: elapsed, result: result}
}
// Calls runcheck's with context timeout
func (exp *Exporter) runExporter(mode int) bool {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, timeout)
ctx, cancel := context.WithTimeout(ctx, time.Duration(exp.conf.Timeout)*time.Second)
defer cancel()
run := make(chan struct{}, 1)
var res bool
go func() {
res = runcheck(file, alloc)
switch mode {
case O_R:
res = exp.runcheck_r()
case O_W:
res = exp.runcheck_w()
}
run <- struct{}{}
}()
for {
select {
case <-ctx.Done():
return die(ctx.Err(), nil)
return report(ctx.Err(), nil)
case <-run:
return res
}
@@ -50,65 +112,66 @@ func runExporter(file string, alloc *Alloc, timeout time.Duration) bool {
// Checks file io on the specified path:
//
// - open the file (create if it doesnt exist)
// - truncate it if it already exists
// - write some data to it
// - closes the file
// - re-opens it for reading
// - opens it for reading
// - reads the block
// - compares if written block is equal to read block
// - closes file again
//
// Returns false if anything failed during that sequence,
// true otherwise.
func runcheck(file string, alloc *Alloc) bool {
alloc.Clean()
// write
fd, err := directio.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
die(err, nil)
}
for i := 0; i < len(alloc.writeBlock); i++ {
alloc.writeBlock[i] = 'A'
}
n, err := fd.Write(alloc.writeBlock)
if err != nil {
return die(err, fd)
}
if n != len(alloc.writeBlock) {
return die(errors.New("failed to write block"), fd)
}
if err := fd.Close(); err != nil {
return die(err, nil)
}
func (exp *Exporter) runcheck_r() bool {
// read
in, err := directio.OpenFile(file, os.O_RDONLY, 0640)
in, err := directio.OpenFile(exp.conf.File, os.O_RDONLY, 0640)
if err != nil {
die(err, nil)
report(err, nil)
}
n, err = io.ReadFull(in, alloc.readBlock)
n, err := io.ReadFull(in, exp.alloc.readBlock)
if err != nil {
return die(err, in)
return report(err, in)
}
if n != len(alloc.writeBlock) {
return die(errors.New("failed to read block"), fd)
if n != len(exp.alloc.writeBlock) {
return report(errors.New("failed to read block"), in)
}
if err := in.Close(); err != nil {
return die(err, nil)
}
// compare
if !bytes.Equal(alloc.writeBlock, alloc.readBlock) {
return die(errors.New("read not the same as written"), nil)
return report(err, nil)
}
return true
}
// Checks file io on the specified path:
//
// - open the file (create if it doesnt exist)
// - truncate it if it already exists
// - write some data to it
// - closes the file
//
// Returns false if anything failed during that sequence,
// true otherwise.
func (exp *Exporter) runcheck_w() bool {
// write
fd, err := directio.OpenFile(exp.conf.File, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
report(err, nil)
}
for i := 0; i < len(exp.alloc.writeBlock); i++ {
exp.alloc.writeBlock[i] = 'A'
}
n, err := fd.Write(exp.alloc.writeBlock)
if err != nil {
return report(err, fd)
}
if n != len(exp.alloc.writeBlock) {
return report(errors.New("failed to write block"), fd)
}
if err := fd.Close(); err != nil {
return report(err, nil)
}
return true

View File

@@ -2,6 +2,7 @@ package cmd
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
@@ -15,14 +16,16 @@ type Label struct {
// simple prometheus wrapper
type Metrics struct {
run *prometheus.GaugeVec
latency *prometheus.GaugeVec
latency_r *prometheus.GaugeVec
latency_w *prometheus.GaugeVec
registry *prometheus.Registry
values []string
mode int
}
func NewMetrics(conf *Config) *Metrics {
labels := []string{"file", "maxwait"}
LabelLen := 2
labels := []string{"file", "maxwait", "exectime"}
LabelLen := 3
for _, label := range conf.Labels {
labels = append(labels, label.Name)
@@ -36,10 +39,17 @@ func NewMetrics(conf *Config) *Metrics {
},
labels,
),
latency: prometheus.NewGaugeVec(
latency_r: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "io_exporter_io_latency",
Help: "how long does the operation take in seconds",
Name: "io_exporter_io_read_latency",
Help: "how long does the read operation take in seconds",
},
labels,
),
latency_w: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "io_exporter_io_write_latency",
Help: "how long does the write operation take in seconds",
},
labels,
),
@@ -53,7 +63,8 @@ func NewMetrics(conf *Config) *Metrics {
if conf.Internals {
metrics.registry.MustRegister(
metrics.run,
metrics.latency,
metrics.latency_r,
metrics.latency_w,
// we might need to take care of the exporter in terms of
// resources, so also report those internals
@@ -65,28 +76,50 @@ func NewMetrics(conf *Config) *Metrics {
),
)
} else {
metrics.registry.MustRegister(metrics.run, metrics.latency)
metrics.registry.MustRegister(metrics.run, metrics.latency_r, metrics.latency_w)
}
// static labels
metrics.values[0] = conf.File
metrics.values[1] = fmt.Sprintf("%d", conf.Timeout)
metrics.values[2] = fmt.Sprintf("%d", time.Now().UnixMilli())
// custom labels via -l label=value
for idx, label := range conf.Labels {
metrics.values[idx+LabelLen] = label.Value
}
switch {
case conf.ReadMode && conf.WriteMode:
metrics.mode = O_RW
case conf.ReadMode:
metrics.mode = O_R
case conf.WriteMode:
metrics.mode = O_W
}
return metrics
}
func (metrics *Metrics) Set(result bool, elapsed float64) {
func (metrics *Metrics) Set(result_r, result_w Result) {
var res float64
if result {
switch metrics.mode {
case O_RW:
if result_r.result && result_w.result {
res = 1
}
case O_R:
if result_r.result {
res = 1
}
case O_W:
if result_w.result {
res = 1
}
}
metrics.run.WithLabelValues(metrics.values...).Set(res)
metrics.latency.WithLabelValues(metrics.values...).Set(elapsed)
metrics.latency_r.WithLabelValues(metrics.values...).Set(result_r.elapsed)
metrics.latency_w.WithLabelValues(metrics.values...).Set(result_w.elapsed)
}

View File

@@ -7,11 +7,13 @@ import (
"net/http"
"os"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Main program. starts 2 goroutines: our exporter and the http server
// for the prometheus metrics. The exporter reports measurement
// results to prometheus metrics directly
func Run() {
conf, err := InitConfig(os.Stdout)
if err != nil {
@@ -23,36 +25,42 @@ func Run() {
os.Exit(0)
}
metrics := NewMetrics(conf)
alloc := NewAlloc()
setLogger(os.Stdout, conf.Debug)
go func() {
for {
start := time.Now()
metrics := NewMetrics(conf)
alloc := NewAlloc()
exporter := NewExporter(conf, alloc, metrics)
result := runExporter(conf.File, alloc, time.Duration(conf.Timeout)*time.Second)
// ns => s
now := time.Now()
elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000
slog.Debug("elapsed time", "elapsed", elapsed, "result", result)
metrics.Set(result, elapsed)
time.Sleep(time.Duration(conf.Sleeptime) * time.Second)
}
}()
wg := exporter.RunIOchecks()
http.Handle("/metrics", promhttp.HandlerFor(
metrics.registry,
promhttp.HandlerOpts{},
))
slog.Info("start testing and serving metrics on localhost", "port", conf.Port)
slog.Info("test setup", "file", conf.File, "labels", strings.Join(conf.Label, ","))
slog.Info(" ╭──")
slog.Info(" │ io-exporter starting up", "version", Version)
slog.Info(" │ serving metrics", "host", "localhost", "port", conf.Port)
slog.Info(" │ test setup", "file", conf.File, "labels", strings.Join(conf.Label, ","))
slog.Info(" │ measuring", "read", conf.ReadMode, "write", conf.WriteMode, "timeout(s)", conf.Timeout)
slog.Info(" │ debugging", "enabled", conf.Debug)
slog.Info(" ╰──")
if err := http.ListenAndServe(fmt.Sprintf(":%d", conf.Port), nil); err != nil {
log.Fatal(err)
}
wg.Wait()
}
func report(err error, fd *os.File) bool {
slog.Debug("failed to check io", "error", err)
if fd != nil {
if err := fd.Close(); err != nil {
slog.Debug("failed to close filehandle", "error", err)
}
}
return false
}