How to use the Drain method of the internal package

Best Ginkgo code snippet using internal.Drain

processorsbase.go

Source: processorsbase.go (GitHub)


...
	if outRow == nil {
		if ok {
			return NeedMoreRows, nil
		}
		return DrainRequested, nil
	}
	if log.V(3) {
		log.InfofDepth(ctx, 1, "pushing row %s", outRow.String(h.OutputTypes))
	}
	if r := output.Push(outRow, nil); r != NeedMoreRows {
		log.VEventf(ctx, 1, "no more rows required. drain requested: %t",
			r == DrainRequested)
		return r, nil
	}
	if h.rowIdx == h.maxRowIdx {
		log.VEventf(ctx, 1, "hit row limit; asking producer to drain")
		return DrainRequested, nil
	}
	status := NeedMoreRows
	if !ok {
		status = DrainRequested
	}
	return status, nil
}

// ProcessRow sends the invoked row through the post-processing stage and returns
// the post-processed row. Results from ProcessRow aren't safe past the next call
// to ProcessRow.
//
// The moreRowsOK retval is true if more rows can be processed, false if the
// limit has been reached (if there's a limit). Upon seeing a false value, the
// caller is expected to start draining. Note that both a row and
// moreRowsOK=false can be returned at the same time: the row that satisfies the
// limit is returned at the same time as a DrainRequested status. In that case,
// the caller is supposed to both deal with the row and start draining.
func (h *ProcOutputHelper) ProcessRow(
	ctx context.Context, row rowenc.EncDatumRow,
) (_ rowenc.EncDatumRow, moreRowsOK bool, _ error) {
	if h.rowIdx >= h.maxRowIdx {
		return nil, false, nil
	}
	h.rowIdx++
	if h.rowIdx <= h.offset {
		// Suppress row.
		return nil, true, nil
	}
	if len(h.renderExprs) > 0 {
		// Rendering.
		for i := range h.renderExprs {
			datum, err := h.renderExprs[i].Eval(row)
			if err != nil {
				return nil, false, err
			}
			h.outputRow[i] = rowenc.DatumToEncDatum(h.OutputTypes[i], datum)
		}
	} else if h.outputCols != nil {
		// Projection.
		for i, col := range h.outputCols {
			h.outputRow[i] = row[col]
		}
	} else {
		// No rendering or projection.
		return row, h.rowIdx < h.maxRowIdx, nil
	}
	// If this row satisfies the limit, the caller is told to drain.
	return h.outputRow, h.rowIdx < h.maxRowIdx, nil
}

// consumerClosed stops output of additional rows from ProcessRow.
func (h *ProcOutputHelper) consumerClosed() {
	h.rowIdx = h.maxRowIdx
}

...

// ProcessorBase is supposed to be embedded by Processors. It provides
// facilities for dealing with filtering and projection (through a
// ProcOutputHelper) and for implementing the RowSource interface (draining,
// trailing metadata).
type ProcessorBase struct {
	ProcessorBaseNoHelper
	// OutputHelper is used to handle the post-processing spec.
	OutputHelper ProcOutputHelper
	// MemMonitor is the processor's memory monitor.
	MemMonitor *mon.BytesMonitor
	// SemaCtx is used to avoid allocating a new SemaCtx during processor setup.
	SemaCtx tree.SemaContext
}

// ProcessorBaseNoHelper is a slightly reduced version of ProcessorBase that
// should be used by the processors that don't need to handle the
// post-processing spec.
type ProcessorBaseNoHelper struct {
	self        RowSource
	ProcessorID int32
	// Output is the consumer of the rows produced by this ProcessorBase. If
	// Output is nil, one can invoke ProcessRow to obtain the post-processed row
	// directly.
	Output  RowReceiver
	FlowCtx *FlowCtx
	// EvalCtx is used for expression evaluation. It overrides the one in flowCtx.
	EvalCtx *tree.EvalContext
	// Closed is set by InternalClose(). Once set, the processor's tracing span
	// has been closed.
	Closed bool
	// Ctx and span contain the tracing state while the processor is active
	// (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should
	// not be otherwise used).
	Ctx  context.Context
	span *tracing.Span
	// origCtx is the context from which ctx was derived. InternalClose() resets
	// ctx to this.
	origCtx context.Context
	State   procState
	// ExecStatsForTrace, if set, will be called before getting the trace data from
	// the span and adding the recording to the trailing metadata. The returned
	// ComponentStats are associated with the processor's span. The Component
	// field of the returned stats will be set by the calling code.
	//
	// Can return nil.
	ExecStatsForTrace func() *execinfrapb.ComponentStats
	// trailingMetaCallback, if set, will be called by moveToTrailingMeta(). The
	// callback is expected to close all inputs, do other cleanup on the processor
	// (including calling InternalClose()) and generate the trailing meta that
	// needs to be returned to the consumer. As a special case,
	// moveToTrailingMeta() handles getting the tracing information into
	// trailingMeta, so the callback doesn't need to worry about that.
	//
	// If no callback is specified, InternalClose() will be called automatically.
	// So, if no trailing metadata other than the trace needs to be returned (and
	// other than what has otherwise been manually put in trailingMeta) and no
	// closing other than InternalClose is needed, then no callback needs to be
	// specified.
	trailingMetaCallback func() []execinfrapb.ProducerMetadata
	// trailingMeta is scratch space where metadata is stored to be returned
	// later.
	trailingMeta []execinfrapb.ProducerMetadata
	// inputsToDrain, if not empty, contains inputs to be drained by
	// DrainHelper(). MoveToDraining() calls ConsumerDone() on them,
	// InternalClose() calls ConsumerClosed() on them.
	//
	// ConsumerDone() is called on all inputs at once and then inputs are drained
	// one by one (in StateDraining, inputsToDrain[curInputToDrain] is the one
	// currently being drained).
	inputsToDrain []RowSource
	// curInputToDrain is the index into inputsToDrain that needs to be drained
	// next.
	curInputToDrain int
}

...

// procState represents the standard states that a processor can be in. These
// states are relevant when the processor is using the draining utilities in
// ProcessorBase.
type procState int

//go:generate stringer -type=procState

const (
	// StateRunning is the common state of a processor: it's producing rows for
	// its consumer and forwarding metadata from its input. Different processors
	// might have sub-states internally.
	//
	// If the consumer calls ConsumerDone or if the ProcOutputHelper.maxRowIdx is
	// reached, then the processor will transition to StateDraining. If the input
	// is exhausted, then the processor can transition to StateTrailingMeta
	// directly, although most always go through StateDraining.
	StateRunning procState = iota

	// StateDraining is the state in which the processor is forwarding metadata
	// from its input and otherwise ignoring all rows. Once the input is
	// exhausted, the processor will transition to StateTrailingMeta.
	//
	// In StateDraining, processors are required to swallow
	// ReadWithinUncertaintyIntervalErrors received from their sources. We're
	// already draining, so we don't care about whatever data generated this
	// uncertainty error. Besides generally seeming like a good idea, doing this
	// allows us to offer a nice guarantee to SQL clients: a read-only query that
	// produces at most one row, run as an implicit txn, never produces retriable
	// errors, regardless of the size of the row being returned (in relation to
	// the size of the result buffer on the connection). One would naively expect
	// that to be true: either the error happens before any rows have been
	// delivered to the client, in which case the auto-retries kick in, or, if a
	// row has been delivered, then the query is done and so how can there be an
	// error? What our naive friend is ignoring is that, if it weren't for this
	// code, it'd be possible for a retriable error to sneak in after the query's
	// limit has been satisfied but while processors are still draining. Note
	// that uncertainty errors are not retried automatically by the leaf
	// TxnCoordSenders (i.e. by the refresh txn interceptor).
	//
	// Other categories of errors might be safe to ignore too; however we
	// can't ignore all of them. Generally, we need to ensure that all the
	// trailing metadata (e.g. LeafTxnFinalState's) makes it to the gateway for
	// successful flows. If an error is telling us that some metadata might
	// have been dropped, we can't ignore that.
	StateDraining

	// StateTrailingMeta is the state in which the processor is outputting final
	// metadata such as the tracing information or the LeafTxnFinalState. Once all
	// the trailing metadata has been produced, the processor transitions to
	// StateExhausted.
	StateTrailingMeta

	// StateExhausted is the state of a processor that has no more rows or
	// metadata to produce.
	StateExhausted
)

// MoveToDraining switches the processor to StateDraining. Only metadata is
// returned from now on. In this state, the processor is expected to drain its
// inputs (commonly by using DrainHelper()).
//
// If the processor has no input (ProcStateOpts.inputToDrain was not specified
// at init() time), then we move straight to StateTrailingMeta.
//
// An error can be optionally passed. It will be the first piece of metadata
// returned by DrainHelper().
func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) {
	if pb.State != StateRunning {
		// Calling MoveToDraining in any state is allowed in order to facilitate the
		// ConsumerDone() implementations that just call this unconditionally.
		// However, calling it with an error in states other than StateRunning is
		// not permitted.
		if err != nil {
			logcrash.ReportOrPanic(
				pb.Ctx,
				&pb.FlowCtx.Cfg.Settings.SV,
				"MoveToDraining called in state %s with err: %+v",
				pb.State, err)
		}
		return
	}
	if err != nil {
		pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{Err: err})
	}
	if pb.curInputToDrain < len(pb.inputsToDrain) {
		// We go to StateDraining here. DrainHelper() will transition to
		// StateTrailingMeta when the inputs are drained (including if the inputs
		// are already drained).
		pb.State = StateDraining
		for _, input := range pb.inputsToDrain[pb.curInputToDrain:] {
			input.ConsumerDone()
		}
	} else {
		pb.moveToTrailingMeta()
	}
}

// DrainHelper is supposed to be used in states draining and trailingMetadata.
// It deals with optionally draining an input and returning trailing meta. It
// also moves from StateDraining to StateTrailingMeta when appropriate.
func (pb *ProcessorBaseNoHelper) DrainHelper() *execinfrapb.ProducerMetadata {
	if pb.State == StateRunning {
		logcrash.ReportOrPanic(
			pb.Ctx,
			&pb.FlowCtx.Cfg.Settings.SV,
			"drain helper called in StateRunning",
		)
	}
	// trailingMeta always has priority; it seems like a good idea because it
	// causes metadata to be sent quickly after it is produced (e.g. the error
	// passed to MoveToDraining()).
	if len(pb.trailingMeta) > 0 {
		return pb.popTrailingMeta()
	}
	if pb.State != StateDraining {
		return nil
	}
	// Ignore all rows; only return meta.
	for {
		input := pb.inputsToDrain[pb.curInputToDrain]
		row, meta := input.Next()
		if row == nil && meta == nil {
			pb.curInputToDrain++
			if pb.curInputToDrain >= len(pb.inputsToDrain) {
				pb.moveToTrailingMeta()
				return pb.popTrailingMeta()
			}
			continue
		}
		if meta != nil {
			// Swallow ReadWithinUncertaintyIntervalErrors. See comments on
			// StateDraining.
			if ShouldSwallowReadWithinUncertaintyIntervalError(meta) {
				continue
			}
			return meta
		}
	}
}

// ShouldSwallowReadWithinUncertaintyIntervalError examines meta and returns
// true if it should be swallowed and not propagated further. It is the case if
// meta contains roachpb.ReadWithinUncertaintyIntervalError.
func ShouldSwallowReadWithinUncertaintyIntervalError(meta *execinfrapb.ProducerMetadata) bool {
	if err := meta.Err; err != nil {
		// We only look for UnhandledRetryableErrors. Local reads (which would
		// be transformed by the Root TxnCoordSender into
		// TransactionRetryWithProtoRefreshErrors) don't have any uncertainty.
		if ure := (*roachpb.UnhandledRetryableError)(nil); errors.As(err, &ure) {
			if _, uncertain := ure.PErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); uncertain {
				return true
			}
		}
	}
	return false
}

// popTrailingMeta peels off one piece of trailing metadata or advances to
// StateExhausted if there's no more trailing metadata.
func (pb *ProcessorBaseNoHelper) popTrailingMeta() *execinfrapb.ProducerMetadata {
	if len(pb.trailingMeta) > 0 {
		meta := &pb.trailingMeta[0]
		pb.trailingMeta = pb.trailingMeta[1:]
		return meta
	}
	pb.State = StateExhausted
	return nil
}

...

// moveToTrailingMeta switches the processor to the "trailing meta" state: only
// trailing metadata is returned from now on. For simplicity, processors are
// encouraged to always use MoveToDraining() instead of this method, even when
// there's nothing to drain. MoveToDraining() or DrainHelper() will internally
// call moveToTrailingMeta().
//
// trailingMetaCallback, if any, is called; it is expected to close the
// processor's inputs.
//
// This method is to be called when the processor is done producing rows and
// draining its inputs (if it wants to drain them).
func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() {
	if pb.State == StateTrailingMeta || pb.State == StateExhausted {
		logcrash.ReportOrPanic(
			pb.Ctx,
			&pb.FlowCtx.Cfg.Settings.SV,
			"moveToTrailingMeta called in state: %s",
			pb.State,
		)
	}
	pb.State = StateTrailingMeta
	if pb.span != nil {
		if pb.ExecStatsForTrace != nil {
			if stats := pb.ExecStatsForTrace(); stats != nil {
				stats.Component = pb.FlowCtx.ProcessorComponentID(pb.ProcessorID)
				pb.span.RecordStructured(stats)
			}
		}
		if trace := pb.span.GetRecording(); trace != nil {
			pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})
		}
	}
	if util.CrdbTestBuild && pb.Ctx == nil {
		panic(
			errors.AssertionFailedf(
				"unexpected nil ProcessorBase.Ctx when draining. Was StartInternal called?",
			),
		)
	}
	// trailingMetaCallback is called after reading the tracing data because it
	// generally calls InternalClose, indirectly, which switches the context and
	// the span.
	if pb.trailingMetaCallback != nil {
		pb.trailingMeta = append(pb.trailingMeta, pb.trailingMetaCallback()...)
	} else {
		pb.InternalClose()
	}
}

// ProcessRowHelper is a wrapper on top of ProcOutputHelper.ProcessRow(). It
// takes care of handling errors and drain requests by moving the processor to
// StateDraining.
//
// It takes a row and returns the row after processing. The return value can be
// nil, in which case the caller shouldn't return anything to its consumer; it
// should continue processing other rows, with the awareness that the processor
// might have been transitioned to the draining phase.
func (pb *ProcessorBase) ProcessRowHelper(row rowenc.EncDatumRow) rowenc.EncDatumRow {
	outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx, row)
	if err != nil {
		pb.MoveToDraining(err)
		return nil
	}
	if !ok {
		pb.MoveToDraining(nil /* err */)
	}
	// Note that outRow might be nil here.
	// TODO(yuzefovich): there is a problem with this logging when MetadataTest*
	// processors are planned - there is a mismatch between the row and the
	// output types (rendering is added to the stage of test processors and the
	// actual processors that are inputs to the test ones have an unset post
	// processing; I think that we need to set the post processing on the stages
	// of processors below the test ones).
	//if outRow != nil && log.V(3) && pb.Ctx != nil {
	//	log.InfofDepth(pb.Ctx, 1, "pushing row %s", outRow.String(pb.Out.OutputTypes))
	//}
	return outRow
}

...

// ProcStateOpts contains fields used by the ProcessorBase's family of functions
// that deal with draining and trailing metadata: the ProcessorBase implements
// generic useful functionality that needs to call back into the Processor.
type ProcStateOpts struct {
	// TrailingMetaCallback, if specified, is a callback to be called by
	// moveToTrailingMeta(). See ProcessorBase.TrailingMetaCallback.
	TrailingMetaCallback func() []execinfrapb.ProducerMetadata
	// InputsToDrain, if specified, will be drained by DrainHelper().
	// MoveToDraining() calls ConsumerDone() on them, InternalClose() calls
	// ConsumerClosed() on them.
	InputsToDrain []RowSource
}

...

// AddInputToDrain adds an input to drain when moving the processor to a
// draining state.
func (pb *ProcessorBaseNoHelper) AddInputToDrain(input RowSource) {
	pb.inputsToDrain = append(pb.inputsToDrain, input)
}

...

// InternalClose helps processors implement the RowSource interface, performing
// common close functionality. Returns true iff the processor was not already
// closed.
//
// Notably, it calls ConsumerClosed() on all the inputsToDrain and updates
// pb.Ctx to the context passed into the StartInternal() call.
//
//   if pb.InternalClose() {
//     // Perform processor-specific close work.
//   }
func (pb *ProcessorBase) InternalClose() bool {
	closing := pb.ProcessorBaseNoHelper.InternalClose()
	if closing {
		// This prevents Next() from returning more rows.
		pb.OutputHelper.consumerClosed()
	}
	return closing
}

// InternalClose is the meat of ProcessorBase.InternalClose.
func (pb *ProcessorBaseNoHelper) InternalClose() bool {
	closing := !pb.Closed
	// Protection around double closing is useful for allowing ConsumerClosed() to
	// be called on processors that have already closed themselves by moving to
	// StateTrailingMeta.
	if closing {
		for _, input := range pb.inputsToDrain[pb.curInputToDrain:] {
			input.ConsumerClosed()
		}
		pb.Closed = true
		pb.span.Finish()
		pb.span = nil
		// Reset the context so that any incidental uses after this point do not
		// access the finished span.
		pb.Ctx = pb.origCtx
		pb.EvalCtx.Context = pb.origCtx
	}
	return closing
}

// ConsumerDone is part of the RowSource interface.
func (pb *ProcessorBaseNoHelper) ConsumerDone() {
	pb.MoveToDraining(nil /* err */)
}

// ConsumerClosed is part of the RowSource interface.
func (pb *ProcessorBaseNoHelper) ConsumerClosed() {
	// The consumer is done, Next() will not be called again.
	pb.InternalClose()
}
...


materializer.go

Source: materializer.go (GitHub)


...
	ctx          context.Context
	bufferedMeta []execinfrapb.ProducerMetadata
}

var _ execinfra.RowSource = &drainHelper{}

func newDrainHelper(sources execinfrapb.MetadataSources) *drainHelper {
	return &drainHelper{
		MetadataSources: sources,
	}
}

// OutputTypes implements the RowSource interface.
func (d *drainHelper) OutputTypes() []*types.T {
	colexecerror.InternalError("unimplemented")
	// Unreachable code.
	return nil
}

// Start implements the RowSource interface.
func (d *drainHelper) Start(ctx context.Context) context.Context {
	d.ctx = ctx
	return ctx
}

// Next implements the RowSource interface.
func (d *drainHelper) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
	if d.bufferedMeta == nil {
		d.bufferedMeta = d.DrainMeta(d.ctx)
		if d.bufferedMeta == nil {
			// Still nil, avoid more calls to DrainMeta.
			d.bufferedMeta = []execinfrapb.ProducerMetadata{}
		}
	}
	if len(d.bufferedMeta) == 0 {
		return nil, nil
	}
	meta := d.bufferedMeta[0]
	d.bufferedMeta = d.bufferedMeta[1:]
	return nil, &meta
}

// ConsumerDone implements the RowSource interface.
func (d *drainHelper) ConsumerDone() {}

// ConsumerClosed implements the RowSource interface.
func (d *drainHelper) ConsumerClosed() {}

const materializerProcName = "materializer"

// NewMaterializer creates a new Materializer processor which processes the
// columnar data coming from input to return it as rows.
// Arguments:
// - typs is the output types scheme.
// - metadataSourcesQueue are all of the metadata sources that are planned on
// the same node as the Materializer and that need to be drained.
// - outputStatsToTrace (when tracing is enabled) finishes the stats.
// - cancelFlow should return the context cancellation function that cancels
// the context of the flow (i.e. it is Flow.ctxCancel). It should only be
// non-nil in case of a root Materializer (i.e. not when we're wrapping a row
// source).
// NOTE: the constructor does *not* take in an execinfrapb.PostProcessSpec
// because we expect input to handle that for us.
func NewMaterializer(
	flowCtx *execinfra.FlowCtx,
	processorID int32,
	input colexecbase.Operator,
	typs []*types.T,
	output execinfra.RowReceiver,
	metadataSourcesQueue []execinfrapb.MetadataSource,
	toClose []Closer,
	outputStatsToTrace func(),
	cancelFlow func() context.CancelFunc,
) (*Materializer, error) {
	m := &Materializer{
		input:       input,
		typs:        typs,
		drainHelper: newDrainHelper(metadataSourcesQueue),
		// nil vecIdxsToConvert indicates that we want to convert all vectors.
		converter: newVecToDatumConverter(len(typs), nil /* vecIdxsToConvert */),
		row:       make(sqlbase.EncDatumRow, len(typs)),
		closers:   toClose,
	}
	if err := m.ProcessorBase.Init(
		m,
		// input must have handled any post-processing itself, so we pass in
		// an empty post-processing spec.
		&execinfrapb.PostProcessSpec{},
		typs,
		flowCtx,
		processorID,
		output,
		nil, /* memMonitor */
		execinfra.ProcStateOpts{
			InputsToDrain: []execinfra.RowSource{m.drainHelper},
			TrailingMetaCallback: func(ctx context.Context) []execinfrapb.ProducerMetadata {
				m.InternalClose()
				return nil
			},
		},
	); err != nil {
		return nil, err
	}
	m.FinishTrace = outputStatsToTrace
	m.cancelFlow = cancelFlow
	return m, nil
}

var _ execinfra.OpNode = &Materializer{}

// ChildCount is part of the exec.OpNode interface.
func (m *Materializer) ChildCount(verbose bool) int {
	return 1
}

// Child is part of the exec.OpNode interface.
func (m *Materializer) Child(nth int, verbose bool) execinfra.OpNode {
	if nth == 0 {
		return m.input
	}
	colexecerror.InternalError(fmt.Sprintf("invalid index %d", nth))
	// This code is unreachable, but the compiler cannot infer that.
	return nil
}

// Start is part of the execinfra.RowSource interface.
func (m *Materializer) Start(ctx context.Context) context.Context {
	m.input.Init()
	ctx = m.drainHelper.Start(ctx)
	return m.ProcessorBase.StartInternal(ctx, materializerProcName)
}

// nextAdapter calls next() and saves the returned results in m. For internal
// use only. The purpose of having this function is to not create an anonymous
// function on every call to Next().
func (m *Materializer) nextAdapter() {
	m.outputRow, m.outputMetadata = m.next()
}

// next is the logic of Next() extracted in a separate method to be used by an
// adapter to be able to wrap the latter with a catcher.
func (m *Materializer) next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
	if m.State == execinfra.StateRunning {
		if m.batch == nil || m.curIdx >= m.batch.Length() {
			// Get a fresh batch.
			m.batch = m.input.Next(m.Ctx)
			if m.batch.Length() == 0 {
				m.MoveToDraining(nil /* err */)
				return nil, m.DrainHelper()
			}
			m.curIdx = 0
			m.converter.convertBatch(m.batch)
		}
		for colIdx := range m.typs {
			// Note that we don't need to apply the selection vector of the
			// batch to index m.curIdx because vecToDatumConverter returns a
			// "dense" datum column.
			m.row[colIdx].Datum = m.converter.getDatumColumn(colIdx)[m.curIdx]
		}
		m.curIdx++
		// Note that there is no post-processing to be done in the
		// materializer, so we do not use ProcessRowHelper and emit the row
		// directly.
		return m.row, nil
	}
	return nil, m.DrainHelper()
}

// Next is part of the execinfra.RowSource interface.
func (m *Materializer) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
	if err := colexecerror.CatchVectorizedRuntimeError(m.nextAdapter); err != nil {
		m.MoveToDraining(err)
		return nil, m.DrainHelper()
	}
	return m.outputRow, m.outputMetadata
}

// InternalClose helps implement the execinfra.RowSource interface.
func (m *Materializer) InternalClose() bool {
	if m.ProcessorBase.InternalClose() {
		if m.cancelFlow != nil {
			m.cancelFlow()()
		}
		m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
		return true
	}
	return false
}

// ConsumerDone is part of the execinfra.RowSource interface.
func (m *Materializer) ConsumerDone() {
	// Materializer will move into 'draining' state, and after all the metadata
	// has been drained - as part of TrailingMetaCallback - InternalClose() will
	// be called which will cancel the flow.
	m.MoveToDraining(nil /* err */)
}

// ConsumerClosed is part of the execinfra.RowSource interface.
func (m *Materializer) ConsumerClosed() {
	m.InternalClose()
}
...
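The drainHelper above calls DrainMeta at most once, buffers the result, and hands back one piece of metadata per Next call until the buffer is empty. Here is a self-contained sketch of that buffering pattern; the metaSource, fakeSource and producerMetadata types are toy stand-ins invented for this sketch, not the real execinfrapb definitions.

package main

import "fmt"

// producerMetadata and metaSource are toy stand-ins so the sketch runs alone.
type producerMetadata struct{ note string }

type metaSource interface {
	drainMeta() []producerMetadata
}

type fakeSource struct{}

func (fakeSource) drainMeta() []producerMetadata {
	return []producerMetadata{{note: "txn final state"}, {note: "trace data"}}
}

// drainHelper mirrors the materializer pattern: drainMeta is called at most
// once, its result is buffered, and next pops one piece of metadata per call.
type drainHelper struct {
	source       metaSource
	bufferedMeta []producerMetadata
}

func (d *drainHelper) next() *producerMetadata {
	if d.bufferedMeta == nil {
		d.bufferedMeta = d.source.drainMeta()
		if d.bufferedMeta == nil {
			// Still nil: avoid further calls to drainMeta.
			d.bufferedMeta = []producerMetadata{}
		}
	}
	if len(d.bufferedMeta) == 0 {
		return nil
	}
	m := d.bufferedMeta[0]
	d.bufferedMeta = d.bufferedMeta[1:]
	return &m
}

func main() {
	d := &drainHelper{source: fakeSource{}}
	for m := d.next(); m != nil; m = d.next() {
		fmt.Println("drained metadata:", m.note)
	}
	fmt.Println("nothing left to drain")
}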


Drain

Using AI Code Generation


package main

import "fmt"

func main() {
	c := make(chan int)
	go func() {
		// Produce ten values, then close the channel so the receiving
		// range loop terminates once everything has been drained.
		for i := 0; i < 10; i++ {
			c <- i
		}
		close(c)
	}()
	// Ranging over the channel drains it: the loop receives every value
	// and exits only after the channel is closed and empty.
	for n := range c {
		fmt.Println(n)
	}
}
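If the drained values themselves are not needed, a common variation is to range over the closed channel and discard everything. This is a small illustrative sketch, not part of the snippet above.

package main

import "fmt"

func main() {
	c := make(chan int, 5)
	for i := 0; i < 5; i++ {
		c <- i
	}
	close(c)
	// Drain and discard whatever is left in the channel.
	for range c {
	}
	fmt.Println("channel drained, remaining length:", len(c))
}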


Drain

Using AI Code Generation


// main.go
package main

import (
	"fmt"

	"example.com/demo/internal" // placeholder module path; the original import was truncated
)

func main() {
	fmt.Println("Hello World!")
	internal.Drain()
}

// internal/drain.go
package internal

import "fmt"

// Drain simply reports that it was called.
func Drain() {
	fmt.Println("Drain")
}


Drain

Using AI Code Generation


package main

import (
	"fmt"
	"sync"
)

type Employee struct {
	Name string
	Age  int
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		employeeChan := make(chan Employee, 10)
		employeeChan <- Employee{"John", 30}
		employeeChan <- Employee{"Mark", 40}
		employeeChan <- Employee{"Sandy", 50}
		close(employeeChan)
		// Draining the closed, buffered channel yields the queued values.
		for employee := range employeeChan {
			fmt.Println("Name:", employee.Name)
			fmt.Println("Age:", employee.Age)
		}
	}()
	wg.Wait()
}

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		intChan := make(chan int, 10)
		close(intChan)
		// Draining an already-empty closed channel simply falls through.
		for i := range intChan {
			fmt.Println("Value:", i)
		}
	}()
	wg.Wait()
}
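Both snippets above rely on the producer closing the channel so that range can drain it to completion. When the producer does not close the channel, a non-blocking drain is a common alternative; the helper below is an illustrative sketch, not part of the snippets above.

package main

import "fmt"

// drainNonBlocking empties whatever is currently buffered in ch without
// waiting for more values; it returns the number of items discarded.
func drainNonBlocking(ch <-chan int) int {
	n := 0
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return n // channel closed and empty
			}
			n++
		default:
			return n // nothing buffered right now
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	fmt.Println("drained:", drainNonBlocking(ch))
}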


Drain

Using AI Code Generation


package main

import (
	"bytes"
	"fmt"
)

// drainableBuffer wraps bytes.Buffer with a Drain method, since the standard
// bytes.Buffer has no Drain; Reset is the closest stdlib equivalent.
type drainableBuffer struct {
	bytes.Buffer
}

// Drain empties the buffer and returns whatever it held.
func (b *drainableBuffer) Drain() string {
	s := b.String()
	b.Reset()
	return s
}

func main() {
	var b drainableBuffer
	b.Write([]byte("Hello"))
	b.Write([]byte("World"))
	fmt.Println(b.String()) // HelloWorld
	b.Drain()
	fmt.Println(b.String()) // empty: the buffer has been drained
}


Drain

Using AI Code Generation


// main.go
package main

import "fmt"

type MyInt int

// x is the value the test below expects to drain down to 4.
var x MyInt = 5

// Drain returns the value decremented by one.
func (i MyInt) Drain() int {
	return int(i) - 1
}

func main() {
	fmt.Println(x.Drain())
}

// main_test.go
package main

import "testing"

func TestDrain(t *testing.T) {
	if x.Drain() != 4 {
		t.Errorf("Drain should return 4, but returned %d", x.Drain())
	}
}
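Since the page frames these snippets as Ginkgo usage, the same expectation can also be written as a Ginkgo spec. This is an illustrative sketch assuming Ginkgo v2 and Gomega, with MyInt and x defined as in the example above.

// drain_ginkgo_test.go
package main

import (
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

// TestDrainSuite hooks the Ginkgo suite into `go test`.
func TestDrainSuite(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Drain Suite")
}

var _ = Describe("MyInt.Drain", func() {
	It("decrements the value by one", func() {
		Expect(x.Drain()).To(Equal(4))
	})
})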


Drain

Using AI Code Generation


// main.go
package main

import (
	"fmt"

	"example.com/demo/internal" // placeholder module path; the original import was truncated
)

func main() {
	ic := internal.NewInternalClass()
	ic.Drain()
	fmt.Println("Done")
}

// internal/internalclass.go
package internal

import "fmt"

type InternalClass struct {
}

func NewInternalClass() *InternalClass {
	return &InternalClass{}
}

// Drain reports that it was invoked.
func (ic *InternalClass) Drain() {
	fmt.Println("Drain")
}


Drain

Using AI Code Generation


// main.go
package main

import (
	"fmt"

	"example.com/demo/internal" // placeholder module path; the original import was truncated
)

func main() {
	fmt.Println("Hello, playground")
	p := &internal.Person{}
	p.Drain()
}

// internal/person.go
package internal

import "fmt"

type Person struct {
}

// Drain is a stub added here so the example compiles; the original snippet
// did not show the method body.
func (p *Person) Drain() {
	fmt.Println("Person drained")
}


Drain

Using AI Code Generation


// main.go
package main

import (
	"fmt"

	"example.com/demo/internal" // placeholder module path; the original import was truncated
)

func main() {
	fmt.Println(internal.Drain())
}

// internal/plumbing.go
package internal

// The return values below are placeholders; the original snippet declared
// these functions without bodies that compile.
func Drain() string {
	return "Drain"
}

func Flush() string {
	return "Flush"
}

func FlushToilet() string {
	return "FlushToilet"
}

func FlushSink() string {
	return "FlushSink"
}

