Skip to content
This repository has been archived by the owner on Sep 8, 2018. It is now read-only.

[WIP] Trigram based indexer sidecar #123

Open
wants to merge 3 commits into
base: topics2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
222 changes: 222 additions & 0 deletions cmd/oklog/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package main

import (
"flag"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/oklog/oklog/pkg/cluster"
"github.com/oklog/oklog/pkg/event"
"github.com/oklog/oklog/pkg/fs"
"github.com/oklog/oklog/pkg/group"
"github.com/oklog/oklog/pkg/index"
)

var (
defaultIndexPath = filepath.Join("data", "index")
)

// runIndex starts an index node: it joins the cluster as an index-type peer
// and periodically syncs the trigram index against the store tier's segment
// files. It blocks until the execution group terminates and returns the
// first error encountered (nil on clean interrupt).
func runIndex(args []string) error {
	// NOTE(review): was "store" — a copy-paste from runStore. The flag set
	// name and usage line must say "index" so -h output is correct.
	flagset := flag.NewFlagSet("index", flag.ExitOnError)
	var (
		debug                = flagset.Bool("debug", false, "debug logging")
		apiAddr              = flagset.String("api", defaultAPIAddr, "listen address for index API")
		clusterBindAddr      = flagset.String("cluster", defaultClusterAddr, "listen address for cluster")
		clusterAdvertiseAddr = flagset.String("cluster.advertise-addr", "", "optional, explicit address to advertise in cluster")
		storePath            = flagset.String("store.path", defaultStorePath, "path holding segment files for the storage tier")
		indexPath            = flagset.String("index.path", defaultIndexPath, "path holding segment and index files for the index tier")
		filesystem           = flagset.String("filesystem", defaultFilesystem, "real, virtual, nop")
		clusterPeers         = stringslice{}
	)
	flagset.Var(&clusterPeers, "peer", "cluster peer host:port (repeatable)")
	flagset.Usage = usageFor(flagset, "oklog index [flags]")
	if err := flagset.Parse(args); err != nil {
		return err
	}

	// Logging: logfmt to stderr, UTC timestamps, info level unless -debug.
	var logger log.Logger
	{
		logLevel := level.AllowInfo()
		if *debug {
			logLevel = level.AllowAll()
		}
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = level.NewFilter(logger, logLevel)
	}

	// Parse URLs for listeners.
	apiNetwork, apiAddress, _, apiPort, err := parseAddr(*apiAddr, defaultAPIPort)
	if err != nil {
		return err
	}
	_, _, clusterBindHost, clusterBindPort, err := parseAddr(*clusterBindAddr, defaultClusterPort)
	if err != nil {
		return err
	}
	level.Info(logger).Log("cluster_bind", fmt.Sprintf("%s:%d", clusterBindHost, clusterBindPort))
	var (
		clusterAdvertiseHost string
		clusterAdvertisePort int
	)
	if *clusterAdvertiseAddr != "" {
		_, _, clusterAdvertiseHost, clusterAdvertisePort, err = parseAddr(*clusterAdvertiseAddr, defaultClusterPort)
		if err != nil {
			return err
		}
		level.Info(logger).Log("cluster_advertise", fmt.Sprintf("%s:%d", clusterAdvertiseHost, clusterAdvertisePort))
	}

	// Calculate an advertise IP. Warn (but proceed) if the result is
	// unroutable while non-local peers are configured: the node would join
	// but be unreachable by those peers.
	advertiseIP, err := cluster.CalculateAdvertiseIP(clusterBindHost, clusterAdvertiseHost, net.DefaultResolver, logger)
	if err != nil {
		level.Error(logger).Log("err", "couldn't deduce an advertise IP: "+err.Error())
		return err
	}
	if hasNonlocal(clusterPeers) && isUnroutable(advertiseIP.String()) {
		level.Warn(logger).Log("err", "this node advertises itself on an unroutable IP", "ip", advertiseIP.String())
		level.Warn(logger).Log("err", "this node will be unreachable in the cluster")
		level.Warn(logger).Log("err", "provide -cluster.advertise-addr as a routable IP address or hostname")
	}
	level.Info(logger).Log("user_bind_host", clusterBindHost, "user_advertise_host", clusterAdvertiseHost, "calculated_advertise_ip", advertiseIP)
	clusterAdvertiseHost = advertiseIP.String()
	if clusterAdvertisePort == 0 {
		clusterAdvertisePort = clusterBindPort
	}

	// Bind listeners.
	apiListener, err := net.Listen(apiNetwork, apiAddress)
	if err != nil {
		return err
	}
	level.Info(logger).Log("API", fmt.Sprintf("%s://%s", apiNetwork, apiAddress))
	_ = apiListener // TODO(WIP): served by the commented-out API group below.

	// Choose the filesystem implementation backing the index.
	var fsys fs.Filesystem
	switch strings.ToLower(*filesystem) {
	case "real":
		fsys = fs.NewRealFilesystem()
	case "virtual":
		fsys = fs.NewVirtualFilesystem()
	case "nop":
		fsys = fs.NewNopFilesystem()
	default:
		return errors.Errorf("invalid -filesystem %q", *filesystem)
	}

	// Create the cluster peer, advertising ourselves as an index node.
	peer, err := cluster.NewPeer(
		clusterBindHost, clusterBindPort,
		clusterAdvertiseHost, clusterAdvertisePort,
		clusterPeers,
		cluster.PeerTypeIndex, apiPort,
		log.With(logger, "component", "cluster"),
	)
	if err != nil {
		return err
	}
	prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: "oklog",
		Name:      "cluster_size",
		Help:      "Number of peers in the cluster from this node's perspective.",
	}, func() float64 { return float64(peer.ClusterSize()) }))

	// Create the HTTP clients we'll use for various purposes.
	unlimitedClient := http.DefaultClient // no timeouts, be careful
	timeoutClient := &http.Client{
		Transport: &http.Transport{
			Proxy:                 http.ProxyFromEnvironment,
			ResponseHeaderTimeout: 5 * time.Second,
			Dial: (&net.Dialer{
				Timeout:   10 * time.Second,
				KeepAlive: 30 * time.Second,
			}).Dial,
			TLSHandshakeTimeout: 10 * time.Second,
			DisableKeepAlives:   false,
			MaxIdleConnsPerHost: 1,
		},
	}
	_ = unlimitedClient // TODO(WIP): used by the commented-out API group below.
	_ = timeoutClient

	// Execution group: each actor gets a run func and an interrupt func;
	// the group exits when the first actor returns.
	var g group.Group
	{
		// Leave the cluster gracefully on shutdown.
		cancel := make(chan struct{})
		g.Add(func() error {
			<-cancel
			return peer.Leave(time.Second)
		}, func(error) {
			close(cancel)
		})
	}
	{
		// Periodically sync the index against the store's segment files.
		ix, err := index.NewIndex(fsys, *indexPath, *storePath,
			event.LogReporter{Logger: log.With(logger, "component", "Indexer")})
		if err != nil {
			return err
		}
		ticker := time.NewTicker(time.Second)
		stopc := make(chan struct{})

		g.Add(func() error {
			for {
				select {
				case <-ticker.C:
					// NOTE(review): was fmt.Println; route sync errors
					// through the structured logger like everything else.
					if err := ix.Sync(); err != nil {
						level.Error(logger).Log("component", "Indexer", "err", err)
					}
				case <-stopc:
					return nil
				}
			}
		}, func(error) {
			close(stopc)
			ticker.Stop()
		})
	}
	// {
	// 	g.Add(func() error {
	// 		mux := http.NewServeMux()
	// 		api := index.NewAPI(
	// 			peer,
	// 			indexer,
	// 			timeoutClient,
	// 			unlimitedClient,
	// 		)
	// 		defer func() {
	// 			if err := api.Close(); err != nil {
	// 				level.Warn(logger).Log("err", err)
	// 			}
	// 		}()
	// 		mux.Handle("/index/", http.StripPrefix("/index", api))
	// 		registerMetrics(mux)
	// 		registerProfile(mux)
	// 		registerHealthCheck(mux)
	// 		return http.Serve(apiListener, cors.Default().Handler(mux))
	// 	}, func(error) {
	// 		apiListener.Close()
	// 	})
	// }
	{
		// Exit cleanly on SIGINT/SIGTERM.
		cancel := make(chan struct{})
		g.Add(func() error {
			return interrupt(cancel)
		}, func(error) {
			close(cancel)
		})
	}
	return g.Run()
}
15 changes: 8 additions & 7 deletions cmd/oklog/ingeststore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rs/cors"

"github.com/oklog/oklog/pkg/cluster"
"github.com/oklog/oklog/pkg/event"
"github.com/oklog/oklog/pkg/fs"
"github.com/oklog/oklog/pkg/group"
"github.com/oklog/oklog/pkg/ingest"
Expand Down Expand Up @@ -330,7 +331,7 @@ func runIngestStore(args []string) error {
fsys,
stagingPath,
*segmentTargetSize, *segmentBufferSize,
store.LogReporter{Logger: log.With(logger, "component", "FileLog")},
event.LogReporter{Logger: log.With(logger, "component", "FileLog")},
)
if err != nil {
return err
Expand Down Expand Up @@ -457,7 +458,7 @@ func runIngestStore(args []string) error {
consumedBytes,
replicatedSegments.WithLabelValues("egress"),
replicatedBytes.WithLabelValues("egress"),
store.LogReporter{Logger: log.With(logger, "component", "Consumer")},
event.LogReporter{Logger: log.With(logger, "component", "Consumer")},
)
g.Add(func() error {
c.Run()
Expand All @@ -473,7 +474,7 @@ func runIngestStore(args []string) error {
topicPath,
*segmentTargetSize,
*segmentBufferSize,
store.LogReporter{Logger: log.With(logger, "component", "TopicLogs")},
event.LogReporter{Logger: log.With(logger, "component", "TopicLogs")},
)
}
{
Expand All @@ -482,7 +483,7 @@ func runIngestStore(args []string) error {
return err
}
// TODO(fabxc): track delay between staging and demux completion in a metric.
reporter := store.LogReporter{Logger: log.With(logger, "component", "Demuxer")}
reporter := event.LogReporter{Logger: log.With(logger, "component", "Demuxer")}
demux := store.NewDemuxer(stagingLog, topicLogs, reporter)

g.Add(func() error {
Expand All @@ -502,14 +503,14 @@ func runIngestStore(args []string) error {
compactDuration,
trashedSegments,
purgedSegments,
store.LogReporter{Logger: log.With(logger, "component", "Compacter", "topic", t)},
event.LogReporter{Logger: log.With(logger, "component", "Compacter", "topic", t)},
)
}
topicLogs, err := newTopicLogs()
if err != nil {
return err
}
reporter := store.LogReporter{Logger: log.With(logger, "component", "TopicCompacter")}
reporter := event.LogReporter{Logger: log.With(logger, "component", "TopicCompacter")}
compacters := store.NewTopicCompacters(topicLogs, cfac, reporter)

g.Add(func() error {
Expand Down Expand Up @@ -544,7 +545,7 @@ func runIngestStore(args []string) error {
replicatedSegments.WithLabelValues("ingress"),
replicatedBytes.WithLabelValues("ingress"),
apiDuration,
store.LogReporter{Logger: log.With(logger, "component", "API")},
event.LogReporter{Logger: log.With(logger, "component", "API")},
)
defer func() {
if err := api.Close(); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/oklog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func usage() {
fmt.Fprintf(os.Stderr, " ingest Ingester node\n")
fmt.Fprintf(os.Stderr, " store Storage node\n")
fmt.Fprintf(os.Stderr, " ingeststore Combination ingest+store node, for small installations\n")
fmt.Fprintf(os.Stderr, " index Index node\n")
fmt.Fprintf(os.Stderr, " query Querying commandline tool\n")
fmt.Fprintf(os.Stderr, " stream Streaming commandline tool\n")
fmt.Fprintf(os.Stderr, " testsvc Test service, emits log lines at a fixed rate\n")
Expand Down Expand Up @@ -61,6 +62,8 @@ func main() {
run = runQuery
case "stream":
run = runStream
case "index":
run = runIndex
case "testsvc":
run = runTestService
default:
Expand Down
3 changes: 2 additions & 1 deletion cmd/oklog/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/pkg/errors"

"github.com/oklog/oklog/pkg/record"
"github.com/oklog/oklog/pkg/store"
"github.com/oklog/ulid"
)
Expand Down Expand Up @@ -111,7 +112,7 @@ func runQuery(args []string) error {
return errors.Errorf("%s %s: %s", req.Method, req.URL.String(), resp.Status)
}

var result store.QueryResult
var result record.QueryResult
if err := result.DecodeFrom(resp); err != nil {
return errors.Wrap(err, "decoding query result")
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/oklog/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rs/cors"

"github.com/oklog/oklog/pkg/cluster"
"github.com/oklog/oklog/pkg/event"
"github.com/oklog/oklog/pkg/fs"
"github.com/oklog/oklog/pkg/group"
"github.com/oklog/oklog/pkg/store"
Expand Down Expand Up @@ -218,7 +219,7 @@ func runStore(args []string) error {
fsys,
stagingPath,
*segmentTargetSize, *segmentBufferSize,
store.LogReporter{Logger: log.With(logger, "component", "FileLog")},
event.LogReporter{Logger: log.With(logger, "component", "FileLog")},
)
if err != nil {
return err
Expand Down Expand Up @@ -287,7 +288,7 @@ func runStore(args []string) error {
consumedBytes,
replicatedSegments.WithLabelValues("egress"),
replicatedBytes.WithLabelValues("egress"),
store.LogReporter{Logger: log.With(logger, "component", "Consumer")},
event.LogReporter{Logger: log.With(logger, "component", "Consumer")},
)
g.Add(func() error {
c.Run()
Expand All @@ -303,7 +304,7 @@ func runStore(args []string) error {
topicPath,
*segmentTargetSize,
*segmentBufferSize,
store.LogReporter{Logger: log.With(logger, "component", "TopicLogs")},
event.LogReporter{Logger: log.With(logger, "component", "TopicLogs")},
)
}
{
Expand All @@ -312,7 +313,7 @@ func runStore(args []string) error {
return err
}
// TODO(fabxc): track delay between staging and demux completion in a metric.
reporter := store.LogReporter{Logger: log.With(logger, "component", "Demuxer")}
reporter := event.LogReporter{Logger: log.With(logger, "component", "Demuxer")}
demux := store.NewDemuxer(stagingLog, topicLogs, reporter)

g.Add(func() error {
Expand All @@ -332,14 +333,14 @@ func runStore(args []string) error {
compactDuration,
trashedSegments,
purgedSegments,
store.LogReporter{Logger: log.With(logger, "component", "Compacter", "topic", t)},
event.LogReporter{Logger: log.With(logger, "component", "Compacter", "topic", t)},
)
}
topicLogs, err := newTopicLogs()
if err != nil {
return err
}
reporter := store.LogReporter{Logger: log.With(logger, "component", "TopicCompacter")}
reporter := event.LogReporter{Logger: log.With(logger, "component", "TopicCompacter")}
compacters := store.NewTopicCompacters(topicLogs, cfac, reporter)

g.Add(func() error {
Expand All @@ -365,7 +366,7 @@ func runStore(args []string) error {
replicatedSegments.WithLabelValues("ingress"),
replicatedBytes.WithLabelValues("ingress"),
apiDuration,
store.LogReporter{Logger: log.With(logger, "component", "API")},
event.LogReporter{Logger: log.With(logger, "component", "API")},
)
defer func() {
if err := api.Close(); err != nil {
Expand Down