Best Python code snippet using hypothesis
witch_matching.py
Source:witch_matching.py  
"""Main class, holding information about models and training/testing routines."""
import torch

from ..consts import BENCHMARK, NON_BLOCKING
from ..utils import bypass_last_layer

# Kept between the imports exactly as in the original listing: the cudnn flag
# is set before the witch base module is imported.
torch.backends.cudnn.benchmark = BENCHMARK

from .witch_base import _Witch


class WitchGradientMatching(_Witch):
    """Brew passenger poison with given arguments.

    "Double, double toil and trouble;
    Fire burn, and cauldron bubble....
    Round about the cauldron go;
    In the poison'd entrails throw."
    """

    def _define_objective(self, inputs, labels, criterion, targets, intended_classes, true_classes):
        """Build the closure that evaluates the gradient-matching objective.

        ``targets``, ``intended_classes`` and ``true_classes`` are part of the
        shared signature but are not read by this implementation.

        Returns:
            A callable ``closure(model, optimizer, target_grad, target_clean_grad,
            target_gnorm)`` that back-propagates the passenger loss and returns
            ``(loss, number of correct predictions)`` as detached CPU tensors.
        """
        def closure(model, optimizer, target_grad, target_clean_grad, target_gnorm):
            """This function will be evaluated on all GPUs."""  # noqa: D401
            differentiable_params = [p for p in model.parameters() if p.requires_grad]
            outputs = model(inputs)
            poison_loss = criterion(outputs, labels)
            prediction = (outputs.data.argmax(dim=1) == labels).sum()
            # create_graph=True keeps the parameter gradients differentiable, so the
            # matching loss below can itself be back-propagated.
            poison_grad = torch.autograd.grad(poison_loss, differentiable_params, retain_graph=True, create_graph=True)

            passenger_loss = self._passenger_loss(poison_grad, target_grad, target_clean_grad, target_gnorm)
            if self.args.centreg != 0:
                passenger_loss = passenger_loss + self.args.centreg * poison_loss
            passenger_loss.backward(retain_graph=self.retain)
            return passenger_loss.detach().cpu(), prediction.detach().cpu()
        return closure

    def _passenger_loss(self, poison_grad, target_grad, target_clean_grad, target_gnorm):
        """Compute the blind passenger loss term.

        Args:
            poison_grad: Sequence of per-parameter gradients of the poison loss.
            target_grad: Sequence of per-parameter target gradients, indexed in
                lockstep with ``poison_grad``.
            target_clean_grad: Accepted for interface compatibility; not read here.
            target_gnorm: Value the accumulated loss is divided by.

        Returns:
            The loss selected by ``self.args.loss``, including the optional
            ``repel`` and ``normreg`` terms.
        """
        passenger_loss = 0
        poison_norm = 0

        SIM_TYPE = ['similarity', 'similarity-narrow', 'top5-similarity', 'top10-similarity', 'top20-similarity']
        top_k = {'top5-similarity': 5, 'top10-similarity': 10, 'top20-similarity': 20}

        loss_name = self.args.loss
        if loss_name in top_k:
            # Restrict matching to the parameter tensors with the largest target
            # gradient norm. k is clamped so that models with fewer than k
            # parameter tensors do not make torch.topk raise.
            layer_norms = torch.stack([p.norm() for p in target_grad], dim=0)
            _, indices = torch.topk(layer_norms, min(top_k[loss_name], len(target_grad)))
        else:
            indices = torch.arange(len(target_grad))

        uses_dot_product = loss_name in ['scalar_product', *SIM_TYPE]
        needs_poison_norm = loss_name in SIM_TYPE or self.args.normreg != 0

        for i in indices:
            if uses_dot_product:
                passenger_loss -= (target_grad[i] * poison_grad[i]).sum()
            elif loss_name == 'cosine1':
                passenger_loss -= torch.nn.functional.cosine_similarity(target_grad[i].flatten(), poison_grad[i].flatten(), dim=0)
            elif loss_name == 'SE':
                passenger_loss += 0.5 * (target_grad[i] - poison_grad[i]).pow(2).sum()
            elif loss_name == 'MSE':
                passenger_loss += torch.nn.functional.mse_loss(target_grad[i], poison_grad[i])

            if needs_poison_norm:
                poison_norm += poison_grad[i].pow(2).sum()

        if self.args.repel != 0:
            for i in indices:
                if uses_dot_product:
                    passenger_loss += self.args.repel * (target_grad[i] * poison_grad[i]).sum()
                elif loss_name == 'cosine1':
                    # NOTE(review): every other branch applies the repel term with the
                    # opposite sign of its matching term; this one uses the same sign.
                    # Left unchanged - confirm whether `+=` was intended.
                    passenger_loss -= self.args.repel * torch.nn.functional.cosine_similarity(target_grad[i].flatten(), poison_grad[i].flatten(), dim=0)
                elif loss_name == 'SE':
                    passenger_loss -= 0.5 * self.args.repel * (target_grad[i] - poison_grad[i]).pow(2).sum()
                elif loss_name == 'MSE':
                    passenger_loss -= self.args.repel * torch.nn.functional.mse_loss(target_grad[i], poison_grad[i])

        passenger_loss = passenger_loss / target_gnorm  # this is a constant

        if loss_name in SIM_TYPE:
            passenger_loss = 1 + passenger_loss / poison_norm.sqrt()
        if self.args.normreg != 0:
            passenger_loss = passenger_loss + self.args.normreg * poison_norm.sqrt()

        if loss_name == 'similarity-narrow':
            for i in indices[-2:]:  # normalize norm of classification layer
                passenger_loss += 0.5 * poison_grad[i].pow(2).sum() / target_gnorm
        return passenger_loss


class WitchGradientMatchingNoisy(WitchGradientMatching):
    """Brew passenger poison with given arguments.

    Both the poison gradient and the target gradient are modified to be diff. private before calculating the loss.
    """

    def _initialize_brew(self, victim, kettle):
        """Run the parent initialization and keep a handle on the victim's defs."""
        super()._initialize_brew(victim, kettle)
        self.defs = victim.defs  # leaking abstractions here, but only for this adaptive attack subset :>

    def _define_objective(self, inputs, labels, criterion, targets, intended_classes, true_classes):
        """Build the closure; identical to the parent except that the poison gradient is clipped/noised first.

        ``targets``, ``intended_classes`` and ``true_classes`` are not read here.
        """
        def closure(model, optimizer, target_grad, target_clean_grad, target_gnorm):
            """This function will be evaluated on all GPUs."""  # noqa: D401
            differentiable_params = [p for p in model.parameters() if p.requires_grad]
            outputs = model(inputs)
            poison_loss = criterion(outputs, labels)
            prediction = (outputs.data.argmax(dim=1) == labels).sum()
            poison_grad = torch.autograd.grad(poison_loss, differentiable_params, retain_graph=True, create_graph=True)

            # add noise to samples (modifies the gradient tensors in place)
            self._hide_gradient(poison_grad)

            # Compute blind passenger loss
            passenger_loss = self._passenger_loss(poison_grad, target_grad, target_clean_grad, target_gnorm)
            if self.args.centreg != 0:
                passenger_loss = passenger_loss + self.args.centreg * poison_loss
            passenger_loss.backward(retain_graph=self.retain)
            return passenger_loss.detach().cpu(), prediction.detach().cpu()
        return closure

    def _hide_gradient(self, gradient_list):
        """Enforce batch-wise privacy if necessary.

        This is attacking a defense discussed in Hong et al., 2020
        We enforce privacy on mini batches instead of instances to cope with effects on batch normalization
        This is reasonable as Hong et al. discuss that defense against poisoning mostly arises from the addition
        of noise to the gradient signal

        The tensors in ``gradient_list`` are modified in place; nothing is returned.

        Raises:
            ValueError: If noise is requested with an unknown distribution name.
        """
        privacy = self.defs.privacy
        clip = privacy['clip']

        if clip is not None:
            # The coefficient is computed without autograd tracking so the in-place
            # scaling below cannot invalidate tensors saved for the norm's backward;
            # the scale is therefore treated as a constant when differentiating.
            with torch.no_grad():
                total_norm = torch.norm(torch.stack([torch.norm(grad) for grad in gradient_list]))
                clip_coef = clip / (total_norm + 1e-6)
            if clip_coef < 1:
                for grad in gradient_list:
                    # In-place: the out-of-place `mul` previously used here discarded
                    # its result, so no clipping ever took place.
                    grad.mul_(clip_coef)

        if privacy['noise'] is not None:
            # `self.setup` / `self.defs` replace the undefined names `kettle` / `defs`.
            # NOTE(review): `self.setup` is read the same way in
            # WitchGradientMatchingHidden._batched_step; presumably set by _Witch.
            device = self.setup['device']
            loc = torch.as_tensor(0.0, device=device)
            clip_factor = clip if clip is not None else 1.0
            scale = torch.as_tensor(clip_factor * privacy['noise'], device=device)
            if privacy['distribution'] == 'gaussian':
                generator = torch.distributions.normal.Normal(loc=loc, scale=scale)
            elif privacy['distribution'] == 'laplacian':
                generator = torch.distributions.laplace.Laplace(loc=loc, scale=scale)
            else:
                raise ValueError(f'Invalid distribution {self.defs.privacy["distribution"]} given.')

            for grad in gradient_list:
                grad += generator.sample(grad.shape)


class WitchGradientMatchingHidden(WitchGradientMatching):
    """Brew passenger poison with given arguments.

    Try to match the original image feature representation to hide the attack from filter defenses.
    This class does a ton of horrid overwriting of the _batched_step method to add some additional
    computations that I dont want to be executed for all attacks. todo: refactor :>
    """

    # Weight of the feature-matching term added to the passenger loss.
    FEATURE_WEIGHT = 1.0

    def _batched_step(self, poison_delta, poison_bounds, example, victim, kettle):
        """Take a step toward minimizing the current target loss.

        Returns:
            ``(loss, prediction)`` as Python scalars; both are 0 when the batch
            contains no poison ids.
        """
        inputs, labels, ids = example

        inputs = inputs.to(**self.setup)
        labels = labels.to(dtype=torch.long, device=self.setup['device'], non_blocking=NON_BLOCKING)
        # Check adversarial pattern ids
        poison_slices, batch_positions = kettle.lookup_poison_indices(ids)

        # This is a no-op in single network brewing
        # In distributed brewing, this is a synchronization operation
        inputs, labels, poison_slices, batch_positions, randgen = victim.distributed_control(
            inputs, labels, poison_slices, batch_positions)

        # save out clean inputs
        # These will be representative of "average" unpoisoned versions of the poison images
        # as such they will be augmented differently
        clean_inputs = inputs.clone().detach()

        # If a poisoned id position is found, the corresponding pattern is added here:
        if len(batch_positions) > 0:
            delta_slice = poison_delta[poison_slices].detach().to(**self.setup)
            if self.args.clean_grad:
                delta_slice = torch.zeros_like(delta_slice)
            delta_slice.requires_grad_()  # TRACKING GRADIENTS FROM HERE
            poison_images = inputs[batch_positions]
            inputs[batch_positions] += delta_slice

            # Add additional clean data if mixing during the attack:
            if self.args.pmix:
                if 'mix' in victim.defs.mixing_method['type']:   # this covers mixup, cutmix 4waymixup, maxup-mixup
                    try:
                        extra_data = next(self.extra_data)
                    except StopIteration:
                        # Loader exhausted: restart it and draw again.
                        self.extra_data = iter(kettle.trainloader)
                        extra_data = next(self.extra_data)
                    extra_inputs = extra_data[0].to(**self.setup)
                    extra_labels = extra_data[1].to(dtype=torch.long, device=self.setup['device'], non_blocking=NON_BLOCKING)
                    inputs = torch.cat((inputs, extra_inputs), dim=0)
                    clean_inputs = torch.cat((clean_inputs, extra_inputs), dim=0)
                    labels = torch.cat((labels, extra_labels), dim=0)

            # Perform differentiable data augmentation
            if self.args.paugment:
                inputs = kettle.augment(inputs, randgen=randgen)
                clean_inputs = kettle.augment(clean_inputs)

            # Perform mixing
            if self.args.pmix:
                inputs, extra_labels, mixing_lmb = kettle.mixer(inputs, labels)
                clean_inputs, _, _ = kettle.mixer(clean_inputs, labels)

            if self.args.padversarial is not None:
                # The optimal choice of the 3rd and 4th argument here are debatable
                # This is likely the strongest anti-defense:
                # but the defense itself splits the batch and uses half of it as targets
                # instead of using the known target [as the defense does not know about the target]
                # delta = self.attacker.attack(inputs.detach(), labels,
                #                              self.targets, self.true_classes, steps=victim.defs.novel_defense['steps'])

                # This is a more accurate anti-defense:
                # NOTE(review): `_split_data` is not imported in this listing - confirm
                # where it is expected to come from.
                [temp_targets, inputs,
                 temp_true_labels, labels,
                 temp_fake_label] = _split_data(inputs, labels, target_selection=victim.defs.novel_defense['target_selection'])
                delta, additional_info = self.attacker.attack(inputs.detach(), labels,
                                                              temp_targets, temp_fake_label, steps=victim.defs.novel_defense['steps'])
                inputs = inputs + delta  # Kind of a reparametrization trick
                # NOTE(review): `inputs` was replaced by the split result above while
                # `clean_inputs` was not - verify that the shapes still agree here.
                clean_inputs = clean_inputs + delta

            # Define the loss objective and compute gradients
            if self.args.target_criterion in ['cw', 'carlini-wagner']:
                # NOTE(review): `cw_loss` is not imported in this listing - confirm.
                loss_fn = cw_loss
            else:
                loss_fn = torch.nn.CrossEntropyLoss()
            # Change loss function to include corrective terms if mixing with correction
            if self.args.pmix:
                def criterion(outputs, labels):
                    loss, pred = kettle.mixer.corrected_loss(outputs, extra_labels, lmb=mixing_lmb, loss_fn=loss_fn)
                    return loss
            else:
                criterion = loss_fn

            closure = self._define_objective(inputs, clean_inputs, labels, criterion, self.targets, self.intended_classes,
                                             self.true_classes)
            loss, prediction = victim.compute(closure, self.target_grad, self.target_clean_grad, self.target_gnorm)
            delta_slice = victim.sync_gradients(delta_slice)

            if self.args.clean_grad:
                delta_slice.data = poison_delta[poison_slices].detach().to(**self.setup)

            # Update Step
            if self.args.attackoptim in ['PGD', 'GD']:
                delta_slice = self._pgd_step(delta_slice, poison_images, self.tau0, kettle.dm, kettle.ds)

                # Return slice to CPU:
                poison_delta[poison_slices] = delta_slice.detach().to(device=torch.device('cpu'))
            elif self.args.attackoptim in ['Adam', 'signAdam', 'momSGD', 'momPGD']:
                poison_delta.grad[poison_slices] = delta_slice.grad.detach().to(device=torch.device('cpu'))
                poison_bounds[poison_slices] = poison_images.detach().to(device=torch.device('cpu'))
            else:
                raise NotImplementedError('Unknown attack optimizer.')
        else:
            loss, prediction = torch.tensor(0), torch.tensor(0)

        return loss.item(), prediction.item()

    def _define_objective(self, inputs, clean_inputs, labels, criterion, targets, intended_classes, true_classes):
        """Build the closure for gradient matching plus a feature-matching penalty.

        The penalty is the mean squared difference between the features of
        ``inputs`` and of ``clean_inputs``, weighted by ``FEATURE_WEIGHT``.
        ``targets``, ``intended_classes`` and ``true_classes`` are not read here.
        """
        def closure(model, optimizer, target_grad, target_clean_grad, target_gnorm):
            """This function will be evaluated on all GPUs."""  # noqa: D401
            differentiable_params = [p for p in model.parameters() if p.requires_grad]

            feature_model, last_layer = bypass_last_layer(model)
            features = feature_model(inputs)
            outputs = last_layer(features)

            # clean features:
            clean_features = feature_model(clean_inputs)

            poison_loss = criterion(outputs, labels)
            prediction = (outputs.data.argmax(dim=1) == labels).sum()
            poison_grad = torch.autograd.grad(poison_loss, differentiable_params, retain_graph=True, create_graph=True)

            # add feature term
            feature_loss = (features - clean_features).pow(2).mean()

            # Compute blind passenger loss
            passenger_loss = self._passenger_loss(poison_grad, target_grad, target_clean_grad, target_gnorm)

            total_loss = passenger_loss + self.FEATURE_WEIGHT * feature_loss
            if self.args.centreg != 0:
                # Extend total_loss; rebuilding it from passenger_loss (as before)
                # silently dropped the feature term whenever centreg was enabled.
                total_loss = total_loss + self.args.centreg * poison_loss
            total_loss.backward(retain_graph=self.retain)
            return total_loss.detach().cpu(), prediction.detach().cpu()
        return closure


class WitchMatchingMultiTarget(WitchGradientMatching):
    """Variant in which target gradients are matched separately."""

    def _initialize_brew(self, victim, kettle):
        """Run the parent initialization, then store one gradient and norm per target."""
        super()._initialize_brew(victim, kettle)
        self.target_grad, self.target_gnorm = [], []
        for target, intended_class in zip(self.targets, self.intended_classes):
            grad, gnorm = victim.gradient(target.unsqueeze(0), intended_class.unsqueeze(0))
            self.target_grad.append(grad)
            self.target_gnorm.append(gnorm)

    def _define_objective(self, inputs, labels, criterion, targets, intended_classes, true_classes):
        """Build the closure; the passenger loss is summed over the per-target gradients."""
        def closure(model, optimizer, target_grad, target_clean_grad, target_gnorm):
            """This function will be evaluated on all GPUs."""  # noqa: D401
            differentiable_params = [p for p in model.parameters() if p.requires_grad]
            outputs = model(inputs)
            poison_loss = criterion(outputs, labels)
            prediction = (outputs.data.argmax(dim=1) == labels).sum()
            poison_grad = torch.autograd.grad(poison_loss, differentiable_params, retain_graph=True, create_graph=True)

            matching_loss = 0
            for tgrad, tnorm in zip(target_grad, target_gnorm):
                matching_loss += self._passenger_loss(poison_grad, tgrad, None, tnorm)
            if self.args.centreg != 0:
                matching_loss = matching_loss + self.args.centreg * poison_loss
            matching_loss.backward(retain_graph=self.retain)
            return matching_loss.detach().cpu(), prediction.detach().cpu()
# NOTE(review): the listing is cut off here; the remainder of this method is not shown.
...
merge_the_tools.py
Source:merge_the_tools.py  
...41    assert get_substrings_with_unique_chars("AB", 2) == ["AB"]42    assert get_substrings_with_unique_chars("ABCC", 2) == ["AB", "C"]43    assert get_substrings_with_unique_chars("ABCDCC", 2) == ["AB", "CD", "C"]44    assert get_substrings_with_unique_chars("ABDDCC", 3) == ["ABD", "DC"]45def test_clean_inputs():46    assert clean_inputs("Aa", 1) == ("AA", 1)47    assert clean_inputs("AaB", 1) == ("AAB", 1)48    assert clean_inputs("Aa ", 1) == ("AA", 1)49    assert clean_inputs("Aa#", 1) == ("AA", 1)50    assert clean_inputs("#Aa", 1) == ("AA", 1)51def run_tests():52    test_get_unique_chars()53    test_split_string_into_len_string_over_k_substrings()54    test_get_substrings_with_unique_chars()55    test_clean_inputs()56def clean_inputs(string, k):57    string = "".join([c for c in string if c.isalpha()])58    string = string.upper()59    # string = string.replace(" ", "")60    k = int(k)61    return string, k62def merge_the_tools(string, k):63    # your code goes here64    # string, k = clean_inputs(string, k)65    substrings = get_substrings_with_unique_chars(string, k)66    [print(s) for s in substrings]67if __name__ == "__main__":68    run_tests()69    # string, k = input(), int(input())...inputRecorder.py
Source:inputRecorder.py  
1from inputs import get_gamepad2import time3controller_events = ["ABS_HAT0X","ABS_HAT0Y","BTN_SOUTH"]4def recordInputs(config):5    input_list = []6    state_queue = {}7    toggled = False8    for e in controller_events:9        state_queue[e] = time.time()10    # Record inputs until toggle is pressed11    while not toggled:12        events = get_gamepad()13        for event in events:14            current_time = time.time()15            if event.code in controller_events:16                try: # Record how long an event lasted17                    previous_event = state_queue[event.code]18                    input_list.append({"Event": event.code, "Start": previous_event[1], "End": current_time, "State": previous_event[0]})19                    state_queue[event.code] = [event.state,current_time]20                except Exception as e: # Used to initialize state_queue21                    state_queue[event.code] = [event.state,current_time]22            elif event.code in config['Record'] and event.state == 0:23                toggled = True24    return cleanInputs(input_list)25# Event.code[event.state] = Keyword26translate_events = {27    "ABS_HAT0X": {-1: "Left", 1: "Right", 0: "IGNORE"},28    "ABS_HAT0Y": {-1: "Up", 1: "Down", 0: "IGNORE"},29    "BTN_SOUTH": {1: "Grab", 0: "IGNORE"}30}31def cleanInputs(unclean_inputs):32    print(unclean_inputs)33    clean_inputs = []34    for item in unclean_inputs:35        if translate_events[item['Event']][item['State']] != "IGNORE":36            clean_inputs.append(item)37    clean_inputs = sorted(clean_inputs, key=lambda k: k['Start'])38    start_time = clean_inputs[0]['Start']39    for item in clean_inputs:40        item['Start'] = round(item['Start'] - start_time,2)41        item['End'] = round(item['End'] - start_time,2)42        item['State'] = translate_events[item['Event']][item['State']]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
