Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] feat: curio: Unseal pipeline #11878

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 9 additions & 7 deletions curiosrc/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
}, nil
}

func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, into storiface.SectorFileType, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand All @@ -148,25 +148,27 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID,
return xerrors.Errorf("computing replica id: %w", err)
}

intoPath := storiface.PathByType(paths, into)

// make sure the cache dir is empty
if err := os.RemoveAll(paths.Cache); err != nil {
if err := os.RemoveAll(intoPath); err != nil {
return xerrors.Errorf("removing cache dir: %w", err)
}
if err := os.MkdirAll(paths.Cache, 0755); err != nil {
if err := os.MkdirAll(intoPath, 0755); err != nil {
return xerrors.Errorf("mkdir cache dir: %w", err)
}

// generate new sector key
err = ffi.GenerateSDR(
sector.ProofType,
paths.Cache,
intoPath,
replicaID,
)
if err != nil {
return xerrors.Errorf("generating SDR %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
return xerrors.Errorf("generating SDR %d (%s): %w", sector.ID.Number, intoPath, err)
}

if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache); err != nil {
if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, into); err != nil {
return xerrors.Errorf("ensure one copy: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions curiosrc/seal/task_sdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
// Trees; After one retry, it should return the sector to the
// SDR stage; max number of retries should be configurable

err = s.sc.GenerateSDR(ctx, taskID, sref, ticket, commd)
err = s.sc.GenerateSDR(ctx, taskID, storiface.FTCache, sref, ticket, commd)
if err != nil {
return false, xerrors.Errorf("generating sdr: %w", err)
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails {
res := harmonytask.TaskTypeDetails{
Max: s.max,
Name: "SDR",
Cost: resources.Resources{ // todo offset for prefetch?
Cost: resources.Resources{
Cpu: 4, // todo multicore sdr
Gpu: 0,
Ram: 54 << 30,
Expand Down
129 changes: 129 additions & 0 deletions curiosrc/unseal/task_unseal_sdr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package unseal

import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/curiosrc/ffi"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"golang.org/x/xerrors"
)

var isDevnet = build.BlockDelaySecs < 30

type UnsealSDRApi interface {
StateSectorGetInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorOnChainInfo, error)
}

type TaskUnsealSdr struct {
max int

sc *ffi.SealCalls
db *harmonydb.DB
api UnsealSDRApi
}

func (t *TaskUnsealSdr) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()

var sectorParamsArr []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
}

err = t.db.Select(ctx, &sectorParamsArr, `
SELECT sp_id, sector_number
FROM sectors_unseal_pipeline
WHERE task_id_unseal_sdr = $1`, taskID)
if err != nil {
return false, xerrors.Errorf("getting sector params: %w", err)
}

if len(sectorParamsArr) == 0 {
return false, xerrors.Errorf("no sector params")
}

sectorParams := sectorParamsArr[0]

maddr, err := address.NewIDAddress(uint64(sectorParams.SpID))
if err != nil {
return false, xerrors.Errorf("failed to convert miner ID to address: %w", err)
}

sinfo, err := t.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(sectorParams.SectorNumber), types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}
if sinfo == nil {
return false, xerrors.Errorf("sector not found")
}

sref := storiface.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(sectorParams.SpID),
Number: abi.SectorNumber(sectorParams.SectorNumber),
},
ProofType: sinfo.SealProof,
}

t.sc.GenerateSDR(ctx, taskID, storiface.FTKeyCache, sref,
}

Check failure on line 76 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

expected operand, found '}'

Check failure on line 76 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

expected operand, found '}'

Check failure on line 76 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

expected operand, found '}'

Check failure on line 76 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

syntax error: unexpected }, expected expression (typecheck)

Check failure on line 76 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

expected operand, found '}' (typecheck)

Check failure on line 76 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Test (unit-rest)

syntax error: unexpected }, expected expression

Check failure on line 76 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Test (unit-rest)

syntax error: unexpected }, expected expression

func (t *TaskUnsealSdr) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
id := ids[0]
return &id, nil

Check failure on line 80 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

missing ',' in argument list

Check failure on line 80 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

missing ',' in argument list

Check failure on line 80 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

missing ',' in argument list

Check failure on line 80 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

missing ',' in argument list (typecheck)
}

Check failure on line 81 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

expected operand, found '}'

Check failure on line 81 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

expected operand, found '}'

Check failure on line 81 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

expected operand, found '}'

Check failure on line 81 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

expected operand, found '}' (typecheck)

func (t *TaskUnsealSdr) TypeDetails() harmonytask.TaskTypeDetails {
ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size
if isDevnet {

Check failure on line 85 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

missing ',' in argument list

Check failure on line 85 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

missing ',' in argument list

Check failure on line 85 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

missing ',' in argument list

Check failure on line 85 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

missing ',' in argument list (typecheck)
ssize = abi.SectorSize(2 << 20)

Check failure on line 86 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

expected '==', found '='

Check failure on line 86 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

expected '==', found '='

Check failure on line 86 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

expected '==', found '='

Check failure on line 86 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

expected '==', found '=' (typecheck)
}

Check failure on line 87 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

missing ',' before newline in argument list

Check failure on line 87 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

missing ',' before newline in argument list

Check failure on line 87 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

missing ',' before newline in argument list

Check failure on line 87 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

missing ',' before newline in argument list (typecheck)

res := harmonytask.TaskTypeDetails{

Check failure on line 89 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

missing ',' in argument list

Check failure on line 89 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

missing ',' in argument list

Check failure on line 89 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

missing ',' in argument list

Check failure on line 89 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

missing ',' in argument list (typecheck)
Max: t.max,
Name: "SDRKeyRegen",
Cost: resources.Resources{
Cpu: 4, // todo multicore sdr
Gpu: 0,
Ram: 54 << 30,
Storage: t.sc.Storage(t.taskToSector, storiface.FTKeyCache, storiface.FTNone, ssize, storiface.PathSealing),
},
MaxFailures: 2,
Follows: nil,
}

Check failure on line 100 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

missing ',' before newline in argument list

Check failure on line 100 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

missing ',' before newline in argument list

Check failure on line 100 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

missing ',' before newline in argument list

Check failure on line 100 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

missing ',' before newline in argument list (typecheck)

if isDevnet {

Check failure on line 102 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

expected operand, found 'if'

Check failure on line 102 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

expected operand, found 'if'

Check failure on line 102 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

expected operand, found 'if'

Check failure on line 102 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

expected operand, found 'if' (typecheck)
res.Cost.Ram = 1 << 30

Check failure on line 103 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gofmt)

expected '==', found '='

Check failure on line 103 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (gen-check)

expected '==', found '='

Check failure on line 103 in curiosrc/unseal/task_unseal_sdr.go

View workflow job for this annotation

GitHub Actions / Check (docs-check)

expected '==', found '='
}

return res
}

func (t *TaskUnsealSdr) Adder(taskFunc harmonytask.AddTaskFunc) {
//TODO implement me
panic("implement me")
}

func (t *TaskUnsealSdr) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) {
var refs []ffi.SectorRef

err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_unseal_pipeline WHERE task_id_unseal_sdr = $1`, id)
if err != nil {
return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err)
}

if len(refs) != 1 {
return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs))
}

return refs[0], nil
}

var _ harmonytask.TaskInterface = (*TaskUnsealSdr)(nil)
25 changes: 25 additions & 0 deletions lib/harmony/harmonydb/sql/20240415-unseal-pipeline.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
create table sectors_unseal_pipeline (
sp_id bigint not null,
sector_number bigint not null,

create_time timestamp not null default current_timestamp,

task_id_unseal_sdr bigint, -- builds unseal cache
after_unseal_sdr bool not null default false,

task_id_decode_sector bigint, -- makes the "unsealed" copy (runs either next to unseal cache OR in long-term storage)
after_decode_sector bool not null default false,

task_id_move_storage bigint, -- optional, moves the unsealed sector to storage
after_move_storage bool not null default false,

-- note: those foreign keys are a part of the retry mechanism. If a task
-- fails due to retry limit, it will drop the assigned task_id, and the
-- poller will reassign the task to a new node if it deems the task is
-- still valid to be retried.
foreign key (task_id_unseal_sdr) references harmony_task (id) on delete set null,
foreign key (task_id_decode_sector) references harmony_task (id) on delete set null,
foreign key (task_id_move_storage) references harmony_task (id) on delete set null,

primary key (sp_id, sector_number)
);
3 changes: 3 additions & 0 deletions storage/sealer/ffiwrapper/basicfs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (b *Provider) AcquireSector(ctx context.Context, id storiface.SectorRef, ex
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTPiece.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTKeyCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}

done := func() {}

Expand Down
16 changes: 15 additions & 1 deletion storage/sealer/storiface/filetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const (
// Piece Park
FTPiece

// Curio unseal
FTKeyCache

FileTypes = iota
)

Expand All @@ -54,7 +57,7 @@ const (
// - FTUpdate: represents snap sectors
// - FTUpdateCache: represents snap cache sectors
// - FTPiece: represents Piece Park sectors
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache, FTPiece}
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache, FTPiece, FTKeyCache}

// FTNone represents a sector file type of none. This constant is used in the StorageLock method to specify that a sector should not have any file locked.
// Example usage:
Expand Down Expand Up @@ -86,6 +89,7 @@ var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
FTUpdateCache: FSOverheadDen*2 + 1,
FTCache: 141, // 11 layers + D(2x ssize) + C + R'
FTPiece: FSOverheadDen,
FTKeyCache: FSOverheadDen*11 + 1,
}

// sector size * disk / fs overhead. FSOverheadDen is like the unit of sector size
Expand All @@ -103,6 +107,7 @@ var FsOverheadFinalized = map[SectorFileType]int{
FTUpdateCache: 1,
FTCache: 1,
FTPiece: FSOverheadDen,
FTKeyCache: FSOverheadDen,
}

// SectorFileType represents the type of a sector file
Expand All @@ -126,6 +131,8 @@ func TypeFromString(s string) (SectorFileType, error) {
return FTUpdateCache, nil
case "piece":
return FTPiece, nil
case "key-cache":
return FTKeyCache, nil
default:
return 0, xerrors.Errorf("unknown sector file type '%s'", s)
}
Expand All @@ -146,6 +153,8 @@ func (t SectorFileType) String() string {
return "update-cache"
case FTPiece:
return "piece"
case FTKeyCache:
return "key-cache"
default:
return fmt.Sprintf("<unknown %d %v>", t, (t & ((1 << FileTypes) - 1)).Strings())
}
Expand Down Expand Up @@ -334,6 +343,7 @@ type SectorPaths struct {
Update string
UpdateCache string
Piece string
KeyCache string
}

// HasAllSet checks if all paths of a SectorPaths struct are set for a given SectorFileType.
Expand Down Expand Up @@ -425,6 +435,8 @@ func PathByType(sps SectorPaths, fileType SectorFileType) string {
return sps.UpdateCache
case FTPiece:
return sps.Piece
case FTKeyCache:
return sps.KeyCache
}

panic("requested unknown path type")
Expand All @@ -444,6 +456,8 @@ func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) {
sps.UpdateCache = p
case FTPiece:
sps.Piece = p
case FTKeyCache:
sps.KeyCache = p
}
}

Expand Down