How to use put_multiple method in autotest

Best Python code snippet using autotest_python

views.py

Source:views.py Github

copy

Full Screen

1import time2import boto33import json4from django.conf import settings5from django.views.decorators.csrf import csrf_exempt6from django.http import HttpResponse, JsonResponse7from libs.hash import get_md58from django.core.files.uploadedfile import UploadedFile9from django.db import transaction10from rest_framework import status, viewsets11from rest_framework.decorators import api_view12from rest_framework.response import Response13from rest_framework.decorators import permission_classes14from .models import MultipleGoods, TagClass, GoodsModel, SingleGoods, Advertising,\15 SpecificationParameter, ParameterOptions16from .serializers import AdvertisingSerializer17from applications.goods.serializers import SingleGoodsSerializer, SimpSingleGoodsSerializer,\18 GoodsModelSerializer, TagClassSerializer, MultipleGoodsSerializer,\19 SimpMultipleGoodsSerializer, SingleGoodsListSerializer,\20 ViewSelectionSerializer, ViewSingleGoodsSerializer, ViewOpenFunctionSerializer,\21 ViewSpecificationParameterSerializer, ListAndGetSpecificationParameterSerializer22from applications.production_manage.models import FunctionInfo, Product, SingleSelection23from applications.production_manage.serializers import SimpProductSerializer24from django.contrib.auth import get_user_model25from applications.setup.permissions import SingleGoodsGroupPermission, AdvertisingGroupPermission,\26 MultipleGoodsGroupPermission, PutawayGroupPermission, GoodsModelGroupPermission, TagClassGroupPermission27from applications.log_manage.models import OperateLog28import datetime29from libs.datetimes import str_to_date30User = get_user_model()31put_time = time.strftime('%Y-%m-%d', time.localtime(time.time()))32today = datetime.date.today()33class SingleMultipleSet(viewsets.ReadOnlyModelViewSet):34 '''35 组合页面的单品下拉列表36 '''37 queryset = SingleGoods.objects.all().order_by("-id")38 serializer_class = SingleGoodsListSerializer39 permission_classes = [MultipleGoodsGroupPermission]40 def get_queryset(self):41 queryset = SingleGoods.objects.all().order_by('-id')42 goods_name = self.request.GET.get('goods_name', "").strip()43 if goods_name:44 queryset = queryset.filter(goods_name__icontains=goods_name)45 return queryset46# 模块配置47class GoodsModelSet(viewsets.ModelViewSet):48 '''49 商品所属模块50 '''51 queryset = GoodsModel.objects.all().order_by("-id")52 serializer_class = GoodsModelSerializer53 permission_classes = [GoodsModelGroupPermission]54 def get_queryset(self):55 queryset = GoodsModel.objects.all().order_by('-id')56 good_model = self.request.GET.get('model_name', "").strip()57 if good_model: # 商品分类名称查询58 queryset = queryset.filter(model_name__icontains=good_model)59 return queryset60 def list(self, request, *args, **kwargs):61 queryset = self.filter_queryset(self.get_queryset())62 page = self.paginate_queryset(queryset)63 if page is not None:64 serializer = self.get_serializer(page, many=True)65 data = serializer.data66 for dd in data:67 parent_id = dd.get('good_parents', '')68 if parent_id is not None:69 goods_parent = GoodsModel.objects.get(pk=parent_id)70 goods_parent = str(goods_parent)71 dd.update({"good_parents": goods_parent})72 return self.get_paginated_response(data)73 serializer = self.get_serializer(queryset, many=True)74 data = serializer.data75 for dd in data:76 parent_id = dd.get('good_parents', '')77 if parent_id is not None:78 goods_parent = GoodsModel.objects.get(pk=parent_id)79 goods_parent = str(goods_parent)80 dd.update({"good_parents": goods_parent})81 return Response(data)82 def create(self, request, *args, **kwargs):83 try:84 with transaction.atomic():85 super(GoodsModelSet, self).create(request, *args, **kwargs)86 OperateLog.create_log(request)87 return Response({}, status=status.HTTP_200_OK)88 except Exception as e:89 return Response({'error': '该模块已存在', 'e': str(e)}, status=status.HTTP_400_BAD_REQUEST)90 def update(self, request, *args, **kwargs):91 try:92 super(GoodsModelSet, self).update(request, *args, **kwargs)93 OperateLog.create_log(request)94 return Response({}, status=status.HTTP_200_OK)95 except Exception as e:96 return Response({'error': '该模块已存在', 'e': str(e)}, status=status.HTTP_400_BAD_REQUEST)97 def destroy(self, request, *args, **kwargs):98 pk = kwargs99 model_query = GoodsModel.objects.all()100 model_num = model_query.filter(good_parents=pk['pk']).count()101 if model_num >0:102 return Response({'error': '该模块含有子类无法删除'}, status=status.HTTP_400_BAD_REQUEST)103 OperateLog.create_log(request)104 return super(GoodsModelSet, self).destroy(request, *args, **kwargs)105class FilterModelSet(viewsets.ReadOnlyModelViewSet):106 '''107 模块父类108 '''109 queryset = GoodsModel.objects.exclude(good_parents__isnull=False).order_by("-id")110 serializer_class = GoodsModelSerializer111 permission_classes = [GoodsModelGroupPermission]112 def get_queryset(self):113 queryset = GoodsModel.objects.exclude(good_parents__isnull=False).order_by("-id")114 good_model = self.request.GET.get('model_name', "").strip()115 if good_model: # 商品分类名称查询116 queryset = queryset.filter(model_name__icontains=good_model)117 return queryset118# 标签配置119class TagManageSet(viewsets.ModelViewSet):120 queryset = TagClass.objects.all().order_by("-id")121 serializer_class = TagClassSerializer122 permission_classes = [TagClassGroupPermission]123 def get_queryset(self):124 queryset = TagClass.objects.all().order_by('-id')125 label_category = self.request.GET.get('label_category', '').strip()126 goods_tag = self.request.GET.get('goods_tag', '').strip()127 if goods_tag:128 queryset = queryset.filter(goods_tag__icontains=goods_tag)129 if label_category:130 queryset = queryset.filter(label_category=label_category)131 return queryset132 def create(self, request, *args, **kwargs):133 try:134 with transaction.atomic():135 super(TagManageSet, self).create(request, *args, **kwargs)136 OperateLog.create_log(request)137 return Response({}, status=status.HTTP_200_OK)138 except Exception as e:139 return Response({'error': '该标签已存在', 'e': str(e)}, status=status.HTTP_400_BAD_REQUEST)140 def update(self, request, *args, **kwargs):141 try:142 super(TagManageSet, self).update(request, *args, **kwargs)143 OperateLog.create_log(request)144 return Response({}, status=status.HTTP_200_OK)145 except Exception as e:146 return Response({'error': '该标签已存在', 'e': str(e)}, status=status.HTTP_400_BAD_REQUEST)147 def destroy(self, request, *args, **kwargs):148 OperateLog.create_log(request)149 return super(TagManageSet, self).destroy(request, *args, **kwargs)150class SingleProductManageset(viewsets.ModelViewSet):151 queryset = SingleGoods.objects.all().order_by('-id')152 permission_classes = [SingleGoodsGroupPermission]153 def get_serializer_class(self):154 if self.suffix == 'List' and self.request.method == 'GET':155 return SimpSingleGoodsSerializer156 else:157 return SingleGoodsSerializer158 def get_queryset(self):159 queryset = SingleGoods.objects.all().order_by('-id')160 goods_sn = self.request.GET.get('goods_sn', "").strip()161 goods_model = self.request.GET.get('goods_model', "").strip()162 goods_name = self.request.GET.get('goods_name', "").strip()163 if goods_name:164 queryset = queryset.filter(goods_name__icontains=goods_name)165 if goods_sn:166 queryset = queryset.filter(goods_sn__icontains=goods_sn)167 if goods_model:168 queryset = queryset.filter(goods_model__model_name__icontains=goods_model)169 return queryset170 def retrieve(self, request, *args, **kwargs):171 instance = self.get_object()172 serializer = self.get_serializer(instance)173 for func_name_list in serializer.data['goods_selection']:174 pro = FunctionInfo.objects.all().filter(pk=func_name_list['function']).select_related('product') \175 .values_list('product__product', 'func_name')176 func_name_list['fun_name'] = list(pro)[0][1]177 func_name_list['product'] = list(pro)[0][0]178 for opts in serializer.data['par_options']:179 if isinstance(opts, dict):180 file_list = ParameterOptions.objects.filter(pk=opts['id'])\181 .select_related('control__file_name', 'control__fill_control')\182 .values_list('control__id', 'control__fill_control')183 control_id, control_type = file_list[0]184 opts['control_id'] = control_id185 opts['control_type'] = control_type186 return Response(serializer.data)187 def create(self, request, *args, **kwargs):188 serializer = self.get_serializer(data=request.data)189 serializer.is_valid(raise_exception=True)190 try:191 with transaction.atomic():192 self.perform_create(serializer)193 except Exception as e:194 return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)195 OperateLog.create_log(request)196 return Response(status=status.HTTP_200_OK)197 def update(self, request, *args, **kwargs):198 data = request.data199 instance = self.get_object()200 serializer = self.get_serializer(instance, data=data)201 serializer.is_valid(raise_exception=True)202 with transaction.atomic():203 instance = serializer.save()204 OperateLog.create_log(request)205 return Response(SingleGoodsSerializer(instance).data, status=status.HTTP_200_OK)206 def destroy(self, request, *args, **kwargs):207 today = datetime.date.today()208 pk = kwargs209 goods_query = SingleGoods.objects.all()210 sings1 = goods_query.filter(parent=pk['pk']).count()211 putaway_off_time = goods_query.filter(pk=pk['pk'])\212 .values_list('putaway_off_time', flat=True)213 if putaway_off_time:214 if putaway_off_time[0] >= today:215 return Response({'error': '该商品已经上架,请下架后再删除'}, status=status.HTTP_400_BAD_REQUEST)216 if sings1 > 0:217 return Response({'error': '该商品含有子类无法删除'}, status=status.HTTP_400_BAD_REQUEST)218 else:219 OperateLog.create_log(request)220 return super(SingleProductManageset, self).destroy(request, *args, **kwargs)221class MultipleProductManageset(viewsets.ModelViewSet):222 '''223 组合商品配置224 '''225 queryset = MultipleGoods.objects.all().order_by('-id')226 permission_classes = [MultipleGoodsGroupPermission]227 def get_serializer_class(self):228 if self.suffix == 'List' and self.request.method == 'GET':229 return SimpMultipleGoodsSerializer230 else:231 return MultipleGoodsSerializer232 def get_queryset(self):233 queryset = MultipleGoods.objects.all().order_by('-id')234 m_goods_sn = self.request.GET.get('m_goods_sn', "").strip()235 m_goods_model = self.request.GET.get('m_goods_model', "").strip()236 m_goods_name = self.request.GET.get('m_goods_name', "").strip()237 if m_goods_name:238 queryset = queryset.filter(m_goods_name__icontains=m_goods_name)239 if m_goods_sn:240 queryset = queryset.filter(m_goods_sn__icontains=m_goods_sn)241 if m_goods_model:242 queryset = queryset.filter(m_goods_model__icontains=m_goods_model)243 return queryset244 def retrieve(self, request, *args, **kwargs):245 instance = self.get_object()246 serializer = self.get_serializer(instance)247 for opts in serializer.data['multiple_par_options']:248 if isinstance(opts, dict):249 file_name_list = ParameterOptions.objects.filter(pk=opts['id'])\250 .select_related('control__file_name', 'control__fill_control')\251 .values_list('control__id', 'control__fill_control')252 control_id, control_type = file_name_list[0]253 opts['control_id'] = control_id254 opts['control_type'] = control_type255 return Response(serializer.data)256 def create(self, request, *args, **kwargs):257 goods_data = request.data258 serializer = self.get_serializer(data=goods_data)259 serializer.is_valid(raise_exception=True)260 try:261 with transaction.atomic():262 self.perform_create(serializer)263 except Exception as e:264 return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)265 OperateLog.create_log(request)266 return Response(status=status.HTTP_200_OK)267 def update(self, request, *args, **kwargs):268 data = request.data269 instance = self.get_object()270 serializer = self.get_serializer(instance, data=data)271 serializer.is_valid(raise_exception=True)272 with transaction.atomic():273 instance = serializer.save()274 OperateLog.create_log(request)275 return Response(MultipleGoodsSerializer(instance).data, status=status.HTTP_200_OK)276 def destroy(self, request, *args, **kwargs):277 today = datetime.date.today()278 putaway_off_time = MultipleGoods.objects.all().filter(pk=kwargs['pk'])\279 .values_list('putaway_off_time', flat=True)280 if putaway_off_time[0] >= today:281 return Response({'error': '该商品已经上架,请下架后再删除'}, status=status.HTTP_400_BAD_REQUEST)282 OperateLog.create_log(request)283 return super(MultipleProductManageset, self).destroy(request, *args, **kwargs)284# 广告管理285class AdvertisingViewSet(viewsets.ModelViewSet):286 queryset = Advertising.objects.all()287 serializer_class = AdvertisingSerializer288 permission_classes = [AdvertisingGroupPermission]289 def get_queryset(self):290 queryset = Advertising.objects.all().order_by('-id')291 ad_position = self.request.GET.get('ad_position', "").strip()292 ad_name = self.request.GET.get('ad_name', "").strip()293 ad_put_recent_on = self.request.GET.get('ad_put_recent_on', "").strip()294 ad_put_operator = self.request.GET.get('ad_put_operator', "").strip()295 ad_status = self.request.GET.get('ad_status', "").strip()296 # 操作人297 if ad_put_operator and ad_put_operator != 'undefined':298 queryset = queryset.filter(ad_put_operator__last_name__icontains=ad_put_operator)299 # 广告名称300 if ad_name and ad_name != 'undefined':301 queryset = queryset.filter(ad_name__icontains=ad_name)302 # 位置303 if ad_position and ad_position != 'undefined':304 queryset = queryset.filter(ad_position__icontains=ad_position)305 # 上架时间306 if ad_put_recent_on and ad_put_recent_on != 'undefined':307 queryset = queryset.filter(ad_put_recent_on__icontains=ad_put_recent_on)308 # 上架状态309 if ad_status and ad_status != 'undefined':310 queryset = queryset.filter(ad_status__icontains=ad_status)311 return queryset312 def list(self, request, *args, **kwargs):313 queryset = self.filter_queryset(self.get_queryset())314 page = self.paginate_queryset(queryset)315 if page is not None:316 serializer = self.get_serializer(page, many=True)317 return self.get_paginated_response(serializer.data)318 serializer = self.get_serializer(queryset, many=True)319 return Response(serializer.data)320 def create(self, request, *args, **kwargs):321 if request.user not in User.objects.all():322 return Response({'error': '亲,请先登录!'}, status=status.HTTP_400_BAD_REQUEST)323 serializer = self.get_serializer(data=request.data)324 serializer.is_valid(raise_exception=True)325 self.perform_create(serializer)326 headers = self.get_success_headers(serializer.data)327 OperateLog.create_log(request)328 return Response(serializer.data, status=status.HTTP_200_OK, headers=headers)329 def update(self, request, *args, **kwargs):330 data = request.data331 if int(data['ad_position']) == 0 and int(data['ad_status']) == 1:332 ad_instance = Advertising.objects.all().filter(ad_position=0, ad_status=1)333 ad_nums = ad_instance.count()334 if ad_nums >= 5 and (Advertising.objects.get(pk=data['id']) not in ad_instance):335 return Response({'error': '已经上架的首页轮播超过五个'}, status=status.HTTP_400_BAD_REQUEST)336 instance = self.get_object()337 serializer = self.get_serializer(instance, data=data)338 serializer.is_valid(raise_exception=True)339 with transaction.atomic():340 instance = serializer.save()341 OperateLog.create_log(request)342 return Response(AdvertisingSerializer(instance).data, status=status.HTTP_200_OK)343 def destroy(self, request, *args, **kwargs):344 instance = self.get_object()345 ad_status = Advertising.objects.all().filter(pk=kwargs['pk']) \346 .values_list('ad_status', flat=True)347 if ad_status[0] == 1:348 return Response({'error': '该商品已经上架,请下架后再删除'}, status=status.HTTP_400_BAD_REQUEST)349 self.perform_destroy(instance)350 OperateLog.create_log(request)351 return Response(status=status.HTTP_200_OK)352# 产品规格参数353class SpecificationParameterViewSet(viewsets.ModelViewSet):354 queryset = SpecificationParameter.objects.all()355 def get_serializer_class(self):356 if self.suffix == 'List' and self.request.method == 'GET':357 return ListAndGetSpecificationParameterSerializer358 else:359 return ViewSpecificationParameterSerializer360 def get_queryset(self):361 queryset = SpecificationParameter.objects.all().order_by('-id')362 file_name = self.request.GET.get('file_name', "").strip()363 param_model = self.request.GET.get('param_model', "").strip()364 if file_name:365 queryset = queryset.filter(file_name__icontains=file_name)366 if param_model:367 queryset = queryset.filter(param_model__model_name__icontains=param_model)368 return queryset369 def retrieve(self, request, *args, **kwargs):370 instance = self.get_object()371 serializer = self.get_serializer(instance)372 ret = dict(serializer.data)373 ret['param_model'] = ret['param_model']['id']374 return Response(ret)375 def create(self, request, *args, **kwargs):376 serializer = self.get_serializer(data=request.data)377 serializer.is_valid(raise_exception=True)378 self.perform_create(serializer)379 # OperateLog.create_log(request)380 return Response(serializer.data, status=status.HTTP_200_OK)381 def update(self, request, *args, **kwargs):382 instance = self.get_object()383 serializer = self.get_serializer(instance, data=request.data)384 serializer.is_valid(raise_exception=True)385 self.perform_update(serializer)386 OperateLog.create_log(request)387 return Response(serializer.data)388 def destroy(self, request, *args, **kwargs):389 instance = self.get_object()390 # 控件下属选项如果在产品参数配置中不允许删除391 number1 = SingleGoods.objects.all().filter(par_options__control=instance).count()392 number2 = MultipleGoods.objects.all().filter(multiple_par_options__control=instance).count()393 if (number1 > 0) or (number2 > 0):394 return Response({'error': '该规格参数正在使用中无法删除'}, status=status.HTTP_400_BAD_REQUEST)395 self.perform_destroy(instance)396 # OperateLog.create_log(request)397 return Response(status=status.HTTP_200_OK)398# 广告上架399@api_view(['GET'])400def ad_put(request):401 ad_id = request.GET.get('ad_id', '').strip()402 instance = Advertising.objects.all().get(pk=ad_id)403 # 判断是否是首页轮播图404 if instance.ad_position == 1:405 # 下架操作406 if instance.ad_status == 1:407 instance.ad_status = 2408 instance.ad_put_off = put_time409 instance.save()410 return Response(status=status.HTTP_200_OK)411 else:412 ad_nums = Advertising.objects.all().filter(ad_position=0, ad_status=1).count()413 # 下架状态的广告要上架,必须满足已经上架的没超过五个414 if instance.ad_status == 2 and ad_nums < 5:415 instance.ad_status = 1416 instance.ad_put_recent_on = put_time417 instance.save()418 return Response(status=status.HTTP_200_OK)419 elif instance.ad_status == 3 and ad_nums < 5:420 return Response({'reminder': '是否提前上架'}, status=status.HTTP_200_OK)421 elif ad_nums >= 5:422 return Response({'error': '已经上架的首页轮播超过五个'}, status=status.HTTP_400_BAD_REQUEST)423 # 不是首页轮播图暂时没有上架数量限制424 else:425 if instance.ad_status == 1:426 instance.ad_status = 2427 instance.ad_put_off = put_time428 instance.save()429 return Response(status=status.HTTP_200_OK)430 else:431 # ad_nums = Advertising.objects.all().filter(ad_position=0, ad_status=1).count()432 if instance.ad_status == 2:433 instance.ad_status = 1434 instance.ad_put_recent_on = put_time435 instance.save()436 return Response(status=status.HTTP_200_OK)437 elif instance.ad_status == 3:438 return Response({'reminder': '是否提前上架'}, status=status.HTTP_200_OK)439# 提前上架440@api_view(['GET'])441def ad_time(request):442 ad_id = request.GET.get('ad_id', '').strip()443 ins = Advertising.objects.all()444 instance = ins.get(pk=ad_id)445 # 判断是否是首页轮播图446 if instance.ad_position == 1:447 ad_nums = ins.filter(ad_position=0, ad_status=1).count()448 if ad_nums >= 5:449 return Response({'error': '已经上架的首页轮播超过五个'}, status=status.HTTP_400_BAD_REQUEST)450 # 不是首页轮播图暂时没有上架数量限制 上架首页轮播图少于五个可以上架451 instance.ad_status = 1452 instance.ad_put_recent_on = put_time453 instance.save()454 OperateLog.create_log(request)455 return Response(status=status.HTTP_200_OK)456# 已添加的组合成员单品列表457@api_view(['GET'])458def Members_list(request):459 ret = []460 try:461 single_id = dict(request.GET)['id']462 id_list = []463 for ids in single_id:464 if ids in id_list:465 continue466 else:467 id_list.append(ids)468 for id in id_list:469 good_instance = SingleGoods.objects.all().filter(pk=id)470 good = SimpSingleGoodsSerializer(good_instance, many=True)471 if []:472 ret = good.data473 else:474 for i in good.data:475 ret.append(i)476 return Response(ret, status=status.HTTP_200_OK)477 except:478 return Response(ret, status=status.HTTP_200_OK)479@api_view(['GET'])480def label_list(request):481 """482 标签下拉接口483 """484 label_category = request.GET.get('label_category', '').strip()485 ret = {'data': []}486 tagclass = TagClass.objects.all().filter(label_category=label_category)487 tags = TagClassSerializer(tagclass, many=True)488 for tag in tags.data:489 ret['data'].append(tag)490 return Response(ret, status=status.HTTP_200_OK)491@api_view(['GET'])492def product_list(request):493 """494 产品列表495 产品名称/功能开关名称/功能开关选项值496 参数: 不给 /选定产品id / 选定功能开关id497 """498 ret = []499 goods = Product.objects.all().filter(classify=2)500 pro = SimpProductSerializer(goods, many=True)501 ret.append(pro.data)502 return Response(ret, status=status.HTTP_200_OK)503@api_view(['GET'])504def function_list(request):505 """506 功能开关 给我一个product_id 不是单品id507 """508 func_id = request.GET.get('goods_id', '').strip()509 ret = []510 func_instance = FunctionInfo.objects.all().filter(product_id=func_id)511 func_list = ViewOpenFunctionSerializer(func_instance, many=True)512 for func in func_list.data:513 ret.append(func)514 return Response(ret, status=status.HTTP_200_OK)515@api_view(['GET'])516def selection_list(request):517 """518 功能开关选项519 """520 function_id = request.GET.get('function_id', '').strip()521 ret = []522 func = SingleSelection.objects.all().filter(function_id=function_id)523 tags = ViewSelectionSerializer(func, many=True)524 for tag in tags.data:525 ret.append(tag)526 return Response(ret, status=status.HTTP_200_OK)527@api_view(['GET'])528def parent_list(request):529 """530 前置产品531 """532 ret = []533 parent = SingleGoods.objects.all()534 par = ViewSingleGoodsSerializer(parent, many=True)535 ret.append(par.data)536 return Response(ret, status=status.HTTP_200_OK)537@api_view(['GET'])538def models_list(request):539 """540 所属商品模块541 """542 ret = []543 models = GoodsModel.objects.all()544 par = GoodsModelSerializer(models, many=True)545 mods = par.data546 for mod in mods:547 ret.append(mod)548 return Response(ret, status=status.HTTP_200_OK)549@csrf_exempt550def get_editor(request):551 """富文本上传jpeg图片,返回该图片的url"""552 if not request.method == "POST":553 return JsonResponse({"error": "request method not allowed"}, status=status.HTTP_400_BAD_REQUEST)554 cli = boto3.client(**settings.BAISHANYUN_CONFIGS) # 获得客户端会话对象555 file = request.FILES['myFileName']556 assert isinstance(file, UploadedFile) # 获取到的文件信息应为一个UploadedFile对象557 file_type = file.name.split(".")[-1]558 if file_type not in ("jpg", "jpeg", "png"):559 return JsonResponse({"error": "文件格式不合法,目前只支持jpeg, jpg, png"}, status=status.HTTP_400_BAD_REQUEST)560 data = file.read()561 key = get_md5(data)562 cli.put_object(563 ACL='public-read', # 公共可读564 Bucket='minioss', # 固定565 Key=f"oa/{key}.jpeg", # 文件保存在oa目录下566 ContentType=f'image/jpeg',567 Body=data568 )569 url = f'http://s2.i.qingcdn.com/minioss/oa/{key}.jpeg'570 return HttpResponse((json.dumps(url)))571# 上架新增&修改接口572@api_view(['POST', 'PUT'])573@permission_classes([PutawayGroupPermission, ])574def create_put_up(request):575 today = datetime.date.today()576 bod = request.body577 bod = str(bod, encoding="utf-8")578 bod = json.loads(bod)579 goods_sn = bod.get('goods_sn', '')580 on_time = bod.get('putaway_recent_on_time', '1970-01-01').strip()581 off_time = bod.get('putaway_off_time', '1970-01-01').strip()582 sell_status = bod.get('sell_status', '1')583 recommend = bod.get('recommend', '2')584 put_price = bod.get('put_price', '0')585 goods_price = bod.get('goods_price', '')586 user = request.user.last_name587 on_time = str_to_date(on_time)588 off_time = str_to_date(off_time)589 today = datetime.date.today()590 # 判断请求方式以及上架状态591 if (request.method == 'PUT') and (on_time <= today) and (off_time > today):592 return Response({'error': '上架商品无法修改,请先下架'}, status=status.HTTP_400_BAD_REQUEST)593 # 单品上架新增信息594 s_goods_info = {595 "putaway_recent_on_time": on_time,596 "putaway_off_time": off_time,597 "sell_status": sell_status,598 "recommend": recommend,599 "put_price": put_price,600 "putaway_operator": user,601 "goods_price": goods_price602 }603 # 组合商品上架新增信息604 m_goods_info = {605 "putaway_recent_on_time": on_time,606 "putaway_off_time": off_time,607 "sell_status": sell_status,608 "recommend": recommend,609 "put_price": put_price,610 "putaway_operator": user,611 }612 if goods_sn[0] == 'A':613 res = SingleGoods.objects.filter(goods_sn=goods_sn).update(**s_goods_info)614 if goods_sn[0] == 'B':615 res = MultipleGoods.objects.filter(m_goods_sn=goods_sn).update(**m_goods_info)616 OperateLog.create_log(request)617 return Response({"info": "上架成功"}, status=status.HTTP_200_OK)618# 上架查看接口619@api_view(['GET'])620@permission_classes([PutawayGroupPermission, ])621def list_put_up(request):622 today = datetime.date.today()623 # 过滤上架时间是1970-01-01的产品624 put_single = SingleGoods.objects.all().exclude(putaway_recent_on_time='1970-01-01').order_by('-putaway_recent_on_time')625 put_multiple = MultipleGoods.objects.all().exclude(putaway_recent_on_time='1970-01-01').order_by('-putaway_recent_on_time')626 # 页码627 page = request.GET.get('page', 1)628 page = int(page)629 # 商品编码630 goods_sn = request.GET.get('goods_sn', '').strip()631 # 商品名字632 goods_name = request.GET.get('goods_name', '').strip()633 # 商品属性634 goods_attribute = request.GET.get('goods_attribute', '').strip()635 # 售卖状态636 sell_status = request.GET.get('sell_status', '').strip()637 # 操作人638 putaway_operator = request.GET.get('putaway_operator', '').strip()639 # 最近上架时间640 putaway_recent_on_time = request.GET.get('putaway_recent_on_time', '').strip()641 # 上架状态642 putaway_on_status = request.GET.get('putaway_on_status', '').strip()643 # 下架时间644 putaway_off_time = request.GET.get('putaway_off_time', '').strip()645 if goods_sn:646 put_single = put_single.filter(goods_sn__icontains=goods_sn)647 put_multiple = put_multiple.filter(m_goods_sn__icontains=goods_sn)648 if goods_name:649 put_single = put_single.filter(goods_name__icontains=goods_name)650 put_multiple = put_multiple.filter(m_goods_name__icontains=goods_name)651 if sell_status:652 put_single = put_single.filter(sell_status=sell_status)653 put_multiple = put_multiple.filter(sell_status=sell_status)654 if putaway_on_status == '1':#上架655 put_single = put_single.filter(putaway_recent_on_time__lte=today,656 putaway_off_time__gt=today)657 put_multiple = put_multiple.filter(putaway_recent_on_time__lte=today,658 putaway_off_time__gt=today)659 if putaway_on_status == '2':#下架660 put_single = put_single.filter(putaway_off_time__lte=today)661 put_multiple = put_multiple.filter(putaway_off_time__lte=today)662 if putaway_on_status == '3':#待上架663 put_single = put_single.filter(putaway_recent_on_time__gt=today, putaway_off_time__gt=today)664 put_multiple = put_multiple.filter(putaway_recent_on_time__gt=today, putaway_off_time__gt=today)665 if putaway_operator:666 put_single = put_single.filter(putaway_operator=putaway_operator)667 put_multiple = put_multiple.filter(putaway_operator=putaway_operator)668 if putaway_recent_on_time:669 put_single = put_single.filter(putaway_recent_on_time=putaway_recent_on_time)670 put_multiple = put_multiple.filter(putaway_recent_on_time=putaway_recent_on_time)671 if putaway_off_time:672 put_single = put_single.filter(putaway_off_time=putaway_off_time)673 put_multiple = put_multiple.filter(putaway_off_time=putaway_off_time)674 # 获取总条数675 count1 = put_single.count()676 count2 = put_multiple.count()677 total_count = count1+count2678 # 获取总页数679 total_page = total_count//10680 total_page = total_page+1681 result_s = []682 index_s = 0683 for single in put_single:684 s_on_time = single.putaway_recent_on_time685 s_off_time = single.putaway_off_time686 if s_on_time <= today and s_off_time > today:687 putaway_on_status = "上架"688 elif (s_on_time <= today and s_off_time <= today) or (s_off_time <= today):689 putaway_on_status = "下架"690 elif s_on_time > today and s_off_time <=today:691 putaway_on_status = "下架"692 elif s_on_time > today and s_off_time > today:693 putaway_on_status = "待上架"694 index_s += 1695 result_s.append(696 {"id": single.id,697 "putaway_recent_on_time": s_on_time,698 "put_price": single.put_price,699 "putaway_on_status": putaway_on_status,700 "sell_status": single.sell_status,701 "putaway_off_time": s_off_time,702 "putaway_operator": single.putaway_operator,703 "goods_sn": single.goods_sn,704 "goods_name": single.goods_name,705 "recommend": single.recommend,706 "goods_price": single.goods_price,707 "goods_attribute": "单件商品",708 "index": index_s709 }710 )711 index_m = count1-1712 result_m = []713 for multiple in put_multiple:714 m_on_time = multiple.putaway_recent_on_time715 m_off_time = multiple.putaway_off_time716 if m_off_time <= today:717 putaway_on_status = "下架"718 elif m_on_time <= today and m_off_time > today:719 putaway_on_status = "上架"720 elif m_on_time > today:721 putaway_on_status = "待上架"722 index_m += 1723 result_m.append(724 {"id": multiple.id,725 "putaway_recent_on_time": m_on_time,726 "put_price": multiple.put_price,727 "putaway_on_status": putaway_on_status,728 "sell_status": multiple.sell_status,729 "putaway_off_time": m_on_time,730 "putaway_operator": multiple.putaway_operator,731 "goods_sn": multiple.m_goods_sn,732 "goods_name": multiple.m_goods_name,733 "recommend": multiple.recommend,734 "goods_attribute": "组合商品",735 "index": index_m736 }737 )738 if page == 1:739 start = 0740 end = 10741 else:742 start = 10 * (page - 1)743 end = 10 * page744 if goods_attribute == '1':745 result_s = result_s[start:end]746 return Response({"total_count": count1, "total_page": total_page, "results": result_s}, status=status.HTTP_200_OK)747 elif goods_attribute == '2':748 result_m = result_m[start:end]749 return Response({"total_count": count2, "total_page": total_page, "results": result_m}, status=status.HTTP_200_OK)750 result = result_s.extend(result_m)751 result_s = result_s[start:end]752 return Response({"total_count": total_count, "total_page": total_page, "results": result_s}, status=status.HTTP_200_OK)753# 上架商品选择接口754@api_view(['GET'])755@permission_classes([PutawayGroupPermission, ])756def put_goods(request):757 put_single = SingleGoods.objects.all().filter(putaway_recent_on_time='1970-01-01')758 put_multiple = MultipleGoods.objects.all().filter(putaway_recent_on_time='1970-01-01')759 goods_name = request.GET.get('goods_name', '').strip()760 sid = request.GET.get('sid', '')761 if sid == '1' and goods_name:762 put_single = put_single.filter(goods_name__icontains=goods_name)763 if sid == '2' and goods_name:764 put_multiple = put_multiple.filter(m_goods_name__icontains=goods_name)765 result_s = []766 for single in put_single:767 result_s.append(768 {"id": single.id,769 "goods_name": single.goods_name,770 "goods_sn": single.goods_sn771 })772 result_m = []773 for multiple in put_multiple:774 result_m.append(775 {776 "id":multiple.id,777 "goods_name": multiple.m_goods_name,778 "goods_sn":multiple.m_goods_sn779 }780 )781 count1 = put_single.count()782 count2 = put_multiple.count()783 total_page1 = count1//10+1784 total_page2 = count2//10+1785 if sid == '1':786 return Response({"total_count": count1, "total_page": total_page1, "results": result_s},status=status.HTTP_200_OK)787 if sid == '2':788 return Response({"total_count": count2, "total_page": total_page2, "results": result_m}, status=status.HTTP_200_OK)789# 上架删除操作接口790@api_view(['DELETE'])791@permission_classes([PutawayGroupPermission, ])792def delete_put(request):793 goods_sn = request.GET.get('goods_sn', '')794 putaway_on_status = request.GET.get('putaway_on_status', '')795 # 单品796 if putaway_on_status == '上架' and goods_sn:797 return Response({'error': '上架商品无法修改,请先下架'}, status=status.HTTP_400_BAD_REQUEST)798 if goods_sn[0] == 'A' and putaway_on_status is not '上架':799 ret = SingleGoods.objects.filter(goods_sn=goods_sn).update(putaway_recent_on_time='1970-01-01',800 putaway_off_time='1970-01-01')801 elif goods_sn[0] == 'B' and putaway_on_status is not '上架':802 ret = MultipleGoods.objects.filter(m_goods_sn=goods_sn).update(putaway_recent_on_time='1970-01-01',803 putaway_off_time='1970-01-01')804 OperateLog.create_log(request)805 return Response({"info": "删除成功"}, status=status.HTTP_200_OK)806@api_view(['PUT'])807@permission_classes([PutawayGroupPermission, ])808def put_up(request):809 """810 上架操作811 """812 today = datetime.date.today()813 bod = request.body814 bod = str(bod, encoding="utf-8")815 bod = json.loads(bod)816 goods_sn = bod.get('goods_sn', None)817 putaway_recent_on_time = bod.get('on_time', '1970-01-01')818 putaway_off_time = bod.get('off_time', None)819 if goods_sn[0] == 'A':820 up1 = SingleGoods.objects.filter(goods_sn=goods_sn).update(putaway_recent_on_time=putaway_recent_on_time,821 putaway_off_time=putaway_off_time)822 elif goods_sn[0] == 'B':823 up1 = MultipleGoods.objects.filter(m_goods_sn=goods_sn).update(putaway_recent_on_time=putaway_recent_on_time,824 putaway_off_time=putaway_off_time)825 OperateLog.create_log(request)826 return Response(status=status.HTTP_200_OK)827@api_view(['PUT'])828@permission_classes([PutawayGroupPermission, ])829def put_off(request):830 """831 下架操作832 """833 today = datetime.date.today()834 bod = request.body835 bod = str(bod, encoding="utf-8")836 bod = json.loads(bod)837 goods_sn = bod.get('goods_sn', None)838 if goods_sn[0] == 'A':839 off = SingleGoods.objects.filter(goods_sn=goods_sn).update(putaway_off_time=today)840 if goods_sn[0] == 'B':841 off = MultipleGoods.objects.filter(m_goods_sn=goods_sn).update(putaway_off_time=today)842 return Response(status=status.HTTP_200_OK)843@api_view(['GET'])844def test_put(request):845 ret1 = SingleGoods.objects.all().update(putaway_recent_on_time="2018-05-01", putaway_off_time="2020-05-01")846 ret2 = MultipleGoods.objects.all().update(putaway_recent_on_time="2018-05-01", putaway_off_time="2020-05-01")...

Full Screen

Full Screen

sch.py

Source:sch.py Github

copy

Full Screen

...17def set_id(prop, val):18 prop.put1('id', val)19 20def set_at(prop, x, y, rotation):21 prop.put_multiple('at', [x, y, rotation])22def set_effects(prop, top_bottom):23 elts = Sexp()24 elts.put1('font', Sexp(key='size', elts=[1.27, 1.27]))25 elts.put_multiple('justify', Sexp(elts=[sym('left'), sym(top_bottom)]))26 27 prop.put_multiple('effects', elts)28def make_stroke():29 val = Sexp()30 val.put1('width', 0.001)31 val.put1('type', sym('solid'))32 val.put_multiple('color', [132, 0, 132, 1])33 return val34 35def make_fill():36 val = Sexp()37 val.put_multiple('color', [255, 255, 255, 0])38 return val39def make_path(path, page_num):40 val = Sexp('path')41 val.append(path)42 val.put1('page', str(page_num) if page_num else '')43 return val44def read_sch(filename):45 with sexp.PeekStream(filename) as inf:46 sch = Sch().read_exp(inf)47 sch.local_name = filename48 return sch49def read_pcb(filename):50 with sexp.PeekStream(filename) as inf:51 pcb = Pcb().read_exp(inf)52 return pcb53class Pcb(Sexp):54 def __init__(self):55 pass56 57 def setup(self):58 left = math.inf59 right = -math.inf60 top = math.inf61 bottom = -math.inf62 self.nets = []63 for item in self:64 if keyeq(item, 'gr_line') and item.get1('layer') == 'Edge.Cuts':65 x0, y0 = item.get_multiple('start')66 x1, y1 = item.get_multiple('end')67 left = min(left, x0, x1)68 right = max(right,x0, x1)69 top = min(top, y0, y1)70 bottom = max(bottom, y0, y1 )71 if keyeq(item, 'net'):72 net_id = item.list[1]73 net_name = item.list[2]74 if net_id != len(self.nets):75 raise ValueError('unexpected net_id', net_id, len(self.nets))76 self.nets.append(net_name)77 self.left = left78 self.top = top79 self.width = right - left80 self.height = bottom - top81def make_empty():82 ret = Sch()83 ret.list.append(sym('kicad_sch'))84 ret.put1('version', 20201015)85 ret.put1('paper', "A4")86 return ret87class Sheet:88 def __init__(self):89 self.sch = None90 self.pcb = None91 self.local_name = None92 self.page_num = None93 self.insts = []94 def __repr__(self):95 items = []96 if self.local_name:97 items.append(re.sub('[.].*', '', self.local_name))98 if self.page_num:99 items.append(f'p{self.page_num}')100 items.append(f'insts={len(self.insts)}')101 items.append(f'{id(self) & 0xffff:x}')102 msg = ' '.join(items)103 return f'<sheet {msg}>'104# no sub sheets on top level105def is_simple(sch):106 if sch.get1('version') != 20201015:107 print('unexpected version')108 return False109 safe_clauses = {110 sym('version'), 111 sym('generator'), 112 sym('paper'), 113 sym('lib_symbols'),114 sym('sheet_instances'),115 sym('symbol_instances'),116 sym('junction'),117 sym('wire'),118 sym('hierarchical_label'),119 sym('symbol'),120 sym('no_connect'),121 }122 reject_clauses = {123 }124 for item in sch:125 if item == sym('kicad_sch'):126 continue127 if keyeq(item, 'sheet'):128 return False129 key = sexp.car(item)130 if key in safe_clauses:131 continue132 if key in reject_clauses:133 return False134 raise ValueError('unknown clause', str(item))135 return True136# top level has just a sheet137def is_just_sheet(sch):138 if sch.get1('version') != 20201015:139 print('unexpected version')140 return False141 safe_clauses = {142 sym('version'), 143 sym('generator'), 144 sym('paper'), 145 sym('lib_symbols'),146 sym('sheet_instances'),147 sym('symbol_instances'),148 }149 reject_clauses = {150 sym('no_connect'),151 sym('wire'),152 }153 sheet_count = 0154 for item in sch:155 if item == sym('kicad_sch'):156 continue157 if keyeq(item, 'sheet'):158 sheet_count += 1159 continue160 key = sexp.car(item)161 if key in safe_clauses:162 continue163 if key in reject_clauses:164 return False165 raise ValueError('unknown clause', str(item))166 if sheet_count == 1:167 return True168 169 return False170def round_to_50mil(x):171 return math.floor(x / 25.4 * 200) / 200 * 25.4172def make_sheet_item_prototype(sheet, inst_name):173 posx = round_to_50mil(random.uniform(160, 250))174 posy = round_to_50mil(random.uniform(20, 150))175 width = 0.7 * 25.4176 height = 0.5 * 25.4177 item = Sexp('sheet')178 item.put_multiple('at', [posx, posy])179 item.put_multiple('size', [width, height])180 item.put_multiple('stroke', make_stroke())181 item.put_multiple('fill', make_fill())182 # put the inst name at the top of the sheet block183 prop = item.set_prop('Sheet name', inst_name)184 set_id(prop, 0)185 set_at(prop, posx, posy - 0.2, 0)186 set_effects(prop, 'bottom')187 # put the sheet filename at the bottom of the sheet block188 prop = item.set_prop('Sheet file', sheet.local_name)189 set_id(prop, 1)190 set_at(prop, posx, posy + height + 0.2, 0)191 set_effects(prop, 'top')192 return item193def at_move(exp, dx, dy):194 if isinstance(exp, Sexp):195 if keyeq(exp, 'at') and len(exp.list) >= 3:196 exp.list[1] += dx197 exp.list[2] += dy198 for subexp in exp:199 at_move(subexp, dx, dy)200def start_end_move(item, dx, dy):201 ret = []202 for elt in item:203 if keyeq(elt, 'start') or keyeq(elt, 'end'):204 elt = Sexp(elt.list[0], elts=[elt.list[1] + dx, elt.list[2] + dy])205 if isinstance(elt, Sexp):206 elt = start_end_move(elt, dx, dy)207 ret.append(elt)208 return Sexp(elts=ret)209class Sch(Sexp):210 def __init__(self):211 super().__init__()212 self.sheets = {}213 self.local_name = '(unnamed)'214 self.symbol_ref_to_path = {}215 def __str__(self):216 return f'<Sch {self.local_name} {id(self) & 0xffff:x}>'217 def find_sheet_item(self, sheet, inst_name):218 for item in self:219 if (keyeq(item, 'sheet')220 and item.get_prop('Sheet file') == sheet.local_name221 and item.get_prop('Sheet name') == inst_name):222 return item223 return None224 def get_sheet(self, sheet_spec):225 sheet = Sheet()226 sch_file = sheet_spec.get1('sch_file')227 sheet.sch = read_sch(sch_file)228 pcb_filename = os.path.splitext(sch_file)[0] + '.kicad_pcb'229 if os.path.isfile(pcb_filename):230 sheet.pcb = read_pcb(pcb_filename)231 sheet.local_name = sheet_spec.get1('local_name')232 if sheet.local_name is None:233 sheet.local_name = os.path.basename(sch_file)234 if is_simple(sheet.sch):235 sheet.item_prototype = make_sheet_item_prototype(sheet, 'dummy')236 src_path = sch_file237 elif is_just_sheet(sheet.sch):238 sheet.item_prototype = sheet.sch.assoc('sheet')239 src_path = os.path.join(os.path.dirname(sch_file), 240 sheet.item_prototype.get_prop('Sheet file'))241 sheet.sch = read_sch(src_path)242 else:243 print((f'can\'t handle top level {sch_file}\n'244 f'must have no sheets, or just 1 sheet all by itself'))245 sys.exit(1)246 # copy to local directory247 # TODO don't copy onto self248 with open(sheet.local_name, 'w') as outf:249 with open(src_path) as inf:250 outf.write(inf.read())251 self.sheets[sheet.local_name] = sheet252 return sheet253 def add_sheet(self, sheet_spec, inst_name):254 sheet = self.get_sheet(sheet_spec)255 local = self.find_sheet_item(sheet, inst_name)256 if local is None:257 local = make_sheet_item_prototype(sheet, inst_name)258 local.put1('uuid', sym(str(uuid4())))259 self.list.append(local)260 new = copy.copy(sheet.item_prototype)261 new.put1('uuid', local.get1('uuid'))262 263 (old_x, old_y) = local.get_multiple('at')264 (new_x, new_y) = new.get_multiple('at')265 dx = old_x - new_x266 dy = old_y - new_y267 at_move(new, dx, dy)268 new.set_prop('Sheet file', sheet.local_name)269 new.set_prop('Sheet name', inst_name)270 local.list = new.list271 def generate_sheet_instances(self):272 for _, sheet in self.sheets.items():273 sheet.insts = []274 sheet.page_num = None275 sheets_used = list()276 page_num = 1277 for item in self.list:278 if keyeq(item, 'sheet'):279 local_name = item.get_prop('Sheet file')280 sheet = self.sheets.get(local_name)281 if sheet:282 if sheet.page_num is None:283 page_num += 1284 sheet.page_num = page_num285 sheet.insts.append(item)286 si = Sexp()287 si.append(make_path('/', 1))288 for _, sheet in self.sheets.items():289 page_num = sheet.page_num290 for inst in sheet.insts:291 uuid = inst.get1('uuid')292 path = f'/{str(uuid)}/'293 si.append(make_path(path, page_num))294 page_num = None295 self.put_multiple('sheet_instances', si)296 def fixup_symbol_instances(self):297 sheet_uuids_used = set()298 for _, sheet in self.sheets.items():299 for sheet_inst in sheet.insts:300 sheet_inst_uuid = sheet_inst.get1('uuid')301 sheet_uuids_used.add(sheet_inst_uuid)302 symbol_instances = []303 for inst in self.get_multiple('symbol_instances'):304 if keyeq(inst, 'path'):305 parts = inst.cadr().split('/')306 if len(parts) <= 2:307 symbol_instances.append(inst)308 for _, sheet in self.sheets.items():309 for sheet_inst in sheet.insts:310 sheet_inst_uuid = sheet_inst.get1('uuid')311 sheet_inst_name = sheet_inst.get_prop('Sheet name')312 for item in sheet.sch:313 if keyeq(item, 'symbol'):314 uuid = item.get1('uuid')315 unit = item.get1('unit')316 raw_ref = item.get_prop('Reference')317 value = item.get_prop('Value')318 footprint = item.get_prop('Footprint')319 if unit is None:320 unit = 1321 path = f'/{sheet_inst_uuid}/{uuid}'322 sub_ref = re.sub('^[^:]*:', '', raw_ref)323 ref = f'{sheet_inst_name}:{sub_ref}'324 self.symbol_ref_to_path[ref] = path325 syminst = Sexp('path')326 syminst.append(path)327 syminst.put1('reference', ref)328 syminst.put1('unit', unit)329 syminst.put1('value', value)330 syminst.put1('footprint', footprint)331 symbol_instances.append(syminst)332 symbol_instances.sort(key=lambda item: item.get1('reference'))333 si = Sexp(elts=symbol_instances)334 self.put_multiple('symbol_instances', si)335 336 def filter_sheets(self):337 new = Sexp()338 for item in self:339 if keyeq(item, 'sheet'):340 local_name = item.get_prop('Sheet file')341 if local_name in self.sheets:342 new.append(item)343 else:344 new.append(item)345 self.list = new346def generate_pcb(outname, sch):347 out = read_pcb('empty.kicad_pcb')348 for _, sheet in sch.sheets.items():...

Full Screen

Full Screen

test_put_multiple_files_onto_remote_server.py

Source:test_put_multiple_files_onto_remote_server.py Github

copy

Full Screen

...6 """Test what happens when you ask to transfer a7 file which doesn't exist.8 """9 files = ["totes.not.fake.no.really", "another.fake"]10 assert mput.put_multiple(sftp, files) is False11 assert mput.ERROR_MESSAGE in capsys.readouterr().out12def test_best_case(sftp):13 """This tests what happens if you put a file that does exist on14 to the remote server15 """16 # Create file locally17 f = open('test.txt', 'w')18 f.write('testing')19 f.close()20 f = open('test2.txt', 'w')21 f.write('testing')22 f.close()23 files = ["test.txt", "test2.txt"]24 assert mput.put_multiple(sftp, files) is True25 # Remove local files26 os.remove('test.txt')27 os.remove('test2.txt')28 # Remove remote files29 sftp.remove('test.txt')30 sftp.remove('test2.txt')31def test_best_case_single(sftp):32 """This tests what happens if you put a file that does exist on33 to the remote server34 """35 # Create file locally36 f = open('test.txt', 'w')37 f.write('testing')38 f.close()39 files = ["test.txt"]40 assert mput.put_multiple(sftp, files) is True41 # Remove local files42 os.remove('test.txt')43 # Remove remote files44 sftp.remove('test.txt')45def test_some_valid_files(sftp):46 """This tests what happens if you put a file that does exist on47 to the remote server, but also one that doesn't exist48 """49 # Create file locally50 f = open('test.txt', 'w')51 f.write('testing')52 f.close()53 files = ["test.txt", "not.a.real.file"]54 assert mput.put_multiple(sftp, files) is True55 # Remove local files56 os.remove('test.txt')57 # Remove remote files58 sftp.remove('test.txt')59def test_mput_catches_exceptions(sftp, capsys, monkeypatch):60 """This test verifies that the put_multiple action correctly catches61 OSError exceptions that put might raise and continues on.62 """63 # List of files that should succeed64 goodFiles = ["mput_good", "mput_great"]65 # List of files to attempt to mput, in the pattern: badfile, goodfile, ...66 files = ["mput_bad", "mput_good", "mput_fail", "mput_great"]67 def mock_put(sftp: pysftp.Connection, filename: str):68 """A mocked put() function that prints good filenames so we can69 test against them and raises OSErrors for bad filenames so we can be70 sure they're caught."""71 if filename in goodFiles:72 print(filename)73 else:74 raise OSError("mock_put: filename = " + filename)75 monkeypatch.setattr(put, "put", mock_put)76 mput.put_multiple(sftp, files)77 output = capsys.readouterr().out78 for file in files:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful