Best Python code snippet using tavern
test_16_HierarchicalLogging.py
Source:test_16_HierarchicalLogging.py  
#
# This file is protected by Copyright. Please refer to the COPYRIGHT file
# distributed with this source distribution.
#
# This file is part of REDHAWK core.
#
# REDHAWK core is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# REDHAWK core is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.
#
from _unitTestHelpers import scatest
from ossie.utils import sb, redhawk
import unittest
import contextlib
import time
import os
import traceback
from ossie.cf import CF
import tempfile

# Logger-name suffixes, appended to "<cname>_1" (or "<cname>_2") by the checks
# below.  The tuples are concatenated in the same order the assertions were
# originally written.
_USER_SUFFIXES = ('.user.more_stuff', '.user.some_stuff')
_SYSTEM_SUFFIXES = ('.system.PortSupplier', '.system.PropertySet',
                    '.system.Resource')
_APP_BASE_SUFFIXES = ('', '.lower', '.namespace.lower')
_COMP_BASE_SUFFIXES = ('', '.lower', '.lower.second.first', '.lower.third',
                       '.namespace.lower')


def _read_text(path):
    """Return the whole content of the text file at ``path``, closing it."""
    with open(path, 'r') as fp:
        return fp.read()


def _remove_quietly(remove, path):
    """Call ``remove(path)`` (``os.remove``/``os.rmdir``), ignoring OSError.

    Cleanup is best effort: the target may never have been created.
    """
    try:
        remove(path)
    except OSError:
        pass


def _cwd_file_uri(filename):
    """Return a ``file://`` URI for ``filename`` in the current directory."""
    return 'file://' + os.getcwd() + '/' + filename


def _instantiate_sad_template(waveform):
    """Write ``tmp.sad.xml`` for ``waveform`` from its SAD template.

    Reads ``sdr/dom/waveforms/<waveform>/<waveform>.sad.xml``, replaces every
    ``@@@CWD@@@`` marker with the current working directory and writes the
    result next to it as ``tmp.sad.xml``.
    """
    base = 'sdr/dom/waveforms/' + waveform + '/'
    contents = _read_text(base + waveform + '.sad.xml')
    contents = contents.replace('@@@CWD@@@', os.getcwd())
    with open(base + 'tmp.sad.xml', 'w') as fp:
        fp.write(contents)


def _assert_named(case, loggers, root, suffixes):
    """Assert that ``root + suffix`` is in ``loggers`` for every suffix."""
    for suffix in suffixes:
        case.assertTrue(root + suffix in loggers)


def _assert_levels(case, target, root, suffixes, expected):
    """Assert ``target.getLogLevel(root + suffix) == expected`` per suffix."""
    for suffix in suffixes:
        case.assertEqual(target.getLogLevel(root + suffix), expected)


def _snapshot_levels(target, root, suffixes):
    """Return ``[(logger_name, level), ...]`` as currently reported."""
    return [(root + suffix, target.getLogLevel(root + suffix))
            for suffix in suffixes]


def _assert_levels_restored(case, target, snapshot):
    """Assert every logger in ``snapshot`` reports its recorded level again."""
    for name, level in snapshot:
        case.assertEqual(level, target.getLogLevel(name))


def _launch_component(_obj, config_file, **launch_args):
    """Launch and start ``_obj.cname`` in the sandbox; store it as ``_obj.comp``.

    ``config_file`` (relative to the current directory) is passed as the
    LOGGING_CONFIG_URI property; extra keyword arguments go to ``sb.launch``.
    """
    _obj.comp = sb.launch(
        _obj.cname,
        properties={'LOGGING_CONFIG_URI': _cwd_file_uri(config_file)},
        **launch_args)
    _obj.comp.start()
    return _obj.comp


def killDomain(name):
    """Send signal 2 to every process that looks like domain ``name``.

    Scans the numeric entries of ``/proc`` and signals each process whose
    command line contains both ``DOMAIN_NAME`` and ``name``.  Processes that
    vanish or cannot be read/signalled are skipped (best effort).
    """
    for pid in os.listdir('/proc'):
        if not pid.isdigit():
            continue
        try:
            with open(os.path.join('/proc', pid, 'cmdline'), 'rb') as fp:
                cmdline = fp.read()
        except (IOError, OSError):
            # The process exited in the meantime or is not readable.
            continue
        if not isinstance(cmdline, str):
            # Python 3 yields bytes here; the substring tests below need text
            # (comparing str against bytes raises TypeError, which used to be
            # swallowed and made this function a silent no-op).
            cmdline = cmdline.decode('utf-8', 'replace')
        if 'DOMAIN_NAME' in cmdline and name in cmdline:
            try:
                os.kill(int(pid), 2)  # 2 is SIGINT on Linux
            except OSError:
                pass


@scatest.requireLog4cxx
class CppHierarchicalDomainLogging(scatest.CorbaTestCase):
    """Logger access through applications created in a launched domain."""

    def setUp(self):
        domBooter, self._domMgr = self.launchDomainManager()
        devBooter, self._devMgr = self.launchDeviceManager(
            "/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
        self._rhDom = redhawk.attach(scatest.getTestDomainName())
        self.assertEqual(len(self._rhDom._get_applications()), 0)

    def tearDown(self):
        # Do all application shutdown before calling the base class tearDown,
        # or failures will probably occur.
        redhawk.core._cleanUpLaunchedApps()
        scatest.CorbaTestCase.tearDown(self)
        # Remove the SAD files generated by the template-based tests, if any.
        _remove_quietly(os.remove,
                        'sdr/dom/waveforms/logger_overload_w/tmp.sad.xml')
        _remove_quietly(os.remove,
                        'sdr/dom/waveforms/logger_config/tmp.sad.xml')
        # need to let event service clean up event channels
        # cycle period is 10 milliseconds
        time.sleep(0.1)
        redhawk.setTrackApps(False)

    # NOTE: test methods are documented with comments rather than docstrings
    # so unittest keeps reporting them by method name.

    def test_logconfiguri_application(self):
        self.cname = "logger"
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        app_1 = self._rhDom.createApplication(
            "/waveforms/logger_w/logger_w.sad.xml",
            initConfiguration={
                'LOGGING_CONFIG_URI': _cwd_file_uri('high_thresh.cfg')})
        self.assertEqual(app_1.getLogLevel('logger_1'), 30000)
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        app_1.getNamedLoggers()
        app_2 = self._rhDom.createApplication(
            "/waveforms/logger_w/logger_w.sad.xml")
        app_2.getNamedLoggers()
        # The first application's levels are re-checked after the second one
        # (created without a logging URI) exists.
        self.assertEqual(app_1.getLogLevel('logger_1'), 30000)
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        self.assertEqual(app_2.getLogLevel('logger_1'), 40000)
        self.assertEqual(app_2.getLogLevel('logger_2'), 40000)

    def test_logconfiguri_overload(self):
        self.cname = "logger"
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        _instantiate_sad_template('logger_overload_w')
        app_1 = self._rhDom.createApplication(
            "/waveforms/logger_overload_w/tmp.sad.xml")
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        app_1.getNamedLoggers()
        app_2 = self._rhDom.createApplication(
            "/waveforms/logger_w/logger_w.sad.xml")
        app_2.getNamedLoggers()
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        self.assertEqual(app_2.getLogLevel('logger_1'), 40000)

    def test_loggingconfig(self):
        self.cname = "logger"
        runtest_props = _read_text('./runtest.props')
        high_thresh_cfg = _read_text('./high_thresh.cfg')
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        _instantiate_sad_template('logger_config')
        app_1 = self._rhDom.createApplication(
            "/waveforms/logger_config/tmp.sad.xml")
        self.assertEqual(app_1.getLogLevel('logger_1'), 0)
        self.assertEqual(app_1.getLogLevel('logger_2'), 50000)
        # Locate logger_1 among the components; logger_2 is taken to be the
        # other of the first two slots (only indices 0 and 1 are handled).
        logger_1 = -1
        logger_2 = -1
        for comp_idx, comp in enumerate(app_1.comps):
            if comp.instanceName == 'logger_1':
                logger_1 = comp_idx
                break
        if logger_1 == 0:
            logger_2 = 1
        elif logger_1 == 1:
            logger_2 = 0
        self.assertNotEqual(logger_1, -1)
        self.assertNotEqual(logger_2, -1)
        self.assertEqual(app_1.comps[logger_1].getLogConfig(), high_thresh_cfg)
        self.assertEqual(app_1.comps[logger_2].getLogConfig(), runtest_props)
        app_1.getNamedLoggers()
        app_2 = self._rhDom.createApplication(
            "/waveforms/logger_config/tmp.sad.xml",
            initConfiguration={
                'LOGGING_CONFIG_URI': _cwd_file_uri('high_thresh.cfg')})
        app_2.getNamedLoggers()
        self.assertEqual(app_2.getLogLevel('logger_1'), 30000)
        self.assertEqual(app_2.getLogLevel('logger_2'), 30000)
        # NOTE(review): app_2 is indexed with the positions found in app_1;
        # this assumes both applications order their components alike.
        self.assertEqual(app_2.comps[logger_1].getLogConfig(), high_thresh_cfg)
        self.assertEqual(app_2.comps[logger_2].getLogConfig(), high_thresh_cfg)
        app_1.start()
        time.sleep(1.5)

    def test_application_cpp_access(self):
        self.cname = "logger"
        self.applicationAccess("/waveforms/logger_w/logger_w.sad.xml")

    def test_application_py_access(self):
        self.cname = "logger_py"
        self.applicationAccess("/waveforms/logger_py_w/logger_py_w.sad.xml")

    @scatest.requireJava
    def test_application_java_access(self):
        self.cname = "logger_java"
        self.applicationAccess("/waveforms/logger_java_w/logger_java_w.sad.xml")

    def applicationAccess(self, sadfile):
        """Exercise get/set/reset of named logger levels via an application.

        Creates the application described by ``sadfile`` and, for the
        ``<self.cname>_1`` logger tree, checks level inheritance, breaking
        inheritance on the ``.user`` branch, setting a level by value, and
        ``resetLog()``.  ``self.cname`` must be set by the caller.
        """
        root = self.cname + '_1'
        tracked = _APP_BASE_SUFFIXES + _USER_SUFFIXES
        named = _APP_BASE_SUFFIXES + _SYSTEM_SUFFIXES + _USER_SUFFIXES
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        app = self._rhDom.createApplication(sadfile)
        loggers = app.getNamedLoggers()
        orig_levels = _snapshot_levels(app, root, tracked)
        _assert_named(self, loggers, root, named)
        _assert_named(self, loggers, self.cname + '_2', named)
        self.assertRaises(CF.UnknownIdentifier, app.setLogLevel,
                          root + '.foo', 'all')
        self.assertRaises(CF.UnknownIdentifier, app.getLogLevel, root + '.foo')
        app.setLogLevel(root, 'all')
        _assert_levels(self, app, root, tracked, CF.LogLevels.ALL)
        app.setLogLevel(root, 'off')
        _assert_levels(self, app, root, tracked, CF.LogLevels.OFF)
        # break the level inheritance
        app.setLogLevel(root + '.user', 'trace')
        app.setLogLevel(root, 'all')
        _assert_levels(self, app, root, _APP_BASE_SUFFIXES, CF.LogLevels.ALL)
        _assert_levels(self, app, root, _USER_SUFFIXES, CF.LogLevels.TRACE)
        # set the log with a value rather than the string
        app.setLogLevel(root, CF.LogLevels.DEBUG)
        _assert_levels(self, app, root, _APP_BASE_SUFFIXES, CF.LogLevels.DEBUG)
        _assert_levels(self, app, root, _USER_SUFFIXES, CF.LogLevels.TRACE)
        app.resetLog()
        _assert_levels_restored(self, app, orig_levels)
        # verify that inheritance is re-established
        app.setLogLevel(root, 'all')
        _assert_levels(self, app, root, tracked, CF.LogLevels.ALL)


@scatest.requireLog4cxx
class ApplicationDomainLogging(scatest.CorbaTestCase):
    """Checks logger output captured from a domain started via kickDomain."""

    def setUp(self):
        # NOTE(review): tempfile.mktemp() is deprecated (the name can be taken
        # by another process before it is used).  It is kept because how
        # kickDomain consumes ``stdout=`` is not visible from this file.
        self.tmpfile = tempfile.mktemp()
        devmgrs = ['test_ExecutableDevice_node']
        self._rhDom = redhawk.kickDomain(domain_name=scatest.getTestDomainName(),
                                         kick_device_managers=True,
                                         device_managers=devmgrs,
                                         stdout=self.tmpfile,
                                         detached=False)
        try:
            self.waitForDeviceManager(devmgrs[0])
        except Exception:
            # Report and carry on; the assertion below still runs.
            traceback.print_exc()
        self.assertEqual(len(self._rhDom._get_applications()), 0)

    def tearDown(self):
        # Do all application shutdown before calling the base class tearDown,
        # or failures will probably occur.
        redhawk.core._cleanUpLaunchedApps()
        scatest.CorbaTestCase.tearDown(self)
        _remove_quietly(os.remove, self.tmpfile)
        try:
            self._rhDom.terminate()
        except Exception:
            # Best effort; killDomain() below signals whatever is left.
            pass
        killDomain(scatest.getTestDomainName())
        # need to let event service clean up event channels
        # cycle period is 10 milliseconds
        time.sleep(0.1)

    def _query_at_level(self, level):
        """Set both manager loggers to ``level``, query, return captured output.

        The query results are discarded; presumably the queries are what make
        the PropertySet loggers emit the text the callers look for.
        """
        self._rhDom.setLogLevel('DomainManager', level)
        self._rhDom.devMgrs[0].setLogLevel('DeviceManager', level)
        self._rhDom.query([])
        self._rhDom.devMgrs[0].query([])
        return _read_text(self.tmpfile)

    def test_domain_hierarchy_all(self):
        output = self._query_at_level('all')
        self.assertTrue('DomainManager.PropertySet' in output)
        self.assertTrue('DeviceManager.PropertySet' in output)

    def test_domain_hierarchy(self):
        output = self._query_at_level('info')
        self.assertFalse('DomainManager.PropertySet' in output)
        self.assertFalse('DeviceManager.PropertySet' in output)

    def application_default_log(self, appname, compname):
        """Check that ``<compname>_1.user.more_stuff`` shows up in the output.

        Creates and starts application ``appname`` with that logger set to
        'all', then polls the captured output file for up to 1.5 seconds.
        """
        marker = compname + '_1.user.more_stuff'
        app = self._rhDom.createApplication(appname)
        app.setLogLevel(marker, 'all')
        app.start()
        begin_time = time.time()
        output = ''
        while time.time() - begin_time < 1.5:
            output = _read_text(self.tmpfile)
            if marker in output:
                break
            time.sleep(0.1)
        app.stop()
        self.assertTrue(marker in output)

    def test_application_default_log_cpp(self):
        self.application_default_log("/waveforms/logger_w/logger_w.sad.xml", 'logger')

    def test_application_default_log_py(self):
        self.application_default_log("/waveforms/logger_py_w/logger_py_w.sad.xml", 'logger_py')

    @scatest.requireJava
    def test_application_default_log_java(self):
        self.application_default_log("/waveforms/logger_java_w/logger_java_w.sad.xml", 'logger_java')


def all_log_levels(_obj):
    """Check named loggers, level inheritance and log content of a component.

    ``_obj`` is the running test case; it must provide ``cname``,
    ``readLogFile`` and the unittest assertion methods.  The launched
    component is stored as ``_obj.comp``.
    """
    comp = _launch_component(_obj, 'high_thresh.cfg')
    root = _obj.cname + '_1'
    tracked = _COMP_BASE_SUFFIXES + _USER_SUFFIXES
    # make sure that all the named loggers appear
    loggers = comp.getNamedLoggers()
    _assert_named(_obj, loggers, root, tracked + _SYSTEM_SUFFIXES)
    for logger in loggers:
        _obj.assertTrue(root in logger)
    # verify that the logger level is inherited
    comp.setLogLevel(root, 'all')
    time.sleep(0.5)
    _assert_levels(_obj, comp, root, tracked, CF.LogLevels.ALL)
    comp.setLogLevel(root, 'off')
    _assert_levels(_obj, comp, root, _APP_BASE_SUFFIXES + _USER_SUFFIXES,
                   CF.LogLevels.OFF)
    # make sure that the log content is correct
    content = _obj.readLogFile('foo/bar/test.log')
    positions = [content.find(message) for message in (
        'message from _log',
        'message from baseline_1_logger',
        'message from baseline_2_logger',
        'message from namespaced_logger',
        'message from basetree_logger')]
    # Each message must first occur before the next one in the list.
    for earlier, later in zip(positions, positions[1:]):
        _obj.assertTrue(earlier < later)
    # verify that the loggers are off
    time.sleep(0.5)
    content_again = _obj.readLogFile('foo/bar/test.log')
    _obj.assertEqual(len(content), len(content_again))
    # break the level inheritance
    comp.setLogLevel(root + '.user', 'trace')
    comp.setLogLevel(root, 'all')
    _assert_levels(_obj, comp, root, _COMP_BASE_SUFFIXES, CF.LogLevels.ALL)
    _assert_levels(_obj, comp, root, _USER_SUFFIXES, CF.LogLevels.TRACE)
    # set the log with a value rather than the string
    comp.setLogLevel(root, CF.LogLevels.DEBUG)
    _assert_levels(_obj, comp, root, _COMP_BASE_SUFFIXES, CF.LogLevels.DEBUG)
    _assert_levels(_obj, comp, root, _USER_SUFFIXES, CF.LogLevels.TRACE)
    comp.stop()


def reset_logger(_obj):
    """Check that ``resetLog()`` restores levels and level inheritance.

    ``_obj`` is the running test case; it must provide ``cname`` and the
    unittest assertion methods.  The launched component is stored as
    ``_obj.comp``.
    """
    comp = _launch_component(_obj, 'high_thresh.cfg')
    root = _obj.cname + '_1'
    tracked = _COMP_BASE_SUFFIXES + _USER_SUFFIXES
    # make sure that all the named loggers appear
    loggers = comp.getNamedLoggers()
    _assert_named(_obj, loggers, root, tracked)
    orig_levels = _snapshot_levels(comp, root, tracked)
    # verify that the logger level is inherited
    comp.setLogLevel(root, 'all')
    time.sleep(0.5)
    _assert_levels(_obj, comp, root, tracked, CF.LogLevels.ALL)
    comp.setLogLevel(root, 'off')
    _assert_levels(_obj, comp, root, tracked, CF.LogLevels.OFF)
    # break the level inheritance
    comp.setLogLevel(root + '.user', 'trace')
    comp.setLogLevel(root, 'all')
    _assert_levels(_obj, comp, root, _COMP_BASE_SUFFIXES, CF.LogLevels.ALL)
    _assert_levels(_obj, comp, root, _USER_SUFFIXES, CF.LogLevels.TRACE)
    # verify that the levels are reset back to their original level
    comp.resetLog()
    _assert_levels_restored(_obj, comp, orig_levels)
    # verify that inheritance is re-established
    comp.setLogLevel(root, 'all')
    _assert_levels(_obj, comp, root, tracked, CF.LogLevels.ALL)
    comp.stop()


def single_log_level(_obj):
    """Toggle two sibling loggers one at a time and check the log order.

    ``_obj`` is the running test case; it must provide ``cname``,
    ``readLogFile`` and the unittest assertion methods.
    """
    comp = _launch_component(_obj, 'high_thresh.cfg')
    more_stuff = _obj.cname + '_1.user.more_stuff'
    some_stuff = _obj.cname + '_1.user.some_stuff'
    comp.getNamedLoggers()
    # An unknown level name must be rejected.
    _obj.assertRaises(Exception, comp.setLogLevel, more_stuff, 'hello')
    # Enable more_stuff, then some_stuff, then more_stuff again, each for
    # half a second, switching the logger off afterwards.
    for name in (more_stuff, some_stuff, more_stuff):
        comp.setLogLevel(name, 'all')
        _obj.assertEqual(comp.getLogLevel(name), CF.LogLevels.ALL)
        time.sleep(0.5)
        comp.setLogLevel(name, 'off')
    content = _obj.readLogFile('foo/bar/test.log')
    find_1 = content.find('message from baseline_1_logger')
    find_2 = content.find('message from baseline_2_logger')
    _obj.assertTrue(find_2 < find_1)
    # baseline_2 must show up again after the first baseline_1 message.
    find_3 = content.find('message from baseline_2_logger', find_1)
    _obj.assertTrue(find_1 < find_3)
    comp.stop()


def selective_log_setting(_obj):
    """Check per-logger levels and log routing with hierarchical_log.cfg.

    ``_obj`` is the running test case; it must provide ``cname``,
    ``readLogFile`` and the unittest assertion methods.  The component is
    launched with the fixed instance name ``logger_1``.
    """
    comp = _launch_component(_obj, 'hierarchical_log.cfg',
                             instanceName='logger_1')
    comp.getNamedLoggers()
    comp.setLogLevel('logger_1.user.more_stuff', 'all')
    _obj.assertEqual(comp.getLogLevel('logger_1.user.more_stuff'), CF.LogLevels.ALL)
    _obj.assertEqual(comp.getLogLevel('logger_1.user.some_stuff'), CF.LogLevels.WARN)
    _obj.assertEqual(comp.getLogLevel('logger_1.namespace.lower'), CF.LogLevels.TRACE)
    time.sleep(0.5)
    comp.stop()
    test_content = _obj.readLogFile('foo/bar/test.log')
    logger_test_content = _obj.readLogFile('logger_test.log')
    # Every line of test.log must be a baseline_2 message, apart from the
    # log4cxx and EventChannelManager lines that are discounted here.
    other_lines = (
        test_content.count('this is the log4cxx logger')
        + test_content.count('Unable to resolve EventChannelManager'))
    _obj.assertEqual(test_content.count('message from baseline_2_logger'),
                     test_content.count('\n') - other_lines)
    # Every line of logger_test.log must be a namespaced_logger message.
    _obj.assertEqual(
        logger_test_content.count('message from namespaced_logger'),
        logger_test_content.count('\n'))


class _SandboxLogTestMixin(object):
    """Shared ``readLogFile``/``tearDown`` for the sandbox logging tests.

    Must be listed before ``scatest.CorbaTestCase`` in the bases so that this
    ``tearDown`` is the one unittest calls.
    """

    def readLogFile(self, filename):
        """Return the full text of ``filename``."""
        return _read_text(filename)

    def tearDown(self):
        sb.release()
        # Remove the log files and directories the tests read from, if
        # present.
        _remove_quietly(os.remove, 'foo/bar/test.log')
        _remove_quietly(os.rmdir, 'foo/bar')
        _remove_quietly(os.rmdir, 'foo')
        _remove_quietly(os.remove, 'logger_test.log')
        # Do all application shutdown before calling the base class tearDown,
        # or failures will probably occur.
        scatest.CorbaTestCase.tearDown(self)


class PyHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    def setUp(self):
        self.cname = "logger_py"

    def test_py_all_log_levels(self):
        all_log_levels(self)

    def test_py_reset_logger(self):
        reset_logger(self)

    def test_py_single_log_level(self):
        single_log_level(self)

    def test_py_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireJava
class JavaHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    def setUp(self):
        self.cname = "logger_java"

    def test_java_all_log_levels(self):
        all_log_levels(self)

    def test_java_reset_logger(self):
        reset_logger(self)

    def test_java_single_log_level(self):
        single_log_level(self)

    def test_java_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireLog4cxx
class CppHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    def setUp(self):
        self.cname = "logger"

    def test_access_log4cxx(self):
        logger_name = self.cname + '_1.l4.access'
        _launch_component(self, 'high_thresh.cfg')
        loggers = self.comp.getNamedLoggers()
        self.assertTrue(logger_name in loggers)
        self.comp.setLogLevel(logger_name, 'info')
        time.sleep(0.5)
        self.comp.stop()
        content = self.readLogFile('foo/bar/test.log')
        self.assertNotEqual(content.find('this is the log4cxx logger'), -1)

    def test_all_log_levels(self):
        all_log_levels(self)

    def test_reset_logger(self):
        reset_logger(self)

    def test_single_log_level(self):
        single_log_level(self)

    def test_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireLog4cxx
class CppDeviceHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    def setUp(self):
        self.cname = "log_test_cpp"

    def test_cpp_dev_all_log_levels(self):
        all_log_levels(self)

    def test_cpp_dev_reset_logger(self):
        reset_logger(self)

    def test_cpp_dev_single_log_level(self):
        single_log_level(self)

    def test_cpp_dev_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireJava
class JavaDeviceHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    def setUp(self):
        self.cname = "log_test_java"

    def test_java_dev_all_log_levels(self):
        all_log_levels(self)

    def test_java_dev_reset_logger(self):
        reset_logger(self)

    def test_java_dev_single_log_level(self):
        single_log_level(self)

    def test_java_dev_selective_log_setting(self):
        selective_log_setting(self)


# NOTE(review): the rest of this class lies beyond the visible part of the
# file; it is reproduced exactly as found and intentionally left unchanged.
class PyDeviceHierarchicalLogging(scatest.CorbaTestCase):
    def setUp(self):
        self.cname = "log_test_py"
    def readLogFile(self, filename):
        fp 
= open(filename,'r')679        stuff = fp.read()680        return stuff681    def tearDown(self):682        sb.release()683        try:684            os.remove('foo/bar/test.log')685        except:686            pass687        try:688            os.rmdir('foo/bar')689        except:690            pass691        try:692            os.rmdir('foo')693        except:694            pass695        try:696            os.remove('logger_test.log')697        except:698            pass699        # Try to clean up the event channel, if it was created700        context = None701        # Do all application shutdown before calling the base class tearDown,702        # or failures will probably occur.703        scatest.CorbaTestCase.tearDown(self)704    def test_py_dev_all_log_levels(self):705        all_log_levels(self)706    def test_py_dev_reset_logger(self):707        reset_logger(self)708    def test_py_dev_single_log_level(self):709        single_log_level(self)710    def test_py_dev_selective_log_setting(self):711        selective_log_setting(self)712@scatest.requireLog4cxx713class CppDeviceHierarchicalDomainLogging(scatest.CorbaTestCase):714    def setUp(self):715        domBooter, self._domMgr = self.launchDomainManager()716    def tearDown(self):717        scatest.CorbaTestCase.tearDown(self)718        # need to let event service clean up event channels719        # cycle period is 10 milliseconds720        time.sleep(0.1)721        try:722            os.remove('sdr/dev/nodes/log_test_cpp_override_node/tmp.dcd.xml')723        except:724            pass725    def test_domMgr_log_level(self):726        self._rhDom = redhawk.attach(scatest.getTestDomainName())727        current_level = self._rhDom._get_log_level()728        self.assertNotEqual(current_level, 5000)729        self.assertEquals(self._rhDom.getLogLevel('DomainManager'),current_level)730        self.assertEquals(self._rhDom.getLogLevel('DomainManager.AllocationManager'),current_level)731        
self.assertEquals(self._rhDom.getLogLevel('DomainManager.ConnectionManager'),current_level)732        self.assertEquals(self._rhDom.getLogLevel('DomainManager.File'),current_level)733        self.assertEquals(self._rhDom.getLogLevel('DomainManager.FileManager'),current_level)734        self.assertEquals(self._rhDom.getLogLevel('DomainManager.proputils'),current_level)735        self.assertEquals(self._rhDom.getLogLevel('DomainManager.EventChannelManager'),current_level)736        self._rhDom._set_log_level(5000)737        current_level = self._rhDom._get_log_level()738        self.assertEquals(self._rhDom.getLogLevel('DomainManager'),current_level)739        self.assertEquals(self._rhDom.getLogLevel('DomainManager.AllocationManager'),current_level)740        self.assertEquals(self._rhDom.getLogLevel('DomainManager.ConnectionManager'),current_level)741        self.assertEquals(self._rhDom.getLogLevel('DomainManager.File'),current_level)742        self.assertEquals(self._rhDom.getLogLevel('DomainManager.FileManager'),current_level)743        self.assertEquals(self._rhDom.getLogLevel('DomainManager.proputils'),current_level)744        self.assertEquals(self._rhDom.getLogLevel('DomainManager.EventChannelManager'),current_level)745    def test_devMgr_cpp_access(self):746        self.cname = "log_test_cpp"747        self.devMgrAccess("/nodes/log_test_cpp_node/DeviceManager.dcd.xml")748    def test_devMgr_py_access(self):749        self.cname = "log_test_py"750        self.devMgrAccess("/nodes/log_test_py_node/DeviceManager.dcd.xml")751    @scatest.requireJava752    def test_devMgr_java_access(self):753        self.cname = "log_test_java"754        self.devMgrAccess("/nodes/log_test_java_node/DeviceManager.dcd.xml")755    def devMgrAccess(self, dcdfile):756        devBooter, self._devMgr = self.launchDeviceManager(dcdfile)757        self._rhDom = redhawk.attach(scatest.getTestDomainName())758        self.assertEquals(len(self._rhDom.devMgrs), 1)759        # Create 
Application from $SDRROOT path760        devMgr = self._rhDom.devMgrs[0]761        loggers = devMgr.getNamedLoggers()762        orig_loggers = {}763        orig_loggers[self.cname+'_1'] = devMgr.getLogLevel(self.cname+'_1')764        orig_loggers[self.cname+'_1.lower'] = devMgr.getLogLevel(self.cname+'_1.lower')765        orig_loggers[self.cname+'_1.namespace.lower'] = devMgr.getLogLevel(self.cname+'_1.namespace.lower')766        orig_loggers[self.cname+'_1.user.more_stuff'] = devMgr.getLogLevel(self.cname+'_1.user.more_stuff')767        orig_loggers[self.cname+'_1.user.some_stuff'] = devMgr.getLogLevel(self.cname+'_1.user.some_stuff')768        self.assertTrue(self.cname+'_1' in loggers)769        self.assertTrue(self.cname+'_1.lower' in loggers)770        self.assertTrue(self.cname+'_1.namespace.lower' in loggers)771        self.assertTrue(self.cname+'_1.system.PortSupplier' in loggers)772        self.assertTrue(self.cname+'_1.system.PropertySet' in loggers)773        self.assertTrue(self.cname+'_1.system.Resource' in loggers)774        self.assertTrue(self.cname+'_1.user.more_stuff' in loggers)775        self.assertTrue(self.cname+'_1.user.some_stuff' in loggers)776        self.assertTrue(self.cname+'_2' in loggers)777        self.assertTrue(self.cname+'_2.lower' in loggers)778        self.assertTrue(self.cname+'_2.namespace.lower' in loggers)779        self.assertTrue(self.cname+'_2.system.PortSupplier' in loggers)780        self.assertTrue(self.cname+'_2.system.PropertySet' in loggers)781        self.assertTrue(self.cname+'_2.system.Resource' in loggers)782        self.assertTrue(self.cname+'_2.user.more_stuff' in loggers)783        self.assertTrue(self.cname+'_2.user.some_stuff' in loggers)784        self.assertRaises(CF.UnknownIdentifier, devMgr.setLogLevel, self.cname+'_1.foo', 'all')785        self.assertRaises(CF.UnknownIdentifier, devMgr.getLogLevel, self.cname+'_1.foo')786        devMgr.setLogLevel(self.cname+'_1', 'all')787        
self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.ALL)788        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.ALL)789        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), CF.LogLevels.ALL)790        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.ALL)791        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.ALL)792        devMgr.setLogLevel(self.cname+'_1', 'off')793        self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.OFF)794        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.OFF)795        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), CF.LogLevels.OFF)796        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.OFF)797        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.OFF)798        # break the level inheritance799        devMgr.setLogLevel(self.cname+'_1.user', 'trace')800        devMgr.setLogLevel(self.cname+'_1', 'all')801        self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.ALL)802        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.ALL)803        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), CF.LogLevels.ALL)804        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.TRACE)805        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.TRACE)806        # set the log with a value rather than the string807        devMgr.setLogLevel(self.cname+'_1', CF.LogLevels.DEBUG)808        self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.DEBUG)809        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.DEBUG)810        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), 
CF.LogLevels.DEBUG)811        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.TRACE)812        self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.TRACE)813        devMgr.resetLog()814        self.assertEquals(orig_loggers[self.cname+'_1'], devMgr.getLogLevel(self.cname+'_1'))815        self.assertEquals(orig_loggers[self.cname+'_1.lower'], devMgr.getLogLevel(self.cname+'_1.lower'))816        self.assertEquals(orig_loggers[self.cname+'_1.namespace.lower'], devMgr.getLogLevel(self.cname+'_1.namespace.lower'))817        self.assertEquals(orig_loggers[self.cname+'_1.user.more_stuff'], devMgr.getLogLevel(self.cname+'_1.user.more_stuff'))818        self.assertEquals(orig_loggers[self.cname+'_1.user.some_stuff'], devMgr.getLogLevel(self.cname+'_1.user.some_stuff'))819        # verify that inheritance is re-established820        devMgr.setLogLevel(self.cname+'_1', 'all')821        self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1'))822        self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.lower'))823        self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.namespace.lower'))824        self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.user.more_stuff'))825        self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.user.some_stuff'))826    def test_devMgr_overload(self):827        fp = open('sdr/dev/nodes/log_test_cpp_override_node/DeviceManager.dcd.xml','r')828        dcd_contents = fp.read()829        fp.close()830        dcd_contents = dcd_contents.replace('@@@CWD@@@', os.getcwd())831        fp = open('sdr/dev/nodes/log_test_cpp_override_node/tmp.dcd.xml','w')832        fp.write(dcd_contents)833        fp.close()834        devBooter, self._devMgr = self.launchDeviceManager('/nodes/log_test_cpp_override_node/tmp.dcd.xml')835        devBooter_2, self._devMgr_2 = 
self.launchDeviceManager('/nodes/log_test_cpp_node/DeviceManager.dcd.xml')836        self._rhDom = redhawk.attach(scatest.getTestDomainName())837        self.assertEquals(len(self._rhDom.devMgrs), 2)838        # Create Application from $SDRROOT path839        _devMgr_o = None840        _devMgr = None841        for _d in self._rhDom.devMgrs:842            if _d.name == 'log_test_cpp_override_node':843                _devMgr_o = _d844            if _d.name == 'log_test_cpp_node':845                _devMgr = _d846        self.assertNotEqual(_devMgr, None)847        self.assertNotEqual(_devMgr_o, None)848        self.assertEquals(_devMgr_o.getLogLevel('log_test_cpp_1'), 40000)849        self.assertEquals(_devMgr_o.getLogLevel('log_test_cpp_2'), 30000)850        self.assertEquals(_devMgr.getLogLevel('log_test_cpp_1'), 40000)851        self.assertEquals(_devMgr.getLogLevel('log_test_cpp_2'), 40000)852    def test_devMgr_level_trace(self):853        devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=5)854        self._rhDom = redhawk.attach(scatest.getTestDomainName())855        self.assertEquals(len(self._rhDom.devMgrs), 1)856        self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.ALL) # CF ALL and CF TRACE overlap for compatibility reasons857        self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.ALL)858    def test_devMgr_level_debug(self):859        devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=4)860        self._rhDom = redhawk.attach(scatest.getTestDomainName())861        self.assertEquals(len(self._rhDom.devMgrs), 1)862        self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.DEBUG)863        self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.DEBUG)864    def test_devMgr_level_info(self):865        devBooter, self._devMgr = 
self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=3)866        self._rhDom = redhawk.attach(scatest.getTestDomainName())867        self.assertEquals(len(self._rhDom.devMgrs), 1)868        self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.INFO)869        self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.INFO)870    def test_devMgr_level_warn(self):871        devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=2)872        self._rhDom = redhawk.attach(scatest.getTestDomainName())873        self.assertEquals(len(self._rhDom.devMgrs), 1)874        self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.WARN)875        self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.WARN)876    def test_devMgr_level_error(self):877        devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=1)878        self._rhDom = redhawk.attach(scatest.getTestDomainName())879        self.assertEquals(len(self._rhDom.devMgrs), 1)880        self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.ERROR)881        self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.ERROR)882    def test_devMgr_level_fatal(self):883        devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=0)884        self._rhDom = redhawk.attach(scatest.getTestDomainName())885        self.assertEquals(len(self._rhDom.devMgrs), 1)886        self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.FATAL)887        self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.FATAL)888if __name__ == "__main__":889  # Run the unittests...dlm.py
Source:dlm.py  
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import zlib, tempfile, io
from ._binarystream import _BinaryStream
from collections import OrderedDict


class InvalidDLMFile(Exception):
    """Raised when a DLM map cannot be decoded or encoded."""

    def __init__(self, message):
        super(InvalidDLMFile, self).__init__(message)
        self.message = message


class DLM:
    """Reader/writer for a zlib-compressed DLM map stored in ``stream``.

    ``stream`` is a binary file-like object; ``key`` is the string whose
    characters are XOR-ed against the map payload when it is encrypted.
    """

    def __init__(self, stream, key=None):
        if key is None:
            raise InvalidDLMFile("Map decryption key is empty.")
        self._stream = stream
        self._key = key

    def read(self):
        """Inflate the stream, parse it and return the map as an OrderedDict."""
        # The context manager closes the temp file even when parsing fails.
        with tempfile.TemporaryFile() as dlm_uncompressed:
            dlm_uncompressed.write(zlib.decompress(self._stream.read()))
            dlm_uncompressed.seek(0)
            dlm_map = Map(_BinaryStream(dlm_uncompressed, True), self._key)
            dlm_map.read()
            return dlm_map.getObj()

    def write(self, obj):
        """Serialize ``obj`` (as returned by ``read``) and write it deflated.

        Note: ``Map.setObj`` replaces the nested dicts of ``obj`` with
        wrapper objects in place, so ``obj`` is modified by this call.
        """
        with tempfile.TemporaryFile() as buffer:
            dlm_map = Map(_BinaryStream(buffer, True), self._key)
            dlm_map.setObj(obj)
            dlm_map.write()
            buffer.seek(0)
            self._stream.write(zlib.compress(buffer.read()))


class Map:
    """Top-level DLM structure: header, fixtures, layers and cell data."""

    def __init__(self, raw, key):
        self._raw = raw
        self._key = key
        self._obj = OrderedDict()
        # Cell ids collected by CellData.read() for cells carrying arrow flags.
        self.topArrowCell = []
        self.bottomArrowCell = []
        self.leftArrowCell = []
        self.rightArrowCell = []

    def raw(self):
        """Return the stream currently used for reading/writing.

        Children call this on every access because read() and write()
        swap the underlying stream part-way through.
        """
        return self._raw

    def _xor_with_key(self, data):
        """Return ``data`` XOR-ed byte by byte with the repeating key."""
        key = self._key
        key_len = len(key)
        return bytes(byte ^ ord(key[i % key_len]) for i, byte in enumerate(data))

    def read(self):
        """Parse the whole map from the current stream into ``self._obj``."""
        self._obj["header"] = self.raw().read_char()
        self._obj["mapVersion"] = self.raw().read_char()
        self._obj["mapId"] = self.raw().read_uint32()
        if self._obj["mapVersion"] >= 7:
            self._obj["encrypted"] = self.raw().read_bool()
            self._obj["encryptionVersion"] = self.raw().read_char()
            self.dataLen = self.raw().read_int32()
            if self._obj["encrypted"]:
                # Decrypt the payload and continue parsing from the clear copy.
                self.encryptedData = self.raw().read_bytes(self.dataLen)
                cleanData = io.BytesIO(self._xor_with_key(self.encryptedData))
                self._raw = _BinaryStream(cleanData, True)
        self._obj["relativeId"] = self.raw().read_uint32()
        self._obj["mapType"] = self.raw().read_char()
        self._obj["subareaId"] = self.raw().read_int32()
        self._obj["topNeighbourId"] = self.raw().read_int32()
        self._obj["bottomNeighbourId"] = self.raw().read_int32()
        self._obj["leftNeighbourId"] = self.raw().read_int32()
        self._obj["rightNeighbourId"] = self.raw().read_int32()
        self._obj["shadowBonusOnEntities"] = self.raw().read_uint32()
        if self._obj["mapVersion"] >= 9:
            # NOTE(review): a 0xFF000000 mask shifted right by 32 is always 0
            # in Python, and the matching "<< 32" pushes the value outside
            # 32 bits; the alpha byte sits at bit 24. Behaviour is kept as-is
            # until the intended shift is confirmed.
            read_color = self.raw().read_int32()
            self._obj["backgroundAlpha"] = (read_color & 4278190080) >> 32
            self._obj["backgroundRed"] = (read_color & 16711680) >> 16
            self._obj["backgroundGreen"] = (read_color & 65280) >> 8
            self._obj["backgroundBlue"] = read_color & 255
            read_color = self.raw().read_uint32()
            grid_alpha = (read_color & 4278190080) >> 32
            grid_red = (read_color & 16711680) >> 16
            grid_green = (read_color & 65280) >> 8
            grid_blue = read_color & 255
            self._obj["gridColor"] = (grid_alpha & 255) << 32 | (grid_red & 255) << 16 | (
                        grid_green & 255) << 8 | grid_blue & 255
        elif self._obj["mapVersion"] >= 3:
            self._obj["backgroundRed"] = self.raw().read_char()
            self._obj["backgroundGreen"] = self.raw().read_char()
            self._obj["backgroundBlue"] = self.raw().read_char()
        # Versions below 3 carry no colour components; treat them as 0
        # instead of failing with KeyError.
        self._obj["backgroundColor"] = (self._obj.get("backgroundRed", 0) & 255) << 16 | (
                    self._obj.get("backgroundGreen", 0) & 255) << 8 | self._obj.get("backgroundBlue", 0) & 255
        if self._obj["mapVersion"] >= 4:
            self._obj["zoomScale"] = self.raw().read_uint16() / 100
            self._obj["zoomOffsetX"] = self.raw().read_int16()
            self._obj["zoomOffsetY"] = self.raw().read_int16()
            if self._obj["zoomScale"] < 1:
                self._obj["zoomScale"] = 1
                self._obj["zoomOffsetX"] = 0
                self._obj["zoomOffsetY"] = 0
        if self._obj["mapVersion"] > 10:
            # NOTE(review): this int32 is discarded and write() emits nothing
            # for it, so maps with version > 10 do not round-trip.
            self.raw().read_int32()
        self._obj["useLowPassFilter"] = self.raw().read_bool()
        self._obj["useReverb"] = self.raw().read_bool()
        if self._obj["useReverb"]:
            self._obj["presetId"] = self.raw().read_int32()
        else:
            self._obj["presetId"] = -1
        self._obj["backgroundsCount"] = self.raw().read_char()
        self._obj["backgroundFixtures"] = []
        for _ in range(self._obj["backgroundsCount"]):
            bg = Fixture(self)
            bg.read()
            self._obj["backgroundFixtures"].append(bg.getObj())
        self._obj["foregroundsCount"] = self.raw().read_char()
        self._obj["foregroundsFixtures"] = []
        for _ in range(self._obj["foregroundsCount"]):
            fg = Fixture(self)
            fg.read()
            self._obj["foregroundsFixtures"].append(fg.getObj())
        # Kept (rather than discarded) because write() emits it back.
        self._obj["unknown_1"] = self.raw().read_int32()
        self._obj["groundCRC"] = self.raw().read_int32()
        self._obj["layersCount"] = self.raw().read_char()
        self._obj["layers"] = []
        for _ in range(self._obj["layersCount"]):
            la = Layer(self, self._obj["mapVersion"])
            la.read()
            self._obj["layers"].append(la.getObj())
        self._obj["cellsCount"] = 560  # MAP_CELLS_COUNT
        self._obj["cells"] = []
        for i in range(self._obj["cellsCount"]):
            cd = CellData(self, i, self._obj["mapVersion"])
            cd.read()
            self._obj["cells"].append(cd.getObj())

    def write(self):
        """Serialize ``self._obj`` (prepared by ``setObj``) to the stream.

        The map body is first written to an in-memory buffer so that its
        length is known, then the header and the (optionally XOR-ed) body
        are written to the real output stream.
        """
        output_stream = self._raw
        cleanData = io.BytesIO()
        self._raw = _BinaryStream(cleanData, True)
        try:
            self.raw().write_uint32(self._obj["relativeId"])
            self.raw().write_char(self._obj["mapType"])
            self.raw().write_int32(self._obj["subareaId"])
            self.raw().write_int32(self._obj["topNeighbourId"])
            self.raw().write_int32(self._obj["bottomNeighbourId"])
            self.raw().write_int32(self._obj["leftNeighbourId"])
            self.raw().write_int32(self._obj["rightNeighbourId"])
            # Unsigned, to match read_uint32 in read().
            self.raw().write_uint32(self._obj["shadowBonusOnEntities"])
            if self._obj["mapVersion"] >= 9:
                # NOTE(review): "<< 32" followed by the 0xFF000000 mask always
                # drops the alpha byte; kept to mirror read() (see note there).
                write_color = ((self._obj["backgroundAlpha"] << 32) & 4278190080 |
                               (self._obj["backgroundRed"] << 16) & 16711680 |
                               (self._obj["backgroundGreen"] << 8) & 65280 |
                               self._obj["backgroundBlue"] & 255)
                self.raw().write_int32(write_color)
                write_color = self._obj["gridColor"]
                self.raw().write_uint32(write_color)
            elif self._obj["mapVersion"] >= 3:
                self.raw().write_char(self._obj["backgroundRed"])
                self.raw().write_char(self._obj["backgroundGreen"])
                self.raw().write_char(self._obj["backgroundBlue"])
            if self._obj["mapVersion"] >= 4:
                # read() divides the stored value by 100; undo that here.
                self.raw().write_int16(int(round(self._obj["zoomScale"] * 100)))
                self.raw().write_int16(self._obj["zoomOffsetX"])
                self.raw().write_int16(self._obj["zoomOffsetY"])
            # NOTE(review): read() consumes an extra int32 here for
            # mapVersion > 10 that is not stored and therefore not written.
            self.raw().write_bool(self._obj["useLowPassFilter"])
            self.raw().write_bool(self._obj["useReverb"])
            if self._obj["useReverb"]:
                self.raw().write_int32(self._obj["presetId"])
            self.raw().write_char(self._obj["backgroundsCount"])
            for i in range(self._obj["backgroundsCount"]):
                self._obj["backgroundFixtures"][i].write()
            self.raw().write_char(self._obj["foregroundsCount"])
            for i in range(self._obj["foregroundsCount"]):
                self._obj["foregroundsFixtures"][i].write()
            # Objects produced before read() stored this field fall back to 0.
            self.raw().write_int32(self._obj.get("unknown_1", 0))
            self.raw().write_int32(self._obj["groundCRC"])
            self.raw().write_char(self._obj["layersCount"])
            for i in range(self._obj["layersCount"]):
                self._obj["layers"][i].write()
            for i in range(self._obj["cellsCount"]):
                self._obj["cells"][i].write()
        finally:
            # Always restore the real output stream, even if a child failed.
            self._raw = output_stream
        payload = cleanData.getvalue()
        self.raw().write_char(self._obj["header"])
        self.raw().write_char(self._obj["mapVersion"])
        # Unsigned, to match read_uint32 in read().
        self.raw().write_uint32(self._obj["mapId"])
        if self._obj["mapVersion"] >= 7:
            self.raw().write_bool(self._obj["encrypted"])
            self.raw().write_char(self._obj["encryptionVersion"])
            self.raw().write_int32(len(payload))
            # read() only decrypts when the flag is set, so only encrypt then.
            if self._obj["encrypted"]:
                payload = self._xor_with_key(payload)
        self.raw().write_bytes(payload)

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        """Adopt ``obj`` and wrap its nested dicts in writer objects.

        The fixture, layer and cell entries of ``obj`` are replaced in place
        by ``Fixture``/``Layer``/``CellData`` instances so write() can call
        ``.write()`` on them.
        """
        self._obj = obj
        for i in range(self._obj["backgroundsCount"]):
            bg = Fixture(self)
            bg.setObj(self._obj["backgroundFixtures"][i])
            self._obj["backgroundFixtures"][i] = bg
        for i in range(self._obj["foregroundsCount"]):
            fg = Fixture(self)
            fg.setObj(self._obj["foregroundsFixtures"][i])
            self._obj["foregroundsFixtures"][i] = fg
        for i in range(self._obj["layersCount"]):
            la = Layer(self, self._obj["mapVersion"])
            la.setObj(self._obj["layers"][i])
            self._obj["layers"][i] = la
        for i in range(self._obj["cellsCount"]):
            ce = CellData(self, i, self._obj["mapVersion"])
            ce.setObj(self._obj["cells"][i])
            self._obj["cells"][i] = ce


class Fixture:
    """A background/foreground fixture record of a map."""

    def __init__(self, parrent):
        self._parrent = parrent
        self._obj = OrderedDict()

    def raw(self):
        return self._parrent.raw()

    def read(self):
        self._obj["fixtureId"] = self.raw().read_int32()
        self._obj["offsetX"] = self.raw().read_int16()
        self._obj["offsetY"] = self.raw().read_int16()
        self._obj["rotation"] = self.raw().read_int16()
        self._obj["xScale"] = self.raw().read_int16()
        self._obj["yScale"] = self.raw().read_int16()
        self._obj["redMultiplier"] = self.raw().read_char()
        self._obj["greenMultiplier"] = self.raw().read_char()
        self._obj["blueMultiplier"] = self.raw().read_char()
        # Derived value only; it is not written back by write().
        self._obj["hue"] = self._obj["redMultiplier"] | self._obj["greenMultiplier"] | self._obj["blueMultiplier"]
        self._obj["alpha"] = self.raw().read_uchar()

    def write(self):
        self.raw().write_int32(self._obj["fixtureId"])
        self.raw().write_int16(self._obj["offsetX"])
        self.raw().write_int16(self._obj["offsetY"])
        self.raw().write_int16(self._obj["rotation"])
        self.raw().write_int16(self._obj["xScale"])
        self.raw().write_int16(self._obj["yScale"])
        self.raw().write_char(self._obj["redMultiplier"])
        self.raw().write_char(self._obj["greenMultiplier"])
        self.raw().write_char(self._obj["blueMultiplier"])
        self.raw().write_uchar(self._obj["alpha"])

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        self._obj = obj


class Layer:
    """A map layer: an id followed by a list of cells."""

    def __init__(self, parrent, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.mapVersion = mapVersion

    def raw(self):
        return self._parrent.raw()

    def read(self):
        if self.mapVersion >= 9:
            self._obj["layerId"] = self.raw().read_char()
        else:
            self._obj["layerId"] = self.raw().read_int32()
        self._obj["cellsCount"] = self.raw().read_int16()
        self._obj["cells"] = []
        for _ in range(self._obj["cellsCount"]):
            ce = Cell(self, self.mapVersion)
            ce.read()
            self._obj["cells"].append(ce.getObj())

    def write(self):
        if self.mapVersion >= 9:
            # Mirrors read_char() in read().
            self.raw().write_char(self._obj["layerId"])
        else:
            self.raw().write_int32(self._obj["layerId"])
        self.raw().write_int16(self._obj["cellsCount"])
        for i in range(self._obj["cellsCount"]):
            self._obj["cells"][i].write()

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        """Adopt ``obj`` and wrap each cell dict in a ``Cell`` (in place)."""
        self._obj = obj
        for i in range(self._obj["cellsCount"]):
            ce = Cell(self, self.mapVersion)
            ce.setObj(self._obj["cells"][i])
            self._obj["cells"][i] = ce


class Cell:
    """A layer cell: a cell id followed by a list of typed elements."""

    def __init__(self, parrent, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.mapVersion = mapVersion

    def raw(self):
        return self._parrent.raw()

    def read(self):
        self._obj["cellId"] = self.raw().read_int16()
        self._obj["elementsCount"] = self.raw().read_int16()
        self._obj["elements"] = []
        for _ in range(self._obj["elementsCount"]):
            # A one-byte type tag selects the element class.
            el = BasicElement().GetElementFromType(self, self.raw().read_char(), self.mapVersion)
            el.read()
            self._obj["elements"].append(el.getObj())

    def write(self):
        self.raw().write_int16(self._obj["cellId"])
        self.raw().write_int16(self._obj["elementsCount"])
        for i in range(self._obj["elementsCount"]):
            element = self._obj["elements"][i]
            element_name = element._obj["elementName"]
            # Type tags match BasicElement.GetElementFromType.
            if element_name == "Graphical":
                self.raw().write_char(2)
            elif element_name == "Sound":
                self.raw().write_char(33)
            else:
                raise InvalidDLMFile("Invalid element type.")
            element.write()

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        """Adopt ``obj`` and wrap each element dict in its element class."""
        self._obj = obj
        for i in range(self._obj["elementsCount"]):
            element_name = self._obj["elements"][i]["elementName"]
            if element_name == "Graphical":
                el = GraphicalElement(self, self.mapVersion)
            elif element_name == "Sound":
                el = SoundElement(self, self.mapVersion)
            else:
                raise InvalidDLMFile("Invalid element type.")
            el.setObj(self._obj["elements"][i])
            self._obj["elements"][i] = el


class CellData:
    """Per-cell gameplay data (floor, walkability flags, speed, ...)."""

    def __init__(self, parrent, id, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.cellId = id
        self.mapVersion = mapVersion

    def raw(self):
        return self._parrent.raw()

    def read(self):
        self._obj["floor"] = self.raw().read_char() * 10
        # A raw floor byte of -128 marks a cell with no further data.
        if self._obj["floor"] == -1280:
            return
        if self.mapVersion >= 9:
            tmp_bytes = self.raw().read_int16()
            # "mov" and "los" are stored inverted (bit clear means True).
            self._obj["mov"] = (tmp_bytes & 1) == 0
            self._obj["nonWalkableDuringFight"] = (tmp_bytes & 2) != 0
            self._obj["nonWalkableDuringRP"] = (tmp_bytes & 4) != 0
            self._obj["los"] = (tmp_bytes & 8) == 0
            self._obj["blue"] = (tmp_bytes & 16) != 0
            self._obj["red"] = (tmp_bytes & 32) != 0
            self._obj["visible"] = (tmp_bytes & 64) != 0
            self._obj["farmCell"] = (tmp_bytes & 128) != 0
            if self.mapVersion >= 10:
                self._obj["havenbagCell"] = (tmp_bytes & 256) != 0
                top_arrow = (tmp_bytes & 512) != 0
                bottom_arrow = (tmp_bytes & 1024) != 0
                right_arrow = (tmp_bytes & 2048) != 0
                left_arrow = (tmp_bytes & 4096) != 0
            else:
                top_arrow = (tmp_bytes & 256) != 0
                bottom_arrow = (tmp_bytes & 512) != 0
                right_arrow = (tmp_bytes & 1024) != 0
                left_arrow = (tmp_bytes & 2048) != 0
            # NOTE(review): arrow flags are only recorded on the parent's
            # lists, not in _obj, so write() cannot restore them.
            if top_arrow:
                self._parrent.topArrowCell.append(self.cellId)
            if bottom_arrow:
                self._parrent.bottomArrowCell.append(self.cellId)
            if right_arrow:
                self._parrent.rightArrowCell.append(self.cellId)
            if left_arrow:
                self._parrent.leftArrowCell.append(self.cellId)
        else:
            self._obj["losmov"] = self.raw().read_uchar()
            self._obj["los"] = (self._obj["losmov"] & 2) >> 1 == 1
            self._obj["mov"] = (self._obj["losmov"] & 1) == 1
            self._obj["visible"] = (self._obj["losmov"] & 64) >> 6 == 1
            self._obj["farmCell"] = (self._obj["losmov"] & 32) >> 5 == 1
            self._obj["blue"] = (self._obj["losmov"] & 16) >> 4 == 1
            self._obj["red"] = (self._obj["losmov"] & 8) >> 3 == 1
            self._obj["nonWalkableDuringRP"] = (self._obj["losmov"] & 128) >> 7 == 1
            # Normalised to a bool like the sibling flags (was 0 or 4).
            self._obj["nonWalkableDuringFight"] = (self._obj["losmov"] & 4) >> 2 == 1
        self._obj["speed"] = self.raw().read_char()
        self._obj["mapChangeData"] = self.raw().read_char()
        if self.mapVersion > 5:
            self._obj["moveZone"] = self.raw().read_uchar()
            if self.mapVersion > 10 and (self.hasLinkedZoneFight() or self.hasLinkedZoneRp()):
                # NOTE(review): this byte is discarded and never written back.
                self.raw().read_uchar()
        if self.mapVersion > 7 and self.mapVersion < 9:
            self._obj["tmpBits"] = self.raw().read_char()
            self.arrow = 15 & self._obj["tmpBits"]
            if self.useTopArrow():
                self._parrent.topArrowCell.append(self.cellId)
            if self.useBottomArrow():
                self._parrent.bottomArrowCell.append(self.cellId)
            if self.useLeftArrow():
                self._parrent.leftArrowCell.append(self.cellId)
            if self.useRightArrow():
                self._parrent.rightArrowCell.append(self.cellId)

    def write(self):
        """Write the cell so that read() parses back the same ``_obj``."""
        floor = self._obj["floor"]
        # read() multiplies the stored byte by 10; undo that. The byte is
        # written even for the -1280 sentinel because read() consumes it.
        self.raw().write_char(int(floor // 10))
        if floor == -1280:
            return
        if self.mapVersion >= 9:
            tmp_bytes = 0
            # "mov" and "los" are stored inverted, exactly as read() decodes.
            tmp_bytes |= 0 if self._obj["mov"] else 1
            tmp_bytes |= 2 if self._obj["nonWalkableDuringFight"] else 0
            tmp_bytes |= 4 if self._obj["nonWalkableDuringRP"] else 0
            tmp_bytes |= 0 if self._obj["los"] else 8
            tmp_bytes |= 16 if self._obj["blue"] else 0
            tmp_bytes |= 32 if self._obj["red"] else 0
            tmp_bytes |= 64 if self._obj["visible"] else 0
            tmp_bytes |= 128 if self._obj["farmCell"] else 0
            if self.mapVersion >= 10:
                tmp_bytes |= 256 if self._obj["havenbagCell"] else 0
            self.raw().write_int16(tmp_bytes)
        else:
            self.raw().write_uchar(self._obj["losmov"])
        self.raw().write_char(self._obj["speed"])
        # Signed, to match read_char() in read().
        self.raw().write_char(self._obj["mapChangeData"])
        if self.mapVersion > 5:
            self.raw().write_uchar(self._obj["moveZone"])
        if self.mapVersion > 7 and self.mapVersion < 9:
            self.raw().write_char(self._obj["tmpBits"])

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        self._obj = obj

    def useTopArrow(self):
        return (self.arrow & 1) != 0

    def useBottomArrow(self):
        return (self.arrow & 2) != 0

    def useLeftArrow(self):
        return (self.arrow & 4) != 0

    def useRightArrow(self):
        return (self.arrow & 8) != 0

    def hasLinkedZoneRp(self):
        return bool(self._obj["mov"] and not self._obj["farmCell"])

    def hasLinkedZoneFight(self):
        return bool(self._obj["mov"] and not self._obj["nonWalkableDuringFight"]
                    and not self._obj["farmCell"] and self._obj["havenbagCell"])


class BasicElement:
    """Factory mapping an element type tag to its element class."""

    def GetElementFromType(self, parrent, type, mapVersion):
        if type == 2:  # GRAPHICAL
            return GraphicalElement(parrent, mapVersion)
        elif type == 33:  # SOUND
            return SoundElement(parrent, mapVersion)
        else:
            raise InvalidDLMFile("Invalid element type.")


class GraphicalElement:
    def __init__(self, parrent, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.mapVersion = mapVersion
        self._obj["elementName"] = "Graphical"

    def raw(self):
        return self._parrent.raw()

    def read(self):
        self._obj["elementId"] = self.raw().read_uint32()
        # hue
self._obj["hue_1"] = self.raw().read_char()462        self._obj["hue_2"] = self.raw().read_char()463        self._obj["hue_3"] = self.raw().read_char()464        # shadow465        self._obj["shadow_1"] = self.raw().read_char()466        self._obj["shadow_2"] = self.raw().read_char()467        self._obj["shadow_3"] = self.raw().read_char()468        if self.mapVersion <= 4:469            self._obj["offsetX"] = self.raw().read_char()470            self._obj["offsetY"] = self.raw().read_char()471        else:472            self._obj["offsetX"] = self.raw().read_int16()473            self._obj["offsetY"] = self.raw().read_int16()474        self._obj["altitude"] = self.raw().read_char()475        self._obj["identifier"] = self.raw().read_uint32()476    def write(self):477        self.raw().write_uint32(self._obj["elementId"])478        self.raw().write_char(self._obj["hue_1"])479        self.raw().write_char(self._obj["hue_2"])480        self.raw().write_char(self._obj["hue_3"])481        self.raw().write_char(self._obj["shadow_1"])482        self.raw().write_char(self._obj["shadow_2"])483        self.raw().write_char(self._obj["shadow_3"])484        if self.mapVersion <= 4:485            self.raw().write_char(self._obj["offsetX"])486            self.raw().write_char(self._obj["offsetY"])487        else:488            self.raw().write_int16(self._obj["offsetX"])489            self.raw().write_int16(self._obj["offsetY"])490        self.raw().write_char(self._obj["altitude"])491        self.raw().write_uint32(self._obj["identifier"])492    def getObj(self):493        return self._obj494    def setObj(self, obj):495        self._obj = obj496class SoundElement:497    def __init__(self, parrent, mapVersion):498        self._parrent = parrent499        self._obj = OrderedDict()500        self.mapVersion = mapVersion501        self._obj["elementName"] = "Sound"502    def raw(self):503        return self._parrent.raw()504    def read(self):505        self._obj["soundId"] = 
self.raw().read_int32()506        self._obj["baseVolume"] = self.raw().read_int16()507        self._obj["fullVolumeDistance"] = self.raw().read_int32()508        self._obj["nullVolumeDistance"] = self.raw().read_int32()509        self._obj["minDelayBetweenLoops"] = self.raw().read_int16()510        self._obj["maxDelayBetweenLoops"] = self.raw().read_int16()511    def write(self):512        self.raw().write_int32(self._obj["soundId"])513        self.raw().write_int16(self._obj["baseVolume"])514        self.raw().write_int32(self._obj["fullVolumeDistance"])515        self.raw().write_int32(self._obj["nullVolumeDistance"])516        self.raw().write_int16(self._obj["minDelayBetweenLoops"])517        self.raw().write_int16(self._obj["maxDelayBetweenLoops"])518    def getObj(self):519        return self._obj520    def setObj(self, obj):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
