Best Python code snippet using localstack_python
newspaper_cdk_stack.py
Source:newspaper_cdk_stack.py  
# NOTE(review): extraction-garbled excerpt of an AWS CDK stack ("newspaper_cdk_stack.py").
# The original file's line numbers (149-339) were fused into the code text when this page
# was scraped, so the lines below are NOT runnable Python; they are left byte-identical.
# What the excerpt shows, as far as it can be read:
#   * tail of buildDownloadJp2 (head missing from this excerpt): grants bucket read/write
#     and topic publish to the lambda, wires it to an SQS event source, returns output_topic.
#   * buildConvertJp2: ImageMagick-layered PythonFunction triggered by S3 OBJECT_CREATED
#     events matching prefix "jp2/" / suffix "jp2"; read/write granted on self.data_bucket.
#   * buildProcessPages: S3 "converted/"+"jpg" notifications -> SNS topic -> SQS queue;
#     builds a Docker image asset ("process_pages"), an ECS cluster with exec-command
#     logging, a t2.small AutoScalingGroup (min 0 / max 1) attached as an ASG capacity
#     provider, an EC2 task definition running the container (SQS_URL env var, 1700 MiB,
#     awslogs driver), and a StartTask lambda subscribed to the SNS topic. The attached
#     IAM policies use resources=['*'] — worth scoping down if this code is ever revived.
#   * buildCheckHeadlines: queue with a dead-letter queue (max_receive_count=4) plus a
#     lambda fired on "converted/"+"-predictions.json" objects; returns banner_queue.
#     (Its header comment about "the imagemagick layer" looks copy-pasted from
#     buildConvertJp2 — this lambda attaches no layers.)
#   * buildShrinker: lambda consuming shrink_queue, reading data_bucket and writing
#     resized output to web_bucket.
#   * head of buildCloudfront (tail missing past this excerpt).
...149        # Give the lambda permission to write to the destination bucket150        self.data_bucket.grant_read_write(download_jp2_lambda)151        output_topic.grant_publish(download_jp2_lambda)152        # Connect the lambda to the input sqs queue153        download_jp2_lambda.add_event_source(lambda_sources.SqsEventSource(pages_sqs))154        return output_topic155    def buildConvertJp2(self):156        # Now that we have the imagemagick layer, we can build the convert lambda on top of it157        convert_jp2_lambda = PythonFunction(158          self, 'ConvertJP2',159          entry='convert_jp2_lambda',160          index='convert_jp2.py',161          handler='handler',162          layers=[self.imagemagick_layer],163          environment= {164            'DATA_BUCKET': self.data_bucket.bucket_name,165          },166          timeout=core.Duration.minutes(1),167          memory_size=512,168        )169        # Give the lambda permission to write to the destination bucket170        self.data_bucket.grant_read_write(convert_jp2_lambda)171        172        # Connect the lambda to the s3 bucket173        convert_jp2_lambda.add_event_source(lambda_sources.S3EventSource(self.data_bucket,174          events=[s3.EventType.OBJECT_CREATED],175          filters=[s3.NotificationKeyFilter(prefix="jp2/", suffix="jp2")],176        ))177        return178    def buildProcessPages(self):179      jpg_sns_topic = sns.Topic(self, 'JpgTopic')180      self.data_bucket.add_event_notification(181                          s3.EventType.OBJECT_CREATED, 182                          s3_notifications.SnsDestination(jpg_sns_topic),183                          s3.NotificationKeyFilter(prefix="converted/", suffix="jpg"),184                          )185      jpg_sqs_queue = sqs.Queue(self, 'JpgQueue')186      jpg_sns_topic.add_subscription(subs.SqsSubscription(jpg_sqs_queue))187      detectron_image = ecr_assets.DockerImageAsset(self, "Detectron", directory="process_pages")188      
print("{}".format(detectron_image))189      log_group = logs.LogGroup(self, "LogGroup")190      exec_bucket = s3.Bucket(self, "EcsLogs")191      cluster = ecs.Cluster(self, "DetectronCluster", 192                            vpc=self.vpc,193                            execute_command_configuration={194                              "log_configuration": {195                                  "cloud_watch_log_group": log_group,196                                  "s3_bucket": exec_bucket,197                                  "s3_key_prefix": "exec-command-output"198                              },199                              "logging": ecs.ExecuteCommandLogging.OVERRIDE200                            }201                            )202      # g4dn.xlarge seems to be the cheapest instance type with nvidia GPUs203      # t2.micro lets you stay in the free tier204      # t2.small is (one of) the cheapest with enough memory to hold the model205      # cluster.add_capacity("DefaultAutoScalingGroupCapacity",206      #     instance_type=ec2.InstanceType("t2.small"),207      #     min_capacity=0,208      #     desired_capacity=0,209      #     max_capacity=1,210      #     key_name='virginia',211      # )212      auto_scaling_group = autoscaling.AutoScalingGroup(self, "DetectronASG",213          vpc=self.vpc,214          instance_type=ec2.InstanceType("t2.small"),215          machine_image=ecs.EcsOptimizedImage.amazon_linux2(),216          min_capacity=0,217          max_capacity=1,218          instance_monitoring=autoscaling.Monitoring.BASIC,219          key_name='virginia',220          new_instances_protected_from_scale_in=False,221      )222      capacity_provider = ecs.AsgCapacityProvider(self, "AsgCapacityProvider",223          auto_scaling_group=auto_scaling_group,224          enable_managed_termination_protection=False,225      )226      cluster.add_asg_capacity_provider(capacity_provider)227      task_definition = ecs.Ec2TaskDefinition(self, "TaskDef")228      # 
container = task_definition.add_container("DefaultContainer",229      #     image=ecs.ContainerImage.from_registry(detectron_image.image_uri),230      #     environment= {231      #       "SQS_URL": "{}".format(jpg_sqs_queue.queue_url)232      #     },233      #     memory_limit_mib=512,234      # )235      container = task_definition.add_container("DefaultContainer",236          image=ecs.ContainerImage.from_asset("./process_pages"),237          environment= {238            "SQS_URL": "{}".format(jpg_sqs_queue.queue_url)239          },240          memory_limit_mib=1700,241          logging=ecs.LogDrivers.aws_logs(stream_prefix="detectron"),242      )243      start_task_lambda = PythonFunction(244        self, 'StartTask',245        runtime=_lambda.Runtime.PYTHON_3_8,246        entry='start_task_lambda',247        index='start_task.py',248        handler='handler',249        environment= {250          'CLUSTER_NAME': cluster.cluster_name,251          'CONTAINER_ID': task_definition.default_container.container_name,252          'TASK_ARN': task_definition.task_definition_arn,253        },254      )255      task_definition.execution_role.add_to_policy(iam.PolicyStatement(256        effect=iam.Effect.ALLOW,257        resources=['*'],258        actions=["sqs:DeleteMessage",259                "sqs:ListQueues",260                "sqs:GetQueueUrl",261                "sqs:ListDeadLetterSourceQueues",262                "sqs:DeleteMessageBatch",263                "sqs:ReceiveMessage",264                "sqs:GetQueueAttributes",265                "sqs:ListQueueTags"]266      ))267      task_definition.task_role.add_to_policy(iam.PolicyStatement(268        effect=iam.Effect.ALLOW,269        resources=['*'],270        actions=["sqs:DeleteMessage",271                "sqs:ListQueues",272                "sqs:GetQueueUrl",273                "sqs:ListDeadLetterSourceQueues",274                "sqs:DeleteMessageBatch",275                "sqs:ReceiveMessage",276                
"sqs:GetQueueAttributes",277                "sqs:ListQueueTags",278                "s3:PutObject",279                "s3:GetObject",]280      ))281      # arn_parts = task_definition.task_definition_arn.split(':')282      # print(arn_parts)283      # Connect the lambda to the s3 bucket284      start_task_lambda.role.add_to_policy(iam.PolicyStatement(285        effect=iam.Effect.ALLOW,286        resources=['*'],287        actions=['ecs:ListTasks', 'ecs:RunTask', 'iam:PassRole'],288      ))289      start_task_lambda.add_event_source(lambda_sources.SnsEventSource(jpg_sns_topic))290    def buildCheckHeadlines(self):291        dlq_sqs = sqs.Queue(292          self, 'BannerHeadlinesDeadLetter',293        )294        dlq = sqs.DeadLetterQueue(max_receive_count=4, queue=dlq_sqs)295        banner_queue = sqs.Queue(296          self, 'BannerHeadlines', dead_letter_queue=dlq,297        )298        # Now that we have the imagemagick layer, we can build the convert lambda on top of it299        check_headlines_lambda = PythonFunction(300          self, 'CheckHeadlines',301          entry='check_headlines_lambda',302          index='check_headlines.py',303          handler='handler',304          environment= {305            'DATA_BUCKET': self.data_bucket.bucket_name,306            'BANNER_QUEUE_URL': banner_queue.queue_url,307          },308          timeout=core.Duration.seconds(10),309          memory_size=128,310        )311        self.data_bucket.grant_read_write(check_headlines_lambda)312        banner_queue.grant_send_messages(check_headlines_lambda)313        # Connect the lambda to the s3 bucket314        check_headlines_lambda.add_event_source(lambda_sources.S3EventSource(self.data_bucket,315          events=[s3.EventType.OBJECT_CREATED],316          filters=[s3.NotificationKeyFilter(prefix="converted/", suffix="-predictions.json")],317        ))318        return banner_queue319    def buildShrinker(self, shrink_queue):320        shrink_jpg_lambda = PythonFunction(321
         self, 'ShrinkJpg',322        entry='shrink_jpg_lambda',323        index='shrink_jpg.py',324        handler='handler',325        layers=[self.imagemagick_layer],326        environment= {327          'DATA_BUCKET': self.data_bucket.bucket_name,328          'WEB_BUCKET': self.web_bucket.bucket_name,329        },330        timeout=core.Duration.seconds(30),331        memory_size=512,332        )333        # Give the lambda permission to write to the destination bucket334        self.data_bucket.grant_read(shrink_jpg_lambda)335        self.web_bucket.grant_read_write(shrink_jpg_lambda)336        shrink_jpg_lambda.add_event_source(lambda_sources.SqsEventSource(shrink_queue))337        return338    def buildCloudfront(self):339      cloudfront.Distribution(self, "cloudfrontDistro",...stellar_stream_stack.py
Source:stellar_stream_stack.py  
# NOTE(review): extraction-garbled excerpt of "stellar_stream_stack.py" (original line
# numbers 80-99 fused into the text). Both the head and the tail of the enclosing
# definition are missing from this excerpt, so it is annotated only, not edited.
# Visible behavior: a lambda (func1) is wired to a Kinesis event source, then a
# TeamsNotifier construct's internal lambda (`_teams_notifier` — a private attribute
# reached from outside the construct, a smell worth fixing at the source) is subscribed
# via SnsEventSource to three pre-existing SNS alarm topics (database / kinesis /
# lambda) referenced by hard-coded ARNs interpolated from {aws_region}/{aws_account}.
# NOTE(review): "dashboar" and the random suffixes in the ARNs presumably mirror the
# deployed stack's generated names — do not "fix" the spelling without checking the
# real topic ARNs.
...80                             security_group=sg,81                             code=lmb.Code.from_asset(82                                   params["lambda"]["kstream"]["code_folder"])83                             )84        func1.add_event_source(kinesis_event_source)85        cw_alarm_notification = TeamsNotifier(self, "CwAlarmNotification")86        database_topic_arn = f'arn:aws:sns:{aws_region}:{aws_account}:stellar-cw-dashboar-stack-DatabaseAlarmTopic-1WM83SYXHEMNX'87        database_topic = sns.Topic.from_topic_arn(self,88                                                  'database_topic',89                                                  database_topic_arn)90        cw_alarm_notification._teams_notifier.add_event_source(SnsEventSource(database_topic))91        kinesis_topic_arn = f'arn:aws:sns:{aws_region}:{aws_account}:stellar-cw-dashboar-stack-KinesisAlarmTopic-1LWXVV357LT3P'92        kinesis_topic = sns.Topic.from_topic_arn(self,93                                                 'kinesis_topic',94                                                 kinesis_topic_arn)95        cw_alarm_notification._teams_notifier.add_event_source(SnsEventSource(kinesis_topic))96        lambda_topic_arn = f'arn:aws:sns:{aws_region}:{aws_account}:stellar-cw-dashboar-stack-LambdaAlarmTopic-1O91TDOEAAFLK'97        lambda_topic = sns.Topic.from_topic_arn(self,98                                                'lambda_topic',99                                                lambda_topic_arn)...analyzer.py
Source:analyzer.py  
# NOTE(review): extraction-garbled excerpt of "analyzer.py" — Migen-style HDL code
# (original line numbers 21-81 fused into the text; the enclosing class header and
# the end of _finalize_pin_events lie outside this excerpt). Kept byte-identical.
# Visible methods:
#   * _name: currently returns the bare event name; the applet-prefixed form
#     ("{applet.name}-{event}") is commented out.
#   * add_generic_event: registers a "change"-kind event source sized to the signal;
#     a sync-registered copy (signal_r) makes `trigger` fire on any value change.
#   * add_in_fifo_event / add_out_fifo_event: 8-bit "strobe"-kind event sources,
#     triggered on FIFO write (writable & we, sync domain) and FIFO read
#     (readable & re, comb domain) respectively.
#   * add_pin_event: only records (name, triple) in self._pins for later elaboration.
#   * _finalize_pin_events: no-op if no pins were registered; synchronizes each pin
#     input through MultiReg, collects per-pin oe and Mux(oe, o, sync_i) io values,
#     concatenates them into sig_oes/sig_ios with registered copies, then emits "oe"
#     and "io" change-kind event sources whose triggers also fire on mux_interface
#     reset. Its tail is truncated in this excerpt.
# Marketing prose from the scraped page is fused onto the end of the final line below.
...21    def _name(self, applet, event):22        # return "{}-{}".format(applet.name, event)23        return event24    def add_generic_event(self, applet, name, signal):25        event_source = self.event_analyzer.add_event_source(26            name=self._name(applet, name), kind="change", width=signal.nbits)27        signal_r = Signal.like(signal)28        event_source.sync += [29            signal_r.eq(signal),30        ]31        event_source.comb += [32            event_source.data.eq(signal),33            event_source.trigger.eq(signal != signal_r),34        ]35    def add_in_fifo_event(self, applet, fifo):36        event_source = self.event_analyzer.add_event_source(37            name=self._name(applet, "fifo-in"), kind="strobe", width=8)38        event_source.sync += [39            event_source.trigger.eq(fifo.writable & fifo.we),40            event_source.data.eq(fifo.din)41        ]42    def add_out_fifo_event(self, applet, fifo):43        event_source = self.event_analyzer.add_event_source(44            name=self._name(applet, "fifo-out"), kind="strobe", width=8)45        event_source.comb += [46            event_source.trigger.eq(fifo.readable & fifo.re),47            event_source.data.eq(fifo.dout)48        ]49    def add_pin_event(self, applet, name, triple):50        self._pins.append((self._name(applet, name), triple))51    def _finalize_pin_events(self):52        if not self._pins:53            return54        reg_reset = Signal()55        self.sync += reg_reset.eq(self.mux_interface.reset)56        pin_oes = []57        pin_ios = []58        for (name, triple) in self._pins:59            sync_i = Signal.like(triple.i)60            self.specials += MultiReg(triple.i, sync_i)61            pin_oes.append((name, triple.oe))62            pin_ios.append((name, Mux(triple.oe, triple.o, sync_i)))63        sig_oes = Cat(oe for n, oe in pin_oes)64        reg_oes = Signal.like(sig_oes)65        sig_ios = Cat(io for n, io in pin_ios)66        reg_ios = 
Signal.like(sig_ios)67        self.sync += [68            reg_oes.eq(sig_oes),69            reg_ios.eq(sig_ios),70        ]71        oe_event_source = self.event_analyzer.add_event_source(72            name="oe", kind="change", width=value_bits_sign(sig_oes)[0],73            fields=[(name, value_bits_sign(oe)[0]) for name, oe in pin_oes])74        io_event_source = self.event_analyzer.add_event_source(75            name="io", kind="change", width=value_bits_sign(sig_ios)[0],76            fields=[(name, value_bits_sign(io)[0]) for name, io in pin_ios])77        self.comb += [78            oe_event_source.trigger.eq(reg_reset | (sig_oes != reg_oes)),79            oe_event_source.data.eq(sig_oes),80            io_event_source.trigger.eq(reg_reset | (sig_ios != reg_ios)),81            io_event_source.data.eq(sig_ios),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
