How to use the Put method of the client package

Best Testkube code snippet using client.Put

cloudwatchlogs_test.go

Source:cloudwatchlogs_test.go Github

copy

Full Screen

...128 logStreamName: streamName,129 sequenceToken: aws.String(sequenceToken),130 }131 mockClient.putLogEventsResult <- &putLogEventsResult{132 successResult: &cloudwatchlogs.PutLogEventsOutput{133 NextSequenceToken: aws.String(nextSequenceToken),134 },135 }136 events := []wrappedEvent{137 {138 inputLogEvent: &cloudwatchlogs.InputLogEvent{139 Message: aws.String(logline),140 },141 },142 }143 stream.publishBatch(events)144 if stream.sequenceToken == nil {145 t.Fatal("Expected non-nil sequenceToken")146 }147 if *stream.sequenceToken != nextSequenceToken {148 t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)149 }150 argument := <-mockClient.putLogEventsArgument151 if argument == nil {152 t.Fatal("Expected non-nil PutLogEventsInput")153 }154 if argument.SequenceToken == nil {155 t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")156 }157 if *argument.SequenceToken != sequenceToken {158 t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)159 }160 if len(argument.LogEvents) != 1 {161 t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))162 }163 if argument.LogEvents[0] != events[0].inputLogEvent {164 t.Error("Expected event to equal input")165 }166}167func TestPublishBatchError(t *testing.T) {168 mockClient := newMockClient()169 stream := &logStream{170 client: mockClient,171 logGroupName: groupName,172 logStreamName: streamName,173 sequenceToken: aws.String(sequenceToken),174 }175 mockClient.putLogEventsResult <- &putLogEventsResult{176 errorResult: errors.New("Error!"),177 }178 events := []wrappedEvent{179 {180 inputLogEvent: &cloudwatchlogs.InputLogEvent{181 Message: aws.String(logline),182 },183 },184 }185 stream.publishBatch(events)186 if stream.sequenceToken == nil {187 t.Fatal("Expected non-nil sequenceToken")188 }189 if *stream.sequenceToken != sequenceToken {190 t.Errorf("Expected sequenceToken to be %s, but 
was %s", sequenceToken, *stream.sequenceToken)191 }192}193func TestPublishBatchInvalidSeqSuccess(t *testing.T) {194 mockClient := newMockClientBuffered(2)195 stream := &logStream{196 client: mockClient,197 logGroupName: groupName,198 logStreamName: streamName,199 sequenceToken: aws.String(sequenceToken),200 }201 mockClient.putLogEventsResult <- &putLogEventsResult{202 errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil),203 }204 mockClient.putLogEventsResult <- &putLogEventsResult{205 successResult: &cloudwatchlogs.PutLogEventsOutput{206 NextSequenceToken: aws.String(nextSequenceToken),207 },208 }209 events := []wrappedEvent{210 {211 inputLogEvent: &cloudwatchlogs.InputLogEvent{212 Message: aws.String(logline),213 },214 },215 }216 stream.publishBatch(events)217 if stream.sequenceToken == nil {218 t.Fatal("Expected non-nil sequenceToken")219 }220 if *stream.sequenceToken != nextSequenceToken {221 t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)222 }223 argument := <-mockClient.putLogEventsArgument224 if argument == nil {225 t.Fatal("Expected non-nil PutLogEventsInput")226 }227 if argument.SequenceToken == nil {228 t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")229 }230 if *argument.SequenceToken != sequenceToken {231 t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)232 }233 if len(argument.LogEvents) != 1 {234 t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))235 }236 if argument.LogEvents[0] != events[0].inputLogEvent {237 t.Error("Expected event to equal input")238 }239 argument = <-mockClient.putLogEventsArgument240 if argument == nil {241 t.Fatal("Expected non-nil PutLogEventsInput")242 }243 if argument.SequenceToken == nil {244 t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")245 }246 if *argument.SequenceToken != "token" {247 t.Errorf("Expected 
PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken)248 }249 if len(argument.LogEvents) != 1 {250 t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))251 }252 if argument.LogEvents[0] != events[0].inputLogEvent {253 t.Error("Expected event to equal input")254 }255}256func TestPublishBatchAlreadyAccepted(t *testing.T) {257 mockClient := newMockClient()258 stream := &logStream{259 client: mockClient,260 logGroupName: groupName,261 logStreamName: streamName,262 sequenceToken: aws.String(sequenceToken),263 }264 mockClient.putLogEventsResult <- &putLogEventsResult{265 errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),266 }267 events := []wrappedEvent{268 {269 inputLogEvent: &cloudwatchlogs.InputLogEvent{270 Message: aws.String(logline),271 },272 },273 }274 stream.publishBatch(events)275 if stream.sequenceToken == nil {276 t.Fatal("Expected non-nil sequenceToken")277 }278 if *stream.sequenceToken != "token" {279 t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)280 }281 argument := <-mockClient.putLogEventsArgument282 if argument == nil {283 t.Fatal("Expected non-nil PutLogEventsInput")284 }285 if argument.SequenceToken == nil {286 t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")287 }288 if *argument.SequenceToken != sequenceToken {289 t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken)290 }291 if len(argument.LogEvents) != 1 {292 t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))293 }294 if argument.LogEvents[0] != events[0].inputLogEvent {295 t.Error("Expected event to equal input")296 }297}298func TestCollectBatchSimple(t *testing.T) {299 mockClient := newMockClient()300 stream := &logStream{301 client: mockClient,302 logGroupName: groupName,303 logStreamName: streamName,304 sequenceToken: aws.String(sequenceToken),305 
messages: make(chan *logger.Message),306 }307 mockClient.putLogEventsResult <- &putLogEventsResult{308 successResult: &cloudwatchlogs.PutLogEventsOutput{309 NextSequenceToken: aws.String(nextSequenceToken),310 },311 }312 ticks := make(chan time.Time)313 newTicker = func(_ time.Duration) *time.Ticker {314 return &time.Ticker{315 C: ticks,316 }317 }318 go stream.collectBatch()319 stream.Log(&logger.Message{320 Line: []byte(logline),321 Timestamp: time.Time{},322 })323 ticks <- time.Time{}324 stream.Close()325 argument := <-mockClient.putLogEventsArgument326 if argument == nil {327 t.Fatal("Expected non-nil PutLogEventsInput")328 }329 if len(argument.LogEvents) != 1 {330 t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))331 }332 if *argument.LogEvents[0].Message != logline {333 t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)334 }335}336func TestCollectBatchTicker(t *testing.T) {337 mockClient := newMockClient()338 stream := &logStream{339 client: mockClient,340 logGroupName: groupName,341 logStreamName: streamName,342 sequenceToken: aws.String(sequenceToken),343 messages: make(chan *logger.Message),344 }345 mockClient.putLogEventsResult <- &putLogEventsResult{346 successResult: &cloudwatchlogs.PutLogEventsOutput{347 NextSequenceToken: aws.String(nextSequenceToken),348 },349 }350 ticks := make(chan time.Time)351 newTicker = func(_ time.Duration) *time.Ticker {352 return &time.Ticker{353 C: ticks,354 }355 }356 go stream.collectBatch()357 stream.Log(&logger.Message{358 Line: []byte(logline + " 1"),359 Timestamp: time.Time{},360 })361 stream.Log(&logger.Message{362 Line: []byte(logline + " 2"),363 Timestamp: time.Time{},364 })365 ticks <- time.Time{}366 // Verify first batch367 argument := <-mockClient.putLogEventsArgument368 if argument == nil {369 t.Fatal("Expected non-nil PutLogEventsInput")370 }371 if len(argument.LogEvents) != 2 {372 t.Errorf("Expected LogEvents to contain 2 
elements, but contains %d", len(argument.LogEvents))373 }374 if *argument.LogEvents[0].Message != logline+" 1" {375 t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message)376 }377 if *argument.LogEvents[1].Message != logline+" 2" {378 t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message)379 }380 stream.Log(&logger.Message{381 Line: []byte(logline + " 3"),382 Timestamp: time.Time{},383 })384 ticks <- time.Time{}385 argument = <-mockClient.putLogEventsArgument386 if argument == nil {387 t.Fatal("Expected non-nil PutLogEventsInput")388 }389 if len(argument.LogEvents) != 1 {390 t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))391 }392 if *argument.LogEvents[0].Message != logline+" 3" {393 t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message)394 }395 stream.Close()396}397func TestCollectBatchClose(t *testing.T) {398 mockClient := newMockClient()399 stream := &logStream{400 client: mockClient,401 logGroupName: groupName,402 logStreamName: streamName,403 sequenceToken: aws.String(sequenceToken),404 messages: make(chan *logger.Message),405 }406 mockClient.putLogEventsResult <- &putLogEventsResult{407 successResult: &cloudwatchlogs.PutLogEventsOutput{408 NextSequenceToken: aws.String(nextSequenceToken),409 },410 }411 var ticks = make(chan time.Time)412 newTicker = func(_ time.Duration) *time.Ticker {413 return &time.Ticker{414 C: ticks,415 }416 }417 go stream.collectBatch()418 stream.Log(&logger.Message{419 Line: []byte(logline),420 Timestamp: time.Time{},421 })422 // no ticks423 stream.Close()424 argument := <-mockClient.putLogEventsArgument425 if argument == nil {426 t.Fatal("Expected non-nil PutLogEventsInput")427 }428 if len(argument.LogEvents) != 1 {429 t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))430 }431 if *argument.LogEvents[0].Message != logline {432 
t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message)433 }434}435func TestCollectBatchLineSplit(t *testing.T) {436 mockClient := newMockClient()437 stream := &logStream{438 client: mockClient,439 logGroupName: groupName,440 logStreamName: streamName,441 sequenceToken: aws.String(sequenceToken),442 messages: make(chan *logger.Message),443 }444 mockClient.putLogEventsResult <- &putLogEventsResult{445 successResult: &cloudwatchlogs.PutLogEventsOutput{446 NextSequenceToken: aws.String(nextSequenceToken),447 },448 }449 var ticks = make(chan time.Time)450 newTicker = func(_ time.Duration) *time.Ticker {451 return &time.Ticker{452 C: ticks,453 }454 }455 go stream.collectBatch()456 longline := strings.Repeat("A", maximumBytesPerEvent)457 stream.Log(&logger.Message{458 Line: []byte(longline + "B"),459 Timestamp: time.Time{},460 })461 // no ticks462 stream.Close()463 argument := <-mockClient.putLogEventsArgument464 if argument == nil {465 t.Fatal("Expected non-nil PutLogEventsInput")466 }467 if len(argument.LogEvents) != 2 {468 t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))469 }470 if *argument.LogEvents[0].Message != longline {471 t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)472 }473 if *argument.LogEvents[1].Message != "B" {474 t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message)475 }476}477func TestCollectBatchMaxEvents(t *testing.T) {478 mockClient := newMockClientBuffered(1)479 stream := &logStream{480 client: mockClient,481 logGroupName: groupName,482 logStreamName: streamName,483 sequenceToken: aws.String(sequenceToken),484 messages: make(chan *logger.Message),485 }486 mockClient.putLogEventsResult <- &putLogEventsResult{487 successResult: &cloudwatchlogs.PutLogEventsOutput{488 NextSequenceToken: aws.String(nextSequenceToken),489 },490 }491 var ticks = make(chan time.Time)492 newTicker = func(_ time.Duration) 
*time.Ticker {493 return &time.Ticker{494 C: ticks,495 }496 }497 go stream.collectBatch()498 line := "A"499 for i := 0; i <= maximumLogEventsPerPut; i++ {500 stream.Log(&logger.Message{501 Line: []byte(line),502 Timestamp: time.Time{},503 })504 }505 // no ticks506 stream.Close()507 argument := <-mockClient.putLogEventsArgument508 if argument == nil {509 t.Fatal("Expected non-nil PutLogEventsInput")510 }511 if len(argument.LogEvents) != maximumLogEventsPerPut {512 t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))513 }514 argument = <-mockClient.putLogEventsArgument515 if argument == nil {516 t.Fatal("Expected non-nil PutLogEventsInput")517 }518 if len(argument.LogEvents) != 1 {519 t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents))520 }521}522func TestCollectBatchMaxTotalBytes(t *testing.T) {523 mockClient := newMockClientBuffered(1)524 stream := &logStream{525 client: mockClient,526 logGroupName: groupName,527 logStreamName: streamName,528 sequenceToken: aws.String(sequenceToken),529 messages: make(chan *logger.Message),530 }531 mockClient.putLogEventsResult <- &putLogEventsResult{532 successResult: &cloudwatchlogs.PutLogEventsOutput{533 NextSequenceToken: aws.String(nextSequenceToken),534 },535 }536 var ticks = make(chan time.Time)537 newTicker = func(_ time.Duration) *time.Ticker {538 return &time.Ticker{539 C: ticks,540 }541 }542 go stream.collectBatch()543 longline := strings.Repeat("A", maximumBytesPerPut)544 stream.Log(&logger.Message{545 Line: []byte(longline + "B"),546 Timestamp: time.Time{},547 })548 // no ticks549 stream.Close()550 argument := <-mockClient.putLogEventsArgument551 if argument == nil {552 t.Fatal("Expected non-nil PutLogEventsInput")553 }554 bytes := 0555 for _, event := range argument.LogEvents {556 bytes += len(*event.Message)557 }558 if bytes > maximumBytesPerPut {559 t.Errorf("Expected <= %d bytes but was %d", 
maximumBytesPerPut, bytes)560 }561 argument = <-mockClient.putLogEventsArgument562 if len(argument.LogEvents) != 1 {563 t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))564 }565 message := *argument.LogEvents[0].Message566 if message[len(message)-1:] != "B" {567 t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])568 }569}570func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {571 mockClient := newMockClient()572 stream := &logStream{573 client: mockClient,574 logGroupName: groupName,575 logStreamName: streamName,576 sequenceToken: aws.String(sequenceToken),577 messages: make(chan *logger.Message),578 }579 mockClient.putLogEventsResult <- &putLogEventsResult{580 successResult: &cloudwatchlogs.PutLogEventsOutput{581 NextSequenceToken: aws.String(nextSequenceToken),582 },583 }584 ticks := make(chan time.Time)585 newTicker = func(_ time.Duration) *time.Ticker {586 return &time.Ticker{587 C: ticks,588 }589 }590 go stream.collectBatch()591 times := maximumLogEventsPerPut592 expectedEvents := []*cloudwatchlogs.InputLogEvent{}593 timestamp := time.Now()594 for i := 0; i < times; i++ {595 line := fmt.Sprintf("%d", i)596 if i%2 == 0 {597 timestamp.Add(1 * time.Nanosecond)598 }599 stream.Log(&logger.Message{600 Line: []byte(line),601 Timestamp: timestamp,602 })603 expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{604 Message: aws.String(line),605 Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),606 })607 }608 ticks <- time.Time{}609 stream.Close()610 argument := <-mockClient.putLogEventsArgument611 if argument == nil {612 t.Fatal("Expected non-nil PutLogEventsInput")613 }614 if len(argument.LogEvents) != times {615 t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))616 }617 for i := 0; i < times; i++ {618 if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {619 t.Errorf("Expected event to be %v 
but was %v", *expectedEvents[i], *argument.LogEvents[i])620 }621 }622}623func TestCreateTagSuccess(t *testing.T) {624 mockClient := newMockClient()625 ctx := logger.Context{626 ContainerName: "/test-container",...

Full Screen

Full Screen

put_stream_client.go

Source:put_stream_client.go Github

copy

Full Screen

...20 "google.golang.org/grpc"21 "google.golang.org/grpc/codes"22 "google.golang.org/grpc/status"23)24type PutStreamClientService struct {25 filename string26 srcPath string27 destPath string28 port string29 nodelist string30 node string31 num int32 width int3233 uid uint3234 gid uint3235 filemod uint3236 modtime int6437 conn *grpc.ClientConn38 stream pb.RpcService_PutStreamClient39}40func NewPutStreamClientService(fp, dp, nodelist, port string, width int32) (*PutStreamClientService, error) {41 // 判断文件是否存在42 if !utils.Isfile(fp) {43 return nil, fmt.Errorf("[%s] not found", fp)44 }45 srcPath, err := filepath.Abs(fp)46 if err != nil {47 log.Error(err.Error())48 return nil, err49 }50 return &PutStreamClientService{51 filename: filepath.Base(fp),52 srcPath: srcPath,53 destPath: dp,54 nodelist: nodelist,55 width: width,56 port: port,57 }, nil58}59func (p *PutStreamClientService) SetFileInfo(uid, gid, filemod uint32, modtime int64) {60 p.uid = uid61 p.gid = gid62 p.filemod = filemod63 p.modtime = modtime64}65func (p *PutStreamClientService) GetSrcPath() string {66 return p.srcPath67}68func (p *PutStreamClientService) GetDestPath() string {69 return p.destPath70}71func (p *PutStreamClientService) GetPort() string {72 return p.port73}74func (p *PutStreamClientService) GetNodelist() string {75 return p.nodelist76}77func (p *PutStreamClientService) GetNodes() []string {78 return utils.ExpNodes(p.nodelist)79}80func (p *PutStreamClientService) GetAllNodelist() string {81 if p.nodelist != "" {82 return fmt.Sprintf("%s,%s", p.node, p.nodelist)83 } else {84 return p.node85 }86}87// 获取节点树宽度88func (p *PutStreamClientService) GetWidth() int32 {89 return p.width90}91// 获取grpc数据流92func (p *PutStreamClientService) GetStream() pb.RpcService_PutStreamClient {93 return p.stream94}95func (p *PutStreamClientService) CloseConn() {96 log.Debugf("close conn to node %s\n", p.node)97 p.conn.Close()98}99// 检查到目标节点的连接是否正常可用100func (p *PutStreamClientService) checkConn(ctx context.Context, node 
string, authority grpc.DialOption) (*grpc.ClientConn, pb.RpcService_PutStreamClient, error) {101 var waitc chan struct{} = make(chan struct{})102 var conn *grpc.ClientConn103 var stream pb.RpcService_PutStreamClient104 var err error105 tctx, tcancel := context.WithTimeout(context.Background(), 2*time.Second)106 defer tcancel()107 go func() {108 defer close(waitc)109 addr := fmt.Sprintf("%s:%s", node, p.port)110 conn, err = grpc.DialContext(ctx, addr, authority, global.ClientTransportCredentials)111 if err != nil {112 log.Error(err)113 return114 }115 client := pb.NewRpcServiceClient(conn)116 stream, err = client.PutStream(ctx)117 if err != nil {118 log.Error(err)119 return120 }121 log.Debugf("Gen client stream -> %s\n", addr)122 }()123 select {124 case <-tctx.Done():125 log.Errorf("connect timeout for %s\n", node)126 return nil, nil, status.Error(codes.DeadlineExceeded, "connect timeout")127 case <-waitc:128 if err != nil {129 return conn, stream, err130 }131 return conn, stream, err132 }133}134// 生成grpc流135func (p *PutStreamClientService) GenStreamWithContext(ctx context.Context, authority grpc.DialOption) ([]string, string, error) {136 nodes := utils.ExpNodes(p.nodelist)137 p.num = len(nodes)138 down := make([]string, 0)139 var conn *grpc.ClientConn140 var stream pb.RpcService_PutStreamClient141 var err error142 for i := 0; i < p.num; i++ {143 node := nodes[i]144 conn, stream, err = p.checkConn(ctx, node, authority)145 if err != nil {146 down = append(down, node)147 continue148 }149 p.node, p.conn, p.stream = node, conn, stream150 p.nodelist = utils.Merge(nodes[i+1 : p.num]...)151 break152 }153 // 只要有一个连接成功,那么err就会被赋值为nil,否则则是连接失败的错误154 // 故当err为错误的时候,所有节点都连接失败155 return down, p.node, err156}157// 发送数据158func (p *PutStreamClientService) Send(data []byte) error {159 putStreamReq := &pb.PutStreamReq{160 Name: p.filename,161 Md5: utils.Md5sum(data),162 Location: p.destPath,163 Body: data,164 Sn: utils.Hostname(),165 Node: p.node,166 Nodelist: p.nodelist,167 Port: 
p.port,168 Width: p.width,169 Uid: p.uid,170 Gid: p.gid,171 Filemod: p.filemod,172 Modtime: p.modtime,173 }174 return p.stream.Send(putStreamReq)175}176// 用于客户端,开启服务177func (p *PutStreamClientService) RunServe(ctx context.Context, buffer int) error {178 fp, err := os.Open(p.srcPath)179 if err != nil {180 return err181 }182 fi, err := fp.Stat()183 if err != nil {184 return err185 }186 p.modtime = fi.ModTime().Unix()187 p.filemod = uint32(fi.Mode().Perm())188 p.uid = fi.Sys().(*syscall.Stat_t).Uid189 p.gid = fi.Sys().(*syscall.Stat_t).Gid190 counts := int64(math.Ceil(float64(fi.Size()) / float64(int64(buffer))))191 cnt := 0192 log.Debug("Client Stream Serve ... ")193LOOP:194 for {195 select {196 case <-ctx.Done():197 fmt.Printf("\r数据读取: %d/%d %s\n", cnt, counts, log.ColorWrapper("CANCEL", log.Cancel))198 log.Debugf("\nCancel Client, cnt=[%d]\n", cnt)199 break LOOP200 default:201 bufferBytes := make([]byte, buffer)202 n, err := fp.Read(bufferBytes)203 if err == io.EOF && n == 0 {204 fmt.Printf("\r数据读取: %d/%d %s\n", cnt, counts, log.ColorWrapper("EOF", log.Success))205 log.Debugf("Read FILE EOF, cnt=[%d]\n", cnt)206 if err = p.stream.CloseSend(); err != nil {207 log.Errorf("close send failed, %v\n", err)208 }209 break LOOP210 }211 if err != nil {212 fmt.Printf("\r数据读取: %d/%d %s %s\n", cnt, counts, log.ColorWrapper("ERROR", log.Failed), log.ColorWrapper(err.Error(), log.Failed))213 return err214 }215 cnt++216 data := bufferBytes[0:n]217 if err = p.Send(data); err != nil {218 fmt.Printf("\r数据读取: %d/%d %s %s\n", cnt, counts, log.ColorWrapper("ERROR ==>", log.Failed), log.ColorWrapper(utils.GrpcErrorMsg(err), log.Failed))219 return err220 }221 fmt.Printf("\r数据读取: %d/%d\r", cnt, counts)222 }223 }224 return nil225}226func (p *PutStreamClientService) Gather(reply []*pb.Reply) {227 gather(reply)228}229// 把节点映射到map中,方便判断是否有该节点的响应230// 20W节点0.4s运行完毕231func hashNodesMap(nodes string) (sync.Map, error) {232 var resultSet sync.Map233 iter, err := nodeset.Yield(nodes)234 if err != 
nil {235 return resultSet, err236 }237 maxWorkers := runtime.NumCPU()238 nodeChannel := make(chan string, maxWorkers)239 var wg sync.WaitGroup240 wg.Add(maxWorkers)241 for i := 0; i < maxWorkers; i++ {242 go func() {243 defer wg.Done()244 for node := range nodeChannel {245 resultSet.Store(node, false)246 }247 }()248 }249 for iter.Next() {250 nodeChannel <- iter.Value()251 }252 close(nodeChannel)253 wg.Wait()254 return resultSet, nil255}256// 客户端服务257// 用于myclush258func PutStreamClientServiceSetup(ctx context.Context, cancel func(), localFile, destDir, nodes, buffer string, port, width int) {259 defer cancel()260 bufferSize, err := utils.ConvertSize(buffer)261 if err != nil {262 log.Error(err)263 return264 }265 // resultSet存储每个节点的是否有响应,用于在enter时输出当前没有拿到响应的节点列表266 resOriginMap, err := hashNodesMap(nodes)267 if err != nil {268 log.Error(err)269 return270 }271 // log.Info("hashNodesMap done")272 clientService, err := NewPutStreamClientService(localFile, destDir, nodes, strconv.Itoa(port), int32(width))273 if err != nil {274 log.Errorf("PutStreamClientService Failed, err=[%s]\n", err.Error())275 return276 }277 resps := make([]*pb.Reply, 0)278 down, node, err := clientService.GenStreamWithContext(ctx, global.Authority)279 if err != nil {280 log.Errorf("PutStreamClientService Failed, err=[%s]\n",281 status.Code(err).String())282 return283 }284 log.Debugf("Server Stream Client [%s] Setup Success", node)285 defer clientService.CloseConn()286 // 获取运行状态下未获取到响应的节点287 go func() {288 for {289 idle := make([]string, 0)290 stdinBuf := bufio.NewReaderSize(os.Stdin, 1)291 key, _ := stdinBuf.ReadByte()292 if key == 10 {293 // press enter294 resOriginMap.Range(func(key, value interface{}) bool {295 if !value.(bool) {296 idle = append(idle, key.(string))297 }298 return true299 })300 idleNodes := utils.Merge(idle...)301 fmt.Printf("\r等待结果: %s\n", idleNodes)302 }303 }304 }()305 cnt := 0306 downCnt := len(down)307 if downCnt > 0 {308 cnt += downCnt309 resps = append(resps, newReply(false, 
"connect failed", utils.Merge(down...)))310 fmt.Printf("\r结果汇总: %d/%d\r", cnt, clientService.num)311 for _, node := range down {312 resOriginMap.Store(node, true)313 }314 }315 var waitc sync.WaitGroup316 waitc.Add(1)317 go func() {318 defer waitc.Done()319 fmt.Printf("\r结果汇总: %d/%d", 0, clientService.num)320 LOOP:321 for {322 data, err := clientService.stream.Recv()323 switch err {324 case nil:325 for _, node := range utils.ExpNodes(data.Nodelist) {326 log.Debugf("接收响应: pass -> %t, node -> %s, msg -> %s\n", data.Pass, node, data.Msg)327 if value, ok := resOriginMap.Load(node); !ok {328 // 节点存在于resOriginMap329 if !value.(bool) {330 // 之前未接收到该节点的响应331 resOriginMap.Store(node, true)332 resps = append(resps, newReply(data.Pass, data.Msg, node))333 cnt++334 fmt.Printf("\r结果汇总: %d/%d", cnt, clientService.num)335 }336 } else {337 // 节点被确认不存在于resOriginMap338 resps = append(resps, newReply(data.Pass, data.Msg, node))339 cnt++340 fmt.Printf("\r结果汇总: %d/%d", cnt, clientService.num)341 }342 }343 case io.EOF:344 fmt.Printf("\r结果汇总: %d/%d %s\n", cnt, clientService.num, log.ColorWrapper("EOF", log.Success))345 break LOOP346 default:347 fmt.Printf("\r结果汇总: %d/%d %s %s\n", cnt, clientService.num, log.ColorWrapper("ERROR ==>", log.Failed),348 log.ColorWrapper(utils.GrpcErrorMsg(err), log.Failed))349 break LOOP350 }351 }352 }()353 err = clientService.RunServe(ctx, bufferSize)354 if err != nil {355 log.Errorf("PutStreamClientService Failed, err=[%v]\n", err)356 // 取消或者发送失败需要汇总错误信息357 cancel()358 return359 }360 log.Debug("PutStreamClientService Start Recv All Replies...")361 waitc.Wait()362 clientService.Gather(resps)363 log.Debug("PutStreamClientService Stop")364}...

Full Screen

Full Screen

kv.go

Source:kv.go Github

copy

Full Screen

...19type GetReply struct {20 Err Err21 Value string22}23type PutArgs struct {24 Key string25 Value string26}27type PutReply struct {28 Err Err29}30// server31type KV struct {32 mu sync.Mutex33 data map[string]string34}35func server() {36 kv := new(KV)37 kv.data = map[string]string{}38 rpcs := rpc.NewServer()39 rpcs.Register(kv)40 l, e := net.Listen("tcp", ":1234")41 if e != nil {42 log.Fatal("Server Listen Error: ", e)43 }44 go func() {45 for {46 conn, err := l.Accept()47 if err == nil {48 go rpcs.ServeConn(conn)49 } else {50 break51 }52 }53 l.Close()54 }()55}56func (kv *KV) Get(args *GetArgs, reply *GetReply) error {57 kv.mu.Lock()58 defer kv.mu.Unlock()59 val, ok := kv.data[args.Key]60 if ok {61 reply.Err = OK62 reply.Value = val63 } else {64 reply.Err = ErrNoKey65 reply.Value = ""66 }67 return nil68}69func (kv *KV) Put(args *PutArgs, reply *PutReply) error {70 kv.mu.Lock()71 defer kv.mu.Unlock()72 kv.data[args.Key] = args.Value73 reply.Err = OK74 return nil75}76// client77func connect() *rpc.Client {78 client, err := rpc.Dial("tcp", ":1234")79 if err != nil {80 log.Fatal("client dialing:", err)81 }82 return client83}84func get(key string) string {85 client := connect()86 args := GetArgs{key}87 reply := GetReply{}88 err := client.Call("KV.Get", &args, &reply)89 if err != nil {90 log.Fatal("client get:", err)91 }92 client.Close()93 return reply.Value94}95func put(key string, val string) {96 client := connect()97 args := PutArgs{key, val}98 reply := PutReply{}99 err := client.Call("KV.Put", &args, &reply)100 if err != nil {101 log.Fatal("client put:", err)102 }103 client.Close()104}105//main106func main() {107 server()108 put("topic", "distribute")109 fmt.Println("put(topic, distribute) done\n")110 fmt.Printf("get(topic) -> %s\n", get("topic"))111}...

Full Screen

Full Screen

Put

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 cli, err := clientv3.New(clientv3.Config{4 })5 if err != nil {6 }7 defer cli.Close()8 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)9 _, err = cli.Put(ctx, "sample_key", "sample_value")10 cancel()11 if err != nil {12 }13}14import (15func main() {16 cli, err := clientv3.New(clientv3.Config{17 })18 if err != nil {19 }20 defer cli.Close()21 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)22 resp, err := cli.Get(ctx, "sample_key")23 cancel()24 if err != nil {25 }26 for _, ev := range resp.Kvs {27 fmt.Printf("%s : %s28 }29}30import (31func main() {32 cli, err := clientv3.New(clientv3.Config{33 })34 if err != nil {35 }36 defer cli.Close()37 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)38 _, err = cli.Delete(ctx, "sample_key")39 cancel()40 if err != nil {41 }42}43import (44func main() {45 cli, err := clientv3.New(clientv3.Config{

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Testkube automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful