How to use the save_raw method in yandex-tank

Best Python code snippets using yandex-tank

id15v2.py

Source: id15v2.py (GitHub)


```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Data Analysis plugin tailored for ID15

"""
__authors__ = ["Jérôme Kieffer"]
__contact__ = "Jerome.Kieffer@ESRF.eu"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "03/09/2020"
__status__ = "development"
version = "0.5.0"

import os
import numpy
import logging
import copy
import json
import glob
from dahu import version as dahu_version
from dahu.plugin import Plugin
from dahu.factory import register
from dahu.utils import get_isotime

logger = logging.getLogger("id15v2")

try:
    import fabio
    import pyFAI, pyFAI.io
    from pyFAI import azimuthalIntegrator
    from pyFAI.method_registry import IntegrationMethod
    from silx.opencl.codec.byte_offset import ByteOffset
except ImportError:
    logger.error("Failed to import PyFAI, fabio or silx: download and install it from pypi")


@register
class IntegrateManyFrames(Plugin):
    """This is the basic plugin of PyFAI for azimuthal integration

    Typical JSON file:
    {"poni_file": "/tmp/example.poni",
     "input_files": ["/tmp/file1.cbf", "/tmp/file2.cbf"],
     "monitor_values": [1, 1.1],
     "npt": 2000,
     "npt_azim": None,
     "unit": "2th_deg",
     "output_file": "/path/to/dest.h5",
     "save_raw": "/path/to/hdf5/with/raw.h5",
     "delete_incoming": False,
     "do_SA": True,
     }
    Plus:
    "mask":
    "wavelength"
    "unit"
    "dummy"
    "delta_dummy"
    "do_polarziation"
    "polarization_factor"
    "do_SA"
    "norm"
    "raw_compression": "bitshuffle",
    "integration_method": "integrate1d"
    "sigma_clip_thresold": 3
    "sigma_clip_max_iter": 5
    """

    def __init__(self):
        """
        """
        Plugin.__init__(self)
        self.ai = None  # this is the azimuthal integrator to use
        self.npt = 2000
        self.npt_azim = 256
        self.input_files = []
        self.method = "ocl_nosplit_csr_gpu"
        self.unit = "q_nm^-1"
        self.output_file = None
        self.mask = None
        self.wavelength = None
        self.polarization_factor = None
        self.do_SA = False
        self.dummy = None
        self.delta_dummy = None
        self.norm = 1
        self.error_model = None  # "poisson"
        self.save_raw = None
        self.raw_nxs = None
        self.raw_ds = None
        self.raw_compression = None
        self.integration_method = "integrate1d"
        self.sigma_clip_thresold = 3
        self.sigma_clip_max_iter = 5
        self.medfilt1d_percentile = (10, 90)

    def setup(self, kwargs=None):
        """Perform the setup of the job.
        mainly parsing of the kwargs.

        :param kwargs: dict with parameters.
        :return: None
        """
        logger.debug("IntegrateManyFrames.setup")
        Plugin.setup(self, kwargs)
        self.input_files = self.input.get("input_files")
        if not self.input_files:
            self.log_error("InputError: input_files not in input.")
        if not isinstance(self.input_files, list):
            self.input_files = glob.glob(self.input_files)
            self.input_files.sort()
        if "output_file" not in self.input:
            self.log_error("InputWarning: output_file not in input, save in input directory",
                           do_raise=False)
            self.output_file = os.path.join(os.path.dirname(self.input_files[0]), "output.h5")
        else:
            self.output_file = os.path.abspath(self.input["output_file"])
        if not self.output_file.endswith(".h5"):
            self.output_file = self.output_file + ".h5"
        poni_file = self.input.get("poni_file")
        if not poni_file:
            self.log_error("InputError: poni_file not in input.")
        self.ai = pyFAI.load(poni_file)
#        stored = self._ais.get(poni_file, ai)
#        if stored is ai:
#            self.ai = stored
#        else:
#            self.ai = copy.deepcopy(stored)
        self.npt = int(self.input.get("npt", self.npt))
        self.npt_azim = self.input.get("npt_azim", self.npt_azim)
        self.unit = self.input.get("unit", self.unit)
        self.wavelength = self.input.get("wavelength", self.wavelength)
        if os.path.exists(self.input.get("mask", "")):
            self.mask = fabio.open(self.input["mask"]).data
        self.dummy = self.input.get("dummy", self.dummy)
        self.delta_dummy = self.input.get("delta_dummy", self.delta_dummy)
        if self.input.get("do_polarziation"):
            self.polarization_factor = self.input.get("polarization_factor", self.polarization_factor)
        self.do_SA = self.input.get("do_SA", self.do_SA)
        self.norm = self.input.get("norm", self.norm)
        self.save_raw = self.input.get("save_raw", self.save_raw)
        self.integration_method = self.input.get("integration_method", self.integration_method)
        self.sigma_clip_thresold = self.input.get("sigma_clip_thresold", self.sigma_clip_thresold)
        self.sigma_clip_max_iter = self.input.get("sigma_clip_max_iter", self.sigma_clip_max_iter)
        self.medfilt1d_percentile = self.input.get("medfilt1d_percentile", self.medfilt1d_percentile)
        method = self.input.get("method", self.method)
        if "1" in self.integration_method:
            integration_dim = 1
        else:
            integration_dim = 2
        if isinstance(method, (str, bytes)):
            self.method = IntegrationMethod.select_old_method(integration_dim, method)
        else:
            self.method = IntegrationMethod.select_one_available(method, dim=integration_dim, degradable=True)
        print(self.method)
        self.raw_compression = self.input.get("raw_compression", self.raw_compression)
        if self.save_raw:
            self.prepare_raw_hdf5(self.raw_compression)

    def process(self):
        Plugin.process(self)
        logger.debug("IntegrateManyFrames.process")
        if self.integration_method == "integrate2d":
            res = numpy.zeros((len(self.input_files), self.npt_azim, self.npt), dtype=numpy.float32)  # numpy array for storing data
        else:
            res = numpy.zeros((len(self.input_files), self.npt), dtype=numpy.float32)  # numpy array for storing data
        sigma = None
        if self.error_model or self.integration_method == "sigma_clip":
            if self.integration_method == "integrate2d":
                sigma = numpy.zeros((len(self.input_files), self.npt_azim, self.npt), dtype=numpy.float32)  # numpy array for storing data
            else:
                sigma = numpy.zeros((len(self.input_files), self.npt), dtype=numpy.float32)  # numpy array for storing data
        method = self.ai.__getattribute__(self.integration_method)
        common_param = {"method": self.method,
                        "unit": self.unit,
                        "dummy": self.dummy,
                        "delta_dummy": self.delta_dummy,
                        "mask": self.mask,
                        "polarization_factor": self.polarization_factor,
                        "normalization_factor": self.norm,
                        "correctSolidAngle": self.do_SA}
        if self.integration_method in ("integrate1d", "integrate_radial"):
            common_param["npt"] = self.npt
            common_param["error_model"] = self.error_model
            common_param["safe"] = False
        else:
            common_param["npt_rad"] = self.npt
            common_param["npt_azim"] = self.npt_azim
        if self.integration_method == "sigma_clip":
            common_param["thres"] = self.sigma_clip_thresold
            common_param["max_iter"] = self.sigma_clip_max_iter
        if self.integration_method == "medfilt1d":
            common_param["percentile"] = self.medfilt1d_percentile
        # prepare some tools
        cbf = fabio.open(self.input_files[0])
        bo = ByteOffset(os.path.getsize(self.input_files[0]), cbf.data.size,
                        devicetype="gpu")
        shape = cbf.data.shape
        for idx, fname in enumerate(self.input_files):
            logger.debug("process %s: %s", idx, fname)
            if fname.endswith("cbf"):
                raw = cbf.read(fname, only_raw=True)
                data = bo(raw, as_float=False).get().reshape(shape)
            else:
                data = fabio.open(fname).data
            if data is None:
                self.log_error("Failed reading file: %s" % self.input_files[idx],
                               do_raise=False)
                continue
            if self.save_raw:
                self.raw_ds[idx] = data
            out = method(data, **common_param)
            res[idx] = out.intensity
            if self.error_model or self.integration_method == "sigma_clip":
                sigma[idx] = out.sigma
        self.save_result(out, res, sigma)
        if self.input.get("delete_incoming"):
            for fname in self.input_files:
                try:
                    os.unlink(fname)
                except IOError as err:
                    self.log_warning(err)

    def prepare_raw_hdf5(self, filter_=None):
        """Prepare an HDF5 output file for saving raw data

        :param filter_: name of the compression filter
        """
        kwfilter = {}
        if filter_ == "gzip":
            kwfilter = {"compression": "gzip", "shuffle": True}
        elif filter_ == "lz4":
            kwfilter = {"compression": 32004, "shuffle": True}
        elif filter_ == "bitshuffle":
            kwfilter = {"compression": 32008, "compression_opts": (0, 2)}  # enforce lz4 compression
        first_image = self.input_files[0]
        fimg = fabio.open(first_image)
        shape = fimg.data.shape
        stack_shape = (len(self.input_files),) + shape
        first_frame_timestamp = os.stat(first_image).st_ctime
        try:
            self.raw_nxs = pyFAI.io.Nexus(self.save_raw, "a")
        except IOError as error:
            self.log_warning("invalid HDF5 file %s: remove and re-create!\n%s" % (self.save_raw, error))
            os.unlink(self.save_raw)
            self.raw_nxs = pyFAI.io.Nexus(self.save_raw)
        entry = self.raw_nxs.new_entry("entry",
                                       program_name="dahu",
                                       title="ID15.raw_data",
                                       force_time=first_frame_timestamp,
                                       force_name=True)
        entry["program_name"].attrs["version"] = dahu_version
        entry["plugin_name"] = numpy.string_(".".join((os.path.splitext(os.path.basename(__file__))[0], self.__class__.__name__)))
        entry["plugin_name"].attrs["version"] = version
        coll = self.raw_nxs.new_class(entry, "data", class_type="NXdata")
        try:
            self.raw_ds = coll.require_dataset(name="data", shape=stack_shape,
                                               dtype=fimg.data.dtype,
                                               chunks=(1,) + shape,
                                               **kwfilter)
        except Exception as error:
            logger.error("Error in creating dataset, disabling compression:%s", error)
            self.raw_ds = coll.require_dataset(name="data", shape=stack_shape,
                                               dtype=fimg.data.dtype,
                                               chunks=(1,) + shape)
        return self.raw_ds

    def save_result(self, out, I, sigma=None):
        """Save the result of the work as a HDF5 file

        :param out: scattering result
        :param I: intensities as 2D array
        :param sigma: standard deviation of I as 2D array, if possible
        """
        logger.debug("IntegrateManyFrames.save_result")
        isotime = numpy.string_(get_isotime())
        try:
            nxs = pyFAI.io.Nexus(self.output_file, "a")
        except IOError as error:
            self.log_warning("invalid HDF5 file %s: remove and re-create!\n%s" % (self.output_file, error))
            os.unlink(self.output_file)
            nxs = pyFAI.io.Nexus(self.output_file)
        entry = nxs.new_entry("entry", program_name="dahu", title="ID15.IntegrateManyFrames")
        entry["program_name"].attrs["version"] = dahu_version
        entry["plugin_name"] = numpy.string_(".".join((os.path.splitext(os.path.basename(__file__))[0], self.__class__.__name__)))
        entry["plugin_name"].attrs["version"] = version
        entry["input"] = numpy.string_(json.dumps(self.input))
        entry["input"].attrs["format"] = 'json'
        subentry = nxs.new_class(entry, "PyFAI", class_type="NXprocess")
        subentry["program"] = numpy.string_("PyFAI")
        subentry["version"] = numpy.string_(pyFAI.version)
        subentry["date"] = isotime
        subentry["processing_type"] = numpy.string_(self.integration_method)
        coll = nxs.new_class(subentry, "process_%s" % self.integration_method,
                             class_type="NXdata")
        metadata_grp = coll.require_group("parameters")
        for key, value in self.ai.getPyFAI().items():
            metadata_grp[key] = numpy.string_(value)
        scale, unit = str(out.unit).split("_", 1)
        coll[scale] = out.radial.astype("float32")
        coll[scale].attrs["interpretation"] = "scalar"
        coll[scale].attrs["unit"] = unit
        coll["I"] = I.astype("float32")
        coll["I"].attrs["interpretation"] = "spectrum"
        coll["I"].attrs["signal"] = "1"
        coll.attrs["signal"] = "I"
        coll.attrs["axes"] = [".", scale]
        if sigma is not None:
            coll["errors"] = sigma.astype("float32")
            coll["errors"].attrs["interpretation"] = "spectrum"
        nxs.close()

    def teardown(self):
        Plugin.teardown(self)
        logger.debug("IntegrateManyFrames.teardown")
        # Create some output data
        self.output["output_file"] = self.output_file
        if self.save_raw:
            self.raw_nxs.close()
            self.output["save_raw"] = self.save_raw
            self.raw_nxs = None
```
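
In this plugin, `save_raw` is not a method but a JSON input key: when it holds an HDF5 path, `setup()` calls `prepare_raw_hdf5()` to create a chunked (optionally compressed) stack dataset, `process()` copies each decompressed frame into it, and `teardown()` closes the file and echoes the path in the output. Below is a minimal sketch of driving the plugin by hand, assuming dahu's base `Plugin.setup()` exposes the kwargs as `self.input` (as the code above implies); all paths are placeholders.

```python
# Minimal sketch: exercise IntegrateManyFrames with save_raw enabled.
# Paths are placeholders; the setup/process/teardown calls follow the
# dahu Plugin lifecycle used by this module.
payload = {
    "poni_file": "/tmp/example.poni",
    "input_files": ["/tmp/file1.cbf", "/tmp/file2.cbf"],
    "npt": 2000,
    "unit": "2th_deg",
    "output_file": "/tmp/integrated.h5",
    "save_raw": "/tmp/raw_frames.h5",   # triggers prepare_raw_hdf5()
    "raw_compression": "bitshuffle",    # lz4 via the bitshuffle filter (32008)
}

plugin = IntegrateManyFrames()
plugin.setup(payload)   # parses save_raw and creates the raw HDF5 dataset
plugin.process()        # integrates each frame and stores it in raw_ds
plugin.teardown()       # closes the raw file and reports it in the output
```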


dribble.py

Source: dribble.py (GitHub)


```python
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# This file is part of the 'Dribble' package for percolation simulations.
# Copyright (c) 2013-2018 Alexander Urban (aurban@atomistic.net)
# ----------------------------------------------------------------------
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Mozilla Public License, v. 2.0, for more details.
"""
Dribble - Percolation Simulation on Lattices
Analyze the ionic percolation properties of an input structure.
"""
import argparse
import sys
import time

import numpy as np

from dribble.io import Input
from dribble.percolator import Percolator
from dribble.lattice import Lattice
from dribble.misc import uprint

__author__ = "Alexander Urban"


def check_if_percolating(percolator, inp, save_clusters, tortuosity):
    noccup = percolator.num_occupied
    nspan = percolator.check_spanning(verbose=True,
                                      save_clusters=save_clusters,
                                      static_sites=inp.static_sites)
    if (nspan > 0):
        uprint(" The initial structure is percolating.\n")
        uprint(" Fraction of accessible sites: {}\n".format(
            float(nspan)/float(noccup)))
        if tortuosity:
            for c in percolator.percolating_clusters:
                t_min, t_mean, t_std = percolator.get_tortuosity(c)
                uprint(" Tortuosity of cluster {} (min, mean): ".format(c)
                       + "{:5.3f}, {:5.3f} +/- {:5.3f}".format(
                           t_min, t_mean, t_std))
            uprint("")
    else:
        uprint(" The initial structure is NOT percolating.\n")
        uprint(" Fraction of accessible sites: 0.0\n")


def calc_critical_concentration(percolator, save_clusters, samples,
                                file_name, sequence):
    if save_clusters:
        (pc_site_any, pc_site_two, pc_site_all, pc_bond_any,
         pc_bond_two, pc_bond_all) = percolator.percolation_point(
            sequence, samples=samples, file_name=file_name+".vasp")
    else:
        (pc_site_any, pc_site_two, pc_site_all, pc_bond_any,
         pc_bond_two, pc_bond_all) = percolator.percolation_point(
            sequence, samples=samples)
    uprint(" Critical site (bond) concentrations to find a "
           "wrapping cluster\n")
    uprint(" in one or more dimensions p_c1 = {:.8f} ({:.8f})".format(
        pc_site_any, pc_bond_any))
    uprint(" in two or three dimensions p_c2 = {:.8f} ({:.8f})".format(
        pc_site_two, pc_bond_two))
    uprint(" in all three dimensions p_c3 = {:.8f} ({:.8f})".format(
        pc_site_all, pc_bond_all))
    uprint("")


def calc_p_infinity(percolator, samples, save_raw, file_name, sequence):
    plist = np.arange(0.01, 1.00, 0.01)
    (Q, X) = percolator.calc_p_infinity(
        plist, sequence, samples=samples,
        save_discrete=save_raw)
    # integrate susceptibility X in order to normalize it
    intX = np.sum(X)*(plist[1]-plist[0])
    fname = file_name + ".infty"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s} {:>10s} {:>15s} {:>15s}\n".format(
            "p", "P_infty(p)", "Chi(p)", "normalized"))
        for p in range(len(plist)):
            f.write(" {:10.8f} {:10.8f} {:15.8f} {:15.8f}\n".format(
                plist[p], Q[p], X[p], X[p]/intX))


def calc_p_wrapping(percolator, samples, save_raw, file_name, sequence):
    plist = np.arange(0.01, 1.00, 0.01)
    (Q, Qc) = percolator.calc_p_wrapping(
        plist, sequence, samples=samples,
        save_discrete=save_raw)
    fname = file_name + ".wrap"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s} {:>10s} {:>10s}\n".format(
            "p", "P_wrap(p)", "cumulative"))
        for p in range(len(plist)):
            f.write(" {:10.8f} {:10.8f} {:10.8f}\n".format(
                plist[p], Q[p], Qc[p]))


def calc_inaccessible_sites(percolator, samples, save_raw, file_name,
                            sequence, species):
    plist = np.arange(0.01, 1.00, 0.01)
    (F_inacc, nclus) = percolator.inaccessible_sites(
        plist, sequence, species, samples=samples,
        save_discrete=save_raw)
    fname = file_name + ".inacc"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s} {:>10s} {:>10s}\n".format(
            "p", "F_inacc(p)", "N_percol(p)"))
        for p in range(len(plist)):
            f.write(" {:10.8f} {:10.8f} {:12.6f}\n".format(
                plist[p], F_inacc[p], nclus[p]))


def calc_mean_tortuosity(percolator, samples, file_name, sequence):
    F_tort = percolator.mean_tortuosity(
        sequence, samples=samples)
    fname = file_name + ".tortuosity"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s} {:^10s} {:s}\n".format(
            "N", "p", "Tortuosity(p)"))
        N = len(F_tort)
        for i, T in enumerate(F_tort):
            f.write(" {:10d} {:10.8f} {:10.8f}\n".format(
                i+1, (i+1)/float(N), T))


def compute_percolation(input_file, structure_file, samples,
                        save_clusters, save_raw, file_name, pc, check,
                        pinf, pwrap, inaccessible, tortuosity,
                        mean_tortuosity, supercell):
    if not (check or pc or pinf or pwrap or inaccessible or mean_tortuosity):
        print("\n Nothing to do.")
        print(" Please specify the quantity to be calculated.")
        print(" Use the `--help' flag to list all options.\n")
        sys.exit()

    input_params = {}
    if structure_file is not None:
        uprint("\n Reading structure from file: {}".format(structure_file))
        input_params['structure'] = structure_file

    uprint("\n Parsing input file '{}'...".format(input_file), end="")
    inp = Input.from_file(input_file, **input_params)
    uprint(" done.")

    uprint("\n Setting up lattice and neighbor lists...", end="")
    lattice = Lattice.from_input_object(inp, supercell=supercell)
    uprint(" done.")
    uprint(lattice)

    uprint(" Initializing percolator...", end="")
    percolator = Percolator.from_input_object(inp, lattice, verbose=True)
    uprint(" done.")

    uprint("\n MC percolation simulation\n -------------------------\n")

    if check:  # check, if initial structure is percolating
        check_if_percolating(percolator, inp, save_clusters, tortuosity)
    if pc:  # calculate critical site concentrations
        calc_critical_concentration(percolator, save_clusters, samples,
                                    file_name, inp.flip_sequence)
    if pinf:  # estimate P_infinity(p)
        calc_p_infinity(percolator, samples, save_raw, file_name,
                        inp.flip_sequence)
    if pwrap:  # estimate P_wrapping(p)
        calc_p_wrapping(percolator, samples, save_raw, file_name,
                        inp.flip_sequence)
    if inaccessible is not None:  # fraction of inaccessible sites
        calc_inaccessible_sites(percolator, samples, save_raw,
                                file_name, inp.flip_sequence,
                                inaccessible)
    if mean_tortuosity:  # tortuosity as function of concentration
        calc_mean_tortuosity(percolator, samples, file_name,
                             inp.flip_sequence)

    dt = time.gmtime(time.clock())
    uprint(" All done. Elapsed CPU time: {:02d}h{:02d}m{:02d}s\n".format(
        dt.tm_hour, dt.tm_min, dt.tm_sec))


def main():
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument(
        "input_file",
        help="Input file in JSON format")

    parser.add_argument(
        "structure_file",
        help="Optional structure file in VASP's POSCAR format.",
        default=None,
        nargs="?")

    parser.add_argument(
        "--supercell",
        help="List of multiples of the lattice cell" +
             " in the three lattice directions",
        type=int,
        default=(1, 1, 1),
        nargs=3)

    parser.add_argument(
        "--inaccessible", "-i",
        help="Calculate fraction of inaccessible sites for given "
             "reference species",
        type=str,
        default=None,
        metavar="SPECIES")

    parser.add_argument(
        "--pc", "-p",
        help="Calculate critical site concentrations",
        action="store_true")

    parser.add_argument(
        "--check",
        help="Check, if the initial structure is percolating.",
        action="store_true")

    parser.add_argument(
        "--pinf", "-s",
        help="Estimate P_infinity and percolation susceptibility",
        action="store_true")

    parser.add_argument(
        "--pwrap", "-w",
        help="Estimate P_wrap(p)",
        action="store_true")

    parser.add_argument(
        "--tortuosity", "-t",
        help="Compute tortuosity of the percolating clusters as function "
             "of the concentration. Together with '--check', only compute "
             "tortuosity of the input structure.",
        action="store_true")

    parser.add_argument(
        "--samples",
        help="number of samples to be averaged",
        type=int,
        default=500)

    parser.add_argument(
        "--file-name",
        help="base file name for all output files",
        default="percol")

    parser.add_argument(
        "--save-clusters",
        help="save wrapping clusters to file",
        action="store_true")

    parser.add_argument(
        "--save-raw",
        help="Also store raw data before convolution (where available).",
        action="store_true")

    parser.add_argument(
        "--debug",
        help="run in debugging mode",
        action="store_true")

    args = parser.parse_args()

    if args.debug:
        np.random.seed(seed=1)

    if args.tortuosity and not args.check:
        mean_tortuosity = True
    else:
        mean_tortuosity = False

    compute_percolation(input_file=args.input_file,
                        structure_file=args.structure_file,
                        samples=args.samples,
                        save_clusters=args.save_clusters,
                        save_raw=args.save_raw,
                        file_name=args.file_name,
                        pc=args.pc,
                        check=args.check,
                        pinf=args.pinf,
                        pwrap=args.pwrap,
                        inaccessible=args.inaccessible,
                        tortuosity=args.tortuosity,
                        mean_tortuosity=mean_tortuosity,
                        supercell=args.supercell)


if (__name__ == "__main__"):
    main()
```
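
In Dribble, `save_raw` is simply the boolean behind the `--save-raw` flag: the driver threads it into the percolator helpers as `save_discrete=save_raw`, so the raw, pre-convolution data points are written out alongside the averaged curves wherever the percolator supports it. A minimal sketch of calling the driver directly, with placeholder file names (`input.json`, `POSCAR`):

```python
# Sketch only: file names are placeholders; requires the dribble package.
# Command-line equivalent:  python dribble.py input.json POSCAR --pinf --save-raw
compute_percolation(input_file="input.json",
                    structure_file="POSCAR",
                    samples=500,
                    save_clusters=False,
                    save_raw=True,        # forwarded as save_discrete=True
                    file_name="percol",   # P_infinity results -> percol.infty
                    pc=False,
                    check=False,
                    pinf=True,            # estimate P_infinity(p)
                    pwrap=False,
                    inaccessible=None,
                    tortuosity=False,
                    mean_tortuosity=False,
                    supercell=(1, 1, 1))
```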


get_chargecloud_data.py

Source: get_chargecloud_data.py (GitHub)


```python
import pandas as pd
import typing
import requests
import datetime
import logging
import logging.config
import time
import json
import os

logger = logging.getLogger(__name__)

from settings import LOGGING_CONFIG

BASE_URL_CHARGECLOUD = "https://new-poi.chargecloud.de"
SCRAPING_INTERVAL = 10
CITIES_CC = ['koeln', 'dortmund', 'bonn', 'muenster', 'kiel',
             'chemnitz', 'krefeld', 'leverkusen', 'heidelberg',
             'solingen', 'ingolstadt', 'pforzheim', 'goettingen',
             'erlangen', 'tuebingen', 'bayreuth', 'bocholt', 'dormagen',
             'rastatt', 'hof', 'weinheim', 'bruchsal', 'nettetal', 'ansbach',
             'schwabach', 'ettlingen', 'crailsheim', 'deggendorf', 'forchheim',
             'bretten', 'buehl', 'zirndorf', 'roth', 'calw', 'herzogenaurach',
             'wertheim', 'kitzingen', 'lichtenfels']
DIR_SAVE_API_RESULTS = "../data/scraped_data"
SAVE_RAW = True


def scrape_cp_cities(cities: typing.List[str],
                     dir_save: str,
                     save_raw: bool = False):
    """Scrape charging point information for a given list of cities

    Args:
        cities (typing.List[str]): list of cities to scrape.
        dir_save (str): directory for saving scraped cities.
        save_raw (bool, optional): whether to save API result as raw json (True) or pickle file (False). Defaults to False.
    """
    data_cities = {}
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    for city in cities:
        r = requests.get(BASE_URL_CHARGECLOUD + "/" + city)
        try:
            data = json.loads(r.text)
            data_cities[city] = data
            logger.debug(f"Successfully scraped '{city}' at {now}.")
        except Exception:
            logger.error(f"Error occurred while scraping '{city}' at {now}.")
    if save_raw:
        fname = now + "_cp_data_cities.json"
        path_save = os.path.join(dir_save, fname)
        with open(path_save, 'w', encoding='utf-8') as f:
            json.dump(data_cities, f)
    else:
        fname = now + "_cp_data_cities.pkl"
        path_save = os.path.join(dir_save, fname)
        pd.to_pickle(data_cities, path_save)


def call_chargecloud_api(scraping_interval: typing.Union[int, float] = SCRAPING_INTERVAL,
                         cities: typing.List[str] = CITIES_CC,
                         dir_save_api_results: str = DIR_SAVE_API_RESULTS,
                         save_raw: bool = SAVE_RAW):
    """Scrape a given list of cities from chargecloud API in a given interval.

    Args:
        scraping_interval (typing.Union[int, float], optional): interval in minutes between API lookups. Defaults to SCRAPING_INTERVAL.
        cities (typing.List[str], optional): list of cities to scrape. Defaults to CITIES_CC.
        dir_save_api_results (str, optional): directory for saving scraped cities. Defaults to "../data/scraped_data".
        save_raw (bool, optional): whether to save API result as raw json (True) or pickle file (False). Defaults to SAVE_RAW.
    """
    while True:
        now = datetime.datetime.now().strftime("%Y/%m/%d_%H:%M:%S")
        info_msg = f"Scraping cities: {now}"
        logger.info(info_msg)

        scrape_cp_cities(cities=cities, dir_save=dir_save_api_results, save_raw=save_raw)

        # API call of all cities takes approx. 15 seconds, therefore subtract it from specified interval
        sleep = max(scraping_interval*60 - 15, 0)
        time.sleep(sleep)


if __name__ == '__main__':
    logging.config.dictConfig(LOGGING_CONFIG)
```
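
Here `save_raw` is a plain switch between output formats: `True` dumps the collected API responses as one timestamped raw JSON file, while `False` pickles the same dict via pandas. A one-off call, with the definitions above in scope and assuming the target directory already exists:

```python
# Sketch: scrape two cities once and keep the raw JSON responses.
# Writes <YYYYmmdd_HHMMSS>_cp_data_cities.json into dir_save.
scrape_cp_cities(cities=["koeln", "bonn"],
                 dir_save="../data/scraped_data",
                 save_raw=True)
```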


