How to use the report_session_end method in Slash

Best Python code snippets using Slash

audit_bash.py

Source:audit_bash.py Github

copy

Full Screen

...144 print "\n[STRICT AUDIT]report session start failed ![%s]" % (e,)145 os.kill(os.getpid(), signal.SIGKILL)146 else:147 pass148 def report_session_end(self):149 if AUDIT_MODEL == "no-audit":150 return151 try:152 param = {}153 param["session_id"] = self.session_id154 param["report_key"] = REPORT_KEY155 tms = datetime.datetime.now()156 time_now = tms.strftime('%Y-%m-%d %H:%M:%S')157 param["time"] = time_now158 r = requests.post("%s%s" % (REPORT_ADDR, URL_SESSION_END), data=param, timeout=TIMEOUT, verify=CA_PATH)159 if r.status_code == 200:160 return161 else:162 if AUDIT_MODEL == 'strict':163 print "\n[STRICT AUDIT]Session End report Failed! [HTTP ERROR][code:%s][body:%s]" % (164 r.status_code, r.text)165 else:166 pass167 except Exception, e:168 if AUDIT_MODEL == 'strict':169 print "\n[STRICT AUDIT] report session end failed ![%s]" % (e,)170 else:171 pass172 def report_command(self, command):173 if AUDIT_MODEL == "no-audit" or command == "":174 return175 try:176 param = {}177 param["session_id"] = self.session_id178 param["user"] = self.username179 param["role"] = self.role180 param["hostname"] = self.hostname181 param["command"] = command182 param["report_key"] = REPORT_KEY183 tms = datetime.datetime.now()184 time_now = tms.strftime('%Y-%m-%d %H:%M:%S')185 param["time"] = time_now186 r = requests.post("%s%s" % (REPORT_ADDR, URL_COMMAND), param, timeout=TIMEOUT, verify=CA_PATH)187 if r.status_code == 200:188 pass189 else:190 if AUDIT_MODEL == 'strict':191 print "\n[STRICT AUDIT]Command Report Failed! 
[HTTP ERROR][code:%s][body:%s]" % (192 r.status_code, r.text)193 os.kill(os.getpid(), signal.SIGINT)194 else:195 pass196 except Exception, e:197 if AUDIT_MODEL == 'strict':198 print "\n[STRICT AUDIT] report command failed ![%s]" % (e,)199 os.kill(os.getpid(), signal.SIGINT)200 else:201 pass202 def start(self):203 stdin_origin_attr = termios.tcgetattr(sys.stdin.fileno())204 data = ""205 input_mode = False206 pre_timestamp = time.time()207 log_file_f, log_time_f = pty.get_log()208 # signal.signal(signal.SIGCHLD, signal.SIG_IGN)209 try:210 if self.pty_pid == 0:211 os.execlp("/bin/bash", "-i")212 os.kill(os.getpid(), signal.SIGINT)213 else:214 self.set_window_size()215 signal.signal(signal.SIGWINCH, self.set_window_size)216 stdin_fd = sys.stdin.fileno()217 stdout_fd = sys.stdout.fileno()218 stdout_attr = termios.tcgetattr(stdout_fd)219 termios.tcsetattr(self.pty_fd, termios.TCSADRAIN, stdout_attr)220 flag = fcntl.fcntl(sys.stdin, fcntl.F_GETFL, 0)221 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flag | os.O_NONBLOCK)222 tty.setraw(stdin_fd)223 tty.setcbreak(stdin_fd)224 ts = threading.Thread(target=self.report_session_start, args=())225 ts.setDaemon(True)226 ts.start()227 epoll = select.epoll()228 epoll.register(stdin_fd, select.POLLIN)229 epoll.register(self.pty_fd, select.POLLIN | select.POLLOUT)230 while self.pty_is_alived():231 events = epoll.poll(6)232 if not events:233 continue234 rs = []235 ws = []236 for fd, event in events:237 if event & select.EPOLLIN:238 rs.append(fd)239 if event & select.EPOLLOUT:240 ws.append(fd)241 if self.pty_fd in rs:242 try:243 output = os.read(self.pty_fd, BUFSIZ)244 os.write(stdout_fd, output)245 now_timestamp = time.time()246 log_time_f.write('%s %s\n' % (round(now_timestamp - pre_timestamp, 4), len(output)))247 log_time_f.flush()248 log_file_f.write(output)249 log_file_f.flush()250 pre_timestamp = now_timestamp251 log_file_f.flush()252 self.vim_data += output253 if input_mode:254 data += output255 couple = 
self.ps1_pattern.findall(output)256 if len(couple) > 0:257 couple_hit = couple[-1]258 couple_sp_l = couple_hit.split("@")259 if len(couple_sp_l) == 2:260 self.role = couple_sp_l[0]261 self.hostname = couple_sp_l[1]262 except OSError:263 pass264 if stdin_fd in rs:265 inpt = os.read(stdin_fd, BUFSIZ)266 os.write(self.pty_fd, inpt)267 input_mode = True268 if self.is_output(str(inpt)):269 # 如果len(str(x)) > 1 说明是复制输入的270 if len(str(inpt)) > 1:271 data = inpt272 match = self.vim_end_pattern.findall(self.vim_data)273 if match:274 if self.vim_flag or len(match) == 2:275 self.vim_flag = False276 else:277 self.vim_flag = True278 elif not self.vim_flag:279 self.vim_flag = False280 data = self.deal_command(data)[0:200]281 if data is not None:282 # TtyLog(log=log, datetime=datetime.datetime.now(), cmd=data).save()283 # TODO: to record command284 # ffff.write("%s %s %s\n" % (self.role, self.hostname, data))285 tc = threading.Thread(target=self.report_command, args=(data,))286 tc.setDaemon(True)287 tc.start()288 data = ''289 self.vim_data = ''290 input_mode = False291 time.sleep(0.01)292 finally:293 termios.tcsetattr(stdin_fd, termios.TCSAFLUSH, stdin_origin_attr)294 log_file_f.write('End time is %s' % datetime.datetime.now())295 log_file_f.close()296 log_time_f.close()297 self.report_session_end()298 def get_log(self):299 """300 Logging user command and output.301 """302 tty_log_dir = os.path.join(LOG_DIR, 'tty')303 date_today = datetime.datetime.now()304 date_start = date_today.strftime('%Y%m%d')305 time_start = date_today.strftime('%H%M%S')306 today_connect_log_dir = os.path.join(tty_log_dir, date_start)307 log_file_path = os.path.join(today_connect_log_dir,308 '%s_%s_%s_%s' % (self.username, self.cs_name, date_start, time_start))309 self.log_file = log_file_path310 try:311 mkdir(os.path.dirname(today_connect_log_dir), mode=777)...

Full Screen

Full Screen

session.py

Source:session.py Github

copy

Full Screen

1# pylint: disable=no-member2from sentinels import NOTHING3from .api_object import APIObject4from .archiveable import Archiveable5from .commentable import Commentable6from .error_container import ErrorContainer7from .related_entity_container import RelatedEntityContainer8from .warning_container import WarningContainer9from .lazy_query import LazyQuery10from .metadata_holder import MetadataHolder11from .timing_container import TimingContainer12from typing import Dict, Any13APPEND_UPCOMING_TESTS_STR = 'append_upcoming_tests'14class Session(APIObject, MetadataHolder, ErrorContainer, WarningContainer, Archiveable, Commentable, RelatedEntityContainer, TimingContainer):15 @property16 def ui_url(self) -> str:17 return self.client.get_ui_url(f'/sessions/{self.logical_id or self.id}')18 def report_end(self, duration=NOTHING, has_fatal_errors=NOTHING) -> None:19 kwargs = {'id': self.id, 'duration': duration, 'has_fatal_errors': has_fatal_errors}20 self.client.api.call_function('report_session_end', kwargs)21 def send_keepalive(self) -> None:22 self.client.api.call_function('send_keepalive', {'session_id': self.id})23 def report_test_start(self, name, file_name=NOTHING, class_name=NOTHING, test_logical_id=NOTHING, scm=NOTHING,24 file_hash=NOTHING,25 scm_revision=NOTHING, scm_dirty=NOTHING, scm_local_branch=NOTHING, scm_remote_branch=NOTHING,26 is_interactive=NOTHING, variation=NOTHING, metadata=NOTHING, test_index=NOTHING, parameters=NOTHING):27 params = {28 'session_id': self.id,29 'name': name,30 'scm': scm,31 'file_hash': file_hash,32 'scm_revision': scm_revision,33 'scm_dirty': scm_dirty,34 'scm_local_branch': scm_local_branch,35 'scm_remote_branch': scm_remote_branch,36 'class_name': class_name,37 'file_name': file_name,38 'is_interactive': is_interactive,39 'variation': variation,40 'test_logical_id': test_logical_id,41 'test_index': test_index,42 'parameters': _sanitize_params(parameters),43 }44 if metadata is not NOTHING:45 supports_inline_metadata = 
(self.client.api.info().endpoints.report_test_start.version >= 2)46 if supports_inline_metadata:47 params['metadata'] = metadata48 returned = self.client.api.call_function(49 'report_test_start', params50 )51 if metadata is not NOTHING and not supports_inline_metadata:52 returned.set_metadata_dict(metadata)53 return returned54 def report_test_distributed(self, test_logical_id) -> None:55 self.client.api.call_function('report_test_distributed', {'session_id': self.id, 'test_logical_id': test_logical_id})56 def report_upcoming_tests(self, tests) -> None:57 self.client.api.call_function(APPEND_UPCOMING_TESTS_STR,58 {'tests':tests, 'session_id':self.id}59 )60 def report_in_pdb(self) -> None:61 self.client.api.call_function('report_in_pdb', {'session_id': self.id})62 def report_not_in_pdb(self) -> None:63 self.client.api.call_function('report_not_in_pdb', {'session_id': self.id})64 def report_interrupted(self) -> None:65 if 'report_session_interrupted' in self.client.api.info().endpoints:66 self.client.api.call_function('report_session_interrupted', {'id': self.id})67 def add_subject(self, name: str, product=NOTHING, version=NOTHING, revision=NOTHING):68 return self.client.api.call_function(69 'add_subject',70 {'session_id': self.id, 'name': name, 'product': product, 'version': version, 'revision': revision})71 def edit_status(self, status):72 return self.client.api.call_function('edit_session_status', {'id': self.id, 'status': status})73 def query_tests(self, include_planned: bool=False) -> LazyQuery:74 """Queries tests of the current session75 :rtype: A lazy query object76 """77 params = None78 if include_planned:79 params = {'show_planned':'true'}80 return LazyQuery(self.client, f'/rest/sessions/{self.id}/tests', query_params=params)81 def query_errors(self) -> LazyQuery:82 """Queries tests of the current session83 :rtype: A lazy query object84 """85 return LazyQuery(self.client, '/rest/errors', query_params={'session_id': self.id})86 def toggle_investigated(self):87 
return self.client.api.call_function('toggle_investigated', {'session_id': self.id})88 def get_parent(self) -> None:89 return None90def _sanitize_params(params: Dict[str, Any], max_length: int=100) -> Dict[str, Any]:91 if params is NOTHING:92 return params93 returned = {}94 for key, value in params.items():95 if value is not None and not isinstance(value, (int, bool, float, str, bytes)):96 value = str(value)97 if isinstance(value, (str, bytes)) and len(value) > max_length:98 remainder = 599 value = value[:max_length-remainder-3] + '...' + value[-remainder:]100 returned[key] = value...

Full Screen

Full Screen

reporter_interface.py

Source:reporter_interface.py Github

copy

Full Screen

...6 def report_before_debugger(self, exc_info):7 pass8 def report_session_start(self, session):9 pass10 def report_session_end(self, session):11 pass12 def report_file_start(self, filename):13 pass14 def report_file_end(self, filename):15 pass16 def report_collection_start(self):17 pass18 def report_test_collected(self, all_tests, test):19 pass20 def report_collection_end(self, collected):21 pass22 def report_test_start(self, test):23 pass24 def report_test_end(self, test, result):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful