From f1a07a1f63549daef5fec524736799876a6de436 Mon Sep 17 00:00:00 2001 From: Thomas von Dein Date: Tue, 21 Oct 2025 09:41:18 +0200 Subject: [PATCH] refactored for easier mainenance, add Alloc and Metrics classes --- cmd/alloc.go | 27 +++++ cmd/config.go | 98 ++++++++++++++++ cmd/exporter.go | 115 +++++++++++++++++++ cmd/log.go | 23 ++++ cmd/metrics.go | 75 ++++++++++++ cmd/root.go | 50 ++++++++ go.mod | 1 + go.sum | 2 + main.go | 299 +----------------------------------------------- 9 files changed, 393 insertions(+), 297 deletions(-) create mode 100644 cmd/alloc.go create mode 100644 cmd/config.go create mode 100644 cmd/exporter.go create mode 100644 cmd/log.go create mode 100644 cmd/metrics.go create mode 100644 cmd/root.go diff --git a/cmd/alloc.go b/cmd/alloc.go new file mode 100644 index 0000000..ddf5a10 --- /dev/null +++ b/cmd/alloc.go @@ -0,0 +1,27 @@ +package cmd + +import "github.com/ncw/directio" + +// aligned allocs used for testing +type Alloc struct { + writeBlock []byte + readBlock []byte +} + +// zero the memory blocks +func (alloc *Alloc) Clean() { + for i := range alloc.writeBlock { + alloc.writeBlock[i] = 0 + } + + for i := range alloc.readBlock { + alloc.readBlock[i] = 0 + } +} + +func NewAlloc() *Alloc { + return &Alloc{ + writeBlock: directio.AlignedBlock(directio.BlockSize), + readBlock: directio.AlignedBlock(directio.BlockSize), + } +} diff --git a/cmd/config.go b/cmd/config.go new file mode 100644 index 0000000..c2f9cfe --- /dev/null +++ b/cmd/config.go @@ -0,0 +1,98 @@ +package cmd + +import ( + "errors" + "fmt" + "io" + "log" + "os" + "strings" + + flag "github.com/spf13/pflag" + + "github.com/knadh/koanf/providers/posflag" + koanf "github.com/knadh/koanf/v2" +) + +const ( + Version = `v0.0.1` + SLEEP = 5 + Usage = `io-exporter [options] +Options: +-t --timeout When should the operation timeout in seconds +-l --label Add label to exported metric +-h --help Show help +-v --version Show program version` +) + +// config via commandline flags +type Config struct { + Showversion bool `koanf:"version"` // -v + Showhelp bool `koanf:"help"` // -h + Label []string `koanf:"label"` // -v + Timeout int `koanf:"timeout"` // -t + Port int `koanf:"port"` // -p + + File string + Labels []Label +} + +func InitConfig(output io.Writer) (*Config, error) { + var kloader = koanf.New(".") + + // setup custom usage + flagset := flag.NewFlagSet("config", flag.ContinueOnError) + flagset.Usage = func() { + _, err := fmt.Fprintln(output, Usage) + if err != nil { + log.Fatalf("failed to print to output: %s", err) + } + } + + // parse commandline flags + flagset.BoolP("version", "v", false, "show program version") + flagset.BoolP("help", "h", false, "show help") + flagset.StringArrayP("label", "l", nil, "additional labels") + flagset.IntP("timeout", "t", 1, "timeout for file operation in seconds") + flagset.IntP("port", "p", 9187, "prometheus metrics port to listen to") + + if err := flagset.Parse(os.Args[1:]); err != nil { + return nil, fmt.Errorf("failed to parse program arguments: %w", err) + } + + // command line setup + if err := kloader.Load(posflag.Provider(flagset, ".", kloader), nil); err != nil { + return nil, fmt.Errorf("error loading flags: %w", err) + } + + // fetch values + conf := &Config{} + if err := kloader.Unmarshal("", &conf); err != nil { + return nil, fmt.Errorf("error unmarshalling: %w", err) + } + + // arg is the file under test + if len(flagset.Args()) > 0 { + conf.File = flagset.Args()[0] + } else { + if !conf.Showversion { + flagset.Usage() + os.Exit(1) + } + } + + for _, label := range conf.Label { + if len(label) == 0 { + continue + } + + parts := strings.Split(label, "=") + if len(parts) != 2 { + return nil, errors.New("invalid label spec: " + label + ", expected label=value") + } + + conf.Labels = append(conf.Labels, Label{Name: parts[0], Value: parts[1]}) + } + + return conf, nil +} diff --git a/cmd/exporter.go b/cmd/exporter.go new file mode 100644 index 0000000..ed9d149 --- /dev/null +++ b/cmd/exporter.go @@ -0,0 +1,115 @@ +package cmd + +import ( + "bytes" + "context" + "errors" + "io" + "log/slog" + "os" + "time" + + "github.com/ncw/directio" +) + +func die(err error, fd *os.File) bool { + slog.Debug("failed to check io", "error", err) + + if fd != nil { + if err := fd.Close(); err != nil { + slog.Debug("failed to close filehandle", "error", err) + } + } + + return false +} + +// Calls runcheck() with timeout +func runExporter(file string, alloc *Alloc, timeout time.Duration) bool { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + run := make(chan struct{}, 1) + var res bool + + go func() { + res = runcheck(file, alloc) + run <- struct{}{} + }() + + for { + select { + case <-ctx.Done(): + return die(ctx.Err(), nil) + case <-run: + return res + } + } +} + +// Checks file io on the specified path: +// +// - open the file (create if it doesnt exist) +// - truncate it if it already exists +// - write some data to it +// - closes the file +// - re-opens it for reading +// - reads the block +// - compares if written block is equal to read block +// - closes file again +// +// Returns false if anything failed during that sequence, +// true otherwise. +func runcheck(file string, alloc *Alloc) bool { + alloc.Clean() + + // write + fd, err := directio.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640) + if err != nil { + die(err, nil) + } + + for i := 0; i < len(alloc.writeBlock); i++ { + alloc.writeBlock[i] = 'A' + } + + n, err := fd.Write(alloc.writeBlock) + if err != nil { + return die(err, fd) + } + + if n != len(alloc.writeBlock) { + return die(errors.New("failed to write block"), fd) + } + + if err := fd.Close(); err != nil { + return die(err, nil) + } + + // read + in, err := directio.OpenFile(file, os.O_RDONLY, 0640) + if err != nil { + die(err, nil) + } + + n, err = io.ReadFull(in, alloc.readBlock) + if err != nil { + return die(err, in) + } + + if n != len(alloc.writeBlock) { + return die(errors.New("failed to read block"), fd) + } + + if err := in.Close(); err != nil { + return die(err, nil) + } + + // compare + if !bytes.Equal(alloc.writeBlock, alloc.readBlock) { + return die(errors.New("Read not the same as written"), nil) + } + + return true +} diff --git a/cmd/log.go b/cmd/log.go new file mode 100644 index 0000000..fbe3f97 --- /dev/null +++ b/cmd/log.go @@ -0,0 +1,23 @@ +package cmd + +import ( + "io" + "log/slog" + + "github.com/lmittmann/tint" +) + +func setLogger(output io.Writer) { + logLevel := &slog.LevelVar{} + opts := &tint.Options{ + Level: logLevel, + AddSource: false, + } + + logLevel.Set(slog.LevelDebug) + + handler := tint.NewHandler(output, opts) + logger := slog.New(handler) + + slog.SetDefault(logger) +} diff --git a/cmd/metrics.go b/cmd/metrics.go new file mode 100644 index 0000000..48e15d3 --- /dev/null +++ b/cmd/metrics.go @@ -0,0 +1,75 @@ +package cmd + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +// custom labels +type Label struct { + Name, Value string +} + +// simple prometheus wrapper +type Metrics struct { + run *prometheus.GaugeVec + latency *prometheus.GaugeVec + registry *prometheus.Registry + values []string +} + +func NewMetrics(conf *Config) *Metrics { + labels := []string{"file", "maxwait"} + LabelLen := 2 + + for _, label := range conf.Labels { + labels = append(labels, label.Name) + } + + metrics := &Metrics{ + run: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "io_exporter_io_operation", + Help: "whether io is working on the pvc, 1=ok, 0=fail", + }, + labels, + ), + latency: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "io_exporter_io_latency", + Help: "how long does the operation take in seconds", + }, + labels, + ), + + // use fixed size slice to avoid repeated allocs + values: make([]string, LabelLen+len(conf.Labels)), + + registry: prometheus.NewRegistry(), + } + + metrics.registry.MustRegister(metrics.run, metrics.latency) + + // static labels + metrics.values[0] = conf.File + metrics.values[1] = fmt.Sprintf("%d", conf.Timeout) + + // custom labels via -l label=value + for idx, label := range conf.Labels { + metrics.values[idx+LabelLen] = label.Value + } + + return metrics +} + +func (metrics *Metrics) Set(result bool, elapsed float64) { + var res float64 + + if result { + res = 1 + } + + metrics.run.WithLabelValues(metrics.values...).Set(res) + metrics.latency.WithLabelValues(metrics.values...).Set(elapsed) +} diff --git a/cmd/root.go b/cmd/root.go new file mode 100644 index 0000000..e321d13 --- /dev/null +++ b/cmd/root.go @@ -0,0 +1,50 @@ +package cmd + +import ( + "fmt" + "log" + "log/slog" + "net/http" + "os" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func Run() { + conf, err := InitConfig(os.Stdout) + if err != nil { + log.Fatal(err) + } + + metrics := NewMetrics(conf) + alloc := NewAlloc() + + setLogger(os.Stdout) + + go func() { + for { + start := time.Now() + + result := runExporter(conf.File, alloc, time.Duration(conf.Timeout)*time.Second) + + // ns => s + now := time.Now() + elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000 + + metrics.Set(result, elapsed) + + time.Sleep(SLEEP * time.Second) + } + }() + + http.Handle("/metrics", promhttp.HandlerFor( + metrics.registry, + promhttp.HandlerOpts{}, + )) + + slog.Info("start testing and serving metrics on localhost", "port", conf.Port) + slog.Info("test setup", "file", conf.File, "labels", strings.Join(conf.Label, ",")) + http.ListenAndServe(fmt.Sprintf(":%d", conf.Port), nil) +} diff --git a/go.mod b/go.mod index e014db9..e0b7e5e 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( ) require ( + github.com/alecthomas/repr v0.5.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 3a3e5e6..a33d577 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/alecthomas/repr v0.5.2 h1:SU73FTI9D1P5UNtvseffFSGmdNci/O6RsqzeXJtP0Qs= +github.com/alecthomas/repr v0.5.2/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/main.go b/main.go index 6c3909a..89ee6ff 100644 --- a/main.go +++ b/main.go @@ -1,302 +1,7 @@ package main -import ( - "bytes" - "context" - "errors" - "fmt" - "io" - "log" - "log/slog" - "net/http" - "os" - "strings" - "time" - - "github.com/lmittmann/tint" - flag "github.com/spf13/pflag" - - "github.com/knadh/koanf/providers/posflag" - koanf "github.com/knadh/koanf/v2" - "github.com/ncw/directio" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -const ( - Version = `v0.0.1` - SLEEP = 5 - Usage = `io-exporter [options] -Options: --t --timeout When should the operation timeout in seconds --l --label Add label to exported metric --h --help Show help --v --version Show program version` -) - -var ( - labels = []string{"file", "maxwait"} -) - -type Label struct { - Name, Value string -} - -type Config struct { - Showversion bool `koanf:"version"` // -v - Showhelp bool `koanf:"help"` // -h - Label []string `koanf:"label"` // -v - Timeout int `koanf:"timeout"` // -t - Port int `koanf:"port"` // -p - - File string - Labels []Label -} - -func InitConfig(output io.Writer) (*Config, error) { - var kloader = koanf.New(".") - - // setup custom usage - flagset := flag.NewFlagSet("config", flag.ContinueOnError) - flagset.Usage = func() { - _, err := fmt.Fprintln(output, Usage) - if err != nil { - log.Fatalf("failed to print to output: %s", err) - } - } - - // parse commandline flags - flagset.BoolP("version", "v", false, "show program version") - flagset.BoolP("help", "h", false, "show help") - flagset.StringArrayP("label", "l", nil, "additional labels") - flagset.IntP("timeout", "t", 1, "timeout for file operation in seconds") - flagset.IntP("port", "p", 9187, "prometheus metrics port to listen to") - - if err := flagset.Parse(os.Args[1:]); err != nil { - return nil, fmt.Errorf("failed to parse program arguments: %w", err) - } - - // command line setup - if err := kloader.Load(posflag.Provider(flagset, ".", kloader), nil); err != nil { - return nil, fmt.Errorf("error loading flags: %w", err) - } - - // fetch values - conf := &Config{} - if err := kloader.Unmarshal("", &conf); err != nil { - return nil, fmt.Errorf("error unmarshalling: %w", err) - } - - // arg is the file under test - if len(flagset.Args()) > 0 { - conf.File = flagset.Args()[0] - } else { - if !conf.Showversion { - flagset.Usage() - os.Exit(1) - } - } - - for _, label := range conf.Label { - if len(label) == 0 { - continue - } - - parts := strings.Split(label, "=") - if len(parts) != 2 { - return nil, errors.New("invalid label spec: " + label + ", expected label=value") - } - - conf.Labels = append(conf.Labels, Label{Name: parts[0], Value: parts[1]}) - } - - return conf, nil -} - -func die(err error, fd *os.File) bool { - slog.Debug("failed to check io", "error", err) - - if fd != nil { - if err := fd.Close(); err != nil { - slog.Debug("failed to close filehandle", "error", err) - } - } - - return false -} - -func setLogger(output io.Writer) { - logLevel := &slog.LevelVar{} - opts := &tint.Options{ - Level: logLevel, - AddSource: false, - } - - logLevel.Set(slog.LevelDebug) - - handler := tint.NewHandler(output, opts) - logger := slog.New(handler) - - slog.SetDefault(logger) -} +import "github.com/tlinden/io-exporter/cmd" func main() { - conf, err := InitConfig(os.Stdout) - if err != nil { - log.Fatal(err) - } - - promRegistry := prometheus.NewRegistry() - - for _, label := range conf.Labels { - labels = append(labels, label.Name) - } - - ioexporterRun := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "io_exporter_io_operation", - Help: "whether io is working on the pvc, 1=ok, 0=fail", - }, - labels, - ) - - ioexporterLatency := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "io_exporter_io_latency", - Help: "how long does the operation take in seconds", - }, - labels, - ) - - promRegistry.MustRegister(ioexporterRun, ioexporterLatency) - - timeoutstr := fmt.Sprintf("%d", conf.Timeout) - - setLogger(os.Stdout) - - go func() { - for { - var res float64 - start := time.Now() - - if check(conf.File, time.Duration(conf.Timeout)*time.Second) { - res = 1 - } else { - res = 0 - } - - // ns => s - now := time.Now() - elapsed := float64(now.Sub(start).Nanoseconds()) / 10000000000 - - values := []string{conf.File, timeoutstr} - for _, label := range conf.Labels { - values = append(values, label.Value) - } - - ioexporterRun.WithLabelValues(values...).Set(res) - ioexporterLatency.WithLabelValues(values...).Set(elapsed) - time.Sleep(SLEEP * time.Second) - } - }() - - http.Handle("/metrics", promhttp.HandlerFor( - promRegistry, - promhttp.HandlerOpts{}, - )) - - slog.Info("start testing and serving metrics on localhost", "port", conf.Port) - slog.Info("test setup", "file", conf.File, "labels", strings.Join(conf.Label, ",")) - http.ListenAndServe(fmt.Sprintf(":%d", conf.Port), nil) -} - -// Calls runcheck() with timeout -func check(file string, timeout time.Duration) bool { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - run := make(chan struct{}, 1) - var res bool - - go func() { - res = runcheck(file) - run <- struct{}{} - }() - - for { - select { - case <-ctx.Done(): - return die(ctx.Err(), nil) - case <-run: - return res - } - } -} - -// Checks file io on the specified path: -// -// - open the file (create if it doesnt exist) -// - truncate it if it already exists -// - write some data to it -// - closes the file -// - re-opens it for reading -// - reads the block -// - compares if written block is equal to read block -// - closes file again -// -// Returns false if anything failed during that sequence, -// true otherwise. -func runcheck(file string) bool { - // write - fd, err := directio.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0640) - if err != nil { - die(err, nil) - } - - block1 := directio.AlignedBlock(directio.BlockSize) - for i := 0; i < len(block1); i++ { - block1[i] = 'A' - } - - n, err := fd.Write(block1) - if err != nil { - return die(err, fd) - } - - if n != len(block1) { - return die(errors.New("failed to write block"), fd) - } - - if err := fd.Close(); err != nil { - return die(err, nil) - } - - // read - in, err := directio.OpenFile(file, os.O_RDONLY, 0640) - if err != nil { - die(err, nil) - } - - block2 := directio.AlignedBlock(directio.BlockSize) - - n, err = io.ReadFull(in, block2) - if err != nil { - return die(err, in) - } - - if n != len(block1) { - return die(errors.New("failed to read block"), fd) - } - - if err := in.Close(); err != nil { - return die(err, nil) - } - - // compare - if !bytes.Equal(block1, block2) { - return die(errors.New("Read not the same as written"), nil) - } - - return true + cmd.Run() }