How to use test_secret method in localstack

Best Python code snippet using localstack_python

diffscanworker_test.py

Source:diffscanworker_test.py Github

copy

Full Screen

1import asyncio2import json3import os4from unittest import mock5from unittest import TestCase6from unittest.mock import MagicMock7from unittest.mock import patch8import psycopg29import pytest10import responses11from requests.exceptions import HTTPError12from .messagemock_test import MessageMock13from detect_secrets_stream.github_client.github_app import GitHubApp14from detect_secrets_stream.github_client.installation_id_request_exception import InstallationIDRequestException15from detect_secrets_stream.scan_worker.commit import Commit16from detect_secrets_stream.scan_worker.diffscanworker import DiffScanWorker17from detect_secrets_stream.scan_worker.secret import Secret18from detect_secrets_stream.secret_corpus_db.data_cleanliness_exception import DataCleanlinessException19from detect_secrets_stream.util.conf import ConfUtil20class DiffScanWorkerTest (TestCase):21 def setUp(self):22 self.github_host = ConfUtil.load_github_conf()['host']23 self.email_domain = 'test.test'24 self.kafka_config = {25 'client.id': 'scan-worker-test',26 'group.id': 'scan-worker-test-group',27 'bootstrap.servers': 'fake_KAFKA_BROKERS_SASL',28 'security.protocol': 'SASL_SSL',29 'sasl.mechanisms': 'PLAIN',30 'sasl.username': 'token',31 'sasl.password': 'fake_KAFKA_API_KEY',32 'api.version.request': True,33 'broker.version.fallback': '0.10.2.1',34 'log.connection.close': False,35 }36 self.test_diff_topic = 'diff-scan-test'37 self.test_notification_topic = 'notification-test'38 self.diffscanworker = DiffScanWorker(39 self.kafka_config, self.test_diff_topic, self.test_notification_topic, async_sleep_time=0.1,40 )41 self.diff_filename = './diff.txt'42 self.test_commit = '0000000000'43 self.test_repo = 'test-repo'44 self.test_branch = 'test-branch'45 self.test_user = 'test-user'46 self.test_json_payload = {47 'commitHash': self.test_commit,48 'repoSlug': self.test_repo,49 'branchName': self.test_branch,50 'githubUser': self.test_user,51 'repoPublic': 'true',52 }53 self.test_json_payload_private = 
{54 'commitHash': self.test_commit,55 'repoSlug': self.test_repo,56 'branchName': self.test_branch,57 'githubUser': self.test_user,58 'repoPublic': 'false',59 }60 def tearDown(self):61 if os.path.exists(self.diff_filename):62 os.remove(self.diff_filename)63 self.diffscanworker.stop()64 @patch('detect_secrets_stream.github_client.github_app.GitHubApp._create_installation_token')65 @patch('detect_secrets_stream.github_client.github_app.GitHubApp._get_installation_id')66 @responses.activate67 def test_create_diff_file_for_private_repo(68 self, mock_get_install_id, mock_create_install_token,69 ):70 repo = 'fake-org/fake-repo'71 commit = '0000000000'72 responses.add(73 responses.GET, f'https://{self.github_host}/api/v3/repos/{repo}/commits/{commit}',74 headers={'X-RateLimit-Remaining': 'test'}, body='this is a test diff', status=200,75 )76 mock_get_install_id.return_value = 177 self.diffscanworker.create_diff_file(78 repo, commit, github=GitHubApp().get_github_client(repo),79 )80 with open(self.diff_filename, 'r') as diff_file:81 content = diff_file.read()82 self.assertEqual(content, 'this is a test diff')83 mock_get_install_id.assert_called_with('fake-org/fake-repo')84 mock_create_install_token.assert_called_with(1)85 @responses.activate86 def test_create_diff_file(self):87 repo = 'fake-org/fake-repo'88 commit = '0000000000'89 responses.add(90 responses.GET, f'https://{self.github_host}/api/v3/repos/{repo}/commits/{commit}',91 headers={'X-RateLimit-Remaining': 'test'}, body='this is a test diff', status=200,92 )93 self.diffscanworker.create_diff_file(repo, commit, self.diffscanworker.github)94 with open(self.diff_filename, 'r') as diff_file:95 content = diff_file.read()96 self.assertEqual(content, 'this is a test diff')97 @responses.activate98 @patch('logging.Logger.error')99 def test_create_diff_file_bad_request(self, mock):100 repo = 'fake-org/fake-repo'101 commit = '0000000000'102 responses.add(103 responses.GET, 
f'https://{self.github_host}/api/v3/repos/{repo}/commits/{commit}',104 headers={'X-RateLimit-Remaining': 'test'}, body='not found', status=401,105 )106 with pytest.raises(HTTPError, match=r'401 .*'):107 self.diffscanworker.create_diff_file(repo, commit, self.diffscanworker.github)108 mock.assert_called()109 def test_run_detect_secrets_no_results(self):110 commit = '0000000000'111 diff_file_content = 'no secrets here!'112 with open(self.diff_filename, 'w') as diff_file:113 diff_file.write(diff_file_content)114 ds_output = json.loads(self.diffscanworker.run_detect_secrets(commit))115 self.assertNotIn(self.diff_filename, ds_output['results'])116 self.assertEqual(0, len(ds_output['results']))117 def test_run_detect_secrets_slack(self):118 commit = '0000000000'119 fake_slack_token = 'xoxp-1-testytesttest' # pragma: whitelist secret120 with open(self.diff_filename, 'w') as diff_file:121 diff_file.write(fake_slack_token)122 ds_output = json.loads(self.diffscanworker.run_detect_secrets(commit, verify=False))123 self.assertIn(self.diff_filename, ds_output['results'])124 self.assertIn(125 fake_slack_token, [126 secret['secret'] for secret in ds_output['results'][self.diff_filename]127 ],128 )129 self.assertIn(130 'Slack Token', [131 secret['type'] for secret in ds_output['results'][self.diff_filename]132 ],133 )134 self.assertIn(135 1, [136 secret['line_number']137 for secret in ds_output['results'][self.diff_filename]138 ],139 )140 def test_validate_secrets_slack(self):141 commit = '0000000000'142 ds_output = {143 'results': {144 self.diff_filename: [145 {146 'line_number': 1, 'secret': 'fake_secret',147 'hashed_secret': 'fake_hashed_secret', 'type': 'Slack Token',148 'is_verified': True,149 },150 ],151 },152 }153 validated_secrets = self.diffscanworker.validate_secrets(154 json.dumps(ds_output), commit,155 )156 self.assertIn(157 'fake_secret', [158 secret.secret for secret in validated_secrets159 ],160 )161 self.assertIn(162 'Slack Token', [163 secret.secret_type for 
secret in validated_secrets164 ],165 )166 self.assertIn(167 1, [secret.diff_file_linenumber for secret in validated_secrets],168 )169 def test_validate_secrets_slack_unverified(self):170 commit = '0000000000'171 ds_output = {172 'results': {173 self.diff_filename: [174 {175 'line_number': 1, 'secret': 'fake_secret',176 'hashed_secret': 'fake_hashed_secret', 'type': 'Slack Token',177 'is_verified': False,178 },179 ],180 },181 }182 validated_secrets = self.diffscanworker.validate_secrets(183 json.dumps(ds_output), commit,184 )185 self.assertEqual(0, len(validated_secrets))186 def test_validate_secrets_slack_no_validation_info(self):187 commit = '0000000000'188 ds_output = {189 'results': {190 self.diff_filename: [191 {192 'line_number': 1, 'secret': 'fake_secret',193 'hashed_secret': 'fake_hashed_secret', 'type': 'Slack Token',194 },195 ],196 },197 }198 validated_secrets = self.diffscanworker.validate_secrets(199 json.dumps(ds_output), commit,200 )201 self.assertEqual(0, len(validated_secrets))202 def test_validate_secrets_unsupported_type(self):203 commit = '0000000000'204 ds_output = {205 'results': {206 self.diff_filename: [207 {208 'line_number': 1, 'secret': 'fake_secret',209 'hashed_secret': 'fake_hashed_secret', 'type': 'fake_type',210 },211 ],212 },213 }214 validated_secrets = self.diffscanworker.validate_secrets(215 json.dumps(ds_output), commit,216 )217 self.assertEqual(0, len(validated_secrets))218 def test_validate_secrets_no_verified_results(self):219 commit = '0000000000'220 ds_output = {221 'results': {},222 }223 validated_secrets = self.diffscanworker.validate_secrets(224 json.dumps(ds_output), commit,225 )226 self.assertEqual(0, len(validated_secrets))227 def test_validate_secrets_multifactor_verified(self):228 commit = '0000000000'229 ds_output = {230 'results': {231 self.diff_filename: [232 {233 'line_number': 1, 'secret': 'fake_secret',234 'hashed_secret': 'fake_hashed_secret', 'type': 'multifactor',235 'is_verified': True, 'other_factors': 
{'second factor': 'test'},236 },237 ],238 },239 }240 validated_secrets = self.diffscanworker.validate_secrets(241 json.dumps(ds_output), commit,242 )243 self.assertIn(244 'fake_secret', [245 secret.secret for secret in validated_secrets246 ],247 )248 self.assertIn(249 'multifactor', [250 secret.secret_type for secret in validated_secrets251 ],252 )253 self.assertIn(254 1, [secret.diff_file_linenumber for secret in validated_secrets],255 )256 self.assertIn(257 {'second factor': 'test'}, [secret.other_factors for secret in validated_secrets],258 )259 def test_validate_secrets_multifactor_unverified(self):260 commit = '0000000000'261 ds_output = {262 'results': {263 self.diff_filename: [264 {265 'line_number': 1, 'secret': 'fake_secret',266 'hashed_secret': 'fake_hashed_secret', 'type': 'multifactor',267 'is_verified': False, 'other_factors': {'second factor': 'test'},268 },269 ],270 },271 }272 validated_secrets = self.diffscanworker.validate_secrets(273 json.dumps(ds_output), commit,274 )275 self.assertEqual(0, len(validated_secrets))276 def test_extract_filename_linenumber(self):277 test_secret_1 = Secret('test_secret_1', 'test_type')278 test_secret_2 = Secret('test_secret_2', 'test_type')279 test_secret_3 = Secret('test_secret_3', 'test_type')280 test_secret_4 = Secret('test_secret_4', 'test_type')281 with open(self.diff_filename, 'w') as diff_file:282 with open('detect_secrets_stream/scan_worker/test_data/diff_files/generic.diff') as test_diff:283 test_diff_data = test_diff.read()284 diff_file.write(test_diff_data)285 # secret is in metadata286 test_secret_1.diff_file_linenumber = 3287 # secret line was neither added or removed288 test_secret_2.diff_file_linenumber = 16289 # secret line was removed290 test_secret_3.diff_file_linenumber = 17291 # secret line was added292 test_secret_4.diff_file_linenumber = 18293 test_secrets = [294 test_secret_1, test_secret_2,295 test_secret_3, test_secret_4,296 ]297 
self.diffscanworker.extract_filename_linenumber(test_secrets)298 self.assertEqual(test_secret_1.filename, '')299 self.assertEqual(test_secret_1.linenumber, -1)300 self.assertEqual(test_secret_2.filename, 'diffscanworker.py')301 self.assertEqual(test_secret_2.linenumber, 42)302 self.assertEqual(test_secret_3.filename, 'diffscanworker.py')303 self.assertEqual(test_secret_3.linenumber, 42)304 self.assertEqual(test_secret_4.filename, 'diffscanworker.py')305 self.assertEqual(test_secret_4.linenumber, 43)306 @patch('detect_secrets_stream.scan_worker.diffscanworker.GHElookup')307 def test_lookup_additional_github_info(self, mock_ghe_lookup):308 user = 'username'309 repo = 'some-org/some-repo'310 commit = '000000'311 repo_public = 'true'312 self.diffscanworker = DiffScanWorker(313 self.kafka_config, self.test_diff_topic, self.test_notification_topic, async_sleep_time=0.1,314 )315 mock_ghe_lookup.return_value.ghe_email_lookup.return_value = 'pusheremail'316 mock_ghe_lookup.return_value.ghe_author_committer_lookup.return_value = \317 'author', 'authoremail', 'committer', 'committeremail'318 result = self.diffscanworker.lookup_additional_github_info(user, repo, commit, repo_public)319 assert result == ('pusheremail', 'author', 'authoremail', 'committer', 'committeremail')320 mock_ghe_lookup.assert_called_with(self.diffscanworker.github)321 mock_ghe_lookup.return_value.ghe_email_lookup.assert_called_with(user)322 mock_ghe_lookup.return_value.ghe_author_committer_lookup.assert_called_with(repo, commit)323 @patch('detect_secrets_stream.scan_worker.diffscanworker.GitHubApp.get_github_client')324 @patch('detect_secrets_stream.scan_worker.diffscanworker.GHElookup')325 def test_lookup_additional_github_info_private_repo(self, mock_ghe_lookup, mock_get_private_client):326 user = 'username'327 repo = 'some-org/some-repo'328 commit = '000000'329 repo_public = 'false'330 mock_ghe_lookup.return_value.ghe_email_lookup.return_value = 'pusheremail'331 
mock_ghe_lookup.return_value.ghe_author_committer_lookup.return_value = \332 'author', 'authoremail', 'committer', 'committeremail'333 result = self.diffscanworker.lookup_additional_github_info(user, repo, commit, repo_public)334 assert result == ('pusheremail', 'author', 'authoremail', 'committer', 'committeremail')335 mock_ghe_lookup.assert_called_with(mock_get_private_client.return_value)336 mock_ghe_lookup.return_value.ghe_email_lookup.assert_called_with(user)337 mock_ghe_lookup.return_value.ghe_author_committer_lookup.assert_called_with(repo, commit)338 def test_get_github_client_for_repo_public(self):339 repo = 'some-org/some-repo'340 repo_public = 'true'341 client = self.diffscanworker.get_github_client_for_repo(repo, repo_public)342 assert client == self.diffscanworker.github343 @patch('detect_secrets_stream.scan_worker.diffscanworker.GitHubApp.get_github_client')344 def test_get_github_client_for_repo_private(self, mock_get_private_client):345 repo = 'some-org/some-repo'346 repo_public = 'false'347 client = self.diffscanworker.get_github_client_for_repo(repo, repo_public)348 assert client == mock_get_private_client.return_value349 @patch('detect_secrets_stream.scan_worker.diffscanworker.Vault')350 def test_write_to_vault(self, mock_vault):351 mock_vault.return_value.create_or_update_secret = mock_create_or_update = MagicMock()352 mock_create_or_update.return_value = MagicMock()353 test_secret = Secret('test_secret', 'test_type')354 test_secret.id = 1355 test_secret.other_factors = {'another': 'factor'}356 self.diffscanworker.write_to_vault(test_secret)357 mock_create_or_update.assert_called_with(1, 'test_secret', {'another': 'factor'})358 @patch('detect_secrets_stream.scan_worker.diffscanworker.Vault')359 def test_write_to_vault_fails(self, mock_vault):360 mock_vault.return_value.create_or_update_secret = mock_create_or_update = MagicMock()361 mock_create_or_update.return_value.raise_for_status.side_effect = HTTPError('oops')362 test_secret = 
Secret('test_secret', 'test_type')363 test_secret.other_factors = {'another': 'factor'}364 test_secret.id = 1365 with pytest.raises(HTTPError):366 self.diffscanworker.write_to_vault(test_secret)367 @patch('detect_secrets_stream.scan_worker.diffscanworker.Vault')368 def test_write_to_vault_fails_data_cleanliness(self, mock_vault):369 mock_vault.return_value.create_or_update_secret = mock_create_or_update = MagicMock()370 mock_create_or_update.return_value.raise_for_status.side_effect = HTTPError('oops')371 test_secret = Secret('test_secret', 'test_type')372 test_secret.other_factors = {'another': 'factor'}373 test_secret.encrypted_other_factors = None374 test_secret.id = None375 with pytest.raises(DataCleanlinessException):376 self.diffscanworker.write_to_vault(test_secret)377 @patch('detect_secrets_stream.scan_worker.diffscanworker.get_token_id_by_type_hash')378 @patch('detect_secrets_stream.scan_worker.diffscanworker.add_token_row')379 @patch('detect_secrets_stream.scan_worker.diffscanworker.connect_db')380 def test_insert_token_to_db_no_duplicate(self, mock_conn, mock_add_token, mock_get_token_id):381 test_secret = Secret('test_secret', 'test_type')382 test_secret.live = True383 test_secret.filename = 'filename.test'384 test_secret.linenumber = 100385 test_secret.other_factors = {'another': 'factor'}386 test_token_id = 'token_id'387 mock_get_token_id.return_value = []388 mock_add_token.return_value = test_token_id389 token_id = self.diffscanworker.insert_token_to_db(390 mock_conn, test_secret,391 )392 assert token_id == test_token_id393 mock_get_token_id.assert_called_with(394 mock.ANY, test_secret.secret_type, test_secret.hashed_secret,395 )396 mock_add_token.assert_called_with(397 mock.ANY, None, test_secret.secret_type,398 mock.ANY, None, test_secret.uuid, True, test_secret.hashed_secret,399 test_secret.owner_email,400 )401 @patch('detect_secrets_stream.scan_worker.diffscanworker.get_token_id_by_type_hash')402 
@patch('detect_secrets_stream.scan_worker.diffscanworker.add_token_row')403 def test_insert_token_to_db_has_duplicate(self, mock_add_token, mock_get_token_id):404 test_secret = Secret('test_secret', 'test_type')405 test_token_id = 'token_id'406 mock_get_token_id.return_value = [test_token_id]407 token_id = self.diffscanworker.insert_token_to_db(408 'mock_conn', test_secret,409 )410 assert token_id == test_token_id411 mock_get_token_id.assert_called_with(412 mock.ANY, test_secret.secret_type, test_secret.hashed_secret,413 )414 mock_add_token.assert_not_called()415 @patch('detect_secrets_stream.scan_worker.diffscanworker.get_token_id_by_type_hash')416 @patch('detect_secrets_stream.scan_worker.diffscanworker.add_token_row')417 def test_insert_token_to_db_has_duplicate_tuple(self, mock_add_token, mock_get_token_id):418 test_secret = Secret('test_secret', 'test_type')419 test_token_id = 'token_id'420 mock_get_token_id.return_value = [(test_token_id,)]421 token_id = self.diffscanworker.insert_token_to_db(422 'mock_conn', test_secret,423 )424 assert token_id == test_token_id425 mock_get_token_id.assert_called_with(426 mock.ANY, test_secret.secret_type, test_secret.hashed_secret,427 )428 mock_add_token.assert_not_called()429 @patch('detect_secrets_stream.scan_worker.diffscanworker.get_token_id_by_type_hash')430 @patch('detect_secrets_stream.scan_worker.diffscanworker.add_token_row')431 @patch('detect_secrets_stream.scan_worker.diffscanworker.connect_db')432 def test_insert_token_with_uuid(self, mock_conn, mock_add_token, mock_get_token_id):433 test_secret = Secret('test_secret', 'test_type')434 test_secret.live = True435 test_uuid = 'test_uuid'436 test_secret.uuid = test_uuid437 test_token_id = 'token_id'438 mock_get_token_id.return_value = []439 mock_add_token.return_value = test_token_id440 token_id = self.diffscanworker.insert_token_to_db(441 mock_conn, test_secret,442 )443 assert token_id == test_token_id444 mock_get_token_id.assert_called_with(445 mock.ANY, 
test_secret.secret_type, test_secret.hashed_secret,446 )447 mock_add_token.assert_called_with(448 mock.ANY, None,449 'test_type', mock.ANY,450 None, 'test_uuid', True,451 mock.ANY, mock.ANY,452 )453 @patch('detect_secrets_stream.scan_worker.diffscanworker.add_commit_row')454 def test_insert_commit_to_db_has_duplicate(self, mock_add_commit):455 test_commit = Commit(commit_hash='0000000000', repo_slug='test-repo', branch_name='test-branch')456 test_commit.author_name = 'test-author'457 test_commit.author_email = f'test-author@{self.email_domain}'458 test_commit.pusher_username = 'test-pusher'459 test_commit.pusher_email = f'test-pusher@{self.email_domain}'460 test_commit.committer_name = 'test-committer'461 test_commit.committer_email = f'test-committer@{self.email_domain}'462 test_commit.token_id = 123463 mock_add_commit.side_effect = psycopg2.errors.UniqueViolation('foo')464 self.diffscanworker.insert_commit_to_db(465 'mock_conn', test_commit,466 )467 assert test_commit.uniqueness_hash is not None468 mock_add_commit.assert_called_with(469 mock.ANY, test_commit.token_id, test_commit.encrypted_commit_hash, test_commit.repo_slug,470 test_commit.encrypted_branch_name, test_commit.encrypted_filename, test_commit.encrypted_linenumber,471 test_commit.author_name, test_commit.author_email, test_commit.pusher_username,472 test_commit.pusher_email, test_commit.committer_name, test_commit.committer_email,473 test_commit.encrypted_location_url, test_commit.repo_public, test_commit.uniqueness_hash,474 )475 @patch('detect_secrets_stream.scan_worker.diffscanworker.add_commit_row')476 @patch('detect_secrets_stream.scan_worker.diffscanworker.connect_db')477 def test_write_to_db(self, mock_connect, mock_add_commit):478 test_commit = Commit(commit_hash='0000000000', repo_slug='test-repo', branch_name='test-branch')479 test_commit.location_url = 'https://abc.com'480 test_commit.author_name = 'test-author'481 test_commit.author_email = f'test-author@{self.email_domain}'482 
test_commit.pusher_username = 'test-pusher'483 test_commit.pusher_email = f'test-pusher@{self.email_domain}'484 test_commit.committer_name = 'test-committer'485 test_commit.committer_email = f'test-committer@{self.email_domain}'486 test_secret = Secret('test_secret', 'test_type')487 test_secret.filename = 'test_filename'488 test_secret.linenumber = 100489 test_secret.other_factors = {'second factor': 'another one'}490 encrypted_secrets = [test_secret]491 mock_connect.return_value = 'test_connection'492 self.diffscanworker.insert_token_to_db = mock_insert_token = MagicMock()493 mock_insert_token.return_value = 1494 self.diffscanworker.write_to_vault = MagicMock()495 token_ids = self.diffscanworker.write_to_db(encrypted_secrets, test_commit)496 mock_connect.assert_called()497 mock_insert_token.assert_called_with('test_connection', test_secret)498 mock_add_commit.assert_called_with(499 'test_connection', 1, test_commit.encrypted_commit_hash, test_commit.repo_slug,500 test_commit.encrypted_branch_name, test_commit.encrypted_filename, test_commit.encrypted_linenumber,501 test_commit.author_name, test_commit.author_email, test_commit.pusher_username,502 test_commit.pusher_email, test_commit.committer_name, test_commit.committer_email,503 test_commit.encrypted_location_url, test_commit.repo_public, test_commit.uniqueness_hash,504 )505 self.assertEqual(token_ids, [1])506 self.diffscanworker.write_to_vault.assert_called()507 def test_write_messages_to_queue(self):508 for count in range(3):509 self.diffscanworker.write_message_to_queue = MagicMock()510 ids = []511 for i in range(count):512 ids.append('mock_id')513 self.diffscanworker.write_messages_to_queue(ids)514 self.diffscanworker.write_message_to_queue.call_count = count515 @patch('logging.Logger.error')516 def test_write_message_to_queue(self, mock_error):517 test_topic = 'notification-test'518 test_message = '{testy: test}'519 self.diffscanworker.producer = MagicMock()520 
self.diffscanworker.write_message_to_queue(test_message, test_topic)521 self.diffscanworker.producer.produce.assert_called()522 self.diffscanworker.producer.poll.assert_called()523 self.diffscanworker.producer.flush.assert_called()524 mock_error.assert_not_called()525 @pytest.mark.asyncio526 def test_run_no_messages(self):527 consumer_wrapper = MagicMock()528 consumer_wrapper.poll.side_effect = [None]529 self.diffscanworker.consumer = consumer_wrapper530 consumer_wrapper.subscribe.assert_not_called()531 consumer_wrapper.poll.assert_not_called()532 consumer_wrapper.unsubscribe.assert_not_called()533 consumer_wrapper.close.assert_not_called()534 loop = asyncio.get_event_loop()535 loop.call_later(0.1, self.diffscanworker.stop)536 loop.run_until_complete(self.diffscanworker.run())537 consumer_wrapper.subscribe.assert_called()538 consumer_wrapper.poll.assert_called()539 consumer_wrapper.unsubscribe.assert_called()540 consumer_wrapper.close.assert_called()541 @pytest.mark.asyncio542 def test_run_with_message(self):543 self.diffscanworker.tracer = MagicMock()544 queue_message_value = json.dumps(self.test_json_payload)545 message = MessageMock(546 self.test_diff_topic, -1,547 0, 'key', queue_message_value,548 )549 consumer_wrapper = MagicMock()550 # return twice, first time is message, second time is None551 # Then we will break out of the running loop552 consumer_wrapper.poll.side_effect = [message, None]553 self.diffscanworker.consumer = consumer_wrapper554 self.diffscanworker.process_message = MagicMock()555 loop = asyncio.get_event_loop()556 loop.call_later(0.1, self.diffscanworker.stop)557 loop.run_until_complete(self.diffscanworker.run())558 self.diffscanworker.consumer.subscribe.assert_called()559 self.diffscanworker.consumer.poll.assert_called()560 self.diffscanworker.consumer.unsubscribe.assert_called()561 self.diffscanworker.consumer.close.assert_called()562 self.diffscanworker.process_message.assert_called_with(563 self.test_json_payload,564 )565 def 
test_process_message(self):566 self.diffscanworker.tracer = MagicMock()567 self.diffscanworker.create_diff_file = MagicMock()568 self.diffscanworker.run_detect_secrets = MagicMock()569 self.diffscanworker.validate_secrets = MagicMock()570 self.diffscanworker.extract_filename_linenumber = MagicMock()571 self.diffscanworker.lookup_additional_github_info = MagicMock()572 self.diffscanworker.write_to_db = MagicMock()573 self.diffscanworker.write_messages_to_queue = MagicMock()574 self.diffscanworker.process_message(self.test_json_payload)575 self.diffscanworker.create_diff_file.assert_called()576 self.diffscanworker.run_detect_secrets.assert_called()577 self.diffscanworker.validate_secrets.assert_called()578 self.diffscanworker.extract_filename_linenumber.assert_called()579 self.diffscanworker.lookup_additional_github_info.assert_not_called()580 self.diffscanworker.write_to_db.assert_called()581 self.diffscanworker.write_messages_to_queue.assert_called()582 def test_process_message_private_repo(self):583 self.diffscanworker.tracer = MagicMock()584 self.diffscanworker.create_diff_file = MagicMock()585 self.diffscanworker.run_detect_secrets = MagicMock()586 self.diffscanworker.validate_secrets = MagicMock()587 self.diffscanworker.extract_filename_linenumber = MagicMock()588 self.diffscanworker.lookup_additional_github_info = MagicMock()589 self.diffscanworker.write_to_db = MagicMock()590 self.diffscanworker.write_messages_to_queue = MagicMock()591 self.diffscanworker.github_app.get_github_client = MagicMock()592 self.diffscanworker.process_message(self.test_json_payload_private)593 self.diffscanworker.create_diff_file.assert_called()594 self.diffscanworker.run_detect_secrets.assert_called()595 self.diffscanworker.validate_secrets.assert_called()596 self.diffscanworker.extract_filename_linenumber.assert_called()597 self.diffscanworker.lookup_additional_github_info.assert_not_called()598 self.diffscanworker.write_to_db.assert_called()599 
self.diffscanworker.write_messages_to_queue.assert_called()600 def test_process_message_private_repo_app_not_installed(self):601 self.diffscanworker.tracer = MagicMock()602 self.diffscanworker.create_diff_file = MagicMock()603 self.diffscanworker.run_detect_secrets = MagicMock()604 self.diffscanworker.validate_secrets = MagicMock()605 self.diffscanworker.extract_filename_linenumber = MagicMock()606 self.diffscanworker.lookup_additional_github_info = MagicMock()607 self.diffscanworker.write_to_db = MagicMock()608 self.diffscanworker.write_messages_to_queue = MagicMock()609 self.diffscanworker.github_app.get_github_client = \610 get_private_repo_github_mock = MagicMock()611 get_private_repo_github_mock.side_effect = InstallationIDRequestException()612 self.diffscanworker.process_message(self.test_json_payload_private)613 self.diffscanworker.create_diff_file.assert_not_called()614 self.diffscanworker.run_detect_secrets.assert_not_called()615 self.diffscanworker.validate_secrets.assert_not_called()616 self.diffscanworker.extract_filename_linenumber.assert_not_called()617 self.diffscanworker.lookup_additional_github_info.assert_not_called()618 self.diffscanworker.write_to_db.assert_not_called()619 self.diffscanworker.write_messages_to_queue.assert_not_called()620 def test_process_message_lookup_called(self):621 self.diffscanworker.tracer = MagicMock()622 self.diffscanworker.create_diff_file = MagicMock()623 self.diffscanworker.run_detect_secrets = MagicMock()624 self.diffscanworker.validate_secrets = MagicMock()625 self.diffscanworker.extract_filename_linenumber = mock_encrypt = MagicMock()626 self.diffscanworker.lookup_additional_github_info = mock_lookup = MagicMock()627 self.diffscanworker.write_to_db = MagicMock()628 self.diffscanworker.write_messages_to_queue = MagicMock()629 mock_encrypt.return_value = [Secret('test-secret', 'test-type')]630 mock_lookup.return_value = ('', '', '', '', '')631 self.diffscanworker.process_message(self.test_json_payload)632 
self.diffscanworker.create_diff_file.assert_called()633 self.diffscanworker.run_detect_secrets.assert_called()634 self.diffscanworker.validate_secrets.assert_called()635 self.diffscanworker.extract_filename_linenumber.assert_called()636 self.diffscanworker.lookup_additional_github_info.assert_called()637 self.diffscanworker.write_to_db.assert_called()...

Full Screen

Full Screen

pi_cleaner_test.py

Source:pi_cleaner_test.py Github

copy

Full Screen

...13 pi_cleaner.get_db = mock_get_db = MagicMock()14 mock_get_db.return_value = mock_db_inst = MagicMock()15 return (pi_cleaner, mock_db_inst)16 @pytest.fixture17 def test_secret(self):18 secret = Secret('test_secret', 'secret_type')19 return secret20 @pytest.fixture21 def test_commit(self):22 commit = Commit('test_hash', 'test_repo', 'test_branch')23 return commit24 def test_remove_pi(self, test_pi_cleaner, test_secret, test_commit):25 pi_cleaner, mock_db_inst = test_pi_cleaner26 test_secret.live = False27 test_secret.id = 128 test_secret.remediation_date = datetime.datetime(2018, 8, 25, 0, 0, 0, 0).astimezone(tz=datetime.timezone.utc)29 test_secret.owner_email = f'pi@{self.email_domain}'30 test_secret.secret = 'secret'31 test_secret.encrypted_secret = 'secret_enc'...

Full Screen

Full Screen

revalidation_test.py

Source:revalidation_test.py Github

copy

Full Screen

...7from detect_secrets_stream.validation.revalidation import Revalidator8from detect_secrets_stream.validation.validateException import ValidationException9class TestRevalidator:10 @pytest.fixture11 def test_secret(self):12 secret = Secret('test_secret', 'secret_type')13 secret.lookup_token_owner = MagicMock()14 secret.verify = MagicMock()15 return secret16 @pytest.fixture17 def test_commit(self):18 commit = Commit('test_hash', 'test_repo', 'test_branch')19 return commit20 @pytest.fixture21 def test_revalidator(self):22 revalidator = Revalidator()23 revalidator.get_db = mock_get_db = MagicMock()24 mock_get_db.return_value = mock_db_inst = MagicMock()25 return (revalidator, mock_db_inst)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful