Changing to Mongo driver library #229

Open · wants to merge 15 commits into base: master
72 changes: 34 additions & 38 deletions cmd/tsbs_load_mongo/aggregate_loader.go
@@ -1,35 +1,41 @@
 package main
 
 import (
+    "context"
     "fmt"
+    "go.mongodb.org/mongo-driver/bson"
+    "go.mongodb.org/mongo-driver/mongo"
+    "go.mongodb.org/mongo-driver/mongo/options"
     "hash/fnv"
     "log"
     "sync"
     "time"
 
-    "github.com/globalsign/mgo"
-    "github.com/globalsign/mgo/bson"
+    //"github.com/globalsign/mgo"
+    //"github.com/globalsign/mgo/bson"
     "github.com/timescale/tsbs/load"
     "github.com/timescale/tsbs/pkg/data"
     "github.com/timescale/tsbs/pkg/targets"
-    "github.com/timescale/tsbs/pkg/targets/mongo"
+    tsbsmongo "github.com/timescale/tsbs/pkg/targets/mongo" // to resolve the name collision with the mongo driver
 )
 
 type hostnameIndexer struct {
     partitions uint
 }
 
 func (i *hostnameIndexer) GetIndex(item data.LoadedPoint) uint {
-    p := item.Data.(*mongo.MongoPoint)
-    t := &mongo.MongoTag{}
+    p := item.Data.(*tsbsmongo.MongoPoint)
+    t := &tsbsmongo.MongoTag{}
     for j := 0; j < p.TagsLength(); j++ {
         p.Tags(t, j)
         key := string(t.Key())
         if key == "hostname" || key == "name" {
             // the hostname is the de facto index for devops tags
             // the truck name is the de facto index for iot tags
             h := fnv.New32a()
-            h.Write([]byte(string(t.Value())))
+            _, err := h.Write(t.Value())
+            if err != nil {
+                panic("cannot write value to hash")
+            }
             return uint(h.Sum32()) % i.partitions
         }
     }
@@ -78,21 +84,19 @@ var pPool = &sync.Pool{New: func() interface{} { return &point{} }}

 type aggProcessor struct {
     dbc        *dbCreator
-    collection *mgo.Collection
+    collection *mongo.Collection
 
     createdDocs map[string]bool
     createQueue []interface{}
 }
 
 func (p *aggProcessor) Init(_ int, doLoad, _ bool) {
     if doLoad {
-        sess := p.dbc.session.Copy()
-        db := sess.DB(loader.DatabaseName())
-        p.collection = db.C(collectionName)
+        db := p.dbc.client.Database(loader.DatabaseName())
+        p.collection = db.Collection(collectionName)
     }
     p.createdDocs = make(map[string]bool)
     p.createQueue = []interface{}{}
-
 }
 
 // ProcessBatch receives a batch of bson.M documents (BSON maps) that
@@ -127,7 +131,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) {
     eventCnt := uint64(0)
     for _, event := range batch.arr {
         tagsMap := map[string]string{}
-        t := &mongo.MongoTag{}
+        t := &tsbsmongo.MongoTag{}
         for j := 0; j < event.TagsLength(); j++ {
             event.Tags(t, j)
             tagsMap[string(t.Key())] = string(t.Value())
@@ -161,7 +165,7 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) {
         }
         x := pPool.Get().(*point)
         x.Fields = map[string]interface{}{}
-        f := &mongo.MongoReading{}
+        f := &tsbsmongo.MongoReading{}
         for j := 0; j < event.FieldsLength(); j++ {
             event.Fields(f, j)
             x.Fields[string(f.Key())] = f.Value()
@@ -171,38 +175,36 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) {

         docToEvents[docKey] = append(docToEvents[docKey], x)
     }
+    coll := p.dbc.client.Database("benchmark").Collection(collectionName)
 
     if doLoad {
         // Checks if any new documents need to be made and does so
-        bulk := p.collection.Bulk()
-        bulk = insertNewAggregateDocs(p.collection, bulk, p.createQueue)
+        //bulk := p.collection.Bulk()
+        insertNewAggregateDocs(coll, p.dbc.ctx, p.createQueue)
         p.createQueue = p.createQueue[:0]
 
         // For each document, create one 'set' command for all records
         // that belong to the document
         for docKey, events := range docToEvents {
-            selector := bson.M{aggDocID: docKey}
             updateMap := bson.M{}
             for _, event := range events {
                 minKey := (event.Timestamp / (1e9 * 60)) % 60
                 secKey := (event.Timestamp / 1e9) % 60
                 key := fmt.Sprintf("events.%d.%d", minKey, secKey)
                 val := event.Fields
 
                 val[timestampField] = event.Timestamp
                 updateMap[key] = val
             }
 
             update := bson.M{"$set": updateMap}
-            bulk.Update(selector, update)
-        }
-
-        // All documents accounted for, finally run the operation
-        _, err := bulk.Run()
-        if err != nil {
-            log.Fatalf("Bulk aggregate update err: %s\n", err.Error())
+            filter := bson.M{
+                aggDocID: bson.M{
+                    "$eq": docKey, // match the document whose aggDocID equals docKey
+                },
+            }
+            _, err := coll.UpdateOne(p.dbc.ctx, filter, update)
+            if err != nil {
+                log.Fatalf("Update err: %s\n", err.Error())
+            }
         }
     }
 
     for _, events := range docToEvents {
         for _, e := range events {
             delete(e.Fields, timestampField)
@@ -215,26 +217,20 @@ func (p *aggProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) {

 // insertNewAggregateDocs handles creating new aggregated documents when new devices
 // or time periods are encountered
-func insertNewAggregateDocs(collection *mgo.Collection, bulk *mgo.Bulk, createQueue []interface{}) *mgo.Bulk {
-    b := bulk
+func insertNewAggregateDocs(collection *mongo.Collection, ctx context.Context, createQueue []interface{}) {
     if len(createQueue) > 0 {
         off := 0
         for off < len(createQueue) {
             l := off + aggInsertBatchSize
             if l > len(createQueue) {
                 l = len(createQueue)
             }
 
-            b.Insert(createQueue[off:l]...)
-            _, err := b.Run()
+            opts := options.InsertMany().SetOrdered(false)
+            _, err := collection.InsertMany(ctx, createQueue[off:l], opts)
             if err != nil {
-                log.Fatalf("Bulk aggregate docs err: %s\n", err.Error())
+                log.Fatalf("Insert many aggregate docs err: %s\n", err.Error())
             }
-            b = collection.Bulk()
-
             off = l
         }
     }
-
-    return b
 }
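
The biggest behavioral change above: the old mgo path queued one `bulk.Update` per document and ran a single `bulk.Run()`, while the new driver path issues one `UpdateOne` per document key. Below is a minimal, self-contained sketch of that `$set`-into-nested-keys pattern against the official driver's v1 API. It assumes a local mongod at `mongodb://localhost:27017`; the database name, collection name, `doc_id` field, and reading fields are hypothetical stand-ins for the loader's `aggDocID`, `timestampField`, and friends, and the upsert option is added only so the sketch runs without pre-created documents:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("benchmark").Collection("point_data")

	// Build one $set entry per (minute, second) slot, as the aggregate loader does:
	// events.<minute>.<second> -> map of field values for that reading.
	ts := time.Now().UnixNano()
	minKey := (ts / (1e9 * 60)) % 60
	secKey := (ts / 1e9) % 60
	updateMap := bson.M{
		fmt.Sprintf("events.%d.%d", minKey, secKey): bson.M{
			"usage_user":   63.2,
			"timestamp_ns": ts,
		},
	}

	filter := bson.M{"doc_id": bson.M{"$eq": "host_0_2m"}} // hypothetical aggregate doc key
	_, err = coll.UpdateOne(ctx, filter, bson.M{"$set": updateMap},
		options.Update().SetUpsert(true))
	if err != nil {
		log.Fatalf("update err: %s", err)
	}
}
```

If the per-key round trips become a bottleneck, the v1 driver's `Collection.BulkWrite` with `mongo.NewUpdateOneModel()` entries would be the closer analogue of the old mgo bulk; the PR as written uses individual `UpdateOne` calls.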
90 changes: 45 additions & 45 deletions cmd/tsbs_load_mongo/creator.go
@@ -1,29 +1,36 @@
 package main
 
 import (
+    "context"
     "fmt"
     "log"
     "strings"
 
-    "github.com/globalsign/mgo"
-    "github.com/globalsign/mgo/bson"
+    "go.mongodb.org/mongo-driver/bson"
+    "go.mongodb.org/mongo-driver/mongo"
+    "go.mongodb.org/mongo-driver/mongo/options"
 )
 
 type dbCreator struct {
-    session *mgo.Session
+    client *mongo.Client
+    ctx    context.Context
+    cancel context.CancelFunc
 }
 
 func (d *dbCreator) Init() {
+    //log.Println("tsbs_load_mongo/creator/Init")
     var err error
-    d.session, err = mgo.DialWithTimeout(daemonURL, writeTimeout)
+    d.ctx, d.cancel = context.WithTimeout(context.Background(), writeTimeout)
+    d.client, err = mongo.Connect(d.ctx, options.Client().ApplyURI(daemonURL))
     if err != nil {
         log.Println("Can't establish connection with", daemonURL)
         log.Fatal(err)
     }
-    d.session.SetMode(mgo.Eventual, false)
 }
 
 func (d *dbCreator) DBExists(dbName string) bool {
-    dbs, err := d.session.DatabaseNames()
+    //log.Println("tsbs_load_mongo/creator/DBExists")
+    dbs, err := d.client.ListDatabaseNames(d.ctx, bson.D{})
     if err != nil {
         log.Fatal(err)
     }
@@ -36,74 +43,67 @@ func (d *dbCreator) DBExists(dbName string) bool {
 }
 
 func (d *dbCreator) RemoveOldDB(dbName string) error {
-    collections, err := d.session.DB(dbName).CollectionNames()
+    //log.Println("tsbs_load_mongo/creator/RemoveOldDB")
+    collection_names, err := d.client.Database(dbName).ListCollectionNames(d.ctx, bson.D{})
+    log.Printf("collection_names : %s", collection_names)
     if err != nil {
         return err
     }
-    for _, name := range collections {
-        d.session.DB(dbName).C(name).DropCollection()
+    for _, coll := range collection_names {
+        log.Printf("collection found : %s", coll)
+        log.Println("deleting the previous collection")
+        err := d.client.Database(dbName).Collection(coll).Drop(d.ctx)
+        if err != nil {
+            log.Printf("Could not delete collection : %s", err.Error())
+        }
     }
 
     return nil
 }
 
 func (d *dbCreator) CreateDB(dbName string) error {
-    cmd := make(bson.D, 0, 4)
-    cmd = append(cmd, bson.DocElem{Name: "create", Value: collectionName})
-
-    // wiredtiger settings
-    cmd = append(cmd, bson.DocElem{
-        Name: "storageEngine", Value: map[string]interface{}{
-            "wiredTiger": map[string]interface{}{
-                "configString": "block_compressor=snappy",
-            },
-        },
-    })
-
-    err := d.session.DB(dbName).Run(cmd, nil)
+    // Starting in MongoDB 3.2, the WiredTiger storage engine is the default storage engine
+    err := d.client.Database(dbName).CreateCollection(d.ctx, collectionName)
     if err != nil {
         if strings.Contains(err.Error(), "already exists") {
             log.Printf("collection %s already exists", dbName)
             return nil
         }
+        log.Printf("create collection err: %v", err)
         return fmt.Errorf("create collection err: %v", err)
     }
 
-    collection := d.session.DB(dbName).C(collectionName)
-    var key []string
+    collection := d.client.Database(dbName).Collection(collectionName)
+    var key bson.D
     if documentPer {
-        key = []string{"measurement", "tags.hostname", timestampField}
+        key = bson.D{{"measurement", 1}, {"tags.hostname", 1}, {timestampField, 1}}
     } else {
-        key = []string{aggKeyID, "measurement", "tags.hostname"}
+        key = bson.D{{aggKeyID, 1}, {"measurement", 1}, {"tags.hostname", 1}}
    }
 
-    index := mgo.Index{
-        Key:        key,
-        Unique:     false, // Unique does not work on the entire array of tags!
-        Background: false,
-        Sparse:     false,
+    index := mongo.IndexModel{
+        Keys:    key,
+        Options: options.Index().SetName("default_index"),
     }
-    err = collection.EnsureIndex(index)
+    idxview := collection.Indexes()
+    _, err = idxview.CreateOne(d.ctx, index)
     if err != nil {
-        return fmt.Errorf("create basic index err: %v", err)
+        log.Printf("create index err: %v", err)
+        panic(err)
     }
 
     // To make updates for new records more efficient, we need an efficient doc
     // lookup index
     if !documentPer {
-        err = collection.EnsureIndex(mgo.Index{
-            Key:        []string{aggDocID},
-            Unique:     false,
-            Background: false,
-            Sparse:     false,
+        _, err := idxview.CreateOne(d.ctx, mongo.IndexModel{
+            Keys:    bson.D{{aggDocID, 1}},
+            Options: options.Index().SetName("doc_lookup_index"),
         })
         if err != nil {
-            return fmt.Errorf("create agg doc index err: %v", err)
+            log.Printf("create index err: %v", err)
            panic(err)
         }
     }
 
     return nil
 }
 
 func (d *dbCreator) Close() {
-    d.session.Close()
+    // closing the database connection here
+    // causes an error in the bulk loading
 }
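
For the creator changes, here is a hedged sketch of the connect-and-index flow using the official driver's v1 API, mirroring the `documentPer` branch above. The URI, database/collection names, and the `time` field (standing in for `timestampField`) are assumptions for the demo, not values from the repo:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// mongo.Connect replaces mgo.DialWithTimeout; the timeout now lives in the context.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("benchmark").Collection("point_data")

	// Equivalent of mgo's Key: []string{"measurement", "tags.hostname", <timestamp>}:
	// the official driver takes a bson.D with 1 (ascending) or -1 (descending) per key.
	model := mongo.IndexModel{
		Keys: bson.D{
			{Key: "measurement", Value: 1},
			{Key: "tags.hostname", Value: 1},
			{Key: "time", Value: 1},
		},
		Options: options.Index().SetName("default_index"),
	}
	name, err := coll.Indexes().CreateOne(ctx, model)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("created index %q", name)
}
```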
22 changes: 10 additions & 12 deletions cmd/tsbs_load_mongo/document_per_loader.go
@@ -1,13 +1,13 @@
 package main
 
 import (
+    "go.mongodb.org/mongo-driver/mongo"
+    "go.mongodb.org/mongo-driver/mongo/options"
     "log"
     "sync"
 
-    "github.com/globalsign/mgo"
     "github.com/timescale/tsbs/load"
     "github.com/timescale/tsbs/pkg/targets"
-    "github.com/timescale/tsbs/pkg/targets/mongo"
+    tsbsmongo "github.com/timescale/tsbs/pkg/targets/mongo"
 )
 
 // naiveBenchmark allows you to run a benchmark using the naive, one document per
@@ -39,16 +39,15 @@ var spPool = &sync.Pool{New: func() interface{} { return &singlePoint{} }}

 type naiveProcessor struct {
     dbc        *dbCreator
-    collection *mgo.Collection
+    collection *mongo.Collection
 
     pvs []interface{}
 }
 
 func (p *naiveProcessor) Init(_ int, doLoad, _ bool) {
     if doLoad {
-        sess := p.dbc.session.Copy()
-        db := sess.DB(loader.DatabaseName())
-        p.collection = db.C(collectionName)
+        db := p.dbc.client.Database(loader.DatabaseName())
+        p.collection = db.Collection(collectionName)
     }
     p.pvs = []interface{}{}
 }
@@ -70,12 +69,12 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) {
         x.Timestamp = event.Timestamp()
         x.Fields = map[string]interface{}{}
         x.Tags = map[string]string{}
-        f := &mongo.MongoReading{}
+        f := &tsbsmongo.MongoReading{}
         for j := 0; j < event.FieldsLength(); j++ {
             event.Fields(f, j)
             x.Fields[string(f.Key())] = f.Value()
         }
-        t := &mongo.MongoTag{}
+        t := &tsbsmongo.MongoTag{}
         for j := 0; j < event.TagsLength(); j++ {
             event.Tags(t, j)
             x.Tags[string(t.Key())] = string(t.Value())
}

if doLoad {
bulk := p.collection.Bulk()
bulk.Insert(p.pvs...)
_, err := bulk.Run()
opts := options.InsertMany().SetOrdered(false)
_, err := p.collection.InsertMany(p.dbc.ctx, p.pvs, opts)
if err != nil {
log.Fatalf("Bulk insert docs err: %s\n", err.Error())
}
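
The naive (document-per-event) loader's change reduces to one substitution: mgo's `Bulk()`/`Insert(...)`/`Run()` becomes a single unordered `InsertMany`. A minimal sketch of that pattern, again assuming a local mongod and hypothetical database/collection names:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("benchmark").Collection("point_data")

	// Stand-in for the loader's p.pvs: one document per simulated point.
	docs := make([]interface{}, 0, 1000)
	for i := 0; i < 1000; i++ {
		docs = append(docs, bson.M{"measurement": "cpu", "usage_user": float64(i % 100)})
	}

	// SetOrdered(false) lets the server continue past individual write errors
	// and apply the batch with more parallelism than an ordered insert.
	opts := options.InsertMany().SetOrdered(false)
	if _, err := coll.InsertMany(ctx, docs, opts); err != nil {
		log.Fatalf("bulk insert err: %s", err)
	}
}
```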