How to use the parse_tags method in Behave

Best Python code snippets using behave
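
A note on naming: the snippets collected below each define or consume their own parse_tags (a django-taggit-style string splitter, a keyword argument in Vitess collectd plugins, and a session tag compiler), rather than a documented behave API. In behave itself, tags written in a feature file (such as @smoke above a Feature or Scenario) arrive already parsed on the model objects. Here is a minimal sketch of reacting to them in environment.py; the @skip_on_ci tag name is a hypothetical example, not a behave built-in:

    # environment.py -- behave strips the leading '@', so scenario.tags is a
    # plain list of tag strings; feature-level tags live on context.feature.tags.
    def before_scenario(context, scenario):
        # 'skip_on_ci' is a made-up tag name used purely for illustration.
        if 'skip_on_ci' in scenario.tags:
            scenario.skip("Marked @skip_on_ci")

The same parsed tags drive scenario selection on the command line, e.g. behave --tags @smoke.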

tests.py

Source: tests.py (GitHub)


...
    def test_with_simple_space_delimited_tags(self):
        """
        Test with simple space-delimited tags.
        """
        self.assertEqual(parse_tags('one'), [u'one'])
        self.assertEqual(parse_tags('one two'), [u'one', u'two'])
        self.assertEqual(parse_tags('one two three'), [u'one', u'three', u'two'])
        self.assertEqual(parse_tags('one one two two'), [u'one', u'two'])

    def test_with_comma_delimited_multiple_words(self):
        """
        Test with comma-delimited multiple words.
        An unquoted comma in the input will trigger this.
        """
        self.assertEqual(parse_tags(',one'), [u'one'])
        self.assertEqual(parse_tags(',one two'), [u'one two'])
        self.assertEqual(parse_tags(',one two three'), [u'one two three'])
        self.assertEqual(parse_tags('a-one, a-two and a-three'),
                         [u'a-one', u'a-two and a-three'])

    def test_with_double_quoted_multiple_words(self):
        """
        Test with double-quoted multiple words.
        A completed quote will trigger this. Unclosed quotes are ignored.
        """
        self.assertEqual(parse_tags('"one'), [u'one'])
        self.assertEqual(parse_tags('"one two'), [u'one', u'two'])
        self.assertEqual(parse_tags('"one two three'), [u'one', u'three', u'two'])
        self.assertEqual(parse_tags('"one two"'), [u'one two'])
        self.assertEqual(parse_tags('a-one "a-two and a-three"'),
                         [u'a-one', u'a-two and a-three'])

    def test_with_no_loose_commas(self):
        """
        Test with no loose commas -- split on spaces.
        """
        self.assertEqual(parse_tags('one two "thr,ee"'), [u'one', u'thr,ee', u'two'])

    def test_with_loose_commas(self):
        """
        Loose commas -- split on commas.
        """
        self.assertEqual(parse_tags('"one", two three'), [u'one', u'two three'])

    def test_tags_with_double_quotes_can_contain_commas(self):
        """
        Double quotes can contain commas.
        """
        self.assertEqual(parse_tags('a-one "a-two, and a-three"'),
                         [u'a-one', u'a-two, and a-three'])
        self.assertEqual(parse_tags('"two", one, one, two, "one"'),
                         [u'one', u'two'])

    def test_with_naughty_input(self):
        """
        Test with naughty input.
        """
        # Bad users! Naughty users!
        self.assertEqual(parse_tags(None), [])
        self.assertEqual(parse_tags(''), [])
        self.assertEqual(parse_tags('"'), [])
        self.assertEqual(parse_tags('""'), [])
        self.assertEqual(parse_tags('"' * 7), [])
        self.assertEqual(parse_tags(',,,,,,'), [])
        self.assertEqual(parse_tags('",",",",",",","'), [u','])
        self.assertEqual(parse_tags('a-one "a-two" and "a-three'),
                         [u'a-one', u'a-three', u'a-two', u'and'])

    def test_recreation_of_tag_list_string_representations(self):
        plain = Tag.objects.create(name='plain')
        spaces = Tag.objects.create(name='spa ces')
        comma = Tag.objects.create(name='com,ma')
        self.assertEqual(edit_string_for_tags([plain]), u'plain')
        self.assertEqual(edit_string_for_tags([plain, spaces]), u'"spa ces", plain')
        self.assertEqual(edit_string_for_tags([plain, spaces, comma]), u'"com,ma", "spa ces", plain')
        self.assertEqual(edit_string_for_tags([plain, comma]), u'"com,ma", plain')
...
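
Despite the page title, this snippet exercises the parse_tags string helper from django-taggit (note Tag.objects.create and edit_string_for_tags), not behave. Assuming django-taggit is installed, the splitting rules the tests pin down can be tried directly; the expected outputs below are taken from the assertions above:

    # A minimal usage sketch, assuming django-taggit is installed.
    from taggit.utils import parse_tags

    print(parse_tags('one two three'))               # ['one', 'three', 'two'] -- split on spaces, deduplicated, sorted
    print(parse_tags('"one", two three'))            # ['one', 'two three'] -- a loose comma switches to comma-splitting
    print(parse_tags('a-one "a-two, and a-three"'))  # ['a-one', 'a-two, and a-three'] -- quotes protect commas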


vttablet_collectd.py

Source: vttablet_collectd.py (GitHub)


#!/usr/bin/python
import util

NAME = 'vttablet'

class Vttablet(util.BaseCollector):
    def __init__(self, collectd, json_provider=None, verbose=False, interval=None):
        super(Vttablet, self).__init__(collectd, NAME, 15101, json_provider, verbose, interval)
        self.include_per_table_per_user_stats = True
        self.include_per_user_timings = True
        self.include_streamlog_stats = True
        self.include_acl_stats = True
        self.include_results_histogram = True
        self.include_reparent_timings = True
        self.include_heartbeat = False
        self.include_query_timings = False
        self.include_per_table_stats = True

    def configure_callback(self, conf):
        super(Vttablet, self).configure_callback(conf)
        for node in conf.children:
            if node.key == 'IncludeResultsHistogram':
                self.include_results_histogram = util.boolval(node.values[0])
            elif node.key == 'IncludeStatsPerTablePerUser':
                self.include_per_table_per_user_stats = util.boolval(node.values[0])
            elif node.key == 'IncludeTimingsPerUser':
                self.include_per_user_timings = util.boolval(node.values[0])
            elif node.key == 'IncludeStreamLog':
                self.include_streamlog_stats = util.boolval(node.values[0])
            elif node.key == 'IncludeACLStats':
                self.include_acl_stats = util.boolval(node.values[0])
            elif node.key == 'IncludeExternalReparentTimings':
                self.include_reparent_timings = util.boolval(node.values[0])
            elif node.key == 'IncludeHeartbeat':
                self.include_heartbeat = util.boolval(node.values[0])
            elif node.key == 'IncludeQueryTimings':
                self.include_query_timings = util.boolval(node.values[0])
            elif node.key == 'IncludePerTableStats':
                self.include_per_table_stats = util.boolval(node.values[0])
        self.register_read_callback()

    def process_data(self, json_data):
        # Current connections and total accepted
        self.process_metric(json_data, 'ConnAccepted', 'counter')
        self.process_metric(json_data, 'ConnCount', 'gauge')
        # Health-related metrics.
        # TabletState is an integer mapping to one of SERVING (2), NOT_SERVING (0, 1, 3), or SHUTTING_DOWN (4).
        self.process_metric(json_data, 'TabletState', 'gauge', base_tags={'TabletType': json_data['TabletType'].lower()})
        # Report on whether this tablet is a master.
        self.process_metric(json_data, 'TabletType', 'gauge', alt_name='IsMaster', transformer=lambda val: 1 if val.lower() == 'master' else 0)
        self.process_metric(json_data, 'HealthcheckErrors', 'counter', parse_tags=['keyspace', 'shard', 'type'])
        # GC stats
        memstats = json_data['memstats']
        self.process_metric(memstats, 'GCCPUFraction', 'counter', prefix='GC.', alt_name='CPUFraction')
        self.process_metric(memstats, 'PauseTotalNs', 'counter', prefix='GC.')
        # Tracking usage of the connection pools used by apps
        self.process_pool_data(json_data, 'Conn')
        self.process_pool_data(json_data, 'StreamConn')
        self.process_pool_data(json_data, 'Transaction')
        self.process_pool_data(json_data, 'FoundRows')
        # Tracking ExecuteOptions_DBA transactions
        self.process_metric(json_data, 'TransactionPoolDbaInUse', 'gauge')
        self.process_metric(json_data, 'TransactionPoolDbaTotal', 'gauge')
        # If enabled, track a histogram of the number of results returned from user queries.
        if self.include_results_histogram:
            self.process_histogram(json_data, 'Results')
        # Counters tagged by type, for tracking the various error modes of the vttablet.
        for metric in ['Errors', 'InternalErrors', 'Kills']:
            self.process_metric(json_data, metric, 'counter', parse_tags=['type'])
        if self.include_per_table_stats:
            # Counters tagged by table and type, for tracking counts of the various
            # query types, times, and ways in which a query can fail,
            # all broken down by table.
            for metric in ['QueryCounts', 'QueryErrorCounts', 'QueryRowCounts', 'QueryTimesNs']:
                # Note: the original compared against 'QueryTimeNs' (missing an 's'),
                # so alt_name could never fire; fixed to match the loop value.
                alt_name = 'QueryTimes' if metric == 'QueryTimesNs' else None
                transformer = util.nsToMs if metric == 'QueryTimesNs' else None
                self.process_metric(json_data, metric, 'counter', alt_name=alt_name, parse_tags=['table', 'type'], transformer=transformer)
            # Tracks data from information_schema about the size of tables.
            for metric in ['DataFree', 'DataLength', 'IndexLength', 'TableRows']:
                self.process_metric(json_data, metric, 'gauge', parse_tags=['table'])
        if self.include_per_table_per_user_stats:
            # Tracks counts and timings of user queries by user, table, and type.
            user_table_tags = ['table', 'user', 'type']
            self.process_metric(json_data, 'UserTableQueryCount', 'counter', parse_tags=user_table_tags)
            self.process_metric(json_data, 'UserTableQueryTimesNs', 'counter', alt_name='UserTableQueryTime', parse_tags=user_table_tags, transformer=util.nsToMs)
            # Tracks counts and timings of user transactions by user and type.
            user_tx_tags = ['user', 'type']
            self.process_metric(json_data, 'UserTransactionCount', 'counter', parse_tags=user_tx_tags)
            self.process_metric(json_data, 'UserTransactionTimesNs', 'counter', alt_name='UserTransactionTime', parse_tags=user_tx_tags, transformer=util.nsToMs)
        # Tracks a variety of metrics for timing of the various layers of execution.
        # Mysql is how long it takes to actually execute in MySQL, while Queries is the total time including vitess overhead.
        # Waits tracks instances where we are able to consolidate identical queries while waiting for a connection.
        self.process_timing_data(json_data, 'Mysql', process_histograms=False)
        self.process_timing_data(json_data, 'Queries', process_histograms=False)
        self.process_timing_data(json_data, 'Transactions', process_histograms=False)
        self.process_timing_data(json_data, 'Waits')
        if self.include_reparent_timings:
            self.process_timing_data(json_data, 'ExternalReparents', process_histograms=False)
        # The MySQL timings above, broken down by user.
        if self.include_per_user_timings:
            self.process_timing_data(json_data, 'MysqlAllPrivs')
            self.process_timing_data(json_data, 'MysqlApp')
            self.process_timing_data(json_data, 'MysqlDba')
        # Track usage of Vitess' query PLAN cache.
        self.process_metric(json_data, 'QueryCacheCapacity', 'gauge', alt_name='QueryPlanCacheCapacity')
        self.process_metric(json_data, 'QueryCacheLength', 'gauge', alt_name='QueryPlanCacheLength')
        # Tracks messages sent and success of delivery for the stream log.
        if self.include_streamlog_stats:
            self.process_metric(json_data, 'StreamlogSend', 'counter', parse_tags=['log'])
            parse_tags = ['log', 'subscriber']
            self.process_metric(json_data, 'StreamlogDelivered', 'counter', parse_tags=parse_tags)
            self.process_metric(json_data, 'StreamlogDeliveryDroppedMessages', 'counter', parse_tags=parse_tags)
        # Tracks the impact of ACLs on user queries.
        if self.include_acl_stats and self.include_per_table_stats:
            acl_tags = ['table', 'plan', 'id', 'user']
            self.process_metric(json_data, 'TableACLAllowed', 'counter', parse_tags=acl_tags)
            self.process_metric(json_data, 'TableACLDenied', 'counter', parse_tags=acl_tags)
            self.process_metric(json_data, 'TableACLPseudoDenied', 'counter', parse_tags=acl_tags)
            # Super users are exempt and are tracked by this.
            self.process_metric(json_data, 'TableACLExemptCount', 'counter')
            # Look for DDL executed by users not in the migration group.
            for tags, value in self._extract_values(json_data, 'TableACLAllowed', acl_tags):
                if tags['id'] == "DDL" and not tags['user'].startswith('migration.'):
                    self.emitter.emit("UnprivilegedDDL", value, 'counter', tags)
        if self.include_heartbeat:
            self.process_metric(json_data, 'HeartbeatCurrentLagNs', 'gauge')
            self.process_metric(json_data, 'HeartbeatReads', 'counter')
            self.process_metric(json_data, 'HeartbeatReadErrors', 'counter')
            self.process_metric(json_data, 'HeartbeatWrites', 'counter')
            self.process_metric(json_data, 'HeartbeatWriteErrors', 'counter')
        if self.include_query_timings:
            query_timing_tags = ['Median', 'NinetyNinth']
            if "AggregateQueryTimings" in json_data:
                timing_json = json_data["AggregateQueryTimings"]
                self.process_timing_quartile_metric(timing_json, "TotalQueryTime")
                self.process_timing_quartile_metric(timing_json, "MysqlQueryTime")
                self.process_timing_quartile_metric(timing_json, "ConnectionAcquisitionTime")

    def process_pool_data(self, json_data, pool_name):
        self.process_metric(json_data, '%sPoolAvailable' % pool_name, 'gauge')
        self.process_metric(json_data, '%sPoolCapacity' % pool_name, 'gauge')
        self.process_metric(json_data, '%sPoolWaitCount' % pool_name, 'counter')
        self.process_metric(json_data, '%sPoolWaitTime' % pool_name, 'counter', transformer=util.nsToMs)
        self.process_metric(json_data, '%sPoolIdleClosed' % pool_name, 'counter')
        self.process_metric(json_data, '%sPoolExhausted' % pool_name, 'counter')

if __name__ == '__main__':
    util.run_local(NAME, Vttablet)
else:
    import collectd
    vt = Vttablet(collectd)
...
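
In this collector, parse_tags is not a function call but a keyword argument to process_metric: a list of tag names used to split a delimited Vitess metric key (for example keyspace.shard.type) into tag values. util.BaseCollector is this repository's own code, so the helper below is a hypothetical sketch of the idea, not its real internals:

    # Hypothetical sketch: map a dot-delimited Vitess map-metric key onto
    # the tag names passed as parse_tags=['keyspace', 'shard', 'type'].
    def split_tags(key, tag_names, delimiter='.'):
        return dict(zip(tag_names, key.split(delimiter)))

    counters = {'ks1.-80.replica': 3, 'ks1.80-.rdonly': 1}
    for key, value in counters.items():
        print('HealthcheckErrors', value, split_tags(key, ['keyspace', 'shard', 'type']))
        # HealthcheckErrors 3 {'keyspace': 'ks1', 'shard': '-80', 'type': 'replica'}
        # HealthcheckErrors 1 {'keyspace': 'ks1', 'shard': '80-', 'type': 'rdonly'}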


tag_compiler.py

Source: tag_compiler.py (GitHub)


...
        for parsed in self.parse_data():
            update_tags_dict(parsed.clip_tags)
            update_tags_dict(parsed.track_tags)
            update_tags_dict(parsed.track_comment_tags)
        session_tags = parse_tags(self.session.header.session_name).tag_dict
        update_tags_dict(session_tags)
        for m in self.session.markers:
            marker_tags = parse_tags(m.name).tag_dict
            marker_comment_tags = parse_tags(m.comments).tag_dict
            update_tags_dict(marker_tags)
            update_tags_dict(marker_comment_tags)
        return tags_dict

    def compile_events(self) -> Iterator[Event]:
        step0 = self.parse_data()
        step1 = self.apply_appends(step0)
        step2 = self.collect_time_spans(step1)
        step3 = self.apply_tags(step2)
        for datum in step3:
            yield Event(clip_name=datum[0], track_name=datum[1], session_name=datum[2],
                        tags=datum[3], start=datum[4], finish=datum[5])

    def _marker_tags(self, at):
        retval = dict()
        applicable = [(m, t) for (m, t) in self.session.markers_timed() if t <= at]
        for marker, time in sorted(applicable, key=lambda x: x[1]):
            retval.update(parse_tags(marker.comments).tag_dict)
            retval.update(parse_tags(marker.name).tag_dict)
        return retval

    @staticmethod
    def _coalesce_tags(clip_tags: dict, track_tags: dict,
                       track_comment_tags: dict,
                       timespan_tags: dict,
                       marker_tags: dict, session_tags: dict):
        effective_tags = dict()
        effective_tags.update(session_tags)
        effective_tags.update(marker_tags)
        effective_tags.update(timespan_tags)
        effective_tags.update(track_comment_tags)
        effective_tags.update(track_tags)
        effective_tags.update(clip_tags)
        return effective_tags

    def parse_data(self) -> Iterator[Intermediate]:
        for track, clip, start, finish, _ in self.session.track_clips_timed():
            if clip.state == 'Muted':
                continue
            track_parsed = parse_tags(track.name)
            track_comments_parsed = parse_tags(track.comments)
            clip_parsed = parse_tags(clip.clip_name)
            yield TagCompiler.Intermediate(track_content=track_parsed.content,
                                           track_tags=track_parsed.tag_dict,
                                           track_comment_tags=track_comments_parsed.tag_dict,
                                           clip_content=clip_parsed.content,
                                           clip_tags=clip_parsed.tag_dict,
                                           clip_tag_mode=clip_parsed.mode,
                                           start=start, finish=finish)

    @staticmethod
    def apply_appends(parsed: Iterator[Intermediate]) -> Iterator[Intermediate]:
        def should_append(a, b):
            return b.clip_tag_mode == TagPreModes.APPEND and b.start >= a.finish

        def do_append(a, b):
            merged_tags = dict(a.clip_tags)
            merged_tags.update(b.clip_tags)
            return TagCompiler.Intermediate(track_content=a.track_content,
                                            track_tags=a.track_tags,
                                            track_comment_tags=a.track_comment_tags,
                                            clip_content=a.clip_content + ' ' + b.clip_content,
                                            clip_tags=merged_tags, clip_tag_mode=a.clip_tag_mode,
                                            start=a.start, finish=b.finish)

        yield from apply_appends(parsed, should_append, do_append)

    @staticmethod
    def collect_time_spans(parsed: Iterator[Intermediate]) -> \
            Iterator[Tuple[Intermediate, Tuple[dict, Fraction, Fraction]]]:
        time_spans = list()
        for item in parsed:
            if item.clip_tag_mode == TagPreModes.TIMESPAN:
                time_spans.append((item.clip_tags, item.start, item.finish))
            else:
                yield item, list(time_spans)

    @staticmethod
    def _time_span_tags(at_time: Fraction, applicable_spans) -> dict:
        retval = dict()
        for tags in reversed([a[0] for a in applicable_spans if a[1] <= at_time <= a[2]]):
            retval.update(tags)
        return retval

    def apply_tags(self, parsed_with_time_spans) -> Iterator[Tuple[str, str, str, dict, Fraction, Fraction]]:
        session_parsed = parse_tags(self.session.header.session_name)
        for event, time_spans in parsed_with_time_spans:
            event: 'TagCompiler.Intermediate'
            marker_tags = self._marker_tags(event.start)
            time_span_tags = self._time_span_tags(event.start, time_spans)
            tags = self._coalesce_tags(clip_tags=event.clip_tags,
                                       track_tags=event.track_tags,
                                       track_comment_tags=event.track_comment_tags,
                                       timespan_tags=time_span_tags,
                                       marker_tags=marker_tags,
                                       session_tags=session_parsed.tag_dict)
            yield event.clip_content, event.track_content, session_parsed.content, tags, event.start, event.finish

def apply_appends(source: Iterator,
                  should_append: Callable,
                  do_append: Callable) -> Generator:
...
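
Here parse_tags returns a parse result exposing content, tag_dict, and mode attributes, and the tag precedence in _coalesce_tags comes purely from dict.update() ordering: later layers win. A standalone illustration with made-up tag dicts:

    # Broadest layer first; each update() lets a more specific layer override.
    session_tags = {'show': 'S01', 'reel': '1'}
    track_tags = {'reel': '2'}
    clip_tags = {'take': '3'}

    effective = {}
    for layer in (session_tags, track_tags, clip_tags):
        effective.update(layer)
    print(effective)  # {'show': 'S01', 'reel': '2', 'take': '3'} -- clip and track override session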


vtgate_collectd.py

Source: vtgate_collectd.py (GitHub)


#!/usr/bin/python
import time
import util
import mock

NAME = 'vtgate'

class Vtgate(util.BaseCollector):
    def __init__(self, collectd, json_provider=None, verbose=False, interval=None):
        super(Vtgate, self).__init__(collectd, NAME, 15001, json_provider, verbose, interval)

    def configure_callback(self, conf):
        super(Vtgate, self).configure_callback(conf)
        self.include_query_timings = False
        self.include_per_keyspace_metrics = False
        for node in conf.children:
            if node.key == 'IncludeQueryTimings':
                self.include_query_timings = util.boolval(node.values[0])
            elif node.key == 'IncludePerKeyspaceMetrics':
                self.include_per_keyspace_metrics = util.boolval(node.values[0])
        self.register_read_callback()

    def process_data(self, json_data):
        # Current connections and total accepted
        self.process_metric(json_data, 'ConnAccepted', 'counter')
        self.process_metric(json_data, 'ConnCount', 'gauge')
        # GC stats
        memstats = json_data['memstats']
        self.process_metric(memstats, 'GCCPUFraction', 'counter', prefix='GC.', alt_name='CPUFraction')
        self.process_metric(memstats, 'PauseTotalNs', 'counter', prefix='GC.')
        # We should endeavor to have 0 statements that are unfriendly to filtered
        # replication for any keyspaces that want to be sharded.
        self.process_metric(json_data, 'FilteredReplicationUnfriendlyStatementsCount', 'counter')
        self.process_rates(json_data, 'QPSByDbType', 'DbType')
        self.process_rates(json_data, 'QPSByOperation', 'Operation')
        self.process_rates(json_data, 'ErrorsByDbType', 'DbType')
        self.process_rates(json_data, 'ErrorsByOperation', 'Operation')
        self.process_rates(json_data, 'ErrorsByCode', 'Code')
        if self.include_per_keyspace_metrics:
            self.process_rates(json_data, 'QPSByKeyspace', 'Keyspace')
            self.process_rates(json_data, 'ErrorsByKeyspace', 'Keyspace')
        # Healthcheck metrics, both errors and connections
        hc_tags = ['keyspace', 'shard', 'type']
        self.process_metric(json_data, 'HealthcheckErrors', 'counter', parse_tags=hc_tags)
        self.process_metric(json_data, 'HealthcheckConnections', 'gauge', parse_tags=hc_tags)
        # Subtracting VtgateApi from VttabletCall times below should allow seeing what overhead vtgate adds.
        parse_tags = ['Operation', 'Keyspace', 'DbType']
        self.process_timing_data(json_data, 'VtgateApi', parse_tags=parse_tags)
        parse_tags = ['Operation', 'Keyspace', 'DbType', 'Code']
        self.process_metric(json_data, 'VtgateApiErrorCounts', 'counter', parse_tags=parse_tags)
        parse_tags = ['Operation', 'Keyspace', 'ShardName', 'DbType']
        self.process_metric(json_data, 'VttabletCallErrorCount', 'counter', parse_tags=parse_tags)
        self.process_timing_data(json_data, 'VttabletCall', parse_tags=parse_tags)
        parse_tags = ['Keyspace', 'ShardName']
        self.process_metric(json_data, 'BufferUtilizationSum', 'counter', parse_tags=parse_tags)
        self.process_metric(json_data, 'BufferStarts', 'counter', parse_tags=parse_tags)
        self.process_metric(json_data, 'BufferRequestsBuffered', 'counter', parse_tags=parse_tags)
        self.process_metric(json_data, 'BufferRequestsDrained', 'counter', parse_tags=parse_tags)
        parse_tags = ['Keyspace', 'ShardName', 'Reason']
        self.process_metric(json_data, 'BufferRequestsEvicted', 'counter', parse_tags=parse_tags)
        self.process_metric(json_data, 'BufferRequestsSkipped', 'counter', parse_tags=parse_tags)
        if self.include_query_timings:
            query_timing_tags = ['Median', 'NinetyNinth']
            if "AggregateQueryTimings" in json_data:
                timing_json = json_data["AggregateQueryTimings"]
                if "TotalQueryTime" in timing_json:
                    self.process_timing_quartile_metric(timing_json, "TotalQueryTime")
                if "TotalRequestTime" in timing_json:
                    self.process_timing_quartile_metric(timing_json, "TotalRequestTime")

    def process_rates(self, json_data, metric_name, tag_name):
        rates = json_data[metric_name]
        for key, values in rates.items():
            if key.lower() == "all":
                continue
            oneMin = values[-1]
            fiveMin = sum(values[-5:]) / 5
            fifteenMin = sum(values[-15:]) / 15
            tags = dict()
            tags[tag_name] = key
            self.emitter.emit("vitess.%s.1min" % metric_name, oneMin, 'gauge', tags)
            self.emitter.emit("vitess.%s.5min" % metric_name, fiveMin, 'gauge', tags)
            self.emitter.emit("vitess.%s.15min" % metric_name, fifteenMin, 'gauge', tags)

if __name__ == '__main__':
    util.run_local(NAME, Vtgate)
else:
    import collectd
    vt = Vtgate(collectd)
...
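
process_rates assumes a list of per-minute rate buckets with the newest value last (which is how these plugins appear to consume the Vitess status JSON), and averages trailing windows of 5 and 15 buckets. A worked illustration with made-up values:

    values = [10, 12, 8, 11, 9, 14, 13, 10, 12, 11, 9, 10, 12, 13, 15]
    one_min = values[-1]                     # 15
    five_min = sum(values[-5:]) / 5.0        # (9 + 10 + 12 + 13 + 15) / 5 = 11.8
    fifteen_min = sum(values[-15:]) / 15.0   # 169 / 15 = 11.27 (approximately)

One caveat: the plugin runs under Python 2 per its shebang, where sum(values[-5:]) / 5 truncates when the buckets are ints; Vitess appears to export these rates as floats, which is presumably why the source gets away with plain division.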


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The Learning Hub compiles step-by-step guides to help you become proficient with test automation frameworks such as Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Behave automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest now and get 100 minutes of automation testing free.
