/
line_data.py
222 lines (189 loc) · 9.16 KB
/
line_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import contextlib
import re
from typing import Any, Dict, Optional, Tuple
from credsweeper.config import Config
from credsweeper.utils import Util
from credsweeper.utils.entropy_validator import EntropyValidator
class LineData:
    """Object to treat and store scanned line related data.

    Parameters:
        config: configuration object; `source_extensions`, `source_quote_ext` and
            `line_data_output` are read from it by the methods below
        line: string variable, line
        line_pos: int variable, stored as given by the caller
        line_num: int variable, number of line in file
        path: string variable, path to file
        file_type: string variable, extension of file '.txt'
        info: additional info about how the data was detected
        pattern: regex pattern, detected pattern in line
        match_obj: optional match object; when it is not a `re.Match` the pattern is searched in the line

    Attributes filled from the named groups of the match (None / negative position when unavailable):
        key: optional string variable, text of the `keyword` group
        separator: optional string variable, separators between variable and value
        separator_start, separator_end: separator span in the line
        value: optional string variable, detected value in line
        value_start, value_end: value span in the line
        variable: optional string variable, detected variable in line
        variable_start, variable_end: variable span in the line
        value_leftquote, value_rightquote: optional strings, text of the groups with the same names

    """

    # "<!--" is the real HTML/XML comment opener; the en-dash spelling is kept for backward compatibility
    comment_starts = ["//", "*", "#", "/*", "<!--", "<!––", "%{", "%", "...", "(*", "--", "--[[", "#="]
    bash_param_split = re.compile("\\s+(\\-|\\||\\>|\\w+?\\>|\\&)")
    # some symbols e.g. double quotes cannot be in URL string https://www.ietf.org/rfc/rfc1738.txt
    # \ - was added for case of url in escaped string \u0026amp; - means escaped & in HTML
    url_detect_regex = re.compile(r".*\w{3,33}://[\w;,/?:@&=+$%.!~*'()#\\-]+$")

    # position was never assigned from a match
    INITIAL_WRONG_POSITION = -3
    # span lookup failed (e.g. the pattern has no group with the requested name)
    EXCEPTION_POSITION = -2

    def __init__(
            self,  #
            config: "Config",  #
            line: str,  #
            line_pos: int,  #
            line_num: int,  #
            path: str,  #
            file_type: str,  #
            info: str,  #
            pattern: re.Pattern,  #
            match_obj: Optional[re.Match] = None) -> None:
        self.config = config
        self.line: str = line
        self.line_pos: int = line_pos
        self.line_num: int = line_num
        self.path: str = path
        self.file_type: str = file_type
        self.info: str = info
        self.pattern: re.Pattern = pattern
        # do not store match object due it cannot be pickled with multiprocessing
        # start - end position of matched object
        self.value_start = LineData.INITIAL_WRONG_POSITION
        self.value_end = LineData.INITIAL_WRONG_POSITION
        self.key: Optional[str] = None
        self.separator: Optional[str] = None
        self.separator_start: int = LineData.INITIAL_WRONG_POSITION
        self.separator_end: int = LineData.INITIAL_WRONG_POSITION
        self.value: Optional[str] = None
        self.variable: Optional[str] = None
        self.variable_start = LineData.INITIAL_WRONG_POSITION
        self.variable_end = LineData.INITIAL_WRONG_POSITION
        self.value_leftquote: Optional[str] = None
        self.value_rightquote: Optional[str] = None
        self.initialize(match_obj)

    def initialize(self, match_obj: Optional[re.Match] = None) -> None:
        """Apply regex to the candidate line and set internal fields based on match.

        Args:
            match_obj: match to read the groups from; when it is not a `re.Match`
                and the stored pattern is a compiled pattern, the pattern is searched in the line

        When there is no match the fields keep their initial values.
        """
        if not isinstance(match_obj, re.Match) and isinstance(self.pattern, re.Pattern):
            match_obj = self.pattern.search(self.line)
        if match_obj is None:
            return

        def get_group_from_match_obj(_match_obj: re.Match, group: str) -> Any:
            # a group name unknown to the pattern raises - such group is reported as None
            with contextlib.suppress(Exception):
                return _match_obj.group(group)
            return None

        def get_span_from_match_obj(_match_obj: re.Match, group: str) -> Tuple[int, int]:
            # a group name unknown to the pattern raises - such span is reported as EXCEPTION_POSITION
            with contextlib.suppress(Exception):
                span = _match_obj.span(group)
                return span[0], span[1]
            return LineData.EXCEPTION_POSITION, LineData.EXCEPTION_POSITION

        self.key = get_group_from_match_obj(match_obj, "keyword")
        self.separator = get_group_from_match_obj(match_obj, "separator")
        self.separator_start, self.separator_end = get_span_from_match_obj(match_obj, "separator")
        self.value = get_group_from_match_obj(match_obj, "value")
        self.value_start, self.value_end = get_span_from_match_obj(match_obj, "value")
        self.variable = get_group_from_match_obj(match_obj, "variable")
        self.variable_start, self.variable_end = get_span_from_match_obj(match_obj, "variable")
        self.value_leftquote = get_group_from_match_obj(match_obj, "value_leftquote")
        self.value_rightquote = get_group_from_match_obj(match_obj, "value_rightquote")
        # the value is cleaned first: its position is still required to inspect the text before it
        self.sanitize_value()
        self.sanitize_variable()

    def sanitize_value(self):
        """Clean found value from extra artifacts"""
        # keep the value as it was matched to correct the positions after the cleaning
        matched_value = self.value
        self.clean_url_parameters()
        self.clean_bash_parameters()
        self.check_value_pos(matched_value)

    def check_value_pos(self, value: Optional[str]) -> None:
        """checks and corrects value_start, value_end in case of self.value was shrink

        Args:
            value: the value before the cleaning

        """
        if value is None or self.value is None:
            # nothing to compare with
            return
        if self.value_start < 0 or self.value_end < 0:
            # the positions were not obtained from a match
            return
        if len(self.value) < len(value):
            start = value.find(self.value)
            if start < 0:
                # the current value is not a part of the previous one - an offset cannot be calculated
                return
            self.value_start += start
            self.value_end = self.value_start + len(self.value)

    def clean_url_parameters(self) -> None:
        """Clean url address from 'query parameters'.

        If line seem to be a URL - split by & character.
        Variable should be right most value after & or ? ([-1]). And value should be left most before & ([0])
        """
        if self.value_start < 0:
            # the position of the value is unknown: a negative index in the slice would cut
            # the END of the line and the URL check would be applied to an unrelated text
            return
        line_before_value = self.line[:self.value_start]
        if not self.url_detect_regex.match(line_before_value):
            return
        if self.variable:
            self.variable = self.variable.split('&')[-1].split('?')[-1].split(';')[-1]
        if self.value:
            self.value = self.value.split('&')[0].split(';')[0]

    def clean_bash_parameters(self) -> None:
        """Split variable and value by bash special characters, if line assumed to be CLI command."""
        # If variable name starts with `-` (usual case for args in CLI)
        if not (self.variable and self.variable.startswith("-") and self.value):
            return
        value_spl = self.bash_param_split.split(self.value)
        # and value can be split by bash special characters
        if len(value_spl) > 1:
            self.value = value_spl[0]

    def sanitize_variable(self) -> None:
        """Remove trailing spaces, dashes and quotations around the variable."""
        sanitized_var_len = 0
        # repeat until the length is stable: a removed symbol may uncover another one to remove
        while self.variable and sanitized_var_len != len(self.variable):
            sanitized_var_len = len(self.variable)
            # Remove trailing \s. Can happen if there are \s between variable and `=` character
            self.variable = self.variable.strip()
            # Remove trailing `-` at the variable name start. Usual case for CLI commands
            self.variable = self.variable.strip("-")
            # Remove trailing `'"`. Usual case for JSON data
            self.variable = self.variable.strip('"')
            self.variable = self.variable.strip("'")

    def is_comment(self) -> bool:
        """Check if line with credential is a comment.

        Return:
            True if line is a comment, False otherwise

        """
        cleaned_line = self.line.strip()
        # str.startswith accepts a tuple of prefixes
        return cleaned_line.startswith(tuple(self.comment_starts))

    def is_source_file(self) -> bool:
        """Check if file with credential is a source code file or not (data, log, plain text).

        Return:
            True if file is source file, False otherwise

        """
        if not self.path:
            return False
        return Util.get_extension(self.path) in self.config.source_extensions

    def is_source_file_with_quotes(self) -> bool:
        """Check if file with credential require quotation for string literals.

        Return:
            True if file require quotation, False otherwise

        """
        if not self.path:
            return False
        return Util.get_extension(self.path) in self.config.source_quote_ext

    def __repr__(self) -> str:
        return f"line: '{self.line}' | line_num: {self.line_num} | path: {self.path}" \
               f" | value: '{self.value}' | entropy_validation: {EntropyValidator(self.value)}"

    def to_json(self) -> Dict:
        """Convert line data object to dictionary.

        Return:
            Dictionary object generated from current line data,
            only the keys listed in config.line_data_output are reported

        """
        full_output = {
            "key": self.key,
            "line": self.line,
            "line_num": self.line_num,
            "path": self.path,
            "info": self.info,
            "pattern": self.pattern.pattern,
            "separator": self.separator,
            "separator_start": self.separator_start,
            "separator_end": self.separator_end,
            "value": self.value,
            "value_start": self.value_start,
            "value_end": self.value_end,
            "variable": self.variable,
            "value_leftquote": self.value_leftquote,
            "value_rightquote": self.value_rightquote,
            "entropy_validation": EntropyValidator(self.value).to_dict()
        }
        reported_output = {k: v for k, v in full_output.items() if k in self.config.line_data_output}
        return reported_output