diff --git a/cmd/basic/main.go b/cmd/basic/main.go index ebacd52..d1566bb 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -9,7 +9,8 @@ import ( "time" "github.com/dgryski/go-pcgr" - mct "github.com/memcached/mctester" + mct "github.com/memcached/mctester/internal" + "github.com/memcached/mctester/pkg/client" ) var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file") @@ -163,7 +164,7 @@ func (l *BasicLoader) Timer(tag string, start time.Time) { func (l *BasicLoader) Worker(doneChan chan<- int) { // FIXME: selector. host := l.servers[0] - mc := mct.NewClient(host, l.socket, l.pipelines, l.keyPrefix, l.stripKeyPrefix) + mc := client.NewClient(host, l.socket, l.pipelines, l.keyPrefix, l.stripKeyPrefix) bundles := l.requestBundlesPerConn rs := pcgr.New(time.Now().UnixNano(), 0) @@ -224,7 +225,7 @@ func (l *BasicLoader) Worker(doneChan chan<- int) { return } // set missing values - if code == mct.McMISS { + if code == client.McMISS { // TODO: random sizing value := mct.RandBytes(&rs, int(l.valueSize)) start := time.Now() diff --git a/cmd/ratectrl/main.go b/cmd/ratectrl/main.go new file mode 100644 index 0000000..f7e3a65 --- /dev/null +++ b/cmd/ratectrl/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "flag" + "fmt" + "time" + + "github.com/memcached/mctester/pkg/ratectrl" +) + +func main() { + fmt.Println("starting") + + clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss") + connCount := flag.Int("conncount", 1, "number of client connections to establish") + duration := flag.Duration("duration", 0, "length of time that the test will run (0 for unlimited)") + keyLength := flag.Int("keylength", 10, "number of random characters to append to key") + keyPrefix := flag.String("keyprefix", "mctester:", "prefix to append to all generated keys") + keySpace := flag.Int("keyspace", 1000, "number of unique keys to generate") + metaDelFlags := flag.String("mdflags", "", "flags sent alongside 'Meta Delete' commands") + useMeta := flag.Bool("meta", false, "if true, communicate with Memcached using meta commands") + metaGetFlags := flag.String("mgflags", "f v", "flags sent alongside 'Meta Get' commands") + metaSetFlags := flag.String("msflags", "", "flags sent alongside 'Meta Set' commands") + pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) number of GET requests to stack within the same syscall") + delRatio := flag.Int("ratiodel", 0, "proportion of requests that should be sent as 'deletes'") + getRatio := flag.Int("ratioget", 90, "proportion of requests that should be sent as 'gets'") + setRatio := flag.Int("ratioset", 10, "proportion of requests that should be sent as 'sets'") + rngSeed := flag.Int64("rngseed", time.Now().UnixNano(), "seed value used when initializing RNG") + rps := flag.Int("rps", 0, "target number of requests per second (0 for unlimited)") + server := flag.String("server", "127.0.0.1:11211", "`ip:port` for Memcached instance under test") + socket := flag.String("socket", "", "domain socket used for connections") + stripKeyPrefix := flag.Bool("stripkeyprefix", false, "remove key prefix before comparing with response") + keyTTL := flag.Uint("ttl", 180, "TTL to set with new items") + validateGets := flag.Bool("validate", false, "compare the value returned from a 'get' to what was initially 'set'") + valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") + warmPercent := flag.Int("warm", 90, "percent of keys to 'set' in Memcached before testing begins") + useZipf := flag.Bool("zipf", false, "use Zipf instead of uniform randomness (slow)") + zipfS := flag.Float64("zipfS", 1.01, "zipf S value (general pull toward zero) must be > 1.0") + zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number") + + flag.Parse() + + testConfig := &ratectrl.Config{ + ClientFlags: *clientFlags, + ConnCount: *connCount, + DelRatio: *delRatio, + Duration: *duration, + GetRatio: *getRatio, + KeyLength: *keyLength, + KeyPrefix: *keyPrefix, + KeySpace: *keySpace, + KeyTTL: *keyTTL, + MetaDelFlags: *metaDelFlags, + MetaGetFlags: *metaGetFlags, + MetaSetFlags: *metaSetFlags, + Pipelines: *pipelines, + RngSeed: *rngSeed, + RPS: *rps, + Servers: []string{*server}, + SetRatio: *setRatio, + Socket: *socket, + StripKeyPrefix: *stripKeyPrefix, + UseMeta: *useMeta, + UseZipf: *useZipf, + ValidateGets: *validateGets, + ValueSize: *valueSize, + WarmPercent: *warmPercent, + ZipfS: *zipfS, + ZipfV: *zipfV, + } + + testConfig.Run() +} diff --git a/cmd/server/loader_basic.go b/cmd/server/loader_basic.go index e923401..a80e68d 100644 --- a/cmd/server/loader_basic.go +++ b/cmd/server/loader_basic.go @@ -6,7 +6,8 @@ import ( "time" "github.com/dgryski/go-pcgr" - mct "github.com/memcached/mctester" + mct "github.com/memcached/mctester/internal" + "github.com/memcached/mctester/pkg/client" ) // Basic persistent load test, using text protocol: @@ -123,7 +124,7 @@ func runBasicLoader(Update <-chan interface{}, worker interface{}) { func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l *BasicLoader) { // TODO: server selector. host := l.Servers[0] - mc := mct.NewClient(host, l.Socket, l.Pipelines, l.KeyPrefix, l.StripKeyPrefix) + mc := client.NewClient(host, l.Socket, l.Pipelines, l.KeyPrefix, l.StripKeyPrefix) bundles := l.RequestBundlesPerConn rs := pcgr.New(time.Now().UnixNano(), 0) @@ -179,7 +180,7 @@ func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l return } // set missing values - if code == mct.McMISS { + if code == client.McMISS { // TODO: random sizing value := mct.RandBytes(&rs, int(l.ValueSize)) mc.Set(key, uint32(l.ClientFlags), uint32(l.KeyTTL), value) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7c8b03a --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.com/memcached/mctester + +go 1.18 + +require ( + github.com/jamiealquiza/tachymeter v2.0.0+incompatible + go.uber.org/ratelimit v0.2.0 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f +) + +require ( + github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect + github.com/dgryski/go-pcgr v0.0.0-20211101192959-4b34ab9ccb8c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6e9ae11 --- /dev/null +++ b/go.sum @@ -0,0 +1,24 @@ +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-pcgr v0.0.0-20211101192959-4b34ab9ccb8c h1:f26exq4Heidug80gN340GXU86doiFKGUYFluBO+mbDk= +github.com/dgryski/go-pcgr v0.0.0-20211101192959-4b34ab9ccb8c/go.mod h1:ztV/u9hqJRBCT0P03v0Ueol7unBefCKL+paOoIZkR88= +github.com/jamiealquiza/tachymeter v2.0.0+incompatible h1:mGiF1DGo8l6vnGT8FXNNcIXht/YmjzfraiUprXYwJ6g= +github.com/jamiealquiza/tachymeter v2.0.0+incompatible/go.mod h1:Ayf6zPZKEnLsc3winWEXJRkTBhdHo58HODAu1oFJkYU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA= +go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/support.go b/internal/support.go similarity index 99% rename from support.go rename to internal/support.go index 2c9ae65..d2263c4 100644 --- a/support.go +++ b/internal/support.go @@ -1,4 +1,4 @@ -package mctester +package internal // "github.com/cespare/xxhash" import ( diff --git a/protocol.go b/pkg/client/protocol.go similarity index 99% rename from protocol.go rename to pkg/client/protocol.go index 7ed0ac0..f1fe932 100644 --- a/protocol.go +++ b/pkg/client/protocol.go @@ -6,7 +6,7 @@ // https://github.com/stathat/consistent MIT licensed. // maybe just an example that uses it separately? -package mctester +package client import ( "bufio" @@ -112,6 +112,7 @@ const ( McOK McEN McME + McHD McNS McEX McNF @@ -153,11 +154,10 @@ func (c *Client) ParseMetaResponse() (rflags []byte, value []byte, code McCode, rflags = line[4+offset : len(line)-2] // Have some value data to read. + 2 bytes for \r\n value = make([]byte, size+2) - read, err := io.ReadFull(c.cn.b, value) + _, err := io.ReadFull(c.cn.b, value) if err != nil { return nil, nil, 0, err } - fmt.Printf("res size: %d read: %d\n", size, read) // check for \r\n, cut extra bytes off. if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) { return nil, nil, 0, ErrCorruptValue @@ -177,6 +177,9 @@ func (c *Client) ParseMetaResponse() (rflags []byte, value []byte, code McCode, // Meta Debug command value = line[3 : len(line)-2] code = McME + case "HD": + // Meta STORED/DELETED + code = McHD case "NS": // Meta NOT_STORED rflags = line[3 : len(line)-2] @@ -261,6 +264,8 @@ func (c *Client) MetaSet(key string, flags string, value []byte) (err error) { b.WriteString("ms ") b.WriteString(key) b.WriteString(" ") + b.WriteString(strconv.FormatUint(uint64(len(value)), 10)) + b.WriteString(" ") b.WriteString(flags) b.WriteString("\r\n") // For large sets this ends up flushing twice. diff --git a/protocol_test.go b/pkg/client/protocol_test.go similarity index 97% rename from protocol_test.go rename to pkg/client/protocol_test.go index 357558c..9b0b3d4 100644 --- a/protocol_test.go +++ b/pkg/client/protocol_test.go @@ -1,4 +1,4 @@ -package mctester +package client import ( "bytes" @@ -28,12 +28,12 @@ func newcli() *Client { func TestMeta(t *testing.T) { mc := newcli() { - err := mc.MetaSet("doob", "S4 T300", []byte("foop")) + err := mc.MetaSet("doob", "T300", []byte("foop")) if err != nil { t.Fatalf("metaset error: %v", err) } _, _, c, err := mc.MetaReceive() - if c != McOK { + if c != McHD { t.Fatalf("metaset not stored: %d", c) } } diff --git a/pkg/ratectrl/config.go b/pkg/ratectrl/config.go new file mode 100644 index 0000000..b837490 --- /dev/null +++ b/pkg/ratectrl/config.go @@ -0,0 +1,314 @@ +package ratectrl + +import ( + "bytes" + "context" + "fmt" + "math/big" + "math/rand" + "time" + + "github.com/dgryski/go-pcgr" + "github.com/jamiealquiza/tachymeter" + mct "github.com/memcached/mctester/internal" + "github.com/memcached/mctester/pkg/client" + "go.uber.org/ratelimit" + "golang.org/x/sync/errgroup" +) + +type Config struct { + ClientFlags uint + ConnCount int + DelRatio int + Duration time.Duration + GetRatio int + KeyLength int + KeyPrefix string + KeySpace int + KeyTTL uint + MetaDelFlags string + MetaGetFlags string + MetaSetFlags string + Pipelines uint + RngSeed int64 + RPS int + Servers []string + SetRatio int + Socket string + StripKeyPrefix bool + UseMeta bool + UseZipf bool + ValidateGets bool + ValueSize uint + WarmPercent int + ZipfS float64 // (> 1, generally 1.01-2) pulls the power curve toward 0) + ZipfV float64 // v (< keySpace) puts the main part of the curve before this number + + cacheEntries []CacheEntry + rateLimiter ratelimit.Limiter + tachymeter *tachymeter.Tachymeter +} + +type CacheEntry struct { + key string + value []byte +} + +func (conf *Config) GenerateEntries() (entries []CacheEntry) { + entries = make([]CacheEntry, conf.KeySpace) + subRS := pcgr.New(1, 0) + + for i := 0; i < conf.KeySpace; i++ { + subRS.Seed(conf.RngSeed + int64(i)) + key := mct.RandString(&subRS, conf.KeyLength, conf.KeyPrefix) + + valSeed := new(big.Int).SetBytes([]byte(key)).Int64() + subRS.Seed(valSeed) + value := mct.RandBytes(&subRS, int(conf.ValueSize)) + + entries[i] = CacheEntry{key, value} + } + + return +} + +func (conf *Config) Run() (err error) { + g, _ := errgroup.WithContext(context.Background()) + + samples := conf.RPS * conf.ConnCount + if samples < 1000 { + samples = 1000 + } + + conf.cacheEntries = conf.GenerateEntries() + + if conf.WarmPercent > 0 { + err = conf.WarmCache() + if err != nil { + return + } + } + + if conf.RPS > 0 { + conf.rateLimiter = ratelimit.New(conf.RPS) + } else { + conf.rateLimiter = ratelimit.NewUnlimited() + } + + threadStats := make(chan Stats, conf.ConnCount) + conf.tachymeter = tachymeter.New(&tachymeter.Config{Size: samples}) + startTime := time.Now() + + for worker := 0; worker < conf.ConnCount; worker++ { + index := worker + g.Go(func() error { + return conf.Worker(index, threadStats) + }) + } + + err = g.Wait() + endTime := time.Now() + if err != nil { + return + } + + conf.tachymeter.SetWallTime(time.Since(startTime)) + close(threadStats) + testStats := &Stats{} + for stats := range threadStats { + testStats.Add(&stats) + } + if !conf.ValidateGets { + testStats.KeyCollisions = -1 + } + + report := &Report{ + StartTime: startTime, + EndTime: endTime, + Config: conf, + Metrics: conf.tachymeter.Calc(), + Stats: testStats, + } + err = report.PrettyPrint() + + return +} + +func (conf *Config) WarmCache() error { + mc := client.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + rs := pcgr.New(conf.RngSeed, 0) + randR := rand.New(&rs) + + for keyIndex := 0; keyIndex < conf.KeySpace; keyIndex++ { + if randR.Intn(100) < conf.WarmPercent { + entry := conf.cacheEntries[keyIndex] + key := entry.key + value := entry.value + + if conf.UseMeta { + err := mc.MetaSet(key, conf.MetaSetFlags, value) + if err != nil { + fmt.Printf("metaset error: %v\n", err) + return err + } + + _, _, c, err := mc.MetaReceive() + if c != client.McHD { + fmt.Printf("metaset not stored: %d\n", c) + } + if err != nil { + fmt.Printf("metaset receive error: %v\n", err) + return err + } + } else { + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + } + } + } + + return nil +} + +func (conf *Config) Worker(index int, results chan Stats) error { + mc := client.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + stats := Stats{} + rl := conf.rateLimiter + + workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) + rs := pcgr.New(workerSeed, 0) + randR := rand.New(&rs) + + var zipRS *rand.Zipf + if conf.UseZipf { + zipRS = rand.NewZipf(randR, conf.ZipfS, conf.ZipfV, uint64(conf.KeySpace)) + if zipRS == nil { + fmt.Printf("bad arguments to zipf: S: %f V: %f\n", conf.ZipfS, conf.ZipfV) + return nil + } + } + + for start := time.Now(); ; { + iterStart := time.Now() + if iterStart.Sub(start) > conf.Duration { + break + } + + var index int + if conf.UseZipf { + index = int(zipRS.Uint64()) + } else { + index = randR.Intn(conf.KeySpace) + } + + entry := conf.cacheEntries[index] + key := entry.key + + rl.Take() + switch rng := randR.Intn(conf.DelRatio + conf.SetRatio + conf.GetRatio); { + case rng < conf.DelRatio: + var code client.McCode + var err error + + if conf.UseMeta { + err = mc.MetaDelete(key, conf.MetaDelFlags) + if err != nil { + fmt.Printf("metadelete error: %v\n", err) + return err + } + + _, _, code, err = mc.MetaReceive() + if code != client.McHD && code != client.McNF { + fmt.Printf("metadelete not successful: %d\n", code) + } + if err != nil { + fmt.Printf("metadelete receive error: %v\n", err) + return err + } + } else { + code, err = mc.Delete(key) + if err != nil { + fmt.Println(err) + return err + } + } + + switch code { + case client.McDELETED, client.McHD: + stats.DeleteHits++ + case client.McNOT_FOUND, client.McNF: + stats.DeleteMisses++ + } + case rng < (conf.DelRatio + conf.SetRatio): + value := entry.value + + if conf.UseMeta { + err := mc.MetaSet(key, conf.MetaSetFlags, value) + if err != nil { + fmt.Printf("metaset error: %v\n", err) + return err + } + + _, _, code, err := mc.MetaReceive() + if code != client.McHD { + fmt.Printf("metaset not stored: %d\n", code) + } + if err != nil { + fmt.Printf("metaset receive error: %v\n", err) + return err + } + } else { + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + } + + stats.SetsTotal++ + default: + var code client.McCode + var value []byte + if conf.UseMeta { + err := mc.MetaGet(key, conf.MetaGetFlags) + if err != nil { + fmt.Printf("metaget error: %v\n", err) + return err + } + _, value, code, err = mc.MetaReceive() + if err != nil { + fmt.Printf("metaget receive error: %v\n", err) + return err + } + } else { + var err error + _, value, code, err = mc.Get(key) + if err != nil { + fmt.Println(err, value) + return err + } + } + + switch code { + case client.McHIT, client.McVA: + stats.GetHits++ + + expectedValue := entry.value + if conf.ValidateGets && !bytes.Equal(value, expectedValue) { + stats.KeyCollisions++ + fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) + } + case client.McMISS, client.McEN: + stats.GetMisses++ + } + } + + conf.tachymeter.AddTime(time.Since(iterStart)) + } + + results <- stats + return nil +} diff --git a/pkg/ratectrl/report.go b/pkg/ratectrl/report.go new file mode 100644 index 0000000..47c6e78 --- /dev/null +++ b/pkg/ratectrl/report.go @@ -0,0 +1,43 @@ +package ratectrl + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/jamiealquiza/tachymeter" +) + +type Report struct { + StartTime time.Time + EndTime time.Time + Config *Config + Metrics *tachymeter.Metrics + Stats *Stats +} + +func (report *Report) PrettyPrint() (err error) { + jsonReport, err := json.MarshalIndent(report, "", "\t") + if err == nil { + fmt.Println(string(jsonReport)) + } + return +} + +type Stats struct { + DeleteHits int + DeleteMisses int + GetHits int + GetMisses int + KeyCollisions int + SetsTotal int +} + +func (stats *Stats) Add(other *Stats) { + stats.DeleteHits += other.DeleteHits + stats.DeleteMisses += other.DeleteMisses + stats.GetHits += other.GetHits + stats.GetMisses += other.GetMisses + stats.KeyCollisions += other.KeyCollisions + stats.SetsTotal += other.SetsTotal +}