Best Python code snippet using pyresttest_python
test_median.py
Source:test_median.py  
1import test_median2import os3import data_select4import gevfit5from datetime import datetime, timedelta6import numpy as np7from map_parameters import polar_stereographic8import matplotlib.pyplot as plt9from shape.basin_boundaries import plot_basin_boundaries_from_shape10import application_properties11application_properties.set_current_directory()12class Test_median():13    def __init__(self, data_path = ''):14        data, times, i_indices, j_indices = data_select.get_data_from_file(data_path)15        self._id, rest = os.path.basename(data_path).split('_', 1)16        self._data = data17        self._times = times18        self._i_indices = i_indices19        self._j_indices = j_indices20        self.data_extremes = None21        self.return_period_years = 222        self.high_flow = True #low_flows are calculated if False        23        self.start_date = None24        self.end_date = None25        self.start_month = 126        self.end_month = 1227        self.event_duration = timedelta(days = 1)28        self.median_field = None29        self.ret_level_2yr = None30        self.longitudes = polar_stereographic.lons[i_indices, j_indices]31        self.latitudes = polar_stereographic.lats[i_indices, j_indices]32    def get_indices_in_2d_grid(self):33        return self._i_indices, self._j_indices34    def select_data_and_calculate(self, save_txt_file = False):35        '''36        Calculate 2 yr return levels and the corresponding median37        '''38        if self.high_flow:39            self.data_extremes = data_select.get_list_of_annual_maximums_for_domain(self._data, self._times,40                                        start_date = self.start_date, end_date = self.end_date,41                                        start_month = self.start_month, end_month = self.end_month,42                                        event_duration = self.event_duration)43        else:44            self.data_extremes = data_select.get_list_of_annual_minimums_for_domain(self._data, self._times,45                                        start_date = self.start_date, end_date = self.end_date,46                                        start_month = self.start_month, end_month = self.end_month,47                                        event_duration = self.event_duration)48        the_type = 'high' if self.high_flow else 'low'49        if save_txt_file:50            save_extremes_to_txt_file('{0}_{1}_values.txt'.format(self._id, the_type),51                                      self.data_extremes, self._i_indices, self._j_indices)52        53        self._calculate_median_field()54        self._calculate_return_level_field()55        pass56    def _calculate_median_field(self):57        assert self.start_date != None and self.end_date != None, 'start_date and end_date fields should be set.'58        self.median_field = []59        #cycle through the points of the domain60        for pos in range(len(self._i_indices)):61            values = self.data_extremes[pos]62            self.median_field.append(np.median(values))63        self.median_field = np.array(self.median_field)64        pass65    def _calculate_return_level_field(self, save_to_txt_file = False):66        if self.high_flow:67            field = gevfit.get_high_levels_for_id(self._id, return_period = self.return_period_years)68        else:69            field = gevfit.get_low_levels_for_id(self._id, return_period = self.return_period_years)70        the_type = 'high' if self.high_flow else 'low'71        if save_to_txt_file:72            save_ret_levels_to_txt('{0}_{1}yr_{2}_ret_level.txt'.format(self._id,73                                        self.return_period_years, the_type),74                                        field, self._i_indices, self._j_indices)75            save_pars_to_txt_file('{0}_{1}_params.txt'.format(self._id, the_type),76                                gevfit.get_gevd_params_for_id_and_type(self._id, self.high_flow),77                                self._i_indices, self._j_indices)78        self.ret_level_2yr = []79        for k in range(len(self._i_indices)):80            self.ret_level_2yr.append(field[k])81        self.ret_level_2yr = np.array(self.ret_level_2yr)82    def plot(self):83        basemap = polar_stereographic.basemap84        plt.subplot(3,1,1)85        gevfit.plot_data(self.ret_level_2yr, imagefile = None,86                            units = 'm**3/s', minmax = (0, None),87                            i_list = self._i_indices, j_list = self._j_indices)88        plt.title('2 year return levels')89        plt.subplot(3,1,2)90        gevfit.plot_data(self.median_field, imagefile = None,91                            units = 'm**3/s', minmax = (0, None),92                            i_list = self._i_indices, j_list = self._j_indices)93        plt.title('median')94        95        96        plt.subplot(3,1,3)97        gevfit.plot_data(np.array(self.median_field) - np.array(self.ret_level_2yr), imagefile = None,98                            units = 'm**3/s', minmax = (None, None),99                            i_list = self._i_indices, j_list = self._j_indices)100        plt.title('difference: median - 2 year return level')101        plot_basin_boundaries_from_shape(basemap, plotter = plt, linewidth = 0.1)102        basemap.drawcoastlines()103        plt.savefig('median_test.png')104        pass105def save_pars_to_txt_file(filename, pars_dict, i_indices, j_indices):106    f = open(filename, 'w')107    f.write('{0}\t{1}\t{2}\t{3}\t{4}\n'.format('i', 'j', 'sigma', 'mu', 'ksi'))108    for pos in range(len(i_indices)):109        f.write('{0}\t{1}\t{2}\t{3}\t{4}\n'.format(i_indices[pos], j_indices[pos],110                            pars_dict[pos][0], pars_dict[pos][1], pars_dict[pos][2]))111    f.close()112    pass113def save_extremes_to_txt_file(filename, data, i_indices, j_indices):114    f = open(filename, 'w')115    for i, j, the_value_list in zip(i_indices, j_indices, data):116        f.write('i = {0},\t j = {1}\n'.format(i, j))117        for value in the_value_list:118            f.write('{0}\n'.format(value))119    f.close()120    pass121def save_ret_levels_to_txt(filename, data, i_indices, j_indices):122    f = open(filename, 'w')123    for i, j, the_value in zip(i_indices, j_indices, data):124        f.write('i = {0},\t j = {1}, \t {2} \n'.format(i, j, the_value))125    f.close()126def main():127    paths = [128        'data/streamflows/hydrosheds_euler9/aet_discharge_1970_01_01_00_00.nc',129        'data/streamflows/hydrosheds_euler9/aeu_discharge_2041_01_01_00_00.nc'130    ]131    path_to_start_date = {paths[0]:datetime(1970,1,1), paths[1]:datetime(2041,1,1)}132    start_to_end = {datetime(1970,1,1): datetime(1999,12, 31), datetime(2041,1,1) : datetime(2070,12, 31)}133    the_types = ['low', 'high']134    type_to_startmonth = {'low':3,'high':4}135    type_to_end_month = {'low':4,'high':6}136    type_to_duration = {'low':timedelta(days = 15),'high':timedelta(days = 1)}137    #low_return_periods = [2, 5]138    high_return_periods = [10]139    the_type = 'high'140    for path in paths:141        for the_period in high_return_periods:142            test_median = Test_median(data_path = path)143            start_date = path_to_start_date[path]144            test_median.start_date = start_date145            test_median.end_date = start_to_end[start_date]146            test_median.start_month = type_to_startmonth[the_type]147            test_median.end_month = type_to_end_month[the_type]148            test_median.return_period_years = the_period149            test_median.high_flow = True150            test_median.event_duration = type_to_duration[the_type]151            print('selecting data and calculating')152            print('path={0}, the_type = {1}, the_period = {2}'.format(path, the_type, the_period))153            test_median.select_data_and_calculate()154            #plot155            test_median.plot()156            return157    the_type = 'low'158    for path in paths:159        for the_period in low_return_periods:160            test_median = Test_median(data_path = path)161            start_date = path_to_start_date[path]162            test_median.start_date = start_date163            test_median.end_date = start_to_end[start_date]164            test_median.start_month = type_to_startmonth[the_type]165            test_median.end_month = type_to_end_month[the_type]166            test_median.return_period_years = the_period167            test_median.high_flow = False168            test_median.event_duration = type_to_duration[the_type]169            print('selecting data and calculating')170            print('path={0}, the_type = {1}, the_period = {2}'.format(path, the_type, the_period))171            test_median.select_data_and_calculate()172#    data_path = 'data/streamflows/hydrosheds_euler2_1/aet_discharge_1970_01_01_00_00.nc'173#    test_median = Test_median(data_path = data_path)174#    test_median.start_date = datetime(1970,1,1)175#    test_median.end_date = datetime(1999,12, 31)176#    test_median.start_month = 4177#    test_median.end_month = 6178#    test_median.return_period_years = 2179#    test_median.high_flow = True180#    test_median.select_data_and_calculate()181#    test_median.plot()182#    data_path = 'data/streamflows/hydrosheds_euler2/aeu_discharge_2041_01_01_00_00.nc'183#    test_median = Test_median(data_path = data_path)184#    test_median.start_date = datetime(2041,1,1)185#    test_median.end_date = datetime(2070,12, 31)186#    test_median.start_month = 4187#    test_median.end_month = 6188#    test_median.return_period_years = 30189#    test_median.select_data_and_calculate()190#    test_median.plot()191#192if __name__ == '__main__':193    main()...geo_median.py
Source:geo_median.py  
1# Code largely taken from: https://gist.github.com/endolith/28371602#  with some help from https://github.com/ahwolf/meetup_location/blob/master/code/geo_median.py3#  and adapted to support great circle distances over Euclidean.4import csv5from geopy.distance import vincenty6from geopy.distance import great_circle7import numpy8VGI_REPOSITORY = 't51m'9LIMIT_MAD = 30  # acceptable km limit to median absolute deviation of points10LIMIT_POINTS = 5  # acceptable minimum number of GPS points for a user11DISTANCE_THRESHOLD = 1  # distance (meters) between iterations that determines end of search12DATA_POINTS_FILE = 'geo_median/{0}/user_points.csv'.format(VGI_REPOSITORY)13OUTPUT_MEDIANS = 'geo_median/{0}/user_medians.csv'.format(VGI_REPOSITORY)14SNAP_TO_USER_POINTS = False15OUTPUT_ALL_USERS = True16def cand_median(dataPoints):17    """Calculate the first candidate median as the geometric mean."""18    tempLat = 0.019    tempLon = 0.020    for i in range(0, len(dataPoints)):21        tempLat += dataPoints[i][0]22        tempLon += dataPoints[i][1]23    return (tempLat / len(dataPoints), tempLon / len(dataPoints))24def check_median_absolute_deviation(data_points, median):25    """Calculate Median Absolute Deviation of a set of points."""26    distances = []27    for i in range(0, len(data_points)):28        try:29            distances.append(vincenty(median, data_points[i]).kilometers)30        except ValueError:31            # Vincenty doesn't always converge so fall back on great circle distance which is less accurate but always converges32            distances.append(great_circle(median, data_points[i]).kilometers)33    return(numpy.median(distances))34def compute_user_median(data_points, num_iter, csvwriter, current_uid):35    if len(data_points) < LIMIT_POINTS:  # Insufficient points for the user - don't record median36        if OUTPUT_ALL_USERS:37            csvwriter.writerow([current_uid, None])38    else:39        if SNAP_TO_USER_POINTS: # ensure median is one of the user's points40            lowest_dev = float("inf")41            for point in data_points:42                tmp_abs_dev = objfunc(point, data_points)43                if tmp_abs_dev < lowest_dev:44                    lowest_dev = tmp_abs_dev45                    test_median = point46        else:47            test_median = cand_median(data_points)  # Calculate centroid more or less as starting point48            if objfunc(test_median, data_points) != 0:  # points aren't all the same49                # iterate to find reasonable estimate of median50                for x in range(0, num_iter):51                    denom = denomsum(test_median, data_points)52                    next_lat = 0.053                    next_lon = 0.054                    for y in range(0, len(data_points)):55                        next_lat += (data_points[y][0] * numersum(test_median, data_points[y])) / denom56                        next_lon += (data_points[y][1] * numersum(test_median, data_points[y])) / denom57                    prev_median = test_median58                    test_median = (next_lat, next_lon)59                    try:60                        if vincenty(prev_median, test_median).meters < DISTANCE_THRESHOLD:61                            break62                    except:63                        if great_circle(prev_median, test_median).meters < DISTANCE_THRESHOLD:64                            break65                if x == num_iter - 1:66                    print('{0}: failed to converge. Last change between iterations was {1} meters.'.format(current_uid, great_circle(prev_median, test_median).meters))67        # Check if user points are under the limit median absolute deviation68        if check_median_absolute_deviation(data_points, test_median) <= LIMIT_MAD:69            csvwriter.writerow([current_uid, (round(test_median[0],6), round(test_median[1],6))])70        else:71            if OUTPUT_ALL_USERS:72                csvwriter.writerow([current_uid, None])73def denomsum(test_median, data_points):74    """Provides the denominator of the weiszfeld algorithm."""75    temp = 0.076    for i in range(0, len(data_points)):77        try:78            temp += 1 / vincenty(test_median, data_points[i]).kilometers79        except ZeroDivisionError:80            continue  # filter points that equal the median out (otherwise no convergence)81        except ValueError:82            # Vincenty doesn't always converge so fall back on great circle distance which is less accurate but always converges83            temp += 1 / great_circle(test_median, data_points[i]).kilometers84    return temp85def numersum(test_median, data_point):86    """Provides the denominator of the weiszfeld algorithm depending on whether you are adjusting the candidate x or y."""87    try:88        return 1 / vincenty(test_median, data_point).kilometers89    except ZeroDivisionError:90        return 0  # filter points that equal the median out (otherwise no convergence)91    except ValueError:92        # Vincenty doesn't always converge so fall back on great circle distance which is less accurate but always converges93        return 1 / great_circle(test_median, data_point).kilometers94def objfunc(test_median, data_points):95    """This function calculates the sum of linear distances from the current candidate median to all points96    in the data set, as such it is the objective function that we are minimising.97    """98    temp = 0.099    for i in range(0, len(data_points)):100        try:101            temp += vincenty(test_median, data_points[i]).kilometers102        except ValueError:103            # Vincenty doesn't always converge so fall back on great circle distance which is less accurate but always converges104            temp += great_circle(test_median, data_points[i]).kilometers105    return temp106def main(iterations=1000):107    count = 0108    with open(DATA_POINTS_FILE, 'r') as fin:109        csvreader = csv.reader(fin)110        assert next(csvreader) == ['uid','lat','lon']111        with open(OUTPUT_MEDIANS, 'w') as fout:112            csvwriter = csv.writer(fout)113            csvwriter.writerow(['uid','median'])114            line = next(csvreader)115            data_points = [(float(line[1]), float(line[2]))]116            current_uid = line[0]117            for line in csvreader:118                if line[0] == current_uid:119                    data_points.append((float(line[1]), float(line[2])))120                else:121                    count += 1122                    if count % 2500 == 0:123                        print("Processed {0} users.".format(count))124                    compute_user_median(data_points, iterations, csvwriter, current_uid)125                    # set user and restart array for new current user126                    current_uid = line[0]127                    data_points = [(float(line[1]), float(line[2]))]128            # compute final user median129            compute_user_median(data_points, iterations, csvwriter, current_uid)130if __name__ == "__main__":...test_haralick.py
Source:test_haralick.py  
1import unittest2import os.path3import numpy as np4from eolearn.core import EOPatch, FeatureType5from eolearn.features import HaralickTask6class TestHaralick(unittest.TestCase):7    TEST_PATCH_FILENAME = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'TestInputs', 'TestPatch')8    @classmethod9    def setUpClass(cls):10        cls.patch = EOPatch.load(cls.TEST_PATCH_FILENAME)11        cls._prepare_patch(cls.patch)12        HaralickTask((FeatureType.DATA, 'ndvi', 'haralick_contrast'), texture_feature='contrast', distance=1, angle=0,13                     levels=255, window_size=3, stride=1).execute(cls.patch)14        HaralickTask((FeatureType.DATA, 'ndvi', 'haralick_sum_of_square_variance'),15                     texture_feature='sum_of_square_variance', distance=1, angle=np.pi/2,16                     levels=8, window_size=5, stride=1).execute(cls.patch)17        HaralickTask((FeatureType.DATA, 'ndvi', 'haralick_sum_entropy'),18                     texture_feature='sum_entropy', distance=1, angle=-np.pi/2,19                     levels=8, window_size=7, stride=1).execute(cls.patch)20        cls.initial_patch = EOPatch.load(cls.TEST_PATCH_FILENAME)21        cls._prepare_patch(cls.initial_patch)22    @staticmethod23    def _prepare_patch(patch):24        ndvi = patch.data['ndvi'][:10]25        ndvi[np.isnan(ndvi)] = 026        patch.data['ndvi'] = ndvi27    def test_new_feature(self):28        haralick = self.patch.data['haralick_contrast']29        delta = 1e-430        test_min = np.min(haralick)31        exp_min = 0.032        self.assertAlmostEqual(test_min, exp_min, delta=delta, msg="Expected min {}, got {}".format(exp_min, test_min))33        test_max = np.max(haralick)34        exp_max = 15620.8333335        self.assertAlmostEqual(test_max, exp_max, delta=delta, msg="Expected max {}, got {}".format(exp_max, test_max))36        test_mean = np.mean(haralick)37        exp_mean = 1585.090538        self.assertAlmostEqual(test_mean, exp_mean, delta=delta,39                               msg="Expected mean {}, got {}".format(exp_mean, test_mean))40        test_median = np.median(haralick)41        exp_median = 1004.91666642        self.assertAlmostEqual(test_median, exp_median, delta=delta,43                               msg="Expected median {}, got {}".format(exp_median, test_median))44        haralick = self.patch.data['haralick_sum_of_square_variance']45        test_min = np.min(haralick)46        exp_min = 7.717447        self.assertAlmostEqual(test_min, exp_min, delta=delta, msg="Expected min {}, got {}".format(exp_min, test_min))48        test_max = np.max(haralick)49        exp_max = 48.781450        self.assertAlmostEqual(test_max, exp_max, delta=delta, msg="Expected max {}, got {}".format(exp_max, test_max))51        test_mean = np.mean(haralick)52        exp_mean = 31.949053        self.assertAlmostEqual(test_mean, exp_mean, delta=delta,54                               msg="Expected mean {}, got {}".format(exp_mean, test_mean))55        test_median = np.median(haralick)56        exp_median = 25.035757        self.assertAlmostEqual(test_median, exp_median, delta=delta,58                               msg="Expected median {}, got {}".format(exp_median, test_median))59        haralick = self.patch.data['haralick_sum_entropy']60        test_min = np.min(haralick)61        exp_min = 062        self.assertAlmostEqual(test_min, exp_min, delta=delta, msg="Expected min {}, got {}".format(exp_min, test_min))63        test_max = np.max(haralick)64        exp_max = 1.297165        self.assertAlmostEqual(test_max, exp_max, delta=delta, msg="Expected max {}, got {}".format(exp_max, test_max))66        test_mean = np.mean(haralick)67        exp_mean = 0.389868        self.assertAlmostEqual(test_mean, exp_mean, delta=delta,69                               msg="Expected mean {}, got {}".format(exp_mean, test_mean))70        test_median = np.median(haralick)71        exp_median = 0.401972        self.assertAlmostEqual(test_median, exp_median, delta=delta,73                               msg="Expected median {}, got {}".format(exp_median, test_median))74    def test_unchanged_features(self):75        for feature, value in self.initial_patch.data.items():76            self.assertTrue(np.array_equal(value, self.patch.data[feature]),77                            msg="EOPatch data feature '{}' was changed in the process".format(feature))78if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
