How to use not_implemented method in Robotframework

Best Python code snippet using robotframework

Run Robotframework automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

defaults.py

Source: defaults.py Github

copy
1import operator
2import random
3import sys
4
5from eth_tester.exceptions import (
6    BlockNotFound,
7    FilterNotFound,
8    TransactionNotFound,
9    ValidationError,
10)
11from client_sdk_python.packages.eth_utils import (
12    decode_hex,
13    encode_hex,
14    is_null,
15    keccak,
16)
17
18from client_sdk_python.utils.formatters import (
19    apply_formatter_if,
20)
21from client_sdk_python.utils.toolz import (
22    compose,
23    curry,
24    excepts,
25)
26
27
28def not_implemented(*args, **kwargs):
29    raise NotImplementedError("RPC method not implemented")
30
31
32@curry
33def call_eth_tester(fn_name, eth_tester, fn_args, fn_kwargs=None):
34    if fn_kwargs is None:
35        fn_kwargs = {}
36    return getattr(eth_tester, fn_name)(*fn_args, **fn_kwargs)
37
38
39def without_eth_tester(fn):
40    # workaround for: https://github.com/pytoolz/cytoolz/issues/103
41    # @functools.wraps(fn)
42    def inner(eth_tester, params):
43        return fn(params)
44    return inner
45
46
47def without_params(fn):
48    # workaround for: https://github.com/pytoolz/cytoolz/issues/103
49    # @functools.wraps(fn)
50    def inner(eth_tester, params):
51        return fn(eth_tester)
52    return inner
53
54
55@curry
56def preprocess_params(eth_tester, params, preprocessor_fn):
57    return eth_tester, preprocessor_fn(params)
58
59
60def static_return(value):
61    def inner(*args, **kwargs):
62        return value
63    return inner
64
65
66def client_version(eth_tester, params):
67    # TODO: account for the backend that is in use.
68    from eth_tester import __version__
69    return "EthereumTester/{version}/{platform}/python{v.major}.{v.minor}.{v.micro}".format(
70        version=__version__,
71        v=sys.version_info,
72        platform=sys.platform,
73    )
74
75
76@curry
77def null_if_excepts(exc_type, fn):
78    return excepts(
79        exc_type,
80        fn,
81        static_return(None),
82    )
83
84
85null_if_block_not_found = null_if_excepts(BlockNotFound)
86null_if_transaction_not_found = null_if_excepts(TransactionNotFound)
87null_if_filter_not_found = null_if_excepts(FilterNotFound)
88null_if_indexerror = null_if_excepts(IndexError)
89
90
91@null_if_indexerror
92@null_if_block_not_found
93def get_transaction_by_block_hash_and_index(eth_tester, params):
94    block_hash, transaction_index = params
95    block = eth_tester.get_block_by_hash(block_hash, full_transactions=True)
96    transaction = block['transactions'][transaction_index]
97    return transaction
98
99
100@null_if_indexerror
101@null_if_block_not_found
102def get_transaction_by_block_number_and_index(eth_tester, params):
103    block_number, transaction_index = params
104    block = eth_tester.get_block_by_number(block_number, full_transactions=True)
105    transaction = block['transactions'][transaction_index]
106    return transaction
107
108
109def create_log_filter(eth_tester, params):
110    filter_params = params[0]
111    filter_id = eth_tester.create_log_filter(**filter_params)
112    return filter_id
113
114
115def get_logs(eth_tester, params):
116    filter_params = params[0]
117    logs = eth_tester.get_logs(**filter_params)
118    return logs
119
120
121def _generate_random_private_key():
122    """
123    WARNING: This is not a secure way to generate private keys and should only
124    be used for testing purposes.
125    """
126    return encode_hex(bytes(bytearray((
127        random.randint(0, 255)
128        for _ in range(32)
129    ))))
130
131
132@without_params
133def create_new_account(eth_tester):
134    return eth_tester.add_account(_generate_random_private_key())
135
136
137def personal_send_transaction(eth_tester, params):
138    transaction, password = params
139
140    try:
141        eth_tester.unlock_account(transaction['from'], password)
142        transaction_hash = eth_tester.send_transaction(transaction)
143    finally:
144        eth_tester.lock_account(transaction['from'])
145
146    return transaction_hash
147
148
149API_ENDPOINTS = {
150    'web3': {
151        'clientVersion': client_version,
152        'sha3': compose(
153            encode_hex,
154            keccak,
155            decode_hex,
156            without_eth_tester(operator.itemgetter(0)),
157        ),
158    },
159    'net': {
160        'version': not_implemented,
161        'peerCount': not_implemented,
162        'listening': not_implemented,
163    },
164    'eth': {
165        'protocolVersion': not_implemented,
166        'syncing': not_implemented,
167        'coinbase': compose(
168            operator.itemgetter(0),
169            call_eth_tester('get_accounts'),
170        ),
171        'mining': not_implemented,
172        'hashrate': not_implemented,
173        'gasPrice': not_implemented,
174        'accounts': call_eth_tester('get_accounts'),
175        'blockNumber': compose(
176            operator.itemgetter('number'),
177            call_eth_tester('get_block_by_number', fn_kwargs={'block_number': 'latest'}),
178        ),
179        'getBalance': call_eth_tester('get_balance'),
180        'getStorageAt': not_implemented,
181        'getTransactionCount': call_eth_tester('get_nonce'),
182        'getBlockTransactionCountByHash': null_if_block_not_found(compose(
183            len,
184            operator.itemgetter('transactions'),
185            call_eth_tester('get_block_by_hash'),
186        )),
187        'getBlockTransactionCountByNumber': null_if_block_not_found(compose(
188            len,
189            operator.itemgetter('transactions'),
190            call_eth_tester('get_block_by_number'),
191        )),
192        'getUncleCountByBlockHash': null_if_block_not_found(compose(
193            len,
194            operator.itemgetter('uncles'),
195            call_eth_tester('get_block_by_hash'),
196        )),
197        'getUncleCountByBlockNumber': null_if_block_not_found(compose(
198            len,
199            operator.itemgetter('uncles'),
200            call_eth_tester('get_block_by_number'),
201        )),
202        'getCode': call_eth_tester('get_code'),
203        'sign': not_implemented,
204        'sendTransaction': call_eth_tester('send_transaction'),
205        'sendRawTransaction': call_eth_tester('send_raw_transaction'),
206        'call': call_eth_tester('call'),  # TODO: untested
207        'estimateGas': call_eth_tester('estimate_gas'),  # TODO: untested
208        'getBlockByHash': null_if_block_not_found(call_eth_tester('get_block_by_hash')),
209        'getBlockByNumber': null_if_block_not_found(call_eth_tester('get_block_by_number')),
210        'getTransactionByHash': null_if_transaction_not_found(
211            call_eth_tester('get_transaction_by_hash')
212        ),
213        'getTransactionByBlockHashAndIndex': get_transaction_by_block_hash_and_index,
214        'getTransactionByBlockNumberAndIndex': get_transaction_by_block_number_and_index,
215        'getTransactionReceipt': null_if_transaction_not_found(compose(
216            apply_formatter_if(
217                compose(is_null, operator.itemgetter('block_number')),
218                static_return(None),
219            ),
220            call_eth_tester('get_transaction_receipt'),
221        )),
222        'getUncleByBlockHashAndIndex': not_implemented,
223        'getUncleByBlockNumberAndIndex': not_implemented,
224        'getCompilers': not_implemented,
225        'compileLLL': not_implemented,
226        'compileSolidity': not_implemented,
227        'compileSerpent': not_implemented,
228        'newFilter': create_log_filter,
229        'newBlockFilter': call_eth_tester('create_block_filter'),
230        'newPendingTransactionFilter': call_eth_tester('create_pending_transaction_filter'),
231        'uninstallFilter': excepts(
232            FilterNotFound,
233            compose(
234                is_null,
235                call_eth_tester('delete_filter'),
236            ),
237            static_return(False),
238        ),
239        'getFilterChanges': null_if_filter_not_found(call_eth_tester('get_only_filter_changes')),
240        'getFilterLogs': null_if_filter_not_found(call_eth_tester('get_all_filter_logs')),
241        'getLogs': get_logs,
242        'getWork': not_implemented,
243        'submitWork': not_implemented,
244        'submitHashrate': not_implemented,
245    },
246    'db': {
247        'putString': not_implemented,
248        'getString': not_implemented,
249        'putHex': not_implemented,
250        'getHex': not_implemented,
251    },
252    'shh': {
253        'post': not_implemented,
254        'version': not_implemented,
255        'newIdentity': not_implemented,
256        'hasIdentity': not_implemented,
257        'newGroup': not_implemented,
258        'addToGroup': not_implemented,
259        'newFilter': not_implemented,
260        'uninstallFilter': not_implemented,
261        'getFilterChanges': not_implemented,
262        'getMessages': not_implemented,
263    },
264    'admin': {
265        'addPeer': not_implemented,
266        'datadir': not_implemented,
267        'nodeInfo': not_implemented,
268        'peers': not_implemented,
269        'setSolc': not_implemented,
270        'startRPC': not_implemented,
271        'startWS': not_implemented,
272        'stopRPC': not_implemented,
273        'stopWS': not_implemented,
274    },
275    'debug': {
276        'backtraceAt': not_implemented,
277        'blockProfile': not_implemented,
278        'cpuProfile': not_implemented,
279        'dumpBlock': not_implemented,
280        'gtStats': not_implemented,
281        'getBlockRLP': not_implemented,
282        'goTrace': not_implemented,
283        'memStats': not_implemented,
284        'seedHashSign': not_implemented,
285        'setBlockProfileRate': not_implemented,
286        'setHead': not_implemented,
287        'stacks': not_implemented,
288        'startCPUProfile': not_implemented,
289        'startGoTrace': not_implemented,
290        'stopCPUProfile': not_implemented,
291        'stopGoTrace': not_implemented,
292        'traceBlock': not_implemented,
293        'traceBlockByNumber': not_implemented,
294        'traceBlockByHash': not_implemented,
295        'traceBlockFromFile': not_implemented,
296        'traceTransaction': not_implemented,
297        'verbosity': not_implemented,
298        'vmodule': not_implemented,
299        'writeBlockProfile': not_implemented,
300        'writeMemProfile': not_implemented,
301    },
302    'miner': {
303        'makeDAG': not_implemented,
304        'setExtra': not_implemented,
305        'setGasPrice': not_implemented,
306        'start': not_implemented,
307        'startAutoDAG': not_implemented,
308        'stop': not_implemented,
309        'stopAutoDAG': not_implemented,
310    },
311    'personal': {
312        'ecRecover': not_implemented,
313        'importRawKey': call_eth_tester('add_account'),
314        'listAccounts': call_eth_tester('get_accounts'),
315        'lockAccount': excepts(
316            ValidationError,
317            compose(static_return(True), call_eth_tester('lock_account')),
318            static_return(False),
319        ),
320        'newAccount': create_new_account,
321        'unlockAccount': excepts(
322            ValidationError,
323            compose(static_return(True), call_eth_tester('unlock_account')),
324            static_return(False),
325        ),
326        'sendTransaction': personal_send_transaction,
327        'sign': not_implemented,
328    },
329    'testing': {
330        'timeTravel': call_eth_tester('time_travel'),
331    },
332    'txpool': {
333        'content': not_implemented,
334        'inspect': not_implemented,
335        'status': not_implemented,
336    },
337    'evm': {
338        'mine': call_eth_tester('mine_blocks'),
339        'revert': call_eth_tester('revert_to_snapshot'),
340        'snapshot': call_eth_tester('take_snapshot'),
341    },
342}
343
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Robotframework on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)