Source: client_server_test.go
package parallel_support_test
import (
"fmt"
"os"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/ginkgo/v2/internal"
"github.com/onsi/ginkgo/v2/internal/parallel_support"
. "github.com/onsi/ginkgo/v2/internal/test_helpers"
"github.com/onsi/ginkgo/v2/types"
)
type ColorableStringerStruct struct {
Label string
Count int
}
func (s ColorableStringerStruct) String() string {
return fmt.Sprintf("%s %d", s.Label, s.Count)
}
func (s ColorableStringerStruct) ColorableString() string {
return fmt.Sprintf("{{red}}%s {{green}}%d{{/}}", s.Label, s.Count)
}
var _ = Describe("The Parallel Support Client & Server", func() {
for _, protocol := range []string{"RPC", "HTTP"} {
protocol := protocol
Describe(fmt.Sprintf("The %s protocol", protocol), Label(protocol), func() {
var (
server parallel_support.Server
client parallel_support.Client
reporter *FakeReporter
buffer *gbytes.Buffer
)
BeforeEach(func() {
GinkgoT().Setenv("GINKGO_PARALLEL_PROTOCOL", protocol)
var err error
reporter = &FakeReporter{}
server, err = parallel_support.NewServer(3, reporter)
Ω(err).ShouldNot(HaveOccurred())
server.Start()
buffer = gbytes.NewBuffer()
server.SetOutputDestination(buffer)
client = parallel_support.NewClient(server.Address())
Eventually(client.Connect).Should(BeTrue())
DeferCleanup(server.Close)
DeferCleanup(client.Close)
})
Describe("Reporting endpoints", func() {
var beginReport, thirdBeginReport types.Report
var endReport1, endReport2, endReport3 types.Report
var specReportA, specReportB, specReportC types.SpecReport
var t time.Time
BeforeEach(func() {
beginReport = types.Report{SuiteDescription: "my sweet suite"}
thirdBeginReport = types.Report{SuiteDescription: "last one in gets forwarded"}
specReportA = types.SpecReport{LeafNodeText: "A"}
specReportB = types.SpecReport{LeafNodeText: "B"}
specReportC = types.SpecReport{LeafNodeText: "C"}
t = time.Now()
endReport1 = types.Report{StartTime: t.Add(-time.Second), EndTime: t.Add(time.Second), SuiteSucceeded: true, SpecReports: types.SpecReports{specReportA}}
endReport2 = types.Report{StartTime: t.Add(-2 * time.Second), EndTime: t.Add(time.Second), SuiteSucceeded: true, SpecReports: types.SpecReports{specReportB}}
endReport3 = types.Report{StartTime: t.Add(-time.Second), EndTime: t.Add(2 * time.Second), SuiteSucceeded: false, SpecReports: types.SpecReports{specReportC}}
})
Context("before all procs have reported SuiteWillBegin", func() {
BeforeEach(func() {
Ω(client.PostSuiteWillBegin(beginReport)).Should(Succeed())
Ω(client.PostDidRun(specReportA)).Should(Succeed())
Ω(client.PostSuiteWillBegin(beginReport)).Should(Succeed())
Ω(client.PostDidRun(specReportB)).Should(Succeed())
})
It("should not forward anything to the attached reporter", func() {
Ω(reporter.Begin).Should(BeZero())
Ω(reporter.Will).Should(BeEmpty())
Ω(reporter.Did).Should(BeEmpty())
})
Context("when the final proc reports SuiteWillBegin", func() {
BeforeEach(func() {
Ω(client.PostSuiteWillBegin(thirdBeginReport)).Should(Succeed())
})
It("forwards to SuiteWillBegin and catches up on any received summaries", func() {
Ω(reporter.Begin).Should(Equal(thirdBeginReport))
Ω(reporter.Will.Names()).Should(ConsistOf("A", "B"))
Ω(reporter.Did.Names()).Should(ConsistOf("A", "B"))
})
Context("any subsequent summaries", func() {
BeforeEach(func() {
Ω(client.PostDidRun(specReportC)).Should(Succeed())
})
It("are forwarded immediately", func() {
Ω(reporter.Will.Names()).Should(ConsistOf("A", "B", "C"))
Ω(reporter.Did.Names()).Should(ConsistOf("A", "B", "C"))
})
})
Context("when SuiteDidEnd start arriving", func() {
BeforeEach(func() {
Ω(client.PostSuiteDidEnd(endReport1)).Should(Succeed())
Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
})
It("does not forward them yet...", func() {
Ω(reporter.End).Should(BeZero())
})
It("doesn't signal it's done", func() {
Ω(server.GetSuiteDone()).ShouldNot(BeClosed())
})
Context("when the final SuiteDidEnd arrive", func() {
BeforeEach(func() {
Ω(client.PostSuiteDidEnd(endReport3)).Should(Succeed())
})
It("forwards the aggregation of all received end summaries", func() {
Ω(reporter.End.StartTime.Unix()).Should(BeNumerically("~", t.Add(-2*time.Second).Unix()))
Ω(reporter.End.EndTime.Unix()).Should(BeNumerically("~", t.Add(2*time.Second).Unix()))
Ω(reporter.End.RunTime).Should(BeNumerically("~", 4*time.Second))
Ω(reporter.End.SuiteSucceeded).Should(BeFalse())
Ω(reporter.End.SpecReports).Should(ConsistOf(specReportA, specReportB, specReportC))
})
It("should signal it's done", func() {
Ω(server.GetSuiteDone()).Should(BeClosed())
})
})
})
})
})
})
Describe("supporting ReportEntries (which RPC struggled with when I first implemented it)", func() {
BeforeEach(func() {
Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
})
It("can pass in ReportEntries that include custom types", func() {
cl := types.NewCodeLocation(0)
entry, err := internal.NewReportEntry("No Value Entry", cl)
Ω(err).ShouldNot(HaveOccurred())
Ω(client.PostDidRun(types.SpecReport{
LeafNodeText: "no-value",
ReportEntries: types.ReportEntries{entry},
})).Should(Succeed())
entry, err = internal.NewReportEntry("String Value Entry", cl, "The String")
Ω(err).ShouldNot(HaveOccurred())
Ω(client.PostDidRun(types.SpecReport{
LeafNodeText: "string-value",
ReportEntries: types.ReportEntries{entry},
})).Should(Succeed())
entry, err = internal.NewReportEntry("Custom Type Value Entry", cl, ColorableStringerStruct{Label: "apples", Count: 17})
Ω(err).ShouldNot(HaveOccurred())
Ω(client.PostDidRun(types.SpecReport{
LeafNodeText: "custom-value",
ReportEntries: types.ReportEntries{entry},
})).Should(Succeed())
Ω(reporter.Did.Find("no-value").ReportEntries[0].Name).Should(Equal("No Value Entry"))
Ω(reporter.Did.Find("no-value").ReportEntries[0].StringRepresentation()).Should(Equal(""))
Ω(reporter.Did.Find("string-value").ReportEntries[0].Name).Should(Equal("String Value Entry"))
Ω(reporter.Did.Find("string-value").ReportEntries[0].StringRepresentation()).Should(Equal("The String"))
Ω(reporter.Did.Find("custom-value").ReportEntries[0].Name).Should(Equal("Custom Type Value Entry"))
Ω(reporter.Did.Find("custom-value").ReportEntries[0].StringRepresentation()).Should(Equal("{{red}}apples {{green}}17{{/}}"))
})
})
Describe("Streaming output", func() {
It("is configured to stream to stdout", func() {
server, err := parallel_support.NewServer(3, reporter)
Ω(err).ShouldNot(HaveOccurred())
Ω(server.GetOutputDestination().(*os.File).Fd()).Should(Equal(uintptr(1)))
})
It("streams output to the provided buffer", func() {
n, err := client.Write([]byte("hello"))
Ω(n).Should(Equal(5))
Ω(err).ShouldNot(HaveOccurred())
Ω(buffer).Should(gbytes.Say("hello"))
})
})
Describe("Synchronization endpoints", func() {
var proc1Exited, proc2Exited, proc3Exited chan interface{}
BeforeEach(func() {
proc1Exited, proc2Exited, proc3Exited = make(chan interface{}), make(chan interface{}), make(chan interface{})
aliveFunc := func(c chan interface{}) func() bool {
return func() bool {
select {
case <-c:
return false
default:
return true
}
}
}
server.RegisterAlive(1, aliveFunc(proc1Exited))
server.RegisterAlive(2, aliveFunc(proc2Exited))
server.RegisterAlive(3, aliveFunc(proc3Exited))
})
Describe("Managing SynchronizedBeforeSuite synchronization", func() {
Context("when proc 1 succeeds and returns data", func() {
It("passes that data along to other procs", func() {
Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, []byte("hello there"))).Should(Succeed())
state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
Ω(state).Should(Equal(types.SpecStatePassed))
Ω(data).Should(Equal([]byte("hello there")))
Ω(err).ShouldNot(HaveOccurred())
})
})
Context("when proc 1 succeeds and the data happens to be nil", func() {
It("passes reports success and returns nil", func() {
Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, nil)).Should(Succeed())
state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
Ω(state).Should(Equal(types.SpecStatePassed))
Ω(data).Should(BeNil())
Ω(err).ShouldNot(HaveOccurred())
})
})
Context("when proc 1 is skipped", func() {
It("passes that state information along to the other procs", func() {
Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStateSkipped, nil)).Should(Succeed())
state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
Ω(state).Should(Equal(types.SpecStateSkipped))
Ω(data).Should(BeNil())
Ω(err).ShouldNot(HaveOccurred())
})
})
Context("when proc 1 fails", func() {
It("passes that state information along to the other procs", func() {
Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStateFailed, nil)).Should(Succeed())
state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
Ω(state).Should(Equal(types.SpecStateFailed))
Ω(data).Should(BeNil())
Ω(err).ShouldNot(HaveOccurred())
})
})
Context("when proc 1 disappears before reporting back", func() {
It("returns a meaningful error", func() {
close(proc1Exited)
state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
Ω(state).Should(Equal(types.SpecStateInvalid))
Ω(data).Should(BeNil())
Ω(err).Should(MatchError(types.GinkgoErrors.SynchronizedBeforeSuiteDisappearedOnProc1()))
})
})
Context("when proc 1 hasn't responded yet", func() {
It("blocks until it does", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
Ω(state).Should(Equal(types.SpecStatePassed))
Ω(data).Should(Equal([]byte("hello there")))
Ω(err).ShouldNot(HaveOccurred())
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, []byte("hello there"))).Should(Succeed())
Eventually(done).Should(BeClosed())
})
})
})
Describe("BlockUntilNonprimaryProcsHaveFinished", func() {
It("blocks until non-primary procs exit", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
Ω(client.BlockUntilNonprimaryProcsHaveFinished()).Should(Succeed())
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
close(proc2Exited)
Consistently(done).ShouldNot(BeClosed())
close(proc3Exited)
Eventually(done).Should(BeClosed())
})
})
Describe("BlockUntilAggregatedNonprimaryProcsReport", func() {
var specReportA, specReportB types.SpecReport
var endReport2, endReport3 types.Report
BeforeEach(func() {
specReportA = types.SpecReport{LeafNodeText: "A"}
specReportB = types.SpecReport{LeafNodeText: "B"}
endReport2 = types.Report{SpecReports: types.SpecReports{specReportA}}
endReport3 = types.Report{SpecReports: types.SpecReports{specReportB}}
})
It("blocks until all non-primary procs exit, then returns the aggregated report", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
report, err := client.BlockUntilAggregatedNonprimaryProcsReport()
Ω(err).ShouldNot(HaveOccurred())
Ω(report.SpecReports).Should(ConsistOf(specReportA, specReportB))
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
close(proc2Exited)
Consistently(done).ShouldNot(BeClosed())
Ω(client.PostSuiteDidEnd(endReport3)).Should(Succeed())
close(proc3Exited)
Eventually(done).Should(BeClosed())
})
Context("when a non-primary proc disappears without reporting back", func() {
It("blocks returns an appropriate error", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
report, err := client.BlockUntilAggregatedNonprimaryProcsReport()
Ω(err).Should(Equal(types.GinkgoErrors.AggregatedReportUnavailableDueToNodeDisappearing()))
Ω(report).Should(BeZero())
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
close(proc2Exited)
Consistently(done).ShouldNot(BeClosed())
close(proc3Exited)
Eventually(done).Should(BeClosed())
})
})
})
Describe("Fetching counters", func() {
It("returns ascending counters", func() {
Ω(client.FetchNextCounter()).Should(Equal(0))
Ω(client.FetchNextCounter()).Should(Equal(1))
Ω(client.FetchNextCounter()).Should(Equal(2))
Ω(client.FetchNextCounter()).Should(Equal(3))
})
})
Describe("Aborting", func() {
It("should not abort by default", func() {
Ω(client.ShouldAbort()).Should(BeFalse())
})
Context("when told to abort", func() {
BeforeEach(func() {
Ω(client.PostAbort()).Should(Succeed())
})
It("should abort", func() {
Ω(client.ShouldAbort()).Should(BeTrue())
})
})
})
})
})
}
})