Best Python code snippet using localstack_python
test_pxf.py
Source:test_pxf.py  
1#!/usr/bin/env python2'''3Licensed to the Apache Software Foundation (ASF) under one4or more contributor license agreements.  See the NOTICE file5distributed with this work for additional information6regarding copyright ownership.  The ASF licenses this file7to you under the Apache License, Version 2.0 (the8"License"); you may not use this file except in compliance9with the License.  You may obtain a copy of the License at10    http://www.apache.org/licenses/LICENSE-2.011Unless required by applicable law or agreed to in writing, software12distributed under the License is distributed on an "AS IS" BASIS,13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14See the License for the specific language governing permissions and15limitations under the License.16'''17import copy18import json19import os20from mock.mock import patch21from stacks.utils.RMFTestCase import Template, RMFTestCase22class TestPxf(RMFTestCase):23  PXF_PACKAGE_DIR = "PXF/3.0.0/package"24  STACK_VERSION = "2.3"25  PXF_USER = 'pxf'26  PXF_GROUP = 'pxf'27  TOMCAT_GROUP = 'tomcat'28  BASH_SHELL = '/bin/bash'29  DEFAULT_TIMEOUT = 60030  CONFIG_FILE = os.path.join(os.path.dirname(__file__), '../configs/pxf_default.json')31  with open(CONFIG_FILE, "r") as f:32     pxf_config = json.load(f)33  def setUp(self):34    self.config_dict = copy.deepcopy(self.pxf_config)35  def assert_configure_default(self):36    self.assertResourceCalled('User', self.PXF_USER,37                              groups=[self.getConfig()['configurations']['hdfs-site']['dfs.permissions.superusergroup'],38                              self.getConfig()['configurations']['cluster-env']['user_group'],39                              self.TOMCAT_GROUP],40        shell=self.BASH_SHELL)41    self.assertResourceCalled('File', '/etc/pxf/conf/pxf-env.sh',42                content=Template('pxf-env.j2'))43    self.assertResourceCalled('File', '/etc/pxf/conf/pxf-public.classpath',44                content = self.getConfig()['configurations']['pxf-public-classpath']['content'].lstrip())45    self.assertResourceCalled('File', '/etc/pxf/conf/pxf-profiles.xml',46                content = self.getConfig()['configurations']['pxf-profiles']['content'].lstrip())47    self.assertResourceCalled('XmlConfig', 'pxf-site.xml',48                              conf_dir='/etc/pxf/conf',49                              configurations=self.getConfig()['configurations']['pxf-site'],50                              configuration_attributes=self.getConfig()['configuration_attributes']['pxf-site'])51    self.assertResourceCalled('Execute', 'service pxf-service init',52                              timeout=self.DEFAULT_TIMEOUT,53                              logoutput=True)54  @patch('shutil.copy2')55  def test_install_default(self, shutil_copy2_mock):56    self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",57                       classname="Pxf",58                       command="install",59                       config_dict=self.config_dict,60                       stack_version=self.STACK_VERSION,61                       target=RMFTestCase.TARGET_COMMON_SERVICES,62                       try_install=True)63    self.assertResourceCalled('Package', 'pxf-service',64                              retry_count=5,65                              retry_on_repo_unavailability=False)66    self.assertResourceCalled('Package', 'apache-tomcat',67                              retry_count=5,68                              retry_on_repo_unavailability=False)69    self.assertResourceCalled('Package', 'pxf-hive',70                              retry_count=5,71                              retry_on_repo_unavailability=False)72    self.assertResourceCalled('Package', 'pxf-hdfs',73                              retry_count=5,74                              retry_on_repo_unavailability=False)75    self.assertResourceCalled('Package', 'pxf-hbase',76                              retry_count=5,77                              retry_on_repo_unavailability=False)78    self.assertResourceCalled('Package', 'pxf-json',79                              retry_count=5,80                              retry_on_repo_unavailability=False)81    self.assert_configure_default()82  @patch('shutil.copy2')83  def test_configure_default(self, shutil_copy2_mock):84      self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",85                   classname="Pxf",86                   command="configure",87                   config_dict=self.config_dict,88                   stack_version=self.STACK_VERSION,89                   target=RMFTestCase.TARGET_COMMON_SERVICES,90                   try_install=True)91      self.assert_configure_default()92  @patch('shutil.copy2')93  def test_start_default(self, shutil_copy2_mock):94      self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",95                   classname="Pxf",96                   command="start",97                   config_dict=self.config_dict,98                   stack_version=self.STACK_VERSION,99                   target=RMFTestCase.TARGET_COMMON_SERVICES,100                   try_install=True)101      self.assert_configure_default()102      self.assertResourceCalled('Directory', '/var/pxf',103              owner=self.PXF_USER,104              group=self.PXF_GROUP,105              create_parents = True)106      self.assertResourceCalled('Execute', 'service pxf-service restart',107                          timeout=self.DEFAULT_TIMEOUT,108                          logoutput=True)109  def test_stop_default(self):110      self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",111                   classname="Pxf",112                   command="stop",113                   config_dict=self.config_dict,114                   stack_version=self.STACK_VERSION,115                   target=RMFTestCase.TARGET_COMMON_SERVICES,116                   try_install=True)117      self.assertResourceCalled('Execute', 'service pxf-service stop',118                          timeout=self.DEFAULT_TIMEOUT,119                          logoutput=True)120  def test_status_default(self):121      self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",122                   classname="Pxf",123                   command="status",124                   config_dict=self.config_dict,125                   stack_version=self.STACK_VERSION,126                   target=RMFTestCase.TARGET_COMMON_SERVICES,127                   try_install=True)128      self.assertResourceCalled('Execute', 'service pxf-service status',129                          timeout=self.DEFAULT_TIMEOUT,130                          logoutput=True)131  def tearDown(self):...test_basic.py
Source:test_basic.py  
...8from bottle import Bottle9from bottle_argsmap import ArgsMapPlugin, try_install10def test_try_install_new():11    app = Bottle()12    plugin = try_install(app)13    assert isinstance(plugin, ArgsMapPlugin)14    assert app.plugins[-1] is plugin15def test_try_install_exists():16    app = Bottle()17    plugin = try_install(app)18    assert plugin is try_install(app)19    assert plugin is try_install(app)20    assert plugin is try_install(app)21    assert len([p for p in app.plugins if isinstance(p, ArgsMapPlugin)]) == 122def test_without_inject():23    plugin = ArgsMapPlugin()24    app = Bottle()25    app.install(plugin)26    @app.get('/entires/<name>/<value>')27    def entires(name: str, value: str):28        return dict(name=name, value=value)29    tapp = webtest.TestApp(app)30    resp = tapp.get('/entires/test123/test4566')31    assert resp.status_code == 20032    assert resp.json == dict(name='test123', value='test4566')33def test_with_inject_value():34    plugin = ArgsMapPlugin()...__main__.py
Source:__main__.py  
2import os3import socket4from libinstall import FileInstaller, PackageInstaller5dir = os.path.dirname(__file__)6PackageInstaller.try_install('i3-gaps')7PackageInstaller.try_install('i3lock')8PackageInstaller.try_install('i3status')9PackageInstaller.try_install('feh')10PackageInstaller.try_install('rxvt-unicode')11sources = [os.path.join(dir, 'i3-config')]12if socket.gethostname() == 'sunjammer':13  FileInstaller.create_symlink(os.path.join(dir, 'a73', 'wrapper.py'), '~/.config/i3status/wrapper.py')14  sources.append(os.path.join(dir, 'a73/i3-config'))15else:16  sources.append(os.path.join(dir, 'pc/i3-config'))17FileInstaller.merge_files(sources, '~/.config/i3/config', '#')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
