How to use the newJobProcessor method of the main package

Best Syzkaller code snippet using main.newJobProcessor

job_processor.go

Source:job_processor.go Github

package concurrency

import (
	"container/list"
	"runtime"

	"github.com/pkg/errors"
)

var (
	// ErrJobProcessorStopped is the error returned by AddJob when the job processor has been stopped.
	ErrJobProcessorStopped = errors.New("job processor has been stopped")
)

// A JobProcessor allows callers to submit pieces of work that will be executed concurrently by a set of workers.
// When specifying a job, in addition to the function to be executed itself, callers can provide a function that
// specifies conflicts between the incoming job and whatever other jobs exist in the processor.
// The processor guarantees that when a job executes, no job it conflicts with is executed at the same time.
// Further, it guarantees that if one call to AddJob returns before another call to AddJob, and the two jobs
// conflict, the first job will execute first.
// This is useful when, for example, trying to ensure that jobs that touch the same data are not executed at the same
// time.
type JobProcessor interface {
	// AddJob adds a job to the processor. The execute func represents what actually gets executed.
	// Each job also comes with some arbitrary caller-supplied metadata.
	// The caller can also provide a conflictsWith function.
	// If not nil, this function is called with the caller-supplied metadata of each job
	// that is currently pending in the processor, and whichever job it returns true for
	// is marked as conflicting with this job.
	// It is generally expected that the conflictsWith function is symmetric
	// although this is not enforced.
	// Jobs are added in the order in which they were received, and if two jobs conflict,
	// the job received first will execute before the second.
	// It is safe to call AddJob simultaneously from multiple goroutines, although it is then unspecified
	// which job is marked as received first unless the caller takes additional steps to order the calls.
	// AddJob is non-blocking. If the caller wants to know when their job has finished executing,
	// they need to put that logic into the execute func they pass (which will need to be a closure).
	AddJob(metadata interface{}, conflictsWith func(otherJobMetadata interface{}) bool, execute func()) error

	// Stop shuts down the processor, terminating all goroutines.
	// Incomplete jobs are abandoned and may or may not run to completion.
	Stop()

	// GracefulStop shuts down the processor gracefully.
	// After GracefulStop returns, the processor will not accept any new jobs.
	// However, any jobs that are in progress (i.e., for which AddJob returned a `nil` error)
	// are allowed to run to completion.
	GracefulStop()

	// Stopped returns whether the processor has been shut down.
	// If Stopped returns true, that means all goroutines spawned by the processor
	// have been shut down.
	Stopped() bool
}

// A dagNode is a node in the DAG representing the jobs to be run.
// Each node stores both child and parent edges, so that the DAG can be efficiently
// traversed in both directions.
type dagNode struct {
	// execute MUST NOT be modified by the processLoop since it is accessed by the worker goroutines.
	// All other fields can be freely accessed by the processLoop, and must NOT be accessed in any way
	// by the worker goroutines.
	metadata    interface{}
	execute     func()
	numBlocking int
	blocks      map[*dagNode]struct{}
}

// The dag represents the directed acyclic graph of jobs that have not yet been completed.
// It stores a reference to all the nodes that are unblocked (and ready to execute).
// Since it is a DAG, it is guaranteed that, unless the DAG is empty,
// there will always be at least one unblocked node.
type dag struct {
	allNodes       map[*dagNode]struct{}
	unblockedNodes list.List
}

func newDAG() dag {
	return dag{
		allNodes: make(map[*dagNode]struct{}),
	}
}

// addJob adds a new job to the DAG, adding dependencies by invoking the conflictsWith function.
func (d *dag) addJob(metadata interface{}, isBlockedBy func(interface{}) bool, execute func()) {
	newNode := &dagNode{
		metadata: metadata,
		execute:  execute,
		blocks:   make(map[*dagNode]struct{}),
	}
	if isBlockedBy != nil {
		for existingNode := range d.allNodes {
			if isBlockedBy(existingNode.metadata) {
				newNode.numBlocking++
				existingNode.blocks[newNode] = struct{}{}
			}
		}
	}
	d.allNodes[newNode] = struct{}{}
	if newNode.numBlocking == 0 {
		d.unblockedNodes.PushBack(newNode)
	}
}

// removeJob removes a job from the DAG. All jobs that it blocks lose this edge,
// and if this makes them unblocked, they are moved to the unblockedNodes set.
// It is assumed that the removed job is in the unblocked nodes, since other
// jobs cannot be removed.
func (d *dag) removeJob(node *dagNode) {
	for blockedNode := range node.blocks {
		blockedNode.numBlocking--
		if blockedNode.numBlocking == 0 {
			d.unblockedNodes.PushBack(blockedNode)
		}
	}
	delete(d.allNodes, node)
}

// popUnblockedJob pops an unblocked job from the list
// or returns nil if there are no unblocked jobs.
func (d *dag) popUnblockedJob() *dagNode {
	front := d.unblockedNodes.Front()
	if front == nil {
		return nil
	}
	d.unblockedNodes.Remove(front)
	return front.Value.(*dagNode)
}

type jobRequest struct {
	metadata      interface{}
	execute       func()
	conflictsWith func(interface{}) bool
}

type jobProcessorImpl struct {
	jobDAG                dag
	stopSig               Signal
	gracefulStopSig       Signal
	maxWorkers            int
	currentRunningWorkers int
	jobReqC               chan jobRequest
	completedJobsC        chan *dagNode
}

func (j *jobProcessorImpl) AddJob(metadata interface{}, conflictsWith func(otherJobMetadata interface{}) bool, execute func()) error {
	// We check this first to avoid the chance that AddJob succeeds after a call to GracefulStop().
	// (Since the process loop may be in its select statement, it is conceivable that the job request
	// case succeeds instead of the gracefulStopSig case).
	if j.gracefulStopSig.IsDone() {
		return ErrJobProcessorStopped
	}
	select {
	case j.jobReqC <- jobRequest{metadata: metadata, conflictsWith: conflictsWith, execute: execute}:
	case <-j.gracefulStopSig.Done():
		return ErrJobProcessorStopped
	case <-j.stopSig.Done():
		return ErrJobProcessorStopped
	}
	return nil
}

func (j *jobProcessorImpl) Stop() {
	j.stopSig.Signal()
}

func (j *jobProcessorImpl) Stopped() bool {
	return j.stopSig.IsDone()
}

func (j *jobProcessorImpl) GracefulStop() {
	j.gracefulStopSig.Signal()
}

func (j *jobProcessorImpl) sendReadyJobsToAvailableWorkers() {
	for j.currentRunningWorkers < j.maxWorkers {
		nextJob := j.jobDAG.popUnblockedJob()
		if nextJob == nil {
			return
		}
		j.currentRunningWorkers++
		go j.runJob(nextJob)
	}
}

// The processLoop does all the meta-processing for the processor (that is, everything except the execution of the
// jobs itself). It helps sequentialize all data access to the processor's data structures, thus avoiding the need
// for locks.
func (j *jobProcessorImpl) processLoop() {
	for !j.stopSig.IsDone() {
		var gracefulStopWhenIdle <-chan struct{}
		if j.currentRunningWorkers == 0 {
			// Only select on j.gracefulStopSig.Done() when there are no running workers, because otherwise, we
			// will want to process the completed job before stopping the job processor (and we know that
			// this select won't block forever since there is going to be a completed job eventually).
			gracefulStopWhenIdle = j.gracefulStopSig.Done()
		}
		select {
		case jobReq := <-j.jobReqC:
			j.jobDAG.addJob(jobReq.metadata, jobReq.conflictsWith, jobReq.execute)
		case completedJobNode := <-j.completedJobsC:
			j.currentRunningWorkers--
			j.jobDAG.removeJob(completedJobNode)
		case <-gracefulStopWhenIdle:
			// Intentionally don't return here, since we do still want to signal j.stopSig.
			// We _could_ just put the following here:
			//
			// j.stopSig.Signal()
			// return
			//
			// But leaving it blank and allowing it to happen through the code below feels more DRY.
		case <-j.stopSig.Done():
			return
		}
		j.sendReadyJobsToAvailableWorkers()
		if j.gracefulStopSig.IsDone() && j.currentRunningWorkers == 0 {
			j.stopSig.Signal()
		}
	}
}

func (j *jobProcessorImpl) runJob(job *dagNode) {
	job.execute()
	select {
	case j.completedJobsC <- job:
	case <-j.stopSig.Done():
	}
}

// NewJobProcessor returns a new, ready-to-use JobProcessor, and kicks off a number of worker goroutines
// equal to numWorkers, as well as the job processor's main processing loop.
// See the comments on the JobProcessor interface for details on how to use this.
// If numWorkers is <= 0, a number of workers equal to the number of CPUs
// is used.
func NewJobProcessor(numWorkers int) JobProcessor {
	if numWorkers <= 0 {
		numWorkers = runtime.NumCPU()
	}
	j := &jobProcessorImpl{
		jobDAG:          newDAG(),
		stopSig:         NewSignal(),
		gracefulStopSig: NewSignal(),
		maxWorkers:      numWorkers,
		jobReqC:         make(chan jobRequest),
		completedJobsC:  make(chan *dagNode),
	}
	go j.processLoop()
	return j
}
...

main.go

Source:main.go Github

// Command dequeuer dequeues jobs and sends them to a downstream server.
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/kevinburke/handlers"
	"github.com/kevinburke/rickover/config"
	"github.com/kevinburke/rickover/dequeuer"
	"github.com/kevinburke/rickover/metrics"
	"github.com/kevinburke/rickover/services"
	"golang.org/x/sys/unix"
)

func checkError(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	flag.Parse()
	logger := handlers.Logger
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		sigterm := make(chan os.Signal, 1)
		signal.Notify(sigterm, unix.SIGINT, unix.SIGTERM)
		sig := <-sigterm
		fmt.Printf("Caught signal %v, shutting down...\n", sig)
		cancel()
	}()
	dbConns, err := config.GetInt("PG_WORKER_POOL_SIZE")
	if err != nil {
		log.Printf("Error getting database pool size: %s. Defaulting to 20", err)
		dbConns = 20
	}
	// We're going to make a lot of requests to the same downstream service.
	httpConns, err := config.GetInt("HTTP_MAX_IDLE_CONNS")
	if err == nil {
		config.SetMaxIdleConnsPerHost(httpConns)
	} else {
		config.SetMaxIdleConnsPerHost(100)
	}
	go metrics.Run(ctx, metrics.LibratoConfig{
		Namespace: "rickover.dequeuer",
		Source:    "worker",
		Email:     os.Getenv("LIBRATO_EMAIL_ACCOUNT"),
	})
	parsedUrl := config.GetURLOrBail("DOWNSTREAM_URL")
	downstreamPassword := os.Getenv("DOWNSTREAM_WORKER_AUTH")
	if downstreamPassword == "" {
		logger.Warn("No DOWNSTREAM_WORKER_AUTH configured, setting an empty password for auth")
	}
	handler := services.NewDownstreamHandler(logger, parsedUrl.String(), downstreamPassword)
	srv, err := dequeuer.New(ctx, dequeuer.Config{
		Logger:              logger,
		NumConns:            dbConns,
		Processor:           services.NewJobProcessor(handler),
		StuckJobTimeout:     dequeuer.DefaultStuckJobTimeout,
		DisableMetaShutdown: os.Getenv("DISABLE_META_SHUTDOWN") == "true",
	})
	checkError(err)
	if err := srv.Run(ctx); err != nil && err != context.Canceled {
		logger.Error("error running dequeuer", "err", err)
		os.Exit(1)
	}
	logger.Info("All pools shut down. Quitting.")
}
...

newJobProcessor

Using AI Code Generation

package main

import (
	"fmt"
	"time"
)

// JobProcessor runs a background loop until it is told to stop.
type JobProcessor struct {
	stopChan chan bool
}

func newJobProcessor() *JobProcessor {
	return &JobProcessor{
		stopChan: make(chan bool),
	}
}

// start launches the processing loop in its own goroutine.
func (jp *JobProcessor) start() {
	go func() {
		for {
			select {
			case <-jp.stopChan:
				fmt.Println("Stopping job processor")
				return
			default:
				fmt.Println("Processing job")
			}
			time.Sleep(1 * time.Second)
		}
	}()
}

// stop blocks until the processing loop has received the stop request.
func (jp *JobProcessor) stop() {
	jp.stopChan <- true
}

func main() {
	jobProcessor := newJobProcessor()
	jobProcessor.start()
	time.Sleep(5 * time.Second)
	jobProcessor.stop()
	fmt.Println("Done")
}

newJobProcessor

Using AI Code Generation

package main

type jobProcessor struct{}

func newJobProcessor() *jobProcessor {
	return &jobProcessor{}
}

// Variant 1: startProcessingJobs as a method on *jobProcessor.
func (p *jobProcessor) startProcessingJobs() {
}

// Variant 2: the same operation as a package-level function.
func startProcessingJobs(p *jobProcessor) {
}

func main() {
	jp := newJobProcessor()
	jp.startProcessingJobs() // method form
	startProcessingJobs(jp)  // function form
}

newJobProcessor

Using AI Code Generation

import (
func main() {
	jp := job.NewJobProcessor()
	fmt.Println(jp)
}

import "fmt"
type JobProcessor struct {
}
func NewJobProcessor() *JobProcessor {
	fmt.Println("JobProcessor created")
	return &JobProcessor{}
}

	/usr/local/go/src/main/job (from $GOROOT)
	/home/username/go/src/main/job (from $GOPATH)

What am I doing wrong here? How can I import a package from the same directory?

require (
import (
func main() {
	fmt.Println(uuid.New())
}

./mymodule.go:1:1: package mymodule is not in GOROOT (/usr/local/go/src/mymodule)

newJobProcessor

Using AI Code Generation

package main

type jobProcessor struct{}

func newJobProcessor() jobProcessor {
	return jobProcessor{}
}

// start is where job processing would begin. The generated original created a
// new processor inside start and called start on it again, which recurses
// forever, so the body is left empty here.
func (jp jobProcessor) start() {
}

func main() {
	jp := newJobProcessor()
	jp.start()
}

newJobProcessor

Using AI Code Generation

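package main

import "sync"

type job struct {
	wg *sync.WaitGroup
}

type jobProcessor struct {
	jobs chan *job
	done chan struct{}
}

func newJobProcessor() *jobProcessor {
	return &jobProcessor{
		jobs: make(chan *job),
		done: make(chan struct{}),
	}
}

// start processes jobs until the jobs channel is closed, then signals done.
func (jp *jobProcessor) start() {
	go func() {
		for j := range jp.jobs {
			j.process()
		}
		close(jp.done)
	}()
}

// wait blocks until the processing goroutine has exited.
func (jp *jobProcessor) wait() {
	<-jp.done
}

func newJob() *job {
	j := &job{
		wg: &sync.WaitGroup{},
	}
	j.wg.Add(1)
	return j
}

func (j *job) process() {
	defer j.wg.Done()
}

func (jp *jobProcessor) submit(j *job) {
	jp.jobs <- j
}

// wait blocks until this job has been processed.
func (j *job) wait() {
	j.wg.Wait()
}

// close stops accepting jobs; the worker closes done once it has drained them.
func (jp *jobProcessor) close() {
	close(jp.jobs)
}

func main() {
	jp := newJobProcessor()
	jp.start()
	j := newJob()
	jp.submit(j)
	j.wait()
	jp.close()
	jp.wait()
}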

newJobProcessor

Using AI Code Generation

package main

import "fmt"

type jobProcessor struct {
}

func newJobProcessor() jobProcessor {
	return jobProcessor{}
}

func (jp jobProcessor) start() {
	fmt.Println("job processor started")
}

func main() {
	jp := newJobProcessor()
	jp.start()
}
