How to use add_transformer method in localstack

Best Python code snippet using localstack_python

events.py

Source:events.py Github

copy

Full Screen

...153 # 所有角色的上一位置154 self.last_seen_at = {}155 # 当没有旁白的时候把角色放入事件?156 self.exciting_developments = {}157 def add_transformer(self, transformer):158 self.transformers.append(transformer)159 def publish(self):160 paragraph_num = 1 # 段落编号161 while len(self.events) > 0:162 pov_actor = self.main_characters[self.pov_index]163 # 生成段落事件164 paragraph_events = self.generate_paragraph_events(pov_actor)165 for transformer in self.transformers:166 if paragraph_events:167 # 段落事件全部转换?168 paragraph_events = transformer.transform(169 self, paragraph_events, paragraph_num170 )171 # 发布段落事件172 self.publish_paragraph(paragraph_events)173 # 下一个角色。。。没有关联的么?如何关联?174 self.pov_index += 1175 if self.pov_index >= len(self.main_characters):176 self.pov_index = 0177 paragraph_num += 1178 def generate_paragraph_events(self, pov_actor):179 # 限定10到25句180 quota = random.randint(10, 25)181 paragraph_events = []182 while len(paragraph_events) < quota and len(self.events) > 0:183 event = self.events.pop()184 if not paragraph_events:185 # 这是段落的第一句186 # 如果读者没有意识到他们在这里,添加一个事件187 # 没有第一句就造个第一句188 if self.last_seen_at.get(pov_actor, None) != event.location:189 if not ('走到' in event.phrase) and not (event.phrase == '<1> <was-1> 在 <2>'):190 paragraph_events.append(Event('<1> <was-1> 在 <2>', [pov_actor, event.location]))191 # 如果有令人激动的事件,告诉读者,添加入段落事件并置空自己192 for (obj, loc) in self.exciting_developments.get(pov_actor, []):193 # 谁在哪发现了谁?194 paragraph_events.append(Event('<1> 发现 <2> 在 <3>', [pov_actor, obj, loc]))195 self.exciting_developments[pov_actor] = []196 # 更新我们的想法的角色是,即使这些不是我们将倾倒的事件197 # 事件发起者的位置更新198 self.character_location[event.initiator()] = event.location199 if event.location == self.character_location[pov_actor]:200 paragraph_events.append(event)201 # 更新读者知道的角色202 self.last_seen_at[event.initiator()] = event.location203 else:204 if event.exciting:205 self.exciting_developments.setdefault(event.initiator(), []).append(206 (event.participants[1], event.participants[2])207 )208 return 
paragraph_events209 # 发布段落210 def publish_paragraph(self, paragraph_events):211 for event in paragraph_events:212 sys.stdout.write(str(event) + " ")213# 转换器214class Transformer(object):215 pass216class DeduplicateTransformer(Transformer):217 # 检查逐字重复。218 # 这可能是“危险的”,219 # 如果你有两个字符,Bob Jones和Bob Smith,两个都被命名为“鲍勃”,220 # 它们实际上是两个不同的事件…但…现在,这是一个边缘案例。221 def transform(self, editor, incoming_events, paragraph_num):222 events = []223 for event in incoming_events:224 if events:225 if str(event) == str(events[-1]):226 events[-1].phrase = event.phrase + ', 两次'227 elif str(event.rephrase(event.phrase + ', 两次')) == str(events[-1]):228 events[-1].phrase = event.phrase + ', 很多次'229 elif str(event.rephrase(event.phrase + ', 很多次')) == str(events[-1]):230 pass231 else:232 events.append(event)233 else:234 events.append(event)235 return events236# 使用代词转换器237class UsePronounsTransformer(Transformer):238 # 用代词取代重复的专有名词239 def transform(self, editor, incoming_events, paragraph_num):240 events = []241 for event in incoming_events:242 if events:243 if event.initiator() == events[-1].initiator():244 event.phrase = event.phrase.replace('<1>', '<he-1>')245 events.append(event)246 else:247 events.append(event)248 return events249# 导航250class MadeTheirWayToTransformer(Transformer):251 def transform(self, editor, incoming_events, paragraph_num):252 events = []253 for event in incoming_events:254 if events and event.initiator() == events[-1].initiator():255 if (events[-1].phrase in ('<1> 走到 <2>',) and256 event.phrase == '<1> 走到 <2>'):257 assert event.location == event.participants[1]258 assert events[-1].previous_location() is not None259 assert events[-1].location == events[-1].participants[1]260 events[-1].phrase = '<1> 找到去 <2> 的路'261 events[-1].participants[1] = event.participants[1]262 events[-1].location = event.participants[1]263 elif (events[-1].phrase in ('<1> 找到去 <2> 的路',) and264 event.phrase == '<1> 走到 <2>'):265 assert event.location == event.participants[1]266 assert 
events[-1].previous_location() is not None267 assert events[-1].location == events[-1].participants[1]268 events[-1].phrase = '<1> 找到去 <2> 的路'269 events[-1].participants[1] = event.participants[1]270 events[-1].location = event.participants[1]271 else:272 events.append(event)273 else:274 events.append(event)275 return events276# well well well277from novel.swallows import Actor278weather = Actor('天气')279# 添加天气的转换器280class AddWeatherFrifferyTransformer(Transformer):281 def transform(self, editor, incoming_events, paragraph_num):282 events = []283 if paragraph_num == 1:284 choice = random.randint(0, 3)285 if choice == 0:286 events.append(Event("下雨了!!!", [weather]))287 if choice == 1:288 events.append(Event("雪花飘飘", [weather]))289 if choice == 2:290 events.append(Event("阳光四射", [weather]))291 if choice == 3:292 events.append(Event("天空阴沉", [weather]))293 return events + incoming_events294# 添加段落开始转换器295class AddParagraphStartFrifferyTransformer(Transformer):296 def transform(self, editor, incoming_events, paragraph_num):297 first_event = incoming_events[0]298 if paragraph_num == 1:299 return incoming_events300 if str(first_event).startswith("'"):301 return incoming_events302 if " 已经找到了 " in str(first_event):303 return incoming_events304 if " 刚刚在 " in str(first_event):305 return incoming_events306 choice = random.randint(0, 8)307 if choice == 0:308 first_event = first_event.rephrase(309 "过了一会儿, " + first_event.phrase310 )311 if choice == 1:312 first_event = first_event.rephrase(313 "突然, " + first_event.phrase314 )315 if choice == 2:316 first_event = first_event.rephrase(317 "考虑了一会儿, " + first_event.phrase318 )319 if choice == 3:320 first_event = first_event.rephrase(321 "有点焦急, " + first_event.phrase322 )323 return [first_event] + incoming_events[1:]324# 组合事件的转换器325class AggregateEventsTransformer(Transformer):326 # 简单衔接327 def transform(self, editor, incoming_events, paragraph_num):328 events = []329 for event in incoming_events:330 if events:331 if (event.initiator() == 
events[-1].initiator() and332 events[-1].phrase in ('<1> 走向 <2>',) and333 event.phrase in ('<1> 看到 <2>',)):334 event.phrase = event.phrase.replace('<1>', '<he-1>')335 events[-1] = AggregateEvent(336 "%s, 当 %s 的时候", [events[-1], event],337 excl=event.excl)338 else:339 events.append(event)340 else:341 events.append(event)342 return events343# 侦查闲逛344class DetectWanderingTransformer(Transformer):345 # 还没用到346 # 导航到原地...347 def transform(self, editor, incoming_events, paragraph_num):348 events = []349 for event in incoming_events:350 if event.phrase == '<1> 找到去 <2> 的路' and event.location == event.previous_location():351 event.phrase = '<1> 在附近转了转, 然后回到 <2>'352 events.append(event)353 return events354# 发布器355class Publisher(object):356 def __init__(self,357 characters=(),358 setting=(),359 friffery=False,360 debug=False,361 title='无标题',362 chapters=18,363 events_per_chapter=810): # 每章默认810个事件364 self.characters = characters365 self.setting = setting366 self.friffery = friffery367 self.debug = debug368 self.title = title369 self.chapters = chapters370 self.events_per_chapter = events_per_chapter371 def publish_chapter(self, chapter_num):372 # 构造事件收集器373 collector = EventCollector()374 # 为每个角色初始化375 for character in self.characters:376 character.collector = collector377 # 请不要继续前一章的对话。378 character.topic = None379 character.place_in(random.choice(self.setting))380 while len(collector.events) < self.events_per_chapter:381 for character in self.characters:382 # 生成生活动态???出口是???383 character.live()384 # print len(collector.events) # , repr([str(e) for e in collector.events])385 if self.debug:386 for character in self.characters:387 print("%s的 EVENTS:" % character.name.upper())388 for event in collector.events:389 # 第一参与者390 if event.participants[0] != character:391 continue392 print("%s 在 %s: %s" % (393 [p.render(event=event) for p in event.participants],394 event.location.render(),395 event.phrase396 ))397 print()398 for character in self.characters:399 print("%s的状态:" % 
character.name.upper())400 # dump intents?导出意图?401 character.dump_beliefs()402 print()403 print("- - - - -")404 print()405 editor = Editor(collector, self.characters)406 editor.add_transformer(MadeTheirWayToTransformer())407 editor.add_transformer(DeduplicateTransformer())408 editor.add_transformer(AggregateEventsTransformer())409 editor.add_transformer(DetectWanderingTransformer())410 # 这应该是最后一个,所以现有的转换器器不必担心自己找代词411 editor.add_transformer(UsePronounsTransformer())412 # 当您实例化一个发布服务器时,这应该是配置什么样的转换器使用的问题。413 if self.friffery:414 editor.add_transformer(AddWeatherFrifferyTransformer())415 editor.add_transformer(AddParagraphStartFrifferyTransformer())416 # 编辑器现在已经获得 角色,事件收集器,转换器,生成吧!417 editor.publish()418 def publish(self):419 print(self.title)420 print("=" * len(self.title))421 print()422 # 逐章生成423 for chapter in range(1, self.chapters + 1):424 print("第 %d 章" % chapter)425 print("-----------")426 print()...

Full Screen

Full Screen

math_binary_test.py

Source:math_binary_test.py Github

copy

Full Screen

import math
import os
import shutil
import tempfile
import unittest

import mleap.pyspark  # noqa
from mleap.pyspark.spark_support import SimpleSparkSerializer  # noqa
import pandas as pd
from pandas.testing import assert_frame_equal
from pyspark.ml import Pipeline
from pyspark.sql.types import FloatType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField

from mleap.pyspark.feature.math_binary import MathBinary
from mleap.pyspark.feature.math_binary import BinaryOperation
from tests.pyspark.lib.spark_session import spark_session


# Two float columns used as the inputs of every test below.
INPUT_SCHEMA = StructType([
    StructField('f1', FloatType()),
    StructField('f2', FloatType()),
])


class MathBinaryTest(unittest.TestCase):
    """Tests for the MathBinary transformer, alone and inside a Pipeline."""

    @classmethod
    def setUpClass(cls):
        cls.spark = spark_session()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def setUp(self):
        # Rows (i, 2i) for i in 1..9.
        self.input = self.spark.createDataFrame(
            [(float(i), float(i * 2)) for i in range(1, 10)],
            INPUT_SCHEMA,
        )
        # f1 + f2 for the rows above.
        self.expected_add = pd.DataFrame(
            [float(i + i * 2) for i in range(1, 10)],
            columns=['add(f1, f2)'],
        )
        # Scratch directory for serialized bundles; removed in tearDown.
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def _new_add_math_binary(self):
        """Return a MathBinary that writes f1 + f2 to 'add(f1, f2)'."""
        return MathBinary(
            operation=BinaryOperation.Add,
            inputA="f1",
            inputB="f2",
            outputCol="add(f1, f2)",
        )

    def test_add_math_binary(self):
        add_transformer = self._new_add_math_binary()
        result = add_transformer.transform(self.input).toPandas()[['add(f1, f2)']]
        assert_frame_equal(self.expected_add, result)

    def test_math_binary_pipeline(self):
        add_transformer = self._new_add_math_binary()
        # The multiply stage consumes the output column of the add stage.
        mul_transformer = MathBinary(
            operation=BinaryOperation.Multiply,
            inputA="f1",
            inputB="add(f1, f2)",
            outputCol="mul(f1, add(f1, f2))",
        )
        expected = pd.DataFrame(
            [float(i * (i + i * 2)) for i in range(1, 10)],
            columns=['mul(f1, add(f1, f2))'],
        )
        pipeline = Pipeline(
            stages=[add_transformer, mul_transformer]
        )
        pipeline_model = pipeline.fit(self.input)
        result = pipeline_model.transform(self.input).toPandas()[['mul(f1, add(f1, f2))']]
        assert_frame_equal(expected, result)

    def test_can_instantiate_all_math_binary(self):
        for binary_operation in BinaryOperation:
            transformer = MathBinary(
                operation=binary_operation,
                inputA="f1",
                inputB="f2",
                outputCol="operation",
            )
            # The original bound the instance and asserted nothing.
            self.assertIsInstance(transformer, MathBinary)

    def test_serialize_deserialize_math_binary(self):
        add_transformer = self._new_add_math_binary()
        file_path = '{}{}'.format('jar:file:', os.path.join(self.tmp_dir, 'math_binary.zip'))
        add_transformer.serializeToBundle(file_path, self.input)
        deserialized_math_binary = SimpleSparkSerializer().deserializeFromBundle(file_path)
        result = deserialized_math_binary.transform(self.input).toPandas()[['add(f1, f2)']]
        assert_frame_equal(self.expected_add, result)

    def test_serialize_deserialize_pipeline(self):
        add_transformer = self._new_add_math_binary()
        mul_transformer = MathBinary(
            operation=BinaryOperation.Multiply,
            inputA="f1",
            inputB="add(f1, f2)",
            outputCol="mul(f1, add(f1, f2))",
        )
        expected = pd.DataFrame(
            [float(i * (i + i * 2)) for i in range(1, 10)],
            columns=['mul(f1, add(f1, f2))'],
        )
        pipeline = Pipeline(
            stages=[add_transformer, mul_transformer]
        )
        pipeline_model = pipeline.fit(self.input)
        file_path = '{}{}'.format('jar:file:', os.path.join(self.tmp_dir, 'math_binary_pipeline.zip'))
        pipeline_model.serializeToBundle(file_path, self.input)
        deserialized_pipeline = SimpleSparkSerializer().deserializeFromBundle(file_path)
        # Fixed: the original transformed with pipeline_model here, so the
        # deserialized pipeline was never exercised.
        result = deserialized_pipeline.transform(self.input).toPandas()[['mul(f1, add(f1, f2))']]
        assert_frame_equal(expected, result)

    def test_add_math_binary_defaults_none(self):
        add_transformer = self._new_add_math_binary()
        none_df = self.spark.createDataFrame([
            (None, float(i * 2))
            for i in range(1, 3)
        ], INPUT_SCHEMA)
        # Summing null + float yields NaN
        expected_df = pd.DataFrame([
            (float("NaN"),)
            for i in range(1, 3)
        ], columns=['add(f1, f2)'])
        result = add_transformer.transform(none_df).toPandas()[['add(f1, f2)']]
        assert_frame_equal(expected_df, result)

    def test_mult_math_binary_default_inputA(self):
        # No inputA column: defaultA supplies the left operand.
        mult_transformer = MathBinary(
            operation=BinaryOperation.Multiply,
            inputB="f2",
            outputCol="mult(1, f2)",
            defaultA=1.0,
        )
        none_df = self.spark.createDataFrame([
            (None, float(i * 1234))
            for i in range(1, 3)
        ], INPUT_SCHEMA)
        expected_df = pd.DataFrame([
            (float(i * 1234), )
            for i in range(1, 3)
        ], columns=['mult(1, f2)'])
        result = mult_transformer.transform(none_df).toPandas()[['mult(1, f2)']]
        assert_frame_equal(expected_df, result)

    def test_mult_math_binary_default_inputB(self):
        # No inputB column: defaultB supplies the right operand.
        mult_transformer = MathBinary(
            operation=BinaryOperation.Multiply,
            inputA="f1",
            outputCol="mult(f1, 2)",
            defaultB=2.0,
        )
        none_df = self.spark.createDataFrame([
            (float(i * 1234), None)
            for i in range(1, 3)
        ], INPUT_SCHEMA)
        expected_df = pd.DataFrame([
            (float(i * 1234 * 2), )
            for i in range(1, 3)
        ], columns=['mult(f1, 2)'])
        result = mult_transformer.transform(none_df).toPandas()[['mult(f1, 2)']]
        assert_frame_equal(expected_df, result)

    def test_mult_math_binary_default_both(self):
        # Neither input column is set: both operands come from the defaults.
        mult_transformer = MathBinary(
            operation=BinaryOperation.Multiply,
            outputCol="mult(7, 8)",
            defaultA=7.0,
            defaultB=8.0,
        )
        none_df = self.spark.createDataFrame([
            (None, None)
            for i in range(1, 3)
        ], INPUT_SCHEMA)
        expected_df = pd.DataFrame([
            (float(7 * 8), )
            for i in range(1, 3)
        ], columns=['mult(7, 8)'])
        result = mult_transformer.transform(none_df).toPandas()[['mult(7, 8)']]
        # ... (excerpt truncated here; the rest of the test is not shown)

Full Screen

Full Screen

visualize.py

Source:visualize.py Github

copy

Full Screen

...9 vis.process(graph)10def plot_cfg(cfg, fname, format="png", path=None, asminst=False, vexinst=False, func_addr=None, remove_imports=True, remove_path_terminator=True, remove_simprocedures=False, debug_info=False, comments=True, color_depth=False):11 vis = AngrVisFactory().default_cfg_pipeline(cfg, asminst=asminst, vexinst=vexinst, comments=comments)12 if remove_imports:13 vis.add_transformer(AngrRemoveImports(cfg.project))14 if remove_simprocedures:15 vis.add_transformer(AngrRemoveSimProcedures())16 if func_addr:17 vis.add_transformer(AngrFilterNodes(lambda node: node.obj.function_address in func_addr and func_addr[node.obj.function_address]))18 if debug_info:19 vis.add_content(AngrCFGDebugInfo())20 if path:21 vis.add_edge_annotator(AngrPathAnnotator(path))22 vis.add_node_annotator(AngrPathAnnotator(path))23 if color_depth:24 vis.add_clusterer(AngrCallstackKeyClusterer())25 vis.add_clusterer(ColorDepthClusterer(palette='greens'))26 vis.set_output(DotOutput(fname, format=format)) 27 vis.process(cfg.graph) 28def plot_func_graph(project, graph, fname, format="png", asminst=True, ailinst=True, vexinst=False, structure=None, color_depth=False):29 vis = AngrVisFactory().default_func_graph_pipeline(project, asminst=asminst, ailinst=ailinst, vexinst=vexinst)30 if structure:31 vis.add_clusterer(AngrStructuredClusterer(structure))32 if color_depth:33 vis.add_clusterer(ColorDepthClusterer(palette='greens'))34 vis.set_output(DotOutput(fname, format=format))35 vis.process(graph) 36#Note: method signature may be changed in the future37def plot_structured_graph(project, structure, fname, format="png", asminst=True, ailinst=True, vexinst=False, color_depth=False):38 vis = AngrVisFactory().default_structured_graph_pipeline(project, asminst=asminst, ailinst=ailinst, vexinst=vexinst)39 if color_depth:40 vis.add_clusterer(ColorDepthClusterer(palette='greens'))41 vis.set_output(DotOutput(fname, format=format))42 vis.process(structure)43def plot_cg(kb, fname, format="png", 
verbose=False, filter=None):44 vis = AngrVisFactory().default_cg_pipeline(kb, verbose=verbose)45 vis.set_output(DotOutput(fname, format=format))46 vis.process(kb, filter)47 48def plot_cdg(cfg, cdg, fname, format="png", pd_edges=False, cg_edges=True, remove_fakeret=True):49 vis = AngrVisFactory().default_cfg_pipeline(cfg, asminst=True, vexinst=False, color_edges=False)50 if remove_fakeret:51 vis.add_transformer(AngrRemoveFakeretEdges())52 if pd_edges:53 vis.add_transformer(AngrAddEdges(cdg.get_post_dominators(), color="green", reverse=True))54 if cg_edges:55 vis.add_transformer(AngrAddEdges(cdg.graph, color="purple", reverse=False))56 vis.set_output(DotOutput(fname, format=format))57 vis.process(cfg.graph)58def plot_dfg(dfg, fname, format="png"):59 vis = AngrVisFactory().default_common_graph_pipeline(type=True)60 vis.set_output(DotOutput(fname, format=format))61 vis.process(dfg)62#Note: method signature may change in the future63def plot_ddg_stmt(ddg_stmt, fname, format="png", project=None):64 vis = AngrVisFactory().default_common_graph_pipeline()65 if project:66 vis.add_content(AngrAsm(project))67 vis.add_content(AngrVex(project))68 vis.add_edge_annotator(AngrColorDDGStmtEdges(project))69 vis.set_output(DotOutput(fname, format=format))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful