Best Python code snippet using avocado_python
test.py
Source:test.py  
...23    >>> data ='''{24    ... ,,,25    ... }'''26    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])27    >>> d = json_loads(r.data)28    >>> print(d)29    {'code': 400, 'msg': 'not json data in the request'}30    >>> #query self user31    >>> data ='''{32    ... "user":{33    ...         "@role":"OWNER"34    ...     }35    ... }'''36    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])37    >>> d = json_loads(r.data)38    >>> print(d)39    {'code': 200, 'msg': 'success', 'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'is_superuser': True, 'last_login': None, 'date_join': '2018-01-01 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 1}}40    >>> #query with id41    >>> data ='''{42    ... "user":{43    ...         "@role":"ADMIN",44    ...         "id": 245    ...     }46    ... }'''47    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])48    >>> d = json_loads(r.data)49    >>> print(d)50    {'code': 200, 'msg': 'success', 'user': {'username': 'usera', 'nickname': 'User A', 'email': 'usera@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-02-02 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 2}}51    >>> #query with @column52    >>> data ='''{53    ... "user":{54    ...         "@role":"OWNER",55    ...         "@column": "id,username,email,nickname,is_superuser"56    ...     }57    ... 
}'''58    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])59    >>> d = json_loads(r.data)60    >>> print(d)61    {'code': 200, 'msg': 'success', 'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'is_superuser': True, 'id': 1}}62    >>> #query with @column which have a non existing column name63    >>> data ='''{64    ... "user":{65    ...         "@role":"OWNER",66    ...         "@column": "id,username,email,nickname,is_superuser,nonexisting"67    ...     }68    ... }'''69    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])70    >>> d = json_loads(r.data)71    >>> print(d)72    {'code': 200, 'msg': 'success', 'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'is_superuser': True, 'id': 1}}73    >>> #query with a non existing column property74    >>> data ='''{75    ... "user":{76    ...         "nonexisting": 177    ...     }78    ... }'''79    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])80    >>> d = json_loads(r.data)81    >>> print(d)82    {'code': 400, 'msg': "'user' have no attribute 'nonexisting'"}83    >>> #query one with a non existing model84    >>> data ='''{85    ... "nonexist":{86    ...         "id": 187    ...     }88    ... }'''89    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])90    >>> d = json_loads(r.data)91    >>> print(d)92    {'code': 400, 'msg': "model 'nonexist' not found"}93    >>> #query one with a non expose model94    >>> data ='''{95    ... "role":{96    ...         "id": 197    ...     }98    ... 
}'''99    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])100    >>> d = json_loads(r.data)101    >>> print(d)102    {'code': 400, 'msg': "'role' not accessible"}103    >>> #query one with UNKNOWN role (expected ok)104    >>> data ='''{105    ... "moment":{106    ...         "id": 1107    ...     }108    ... }'''109    >>> r = handler.post('/apijson/get', data=data, middlewares=[])110    >>> d = json_loads(r.data)111    >>> print(d)112    {'code': 200, 'msg': 'success', 'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}113    >>> #query one with UNKNOWN role (expected fail)114    >>> data ='''{115    ... "privacy":{116    ...         "id": 1117    ...     }118    ... }'''119    >>> r = handler.post('/apijson/get', data=data, middlewares=[])120    >>> d = json_loads(r.data)121    >>> print(d)122    {'code': 400, 'msg': "'privacy' not accessible by role 'UNKNOWN'"}123    >>> #query one without user but use a non-UNKNOWN role124    >>> data ='''{125    ... "publicnotice":{126    ...         "@role":"OWNER",127    ...         "id": 1128    ...     }129    ... }'''130    >>> r = handler.post('/apijson/get', data=data, middlewares=[])131    >>> d = json_loads(r.data)132    >>> print(d)133    {'code': 400, 'msg': "no login user for role 'OWNER'"}134    >>> #query one with OWNER but cannot filter with OWNER135    >>> data ='''{136    ... "publicnotice":{137    ...         "@role":"OWNER",138    ...         "id": 1139    ...     }140    ... }'''141    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])142    >>> d = json_loads(r.data)143    >>> print(d)144    {'code': 400, 'msg': "'publicnotice' cannot filter with owner"}145    >>> #query one with OWNER which will use owner_condition() to filter146    >>> data ='''{147    ... "moment":{148    ...         "@role":"OWNER"149    ...     }150    ... 
}'''151    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])152    Moment: owner_condition153    >>> d = json_loads(r.data)154    >>> print(d)155    {'code': 200, 'msg': 'success', 'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}156    >>> #query one with UNKNOWN157    >>> data ='''{158    ... "publicnotice":{159    ...         "id": 1160    ...     }161    ... }'''162    >>> r = handler.post('/apijson/get', data=data, middlewares=[])163    >>> d = json_loads(r.data)164    >>> print(d)165    {'code': 200, 'msg': 'success', 'publicnotice': {'date': '2018-12-09 00:00:00', 'content': 'notice: a', 'id': 1}}166    >>> #query array with a non expose model167    >>> data ='''{168    ... "[]":{169    ...         "role": {"@role":"ADMIN"}170    ...     }171    ... }'''172    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])173    >>> d = json_loads(r.data)174    >>> print(d)175    {'code': 400, 'msg': "'role' not accessible by apijson"}176    >>> #query array with a non existing model177    >>> data ='''{178    ... "[]":{179    ...         "nonexisting": {"@role":"ADMIN"}180    ...     }181    ... }'''182    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])183    >>> d = json_loads(r.data)184    >>> print(d)185    {'code': 400, 'msg': "model 'nonexisting' not found"}186    >>> #query array with a non existing role187    >>> data ='''{188    ... "[]":{189    ...         "user": {"@role":"NONEXISTING"}190    ...     }191    ... }'''192    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])193    >>> d = json_loads(r.data)194    >>> print(d)195    {'code': 400, 'msg': "'user' not accessible by role 'NONEXISTING'"}196    >>> #query array with a role user don't have197    >>> data ='''{198    ... "[]":{199    ...         
"user": {"@role":"ADMIN"}200    ...     }201    ... }'''202    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])203    >>> d = json_loads(r.data)204    >>> print(d)205    {'code': 400, 'msg': "user doesn't have role 'ADMIN'"}206    >>> #query array with no permission207    >>> data ='''{208    ... "[]":{209    ...         "user": {"@role":"superuser"}210    ...     }211    ... }'''212    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])213    >>> d = json_loads(r.data)214    >>> print(d)215    {'code': 400, 'msg': "'user' not accessible by role 'superuser'"}216    >>> #query array217    >>> data ='''{218    ... "[]":{219    ...         "user": {"@role":"ADMIN"}220    ...     }221    ... }'''222    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])223    >>> d = json_loads(r.data)224    >>> print(d)225    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'is_superuser': True, 'last_login': None, 'date_join': '2018-01-01 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 1}}, {'user': {'username': 'usera', 'nickname': 'User A', 'email': 'usera@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-02-02 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 2}}, {'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-03-03 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 3}}, {'user': {'username': 'userc', 'nickname': 'User C', 'email': 'userc@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-04-04 00:00:00', 'image': '', 
'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 4}}]}226    >>> #query array227    >>> data ='''{228    ... "[]":{229    ...         "user": {230    ...             "@role":"ADMIN",231    ...             "@column":"id,username,nickname,email"232    ...         }233    ...     }234    ... }'''235    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])236    >>> d = json_loads(r.data)237    >>> print(d)238    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'id': 1}}, {'user': {'username': 'usera', 'nickname': 'User A', 'email': 'usera@localhost', 'id': 2}}, {'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'id': 3}}, {'user': {'username': 'userc', 'nickname': 'User C', 'email': 'userc@localhost', 'id': 4}}]}239    >>> #query array with non existing role240    >>> data ='''{241    ... "[]":{242    ...         "user": {243    ...             "@role":"NONEXISTING",244    ...             "@column":"id,username,nickname,email"245    ...         }246    ...     }247    ... }'''248    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])249    >>> d = json_loads(r.data)250    >>> print(d)251    {'code': 400, 'msg': "'user' not accessible by role 'NONEXISTING'"}252    >>> #query array with UNKNOWN253    >>> data ='''{254    ... "[]":{255    ...         "user": {256    ...             "@column":"id,username,nickname,email"257    ...         }258    ...     }259    ... }'''260    >>> r = handler.post('/apijson/get', data=data, middlewares=[])261    >>> d = json_loads(r.data)262    >>> print(d)263    {'code': 400, 'msg': "'user' not accessible by role 'UNKNOWN'"}264    >>> #query array without login user265    >>> data ='''{266    ... "[]":{267    ...         "user": {268    ...             "@role":"ADMIN",269    ...             
"@column":"id,username,nickname,email"270    ...         }271    ...     }272    ... }'''273    >>> r = handler.post('/apijson/get', data=data, middlewares=[])274    >>> d = json_loads(r.data)275    >>> print(d)276    {'code': 400, 'msg': "no login user for role 'ADMIN'"}277    >>> #query array with a role which the user doesn't really have278    >>> data ='''{279    ... "[]":{280    ...         "user": {281    ...             "@role":"ADMIN",282    ...             "@column":"id,username,nickname,email"283    ...         }284    ...     }285    ... }'''286    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])287    >>> d = json_loads(r.data)288    >>> print(d)289    {'code': 400, 'msg': "user doesn't have role 'ADMIN'"}290    >>> #query array with @count291    >>> data ='''{292    ... "[]":{293    ...         "@count":3,294    ...         "user": {295    ...         }296    ...     }297    ... }'''298    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])299    >>> d = json_loads(r.data)300    >>> print(d)301    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'is_superuser': True, 'last_login': None, 'date_join': '2018-01-01 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 1}}, {'user': {'username': 'usera', 'nickname': 'User A', 'email': 'usera@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-02-02 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 2}}, {'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-03-03 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 3}}]}302    >>> 
#query array ,@count is bad param303    >>> data ='''{304    ... "[]":{305    ...         "@count":"bad",306    ...         "user": {307    ...         }308    ...     }309    ... }'''310    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])311    >>> d = json_loads(r.data)312    >>> print(d)313    {'code': 400, 'msg': "@count should be an int, but get 'bad'"}314    >>> #query array with @count and @page315    >>> data ='''{316    ... "[]":{317    ...         "@count":2,318    ...         "@page":1,319    ...         "user": {320    ...         }321    ...     }322    ... }'''323    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])324    >>> d = json_loads(r.data)325    >>> print(d)326    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-03-03 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 3}}, {'user': {'username': 'userc', 'nickname': 'User C', 'email': 'userc@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-04-04 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 4}}]}327    >>> #query array with @count and @page, @page bad param328    >>> data ='''{329    ... "[]":{330    ...         "@count":2,331    ...         "@page":"bad",332    ...         "user": {333    ...         }334    ...     }335    ... }'''336    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])337    >>> d = json_loads(r.data)338    >>> print(d)339    {'code': 400, 'msg': "@page should be an int, but get 'bad'"}340    >>> #query array with @count and @page, @page <0341    >>> data ='''{342    ... "[]":{343    ...         "@count":2,344    ...         
"@page":-2,345    ...         "user": {346    ...         }347    ...     }348    ... }'''349    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])350    >>> d = json_loads(r.data)351    >>> print(d)352    {'code': 400, 'msg': "page should >0, but get '-2'"}353    >>> #query array with @count/@page/@query, @query bad param354    >>> data ='''{355    ... "[]":{356    ...         "@count":2,357    ...         "@page":1,358    ...         "@query":3,359    ...         "user": {360    ...         }361    ...     }362    ... }'''363    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])364    >>> d = json_loads(r.data)365    >>> print(d)366    {'code': 400, 'msg': "bad param 'query': 3"}367    >>> #query array with @count/@page/@query, @query = 0368    >>> data ='''{369    ... "[]":{370    ...         "@count":2,371    ...         "@page":1,372    ...         "@query":0,373    ...         "user": {374    ...         }375    ...     }376    ... }'''377    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])378    >>> d = json_loads(r.data)379    >>> print(d)380    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-03-03 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 3}}, {'user': {'username': 'userc', 'nickname': 'User C', 'email': 'userc@localhost', 'is_superuser': False, 'last_login': None, 'date_join': '2018-04-04 00:00:00', 'image': '', 'active': False, 'locked': False, 'deleted': False, 'auth_type': 'default', 'timezone': '', 'id': 4}}]}381    >>> #query array with OWNER but cannot filter with OWNER382    >>> data ='''{383    ...    "[]":{384    ...         "publicnotice": {385    ...             "@role":"OWNER"386    ...         
}387    ...    }388    ... }'''389    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])390    >>> d = json_loads(r.data)391    >>> print(d)392    {'code': 400, 'msg': "'publicnotice' cannot filter with owner"}393    >>> #query array with OWNER394    >>> data ='''{395    ...    "[]":{396    ...         "comment": {397    ...             "@role":"OWNER"398    ...         }399    ...    }400    ... }'''401    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])402    >>> d = json_loads(r.data)403    >>> print(d)404    {'code': 200, 'msg': 'success', '[]': [{'comment': {'user_id': 1, 'to_id': 3, 'moment_id': 1, 'date': '2018-11-01 00:00:00', 'content': 'comment from admin', 'id': 1}}]}405    >>> #query array with OWNER, the model using owner_condition406    >>> data ='''{407    ...    "[]":{408    ...         "moment": {409    ...             "@role":"OWNER"410    ...         }411    ...    }412    ... }'''413    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])414    Moment: owner_condition415    >>> d = json_loads(r.data)416    >>> print(d)417    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}]}418    >>> #query array with some filter column419    >>> data ='''{420    ...   "[]":{421    ...     "@count":4,422    ...     "@page":0,423    ...     "user":{424    ...             "@column":"id,username,nickname,email",425    ...             "@order":"id-",426    ...             "@role":"ADMIN",427    ...             "username":"admin"428    ...         }429    ...     }430    ... 
}'''431    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])432    >>> d = json_loads(r.data)433    >>> print(d)434    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'id': 1}}]}435    >>> #query array with reference, @query = 1436    >>> data ='''{437    ...     "[]":{438    ...         "@count":2,439    ...         "@page":0,440    ...         "@query":1,441    ...         "user":{442    ...             "@column":"id,username,nickname,email",443    ...             "@order":"id-",444    ...             "@role":"ADMIN"445    ...         }446    ...     },447    ...     "total@":"/[]/total"448    ... }'''449    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])450    >>> d = json_loads(r.data)451    >>> print(d)452    {'code': 200, 'msg': 'success', 'total': 4}453    >>> #query array with reference, @query = 2454    >>> data ='''{455    ...     "[]":{456    ...         "@count":2,457    ...         "@page":0,458    ...         "@query":2,459    ...         "user":{460    ...             "@column":"id,username,nickname,email",461    ...             "@order":"id-",462    ...             "@role":"ADMIN"463    ...         }464    ...     },465    ...     "total@":"/[]/total"466    ... }'''467    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])468    >>> d = json_loads(r.data)469    >>> print(d)470    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'userc', 'nickname': 'User C', 'email': 'userc@localhost', 'id': 4}}, {'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'id': 3}}], 'total': 4}471    >>> #query array with @order +472    >>> data ='''{473    ...     "[]":{474    ...         "@count":2,475    ...         "@page":0,476    ...         "@query":2,477    ...         "user":{478    ...             
"@column":"id,username,nickname,email",479    ...             "@order":"id+",480    ...             "@role":"ADMIN"481    ...         }482    ...     },483    ...     "total@":"/[]/total"484    ... }'''485    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])486    >>> d = json_loads(r.data)487    >>> print(d)488    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'email': 'admin@localhost', 'id': 1}}, {'user': {'username': 'usera', 'nickname': 'User A', 'email': 'usera@localhost', 'id': 2}}], 'total': 4}489    >>> #query array with @order having a non existing column490    >>> data ='''{491    ...     "[]":{492    ...         "@count":2,493    ...         "@page":0,494    ...         "@query":2,495    ...         "user":{496    ...             "@column":"id,username,nickname,email",497    ...             "@order":"nonexist+",498    ...             "@role":"ADMIN"499    ...         }500    ...     },501    ...     "total@":"/[]/total"502    ... }'''503    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])504    >>> d = json_loads(r.data)505    >>> print(d)506    {'code': 400, 'msg': "'user' doesn't have column 'nonexist'"}507    >>> #query array with @expr508    >>> data ='''{509    ...   "[]":{510    ...     "@count":4,511    ...     "@page":0,512    ...     "user":{513    ...             "@column":"id,username,nickname,email",514    ...             "@order":"id-",515    ...             "@role":"ADMIN",516    ...             "@expr":["username$","|","nickname$"],517    ...             "username$":"%b%",518    ...             "nickname$":"%c%"519    ...         }520    ...     }521    ... 
}'''522    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])523    >>> d = json_loads(r.data)524    >>> print(d)525    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'userc', 'nickname': 'User C', 'email': 'userc@localhost', 'id': 4}}, {'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'id': 3}}]}526    >>> #query array with @expr, bad param which is not list527    >>> data ='''{528    ...   "[]":{529    ...     "@count":4,530    ...     "@page":0,531    ...     "user":{532    ...             "@column":"id,username,nickname,email",533    ...             "@order":"id-",534    ...             "@role":"ADMIN",535    ...             "@expr":{},536    ...             "username$":"%b%",537    ...             "nickname$":"%c%"538    ...         }539    ...     }540    ... }'''541    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])542    >>> d = json_loads(r.data)543    >>> print(d)544    {'code': 400, 'msg': "only accept array in @expr, but get 'OrderedDict()'"}545    >>> #query array with @expr, bad param which is an empty list546    >>> data ='''{547    ...   "[]":{548    ...     "@count":4,549    ...     "@page":0,550    ...     "user":{551    ...             "@column":"id,username,nickname,email",552    ...             "@order":"id-",553    ...             "@role":"ADMIN",554    ...             "@expr":[],555    ...             "username$":"%b%",556    ...             "nickname$":"%c%"557    ...         }558    ...     }559    ... }'''560    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])561    >>> d = json_loads(r.data)562    >>> print(d)563    {'code': 400, 'msg': "only accept 2 or 3 items in @expr, but get '[]'"}564    >>> #query array with @expr, bad param which is >3 items list565    >>> data ='''{566    ...   "[]":{567    ...     "@count":4,568    ...     "@page":0,569    ...     
"user":{570    ...             "@column":"id,username,nickname,email",571    ...             "@order":"id-",572    ...             "@role":"ADMIN",573    ...             "@expr":["username$","|","username$","|","nickname$"],574    ...             "username$":"%b%",575    ...             "nickname$":"%c%"576    ...         }577    ...     }578    ... }'''579    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])580    >>> d = json_loads(r.data)581    >>> print(d)582    {'code': 400, 'msg': "only accept 2 or 3 items in @expr, but get '['username$', '|', 'username$', '|', 'nickname$']'"}583    >>> #query array with @expr, bad param which have bad operator584    >>> data ='''{585    ...   "[]":{586    ...     "@count":4,587    ...     "@page":0,588    ...     "user":{589    ...             "@column":"id,username,nickname,email",590    ...             "@order":"id-",591    ...             "@role":"ADMIN",592    ...             "@expr":["username$","*","nickname$"],593    ...             "username$":"%b%",594    ...             "nickname$":"%c%"595    ...         }596    ...     }597    ... }'''598    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])599    >>> d = json_loads(r.data)600    >>> print(d)601    {'code': 400, 'msg': "unknown operator: '*'"}602    >>> #query array with @expr, bad expr: & only 1 parameter603    >>> data ='''{604    ...   "[]":{605    ...     "@count":4,606    ...     "@page":0,607    ...     "user":{608    ...             "@column":"id,username,nickname,email",609    ...             "@order":"id-",610    ...             "@role":"ADMIN",611    ...             "@expr":["&","nickname$"],612    ...             "username$":"%b%",613    ...             "nickname$":"%c%"614    ...         }615    ...     }616    ... 
}'''617    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])618    >>> d = json_loads(r.data)619    >>> print(d)620    {'code': 400, 'msg': "'&'(and) expression need 3 items, but get '['&', 'nickname$']'"}621    >>> #query array with @expr, bad expr: | only 1 parameter622    >>> data ='''{623    ...   "[]":{624    ...     "@count":4,625    ...     "@page":0,626    ...     "user":{627    ...             "@column":"id,username,nickname,email",628    ...             "@order":"id-",629    ...             "@role":"ADMIN",630    ...             "@expr":["|","nickname$"],631    ...             "username$":"%b%",632    ...             "nickname$":"%c%"633    ...         }634    ...     }635    ... }'''636    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])637    >>> d = json_loads(r.data)638    >>> print(d)639    {'code': 400, 'msg': "'|'(or) expression need 3 items, but get '['|', 'nickname$']'"}640    >>> #query array with @expr, bad expr: ! with 2 parameters641    >>> data ='''{642    ...   "[]":{643    ...     "@count":4,644    ...     "@page":0,645    ...     "user":{646    ...             "@column":"id,username,nickname,email",647    ...             "@order":"id-",648    ...             "@role":"ADMIN",649    ...             "@expr":["username$","!","nickname$"],650    ...             "username$":"%b%",651    ...             "nickname$":"%c%"652    ...         }653    ...     }654    ... }'''655    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])656    >>> d = json_loads(r.data)657    >>> print(d)658    {'code': 400, 'msg': "'!'(not) expression need 2 items, but get '['username$', '!', 'nickname$']'"}659    >>> #query array with like660    >>> data ='''{661    ...   "[]":{662    ...     "@count":4,663    ...     "@page":0,664    ...     "user":{665    ...             "@column":"id,username,nickname,email",666    ...             
"@order":"id-",667    ...             "@role":"ADMIN",668    ...             "username$":"%b%"669    ...         }670    ...     }671    ... }'''672    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])673    >>> d = json_loads(r.data)674    >>> print(d)675    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'userb', 'nickname': 'User B', 'email': 'userb@localhost', 'id': 3}}]}676    >>> #query array with like, but gave a nonexist column677    >>> data ='''{678    ...   "[]":{679    ...     "@count":4,680    ...     "@page":0,681    ...     "user":{682    ...             "@column":"id,username,nickname,email",683    ...             "@order":"id-",684    ...             "@role":"ADMIN",685    ...             "nonexist$":"%b%"686    ...         }687    ...     }688    ... }'''689    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])690    >>> d = json_loads(r.data)691    >>> print(d)692    {'code': 400, 'msg': "model does not have column: 'nonexist'"}693    >>> #query array with a nonexist column694    >>> data ='''{695    ...   "[]":{696    ...     "@count":4,697    ...     "@page":0,698    ...     "user":{699    ...             "@column":"id,username,nickname,email",700    ...             "@order":"id-",701    ...             "@role":"ADMIN",702    ...             "nonexist":1703    ...         }704    ...     }705    ... }'''706    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])707    >>> d = json_loads(r.data)708    >>> print(d)709    {'code': 400, 'msg': "non-existent column or not support item: 'nonexist'"}710    >>> #query array, {} with list711    >>> data ='''{712    ... "[]":{713    ...         "moment": {714    ...             "id{}": [1, 2]715    ...         }716    ...     }717    ... 
}'''718    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])719    >>> d = json_loads(r.data)720    >>> print(d)721    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}, {'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}]}722    >>> #query array, !{} with list723    >>> data ='''{724    ... "[]":{725    ...         "moment": {726    ...             "id!{}": [1, 2]727    ...         }728    ...     }729    ... }'''730    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])731    >>> d = json_loads(r.data)732    >>> print(d)733    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}]}734    >>> #query array, {} with a non-exist column name735    >>> data ='''{736    ... "[]":{737    ...         "moment": {738    ...             "nonexist{}": [1, 2]739    ...         }740    ...     }741    ... }'''742    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])743    >>> d = json_loads(r.data)744    >>> print(d)745    {'code': 400, 'msg': "model does not have column: 'nonexist'"}746    >>> #query array, {} >=747    >>> data ='''{748    ... "[]":{749    ...         "moment": {750    ...             "id{}": ">=2"751    ...         }752    ...     }753    ... 
}'''754    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])755    >>> d = json_loads(r.data)756    >>> print(d)757    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}, {'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}]}758    >>> #query array, {} =759    >>> data ='''{760    ... "[]":{761    ...         "moment": {762    ...             "id{}": "=2"763    ...         }764    ...     }765    ... }'''766    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])767    >>> d = json_loads(r.data)768    >>> print(d)769    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}]}770    >>> #query array, {} >771    >>> data ='''{772    ... "[]":{773    ...         "moment": {774    ...             "id{}": ">2"775    ...         }776    ...     }777    ... }'''778    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])779    >>> d = json_loads(r.data)780    >>> print(d)781    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}]}782    >>> #query array, {} <=783    >>> data ='''{784    ... "[]":{785    ...         "moment": {786    ...             "id{}": "<=2"787    ...         }788    ...     }789    ... 
}'''790    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])791    >>> d = json_loads(r.data)792    >>> print(d)793    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}, {'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}]}794    >>> #query array, {} <795    >>> data ='''{796    ... "[]":{797    ...         "moment": {798    ...             "id{}": "<2"799    ...         }800    ...     }801    ... }'''802    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])803    >>> d = json_loads(r.data)804    >>> print(d)805    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}]}806    >>> #query array, {} <= with datetime807    >>> data ='''{808    ... "[]":{809    ...         "moment": {810    ...             "date{}": "<='2018-11-02 00:00'"811    ...         }812    ...     }813    ... }'''814    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])815    >>> d = json_loads(r.data)816    >>> print(d)817    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}, {'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}]}818    >>> #query array, {} >= with datetime819    >>> data ='''{820    ... "[]":{821    ...         "moment": {822    ...             "date{}": ">='2018-11-02 00:00'"823    ...         }824    ...     }825    ... 
}'''826    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])827    >>> d = json_loads(r.data)828    >>> print(d)829    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}, {'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}]}830    >>> #query array, {} >= with a invalid datetime831    >>> data ='''{832    ... "[]":{833    ...         "moment": {834    ...             "date{}": ">='2018-11-42 00:00'"835    ...         }836    ...     }837    ... }'''838    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])839    >>> d = json_loads(r.data)840    >>> print(d)841    {'code': 400, 'msg': "''2018-11-42 00:00'' cannot convert to datetime"}842    >>> #query array, !{} <843    >>> data ='''{844    ... "[]":{845    ...         "moment": {846    ...             "id!{}": "<2"847    ...         }848    ...     }849    ... }'''850    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])851    >>> d = json_loads(r.data)852    >>> print(d)853    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}, {'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}]}854    >>> #query array, {} !=855    >>> data ='''{856    ... "[]":{857    ...         "moment": {858    ...             "id{}": "!=2"859    ...         }860    ...     }861    ... 
}'''862    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])863    >>> d = json_loads(r.data)864    >>> print(d)865    {'code': 200, 'msg': 'success', '[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}, {'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}]}866    >>> #query array, {} with wrong operator867    >>> data ='''{868    ... "[]":{869    ...         "moment": {870    ...             "id{}": "%=2"871    ...         }872    ...     }873    ... }'''874    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])875    >>> d = json_loads(r.data)876    >>> print(d)877    {'code': 400, 'msg': "not support '%=2'"}878    >>> #query array, {} condition list879    >>> data ='''{880    ... "[]":{881    ...         "user": {882    ...             "@role": "ADMIN",883    ...             "id{}": "<=2,>3",884    ...             "@column": "username,nickname,id"885    ...         }886    ...     }887    ... }'''888    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])889    >>> d = json_loads(r.data)890    >>> print(d)891    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'id': 1}}, {'user': {'username': 'usera', 'nickname': 'User A', 'id': 2}}, {'user': {'username': 'userc', 'nickname': 'User C', 'id': 4}}]}892    >>> #query array, |{} condition list893    >>> data ='''{894    ... "[]":{895    ...         "user": {896    ...             "@role": "ADMIN",897    ...             "id|{}": "<=2,>3",898    ...             "@column": "username,nickname,id"899    ...         }900    ...     }901    ... 
}'''902    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])903    >>> d = json_loads(r.data)904    >>> print(d)905    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'id': 1}}, {'user': {'username': 'usera', 'nickname': 'User A', 'id': 2}}, {'user': {'username': 'userc', 'nickname': 'User C', 'id': 4}}]}906    >>> #query array, &{} condition list907    >>> data ='''{908    ... "[]":{909    ...         "user": {910    ...             "@role": "ADMIN",911    ...             "id&{}": ">2,<=4",912    ...             "@column": "username,nickname,id"913    ...         }914    ...     }915    ... }'''916    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])917    >>> d = json_loads(r.data)918    >>> print(d)919    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'userb', 'nickname': 'User B', 'id': 3}}, {'user': {'username': 'userc', 'nickname': 'User C', 'id': 4}}]}920    >>> #query array, &{} condition list921    >>> data ='''{922    ... "[]":{923    ...         "user": {924    ...             "@role": "ADMIN",925    ...             "date_join&{}": ">='2018-1-1 00:00',<='2018-2-2 00:00'",926    ...             "@column": "username,nickname,id,date_join"927    ...         }928    ...     }929    ... }'''930    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])931    >>> d = json_loads(r.data)932    >>> print(d)933    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'nickname': 'Administrator', 'date_join': '2018-01-01 00:00:00', 'id': 1}}, {'user': {'username': 'usera', 'nickname': 'User A', 'date_join': '2018-02-02 00:00:00', 'id': 2}}]}934    >>> #query array, {} multiple condition to a same field935    >>> data ='''{936    ... "[]":{937    ...         "user": {938    ...             "@role": "ADMIN",939    ...             "id&{}": ">2,<=4",940    ...    
         "id{}": "!=3",941    ...             "@column": "username,nickname,id"942    ...         }943    ...     }944    ... }'''945    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])946    >>> d = json_loads(r.data)947    >>> print(d)948    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'userc', 'nickname': 'User C', 'id': 4}}]}949    >>> #query array, !{} condition list950    >>> data ='''{951    ... "[]":{952    ...         "user": {953    ...             "@role": "ADMIN",954    ...             "id!{}": ">2,<=4",955    ...             "@column": "username,nickname,id"956    ...         }957    ...     }958    ... }'''959    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])960    >>> d = json_loads(r.data)961    >>> print(d)962    {'code': 400, 'msg': "'!' not supported in condition list"}963    >>> #query array, |{} condition list, item more than 2964    >>> data ='''{965    ... "[]":{966    ...         "user": {967    ...             "@role": "ADMIN",968    ...             "id|{}": "=1,=2,>=4",969    ...             "@column": "username,id"970    ...         }971    ...     }972    ... }'''973    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])974    >>> d = json_loads(r.data)975    >>> print(d)976    {'code': 200, 'msg': 'success', '[]': [{'user': {'username': 'admin', 'id': 1}}, {'user': {'username': 'usera', 'id': 2}}, {'user': {'username': 'userc', 'id': 4}}]}977    >>> #Association query: Two tables, one to one,ref path is absolute path978    >>> data ='''{979    ...     "moment":{},980    ...     "user":{981    ...     "@column": "id,username,email",982    ...     "id@": "moment/user_id"983    ...     }984    ... 
}'''985    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])986    >>> d = json_loads(r.data)987    >>> print(d)988    {'code': 200, 'msg': 'success', 'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}, 'user': {'username': 'usera', 'email': 'usera@localhost', 'id': 2}}989    >>> #Association query: Two tables, one is array, one is single, there is a abs reference to array990    >>> data ='''{991    ...     "moment[]":{"moment":{"@count":3}},992    ...     "user":{993    ...     "@column": "id,username,email",994    ...     "id@": "moment[]/1/moment/user_id"995    ...     }996    ... }'''997    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])998    >>> d = json_loads(r.data)999    >>> print(d)1000    {'code': 200, 'msg': 'success', 'moment[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}, {'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}, {'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}], 'user': {'username': 'userb', 'email': 'userb@localhost', 'id': 3}}1001    >>> #Association query: Two tables, one is array, one is single, there is a rel reference to array1002    >>> data ='''{1003    ...     "moment[]":{"moment":{"@count":3}},1004    ...     "user":{1005    ...     "@column": "id,username,email",1006    ...     "id@": "/moment[]/1/moment/user_id"1007    ...     }1008    ... 
}'''
    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment[]': [{'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}}, {'moment': {'user_id': 3, 'date': '2018-11-02 00:00:00', 'content': 'test moment from b', 'picture_list': '[]', 'id': 2}}, {'moment': {'user_id': 4, 'date': '2018-11-06 00:00:00', 'content': 'test moment from c', 'picture_list': '[]', 'id': 3}}], 'user': {'username': 'userb', 'email': 'userb@localhost', 'id': 3}}
    >>> #Association query: Two tables, one to one,ref path is relative path
    >>> data ='''{
    ...     "moment":{},
    ...     "user":{
    ...     "@column": "id,username,email",
    ...     "id@": "/moment/user_id"
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'test moment', 'picture_list': '[]', 'id': 1}, 'user': {'username': 'usera', 'email': 'usera@localhost', 'id': 2}}
    """


def test_apijson_head():
    """Doctests for the ``/apijson/head`` endpoint.

    Every example posts a JSON request body through ``handler.post`` and
    prints the decoded reply. The cases cover, in order: a successful
    count, an unknown model, a role without HEAD permission, requests sent
    without a user (with and without an explicit role), a user that lacks
    the requested role, OWNER filtering (allowed and refused), and an
    unknown column.

    NOTE(review): ``make_simple_application``, ``pre_call_as`` and
    ``json_loads`` are defined outside this function; ``pre_call_as(name)``
    presumably runs the request as that user -- confirm.

    >>> application = make_simple_application(project_dir='.')
    >>> handler = application.handler()
    >>> #apijson head
    >>> data ='''{
    ...     "moment": {
    ...         "user_id": 2
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'code': 200, 'msg': 'success', 'count': 1}}
    >>> #apijson head, with a nonexistent model
    >>> data ='''{
    ...     "nonexist": {
    ...         "user_id": 2
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "model 'nonexist' not found"}
    >>> #apijson head, without permission of HEAD
    >>> data ='''{
    ...     "privacy": {
    ...         "@role":"LOGIN",
    ...         "id": 1
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "role 'LOGIN' not have permission HEAD for 'privacy'"}
    >>> #apijson head, without user
    >>> data ='''{
    ...     "privacy": {
    ...         "@role":"ADMIN",
    ...         "id": 1
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "user doesn't have role 'ADMIN'"}
    >>> #apijson head, without user and @role
    >>> data ='''{
    ...     "privacy": {
    ...         "id": 1
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "role 'UNKNOWN' not have permission HEAD for 'privacy'"}
    >>> #apijson head, user doesn't have the requested role
    >>> data ='''{
    ...     "privacy": {
    ...         "@role":"ADMIN",
    ...         "id": 1
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "user doesn't have role 'ADMIN'"}
    >>> #apijson head, with OWNER
    >>> data ='''{
    ...     "moment": {
    ...         "@role":"OWNER"
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    Moment: owner_condition
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'code': 200, 'msg': 'success', 'count': 1}}
    >>> #apijson head, with OWNER but cannot filter with OWNER
    >>> data ='''{
    ...     "publicnotice": {
    ...         "@role":"OWNER"
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'publicnotice' cannot filter with owner"}
    >>> #apijson head, with a nonexistent column
    >>> data ='''{
    ...     "moment": {
    ...         "nonexist": 2
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'moment' don't have field 'nonexist'"}
    """


def test_apijson_post():
    """Doctests for the ``/apijson/post`` endpoint.

    Every example posts a JSON request body through ``handler.post`` as the
    "admin" user and prints the decoded reply. The cases cover, in order: a
    request with no "@tag" entry, a successful insert of a moment, an
    unknown model, and two tags that the endpoint reports as not found.

    NOTE(review): ``make_simple_application``, ``pre_call_as`` and
    ``json_loads`` are defined outside this function; ``pre_call_as(name)``
    presumably runs the request as that user -- confirm.

    >>> application = make_simple_application(project_dir='.')
    >>> handler = application.handler()
    >>> #apijson post, without @tag
    >>> data ='''{
    ...     "moment": {
    ...         "content": "new moment for test",
    ...         "picture_list": ["http://static.oschina.net/uploads/user/48/96331_50.jpg"]
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'tag' parameter is needed"}
    >>> #apijson post
    >>> data ='''{
    ...     "moment": {
    ...         "content": "new moment for test",
    ...         "picture_list": ["http://static.oschina.net/uploads/user/48/96331_50.jpg"]
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'id': 4, 'count': 1, 'code': 200, 'message': 'success'}}
    >>> #apijson post to a nonexistent model
    >>> data ='''{
    ...     "nonexist": {
    ...         "content": "new moment for test",
    ...         "picture_list": ["http://static.oschina.net/uploads/user/48/96331_50.jpg"]
    ...     },
    ...     "@tag": "nonexist"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "model 'nonexist' not found"}
    >>> #apijson post, tag is a model which is not defined in APIJSON_REQUESTS
    >>> data ='''{
    ...     "moment": {
    ...         "content": "new moment for test",
    ...         "picture_list": ["http://static.oschina.net/uploads/user/48/96331_50.jpg"]
    ...     },
    ...     "@tag": "role"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "tag 'role' not found"}
    >>> #apijson post, tag is a model which is not defined in APIJSON_REQUESTS
    >>> data ='''{
    ...     "user": {
    ...         "username": "test"
    ...     },
    ...     "@tag": "user"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "tag 'user' not found"}
    """


def test_apijson_put():
    """Doctests for the ``/apijson/put`` endpoint.

    Every example posts a JSON request body through ``handler.post`` and
    prints the decoded reply. The cases cover, in order: successful updates
    (without and with an explicit "@role"), an unknown model, a tag that is
    not found, a refused update, a missing / non-integer / unmatched id,
    role and login failures, disallowed, missing and unknown fields, a put
    that is verified with a follow-up ``/apijson/get``, and a repeat of the
    same put that is reported as a failure ("maybe no change").

    NOTE(review): ``make_simple_application``, ``pre_call_as`` and
    ``json_loads`` are defined outside this function; ``pre_call_as(name)``
    presumably runs the request as that user -- confirm.

    >>> application = make_simple_application(project_dir='.')
    >>> handler = application.handler()
    >>> #apijson put
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1,
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'id': 1, 'code': 200, 'msg': 'success', 'count': 1}}
    >>> #apijson put, with @role
    >>> data ='''{
    ...     "moment": {
    ...         "@role": "ADMIN",
    ...         "id": 1,
    ...         "content": "moment content after change 2"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'id': 1, 'code': 200, 'msg': 'success', 'count': 1}}
    >>> #apijson put, model does not exist
    >>> data ='''{
    ...     "nonexist": {
    ...         "@role": "ADMIN",
    ...         "id": 1,
    ...         "content": "moment content after change 2"
    ...     },
    ...     "@tag": "nonexist"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "model 'nonexist' not found"}
    >>> #apijson put, with a model that has no request tag
    >>> data ='''{
    ...     "norequesttag": {
    ...         "user_id": 1,
    ...         "content": "test"
    ...     },
    ...     "@tag": "norequesttag"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "tag 'norequesttag' not found"}
    >>> #apijson put, test ADD in put
    >>> data ='''{
    ...     "moment": {
    ...         "id": 2,
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': 'no permission'}
    >>> #apijson put, without id
    >>> data ='''{
    ...     "moment": {
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': 'id param needed'}
    >>> #apijson put, with id not integer
    >>> data ='''{
    ...     "moment": {
    ...         "id": "abc",
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "id 'abc' cannot convert to integer"}
    >>> #apijson put, with an id that matches no record
    >>> data ='''{
    ...     "moment": {
    ...         "id": 100,
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "cannot find record which id = '100'"}
    >>> #apijson put, with a role which has no permission
    >>> data ='''{
    ...     "moment": {
    ...         "@role": "LOGIN",
    ...         "id": 1,
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'moment' not accessible by role 'LOGIN'"}
    >>> #apijson put, with UNKNOWN role
    >>> data ='''{
    ...     "publicnotice": {
    ...         "@role": "UNKNOWN",
    ...         "id": 1,
    ...         "content": "content after change"
    ...     },
    ...     "@tag": "publicnotice"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'publicnotice': {'id': 1, 'code': 200, 'msg': 'success', 'count': 1}}
    >>> #apijson put, OWNER but not logged in
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1,
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'OWNER' need login user"}
    >>> #apijson put, with a role that is not permitted
    >>> data ='''{
    ...     "moment": {
    ...         "@role": "superuser",
    ...         "id": 1,
    ...         "content": "moment content after change"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'moment' not accessible by role 'superuser'"}
    >>> #apijson put, with a field DISALLOWed
    >>> data ='''{
    ...     "comment": {
    ...         "id": 2,
    ...         "user_id": 2,
    ...         "content": "content after change"
    ...     },
    ...     "@tag": "comment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "request 'comment' disallow 'user_id'"}
    >>> #apijson put, without a field in NECESSARY
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "request 'moment' have not necessary field 'content'"}
    >>> #apijson put, with a nonexistent field
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1,
    ...         "content": "moment content after change",
    ...         "noexist": "test"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'moment' don't have field 'noexist'"}
    >>> #apijson put, and check get result
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1,
    ...         "content": "check 789"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'id': 1, 'code': 200, 'msg': 'success', 'count': 1}}
    >>> data ='''{
    ... "moment":{
    ...         "id": %s
    ...     }
    ... }'''%(d['moment']['id'])
    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'user_id': 2, 'date': '2018-11-01 00:00:00', 'content': 'check 789', 'picture_list': '[]', 'id': 1}}
    >>> #apijson put, same content again, expect the no-change failure
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1,
    ...         "content": "check 789"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/put', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': 'failed when updating, maybe no change', 'moment': {'id': 1, 'code': 400, 'msg': 'failed when updating, maybe no change', 'count': 0}}
    """


def test_apijson_delete():
    """Doctests for the ``/apijson/delete`` endpoint.

    Every example posts a JSON request body through ``handler.post`` and
    prints the decoded reply. The cases cover, in order: a successful
    delete that is verified with ``/apijson/get``, a request with no "@tag"
    entry, an unknown model, deleting another user's moment, a missing /
    non-integer / unmatched id, and role failures. Several cases first
    create a moment through ``/apijson/post`` and then try to delete the
    returned id.

    NOTE(review): ``make_simple_application``, ``pre_call_as`` and
    ``json_loads`` are defined outside this function; ``pre_call_as(name)``
    presumably runs the request as that user -- confirm.

    >>> application = make_simple_application(project_dir='.')
    >>> handler = application.handler()
    >>> #apijson delete
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': {'id': 1, 'code': 200, 'message': 'success', 'count': 1}}
    >>> data ='''{
    ...     "moment": {
    ...         "id": 1
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'moment': None}
    >>> #apijson delete, without @tag
    >>> data ='''{
    ...     "moment": {
    ...         "content": "new moment for test"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> data ='''{
    ...     "moment": {
    ...         "id": %s
    ...     }
    ... }'''%(d["moment"]["id"])
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "'tag' parameter is needed"}
    >>> #apijson delete, with a nonexistent model
    >>> data ='''{
    ...     "nonexist": {
    ...         "id": 1
    ...     },
    ...     "@tag": "nonexist"
    ... }'''
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "model 'nonexist' not found"}
    >>> #apijson delete, try to delete other's moment
    >>> data ='''{
    ...     "moment": {
    ...         "id": 2
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': 'no role to access the data'}
    >>> #apijson delete, without id
    >>> data ='''{
    ...     "moment": {
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': 'id param needed'}
    >>> #apijson delete, id not int
    >>> data ='''{
    ...     "moment": {
    ...         "id": "abc"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "id 'abc' cannot convert to integer"}
    >>> #apijson delete, with an id that matches no record
    >>> data ='''{
    ...     "moment": {
    ...         "id": 100
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "cannot find record id = '100'"}
    >>> #apijson delete, with a role having no permission
    >>> data ='''{
    ...     "moment": {
    ...         "content": "new moment for test"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> data ='''{
    ...     "moment": {
    ...         "@role": "UNKNOWN",
    ...         "id": %s
    ...     },
    ...     "@tag": "moment"
    ... }'''%(d["moment"]["id"])
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "role 'UNKNOWN' has no permission to access the data"}
    >>> #apijson delete, with OWNER but not logged in
    >>> data ='''{
    ...     "moment": {
    ...         "content": "new moment for test"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> data ='''{
    ...     "moment": {
    ...         "id": %s
    ...     },
    ...     "@tag": "moment"
    ... }'''%(d["moment"]["id"])
    >>> r = handler.post('/apijson/delete', data=data, middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': 'no role to access the data'}
    >>> #apijson delete, with UNKNOWN role
    >>> data ='''{
    ...     "publicnotice": {
    ...         "@role": "UNKNOWN",
    ...         "id": 1
    ...     },
    ...     "@tag": "publicnotice"
    ... }'''
    >>> r = handler.post('/apijson/delete', data=data, middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'publicnotice': {'id': 1, 'code': 200, 'message': 'success', 'count': 1}}
    >>> #apijson delete, with a role which has no permission
    >>> data ='''{
    ...     "moment": {
    ...         "content": "new moment for test"
    ...     },
    ...     "@tag": "moment"
    ... }'''
    >>> r = handler.post('/apijson/post', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> data ='''{
    ...     "moment": {
    ...         "@role": "superuser",
    ...         "id": %s
    ...     },
    ...     "@tag": "moment"
    ... }'''%(d["moment"]["id"])
    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 400, 'msg': "role 'superuser' has no permission to access the data"}
    """


def test_apijson_permission():
    """
    >>> application = make_simple_application(project_dir='.')
    >>> handler = application.handler()
    >>> #apijson get, query with id, access with owner
    >>> data ='''{
    ... "comment2":{
    ...         "id": 1
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'comment2': {'user_id': 1, 'to_id': 3, 'moment_id': 1, 'date': '2018-11-01 00:00:00', 'content': 'comment from admin', 'id': 1}}
    >>> #apijson get, query with id, access other's comment, expect empty result
    >>> data ='''{
    ... "comment2":{
    ...         "id": 1
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("userb"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'comment2': None}
    >>> #apijson get, query array
    >>> data ='''{
    ... "comment2":{
    ...     }
    ... }'''
    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("usera"), middlewares=[])
    >>> d = json_loads(r.data)
    >>> print(d)
    {'code': 200, 'msg': 'success', 'comment2': {'user_id': 2, 'to_id': 3, 'moment_id': 1, 'date': '2018-12-01 00:00:00', 'content': 'comment from usera to userb', 'id': 2}}
    >>> #apijson get, query one with admin as OWNER
    >>> data ='''{
    ... "comment2":{
    ...        "@role":"OWNER"
    ...     }
    ... 
}'''1618    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])1619    >>> d = json_loads(r.data)1620    >>> print(d)1621    {'code': 200, 'msg': 'success', 'comment2': {'user_id': 1, 'to_id': 3, 'moment_id': 1, 'date': '2018-11-01 00:00:00', 'content': 'comment from admin', 'id': 1}}1622    >>> #apijson get, query one with admin as ADMIN1623    >>> data ='''{1624    ... "comment2":{1625    ...        "@role":"ADMIN",1626    ...        "user_id": 21627    ...     }1628    ... }'''1629    >>> r = handler.post('/apijson/get', data=data, pre_call=pre_call_as("admin"), middlewares=[])1630    >>> d = json_loads(r.data)1631    >>> print(d)1632    {'code': 200, 'msg': 'success', 'comment2': {'user_id': 2, 'to_id': 3, 'moment_id': 1, 'date': '2018-12-01 00:00:00', 'content': 'comment from usera to userb', 'id': 2}}1633    >>> #apijson head1634    >>> data ='''{1635    ...     "comment2": {1636    ...         "user_id": 11637    ...     }1638    ... }'''1639    >>> r = handler.post('/apijson/head', data=data, pre_call=pre_call_as("userc"), middlewares=[])1640    >>> d = json_loads(r.data)1641    >>> print(d)1642    {'code': 200, 'msg': 'success', 'comment2': {'code': 200, 'msg': 'success', 'count': 0}}1643    >>> #apijson delete with a user which have no permission1644    >>> data ='''{1645    ...     "comment2": {1646    ...         "id": 11647    ...     },1648    ...     "@tag": "comment2"1649    ... }'''1650    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("userc"), middlewares=[])1651    >>> d = json_loads(r.data)1652    >>> print(d)1653    {'code': 400, 'msg': 'no permission'}1654    >>> #apijson delete with permission, ADMIN1655    >>> data ='''{1656    ...     "comment2": {1657    ...         "id": 11658    ...     },1659    ...     "@tag": "comment2"1660    ... 
}'''1661    >>> r = handler.post('/apijson/delete', data=data, pre_call=pre_call_as("admin"), middlewares=[])1662    >>> d = json_loads(r.data)1663    >>> print(d)1664    {'code': 200, 'msg': 'success', 'comment2': {'id': 1, 'code': 200, 'message': 'success', 'count': 1}}...test_app.py
Source:test_app.py  
# NOTE(review): this excerpt elides everything before this point. The ``try:``
# below is restored only because the visible ``except ImportError:`` requires
# one -- confirm against the full file.
try:
    from zmq.eventloop.ioloop import ZMQIOLoop
    HAS_ZMQ_IOLOOP = True
except ImportError:
    HAS_ZMQ_IOLOOP = False


def json_loads(data):
    '''
    Parse ``data`` as JSON, first decoding it as UTF-8 when it is ``bytes``
    on Python 3.
    '''
    if six.PY3 and isinstance(data, bytes):
        data = data.decode('utf-8')
    return json.loads(data)


class _SaltnadoIntegrationTestCase(SaltnadoTestCase):  # pylint: disable=abstract-method
    '''
    Shared base for the handler tests below; exposes freshly loaded configs.
    '''

    @property
    def opts(self):
        '''Return the ``client_config`` configuration, loaded from scratch.'''
        return self.get_config('client_config', from_scratch=True)

    @property
    def mod_opts(self):
        '''Return the ``minion`` configuration, loaded from scratch.'''
        return self.get_config('minion', from_scratch=True)


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
@skipIf(StrictVersion(zmq.__version__) < StrictVersion('14.0.1'), 'PyZMQ must be >= 14.0.1 to run these tests.')
class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.SaltAPIHandler`` mounted at ``/``.
    '''

    def get_app(self):
        '''Build the tornado application that serves the root handler.'''
        urls = [('/', saltnado.SaltAPIHandler)]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    def test_root(self):
        '''
        Test the root path which returns the list of clients we support
        '''
        response = self.fetch('/',
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 200)
        response_obj = json_loads(response.body)
        self.assertEqual(sorted(response_obj['clients']),
                         ['local', 'local_async', 'runner', 'runner_async'])
        self.assertEqual(response_obj['return'], 'Welcome')

    def test_post_no_auth(self):
        '''
        Test post with no auth token; the handler answers with a 302
        redirect to /login
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        # No auth header is sent, and redirects are not followed so the 302
        # itself can be inspected.
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json']},
                              follow_redirects=False,
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 302)
        self.assertEqual(response.headers['Location'], '/login')

    # Local client tests
    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_simple_local_post(self):
        '''
        Test a basic API of /
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])

    def test_simple_local_post_no_tgt(self):
        '''
        POST job with invalid tgt
        '''
        low = [{'client': 'local',
                'tgt': 'minion_we_dont_have',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(response_obj['return'], ["No minions matched the target. No command was sent, no jid was assigned."])

    # local client request body test
    @skipIf(True, 'Undetermined race condition in test. Temporarily disabled.')
    def test_simple_local_post_only_dictionary_request(self):
        '''
        POST a single lowstate dictionary (not wrapped in a list) to /
        '''
        low = {'client': 'local',
               'tgt': '*',
               'fun': 'test.ping',
               }
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])

    def test_simple_local_post_invalid_request(self):
        '''
        POST a body that is not a valid lowstate and expect a 400
        '''
        low = ["invalid request"]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        self.assertEqual(response.code, 400)

    # local_async tests
    def test_simple_local_async_post(self):
        '''
        POST one local_async job; expect a single return carrying a jid and
        both minions
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = json_loads(response.body)
        ret = response_obj['return']
        # Sort so the comparison does not depend on the order minions are listed.
        ret[0]['minions'] = sorted(ret[0]['minions'])
        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(ret), 1)
        self.assertIn('jid', ret[0])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_multi_local_async_post(self):
        '''
        POST two local_async jobs in one request; expect two returns, each
        with a jid and both minions
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = json_loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])
        ret[1]['minions'] = sorted(ret[1]['minions'])
        self.assertEqual(len(ret), 2)
        self.assertIn('jid', ret[0])
        self.assertIn('jid', ret[1])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))
        self.assertEqual(ret[1]['minions'], sorted(['minion', 'sub_minion']))

    def test_multi_local_async_post_multitoken(self):
        '''
        POST three local_async jobs where the second carries its own valid
        token and the third a bad one; only the third must fail to authenticate
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                'token': self.token['token'],  # send a different (but still valid token)
                },
               {'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                'token': 'BAD_TOKEN',  # send a bad token
                },
               ]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = json_loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])
        ret[1]['minions'] = sorted(ret[1]['minions'])
        self.assertEqual(len(ret), 3)  # make sure we got 3 responses
        self.assertIn('jid', ret[0])  # the first 2 are regular returns
        self.assertIn('jid', ret[1])
        self.assertIn('Failed to authenticate', ret[2])  # bad auth
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))
        self.assertEqual(ret[1]['minions'], sorted(['minion', 'sub_minion']))

    def test_simple_local_async_post_no_tgt(self):
        '''
        POST a local_async job whose target matches no minion; expect a
        single empty return
        '''
        low = [{'client': 'local_async',
                'tgt': 'minion_we_dont_have',
                'fun': 'test.ping',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(response_obj['return'], [{}])

    # runner tests
    def test_simple_local_runner_post(self):
        '''
        POST a ``manage.up`` runner job; expect one return listing both minions
        '''
        low = [{'client': 'runner',
                'fun': 'manage.up',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=30,
                              request_timeout=30,
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        self.assertEqual(set(response_obj['return'][0]), {'minion', 'sub_minion'})

    # runner_async tests
    def test_simple_local_runner_async_post(self):
        '''
        POST a ``manage.up`` runner_async job; expect one return with a jid
        and a tag
        '''
        low = [{'client': 'runner_async',
                'fun': 'manage.up',
                }]
        response = self.fetch('/',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              connect_timeout=10,
                              request_timeout=10,
                              )
        response_obj = json_loads(response.body)
        self.assertIn('return', response_obj)
        self.assertEqual(1, len(response_obj['return']))
        self.assertIn('jid', response_obj['return'][0])
        self.assertIn('tag', response_obj['return'][0])


@flaky
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.MinionSaltAPIHandler`` mounted at ``/minions``.
    '''

    def get_app(self):
        '''Build the tornado application that serves the minions handler.'''
        urls = [(r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
                (r"/minions", saltnado.MinionSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'issue #34753')
    def test_get_no_mid(self):
        '''
        GET /minions without a minion id; expect grains for both minions
        '''
        response = self.fetch('/minions',
                              method='GET',
                              headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              follow_redirects=False,
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        # one per minion
        self.assertEqual(len(response_obj['return'][0]), 2)
        # check a single grain
        for minion_id, grains in six.iteritems(response_obj['return'][0]):
            self.assertEqual(minion_id, grains['id'])

    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_get(self):
        '''
        GET /minions/minion; expect grains for that one minion only
        '''
        response = self.fetch('/minions/minion',
                              method='GET',
                              headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              follow_redirects=False,
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(len(response_obj['return']), 1)
        self.assertEqual(len(response_obj['return'][0]), 1)
        # check a single grain
        self.assertEqual(response_obj['return'][0]['minion']['id'], 'minion')

    def test_post(self):
        '''
        POST a job without a ``client`` key; expect one return with a jid
        and both minions
        '''
        low = [{'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/minions',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = json_loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])
        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(ret), 1)
        self.assertIn('jid', ret[0])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_post_with_client(self):
        '''
        POST a job that names the ``local_async`` client explicitly; expect
        one return with a jid and both minions
        '''
        low = [{'client': 'local_async',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/minions',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = json_loads(response.body)
        ret = response_obj['return']
        ret[0]['minions'] = sorted(ret[0]['minions'])
        # TODO: verify pub function? Maybe look at how we test the publisher
        self.assertEqual(len(ret), 1)
        self.assertIn('jid', ret[0])
        self.assertEqual(ret[0]['minions'], sorted(['minion', 'sub_minion']))

    def test_post_with_incorrect_client(self):
        '''
        The /minions endpoint is async only, so if you try something else
        make sure you get an error
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/minions',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        self.assertEqual(response.code, 400)


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestJobsSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.JobsSaltAPIHandler`` mounted at ``/jobs``.
    '''

    def get_app(self):
        '''Build the tornado application that serves the jobs handler.'''
        urls = [(r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
                (r"/jobs", saltnado.JobsSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_get(self):
        '''
        GET /jobs, check the keys of every listed job, then GET one of those
        jobs by jid and check its keys
        '''
        # test with no JID
        self.http_client.fetch(self.get_url('/jobs'),
                               self.stop,
                               method='GET',
                               headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                               follow_redirects=False,
                               )
        response = self.wait(timeout=30)
        response_obj = json_loads(response.body)['return'][0]
        try:
            for jid, ret in six.iteritems(response_obj):
                self.assertIn('Function', ret)
                self.assertIn('Target', ret)
                self.assertIn('Target-type', ret)
                self.assertIn('User', ret)
                self.assertIn('StartTime', ret)
                self.assertIn('Arguments', ret)
        except AttributeError:
            # Dump the payload to help diagnose an unexpected response shape,
            # then let the original error propagate.
            print(json_loads(response.body))
            raise
        # test with a specific JID passed in
        jid = next(six.iterkeys(response_obj))
        self.http_client.fetch(self.get_url('/jobs/{0}'.format(jid)),
                               self.stop,
                               method='GET',
                               headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                               follow_redirects=False,
                               )
        response = self.wait(timeout=30)
        response_obj = json_loads(response.body)['return'][0]
        self.assertIn('Function', response_obj)
        self.assertIn('Target', response_obj)
        self.assertIn('Target-type', response_obj)
        self.assertIn('User', response_obj)
        self.assertIn('StartTime', response_obj)
        self.assertIn('Arguments', response_obj)
        self.assertIn('Result', response_obj)


# TODO: run all the same tests from the root handler, but for now since they are
# the same code, we'll just sanity check
@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestRunSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Sanity check for ``saltnado.RunSaltAPIHandler`` mounted at ``/run``.
    '''

    def get_app(self):
        '''Build the tornado application that serves the run handler.'''
        urls = [("/run", saltnado.RunSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'to be re-enabled when #23623 is merged')
    def test_get(self):
        '''
        POST a ``test.ping`` local job to /run; expect both minions to
        return True
        '''
        low = [{'client': 'local',
                'tgt': '*',
                'fun': 'test.ping',
                }]
        response = self.fetch('/run',
                              method='POST',
                              body=json.dumps(low),
                              headers={'Content-Type': self.content_type_map['json'],
                                       saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                              )
        response_obj = json_loads(response.body)
        self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestEventsSaltAPIHandler(_SaltnadoIntegrationTestCase):
    '''
    Tests for ``saltnado.EventsSaltAPIHandler`` mounted at ``/events``.
    '''

    def get_app(self):
        '''Build the tornado application and keep a reference to it.'''
        urls = [(r"/events", saltnado.EventsSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        application.event_listener = saltnado.EventListener({}, self.opts)
        # store a reference, for magic later!
        self.application = application
        self.events_to_fire = 0
        return application

    def test_get(self):
        '''
        Stream /events; the assertions run in ``on_event`` for each chunk
        '''
        self.events_to_fire = 5
        # The response object itself is not inspected: every chunk is
        # validated by the streaming callback.
        self.fetch('/events',
                   headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                   streaming_callback=self.on_event,
                   )

    def _stop(self):
        '''Stop the test IO loop (scheduled from ``on_event``).'''
        self.stop()

    def on_event(self, event):
        '''
        Streaming callback: fire another test event while any remain,
        otherwise schedule a stop, then validate the received chunk
        '''
        if six.PY3:
            event = event.decode('utf-8')
        if self.events_to_fire > 0:
            self.application.event_listener.event.fire_event({
                'foo': 'bar',
                'baz': 'qux',
            }, 'salt/netapi/test')
            self.events_to_fire -= 1
        # once we've fired all the events, lets call it a day
        else:
            # wait so that we can ensure that the next future is ready to go
            # to make sure we don't explode if the next one is ready
            ZMQIOLoop.current().add_timeout(time.time() + 0.5, self._stop)
        event = event.strip()
        # if we got a retry, just continue
        if event != 'retry: 400':
            tag, data = event.splitlines()
            self.assertTrue(tag.startswith('tag: '))
            self.assertTrue(data.startswith('data: '))


@skipIf(HAS_ZMQ_IOLOOP is False, 'PyZMQ version must be >= 14.0.1 to run these tests.')
class TestWebhookSaltAPIHandler(_SaltnadoIntegrationTestCase):

    def get_app(self):
        urls = [(r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
                ]
        application = self.build_tornado_app(urls)
        self.application = application
        application.event_listener = saltnado.EventListener({}, self.opts)
        return application

    @skipIf(True, 'Skipping until we can devote more resources to debugging this test.')
    def test_post(self):
        self._future_resolved = threading.Event()
        try:
            def verify_event(future):
                '''
                Notify the threading event that the future is resolved
                '''
                self._future_resolved.set()
            self._finished = False  # TODO: remove after some cleanup of the event listener
            # get an event future
            future = self.application.event_listener.get_event(self,
                                                               tag='salt/netapi/hook',
                                                               callback=verify_event)
            # fire the event
            response = self.fetch('/hook',
                                  method='POST',
                                  body='foo=bar',
                                  headers={saltnado.AUTH_TOKEN_HEADER: self.token['token']},
                                  )
            response_obj = json_loads(response.body)
            self.assertTrue(response_obj['success'])
            resolve_future_timeout = 60
            self._future_resolved.wait(resolve_future_timeout)
            try:
                event = future.result()
            except Exception as exc:
                self.fail('Failed to resolve future under {} secs: {}'.format(resolve_future_timeout, exc))
            self.assertEqual(event['tag'], 'salt/netapi/hook')
            self.assertIn('headers', event['data'])
            self.assertEqual(
                event['data']['post'],
                {'foo': salt.utils.to_bytes('bar')}
            )
        finally:
            # NOTE(review): the excerpt is truncated here ("..."); the real
            # cleanup body is not visible and must be restored from the full file.
            ...
Source:basic_apis.py  
# ... (the beginning of this listing is elided)
# NOTE(review): the import below is indented in the original listing, so it
# presumably belongs to an elided enclosing statement -- confirm before use.
from json import loads as json_loads


def _get_json(url, **request_kwargs):
    """GET ``url`` through ``httpx_get`` and decode the response body as JSON."""
    return json_loads(httpx_get(url, **request_kwargs).content)


def _post_json(url, **request_kwargs):
    """POST to ``url`` through ``httpx_post`` and decode the response body as JSON."""
    return json_loads(httpx_post(url, **request_kwargs).content)


def _get_html(url, **request_kwargs):
    """GET ``url`` through ``httpx_get`` and parse the response body with ``etree.HTML``."""
    return etree.HTML(httpx_get(url, **request_kwargs).content)


def GetArticleJsonDataApi(article_url: str) -> Dict:
    """Return the decoded JSON served at the ``/asimov/`` counterpart of ``article_url``."""
    api_url = article_url.replace("https://www.jianshu.com/", "https://www.jianshu.com/asimov/")
    return _get_json(api_url, headers=jianshu_request_header)


def GetArticleHtmlJsonDataApi(article_url: str) -> Dict:
    """Return the JSON embedded in the ``__NEXT_DATA__`` script tag of the article page."""
    page_tree = _get_html(article_url, headers=PC_header)
    # The first matching text node is used; a page without the tag raises IndexError.
    embedded = page_tree.xpath("//script[@id='__NEXT_DATA__']/text()")[0]
    return json_loads(embedded)


def GetArticleCommentsJsonDataApi(article_id: int, page: int, count: int,
                                  author_only: bool, order_by: str) -> Dict:
    """Return the decoded JSON of one page of comments for the article ``article_id``."""
    query = {
        "page": page,
        "count": count,
        "author_only": author_only,
        "order_by": order_by
    }
    api_url = f"https://www.jianshu.com/shakespeare/notes/{article_id}/comments"
    return _get_json(api_url, params=query, headers=jianshu_request_header)


def GetBeikeIslandTradeRankListJsonDataApi(ranktype: Union[int, None], pageIndex: Union[int, None]) -> Dict:
    """POST the rank-list query to BeikeIsland and return the decoded JSON response."""
    payload = {
        "ranktype": ranktype,
        "pageIndex": pageIndex
    }
    return _post_json("https://www.beikeisland.com/api/Trade/getTradeRankList",
                      headers=BeikeIsland_request_header, json=payload)


def GetBeikeIslandTradeListJsonDataApi(pageIndex: int, retype: int):
    """POST the trade-list query to BeikeIsland and return the decoded JSON response."""
    payload = {
        "pageIndex": pageIndex,
        "retype": retype
    }
    return _post_json("https://www.beikeisland.com/api/Trade/getTradeList",
                      headers=BeikeIsland_request_header, json=payload)


def GetCollectionJsonDataApi(collection_url: str) -> Dict:
    """Return the decoded JSON for the collection addressed by ``collection_url``."""
    api_url = collection_url.replace("https://www.jianshu.com/c/",
                                     "https://www.jianshu.com/asimov/collections/slug/")
    return _get_json(api_url, headers=jianshu_request_header)


def GetCollectionEditorsJsonDataApi(collection_id: int, page: int) -> Dict:
    """Return the decoded JSON of one page of editors of the collection ``collection_id``."""
    api_url = f"https://www.jianshu.com/collections/{collection_id}/editors"
    query = {
        "page": page
    }
    return _get_json(api_url, params=query, headers=jianshu_request_header)


def GetCollectionRecommendedWritersJsonDataApi(collection_id: int, page: int, count: int) -> Dict:
    """Return the decoded JSON of the ``recommended_users`` endpoint for a collection."""
    query = {
        "collection_id": collection_id,
        "page": page,
        "count": count
    }
    return _get_json("https://www.jianshu.com/collections/recommended_users",
                     params=query, headers=jianshu_request_header)


def GetCollectionSubscribersJsonDataApi(collection_id: int, max_sort_id: int) -> Dict:
    """Return the decoded JSON of the subscribers endpoint of ``collection_id``."""
    api_url = f"https://www.jianshu.com/collection/{collection_id}/subscribers"
    query = {
        "max_sort_id": max_sort_id
    }
    return _get_json(api_url, params=query, headers=jianshu_request_header)


def GetCollectionArticlesJsonDataApi(collection_slug: str, page: int, count: int, order_by: str) -> Dict:
    """Return the decoded JSON of one page of public notes in ``collection_slug``."""
    api_url = f"https://www.jianshu.com/asimov/collections/slug/{collection_slug}/public_notes"
    query = {
        "page": page,
        "count": count,
        "order_by": order_by
    }
    return _get_json(api_url, params=query, headers=jianshu_request_header)


def GetIslandJsonDataApi(island_url: str) -> Dict:
    """Return the decoded JSON for the island (group) addressed by ``island_url``."""
    api_url = island_url.replace("https://www.jianshu.com/g/", "https://www.jianshu.com/asimov/groups/")
    return _get_json(api_url, headers=jianshu_request_header)


def GetIslandPostsJsonDataApi(group_slug: str, max_id: int,
                              count: int, topic_id: int, order_by: str):
    """Return the decoded JSON of the ``/asimov/posts`` endpoint for ``group_slug``."""
    # Key order is kept as in the original so the generated query string is unchanged.
    query = {
        "group_slug": group_slug,
        "order_by": order_by,
        "max_id": max_id,
        "count": count,
        "topic_id": topic_id
    }
    return _get_json("https://www.jianshu.com/asimov/posts",
                     params=query, headers=jianshu_request_header)


def GetNotebookJsonDataApi(notebook_url: str) -> Dict:
    """Return the decoded JSON served at the ``/asimov/`` counterpart of ``notebook_url``."""
    api_url = notebook_url.replace("https://www.jianshu.com/", "https://www.jianshu.com/asimov/")
    return _get_json(api_url, headers=jianshu_request_header)


def GetNotebookArticlesJsonDataApi(notebook_url: str, page: int,
                                   count: int, order_by: str) -> Dict:
    """Return the decoded JSON of one page of public notes in the notebook."""
    api_base = notebook_url.replace("https://www.jianshu.com/nb/",
                                    "https://www.jianshu.com/asimov/notebooks/")
    api_url = api_base + "/public_notes/"
    query = {
        "page": page,
        "count": count,
        "order_by": order_by
    }
    return _get_json(api_url, params=query, headers=jianshu_request_header)


def GetAssetsRankJsonDataApi(max_id: int, since_id: int) -> Dict:
    """Return the decoded JSON of the ``fp_rankings`` endpoint for the given id window."""
    query = {
        "max_id": max_id,
        "since_id": since_id
    }
    return _get_json("https://www.jianshu.com/asimov/fp_rankings",
                     params=query, headers=jianshu_request_header)


def GetDailyArticleRankListJsonDataApi() -> Dict:
    """Return the decoded JSON of the ``daily_activity_participants/rank`` endpoint."""
    return _get_json("https://www.jianshu.com/asimov/daily_activity_participants/rank",
                     headers=jianshu_request_header)


def GetArticlesFPRankListJsonDataApi(date: str, type_: str) -> Dict:  # avoid shadowing the built-in function
    """Return the decoded JSON of the ``fp_rankings/voter_notes`` endpoint.

    ``type_`` is sent to the server under the query key ``"type"``.
    """
    query = {
        "date": date,
        "type": type_
    }
    return _get_json("https://www.jianshu.com/asimov/fp_rankings/voter_notes",
                     params=query, headers=jianshu_request_header)


def GetUserJsonDataApi(user_url: str) -> Dict:
    """Return the decoded JSON for the user addressed by ``user_url``."""
    api_url = user_url.replace("https://www.jianshu.com/u/", "https://www.jianshu.com/asimov/users/slug/")
    return _get_json(api_url, headers=jianshu_request_header)


def GetUserPCHtmlDataApi(user_url: str) -> Dict:
    """Return the user's page parsed with ``etree.HTML``.

    NOTE(review): annotated as ``Dict`` although the value returned is whatever
    ``etree.HTML`` produces -- confirm the intended annotation.
    """
    return _get_html(user_url, headers=PC_header)


def GetUserCollectionsAndNotebooksJsonDataApi(user_url: str, user_slug: str) -> Dict:
    """Return the decoded JSON of the user's ``collections_and_notebooks`` endpoint."""
    api_url = user_url.replace("/u/", "/users/") + "/collections_and_notebooks"
    query = {
        "slug": user_slug
    }
    return _get_json(api_url, headers=jianshu_request_header, params=query)


def GetUserArticlesListJsonDataApi(user_url: str, page: int,
                                   count: int, order_by: str) -> Dict:
    """Return the decoded JSON of one page of the user's public notes."""
    api_url = user_url.replace("/u/", "/asimov/users/slug/") + "/public_notes"
    query = {
        "page": page,
        "count": count,
        "order_by": order_by
    }
    return _get_json(api_url, headers=jianshu_request_header, params=query)


def GetUserFollowingListHtmlDataApi(user_url: str, page: int):
    """Return one page of the user's ``/following`` listing parsed with ``etree.HTML``."""
    page_url = user_url.replace("/u/", "/users/") + "/following"
    query = {
        "page": page
    }
    return _get_html(page_url, headers=PC_header, params=query)


def GetUserFollowersListHtmlDataApi(user_url: str, page: int):
    """Return one page of the user's ``/followers`` listing parsed with ``etree.HTML``."""
    page_url = user_url.replace("/u/", "/users/") + "/followers"
    query = {
        "page": page
    }
    return _get_html(page_url, headers=PC_header, params=query)


def GetUserNextAnniversaryDayHtmlDataApi(user_slug: str):
    """Return the mobile anniversary page of ``user_slug`` parsed with ``etree.HTML``."""
    page_url = f"https://www.jianshu.com/mobile/u/{user_slug}/anniversary"
    return _get_html(page_url, headers=mobile_header)


def GetIslandPostJsonDataApi(post_slug: str) -> List[Dict]:
    """Return the decoded JSON of the ``/asimov/posts/{post_slug}`` endpoint."""
    api_url = f"https://www.jianshu.com/asimov/posts/{post_slug}"
    return _get_json(api_url, headers=jianshu_request_header)


def GetUserTimelineHtmlDataApi(uslug: str, max_id: int) -> Dict:
    # NOTE(review): the listing is cut off right after the parse step below;
    # whatever follows in the real function (presumably a return) is not shown.
    page_url = f"https://www.jianshu.com/users/{uslug}/timeline"
    query = {
        "max_id": max_id
    }
    html_obj = _get_html(page_url, headers=PC_header, params=query)
    # ... (rest of the function is elided in the listing)


# test_json.py
Source:test_json.py  
# ... (earlier lines of the listing, including the json/re/time imports, are elided)
from lib.NetconfMission import XmlMission
from lib.NetconfMission import CommandMission as RunCommand


def json_loads(raw):
    """Decode the JSON document ``raw``.

    Returns None when ``raw`` is None. When ``raw`` cannot be decoded (invalid
    JSON, or a value of a type ``json.loads`` does not accept) an error line is
    printed and the process exits with status 1 via ``SystemExit``.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # TypeError was previously swallowed and then surfaced as an
        # UnboundLocalError on the return; report it like any other bad input.
        print("error: check the json format")
        # SystemExit is what exit(1) raises, without relying on the site module.
        raise SystemExit(1)


def xml_mission_hosts_count(hosts_conf):
    """Print and return the number of top-level entries in ``hosts_conf`` (a JSON string)."""
    hosts = json_loads(hosts_conf)
    total = len(hosts)
    print(total)
    return total


def xml_mission_xml_count(hosts_conf, host_key):
    """Print and return the size of the ``xml_file`` entry of one host.

    ``host_key`` is 1-based; the configuration is keyed by the 0-based index as
    a string. When the host is missing an error line is printed and 0 is returned.
    """
    hosts = json_loads(hosts_conf)
    host_index = str(int(host_key) - 1)
    # ``in`` replaces dict.has_key(), which no longer exists on Python 3.
    if host_index in hosts:
        xml_files = hosts[host_index]["xml_file"]
    else:
        print("error hosts_conf, no exist key")
        xml_files = ""
    total = len(xml_files)
    print(total)
    return total


def SingleXmlMission(hosts_conf, host_key, xml_key, path):
    """Run ``XmlMission`` for a single xml file of a single host.

    ``host_key`` and ``xml_key`` are 1-based. The selected host's ``xml_file``
    mapping is reduced to just the chosen file, stored under key ``"0"``.
    """
    hosts = json_loads(hosts_conf)
    host_index = str(int(host_key) - 1)
    xml_index = str(int(xml_key) - 1)
    selected = hosts[host_index]['xml_file'][xml_index]
    hosts[host_index]['xml_file'] = {"0": selected}
    XmlMission("{}".format(path), hosts).run()


def CommandMission(hosts_conf, mission, path, *args):
    """Run ``mission`` through ``RunCommand`` and return its result.

    ``args[0]``, when present, is a JSON string of keyword arguments for the
    mission; if it is absent or cannot be decoded, no extra arguments are
    passed. Returns None when running the mission raises.
    """
    print(args)
    hosts = json_loads(hosts_conf)
    try:
        value = json_loads(args[0])
        print("arguments: {} ".format(value))
    except (Exception, SystemExit):
        # json_loads signals bad input with SystemExit, so it has to be caught
        # here to keep the fall-back behaviour; KeyboardInterrupt still propagates.
        value = {}
        print("use the default arguments in configuration")
    result = None
    try:
        result = RunCommand(path, hosts).run(mission, **value)
    except Exception as exc:
        # Previously swallowed silently, after which ``return result`` raised
        # UnboundLocalError; report the failure and return None instead.
        print("error: mission {} failed: {}".format(mission, exc))
    return result


def json_test(value_set):
    """Decode ``value_set`` with ``json_loads`` and print the result."""
    decoded = json_loads(value_set)
    print(decoded)


def check_irf_config(hosts_conf, path):
    """Poll the IRF port configuration until more than one ``<MemberID>`` is reported.

    Sleeps 180 seconds up front, then polls once per 60 seconds. On the 21st
    iteration a timeout line is printed and the process exits with status 1.
    """
    print("rebooting switch ...")
    hosts = json_loads(hosts_conf)
    time.sleep(180)
    for attempt in range(21):
        time.sleep(60)
        try:
            if attempt == 20:
                print('connect timeout .')
                # Not caught by the handler below: SystemExit is not an Exception.
                raise SystemExit(1)
            config = RunCommand(path, hosts).run("get_config", super_module='IRF', sub_module='IRFPorts')
            if len(re.findall(r'<MemberID>', config)) > 1:
                print("irf_config ok...")
                break
        except Exception:
            # Deliberate best-effort: a failed poll is ignored and retried on
            # the next iteration (presumably the switch is still rebooting).
            pass
...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
