/
owned_utils.py
108 lines (93 loc) · 3.34 KB
/
owned_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/python3
# Author: Patrick Hurd, Penetration Tester, Coalfire Federal 2019, 2020
import requests, json
import getopt, sys
headers = { "Accept": "application/json; charset=UTF-8",
"Content-Type": "application/json",
"Authorization": "bmVvNGo6Qmxvb2Rob3VuZA==" }
url = 'http://localhost:7474/db/data/transaction/commit'
def main(argv):
node_type = ''
node_label = ''
request = ''
domain = ''
try:
opts, args = getopt.getopt(argv,"hr:t:l:d:",["request=", "type=","label=","domain="])
except getopt.GetoptError:
print('test.py -r <request type> -t <node type> -l <node label>')
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print ('test.py -r <request type> -t <node type> -l <node label>')
sys.exit()
elif opt in ("-r", "--request"):
request = arg
elif opt in ("-t", "--type"):
node_type = arg
elif opt in ("-l", "--label"):
node_label = arg
elif opt in ("-d", "--domain"):
domain = arg
mux(request, node_type, node_label)
def mux(request, node_type, node_label):
if request == 'domains':
get_domains();
elif request == 'owned':
mark_owned(node_type, node_label)
elif request == 'create':
create(node_type, node_label)
elif request == 'exists':
exists(node_type, node_label)
elif request == 'exists_like':
exists_starts_with(node_type, node_label)
elif request == 'owned_like':
owned_starts_with(node_type, node_label)
else:
print("Error: unknown request type")
def mark_owned(nodetype, nodelabel):
statement = f'MATCH (n:{nodetype}) WHERE lower(n.name) = "{nodelabel.lower()}" SET n.owned = TRUE'
data = {"statements": [{'statement': statement}]}
r = requests.post(url=url,headers=headers,json=data)
print(r.text)
def create(nodetype, nodelabel):
statement = "CREATE (n:" + nodetype + ') SET n.name="' + nodelabel + '"'
data = {"statements": [{'statement': statement}]}
r = requests.post(url=url,headers=headers,json=data)
def exists_starts_with(nodetype, nodelabel):
statement = f'MATCH (n:{nodetype}) WHERE lower(n.name) STARTS WITH "{nodelabel.lower()}" RETURN n'
data = {"statements": [{'statement': statement}]}
r = requests.post(url=url,headers=headers,json=data)
if nodelabel in r.text:
print(1)
else:
print(r.text)
print(0)
def owned_starts_with(nodetype, nodelabel):
statement = f'MATCH (n:{nodetype}) WHERE lower(n.name) STARTS WITH "{nodelabel.lower()}" SET n.owned = TRUE'
data = {"statements": [{'statement': statement}]}
r = requests.post(url=url,headers=headers,json=data)
print(r.text)
def exists(nodetype, nodelabel):
statement = 'MATCH (n:*) WHERE lower(n.name) = "' + nodelabel.lower() + '" RETURN n'
data = {"statements": [{'statement': statement}]}
r = requests.post(url=url,headers=headers,json=data)
if nodelabel in r.text:
return 1
else:
return 0
def get_domains():
statement = "MATCH (n:Domain) RETURN n"
data = {"statements": [{'statement': statement}]}
r = requests.post(url=url,headers=headers,json=data)
j = json.loads(r.text)
output = ''
for x in range(len(j["results"][0]["data"])):
output = output + j["results"][0]["data"][x]["row"][0]["name"] + ','
print(output[0:len(output)-1])
def test(nodetype, nodelabel):
statement = "MATCH (n:" + nodetype + " {name:'" + nodelabel + "'}) RETURN n"
data = {"statements": [{'statement': statement}]}
r = requests.post(url=url,headers=headers,json=data)
print(r.text)
if __name__ == '__main__':
main(sys.argv[1:])