How to use the from_statement method in Avocado

Best Python code snippet using avocado_python

test_qradar_events_stix_to_query.py

Source: test_qradar_events_stix_to_query.py (GitHub)

copy

Full Screen

"""Tests for translating STIX patterns into QRadar AQL queries (events dialect).

The expected query strings are assembled from the default field selections,
the ``FROM events`` clause and a per-test ``WHERE`` clause.  The JSON fixtures
below are read once, at import time.
"""
from stix_shifter.stix_translation import stix_translation
from stix_shifter_utils.utils.error_response import ErrorCode
import unittest
import random
import json


def _read_file(path):
    """Return the text content of *path*, closing the file handle afterwards."""
    with open(path) as handle:
        return handle.read()


options_file = _read_file('stix_shifter_modules/qradar/tests/stix_translation/qradar_stix_to_aql/options.json')
selections_file = _read_file('stix_shifter_modules/qradar/stix_translation/json/aql_events_fields.json')
protocols_file = _read_file('stix_shifter_modules/qradar/stix_translation/json/network_protocol_map.json')
OPTIONS = json.loads(options_file)
DEFAULT_SELECTIONS = json.loads(selections_file)
DEFAULT_LIMIT = 10000
DEFAULT_time_range = 5
PROTOCOLS = json.loads(protocols_file)
MAPPING_ERROR = "Unable to map the following STIX objects and properties to data source fields:"

# Building blocks of the expected AQL query strings.
selections = "SELECT {}".format(", ".join(DEFAULT_SELECTIONS['default']))
custom_selections = "SELECT {}".format(", ".join(OPTIONS['mapping']['aql_events_fields']['default']))
from_statement = " FROM events "
default_limit = "limit {}".format(DEFAULT_LIMIT)
default_time = "last {} minutes".format(DEFAULT_time_range)

translation = stix_translation.StixTranslation()


def _test_query_assertions(query, selections, from_statement, where_statement):
    """Assert that *query* holds exactly one query equal to the three joined clauses."""
    assert query['queries'] == [selections + from_statement + where_statement]


def _translate_query(stix_pattern):
    """Translate *stix_pattern* with the ``qradar:events`` dialect and empty options."""
    return translation.translate('qradar:events', 'query', '{}', stix_pattern)


class TestQueryTranslator(unittest.TestCase, object):
    """STIX-pattern to AQL translation tests for QRadar events."""

    def test_ipv4_query(self):
        stix_pattern = "[ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '192.168.122.84/10']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (INCIDR('192.168.122.84/10',sourceip) OR INCIDR('192.168.122.84/10',destinationip) OR INCIDR('192.168.122.84/10',identityip)) OR (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') {} {}".format(
            default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_ipv6_query(self):
        stix_pattern = "[ipv6-addr:value = '3001:0:0:0:0:0:0:2']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (sourceip = '3001:0:0:0:0:0:0:2' OR destinationip = '3001:0:0:0:0:0:0:2') {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_url_query(self):
        stix_pattern = "[url:value = 'http://www.testaddress.com']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE url = 'http://www.testaddress.com' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_NOT_and_not_equals_operators(self):
        search_string1 = "www.example.com"
        search_string2 = "www.example.ca"
        stix_pattern = "[url:value != '{}' OR url:value NOT = '{}']".format(search_string1, search_string2)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE NOT (url = '{1}') OR url != '{0}' {2} {3}".format(
            search_string1, search_string2, default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_mac_address_query(self):
        stix_pattern = "[mac-addr:value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00') {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_query_from_multiple_observation_expressions_joined_by_AND(self):
        stix_pattern = "[url:value = 'www.example.com'] AND [mac-addr:value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern)
        # Expect the STIX AND to convert to an AQL OR.
        where_statement = "WHERE (url = 'www.example.com') OR ((sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00')) {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_query_from_multiple_comparison_expressions_joined_by_AND(self):
        stix_pattern = "[(url:value = 'www.example.com' OR url:value = 'www.test.com') AND mac-addr:value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern)
        # Expect the STIX AND to convert to an AQL AND.
        where_statement = "WHERE (sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00') AND (url = 'www.test.com' OR url = 'www.example.com') {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_file_query(self):
        # TODO: Add support for file hashes. Unsure at this point how QRadar queries them
        stix_pattern = "[file:name = 'some_file.exe']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE filename = 'some_file.exe' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_port_queries(self):
        stix_pattern = "[network-traffic:src_port = 12345 OR network-traffic:dst_port = 23456]"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE destinationport = '23456' OR sourceport = '12345' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_unmapped_attribute_with_AND(self):
        stix_pattern = "[unmapped-object:some_invalid_attribute = 'whatever' AND file:name = 'some_file.exe']"
        result = translation.translate('qradar', 'query', '{}', stix_pattern)
        assert result['success'] == False
        assert ErrorCode.TRANSLATION_MAPPING_ERROR.value == result['code']
        assert MAPPING_ERROR in result['error']

    def test_pattern_with_two_observation_exp_with_one_unmapped_attribute(self):
        stix_pattern = "[unmapped-object:some_invalid_attribute = 'whatever'] AND [file:name = 'some_file.exe']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE filename = 'some_file.exe' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_pattern_with_one_observation_exp_with_one_unmapped_attribute(self):
        stix_pattern = "[network-traffic:some_invalid_attribute = 'whatever']"
        result = translation.translate('qradar', 'query', '{}', stix_pattern)
        assert result['success'] == False
        assert ErrorCode.TRANSLATION_MAPPING_ERROR.value == result['code']
        assert MAPPING_ERROR in result['error']

    def test_unmapped_attribute_with_OR(self):
        stix_pattern = "[network-traffic:some_invalid_attribute = 'whatever' OR file:name = 'some_file.exe']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE filename = 'some_file.exe' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_pattern_with_two_observation_exps_one_with_unmapped_attribute(self):
        stix_pattern = "[network-traffic:some_invalid_attribute = 'whatever'] OR [file:name = 'some_file.exe' AND url:value = 'www.example.com']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE url = 'www.example.com' AND filename = 'some_file.exe' {} {}".format(default_limit, default_time)
        assert query['queries'] == [selections + from_statement + where_statement]

    def test_pattern_with_three_observation_exps_one_with_unmapped_attribute(self):
        stix_pattern = "[file:name = 'some_file.exe' AND network-traffic:some_invalid_attribute = 'whatever'] OR [url:value = 'www.example.com'] AND [mac-addr:value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (url = 'www.example.com') OR ((sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00')) {} {}".format(default_limit, default_time)
        assert query['queries'] == [selections + from_statement + where_statement]

    def test_user_account_query(self):
        stix_pattern = "[user-account:user_id = 'root']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE username = 'root' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_invalid_stix_pattern(self):
        stix_pattern = "[not_a_valid_pattern]"
        result = translation.translate('qradar', 'query', '{}', stix_pattern, {'validate_pattern': 'true'})
        assert result['success'] == False
        assert ErrorCode.TRANSLATION_STIX_VALIDATION.value == result['code']
        assert stix_pattern[1:-1] in result['error']

    def test_network_traffic_protocols(self):
        for key, value in PROTOCOLS.items():
            # Test for both upper and lower case protocols in the STIX pattern
            if random.randint(0, 1) == 0:
                key = key.upper()
            stix_pattern = "[network-traffic:protocols[*] = '" + key + "']"
            query = _translate_query(stix_pattern)
            where_statement = "WHERE protocolid = '{}' {} {}".format(value, default_limit, default_time)
            _test_query_assertions(query, selections, from_statement, where_statement)

    def test_network_traffic_start_stop(self):
        stix_pattern = "[network-traffic:'start' = '2018-06-14T08:36:24.000Z' OR network-traffic:end = '2018-06-14T08:36:24.567Z']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE endtime = '1528965384567' OR starttime = '1528965384000' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_start_stop_qualifiers_with_one_observation_with_an_unmapped_attribute(self):
        start_time_01 = "t'2016-06-01T01:30:00.123Z'"
        stop_time_01 = "t'2016-06-01T02:20:00.123Z'"
        unix_start_time_01 = 1464744600123
        unix_stop_time_01 = 1464747600123
        stix_pattern = "[network-traffic:src_port = 37020 AND user-account:user_id = 'root' OR network-traffic:some_invalid_attribute = 'whatever'] START {} STOP {}".format(start_time_01, stop_time_01)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE username = 'root' AND sourceport = '37020' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)
        assert len(query['queries']) == 1
        assert query['queries'] == [selections + from_statement + where_statement]

    def test_start_stop_qualifiers_with_two_observations(self):
        start_time_01 = "t'2016-06-01T01:30:00.123Z'"
        stop_time_01 = "t'2016-06-01T02:20:00.123Z'"
        start_time_02 = "t'2016-06-01T03:55:00.123Z'"
        stop_time_02 = "t'2016-06-01T04:30:24.743Z'"
        unix_start_time_01 = 1464744600123
        unix_stop_time_01 = 1464747600123
        unix_start_time_02 = 1464753300123
        unix_stop_time_02 = 1464755424743
        stix_pattern = "[network-traffic:src_port = 37020 AND user-account:user_id = 'root'] START {} STOP {} OR [ipv4-addr:value = '192.168.122.83'] START {} STOP {}".format(start_time_01, stop_time_01, start_time_02, stop_time_02)
        query = _translate_query(stix_pattern)
        where_statement_01 = "WHERE username = 'root' AND sourceport = '37020' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)
        where_statement_02 = "WHERE (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') {} START {} STOP {}".format(default_limit, unix_start_time_02, unix_stop_time_02)
        assert len(query['queries']) == 2
        assert query['queries'] == [selections + from_statement + where_statement_01, selections + from_statement + where_statement_02]

    # BROKEN, not returning query without qualifier
    def test_start_stop_qualifiers_with_three_observations_and_an_unmapped_attribute(self):
        start_time_01 = "t'2016-06-01T00:00:00.123Z'"
        stop_time_01 = "t'2016-06-01T01:11:11.456Z'"
        start_time_02 = "t'2016-06-07T02:22:22.789Z'"
        stop_time_02 = "t'2016-06-07T03:33:33.012Z'"
        unix_start_time_01 = 1464739200123
        unix_stop_time_01 = 1464743471456
        unix_start_time_02 = 1465266142789
        unix_stop_time_02 = 1465270413012
        stix_pattern = "[network-traffic:src_port = 37020 AND network-traffic:dst_port = 635] START {} STOP {} OR [url:value = 'www.example.com'] OR [ipv4-addr:value = '333.333.333.0' OR network-traffic:some_invalid_attribute = 'whatever'] START {} STOP {}".format(
            start_time_01, stop_time_01, start_time_02, stop_time_02)
        query = _translate_query(stix_pattern)
        where_statement_01 = "WHERE destinationport = '635' AND sourceport = '37020' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)
        where_statement_02 = "WHERE (sourceip = '333.333.333.0' OR destinationip = '333.333.333.0' OR identityip = '333.333.333.0') {} START {} STOP {}".format(default_limit, unix_start_time_02, unix_stop_time_02)
        where_statement_03 = "WHERE url = 'www.example.com' {} {}".format(default_limit, default_time)
        assert len(query['queries']) == 3
        assert query['queries'] == [selections + from_statement + where_statement_01, selections + from_statement + where_statement_02, selections + from_statement + where_statement_03]

    def test_start_stop_qualifiers_with_missing_or_partial_milliseconds(self):
        # missing milliseconds
        start_time_01 = "t'2016-06-01T01:30:00Z'"
        stop_time_01 = "t'2016-06-01T02:20:00Z'"
        # one-digit millisecond
        start_time_02 = "t'2016-06-01T03:55:00.1Z'"
        # four-digit millisecond
        stop_time_02 = "t'2016-06-01T04:30:24.1243Z'"
        unix_start_time_01 = 1464744600000
        unix_stop_time_01 = 1464747600000
        unix_start_time_02 = 1464753300100
        unix_stop_time_02 = 1464755424124
        stix_pattern = "[user-account:user_id = 'root'] START {} STOP {} OR [ipv4-addr:value = '192.168.122.83'] START {} STOP {}".format(start_time_01, stop_time_01, start_time_02, stop_time_02)
        query = _translate_query(stix_pattern)
        where_statement_01 = "WHERE username = 'root' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)
        where_statement_02 = "WHERE (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') {} START {} STOP {}".format(default_limit, unix_start_time_02, unix_stop_time_02)
        assert len(query['queries']) == 2
        assert query['queries'] == [selections + from_statement + where_statement_01, selections + from_statement + where_statement_02]

    def test_issubset_operators(self):
        stix_pattern = "[ipv4-addr:value ISSUBSET '198.51.100.0/24']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (INCIDR('198.51.100.0/24',sourceip) OR INCIDR('198.51.100.0/24',destinationip) OR INCIDR('198.51.100.0/24',identityip)) {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_custom_time_limit_and_result_count_and_mappings(self):
        stix_pattern = "[ipv4-addr:value = '192.168.122.83']"
        result_limit = OPTIONS['result_limit']
        time_range = OPTIONS['time_range']
        query = translation.translate('qradar:events', 'query', '{}', stix_pattern, OPTIONS)
        where_statement = "WHERE (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') limit {} last {} minutes".format(result_limit, time_range)
        assert query == {'queries': [custom_selections + from_statement + where_statement]}

    def test_domainname_query(self):
        stix_pattern = "[domain-name:value = 'example.com']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (domainname LIKE '%example.com%' OR UrlHost LIKE '%example.com%') {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_generic_filehash_query(self):
        stix_pattern = "[file:hashes.'SHA-256' = 'sha256hash']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE sha256hash = 'sha256hash' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    # def test_sha256_filehash_query(self):
    #     stix_pattern = "[file:hashes.'SHA-256' = 'sha256hash']"
    #     query = translation.translate('qradar', 'query', '{}', stix_pattern, OPTIONS)
    #     where_statement = "WHERE (sha256hash = 'sha256hash' OR filehash = 'sha256hash') limit {} last {} minutes".format(OPTIONS['result_limit'], OPTIONS['time_range'])
    #     assert query == {'queries': [custom_selections + from_statement + where_statement]}
    # def test_multi_filehash_query(self):
    #     stix_pattern = "[file:hashes.'SHA-256' = 'sha256hash'] OR [file:hashes.'MD5' = 'md5hash']"
    #     query = translation.translate('qradar', 'query', '{}', stix_pattern, OPTIONS)
    #     where_statement = "WHERE ((sha256hash = 'sha256hash' OR filehash = 'sha256hash')) OR ((md5hash = 'md5hash' OR filehash = 'md5hash')) limit {} last {} minutes".format(OPTIONS['result_limit'], OPTIONS['time_range'])
    #     assert query == {'queries': [custom_selections + from_statement + where_statement]}

    def test_source_and_destination_references(self):
        # Indexed as where_statements[reference index][datum index] in the loops below.
        where_statements = [
            [
                "WHERE sourceip = '192.0.2.0' {} {}".format(default_limit, default_time),
                "WHERE sourcemac = '00-00-5E-00-53-00' {} {}".format(default_limit, default_time),
                "WHERE INCIDR('192.0.2.0/25',sourceip) {} {}".format(default_limit, default_time),
                "WHERE sourceip = '3001:0:0:0:0:0:0:2' {} {}".format(default_limit, default_time)
            ],
            [
                "WHERE destinationip = '192.0.2.0' {} {}".format(default_limit, default_time),
                "WHERE destinationmac = '00-00-5E-00-53-00' {} {}".format(default_limit, default_time),
                "WHERE INCIDR('192.0.2.0/25',destinationip) {} {}".format(default_limit, default_time),
                "WHERE destinationip = '3001:0:0:0:0:0:0:2' {} {}".format(default_limit, default_time)
            ]
        ]
        for ref_index, reference in enumerate(["network-traffic:src_ref.value", "network-traffic:dst_ref.value"]):
            for dat_index, datum in enumerate(["'192.0.2.0'", "'00-00-5E-00-53-00'", "'192.0.2.0/25'", "'3001:0:0:0:0:0:0:2'"]):
                stix_pattern = "[{} = {}]".format(reference, datum)
                query = _translate_query(stix_pattern)
                where_statement = where_statements[ref_index][dat_index]
                _test_query_assertions(query, selections, from_statement, where_statement)

    def test_nested_parenthesis_in_pattern(self):
        stix_pattern = "[(ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '100.100.122.90') AND network-traffic:src_port = 37020] OR [user-account:user_id = 'root'] AND [url:value = 'www.example.com']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (sourceport = '37020' AND ((sourceip = '100.100.122.90' OR destinationip = '100.100.122.90' OR identityip = '100.100.122.90') OR (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83'))) OR ((username = 'root') OR (url = 'www.example.com')) {} {}".format(default_limit, default_time)
        assert query['queries'] == [selections + from_statement + where_statement]

    def test_complex_multiple_comparison_expression(self):
        url_1 = "example01.ru"
        url_2 = "example02.ru"
        url_3 = "example01.com"
        url_4 = "example03.ru"
        url_5 = "example02.com"
        url_6 = "example04.ru"
        stix_pattern = "[url:value = '{0}' OR url:value = '{1}' OR url:value = '{2}' OR url:value = '{3}' OR url:value = '{4}' OR url:value = '{5}'] START t'2019-06-24T19:05:43.000Z' STOP t'2019-06-25T19:05:43.000Z'".format(url_1, url_2, url_3, url_4, url_5, url_6)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE url = '{5}' OR (url = '{4}' OR (url = '{3}' OR (url = '{2}' OR (url = '{1}' OR url = '{0}')))) {6} START 1561403143000 STOP 1561489543000".format(url_1, url_2, url_3, url_4, url_5, url_6, default_limit)
        assert query['queries'] == [selections + from_statement + where_statement]

    def test_LIKE_operator(self):
        search_string = 'example.com'
        stix_pattern = "[url:value LIKE '{}']".format(search_string)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE url LIKE '%{0}%' {1} {2}".format(search_string, default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_payload_string_matching_with_LIKE(self):
        search_string = 'search term'
        stix_pattern = "[artifact:payload_bin LIKE '{}']".format(search_string)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE TEXT SEARCH '{}' {} {}".format(search_string, default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_payload_string_matching_with_MATCH(self):
        search_string = '^.*https://wally.fireeye.com.*$'
        stix_pattern = "[artifact:payload_bin MATCHES '{}']".format(search_string)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE eventpayload MATCHES '{}' {} {}".format(search_string, default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_backslash_escaping(self):
        # Stix pattern requires backslash to be double escaped to pass pattern validation.
        # Not sure yet how we will make this work for an AQL query.
        # See https://github.com/oasis-open/cti-stix2-json-schemas/issues/51
        # Each '\\\\.' below is two literal backslashes followed by a dot; this is the
        # same value the former '\\\.' spelling produced, without the invalid '\.' escape.
        search_string = '^.*http://graphics8\\\\.nytimes\\\\.com/bcvideo.*$'
        stix_pattern = "[artifact:payload_bin MATCHES '{}']".format(search_string)
        query = _translate_query(stix_pattern)
        translated_value = '^.*http://graphics8\\.nytimes\\.com/bcvideo.*$'
        where_statement = "WHERE eventpayload MATCHES '{}' {} {}".format(translated_value, default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_filepath_queries(self):
        first_path = 'C:/first/file/path'
        second_path = 'D:/second/file/path'
        stix_pattern = "[(file:parent_directory_ref = '{}' OR directory:path = '{}')]".format(first_path, second_path)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE filepath = '{}' OR filepath = '{}' {} {}".format(second_path, first_path, default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_risk_finding(self):
        stix_pattern = "[x-ibm-finding:name = '*']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE devicetype = 18 {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_rule_name_query(self):
        rule_name = 'Context is Local to Remote'
        stix_pattern = "[x-ibm-finding:rule_names[*] = '{}']".format(rule_name)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE rulenames = '{}' {} {}".format(rule_name, default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_text_search(self):
        stix_pattern = "[artifact:payload_bin LIKE '%Set-ItemProperty%' AND artifact:payload_bin LIKE '%New-Item%']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE TEXT SEARCH '%New-Item% AND %Set-ItemProperty%' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_combined_observation_expression_with_qualifier(self):
        stix_pattern = "([ipv4-addr:value = '192.168.1.2'] OR [url:value LIKE '%.example.com']) START t'2020-09-11T13:00:52.000Z' STOP t'2020-09-11T13:59:04.000Z'"
        query = _translate_query(stix_pattern)
        where_statement_01 = "WHERE (sourceip = '192.168.1.2' OR destinationip = '192.168.1.2' OR identityip = '192.168.1.2') limit 10000 START 1599829252000 STOP 1599832744000"
        where_statement_02 = "WHERE url LIKE '%%.example.com%' limit 10000 START 1599829252000 STOP 1599832744000"
        assert len(query['queries']) == 2
        assert query['queries'][0] == selections + from_statement + where_statement_01
        assert query['queries'][1] == selections + from_statement + where_statement_02

    def test_registry_search(self):
        stix_pattern = "[windows-registry-key:values[*].name = 'abcd']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE RegistryValueName = 'abcd' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)
        stix_pattern = "[windows-registry-key:key = 'efgh']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (ObjectName = 'efgh' OR RegistryKey = 'efgh') {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_x_ibm_host_search(self):
        stix_pattern = "[x-oca-asset:hostname = 'abcd']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE identityhostname = 'abcd' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)
        stix_pattern = "[x-oca-asset:ip_refs[*].value = '9.9.9.9']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (identityip = '9.9.9.9' OR sourceip = '9.9.9.9') {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)
        stix_pattern = "[x-oca-asset:mac_refs[*].value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE sourcemac = '00-00-5E-00-53-00' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_x_ibm_event_search(self):
        stix_pattern = "[x-oca-event:action = 'abcd']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE qidname = 'abcd' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)
        stix_pattern = "[x-oca-event:code = 1]"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE EventID = '1' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)
        stix_pattern = "[x-oca-event:process_ref.command_line = 'abc']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE ProcessCommandLine = 'abc' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_in_operators(self):
        stix_pattern = "[network-traffic:dst_port IN ('22','443')]"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE destinationport IN ('22', '443') {} {}".format(default_limit, default_time)
        # ... (the listing is truncated at this point on the source page)

Full Screen

Full Screen

test_class.py

Source: test_class.py (GitHub)

copy

Full Screen

'''
STIX to CSQL query adaptor test cases
'''
from stix_shifter.stix_translation import stix_translation
from stix_shifter_utils.utils.error_response import ErrorCode
import unittest
import json
import random


def _read_file(path):
    '''Return the text content of *path*, closing the file handle afterwards.'''
    with open(path) as handle:
        return handle.read()


# Expected SELECT clause for the 'nf' dialect tests.
selections = "SELECT Network.A as sourceip, Transport.A as sourceport, \
Link.A as sourcemac, Network.B as destinationip, Transport.B as destinationport, \
Link.B as destinationmac, Transport.Protocol as protocol, Start as starttime, \
Last as endtime"
# Expected SELECT clause for the 'at' dialect tests.
at_selections = "SELECT initiator.id as initiator_id, initiator.name as initiator_name, \
initiator.credential.type as initiator_credential_type, \
initiator.host.address as initiator_host_address, ALCH_ACCOUNT_ID as \
alch_account_id, ALCH_TENANT_ID as alch_tenant_id, eventTime as eventTime, \
action as action, target.id as target_id, target.name as target_name, \
target.host.address as target_host_address, event_uuid as event_uuid, \
observer.host.address as observer_host_addres, observer.name as \
observer_name, observer.host.address as observer_host_address, api.name \
as api_name"
at_from_statement = " FROM cos://us-geo/at-hourly-dumps STORED AS JSON "
from_statement = " FROM cos://us-geo/nf-hourly-dumps STORED AS JSON "
num_rows = 1024
protocols_file = _read_file('stix_shifter_modules/csa/stix_translation/json/network_protocol_map.json')
PROTOCOLS = json.loads(protocols_file)
translation = stix_translation.StixTranslation()


def _translate_query(stix_pattern, dialect):
    '''Translate *stix_pattern* with the ``csa:<dialect>`` module and empty options.'''
    return translation.translate("csa:{}".format(dialect), 'query', '{}', stix_pattern)


def _test_query_assertions(query, selections, from_statement, where_statement):
    '''Assert that *query* holds exactly the joined clauses plus the PARTITIONED suffix.'''
    assert query['queries'] == [selections + from_statement + where_statement + ' PARTITIONED EVERY {num_rows} ROWS'.format(num_rows=num_rows)]


class TestStixToSql(unittest.TestCase, object):
    '''STIX-pattern translation tests for the csa 'nf' and 'at' dialects.'''

    def test_ipv4_query(self):
        dialect = 'nf'
        stix_pattern = "[ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '192.168.122.84']"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE (Network.A = '192.168.122.84' OR Network.B = '192.168.122.84') OR (Network.A = '192.168.122.83' OR Network.B = '192.168.122.83')"
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_ipv4_in_query(self):
        dialect = 'nf'
        stix_pattern = "[ipv4-addr:value IN ('192.168.122.83', '192.168.122.84')]"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE (Network.A IN (192.168.122.83 OR 192.168.122.84) OR Network.B IN (192.168.122.83 OR 192.168.122.84))"
        assert query['queries'] == [selections + from_statement + where_statement + ' PARTITIONED EVERY {num_rows} ROWS'.format(num_rows=num_rows)]

    def test_ipv6_query(self):
        dialect = 'nf'
        stix_pattern = "[ipv6-addr:value = '192.168.122.83']"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE (Network.A = '192.168.122.83' OR Network.B = '192.168.122.83')"
        _test_query_assertions(query, selections, from_statement, where_statement)

    # Non-mappable now throws an error. Skydive doesn't have url
    # def test_url_query(self):
    #     dialect = 'nf'
    #     stix_pattern = "[url:value = 'http://www.testaddress.com']"
    #     query = _translate_query(stix_pattern, dialect)
    #     where_statement = "WHERE url = 'http://www.testaddress.com'"
    #     _test_query_assertions(query, selections, from_statement, where_statement)

    def test_mac_address_query(self):
        dialect = 'nf'
        stix_pattern = "[mac-addr:value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE (Link.A = '00-00-5E-00-53-00' OR Link.B = '00-00-5E-00-53-00')"
        _test_query_assertions(query, selections, from_statement, where_statement)

    # def test_domain_query(self):
    #     dialect = 'nf'
    #     stix_pattern = "[domain-name:value = 'example.com']"
    #     query = _translate_query(stix_pattern, dialect)
    #     where_statement = "WHERE domainname = 'example.com'"
    #     _test_query_assertions(query, selections, from_statement, where_statement)

    def test_query_from_multiple_observation_expressions_joined_by_and(self):
        dialect = 'nf'
        stix_pattern = "[domain-name:value = 'example.com'] AND [mac-addr:value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern, dialect)
        # Expect the STIX AND between observations to become an OR in the generated query.
        where_statement = "WHERE domainname = 'example.com' OR (Link.A = '00-00-5E-00-53-00' OR Link.B = '00-00-5E-00-53-00')"
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_query_from_multiple_comparison_expressions_joined_by_and(self):
        dialect = 'nf'
        stix_pattern = "[domain-name:value = 'example.com' AND mac-addr:value = '00-00-5E-00-53-00']"
        query = _translate_query(stix_pattern, dialect)
        # Expect the STIX AND between comparisons to stay an AND in the generated query.
        where_statement = "WHERE (Link.A = '00-00-5E-00-53-00' OR Link.B = '00-00-5E-00-53-00') AND domainname = 'example.com'"
        _test_query_assertions(query, selections, from_statement, where_statement)

    # def test_file_query(self):
    #     # TODO: Add support for file hashes. Unsure at this point how QRadar queries them
    #     dialect = 'nf'
    #     stix_pattern = "[file:name = 'some_file.exe']"
    #     query = _translate_query(stix_pattern, dialect)
    #     where_statement = "WHERE filename = 'some_file.exe'"
    #     _test_query_assertions(query, selections, from_statement, where_statement)

    def test_port_queries(self):
        dialect = 'nf'
        stix_pattern = "[network-traffic:src_port = 12345 OR network-traffic:dst_port = 23456]"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE Transport.B = '23456' OR Transport.A = '12345'"
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_unmapped_attribute_with_AND(self):
        stix_pattern = "[unmapped-object:some_invalid_attribute = 'whatever' AND file:name = 'some_file.exe']"
        result = translation.translate('csa:nf', 'query', '{}', stix_pattern)
        assert result['success'] == False
        assert ErrorCode.TRANSLATION_MAPPING_ERROR.value == result['code']
        assert 'Unable to map the following STIX objects and properties to data source fields' in result['error']

    def test_unmapped_attribute_with_OR(self):
        dialect = 'nf'
        stix_pattern = "[ipv4-addr:value = '1.2.3.4' OR unmapped-object:some_invalid_attribute = 'whatever']"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE (Network.A = '1.2.3.4' OR Network.B = '1.2.3.4')"
        _test_query_assertions(query, selections, from_statement, where_statement)
        # assert(False)

    def test_user_account_query(self):
        dialect = 'at'
        stix_pattern = "[user-account:user_id = 'root']"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE (initiator.id = 'root' OR target.id = 'root' OR observer.id = 'root')"
        _test_query_assertions(query, at_selections, at_from_statement, where_statement)

    def test_invalid_stix_pattern(self):
        stix_pattern = "[not_a_valid_pattern]"
        result = translation.translate('csa', 'query', '{}', stix_pattern, {'validate_pattern': 'true'})
        assert False == result['success']
        assert ErrorCode.TRANSLATION_STIX_VALIDATION.value == result['code']
        assert stix_pattern[1:-1] in result['error']

    def test_network_traffic_protocols(self):
        dialect = 'nf'
        for key, value in PROTOCOLS.items():
            # Test for both upper AND lower case protocols in the STIX stix_pattern
            if random.randint(0, 1) == 0:
                key = key.upper()
            stix_pattern = "[network-traffic:protocols[*] = '" + key + "']"
            query = _translate_query(stix_pattern, dialect)
            where_statement = "WHERE Transport.Protocol = '" + value + "'"
            _test_query_assertions(query, selections, from_statement, where_statement)

    def test_network_traffic_start_stop(self):
        dialect = 'nf'
        stix_pattern = "[network-traffic:'start' = '2018-06-14T08:36:24.000Z' OR network-traffic:end = '2018-06-14T08:36:24.000Z']"
        query = _translate_query(stix_pattern, dialect)
        where_statement = "WHERE Last = '1528965384' OR Start = '1528965384'"
        _test_query_assertions(query, selections, from_statement, where_statement)

    # def test_artifact_queries(self):
    #     dialect = 'nf'
    #     stix_pattern = "[artifact:payload_bin MATCHES 'some text']"
    #     query = _translate_query(stix_pattern, dialect)
    #     where_statement = "WHERE payload MATCHES '.*some text.*'"
    #     _test_query_assertions(query, selections, from_statement, where_statement)

# Sample from SkyDive
# {
#     "Start": 1531867319.982,
#     "Metric": {
#         "BAPackets": 15,
#         "BABytes": 6766,
#         "ABBytes": 1604,
#         "ABPackets": 10
#     },
#     "Last": 1531867320.174,
#     "Network": {
#         "A": "172.30.106.116",
#         "A_Name": "k8s_node",
#         "B": "75.126.81.67",
#         "Protocol": "IPV4"
#     },
#     "Transport": {
#         "A": "43748",
#         "B": "443",
#         "Protocol": "TCP"
#     }...

Full Screen

Full Screen

test_qradar_flows_stix_to_query.py

Source:test_qradar_flows_stix_to_query.py Github

copy

Full Screen

"""STIX-pattern to AQL translation tests for the QRadar ``flows`` dialect."""
import json
import random
import unittest

from stix_shifter.stix_translation import stix_translation


def _read_text(path):
    """Return the whole content of the text file at *path*.

    The file is opened in a ``with`` block so the handle is closed as soon as
    the content has been read, instead of being left for the garbage collector.
    """
    with open(path) as handle:
        return handle.read()


options_file = _read_text('stix_shifter_modules/qradar/tests/stix_translation/qradar_stix_to_aql/options.json')
selections_file = _read_text('stix_shifter_modules/qradar/stix_translation/json/aql_flows_fields.json')
protocols_file = _read_text('stix_shifter_modules/qradar/stix_translation/json/network_protocol_map.json')

OPTIONS = json.loads(options_file)
DEFAULT_SELECTIONS = json.loads(selections_file)
DEFAULT_LIMIT = 10000
DEFAULT_TIMERANGE = 5
PROTOCOLS = json.loads(protocols_file)
MAPPING_ERROR = "Unable to map the following STIX objects and properties to data source fields:"

# Fixed pieces of every expected query: SELECT clause, FROM clause and the
# default "limit ... last ... minutes" suffix.
selections = "SELECT {}".format(", ".join(DEFAULT_SELECTIONS['default']))
from_statement = " FROM flows "
default_limit = "limit {}".format(DEFAULT_LIMIT)
default_time = "last {} minutes".format(DEFAULT_TIMERANGE)

translation = stix_translation.StixTranslation()


def _test_query_assertions(query, selections, from_statement, where_statement):
    """Assert that *query* holds exactly one query: the three given parts concatenated."""
    assert query['queries'] == [selections + from_statement + where_statement]


def _translate_query(stix_pattern):
    """Translate *stix_pattern* for the ``qradar:flows`` dialect and return the result."""
    return translation.translate('qradar:flows', 'query', '{}', stix_pattern)


class TestStixToAql(unittest.TestCase):
    """Checks the query string produced for a range of STIX patterns."""

    def test_ipv4_query(self):
        """A plain address uses equality; a CIDR value uses INCIDR, on source and destination."""
        stix_pattern = "[ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '192.168.122.84/10']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (INCIDR('192.168.122.84/10',sourceip) OR INCIDR('192.168.122.84/10',destinationip)) OR (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83') {} {}".format(
            default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_ipv6_query(self):
        """An IPv6 address is matched against sourcev6 and destinationv6."""
        stix_pattern = "[ipv6-addr:value = '3001:0:0:0:0:0:0:2']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (sourcev6 = '3001:0:0:0:0:0:0:2' OR destinationv6 = '3001:0:0:0:0:0:0:2') {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_network_traffic_ports_query(self):
        """src_port / dst_port map to sourceport / destinationport."""
        stix_pattern = "[network-traffic:src_port = 123 OR network-traffic:dst_port = 456]"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE destinationport = '456' OR sourceport = '123' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_network_traffic_protocols(self):
        """Every entry of PROTOCOLS translates to a protocolid comparison."""
        for key, value in PROTOCOLS.items():
            # Test for both upper and lower case protocols in the STIX pattern
            if random.randint(0, 1) == 0:
                key = key.upper()
            stix_pattern = "[network-traffic:protocols[*] = '" + key + "']"
            query = _translate_query(stix_pattern)
            where_statement = "WHERE protocolid = '{}' {} {}".format(value, default_limit, default_time)
            _test_query_assertions(query, selections, from_statement, where_statement)

    def test_network_traffic_start_end(self):
        """start / end timestamps become epoch-millisecond starttime / endtime comparisons."""
        stix_pattern = "[network-traffic:start = '2018-06-14T08:36:24.000Z' AND network-traffic:end = '2018-06-14T08:36:24.567Z']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE endtime = '1528965384567' AND starttime = '1528965384000' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_network_traffic_src_dst_ipv4_ref(self):
        """src_ref / dst_ref IPv4 values map to sourceip / destinationip (INCIDR for a CIDR)."""
        stix_pattern = "[network-traffic:src_ref.value = '192.168.122.83' OR network-traffic:dst_ref.value = '192.168.122.84/10']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE INCIDR('192.168.122.84/10',destinationip) OR sourceip = '192.168.122.83' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_network_traffic_src_dst_ipv6_ref(self):
        """src_ref / dst_ref IPv6 values map to sourcev6 / destinationv6 (INCIDR for a CIDR)."""
        stix_pattern = "[network-traffic:src_ref.value = '3001:0:0:0:0:0:0:2' OR network-traffic:dst_ref.value = '3001:2:4:7:3:0:0:1/32']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE INCIDR('3001:2:4:7:3:0:0:1/32',destinationv6) OR sourcev6 = '3001:0:0:0:0:0:0:2' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_src_dst_byte_and_packet(self):
        """Byte/packet counters are translated; a START/STOP qualifier replaces the default time."""
        start_time_01 = "t'2016-06-01T01:30:00.123Z'"
        stop_time_01 = "t'2016-06-01T02:20:00.123Z'"
        # Epoch-millisecond equivalents of the two qualifier timestamps above.
        unix_start_time_01 = 1464744600123
        unix_stop_time_01 = 1464747600123
        stix_pattern = "[(network-traffic:src_byte_count = 306 AND network-traffic:dst_byte_count = 604) OR (network-traffic:src_packets = 2898 AND network-traffic:dst_packets = 1)] START {} STOP {}".format(start_time_01, stop_time_01)
        query = _translate_query(stix_pattern)
        where_statement = "WHERE (destinationpackets = '1' AND sourcepackets = '2898') OR (destinationbytes = '604' AND sourcebytes = '306') {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_text_search(self):
        """Two LIKE comparisons on payload_bin collapse into a single TEXT SEARCH clause."""
        stix_pattern = "[artifact:payload_bin LIKE '%Set-ItemProperty%' AND artifact:payload_bin LIKE '%New-Item%']"
        query = _translate_query(stix_pattern)
        where_statement = "WHERE TEXT SEARCH '%New-Item% AND %Set-ItemProperty%' {} {}".format(default_limit, default_time)
        _test_query_assertions(query, selections, from_statement, where_statement)

    def test_combined_observation_expression_with_qualifier(self):
        """Two OR-ed observations under one qualifier yield two queries sharing the time window."""
        stix_pattern = "([ipv4-addr:value = '192.168.1.2'] OR [network-traffic:src_port = 8080]) START t'2020-09-11T13:00:52.000Z' STOP t'2020-09-11T13:59:04.000Z'"
        query = _translate_query(stix_pattern)
        where_statement_01 = "WHERE (sourceip = '192.168.1.2' OR destinationip = '192.168.1.2') limit 10000 START 1599829252000 STOP 1599832744000"
        where_statement_02 = "WHERE sourceport = '8080' limit 10000 START 1599829252000 STOP 1599832744000"
        assert len(query['queries']) == 2
        assert query['queries'][0] == selections + from_statement + where_statement_01
        # NOTE(review): the source listing is truncated here ("..."). The rest of
        # this test, presumably a check of query['queries'][1] against
        # where_statement_02, is not shown and has not been reconstructed.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful