diff --git a/aws/s3/main.go b/aws/s3/main.go index 2dfa453..1880be7 100644 --- a/aws/s3/main.go +++ b/aws/s3/main.go @@ -3,6 +3,7 @@ package s3 import ( "log" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -59,7 +60,9 @@ func (m *Main) Run() error { mapper := pdk.NewCollapsingMapper() mapper.Framer = &m.Framer - indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize) + indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(int(m.BatchSize))) if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/http/command.go b/http/command.go index 4667ebc..02d15b1 100644 --- a/http/command.go +++ b/http/command.go @@ -2,7 +2,9 @@ package http import ( "log" + "time" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -14,25 +16,31 @@ type SubjecterOpts struct { // Main holds the config for the http command. type Main struct { - Bind string `help:"Listen for post requests on this address."` - PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."` - Index string `help:"Pilosa index to write to."` - BatchSize uint `help:"Batch size for Pilosa imports."` - Framer pdk.DashFrame - Subjecter SubjecterOpts - Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"` + Bind string `help:"Listen for post requests on this address."` + PilosaHosts []string `help:"List of host:port pairs for Pilosa cluster."` + Index string `help:"Pilosa index to write to."` + BatchSize uint `help:"Batch size for Pilosa imports. Default: 100000"` + ImportStrategy string `help:"Import strategy. One of 'batch' or 'timeout'. Default: batch"` + ThreadCount uint `help:"Number of import workers. Default: 1"` + ImportTimeout uint `help:"Timeout in milliseconds for the import strategy. 
Default: 200"` + Framer pdk.DashFrame + Subjecter SubjecterOpts + Proxy string `help:"Bind to this address to proxy and translate requests to Pilosa"` } // NewMain gets a new Main with default values. func NewMain() *Main { return &Main{ - Bind: ":12121", - PilosaHosts: []string{"localhost:10101"}, - Index: "jsonhttp", - BatchSize: 10, - Framer: pdk.DashFrame{}, - Subjecter: SubjecterOpts{}, - Proxy: ":13131", + Bind: ":12121", + PilosaHosts: []string{"localhost:10101"}, + Index: "jsonhttp", + BatchSize: 100000, + ImportStrategy: "batch", + ThreadCount: 1, + ImportTimeout: 200, + Framer: pdk.DashFrame{}, + Subjecter: SubjecterOpts{}, + Proxy: ":13131", } } @@ -79,7 +87,33 @@ func (m *Main) Run() error { mapper := pdk.NewCollapsingMapper() mapper.Framer = &m.Framer - indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize) + importOptions := []gopilosa.ImportOption{} + + if m.BatchSize < 1 { + return errors.New("Batch size should be greater than 0") + } + importOptions = append(importOptions, gopilosa.OptImportBatchSize(int(m.BatchSize))) + + if m.ImportTimeout < 1 { + return errors.New("Import timeout should be greater than 0") + } + importOptions = append(importOptions, gopilosa.OptImportTimeout(time.Duration(m.ImportTimeout)*time.Millisecond)) + + if m.ThreadCount < 1 { + return errors.New("Number of import workers should be greater than 0") + } + importOptions = append(importOptions, gopilosa.OptImportThreadCount(int(m.ThreadCount))) + + switch m.ImportStrategy { + case "batch": + importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.BatchImport)) + case "timeout": + importOptions = append(importOptions, gopilosa.OptImportStrategy(gopilosa.TimeoutImport)) + default: + return errors.New("Import strategy should be one of: batch, timeout") + } + + indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, importOptions...) 
if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/kafka/main.go b/kafka/main.go index d454ca1..439bc57 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -3,6 +3,7 @@ package kafka import ( "log" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -61,7 +62,9 @@ func (m *Main) Run() error { mapper := pdk.NewCollapsingMapper() mapper.Framer = &m.Framer - indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, m.BatchSize) + indexer, err := pdk.SetupPilosa(m.PilosaHosts, m.Index, []pdk.FrameSpec{}, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(int(m.BatchSize))) if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/pilosa.go b/pilosa.go index 4d73092..bc7f2d8 100644 --- a/pilosa.go +++ b/pilosa.go @@ -11,8 +11,8 @@ import ( ) type Index struct { - client *gopilosa.Client - batchSize uint + client *gopilosa.Client + importOptions []gopilosa.ImportOption lock sync.RWMutex index *gopilosa.Index @@ -186,7 +186,7 @@ func (i *Index) setupFrame(frame FrameSpec) error { i.importWG.Add(1) go func(fram *gopilosa.Frame, cbi chanBitIterator) { defer i.importWG.Done() - err := i.client.ImportFrame(fram, cbi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize))) + err := i.client.ImportFrame(fram, cbi, i.importOptions...) if err != nil { log.Println(errors.Wrapf(err, "starting frame import for %v", frame.Name)) } @@ -213,7 +213,7 @@ func (i *Index) setupFrame(frame FrameSpec) error { i.importWG.Add(1) go func(fram *gopilosa.Frame, field FieldSpec, cvi chanValIterator) { defer i.importWG.Done() - err := i.client.ImportValueFrame(fram, field.Name, cvi, gopilosa.OptImportStrategy(gopilosa.BatchImport), gopilosa.OptImportBatchSize(int(i.batchSize))) + err := i.client.ImportValueFrame(fram, field.Name, cvi, i.importOptions...) 
if err != nil { log.Println(errors.Wrapf(err, "starting field import for %v", field)) } @@ -231,9 +231,9 @@ func (i *Index) ensureField(frame *gopilosa.Frame, fieldSpec FieldSpec) error { } // SetupPilosa returns a new Indexer after creating the given frames and starting importers. -func SetupPilosa(hosts []string, index string, frames []FrameSpec, batchsize uint) (Indexer, error) { +func SetupPilosa(hosts []string, index string, frames []FrameSpec, importOptions ...gopilosa.ImportOption) (Indexer, error) { indexer := newIndex() - indexer.batchSize = batchsize + indexer.importOptions = importOptions client, err := gopilosa.NewClient(hosts, gopilosa.SocketTimeout(time.Minute*60), gopilosa.ConnectTimeout(time.Second*60)) diff --git a/pilosa_test.go b/pilosa_test.go index 7918620..ada5f9a 100644 --- a/pilosa_test.go +++ b/pilosa_test.go @@ -52,7 +52,9 @@ func TestSetupPilosa(t *testing.T) { }, } - indexer, err := pdk.SetupPilosa(hosts, "newindex", frames, 2) + indexer, err := pdk.SetupPilosa(hosts, "newindex", frames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(2)) if err != nil { t.Fatalf("SetupPilosa: %v", err) } diff --git a/usecase/ssb/cmd.go b/usecase/ssb/cmd.go index ea17ef3..c365fc6 100644 --- a/usecase/ssb/cmd.go +++ b/usecase/ssb/cmd.go @@ -11,6 +11,7 @@ import ( "sync" "time" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -49,7 +50,9 @@ func (m *Main) Run() (err error) { return errors.Wrap(err, "getting new translator") } log.Println("setting up pilosa") - m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames, 1000000) + m.index, err = pdk.SetupPilosa(m.Hosts, m.Index, frames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(1000000)) if err != nil { return errors.Wrap(err, "setting up Pilosa") } diff --git a/usecase/taxi/main.go b/usecase/taxi/main.go index 0b95840..06d834d 100644 --- a/usecase/taxi/main.go +++ b/usecase/taxi/main.go @@ 
-17,6 +17,7 @@ import ( // for profiling _ "net/http/pprof" + gopilosa "github.com/pilosa/go-pilosa" "github.com/pilosa/pdk" "github.com/pkg/errors" ) @@ -111,7 +112,9 @@ func (m *Main) Run() error { pdk.NewRankedFrameSpec("pickup_elevation", 10000), pdk.NewRankedFrameSpec("drop_elevation", 10000), } - m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames, uint(m.BufferSize)) + m.indexer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, frames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(m.BufferSize)) if err != nil { return errors.Wrap(err, "setting up indexer") } diff --git a/usecase/weather/main.go b/usecase/weather/main.go index 77e4a8e..27d83dc 100644 --- a/usecase/weather/main.go +++ b/usecase/weather/main.go @@ -120,7 +120,9 @@ func (m *Main) Run() (err error) { pdk.NewRankedFrameSpec("pressure_i", 0), pdk.NewRankedFrameSpec("humidity", 0), } - m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames, uint(m.BufferSize)) + m.importer, err = pdk.SetupPilosa([]string{m.PilosaHost}, m.Index, writeFrames, + gopilosa.OptImportStrategy(gopilosa.BatchImport), + gopilosa.OptImportBatchSize(m.BufferSize)) if err != nil { return errors.Wrap(err, "setting up pilosa") }