Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubelet synchronizes pod states in a parallel manner. #124596

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 50 additions & 9 deletions pkg/kubelet/status/status_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type manager struct {
podStatusesLock sync.RWMutex
podStatusChannel chan struct{}
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
// apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
podDeletionSafety PodDeletionSafetyProvider
apiStatusVersions map[kubetypes.MirrorPodUID]uint64
apiStatusVersionsLock sync.RWMutex
podDeletionSafety PodDeletionSafetyProvider

podStartupLatencyHelper PodStartupLatencyStateHelper
// state allows to save/restore pod resource allocation and tolerate kubelet restarts.
Expand Down Expand Up @@ -761,6 +761,40 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
}
}

// deleteApiStatusVersion drops the recorded API status version for the given
// mirror pod UID, guarding the apiStatusVersions map with its write lock.
func (m *manager) deleteApiStatusVersion(uid kubetypes.MirrorPodUID) {
	m.apiStatusVersionsLock.Lock()
	delete(m.apiStatusVersions, uid)
	m.apiStatusVersionsLock.Unlock()
}

// getApiStatusVersion returns the latest API status version recorded for the
// given mirror pod UID and whether an entry exists, under a read lock.
// On a miss the map index already yields the zero value (0, false), so no
// separate fallback branch is needed.
func (m *manager) getApiStatusVersion(uid kubetypes.MirrorPodUID) (uint64, bool) {
	m.apiStatusVersionsLock.RLock()
	defer m.apiStatusVersionsLock.RUnlock()
	version, ok := m.apiStatusVersions[uid]
	return version, ok
}

// setApiStatusVersion records version as the latest API status version sent
// for the given mirror pod UID, guarding the map with its write lock.
func (m *manager) setApiStatusVersion(uid kubetypes.MirrorPodUID, version uint64) {
	m.apiStatusVersionsLock.Lock()
	m.apiStatusVersions[uid] = version
	m.apiStatusVersionsLock.Unlock()
}

// setApiStatusVersions stores every (uid, version) pair from statusVersions
// into apiStatusVersions while holding the write lock for the whole batch,
// so the update appears atomic to other accessors of the map.
func (m *manager) setApiStatusVersions(statusVersions map[kubetypes.MirrorPodUID]uint64) {
	m.apiStatusVersionsLock.Lock()
	defer m.apiStatusVersionsLock.Unlock()
	for uid, v := range statusVersions {
		m.apiStatusVersions[uid] = v
	}
}

// syncBatch syncs pods statuses with the apiserver. Returns the number of syncs
// attempted for testing.
func (m *manager) syncBatch(all bool) int {
Expand All @@ -782,7 +816,7 @@ func (m *manager) syncBatch(all bool) int {
_, hasPod := m.podStatuses[types.UID(uid)]
_, hasMirror := mirrorToPod[uid]
if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
m.deleteApiStatusVersion(uid)
}
}
}
Expand All @@ -806,7 +840,8 @@ func (m *manager) syncBatch(all bool) int {
// if a new status update has been delivered, trigger an update, otherwise the
// pod can wait for the next bulk check (which performs reconciliation as well)
if !all {
if m.apiStatusVersions[uidOfStatus] >= status.version {
statusVersion, _ := m.getApiStatusVersion(uidOfStatus)
if statusVersion >= status.version {
continue
}
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
Expand All @@ -823,16 +858,22 @@ func (m *manager) syncBatch(all bool) int {
// In most cases the deleted apiStatusVersions here should be filled
// soon after the following syncPod() [If the syncPod() sync an update
// successfully].
delete(m.apiStatusVersions, uidOfStatus)
m.deleteApiStatusVersion(uidOfStatus)
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
}
}
}()

wg := sync.WaitGroup{}
wg.Add(len(updatedStatuses))
for _, update := range updatedStatuses {
klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
m.syncPod(update.podUID, update.status)
go func() {
defer wg.Done()
m.syncPod(update.podUID, update.status)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syncPod will cause multiple requests to the apiserver, but the kubelet's requests to the apiserver are limited by kube-api-qps and max-requests-inflight.

And if there is a large cluster, multiple pods are created at the same time, and the kubelet of multiple nodes runs syncPod in parallel at the same time, it will undoubtedly trigger throttling and affect the normal work of Apiserver.

This PR will bring unpredictable risks, I may prefer #123877

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if there is a large cluster, multiple pods are created at the same time, and the kubelet of multiple nodes runs syncPod in parallel at the same time, it will undoubtedly trigger throttling and affect the normal work of Apiserver.

This PR will bring unpredictable risks, I may prefer #123877

Thanks for your comment. The impact of GET operations on the API server and etcd is considerably less than that of LIST (which lists all pods). Coupled with the apiserver's APF and QPS limits, and the kubelet's own QPS limits, this should suffice for the majority of scenarios.

}()
}
wg.Wait()

return len(updatedStatuses)
}
Expand Down Expand Up @@ -894,7 +935,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
metrics.PodStatusSyncDuration.Observe(duration.Seconds())
}

m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
m.setApiStatusVersion(kubetypes.MirrorPodUID(pod.UID), status.version)

// We don't handle graceful deletion of mirror pods.
if m.canBeDeleted(pod, status.status, status.podIsFinished) {
Expand All @@ -917,7 +958,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
// needsUpdate returns whether the status is stale for the given pod UID.
// This method is not thread safe, and must only be accessed by the sync thread.
func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
latest, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(uid))
if !ok || latest < status.version {
return true
}
Expand Down
59 changes: 50 additions & 9 deletions pkg/kubelet/status/status_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -71,6 +72,26 @@ func getTestPod() *v1.Pod {
}
}

// getTestPods builds n distinct test pods in the "new" namespace, named
// "foo0".."foo{n-1}", each with a UID equal to its decimal index. Returns a
// nil slice when n <= 0, matching the zero-value behavior of append.
func getTestPods(n int) []v1.Pod {
	var pods []v1.Pod
	for i := 0; i < n; i++ {
		idx := strconv.Itoa(i)
		pods = append(pods, v1.Pod{
			TypeMeta: metav1.TypeMeta{
				Kind:       "Pod",
				APIVersion: "v1",
			},
			ObjectMeta: metav1.ObjectMeta{
				UID:       types.UID(idx),
				Name:      "foo" + idx,
				Namespace: "new",
			},
		})
	}
	return pods
}

// After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation
// will be triggered, which will mess up all the old unit test.
// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
Expand Down Expand Up @@ -426,7 +447,9 @@ func TestStaleUpdates(t *testing.T) {

t.Log("... even if it's stale as long as nothing changes")
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1

statusVersion, _ := m.getApiStatusVersion(mirrorPodUID)
m.setApiStatusVersion(mirrorPodUID, statusVersion-1)

m.SetPodStatus(pod, status)
m.syncBatch(true)
Expand All @@ -436,6 +459,19 @@ func TestStaleUpdates(t *testing.T) {
verifyUpdates(t, m, 0)
}

func TestManyPodsUpdates(t *testing.T) {
const podNums = 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhifei92 FWIW In my environment(Ubuntu 22.04 on Intel(R) Core(TM) i9-7920X CPU @ 2.90GHz 24 cores) the change slows down the test:
master:

kubernetes (master) $ make test WHAT=./pkg/kubelet/status GOFLAGS="-v" KUBE_TEST_ARGS='-count 1 -run ^TestManyPodsUpdates$'
+++ [0508 14:06:19] Set GOMAXPROCS automatically to 24
+++ [0508 14:06:19] Running tests without code coverage and with -race
=== RUN   TestManyPodsUpdates
--- PASS: TestManyPodsUpdates (0.05s)
PASS
ok  	k8s.io/kubernetes/pkg/kubelet/status	1.113s

this PR:

kubernetes (124596) $ make test WHAT=./pkg/kubelet/status GOFLAGS="-v" KUBE_TEST_ARGS='-count 1 -run ^TestManyPodsUpdates$'
+++ [0508 14:03:41] Set GOMAXPROCS automatically to 24
+++ [0508 14:03:42] Running tests without code coverage and with -race
=== RUN   TestManyPodsUpdates
--- PASS: TestManyPodsUpdates (0.13s)
PASS
ok  	k8s.io/kubernetes/pkg/kubelet/status	1.231s

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Yes, that's correct, especially if we're not simulating any latency scenarios, where the extra overhead of locks does become a factor. However, considering there is inherent network latency in the communication between the kubelet and the kube-apiserver, we need to emulate a delay of 10 to 20 milliseconds within the syncPod function to truly demonstrate the value of parallel processing."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like provided unit test is not the best way to demonstrate benefits of the change. How about providing e2e test?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The e2e test should be better. I'll try it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bart0sh Using the spec latency/resource should be within limit when create 90 pods with 0s interval should achieve the objective; if feasible, consider employing tc tc qdisc add dev lo root netem delay 20ms to simulate network latency.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it would be better to have it reproducible in an e2e (not e2e_node) test case(s). This would allow us to avoid simulation and see a real life delays.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I had mistakenly believed that an e2e_node test case was needed. I need to carefully contemplate how to cover this scenario using an end-to-end (e2e) test case. As I am not yet well-versed in e2e testing, it might take me some additional time to figure this out.

pods := getTestPods(podNums)
client := fake.NewSimpleClientset(&v1.PodList{Items: pods})
m := newTestManager(client)

for i := 0; i < podNums; i++ {
m.SetPodStatus(&pods[i], getRandomPodStatus())
}

verifyUpdates(t, m, podNums)
}

// shuffle returns a new shuffled list of container statuses.
func shuffle(statuses []v1.ContainerStatus) []v1.ContainerStatus {
numStatuses := len(statuses)
Expand Down Expand Up @@ -1320,13 +1356,16 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
}

t.Logf("Orphaned pods should be removed.")
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.setApiStatusVersions(map[kubetypes.MirrorPodUID]uint64{
kubetypes.MirrorPodUID(testPod.UID): 100,
kubetypes.MirrorPodUID(mirrorPod.UID): 200,
})
m.syncBatch(true)
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok {

if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(testPod.UID)); ok {
t.Errorf("Should have cleared status for testPod")
}
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; ok {
if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(mirrorPod.UID)); ok {
t.Errorf("Should have cleared status for mirrorPod")
}

Expand All @@ -1337,13 +1376,15 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
staticPod.UID = "static-uid"
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
m.podManager.(mutablePodManager).AddPod(staticPod)
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.setApiStatusVersions(map[kubetypes.MirrorPodUID]uint64{
kubetypes.MirrorPodUID(testPod.UID): 100,
kubetypes.MirrorPodUID(mirrorPod.UID): 200,
})
m.testSyncBatch()
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; !ok {
if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(testPod.UID)); !ok {
t.Errorf("Should not have cleared status for testPod")
}
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; !ok {
if _, ok := m.getApiStatusVersion(kubetypes.MirrorPodUID(mirrorPod.UID)); !ok {
t.Errorf("Should not have cleared status for mirrorPod")
}
}
Expand Down