How to use copy_local method in autotest

Best Python code snippet using autotest_python

skin_parents.py

Source:skin_parents.py Github

copy

Full Screen

# SPDX-License-Identifier: GPL-2.0-or-later

import bpy

from itertools import count
from string import Template

from ...utils.naming import make_derived_name
from ...utils.misc import force_lazy, LazyRef

from ...base_rig import LazyRigComponent, stage


class ControlBoneParentBase(LazyRigComponent):
    """
    Base class for components that generate parent mechanisms for skin controls.
    The generated parent bone is accessible through the output_bone field or property.
    """

    # Run this component after the @stage methods of the owner node and its slave nodes
    rigify_sub_object_run_late = True

    # This generator's output bone cannot be modified by generators layered on top.
    # Otherwise they may optimize bone count by adding more constraints in place.
    # (This generally signals the bone is shared between multiple users.)
    is_parent_frozen = False

    def __init__(self, rig, node):
        super().__init__(node)

        # Rig that provides this parent mechanism.
        self.rig = rig
        # Control node that the mechanism is provided for.
        self.node = node

    def __eq__(self, other):
        # Subclasses must define equality so identical mechanisms can be merged.
        raise NotImplementedError()


class ControlBoneParentOrg:
    """Control node parent generator wrapping a single ORG bone."""

    is_parent_frozen = True

    def __init__(self, org):
        # The wrapped ORG bone name (may be lazy).
        self._output_bone = org

    @property
    def output_bone(self):
        return force_lazy(self._output_bone)

    def enable_component(self):
        # Nothing to generate: the ORG bone already exists.
        pass

    def __eq__(self, other):
        return isinstance(other, ControlBoneParentOrg) and self._output_bone == other._output_bone


class ControlBoneParentArmature(ControlBoneParentBase):
    """Control node parent generator using the Armature constraint to parent the bone."""

    def __init__(self, rig, node, *, bones, orientation=None, copy_scale=None, copy_rotation=None):
        super().__init__(rig, node)

        # List of Armature constraint target specs for make_constraint (lazy).
        self.bones = bones
        # Orientation quaternion for the bone (lazy)
        self.orientation = orientation
        # Bone to copy scale from (lazy)
        self.copy_scale = copy_scale
        # Bone to copy rotation from (lazy)
        self.copy_rotation = copy_rotation

        # Copy constraints signal the bone is shared; freeze it against merging.
        if copy_scale or copy_rotation:
            self.is_parent_frozen = True

    def __eq__(self, other):
        return (
            isinstance(other, ControlBoneParentArmature) and
            self.node.point == other.node.point and
            self.orientation == other.orientation and
            self.bones == other.bones and
            self.copy_scale == other.copy_scale and
            self.copy_rotation == other.copy_rotation
        )

    def generate_bones(self):
        self.output_bone = self.node.make_bone(
            make_derived_name(self.node.name, 'mch', '_arm'), 1/4, rig=self.rig)

        self.rig.generator.disable_auto_parent(self.output_bone)

        if self.orientation:
            matrix = force_lazy(self.orientation).to_matrix().to_4x4()
            matrix.translation = self.node.point
            self.get_bone(self.output_bone).matrix = matrix

    def parent_bones(self):
        self.targets = force_lazy(self.bones)

        assert len(self.targets) > 0

        # Single target can be simplified to parenting
        if len(self.targets) == 1:
            target = force_lazy(self.targets[0])
            if isinstance(target, tuple):
                target = target[0]

            self.set_bone_parent(
                self.output_bone, target,
                inherit_scale='NONE' if self.copy_scale else 'FIX_SHEAR'
            )

    def rig_bones(self):
        # Multiple targets use the Armature constraint
        if len(self.targets) > 1:
            self.make_constraint(
                self.output_bone, 'ARMATURE', targets=self.targets,
                use_deform_preserve_volume=True
            )
            self.make_constraint(self.output_bone, 'LIMIT_ROTATION')

        if self.copy_rotation:
            self.make_constraint(self.output_bone, 'COPY_ROTATION', self.copy_rotation)
        if self.copy_scale:
            self.make_constraint(self.output_bone, 'COPY_SCALE', self.copy_scale)


class ControlBoneParentMix(ControlBoneParentBase):
    """Combine multiple parent mechanisms using the Armature constraint."""

    def __init__(self, rig, node, parents, *, suffix=None):
        super().__init__(rig, node)

        # Parallel lists of component parent generators and their blend weights.
        self.parents = []
        self.parent_weights = []
        self.suffix = suffix

        self.add_parents(parents)

    def add_parents(self, parents):
        # Items are either bare parents (weight 1) or (parent, weight) tuples.
        for item in parents:
            if isinstance(item, tuple):
                parent, weight = item
            else:
                parent, weight = item, 1

            # Merge weights of duplicate parents instead of listing them twice.
            for i, cur in enumerate(self.parents):
                if parent == cur:
                    self.parent_weights[i] += weight
                    break
            else:
                self.parents.append(parent)
                self.parent_weights.append(weight)

    def enable_component(self):
        for parent in self.parents:
            parent.enable_component()

        super().enable_component()

    def __eq__(self, other):
        return (
            isinstance(other, ControlBoneParentMix) and
            self.parents == other.parents and
            self.parent_weights == other.parent_weights
        )

    def generate_bones(self):
        self.output_bone = self.node.make_bone(
            make_derived_name(self.node.name, 'mch', self.suffix or '_mix'), 1/2, rig=self.rig)

        self.rig.generator.disable_auto_parent(self.output_bone)

    def parent_bones(self):
        if len(self.parents) == 1:
            # BUGFIX: the original referenced an undefined name 'target' here,
            # raising NameError for single-parent mixes. A single component can
            # be simplified to plain parenting to its output bone.
            self.set_bone_parent(self.output_bone, self.parents[0].output_bone)

    def rig_bones(self):
        if len(self.parents) > 1:
            targets = [(p.output_bone, w) for p, w in zip(self.parents, self.parent_weights)]

            self.make_constraint(
                self.output_bone, 'ARMATURE', targets=targets,
                use_deform_preserve_volume=True
            )


class ControlBoneParentLayer(ControlBoneParentBase):
    """Base class for parent generators that build on top of another mechanism."""

    def __init__(self, rig, node, parent):
        super().__init__(rig, node)
        self.parent = parent

    def enable_component(self):
        self.parent.enable_component()
        super().enable_component()


class ControlBoneWeakParentLayer(ControlBoneParentLayer):
    """
    Base class for layered parent generator that is only used for the reparent source.
    I.e. it doesn't affect the control for its owner rig, but only for other rigs
    that have controls merged into this one.
    """

    @staticmethod
    def strip(parent):
        # Unwrap any stack of weak layers down to the underlying mechanism.
        while isinstance(parent, ControlBoneWeakParentLayer):
            parent = parent.parent
        return parent


class ControlBoneParentOffset(ControlBoneParentLayer):
    """
    Parent mechanism generator that offsets the control's location.

    Supports Copy Transforms (Local) constraints and location drivers.
    Multiple offsets can be accumulated in the same generator, which
    will automatically create as many bones as needed.
    """

    @classmethod
    def wrap(cls, owner, parent, node, *constructor_args):
        return cls(owner, node, parent, *constructor_args)

    def __init__(self, rig, node, parent):
        super().__init__(rig, node, parent)

        # target -> [constant influence, (expr, vars) driver list, lazy influence list]
        self.copy_local = {}
        # orientation key -> ([x drivers], [y drivers], [z drivers])
        self.add_local = {}
        # orientation key -> quaternion
        self.add_orientations = {}
        # list of (target, make_constraint kwargs)
        self.limit_distance = []

    def enable_component(self):
        # Automatically merge an unfrozen sequence of this generator instances
        while isinstance(self.parent, ControlBoneParentOffset) and not self.parent.is_parent_frozen:
            self.prepend_contents(self.parent)
            self.parent = self.parent.parent

        super().enable_component()

    def prepend_contents(self, other):
        """Merge all offsets stored in the other generator into the current one."""
        for key, val in other.copy_local.items():
            if key not in self.copy_local:
                self.copy_local[key] = val
            else:
                inf, expr, cbs = val
                inf0, expr0, cbs0 = self.copy_local[key]
                self.copy_local[key] = [inf+inf0, expr+expr0, cbs+cbs0]

        for key, val in other.add_orientations.items():
            if key not in self.add_orientations:
                self.add_orientations[key] = val

        for key, val in other.add_local.items():
            if key not in self.add_local:
                self.add_local[key] = val
            else:
                ot0, ot1, ot2 = val
                my0, my1, my2 = self.add_local[key]
                self.add_local[key] = (ot0+my0, ot1+my1, ot2+my2)

        # The other generator's limits apply before (below) this one's.
        self.limit_distance = other.limit_distance + self.limit_distance

    def add_copy_local_location(self, target, *, influence=1, influence_expr=None, influence_vars=None):
        """
        Add a Copy Location (Local, Owner Orientation) offset.
        The influence may be specified as a (lazy) constant, or a driver expression
        with variables (using the same $var syntax as add_location_driver).
        """
        # BUGFIX: the original used a shared mutable default dict here.
        if influence_vars is None:
            influence_vars = {}

        if target not in self.copy_local:
            self.copy_local[target] = [0, [], []]

        if influence_expr:
            self.copy_local[target][1].append((influence_expr, influence_vars))
        elif callable(influence):
            self.copy_local[target][2].append(influence)
        else:
            self.copy_local[target][0] += influence

    def add_location_driver(self, orientation, index, expression, variables):
        """
        Add a driver offsetting along the specified axis in the given Quaternion orientation.
        The variables may have to be renamed due to conflicts between multiple add requests,
        so the expression should use the $var syntax of Template to reference them.
        """
        assert isinstance(variables, dict)

        # Quantize the orientation so nearly-equal quaternions share one bone.
        key = tuple(round(x*10000) for x in orientation)

        if key not in self.add_local:
            self.add_orientations[key] = orientation
            self.add_local[key] = ([], [], [])

        self.add_local[key][index].append((expression, variables))

    def add_limit_distance(self, target, *, ensure_order=False, **kwargs):
        """Add a limit distance constraint with the given make_constraint arguments."""
        self.limit_distance.append((target, kwargs))

        # Prevent merging from reordering this limit
        if ensure_order:
            self.is_parent_frozen = True

    def __eq__(self, other):
        return (
            isinstance(other, ControlBoneParentOffset) and
            self.parent == other.parent and
            self.copy_local == other.copy_local and
            self.add_local == other.add_local and
            self.limit_distance == other.limit_distance
        )

    @property
    def output_bone(self):
        # Last offset bone in the chain, or fall through to the parent mechanism.
        return self.mch_bones[-1] if self.mch_bones else self.parent.output_bone

    def generate_bones(self):
        self.mch_bones = []
        self.reuse_mch = False

        if self.copy_local or self.add_local or self.limit_distance:
            mch_name = make_derived_name(self.node.name, 'mch', '_poffset')

            if self.add_local:
                # Generate a bone for every distinct orientation used for the drivers
                for key in self.add_local:
                    self.mch_bones.append(self.node.make_bone(
                        mch_name, 1/4, rig=self.rig, orientation=self.add_orientations[key]))
            else:
                # Try piggybacking on the parent bone if allowed
                if not self.parent.is_parent_frozen:
                    bone = self.get_bone(self.parent.output_bone)
                    if (bone.head - self.node.point).length < 1e-5:
                        self.reuse_mch = True
                        self.mch_bones = [bone.name]
                        return

                self.mch_bones.append(self.node.make_bone(mch_name, 1/4, rig=self.rig))

    def parent_bones(self):
        if self.mch_bones:
            if not self.reuse_mch:
                self.rig.set_bone_parent(self.mch_bones[0], self.parent.output_bone)

            self.rig.parent_bone_chain(self.mch_bones, use_connect=False)

    def compile_driver(self, items):
        """
        Combine a list of (expression, variables) pairs into one driver expression,
        renaming variables where necessary to avoid conflicts.
        Returns (final_expression, merged_variables).
        """
        variables = {}
        expressions = []

        # Loop through all expressions and combine the variable maps.
        for expr, varset in items:
            template = Template(expr)
            varmap = {}

            # Check that all variables are present
            try:
                template.substitute({k: '' for k in varset})
            except Exception as e:
                self.rig.raise_error('Invalid driver expression: {}\nError: {}', expr, e)

            # Merge variables
            for name, desc in varset.items():
                # Check if the variable is used.
                try:
                    template.substitute({k: '' for k in varset if k != name})
                    continue
                except KeyError:
                    pass

                # Descriptors may not be hashable, so linear search
                for vn, vdesc in variables.items():
                    if vdesc == desc:
                        varmap[name] = vn
                        break
                else:
                    # Find an unique name for the new variable and add to map
                    new_name = name
                    if new_name in variables:
                        for i in count(1):
                            new_name = '%s_%d' % (name, i)
                            if new_name not in variables:
                                break
                    variables[new_name] = desc
                    varmap[name] = new_name

            # Substitute the new names into the expression
            expressions.append(template.substitute(varmap))

        # Add all expressions together
        if len(expressions) > 1:
            final_expr = '+'.join('('+expr+')' for expr in expressions)
        else:
            final_expr = expressions[0]

        return final_expr, variables

    def rig_bones(self):
        # Emit the Copy Location constraints
        if self.copy_local:
            mch = self.mch_bones[0]
            for target, (influence, drivers, lazyinf) in self.copy_local.items():
                influence += sum(map(force_lazy, lazyinf))

                con = self.make_constraint(
                    mch, 'COPY_LOCATION', target, use_offset=True,
                    target_space='LOCAL_OWNER_ORIENT', owner_space='LOCAL', influence=influence,
                )

                if drivers:
                    # Fold any constant influence into the driver expression.
                    if influence > 0:
                        drivers.append((str(influence), {}))

                    expr, variables = self.compile_driver(drivers)
                    self.make_driver(con, 'influence', expression=expr, variables=variables)

        # Add the direct offset drivers
        if self.add_local:
            for mch, (key, specs) in zip(self.mch_bones, self.add_local.items()):
                for index, vals in enumerate(specs):
                    if vals:
                        expr, variables = self.compile_driver(vals)
                        self.make_driver(mch, 'location', index=index,
                                         expression=expr, variables=variables)

        # Add the limit distance constraints
        # NOTE(review): the source listing is truncated at this point; the loop
        # body emitting the LIMIT_DISTANCE constraints is not visible in this
        # chunk, so it is left as-is pending access to the full file.
        for target, kwargs in self.limit_distance:
            ...

Full Screen

Full Screen

update.py

Source:update.py Github

copy

Full Screen

#!/usr/bin/env python

from __future__ import print_function, division
from time import time
import argparse
import collections
import csv
import gc
import json
import os
import shutil
import re
import sys
import tempfile
from . import db_sqlite3 as db
from . import defs
from . import env
from . import util
from .thirdparty import gzipinputstream as gzis

log = util.get_logger("update")


class DownloadOnly(object):
  """Database stand-in that consumes import generators without storing anything.

  Used with --download-only so the streamed data is still written to local
  files but never imported.
  """

  def ignore(self, many, drop_indices = False):
    # Drain the generator so its download/write side effects still happen.
    for _ in many:
      continue

  populate_table_systems = ignore
  populate_table_stations = ignore
  populate_table_coriolis_fsds = ignore

  def close(self): pass


edsm_systems_url = "https://www.edsm.net/dump/systemsWithCoordinates.json.gz"
edsm_syspop_url = "https://www.edsm.net/dump/systemsPopulated.json.gz"
edsm_stations_url = "https://www.edsm.net/dump/stations.json.gz"
coriolis_fsds_url = "https://raw.githubusercontent.com/edcd/coriolis-data/master/modules/standard/frame_shift_drive.json"

local_path = 'data'
edsm_systems_local_path = os.path.join(local_path, "systemsWithCoordinates.json")
edsm_syspop_local_path = os.path.join(local_path, "systemsPopulated.json")
edsm_stations_local_path = os.path.join(local_path, "stations.json")
coriolis_fsds_local_path = os.path.join(local_path, "frame_shift_drive.json")

# Matches a single JSON object per line, tolerating trailing commas/whitespace.
_re_json_line = re.compile(r'^\s*(\{.*\})[\s,]*$')

default_steps = ['clean', 'systems', 'stations', 'fsds']
extra_steps = ['systems_populated', 'id64']
valid_steps = default_steps + extra_steps
all_steps = valid_steps + ['default', 'extra', 'all']


def steps_type(s):
  """argparse type handler: parse a comma-separated step list, expanding the
  'default', 'extra' and 'all' aliases. Raises ValueError on unknown steps."""
  step_names = s.lower().split(',') if s else []
  steps = []
  for step in step_names:
    if step not in all_steps:
      # BUGFIX: the original passed the format arguments as extra ValueError
      # arguments instead of formatting the message.
      raise ValueError('Invalid step "{}". Valid steps are: {}'.format(step, ','.join(all_steps)))
    if step == 'default':
      steps += default_steps
    elif step == 'extra':
      steps += extra_steps
    # BUGFIX: removed a duplicated, unreachable 'default' branch here.
    elif step == 'all':
      return valid_steps
    else:
      steps.append(step)
  return steps


class StreamingStringIO(object):
  """Minimal push-based line iterator, used to feed csv.DictReader one line
  at a time while streaming."""
  def __init__(self): self.data = collections.deque()
  def add(self, data): self.data.appendleft(data)
  def __iter__(self): return self
  def next(self):
    if any(self.data):
      return self.data.pop()
    else:
      raise StopIteration
  __next__ = next


def read_header_csv(line):
  """Start a streaming CSV parse from the header line; returns (sio, reader)."""
  sio = StreamingStringIO()
  sio.add(line)
  csvr = csv.DictReader(sio)
  return (sio, csvr)


def read_line_csv(line, header):
  """Parse one CSV data line using the (sio, reader) pair from read_header_csv.
  Returns False for empty lines."""
  if len(line) == 0:
    return False
  header[0].add(line)
  return next(header[1])


def read_all_csv(data):
  """Parse a whole CSV document into a list of dict rows."""
  return [row for row in csv.DictReader(data)]


def read_line_json(line, header):
  """Parse one line of a JSON dump.

  Returns False for non-object lines (e.g. the surrounding '[' / ']'),
  None when a line looks like an object but fails to parse.
  """
  m = _re_json_line.match(line)
  if m is None:
    return False
  try:
    return json.loads(m.group(1))
  except ValueError:
    log.debug("Line failed JSON parse: {0}", line)
    return None


def read_all_json(data):
  """Parse a whole JSON document."""
  return json.loads(data)


def cleanup_local(f, scratch):
  """Best-effort close of a temporary file and removal of its scratch path."""
  if f is not None and not f.closed:
    try:
      f.close()
    except Exception:
      # Narrowed from a bare except; cleanup stays best-effort but no longer
      # swallows KeyboardInterrupt/SystemExit.
      log.error("Error closing temporary file{}", ' {}'.format(scratch) if scratch is not None else '')
  if scratch is not None:
    try:
      os.unlink(scratch)
    except Exception:
      log.error("Error cleaning up temporary file {}", scratch)


class Application(object):
  """Command-line application that downloads EDSM/Coriolis dumps and imports
  them into the local database."""

  def __init__(self, arg, hosted, state = None):
    # arg/hosted/state are part of the shared application entry-point
    # interface and are unused by this command.
    # BUGFIX: 'state' previously defaulted to a shared mutable dict.
    ap = argparse.ArgumentParser(description = 'Update local database', parents = [env.arg_parser], prog = "update", epilog='Valid choices for --steps: {}'.format(','.join(all_steps)))
    ap.add_argument_group("Processing options")
    bex = ap.add_mutually_exclusive_group()
    bex.add_argument('-b', '--batch', dest='batch', action='store_true', default=True, help='Import data in batches')
    bex.add_argument('-n', '--no-batch', dest='batch', action='store_false', help='Import data in one load - this will use massive amounts of RAM and may fail!')
    ap.add_argument('-c', '--copy-local', required=False, action='store_true', help='Keep local copy of downloaded files')
    ap.add_argument('-d', '--download-only', required=False, action='store_true', help='Do not import, just download files - implies --copy-local')
    ap.add_argument('-s', '--batch-size', required=False, type=int, help='Batch size; higher sizes are faster but consume more memory')
    ap.add_argument('-l', '--local', required=False, action='store_true', help='Instead of downloading, update from local files in the data directory')
    ap.add_argument('--steps', required=False, type=steps_type, default=default_steps, help='Manually (re-)perform comma-separated steps of the update process.')
    ap.add_argument('--print-urls', required=False, action='store_true', help='Do not download anything, just print the URLs which we would fetch from')
    args = ap.parse_args(sys.argv[1:])
    if args.batch or args.batch_size:
      args.batch_size = args.batch_size if args.batch_size is not None else 1024
      if not args.batch_size > 0:
        raise ValueError("Batch size must be a natural number!")
    # --download-only implies --copy-local.
    args.copy_local = args.download_only or args.copy_local
    if args.copy_local and args.local:
      # BUGFIX: same comma-instead-of-format defect as in steps_type.
      raise ValueError("Invalid use of --local and --{}!".format("download-only" if args.download_only else "copy-local"))
    self.args = args

  def run(self):
    env.log_versions()
    db.log_versions()
    # Get the relative path to the "edtslib" base directory from the current directory
    relpath = util.get_relative_path(os.getcwd(), os.path.dirname(__file__))
    if self.args.print_urls:
      if self.args.local:
        for path in [edsm_systems_local_path, edsm_stations_local_path, coriolis_fsds_local_path]:
          print(path)
      else:
        for path in [edsm_systems_url, edsm_stations_url, coriolis_fsds_url]:
          print(path)
      return

    g = util.start_timer()

    if self.args.download_only:
      log.info("Downloading files locally...")
      dbc = DownloadOnly()
    else:
      db_file = os.path.join(defs.default_path, env.global_args.db_file)
      db_dir = os.path.dirname(db_file)
      # If the data directory doesn't exist, make it
      if db_dir and not os.path.exists(db_dir):
        os.makedirs(db_dir)
      if 'clean' in self.args.steps:
        # Open then close a temporary file, essentially reserving the name.
        fd, db_tmp_filename = tempfile.mkstemp('.tmp', os.path.basename(db_file), db_dir if db_dir else '.')
        os.close(fd)
        log.info("Initialising database...")
        sys.stdout.flush()
        t = util.start_timer()
        dbc = db.initialise_db(db_tmp_filename)
        db_open_filename = db_tmp_filename
        log.info("Done in {}.", util.format_timer(t))
      else:
        log.info("Opening existing database...")
        sys.stdout.flush()
        t = util.start_timer()
        dbc = db.open_db(db_file)
        db_open_filename = db_file
        if dbc:
          log.info("Done in {}.", util.format_timer(t))
        else:
          log.error("Failed to open existing DB!")
          sys.exit(2)

    try:
      # Repoint local paths to use the right relative path
      cur_edsm_systems_local_path = os.path.join(relpath, edsm_systems_local_path)
      cur_edsm_syspop_local_path = os.path.join(relpath, edsm_syspop_local_path)
      cur_edsm_stations_local_path = os.path.join(relpath, edsm_stations_local_path)
      cur_coriolis_fsds_local_path = os.path.join(relpath, coriolis_fsds_local_path)
      # Decide whether to source data from local paths or remote URLs
      edsm_systems_path = util.path_to_url(cur_edsm_systems_local_path) if self.args.local else edsm_systems_url
      edsm_syspop_path = util.path_to_url(cur_edsm_syspop_local_path) if self.args.local else edsm_syspop_url
      edsm_stations_path = util.path_to_url(cur_edsm_stations_local_path) if self.args.local else edsm_stations_url
      coriolis_fsds_path = util.path_to_url(cur_coriolis_fsds_local_path) if self.args.local else coriolis_fsds_url

      if self.args.copy_local:
        download_dir = os.path.sep.join([relpath, local_path])
        if not os.path.exists(download_dir):
          os.makedirs(download_dir)

      if 'systems' in self.args.steps:
        dbc.populate_table_systems(self.import_json_from_url(edsm_systems_path, cur_edsm_systems_local_path, 'EDSM systems', self.args.batch_size, is_url_local=self.args.local), True)
        log.info("Done.")
      if 'systems_populated' in self.args.steps:
        dbc.populate_table_systems(self.import_json_from_url(edsm_syspop_path, cur_edsm_syspop_local_path, 'EDSM populated systems', self.args.batch_size, is_url_local=self.args.local))
        log.info("Done.")
      if 'stations' in self.args.steps:
        dbc.populate_table_stations(self.import_json_from_url(edsm_stations_path, cur_edsm_stations_local_path, 'EDSM stations', self.args.batch_size, is_url_local=self.args.local))
        log.info("Done.")
      if 'fsds' in self.args.steps:
        dbc.populate_table_coriolis_fsds(self.import_json_from_url(coriolis_fsds_path, cur_coriolis_fsds_local_path, 'Coriolis FSDs', None, is_url_local=self.args.local, key='fsd'))
        log.info("Done.")
      if 'id64' in self.args.steps:
        log.info("Setting known system ID64s...")
        sys.stdout.flush()
        t = util.start_timer()
        dbc.update_table_systems_with_id64()
        log.info("Done in {}.".format(util.format_timer(t)))
    except MemoryError:
      log.error("Out of memory!")
      if self.args.batch_size is None:
        log.error("Try the --batch flag for a slower but more memory-efficient method!")
      elif self.args.batch_size > 64:
        # BUGFIX: with __future__ division the original suggested a fractional
        # batch size; halve with integer division.
        log.error("Try --batch-size {0}", self.args.batch_size // 2)
      if not self.args.download_only:
        if 'clean' in self.args.steps:
          cleanup_local(None, db_open_filename)
        else:
          log.warning("Update operation on existing database cancelled - database state could be invalid")
      return
    except:
      if not self.args.download_only:
        if 'clean' in self.args.steps:
          cleanup_local(None, db_open_filename)
        else:
          log.warning("Update operation on existing database cancelled - database state could be invalid")
      raise

    if not self.args.download_only:
      dbc.close()
      # If we just made a new DB...
      if 'clean' in self.args.steps:
        if os.path.isfile(db_file):
          os.unlink(db_file)
        shutil.move(db_open_filename, db_file)
      else:
        log.debug("Existing database updated")

    log.info("All done in {}.".format(util.format_timer(g)))

  def import_csv_from_url(self, url, filename, description, batch_size, is_url_local = False, key = None):
    """Import CSV data from url; see import_data_from_url."""
    return self.import_data_from_url(read_header_csv, read_line_csv, read_all_csv, url, filename, description, batch_size, is_url_local, key)

  def import_json_from_url(self, url, filename, description, batch_size, is_url_local = False, key = None):
    """Import JSON data from url; see import_data_from_url."""
    return self.import_data_from_url(None, read_line_json, read_all_json, url, filename, description, batch_size, is_url_local, key)

  def import_data_from_url(self, fn_read_header, fn_read_line, fn_read_all, url, filename, description, batch_size, is_url_local, key):
    """Generator yielding parsed rows streamed from url.

    With --copy-local the raw stream is also written to a scratch file that is
    moved to 'filename' on success. With batch_size set, rows are parsed line
    by line and yielded in batches; otherwise the whole document is read, then
    parsed via fn_read_all (optionally unwrapping the 'key' element).
    """
    if self.args.copy_local:
      try:
        dirname = os.path.dirname(filename)
        fd, scratch = tempfile.mkstemp('.tmp', os.path.basename(filename), dirname if dirname else '.')
        f = os.fdopen(fd, 'wb')
      except:
        log.error("Failed to create a temporary file")
        raise
    try:
      is_url_gzip = url.endswith(".gz")
      request_gzip_enc = (not is_url_gzip)  # Don't try to request gzip encoding if the file is already gzipped
      # Try to open the stream
      stream = util.open_url(url, allow_no_ssl=is_url_local, allow_gzip=request_gzip_enc)
      if stream is None:
        if self.args.copy_local:
          cleanup_local(f, scratch)
        return
      # If we have a gzip file, wrap the stream in a decompressor
      if is_url_gzip:
        stream = gzis.GzipInputStream(stream)
      # Are we batch downloading?
      if batch_size is not None:
        log.info("Batch downloading {0} list from {1} ... ", description, url)
        sys.stdout.flush()
        start = int(util.start_timer())
        done = 0
        failed = 0
        last_elapsed = 0
        header = None
        batch = []
        # Begin reading
        while True:
          line = util.read_stream_line(stream)
          if not line:
            break
          if self.args.copy_local:
            util.write_stream(f, line)
          if self.args.download_only:
            continue
          # Check if this is the first line and we should read a header
          if fn_read_header is not None and header is None:
            header = fn_read_header(line)
            if header is None:
              raise Exception("Failed to read header")
            continue
          # OK, read a normal line
          # BUGFIX: 'obj in [None, False]' also matched rows equal to 0/False;
          # test identity against the two sentinel values instead.
          obj = fn_read_line(line, header)
          if obj is None or obj is False:
            if obj is None:
              failed += 1
            continue
          # Add to batch and check if we're now full
          batch.append(obj)
          if len(batch) >= batch_size:
            for obj in batch:
              yield obj
            done += len(batch)
            elapsed = int(util.get_timer(start))
            # Emit a progress line at most every 30 seconds.
            if elapsed - last_elapsed >= 30:
              log.info("Loaded {0} row(s) of {1} data to DB...", done, description)
              last_elapsed = elapsed
            batch = []
        done += len(batch)
        if not self.args.download_only:
          # Flush the final partial batch.
          for obj in batch:
            yield obj
          if failed:
            log.info("Lines failing JSON parse: {0}", failed)
          log.info("Loaded {0} row(s) of {1} data to DB...", done, description)
          log.info("Imported data in {}, generating relevant indexes...".format(util.format_timer(start)))
      else:
        log.info("Downloading {0} list from {1} ... ", description, url)
        sys.stdout.flush()
        t = util.start_timer()
        encoded = util.read_stream(stream)
        log.info("Done in {}.".format(util.format_timer(t)))
        if self.args.copy_local:
          log.info("Writing {0} local data...", description)
          sys.stdout.flush()
          t = util.start_timer()
          util.write_stream(f, encoded)
          log.info("Done in {}.".format(util.format_timer(t)))
        if not self.args.download_only:
          log.info("Loading {0} data...", description)
          sys.stdout.flush()
          t = util.start_timer()
          obj = fn_read_all(encoded)
          log.info("Done in {}.".format(util.format_timer(t)))
          log.info("Adding {0} data to DB...", description)
          sys.stdout.flush()
          t = util.start_timer()
          if key is not None:
            obj = obj[key]
          for o in obj:
            yield o
          log.info("Imported data in {}, generating relevant indexes...".format(util.format_timer(t)))
      # Force GC collection to try to avoid memory errors
      encoded = None
      obj = None
      batch = None
      gc.collect()
      if self.args.copy_local:
        f.close()
        f = None
        shutil.move(scratch, filename)
    except MemoryError:
      encoded = None
      obj = None
      batch = None
      gc.collect()
      raise
    except:
      if self.args.copy_local:
        cleanup_local(f, scratch)
      # NOTE(review): the source listing is truncated at this point; re-raising
      # matches the visible handler pattern in run(), but confirm against the
      # full file.
      raise

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful