How to use run_processor method in localstack

Best Python code snippet using localstack_python

test_run_processor.py

Source:test_run_processor.py Github

copy

Full Screen

1"""Unit tests for SOG run processor.2:Author: Doug Latornell <djl@douglatornell.ca>3:License: Apache License, Version 2.04Copyright 2010-2014 Doug Latornell and The University of British Columbia5Licensed under the Apache License, Version 2.0 (the "License");6you may not use this file except in compliance with the License.7You may obtain a copy of the License at8 http://www.apache.org/licenses/LICENSE-2.09Unless required by applicable law or agreed to in writing, software10distributed under the License is distributed on an "AS IS" BASIS,11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12See the License for the specific language governing permissions and13limitations under the License.14"""15import os16import six17import unittest18try:19 from unittest.mock import (20 Mock,21 patch,22 )23except ImportError:24 from mock import (25 Mock,26 patch,27 )28from .. import run_processor29class TestPrepare(object):30 """Unit tests for the run processor prepare function.31 """32 def _call_prepare(self, args):33 return run_processor.prepare(args)34 @patch.object(run_processor, 'os')35 @patch.object(36 run_processor, 'create_infile', return_value='/tmp/foo.infile')37 def test_prepare_run_cmd_defaults(self, mock_create_infile, mock_os):38 """prepare_run_cmd returns expected command for default args39 """40 mock_os.path.exists.return_value = True41 mock_os.path.basename = os.path.basename42 mock_os.path.abspath = os.path.abspath43 args = Mock(44 SOG_exec='./SOG', infile='infile', outfile=None,45 nice=19, dry_run=False, watch=False, legacy_infile=False)46 cmd = self._call_prepare(args)47 outfile = os.path.abspath(os.path.basename(args.infile) + '.out')48 expected = (49 'nice -n 19 ./SOG < /tmp/foo.infile > {} 2>&1'.format(outfile))50 assert cmd == expected51 @patch.object(run_processor, 'os')52 @patch.object(run_processor, 'create_infile')53 def test_prepare_run_cmd_legacy_infile(self, mock_create_infile, mock_os):54 """prepare_run_cmd returns expected command w/ legacy_infle=True55 """56 mock_os.path.exists.return_value = True57 mock_os.path.basename = os.path.basename58 mock_os.path.abspath = os.path.abspath59 args = Mock(60 SOG_exec='./SOG', infile='infile', outfile=None,61 nice=19, dry_run=False, watch=False, legacy_infile=True)62 cmd = self._call_prepare(args)63 outfile = os.path.abspath(os.path.basename(args.infile) + '.out')64 expected = (65 'nice -n 19 ./SOG < infile > {} 2>&1'.format(outfile))66 assert cmd == expected67 @patch.object(run_processor, 'os')68 @patch.object(run_processor, 'create_infile')69 def test_prepare_run_cmd_editfile_list(self, mock_create_infile, mock_os):70 """prepare_run_cmd calls create_infile with editfile list71 """72 mock_os.path.exists.return_value = True73 mock_os.path.basename = os.path.basename74 mock_os.path.abspath = os.path.abspath75 args = Mock(76 SOG_exec='./SOG', infile='infile.yaml', outfile=None, editfile=[],77 nice=19, dry_run=False, watch=False, legacy_infile=False)78 self._call_prepare(args)79 mock_create_infile.assert_called_once_with(args.infile, args.editfile)80 @patch.object(run_processor, 'os')81 @patch.object(run_processor, 'create_infile')82 def test_prepare_run_cmd_outfile_relative_path(83 self, mock_create_infile, mock_os):84 """prepare_run_cmd returns exp cmd when outfile arg is relative path85 """86 mock_os.path.exists.return_value = True87 mock_create_infile.return_value = '/tmp/foo.infile'88 args = Mock(89 SOG_exec='./SOG', infile='infile', outfile='../foo/bar',90 nice=19, dry_run=False, watch=False, legacy_infile=False)91 cmd = self._call_prepare(args)92 expected = 'nice -n 19 ./SOG < /tmp/foo.infile > ../foo/bar 2>&1'93 assert cmd == expected94 @patch.object(run_processor, 'os')95 @patch.object(96 run_processor, 'create_infile', return_value='/tmp/foo.infile')97 def test_prepare_run_cmd_outfile_absolute_path(98 self, mock_create_infile, mock_os):99 """prepare_run_cmd returns exp cmd when outfile arg is absolute path100 """101 mock_os.path.exists.return_value = True102 args = Mock(103 SOG_exec='./SOG', infile='infile', outfile='/foo/bar',104 nice=19, dry_run=False, watch=False, legacy_infile=False)105 cmd = self._call_prepare(args)106 expected = 'nice -n 19 ./SOG < /tmp/foo.infile > /foo/bar 2>&1'107 assert cmd == expected108class TestDryRun(unittest.TestCase):109 """Unit tests for the run processor dry_run function.110 """111 @patch('sys.stdout', new_callable=six.StringIO)112 def test_run_dry_run_std_msg(self, mock_stdout):113 """run_dry_run prints intro msg and command that would be run114 """115 cmd = 'nice -n 19 ./SOG < infile > ./infile.out'116 args = Mock(watch=False)117 run_processor.dry_run(cmd, args)118 self.assertEqual(119 mock_stdout.getvalue(),120 'Command that would have been used to run SOG:\n {0}\n'121 .format(cmd))122 @patch('sys.stdout', new_callable=six.StringIO)123 def test_run_dry_run_watch_msg(self, mock_stdout):124 """run_dry_run w/ watch prints suffix msg about outfile125 """126 cmd = 'nice -n 19 ./SOG < infile > ./infile.out'127 args = Mock(outfile='./infile.out', watch=True, legacy_infile=False)128 run_processor.dry_run(cmd, args)129 self.assertEqual(130 mock_stdout.getvalue(),131 'Command that would have been used to run SOG:\n {0}\n'132 'Contents of {1.outfile} would have been shown on screen while '...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

...17 '''elif source.endswith('.txt'):18 proc = TxtDataProcessorFactory().get_processor(source)'''19 return proc20# Запуск обработки21def run_processor(proc: DataProcessor) -> None:22 proc.run()23 proc.print_result()24 proc.stat_for_period()25if __name__ == '__main__':26 proc = init_processor("temperature.csv")27 if proc is not None:28 run_processor(proc)29 '''proc = init_processor("humidity.csv")30 if proc is not None:31 run_processor(proc)32 proc = init_processor("pressure.csv")33 if proc is not None:34 run_processor(proc)35 proc = init_processor("wind_direction.csv")36 if proc is not None:37 run_processor(proc)38 proc = init_processor("wind_speed.csv")39 if proc is not None:...

Full Screen

Full Screen

C2W1A3.py

Source:C2W1A3.py Github

copy

Full Screen

...6 for i, _ in enumerate(range(n_packages)):7 values = [int(v) for v in input().split()]8 data.append((i, values[0], values[1]))9 return data10def run_processor(buffer_size, n_packages, packages):11 if n_packages == 0:12 return []13 all_jobs = packages14 results = [None] * n_packages15 buffer = deque()16 finish_times = deque()17 current_time = 018 while all_jobs or buffer:19 while buffer and finish_times[-1][1] == current_time:20 job_done = buffer.pop()21 finish_data = finish_times.pop()22 results[job_done[0]] = finish_data[0]23 if all_jobs and all_jobs[0][1] == current_time:24 received_job = all_jobs.popleft()25 if buffer_size == len(buffer):26 results[received_job[0]] = -127 else:28 started_at = current_time if not buffer else finish_times[0][1]29 complete_at = started_at + received_job[2]30 buffer.appendleft(received_job)31 finish_times.appendleft((started_at, complete_at))32 else:33 current_time += 134 return results35if __name__ == '__main__':36 # assert run_processor(1, 0, deque()) == []37 # assert run_processor(1, 1, deque([(0, 0, 0)])) == [0]38 # assert run_processor(1, 2, deque([(0, 0, 1), (1, 1, 1)])) == [0, 1]39 # assert run_processor(1, 2, deque([(0, 0, 1), (1, 0, 1)])) == [0, -1]40 # assert run_processor(1, 2, deque([(0, 0, 0), (1, 0, 0)])) == [0, 0]41 # assert run_processor(1, 2, deque([(0, 0, 1), (1, 0, 0)])) == [0, -1]42 # assert run_processor(2, 2, deque([(0, 0, 1), (1, 0, 1)])) == [0, 1]43 # assert run_processor(1, 3, deque([(0, 0, 2), (1, 1, 4), (2, 5, 3)])) == [0, -1, 5]44 #45 # import time, random46 #47 # ps = 100048 # test_data = deque()49 # for i, x in enumerate(range(ps)):50 # test_data.append((i, i, random.randint(0, x)))51 #52 # start_time = time.time()53 # res = run_processor(ps, ps, test_data)54 # print(time.time() - start_time)55 buffer_size, n_packages = [int(v) for v in input().split()]56 for r in run_processor(buffer_size, n_packages, input_data(n_packages)):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful