len(set(z)):
pout("options_duplicates", url, escape(allow), noisymsg=True)
@@ -488,18 +488,18 @@ def test_optionsbleed(url):
@DEFAULT
def test_privatekey(url):
- hostkey = re.sub('^www.', '', re.sub('(.*//|/.*)', "", url)) + ".key"
+ hostkey = re.sub("^www.", "", re.sub("(.*//|/.*)", "", url)) + ".key"
wwwkey = "www." + hostkey
for fn in ["server.key", "privatekey.key", "myserver.key", "key.pem",
hostkey, wwwkey]:
r = fetcher(url + "/" + fn)
- if 'BEGIN PRIVATE KEY' in r:
+ if "BEGIN PRIVATE KEY" in r:
pout("privatekey_pkcs8", f"{url}/{fn}")
- if 'BEGIN RSA PRIVATE KEY' in r:
+ if "BEGIN RSA PRIVATE KEY" in r:
pout("privatekey_rsa", f"{url}/{fn}")
- if 'BEGIN DSA PRIVATE KEY' in r:
+ if "BEGIN DSA PRIVATE KEY" in r:
pout("privatekey_dsa", f"{url}/{fn}")
- if 'BEGIN EC PRIVATE KEY' in r:
+ if "BEGIN EC PRIVATE KEY" in r:
pout("privatekey_ec", f"{url}/{fn}")
@@ -507,7 +507,7 @@ def test_privatekey(url):
def test_sshkey(url):
for fn in ["id_rsa", "id_dsa", ".ssh/id_rsa", ".ssh/id_dsa"]:
r = fetcher(url + "/" + fn)
- if 'BEGIN' in r and 'PRIVATE KEY' in r:
+ if "BEGIN" in r and "PRIVATE KEY" in r:
pout("sshkey", f"{url}/{fn}")
@@ -522,7 +522,7 @@ def test_dotenv(url):
def test_invalidsrc(url):
r = getmainpage(url)
try:
- p = bs4.BeautifulSoup(r, 'html.parser')
+ p = bs4.BeautifulSoup(r, "html.parser")
except (NotImplementedError, TypeError, AssertionError):
# This is due to a python bug, please remove in the future.
# https://bugs.python.org/issue32876
@@ -530,7 +530,7 @@ def test_invalidsrc(url):
return
srcs = []
for tag in p.findAll(attrs={"src": True}):
- srcs.append(tag['src'])
+ srcs.append(tag["src"])
srcs = list(set(srcs))
srcs.sort()
@@ -545,7 +545,7 @@ def test_invalidsrc(url):
continue
if domain is None:
continue
- if protocol not in ['https', 'http']:
+ if protocol not in ["https", "http"]:
continue
# We avoid double-checking multiple requests to the same host.
@@ -560,7 +560,7 @@ def test_invalidsrc(url):
continue
try:
- r = pool.request('GET', realurl, retries=False, redirect=False)
+ r = pool.request("GET", realurl, retries=False, redirect=False)
if r.status >= 400:
pout("invalidsrc_http", url, escape(src))
except UnicodeEncodeError:
@@ -573,18 +573,18 @@ def test_invalidsrc(url):
@DEFAULT
def test_ilias_defaultpw(url):
getmainpage(url)
- if (url + "/ilias.php" in mainpage_cache[url]['location']
- or (url + "/login.php" in mainpage_cache[url]['location']
- and 'powered by ILIAS' in fetcher(mainpage_cache[url]['location']))):
+ if (url + "/ilias.php" in mainpage_cache[url]["location"]
+ or (url + "/login.php" in mainpage_cache[url]["location"]
+ and "powered by ILIAS" in fetcher(mainpage_cache[url]["location"]))):
# we're confident we found an ILIAS installation
pdebug("Ilias found")
try:
login = pool.request("POST", url + "/ilias.php?cmd=post&baseClass=ilStartUpGUI",
- fields={'username': 'root',
- 'password': 'homer',
- 'cmd[doStandardAuthentication]': 'Login'},
- headers={'Cookie': 'iltest=;PHPSESSID=' + randstring()})
- data = login.data.decode('ascii', errors='ignore')
+ fields={"username": "root",
+ "password": "homer",
+ "cmd[doStandardAuthentication]": "Login"},
+ headers={"Cookie": "iltest=;PHPSESSID=" + randstring()})
+ data = login.data.decode("ascii", errors="ignore")
if (('class="ilFailureMessage"' not in data)
and ('name="il_message_focus"' not in data)
and (('class="ilBlockContent"' in data)
@@ -600,7 +600,7 @@ def test_cgiecho(url):
for pre in [url + "/cgi-bin/cgiecho", url + "/cgi-sys/cgiecho"]:
try:
r = pool.request("GET", pre + "/" + randstring())
- if r.status == 500 and 'cgiemail' in r.data.decode('ascii', errors='ignore'):
+ if r.status == 500 and "cgiemail" in r.data.decode("ascii", errors="ignore"):
pout("cgiecho", pre)
except (ConnectionRefusedError, ConnectionResetError,
urllib3.exceptions.HTTPError):
@@ -612,7 +612,7 @@ def test_phpunit_eval(url):
try:
r = pool.request("POST", url + "/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php",
body='" + reflect,
+ r = pool.request("GET", url + "/.well-known/acme-challenge/" + reflect,
retries=False, redirect=False)
- if not r.data.decode('ascii', errors='ignore').startswith("" + reflect):
+ if not r.data.decode("ascii", errors="ignore").startswith("" + reflect):
return
headers = {k.lower(): v for k, v in r.headers.items()}
- if ('content-type' in headers) and headers['content-type'].startswith("text/plain"):
+ if ("content-type" in headers) and headers["content-type"].startswith("text/plain"):
return
pout("acmereflect", url + "/.well-known/acme-challenge/reflect")
@@ -641,7 +641,7 @@ def test_acmereflect(url):
@DEFAULT
def test_drupaldb(url):
r = fetchpartial(url + "/sites/default/files/.ht.sqlite", 20, binary=True)
- if r and r[0:13] == b'SQLite format':
+ if r and r[0:13] == b"SQLite format":
pout("drupaldb", url + "/sites/default/files/.ht.sqlite")
@@ -649,8 +649,8 @@ def test_drupaldb(url):
def test_phpwarnings(url):
try:
r = pool.request("GET", url, headers={"Cookie": "PHPSESSID=in_vålíd"})
- if ('The session id is too long or contains illegal characters'
- in r.data.decode('ascii', errors='ignore')):
+ if ("The session id is too long or contains illegal characters"
+ in r.data.decode("ascii", errors="ignore")):
pout("phpwarnings", url)
except (urllib3.exceptions.HTTPError, UnicodeError,
ConnectionRefusedError):
@@ -677,10 +677,10 @@ def test_elmah(url):
@DEFAULT
def test_citrix_rce(url):
try:
- r = pool.request('GET', url + "/vpn/../vpns/portal/tips.html",
+ r = pool.request("GET", url + "/vpn/../vpns/portal/tips.html",
retries=False, redirect=False,
headers={"NSC_USER": "x", "NSC_NONCE": "x"})
- if '' in r.data.decode('ascii', errors='ignore'):
+ if '
' in r.data.decode("ascii", errors="ignore"):
pout("citrix_rce", url + "/vpn/../vpns/portal/tips.html")
except (urllib3.exceptions.HTTPError, UnicodeError,
ConnectionRefusedError):
@@ -690,22 +690,22 @@ def test_citrix_rce(url):
@DEFAULT
def test_installer(url):
r = getmainpage(url)
- if (mainpage_cache[url]['location'].endswith("wp-admin/setup-config.php")
+ if (mainpage_cache[url]["location"].endswith("wp-admin/setup-config.php")
or 'href="wp-admin/css/install.css"' in r):
pout("installer_wordpress", url)
- elif mainpage_cache[url]['location'].endswith("installation/index.php"):
+ elif mainpage_cache[url]["location"].endswith("installation/index.php"):
pout("installer_joomla", url)
- elif mainpage_cache[url]['location'].endswith("typo3/install.php"):
+ elif mainpage_cache[url]["location"].endswith("typo3/install.php"):
pout("installer_typo3", url)
- elif mainpage_cache[url]['location'].endswith("install.php"):
+ elif mainpage_cache[url]["location"].endswith("install.php"):
pout("installer_drupal", url)
- elif mainpage_cache[url]['location'].endswith("serendipity_admin.php"):
+ elif mainpage_cache[url]["location"].endswith("serendipity_admin.php"):
pout("installer_s9y", url)
- elif 'LocalSettings.php not found' in r:
+ elif "LocalSettings.php not found" in r:
pout("installer_mediawiki", url)
- elif '8 easy steps and will take around 5 minutes' in r:
+ elif "8 easy steps and will take around 5 minutes" in r:
pout("installer_matomo", url)
- elif 'Create an admin account' in r:
+ elif "Create an admin account" in r:
pout("installer_nextcloud", url)
@@ -720,16 +720,16 @@ def test_wpsubdir(url):
@DEFAULT
def test_telescope(url):
r = fetcher(url + "/telescope", geterrpage=True)
- if 'Laravel Telescope' in r:
+ if "Laravel Telescope" in r:
pout("telescope", url + "/telescope")
- elif 'The Telescope assets are not published' in r:
+ elif "The Telescope assets are not published" in r:
pout("telescope_inactive", url + "/telescope")
@DEFAULT
def test_vb_test(url):
r = fetcher(url + "/vb_test.php")
- if '
vBulletin Test Script' in r:
+ if "vBulletin Test Script" in r:
pout("vb_test", url + "/vb_test.php")
@@ -737,9 +737,9 @@ def test_vb_test(url):
def test_headerinject(url):
rnd = randstring()
try:
- r = pool.request('GET', f'{url}/%0D%0A{rnd}:1', retries=False, redirect=False)
+ r = pool.request("GET", f"{url}/%0D%0A{rnd}:1", retries=False, redirect=False)
if rnd in r.headers:
- pout("headerinject", f'{url}/%0D%0A{rnd}:1')
+ pout("headerinject", f"{url}/%0D%0A{rnd}:1")
except (urllib3.exceptions.HTTPError, UnicodeError,
ConnectionRefusedError):
pass
@@ -748,36 +748,36 @@ def test_headerinject(url):
@DEFAULT
def test_wpdebug(url):
r = fetcher(url + "/wp-content/debug.log")
- if re.match(r'^\[\d\d-\w\w\w-\d\d\d\d ', r):
+ if re.match(r"^\[\d\d-\w\w\w-\d\d\d\d ", r):
pout("wpdebug", url + "/wp-content/debug.log")
@DEFAULT
def test_thumbsdb(url):
r = fetcher(url + "/Thumbs.db", binary=True)
- if r and r[0:8] == b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1':
+ if r and r[0:8] == b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1":
pout("thumbsdb", url + "/Thumbs.db")
@DEFAULT
def test_duplicator(url):
- for fn in ['installer.php', 'installer-backup.php']:
+ for fn in ["installer.php", "installer-backup.php"]:
r = fetcher(f"{url}/{fn}")
- if '/dup-installer/main.installer.php' in r:
+ if "/dup-installer/main.installer.php" in r:
pout("duplicator", f"{url}/{fn}")
- for fn in ['backups-dup-pro', 'backups-dup-lite']:
+ for fn in ["backups-dup-pro", "backups-dup-lite"]:
r = fetcher(f"{url}/wp-content/{fn}/")
- if '>Index of /' in r:
+ if ">Index of /" in r:
pout("duplicator_dirlisting", f"{url}/wp-content/{fn}/")
@DEFAULT
def test_desktopini(url):
r = fetcher(url + "/desktop.ini")
- if '[\x00.\x00S\x00h\x00e\x00l\x00l\x00C\x00l\x00a\x00s\x00s' in r:
+ if "[\x00.\x00S\x00h\x00e\x00l\x00l\x00C\x00l\x00a\x00s\x00s" in r:
pout("desktopini", url + "/desktop.ini")
r = fetcher(url + "/Desktop.ini")
- if '[\x00.\x00S\x00h\x00e\x00l\x00l\x00C\x00l\x00a\x00s\x00s' in r:
+ if "[\x00.\x00S\x00h\x00e\x00l\x00l\x00C\x00l\x00a\x00s\x00s" in r:
pout("desktopini", url + "/Desktop.ini")
@@ -785,10 +785,10 @@ def test_desktopini(url):
@HOSTNAME
def test_axfr(qhost):
try:
- if 'resolve' in dir(dns.resolver):
- ns = dns.resolver.resolve(qhost, 'NS')
+ if "resolve" in dir(dns.resolver):
+ ns = dns.resolver.resolve(qhost, "NS")
else: # dnspython before 2.0
- ns = dns.resolver.query(qhost, 'NS')
+ ns = dns.resolver.query(qhost, "NS")
except (dns.exception.DNSException, dns.exception.Timeout,
ConnectionResetError, ConnectionRefusedError,
EOFError, socket.gaierror, TimeoutError, OSError):
@@ -798,12 +798,12 @@ def test_axfr(qhost):
ipv4 = []
ipv6 = []
try:
- if 'resolve' in dir(dns.resolver):
- ipv4 = dns.resolver.resolve(r, 'a').rrset
- ipv6 = dns.resolver.resolve(r, 'aaaa').rrset
+ if "resolve" in dir(dns.resolver):
+ ipv4 = dns.resolver.resolve(r, "a").rrset
+ ipv6 = dns.resolver.resolve(r, "aaaa").rrset
else: # dnspython before 2.0
- ipv4 = dns.resolver.query(r, 'a').rrset
- ipv6 = dns.resolver.query(r, 'aaaa').rrset
+ ipv4 = dns.resolver.query(r, "a").rrset
+ ipv6 = dns.resolver.query(r, "aaaa").rrset
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
pass
ips = []
@@ -826,10 +826,10 @@ def test_axfr(qhost):
@HOSTNAME
def test_openmonit(qhost):
url = f"http://{qhost}:2812/"
- headers = urllib3.util.make_headers(basic_auth='admin:monit')
+ headers = urllib3.util.make_headers(basic_auth="admin:monit")
try:
- r = pool.request('GET', url, headers=headers)
- if 'Monit:' in r.data.decode('ascii', errors='ignore'):
+ r = pool.request("GET", url, headers=headers)
+ if "Monit:" in r.data.decode("ascii", errors="ignore"):
pout("openmonit", url)
except (urllib3.exceptions.HTTPError, UnicodeError,
ConnectionRefusedError):
@@ -839,17 +839,17 @@ def test_openmonit(qhost):
@DEFAULT
@HOSTNAME
def test_openelasticsearch(qhost):
- headers = urllib3.util.make_headers(basic_auth='admin:admin')
+ headers = urllib3.util.make_headers(basic_auth="admin:admin")
try:
r = pool.request("GET", f"http://{qhost}:9200", headers=headers)
- if '"cluster_name" :' in r.data.decode('ascii', errors='ignore'):
+ if '"cluster_name" :' in r.data.decode("ascii", errors="ignore"):
pout("openelasticsearch", f"http://{qhost}:9200")
except (urllib3.exceptions.HTTPError, UnicodeError,
ConnectionRefusedError):
pass
try:
r = pool.request("GET", f"https://{qhost}:9200", headers=headers)
- if '"cluster_name" :' in r.data.decode('ascii', errors='ignore'):
+ if '"cluster_name" :' in r.data.decode("ascii", errors="ignore"):
pout("openelasticsearch", f"https://{qhost}:9200")
except (urllib3.exceptions.HTTPError, UnicodeError,
ConnectionRefusedError):
@@ -877,10 +877,10 @@ def test_drupal(url):
def test_wordpress(url):
r = getmainpage(url)
try:
- p = bs4.BeautifulSoup(r, 'html.parser')
+ p = bs4.BeautifulSoup(r, "html.parser")
g = p.findAll("meta", {"name": "generator"})
- if g and g[0]['content'][:9] == "WordPress":
- version = g[0]['content'][10:]
+ if g and g[0]["content"][:9] == "WordPress":
+ version = g[0]["content"][10:]
if not set(version).issubset("0123456789."):
return
pout("wordpress", url, version)
@@ -896,13 +896,13 @@ def test_wordpress(url):
def test_mailman(url):
murl = f"{url}/mailman/listinfo"
r = fetcher(murl)
- if 'Delivered by Mailman' in r:
- ver = re.findall('version ([0-9.]+)', r)
+ if "Delivered by Mailman" in r:
+ ver = re.findall("version ([0-9.]+)", r)
if len(ver) > 0:
ver = ver[0]
else:
ver = "unknown"
- if 'There currently are no publicly-advertised' in r:
+ if "There currently are no publicly-advertised" in r:
pout("mailman_unused", f"{murl} {ver}")
else:
pout("mailman", f"{murl} {ver}")
@@ -910,21 +910,21 @@ def test_mailman(url):
@INFO
def test_django_staticfiles_json(url):
- furl = url + '/static/staticfiles.json'
+ furl = url + "/static/staticfiles.json"
data = fetcher(furl)
try:
parsed = json.loads(data)
except json.JSONDecodeError:
pass
else:
- if isinstance(parsed, dict) and 'paths' in parsed:
- pout('django_staticfiles_json', furl)
+ if isinstance(parsed, dict) and "paths" in parsed:
+ pout("django_staticfiles_json", furl)
@INFO
def test_composer(url):
- for c in ['composer.json', 'composer.lock']:
- furl = url + '/' + c
+ for c in ["composer.json", "composer.lock"]:
+ furl = url + "/" + c
r = fetcher(furl)
if '"require":' in r or '"packages":' in r:
pout("composer", furl)
@@ -934,7 +934,7 @@ def test_composer(url):
def test_phpinfo(url):
for fn in ["phpinfo.php", "info.php", "i.php", "test.php"]:
r = fetcher(url + "/" + fn)
- if 'phpinfo()' in r:
+ if "phpinfo()" in r:
pout("phpinfo", url + "/" + fn)
@@ -956,7 +956,7 @@ sys.excepthook = new_excepthook
parser = argparse.ArgumentParser()
-parser.add_argument("hosts", nargs='+', help="hostname to scan")
+parser.add_argument("hosts", nargs="+", help="hostname to scan")
parser.add_argument("-t", "--tests", help="Comma-separated tests to run.")
parser.add_argument("--useragent", help="User agent to send")
parser.add_argument("--nowww", action="store_true",
@@ -969,7 +969,7 @@ parser.add_argument("-i", "--info", action="store_true",
help="Enable all info tests (no bugs/security vulnerabilities)")
parser.add_argument("-n", "--noisy", action="store_true",
help="Show noisy messages that indicate boring bugs, but no security issue")
-parser.add_argument("-p", "--path", default='', action="store", type=str,
+parser.add_argument("-p", "--path", default="", action="store", type=str,
help="Base path on server (scans root dir by default)")
parser.add_argument("-j", "--json", action="store_true",
help="Produce JSON output")
@@ -978,33 +978,33 @@ parser.add_argument("-d", "--debug", action="store_true",
args = parser.parse_args()
# Initializing global pool manager
-user_agent = {'user-agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0'}
+user_agent = {"user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0"}
if args.useragent:
- user_agent = {'user-agent': args.useragent}
+ user_agent = {"user-agent": args.useragent}
urllib3_major = int(urllib3.__version__.split(".", maxsplit=1)[0])
if urllib3_major >= 2:
- pool = urllib3.PoolManager(10, headers=user_agent, cert_reqs='CERT_NONE', # noqa: DUO132
+ pool = urllib3.PoolManager(10, headers=user_agent, cert_reqs="CERT_NONE", # noqa: DUO132
retries=False, timeout=2, ssl_minimum_version=ssl.TLSVersion.SSLv3)
else:
- pool = urllib3.PoolManager(10, headers=user_agent, cert_reqs='CERT_NONE', # noqa: DUO132
+ pool = urllib3.PoolManager(10, headers=user_agent, cert_reqs="CERT_NONE", # noqa: DUO132
retries=False, timeout=2)
# This is necessary for directory traversal attacks like citrix_cve
urllib3.util.url.NORMALIZABLE_SCHEMES = ()
if args.tests is None:
- tests = [g for f, g in locals().items() if hasattr(g, '_is_default_test')]
+ tests = [g for f, g in locals().items() if hasattr(g, "_is_default_test")]
else:
tests = []
try:
- for x in args.tests.split(','):
+ for x in args.tests.split(","):
tests.append(locals()["test_" + x])
except KeyError:
print(f"Test {x} does not exist")
sys.exit(1)
if args.info:
- tests += [g for f, g in locals().items() if hasattr(g, '_is_info_test')]
+ tests += [g for f, g in locals().items() if hasattr(g, "_is_info_test")]
path = args.path.rstrip("/")
if path != "" and path[0] != "/":
@@ -1037,7 +1037,7 @@ for host in hosts:
pdebug(f"Scanning {host}")
for test in tests:
pdebug(f"Running {test.__name__} test")
- if hasattr(test, '_is_hostname_test'):
+ if hasattr(test, "_is_hostname_test"):
test(host)
else:
if not args.nohttp:
diff --git a/tests/test_codingstyle.py b/tests/test_codingstyle.py
index a06d5a5..cce34a3 100644
--- a/tests/test_codingstyle.py
+++ b/tests/test_codingstyle.py
@@ -17,5 +17,5 @@ def test_codingstyle():
subprocess.run(["pyupgrade", "--py311-plus"] + pyfiles, check=True)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_docs.py b/tests/test_docs.py
index ffe4174..e7151db 100644
--- a/tests/test_docs.py
+++ b/tests/test_docs.py
@@ -20,5 +20,5 @@ def test_docs(self):
self.assertEqual(funcs, docs)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_scan_testdata.py b/tests/test_scan_testdata.py
index 4545bbf..80ff6f8 100644
--- a/tests/test_scan_testdata.py
+++ b/tests/test_scan_testdata.py
@@ -35,9 +35,9 @@ def test_scan_testdata(self):
olddir = os.getcwd()
os.chdir(tmp + "/testdata")
- httpd = http.server.HTTPServer(('localhost', 4443), http.server.SimpleHTTPRequestHandler)
+ httpd = http.server.HTTPServer(("localhost", 4443), http.server.SimpleHTTPRequestHandler)
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- context.load_cert_chain(certfile=tmp + '/testdata/testserver.pem')
+ context.load_cert_chain(certfile=tmp + "/testdata/testserver.pem")
httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
t = threading.Thread(target=httpd.serve_forever)
t.daemon = True
@@ -51,5 +51,5 @@ def test_scan_testdata(self):
self.assertEqual(output, expected)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()