How to use _get_base_url method in robotframework-appiumlibrary

Best Python code snippet using robotframework-appiumlibrary_python

tests.py

Source:tests.py Github

copy

Full Screen

...92 CWE.objects.all().delete()93 # Delete all keywords.94 Keyword.objects.all().delete()95 # Helper methods96 def _get_base_url(self):97 return reverse("restapi_CWETextRelated")98 def _form_url_params(self, text):99 return {CWERelatedList.PARAM_TEXT: text}100 def _cwe_info_found(self, content, code):101 cwe_repr = "{\"id\":" + str(code-100) + ",\"code\":" + str(code) + ",\"name\":\"CWE #" + str(code) + "\"}"102 return cwe_repr in content103 def _cwe_info_empty(self, content):104 return content == '[]'105 # Positive test cases106 def test_positive_search_text_1_keyword(self):107 text = "authentication fails because ..."108 response = self.http_get(self._get_base_url(), self._form_url_params(text))109 self.assertEqual(response.status_code, status.HTTP_200_OK)110 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)111 self.assertEqual(self._cwe_info_found(content=response.content, code=102), False)112 self.assertEqual(self._cwe_info_found(content=response.content, code=103), False)113 def test_positive_search_text_multiple_keywords(self):114 text = "the user can bypass the file access check due to a stack overflow caused by ..."115 response = self.http_get(self._get_base_url(), self._form_url_params(text))116 self.assertEqual(self._cwe_info_found(content=response.content, code=101), False)117 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)118 self.assertEqual(self._cwe_info_found(content=response.content, code=103), True)119 def test_positive_search_text_multiple_keywords_with_limit(self):120 max_return = CWERelatedList.CWE_MAX_RETURN121 CWERelatedList.CWE_MAX_RETURN = 1 # Only return one CWE.122 text = "the user can bypass the file access check due to a stack overflow caused by ..."123 response = self.http_get(self._get_base_url(), self._form_url_params(text))124 self.assertEqual(self._cwe_info_found(content=response.content, code=101), False)125 
self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)126 self.assertEqual(self._cwe_info_found(content=response.content, code=103), False)127 CWERelatedList.CWE_MAX_RETURN = max_return128 # Negative test cases129 def test_negative_search_text_no_match(self):130 text = "the password is leaked because the security level is incorrectly set..."131 response = self.http_get(self._get_base_url(), self._form_url_params(text))132 self.assertEqual(self._cwe_info_found(content=response.content, code=101), False)133 self.assertEqual(self._cwe_info_found(content=response.content, code=102), False)134 self.assertEqual(self._cwe_info_found(content=response.content, code=103), False)135 def test_negative_no_authentication_token(self):136 text = "the password is leaked because the security level is incorrectly set..."137 response = self.http_get(self._get_base_url(), self._form_url_params(text),138 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_NONE)139 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)140 def test_negative_inactive_authentication_token(self):141 text = "the password is leaked because the security level is incorrectly set..."142 response = self.http_get(self._get_base_url(), self._form_url_params(text),143 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_INACTIVE_USER)144 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)145class TestCWEAllList(RestAPITestBase):146 CWE_CODES = [101, 102, 103] # The CWE codes used in the tests147 def set_up_test_data(self):148 # Construct the test database149 for code in self.CWE_CODES:150 cwe = CWE(code=code, name="CWE #"+str(code))151 cwe.save()152 def tear_down_test_data(self):153 # Destruct the test database154 for code in self.CWE_CODES:155 cwe = CWE.objects.get(code=code)156 cwe.delete()157 # Helper methods158 def _get_base_url(self):159 return reverse("restapi_CWEAll")160 def _form_url_params(self, offset=None, limit=None, code=None, name_contains=None):161 params 
= dict()162 if offset is not None:163 params[CWEAllList.PARAM_OFFSET] = str(offset)164 if limit is not None:165 params[CWEAllList.PARAM_LIMIT] = str(limit)166 if code is not None:167 params[CWEAllList.PARAM_CODE] = str(code)168 if name_contains is not None:169 params[CWEAllList.PARAM_NAME_CONTAINS] = str(name_contains)170 return params171 def _cwe_total_count(self, content):172 result = json.loads(content)173 return result[CWEAllList.RESPONSE_KEY_TOTAL_COUNT]174 def _cwe_info_found(self, content, code):175 result = json.loads(content)176 found = False177 for cwe_object in result[CWEAllList.RESPONSE_KEY_CWE_OBJECTS]:178 if (cwe_object['id'] == code-100 and179 cwe_object['code'] == code and180 cwe_object['name'] == "CWE #"+str(code)):181 found = True182 break183 return found184 def _cwe_info_empty(self, content):185 result = json.loads(content)186 return len(result[CWEAllList.RESPONSE_KEY_CWE_OBJECTS]) == 0187 # Positive test cases188 def test_positive_get_default_default(self):189 original_max_return = CWEAllList.MAX_RETURN190 CWEAllList.DEFAULT_LIMIT = 2 # For test purpose we only return at most two CWEs.191 response = self.http_get(self._get_base_url(), self._form_url_params(offset=None, limit=None))192 self.assertEqual(self._cwe_total_count(content=response.content), 3)193 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)194 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)195 CWEAllList.MAX_RETURN = original_max_return196 def test_positive_get_2_default(self):197 CWEAllList.DEFAULT_LIMIT = 2 # For test purpose we only return at most two CWEs.198 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=None))199 self.assertEqual(self._cwe_total_count(content=response.content), 3)200 self.assertEqual(self._cwe_info_found(content=response.content, code=103), True)201 def test_positive_get_default_2(self):202 original_max_return = CWEAllList.MAX_RETURN203 
CWEAllList.DEFAULT_OFFSET = 0 # For test purpose we only return at most two CWEs.204 response = self.http_get(self._get_base_url(), self._form_url_params(offset=None, limit=2))205 self.assertEqual(self._cwe_total_count(content=response.content), 3)206 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)207 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)208 CWEAllList.MAX_RETURN = original_max_return209 def test_positive_get_0_0(self):210 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=0))211 self.assertEqual(self._cwe_total_count(content=response.content), 3)212 self.assertTrue(self._cwe_info_empty(content=response.content))213 def test_positive_get_0_1(self):214 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=1))215 self.assertEqual(self._cwe_total_count(content=response.content), 3)216 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)217 def test_positive_get_0_2(self):218 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=2))219 self.assertEqual(self._cwe_total_count(content=response.content), 3)220 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)221 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)222 def test_positive_get_0_10(self):223 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=10))224 self.assertEqual(self._cwe_total_count(content=response.content), 3)225 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)226 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)227 self.assertEqual(self._cwe_info_found(content=response.content, code=103), True)228 def test_positive_get_2_0(self):229 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=0))230 
self.assertEqual(self._cwe_total_count(content=response.content), 3)231 self.assertTrue(self._cwe_info_empty(content=response.content))232 def test_positive_get_2_1(self):233 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=1))234 self.assertEqual(self._cwe_total_count(content=response.content), 3)235 self.assertEqual(self._cwe_info_found(content=response.content, code=103), True)236 def test_positive_get_2_10(self):237 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=10))238 self.assertEqual(self._cwe_total_count(content=response.content), 3)239 self.assertEqual(self._cwe_info_found(content=response.content, code=103), True)240 def test_positive_get_0_1_max_2(self):241 original_max_return = CWEAllList.MAX_RETURN242 CWEAllList.MAX_RETURN = 2243 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=1))244 self.assertEqual(response.status_code, status.HTTP_200_OK)245 self.assertEqual(self._cwe_total_count(content=response.content), 3)246 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)247 CWEAllList.MAX_RETURN = original_max_return248 def test_positive_get_0_10_max_2(self):249 original_max_return = CWEAllList.MAX_RETURN250 CWEAllList.MAX_RETURN = 2251 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=10))252 self.assertEqual(response.status_code, status.HTTP_200_OK)253 self.assertEqual(self._cwe_total_count(content=response.content), 3)254 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)255 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)256 self.assertEqual(self._cwe_info_found(content=response.content, code=103), False)257 CWEAllList.MAX_RETURN = original_max_return258 def test_positive_get_0_3_102_None(self):259 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=3, code=102))260 
self.assertEqual(response.status_code, status.HTTP_200_OK)261 self.assertEqual(self._cwe_total_count(content=response.content), 1)262 self.assertEqual(self._cwe_info_found(content=response.content, code=101), False)263 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)264 self.assertEqual(self._cwe_info_found(content=response.content, code=103), False)265 def test_positive_get_0_3_None_CWE(self):266 response = self.http_get(self._get_base_url(),267 self._form_url_params(offset=0, limit=3, name_contains="CWE"))268 self.assertEqual(response.status_code, status.HTTP_200_OK)269 self.assertEqual(self._cwe_total_count(content=response.content), 3)270 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)271 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)272 self.assertEqual(self._cwe_info_found(content=response.content, code=103), True)273 def test_positive_get_0_2_None_CWE(self):274 response = self.http_get(self._get_base_url(),275 self._form_url_params(offset=0, limit=2, name_contains="CWE"))276 self.assertEqual(response.status_code, status.HTTP_200_OK)277 self.assertEqual(self._cwe_total_count(content=response.content), 3)278 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)279 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)280 self.assertEqual(self._cwe_info_found(content=response.content, code=103), False)281 # Negative test cases282 def test_negative_get_3_default(self):283 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=None))284 self.assertEqual(response.status_code, status.HTTP_200_OK)285 self.assertEqual(self._cwe_total_count(content=response.content), 3)286 self.assertTrue(self._cwe_info_empty(content=response.content))287 def test_negative_get_very_large_offset(self):288 response = self.http_get(self._get_base_url(), self._form_url_params(offset=self.VERY_LARGE_NUM, 
limit=None))289 self.assertEqual(response.status_code, status.HTTP_200_OK)290 self.assertEqual(self._cwe_total_count(content=response.content), 3)291 self.assertTrue(self._cwe_info_empty(content=response.content))292 def test_negative_get_3_0(self):293 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=0))294 self.assertEqual(response.status_code, status.HTTP_200_OK)295 self.assertEqual(self._cwe_total_count(content=response.content), 3)296 self.assertTrue(self._cwe_info_empty(content=response.content))297 def test_negative_get_3_1(self):298 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=1))299 self.assertEqual(response.status_code, status.HTTP_200_OK)300 self.assertEqual(self._cwe_total_count(content=response.content), 3)301 self.assertTrue(self._cwe_info_empty(content=response.content))302 def test_negative_get_3_10(self):303 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=10))304 self.assertEqual(response.status_code, status.HTTP_200_OK)305 self.assertEqual(self._cwe_total_count(content=response.content), 3)306 self.assertTrue(self._cwe_info_empty(content=response.content))307 def test_negative_get_0_very_large_limit(self):308 original_max_return = CWEAllList.MAX_RETURN309 CWEAllList.MAX_RETURN = 2310 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=self.VERY_LARGE_NUM))311 self.assertEqual(response.status_code, status.HTTP_200_OK)312 self.assertEqual(self._cwe_total_count(content=response.content), 3)313 self.assertEqual(self._cwe_info_found(content=response.content, code=101), True)314 self.assertEqual(self._cwe_info_found(content=response.content, code=102), True)315 self.assertEqual(self._cwe_info_found(content=response.content, code=103), False)316 CWEAllList.MAX_RETURN = original_max_return317 def test_negative_get_n1_1(self):318 response = self.http_get(self._get_base_url(), self._form_url_params(offset=-1, 
limit=1))319 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)320 def test_negative_get_1_n1(self):321 response = self.http_get(self._get_base_url(), self._form_url_params(offset=1, limit=-1))322 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)323 def test_negative_invalid_inputs(self):324 invalid_inputs = [325 ("0", "a"), # 'limit' is not integer.326 ("0", "1a"), # 'limit' is not integer.327 ("0", "a1"), # 'limit' is not integer.328 ("a", "1"), # 'offset' is not integer.329 ("1a", "1"), # 'offset' is not integer.330 ("a1", "1"), # 'offset' is not integer.331 ]332 for input_pair in invalid_inputs:333 response = self.http_get(self._get_base_url(), self._form_url_params(input_pair[0], input_pair[1]))334 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)335 def test_negative_no_authentication_token(self):336 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=1),337 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_NONE)338 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)339 def test_negative_inactive_authentication_token(self):340 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=1),341 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_INACTIVE_USER)342 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)343 def test_negative_get_0_2_None_name(self):344 response = self.http_get(self._get_base_url(),345 self._form_url_params(offset=0, limit=2, name_contains="name"))346 self.assertEqual(response.status_code, status.HTTP_200_OK)347 self.assertEqual(self._cwe_total_count(content=response.content), 0)348 self.assertTrue(self._cwe_info_empty(content=response.content))349 def test_negative_get_0_3_code_name_contains_both_present(self):350 response = self.http_get(self._get_base_url(),351 self._form_url_params(offset=0, limit=3, code=102, name_contains="CWE"))352 self.assertEqual(response.status_code, 
status.HTTP_400_BAD_REQUEST)353 def test_negative_get_0_3_abc_None(self):354 invalid_codes = [355 "102a",356 "a102",357 "abc",358 "+-*/"359 ]360 for invalid_code in invalid_codes:361 response = self.http_get(self._get_base_url(),362 self._form_url_params(offset=0, limit=3, code=invalid_code))363 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)364class TestCWESearchSingleString(RestAPITestBase):365 CWE_CODES = [0, 1, 11] # The CWE codes used in the tests366 CWE_ID_MAP = {0: 1, 1: 2, 11: 3} # The CWE's IDs.367 def set_up_test_data(self):368 # Construct the test database369 for code in self.CWE_CODES:370 cwe = CWE(code=code, name="CWE #"+str(code))371 cwe.save()372 def tear_down_test_data(self):373 # Destruct the test database374 for code in self.CWE_CODES:375 cwe = CWE.objects.get(code=code)376 cwe.delete()377 # Helper methods378 def _get_base_url(self):379 return reverse("restapi_CWESearchSingleString")380 def _form_url_params(self, offset=None, limit=None, search_str=None):381 params = dict()382 if offset is not None:383 params[CWESearchSingleString.PARAM_OFFSET] = str(offset)384 if limit is not None:385 params[CWESearchSingleString.PARAM_LIMIT] = str(limit)386 if search_str is not None:387 params[CWESearchSingleString.PARAM_SEARCH_STR] = str(search_str)388 return params389 def _cwe_total_count(self, content):390 result = json.loads(content)391 return result[CWESearchSingleString.RESPONSE_KEY_TOTAL_COUNT]392 def _cwe_info_found(self, content, code):393 result = json.loads(content)394 found = False395 for cwe_object in result[CWESearchSingleString.RESPONSE_KEY_CWE_OBJECTS]:396 if (cwe_object['id'] == self.CWE_ID_MAP[code] and397 cwe_object['code'] == code and398 cwe_object['name'] == "CWE #"+str(code)):399 found = True400 break401 return found402 def _cwe_info_empty(self, content):403 result = json.loads(content)404 return len(result[CWESearchSingleString.RESPONSE_KEY_CWE_OBJECTS]) == 0405 # Positive test cases406 def 
test_positive_get_default_default(self):407 original_max_return = CWESearchSingleString.MAX_RETURN408 CWESearchSingleString.DEFAULT_LIMIT = 2 # For test purpose we only return at most two CWEs.409 response = self.http_get(self._get_base_url(), self._form_url_params(offset=None, limit=None))410 self.assertEqual(self._cwe_total_count(content=response.content), 3)411 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)412 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)413 CWESearchSingleString.MAX_RETURN = original_max_return414 def test_positive_get_2_default(self):415 original_max_return = CWESearchSingleString.MAX_RETURN416 CWESearchSingleString.MAX_RETURN = 2 # For test purpose we only return at most two CWEs.417 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=None))418 self.assertEqual(self._cwe_total_count(content=response.content), 3)419 self.assertEqual(self._cwe_info_found(content=response.content, code=0), False)420 self.assertEqual(self._cwe_info_found(content=response.content, code=1), False)421 self.assertEqual(self._cwe_info_found(content=response.content, code=11), True)422 CWESearchSingleString.MAX_RETURN = original_max_return423 def test_positive_get_default_2(self):424 original_max_return = CWESearchSingleString.MAX_RETURN425 CWESearchSingleString.DEFAULT_OFFSET = 0 # For test purpose we only return at most two CWEs.426 response = self.http_get(self._get_base_url(), self._form_url_params(offset=None, limit=2))427 self.assertEqual(self._cwe_total_count(content=response.content), 3)428 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)429 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)430 self.assertEqual(self._cwe_info_found(content=response.content, code=11), False)431 CWESearchSingleString.MAX_RETURN = original_max_return432 def test_positive_get_0_0(self):433 response = 
self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=0))434 self.assertEqual(self._cwe_total_count(content=response.content), 3)435 self.assertTrue(self._cwe_info_empty(content=response.content))436 def test_positive_get_0_1(self):437 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=1))438 self.assertEqual(self._cwe_total_count(content=response.content), 3)439 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)440 self.assertEqual(self._cwe_info_found(content=response.content, code=1), False)441 self.assertEqual(self._cwe_info_found(content=response.content, code=11), False)442 def test_positive_get_0_2(self):443 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=2))444 self.assertEqual(self._cwe_total_count(content=response.content), 3)445 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)446 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)447 self.assertEqual(self._cwe_info_found(content=response.content, code=11), False)448 def test_positive_get_0_10(self):449 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=10))450 self.assertEqual(self._cwe_total_count(content=response.content), 3)451 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)452 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)453 self.assertEqual(self._cwe_info_found(content=response.content, code=11), True)454 def test_positive_get_2_0(self):455 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=0))456 self.assertEqual(self._cwe_total_count(content=response.content), 3)457 self.assertTrue(self._cwe_info_empty(content=response.content))458 def test_positive_get_2_1(self):459 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=1))460 
self.assertEqual(self._cwe_total_count(content=response.content), 3)461 self.assertEqual(self._cwe_info_found(content=response.content, code=0), False)462 self.assertEqual(self._cwe_info_found(content=response.content, code=1), False)463 self.assertEqual(self._cwe_info_found(content=response.content, code=11), True)464 def test_positive_get_2_10(self):465 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=10))466 self.assertEqual(self._cwe_total_count(content=response.content), 3)467 self.assertEqual(self._cwe_info_found(content=response.content, code=0), False)468 self.assertEqual(self._cwe_info_found(content=response.content, code=1), False)469 self.assertEqual(self._cwe_info_found(content=response.content, code=11), True)470 def test_positive_get_0_1_max_2(self):471 original_max_return = CWESearchSingleString.MAX_RETURN472 CWESearchSingleString.MAX_RETURN = 2473 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=1))474 self.assertEqual(response.status_code, status.HTTP_200_OK)475 self.assertEqual(self._cwe_total_count(content=response.content), 3)476 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)477 self.assertEqual(self._cwe_info_found(content=response.content, code=1), False)478 self.assertEqual(self._cwe_info_found(content=response.content, code=11), False)479 CWESearchSingleString.MAX_RETURN = original_max_return480 def test_positive_get_0_10_max_2(self):481 original_max_return = CWESearchSingleString.MAX_RETURN482 CWESearchSingleString.MAX_RETURN = 2483 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=10))484 self.assertEqual(response.status_code, status.HTTP_200_OK)485 self.assertEqual(self._cwe_total_count(content=response.content), 3)486 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)487 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)488 
self.assertEqual(self._cwe_info_found(content=response.content, code=11), False)489 CWESearchSingleString.MAX_RETURN = original_max_return490 def test_positive_get_0_3_1_None(self):491 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=3, search_str="1"))492 self.assertEqual(response.status_code, status.HTTP_200_OK)493 self.assertEqual(self._cwe_total_count(content=response.content), 2)494 self.assertEqual(self._cwe_info_found(content=response.content, code=0), False)495 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)496 self.assertEqual(self._cwe_info_found(content=response.content, code=11), True)497 def test_positive_get_0_3_None_CWE(self):498 response = self.http_get(self._get_base_url(),499 self._form_url_params(offset=0, limit=3, search_str="CWE"))500 self.assertEqual(response.status_code, status.HTTP_200_OK)501 self.assertEqual(self._cwe_total_count(content=response.content), 3)502 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)503 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)504 self.assertEqual(self._cwe_info_found(content=response.content, code=11), True)505 def test_positive_get_0_2_None_CWE(self):506 response = self.http_get(self._get_base_url(),507 self._form_url_params(offset=0, limit=2, search_str="CWE"))508 self.assertEqual(response.status_code, status.HTTP_200_OK)509 self.assertEqual(self._cwe_total_count(content=response.content), 3)510 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)511 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)512 self.assertEqual(self._cwe_info_found(content=response.content, code=11), False)513 def test_negative_get_0_3_abc_None(self):514 search_strings = [515 "102a",516 "a102",517 "abc",518 "+-*/"519 ]520 for search_string in search_strings:521 response = self.http_get(self._get_base_url(),522 self._form_url_params(offset=0, limit=3, 
search_str=search_string))523 self.assertEqual(self._cwe_total_count(content=response.content), 0)524 self.assertTrue(self._cwe_info_empty(content=response.content))525 # Negative test cases526 def test_negative_get_3_default(self):527 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=None))528 self.assertEqual(response.status_code, status.HTTP_200_OK)529 self.assertEqual(self._cwe_total_count(content=response.content), 3)530 self.assertTrue(self._cwe_info_empty(content=response.content))531 def test_negative_get_very_large_offset(self):532 response = self.http_get(self._get_base_url(), self._form_url_params(offset=self.VERY_LARGE_NUM, limit=None))533 self.assertEqual(response.status_code, status.HTTP_200_OK)534 self.assertEqual(self._cwe_total_count(content=response.content), 3)535 self.assertTrue(self._cwe_info_empty(content=response.content))536 def test_negative_get_3_0(self):537 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=0))538 self.assertEqual(response.status_code, status.HTTP_200_OK)539 self.assertEqual(self._cwe_total_count(content=response.content), 3)540 self.assertTrue(self._cwe_info_empty(content=response.content))541 def test_negative_get_3_1(self):542 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=1))543 self.assertEqual(response.status_code, status.HTTP_200_OK)544 self.assertEqual(self._cwe_total_count(content=response.content), 3)545 self.assertTrue(self._cwe_info_empty(content=response.content))546 def test_negative_get_3_10(self):547 response = self.http_get(self._get_base_url(), self._form_url_params(offset=3, limit=10))548 self.assertEqual(response.status_code, status.HTTP_200_OK)549 self.assertEqual(self._cwe_total_count(content=response.content), 3)550 self.assertTrue(self._cwe_info_empty(content=response.content))551 def test_negative_get_0_very_large_limit(self):552 original_max_return = CWESearchSingleString.MAX_RETURN553 
CWESearchSingleString.MAX_RETURN = 2554 response = self.http_get(self._get_base_url(), self._form_url_params(offset=0, limit=self.VERY_LARGE_NUM))555 self.assertEqual(response.status_code, status.HTTP_200_OK)556 self.assertEqual(self._cwe_total_count(content=response.content), 3)557 self.assertEqual(self._cwe_info_found(content=response.content, code=0), True)558 self.assertEqual(self._cwe_info_found(content=response.content, code=1), True)559 self.assertEqual(self._cwe_info_found(content=response.content, code=11), False)560 CWESearchSingleString.MAX_RETURN = original_max_return561 def test_negative_get_n1_1(self):562 response = self.http_get(self._get_base_url(), self._form_url_params(offset=-1, limit=1))563 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)564 def test_negative_get_1_n1(self):565 response = self.http_get(self._get_base_url(), self._form_url_params(offset=1, limit=-1))566 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)567 def test_negative_invalid_inputs(self):568 invalid_inputs = [569 ("0", "a"), # 'limit' is not integer.570 ("0", "1a"), # 'limit' is not integer.571 ("0", "a1"), # 'limit' is not integer.572 ("a", "1"), # 'offset' is not integer.573 ("1a", "1"), # 'offset' is not integer.574 ("a1", "1"), # 'offset' is not integer.575 ]576 for input_pair in invalid_inputs:577 response = self.http_get(self._get_base_url(), self._form_url_params(input_pair[0], input_pair[1]))578 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)579 def test_negative_no_authentication_token(self):580 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=1),581 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_NONE)582 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)583 def test_negative_inactive_authentication_token(self):584 response = self.http_get(self._get_base_url(), self._form_url_params(offset=2, limit=1),585 
auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_INACTIVE_USER)586 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)587 def test_negative_get_0_2_None_name(self):588 response = self.http_get(self._get_base_url(),589 self._form_url_params(offset=0, limit=2, search_str="name"))590 self.assertEqual(response.status_code, status.HTTP_200_OK)591 self.assertEqual(self._cwe_total_count(content=response.content), 0)592 self.assertTrue(self._cwe_info_empty(content=response.content))593class TestMisuseCaseSuggestion(RestAPITestBase):594 CWE_CODES = [101, 102, 103] # The CWE codes used in the tests595 def _create_muo_and_misuse_case(self, cwes, muc_desc, custom, approved, creator):596 # Create the MUO container597 cwe_code_list = [cwe.code for cwe in cwes]598 muc_dict = copy.deepcopy(SaveCustomMUO.TEMPLATE_MISUSE_CASE)599 muc_dict["misuse_case_description"] = muc_desc600 uc_dict = copy.deepcopy(SaveCustomMUO.TEMPLATE_USE_CASE)601 uc_dict["use_case_description"] = ""602 uc_dict["osr"] = ""603 MUOContainer.create_custom_muo(cwe_code_list,604 misusecase=muc_dict,605 usecase=uc_dict, # Use Case is not important here.606 created_by=creator607 )608 # We can find this MUO container because only one misuse case is associated with it.609 muo = MUOContainer.objects.get(misuse_case__misuse_case_description=muc_desc)610 if not custom:611 # This is not a custom MUO. 
We need to change its is_custom field.612 muo.is_custom = False613 if approved:614 # By default, the newly created MUO container is in 'draft' state.615 # If it should be in 'approved' state, we need to submit and approve it.616 muo.action_submit()617 muo.action_approve()618 def set_up_test_data(self):619 # Create CWEs.620 cwe101 = CWE(code=self.CWE_CODES[0])621 cwe101.save()622 cwe102 = CWE(code=self.CWE_CODES[1])623 cwe102.save()624 cwe103 = CWE(code=self.CWE_CODES[2])625 cwe103.save()626 # Create the MUO containers and misuse cases.627 self._create_muo_and_misuse_case(cwes=[cwe101],628 muc_desc="Misuse Case 1",629 custom=False,630 approved=True, # Approved, so it's generic.631 creator=self._user_1)632 self._create_muo_and_misuse_case(cwes=[cwe102],633 muc_desc="Misuse Case 2",634 custom=True,635 approved=True, # Approved, so it's generic but also custom.636 creator=self._user_1)637 self._create_muo_and_misuse_case(cwes=[cwe102, cwe103],638 muc_desc="Misuse Case 3",639 custom=True,640 approved=False, # Not approved, so it's custom.641 creator=self._user_1)642 self._create_muo_and_misuse_case(cwes=[cwe102],643 muc_desc="Misuse Case 4",644 custom=True,645 approved=False, # Not approved, so it's custom.646 creator=self._user_1)647 self._create_muo_and_misuse_case(cwes=[cwe103],648 muc_desc="Misuse Case 5",649 custom=True,650 approved=False, # Not approved, so it's custom.651 creator=self._user_2) # By another user652 def tear_down_test_data(self):653 # Reject all the MUO containers before deleting them.654 for muo in MUOContainer.objects.all():655 if muo.status == 'approved':656 muo.action_reject(reject_reason="In order to delete the test data.")657 # Delete all the MUO containers.658 MUOContainer.objects.all().delete()659 # Delete all the misuse cases.660 MisuseCase.objects.all().delete()661 # Delete all the CWEs.662 CWE.objects.all().delete()663 # Helper methods664 def _get_base_url(self):665 return reverse("restapi_MisuseCase_CWERelated")666 def 
_form_url_params(self, cwe_code_list):667 cwes_str = "".join(str(code)+"," for code in cwe_code_list).rstrip(',')668 return {MisuseCaseRelated.PARAM_CWES: cwes_str}669 def _misuse_case_info_found(self, json_content, mu_index):670 mu_name = "MU-0000" + str(mu_index)671 mu_description = "Misuse Case " + str(mu_index)672 found = False673 for json_mu in json_content:674 if (json_mu['id'] == mu_index675 and json_mu['name'] == mu_name676 and json_mu['misuse_case_description'] == mu_description):677 found = True678 break679 return found680 # Positive test cases681 def test_positive_single_cwe(self):682 response = self.http_get(self._get_base_url(), self._form_url_params([self.CWE_CODES[0]]))683 json_content = json.loads(response.content)684 # Make sure exactly one misuse case is returned.685 self.assertEqual(len(json_content), 1)686 # Make sure the first misuse case is returned.687 self.assertEqual(self._misuse_case_info_found(json_content, 1), True)688 self.assertEqual(self._misuse_case_info_found(json_content, 2), False)689 self.assertEqual(self._misuse_case_info_found(json_content, 3), False)690 self.assertEqual(self._misuse_case_info_found(json_content, 4), False)691 self.assertEqual(self._misuse_case_info_found(json_content, 5), False)692 def test_positive_multiple_cwes(self):693 response = self.http_get(self._get_base_url(), self._form_url_params([self.CWE_CODES[0], self.CWE_CODES[2]]))694 json_content = json.loads(response.content)695 # Make sure exactly two misuse cases are returned.696 self.assertEqual(len(json_content), 2)697 # Make sure the first, and third misuse cases are returned.698 self.assertEqual(self._misuse_case_info_found(json_content, 1), True)699 self.assertEqual(self._misuse_case_info_found(json_content, 2), False)700 self.assertEqual(self._misuse_case_info_found(json_content, 3), True)701 self.assertEqual(self._misuse_case_info_found(json_content, 4), False)702 self.assertEqual(self._misuse_case_info_found(json_content, 5), False)703 def 
test_positive_all_cwes(self):704 response = self.http_get(self._get_base_url(), self._form_url_params(self.CWE_CODES))705 json_content = json.loads(response.content)706 self.assertEqual(len(json_content), 4)707 self.assertEqual(self._misuse_case_info_found(json_content, 1), True)708 self.assertEqual(self._misuse_case_info_found(json_content, 2), True)709 self.assertEqual(self._misuse_case_info_found(json_content, 3), True)710 self.assertEqual(self._misuse_case_info_found(json_content, 4), True)711 self.assertEqual(self._misuse_case_info_found(json_content, 5), False)712 def test_positive_distinct_misuse_cases(self):713 response = self.http_get(self._get_base_url(), self._form_url_params([self.CWE_CODES[1], self.CWE_CODES[2]]))714 json_content = json.loads(response.content)715 # Both keyword #102 and #103 are associated with both misuse case #3.716 # We want to make sure that the same misuse case is returned only once.717 self.assertEqual(len(json_content), 3)718 self.assertEqual(self._misuse_case_info_found(json_content, 1), False)719 self.assertEqual(self._misuse_case_info_found(json_content, 2), True)720 self.assertEqual(self._misuse_case_info_found(json_content, 3), True)721 self.assertEqual(self._misuse_case_info_found(json_content, 4), True)722 self.assertEqual(self._misuse_case_info_found(json_content, 5), False)723 # Negative test cases724 def test_negative_very_large_cwe_id(self):725 response = self.http_get(self._get_base_url(), self._form_url_params([self.VERY_LARGE_NUM]))726 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)727 def test_negative_malformed_cwes(self):728 cwes_str_list = [729 "", # Empty code list730 "101,102,", # Additional ',' at the end731 "101|102", # Not using ',' as separator732 "101.1,102.2", # Not using integers733 "10a", # Not using numeric values734 "10a,20b" # Not using numeric values735 ]736 for cwes_str in cwes_str_list:737 # Note we cannot use the _get_base_url() and _form_url_params() because these two 
methods738 # will construct a valid Http request, while here we want an invalid one.739 response = self.http_get(self._get_base_url()+'?'+MisuseCaseRelated.PARAM_CWES+'='+cwes_str, None)740 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)741 # Make sure the error message is generated using this method.742 self.assertEqual(response.content,743 # Because response.content has been JSONized, we need to JSONize the744 # error message in order to compare for equality.745 str(json.dumps(MisuseCaseRelated()._form_err_msg_malformed_cwes(cwes_str)))746 )747 def test_negative_not_found_cwes(self):748 response = self.http_get(self._get_base_url(), self._form_url_params([101, 102, 103, 104, 105]))749 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)750 # Make sure the error message is generated using this method.751 self.assertEqual(response.content,752 # Because response.content has been JSONized, we need to JSONize the753 # error message in order to compare for equality.754 str(json.dumps(MisuseCaseRelated()._form_err_msg_cwes_not_found([104, 105])))755 )756 def test_negative_no_authentication_token(self):757 response = self.http_get(self._get_base_url(), self._form_url_params([self.CWE_CODES[0]]),758 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_NONE)759 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)760 def test_negative_inactive_authentication_token(self):761 response = self.http_get(self._get_base_url(), self._form_url_params([self.CWE_CODES[0]]),762 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_INACTIVE_USER)763 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)764class TestUseCaseSuggestion(RestAPITestBase):765 DESCRIPTION_BASE_MISUSE_CASE = "Misuse Case " # Don't forget the trailing blank space.766 DESCRIPTION_BASE_USE_CASE = "Use Case " # Don't forget the trailing blank space.767 DESCRIPTION_BASE_OSR = "Overlooked Security Requirement " # Don't forget the trailing blank space.768 def 
_create_muo_and_misuse_case(self, cwes, muc_desc, custom, creator):769 # Create the misuse case and establish the relationship with the CWEs770 misuse_case = MisuseCase(misuse_case_description=muc_desc,771 created_by=creator772 )773 misuse_case.save()774 misuse_case.cwes.add(*cwes) # Establish the relationship between the misuse case and CWEs775 # Create the MUO container for the misuse case and establish the relationship between the776 # MUO Container and CWEs777 muo_container = MUOContainer(is_custom=custom,778 status='draft',779 misuse_case=misuse_case,780 created_by=creator781 )782 muo_container.save()783 muo_container.cwes.add(*cwes) # Establish the relationship between the muo container and cwes784 return misuse_case, muo_container785 def _create_use_case_and_link_muo(self, index, muc, muo, creator):786 uc = UseCase(use_case_description=self.DESCRIPTION_BASE_USE_CASE+str(index),787 osr=self.DESCRIPTION_BASE_OSR+str(index),788 created_by=creator789 )790 uc.muo_container = muo791 uc.misuse_case = muc792 uc.save()793 def _approve_muo_container(self, muo_container):794 muo_container.action_submit()795 muo_container.action_approve()796 def set_up_test_data(self):797 # Create some CWEs.798 cwe101 = CWE(code=101)799 cwe101.save()800 cwe102 = CWE(code=102)801 cwe102.save()802 cwe103 = CWE(code=103)803 cwe103.save()804 # Create the MUO containers and misuse cases.805 muc1, muo1 = self._create_muo_and_misuse_case(806 cwes=[cwe101],807 muc_desc="Misuse Case 1",808 custom=False,809 creator=self._user_1810 )811 muc2, muo2 = self._create_muo_and_misuse_case(812 cwes=[cwe102],813 muc_desc="Misuse Case 2",814 custom=True,815 creator=self._user_1816 )817 muc3, muo3 = self._create_muo_and_misuse_case(818 cwes=[cwe102, cwe103],819 muc_desc="Misuse Case 3",820 custom=True,821 creator=self._user_1822 )823 muc4, muo4 = self._create_muo_and_misuse_case(824 cwes=[cwe102],825 muc_desc="Misuse Case 4",826 custom=True,827 creator=self._user_1828 )829 muc5, muo5 = 
self._create_muo_and_misuse_case(830 cwes=[cwe103],831 muc_desc="Misuse Case 5",832 custom=True,833 creator=self._user_2834 ) # By another user835 # Create some use cases(with OSRs)836 self._create_use_case_and_link_muo(1, muc1, muo1, self._user_1)837 self._create_use_case_and_link_muo(2, muc2, muo2, self._user_1)838 self._create_use_case_and_link_muo(3, muc2, muo2, self._user_1)839 self._create_use_case_and_link_muo(4, muc3, muo3, self._user_1)840 self._create_use_case_and_link_muo(5, muc4, muo4, self._user_1)841 self._create_use_case_and_link_muo(6, muc4, muo4, self._user_1)842 self._create_use_case_and_link_muo(7, muc5, muo5, self._user_2)843 # Approve some of the MUO containers.844 self._approve_muo_container(muo1)845 self._approve_muo_container(muo2)846 def tear_down_test_data(self):847 # Reject all the MUO containers before deleting them.848 for muo in MUOContainer.objects.all():849 if muo.status == 'approved':850 muo.action_reject(reject_reason="In order to delete the test data.")851 # Delete all the MUO containers.852 MUOContainer.objects.all().delete()853 # Delete all the misuse cases854 MisuseCase.objects.all().delete()855 # Delete all the CWEs.856 CWE.objects.all().delete()857 def _get_base_url(self):858 return reverse("restapi_UseCase_MisuseCaseRelated")859 def _form_url_params(self, misuse_case_id_list):860 misuse_cases_str = "".join(str(code)+"," for code in misuse_case_id_list).rstrip(',')861 return {UseCaseRelated.PARAM_MISUSE_CASES: misuse_cases_str}862 def _use_case_info_found(self, json_content, uc_index):863 uc_name = "UC-{0:05d}".format(uc_index)864 uc_description = self.DESCRIPTION_BASE_USE_CASE + str(uc_index)865 osr = self.DESCRIPTION_BASE_OSR + str(uc_index)866 found = False867 for json_uc in json_content:868 if (json_uc['id'] == uc_index869 and json_uc['name'] == uc_name870 and json_uc['use_case_description'] == uc_description871 and json_uc['osr'] == osr):872 found = True873 break874 return found875 # Positive test cases876 def 
test_positive_single_misuse_case(self):877 response = self.http_get(self._get_base_url(), self._form_url_params([1]))878 json_content = json.loads(response.content)879 # Make sure exactly one use case is returned.880 self.assertEqual(len(json_content), 1)881 # Make sure the first use case is returned.882 self.assertEqual(self._use_case_info_found(json_content, 1), True)883 self.assertEqual(self._use_case_info_found(json_content, 2), False)884 self.assertEqual(self._use_case_info_found(json_content, 3), False)885 self.assertEqual(self._use_case_info_found(json_content, 4), False)886 self.assertEqual(self._use_case_info_found(json_content, 5), False)887 self.assertEqual(self._use_case_info_found(json_content, 6), False)888 self.assertEqual(self._use_case_info_found(json_content, 7), False)889 def test_positive_multiple_misuse_cases(self):890 response = self.http_get(self._get_base_url(), self._form_url_params([1, 2]))891 json_content = json.loads(response.content)892 # Make sure all the use cases are returned.893 self.assertEqual(len(json_content), 3)894 # Make sure all the use cases are returned.895 self.assertEqual(self._use_case_info_found(json_content, 1), True)896 self.assertEqual(self._use_case_info_found(json_content, 2), True)897 self.assertEqual(self._use_case_info_found(json_content, 3), True)898 self.assertEqual(self._use_case_info_found(json_content, 4), False)899 self.assertEqual(self._use_case_info_found(json_content, 5), False)900 self.assertEqual(self._use_case_info_found(json_content, 6), False)901 self.assertEqual(self._use_case_info_found(json_content, 7), False)902 def test_positive_all_misuse_cases(self):903 response = self.http_get(self._get_base_url(), self._form_url_params([n for n in range(1, 8)]))904 json_content = json.loads(response.content)905 # Make sure all the use cases are returned.906 self.assertEqual(len(json_content), 6)907 # Make sure all the use cases are returned.908 self.assertEqual(self._use_case_info_found(json_content, 1), 
True)909 self.assertEqual(self._use_case_info_found(json_content, 2), True)910 self.assertEqual(self._use_case_info_found(json_content, 3), True)911 self.assertEqual(self._use_case_info_found(json_content, 4), True)912 self.assertEqual(self._use_case_info_found(json_content, 5), True)913 self.assertEqual(self._use_case_info_found(json_content, 6), True)914 self.assertEqual(self._use_case_info_found(json_content, 7), False)915 # Negative test cases916 def test_negative_very_large_misuse_case_id(self):917 response = self.http_get(self._get_base_url(), self._form_url_params([self.VERY_LARGE_NUM]))918 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)919 def test_negative_malformed_misuse_cases(self):920 misuse_cases_str_list = [921 "", # Empty id list922 "1,2,", # Additional ',' at the end923 "1|2", # Not using ',' as separator924 "1.1,2.2", # Not using integers925 "1a", # Not using numeric values926 "1a,2b" # Not using numeric values927 ]928 for misuse_cases_str in misuse_cases_str_list:929 # Note we cannot use the _get_base_url() and _form_url_params() because these two methods930 # will construct a valid Http request, while here we want an invalid one.931 response = self.http_get(self._get_base_url()+'?'+UseCaseRelated.PARAM_MISUSE_CASES+'='+misuse_cases_str,932 None)933 self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)934 # Make sure the error message is generated using this method.935 self.assertEqual(response.content,936 # Because response.content has been JSONized, we need to JSONize the937 # error message in order to compare for equality.938 str(json.dumps(UseCaseRelated()._form_err_msg_malformed_misuse_cases(misuse_cases_str)))939 )940 def test_negative_no_authentication_token(self):941 response = self.http_get(self._get_base_url(), self._form_url_params([1]),942 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_NONE)943 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)944 def 
test_negative_inactive_authentication_token(self):945 response = self.http_get(self._get_base_url(), self._form_url_params([1]),946 auth_token_type=RestAPITestBase.AUTH_TOKEN_TYPE_INACTIVE_USER)947 self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)948class TestSaveCustomMUO(RestAPITestBase):949 _cli = Client()950 CWE_CODES = [101, 102, 103] # The CWE codes used in the tests951 CWE_IDS = []952 DESCRIPTION_MISUSE_CASE = "Sample Misuse Case Description"953 DESCRIPTION_USE_CASE = "Sample Use Case Description"954 DESCRIPTION_OSR = "Sample Overlooked Security Requirement Description"955 def set_up_test_data(self):956 # Clear the current CWE ID list.957 self.CWE_IDS = []958 # Create the CWEs959 cwe101 = CWE(code=self.CWE_CODES[0], name="CWE #"+str(self.CWE_CODES[0]))...

Full Screen

Full Screen

GeneralOba_v001.py

Source:GeneralOba_v001.py Github

copy

Full Screen

...94 retry = 395 try_cnt = 096 ob_response = None97 while True:98 # URLrequest = self._get_base_url() + "/onebox_info"99 if 'obagent_base_url' in req_dict:100 if req_dict.get('obagent_base_url') is None:101 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/onebox_info"102 else:103 URLrequest = req_dict.get('obagent_base_url') + "/onebox_info"104 else:105 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/onebox_info"106 log.debug("Request request URL: %s" %URLrequest)107 try:108 try_cnt += 1109 ob_response = requests.get(URLrequest, verify=False, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY))110 except Exception, e:111 if try_cnt < retry:112 time.sleep(3)113 log.debug("Retry calling API to get onebox info : %d" % try_cnt)114 continue115 else:116 log.error("failed to get one-box info due to HTTP Error %s" %str(e))117 return -500, str(e)118 break119 return self._parse_json_response(ob_response)120 def onebox_backup(self, req_info):121 # log.debug("IN backup_onbox()")122 log.debug("[Action :: Backup] backup_onebox: %s" % (str(req_info)))123 headers_req = {'Accept': 'application/json', 'content-type': 'application/json'}124 if 'obagent_base_url' in req_info:125 if req_info.get('obagent_base_url') is None:126 URLrequest = self._get_base_url(req_info.get('onebox_id')) + "/backup"127 else:128 URLrequest = req_info.get('obagent_base_url') + "/backup"129 else:130 URLrequest = self._get_base_url(req_info.get('onebox_id')) + "/backup"131 log.debug("Backup request URL: %s" % URLrequest)132 # reqDict = {'backup_server_ip': req_info['backup_server']}133 # reqDict['backup_server_port'] = 9922134 # reqDict['remote_location'] = req_info['remote_location']135 # reqDict['local_location'] = req_info['local_location']136 # if req_info['tid']: reqDict['tid'] = req_info['tid']137 # if req_info['tpath']: reqDict['tpath'] = req_info['tpath']138 reqDict = {}139 payload_req = json.dumps(reqDict)140 # log.debug("Backup request body = %s" % str(payload_req))141 try:142 
ob_response = requests.post(URLrequest, headers=headers_req, data=payload_req, verify=False, timeout=120, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY))143 except (HTTPException, ConnectionError), e:144 log.exception("failed to backup One-Box due to HTTP Error %s" % str(e))145 return -500, str(e)146 except Exception, e:147 log.error("Failed to backup One-Box : %s" % str(e))148 return -500, str(e)149 return self._parse_json_response(ob_response)150 # restore151 def onebox_restore(self, req_dict, tid=None, tpath=None):152 log.debug("[HJC] restore_onebox: %s" % (str(req_dict)))153 headers_req = {'Accept': 'application/json', 'content-type': 'application/json'}154 # URLrequest = self._get_base_url() + "/restore"155 if 'obagent_base_url' in req_dict:156 if req_dict.get('obagent_base_url') is None:157 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/restore"158 else:159 URLrequest = req_dict.get('obagent_base_url') + "/restore"160 else:161 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/restore"162 log.debug("Request request URL: %s" % URLrequest)163 # reqDict = {'backup_server_ip': req_dict['backup_server']}164 reqDict = {}165 # if 'backup_location' in req_dict: reqDict['remote_location'] = req_dict['backup_location']166 if 'backup_local_location' in req_dict: reqDict['local_location'] = req_dict['backup_local_location']167 if 'backup_file' in req_dict: reqDict['backup_file'] = req_dict['backup_file']168 if tid: reqDict['tid'] = tid169 if tpath: reqDict['tpath'] = tpath170 payload_req = json.dumps(reqDict)171 log.debug("Request body = %s" % str(payload_req))172 try:173 ob_response = requests.post(URLrequest, headers=headers_req, data=payload_req, verify=False, timeout=120, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY))174 except (HTTPException, ConnectionError), e:175 log.exception("failed to restore One-Box due to HTTP Error %s" % str(e))176 return -500, str(e)177 except Exception, e:178 log.error("Failed to restore One-Box : %s" % str(e))179 return 
-500, str(e)180 return self._parse_json_response(ob_response)181 # reboot agent call182 def onebox_reboot(self, req_dict, tid=None, tpath=None):183 log.debug("[WF - Reboot] obconnector > reboot_onebox: %s" % (str(req_dict)))184 headers_req = {'Accept': 'application/json', 'content-type': 'application/json'}185 # URLrequest = self._get_base_url() + "/reboot"186 if 'obagent_base_url' in req_dict:187 if req_dict.get('obagent_base_url') is None:188 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/reboot"189 else:190 URLrequest = req_dict.get('obagent_base_url') + "/reboot"191 else:192 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/reboot"193 log.debug("Request request URL: %s" % URLrequest)194 try:195 ob_response = requests.post(URLrequest, headers=headers_req, data=None, verify=False, timeout=120, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY))196 except (HTTPException, ConnectionError), e:197 log.exception("failed to restore One-Box due to HTTP Error %s" % str(e))198 return -500, str(e)199 except Exception, e:200 log.error("Failed to restore One-Box : %s" % str(e))201 return -500, str(e)202 return self._parse_json_response(ob_response)203 # One-Box Agent 진행 상태 조회204 def wf_check_onebox_agent_progress(self, progress_dict, request_type=None, transaction_id=None):205 log.debug("[HJC] check_onebox_agent_progress")206 if 'obagent_base_url' in progress_dict:207 if progress_dict.get('obagent_base_url') is None:208 get_base_url = self._get_base_url(progress_dict.get('onebox_id'))209 else:210 get_base_url = progress_dict.get('obagent_base_url')211 else:212 get_base_url = self._get_base_url(progress_dict.get('onebox_id'))213 if request_type: # "restore", "reboot"214 if request_type == "restore":215 URLrequest = get_base_url + "/" + request_type + "/progress"216 else: # reboot217 # URLrequest = self._get_base_url() + "/version"218 URLrequest = get_base_url + "/onebox_info"219 else:220 URLrequest = get_base_url + "/progress"221 # 초소형인 경우 transaction_id 
필요없음222 if transaction_id:223 URLrequest = URLrequest + "?transaction_id=" + transaction_id224 log.debug("Request request URL: %s" % URLrequest)225 try:226 ob_response = requests.get(URLrequest, verify=False, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY), timeout=30)227 log.debug('wf_check_onebox_agent_progress : ob_response = %s' %str(ob_response))228 except (HTTPException, ConnectionError), e:229 log.exception("failed to check the agent-progress of One-Box due to HTTP Error %s" % str(e))230 return -500, str(e)231 except Exception, e:232 log.error("Failed to check the agent-progress of One-Box : %s" % str(e))233 return -500, str(e)234 # except requests.Timeout as timeout_error:235 # log.exception("failed to check the agent-progress of One-Box agent connection timeout HTTP Error %s" % str(timeout_error))236 # return -500, str(timeout_error)237 return self._parse_json_response(ob_response)238 # 동작 점검239 def connection_check(self, req_info=None):240 return self._get_onebox_net_mode(req_info, timeout_value=5)241 def _get_onebox_net_mode(self, req_info, tid=None, tpath=None, timeout_value=30):242 if 'obagent_base_url' in req_info:243 if req_info.get('obagent_base_url') is None:244 URLrequest = self._get_base_url(req_info.get('onebox_id')) + "/wanmonitor"245 else:246 URLrequest = req_info.get('obagent_base_url') + "/wanmonitor"247 else:248 URLrequest = self._get_base_url(req_info.get('onebox_id')) + "/wanmonitor"249 log.debug("Request request URL: %s" % URLrequest)250 try:251 ob_response = requests.get(URLrequest, verify=False, timeout=timeout_value, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY))252 except Exception, e:253 log.exception("Failed to get the network mode of One-Box due to HTTP Error %s" % str(e))254 return -500, str(e)255 return self._parse_json_response(ob_response)256 def provisionning_arm(self, req_dict=None):257 log.debug("[WF - provisionning] obconnector > provisionning_arm: %s" % (str(req_dict)))258 headers_req = {'Accept': 'application/json', 'content-type': 
'application/json'}259 # URLrequest = self._get_base_url() + "/reboot"260 if 'obagent_base_url' in req_dict:261 if req_dict.get('obagent_base_url') is None:262 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/vnf/set"263 else:264 URLrequest = req_dict.get('obagent_base_url') + "/vnf/set"265 else:266 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/vnf/set"267 log.debug("Request request URL: %s" % URLrequest)268 reqDict = {}269 reqDict['image_name'] = req_dict.get('image_name')270 payload_req = json.dumps(reqDict)271 log.debug("Request body = %s" % str(payload_req))272 try:273 ob_response = requests.post(URLrequest, headers=headers_req, data=payload_req, verify=False, timeout=120, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY))274 except (HTTPException, ConnectionError), e:275 log.exception("failed to restore One-Box due to HTTP Error %s" % str(e))276 return -500, str(e)277 except Exception, e:278 log.error("Failed to restore One-Box : %s" % str(e))279 return -500, str(e)280 return self._parse_json_response(ob_response)281 def delete_arm(self, req_dict=None):282 log.debug("[WF - delete] obconnector > delete_arm: %s" % (str(req_dict)))283 headers_req = {'Accept': 'application/json', 'content-type': 'application/json'}284 # URLrequest = self._get_base_url() + "/reboot"285 if 'obagent_base_url' in req_dict:286 if req_dict.get('obagent_base_url') is None:287 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/vnf/unset"288 else:289 URLrequest = req_dict.get('obagent_base_url') + "/vnf/unset"290 else:291 URLrequest = self._get_base_url(req_dict.get('onebox_id')) + "/vnf/unset"292 log.debug("Request request URL: %s" % URLrequest)293 try:294 ob_response = requests.post(URLrequest, headers=headers_req, verify=False, timeout=120, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY))295 except (HTTPException, ConnectionError), e:296 log.exception("failed to restore One-Box due to HTTP Error %s" % str(e))297 return -500, str(e)298 except Exception, e:299 
log.error("Failed to restore One-Box : %s" % str(e))300 return -500, str(e)301 return self._parse_json_response(ob_response)302 def prov_check_onebox_agent_progress_arm(self, progress_dict, request_type=None, transaction_id=None):303 log.debug("[BBG] prov_check_onebox_agent_progress_arm")304 if 'obagent_base_url' in progress_dict:305 if progress_dict.get('obagent_base_url') is None:306 get_base_url = self._get_base_url(progress_dict.get('onebox_id'))307 else:308 get_base_url = progress_dict.get('obagent_base_url')309 else:310 get_base_url = self._get_base_url(progress_dict.get('onebox_id'))311 if request_type: # "restore", "reboot"312 if request_type == "restore":313 URLrequest = get_base_url + "/vnf"314 else: # reboot315 # URLrequest = self._get_base_url() + "/version"316 URLrequest = get_base_url + "/vnf"317 else:318 URLrequest = get_base_url + "/vnf"319 # 초소형인 경우 transaction_id 필요없음320 if transaction_id:321 URLrequest = URLrequest + "?transaction_id=" + transaction_id322 log.debug("Request request URL: %s" % URLrequest)323 reqDict = {}324 reqDict['order'] = progress_dict.get('order')325 payload_req = json.dumps(reqDict)326 log.debug("Request body = %s" % str(payload_req))327 try:328 ob_response = requests.post(URLrequest, data=payload_req, verify=False, cert=(CLIENT_ORCH_CRT, CLIENT_ORCH_KEY), timeout=30)329 log.debug('wf_check_onebox_agent_progress : ob_response = %s' % str(ob_response))330 except (HTTPException, ConnectionError), e:331 log.exception("failed to check the agent-progress of One-Box due to HTTP Error %s" % str(e))332 return -500, str(e)333 except Exception, e:334 log.error("Failed to check the agent-progress of One-Box : %s" % str(e))335 return -500, str(e)336 # except requests.Timeout as timeout_error:337 # log.exception("failed to check the agent-progress of One-Box agent connection timeout HTTP Error %s" % str(timeout_error))338 # return -500, str(timeout_error)339 return self._parse_json_response(ob_response)340 def _get_base_url(self, 
onebox_id=None):341 self._refresh_ob(onebox_id)342 if self.url:343 return self.url344 else:345 return "https://%s:%s/v1" %(self.host, self.port)346 def _parse_json_response(self, ob_response):347 try:348 if "backup_data" not in ob_response:349 log.debug("Response from One-Box Agent = %s" % str(ob_response.text))350 log.debug("Response HTTP Code from One-Box Agent = %s" % str(ob_response.status_code))351 # if ob_response.status_code == 200:352 content = ob_response.json()353 except Exception, e:354 log.exception("Exception: [%s] %s" % (str(e), sys.exc_info()))...

Full Screen

Full Screen

observations_api_client.py

Source:observations_api_client.py Github

copy

Full Screen

"""Thin ``requests`` wrappers around the DHOS observations API (``/dhos/v2``).

Every helper sends the caller's JWT as a bearer token, uses a 15 second
timeout, and returns the ``requests.Response`` untouched: no status check and
no JSON decoding is done here.
"""
from typing import Dict, List

import requests
from environs import Env
from requests import Response


def _get_base_url() -> str:
    """Return the base URL of the observations API.

    The value is read from the ``DHOS_OBSERVATIONS_BASE_URL`` environment
    variable on every call, falling back to
    ``http://dhos-observations-api:5000`` when the variable is not set.
    """
    return Env().str("DHOS_OBSERVATIONS_BASE_URL", "http://dhos-observations-api:5000")


def post_observation_set(observation_set_data: Dict, jwt: str) -> Response:
    """POST ``observation_set_data`` as JSON to ``/dhos/v2/observation_set``."""
    return requests.post(
        f"{_get_base_url()}/dhos/v2/observation_set",
        headers={"Authorization": f"Bearer {jwt}"},
        json=observation_set_data,
        timeout=15,
    )


def get_latest_observation_set(encounter_uuid: str, jwt: str) -> Response:
    """GET ``/dhos/v2/observation_set/latest`` for a single encounter.

    ``encounter_uuid`` is sent as the ``encounter_id`` query parameter.
    """
    return requests.get(
        f"{_get_base_url()}/dhos/v2/observation_set/latest",
        headers={"Authorization": f"Bearer {jwt}"},
        timeout=15,
        params={"encounter_id": encounter_uuid},
    )


def patch_observation_set(
    observation_set_uuid: str, observation_set_data: Dict, jwt: str
) -> Response:
    """PATCH ``/dhos/v2/observation_set/<observation_set_uuid>``.

    ``observation_set_data`` is sent as the JSON body.
    """
    return requests.patch(
        f"{_get_base_url()}/dhos/v2/observation_set/{observation_set_uuid}",
        headers={"Authorization": f"Bearer {jwt}"},
        json=observation_set_data,
        timeout=15,
    )


def post_observation_set_count(encounter_uuids: List, jwt: str) -> Response:
    """POST ``encounter_uuids`` as JSON to ``/dhos/v2/observation_set/count``."""
    return requests.post(
        f"{_get_base_url()}/dhos/v2/observation_set/count",
        headers={"Authorization": f"Bearer {jwt}"},
        json=encounter_uuids,
        timeout=15,
    )


def post_latest_observations_by_encounter_list(
    encounter_uuids: List, jwt: str
) -> Response:
    """POST ``encounter_uuids`` as JSON to ``/dhos/v2/observation_set/latest``."""
    return requests.post(
        f"{_get_base_url()}/dhos/v2/observation_set/latest",
        headers={"Authorization": f"Bearer {jwt}"},
        json=encounter_uuids,
        timeout=15,
    )


def get_observation_sets_by_location(
    location_uuid: str, start_date: str, end_date: str, jwt: str
) -> Response:
    """GET ``/dhos/v2/observation_set_search`` for one location.

    ``location_uuid`` is sent as the ``location`` query parameter, alongside
    ``start_date`` and ``end_date``.
    """
    return requests.get(
        f"{_get_base_url()}/dhos/v2/observation_set_search",
        headers={"Authorization": f"Bearer {jwt}"},
        timeout=15,
        params={
            "location": location_uuid,
            "start_date": start_date,
            "end_date": end_date,
        },
    )


def post_observation_sets_by_location_list(
    location_uuids: List, start_date: str, end_date: str, jwt: str
) -> Response:
    """POST ``location_uuids`` as JSON to ``/dhos/v2/observation_set_search``.

    ``start_date`` and ``end_date`` are sent as query parameters.
    """
    return requests.post(
        f"{_get_base_url()}/dhos/v2/observation_set_search",
        headers={"Authorization": f"Bearer {jwt}"},
        timeout=15,
        params={"start_date": start_date, "end_date": end_date},
        json=location_uuids,
    )


def post_agg_observation_sets_by_month(
    location_uuids: List, start_date: str, end_date: str, jwt: str
) -> Response:
    """POST ``location_uuids`` as JSON to ``/dhos/v2/observation_sets_by_month``.

    ``start_date`` and ``end_date`` are sent as query parameters.
    """
    return requests.post(
        f"{_get_base_url()}/dhos/v2/observation_sets_by_month",
        headers={"Authorization": f"Bearer {jwt}"},
        timeout=15,
        params={"start_date": start_date, "end_date": end_date},
        json=location_uuids,
    )


def post_refresh_agg_observation_sets(jwt: str) -> Response:
    """POST to ``/dhos/v2/aggregate_obs`` with no request body."""
    return requests.post(
        f"{_get_base_url()}/dhos/v2/aggregate_obs",
        headers={"Authorization": f"Bearer {jwt}"},
        timeout=15,
    )


def get_agg_observation_sets_by_location_by_month(
    start_date: str, end_date: str, jwt: str
) -> Response:
    """GET ``/dhos/v2/observation_sets_by_month``.

    ``start_date`` and ``end_date`` are sent as query parameters.
    """
    # NOTE(review): the published excerpt is cut off right after the `params`
    # argument; closing the call here is an assumption -- confirm against the
    # original file that no further arguments follow.
    return requests.get(
        f"{_get_base_url()}/dhos/v2/observation_sets_by_month",
        headers={"Authorization": f"Bearer {jwt}"},
        timeout=15,
        params={"start_date": start_date, "end_date": end_date},
    )

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run robotframework-appiumlibrary automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful