Best Python code snippet using slash
audit_bash.py
Source:audit_bash.py  
...144                print "\n[STRICT AUDIT]report session start failed ![%s]" % (e,)145                os.kill(os.getpid(), signal.SIGKILL)146            else:147                pass148    def report_session_end(self):149        if AUDIT_MODEL == "no-audit":150            return151        try:152            param = {}153            param["session_id"] = self.session_id154            param["report_key"] = REPORT_KEY155            tms = datetime.datetime.now()156            time_now = tms.strftime('%Y-%m-%d %H:%M:%S')157            param["time"] = time_now158            r = requests.post("%s%s" % (REPORT_ADDR, URL_SESSION_END), data=param, timeout=TIMEOUT, verify=CA_PATH)159            if r.status_code == 200:160                return161            else:162                if AUDIT_MODEL == 'strict':163                    print "\n[STRICT AUDIT]Session End report Failed! [HTTP ERROR][code:%s][body:%s]" % (164                    r.status_code, r.text)165                else:166                    pass167        except Exception, e:168            if AUDIT_MODEL == 'strict':169                print "\n[STRICT AUDIT] report session end failed ![%s]" % (e,)170            else:171                pass172    def report_command(self, command):173        if AUDIT_MODEL == "no-audit" or command == "":174            return175        try:176            param = {}177            param["session_id"] = self.session_id178            param["user"] = self.username179            param["role"] = self.role180            param["hostname"] = self.hostname181            param["command"] = command182            param["report_key"] = REPORT_KEY183            tms = datetime.datetime.now()184            time_now = tms.strftime('%Y-%m-%d %H:%M:%S')185            param["time"] = time_now186            r = requests.post("%s%s" % (REPORT_ADDR, URL_COMMAND), param, timeout=TIMEOUT, verify=CA_PATH)187            if r.status_code == 200:188                pass189            else:190                if 
AUDIT_MODEL == 'strict':191                    print "\n[STRICT AUDIT]Command Report Failed! [HTTP ERROR][code:%s][body:%s]" % (192                    r.status_code, r.text)193                    os.kill(os.getpid(), signal.SIGINT)194                else:195                    pass196        except Exception, e:197            if AUDIT_MODEL == 'strict':198                print "\n[STRICT AUDIT] report command failed ![%s]" % (e,)199                os.kill(os.getpid(), signal.SIGINT)200            else:201                pass202    def start(self):203        stdin_origin_attr = termios.tcgetattr(sys.stdin.fileno())204        data = ""205        input_mode = False206        pre_timestamp = time.time()207        log_file_f, log_time_f = pty.get_log()208        # signal.signal(signal.SIGCHLD, signal.SIG_IGN)209        try:210            if self.pty_pid == 0:211                os.execlp("/bin/bash", "-i")212                os.kill(os.getpid(), signal.SIGINT)213            else:214                self.set_window_size()215                signal.signal(signal.SIGWINCH, self.set_window_size)216                stdin_fd = sys.stdin.fileno()217                stdout_fd = sys.stdout.fileno()218                stdout_attr = termios.tcgetattr(stdout_fd)219                termios.tcsetattr(self.pty_fd, termios.TCSADRAIN, stdout_attr)220                flag = fcntl.fcntl(sys.stdin, fcntl.F_GETFL, 0)221                fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flag | os.O_NONBLOCK)222                tty.setraw(stdin_fd)223                tty.setcbreak(stdin_fd)224                ts = threading.Thread(target=self.report_session_start, args=())225                ts.setDaemon(True)226                ts.start()227                epoll = select.epoll()228                epoll.register(stdin_fd, select.POLLIN)229                epoll.register(self.pty_fd, select.POLLIN | select.POLLOUT)230                while self.pty_is_alived():231                    events = epoll.poll(6)232       
             if not events:233                        continue234                    rs = []235                    ws = []236                    for fd, event in events:237                        if event & select.EPOLLIN:238                            rs.append(fd)239                        if event & select.EPOLLOUT:240                            ws.append(fd)241                    if self.pty_fd in rs:242                        try:243                            output = os.read(self.pty_fd, BUFSIZ)244                            os.write(stdout_fd, output)245                            now_timestamp = time.time()246                            log_time_f.write('%s %s\n' % (round(now_timestamp - pre_timestamp, 4), len(output)))247                            log_time_f.flush()248                            log_file_f.write(output)249                            log_file_f.flush()250                            pre_timestamp = now_timestamp251                            log_file_f.flush()252                            self.vim_data += output253                            if input_mode:254                                data += output255                            couple = self.ps1_pattern.findall(output)256                            if len(couple) > 0:257                                couple_hit = couple[-1]258                                couple_sp_l = couple_hit.split("@")259                                if len(couple_sp_l) == 2:260                                    self.role = couple_sp_l[0]261                                    self.hostname = couple_sp_l[1]262                        except OSError:263                            pass264                    if stdin_fd in rs:265                        inpt = os.read(stdin_fd, BUFSIZ)266                        os.write(self.pty_fd, inpt)267                        input_mode = True268                        if self.is_output(str(inpt)):269                            # 妿len(str(x)) > 1 è¯´ææ¯å¤å¶è¾å
¥ç270                            if len(str(inpt)) > 1:271                                data = inpt272                            match = self.vim_end_pattern.findall(self.vim_data)273                            if match:274                                if self.vim_flag or len(match) == 2:275                                    self.vim_flag = False276                                else:277                                    self.vim_flag = True278                            elif not self.vim_flag:279                                self.vim_flag = False280                                data = self.deal_command(data)[0:200]281                                if data is not None:282                                    # TtyLog(log=log, datetime=datetime.datetime.now(), cmd=data).save()283                                    # TODO: to record command284                                    # ffff.write("%s %s %s\n" % (self.role, self.hostname, data))285                                    tc = threading.Thread(target=self.report_command, args=(data,))286                                    tc.setDaemon(True)287                                    tc.start()288                            data = ''289                            self.vim_data = ''290                            input_mode = False291                    time.sleep(0.01)292        finally:293            termios.tcsetattr(stdin_fd, termios.TCSAFLUSH, stdin_origin_attr)294            log_file_f.write('End time is %s' % datetime.datetime.now())295            log_file_f.close()296            log_time_f.close()297            self.report_session_end()298    def get_log(self):299        """300        Logging user command and output.301        """302        tty_log_dir = os.path.join(LOG_DIR, 'tty')303        date_today = datetime.datetime.now()304        date_start = date_today.strftime('%Y%m%d')305        time_start = date_today.strftime('%H%M%S')306        today_connect_log_dir = os.path.join(tty_log_dir, 
date_start)307        log_file_path = os.path.join(today_connect_log_dir,308                                     '%s_%s_%s_%s' % (self.username, self.cs_name, date_start, time_start))309        self.log_file = log_file_path310        try:311            mkdir(os.path.dirname(today_connect_log_dir), mode=777)...session.py
Source:session.py  
# pylint: disable=no-member
from sentinels import NOTHING
from .api_object import APIObject
from .archiveable import Archiveable
from .commentable import Commentable
from .error_container import ErrorContainer
from .related_entity_container import RelatedEntityContainer
from .warning_container import WarningContainer
from .lazy_query import LazyQuery
from .metadata_holder import MetadataHolder
from .timing_container import TimingContainer
from typing import Dict, Any

APPEND_UPCOMING_TESTS_STR = 'append_upcoming_tests'


class Session(APIObject, MetadataHolder, ErrorContainer, WarningContainer, Archiveable, Commentable, RelatedEntityContainer, TimingContainer):
    """Client-side handle for a session; each method forwards to ``self.client``."""

    @property
    def ui_url(self) -> str:
        """Return the UI URL for this session, preferring ``logical_id`` over ``id``."""
        return self.client.get_ui_url(f'/sessions/{self.logical_id or self.id}')

    def report_end(self, duration=NOTHING, has_fatal_errors=NOTHING) -> None:
        """Call the ``report_session_end`` API function for this session."""
        kwargs = {'id': self.id, 'duration': duration, 'has_fatal_errors': has_fatal_errors}
        self.client.api.call_function('report_session_end', kwargs)

    def send_keepalive(self) -> None:
        """Call the ``send_keepalive`` API function for this session."""
        self.client.api.call_function('send_keepalive', {'session_id': self.id})

    def report_test_start(self, name, file_name=NOTHING, class_name=NOTHING, test_logical_id=NOTHING, scm=NOTHING,
                          file_hash=NOTHING,
                          scm_revision=NOTHING, scm_dirty=NOTHING, scm_local_branch=NOTHING, scm_remote_branch=NOTHING,
                          is_interactive=NOTHING, variation=NOTHING, metadata=NOTHING, test_index=NOTHING, parameters=NOTHING):
        """Call the ``report_test_start`` API function and return its result.

        ``parameters`` is passed through ``_sanitize_params`` first. When
        ``metadata`` is given it is sent inline if the server's
        ``report_test_start`` endpoint version is at least 2; otherwise it is
        applied afterwards with ``set_metadata_dict`` on the returned object.
        """
        params = {
            'session_id': self.id,
            'name': name,
            'scm': scm,
            'file_hash': file_hash,
            'scm_revision': scm_revision,
            'scm_dirty': scm_dirty,
            'scm_local_branch': scm_local_branch,
            'scm_remote_branch': scm_remote_branch,
            'class_name': class_name,
            'file_name': file_name,
            'is_interactive': is_interactive,
            'variation': variation,
            'test_logical_id': test_logical_id,
            'test_index': test_index,
            'parameters': _sanitize_params(parameters),
        }
        # Bound unconditionally so the check after the call never depends on
        # short-circuit evaluation to avoid an unbound name.
        supports_inline_metadata = False
        if metadata is not NOTHING:
            supports_inline_metadata = (self.client.api.info().endpoints.report_test_start.version >= 2)
            if supports_inline_metadata:
                params['metadata'] = metadata
        returned = self.client.api.call_function(
            'report_test_start', params
        )
        if metadata is not NOTHING and not supports_inline_metadata:
            returned.set_metadata_dict(metadata)
        return returned

    def report_test_distributed(self, test_logical_id) -> None:
        """Call the ``report_test_distributed`` API function for ``test_logical_id``."""
        self.client.api.call_function('report_test_distributed', {'session_id': self.id, 'test_logical_id': test_logical_id})

    def report_upcoming_tests(self, tests) -> None:
        """Call the ``append_upcoming_tests`` API function with ``tests``."""
        self.client.api.call_function(APPEND_UPCOMING_TESTS_STR,
                                      {'tests':tests, 'session_id':self.id}
                                     )

    def report_in_pdb(self) -> None:
        """Call the ``report_in_pdb`` API function for this session."""
        self.client.api.call_function('report_in_pdb', {'session_id': self.id})

    def report_not_in_pdb(self) -> None:
        """Call the ``report_not_in_pdb`` API function for this session."""
        self.client.api.call_function('report_not_in_pdb', {'session_id': self.id})

    def report_interrupted(self) -> None:
        """Call ``report_session_interrupted`` if the server lists that endpoint."""
        if 'report_session_interrupted' in self.client.api.info().endpoints:
            self.client.api.call_function('report_session_interrupted', {'id': self.id})

    def add_subject(self, name: str, product=NOTHING, version=NOTHING, revision=NOTHING):
        """Call the ``add_subject`` API function and return its result."""
        return self.client.api.call_function(
            'add_subject',
            {'session_id': self.id, 'name': name, 'product': product, 'version': version, 'revision': revision})

    def edit_status(self, status):
        """Call the ``edit_session_status`` API function and return its result."""
        return self.client.api.call_function('edit_session_status', {'id': self.id, 'status': status})

    def query_tests(self, include_planned: bool=False) -> LazyQuery:
        """Query the tests of the current session.

        :param include_planned: when true, ``show_planned=true`` is added to
            the query parameters.
        :rtype: A lazy query object
        """
        params = None
        if include_planned:
            params = {'show_planned':'true'}
        return LazyQuery(self.client, f'/rest/sessions/{self.id}/tests', query_params=params)

    def query_errors(self) -> LazyQuery:
        """Query the errors of the current session.

        :rtype: A lazy query object
        """
        return LazyQuery(self.client, '/rest/errors', query_params={'session_id': self.id})

    def toggle_investigated(self):
        """Call the ``toggle_investigated`` API function and return its result."""
        return self.client.api.call_function('toggle_investigated', {'session_id': self.id})

    def get_parent(self) -> None:
        """Return ``None``; this implementation reports no parent object."""
        return None


def _sanitize_params(params: Dict[str, Any], max_length: int=100) -> Dict[str, Any]:
    if params is NOTHING:
        return params
    returned = {}
    for key, value in params.items():
        if value is not None and not isinstance(value, (int, bool, float, str, bytes)):
            value = str(value)
        if isinstance(value, (str, bytes)) and len(value) > max_length:
            remainder = 5
            value = value[:max_length-remainder-3] + '...' + value[-remainder:]
        returned[key] = value
...
reporter_interface.py
Source:reporter_interface.py  
...6    def report_before_debugger(self, exc_info):7        pass8    def report_session_start(self, session):9        pass10    def report_session_end(self, session):11        pass12    def report_file_start(self, filename):13        pass14    def report_file_end(self, filename):15        pass16    def report_collection_start(self):17        pass18    def report_test_collected(self, all_tests, test):19        pass20    def report_collection_end(self, collected):21        pass22    def report_test_start(self, test):23        pass24    def report_test_end(self, test, result):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
