-
Notifications
You must be signed in to change notification settings - Fork 0
/
sky_dns_tool
executable file
·131 lines (116 loc) · 5.02 KB
/
sky_dns_tool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#! /usr/bin/python
# Importing the necessary libraries
import json
import dyn_func # private library to validate IP address and hostnames
import sys, traceback
from dns import resolver,reversename
# DNS discovery tool.
#
# Reads a user-supplied file containing one token per line ('#' starts a
# comment), classifies every token as an IPv4 address or a host name through
# the dyn_func helpers, queries DNS for it (PTR for addresses, A records for
# host names) and writes three JSON files into the current directory:
#   raw_token_cache_file - every raw token with the token-line numbers it
#                          appeared on
#   json_human_file      - summary / discoveries / errors, indented
#   json_reg_file        - the same report without indentation

RAW_CACHE_PATH = 'raw_token_cache_file'
HUMAN_JSON_PATH = 'json_human_file'
COMPACT_JSON_PATH = 'json_reg_file'


def strip_comment(line):
    """Return *line* without its '#' comment and surrounding whitespace."""
    return line.partition('#')[0].strip()


def record_occurrence(raw_cache, token, line_no):
    """Register that *token* appeared on token line *line_no*.

    *raw_cache* maps each raw token to ``{"line_num": [...]}`` and is updated
    in place. Returns True when the token was already present, else False.
    """
    entry = raw_cache.get(token)
    if entry is None:
        raw_cache[token] = {"line_num": [line_no]}
        return False
    entry["line_num"].append(line_no)
    return True


def describe_lookup_error(exc, token, is_ip):
    """Return the message stored in the report for a failed lookup of *token*.

    The failure is identified by the class names found in the exception's
    MRO. Comparing the last line of a formatted traceback with a bare class
    name only matches while the exception carries no message, and any other
    failure then leaves no message to report at all.

    *is_ip* selects the wording used for reverse (PTR) lookups.
    """
    class_names = set(cls.__name__ for cls in type(exc).__mro__)
    if "NXDOMAIN" in class_names:
        if is_ip:
            return 'Queried IP address does not exist : {}'.format(token)
        return ('Error message for hostname : {1} invalid with error for {0}'
                .format("NXDOMAIN", token))
    if "NoAnswer" in class_names and not is_ip:
        return 'Queried web address not found : {}'.format(token)
    # Timeouts, missing name servers, ...: still give the report a message.
    return 'DNS lookup failed for {} : {}'.format(token, type(exc).__name__)


def error_entry(token, line_no, message=None):
    """Build one element of the report's "errors" list.

    Without *message* the entry only carries the token and its line (used for
    tokens that are neither an IP address nor a host name); with a message the
    "phase" and "error" keys are added as well.
    """
    occurrence = {"as": token, "line": line_no}
    if message is not None:
        occurrence["phase"] = "normalization"
        occurrence["error"] = message
    return {"type": "Error", "occurrences": occurrence}


def lookup_ptr(address):
    """Reverse-resolve *address*.

    Returns ``(name, None)`` on success and ``(None, message)`` on failure, so
    the caller never reads a name left over from a previous token.
    """
    try:
        answer = resolver.query(reversename.from_address(address), "PTR")
        return str(answer[0]), None
    except Exception as exc:  # per-token best effort: one bad entry must not abort the run
        return None, describe_lookup_error(exc, address, True)


def lookup_a(hostname):
    """Resolve the A records of *hostname*.

    Returns ``(addresses, None)`` on success and ``([], message)`` on failure.
    """
    try:
        answer = resolver.query(hostname, "A")
        return [rdata.address for rdata in answer], None
    except Exception as exc:  # per-token best effort: one bad entry must not abort the run
        return [], describe_lookup_error(exc, hostname, False)


def main():
    """Prompt for the input file, process every token and write the reports.

    Exits with status 2 when the input file cannot be opened and with status 0
    after the three output files have been written.
    """
    try:
        ask = raw_input  # Python 2
    except NameError:
        ask = input  # Python 3

    print('Welcome to the DNS tool')
    file_name = ask('Please enter the filename: ')
    try:
        i_file = open(file_name)  # Input file
    except (IOError, OSError):
        print("Please enter a valid file name : Unable to open input file: {}".format(file_name))
        print("Exiting now with exit code 2")
        sys.exit(2)

    # Raw token cache; "localhost" is pre-seeded with pseudo line 0.
    raw_cache = {"localhost": {"line_num": [0]}}
    discovered = []  # IPv4 and DN records, in input order
    errors = []      # error records, in input order
    num_lines = 0    # counts token-bearing lines only; blank/comment-only lines are not numbered
    num_errors = 0
    num_pqdn = 0
    num_fqdn = 0
    num_ipv4 = 0
    dup_token = 0
    dup_tsasn = 0    # PQDN whose normalized form was already seen as a raw token

    with i_file:
        for line in i_file:
            token = strip_comment(line)
            if not token:
                continue
            num_lines += 1

            # The pre-seeded "localhost" entry never counts as a duplicate.
            if record_occurrence(raw_cache, token, num_lines) and token != "localhost":
                dup_token += 1
            # NOTE(review): the summary keys say "Skipped", but repeated tokens
            # are still resolved and reported below - confirm this is intended.

            if dyn_func.validate_ip(token):
                ptr_name, failure = lookup_ptr(token)
                if failure is not None:
                    errors.append(error_entry(token, num_lines, failure))
                    num_errors += 1
                discovered.append({
                    "type": "IPv4",
                    "occurrences": {"as": token, "line": num_lines},
                    "normalized": token,
                    # Key spelling kept as-is: it is part of the emitted JSON.
                    "discoverdName": ptr_name,
                })
                num_ipv4 += 1
                continue

            host_info = dyn_func.is_valid_hostname(token)  # validated once per token
            if not host_info['result']:
                errors.append(error_entry(token, num_lines))
                num_errors += 1
                continue

            addresses, failure = lookup_a(token)
            if failure is not None:
                errors.append(error_entry(token, num_lines, failure))
                num_errors += 1

            if host_info['token_type'] == 'PQDN':
                # A partially qualified name is normalized by appending the root dot.
                nmz_token = "{}.".format(token)
                num_pqdn += 1
                if nmz_token in raw_cache:
                    dup_tsasn += 1
            else:
                nmz_token = token
                num_fqdn += 1

            discovered.append({
                "type": "DN",
                "occurrences": {"as": token, "line": num_lines,
                                "normalized": nmz_token, "recordsAType": addresses},
            })

    summary = {
        "linesWithToken": num_lines,
        "totalFQDN": num_fqdn,
        "totalPQDN": num_pqdn,
        "totalIPv4": num_ipv4,
        "totalErrors": num_errors,
        "tokenSkippedAlreadySeen": dup_token,
        "tokenSkippedAlreadySawNormalized": dup_tsasn,
    }
    final_cache = {"summaryData": [summary],
                   "discoveredInformation": discovered,
                   "errors": errors}

    # 'with' guarantees the reports are flushed and closed before exiting.
    with open(HUMAN_JSON_PATH, 'w') as human_out:
        json.dump(final_cache, human_out, indent=2)
    with open(RAW_CACHE_PATH, 'w') as raw_out:
        json.dump(raw_cache, raw_out)
    with open(COMPACT_JSON_PATH, 'w') as compact_out:
        json.dump(final_cache, compact_out)

    print("Tool Execution successful, Quitting now")
    sys.exit(0)


if __name__ == "__main__":
    main()