How to use consumer_filter method in localstack

Best Python code snippet using localstack_python

baseconsumer.py

Source:baseconsumer.py Github

copy

Full Screen

1"""PgQ consumer framework for Python.2todo:3 - pgq.next_batch_details()4 - tag_done() by default5"""6from typing import Optional, Sequence, List, Iterator, Union, Dict, Any7import sys8import time9import optparse10import skytools11from skytools.basetypes import Cursor, Connection, DictRow12from pgq.event import Event13__all__ = ['BaseConsumer', 'BaseBatchWalker']14EventList = Union[List[Event], "BaseBatchWalker"]15BatchInfo = Dict[str, Any]16class BaseBatchWalker(object):17 """Lazy iterator over batch events.18 Events are loaded using cursor. It will be given19 as ev_list to process_batch(). It allows:20 - one for loop over events21 - len() after that22 """23 queue_name: str24 fetch_size: int25 sql_cursor: str26 curs: Cursor27 length: int28 batch_id: int29 fetch_status: int30 consumer_filter: Optional[str]31 def __init__(self, curs: Cursor, batch_id: int, queue_name: str, fetch_size: int = 300, consumer_filter: Optional[str] = None) -> None:32 self.queue_name = queue_name33 self.fetch_size = fetch_size34 self.sql_cursor = "batch_walker"35 self.curs = curs36 self.length = 037 self.batch_id = batch_id38 self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done39 self.consumer_filter = consumer_filter40 def _make_event(self, queue_name: str, row: DictRow) -> Event:41 return Event(queue_name, row)42 def __iter__(self) -> Iterator[Event]:43 if self.fetch_status:44 raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)45 self.fetch_status = 146 q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"47 self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])48 # this will return first batch of rows49 q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)50 while True:51 rows = self.curs.fetchall()52 if not len(rows):53 break54 self.length += len(rows)55 for row in rows:56 ev = self._make_event(self.queue_name, row)57 yield ev58 # if less rows than requested, it was final block59 if len(rows) < self.fetch_size:60 break61 # request next block of rows62 self.curs.execute(q)63 self.curs.execute("close %s" % self.sql_cursor)64 self.fetch_status = 265 def __len__(self) -> int:66 return self.length67class BaseConsumer(skytools.DBScript):68 """Consumer base class.69 Do not subclass directly (use pgq.Consumer or pgq.LocalConsumer instead)70 Config template::71 ## Parameters for pgq.Consumer ##72 # queue name to read from73 queue_name =74 # override consumer name75 #consumer_name = %(job_name)s76 # filter out only events for specific tables77 #table_filter = table1, table278 # whether to use cursor to fetch events (0 disables)79 #pgq_lazy_fetch = 30080 # whether to read from source size in autocommmit mode81 # not compatible with pgq_lazy_fetch82 # the actual user script on top of pgq.Consumer must also support it83 #pgq_autocommit = 084 # whether to wait for specified number of events,85 # before assigning a batch (0 disables)86 #pgq_batch_collect_events = 087 # whether to wait specified amount of time,88 # before assigning a batch (postgres interval)89 #pgq_batch_collect_interval =90 # whether to stay behind queue top (postgres interval)91 #pgq_keep_lag =92 # in how many seconds to write keepalive stats for idle consumers93 # this stats is used for detecting that consumer is still running94 #keepalive_stats = 30095 """96 # by default, use cursor-based fetch97 default_lazy_fetch: int = 30098 # should reader connection be used in autocommit mode99 pgq_autocommit: int = 0100 # proper variables101 consumer_name: str102 queue_name: str103 # compat variables104 pgq_queue_name: Optional[str] = None105 pgq_consumer_id: Optional[str] = None106 pgq_lazy_fetch: Optional[int] = None107 pgq_min_count: Optional[int] = None108 pgq_min_interval: Optional[str] = None109 pgq_min_lag: Optional[str] = None110 batch_info: Optional[BatchInfo] = None111 consumer_filter: Optional[str] = None112 keepalive_stats: int113 # statistics: time spent waiting for events114 idle_start: float115 stat_batch_start: float116 _batch_walker_class = BaseBatchWalker117 def __init__(self, service_name: str, db_name: str, args: Sequence[str]) -> None:118 """Initialize new consumer.119 @param service_name: service_name for DBScript120 @param db_name: name of database for get_database()121 @param args: cmdline args for DBScript122 """123 super().__init__(service_name, args)124 self.db_name = db_name125 # compat params126 self.consumer_name = self.cf.get("pgq_consumer_id", '')127 self.queue_name = self.cf.get("pgq_queue_name", '')128 # proper params129 if not self.consumer_name:130 self.consumer_name = self.cf.get("consumer_name", self.job_name)131 if not self.queue_name:132 self.queue_name = self.cf.get("queue_name")133 self.stat_batch_start = 0134 # compat vars135 self.pgq_queue_name = self.queue_name136 self.consumer_id = self.consumer_name137 # set default just once138 self.pgq_autocommit = self.cf.getint("pgq_autocommit", self.pgq_autocommit)139 if self.pgq_autocommit and self.pgq_lazy_fetch:140 raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch")141 self.set_database_defaults(self.db_name, autocommit=self.pgq_autocommit)142 self.idle_start = time.time()143 def reload(self) -> None:144 skytools.DBScript.reload(self)145 self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)146 # set following ones to None if not set147 self.pgq_min_count = self.cf.getint("pgq_batch_collect_events", 0) or None148 self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None149 self.pgq_min_lag = self.cf.get("pgq_keep_lag", '') or None150 # filter out specific tables only151 tfilt = []152 for t in self.cf.getlist('table_filter', ''):153 tfilt.append(skytools.quote_literal(skytools.fq_name(t)))154 if len(tfilt) > 0:155 expr = "ev_extra1 in (%s)" % ','.join(tfilt)156 self.consumer_filter = expr157 self.keepalive_stats = self.cf.getint("keepalive_stats", 300)158 def startup(self) -> None:159 """Handle commands here. __init__ does not have error logging."""160 if self.options.register:161 self.register_consumer()162 sys.exit(0)163 if self.options.unregister:164 self.unregister_consumer()165 sys.exit(0)166 return skytools.DBScript.startup(self)167 def init_optparse(self, parser: Optional[optparse.OptionParser] = None) -> optparse.OptionParser:168 p = super().init_optparse(parser)169 p.add_option('--register', action='store_true',170 help='register consumer on queue')171 p.add_option('--unregister', action='store_true',172 help='unregister consumer from queue')173 return p174 def process_event(self, db: Connection, event: Event) -> None:175 """Process one event.176 Should be overridden by user code.177 """178 raise Exception("needs to be implemented")179 def process_batch(self, db: Connection, batch_id: int, event_list: EventList) -> None:180 """Process all events in batch.181 By default calls process_event for each.182 Can be overridden by user code.183 """184 for ev in event_list:185 self.process_event(db, ev)186 def work(self) -> int:187 """Do the work loop, once (internal).188 Returns: true if wants to be called again,189 false if script can sleep.190 """191 db = self.get_database(self.db_name)192 curs = db.cursor()193 self.stat_start()194 # acquire batch195 batch_id = self._load_next_batch(curs)196 db.commit()197 if batch_id is None:198 return 0199 # load events200 ev_list = self._load_batch_events(curs, batch_id)201 db.commit()202 # process events203 self._launch_process_batch(db, batch_id, ev_list)204 # done205 self._finish_batch(curs, batch_id, ev_list)206 db.commit()207 self.stat_end(len(ev_list))208 return 1209 def register_consumer(self) -> int:210 self.log.info("Registering consumer on source queue")211 db = self.get_database(self.db_name)212 cx = db.cursor()213 cx.execute("select pgq.register_consumer(%s, %s)",214 [self.queue_name, self.consumer_name])215 res = cx.fetchone()[0]216 db.commit()217 return res218 def unregister_consumer(self) -> None:219 self.log.info("Unregistering consumer from source queue")220 db = self.get_database(self.db_name)221 cx = db.cursor()222 cx.execute("select pgq.unregister_consumer(%s, %s)",223 [self.queue_name, self.consumer_name])224 db.commit()225 def _launch_process_batch(self, db: Connection, batch_id: int, ev_list: EventList) -> None:226 self.process_batch(db, batch_id, ev_list)227 def _make_event(self, queue_name: str, row: DictRow) -> Event:228 return Event(queue_name, row)229 def _load_batch_events_old(self, curs: Cursor, batch_id: int) -> List[Event]:230 """Fetch all events for this batch."""231 # load events232 sql = "select * from pgq.get_batch_events(%d)" % batch_id233 if self.consumer_filter is not None:234 sql += " where %s" % self.consumer_filter235 curs.execute(sql)236 rows = curs.fetchall()237 # map them to python objects238 ev_list = []239 for r in rows:240 ev = self._make_event(self.queue_name, r)241 ev_list.append(ev)242 return ev_list243 def _load_batch_events(self, curs: Cursor, batch_id: int) -> EventList:244 """Fetch all events for this batch."""245 if self.pgq_lazy_fetch:246 return self._batch_walker_class(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)247 else:248 return self._load_batch_events_old(curs, batch_id)249 def _load_next_batch(self, curs: Cursor) -> Optional[int]:250 """Allocate next batch. (internal)"""251 q = """select * from pgq.next_batch_custom(%s, %s, %s, %s, %s)"""252 curs.execute(q, [self.queue_name, self.consumer_name,253 self.pgq_min_lag, self.pgq_min_count, self.pgq_min_interval])254 inf = dict(curs.fetchone().items())255 inf['tick_id'] = inf['cur_tick_id']256 inf['batch_end'] = inf['cur_tick_time']257 inf['batch_start'] = inf['prev_tick_time']258 inf['seq_start'] = inf['prev_tick_event_seq']259 inf['seq_end'] = inf['cur_tick_event_seq']260 self.batch_info = inf261 return self.batch_info['batch_id']262 def _finish_batch(self, curs: Cursor, batch_id: int, ev_list: EventList) -> None:263 """Tag events and notify that the batch is done."""264 curs.execute("select pgq.finish_batch(%s)", [batch_id])265 def stat_start(self) -> None:266 t = time.time()267 self.stat_batch_start = t268 if self.stat_batch_start - self.idle_start > self.keepalive_stats:269 self.stat_put('idle', round(self.stat_batch_start - self.idle_start, 4))270 self.idle_start = t271 def stat_end(self, count: int) -> None:272 t = time.time()273 self.stat_put('count', count)274 self.stat_put('duration', round(t - self.stat_batch_start, 4))275 if count > 0: # reset timer if we got some events276 self.stat_put('idle', round(self.stat_batch_start - self.idle_start, 4))...

Full Screen

Full Screen

simple_local_consumer.py

Source:simple_local_consumer.py Github

copy

Full Screen

1#!/usr/bin/env python2"""Consumer that simply calls SQL query for each event.3It tracks completed batches in local file.4Config::5 # source database6 src_db =7 # destination database8 dst_db =9 # query to call10 dst_query = select * from somefunc(%%(pgq.ev_data)s);11 ## Use table_filter where possible instead of this ##12 # filter for events (SQL fragment)13 consumer_filter = ev_extra1 = 'public.mytable1'14"""15import sys16import pkgloader17pkgloader.require('skytools', '3.0')18import pgq19import skytools20class SimpleLocalConsumer(pgq.LocalConsumer):21 __doc__ = __doc__22 def reload(self):23 super(SimpleLocalConsumer, self).reload()24 self.dst_query = self.cf.get("dst_query")25 if self.cf.get("consumer_filter", ""):26 self.consumer_filter = self.cf.get("consumer_filter", "")27 def process_local_event(self, db, batch_id, ev):28 if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):29 return30 if ev.ev_data is None:31 payload = {}32 else:33 payload = skytools.db_urldecode(ev.ev_data)34 payload['pgq.tick_id'] = self.batch_info['cur_tick_id']35 payload['pgq.ev_id'] = ev.ev_id36 payload['pgq.ev_time'] = ev.ev_time37 payload['pgq.ev_type'] = ev.ev_type38 payload['pgq.ev_data'] = ev.ev_data39 payload['pgq.ev_extra1'] = ev.ev_extra140 payload['pgq.ev_extra2'] = ev.ev_extra241 payload['pgq.ev_extra3'] = ev.ev_extra342 payload['pgq.ev_extra4'] = ev.ev_extra443 self.log.debug(self.dst_query, payload)44 retries, curs = self.execute_with_retry('dst_db', self.dst_query, payload)45 if curs.statusmessage[:6] == 'SELECT':46 res = curs.fetchall()47 self.log.debug(res)48 else:49 self.log.debug(curs.statusmessage)50if __name__ == '__main__':51 script = SimpleLocalConsumer("simple_local_consumer3", "src_db", sys.argv[1:])...

Full Screen

Full Screen

simple_consumer.py

Source:simple_consumer.py Github

copy

Full Screen

1#!/usr/bin/env python2"""Consumer that simply calls SQL query for each event.3Config::4 # source database5 src_db =6 # destination database7 dst_db =8 # query to call9 dst_query = select * from somefunc(%%(pgq.ev_data)s);10 ## Deprecated, use table_filter ##11 # filter for events (SQL fragment)12 consumer_filter = ev_extra1 = 'public.mytable1'13"""14import sys15import pkgloader16pkgloader.require('skytools', '3.0')17import pgq18import skytools19class SimpleConsumer(pgq.Consumer):20 __doc__ = __doc__21 def reload(self):22 super(SimpleConsumer, self).reload()23 self.dst_query = self.cf.get("dst_query")24 if self.cf.get("consumer_filter", ""):25 self.consumer_filter = self.cf.get("consumer_filter", "")26 def process_event(self, db, ev):27 curs = self.get_database('dst_db', autocommit = 1).cursor()28 if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):29 return30 if ev.ev_data is None:31 payload = {}32 else:33 payload = skytools.db_urldecode(ev.ev_data)34 payload['pgq.tick_id'] = self.batch_info['cur_tick_id']35 payload['pgq.ev_id'] = ev.ev_id36 payload['pgq.ev_time'] = ev.ev_time37 payload['pgq.ev_type'] = ev.ev_type38 payload['pgq.ev_data'] = ev.ev_data39 payload['pgq.ev_extra1'] = ev.ev_extra140 payload['pgq.ev_extra2'] = ev.ev_extra241 payload['pgq.ev_extra3'] = ev.ev_extra342 payload['pgq.ev_extra4'] = ev.ev_extra443 self.log.debug(self.dst_query, payload)44 curs.execute(self.dst_query, payload)45 if curs.statusmessage[:6] == 'SELECT':46 res = curs.fetchall()47 self.log.debug(res)48 else:49 self.log.debug(curs.statusmessage)50if __name__ == '__main__':51 script = SimpleConsumer("simple_consumer3", "src_db", sys.argv[1:])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful