Best Python code snippet using avocado_python
test_qradar_events_stix_to_query.py
Source:test_qradar_events_stix_to_query.py  
1from stix_shifter.stix_translation import stix_translation2from stix_shifter_utils.utils.error_response import ErrorCode3import unittest4import random5import json6options_file = open('stix_shifter_modules/qradar/tests/stix_translation/qradar_stix_to_aql/options.json').read()7selections_file = open('stix_shifter_modules/qradar/stix_translation/json/aql_events_fields.json').read()8protocols_file = open('stix_shifter_modules/qradar/stix_translation/json/network_protocol_map.json').read()9OPTIONS = json.loads(options_file)10DEFAULT_SELECTIONS = json.loads(selections_file)11DEFAULT_LIMIT = 1000012DEFAULT_time_range = 513PROTOCOLS = json.loads(protocols_file)14MAPPING_ERROR = "Unable to map the following STIX objects and properties to data source fields:"15selections = "SELECT {}".format(", ".join(DEFAULT_SELECTIONS['default']))16custom_selections = "SELECT {}".format(", ".join(OPTIONS['mapping']['aql_events_fields']['default']))17from_statement = " FROM events "18default_limit = "limit {}".format(DEFAULT_LIMIT)19default_time = "last {} minutes".format(DEFAULT_time_range)20translation = stix_translation.StixTranslation()21def _test_query_assertions(query, selections, from_statement, where_statement):22    assert query['queries'] == [selections + from_statement + where_statement]23def _translate_query(stix_pattern):24    return translation.translate('qradar:events', 'query', '{}', stix_pattern)25class TestQueryTranslator(unittest.TestCase, object):26    def test_ipv4_query(self):27        stix_pattern = "[ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '192.168.122.84/10']"28        query = _translate_query(stix_pattern)29        where_statement = "WHERE (INCIDR('192.168.122.84/10',sourceip) OR INCIDR('192.168.122.84/10',destinationip) OR INCIDR('192.168.122.84/10',identityip)) OR (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') {} {}".format(30            default_limit, default_time)31        _test_query_assertions(query, selections, from_statement, where_statement)32    def test_ipv6_query(self):33        stix_pattern = "[ipv6-addr:value = '3001:0:0:0:0:0:0:2']"34        query = _translate_query(stix_pattern)35        where_statement = "WHERE (sourceip = '3001:0:0:0:0:0:0:2' OR destinationip = '3001:0:0:0:0:0:0:2') {} {}".format(default_limit, default_time)36        _test_query_assertions(query, selections, from_statement, where_statement)37    def test_url_query(self):38        stix_pattern = "[url:value = 'http://www.testaddress.com']"39        query = _translate_query(stix_pattern)40        where_statement = "WHERE url = 'http://www.testaddress.com' {} {}".format(default_limit, default_time)41        _test_query_assertions(query, selections, from_statement, where_statement)42    def test_NOT_and_not_equals_operators(self):43        search_string1 = "www.example.com"44        search_string2 = "www.example.ca"45        stix_pattern = "[url:value != '{}' OR url:value NOT = '{}']".format(search_string1, search_string2)46        query = _translate_query(stix_pattern)47        where_statement = "WHERE NOT (url = '{1}') OR url != '{0}' {2} {3}".format(48            search_string1, search_string2, default_limit, default_time)49        _test_query_assertions(query, selections, from_statement, where_statement)50    def test_mac_address_query(self):51        stix_pattern = "[mac-addr:value = '00-00-5E-00-53-00']"52        query = _translate_query(stix_pattern)53        where_statement = "WHERE (sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00') {} {}".format(default_limit, default_time)54        _test_query_assertions(query, selections, from_statement, where_statement)55    def test_query_from_multiple_observation_expressions_joined_by_AND(self):56        stix_pattern = "[url:value = 'www.example.com'] AND [mac-addr:value = '00-00-5E-00-53-00']"57        query = _translate_query(stix_pattern)58        # Expect the STIX and to convert to an AQL OR.59        where_statement = "WHERE (url = 'www.example.com') OR ((sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00')) {} {}".format(default_limit, default_time)60        _test_query_assertions(query, selections, from_statement, where_statement)61    def test_query_from_multiple_comparison_expressions_joined_by_AND(self):62        stix_pattern = "[(url:value = 'www.example.com' OR url:value = 'www.test.com') AND mac-addr:value = '00-00-5E-00-53-00']"63        query = _translate_query(stix_pattern)64        # Expect the STIX and to convert to an AQL AND.65        where_statement = "WHERE (sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00') AND (url = 'www.test.com' OR url = 'www.example.com') {} {}".format(default_limit, default_time)66        _test_query_assertions(query, selections, from_statement, where_statement)67    def test_file_query(self):68        # TODO: Add support for file hashes. Unsure at this point how QRadar queries them69        stix_pattern = "[file:name = 'some_file.exe']"70        query = _translate_query(stix_pattern)71        where_statement = "WHERE filename = 'some_file.exe' {} {}".format(default_limit, default_time)72        _test_query_assertions(query, selections, from_statement, where_statement)73    def test_port_queries(self):74        stix_pattern = "[network-traffic:src_port = 12345 OR network-traffic:dst_port = 23456]"75        query = _translate_query(stix_pattern)76        where_statement = "WHERE destinationport = '23456' OR sourceport = '12345' {} {}".format(default_limit, default_time)77        _test_query_assertions(query, selections, from_statement, where_statement)78    def test_unmapped_attribute_with_AND(self):79        stix_pattern = "[unmapped-object:some_invalid_attribute = 'whatever' AND file:name = 'some_file.exe']"80        result = translation.translate('qradar', 'query', '{}', stix_pattern)81        assert result['success'] == False82        assert ErrorCode.TRANSLATION_MAPPING_ERROR.value == result['code']83        assert MAPPING_ERROR in result['error']84    def test_pattern_with_two_observation_exp_with_one_unmapped_attribute(self):85        stix_pattern = "[unmapped-object:some_invalid_attribute = 'whatever'] AND [file:name = 'some_file.exe']"86        query = _translate_query(stix_pattern)87        where_statement = "WHERE filename = 'some_file.exe' {} {}".format(default_limit, default_time)88        _test_query_assertions(query, selections, from_statement, where_statement)89    def test_pattern_with_one_observation_exp_with_one_unmapped_attribute(self):90        stix_pattern = "[network-traffic:some_invalid_attribute = 'whatever']"91        result = translation.translate('qradar', 'query', '{}', stix_pattern)92        assert result['success'] == False93        assert ErrorCode.TRANSLATION_MAPPING_ERROR.value == result['code']94        assert MAPPING_ERROR in result['error']95    def test_unmapped_attribute_with_OR(self):96        stix_pattern = "[network-traffic:some_invalid_attribute = 'whatever' OR file:name = 'some_file.exe']"97        query = _translate_query(stix_pattern)98        where_statement = "WHERE filename = 'some_file.exe' {} {}".format(default_limit, default_time)99        _test_query_assertions(query, selections, from_statement, where_statement)100    def test_pattern_with_two_observation_exps_one_with_unmapped_attribute(self):101        stix_pattern = "[network-traffic:some_invalid_attribute = 'whatever'] OR [file:name = 'some_file.exe' AND url:value = 'www.example.com']"102        query = _translate_query(stix_pattern)103        where_statement = "WHERE url = 'www.example.com' AND filename = 'some_file.exe' {} {}".format(default_limit, default_time)104        assert query['queries'] == [selections + from_statement + where_statement]105    def test_pattern_with_three_observation_exps_one_with_unmapped_attribute(self):106        stix_pattern = "[file:name = 'some_file.exe' AND network-traffic:some_invalid_attribute = 'whatever'] OR [url:value = 'www.example.com'] AND [mac-addr:value = '00-00-5E-00-53-00']"107        query = _translate_query(stix_pattern)108        where_statement = "WHERE (url = 'www.example.com') OR ((sourcemac = '00-00-5E-00-53-00' OR destinationmac = '00-00-5E-00-53-00')) {} {}".format(default_limit, default_time)109        assert query['queries'] == [selections + from_statement + where_statement]110    def test_user_account_query(self):111        stix_pattern = "[user-account:user_id = 'root']"112        query = _translate_query(stix_pattern)113        where_statement = "WHERE username = 'root' {} {}".format(default_limit, default_time)114        _test_query_assertions(query, selections, from_statement, where_statement)115    def test_invalid_stix_pattern(self):116        stix_pattern = "[not_a_valid_pattern]"117        result = translation.translate('qradar', 'query', '{}', stix_pattern, {'validate_pattern': 'true'})118        assert result['success'] == False119        assert ErrorCode.TRANSLATION_STIX_VALIDATION.value == result['code']120        assert stix_pattern[1:-1] in result['error']121    def test_network_traffic_protocols(self):122        for key, value in PROTOCOLS.items():123            # Test for both upper and lower case protocols in the STIX pattern124            if random.randint(0, 1) == 0:125                key = key.upper()126            stix_pattern = "[network-traffic:protocols[*] = '" + key + "']"127            query = _translate_query(stix_pattern)128        where_statement = "WHERE protocolid = '{}' {} {}".format(value, default_limit, default_time)129        _test_query_assertions(query, selections, from_statement, where_statement)130    def test_network_traffic_start_stop(self):131        stix_pattern = "[network-traffic:'start' = '2018-06-14T08:36:24.000Z' OR network-traffic:end = '2018-06-14T08:36:24.567Z']"132        query = _translate_query(stix_pattern)133        where_statement = "WHERE endtime = '1528965384567' OR starttime = '1528965384000' {} {}".format(default_limit, default_time)134        _test_query_assertions(query, selections, from_statement, where_statement)135    def test_start_stop_qualifiers_with_one_observation_with_an_unmapped_attribute(self):136        start_time_01 = "t'2016-06-01T01:30:00.123Z'"137        stop_time_01 = "t'2016-06-01T02:20:00.123Z'"138        unix_start_time_01 = 1464744600123139        unix_stop_time_01 = 1464747600123140        stix_pattern = "[network-traffic:src_port = 37020 AND user-account:user_id = 'root' OR network-traffic:some_invalid_attribute = 'whatever'] START {} STOP {}".format(start_time_01, stop_time_01)141        query = _translate_query(stix_pattern)142        where_statement = "WHERE username = 'root' AND sourceport = '37020' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)143        assert len(query['queries']) == 1144        assert query['queries'] == [selections + from_statement + where_statement]145    def test_start_stop_qualifiers_with_two_observations(self):146        start_time_01 = "t'2016-06-01T01:30:00.123Z'"147        stop_time_01 = "t'2016-06-01T02:20:00.123Z'"148        start_time_02 = "t'2016-06-01T03:55:00.123Z'"149        stop_time_02 = "t'2016-06-01T04:30:24.743Z'"150        unix_start_time_01 = 1464744600123151        unix_stop_time_01 = 1464747600123152        unix_start_time_02 = 1464753300123153        unix_stop_time_02 = 1464755424743154        stix_pattern = "[network-traffic:src_port = 37020 AND user-account:user_id = 'root'] START {} STOP {} OR [ipv4-addr:value = '192.168.122.83'] START {} STOP {}".format(start_time_01, stop_time_01, start_time_02, stop_time_02)155        query = _translate_query(stix_pattern)156        where_statement_01 = "WHERE username = 'root' AND sourceport = '37020' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)157        where_statement_02 = "WHERE (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') {} START {} STOP {}".format(default_limit, unix_start_time_02, unix_stop_time_02)158        assert len(query['queries']) == 2159        assert query['queries'] == [selections + from_statement + where_statement_01, selections + from_statement + where_statement_02]160    # BROKEN, not returning query without qualifier161    def test_start_stop_qualifiers_with_three_observations_and_an_unmapped_attribute(self):162        start_time_01 = "t'2016-06-01T00:00:00.123Z'"163        stop_time_01 = "t'2016-06-01T01:11:11.456Z'"164        start_time_02 = "t'2016-06-07T02:22:22.789Z'"165        stop_time_02 = "t'2016-06-07T03:33:33.012Z'"166        unix_start_time_01 = 1464739200123167        unix_stop_time_01 = 1464743471456168        unix_start_time_02 = 1465266142789169        unix_stop_time_02 = 1465270413012170        stix_pattern = "[network-traffic:src_port = 37020 AND network-traffic:dst_port = 635] START {} STOP {} OR [url:value = 'www.example.com'] OR [ipv4-addr:value = '333.333.333.0' OR network-traffic:some_invalid_attribute = 'whatever'] START {} STOP {}".format(171            start_time_01, stop_time_01, start_time_02, stop_time_02)172        query = _translate_query(stix_pattern)173        where_statement_01 = "WHERE destinationport = '635' AND sourceport = '37020' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)174        where_statement_02 = "WHERE (sourceip = '333.333.333.0' OR destinationip = '333.333.333.0' OR identityip = '333.333.333.0') {} START {} STOP {}".format(default_limit, unix_start_time_02, unix_stop_time_02)175        where_statement_03 = "WHERE url = 'www.example.com' {} {}".format(default_limit, default_time)176        assert len(query['queries']) == 3177        assert query['queries'] == [selections + from_statement + where_statement_01, selections + from_statement + where_statement_02, selections + from_statement + where_statement_03]178    def test_start_stop_qualifiers_with_missing_or_partial_milliseconds(self):179        # missing milliseconds180        start_time_01 = "t'2016-06-01T01:30:00Z'"181        stop_time_01 = "t'2016-06-01T02:20:00Z'"182        # one-digit millisecond183        start_time_02 = "t'2016-06-01T03:55:00.1Z'"184        # four-digit millisecond185        stop_time_02 = "t'2016-06-01T04:30:24.1243Z'"186        unix_start_time_01 = 1464744600000187        unix_stop_time_01 = 1464747600000188        unix_start_time_02 = 1464753300100189        unix_stop_time_02 = 1464755424124190        stix_pattern = "[user-account:user_id = 'root'] START {} STOP {} OR [ipv4-addr:value = '192.168.122.83'] START {} STOP {}".format(start_time_01, stop_time_01, start_time_02, stop_time_02)191        query = _translate_query(stix_pattern)192        where_statement_01 = "WHERE username = 'root' {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)193        where_statement_02 = "WHERE (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') {} START {} STOP {}".format(default_limit, unix_start_time_02, unix_stop_time_02)194        assert len(query['queries']) == 2195        assert query['queries'] == [selections + from_statement + where_statement_01, selections + from_statement + where_statement_02]196    def test_issubset_operators(self):197        stix_pattern = "[ipv4-addr:value ISSUBSET '198.51.100.0/24']"198        query = _translate_query(stix_pattern)199        where_statement = "WHERE (INCIDR('198.51.100.0/24',sourceip) OR INCIDR('198.51.100.0/24',destinationip) OR INCIDR('198.51.100.0/24',identityip)) {} {}".format(default_limit, default_time)200        _test_query_assertions(query, selections, from_statement, where_statement)201    def test_custom_time_limit_and_result_count_and_mappings(self):202        stix_pattern = "[ipv4-addr:value = '192.168.122.83']"203        result_limit = OPTIONS['result_limit']204        time_range = OPTIONS['time_range']205        query = translation.translate('qradar:events', 'query', '{}', stix_pattern, OPTIONS)206        where_statement = "WHERE (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83') limit {} last {} minutes".format(result_limit, time_range)207        assert query == {'queries': [custom_selections + from_statement + where_statement]}208    def test_domainname_query(self):209        stix_pattern = "[domain-name:value = 'example.com']"210        query = _translate_query(stix_pattern)211        where_statement = "WHERE (domainname LIKE '%example.com%' OR UrlHost LIKE '%example.com%') {} {}".format(default_limit, default_time)212        _test_query_assertions(query, selections, from_statement, where_statement)213    def test_generic_filehash_query(self):214        stix_pattern = "[file:hashes.'SHA-256' = 'sha256hash']"215        query = _translate_query(stix_pattern)216        where_statement = "WHERE sha256hash = 'sha256hash' {} {}".format(default_limit, default_time)217        _test_query_assertions(query, selections, from_statement, where_statement)218    # def test_sha256_filehash_query(self):219    #     stix_pattern = "[file:hashes.'SHA-256' = 'sha256hash']"220    #     query = translation.translate('qradar', 'query', '{}', stix_pattern, OPTIONS)221    #     where_statement = "WHERE (sha256hash = 'sha256hash' OR filehash = 'sha256hash') limit {} last {} minutes".format(OPTIONS['result_limit'], OPTIONS['time_range'])222    #     assert query == {'queries': [custom_selections + from_statement + where_statement]}223    # def test_multi_filehash_query(self):224    #     stix_pattern = "[file:hashes.'SHA-256' = 'sha256hash'] OR [file:hashes.'MD5' = 'md5hash']"225    #     query = translation.translate('qradar', 'query', '{}', stix_pattern, OPTIONS)226    #     where_statement = "WHERE ((sha256hash = 'sha256hash' OR filehash = 'sha256hash')) OR ((md5hash = 'md5hash' OR filehash = 'md5hash')) limit {} last {} minutes".format(OPTIONS['result_limit'], OPTIONS['time_range'])227    #     assert query == {'queries': [custom_selections + from_statement + where_statement]}228    def test_source_and_destination_references(self):229        where_statements = [230            [231                "WHERE sourceip = '192.0.2.0' {} {}".format(default_limit, default_time),232                "WHERE sourcemac = '00-00-5E-00-53-00' {} {}".format(default_limit, default_time),233                "WHERE INCIDR('192.0.2.0/25',sourceip) {} {}".format(default_limit, default_time),234                "WHERE sourceip = '3001:0:0:0:0:0:0:2' {} {}".format(default_limit, default_time)235            ],236            [237                "WHERE destinationip = '192.0.2.0' {} {}".format(default_limit, default_time),238                "WHERE destinationmac = '00-00-5E-00-53-00' {} {}".format(default_limit, default_time),239                "WHERE INCIDR('192.0.2.0/25',destinationip) {} {}".format(default_limit, default_time),240                "WHERE destinationip = '3001:0:0:0:0:0:0:2' {} {}".format(default_limit, default_time)241            ]242        ]243        for ref_index, reference in enumerate(["network-traffic:src_ref.value", "network-traffic:dst_ref.value"]):244            for dat_index, datum in enumerate(["'192.0.2.0'", "'00-00-5E-00-53-00'", "'192.0.2.0/25'", "'3001:0:0:0:0:0:0:2'"]):245                stix_pattern = "[{} = {}]".format(reference, datum)246                query = _translate_query(stix_pattern)247                where_statement = where_statements[ref_index][dat_index]248                _test_query_assertions(query, selections, from_statement, where_statement)249    def test_nested_parenthesis_in_pattern(self):250        stix_pattern = "[(ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '100.100.122.90') AND network-traffic:src_port = 37020] OR [user-account:user_id = 'root'] AND [url:value = 'www.example.com']"251        query = _translate_query(stix_pattern)252        where_statement = "WHERE (sourceport = '37020' AND ((sourceip = '100.100.122.90' OR destinationip = '100.100.122.90' OR identityip = '100.100.122.90') OR (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83' OR identityip = '192.168.122.83'))) OR ((username = 'root') OR (url = 'www.example.com')) {} {}".format(default_limit, default_time)253        assert query['queries'] == [selections + from_statement + where_statement]254    def test_complex_multiple_comparison_expression(self):255        url_1 = "example01.ru"256        url_2 = "example02.ru"257        url_3 = "example01.com"258        url_4 = "example03.ru"259        url_5 = "example02.com"260        url_6 = "example04.ru"261        stix_pattern = "[url:value = '{0}' OR url:value = '{1}' OR url:value = '{2}' OR url:value = '{3}' OR url:value = '{4}' OR url:value = '{5}'] START t'2019-06-24T19:05:43.000Z' STOP t'2019-06-25T19:05:43.000Z'".format(url_1, url_2, url_3, url_4, url_5, url_6)262        query = _translate_query(stix_pattern)263        where_statement = "WHERE url = '{5}' OR (url = '{4}' OR (url = '{3}' OR (url = '{2}' OR (url = '{1}' OR url = '{0}')))) {6} START 1561403143000 STOP 1561489543000".format(url_1, url_2, url_3, url_4, url_5, url_6, default_limit)264        assert query['queries'] == [selections + from_statement + where_statement]265    def test_LIKE_operator(self):266        search_string = 'example.com'267        stix_pattern = "[url:value LIKE '{}']".format(search_string)268        query = _translate_query(stix_pattern)269        where_statement = "WHERE url LIKE '%{0}%' {1} {2}".format(search_string, default_limit, default_time)270        _test_query_assertions(query, selections, from_statement, where_statement)271    def test_payload_string_matching_with_LIKE(self):272        search_string = 'search term'273        stix_pattern = "[artifact:payload_bin LIKE '{}']".format(search_string)274        query = _translate_query(stix_pattern)275        where_statement = "WHERE TEXT SEARCH '{}' {} {}".format(search_string, default_limit, default_time)276        _test_query_assertions(query, selections, from_statement, where_statement)277    def test_payload_string_matching_with_MATCH(self):278        search_string = '^.*https://wally.fireeye.com.*$'279        stix_pattern = "[artifact:payload_bin MATCHES '{}']".format(search_string)280        query = _translate_query(stix_pattern)281        where_statement = "WHERE eventpayload MATCHES '{}' {} {}".format(search_string, default_limit, default_time)282        _test_query_assertions(query, selections, from_statement, where_statement)283    def test_backslash_escaping(self):284        # Stix pattern requires backslash to be double escaped to pass pattern validation.285        # Not sure yet how we will make this work for an AQL query.286        # See https://github.com/oasis-open/cti-stix2-json-schemas/issues/51287        search_string = '^.*http://graphics8\\\.nytimes\\\.com/bcvideo.*$'288        stix_pattern = "[artifact:payload_bin MATCHES '{}']".format(search_string)289        query = _translate_query(stix_pattern)290        translated_value = '^.*http://graphics8\\.nytimes\\.com/bcvideo.*$'291        where_statement = "WHERE eventpayload MATCHES '{}' {} {}".format(translated_value, default_limit, default_time)292        _test_query_assertions(query, selections, from_statement, where_statement)293    def test_filepath_queries(self):294        first_path = 'C:/first/file/path'295        second_path = 'D:/second/file/path'296        stix_pattern = "[(file:parent_directory_ref = '{}' OR directory:path = '{}')]".format(first_path, second_path)297        query = _translate_query(stix_pattern)298        where_statement = "WHERE filepath = '{}' OR filepath = '{}' {} {}".format(second_path, first_path, default_limit, default_time)299        _test_query_assertions(query, selections, from_statement, where_statement)300    def test_risk_finding(self):301        stix_pattern="[x-ibm-finding:name = '*']"302        query = _translate_query(stix_pattern)303        where_statement = "WHERE devicetype = 18 {} {}".format(default_limit, default_time)304        _test_query_assertions(query, selections, from_statement, where_statement)305    def test_rule_name_query(self):306        rule_name = 'Context is Local to Remote'307        stix_pattern="[x-ibm-finding:rule_names[*] = '{}']".format(rule_name)308        query = _translate_query(stix_pattern)309        where_statement = "WHERE rulenames = '{}' {} {}".format(rule_name, default_limit, default_time)310        _test_query_assertions(query, selections, from_statement, where_statement)311        312    def test_text_search(self):313        stix_pattern = "[artifact:payload_bin LIKE '%Set-ItemProperty%' AND artifact:payload_bin LIKE '%New-Item%']"314        query = _translate_query(stix_pattern)315        where_statement = "WHERE TEXT SEARCH '%New-Item% AND %Set-ItemProperty%' {} {}".format(default_limit, default_time)316        _test_query_assertions(query, selections, from_statement, where_statement)317    318    def test_combined_observation_expression_with_qualifier(self):319        stix_pattern = "([ipv4-addr:value = '192.168.1.2'] OR [url:value LIKE '%.example.com']) START t'2020-09-11T13:00:52.000Z' STOP t'2020-09-11T13:59:04.000Z'"320        query = _translate_query(stix_pattern)321        where_statement_01 = "WHERE (sourceip = '192.168.1.2' OR destinationip = '192.168.1.2' OR identityip = '192.168.1.2') limit 10000 START 1599829252000 STOP 1599832744000"322        where_statement_02 = "WHERE url LIKE '%%.example.com%' limit 10000 START 1599829252000 STOP 1599832744000"323        assert len(query['queries']) == 2324        assert query['queries'][0] == selections + from_statement + where_statement_01325        assert query['queries'][1] == selections + from_statement + where_statement_02326    def test_registry_search(self):327        stix_pattern = "[windows-registry-key:values[*].name = 'abcd']"328        query = _translate_query(stix_pattern)329        where_statement = "WHERE RegistryValueName = 'abcd' {} {}".format(default_limit, default_time)330        _test_query_assertions(query, selections, from_statement, where_statement)331        stix_pattern = "[windows-registry-key:key = 'efgh']"332        query = _translate_query(stix_pattern)333        where_statement = "WHERE (ObjectName = 'efgh' OR RegistryKey = 'efgh') {} {}".format(default_limit, default_time)334        _test_query_assertions(query, selections, from_statement, where_statement)335    def test_x_ibm_host_search(self):336        stix_pattern = "[x-oca-asset:hostname = 'abcd']"337        query = _translate_query(stix_pattern)338        where_statement = "WHERE identityhostname = 'abcd' {} {}".format(default_limit, default_time)339        _test_query_assertions(query, selections, from_statement, where_statement)340        stix_pattern = "[x-oca-asset:ip_refs[*].value = '9.9.9.9']"341        query = _translate_query(stix_pattern)342        where_statement = "WHERE (identityip = '9.9.9.9' OR sourceip = '9.9.9.9') {} {}".format(default_limit, default_time)343        _test_query_assertions(query, selections, from_statement, where_statement)344        stix_pattern = "[x-oca-asset:mac_refs[*].value = '00-00-5E-00-53-00']"345        query = _translate_query(stix_pattern)346        where_statement = "WHERE sourcemac = '00-00-5E-00-53-00' {} {}".format(default_limit, default_time)347        _test_query_assertions(query, selections, from_statement, where_statement)348    def test_x_ibm_event_search(self):349        stix_pattern = "[x-oca-event:action = 'abcd']"350        query = _translate_query(stix_pattern)351        where_statement = "WHERE qidname = 'abcd' {} {}".format(default_limit, default_time)352        _test_query_assertions(query, selections, from_statement, where_statement)353        stix_pattern = "[x-oca-event:code = 1]"354        query = _translate_query(stix_pattern)355        where_statement = "WHERE EventID = '1' {} {}".format(default_limit, default_time)356        _test_query_assertions(query, selections, from_statement, where_statement)357        stix_pattern = "[x-oca-event:process_ref.command_line = 'abc']"358        query = _translate_query(stix_pattern)359        where_statement = "WHERE ProcessCommandLine = 'abc' {} {}".format(default_limit, default_time)360        _test_query_assertions(query, selections, from_statement, where_statement)361    def test_in_operators(self):362        stix_pattern = "[network-traffic:dst_port IN ('22','443')]"363        query = _translate_query(stix_pattern)364        where_statement = "WHERE destinationport IN ('22', '443') {} {}".format(default_limit, default_time)...test_class.py
Source:test_class.py  
1'''2STIX to CSQL query adaptor test cases3'''4from stix_shifter.stix_translation import stix_translation5from stix_shifter_utils.utils.error_response import ErrorCode6import unittest7import json8import random9selections = "SELECT Network.A as sourceip, Transport.A as sourceport, \10Link.A as sourcemac, Network.B as destinationip, Transport.B as destinationport, \11Link.B as destinationmac, Transport.Protocol as protocol, Start as starttime, \12Last as endtime"13at_selections = "SELECT initiator.id as initiator_id, initiator.name as initiator_name, \14initiator.credential.type as initiator_credential_type, \15initiator.host.address as initiator_host_address, ALCH_ACCOUNT_ID as \16alch_account_id, ALCH_TENANT_ID as alch_tenant_id, eventTime as eventTime, \17action as action, target.id as target_id, target.name as target_name, \18target.host.address as target_host_address, event_uuid as event_uuid, \19observer.host.address as observer_host_addres, observer.name as \20observer_name, observer.host.address as observer_host_address, api.name \21as api_name"22at_from_statement = " FROM cos://us-geo/at-hourly-dumps STORED AS JSON "23from_statement = " FROM cos://us-geo/nf-hourly-dumps STORED AS JSON "24num_rows = 102425protocols_file = open('stix_shifter_modules/csa/stix_translation/json/network_protocol_map.json').read()26PROTOCOLS = json.loads(protocols_file)27translation = stix_translation.StixTranslation()28def _translate_query(stix_pattern, dialect):29    return translation.translate("csa:{}".format(dialect), 'query', '{}', stix_pattern)30def _test_query_assertions(query, selections, from_statement, where_statement):31    assert query['queries'] == [selections + from_statement + where_statement + ' PARTITIONED EVERY {num_rows} ROWS'.format(num_rows=num_rows)]32class TestStixToSql(unittest.TestCase, object):33    def test_ipv4_query(self):34        dialect = 'nf'35        stix_pattern = "[ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '192.168.122.84']"36        query = _translate_query(stix_pattern, dialect)37        where_statement = "WHERE (Network.A = '192.168.122.84' OR Network.B = '192.168.122.84') OR (Network.A = '192.168.122.83' OR Network.B = '192.168.122.83')"38        _test_query_assertions(query, selections, from_statement, where_statement)39    def test_ipv4_in_query(self):40        dialect = 'nf'41        stix_pattern = "[ipv4-addr:value IN ('192.168.122.83', '192.168.122.84')]"42        query = _translate_query(stix_pattern, dialect)43        where_statement = "WHERE (Network.A IN (192.168.122.83 OR 192.168.122.84) OR Network.B IN (192.168.122.83 OR 192.168.122.84))"44        assert query['queries'] == [selections + from_statement + where_statement + ' PARTITIONED EVERY {num_rows} ROWS'.format(num_rows=num_rows)]45    def test_ipv6_query(self):46        dialect = 'nf'47        stix_pattern = "[ipv6-addr:value = '192.168.122.83']"48        query = _translate_query(stix_pattern, dialect)49        where_statement = "WHERE (Network.A = '192.168.122.83' OR Network.B = '192.168.122.83')"50        _test_query_assertions(query, selections, from_statement, where_statement)51    # Non-mappable now throws an error. Skydive doesn't have url52    # def test_url_query(self):53    #     dialect = 'nf'54    #     stix_pattern = "[url:value = 'http://www.testaddress.com']"55    #     query = _translate_query(stix_pattern, dialect)56    #     where_statement = "WHERE url = 'http://www.testaddress.com'"57    #     _test_query_assertions(query, selections, from_statement, where_statement)58    def test_mac_address_query(self):59        dialect = 'nf'60        stix_pattern = "[mac-addr:value = '00-00-5E-00-53-00']"61        query = _translate_query(stix_pattern, dialect)62        where_statement = "WHERE (Link.A = '00-00-5E-00-53-00' OR Link.B = '00-00-5E-00-53-00')"63        _test_query_assertions(query, selections, from_statement, where_statement)64    # def test_domain_query(self):65    #     dialect = 'nf'66    #     stix_pattern = "[domain-name:value = 'example.com']"67    #     query = _translate_query(stix_pattern, dialect)68    #     where_statement = "WHERE domainname = 'example.com'"69    #     _test_query_assertions(query, selections, from_statement, where_statement)70    def test_query_from_multiple_observation_expressions_joined_by_and(self):71        dialect = 'nf'72        stix_pattern = "[domain-name:value = 'example.com'] AND [mac-addr:value = '00-00-5E-00-53-00']"73        query = _translate_query(stix_pattern, dialect)74        # Expect the STIX AND to convert to an AQL OR.75        where_statement = "WHERE domainname = 'example.com' OR (Link.A = '00-00-5E-00-53-00' OR Link.B = '00-00-5E-00-53-00')"76        _test_query_assertions(query, selections, from_statement, where_statement)77    def test_query_from_multiple_comparison_expressions_joined_by_and(self):78        dialect = 'nf'79        stix_pattern = "[domain-name:value = 'example.com' AND mac-addr:value = '00-00-5E-00-53-00']"80        query = _translate_query(stix_pattern, dialect)81        # Expect the STIX AND to convert to an AQL AND.82        where_statement = "WHERE (Link.A = '00-00-5E-00-53-00' OR Link.B = '00-00-5E-00-53-00') AND domainname = 'example.com'"83        _test_query_assertions(query, selections, from_statement, where_statement)84    # def test_file_query(self):85    #     # TODO: Add support for file hashes. Unsure at this point how QRadar queries them86    #     dialect = 'nf'87    #     stix_pattern = "[file:name = 'some_file.exe']"88    #     query = _translate_query(stix_pattern, dialect)89    #     where_statement = "WHERE filename = 'some_file.exe'"90    #     _test_query_assertions(query, selections, from_statement, where_statement)91    def test_port_queries(self):92        dialect = 'nf'93        stix_pattern = "[network-traffic:src_port = 12345 OR network-traffic:dst_port = 23456]"94        query = _translate_query(stix_pattern, dialect)95        where_statement = "WHERE Transport.B = '23456' OR Transport.A = '12345'"96        _test_query_assertions(query, selections, from_statement, where_statement)97    def test_unmapped_attribute_with_AND(self):98        stix_pattern = "[unmapped-object:some_invalid_attribute = 'whatever' AND file:name = 'some_file.exe']"99        result = translation.translate('csa:nf', 'query', '{}', stix_pattern)100        assert result['success'] == False101        assert ErrorCode.TRANSLATION_MAPPING_ERROR.value == result['code']102        assert 'Unable to map the following STIX objects and properties to data source fields' in result['error']103    def test_unmapped_attribute_with_OR(self):104        dialect = 'nf'105        stix_pattern = "[ipv4-addr:value = '1.2.3.4' OR unmapped-object:some_invalid_attribute = 'whatever']"106        query = _translate_query(stix_pattern, dialect)107        where_statement = "WHERE (Network.A = '1.2.3.4' OR Network.B = '1.2.3.4')"108        _test_query_assertions(query, selections, from_statement, where_statement)109        # assert(False)110    def test_user_account_query(self):111        dialect = 'at'112        stix_pattern = "[user-account:user_id = 'root']"113        query = _translate_query(stix_pattern, dialect)114        where_statement = "WHERE (initiator.id = 'root' OR target.id = 'root' OR observer.id = 'root')"115        _test_query_assertions(query, at_selections, at_from_statement, where_statement)116    def test_invalid_stix_pattern(self):117        dialect = 'nf'118        stix_pattern = "[not_a_valid_pattern]"119        result = translation.translate('csa', 'query', '{}', stix_pattern, {'validate_pattern': 'true'})120        assert False == result['success']121        assert ErrorCode.TRANSLATION_STIX_VALIDATION.value == result['code']122        assert stix_pattern[1:-1] in result['error']123    def test_network_traffic_protocols(self):124        dialect = 'nf'125        for key, value in PROTOCOLS.items():126            # Test for both upper AND lower case protocols in the STIX stix_pattern127            if random.randint(0, 1) == 0:128                key = key.upper()129            stix_pattern = "[network-traffic:protocols[*] = '" + key + "']"130            query = _translate_query(stix_pattern, dialect)131        where_statement = "WHERE Transport.Protocol = '" + value + "'"132        _test_query_assertions(query, selections, from_statement, where_statement)133    def test_network_traffic_start_stop(self):134        dialect = 'nf'135        stix_pattern = "[network-traffic:'start' = '2018-06-14T08:36:24.000Z' OR network-traffic:end = '2018-06-14T08:36:24.000Z']"136        query = _translate_query(stix_pattern, dialect)137        where_statement = "WHERE Last = '1528965384' OR Start = '1528965384'"138        _test_query_assertions(query, selections, from_statement, where_statement)139    # def test_artifact_queries(self):140    #     dialect = 'nf'141    #     stix_pattern = "[artifact:payload_bin MATCHES 'some text']"142    #     query = _translate_query(stix_pattern, dialect)143    #     where_statement = "WHERE payload MATCHES '.*some text.*'"144    #     _test_query_assertions(query, selections, from_statement, where_statement)145# Sample from SkyDive146#             {147#   "Start": 1531867319.982,148#   "Metric": {149#     "BAPackets": 15,150#     "BABytes": 6766,151#     "ABBytes": 1604,152#     "ABPackets": 10153#   },154#   "Last": 1531867320.174,155#   "Network": {156#     "A": "172.30.106.116",157#     "A_Name": "k8s_node",158#     "B": "75.126.81.67",159#     "Protocol": "IPV4"160#   },161#   "Transport": {162#     "A": "43748",163#     "B": "443",164#     "Protocol": "TCP"165#   }...test_qradar_flows_stix_to_query.py
Source:test_qradar_flows_stix_to_query.py  
1from stix_shifter.stix_translation import stix_translation2import unittest3import random4import json5options_file = open('stix_shifter_modules/qradar/tests/stix_translation/qradar_stix_to_aql/options.json').read()6selections_file = open('stix_shifter_modules/qradar/stix_translation/json/aql_flows_fields.json').read()7protocols_file = open('stix_shifter_modules/qradar/stix_translation/json/network_protocol_map.json').read()8OPTIONS = json.loads(options_file)9DEFAULT_SELECTIONS = json.loads(selections_file)10DEFAULT_LIMIT = 1000011DEFAULT_TIMERANGE = 512PROTOCOLS = json.loads(protocols_file)13MAPPING_ERROR = "Unable to map the following STIX objects and properties to data source fields:"14selections = "SELECT {}".format(", ".join(DEFAULT_SELECTIONS['default']))15from_statement = " FROM flows "16default_limit = "limit {}".format(DEFAULT_LIMIT)17default_time = "last {} minutes".format(DEFAULT_TIMERANGE)18translation = stix_translation.StixTranslation()19def _test_query_assertions(query, selections, from_statement, where_statement):20    assert query['queries'] == [selections + from_statement + where_statement]21def _translate_query(stix_pattern):22    return translation.translate('qradar:flows', 'query', '{}', stix_pattern)23class TestStixToAql(unittest.TestCase, object):24    def test_ipv4_query(self):25        stix_pattern = "[ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '192.168.122.84/10']"26        query = _translate_query(stix_pattern)27        where_statement = "WHERE (INCIDR('192.168.122.84/10',sourceip) OR INCIDR('192.168.122.84/10',destinationip)) OR (sourceip = '192.168.122.83' OR destinationip = '192.168.122.83') {} {}".format(28            default_limit, default_time)29        _test_query_assertions(query, selections, from_statement, where_statement)30    def test_ipv6_query(self):31        stix_pattern = "[ipv6-addr:value = '3001:0:0:0:0:0:0:2']"32        query = _translate_query(stix_pattern)33        where_statement = "WHERE (sourcev6 = '3001:0:0:0:0:0:0:2' OR destinationv6 = '3001:0:0:0:0:0:0:2') {} {}".format(default_limit, default_time)34        _test_query_assertions(query, selections, from_statement, where_statement)35    def test_network_traffic_ports_query(self):36        stix_pattern = "[network-traffic:src_port = 123 OR network-traffic:dst_port = 456]"37        query = _translate_query(stix_pattern)38        where_statement = "WHERE destinationport = '456' OR sourceport = '123' {} {}".format(default_limit, default_time)39        _test_query_assertions(query, selections, from_statement, where_statement)40    def test_network_traffic_protocols(self):41        for key, value in PROTOCOLS.items():42            # Test for both upper and lower case protocols in the STIX pattern43            if random.randint(0, 1) == 0:44                key = key.upper()45            stix_pattern = "[network-traffic:protocols[*] = '" + key + "']"46            query = _translate_query(stix_pattern)47        where_statement = "WHERE protocolid = '{}' {} {}".format(value, default_limit, default_time)48        _test_query_assertions(query, selections, from_statement, where_statement)49    def test_network_traffic_start_end(self):50        stix_pattern = "[network-traffic:start = '2018-06-14T08:36:24.000Z' AND network-traffic:end = '2018-06-14T08:36:24.567Z']"51        query = _translate_query(stix_pattern)52        where_statement = "WHERE endtime = '1528965384567' AND starttime = '1528965384000' {} {}".format(default_limit, default_time)53        _test_query_assertions(query, selections, from_statement, where_statement)54    def test_network_traffic_src_dst_ipv4_ref(self):55        stix_pattern = "[network-traffic:src_ref.value = '192.168.122.83' OR network-traffic:dst_ref.value = '192.168.122.84/10']"56        query = _translate_query(stix_pattern)57        where_statement = "WHERE INCIDR('192.168.122.84/10',destinationip) OR sourceip = '192.168.122.83' {} {}".format(default_limit, default_time)58        _test_query_assertions(query, selections, from_statement, where_statement)59    def test_network_traffic_src_dst_ipv6_ref(self):60        stix_pattern = "[network-traffic:src_ref.value = '3001:0:0:0:0:0:0:2' OR network-traffic:dst_ref.value = '3001:2:4:7:3:0:0:1/32']"61        query = _translate_query(stix_pattern)62        where_statement = "WHERE INCIDR('3001:2:4:7:3:0:0:1/32',destinationv6) OR sourcev6 = '3001:0:0:0:0:0:0:2' {} {}".format(default_limit, default_time)63        _test_query_assertions(query, selections, from_statement, where_statement)64    def test_src_dst_byte_and_packet(self):65        start_time_01 = "t'2016-06-01T01:30:00.123Z'"66        stop_time_01 = "t'2016-06-01T02:20:00.123Z'"67        unix_start_time_01 = 146474460012368        unix_stop_time_01 = 146474760012369        stix_pattern = "[(network-traffic:src_byte_count = 306 AND network-traffic:dst_byte_count = 604) OR (network-traffic:src_packets = 2898 AND network-traffic:dst_packets = 1)] START {} STOP {}".format(start_time_01, stop_time_01)70        query = _translate_query(stix_pattern)71        where_statement = "WHERE (destinationpackets = '1' AND sourcepackets = '2898') OR (destinationbytes = '604' AND sourcebytes = '306') {} START {} STOP {}".format(default_limit, unix_start_time_01, unix_stop_time_01)72        _test_query_assertions(query, selections, from_statement, where_statement)73    def test_text_search(self):74        stix_pattern = "[artifact:payload_bin LIKE '%Set-ItemProperty%' AND artifact:payload_bin LIKE '%New-Item%']"75        query = _translate_query(stix_pattern)76        where_statement = "WHERE TEXT SEARCH '%New-Item% AND %Set-ItemProperty%' {} {}".format(default_limit, default_time)77        _test_query_assertions(query, selections, from_statement, where_statement)78    def test_combined_observation_expression_with_qualifier(self):79        stix_pattern = "([ipv4-addr:value = '192.168.1.2'] OR [network-traffic:src_port = 8080]) START t'2020-09-11T13:00:52.000Z' STOP t'2020-09-11T13:59:04.000Z'"80        query = _translate_query(stix_pattern)81        where_statement_01 = "WHERE (sourceip = '192.168.1.2' OR destinationip = '192.168.1.2') limit 10000 START 1599829252000 STOP 1599832744000"82        where_statement_02 = "WHERE sourceport = '8080' limit 10000 START 1599829252000 STOP 1599832744000"83        assert len(query['queries']) == 284        assert query['queries'][0] == selections + from_statement + where_statement_01...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
