How to use skipped method in Slash

Best Python code snippets using Slash

run-api-tests

Source:run-api-tests Github

copy

Full Screen

#!/usr/bin/env python
#
# Copyright (C) 2011 Igalia S.L.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public License
# along with this library; see the file COPYING.LIB. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.

import common
import subprocess
import os
import sys
import optparse
import re
from signal import alarm, signal, SIGALRM, SIGKILL
from gi.repository import Gio, GLib


class SkippedTest:
    """A test program, or a subset of its test cases, that should be skipped.

    :param test: path suffix identifying the test program
    :param reason: human readable explanation of why it is skipped
    :param bug: optional bugs.webkit.org bug number (an integer)
    :param test_cases: optional list of individual test case paths; when
        empty the whole test program is considered skipped
    """

    def __init__(self, test, reason, bug=None, test_cases=None):
        self.test = test
        self.reason = reason
        self.bug = bug
        # A fresh list per instance: a mutable default argument would be
        # shared by every SkippedTest created without explicit test cases.
        self.test_cases = [] if test_cases is None else test_cases

    def __str__(self):
        skipped_test_str = "%s" % self.test
        if self.test_cases:
            skipped_test_str += " [%s]" % ", ".join(self.test_cases)
        skipped_test_str += ": %s " % self.reason
        if self.bug is not None:
            skipped_test_str += "(https://bugs.webkit.org/show_bug.cgi?id=%d)" % self.bug
        return skipped_test_str


class TestTimeout(Exception):
    """Raised from the SIGALRM handler when a test exceeds its timeout."""
    pass


class TestRunner:
    """Runs the API test programs found under the build's Programs directory."""

    TEST_DIRS = ["unittests", "WebKit2APITests", "TestWebKitAPI/WTF", "TestWebKitAPI/WebKit2"]

    SKIPPED = [
        SkippedTest("unittests/testdownload",
                    "Test fails in GTK Linux 64-bit Release bot",
                    82329,
                    ["/webkit/download/not-found"]),
        SkippedTest("unittests/testwebview",
                    "Test times out in GTK Linux 64-bit Release bot",
                    82328,
                    ["/webkit/webview/icon-uri"]),
        SkippedTest("unittests/testwebresource",
                    "Test fails in GTK Linux 64-bit Release bot",
                    82330,
                    ["/webkit/webresource/sub_resource_loading"]),
        SkippedTest("unittests/testwebinspector",
                    "Test is flaky in GTK Linux 32-bit Release bot",
                    82869,
                    ["/webkit/webinspector/close-and-inspect"]),
        SkippedTest("WebKit2APITests/TestWebKitWebView",
                    "Test is flaky in GTK Linux 32-bit Release bot",
                    82866,
                    ["/webkit2/WebKitWebView/mouse-target"]),
        SkippedTest("WebKit2APITests/TestResources",
                    "Test is flaky in GTK Linux 32-bit Release bot",
                    82868,
                    ["/webkit2/WebKitWebView/resources"]),
        SkippedTest("TestWebKitAPI/WebKit2/TestWKConnection",
                    "Test times out",
                    84959),
        SkippedTest("TestWebKitAPI/WebKit2/TestRestoreSessionStateContainingFormData",
                    "Session State is not implemented in GTK+ port",
                    84960),
        SkippedTest("TestWebKitAPI/WebKit2/TestSpacebarScrolling",
                    "Test fails",
                    84961),
        SkippedTest("TestWebKitAPI/WebKit2/TestNewFirstVisuallyNonEmptyLayoutFrames",
                    "Test fails",
                    85037),
        SkippedTest("TestWebKitAPI/WebKit2/TestMouseMoveAfterCrash",
                    "Test is flaky",
                    85066)
    ]

    def __init__(self, options, tests=None):
        """
        :param options: parsed command line options (display, timeout,
            verbose and skipped_action are read by this class)
        :param tests: optional explicit list of test program paths; when
            empty or omitted the tests are discovered in TEST_DIRS
        """
        self._options = options
        self._programs_path = common.build_path("Programs")
        self._tests = self._get_tests(tests)
        # Work on a copy: _setup_testing_environment() may append to this
        # list and must not modify the class-level SKIPPED table.
        self._skipped_tests = list(TestRunner.SKIPPED)

        # These SPI daemons need to be active for the accessibility tests to work.
        self._spi_registryd = None
        self._spi_bus_launcher = None

    def _get_tests(self, tests):
        """Return `tests` if given, else every executable test* file in TEST_DIRS."""
        if tests:
            return tests

        tests = []
        for test_dir in self.TEST_DIRS:
            absolute_test_dir = os.path.join(self._programs_path, test_dir)
            if not os.path.isdir(absolute_test_dir):
                continue
            for test_file in os.listdir(absolute_test_dir):
                if not test_file.lower().startswith("test"):
                    continue
                test_path = os.path.join(self._programs_path, test_dir, test_file)
                if os.path.isfile(test_path) and os.access(test_path, os.X_OK):
                    tests.append(test_path)
        return tests

    def _lookup_atspi2_binary(self, filename):
        """Return the path of an at-spi2 helper binary, or None if not found."""
        exec_prefix = common.pkg_config_file_variable('atspi-2', 'exec_prefix')
        if not exec_prefix:
            return None
        for path in ['libexec', 'lib/at-spi2-core', 'lib32/at-spi2-core', 'lib64/at-spi2-core']:
            filepath = os.path.join(exec_prefix, path, filename)
            if os.path.isfile(filepath):
                return filepath

        return None

    def _start_accessibility_daemons(self):
        """Launch the accessibility bus and registry; return True on success."""
        spi_bus_launcher_path = self._lookup_atspi2_binary('at-spi-bus-launcher')
        spi_registryd_path = self._lookup_atspi2_binary('at-spi2-registryd')
        if not spi_bus_launcher_path or not spi_registryd_path:
            return False

        try:
            # Stored in _spi_bus_launcher (the attribute that teardown
            # checks) so that the launcher is terminated when tests finish.
            self._spi_bus_launcher = subprocess.Popen([spi_bus_launcher_path], env=self._test_env)
        except Exception:
            sys.stderr.write("Failed to launch the accessibility bus\n")
            sys.stderr.flush()
            return False

        # We need to wait until the SPI bus is launched before trying to start the SPI
        # registry, so we spin a main loop until the bus name appears on DBus.
        loop = GLib.MainLoop()
        Gio.bus_watch_name(Gio.BusType.SESSION, 'org.a11y.Bus', Gio.BusNameWatcherFlags.NONE,
                           lambda *args: loop.quit(), None)
        loop.run()

        try:
            self._spi_registryd = subprocess.Popen([spi_registryd_path], env=self._test_env)
        except Exception:
            sys.stderr.write("Failed to launch the accessibility registry\n")
            sys.stderr.flush()
            return False

        return True

    def _setup_testing_environment(self):
        """Prepare the environment variables and start Xvfb.

        Returns False when Xvfb cannot be started, True otherwise. Failing
        to start the accessibility daemons is not fatal: the accessibility
        test is added to the skipped list instead.
        """
        # NOTE: this aliases os.environ, so the assignments below also
        # change the environment of the runner process itself.
        self._test_env = os.environ
        self._test_env["DISPLAY"] = self._options.display
        self._test_env["WEBKIT_INSPECTOR_PATH"] = os.path.abspath(os.path.join(self._programs_path, 'resources', 'inspector'))
        self._test_env['GSETTINGS_BACKEND'] = 'memory'
        self._test_env["TEST_WEBKIT_API_WEBKIT2_RESOURCES_PATH"] = common.top_level_path("Tools", "TestWebKitAPI", "Tests", "WebKit2")
        self._test_env["TEST_WEBKIT_API_WEBKIT2_INJECTED_BUNDLE_PATH"] = common.build_path("Libraries")
        self._test_env["WEBKIT_EXEC_PATH"] = self._programs_path

        try:
            self._xvfb = subprocess.Popen(["Xvfb", self._options.display, "-screen", "0", "800x600x24", "-nolisten", "tcp"],
                                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except Exception as e:
            # write() takes a single string, so the message must be
            # formatted before it is passed in.
            sys.stderr.write("Failed to run Xvfb: %s\n" % e)
            sys.stderr.flush()
            return False

        # If we cannot start the accessibility daemons, we can just skip the accessibility tests.
        if not self._start_accessibility_daemons():
            sys.stdout.write("Could not start accessibility bus, so skipping TestWebKitAccessibility\n")
            self._skipped_tests.append(SkippedTest("WebKit2APITests/TestWebKitAccessibility",
                                                   "Could not start accessibility bus"))
        return True

    def _tear_down_testing_environment(self):
        """Terminate the accessibility daemons (if started) and Xvfb."""
        if self._spi_registryd:
            self._spi_registryd.terminate()
        if self._spi_bus_launcher:
            self._spi_bus_launcher.terminate()
        self._xvfb.terminate()

    def _find_skipped_test(self, test):
        """Return the SkippedTest whose name is a suffix of `test`, or None."""
        for skipped in self._skipped_tests:
            if test.endswith(skipped.test):
                return skipped
        return None

    def _test_cases_to_skip(self, test):
        """Return the test cases of `test` to skip (empty unless action is 'skip')."""
        if self._options.skipped_action != 'skip':
            return []

        skipped = self._find_skipped_test(test)
        if skipped is not None:
            return skipped.test_cases
        return []

    def _should_run_test(self, test):
        """Tell whether `test` must be run for the selected skipped action."""
        # Skipped tests are ignored, run all tests.
        if self._options.skipped_action == 'ignore':
            return True

        skipped = self._find_skipped_test(test)
        # By default skipped tests are skipped, run them only when there are specific test cases failing.
        if self._options.skipped_action == 'skip':
            return skipped is None or skipped.test_cases

        # Run only skipped tests.
        return skipped is not None

    def _get_child_pid_from_test_output(self, output):
        """Extract the pid from a '(pid=NNN)' marker in `output`, or return -1."""
        if not output:
            return -1
        match = re.search(r'\(pid=(?P<child_pid>[0-9]+)\)', output)
        if not match:
            return -1
        return int(match.group('child_pid'))

    def _kill_process(self, pid):
        """Send SIGKILL to `pid`, ignoring the error if it is already gone."""
        try:
            os.kill(pid, SIGKILL)
        except OSError:
            # Process already died.
            pass

    def _run_test_command(self, command, timeout=-1):
        """Run `command` and return True when it exits with status 0.

        :param command: argument list passed to subprocess.Popen
        :param timeout: seconds before the test is killed; values <= 0
            disable the timeout
        :raises TestTimeout: when the command does not finish in time
        """
        def alarm_handler(signum, frame):
            raise TestTimeout

        p = subprocess.Popen(command, stdout=subprocess.PIPE, env=self._test_env)
        if timeout > 0:
            signal(SIGALRM, alarm_handler)
            alarm(timeout)

        stdout = ""
        try:
            stdout = p.communicate()[0]
            if timeout > 0:
                alarm(0)
            sys.stdout.write(stdout)
            sys.stdout.flush()
        except TestTimeout:
            self._kill_process(p.pid)
            # NOTE(review): communicate() did not return here, so `stdout`
            # is still the empty string and no child pid can be found.
            child_pid = self._get_child_pid_from_test_output(stdout)
            if child_pid > 0:
                self._kill_process(child_pid)
            raise

        return not p.returncode

    def _run_test_glib(self, test):
        """Run a GLib based test program through gtester."""
        tester_command = ['gtester']
        if self._options.verbose:
            tester_command.append('--verbose')
        for test_case in self._test_cases_to_skip(test):
            tester_command.extend(['-s', test_case])
        tester_command.append(test)

        return self._run_test_command(tester_command, self._options.timeout)

    def _run_test_google(self, test):
        """Run a Google Test based test program directly."""
        tester_command = [test, "--gtest_throw_on_failure"]
        skipped_tests_cases = self._test_cases_to_skip(test)
        if skipped_tests_cases:
            tester_command.append("--gtest_filter=-%s" % ":".join(skipped_tests_cases))

        return self._run_test_command(tester_command, self._options.timeout)

    def _run_test(self, test):
        """Dispatch `test` to the matching runner based on its path."""
        if "unittests" in test or "WebKit2APITests" in test:
            return self._run_test_glib(test)

        if "TestWebKitAPI" in test:
            return self._run_test_google(test)

        return False

    def run_tests(self):
        """Run every selected test and return the number of failed tests.

        Returns 1 when no tests were found or the environment could not be
        set up.
        """
        if not self._tests:
            sys.stderr.write("ERROR: tests not found in %s.\n" % (self._programs_path))
            sys.stderr.flush()
            return 1

        if not self._setup_testing_environment():
            return 1

        # Remove skipped tests now instead of when we find them, because
        # some tests might be skipped while setting up the test environment.
        self._tests = [test for test in self._tests if self._should_run_test(test)]

        failed_tests = []
        timed_out_tests = []
        try:
            for test in self._tests:
                success = True
                try:
                    success = self._run_test(test)
                except TestTimeout:
                    sys.stdout.write("TEST: %s: TIMEOUT\n" % test)
                    sys.stdout.flush()
                    timed_out_tests.append(test)

                if not success:
                    failed_tests.append(test)
        finally:
            self._tear_down_testing_environment()

        if failed_tests:
            names = [test.replace(self._programs_path, '', 1) for test in failed_tests]
            sys.stdout.write("Tests failed: %s\n" % ", ".join(names))
            sys.stdout.flush()

        if timed_out_tests:
            names = [test.replace(self._programs_path, '', 1) for test in timed_out_tests]
            sys.stdout.write("Tests that timed out: %s\n" % ", ".join(names))
            sys.stdout.flush()

        if self._skipped_tests and self._options.skipped_action == 'skip':
            sys.stdout.write("Tests skipped:\n%s\n" % "\n".join([str(skipped) for skipped in self._skipped_tests]))
            sys.stdout.flush()

        # NOTE(review): timed out tests are reported above but are not
        # counted in the return value.
        return len(failed_tests)


if __name__ == "__main__":
    option_parser = optparse.OptionParser(usage='usage: %prog [options] [test...]')
    option_parser.add_option('-r', '--release',
                             action='store_true', dest='release',
                             help='Run in Release')
    option_parser.add_option('-d', '--debug',
                             action='store_true', dest='debug',
                             help='Run in Debug')
    option_parser.add_option('-v', '--verbose',
                             action='store_true', dest='verbose',
                             help='Run gtester in verbose mode')
    option_parser.add_option('--display', action='store', dest='display', default=':55',
                             help='Display to run Xvfb')
    option_parser.add_option('--skipped', action='store', dest='skipped_action',
                             choices=['skip', 'ignore', 'only'], default='skip',
                             metavar='skip|ignore|only',
                             help='Specifies how to treat the skipped tests')
    option_parser.add_option('-t', '--timeout',
                             action='store', type='int', dest='timeout', default=10,
                             help='Time in seconds until a test times out')
    options, args = option_parser.parse_args()
    # ... (the remainder of the script is truncated in this excerpt)

Full Screen

Full Screen

trigger_rule_dep.py

Source:trigger_rule_dep.py Github

copy

Full Screen

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from collections import Counter

from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.session import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule as TR


class TriggerRuleDep(BaseTIDep):
    """
    Determines if a task's upstream tasks are in a state that allows a given task instance
    to run.
    """

    NAME = "Trigger Rule"
    IGNOREABLE = True
    IS_TASK_DEP = True

    @staticmethod
    def _get_states_count_upstream_ti(ti, finished_tasks):
        """
        This function returns the states of the upstream tis for a specific ti in order to determine
        whether this ti can run in this iteration

        :param ti: the ti that we want to calculate deps for
        :type ti: airflow.models.TaskInstance
        :param finished_tasks: all the finished tasks of the dag_run
        :type finished_tasks: list[airflow.models.TaskInstance]
        :return: tuple of counts ``(successes, skipped, failed, upstream_failed, done)``
            where ``done`` is the total number of finished upstream tis
        """
        # Look the upstream ids up once instead of once per finished task.
        upstream_task_ids = ti.task.upstream_task_ids
        counter = Counter(task.state for task in finished_tasks if task.task_id in upstream_task_ids)
        return (
            counter.get(State.SUCCESS, 0),
            counter.get(State.SKIPPED, 0),
            counter.get(State.FAILED, 0),
            counter.get(State.UPSTREAM_FAILED, 0),
            sum(counter.values()),
        )

    @provide_session
    def _get_dep_statuses(self, ti, session, dep_context):
        """Yield the dependency statuses for ``ti`` according to its trigger rule."""
        # Checking that all upstream dependencies have succeeded
        if not ti.task.upstream_list:
            yield self._passing_status(reason="The task instance did not have any upstream tasks.")
            return

        if ti.task.trigger_rule == TR.DUMMY:
            yield self._passing_status(reason="The task had a dummy trigger rule set.")
            return

        # see if the task name is in the task upstream for our task
        successes, skipped, failed, upstream_failed, done = self._get_states_count_upstream_ti(
            ti=ti, finished_tasks=dep_context.ensure_finished_tasks(ti.task.dag, ti.execution_date, session)
        )

        yield from self._evaluate_trigger_rule(
            ti=ti,
            successes=successes,
            skipped=skipped,
            failed=failed,
            upstream_failed=upstream_failed,
            done=done,
            flag_upstream_failed=dep_context.flag_upstream_failed,
            session=session,
        )

    @provide_session
    def _evaluate_trigger_rule(  # pylint: disable=too-many-branches
        self, ti, successes, skipped, failed, upstream_failed, done, flag_upstream_failed, session
    ):
        """
        Yields a dependency status that indicate whether the given task instance's trigger
        rule was met.

        :param ti: the task instance to evaluate the trigger rule of
        :type ti: airflow.models.TaskInstance
        :param successes: Number of successful upstream tasks
        :type successes: int
        :param skipped: Number of skipped upstream tasks
        :type skipped: int
        :param failed: Number of failed upstream tasks
        :type failed: int
        :param upstream_failed: Number of upstream_failed upstream tasks
        :type upstream_failed: int
        :param done: Number of completed upstream tasks
        :type done: int
        :param flag_upstream_failed: This is a hack to generate
            the upstream_failed state creation while checking to see
            whether the task instance is runnable. It was the shortest
            path to add the feature
        :type flag_upstream_failed: bool
        :param session: database session
        :type session: sqlalchemy.orm.session.Session
        """
        task = ti.task
        upstream = len(task.upstream_task_ids)
        trigger_rule = task.trigger_rule
        upstream_done = done >= upstream
        upstream_tasks_state = {
            "total": upstream,
            "successes": successes,
            "skipped": skipped,
            "failed": failed,
            "upstream_failed": upstream_failed,
            "done": done,
        }
        # TODO(aoen): Ideally each individual trigger rules would be its own class, but
        # this isn't very feasible at the moment since the database queries need to be
        # bundled together for efficiency.
        # handling instant state assignment based on trigger rules
        if flag_upstream_failed:
            if trigger_rule == TR.ALL_SUCCESS:
                if upstream_failed or failed:
                    ti.set_state(State.UPSTREAM_FAILED, session)
                elif skipped:
                    ti.set_state(State.SKIPPED, session)
            elif trigger_rule == TR.ALL_FAILED:
                if successes or skipped:
                    ti.set_state(State.SKIPPED, session)
            elif trigger_rule == TR.ONE_SUCCESS:
                if upstream_done and not successes:
                    ti.set_state(State.SKIPPED, session)
            elif trigger_rule == TR.ONE_FAILED:
                if upstream_done and not (failed or upstream_failed):
                    ti.set_state(State.SKIPPED, session)
            elif trigger_rule == TR.NONE_FAILED:
                if upstream_failed or failed:
                    ti.set_state(State.UPSTREAM_FAILED, session)
            elif trigger_rule == TR.NONE_FAILED_OR_SKIPPED:
                if upstream_failed or failed:
                    ti.set_state(State.UPSTREAM_FAILED, session)
                elif skipped == upstream:
                    ti.set_state(State.SKIPPED, session)
            elif trigger_rule == TR.NONE_SKIPPED:
                if skipped:
                    ti.set_state(State.SKIPPED, session)

        if trigger_rule == TR.ONE_SUCCESS:
            if successes <= 0:
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires one upstream "
                    "task success, but none were found. "
                    "upstream_tasks_state={}, upstream_task_ids={}".format(
                        trigger_rule, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        elif trigger_rule == TR.ONE_FAILED:
            if not failed and not upstream_failed:
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires one upstream "
                    "task failure, but none were found. "
                    "upstream_tasks_state={}, upstream_task_ids={}".format(
                        trigger_rule, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        elif trigger_rule == TR.ALL_SUCCESS:
            num_failures = upstream - successes
            if num_failures > 0:
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires all upstream "
                    "tasks to have succeeded, but found {} non-success(es). "
                    "upstream_tasks_state={}, upstream_task_ids={}".format(
                        trigger_rule, num_failures, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        elif trigger_rule == TR.ALL_FAILED:
            num_successes = upstream - failed - upstream_failed
            if num_successes > 0:
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires all upstream "
                    "tasks to have failed, but found {} non-failure(s). "
                    "upstream_tasks_state={}, upstream_task_ids={}".format(
                        trigger_rule, num_successes, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        elif trigger_rule == TR.ALL_DONE:
            if not upstream_done:
                # Report how many upstream tasks are still pending; the
                # boolean `upstream_done` would always render as "False".
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires all upstream "
                    "tasks to have completed, but found {} task(s) that "
                    "were not done. upstream_tasks_state={}, "
                    "upstream_task_ids={}".format(
                        trigger_rule, upstream - done, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        elif trigger_rule == TR.NONE_FAILED:
            num_failures = upstream - successes - skipped
            if num_failures > 0:
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires all upstream "
                    "tasks to have succeeded or been skipped, but found {} non-success(es). "
                    "upstream_tasks_state={}, upstream_task_ids={}".format(
                        trigger_rule, num_failures, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        elif trigger_rule == TR.NONE_FAILED_OR_SKIPPED:
            num_failures = upstream - successes - skipped
            if num_failures > 0:
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires all upstream "
                    "tasks to have succeeded or been skipped, but found {} non-success(es). "
                    "upstream_tasks_state={}, upstream_task_ids={}".format(
                        trigger_rule, num_failures, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        elif trigger_rule == TR.NONE_SKIPPED:
            if not upstream_done or (skipped > 0):
                yield self._failing_status(
                    reason="Task's trigger rule '{}' requires all upstream "
                    "tasks to not have been skipped, but found {} task(s) skipped. "
                    "upstream_tasks_state={}, upstream_task_ids={}".format(
                        trigger_rule, skipped, upstream_tasks_state, task.upstream_task_ids
                    )
                )
        else:
            ...  # the body of this branch is truncated in this excerpt

Full Screen

Full Screen

run.py

Source:run.py Github

copy

Full Screen

#!/usr/bin/env python3
import multiprocessing as mp
import praw
import prawcore.exceptions
import refresh_token
from purge_reddit import PurgeReddit
import time

#### EDIT YOUR DETAILS BELOW ####
# Your login details
username = ''  # optional
password = ''  # optional
user_agent = 'PurgeBot'  # Bot name
client_id = '##############'  # '14 char client ID'
client_secret = '###########################'  # '27 char client secret'
# Purge options
## Number of recent comments/submissions to delete.
## Set to None if no limits (purge ALL comments/submissions)
## Set to 10 will purge recent 10, etc.
limitation = None
## Only purge posts with score <= this number. Set to None if no threshold
max_score = None
## Set to False to not purge comments/submissions
purge_comments = True
purge_submissions = True
## Edit comments/submissions to this before deletion. This prevents archiving.
redact_msg = "[redacted]"
## Set to True to only edit posts to `redact_msg` without deleting them.
redact_only = False
## Use multiprocessing. Set to False if problems occur
use_multiprocessing = True
## Show comment body
show_comment = False
## Show submission titles
show_title = False
## Start purge from controversial first instead of newest
controversial_first = True
## Do not prompt at all. Use with EXTRA caution!
no_prompt = False
## Debug mode
debug = False
## Whitelist e.g.`['id1', 'id2', 'id3']`
comment_whitelist = []
submissions_whitelist = []
#### DO NOT EDIT BELOW ####
options = {'controversial_first': controversial_first,
           'debug': debug,
           'limitation': limitation,
           'redact_msg': redact_msg,
           'redact_only': redact_only,
           'max_score': max_score,
           'show_title': show_title,
           'show_comment': show_comment,
           'comment_whitelist': comment_whitelist,
           'submissions_whitelist': submissions_whitelist}


def save_log(log_type: str, entries: list):
    """Write `entries`, one per line, to a time-stamped file under ``log/``.

    :param log_type: prefix used in the log file name
    :param entries: strings to write, one per line
    A message is printed instead of raising when the file cannot be written.
    """
    filename = f"log/{log_type} {time.asctime().replace(':', '.')}.log"
    try:
        # The context manager closes the file even if a write fails.
        with open(filename, "w") as f:
            for entry in entries:
                f.write(entry + '\n')
    except IOError:
        print(f"Could not write to {filename}")


if __name__ == '__main__':
    # Initialize reddit
    if password != '' and username != '':
        # use username and password
        reddit = praw.Reddit(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent,
            username=username,
            password=password,
            redirect_uri="http://localhost:8080")
    else:
        # use OAuth
        reddit = praw.Reddit(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent,
            redirect_uri="http://localhost:8080")
    # Check authentication key
    print("Checking authentication...")
    if client_id == '##############' \
            or client_secret == '###########################':
        print("Missing client ID/secret key!")
        exit()
    elif len(client_id) != 14 or len(client_secret) != 27:
        print("Failed to authenticate!",
              "Your client ID/secret key isn't the correct length.")
        print("Please check your configuration again!")
        exit()
    try:
        # Test authentication
        if reddit.user.me() is None:
            refresh_token.authorize_token(reddit)
    except prawcore.exceptions.ResponseException as exc:
        if f'{exc}'.find('401') != -1:
            # 401 error, invalid key ?
            print("ERROR 401: There's a problem with your authentication key."
                  + "\nPlease check your configuration again!")
        else:
            print("\nResponseException:", exc)
        if debug:
            raise exc
        exit()
    except prawcore.exceptions.OAuthException:
        print("Failed to authenticate credentials! Possible causes:")
        print("1. Wrong username/password.")
        print("2. 2FA is enabled.")
        print("3. Invalid client ID/secret key.")
        try:
            refresh_token.authorize_token(reddit)
        except refresh_token.TokenException as exc:
            print("TokenException:", exc)
            if debug:
                raise exc
            exit()
    except refresh_token.TokenException as exc:
        print("TokenException:", exc)
        print("Could not authorize token!")
        exit()
    # Authkey all good! Check total to purge and confirm
    pr = PurgeReddit(reddit, options)
    comment_count = 0
    submission_count = 0
    if purge_comments:
        print("Calculating number of comments, please wait...")
        comment_count = pr.get_comment_total()
        if comment_count == 0:
            print("Found no comments to delete.")
            purge_comments = False
        elif not no_prompt:
            confirm = input(f"{comment_count} comments will be "
                            + ("redacted" if redact_only else "deleted")
                            + ". Are you sure? [y/N] ")
            if not confirm.lower().startswith("y"):
                print("Comment purge aborted.")
                purge_comments = False
    if purge_submissions:
        print("Calculating number of submissions, please wait...")
        submission_count = pr.get_submission_total()
        if submission_count == 0:
            print("Found no submissions to delete.")
            purge_submissions = False
        elif not no_prompt:
            confirm = input(f"{submission_count} submissions will be "
                            + ("redacted" if redact_only else "deleted")
                            + ". Are you sure? [y/N] ")
            if not confirm.lower().startswith("y"):
                print("Submission purge aborted.")
                purge_submissions = False
    if not (purge_submissions or purge_comments):
        print("Nothing to purge today. Have a nice day!")
        exit()
    # Begin purge
    while True:
        if use_multiprocessing:
            # Init multiprocessing and start each thread
            skipped_comments_queue = mp.Queue()
            skipped_submissions_queue = mp.Queue()
            if purge_comments:
                p1 = mp.Process(target=pr.purge_comments,
                                args=(comment_count, skipped_comments_queue,))
                p1.start()
                time.sleep(2)  # delay to avoid errors
            if purge_submissions:
                p2 = mp.Process(target=pr.purge_submissions,
                                args=(submission_count,
                                      skipped_submissions_queue,))
                p2.start()
            # Get skipped posts
            if purge_comments:
                skipped_comments = skipped_comments_queue.get()
                p1.join()
                if len(skipped_comments) > 0:
                    skipped_id = list(map(
                        lambda c:
                        f"{c.submission}/{c} in {c.subreddit}",
                        skipped_comments))
                    print("Comments not purged:\n", skipped_id)
                    save_log('skipped_comments', skipped_id)
                else:
                    print("All comments purged!")
            if purge_submissions:
                skipped_submissions = skipped_submissions_queue.get()
                p2.join()
                if len(skipped_submissions) > 0:
                    skipped_id = list(map(lambda s: f'{s} in {s.subreddit}',
                                          skipped_submissions))
                    print("Submissions not purged:\n", skipped_id)
                    save_log('skipped_submissions', skipped_id)
                else:
                    print("All submissions purged!")
        else:
            # Serial method
            # Collect the status lines and join them with newlines so the
            # comment and submission reports do not run into each other.
            serial_msgs = []
            if purge_comments:
                skipped_comments = pr.purge_comments(comment_count)
                if len(skipped_comments) > 0:
                    skipped_id = list(map(
                        lambda c:
                        f"{c.submission}/{c} in {c.subreddit}",
                        skipped_comments))
                    serial_msgs.append(f"Comments not purged:\n{skipped_id}")
                    save_log('skipped_comments', skipped_id)
                else:
                    serial_msgs.append("All comments purged!")
            if purge_submissions:
                skipped_submissions = pr.purge_submissions(submission_count)
                if len(skipped_submissions) > 0:
                    skipped_id = list(map(lambda s: f'{s} in {s.subreddit}',
                                          skipped_submissions))
                    serial_msgs.append(f"Submissions not purged:\n{skipped_id}")
                    save_log('skipped_submissions', skipped_id)
                else:
                    serial_msgs.append("All submissions purged!")
            print("\n".join(serial_msgs))
        # if there were more than 1000, prompt to delete more
        if (submission_count >= 1000 or comment_count >= 1000) \
                and not redact_only:
            if not no_prompt:
                # input() accepts a single prompt string; passing two
                # positional arguments raises TypeError.
                confirm = input("There were more than 1000 submissions/comments! "
                                "Delete more? [y/N] ")
            if no_prompt or confirm.lower().startswith('y'):
                if limitation is not None:
                    # NOTE(review): this rebinds the module-level name only;
                    # the `options` dict given to PurgeReddit keeps the old
                    # value - confirm whether that is intended.
                    limitation -= 1000
                print("Calculating remaining submissions/comments...")
                if purge_comments:
                    comment_count = pr.get_comment_total()
                    print(f"{comment_count} remaining...")
                if purge_submissions:
                    submission_count = pr.get_submission_total()
                    print(f"{submission_count} remaining...")
            else:
                break
        # ... (the remainder of the script is truncated in this excerpt)

Full Screen

Full Screen

text_cleaner.py

Source:text_cleaner.py Github

copy

Full Screen

import multiprocessing
import re

import numpy as np

try:
    from tqdm import tqdm
except ImportError:  # tqdm only draws the optional progress bar
    tqdm = None


class Cleaner():
    """Filter and normalise raw text lines with a set of fixed heuristics.

    ``process`` rejects lines that are too short, contain a forbidden
    character, have too few letters, too many digits or too few ASCII
    characters, and then normalises the surviving lines (dashes, stray
    spaces, URLs, e-mails, Romanian cedilla letters).
    """

    # Per-reason counters kept by process(); each one is a [lines, chars] pair.
    _SKIP_KEYS = (
        "skipped_because_min_length",
        "skipped_alpha_count",
        "skipped_because_max_numeric",
        "skipped_because_max_non_ascii",
        "skipped_because_forbidden_chars",
    )
    # Every key of the stats dict, in the order process() emits them.
    _STAT_KEYS = _SKIP_KEYS + ("total_original_length", "total_clean_length")

    # Cedilla letters -> the comma-below letters they stand in for.
    _CEDILLA_TO_COMMA = str.maketrans({
        "\u0163": "\u021b",  # t with cedilla -> t with comma below
        "\u015f": "\u0219",  # s with cedilla -> s with comma below
        "\u0162": "\u021a",  # T with cedilla -> T with comma below
        "\u015e": "\u0218",  # S with cedilla -> S with comma below
    })

    def __init__(self, num_threads=1):  # right now, it's single threaded
        """Compile the cleaning patterns.

        :param num_threads: requested worker count; capped at half of the
            available cores. The cap itself is never lower than one, so a
            single-core host no longer yields zero threads.
        """
        half_of_cores = max(1, multiprocessing.cpu_count() // 2)
        self.num_threads = min(num_threads, half_of_cores)

        # Space after a word-final dash:
        #   "S- ar putea să fie necesar să- l recitiţi."
        self.r1 = re.compile(r"([\w]+-)[\s]([\w]+)", re.IGNORECASE)

        # Space after a slash:
        #   "{LL/ AAAA}", "Humalog Mix50 100 U/ ml"
        self.r2 = re.compile(r"([\w]+/)\s([\w]+)", re.IGNORECASE)

        # All unicode dashes to normal '-', see
        # https://www.fileformat.info/info/unicode/category/Pd/list.htm
        # Also includes the bullet (U+2022), the tilde (U+007E) and the
        # soft hyphen (U+00AD).
        self.r3 = re.compile(r"([■\u2022\u007E\u00AD\u058A\u05BE\u1400\u1806\u2010\u2011\u2012\u2013\u2014\u2015\u2053\u207B\u208B\u2212\u2E17\u2E3A\u2E3B\u301C\u3030\u30A0\uFE31\uFE32\uFE63\uFF0D]+)", re.UNICODE)

        # Spaces after comma in numbers: "1, 4%" -> "1,4%"
        self.r4 = re.compile(r"([\d]+,)\s([\d]+)", re.IGNORECASE)

        # Soft hyphens (U+00AD).
        # NOTE(review): r3 already rewrites U+00AD to '-' and runs first in
        # process(), so this pattern finds nothing there - confirm the order.
        self.r5 = re.compile(r"[\u00AD]")

        # Remove URLs and tag-like fragments.
        self.r6 = re.compile(r'(?:www|http)\S+|<\S+|\w+\/*>')

        # Remove e-mail addresses. The previous pattern allowed whitespace
        # on both sides of the '@', so it deleted the whole line instead of
        # just the address.
        self.r7 = re.compile(r'([^@\s]+@[^@\s]+\.[^@\s]+)')

        # Multiple spaces.
        self.space = re.compile(' +')

        # Forbidden chars that cause a lot of bad sentences.
        self.forbidden_chars = "ºþÈ™ÓÑÄÈîƒ"

    def _clean_line(self, line):
        """Apply every normalisation step to one accepted line."""
        line = self.r1.sub(r"\1\2", line)
        line = self.r2.sub(r"\1\2", line)
        line = self.r3.sub("-", line)
        line = self.r4.sub(r"\1\2", line)
        line = self.r5.sub("", line)
        line = self.r6.sub("", line)
        line = self.r7.sub("", line)
        line = line.replace("( ă)", "(ă)")
        line = line.translate(self._CEDILLA_TO_COMMA)
        # NOTE(review): both literals look identical here, which makes this a
        # no-op; presumably the first was a decomposed form - confirm.
        line = line.replace("â", "â")
        return self.space.sub(' ', line).strip()

    def process(self, lines, percent_max_numeric=0.25, percent_max_non_ascii=0.40, min_line_length=20, verbose=False, disable_pbar=True):
        """Filter and clean an iterable of text lines.

        :param lines: iterable of strings
        :param percent_max_numeric: reject a line when digits/letters reaches
            this ratio and the line has more than 6 digits
        :param percent_max_non_ascii: reject a line when ascii/letters is
            below this ratio and the line is longer than 15 chars
        :param min_line_length: minimum length, checked before and after
            cleaning
        :param verbose: print every line rejected by a ratio check
        :param disable_pbar: hide the tqdm progress bar (no bar is shown at
            all when tqdm is not installed)
        :return: ``(output, stats)`` - the cleaned lines, each terminated by
            a newline, and a dict of counters. Every ``skipped_*`` entry is a
            uint64 array ``[lines, chars]``; chars are always measured on the
            stripped original line.
        """
        skipped = {key: np.array([0, 0], dtype=np.uint64) for key in self._SKIP_KEYS}
        total_original_length = 0
        total_clean_length = 0
        output = []
        forbidden = set(self.forbidden_chars)

        def reject(reason, length):
            # One more rejected line, `length` more rejected chars.
            skipped[reason] += np.array([1, length], dtype=np.uint64)

        iterable = lines if tqdm is None else tqdm(lines, disable=disable_pbar)
        for line in iterable:
            line = line.strip()

            # get stats about line
            length = len(line)
            total_original_length += length
            if length < min_line_length:
                reject("skipped_because_min_length", length)
                continue

            # Drop code points that cannot be encoded as UTF-8 (lone
            # surrogates). The error handler has to be on encode(): the old
            # bytes(line, 'utf-8') call raised on exactly those characters.
            line = line.encode('utf-8', 'ignore').decode('utf-8')

            # reject if forbidden char
            if any(char in forbidden for char in line):
                reject("skipped_because_forbidden_chars", length)
                continue

            digit_count = sum(1 for char in line if char.isnumeric())
            alpha_count = sum(1 for char in line if char.isalpha())
            ascii_count = sum(1 for char in line if char.isascii())

            # reject if number of letters is too small
            # (length can only be 0 here when min_line_length <= 0)
            alpha_ratio = alpha_count / length if length else 0.0
            if alpha_count == 0 or alpha_ratio < 0.5:
                reject("skipped_alpha_count", length)
                if verbose:
                    print("Skipping alpha={:.3f}: [{}]".format(alpha_ratio, line))
                continue

            # reject if too many numbers
            digit_ratio = digit_count / alpha_count
            if digit_ratio >= percent_max_numeric and digit_count > 6:
                reject("skipped_because_max_numeric", length)
                if verbose:
                    print("Skipping digit={:.3f}: [{}]".format(digit_ratio, line))
                continue

            # reject if too many non-ascii
            # NOTE(review): this compares ASCII chars to letters, not the
            # non-ASCII share of the line - confirm the intended formula.
            ascii_ratio = ascii_count / alpha_count
            if ascii_ratio < percent_max_non_ascii and length > 15:
                reject("skipped_because_max_non_ascii", length)
                if verbose:
                    # The old message printed the digit ratio here.
                    print("Skipping ascii={:.3f}: [{}]".format(ascii_ratio, line))
                continue

            line = self._clean_line(line)

            # check that after processing the line is not too short
            if len(line) < min_line_length:
                reject("skipped_because_min_length", length)
                continue

            total_clean_length += len(line)
            output.append(line + "\n")

        # pack stats
        stats = {key: skipped[key] for key in self._SKIP_KEYS}
        stats["total_original_length"] = total_original_length
        stats["total_clean_length"] = total_clean_length
        return output, stats

    def add_stats(self, a, b):
        """
        Add two stats dict that are returned by the process function.
        This is used for multiple files
        :param a: stats dict
        :param b: stats dict
        :return: stats dict
        """
        return {key: a[key] + b[key] for key in self._STAT_KEYS}

    def print_stats(self, stats):
        """Print a summary of a stats dict returned by process or add_stats."""
        original = stats["total_original_length"]
        clean = stats["total_clean_length"]
        # Nothing processed: report 0 % instead of dividing by zero.
        percent_kept = 100. * clean / original if original else 0.0
        print("\nCleaning statistics:")
        print("Total original length (chars) = {}".format(original))
        print("Total length after cleaning (chars) = {}".format(clean))
        print("Percent data kept = {:.3f} %".format(percent_kept))
        print("Skipped because line length was below minimum (lines/chars): {} ".format(stats["skipped_because_min_length"]))
        print("Skipped because line had forbidden characters (lines/chars): {} ".format(stats["skipped_because_forbidden_chars"]))
        print("Skipped because alpha count was below minimum (lines/chars): {} ".format(stats["skipped_alpha_count"]))
        print("Skipped because digit count was above maximum (lines/chars): {} ".format(stats["skipped_because_max_numeric"]))
        print("Skipped because too many non-ascii characters (lines/chars): {} ".format(stats["skipped_because_max_non_ascii"]))


# Sample input lines for trying the cleaner by hand.
text = [" - ~~~~~Păstraţi acest prospect. S- ar putea să fie necesar să- l recitiţi.",
        "- Dacă aveţi orice întrebări suplimentare, adresaţi- vă medicului dumneavoastră sau farmacistului.\n",
        "{LL/ AAAA}\n",
        "MANUALUL UTILIZATORULUI\n",
        "Vezi textul manualului mai jos.\n",
        "303 Informaţii detaliate privind acest medicament sunt disponibile pe website- ul Agenţiei Europene a Medicamentului (EMEA): http: // www. emea. europa. eu /.\n",
        "304 PROSPECT:\u00ad \n",  # soft hyphen after the colon
        "INFORMAŢII PENTRU UTILIZATOR",
        "Humalog Mix50 100 U/ ml • • • ~~~~",
        "Τηλ: +30 210 629 4600 España Lilly S. A.",
        "Tel: + 34- 91 663 50 00 France Lilly France S. A. S.",
        "Tél: +33 - (0) 1 55 49 34 34 Ireland Eli Lilly and Company (Ireland) Limited Tel: + 353 - (0) 1 661 4377 Ísland Icepharma hf.",
        "Sími + 354 540 8000 Italia Eli Lilly Italia S. p. A.",
        "Tel: + 39 - 055 42571 Κύπρος Phadisco Ltd Τηλ: +357 22 715000 ",
        "Luxembourg/ Luxemburg Eli Lilly Benelux S. A.",
        "Tél/ Tel: + 32 - (0) 2 548 84 84 Magyarország Lilly Hungária Kft.",
        "Tel: + 36 1 328 5100 Malta Charles de Giorgio Ltd.",
        "Κύπρος Βαρνάβας Χατζηπαναγής Λτδ 7 Ανδροκλέους CY- 1060 Λευκωσία Tηλ"]

# Example usage. It was disabled in the original, and the snippet is cut off
# after the last statement, so it stays disabled here.
#
# tt = []
# for i in range(100000):
#     tt.extend(text)
# print(len(tt))
#
# c = Cleaner(1)
# lines, s1 = c.process(text)
# lines, s2 = c.process(text)
# stats = c.add_stats(s1, s2)
# c.print_stats(s1)
# c.print_stats(s2)
# c.print_stats(stats)
# print("DONE")

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. The guides cover everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful