add sync function for incremental update (#84)
* fishjam#1

add go.mod and fix build errors (infini.sh/framework, fasthttp, util)

* run the following command to format the code:
  gofmt -l -w .

* fishjam#1

1. add config options SortField, TruncateOutFile and SkipFields, so an ES index can be dumped to a local file and compared
2. add config option Sync, which scrolls both the source and dest index, compares the records, and only indexes/updates/deletes the changed ones
3. refactor code:
  - add some functions in esapi: ClusterVersion() and DeleteScroll(); add ParseEsApi
  - move bulk.go to migrator.go, and add some functions
  - refactor all HTTP methods (Get/Post/DoRequest) into a single Request method, with proxy support (see the sketch after this message)
  - delete some commented-out and unused code

* fix an error when the source index does not exist, and adjust logging.

* change model name

* change log

* update README.md and change the description of some configurations
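
For a rough picture of the "single Request method" mentioned above, here is a minimal sketch using net/http (esm itself is built on fasthttp, so the real signature and helpers differ; the names below are illustrative):

```
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

// request is an illustrative stand-in for the unified Request method:
// one entry point for GET/POST/etc., optional basic auth, optional proxy.
func request(method, rawURL, body, proxy, auth string) (string, error) {
	transport := &http.Transport{}
	if proxy != "" { // honors --source_proxy / --dest_proxy style settings
		proxyURL, err := url.Parse(proxy)
		if err != nil {
			return "", err
		}
		transport.Proxy = http.ProxyURL(proxyURL)
	}
	client := &http.Client{Transport: transport}

	req, err := http.NewRequest(method, rawURL, strings.NewReader(body))
	if err != nil {
		return "", err
	}
	if parts := strings.SplitN(auth, ":", 2); len(parts) == 2 { // "user:pass"
		req.SetBasicAuth(parts[0], parts[1])
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	return string(data), err
}

func main() {
	body, err := request(http.MethodGet, "http://localhost:9200", "", "", "")
	fmt.Println(body, err)
}
```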
fishjam committed Jan 5, 2024
1 parent a6a88ae commit 628122f
Showing 18 changed files with 1,359 additions and 1,016 deletions.
19 changes: 18 additions & 1 deletion README.md
@@ -23,6 +23,7 @@ Links:
* Support specify which _source fields to return from source
* Support specify query string query to filter the data source
* Support rename source fields while do bulk indexing
* Support incremental updates (add/update/delete changed records) with `--sync`. Note: this uses a different implementation that handles only the ***changed*** records, so it is not as fast as the plain copy
* Load generating with

## ESM is fast!
@@ -69,6 +70,11 @@ copy index `src_index` from `192.168.1.x` to `192.168.1.y:9200` and save with `d
./bin/esm -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index -w=5 -b=100
```

use the sync feature to incrementally update index `src_index` from `192.168.1.x` to `192.168.1.y:9200`
```
./bin/esm --sync -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index
```
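
Under the hood, `--sync` scrolls both the source and target index sorted by the same field (`--sort`, default `_id`) and reconciles the two streams. A minimal Go sketch of that reconciliation, assuming each side arrives as (id, hash) pairs already sorted by id — the types and the hash-based change check here are illustrative assumptions, not esm's actual code:

```
package main

import "fmt"

// doc is an illustrative stand-in for a scrolled record: its _id plus a
// hash (or version) of its _source used to detect changes.
type doc struct {
	ID   string
	Hash string
}

type action struct {
	Op string // "index", "update" or "delete"
	ID string
}

// diffSorted merge-joins two streams that are both sorted by ID and emits
// the minimal set of actions to make dst match src.
func diffSorted(src, dst []doc) []action {
	var out []action
	i, j := 0, 0
	for i < len(src) && j < len(dst) {
		switch {
		case src[i].ID < dst[j].ID: // only in source -> index it
			out = append(out, action{"index", src[i].ID})
			i++
		case src[i].ID > dst[j].ID: // only in dest -> delete it
			out = append(out, action{"delete", dst[j].ID})
			j++
		default: // in both -> update only if changed
			if src[i].Hash != dst[j].Hash {
				out = append(out, action{"update", src[i].ID})
			}
			i++
			j++
		}
	}
	for ; i < len(src); i++ { // leftovers exist only in source
		out = append(out, action{"index", src[i].ID})
	}
	for ; j < len(dst); j++ { // leftovers exist only in dest
		out = append(out, action{"delete", dst[j].ID})
	}
	return out
}

func main() {
	src := []doc{{"1", "a"}, {"2", "b"}, {"4", "d"}}
	dst := []doc{{"1", "a"}, {"2", "x"}, {"3", "c"}}
	fmt.Println(diffSorted(src, dst)) // [{update 2} {delete 3} {index 4}]
}
```

Because both indices must be scrolled and compared record by record, this mode touches only the changed documents but cannot match the throughput of the plain copy.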

support Basic-Auth
```
./bin/esm -s http://localhost:9200 -x "src_index" -y "dest_index" -d http://localhost:9201 -n admin:111111
@@ -91,6 +97,13 @@ dump elasticsearch documents into local file
./bin/esm -s http://localhost:9200 -x "src_index" -m admin:111111 -c 5000 -q=query:mixer --refresh -o=dump.bin
```

dump the source and target index to local files and compare them, to quickly find the differences
```
./bin/esm --sort=_id -s http://localhost:9200 -x "src_index" --truncate_output --skip=_index -o=src.json
./bin/esm --sort=_id -s http://localhost:9200 -x "dst_index" --truncate_output --skip=_index -o=dst.json
diff -W 200 -ry --suppress-common-lines src.json dst.json
```

loading data from dump files, bulk insert to another es instance
```
./bin/esm -d http://localhost:9200 -y "dest_index" -n admin:111111 -c 5000 -b 5 --refresh -i=dump.bin
@@ -172,6 +185,7 @@ Usage:
Application Options:
-s, --source= source elasticsearch instance, ie: http://localhost:9200
-q, --query= query against source elasticsearch instance, filter data before migrate, ie: name:medcl
--sort= sort field when scroll, ie: _id (default: _id)
-d, --dest= destination elasticsearch instance, ie: http://localhost:9201
-m, --source_auth= basic auth of source elasticsearch instance, ie: user:pass
-n, --dest_auth= basic auth of target elasticsearch instance, ie: user:pass
@@ -192,12 +206,15 @@ Application Options:
--green wait for both hosts cluster status to be green before dump. otherwise yellow is okay
-v, --log= setting log level,options:trace,debug,info,warn,error (INFO)
-o, --output_file= output documents of source index into local file
--truncate_output= truncate before dump to output file
-i, --input_file= indexing from local dump file
--input_file_type= the data type of input file, options: dump, json_line, json_array, log_line (dump)
--source_proxy= set proxy to source http connections, ie: http://127.0.0.1:8080
--dest_proxy= set proxy to target http connections, ie: http://127.0.0.1:8080
--refresh refresh after migration finished
--fields= filter source fields, comma separated, ie: col1,col2,col3,...
--sync= sync will use scroll for both source and target index, compare the data and sync(index/update/delete)
--fields= filter source fields(white list), comma separated, ie: col1,col2,col3,...
--skip= skip source fields(black list), comma separated, ie: col1,col2,col3,...
--rename= rename source fields, comma separated, ie: _type:type, name:myname
-l, --logstash_endpoint= target logstash tcp endpoint, ie: 127.0.0.1:5055
--secured_logstash_endpoint target logstash tcp endpoint was secured by TLS
20 changes: 10 additions & 10 deletions buffer.go
@@ -17,8 +17,8 @@ limitations under the License.
package main

import (
"io"
"errors"
"io"
)

//https://golangtc.com/t/5a49e2104ce40d740bbbc515
@@ -39,40 +39,40 @@ func (b *buffer) Len() int {
return b.end - b.start
}

//move the useful bytes to the front
// move the useful bytes to the front
func (b *buffer) grow() {
if b.start == 0 {
return
}
copy(b.buf, b.buf[b.start:b.end])
b.end -= b.start
b.start = 0;
b.start = 0
}

//read data from the reader; this blocks if the reader blocks
// read data from the reader; this blocks if the reader blocks
func (b *buffer) readFromReader() (int, error) {
b.grow()
n, err := b.reader.Read(b.buf[b.end:])
if (err != nil) {
if err != nil {
return n, err
}
b.end += n
return n, nil
}

//return n bytes without shifting the buffer
// return n bytes without shifting the buffer
func (b *buffer) seek(n int) ([]byte, error) {
if b.end-b.start >= n {
buf := b.buf[b.start:b.start+n]
buf := b.buf[b.start : b.start+n]
return buf, nil
}
return nil, errors.New("not enough")
}

//discard offset bytes, then read n bytes
func (b *buffer) read(offset, n int) ([]byte) {
// discard offset bytes, then read n bytes
func (b *buffer) read(offset, n int) []byte {
b.start += offset
buf := b.buf[b.start:b.start+n]
buf := b.buf[b.start : b.start+n]
b.start += n
return buf
}
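
The pattern this buffer enables is peeking at a fixed-size length header with seek() (no consumption) and then consuming the whole frame with read() once enough bytes have arrived, refilling from the reader in between. A trimmed-down, self-contained sketch of that usage — peekBuffer here is a stand-in, not the type above:

```
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

type peekBuffer struct {
	buf        []byte
	start, end int
	reader     io.Reader
}

func (b *peekBuffer) fill() error {
	copy(b.buf, b.buf[b.start:b.end]) // shift live bytes to the front
	b.end -= b.start
	b.start = 0
	n, err := b.reader.Read(b.buf[b.end:])
	b.end += n
	return err
}

func (b *peekBuffer) seek(n int) ([]byte, bool) { // peek without consuming
	if b.end-b.start >= n {
		return b.buf[b.start : b.start+n], true
	}
	return nil, false
}

func (b *peekBuffer) read(offset, n int) []byte { // consume offset+n bytes
	b.start += offset
	out := b.buf[b.start : b.start+n]
	b.start += n
	return out
}

func main() {
	// frame = 4-byte big-endian length prefix + payload
	var frame bytes.Buffer
	payload := []byte("hello")
	binary.Write(&frame, binary.BigEndian, uint32(len(payload)))
	frame.Write(payload)

	b := &peekBuffer{buf: make([]byte, 64), reader: &frame}
	for {
		if header, ok := b.seek(4); ok {
			size := int(binary.BigEndian.Uint32(header))
			if _, ok := b.seek(4 + size); ok {
				fmt.Printf("%s\n", b.read(4, size)) // hello
				return
			}
		}
		if err := b.fill(); err != nil {
			return
		}
	}
}
```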
183 changes: 0 additions & 183 deletions bulk.go

This file was deleted.

10 changes: 7 additions & 3 deletions domain.go
@@ -77,7 +77,7 @@ type ClusterHealth struct {
Status string `json:"status,omitempty"`
}

//{"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]}
// {"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]}
type BulkResponse struct {
Took int `json:"took,omitempty"`
Errors bool `json:"errors,omitempty"`
@@ -107,11 +107,12 @@ type Config struct {
// config options
SourceEs string `short:"s" long:"source" description:"source elasticsearch instance, ie: http://localhost:9200"`
Query string `short:"q" long:"query" description:"query against source elasticsearch instance, filter data before migrate, ie: name:medcl"`
SortField string `long:"sort" description:"sort field when scroll, ie: _id" default:"_id"`
TargetEs string `short:"d" long:"dest" description:"destination elasticsearch instance, ie: http://localhost:9201"`
SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"`
TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"10000"`
BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
Workers int `short:"w" long:"workers" description:"concurrency number for bulk workers" default:"1"`
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"10m"`
@@ -127,12 +128,15 @@ WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"`
WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"`
LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"`
DumpOutFile string `short:"o" long:"output_file" description:"output documents of source index into local file" `
TruncateOutFile bool `long:"truncate_output" description:"truncate before dump to output file" `
DumpInputFile string `short:"i" long:"input_file" description:"indexing from local dump file" `
InputFileType string `long:"input_file_type" description:"the data type of input file, options: dump, json_line, json_array, log_line" default:"dump" `
SourceProxy string `long:"source_proxy" description:"set proxy to source http connections, ie: http://127.0.0.1:8080"`
TargetProxy string `long:"dest_proxy" description:"set proxy to target http connections, ie: http://127.0.0.1:8080"`
Refresh bool `long:"refresh" description:"refresh after migration finished"`
Fields string `long:"fields" description:"filter source fields, comma separated, ie: col1,col2,col3,..." `
Sync bool `long:"sync" description:"sync will use scroll for both source and target index, compare the data and sync(index/update/delete)"`
Fields string `long:"fields" description:"filter source fields(white list), comma separated, ie: col1,col2,col3,..." `
SkipFields string `long:"skip" description:"skip source fields(black list), comma separated, ie: col1,col2,col3,..." `
RenameFields string `long:"rename" description:"rename source fields, comma separated, ie: _type:type, name:myname" `
LogstashEndpoint string `short:"l" long:"logstash_endpoint" description:"target logstash tcp endpoint, ie: 127.0.0.1:5055" `
LogstashSecEndpoint bool `long:"secured_logstash_endpoint" description:"target logstash tcp endpoint was secured by TLS" `
21 changes: 12 additions & 9 deletions esapi.go
@@ -18,16 +18,19 @@ package main

import "bytes"

type ESAPI interface{
type ESAPI interface {
ClusterHealth() *ClusterHealth
Bulk(data *bytes.Buffer)
ClusterVersion() *ClusterVersion
Bulk(data *bytes.Buffer) error
GetIndexSettings(indexNames string) (*Indexes, error)
DeleteIndex(name string) (error)
CreateIndex(name string,settings map[string]interface{}) (error)
GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error)
UpdateIndexSettings(indexName string,settings map[string]interface{})(error)
UpdateIndexMapping(indexName string,mappings map[string]interface{})(error)
NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(interface{}, error)
NextScroll(scrollTime string,scrollId string)(interface{},error)
DeleteIndex(name string) error
CreateIndex(name string, settings map[string]interface{}) error
GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error)
UpdateIndexSettings(indexName string, settings map[string]interface{}) error
UpdateIndexMapping(indexName string, mappings map[string]interface{}) error
NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string,
slicedId int, maxSlicedCount int, fields string) (ScrollAPI, error)
NextScroll(scrollTime string, scrollId string) (ScrollAPI, error)
DeleteScroll(scrollId string) error
Refresh(name string) (err error)
}
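
Taken together, a caller drives the reworked interface roughly like this. Only the ESAPI method signatures above come from the diff; the ScrollAPI accessors (GetScrollId, GetDocs) and the trimmed-down scrollSource interface are assumptions for illustration:

```
package main

import (
	"bytes"
	"log"
)

// ScrollAPI accessors assumed for this sketch; the real interface lives in esm.
type ScrollAPI interface {
	GetScrollId() string
	GetDocs() []interface{}
}

// scrollSource is the subset of ESAPI this sketch drives.
type scrollSource interface {
	NewScroll(indexNames string, scrollTime string, docBufferCount int, query string,
		sort string, slicedId int, maxSlicedCount int, fields string) (ScrollAPI, error)
	NextScroll(scrollTime string, scrollId string) (ScrollAPI, error)
	DeleteScroll(scrollId string) error
	Bulk(data *bytes.Buffer) error
}

// copyIndex walks one scroll to completion and bulk-writes each page.
func copyIndex(src, dst scrollSource, index, scrollTime string) error {
	scroll, err := src.NewScroll(index, scrollTime, 10000, "", "_id", 0, 1, "")
	if err != nil {
		return err
	}
	scrollId := scroll.GetScrollId()
	defer func() { src.DeleteScroll(scrollId) }() // DeleteScroll is new in this commit

	for len(scroll.GetDocs()) > 0 {
		var buf bytes.Buffer
		// ... serialize scroll.GetDocs() into a bulk request body ...
		if err := dst.Bulk(&buf); err != nil { // Bulk now reports failures
			return err
		}
		if scroll, err = src.NextScroll(scrollTime, scrollId); err != nil {
			return err
		}
		scrollId = scroll.GetScrollId()
	}
	return nil
}

func main() { log.Println("sketch only; see migrator.go for the real loop") }
```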
