How to use the checksum_crc32c method in LocalStack

Best Python code snippets using localstack_python
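
Before diving into the snippets, here is a minimal, self-contained sketch of calling the helper directly. It assumes `checksum_crc32c` (and its sibling `checksum_crc32`) are importable from `localstack.utils.strings`, which is where the strings.py excerpt below lives in current LocalStack sources; adjust the import path if your version differs.

# Minimal sketch (not from the snippets below); assumes the helpers live in
# localstack.utils.strings, as in current LocalStack sources.
from localstack.utils.strings import checksum_crc32, checksum_crc32c

payload = b"hello world"

# Both helpers return the 4-byte checksum base64-encoded (big-endian), the same
# representation S3 uses for its x-amz-checksum-crc32c header.
print("crc32 :", checksum_crc32(payload))
print("crc32c:", checksum_crc32c(payload))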

verify_release_manifest.py

Source: verify_release_manifest.py (GitHub)


1"""2Given a file containing a list of constituent staging dirs for a DCP release (aka a manifest),3verify that data has been loaded from each of them to the target DCP dataset and the count of loaded files4matches the # in the staging area.5Files are determined to be loaded if they exist at the desired target path and crc as defined in the staging6areas descriptors. It's possible that an expected file was loaded by another staging dir (i.e,. they both7contain the same file). While this is discouraged, it's technically possible and we need to accommodate that.8So, we check if the target path was loaded, disregarding the source staging dir.9Additionally, this will check that metadata was loaded properly (including links) by pull the entity_id, version and10content from the files in GS and checking that the expected row is present in the given dataset. If a newer version11is present in the repo than is staged, we consider that valid.12Example invocation:13python verify_release_manifest.py -f testing.csv -g fake-gs-project -b fake-bq-project -d fake-dataset14"""15import argparse16import json17import logging18import sys19from dataclasses import dataclass20from datetime import datetime21from collections import defaultdict22from functools import partial23from multiprocessing import Pool24from typing import Tuple25from urllib.parse import urlparse26from dateutil import parser27from google.cloud import bigquery, storage28from google.cloud.storage.client import Client29from dagster_utils.contrib.google import get_credentials30from hca_orchestration.solids.load_hca.data_files.load_data_metadata_files import FileMetadataTypes31from hca_orchestration.solids.load_hca.non_file_metadata.load_non_file_metadata import NonFileMetadataTypes32from hca_orchestration.support.dates import parse_version_to_datetime33logging.basicConfig(level=logging.INFO, format='%(message)s')34@dataclass(frozen=True)35class PathWithCrc:36 path: str37 crc32c: str38@dataclass(frozen=True)39class StagingAreaVerificationResult:40 has_metadata_errors: bool41 has_file_errors: bool42 def has_errors(self) -> bool:43 return self.has_metadata_errors or self.has_file_errors44def get_staging_area_file_descriptors(storage_client: Client, staging_areas: set[str]) -> dict[str, set[PathWithCrc]]:45 """46 Given a set of GS staging areas, return the downloaded descriptors present in each area47 """48 expected: dict[str, set[PathWithCrc]] = defaultdict(set[PathWithCrc])49 for staging_area in staging_areas:50 url = urlparse(staging_area)51 for file_type in FileMetadataTypes:52 prefix = f"{url.path.lstrip('/')}/descriptors/{file_type.value}"53 blobs = list(storage_client.list_blobs(url.netloc, prefix=prefix))54 for blob in blobs:55 parsed = json.loads(blob.download_as_text())56 path_with_crc = PathWithCrc(target_path_from_descriptor(parsed), parsed["crc32c"])57 expected[staging_area].add(path_with_crc)58 return expected59def target_path_from_descriptor(descriptor: dict[str, str]) -> str:60 return f"/v1/{descriptor['file_id']}/{descriptor['crc32c']}/{descriptor['file_name']}"61def find_files_in_load_history(bq_project: str, dataset: str,62 areas: dict[str, set[PathWithCrc]]) -> dict[str, set[PathWithCrc]]:63 client = bigquery.Client(project=bq_project)64 loaded_paths = {}65 for area, paths_with_crc in areas.items():66 logging.debug(f"\tPulling loaded files for area {area}...")67 target_paths = [path_with_crc.path for path_with_crc in paths_with_crc]68 query = f"""69 SELECT target_path, checksum_crc32c70 FROM `datarepo_{dataset}.datarepo_load_history` 
dlh71 WHERE state = 'succeeded'72 AND target_path IN UNNEST(@paths)73 """74 job_config = bigquery.QueryJobConfig(75 query_parameters=[76 bigquery.ArrayQueryParameter("paths", "STRING", target_paths),77 ]78 )79 query_job = client.query(query, job_config=job_config)80 loaded_paths[area] = {PathWithCrc(row["target_path"], row["checksum_crc32c"]) for row in81 query_job}82 return loaded_paths83def parse_manifest_file(manifest_file: str) -> list[str]:84 with open(manifest_file) as manifest:85 # some of the staging areas submitted via the form need slight cleanup86 return [area.rstrip('\n/').strip() for area in manifest]87def process_staging_area(area: str, gs_project: str, bq_project: str, dataset: str,88 release_cutoff: datetime) -> StagingAreaVerificationResult:89 logging.info(f"Processing staging area = {area}")90 creds = get_credentials()91 storage_client = storage.Client(project=gs_project, credentials=creds)92 expected_loaded_paths = get_staging_area_file_descriptors(storage_client, {area})93 loaded_paths_by_staging_area = find_files_in_load_history(bq_project, dataset, expected_loaded_paths)94 has_file_error = False95 for area, paths_with_crc in expected_loaded_paths.items():96 load_paths_for_staging_area = loaded_paths_by_staging_area[area]97 diff = paths_with_crc - load_paths_for_staging_area98 loaded = len(load_paths_for_staging_area)99 staged = len(paths_with_crc)100 if diff:101 logging.warning(102 f"❌ area = {area} - (data files) Mismatched loaded paths; expected files loaded = {staged}, actual loaded = {loaded}"103 )104 logging.debug(diff)105 has_file_error = True106 else:107 logging.info(108 f"✅ area = {area} - (data files) expected files loaded = {staged}, actual loaded = {loaded}"109 )110 has_metadata_error = verify_metadata(area, bq_project, dataset, release_cutoff)111 return StagingAreaVerificationResult(has_metadata_error, has_file_error)112def inspect_entities_at_path(storage_client: Client, bq_client: bigquery.Client, bq_project: str,113 bq_dataset: str, staging_area: str, prefix: str, entity_type: str,114 release_cutoff: datetime) -> bool:115 metadata_entities: dict[str, Tuple[str, str]] = {}116 url = urlparse(staging_area)117 if prefix:118 prefix = f"{url.path.lstrip('/')}/{prefix}/{entity_type}"119 else:120 prefix = f"{url.path.lstrip('/')}/{entity_type}"121 blobs = list(storage_client.list_blobs(url.netloc, prefix=prefix))122 for blob in blobs:123 content = blob.download_as_text()124 file_name = blob.name.split('/')[-1]125 entity_id = file_name.split('_')[0]126 version = file_name.split('_')[1].replace('.json', '')127 # files may be staged after we import, guard against those versions being present128 version_timestamp = parse_version_to_datetime(version)129 if version_timestamp > release_cutoff:130 logging.info(f"Ignoring file {file_name} staged after cutoff")131 continue132 # multiple versions may be staged, the latest one should win133 if entity_id in metadata_entities:134 existing_version, _ = metadata_entities[entity_id]135 if existing_version >= version:136 continue137 metadata_entities[entity_id] = (version, content)138 if len(metadata_entities) == 0:139 if entity_type == 'links':140 logging.debug(f"area = {staging_area} no links data found")141 return False142 logging.debug(f"️area = {staging_area} No metadata for {entity_type} expected, skipping")143 return False144 logging.debug(f"Querying for metadata entities of type {entity_type} [area={staging_area}]")145 entity_ids = metadata_entities.keys()146 query = f"""147 SELECT {entity_type}_id, content, version FROM 
`{bq_project}.datarepo_{bq_dataset}.{entity_type}`148 WHERE {entity_type}_id IN UNNEST(@entity_ids)149 """150 job_config = bigquery.QueryJobConfig(151 query_parameters=[152 bigquery.ArrayQueryParameter("entity_ids", "STRING", entity_ids),153 ]154 )155 query_job = bq_client.query(query, job_config=job_config)156 rows = {row[f'{entity_type}_id']: (row['version'], row['content']) for row in query_job.result()}157 has_error = False158 for key, (version, content) in metadata_entities.items():159 if key not in rows.keys():160 logging.info(f"❌ area = {staging_area} {entity_type} ID {key} not in table")161 return True162 row = rows[key]163 parsed_version = parser.parse(version)164 if parsed_version < row[0]:165 # old version staged but a newer version was present, ignore166 logging.debug(167 f"Newer version of entity present in repo, ignoring. [area={staging_area}, entity_type={entity_type}, id={key}]"168 )169 continue170 if not parser.parse(version) == row[0]:171 has_error = True172 logging.info(f"❌ area = {staging_area} {entity_type} ID {key} version is incorrect")173 if not json.loads(content) == json.loads(row[1]):174 has_error = True175 logging.info(f"❌ area = {staging_area} {entity_type} ID {key} content is incorrect")176 logging.debug(177 f"✅ area = {staging_area} - (metadata) all {entity_type} entities found ({len(metadata_entities.keys())} entities)")178 return has_error179def verify_metadata(staging_area: str, bq_project: str, bq_dataset: str, release_cutoff: datetime) -> bool:180 creds = get_credentials()181 storage_client = storage.Client(project="broad-dsp-monster-hca-prod", credentials=creds)182 client = bigquery.Client(project=bq_project)183 logging.debug(f"Verifying metadata for {staging_area}")184 links_errors = inspect_entities_at_path(185 storage_client,186 client,187 bq_project,188 bq_dataset,189 staging_area,190 "",191 "links",192 release_cutoff193 )194 non_file_metadata_errors = [195 inspect_entities_at_path(196 storage_client,197 client,198 bq_project,199 bq_dataset,200 staging_area,201 "metadata",202 non_file_metadata_type.value,203 release_cutoff204 ) for non_file_metadata_type in205 NonFileMetadataTypes]206 file_metadata_errors = [207 inspect_entities_at_path(208 storage_client,209 client, bq_project,210 bq_dataset,211 staging_area,212 "metadata",213 file_metadata_type.value,214 release_cutoff215 ) for file_metadata_type in FileMetadataTypes]216 return any(file_metadata_errors) or any(non_file_metadata_errors) or links_errors217def verify(manifest_file: str, gs_project: str, bq_project: str,218 dataset: str, pool_size: int, release_cutoff: str) -> int:219 staging_areas = parse_manifest_file(manifest_file)220 parsed_cutoff = datetime.fromisoformat(release_cutoff)221 logging.info("Parsing manifest...")222 logging.info(f"Release cutoff = {release_cutoff}")223 logging.info(f"{len(staging_areas)} staging areas in manifest.")224 logging.info(f"Inspecting staging areas (pool_size = {pool_size})...")225 # we multiprocess because this takes quite awhile for > 10 projects, which is common for our releases226 frozen = partial(227 process_staging_area,228 gs_project=gs_project,229 bq_project=bq_project,230 dataset=dataset,231 release_cutoff=parsed_cutoff)232 if pool_size > 0:233 with Pool(pool_size) as p:234 results = p.map(frozen, staging_areas)235 else:236 results = [frozen(area) for area in staging_areas]237 logging.info('-' * 80)238 if any(map(lambda x: x.has_errors(), results)):239 logging.error(f"❌ Manifest {manifest_file} had errors")240 return 1241 else:242 logging.info(f"✅ 
Manifest {manifest_file} had no errors")243 return 0244if __name__ == '__main__':245 argparser = argparse.ArgumentParser()246 argparser.add_argument("-f", "--manifest-file", required=True)247 argparser.add_argument("-g", "--gs-project", required=True)248 argparser.add_argument("-b", "--bq-project", required=True)249 argparser.add_argument("-d", "--dataset", required=True)250 argparser.add_argument("-p", "--pool-size", type=int, default=4)251 argparser.add_argument("-r", "--release-cutoff", required=True)252 args = argparser.parse_args()253 exit_code = verify(254 args.manifest_file,255 args.gs_project,256 args.bq_project,257 args.dataset,258 args.pool_size,259 args.release_cutoff)...
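
The script above never recomputes checksums itself: it trusts the crc32c value in each staging-area descriptor and compares it, together with the target path, against the checksum_crc32c and target_path columns of datarepo_load_history. If you want to spot-check a descriptor value locally, the sketch below shows one way to do it. It is not part of verify_release_manifest.py, assumes the google-crc32c package, and uses a hypothetical file name.

# Illustrative only; recomputes a file's CRC32C in the same base64, big-endian
# form stored in staging-area descriptors and in the checksum_crc32c column.
# Assumes the google-crc32c package is installed.
import base64

import google_crc32c


def crc32c_base64(data: bytes) -> str:
    checksum = google_crc32c.Checksum()
    checksum.update(data)
    # digest() yields the 4-byte big-endian CRC32C; base64-encode it to match.
    return base64.b64encode(checksum.digest()).decode()


with open("SRR0000000.fastq.gz", "rb") as f:  # hypothetical data file
    print(crc32c_base64(f.read()))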


strings.py

Source: strings.py (GitHub)


...
def checksum_crc32(string: Union[str, bytes]) -> str:
    bytes = to_bytes(string)
    checksum = zlib.crc32(bytes)
    return base64.b64encode(checksum.to_bytes(4, "big")).decode()


def checksum_crc32c(string: Union[str, bytes]):
    checksum = CrtCrc32cChecksum()
    checksum.update(to_bytes(string))
    return base64.b64encode(checksum.digest()).decode()


def hash_sha1(string: Union[str, bytes]) -> str:
    digest = hashlib.sha1(to_bytes(string)).digest()
    return base64.b64encode(digest).decode()


def hash_sha256(string: Union[str, bytes]) -> str:
    digest = hashlib.sha256(to_bytes(string)).digest()
    return base64.b64encode(digest).decode()


def base64_to_hex(b64_string: str) -> bytes:
    return binascii.hexlify(base64.b64decode(b64_string))


def base64_decode(data: Union[str, bytes]) -> bytes:
    """Decode base64 data - with optional padding, and able to handle urlsafe encoding (containing -/_)."""
    data = to_str(data)
...
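
Although they look alike, checksum_crc32 and checksum_crc32c use different polynomials: the former relies on zlib's CRC-32, the latter on the Castagnoli CRC-32C computed by the CRT-backed checksum class imported above the excerpt. The small usage sketch below decodes the base64 results back into integers to make the difference visible; it again assumes the helpers are importable from localstack.utils.strings.

# Usage sketch; decodes the base64 results back into integers to show that the
# two algorithms produce different values for the same input.
import base64

from localstack.utils.strings import checksum_crc32, checksum_crc32c  # assumed import path

data = b"The quick brown fox jumps over the lazy dog"

crc32_value = int.from_bytes(base64.b64decode(checksum_crc32(data)), "big")
crc32c_value = int.from_bytes(base64.b64decode(checksum_crc32c(data)), "big")

print(f"crc32  = {crc32_value:#010x}")
print(f"crc32c = {crc32c_value:#010x}")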


load_data_metadata_files.py

Source: load_data_metadata_files.py (GitHub)


from enum import Enum

from dagster import (
    DynamicOutputDefinition,
    Failure,
    Optional,
    composite_solid,
    configured,
    solid,
)
from dagster.core.execution.context.compute import (
    AbstractComputeExecutionContext,
)
from google.cloud.bigquery.client import RowIterator

from hca_manage.common import JobId
from hca_orchestration.contrib.bigquery import BigQueryService
from hca_orchestration.models.hca_dataset import TdrDataset
from hca_orchestration.resources.config.scratch import ScratchConfig
from hca_orchestration.solids.load_hca.ingest_metadata_type import (
    ingest_metadata_type,
)
from hca_orchestration.solids.load_hca.load_table import (
    export_data,
    load_table_solid,
)
from hca_orchestration.support.typing import (
    HcaScratchDatasetName,
    MetadataType,
    MetadataTypeFanoutResult,
)


class FileMetadataTypes(Enum):
    """
    This Enum captures MetadataTypes that are directly describing a file type in the HCA
    """
    ANALYSIS_FILE = MetadataType('analysis_file')
    IMAGE_FILE = MetadataType('image_file')
    REFERENCE_FILE = MetadataType('reference_file')
    SEQUENCE_FILE = MetadataType('sequence_file')
    SUPPLEMENTARY_FILE = MetadataType('supplementary_file')


ingest_file_metadata_type = configured(ingest_metadata_type, name="ingest_file_metadata_type")(
    {"metadata_types": FileMetadataTypes, "prefix": "file-metadata-with-ids"})


class NullFileIdException(Failure):
    pass


def _inject_file_ids(
        target_hca_dataset: TdrDataset,
        scratch_config: ScratchConfig,
        file_metadata_type: str,
        scratch_dataset_name: HcaScratchDatasetName,
        bigquery_service: BigQueryService
) -> RowIterator:
    fq_dataset_id = target_hca_dataset.fully_qualified_jade_dataset_name()
    query = f"""
    SELECT S.{file_metadata_type}_id, S.version, J.file_id, S.content, S.descriptor
    FROM {file_metadata_type} S LEFT JOIN `{target_hca_dataset.project_id}.{fq_dataset_id}.datarepo_load_history` J
    ON J.state = 'succeeded'
    AND JSON_EXTRACT_SCALAR(S.descriptor, '$.crc32c') = J.checksum_crc32c
    AND '/v1/' || JSON_EXTRACT_SCALAR(S.descriptor, '$.file_id') || '/' || JSON_EXTRACT_SCALAR(S.descriptor, '$.crc32c') || '/' || JSON_EXTRACT_SCALAR(S.descriptor, '$.file_name') = J.target_path
    """
    destination_table_name = f"{file_metadata_type}_with_ids"
    source_path = f"{scratch_config.scratch_area()}/metadata/{file_metadata_type}/*"
    rows = bigquery_service.run_query_using_external_schema(
        query,
        source_paths=[source_path],
        schema=[
            {
                "mode": "REQUIRED",
                "name": f"{file_metadata_type}_id",
                "type": "STRING"
            },
            {
                "mode": "REQUIRED",
                "name": "version",
                "type": "TIMESTAMP"
            },
            {
                "mode": "REQUIRED",
                "name": "content",
                "type": "STRING"
            },
            {
                "mode": "REQUIRED",
                "name": "crc32c",
                "type": "STRING"
            },
            {
                "mode": "REQUIRED",
                "name": "descriptor",
                "type": "STRING"
            }
        ],
        table_name=file_metadata_type,
        destination=f"{scratch_dataset_name}.{destination_table_name}",
        bigquery_project=scratch_config.scratch_bq_project,
        location=target_hca_dataset.bq_location
    )
    for row in rows:
        # check for rows with no file_id
        # we allow null file_ids in the case where there is a `drs_uri` in the file descriptor (i.e., the file is
        # hosted in a repo external to TDR but we still want to expose the file metadata)
        if not row["file_id"] and "drs_uri" not in row["descriptor"]:
            raise NullFileIdException(
                f"File metadata with null file ID detected, will not ingest. Check crc32c and target_path [table={file_metadata_type}]")
    return rows


@solid(
    required_resource_keys={"bigquery_service", "target_hca_dataset", "scratch_config", "data_repo_client"}
)
def inject_file_ids_solid(
        context: AbstractComputeExecutionContext,
        file_metadata_fanout_result: MetadataTypeFanoutResult
) -> MetadataTypeFanoutResult:
    bigquery_service = context.resources.bigquery_service
    target_hca_dataset = context.resources.target_hca_dataset
    scratch_config = context.resources.scratch_config
    file_metadata_type = file_metadata_fanout_result.metadata_type
    scratch_dataset_name = file_metadata_fanout_result.scratch_dataset_name
    _inject_file_ids(
        target_hca_dataset=target_hca_dataset,
        scratch_config=scratch_config,
        file_metadata_type=file_metadata_fanout_result.metadata_type,
        scratch_dataset_name=file_metadata_fanout_result.scratch_dataset_name,
        bigquery_service=bigquery_service
    )
    export_data(
        "file-metadata-with-ids",
        table_name_extension="_with_ids",
        metadata_type=file_metadata_type,
        scratch_config=scratch_config,
        scratch_dataset_name=scratch_dataset_name,
        bigquery_service=bigquery_service
    )
    return file_metadata_fanout_result


@composite_solid(
    output_defs=[DynamicOutputDefinition(dagster_type=Optional[JobId])]
)
def file_metadata_fanout(result: list[JobId], scratch_dataset_name: HcaScratchDatasetName) -> Optional[JobId]:
    results = ingest_file_metadata_type(result, scratch_dataset_name).map(inject_file_ids_solid)
...
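
The join in _inject_file_ids matches staged descriptors to datarepo_load_history rows on two keys: the descriptor's crc32c value against checksum_crc32c, and a target path built as /v1/<file_id>/<crc32c>/<file_name> against target_path. The sketch below is not from the pipeline and uses made-up descriptor values; it simply shows the same keys being built in Python.

# Illustrative sketch with hypothetical values; mirrors the SQL join keys above.
import json

descriptor_json = json.dumps({
    "file_id": "1f2e3d4c",             # hypothetical
    "crc32c": "yZRlqg==",              # hypothetical base64 CRC32C
    "file_name": "reads_R1.fastq.gz",  # hypothetical
})

descriptor = json.loads(descriptor_json)

# Equivalent of JSON_EXTRACT_SCALAR(S.descriptor, '$.crc32c') = J.checksum_crc32c
expected_checksum_crc32c = descriptor["crc32c"]

# Equivalent of the concatenated '/v1/...' expression compared against J.target_path
expected_target_path = f"/v1/{descriptor['file_id']}/{descriptor['crc32c']}/{descriptor['file_name']}"

print(expected_checksum_crc32c, expected_target_path)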


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

