Best Python code snippet using autotest_python
wb_admin_server_status.py
Source:wb_admin_server_status.py  
# Copyright (c) 2007, 2015, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 of the
# License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301  USA

from __future__ import with_statement

from workbench.log import log_info, log_error, log_warning, log_debug3
from workbench.utils import format_duration
from workbench.db_utils import QueryError

import mforms
import time

import wb_admin_monitor


def stradd(table, y, label, value):
    """Add a two-cell "label / value" row to *table* at row *y*.

    The caption goes in column 0 and the bold, grey value in column 1.
    Returns the value label so the caller can update it later.
    """
    caption = mforms.newLabel(label)
    table.add(caption, 0, 1, y, y+1, mforms.HFillFlag)

    value_label = mforms.newLabel(value)
    value_label.set_style(mforms.BoldStyle)
    value_label.set_color("#555555")
    table.add(value_label, 1, 2, y, y+1, mforms.HFillFlag)
    return value_label


class StateIcon(mforms.Box):
    """Horizontal box showing an on/off icon, an "On"/"Off"/"n/a" label and
    an optional extra text label."""

    # Icon paths are resolved once and shared by every instance.
    on_icon = None
    off_icon = None

    def __init__(self):
        mforms.Box.__init__(self, True)
        self.set_release_on_add()
        self.set_managed()

        # Store the paths on the class: assigning through `self` would create
        # per-instance attributes and repeat the lookup for every icon.
        if not StateIcon.on_icon:
            app = mforms.App.get()
            StateIcon.on_icon = app.get_resource_path("mysql-status-on.png")
            StateIcon.off_icon = app.get_resource_path("mysql-status-off.png")

        self.set_spacing(8)
        self.image = mforms.newImageBox()
        self.image.set_image(self.off_icon)
        self.add(self.image, False, True)

        self.label = mforms.newLabel("n/a")
        self.add(self.label, False, True)

        # Extra text label, created lazily by set_text().
        self.text = None

    def set_state(self, state):
        """Show "On" for a truthy *state*, "n/a" for None and "Off" otherwise."""
        if state:
            self.image.set_image(self.on_icon)
            self.label.set_text("On")
        elif state is None:
            self.image.set_image(self.off_icon)
            self.label.set_text("n/a")
        else:
            self.image.set_image(self.off_icon)
            self.label.set_text("Off")

    def set_text(self, text):
        """Set the extra text shown after the state label, creating the label
        on first use."""
        if not self.text:
            self.text = mforms.newLabel(text)
            self.text.set_style(mforms.BoldStyle)
            self.text.set_color("#555555")
            self.add(self.text, False, True)
        else:
            self.text.set_text(text)


class ConnectionInfo(mforms.Box):
    """Header panel: server logo, connection name and a table of basic
    server facts with a "Refresh" button."""

    def __init__(self, owner):
        """*owner* must provide refresh_status(); it is wired to the Refresh
        button created in update()."""
        mforms.Box.__init__(self, True)
        self.set_release_on_add()
        self.set_managed()

        self.owner = owner
        self.set_spacing(35)

        self.icon = mforms.newImageBox()
        self.icon.set_image(mforms.App.get().get_resource_path("mysql-logo-00.png"))
        self.add(self.icon, False, True)

        vbox = mforms.newBox(False)
        self.vbox = vbox
        self.add(vbox, True, True)
        vbox.set_spacing(2)
        vbox.add(mforms.newLabel("Connection Name"), False, True)

        self.connection_name = mforms.newLabel("?")
        self.connection_name.set_style(mforms.VeryBigStyle)
        vbox.add(self.connection_name, False, True)

        # Rebuilt from scratch on every update().
        self.info_table = None

    def update(self, ctrl_be):
        """Rebuild the info table from the server/status variables exposed by
        *ctrl_be* and pick the logo matching the target server version."""
        self.suspend_layout()
        try:
            self.connection_name.set_text(ctrl_be.server_profile.name)

            info = ctrl_be.server_variables
            status = ctrl_be.status_variables

            if self.info_table:
                self.vbox.remove(self.info_table)

            self.info_table = mforms.newTable()
            self.info_table.set_column_count(2)
            self.info_table.set_row_count(8)
            self.info_table.set_column_spacing(18)
            self.info_table.set_row_spacing(5)
            self.vbox.add(self.info_table, True, True)

            stradd(self.info_table, 0, "\nHost:", "\n"+info.get("hostname", "n/a"))
            stradd(self.info_table, 1, "Socket:", info.get("socket", "n/a"))
            stradd(self.info_table, 2, "Port:", info.get("port", "n/a"))
            stradd(self.info_table, 3, "Version:", "%s\n%s" % (info.get("version", "n/a"), info.get("version_comment", "")))
            stradd(self.info_table, 4, "Compiled For:", "%s   (%s)" % (info.get("version_compile_os", "n/a"), info.get("version_compile_machine", "n/a")))
            stradd(self.info_table, 5, "Configuration File:", ctrl_be.server_profile.config_file_path or "unknown")

            uptime = status.get("Uptime", None)
            if uptime:
                # int() promotes to long automatically on Python 2, so this is
                # equivalent to the former long() call.
                uptime = int(uptime)
                stradd(self.info_table, 6, "Running Since:", "%s (%s)" % (time.ctime(ctrl_be.status_variables_time-uptime), format_duration(uptime, True)))
            else:
                stradd(self.info_table, 6, "Running Since:", "n/a")

            box = mforms.newBox(True)
            refresh = mforms.newButton()
            refresh.set_text("Refresh")
            refresh.set_tooltip("Refresh server status information")
            refresh.add_clicked_callback(self.owner.refresh_status)
            box.add(refresh, False, False)
            self.info_table.add(box, 1, 2, 7, 8, 0)

            version = ctrl_be.target_version
            if version and info:
                icon = mforms.App.get().get_resource_path("mysql-logo-%i%i.png" % (version.majorNumber, version.minorNumber))
                if icon:
                    self.icon.set_image(icon)
        finally:
            # Always re-enable layout, even if building the table failed.
            self.resume_layout()


#===============================================================================
#
#===============================================================================
class WbAdminServerStatus(mforms.Box):
    """"Server Status" administration page.

    The left side holds the connection header and a scrollable list of info
    sections; the right side holds the wb_admin_monitor.WbAdminMonitor widget.
    self.controls maps a row label (or section title) to a tuple
    (control, value_source); callable value sources are re-evaluated on every
    update().
    """

    status      = None
    connections = None
    _update_timeout = None

    @classmethod
    def wba_register(cls, admin_context):
        admin_context.register_page(cls, "wba_management", "Server Status", False)

    @classmethod
    def identifier(cls):
        return "admin_server_status"

    #---------------------------------------------------------------------------
    def __init__(self, ctrl_be, server_profile, main_view):
        mforms.Box.__init__(self, True)
        self.set_managed()
        self.set_release_on_add()

        self.ui_created = False
        self.set_spacing(24)

        self.ctrl_be = ctrl_be
        self.server_profile = server_profile
        self.main_view = main_view

        lbox = mforms.newBox(False)
        self.add(lbox, True, True)

        self.connection_info = ConnectionInfo(self)
        self.connection_info.set_padding(24)
        lbox.add(self.connection_info, False, True)

        self.scrollbox = mforms.newScrollPanel(mforms.ScrollPanelDrawBackground)
        self.scrollbox.set_padding(24)
        self.content = mforms.newBox(False)
        self.content.set_padding(20)
        self.content.set_spacing(4)
        self.scrollbox.add(self.content)
        lbox.add(self.scrollbox, True, True)

        image = mforms.newImageBox()
        if self.server_profile.host_os == "linux":
            image.set_image(mforms.App.get().get_resource_path("mysql-status-separator-linux.png"))
        else:
            image.set_image(mforms.App.get().get_resource_path("mysql-status-separator.png"))
        image.set_image_align(mforms.MiddleCenter)
        self.add(image, False, True)

        self.status = wb_admin_monitor.WbAdminMonitor(server_profile, self.ctrl_be)
        self.status.set_size(360, -1)
        self.status.set_padding(0, 24, 24, 24)
        self.add(self.status, False, False)

        self.controls = {}
        # None = not determined yet, True = started/offline, False = stopped.
        self.currently_started = None

        self.ctrl_be.add_me_for_event("server_started", self)
        self.ctrl_be.add_me_for_event("server_offline", self)
        self.ctrl_be.add_me_for_event("server_stopped", self)

        self.connection_info.update(self.ctrl_be)

    #---------------------------------------------------------------------------
    def _schedule_update(self):
        """Arm the one-shot refresh timer unless one is already pending."""
        if not self._update_timeout:
            self._update_timeout = mforms.Utilities.add_timeout(0.5, self.update)

    #---------------------------------------------------------------------------
    def server_started_event(self):
        if self.currently_started is not True:
            self.refresh("started")
            self.currently_started = True
            self._schedule_update()

    def server_offline_event(self):
        # NOTE(review): an offline server is tracked as "started", so a
        # started -> offline transition does not refresh the monitor; confirm
        # this is intended.
        if self.currently_started is not True:
            self.refresh("offline")
            self.currently_started = True
            self._schedule_update()

    #---------------------------------------------------------------------------
    def server_stopped_event(self):
        if self.currently_started is not False:
            self.refresh("stopped")
            self.currently_started = False
            self._schedule_update()

    #---------------------------------------------------------------------------
    def refresh(self, status):
        """Forward *status* to the monitor widget."""
        self.status.refresh_status(status)

    #---------------------------------------------------------------------------
    def refresh_status(self):
        """Handler for the "Refresh" button: re-check the server state and
        schedule a UI update, unless one is already pending."""
        if not self._update_timeout:
            status = self.ctrl_be.force_check_server_state()
            if (status == "running" or not status or status == "offline") and self.currently_started:
                self.ctrl_be.query_server_info()
            self._schedule_update()

    #---------------------------------------------------------------------------
    def page_activated(self):
        """Build the info sections on first activation, then synchronise the
        page with the current server state."""
        self.suspend_layout()
        try:
            if not self.ui_created:
                self.create_info_sections()
                self.ui_created = True
                self._schedule_update()
        finally:
            self.resume_layout()

        if self.currently_started is None:
            if self.ctrl_be.is_server_running() == "running":
                self.server_started_event()
            elif self.ctrl_be.is_server_running() == "offline":
                self.server_offline_event()
            else:
                self.server_stopped_event()
        else:
            self.ctrl_be.query_server_info()

        self.refresh(self.ctrl_be.is_server_running())

    def update(self):
        """Timer callback: refresh the header, the monitor and every info row.

        Returns False (the callback is registered per refresh through
        add_timeout and re-armed explicitly by _schedule_update()).
        """
        self._update_timeout = None
        self.connection_info.update(self.ctrl_be)
        self.status.refresh_status(self.ctrl_be.is_server_running(verbose=False))

        # The server events can schedule this callback before the page has
        # been activated; self.controls is still empty then and the lookups
        # below would raise KeyError.
        if not self.ui_created:
            return False

        info = self.ctrl_be.server_variables
        status = self.ctrl_be.status_variables
        plugins = dict(self.ctrl_be.server_active_plugins) # plugin -> type
        params = (info, plugins, status)

        repl_error = None
        res = None
        try:
            res = self.ctrl_be.exec_query("SHOW SLAVE STATUS")
        except QueryError as e:
            if e.error == 1227:
                repl_error = "Insufficient privileges to view slave status"
            else:
                repl_error = "Error querying status: %s" % str(e)

        repl = {}
        if res and res.nextRow():
            for field in ["Slave_IO_State", "Master_Host"]:
                repl[field] = res.stringByName(field)

        disk_space = "unable to retrieve"
        if self.ctrl_be.server_control and info.get("datadir"):
            disk_space = self.ctrl_be.server_helper.get_available_space(info.get("datadir"))

        # Update the controls in the UI
        self.suspend_layout()
        try:
            self.controls["Disk Space in Data Dir:"][0].set_text(disk_space)

            table = self.controls["Replication Slave"][0]
            if repl:
                # NOTE(review): the placeholder is removed on every refresh,
                # including after it has already been taken out of the table;
                # confirm that mforms tolerates this.
                table.remove(self.controls[""][0])
                self.setup_info_table(table,
                                      [("Slave IO State:", repl.get("Slave_IO_State")),
                                       ("Master Host:", repl.get("Master_Host")),
                                       ("GTID Mode:", info.get("gtid_mode"))],
                                      params)
            else:
                self.controls[""][0].set_text(repl_error or "this server is not a slave in a replication setup")
            table.relayout()

            for control, value_source in self.controls.values():
                if not callable(value_source):
                    continue
                value = value_source(*params)
                if isinstance(control, mforms.Label):
                    control.set_text(value if value else "n/a")
                elif isinstance(value, tuple):
                    control.set_state(value[0])
                    if value[0] and value[1]:
                        control.set_text(value[1])
                else:
                    control.set_state(value)
        finally:
            self.resume_layout()

        mforms.Utilities.driver_shutdown()
        return False

    def create_info_sections(self):
        """Create all info sections from the current server variables.

        Each row is (label, value_source). A callable value source is invoked
        as value_source(info, plugins, status), both here and on every later
        update(), so it must derive its result from those arguments rather
        than from values captured at creation time.
        """
        info = self.ctrl_be.server_variables
        status = self.ctrl_be.status_variables
        plugins = dict(self.ctrl_be.server_active_plugins) # plugin -> type

        def tristate(value, true_value = None):
            """Map a server variable to True / False / None (unknown)."""
            if true_value is not None and value == true_value:
                return True
            if value == "OFF" or value == "NO":
                return False
            elif value and true_value is None:
                return True
            return None

        def semi_sync_state(info, plugins, status):
            # (enabled, "(master, slave)") computed from the live variables.
            master = tristate(info.get("rpl_semi_sync_master_enabled"))
            slave = tristate(info.get("rpl_semi_sync_slave_enabled"))
            roles = [role for role in (master and "master", slave and "slave") if role]
            return (master or slave, "(%s)" % ", ".join(roles))

        def memcached_state(info, plugins, status):
            return True if "daemon_memcached" in plugins else None

        def log_state(enabled_var, file_var):
            # Build a value source returning (enabled, location) for a log
            # that is controlled by the log_output variable.
            def state(info, plugins, status):
                log_output = info.get("log_output", "FILE")
                return (info.get(enabled_var) != "OFF" and log_output != "NONE",
                        info.get(file_var) if "FILE" in log_output else "[Stored in database]")
            return state

        def firewall_counter(name):
            # str(None) is the truthy string "None", so test for a missing
            # counter before converting it to text.
            def counter(info, plugins, status):
                value = status.get(name)
                text = "" if value is None else str(value)
                return text or "n/a"
            return counter

        # the params to be passed to the lambdas
        params = (info, plugins, status)

        if self.server_profile.target_is_windows:
            auth_row = ("Windows Authentication:", lambda info, plugins, status: "authentication_windows" in plugins)
        else:
            auth_row = ("PAM Authentication:", lambda info, plugins, status: "authentication_pam" in plugins)

        self.add_info_section_2("Available Server Features",
                              [("Performance Schema:", lambda info, plugins, status: tristate(info.get("performance_schema"))),
                               ("Thread Pool:", lambda info, plugins, status: tristate(info.get("thread_handling"), "loaded-dynamically")),
                               ("Memcached Plugin:", memcached_state),
                               ("Semisync Replication Plugin:", semi_sync_state),
                               ("SSL Availability:", lambda info, plugins, status: info.get("have_openssl") == "YES" or info.get("have_ssl") == "YES"),
                               auth_row,
                               ("Password Validation:", lambda info, plugins, status: (tristate(info.get("validate_password_policy")), "(Policy: %s)" % info.get("validate_password_policy"))),
                               ("Audit Log:", lambda info, plugins, status: (tristate(info.get("audit_log_policy")), "(Log Policy: %s)" % info.get("audit_log_policy"))),
                               ("Firewall:", lambda info, plugins, status: tristate(info.get("mysql_firewall_mode"))),
                               ("Firewall Trace:", lambda info, plugins, status: tristate(info.get("mysql_firewall_trace")))],
                                params)

        self.add_info_section("Server Directories",
                              [("Base Directory:", lambda info, plugins, status: info.get("basedir")),
                               ("Data Directory:", lambda info, plugins, status: info.get("datadir")),
                               ("Disk Space in Data Dir:", "checking..."),
                               ("InnoDB Data Directory:", lambda info, plugins, status: info.get("innodb_data_home_dir")) if info.get("innodb_data_home_dir") else None,
                               ("Plugins Directory:", lambda info, plugins, status: info.get("plugin_dir")),
                               ("Tmp Directory:", lambda info, plugins, status: info.get("tmpdir")),
                               ("Error Log:", lambda info, plugins, status: (info.get("log_error") and info.get("log_error")!="OFF", info.get("log_error"))),
                               ("General Log:", log_state("general_log", "general_log_file")),
                               ("Slow Query Log:", log_state("slow_query_log", "slow_query_log_file"))],
                              params)

        # Placeholder row; update() fills it in once SHOW SLAVE STATUS ran.
        self.add_info_section("Replication Slave",
                              [("", "checking...")],
                              params)

        self.add_info_section("Authentication",
                              [("SHA256 password private key:", lambda info, plugins, status: info.get("sha256_password_private_key_path")),
                               ("SHA256 password public key:", lambda info, plugins, status: info.get("sha256_password_public_key_path"))],
                              params)

        self.add_info_section("SSL",
                              [("SSL CA:", lambda info, plugins, status: info.get("ssl_ca") or "n/a"),
                               ("SSL CA path:", lambda info, plugins, status: info.get("ssl_capath") or "n/a"),
                               ("SSL Cert:", lambda info, plugins, status: info.get("ssl_cert") or "n/a"),
                               ("SSL Cipher:", lambda info, plugins, status: info.get("ssl_cipher") or "n/a"),
                               ("SSL CRL:", lambda info, plugins, status: info.get("ssl_crl") or "n/a"),
                               ("SSL CRL path:", lambda info, plugins, status: info.get("ssl_crlpath") or "n/a"),
                               ("SSL Key:", lambda info, plugins, status: info.get("ssl_key") or "n/a")],
                              params)

        log_debug3("mysql_firewall_trace: %s\n" % info.get("mysql_firewall_trace"))
        log_debug3("Firewall_access_denied: %s\n" % status.get("Firewall_access_denied"))
        log_debug3("Firewall_access_granted: %s\n" % status.get("Firewall_access_granted"))
        log_debug3("Firewall_cached_entries: %s\n" % status.get("Firewall_cached_entries"))

        if info.get("mysql_firewall_mode") == "ON":
            self.add_info_section("Firewall",
                                  [("Access denied:", firewall_counter("Firewall_access_denied")),
                                   ("Access granted:", firewall_counter("Firewall_access_granted")),
                                   ("Access suspicious:", firewall_counter("Firewall_access_suspicious")),
                                   ("Cached entries:", firewall_counter("Firewall_cached_entries"))],
                                  params)

    def _add_section_header(self, title):
        """Append a section title and a 1px separator line to the content box."""
        label = mforms.newLabel(title)
        label.set_style(mforms.BigBoldStyle)
        label.set_color("#5f5f5f")
        self.content.add(label, False, True)

        sep = mforms.newBox(False)
        sep.set_back_color("#b2b2b2")
        sep.set_size(-1, 1)
        self.content.add(sep, False, True)

    def add_info_section_2(self, title, info, params):
        """Add a section whose rows are split over two side-by-side tables."""
        self._add_section_header(title)

        half = len(info) // 2   # floor division: must stay an int for slicing
        hbox = mforms.newBox(True)
        hbox.add(self.make_info_table(info[:half], params), True, True)
        hbox.add(self.make_info_table(info[half:], params), True, True)
        self.content.add(hbox, False, True)

    def add_info_section(self, title, info, params):
        """Add a single-table section and register its table under *title* in
        self.controls. Falsy entries in *info* are skipped."""
        self._add_section_header(title)

        info_table = self.make_info_table([x for x in info if x], params)
        self.content.add(info_table, False, True)
        self.controls[title] = (info_table, None)

    def make_info_table(self, info, params):
        """Create a two-column table and populate it via setup_info_table()."""
        info_table = mforms.newTable()
        info_table.set_column_spacing(8)
        info_table.set_row_spacing(6)
        info_table.set_column_count(2)
        return self.setup_info_table(info_table, info, params)

    def setup_info_table(self, info_table, info, params):
        """Fill *info_table* with one row per (label, value_source) in *info*.

        Callable value sources are invoked with *params*. bool, None and
        tuple values are shown with a StateIcon (a tuple is (state, text));
        anything else becomes a bold label. The control is recorded in
        self.controls[label]; a control already registered under that label
        is removed from the table and replaced.
        """
        info_table.set_row_count(len(info)+1)

        for i, (label, value_source) in enumerate(info):
            if callable(value_source):
                value = value_source(*params)
            else:
                value = value_source

            if label in self.controls:
                info_table.remove(self.controls[label][0])
            else:
                info_table.add(mforms.newLabel(label), 0, 1, i, i+1, mforms.HFillFlag)

            if isinstance(value, tuple):
                control = StateIcon()
                control.set_state(value[0])
                if value[0] and value[1]:
                    control.set_text(value[1])
            elif isinstance(value, bool) or value is None:
                control = StateIcon()
                control.set_state(value)
            else:
                control = mforms.newLabel(value or "")
                control.set_style(mforms.BoldStyle)
                control.set_color("#1c1c1c")

            info_table.add(control, 1, 2, i, i+1, mforms.HFillFlag|mforms.HExpandFlag)
            self.controls[label] = (control, value_source)

        info_table.add(mforms.newLabel(""), 0, 1, len(info), len(info)+1, mforms.HFillFlag) # blank space
        return info_table

    #---------------------------------------------------------------------------
    def page_deactivated(self):
        pass

    #---------------------------------------------------------------------------
    def shutdown(self):
        """Cancel the pending refresh timer, if any."""
        if self._update_timeout:
            mforms.Utilities.cancel_timeout(self._update_timeout)
            self._update_timeout = None

# ...__init__.py  (scrape residue: heading of the next listing on the page)
Source:__init__.py  
...
        self._reader = None

    def _do_timeout(self):
        """Timer callback: let the session handle its timeout, then re-arm."""
        # The timer that invoked us has fired, so drop the stale handle
        # before _update_timeout() tries to cancel it.
        self._timeout = None
        self._session.do_timeout()
        self._update_timeout()

    def _cancel_timeout(self):
        """Cancel the pending timer, if any, and forget its handle."""
        if self._timeout:
            self._timeout.cancel()
            self._timeout = None

    def _update_timeout(self):
        """Replace the pending timer with one matching the session's timeout."""
        self._cancel_timeout()
        timeout = self._session.timeout()
        # None from the session means no timer is scheduled.
        if timeout is not None:
            self._timeout = reactor.callLater(timeout, self._do_timeout)

    def open(self):
        """Open the session, register a reader for it and arm the timer."""
        self._session.open()
        self._reader = SnmpReader(self._session)
        # NOTE(review): `reactor` is presumably Twisted's reactor; its import
        # is not visible in this excerpt.
        reactor.addReader(self._reader)
        self._update_timeout()

    def close(self):
        """Undo open(): cancel the timer, unregister the reader, close the session."""
        self._cancel_timeout()
        # NOTE(review): self._reader is None before open() and after close(),
        # so this assumes close() is only called on an open session - confirm.
        reactor.removeReader(self._reader)
        self._reader = None
        self._session.close()

    def _done(self, result, deferred):
        """Result callback handed to the session by get() and walk().

        Resolves *deferred* with *result*: netsnmp.SnmpTimeout becomes an
        errback with error.TimeoutError(), any other Exception is passed to
        errback as is, and everything else goes to callback.
        """
        def fire():
            self._update_timeout()
            if isinstance(result, netsnmp.SnmpTimeout):
                deferred.errback(error.TimeoutError())
            elif isinstance(result, Exception):
                deferred.errback(result)
            else:
                deferred.callback(result)
        # We must fire the deferred later because we don't want
        # to allow the user of this class to call any other
        # netsnmp functions while inside this netsnmp callback.
        reactor.callLater(0, fire)

    def get(self, oids):
        """Start a get for *oids*; return a Deferred that _done() resolves."""
        deferred = defer.Deferred()
        self._session.get(oids, self._done, deferred)
        self._update_timeout()
        return deferred

    def walk(self, oids, strict=False):
        deferred = defer.Deferred()
        self._session.walk(oids, self._done, deferred, strict=strict)
        self._update_timeout()
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
The Learning Hub covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
