Best Python code snippet using pyresttest_python
test_pool_get_deploy_mock.py
Source:test_pool_get_deploy_mock.py  
# -*- coding: utf-8 -*-
"""Deploy/update/undeploy tests for v3 server pools with a mocked plugin."""
import logging

from django.core.management import call_command
from django.test.client import Client
from mock import patch

from networkapi.api_pools.facade.v3 import deploy as facade_pool_deploy
from networkapi.test.mock import MockPlugin
from networkapi.test.test_case import NetworkApiTestCase
from networkapi.usuario.models import Usuario

log = logging.getLogger(__name__)

# Dotted path of the plugin factory that every test replaces with a mock.
PLUGIN_FACTORY = 'networkapi.plugins.factory.PluginFactory.factory'

MSG_CREATED_TRUE = 'After deploy, flag created should be True.'
MSG_CREATED_FALSE = 'After undeploy, flag created should be False.'


def setup():
    """Load the fixtures used by the tests in this module."""
    call_command(
        'loaddata',
        'networkapi/system/fixtures/initial_variables.json',
        'networkapi/api_pools/fixtures/initial_optionspool.json',
        'networkapi/requisicaovips/fixtures/initial_optionsvip.json',
        'networkapi/healthcheckexpect/fixtures/initial_healthcheck.json',
        'networkapi/usuario/fixtures/initial_usuario.json',
        'networkapi/grupo/fixtures/initial_ugrupo.json',
        'networkapi/usuario/fixtures/initial_usuariogrupo.json',
        'networkapi/api_ogp/fixtures/initial_objecttype.json',
        'networkapi/api_ogp/fixtures/initial_objectgrouppermissiongeneral.json',
        'networkapi/grupo/fixtures/initial_permissions.json',
        'networkapi/grupo/fixtures/initial_permissoes_administrativas.json',
        'networkapi/api_pools/fixtures/initial_base.json',
        'networkapi/api_pools/fixtures/initial_pools_2.json',
        verbosity=0
    )


class PoolGetDeployMockTestCase(NetworkApiTestCase):
    """Exercise the pool deploy facade while the plugin factory is mocked."""

    maxDiff = None

    def setUp(self):
        self.client = Client()
        self.user = Usuario(id=1, nome='test')

    def tearDown(self):
        pass

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _get_pool_data(self, id_pool):
        """GET ``/api/v3/pool/<id_pool>/`` and return the response data."""
        response = self.client.get(
            '/api/v3/pool/%s/' % id_pool,
            HTTP_AUTHORIZATION=self.get_http_authorization('test'))
        return response.data

    def _build_ipv4_members(self, ipsv4):
        """Build one pool member (port 8) per ``{'id', 'ip_formated'}`` dict."""
        return [
            self.build_server_pool_member(
                ip__id=ip['id'],
                ip__ip_formated=ip['ip_formated'],
                port_real=8)
            for ip in ipsv4
        ]

    def _assert_deployed_with_members(self, id_pool, expected, message):
        """Re-read the pool and check its member count and created flag."""
        pool = self._get_pool_data(id_pool)['server_pools'][0]
        self.assertEqual(expected, len(pool['server_pool_members']), message)
        self.assertEqual(True, pool['pool_created'], MSG_CREATED_TRUE)

    def mock_create_pool(self, id_pool, test_patch):
        """Deploy pool ``id_pool`` through the facade and check the flag.

        :param id_pool: Identifier of the pool to deploy.
        :param test_patch: Mock that replaces the plugin factory.
        """
        pool = self._get_pool_data(id_pool)
        test_patch.return_value = MockPlugin()

        facade_pool_deploy.create_real_pool(pool['server_pools'], self.user)

        pool = self._get_pool_data(id_pool)['server_pools'][0]
        self.assertEqual(True, pool['pool_created'], MSG_CREATED_TRUE)

    def mock_delete_pool(self, id_pool, test_patch):
        """Undeploy pool ``id_pool`` through the facade and check the flag.

        :param id_pool: Identifier of the pool to undeploy.
        :param test_patch: Mock that replaces the plugin factory.
        """
        pool = self._get_pool_data(id_pool)
        test_patch.return_value = MockPlugin()

        facade_pool_deploy.delete_real_pool(pool['server_pools'], self.user)

        pool = self._get_pool_data(id_pool)['server_pools'][0]
        self.assertEqual(False, pool['pool_created'], MSG_CREATED_FALSE)

    # ------------------------------------------------------------------
    # Deploy
    # ------------------------------------------------------------------
    @patch(PLUGIN_FACTORY)
    def test_deploy_pool_without_reals(self, test_patch):
        """Tries to deploy a pool without reals."""
        self.mock_create_pool(31, test_patch)

    @patch(PLUGIN_FACTORY)
    def test_deploy_pool_with_two_reals_and_tcp_protocol(self, test_patch):
        """Tries to deploy a pool with two reals and TCP protocol
        in healthcheck.
        """
        self.mock_create_pool(32, test_patch)

    @patch(PLUGIN_FACTORY)
    def test_deploy_pool_with_two_reals_and_https_protocol(self, test_patch):
        """Tries to deploy a pool with two reals and HTTPS protocol
        in healthcheck.
        """
        self.mock_create_pool(33, test_patch)

    @patch(PLUGIN_FACTORY)
    def test_deploy_pool_with_two_reals_and_udp_protocol(self, test_patch):
        """Tries to deploy a pool with two reals and UDP protocol
        in healthcheck.
        """
        self.mock_create_pool(34, test_patch)

    @patch(PLUGIN_FACTORY)
    def test_deploy_pool_with_two_reals_and_http_protocol(self, test_patch):
        """Tries to deploy a pool with two reals and HTTP protocol
        in healthcheck.
        """
        self.mock_create_pool(35, test_patch)

    @patch(PLUGIN_FACTORY)
    def test_deploy_pool_three_reals_and_weighted(self, test_patch):
        """Tries to deploy a pool with three reals and weighted balancing.
        """
        self.mock_create_pool(36, test_patch)

    @patch(PLUGIN_FACTORY)
    def test_deploy_pool_three_reals_and_least_conn(self, test_patch):
        """Tries to deploy a pool with three reals and least-conn balancing.
        """
        self.mock_create_pool(37, test_patch)

    # ------------------------------------------------------------------
    # Update
    # ------------------------------------------------------------------
    @patch(PLUGIN_FACTORY)
    def test_update_deployed_pool_without_reals(self, test_patch):
        """Tries to update deployed pool without reals adding two reals to it.
        """
        id_pool = 38
        pool = self._get_pool_data(id_pool)
        test_patch.return_value = MockPlugin()

        pool['server_pools'][0]['server_pool_members'] = \
            self._build_ipv4_members([
                {'id': 45, 'ip_formated': '10.0.0.45'},
                {'id': 46, 'ip_formated': '10.0.0.46'},
            ])
        facade_pool_deploy.update_real_pool(pool['server_pools'], self.user)

        self._assert_deployed_with_members(
            id_pool, 2, 'This server pool should have two members.')

    @patch(PLUGIN_FACTORY)
    def test_update_deployed_pool_with_reals_removing_them(self, test_patch):
        """Tries to update pool with reals removing them."""
        id_pool = 39
        pool = self._get_pool_data(id_pool)
        test_patch.return_value = MockPlugin()

        pool['server_pools'][0]['server_pool_members'] = []
        facade_pool_deploy.update_real_pool(pool['server_pools'], self.user)

        self._assert_deployed_with_members(
            id_pool, 0, 'This server pool should have zero members.')

    @patch(PLUGIN_FACTORY)
    def test_update_deployed_pool_removing_half_of_reals(self, test_patch):
        """Tries to remove half of reals in a deployed server pool."""
        id_pool = 40  # Pool with this ID has four members
        pool = self._get_pool_data(id_pool)
        self.assertEqual(4,
                         len(pool['server_pools'][0]['server_pool_members']),
                         'This server pool should have four members.')
        test_patch.return_value = MockPlugin()

        pool['server_pools'][0]['server_pool_members'] = \
            self._build_ipv4_members([
                {'id': 49, 'ip_formated': '10.0.0.49'},
                {'id': 50, 'ip_formated': '10.0.0.50'},
            ])
        facade_pool_deploy.update_real_pool(pool['server_pools'], self.user)

        self._assert_deployed_with_members(
            id_pool, 2, 'This server pool should have two members.')

    @patch(PLUGIN_FACTORY)
    def test_update_deployed_pool_rm_half_reals_add_another(self, test_patch):
        """Tries to remove half of the reals in a deployed server pool and
        at same time adding a new real.
        """
        # Pool with this ID has four members (IPs 53, 54, 55, 56)
        id_pool = 41
        pool = self._get_pool_data(id_pool)
        self.assertEqual(4,
                         len(pool['server_pools'][0]['server_pool_members']),
                         'This server pool should have four members.')
        test_patch.return_value = MockPlugin()

        pool['server_pools'][0]['server_pool_members'] = \
            self._build_ipv4_members([
                {'id': 53, 'ip_formated': '10.0.0.53'},
                {'id': 54, 'ip_formated': '10.0.0.54'},
                {'id': 57, 'ip_formated': '10.0.0.57'},
            ])
        facade_pool_deploy.update_real_pool(pool['server_pools'], self.user)

        self._assert_deployed_with_members(
            id_pool, 3, 'This server pool should have three members.')

    # ------------------------------------------------------------------
    # Undeploy
    # ------------------------------------------------------------------
    @patch(PLUGIN_FACTORY)
    def test_undeploy_pool_with_reals(self, test_patch):
        """Tries to undeploy pool with two reals."""
        self.mock_delete_pool(42, test_patch)

    @patch(PLUGIN_FACTORY)
    def test_undeploy_pool_without_reals(self, test_patch):
        """Tries to undeploy pool without reals."""
        self.mock_delete_pool(43, test_patch)

    def build_server_pool_member(self, **kwargs):
        """Build the dict describing one server pool member.

        Recognised keyword arguments: ``id``, ``ip__id``,
        ``ip__ip_formated``, ``ipv6__id``, ``ipv6__ip_formated``,
        ``port_real``, ``weight`` and ``priority``. Any other keyword is
        ignored. ``weight`` and ``priority`` default to 0 when missing or
        ``None``; the ``ip``/``ipv6`` entries are ``None`` unless both their
        id and formatted address are given.

        :return: Dict with the member data.
        """
        ip__id = kwargs.get('ip__id')
        ip__ip_formated = kwargs.get('ip__ip_formated')
        ipv6__id = kwargs.get('ipv6__id')
        ipv6__ip_formated = kwargs.get('ipv6__ip_formated')
        weight = kwargs.get('weight')
        priority = kwargs.get('priority')

        ip = None
        if ip__id is not None and ip__ip_formated is not None:
            ip = {'id': ip__id, 'ip_formated': ip__ip_formated}

        # The IPv6 entry must carry the IPv6 values; the previous code
        # copied the IPv4 id/address into it.
        ipv6 = None
        if ipv6__id is not None and ipv6__ip_formated is not None:
            ipv6 = {'id': ipv6__id, 'ip_formated': ipv6__ip_formated}

        return {
            'id': kwargs.get('id'),
            'ip': ip,
            'ipv6': ipv6,
            'priority': priority if priority is not None else 0,
            'weight': weight if weight is not None else 0,
            'limit': 0,
            'port_real': kwargs.get('port_real'),
            'member_status': 7,
            'last_status_update_formated': None,
            # NOTE(review): the source listing is truncated right after the
            # key above; confirm against the original file that no further
            # keys follow.
        }
Source:gcloud_test.py  
# Copyright 2017 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for install/gcp/installer/util/gcloud.py."""

import json
import sys
import unittest
from StringIO import StringIO
from contextlib import contextmanager

import mock

from install.gcp.installer.util import gcloud
from install.gcp.installer.util import utils
from tests.unittest_utils import ForsetiTestCase

FAKE_PROJECT = 'fake-project'
FAKE_ACCOUNT = 'fake-account@localhost.domain'
FAKE_GCLOUD_INFO = {
    'config': {
        'project': FAKE_PROJECT,
        'account': FAKE_ACCOUNT,
        'properties': {
        }
    }
}
FAKE_GCLOUD_INFO_CLOUDSHELL = {
    'config': {
        'project': FAKE_PROJECT,
        'account': FAKE_ACCOUNT,
        'properties': {
            'metrics': {
                'environment': 'devshell'
            }
        }
    }
}
FAKE_IAM_POLICY = {
    'bindings': [
        {'members': [
            'user:root@localhost.domain',
            'group:security-group@localhost.domain',
         ],
         'role': 'roles/fakeAdminRole'
        },
        {'members': [
            'user:user1@localhost.domain',
            'user:{}'.format(FAKE_ACCOUNT),
         ],
         'role': 'roles/fakeViewerRole'
        },
        {'members': [
            'user:{}'.format(FAKE_ACCOUNT),
         ],
         'role': 'roles/fakeEditorRole'
        },
    ]
}
GCLOUD_MIN_VERSION = (163, 0, 0)


# Thank you https://stackoverflow.com/questions/4219717/how-to-assert-output-with-nosetest-unittest-in-python/17981937#17981937
@contextmanager
def captured_output():
    """Temporarily replace sys.stdout/sys.stderr with in-memory buffers.

    Yields:
        tuple: The (stdout, stderr) StringIO objects that receive the output
            written while the context is active. The real streams are
            restored on exit, even if the body raises.
    """
    new_out, new_err = StringIO(), StringIO()
    old_out, old_err = sys.stdout, sys.stderr
    try:
        sys.stdout, sys.stderr = new_out, new_err
        yield sys.stdout, sys.stderr
    finally:
        sys.stdout, sys.stderr = old_out, old_err


class GcloudTest(ForsetiTestCase):
    """Test the install_forseti."""

    def setUp(self):
        """Setup."""
        # Replace utils.run_command with a MagicMock for the duration of one
        # test only. The previous plain assignment was never undone, so the
        # mock leaked into every test module that ran afterwards.
        patcher = mock.patch.object(utils, 'run_command')
        patcher.start()
        self.addCleanup(patcher.stop)
        self.gcloud_min_ver_formatted = '.'.join(
            [str(d) for d in GCLOUD_MIN_VERSION])

    @mock.patch('install.gcp.installer.util.gcloud.constants.GCLOUD_MIN_VERSION', GCLOUD_MIN_VERSION)
    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_check_proper_gcloud(self, test_patch):
        """Test check_proper_gcloud() works with proper version/alpha."""
        test_patch.return_value = (
            0,
            'Google Cloud SDK %s\nalpha 12345\netc' % self.gcloud_min_ver_formatted,
            None
        )
        output_head = 'Current gcloud version'
        with captured_output() as (out, err):
            gcloud.check_proper_gcloud()
            output = out.getvalue()[:len(output_head)]
            self.assertEqual(output_head, output)

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_check_proper_gcloud_failed_command(self, test_patch):
        """Test check_proper_gcloud() exits when command fails."""
        test_patch.return_value = (
            1,
            'Google Cloud SDK %s\nalpha 12345\netc' % self.gcloud_min_ver_formatted,
            None
        )
        output_head = 'Error'
        with self.assertRaises(SystemExit):
            with captured_output() as (out, err):
                gcloud.check_proper_gcloud()
                # NOTE(review): when the call above raises SystemExit (which
                # this test requires), the two lines below are never
                # executed, so the printed output is not actually verified.
                output = out.getvalue()[:len(output_head)]
                self.assertEqual(output_head, output)

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_check_proper_gcloud_low_version(self, test_patch):
        """Test check_proper_gcloud() exits with low gcloud version."""
        test_patch.return_value = (
            0,
            'Google Cloud SDK 162.9.9\nalpha 12345\netc',
            None
        )
        # NOTE(review): this expected text is never asserted. It also lacks
        # the '\n' before 'You need' that the no-alpha test below has, and it
        # uses the minimum version rather than the mocked 162.9.9 -- confirm
        # against gcloud.check_proper_gcloud() before turning it into a check.
        output_head = ('Current gcloud version: %s\n'
                       'Has alpha components? True'
                       'You need' % self.gcloud_min_ver_formatted)
        with self.assertRaises(SystemExit):
            with captured_output() as (out, err):
                gcloud.check_proper_gcloud()
                # Not reached when the call above exits.
                output = out.getvalue()[:len(output_head)]

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_check_proper_gcloud_no_alpha(self, test_patch):
        """Test check_proper_gcloud() exits with no alpha components."""
        test_patch.return_value = (
            0,
            'Google Cloud SDK %s\netc' % self.gcloud_min_ver_formatted,
            None
        )
        output_head = ('Current gcloud version: %s\n'
                       'Has alpha components? False\n'
                       'You need' % self.gcloud_min_ver_formatted)
        with self.assertRaises(SystemExit):
            with captured_output() as (out, err):
                gcloud.check_proper_gcloud()
                # NOTE(review): not reached when the call above exits, so
                # the expected output is not actually verified.
                output = out.getvalue()[:len(output_head)]
                self.assertEqual(output_head, output)

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_gcloud_info_works_nocloudshell(self, test_patch):
        """Test gcloud_info()."""
        test_patch.return_value = (
            0,
            json.dumps(FAKE_GCLOUD_INFO),
            None
        )
        with captured_output() as (out, err):
            gcloud.get_gcloud_info()
            output = out.getvalue().strip()
            self.assertEqual('Read gcloud info: Success', output)

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_gcloud_info_cmd_fails(self, test_patch):
        """Test gcloud_info() exits when command fails."""
        test_patch.return_value = (
            1,
            None,
            'Error output'
        )
        with self.assertRaises(SystemExit):
            with captured_output():
                # NOTE(review): the test name and docstring refer to
                # gcloud_info, but check_proper_gcloud() is what is called
                # here -- presumably get_gcloud_info() was intended; confirm.
                gcloud.check_proper_gcloud()

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_gcloud_info_json_fails(self, test_patch):
        """Test gcloud_info() exits when json output fails."""
        test_patch.return_value = (
            0,
            'invalid json',
            None,
        )
        with self.assertRaises(SystemExit):
            with captured_output():
                gcloud.get_gcloud_info()

    @mock.patch('install.gcp.installer.util.gcloud.check_proper_gcloud')
    def test_check_cloudshell_no_flag_no_cloudshell(self, test_patch):
        """Test check_cloudshell() when no cloudshell and no flag to bypass."""
        test_patch.return_value = {}
        force_no_cloudshell = False
        is_dev_shell = False
        with self.assertRaises(SystemExit):
            with captured_output():
                gcloud.verify_gcloud_information(
                    'id', 'user', force_no_cloudshell, is_dev_shell)

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_lookup_organization(self, test_patch):
        """Test lookup_organization().

        Find organization from a project nested inside 3 folders.
        """
        project_desc = json.dumps({
            'name': 'project-1',
            'parent': {
                'id': '12345',
                'type': 'folder'
            },
        })
        folder_12345_desc = json.dumps({
            'name': 'folders/12345',
            'parent': 'folders/23456'
        })
        folder_23456_desc = json.dumps({
            'name': 'folders/23456',
            'parent': 'folders/34567'
        })
        folder_34567_desc = json.dumps({
            'name': 'folders/34567',
            'parent': 'organizations/1111122222'
        })
        # One mocked command result per lookup, in call order.
        test_patch.side_effect = [
            [0, project_desc, None],
            [0, folder_12345_desc, None],
            [0, folder_23456_desc, None],
            [0, folder_34567_desc, None],
        ]
        output_head = 'Organization id'
        with captured_output() as (out, err):
            gcloud.lookup_organization(FAKE_PROJECT)
            # collect all the output, the last line (excluding blank line)
            # should be 'Organization id: ...'
            all_output = [s for s in out.getvalue().split('\n') if s]
            output = all_output[-1][:len(output_head)]
            self.assertEqual(output_head, output)

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    def test_choose_organization_no_org(self, test_patch):
        """Test choose_organization()."""
        # No orgs
        test_patch.return_value = (0, '{}', None)
        with captured_output() as (out, err):
            target_id = gcloud.choose_organization()
            self.assertEqual(None, target_id)

    @mock.patch('install.gcp.installer.util.gcloud.utils.run_command')
    @mock.patch('__builtin__.raw_input')
    def test_choose_organization_has_org(self, mock_rawinput, test_patch):
        """Test choose_organization()."""
        mock_rawinput.side_effect = ['123']
        # Has orgs
        test_patch.return_value = (
            0, '[{"name": "organizations/123", "displayName": "fake org"}]', None)
        with captured_output() as (out, err):
            target_id = gcloud.choose_organization()
            self.assertEqual('123', target_id)

    @mock.patch('__builtin__.raw_input')
    def test_choose_folder(self, mock_rawinput):
        """Test choose_folder()."""
        # Two answers are queued; the test expects the second one back
        # (presumably 'abc' is rejected as a folder id -- confirm).
        mock_rawinput.side_effect = ['abc', '123']
        with captured_output() as (out, err):
            target_id = gcloud.choose_folder(FAKE_PROJECT)
            self.assertEqual('123', target_id)

    @mock.patch('__builtin__.raw_input')
    def test_choose_project(self, mock_rawinput):
        """Test choose_project()."""
        mock_rawinput.side_effect = ['abc']
        with captured_output() as (out, err):
            target_id = gcloud.choose_project()
            self.assertEqual('abc', target_id)


if __name__ == '__main__':
    # NOTE(review): the source listing is truncated right after the guard
    # above; unittest.main() is the conventional body -- confirm against the
    # original file.
    unittest.main()
Source:utils.py  
1from __future__ import division2import os, scipy.io3import re4import torch5import torch.nn as nn6import torch.optim as optim7import numpy as np8import glob9import cv210from scipy.stats import poisson11from skimage.measure import compare_psnr,compare_ssim12import time13def pack_gbrg_raw(raw):14    #pack GBRG Bayer raw to 4 channels15    black_level = 24016    white_level = 2**12-117    im = raw.astype(np.float32)18    im = np.maximum(im - black_level, 0) / (white_level-black_level)19    im = np.expand_dims(im, axis=2)20    img_shape = im.shape21    H = img_shape[0]22    W = img_shape[1]23    out = np.concatenate((im[1:H:2, 0:W:2, :],24                          im[1:H:2, 1:W:2, :],25                          im[0:H:2, 1:W:2, :],26                          im[0:H:2, 0:W:2, :]), axis=2)27    return np.minimum(out, 1.0)28def depack_gbrg_raw(raw):29    H = raw.shape[1]30    W = raw.shape[2]31    output = np.zeros((H*2,W*2))32            33    output[ ::2, ::2]=raw[...,3]34    output[ ::2,1::2]=raw[...,2]35    output[1::2, ::2]=raw[...,0]36    output[1::2,1::2]=raw[...,1]37    return output38def pack_rggb_raw(raw):39    #pack RGGB Bayer raw to 4 channels40    black_level = 24041    white_level = 2**12-142    im = raw.astype(np.float32)43    im = np.maximum(im - black_level, 0) / (white_level-black_level)44    im = np.expand_dims(im, axis=2)45    img_shape = im.shape46    H = img_shape[0]47    W = img_shape[1]48    out = np.concatenate((im[0:H:2, 0:W:2, :],49                          im[0:H:2, 1:W:2, :],50                          im[1:H:2, 1:W:2, :],51                          im[1:H:2, 0:W:2, :]), axis=2)52    return out53def generate_noisy_raw(gt_raw, a, b):54    """55    a: sigma_s^256    b: sigma_r^257    """58    gaussian_noise_var = b59    poisson_noisy_img = poisson((gt_raw-240)/a).rvs()*a60    gaussian_noise = np.sqrt(gaussian_noise_var)*np.random.randn(gt_raw.shape[0], gt_raw.shape[1])61    noisy_img = poisson_noisy_img + gaussian_noise + 24062    noisy_img = 
np.minimum(np.maximum(noisy_img,0), 2**12-1)63    64    return noisy_img65def generate_name(number):66    name = list('000000_raw.tiff')67    num_str = str(number)68    for i in range(len(num_str)):69        name[5-i] = num_str[-(i+1)]70    name = ''.join(name)71    return name72def reduce_mean(out_im, gt_im):73    return torch.abs(out_im - gt_im).mean()74def reduce_mean_with_weight(im1, im2, noisy_level_data):75    result = torch.abs(im1 - im2) * noisy_level_data * 0.176    return result.mean()77def preprocess(raw):78    input_full = raw.transpose((0, 3, 1, 2))79    input_full = torch.from_numpy(input_full)80    input_full = input_full.cuda()81    return input_full82def postprocess(output):83    output = output.cpu()84    output = output.detach().numpy().astype(np.float32)85    output = np.transpose(output, (0, 2, 3, 1))86    output = np.clip(output,0,1)87    return output88def findLastCheckpoint(save_dir):89    file_list = glob.glob(os.path.join(save_dir, 'model_epoch*.pth'))90    if file_list:91        epochs_exist = []92        for file_ in file_list:93            result = re.findall(".*model_epoch(.*).pth.*", file_)94            epochs_exist.append(int(result[0]))95        initial_epoch = max(epochs_exist)96    else:97        initial_epoch = 098    return initial_epoch99def bayer_preserving_augmentation(raw, aug_mode):100    if aug_mode == 0:  # horizontal flip101        aug_raw = np.flip(raw, axis=1)[:,1:-1]102    elif aug_mode == 1: # vertical flip103        aug_raw = np.flip(raw, axis=0)[1:-1,:]104    else:  # random transpose105        aug_raw = np.transpose(raw, (1, 0))106    return aug_raw107def test_big_size_raw(input_data, denoiser, patch_h = 256, patch_w = 256, patch_h_overlap = 64, patch_w_overlap = 64):108    H = input_data.shape[1]109    W = input_data.shape[2]110    111    test_result = np.zeros((input_data.shape[0],H,W,4))112    t0 = time.clock()113    h_index = 1114    while (patch_h*h_index-patch_h_overlap*(h_index-1)) < H:115        
test_horizontal_result = np.zeros((input_data.shape[0],patch_h,W,4))116        h_begin = patch_h*(h_index-1)-patch_h_overlap*(h_index-1)117        h_end = patch_h*h_index-patch_h_overlap*(h_index-1) 118        w_index = 1119        while (patch_w*w_index-patch_w_overlap*(w_index-1)) < W:120            w_begin = patch_w*(w_index-1)-patch_w_overlap*(w_index-1)121            w_end = patch_w*w_index-patch_w_overlap*(w_index-1)122            test_patch = input_data[:,h_begin:h_end,w_begin:w_end,:]               123            test_patch = preprocess(test_patch)               124            with torch.no_grad():125                output_patch = denoiser(test_patch.reshape(1,3,4,patch_h,patch_w))126            test_patch_result = postprocess(output_patch)127            if w_index == 1:128                test_horizontal_result[:,:,w_begin:w_end,:] = test_patch_result129            else:130                for i in range(patch_w_overlap):131                    test_horizontal_result[:,:,w_begin+i,:] = test_horizontal_result[:,:,w_begin+i,:]*(patch_w_overlap-1-i)/(patch_w_overlap-1)+test_patch_result[:,:,i,:]*i/(patch_w_overlap-1)132                test_horizontal_result[:,:,w_begin+patch_w_overlap:w_end,:] = test_patch_result[:,:,patch_w_overlap:,:]133            w_index += 1                   134    135        test_patch = input_data[:,h_begin:h_end,-patch_w:,:]         136        test_patch = preprocess(test_patch)137        with torch.no_grad():138            output_patch = denoiser(test_patch.reshape(1,3,4,patch_h,patch_w))139        test_patch_result = postprocess(output_patch)       140        last_range = w_end-(W-patch_w)       141        for i in range(last_range):142            test_horizontal_result[:,:,W-patch_w+i,:] = test_horizontal_result[:,:,W-patch_w+i,:]*(last_range-1-i)/(last_range-1)+test_patch_result[:,:,i,:]*i/(last_range-1)143        test_horizontal_result[:,:,w_end:,:] = test_patch_result[:,:,last_range:,:]       144        if h_index == 1:145         
   test_result[:,h_begin:h_end,:,:] = test_horizontal_result146        else:147            for i in range(patch_h_overlap):148                test_result[:,h_begin+i,:,:] = test_result[:,h_begin+i,:,:]*(patch_h_overlap-1-i)/(patch_h_overlap-1)+test_horizontal_result[:,i,:,:]*i/(patch_h_overlap-1)149            test_result[:,h_begin+patch_h_overlap:h_end,:,:] = test_horizontal_result[:,patch_h_overlap:,:,:] 150        h_index += 1151    test_horizontal_result = np.zeros((input_data.shape[0],patch_h,W,4))152    w_index = 1153    while (patch_w*w_index-patch_w_overlap*(w_index-1)) < W:154        w_begin = patch_w*(w_index-1)-patch_w_overlap*(w_index-1)155        w_end = patch_w*w_index-patch_w_overlap*(w_index-1)156        test_patch = input_data[:,-patch_h:,w_begin:w_end,:]               157        test_patch = preprocess(test_patch)               158        with torch.no_grad():159            output_patch = denoiser(test_patch.reshape(1,3,4,patch_h,patch_w))160        test_patch_result = postprocess(output_patch)161        if w_index == 1:162            test_horizontal_result[:,:,w_begin:w_end,:] = test_patch_result163        else:164            for i in range(patch_w_overlap):165                test_horizontal_result[:,:,w_begin+i,:] = test_horizontal_result[:,:,w_begin+i,:]*(patch_w_overlap-1-i)/(patch_w_overlap-1)+test_patch_result[:,:,i,:]*i/(patch_w_overlap-1)166            test_horizontal_result[:,:,w_begin+patch_w_overlap:w_end,:] = test_patch_result[:,:,patch_w_overlap:,:]   167        w_index += 1168    test_patch = input_data[:,-patch_h:,-patch_w:,:]         169    test_patch = preprocess(test_patch)170    with torch.no_grad():171        output_patch = denoiser(test_patch.reshape(1,3,4,patch_h,patch_w))172    test_patch_result = postprocess(output_patch)173    last_range = w_end-(W-patch_w)       174    for i in range(last_range):175        test_horizontal_result[:,:,W-patch_w+i,:] = 
test_horizontal_result[:,:,W-patch_w+i,:]*(last_range-1-i)/(last_range-1)+test_patch_result[:,:,i,:]*i/(last_range-1) 176    test_horizontal_result[:,:,w_end:,:] = test_patch_result[:,:,last_range:,:] 177    last_last_range = h_end-(H-patch_h)178    for i in range(last_last_range):179        test_result[:,H-patch_w+i,:,:] = test_result[:,H-patch_w+i,:,:]*(last_last_range-1-i)/(last_last_range-1)+test_horizontal_result[:,i,:,:]*i/(last_last_range-1)180    test_result[:,h_end:,:,:] = test_horizontal_result[:,last_last_range:,:,:]181   182    t1 = time.clock()183    print('Total running time: %s s' % (str(t1 - t0)))184    return test_result185def pack_gbrg_raw_for_compute_ssim(raw):186    im = raw.astype(np.float32)187    im = np.expand_dims(im, axis=2)188    img_shape = im.shape189    H = img_shape[0]190    W = img_shape[1]191    out = np.concatenate((im[1:H:2, 0:W:2, :],192                          im[1:H:2, 1:W:2, :],193                          im[0:H:2, 1:W:2, :],194                          im[0:H:2, 0:W:2, :]), axis=2)195    return out196def compute_ssim_for_packed_raw(raw1, raw2):197    raw1_pack = pack_gbrg_raw_for_compute_ssim(raw1)198    raw2_pack = pack_gbrg_raw_for_compute_ssim(raw2)199    test_raw_ssim = 0200    for i in range(4):201        test_raw_ssim += compare_ssim(raw1_pack[:,:,i], raw2_pack[:,:,i], data_range=1.0)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
