# ...
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from assertpy import assert_that, fail, soft_fail, soft_assertions


def test_soft_fail_without_context():
    """Expect soft_fail() outside soft_assertions() to raise 'Fail!'."""
    try:
        soft_fail()
        # Only reached if soft_fail() did not raise; the handler below
        # rejects this sentinel text, so the test still fails.
        fail('should have raised error')
    except AssertionError as err:
        message = str(err)
        assert_that(message).is_equal_to('Fail!')
        assert_that(message).does_not_contain('should have raised error')


def test_soft_fail_with_msg_without_context():
    """Expect soft_fail(msg) outside soft_assertions() to raise 'Fail: <msg>!'."""
    try:
        soft_fail('some msg')
        fail('should have raised error')
    except AssertionError as err:
        message = str(err)
        assert_that(message).is_equal_to('Fail: some msg!')
        assert_that(message).does_not_contain('should have raised error')


def test_soft_fail():
    """Expect soft_fail() inside soft_assertions() to surface 'Fail!' in the error."""
    try:
        with soft_assertions():
            soft_fail()
        fail('should have raised error')
    except AssertionError as err:
        message = str(err)
        assert_that(message).contains('Fail!')
        assert_that(message).does_not_contain('should have raised error')


def test_soft_fail_with_msg():
    """Expect soft_fail(msg) inside soft_assertions() to surface 'Fail: <msg>!'."""
    try:
        with soft_assertions():
            soft_fail('foobar')
        fail('should have raised error')
    except AssertionError as err:
        message = str(err)
        assert_that(message).contains('Fail: foobar!')
        assert_that(message).does_not_contain('should have raised error')


def test_soft_fail_with_soft_failing_asserts():
    """Expect a soft_fail() mixed with failing soft assertions to be reported alongside them."""
    try:
        with soft_assertions():
            assert_that('foo').is_length(4)
            assert_that('foo').is_empty()
            soft_fail('foobar')
            assert_that('foo').is_not_equal_to('foo')
            assert_that('foo').is_equal_to_ignoring_case('BAR')
        fail('should have raised error')
    except AssertionError as err:
        message = str(err)
        # Checked in the same order as the failures were triggered above.
        expected_fragments = (
            'Expected <foo> to be of length <4>, but was <3>.',
            'Expected <foo> to be empty string, but was not.',
            'Fail: foobar!',
            'Expected <foo> to be not equal to <foo>, but was.',
            'Expected <foo> to be case-insensitive equal to <BAR>, but was not.',
        )
        for fragment in expected_fragments:
            assert_that(message).contains(fragment)
        assert_that(message).does_not_contain('should have raised error')


def test_double_soft_fail():
    try:
        with soft_assertions():
            soft_fail()
            soft_fail('foobar')
        fail('should have raised error')
    except AssertionError as e:
        out = str(e)
        assert_that(out).contains('Fail!')
        assert_that(out).contains('Fail: foobar!')
        # ... (listing is cut off here in the source)

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from time import sleep

from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
    AirflowSkipException
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


class BaseSensorOperator(BaseOperator):
    """
    Base class for sensor operators; subclasses inherit these attributes.

    A sensor keeps poking at a fixed time interval. It succeeds once its
    criterion is met and fails if it times out first.

    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait
        between tries
    :type poke_interval: int
    :param timeout: Time, in seconds, before the task times out and fails.
    :type timeout: int
    """
    ui_color = '#e6f1f2'

    @apply_defaults
    def __init__(self,
                 poke_interval=60,
                 timeout=60 * 60 * 24 * 7,
                 soft_fail=False,
                 *args,
                 **kwargs):
        super(BaseSensorOperator, self).__init__(*args, **kwargs)
        # Sensor-specific settings, read by execute().
        self.timeout = timeout
        self.poke_interval = poke_interval
        self.soft_fail = soft_fail

    def poke(self, context):
        """
        Hook that sensors deriving from this class should override.

        execute() treats a truthy return value as "criterion met".

        :raises AirflowException: always, in this base implementation
        """
        raise AirflowException('Override me.')

    def execute(self, context):
        """
        Call poke() repeatedly until it succeeds or the timeout is exceeded.

        The elapsed time is only examined after an unsuccessful poke, and
        the sensor sleeps ``poke_interval`` seconds between pokes.

        :raises AirflowSkipException: on timeout when ``soft_fail`` is set
        :raises AirflowSensorTimeout: on timeout when ``soft_fail`` is not set
        """
        started_at = timezone.utcnow()
        while not self.poke(context):
            elapsed = (timezone.utcnow() - started_at).total_seconds()
            if elapsed > self.timeout:
                # soft_fail selects the skip exception over the timeout error.
                timeout_error = (AirflowSkipException if self.soft_fail
                                 else AirflowSensorTimeout)
                raise timeout_error('Snap. Time is OUT.')
            sleep(self.poke_interval)
        # ... (listing is cut off here in the source)

