How to use the stopTimer method of the runner package

Best Gauge code snippet using runner.stopTimer

bench_test.go

Source:bench_test.go Github

copy

Full Screen

1package go_optimizations2import (3 "context"4 "fmt"5 "runtime"6 "sync"7 "sync/atomic"8 "testing"9 "time"10 "unsafe"11 customFmt "go-optimizations/fmt"12 pool "github.com/delivery-club/bees"13)14//Как запускать бенчмарки:15// go test --bench=BenchmarkNewObject$ --benchmem -v --count=1016// go test --bench=. --benchmem -v --count=1017// count - bench run count18// benctime - count of iterations b.N19// benchmem - report allocs20const (21 benchCount = 100000022 poolSize = 50000023)24const (25 extraSmallArraySize = 64 << (1 * iota)26 smallArraySize27 averageArraySize28 mediumArraySize29 bigArraySize30 hugeArraySize31)32const (33 _ = 1 << (10 * iota)34 _35 MiB36)37type (38 extraSmallStruct struct {39 h uint6440 cache [extraSmallArraySize]byte41 body []byte42 }43 smallStruct struct {44 h uint6445 cache [smallArraySize]byte46 body []byte47 }48 averageStruct struct {49 h uint6450 cache [averageArraySize]byte51 body []byte52 }53 mediumStruct struct {54 h uint6455 cache [mediumArraySize]byte56 body []byte57 }58 bigStruct struct {59 h uint6460 cache [bigArraySize]byte61 body []byte62 }63 hugeStruct struct {64 h uint6465 cache [hugeArraySize]byte66 body []byte67 }68)69var hugeStructPool = sync.Pool{New: func() interface{} {70 return &hugeStruct{body: make([]byte, 0, mediumArraySize)}71}}72func BenchmarkRangeValueCopy(b *testing.B) {73 var sum uint64 = 074 var hugeSlice = make([]hugeStruct, benchCount)75 b.Run("range_value_copy", func(b *testing.B) {76 for i := 0; i < b.N; i++ {77 for _, hs := range hugeSlice {78 sum += hs.h79 }80 }81 })82 b.Run("range_value_index", func(b *testing.B) {83 for i := 0; i < b.N; i++ {84 for ii := range hugeSlice {85 sum += hugeSlice[ii].h86 }87 }88 })89 b.Run("range_value_pointer_and_index", func(b *testing.B) {90 for i := 0; i < b.N; i++ {91 for ii := range hugeSlice {92 sum += (&hugeSlice[ii]).h93 }94 }95 })96 b.Run("range_value_pointer_and_index_split", func(b *testing.B) {97 for i := 0; i < b.N; i++ {98 for ii := range hugeSlice {99 h := 
&hugeSlice[ii]100 sum += h.h101 }102 }103 })104 b.Logf("sum: %d", sum)105 stats := checkMem()106 b.Logf("memory usage:%d MB", stats.TotalAlloc/MiB)107 b.Logf("GC cycles: %d", stats.NumGC)108}109func BenchmarkRangeArrayValue(b *testing.B) {110 b.StopTimer()111 var sum uint64 = 0112 var hugeArray = [hugeArraySize]hugeStruct{}113 b.StartTimer()114 b.Run("range_array", func(b *testing.B) {115 for i := 0; i < b.N; i++ {116 for _, v := range hugeArray {117 sum += v.h118 }119 }120 })121 b.Run("range_array_with_pointer", func(b *testing.B) {122 for i := 0; i < b.N; i++ {123 for _, v := range &hugeArray {124 sum += v.h125 }126 }127 })128 _ = sum129}130func BenchmarkMakeIncorrectUsage(b *testing.B) {131 b.Run("benchmark_make_incorrect_usage", func(b *testing.B) {132 var t = make([][extraSmallArraySize]byte, 0, 10)133 for ii := 0; ii < b.N; ii++ {134 t = append(t, [extraSmallArraySize]byte{})135 }136 })137}138func BenchmarkMakeCorrectUsage(b *testing.B) {139 b.Run("benchmark_make_correct_usage", func(b *testing.B) {140 var t = make([][extraSmallArraySize]byte, 0, b.N)141 for ii := 0; ii < b.N; ii++ {142 t = append(t, [extraSmallArraySize]byte{})143 }144 })145}146var tCopy hugeStruct147func BenchmarkHugeParamByCopy(b *testing.B) {148 b.StopTimer()149 tCopy = hugeStruct{150 h: 0,151 cache: [2048]byte{},152 }153 b.StartTimer()154 b.Run("benchmark_huge_param_by_copy", func(b *testing.B) {155 for ii := 0; ii < b.N; ii++ {156 tCopy = dummyCopy(tCopy)157 }158 })159}160func dummyCopy(h hugeStruct) hugeStruct {161 for i := 0; i < 10; i++ {162 h.h = uint64(i)163 }164 return h165}166var tPointer *hugeStruct167func BenchmarkHugeParamByPointer(b *testing.B) {168 b.StopTimer()169 tPointer = &hugeStruct{170 h: 0,171 cache: [2048]byte{},172 }173 b.StartTimer()174 b.Run("benchmark_huge_param_by_pointer", func(b *testing.B) {175 for ii := 0; ii < b.N; ii++ {176 tPointer = dummyPointer(tPointer)177 }178 })179}180func dummyPointer(h *hugeStruct) *hugeStruct {181 for i := 0; i < 10; i++ {182 h.h 
= uint64(i)183 h.body = append(h.body, 'f')184 }185 return h186}187func BenchmarkNewObject(b *testing.B) {188 b.StopTimer()189 var (190 wg sync.WaitGroup191 h *hugeStruct192 )193 b.StartTimer()194 b.Run("new_object", func(b *testing.B) {195 wg.Add(b.N)196 for ii := 0; ii < b.N; ii++ {197 go func() {198 h = &hugeStruct{body: make([]byte, 0, mediumArraySize)}199 h = dummyPointer(h)200 wg.Done()201 }()202 }203 wg.Wait()204 })205 stats := checkMem()206 b.Logf("memory usage:%d MB", stats.TotalAlloc/MiB)207 b.Logf("GC cycles: %d", stats.NumGC)208 b.Logf("counter: %d", h.h)209}210func get() *hugeStruct {211 return hugeStructPool.Get().(*hugeStruct)212}213func put(h *hugeStruct) {214 h.h = 0215 h.body = h.body[:0]216 for i := range &h.cache {217 h.cache[i] = 0218 }219 hugeStructPool.Put(h)220}221func BenchmarkNewObjectWithSyncPool(b *testing.B) {222 b.StopTimer()223 hugeStructPool = sync.Pool{New: func() interface{} {224 return &hugeStruct{body: make([]byte, 0, mediumArraySize)}225 }}226 var h *hugeStruct227 b.StartTimer()228 b.Run("new_object_with_sync_pool", func(b *testing.B) {229 var wg sync.WaitGroup230 wg.Add(b.N)231 for ii := 0; ii < b.N; ii++ {232 go func() {233 h = get()234 h = dummyPointer(h)235 wg.Done()236 put(h)237 }()238 }239 wg.Wait()240 })241 stats := checkMem()242 b.Logf("memory usage:%d MB", stats.TotalAlloc/MiB)243 b.Logf("GC cycles: %d", stats.NumGC)244 b.Logf("counter: %d", h.h)245}246var mu sync.Mutex247func dummyProcess(rand int64) int64 {248 var sum int64249 for i := int64(0); i < 1000; i++ {250 sum += i251 }252 mu.Lock()253 sum += rand254 mu.Unlock()255 return sum256}257func BenchmarkGoroutinesRaw(b *testing.B) {258 b.StopTimer()259 var (260 wg sync.WaitGroup261 counter int64262 process = func(num int64) {263 atomic.AddInt64(&counter, dummyProcess(num))264 wg.Done()265 }266 )267 b.StartTimer()268 const name = "raw_goroutines"269 procFunc := func(j int64) {270 wg.Add(1)271 go process(j)272 }273 for ng := 1; ng < 16; ng++ {274 runnerParallel(b, name, 
ng, procFunc, &wg)275 }276 for ng := 16; ng < 128; ng += 8 {277 runnerParallel(b, name, ng, procFunc, &wg)278 }279 for ng := 128; ng < 512; ng += 16 {280 runnerParallel(b, name, ng, procFunc, &wg)281 }282 for ng := 512; ng < 1024; ng += 32 {283 runnerParallel(b, name, ng, procFunc, &wg)284 }285 for ng := 1024; ng < 2048; ng += 64 {286 runnerParallel(b, name, ng, procFunc, &wg)287 }288 for ng := 2048; ng < 4096; ng += 128 {289 runnerParallel(b, name, ng, procFunc, &wg)290 }291 for ng := 4096; ng < 16384; ng += 512 {292 runnerParallel(b, name, ng, procFunc, &wg)293 }294 for ng := 16384; ng < 65536; ng += 2048 {295 runnerParallel(b, name, ng, procFunc, &wg)296 }297 for ng := 65536; ng < 131072; ng += 4096 {298 runnerParallel(b, name, ng, procFunc, &wg)299 }300 for ng := 131072; ng < 262144; ng += 8192 {301 runnerParallel(b, name, ng, procFunc, &wg)302 }303 for ng := 262144; ng < 524288; ng += 16384 {304 runnerParallel(b, name, ng, procFunc, &wg)305 }306 for ng := 524288; ng < 1048576; ng += 32768 {307 runnerParallel(b, name, ng, procFunc, &wg)308 }309 for ng := 1048576; ng < 2097152; ng += 65536 {310 runnerParallel(b, name, ng, procFunc, &wg)311 }312 b.StopTimer()313 stats := checkMem()314 b.Logf("memory usage:%d MB", stats.TotalAlloc/MiB)315 b.Logf("GC cycles: %d", stats.NumGC)316 b.Logf("%d", counter)317}318func BenchmarkGoroutinesRawNotOptimized(b *testing.B) {319 b.StopTimer()320 var (321 wg sync.WaitGroup322 counter int64323 )324 b.StartTimer()325 b.Run("raw_goroutines_not_optimized", func(b *testing.B) {326 wg.Add(b.N)327 for j := 0; j < b.N; j++ {328 go func(num int64) {329 atomic.AddInt64(&counter, dummyProcess(num))330 wg.Done()331 }(int64(j))332 }333 wg.Wait()334 })335 stats := checkMem()336 b.Logf("memory usage:%d MB", stats.TotalAlloc/MiB)337 b.Logf("GC cycles: %d", stats.NumGC)338 b.Logf("%d", counter)339}340func BenchmarkGoroutinesSemaphore(b *testing.B) {341 b.StopTimer()342 var (343 wg sync.WaitGroup344 counter int64345 sema = make(chan struct{}, 
poolSize)346 process = func(num int64) {347 atomic.AddInt64(&counter, dummyProcess(num))348 <-sema349 wg.Done()350 }351 )352 procFunc := func(j int64) {353 sema <- struct{}{}354 wg.Add(1)355 go process(j)356 }357 const name = "semaphore"358 b.StartTimer()359 for ng := 1; ng < 16; ng++ {360 runnerParallel(b, name, ng, procFunc, &wg)361 }362 for ng := 16; ng < 128; ng += 8 {363 runnerParallel(b, name, ng, procFunc, &wg)364 }365 for ng := 128; ng < 512; ng += 16 {366 runnerParallel(b, name, ng, procFunc, &wg)367 }368 for ng := 512; ng < 1024; ng += 32 {369 runnerParallel(b, name, ng, procFunc, &wg)370 }371 for ng := 1024; ng < 2048; ng += 64 {372 runnerParallel(b, name, ng, procFunc, &wg)373 }374 for ng := 2048; ng < 4096; ng += 128 {375 runnerParallel(b, name, ng, procFunc, &wg)376 }377 for ng := 4096; ng < 16384; ng += 512 {378 runnerParallel(b, name, ng, procFunc, &wg)379 }380 for ng := 16384; ng < 65536; ng += 2048 {381 runnerParallel(b, name, ng, procFunc, &wg)382 }383 for ng := 65536; ng < 131072; ng += 4096 {384 runnerParallel(b, name, ng, procFunc, &wg)385 }386 for ng := 131072; ng < 262144; ng += 8192 {387 runnerParallel(b, name, ng, procFunc, &wg)388 }389 for ng := 262144; ng < 524288; ng += 16384 {390 runnerParallel(b, name, ng, procFunc, &wg)391 }392 for ng := 524288; ng < 1048576; ng += 32768 {393 runnerParallel(b, name, ng, procFunc, &wg)394 }395 for ng := 1048576; ng < 2097152; ng += 65536 {396 runnerParallel(b, name, ng, procFunc, &wg)397 }398 stats := checkMem()399 b.Logf("memory usage:%d MB", stats.TotalAlloc/MiB)400 b.Logf("GC cycles: %d", stats.NumGC)401 b.Logf("%d", counter)402}403func BenchmarkGoroutinesReusable(b *testing.B) {404 b.StopTimer()405 var (406 wg sync.WaitGroup407 counter int64408 )409 const name = "reusable_goroutines"410 p := pool.Create(context.Background(), func(ctx context.Context, task interface{}) {411 atomic.AddInt64(&counter, dummyProcess(task.(int64)))412 wg.Done()413 }, pool.WithCapacity(poolSize), 
pool.WithKeepAlive(5*time.Second))414 defer func() {415 p.Close()416 }()417 b.StartTimer()418 procFunc := func(j int64) {419 wg.Add(1)420 p.Submit(j)421 }422 for ng := 1; ng < 16; ng++ {423 runnerParallel(b, name, ng, procFunc, &wg)424 }425 for ng := 16; ng < 128; ng += 8 {426 runnerParallel(b, name, ng, procFunc, &wg)427 }428 for ng := 128; ng < 512; ng += 16 {429 runnerParallel(b, name, ng, procFunc, &wg)430 }431 for ng := 512; ng < 1024; ng += 32 {432 runnerParallel(b, name, ng, procFunc, &wg)433 }434 for ng := 1024; ng < 2048; ng += 64 {435 runnerParallel(b, name, ng, procFunc, &wg)436 }437 for ng := 2048; ng < 4096; ng += 128 {438 runnerParallel(b, name, ng, procFunc, &wg)439 }440 for ng := 4096; ng < 16384; ng += 512 {441 runnerParallel(b, name, ng, procFunc, &wg)442 }443 for ng := 16384; ng < 65536; ng += 2048 {444 runnerParallel(b, name, ng, procFunc, &wg)445 }446 for ng := 65536; ng < 131072; ng += 4096 {447 runnerParallel(b, name, ng, procFunc, &wg)448 }449 for ng := 131072; ng < 262144; ng += 8192 {450 runnerParallel(b, name, ng, procFunc, &wg)451 }452 for ng := 262144; ng < 524288; ng += 16384 {453 runnerParallel(b, name, ng, procFunc, &wg)454 }455 for ng := 524288; ng < 1048576; ng += 32768 {456 runnerParallel(b, name, ng, procFunc, &wg)457 }458 for ng := 1048576; ng < 2097152; ng += 65536 {459 runnerParallel(b, name, ng, procFunc, &wg)460 }461 b.StopTimer()462 stats := checkMem()463 b.Logf("memory usage:%d MB", stats.TotalAlloc/MiB)464 b.Logf("GC cycles: %d", stats.NumGC)465 b.Logf("%d", counter)466}467func BenchmarkGC(b *testing.B) {468 b.StopTimer()469 var wg sync.WaitGroup470 b.StartTimer()471 b.Run("gc_without_ballast", func(b *testing.B) {472 for i := 0; i < b.N; i++ {473 wg.Add(1)474 go func() {475 defer wg.Done()476 dummyApplication(benchCount / 2)477 time.Sleep(time.Nanosecond)478 }()479 dummyApplication(benchCount)480 time.Sleep(time.Nanosecond)481 wg.Wait()482 }483 })484 b.StopTimer()485 stats := checkMem()486 b.Logf("memory usage: %d MB", 
stats.TotalAlloc/MiB)487 b.Logf("GC cycles: %d", stats.NumGC)488}489//bench_test.go:329: memory usage: 59522 MB490//bench_test.go:330: GC cycles: 17838491func BenchmarkGCWithBallast(b *testing.B) {492 b.StopTimer()493 var (494 ballast = make([]byte, 10<<30)495 wg sync.WaitGroup496 )497 b.StartTimer()498 b.Run("gc_with_ballast", func(b *testing.B) {499 for i := 0; i < b.N; i++ {500 wg.Add(1)501 go func() {502 defer wg.Done()503 dummyApplication(benchCount / 2)504 time.Sleep(time.Nanosecond)505 }()506 dummyApplication(benchCount)507 time.Sleep(time.Nanosecond)508 wg.Wait()509 }510 })511 b.StopTimer()512 func(_ []byte) {513 for {514 break515 }516 }(ballast)517 _ = ballast518 stats := checkMem()519 b.Logf("memory usage: %d MB", stats.TotalAlloc/MiB)520 b.Logf("GC cycles: %d", stats.NumGC)521 b.Logf("%d", ballast[2])522}523//bench_test.go:353: memory usage: 69813 MB524//bench_test.go:354: GC cycles: 17017525func dummyApplication(count int) {526 var wg sync.WaitGroup527 wg.Add(count)528 for ii := 0; ii < count; ii++ {529 go func() {530 h := &hugeStruct{body: make([]byte, 0, mediumArraySize)}531 h = dummyPointer(h)532 wg.Done()533 }()534 }535 wg.Wait()536}537func BenchmarkInterfaceUsage(b *testing.B) {538 b.StopTimer()539 var foo string540 b.StartTimer()541 b.Run("fmt_sprint", func(b *testing.B) {542 for i := 0; i < b.N; i++ {543 foo = fmt.Sprint("foo", "bar")544 }545 })546 b.Run("fmt_sprintf", func(b *testing.B) {547 for i := 0; i < b.N; i++ {548 foo = fmt.Sprintf("foo %s", "bar")549 }550 })551 // TODO why slower than fmt.Sprint552 b.Run("fmt_sprint_string", func(b *testing.B) {553 for i := 0; i < b.N; i++ {554 foo = customFmt.SprintString("foo", "bar")555 }556 })557 b.Run("concatenation", func(b *testing.B) {558 for i := 0; i < b.N; i++ {559 foo = "foo" + "bar"560 }561 })562 foo = "bar"563 _ = foo564}565func BenchmarkStructSizes(b *testing.B) {566 type struct1 struct {567 counter int8568 secondCounter int8569 k chan string570 }571 type struct2 struct {572 counter 
int8573 k chan string574 secondCounter int8575 }576 b.Logf("%d", unsafe.Sizeof(struct1{}))577 b.Logf("%d", unsafe.Sizeof(struct2{}))578 b.Logf("%d", unsafe.Sizeof(extraSmallStruct{}))579 b.Logf("%d", unsafe.Sizeof(smallStruct{}))580 b.Logf("%d", unsafe.Sizeof(averageStruct{}))581 b.Logf("%d", unsafe.Sizeof(mediumStruct{}))582 b.Logf("%d", unsafe.Sizeof(bigStruct{}))583 b.Logf("%d", unsafe.Sizeof(hugeStruct{}))584}585type state struct {586 c int64587}588func BenchmarkAtomicBased(b *testing.B) {589 var counter = &state{}590 const name = "atomic_based_counter"591 atomicCounter := func(i int64) int64 {592 var taken bool593 var newCounter int64594 for !taken {595 oldCounter := atomic.LoadInt64(&counter.c)596 newCounter = i * oldCounter597 taken = atomic.CompareAndSwapInt64(&counter.c, oldCounter, newCounter)598 }599 return newCounter600 }601 for ng := 1; ng < 16; ng++ {602 runner(b, name, ng, atomicCounter)603 }604 for ng := 16; ng < 128; ng += 8 {605 runner(b, name, ng, atomicCounter)606 }607 for ng := 128; ng < 512; ng += 16 {608 runner(b, name, ng, atomicCounter)609 }610 for ng := 512; ng < 1024; ng += 32 {611 runner(b, name, ng, atomicCounter)612 }613 for ng := 1024; ng < 2048; ng += 64 {614 runner(b, name, ng, atomicCounter)615 }616 for ng := 2048; ng < 4096; ng += 128 {617 runner(b, name, ng, atomicCounter)618 }619 for ng := 4096; ng < 16384; ng += 512 {620 runner(b, name, ng, atomicCounter)621 }622 for ng := 16384; ng < 65536; ng += 2048 {623 runner(b, name, ng, atomicCounter)624 }625}626func BenchmarkMutexBased(b *testing.B) {627 var (628 counter = &state{}629 mu sync.Mutex630 )631 const name = "mutex_based_counter"632 mutexCounter := func(i int64) int64 {633 mu.Lock()634 newCounter := i * counter.c635 counter.c = newCounter636 mu.Unlock()637 return newCounter638 }639 for ng := 1; ng < 16; ng++ {640 runner(b, name, ng, mutexCounter)641 }642 for ng := 16; ng < 128; ng += 8 {643 runner(b, name, ng, mutexCounter)644 }645 for ng := 128; ng < 512; ng += 16 {646 
runner(b, name, ng, mutexCounter)647 }648 for ng := 512; ng < 1024; ng += 32 {649 runner(b, name, ng, mutexCounter)650 }651 for ng := 1024; ng < 2048; ng += 64 {652 runner(b, name, ng, mutexCounter)653 }654 for ng := 2048; ng < 4096; ng += 128 {655 runner(b, name, ng, mutexCounter)656 }657 for ng := 4096; ng < 16384; ng += 512 {658 runner(b, name, ng, mutexCounter)659 }660 for ng := 16384; ng < 65536; ng += 2048 {661 runner(b, name, ng, mutexCounter)662 }663}664// runner - run batched func for multiply goroutines665func runner(b *testing.B, name string, ng int, procFunc func(i int64) int64) bool {666 return b.Run(fmt.Sprintf("type:%s-goroutines:%d", name, ng), func(b *testing.B) {667 var wg sync.WaitGroup668 var trigger int64 = 0669 n := b.N670 // if we will get batchSize = 1000 and n = 100k671 // we will start 1000 goroutines, each of which will execute 100 operations672 batchSize := n / ng // 100000 / 1000 = 100673 if batchSize == 0 {674 batchSize = n675 }676 for n > 0 {677 wg.Add(1)678 funcCallPerGoroutine := min(n, batchSize) // 100679 n -= funcCallPerGoroutine // 99900680 go func(quota int) {681 for atomic.LoadInt64(&trigger) == 0 {682 runtime.Gosched()683 }684 for i := 0; i < quota; i++ {685 procFunc(int64(i))686 }687 wg.Done()688 }(funcCallPerGoroutine)689 }690 b.StartTimer()691 atomic.StoreInt64(&trigger, 1)692 wg.Wait()693 b.StopTimer()694 })695}696// runnerParallel - run batched goroutines697func runnerParallel(b *testing.B, name string, ng int, funcWithGo func(i int64), procWg *sync.WaitGroup) bool {698 return b.Run(fmt.Sprintf("type:%s-goroutines:%d", name, ng), func(b *testing.B) {699 var wg sync.WaitGroup700 var trigger int64 = 0701 n := b.N702 // if we will get batchSize = 1000 and n = 100k703 // we will start 1000 goroutines, each of which will start 100 goroutines704 batchSize := n / ng // 100000 / 1000 = 100705 if batchSize == 0 {706 batchSize = n707 }708 for n > 0 {709 wg.Add(1)710 goCallsPerGoroutine := min(n, batchSize) // 100711 n -= 
goCallsPerGoroutine // 99900712 go func(quota int) {713 for atomic.LoadInt64(&trigger) == 0 {714 runtime.Gosched()715 }716 for i := 0; i < quota; i++ {717 funcWithGo(int64(i))718 }719 wg.Done()720 }(goCallsPerGoroutine)721 }722 b.StartTimer()723 atomic.StoreInt64(&trigger, 1)724 wg.Wait()725 procWg.Wait()726 b.StopTimer()727 })728}729func min(a, b int) int {730 if a < b {731 return a732 }733 return b734}735func checkMem() *runtime.MemStats {736 mem := &runtime.MemStats{}737 runtime.ReadMemStats(mem)738 return mem739}...

Full Screen

Full Screen

manager.go

Source:manager.go Github

copy

Full Screen

1package manifest2import (3 "context"4 "time"5 "github.com/pkg/errors"6 "github.com/tendermint/tendermint/libs/log"7 lifecycle "github.com/boz/go-lifecycle"8 "github.com/ovrclk/akash/manifest"9 "github.com/ovrclk/akash/provider/event"10 "github.com/ovrclk/akash/provider/session"11 "github.com/ovrclk/akash/pubsub"12 "github.com/ovrclk/akash/util/runner"13 "github.com/ovrclk/akash/validation"14 dquery "github.com/ovrclk/akash/x/deployment/query"15 dtypes "github.com/ovrclk/akash/x/deployment/types"16 mtypes "github.com/ovrclk/akash/x/market/types"17)18var (19 ErrShutdownTimerExpired = errors.New("shutdown timer expired")20)21func newManager(h *service, daddr dtypes.DeploymentID) (*manager, error) {22 session := h.session.ForModule("manifest-manager")23 sub, err := h.sub.Clone()24 if err != nil {25 return nil, err26 }27 m := &manager{28 config: h.config,29 daddr: daddr,30 session: session,31 bus: h.bus,32 sub: sub,33 leasech: make(chan event.LeaseWon),34 rmleasech: make(chan mtypes.LeaseID),35 manifestch: make(chan manifestRequest),36 updatech: make(chan []byte),37 log: session.Log().With("deployment", daddr),38 lc: lifecycle.New(),39 }40 go m.lc.WatchChannel(h.lc.ShuttingDown())41 go m.run(h.managerch)42 return m, nil43}44type manager struct {45 config config46 daddr dtypes.DeploymentID47 session session.Session48 bus pubsub.Bus49 sub pubsub.Subscriber50 leasech chan event.LeaseWon51 rmleasech chan mtypes.LeaseID52 manifestch chan manifestRequest53 updatech chan []byte54 data *dquery.Deployment55 requests []manifestRequest56 leases []event.LeaseWon57 manifests []*manifest.Manifest58 versions [][]byte59 stoptimer *time.Timer60 log log.Logger61 lc lifecycle.Lifecycle62}63func (m *manager) stop() {64 m.lc.ShutdownAsync(nil)65}66func (m *manager) handleLease(ev event.LeaseWon) {67 select {68 case m.leasech <- ev:69 case <-m.lc.ShuttingDown():70 m.log.Error("not running: handle manifest", "lease", ev.LeaseID)71 }72}73func (m *manager) removeLease(id mtypes.LeaseID) {74 
select {75 case m.rmleasech <- id:76 case <-m.lc.ShuttingDown():77 m.log.Error("not running: remove lease", "lease", id)78 }79}80func (m *manager) handleManifest(req manifestRequest) {81 select {82 case m.manifestch <- req:83 case <-m.lc.ShuttingDown():84 m.log.Error("not running: handle manifest")85 req.ch <- ErrNotRunning86 }87}88func (m *manager) handleUpdate(version []byte) {89 select {90 case m.updatech <- version:91 case <-m.lc.ShuttingDown():92 m.log.Error("not running: version update", "version", version)93 }94}95func (m *manager) run(donech chan<- *manager) {96 defer m.lc.ShutdownCompleted()97 defer func() { donech <- m }()98 var runch <-chan runner.Result99 ctx, cancel := context.WithCancel(context.Background())100loop:101 for {102 var stopch <-chan time.Time103 if m.stoptimer != nil {104 stopch = m.stoptimer.C105 }106 select {107 case err := <-m.lc.ShutdownRequest():108 m.lc.ShutdownInitiated(err)109 break loop110 case <-stopch:111 m.log.Error(ErrShutdownTimerExpired.Error())112 m.lc.ShutdownInitiated(ErrShutdownTimerExpired)113 break loop114 case ev := <-m.leasech:115 m.log.Info("new lease", "lease", ev.LeaseID)116 m.leases = append(m.leases, ev)117 m.emitReceivedEvents()118 m.maybeScheduleStop()119 runch = m.maybeFetchData(ctx, runch)120 case id := <-m.rmleasech:121 m.log.Info("lease removed", "lease", id)122 for idx, lease := range m.leases {123 if id.Equals(lease.LeaseID) {124 m.leases = append(m.leases[:idx], m.leases[idx+1:]...)125 }126 }127 m.maybeScheduleStop()128 case req := <-m.manifestch:129 m.log.Info("manifest received")130 // TODO: fail fast if invalid request to prevent DoS131 m.requests = append(m.requests, req)132 m.validateRequests()133 m.emitReceivedEvents()134 m.maybeScheduleStop()135 runch = m.maybeFetchData(ctx, runch)136 case version := <-m.updatech:137 m.log.Info("received version", "version", version)138 m.versions = append(m.versions, version)139 if m.data != nil {140 m.data.Version = version141 }142 case result := <-runch:143 
runch = nil144 if err := result.Error(); err != nil {145 m.log.Error("error fetching data", "err", err)146 break147 }148 m.data = result.Value().(*dquery.Deployment)149 m.log.Info("data received", "version", m.data.Version)150 m.validateRequests()151 m.emitReceivedEvents()152 m.maybeScheduleStop()153 }154 }155 cancel()156 for _, req := range m.requests {157 req.ch <- ErrNotRunning158 }159 if m.stoptimer != nil {160 if m.stoptimer.Stop() {161 <-m.stoptimer.C162 }163 }164 if runch != nil {165 <-runch166 }167}168func (m *manager) maybeFetchData(ctx context.Context, runch <-chan runner.Result) <-chan runner.Result {169 if m.data == nil && runch == nil {170 return m.fetchData(ctx)171 }172 return runch173}174func (m *manager) fetchData(ctx context.Context) <-chan runner.Result {175 return runner.Do(func() runner.Result {176 // TODO: retry177 return runner.NewResult(m.doFetchData(ctx))178 })179}180func (m *manager) doFetchData(_ context.Context) (*dquery.Deployment, error) {181 deployment, err := m.session.Client().Query().Deployment(m.daddr)182 if err != nil {183 return nil, err184 }185 return &deployment, nil186}187func (m *manager) maybeScheduleStop() bool { // nolint:golint,unparam188 if len(m.leases) > 0 || len(m.manifests) > 0 {189 if m.stoptimer != nil {190 m.log.Info("stopping stop timer")191 if m.stoptimer.Stop() {192 <-m.stoptimer.C193 }194 m.stoptimer = nil195 }196 return false197 }198 if m.stoptimer != nil {199 m.log.Info("starting stop timer", "duration", m.config.ManifestLingerDuration)200 m.stoptimer = time.NewTimer(m.config.ManifestLingerDuration)201 }202 return true203}204func (m *manager) emitReceivedEvents() {205 if m.data == nil || len(m.leases) == 0 || len(m.manifests) == 0 {206 return207 }208 manifest := m.manifests[len(m.manifests)-1]209 m.log.Debug("publishing manifest received", "num-leases", len(m.leases))210 for _, lease := range m.leases {211 if err := m.bus.Publish(event.ManifestReceived{212 LeaseID: lease.LeaseID,213 Group: lease.Group,214 
Manifest: manifest,215 Deployment: m.data,216 }); err != nil {217 m.log.Error("publishing event", "err", err, "lease", lease.LeaseID)218 }219 }220}221func (m *manager) validateRequests() {222 if m.data == nil || len(m.requests) == 0 {223 return224 }225 manifests := make([]*manifest.Manifest, 0)226 for _, req := range m.requests {227 if err := m.validateRequest(req); err != nil {228 m.log.Error("invalid manifest", "err", err)229 req.ch <- err230 continue231 }232 manifests = append(manifests, &req.value.Manifest)233 req.ch <- nil234 }235 m.requests = nil236 m.log.Debug("requests valid", "num-requests", len(manifests))237 if len(manifests) > 0 {238 // XXX: only one version means only one valid manifest239 m.manifests = append(m.manifests, manifests[0])240 }241}242func (m *manager) validateRequest(req manifestRequest) error {243 // TODO: hash(manifest) == m.data.Version244 if err := validation.ValidateManifestWithDeployment(&req.value.Manifest, m.data.Groups); err != nil {245 return err246 }247 return nil248}...

Full Screen

Full Screen

test_bench_test.go

Source:test_bench_test.go Github

copy

Full Screen

1package servicetest2import (3 "sync"4 "testing"5 "time"6 service "github.com/shabbyrobe/go-service"7)8func BenchmarkRunnerStart1(b *testing.B) {9 benchmarkRunnerStartN(b, 1)10}11func BenchmarkRunnerStartWait1(b *testing.B) {12 benchmarkRunnerStartWaitN(b, 1)13}14func BenchmarkGoroutineStart1(b *testing.B) {15 benchmarkGoroutineStartN(b, 1)16}17func BenchmarkRunnerStart10(b *testing.B) {18 benchmarkRunnerStartN(b, 10)19}20func BenchmarkRunnerStartWait10(b *testing.B) {21 benchmarkRunnerStartWaitN(b, 10)22}23func BenchmarkGoroutineStart10(b *testing.B) {24 benchmarkGoroutineStartN(b, 10)25}26func benchmarkRunnerStartN(b *testing.B, n int) {27 r := service.NewRunner()28 svcs := make([]*service.Service, n)29 for i := 0; i < n; i++ {30 svcs[i] = &service.Service{31 Runnable: (&BlockingService{}).Init(),32 }33 }34 b.ResetTimer()35 for i := 0; i < b.N; i++ {36 b.StartTimer()37 if err := r.Start(nil, svcs...); err != nil {38 panic(err)39 }40 b.StopTimer()41 if err := r.Halt(nil, svcs...); err != nil {42 panic(err)43 }44 }45}46func benchmarkRunnerStartWaitN(b *testing.B, n int) {47 r := service.NewRunner()48 svcs := make([]*service.Service, n)49 for i := 0; i < n; i++ {50 svcs[i] = &service.Service{51 Runnable: (&BlockingService{}).Init(),52 }53 }54 b.ResetTimer()55 for i := 0; i < b.N; i++ {56 b.StartTimer()57 if err := service.StartTimeout(1*time.Second, r, svcs...); err != nil {58 panic(err)59 }60 b.StopTimer()61 for j := 0; j < n; j++ {62 if err := service.HaltTimeout(1*time.Second, r, svcs...); err != nil {63 panic(err)64 }65 }66 }67}68func benchmarkGoroutineStartN(b *testing.B, n int) {69 b.StopTimer()70 b.ResetTimer()71 var wg sync.WaitGroup72 for i := 0; i < b.N; i++ {73 stop := make(chan struct{})74 wg.Add(n)75 b.StartTimer()76 for i := 0; i < n; i++ {77 go func() {78 <-stop79 wg.Done()80 }()81 }82 b.StopTimer()83 close(stop)84 wg.Wait()85 }86}...

Full Screen

Full Screen

stopTimer

Using AI Code Generation

copy

Full Screen

// NOTE(review): AI-generated example of a task Runner (timeout/interrupt
// pattern). The page's extraction dropped interior lines — the import lists,
// all `case` labels inside the `select`/`switch` statements, and the Runner
// struct's interrupt/complete/timeout fields that New() initializes — so this
// snippet does not compile as shown. Kept verbatim; do not copy-paste.
1import (2func main() {3 r := runner.New(3 * time.Second)4 r.Add(createTask(), createTask(), createTask())5 if err := r.Start(); err != nil {6 switch err {7 fmt.Println("Terminating due to timeout.")8 os.Exit(1)9 fmt.Println("Terminating due to interrupt.")10 os.Exit(2)11 }12 }13 fmt.Println("Process ended.")14}15import (16var ErrTimeout = errors.New("received timeout")17var ErrInterrupt = errors.New("received interrupt")18type Runner struct {19 tasks []func(int)20}21func New(d time.Duration) *Runner {22 return &Runner{23 interrupt: make(chan os.Signal, 1),24 complete: make(chan error),25 timeout: time.After(d),26 }27}28func (r *Runner) Add(tasks ...func(int)) {29 r.tasks = append(r.tasks, tasks...)30}31func (r *Runner) Start() error {32 signal.Notify(r.interrupt, os.Interrupt)33 go func() {34 r.complete <- r.run()35 }()36 select {37 }38}39func (r *Runner) run() error {40 for id, task := range r.tasks {41 if r.gotInterrupt() {42 }43 task(id)44 }45}46func (r *Runner) gotInterrupt() bool {47 select {48 signal.Stop(r.interrupt)49 }50}51import (52func main() {53 r := New(timeout)54 r.Add(createTask(), createTask(), createTask())55 if err := r.Start(); err != nil {56 switch err {57 fmt.Println("Terminating due to timeout

Full Screen

Full Screen

stopTimer

Using AI Code Generation

copy

Full Screen

// NOTE(review): AI-generated time.NewTimer / Timer.Stop examples. Extraction
// dropped the import lists and the `<-timer.C` channel receives that must
// precede each "expired" print, so these snippets do not compile or behave
// as the prints suggest. Kept verbatim; do not copy-paste.
1import (2func main() {3 timer := time.NewTimer(2 * time.Second)4 fmt.Println("Timer 1 expired")5 timer1 := time.NewTimer(time.Second)6 fmt.Println("Timer 1 expired")7 stop2 := timer1.Stop()8 if stop2 {9 fmt.Println("Timer 1 stopped")10 }11}12import (13func main() {14 timer2 := time.NewTimer(time.Second * 2)15 timer2.Reset(0)16 fmt.Println("Timer 2 expired")17}18import (

Full Screen

Full Screen

stopTimer

Using AI Code Generation

copy

Full Screen

// NOTE(review): AI-generated time.AfterFunc example. The import list was lost
// in extraction and StopTimer references an undeclared package-level `timer`
// variable, so this snippet does not compile as shown. The trailing "Related
// Articles" link text was fused into the code by the scraper. Kept verbatim.
1import (2func main() {3 log.Println("Starting work.")4 time.AfterFunc(5*time.Second, StopTimer)5 log.Println("Timer expired")6}7func StopTimer() {8 log.Println("Stopping timer.")9 timer.Stop()10 os.Exit(0)11}12Related Articles: GoLang | time.AfterFunc() method13GoLang | time.After() method14GoLang | time.Tick() method15GoLang | time.Ticker() method16GoLang | time.Sleep() method17GoLang | time.Now() method18GoLang | time.Date() method19GoLang | time.Parse() method20GoLang | time.ParseInLocation() method21GoLang | time.Format() method22GoLang | time.Format() method

Full Screen

Full Screen

stopTimer

Using AI Code Generation

copy

Full Screen

// NOTE(review): AI-generated Timer.Stop / Timer.Reset examples — six separate
// `func main`s concatenated by the scraper, with all import lists dropped, so
// none of them compile as shown. Also note Timer.Reset's return value does not
// mean "stopped" as these prints imply (see the time package docs). Kept
// verbatim; do not copy-paste.
1import (2func main() {3 t := time.NewTimer(2 * time.Second)4 stopped := t.Stop()5 if stopped {6 fmt.Println("Timer stopped")7 }8}9import (10func main() {11 t := time.NewTimer(2 * time.Second)12 stopped := t.Reset(1 * time.Second)13 if stopped {14 fmt.Println("Timer stopped")15 }16}17import (18func main() {19 t := time.NewTimer(2 * time.Second)20 stopped := t.Reset(3 * time.Second)21 if stopped {22 fmt.Println("Timer stopped")23 }24}25import (26func main() {27 t := time.NewTimer(2 * time.Second)28 stopped := t.Reset(1 * time.Second)29 if !stopped {30 fmt.Println("Timer not stopped")31 }32}33import (34func main() {35 t := time.NewTimer(2 * time.Second)36 stopped := t.Reset(3 * time.Second)37 if !stopped {38 fmt.Println("Timer not stopped")39 }40}41import (42func main() {

Full Screen

Full Screen

stopTimer

Using AI Code Generation

copy

Full Screen

// NOTE(review): AI-generated Runner start/stop example. Extraction dropped the
// import lists, the Runner struct's field declarations (Start assigns
// r.interrupt/r.complete/r.timeout to fields the empty struct does not have),
// and the `case` labels inside the `select`; run() also lacks its return
// statement. This snippet does not compile as shown. Kept verbatim.
1import (2func main() {3 runner := new(Runner)4 runner.Start()5 time.Sleep(3 * time.Second)6 runner.Stop()7 fmt.Println("Done!")8}9import (10type Runner struct {11}12func (r *Runner) Start() {13 r.interrupt = make(chan os.Signal, 1)14 r.complete = make(chan error)15 r.timeout = time.After(5 * time.Second)16 signal.Notify(r.interrupt, os.Interrupt)17 go func() {18 r.complete <- r.run()19 }()20 select {21 if err != nil {22 fmt.Println("Error:", err)23 }24 fmt.Println("Timeout")25 }26}27func (r *Runner) run() error {28 fmt.Println("Starting task")29 time.Sleep(4 * time.Second)30 fmt.Println("Task complete")31}32func (r *Runner) Stop() {33 signal.Stop(r.interrupt)34 close(r.interrupt)35}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, i.e., Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful