Best Python code snippet using localstack_python
events_starter.py
Source:events_starter.py  
...71                            return False72            elif isinstance(value, list) and not identify_content_base_parameter_in_pattern(value):73                if (74                    isinstance(event_value, list)75                    and get_two_lists_intersection(value, event_value) == []76                ):77                    return False78                elif (79                    not isinstance(event_value, list)80                    and isinstance(event_value, (str, int))81                    and event_value not in value82                ):83                    return False84            elif isinstance(value, list) and identify_content_base_parameter_in_pattern(value):85                if not filter_event_with_content_base_parameter(value, event_value):86                    return False87            elif isinstance(value, (str, dict)):88                try:89                    value = json.loads(value) if isinstance(value, str) else value90                    if isinstance(value, dict) and not filter_event(value, event_value):91                        return False92                except json.decoder.JSONDecodeError:93                    return False94        return True95    rule_information = self.events_backend.describe_rule(rule_name)96    if not rule_information:97        LOG.info('Unable to find rule "%s" in backend: %s', rule_name, rule_information)98        return False99    if rule_information.event_pattern._pattern:100        event_pattern = rule_information.event_pattern._pattern101        if not filter_event(event_pattern, event):102            return False103    return True104def process_events(event: Dict, targets: List[Dict]):105    for target in targets:106        arn = target["Arn"]107        changed_event = filter_event_with_target_input_path(target, event)108        try:109            send_event_to_target(arn, changed_event, aws_stack.get_events_target_attributes(target))110        except Exception as e:111            LOG.info(f"Unable to send event notification {truncate(event)} to target {target}: {e}")112def apply_patches():113    # Fix events ARN114    def rule_model_generate_arn(self, name):115        return "arn:aws:events:{region_name}:{account_id}:rule/{name}".format(116            region_name=self.region_name, account_id=TEST_AWS_ACCOUNT_ID, name=name117        )118    # specific logic for put_events which forwards matching events to target listeners119    def events_handler_put_events(self):120        entries = self._get_param("Entries")121        events = list(map(lambda event: {"event": event, "uuid": str(uuid.uuid4())}, entries))122        _dump_events_to_files(events)123        event_rules = self.events_backend.rules124        for event_envelope in events:125            event = event_envelope["event"]126            event_bus = event.get("EventBusName") or DEFAULT_EVENT_BUS_NAME127            matchine_rules = [r for r in event_rules.values() if r.event_bus_name == event_bus]128            if not matchine_rules:129                continue130            formatted_event = {131                "version": "0",132                "id": event_envelope["uuid"],133                "detail-type": event.get("DetailType"),134                "source": event.get("Source"),135                "account": TEST_AWS_ACCOUNT_ID,136                "time": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),137                "region": self.region,138                "resources": event.get("Resources", []),139                "detail": json.loads(event.get("Detail", "{}")),140            }141            targets = []142            for rule in matchine_rules:143                if filter_event_based_on_event_format(self, rule.name, formatted_event):144                    targets.extend(self.events_backend.list_targets_by_rule(rule.name)["Targets"])145            # process event146            process_events(formatted_event, targets)147        content = {148            "FailedEntryCount": 0,  # TODO: dynamically set proper value when refactoring149            "Entries": list(map(lambda event: {"EventId": event["uuid"]}, events)),150        }151        self.response_headers.update(152            {"Content-Type": APPLICATION_AMZ_JSON_1_1, "x-amzn-RequestId": short_uid()}153        )154        return json.dumps(content), self.response_headers155    rule_model._generate_arn = rule_model_generate_arn156    events_handler.put_events = events_handler_put_events157def start_scheduler():158    JobScheduler.start()159def start_events(port=None, asynchronous=None, update_listener=None):160    port = port or config.service_port("events")161    apply_patches()162    start_scheduler()163    return start_moto_server(164        key="events",165        port=port,166        name="Cloudwatch Events",167        asynchronous=asynchronous,168        update_listener=update_listener,169    )170# ---------------171# HELPER METHODS172# ---------------173def get_two_lists_intersection(lst1, lst2):174    lst3 = [value for value in lst1 if value in lst2]175    return lst3176def identify_content_base_parameter_in_pattern(parameters):177    if any(178        list(param.keys())[0] in CONTENT_BASE_FILTER_KEYWORDS179        for param in parameters180        if isinstance(param, dict)181    ):182        return True183def filter_event_with_content_base_parameter(pattern_value, event_value):184    for element in pattern_value:185        if (isinstance(element, (str, int))) and (event_value == element or element in event_value):186            return True187        elif isinstance(element, dict):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
