Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrency: add ConcurrencyManager test for lock promotion deadlocks #123209

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 21 additions & 10 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ import (
// debug-set-discovered-locks-threshold-to-consult-txn-status-cache n=<count>
// debug-set-batch-pushed-lock-resolution-enabled ok=<enabled>
// debug-set-max-locks n=<count>
// reset
// reset [namespace|force]
func TestConcurrencyManagerBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -634,7 +634,13 @@ func TestConcurrencyManagerBasic(t *testing.T) {

case "reset":
if n := mon.numMonitored(); n > 0 {
d.Fatalf(t, "%d requests still in flight", n)
if d.HasArg("force") {
for gs := range mon.gs {
gs.ctxCancel()
}
} else {
d.Fatalf(t, "%d requests still in flight", n)
}
}
mon.resetSeqNums()
if err := c.reset(); err != nil {
Expand Down Expand Up @@ -1113,6 +1119,7 @@ type monitoredGoroutine struct {
finished int32

ctx context.Context
ctxCancel func()
collect func() tracingpb.Recording
cancel func()
prevEvents int
Expand All @@ -1128,11 +1135,13 @@ func newMonitor() *monitor {
}

func (m *monitor) runSync(opName string, fn func(context.Context)) {
ctx, sp := m.tr.StartSpanCtx(context.Background(), opName, tracing.WithRecording(tracingpb.RecordingVerbose))
ctx, ctxCancel := context.WithCancel(context.Background())
ctx, sp := m.tr.StartSpanCtx(ctx, opName, tracing.WithRecording(tracingpb.RecordingVerbose))
g := &monitoredGoroutine{
opSeq: 0, // synchronous
opName: opName,
ctx: ctx,
opSeq: 0, // synchronous
opName: opName,
ctx: ctx,
ctxCancel: ctxCancel,
collect: func() tracingpb.Recording {
return sp.GetConfiguredRecording()
},
Expand All @@ -1145,11 +1154,13 @@ func (m *monitor) runSync(opName string, fn func(context.Context)) {

func (m *monitor) runAsync(opName string, fn func(context.Context)) (cancel func()) {
m.seq++
ctx, sp := m.tr.StartSpanCtx(context.Background(), opName, tracing.WithRecording(tracingpb.RecordingVerbose))
ctx, ctxCancel := context.WithCancel(context.Background())
ctx, sp := m.tr.StartSpanCtx(ctx, opName, tracing.WithRecording(tracingpb.RecordingVerbose))
g := &monitoredGoroutine{
opSeq: m.seq,
opName: opName,
ctx: ctx,
opSeq: m.seq,
opName: opName,
ctx: ctx,
ctxCancel: ctxCancel,
collect: func() tracingpb.Recording {
return sp.GetConfiguredRecording()
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1330,3 +1330,99 @@ finish req=req5

reset namespace
----

# -----------------------------------------------------------------------------
# Deadlock test where the deadlock is created by transactions promoting shared
# locks to something stronger.
# -----------------------------------------------------------------------------

new-txn name=txn1 ts=10,1 epoch=0
----

new-txn name=txn2 ts=10,1 epoch=0
----

new-request name=req1 txn=txn1 ts=10,1
get key=a str=shared
----

sequence req=req1
----
[1] sequence req1: sequencing request
[1] sequence req1: acquiring latches
[1] sequence req1: scanning lock table for conflicting locks
[1] sequence req1: sequencing complete, returned guard

on-lock-acquired req=req1 key=a dur=u str=shared
----
[-] acquire lock: txn 00000001 @ ‹a›

finish req=req1
----
[-] finish req1: finishing request

new-request name=req2 txn=txn2 ts=10,1
get key=a str=shared
----

sequence req=req2
----
[2] sequence req2: sequencing request
[2] sequence req2: acquiring latches
[2] sequence req2: scanning lock table for conflicting locks
[2] sequence req2: sequencing complete, returned guard

on-lock-acquired req=req2 key=a dur=u str=shared
----
[-] acquire lock: txn 00000002 @ ‹a›

finish req=req2
----
[-] finish req2: finishing request

new-request name=req3 txn=txn1 ts=10,1
get key=a str=exclusive
----

sequence req=req3
----
[3] sequence req3: sequencing request
[3] sequence req3: acquiring latches
[3] sequence req3: scanning lock table for conflicting locks
[3] sequence req3: waiting in lock wait-queues
[3] sequence req3: lock wait-queue event: wait for txn 00000002 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0)
[3] sequence req3: pushing after 0s for: deadlock/liveness detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false
[3] sequence req3: pushing txn 00000002 to abort
[3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction


new-request name=req4 txn=txn2 ts=10,1
get key=a str=exclusive
----

debug-lock-table
----
num=1
lock: "a"
holders: txn: 00000001-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]
txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]
queued locking requests:
active: true req: 43 promoting: true, strength: Exclusive, txn: 00000001-0000-0000-0000-000000000000

sequence req=req4
----
[3] sequence req3: dependency cycle detected 00000001->00000002->00000001
[4] sequence req4: sequencing request
[4] sequence req4: acquiring latches
[4] sequence req4: scanning lock table for conflicting locks
[4] sequence req4: waiting in lock wait-queues
[4] sequence req4: lock wait-queue event: wait for txn 00000001 holding lock @ key ‹"a"› (queuedLockingRequests: 2, queuedReaders: 0)
[4] sequence req4: pushing after 0s for: deadlock/liveness detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false
[4] sequence req4: pushing txn 00000001 to abort
[4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction
[4] sequence req4: dependency cycle detected 00000002->00000001->00000002

# Unfortunately, we can't break the deadlock and get deterministic behaviour
# here, so use `reset force` to cancel the stuck requests.

reset force
----