Best Ginkgo code snippet using internal.Drain
processorsbase.go
Source:processorsbase.go
...207 if outRow == nil {208 if ok {209 return NeedMoreRows, nil210 }211 return DrainRequested, nil212 }213 if log.V(3) {214 log.InfofDepth(ctx, 1, "pushing row %s", outRow.String(h.OutputTypes))215 }216 if r := output.Push(outRow, nil); r != NeedMoreRows {217 log.VEventf(ctx, 1, "no more rows required. drain requested: %t",218 r == DrainRequested)219 return r, nil220 }221 if h.rowIdx == h.maxRowIdx {222 log.VEventf(ctx, 1, "hit row limit; asking producer to drain")223 return DrainRequested, nil224 }225 status := NeedMoreRows226 if !ok {227 status = DrainRequested228 }229 return status, nil230}231// ProcessRow sends the invoked row through the post-processing stage and returns232// the post-processed row. Results from ProcessRow aren't safe past the next call233// to ProcessRow.234//235// The moreRowsOK retval is true if more rows can be processed, false if the236// limit has been reached (if there's a limit). Upon seeing a false value, the237// caller is expected to start draining. Note that both a row and238// moreRowsOK=false can be returned at the same time: the row that satisfies the239// limit is returned at the same time as a DrainRequested status. 
In that case,240// the caller is supposed to both deal with the row and start draining.241func (h *ProcOutputHelper) ProcessRow(242 ctx context.Context, row rowenc.EncDatumRow,243) (_ rowenc.EncDatumRow, moreRowsOK bool, _ error) {244 if h.rowIdx >= h.maxRowIdx {245 return nil, false, nil246 }247 h.rowIdx++248 if h.rowIdx <= h.offset {249 // Suppress row.250 return nil, true, nil251 }252 if len(h.renderExprs) > 0 {253 // Rendering.254 for i := range h.renderExprs {255 datum, err := h.renderExprs[i].Eval(row)256 if err != nil {257 return nil, false, err258 }259 h.outputRow[i] = rowenc.DatumToEncDatum(h.OutputTypes[i], datum)260 }261 } else if h.outputCols != nil {262 // Projection.263 for i, col := range h.outputCols {264 h.outputRow[i] = row[col]265 }266 } else {267 // No rendering or projection.268 return row, h.rowIdx < h.maxRowIdx, nil269 }270 // If this row satisfies the limit, the caller is told to drain.271 return h.outputRow, h.rowIdx < h.maxRowIdx, nil272}273// consumerClosed stops output of additional rows from ProcessRow.274func (h *ProcOutputHelper) consumerClosed() {275 h.rowIdx = h.maxRowIdx276}277// Stats returns output statistics.278func (h *ProcOutputHelper) Stats() execinfrapb.OutputStats {279 return execinfrapb.OutputStats{280 NumTuples: optional.MakeUint(h.rowIdx),281 }282}283// ProcessorConstructor is a function that creates a Processor. It is284// abstracted away so that we could create mixed flows (i.e. a vectorized flow285// with wrapped processors) without bringing a dependency on sql/rowexec286// package into sql/colexec package.287type ProcessorConstructor func(288 ctx context.Context,289 flowCtx *FlowCtx,290 processorID int32,291 core *execinfrapb.ProcessorCoreUnion,292 post *execinfrapb.PostProcessSpec,293 inputs []RowSource,294 outputs []RowReceiver,295 localProcessors []LocalProcessor,296) (Processor, error)297// ProcessorBase is supposed to be embedded by Processors. 
It provides298// facilities for dealing with filtering and projection (through a299// ProcOutputHelper) and for implementing the RowSource interface (draining,300// trailing metadata).301type ProcessorBase struct {302 ProcessorBaseNoHelper303 // OutputHelper is used to handle the post-processing spec.304 OutputHelper ProcOutputHelper305 // MemMonitor is the processor's memory monitor.306 MemMonitor *mon.BytesMonitor307 // SemaCtx is used to avoid allocating a new SemaCtx during processor setup.308 SemaCtx tree.SemaContext309}310// ProcessorBaseNoHelper is slightly reduced version of ProcessorBase that311// should be used by the processors that don't need to handle the312// post-processing spec.313type ProcessorBaseNoHelper struct {314 self RowSource315 ProcessorID int32316 // Output is the consumer of the rows produced by this ProcessorBase. If317 // Output is nil, one can invoke ProcessRow to obtain the post-processed row318 // directly.319 Output RowReceiver320 FlowCtx *FlowCtx321 // EvalCtx is used for expression evaluation. It overrides the one in flowCtx.322 EvalCtx *tree.EvalContext323 // Closed is set by InternalClose(). Once set, the processor's tracing span324 // has been closed.325 Closed bool326 // Ctx and span contain the tracing state while the processor is active327 // (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise328 // used).329 Ctx context.Context330 span *tracing.Span331 // origCtx is the context from which ctx was derived. InternalClose() resets332 // ctx to this.333 origCtx context.Context334 State procState335 // ExecStatsForTrace, if set, will be called before getting the trace data from336 // the span and adding the recording to the trailing metadata. The returned337 // ComponentStats are associated with the processor's span. 
The Component338 // field of the returned stats will be set by the calling code.339 //340 // Can return nil.341 ExecStatsForTrace func() *execinfrapb.ComponentStats342 // trailingMetaCallback, if set, will be called by moveToTrailingMeta(). The343 // callback is expected to close all inputs, do other cleanup on the processor344 // (including calling InternalClose()) and generate the trailing meta that345 // needs to be returned to the consumer. As a special case,346 // moveToTrailingMeta() handles getting the tracing information into347 // trailingMeta, so the callback doesn't need to worry about that.348 //349 // If no callback is specified, InternalClose() will be called automatically.350 // So, if no trailing metadata other than the trace needs to be returned (and351 // other than what has otherwise been manually put in trailingMeta) and no352 // closing other than InternalClose is needed, then no callback needs to be353 // specified.354 trailingMetaCallback func() []execinfrapb.ProducerMetadata355 // trailingMeta is scratch space where metadata is stored to be returned356 // later.357 trailingMeta []execinfrapb.ProducerMetadata358 // inputsToDrain, if not empty, contains inputs to be drained by359 // DrainHelper(). 
MoveToDraining() calls ConsumerDone() on them,360 // InternalClose() calls ConsumerClosed() on them.361 //362 // ConsumerDone() is called on all inputs at once and then inputs are drained363 // one by one (in StateDraining, inputsToDrain[curInputToDrain] is the one364 // currently being drained).365 inputsToDrain []RowSource366 // curInputToDrain is the index into inputsToDrain that needs to be drained367 // next.368 curInputToDrain int369}370// MustBeStreaming implements the Processor interface.371func (pb *ProcessorBaseNoHelper) MustBeStreaming() bool {372 return false373}374// Reset resets this ProcessorBaseNoHelper, retaining allocated memory in375// slices.376func (pb *ProcessorBaseNoHelper) Reset() {377 // Deeply reset the slices so that we don't hold onto the old objects.378 for i := range pb.trailingMeta {379 pb.trailingMeta[i] = execinfrapb.ProducerMetadata{}380 }381 for i := range pb.inputsToDrain {382 pb.inputsToDrain[i] = nil383 }384 *pb = ProcessorBaseNoHelper{385 trailingMeta: pb.trailingMeta[:0],386 inputsToDrain: pb.inputsToDrain[:0],387 }388}389// Reset resets this ProcessorBase, retaining allocated memory in slices.390func (pb *ProcessorBase) Reset() {391 pb.ProcessorBaseNoHelper.Reset()392 pb.OutputHelper.Reset()393 *pb = ProcessorBase{394 ProcessorBaseNoHelper: pb.ProcessorBaseNoHelper,395 OutputHelper: pb.OutputHelper,396 }397}398// procState represents the standard states that a processor can be in. These399// states are relevant when the processor is using the draining utilities in400// ProcessorBase.401type procState int402//go:generate stringer -type=procState403const (404 // StateRunning is the common state of a processor: it's producing rows for405 // its consumer and forwarding metadata from its input. Different processors406 // might have sub-states internally.407 //408 // If the consumer calls ConsumerDone or if the ProcOutputHelper.maxRowIdx is409 // reached, then the processor will transition to StateDraining. 
If the input410 // is exhausted, then the processor can transition to StateTrailingMeta411 // directly, although most always go through StateDraining.412 StateRunning procState = iota413 // StateDraining is the state in which the processor is forwarding metadata414 // from its input and otherwise ignoring all rows. Once the input is415 // exhausted, the processor will transition to StateTrailingMeta.416 //417 // In StateDraining, processors are required to swallow418 // ReadWithinUncertaintyIntervalErrors received from its sources. We're419 // already draining, so we don't care about whatever data generated this420 // uncertainty error. Besides generally seeming like a good idea, doing this421 // allows us to offer a nice guarantee to SQL clients: a read-only query that422 // produces at most one row, run as an implicit txn, never produces retriable423 // errors, regardless of the size of the row being returned (in relation to424 // the size of the result buffer on the connection). One would naively expect425 // that to be true: either the error happens before any rows have been426 // delivered to the client, in which case the auto-retries kick in, or, if a427 // row has been delivered, then the query is done and so how can there be an428 // error? What our naive friend is ignoring is that, if it weren't for this429 // code, it'd be possible for a retriable error to sneak in after the query's430 // limit has been satisfied but while processors are still draining. Note431 // that uncertainty errors are not retried automatically by the leaf432 // TxnCoordSenders (i.e. by refresh txn interceptor).433 //434 // Other categories of errors might be safe to ignore too; however we435 // can't ignore all of them. Generally, we need to ensure that all the436 // trailing metadata (e.g. LeafTxnFinalState's) make it to the gateway for437 // successful flows. 
If an error is telling us that some metadata might438 // have been dropped, we can't ignore that.439 StateDraining440 // StateTrailingMeta is the state in which the processor is outputting final441 // metadata such as the tracing information or the LeafTxnFinalState. Once all the442 // trailing metadata has been produced, the processor transitions to443 // StateExhausted.444 StateTrailingMeta445 // StateExhausted is the state of a processor that has no more rows or446 // metadata to produce.447 StateExhausted448)449// MoveToDraining switches the processor to the StateDraining. Only metadata is450// returned from now on. In this state, the processor is expected to drain its451// inputs (commonly by using DrainHelper()).452//453// If the processor has no input (ProcStateOpts.InputsToDrain was not specified454// at Init() time), then we move straight to the StateTrailingMeta.455//456// An error can be optionally passed. It will be the first piece of metadata457// returned by DrainHelper().458func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) {459 if pb.State != StateRunning {460 // Calling MoveToDraining in any state is allowed in order to facilitate the461 // ConsumerDone() implementations that just call this unconditionally.462 // However, calling it with an error in states other than StateRunning is463 // not permitted.464 if err != nil {465 logcrash.ReportOrPanic(466 pb.Ctx,467 &pb.FlowCtx.Cfg.Settings.SV,468 "MoveToDraining called in state %s with err: %+v",469 pb.State, err)470 }471 return472 }473 if err != nil {474 pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{Err: err})475 }476 if pb.curInputToDrain < len(pb.inputsToDrain) {477 // We go to StateDraining here. 
DrainHelper() will transition to478 // StateTrailingMeta when the inputs are drained (including if the inputs479 // are already drained).480 pb.State = StateDraining481 for _, input := range pb.inputsToDrain[pb.curInputToDrain:] {482 input.ConsumerDone()483 }484 } else {485 pb.moveToTrailingMeta()486 }487}488// DrainHelper is supposed to be used in states draining and trailingMetadata.489// It deals with optionally draining an input and returning trailing meta. It490// also moves from StateDraining to StateTrailingMeta when appropriate.491func (pb *ProcessorBaseNoHelper) DrainHelper() *execinfrapb.ProducerMetadata {492 if pb.State == StateRunning {493 logcrash.ReportOrPanic(494 pb.Ctx,495 &pb.FlowCtx.Cfg.Settings.SV,496 "drain helper called in StateRunning",497 )498 }499 // trailingMeta always has priority; it seems like a good idea because it500 // causes metadata to be sent quickly after it is produced (e.g. the error501 // passed to MoveToDraining()).502 if len(pb.trailingMeta) > 0 {503 return pb.popTrailingMeta()504 }505 if pb.State != StateDraining {506 return nil507 }508 // Ignore all rows; only return meta.509 for {510 input := pb.inputsToDrain[pb.curInputToDrain]511 row, meta := input.Next()512 if row == nil && meta == nil {513 pb.curInputToDrain++514 if pb.curInputToDrain >= len(pb.inputsToDrain) {515 pb.moveToTrailingMeta()516 return pb.popTrailingMeta()517 }518 continue519 }520 if meta != nil {521 // Swallow ReadWithinUncertaintyIntervalErrors. See comments on522 // StateDraining.523 if ShouldSwallowReadWithinUncertaintyIntervalError(meta) {524 continue525 }526 return meta527 }528 }529}530// ShouldSwallowReadWithinUncertaintyIntervalError examines meta and returns531// true if it should be swallowed and not propagated further. 
It is the case if532// meta contains roachpb.ReadWithinUncertaintyIntervalError.533func ShouldSwallowReadWithinUncertaintyIntervalError(meta *execinfrapb.ProducerMetadata) bool {534 if err := meta.Err; err != nil {535 // We only look for UnhandledRetryableErrors. Local reads (which would536 // be transformed by the Root TxnCoordSender into537 // TransactionRetryWithProtoRefreshErrors) don't have any uncertainty.538 if ure := (*roachpb.UnhandledRetryableError)(nil); errors.As(err, &ure) {539 if _, uncertain := ure.PErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); uncertain {540 return true541 }542 }543 }544 return false545}546// popTrailingMeta peels off one piece of trailing metadata or advances to547// StateExhausted if there's no more trailing metadata.548func (pb *ProcessorBaseNoHelper) popTrailingMeta() *execinfrapb.ProducerMetadata {549 if len(pb.trailingMeta) > 0 {550 meta := &pb.trailingMeta[0]551 pb.trailingMeta = pb.trailingMeta[1:]552 return meta553 }554 pb.State = StateExhausted555 return nil556}557// ExecStatsForTraceHijacker is an interface that allows us to hijack558// ExecStatsForTrace function from the ProcessorBase.559type ExecStatsForTraceHijacker interface {560 // HijackExecStatsForTrace returns ExecStatsForTrace function, if set, and561 // sets it to nil. The caller becomes responsible for collecting and562 // propagating the execution statistics.563 HijackExecStatsForTrace() func() *execinfrapb.ComponentStats564}565var _ ExecStatsForTraceHijacker = &ProcessorBase{}566// HijackExecStatsForTrace is a part of the ExecStatsForTraceHijacker interface.567func (pb *ProcessorBase) HijackExecStatsForTrace() func() *execinfrapb.ComponentStats {568 execStatsForTrace := pb.ExecStatsForTrace569 pb.ExecStatsForTrace = nil570 return execStatsForTrace571}572// moveToTrailingMeta switches the processor to the "trailing meta" state: only573// trailing metadata is returned from now on. 
For simplicity, processors are574// encouraged to always use MoveToDraining() instead of this method, even when575// there's nothing to drain. MoveToDraining() or DrainHelper() will internally call576// moveToTrailingMeta().577//578// trailingMetaCallback, if any, is called; it is expected to close the579// processor's inputs.580//581// This method is to be called when the processor is done producing rows and582// draining its inputs (if it wants to drain them).583func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() {584 if pb.State == StateTrailingMeta || pb.State == StateExhausted {585 logcrash.ReportOrPanic(586 pb.Ctx,587 &pb.FlowCtx.Cfg.Settings.SV,588 "moveToTrailingMeta called in state: %s",589 pb.State,590 )591 }592 pb.State = StateTrailingMeta593 if pb.span != nil {594 if pb.ExecStatsForTrace != nil {595 if stats := pb.ExecStatsForTrace(); stats != nil {596 stats.Component = pb.FlowCtx.ProcessorComponentID(pb.ProcessorID)597 pb.span.RecordStructured(stats)598 }599 }600 if trace := pb.span.GetRecording(); trace != nil {601 pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace})602 }603 }604 if util.CrdbTestBuild && pb.Ctx == nil {605 panic(606 errors.AssertionFailedf(607 "unexpected nil ProcessorBase.Ctx when draining. Was StartInternal called?",608 ),609 )610 }611 // trailingMetaCallback is called after reading the tracing data because it612 // generally calls InternalClose, indirectly, which switches the context and613 // the span.614 if pb.trailingMetaCallback != nil {615 pb.trailingMeta = append(pb.trailingMeta, pb.trailingMetaCallback()...)616 } else {617 pb.InternalClose()618 }619}620// ProcessRowHelper is a wrapper on top of ProcOutputHelper.ProcessRow(). It621// takes care of handling errors and drain requests by moving the processor to622// StateDraining.623//624// It takes a row and returns the row after processing. 
The return value can be625// nil, in which case the caller shouldn't return anything to its consumer; it626// should continue processing other rows, with the awareness that the processor627// might have been transitioned to the draining phase.628func (pb *ProcessorBase) ProcessRowHelper(row rowenc.EncDatumRow) rowenc.EncDatumRow {629 outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx, row)630 if err != nil {631 pb.MoveToDraining(err)632 return nil633 }634 if !ok {635 pb.MoveToDraining(nil /* err */)636 }637 // Note that outRow might be nil here.638 // TODO(yuzefovich): there is a problem with this logging when MetadataTest*639 // processors are planned - there is a mismatch between the row and the640 // output types (rendering is added to the stage of test processors and the641 // actual processors that are inputs to the test ones have an unset post642 // processing; I think that we need to set the post processing on the stages643 // of processors below the test ones).644 //if outRow != nil && log.V(3) && pb.Ctx != nil {645 // log.InfofDepth(pb.Ctx, 1, "pushing row %s", outRow.String(pb.Out.OutputTypes))646 //}647 return outRow648}649// OutputTypes is part of the Processor interface.650func (pb *ProcessorBase) OutputTypes() []*types.T {651 return pb.OutputHelper.OutputTypes652}653// Run is part of the Processor interface.654func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) {655 if pb.Output == nil {656 panic("processor output is not set for emitting rows")657 }658 pb.self.Start(ctx)659 Run(pb.Ctx, pb.self, pb.Output)660}661// ProcStateOpts contains fields used by the ProcessorBase's family of functions662// that deal with draining and trailing metadata: the ProcessorBase implements663// generic useful functionality that needs to call back into the Processor.664type ProcStateOpts struct {665 // TrailingMetaCallback, if specified, is a callback to be called by666 // moveToTrailingMeta(). 
See ProcessorBaseNoHelper.trailingMetaCallback.667 TrailingMetaCallback func() []execinfrapb.ProducerMetadata668 // InputsToDrain, if specified, will be drained by DrainHelper().669 // MoveToDraining() calls ConsumerDone() on them, InternalClose() calls670 // ConsumerClosed() on them.671 InputsToDrain []RowSource672}673// Init initializes the ProcessorBase.674// - coreOutputTypes are the type schema of the rows output by the processor675// core (i.e. the "internal schema" of the processor, see676// execinfrapb.ProcessorSpec for more details).677func (pb *ProcessorBase) Init(678 self RowSource,679 post *execinfrapb.PostProcessSpec,680 coreOutputTypes []*types.T,681 flowCtx *FlowCtx,682 processorID int32,683 output RowReceiver,684 memMonitor *mon.BytesMonitor,685 opts ProcStateOpts,686) error {687 return pb.InitWithEvalCtx(688 self, post, coreOutputTypes, flowCtx, flowCtx.NewEvalCtx(), processorID, output, memMonitor, opts,689 )690}691// InitWithEvalCtx initializes the ProcessorBase with a given EvalContext.692// - coreOutputTypes are the type schema of the rows output by the processor693// core (i.e. 
the "internal schema" of the processor, see694// execinfrapb.ProcessorSpec for more details).695func (pb *ProcessorBase) InitWithEvalCtx(696 self RowSource,697 post *execinfrapb.PostProcessSpec,698 coreOutputTypes []*types.T,699 flowCtx *FlowCtx,700 evalCtx *tree.EvalContext,701 processorID int32,702 output RowReceiver,703 memMonitor *mon.BytesMonitor,704 opts ProcStateOpts,705) error {706 pb.ProcessorBaseNoHelper.Init(707 self, flowCtx, evalCtx, processorID, output, opts,708 )709 pb.MemMonitor = memMonitor710 // Hydrate all types used in the processor.711 resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)712 if err := resolver.HydrateTypeSlice(evalCtx.Context, coreOutputTypes); err != nil {713 return err714 }715 pb.SemaCtx = tree.MakeSemaContext()716 pb.SemaCtx.TypeResolver = resolver717 return pb.OutputHelper.Init(post, coreOutputTypes, &pb.SemaCtx, pb.EvalCtx)718}719// Init initializes the ProcessorBaseNoHelper.720func (pb *ProcessorBaseNoHelper) Init(721 self RowSource,722 flowCtx *FlowCtx,723 evalCtx *tree.EvalContext,724 processorID int32,725 output RowReceiver,726 opts ProcStateOpts,727) {728 pb.self = self729 pb.FlowCtx = flowCtx730 pb.EvalCtx = evalCtx731 pb.ProcessorID = processorID732 pb.Output = output733 pb.trailingMetaCallback = opts.TrailingMetaCallback734 if opts.InputsToDrain != nil {735 // Only initialize this if non-nil, because we cache the slice of inputs736 // to drain in our object pool, and overwriting the slice in Init would737 // be horribly counterproductive.738 pb.inputsToDrain = opts.InputsToDrain739 }740}741// AddInputToDrain adds an input to drain when moving the processor to a742// draining state.743func (pb *ProcessorBaseNoHelper) AddInputToDrain(input RowSource) {744 pb.inputsToDrain = append(pb.inputsToDrain, input)745}746// AppendTrailingMeta appends metadata to the trailing metadata without changing747// the state to draining (as opposed to MoveToDraining).748func (pb *ProcessorBase) AppendTrailingMeta(meta 
execinfrapb.ProducerMetadata) {749 pb.trailingMeta = append(pb.trailingMeta, meta)750}751// ProcessorSpan creates a child span for a processor (if we are doing any752// tracing). The returned span needs to be finished using tracing.FinishSpan.753func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {754 return tracing.ChildSpanRemote(ctx, name)755}756// StartInternal prepares the ProcessorBase for execution. It returns the757// annotated context that's also stored in pb.Ctx.758//759// It is likely that this method is called from RowSource.Start implementation,760// and the recommended layout is the following:761// ctx = pb.StartInternal(ctx, name)762// < inputs >.Start(ctx) // if there are any inputs-RowSources to pb763// < other initialization >764// so that the caller doesn't mistakenly use old ctx object.765func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context {766 return pb.startImpl(ctx, true /* createSpan */, name)767}768// StartInternalNoSpan does the same as StartInternal except that it does not769// start a span. This is used by pass-through components whose goal is to be a770// silent translation layer for components that actually do work (e.g. 
a771// planNodeToRowSource wrapping an insertNode, or a columnarizer wrapping a772// rowexec flow).773func (pb *ProcessorBaseNoHelper) StartInternalNoSpan(ctx context.Context) context.Context {774 return pb.startImpl(ctx, false /* createSpan */, "")775}776func (pb *ProcessorBaseNoHelper) startImpl(777 ctx context.Context, createSpan bool, spanName string,778) context.Context {779 pb.origCtx = ctx780 if createSpan {781 pb.Ctx, pb.span = ProcessorSpan(ctx, spanName)782 if pb.span != nil && pb.span.IsVerbose() {783 pb.span.SetTag(execinfrapb.FlowIDTagKey, pb.FlowCtx.ID.String())784 pb.span.SetTag(execinfrapb.ProcessorIDTagKey, pb.ProcessorID)785 }786 } else {787 pb.Ctx = ctx788 }789 pb.EvalCtx.Context = pb.Ctx790 return pb.Ctx791}792// InternalClose helps processors implement the RowSource interface, performing793// common close functionality. Returns true iff the processor was not already794// closed.795//796// Notably, it calls ConsumerClosed() on all the inputsToDrain and updates797// pb.Ctx to the context passed into StartInternal() call.798//799// if pb.InternalClose() {800// // Perform processor specific close work.801// }802func (pb *ProcessorBase) InternalClose() bool {803 closing := pb.ProcessorBaseNoHelper.InternalClose()804 if closing {805 // This prevents Next() from returning more rows.806 pb.OutputHelper.consumerClosed()807 }808 return closing809}810// InternalClose is the meat of ProcessorBase.InternalClose.811func (pb *ProcessorBaseNoHelper) InternalClose() bool {812 closing := !pb.Closed813 // Protection around double closing is useful for allowing ConsumerClosed() to814 // be called on processors that have already closed themselves by moving to815 // StateTrailingMeta.816 if closing {817 for _, input := range pb.inputsToDrain[pb.curInputToDrain:] {818 input.ConsumerClosed()819 }820 pb.Closed = true821 pb.span.Finish()822 pb.span = nil823 // Reset the context so that any incidental uses after this point do not824 // access the finished span.825 pb.Ctx 
= pb.origCtx826 pb.EvalCtx.Context = pb.origCtx827 }828 return closing829}830// ConsumerDone is part of the RowSource interface.831func (pb *ProcessorBaseNoHelper) ConsumerDone() {832 pb.MoveToDraining(nil /* err */)833}834// ConsumerClosed is part of the RowSource interface.835func (pb *ProcessorBaseNoHelper) ConsumerClosed() {836 // The consumer is done, Next() will not be called again.837 pb.InternalClose()838}839// NewMonitor is a utility function used by processors to create a new840// memory monitor with the given name and start it. The returned monitor must841// be closed.842func NewMonitor(ctx context.Context, parent *mon.BytesMonitor, name string) *mon.BytesMonitor {843 monitor := mon.NewMonitorInheritWithLimit(name, 0 /* limit */, parent)844 monitor.Start(ctx, parent, mon.BoundAccount{})845 return monitor846}...
materializer.go
Source:materializer.go
...61 ctx context.Context62 bufferedMeta []execinfrapb.ProducerMetadata63}64var _ execinfra.RowSource = &drainHelper{}65func newDrainHelper(sources execinfrapb.MetadataSources) *drainHelper {66 return &drainHelper{67 MetadataSources: sources,68 }69}70// OutputTypes implements the RowSource interface.71func (d *drainHelper) OutputTypes() []*types.T {72 colexecerror.InternalError("unimplemented")73 // Unreachable code.74 return nil75}76// Start implements the RowSource interface.77func (d *drainHelper) Start(ctx context.Context) context.Context {78 d.ctx = ctx79 return ctx80}81// Next implements the RowSource interface.82func (d *drainHelper) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {83 if d.bufferedMeta == nil {84 d.bufferedMeta = d.DrainMeta(d.ctx)85 if d.bufferedMeta == nil {86 // Still nil, avoid more calls to DrainMeta.87 d.bufferedMeta = []execinfrapb.ProducerMetadata{}88 }89 }90 if len(d.bufferedMeta) == 0 {91 return nil, nil92 }93 meta := d.bufferedMeta[0]94 d.bufferedMeta = d.bufferedMeta[1:]95 return nil, &meta96}97// ConsumerDone implements the RowSource interface.98func (d *drainHelper) ConsumerDone() {}99// ConsumerClosed implements the RowSource interface.100func (d *drainHelper) ConsumerClosed() {}101const materializerProcName = "materializer"102// NewMaterializer creates a new Materializer processor which processes the103// columnar data coming from input to return it as rows.104// Arguments:105// - typs is the output types scheme.106// - metadataSourcesQueue are all of the metadata sources that are planned on107// the same node as the Materializer and that need to be drained.108// - outputStatsToTrace (when tracing is enabled) finishes the stats.109// - cancelFlow should return the context cancellation function that cancels110// the context of the flow (i.e. it is Flow.ctxCancel). It should only be111// non-nil in case of a root Materializer (i.e. 
not when we're wrapping a row112// source).113// NOTE: the constructor does *not* take in an execinfrapb.PostProcessSpec114// because we expect input to handle that for us.115func NewMaterializer(116 flowCtx *execinfra.FlowCtx,117 processorID int32,118 input colexecbase.Operator,119 typs []*types.T,120 output execinfra.RowReceiver,121 metadataSourcesQueue []execinfrapb.MetadataSource,122 toClose []Closer,123 outputStatsToTrace func(),124 cancelFlow func() context.CancelFunc,125) (*Materializer, error) {126 m := &Materializer{127 input: input,128 typs: typs,129 drainHelper: newDrainHelper(metadataSourcesQueue),130 // nil vecIdxsToConvert indicates that we want to convert all vectors.131 converter: newVecToDatumConverter(len(typs), nil /* vecIdxsToConvert */),132 row: make(sqlbase.EncDatumRow, len(typs)),133 closers: toClose,134 }135 if err := m.ProcessorBase.Init(136 m,137 // input must have handled any post-processing itself, so we pass in138 // an empty post-processing spec.139 &execinfrapb.PostProcessSpec{},140 typs,141 flowCtx,142 processorID,143 output,144 nil, /* memMonitor */145 execinfra.ProcStateOpts{146 InputsToDrain: []execinfra.RowSource{m.drainHelper},147 TrailingMetaCallback: func(ctx context.Context) []execinfrapb.ProducerMetadata {148 m.InternalClose()149 return nil150 },151 },152 ); err != nil {153 return nil, err154 }155 m.FinishTrace = outputStatsToTrace156 m.cancelFlow = cancelFlow157 return m, nil158}159var _ execinfra.OpNode = &Materializer{}160// ChildCount is part of the exec.OpNode interface.161func (m *Materializer) ChildCount(verbose bool) int {162 return 1163}164// Child is part of the exec.OpNode interface.165func (m *Materializer) Child(nth int, verbose bool) execinfra.OpNode {166 if nth == 0 {167 return m.input168 }169 colexecerror.InternalError(fmt.Sprintf("invalid index %d", nth))170 // This code is unreachable, but the compiler cannot infer that.171 return nil172}173// Start is part of the execinfra.RowSource interface.174func (m 
*Materializer) Start(ctx context.Context) context.Context {175 m.input.Init()176 ctx = m.drainHelper.Start(ctx)177 return m.ProcessorBase.StartInternal(ctx, materializerProcName)178}179// nextAdapter calls next() and saves the returned results in m. For internal180// use only. The purpose of having this function is to not create an anonymous181// function on every call to Next().182func (m *Materializer) nextAdapter() {183 m.outputRow, m.outputMetadata = m.next()184}185// next is the logic of Next() extracted in a separate method to be used by an186// adapter to be able to wrap the latter with a catcher.187func (m *Materializer) next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {188 if m.State == execinfra.StateRunning {189 if m.batch == nil || m.curIdx >= m.batch.Length() {190 // Get a fresh batch.191 m.batch = m.input.Next(m.Ctx)192 if m.batch.Length() == 0 {193 m.MoveToDraining(nil /* err */)194 return nil, m.DrainHelper()195 }196 m.curIdx = 0197 m.converter.convertBatch(m.batch)198 }199 for colIdx := range m.typs {200 // Note that we don't need to apply the selection vector of the201 // batch to index m.curIdx because vecToDatumConverter returns a202 // "dense" datum column.203 m.row[colIdx].Datum = m.converter.getDatumColumn(colIdx)[m.curIdx]204 }205 m.curIdx++206 // Note that there is no post-processing to be done in the207 // materializer, so we do not use ProcessRowHelper and emit the row208 // directly.209 return m.row, nil210 }211 return nil, m.DrainHelper()212}213// Next is part of the execinfra.RowSource interface.214func (m *Materializer) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {215 if err := colexecerror.CatchVectorizedRuntimeError(m.nextAdapter); err != nil {216 m.MoveToDraining(err)217 return nil, m.DrainHelper()218 }219 return m.outputRow, m.outputMetadata220}221// InternalClose helps implement the execinfra.RowSource interface.222func (m *Materializer) InternalClose() bool {223 if m.ProcessorBase.InternalClose() {224 
if m.cancelFlow != nil {225 m.cancelFlow()()226 }227 m.closers.CloseAndLogOnErr(m.Ctx, "materializer")228 return true229 }230 return false231}232// ConsumerDone is part of the execinfra.RowSource interface.233func (m *Materializer) ConsumerDone() {234 // Materializer will move into 'draining' state, and after all the metadata235 // has been drained - as part of TrailingMetaCallback - InternalClose() will236 // be called which will cancel the flow.237 m.MoveToDraining(nil /* err */)238}239// ConsumerClosed is part of the execinfra.RowSource interface.240func (m *Materializer) ConsumerClosed() {241 m.InternalClose()242}...
Drain
Using AI Code Generation
import "fmt"

// main demonstrates draining a channel: a producer goroutine sends the
// integers 0 through 9 on an unbuffered channel and then closes it, while the
// main goroutine ranges over the channel and prints every value until the
// close is observed.
func main() {
	c := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			// Send each value; without this send the consumer below would
			// see an immediately-closed channel and print nothing.
			c <- i
		}
		// Only the sender closes the channel; closing is what terminates
		// the range loop in the consumer.
		close(c)
	}()
	for n := range c {
		fmt.Println(n)
	}
}
Drain
Using AI Code Generation
1import (2func main() {3 fmt.Println("Hello World!")4 internal.Drain()5}6import (7func Drain() {8 fmt.Println("Drain")9}10import (11func main() {12 fmt.Println("Hello World!")13 internal.Drain()14}15import (16func main() {17 fmt.Println("Hello World!")18 internal.Drain()19}20import (21func Drain() {22 fmt.Println("Drain")23}
Drain
Using AI Code Generation
1import (2type Employee struct {3}4func main() {5 wg.Add(1)6 go func() {7 defer wg.Done()8 employeeChan := make(chan Employee, 10)9 employeeChan <- Employee{"John", 30}10 employeeChan <- Employee{"Mark", 40}11 employeeChan <- Employee{"Sandy", 50}12 close(employeeChan)13 for employee := range employeeChan {14 fmt.Println("Name:", employee.Name)15 fmt.Println("Age:", employee.Age)16 }17 }()18 wg.Wait()19}20import (21func main() {22 wg.Add(1)23 go func() {24 defer wg.Done()25 intChan := make(chan int, 10)26 close(intChan)27 for i := range intChan {28 fmt.Println("Value:", i)29 }30 }()31 wg.Wait()32}33import (
Drain
Using AI Code Generation
1func main() {2 b.Write([]byte("Hello"))3 b.Write([]byte("World"))4 fmt.Println(b.String())5 b.Drain()6 fmt.Println(b.String())7}8func main() {9 b.Write([]byte("Hello"))10 b.Write([]byte("World"))11 fmt.Println(b.String())12 b.Drain()13 fmt.Println(b.String())14}15func main() {16 b.Write([]byte("Hello"))17 b.Write([]byte("World"))18 fmt.Println(b.String())19 b.Drain()20 fmt.Println(b.String())21}22func main() {23 b.Write([]byte("Hello"))24 b.Write([]byte("World"))25 fmt.Println(b.String())26 b.Drain()27 fmt.Println(b.String())28}
Drain
Using AI Code Generation
1import (2func main() {3 fmt.Println(x.Drain())4}5func (i MyInt) Drain() int {6 return int(i) - 17}8import "testing"9func TestDrain(t *testing.T) {10 if x.Drain() != 4 {11 t.Errorf("Drain should return 4, but returned %d", x.Drain())12 }13}
Drain
Using AI Code Generation
1import (2func main() {3 ic := internal.NewInternalClass()4 ic.Drain()5 fmt.Println("Done")6}7import (8type InternalClass struct {9}10func NewInternalClass() *InternalClass {11 return &InternalClass{}12}13func (ic *InternalClass) Drain() {14 fmt.Println("Drain")15}
Drain
Using AI Code Generation
1import (2func main() {3 fmt.Println("Hello, playground")4 p := &internal.Person{}5 p.Drain()6}7import (8func main() {9 fmt.Println("Hello, playground")10 p := &internal.Person{}11 p.Drain()12}13import (14func main() {15 fmt.Println("Hello, playground")16 p := &internal.Person{}17 p.Drain()18}
Drain
Using AI Code Generation
1import (2func main() {3 fmt.Println(internal.Drain())4}5import "fmt"6func Drain() string {7}8func Flush() string {9}10func FlushToilet() string {11}12func FlushSink() string {13}14import (15func main() {16 fmt.Println(internal.Drain())17}
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!