Skip to content

Commit 1dda0dc

Browse files
committed
feat(scheduler): enhance sector's financials sync
1 parent c7490cb commit 1dda0dc

File tree

3 files changed

+19
-28
lines changed

3 files changed

+19
-28
lines changed

scheduler/internal/repository/pg/financial.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pg
33
import (
44
"context"
55
"fmt"
6+
"sort"
67

78
"github.com/fcote/merlin/sheduler/internal/domain"
89
"github.com/fcote/merlin/sheduler/pkg/gmonitor"
@@ -229,5 +230,14 @@ where c.sector_id = $1;
229230
return nil, fmt.Errorf("could not read financial period rows: %w", err)
230231
}
231232

233+
sort.Slice(periods, func(i, j int) bool {
234+
yearI := periods[i].Year
235+
yearJ := periods[j].Year
236+
if yearI != yearJ {
237+
return yearI < yearJ
238+
}
239+
return periods[i].Period > periods[j].Period
240+
})
241+
232242
return periods, nil
233243
}

scheduler/internal/usecase/financial_sector.go

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@ package usecase
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
6-
"runtime"
7-
8-
"github.com/sourcegraph/conc/pool"
97

108
"github.com/fcote/merlin/sheduler/internal/domain"
119
"github.com/fcote/merlin/sheduler/pkg/glog"
@@ -14,8 +12,6 @@ import (
1412
"github.com/fcote/merlin/sheduler/pkg/pointer"
1513
)
1614

17-
var sectorFinancialConcurrency = runtime.GOMAXPROCS(0)
18-
1915
type sectorJob struct {
2016
sector domain.Sector
2117
year int
@@ -49,30 +45,15 @@ func (uc FinancialSectorUsecase) SyncSectorFinancials(ctx context.Context, secto
4945
return fmt.Errorf("%s | could not fetch sector financial periods: %w", sector.Name, err)
5046
}
5147

52-
p := pool.New().
53-
WithErrors().
54-
WithContext(ctx).
55-
WithMaxGoroutines(sectorFinancialConcurrency)
56-
57-
uc.launchSyncs(p, sector, sectorPeriods)
58-
59-
if err := p.Wait(); err != nil {
60-
return err
61-
}
62-
63-
return nil
64-
}
65-
66-
func (uc FinancialSectorUsecase) launchSyncs(pool *pool.ContextPool, sector domain.Sector, sectorPeriods []domain.FinancialYearPeriod) {
6748
for _, period := range sectorPeriods {
68-
pool.Go(func(ctx context.Context) error {
69-
return uc.sync(ctx, sectorJob{
70-
sector: sector,
71-
year: period.Year,
72-
period: period.Period,
73-
})
74-
})
49+
err = errors.Join(err, uc.sync(ctx, sectorJob{
50+
sector: sector,
51+
year: period.Year,
52+
period: period.Period,
53+
}))
7554
}
55+
56+
return err
7657
}
7758

7859
func (uc FinancialSectorUsecase) sync(ctx context.Context, job sectorJob) error {

scheduler/internal/usecase/financial_ttm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func last4QuartersMap(financials domain.Financials, financialItemMap map[int]dom
100100

101101
for _, financial := range financials {
102102
// Only store quarters financials
103-
if financial.Period == domain.FinancialPeriodY {
103+
if !financial.Period.IsQuarter() {
104104
continue
105105
}
106106
fi := financialItemMap[financial.FinancialItemId]

0 commit comments

Comments
 (0)