How to use safe_get_result method in Slash

Best Python code snippet using slash

base.py

Source:base.py Github

copy

Full Screen

import json
import tldextract
import time
import re
import hashlib
import logging
import sys
import base64
from dateutil import parser
from domaintools import API
from domaintools.exceptions import (BadRequestException, InternalServerErrorException, NotAuthorizedException,
                                    NotFoundException, ServiceException, ServiceUnavailableException)


class dt_module_helpers():
    """Payload-building helpers shared by the DomainTools MISP module classes.

    Several methods report success as a boolean and leave the value they found
    on the instance (``safe_path_result``, ``safe_get_result``,
    ``multiplex_atrb_result``); callers read that attribute right after the
    call, before any other helper call can overwrite it.
    """

    # Contact roles probed by multiplex_atrb, in lookup order.
    _CONTACT_ROLES = ('admin', 'billing', 'registrant', 'tech')

    # (field, label suffix) pairs that iris_address emits as plain text.
    _IRIS_ADDRESS_FIELDS = (('org', 'Org'), ('street', 'Street'), ('state', 'State'), ('city', 'City'),
                            ('country', 'Country'), ('fax', 'Fax'), ('postal', 'Postal'), ('phone', 'Phone'))

    def __init__(self, plugin):
        self.plugin = plugin

    def append_unique_payload(self, payload):
        """Append ``payload`` to the plugin payload list unless its values were already seen.

        Uniqueness is keyed on an MD5 digest of the non-empty key/value pairs in
        ``payload['values']``. A payload whose only value is an empty string is
        dropped.
        """
        digest = hashlib.md5()
        values = payload['values']
        for key in values:
            if key == '':
                continue
            if values[key] == '':
                if len(values) == 1:
                    return
                continue
            digest.update('{0}'.format(key).encode('utf-8'))
            digest.update('{0}'.format(values[key]).encode('utf-8'))
        item = digest.hexdigest()
        if item not in self.plugin.unique:
            self.plugin.unique[item] = True
            self.plugin.payload.append(payload)

    def append_unique_value(self, a_list, a_value):
        """Append ``a_value`` to ``a_list`` in place and return a de-duplicated copy.

        The copy keeps first-seen order (the previous ``set`` round-trip returned
        the items in arbitrary order).
        """
        a_list.append(a_value)
        return list(dict.fromkeys(a_list))

    def module_has_type(self, type):
        """Return True when ``type`` is listed in the plugin's ``module_info['module-type']``."""
        return any(t == type for t in self.plugin.module_info['module-type'])

    def format_age(self, age):
        """Render a duration given in seconds using the largest fitting unit."""
        minute, hour, day = 60, 60 * 60, 60 * 60 * 24
        if age < minute:
            return '{age} seconds'.format(age=age)
        elif age < hour:
            return '{age} minutes'.format(age=round(age / minute, 1))
        elif age < day:
            return '{age} hours'.format(age=round(age / hour, 1))
        elif age < day * 7:
            return '{age} days'.format(age=round(age / day, 1))
        elif age < day * 30:
            return '{age} weeks'.format(age=round(age / (day * 7), 1))
        elif age < day * 365:
            return '{age} months'.format(age=round(age / (day * 30), 1))
        else:
            return '{age} years'.format(age=round(age / (day * 365), 1))

    def calculate_age(self, born):
        """Return the formatted time elapsed since the date string ``born``."""
        return self.format_age(time.time() - time.mktime(parser.parse(born).timetuple()))

    def safe_path(self, src, path):
        """Walk ``src`` along the keys in ``path``.

        Returns True and stores the reached value in ``safe_path_result``; returns
        False (leaving ``safe_path_result`` as None) as soon as a key is missing.
        """
        self.safe_path_result = None
        ptr = src
        for p in path:
            if p in ptr:
                ptr = ptr[p]
            else:
                return False
        self.safe_path_result = ptr
        return True

    def safe_get(self, src, key):
        """Store ``src.get(key)`` in ``safe_get_result``; return True unless it is None or ''."""
        self.safe_get_result = src.get(key, None)
        return self.safe_get_result is not None and self.safe_get_result != ''

    def multiplex_atrb(self, src, keys, label):
        """Collect ``contacts/<role>/<key>`` values and build one payload in ``multiplex_atrb_result``.

        Every role in ``_CONTACT_ROLES`` is probed for every key. The result is a
        payload for the last collected value, or '' when nothing was found.
        """
        self.multiplex_atrb_result = ''
        values = []
        for key in keys:
            for role in self._CONTACT_ROLES:
                if not self.safe_path(src, ['contacts', role, key]):
                    continue
                found = self.safe_path_result
                if type(found) is list:
                    for item in found:
                        # Items of a 'tech' list are kept even when already collected;
                        # this preserves which value ends up last (and so is reported).
                        if item != '' and (role == 'tech' or item not in values):
                            values.append(item)
                elif found != '':
                    values.append(found)
        if values:
            self.multiplex_atrb_result = {'types': ['{key}'.format(key=label)],
                                          'values': {label: values[-1]},
                                          'comment': '{0} from DomainTools'.format(label),
                                          'tags': ['DomainTools', 'whois', label]}

    def simple_parse(self, _what, _from_where, _types=None, _comment='{0} from DomainTools',
                     _tags=None, _label='{0}', _categories=None,
                     _parent=None):
        """Recursively turn ``_from_where[_what]`` into payload entries.

        Dicts and lists are descended into, extending the comment/label templates
        with ``'<name> -> {0}'`` and adding the name to the tags; scalar values are
        appended through ``append_unique_payload``. ``_what == '*'`` parses every
        key of a dict.

        ``_types``, ``_tags`` and ``_categories`` default to ``['text']``,
        ``['DomainTools', 'whois']`` and ``['External analysis']``; fresh lists are
        created per call so payloads never share a default list object.
        """
        if _types is None:
            _types = ['text']
        if _tags is None:
            _tags = ['DomainTools', 'whois']
        if _categories is None:
            _categories = ['External analysis']
        if _parent is not None:
            _comment = '{0} -> {1}'.format(_parent, _comment)
            _label = '{0} -> {1}'.format(_parent, _label)

        name = str(_what).replace('_', ' ')
        nested = name + ' -> {0}'

        def tags_with_name():
            # A fresh list per recursive call: payloads keep a reference to it.
            return self.append_unique_value(_tags[:], name)

        if isinstance(_from_where, dict):
            if _what == '*':
                for dict_key in _from_where:
                    self.simple_parse(dict_key, _from_where, _types, _comment,
                                      tags_with_name(), _label, _categories)
            elif self.safe_get(_from_where, _what):
                # Keep a local reference: the recursive calls below overwrite
                # self.safe_get_result.
                found = self.safe_get_result
                if isinstance(found, dict):
                    for dict_key in found:
                        self.simple_parse(dict_key, found, _types, _comment.format(nested),
                                          tags_with_name(), _label.format(nested), _categories)
                elif isinstance(found, list):
                    for item in found:
                        self.simple_parse('*', item, _types, _comment.format(nested),
                                          tags_with_name(), _label.format(nested), _categories)
                else:
                    self.append_unique_payload({'types': _types,
                                                'values': {_label.format(name): found},
                                                'comment': _comment.format(name),
                                                'tags': _tags, 'categories': _categories})
        elif isinstance(_from_where, list):
            for item in _from_where:
                self.simple_parse(_what, item, _types, _comment.format(nested),
                                  tags_with_name(), _label.format(nested), _categories)
        else:
            self.append_unique_payload({'types': _types,
                                        'values': {_label.format(name): _from_where},
                                        'comment': _comment.format(name),
                                        'tags': _tags, 'categories': _categories})

    def iris_add(self, item, type, label, categories=None):
        """Emit ``item['value']`` as an Iris payload.

        When ``item`` carries a ``count`` it is included in the comment, and the
        'Guided Pivot' tag is added if the count is below the configured
        ``guided_pivot_threshold``. ``categories`` defaults to
        ``['External analysis']``.
        """
        if categories is None:
            categories = ['External analysis']
        count = None
        tags = ['DomainTools', 'Iris']
        threshold = int(self.plugin.config.get('guided_pivot_threshold'))
        if self.safe_get(item, 'count'):
            count = self.safe_get_result
            comment = '{0} (GP: {1:,}) from DomainTools Iris'.format(label, count)
        else:
            comment = '{0} from DomainTools Iris'.format(label)
        if count and count < threshold:
            tags.append('Guided Pivot')
        self.simple_parse('value', item, type, comment, tags, label, _categories=categories)

    def iris_address(self, item, label):
        """Emit the name, address fields and e-mail entries of an Iris contact record.

        For the 'Registrant Contact' label, name and e-mail use the
        whois-registrant types in the 'Attribution' category; everything else is
        emitted as text. A record without an 'email' key emits no e-mail entries.
        """
        is_registrant = label == 'Registrant Contact'
        if self.safe_get(item, 'name'):
            if is_registrant:
                self.iris_add(self.safe_get_result, ['whois-registrant-name'], '{0} Name'.format(label),
                              categories=['Attribution'])
            else:
                self.iris_add(self.safe_get_result, ['text'], '{0} Name'.format(label))
        for field, suffix in self._IRIS_ADDRESS_FIELDS:
            if self.safe_get(item, field):
                self.iris_add(self.safe_get_result, ['text'], '{0} {1}'.format(label, suffix))
        for email in item.get('email') or []:
            if is_registrant:
                self.iris_add(email, ['whois-registrant-email'], '{0} Email'.format(label),
                              categories=['Attribution'])
            else:
                self.iris_add(email, ['text'], '{0} Email'.format(label))

    def guided_pivots_value(self, value, label):
        """Emit a 'Guided Pivot' payload for each dict in ``value`` whose count is between 1 and 299."""
        if isinstance(value, list):
            for item in value:
                self.guided_pivots_value(item, label)
            return
        if not isinstance(value, dict):
            # not countable property
            return
        if 'count' in value and 0 < value['count'] < 300:
            self.append_unique_payload({'types': ['text'], 'categories': ['External analysis'],
                                        'values': {'Guided Pivot': '{0} ({1})'.format(label, value['count'])},
                                        'comment': 'Guided Pivot',
                                        'tags': ['DomainTools', 'Guided Pivot']})

    def guided_pivots(self, iris_property, label):
        """Run ``guided_pivots_value`` over every entry of ``iris_property``."""
        for key, value in iris_property.items():
            self.guided_pivots_value(value, '{0} {1}'.format(label, key))


class dt_misp_module_base:
    """Base class of the DomainTools MISP modules: request validation and service dispatch.

    ``check_config`` and the API adapter read ``self.module`` and
    ``self.module_info``, which are not set in this class.
    NOTE(review): presumably subclasses define them - confirm.
    """

    def __init__(self):
        self.misp_attributes = {
            'input': ['domain', 'hostname', 'url', 'uri', 'email-src', 'email-dst', 'target-email',
                      'whois-registrant-email',
                      'whois-registrant-name', 'whois-registrant-phone', 'ip-src', 'ip-dst', 'whois-creation-date',
                      'text', 'x509-fingerprint-sha1'],
            'output': ['whois-registrant-email', 'whois-registrant-phone', 'whois-registrant-name',
                       'whois-registrar', 'whois-creation-date', 'comment', 'domain', 'ip-src', 'ip-dst', 'text']
        }
        self.module_config = ['username', 'api_key', 'results_limit']
        self.results_limit = 100
        self.guided_pivot_threshold = 300
        self.historic_enabled = False
        self.debug = False
        self.payload = list()
        self.unique = dict()
        self.errors = {'error': 'An unknown error has occurred'}
        self.helper = dt_module_helpers(self)
        self.log = logging.getLogger('DomainTools')
        self.log.setLevel(logging.DEBUG)
        self.ch = logging.StreamHandler(sys.stdout)
        self.ch.setLevel(logging.DEBUG)
        self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.ch.setFormatter(self.formatter)
        # process_request() re-runs __init__ on every request and the logger is
        # shared process-wide, so attach a stdout handler only once; otherwise
        # every request adds another handler and log lines are duplicated.
        already_attached = any(isinstance(handler, logging.StreamHandler)
                               and getattr(handler, 'stream', None) is sys.stdout
                               for handler in self.log.handlers)
        if not already_attached:
            self.log.addHandler(self.ch)

    def check_config(self, request):
        """Check the incoming request for valid info.

        On success, fills in default limits for expansion modules, creates the API
        adapter and builds ``svc_map`` (attribute type -> service methods), then
        returns True. On failure, records a message in ``self.errors`` and returns
        False.
        """
        self.config = request.get('config', None)
        if self.config is None:
            self.errors['error'] = "Configuration is missing from the request."
            return False
        if self.config.get('username', None) is None:
            self.errors['error'] = "DomainTools API username is not configured."
            return False
        if self.config.get('api_key', None) is None:
            self.errors['error'] = "DomainTools API key is not configured."
            return False
        if self.helper.module_has_type('expansion') and self.config.get('results_limit', None) is None:
            self.config['results_limit'] = self.results_limit
        if self.helper.module_has_type('expansion') and self.config.get('guided_pivot_threshold', '') == '':
            self.config['guided_pivot_threshold'] = self.guided_pivot_threshold
        self.api = dt_api_adapter_misp(self)
        api = self.api
        whois_services = [api.parsed_whois, api.domain_profile, api.reputation, api.hosting_history,
                          api.whois_history, api.risk]
        ip_services = [api.host_domains, api.parsed_whois, api.hosting_history, api.iris_pivot]
        self.svc_map = {
            'text': list(whois_services),
            'domain': whois_services + [api.iris_hover, api.iris_pivot],
            'hostname': whois_services + [api.iris_pivot],
            'url': list(whois_services),
            'uri': list(whois_services),
            'email-src': [api.reverse_whois, api.iris_pivot],
            'email-dst': [api.reverse_whois, api.iris_pivot],
            'target-email': [api.reverse_whois],
            'whois-registrant-email': [api.reverse_whois, api.iris_pivot],
            'whois-registrant-name': [api.reverse_whois, api.iris_pivot],
            'whois-registrant-phone': [api.reverse_whois],
            'whois-registrar': [api.iris_pivot],
            'ip-src': list(ip_services),
            'ip-dst': list(ip_services),
            'whois-creation-date': [api.reverse_whois],
            'data': [api.iris_import],
            'x509-fingerprint-sha1': [api.iris_pivot]
        }
        return True

    def process_request(self, request_raw):
        """Handle one JSON request string and return the module response dict.

        The first input attribute type present in the request is dispatched to all
        of its services; DomainTools API errors raised by a single service are
        logged and do not stop the remaining services. Returns
        ``{'results': payload}``, or ``self.errors`` when the configuration is
        rejected by ``check_config``.
        """
        try:
            self.__init__()
            if self.debug:
                self.log.debug("process_request: {0}".format(request_raw))
            request = json.loads(request_raw)
            if not self.check_config(request):
                # check_config recorded the reason; report it instead of an empty result.
                return self.errors
            for attr_type in self.misp_attributes['input']:
                if attr_type not in request:
                    continue
                for svc in self.svc_map[attr_type]:
                    try:
                        svc(request[attr_type], attr_type)
                    except BadRequestException:
                        self.log.debug("API returned a Bad Request response: {0}".format(request[attr_type]))
                    except NotAuthorizedException:
                        self.log.debug("API returned a Not Authorized response: {0}".format(request[attr_type]))
                    except NotFoundException:
                        self.log.debug("API returned a Not Found response: {0}".format(request[attr_type]))
                    except InternalServerErrorException:
                        self.log.debug(
                            "API returned a Internal Server Error response: {0}".format(request[attr_type]))
                    except ServiceUnavailableException:
                        self.log.debug(
                            "API returned a Service Unavailable response: {0}".format(request[attr_type]))
                    except ServiceException:
                        self.log.debug("API returned a Service Error response: {0}".format(request[attr_type]))
                break  # can there realistically be more than one type in a request?
            return {'results': self.payload}
        except NotFoundException:
            self.errors = {'results': 'No information found'}
            return self.errors


class dt_api_adapter_misp():
    """Calls DomainTools API products and converts the responses into plugin payload entries.

    Service methods take ``(query, query_type)``. They return True when there is
    nothing (more) to do and False after recording an API error message in
    ``plugin.errors``.
    """

    # Module names for which most whois-style services return immediately.
    _SKIPPED_MODULES = ('DomainTools-Historic', 'DomainTools-Iris-Analyze', 'DomainTools-Iris-Pivot')

    # (field, types) pairs parsed from each parsed-whois contact; None means the default ['text'].
    _CONTACT_FIELDS = (
        ('abuse_mailbox', ('email-src', 'email-dst')), ('address', None), ('changed_by', None), ('descr', None),
        ('fax', ('whois-registrant-phone',)), ('notify_email', ('whois-registrant-email',)),
        ('phone', ('whois-registrant-phone',)), ('remarks', None), ('contact_keys', None), ('mnt_keys', None),
        ('other', None), ('country', None), ('created_date', ('whois-creation-date',)), ('id', None),
        ('name', ('whois-registrant-name',)), ('ref', ('url', 'uri')), ('source', None), ('type', None),
        ('updated_date', None),
    )

    # (field, types) pairs parsed from each parsed-whois network.
    _NETWORK_FIELDS = (
        ('asn', None), ('changed_by', None), ('contact_keys', None), ('country', None),
        ('created_date', ('whois-creation-date',)), ('customer', None), ('descr', None), ('id', None),
        ('mnt_keys', None), ('name', ('whois-registrant-name',)), ('notify_email', ('whois-registrant-email',)),
        ('org', None), ('other', None), ('parent', None), ('parent_id', None),
        ('phone', ('whois-registrant-phone',)), ('range', None), ('ref', None), ('remarks', None),
        ('source', None), ('status', None), ('updated_date', None),
    )

    # (contact key, payload label) pairs multiplexed across the contact roles of a domain record.
    _REGISTRANT_ATTRIBUTES = (
        ('email', 'whois-registrant-email'), ('org', 'whois-registrant-org'),
        ('country', 'whois-registrant-country'), ('phone', 'whois-registrant-phone'),
        ('fax', 'whois-registrant-phone'), ('name', 'whois-registrant-name'), ('city', 'text'),
        ('country', 'text'), ('state', 'text'), ('postal', 'text'), ('street', 'text'),
    )

    def __init__(self, plugin):
        self.plugin = plugin
        self.helper = plugin.helper
        self.svc_enabled = dict()
        self.api = API(username=plugin.config.get('username'), key=plugin.config.get('api_key'), app_partner='MISP',
                       app_name=plugin.module['name'], app_version=plugin.module_info['version'])
        self.account_information = self.api.account_information().data()
        for svc in self.account_information['response']['products']:
            if svc['per_month_limit'] is not None:
                # Remaining calls for this month.
                self.svc_enabled[svc['id']] = int(svc['per_month_limit']) - int(svc['usage']['month'])
            else:
                self.svc_enabled[svc['id']] = True

    def _log_if_unavailable(self, service):
        """Debug-log when ``service`` is not in the account's products or has no calls left.

        NOTE(review): this only logs; the API call is still attempted afterwards,
        as in the original code - confirm whether an early return was intended.
        """
        if service not in self.svc_enabled or self.svc_enabled[service] <= 0:
            if self.plugin.debug:
                self.plugin.log.debug("{0}: service disabled or over monthly limit".format(service))

    def _skipped_for_module(self):
        """Return True when the current module is one of ``_SKIPPED_MODULES``."""
        return self.plugin.module['name'] in self._SKIPPED_MODULES

    @staticmethod
    def _registered_domain(query):
        """Return the domain and suffix parts of ``query`` joined with '.', as given by tldextract."""
        tldex = tldextract.extract(query.replace('\\/', '/'))
        return '.'.join(tldex[1:])

    def _record_error(self, results):
        """Copy an API error message into ``plugin.errors``; return True when one was present."""
        if results.get('error'):
            self.plugin.errors['error'] = results['error']['message']
            return True
        return False

    def _append_path_value(self, results, path, types, name):
        """Append the non-empty value found at ``path`` as a whois payload named ``name``."""
        if self.helper.safe_path(results, path) and self.helper.safe_path_result != '':
            self.helper.append_unique_payload({'types': types,
                                               'values': {name: self.helper.safe_path_result},
                                               'comment': '{0} from DomainTools'.format(name),
                                               'tags': ['DomainTools', 'whois', name]})

    def _append_limited_domains(self, domains, name):
        """Append up to the configured ``results_limit`` entries of ``domains`` as domain payloads."""
        limit = int(self.plugin.config.get('results_limit'))
        for d in domains:
            if limit == 0:
                break
            limit = limit - 1
            if d != '':
                self.helper.append_unique_payload({'types': ['domain'],
                                                   'values': {name: d},
                                                   'comment': '{0} from DomainTools'.format(name),
                                                   'tags': ['DomainTools', name]})

    def _parse_fields(self, record, fields):
        """Run ``simple_parse`` on ``record`` for every (field, types) pair in ``fields``."""
        for field, types in fields:
            self.helper.simple_parse(field, record, list(types) if types else ['text'])

    def parsed_whois(self, query, query_type):
        """Query parsed whois and emit contact, network, registrar and domain-age payloads."""
        self._log_if_unavailable('parsed-whois')
        if self._skipped_for_module():
            return True
        q = self._registered_domain(query)
        if q == '':
            return True
        method = self.api.parsed_whois(q)
        results = method.data()['response']
        if method._status != 200:
            return True
        results = results.get('parsed_whois')
        if not results:
            # No parsed record in the response: nothing to emit.
            return True
        if results.get('source'):
            if self.helper.module_has_type('expansion'):
                if self.helper.safe_get(results, 'contacts'):
                    for contact in self.helper.safe_get_result:
                        self._parse_fields(contact, self._CONTACT_FIELDS)
                if self.helper.safe_get(results, 'networks'):
                    for network in self.helper.safe_get_result:
                        self._parse_fields(network, self._NETWORK_FIELDS)
        else:
            # NOTE(review): the original indentation was lost; everything below is
            # assumed to belong to the record-without-'source' branch - confirm.
            for key, label in self._REGISTRANT_ATTRIBUTES:
                self.helper.multiplex_atrb(results, [key], label)
                if self.helper.multiplex_atrb_result != '':
                    self.helper.append_unique_payload(self.helper.multiplex_atrb_result)
            self.helper.simple_parse('registrar', results, ['whois-registrar'])
            self.helper.simple_parse('created_date', results, ['whois-creation-date'])
            if self.helper.safe_get(results, 'created_date') and self.helper.safe_get_result != '':
                age = self.helper.calculate_age(self.helper.safe_get_result)
                self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'],
                                                   'values': {'domain age': '{0} created {1} ago'.format(q, age)},
                                                   'comment': 'domain age from DomainTools',
                                                   'tags': ['DomainTools', 'domain age']})
            self.helper.simple_parse('other_properties', results)
        return True

    def whois(self, query, query_type):
        """Query whois and emit the registrant name and creation date."""
        self._log_if_unavailable('whois')
        if self.plugin.module['name'] == 'DomainTools-Historic':
            return True
        method = self.api.whois(query)
        results = method.data()['response']
        if method._status != 200:
            return True
        self._append_path_value(results, ['whois', 'registrant'], ['whois-registrant-name'], 'registrant name')
        self._append_path_value(results, ['whois', 'registration', 'created'], ['whois-creation-date'],
                                'creation date')
        return True

    def domain_profile(self, query, query_type):
        """Query the domain profile and emit server, registrant, name server and registration payloads."""
        self._log_if_unavailable('domain-profile')
        if self._skipped_for_module():
            return True
        q = self._registered_domain(query)
        if q == '':
            return True
        method = self.api.domain_profile(q)
        results = method.data()['response']
        if method._status != 200:
            return True
        if self._record_error(results):
            return False
        self._append_path_value(results, ['server', 'ip_address'], ['ip-src', 'ip-dst'], 'domain ip address')
        self._append_path_value(results, ['registrant', 'name'], ['whois-registrant-name'], 'registrant name')
        self._append_path_value(results, ['registrant', 'email'], ['whois-registrant-email'], 'registrant email')
        if self.helper.safe_path(results, ['registrant', 'domains']) and self.helper.safe_path_result != '':
            self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'],
                                               'values': {
                                                   'registrant domain count':
                                                       'registrant has {count} other domains'.format(
                                                           count=self.helper.safe_path_result)},
                                               'comment': 'registrant domain count from DomainTools',
                                               'tags': ['DomainTools', 'whois', 'registrant domain count']})
        if self.helper.safe_path(results, ['server', 'other_domains']) and self.helper.safe_path_result != '':
            self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'],
                                               'values': {
                                                   'co-located domain count':
                                                       'IP is shared with {count} other domains'.format(
                                                           count=self.helper.safe_path_result)},
                                               'comment': 'co-located domain count from DomainTools',
                                               'tags': ['DomainTools', 'co-located domain count']})
        if self.helper.safe_path(results, ['name_servers']):
            for ns in self.helper.safe_path_result:
                if ns != '':
                    self.helper.append_unique_payload({'types': ['hostname'],
                                                       'values': {'name server': ns['server']},
                                                       'comment': 'name server from DomainTools',
                                                       'tags': ['DomainTools', 'whois', 'nameserver']})
        self._append_path_value(results, ['registration', 'created'], ['whois-creation-date'], 'creation date')
        self._append_path_value(results, ['registration', 'registrar'], ['whois-registrar'], 'registrar')
        return True

    def reputation(self, query, query_type):
        """Query the reputation score and emit the score plus, when present, its reasons."""
        self._log_if_unavailable('reputation')
        if self._skipped_for_module():
            return True
        q = self._registered_domain(query)
        if self.helper.module_has_type('expansion'):
            method = self.api.reputation(q, include_reasons=True)
        else:
            method = self.api.reputation(q)
        results = method.data()['response']
        if method._status != 200:
            return True
        if self._record_error(results):
            return False
        self.helper.simple_parse('risk_score', results, ['text'], 'domain reputation score from DomainTools',
                                 ['DomainTools', 'whois', 'domain reputation score'], 'domain reputation score')
        if self.helper.safe_get(results, 'reasons'):
            reasons = []
            for item in self.helper.safe_get_result:
                if isinstance(item, dict):
                    # Read the value from the reason entry itself, not from the
                    # enclosing list of reasons.
                    reasons.extend('{0}: {1}'.format(dict_key, item[dict_key]) for dict_key in item)
                else:
                    reasons.append('{0}'.format(item))
            if reasons:
                self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'], 'values': {
                    'domain reputation score reason': ', '.join(reasons)},
                                                   'comment': 'domain reputation score reason from DomainTools',
                                                   'tags': ['DomainTools', 'domain reputation score reason']})
        return True

    def risk(self, query, query_type):
        """Query the risk endpoint and emit the risk score plus its component reasons."""
        self._log_if_unavailable('risk')
        if self._skipped_for_module():
            return True
        q = self._registered_domain(query)
        if q == '':
            return True
        method = self.api._results('risk', '/v1/risk', domain=q)
        results = method.data()['response']
        if method._status != 200:
            return True
        if self._record_error(results):
            return False
        self.helper.simple_parse('risk_score', results)
        if self.helper.safe_get(results, 'components'):
            reasons = []
            for item in self.helper.safe_get_result:
                reasons.extend('{0}: {1}'.format(dict_key, item[dict_key]) for dict_key in item)
            if reasons:
                self.helper.append_unique_payload(
                    {'types': ['text'], 'categories': ['External analysis'], 'values': {
                        'risk score reason': ', '.join(reasons)},
                     'comment': 'risk score reason from DomainTools',
                     'tags': ['DomainTools', 'risk score reason']})
        return True

    def _append_reverse_ip_entry(self, q, entry):
        """Emit the domain count and (for expansion modules) the domains of one reverse-ip entry."""
        if entry['domain_count'] != '':
            self.helper.append_unique_payload(
                {'types': ['text'], 'categories': ['External analysis'], 'values': {
                    'co-located domain count': '{0} is shared with {count} other domains'.format(
                        q, count=entry['domain_count'])},
                 'comment': 'co-located domain count from DomainTools',
                 'tags': ['DomainTools', 'co-located domain count']})
        if self.helper.module_has_type('expansion'):
            self._append_limited_domains(entry['domain_names'], 'reverse ip domain')

    def reverse_ip(self, query, query_type):
        """Query reverse IP and emit co-located domain counts, domains and related IP addresses."""
        self._log_if_unavailable('reverse-ip')
        if self._skipped_for_module():
            return True
        q = self._registered_domain(query)
        if q == '':
            return True
        if self.helper.module_has_type('expansion'):
            method = self.api.reverse_ip(query)
        else:
            method = self.api.reverse_ip(query, limit=0)
        results = method.data()['response']
        if method._status != 200:
            return True
        if self._record_error(results):
            return False
        if self.helper.safe_get(results, 'ip_addresses'):
            addresses = self.helper.safe_get_result
            if isinstance(addresses, list):
                for item in addresses:
                    self._append_reverse_ip_entry(q, item)
                    if item['ip_address'] != query and item['ip_address'] != '':
                        self.helper.append_unique_payload({'types': ['ip-src', 'ip-dst'],
                                                           'values': {'reverse ip': item['ip_address']},
                                                           'comment': 'reverse ip from DomainTools',
                                                           'tags': ['DomainTools', 'reverse ip']})
            else:
                self._append_reverse_ip_entry(q, addresses)
        return True

    def reverse_whois(self, query, query_type):
        """Query reverse whois and emit matching domains and current/historic domain counts."""
        self._log_if_unavailable('reverse-whois')
        if self._skipped_for_module():
            return True
        if self.helper.module_has_type('expansion'):
            method = self.api.reverse_whois(query, mode='purchase')
        else:
            method = self.api.reverse_whois(query, mode='quote')
        results = method.data()['response']
        if method._status != 200:
            return True
        if self._record_error(results):
            return False
        if self.helper.safe_get(results, 'domains'):
            for domain in self.helper.safe_get_result:
                if domain != '':
                    self.helper.append_unique_payload({'types': ['domain'],
                                                       'values': {'reverse whois domain': domain},
                                                       'comment': 'reverse whois domain from DomainTools',
                                                       'tags': ['DomainTools', 'reverse whois domain']})
        if self.helper.safe_get(results, 'domain_count'):
            counts = self.helper.safe_get_result
            if 'current' in counts:
                self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'], 'values': {
                    'current domain count': counts['current']},
                                                   'comment': 'domain count current from DomainTools',
                                                   'tags': ['DomainTools', 'reverse whois domain']})
            if 'historic' in counts:
                self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'], 'values': {
                    'historic domain count': counts['historic']},
                                                   'comment': 'historic domain count from DomainTools',
                                                   'tags': ['DomainTools', 'reverse whois']})
        return True

    def host_domains(self, query, query_type):
        """Query host domains for an IP and emit the co-located domain count and domains."""
        self._log_if_unavailable('reverse-ip')
        if self._skipped_for_module():
            return True
        if self.helper.module_has_type('expansion'):
            method = self.api.host_domains(query)
        else:
            method = self.api.host_domains(query, limit=0)
        results = method.data()['response']
        if method._status != 200:
            return True
        if self._record_error(results):
            return False
        if self.helper.safe_get(results, 'ip_addresses'):
            addresses = self.helper.safe_get_result
            if addresses['ip_address'] != '':
                self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'], 'values': {
                    'co-located domain count': addresses['domain_count']},
                                                   'comment': 'co-located domain count from DomainTools',
                                                   'tags': ['DomainTools', 'co-located domain count']})
            if self.helper.module_has_type('expansion'):
                self._append_limited_domains(addresses['domain_names'], 'co-located domain')
        return True

    # NOTE(review): only the opening statements of hosting_history are visible in
    # this excerpt; they are reproduced as seen (logging routed through the plugin
    # logger) and nothing beyond them is assumed.
    def hosting_history(self, query, query_type):
        if 'hosting-history' not in self.svc_enabled or self.svc_enabled['hosting-history'] <= 0:
            if self.plugin.debug:
                self.plugin.log.debug("hosting-history: service disabled or over monthly limit")
        if self.plugin.module['name'] != 'DomainTools-Historic':
            return True
tldex = tldextract.extract(query.replace('\\/', '/'))773 q = '.'.join(tldex[1:])774 if q == '':775 return True776 method = self.api.hosting_history(q)777 results = method.data()778 results = results['response']779 if method._status != 200:780 return781 if results.get('error'):782 self.errors['error'] = results['error']['message']783 return False784 if self.helper.safe_get(results, 'ip_history'):785 for item in self.helper.safe_get_result:786 if item['pre_ip'] is not None and item['pre_ip'] != '':787 self.helper.append_unique_payload({'types': ['ip-src', 'ip-dst'], 'values': {788 'ip': item['pre_ip']},789 'comment': 'record date: {0}'.format(item['actiondate']),790 'tags': ['DomainTools', 'hosting history', 'whois', 'ip',791 'historic']})792 if item['post_ip'] is not None and item['post_ip'] != '':793 self.helper.append_unique_payload({'types': ['ip-src', 'ip-dst'], 'values': {794 'ip': item['post_ip']},795 'comment': 'record date: {0}'.format(item['actiondate']),796 'tags': ['DomainTools', 'hosting history', 'whois', 'ip',797 'historic']})798 if self.helper.safe_get(results, 'nameserver_history'):799 for item in self.helper.safe_get_result:800 if item['pre_mns'] is not None and item['pre_mns'] != '':801 self.helper.append_unique_payload({'types': ['hostname'], 'values': {802 'nameserver': item['pre_mns']},803 'comment': 'record date: {0}'.format(item['actiondate']),804 'tags': ['DomainTools', 'hosting history', 'whois', 'nameserver',805 'historic']})806 if item['post_mns'] is not None and item['post_mns'] != '':807 self.helper.append_unique_payload({'types': ['hostname'], 'values': {808 'nameserver': item['post_mns']},809 'comment': 'record date: {0}'.format(item['actiondate']),810 'tags': ['DomainTools', 'hosting history', 'whois', 'nameserver',811 'historic']})812 if self.helper.safe_get(results, 'registrar_history'):813 for item in self.helper.safe_get_result:814 if item['registrar'] is not None and item['registrar'] != '':815 
self.helper.append_unique_payload({'types': ['whois-registrar'], 'values': {816 'registrar': item['registrar']},817 'comment': 'record date: {0}'.format(item['date_lastchecked']),818 'tags': ['DomainTools', 'hosting history', 'registrar',819 'historic']})820 return True821 def whois_history(self, query, query_type):822 if 'whois-history' not in self.svc_enabled or self.svc_enabled['whois-history'] <= 0:823 if self.plugin.debug:824 self.log.debug("whois-history: service disabled or over monthly limit")825 if self.plugin.module['name'] != 'DomainTools-Historic':826 return True827 tldex = tldextract.extract(query.replace('\\/', '/'))828 q = '.'.join(tldex[1:])829 if q == '':830 return True831 method = self.api.whois_history(q)832 results = method.data()833 results = results['response']834 if method._status != 200:835 return True836 if results.get('error'):837 self.errors['error'] = results['error']['message']838 return False839 if self.helper.safe_get(results, 'history'):840 for item in self.helper.safe_get_result:841 matches = re.findall(r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)", item['whois']['record'])842 for match in list(set(matches)):843 if match != '':844 self.helper.append_unique_payload({'types': ['whois-registrant-email'], 'values': {845 'email': match},846 'comment': 'record date: {0}'.format(item['date']),847 'tags': ['DomainTools', 'whois history']})848 return True849 def iris_hover(self, query, query_type):850 if 'iris-investigate' not in self.svc_enabled or self.svc_enabled['iris-investigate'] <= 0:851 if self.plugin.debug:852 self.log.debug("iris-investigate: service disabled or over monthly limit")853 if self.plugin.module['name'] != 'DomainTools-Iris-Analyze':854 return True855 tldex = tldextract.extract(query.replace('\\/', '/'))856 q = '.'.join(tldex[1:])857 if q == '':858 self.log.debug("q is empty")859 return True860 method = self.api.iris_investigate(q)861 results = method.data()862 results = results['response']863 if method._status != 
200:864 return865 if results.get('error'):866 self.errors['error'] = results['error']['message']867 return False868 if not self.helper.safe_get(results, 'results'):869 return True870 result = self.helper.safe_get_result.pop()871 if self.helper.safe_path(result, ['create_date', 'value']) and self.helper.safe_path_result != '':872 create_date = self.helper.safe_path_result873 age = self.helper.calculate_age(create_date)874 self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'],875 'values': {'Create Date': '{0}'.format(create_date)},876 'comment': 'Create Date from DomainTools',877 'tags': ['DomainTools', 'Create Date']})878 self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'],879 'values': {'Domain Age': '{0} created {1} ago'.format(q, age)},880 'comment': 'Domain Age from DomainTools',881 'tags': ['DomainTools', 'Domain Age']})882 if self.helper.safe_get(result, 'registrant_contact'):883 registrant_contact = self.helper.safe_get_result884 self.helper.simple_parse('value', registrant_contact['name'], ['text'], 'registrant name from DomainTools',885 ['DomainTools', 'Iris'], 'registrant name')886 for email in registrant_contact['email']:887 self.helper.simple_parse('value', email, ['whois-registrant-email'], 'Registrant Email from DomainTools',888 ['DomainTools', 'Iris'], 'Registrant Email')889 if self.helper.safe_get(result, 'registrar'):890 self.helper.simple_parse('value', self.helper.safe_get_result, ['text'], 'Registrar from DomainTools',891 ['DomainTools', 'Iris'], 'Registrar Name')892 if self.helper.safe_get(result, 'ip'):893 for ip in self.helper.safe_get_result:894 self.helper.simple_parse('value', ip['address'], ['ip-dst'], 'IP address from DomainTools',895 ['DomainTools', 'Iris'], 'IP Address')896 self.helper.simple_parse('value', ip['country_code'], ['text'], 'country code from DomainTools',897 ['DomainTools', 'Iris'], 'IP Country Code')898 self.helper.simple_parse('value', ip['isp'], 
['text'], 'ISP from DomainTools',899 ['DomainTools', 'Iris'], 'IP ISP')900 if self.helper.safe_get(result, 'domain_risk'):901 risk = self.helper.safe_get_result902 self.helper.simple_parse('risk_score', risk, ['text'], 'Risk from DomainTools',903 ['DomainTools', 'Iris'], 'Risk Score')904 for index, component in enumerate(risk['components']):905 self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'],906 'values': {'Risk Component': '{0} ({1})'.format(component['name'], component['risk_score'])},907 'comment': 'Risk Component from DomainTools',908 'tags': ['DomainTools', 'name server']})909 if self.helper.safe_get(result, 'name_server'):910 for index, name_server in zip(range(2), self.helper.safe_get_result):911 self.helper.simple_parse('value', name_server['host'], ['host'], 'Name Server from DomainTools',912 ['DomainTools', 'Iris'], 'Name Server')913 if self.helper.safe_get(result, 'mx'):914 for index, name_server in zip(range(2), self.helper.safe_get_result):915 self.helper.simple_parse('value', name_server['host'], ['text'], 'Mail Server from DomainTools',916 ['DomainTools', 'Iris'], 'Mail Server Host')917 if self.helper.safe_get(result, 'ssl_info'):918 for ssl in self.helper.safe_get_result:919 self.helper.simple_parse('value', ssl['organization'], ['text'], 'SSL Org from DomainTools',920 ['DomainTools', 'Iris'], 'SSL Org')921 # guided pivots922 if self.helper.safe_get(result, 'create_date'):923 self.helper.guided_pivots_value(self.helper.safe_get_result, 'create date')924 if self.helper.safe_get(result, 'technical_contact'):925 self.helper.guided_pivots(self.helper.safe_get_result, 'technical contact')926 if self.helper.safe_get(result, 'admin_contact'):927 self.helper.guided_pivots(self.helper.safe_get_result, 'admin contact')928 if self.helper.safe_get(result, 'billing_contact'):929 self.helper.guided_pivots(self.helper.safe_get_result, 'billing contact')930 if self.helper.safe_get(result, 'redirect_domain'):931 
self.helper.guided_pivots_value(self.helper.safe_get_result, 'redirect domain')932 if self.helper.safe_get(result, 'expiration_date'):933 self.helper.guided_pivots_value(self.helper.safe_get_result, 'expiration date')934 if self.helper.safe_get(result, 'registrar'):935 self.helper.guided_pivots_value(self.helper.safe_get_result, 'registrar')936 if self.helper.safe_get(result, 'google_analytics'):937 self.helper.guided_pivots_value(self.helper.safe_get_result, 'google analytics')938 if self.helper.safe_get(result, 'registrant_contact'):939 self.helper.guided_pivots(self.helper.safe_get_result, 'registrant contact')940 if self.helper.safe_get(result, 'registrant_org'):941 self.helper.guided_pivots(self.helper.safe_get_result, 'registrant organization')942 if self.helper.safe_get(result, 'registrant_name'):943 self.helper.guided_pivots(self.helper.safe_get_result, 'registrant name')944 if self.helper.safe_get(result, 'ip'):945 for ip in self.helper.safe_get_result:946 self.helper.guided_pivots(ip, 'ip')947 if self.helper.safe_get(result, 'ssl_info'):948 for ssl in self.helper.safe_get_result:949 self.helper.guided_pivots(ssl, 'ssl')950 if self.helper.safe_get(result, 'mx'):951 for mx in self.helper.safe_get_result:952 self.helper.guided_pivots(mx, 'mx')953 if self.helper.safe_get(result, 'email_domain'):954 for email_domain in self.helper.safe_get_result:955 self.helper.guided_pivots_value(email_domain, 'email_domain')956 return True957 def iris_pivot(self, query, query_type):958 if 'iris-investigate' not in self.svc_enabled or self.svc_enabled['iris-investigate'] <= 0:959 if self.plugin.debug:960 self.log.debug("iris-investigate: service disabled or over monthly limit")961 if self.plugin.module['name'] != 'DomainTools-Iris-Pivot':962 return True963 pivot_map = {964 'domain': 'domain',965 'ip-src': 'ip',966 'ip-dst': 'ip',967 'whois-registrant-email': 'email',968 'email-dst': 'email',969 'email-src': 'email',970 'hostname': 'nameserver_host',971 'whois-registrar': 
'registrar',972 'whois-registrant-name': 'registrant',973 'x509-fingerprint-sha1': 'ssl_hash'974 }975 if query_type == 'domain':976 self.iris_pivot_domain(query)977 elif query_type in pivot_map:978 self.iris_pivot_other(query, pivot_map[query_type])979 return True980 def iris_pivot_other(self, query, pivot_type):981 arguments = {pivot_type: query}982 method = self.api.iris_investigate(**arguments)983 results = method.data()984 results = results['response']985 if method._status != 200:986 return987 if results.get('error'):988 self.errors['error'] = results['error']['message']989 return False990 if not self.helper.safe_get(results, 'results'):991 return True992 limit = int(self.plugin.config.get('results_limit'))993 for result in self.helper.safe_get_result[0:limit]:994 self.helper.simple_parse('domain', result, ['domain'], 'Domain from DomainTools Iris',995 ['DomainTools', 'Iris'], 'Domain')996 def iris_pivot_domain(self, query):997 tldex = tldextract.extract(query.replace('\\/', '/'))998 q = '.'.join(tldex[1:])999 if q == '':1000 self.log.debug("q is empty")1001 return True1002 method = self.api.iris_investigate(q)1003 results = method.data()1004 results = results['response']1005 if method._status != 200:1006 return1007 if results.get('error'):1008 self.errors['error'] = results['error']['message']1009 return False1010 if not self.helper.safe_get(results, 'results'):1011 return True1012 result = self.helper.safe_get_result.pop()1013 self.helper.simple_parse('alexa', result, ['text'], 'Alexa from DomainTools',1014 ['DomainTools', 'Iris'], 'Alexa')1015 self.helper.simple_parse('spf_info', result, ['text'], 'SPF Info from DomainTools',1016 ['DomainTools', 'Iris'], 'SPF')1017 self.helper.simple_parse('website_response', result, ['text'], 'Website Response from DomainTools',1018 ['DomainTools', 'Iris'], 'Website Response')1019 self.helper.simple_parse('alexa', result, ['text'], 'Alexa from DomainTools',1020 ['DomainTools', 'Iris'], 'Alexa')1021 if 
self.helper.safe_get(result, 'create_date'):1022 self.helper.iris_add(self.helper.safe_get_result, ['whois-creation-date'], 'Create Date', categories=['Attribution'])1023 if self.helper.safe_get(result, 'expiration_date'):1024 self.helper.iris_add(self.helper.safe_get_result, ['text'], 'Expiration Date')1025 if self.helper.safe_get(result, 'redirect_domain'):1026 self.helper.iris_add(self.helper.safe_get_result, ['domain'], 'Redirect Domain')1027 if self.helper.safe_get(result, 'registrar'):1028 self.helper.iris_add(self.helper.safe_get_result, ['whois-registrar'], 'Registrar'1029 , categories=['Attribution'])1030 if self.helper.safe_get(result, 'adsense'):1031 self.helper.iris_add(self.helper.safe_get_result, ['text'], 'Adsense')1032 if self.helper.safe_get(result, 'technical_contact'):1033 self.helper.iris_address(self.helper.safe_get_result, 'Technical Contact')1034 if self.helper.safe_get(result, 'registrant_contact'):1035 self.helper.iris_address(self.helper.safe_get_result, 'Registrant Contact')1036 if self.helper.safe_get(result, 'admin_contact'):1037 self.helper.iris_address(self.helper.safe_get_result, 'Admin Contact')1038 if self.helper.safe_get(result, 'billing_contact'):1039 self.helper.iris_address(self.helper.safe_get_result, 'Billing Contact')1040 if self.helper.safe_get(result, 'ip'):1041 ips = self.helper.safe_get_result1042 for ip in ips:1043 self.helper.iris_add(ip['address'], ['ip-dst'], 'IP Address')1044 self.helper.iris_add(ip['country_code'], ['text'], 'Country Code')1045 self.helper.iris_add(ip['isp'], ['text'], 'IP ISP')1046 for asn in ip['asn']:1047 self.helper.iris_add(asn, ['text'], 'IP ASN')1048 if self.helper.safe_get(result, 'ssl_info'):1049 for ssl in self.helper.safe_get_result:1050 self.helper.iris_add(ssl['organization'], ['text'], 'SSL Org')1051 self.helper.iris_add(ssl['subject'], ['text'], 'SSL Subject')1052 self.helper.iris_add(ssl['hash'], ['x509-fingerprint-sha1'], 'SSL Hash')1053 for email in ssl['email']:1054 
self.helper.iris_add(email, ['email-dst'], 'SSL Email', categories=['Network activity'])1055 if self.helper.safe_get(result, 'name_server'):1056 for name_server in self.helper.safe_get_result:1057 self.helper.iris_add(name_server['host'], ['hostname'], 'Name Server Host')1058 self.helper.iris_add(name_server['domain'], ['domain'], 'Name Server Domain')1059 for ip in name_server['ip']:1060 self.helper.iris_add(ip, ['ip-src'], 'Name Server IP')1061 if self.helper.safe_get(result, 'mx'):1062 for mx in self.helper.safe_get_result:1063 self.helper.iris_add(mx['host'], ['hostname'], 'Mail Server Host')1064 self.helper.iris_add(mx['domain'], ['domain'], 'Mail Server Domain')1065 for ip in mx['ip']:1066 self.helper.iris_add(ip, ['ip-src'], 'Mail Server IP')1067 if self.helper.safe_get(result, 'soa_email'):1068 for soa in self.helper.safe_get_result:1069 self.helper.iris_add(soa, ['email-dst'], 'SOA email', categories=['Network activity'])1070 if self.helper.safe_get(result, 'additional_whois_email'):1071 for email in self.helper.safe_get_result:1072 self.helper.iris_add(email, ['whois-registrant-email'], 'Whois Email', categories=['Attribution'])1073 if self.helper.safe_get(result, 'domain_risk'):1074 risk = self.helper.safe_get_result1075 self.helper.simple_parse('risk_score', risk, ['text'], 'Risk Score from DomainTools',1076 ['DomainTools', 'Iris'], 'Risk Score')1077 for index, component in enumerate(risk['components']):1078 self.helper.append_unique_payload({'types': ['text'], 'categories': ['External analysis'], 'values': {1079 '{0} Risk Component'.format(component['name']): '{0}'.format(component['risk_score'],1080 component['risk_score'])},1081 'comment': '{0} Risk Component from DomainTools'.format(1082 component['name']),1083 'tags': ['DomainTools', 'Iris']})1084 def iris_import(self, query, query_type):1085 if 'iris-import' not in self.svc_enabled or self.svc_enabled['iris-import'] <= 0:1086 if self.plugin.debug:1087 self.log.debug("iris-import: service disabled 
or over monthly limit")1088 if self.plugin.module['name'] != 'DomainTools-Iris-Import':1089 return True1090 q = base64.b64decode(query)1091 if q == '':1092 self.log.debug("query is empty")1093 return True1094 method = self.api.iris_investigate(search_hash=q)1095 results = method.data()1096 results = results['response']1097 if method._status != 200:1098 return1099 if results.get('error'):1100 self.errors['error'] = results['error']['message']1101 return False1102 if not self.helper.safe_get(results, 'results'):1103 return True1104 limit = int(self.plugin.config.get('results_limit'))1105 for result in self.helper.safe_get_result[0:limit]:1106 self.helper.simple_parse('domain', result, ['domain'], 'Domain from DomainTools Iris',1107 ['DomainTools', 'Iris'], 'Domain')...

Full Screen

Full Screen

result.py

Source:result.py Github

copy

Full Screen

...355 """356 if test.__slash__ is None:357 raise LookupError("Could not find result for {}".format(test))358 return self._results_dict[test.__slash__.id]359 def safe_get_result(self, test):360 if test.__slash__ is None:361 return None362 return self._results_dict.get(test.__slash__.id)363 def __getitem__(self, test):364 if isinstance(test, Number):365 try:366 return next(itertools.islice(self._results_dict.values(), test, test + 1))367 except StopIteration:368 raise IndexError()...

Full Screen

Full Screen

resuming.py

Source:resuming.py Github

copy

Full Screen

...91 test_to_resume.status = ResumeTestStatus.PLANNED92 else:93 test_to_resume.status = ResumeTestStatus.FAILED94 tests_to_resume.append(test_to_resume)95 tests_with_no_results = {test.__slash__ for test in collected_tests if session_result.safe_get_result(test) is None}96 for test_metadata in tests_with_no_results:97 test_to_resume = ResumeState(session_id=session_result.session.id, file_name=test_metadata.file_path,\98 address_in_file=test_metadata.address_in_file, status=ResumeTestStatus.PLANNED)99 test_to_resume.variation = str(test_metadata.variation.id) if test_metadata.variation else None100 tests_to_resume.append(test_to_resume)101 with connecting_to_db() as conn:102 conn.add(session_metadata) # pylint: disable=no-member103 conn.add_all(tests_to_resume) # pylint: disable=no-member104 _logger.debug('Saved resume state to DB')105def get_last_resumeable_session_id():106 current_folder = os.path.abspath(os.getcwd())107 with connecting_to_db() as conn:108 # pylint: disable=no-member109 session_id = conn.query(SessionMetadata).filter(SessionMetadata.src_folder == current_folder) \...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful