How to use _context_id method in autotest

Best Python code snippet using autotest_python

context.py

Source:context.py Github

copy

Full Screen

1# Copyright 2016 Intel Corporation2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ------------------------------------------------------------------------------15from sawtooth_sdk.protobuf.validator_pb2 import Message16from sawtooth_sdk.protobuf import state_context_pb217from sawtooth_sdk.protobuf import events_pb218from sawtooth_sdk.processor.exceptions import InternalError19from sawtooth_sdk.processor.exceptions import AuthorizationException20class Context(object):21 """22 Context provides an interface for getting, setting, and deleting23 validator state. All validator interactions by a handler should be24 through a Context instance.25 Attributes:26 _stream (sawtooth.client.stream.Stream): client grpc communication27 _context_id (str): the context_id passed in from the validator28 """29 def __init__(self, stream, context_id):30 self._stream = stream31 self._context_id = context_id32 def get_state(self, addresses, timeout=None):33 """34 get_state queries the validator state for data at each of the35 addresses in the given list. The addresses that have been set36 are returned in a list.37 Args:38 addresses (list): the addresses to fetch39 timeout: optional timeout, in seconds40 Returns:41 results (list): a list of Entries (address, data), for the42 addresses that have a value43 Raises:44 AuthorizationException45 """46 request = state_context_pb2.TpStateGetRequest(47 context_id=self._context_id,48 addresses=addresses)49 response_string = self._stream.send(50 Message.TP_STATE_GET_REQUEST,51 request.SerializeToString()).result(timeout).content52 response = state_context_pb2.TpStateGetResponse()53 response.ParseFromString(response_string)54 if response.status == \55 state_context_pb2.TpStateGetResponse.AUTHORIZATION_ERROR:56 raise AuthorizationException(57 'Tried to get unauthorized address: {}'.format(addresses))58 entries = response.entries if response is not None else []59 results = [e for e in entries if len(e.data) != 0]60 return results61 def set_state(self, entries, timeout=None):62 """63 set_state requests that each address in the provided dictionary be64 set in validator state to its corresponding value. A list is65 returned containing the successfully set addresses.66 Args:67 entries (dict): dictionary where addresses are the keys and data is68 the value.69 timeout: optional timeout, in seconds70 Returns:71 addresses (list): a list of addresses that were set72 Raises:73 AuthorizationException74 """75 state_entries = [state_context_pb2.TpStateEntry(76 address=e,77 data=entries[e]) for e in entries]78 request = state_context_pb2.TpStateSetRequest(79 entries=state_entries,80 context_id=self._context_id).SerializeToString()81 response = state_context_pb2.TpStateSetResponse()82 response.ParseFromString(83 self._stream.send(Message.TP_STATE_SET_REQUEST,84 request).result(timeout).content)85 if response.status == \86 state_context_pb2.TpStateSetResponse.AUTHORIZATION_ERROR:87 addresses = [e.address for e in entries]88 raise AuthorizationException(89 'Tried to set unauthorized address: {}'.format(addresses))90 return response.addresses91 def delete_state(self, addresses, timeout=None):92 """93 delete_state requests that each of the provided addresses be unset94 in validator state. A list of successfully deleted addresses95 is returned.96 Args:97 addresses (list): list of addresses to delete98 timeout: optional timeout, in seconds99 Returns:100 addresses (list): a list of addresses that were deleted101 Raises:102 AuthorizationException103 """104 request = state_context_pb2.TpStateDeleteRequest(105 context_id=self._context_id,106 addresses=addresses).SerializeToString()107 response = state_context_pb2.TpStateDeleteResponse()108 response.ParseFromString(109 self._stream.send(Message.TP_STATE_DELETE_REQUEST,110 request).result(timeout).content)111 if response.status == \112 state_context_pb2.TpStateDeleteResponse.AUTHORIZATION_ERROR:113 raise AuthorizationException(114 'Tried to delete unauthorized address: {}'.format(addresses))115 return response.addresses116 def add_receipt_data(self, data, timeout=None):117 """Add a blob to the execution result for this transaction.118 Args:119 data (bytes): The data to add.120 """121 request = state_context_pb2.TpReceiptAddDataRequest(122 context_id=self._context_id,123 data=data).SerializeToString()124 response = state_context_pb2.TpReceiptAddDataResponse()125 response.ParseFromString(126 self._stream.send(127 Message.TP_RECEIPT_ADD_DATA_REQUEST,128 request).result(timeout).content)129 if response.status == state_context_pb2.TpReceiptAddDataResponse.ERROR:130 raise InternalError(131 "Failed to add receipt data: {}".format((data)))132 def add_event(self, event_type, attributes=None, data=None, timeout=None):133 """Add a new event to the execution result for this transaction.134 Args:135 event_type (str): This is used to subscribe to events. It should be136 globally unique and describe what, in general, has occured.137 attributes (list of (str, str) tuples): Additional information138 about the event that is transparent to the validator.139 Attributes can be used by subscribers to filter the type of140 events they receive.141 data (bytes): Additional information about the event that is opaque142 to the validator.143 """144 if attributes is None:145 attributes = []146 event = events_pb2.Event(147 event_type=event_type,148 attributes=[149 events_pb2.Event.Attribute(key=key, value=value)150 for key, value in attributes151 ],152 data=data,153 )154 request = state_context_pb2.TpEventAddRequest(155 context_id=self._context_id, event=event).SerializeToString()156 response = state_context_pb2.TpEventAddResponse()157 response.ParseFromString(158 self._stream.send(159 Message.TP_EVENT_ADD_REQUEST,160 request).result(timeout).content)161 if response.status == state_context_pb2.TpEventAddResponse.ERROR:162 raise InternalError(163 "Failed to add event: ({}, {}, {})".format(...

Full Screen

Full Screen

task.py

Source:task.py Github

copy

Full Screen

...108 _context_id109 except NameError, e:110 # this is expected to happen, it's normal behaviour.111 logger.info(e) # global name '_context_id' is not defined112 _context_id = bauble.gui.widgets.statusbar.get_context_id('__task')113 logger.info("new context id: %s" % _context_id)114 msg_id = bauble.gui.widgets.statusbar.push(_context_id, msg)115 __message_ids.append(msg_id)116 return msg_id117def clear_messages():118 """119 Clear all the messages from the statusbar that were set with120 :func:`bauble.task.set_message`121 """122 if bauble.gui is None or bauble.gui.widgets is None \123 or bauble.gui.widgets.statusbar is None:124 return125 global _context_id, __message_ids126 for mid in __message_ids:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful