How to use requires_auth method in Splinter

Best Python code snippet using splinter

endpoint_tests.py

Source:endpoint_tests.py Github

copy

Full Screen

1#!/usr/bin/env python2# -*- coding: utf-8 -*-3"""4/***************************************************************************5 endpoint_tests.py6 Performs GET/POST and token tests on a test repository endpoint loaded7 with the test plugins as explained in the README.md8 WARNING: the wrong credentials tests must be disabled (skipped) to avoid Auth09 IP and account locks.10 Test matrix11 plugin auth required role12 1 no -13 2 yes Registered Users,DesktopBasic,DesktopEnterprise14 3 yes -15 4 yes DesktopEnterprise16Tested Authentication methods:17- HTTP Basic with username and password method GET18- HTTP Basic with username and password method POST19- Authorization Request Header Field access token (Bearer) method POST20- Access token passed on the query string as `access_token` method GET21- Form-Encoded Body Parameter access token passed as `access_token` method POST22 -------------------23 begin : 2016-06-2824 git sha : $Format:%H$25 copyright : (C) 2016 by26 Alessandro Pasotti/Boundless Spatial Inc.27 email : apasotti@boundlessgeo.com28 ***************************************************************************/29/***************************************************************************30 * *31 * This program is free software; you can redistribute it and/or modify *32 * it under the terms of the GNU General Public License as published by *33 * the Free Software Foundation; either version 2 of the License, or *34 * (at your option) any later version. *35 * *36 ***************************************************************************/37"""38import sys39import os40import json41import unittest42import urllib43import urllib244import base6445env = None46AUTH0_ENV = os.path.join(os.path.expanduser("~"), '.auth0.env')47def env_err():48 """Print the env error message and die"""49 print("A .env file with sample accounts is needed to run this tests.")50 print("""The .env must contain the Auth0 configuration and:51QGIS_FREE_USERNAME=********* # with Registered Users permissions52QGIS_FREE_PASSWORD=*********53QGIS_BASIC_USERNAME=******** # with DesktopBasic and Suite permissions54QGIS_BASIC_PASSWORD=********55QGIS_ENTERPRISE_USERNAME=*** # with DesktopEnterprise and Suite permissions56QGIS_ENTERPRISE_PASSWORD=***57 """)58 sys.exit(1)59try:60 from dotenv import Dotenv61 if os.path.exists(AUTH0_ENV):62 env = Dotenv(AUTH0_ENV)63 else:64 env = Dotenv('../.env')65except IOError:66 env_err()67XML_ENDPOINT = os.environ.get('XML_ENDPOINT', 'https://qgis.boundless.test/plugins/')68API_ENDPOINT = os.environ.get('API_ENDPOINT', 'https://qgis.boundless.test/api/')69try:70 DESKTOP_ROLE_ACCOUNTS = {71 'Registered Users': (env['QGIS_FREE_USERNAME'], env['QGIS_FREE_PASSWORD']),72 'DesktopBasic': (env['QGIS_BASIC_USERNAME'], env['QGIS_BASIC_PASSWORD']),73 'DesktopEnterprise' : (env['QGIS_ENTERPRISE_USERNAME'], env['QGIS_ENTERPRISE_PASSWORD']),74 }75except KeyError:76 env_err()77class TestAuth0Base(unittest.TestCase):78 _tokens_cache = {}79 endpoint = XML_ENDPOINT80 api_endpoint = API_ENDPOINT81 xml = None82 def _get_download_ur(self, plugin_name, requires_auth=False):83 """Get it from XML"""84 if self.xml is None:85 import xml.etree.ElementTree as ET86 self.xml = ET.fromstring(self._do_get(self.endpoint + 'plugins.xml').read())87 # Search the plugins name and get the URL88 return [p for p in self.xml.findall('.//pyqgis_plugin') if p.find('file_name').text.find(plugin_name) != -1][0].find('download_url').text89 def _do_post(self, url, requires_auth=True, values={}, headers={}):90 """91 Make a POST request, turns into GET if not requires auth92 (nginx does not support POST on static files)93 """94 if not requires_auth:95 return self._do_get(url, requires_auth, values, headers)96 data = urllib.urlencode(values)97 req = urllib2.Request(url, data, headers)98 return urllib2.urlopen(req)99 def _do_get(self, url, requires_auth=True, values={}, headers={}):100 """101 Make a GET request102 """103 req = urllib2.Request(url)104 for n, v in headers.iteritems():105 req.add_header(n, v)106 return urllib2.urlopen(req)107 def _http_basic_header(self, username, password):108 """109 Add the header for HTTP Basic110 """111 base64string = base64.encodestring('%s:%s' % (username, password))[:-1]112 return {"Authorization": "Basic %s" % base64string}113 def _http_bearer_header(self, access_token):114 """115 Add the Authorization: Bearer header116 """117 return {"Authorization": "Bearer %s" % access_token}118 def _get_access_token(self, url, username=None, password=None):119 """Return the access_token"""120 try:121 access_token = self._tokens_cache['%s%s' % (username, password)]122 except KeyError:123 headers = self._http_basic_header(username, password)124 response = self._do_post(url, requires_auth=True, headers=headers)125 access_token = response.headers['X-Access-Token']126 self._tokens_cache['%s%s' % (username, password)] = access_token127 return access_token128class TestAuth0GET(TestAuth0Base):129 """130 HTTP Basic with username and password method POST131 """132 def _do_test(self, url, username=None, password=None, requires_auth=True, values={}):133 headers = {}134 if requires_auth:135 headers = self._http_basic_header(username, password)136 return self._do_get(url, requires_auth, values, headers)137 def test_noAuthRequired(self):138 """139 Test a valid request for a plugin that:140 - do not require authentication141 """142 response = self._do_test(self._get_download_ur('test_plugin_1.0.1'),143 requires_auth=False)144 self.assertGreater(len(response.read()), 4800)145 self.assertLess(len(response.read()), 5000)146 self.assertEqual(response.getcode(), 200)147 def test_ValidAuthNoAuthRequired(self):148 """149 Test a valid auth request for a plugin that:150 - do not require authentication151 """152 response = self._do_test(self._get_download_ur('test_plugin_1.0.1'),153 *DESKTOP_ROLE_ACCOUNTS['DesktopBasic'],154 requires_auth=False)155 self.assertGreater(len(response.read()), 4800)156 self.assertLess(len(response.read()), 5000)157 self.assertEqual(response.getcode(), 200)158 def test_ValidAuthUserProfile(self):159 """160 Test a valid auth request for a user profile161 """162 response = self._do_test(self.api_endpoint + 'user_profile',163 *DESKTOP_ROLE_ACCOUNTS['DesktopBasic'],164 requires_auth=True)165 self.assertEqual(response.getcode(), 200)166 text = response.read()167 response_j = json.loads(text)168 self.assertTrue(response_j.get('SiteRole').find('DesktopBasic') != -1)169 def test_ValidAuthUserRoles(self):170 """171 Test a valid auth request for a user roles172 """173 response = self._do_test(self.api_endpoint + 'user_roles',174 *DESKTOP_ROLE_ACCOUNTS['DesktopBasic'],175 requires_auth=True)176 self.assertEqual(response.getcode(), 200)177 text = response.read()178 response_j = json.loads(text)179 self.assertEquals(response_j, [u'Registered Users', u'Suite', u'DesktopBasic'])180 #@unittest.skip("Auth0 locks")181 def test_WrongAuthNoRoleRequired(self):182 """183 Test that a wrong auth request for a plugin that184 - requires authentication185 - requires no role186 """187 with self.assertRaises(urllib2.HTTPError) as cm:188 response = self._do_test(self._get_download_ur('test_plugin_3.0.1'))189 the_exception = cm.exception190 self.assertEqual(the_exception.msg, 'UNAUTHORIZED')191 self.assertEqual(the_exception.getcode(), 401)192 # Wrong username and password193 with self.assertRaises(urllib2.HTTPError) as cm:194 response = self._do_test(self._get_download_ur('test_plugin_3.0.1'),195 'wrong_username', 'wrong_password')196 the_exception = cm.exception197 self.assertEqual(the_exception.msg, 'UNAUTHORIZED')198 self.assertEqual(the_exception.getcode(), 401)199 def test_ValidAuthNoRoleRequired(self):200 """201 Test that a valid auth request for a plugin that202 - requires authentication203 - requires no role204 """205 response = self._do_test(self._get_download_ur('test_plugin_3.0.1'),206 *DESKTOP_ROLE_ACCOUNTS['DesktopBasic'])207 self.assertGreater(len(response.read()), 5000)208 self.assertEqual(response.getcode(), 200)209 #@unittest.skip("Auth0 locks")210 def test_WrongAuthDesktopBasicRequired(self):211 """212 Test that a wrong auth request for a plugin that213 - requires authentication214 - require DesktopBasic authorization215 """216 with self.assertRaises(urllib2.HTTPError) as cm:217 response = self._do_test(self._get_download_ur('test_plugin_2.0.1'))218 the_exception = cm.exception219 self.assertEqual(the_exception.msg, 'UNAUTHORIZED')220 self.assertEqual(the_exception.getcode(), 401)221 # Wrong username and password222 with self.assertRaises(urllib2.HTTPError) as cm:223 response = self._do_test(self._get_download_ur('test_plugin_2.0.1'),224 'wrong_username', 'wrong_password')225 the_exception = cm.exception226 self.assertEqual(the_exception.msg, 'UNAUTHORIZED')227 self.assertEqual(the_exception.getcode(), 401)228 def test_ValidAuthDesktopBasicRequired(self):229 """230 Test that a valid auth request for a plugin that231 - requires authentication232 - require DesktopBasic authorization233 """234 response = self._do_test(self._get_download_ur('test_plugin_2.0.1'),235 *DESKTOP_ROLE_ACCOUNTS['DesktopBasic'])236 self.assertGreater(len(response.read()), 5000)237 self.assertEqual(response.getcode(), 200)238 def test_ValidAuthWrongRoleDesktopEnterpriseRequired(self):239 """240 Test that a valid auth/wrong role request for a plugin that241 - requires authentication242 - require DesktopEnterprise authorization243 """244 with self.assertRaises(urllib2.HTTPError) as cm:245 response = self._do_test(self._get_download_ur('test_plugin_4.0.1'),246 *DESKTOP_ROLE_ACCOUNTS['DesktopBasic'])247 the_exception = cm.exception248 self.assertEqual(the_exception.getcode(), 403)249 self.assertEqual(the_exception.msg, 'FORBIDDEN')250 def test_ValidAuthDesktopEnterpriseRequired(self):251 """252 Test that a valid auth request for a plugin that253 - requires authentication254 - require DesktopEnterprise authorization255 """256 response = self._do_test(self._get_download_ur('test_plugin_4.0.1'),257 *DESKTOP_ROLE_ACCOUNTS['DesktopEnterprise'])258 self.assertGreater(len(response.read()), 2900)259 self.assertLess(len(response.read()), 3000)260 self.assertEqual(response.getcode(), 200)261 def test_ValidAuthHigherRoleDesktopBasicRequired(self):262 """263 Test that a valid auth/role request for a plugin that264 - requires authentication265 - require DesktopBasic or DesktopEnterprise authorization266 """267 response = self._do_test(self._get_download_ur('test_plugin_2.0.1'),268 *DESKTOP_ROLE_ACCOUNTS['DesktopEnterprise'])269 self.assertGreater(len(response.read()), 5000)270 self.assertEqual(response.getcode(), 200)271class TestAuth0POST(TestAuth0GET):272 """273 HTTP Basic with username and password method POST274 """275 def _do_test(self, url, username=None, password=None, requires_auth=True, values={}):276 headers = {}277 if not requires_auth or username is None or password is None:278 return self._do_get(url, requires_auth=requires_auth)279 headers = self._http_basic_header(username, password)280 return self._do_post(url, requires_auth, values, headers)281class TestAuth0Bearer(TestAuth0GET):282 """283 Authorization Request Header Field access token (Bearer) method POST284 """285 def _do_test(self, url, username=None, password=None, requires_auth=True, values={}):286 # Get the token287 headers = {}288 if requires_auth and username is not None and password is not None:289 headers = self._http_bearer_header(self._get_access_token(url, username, password))290 return self._do_post(url, requires_auth, values, headers)291 else: # for clarity292 return self._do_get(url, requires_auth, values, headers)293class TestAuth0GETQueryString(TestAuth0GET):294 """295 Access token passed on the query string as `access_token` method GET296 """297 def _do_test(self, url, username=None, password=None, requires_auth=True, values={}):298 headers = {}299 if requires_auth and username is not None and password is not None:300 url = '%s?access_token=%s' % (url, self._get_access_token(url, username, password))301 return self._do_get(url, requires_auth, values, headers)302class TestAuth0POSTAccessToken(TestAuth0GET):303 """304 Form-Encoded Body Parameter access token passed as `access_token` method POST305 """306 def _do_test(self, url, username=None, password=None, requires_auth=True, values={}):307 if requires_auth and username is not None and password is not None:308 values = {'access_token': self._get_access_token(url, username, password)}309 return self._do_post(url, requires_auth, values)310if __name__ == '__main__':...

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

...13 return jsonify({14 'message': 'Welcome to a farming tracker API system'15 })16@app.route('/procedures', methods=['GET'])17@requires_auth('get:procedures')18def get_procedures(payload):19 procedures = Procedure.query.all()20 if not procedures:21 abort(404)22 formattedprocedure = [procedure.format() for procedure in procedures]23 for procedure in formattedprocedure:24 workername = get_worker(procedure['worker'])25 fieldname = get_field(procedure['field'])26 inputname = get_input(procedure['input'])27 procedure['worker'] = workername28 procedure['field'] = fieldname29 procedure['input'] = inputname30 return jsonify({31 'success': True,32 'total_procedures': len(procedures),33 'procedures': formattedprocedure34 }), 20035def get_worker(workerid):36 worker = Worker.query.filter_by(id=workerid).first().name37 return worker38def get_input(inputid):39 input = Inputs.query.filter_by(id=inputid).first().name40 return input41def get_field(fieldid):42 field = Fields.query.filter_by(id=fieldid).first().name43 return field44 # add procedures45@app.route('/procedures', methods=['POST'])46@requires_auth('post:procedures')47def createprocedure(payload):48 data = request.get_json()49 if not data:50 abort(404)51 try:52 new_name = data.get('name', None)53 new_date = data.get('date', None)54 new_activity = data.get('activity', None)55 new_field_id = data.get('field_id', None)56 new_worker_id = data.get('worker_id', None)57 new_input_id = data.get('input_id', None)58 new_inputs_qty = data.get('inputs_quantity', None)59 new_image_link = data.get('image_link', None)60 procedure = Procedure(name=new_name,61 date=new_date,62 activity=new_activity,63 field_id=new_field_id,64 worker_id=new_worker_id,65 input_id=new_input_id,66 inputs_quantity=new_inputs_qty,67 image_link=new_image_link)68 procedure.insert()69 except Exception as e:70 abort(500)71 return jsonify({72 'success': True,73 'procedure': procedure.format(),74 'created': procedure.id75 })76 # Update Procedure77@app.route('/procedures/<int:id>', methods=['PATCH'])78@requires_auth('patch:procedures')79def update_procedure(payload, id):80 data = request.get_json()81 procedure = Procedure.query.filter(Procedure.id == id).one_or_none()82 if not procedure:83 abort(404)84 try:85 updated_name = data.get('name', None)86 updated_date = data.get('date', None)87 updated_activity = data.get('activity', None)88 updated_field = data.get('field_id', None)89 updated_worker = data.get('worker_id', None)90 updated_input = data.get('input_id', None)91 updated_inputs_quantity = data.get('inputs_quantity', None)92 updated_image_link = data.get('image_link', None)93 if updated_name:94 procedure.name = updated_name95 if updated_date:96 procedure.date = updated_date97 if updated_activity:98 procedure.activity = updated_activity99 if updated_field:100 procedure.field = updated_field101 if updated_worker:102 procedure.worker_id = updated_worker103 if updated_input:104 procedure.input_id = updated_input105 if updated_inputs_quantity:106 procedure.inputs_quantity = updated_inputs_quantity107 if updated_image_link:108 procedure.image_link = updated_image_link109 procedure.update()110 except BaseException:111 abort(400)112 return jsonify({113 'success': True,114 'procedure': [procedure.format()]115 }), 200116 # Delete Procedure117@app.route('/procedures/<id>', methods=['DELETE'])118@requires_auth('delete:procedures')119def delete_procedure(payload, id):120 procedure = Procedure.query.filter(Procedure.id == id).one_or_none()121 if not procedure:122 abort(404)123 try:124 procedure.delete()125 except BaseException:126 abort(400)127 return jsonify({128 'success': True,129 'delete': id130 }), 200131 # get workers132@app.route('/workers', methods=['GET'])133@requires_auth('get:workers')134def get_workers(payload):135 workers = Worker.query.all()136 if not workers:137 abort(404)138 return jsonify({139 'success': True,140 'workers': [worker.format() for worker in workers]141 }), 200142 # Add workers143@app.route('/workers', methods=['POST'])144@requires_auth('post:workers')145def create_workers(payload):146 data = request.get_json()147 try:148 new_name = data.get('name', None)149 new_national_id = data.get('national_id', None)150 new_phone_number = data.get('phone_number', None)151 new_type = data.get('type', None)152 worker = Worker(name=new_name,153 national_id=new_national_id,154 phone_number=new_phone_number,155 type=new_type,156 )157 worker.insert()158 except Exception as e:159 print(e)160 abort(404)161 return jsonify({162 'success': True,163 'worker': worker.format()164 })165 # Delete Worker166@app.route('/workers/<id>', methods=['DELETE'])167@requires_auth('delete:workers')168def delete_worker(payload, id):169 worker = Worker.query.filter(Worker.id == id).one_or_none()170 if not worker:171 abort(404)172 try:173 worker.delete()174 except BaseException:175 abort(400)176 return jsonify({177 'success': True,178 'delete': id179 }), 200180 # Update Worker181@app.route('/workers/<int:id>', methods=['PATCH'])182@requires_auth('patch:workers')183def update_workers(payload, id):184 data = request.get_json()185 186 worker = Worker.query.filter(Worker.id == id).one_or_none()187 if not worker:188 abort(404)189 try:190 updated_name = data.get('name', None)191 updated_id = data.get('national_id', None)192 updated_no = data.get('phone_number', None)193 updated_type = data.get('type', None)194 if updated_name:195 worker.name = updated_name196 if updated_id:197 worker.national_id = updated_id198 if updated_no:199 worker.phone_number = updated_no200 if updated_type:201 worker.type = updated_type202 worker.update()203 except BaseException as e:204 print(f'Error ==> {e}')205 abort(400)206 return jsonify({207 'success': True,208 'worker': [worker.format()]209 }), 200210 # Get Inputs211@app.route('/inputs', methods=['GET'])212@requires_auth('get:inputs')213def get_inputs(payload):214 inputs = Inputs.query.all()215 if not inputs:216 abort(404)217 return jsonify({218 'success': True,219 'inputs': [input.format() for input in inputs]220 }), 200221 # Add Inputs222@app.route('/inputs', methods=['POST'])223@requires_auth('post:inputs')224def create_inputs(payload):225 data = request.get_json()226 227 try:228 new_name = data.get('name', None)229 new_quantity = data.get('quantity', None)230 new_type = data.get('type', None)231 new_metrics = data.get('metrics', None)232 inputs = Inputs(name=new_name,233 quantity=new_quantity,234 type=new_type,235 metrics= new_metrics236 )237 inputs.insert()238 except Exception as e:239 print(e)240 abort(404)241 return jsonify({242 'success': True,243 'inputs': inputs.format()244 })245 # Delete Inputs246@app.route('/inputs/<id>', methods=['DELETE'])247@requires_auth('delete:inputs')248def delete_inputs(payload, id):249 input = Inputs.query.filter(Inputs.id == id).one_or_none()250 if not input:251 abort(404)252 try:253 input.delete()254 except BaseException:255 abort(422)256 return jsonify({257 'success': True,258 'delete': id259 }), 200260 # Update Worker261@app.route('/inputs/<int:id>', methods=['PATCH'])262@requires_auth('patch:inputs')263def update_inputs(payload, id):264 data = request.get_json()265 input = Inputs.query.filter(Inputs.id == id).one_or_none()266 if not input:267 abort(404)268 try:269 updated_name = data.get('name', None)270 updated_qty = data.get('quantity', None)271 updated_metrics = data.get('metrics', None)272 updated_type = data.get('type', None)273 if updated_name:274 input.name = updated_name275 if updated_qty:276 input.phone_number = updated_qty277 if updated_metrics:278 input.metrics = updated_metrics279 if updated_type:280 input.type = updated_type281 input.update()282 except BaseException:283 abort(400)284 return jsonify({285 'success': True,286 'input': [input.format()]287 }), 200288 # Get Fields289@app.route('/fields', methods=['GET'])290@requires_auth('get:fields')291def get_fields(payload):292 fields = Fields.query.all()293 if not fields:294 abort(404)295 return jsonify({296 'success': True,297 'fields': [field.format() for field in fields]298 }), 200299 # Add Fields300@app.route('/fields', methods=['POST'])301@requires_auth('post:fields')302def create_fields(payload):303 data = request.get_json()304 305 try:306 new_name = data.get('name', None)307 new_size = data.get('size', None)308 print(new_name)309 print(new_size)310 fields = Fields(name=new_name, size=new_size)311 fields.insert()312 except Exception as e:313 print(e)314 abort(404)315 return jsonify({316 'success': True,317 'fields': fields.format()318 })319 # Delete Fields320@app.route('/fields/<id>', methods=['DELETE'])321@requires_auth('delete:fields')322def delete_fields(payload, id):323 field = Fields.query.filter(Fields.id == id).one_or_none()324 if not field:325 abort(404)326 try:327 field.delete()328 except BaseException:329 abort(400)330 return jsonify({331 'success': True,332 'delete': id333 }), 200334@app.route('/fields/<int:id>', methods=['PATCH'])335@requires_auth('patch:fields ')336def update_fields(payload, id):337 data = request.get_json()338 field = Fields.query.filter(Fields.id == id).one_or_none()339 if not field:340 abort(404)341 try:342 updated_name = data.get('name', None)343 updated_size = data.get('size', None)344 if updated_name:345 field.name = updated_name346 if updated_size:347 field.size = updated_size348 field.update()349 except BaseException:...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

1"""2Docker Compose UI, flask based application3"""4from json import loads5import logging6import os7import traceback8from shutil import rmtree9from compose.service import ImageType, BuildAction10import docker11import requests12from flask import Flask, jsonify, request13from scripts.git_repo import git_pull, git_repo, GIT_YML_PATH14from scripts.bridge import ps_, get_project, get_container_from_id, get_yml_path, containers15from scripts.find_files import find_yml_files, get_readme_file, get_logo_file16from scripts.requires_auth import requires_auth, authentication_enabled, \17 disable_authentication, set_authentication18# Flask Application19API_V1 = '/api/v1/'20YML_PATH = os.getenv('DOCKER_COMPOSE_UI_YML_PATH') \21 or '/opt/docker-compose-projects'22logging.basicConfig(level=logging.DEBUG)23app = Flask(__name__, static_url_path='')24def load_projects():25 """26 load project definitions (docker-compose.yml files)27 """28 global projects29 if git_repo:30 git_pull()31 projects = find_yml_files(GIT_YML_PATH)32 else:33 projects = find_yml_files(YML_PATH)34 logging.debug(projects)35load_projects()36def get_project_with_name(name):37 """38 get docker compose project given a project name39 """40 path = projects[name]41 return get_project(path)42# REST endpoints43@app.route(API_V1 + "projects", methods=['GET'])44def list_projects():45 """46 List docker compose projects47 """48 load_projects()49 active = [container['Labels']['com.docker.compose.project'] \50 if 'com.docker.compose.project' in container['Labels'] \51 else [] for container in containers()]52 return jsonify(projects=projects, active=active)53@app.route(API_V1 + "remove/<name>", methods=['DELETE'])54@requires_auth55def rm_(name):56 """57 remove previous cached containers. docker-compose rm -f58 """59 project = get_project_with_name(name)60 project.remove_stopped()61 return jsonify(command='rm')62@app.route(API_V1 + "projects/<name>", methods=['GET'])63def project_containers(name):64 """65 get project details66 """67 project = get_project_with_name(name)68 return jsonify(containers=ps_(project))69@app.route(API_V1 + "projects/<project>/<service_id>", methods=['POST'])70@requires_auth71def run_service(project, service_id):72 """73 docker-compose run service74 """75 json = loads(request.data)76 service = get_project_with_name(project).get_service(service_id)77 command = json["command"] if 'command' in json else service.options.get('command')78 container = service \79 .create_container(one_off=True, command=command)80 container.start()81 return jsonify(\82 command='run %s/%s' % (project, service_id), \83 name=container.name, \84 id=container.id \85 )86@app.route(API_V1 + "projects/yml/<name>", methods=['GET'])87def project_yml(name):88 """89 get yml content90 """91 path = get_yml_path(projects[name])92 with open(path) as data_file:93 return jsonify(yml=data_file.read())94@app.route(API_V1 + "projects/readme/<name>", methods=['GET'])95def get_project_readme(name):96 """97 get README.md or readme.md if available98 """99 path = projects[name]100 return jsonify(readme=get_readme_file(path))101@app.route(API_V1 + "projects/logo/<name>", methods=['GET'])102def get_project_logo(name):103 """104 get logo.png if available105 """106 path = projects[name]107 return get_logo_file(path)108@app.route(API_V1 + "projects/<name>/<container_id>", methods=['GET'])109def project_container(name, container_id):110 """111 get container details112 """113 project = get_project_with_name(name)114 container = get_container_from_id(project.client, container_id)115 return jsonify(116 id=container.id,117 short_id=container.short_id,118 human_readable_command=container.human_readable_command,119 name=container.name,120 name_without_project=container.name_without_project,121 number=container.number,122 ports=container.ports,123 ip=container.get('NetworkSettings.IPAddress'),124 labels=container.labels,125 log_config=container.log_config,126 image=container.image,127 environment=container.environment,128 started_at=container.get('State.StartedAt'),129 repo_tags=container.image_config['RepoTags']130 )131@app.route(API_V1 + "projects/<name>", methods=['DELETE'])132@requires_auth133def kill(name):134 """135 docker-compose kill136 """137 get_project_with_name(name).kill()138 return jsonify(command='kill')139@app.route(API_V1 + "projects", methods=['PUT'])140@requires_auth141def pull():142 """143 docker-compose pull144 """145 name = loads(request.data)["id"]146 get_project_with_name(name).pull()147 return jsonify(command='pull')148@app.route(API_V1 + "services", methods=['PUT'])149@requires_auth150def scale():151 """152 docker-compose scale153 """154 req = loads(request.data)155 name = req['project']156 service_name = req['service']157 num = req['num']158 project = get_project_with_name(name)159 project.get_service(service_name).scale(desired_num=int(num))160 return jsonify(command='scale')161@app.route(API_V1 + "projects", methods=['POST'])162@requires_auth163def up_():164 """165 docker-compose up166 """167 req = loads(request.data)168 name = req["id"]169 service_names = req.get('service_names', None)170 do_build = BuildAction.force if req.get('do_build', False) else BuildAction.none171 container_list = get_project_with_name(name).up(172 service_names=service_names,173 do_build=do_build)174 return jsonify(175 {176 'command': 'up',177 'containers': [container.name for container in container_list]178 })179@app.route(API_V1 + "build", methods=['POST'])180@requires_auth181def build():182 """183 docker-compose build184 """185 json = loads(request.data)186 name = json["id"]187 dic = dict(no_cache=json["no_cache"] if "no_cache" in json \188 else None, pull=json["pull"] if "pull" in json else None)189 get_project_with_name(name).build(**dic)190 return jsonify(command='build')191@app.route(API_V1 + "create", methods=['POST'])192@requires_auth193def create():194 """195 create new project196 """197 data = loads(request.data)198 directory = YML_PATH + '/' + data["name"]199 os.makedirs(directory)200 file_path = directory + "/docker-compose.yml"201 out_file = open(file_path, "w")202 out_file.write(data["yml"])203 out_file.close()204 load_projects()205 return jsonify(path=file_path)206@app.route(API_V1 + "remove-project/<name>", methods=['DELETE'])207@requires_auth208def remove_project(name):209 """210 remove project211 """212 directory = YML_PATH + '/' + name213 rmtree(directory)214 load_projects()215 return jsonify(path=directory)216@app.route(API_V1 + "search", methods=['POST'])217def search():218 """219 search for a project on www.composeregistry.com220 """221 query = loads(request.data)['query']222 response = requests.get('https://www.composeregistry.com/api/v1/search', \223 params={'query': query}, headers={'x-key': 'default'})224 if response.status_code == 200:225 return jsonify(response.json())226 else:227 result = jsonify(response.json())228 result.status_code = response.status_code229 return result230@app.route(API_V1 + "yml", methods=['POST'])231def yml():232 """233 get yml content from www.composeregistry.com234 """235 item_id = loads(request.data)['id']236 response = requests.get('https://www.composeregistry.com/api/v1/yml', \237 params={'id': item_id}, headers={'x-key': 'default'})238 return jsonify(response.json())239@app.route(API_V1 + "start", methods=['POST'])240@requires_auth241def start():242 """243 docker-compose start244 """245 name = loads(request.data)["id"]246 get_project_with_name(name).start()247 return jsonify(command='start')248@app.route(API_V1 + "stop", methods=['POST'])249@requires_auth250def stop():251 """252 docker-compose stop253 """254 name = loads(request.data)["id"]255 get_project_with_name(name).stop()256 return jsonify(command='stop')257@app.route(API_V1 + "down", methods=['POST'])258@requires_auth259def down():260 """261 docker-compose down262 """263 name = loads(request.data)["id"]264 get_project_with_name(name).down(ImageType.none, None)265 return jsonify(command='down')266@app.route(API_V1 + "logs/<name>", defaults={'limit': "all"}, methods=['GET'])267@app.route(API_V1 + "logs/<name>/<int:limit>", methods=['GET'])268def logs(name, limit):269 """270 docker-compose logs271 """272 lines = {}273 for k in get_project_with_name(name).containers(stopped=True):274 lines[k.name] = k.logs(timestamps=True, tail=limit).split('\n')275 return jsonify(logs=lines)276@app.route(API_V1 + "logs/<name>/<container_id>", defaults={'limit': "all"}, methods=['GET'])277@app.route(API_V1 + "logs/<name>/<container_id>/<int:limit>", methods=['GET'])278def container_logs(name, container_id, limit):279 """280 docker-compose logs of a specific container281 """282 project = get_project_with_name(name)283 container = get_container_from_id(project.client, container_id)284 lines = container.logs(timestamps=True, tail=limit).split('\n')285 return jsonify(logs=lines)286@app.route(API_V1 + "host", methods=['GET'])287def host():288 """289 docker host info290 """291 host_value = os.getenv('DOCKER_HOST')292 return jsonify(host=host_value)293@app.route(API_V1 + "host", methods=['POST'])294@requires_auth295def set_host():296 """297 set docker host298 """299 new_host = loads(request.data)["id"]300 if new_host is None:301 if os.environ.has_key('DOCKER_HOST'):302 del os.environ['DOCKER_HOST']303 return jsonify()304 else:305 os.environ['DOCKER_HOST'] = new_host306 return jsonify(host=new_host)307@app.route(API_V1 + "authentication", methods=['GET'])308def authentication():309 """310 check if basic authentication is enabled311 """312 return jsonify(enabled=authentication_enabled())313@app.route(API_V1 + "authentication", methods=['DELETE'])314@requires_auth315def disable_basic_authentication():316 """317 disable basic authentication318 """319 disable_authentication()320 return jsonify(enabled=False)321@app.route(API_V1 + "authentication", methods=['POST'])322@requires_auth323def enable_basic_authentication():324 """325 set up basic authentication326 """327 data = loads(request.data)328 set_authentication(data["username"], data["password"])329 return jsonify(enabled=True)330# static resources331@app.route("/")332def index():333 """334 index.html335 """336 return app.send_static_file('index.html')337## basic exception handling338@app.errorhandler(requests.exceptions.ConnectionError)339def handle_connection_error(err):340 """341 connection exception handler342 """343 return 'docker host not found: ' + str(err), 500344@app.errorhandler(docker.errors.DockerException)345def handle_docker_error(err):346 """347 docker exception handler348 """349 return 'docker exception: ' + str(err), 500350@app.errorhandler(Exception)351def handle_generic_error(err):352 """353 default exception handler354 """355 traceback.print_exc()356 return 'error: ' + str(err), 500357# run app358if __name__ == "__main__":...

Full Screen

Full Screen

rest_server.py

Source:rest_server.py Github

copy

Full Screen

1import logging2import keystone.middleware.auth_token as auth_token3from flask import Flask, Response, request4logger = logging.getLogger('service.rest_server')5class Rest_Server():6 def start(self):7 self.app.run(self.host, self.port)8 def __init__(self, conf):9 self.app = Flask(__name__)10 self.app.config['PROPAGATE_EXCEPTIONS'] = True11 self.rest_api = conf.get('rest_api')12 self.host = conf.get('host')13 self.port = conf.get('port')14 self.request = None15 self.app.wsgi_app = auth_token.AuthProtocol(self.app.wsgi_app, conf)16 @self.app.route('/')17 def root():18 result = self.rest_api.route_root()19 if result is None:20 return Response(response='root not found', status=404)21 return Response(response=result, status=200)22 @self.app.route('/hosts')23 @self.app.route('/hosts/')24 #requires_auth25 def hosts():26 result = self.rest_api.route_hosts(request.query_string)27 if result is None:28 return Response(response='no hosts found', status=404)29 return Response(response=result, status=200)30 @self.app.route('/hosts/<host_id>')31 @self.app.route('/hosts/<host_id>/')32 #requires_auth33 def hosts_hid(host_id):34 result = self.rest_api.route_hosts_hid(host_id,35 request.query_string)36 if result is None:37 return Response(response='host not found', status=404)38 return Response(response=result, status=200)39 @self.app.route('/hosts/<host_id>/meters')40 @self.app.route('/hosts/<host_id>/meters/')41 #requires_auth42 def hosts_hid_meters(host_id):43 result = self.rest_api.route_hosts_hid_meters(host_id,44 request.query_string)45 if result is None:46 return Response(response='no meters found', status=404)47 return Response(response=result, status=200)48 @self.app.route('/hosts/<host_id>/meters/<meter_id>/records')49 @self.app.route('/hosts/<host_id>/meters/<meter_id>/records/')50 def hosts_hid_meters_mid_records(host_id, meter_id):51 result = self.rest_api.route_hosts_hid_meters_mid_records(\52 host_id, meter_id, request.query_string)53 if result is None:54 return Response(response='no records found', status=404)55 return Response(response=result, status=200)56 @self.app.route('/projects')57 @self.app.route('/projects/')58 #requires_auth59 def projects():60 result = self.rest_api.route_projects(request.query_string)61 if result is None:62 return Response(response='no projects found', status=404)63 return Response(response=result, status=200)64 @self.app.route('/projects/<project_id>')65 @self.app.route('/projects/<project_id>/')66 #requires_auth67 def projects_pid(project_id):68 result = self.rest_api.route_projects_pid(project_id,69 request.query_string)70 if result is None:71 return Response(response='project not found', status=404)72 return Response(response=result, status=200)73 @self.app.route('/projects/<project_id>/meters')74 @self.app.route('/projects/<project_id>/meters/')75 #requires_auth76 def projects_pid_meters(project_id):77 result = self.rest_api.route_projects_pid_meters(project_id,\78 request.query_string)79 if result is None:80 return Response(response='no meters found', status=404)81 return Response(response=result, status=200)82 @self.app.route('/projects/<project_id>/meters/<meter_id>/records')83 @self.app.route('/projects/<project_id>/meters/<meter_id>/records/')84 #requires_auth85 def projects_pid_meters_mid_records(project_id, meter_id):86 result = self.rest_api.\87 route_projects_pid_meters_mid_records(project_id,88 meter_id,\89 request.query_string)90 if result is None:91 return Response(response='no records found', status=404)92 return Response(response=result, status=200)93 @self.app.route('/projects/<project_id>/instaces')94 @self.app.route('/projects/<project_id>/instances/')95 #requires_auth96 def projects_pid_instances(project_id):97 result = self.rest_api.\98 route_projects_pid_instances(project_id,\99 request.query_string)100 if result is None:101 return Response(response='no instances found', status=404)102 return Response(response=result, status=200)103 @self.app.route('/meters')104 @self.app.route('/meters/')105 #requires_auth106 def meters():107 result = self.rest_api.route_meters(request.query_string)108 if result is None:109 return Response(response='no meters found', status=404)110 return Response(response=result, status=200)111 @self.app.route('/meters/<meter_id>')112 @self.app.route('/meters/<meter_id>/')113 #requires_auth114 def meters_mid(meter_id):115 result = self.rest_api.route_meters_mid(meter_id,116 request.query_string)117 if result is None:118 return Response(response='meter not found', status=404)119 return Response(response=result, status=200)120 @self.app.route('/users')121 @self.app.route('/users/')122 #requires_auth123 def users():124 result = self.rest_api.route_users(request.query_string)125 if result is None:126 return Response(response='no users found', status=404)127 return Response(response=result, status=200)128 @self.app.route('/users/<user_id>/meters/<meter_id>/records')129 @self.app.route('/users/<user_id>/meters/<meter_id>/records/')130 #requires_auth131 def users_pid_meters_mid_records(user_id, meter_id):132 result = self.rest_api.\133 route_users_uid_meters_mid_records(user_id, meter_id,\134 request.query_string)135 if result is None:136 return Response(response='no records found', status=404)137 return Response(response=result, status=200)138 @self.app.route('/instances')139 @self.app.route('/instances/')140 #requires_auth141 def instances():142 result = self.rest_api.route_instances(request.query_string)143 if result is None:144 return Response(response='no instances found', status=404)145 return Response(response=result, status=200)146 @self.app.route('/instances/<instance_id>/meters/<meter_id>/records')147 @self.app.route('/instances/<instance_id>/meters/<meter_id>/records/')148 #requires_auth149 def instances_iid_meters_mid_records(instance_id, meter_id):150 result = self.rest_api.\151 route_instances_iid_meters_mid_records(instance_id,152 meter_id,\153 request.query_string)154 if result is None:155 return Response(response='no records found', status=404)156 return Response(response=result, status=200)157 @self.app.route('/records')158 @self.app.route('/records/')159 #requires_auth160 def records():161 result = self.rest_api.route_records(request.query_string)162 if result is None:163 return Response(response='no records found', status=404)164 return Response(response=result, status=200)165 @self.app.route('/records/<record_id>')166 @self.app.route('/records/<record_id>/')167 #requires_auth168 def records_rid(record_id):169 result = self.rest_api.route_records_rid(record_id,170 request.query_string)171 if result is None:172 return Response(response='record not found', status=404)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Splinter automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful