How to use hash_sha256 method in localstack

Best Python code snippet using localstack_python

general.py

Source:general.py Github

copy

Full Screen

1import subprocess2import socket3import json4import os5import errno6from datetime import datetime7from lib.sample import SampleMeta8class DateTimeEncoder(json.JSONEncoder):9 def default(self, o):10 if isinstance(o, datetime):11 return o.isoformat()12 return json.JSONEncoder.default(self, o)13class KurasutaApi(object):14 def __init__(self, base_url):15 self.base_url = base_url16 def get_sha256_url(self, hash_sha256):17 return '%s/sha256/%s' % (self.base_url, hash_sha256)18 def get_task_url(self):19 return '%s/task' % self.base_url20 def get_user_agent(self):21 return 'Kurasuta Worker (%s-%s)' % (KurasutaSystem.get_host(), KurasutaSystem.git_revision())22class KurasutaSystem(object):23 def __init__(self, storage):24 if not storage:25 raise Exception('KURASUTA_STORAGE location "%s" missing' % storage)26 if not storage:27 raise Exception('KURASUTA_STORAGE location "%s" is not a directory' % storage)28 self.storage = storage29 @staticmethod30 def get_host():31 return socket.gethostname()32 @staticmethod33 def git_revision():34 return subprocess.check_output([35 'git', '-C', os.path.dirname(os.path.realpath(__file__)),36 'rev-parse', '--short', 'HEAD'37 ]).strip().decode('utf-8')38 def get_hash_dir(self, hash_sha256):39 return os.path.join(self.storage, hash_sha256[0], hash_sha256[1], hash_sha256[2])40 @staticmethod41 def mkdir_p(path):42 try:43 os.makedirs(path)44 except OSError as e:45 if e.errno == errno.EEXIST and os.path.isdir(path):46 pass47 else:48 raise49class KurasutaDatabase(object):50 def __init__(self, connection):51 self.connection = connection52 def delete_sample(self, hash_sha256):53 with self.connection.cursor() as cursor:54 cursor.execute('SELECT id FROM sample WHERE (hash_sha256 = %s)', (hash_sha256,))55 sample_id = cursor.fetchone()[0]56 cursor.execute('DELETE FROM sample_has_peyd WHERE (sample_id = %s)', (sample_id,))57 cursor.execute('DELETE FROM sample_has_heuristic_ioc WHERE (sample_id = %s)', (sample_id,))58 cursor.execute('DELETE FROM debug_directory WHERE (sample_id = %s)', (sample_id,))59 cursor.execute('DELETE FROM section WHERE (sample_id = %s)', (sample_id,))60 cursor.execute('DELETE FROM resource WHERE (sample_id = %s)', (sample_id,))61 cursor.execute('DELETE FROM guid WHERE (sample_id = %s)', (sample_id,))62 cursor.execute('DELETE FROM export_symbol WHERE (sample_id = %s)', (sample_id,))63 cursor.execute('DELETE FROM import WHERE (sample_id = %s)', (sample_id,))64 cursor.execute('DELETE FROM sample_function WHERE (sample_id = %s)', (sample_id,))65 cursor.execute('DELETE FROM sample_has_file_name WHERE (sample_id = %s)', (sample_id,))66 cursor.execute('DELETE FROM sample_has_tag WHERE (sample_id = %s)', (sample_id,))67 cursor.execute('DELETE FROM sample_has_source WHERE (sample_id = %s)', (sample_id,))68 cursor.execute('DELETE FROM sample WHERE (id = %s)', (sample_id,))69 def ensure_row(self, table, field, value):70 select_sql = 'SELECT id FROM %s WHERE %s = %%s' % (table, field)71 insert_sql = 'INSERT INTO %s (%s) VALUES(%%s) RETURNING id' % (table, field)72 with self.connection.cursor() as cursor:73 cursor.execute(select_sql, (value,))74 result = cursor.fetchone()75 if not result:76 with self.connection.cursor() as cursor:77 cursor.execute(insert_sql, (value,))78 result = cursor.fetchone()79 return result[0]80 def ensure_resource_pair(self, pair_name, content_id, content_str):81 select_sql = 'SELECT id FROM resource_%s_pair WHERE (content_id = %%s) AND (content_str = %%s)' % (pair_name,)82 insert_sql = 'INSERT INTO resource_%s_pair (content_id, content_str) VALUES(%%s) RETURNING id' % (pair_name,)83 with self.connection.cursor() as cursor:84 cursor.execute(select_sql, (content_id, content_str))85 result = cursor.fetchone()86 if not result:87 with self.connection.cursor() as cursor:88 cursor.execute(insert_sql, (content_id, content_str))89 result = cursor.fetchone()90 return result[0]91 def create_task(self, task_type, hash_sha256, meta=None):92 """93 :type task_type: str94 :type hash_sha256: str95 :type meta: SampleMeta96 """97 from psycopg2.extras import Json98 payload = meta.to_dict() if meta else {}99 payload['hash_sha256'] = hash_sha256100 with self.connection.cursor() as cursor:101 cursor.execute('INSERT INTO task ("type", payload) VALUES(%s, %s)', (task_type, Json(payload)))102class SampleSourceRepository(object):103 def __init__(self, connection):104 with connection.cursor() as cursor:105 cursor.execute('SELECT identifier, id FROM sample_source')106 self.cache = dict(cursor.fetchall())107 def get_by_identifier(self, identifier):108 if identifier not in self.cache.keys():109 raise Exception('sample source identifier "%s" not found' % identifier)...

Full Screen

Full Screen

test_plugins.py

Source:test_plugins.py Github

copy

Full Screen

...18 plugin_ctx = PluginsTrackingContext.from_block(cmd)19 assert plugin_ctx.schema == PluginsContextSchema.url20 assert len(plugin_ctx.data.get("plugins")) == 121 plugin = plugin_ctx.data.get("plugins")[0]22 assert plugin.get("name_hash") == hash_sha256(dbt.name)23 assert plugin.get("namespace_hash") == hash_sha256(dbt.namespace)24 assert plugin.get("executable_hash") == hash_sha256(dbt.executable)25 assert plugin.get("variant_name_hash") == hash_sha256(dbt.variant)26 assert plugin.get("pip_url_hash") == hash_sha256(dbt.formatted_pip_url)27 assert plugin.get("parent_name_hash") == hash_sha256(dbt.parent.name)28 assert plugin.get("command") == "test"29 def test_plugins_tracking_context(self, tap: ProjectPlugin, dbt: ProjectPlugin):30 plugin_ctx = PluginsTrackingContext([(tap, None), (dbt, "test")])31 assert plugin_ctx.schema == PluginsContextSchema.url32 assert len(plugin_ctx.data.get("plugins")) == 233 for plugin in plugin_ctx.data.get("plugins"):34 if plugin.get("category") == "extractors":35 assert plugin.get("name_hash") == hash_sha256(tap.name)36 assert plugin.get("namespace_hash") == hash_sha256(tap.namespace)37 assert plugin.get("executable_hash") == hash_sha256(tap.executable)38 assert plugin.get("variant_name_hash") == hash_sha256(tap.variant)39 assert plugin.get("pip_url_hash") == hash_sha256(tap.formatted_pip_url)40 assert plugin.get("parent_name_hash") == hash_sha256(tap.parent.name)41 assert plugin.get("command") is None42 elif plugin.get("category") == "transformers":43 assert plugin.get("name_hash") == hash_sha256(dbt.name)44 assert plugin.get("namespace_hash") == hash_sha256(dbt.namespace)45 assert plugin.get("executable_hash") == hash_sha256(dbt.executable)46 assert plugin.get("variant_name_hash") == hash_sha256(dbt.variant)47 assert plugin.get("pip_url_hash") == hash_sha256(dbt.formatted_pip_url)48 assert plugin.get("parent_name_hash") == hash_sha256(dbt.parent.name)49 assert plugin.get("command") == "test"50 # verify that passing a None object results in an empty plugin context.51 plugin_ctx = PluginsTrackingContext([(None, None)])52 assert plugin_ctx.data.get("plugins") == [{}]53 # verify that passing a plugin with no parent does not result in an error.54 # most likely this is a plugin that is not installed and is being removed or somehow referenced.55 tap.parent = None56 plugin_ctx = PluginsTrackingContext([(tap, None)])57 assert len(plugin_ctx.data.get("plugins")) == 158 plugin_with_no_parent = plugin_ctx.data.get("plugins")[0]59 assert plugin_with_no_parent.get("name_hash") == hash_sha256(tap.name)...

Full Screen

Full Screen

tools.py

Source:tools.py Github

copy

Full Screen

...16 x = {0, 1} * 25617 """18 z = int(str(z, encoding='utf-8'), 16)19 z = hex(z >> 2)[2:].encode('utf-8')20 return hash_sha256(x, z)21def prf_sn(sk: bytes, p: bytes):22 """23 sha256(sk||01||p)24 p = {0, 1} * 256 # random25 sk = {0, 1} * 256 # secret key26 """27 p = int(p.decode('utf-8'), 16)28 p = hex((p >> 2) | 1 << 254)[2:].encode('utf-8')29 return hash_sha256(sk, p)30def prf_pk(sk: bytes, pk_sig: bytes):31 """32 sha256(sk||10||256)33 sk = {0, 1} * 25634 pk_sig = {0, 1} * 25435 """36 pk_sig = int(pk_sig.decode('utf-8'), 16)37 pk_sig = hex((pk_sig >> 2) | 1 << 255)[2:].encode('utf-8')38 return hash_sha256(sk, pk_sig)39def comm_r(r:bytes, a_pk, p) -> str:40 """41 input:42 r = {0, 1} * (256 + 128)43 a_pk = {0, 1} * 25644 p = {0, 1} * 25645 output:46 str47 """48 str_h = hash_sha256(a_pk, p)[:128//4]49 return hash_sha256(r, bytes(str_h, encoding='utf-8'))50def comm_s(v:int, k) -> str:51 """52 input:53 k = {0, 1} * 25654 output:55 str56 """57 if isinstance(v, str):58 v = int(v)59 if isinstance(v, int):60 v = bytes(str(v), encoding='utf-8').zfill(64//4)61 return hash_sha256(k, b'0' * (192//4), v)62def CRH(*args):63 return hash_sha256(*args)64def hash_sha256(*args) -> str:65 """66 maps a 512-bit input to 256-bit output67 input: bytes68 output: str69 """70 msg = hashlib.sha256()71 # caoncat input to 512-bit input72 input = b''73 for v in args:74 if isinstance(v, str):75 v = bytes(v, encoding='utf-8')76 input += v77 msg.update(input)78 return msg.hexdigest()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful