How to use forward_events method in localstack

Best Python code snippet using localstack_python

forward_events.py

Source:forward_events.py Github

copy

Full Screen

1import awkward as ak2from hzupsilonphoton.builders import (3 build_boson,4 build_bosons_combination,5 build_dimuons,6 build_good_muons,7 build_good_photons,8 build_mu_1,9 build_mu_2,10 build_photon,11 build_upsilon,12)13from hzupsilonphoton.feed_forward import (14 FeedForwardSequence,15 FilterSequence,16 ObjectSequence,17 WeightSequence,18)19from hzupsilonphoton.filters import (20 lumisection_filter,21 mass_selection_filter,22 signal_selection_filter,23 trigger_filter,24)25from hzupsilonphoton.weighters import (26 generator_weight,27 l1prefr_weights,28 muon_id_weight,29 muon_iso_weight,30 photon_electron_veto_weight,31 photon_id_weight,32 pileup_weight,33)34forward_events = FeedForwardSequence("base_sequence")35forward_events.register_sequence(FilterSequence("lumisection", lumisection_filter))36forward_events.register_sequence(WeightSequence("pileup", pileup_weight))37forward_events.register_sequence(WeightSequence("generator", generator_weight))38forward_events.register_sequence(WeightSequence("l1_prefiring", l1prefr_weights))39forward_events.register_sequence(FilterSequence("trigger", trigger_filter))40forward_events.register_sequence(ObjectSequence("good_muons", build_good_muons))41forward_events.register_sequence(ObjectSequence("good_photons", build_good_photons))42forward_events.register_sequence(43 FilterSequence("n_muons", lambda evts: ak.num(evts.events.good_muons) >= 2)44)45forward_events.register_sequence(46 FilterSequence("n_photons", lambda evts: ak.num(evts.events.good_photons) >= 1)47)48forward_events.register_sequence(ObjectSequence("dimuons", build_dimuons))49forward_events.register_sequence(50 FilterSequence("n_dimuons", lambda evts: ak.num(evts.events.dimuons) >= 1)51)52forward_events.register_sequence(53 ObjectSequence("bosons_combinations", build_bosons_combination)54)55forward_events.register_sequence(56 FilterSequence(57 "n_bosons", lambda evts: ak.num(evts.events.bosons_combinations) >= 158 )59)60forward_events.register_sequence(ObjectSequence("boson", build_boson))61forward_events.register_sequence(ObjectSequence("mu_1", build_mu_1))62forward_events.register_sequence(ObjectSequence("mu_2", build_mu_2))63forward_events.register_sequence(ObjectSequence("upsilon", build_upsilon))64forward_events.register_sequence(ObjectSequence("photon", build_photon))65forward_events.register_sequence(WeightSequence("muon_id", muon_id_weight))66forward_events.register_sequence(WeightSequence("muon_iso", muon_iso_weight))67forward_events.register_sequence(WeightSequence("photon_id", photon_id_weight))68forward_events.register_sequence(69 WeightSequence("photon_electron_veto", photon_electron_veto_weight)70)71forward_events.register_sequence(72 FilterSequence("signal_selection", signal_selection_filter)73)74forward_events.register_sequence(75 FilterSequence("mass_selection", mass_selection_filter)...

Full Screen

Full Screen

test_002_cache.py

Source:test_002_cache.py Github

copy

Full Screen

1#!/usr/bin/env python2# coding: utf83#4# Andreas Müller, 20095# andrmuel@ee.ethz.ch6#7# This code may be freely used under GNU GPL conditions.8import unittest9import types10from ace import event, contexts, cache, rulebase11from ace.util.exceptions import *12from ace.util import configuration, logging13from ace.basisfunctions import rulecomponents14class TestEventHandler:15 """16 Test EventHandler, to see which events would be forwarded ..17 """18 def __init__(self):19 self.events = []20 21 def generateOutputEvent(self, event):22 self.events.append(event)23class TestCache(unittest.TestCase):24 """25 Unittest for event cache Class.26 """27 28 def setUp(self):29 self.config = configuration.Config()30 self.logger = logging.Logger(self.config)31 self.evgen = event.EventGenerator()32 self.ticker = None33 self.cache = cache.EventCache(self.config, self.logger, self.ticker)34 self.cm = contexts.ContextManager(self.config, self.logger, self.ticker, self.cache)35 self.eh = TestEventHandler()36 def testDrop(self):37 events = self.evgen.randomEvents(20)38 self.cache.addEvents(events)39 dropfunc = rulecomponents.drop40 self.assert_(len(self.cache.events)==20)41 drop = [events[10],events[11]]42 dropfunc(cache=self.cache, selected_events=drop)43 self.assert_(len(self.cache.events)==18)44 self.assert_(events[9] in self.cache.getEvents())45 self.assert_(events[10] not in self.cache.getEvents())46 self.assert_(events[11] not in self.cache.getEvents())47 self.assert_(events[12] in self.cache.getEvents())48 def testForward(self):49 events = self.evgen.randomEvents(20)50 events[1].forwarded = True51 self.cache.addEvents(events)52 # events2 = self.evgen.randomEvents(20)53 # forward_events = events[0:10]+events2[0:10]54 forward_events = events[0:10]55 forwardfunc = rulecomponents.forward56 forwardfunc(cache=self.cache, selected_events=forward_events, core=self.eh)57 self.assert_(len(self.eh.events)==9)58 self.assert_(events[0] in self.eh.events)59 self.assert_(events[1] not in self.eh.events) # was already forwarded60 # self.assert_(events2[0] not in self.eh.events) # not in cache61if __name__ == '__main__':...

Full Screen

Full Screen

webhook.py

Source:webhook.py Github

copy

Full Screen

...13 def post(self, request: Request) -> Response:14 events = request.data['events']15 is_verification = events[0]['replyToken'] == '00000000000000000000000000000000'16 if not is_verification:17 self.forward_events(request, events)18 return Response(None, HTTP_200_OK)19 def forward_events(self, request: Request, events: list) -> List[APIResponse]:20 events = [21 self.forward_event(request, event)22 for event in events23 ]24 return events25 def forward_event(self, request: Request, event: dict) -> APIResponse:26 source_type = event['source']['type']27 source_id = (28 event['source']['groupId'] if source_type == 'group' else29 event['source']['roomId'] if source_type == 'room' else30 event['source']['userId']31 )32 event_type = event['type']33 url = reverse(f'webhook-{source_type}-{event_type}', [source_id], request=request)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful