How to use the Close method of the stream package

Best Toxiproxy code snippet using stream.Close
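Most "stream" types in Go expose their Close method through the standard io.Closer (or io.ReadCloser) interface, so the same defer-and-close pattern works regardless of the concrete package. Below is a minimal, hedged sketch of that pattern; readAllAndClose is an illustrative helper and not part of any package featured on this page.

package main

import (
    "fmt"
    "io"
    "strings"
)

// readAllAndClose works with anything that is both readable and closable,
// which is how most stream-like types in Go expose Close (io.ReadCloser).
func readAllAndClose(rc io.ReadCloser) (string, error) {
    defer rc.Close()
    data, err := io.ReadAll(rc)
    return string(data), err
}

func main() {
    // io.NopCloser turns any io.Reader into an io.ReadCloser with a no-op
    // Close, handy for exercising code that always closes its input.
    out, err := readAllAndClose(io.NopCloser(strings.NewReader("hello")))
    fmt.Println(out, err)
}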

stream.go

Source: stream.go (GitHub)


package fx

import (
    "sort"
    "sync"

    "github.com/tal-tech/go-zero/core/collection"
    "github.com/tal-tech/go-zero/core/lang"
    "github.com/tal-tech/go-zero/core/threading"
)

const (
    defaultWorkers = 16
    minWorkers     = 1
)

type (
    rxOptions struct {
        unlimitedWorkers bool
        workers          int
    }

    // FilterFunc defines the method to filter a Stream.
    FilterFunc func(item interface{}) bool
    // ForAllFunc defines the method to handle all elements in a Stream.
    ForAllFunc func(pipe <-chan interface{})
    // ForEachFunc defines the method to handle each element in a Stream.
    ForEachFunc func(item interface{})
    // GenerateFunc defines the method to send elements into a Stream.
    GenerateFunc func(source chan<- interface{})
    // KeyFunc defines the method to generate keys for the elements in a Stream.
    KeyFunc func(item interface{}) interface{}
    // LessFunc defines the method to compare the elements in a Stream.
    LessFunc func(a, b interface{}) bool
    // MapFunc defines the method to map each element to another object in a Stream.
    MapFunc func(item interface{}) interface{}
    // Option defines the method to customize a Stream.
    Option func(opts *rxOptions)
    // ParallelFunc defines the method to handle elements parallelly.
    ParallelFunc func(item interface{})
    // ReduceFunc defines the method to reduce all the elements in a Stream.
    ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
    // WalkFunc defines the method to walk through all the elements in a Stream.
    WalkFunc func(item interface{}, pipe chan<- interface{})

    // A Stream is a stream that can be used to do stream processing.
    Stream struct {
        source <-chan interface{}
    }
)

// Concat returns a concatenated Stream.
func Concat(s Stream, others ...Stream) Stream {
    return s.Concat(others...)
}

// From constructs a Stream from the given GenerateFunc.
func From(generate GenerateFunc) Stream {
    source := make(chan interface{})
    threading.GoSafe(func() {
        defer close(source)
        generate(source)
    })
    return Range(source)
}

// Just converts the given arbitrary items to a Stream.
func Just(items ...interface{}) Stream {
    source := make(chan interface{}, len(items))
    for _, item := range items {
        source <- item
    }
    close(source)
    return Range(source)
}

// Range converts the given channel to a Stream.
func Range(source <-chan interface{}) Stream {
    return Stream{
        source: source,
    }
}

// AllMach returns whether all elements of this stream match the provided predicate.
// May not evaluate the predicate on all elements if not necessary for determining the result.
// If the stream is empty then true is returned and the predicate is not evaluated.
func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
    for item := range s.source {
        if !predicate(item) {
            return false
        }
    }
    return true
}

// AnyMach returns whether any elements of this stream match the provided predicate.
// May not evaluate the predicate on all elements if not necessary for determining the result.
// If the stream is empty then false is returned and the predicate is not evaluated.
func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
    for item := range s.source {
        if predicate(item) {
            return true
        }
    }
    return false
}

// Buffer buffers the items into a queue with size n.
// It can balance the producer and the consumer if their processing throughput don't match.
func (s Stream) Buffer(n int) Stream {
    if n < 0 {
        n = 0
    }
    source := make(chan interface{}, n)
    go func() {
        for item := range s.source {
            source <- item
        }
        close(source)
    }()
    return Range(source)
}

// Concat returns a Stream that concatenated other streams
func (s Stream) Concat(others ...Stream) Stream {
    source := make(chan interface{})
    go func() {
        group := threading.NewRoutineGroup()
        group.Run(func() {
            for item := range s.source {
                source <- item
            }
        })
        for _, each := range others {
            each := each
            group.Run(func() {
                for item := range each.source {
                    source <- item
                }
            })
        }
        group.Wait()
        close(source)
    }()
    return Range(source)
}

// Count counts the number of elements in the result.
func (s Stream) Count() (count int) {
    for range s.source {
        count++
    }
    return
}

// Distinct removes the duplicated items base on the given KeyFunc.
func (s Stream) Distinct(fn KeyFunc) Stream {
    source := make(chan interface{})
    threading.GoSafe(func() {
        defer close(source)
        keys := make(map[interface{}]lang.PlaceholderType)
        for item := range s.source {
            key := fn(item)
            if _, ok := keys[key]; !ok {
                source <- item
                keys[key] = lang.Placeholder
            }
        }
    })
    return Range(source)
}

// Done waits all upstreaming operations to be done.
func (s Stream) Done() {
    for range s.source {
    }
}

// Filter filters the items by the given FilterFunc.
func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
    return s.Walk(func(item interface{}, pipe chan<- interface{}) {
        if fn(item) {
            pipe <- item
        }
    }, opts...)
}

// ForAll handles the streaming elements from the source and no later streams.
func (s Stream) ForAll(fn ForAllFunc) {
    fn(s.source)
}

// ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
func (s Stream) ForEach(fn ForEachFunc) {
    for item := range s.source {
        fn(item)
    }
}

// Group groups the elements into different groups based on their keys.
func (s Stream) Group(fn KeyFunc) Stream {
    groups := make(map[interface{}][]interface{})
    for item := range s.source {
        key := fn(item)
        groups[key] = append(groups[key], item)
    }
    source := make(chan interface{})
    go func() {
        for _, group := range groups {
            source <- group
        }
        close(source)
    }()
    return Range(source)
}

// Head returns the first n elements in p.
func (s Stream) Head(n int64) Stream {
    if n < 1 {
        panic("n must be greater than 0")
    }
    source := make(chan interface{})
    go func() {
        for item := range s.source {
            n--
            if n >= 0 {
                source <- item
            }
            if n == 0 {
                // let successive method go ASAP even we have more items to skip
                // why we don't just break the loop, because if break,
                // this former goroutine will block forever, which will cause goroutine leak.
                close(source)
            }
        }
        if n > 0 {
            close(source)
        }
    }()
    return Range(source)
}

// Map converts each item to another corresponding item, which means it's a 1:1 model.
func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
    return s.Walk(func(item interface{}, pipe chan<- interface{}) {
        pipe <- fn(item)
    }, opts...)
}

// Merge merges all the items into a slice and generates a new stream.
func (s Stream) Merge() Stream {
    var items []interface{}
    for item := range s.source {
        items = append(items, item)
    }
    source := make(chan interface{}, 1)
    source <- items
    close(source)
    return Range(source)
}

// Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
    s.Walk(func(item interface{}, pipe chan<- interface{}) {
        fn(item)
    }, opts...).Done()
}

// Reduce is a utility method to let the caller deal with the underlying channel.
func (s Stream) Reduce(fn ReduceFunc) (interface{}, error) {
    return fn(s.source)
}

// Reverse reverses the elements in the stream.
func (s Stream) Reverse() Stream {
    var items []interface{}
    for item := range s.source {
        items = append(items, item)
    }
    // reverse, official method
    for i := len(items)/2 - 1; i >= 0; i-- {
        opp := len(items) - 1 - i
        items[i], items[opp] = items[opp], items[i]
    }
    return Just(items...)
}

// Skip returns a Stream that skips size elements.
func (s Stream) Skip(n int64) Stream {
    if n < 0 {
        panic("n must not be negative")
    }
    if n == 0 {
        return s
    }
    source := make(chan interface{})
    go func() {
        for item := range s.source {
            n--
            if n >= 0 {
                continue
            } else {
                source <- item
            }
        }
        close(source)
    }()
    return Range(source)
}

// Sort sorts the items from the underlying source.
func (s Stream) Sort(less LessFunc) Stream {
    var items []interface{}
    for item := range s.source {
        items = append(items, item)
    }
    sort.Slice(items, func(i, j int) bool {
        return less(items[i], items[j])
    })
    return Just(items...)
}

// Split splits the elements into chunk with size up to n,
// might be less than n on tailing elements.
func (s Stream) Split(n int) Stream {
    if n < 1 {
        panic("n should be greater than 0")
    }
    source := make(chan interface{})
    go func() {
        var chunk []interface{}
        for item := range s.source {
            chunk = append(chunk, item)
            if len(chunk) == n {
                source <- chunk
                chunk = nil
            }
        }
        if chunk != nil {
            source <- chunk
        }
        close(source)
    }()
    return Range(source)
}

// Tail returns the last n elements in p.
func (s Stream) Tail(n int64) Stream {
    if n < 1 {
        panic("n should be greater than 0")
    }
    source := make(chan interface{})
    go func() {
        ring := collection.NewRing(int(n))
        for item := range s.source {
            ring.Add(item)
        }
        for _, item := range ring.Take() {
            source <- item
        }
        close(source)
    }()
    return Range(source)
}

// Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream {
    option := buildOptions(opts...)
    if option.unlimitedWorkers {
        return s.walkUnlimited(fn, option)
    }
    return s.walkLimited(fn, option)
}

func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
    pipe := make(chan interface{}, option.workers)
    go func() {
        var wg sync.WaitGroup
        pool := make(chan lang.PlaceholderType, option.workers)
        for {
            pool <- lang.Placeholder
            item, ok := <-s.source
            if !ok {
                <-pool
                break
            }
            wg.Add(1)
            // better to safely run caller defined method
            threading.GoSafe(func() {
                defer func() {
                    wg.Done()
                    <-pool
                }()
                fn(item, pipe)
            })
        }
        wg.Wait()
        close(pipe)
    }()
    return Range(pipe)
}

func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
    pipe := make(chan interface{}, defaultWorkers)
    go func() {
        var wg sync.WaitGroup
        for {
            item, ok := <-s.source
            if !ok {
                break
            }
            wg.Add(1)
            // better to safely run caller defined method
            threading.GoSafe(func() {
                defer wg.Done()
                fn(item, pipe)
            })
        }
        wg.Wait()
        close(pipe)
    }()
    return Range(pipe)
}

// UnlimitedWorkers lets the caller to use as many workers as the tasks.
func UnlimitedWorkers() Option {
    return func(opts *rxOptions) {
        opts.unlimitedWorkers = true
    }
}

// WithWorkers lets the caller to customize the concurrent workers.
func WithWorkers(workers int) Option {
    return func(opts *rxOptions) {
        if workers < minWorkers {
            opts.workers = minWorkers
        } else {
            opts.workers = workers
        }
    }
}

func buildOptions(opts ...Option) *rxOptions {
    options := newOptions()
    for _, opt := range opts {
        opt(options)
    }
    return options
}

func newOptions() *rxOptions {
    return &rxOptions{
        workers: defaultWorkers,
    }
}
...
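Note that fx.Stream itself has no exported Close method: a stream ends when its source channel is closed, which From, Just, and every intermediate stage shown above do for you, and a terminal call such as Done, ForEach, or ForAll then drains it. A minimal usage sketch under that assumption, using the tal-tech module path from the imports above:

package main

import (
    "fmt"

    "github.com/tal-tech/go-zero/core/fx"
)

func main() {
    // From closes the source channel when the generator returns; that close
    // is what lets the downstream Filter stage and ForEach finish.
    fx.From(func(source chan<- interface{}) {
        for i := 0; i < 10; i++ {
            source <- i
        }
    }).Filter(func(item interface{}) bool {
        return item.(int)%2 == 0
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}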


blocks_itr.go

Source: blocks_itr.go (GitHub)


...
}

func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
    itr.mgr.blkfilesInfoCond.L.Lock()
    defer itr.mgr.blkfilesInfoCond.L.Unlock()
    for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() {
        logger.Debugf("Going to wait for newer blocks. maxAvailaBlockNumber=[%d], waitForBlockNum=[%d]",
            itr.mgr.blockfilesInfo.lastPersistedBlock, blockNum)
        itr.mgr.blkfilesInfoCond.Wait()
        logger.Debugf("Came out of wait. maxAvailaBlockNumber=[%d]", itr.mgr.blockfilesInfo.lastPersistedBlock)
    }
    return itr.mgr.blockfilesInfo.lastPersistedBlock
}

func (itr *blocksItr) initStream() error {
    var lp *fileLocPointer
    var err error
    if lp, err = itr.mgr.index.getBlockLocByBlockNum(itr.blockNumToRetrieve); err != nil {
        return err
    }
    if itr.stream, err = newBlockStream(itr.mgr.rootDir, lp.fileSuffixNum, int64(lp.offset), -1); err != nil {
        return err
    }
    return nil
}

func (itr *blocksItr) shouldClose() bool {
    itr.closeMarkerLock.Lock()
    defer itr.closeMarkerLock.Unlock()
    return itr.closeMarker
}

// Next moves the cursor to next block and returns true iff the iterator is not exhausted
func (itr *blocksItr) Next() (ledger.QueryResult, error) {
    if itr.maxBlockNumAvailable < itr.blockNumToRetrieve {
        itr.maxBlockNumAvailable = itr.waitForBlock(itr.blockNumToRetrieve)
    }
    itr.closeMarkerLock.Lock()
    defer itr.closeMarkerLock.Unlock()
    if itr.closeMarker {
        return nil, nil
    }
    if itr.stream == nil {
        logger.Debugf("Initializing block stream for iterator. itr.maxBlockNumAvailable=%d", itr.maxBlockNumAvailable)
        if err := itr.initStream(); err != nil {
            return nil, err
        }
    }
    nextBlockBytes, err := itr.stream.nextBlockBytes()
    if err != nil {
        return nil, err
    }
    itr.blockNumToRetrieve++
    return deserializeBlock(nextBlockBytes)
}

// Close releases any resources held by the iterator
func (itr *blocksItr) Close() {
    itr.mgr.blkfilesInfoCond.L.Lock()
    defer itr.mgr.blkfilesInfoCond.L.Unlock()
    itr.closeMarkerLock.Lock()
    defer itr.closeMarkerLock.Unlock()
    itr.closeMarker = true
    itr.mgr.blkfilesInfoCond.Broadcast()
    if itr.stream != nil {
        itr.stream.close()
    }
}
...
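Close above combines a mutex-protected close marker with a sync.Cond broadcast, so a reader parked in waitForBlock wakes up and Next can return nil. Below is a minimal, self-contained sketch of that same pattern; the names (waiter, waitFor, newWaiter) are hypothetical stand-ins, not Fabric's actual types.

package main

import (
    "fmt"
    "sync"
    "time"
)

// waiter is a hypothetical stand-in for blocksItr: a reader that blocks on a
// condition variable until data is available or Close is called.
type waiter struct {
    mu        sync.Mutex // guards closed (plays the role of closeMarkerLock)
    cond      *sync.Cond // plays the role of blkfilesInfoCond
    available int        // plays the role of lastPersistedBlock
    closed    bool       // plays the role of closeMarker
}

func newWaiter() *waiter {
    w := &waiter{}
    w.cond = sync.NewCond(&sync.Mutex{})
    return w
}

// waitFor blocks until at least n items are available or the waiter is closed.
func (w *waiter) waitFor(n int) int {
    w.cond.L.Lock()
    defer w.cond.L.Unlock()
    for w.available < n && !w.isClosed() {
        w.cond.Wait()
    }
    return w.available
}

func (w *waiter) isClosed() bool {
    w.mu.Lock()
    defer w.mu.Unlock()
    return w.closed
}

// Close sets the marker and broadcasts so any goroutine parked in waitFor wakes up.
func (w *waiter) Close() {
    w.cond.L.Lock()
    defer w.cond.L.Unlock()
    w.mu.Lock()
    w.closed = true
    w.mu.Unlock()
    w.cond.Broadcast()
}

func main() {
    w := newWaiter()
    go func() {
        time.Sleep(100 * time.Millisecond)
        w.Close() // unblocks the waiter below even though nothing ever arrived
    }()
    fmt.Println("available:", w.waitFor(1))
}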


inprocstream_test.go

Source: inprocstream_test.go (GitHub)


...
    //bad send, should panic, unblock and return error
    err := stream.Send(msg)
    require.NotNil(t, err, "should have errored on panic")
}

func TestRecvChannelClosedError(t *testing.T) {
    ch := make(chan *pb.ChaincodeMessage)
    stream := newInProcStream(ch, ch)

    // Close the channel
    close(ch)

    // Trying to call a closed receive channel should return an error
    _, err := stream.Recv()
    require.Error(t, err, "Should return an error")
    require.Contains(t, err.Error(), "channel is closed")
}

func TestCloseSend(t *testing.T) {
    send := make(chan *pb.ChaincodeMessage)
    recv := make(chan *pb.ChaincodeMessage)
    stream := newInProcStream(recv, send)

    stream.CloseSend()

    _, ok := <-send
    require.False(t, ok, "send channel should be closed")
    require.NotPanics(t, func() { stream.CloseSend() }, "CloseSend should be idempotent")
}
...
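The test expects CloseSend to close the send channel and to be safe to call more than once. A minimal sketch of one way to get that idempotence, using sync.Once; the types here (message, inProcStream) are hypothetical stand-ins, not Fabric's actual implementation.

package main

import (
    "fmt"
    "sync"
)

// message is a hypothetical stand-in for *pb.ChaincodeMessage.
type message struct{ Payload string }

// inProcStream sketches a stream backed by in-process channels, with a
// CloseSend that is safe to call more than once -- the property asserted above.
type inProcStream struct {
    recv      <-chan *message
    send      chan *message
    closeOnce sync.Once
}

func newInProcStream(recv <-chan *message, send chan *message) *inProcStream {
    return &inProcStream{recv: recv, send: send}
}

// CloseSend closes the send channel exactly once; later calls are no-ops.
func (s *inProcStream) CloseSend() {
    s.closeOnce.Do(func() {
        close(s.send)
    })
}

func main() {
    send := make(chan *message)
    recv := make(chan *message)
    s := newInProcStream(recv, send)

    s.CloseSend()
    s.CloseSend() // idempotent: no panic on a second call

    _, ok := <-send
    fmt.Println("send channel open?", ok) // prints: send channel open? false
}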


Close

Using AI Code Generation


package main

import (
    "fmt"
    "io"
    "os"
)

func main() {
    f, err := os.Open("test.txt")
    if err != nil {
        fmt.Println(err)
        return // without this, f would be nil below
    }
    defer f.Close()
    io.Copy(os.Stdout, f)
}
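Deferring Close is fine on the read side, but when the closed file was written to, Close can report a real failure (for example a deferred flush error). Below is a hedged sketch of a copy helper that surfaces that error; copyFile and the file names are illustrative, not part of any API shown above.

package main

import (
    "fmt"
    "io"
    "os"
)

// copyFile shows why Close deserves an explicit error check on the write side:
// a failing Close means the copy may be incomplete even though io.Copy
// reported no error.
func copyFile(dst, src string) error {
    in, err := os.Open(src)
    if err != nil {
        return err
    }
    defer in.Close() // read side: deferring and ignoring the error is fine

    out, err := os.Create(dst)
    if err != nil {
        return err
    }

    if _, err := io.Copy(out, in); err != nil {
        out.Close()
        return err
    }
    return out.Close() // write side: surface the Close error to the caller
}

func main() {
    if err := copyFile("copy.txt", "test.txt"); err != nil {
        fmt.Println(err)
    }
}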


Close

Using AI Code Generation


package main

import (
    "fmt"
    "os"
)

func main() {
    file, err := os.Create("sample.txt")
    if err != nil {
        fmt.Println(err)
        return
    }
    err = file.Close()
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("file closed")
}

package main

import (
    "fmt"
    "os"
)

func main() {
    file, err := os.Create("sample.txt")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer file.Close()
    l, err := file.WriteString("Hello World")
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(l, "bytes written successfully")
}

package main

import (
    "fmt"
    "io/ioutil"
)

func main() {
    content, err := ioutil.ReadFile("sample.txt")
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("File contents:", string(content))
}


Close

Using AI Code Generation


func main() {
    stream := NewStream("test.txt")
    defer stream.Close()
}

func main() {
    stream := NewStream("test.txt")
    defer func() {
        stream.Close()
    }()
}

func main() {
    stream := NewStream("test.txt")
    defer func(s *Stream) {
        s.Close()
    }(stream)
}

func main() {
    stream := NewStream("test.txt")
    defer func() {
        if stream != nil {
            stream.Close()
        }
    }()
}

func main() {
    stream := NewStream("test.txt")
    defer func(s *Stream) {
        if s != nil {
            s.Close()
        }
    }(stream)
}
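All of the nil-checking variants above guard against a failed construction inside the deferred closure. If the hypothetical NewStream constructor reported failure explicitly (sketched below with an error return, which is an assumption rather than the API shown above), those nil checks become unnecessary: defer Close only once you know the stream was actually created.

package main

import "fmt"

// Stream and NewStream are hypothetical, mirroring the snippets above.
type Stream struct{ name string }

func NewStream(name string) (*Stream, error) {
    // pretend to open a resource; report failure as an error, not a nil stream
    return &Stream{name: name}, nil
}

func (s *Stream) Close() error {
    fmt.Println("closing", s.name)
    return nil
}

func main() {
    stream, err := NewStream("test.txt")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer stream.Close()
}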


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

