
some other stuff like wait for cluster state
hoffoo committed Jan 29, 2015
1 parent cd8bfad commit 8f12f5f
Showing 3 changed files with 65 additions and 51 deletions.
31 changes: 17 additions & 14 deletions README.md
@@ -10,33 +10,36 @@

```
Application Options:
-  -s, --source= Source elasticsearch instance
-  -d, --dest= Destination elasticsearch instance
-  -c, --count= Number of documents at a time: ie "size" in the scroll request (100)
-  -t, --time= Scroll time (1m)
-      --settings Copy sharding settings from source (true)
-  -f, --force Delete destination index before copying (false)
-  -i, --indexes= List of indexes to copy, comma separated (_all)
-  -a, --all Copy indexes starting with . (false)
-  -w, --workers= Concurrency (1)
+  -s, --source= source elasticsearch instance
+  -d, --dest= destination elasticsearch instance
+  -c, --count= number of documents at a time: ie "size" in the scroll request (100)
+  -t, --time= scroll time (1m)
+  -f, --force delete destination index before copying (false)
+      --shards= set a number of shards on newly created indexes
+      --docs-only load documents only, do not try to recreate indexes (false)
+      --index-only only create indexes, do not load documents (false)
+      --replicate enable replication while indexing into the new indexes (false)
+  -i, --indexes= list of indexes to copy, comma separated (_all)
+  -a, --all copy indexes starting with . and _ (false)
+  -w, --workers= concurrency (1)
+      --settings copy sharding settings from source (true)
+      --green wait for both hosts cluster status to be green before dump. otherwise yellow is okay (false)
```


## NOTES:

-1. Has been tested getting data from 0.9 onto a 1.4 box. For other scenaries YMMV.
+1. Has been tested getting data from 0.9 onto a 1.4 box. For other scenarios YMMV. (look out for this bug: https://github.com/elasticsearch/elasticsearch/issues/5165)
1. Copies using the [_source](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-source-field.html) field in elasticsearch. If you have made modifications to it (excluding fields, etc) they will not be indexed on the destination host.
1. ```--force``` will delete indexes on the destination host. Otherwise an error will be returned if the index exists
1. ```--time``` is the [scroll time](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-search-context) passed to the source host, default is 1m. This is a string in es's format.
1. ```--count``` is the [number of documents](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-scan) that will be requested and bulk indexed at a time. Note that this depends on the number of shards (ie: size of 10 on 5 shards is 50 documents)
1. ```--indexes``` is a comma separated list of indexes to copy
1. ```--all``` indexes starting with . and _ are ignored by default, --all overrides this behavior
1. ```--workers``` concurrency when we post to the bulk api. Only one post happens at a time, but higher concurrency should give you more throughput when using larger scroll sizes.
-1. Ports are required, otherwise 80 is the assumed port
+1. Ports are required, otherwise 80 is the assumed port (see BUGS below: this should really default to 9200). A full example invocation is shown after this list.
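
To make the flags concrete, here is a hypothetical invocation (host names and index name are made up; the binary name matches the artifacts release.sh produces):

```
elasticsearch-dump -s http://src.example.com:9200 -d http://dst.example.com:9200 -i logs-2015.01 -w 4 --green -f
```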

## BUGS:

-1. It will not do anything special when copying the _id (copies _id from source host). If _id is remapped this probably won't do what you want.
-1. Should check if the bulk index requests starts getting large (in bytes), and force a flush if that is the case. Right now we show an error if elasticsearch refuses a large request.
+1. It will not do anything special when copying the _id (copies _id from source host). If _id is remapped it may not do what you want.
+1. Should assume a default port of 9200
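
The second BUGS entry is straightforward to address; a minimal sketch of the missing port-defaulting (hypothetical helper, not part of this commit) might look like:

```go
package main

import (
	"fmt"
	"net/url"
)

// withDefaultPort is a hypothetical helper for the BUGS entry above:
// if the host string carries no port, assume one instead of falling back to 80.
func withDefaultPort(host string, port int) string {
	u, err := url.Parse(host)
	if err != nil || u.Host == "" {
		return host // leave malformed input untouched
	}
	if u.Port() == "" {
		u.Host = fmt.Sprintf("%s:%d", u.Hostname(), port)
	}
	return u.String()
}

func main() {
	fmt.Println(withDefaultPort("http://localhost", 9200))      // http://localhost:9200
	fmt.Println(withDefaultPort("http://localhost:9300", 9200)) // unchanged
}
```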
83 changes: 46 additions & 37 deletions main.go
@@ -40,24 +40,25 @@ type ClusterHealth struct {

type Config struct {
FlushLock sync.Mutex
-DocChan chan Document
+DocChan chan map[string]interface{}
ErrChan chan error
Uid string // es scroll uid

// config options
SrcEs string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"`
DstEs string `short:"d" long:"dest" description:"destination elasticsearch instance" required:"true"`
-DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request. If 0 size will not be sent to es" default:"100"`
+DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"100"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"`
Destructive bool `short:"f" long:"force" description:"delete destination index before copying" default:"false"`
ShardsCount int `long:"shards" description:"set a number of shards on newly created indexes"`
DocsOnly bool `long:"docs-only" description:"load documents only, do not try to recreate indexes" default:"false"`
CreateIndexesOnly bool `long:"index-only" description:"only create indexes, do not load documents" default:"false"`
EnableReplication bool `long:"replicate" description:"enable replication while indexing into the new indexes" default:"false"`
IndexNames string `short:"i" long:"indexes" description:"list of indexes to copy, comma separated" default:"_all"`
-CopyAllIndexes bool `short:"a" long:"all" description:"copy all indexes, if false indexes starting with . and _ are not copied" default:"false"`
+CopyAllIndexes bool `short:"a" long:"all" description:"copy indexes starting with . and _" default:"false"`
Workers int `short:"w" long:"workers" description:"concurrency" default:"1"`
CopySettings bool `long:"settings" description:"copy sharding settings from source" default:"true"`
+WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay" default:"false"`
}
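
The struct tags above follow the jessevdk/go-flags convention (short/long/description/default). Assuming that is the library in play, a minimal self-contained sketch of how such a struct gets parsed — the two fields are copied from Config above, the rest is hypothetical scaffolding:

```go
package main

import (
	"fmt"
	"os"

	flags "github.com/jessevdk/go-flags"
)

// Two fields borrowed from Config above; the tags drive the CLI.
type Options struct {
	SrcEs string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"`
	DstEs string `short:"d" long:"dest" description:"destination elasticsearch instance" required:"true"`
}

func main() {
	var opts Options
	// Parse reads os.Args, fills opts, and prints usage on error.
	if _, err := flags.Parse(&opts); err != nil {
		os.Exit(1)
	}
	fmt.Println("copying", opts.SrcEs, "->", opts.DstEs)
}
```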

func main() {
@@ -77,7 +78,7 @@ func main() {
}

// enough of a buffer to hold all the search results across all workers
-c.DocChan = make(chan Document, c.DocBufferCount*c.Workers)
+c.DocChan = make(chan map[string]interface{}, c.DocBufferCount*c.Workers)

// get all indexes from source
idxs := Indexes{}
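
The channel sizing above (count × workers) is what lets one scroller feed many workers without stalling. A toy version of that hand-off, with hypothetical names, just to show the pattern:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	batch, workers := 100, 4
	// Buffer one full scroll page per worker, as above.
	docs := make(chan map[string]interface{}, batch*workers)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for doc := range docs {
				_ = doc["_id"] // a real worker would bulk-index here
			}
		}()
	}

	for i := 0; i < batch; i++ {
		docs <- map[string]interface{}{"_id": fmt.Sprint(i)}
	}
	close(docs) // closing the channel is what tells workers to flush and exit
	wg.Wait()
}
```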
@@ -125,30 +126,22 @@ func main() {
return
}

-// wait for cluster state to be okay in case we are copying many indexes or
-// shards
+// wait for cluster state to be okay before dumping
timer := time.NewTimer(time.Second * 3)
-var srcReady, dstReady bool
for {
-if health := ClusterStatus(c.SrcEs); health.Status == "red" {
-fmt.Printf("%s is %s %s, delaying start\n", health.Name, c.SrcEs, health.Status)
-srcReady = false
-} else {
-srcReady = true
-}
-
-if health := ClusterStatus(c.DstEs); health.Status == "red" {
-fmt.Printf("%s on %s is %s, delaying start\n", health.Name, c.DstEs, health.Status)
-dstReady = false
-} else {
-dstReady = true
-}
-
-if !srcReady || !dstReady {
-<-timer.C
-} else {
-break
-}
+if status, ready := c.ClusterReady(c.SrcEs); !ready {
+fmt.Printf("%s at %s is %s, delaying dump\n", status.Name, c.SrcEs, status.Status)
+<-timer.C
+continue
+}
+
+if status, ready := c.ClusterReady(c.DstEs); !ready {
+fmt.Printf("%s at %s is %s, delaying dump\n", status.Name, c.DstEs, status.Status)
+<-timer.C
+continue
+}
+
+timer.Stop()
+break
}
fmt.Println("starting dump..")

@@ -195,7 +188,13 @@ func (c *Config) NewWorker(docCount *int, bar *pb.ProgressBar, wg *sync.WaitGrou

for {
var err error
-doc, open := <-c.DocChan
+docI, open := <-c.DocChan
+doc := Document{
+Index: docI["_index"].(string),
+Type: docI["_type"].(string),
+source: docI["_source"].(map[string]interface{}),
+Id: docI["_id"].(string),
+}

// if channel is closed flush and gtfo
if !open {
@@ -219,8 +218,8 @@
c.ErrChan <- err
}

-// if we approach the 100mb (95mb) limit, flush to es and reset mainBuf
-if mainBuf.Len()+docBuf.Len() > 95000000 {
+// if we approach the 100mb es limit, flush to es and reset mainBuf
+if mainBuf.Len()+docBuf.Len() > 100000000 {
c.BulkPost(&mainBuf)
}
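
For context on the threshold change above: elasticsearch rejects HTTP bodies larger than http.max_content_length, which defaults to 100mb, so the worker flushes before the combined buffer crosses it. A standalone sketch of the idea, assuming a 95mb soft cap (the old value, which left headroom the exact-100mb check gives up):

```go
package main

import (
	"bytes"
	"fmt"
)

// softCap stays under elasticsearch's default 100mb http.max_content_length.
const softCap = 95000000

// appendOrFlush posts the accumulated bulk body before it grows past the cap.
func appendOrFlush(mainBuf, docBuf *bytes.Buffer, post func(*bytes.Buffer)) {
	if mainBuf.Len()+docBuf.Len() > softCap {
		post(mainBuf)
	}
	mainBuf.Write(docBuf.Bytes())
	docBuf.Reset()
}

func main() {
	var mainBuf, docBuf bytes.Buffer
	docBuf.WriteString(`{"index":{}}` + "\n" + `{"field":"value"}` + "\n")
	appendOrFlush(&mainBuf, &docBuf, func(b *bytes.Buffer) {
		fmt.Println("flushing", b.Len(), "bytes")
		b.Reset()
	})
	fmt.Println("buffered", mainBuf.Len(), "bytes")
}
```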

@@ -384,7 +383,7 @@ func (c *Config) CopyShardingSettings(idxs *Indexes) (err error) {
// try the new style syntax first, which has an index object
shards = settings.(map[string]interface{})["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_shards"].(string)
} else {
-// if not, could be running from an old es intace, try the old style index.number_of_shards
+// if not, could be running from old es, try the old style index.number_of_shards
shards = settings.(map[string]interface{})["settings"].(map[string]interface{})["index.number_of_shards"].(string)
}
index.(map[string]interface{})["settings"].(map[string]interface{})["index"] = map[string]interface{}{
@@ -441,8 +440,6 @@ func (c *Config) NewScroll() (scroll *Scroll, err error) {
scroll = &Scroll{}
err = dec.Decode(scroll)

-fmt.Println(scroll.ScrollId)

return
}
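
NewScroll above (shown truncated) kicks off the dump. For readers unfamiliar with the era-appropriate API, a hypothetical sketch of opening a scan/scroll against ES 0.90/1.x — the URL shape and field names are assumed from the public API, not taken from this diff:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// openScroll asks ES for a scroll id; callers then page with /_search/scroll.
func openScroll(host, scrollTime string, size int) (string, error) {
	url := fmt.Sprintf("%s/_all/_search?search_type=scan&scroll=%s&size=%d",
		host, scrollTime, size)
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var s struct {
		ScrollId string `json:"_scroll_id"`
	}
	err = json.NewDecoder(resp.Body).Decode(&s)
	return s.ScrollId, err
}

func main() {
	id, err := openScroll("http://localhost:9200", "1m", 100)
	fmt.Println(id, err)
}
```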

@@ -466,7 +463,7 @@ func (s *Scroll) Next(c *Config) (done bool) {
// XXX this might be bad, but assume we are done
if resp.StatusCode != 200 {
b, _ := ioutil.ReadAll(resp.Body)
c.ErrChan <- fmt.Errorf("bad scroll response: %s", string(b))
c.ErrChan <- fmt.Errorf("scroll response: %s", string(b))
// flush and quit
return true
}
@@ -498,13 +495,7 @@

// write all the docs into a channel
for _, docI := range docs {
-doc := docI.(map[string]interface{})
-c.DocChan <- Document{
-Index: doc["_index"].(string),
-Type: doc["_type"].(string),
-source: doc["_source"].(map[string]interface{}),
-Id: doc["_id"].(string),
-}
+c.DocChan <- docI.(map[string]interface{})
}

return
@@ -532,6 +523,24 @@ func (c *Config) BulkPost(data *bytes.Buffer) {
}
}

+func (c *Config) ClusterReady(host string) (*ClusterHealth, bool) {
+
+health := ClusterStatus(host)
+if health.Status == "red" {
+return health, false
+}
+
+if c.WaitForGreen == false && health.Status == "yellow" {
+return health, true
+}
+
+if health.Status == "green" {
+return health, true
+}
+
+return health, false
+}

func ClusterStatus(host string) *ClusterHealth {

resp, err := http.Get(fmt.Sprintf("%s/_cluster/health", host))
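
Both ClusterReady above and ClusterStatus (truncated here) lean on the ClusterHealth type whose definition is cut off at the top of this diff. Judging by the Name/Status uses, a minimal reconstruction — field mapping assumed from the /_cluster/health payload, not confirmed by the diff:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Assumed shape: /_cluster/health returns cluster_name and status.
type ClusterHealth struct {
	Name   string `json:"cluster_name"`
	Status string `json:"status"` // "green", "yellow" or "red"
}

func main() {
	// Sample payload in the shape the health endpoint returns.
	body := []byte(`{"cluster_name":"elasticsearch","status":"yellow"}`)
	var h ClusterHealth
	if err := json.Unmarshal(body, &h); err != nil {
		panic(err)
	}
	fmt.Println(h.Name, "is", h.Status)
}
```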
2 changes: 2 additions & 0 deletions release.sh
@@ -74,3 +74,5 @@ unset -f go-alias

# crosscompile
go-build-all
+# don't ship arm builds
+rm -f elasticsearch-dump*arm*
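
For context, go-build-all is defined earlier in the script (not shown in this diff); Go cross-compilation is driven by the GOOS/GOARCH environment variables, so each target presumably amounts to something like the following (output name assumed from the rm pattern above):

```
GOOS=linux GOARCH=arm go build -o elasticsearch-dump-linux-arm
```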
