-
Notifications
You must be signed in to change notification settings - Fork 38.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kubelet synchronizes pod states in a parallel manner. #124596
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ import ( | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apimachinery/pkg/types" | ||
utilfeature "k8s.io/apiserver/pkg/util/feature" | ||
clientset "k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/kubernetes/fake" | ||
|
@@ -71,6 +72,26 @@ func getTestPod() *v1.Pod { | |
} | ||
} | ||
|
||
// getTestPods builds n minimal Pod objects for use in tests. | ||
// Pod i (0-based) gets UID "<i>", Name "foo<i>" and Namespace "new", with | ||
// TypeMeta set to Kind "Pod" and APIVersion "v1". When n <= 0 the loop never | ||
// runs and the named result pods is returned as nil. | ||
func getTestPods(n int) (pods []v1.Pod) { | ||
for i := 0; i < n; i++ { | ||
// The decimal index doubles as the UID and as the name suffix, so every pod is distinct. | ||
s := strconv.Itoa(i) | ||
pod := v1.Pod{ | ||
TypeMeta: metav1.TypeMeta{ | ||
Kind: "Pod", | ||
APIVersion: "v1", | ||
}, | ||
ObjectMeta: metav1.ObjectMeta{ | ||
UID: types.UID(s), | ||
Name: "foo" + s, | ||
Namespace: "new", | ||
}, | ||
} | ||
pods = append(pods, pod) | ||
} | ||
// Naked return: yields the named result pods. | ||
return | ||
} | ||
|
||
// After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation | ||
// will be triggered, which will mess up all the old unit tests. | ||
// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in | ||
|
@@ -426,7 +447,9 @@ func TestStaleUpdates(t *testing.T) { | |
|
||
t.Log("... even if it's stale as long as nothing changes") | ||
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) | ||
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 | ||
|
||
statusVersion, _ := m.getApiStatusVersion(mirrorPodUID) | ||
m.setApiStatusVersion(mirrorPodUID, statusVersion-1) | ||
|
||
m.SetPodStatus(pod, status) | ||
m.syncBatch(true) | ||
|
@@ -436,6 +459,19 @@ func TestStaleUpdates(t *testing.T) { | |
verifyUpdates(t, m, 0) | ||
} | ||
|
||
func TestManyPodsUpdates(t *testing.T) { | ||
const podNums = 100 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @zhifei92 FWIW, in my environment (Ubuntu 22.04 on Intel(R) Core(TM) i9-7920X CPU @ 2.90GHz, 24 cores) the change slows down the test:
this PR:
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, that's correct, especially if we're not simulating any latency scenarios, where the extra overhead of locks does become a factor. However, considering there is inherent network latency in the communication between the kubelet and the kube-apiserver, we need to emulate a delay of 10 to 20 milliseconds within the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It looks like the provided unit test is not the best way to demonstrate the benefits of the change. How about providing an e2e test? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The e2e test should be better. I'll try it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @bart0sh Using the spec There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe it would be better to have it reproducible in an e2e (not e2e_node) test case(s). This would allow us to avoid simulation and see real-life delays. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry, I had mistakenly believed that an e2e_node test case was needed. I need to carefully contemplate how to cover this scenario using an end-to-end (e2e) test case. As I am not yet well-versed in e2e testing, it might take me some additional time to figure this out. |
||
pods := getTestPods(podNums) | ||
client := fake.NewSimpleClientset(&v1.PodList{Items: pods}) | ||
m := newTestManager(client) | ||
|
||
for i := 0; i < podNums; i++ { | ||
m.SetPodStatus(&pods[i], getRandomPodStatus()) | ||
} | ||
|
||
verifyUpdates(t, m, podNums) | ||
} | ||
|
||
// shuffle returns a new shuffled list of container statuses. | ||
func shuffle(statuses []v1.ContainerStatus) []v1.ContainerStatus { | ||
numStatuses := len(statuses) | ||
|
@@ -1320,13 +1356,16 @@ func TestSyncBatchCleanupVersions(t *testing.T) { | |
} | ||
|
||
t.Logf("Orphaned pods should be removed.") | ||
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 | ||
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 | ||
m.setApiStatusVersions(map[kubetypes.MirrorPodUID]uint64{ | ||
kubetypes.MirrorPodUID(testPod.UID): 100, | ||
kubetypes.MirrorPodUID(mirrorPod.UID): 200, | ||
}) | ||
m.syncBatch(true) | ||
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok { | ||
|
||
if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(testPod.UID)); ok { | ||
t.Errorf("Should have cleared status for testPod") | ||
} | ||
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; ok { | ||
if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(mirrorPod.UID)); ok { | ||
t.Errorf("Should have cleared status for mirrorPod") | ||
} | ||
|
||
|
@@ -1337,13 +1376,15 @@ func TestSyncBatchCleanupVersions(t *testing.T) { | |
staticPod.UID = "static-uid" | ||
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} | ||
m.podManager.(mutablePodManager).AddPod(staticPod) | ||
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 | ||
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 | ||
m.setApiStatusVersions(map[kubetypes.MirrorPodUID]uint64{ | ||
kubetypes.MirrorPodUID(testPod.UID): 100, | ||
kubetypes.MirrorPodUID(mirrorPod.UID): 200, | ||
}) | ||
m.testSyncBatch() | ||
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; !ok { | ||
if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(testPod.UID)); !ok { | ||
t.Errorf("Should not have cleared status for testPod") | ||
} | ||
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; !ok { | ||
if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(mirrorPod.UID)); !ok { | ||
t.Errorf("Should not have cleared status for mirrorPod") | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
syncPod
will cause multiple requests to the API server, but the kubelet's requests to the API server are limited by kube-api-qps and max-requests-inflight. If, in a large cluster, multiple pods are created at the same time and the kubelets on multiple nodes run
syncPod
in parallel at the same time, it will undoubtedly trigger throttling and affect the normal operation of the API server. This PR will bring unpredictable risks; I would prefer #123877.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your comment. The impact of GET operations on the API server and etcd is considerably less than that of LIST (listing all pods). Coupled with the apiserver's APF and QPS limits and the kubelet's QPS limits, this should suffice for the majority of scenarios.