How to use has_fatal_errors method in Slash

Best Python code snippet using slash

test_session_api.py

Source:test_session_api.py Github

copy

Full Screen

1# pylint: disable=unused-argument2import socket3import flux4import pytest5from .utils import (6 raises_conflict,7 raises_not_found,8 without_single_rendered_fields,9 raises_bad_request,10)11def test_start_session(client):12 session = client.report_session_start()13 assert session14def test_session_user_email(client, started_session, testuser_email):15 assert started_session.user_email == testuser_email16def test_start_session_logical_id(client):17 logical_id = "1"18 session = client.report_session_start(logical_id=logical_id)19 assert session20def test_start_session_no_logical_id(client):21 session = client.report_session_start()22 assert session23def test_started_session_hostname(started_session):24 assert started_session.hostname == socket.getfqdn()25def test_started_session_hostname_specify_explicitly(client):26 hostname = "some_hostname"27 session = client.report_session_start(hostname=hostname)28 assert session.hostname == hostname29def test_session_user(started_session, testuser_id):30 assert started_session.user_id == testuser_id31def test_started_session_times(started_session):32 assert started_session.start_time is not None33 assert started_session.end_time is None34@pytest.mark.parametrize("use_duration", [True, False])35def test_end_session(started_session, use_duration):36 duration = 1037 start_time = started_session.start_time38 if use_duration:39 started_session.report_end(duration=duration)40 else:41 flux.current_timeline.sleep(10)42 started_session.report_end()43 started_session.refresh()44 assert started_session.end_time == start_time + duration45def test_end_session_doesnt_exist(nonexistent_session):46 with raises_not_found():47 nonexistent_session.report_end()48def test_end_session_twice(ended_session):49 with raises_conflict():50 ended_session.report_end()51def test_session_status_running(started_session):52 started_session.refresh()53 assert started_session.status == "RUNNING"54def test_session_status_finished(ended_session):55 ended_session.refresh()56 assert ended_session.status == "SUCCESS"57def test_session_status_failure_after_test_end(started_session_with_ended_test):58 (started_session, test) = started_session_with_ended_test59 test.add_failure("failure")60 test.refresh()61 assert test.status == "FAILURE"62 started_session.report_end()63 started_session.refresh()64 assert started_session.status == "FAILURE"65def test_session_status_error_after_test_end(started_session_with_ended_test):66 (started_session, test) = started_session_with_ended_test67 test.add_error("error")68 started_session.report_end()69 started_session.refresh()70 assert started_session.status == "ERROR"71def test_session_status_error_and_failure(started_session_with_ended_test):72 (started_session, test) = started_session_with_ended_test73 test.add_error("error")74 test.add_failure("failure")75 started_session.report_end()76 started_session.refresh()77 assert started_session.status == "ERROR"78def test_session_query_tests(started_session_with_ended_test):79 (started_session, test) = started_session_with_ended_test80 [queried_test] = started_session.query_tests()81 assert queried_test.id == test.id82 test.refresh() # need to update end_time83 assert queried_test == without_single_rendered_fields(test)84@pytest.mark.parametrize("report_end", [True, False])85def test_report_interrupted(started_session, report_end):86 started_session.report_interrupted()87 if report_end:88 started_session.report_end()89 assert started_session.refresh().status.lower() == "interrupted"90def test_report_interrupted_ended_session(started_session):91 started_session.report_end()92 started_session.report_interrupted()93 assert started_session.refresh().status.lower() == "interrupted"94@pytest.mark.parametrize("has_fatal_errors", [True, False])95def test_session_end_with_fatal_errors(started_session, has_fatal_errors):96 started_session.report_end(has_fatal_errors=has_fatal_errors)97 assert started_session.refresh().has_fatal_errors == has_fatal_errors98def test_session_ttl_cant_provide_without_keepalive(client):99 with raises_bad_request():100 client.report_session_start(ttl_seconds=100)101def test_session_ttl_null_by_default(started_session):102 assert started_session.ttl_seconds is None103 assert started_session.delete_at is None104def test_session_ttl_on_creation(ttl_session, ttl_seconds, keepalive_interval):105 assert ttl_session.ttl_seconds == ttl_seconds106 assert (107 ttl_session.delete_at108 == flux.current_timeline.time() + ttl_seconds + keepalive_interval109 )110def test_session_ttl_on_keepalive(client, ttl_session, ttl_seconds, keepalive_interval):111 sleep_seconds = 5112 flux.current_timeline.sleep(sleep_seconds)113 client.api.call.send_keepalive(session_id=ttl_session.id)114 ttl_session.refresh()115 assert ttl_session.ttl_seconds == ttl_seconds116 assert (117 ttl_session.delete_at118 == flux.current_timeline.time() + keepalive_interval + ttl_seconds119 )120 assert ttl_session.delete_at == ttl_session.next_keepalive + ttl_seconds121def test_session_ttl_on_session_end(client, ttl_session, ttl_seconds):122 sleep_seconds = 5123 flux.current_timeline.sleep(sleep_seconds)124 ttl_session.report_end()125 ttl_session.refresh()126 assert ttl_session.ttl_seconds == ttl_seconds127 assert ttl_session.delete_at == flux.current_timeline.time() + ttl_seconds128@pytest.mark.parametrize("child_has_delete_at", [True, False])129def test_parallel_session_discard(130 client, started_parallel_session, real_login, admin_role, child_has_delete_at131):132 parent, child = started_parallel_session133 if child_has_delete_at:134 client.api.call.discard_session(session_id=child.id, grace_period_seconds=100)135 assert parent.delete_at is None136 client.api.call.discard_session(session_id=parent.id, grace_period_seconds=1000)137 assert parent.refresh().delete_at is not None138 assert parent.delete_at == child.refresh().delete_at139@pytest.mark.parametrize("child_has_delete_at", [True, False])140@pytest.mark.parametrize("parent_has_delete_at", [True, False])141def test_parallel_session_preserve(142 client,143 started_parallel_session,144 real_login,145 admin_role,146 child_has_delete_at,147 parent_has_delete_at,148):149 parent, child = started_parallel_session150 if child_has_delete_at:151 client.api.call.discard_session(session_id=child.id, grace_period_seconds=100)152 if parent_has_delete_at:153 client.api.call.discard_session(session_id=parent.id, grace_period_seconds=100)154 assert parent.delete_at is None155 client.api.call.preserve_session(session_id=parent.id)156 assert parent.refresh().delete_at is None157 assert child.refresh().delete_at is None158def test_report_reporting_stopped(client, started_session):159 assert not started_session.reporting_stopped160 client.api.call.report_reporting_stopped(session_id=started_session.id)161 started_session.refresh()162 assert started_session.end_time is None...

Full Screen

Full Screen

session.py

Source:session.py Github

copy

Full Screen

1# pylint: disable=no-member2from sentinels import NOTHING3from .api_object import APIObject4from .archiveable import Archiveable5from .commentable import Commentable6from .error_container import ErrorContainer7from .related_entity_container import RelatedEntityContainer8from .warning_container import WarningContainer9from .lazy_query import LazyQuery10from .metadata_holder import MetadataHolder11from .timing_container import TimingContainer12from typing import Dict, Any13APPEND_UPCOMING_TESTS_STR = 'append_upcoming_tests'14class Session(APIObject, MetadataHolder, ErrorContainer, WarningContainer, Archiveable, Commentable, RelatedEntityContainer, TimingContainer):15 @property16 def ui_url(self) -> str:17 return self.client.get_ui_url(f'/sessions/{self.logical_id or self.id}')18 def report_end(self, duration=NOTHING, has_fatal_errors=NOTHING) -> None:19 kwargs = {'id': self.id, 'duration': duration, 'has_fatal_errors': has_fatal_errors}20 self.client.api.call_function('report_session_end', kwargs)21 def send_keepalive(self) -> None:22 self.client.api.call_function('send_keepalive', {'session_id': self.id})23 def report_test_start(self, name, file_name=NOTHING, class_name=NOTHING, test_logical_id=NOTHING, scm=NOTHING,24 file_hash=NOTHING,25 scm_revision=NOTHING, scm_dirty=NOTHING, scm_local_branch=NOTHING, scm_remote_branch=NOTHING,26 is_interactive=NOTHING, variation=NOTHING, metadata=NOTHING, test_index=NOTHING, parameters=NOTHING):27 params = {28 'session_id': self.id,29 'name': name,30 'scm': scm,31 'file_hash': file_hash,32 'scm_revision': scm_revision,33 'scm_dirty': scm_dirty,34 'scm_local_branch': scm_local_branch,35 'scm_remote_branch': scm_remote_branch,36 'class_name': class_name,37 'file_name': file_name,38 'is_interactive': is_interactive,39 'variation': variation,40 'test_logical_id': test_logical_id,41 'test_index': test_index,42 'parameters': _sanitize_params(parameters),43 }44 if metadata is not NOTHING:45 supports_inline_metadata = (self.client.api.info().endpoints.report_test_start.version >= 2)46 if supports_inline_metadata:47 params['metadata'] = metadata48 returned = self.client.api.call_function(49 'report_test_start', params50 )51 if metadata is not NOTHING and not supports_inline_metadata:52 returned.set_metadata_dict(metadata)53 return returned54 def report_test_distributed(self, test_logical_id) -> None:55 self.client.api.call_function('report_test_distributed', {'session_id': self.id, 'test_logical_id': test_logical_id})56 def report_upcoming_tests(self, tests) -> None:57 self.client.api.call_function(APPEND_UPCOMING_TESTS_STR,58 {'tests':tests, 'session_id':self.id}59 )60 def report_in_pdb(self) -> None:61 self.client.api.call_function('report_in_pdb', {'session_id': self.id})62 def report_not_in_pdb(self) -> None:63 self.client.api.call_function('report_not_in_pdb', {'session_id': self.id})64 def report_interrupted(self) -> None:65 if 'report_session_interrupted' in self.client.api.info().endpoints:66 self.client.api.call_function('report_session_interrupted', {'id': self.id})67 def add_subject(self, name: str, product=NOTHING, version=NOTHING, revision=NOTHING):68 return self.client.api.call_function(69 'add_subject',70 {'session_id': self.id, 'name': name, 'product': product, 'version': version, 'revision': revision})71 def edit_status(self, status):72 return self.client.api.call_function('edit_session_status', {'id': self.id, 'status': status})73 def query_tests(self, include_planned: bool=False) -> LazyQuery:74 """Queries tests of the current session75 :rtype: A lazy query object76 """77 params = None78 if include_planned:79 params = {'show_planned':'true'}80 return LazyQuery(self.client, f'/rest/sessions/{self.id}/tests', query_params=params)81 def query_errors(self) -> LazyQuery:82 """Queries tests of the current session83 :rtype: A lazy query object84 """85 return LazyQuery(self.client, '/rest/errors', query_params={'session_id': self.id})86 def toggle_investigated(self):87 return self.client.api.call_function('toggle_investigated', {'session_id': self.id})88 def get_parent(self) -> None:89 return None90def _sanitize_params(params: Dict[str, Any], max_length: int=100) -> Dict[str, Any]:91 if params is NOTHING:92 return params93 returned = {}94 for key, value in params.items():95 if value is not None and not isinstance(value, (int, bool, float, str, bytes)):96 value = str(value)97 if isinstance(value, (str, bytes)) and len(value) > max_length:98 remainder = 599 value = value[:max_length-remainder-3] + '...' + value[-remainder:]100 returned[key] = value...

Full Screen

Full Screen

d378ba47afba_add_fatal_errors_support.py

Source:d378ba47afba_add_fatal_errors_support.py Github

copy

Full Screen

1"""Add fatal errors support2Revision ID: d378ba47afba3Revises: f276be9ca1be4Create Date: 2017-07-02 14:54:21.6041975"""6# revision identifiers, used by Alembic.7revision = 'd378ba47afba'8down_revision = 'f276be9ca1be'9from alembic import op10import sqlalchemy as sa11def upgrade():12 # ### commands auto generated by Alembic - please adjust! ###13 op.add_column('session', sa.Column('has_fatal_errors', sa.Boolean(), nullable=True))14 # ### end Alembic commands ###15def downgrade():16 # ### commands auto generated by Alembic - please adjust! ###17 op.drop_column('session', 'has_fatal_errors')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful