-
Notifications
You must be signed in to change notification settings - Fork 13
/
server.py
637 lines (507 loc) · 22.2 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
from __future__ import annotations
import functools
import logging
import pathlib
import re
from typing import Any
import lsprotocol.types as lsp_types
import yaramod
from lsprotocol.types import INITIALIZED
from lsprotocol.types import TEXT_DOCUMENT_CODE_ACTION
from lsprotocol.types import TEXT_DOCUMENT_CODE_LENS
from lsprotocol.types import TEXT_DOCUMENT_COMPLETION
from lsprotocol.types import TEXT_DOCUMENT_DEFINITION
from lsprotocol.types import TEXT_DOCUMENT_DID_CHANGE
from lsprotocol.types import TEXT_DOCUMENT_DID_OPEN
from lsprotocol.types import TEXT_DOCUMENT_DID_SAVE
from lsprotocol.types import TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT
from lsprotocol.types import TEXT_DOCUMENT_DOCUMENT_SYMBOL
from lsprotocol.types import TEXT_DOCUMENT_FORMATTING
from lsprotocol.types import TEXT_DOCUMENT_HOVER
from lsprotocol.types import TEXT_DOCUMENT_REFERENCES
from lsprotocol.types import TEXT_DOCUMENT_SIGNATURE_HELP
from pygls.server import LanguageServer
from pygls.uris import from_fs_path
from yls import code_actions
from yls import icons
from yls import linting
from yls import utils
from yls.completer import Completer
from yls.hookspecs import ErrorMessage
from yls.hover import Hoverer
from yls.plugin_manager_provider import PluginManagerProvider
from yls.version import __version__
from yls.yaramod_provider import YaramodProvider
log = logging.getLogger(__name__)
utils.setup_logging()
# NOTE: All yaramod parsing can fail, create wrapper
# if we fail fallback to cached YaraFile (last valid)
class YaraLanguageServer(LanguageServer):
"""YaraLanguageServer implementation."""
COMMAND_SCAN = "yls.scan"
COMMAND_SCAN_ALL = "yls.scan_all"
COMMAND_EVAL_SET_CONTEXT = "yls.eval_set_context"
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
log.debug("[__init__] LanguageServer constructor")
# Yaramod parser provider
self.ymod = YaramodProvider.instance()
# Completion manager
self.completer = Completer(self)
# Hover manager
self.hoverer = Hoverer(self)
# Show debugging information on hover
self.debug_hover = False
# Lint the file when editing the document
self.did_change_lint = False
# Cached version of yara file used for completions
# NOTE: We need to create dict not only one variable
self.last_valid_yara_file: yaramod.YaraFile | None = None
self.register_plugin_commands()
def register_plugin_commands(self) -> None:
"""Initialize the custom commands from plugins."""
async def command_wrapper(command: str, ls: YaraLanguageServer, params: Any) -> Any:
"""Generic command hadler that delegates the custom commands to plugins."""
log.info(f"Executing registered {command=} from plugins with {params=}")
return await utils.pluggy_results(
PluginManagerProvider.instance().hook.yls_execute_command(
command=command, ls=ls, params=params
)
)
# Get the list of commands to register
commands_to_register = utils.flatten_list(
PluginManagerProvider.instance().hook.yls_supported_commands(ls=self)
)
for command in commands_to_register:
func = functools.partial(command_wrapper, command)
self.command(command)(func)
def get_current_rule(
self, uri: str, position: lsp_types.Position, yara_file: yaramod.YaraFile | None = None
) -> yaramod.Rule | None:
"""Get the last defined rule for given cursor position."""
doc_path = pathlib.Path(self.workspace.get_document(uri).path)
log.debug(f"[GET_CURRENT_RULE] Position={position}, Document={doc_path}")
parsed_yfile = yara_file or utils.yaramod_parse_file(str(doc_path))
if parsed_yfile is None:
return None
last_rule = None
for rule in parsed_yfile.rules:
if (
rule.location.file_path
and doc_path.exists()
and doc_path.samefile(rule.location.file_path)
and rule.location.begin.line > position.line
):
return last_rule
last_rule = rule
return last_rule
def get_references(
self, uri: str, word: str, cursor: lsp_types.Position | None = None
) -> list[lsp_types.Location]:
"""Return list of locations that textually match given word."""
text_doc = self.workspace.get_document(uri)
source = text_doc.source
res = []
for m in re.finditer(re.escape(word), source):
offset = m.start()
line_counter = 0
match_len = m.end() - m.start()
for line in source.splitlines(keepends=True):
line_len = len(line)
if offset >= line_len:
offset -= line_len
line_counter += 1
else:
# NOTE: Also we should check the character
if cursor and cursor.line == line_counter:
# Ignore this match
break
res.append(
lsp_types.Location(
uri=uri,
range=utils.range_from_coords(
(line_counter, offset), (line_counter, offset + match_len)
),
)
)
break
return res
def show_error_message(self, msg: str, log_msg: str | None = None) -> None:
"""Show error message to user (also log it)."""
log.error(f"[SHOW_ERROR_MESSAGE] {log_msg or msg}")
self.show_message(msg, lsp_types.MessageType.Error)
# The main YLS Server instance
SERVER = YaraLanguageServer("yara-language-server", f"v{__version__}")
@SERVER.feature(INITIALIZED)
def initiliazed(_ls: YaraLanguageServer, _params: Any) -> None:
"""Connection is initialized."""
utils.log_command(INITIALIZED)
log.debug("[INITIALIZED] Connection was established")
# NOTE: In the future we can parse all files in the workspace
@SERVER.feature(TEXT_DOCUMENT_COMPLETION, lsp_types.CompletionOptions(trigger_characters=["."]))
def completion(
ls: YaraLanguageServer, params: lsp_types.CompletionParams
) -> lsp_types.CompletionList:
"""Code completion."""
utils.log_command(TEXT_DOCUMENT_COMPLETION)
return ls.completer.complete(params)
@SERVER.feature(
TEXT_DOCUMENT_SIGNATURE_HELP, lsp_types.SignatureHelpOptions(trigger_characters=["("])
)
def signature_help(
ls: YaraLanguageServer, params: lsp_types.CompletionParams
) -> lsp_types.SignatureHelp | None:
"""Signature help."""
utils.log_command(TEXT_DOCUMENT_SIGNATURE_HELP)
return ls.completer.signature_help(params)
@SERVER.feature(TEXT_DOCUMENT_HOVER)
async def hover(
ls: YaraLanguageServer, params: lsp_types.TextDocumentPositionParams
) -> lsp_types.Hover | None:
"""Cursor over information."""
utils.log_command(TEXT_DOCUMENT_HOVER)
return await ls.hoverer.hover(params)
def lint(
ls: YaraLanguageServer,
params: (
lsp_types.DidOpenTextDocumentParams
| lsp_types.DidChangeTextDocumentParams
| lsp_types.DidSaveTextDocumentParams
),
) -> None:
"""Lint and publish diagnostics."""
# NOTE: DiagnosticRelatedInformation can be used to show the redefinition of a rule/string
# Represents a related message and source code location for a diagnostic. This should be
# used to point to code locations that cause or are related to a diagnostics, e.g when
# duplicating a symbol in a scope.
text_doc = ls.workspace.get_document(params.text_document.uri)
log.info(f'[LINT] Lint request start for file "{text_doc.path}"')
# Save the last valid yarafile, that can be used as a fallback in subsequent requests
yara_file = utils.yaramod_parse_file(text_doc.path)
if yara_file is not None:
ls.last_valid_yara_file = yara_file
diag = linting.get_diagnostics(text_doc)
ls.publish_diagnostics(text_doc.uri, diag)
@SERVER.feature(TEXT_DOCUMENT_DID_OPEN)
def did_open(ls: YaraLanguageServer, params: lsp_types.DidOpenTextDocumentParams) -> None:
"""The document was opened."""
utils.log_command(TEXT_DOCUMENT_DID_OPEN)
lint(ls, params)
@SERVER.feature(TEXT_DOCUMENT_DID_SAVE)
def did_save(ls: YaraLanguageServer, params: lsp_types.DidSaveTextDocumentParams) -> None:
"""The document was saved."""
utils.log_command(TEXT_DOCUMENT_DID_SAVE)
lint(ls, params)
@SERVER.feature(TEXT_DOCUMENT_DID_CHANGE)
def did_change(ls: YaraLanguageServer, params: lsp_types.DidChangeTextDocumentParams) -> None:
"""The document was changed."""
utils.log_command(TEXT_DOCUMENT_DID_CHANGE)
if ls.did_change_lint:
lint(ls, params)
@SERVER.feature(TEXT_DOCUMENT_FORMATTING)
def formatting(
ls: YaraLanguageServer, params: lsp_types.DocumentFormattingParams
) -> list[lsp_types.TextEdit]:
"""Format the whole buffer."""
utils.log_command(TEXT_DOCUMENT_FORMATTING)
document = ls.workspace.get_document(params.text_document.uri)
source = document.source
path_str = document.path
path = pathlib.Path(path_str)
# Check if the file exists also on the filesystem, not only in the editor buffer.
if not path.exists():
ls.show_message("Please save the file before formatting.", lsp_types.MessageType.Warning)
return []
# Open current file from the disk using `binary` mode in order to preserve
# original new-lines of the file.
with open(path_str, "rb") as disk_yfile:
disk_source = disk_yfile.read()
# Compare contents with the editor buffer converted to bytes, in case they
# are not the same the file was not saved to the disk and we should not proceed
if source.encode() != disk_source:
ls.show_message("Please save the file before formatting.", lsp_types.MessageType.Warning)
return []
res: list[lsp_types.TextEdit] = PluginManagerProvider.instance().hook.yls_formatting(
ls=ls, params=params, document=document
)
return res
@SERVER.feature(TEXT_DOCUMENT_DEFINITION)
def definition(
ls: YaraLanguageServer, params: lsp_types.TextDocumentPositionParams
) -> list[lsp_types.Location] | None:
"""Jump to definition."""
utils.log_command(TEXT_DOCUMENT_DEFINITION)
text_doc = ls.workspace.get_document(params.text_document.uri)
token = utils.cursor_token(text_doc, params.position)
if not token:
return None
res = []
log.debug(f'[DEFINITION] Cursor in on token "{token}" with type "{token.type}"')
if token.type == yaramod.TokenType.StringId:
log.debug(f'[DEFINITION] Searching for "{token.text}" string definition')
rule = ls.get_current_rule(params.text_document.uri, params.position)
if rule is None:
return None
log.debug('[DEFINITION] Current rule "{rule}" with name "{rule.name}"')
for string in rule.strings:
if string.identifier == token.pure_text:
res.append(
lsp_types.Location(
uri=params.text_document.uri,
range=utils.range_from_coords(
(string.location.begin.line - 1, string.location.begin.column - 1),
(string.location.end.line - 1, string.location.begin.column - 1),
),
)
)
# Since strings cannot be redefined in a single rule there is
# no reason to continue this loop and immediately return
break
elif token.type == yaramod.TokenType.Id:
log.debug(f'[DEFINITION] Searching for "{token.text}" rule definition')
path = ls.workspace.get_document(params.text_document.uri).path
yara_file = utils.yaramod_parse_file(path)
if yara_file is None:
return []
for rule in yara_file.rules:
if rule.name == token.pure_text:
res.append(
lsp_types.Location(
uri=from_fs_path(rule.location.file_path),
range=utils.range_from_coords(
(rule.location.begin.line - 1, rule.location.begin.column - 1),
(rule.location.end.line - 1, rule.location.end.column),
),
)
)
return res
@SERVER.feature(TEXT_DOCUMENT_REFERENCES)
def references(
ls: YaraLanguageServer, params: lsp_types.ReferenceParams
) -> list[lsp_types.Location]:
"""Provide a list of references for the object under the cursor."""
utils.log_command(TEXT_DOCUMENT_REFERENCES)
text_doc = ls.workspace.get_document(params.text_document.uri)
cursor_string = utils.cursor_word(text_doc, params.position)
log.debug(f'[REFERENCES] Searching for references of "{cursor_string}"')
if cursor_string:
return ls.get_references(params.text_document.uri, cursor_string, cursor=params.position)
return []
@SERVER.feature(TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT)
def document_highlight(
ls: YaraLanguageServer, params: lsp_types.TextDocumentPositionParams
) -> list[lsp_types.DocumentHighlight]:
"""Highlight references of the object under the cursor."""
utils.log_command(TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT)
text_doc = ls.workspace.get_document(params.text_document.uri)
token = utils.cursor_token(text_doc, params.position)
if not token:
return []
if token.type not in [
yaramod.TokenType.Id,
yaramod.TokenType.RuleName,
yaramod.TokenType.StringId,
yaramod.TokenType.StringIdAfterNewline,
]:
return []
refs = ls.get_references(params.text_document.uri, token.pure_text)
res = [
lsp_types.DocumentHighlight(
range=utils.range_from_coords(
(ref.range.start.line, ref.range.start.character),
(ref.range.end.line, ref.range.end.character),
)
)
for ref in refs
]
return res
# NOTE: For now unimplemented
# @SERVER.feature(WORKSPACE_SYMBOL)
# def workspace_symbol(
# ls: YaraLanguageServer, params: lsp_types.WorkspaceSymbolParams
# ) -> Optional[List[lsp_types.SymbolInformation]]:
# """Create list of symbols in the current workspace."""
# utils.log_command(WORKSPACE_SYMBOL)
# # NOTE: This needs to find all *.yar files in the workspace and provide information on them
# return None
@SERVER.feature(TEXT_DOCUMENT_DOCUMENT_SYMBOL)
def document_symbol(
ls: YaraLanguageServer, params: lsp_types.DocumentSymbolParams
) -> list[lsp_types.DocumentSymbol]:
"""Provide a list of symbols in the current document."""
utils.log_command(TEXT_DOCUMENT_DOCUMENT_SYMBOL)
text_doc = ls.workspace.get_document(params.text_document.uri)
res: list[lsp_types.DocumentSymbol] = []
parsed_yfile = utils.yaramod_parse_file(text_doc.path)
if parsed_yfile is None:
return []
for rule in utils.yaramod_rules_in_file(parsed_yfile):
# Construct the string information for this rule. This is necessary because yls is
# creating the whole document symbol hierarchy instead of flat list
rule_strings: list[lsp_types.DocumentSymbol] = []
for string in rule.strings:
rule_strings.append(
lsp_types.DocumentSymbol(
name=string.identifier,
detail=string.text,
kind=lsp_types.SymbolKind.String,
range=utils.range_from_coords(
(string.location.begin.line - 1, string.location.begin.column - 1),
(string.location.end.line, string.location.end.column),
),
selection_range=utils.range_from_coords(
(string.location.begin.line - 1, string.location.begin.column - 1),
(string.location.end.line, string.location.end.column),
),
)
)
res.append(
lsp_types.DocumentSymbol(
name=rule.name,
kind=lsp_types.SymbolKind.Class,
range=utils.range_from_coords(
(rule.location.begin.line - 1, rule.location.begin.column),
(rule.location.end.line - 1, rule.location.end.column),
),
selection_range=utils.range_from_coords(
(rule.location.begin.line - 1, rule.location.begin.column),
(rule.location.begin.line - 1, rule.location.begin.column),
),
children=rule_strings,
)
)
return res
@SERVER.feature(TEXT_DOCUMENT_CODE_ACTION)
def code_action(
ls: YaraLanguageServer, params: lsp_types.CodeActionParams
) -> list[lsp_types.Command | lsp_types.CodeAction] | None:
"""List available code actions for given context."""
utils.log_command(TEXT_DOCUMENT_CODE_ACTION)
# Provide hoverer service with selected text range context
ls.hoverer.selected_range = params.range
return code_actions.from_params(ls, params)
def code_lens_eval(yara_file: yaramod.YaraFile) -> list[lsp_types.CodeLens]:
"""Create evaluation code lenses from YaraFile."""
yls_eval_enabled = any(PluginManagerProvider.instance().hook.yls_eval_enabled())
if not yls_eval_enabled:
return []
res = []
for rule in utils.yaramod_rules_in_file(yara_file):
for meta in rule.metas:
# Consider only hash metas that have valid hash as value
if meta.key != "hash" or not utils.is_hash(meta.value.pure_text):
continue
# Create code lens for debugging
lens = lsp_types.CodeLens(
range=utils.range_from_coords(
(meta.token_key.location.begin.line - 1, meta.token_key.location.begin.column),
(meta.token_value.location.end.line - 1, meta.token_value.location.end.column),
),
command=lsp_types.Command(
title=f"{icons.SEARCH} Select hash for context",
command=YaraLanguageServer.COMMAND_EVAL_SET_CONTEXT,
arguments=[
meta.value.pure_text,
utils.extract_rule_context_from_yarafile(yara_file, rule),
],
),
)
res.append(lens)
return res
def code_lens_scan(yara_file: yaramod.YaraFile, uri: str) -> list[lsp_types.CodeLens]:
"""Create scannig code lenses from YaraFile."""
yls_scan_enabled = any(PluginManagerProvider.instance().hook.yls_scan_enabled())
if not yls_scan_enabled:
return []
res = []
for rule in utils.yaramod_rules_in_file(yara_file):
if not utils.yaramod_rule_has_hashes(rule):
continue
# Create code lens for scans
lens_scan = lsp_types.CodeLens(
range=utils.range_from_yaramod_token(rule.token_first),
command=lsp_types.Command(
title=f"{icons.PLAY} Scan",
command=YaraLanguageServer.COMMAND_SCAN,
arguments=[uri, rule.name],
),
)
res.append(lens_scan)
return res
@SERVER.feature(TEXT_DOCUMENT_CODE_LENS)
def code_lens(ls: YaraLanguageServer, params: lsp_types.CodeLensParams) -> list[lsp_types.CodeLens]:
utils.log_command(TEXT_DOCUMENT_CODE_LENS)
text_doc = ls.workspace.get_document(params.text_document.uri)
path = text_doc.path
yara_file = utils.yaramod_parse_file(path)
if yara_file is None:
return []
res = []
res.extend(
utils.flatten_list(
PluginManagerProvider.instance().hook.yls_code_lens(ls=ls, params=params)
)
)
res.extend(code_lens_eval(yara_file))
res.extend(code_lens_scan(yara_file, params.text_document.uri))
return res
@SERVER.command(YaraLanguageServer.COMMAND_SCAN)
async def command_scan(ls: YaraLanguageServer, args: list[Any]) -> None:
utils.log_command(YaraLanguageServer.COMMAND_SCAN)
log.debug(f"{args=}")
if len(args) != 2:
return
file_uri = args[0]
rule_name = args[1]
document = ls.workspace.get_document(file_uri)
scan_results = await utils.pluggy_results(
PluginManagerProvider.instance().hook.yls_scan(
ls=ls, document=document, rule_name=rule_name
)
)
for diagnostics in scan_results:
ls.publish_diagnostics(document.uri, diagnostics)
@SERVER.command(YaraLanguageServer.COMMAND_SCAN_ALL)
async def command_scan_all(ls: YaraLanguageServer, args: list[Any]) -> None:
utils.log_command(YaraLanguageServer.COMMAND_SCAN_ALL)
log.debug(f"{args=}")
if len(args) != 1:
return
file_uri = args[0]
document = ls.workspace.get_document(file_uri)
scan_results = await utils.pluggy_results(
PluginManagerProvider.instance().hook.yls_scan(ls=ls, document=document, rule_name=None)
)
for diagnostics in scan_results:
ls.publish_diagnostics(document.uri, diagnostics)
@SERVER.command(YaraLanguageServer.COMMAND_EVAL_SET_CONTEXT)
async def command_eval_set_context(ls: YaraLanguageServer, args: list[Any]) -> None:
utils.log_command(YaraLanguageServer.COMMAND_EVAL_SET_CONTEXT)
log.debug(f"{args=}")
if len(args) != 2 or not utils.is_hash(args[0]):
return
_hash = args[0]
ruleset = args[1]
res_set_contexts = await utils.pluggy_results(
PluginManagerProvider.instance().hook.yls_eval_set_context(
ls=ls, _hash=_hash, ruleset=ruleset
)
)
res_success = next((res for res in res_set_contexts if not isinstance(res, ErrorMessage)), None)
if res_success:
res_success.show(ls)
else:
for i, res_set_context in enumerate(res_set_contexts):
if len(res_set_contexts) > 1:
res_set_context.message = f"{utils.DEBUGGER_SOURCES[i]}: {res_set_context.message}"
log.debug(res_set_context.message)
res_set_context.show(ls)
def main() -> None:
"""YLS entry point."""
# Parse command line arguments
args = utils.create_options_parser().parse_args()
utils.set_logging_level(args.verbose)
utils.logging_prolog(PluginManagerProvider.instance())
SERVER.start_io()
if __name__ == "__main__":
main()