Skip to content

Commit

Permalink
Improvements in threading model in dap use case.
Browse files Browse the repository at this point in the history
  • Loading branch information
fabioz committed Feb 18, 2024
1 parent 262e341 commit 08cafa3
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 36 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/pydevd-tests-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,17 @@ jobs:
pip install psutil --no-warn-script-location
pip install ipython --no-warn-script-location
pip install untangle --no-warn-script-location
pip install "django<=4.2" --no-warn-script-location
- name: Install Python 3.x deps
if: contains(matrix.name, 'py3') && !contains(matrix.name, 'pypy') && !contains(matrix.name, 'py312') && !contains(matrix.name, 'py311')
run: |
pip install PySide2 --no-warn-script-location
pip install cherrypy --no-warn-script-location
pip install gevent greenlet
- name: Install django
if: "!contains(matrix.name, 'py38')"
run: pip install "django<=4.2" --no-warn-script-location

- name: Install Pandas
if: contains(matrix.name, 'py310') && !contains(matrix.name, 'pypy')
# The pandas Styler also requires jinja2.
Expand Down
3 changes: 3 additions & 0 deletions _pydevd_bundle/pydevd_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ def start_server(port):

try:
s.listen(1)
# Let the user know it's halted waiting for the connection.
pydev_log.critical("pydevd: waiting for connection at: %s:%s\n", *s.getsockname())

new_socket, _addr = s.accept()
pydev_log.info("Connection accepted")
# closing server socket is not necessary but we don't need it
Expand Down
60 changes: 55 additions & 5 deletions _pydevd_bundle/pydevd_net_command_factory_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
CMD_SET_NEXT_STATEMENT, CMD_THREAD_SUSPEND_SINGLE_NOTIFICATION, \
CMD_THREAD_RESUME_SINGLE_NOTIFICATION, CMD_THREAD_KILL, CMD_STOP_ON_START, CMD_INPUT_REQUESTED, \
CMD_EXIT, CMD_STEP_INTO_COROUTINE, CMD_STEP_RETURN_MY_CODE, CMD_SMART_STEP_INTO, \
CMD_SET_FUNCTION_BREAK
CMD_SET_FUNCTION_BREAK, CMD_THREAD_RUN
from _pydevd_bundle.pydevd_constants import get_thread_id, ForkSafeLock, DebugInfoHolder
from _pydevd_bundle.pydevd_net_command import NetCommand, NULL_NET_COMMAND
from _pydevd_bundle.pydevd_net_command_factory_xml import NetCommandFactory
Expand Down Expand Up @@ -435,12 +435,62 @@ def after_send(socket):
return cmd

@overrides(NetCommandFactory.make_thread_suspend_message)
def make_thread_suspend_message(self, *args, **kwargs):
return NULL_NET_COMMAND # Not a part of the debug adapter protocol
def make_thread_suspend_message(self, py_db, thread_id, frames_list, stop_reason, message, trace_suspend_type, thread, info):
from _pydevd_bundle.pydevd_comm_constants import CMD_THREAD_SUSPEND
if py_db.multi_threads_single_notification:
pydev_log.debug("Skipping per-thread thread suspend notification.")
return NULL_NET_COMMAND # Don't send per-thread, send a single one.
pydev_log.debug("Sending per-thread thread suspend notification (stop_reason: %s)", stop_reason)

exc_desc = None
exc_name = None
preserve_focus_hint = False
if stop_reason in self._STEP_REASONS:
if info.pydev_original_step_cmd == CMD_STOP_ON_START:

# Just to make sure that's not set as the original reason anymore.
info.pydev_original_step_cmd = -1
stop_reason = 'entry'
else:
stop_reason = 'step'
elif stop_reason in self._EXCEPTION_REASONS:
stop_reason = 'exception'
elif stop_reason == CMD_SET_BREAK:
stop_reason = 'breakpoint'
elif stop_reason == CMD_SET_FUNCTION_BREAK:
stop_reason = 'function breakpoint'
elif stop_reason == CMD_SET_NEXT_STATEMENT:
stop_reason = 'goto'
else:
stop_reason = 'pause'
preserve_focus_hint = True

if stop_reason == 'exception':
exception_info_response = build_exception_info_response(
py_db, thread_id, thread, -1, set_additional_thread_info, self._iter_visible_frames_info, max_frames=-1)
exception_info_response

exc_name = exception_info_response.body.exceptionId
exc_desc = exception_info_response.body.description

body = pydevd_schema.StoppedEventBody(
reason=stop_reason,
description=exc_desc,
threadId=thread_id,
text=exc_name,
allThreadsStopped=False,
preserveFocusHint=preserve_focus_hint,
)
event = pydevd_schema.StoppedEvent(body)
return NetCommand(CMD_THREAD_SUSPEND, 0, event, is_json=True)

@overrides(NetCommandFactory.make_thread_run_message)
def make_thread_run_message(self, *args, **kwargs):
return NULL_NET_COMMAND # Not a part of the debug adapter protocol
def make_thread_run_message(self, py_db, thread_id, reason):
if py_db.multi_threads_single_notification:
return NULL_NET_COMMAND # Don't send per-thread, send a single one.
body = ContinuedEventBody(threadId=thread_id, allThreadsContinued=False)
event = pydevd_schema.ContinuedEvent(body)
return NetCommand(CMD_THREAD_RUN, 0, event, is_json=True)

@overrides(NetCommandFactory.make_reloaded_code_message)
def make_reloaded_code_message(self, *args, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions _pydevd_bundle/pydevd_net_command_factory_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def make_thread_suspend_str(

return ''.join(cmd_text_list), thread_stack_str

def make_thread_suspend_message(self, py_db, thread_id, frames_list, stop_reason, message, trace_suspend_type):
def make_thread_suspend_message(self, py_db, thread_id, frames_list, stop_reason, message, trace_suspend_type, thread, additional_info):
try:
thread_suspend_str, thread_stack_str = self.make_thread_suspend_str(
py_db, thread_id, frames_list, stop_reason, message, trace_suspend_type)
Expand All @@ -307,7 +307,7 @@ def make_thread_resume_single_notification(self, thread_id):
except:
return self.make_error_message(0, get_exception_traceback_str())

def make_thread_run_message(self, thread_id, reason):
def make_thread_run_message(self, py_db, thread_id, reason):
try:
return NetCommand(CMD_THREAD_RUN, 0, "%s\t%s" % (thread_id, reason))
except:
Expand Down
25 changes: 17 additions & 8 deletions _pydevd_bundle/pydevd_process_net_command_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,12 +541,21 @@ def on_resumed():
cmd = NetCommand(CMD_RETURN, 0, response, is_json=True)
py_db.writer.add_command(cmd)

# Only send resumed notification when it has actually resumed!
# (otherwise the user could send a continue, receive the notification and then
# request a new pause which would be paused without sending any notification as
# it didn't really run in the first place).
py_db.threads_suspended_single_notification.add_on_resumed_callback(on_resumed)
self.api.request_resume_thread(thread_id)
if py_db.multi_threads_single_notification:

# Only send resumed notification when it has actually resumed!
# (otherwise the user could send a continue, receive the notification and then
# request a new pause which would be paused without sending any notification as
# it didn't really run in the first place).
py_db.threads_suspended_single_notification.add_on_resumed_callback(on_resumed)
self.api.request_resume_thread(thread_id)
else:
# Only send resumed notification when it has actually resumed!
# (otherwise the user could send a continue, receive the notification and then
# request a new pause which would be paused without sending any notification as
# it didn't really run in the first place).
self.api.request_resume_thread(thread_id)
on_resumed()

def on_next_request(self, py_db, request):
'''
Expand Down Expand Up @@ -735,7 +744,7 @@ def on_setfunctionbreakpoints_request(self, py_db, request):

arguments = request.arguments # : :type arguments: SetFunctionBreakpointsArguments
function_breakpoints = []
suspend_policy = 'ALL'
suspend_policy = 'ALL' if py_db.multi_threads_single_notification else 'NONE'

# Not currently covered by the DAP.
is_logpoint = False
Expand Down Expand Up @@ -775,7 +784,7 @@ def on_setbreakpoints_request(self, py_db, request):
self.api.remove_all_breakpoints(py_db, filename)

btype = 'python-line'
suspend_policy = 'ALL'
suspend_policy = 'ALL' if py_db.multi_threads_single_notification else 'NONE'

if not filename.lower().endswith('.py'): # Note: check based on original file, not mapping.
if self._options.django_debug:
Expand Down
6 changes: 3 additions & 3 deletions _pydevd_bundle/pydevd_suspended_frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def __init__(self, suspended_frames_manager, py_db):
self._untracked = False

# We need to be thread-safe!
self._lock = ForkSafeLock()
self._lock = ForkSafeLock(rlock=True)

self._variable_reference_to_variable = {}

Expand Down Expand Up @@ -433,13 +433,13 @@ def find_frame(self, thread_id, frame_id):
with self._lock:
return self._frame_id_to_frame.get(frame_id)

def create_thread_suspend_command(self, thread_id, stop_reason, message, trace_suspend_type):
def create_thread_suspend_command(self, thread_id, stop_reason, message, trace_suspend_type, thread, additional_info):
with self._lock:
# First one is topmost frame suspended.
frames_list = self._thread_id_to_frames_list[thread_id]

cmd = self.py_db.cmd_factory.make_thread_suspend_message(
self.py_db, thread_id, frames_list, stop_reason, message, trace_suspend_type)
self.py_db, thread_id, frames_list, stop_reason, message, trace_suspend_type, thread, additional_info)

frames_list = None
return cmd
Expand Down
12 changes: 7 additions & 5 deletions pydevd.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,11 @@ def send_suspend_notification(self, thread_id, thread, stop_reason):
@contextmanager
def notify_thread_suspended(self, thread_id, thread, stop_reason):
if self.multi_threads_single_notification:
pydev_log.info('Thread suspend mode: single notification')
with AbstractSingleNotificationBehavior.notify_thread_suspended(self, thread_id, thread, stop_reason):
yield
else:
pydev_log.info('Thread suspend mode: NOT single notification')
yield


Expand Down Expand Up @@ -2096,7 +2098,7 @@ def do_wait_suspend(self, thread, frame, event, arg, exception_type=None): # @U

with self.suspended_frames_manager.track_frames(self) as frames_tracker:
frames_tracker.track(thread_id, frames_list)
cmd = frames_tracker.create_thread_suspend_command(thread_id, stop_reason, message, trace_suspend_type)
cmd = frames_tracker.create_thread_suspend_command(thread_id, stop_reason, message, trace_suspend_type, thread, thread.additional_info)
self.writer.add_command(cmd)

with CustomFramesContainer.custom_frames_lock: # @UndefinedVariable
Expand All @@ -2111,7 +2113,7 @@ def do_wait_suspend(self, thread, frame, event, arg, exception_type=None): # @U
frame_custom_thread_id, custom_frame.name))

self.writer.add_command(
frames_tracker.create_thread_suspend_command(frame_custom_thread_id, CMD_THREAD_SUSPEND, "", trace_suspend_type))
frames_tracker.create_thread_suspend_command(frame_custom_thread_id, CMD_THREAD_SUSPEND, "", trace_suspend_type, thread, thread.additional_info))

from_this_thread.append(frame_custom_thread_id)

Expand Down Expand Up @@ -2228,7 +2230,7 @@ def _do_wait_suspend(self, thread, frame, event, arg, trace_suspend_type, from_t
if stop:
# Uninstall the current frames tracker before running it.
frames_tracker.untrack_all()
cmd = self.cmd_factory.make_thread_run_message(get_current_thread_id(thread), info.pydev_step_cmd)
cmd = self.cmd_factory.make_thread_run_message(self, get_current_thread_id(thread), info.pydev_step_cmd)
self.writer.add_command(cmd)
info.pydev_state = STATE_SUSPEND
thread.stop_reason = CMD_SET_NEXT_STATEMENT
Expand Down Expand Up @@ -2287,7 +2289,7 @@ def _do_wait_suspend(self, thread, frame, event, arg, trace_suspend_type, from_t
del f

del frame
cmd = self.cmd_factory.make_thread_run_message(get_current_thread_id(thread), info.pydev_step_cmd)
cmd = self.cmd_factory.make_thread_run_message(self, get_current_thread_id(thread), info.pydev_step_cmd)
self.writer.add_command(cmd)

with CustomFramesContainer.custom_frames_lock:
Expand Down Expand Up @@ -2862,7 +2864,7 @@ def settrace(
port=5678,
suspend=True,
trace_only_current_thread=False,
overwrite_prev_trace=False,
overwrite_prev_trace=False, # Deprecated
patch_multiprocessing=False,
stop_at_frame=None,
block_until_connected=True,
Expand Down
65 changes: 53 additions & 12 deletions tests_python/test_debugger_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
ExceptionOptions, Response, StoppedEvent, ContinuedEvent, ProcessEvent, InitializeRequest,
InitializeRequestArguments, TerminateArguments, TerminateRequest, TerminatedEvent,
FunctionBreakpoint, SetFunctionBreakpointsRequest, SetFunctionBreakpointsArguments,
BreakpointEvent, InitializedEvent)
BreakpointEvent, InitializedEvent, ContinueResponse)
from _pydevd_bundle.pydevd_comm_constants import file_system_encoding
from _pydevd_bundle.pydevd_constants import (int_types, IS_64BIT_PROCESS,
PY_VERSION_STR, PY_IMPL_VERSION_STR, PY_IMPL_NAME, IS_PY36_OR_GREATER,
Expand Down Expand Up @@ -85,7 +85,7 @@ def accept_json_message(msg):
msg = self.writer.wait_for_message(accept_json_message, unquote_msg=False, expect_xml=False)
return from_json(msg)

def wait_for_response(self, request, response_class=None):
def build_accept_response(self, request, response_class=None):
if response_class is None:
response_class = pydevd_base_schema.get_response_class(request)

Expand All @@ -98,7 +98,11 @@ def accept_message(response):
return True
return False

return self.wait_for_json_message((response_class, Response), accept_message)
return (response_class, Response), accept_message

def wait_for_response(self, request, response_class=None):
expected_classes, accept_message = self.build_accept_response(request, response_class)
return self.wait_for_json_message(expected_classes, accept_message)

def write_request(self, request):
seq = self.writer.next_seq()
Expand Down Expand Up @@ -337,19 +341,38 @@ def pop_variables_reference(self, lst):
references.append(reference)
return references

def wait_for_continued_event(self):
assert self.wait_for_json_message(ContinuedEvent).body.allThreadsContinued
def wait_for_continued_event(self, all_threads_continued=True):
ev = self.wait_for_json_message(ContinuedEvent)
assert ev.body.allThreadsContinued == all_threads_continued

def _by_type(self, *msgs):
ret = {}
for msg in msgs:
assert msg.__class__ not in ret
ret[msg.__class__] = msg
return ret

def write_continue(self, wait_for_response=True):
def write_continue(self, wait_for_response=True, thread_id='*'):
continue_request = self.write_request(
pydevd_schema.ContinueRequest(pydevd_schema.ContinueArguments('*')))
pydevd_schema.ContinueRequest(pydevd_schema.ContinueArguments(threadId=thread_id)))

if wait_for_response:
# The continued event is received before the response.
self.wait_for_continued_event()

continue_response = self.wait_for_response(continue_request)
assert continue_response.body.allThreadsContinued
if thread_id != '*':
# event, response may be sent in any order
msg1 = self.wait_for_json_message((ContinuedEvent, ContinueResponse))
msg2 = self.wait_for_json_message((ContinuedEvent, ContinueResponse))
by_type = self._by_type(msg1, msg2)
continued_ev = by_type[ContinuedEvent]
continue_response = by_type[ContinueResponse]
assert continue_response.request_seq == continue_request.seq

assert continued_ev.body.allThreadsContinued == False
assert continue_response.body.allThreadsContinued == False
else:
# The continued event is received before the response.
self.wait_for_continued_event(all_threads_continued=True)
continue_response = self.wait_for_response(continue_request)
assert continue_response.body.allThreadsContinued

def write_pause(self):
pause_request = self.write_request(
Expand Down Expand Up @@ -723,6 +746,24 @@ def test_case_json_change_breaks(case_setup_dap):
writer.finished_ok = True


def test_case_json_suspend_notification(case_setup_dap):
with case_setup_dap.test_file('_debugger_case_change_breaks.py') as writer:
json_facade = JsonFacade(writer)
json_facade.writer.write_multi_threads_single_notification(False)
break1_line = writer.get_line_index_with_content('break 1')
json_facade.write_launch()
json_facade.write_set_breakpoints(break1_line)
json_facade.write_make_initial_run()

json_hit = json_facade.wait_for_thread_stopped(line=break1_line)
json_facade.write_continue(thread_id=json_hit.thread_id)

json_hit = json_facade.wait_for_thread_stopped(line=break1_line)
json_facade.write_continue(thread_id=json_hit.thread_id, wait_for_response=False)

writer.finished_ok = True


def test_case_handled_exception_no_break_on_generator(case_setup_dap):
with case_setup_dap.test_file('_debugger_case_ignore_exceptions.py') as writer:
json_facade = JsonFacade(writer)
Expand Down

0 comments on commit 08cafa3

Please sign in to comment.