Best Python code snippet using yandex-tank
shader.py
Source:shader.py  
# NOTE(review): the beginning of this listing is elided. The module header
# (which presumably imports ``scenegraph`` from ``core``) and the ``def`` line
# of get_assigned_materials are not visible; the signature below is
# reconstructed from the calls made further down - confirm against the file.


def _iter_material_assignments(producer):
    """
    :description yield (location, material path) for every travelled
        location whose "materialAssign" attribute points to a producer
        of type "material"
    :param producer <GeometryProducer object>
    """
    locations = scenegraph.travel(producer)
    root_producer = producer.getRootProducer()
    for location in locations:
        material_assign = location.getGlobalAttribute("materialAssign")
        if not material_assign:
            continue
        material = material_assign.getData()
        if not material:
            continue
        material_producer = root_producer.getProducerByPath(material[0])
        if not material_producer:
            continue
        # the assignment may point to something that is not a material
        if material_producer.getType() != "material":
            continue
        yield location, material[0]


def _first_attribute_value(material_producer, attribute):
    """
    :description get the first data value of a global attribute,
        None when the attribute is missing or holds no data
    :param material_producer <GeometryProducer object>
    :param attribute <str>
    """
    global_attribute = material_producer.getGlobalAttribute(attribute)
    if not global_attribute:
        return None
    values = global_attribute.getData()
    if not values:
        return None
    return values[0]


def get_assigned_materials(producer):
    """
    :description get assigned materials, a dictionary that maps each
        assigned material path to the list of location producers it is
        assigned to
    :param producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_assigned_materials(producer)
    """
    assigned_materials = {}
    for location, material in _iter_material_assignments(producer):
        assigned_materials.setdefault(material, []).append(location)
    return assigned_materials


def get_scene_materials(producer):
    """
    :description get scene (all) materials, a dictionary that maps each
        material path to its assigned location producers (None when the
        material is not assigned)
    :param producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_scene_materials(producer)
    """
    material_producers = scenegraph.list_specific_producer(
        producer, "material"
    )
    material_assigned_objects = get_material_assigned_objects(producer)
    scene_materials = {}
    for material_producer in material_producers:
        current_material = material_producer.getFullName()
        scene_materials.setdefault(
            current_material,
            material_assigned_objects.get(current_material),
        )
    return scene_materials


def get_material_assigned_objects(producer):
    """
    :description get material assigned objects, same result as
        get_assigned_materials (kept as a separate entry point)
    :param producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_material_assigned_objects(producer)
    """
    return get_assigned_materials(producer)


def get_object_assigned_material(producer):
    """
    :description get the assigned material of each object, a dictionary
        that maps the location full name to its material path
    :param producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_object_assigned_material(producer)
    """
    assigned_objects = {}
    for location, material in _iter_material_assignments(producer):
        # setdefault: the first assignment found for a location wins
        assigned_objects.setdefault(location.getFullName(), material)
    return assigned_objects


def get_material_assigned_texture_maps(producer):
    """
    :description get material assigned texture maps
    :param producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_material_assigned_texture_maps(producer)
    """
    assigned_materials = get_assigned_materials(producer)
    root_producer = producer.getRootProducer()
    assigned_texture_maps = {}
    for material in assigned_materials:
        material_producer = root_producer.getProducerByPath(material)
        assigned_texture_maps[material] = get_texture_maps(
            material_producer
        )
    return assigned_texture_maps


def get_scene_material_texture_maps(producer):
    """
    :description get scene (all) material texture maps
    :param producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_scene_material_texture_maps(producer)
    """
    scene_materials = get_scene_materials(producer)
    root_producer = producer.getRootProducer()
    scene_texture_maps = {}
    for material in scene_materials:
        material_producer = root_producer.getProducerByPath(material)
        scene_texture_maps[material] = get_texture_maps(material_producer)
    return scene_texture_maps


def get_texture_maps(material_producer):
    """
    :description get texture maps from material producer, a dictionary
        that maps each file name to the list of source names (or node
        names when no source name is set) that use it
    :param material_producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        material_producer = scenegraph.get_producer(knode, location=location)
        shader.get_texture_maps(material_producer)
    """
    texture_maps = {}
    dependency_nodes = get_material_dependency_nodes(material_producer)
    if not dependency_nodes:
        return texture_maps
    for dependency_node in dependency_nodes:
        filename = get_node_filename(material_producer, dependency_node)
        if not filename:
            continue
        sourcename = get_node_sourcename(material_producer, dependency_node)
        if not sourcename:
            # fall back to the node name when no source name is stored
            sourcename = dependency_node
        texture_maps.setdefault(filename, []).append(sourcename)
    return texture_maps


def get_material_dependency_nodes(material_producer):
    """
    :description get material dependency nodes (child names of the
        "material.nodes" attribute), None when the attribute is missing
    :param material_producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        material_producer = scenegraph.get_producer(knode, location=location)
        shader.get_material_dependency_nodes(material_producer)
    """
    global_attribute = material_producer.getGlobalAttribute(
        "material.nodes"
    )
    if not global_attribute:
        return None
    return global_attribute.childNames()


def get_node_filename(material_producer, node):
    """
    :description get material dependency node file name (source file),
        None when the node has no filename parameter
    :param material_producer <GeometryProducer object>
    :param node <str>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        material_producer = scenegraph.get_producer(knode, location=location)
        shader.get_node_filename(material_producer, 'body_texture')
    """
    attribute = "material.nodes.%s.parameters.filename" % node
    return _first_attribute_value(material_producer, attribute)


def get_node_sourcename(material_producer, node):
    """
    :description get material dependency node source name,
        None when the node has no srcName attribute
    :param material_producer <GeometryProducer object>
    :param node <str>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        material_producer = scenegraph.get_producer(knode, location=location)
        shader.get_node_sourcename(material_producer, 'body_texture')
    """
    attribute = "material.nodes.%s.srcName" % node
    return _first_attribute_value(material_producer, attribute)


def get_assigned_shader_networks(producer):
    """
    :description get assigned shader networks data, each material maps
        to its shader networks plus an "assigned_locations" entry
    :param producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_assigned_shader_networks(producer)
    """
    assigned_materials = get_assigned_materials(producer)
    root_producer = producer.getRootProducer()
    assigned_shader_networks = {}
    for assigned_material, locations in assigned_materials.items():
        material_producer = root_producer.getProducerByPath(
            assigned_material
        )
        shader_networks = get_shader_networks(material_producer)
        shader_networks["assigned_locations"] = get_assigned_locations(
            locations
        )
        assigned_shader_networks[assigned_material] = shader_networks
    return assigned_shader_networks


def get_scene_shader_networks(producer, polymesh=False):
    """
    :description get scene (all) shader networks data, each material maps
        to its shader networks plus an "assigned_locations" entry
    :param producer <GeometryProducer object>
    :param polymesh <bool> forwarded to get_assigned_locations
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        producer = scenegraph.get_producer(knode, location=None)
        shader.get_scene_shader_networks(producer, polymesh=False)
    """
    scene_materials = get_scene_materials(producer)
    root_producer = producer.getRootProducer()
    scene_shader_networks = {}
    for scene_material, locations in scene_materials.items():
        material_producer = root_producer.getProducerByPath(scene_material)
        shader_networks = get_shader_networks(material_producer)
        shader_networks["assigned_locations"] = get_assigned_locations(
            locations, polymesh=polymesh
        )
        scene_shader_networks[scene_material] = shader_networks
    return scene_shader_networks


def get_assigned_locations(object_producers, polymesh=False):
    """
    :description convert GeometryProducer objects to a dictionary of
        location full name and type, None when there is nothing to convert.
        "curves" locations are always skipped, "polymesh" locations are
        skipped unless polymesh is True. A location whose first child is
        a "polymesh" is reported as "polymesh".
    :param object_producers <list>
    :param polymesh <bool>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        object_producers = scenegraph.get_producer(knode, location=location)
        shader.get_assigned_locations([object_producers])
    """
    if not object_producers:
        return None
    assigned_locations = {}
    for object_producer in object_producers:
        object_type = object_producer.getType()
        if object_type == "curves":
            continue
        if not polymesh and object_type == "polymesh":
            continue
        child = object_producer.getFirstChild()
        # a leaf location has no first child, keep its own type then
        if child is not None and child.getType() == "polymesh":
            typed = "polymesh"
        else:
            typed = object_type
        assigned_locations.setdefault(object_producer.getFullName(), typed)
    return assigned_locations


def get_shader_networks(material_producer):
    """
    :description get shader network dependency nodes and their parameter
        values, as {"nodes": {...}, "terminals": {...}}
    :param material_producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        material_producer = scenegraph.get_producer(knode, location=location)
        shader.get_shader_networks(material_producer)
    """
    # get_material_dependency_nodes returns None when the material has no
    # "material.nodes" attribute, treat that as "no nodes"
    dependency_nodes = get_material_dependency_nodes(material_producer) or []
    shader_networks = {"nodes": {}, "terminals": {}}
    for dependency_node in dependency_nodes:
        shader_parameters = get_node_parameters(
            material_producer, dependency_node
        )
        shader_networks["nodes"].setdefault(
            dependency_node, shader_parameters
        )
    shader_networks["terminals"] = get_material_parameters(
        material_producer
    )
    return shader_networks


def get_material_parameters(material_producer):
    """
    :description get material terminals parameter values, keyed by the
        attribute name with the "material.terminals." prefix removed
    :param material_producer <GeometryProducer object>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        material_producer = scenegraph.get_producer(knode, location=location)
        shader.get_material_parameters(material_producer)
    """
    attributes = ["material.terminals"]
    material_attributes = scenegraph.get_attributes(
        material_producer, input_attributes=attributes
    )
    material_values = scenegraph.get_attribute_typed_values(
        material_producer, material_attributes
    )
    material_parameters = {}
    for k, v in material_values.items():
        parameter = k.rsplit("material.terminals.", 1)[1]
        material_parameters.setdefault(parameter, v)
    return material_parameters


def get_node_parameters(material_producer, node):
    """
    :description get the node parameter values
    :param material_producer <GeometryProducer object>
    :param node <str>
    :example
        from core import shader
        from core import scenegraph
        knode = NodegraphAPI.GetAllSelectedNodes()[0]
        location = '/root/materials/body_material'
        material_producer = scenegraph.get_producer(knode, location=location)
        shader.get_node_parameters(material_producer, 'face_texture')
    """
    attributes = [
        "material.nodes.%s.name" % node,
        "material.nodes.%s.type" % node,
        "material.nodes.%s.target" % node,
        "material.nodes.%s.parameters" % node,
        "material.nodes.%s.connections" % node,
    ]
    shader_parameters = {
        "primary": {},
        "parameters": {},
        "connections": {},
    }
    # NOTE(review): the rest of get_node_parameters is elided in this
    # listing; nothing below this point is visible.

# ... (listing truncated here; next snippet on the page: test_pcp.py)
Source:test_pcp.py  
# -*- Python -*-
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

__version__ = '$Revision: 1.5 $'[11:-2]

from StringIO import StringIO
from twisted.trial import unittest
from twisted.protocols import pcp

# Goal:
# Take a Protocol instance.  Own all outgoing data - anything that
# would go to p.transport.write.  Own all incoming data - anything
# that comes to p.dataReceived.
# I need:
# Something with the AbstractFileDescriptor interface.
# That is:
#  - acts as a Transport
#    - has a method write()
#    - which buffers
#  - acts as a Consumer
#    - has a registerProducer, unRegisterProducer
#    - tells the Producer to back off (pauseProducing) when its buffer is full.
#    - tells the Producer to resumeProducing when its buffer is not so full.
#  - acts as a Producer
#    - calls registerProducer
#    - calls write() on consumers
#    - honors requests to pause/resume producing
#    - honors stopProducing, and passes it along to upstream Producers


class DummyTransport:
    """A dumb transport to wrap around."""

    def __init__(self):
        # every chunk passed to write(), in call order
        self._writes = []

    def write(self, data):
        self._writes.append(data)

    def getvalue(self):
        """Return everything written so far as one string."""
        return ''.join(self._writes)


class DummyProducer:
    """A producer that only records which producer methods were called."""

    resumed = False
    stopped = False
    paused = False

    def __init__(self, consumer):
        self.consumer = consumer

    def resumeProducing(self):
        self.resumed = True
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def stopProducing(self):
        self.stopped = True


class DummyConsumer(DummyTransport):
    """A consumer that only records which consumer methods were called."""

    producer = None
    finished = False
    # Must start out False: tests assert that this flag becomes True once
    # unregisterProducer() has been relayed, which a True default would make
    # impossible to fail.
    unregistered = False

    def registerProducer(self, producer, streaming):
        self.producer = (producer, streaming)

    def unregisterProducer(self):
        self.unregistered = True

    def finish(self):
        self.finished = True


class TransportInterfaceTest(unittest.TestCase):
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.transport = self.proxyClass(self.underlying)

    def testWrite(self):
        # Only checks that write() is accepted without raising.
        self.transport.write("some bytes")


class ConsumerInterfaceTest:
    """Test ProducerConsumerProxy as a Consumer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Consumer convincingly for the ProducingServer.
    """

    def setUp(self):
        self.underlying = DummyConsumer()
        self.consumer = self.proxyClass(self.underlying)
        self.producer = DummyProducer(self.consumer)

    def testRegisterPush(self):
        self.consumer.registerProducer(self.producer, True)
        ## Consumer should NOT have called PushProducer.resumeProducing
        self.failIf(self.producer.resumed)

    ## If I'm just a proxy, should I only do resumeProducing when
    ## I get poked myself?
    #def testRegisterPull(self):
    #    self.consumer.registerProducer(self.producer, False)
    #    ## Consumer SHOULD have called PushProducer.resumeProducing
    #    self.failUnless(self.producer.resumed)

    def testUnregister(self):
        self.consumer.registerProducer(self.producer, False)
        self.consumer.unregisterProducer()
        # Now when the consumer would ordinarily want more data, it
        # shouldn't ask producer for it.
        # The most succinct way to trigger "want more data" is to proxy for
        # a PullProducer and have someone ask me for data.
        self.producer.resumed = False
        self.consumer.resumeProducing()
        self.failIf(self.producer.resumed)

    def testFinish(self):
        self.consumer.registerProducer(self.producer, False)
        self.consumer.finish()
        # I guess finish should behave like unregister?
        self.producer.resumed = False
        self.consumer.resumeProducing()
        self.failIf(self.producer.resumed)


class ProducerInterfaceTest:
    """Test ProducerConsumerProxy as a Producer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Producer convincingly for the ConsumingTransport.
    """

    def setUp(self):
        self.consumer = DummyConsumer()
        self.producer = self.proxyClass(self.consumer)

    def testRegistersProducer(self):
        self.failUnlessEqual(self.consumer.producer[0], self.producer)

    def testPause(self):
        self.producer.pauseProducing()
        self.producer.write("yakkity yak")
        self.failIf(self.consumer.getvalue(),
                    "Paused producer should not have sent data.")

    def testResume(self):
        self.producer.pauseProducing()
        self.producer.resumeProducing()
        self.producer.write("yakkity yak")
        self.failUnlessEqual(self.consumer.getvalue(), "yakkity yak")

    def testResumeNoEmptyWrite(self):
        self.producer.pauseProducing()
        self.producer.resumeProducing()
        self.failUnlessEqual(len(self.consumer._writes), 0,
                             "Resume triggered an empty write.")

    def testResumeBuffer(self):
        self.producer.pauseProducing()
        self.producer.write("buffer this")
        self.producer.resumeProducing()
        self.failUnlessEqual(self.consumer.getvalue(), "buffer this")

    def testStop(self):
        self.producer.stopProducing()
        self.producer.write("yakkity yak")
        self.failIf(self.consumer.getvalue(),
                    "Stopped producer should not have sent data.")


class PCP_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.BasicProducerConsumerProxy


class PCPII_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.ProducerConsumerProxy


class PCP_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.BasicProducerConsumerProxy


class PCPII_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.ProducerConsumerProxy


class ProducerProxyTest(unittest.TestCase):
    """Producer methods on me should be relayed to the Producer I proxy.
    """
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        self.proxy = self.proxyClass(None)
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testStop(self):
        self.proxy.stopProducing()
        self.failUnless(self.parentProducer.stopped)


class ConsumerProxyTest(unittest.TestCase):
    """Consumer methods on me should be relayed to the Consumer I proxy.
    """
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.consumer = self.proxyClass(self.underlying)

    def testWrite(self):
        # NOTE: This test only valid for streaming (Push) systems.
        self.consumer.write("some bytes")
        self.failUnlessEqual(self.underlying.getvalue(), "some bytes")

    def testFinish(self):
        self.consumer.finish()
        self.failUnless(self.underlying.finished)

    def testUnregister(self):
        self.consumer.unregisterProducer()
        self.failUnless(self.underlying.unregistered)


class PullProducerTest:
    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testHoldWrites(self):
        self.proxy.write("hello")
        # Consumer should get no data before it says resumeProducing.
        self.failIf(self.underlying.getvalue(),
                    "Pulling Consumer got data before it pulled.")

    def testPull(self):
        self.proxy.write("hello")
        self.proxy.resumeProducing()
        self.failUnlessEqual(self.underlying.getvalue(), "hello")

    def testMergeWrites(self):
        self.proxy.write("hello ")
        self.proxy.write("sunshine")
        self.proxy.resumeProducing()
        nwrites = len(self.underlying._writes)
        self.failUnlessEqual(nwrites, 1, "Pull resulted in %d writes instead "
                             "of 1." % (nwrites,))
        self.failUnlessEqual(self.underlying.getvalue(), "hello sunshine")

    def testLateWrite(self):
        # consumer sends its initial pull before we have data
        self.proxy.resumeProducing()
        self.proxy.write("data")
        # This data should answer that pull request.
        self.failUnlessEqual(self.underlying.getvalue(), "data")


class PCP_PullProducerTest(PullProducerTest, unittest.TestCase):
    class proxyClass(pcp.BasicProducerConsumerProxy):
        iAmStreaming = False


class PCPII_PullProducerTest(PullProducerTest, unittest.TestCase):
    class proxyClass(pcp.ProducerConsumerProxy):
        iAmStreaming = False


# Buffering!

class BufferedConsumerTest(unittest.TestCase):
    """As a consumer, ask the producer to pause after too much data."""

    proxyClass = pcp.ProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.proxy.bufferSize = 100
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testRegisterPull(self):
        self.proxy.registerProducer(self.parentProducer, False)
        ## Consumer SHOULD have called PushProducer.resumeProducing
        self.failUnless(self.parentProducer.resumed)

    def testPauseIntercept(self):
        self.proxy.pauseProducing()
        self.failIf(self.parentProducer.paused)

    def testResumeIntercept(self):
        self.proxy.pauseProducing()
        self.proxy.resumeProducing()
        # With a streaming producer, just because the proxy was resumed is
        # not necessarily a reason to resume the parent producer.  The state
        # of the buffer should decide that.
        self.failIf(self.parentProducer.resumed)

    def testTriggerPause(self):
        """Make sure I say \"when.\""""

        # Pause the proxy so data sent to it builds up in its buffer.
        self.proxy.pauseProducing()
        self.failIf(self.parentProducer.paused, "don't pause yet")
        self.proxy.write("x" * 51)
        self.failIf(self.parentProducer.paused, "don't pause yet")
        self.proxy.write("x" * 51)
        self.failUnless(self.parentProducer.paused)

    def testTriggerResume(self):
        """Make sure I resumeProducing when my buffer empties."""
        self.proxy.pauseProducing()
        self.proxy.write("x" * 102)
        self.failUnless(self.parentProducer.paused, "should be paused")
        self.proxy.resumeProducing()
        # Resuming should have emptied my buffer, so I should tell my
        # parent to resume too.
        self.failIf(self.parentProducer.paused,
                    "Producer should have resumed.")
        self.failIf(self.proxy.producerPaused)


class BufferedPullTests(unittest.TestCase):
    class proxyClass(pcp.ProducerConsumerProxy):
        iAmStreaming = False

        def _writeSomeData(self, data):
            # Pass on at most 100 characters per call and report how many
            # were taken.
            pcp.ProducerConsumerProxy._writeSomeData(self, data[:100])
            return min(len(data), 100)

    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.proxy.bufferSize = 100
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, False)

    def testResumePull(self):
        # If proxy has no data to send on resumeProducing, it had better pull
        # some from its PullProducer.
        self.parentProducer.resumed = False
        self.proxy.resumeProducing()
        self.failUnless(self.parentProducer.resumed)

    def testLateWriteBuffering(self):
        # consumer sends its initial pull before we have data
        self.proxy.resumeProducing()
        self.proxy.write("datum" * 21)
        # This data should answer that pull request.
        self.failUnlessEqual(self.underlying.getvalue(), "datum" * 20)
        # but there should be some left over
        self.failUnlessEqual(self.proxy._buffer, ["datum"])


# TODO:
#  test that web request finishing bug (when we weren't proxying
#    unregisterProducer but were proxying finish, web file transfers
#    would hang on the last block.)

# ... (listing truncated here; next snippet on the page: globalProcessors.py)
Source:globalProcessors.py  
1# -*- coding: utf-8 -*-2import logging3import Artus.Utility.logger as logger4log = logging.getLogger(__name__)5import re6class globalProccesors(dict):7	def __init__(self, nickname):8		if re.search("(DY.?JetsToLLM(10to50|50|150))", nickname):9			self["Processors"] = [10				"#producer:PrintGenParticleDecayTreeProducer",11				"#filter:RunLumiEventFilter",12				"producer:GenBosonFromGenParticlesProducer",13				"producer:GenBosonDiLeptonDecayModeProducer",14				"producer:ValidGenTausProducer",15				"producer:GenDiLeptonDecayModeProducer"]16			if re.search("(Run2017|Summer17|Fall17)", nickname) == None:17				self["Processors"] += ["producer:LHEParticlesProducer"]18			self["Processors"] += [19				"producer:GenDiLeptonDecayModeProducer",20				"producer:GenParticleProducer",21				"producer:RecoElectronGenParticleMatchingProducer",22				"producer:RecoElectronGenTauMatchingProducer",23				"producer:RecoMuonGenParticleMatchingProducer",24				"producer:RecoMuonGenTauMatchingProducer",25				"producer:RecoTauGenParticleMatchingProducer",26				"producer:RecoTauGenTauMatchingProducer",27				"producer:MatchedLeptonsProducer",28				"producer:GenTauDecayProducer",29				"producer:GenTauCPProducer",30				"producer:TauSpinnerProducer",31				"producer:NicknameProducer",32				"producer:CrossSectionWeightProducer",33				"producer:GeneratorWeightProducer",34				"producer:NumberGeneratedEventsWeightProducer",35				"producer:PUWeightProducer",36				"producer:ScaleVariationProducer",37				"#filter:RunLumiEventFilter",38				"#filter:MetFilter"39			]40		elif re.search("LFV", nickname):41			self["Processors"] = [42				"#producer:PrintGenParticleDecayTreeProducer",43				"#filter:RunLumiEventFilter",44				"producer:GenBosonFromGenParticlesProducer",45				"producer:GenBosonDiLeptonDecayModeProducer",46				"producer:ValidGenTausProducer",47				"producer:GenDiLeptonDecayModeProducer",48				"#producer:LHEParticlesProducer",49				"producer:GenDiLeptonDecayModeLFVProducer",50				
"producer:GenParticleProducer",51				"producer:RecoElectronGenParticleMatchingProducer",52				"producer:RecoElectronGenTauMatchingProducer",53				"producer:RecoMuonGenParticleMatchingProducer",54				"producer:RecoMuonGenTauMatchingProducer",55				"producer:RecoTauGenParticleMatchingProducer",56				"producer:RecoTauGenTauMatchingProducer",57				"producer:MatchedLeptonsProducer",58				"producer:GenTauDecayProducer",59				"producer:GenTauCPProducer",60				"producer:TauSpinnerProducer",61				"producer:NicknameProducer",62				"producer:CrossSectionWeightProducer",63				"producer:GeneratorWeightProducer",64				"producer:NumberGeneratedEventsWeightProducer",65				"producer:PUWeightProducer",66				"#producer:ScaleVariationProducer",67				"#filter:RunLumiEventFilter",68				"#filter:MetFilter"69			]70		elif re.search("Run201",nickname):71			self["Processors"] = [72				"#producer:PrintGenParticleDecayTreeProducer",73				"#filter:RunLumiEventFilter",74				"#filter:MetFilter",75				"filter:JsonFilter",76				"producer:NicknameProducer"77			]78		elif re.search("Embedding201", nickname):79			self["Processors"] = [80				"#filter:RunLumiEventFilter",81				"#producer:PrintGenParticleDecayTreeProducer",82				"#filter:MetFilter",83				"filter:JsonFilter",84				"producer:NicknameProducer",85				"producer:GenParticleProducer",86				"producer:GenBosonFromGenParticlesProducer",87				"producer:GenBosonDiLeptonDecayModeProducer",88				"producer:ValidGenTausProducer",89				"producer:GenDiLeptonDecayModeProducer",90				"producer:GenTauDecayProducer",91				"producer:GenTauCPProducer",92				"producer:RecoElectronGenParticleMatchingProducer",93				"producer:RecoElectronGenTauMatchingProducer",94				"producer:RecoMuonGenParticleMatchingProducer",95				"producer:RecoMuonGenTauMatchingProducer",96				"producer:RecoTauGenParticleMatchingProducer",97				"producer:RecoTauGenTauMatchingProducer",98				"producer:MatchedLeptonsProducer",99				"producer:GeneratorWeightProducer",100				
"producer:TauSpinnerProducer",101			]102		elif re.search("EmbeddingMC", nickname):103			self["Processors"] =[104				"#producer:PrintGenParticleDecayTreeProducer",105				"#filter:RunLumiEventFilter",106				"#filter:MetFilter",107				"#filter:JsonFilter",108				"producer:NicknameProducer",109				"producer:GenParticleProducer",110				"producer:RecoElectronGenParticleMatchingProducer",111				"producer:RecoElectronGenTauMatchingProducer",112				"producer:RecoMuonGenParticleMatchingProducer",113				"producer:RecoMuonGenTauMatchingProducer",114				"producer:RecoTauGenParticleMatchingProducer",115				"producer:RecoTauGenTauMatchingProducer",116				"producer:MatchedLeptonsProducer",117				"producer:CrossSectionWeightProducer",118				"producer:GeneratorWeightProducer",119				"producer:NumberGeneratedEventsWeightProducer"120			]121		elif re.search("(HTo.*TauTau|H2JetsToTauTau|Higgs|JJHiggs)",nickname):122			self["Processors"] = [123				"#producer:PrintGenParticleDecayTreeProducer",124				"#filter:RunLumiEventFilter",125				"producer:GenBosonFromGenParticlesProducer",126				"producer:GenBosonDiLeptonDecayModeProducer",127				"producer:GenBosonProductionProducer",128				"producer:ValidGenTausProducer",129				"producer:GenDiLeptonDecayModeProducer",130				"producer:GenParticleProducer",131				"producer:RecoElectronGenParticleMatchingProducer",132				"producer:RecoMuonGenParticleMatchingProducer",133				"producer:RecoTauGenParticleMatchingProducer",134				"producer:RecoElectronGenTauMatchingProducer",135				"producer:RecoMuonGenTauMatchingProducer",136				"producer:RecoTauGenTauMatchingProducer",137				"producer:MatchedLeptonsProducer",138				"producer:GenTauDecayProducer",139				"producer:GenTauCPProducer"]140			if re.search("(Run2017|Summer17|Fall17|Run2018|Autumn18)", nickname) == None:141				self["Processors"] += ["producer:GenHiggsCPProducer"]      #needs lhe info which is not stored for 2017142			self["Processors"] += [143				"producer:TauSpinnerProducer",144				
"producer:NicknameProducer",145				"producer:CrossSectionWeightProducer",146				"producer:GeneratorWeightProducer",147				"producer:NumberGeneratedEventsWeightProducer",148				"producer:PUWeightProducer",149				"producer:ScaleVariationProducer",150				"#filter:MetFilter"151			]152		else:153			self["Processors"] = [154				"#producer:PrintGenParticleDecayTreeProducer",155				"#filter:RunLumiEventFilter",156				"producer:NicknameProducer",157				"producer:GenParticleProducer",158				"producer:RecoElectronGenParticleMatchingProducer",159				"producer:RecoElectronGenTauMatchingProducer",160				"producer:RecoMuonGenParticleMatchingProducer",161				"producer:RecoMuonGenTauMatchingProducer",162				"producer:RecoTauGenParticleMatchingProducer",163				"producer:RecoTauGenTauMatchingProducer",164				"producer:MatchedLeptonsProducer",165				"producer:CrossSectionWeightProducer",166				"producer:GeneratorWeightProducer",167				"producer:NumberGeneratedEventsWeightProducer",168				"producer:PUWeightProducer",169				"#producer:ScaleVariationProducer",170				"#filter:MetFilter"...producer.ts
Source:producer.ts  
...100  const connect = async (client: BrokerClientType) => {101    if (kafkaProducer) {102      throw new Error('Not closed, connection of producter active. Invoke disconnect previous')103    }104    const producer = kafkaProducer ? kafkaProducer : (client as Kafka).producer();105    if (!kafkaProducer) {106      kafkaProducer = producer;107      await producer.connect();108      producer.on('producer.disconnect', () => {109        kafkaProducer = undefined;110      });111    }112  }113  const disconnect = async () => {114    if (kafkaProducer && brokerOptions.type === 'kafka') {115      kafkaProducer.disconnect();116    }117  }118  return {...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
