How to use _get_login method in molecule

Best Python code snippet using molecule_python

email_handler.py

Source:email_handler.py Github

copy

Full Screen

# ... (the snippet starts partway through email_handler.py; earlier lines are not shown)


def _add_task_recipients(handler, tasks, recipients):
    '''Add the assignee and supervisor of every task to recipients.

    Each name is resolved through handler._get_login(); falsy results are
    skipped.  recipients is modified in place and returned.
    '''
    for task in tasks:
        assigned = handler._get_login(task.get_assigned())
        if assigned:
            recipients.add(assigned)
        supe = handler._get_login(task.get_supervisor())
        if supe:
            recipients.add(supe)
    return recipients


class NoteEmailHandler(EmailHandler):
    '''Handler whose recipients come from the tasks matching a note.'''

    def check_rule(my):
        '''determine whether an email should be sent'''
        return True

    def _get_login(my, assigned):
        '''Look up a login by name via Login.get_by_login().'''
        return Login.get_by_login(assigned)

    def get_to(my):
        '''Return a set with the assignee and supervisor of every task that
        has the same search_type, search_id, process and project_code as
        the note in my.sobject.'''
        #recipients = super(NoteEmailHandler, my).get_to()
        recipients = set()
        note = my.sobject

        search = Search(Task)
        search.add_filter('search_type', note.get_value('search_type'))
        search.add_filter('search_id', note.get_value('search_id'))
        # it will get the context if process not found
        search.add_filter('process', note.get_process())
        search.add_filter('project_code', note.get_value('project_code'))
        tasks = search.get_sobjects()

        return _add_task_recipients(my, tasks, recipients)


class GeneralNoteEmailHandler(EmailHandler):
    '''Like NoteEmailHandler, but without the process filter, and for notes
    on a 'prod/submission' the tasks of the submission's parent are used.'''

    def check_rule(my):
        '''determine whether an email should be sent'''
        return True

    def _get_login(my, assigned):
        '''Look up a login by name via Login.get_by_login().'''
        return Login.get_by_login(assigned)

    def get_to(my):
        '''Return a set with the assignee and supervisor of every task
        attached to the note's parent (or to the grand parent when the
        note's search_type contains 'prod/submission').'''
        #recipients = super(NoteEmailHandler, my).get_to()
        recipients = set()
        note = my.sobject
        search = Search(Task)

        grand_parent = None
        parent_search_type = note.get_value('search_type')
        if 'prod/submission' in parent_search_type:
            grand_parent = my.parent.get_parent()

        search_type = note.get_value('search_type')
        search_id = note.get_value('search_id')
        if grand_parent:
            # tasks live on the submission's parent, not on the submission
            search_type = grand_parent.get_search_type()
            search_id = grand_parent.get_id()

        search.add_filter('search_type', search_type)
        search.add_filter('search_id', search_id)
        # it will get the context if process not found
        #search.add_filter('process', note.get_process())
        search.add_filter('project_code', note.get_value('project_code'))
        tasks = search.get_sobjects()

        return _add_task_recipients(my, tasks, recipients)

    def get_message(my):
        '''Build the message body: title and name, the optional notification
        message, then the update and command descriptions.  For notes on a
        'prod/submission' the main file name of the latest 'publish'
        snapshot and the grand parent's code are appended to the update
        description.'''
        search_type_obj = my.sobject.get_search_type_obj()
        title = search_type_obj.get_title()

        notification_message = my.notification.get_value("message")
        message = "%s %s" % (title, my.sobject.get_name())
        if notification_message:
            message = "%s (%s)" % (message, notification_message)

        update_desc = my.sobject.get_update_description()
        parent_search_type = my.sobject.get_value('search_type')
        grand_parent = None
        if 'prod/submission' in parent_search_type:
            parent = Search.get_by_id(
                parent_search_type, my.sobject.get_value('search_id'))
            snapshot = Snapshot.get_latest_by_sobject(parent, 'publish')
            if snapshot:
                file_name = snapshot.get_file_name_by_type('main')
                update_desc = '%s \n %s \n' % (update_desc, file_name)
            # NOTE(review): indentation was lost in the snippet; this lookup
            # is assumed to be independent of the snapshot check, as it is
            # in get_to() -- confirm against the original file.
            grand_parent = parent.get_parent()
        if grand_parent:
            update_desc = '%s %s' % (update_desc, grand_parent.get_code())

        command_desc = my.command.get_description()
        message = '%s\n\nReport from transaction:\n%s\n\n%s' % (
            message, update_desc, command_desc)
        return message


class GeneralPublishEmailHandler(EmailHandler):
    ''' On publish of a shot/asset, it will find all the assignees of this
        shot/asset and send the notification to them'''

    def check_rule(my):
        '''determine whether an email should be sent'''
        return True

    def _get_login(my, assigned):
        '''Look up a login by name via Login.get_by_login().'''
        return Login.get_by_login(assigned)

    def get_to(my):
        '''Extend the base class recipients with the assignee and supervisor
        of every task attached to my.sobject in the current project.'''
        recipients = super(GeneralPublishEmailHandler, my).get_to()
        sobj = my.sobject
        search = Search(Task)

        search_type = sobj.get_search_type()
        search_id = sobj.get_id()

        search.add_filter('search_type', search_type)
        search.add_filter('search_id', search_id)
        # it will get the context if process not found
        #search.add_filter('process', note.get_process())
        from pyasm.biz import Project
        search.add_filter('project_code', Project.get_project_code())
        tasks = search.get_sobjects()

        return _add_task_recipients(my, tasks, recipients)

    def get_message(my):
        '''Build the message body: title and name, the optional notification
        message, then the update and command descriptions.'''
        search_type_obj = my.sobject.get_search_type_obj()
        title = search_type_obj.get_title()

        notification_message = my.notification.get_value("message")
        message = "%s %s" % (title, my.sobject.get_name())
        if notification_message:
            message = "%s (%s)" % (message, notification_message)

        update_desc = my.sobject.get_update_description()
        command_desc = my.command.get_description()
        message = '%s\n\nReport from transaction:\n%s\n\n%s' % (
            message, update_desc, command_desc)
        return message


class TaskStatusEmailHandler(EmailHandler):
    '''Handler that adds task assignees and supervisors to the recipients.'''

    def check_rule(my):
        '''determine whether an email should be sent'''
        return True

    def _get_login(my, assigned):
        '''Look up a login by name via Login.get_by_login().'''
        return Login.get_by_login(assigned)

    def get_to(my):
        '''Extend the base class recipients with the assignee and supervisor
        of my.sobject when it is a Task, or of all its tasks otherwise.'''
        recipients = super(TaskStatusEmailHandler, my).get_to()
        sobj = my.sobject

        # it could be the parent of task:
        if not isinstance(sobj, Task):
            tasks = Task.get_by_sobject(sobj)
        else:
            tasks = [sobj]

        return _add_task_recipients(my, tasks, recipients)


class SubmissionEmailHandler(EmailHandler):
    '''Email sent when a submission is entered'''

    def get_message(my):
        '''Build the message body for my.sobject.  When it is a Submission,
        a summary of its file, bins, artist, description and notes is
        appended; if it has no snapshot, "no snapshot found" is returned
        instead of a full message.'''
        search_type_obj = my.sobject.get_search_type_obj()
        title = search_type_obj.get_title()
        # NOTE(review): the result is unused; the call is kept because its
        # side effects (if any) are not visible in this snippet.
        subject = my.get_subject()
        notification_message = my.notification.get_value("message")
        message = "%s %s" % (title, my.sobject.get_name())
        if notification_message:
            message = "%s (%s)" % (message, notification_message)

        submit_desc = ''
        from pyasm.prod.biz import Submission
        if isinstance(my.sobject, Submission):
            snapshot = Snapshot.get_latest_by_sobject(my.sobject)
            if not snapshot:
                return "no snapshot found"
            submit_desc = my._get_submission_desc(my.sobject, snapshot)

        update_desc = my.sobject.get_update_description()
        command_desc = my.command.get_description()
        message = '%s\n\nReport from transaction:\n%s\n\n%s\n%s' % (
            message, update_desc, command_desc, submit_desc)
        return message

    def _get_submission_desc(my, submission, snapshot):
        '''Return the multi-line summary appended to submission emails.

        submission -- the Submission being described
        snapshot -- its latest snapshot (must not be None)

        The lines are: an empty first line, "Browse:" (only when a file
        object is found), "Bin:", "Artist:", "Description:" and "Notes:".
        '''
        update_info = ['']

        # add more info about the file and bin
        xpath = "snapshot/file[@type='main']"
        xml = snapshot.get_xml_value('snapshot')
        file_obj = None
        if xml.get_node(xpath) is not None:
            file_obj = my._get_file_obj(snapshot)
        else:
            # no main file on this snapshot: use the first referenced one
            ref_snapshots = snapshot.get_all_ref_snapshots()
            if ref_snapshots:
                file_obj = my._get_file_obj(ref_snapshots[0], type=None)

        if file_obj:
            file_name = file_obj.get_file_name()
            web_path = file_obj.get_web_path()
            from pyasm.web import WebContainer
            host = WebContainer.get_web().get_base_url()
            update_info.append('Browse: %s %s%s' % (
                file_name, host.to_string(), web_path))

        bin_labels = [sub_bin.get_label() for sub_bin in submission.get_bins()]
        update_info.append('Bin: %s' % ', '.join(bin_labels))
        update_info.append('Artist: %s' % submission.get_value('artist'))
        update_info.append(
            'Description: %s' % submission.get_value('description'))

        # get notes
        search = Note.get_search_by_sobjects([submission])
        notes = []
        if search:
            search.add_order_by("context")
            search.add_order_by("timestamp desc")
            notes = search.get_sobjects()

        last_context = None
        note_list = []
        for note in notes:
            context = note.get_value('context')
            # explicit compare to None
            if last_context is None or context != last_context:
                # emit a header each time the context changes
                note_list.append("[ %s ] " % context)
            last_context = context

            #child_notes = my.notes_dict.get(note.get_id())
            # draw note item
            date = Date(db=note.get_value('timestamp'))
            note_list.append('(%s) %s' % (
                date.get_display_time(), note.get_value("note")))
        update_info.append('Notes: \n %s' % '\n'.join(note_list))

        return '\n'.join(update_info)

    def _get_file_obj(my, snapshot, type='main'):
        '''Return the File referenced by the snapshot's first file node of
        the given type (any node with a type attribute when type is falsy),
        or None when there is no such node.'''
        if type:
            xpath = "snapshot/file[@type='%s']" % type
        else:
            xpath = "snapshot/file[@type]"
        xml = snapshot.get_xml_value('snapshot')
        node = xml.get_node(xpath)
        if node is None:
            return None
        file_code = Xml.get_attribute(node, "file_code")
        return File.get_by_code(file_code)


class SubmissionStatusEmailHandler(SubmissionEmailHandler):
    '''Submission handler that also notifies the submission's artist.'''

    def check_rule(my):
        '''determine whether an email should be sent'''
        return True

    def _get_login(my, assigned):
        '''Look up a login by name via Login.get_by_login().'''
        return Login.get_by_login(assigned)

    def get_to(my):
        '''Extend the base class recipients with the login named in the
        'artist' value of my.sobject, when that login is found.'''
        recipients = super(SubmissionStatusEmailHandler, my).get_to()
        submission = my.sobject
        artist = submission.get_value('artist')
        assigned = my._get_login(artist)
        if assigned:
            recipients.add(assigned)

        return recipients


class SubmissionNoteEmailHandler(SubmissionEmailHandler):
    '''Handler for notes whose parent (my.parent) may be a Submission.'''

    def check_rule(my):
        '''determine whether an email should be sent'''
        return True

    def get_message(my):
        '''Build the message body for a note on my.parent.  When the parent
        is a Submission, a summary of its file, bins, artist, description
        and notes is appended; if it has no 'publish' snapshot,
        "no snapshot found" is returned, matching SubmissionEmailHandler.'''
        search_type_obj = my.parent.get_search_type_obj()
        title = search_type_obj.get_title()
        # NOTE(review): the result is unused; the call is kept because its
        # side effects (if any) are not visible in this snippet.
        subject = my.get_subject()
        notification_message = my.notification.get_value("message")
        message = "%s %s Note Entry" % (title, my.parent.get_name())
        if notification_message:
            message = "%s (%s)" % (message, notification_message)

        submit_desc = ''
        from pyasm.prod.biz import Submission
        if isinstance(my.parent, Submission):
            snapshot = Snapshot.get_latest_by_sobject(my.parent, "publish")
            # previously a missing snapshot raised AttributeError here
            if not snapshot:
                return "no snapshot found"
            submit_desc = my._get_submission_desc(my.parent, snapshot)

        update_desc = my.sobject.get_update_description()
        command_desc = my.command.get_description()
        message = '%s\n\nReport from transaction:\n%s\n\n%s\n%s' % (
            message, update_desc, command_desc, submit_desc)
        return message

    def _get_login(my, assigned):
        '''Look up a login by name via Login.get_by_login().'''
        return Login.get_by_login(assigned)

    def get_to(my):
        '''Extend the base class recipients with the login named in the
        'artist' value of my.parent, when that login is found.'''
        recipients = super(SubmissionNoteEmailHandler, my).get_to()
        submission = my.parent
        artist = submission.get_value('artist')
        assigned = my._get_login(artist)
        if assigned:
            recipients.add(assigned)

        return recipients


class TestEmailHandler(EmailHandler):
    '''Email sent when a task is assigned'''

    def check_rule(my):
        task = my.sobject

        assigned = task.get_value("assigned")
        task_process = task.get_value("process")
        task_description = task.get_value("description")
        # get the pipeline
        my.parent = task.get_parent()
        # ... (the snippet is truncated here; the rest of check_rule is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run molecule automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful