How to use D class

Best Quick code snippet using D

store.go

Source:store.go Github

copy

Full Screen

...66	// handle of the other ShareableStore.67	With(other basestore.ShareableStore) Store68	// QueuedCount returns the number of queued records matching the given conditions.69	QueuedCount(ctx context.Context, includeProcessing bool, conditions []*sqlf.Query) (int, error)70	// MaxDurationInQueue returns the maximum age of queued records in this store. Returns 0 if there are no queued records.71	MaxDurationInQueue(ctx context.Context) (time.Duration, error)72	// Dequeue selects the first queued record matching the given conditions and updates the state to processing. If there73	// is such a record, it is returned. If there is no such unclaimed record, a nil record and and a nil cancel function74	// will be returned along with a false-valued flag. This method must not be called from within a transaction.75	//76	// The supplied conditions may use the alias provided in `ViewName`, if one was supplied.77	Dequeue(ctx context.Context, workerHostname string, conditions []*sqlf.Query) (workerutil.Record, bool, error)78	// Heartbeat marks the given record as currently being processed.79	Heartbeat(ctx context.Context, ids []int, options HeartbeatOptions) (knownIDs []int, err error)80	// Requeue updates the state of the record with the given identifier to queued and adds a processing delay before81	// the next dequeue of this record can be performed.82	Requeue(ctx context.Context, id int, after time.Time) error83	// AddExecutionLogEntry adds an executor log entry to the record and returns the ID of the new entry (which can be84	// used with UpdateExecutionLogEntry) and a possible error. When the record is not found (due to options not matching85	// or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.86	AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) (entryID int, err error)87	// UpdateExecutionLogEntry updates the executor log entry with the given ID on the given record. 
When the record is not88	// found (due to options not matching or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.89	UpdateExecutionLogEntry(ctx context.Context, recordID, entryID int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) error90	// MarkComplete attempts to update the state of the record to complete. If this record has already been moved from91	// the processing state to a terminal state, this method will have no effect. This method returns a boolean flag92	// indicating if the record was updated.93	MarkComplete(ctx context.Context, id int, options MarkFinalOptions) (bool, error)94	// MarkErrored attempts to update the state of the record to errored. This method will only have an effect95	// if the current state of the record is processing or completed. A requeued record or a record already marked96	// with an error will not be updated. This method returns a boolean flag indicating if the record was updated.97	MarkErrored(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (bool, error)98	// MarkFailed attempts to update the state of the record to failed. This method will only have an effect99	// if the current state of the record is processing or completed. A requeued record or a record already marked100	// with an error will not be updated. This method returns a boolean flag indicating if the record was updated.101	MarkFailed(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (bool, error)102	// ResetStalled moves all processing records that have not received a heartbeat within `StalledMaxAge` back to the103	// queued state. In order to prevent input that continually crashes worker instances, records that have been reset104	// more than `MaxNumResets` times will be marked as failed. 
This method returns a pair of maps from record105	// identifiers the age of the record's last heartbeat timestamp for each record reset to queued and failed states,106	// respectively.107	ResetStalled(ctx context.Context) (resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs map[int]time.Duration, err error)108}109type ExecutionLogEntry workerutil.ExecutionLogEntry110func (e *ExecutionLogEntry) Scan(value any) error {111	b, ok := value.([]byte)112	if !ok {113		return errors.Errorf("value is not []byte: %T", value)114	}115	return json.Unmarshal(b, &e)116}117func (e ExecutionLogEntry) Value() (driver.Value, error) {118	return json.Marshal(e)119}120func ExecutionLogEntries(raw []workerutil.ExecutionLogEntry) (entries []ExecutionLogEntry) {121	for _, entry := range raw {122		entries = append(entries, ExecutionLogEntry(entry))123	}124	return entries125}126type store struct {127	*basestore.Store128	options                         Options129	columnReplacer                  *strings.Replacer130	modifiedColumnExpressionMatches [][]MatchingColumnExpressions131	operations                      *operations132}133var _ Store = &store{}134// Options configure the behavior of Store over a particular set of tables, columns, and expressions.135type Options struct {136	// Name denotes the name of the store used to distinguish log messages and emitted metrics. 
The137	// store constructor will fail if this field is not supplied.138	Name string139	// TableName is the name of the table containing work records.140	//141	// The target table (and the target view referenced by `ViewName`) must have the following columns142	// and types:143	//144	//   - id: integer primary key145	//   - state: text (may be updated to `queued`, `processing`, `errored`, or `failed`)146	//   - failure_message: text147	//   - queued_at: timestamp with time zone148	//   - started_at: timestamp with time zone149	//   - last_heartbeat_at: timestamp with time zone150	//   - finished_at: timestamp with time zone151	//   - process_after: timestamp with time zone152	//   - num_resets: integer not null153	//   - num_failures: integer not null154	//   - execution_logs: json[] (each entry has the form of `ExecutionLogEntry`)155	//   - worker_hostname: text156	//157	// The names of these columns may be customized based on the table name by adding a replacement158	// pair in the AlternateColumnNames mapping.159	//160	// It's recommended to put an index or (or partial index) on the state column for more efficient161	// dequeue operations.162	TableName string163	// AlternateColumnNames is a map from expected column names to actual column names in the target164	// table. This allows existing tables to be more easily retrofitted into the expected record165	// shape.166	AlternateColumnNames map[string]string167	// ViewName is an optional name of a view on top of the table containing work records to query when168	// selecting a candidate. If this value is not supplied, `TableName` will be used. 
The value supplied169	// may also indicate a table alias, which can be referenced in `OrderByExpression`, `ColumnExpressions`,170	// and the conditions supplied to `Dequeue`.171	//172	// The target of this column must be a view on top of the configured table with the same column173	// requirements as the base table described above.174	//175	// Example use case:176	// The processor for LSIF uploads supplies `lsif_uploads_with_repository_name`, a view on top of the177	// `lsif_uploads` table that joins work records with the `repo` table and adds an additional repository178	// name column. This allows `Dequeue` to return a record with additional data so that a second query179	// is not necessary by the caller.180	ViewName string181	// Scan is the function used to convert a rows object into a record of the expected shape.182	Scan RecordScanFn183	// OrderByExpression is the SQL expression used to order candidate records when selecting the next184	// batch of work to perform. This expression may use the alias provided in `ViewName`, if one was185	// supplied.186	OrderByExpression *sqlf.Query187	// ColumnExpressions are the target columns provided to the query when selecting a job record. These188	// expressions may use the alias provided in `ViewName`, if one was supplied.189	ColumnExpressions []*sqlf.Query190	// StalledMaxAge is the maximum allowed duration between heartbeat updates of a job's last_heartbeat_at191	// field. An unmodified row that is marked as processing likely indicates that the worker that dequeued192	// the record has died.193	StalledMaxAge time.Duration194	// MaxNumResets is the maximum number of times a record can be implicitly reset back to the queued195	// state (via `ResetStalled`). 
If a record's reset attempts counter reaches this threshold, it will196	// be moved into the errored state rather than queued on its next reset to prevent an infinite retry197	// cycle of the same input.198	MaxNumResets int199	// ResetFailureMessage overrides the default failure message written to job records that have been200	// reset the maximum number of times.201	ResetFailureMessage string202	// RetryAfter determines whether the store dequeues jobs that have errored more than RetryAfter ago.203	// Setting this value to zero will disable retries entirely.204	//205	// If RetryAfter is a non-zero duration, the store dequeues records where:206	//207	//   - the state is 'errored'208	//   - the failed attempts counter hasn't reached MaxNumRetries209	//   - the finished_at timestamp was more than RetryAfter ago210	RetryAfter time.Duration211	// MaxNumRetries is the maximum number of times a record can be retried after an explicit failure.212	// Setting this value to zero will disable retries entirely.213	MaxNumRetries int214	// clock is used to mock out the wall clock used for heartbeat updates.215	clock glock.Clock216}217// RecordScanFn is a function that interprets row values as a particular record. This function should218// return a false-valued flag if the given result set was empty. 
This function must close the rows219// value if the given error value is nil.220//221// See the `CloseRows` function in the store/base package for suggested implementation details.222type RecordScanFn func(rows *sql.Rows, err error) (workerutil.Record, bool, error)223// New creates a new store with the given database handle and options.224func New(handle basestore.TransactableHandle, options Options) Store {225	return NewWithMetrics(handle, options, &observation.TestContext)226}227func NewWithMetrics(handle basestore.TransactableHandle, options Options, observationContext *observation.Context) Store {228	return newStore(handle, options, observationContext)229}230func newStore(handle basestore.TransactableHandle, options Options, observationContext *observation.Context) *store {231	if options.Name == "" {232		panic("no name supplied to github.com/sourcegraph/sourcegraph/internal/dbworker/store:newStore")233	}234	if options.ViewName == "" {235		options.ViewName = options.TableName236	}237	if options.clock == nil {238		options.clock = glock.NewRealClock()239	}240	alternateColumnNames := map[string]string{}241	for _, column := range columnNames {242		alternateColumnNames[column] = column243	}244	for k, v := range options.AlternateColumnNames {245		alternateColumnNames[k] = v246	}247	var replacements []string248	for k, v := range alternateColumnNames {249		replacements = append(replacements, fmt.Sprintf("{%s}", k), v)250	}251	modifiedColumnExpressionMatches := matchModifiedColumnExpressions(options.ViewName, options.ColumnExpressions, alternateColumnNames)252	for i, expression := range options.ColumnExpressions {253		for _, match := range modifiedColumnExpressionMatches[i] {254			if match.exact {255				continue256			}257			log15.Error(``+258				`dbworker store: column expression refers to a column modified by dequeue in a complex expression. 
`+259				`The given expression will currently evaluate to the OLD value of the row, and the associated handler `+260				`will not have a completely up-to-date record. Please refer to this column without a transform.`,261				"index", i,262				"expression", expression.Query(sqlf.PostgresBindVar),263				"columnName", match.columnName,264				"storeName", options.Name,265			)266		}267	}268	return &store{269		Store:                           basestore.NewWithHandle(handle),270		options:                         options,271		columnReplacer:                  strings.NewReplacer(replacements...),272		modifiedColumnExpressionMatches: modifiedColumnExpressionMatches,273		operations:                      newOperations(options.Name, observationContext),274	}275}276// With creates a new Store with the given basestore.Shareable store as the277// underlying basestore.Store.278func (s *store) With(other basestore.ShareableStore) Store {279	return &store{280		Store:                           s.Store.With(other),281		options:                         s.options,282		columnReplacer:                  s.columnReplacer,283		modifiedColumnExpressionMatches: s.modifiedColumnExpressionMatches,284		operations:                      s.operations,285	}286}287// columnNames contain the names of the columns expected to be defined by the target table.288// Note: adding a new column to this list requires updating the worker documentation289// https://github.com/sourcegraph/sourcegraph/blob/main/doc/dev/background-information/workers.md#database-backed-stores290var columnNames = []string{291	"id",292	"state",293	"failure_message",294	"queued_at",295	"started_at",296	"last_heartbeat_at",297	"finished_at",298	"process_after",299	"num_resets",300	"num_failures",301	"execution_logs",302	"worker_hostname",303}304// QueuedCount returns the number of queued records matching the given conditions.305func (s *store) QueuedCount(ctx context.Context, includeProcessing bool, conditions []*sqlf.Query) (_ int, err 
error) {306	ctx, _, endObservation := s.operations.queuedCount.With(ctx, &err, observation.Args{})307	defer endObservation(1, observation.Args{})308	stateQueries := make([]*sqlf.Query, 0, 2)309	stateQueries = append(stateQueries, sqlf.Sprintf("%s", "queued"))310	if includeProcessing {311		stateQueries = append(stateQueries, sqlf.Sprintf("%s", "processing"))312	}313	count, _, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(314		queuedCountQuery,315		quote(s.options.ViewName),316		sqlf.Join(stateQueries, ","),317		s.options.MaxNumRetries,318		makeConditionSuffix(conditions),319	)))320	return count, err321}322const queuedCountQuery = `323-- source: internal/workerutil/store.go:QueuedCount324SELECT COUNT(*) FROM %s WHERE (325	{state} IN (%s) OR326	({state} = 'errored' AND {num_failures} < %s)327) %s328`329// MaxDurationInQueue returns the longest duration for which a job associated with this store instance has330// been in the queued state (including errored records that can be retried in the future). 
This method returns331// an duration of zero if there are no jobs ready for processing.332//333// If records backed by this store do not have an initial state of 'queued', or if it is possible to requeue334// records outside of this package, manual care should be taken to set the queued_at column to the proper time.335// This method makes no guarantees otherwise.336//337// See https://github.com/sourcegraph/sourcegraph/issues/32624.338func (s *store) MaxDurationInQueue(ctx context.Context) (_ time.Duration, err error) {339	ctx, _, endObservation := s.operations.maxDurationInQueue.With(ctx, &err, observation.Args{})340	defer endObservation(1, observation.Args{})341	now := s.now()342	retryAfter := int(s.options.RetryAfter / time.Second)343	ageInSeconds, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(344		maxDurationInQueueQuery,345		// candidates346		quote(s.options.ViewName),347		// oldest_queued348		now,349		// oldest_retryable350		retryAfter,351		retryAfter,352		now,353		retryAfter,354		s.options.MaxNumRetries,355	)))356	if err != nil {357		return 0, err358	}359	if !ok {360		return 0, nil361	}362	return time.Duration(ageInSeconds) * time.Second, nil363}364const maxDurationInQueueQuery = `365-- source: internal/workerutil/store.go:MaxDurationInQueue366WITH367candidates AS (368	SELECT * FROM %s369),370oldest_queued AS (371	SELECT372		-- Select when the record was most recently dequeueable373		GREATEST({queued_at}, {process_after}) AS last_queued_at374	FROM candidates375	WHERE376		{state} = 'queued' AND377		({process_after} IS NULL OR {process_after} <= %s)378),379oldest_retryable AS (380	SELECT381		-- Select when the record was most recently dequeueable382		{finished_at} + (%s * '1 second'::interval) AS last_queued_at383	FROM candidates384	WHERE385		%s > 0 AND386		{state} = 'errored' AND387		%s - {finished_at} > (%s * '1 second'::interval) AND388		{num_failures} < %s389),390oldest_record AS (391	(392		SELECT last_queued_at FROM oldest_queued393		
UNION394		SELECT last_queued_at FROM oldest_retryable395	)396	ORDER BY last_queued_at397	LIMIT 1398)399SELECT EXTRACT(EPOCH FROM NOW() - last_queued_at)::integer AS age FROM oldest_record400`401// columnsUpdatedByDequeue are the unmapped column names modified by the dequeue method.402var columnsUpdatedByDequeue = []string{403	"state",404	"started_at",405	"last_heartbeat_at",406	"finished_at",407	"failure_message",408	"execution_logs",409	"worker_hostname",410}411// Dequeue selects the first queued record matching the given conditions and updates the state to processing. If there412// is such a record, it is returned. If there is no such unclaimed record, a nil record and and a nil cancel function413// will be returned along with a false-valued flag. This method must not be called from within a transaction.414//415// A background goroutine that continuously updates the record's last modified time will be started. The returned cancel416// function should be called once the record no longer needs to be locked from selection or reset by another process.417// Most often, this will be when the handler moves the record into a terminal state.418//419// The supplied conditions may use the alias provided in `ViewName`, if one was supplied.420func (s *store) Dequeue(ctx context.Context, workerHostname string, conditions []*sqlf.Query) (_ workerutil.Record, _ bool, err error) {421	ctx, trace, endObservation := s.operations.dequeue.With(ctx, &err, observation.Args{})422	defer endObservation(1, observation.Args{})423	if s.InTransaction() {424		return nil, false, ErrDequeueTransaction425	}426	now := s.now()427	retryAfter := int(s.options.RetryAfter / time.Second)428	var (429		processingExpr     = sqlf.Sprintf("%s", "processing")430		nowTimestampExpr   = sqlf.Sprintf("%s::timestamp", now)431		nullExpr           = sqlf.Sprintf("NULL")432		workerHostnameExpr = sqlf.Sprintf("%s", workerHostname)433	)434	// NOTE: Changes to this mapping should be reflected in the package variable435	
// columnsUpdatedByDequeue, also defined in this file.436	updatedColumns := map[string]*sqlf.Query{437		s.columnReplacer.Replace("{state}"):             processingExpr,438		s.columnReplacer.Replace("{started_at}"):        nowTimestampExpr,439		s.columnReplacer.Replace("{last_heartbeat_at}"): nowTimestampExpr,440		s.columnReplacer.Replace("{finished_at}"):       nullExpr,441		s.columnReplacer.Replace("{failure_message}"):   nullExpr,442		s.columnReplacer.Replace("{execution_logs}"):    nullExpr,443		s.columnReplacer.Replace("{worker_hostname}"):   workerHostnameExpr,444	}445	record, exists, err := s.options.Scan(s.Query(ctx, s.formatQuery(446		dequeueQuery,447		s.options.OrderByExpression,448		quote(s.options.ViewName),449		now,450		retryAfter,451		now,452		retryAfter,453		s.options.MaxNumRetries,454		makeConditionSuffix(conditions),455		s.options.OrderByExpression,456		quote(s.options.TableName),457		quote(s.options.TableName),458		quote(s.options.TableName),459		sqlf.Join(s.makeDequeueUpdateStatements(updatedColumns), ", "),460		sqlf.Join(s.makeDequeueSelectExpressions(updatedColumns), ", "),461		quote(s.options.ViewName),462	)))463	if err != nil {464		return nil, false, err465	}466	if !exists {467		return nil, false, nil468	}469	trace.Log(log.Int("recordID", record.RecordID()))470	return record, true, nil471}472const dequeueQuery = `473-- source: internal/workerutil/store.go:Dequeue474WITH potential_candidates AS (475	SELECT476		{id} AS candidate_id,477		ROW_NUMBER() OVER (ORDER BY %s) AS order478	FROM %s479	WHERE480		(481			(482				{state} = 'queued' AND483				({process_after} IS NULL OR {process_after} <= %s)484			) OR (485				%s > 0 AND486				{state} = 'errored' AND487				%s - {finished_at} > (%s * '1 second'::interval) AND488				{num_failures} < %s489			)490		)491		%s492	ORDER BY %s493),494candidate AS (495	SELECT496		{id} FROM %s497	JOIN potential_candidates pc ON pc.candidate_id = {id}498	WHERE499		-- Recheck state.500		{state} IN ('queued', 'errored')501	
ORDER BY pc.order502	FOR UPDATE OF %s SKIP LOCKED503	LIMIT 1504),505updated_record AS (506	UPDATE507		%s508	SET509		%s510	WHERE511		{id} IN (SELECT {id} FROM candidate)512)513SELECT514	%s515FROM516	%s517WHERE518	{id} IN (SELECT {id} FROM candidate)519`520// makeDequeueSelectExpressions constructs the ordered set of SQL expressions that are returned521// from the dequeue query. This method returns a copy of the configured column expressions slice522// where expressions referencing one of the column updated by dequeue are replaced by the updated523// value.524//525// Note that this method only considers select expressions like `alias.ColumnName` and not something526// more complex like `SomeFunction(alias.ColumnName) + 1`. We issue a warning on construction of a527// new store configured in this way to indicate this (probably) unwanted behavior.528func (s *store) makeDequeueSelectExpressions(updatedColumns map[string]*sqlf.Query) []*sqlf.Query {529	selectExpressions := make([]*sqlf.Query, len(s.options.ColumnExpressions))530	copy(selectExpressions, s.options.ColumnExpressions)531	for i := range selectExpressions {532		for _, match := range s.modifiedColumnExpressionMatches[i] {533			if match.exact {534				selectExpressions[i] = updatedColumns[match.columnName]535				break536			}537		}538	}539	return selectExpressions540}541// makeDequeueUpdateStatements constructs the set of SQL statements that update values of the target table542// in the dequeue query.543func (s *store) makeDequeueUpdateStatements(updatedColumns map[string]*sqlf.Query) []*sqlf.Query {544	updateStatements := make([]*sqlf.Query, 0, len(updatedColumns))545	for columnName, updateValue := range updatedColumns {546		updateStatements = append(updateStatements, sqlf.Sprintf(columnName+"=%s", updateValue))547	}548	return updateStatements549}550func (s *store) Heartbeat(ctx context.Context, ids []int, options HeartbeatOptions) (knownIDs []int, err error) {551	ctx, _, endObservation := 
s.operations.heartbeat.With(ctx, &err, observation.Args{})552	defer endObservation(1, observation.Args{})553	if len(ids) == 0 {554		return []int{}, nil555	}556	sqlIDs := make([]*sqlf.Query, 0, len(ids))557	for _, id := range ids {558		sqlIDs = append(sqlIDs, sqlf.Sprintf("%s", id))559	}560	quotedTableName := quote(s.options.TableName)561	conds := []*sqlf.Query{562		s.formatQuery("{id} IN (%s)", sqlf.Join(sqlIDs, ",")),563		s.formatQuery("{state} = 'processing'"),564	}565	conds = append(conds, options.ToSQLConds(s.formatQuery)...)566	knownIDs, err = basestore.ScanInts(s.Query(ctx, s.formatQuery(updateCandidateQuery, quotedTableName, sqlf.Join(conds, "AND"), quotedTableName, s.now())))567	if err != nil {568		return nil, err569	}570	if len(knownIDs) != len(ids) {571	outer:572		for _, recordID := range ids {573			for _, test := range knownIDs {574				if test == recordID {575					continue outer576				}577			}578			debug, debugErr := s.fetchDebugInformationForJob(ctx, recordID)579			if debugErr != nil {580				log15.Error("failed to fetch debug information for job",581					"recordID", recordID,582					"err", debugErr,583				)584			}585			log15.Error("heartbeat lost a job",586				"recordID", recordID,587				"debug", debug,588				"options.workerHostname", options.WorkerHostname,589			)590		}591	}592	return knownIDs, nil593}594const updateCandidateQuery = `595-- source: internal/workerutil/store.go:Heartbeat596WITH alive_candidates AS (597	SELECT598		{id}599	FROM600		%s601	WHERE602		%s603	ORDER BY604		{id} ASC605	FOR UPDATE606)607UPDATE608	%s609SET610	{last_heartbeat_at} = %s611WHERE612	{id} IN (SELECT {id} FROM alive_candidates)613RETURNING {id}614`615// Requeue updates the state of the record with the given identifier to queued and adds a processing delay before616// the next dequeue of this record can be performed.617func (s *store) Requeue(ctx context.Context, id int, after time.Time) (err error) {618	ctx, _, endObservation := s.operations.requeue.With(ctx, &err, 
observation.Args{LogFields: []log.Field{619		log.Int("id", id),620		log.String("after", after.String()),621	}})622	defer endObservation(1, observation.Args{})623	return s.Exec(ctx, s.formatQuery(624		requeueQuery,625		quote(s.options.TableName),626		after,627		id,628	))629}630const requeueQuery = `631-- source: internal/workerutil/store.go:Requeue632UPDATE %s633SET634	{state} = 'queued',635	{queued_at} = clock_timestamp(),636	{started_at} = null,637	{process_after} = %s638WHERE {id} = %s639`640// AddExecutionLogEntry adds an executor log entry to the record and returns the ID of the new entry (which can be641// used with UpdateExecutionLogEntry) and a possible error. When the record is not found (due to options not matching642// or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.643func (s *store) AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) (entryID int, err error) {644	ctx, _, endObservation := s.operations.addExecutionLogEntry.With(ctx, &err, observation.Args{LogFields: []log.Field{645		log.Int("id", id),646	}})647	defer endObservation(1, observation.Args{})648	conds := []*sqlf.Query{649		s.formatQuery("{id} = %s", id),650	}651	conds = append(conds, options.ToSQLConds(s.formatQuery)...)652	entryID, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(653		addExecutionLogEntryQuery,654		quote(s.options.TableName),655		ExecutionLogEntry(entry),656		sqlf.Join(conds, "AND"),657	)))658	if err != nil {659		return entryID, err660	}661	if !ok {662		debug, debugErr := s.fetchDebugInformationForJob(ctx, id)663		if debugErr != nil {664			log15.Error("failed to fetch debug information for job",665				"recordID", id,666				"err", debugErr,667			)668		}669		log15.Error("updateExecutionLogEntry failed and didn't match rows",670			"recordID", id,671			"debug", debug,672			"options.workerHostname", options.WorkerHostname,673			"options.state", options.State,674		)675		return 
entryID, ErrExecutionLogEntryNotUpdated676	}677	return entryID, nil678}679const addExecutionLogEntryQuery = `680-- source: internal/workerutil/store.go:AddExecutionLogEntry681UPDATE682	%s683SET {execution_logs} = {execution_logs} || %s::json684WHERE685	%s686RETURNING array_length({execution_logs}, 1)687`688// UpdateExecutionLogEntry updates the executor log entry with the given ID on the given record. When the record is not689// found (due to options not matching or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.690func (s *store) UpdateExecutionLogEntry(ctx context.Context, recordID, entryID int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) (err error) {691	ctx, _, endObservation := s.operations.updateExecutionLogEntry.With(ctx, &err, observation.Args{LogFields: []log.Field{692		log.Int("recordID", recordID),693		log.Int("entryID", entryID),694	}})695	defer endObservation(1, observation.Args{})696	conds := []*sqlf.Query{697		s.formatQuery("{id} = %s", recordID),698		s.formatQuery("array_length({execution_logs}, 1) >= %s", entryID),699	}700	conds = append(conds, options.ToSQLConds(s.formatQuery)...)701	_, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(702		updateExecutionLogEntryQuery,703		quote(s.options.TableName),704		entryID,705		ExecutionLogEntry(entry),706		sqlf.Join(conds, "AND"),707	)))708	if err != nil {709		return err710	}711	if !ok {712		debug, debugErr := s.fetchDebugInformationForJob(ctx, recordID)713		if debugErr != nil {714			log15.Error("failed to fetch debug information for job",715				"recordID", recordID,716				"err", debugErr,717			)718		}719		log15.Error("updateExecutionLogEntry failed and didn't match rows",720			"recordID", recordID,721			"debug", debug,722			"options.workerHostname", options.WorkerHostname,723			"options.state", options.State,724		)725		return ErrExecutionLogEntryNotUpdated726	}727	return nil728}729const updateExecutionLogEntryQuery = `730-- source: 
internal/workerutil/store.go:UpdateExecutionLogEntry731UPDATE732	%s733SET {execution_logs}[%s] = %s::json734WHERE735	%s736RETURNING737	array_length({execution_logs}, 1)738`739// MarkComplete attempts to update the state of the record to complete. If this record has already been moved from740// the processing state to a terminal state, this method will have no effect. This method returns a boolean flag741// indicating if the record was updated.742func (s *store) MarkComplete(ctx context.Context, id int, options MarkFinalOptions) (_ bool, err error) {743	ctx, _, endObservation := s.operations.markComplete.With(ctx, &err, observation.Args{LogFields: []log.Field{744		log.Int("id", id),745	}})746	defer endObservation(1, observation.Args{})747	conds := []*sqlf.Query{748		s.formatQuery("{id} = %s", id),749		s.formatQuery("{state} = 'processing'"),750	}751	conds = append(conds, options.ToSQLConds(s.formatQuery)...)752	_, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(markCompleteQuery, quote(s.options.TableName), sqlf.Join(conds, "AND"))))753	return ok, err754}755const markCompleteQuery = `756-- source: internal/workerutil/store.go:MarkComplete757UPDATE %s758SET {state} = 'completed', {finished_at} = clock_timestamp()759WHERE %s760RETURNING {id}761`762// MarkErrored attempts to update the state of the record to errored. This method will only have an effect763// if the current state of the record is processing. A requeued record or a record already marked with an764// error will not be updated. 
This method returns a boolean flag indicating if the record was updated.765func (s *store) MarkErrored(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (_ bool, err error) {766	ctx, _, endObservation := s.operations.markErrored.With(ctx, &err, observation.Args{LogFields: []log.Field{767		log.Int("id", id),768	}})769	defer endObservation(1, observation.Args{})770	conds := []*sqlf.Query{771		s.formatQuery("{id} = %s", id),772		s.formatQuery("{state} = 'processing'"),773	}774	conds = append(conds, options.ToSQLConds(s.formatQuery)...)775	q := s.formatQuery(markErroredQuery, quote(s.options.TableName), s.options.MaxNumRetries, failureMessage, sqlf.Join(conds, "AND"))776	_, ok, err := basestore.ScanFirstInt(s.Query(ctx, q))777	return ok, err778}779const markErroredQuery = `780-- source: internal/workerutil/store.go:MarkErrored781UPDATE %s782SET {state} = CASE WHEN {num_failures} + 1 >= %d THEN 'failed' ELSE 'errored' END,783	{finished_at} = clock_timestamp(),784	{failure_message} = %s,785	{num_failures} = {num_failures} + 1786WHERE %s787RETURNING {id}788`789// MarkFailed attempts to update the state of the record to failed. This method will only have an effect790// if the current state of the record is processing. A requeued record or a record already marked with an791// error will not be updated. 
This method returns a boolean flag indicating if the record was updated.792func (s *store) MarkFailed(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (_ bool, err error) {793	ctx, _, endObservation := s.operations.markFailed.With(ctx, &err, observation.Args{LogFields: []log.Field{794		log.Int("id", id),795	}})796	defer endObservation(1, observation.Args{})797	conds := []*sqlf.Query{798		s.formatQuery("{id} = %s", id),799		s.formatQuery("{state} = 'processing'"),800	}801	conds = append(conds, options.ToSQLConds(s.formatQuery)...)802	q := s.formatQuery(markFailedQuery, quote(s.options.TableName), failureMessage, sqlf.Join(conds, "AND"))803	_, ok, err := basestore.ScanFirstInt(s.Query(ctx, q))804	return ok, err805}806const markFailedQuery = `807-- source: internal/workerutil/store.go:MarkFailed808UPDATE %s809SET {state} = 'failed',810	{finished_at} = clock_timestamp(),811	{failure_message} = %s,812	{num_failures} = {num_failures} + 1813WHERE %s814RETURNING {id}815`816const defaultResetFailureMessage = "job processor died while handling this message too many times"817// ResetStalled moves all processing records that have not received a heartbeat within `StalledMaxAge` back to the818// queued state. In order to prevent input that continually crashes worker instances, records that have been reset819// more than `MaxNumResets` times will be marked as failed. 
This method returns a pair of maps from record820// identifiers the age of the record's last heartbeat timestamp for each record reset to queued and failed states,821// respectively.822func (s *store) ResetStalled(ctx context.Context) (resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs map[int]time.Duration, err error) {823	ctx, trace, endObservation := s.operations.resetStalled.With(ctx, &err, observation.Args{})824	defer endObservation(1, observation.Args{})825	now := s.now()826	scan := scanLastHeartbeatTimestampsFrom(now)827	resetLastHeartbeatsByIDs, err = scan(s.Query(828		ctx,829		s.formatQuery(830			resetStalledQuery,831			quote(s.options.TableName),832			now,833			int(s.options.StalledMaxAge/time.Second),834			s.options.MaxNumResets,835			quote(s.options.TableName),836		),837	))838	if err != nil {839		return resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs, err840	}841	trace.Log(log.Int("numResetIDs", len(resetLastHeartbeatsByIDs)))842	resetFailureMessage := s.options.ResetFailureMessage843	if resetFailureMessage == "" {844		resetFailureMessage = defaultResetFailureMessage845	}846	failedLastHeartbeatsByIDs, err = scan(s.Query(847		ctx,848		s.formatQuery(849			resetStalledMaxResetsQuery,850			quote(s.options.TableName),851			now,852			int(s.options.StalledMaxAge/time.Second),853			s.options.MaxNumResets,854			quote(s.options.TableName),855			resetFailureMessage,856		),857	))858	if err != nil {859		return resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs, err860	}861	trace.Log(log.Int("numErroredIDs", len(failedLastHeartbeatsByIDs)))862	return resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs, nil863}864func scanLastHeartbeatTimestampsFrom(now time.Time) func(rows *sql.Rows, queryErr error) (_ map[int]time.Duration, err error) {865	return func(rows *sql.Rows, queryErr error) (_ map[int]time.Duration, err error) {866		if queryErr != nil {867			return nil, queryErr868		}869		defer func() { err = basestore.CloseRows(rows, err) }()870		m := 
map[int]time.Duration{}871		for rows.Next() {872			var id int873			var lastHeartbeat time.Time874			if err := rows.Scan(&id, &lastHeartbeat); err != nil {875				return nil, err876			}877			m[id] = now.Sub(lastHeartbeat)878		}879		return m, nil880	}881}882const resetStalledQuery = `883-- source: internal/workerutil/store.go:ResetStalled884WITH stalled AS (885	SELECT {id} FROM %s886	WHERE887		{state} = 'processing' AND888		%s - {last_heartbeat_at} > (%s * '1 second'::interval) AND889		{num_resets} < %s890	FOR UPDATE SKIP LOCKED891)892UPDATE %s893SET894	{state} = 'queued',895	{queued_at} = clock_timestamp(),896	{started_at} = null,897	{num_resets} = {num_resets} + 1898WHERE {id} IN (SELECT {id} FROM stalled)899RETURNING {id}, {last_heartbeat_at}900`901const resetStalledMaxResetsQuery = `902-- source: internal/workerutil/store.go:ResetStalled903WITH stalled AS (904	SELECT {id} FROM %s905	WHERE906		{state} = 'processing' AND907		%s - {last_heartbeat_at} > (%s * '1 second'::interval) AND908		{num_resets} >= %s909	FOR UPDATE SKIP LOCKED910)911UPDATE %s912SET913	{state} = 'failed',914	{finished_at} = clock_timestamp(),915	{failure_message} = %s916WHERE {id} IN (SELECT {id} FROM stalled)917RETURNING {id}, {last_heartbeat_at}918`919func (s *store) formatQuery(query string, args ...any) *sqlf.Query {920	return sqlf.Sprintf(s.columnReplacer.Replace(query), args...)921}922func (s *store) now() time.Time {923	return s.options.clock.Now().UTC()924}925const fetchDebugInformationForJob = `926-- source: internal/workerutil/store.go:UpdateExecutionLogEntry927SELECT928	row_to_json(%s)929FROM930	%s931WHERE932	{id} = %s933`934func (s *store) fetchDebugInformationForJob(ctx context.Context, recordID int) (debug string, err error) {935	debug, ok, err := basestore.ScanFirstNullString(s.Query(ctx, s.formatQuery(936		fetchDebugInformationForJob,937		quote(extractTableName(s.options.TableName)),938		quote(s.options.TableName),939		recordID,940	)))941	if err != nil {942		return "", err943	}944	
if !ok {945		return "", errors.Newf("fetching debug information for record %d didn't return rows")946	}947	return debug, nil948}949// quote wraps the given string in a *sqlf.Query so that it is not passed to the database950// as a parameter. It is necessary to quote things such as table names, columns, and other951// expressions that are not simple values.952func quote(s string) *sqlf.Query {953	return sqlf.Sprintf(s)954}955// makeConditionSuffix returns a *sqlf.Query containing "AND {c1 AND c2 AND ...}" when the956// given set of conditions is non-empty, and an empty string otherwise.957func makeConditionSuffix(conditions []*sqlf.Query) *sqlf.Query {958	if len(conditions) == 0 {959		return sqlf.Sprintf("")960	}961	var quotedConditions []*sqlf.Query962	for _, condition := range conditions {963		// Ensure everything is quoted in case the condition has an OR964		quotedConditions = append(quotedConditions, sqlf.Sprintf("(%s)", condition))965	}966	return sqlf.Sprintf("AND %s", sqlf.Join(quotedConditions, " AND "))967}968type MatchingColumnExpressions struct {969	columnName string970	exact      bool971}972// matchModifiedColumnExpressions returns a slice of columns to which each of the973// given column expressions refers. Column references that do not refere to a member974// of the columnsUpdatedByDequeue slice are ignored. 
Each match indicates the column975// name and whether or not the expression is an exact reference or a reference within976// a more complex expression (arithmetic, function call argument, etc).977//978// The output slice has the same number of elements as the input column expressions979// and the results are ordered in parallel with the given column expressions.980func matchModifiedColumnExpressions(viewName string, columnExpressions []*sqlf.Query, alternateColumnNames map[string]string) [][]MatchingColumnExpressions {981	matches := make([][]MatchingColumnExpressions, len(columnExpressions))982	columnPrefixes := makeColumnPrefixes(viewName)983	for i, columnExpression := range columnExpressions {984		columnExpressionText := columnExpression.Query(sqlf.PostgresBindVar)985		for _, columnName := range columnsUpdatedByDequeue {986			match := false987			exact := false988			if name, ok := alternateColumnNames[columnName]; ok {989				columnName = name990			}991			for _, columnPrefix := range columnPrefixes {992				if regexp.MustCompile(fmt.Sprintf(`^%s%s$`, columnPrefix, columnName)).MatchString(columnExpressionText) {993					match = true994					exact = true995					break996				}997				if !match && regexp.MustCompile(fmt.Sprintf(`\b%s%s\b`, columnPrefix, columnName)).MatchString(columnExpressionText) {998					match = true999				}...

Full Screen

Full Screen

store_test.go

Source:store_test.go Github

copy

Full Screen

...97	if count != 2 {98		t.Errorf("unexpected count. want=%d have=%d", 2, count)99	}100}101func TestStoreMaxDurationInQueue(t *testing.T) {102	db := setupStoreTest(t)103	if _, err := db.ExecContext(context.Background(), `104		INSERT INTO workerutil_test (id, state, created_at)105		VALUES106			(1, 'queued', NOW() - '20 minutes'::interval), -- young107			(2, 'queued', NOW() - '30 minutes'::interval), -- oldest queued108			(3, 'state2', NOW() - '40 minutes'::interval), -- wrong state109			(4, 'queued', NOW() - '10 minutes'::interval), -- young110			(5, 'state3', NOW() - '50 minutes'::interval)  -- wrong state111	`); err != nil {112		t.Fatalf("unexpected error inserting records: %s", err)113	}114	age, err := testStore(db, defaultTestStoreOptions(nil)).MaxDurationInQueue(context.Background())115	if err != nil {116		t.Fatalf("unexpected error getting max duration in queue: %s", err)117	}118	if age.Round(time.Second) != 30*time.Minute {119		t.Fatalf("unexpected max age. want=%s have=%s", 30*time.Minute, age)120	}121}122func TestStoreMaxDurationInQueueProcessAfter(t *testing.T) {123	db := setupStoreTest(t)124	if _, err := db.ExecContext(context.Background(), `125		INSERT INTO workerutil_test (id, state, created_at, process_after)126		VALUES127			(1, 'queued', NOW() - '90 minutes'::interval, NOW() + '10 minutes'::interval), -- oldest queued, waiting for process_after128			(2, 'queued', NOW() - '70 minutes'::interval, NOW() - '30 minutes'::interval), -- oldest queued129			(3, 'state2', NOW() - '40 minutes'::interval, NULL),                           -- wrong state130			(4, 'queued', NOW() - '10 minutes'::interval, NULL),                           -- young131			(5, 'state3', NOW() - '50 minutes'::interval, NULL)                            -- wrong state132	`); err != nil {133		t.Fatalf("unexpected error inserting records: %s", err)134	}135	age, err := testStore(db, defaultTestStoreOptions(nil)).MaxDurationInQueue(context.Background())136	if err != nil {137		
t.Fatalf("unexpected error getting max duration in queue: %s", err)138	}139	if age.Round(time.Second) != 30*time.Minute {140		t.Fatalf("unexpected max age. want=%s have=%s", 30*time.Minute, age)141	}142}143func TestStoreMaxDurationInQueueFailed(t *testing.T) {144	db := setupStoreTest(t)145	if _, err := db.ExecContext(context.Background(), `146		INSERT INTO workerutil_test (id, state, created_at, finished_at, num_failures)147		VALUES148			(1, 'queued',  NOW() - '10 minutes'::interval, NULL,                           0), -- young149			(2, 'errored', NOW(),                          NOW() - '30 minutes'::interval, 2), -- oldest retryable error'd150			(3, 'errored', NOW(),                          NOW() - '10 minutes'::interval, 2), -- retryable, but too young to be queued151			(4, 'state2',  NOW() - '40 minutes'::interval, NULL,                           0), -- wrong state152			(5, 'errored', NOW(),                          NOW() - '50 minutes'::interval, 3), -- non-retryable (max attempts exceeded)153			(6, 'queued',  NOW() - '20 minutes'::interval, NULL,                           0), -- oldest queued154			(7, 'failed',  NOW(),                          NOW() - '60 minutes'::interval, 1)  -- wrong state155	`); err != nil {156		t.Fatalf("unexpected error inserting records: %s", err)157	}158	options := defaultTestStoreOptions(nil)159	options.RetryAfter = 5 * time.Minute160	age, err := testStore(db, options).MaxDurationInQueue(context.Background())161	if err != nil {162		t.Fatalf("unexpected error getting max duration in queue: %s", err)163	}164	if age.Round(time.Second) != 25*time.Minute {165		t.Fatalf("unexpected max age. 
want=%s have=%s", 25*time.Minute, age)166	}167}168func TestStoreMaxDurationInQueueEmpty(t *testing.T) {169	db := setupStoreTest(t)170	age, err := testStore(db, defaultTestStoreOptions(nil)).MaxDurationInQueue(context.Background())171	if err != nil {172		t.Fatalf("unexpected error getting max duration in queue: %s", err)173	}174	if age.Round(time.Second) != 0*time.Minute {175		t.Fatalf("unexpected max age. want=%s have=%s", 0*time.Minute, age)176	}177}178func TestStoreDequeueState(t *testing.T) {179	db := setupStoreTest(t)180	if _, err := db.ExecContext(context.Background(), `181		INSERT INTO workerutil_test (id, state, created_at)182		VALUES183			(1, 'queued', NOW() - '1 minute'::interval),184			(2, 'queued', NOW() - '2 minute'::interval),185			(3, 'state2', NOW() - '3 minute'::interval),186			(4, 'queued', NOW() - '4 minute'::interval),187			(5, 'state2', NOW() - '5 minute'::interval)188	`); err != nil {189		t.Fatalf("unexpected error inserting records: %s", err)190	}191	record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)192	assertDequeueRecordResult(t, 4, record, ok, err)193}194func TestStoreDequeueOrder(t *testing.T) {195	db := setupStoreTest(t)196	if _, err := db.ExecContext(context.Background(), `197		INSERT INTO workerutil_test (id, state, created_at)198		VALUES199			(1, 'queued', NOW() - '2 minute'::interval),200			(2, 'queued', NOW() - '5 minute'::interval),201			(3, 'queued', NOW() - '3 minute'::interval),202			(4, 'queued', NOW() - '1 minute'::interval),203			(5, 'queued', NOW() - '4 minute'::interval)204	`); err != nil {205		t.Fatalf("unexpected error inserting records: %s", err)206	}207	record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)208	assertDequeueRecordResult(t, 2, record, ok, err)209}210func TestStoreDequeueConditions(t *testing.T) {211	db := setupStoreTest(t)212	if _, err := db.ExecContext(context.Background(), `213		INSERT INTO 
workerutil_test (id, state, created_at)214		VALUES215			(1, 'queued', NOW() - '1 minute'::interval),216			(2, 'queued', NOW() - '2 minute'::interval),217			(3, 'queued', NOW() - '3 minute'::interval),218			(4, 'queued', NOW() - '4 minute'::interval),219			(5, 'queued', NOW() - '5 minute'::interval)220	`); err != nil {221		t.Fatalf("unexpected error inserting records: %s", err)222	}223	conditions := []*sqlf.Query{sqlf.Sprintf("workerutil_test.id < 4")}224	record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", conditions)225	assertDequeueRecordResult(t, 3, record, ok, err)226}227func TestStoreDequeueResetExecutionLogs(t *testing.T) {228	db := setupStoreTest(t)229	if _, err := db.ExecContext(context.Background(), `230		INSERT INTO workerutil_test (id, state, execution_logs, created_at)231		VALUES232			(1, 'queued', E'{"{\\"key\\": \\"test\\"}"}', NOW() - '1 minute'::interval)233	`); err != nil {234		t.Fatalf("unexpected error inserting records: %s", err)235	}236	record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)237	assertDequeueRecordResult(t, 1, record, ok, err)238	assertDequeueRecordResultLogCount(t, 0, record)239}240func TestStoreDequeueDelay(t *testing.T) {241	db := setupStoreTest(t)242	if _, err := db.ExecContext(context.Background(), `243		INSERT INTO workerutil_test (id, state, created_at, process_after)244		VALUES245			(1, 'queued', NOW() - '1 minute'::interval, NULL),246			(2, 'queued', NOW() - '2 minute'::interval, NULL),247			(3, 'queued', NOW() - '3 minute'::interval, NOW() + '2 minute'::interval),248			(4, 'queued', NOW() - '4 minute'::interval, NULL),249			(5, 'queued', NOW() - '5 minute'::interval, NOW() + '1 minute'::interval)250	`); err != nil {251		t.Fatalf("unexpected error inserting records: %s", err)252	}253	record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)254	assertDequeueRecordResult(t, 4, 
record, ok, err)
}

// TestStoreDequeueView dequeues through a store configured with the view
// "workerutil_test_view v" and a condition on v.new_field, and expects the
// view-scanned result to carry id 2 and value 14.
func TestStoreDequeueView(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, created_at)
		VALUES
			(1, 'queued', NOW() - '1 minute'::interval),
			(2, 'queued', NOW() - '2 minute'::interval),
			(3, 'queued', NOW() - '3 minute'::interval),
			(4, 'queued', NOW() - '4 minute'::interval),
			(5, 'queued', NOW() - '5 minute'::interval)
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	options := defaultTestStoreOptions(nil)
	options.ViewName = "workerutil_test_view v"
	options.Scan = testScanFirstRecordView
	options.OrderByExpression = sqlf.Sprintf("v.created_at")
	options.ColumnExpressions = []*sqlf.Query{
		sqlf.Sprintf("v.id"),
		sqlf.Sprintf("v.state"),
		sqlf.Sprintf("v.new_field"),
	}
	// The condition references the alias declared in ViewName.
	conditions := []*sqlf.Query{sqlf.Sprintf("v.new_field < 15")}
	record, ok, err := testStore(db, options).Dequeue(context.Background(), "test", conditions)
	assertDequeueRecordViewResult(t, 2, 14, record, ok, err)
}

// TestStoreDequeueConcurrent dequeues three times from a queue holding two
// records. The first two calls must yield records 1 and 2, and the third must
// report that nothing is available.
func TestStoreDequeueConcurrent(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, created_at)
		VALUES
			(1, 'queued', NOW() - '2 minute'::interval),
			(2, 'queued', NOW() - '1 minute'::interval)
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	store := testStore(db, defaultTestStoreOptions(nil))
	// Worker A
	record1, ok, err := store.Dequeue(context.Background(), "test", nil)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if !ok {
		t.Fatalf("expected a dequeueable record")
	}
	// Worker B
	record2, ok, err := store.Dequeue(context.Background(), "test", nil)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if !ok {
		t.Fatalf("expected a second dequeueable record")
	}
	if val := record1.(TestRecord).ID; val != 1 {
		t.Errorf("unexpected id. want=%d have=%d", 1, val)
	}
	if val := record2.(TestRecord).ID; val != 2 {
		t.Errorf("unexpected id. want=%d have=%d", 2, val)
	}
	// Worker C
	_, ok, err = store.Dequeue(context.Background(), "test", nil)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if ok {
		t.Fatalf("did not expect a third dequeueable record")
	}
}

// TestStoreDequeueRetryAfter configures MaxNumRetries = 5 and a five minute
// RetryAfter, then expects errored record 1 and queued record 4 to be
// dequeued, in that order, and nothing else.
func TestStoreDequeueRetryAfter(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, finished_at, failure_message, num_failures, created_at)
		VALUES
			(1, 'errored', NOW() - '6 minute'::interval, 'error', 3, NOW() - '2 minutes'::interval),
			(2, 'errored', NOW() - '4 minute'::interval, 'error', 0, NOW() - '3 minutes'::interval),
			(3, 'errored', NOW() - '6 minute'::interval, 'error', 5, NOW() - '4 minutes'::interval),
			(4, 'queued',                          NULL,    NULL, 0, NOW() - '1 minutes'::interval)
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	options := defaultTestStoreOptions(nil)
	options.Scan = testScanFirstRecordRetry
	options.MaxNumRetries = 5
	options.RetryAfter = 5 * time.Minute
	options.ColumnExpressions = []*sqlf.Query{
		sqlf.Sprintf("workerutil_test.id"),
		sqlf.Sprintf("workerutil_test.state"),
		sqlf.Sprintf("workerutil_test.num_resets"),
	}
	store := testStore(db, options)
	// Dequeue errored record
	record1, ok, err := store.Dequeue(context.Background(), "test", nil)
	assertDequeueRecordRetryResult(t, 1, record1, ok, err)
	// Dequeue non-errored record
	record2, ok, err := store.Dequeue(context.Background(), "test", nil)
	assertDequeueRecordRetryResult(t, 4, record2, ok, err)
	// Does not dequeue old or max retried errored. The error is checked
	// explicitly so that a failing query is not mistaken for an empty queue.
	_, ok, err = store.Dequeue(context.Background(), "test", nil)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if ok {
		t.Fatalf("did not expect a third dequeueable record")
	}
}

// TestStoreDequeueRetryAfterDisabled uses the same fixture as
// TestStoreDequeueRetryAfter but sets RetryAfter to zero, and expects only the
// queued record 4 to be dequeued.
func TestStoreDequeueRetryAfterDisabled(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, finished_at, failure_message, num_failures, created_at)
		VALUES
			(1, 'errored', NOW() - '6 minute'::interval, 'error', 3, NOW() - '2 minutes'::interval),
			(2, 'errored', NOW() - '4 minute'::interval, 'error', 0, NOW() - '3 minutes'::interval),
			(3, 'errored', NOW() - '6 minute'::interval, 'error', 5, NOW() - '4 minutes'::interval),
			(4, 'queued',                          NULL,    NULL, 0, NOW() - '1 minutes'::interval)
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	options := defaultTestStoreOptions(nil)
	options.Scan = testScanFirstRecordRetry
	options.MaxNumRetries = 5
	options.RetryAfter = 0
	options.ColumnExpressions = []*sqlf.Query{
		sqlf.Sprintf("workerutil_test.id"),
		sqlf.Sprintf("workerutil_test.state"),
		sqlf.Sprintf("workerutil_test.num_resets"),
	}
	store := testStore(db, options)
	// Dequeue non-errored record only
	record2, ok, err := store.Dequeue(context.Background(), "test", nil)
	assertDequeueRecordRetryResult(t, 4, record2, ok, err)
	// Does not dequeue errored. The error is checked explicitly so that a
	// failing query is not mistaken for an empty queue.
	_, ok, err = store.Dequeue(context.Background(), "test", nil)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if ok {
		t.Fatalf("did not expect a second dequeueable record")
	}
}

// TestStoreRequeue requeues a processing record and checks that its state
// becomes "queued" and its process_after equals the requested time.
func TestStoreRequeue(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'processing')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	after := testNow().Add(time.Hour)
	if err := testStore(db, defaultTestStoreOptions(nil)).Requeue(context.Background(), 1, after); err != nil {
		t.Fatalf("unexpected error requeueing record: %s", err)
	}
	rows, err := db.QueryContext(context.Background(), `SELECT state, process_after FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(rows, nil) }()
	if !rows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var processAfter *time.Time
	if err := rows.Scan(&state, &processAfter); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "queued" {
		t.Errorf("unexpected state. want=%q have=%q", "queued", state)
	}
	if processAfter == nil || !processAfter.Equal(after) {
		t.Errorf("unexpected process after. want=%s have=%s", after, processAfter)
	}
}

// TestStoreAddExecutionLogEntry appends five log entries to one record and
// checks both the returned entry ids (1 through 5) and the stored payloads.
func TestStoreAddExecutionLogEntry(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'processing')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	numEntries := 5
	for i := 0; i < numEntries; i++ {
		command := []string{"ls", "-a", fmt.Sprintf("%d", i+1)}
		payload := fmt.Sprintf("<load payload %d>", i+1)
		entry := workerutil.ExecutionLogEntry{
			Command: command,
			Out:     payload,
		}
		entryID, err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry, ExecutionLogEntryOptions{})
		if err != nil {
			t.Fatalf("unexpected error adding executor log entry: %s", err)
		}
		// PostgreSQL's arrays use 1-based indexing, so the first entry is at 1
		if entryID != i+1 {
			t.Fatalf("executor log entry has wrong entry id. want=%d, have=%d", i+1, entryID)
		}
	}
	contents, err := basestore.ScanStrings(db.QueryContext(context.Background(), `SELECT unnest(execution_logs)::text FROM workerutil_test WHERE id = 1`))
	if err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if len(contents) != numEntries {
		t.Fatalf("unexpected number of payloads. want=%d have=%d", numEntries, len(contents))
	}
	for i := 0; i < numEntries; i++ {
		var entry workerutil.ExecutionLogEntry
		if err := json.Unmarshal([]byte(contents[i]), &entry); err != nil {
			t.Fatalf("unexpected error decoding entry: %s", err)
		}
		expected := workerutil.ExecutionLogEntry{
			Command: []string{"ls", "-a", fmt.Sprintf("%d", i+1)},
			Out:     fmt.Sprintf("<load payload %d>", i+1),
		}
		if diff := cmp.Diff(expected, entry); diff != "" {
			t.Errorf("unexpected entry (-want +got):\n%s", diff)
		}
	}
}

// TestStoreAddExecutionLogEntryNoRecord expects an error when a log entry is
// added for a record id that was never inserted.
func TestStoreAddExecutionLogEntryNoRecord(t *testing.T) {
	db := setupStoreTest(t)
	entry := workerutil.ExecutionLogEntry{
		Command: []string{"ls", "-a"},
		Out:     "output",
	}
	_, err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry, ExecutionLogEntryOptions{})
	if err == nil {
		t.Fatalf("expected error but got none")
	}
}

// TestStoreUpdateExecutionLogEntry adds five log entries, rewrites each one
// right after it is added, and checks that the stored payloads reflect the
// updated output.
func TestStoreUpdateExecutionLogEntry(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'processing')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	numEntries := 5
	for i := 0; i < numEntries; i++ {
		command := []string{"ls", "-a", fmt.Sprintf("%d", i+1)}
		payload := fmt.Sprintf("<load payload %d>", i+1)
		entry := workerutil.ExecutionLogEntry{
			Command: command,
			Out:     payload,
		}
		entryID, err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry, ExecutionLogEntryOptions{})
		if err != nil {
			t.Fatalf("unexpected error adding executor log entry: %s", err)
		}
		// PostgreSQL's arrays use 1-based indexing, so the first entry is at 1
		if entryID != i+1 {
			t.Fatalf("executor log entry has wrong entry id. want=%d, have=%d", i+1, entryID)
		}
		entry.Out += fmt.Sprintf("\n<load payload %d again, nobody was at home>", i+1)
		if err := testStore(db, defaultTestStoreOptions(nil)).UpdateExecutionLogEntry(context.Background(), 1, entryID, entry, ExecutionLogEntryOptions{}); err != nil {
			t.Fatalf("unexpected error updating executor log entry: %s", err)
		}
	}
	contents, err := basestore.ScanStrings(db.QueryContext(context.Background(), `SELECT unnest(execution_logs)::text FROM workerutil_test WHERE id = 1`))
	if err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if len(contents) != numEntries {
		t.Fatalf("unexpected number of payloads. want=%d have=%d", numEntries, len(contents))
	}
	for i := 0; i < numEntries; i++ {
		var entry workerutil.ExecutionLogEntry
		if err := json.Unmarshal([]byte(contents[i]), &entry); err != nil {
			t.Fatalf("unexpected error decoding entry: %s", err)
		}
		expected := workerutil.ExecutionLogEntry{
			Command: []string{"ls", "-a", fmt.Sprintf("%d", i+1)},
			Out:     fmt.Sprintf("<load payload %d>\n<load payload %d again, nobody was at home>", i+1, i+1),
		}
		if diff := cmp.Diff(expected, entry); diff != "" {
			t.Errorf("unexpected entry (-want +got):\n%s", diff)
		}
	}
}

// TestStoreUpdateExecutionLogEntryUnknownEntry expects an error when updating
// entry ids 0 and 1 on a record that has no log entries.
func TestStoreUpdateExecutionLogEntryUnknownEntry(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'processing')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	entry := workerutil.ExecutionLogEntry{
		Command: []string{"ls", "-a"},
		Out:     "<load payload>",
	}
	for unknownEntryID := 0; unknownEntryID < 2; unknownEntryID++ {
		err := testStore(db, defaultTestStoreOptions(nil)).UpdateExecutionLogEntry(context.Background(), 1, unknownEntryID, entry, ExecutionLogEntryOptions{})
		if err == nil {
			t.Fatal("expected error but got none")
		}
	}
}

// TestStoreMarkComplete marks a processing record as complete and checks that
// its state becomes "completed" with no failure message.
func TestStoreMarkComplete(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'processing')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1, MarkFinalOptions{})
	if err != nil {
		t.Fatalf("unexpected error marking record as completed: %s", err)
	}
	if !marked {
		t.Fatalf("expected record to be marked")
	}
	rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(rows, nil) }()
	if !rows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var failureMessage *string
	if err := rows.Scan(&state, &failureMessage); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "completed" {
		t.Errorf("unexpected state. want=%q have=%q", "completed", state)
	}
	if failureMessage != nil {
		t.Errorf("unexpected failure message. want=%v have=%v", nil, failureMessage)
	}
}

// TestStoreMarkCompleteNotProcessing calls MarkComplete on an errored record
// and checks that it is reported as not marked and left unchanged.
func TestStoreMarkCompleteNotProcessing(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, failure_message)
		VALUES
			(1, 'errored', 'old message')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1, MarkFinalOptions{})
	if err != nil {
		t.Fatalf("unexpected error marking record as completed: %s", err)
	}
	if marked {
		t.Fatalf("expected record not to be marked")
	}
	rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(rows, nil) }()
	if !rows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var failureMessage *string
	if err := rows.Scan(&state, &failureMessage); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "errored" {
		t.Errorf("unexpected state. want=%q have=%q", "errored", state)
	}
	if failureMessage == nil || *failureMessage != "old message" {
		t.Errorf("unexpected failure message. want=%v have=%v", "old message", failureMessage)
	}
}

// TestStoreMarkErrored marks a processing record as errored and checks the
// resulting state and failure message.
func TestStoreMarkErrored(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'processing')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})
	if err != nil {
		t.Fatalf("unexpected error marking record as errored: %s", err)
	}
	if !marked {
		t.Fatalf("expected record to be marked")
	}
	rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(rows, nil) }()
	if !rows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var failureMessage *string
	if err := rows.Scan(&state, &failureMessage); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "errored" {
		t.Errorf("unexpected state. want=%q have=%q", "errored", state)
	}
	if failureMessage == nil || *failureMessage != "new message" {
		t.Errorf("unexpected failure message. want=%v have=%v", "new message", failureMessage)
	}
}

// TestStoreMarkFailed marks a processing record as failed and checks the
// resulting state and failure message.
func TestStoreMarkFailed(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'processing')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkFailed(context.Background(), 1, "new message", MarkFinalOptions{})
	if err != nil {
		t.Fatalf("unexpected error marking record as failed: %s", err)
	}
	if !marked {
		t.Fatalf("expected record to be marked")
	}
	rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(rows, nil) }()
	if !rows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var failureMessage *string
	if err := rows.Scan(&state, &failureMessage); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "failed" {
		t.Errorf("unexpected state. want=%q have=%q", "failed", state)
	}
	if failureMessage == nil || *failureMessage != "new message" {
		t.Errorf("unexpected failure message. want=%v have=%v", "new message", failureMessage)
	}
}

// TestStoreMarkErroredAlreadyCompleted calls MarkErrored on a completed record
// and checks that it is reported as not marked and left unchanged.
func TestStoreMarkErroredAlreadyCompleted(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state)
		VALUES
			(1, 'completed')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})
	if err != nil {
		t.Fatalf("unexpected error marking record as errored: %s", err)
	}
	if marked {
		t.Fatalf("expected record not to be marked errored")
	}
	rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(rows, nil) }()
	if !rows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var failureMessage *string
	if err := rows.Scan(&state, &failureMessage); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "completed" {
		t.Errorf("unexpected state. want=%q have=%q", "completed", state)
	}
	if failureMessage != nil {
		t.Errorf("unexpected non-empty failure message")
	}
}

// TestStoreMarkErroredAlreadyErrored calls MarkErrored on a record that is
// already errored and checks that the old failure message is kept.
func TestStoreMarkErroredAlreadyErrored(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, failure_message)
		VALUES
			(1, 'errored', 'old message')
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})
	if err != nil {
		t.Fatalf("unexpected error marking record as errored: %s", err)
	}
	if marked {
		t.Fatalf("expected record not to be marked")
	}
	rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(rows, nil) }()
	if !rows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var failureMessage *string
	if err := rows.Scan(&state, &failureMessage); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "errored" {
		t.Errorf("unexpected state. want=%q have=%q", "errored", state)
	}
	if failureMessage == nil || *failureMessage != "old message" {
		t.Errorf("unexpected failure message. want=%v have=%v", "old message", failureMessage)
	}
}

// TestStoreMarkErroredRetriesExhausted sets MaxNumRetries = 2 and marks two
// processing records as errored. The record with no prior failures must end up
// "errored" and the record with one prior failure must end up "failed".
func TestStoreMarkErroredRetriesExhausted(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, num_failures)
		VALUES
			(1, 'processing', 0),
			(2, 'processing', 1)
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	options := defaultTestStoreOptions(nil)
	options.MaxNumRetries = 2
	store := testStore(db, options)
	for i := 1; i < 3; i++ {
		marked, err := store.MarkErrored(context.Background(), i, "new message", MarkFinalOptions{})
		if err != nil {
			t.Fatalf("unexpected error marking record as errored: %s", err)
		}
		if !marked {
			t.Fatalf("expected record to be marked")
		}
	}
	// assertState reads the state column of the given record and compares it
	// with wantState.
	assertState := func(id int, wantState string) {
		q := fmt.Sprintf(`SELECT state FROM workerutil_test WHERE id = %d`, id)
		rows, err := db.QueryContext(context.Background(), q)
		if err != nil {
			t.Fatalf("unexpected error querying record: %s", err)
		}
		defer func() { _ = basestore.CloseRows(rows, nil) }()
		if !rows.Next() {
			t.Fatal("expected record to exist")
		}
		var state string
		if err := rows.Scan(&state); err != nil {
			t.Fatalf("unexpected error scanning record: %s", err)
		}
		if state != wantState {
			t.Errorf("record %d unexpected state. want=%q have=%q", id, wantState, state)
		}
	}
	assertState(1, "errored")
	assertState(2, "failed")
}

// TestStoreResetStalled runs ResetStalled over seven processing records, one
// of which (id 5) is row-locked by a separate transaction. It expects ids 1
// and 4 to be reset, ids 6 and 7 to be errored, record 1 to be queued with
// num_resets = 2, and record 6 to be in state "failed".
func TestStoreResetStalled(t *testing.T) {
	db := setupStoreTest(t)
	if _, err := db.ExecContext(context.Background(), `
		INSERT INTO workerutil_test (id, state, last_heartbeat_at, num_resets)
		VALUES
			(1, 'processing', NOW() - '6 second'::interval, 1),
			(2, 'processing', NOW() - '2 second'::interval, 0),
			(3, 'processing', NOW() - '3 second'::interval, 0),
			(4, 'processing', NOW() - '8 second'::interval, 0),
			(5, 'processing', NOW() - '8 second'::interval, 0),
			(6, 'processing', NOW() - '6 second'::interval, 5),
			(7, 'processing', NOW() - '8 second'::interval, 5)
	`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	tx, err := db.BeginTx(context.Background(), nil)
	if err != nil {
		t.Fatal(err)
	}
	defer func() { _ = tx.Rollback() }()
	// Row lock record 5 in a transaction which should be skipped by ResetStalled
	if _, err := tx.Exec(`SELECT * FROM workerutil_test WHERE id = 5 FOR UPDATE`); err != nil {
		t.Fatal(err)
	}
	resetLastHeartbeatsByIDs, erroredLastHeartbeatsByIDs, err := testStore(db, defaultTestStoreOptions(nil)).ResetStalled(context.Background())
	if err != nil {
		t.Fatalf("unexpected error resetting stalled records: %s", err)
	}
	// Map iteration order is random, so sort the ids before comparing.
	var resetIDs []int
	for id := range resetLastHeartbeatsByIDs {
		resetIDs = append(resetIDs, id)
	}
	sort.Ints(resetIDs)
	var erroredIDs []int
	for id := range erroredLastHeartbeatsByIDs {
		erroredIDs = append(erroredIDs, id)
	}
	sort.Ints(erroredIDs)
	if diff := cmp.Diff([]int{1, 4}, resetIDs); diff != "" {
		t.Errorf("unexpected reset ids (-want +got):\n%s", diff)
	}
	if diff := cmp.Diff([]int{6, 7}, erroredIDs); diff != "" {
		t.Errorf("unexpected errored ids (-want +got):\n%s", diff)
	}
	resetRows, err := db.QueryContext(context.Background(), `SELECT state, num_resets FROM workerutil_test WHERE id = 1`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(resetRows, nil) }()
	if !resetRows.Next() {
		t.Fatal("expected record to exist")
	}
	var state string
	var numResets int
	if err := resetRows.Scan(&state, &numResets); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "queued" {
		t.Errorf("unexpected state. want=%q have=%q", "queued", state)
	}
	if numResets != 2 {
		t.Errorf("unexpected num resets. want=%d have=%d", 2, numResets)
	}
	// Each result set lives in its own variable: the deferred closures read
	// the variable when they run, so sharing one variable between the two
	// queries would leave the first result set unclosed.
	failedRows, err := db.QueryContext(context.Background(), `SELECT state FROM workerutil_test WHERE id = 6`)
	if err != nil {
		t.Fatalf("unexpected error querying record: %s", err)
	}
	defer func() { _ = basestore.CloseRows(failedRows, nil) }()
	if !failedRows.Next() {
		t.Fatal("expected record to exist")
	}
	if err := failedRows.Scan(&state); err != nil {
		t.Fatalf("unexpected error scanning record: %s", err)
	}
	if state != "failed" {
		t.Errorf("unexpected state. want=%q have=%q", "failed", state)
	}
}

// TestStoreHeartbeat drives Heartbeat with a mock clock and compares, after
// each call, the per-record durations produced by
// scanLastHeartbeatTimestampsFrom against the expected values.
func TestStoreHeartbeat(t *testing.T) {
	db := setupStoreTest(t)
	now := time.Unix(1587396557, 0).UTC()
	clock := glock.NewMockClockAt(now)
	store := testStore(db, defaultTestStoreOptions(clock))
	if err := store.Exec(context.Background(), sqlf.Sprintf(`
		INSERT INTO workerutil_test (id, state, worker_hostname, last_heartbeat_at)
		VALUES
			(1, 'queued', 'worker1', %s),
			(2, 'queued', 'worker1', %s),
			(3, 'queued', 'worker2', %s)
	`, now, now, now)); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}
	// readAndCompareTimes scans every record's last heartbeat relative to the
	// mock clock's current time and diffs the result against expected.
	readAndCompareTimes := func(expected map[int]time.Duration) {
		times, err := scanLastHeartbeatTimestampsFrom(clock.Now())(store.Query(context.Background(), sqlf.Sprintf(`
			SELECT id, last_heartbeat_at FROM workerutil_test
		`)))
		if err != nil {
			t.Fatalf("unexpected error scanning heartbeats: %s", err)
		}
		if diff := cmp.Diff(expected, times); diff != "" {
			t.Errorf("unexpected times (-want +got):\n%s", diff)
		}
	}
	clock.Advance(5 * time.Second)
	if _, err := store.Heartbeat(context.Background(), []int{1, 2, 3}, HeartbeatOptions{}); err != nil {
		t.Fatalf("unexpected error updating heartbeat: %s", err)
	}
	readAndCompareTimes(map[int]time.Duration{
		1: 5 * time.Second, // not updated, clock advanced 5s from start; note state='queued'
		2: 5 * time.Second, // not updated, clock advanced 5s from start; note state='queued'
		3: 5 * time.Second, // not updated, clock advanced 5s from start; note state='queued'
	})
	// Now update state to processing and expect it to update properly.
	if _, err := db.ExecContext(context.Background(), `UPDATE workerutil_test SET state = 'processing'`); err != nil {
		t.Fatalf("unexpected error updating records: %s", err)
	}
	clock.Advance(5 * time.Second)
	// Only one worker
	if _, err := store.Heartbeat(context.Background(), []int{1, 2, 3}, HeartbeatOptions{WorkerHostname: "worker1"}); err != nil {
		t.Fatalf("unexpected error updating heartbeat: %s", err)
	}
	readAndCompareTimes(map[int]time.Duration{
		1: 0,                // updated
		2: 0,                // updated
		3: 10 * time.Second, // not updated, clock advanced 10s from start; note worker_hostname=worker2
	})
	clock.Advance(5 * time.Second)
	// Multiple workers
	if _, err := store.Heartbeat(context.Background(), []int{1, 3}, HeartbeatOptions{}); err != nil {
		t.Fatalf("unexpected error updating heartbeat: %s", err)
	}
	readAndCompareTimes(map[int]time.Duration{
		1: 0,               // updated
		2: 5 * time.Second, // not in known ID list
		3: 0,               // updated
	})
}
...

Full Screen

Full Screen

bulk_processor_test.go

Source:bulk_processor_test.go Github

copy

Full Screen

...
)

// TestBulkProcessor exercises bulkProcessor.Process once per changeset job
// type (unknown, comment, detach, reenqueue, merge, close, publish) against a
// store bound to a test transaction and a fake changeset source.
func TestBulkProcessor(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	sqlDB := dbtest.NewDB(t)
	tx := dbtest.NewTx(t, sqlDB)
	db := database.NewDB(sqlDB)
	bstore := store.New(database.NewDBWith(basestore.NewWithHandle(basestore.NewHandleWithTx(tx, sql.TxOptions{}))), &observation.TestContext, nil)
	user := ct.CreateTestUser(t, db, true)
	repo, _ := ct.CreateTestRepo(t, ctx, db)
	ct.CreateTestSiteCredential(t, bstore, repo)
	batchSpec := ct.CreateBatchSpec(t, ctx, bstore, "test-bulk", user.ID)
	batchChange := ct.CreateBatchChange(t, ctx, bstore, "test-bulk", user.ID, batchSpec.ID)
	changesetSpec := ct.CreateChangesetSpec(t, ctx, bstore, ct.TestSpecOpts{
		User:      user.ID,
		Repo:      repo.ID,
		BatchSpec: batchSpec.ID,
		HeadRef:   "main",
	})
	// Shared fixture: most subtests below address this changeset by its ID.
	changeset := ct.CreateChangeset(t, ctx, bstore, ct.TestChangesetOpts{
		Repo:                repo.ID,
		BatchChanges:        []types.BatchChangeAssoc{{BatchChangeID: batchChange.ID}},
		Metadata:            &github.PullRequest{},
		ExternalServiceType: extsvc.TypeGitHub,
		CurrentSpec:         changesetSpec.ID,
	})
	t.Run("Unknown job type", func(t *testing.T) {
		fake := &sources.FakeChangesetSource{}
		bp := &bulkProcessor{
			tx:      bstore,
			sourcer: sources.NewFakeSourcer(nil, fake),
		}
		job := &types.ChangesetJob{JobType: types.ChangesetJobType("UNKNOWN")}
		err := bp.Process(ctx, job)
		if err == nil || err.Error() != `invalid job type "UNKNOWN"` {
			// %v rather than %s: err may be nil on this path.
			t.Fatalf("unexpected error returned %v", err)
		}
	})
	t.Run("changeset is processing", func(t *testing.T) {
		processingChangeset := ct.CreateChangeset(t, ctx, bstore, ct.TestChangesetOpts{
			Repo:                repo.ID,
			BatchChanges:        []types.BatchChangeAssoc{{BatchChangeID: batchChange.ID}},
			Metadata:            &github.PullRequest{},
			ExternalServiceType: extsvc.TypeGitHub,
			CurrentSpec:         changesetSpec.ID,
			ReconcilerState:     btypes.ReconcilerStateProcessing,
		})
		job := &types.ChangesetJob{
			// JobType doesn't matter but we need one for database validation
			JobType:     types.ChangesetJobTypeComment,
			ChangesetID: processingChangeset.ID,
			UserID:      user.ID,
		}
		if err := bstore.CreateChangesetJob(ctx, job); err != nil {
			t.Fatal(err)
		}
		bp := &bulkProcessor{tx: bstore}
		err := bp.Process(ctx, job)
		if err != changesetIsProcessingErr {
			// %v rather than %s: err may be nil on this path.
			t.Fatalf("unexpected error. want=%v, got=%v", changesetIsProcessingErr, err)
		}
	})
	t.Run("Comment job", func(t *testing.T) {
		fake := &sources.FakeChangesetSource{}
		bp := &bulkProcessor{
			tx:      bstore,
			sourcer: sources.NewFakeSourcer(nil, fake),
		}
		job := &types.ChangesetJob{
			JobType:     types.ChangesetJobTypeComment,
			ChangesetID: changeset.ID,
			UserID:      user.ID,
			Payload:     &btypes.ChangesetJobCommentPayload{},
		}
		if err := bstore.CreateChangesetJob(ctx, job); err != nil {
			t.Fatal(err)
		}
		err := bp.Process(ctx, job)
		if err != nil {
			t.Fatal(err)
		}
		if !fake.CreateCommentCalled {
			t.Fatal("expected CreateComment to be called but wasn't")
		}
	})
	t.Run("Detach job", func(t *testing.T) {
		fake := &sources.FakeChangesetSource{}
		bp := &bulkProcessor{
			tx:      bstore,
			sourcer: sources.NewFakeSourcer(nil, fake),
		}
		job := &types.ChangesetJob{
			JobType:       types.ChangesetJobTypeDetach,
			ChangesetID:   changeset.ID,
			UserID:        user.ID,
			BatchChangeID: batchChange.ID,
			Payload:       &btypes.ChangesetJobDetachPayload{},
		}
		err := bp.Process(ctx, job)
		if err != nil {
			t.Fatal(err)
		}
		ch, err := bstore.GetChangesetByID(ctx, changeset.ID)
		if err != nil {
			t.Fatal(err)
		}
		if len(ch.BatchChanges) != 1 {
			t.Fatalf("invalid batch changes associated, expected one, got=%+v", ch.BatchChanges)
		}
		if !ch.BatchChanges[0].Detach {
			t.Fatal("not marked as to be detached")
		}
		if ch.ReconcilerState != btypes.ReconcilerStateQueued {
			t.Fatalf("invalid reconciler state, got=%q", ch.ReconcilerState)
		}
	})
	t.Run("Reenqueue job", func(t *testing.T) {
		fake := &sources.FakeChangesetSource{}
		bp := &bulkProcessor{
			tx:      bstore,
			sourcer: sources.NewFakeSourcer(nil, fake),
		}
		job := &types.ChangesetJob{
			JobType:     types.ChangesetJobTypeReenqueue,
			ChangesetID: changeset.ID,
			UserID:      user.ID,
			Payload:     &btypes.ChangesetJobReenqueuePayload{},
		}
		changeset.ReconcilerState = btypes.ReconcilerStateFailed
		if err := bstore.UpdateChangeset(ctx, changeset); err != nil {
			t.Fatal(err)
		}
		err := bp.Process(ctx, job)
		if err != nil {
			t.Fatal(err)
		}
		// Reload into a local so that a failed lookup cannot replace the shared
		// fixture with nil for the subtests that follow.
		reloaded, err := bstore.GetChangesetByID(ctx, changeset.ID)
		if err != nil {
			t.Fatal(err)
		}
		if have, want := reloaded.ReconcilerState, btypes.ReconcilerStateQueued; have != want {
			t.Fatalf("unexpected reconciler state, have=%q want=%q", have, want)
		}
	})
	t.Run("Merge job", func(t *testing.T) {
		fake := &sources.FakeChangesetSource{}
		bp := &bulkProcessor{
			tx:      bstore,
			sourcer: sources.NewFakeSourcer(nil, fake),
		}
		job := &types.ChangesetJob{
			JobType:     types.ChangesetJobTypeMerge,
			ChangesetID: changeset.ID,
			UserID:      user.ID,
			Payload:     &btypes.ChangesetJobMergePayload{},
		}
		err := bp.Process(ctx, job)
		if err != nil {
			t.Fatal(err)
		}
		if !fake.MergeChangesetCalled {
			t.Fatal("expected MergeChangeset to be called but wasn't")
		}
	})
	t.Run("Close job", func(t *testing.T) {
		fake := &sources.FakeChangesetSource{FakeMetadata: &github.PullRequest{}}
		bp := &bulkProcessor{
			tx:      bstore,
			sourcer: sources.NewFakeSourcer(nil, fake),
		}
		job := &types.ChangesetJob{
			JobType:     types.ChangesetJobTypeClose,
			ChangesetID: changeset.ID,
			UserID:      user.ID,
			Payload:     &btypes.ChangesetJobClosePayload{},
		}
		err := bp.Process(ctx, job)
		if err != nil {
			t.Fatal(err)
		}
		if !fake.CloseChangesetCalled {
			t.Fatal("expected CloseChangeset to be called but wasn't")
		}
	})
	t.Run("Publish job", func(t *testing.T) {
		fake := &sources.FakeChangesetSource{FakeMetadata: &github.PullRequest{}}
		bp := &bulkProcessor{
			tx:      bstore,
			sourcer: sources.NewFakeSourcer(nil, fake),
		}
		t.Run("errors", func(t *testing.T) {
			// Every case must make Process fail; wantRetryable states whether
			// the returned error is expected to be retryable.
			for name, tc := range map[string]struct {
				spec          *ct.TestSpecOpts
				changeset     ct.TestChangesetOpts
				wantRetryable bool
			}{
				"imported changeset": {
					spec: nil,
					changeset: ct.TestChangesetOpts{
						Repo:            repo.ID,
						BatchChange:     batchChange.ID,
						CurrentSpec:     0,
						ReconcilerState: btypes.ReconcilerStateCompleted,
					},
					wantRetryable: false,
				},
				"bogus changeset spec ID, dude": {
					spec: nil,
					changeset: ct.TestChangesetOpts{
						Repo:            repo.ID,
						BatchChange:     batchChange.ID,
						CurrentSpec:     -1,
						ReconcilerState: btypes.ReconcilerStateCompleted,
					},
					wantRetryable: false,
				},
				"publication state set": {
					spec: &ct.TestSpecOpts{
						User:      user.ID,
						Repo:      repo.ID,
						BatchSpec: batchSpec.ID,
						HeadRef:   "main",
						Published: false,
					},
					changeset: ct.TestChangesetOpts{
						Repo:            repo.ID,
						BatchChange:     batchChange.ID,
						ReconcilerState: btypes.ReconcilerStateCompleted,
					},
					wantRetryable: false,
				},
			} {
				t.Run(name, func(t *testing.T) {
					var changesetSpec *btypes.ChangesetSpec
					if tc.spec != nil {
						changesetSpec = ct.CreateChangesetSpec(t, ctx, bstore, *tc.spec)
					}
					if changesetSpec != nil {
						tc.changeset.CurrentSpec = changesetSpec.ID
					}
					changeset := ct.CreateChangeset(t, ctx, bstore, tc.changeset)
					job := &types.ChangesetJob{
						JobType:       types.ChangesetJobTypePublish,
						BatchChangeID: batchChange.ID,
						ChangesetID:   changeset.ID,
						UserID:        user.ID,
						Payload: &types.ChangesetJobPublishPayload{
							Draft: false,
						},
					}
					if err := bp.Process(ctx, job); err == nil {
						t.Error("unexpected nil error")
					} else if tc.wantRetryable && errcode.IsNonRetryable(err) {
						t.Errorf("error is not retryable: %v", err)
					} else if !tc.wantRetryable && !errcode.IsNonRetryable(err) {
						t.Errorf("error is retryable: %v", err)
					}
				})
			}
		})
		t.Run("success", func(t *testing.T) {
			// Publishing must succeed from each of these reconciler states, both
			// as a draft and as a regular publication.
			for _, reconcilerState := range []btypes.ReconcilerState{
				btypes.ReconcilerStateCompleted,
				btypes.ReconcilerStateErrored,
				btypes.ReconcilerStateFailed,
				btypes.ReconcilerStateQueued,
				btypes.ReconcilerStateScheduled,
			} {
				t.Run(string(reconcilerState), func(t *testing.T) {
					for name, draft := range map[string]bool{
						"draft":     true,
						"published": false,
					} {
						t.Run(name, func(t *testing.T) {
							changesetSpec := ct.CreateChangesetSpec(t, ctx, bstore, ct.TestSpecOpts{
								User:      user.ID,
								Repo:      repo.ID,
								BatchSpec: batchSpec.ID,
								HeadRef:   "main",
							})
							changeset := ct.CreateChangeset(t, ctx, bstore, ct.TestChangesetOpts{
								Repo:            repo.ID,
								BatchChange:     batchChange.ID,
								CurrentSpec:     changesetSpec.ID,
								ReconcilerState: reconcilerState,
							})
							job := &types.ChangesetJob{
								JobType:       types.ChangesetJobTypePublish,
								BatchChangeID: batchChange.ID,
								ChangesetID:   changeset.ID,
								UserID:        user.ID,
								Payload: &types.ChangesetJobPublishPayload{
									Draft: draft,
								},
							}
							if err := bp.Process(ctx, job); err != nil {
								t.Errorf("unexpected error: %v", err)
							}
							changeset, err := bstore.GetChangesetByID(ctx, changeset.ID)
							if err != nil {
								t.Fatal(err)
							}
							var want btypes.ChangesetUiPublicationState
							if draft {
								want = btypes.ChangesetUiPublicationStateDraft
							} else {
								want = btypes.ChangesetUiPublicationStatePublished
							}
							if have := changeset.UiPublicationState; have == nil || *have != want {
								t.Fatalf("unexpected UI publication state: have=%v want=%q", have, want)
							}
							if have, want := changeset.ReconcilerState, global.DefaultReconcilerEnqueueState(); have != want {
								t.Fatalf("unexpected reconciler state, have=%q want=%q", have, want)
							}
						})
					}
				})
			}
		})
	})
}
...

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

import Quick
import Nimble

/// Minimal Quick spec named `D`.
///
/// Declares one example group ("D") containing one example ("D"). The class is
/// declared exactly once: repeating the same `class D` declaration in a single
/// file is an invalid redeclaration and does not compile.
class D: QuickSpec {
    override func spec() {
        describe("D") {
            it("D") {
                // Placeholder expectation: compares the literal 1 with itself,
                // so the example passes whenever the spec is run.
                expect(1).to(equal(1))
            }
        }
    }
}

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

1import QuickStart2var d = D()3d.doSomething()4import QuickStart5var e = E()6e.doSomething()7import QuickStart8var f = F()9f.doSomething()10import QuickStart11var g = G()12g.doSomething()13import QuickStart14var h = H()15h.doSomething()16import QuickStart17var i = I()18i.doSomething()19import QuickStart20var j = J()21j.doSomething()22import QuickStart23var k = K()24k.doSomething()25import QuickStart26var l = L()27l.doSomething()28import QuickStart29var m = M()30m.doSomething()31import QuickStart32var n = N()33n.doSomething()34import QuickStart35var o = O()36o.doSomething()37import QuickStart38var p = P()39p.doSomething()40import QuickStart41var q = Q()42q.doSomething()43import QuickStart44var r = R()45r.doSomething()46import QuickStart47var s = S()48s.doSomething()49import QuickStart50var t = T()51t.doSomething()

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

1import Quick2import Nimble3import Foundation4class D: QuickSpec {5    override func spec() {6        describe("D") {7            it("has a test") {8                expect(true).to(beTrue())9            }10        }11    }12}13import Quick14import Nimble15import Foundation16class C: QuickSpec {17    override func spec() {18        describe("C") {19            it("has a test") {20                expect(true).to(beTrue())21            }22        }23    }24}25import Quick26import Nimble27import Foundation28class B: QuickSpec {29    override func spec() {30        describe("B") {31            it("has a test") {32                expect(true).to(beTrue())33            }34        }35    }36}37import Quick38import Nimble39import Foundation40class A: QuickSpec {41    override func spec() {42        describe("A") {43            it("has a test") {44                expect(true).to(beTrue())45            }46        }47    }48}49import Quick50import Nimble51import Foundation52class E: QuickSpec {53    override func spec() {54        describe("E") {55            it("has a test") {56                expect(true).to(beTrue())57            }58        }59    }60}61import Quick62import Nimble63import Foundation64class F: QuickSpec {65    override func spec() {66        describe("F") {67            it("has a test") {68                expect(true).to(beTrue())69            }70        }71    }72}73import Quick74import Nimble75import Foundation76class G: QuickSpec {77    override func spec() {78        describe("G") {79            it("has a test") {80                expect(true).to(beTrue())81            }82        }83    }84}85import Quick86import Nimble87import Foundation88class H: QuickSpec {89    override func spec() {90        describe("H") {

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

1import QuickLook2var d = D()3import QuickLookUI4var d = D()5import QuickLookUI6var d = D()7import QuickLook8var d = D()9error: ambiguous use of 'D()'10var d = D()11clang: error: linker command failed with exit code 1 (use -v to see invocation)12I'm trying to use the QuickLook framework in my iOS app. I've added the framework to my project and I've added the #import <QuickLook/QuickLook.h> to my header file. However, when I try to build my project, I get the following error

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

1import QuickLook2let d = D()3d.doSomething()4import QuickLook5let d = D()6d.doSomething()7import QuickLook8import QuickLook9class MyTableViewCell: UITableViewCell {10    var myString: String? {11        didSet {12        }13    }14    override func awakeFromNib() {15        super.awakeFromNib()16    }17    override func setSelected(_ selected: Bool, animated: Bool) {18        super.setSelected(selected, animated: animated)19    }20}21class ViewController: UIViewController, UITableViewDataSource, UITableViewDelegate {22    override func viewDidLoad() {23        super.viewDidLoad()24        tableView.register(UINib(nibName: "MyTableViewCell", bundle: nil), forCellReuseIdentifier: "MyTableViewCell")25    }26    override func didReceiveMemoryWarning() {27        super.didReceiveMemoryWarning()28    }29    func tableView(_ tableView: UITableView, numberOfRowsInSection section: Int) -> Int {30    }31    func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell {32        let cell = tableView.dequeueReusableCell(withIdentifier: "MyTableViewCell", for: indexPath) as! MyTableViewCell33    }34}

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

1import QuickType2let d = D()3d.test()4import QuickType5let d = D()6d.test()7import QuickType8let d = D()9d.test()10import QuickType11let d = D()12d.test()13import QuickType14let d = D()15d.test()16import QuickType

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

1let d = D()2print("d.property1 = \(d.property1)")3print("d.property2 = \(d.property2)")4print("d.property3 = \(d.property3)")5print("d.property4 = \(d.property4)")6print("d.property5 = \(d.property5)")7print("d.property6 = \(d.property6)")8print("d.property7 = \(d.property7)")9print("d.property8 = \(d.property8)")10print("d.property9 = \(d.property9)")11print("d.property10 = \(d.property10)")12print("d.property11 = \(d.property11)")13print("d.property12 = \(d.property12)")14print("d.property13 = \(d.property13)")15print("d.property14 = \(d.property14)")16print("d.property15 = \(d.property15)")17print("d.property16 = \(d.property16)")18print("d.property17 = \(d.property17)")19print("d.property18 = \(d.property18)")20print("d.property19 = \(d.property19)")21print("d.property20 = \(d.property20)")22print("d.property21 = \(d.property21)")23print("d.property22 = \(d.property22)")24print("d.property23 = \(d.property23)")25print("d.property24 = \(d.property24)")26print("d.property25 = \(d.property25)")27print("d.property26 = \(d.property26)")28print("d.property27 = \(d.property27)")29print("d.property28 = \(d.property28)")30print("d.property29 = \(d.property29)")31print("d.property30 = \(d.property30)")32print("d.property31 = \(d.property31)")33print("d.property32 = \(d.property32)")34print("d.property33 = \(d.property33)")35print("d.property34 = \(d.property34)")36print("d.property35 = \(d.property35)")37print("d.property36 = \(d.property36)")38print("d.property37 = \(d.property37)")39print("d.property38 = \(d.property38)")40print("d.property39 = \(d.property39)")41print("d.property40 = \(d.property40)")42print("d.property41 = \(d.property41)")43print("d.property42 = \(d.property42

Full Screen

Full Screen

D

Using AI Code Generation

copy

Full Screen

1import Foundation2let sorted = D.quickSort(list)3print(sorted)4import Foundation5let sorted = C.quickSort(list)6print(sorted)7import Foundation8let sorted = B.quickSort(list)9print(sorted)10import Foundation11let sorted = A.quickSort(list)12print(sorted)13import Foundation14let sorted = QuickSort.quickSort(list)15print(sorted)16import Foundation17let sorted = QuickSort.quickSort(list)18print(sorted)19import Foundation20let sorted = QuickSort.quickSort(list)21print(sorted)22import Foundation23let sorted = QuickSort.quickSort(list)24print(sorted)25import Foundation

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Quick automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful