Best Python code snippet using slash
xcap_manager.py
Source:xcap_manager.py  
1#!/usr/bin/python2"""3Script for testing XCAPManager. Initializes middleware using a configuration4file named "test-config" in the current directory. Uses the default account to5initialize an XCAPManager. Type "help" for a list of implemented commands. The6commands have the same arguments as the ones in XCAPManager. Arguments are7specified using JSON (don't use spaces or quote each argument):8add_contact 'contact={"name": "My buddy", "uri": "sip:buddy@example.org", "group": "Buddies"}'9or10add_contact contact={"name":"My buddy","uri":"sip:buddy@example.org","group":"Buddies"}11To exit, type "exit".12"""13import cjson14import os15import shlex16import string17from cmd import Cmd18from itertools import count, takewhile19from StringIO import StringIO20from application.notification import NotificationCenter, IObserver21from application.python.decorator import decorator, preserve_signature22from application.python.util import Null, Singleton23from threading import Event24from zope.interface import implements25from sipsimple.account import AccountManager26from sipsimple.application import SIPApplication27from sipsimple.configuration.backend.file import FileBackend28from sipsimple.util import Timestamp, run_in_green_thread29from sipsimple.xcap import All, CatchAllCondition, Class, Contact, DeviceID, DialoginfoPolicy, DomainCondition, DomainException, OccurenceID, OfflineStatus, PresencePolicy, ServiceURI, ServiceURIScheme, UserException, XCAPManager30class XCAPApplication(object):31    __metaclass__ = Singleton32    implements(IObserver)33    def __init__(self):34        self.application = SIPApplication()35        self.xcap_manager = None36        self.quit_event = Event()37        notification_center = NotificationCenter()38        notification_center.add_observer(self, sender=self.application)39    def start(self):40        self.application.start(FileBackend(os.path.realpath('test-config')))41    @run_in_green_thread42    def stop(self):43        
self.xcap_manager.stop()44        self.application.stop()45    def handle_notification(self, notification):46        handler = getattr(self, '_NH_%s' % notification.name, Null)47        handler(notification)48    @run_in_green_thread49    def _NH_SIPApplicationDidStart(self, notification):50        account_manager = AccountManager()51        self.xcap_manager = XCAPManager(account_manager.default_account)52        notification_center = NotificationCenter()53        notification_center.add_observer(self, sender=self.xcap_manager)54        self.xcap_manager.load(os.path.realpath('xcap-cache'))55        self.xcap_manager.start()56    def _NH_SIPApplicationDidEnd(self, notification):57        self.quit_event.set()58    def _NH_XCAPManagerDidChangeState(self, notification):59        print 'XCAP Manager state %s -> %s' % (notification.data.prev_state, notification.data.state)60    def _NH_XCAPManagerWillStart(self, notification):61        print 'XCAP Manager will start'62    def _NH_XCAPManagerDidStart(self, notification):63        print 'XCAP Manager did start'64    def _NH_XCAPManagerDidDiscoverServerCapabilities(self, notification):65        print '  contact list supported: %s' % notification.data.contactlist_supported66        print '  presence policies supported: %s' % notification.data.presence_policies_supported67        print '  dialoginfo policies supported: %s' % notification.data.dialoginfo_policies_supported68        print '  status icon supported: %s' % notification.data.status_icon_supported69        print '  offline status supported: %s' % notification.data.offline_status_supported70    def _NH_XCAPManagerWillEnd(self, notification):71        print 'XCAP Manager will end'72    def _NH_XCAPManagerDidEnd(self, notification):73        print 'XCAP Manager did end'74    def _NH_XCAPManagerDidReloadData(self, notification):75        print 'XCAP Manager reloaded data:'76        groups = dict.fromkeys(notification.data.groups)77        for group in groups:78       
     groups[group] = []79        for contact in notification.data.contacts:80            if contact.group is not None:81                groups[contact.group].append(contact)82        print 'Buddies:'83        for group, contacts in groups.iteritems():84            print '  %s:' % group85            for contact in contacts:86                if contact.name:87                    print '    %s <%s>' % (contact.name, contact.uri)88                else:89                    print '    %s' % contact.uri90                print '      subscribe-to-presence    = %s' % contact.subscribe_to_presence91                print '      subscribe-to-dialoginfo  = %s' % contact.subscribe_to_dialoginfo92                print '      presence-policies        = %s' % (', '.join(p.id for p in contact.presence_policies) if contact.presence_policies else None)93                print '      dialoginfo-policies      = %s' % (', '.join(p.id for p in contact.dialoginfo_policies) if contact.dialoginfo_policies else None)94                for attr, value in contact.attributes.iteritems():95                    print '      x: %s = %s' % (attr, value)96            print97        print 'Presence policies:'98        for policy in notification.data.presence_policies:99            print '  %s -> %s' % (policy.id, policy.action)100            if policy.sphere:101                print '    sphere                     = %s' % policy.sphere102            if policy.validity:103                print '    valid between:'104                for from_timestamp, until_timestamp in policy.validity:105                    print '      %s - %s' % (from_timestamp, until_timestamp)106            if policy.multi_identity_conditions:107                print '    multi identity conditions:'108                for multi_condition in policy.multi_identity_conditions:109                    if isinstance(multi_condition, CatchAllCondition) and multi_condition.exceptions:110                        print '      anyone except'111   
                     for exception in multi_condition.exceptions:112                            if isinstance(exception, DomainException):113                                print '        users from domain %s' % exception.domain114                            elif isinstance(exception, UserException):115                                print '        user %s' % exception.uri116                    elif isinstance(multi_condition, CatchAllCondition):117                        print '      anyone'118                    elif isinstance(multi_condition, DomainCondition) and multi_condition.exceptions:119                        print '      anyone from domain %s except' % multi_condition.domain120                        for exception in multi_condition.exceptions:121                            if isinstance(exception, UserException):122                                print '        user %s' % exception.uri123                    elif isinstance(multi_condition, DomainCondition):124                        print '      anyone from domain %s' % multi_condition.domain125            if policy.provide_devices is All:126                print '    provide-devices            = All'127            elif policy.provide_devices:128                print '    provide-devices:'129                for prv in policy.provide_devices:130                    if isinstance(prv, Class):131                        print '      class                    = %s' % prv132                    elif isinstance(prv, OccurenceID):133                        print '      occurence-id             = %s' % prv134                    elif isinstance(prv, DeviceID):135                        print '      device-id                = %s' % prv136                    else:137                        print '      unknown                  = %s(%r)' % (prv, type(prv).__name__)138            if policy.provide_persons is All:139                print '    provide-persons            = All'140            elif 
policy.provide_persons:141                print '    provide-persons:'142                for prv in policy.provide_persons:143                    if isinstance(prv, Class):144                        print '      class                    = %s' % prv145                    elif isinstance(prv, OccurenceID):146                        print '      occurence-id             = %s' % prv147                    else:148                        print '      unknown                  = %s(%r)' % (prv, type(prv).__name__)149            if policy.provide_services is All:150                print '    provide-services           = All'151            elif policy.provide_services:152                print '    provide-services:'153                for prv in policy.provide_services:154                    if isinstance(prv, Class):155                        print '      class                    = %s' % prv156                    elif isinstance(prv, OccurenceID):157                        print '      occurence-id             = %s' % prv158                    elif isinstance(prv, ServiceURI):159                        print '      service-uri              = %s' % prv160                    elif isinstance(prv, ServiceURIScheme):161                        print '      service-uri-scheme       = %s' % prv162                    else:163                        print '      unknown                  = %s(%r)' % (prv, type(prv).__name__)164            print '    provide-activities         = %s' % policy.provide_activities165            print '    provide-class              = %s' % policy.provide_class166            print '    provide-device-id          = %s' % policy.provide_device_id167            print '    provide-mood               = %s' % policy.provide_mood168            print '    provide-place-is           = %s' % policy.provide_place_is169            print '    provide-place-type         = %s' % policy.provide_place_type170            print '    provide-privacy            = %s' % 
policy.provide_privacy171            print '    provide-relationship       = %s' % policy.provide_relationship172            print '    provide-status-icon        = %s' % policy.provide_status_icon173            print '    provide-sphere             = %s' % policy.provide_sphere174            print '    provide-time-offset        = %s' % policy.provide_time_offset175            print '    provide-user-input         = %s' % policy.provide_user_input176            print '    provide-unknown-attributes = %s' % policy.provide_unknown_attributes177            print '    provide-all-attributes     = %s' % policy.provide_all_attributes178            print179        print 'Dialog-info policies:'180        for policy in notification.data.dialoginfo_policies:181            print '  %s -> %s' % (policy.id, policy.action)182            if policy.sphere:183                print '    sphere                     = %s' % policy.sphere184            if policy.validity:185                print '    valid between:'186                for from_timestamp, until_timestamp in policy.validity:187                    print '      %s - %s' % (from_timestamp, until_timestamp)188            print189        print 'RLS services:'190        for service in notification.data.services:191            print '  %s -> %s' % (service.uri, ', '.join(service.packages))192            for entry in service.entries:193                print '    %s' % entry194            print195        print 'Offline status:'196        if notification.data.offline_status:197            print '  Note: %s' % notification.data.offline_status.note198            print '  Activity: %s' % notification.data.offline_status.activity199        else:200            print '  Missing'201application = XCAPApplication()202application.start()203class Stop(object):204    pass205@decorator206def command_handler(func):207    @preserve_signature(func)208    def wrapper(*args, **kw):209        try:210            return func(*args, **kw)211        
except Exception, e:212            print 'Error: %s' % e213    return wrapper214class Interface(Cmd):215    prompt = 'xcap> '216    def emptyline(self):217        return218    def parse_arguments(self, line):219        s = shlex.shlex(StringIO(line))220        s.wordchars = ''.join(c for c in string.printable if c!="'" and c not in string.whitespace)221        s.quotes = "'"222        args = list(takewhile(lambda x: x, (s.get_token() for _ in count())))223        arguments = {}224        for arg in args:225            if len(arg) >= 2 and arg[0] == arg[-1] and arg[0] in ('"', "'"):226                arg = arg[1:-1]227            try:228                name, value = arg.split('=', 1)229                arguments[name] = cjson.decode(value)230            except (ValueError, cjson.DecodeError):231                continue232        return arguments233    @command_handler234    def do_start_transaction(self, line):235        application.xcap_manager.start_transaction()236    @command_handler237    def do_commit_transaction(self, line):238        application.xcap_manager.commit_transaction()239    @command_handler240    def do_add_group(self, line):241        arguments = self.parse_arguments(line)242        group = arguments.get('group')243        if not isinstance(group, basestring):244            raise ValueError('expected string group')245        application.xcap_manager.add_group(group)246    @command_handler247    def do_rename_group(self, line):248        arguments = self.parse_arguments(line)249        old_name = arguments.get('old_name')250        new_name = arguments.get('new_name')251        if not isinstance(old_name, basestring):252            raise ValueError('expected string old_name')253        if not isinstance(new_name, basestring):254            raise ValueError('expected string new_name')255        application.xcap_manager.rename_group(old_name, new_name)256    @command_handler257    def do_remove_group(self, line):258        arguments = 
self.parse_arguments(line)259        group = arguments.get('group')260        if not isinstance(group, basestring):261            raise ValueError('expected string group')262        application.xcap_manager.remove_group(group)263    @command_handler264    def do_add_contact(self, line):265        arguments = self.parse_arguments(line)266        contact_attrs = arguments.get('contact')267        if not isinstance(contact_attrs, dict):268            raise ValueError('expected object contact')269        contact = self.get_contact(contact_attrs)270        application.xcap_manager.add_contact(contact)271    @command_handler272    def do_update_contact(self, line):273        arguments = self.parse_arguments(line)274        contact_attrs = arguments.pop('contact', None)275        if not isinstance(contact_attrs, dict):276            raise ValueError('expected object contact')277        contact = self.get_contact(contact_attrs)278        if 'presence_policies' in arguments:279            presence_policies = arguments.pop('presence_policies')280            if presence_policies is not None and not isinstance(presence_policies, list):281                raise ValueError('expected list attribute presence_policies')282            if any(not isinstance(policy, dict) for policy in presence_policies):283                raise ValueError('expected object items in presence_policies')284            arguments['presence_policies'] = [self.get_presence_policy(policy) for policy in presence_policies]285        if 'dialoginfo_policies' in arguments:286            dialoginfo_policies = arguments.pop('dialoginfo_policies')287            if dialoginfo_policies is not None and not isinstance(dialoginfo_policies, list):288                raise ValueError('expected list attribute dialoginfo_policies')289            if any(not isinstance(policy, dict) for policy in dialoginfo_policies):290                raise ValueError('expected object items in dialoginfo_policies')291            
arguments['dialoginfo_policies'] = [self.get_dialoginfo_policy(policy) for policy in dialoginfo_policies]292        application.xcap_manager.update_contact(contact, **arguments)293    @command_handler294    def do_remove_contact(self, line):295        arguments = self.parse_arguments(line)296        contact_attrs = arguments.pop('contact', None)297        if not isinstance(contact_attrs, dict):298            raise ValueError('expected object contact')299        contact = self.get_contact(contact_attrs)300        application.xcap_manager.remove_contact(contact)301    @command_handler302    def do_add_presence_policy(self, line):303        arguments = self.parse_arguments(line)304        policy = arguments.pop('policy', None)305        if not isinstance(policy, dict):306            raise ValueError('expected object policy')307        application.xcap_manager.add_presence_policy(self.get_presence_policy(policy))308    @command_handler309    def do_update_presence_policy(self, line):310        arguments = self.parse_arguments(line)311        policy = arguments.pop('policy', None)312        if not isinstance(policy, dict):313            raise ValueError('expected object policy')314        policy = self.get_presence_policy(policy)315        if 'validity' in arguments:316            validity = arguments.pop('validity')317            if validity is not None:318                if not isinstance(validity, list):319                    raise ValueError('expected list validity or nill')320                validity = [(Timestamp(from_timestamp), Timestamp(until_timestamp)) for from_timestamp, until_timestamp in validity]321            arguments['validity'] = validity322        multi_identity_conditions = arguments.pop('multi_identity_conditions', None)323        if multi_identity_conditions is not None and not isinstance(multi_identity_conditions, list):324            raise ValueError('expected list multi_identity_conditions or nill')325        if multi_identity_conditions is not 
None:326            arguments['multi_identity_conditions'] = []327            for multi_condition_attributes in multi_identity_conditions:328                if 'domain' in multi_condition_attributes:329                    multi_condition = DomainCondition(multi_condition_attributes.pop('domain'))330                else:331                    multi_condition = CatchAllCondition()332                arguments['multi_identity_conditions'].append(multi_condition)333                if multi_condition_attributes.get('exceptions', None):334                    for exception_attributes in multi_condition_attributes['exceptions']:335                        if 'domain' in exception_attributes:336                            multi_condition.exceptions.append(DomainException(exception_attributes.pop('domain')))337                        elif 'uri' in exception_attributes:338                            multi_condition.exceptions.append(UserException(exception_attributes.pop('uri')))339        application.xcap_manager.update_presence_policy(policy, **arguments)340    @command_handler341    def do_remove_presence_policy(self, line):342        arguments = self.parse_arguments(line)343        policy = arguments.pop('policy', None)344        if not isinstance(policy, dict):345            raise ValueError('expected object policy')346        policy = self.get_presence_policy(policy)347        application.xcap_manager.remove_presence_policy(policy)348    @command_handler349    def do_add_dialoginfo_policy(self, line):350        arguments = self.parse_arguments(line)351        policy = arguments.pop('policy', None)352        if not isinstance(policy, dict):353            raise ValueError('expected object policy')354        application.xcap_manager.add_dialoginfo_policy(self.get_dialoginfo_policy(policy))355    @command_handler356    def do_update_dialoginfo_policy(self, line):357        arguments = self.parse_arguments(line)358        policy = arguments.pop('policy', None)359        if not 
isinstance(policy, dict):360            raise ValueError('expected object policy')361        policy = self.get_dialoginfo_policy(policy)362        if 'validity' in arguments:363            validity = arguments.pop('validity')364            if validity is not None:365                if not isinstance(validity, list):366                    raise ValueError('expected list validity or nill')367                validity = [(Timestamp(from_timestamp), Timestamp(until_timestamp)) for from_timestamp, until_timestamp in validity]368            arguments['validity'] = validity369        multi_identity_conditions = arguments.pop('multi_identity_conditions', None)370        if multi_identity_conditions is not None and not isinstance(multi_identity_conditions, list):371            raise ValueError('expected list multi_identity_conditions or nill')372        if multi_identity_conditions is not None:373            arguments['multi_identity_conditions'] = []374            for multi_condition_attributes in multi_identity_conditions:375                if 'domain' in multi_condition_attributes:376                    multi_condition = DomainCondition(multi_condition_attributes.pop('domain'))377                else:378                    multi_condition = CatchAllCondition()379                arguments['multi_identity_conditions'].append(multi_condition)380                if multi_condition_attributes.get('exceptions', None):381                    for exception_attributes in multi_condition_attributes['exceptions']:382                        if 'domain' in exception_attributes:383                            multi_condition.exceptions.append(DomainException(exception_attributes.pop('domain')))384                        elif 'uri' in exception_attributes:385                            multi_condition.exceptions.append(UserException(exception_attributes.pop('uri')))386        application.xcap_manager.update_dialoginfo_policy(policy, **arguments)387    @command_handler388    def 
do_remove_dialoginfo_policy(self, line):389        arguments = self.parse_arguments(line)390        policy = arguments.pop('policy', None)391        if not isinstance(policy, dict):392            raise ValueError('expected object policy')393        policy = self.get_dialoginfo_policy(policy)394        application.xcap_manager.remove_dialoginfo_policy(policy)395    @command_handler396    def do_set_offline_status(self, line):397        arguments = self.parse_arguments(line)398        if not arguments:399            application.xcap_manager.set_offline_status(None)400        else:401            note = arguments.pop('note', None)402            activity = arguments.pop('activity', None)403            application.xcap_manager.set_offline_status(OfflineStatus(activity, note))404    @command_handler405    def do_exit(self, line):406        application.stop()407        application.quit_event.wait()408        return Stop409    def get_contact(self, obj):410        if 'uri' not in obj:411            raise ValueError('expected string attribute uri of contact object')412        contact = Contact(obj.pop('name', None), obj.pop('uri'), obj.pop('group', None))413        presence_policies = obj.pop('presence_policies', None)414        if presence_policies is not None:415            if not isinstance(presence_policies, list):416                raise ValueError('expected list attribute presence_policies of contact object or nill')417            if any(not isinstance(policy, dict) for policy in presence_policies):418                raise ValueError('expected object items in attribute presence_policies of contact object')419            contact.presence_policies = [self.get_presence_policy(policy) for policy in presence_policies]420        dialoginfo_policies = obj.pop('dialoginfo_policies', None)421        if dialoginfo_policies is not None:422            if not isinstance(dialoginfo_policies, list):423                raise ValueError('expected list attribute dialoginfo_policies of 
contact object or nill')424            if any(not isinstance(policy, dict) for policy in dialoginfo_policies):425                raise ValueError('expected object items in attribute dialoginfo_policies of contact object')426            contact.dialoginfo_policies = [self.get_dialoginfo_policy(policy) for policy in dialoginfo_policies]427        for attr, value in obj.iteritems():428            if attr in ('subscribe_to_presence', 'subscribe_to_dialoginfo'):429                value = True if value == 'True' else False430            setattr(contact, attr, value)431        return contact432    def get_policy(self, cls, obj):433        policy = cls(obj.pop('id', None), obj.pop('action', None))434        policy.name = obj.pop('name', None)435        validity = obj.pop('validity', None)436        if validity is not None:437            if not isinstance(validity, list):438                raise ValueError('expected list attribute validity of policy object or nill')439            policy.validity = [(Timestamp(from_timestamp), Timestamp(until_timestamp)) for from_timestamp, until_timestamp in validity]440        sphere = obj.pop('sphere', None)441        if sphere is not None and not isinstance(sphere, basestring):442            raise ValueError('expected string attribute sphere of policy object or nill')443        policy.sphere = sphere444        multi_identity_conditions = obj.pop('multi_identity_conditions', None)445        if multi_identity_conditions is not None and not isinstance(multi_identity_conditions, list):446            raise ValueError('expected list attribute multi_identity_conditions of policy object or nill')447        if multi_identity_conditions is not None:448            policy.multi_identity_conditions = []449            for multi_condition_attributes in multi_identity_conditions:450                if 'domain' in multi_condition_attributes:451                    multi_condition = DomainCondition(multi_condition_attributes.pop('domain'))452                
else:453                    multi_condition = CatchAllCondition()454                policy.multi_identity_conditions.append(multi_condition)455                if multi_condition_attributes.get('exceptions', None):456                    for exception_attributes in multi_condition_attributes['exceptions']:457                        if 'domain' in exception_attributes:458                            print 'adding exception for domain'459                            multi_condition.exceptions.append(DomainException(exception_attributes.pop('domain')))460                        elif 'uri' in exception_attributes:461                            print 'adding exception for uri'462                            multi_condition.exceptions.append(UserException(exception_attributes.pop('uri')))463        return policy464    def get_presence_policy(self, obj):465        policy = self.get_policy(PresencePolicy, obj)466        return policy467    def get_dialoginfo_policy(self, obj):468        policy = self.get_policy(DialoginfoPolicy, obj)469        return policy470    def postcmd(self, stop, line):471        if stop is Stop:472            return True473interface = Interface()...encoders.py
Source:encoders.py  
1#-------------------------------------------------------------------------------2#3# Project: EOxServer <http://eoxserver.org>4# Authors: Fabian Schindler <fabian.schindler@eox.at>5#6#-------------------------------------------------------------------------------7# Copyright (C) 2013 EOX IT Services GmbH8#9# Permission is hereby granted, free of charge, to any person obtaining a copy10# of this software and associated documentation files (the "Software"), to deal11# in the Software without restriction, including without limitation the rights12# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell13# copies of the Software, and to permit persons to whom the Software is14# furnished to do so, subject to the following conditions:15#16# The above copyright notice and this permission notice shall be included in all17# copies of this Software or works derived from this Software.18#19# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR20# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,21# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE22# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER23# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,24# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN25# THE SOFTWARE.26#-------------------------------------------------------------------------------27from lxml.builder import ElementMaker28from eoxserver.core.util.xmltools import XMLEncoder, NameSpace, NameSpaceMap29ns_xlink = NameSpace("http://www.w3.org/1999/xlink", "xlink")30ns_ows = NameSpace("http://www.opengis.net/ows/1.1", "ows", "http://schemas.opengis.net/ows/1.1.0/owsExceptionReport.xsd")31ns_xml = NameSpace("http://www.w3.org/XML/1998/namespace", "xml")32nsmap = NameSpaceMap(ns_ows)33OWS = ElementMaker(namespace=ns_ows.uri, nsmap=nsmap)34class OWS11ExceptionXMLEncoder(XMLEncoder):35    def encode_exception(self, message, version, code, locator=None):36        exception_attributes = {37            "exceptionCode": code38        }39        if locator:40            exception_attributes["locator"] = locator41        exception_text = (OWS("ExceptionText", message),) if message else ()42        return OWS("ExceptionReport", 43            OWS("Exception", *exception_text, **exception_attributes44            ), version=version, **{ns_xml("lang"): "en"}45        )46    def get_schema_locations(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
