Best Python code snippet using localstack_python
rest.py
Source:rest.py  
1# pylint: disable=no-member2import math3import os4import requests5from flask import Blueprint, abort, request, jsonify, current_app, Response6from flask_restful import Api, reqparse7from sqlalchemy.orm.exc import NoResultFound8from sqlalchemy import func, or_9from sqlalchemy.orm import aliased10from flask_simple_api import error_abort11from flask_security import current_user12from .. import models13from ..models import Error, Session, Test, User, Subject, db, UserStarredTests14from .. import activity15from ..utils.identification import parse_test_id, parse_session_id16from ..utils.users import has_role17from ..utils.rest import ModelResource18from ..filters import filter_by_statuses19from ..search import get_orm_query_from_search_string20blueprint = Blueprint('rest', __name__, url_prefix='/rest')21rest = Api(blueprint)22def _resource(*args, **kwargs):23    def decorator(resource):24        rest.add_resource(resource, *args, **kwargs)25        return resource26    return decorator27##########################################################################28session_parser = reqparse.RequestParser()29session_parser.add_argument('user_id', type=int, default=None)30session_parser.add_argument('subject_name', type=str, default=None)31session_parser.add_argument('search', type=str, default=None)32session_parser.add_argument('parent_logical_id', type=str, default=None)33session_parser.add_argument('id', type=str, default=None)34@_resource('/sessions', '/sessions/<object_id>')35class SessionResource(ModelResource):36    MODEL = Session37    DEFAULT_SORT = (Session.start_time.desc(),)38    def _get_object_by_id(self, object_id):39        return _get_object_by_id_or_logical_id(self.MODEL, object_id)40    def _get_iterator(self):41        args = session_parser.parse_args()42        if args.id is not None:43            return _get_query_by_id_or_logical_id(self.MODEL, args.id)44        if args.search:45            returned = get_orm_query_from_search_string('session', args.search, abort_on_syntax_error=True)46        else:47            returned = super(SessionResource, self)._get_iterator()48        if args.parent_logical_id is not None:49            returned =  returned.filter(Session.parent_logical_id == args.parent_logical_id)50        else:51            returned = returned.filter(Session.parent_logical_id == None)52        if args.subject_name is not None:53            returned = (54                returned55                .join(Session.subject_instances)56                .join(Subject)57                .filter(Subject.name == args.subject_name))58        if args.user_id is not None:59            returned = returned.filter(Session.user_id == args.user_id)60        returned = filter_by_statuses(returned, self.MODEL)61        return returned62test_query_parser = reqparse.RequestParser()63test_query_parser.add_argument('session_id', default=None)64test_query_parser.add_argument('info_id', type=int, default=None)65test_query_parser.add_argument('search', type=str, default=None)66test_query_parser.add_argument('after_index', type=int, default=None)67test_query_parser.add_argument('before_index', type=int, default=None)68test_query_parser.add_argument('id', type=str, default=None)69test_query_parser.add_argument('starred', type=bool, default=False)70test_query_parser.add_argument('user_id', type=int, default=None)71@_resource('/tests', '/tests/<object_id>', '/sessions/<session_id>/tests')72class TestResource(ModelResource):73    MODEL = Test74    DEFAULT_SORT = (Test.start_time.desc(),)75    SORTABLE_FIELDS = INVERSE_SORTS = ['start_time', 'test_index']76    from .filter_configs import TEST_FILTERS as FILTER_CONFIG77    def _get_object_by_id(self, object_id):78        return _get_object_by_id_or_logical_id(self.MODEL, object_id)79    def _render_many(self, objects, *, in_collection: bool):80        args = test_query_parser.parse_args()81        user_id = args.user_id or getattr(current_user, 'id', None)82        rendered_tests = super()._render_many(objects, in_collection=in_collection)83        if not rendered_tests or not current_user.is_authenticated:84            return rendered_tests85        if not in_collection:86            rendered_tests['is_starred'] = db.session.query(db.session.query(UserStarredTests).filter(UserStarredTests.user_id == user_id, UserStarredTests.test_id==rendered_tests['id']).exists()).scalar()87        else:88            rendered_test_ids = [test['id'] for test in rendered_tests['tests']]89            starred_test_ids = set([test_id for (test_id, ) in db.session.query(Test.id).join(UserStarredTests).filter(UserStarredTests.user_id == user_id, Test.id.in_(rendered_test_ids)).all()])90            for test in rendered_tests['tests']:91                test['is_starred'] = test['id'] in starred_test_ids92        return rendered_tests93    def _get_iterator(self):94        args = test_query_parser.parse_args()95        user_id = user_id = args.user_id or getattr(current_user, 'id', None)96        if args.id is not None:97            return _get_query_by_id_or_logical_id(self.MODEL, args.id)98        if args.starred:99            return Test.query.join(UserStarredTests).filter(UserStarredTests.user_id == user_id).order_by(UserStarredTests.star_creation_time.desc()).all()100        if args.session_id is None:101            args.session_id = request.view_args.get('session_id')102        if args.search:103            returned = get_orm_query_from_search_string('test', args.search, abort_on_syntax_error=True)104        else:105            returned = super(TestResource, self)._get_iterator().join(Session, Session.id == Test.session_id)106        # get session107        if args.session_id is not None:108            returned = self._filter_by_session_id(returned, args.session_id)109        if args.info_id is not None:110            returned = returned.filter(Test.test_info_id == args.info_id)111        returned = filter_by_statuses(returned, Test)112        if args.session_id is not None:113            if args.after_index is not None:114                returned = returned.filter(self.MODEL.test_index > args.after_index).order_by(self.MODEL.test_index.asc()).limit(1).all()115            elif args.before_index is not None:116                returned = returned.filter(self.MODEL.test_index < args.before_index).order_by(self.MODEL.test_index.desc()).limit(1).all()117        return returned118    def _filter_by_session_id(self, query, session_id):119        try:120            int(session_id)121        except ValueError:122            id_field = lambda model: model.logical_id123        else:124            id_field = lambda model: model.id125        session_aliased = aliased(Session)126        children = (127            db.session.query(id_field(Session))128            .filter(129                db.session.query(session_aliased.id)130                .filter(id_field(session_aliased) == session_id)131                .filter(session_aliased.logical_id == Session.parent_logical_id)132                .exists()133            )134            .all()135        )136        children_ids = [row[0] for row in children]137        criterion = id_field(Session) == session_id138        if children_ids:139            criterion |= id_field(Session).in_(children_ids)140        returned = query.filter(criterion)141        return returned142    def _paginate(self, query, metadata):143        count_pages = bool(request.args.get('session_id'))144        if count_pages:145            num_objects = query.count()146        else:147            num_objects = None148        returned = super()._paginate(query, metadata)149        if count_pages:150            metadata['num_pages'] = math.ceil(num_objects / metadata['page_size']) or 1151        return returned152@_resource('/test_infos/<object_id>')153class TestInfoResource(ModelResource):154    MODEL = models.TestInformation155@_resource('/subjects', '/subjects/<object_id>')156class SubjectResource(ModelResource):157    MODEL = models.Subject158    ONLY_FIELDS = ['id', 'name', 'last_activity']159    SORTABLE_FIELDS = ['last_activity', 'name']160    INVERSE_SORTS = ['last_activity']161    def _get_object_by_id(self, object_id):162        try:163            object_id = int(object_id)164        except ValueError:165            try:166                return self.MODEL.query.filter(models.Subject.name == object_id).one()167            except NoResultFound:168                abort(requests.codes.not_found)169        else:170            return self.MODEL.query.get_or_404(object_id)171def _get_query_by_id_or_logical_id(model, object_id):172    query_filter = model.logical_id == object_id173    try:174        numeric_object_id = int(object_id)175    except ValueError:176        pass177    else:178        query_filter = (model.id == numeric_object_id) | query_filter179    return model.query.filter(query_filter)180def _get_object_by_id_or_logical_id(model, object_id):181    returned = _get_query_by_id_or_logical_id(model, object_id).first()182    if returned is None:183        abort(requests.codes.not_found) # pylint: disable=no-member184    return returned185session_test_user_query_parser = reqparse.RequestParser()186session_test_user_query_parser.add_argument('session_id', type=int, default=None)187session_test_user_query_parser.add_argument('test_id', type=int, default=None)188session_test_user_query_parser.add_argument('user_id', type=int, default=None)189session_test_query_parser = reqparse.RequestParser()190session_test_query_parser.add_argument('session_id', type=int, default=None)191session_test_query_parser.add_argument('test_id', type=int, default=None)192errors_query_parser = reqparse.RequestParser()193errors_query_parser.add_argument('session_id', default=None)194errors_query_parser.add_argument('test_id', default=None)195errors_query_parser.add_argument('interruptions', default=False, type=bool)196@_resource('/warnings', '/warnings/<int:object_id>')197class WarningsResource(ModelResource):198    MODEL = models.Warning199    DEFAULT_SORT = (models.Warning.timestamp.asc(),)200    def _get_iterator(self):201        args = session_test_user_query_parser.parse_args()202        returned = self.MODEL.query.filter_by(test_id=args.test_id, session_id=args.session_id)203        return returned204@_resource('/errors', '/errors/<int:object_id>')205class ErrorResource(ModelResource):206    MODEL = Error207    DEFAULT_SORT = (Error.timestamp.asc(),)208    def _get_iterator(self):209        args = errors_query_parser.parse_args()210        if args.session_id is not None:211            session = _get_object_by_id_or_logical_id(Session, args.session_id)212            # We want to query only the session's own errors if it is either a non-parllel session or a child session213            if session.parent_logical_id is not None or not session.is_parent_session:214                query = db.session.query(Error).filter(Error.session_id == session.id)215            else:216                query = db.session.query(Error).join(Session).filter(or_(Session.id == session.id, Session.parent_logical_id == session.logical_id))217        elif args.test_id is not None:218            query = Error.query.filter_by(test_id=parse_test_id(args.test_id))219        else:220            abort(requests.codes.bad_request)221        if args.interruptions:222            query = query.filter_by(is_interruption=True)223        else:224            query = query.filter((self.MODEL.is_interruption == False) | (self.MODEL.is_interruption == None)) # pylint: disable=singleton-comparison225        return query226@blueprint.route('/tracebacks/<uuid>')227def get_traceback(uuid):228    if not current_app.config['DEBUG'] and not current_app.config['TESTING']:229        abort(requests.codes.not_found)230    path = _get_traceback_path(uuid)231    if not os.path.isfile(path):232        abort(requests.codes.not_found)233    def sender():234        with open(path, 'rb') as f:235            while True:236                buff = f.read(4096)237                if not buff:238                    break239                yield buff240    return Response(sender(), headers={'Content-Encoding': 'gzip', 'Content-Type': 'application/json'})241def _get_traceback_path(uuid):242    return os.path.join(current_app.config['TRACEBACK_DIR'], uuid[:2], uuid + '.gz')243@_resource('/users', '/users/<object_id>')244class UserResource(ModelResource):245    ONLY_FIELDS = ['id', 'email', 'last_activity']246    SORTABLE_FIELDS = ['last_activity', 'email', 'first_name', 'last_name']247    INVERSE_SORTS = ['last_activity']248    MODEL = User249    def _get_iterator(self):250        returned = super()._get_iterator()251        if request.args.get('current_user'):252            if not current_user.is_authenticated:253                return []254            object_id = current_user.id255            returned = returned.filter(self.MODEL.id == object_id)256        filter = request.args.get('filter')257        if filter:258            filter = filter.lower()259            returned = returned.filter(func.lower(User.first_name).contains(filter) | func.lower(User.last_name).contains(filter) | func.lower(User.email).contains(filter))260        return returned261    def _get_object_by_id(self, object_id):262        try:263            object_id = int(object_id)264        except ValueError:265            try:266                object_id = User.query.filter_by(email=object_id).one().id267            except NoResultFound:268                abort(requests.codes.not_found)269        return User.query.get_or_404(int(object_id))270@_resource('/comments', '/comments/<object_id>', methods=['get', 'delete', 'put'])271class CommentResource(ModelResource):272    MODEL = models.Comment273    DEFAULT_SORT = (models.Comment.timestamp.asc(),)274    def _get_iterator(self):275        args = session_test_query_parser.parse_args()276        if not ((args.session_id is not None) ^ (args.test_id is not None)): # pylint: disable=superfluous-parens277            error_abort('Either test_id or session_id must be passed to the query')278        return models.Comment.query.filter_by(session_id=args.session_id, test_id=args.test_id)279    def delete(self, object_id=None):280        if object_id is None:281            error_abort('Not implemented', code=requests.codes.not_implemented)282        comment = models.Comment.query.get_or_404(object_id)283        if comment.session_id is not None:284            obj = models.Session.query.get(comment.session_id)285        else:286            obj = models.Test.query.get(comment.test_id)287        if comment.user_id != current_user.id:288            error_abort('Not allowed to delete comment', code=requests.codes.forbidden)289        obj.num_comments = type(obj).num_comments - 1290        models.db.session.add(obj)291        models.db.session.delete(comment)292        models.db.session.commit()293    def put(self, object_id=None):294        if object_id is None:295            error_abort('Not implemented', code=requests.codes.not_implemented)296        comment = models.Comment.query.get_or_404(object_id)297        if comment.user_id != current_user.id:298            error_abort('Not allowed to delete comment', code=requests.codes.forbidden)299        comment.comment = request.get_json().get('comment', {}).get('comment')300        comment.edited = True301        models.db.session.add(comment)302        models.db.session.commit()303        return jsonify({'comment': self._render_single(comment, in_collection=False)})304related_entity_parser = reqparse.RequestParser()305related_entity_parser.add_argument('session_id', default=None, type=int)306related_entity_parser.add_argument('test_id', default=None, type=int)307@_resource('/entities', '/entities/<int:object_id>')308class RelatedEntityResource(ModelResource):309    MODEL = models.Entity310    def _get_iterator(self):311        args = related_entity_parser.parse_args()312        if not ((args.session_id is None) ^ (args.test_id is None)):313            error_abort('Either test_id or session_id must be provided')314        if args.session_id is not None:315            return self._get_all_children_entities(args.session_id)316        elif args.test_id is not None:317            return models.Entity.query.join(models.test_entity).filter(models.test_entity.c.test_id == args.test_id)318        else:319            raise NotImplementedError() # pragma: no cover320    def _get_all_children_entities(self, session_id):321        query = models.Entity.query.join(models.session_entity).join(models.Session)322        session_alias = aliased(models.Session)323        criterion = models.Session.id == session_id324        criterion |= (db.session.query(session_alias)325                      .filter(session_alias.id == session_id)326                      .filter(session_alias.logical_id is not None)327                      .filter(session_alias.logical_id == models.Session.parent_logical_id).exists().correlate(models.Session))328        return query.filter(criterion)329@_resource('/migrations', '/migrations/<object_id>')330class MigrationsResource(ModelResource):331    MODEL = models.BackgroundMigration332    DEFAULT_SORT = (models.BackgroundMigration.started_time.desc(),)333    ONLY_FIELDS = [334        'id',335        'name',336        'started',337        'started_time',338        'finished',339        'finished_time',340        'total_num_objects',341        'remaining_num_objects'342    ]343@_resource('/cases', '/cases/<object_id>')344class CaseResource(ModelResource):345    MODEL = models.TestInformation346    DEFAULT_SORT = (models.TestInformation.name, models.TestInformation.file_name, models.TestInformation.class_name)347    def _get_iterator(self):348        search = request.args.get('search')349        if search:350            returned = get_orm_query_from_search_string('case', search, abort_on_syntax_error=True)351        else:352            returned = super()._get_iterator()353        returned = returned.filter(~self.MODEL.file_name.like('/%'))354        return returned355@_resource('/replications', '/replications/<object_id>')356class ReplicationsResource(ModelResource):357    MODEL = models.Replication358    ONLY_FIELDS = [359        'id',360        'paused',361        'avg_per_second',362        'backlog_remaining',363        'last_replicated_timestamp',364        'last_error',365        'service_type',366        'username',367        'url',368        'index_name'369    ]370    def _render_many(self, objects, *, in_collection: bool):371        returned = super()._render_many(objects, in_collection=in_collection)372        [latest_timestamp] = models.db.session.query(func.max(models.Test.updated_at)).one()373        if latest_timestamp:374            latest_timestamp = latest_timestamp.timestamp()375        if in_collection:376            collection = returned.get('replications', [])377        else:378            collection = [returned]379        for replication in collection:380            last_replicated = replication['last_replicated_timestamp']381            if not latest_timestamp or not last_replicated:382                lag = None383            else:384                lag = latest_timestamp - last_replicated385            replication['lag_seconds'] = lag386        return returned387    def put(self, object_id=None):388        if object_id is None:389            error_abort('Not implemented', code=requests.codes.not_implemented)390        if not has_role(current_user, 'admin'):391            error_abort('Forbidden', code=requests.codes.forbidden)392        replication = models.Replication.query.get_or_404(object_id)393        request_json = request.get_json().get("replication", {})394        for field_name in {'username', 'url', 'password'}:395            value = request_json.get(field_name)396            if value is not None:397                setattr(replication, field_name, value)398        models.db.session.commit()399        return jsonify({'replication': self._render_single(replication, in_collection=False)})400@blueprint.route('/admin_alerts')401def get_admin_alerts():402    return jsonify({403        'admin_alerts': [404            {405                'id': index,406                "message": alert,407            }408            for index, alert in enumerate(_iter_alerts(), 1)409        ]410    })411def _iter_alerts():412    if models.Replication.query.filter(models.Replication.last_error != None).count():...test_query_parser.py
Source:test_query_parser.py  
1import unittest2from query_parser import QueryParser3class TestQueryParser(unittest.TestCase):4    test_query_parser = QueryParser()5    def test_split_query_into_arguments(self):6        query = "search user field value"7        query_args = QueryParser.split_query_into_arguments(query)8        self.assertEqual(query_args[0], "search")9        self.assertEqual(query_args[1], "user")10        self.assertEqual(query_args[2], "field")11        self.assertEqual(query_args[3], "value")12    def test_type_of_query(self):13        query_args = ["search", "user", "field", "value"]14        query_type = self.test_query_parser.get_type_of_query(query_args)15        self.assertEqual(query_type, QueryParser.QueryType.SEARCH_QUERY)16        query_args = ["list", "user"]17        query_type = self.test_query_parser.get_type_of_query(query_args)18        self.assertEqual(query_type, QueryParser.QueryType.LIST_QUERY)19        query_args = ["help"]20        query_type = self.test_query_parser.get_type_of_query(query_args)21        self.assertEqual(query_type, QueryParser.QueryType.HELP_QUERY)22if __name__ == '__main__':...__main__.py
Source:__main__.py  
1import unittest2from test_parse_date import *3from test_parsing import *4from test_query_lexer import *5from test_query_parser import *6from test_query import *7from test_textutils import *8from test_todos import *9if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
