Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influx v2 support #209

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 31 additions & 8 deletions cmd/tsbs_load_influx/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ func (d *dbCreator) DBExists(dbName string) bool {
}

func (d *dbCreator) listDatabases() ([]string, error) {
client := http.Client{}
u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL)
resp, err := http.Get(u)
req, err := http.NewRequest("GET", u, nil)
if authToken != "" {
req.Header = http.Header{
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
}
resp, err := client.Do(req)

if err != nil {
return nil, fmt.Errorf("listDatabases error: %s", err.Error())
}
Expand All @@ -61,20 +69,30 @@ func (d *dbCreator) listDatabases() ([]string, error) {
}

ret := []string{}
for _, nestedName := range listing.Results[0].Series[0].Values {
name := nestedName[0]
// the _internal database is skipped:
if name == "_internal" {
continue
if len(listing.Results) > 0 {
for _, nestedName := range listing.Results[0].Series[0].Values {
name := nestedName[0]
// the _internal database is skipped:
if name == "_internal" {
continue
}
ret = append(ret, name)
}
ret = append(ret, name)
}
return ret, nil
}

func (d *dbCreator) RemoveOldDB(dbName string) error {
u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName)
resp, err := http.Post(u, "text/plain", nil)
client := http.Client{}
req, err := http.NewRequest("POST", u, nil)
if authToken != "" {
req.Header = http.Header{
"Content-Type": []string{"text/plain"},
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("drop db error: %s", err.Error())
}
Expand All @@ -99,6 +117,11 @@ func (d *dbCreator) CreateDB(dbName string) error {
u.RawQuery = v.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if authToken != "" {
req.Header = http.Header{
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
}
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/tsbs_load_influx/http_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
httpClientName = "tsbs_load_influx"
headerContentEncoding = "Content-Encoding"
headerAuthorization = "Authorization"
headerGzip = "gzip"
)

Expand Down Expand Up @@ -65,13 +66,16 @@ var (
textPlain = []byte("text/plain")
)

func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) {
func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool, authToken string) {
req.Header.SetContentTypeBytes(textPlain)
req.Header.SetMethodBytes(methodPost)
req.Header.SetRequestURIBytes(w.url)
if isGzip {
req.Header.Add(headerContentEncoding, headerGzip)
}
if authToken != "" {
req.Header.Add(headerAuthorization, fmt.Sprintf("Token %s", authToken))
}
req.SetBody(body)
}

Expand All @@ -96,7 +100,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response)
func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, body, isGzip)
w.initializeReq(req, body, isGzip, authToken)

resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
Expand Down
10 changes: 5 additions & 5 deletions cmd/tsbs_load_influx/http_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) {
defer fasthttp.ReleaseRequest(req)
w := NewHTTPWriter(testConf, testConsistency)
body := "this is a test body"
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")

if got := string(req.Body()); got != body {
t.Errorf("non-gzip: body not correct: got '%s' want '%s'", got, body)
Expand All @@ -129,7 +129,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) {
t.Errorf("non-gzip: Content-Encoding is not empty: got %s", got)
}

w.initializeReq(req, []byte(body), true)
w.initializeReq(req, []byte(body), true, "")
if got := string(req.Header.Peek(headerContentEncoding)); got != headerGzip {
t.Errorf("gzip: Content-Encoding is not correct: got %s want %s", got, headerGzip)
}
Expand All @@ -144,7 +144,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) {
w := NewHTTPWriter(testConf, testConsistency)
body := "this is a test body"
normalURL := w.url // save for later modification
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
lat, err := w.executeReq(req, resp)
Expand All @@ -161,7 +161,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) {
w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldBackoffParam))
req = fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
lat, err = w.executeReq(req, resp)
if err != errBackoff {
t.Errorf("unexpected error response received (not backoff error): %v", err)
Expand All @@ -176,7 +176,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) {
w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldInvalidParam))
req = fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
lat, err = w.executeReq(req, resp)
if err == nil {
t.Errorf("unexpected non-error response received")
Expand Down
11 changes: 11 additions & 0 deletions cmd/tsbs_load_influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var (
useGzip bool
doAbortOnExist bool
consistency string
authToken string // InfluxDB v2
bucketId string // InfluxDB v2
orgId string // InfluxDB v2
)

// Global vars
Expand Down Expand Up @@ -73,13 +76,21 @@ func init() {
csvDaemonURLs = viper.GetString("urls")
replicationFactor = viper.GetInt("replication-factor")
consistency = viper.GetString("consistency")
authToken = viper.GetString("auth-token")
orgId = viper.GetString("org")
backoff = viper.GetDuration("backoff")
useGzip = viper.GetBool("gzip")

if _, ok := consistencyChoices[consistency]; !ok {
log.Fatalf("invalid consistency settings")
}

if authToken != "" {
log.Println("Using Authorization header in benchmark")
} else {
log.Println("Given no Authorization header was provided will not send it in benchmark")
}

daemonURLs = strings.Split(csvDaemonURLs, ",")
if len(daemonURLs) == 0 {
log.Fatal("missing 'urls' flag")
Expand Down
13 changes: 11 additions & 2 deletions cmd/tsbs_run_queries_influx/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

var bytesSlash = []byte("/") // heap optimization
var headerAuthorization = "Authorization"

// HTTPClient is a reusable HTTP Client.
type HTTPClient struct {
Expand All @@ -22,6 +23,7 @@ type HTTPClient struct {
Host []byte
HostString string
uri []byte
authToken string
}

// HTTPClientDoOptions wraps options uses when calling `Do`.
Expand All @@ -46,12 +48,17 @@ func getHttpClient() *http.Client {
}

// NewHTTPClient creates a new HTTPClient.
func NewHTTPClient(host string) *HTTPClient {
func NewHTTPClient(host string, authToken string) *HTTPClient {
token := ""
if authToken != "" {
token = fmt.Sprintf("Token %s", authToken)
}
return &HTTPClient{
client: getHttpClient(),
Host: []byte(host),
HostString: host,
uri: []byte{}, // heap optimization
authToken: token,
}
}

Expand All @@ -74,7 +81,9 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64,
if err != nil {
panic(err)
}

if w.authToken != "" {
req.Header.Add(headerAuthorization, w.authToken)
}
// Perform the request while tracking latency:
start := time.Now()
resp, err := w.client.Do(req)
Expand Down
11 changes: 9 additions & 2 deletions cmd/tsbs_run_queries_influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
var (
daemonUrls []string
chunkSize uint64
authToken string
)

// Global vars:
Expand All @@ -35,6 +36,7 @@ func init() {

pflag.String("urls", "http://localhost:8086", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.")
pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.")
pflag.String("auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty will not send the Authorization header.")

pflag.Parse()

Expand All @@ -49,8 +51,13 @@ func init() {
}

csvDaemonUrls = viper.GetString("urls")
authToken = viper.GetString("auth-token")
chunkSize = viper.GetUint64("chunk-response-size")

if authToken != "" {
log.Println("Using Authorization header in benchmark")
} else {
log.Println("Given no Authorization header was provided will not send it in benchmark")
}
daemonUrls = strings.Split(csvDaemonUrls, ",")
if len(daemonUrls) == 0 {
log.Fatal("missing 'urls' flag")
Expand Down Expand Up @@ -78,7 +85,7 @@ func (p *processor) Init(workerNumber int) {
database: runner.DatabaseName(),
}
url := daemonUrls[workerNumber%len(daemonUrls)]
p.w = NewHTTPClient(url)
p.w = NewHTTPClient(url, authToken)
}

func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
Expand Down
39 changes: 39 additions & 0 deletions docs/influx.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,33 @@ using the data importer (`tsbs_load_influx`), and additional flags
available for the query runner (`tsbs_run_queries_influx`). **This
should be read *after* the main README.**

## Setup steps InfluxDB v2

If on a new setup run the following command:

```bash
influx setup
```

If you need to create a new bucket adjust the bucket name (`-n`) and the org name (`-o`) accordingly:

```bash
influx bucket create -n bucket-perf -o org -r 0
```

Create a DBRP mapping with the InfluxDB 1.x compatibility API ([official docs](https://docs.influxdata.com/influxdb/cloud/reference/cli/influx/v1/dbrp/create/)).

Adjust bucket name and db accordingly:

```bash
influx v1 dbrp create --db benchmark --rp 0 --bucket-id `influx bucket ls --name bucket-perf | awk -v i=2 -v j=1 'FNR == i {print $j}'` --default
```

Retrieve the auth token as follows:
```bash
influx auth list
```

## Data format

Data generated by `tsbs_generate_data` for InfluxDB is serialized in a
Expand Down Expand Up @@ -58,6 +85,11 @@ Whether to encode writes to the server with gzip. For best performance, encoding
with gzip is the best choice, but if the server does not support or has gzip
disabled, this flag should be set to false.

#### `-auth-token` (type: `string`, default: `""`)

Use the Authorization header with the Token scheme to provide your token to InfluxDB.
If empty will not send the Authorization header.

---

## `tsbs_run_queries_influx` Additional Flags
Expand All @@ -76,3 +108,10 @@ everything in a single response.

Comma-separated list of URLs to connect to for querying. Workers will be
distributed in a round robin fashion across the URLs.

### Miscellaneous

#### `-auth-token` (type: `string`, default: `""`)

Use the Authorization header with the Token scheme to provide your token to InfluxDB.
If empty will not send the Authorization header.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ github.com/timescale/promscale v0.0.0-20201006153045-6a66a36f5c84/go.mod h1:rkhy
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
2 changes: 2 additions & 0 deletions pkg/targets/influx/implemented_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func (t *influxTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Fla
flagSet.String(flagPrefix+"urls", "http://localhost:8086", "InfluxDB URLs, comma-separated. Will be used in a round-robin fashion.")
flagSet.Int(flagPrefix+"replication-factor", 1, "Cluster replication factor (only applies to clustered databases).")
flagSet.String(flagPrefix+"consistency", "all", "Write consistency. Must be one of: any, one, quorum, all.")
flagSet.String(flagPrefix+"auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty will not send the Authorization header.")
flagSet.String(flagPrefix+"organization", "", "Organization name (InfluxDB v2).")
flagSet.Duration(flagPrefix+"backoff", time.Second, "Time to sleep between requests when server indicates backpressure is needed.")
flagSet.Bool(flagPrefix+"gzip", true, "Whether to gzip encode requests (default true).")
}
Expand Down