Best Python code snippet using yandex-tank
id15v2.py
Source: id15v2.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Data Analysis plugin tailored for ID15

"""
__authors__ = ["Jérôme Kieffer"]
__contact__ = "Jerome.Kieffer@ESRF.eu"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "03/09/2020"
__status__ = "development"
version = "0.5.0"

import os
import numpy
import logging
import copy
import json
import glob
from dahu import version as dahu_version
from dahu.plugin import Plugin
from dahu.factory import register
from dahu.utils import get_isotime

logger = logging.getLogger("id15v2")

try:
    import fabio
    import pyFAI, pyFAI.io
    from pyFAI import azimuthalIntegrator
    from pyFAI.method_registry import IntegrationMethod
    from silx.opencl.codec.byte_offset import ByteOffset
except ImportError:
    logger.error("Failed to import PyFAI, fabio or silx: download and install them from pypi")


@register
class IntegrateManyFrames(Plugin):
    """This is the basic plugin of PyFAI for azimuthal integration

    Typical JSON file:
    {"poni_file": "/tmp/example.poni",
     "input_files": ["/tmp/file1.cbf", "/tmp/file2.cbf"],
     "monitor_values": [1, 1.1],
     "npt": 2000,
     "npt_azim": None,
     "unit": "2th_deg",
     "output_file": "/path/to/dest.h5",
     "save_raw": "/path/to/hdf5/with/raw.h5",
     "delete_incoming": False,
     "do_SA": True,
     }
    Plus:
    "mask":
    "wavelength"
    "unit"
    "dummy"
    "delta_dummy"
    "do_polarziation"
    "polarization_factor"
    "do_SA"
    "norm"
    "raw_compression": "bitshuffle",
    "integration_method": "integrate1d"
    "sigma_clip_thresold": 3
    "sigma_clip_max_iter": 5
    """

    def __init__(self):
        Plugin.__init__(self)
        self.ai = None  # this is the azimuthal integrator to use
        self.npt = 2000
        self.npt_azim = 256
        self.input_files = []
        self.method = "ocl_nosplit_csr_gpu"
        self.unit = "q_nm^-1"
        self.output_file = None
        self.mask = None
        self.wavelength = None
        self.polarization_factor = None
        self.do_SA = False
        self.dummy = None
        self.delta_dummy = None
        self.norm = 1
        self.error_model = None  # "poisson"
        self.save_raw = None
        self.raw_nxs = None
        self.raw_ds = None
        self.raw_compression = None
        self.integration_method = "integrate1d"
        self.sigma_clip_thresold = 3
        self.sigma_clip_max_iter = 5
        self.medfilt1d_percentile = (10, 90)

    def setup(self, kwargs=None):
        """Perform the setup of the job: mainly parsing of the kwargs.

        :param kwargs: dict with parameters.
        :return: None
        """
        logger.debug("IntegrateManyFrames.setup")
        Plugin.setup(self, kwargs)
        self.input_files = self.input.get("input_files")
        if not self.input_files:
            self.log_error("InputError: input_files not in input.")
        if not isinstance(self.input_files, list):
            self.input_files = glob.glob(self.input_files)
            self.input_files.sort()
        if "output_file" not in self.input:
            self.log_error("InputWarning: output_file not in input, save in input directory",
                           do_raise=False)
            self.output_file = os.path.join(os.path.dirname(self.input_files[0]), "output.h5")
        else:
            self.output_file = os.path.abspath(self.input["output_file"])
        if not self.output_file.endswith(".h5"):
            self.output_file = self.output_file + ".h5"
        poni_file = self.input.get("poni_file")
        if not poni_file:
            self.log_error("InputError: poni_file not in input.")
        self.ai = pyFAI.load(poni_file)
#         stored = self._ais.get(poni_file, ai)
#         if stored is ai:
#             self.ai = stored
#         else:
#             self.ai = copy.deepcopy(stored)
        self.npt = int(self.input.get("npt", self.npt))
        self.npt_azim = self.input.get("npt_azim", self.npt_azim)
        self.unit = self.input.get("unit", self.unit)
        self.wavelength = self.input.get("wavelength", self.wavelength)
        if os.path.exists(self.input.get("mask", "")):
            self.mask = fabio.open(self.input["mask"]).data
        self.dummy = self.input.get("dummy", self.dummy)
        self.delta_dummy = self.input.get("delta_dummy", self.delta_dummy)
        if self.input.get("do_polarziation"):
            self.polarization_factor = self.input.get("polarization_factor", self.polarization_factor)
        self.do_SA = self.input.get("do_SA", self.do_SA)
        self.norm = self.input.get("norm", self.norm)
        self.save_raw = self.input.get("save_raw", self.save_raw)
        self.integration_method = self.input.get("integration_method", self.integration_method)
        self.sigma_clip_thresold = self.input.get("sigma_clip_thresold", self.sigma_clip_thresold)
        self.sigma_clip_max_iter = self.input.get("sigma_clip_max_iter", self.sigma_clip_max_iter)
        self.medfilt1d_percentile = self.input.get("medfilt1d_percentile", self.medfilt1d_percentile)
        method = self.input.get("method", self.method)
        if "1" in self.integration_method:
            integration_dim = 1
        else:
            integration_dim = 2
        if isinstance(method, (str, bytes)):
            self.method = IntegrationMethod.select_old_method(integration_dim, method)
        else:
            self.method = IntegrationMethod.select_one_available(method, dim=integration_dim, degradable=True)
        print(self.method)
        self.raw_compression = self.input.get("raw_compression", self.raw_compression)
        if self.save_raw:
            self.prepare_raw_hdf5(self.raw_compression)

    def process(self):
        Plugin.process(self)
        logger.debug("IntegrateManyFrames.process")
        if self.integration_method == "integrate2d":
            res = numpy.zeros((len(self.input_files), self.npt_azim, self.npt), dtype=numpy.float32)  # numpy array for storing data
        else:
            res = numpy.zeros((len(self.input_files), self.npt), dtype=numpy.float32)  # numpy array for storing data
        sigma = None
        if self.error_model or self.integration_method == "sigma_clip":
            if self.integration_method == "integrate2d":
                sigma = numpy.zeros((len(self.input_files), self.npt_azim, self.npt), dtype=numpy.float32)
            else:
                sigma = numpy.zeros((len(self.input_files), self.npt), dtype=numpy.float32)
        method = self.ai.__getattribute__(self.integration_method)
        common_param = {"method": self.method,
                        "unit": self.unit,
                        "dummy": self.dummy,
                        "delta_dummy": self.delta_dummy,
                        "mask": self.mask,
                        "polarization_factor": self.polarization_factor,
                        "normalization_factor": self.norm,
                        "correctSolidAngle": self.do_SA}
        if self.integration_method in ("integrate1d", "integrate_radial"):
            common_param["npt"] = self.npt
            common_param["error_model"] = self.error_model
            common_param["safe"] = False
        else:
            common_param["npt_rad"] = self.npt
            common_param["npt_azim"] = self.npt_azim
        if self.integration_method == "sigma_clip":
            common_param["thres"] = self.sigma_clip_thresold
            common_param["max_iter"] = self.sigma_clip_max_iter
        if self.integration_method == "medfilt1d":
            common_param["percentile"] = self.medfilt1d_percentile
        # prepare some tools
        cbf = fabio.open(self.input_files[0])
        bo = ByteOffset(os.path.getsize(self.input_files[0]), cbf.data.size,
                        devicetype="gpu")
        shape = cbf.data.shape
        for idx, fname in enumerate(self.input_files):
            logger.debug("process %s: %s", idx, fname)
            if fname.endswith("cbf"):
                raw = cbf.read(fname, only_raw=True)
                data = bo(raw, as_float=False).get().reshape(shape)
            else:
                data = fabio.open(fname).data
            if data is None:
                self.log_error("Failed reading file: %s" % self.input_files[idx],
                               do_raise=False)
                continue
            if self.save_raw:
                self.raw_ds[idx] = data
            out = method(data, **common_param)
            res[idx] = out.intensity
            if self.error_model or self.integration_method == "sigma_clip":
                sigma[idx] = out.sigma
        self.save_result(out, res, sigma)
        if self.input.get("delete_incoming"):
            for fname in self.input_files:
                try:
                    os.unlink(fname)
                except IOError as err:
                    self.log_warning(err)

    def prepare_raw_hdf5(self, filter_=None):
        """Prepare an HDF5 output file for saving raw data

        :param filter_: name of the compression filter
        """
        kwfilter = {}
        if filter_ == "gzip":
            kwfilter = {"compression": "gzip", "shuffle": True}
        elif filter_ == "lz4":
            kwfilter = {"compression": 32004, "shuffle": True}
        elif filter_ == "bitshuffle":
            kwfilter = {"compression": 32008, "compression_opts": (0, 2)}  # enforce lz4 compression
        first_image = self.input_files[0]
        fimg = fabio.open(first_image)
        shape = fimg.data.shape
        stack_shape = (len(self.input_files),) + shape
        first_frame_timestamp = os.stat(first_image).st_ctime
        try:
            self.raw_nxs = pyFAI.io.Nexus(self.save_raw, "a")
        except IOError as error:
            self.log_warning("invalid HDF5 file %s: remove and re-create!\n%s" % (self.save_raw, error))
            os.unlink(self.save_raw)
            self.raw_nxs = pyFAI.io.Nexus(self.save_raw)
        entry = self.raw_nxs.new_entry("entry",
                                       program_name="dahu",
                                       title="ID15.raw_data",
                                       force_time=first_frame_timestamp,
                                       force_name=True)
        entry["program_name"].attrs["version"] = dahu_version
        entry["plugin_name"] = numpy.string_(".".join((os.path.splitext(os.path.basename(__file__))[0], self.__class__.__name__)))
        entry["plugin_name"].attrs["version"] = version
        coll = self.raw_nxs.new_class(entry, "data", class_type="NXdata")
        try:
            self.raw_ds = coll.require_dataset(name="data", shape=stack_shape,
                                               dtype=fimg.data.dtype,
                                               chunks=(1,) + shape,
                                               **kwfilter)
        except Exception as error:
            logger.error("Error in creating dataset, disabling compression: %s", error)
            self.raw_ds = coll.require_dataset(name="data", shape=stack_shape,
                                               dtype=fimg.data.dtype,
                                               chunks=(1,) + shape)
        return self.raw_ds

    def save_result(self, out, I, sigma=None):
        """Save the result of the work as an HDF5 file

        :param out: scattering result
        :param I: intensities as 2D array
        :param sigma: standard deviation of I as 2D array, if available
        """
        logger.debug("IntegrateManyFrames.save_result")
        isotime = numpy.string_(get_isotime())
        try:
            nxs = pyFAI.io.Nexus(self.output_file, "a")
        except IOError as error:
            self.log_warning("invalid HDF5 file %s: remove and re-create!\n%s" % (self.output_file, error))
            os.unlink(self.output_file)
            nxs = pyFAI.io.Nexus(self.output_file)
        entry = nxs.new_entry("entry", program_name="dahu", title="ID15.IntegrateManyFrames")
        entry["program_name"].attrs["version"] = dahu_version
        entry["plugin_name"] = numpy.string_(".".join((os.path.splitext(os.path.basename(__file__))[0], self.__class__.__name__)))
        entry["plugin_name"].attrs["version"] = version
        entry["input"] = numpy.string_(json.dumps(self.input))
        entry["input"].attrs["format"] = 'json'
        subentry = nxs.new_class(entry, "PyFAI", class_type="NXprocess")
        subentry["program"] = numpy.string_("PyFAI")
        subentry["version"] = numpy.string_(pyFAI.version)
        subentry["date"] = isotime
        subentry["processing_type"] = numpy.string_(self.integration_method)
        coll = nxs.new_class(subentry, "process_%s" % self.integration_method,
                             class_type="NXdata")
        metadata_grp = coll.require_group("parameters")
        for key, value in self.ai.getPyFAI().items():
            metadata_grp[key] = numpy.string_(value)
        scale, unit = str(out.unit).split("_", 1)
        coll[scale] = out.radial.astype("float32")
        coll[scale].attrs["interpretation"] = "scalar"
        coll[scale].attrs["unit"] = unit
        coll["I"] = I.astype("float32")
        coll["I"].attrs["interpretation"] = "spectrum"
        coll["I"].attrs["signal"] = "1"
        coll.attrs["signal"] = "I"
        coll.attrs["axes"] = [".", scale]
        if sigma is not None:
            coll["errors"] = sigma.astype("float32")
            coll["errors"].attrs["interpretation"] = "spectrum"
        nxs.close()

    def teardown(self):
        Plugin.teardown(self)
        logger.debug("IntegrateManyFrames.teardown")
        # Create some output data
        self.output["output_file"] = self.output_file
        if self.save_raw:
            self.raw_nxs.close()
            self.output["save_raw"] = self.save_raw
            self.raw_nxs = None
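A minimal driver sketch for the plugin above. It assumes dahu, pyFAI, fabio and silx are installed, that the listing is importable as id15v2, and that Plugin.setup() stores the kwargs dict as self.input (which is how the listing uses it); all file paths are hypothetical.

from id15v2 import IntegrateManyFrames

plugin = IntegrateManyFrames()
plugin.setup({"poni_file": "/tmp/example.poni",                    # hypothetical path
              "input_files": ["/tmp/file1.cbf", "/tmp/file2.cbf"],  # hypothetical frames
              "npt": 2000,
              "unit": "2th_deg",
              "output_file": "/tmp/dest.h5"})
plugin.process()
plugin.teardown()
print(plugin.output)  # teardown() fills in "output_file"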
dribble.py
Source: dribble.py
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# This file is part of the 'Dribble' package for percolation simulations.
# Copyright (c) 2013-2018 Alexander Urban (aurban@atomistic.net)
# ----------------------------------------------------------------------
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# Mozilla Public License, v. 2.0, for more details.
"""
Dribble - Percolation Simulation on Lattices
Analyze the ionic percolation properties of an input structure.
"""
import argparse
import sys
import time
import numpy as np
from dribble.io import Input
from dribble.percolator import Percolator
from dribble.lattice import Lattice
from dribble.misc import uprint

__author__ = "Alexander Urban"


def check_if_percolating(percolator, inp, save_clusters, tortuosity):
    noccup = percolator.num_occupied
    nspan = percolator.check_spanning(verbose=True,
                                      save_clusters=save_clusters,
                                      static_sites=inp.static_sites)
    if (nspan > 0):
        uprint(" The initial structure is percolating.\n")
        uprint(" Fraction of accessible sites: {}\n".format(
            float(nspan)/float(noccup)))
        if tortuosity:
            for c in percolator.percolating_clusters:
                t_min, t_mean, t_std = percolator.get_tortuosity(c)
                uprint(" Tortuosity of cluster {} (min, mean): ".format(c)
                       + "{:5.3f}, {:5.3f} +/- {:5.3f}".format(
                           t_min, t_mean, t_std))
            uprint("")
    else:
        uprint(" The initial structure is NOT percolating.\n")
        uprint(" Fraction of accessible sites: 0.0\n")


def calc_critical_concentration(percolator, save_clusters, samples,
                                file_name, sequence):
    if save_clusters:
        (pc_site_any, pc_site_two, pc_site_all, pc_bond_any,
         pc_bond_two, pc_bond_all) = percolator.percolation_point(
             sequence, samples=samples, file_name=file_name+".vasp")
    else:
        (pc_site_any, pc_site_two, pc_site_all, pc_bond_any,
         pc_bond_two, pc_bond_all) = percolator.percolation_point(
             sequence, samples=samples)
    uprint(" Critical site (bond) concentrations to find a "
           "wrapping cluster\n")
    uprint(" in one or more dimensions   p_c1 = {:.8f}  ({:.8f})".format(
        pc_site_any, pc_bond_any))
    uprint(" in two or three dimensions  p_c2 = {:.8f}  ({:.8f})".format(
        pc_site_two, pc_bond_two))
    uprint(" in all three dimensions     p_c3 = {:.8f}  ({:.8f})".format(
        pc_site_all, pc_bond_all))
    uprint("")


def calc_p_infinity(percolator, samples, save_raw, file_name, sequence):
    plist = np.arange(0.01, 1.00, 0.01)
    (Q, X) = percolator.calc_p_infinity(
        plist, sequence, samples=samples,
        save_discrete=save_raw)
    # integrate susceptibility X in order to normalize it
    intX = np.sum(X)*(plist[1]-plist[0])
    fname = file_name + ".infty"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s}   {:>10s}   {:>15s}   {:>15s}\n".format(
            "p", "P_infty(p)", "Chi(p)", "normalized"))
        for p in range(len(plist)):
            f.write("  {:10.8f}   {:10.8f}   {:15.8f}   {:15.8f}\n".format(
                plist[p], Q[p], X[p], X[p]/intX))


def calc_p_wrapping(percolator, samples, save_raw, file_name, sequence):
    plist = np.arange(0.01, 1.00, 0.01)
    (Q, Qc) = percolator.calc_p_wrapping(
        plist, sequence, samples=samples,
        save_discrete=save_raw)
    fname = file_name + ".wrap"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s}   {:>10s}   {:>10s}\n".format(
            "p", "P_wrap(p)", "cumulative"))
        for p in range(len(plist)):
            f.write("  {:10.8f}   {:10.8f}   {:10.8f}\n".format(
                plist[p], Q[p], Qc[p]))


def calc_inaccessible_sites(percolator, samples, save_raw, file_name,
                            sequence, species):
    plist = np.arange(0.01, 1.00, 0.01)
    (F_inacc, nclus) = percolator.inaccessible_sites(
        plist, sequence, species, samples=samples,
        save_discrete=save_raw)
    fname = file_name + ".inacc"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s}   {:>10s}   {:>10s}\n".format(
            "p", "F_inacc(p)", "N_percol(p)"))
        for p in range(len(plist)):
            f.write("  {:10.8f}   {:10.8f}   {:12.6f}\n".format(
                plist[p], F_inacc[p], nclus[p]))


def calc_mean_tortuosity(percolator, samples, file_name, sequence):
    F_tort = percolator.mean_tortuosity(
        sequence, samples=samples)
    fname = file_name + ".tortuosity"
    uprint(" Writing results to: {}\n".format(fname))
    with open(fname, 'w') as f:
        f.write("# {:^10s}  {:^10s}   {:s}\n".format(
            "N", "p", "Tortuosity(p)"))
        N = len(F_tort)
        for i, T in enumerate(F_tort):
            f.write("  {:10d}  {:10.8f}   {:10.8f}\n".format(
                i+1, (i+1)/float(N), T))


def compute_percolation(input_file, structure_file, samples,
                        save_clusters, save_raw, file_name, pc, check,
                        pinf, pwrap, inaccessible, tortuosity,
                        mean_tortuosity, supercell):
    if not (check or pc or pinf or pwrap or inaccessible or mean_tortuosity):
        print("\n Nothing to do.")
        print(" Please specify the quantity to be calculated.")
        print(" Use the `--help' flag to list all options.\n")
        sys.exit()
    input_params = {}
    if structure_file is not None:
        uprint("\n Reading structure from file: {}".format(structure_file))
        input_params['structure'] = structure_file
    uprint("\n Parsing input file '{}'...".format(input_file), end="")
    inp = Input.from_file(input_file, **input_params)
    uprint(" done.")
    uprint("\n Setting up lattice and neighbor lists...", end="")
    lattice = Lattice.from_input_object(inp, supercell=supercell)
    uprint(" done.")
    uprint(lattice)
    uprint(" Initializing percolator...", end="")
    percolator = Percolator.from_input_object(inp, lattice, verbose=True)
    uprint(" done.")
    uprint("\n MC percolation simulation\n -------------------------\n")
    if check:  # check if the initial structure is percolating
        check_if_percolating(percolator, inp, save_clusters, tortuosity)
    if pc:  # calculate critical site concentrations
        calc_critical_concentration(percolator, save_clusters, samples,
                                    file_name, inp.flip_sequence)
    if pinf:  # estimate P_infinity(p)
        calc_p_infinity(percolator, samples, save_raw, file_name,
                        inp.flip_sequence)
    if pwrap:  # estimate P_wrapping(p)
        calc_p_wrapping(percolator, samples, save_raw, file_name,
                        inp.flip_sequence)
    if inaccessible is not None:  # fraction of inaccessible sites
        calc_inaccessible_sites(percolator, samples, save_raw,
                                file_name, inp.flip_sequence,
                                inaccessible)
    if mean_tortuosity:  # tortuosity as function of concentration
        calc_mean_tortuosity(percolator, samples, file_name,
                             inp.flip_sequence)
    # CPU time (time.clock() was removed in Python 3.8)
    dt = time.gmtime(time.process_time())
    uprint(" All done.  Elapsed CPU time: {:02d}h{:02d}m{:02d}s\n".format(
            dt.tm_hour, dt.tm_min, dt.tm_sec))


def main():
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        "input_file",
        help="Input file in JSON format")
    parser.add_argument(
        "structure_file",
        help="Optional structure file in VASP's POSCAR format.",
        default=None,
        nargs="?")
    parser.add_argument(
        "--supercell",
        help="List of multiples of the lattice cell" +
             " in the three lattice directions",
        type=int,
        default=(1, 1, 1),
        nargs=3)
    parser.add_argument(
        "--inaccessible", "-i",
        help="Calculate fraction of inaccessible sites for given "
             "reference species",
        type=str,
        default=None,
        metavar="SPECIES")
    parser.add_argument(
        "--pc", "-p",
        help="Calculate critical site concentrations",
        action="store_true")
    parser.add_argument(
        "--check",
        help="Check if the initial structure is percolating.",
        action="store_true")
    parser.add_argument(
        "--pinf", "-s",
        help="Estimate P_infinity and percolation susceptibility",
        action="store_true")
    parser.add_argument(
        "--pwrap", "-w",
        help="Estimate P_wrap(p)",
        action="store_true")
    parser.add_argument(
        "--tortuosity", "-t",
        help="Compute tortuosity of the percolating clusters as function "
             "of the concentration.  Together with '--check', only compute "
             "tortuosity of the input structure.",
        action="store_true")
    parser.add_argument(
        "--samples",
        help="number of samples to be averaged",
        type=int,
        default=500)
    parser.add_argument(
        "--file-name",
        help="base file name for all output files",
        default="percol")
    parser.add_argument(
        "--save-clusters",
        help="save wrapping clusters to file",
        action="store_true")
    parser.add_argument(
        "--save-raw",
        help="Also store raw data before convolution (where available).",
        action="store_true")
    parser.add_argument(
        "--debug",
        help="run in debugging mode",
        action="store_true")
    args = parser.parse_args()
    if args.debug:
        np.random.seed(seed=1)
    if args.tortuosity and not args.check:
        mean_tortuosity = True
    else:
        mean_tortuosity = False
    compute_percolation(input_file=args.input_file,
                        structure_file=args.structure_file,
                        samples=args.samples,
                        save_clusters=args.save_clusters,
                        save_raw=args.save_raw,
                        file_name=args.file_name,
                        pc=args.pc,
                        check=args.check,
                        pinf=args.pinf,
                        pwrap=args.pwrap,
                        inaccessible=args.inaccessible,
                        tortuosity=args.tortuosity,
                        mean_tortuosity=mean_tortuosity,
                        supercell=args.supercell)


if (__name__ == "__main__"):
    main()
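The argparse block above defines the script's command-line interface. A hedged sketch of driving it programmatically, assuming the listing is saved as dribble.py in the working directory and that input.json and POSCAR are existing (hypothetical) input files; the flags used are the ones defined by the parser above:

import subprocess
import sys

# Check whether the initial structure percolates, then estimate the
# critical site concentrations, averaging over 500 samples (the default).
subprocess.run([sys.executable, "dribble.py", "input.json", "POSCAR",
                "--check", "--pc", "--samples", "500",
                "--file-name", "percol"],
               check=True)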
get_chargecloud_data.py
Source: get_chargecloud_data.py
import pandas as pd
import typing
import requests
import datetime
import logging
import logging.config
import time
import json
import os

logger = logging.getLogger(__name__)

from settings import LOGGING_CONFIG

BASE_URL_CHARGECLOUD = "https://new-poi.chargecloud.de"
SCRAPING_INTERVAL = 10
CITIES_CC = ['koeln', 'dortmund', 'bonn', 'muenster', 'kiel',
             'chemnitz', 'krefeld', 'leverkusen', 'heidelberg',
             'solingen', 'ingolstadt', 'pforzheim', 'goettingen',
             'erlangen', 'tuebingen', 'bayreuth', 'bocholt', 'dormagen',
             'rastatt', 'hof', 'weinheim', 'bruchsal', 'nettetal', 'ansbach',
             'schwabach', 'ettlingen', 'crailsheim', 'deggendorf', 'forchheim',
             'bretten', 'buehl', 'zirndorf', 'roth', 'calw', 'herzogenaurach', 'wertheim',
             'kitzingen', 'lichtenfels']
DIR_SAVE_API_RESULTS = "../data/scraped_data"
SAVE_RAW = True


def scrape_cp_cities(cities: typing.List[str],
                     dir_save: str,
                     save_raw: bool = False):
    """Scrape charging point information for a given list of cities.

    Args:
        cities (typing.List[str]): list of cities to scrape.
        dir_save (str): directory for saving scraped cities.
        save_raw (bool, optional): whether to save the API result as raw JSON (True) or as a pickle file (False). Defaults to False.
    """
    data_cities = {}
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    for city in cities:
        r = requests.get(BASE_URL_CHARGECLOUD + "/" + city)
        try:
            data = json.loads(r.text)
            data_cities[city] = data
            logger.debug(f"Successfully scraped '{city}' at {now}.")
        except Exception:
            logger.error(f"Error occurred while scraping '{city}' at {now}.")
    if save_raw:
        fname = now + "_cp_data_cities.json"
        path_save = os.path.join(dir_save, fname)
        with open(path_save, 'w', encoding='utf-8') as f:
            json.dump(data_cities, f)
    else:
        fname = now + "_cp_data_cities.pkl"
        path_save = os.path.join(dir_save, fname)
        pd.to_pickle(data_cities, path_save)


def call_chargecloud_api(scraping_interval: typing.Union[int, float] = SCRAPING_INTERVAL,
                         cities: typing.List[str] = CITIES_CC,
                         dir_save_api_results: str = DIR_SAVE_API_RESULTS,
                         save_raw: bool = SAVE_RAW):
    """Scrape a given list of cities from the chargecloud API at a given interval.

    Args:
        scraping_interval (typing.Union[int, float], optional): interval in minutes between API lookups. Defaults to SCRAPING_INTERVAL.
        cities (typing.List[str], optional): list of cities to scrape. Defaults to CITIES_CC.
        dir_save_api_results (str, optional): directory for saving scraped cities. Defaults to "../data/scraped_data".
        save_raw (bool, optional): whether to save the API result as raw JSON (True) or as a pickle file (False). Defaults to SAVE_RAW.
    """
    while True:
        now = datetime.datetime.now().strftime("%Y/%m/%d_%H:%M:%S")
        info_msg = f"Scraping cities: {now}"
        logger.info(info_msg)

        scrape_cp_cities(cities=cities, dir_save=dir_save_api_results, save_raw=save_raw)

        # An API call for all cities takes approx. 15 seconds, so subtract it from the specified interval
        sleep = max(scraping_interval*60 - 15, 0)
        time.sleep(sleep)


if __name__ == '__main__':
    logging.config.dictConfig(LOGGING_CONFIG)
    call_chargecloud_api()
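A one-shot sketch of the scraper above, which fetches two cities once and writes a timestamped JSON file instead of looping. It assumes a settings module providing LOGGING_CONFIG is importable (the module imports it at load time) and that the output directory already exists; both are taken from the listing, not verified here:

import logging

from get_chargecloud_data import scrape_cp_cities

logging.basicConfig(level=logging.DEBUG)  # simple stand-in for dictConfig(LOGGING_CONFIG)

# Scrape two cities once and save the raw JSON response;
# the directory must exist beforehand.
scrape_cp_cities(cities=["koeln", "bonn"],
                 dir_save="../data/scraped_data",
                 save_raw=True)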
