Best Python code snippet using molecule_python
email_handler.py
Source:email_handler.py  
...190class NoteEmailHandler(EmailHandler):191    def check_rule(my):192        '''determine whether an email should be sent'''193        return True194    def _get_login(my, assigned):195        return Login.get_by_login(assigned)196        197    198    def get_to(my):199        #recipients = super(NoteEmailHandler, my).get_to()200        recipients = set()201        note = my.sobject202        search = Search(Task)203        search.add_filter('search_type', note.get_value('search_type'))204        search.add_filter('search_id', note.get_value('search_id'))205        # it will get the context if process not found206        search.add_filter('process', note.get_process())207        search.add_filter('project_code',note.get_value('project_code'))208        tasks = search.get_sobjects()209        for task in tasks:210            assigned = my._get_login(task.get_assigned())211            if assigned:212                recipients.add(assigned)213            supe = my._get_login(task.get_supervisor())214            if supe:215                recipients.add(supe)216        return recipients217class GeneralNoteEmailHandler(EmailHandler):218    def check_rule(my):219        '''determine whether an email should be sent'''220        return True221    def _get_login(my, assigned):222        return Login.get_by_login(assigned)223        224    def get_to(my):225        #recipients = super(NoteEmailHandler, my).get_to()226        recipients = set()227        note = my.sobject228        search = Search(Task)229        grand_parent = None230        parent_search_type = note.get_value('search_type')231        if 'prod/submission' in parent_search_type:232            grand_parent = my.parent.get_parent()233        234        search_type = note.get_value('search_type')235        search_id = note.get_value('search_id')236        if grand_parent:237            search_type = grand_parent.get_search_type()238            search_id = grand_parent.get_id()239        search.add_filter('search_type', search_type)240        search.add_filter('search_id', search_id )241        # it will get the context if process not found242        #search.add_filter('process', note.get_process())243        search.add_filter('project_code',note.get_value('project_code'))244        tasks = search.get_sobjects()245        for task in tasks:246            assigned = my._get_login(task.get_assigned())247            if assigned:248                recipients.add(assigned)249            supe = my._get_login(task.get_supervisor())250            if supe:251                recipients.add(supe)252        return recipients253    def get_message(my):254        255        search_type_obj = my.sobject.get_search_type_obj()256        title = search_type_obj.get_title()257        258        notification_message = my.notification.get_value("message")259        message = "%s %s" % (title, my.sobject.get_name())260        if notification_message:261            message = "%s (%s)" %(message, notification_message)262        update_desc = my.sobject.get_update_description()263        parent_search_type = my.sobject.get_value('search_type')264        grand_parent = None265        if 'prod/submission' in parent_search_type:266            parent = Search.get_by_id(parent_search_type, my.sobject.get_value('search_id') )267            snapshot = Snapshot.get_latest_by_sobject(parent, 'publish')268            if snapshot:269                file_name = snapshot.get_file_name_by_type('main')270                update_desc = '%s \n %s \n' %(update_desc, file_name)271            grand_parent = parent.get_parent()272            if grand_parent:273                update_desc = '%s %s'%(update_desc, grand_parent.get_code())274        command_desc = my.command.get_description()275        message = '%s\n\nReport from transaction:\n%s\n\n%s' \276            % (message, update_desc, command_desc)277        return message278class GeneralPublishEmailHandler(EmailHandler):279    ''' On publish of a shot/asset, it will find all the assignees of this 280        shot/asset and send the notification to them'''281    def check_rule(my):282        '''determine whether an email should be sent'''283        return True284    def _get_login(my, assigned):285        return Login.get_by_login(assigned)286        287    def get_to(my):288        recipients = super(GeneralPublishEmailHandler, my).get_to()289        sobj = my.sobject290        search = Search(Task)291        292        search_type = sobj.get_search_type()293        294        search_id = sobj.get_id()295       296        search.add_filter('search_type', search_type)297        search.add_filter('search_id', search_id )298        # it will get the context if process not found299        #search.add_filter('process', note.get_process())300        from pyasm.biz import Project301        search.add_filter('project_code', Project.get_project_code())302        tasks = search.get_sobjects()303        for task in tasks:304            assigned = my._get_login(task.get_assigned())305            if assigned:306                recipients.add(assigned)307            supe = my._get_login(task.get_supervisor())308            if supe:309                recipients.add(supe)310        return recipients311    def get_message(my):312        313        search_type_obj = my.sobject.get_search_type_obj()314        title = search_type_obj.get_title()315        316        notification_message = my.notification.get_value("message")317        message = "%s %s" % (title, my.sobject.get_name())318        if notification_message:319            message = "%s (%s)" %(message, notification_message)320        update_desc = my.sobject.get_update_description()321        322        command_desc = my.command.get_description()323        message = '%s\n\nReport from transaction:\n%s\n\n%s' \324            % (message, update_desc, command_desc)325        return message326class TaskStatusEmailHandler(EmailHandler):327    def check_rule(my):328        '''determine whether an email should be sent'''329        return True330    def _get_login(my, assigned):331        return Login.get_by_login(assigned)332        333    def get_to(my):334        recipients = super(TaskStatusEmailHandler, my).get_to()335        sobj = my.sobject336       337        # it could be the parent of task:338        if not isinstance(sobj, Task):339            tasks = Task.get_by_sobject(sobj)340        else:341            tasks = [sobj]342        for task in tasks:343            assigned = my._get_login(task.get_assigned())344            if assigned:345                recipients.add(assigned)346            supe = my._get_login(task.get_supervisor())347            if supe:348                recipients.add(supe)349        return recipients350class SubmissionEmailHandler(EmailHandler):351    '''Email sent when a submission is entered'''352    def get_message(my):353        search_type_obj = my.sobject.get_search_type_obj()354        title = search_type_obj.get_title()355        subject = my.get_subject()356        notification_message = my.notification.get_value("message")357        message = "%s %s" % (title, my.sobject.get_name())358        if notification_message:359            message = "%s (%s)" %(message, notification_message)360        submit_desc = ''361        from pyasm.prod.biz import Submission362        if isinstance(my.sobject, Submission):363            update_info = ['']364            # add more info about the file and bin365            snapshot = Snapshot.get_latest_by_sobject(my.sobject)366            xpath = "snapshot/file[@type='main']"367            if not snapshot:368                return "no snapshot found"369            xml = snapshot.get_xml_value('snapshot')370            file = None371            if xml.get_node(xpath) is not None:372                file = my._get_file_obj(snapshot)373            else:374                snapshots = snapshot.get_all_ref_snapshots()375                snapshot_file_objects = []376                if snapshots:377                    snapshot = snapshots[0]378                    file = my._get_file_obj(snapshot, type=None)379            if file:380                file_name = file.get_file_name()381                web_path = file.get_web_path()382                from pyasm.web import WebContainer 383                host = WebContainer.get_web().get_base_url()384                update_info.append('Browse: %s %s%s' %( file_name, host.to_string(), web_path))385            bins = my.sobject.get_bins()386            bin_labels = [ bin.get_label() for bin in bins]387            update_info.append('Bin: %s' %', '.join(bin_labels))388            update_info.append('Artist: %s' %my.sobject.get_value('artist'))389            update_info.append('Description: %s' %my.sobject.get_value('description'))390             391            # get notes392            search = Note.get_search_by_sobjects([my.sobject])393            if search:394                search.add_order_by("context")395                search.add_order_by("timestamp desc")396            notes = search.get_sobjects()397            last_context = None398            note_list = []399            for i, note in enumerate(notes):400                context = note.get_value('context')401                # explicit compare to None402                if last_context == None or context != last_context:403                    note_list.append( "[ %s ] " % context )404                last_context = context405                406                #child_notes = my.notes_dict.get(note.get_id())407                # draw note item408                date = Date(db=note.get_value('timestamp'))409                note_list.append('(%s) %s'%(date.get_display_time(), note.get_value("note")))410            update_info.append('Notes: \n %s' % '\n'.join(note_list))411            submit_desc =  '\n'.join(update_info)412            413        update_desc = my.sobject.get_update_description()414        command_desc = my.command.get_description()415        message = '%s\n\nReport from transaction:\n%s\n\n%s\n%s' \416            % (message, update_desc, command_desc, submit_desc)417        return message418    def _get_file_obj(my, snapshot, type='main'):419        if type:420            xpath = "snapshot/file[@type='%s']" %type421        else:422            xpath = "snapshot/file[@type]"423        xml = snapshot.get_xml_value('snapshot')424        node = xml.get_node(xpath)425        file = None426        if node is not None:427            file_code = Xml.get_attribute(node, "file_code")428            file = File.get_by_code(file_code)429        return file430class SubmissionStatusEmailHandler(SubmissionEmailHandler):431    def check_rule(my):432        '''determine whether an email should be sent'''433        return True434    def _get_login(my, assigned):435        return Login.get_by_login(assigned)436        437    def get_to(my):438        recipients = super(SubmissionStatusEmailHandler, my).get_to()439        submission = my.sobject440        artist = submission.get_value('artist')441        assigned = my._get_login(artist)442        if assigned:443            recipients.add(assigned)444        445        return recipients446class SubmissionNoteEmailHandler(SubmissionEmailHandler):447    def check_rule(my):448        '''determine whether an email should be sent'''449        return True450    def get_message(my):451        search_type_obj = my.parent.get_search_type_obj()452        title = search_type_obj.get_title()453        subject = my.get_subject()454        notification_message = my.notification.get_value("message")455        message = "%s %s Note Entry" % (title, my.parent.get_name())456        if notification_message:457            message = "%s (%s)" %(message, notification_message)458        submit_desc = ''459        from pyasm.prod.biz import Submission460        if isinstance(my.parent, Submission):461            update_info = ['']462            # add more info about the file and bin463            snapshot = Snapshot.get_latest_by_sobject(my.parent, "publish")464            xpath = "snapshot/file[@type='main']"465            xml = snapshot.get_xml_value('snapshot')466            file = None467            if xml.get_node(xpath) is not None:468                file = my._get_file_obj(snapshot)469            else:470                snapshots = snapshot.get_all_ref_snapshots()471                snapshot_file_objects = []472                if snapshots:473                    snapshot = snapshots[0]474                    file = my._get_file_obj(snapshot, type=None)475            if file:476                file_name = file.get_file_name()477                web_path = file.get_web_path()478                from pyasm.web import WebContainer 479                host = WebContainer.get_web().get_base_url()480                update_info.append('Browse: %s %s%s' %( file_name, host.to_string(), web_path))481            bins = my.parent.get_bins()482            bin_labels = [ bin.get_label() for bin in bins]483            update_info.append('Bin: %s' %', '.join(bin_labels))484            update_info.append('Artist: %s' %my.parent.get_value('artist'))485            update_info.append('Description: %s' %my.parent.get_value('description'))486             487            # get notes488            search = Note.get_search_by_sobjects([my.parent])489            if search:490                search.add_order_by("context")491                search.add_order_by("timestamp desc")492            notes = search.get_sobjects()493            last_context = None494            note_list = []495            for i, note in enumerate(notes):496                context = note.get_value('context')497                # explicit compare to None498                if last_context == None or context != last_context:499                    note_list.append( "[ %s ] " % context )500                last_context = context501                502                #child_notes = my.notes_dict.get(note.get_id())503                # draw note item504                date = Date(db=note.get_value('timestamp'))505                note_list.append('(%s) %s'%(date.get_display_time(), note.get_value("note")))506            update_info.append('Notes: \n %s' % '\n'.join(note_list))507            submit_desc =  '\n'.join(update_info)508            509        update_desc = my.sobject.get_update_description()510        command_desc = my.command.get_description()511        message = '%s\n\nReport from transaction:\n%s\n\n%s\n%s' \512            % (message, update_desc, command_desc, submit_desc)513        return message514        515    def _get_login(my, assigned):516        return Login.get_by_login(assigned)517        518    def get_to(my):519        recipients = super(SubmissionNoteEmailHandler, my).get_to()520        submission = my.parent521        artist = submission.get_value('artist')522        assigned = my._get_login(artist)523        if assigned:524            recipients.add(assigned)525        526        return recipients527class TestEmailHandler(EmailHandler):528    '''Email sent when a task is assigned'''529    def check_rule(my):530        task = my.sobject531       532        assigned = task.get_value("assigned")533        task_process = task.get_value("process")534        task_description = task.get_value("description")535        # get the pipeline536        my.parent = task.get_parent()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
