How to use the start_stream_encryption method in localstack

Best Python code snippet using localstack_python

test_kinesis_encrypt_stream.py

Source:test_kinesis_encrypt_stream.py Github

copy

Full Screen

...33 assert "region" in param34 assert param["region"] == "us-east-1"35 def test_remediate_success(self):36 class TestClient(object):37 def start_stream_encryption(self, **kwargs):38 return 039 def describe_stream(self, **kwargs):40 stream = {'StreamDescription': {'EncryptionType':"NONE"}}41 return stream42 client = TestClient()43 obj = KinesisEncryptStream()44 assert obj.remediate("stream_name", client, "us-west1") == 045 def test_remediate_not_success_resource_not_found(self):46 class TestClient(object):47 def start_stream_encryption(self, **kwargs):48 return 049 def describe_stream(self, **kwargs):50 raise ClientError(51 {52 "Error": {53 "Code": "ResourceNotFoundException",54 "Message": "ResourceNotFoundException for reason",55 }56 },57 "KinesisEncryptStream",58 )59 client = TestClient()60 obj = KinesisEncryptStream()61 assert obj.remediate("stream_name", client, "us-west1") == 162 def test_remediate_not_success_limit_exceed(self):63 class TestClient(object):64 def start_stream_encryption(self, **kwargs):65 return 066 def describe_stream(self, **kwargs):67 raise ClientError(68 {69 "Error": {70 "Code": "LimitExceededException",71 "Message": "LimitExceededException for reason",72 }73 },74 "KinesisEncryptStream",75 )76 client = TestClient()77 obj = KinesisEncryptStream()78 assert obj.remediate("stream_name", client, "us-west1") == 179 def test_remediate_not_success_resource_in_use(self):80 class TestClient(object):81 def describe_stream(self, **kwargs):82 stream = {'StreamDescription': {'EncryptionType':"NONE"}}83 return stream84 def start_stream_encryption(self, **kwargs):85 raise ClientError(86 {87 "Error": {88 "Code": "ResourceInUseException",89 "Message": "ResourceInUseException for reason",90 }91 },92 "KinesisEncryptStream",93 )94 client = TestClient()95 obj = KinesisEncryptStream()96 assert obj.remediate("stream_name", client, "us-west1") == 197 def test_remediate_not_success_other_aws_exceptions(self):98 class TestClient(object):99 def describe_stream(self, 
**kwargs):100 stream = {'StreamDescription': {'EncryptionType':"NONE"}}101 return stream102 def start_stream_encryption(self, **kwargs):103 raise ClientError(104 {105 "Error": {106 "Code": "InvalidArgumentException",107 "Message": "InvalidArgumentException for reason",108 }109 },110 "KinesisEncryptStream",111 )112 client = TestClient()113 obj = KinesisEncryptStream()114 assert obj.remediate("stream_name", client, "us-west1") == 1115 def test_remediate_not_success_other_exceptions(self):116 class TestClient(object):117 def describe_stream(self, **kwargs):118 stream = {'StreamDescription': {'EncryptionType':"NONE"}}119 return stream120 def start_stream_encryption(self, **kwargs):121 raise Exception('This is the exception you expect to handle')122 client = TestClient()123 obj = KinesisEncryptStream()...

Full Screen

Full Screen

KDS_Apply_Encryption_Playbook.py

Source:KDS_Apply_Encryption_Playbook.py Github

copy

Full Screen

...44 # create service client using the assumed role credentials45 kinesis = boto3.client('kinesis',aws_access_key_id=xAcctAccessKey,aws_secret_access_key=xAcctSecretKey,aws_session_token=xAcctSeshToken)46 try:47 # put default KDS encryption48 response = kinesis.start_stream_encryption(StreamName=kdsName,EncryptionType='KMS',KeyId='alias/aws/kinesis')49 print(response)50 try:51 response = securityhub.update_findings(52 Filters={'Id': [{'Value': findingId,'Comparison': 'EQUALS'}]},53 Note={'Text': 'AWS managed KMS encryption has been applied to the Kinesis stream and the finding was archived. Review the stream for any other configurations that should be applied and consider creating a KMS-CMK specific for the stream if it is business-critical.','UpdatedBy': lambdaFunctionName},54 RecordState='ARCHIVED'55 )56 print(response)57 except Exception as e:58 print(e)59 except Exception as e:60 print(e)61 else:62 try:63 kinesis = boto3.client('kinesis')64 # put default KDS encryption65 response = kinesis.start_stream_encryption(StreamName=kdsName,EncryptionType='KMS',KeyId='alias/aws/kinesis')66 print(response)67 try:68 response = securityhub.update_findings(69 Filters={'Id': [{'Value': findingId,'Comparison': 'EQUALS'}]},70 Note={'Text': 'AWS managed KMS encryption has been applied to the Kinesis stream and the finding was archived. Review the stream for any other configurations that should be applied and consider creating a KMS-CMK specific for the stream if it is business-critical.','UpdatedBy': lambdaFunctionName},71 RecordState='ARCHIVED'72 )73 print(response)74 except Exception as e:75 print(e)76 except Exception as e:...

Full Screen

Full Screen

test_kinesis_encryption.py

Source:test_kinesis_encryption.py Github

copy

Full Screen

...7 resp = client.describe_stream(StreamName="my-stream")8 desc = resp["StreamDescription"]9 desc.should.have.key("EncryptionType").should.equal("NONE")10 desc.shouldnt.have.key("KeyId")11 client.start_stream_encryption(12 StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"13 )14 resp = client.describe_stream(StreamName="my-stream")15 desc = resp["StreamDescription"]16 desc.should.have.key("EncryptionType").should.equal("KMS")17 desc.should.have.key("KeyId").equals("n/a")18@mock_kinesis19def test_disable_encryption():20 client = boto3.client("kinesis", region_name="us-west-2")21 client.create_stream(StreamName="my-stream", ShardCount=2)22 resp = client.describe_stream(StreamName="my-stream")23 desc = resp["StreamDescription"]24 desc.should.have.key("EncryptionType").should.equal("NONE")25 client.start_stream_encryption(26 StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"27 )28 client.stop_stream_encryption(29 StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"30 )31 resp = client.describe_stream(StreamName="my-stream")32 desc = resp["StreamDescription"]33 desc.should.have.key("EncryptionType").should.equal("NONE")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful