How to use generate_message_id method in localstack

Best Python code snippet using localstack_python

tests.py

Source:tests.py Github

copy

Full Screen

...215 """216 Sends an Inbox Message to an invalid URL. 217 Should get back a 404218 """219 inbox_message = tm11.InboxMessage(generate_message_id())220 msg = make_request(post_data = inbox_message.to_xml(), path='/Services/PathThatShouldNotWork/', expected_code = 404)221 #msg = self.base_test(post_data = inbox_message.to_xml(), path='/Services/PathThatShouldNotWork/', expected_code = 404)222 223 def test_02(self):224 """225 Tests sending a GET request to the server. Should result in a BAD MESSAGE226 """227 # Lack of post_data makes it a GET request228 msg = make_request(path=DISCOVERY_11_PATH)229 #msg = self.base_test(path=DISCOVERY_11_PATH)230 231 def test_03(self):232 """233 Send an XML fragment to the server234 """235 msg = make_request(path=DISCOVERY_11_PATH, post_data='<XML_that_is_not_well_formed>', response_msg_type=MSG_STATUS_MESSAGE)236 #msg = self.base_test(path=DISCOVERY_11_PATH, post_data='<XML_that_is_not_well_formed>', response_msg_type=MSG_STATUS_MESSAGE)237 238 def test_04(self):239 """240 Send schema-invalid XML241 """242 msg = make_request(path=DISCOVERY_11_PATH, post_data='<well_formed_schema_invalid_xml/>', response_msg_type=MSG_STATUS_MESSAGE)243 #msg = self.base_test(path=DISCOVERY_11_PATH, post_data='<well_formed_schema_invalid_xml/>', response_msg_type=MSG_STATUS_MESSAGE)244 245 # The next few tests test headers presence/absence246 # and unsupported values247 248 def test_05(self):249 """250 For each set of TAXII Headers, test the following:251 - One header missing252 - A bad value for each header253 - Other permutations in the future?254 """255 256 #TODO: The responses could probably be checked better257 #TODO: This whole thing can probably be done better,258 # but it's probably sufficient for now259 260 #Tuples of services version / is_secure261 HTTP = False262 HTTPS = True263 tuples = ( (VID_TAXII_SERVICES_11, HTTPS),264 (VID_TAXII_SERVICES_11, HTTP),265 (VID_TAXII_SERVICES_10, HTTPS),266 (VID_TAXII_SERVICES_10, HTTP)267 )268 269 # Create a list of headers for mangling header values270 tmp_headers = get_headers( tuples[0][0], tuples[0][1] )271 header_list = tmp_headers.keys()272 273 # Iterate over every TAXII Service version / is_secure value,274 # over every header, and try that header with a bad value275 # and not present276 for tuple in tuples:277 if tuple[0] == VID_TAXII_SERVICES_11:278 disc_req_xml = tm11.DiscoveryRequest(generate_message_id()).to_xml()279 else:280 disc_req_xml = tm10.DiscoveryRequest(generate_message_id()).to_xml()281 for header in header_list:282 expected_code = 200 if header != 'HTTP_ACCEPT' else 406283 r_msg = MSG_STATUS_MESSAGE284 request_headers = get_headers(tuple[0], tuple[1])285 286 # Try the bad header value287 request_headers[header] = 'THIS_IS_A_BAD_VALUE'288 #print header, request_headers[header]289 msg = make_request(post_data = disc_req_xml, 290 path=DISCOVERY_11_PATH, 291 response_msg_type = r_msg, 292 header_dict=request_headers,293 expected_code = expected_code)294 #print msg.to_xml(pretty_print=True)295 296 # Now try without the header297 if header in ('HTTP_ACCEPT', 'HTTP_X_TAXII_ACCEPT'): # These headers, if missing, should result an a valid response being sent298 expected_code = 200299 r_msg = MSG_DISCOVERY_RESPONSE300 del request_headers[header]301 msg = make_request(post_data = disc_req_xml, 302 path=DISCOVERY_11_PATH, 303 response_msg_type = r_msg, 304 header_dict=request_headers, 305 expected_code = expected_code)306 #print msg.to_xml(pretty_print=True)307 # Good, now on to the next one!308 pass309 310 def test_06(self):311 """312 Send a discovery message mismatched with the message binding ID313 """314 # TODO: Write this315 pass316 317 def test_07(self):318 """319 Send a request mismatched with the protocol binding ID320 """321 # TODO: Write this322 pass323 324 def test_08(self):325 """326 Send a request mismatched with the services version327 """328 # TODO: Write this329 pass330 331 def test_09(self):332 """333 Send a TAXII 1.0 message with an X-TAXII-Accept value of TAXII 1.1334 """335 # TODO: Write this336 pass337 338 def test_10(self):339 """340 Send a TAXII 1.1 message with an X-TAXII-Accept value of TAXII 1.0341 """342 # TODO: Write this343 pass344class InboxTests(TestCase):345 """346 Test various aspects of the347 taxii_handlers.InboxMessage11Handler348 """349 350 # These tests expect the following objects in the database:351 # /services/test_inbox_1/ - TAXII 1.1 Inbox Service, accepts all content, enabled, dcns = required (default)352 # /services/test_inbox_2/ - TAXII 1.1 Inbox Service, accepts all content, enabled, dcns = optional (default)353 # /services/test_inbox_3/ - TAXII 1.1 Inbox Service, accepts all content, enabled, dcns = prohibited 354 # TODO: Write more involved tests355 356 #fixtures = ['inbox_test_fixture.json']357 fixtures = ['test_data.json']358 359 def test_01(self):360 """361 Send a message to test_inbox_1 with a valid destination collection name362 """363 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])364 msg = make_request('/services/test_inbox_1/', inbox.to_xml(), get_headers(VID_TAXII_SERVICES_11, False), MSG_STATUS_MESSAGE, ST_SUCCESS)365 #msg = self.send_inbox_message('/services/test_inbox_1/', VID_TAXII_XML_11, inbox)366 367 def test_02(self):368 """369 Send a message to test_inbox_1 with an invalid destination collection name370 """371 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default_INVALID'])372 msg = make_request('/services/test_inbox_1/', inbox.to_xml(), get_headers(VID_TAXII_SERVICES_11, False), MSG_STATUS_MESSAGE, ST_NOT_FOUND, sd_keys=[SD_ITEM])373 #msg = self.send_inbox_message('/services/test_inbox_1/', VID_TAXII_XML_11, inbox, st=ST_NOT_FOUND, sd_keys=[SD_ITEM])374 375 def test_03(self):376 """377 Send a message to test_inbox_1 without a destination collection name378 """379 inbox = tm11.InboxMessage(generate_message_id())380 msg = make_request('/services/test_inbox_1/', 381 inbox.to_xml(), 382 get_headers(VID_TAXII_SERVICES_11, False), 383 MSG_STATUS_MESSAGE, 384 st=ST_DESTINATION_COLLECTION_ERROR, 385 sd_keys=[SD_ACCEPTABLE_DESTINATION])386 #msg = self.send_inbox_message('/services/test_inbox_1/', VID_TAXII_XML_11, inbox, st=ST_DESTINATION_COLLECTION_ERROR, sd_keys=[SD_ACCEPTABLE_DESTINATION])387 388 def test_04(self):389 """390 Send a message to test_inbox_2 with a valid destination collection name391 """392 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])393 msg = make_request('/services/test_inbox_2/', 394 inbox.to_xml(), 395 get_headers(VID_TAXII_SERVICES_11, False), 396 MSG_STATUS_MESSAGE, 397 st=ST_SUCCESS)398 #msg = self.send_inbox_message('/services/test_inbox_2/', VID_TAXII_XML_11, inbox)399 400 def test_05(self):401 """402 Send a message to test_inbox_2 with an invalid destination collection name403 """404 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default_INVALID'])405 msg = make_request('/services/test_inbox_2/',406 inbox.to_xml(),407 get_headers(VID_TAXII_SERVICES_11, False),408 MSG_STATUS_MESSAGE,409 st=ST_NOT_FOUND,410 sd_keys=[SD_ITEM])411 #msg = self.send_inbox_message('/services/test_inbox_2/', VID_TAXII_XML_11, inbox, st=ST_NOT_FOUND, sd_keys=[SD_ITEM])412 413 def test_06(self):414 """415 Send a message to test_inbox_2 without a destination collection name416 """417 inbox = tm11.InboxMessage(generate_message_id())418 msg = make_request('/services/test_inbox_2/', 419 inbox.to_xml(), 420 get_headers(VID_TAXII_SERVICES_11, False), 421 MSG_STATUS_MESSAGE, 422 st=ST_SUCCESS)423 #msg = self.send_inbox_message('/services/test_inbox_2/', VID_TAXII_XML_11, inbox, st=ST_SUCCESS)424 425 def test_08(self):426 """427 Send a message to test_inbox_3 with a destination collection name428 """429 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])430 msg = make_request('/services/test_inbox_3/', 431 inbox.to_xml(), 432 get_headers(VID_TAXII_SERVICES_11, False), 433 MSG_STATUS_MESSAGE, 434 st=ST_DESTINATION_COLLECTION_ERROR)435 #msg = self.send_inbox_message('/services/test_inbox_3/', VID_TAXII_XML_11, inbox, st=ST_DESTINATION_COLLECTION_ERROR)436 437 def test_09(self):438 """439 Send a message to test_inbox_3 without a destination collection name440 """441 inbox = tm11.InboxMessage(generate_message_id())442 msg = make_request('/services/test_inbox_3/', 443 inbox.to_xml(), 444 get_headers(VID_TAXII_SERVICES_11, False), 445 MSG_STATUS_MESSAGE, 446 st=ST_SUCCESS)447 #msg = self.send_inbox_message('/services/test_inbox_3/', VID_TAXII_XML_11, inbox)448 449 def test_10(self):450 """451 Send an Inbox message with a Record Count452 """453 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])454 inbox.record_count = tm11.RecordCount(0, True)455 msg = make_request('/services/test_inbox_1/', 456 inbox.to_xml(), 457 get_headers(VID_TAXII_SERVICES_11, False), 458 MSG_STATUS_MESSAGE, 459 st=ST_SUCCESS)460 #msg = self.send_inbox_message('/services/test_inbox_1/', VID_TAXII_XML_11, inbox)461 462 def test_11(self):463 """464 Send a TAXII 1.0 Inbox Message to /services/test_inbox_1/. Will always465 fail because /services/test_inbox_1/ requires a DCN and TAXII 1.0 cannot specify that.466 """467 inbox = tm10.InboxMessage(generate_message_id())468 msg = make_request('/services/test_inbox_1/', 469 inbox.to_xml(), 470 get_headers(VID_TAXII_SERVICES_10, False), 471 MSG_STATUS_MESSAGE, 472 st=ST_FAILURE)473 #msg = self.send_inbox_message('/services/test_inbox_1/', VID_TAXII_XML_10, inbox)474 475 def test_12(self):476 """477 Send a TAXII 1.0 Inbox Message to /services/test_inbox_2/478 """479 inbox = tm10.InboxMessage(generate_message_id())480 msg = make_request('/services/test_inbox_2/', 481 inbox.to_xml(), 482 get_headers(VID_TAXII_SERVICES_10, False), 483 MSG_STATUS_MESSAGE, 484 st=ST_SUCCESS)485 #msg = self.send_inbox_message('/services/test_inbox_2/', VID_TAXII_XML_10, inbox)486 487 def test_13(self):488 """489 Send a TAXII 1.0 Inbox Message to /services/test_inbox_3/490 """491 inbox = tm10.InboxMessage(generate_message_id())492 msg = make_request('/services/test_inbox_3/', 493 inbox.to_xml(), 494 get_headers(VID_TAXII_SERVICES_10, False), 495 MSG_STATUS_MESSAGE, 496 st=ST_SUCCESS)497 #msg = self.send_inbox_message('/services/test_inbox_3/', VID_TAXII_XML_10, inbox)498class PollRequestTests11(TestCase):499 fixtures = ['test_data.json']500 501 # Make sure query tests are in here502 503 # These tests expect the following objects in the database:504 # /services/test_poll_1/ - (Collection: Default), max_result_size = 5; requires=Subscription = False505 506 def send_poll_request(self, path, messages_vid, poll_request, status_type=None, sd_keys=None):507 #status_type=None means expect a PollResponse508 #status_type=<value> means expect a StatusMessage509 if messages_vid not in (VID_TAXII_XML_11, VID_TAXII_XML_10):510 raise ValueError("Wrong messages_vid!")511 512 client = tc.HttpClient()513 client.setProxy(tc.HttpClient.NO_PROXY)514 resp = client.callTaxiiService2(HOST, path, messages_vid, poll_request.to_xml(), PORT)515 msg = t.get_message_from_http_response(resp, poll_request.message_id)516 517 if not status_type: # Expect a Poll Response518 if msg.message_type != MSG_POLL_RESPONSE:519 raise ValueError('Expected Poll Response. Got: %s.\r\n%s' % \520 (msg.message_type, msg.to_xml(pretty_print=True)) )521 else: # Expect a Status Message522 if msg.message_type != MSG_STATUS_MESSAGE:523 raise ValueError('Expected Status Message. Got: %s.\r\n%s' % \524 (msg.message_type, msg.to_xml(pretty_print=True)) )525 if sd_keys:526 for key in sd_keys:527 if key not in msg.status_detail:528 raise ValueError('Expected status detail key was not present: %s\r\n%s' % \529 (key, msg.to_xml(pretty_print=True)) )530 return msg531 532 def test_01(self):533 """534 Test an invalid collection name535 """536 pp = tm11.PollParameters()537 pr = tm11.PollRequest(538 message_id = generate_message_id(),539 collection_name = 'INVALID_COLLECTION_NAME_13820198320',540 poll_parameters = pp)541 542 msg = make_request('/services/test_poll_1/', 543 pr.to_xml(), 544 get_headers(VID_TAXII_SERVICES_11, False), 545 MSG_STATUS_MESSAGE, 546 st=ST_NOT_FOUND,547 sd_keys=[SD_ITEM])548 #msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_type=ST_NOT_FOUND, sd_keys=[SD_ITEM])549 550 def test_02(self):551 """552 Test a begin TS later than an end TS.553 """554 begin_ts = datetime.now(tzutc())555 end_ts = begin_ts - timedelta(days=-7)556 557 pp = tm11.PollParameters()558 pr = tm11.PollRequest(559 message_id = generate_message_id(),560 collection_name = 'default',561 poll_parameters = pp,562 exclusive_begin_timestamp_label = begin_ts,563 inclusive_end_timestamp_label = end_ts)564 msg = make_request('/services/test_poll_1/', 565 pr.to_xml(), 566 get_headers(VID_TAXII_SERVICES_11, False), 567 ST_FAILURE)568 #msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_type=ST_FAILURE)569 570 def test_03(self):571 """572 Test an invalid subscription ID573 """574 pr = tm11.PollRequest(575 message_id = generate_message_id(),576 collection_name = 'default',577 subscription_id = 'jdslkajdlksajdlksajld')578 msg = make_request('/services/test_poll_1/', 579 pr.to_xml(), 580 get_headers(VID_TAXII_SERVICES_11, False), 581 MSG_STATUS_MESSAGE, 582 st=ST_NOT_FOUND,583 sd_keys=[SD_ITEM])584 #msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_type=ST_NOT_FOUND, sd_keys=[SD_ITEM])585 586 def test_04(self):587 """588 Test a Content Binding ID not supported by the server589 """590 pp = tm11.PollParameters(content_bindings=[tm11.ContentBinding('some_random_binding')])591 pr = tm11.PollRequest(592 message_id = generate_message_id(),593 collection_name = 'default',594 poll_parameters = pp)595 msg = make_request('/services/test_poll_1/', 596 pr.to_xml(), 597 get_headers(VID_TAXII_SERVICES_11, False), 598 MSG_STATUS_MESSAGE, 599 st=ST_UNSUPPORTED_CONTENT_BINDING,600 sd_keys=[SD_SUPPORTED_CONTENT])601 #msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_type=ST_UNSUPPORTED_CONTENT_BINDING, sd_keys=[SD_SUPPORTED_CONTENT])602 603 def test_05(self):604 """605 Test a supported Content Binding ID with an unsupported subtype606 """607 pp = tm11.PollParameters(content_bindings=[tm11.ContentBinding(CB_STIX_XML_111, subtype_ids=['FakeSubtype'])])608 pr = tm11.PollRequest(609 message_id = generate_message_id(),610 collection_name = 'default',611 poll_parameters = pp)612 msg = make_request('/services/test_poll_1/', 613 pr.to_xml(), 614 get_headers(VID_TAXII_SERVICES_11, False), 615 MSG_STATUS_MESSAGE, 616 st=ST_UNSUPPORTED_CONTENT_BINDING,617 sd_keys=[SD_SUPPORTED_CONTENT])618 #msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_type=ST_UNSUPPORTED_CONTENT_BINDING, sd_keys=[SD_SUPPORTED_CONTENT])619 # This test case currently fails and will not be able to pass until either django-taxii-services changes or libtaxii changes.620 # def test_06(self):621 # """622 # Test an unsupported query format623 # """624 # pp = tm11.PollParameters(query=tm11.Query(format_id='unsupported_format_id'))625 # pr = tm11.PollRequest(626 # message_id = generate_message_id(),627 # collection_name = 'default',628 # poll_parameters = pp)629 # msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_type=ST_UNSUPPORTED_QUERY, sd_keys=[SD_SUPPORTED_QUERY])630 631 # This test won't be valid until Delivery Params are implemented632 # def test_07(self):633 # """634 # Test unsupported delivery parameters - protocol635 # """636 # dp = tm11.DeliveryParameters('protocol_x', 'http://example.com/whatever/', VID_TAXII_XML_11)637 # pp = tm11.PollParameters(delivery_parameters=dp)638 # pr = tm11.PollRequest(639 # message_id = generate_message_id(),640 # collection_name = 'default',641 # poll_parameters = pp)642 # msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_type=ST_UNSUPPORTED_PROTOCOL, sd_keys=[SD_SUPPORTED_PROTOCOL])643 644 # This test won't be valid until Delivery Params are implemented645 # def test_08(self):646 # """647 # Test unsupported delivery parameters - message_binding648 # """649 # dp = tm11.DeliveryParameters(VID_TAXII_HTTPS_11, 'http://example.com/whatever/', 'message_binding_x')650 # pp = tm11.PollParameters(delivery_parameters=dp)651 # pr = tm11.PollRequest(652 # message_id = generate_message_id(),653 # collection_name = 'default',654 # poll_parameters = pp)655 # msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr, status_tpe=ST_UNSUPPORTED_MESSAGE, sd_keys=[SD_SUPPORTED_MESSAGE])656 657 658 def test_09(self):659 """660 Tests that a single PollRequest succeeds.661 """662 pp = tm11.PollParameters()663 pr = tm11.PollRequest(664 message_id=generate_message_id(),665 collection_name='default',666 poll_parameters=pp)667 msg = make_request('/services/test_poll_1/', 668 pr.to_xml(), 669 get_headers(VID_TAXII_SERVICES_11, False), 670 MSG_POLL_RESPONSE)671 #msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr)672 if len(msg.content_blocks) != 5:673 raise ValueError('Got %s CBs' % len(msg.content_blocks))674 675 676 def test_10(self):677 """678 Test a query. Should match just the APT1 report.679 """680 test = tdq.Test(capability_id = tdq.CM_CORE, relationship=R_EQUALS, parameters={P_VALUE: 'Unit 61398', P_MATCH_TYPE: 'case_sensitive_string'} )681 criterion = tdq.Criterion(target='STIX_Package/Threat_Actors/Threat_Actor/Identity/Specification/PartyName/OrganisationName/SubDivisionName', test=test)682 criteria = tdq.Criteria(OP_AND, criterion=[criterion])683 q = tdq.DefaultQuery(CB_STIX_XML_111, criteria)684 pp = tm11.PollParameters()685 pr = tm11.PollRequest(686 message_id=generate_message_id(),687 collection_name='default',688 poll_parameters=pp)689 #msg = self.send_poll_request('/services/test_poll_1/', VID_TAXII_XML_11, pr)690 msg = make_request('/services/test_poll_1/', 691 pr.to_xml(), 692 get_headers(VID_TAXII_SERVICES_11, False), 693 MSG_POLL_RESPONSE)694 if len(msg.content_blocks) != 1:695 raise ValueError('Got %s CBs' % len(msg.content_blocks))696class PollRequestTests10(TestCase):697 fixtures = ['test_data.json']698 pass699class PollFulfillmentTests11(TestCase):700 fixtures = ['test_data.json']701 pass702class CollectionInformationTests11(TestCase):703 704 fixtures = ['test_data.json']705 706 def test_01(self):707 """708 Send a collection information request, look to get a collection information response back709 """710 cir = tm11.CollectionInformationRequest(generate_message_id())711 msg = make_request(COLLECTION_MGMT_11_PATH, 712 cir.to_xml(), 713 get_headers(VID_TAXII_SERVICES_11, False), 714 MSG_COLLECTION_INFORMATION_RESPONSE)715 716 # client = tc.HttpClient()717 # client.setProxy(tc.HttpClient.NO_PROXY)718 # cir = tm11.CollectionInformationRequest(generate_message_id())719 # resp = client.callTaxiiService2(HOST, COLLECTION_MGMT_11_PATH, VID_TAXII_XML_11, cir.to_xml(), PORT)720 # msg = t.get_message_from_http_response(resp, cir.message_id)721 # if msg.message_type != MSG_COLLECTION_INFORMATION_RESPONSE:722 # raise ValueError('Expected Collection Information Response. Got: %s.\r\n%s' % \723 # (msg.message_type, msg.to_xml(pretty_print=True)) )724class FeedInformationTests10(TestCase):725 726 fixtures = ['test_data.json']727 728 def test_01(self):729 """730 Send a feed information request, look to get a feed information response back731 """732 733 fir = tm10.FeedInformationRequest(generate_message_id())734 msg = make_request(COLLECTION_MGMT_11_PATH, 735 fir.to_xml(), 736 get_headers(VID_TAXII_SERVICES_10, False), 737 MSG_FEED_INFORMATION_RESPONSE)738 739 # client = tc.HttpClient()740 # client.setProxy(tc.HttpClient.NO_PROXY)741 # fir = tm10.FeedInformationRequest(generate_message_id())742 # resp = client.callTaxiiService2(HOST, COLLECTION_MGMT_11_PATH, VID_TAXII_XML_10, fir.to_xml(), PORT)743 # msg = t.get_message_from_http_response(resp, fir.message_id)744 # if msg.message_type != MSG_FEED_INFORMATION_RESPONSE:745 # raise ValueError('Expected Feed Information Response. Got: %s.\r\n%s' % \746 # (msg.message_type, msg.to_xml(pretty_print=True)) )747class SubscriptionTests11(TestCase):748 749 fixtures = ['test_data.json']750 751 # Make sure to test query in here752 pass753class SubscriptionTests10(TestCase):754 755 fixtures = ['test_data.json']756 757 pass758class DiscoveryTests11(TestCase):759 760 fixtures = ['test_data.json']761 762 def test_01(self):763 """764 Send a discovery request, look to get a discovery response back765 """766 767 dr = tm11.DiscoveryRequest(generate_message_id())768 msg = make_request(DISCOVERY_11_PATH, 769 dr.to_xml(), 770 get_headers(VID_TAXII_SERVICES_11, False), 771 MSG_DISCOVERY_RESPONSE)772 773 # client = tc.HttpClient()774 # client.setProxy(tc.HttpClient.NO_PROXY)775 # dr = tm11.DiscoveryRequest(generate_message_id())776 # resp = client.callTaxiiService2(HOST, DISCOVERY_11_PATH, VID_TAXII_XML_11, dr.to_xml(), PORT)777 # msg = t.get_message_from_http_response(resp, dr.message_id)778 # if msg.message_type != MSG_DISCOVERY_RESPONSE:779 # raise ValueError('Expected Discovery Response. Got: %s.\r\n%s' % \780 # (msg.message_type, msg.to_xml(pretty_print=True)) )781class DiscoveryTests10(TestCase):782 783 fixtures = ['test_data.json']784 785 def test_01(self):786 """787 Send a discovery request, look to get a discovery response back788 """789 790 dr = tm10.DiscoveryRequest(generate_message_id())791 msg = make_request(DISCOVERY_11_PATH, 792 dr.to_xml(), 793 get_headers(VID_TAXII_SERVICES_10, False), 794 MSG_DISCOVERY_RESPONSE)795 796 # client = tc.HttpClient()797 # client.setProxy(tc.HttpClient.NO_PROXY)798 # dr = tm10.DiscoveryRequest(generate_message_id())799 # resp = client.callTaxiiService2(HOST, DISCOVERY_10_PATH, VID_TAXII_XML_10, dr.to_xml(), PORT)800 # msg = t.get_message_from_http_response(resp, dr.message_id)801 # if msg.message_type != MSG_DISCOVERY_RESPONSE:802 # raise ValueError('Expected Discovery Response. Got: %s.\r\n%s' % \803 # (msg.message_type, msg.to_xml(pretty_print=True)) )804if __name__ == "__main__":...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

...142 print( stackprinter.format() )143 # return False144 print( "Failed to Handshake with TV's Websocket Server" )145 sys.exit( 1 )146 def generate_message_id( self , prefix=False ):147 if prefix != False:148 return f"lgtv_{prefix}_{random.randint( 1 , 100 )}"149 else:150 return f"lgtv_{random.randint( 1 , 100 )}"151 def websocket_send_json( self , message={} ):152 try:153 self.open_websocket()154 self.handshake()155 self.ws.send( json.dumps( message ) )156 result = self.ws.recv()157 self.close_websocket()158 except Exception as e:159 print( stackprinter.format() )160 return False161 def send_message( self , message="hola" , icon_path=None ):162 try:163 icon_encoded_string = ""164 icon_extension = ""165 if icon_path is not None:166 icon_extension = os.path.splitext( icon_path )[ 1 ][ 1: ]167 with open( icon_path , "rb" ) as icon_file:168 icon_encoded_string = base64.b64encode( icon_file.read() ).decode( "ascii" )169 result = self.websocket_send_json({170 "id": self.generate_message_id( "send_message" ) ,171 "type": "request" ,172 "uri": f"ssap://{self.endpoints.show_message}" ,173 "payload": {174 "message": message ,175 "iconData": icon_encoded_string ,176 "iconExtension": icon_extension177 } ,178 })179 return result180 except Exception as e:181 print( stackprinter.format() )182 return False183 # Apps184 def get_apps( self ):185 try:186 return self.websocket_send_json({187 "id": self.generate_message_id( "get_apps" ) ,188 "type": "request" ,189 "uri": f"ssap://{self.endpoints.get_apps}" ,190 "payload": {} ,191 })192 except Exception as e:193 print( stackprinter.format() )194 return False195 def get_current_app( self ):196 try:197 return self.websocket_send_json({198 "id": self.generate_message_id( "get_current_app" ) ,199 "type": "request" ,200 "uri": f"ssap://{self.endpoints.get_current_app_info}" ,201 "payload": {} ,202 })203 except Exception as e:204 print( stackprinter.format() )205 return False206 def launch_app( self , app_name="netflix" ):207 try:208 return self.websocket_send_json({209 "id": self.generate_message_id( "launch_app" ) ,210 "type": "request" ,211 "uri": f"ssap://{self.endpoints.launch}" ,212 "payload": {213 "id": app_name214 } ,215 })216 except Exception as e:217 print( stackprinter.format() )218 return False219 def launch_app_with_params( self , app_name="netflix" , params={} ):220 try:221 return self.websocket_send_json({222 "id": self.generate_message_id( "launch_app_with_params" ) ,223 "type": "request" ,224 "uri": f"ssap://{self.endpoints.launch}" ,225 "payload": {226 "id": app_name ,227 "params": params228 } ,229 })230 except Exception as e:231 print( stackprinter.format() )232 return False233 def launch_app_with_with_content_id( self , app_name="netflix" , content_id=False ):234 try:235 return self.websocket_send_json({236 "id": self.generate_message_id( "launch_app_with_content_id" ) ,237 "type": "request" ,238 "uri": f"ssap://{self.endpoints.launch}" ,239 "payload": {240 "id": app_name ,241 "contentId": content_id242 } ,243 })244 except Exception as e:245 print( stackprinter.format() )246 return False247 def close_app( self , app_name="netflix" ):248 try:249 return self.websocket_send_json({250 "id": self.generate_message_id( "close_app" ) ,251 "type": "request" ,252 "uri": f"ssap://{self.endpoints.launch}" ,253 "payload": {254 "id": app_name255 }256 })257 except Exception as e:258 print( stackprinter.format() )259 return False260 # Services261 def get_services( self ):262 try:263 return self.websocket_send_json({264 "id": self.generate_message_id( "get_services" ) ,265 "type": "request" ,266 "uri": f"ssap://{self.endpoints.get_services}" ,267 "payload": {}268 })269 except Exception as e:270 print( stackprinter.format() )271 return False272 def get_software_info( self ):273 try:274 return self.websocket_send_json({275 "id": self.generate_message_id( "get_software_info" ) ,276 "type": "request" ,277 "uri": f"ssap://{self.endpoints.get_software_info}" ,278 "payload": {}279 })280 except Exception as e:281 print( stackprinter.format() )282 return False283 def power_off( self ):284 try:285 return self.websocket_send_json({286 "id": self.generate_message_id( "power_off" ) ,287 "type": "request" ,288 "uri": f"ssap://{self.endpoints.power_off}" ,289 "payload": {}290 })291 except Exception as e:292 print( stackprinter.format() )293 return False294 def power_on( self ):295 try:296 result = self.websocket_send_json({297 "id": self.generate_message_id( "power_on" ) ,298 "type": "request" ,299 "uri": f"ssap://{self.endpoints.power_on}" ,300 "payload": {}301 })302 if "mac_address" in self.config:303 send_magic_packet( self.config.mac_address )304 return result305 except Exception as e:306 print( stackprinter.format() )307 return False308 # 3D Mode309 def turn_3d_on( self ):310 try:311 return self.websocket_send_json({312 "id": self.generate_message_id( "turn_3d_on" ) ,313 "type": "request" ,314 "uri": f"ssap://{self.endpoints.x3d_on}" ,315 "payload": {}316 })317 except Exception as e:318 print( stackprinter.format() )319 return False320 def turn_3d_off( self ):321 try:322 return self.websocket_send_json({323 "id": self.generate_message_id( "turn_3d_off" ) ,324 "type": "request" ,325 "uri": f"ssap://{self.endpoints.x3d_off}" ,326 "payload": {}327 })328 except Exception as e:329 print( stackprinter.format() )330 return False331 # Inputs332 def get_inputs( self ):333 try:334 return self.websocket_send_json({335 "id": self.generate_message_id( "get_inputs" ) ,336 "type": "request" ,337 "uri": f"ssap://{self.endpoints.get_inputs}" ,338 "payload": {}339 })340 except Exception as e:341 print( stackprinter.format() )342 return False343 def get_input( self ):344 try:345 return self.get_current_app()346 except Exception as e:347 print( stackprinter.format() )348 return False349 def set_input( self , input_name="HDMI-1" ):350 try:351 return self.websocket_send_json({352 "id": self.generate_message_id( "set_input" ) ,353 "type": "request" ,354 "uri": f"ssap://{self.endpoints.set_input}" ,355 "payload": {356 "inputId": input_name357 }358 })359 except Exception as e:360 print( stackprinter.format() )361 return False362 # Audio363 def get_audio_status( self ):364 try:365 return self.websocket_send_json({366 "id": self.generate_message_id( "get_audio_status" ) ,367 "type": "request" ,368 "uri": f"ssap://{self.endpoints.get_audio_status}" ,369 "payload": {}370 })371 except Exception as e:372 print( stackprinter.format() )373 return False374 # def get_muted(self):375 # """Get mute status."""376 # return self.get_audio_status().get('mute')377 def set_mute( self , mute=True ):378 try:379 return self.websocket_send_json({380 "id": self.generate_message_id( "set_mute" ) ,381 "type": "request" ,382 "uri": f"ssap://{self.endpoints.set_mute}" ,383 "payload": {384 "mute": mute385 }386 })387 except Exception as e:388 print( stackprinter.format() )389 return False390 def get_volume( self ):391 try:392 return self.websocket_send_json({393 "id": self.generate_message_id( "get_volume" ) ,394 "type": "request" ,395 "uri": f"ssap://{self.endpoints.get_volume}" ,396 "payload": {}397 })398 except Exception as e:399 print( stackprinter.format() )400 return False401 def set_volume( self , volume=11 ):402 try:403 return self.websocket_send_json({404 "id": self.generate_message_id( "set_volume" ) ,405 "type": "request" ,406 "uri": f"ssap://{self.endpoints.set_volume}" ,407 "payload": {408 "volume": volume409 }410 })411 except Exception as e:412 print( stackprinter.format() )413 return False414 def volume_up( self ):415 try:416 return self.websocket_send_json({417 "id": self.generate_message_id( "volume_up" ) ,418 "type": "request" ,419 "uri": f"ssap://{self.endpoints.volume_up}" ,420 "payload": {}421 })422 except Exception as e:423 print( stackprinter.format() )424 return False425 def volume_down( self ):426 try:427 return self.websocket_send_json({428 "id": self.generate_message_id( "volume_down" ) ,429 "type": "request" ,430 "uri": f"ssap://{self.endpoints.volume_down}" ,431 "payload": {}432 })433 except Exception as e:434 print( stackprinter.format() )435 return False436 # TV Channel437 def channel_up( self ):438 try:439 return self.websocket_send_json({440 "id": self.generate_message_id( "channel_up" ) ,441 "type": "request" ,442 "uri": f"ssap://{self.endpoints.tv_channel_up}" ,443 "payload": {}444 })445 except Exception as e:446 print( stackprinter.format() )447 return False448 def channel_down( self ):449 try:450 return self.websocket_send_json({451 "id": self.generate_message_id( "channel_down" ) ,452 "type": "request" ,453 "uri": f"ssap://{self.endpoints.tv_channel_down}" ,454 "payload": {}455 })456 except Exception as e:457 print( stackprinter.format() )458 return False459 def get_channels( self ):460 try:461 return self.websocket_send_json({462 "id": self.generate_message_id( "get_channels" ) ,463 "type": "request" ,464 "uri": f"ssap://{self.endpoints.get_tv_channels}" ,465 "payload": {}466 })467 except Exception as e:468 print( stackprinter.format() )469 return False470 def get_current_channel( self ):471 try:472 return self.websocket_send_json({473 "id": self.generate_message_id( "get_current_channel" ) ,474 "type": "request" ,475 "uri": f"ssap://{self.endpoints.get_current_channel}" ,476 "payload": {}477 })478 except Exception as e:479 print( stackprinter.format() )480 return False481 def get_channel_info( self ):482 try:483 return self.websocket_send_json({484 "id": self.generate_message_id( "get_channel_info" ) ,485 "type": "request" ,486 "uri": f"ssap://{self.endpoints.get_channel_info}" ,487 "payload": {}488 })489 except Exception as e:490 print( stackprinter.format() )491 return False492 def set_channel( self , channel=3 ):493 try:494 return self.websocket_send_json({495 "id": self.generate_message_id( "set_channel" ) ,496 "type": "request" ,497 "uri": f"ssap://{self.endpoints.set_channel}" ,498 "payload": {499 "channelId": channel500 }501 })502 except Exception as e:503 print( stackprinter.format() )504 return False505 # Media control506 def play( self ):507 try:508 return self.websocket_send_json({509 "id": self.generate_message_id( "play" ) ,510 "type": "request" ,511 "uri": f"ssap://{self.endpoints.media_play}" ,512 "payload": {}513 })514 except Exception as e:515 print( stackprinter.format() )516 return False517 def pause( self ):518 try:519 return self.websocket_send_json({520 "id": self.generate_message_id( "pause" ) ,521 "type": "request" ,522 "uri": f"ssap://{self.endpoints.media_pause}" ,523 "payload": {}524 })525 except Exception as e:526 print( stackprinter.format() )527 return False528 def stop( self ):529 try:530 return self.websocket_send_json({531 "id": self.generate_message_id( "stop" ) ,532 "type": "request" ,533 "uri": f"ssap://{self.endpoints.media_stop}" ,534 "payload": {}535 })536 except Exception as e:537 print( stackprinter.format() )538 return False539 def close( self ):540 try:541 return self.websocket_send_json({542 "id": self.generate_message_id( "close" ) ,543 "type": "request" ,544 "uri": f"ssap://{self.endpoints.media_close}" ,545 "payload": {}546 })547 except Exception as e:548 print( stackprinter.format() )549 return False550 def rewind( self ):551 try:552 return self.websocket_send_json({553 "id": self.generate_message_id( "rewind" ) ,554 "type": "request" ,555 "uri": f"ssap://{self.endpoints.media_rewind}" ,556 "payload": {}557 })558 except Exception as e:559 print( stackprinter.format() )560 return False561 def fast_forward( self ):562 try:563 return self.websocket_send_json({564 "id": self.generate_message_id( "fast_forward" ) ,565 "type": "request" ,566 "uri": f"ssap://{self.endpoints.media_fast_forward}" ,567 "payload": {}568 })569 except Exception as e:570 print( stackprinter.format() )571 return False572 # Keys573 def send_enter_key( self ):574 try:575 return self.websocket_send_json({576 "id": self.generate_message_id( "send_enter_key" ) ,577 "type": "request" ,578 "uri": f"ssap://{self.endpoints.send_enter}" ,579 "payload": {}580 })581 except Exception as e:582 print( stackprinter.format() )583 return False584 def send_delete_key( self ):585 try:586 return self.websocket_send_json({587 "id": self.generate_message_id( "send_delete_key" ) ,588 "type": "request" ,589 "uri": f"ssap://{self.endpoints.send_delete}" ,590 "payload": {}591 })592 except Exception as e:593 print( stackprinter.format() )594 return False595 # Web596 def open_url( self , url="https://www.windy.com/-Thunderstorms-thunder?thunder,39.793,-80.717,6" ):597 try:598 return self.websocket_send_json({599 "id": self.generate_message_id( "open_url" ) ,600 "type": "request" ,601 "uri": f"ssap://{self.endpoints.open}" ,602 "payload": {603 "target": url604 }605 })606 except Exception as e:607 print( stackprinter.format() )608 return False609 def close_web( self ):610 try:611 return self.websocket_send_json({612 "id": self.generate_message_id( "close_web" ) ,613 "type": "request" ,614 "uri": f"ssap://{self.endpoints.close_web_app}" ,615 "payload": {}616 })617 except Exception as e:618 print( stackprinter.format() )619 return False620 # def press_button( self , button_name="MENU" ):621 # try:622 # self.open_websocket()623 # self.handshake()624 # self.ws.send(json.dumps({625 # "id": self.generate_message_id( "close_web" ) ,626 # "type": "request" ,627 # "uri": f"ssap://{self.endpoints.close_web_app}" ,628 # "payload": {}629 # }))630 # result = self.ws.recv()631 # self.close_websocket()632 # return result633 # except Exception as e:634 # print( stackprinter.format() )...

Full Screen

Full Screen

test_inbox_message.py

Source:test_inbox_message.py Github

copy

Full Screen

...16 def test_01(self):17 """18 Send a message to test_inbox_1 with a valid destination collection name19 """20 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])21 make_request('/test_inbox_1/', inbox.to_xml(), get_headers(VID_TAXII_SERVICES_11, False), MSG_STATUS_MESSAGE, ST_SUCCESS)22 def test_02(self):23 """24 Send a message to test_inbox_1 with an invalid destination collection name25 """26 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default_INVALID'])27 make_request('/test_inbox_1/', inbox.to_xml(), get_headers(VID_TAXII_SERVICES_11, False), MSG_STATUS_MESSAGE, ST_NOT_FOUND, sd_keys=[SD_ITEM])28 def test_03(self):29 """30 Send a message to test_inbox_1 without a destination collection name31 """32 inbox = tm11.InboxMessage(generate_message_id())33 make_request('/test_inbox_1/',34 inbox.to_xml(),35 get_headers(VID_TAXII_SERVICES_11, False),36 MSG_STATUS_MESSAGE,37 st=ST_DESTINATION_COLLECTION_ERROR,38 sd_keys=[SD_ACCEPTABLE_DESTINATION])39 def test_04(self):40 """41 Send a message to test_inbox_2 with a valid destination collection name42 """43 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])44 make_request('/test_inbox_2/',45 inbox.to_xml(),46 get_headers(VID_TAXII_SERVICES_11, False),47 MSG_STATUS_MESSAGE,48 st=ST_SUCCESS)49 def test_05(self):50 """51 Send a message to test_inbox_2 with an invalid destination collection name52 """53 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default_INVALID'])54 make_request('/test_inbox_2/',55 inbox.to_xml(),56 get_headers(VID_TAXII_SERVICES_11, False),57 MSG_STATUS_MESSAGE,58 st=ST_NOT_FOUND,59 sd_keys=[SD_ITEM])60 def test_06(self):61 """62 Send a message to test_inbox_2 without a destination collection name63 """64 inbox = tm11.InboxMessage(generate_message_id())65 make_request('/test_inbox_2/',66 inbox.to_xml(),67 get_headers(VID_TAXII_SERVICES_11, False),68 MSG_STATUS_MESSAGE,69 st=ST_SUCCESS)70 def test_08(self):71 """72 Send a message to test_inbox_3 with a destination collection name73 """74 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])75 make_request('/test_inbox_3/',76 inbox.to_xml(),77 get_headers(VID_TAXII_SERVICES_11, False),78 MSG_STATUS_MESSAGE,79 st=ST_DESTINATION_COLLECTION_ERROR)80 def test_09(self):81 """82 Send a message to test_inbox_3 without a destination collection name83 """84 inbox = tm11.InboxMessage(generate_message_id())85 make_request('/test_inbox_3/',86 inbox.to_xml(),87 get_headers(VID_TAXII_SERVICES_11, False),88 MSG_STATUS_MESSAGE,89 st=ST_SUCCESS)90 def test_10(self):91 """92 Send an Inbox message with a Record Count93 """94 inbox = tm11.InboxMessage(generate_message_id(), destination_collection_names=['default'])95 inbox.record_count = tm11.RecordCount(0, True)96 make_request('/test_inbox_1/',97 inbox.to_xml(),98 get_headers(VID_TAXII_SERVICES_11, False),99 MSG_STATUS_MESSAGE,100 st=ST_SUCCESS)101 def test_11(self):102 """103 Replicate the InboxClientScript104 """105 from libtaxii.scripts.inbox_client import InboxClient11Script106 stix_xml = InboxClient11Script.stix_watchlist107 cb = tm11.ContentBlock(tm11.ContentBinding(CB_STIX_XML_111), stix_xml)108 inbox = tm11.InboxMessage(message_id=generate_message_id(),109 destination_collection_names=['default'],110 content_blocks=[cb])111 make_request('/test_inbox_1/',112 inbox.to_xml(),113 get_headers(VID_TAXII_SERVICES_11, False),114 MSG_STATUS_MESSAGE,115 st=ST_SUCCESS)116class InboxTests10(TestCase):117 def setUp(self):118 settings.DEBUG = True119 add_basics()120 add_inbox_service()121 def test_01(self):122 """123 Send a TAXII 1.0 Inbox Message to /test_inbox_1/. Will always124 fail because /test_inbox_1/ requires a DCN and TAXII 1.0 cannot specify that.125 """126 inbox = tm10.InboxMessage(generate_message_id())127 make_request('/test_inbox_1/',128 inbox.to_xml(),129 get_headers(VID_TAXII_SERVICES_10, False),130 MSG_STATUS_MESSAGE,131 st=ST_DESTINATION_COLLECTION_ERROR) # TODO: Is this the right behavior? it's kind of a TAXII 1.1 error for a TAXII 1.0 request but not really132 def test_02(self):133 """134 Send a TAXII 1.0 Inbox Message to /test_inbox_2/135 """136 inbox = tm10.InboxMessage(generate_message_id())137 make_request('/test_inbox_2/',138 inbox.to_xml(),139 get_headers(VID_TAXII_SERVICES_10, False),140 MSG_STATUS_MESSAGE,141 st=ST_SUCCESS)142 def test_03(self):143 """144 Send a TAXII 1.0 Inbox Message to /test_inbox_3/145 """146 inbox = tm10.InboxMessage(generate_message_id())147 make_request('/test_inbox_3/',148 inbox.to_xml(),149 get_headers(VID_TAXII_SERVICES_10, False),150 MSG_STATUS_MESSAGE,151 st=ST_SUCCESS)152 def test_04(self):153 """154 Send a TAXII 1.0 Inbox Message to /test_inbox_3/ with two content blocks155 """156 cb1 = tm10.ContentBlock(content_binding=CB_STIX_XML_111, content=stix_watchlist_111)157 cb2 = tm10.ContentBlock(content_binding=CB_STIX_XML_111, content=stix_watchlist_111)158 inbox = tm10.InboxMessage(message_id=generate_message_id(),159 content_blocks=[cb1, cb2])160 make_request('/test_inbox_3/',161 inbox.to_xml(),162 get_headers(VID_TAXII_SERVICES_10, False),163 MSG_STATUS_MESSAGE,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful