Best Toxiproxy code snippet using stream.Write
writesched_priority_test.go
Source:writesched_priority_test.go
...7 "fmt"8 "sort"9 "testing"10)11func defaultPriorityWriteScheduler() *priorityWriteScheduler {12 return NewPriorityWriteScheduler(nil).(*priorityWriteScheduler)13}14func checkPriorityWellFormed(ws *priorityWriteScheduler) error {15 for id, n := range ws.nodes {16 if id != n.id {17 return fmt.Errorf("bad ws.nodes: ws.nodes[%d] = %d", id, n.id)18 }19 if n.parent == nil {20 if n.next != nil || n.prev != nil {21 return fmt.Errorf("bad node %d: nil parent but prev/next not nil", id)22 }23 continue24 }25 found := false26 for k := n.parent.kids; k != nil; k = k.next {27 if k.id == id {28 found = true29 break30 }31 }32 if !found {33 return fmt.Errorf("bad node %d: not found in parent %d kids list", id, n.parent.id)34 }35 }36 return nil37}38func fmtTree(ws *priorityWriteScheduler, fmtNode func(*priorityNode) string) string {39 var ids []int40 for _, n := range ws.nodes {41 ids = append(ids, int(n.id))42 }43 sort.Ints(ids)44 var buf bytes.Buffer45 for _, id := range ids {46 if buf.Len() != 0 {47 buf.WriteString(" ")48 }49 if id == 0 {50 buf.WriteString(fmtNode(&ws.root))51 } else {52 buf.WriteString(fmtNode(ws.nodes[uint32(id)]))53 }54 }55 return buf.String()56}57func fmtNodeParentSkipRoot(n *priorityNode) string {58 switch {59 case n.id == 0:60 return ""61 case n.parent == nil:62 return fmt.Sprintf("%d{parent:nil}", n.id)63 default:64 return fmt.Sprintf("%d{parent:%d}", n.id, n.parent.id)65 }66}67func fmtNodeWeightParentSkipRoot(n *priorityNode) string {68 switch {69 case n.id == 0:70 return ""71 case n.parent == nil:72 return fmt.Sprintf("%d{weight:%d,parent:nil}", n.id, n.weight)73 default:74 return fmt.Sprintf("%d{weight:%d,parent:%d}", n.id, n.weight, n.parent.id)75 }76}77func TestPriorityTwoStreams(t *testing.T) {78 ws := defaultPriorityWriteScheduler()79 ws.OpenStream(1, OpenStreamOptions{})80 ws.OpenStream(2, OpenStreamOptions{})81 want := "1{weight:15,parent:0} 2{weight:15,parent:0}"82 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {83 
t.Errorf("After open\ngot %q\nwant %q", got, want)84 }85 // Move 1's parent to 2.86 ws.AdjustStream(1, PriorityParam{87 StreamDep: 2,88 Weight: 32,89 Exclusive: false,90 })91 want = "1{weight:32,parent:2} 2{weight:15,parent:0}"92 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {93 t.Errorf("After adjust\ngot %q\nwant %q", got, want)94 }95 if err := checkPriorityWellFormed(ws); err != nil {96 t.Error(err)97 }98}99func TestPriorityAdjustExclusiveZero(t *testing.T) {100 // 1, 2, and 3 are all children of the 0 stream.101 // Exclusive reprioritization to any of the streams should bring102 // the rest of the streams under the reprioritized stream.103 ws := defaultPriorityWriteScheduler()104 ws.OpenStream(1, OpenStreamOptions{})105 ws.OpenStream(2, OpenStreamOptions{})106 ws.OpenStream(3, OpenStreamOptions{})107 want := "1{weight:15,parent:0} 2{weight:15,parent:0} 3{weight:15,parent:0}"108 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {109 t.Errorf("After open\ngot %q\nwant %q", got, want)110 }111 ws.AdjustStream(2, PriorityParam{112 StreamDep: 0,113 Weight: 20,114 Exclusive: true,115 })116 want = "1{weight:15,parent:2} 2{weight:20,parent:0} 3{weight:15,parent:2}"117 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {118 t.Errorf("After adjust\ngot %q\nwant %q", got, want)119 }120 if err := checkPriorityWellFormed(ws); err != nil {121 t.Error(err)122 }123}124func TestPriorityAdjustOwnParent(t *testing.T) {125 // Assigning a node as its own parent should have no effect.126 ws := defaultPriorityWriteScheduler()127 ws.OpenStream(1, OpenStreamOptions{})128 ws.OpenStream(2, OpenStreamOptions{})129 ws.AdjustStream(2, PriorityParam{130 StreamDep: 2,131 Weight: 20,132 Exclusive: true,133 })134 want := "1{weight:15,parent:0} 2{weight:15,parent:0}"135 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {136 t.Errorf("After adjust\ngot %q\nwant %q", got, want)137 }138 if err := checkPriorityWellFormed(ws); err != nil {139 
t.Error(err)140 }141}142func TestPriorityClosedStreams(t *testing.T) {143 ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{MaxClosedNodesInTree: 2}).(*priorityWriteScheduler)144 ws.OpenStream(1, OpenStreamOptions{})145 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})146 ws.OpenStream(3, OpenStreamOptions{PusherID: 2})147 ws.OpenStream(4, OpenStreamOptions{PusherID: 3})148 // Close the first three streams. We lose 1, but keep 2 and 3.149 ws.CloseStream(1)150 ws.CloseStream(2)151 ws.CloseStream(3)152 want := "2{weight:15,parent:0} 3{weight:15,parent:2} 4{weight:15,parent:3}"153 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {154 t.Errorf("After close\ngot %q\nwant %q", got, want)155 }156 if err := checkPriorityWellFormed(ws); err != nil {157 t.Error(err)158 }159 // Adding a stream as an exclusive child of 1 gives it default160 // priorities, since 1 is gone.161 ws.OpenStream(5, OpenStreamOptions{})162 ws.AdjustStream(5, PriorityParam{StreamDep: 1, Weight: 15, Exclusive: true})163 // Adding a stream as an exclusive child of 2 should work, since 2 is not gone.164 ws.OpenStream(6, OpenStreamOptions{})165 ws.AdjustStream(6, PriorityParam{StreamDep: 2, Weight: 15, Exclusive: true})166 want = "2{weight:15,parent:0} 3{weight:15,parent:6} 4{weight:15,parent:3} 5{weight:15,parent:0} 6{weight:15,parent:2}"167 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {168 t.Errorf("After add streams\ngot %q\nwant %q", got, want)169 }170 if err := checkPriorityWellFormed(ws); err != nil {171 t.Error(err)172 }173}174func TestPriorityClosedStreamsDisabled(t *testing.T) {175 ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{}).(*priorityWriteScheduler)176 ws.OpenStream(1, OpenStreamOptions{})177 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})178 ws.OpenStream(3, OpenStreamOptions{PusherID: 2})179 // Close the first two streams. 
We keep only 3.180 ws.CloseStream(1)181 ws.CloseStream(2)182 want := "3{weight:15,parent:0}"183 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {184 t.Errorf("After close\ngot %q\nwant %q", got, want)185 }186 if err := checkPriorityWellFormed(ws); err != nil {187 t.Error(err)188 }189}190func TestPriorityIdleStreams(t *testing.T) {191 ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{MaxIdleNodesInTree: 2}).(*priorityWriteScheduler)192 ws.AdjustStream(1, PriorityParam{StreamDep: 0, Weight: 15}) // idle193 ws.AdjustStream(2, PriorityParam{StreamDep: 0, Weight: 15}) // idle194 ws.AdjustStream(3, PriorityParam{StreamDep: 2, Weight: 20}) // idle195 ws.OpenStream(4, OpenStreamOptions{})196 ws.OpenStream(5, OpenStreamOptions{})197 ws.OpenStream(6, OpenStreamOptions{})198 ws.AdjustStream(4, PriorityParam{StreamDep: 1, Weight: 15})199 ws.AdjustStream(5, PriorityParam{StreamDep: 2, Weight: 15})200 ws.AdjustStream(6, PriorityParam{StreamDep: 3, Weight: 15})201 want := "2{weight:15,parent:0} 3{weight:20,parent:2} 4{weight:15,parent:0} 5{weight:15,parent:2} 6{weight:15,parent:3}"202 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {203 t.Errorf("After open\ngot %q\nwant %q", got, want)204 }205 if err := checkPriorityWellFormed(ws); err != nil {206 t.Error(err)207 }208}209func TestPriorityIdleStreamsDisabled(t *testing.T) {210 ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{}).(*priorityWriteScheduler)211 ws.AdjustStream(1, PriorityParam{StreamDep: 0, Weight: 15}) // idle212 ws.AdjustStream(2, PriorityParam{StreamDep: 0, Weight: 15}) // idle213 ws.AdjustStream(3, PriorityParam{StreamDep: 2, Weight: 20}) // idle214 ws.OpenStream(4, OpenStreamOptions{})215 want := "4{weight:15,parent:0}"216 if got := fmtTree(ws, fmtNodeWeightParentSkipRoot); got != want {217 t.Errorf("After open\ngot %q\nwant %q", got, want)218 }219 if err := checkPriorityWellFormed(ws); err != nil {220 t.Error(err)221 }222}223func 
TestPrioritySection531NonExclusive(t *testing.T) {224 // Example from RFC 7540 Section 5.3.1.225 // A,B,C,D = 1,2,3,4226 ws := defaultPriorityWriteScheduler()227 ws.OpenStream(1, OpenStreamOptions{})228 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})229 ws.OpenStream(3, OpenStreamOptions{PusherID: 1})230 ws.OpenStream(4, OpenStreamOptions{})231 ws.AdjustStream(4, PriorityParam{232 StreamDep: 1,233 Weight: 15,234 Exclusive: false,235 })236 want := "1{parent:0} 2{parent:1} 3{parent:1} 4{parent:1}"237 if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {238 t.Errorf("After adjust\ngot %q\nwant %q", got, want)239 }240 if err := checkPriorityWellFormed(ws); err != nil {241 t.Error(err)242 }243}244func TestPrioritySection531Exclusive(t *testing.T) {245 // Example from RFC 7540 Section 5.3.1.246 // A,B,C,D = 1,2,3,4247 ws := defaultPriorityWriteScheduler()248 ws.OpenStream(1, OpenStreamOptions{})249 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})250 ws.OpenStream(3, OpenStreamOptions{PusherID: 1})251 ws.OpenStream(4, OpenStreamOptions{})252 ws.AdjustStream(4, PriorityParam{253 StreamDep: 1,254 Weight: 15,255 Exclusive: true,256 })257 want := "1{parent:0} 2{parent:4} 3{parent:4} 4{parent:1}"258 if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {259 t.Errorf("After adjust\ngot %q\nwant %q", got, want)260 }261 if err := checkPriorityWellFormed(ws); err != nil {262 t.Error(err)263 }264}265func makeSection533Tree() *priorityWriteScheduler {266 // Initial tree from RFC 7540 Section 5.3.3.267 // A,B,C,D,E,F = 1,2,3,4,5,6268 ws := defaultPriorityWriteScheduler()269 ws.OpenStream(1, OpenStreamOptions{})270 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})271 ws.OpenStream(3, OpenStreamOptions{PusherID: 1})272 ws.OpenStream(4, OpenStreamOptions{PusherID: 3})273 ws.OpenStream(5, OpenStreamOptions{PusherID: 3})274 ws.OpenStream(6, OpenStreamOptions{PusherID: 4})275 return ws276}277func TestPrioritySection533NonExclusive(t *testing.T) {278 // Example from RFC 7540 
Section 5.3.3.279 // A,B,C,D,E,F = 1,2,3,4,5,6280 ws := defaultPriorityWriteScheduler()281 ws.OpenStream(1, OpenStreamOptions{})282 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})283 ws.OpenStream(3, OpenStreamOptions{PusherID: 1})284 ws.OpenStream(4, OpenStreamOptions{PusherID: 3})285 ws.OpenStream(5, OpenStreamOptions{PusherID: 3})286 ws.OpenStream(6, OpenStreamOptions{PusherID: 4})287 ws.AdjustStream(1, PriorityParam{288 StreamDep: 4,289 Weight: 15,290 Exclusive: false,291 })292 want := "1{parent:4} 2{parent:1} 3{parent:1} 4{parent:0} 5{parent:3} 6{parent:4}"293 if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {294 t.Errorf("After adjust\ngot %q\nwant %q", got, want)295 }296 if err := checkPriorityWellFormed(ws); err != nil {297 t.Error(err)298 }299}300func TestPrioritySection533Exclusive(t *testing.T) {301 // Example from RFC 7540 Section 5.3.3.302 // A,B,C,D,E,F = 1,2,3,4,5,6303 ws := defaultPriorityWriteScheduler()304 ws.OpenStream(1, OpenStreamOptions{})305 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})306 ws.OpenStream(3, OpenStreamOptions{PusherID: 1})307 ws.OpenStream(4, OpenStreamOptions{PusherID: 3})308 ws.OpenStream(5, OpenStreamOptions{PusherID: 3})309 ws.OpenStream(6, OpenStreamOptions{PusherID: 4})310 ws.AdjustStream(1, PriorityParam{311 StreamDep: 4,312 Weight: 15,313 Exclusive: true,314 })315 want := "1{parent:4} 2{parent:1} 3{parent:1} 4{parent:0} 5{parent:3} 6{parent:1}"316 if got := fmtTree(ws, fmtNodeParentSkipRoot); got != want {317 t.Errorf("After adjust\ngot %q\nwant %q", got, want)318 }319 if err := checkPriorityWellFormed(ws); err != nil {320 t.Error(err)321 }322}323func checkPopAll(ws WriteScheduler, order []uint32) error {324 for k, id := range order {325 wr, ok := ws.Pop()326 if !ok {327 return fmt.Errorf("Pop[%d]: got ok=false, want %d (order=%v)", k, id, order)328 }329 if got := wr.StreamID(); got != id {330 return fmt.Errorf("Pop[%d]: got %v, want %d (order=%v)", k, got, id, order)331 }332 }333 wr, ok := ws.Pop()334 if 
ok {335 return fmt.Errorf("Pop[%d]: got %v, want ok=false (order=%v)", len(order), wr.StreamID(), order)336 }337 return nil338}339func TestPriorityPopFrom533Tree(t *testing.T) {340 ws := makeSection533Tree()341 ws.Push(makeWriteHeadersRequest(3 /*C*/))342 ws.Push(makeWriteNonStreamRequest())343 ws.Push(makeWriteHeadersRequest(5 /*E*/))344 ws.Push(makeWriteHeadersRequest(1 /*A*/))345 t.Log("tree:", fmtTree(ws, fmtNodeParentSkipRoot))346 if err := checkPopAll(ws, []uint32{0 /*NonStream*/, 1, 3, 5}); err != nil {347 t.Error(err)348 }349}350func TestPriorityPopFromLinearTree(t *testing.T) {351 ws := defaultPriorityWriteScheduler()352 ws.OpenStream(1, OpenStreamOptions{})353 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})354 ws.OpenStream(3, OpenStreamOptions{PusherID: 2})355 ws.OpenStream(4, OpenStreamOptions{PusherID: 3})356 ws.Push(makeWriteHeadersRequest(3))357 ws.Push(makeWriteHeadersRequest(4))358 ws.Push(makeWriteHeadersRequest(1))359 ws.Push(makeWriteHeadersRequest(2))360 ws.Push(makeWriteNonStreamRequest())361 ws.Push(makeWriteNonStreamRequest())362 t.Log("tree:", fmtTree(ws, fmtNodeParentSkipRoot))363 if err := checkPopAll(ws, []uint32{0, 0 /*NonStreams*/, 1, 2, 3, 4}); err != nil {364 t.Error(err)365 }366}367func TestPriorityFlowControl(t *testing.T) {368 ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{ThrottleOutOfOrderWrites: false})369 ws.OpenStream(1, OpenStreamOptions{})370 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})371 sc := &serverConn{maxFrameSize: 16}372 st1 := &stream{id: 1, sc: sc}373 st2 := &stream{id: 2, sc: sc}374 ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 16), false}, st1, nil})375 ws.Push(FrameWriteRequest{&writeData{2, make([]byte, 16), false}, st2, nil})376 ws.AdjustStream(2, PriorityParam{StreamDep: 1})377 // No flow-control bytes available.378 if wr, ok := ws.Pop(); ok {379 t.Fatalf("Pop(limited by flow control)=%v,true, want false", wr)380 }381 // Add enough flow-control bytes to write st2 in two Pop 
calls.382 // Should write data from st2 even though it's lower priority than st1.383 for i := 1; i <= 2; i++ {384 st2.flow.add(8)385 wr, ok := ws.Pop()386 if !ok {387 t.Fatalf("Pop(%d)=false, want true", i)388 }389 if got, want := wr.DataSize(), 8; got != want {390 t.Fatalf("Pop(%d)=%d bytes, want %d bytes", i, got, want)391 }392 }393}394func TestPriorityThrottleOutOfOrderWrites(t *testing.T) {395 ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{ThrottleOutOfOrderWrites: true})396 ws.OpenStream(1, OpenStreamOptions{})397 ws.OpenStream(2, OpenStreamOptions{PusherID: 1})398 sc := &serverConn{maxFrameSize: 4096}399 st1 := &stream{id: 1, sc: sc}400 st2 := &stream{id: 2, sc: sc}401 st1.flow.add(4096)402 st2.flow.add(4096)403 ws.Push(FrameWriteRequest{&writeData{2, make([]byte, 4096), false}, st2, nil})404 ws.AdjustStream(2, PriorityParam{StreamDep: 1})405 // We have enough flow-control bytes to write st2 in a single Pop call.406 // However, due to out-of-order write throttling, the first call should407 // only write 1KB.408 wr, ok := ws.Pop()409 if !ok {410 t.Fatalf("Pop(st2.first)=false, want true")411 }412 if got, want := wr.StreamID(), uint32(2); got != want {413 t.Fatalf("Pop(st2.first)=stream %d, want stream %d", got, want)414 }415 if got, want := wr.DataSize(), 1024; got != want {416 t.Fatalf("Pop(st2.first)=%d bytes, want %d bytes", got, want)417 }418 // Now add data on st1. 
This should take precedence.419 ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 4096), false}, st1, nil})420 wr, ok = ws.Pop()421 if !ok {422 t.Fatalf("Pop(st1)=false, want true")423 }424 if got, want := wr.StreamID(), uint32(1); got != want {425 t.Fatalf("Pop(st1)=stream %d, want stream %d", got, want)426 }427 if got, want := wr.DataSize(), 4096; got != want {428 t.Fatalf("Pop(st1)=%d bytes, want %d bytes", got, want)429 }430 // Should go back to writing 1KB from st2.431 wr, ok = ws.Pop()432 if !ok {433 t.Fatalf("Pop(st2.last)=false, want true")434 }435 if got, want := wr.StreamID(), uint32(2); got != want {436 t.Fatalf("Pop(st2.last)=stream %d, want stream %d", got, want)437 }438 if got, want := wr.DataSize(), 1024; got != want {439 t.Fatalf("Pop(st2.last)=%d bytes, want %d bytes", got, want)440 }441}442func TestPriorityWeights(t *testing.T) {443 ws := defaultPriorityWriteScheduler()444 ws.OpenStream(1, OpenStreamOptions{})445 ws.OpenStream(2, OpenStreamOptions{})446 sc := &serverConn{maxFrameSize: 8}447 st1 := &stream{id: 1, sc: sc}448 st2 := &stream{id: 2, sc: sc}449 st1.flow.add(40)450 st2.flow.add(40)451 ws.Push(FrameWriteRequest{&writeData{1, make([]byte, 40), false}, st1, nil})452 ws.Push(FrameWriteRequest{&writeData{2, make([]byte, 40), false}, st2, nil})453 ws.AdjustStream(1, PriorityParam{StreamDep: 0, Weight: 34})454 ws.AdjustStream(2, PriorityParam{StreamDep: 0, Weight: 9})455 // st1 gets 3.5x the bandwidth of st2 (3.5 = (34+1)/(9+1)).456 // The maximum frame size is 8 bytes. 
The write sequence should be:457 // st1, total bytes so far is (st1=8, st=0)458 // st2, total bytes so far is (st1=8, st=8)459 // st1, total bytes so far is (st1=16, st=8)460 // st1, total bytes so far is (st1=24, st=8) // 3x bandwidth461 // st1, total bytes so far is (st1=32, st=8) // 4x bandwidth462 // st2, total bytes so far is (st1=32, st=16) // 2x bandwidth463 // st1, total bytes so far is (st1=40, st=16)464 // st2, total bytes so far is (st1=40, st=24)465 // st2, total bytes so far is (st1=40, st=32)466 // st2, total bytes so far is (st1=40, st=40)467 if err := checkPopAll(ws, []uint32{1, 2, 1, 1, 1, 2, 1, 2, 2, 2}); err != nil {468 t.Error(err)469 }470}471func TestPriorityRstStreamOnNonOpenStreams(t *testing.T) {472 ws := NewPriorityWriteScheduler(&PriorityWriteSchedulerConfig{473 MaxClosedNodesInTree: 0,474 MaxIdleNodesInTree: 0,475 })476 ws.OpenStream(1, OpenStreamOptions{})477 ws.CloseStream(1)478 ws.Push(FrameWriteRequest{write: streamError(1, ErrCodeProtocol)})479 ws.Push(FrameWriteRequest{write: streamError(2, ErrCodeProtocol)})480 if err := checkPopAll(ws, []uint32{1, 2}); err != nil {481 t.Error(err)482 }483}...
writesched.go
Source:writesched.go
2// Use of this source code is governed by a BSD-style3// license that can be found in the LICENSE file.4package http25import "fmt"6// WriteScheduler is the interface implemented by HTTP/2 write schedulers.7// Methods are never called concurrently.8type WriteScheduler interface {9 // OpenStream opens a new stream in the write scheduler.10 // It is illegal to call this with streamID=0 or with a streamID that is11 // already open -- the call may panic.12 OpenStream(streamID uint32, options OpenStreamOptions)13 // CloseStream closes a stream in the write scheduler. Any frames queued on14 // this stream should be discarded. It is illegal to call this on a stream15 // that is not open -- the call may panic.16 CloseStream(streamID uint32)17 // AdjustStream adjusts the priority of the given stream. This may be called18 // on a stream that has not yet been opened or has been closed. Note that19 // RFC 7540 allows PRIORITY frames to be sent on streams in any state. See:20 // https://tools.ietf.org/html/rfc7540#section-5.121 AdjustStream(streamID uint32, priority PriorityParam)22 // Push queues a frame in the scheduler. In most cases, this will not be23 // called with wr.StreamID()!=0 unless that stream is currently open. The one24 // exception is RST_STREAM frames, which may be sent on idle or closed streams.25 Push(wr FrameWriteRequest)26 // Pop dequeues the next frame to write. Returns false if no frames can27 // be written. Frames with a given wr.StreamID() are Pop'd in the same28 // order they are Push'd. No frames should be discarded except by CloseStream.29 Pop() (wr FrameWriteRequest, ok bool)30}31// OpenStreamOptions specifies extra options for WriteScheduler.OpenStream.32type OpenStreamOptions struct {33 // PusherID is zero if the stream was initiated by the client. 
Otherwise,34 // PusherID names the stream that pushed the newly opened stream.35 PusherID uint3236}37// FrameWriteRequest is a request to write a frame.38type FrameWriteRequest struct {39 // write is the interface value that does the writing, once the40 // WriteScheduler has selected this frame to write. The write41 // functions are all defined in write.go.42 write writeFramer43 // stream is the stream on which this frame will be written.44 // nil for non-stream frames like PING and SETTINGS.45 stream *stream46 // done, if non-nil, must be a buffered channel with space for47 // 1 message and is sent the return value from write (or an48 // earlier error) when the frame has been written.49 done chan error50}51// StreamID returns the id of the stream this frame will be written to.52// 0 is used for non-stream frames such as PING and SETTINGS.53func (wr FrameWriteRequest) StreamID() uint32 {54 if wr.stream == nil {55 if se, ok := wr.write.(StreamError); ok {56 // (*serverConn).resetStream doesn't set57 // stream because it doesn't necessarily have58 // one. So special case this type of write59 // message.60 return se.StreamID61 }62 return 063 }64 return wr.stream.id65}66// isControl reports whether wr is a control frame for MaxQueuedControlFrames67// purposes. That includes non-stream frames and RST_STREAM frames.68func (wr FrameWriteRequest) isControl() bool {69 return wr.stream == nil70}71// DataSize returns the number of flow control bytes that must be consumed72// to write this entire frame. This is 0 for non-DATA frames.73func (wr FrameWriteRequest) DataSize() int {74 if wd, ok := wr.write.(*writeData); ok {75 return len(wd.p)76 }77 return 078}79// Consume consumes min(n, available) bytes from this frame, where available80// is the number of flow control bytes available on the stream. 
Consume returns81// 0, 1, or 2 frames, where the integer return value gives the number of frames82// returned.83//84// If flow control prevents consuming any bytes, this returns (_, _, 0). If85// the entire frame was consumed, this returns (wr, _, 1). Otherwise, this86// returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and87// 'rest' contains the remaining bytes. The consumed bytes are deducted from the88// underlying stream's flow control budget.89func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {90 var empty FrameWriteRequest91 // Non-DATA frames are always consumed whole.92 wd, ok := wr.write.(*writeData)93 if !ok || len(wd.p) == 0 {94 return wr, empty, 195 }96 // Might need to split after applying limits.97 allowed := wr.stream.flow.available()98 if n < allowed {99 allowed = n100 }101 if wr.stream.sc.maxFrameSize < allowed {102 allowed = wr.stream.sc.maxFrameSize103 }104 if allowed <= 0 {105 return empty, empty, 0106 }107 if len(wd.p) > int(allowed) {108 wr.stream.flow.take(allowed)109 consumed := FrameWriteRequest{110 stream: wr.stream,111 write: &writeData{112 streamID: wd.streamID,113 p: wd.p[:allowed],114 // Even if the original had endStream set, there115 // are bytes remaining because len(wd.p) > allowed,116 // so we know endStream is false.117 endStream: false,118 },119 // Our caller is blocking on the final DATA frame, not120 // this intermediate frame, so no need to wait.121 done: nil,122 }123 rest := FrameWriteRequest{124 stream: wr.stream,125 write: &writeData{126 streamID: wd.streamID,127 p: wd.p[allowed:],128 endStream: wd.endStream,129 },130 done: wr.done,131 }132 return consumed, rest, 2133 }134 // The frame is consumed whole.135 // NB: This cast cannot overflow because allowed is <= math.MaxInt32.136 wr.stream.flow.take(int32(len(wd.p)))137 return wr, empty, 1138}139// String is for debugging only.140func (wr FrameWriteRequest) String() string {141 var des string142 if s, ok := 
wr.write.(fmt.Stringer); ok {143 des = s.String()144 } else {145 des = fmt.Sprintf("%T", wr.write)146 }147 return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)148}149// replyToWriter sends err to wr.done and panics if the send must block150// This does nothing if wr.done is nil.151func (wr *FrameWriteRequest) replyToWriter(err error) {152 if wr.done == nil {153 return154 }155 select {156 case wr.done <- err:157 default:158 panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))159 }160 wr.write = nil // prevent use (assume it's tainted after wr.done send)161}162// writeQueue is used by implementations of WriteScheduler.163type writeQueue struct {164 s []FrameWriteRequest165}166func (q *writeQueue) empty() bool { return len(q.s) == 0 }167func (q *writeQueue) push(wr FrameWriteRequest) {168 q.s = append(q.s, wr)169}170func (q *writeQueue) shift() FrameWriteRequest {171 if len(q.s) == 0 {172 panic("invalid use of queue")173 }174 wr := q.s[0]175 // TODO: less copy-happy queue.176 copy(q.s, q.s[1:])177 q.s[len(q.s)-1] = FrameWriteRequest{}178 q.s = q.s[:len(q.s)-1]179 return wr180}181// consume consumes up to n bytes from q.s[0]. If the frame is182// entirely consumed, it is removed from the queue. If the frame183// is partially consumed, the frame is kept with the consumed184// bytes removed. 
Returns true iff any bytes were consumed.185func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {186 if len(q.s) == 0 {187 return FrameWriteRequest{}, false188 }189 consumed, rest, numresult := q.s[0].Consume(n)190 switch numresult {191 case 0:192 return FrameWriteRequest{}, false193 case 1:194 q.shift()195 case 2:196 q.s[0] = rest197 }198 return consumed, true199}200type writeQueuePool []*writeQueue201// put inserts an unused writeQueue into the pool.202func (p *writeQueuePool) put(q *writeQueue) {203 for i := range q.s {204 q.s[i] = FrameWriteRequest{}205 }206 q.s = q.s[:0]207 *p = append(*p, q)208}209// get returns an empty writeQueue.210func (p *writeQueuePool) get() *writeQueue {211 ln := len(*p)212 if ln == 0 {213 return new(writeQueue)214 }215 x := ln - 1216 q := (*p)[x]217 (*p)[x] = nil218 *p = (*p)[:x]...
writesched_test.go
Source:writesched_test.go
...7 "math"8 "reflect"9 "testing"10)11func makeWriteNonStreamRequest() FrameWriteRequest {12 return FrameWriteRequest{writeSettingsAck{}, nil, nil}13}14func makeWriteHeadersRequest(streamID uint32) FrameWriteRequest {15 st := &stream{id: streamID}16 return FrameWriteRequest{&writeResHeaders{streamID: streamID, httpResCode: 200}, st, nil}17}18func makeHandlerPanicRST(streamID uint32) FrameWriteRequest {19 st := &stream{id: streamID}20 return FrameWriteRequest{&handlerPanicRST{StreamID: streamID}, st, nil}21}22func checkConsume(wr FrameWriteRequest, nbytes int32, want []FrameWriteRequest) error {23 consumed, rest, n := wr.Consume(nbytes)24 var wantConsumed, wantRest FrameWriteRequest25 switch len(want) {26 case 0:27 case 1:28 wantConsumed = want[0]29 case 2:30 wantConsumed = want[0]31 wantRest = want[1]32 }33 if !reflect.DeepEqual(consumed, wantConsumed) || !reflect.DeepEqual(rest, wantRest) || n != len(want) {34 return fmt.Errorf("got %v, %v, %v\nwant %v, %v, %v", consumed, rest, n, wantConsumed, wantRest, len(want))35 }36 return nil37}38func TestFrameWriteRequestNonData(t *testing.T) {39 wr := makeWriteNonStreamRequest()40 if got, want := wr.DataSize(), 0; got != want {41 t.Errorf("DataSize: got %v, want %v", got, want)42 }43 // Non-DATA frames are always consumed whole.44 if err := checkConsume(wr, 0, []FrameWriteRequest{wr}); err != nil {45 t.Errorf("Consume:\n%v", err)46 }47}48func TestFrameWriteRequestData(t *testing.T) {49 st := &stream{50 id: 1,51 sc: &serverConn{maxFrameSize: 16},52 }53 const size = 3254 wr := FrameWriteRequest{&writeData{st.id, make([]byte, size), true}, st, make(chan error)}55 if got, want := wr.DataSize(), size; got != want {56 t.Errorf("DataSize: got %v, want %v", got, want)57 }58 // No flow-control bytes available: cannot consume anything.59 if err := checkConsume(wr, math.MaxInt32, []FrameWriteRequest{}); err != nil {60 t.Errorf("Consume(limited by flow control):\n%v", err)61 }62 // Add enough flow-control bytes to consume the entire 
frame,63 // but we're now restricted by st.sc.maxFrameSize.64 st.flow.add(size)65 want := []FrameWriteRequest{66 {67 write: &writeData{st.id, make([]byte, st.sc.maxFrameSize), false},68 stream: st,69 done: nil,70 },71 {72 write: &writeData{st.id, make([]byte, size-st.sc.maxFrameSize), true},73 stream: st,74 done: wr.done,75 },76 }77 if err := checkConsume(wr, math.MaxInt32, want); err != nil {78 t.Errorf("Consume(limited by maxFrameSize):\n%v", err)79 }80 rest := want[1]81 // Consume 8 bytes from the remaining frame.82 want = []FrameWriteRequest{83 {84 write: &writeData{st.id, make([]byte, 8), false},85 stream: st,86 done: nil,87 },88 {89 write: &writeData{st.id, make([]byte, size-st.sc.maxFrameSize-8), true},90 stream: st,91 done: wr.done,92 },93 }94 if err := checkConsume(rest, 8, want); err != nil {95 t.Errorf("Consume(8):\n%v", err)96 }97 rest = want[1]98 // Consume all remaining bytes.99 want = []FrameWriteRequest{100 {101 write: &writeData{st.id, make([]byte, size-st.sc.maxFrameSize-8), true},102 stream: st,103 done: wr.done,104 },105 }106 if err := checkConsume(rest, math.MaxInt32, want); err != nil {107 t.Errorf("Consume(remainder):\n%v", err)108 }109}110func TestFrameWriteRequest_StreamID(t *testing.T) {111 const streamID = 123112 wr := FrameWriteRequest{write: streamError(streamID, ErrCodeNo)}113 if got := wr.StreamID(); got != streamID {114 t.Errorf("FrameWriteRequest(StreamError) = %v; want %v", got, streamID)115 }116}...
Write
Using AI Code Generation
1import (2func main() {3 f, err := os.Create("test.txt")4 if err != nil {5 fmt.Println(err)6 }7 l, err := f.Write([]byte("Hello World"))8 if err != nil {9 fmt.Println(err)10 f.Close()11 }12 fmt.Println(l, "bytes written successfully")13 err = f.Close()14 if err != nil {15 fmt.Println(err)16 }17}18func (f *File) WriteString(s string) (ret int, err error)19import (20func main() {21 f, err := os.Create("test.txt")22 if err != nil {23 fmt.Println(err)24 }25 l, err := f.WriteString("Hello World")26 if err != nil {27 fmt.Println(err)28 f.Close()29 }30 fmt.Println(l, "bytes written successfully")31 err = f.Close()32 if err != nil {33 fmt.Println(err)34 }35}36func (f *File) WriteAt(b []byte, off int64) (n int, err error)37import (38func main() {39 f, err := os.Create("test.txt")40 if err != nil {41 fmt.Println(err)42 }43 l, err := f.WriteAt([]byte("Hello World"), 6)44 if err != nil {45 fmt.Println(err)46 f.Close()47 }48 fmt.Println(l, "bytes written successfully")49 err = f.Close()50 if err != nil {51 fmt.Println(err)52 }53}
Write
Using AI Code Generation
1import (2func main() {3 f, err := os.Create("test.txt")4 if err != nil {5 fmt.Println(err)6 }7 l, err := f.Write([]byte("Hello World"))8 if err != nil {9 fmt.Println(err)10 f.Close()11 }12 fmt.Println(l, "bytes written successfully")13 err = f.Close()14 if err != nil {15 fmt.Println(err)16 }17}18func (f *File) WriteString(s string) (ret int, err error)19import (20func main() {21 f, err := os.Create("test.txt")22 if err != nil {23 fmt.Println(err)24 }25 l, err := f.WriteString("Hello World")26 if err != nil {27 fmt.Println(err)28 f.Close()29 }30 fmt.Println(l, "bytes written successfully")31 err = f.Close()32 if err != nil {33 fmt.Println(err)34 }35}36func (f *File) WriteAt(b []byte, off int64) (n int, err error)37import (38func main() {39 f, err := os.Create("test.txt")40 if err != nil {41 fmt.Println(err)42 }43 l, err := f.WriteAt([]byte("Hello World"), 6)44 if err != nil {45 fmt.Println(err)46 f.Close()47 }48 fmt.Println(l, "bytes written successfully")49 err = f.Close()50 if err != nil {51 fmt.Println(err)52 }53}
Write
Using AI Code Generation
1import (2func main() {3 file, err := os.Create("1.txt")4 if err != nil {5 fmt.Println(err)6 }7 defer file.Close()8 file.WriteString("Hello World!")9}10import (11func main() {12 file, err := os.Create("2.txt")13 if err != nil {14 fmt.Println(err)15 }16 defer file.Close()17 file.Write([]byte("Hello World!"))18}19import (20func main() {21 err := ioutil.WriteFile("3.txt", []byte("Hello World!"), 0644)22 if err != nil {23 fmt.Println(err)24 }25}26import (27func main() {28 file, err := os.Create("4.txt")29 if err != nil {30 fmt.Println(err)31 }32 defer file.Close()33 fmt.Fprintln(file, "Hello World!")34}35import (36func main() {37 file, err := os.Create("5.txt")38 if err != nil {39 fmt.Println(err)40 }41 defer file.Close()42 fmt.Fprintf(file, "Hello %s!", "World")43}44import (45func main() {46 file, err := os.Create("6.txt")47 if err != nil {48 fmt.Println(err)49 }50 defer file.Close()51 fmt.Fprint(file, "Hello World!")52}53import (54func main() {55 file, err := os.Create("7.txt")56 if err != nil {57 fmt.Println(err)58 }59 defer file.Close()60 file.Write([]byte("Hello World!"))61}
Write
Using AI Code Generation
1import (2func main() {3 file, err := os.Create("test.txt")4 if err != nil {5 fmt.Println(err)6 }7 defer file.Close()8 data := []byte("Hello World")9 file.Write(data)10}11func (f *File) WriteString(s string) (n int, err error)12import (13func main() {14 file, err := os.Create("test.txt")15 if err != nil {16 fmt.Println(err)17 }18 defer file.Close()19 file.WriteString("Hello World")20}21func (f *File) WriteAt(b []byte, off int64) (n int, err error)22import (23func main() {24 file, err := os.Create("test.txt")25 if err != nil {26 fmt.Println(err)27 }28 defer file.Close()29 data := []byte("Hello World")30 file.WriteAt(data, 0)31}
Write
Using AI Code Generation
1import (2func main() {3 f, err := os.Create("test.txt")4 if err != nil {5 fmt.Println(err)6 }7 l, err := f.Write([]byte("Hello World"))8 if err != nil {9 fmt.Println(err)10 f.Close()11 }12 fmt.Println(l, "bytes written successfully")13 err = f.Close()14 if err != nil {15 fmt.Println(err)16 }17}18import (19func main() {20 f, err := os.Create("test.txt")21 if err != nil {22 fmt.Println(err)23 }24 l, err := f.WriteString("Hello World")25 if err != nil {26 fmt.Println(err)27 f.Close()28 }29 fmt.Println(l, "bytes written successfully")30 err = f.Close()31 if err != nil {32 fmt.Println(err)33 }34}35import (36func main() {37 f, err := os.Create("test.txt")38 if err != nil {39 fmt.Println(err)40 }41 l, err := f.WriteAt([]byte("Hello World"), 6)42 if err != nil {43 fmt.Println(err)44 f.Close()45 }46 fmt.Println(l, "bytes written successfully")47 err = f.Close()48 if err != nil {49 fmt.Println(err)50 }51}52import (53func main() {54 f, err := os.Create("test.txt")55 if err != nil {56 fmt.Println(err)
Write
Using AI Code Generation
import (
	"bufio"
	"fmt"
	"os"
)

// main creates test.txt in the current directory and writes "Hello World"
// to it through a buffered writer, printing any error it encounters.
func main() {
	f, err := os.Create("test.txt")
	if err != nil {
		fmt.Println("Error creating file")
		// Stop here: without a usable file there is nothing to write to.
		return
	}
	defer f.Close()

	w := bufio.NewWriter(f)
	if _, err := w.WriteString("Hello World"); err != nil {
		fmt.Println(err)
		return
	}
	// The text only reaches the file once the buffer is flushed, so a
	// failure here must be reported rather than ignored.
	if err := w.Flush(); err != nil {
		fmt.Println(err)
	}
}
Write
Using AI Code Generation
1import (2func main() {3 file, err := os.OpenFile("file.txt", os.O_WRONLY|os.O_CREATE, 0666)4 if err != nil {5 fmt.Println(err)6 }7 defer file.Close()8 writer := bufio.NewWriter(file)9 writer.WriteString("Hello World")10 writer.Flush()11}12import (13func main() {14 data := []byte("Hello World")15 err := ioutil.WriteFile("file.txt", data, 0666)16 if err != nil {17 fmt.Println(err)18 os.Exit(1)19 }20}21import (22func main() {23 data := []byte("Hello World")24 file, err := os.Create("file.txt")25 if err != nil {26 fmt.Println(err)27 os.Exit(1)28 }29 defer file.Close()30 file.Write(data)31}32import (33func main() {
Write
Using AI Code Generation
1import (2func main() {3 file, _ := os.Create("test.txt")4 file.Write([]byte("This is a test file"))5 file.Close()6}7import (8func main() {9 file, _ := os.Create("test.txt")10 file.WriteString("This is a test file")11 file.Close()12}13import (14func main() {15 file, _ := os.Create("test.txt")16 file.WriteAt([]byte("This is a test file"), 0)17 file.Close()18}19import (20func main() {21 file, _ := os.Create("test.txt")22 file.WriteStringAt("This is a test file", 0)23 file.Close()24}25import (26func main() {27 file, _ := os.Create("test.txt")28 file.WriteRune('a')29 file.Close()30}31import (32func main() {33 file, _ := os.Create("test.txt")34 file.WriteString("This is a test file")35 file.Close()36}37import (38func main() {39 file, _ := os.Create("test.txt")40 file.WriteByte('a')41 file.Close()42}43import (44func main() {45 file, _ := os.Create("test.txt")46 file.WriteRune('a')47 file.Close()48}49import (50func main() {51 file, _ := os.Create("test.txt")52 file.WriteStringAt("This is a test file", 0)53 file.Close()54}
Write
Using AI Code Generation
import (
	"fmt"
	"io"
	"os"
)

// main copies the contents of test.txt into test2.txt (both in the current
// directory) and prints a confirmation once the copy has been written out.
// Any failure along the way panics via checkError.
func main() {
	src, err := os.Open("test.txt")
	checkError(err)
	defer src.Close()

	dst, err := os.Create("test2.txt")
	checkError(err)
	// Safety net for the panic path; on success the explicit Close below
	// runs first and is the one whose error is checked.
	defer dst.Close()

	_, err = io.Copy(dst, src)
	checkError(err)

	// Check the destination's Close error before reporting success, so a
	// failure at close time is not reported as a successful copy.
	checkError(dst.Close())

	fmt.Println("File copied successfully")
}

// checkError panics with err when err is non-nil and does nothing otherwise.
func checkError(err error) {
	if err != nil {
		panic(err)
	}
}
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!