-
Notifications
You must be signed in to change notification settings - Fork 0
/
pdknockr.py
165 lines (119 loc) Β· 5.8 KB
/
pdknockr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python
# Passive DNS Knocker (PDK) - developed by acidvegas in python (https://git.acid.vegas/pdknockr)
import argparse
import asyncio
import logging
import logging.handlers
import os
import random
import time
try:
import aiodns
except ImportError:
raise SystemExit('missing required \'aiodns\' module (pip install aiodns)')
async def dns_lookup(semaphore: asyncio.Semaphore, domain: str, dns_server: str, record_type: str, timeout: int):
'''
Perform a DNS lookup on a target domain.
:param domain: The target domain to perform the lookup on.
:param subdomain: The subdomain to look up.
:param dns_server: The DNS server to perform the lookup on.
:param dns_type: The DNS record type to look up.
:param timeout: The timeout for the DNS lookup.
:param semaphore: The semaphore to use for concurrency.
'''
async with semaphore:
resolver = aiodns.DNSResolver(nameservers=[dns_server], timeout=timeout)
logging.info(f'Knocking {dns_server} with {domain} ({record_type})')
try:
await resolver.query(domain, record_type)
except:
pass # We're just knocking so errors are expected and ignored
def read_domain_file(file_path: str):
'''
Generator function to read domains line by line.
:param file_path: The path to the file containing the DNS servers.
'''
with open(file_path, 'r') as file:
while True:
for line in file:
line = line.strip()
if line:
yield line
def read_dns_file(file_path: str):
'''
Generator function to read DNS servers line by line.
:param file_path: The path to the file containing the DNS servers.
'''
with open(file_path, 'r') as file:
while True:
for line in file:
line = line.strip()
if line:
yield line
def generate_subdomain(sub_domains: list, domain: str, max_size: int):
'''
Generator function to read subdomains line by line.
:param sub_domains: The list of subdomains to use for generating noise.
'''
while True:
subs = random.sample(sub_domains, random.randint(2, max_size))
if random.choice([True, False]):
subs_index = random.randint(0, max_size - 1)
subs[subs_index] = subs[subs_index] + str(random.randint(1, 99))
yield random.choice(['.', '-']).join(subs) + '.' + domain
def setup_logging():
'''Setup the logging for the program.'''
os.makedirs('logs', exist_ok=True)
sh = logging.StreamHandler()
sh.setFormatter(logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M %p'))
log_filename = time.strftime('pdk_%Y-%m-%d_%H-%M-%S.log')
fh = logging.handlers.RotatingFileHandler(f'logs/{log_filename}', maxBytes=268435456, encoding='utf-8')
fh.setFormatter(logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%Y-%m-%d %I:%M %p'))
logging.basicConfig(level=logging.NOTSET, handlers=(sh,fh))
async def main():
'''Main function for the program.'''
parser = argparse.ArgumentParser(description='Passive DNS Knocking Tool')
parser.add_argument('-d', '--domains', help='Comma seperate list of domains or file containing list of domains')
parser.add_argument('-s', '--subdomains', help='File containing list of subdomains')
parser.add_argument('-r', '--resolvers', help='File containing list of DNS resolvers')
parser.add_argument('-rt', '--rectype', default='A,AAAA', help='Comma-seperated list of DNS record type (default: A,AAAA)')
parser.add_argument('-c', '--concurrency', type=int, default=25, help='Concurrency limit (default: 50)')
parser.add_argument('-t', '--timeout', type=int, default=3, help='Timeout for DNS lookup (default: 3)')
parser.add_argument('-n', '--noise', action='store_true', help='Enable random subdomain noise')
args = parser.parse_args()
setup_logging()
args.rectype = [record_type.upper() for record_type in args.rectype.split(',')]
if not args.domains:
raise SystemExit('no domains specified')
elif not os.path.exists(args.domains):
raise FileNotFoundError('domains file not found')
if not args.subdomains:
raise SystemExit('no subdomains file specified')
elif not os.path.exists(args.subdomains):
raise FileNotFoundError('subdomains file not found')
if not args.resolvers:
raise SystemExit('no resolvers file specified')
elif not os.path.exists(args.resolvers):
raise FileNotFoundError('resolvers file not found')
valid_record_types = ('A', 'AAAA', 'CNAME', 'MX', 'NS', 'PTR', 'SOA', 'SRV', 'TXT')
for record_type in args.rectype:
if record_type not in valid_record_types:
raise SystemExit(f'invalid record type: {record_type}')
semaphore = asyncio.BoundedSemaphore(args.concurrency)
while True:
tasks = []
for domain in read_domain_file(args.domains):
for dns_server in read_dns_file(args.resolvers):
sub_domain = generate_subdomain(args.subdomains, domain, 3)
if len(tasks) < args.concurrency:
query_record = random.choice(args.record_types)
task = asyncio.create_task(dns_lookup(semaphore, domain, sub_domain, dns_server, query_record, args.timeout))
tasks.append(task)
else:
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
tasks = list(pending)
await asyncio.wait(tasks) # Wait for any remaining tasks to complete
if not args.noise:
break
if __name__ == '__main__':
asyncio.run(main())