Skip to content

Commit

Permalink
Merge pull request #85 from 0chain/sprint-july-4
Browse files Browse the repository at this point in the history
Sprint-july-4
  • Loading branch information
Kishan-Dhakan committed Aug 16, 2023
2 parents 0dd0b6b + 6b320df commit 7051997
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 28 deletions.
19 changes: 19 additions & 0 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (
retryCount int
deleteSource bool
workDir string
chunkSize int64
chunkNumber int
batchSize int
)

// migrateCmd is the migrateFromS3 sub command to migrate whole objects from some buckets.
Expand Down Expand Up @@ -65,6 +68,9 @@ func init() {
migrateCmd.Flags().IntVar(&retryCount, "retry", 3, "retry count for upload to dstorage")
migrateCmd.Flags().StringVar(&newerThanStr, "newer-than", "", "eg; 7d10h --> migrate objects that is newer than 7 days 10 hours")
migrateCmd.Flags().StringVar(&olderThanStr, "older-than", "", "eg; 7d10h --> migrate objects that is older than 7 days 10 hours")
migrateCmd.Flags().Int64Var(&chunkSize, "chunk-size", 50*1024*1024, "chunk size in bytes")
migrateCmd.Flags().IntVar(&chunkNumber, "chunk-number", 1000, "number of chunks to upload")
migrateCmd.Flags().IntVar(&batchSize, "batch-size", 30, "number of files to upload in a batch")

}

Expand Down Expand Up @@ -196,6 +202,16 @@ var migrateCmd = &cobra.Command{
return err
}

if chunkNumber == 0 {
chunkNumber = 500
}
if chunkSize == 0 {
chunkSize = 50 * 1024 * 1024
}
if batchSize == 0 {
batchSize = 30
}

mConfig := migration.MigrationConfig{
AllocationID: allocationId,
Region: region,
Expand All @@ -213,6 +229,9 @@ var migrateCmd = &cobra.Command{
StartAfter: startAfter,
StateFilePath: stateFilePath,
WorkDir: workDir,
ChunkSize: chunkSize,
ChunkNumber: chunkNumber,
BatchSize: batchSize,
}

if err := migration.InitMigration(&mConfig); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions dstorage/dstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type DStoreI interface {
GetAvailableSpace() int64
GetTotalSpace() int64
UpdateAllocationDetails() error
GetChunkWriteSize() int64
}

type DStorageService struct {
Expand All @@ -53,12 +54,12 @@ type DStorageService struct {
//then duplicate path should be /path/to/remote/file{duplicateSuffix}.txt
duplicateSuffix string
workDir string
chunkNumer int
}

const (
GetRefRetryWaitTime = 500 * time.Millisecond
GetRefRetryCount = 2
ChunkNumber = 1000
)

func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string) (*sdk.ORef, error) {
Expand Down Expand Up @@ -110,7 +111,7 @@ func (d *DStorageService) Upload(ctx context.Context, remotePath string, r io.Re

options := []sdk.ChunkedUploadOption{
sdk.WithEncrypt(d.encrypt),
sdk.WithChunkNumber(ChunkNumber),
sdk.WithChunkNumber(d.chunkNumer),
}

op := sdk.OperationRequest{
Expand Down Expand Up @@ -171,7 +172,7 @@ func (d *DStorageService) GetTotalSpace() int64 {
return d.allocation.Size
}

func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string, encrypt bool) (*DStorageService, error) {
func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string, encrypt bool, chunkNumber int) (*DStorageService, error) {
allocation, err := sdk.GetAllocation(allocationID)

if err != nil {
Expand All @@ -196,5 +197,10 @@ func GetDStorageService(allocationID, migrateTo, duplicateSuffix, workDir string
migrateTo: migrateTo,
duplicateSuffix: duplicateSuffix,
workDir: workDir,
chunkNumer: chunkNumber,
}, nil
}

// GetChunkWriteSize reports the chunk size, in bytes, that callers should use
// when feeding data to this service. The value is whatever the allocation
// returns from GetChunkReadSize for the service's encryption setting.
// NOTE(review): the "write" size is sourced from the allocation's "read" size
// accessor — presumably both are the same quantity; confirm against the SDK.
func (d *DStorageService) GetChunkWriteSize() int64 {
	size := d.allocation.GetChunkReadSize(d.encrypt)
	return size
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/0chain/errors v1.0.3
github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e
github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114
github.com/aws/aws-sdk-go-v2 v1.17.1
github.com/aws/aws-sdk-go-v2/config v1.17.10
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.37
Expand Down
6 changes: 3 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565 h1:z+DtCR8mBsjPnEs
github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565/go.mod h1:UyDC8Qyl5z9lGkCnf9RHJPMektnFX8XtCJZHXCCVj8E=
github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM=
github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc=
github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e h1:6c8JM+0uQhnYcVgxHB+PCOgDyIbk4WY7BXsxmvL8cQM=
github.com/0chain/gosdk v1.8.17-0.20230726135820-9629181f245e/go.mod h1:3NKNYzmnMIYqZwwwOgZwMmTW1DT1ZUAmKyVPmYQOiT4=
github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114 h1:fgaUQSUpAqhjhD3ONmiY+3yWn56qHADEd0TCoRcDSZ0=
github.com/0chain/gosdk v1.8.17-0.20230809212922-e71a28baf114/go.mod h1:3NKNYzmnMIYqZwwwOgZwMmTW1DT1ZUAmKyVPmYQOiT4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Luzifer/go-openssl/v3 v3.1.0 h1:QqKqo6kYXGGUsvtUoCpRZm8lHw+jDfhbzr36gVj+/gw=
Expand Down Expand Up @@ -728,4 +728,4 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
129 changes: 113 additions & 16 deletions migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const (
)

const (
batchSize = 50
maxBatchSize = 1024 * 1024 * 1024 // 1GB
)

Expand Down Expand Up @@ -75,6 +74,8 @@ type Migration struct {
workDir string
deleteSource bool
bucket string
chunkSize int64
batchSize int
}

type MigrationOperation struct {
Expand Down Expand Up @@ -120,6 +121,7 @@ func InitMigration(mConfig *MigrationConfig) error {
mConfig.DuplicateSuffix,
mConfig.WorkDir,
mConfig.Encrypt,
mConfig.ChunkNumber,
)
if err != nil {
zlogger.Logger.Error(err)
Expand Down Expand Up @@ -160,6 +162,8 @@ func InitMigration(mConfig *MigrationConfig) error {
workDir: mConfig.WorkDir,
bucket: mConfig.Bucket,
fs: util.Fs,
chunkSize: mConfig.ChunkSize,
batchSize: mConfig.BatchSize,
}

rootContext, rootContextCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -232,7 +236,7 @@ func StartMigration() error {

migrationWorker := NewMigrationWorker(migration.workDir)
go migration.DownloadWorker(rootContext, migrationWorker)
go migration.UploadWorker(rootContext, migrationWorker)
// go migration.UploadWorker(rootContext, migrationWorker)
migration.UpdateStateFile(migrationWorker)
err := migrationWorker.GetMigrationError()
if err != nil {
Expand All @@ -247,21 +251,36 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
defer migrator.CloseDownloadQueue()
objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext)
wg := &sync.WaitGroup{}
ops := make([]MigrationOperation, 0, m.batchSize)
var opLock sync.Mutex
currentSize := 0
opCtx, opCtxCancel := context.WithCancel(ctx)
for obj := range objCh {
zlogger.Logger.Info("Downloading object: ", obj.Key)
migrator.PauseDownload()
if migrator.IsMigrationError() {
opCtxCancel()
return
}
wg.Add(1)

if currentSize >= m.batchSize {
processOps := ops
// Here scope of improvement
wg.Wait()
m.processMultiOperation(opCtx, processOps, migrator)
opCtxCancel()
opCtx, opCtxCancel = context.WithCancel(ctx)
ops = nil
currentSize = 0
}
currentSize++
downloadObjMeta := &DownloadObjectMeta{
ObjectKey: obj.Key,
Size: obj.Size,
DoneChan: make(chan struct{}, 1),
ErrChan: make(chan error, 1),
mimeType: obj.ContentType,
}

wg.Add(1)
go func() {
defer wg.Done()
err := checkIsFileExist(ctx, downloadObjMeta)
Expand All @@ -276,22 +295,31 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
migrator.DownloadDone(downloadObjMeta, "", nil)
return
}
migrator.DownloadStart(downloadObjMeta)
zlogger.Logger.Info("download start", downloadObjMeta.ObjectKey, downloadObjMeta.Size)
downloadPath, err := m.awsStore.DownloadToFile(ctx, downloadObjMeta.ObjectKey)
migrator.DownloadDone(downloadObjMeta, downloadPath, err)
migrator.SetMigrationError(err)
zlogger.Logger.Info("download done", downloadObjMeta.ObjectKey, downloadObjMeta.Size, err)
dataChan := make(chan *util.DataChan, 200)
streamWriter := util.NewStreamWriter(dataChan)
go m.processChunkDownload(opCtx, streamWriter, migrator, downloadObjMeta)
// Always return nil as error
op, _ := processOperationForMemory(ctx, downloadObjMeta, streamWriter)
opLock.Lock()
ops = append(ops, op)
opLock.Unlock()
}()
time.Sleep(1 * time.Second)
}
if currentSize > 0 {
wg.Wait()
processOps := ops
// Here scope of improvement
m.processMultiOperation(ctx, processOps, migrator)
ops = nil
}
opCtxCancel()
wg.Wait()
err := <-errCh
if err != nil {
zlogger.Logger.Error(err)
migrator.SetMigrationError(err)
}

migrator.CloseUploadQueue()
}

func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) {
Expand All @@ -301,7 +329,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)

downloadQueue := migrator.GetDownloadQueue()
wg := &sync.WaitGroup{}
ops := make([]MigrationOperation, 0, batchSize)
ops := make([]MigrationOperation, 0, m.batchSize)
totalSize := int64(0)
for d := range downloadQueue {
zlogger.Logger.Info("Uploading object: ", d.ObjectKey)
Expand Down Expand Up @@ -337,7 +365,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)
op.uploadObj = uploadObj
ops = append(ops, op)
totalSize += downloadObj.Size
if len(ops) >= batchSize || totalSize >= maxBatchSize {
if len(ops) >= m.batchSize || totalSize >= maxBatchSize {
processOps := ops
ops = nil
wg.Add(1)
Expand Down Expand Up @@ -427,6 +455,35 @@ func processOperation(ctx context.Context, downloadObj *DownloadObjectMeta) (Mig
return op, nil
}

// processOperationForMemory builds the MigrationOperation for an object whose
// content is supplied through r instead of a file on local disk.
//
// The kind of request depends on the object's state:
//   - not yet present at the destination: a plain upload;
//   - already present and the skip strategy is Replace: a replace;
//   - already present and the skip strategy is Duplicate: a duplicate;
//   - already present with any other strategy: the request is left at its
//     zero value.
//
// The returned error is always nil.
func processOperationForMemory(ctx context.Context, downloadObj *DownloadObjectMeta, r io.Reader) (MigrationOperation, error) {
	var (
		request     sdk.OperationRequest
		destination = getRemotePath(downloadObj.ObjectKey)
		contentType = downloadObj.mimeType
		sizeText    = strconv.FormatInt(downloadObj.Size, 10)
	)

	switch {
	case !downloadObj.IsFileAlreadyExist:
		zlogger.Logger.Info("Uploading object: " + downloadObj.ObjectKey + " size " + sizeText)
		request = migration.zStore.Upload(ctx, destination, r, downloadObj.Size, contentType, false)
	case migration.skip == Replace:
		zlogger.Logger.Info("Replacing object" + downloadObj.ObjectKey + " size " + sizeText)
		request = migration.zStore.Replace(ctx, destination, r, downloadObj.Size, contentType)
	case migration.skip == Duplicate:
		zlogger.Logger.Info("Duplicating object" + downloadObj.ObjectKey + " size " + sizeText)
		request = migration.zStore.Duplicate(ctx, destination, r, downloadObj.Size, contentType)
	}

	// The upload metadata mirrors the download metadata and gets its own
	// buffered completion/error channels.
	result := MigrationOperation{
		Operation: request,
		uploadObj: &UploadObjectMeta{
			ObjectKey: downloadObj.ObjectKey,
			DoneChan:  make(chan struct{}, 1),
			ErrChan:   make(chan error, 1),
			Size:      downloadObj.Size,
			LocalPath: downloadObj.LocalPath,
		},
	}
	return result, nil
}

func processUpload(ctx context.Context, ops []sdk.OperationRequest) error {

err := migration.zStore.MultiUpload(ctx, ops)
Expand Down Expand Up @@ -467,7 +524,7 @@ func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) {
}
}

func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) {
func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOperation, migrator *MigrationWorker) error {
var err error
defer func() {
for _, op := range ops {
Expand Down Expand Up @@ -509,4 +566,44 @@ func (m *Migration) processMultiOperation(ctx context.Context, ops []MigrationOp
zlogger.Logger.Info("upload done: ", op.uploadObj.ObjectKey, "size ", op.uploadObj.Size, err)
}
migrator.SetMigrationError(err)
return err
}

// processChunkDownload fetches one object from the AWS store in ranged reads
// of m.chunkSize bytes and pipes the data into sw, split into pieces no larger
// than the size reported by m.zStore.GetChunkWriteSize.
//
// It marks the download as started on migrator, and reports completion through
// migrator.DownloadDone: with the error when a ranged read fails, with nil when
// the whole object has been piped. sw.DataChan is closed when the function
// returns, whatever the reason. If ctx is cancelled the function returns
// without calling DownloadDone.
//
// m.chunkSize is expected to be positive; the command layer substitutes a
// default when the flag is zero.
func (m *Migration) processChunkDownload(ctx context.Context, sw *util.StreamWriter, migrator *MigrationWorker, downloadObjMeta *DownloadObjectMeta) {
	migrator.DownloadStart(downloadObjMeta)

	// Offsets stay in int64 so that objects of 2 GiB and more are addressed
	// correctly even where int is 32 bits wide.
	var offset int64
	chunkSize := m.chunkSize
	acceptedChunkSize := int(m.zStore.GetChunkWriteSize())
	defer close(sw.DataChan)

	for {
		// Stop promptly once the batch context has been cancelled.
		select {
		case <-ctx.Done():
			return
		default:
		}

		data, err := m.awsStore.DownloadToMemory(ctx, downloadObjMeta.ObjectKey, offset, chunkSize)
		if err != nil {
			migrator.DownloadDone(downloadObjMeta, "", err)
			return
		}

		// Hand the chunk over in pieces of at most acceptedChunkSize bytes.
		// A non-positive accepted size would make this loop spin forever (or
		// slice out of range), so in that case the chunk is written whole.
		step := acceptedChunkSize
		if step <= 0 {
			step = len(data)
		}
		for current := 0; current < len(data); current += step {
			high := current + step
			if high > len(data) {
				high = len(data)
			}
			sw.Write(data[current:high])
		}

		offset += chunkSize

		// A short read means the end of the object was reached.
		if int64(len(data)) < chunkSize {
			break
		}
		// When the listed size is known, also stop once it has been covered.
		// This avoids requesting a range that starts at the end of an object
		// whose size is an exact multiple of chunkSize.
		if downloadObjMeta.Size > 0 && offset >= downloadObjMeta.Size {
			break
		}
	}
	migrator.DownloadDone(downloadObjMeta, "", nil)
}
3 changes: 3 additions & 0 deletions migration/migrateConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ type MigrationConfig struct {
StartAfter string
StateFilePath string
WorkDir string
ChunkSize int64
ChunkNumber int
BatchSize int
}
1 change: 1 addition & 0 deletions migration/migration_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type DownloadObjectMeta struct {
DoneChan chan struct{}
ErrChan chan error
IsFileAlreadyExist bool
mimeType string
}

type UploadObjectMeta struct {
Expand Down

0 comments on commit 7051997

Please sign in to comment.