Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make size optional #1390

Merged
merged 14 commits into from
Mar 26, 2024
4 changes: 2 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
change.seqPQ.Done(seqpriorityqueue.UploadData{
Offset: offset,
DataBytes: dataWritten,
})
}, contentSize)
} else {
change.seqPQ.Push(seqpriorityqueue.UploadData{
Offset: offset,
Expand Down Expand Up @@ -256,7 +256,7 @@ func cleanConnectionObj() {
connectionObj.cnclCtx()
for _, change := range connectionObj.changes {
if change.seqPQ != nil {
change.seqPQ.Done(seqpriorityqueue.UploadData{})
change.seqPQ.Done(seqpriorityqueue.UploadData{}, 1)
}
}
delete(connectionProcessor, connectionID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (nf *UpdateFileChanger) ApplyChange(ctx context.Context, rootRef *reference
fileRef.EncryptedKeyPoint = nf.EncryptedKeyPoint
fileRef.ChunkSize = nf.ChunkSize
fileRef.IsPrecommit = true
fileRef.FilestoreVersion = filestore.VERSION

return rootRef, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/util"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (nf *UploadFileChanger) applyChange(ctx context.Context, rootRef *reference
UpdatedAt: ts,
HashToBeComputed: true,
IsPrecommit: true,
FilestoreVersion: filestore.VERSION,
}

fileID, ok := fileIDMeta[newFile.Path]
Expand Down
11 changes: 6 additions & 5 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
}

challengeReadInput := &filestore.ChallengeReadBlockInput{
Hash: objectPath.Meta["validation_root"].(string),
FileSize: objectPath.Meta["size"].(int64),
BlockOffset: blockoffset,
AllocationID: cr.AllocationID,
IsPrecommit: fromPreCommit,
Hash: objectPath.Meta["validation_root"].(string),
FileSize: objectPath.Meta["size"].(int64),
BlockOffset: blockoffset,
AllocationID: cr.AllocationID,
IsPrecommit: fromPreCommit,
FilestoreVersion: objectPath.FilestoreVersion,
}

t1 := time.Now()
Expand Down
71 changes: 47 additions & 24 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
var (
initialSize int64
nodeSize int64
offset int64
)
finfo, err := os.Stat(tempFilePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand All @@ -73,10 +71,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if finfo != nil {
initialSize = finfo.Size()
}
if !fileData.IsThumbnail {
nodeSize = getNodesSize(fileData.Size, util.MaxMerkleLeavesSize)
offset = fileData.UploadOffset + nodeSize + FMTSize
}

if err = createDirs(filepath.Dir(tempFilePath)); err != nil {
return nil, common.NewError("dir_creation_error", err.Error())
}
Expand All @@ -86,7 +81,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
}
defer f.Close()

_, err = f.Seek(offset, io.SeekStart)
_, err = f.Seek(fileData.UploadOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}
Expand All @@ -107,15 +102,15 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if currentSize > initialSize { // Is chunk new or rewritten
fs.updateAllocTempFileSize(allocID, currentSize-initialSize)
}
if currentSize > fileData.Size+nodeSize+FMTSize {
if fileData.Size > 0 && currentSize > fileData.Size {
_ = os.Remove(tempFilePath)
return nil, common.NewError("file_size_mismatch", "File size is greater than expected")
}
logging.Logger.Info("temp_file_write: ", zap.String("filePath", fileData.Path), zap.Int64("currentSize", currentSize), zap.Int64("initialSize", initialSize), zap.Int64("writtenSize", writtenSize), zap.Int64("offset", fileData.UploadOffset), zap.Bool("ChunkUploaded", fileRef.ChunkUploaded))
fileRef.Size = writtenSize
fileRef.Name = fileData.Name
fileRef.Path = fileData.Path
fileRef.ContentSize = currentSize - nodeSize - FMTSize
fileRef.ContentSize = currentSize
return fileRef, nil
}

Expand Down Expand Up @@ -273,8 +268,8 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
if err != nil {
return false, common.NewError("stat_error", err.Error())
}
nodeSie := getNodesSize(fileData.Size, util.MaxMerkleLeavesSize)
fileSize := rStat.Size() - nodeSie - FMTSize

fileSize := rStat.Size()
now := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
Expand All @@ -283,12 +278,16 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
return false, common.NewError("hasher_wait_error", err.Error())
}
elapsedWait := time.Since(now)
_, err = r.Seek(fileSize, io.SeekStart)
if err != nil {
return false, common.NewError("seek_error", err.Error())
}
fmtRootBytes, err := fileData.Hasher.fmt.CalculateRootAndStoreNodes(r)
if err != nil {
return false, common.NewError("fmt_hash_calculation_error", err.Error())
}

validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(r)
validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(r, fileSize)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}
Expand Down Expand Up @@ -557,8 +556,13 @@ func (fs *FileStore) GetFileBlock(readBlockIn *ReadBlockInput) (*FileDownloadRes
vmp := &FileDownloadResponse{}

if readBlockIn.VerifyDownload {
vpOffset := int64(FMTSize)
if readBlockIn.FilestoreVersion == 1 {
vpOffset += readBlockIn.FileSize
}
vp := validationTreeProof{
dataSize: readBlockIn.FileSize,
offset: vpOffset,
}

logging.Logger.Debug("calling GetMerkleProofOfMultipleIndexes", zap.Any("readBlockIn", readBlockIn), zap.Any("vmp", vmp))
Expand All @@ -570,16 +574,24 @@ func (fs *FileStore) GetFileBlock(readBlockIn *ReadBlockInput) (*FileDownloadRes
vmp.Nodes = nodes
vmp.Indexes = indexes
}

fileOffset := FMTSize + nodesSize + int64(startBlock)*ChunkSize

_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
logging.Logger.Info("filestore_version", zap.Int("version", readBlockIn.FilestoreVersion))
fileOffset := int64(startBlock) * ChunkSize
if readBlockIn.FilestoreVersion == 1 {
_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
} else {
_, err = file.Seek(fileOffset+FMTSize+nodesSize, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
}

fileReader := io.LimitReader(file, filesize-fileOffset)

buffer := make([]byte, readBlockIn.NumBlocks*ChunkSize)
n, err := file.Read(buffer)
n, err := fileReader.Read(buffer)
if err != nil && err != io.EOF {
return nil, err
}
Expand Down Expand Up @@ -623,21 +635,32 @@ func (fs *FileStore) GetBlocksMerkleTreeForChallenge(in *ChallengeReadBlockInput

defer file.Close()

var offset int64
if in.FilestoreVersion == 1 {
offset = in.FileSize
}

fmp := &fixedMerkleTreeProof{
idx: in.BlockOffset,
dataSize: in.FileSize,
offset: offset,
}

_, err = file.Seek(-in.FileSize, io.SeekEnd)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
merkleProof, err := fmp.GetMerkleProof(file)
if err != nil {
return nil, common.NewError("get_merkle_proof_error", err.Error())
}

proofByte, err := fmp.GetLeafContent(file)
if in.FilestoreVersion == 0 {
_, err = file.Seek(-in.FileSize, io.SeekEnd)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
}

fileReader := io.LimitReader(file, in.FileSize)

proofByte, err := fmp.GetLeafContent(fileReader)
if err != nil {
return nil, common.NewError("get_leaf_content_error", err.Error())
}
Expand Down
33 changes: 19 additions & 14 deletions code/go/0chain.net/blobbercore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"mime/multipart"
)

const CHUNK_SIZE = 64 * 1024
const (
CHUNK_SIZE = 64 * 1024
VERSION = 1
)

type FileInputData struct {
Name string
Expand Down Expand Up @@ -94,14 +97,15 @@ type FileDownloadResponse struct {
}

type ReadBlockInput struct {
AllocationID string
FileSize int64
Hash string
StartBlockNum int
NumBlocks int
IsThumbnail bool
VerifyDownload bool
IsPrecommit bool
AllocationID string
FileSize int64
Hash string
StartBlockNum int
NumBlocks int
IsThumbnail bool
VerifyDownload bool
IsPrecommit bool
FilestoreVersion int
}

type ChallengeResponse struct {
Expand All @@ -111,9 +115,10 @@ type ChallengeResponse struct {
}

type ChallengeReadBlockInput struct {
BlockOffset int
FileSize int64
Hash string
AllocationID string
IsPrecommit bool
BlockOffset int
FileSize int64
Hash string
AllocationID string
IsPrecommit bool
FilestoreVersion int
}