forked from kubernetes-sigs/kueue
-
Notifications
You must be signed in to change notification settings - Fork 1
/
batchjob_adapter.go
107 lines (87 loc) · 3.04 KB
/
batchjob_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multikueue
import (
"context"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/controller/constants"
kueuejob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
)
// batchJobAdapter is the jobAdapter implementation whose methods operate on
// batch/v1 Job objects. It has no fields, so it carries no state of its own.
type batchJobAdapter struct{}

// Compile-time check that *batchJobAdapter satisfies the jobAdapter interface.
var _ jobAdapter = (*batchJobAdapter)(nil)
// SyncJob synchronizes the batch/v1 Job identified by key between the cluster
// reached through localClient and the one reached through remoteClient.
//
//   - If the remote Job exists and has a JobComplete or JobFailed condition
//     whose status is True, the remote Status is copied onto the local Job and
//     written with localClient.Status().Update.
//   - If the remote Job exists but has no such condition, nothing is done.
//   - If the remote Job is not found, a new Job is created remotely from the
//     result of cleanObjectMeta on the local metadata and a deep copy of the
//     local spec, with the selector removed, the kueuejob.ManagedLabels
//     removed from the pod template labels, and the labels
//     constants.PrebuiltWorkloadLabel=workloadName and
//     kueuealpha.MultiKueueOriginLabel=origin set on the Job.
//
// Errors from reading the local Job, from reading the remote Job (other than
// not-found), from the status update and from the create call are returned
// as-is.
func (b *batchJobAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
	var localJob batchv1.Job
	if err := localClient.Get(ctx, key, &localJob); err != nil {
		return err
	}

	var remoteJob batchv1.Job
	getErr := remoteClient.Get(ctx, key, &remoteJob)
	if client.IgnoreNotFound(getErr) != nil {
		return getErr
	}

	if getErr == nil {
		// The remote Job already exists. Its status is only copied back once
		// it is finished. NOTE: the original author marked this gating as
		// temporary, to be revisited once batch Jobs support live status
		// updates.
		for i := range remoteJob.Status.Conditions {
			cond := &remoteJob.Status.Conditions[i]
			terminal := cond.Type == batchv1.JobComplete || cond.Type == batchv1.JobFailed
			if terminal && cond.Status == corev1.ConditionTrue {
				localJob.Status = remoteJob.Status
				return localClient.Status().Update(ctx, &localJob)
			}
		}
		return nil
	}

	// The remote Job was not found: build it from the local one.
	toCreate := batchv1.Job{
		ObjectMeta: cleanObjectMeta(&localJob.ObjectMeta),
		Spec:       *localJob.Spec.DeepCopy(),
	}

	// Strip the fields that must not be carried over: the selector and the
	// managed labels on the pod template.
	toCreate.Spec.Selector = nil
	for _, managed := range kueuejob.ManagedLabels {
		delete(toCreate.Spec.Template.Labels, managed)
	}

	// Attach the prebuilt workload name and the origin as labels.
	if toCreate.Labels == nil {
		toCreate.Labels = map[string]string{}
	}
	toCreate.Labels[constants.PrebuiltWorkloadLabel] = workloadName
	toCreate.Labels[kueuealpha.MultiKueueOriginLabel] = origin

	return remoteClient.Create(ctx, &toCreate)
}
// DeleteRemoteObject deletes the batch/v1 Job identified by key using
// remoteClient. The Job is fetched first and then deleted; a not-found error
// from either call is treated as success (nil is returned). Any other error
// is returned as-is.
func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
	var remoteJob batchv1.Job
	if err := remoteClient.Get(ctx, key, &remoteJob); err != nil {
		return client.IgnoreNotFound(err)
	}

	deleteErr := remoteClient.Delete(ctx, &remoteJob)
	return client.IgnoreNotFound(deleteErr)
}
// KeepAdmissionCheckPending always reports true for this adapter.
// NOTE(review): judging by the name, callers presumably use this to decide
// whether the admission check stays pending — confirm against the jobAdapter
// interface documentation.
func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
	const keepPending = true
	return keepPending
}
// IsJobManagedByKueue unconditionally returns true, an empty string and a nil
// error. None of its arguments are consulted.
func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
	const managed = true
	return managed, "", nil
}