Skip to content

Commit

Permalink
Merge pull request #1390 from 0chain/feat/optional-size
Browse files Browse the repository at this point in the history
Make size optional
  • Loading branch information
dabasov committed Mar 26, 2024
2 parents c2bd391 + 786760e commit 9f2ac26
Show file tree
Hide file tree
Showing 21 changed files with 405 additions and 363 deletions.
4 changes: 2 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
change.seqPQ.Done(seqpriorityqueue.UploadData{
Offset: offset,
DataBytes: dataWritten,
})
}, contentSize)
} else {
change.seqPQ.Push(seqpriorityqueue.UploadData{
Offset: offset,
Expand Down Expand Up @@ -256,7 +256,7 @@ func cleanConnectionObj() {
connectionObj.cnclCtx()
for _, change := range connectionObj.changes {
if change.seqPQ != nil {
change.seqPQ.Done(seqpriorityqueue.UploadData{})
change.seqPQ.Done(seqpriorityqueue.UploadData{}, 1)
}
}
delete(connectionProcessor, connectionID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (nf *UpdateFileChanger) ApplyChange(ctx context.Context, rootRef *reference
fileRef.EncryptedKeyPoint = nf.EncryptedKeyPoint
fileRef.ChunkSize = nf.ChunkSize
fileRef.IsPrecommit = true
fileRef.FilestoreVersion = filestore.VERSION

return rootRef, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/util"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (nf *UploadFileChanger) applyChange(ctx context.Context, rootRef *reference
UpdatedAt: ts,
HashToBeComputed: true,
IsPrecommit: true,
FilestoreVersion: filestore.VERSION,
}

fileID, ok := fileIDMeta[newFile.Path]
Expand Down
11 changes: 6 additions & 5 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
}

challengeReadInput := &filestore.ChallengeReadBlockInput{
Hash: objectPath.Meta["validation_root"].(string),
FileSize: objectPath.Meta["size"].(int64),
BlockOffset: blockoffset,
AllocationID: cr.AllocationID,
IsPrecommit: fromPreCommit,
Hash: objectPath.Meta["validation_root"].(string),
FileSize: objectPath.Meta["size"].(int64),
BlockOffset: blockoffset,
AllocationID: cr.AllocationID,
IsPrecommit: fromPreCommit,
FilestoreVersion: objectPath.FilestoreVersion,
}

t1 := time.Now()
Expand Down
71 changes: 47 additions & 24 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
var (
initialSize int64
nodeSize int64
offset int64
)
finfo, err := os.Stat(tempFilePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand All @@ -73,10 +71,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if finfo != nil {
initialSize = finfo.Size()
}
if !fileData.IsThumbnail {
nodeSize = getNodesSize(fileData.Size, util.MaxMerkleLeavesSize)
offset = fileData.UploadOffset + nodeSize + FMTSize
}

if err = createDirs(filepath.Dir(tempFilePath)); err != nil {
return nil, common.NewError("dir_creation_error", err.Error())
}
Expand All @@ -86,7 +81,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
}
defer f.Close()

_, err = f.Seek(offset, io.SeekStart)
_, err = f.Seek(fileData.UploadOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}
Expand All @@ -107,15 +102,15 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if currentSize > initialSize { // Is chunk new or rewritten
fs.updateAllocTempFileSize(allocID, currentSize-initialSize)
}
if currentSize > fileData.Size+nodeSize+FMTSize {
if fileData.Size > 0 && currentSize > fileData.Size {
_ = os.Remove(tempFilePath)
return nil, common.NewError("file_size_mismatch", "File size is greater than expected")
}
logging.Logger.Info("temp_file_write: ", zap.String("filePath", fileData.Path), zap.Int64("currentSize", currentSize), zap.Int64("initialSize", initialSize), zap.Int64("writtenSize", writtenSize), zap.Int64("offset", fileData.UploadOffset), zap.Bool("ChunkUploaded", fileRef.ChunkUploaded))
fileRef.Size = writtenSize
fileRef.Name = fileData.Name
fileRef.Path = fileData.Path
fileRef.ContentSize = currentSize - nodeSize - FMTSize
fileRef.ContentSize = currentSize
return fileRef, nil
}

Expand Down Expand Up @@ -273,8 +268,8 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
if err != nil {
return false, common.NewError("stat_error", err.Error())
}
nodeSie := getNodesSize(fileData.Size, util.MaxMerkleLeavesSize)
fileSize := rStat.Size() - nodeSie - FMTSize

fileSize := rStat.Size()
now := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
Expand All @@ -283,12 +278,16 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
return false, common.NewError("hasher_wait_error", err.Error())
}
elapsedWait := time.Since(now)
_, err = r.Seek(fileSize, io.SeekStart)
if err != nil {
return false, common.NewError("seek_error", err.Error())
}
fmtRootBytes, err := fileData.Hasher.fmt.CalculateRootAndStoreNodes(r)
if err != nil {
return false, common.NewError("fmt_hash_calculation_error", err.Error())
}

validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(r)
validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(r, fileSize)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}
Expand Down Expand Up @@ -557,8 +556,13 @@ func (fs *FileStore) GetFileBlock(readBlockIn *ReadBlockInput) (*FileDownloadRes
vmp := &FileDownloadResponse{}

if readBlockIn.VerifyDownload {
vpOffset := int64(FMTSize)
if readBlockIn.FilestoreVersion == 1 {
vpOffset += readBlockIn.FileSize
}
vp := validationTreeProof{
dataSize: readBlockIn.FileSize,
offset: vpOffset,
}

logging.Logger.Debug("calling GetMerkleProofOfMultipleIndexes", zap.Any("readBlockIn", readBlockIn), zap.Any("vmp", vmp))
Expand All @@ -570,16 +574,24 @@ func (fs *FileStore) GetFileBlock(readBlockIn *ReadBlockInput) (*FileDownloadRes
vmp.Nodes = nodes
vmp.Indexes = indexes
}

fileOffset := FMTSize + nodesSize + int64(startBlock)*ChunkSize

_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
logging.Logger.Info("filestore_version", zap.Int("version", readBlockIn.FilestoreVersion))
fileOffset := int64(startBlock) * ChunkSize
if readBlockIn.FilestoreVersion == 1 {
_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
} else {
_, err = file.Seek(fileOffset+FMTSize+nodesSize, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
}

fileReader := io.LimitReader(file, filesize-fileOffset)

buffer := make([]byte, readBlockIn.NumBlocks*ChunkSize)
n, err := file.Read(buffer)
n, err := fileReader.Read(buffer)
if err != nil && err != io.EOF {
return nil, err
}
Expand Down Expand Up @@ -623,21 +635,32 @@ func (fs *FileStore) GetBlocksMerkleTreeForChallenge(in *ChallengeReadBlockInput

defer file.Close()

var offset int64
if in.FilestoreVersion == 1 {
offset = in.FileSize
}

fmp := &fixedMerkleTreeProof{
idx: in.BlockOffset,
dataSize: in.FileSize,
offset: offset,
}

_, err = file.Seek(-in.FileSize, io.SeekEnd)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
merkleProof, err := fmp.GetMerkleProof(file)
if err != nil {
return nil, common.NewError("get_merkle_proof_error", err.Error())
}

proofByte, err := fmp.GetLeafContent(file)
if in.FilestoreVersion == 0 {
_, err = file.Seek(-in.FileSize, io.SeekEnd)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
}

fileReader := io.LimitReader(file, in.FileSize)

proofByte, err := fmp.GetLeafContent(fileReader)
if err != nil {
return nil, common.NewError("get_leaf_content_error", err.Error())
}
Expand Down
33 changes: 19 additions & 14 deletions code/go/0chain.net/blobbercore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"mime/multipart"
)

const CHUNK_SIZE = 64 * 1024
const (
CHUNK_SIZE = 64 * 1024
VERSION = 1
)

type FileInputData struct {
Name string
Expand Down Expand Up @@ -94,14 +97,15 @@ type FileDownloadResponse struct {
}

type ReadBlockInput struct {
AllocationID string
FileSize int64
Hash string
StartBlockNum int
NumBlocks int
IsThumbnail bool
VerifyDownload bool
IsPrecommit bool
AllocationID string
FileSize int64
Hash string
StartBlockNum int
NumBlocks int
IsThumbnail bool
VerifyDownload bool
IsPrecommit bool
FilestoreVersion int
}

type ChallengeResponse struct {
Expand All @@ -111,9 +115,10 @@ type ChallengeResponse struct {
}

type ChallengeReadBlockInput struct {
BlockOffset int
FileSize int64
Hash string
AllocationID string
IsPrecommit bool
BlockOffset int
FileSize int64
Hash string
AllocationID string
IsPrecommit bool
FilestoreVersion int
}

0 comments on commit 9f2ac26

Please sign in to comment.