Best Testkube code snippet using workerpool.Run
workers.go
Source:workers.go
2import (3 "errors"4 "sync"5)6type RunFunc func(...interface{})7type WorkerPool struct {8 // The worker's run function9 run RunFunc10 // The channel for workers to listen for jobs11 jobs chan []interface{}12 // The channel to stop a certain number of workers13 stop chan struct{}14 // The size of this worker pool (number of workers)15 size int16 sizeMutex sync.Mutex17 // The number of busy workers in this worker pool18 busy int19 busyMutex sync.Mutex20 // The number of workers waiting to close21 closing int22 closingMutex sync.Mutex23}24// Create a new WorkerPool with an initial worker count25//26// Panics when size < 027func NewPool(size int, run RunFunc) *WorkerPool {28 if size < 0 {29 panic("size must be greater than zero")30 }31 pool := &WorkerPool{32 run: run,33 jobs: make(chan []interface{}),34 stop: make(chan struct{}),35 size: size,36 busy: 0,37 }38 // spawn workers up to the limit39 pool.createWorkers(size)40 return pool41}42// Create a new WorkerPool with an initial worker count and job buffer size43//44// The job buffer allows new jobs to be queued without blocking if45// all the workers are busy46//47// Panics when size < 048func NewBufferedPool(size, bufSize int, run RunFunc) *WorkerPool {49 if size < 0 {50 panic("size must be greater than zero")51 }52 pool := &WorkerPool{53 run: run,54 jobs: make(chan []interface{}, bufSize),55 stop: make(chan struct{}),56 size: size,57 busy: 0,58 }59 // spawn workers up to the limit60 pool.createWorkers(size)61 return pool62}63// Add a job to this WorkerPool64func (w *WorkerPool) Run(data ...interface{}) {65 w.jobs <- data66}67// Resize the WorkerPool by scaling up or down to accommodate a new size68func (w *WorkerPool) ScaleTo(newSize int) error {69 if newSize < w.size {70 return w.ScaleDown(newSize)71 }72 if newSize > w.size {73 return w.ScaleUp(newSize)74 }75 return errors.New("newSize must not be equal to the current size")76}77// Scale the WorkerPool up to a new specified size78//...
job_test.go
Source:job_test.go
...37 go job.Cancel()38 done := <-ctx.Done()39 assert.Equal(t, struct{}{}, done)40}41func TestJob_Run(t *testing.T) {42 hasRun := false43 job := &workerpool.Job{44 ActionFunc: func(j *workerpool.Job) error {45 hasRun = true46 return nil47 },48 }49 job.Init(&nullLogger{})50 job.Run()51 assert.True(t, hasRun)52 assert.Equal(t, workerpool.COMPLETED, job.Status())53}54func TestJob_Cancel(t *testing.T) {55 cancelled := false56 job := &workerpool.Job{57 ActionFunc: func(j *workerpool.Job) error {58 return nil59 },60 CancelFunc: func(j *workerpool.Job) error {61 cancelled = true62 return nil63 },64 }65 job.Init(&nullLogger{})66 ctx := job.Context()67 go job.Cancel()68 done := <-ctx.Done()69 assert.Equal(t, struct{}{}, done)70 assert.True(t, cancelled)71}72func TestJob_Status(t *testing.T) {73 job := &workerpool.Job{74 ActionFunc: func(j *workerpool.Job) error {75 return nil76 },77 }78 job.Init(&nullLogger{})79 assert.Equal(t, workerpool.PENDING, job.Status())80}81func TestJob_Error(t *testing.T) {82 job := &workerpool.Job{}83 job.Init(&nullLogger{})84 assert.Equal(t, workerpool.ErrActionNotDefined, job.Error())85}86func TestJob_ErrHandler(t *testing.T) {87 var errFromHandler error88 job := &workerpool.Job{89 ErrHandler: func(_ *workerpool.Job, err error, panic bool) {90 errFromHandler = err91 },92 }93 job.Init(&nullLogger{})94 assert.Equal(t, workerpool.ErrActionNotDefined, errFromHandler)95}96func TestJob_OnStatusChangeFunc(t *testing.T) {97 status := workerpool.PENDING98 job := &workerpool.Job{99 ActionFunc: func(j *workerpool.Job) error {100 return nil101 },102 OnStatusChangeFunc: func(j *workerpool.Job) error {103 status = j.Status()104 return nil105 },106 }107 job.Init(&nullLogger{})108 go job.Run()109 <-job.Context().Done()110 assert.Equal(t, workerpool.COMPLETED, status)111}...
worker_pool.go
Source:worker_pool.go
...3 "log"4)5// WorkerPool is a contract for Worker Pool implementation6type WorkerPool interface {7 Run()8 AddTask(task func())9}10type workerPool struct {11 maxWorker int12 queuedTaskC chan func()13}14// NewWorkerPool will create an instance of WorkerPool.15func NewWorkerPool(maxWorker int) WorkerPool {16 wp := &workerPool{17 maxWorker: maxWorker,18 queuedTaskC: make(chan func()),19 }20 return wp21}22func (wp *workerPool) Run() {23 wp.run()24}25func (wp *workerPool) AddTask(task func()) {26 wp.queuedTaskC <- task27}28func (wp *workerPool) GetTotalQueuedTask() int {29 return len(wp.queuedTaskC)30}31func (wp *workerPool) run() {32 for i := 0; i < wp.maxWorker; i++ {33 wID := i + 134 log.Printf("[WorkerPool] Worker %d has been spawned", wID)35 go func(workerID int) {36 for task := range wp.queuedTaskC {...
Run
Using AI Code Generation
1import (2func main() {3 wp := workerpool.New(2)4 for i := 0; i < 10; i++ {5 wp.Run(func() {6 fmt.Println("Hello")7 time.Sleep(time.Second)8 })9 }10 time.Sleep(time.Second * 5)11}12import (13func main() {14 wp := workerpool.New(2)15 for i := 0; i < 10; i++ {16 wp.Run(func() {17 fmt.Println("Hello")18 time.Sleep(time.Second)19 })20 }21 wp.Wait()22}23import (24func main() {25 start := time.Now()26 wp := workerpool.New(2)27 for i := 0; i < 10; i++ {28 wp.Run(func() {29 fmt.Println("Hello")30 time.Sleep(time.Second)31 })32 }33 wp.Wait()34 fmt.Println(time.Since(start))35}36import (37func main() {38 start := time.Now()39 wp := workerpool.New(2)40 for i := 0; i < 10; i++ {41 wp.Run(func() {42 fmt.Println("Hello")43 time.Sleep(time.Second)44 })45 }46 wp.Wait()47 fmt.Println(time.Since(start))
Run
Using AI Code Generation
1import (2func main() {3 wp := workerpool.New(4)4 for i := 0; i < 10; i++ {5 wp.Run(func() {6 fmt.Println("worker")7 time.Sleep(1 * time.Second)8 })9 }10 wp.Wait()11}
Run
Using AI Code Generation
1func main() {2 wp := workerpool.New(5)3 for i := 0; i < 100; i++ {4 wp.Run(func() {5 fmt.Println("Hello World")6 })7 }8 wp.Wait()9}
Run
Using AI Code Generation
1import (2func main() {3 workerPool.Run(3)4 for i := 1; i <= 10; i++ {5 workerPool.AddJob(func() {6 fmt.Println("Hello World")7 time.Sleep(2 * time.Second)8 })9 }10 workerPool.Close()11}
Run
Using AI Code Generation
1import (2func main() {3 wp := workerpool.New(5)4 for i := 0; i < 100; i++ {5 wp.Add(func() {6 time.Sleep(1 * time.Second)7 })8 }9 wp.Wait()10 fmt.Println("All
Run
Using AI Code Generation
1import (2func main() {3 work := func() {4 fmt.Println("worker")5 time.Sleep(time.Second)6 }7 pool := NewWorkerPool(2)8 pool.Run(work)
Run
Using AI Code Generation
1func main() {2 wp := workerpool.New(10)3 wp.Run(func() {4 fmt.Println("Hello World!")5 })6 wp.Wait()7}8import (9func main() {10 wp := workerpool.New(10)11 wp.Run(func() {12 fmt.Println("Hello World!")13 })14 wp.Wait()15}
Run
Using AI Code Generation
1import (2func test() {3 fmt.Println("test")4 time.Sleep(1 * time.Second)5}6func main() {7 pool := workerpool.New(2)8 for i := 0; i < 4; i++ {9 pool.AddTask(test)10 }11 pool.Wait()12}13import (14func test() {15 fmt.Println("test")16 time.Sleep(1 * time.Second)17}18func main() {19 pool := workerpool.New(2)20 for i := 0; i < 4; i++ {21 pool.AddTask(test)22 }23 pool.Wait()24 for i := 0; i < 4; i++ {25 pool.AddTask(test)26 }27 pool.Wait()28}29import (30func test() {31 fmt.Println("test")32 time.Sleep(1 * time.Second)33}34func main() {35 pool := workerpool.New(2)36 for i := 0; i < 4; i++ {37 pool.AddTask(test)38 }39 pool.Wait()40 for i := 0; i < 4; i++ {41 pool.AddTask(test)42 }43 pool.Wait()
Run
Using AI Code Generation
1import (2func main() {3 pool := workerpool.New(5)4 for i := 0; i < 10; i++ {5 pool.Run(func() {6 fmt.Println("Task", i, "started")7 time.Sleep(time.Second)8 fmt.Println("Task", i, "finished")9 })10 }11 pool.Wait()12}13import (14func main() {15 pool := workerpool.New(5)16 for i := 0; i < 10; i++ {17 pool.Run(func() {18 fmt.Println("Task", i, "started")19 time.Sleep(time.Second)20 fmt.Println("Task", i, "finished")21 })22 }23 pool.Wait()24}25import (26func main() {27 pool := workerpool.New(5)28 for i := 0; i < 10; i++ {29 pool.Run(func() {30 fmt.Println("Task", i, "started")31 time.Sleep(time.Second)32 fmt.Println("Task", i, "finished")33 })34 }35 pool.Wait()36}37import (38func main() {39 pool := workerpool.New(5)40 for i := 0; i < 10; i++ {41 pool.Run(func() {
Run
Using AI Code Generation
1import (2func main() {3 fmt.Println("Hello, playground")4 pool := workerpool.New(4)5 for i := 0; i < 1000; i++ {6 pool.Run(func() {7 time.Sleep(time.Second)8 fmt.Println(i)9 })10 }11 pool.Wait()12}
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!