How to use slow_map method in hypothesis

Best Python code snippet using hypothesis

files.py

Source:files.py Github

copy

Full Screen

#*****************************************************************************************************
# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: <Contract number, if applicable.>
# Contractor Name: <Space Science Data Center (SSDC), Italian Space Agency (ASI)>
# Contractor Address: <Via del Politecnico snc, 00133 Rome, Italy>
#
# Copyright 2018-2022 by <The Imaging X-ray Polarimetry Explorer (IXPE) team>. All rights reserved.
#
# Use by Non-US Government recipients is allowed by a BSD 3-Clause "Revised" Licensed detailed
# below:
#
# Developed by: <SSDC IXPE Team, INFN IXPE Team>
# <SSDC-ASI, INFN, INAF>
# <www.ssdc.asi.it, home.infn.it, www.inaf.it>
#
# Redistribution and use in source and binary forms, with or without modification, are permitted
# provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
# conditions and the following disclaimer in the documentation and/or other materials provided
# with the distribution.
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to
# endorse or promote products derived from this software without specific prior written
# permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#*****************************************************************************************************
"""FITS file utilities for the IXPE charging correction: helpers to open
FITS files with basic sanity checks, and to read/write charging-map and
charging-parameter calibration extensions.
"""
from __future__ import print_function, division
import os
import sys
import numpy
from astropy.io import fits
from .utils import current_datetime_string
import logging
#from logging_ import logger, abort

# Placeholder written into FITS keywords whose value is not available.
NOT_AVLB = 'N/A'

# Dictionary of correspondence between DET_ID and DETNAM for flight DUs
DETNAMS_DICT = {'DU_FM1' : 'DU4',
                'DU_FM2' : 'DU1',
                'DU_FM3' : 'DU2',
                'DU_FM4' : 'DU3'}


def det_label(detnam=NOT_AVLB, det_id=NOT_AVLB):
    """ Return a label corresponding to the detector identifier. The label
    is 'dx' where x is equal to 1, 2, 3 or 4 for flight models and to a 2-digit
    number for other detectors (e.g. x = 29 for GPD 29).

    Parameters
    ----------
    detnam : string
        Logical name of the instrument (e.g. DU1)
    det_id : string
        Physical name of the instrument (e.g. DU_FM1 or GPD29)

    Returns
    -------
    string
        'd0' when neither identifier is available, otherwise 'd%d' with the
        detector number derived from the identifiers.
    """
    if detnam == NOT_AVLB:
        if det_id == NOT_AVLB:
            # Neither identifier available: fall back to the generic label.
            return 'd0'
        else:
            if det_id.startswith('DU_FM'):
                # Flight model: map the physical name to the logical one
                # (e.g. DU_FM1 -> DU4) and take the trailing digit.
                detnam = DETNAMS_DICT[det_id]
                detnum = int(detnam[2])
            elif det_id.startswith('GPD'):
                # Non-flight detector: 2-digit GPD serial (e.g. GPD29 -> 29).
                detnum = int(det_id[3:5])
            # NOTE(review): if det_id starts with neither 'DU_FM' nor 'GPD',
            # detnum is never assigned and the return below raises NameError —
            # confirm whether such det_id values can occur upstream.
    else:
        # detnam explicitly given (e.g. 'DU1'): use its trailing digit.
        detnum = int(detnam[2])
    return 'd%d' % detnum


def open_fits_file(file_path, **kwargs):
    """ Thin wrapper around astropy.fits.open, with the aim of performing a few
    basic checks on the file path.

    Parameters
    ----------
    file_path : string
        The input file path

    Returns
    -------
    astropy.io.fits.HDUList
        The opened FITS file; extra keyword arguments are forwarded verbatim
        to fits.open().

    Exits the process (via sys.exit) if the path does not exist or does not
    end in '.fits'.
    """
    logger = logging.getLogger('ixpechrgcorr')
    if not os.path.isfile(file_path):
        # abort('Cannot open input file %s' % file_path)
        # The message is emitted on the logger, on stdout and as the
        # SystemExit payload so it is visible regardless of how the tool
        # is being run.
        logger.error('Cannot open input file %s' % file_path)
        sys.stdout.write('Cannot open input file %s' % file_path)
        sys.exit('Cannot open input file %s' % file_path)
    if not file_path.endswith('.fits'):
        # abort('Input file %s does not look like a FITS file' % file_path)
        logger.error('Input file %s does not look like a FITS file' % file_path)
        sys.stdout.write('Input file %s does not look like a FITS file' % file_path)
        sys.exit('Input file %s does not look like a FITS file' % file_path)
    logger.info('Opening input file %s...' % file_path)
    # sys.stdout.write('Opening input file %s...' % file_path)
    return fits.open(file_path, **kwargs)


def fits_openfile(file_path, **kwargs):
    """ Thin wrapper around astropy.fits.open, with the aim of performing a few
    basic checks on the file path.

    Unlike open_fits_file(), this variant performs no existence/extension
    checks and never exits; it just logs and delegates to fits.open().

    Parameters
    ----------
    file_path : string
        The input file path
    """
    logger = logging.getLogger('ixpechrgcorr')
    logger.info('Opening input file %s...' % file_path)
    sys.stdout.write('Opening input file %s...' % file_path)
    return fits.open(file_path, **kwargs)


def read_initial_charging_map(initial_map_file):
    """ Open the initial charging map file and read the values for the slow and
    fast component. We get the number of bins per side from the header.

    Parameters
    ----------
    initial_map_file : string
        the path to the input FITS file storing the charging map

    Returns
    -------
    tuple of two (nside, nside) numpy arrays
        (initial_dg_fast, initial_dg_slow), indexed as [row, column],
        i.e. [BINY, BINX].
    """
    logger = logging.getLogger('ixpechrgcorr')
    charging_map = open_fits_file(initial_map_file)['CHRG_MAP']
    nside = charging_map.header['NUM_BINS']
    # Initialize the initial slow and fast values as 2d arrays filled with zeroes
    initial_dg_fast = numpy.full((nside, nside), 0.)
    initial_dg_slow = numpy.full((nside, nside), 0.)
    # We fill the two arrays without doing any assumption on the order of the
    # values in the FITS files, but using explicitly the index from the BINX
    # and BINY columns. This is slower but safer, as it will work even if we
    # change the ordering in the input file.
    logger.info('Charging map has (%d x %d) bins' % (nside, nside))
    fast = charging_map.data['FAST']
    slow = charging_map.data['SLOW']
    binx = charging_map.data['BINX']
    biny = charging_map.data['BINY']
    for i, (x, y) in enumerate(zip(binx, biny)):
        # Note: indexes of numpy are (row, column), so y goes first
        initial_dg_fast[y, x] = fast[i]
        initial_dg_slow[y, x] = slow[i]
    return initial_dg_fast, initial_dg_slow


def read_charging_parameters(input_file_path):
    """Open a charging parameters calibration file, read the parameters and
    return them.

    Parameters
    ----------
    input_file_path : string
        the path to the input FITS file storing the charging parameters

    Returns
    -------
    tuple
        (KC_FAST, TD_FAST, DM_FAST, KC_SLOW, TD_SLOW, DM_SLOW), read from
        the first row of the CHRG_PAR extension.
    """
    extension = open_fits_file(input_file_path)['CHRG_PAR']
    # The calibration table holds a single row of parameters.
    params = extension.data[0]
    return params['KC_FAST'], params['TD_FAST'], params['DM_FAST'], \
        params['KC_SLOW'], params['TD_SLOW'], params['DM_SLOW']


def create_charging_map_extension(fast_map, slow_map, start_time=NOT_AVLB,
                                  start_date=NOT_AVLB, version=1, **keywords):
    """ Create the CHRG_MAP extension for a charging map file.

    Parameters
    ----------
    fast_map : numpy array
        a map of the fast component of the charging
    slow_map : numpy array
        a map of the slow component of the charging
    start_time : string
        the time of the day to which the map is referred to (fmt="%H:%M:%S")
    start_date : string
        the date to which the map is referred to (fmt="%m/%d/%Y")
    version : int
        version number of the extension
    keywords : dictionary [string] -> keyword
        a dictionary of keywords that will be written in the header of the
        extension

    Returns
    -------
    astropy.io.fits.BinTableHDU
        The CHRG_MAP binary-table extension with BINX/BINY/FAST/SLOW columns.
    """
    # The shape of the fast and of the slow map must match
    # NOTE(review): only axis 0 is compared here; a mismatch on axis 1 would
    # go undetected — confirm the maps are always square.
    if fast_map.shape[0] != slow_map.shape[0]:
        raise RuntimeError('Could not create the CHRG_MAP extension: '\
            'fast and slow map shape mismatch! (%d != %d)' % \
            (fast_map.shape[0], slow_map.shape[0]))
    # After the check it is safe to take the size from either of the maps
    nside = fast_map.shape[0]
    # We use numpy.tile and numpy.repeat to get the right sequence
    # respectively for the rows and the columns
    binx = fits.Column(name='BINX',
                       array=numpy.tile(numpy.arange(nside), nside),
                       format='I')
    biny = fits.Column(name='BINY',
                       array=numpy.repeat(numpy.arange(nside), nside),
                       format='I')
    slow = fits.Column(name='SLOW', array=slow_map.flatten(), format='D')
    fast = fits.Column(name='FAST', array=fast_map.flatten(), format='D')
    charging_hdu = fits.BinTableHDU.from_columns([binx, biny, fast, slow])
    # Additional keywords, specific of the charging extension
    charging_keywords = {
        'EXTNAME' : 'CHRG_MAP',
        'VERSION' : (version, 'Extension version number'),
        'CVSD0001' : (start_date, 'Date when this file should first be used'),
        'CVST0001' : (start_time, 'Time of day when this file should first be used'),
        'NUM_BINS' : (nside, 'Number of bins per side of the map'),
        'COMMENT' : 'This extension provides a map of the detector charging '\
                    'status expressed as a fraction of its maximum value.'
    }
    # Write the keywords into the header of the extension. The update is done
    # in this order so that the extension-specific keywords take precedence
    # over any caller-provided ones with the same name.
    keywords.update(charging_keywords)
    for key, value in keywords.items():
        charging_hdu.header[key] = value
    return charging_hdu


def write_charging_map_to_file(output_file_path, fast_map, slow_map,
                               start_time=NOT_AVLB, start_date=NOT_AVLB,
                               detnam=NOT_AVLB, det_id=NOT_AVLB,
                               version=1, creator=NOT_AVLB):
    """ Write a map of the slow and fast charging to file. For the meaning of
    some of the arguments see the create_charging_map_extension() function

    Parameters
    ----------
    output_file_path : string
        path to the output file
    detnam : string
        detector unit logical name (e.g. DU1)
    det_id : string
        detector unit physical name (e.g. DU_FM2)

    Returns
    -------
    string
        The output file path, for convenience.
    """
    logger = logging.getLogger('ixpechrgcorr')
    # Create a PRIMARY HDU
    primary_hdu = fits.PrimaryHDU()
    # Define the keywords for the PRIMARY extension
    # current_date = current_datetime_string("%m/%d/%Y %H:%M:%S")
    current_date = current_datetime_string("%Y-%m-%dT%H:%M:%S.%f")
    primary_keywords = {
        'CREATOR' : (creator, 'creator app'),
        'ORIGIN' : ('IXPE Italy', 'Source of FITS file'),
        'DATE' : (current_date, 'File creation date')
    }
    # Define the keywords common to the PRIMARY and the charging map extension
    shared_keywords = {
        'TELESCOP' : ('IXPE', 'Telescope (mission) name'),
        'INSTRUME' : ('GPD', 'Instrument name'),
        'DETNAM' : (detnam, 'name of the logical Detector Unit'),
        'DET_ID' : (det_id, 'name of the physical Detector Unit')
    }
    # Write both set of keywords into the header of the PRIMARY extension
    primary_keywords.update(shared_keywords)
    for key, value in primary_keywords.items():
        primary_hdu.header[key] = value
    # Now add the FILENAME keyword for the charging extension only:
    shared_keywords.update(
        {'FILENAME' : (os.path.basename(output_file_path), 'File name')}
    )
    # Create the charging map extension. detnam/det_id travel through the
    # **keywords catch-all of create_charging_map_extension().
    charging_hdu = create_charging_map_extension(fast_map, slow_map,
        start_time=start_time, start_date=start_date, detnam=detnam,
        det_id=det_id, version=version, **shared_keywords)
    # Create the HDUList and write everything to file
    new_hdul = fits.HDUList([primary_hdu, charging_hdu])
    logger.info('Writing charging map to %s...', output_file_path)
    new_hdul.writeto(output_file_path, overwrite=True)
    logger.info('Done.')
    return output_file_path


def create_charging_parameter_file(output_file_path=None,
                                   detnam=NOT_AVLB, det_id=NOT_AVLB,
                                   start_date=None, start_time=None,
                                   version=1, creator=NOT_AVLB,
                                   **charging_parameters):
    """ Create a CHRG_PAR calibration file storing the charging parameters.

    Parameters
    ----------
    output_file_path : string, optional
        path to the output file; if None, a default name of the form
        'ixpe_sample_<du>_<YYYYMMDD>_chrgparams_<NN>.fits' is generated
    detnam : string
        detector unit logical name (e.g. DU1)
    det_id : string
        detector unit physical name (e.g. DU_FM2)
    start_date : string, optional
        validity start date (fmt="%m/%d/%Y"); defaults to today
    start_time : string, optional
        validity start time of day (fmt="%H:%M:%S"); defaults to now
    version : int
        version number of the extension
    charging_parameters : dictionary
        expected keys: k_c_fast, tau_d_fast, delta_max_fast, k_c_slow,
        tau_d_slow, delta_max_slow (missing keys are written as None)
    """
    logger = logging.getLogger('ixpechrgcorr')
    if output_file_path is None:
        du_label = det_label(detnam=detnam, det_id=det_id)
        output_file_path = 'ixpe_sample_%s_%s_chrgparams_%02d.fits' % \
            (du_label, current_datetime_string("%Y%m%d"),
             version)
    # date_ = current_datetime_string("%m/%d/%Y %H:%M:%S")
    date_ = current_datetime_string("%Y-%m-%dT%H:%M:%S.%f")
    if start_date is None:
        start_date = current_datetime_string("%m/%d/%Y")
    if start_time is None:
        start_time = current_datetime_string("%H:%M:%S")
    output_file_name = os.path.basename(output_file_path)
    keywords = {
        'TELESCOP' : ('IXPE', 'Telescope (mission) name'),
        'INSTRUME' : ('GPD', 'Instrument name'),
        'CREATOR' : (creator, 'creator app'),
        'ORIGIN' : ('IXPE Italy', 'Source of FITS file'),
        'DATE' : (date_, 'file creation date'),
        'DETNAM' : (detnam, 'Detector Unit Logical ID (1,2 or 3)'),
        'DET_ID' : (det_id, 'Name of the physical Detector Unit of the instr'),
    }
    primary_hdu = fits.PrimaryHDU()
    for key, value in keywords.items():
        primary_hdu.header[key] = value
    # One single-element column per calibration parameter; the table has
    # exactly one row.
    k_c_fast = fits.Column(name='KC_FAST',
        array=numpy.array([charging_parameters.get('k_c_fast')]), format='D')
    tau_d_fast = fits.Column(name='TD_FAST',
        array=numpy.array([charging_parameters.get('tau_d_fast')]), format='D')
    delta_max_fast = fits.Column(name='DM_FAST',
        array=numpy.array([charging_parameters.get('delta_max_fast')]),
        format='D')
    k_c_slow = fits.Column(name='KC_SLOW',
        array=numpy.array([charging_parameters.get('k_c_slow')]), format='D')
    tau_d_slow = fits.Column(name='TD_SLOW',
        array=numpy.array([charging_parameters.get('tau_d_slow')]), format='D')
    delta_max_slow = fits.Column(name='DM_SLOW',
        array=numpy.array([charging_parameters.get('delta_max_slow')]),
        format='D')
    chrg_params_hdu = fits.BinTableHDU.from_columns([k_c_fast, tau_d_fast,
        delta_max_fast, k_c_slow, tau_d_slow, delta_max_slow])
    chrg_params_keywords = {
        'EXTNAME' : 'CHRG_PAR',
        'FILENAME' : (output_file_name, 'File name'),
        'VERSION' : (version, 'Extension version number'),
        'CONTENT' : ('IXPE Charging Parameters Files', 'File content'),
        'CCLS0001' : ('BCF', 'Dataset is a Basic Calibration File'),
        'CDTP0001' : ('DATA', 'Calibration file contains data'),
        'CCNM0001' : ('CHRG_PAR', 'Type of calibration data'),
        'CVSD0001' : (start_date, 'Date when this file should first be used'),
        'CVST0001' : (start_time, 'Time of day when this file should first be used'),
        'CDES0001' : ('IXPE Charging parameters', 'Description'),
        'COMMENT' : 'This extension provides the value of the parameters used '\
                    'in charging correction.'
    }
    keywords.update(chrg_params_keywords)
    for key, value in keywords.items():
        chrg_params_hdu.header[key] = value
    new_hdul = fits.HDUList([primary_hdu, chrg_params_hdu])
    logger.info('Writing charging parameters file %s...' % output_file_path)
    new_hdul.writeto(output_file_path, overwrite=True)
    # NOTE(review): the source is truncated here; the original likely
    # returns output_file_path as write_charging_map_to_file() does — confirm.

Full Screen

Full Screen

cloudfront.py

Source:cloudfront.py Github

copy

Full Screen

"""Helpers for publishing static media to AWS CloudFront (Python 2, boto 2),
with a SlowMap that delays exposing the CloudFront URL until the distribution
has had time to come online.
"""
import re
import os
import boto
from django.conf import settings

# Lazily-created, module-level singletons for the CloudFront connection and
# distribution (reused across calls to _upload_to_cloudfront).
_cf_connection = None
_cf_distribution = None

def _upload_to_cloudfront(filepath):
    """Upload *filepath* to the CloudFront distribution and return its URL.

    Expects basenames of the form <name>.<timestamp>.<ext>. If an object with
    the same name and timestamp already exists, its URL is returned without
    re-uploading; an object with an older timestamp is deleted first.
    """
    global _cf_connection
    global _cf_distribution

    if _cf_connection is None:
        _cf_connection = boto.connect_cloudfront(settings.AWS_ACCESS_KEY,
                                                 settings.AWS_ACCESS_SECRET)

    if _cf_distribution is None:
        _cf_distribution = _cf_connection.create_distribution(
            origin='%s.s3.amazonaws.com' % settings.AWS_STORAGE_BUCKET_NAME,
            enabled=True,
            comment=settings.AWS_CLOUDFRONT_DISTRIBUTION_COMMENT)

    # now we can delete any old versions of the same file that have the
    # same name but a different timestamp
    basename = os.path.basename(filepath)
    # Matches '<name>.<digits>.<ext>' and captures the timestamp digits.
    object_regex = re.compile('%s\.(\d+)\.%s' % \
        (re.escape('.'.join(basename.split('.')[:-2])),
         re.escape(basename.split('.')[-1])))
    for obj in _cf_distribution.get_objects():
        match = object_regex.findall(obj.name)
        if match:
            old_timestamp = int(match[0])
            new_timestamp = int(object_regex.findall(basename)[0])
            if new_timestamp == old_timestamp:
                # an exact copy already exists
                return obj.url()
            elif new_timestamp > old_timestamp:
                # we've come across the same file but with an older timestamp
                #print "DELETE!", obj_.name
                obj.delete()
                break

    # Still here? That means that the file wasn't already in the distribution

    fp = open(filepath)

    # Because the name will always contain a timestamp we set faaar future
    # caching headers. Doesn't matter exactly as long as it's really far future.
    headers = {'Cache-Control':'max-age=315360000, public',
               'Expires': 'Thu, 31 Dec 2037 23:55:55 GMT',
               }

    #print "\t\t\tAWS upload(%s)" % basename
    obj = _cf_distribution.add_object(basename, fp, headers=headers)
    return obj.url()

from time import time

class SlowMap(object):
    """Mapping whose entries only become visible after a fixed delay.

    >>> slow_map = SlowMap(60)
    >>> slow_map[key] = value
    >>> print slow_map.get(key)
    None
    Then 60 seconds goes past:
    >>> slow_map.get(key)
    value
    """
    def __init__(self, timeout_seconds):
        # Seconds a newly set entry is held back before get() returns it.
        self.timeout = timeout_seconds
        # Pending entries: key -> (value, release_timestamp).
        self.guard = dict()
        # Released entries, served immediately.
        self.data = dict()

    def get(self, key, default=None):
        """Return the value for *key* once its hold-back period has elapsed,
        else *default*. A value that comes due is promoted from guard to data.
        """
        value = self.data.get(key)
        if value is not None:
            return value

        if key not in self.guard:
            return default

        value, expires = self.guard.get(key)

        if expires < time():
            # good to release
            self.data[key] = value
            del self.guard[key]
            return value
        else:
            # held back
            return default

    def __setitem__(self, key, value):
        # New values always start in the guard with a fresh release deadline.
        self.guard[key] = (value, time() + self.timeout)

# The estimated time it takes AWS CloudFront to create the domain name is
# 1 hour.
DISTRIBUTION_WAIT_TIME = 60 * 60
_conversion_map = SlowMap(DISTRIBUTION_WAIT_TIME)

def file_proxy(uri, new=False, filepath=None, changed=False, **kwargs):
    """Return the CloudFront URL for *uri* once available, else *uri* itself.

    New/changed image files (jpg/gif/png) are uploaded to CloudFront; the
    resulting URL is held in the SlowMap until the distribution is ready.
    """
    if filepath and (new or changed):
        if filepath.lower().split('.')[-1] in ('jpg','gif','png'):
            #print "UPLOAD TO CLOUDFRONT", filepath
            _conversion_map[uri] = _upload_to_cloudfront(filepath)
    return _conversion_map.get(uri, uri)

if __name__ == '__main__':
    import sys
    try:
        filepath = sys.argv[1]
        assert os.path.isfile(filepath)
    except (AssertionError, IndexError):
        print "python %s /path/to/a/file" % __file__
        sys.exit(1)

    from django.core.management import setup_environ
    import cloudfront_static_settings
    setup_environ(cloudfront_static_settings)
    # NOTE(review): the source is truncated here; the original presumably
    # goes on to call _upload_to_cloudfront(filepath) — confirm.

Full Screen

Full Screen

test_seed_printing.py

Source:test_seed_printing.py Github

copy

Full Screen

...21):22 monkeypatch.setattr(core, "running_under_pytest", in_pytest)23 strategy = st.integers()24 if fail_healthcheck:25 def slow_map(i):26 time.sleep(10)27 return i28 strategy = strategy.map(slow_map)29 expected_exc = FailedHealthCheck30 else:31 expected_exc = AssertionError32 @settings(database=None, verbosity=verbosity)33 @given(strategy)34 def test(i):35 assert fail_healthcheck36 with capture_out() as o:37 with pytest.raises(expected_exc):38 test()39 output = o.getvalue()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run hypothesis automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation testing minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful