How to use Subscribe method of event Package

Best Testkube code snippet using event.Subscribe

client_test.go

Source:client_test.go Github

copy

Full Screen

...53 subsCopy[key] = val54 }55 return subsCopy56}57func TestSubscribe(t *testing.T) {58 ps := NewPubSub(logging.NewLogger("testing"))59 s := kite.New("s", "0.0.0")60 s.Config.DisableAuthentication = true61 doneC, subscribe := handlerWrapper(ps.Subscribe)62 s.HandleFunc("client.Subscribe", subscribe)63 ts := httptest.NewServer(s)64 defer ts.Close()65 c1 := kite.New("c1", "0.0.0").NewClient(fmt.Sprintf("%s/kite", ts.URL))66 c2 := kite.New("c2", "0.0.0").NewClient(fmt.Sprintf("%s/kite", ts.URL))67 err := c1.Dial()68 if err != nil {69 t.Fatal("Failed to connect to testing Kite", err)70 }71 err = c2.Dial()72 if err != nil {73 t.Fatal("Failed to connect to testing Kite", err)74 }75 // Should require arguments76 _, err = c1.Tell("client.Subscribe")77 if err == nil {78 t.Error("client.Subscribe should require args")79 }80 if err = wait(doneC, time.Second); err != nil {81 t.Fatalf("want err = nil; got %v", err)82 }83 // Should require eventName84 _, err = c1.Tell("client.Subscribe", struct {85 Data string86 OnPublish dnode.Function87 }{88 Data: "foo",89 OnPublish: dnode.Callback(func(f *dnode.Partial) {}),90 })91 if err == nil {92 t.Error("client.Subscribe should require EventName")93 }94 if err = wait(doneC, time.Second); err != nil {95 t.Fatalf("want err = nil; got %v", err)96 }97 // Should require onPublish98 _, err = c1.Tell("client.Subscribe", struct {99 eventName string100 Data string101 }{102 eventName: "foo",103 Data: "bar",104 })105 if err == nil {106 t.Error("client.Subscribe should require OnPublish")107 }108 if err = wait(doneC, time.Second); err != nil {109 t.Fatalf("want err = nil; got %v", err)110 }111 // Should require valid onPublish func112 _, err = c1.Tell("client.Subscribe", struct {113 eventName string114 onPublish string115 }{116 eventName: "foo",117 onPublish: "bar",118 })119 if err == nil {120 t.Error("client.Subscribe should require a valid OnPublish func")121 }122 if err = wait(doneC, time.Second); err != nil {123 t.Fatalf("want err = nil; got %v", err)124 }125 // Should subscribe to any given event name126 pRes, err := c1.Tell("client.Subscribe", SubscribeRequest{127 EventName: "test",128 OnPublish: dnode.Callback(func(f *dnode.Partial) {}),129 })130 if err != nil {131 t.Error(err)132 }133 if err = wait(doneC, time.Second); err != nil {134 t.Fatalf("want err = nil; got %v", err)135 }136 subs := getCopy(ps, "test")137 if len(subs) != 1 {138 t.Fatal("client.Subscribe should store a single onPublish callback")139 }140 // Should return the subIndex141 var res SubscribeResponse142 if err = pRes.Unmarshal(&res); err != nil {143 t.Errorf("client.Subscribe should return a valid response struct. err:%s", err)144 }145 if expected := 1; res.ID != expected {146 t.Errorf(147 "client.Subscribe should return the response id. Wanted:%d, Got:%d",148 expected, res.ID,149 )150 }151 // Should store the proper callback152 successC := make(chan struct{}, 1)153 pRes, _ = c1.Tell("client.Subscribe", SubscribeRequest{154 EventName: "test",155 OnPublish: dnode.Callback(func(f *dnode.Partial) {156 select {157 case successC <- struct{}{}:158 case <-time.After(time.Second): // Don't leak go-routines.159 }160 }),161 })162 if err = wait(doneC, time.Second); err != nil {163 t.Fatalf("want err = nil; got %v", err)164 }165 if err != nil {166 t.Fatal(err)167 }168 subs = getCopy(ps, "test")169 if len(subs) != 2 {170 t.Fatal("client.Subscribe should store multiple onPublish callbacks")171 }172 subs[2].Call()173 if err = wait(successC, time.Second); err != nil {174 t.Fatalf("want err = nil; got %v", err)175 }176 if err = pRes.Unmarshal(&res); err != nil {177 t.Errorf("client.Subscribe should return a valid response struct. err:%s", err)178 }179 if expected := 2; res.ID != expected {180 t.Errorf(181 "client.Subscribe should return the response id. Wanted:%d, Got:%d",182 expected, res.ID,183 )184 }185 // Should allow multiple clients to subscribe186 pRes, err = c2.Tell("client.Subscribe", SubscribeRequest{187 EventName: "test",188 OnPublish: dnode.Callback(func(_ *dnode.Partial) {}),189 })190 if err != nil {191 t.Error(err)192 }193 if err = wait(doneC, time.Second); err != nil {194 t.Fatalf("want err = nil; got %v", err)195 }196 subs = getCopy(ps, "test")197 if len(subs) != 3 {198 t.Fatal("client.Subscribe should allow multiple clients to Sub")199 }200 if err = pRes.Unmarshal(&res); err != nil {201 t.Errorf("client.Subscribe should return a valid response struct. err:%s", err)202 }203 if expected := 3; res.ID != expected {204 t.Errorf(205 "client.Subscribe should return the response id. Wanted:%d, Got:%d",206 expected, res.ID,207 )208 }209 // disconnectFunc will be added to kite's OnDisconnect callback slice.210 // Since kite callbacks are synchronous, we will provide synchronization211 // with Subscriptions map.212 disconnectedC := make(chan struct{})213 s.OnDisconnect(func(_ *kite.Client) {214 select {215 case disconnectedC <- struct{}{}:216 case <-time.After(time.Second):217 }218 })219 // Should remove onPublish func after the client disconnects220 c1.Close()221 if err = wait(disconnectedC, 2*time.Second); err != nil {222 t.Fatalf("want err = nil; got %v", err)223 }224 subs = getCopy(ps, "test")225 if len(subs) != 1 {226 t.Error("client.Subscribe",227 "should remove all of a clients callbacks on Disconnect")228 }229 // Should remove the map, when all clients disconnect230 c2.Close()231 if err = wait(disconnectedC, 2*time.Second); err != nil {232 t.Fatalf("want err = nil; got %v", err)233 }234 subs = getCopy(ps, "test")235 if subs != nil {236 t.Error("client.Subscribe",237 "should remove the event map when all clients disconnect")238 }239}240func TestPublish(t *testing.T) {241 ps := NewPubSub(logging.NewLogger("testing"))242 s := kite.New("s", "0.0.0")243 s.Config.DisableAuthentication = true244 doneC, publish := handlerWrapper(ps.Publish)245 s.HandleFunc("client.Publish", publish)246 ts := httptest.NewServer(s)247 defer ts.Close()248 k := kite.New("c", "0.0.0")249 c := k.NewClient(fmt.Sprintf("%s/kite", ts.URL))250 err := c.Dial()251 if err != nil {252 t.Fatal("Failed to connect to testing Kite", err)253 }254 // Should require args255 _, err = c.Tell("client.Publish")256 if err == nil {257 t.Error("client.Publish should require args")258 }259 if err = wait(doneC, time.Second); err != nil {260 t.Fatalf("want err = nil; got %v", err)261 }262 // Should require eventName263 _, err = c.Tell("client.Publish", struct {264 Random string265 Data string266 }{267 Random: "foo",268 Data: "bar",269 })270 if err == nil {271 t.Error("client.Publish should require EventName")272 }273 if err = wait(doneC, time.Second); err != nil {274 t.Fatalf("want err = nil; got %v", err)275 }276 // Should require subscriptions for the given event277 _, err = c.Tell("client.Publish", PublishRequest{278 EventName: "foo",279 })280 if err == nil {281 t.Error("client.Publish should return an error, without any subs")282 }283 if err = wait(doneC, time.Second); err != nil {284 t.Fatalf("want err = nil; got %v", err)285 }286 // Should call onPublish callbacks287 callbackCount := 0288 ps.Subscriptions["test"] = map[int]dnode.Function{289 0: {mockCaller(func(v ...interface{}) error {290 callbackCount += 1291 return nil292 })},293 1: {mockCaller(func(v ...interface{}) error {294 callbackCount += 2295 return nil296 })},297 }298 _, err = c.Tell("client.Publish", PublishRequest{299 EventName: "test",300 })301 if err != nil {302 t.Fatal("client.Publish should call onPublish callbacks without error.", err)303 }304 if err = wait(doneC, time.Second); err != nil {305 t.Fatalf("want err = nil; got %v", err)306 }307 if callbackCount != 3 {308 t.Fatal("client.Publish should call onPublish callbacks")309 }310 // Should publish arbitrary data311 var b []byte312 updatedC := make(chan struct{}, 1)313 ps.Subscriptions["other"] = map[int]dnode.Function{314 0: {mockCaller(func(v ...interface{}) error {315 b = v[0].([]interface{})[0].(*dnode.Partial).Raw316 select {317 case updatedC <- struct{}{}:318 case <-time.After(time.Second):319 }320 return nil321 })},322 }323 _, err = c.Tell("client.Publish", struct {324 EventName string325 CountData int326 ListData []string327 }{328 EventName: "other",329 CountData: 42,330 ListData: []string{"life", "universe", "everything"},331 })332 if err != nil {333 t.Fatal("client.Publish should publish data without error", err)334 }335 if err = wait(doneC, time.Second); err != nil {336 t.Fatalf("want err = nil; got %v", err)337 }338 // callback is called by another go-routine. we need to synchronize it.339 if err = wait(updatedC, time.Second); err != nil {340 t.Fatalf("want err = nil; got %v", err)341 }342 // This might be a faulty check, because the order of the data may343 // change. If it does, we'll just unmarshall and compare.344 expected := `{"EventName":"other","CountData":42,"ListData":["life","universe","everything"]}`345 if string(b) != expected {346 t.Error("client.Publish should publish arbitrary")347 }348}349func TestUnsubscribe(t *testing.T) {350 ps := NewPubSub(logging.NewLogger("testing"))351 s := kite.New("s", "0.0.0")352 s.Config.DisableAuthentication = true353 donePubC, publish := handlerWrapper(ps.Publish)354 s.HandleFunc("client.Publish", publish)355 doneSubC, subscribe := handlerWrapper(ps.Subscribe)356 s.HandleFunc("client.Subscribe", subscribe)357 doneUnsubC, unsubscribe := handlerWrapper(ps.Unsubscribe)358 s.HandleFunc("client.Unsubscribe", unsubscribe)359 ts := httptest.NewServer(s)360 defer ts.Close()361 c1 := kite.New("c1", "0.0.0").NewClient(fmt.Sprintf("%s/kite", ts.URL))362 c2 := kite.New("c2", "0.0.0").NewClient(fmt.Sprintf("%s/kite", ts.URL))363 err := c1.Dial()364 if err != nil {365 t.Fatal("Failed to connect to testing Kite", err)366 }367 err = c2.Dial()368 if err != nil {369 t.Fatal("Failed to connect to testing Kite", err)370 }371 // Track the calls to our subs.372 callsMu := sync.Mutex{} // protects calls map.373 calls := map[string]bool{}374 var wg sync.WaitGroup375 wg.Add(3)376 // Setup our event, sub index 1377 _, err = c1.Tell("client.Subscribe", SubscribeRequest{378 EventName: "test",379 OnPublish: dnode.Callback(func(f *dnode.Partial) {380 defer wg.Done()381 callsMu.Lock()382 defer callsMu.Unlock()383 calls["c1:1"] = true384 }),385 })386 if err != nil {387 t.Fatal(err)388 }389 if err = wait(doneSubC, time.Second); err != nil {390 t.Fatalf("want err = nil; got %v", err)391 }392 // Setup our event, sub index 2393 _, err = c2.Tell("client.Subscribe", SubscribeRequest{394 EventName: "test",395 OnPublish: dnode.Callback(func(f *dnode.Partial) {396 defer wg.Done()397 callsMu.Lock()398 defer callsMu.Unlock()399 calls["c2:2"] = true400 }),401 })402 if err != nil {403 t.Fatal(err)404 }405 if err = wait(doneSubC, time.Second); err != nil {406 t.Fatalf("want err = nil; got %v", err)407 }408 // Setup our event, sub index 3409 _, err = c2.Tell("client.Subscribe", SubscribeRequest{410 EventName: "test",411 OnPublish: dnode.Callback(func(f *dnode.Partial) {412 defer wg.Done()413 callsMu.Lock()414 defer callsMu.Unlock()415 calls["c2:3"] = true416 }),417 })418 if err != nil {419 t.Fatal(err)420 }421 if err = wait(doneSubC, time.Second); err != nil {422 t.Fatalf("want err = nil; got %v", err)423 }424 // Setup our event, sub index 4425 _, err = c1.Tell("client.Subscribe", SubscribeRequest{426 EventName: "test",427 OnPublish: dnode.Callback(func(f *dnode.Partial) {428 defer wg.Done()429 callsMu.Lock()430 defer callsMu.Unlock()431 calls["c1:4"] = true432 }),433 })434 if err != nil {435 t.Fatal(err)436 }437 if err = wait(doneSubC, time.Second); err != nil {438 t.Fatalf("want err = nil; got %v", err)439 }...

Full Screen

Full Screen

filter_system.go

Source:filter_system.go Github

copy

Full Screen

...139 es.install <- sub140 <-sub.installed141 return &Subscription{ID: sub.id, f: sub, es: es}142}143// SubscribeLogs creates a subscription that will write all logs matching the144// given criteria to the given logs channel. Default value for the from and to145// block is "latest". If the fromBlock > toBlock an error is returned.146func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []*types.Log) (*Subscription, error) {147 var from, to rpc.BlockNumber148 if crit.FromBlock == nil {149 from = rpc.LatestBlockNumber150 } else {151 from = rpc.BlockNumber(crit.FromBlock.Int64())152 }153 if crit.ToBlock == nil {154 to = rpc.LatestBlockNumber155 } else {156 to = rpc.BlockNumber(crit.ToBlock.Int64())157 }158 // only interested in pending logs159 if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {160 return es.subscribePendingLogs(crit, logs), nil161 }162 // only interested in new mined logs163 if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {164 return es.subscribeLogs(crit, logs), nil165 }166 // only interested in mined logs within a specific block range167 if from >= 0 && to >= 0 && to >= from {168 return es.subscribeLogs(crit, logs), nil169 }170 // interested in mined logs from a specific block number, new logs and pending logs171 if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {172 return es.subscribeMinedPendingLogs(crit, logs), nil173 }174 // interested in logs from a specific block number to new mined blocks175 if from >= 0 && to == rpc.LatestBlockNumber {176 return es.subscribeLogs(crit, logs), nil177 }178 return nil, fmt.Errorf("invalid from and to block combination: from > to")179}180// subscribeMinedPendingLogs creates a subscription that returned mined and181// pending logs that match the given criteria.182func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []*types.Log) *Subscription {183 sub := &subscription{184 id: rpc.NewID(),185 typ: MinedAndPendingLogsSubscription,186 logsCrit: crit,187 created: time.Now(),188 logs: logs,189 hashes: make(chan common.Hash),190 headers: make(chan *types.Header),191 installed: make(chan struct{}),192 err: make(chan error),193 }194 return es.subscribe(sub)195}196// subscribeLogs creates a subscription that will write all logs matching the197// given criteria to the given logs channel.198func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*types.Log) *Subscription {199 sub := &subscription{200 id: rpc.NewID(),201 typ: LogsSubscription,202 logsCrit: crit,203 created: time.Now(),204 logs: logs,205 hashes: make(chan common.Hash),206 headers: make(chan *types.Header),207 installed: make(chan struct{}),208 err: make(chan error),209 }210 return es.subscribe(sub)211}212// subscribePendingLogs creates a subscription that writes transaction hashes for213// transactions that enter the transaction pool.214func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*types.Log) *Subscription {215 sub := &subscription{216 id: rpc.NewID(),217 typ: PendingLogsSubscription,218 logsCrit: crit,219 created: time.Now(),220 logs: logs,221 hashes: make(chan common.Hash),222 headers: make(chan *types.Header),223 installed: make(chan struct{}),224 err: make(chan error),225 }226 return es.subscribe(sub)227}228// SubscribeNewHeads creates a subscription that writes the header of a block that is229// imported in the chain.230func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {231 sub := &subscription{232 id: rpc.NewID(),233 typ: BlocksSubscription,234 created: time.Now(),235 logs: make(chan []*types.Log),236 hashes: make(chan common.Hash),237 headers: headers,238 installed: make(chan struct{}),239 err: make(chan error),240 }241 return es.subscribe(sub)242}243// SubscribePendingTxEvents creates a subscription that writes transaction hashes for244// transactions that enter the transaction pool.245func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {246 sub := &subscription{247 id: rpc.NewID(),248 typ: PendingTransactionsSubscription,249 created: time.Now(),250 logs: make(chan []*types.Log),251 hashes: hashes,252 headers: make(chan *types.Header),253 installed: make(chan struct{}),254 err: make(chan error),255 }256 return es.subscribe(sub)257}258type filterIndex map[Type]map[rpc.ID]*subscription259// broadcast event to filters that match criteria.260func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {261 if ev == nil {262 return263 }264 switch e := ev.(type) {265 case []*types.Log:266 if len(e) > 0 {267 for _, f := range filters[LogsSubscription] {268 if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {269 f.logs <- matchedLogs270 }271 }272 }273 case core.RemovedLogsEvent:274 for _, f := range filters[LogsSubscription] {275 if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {276 f.logs <- matchedLogs277 }278 }279 case *event.TypeMuxEvent:280 switch muxe := e.Data.(type) {281 case core.PendingLogsEvent:282 for _, f := range filters[PendingLogsSubscription] {283 if e.Time.After(f.created) {284 if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {285 f.logs <- matchedLogs286 }287 }288 }289 }290 case core.TxPreEvent:291 for _, f := range filters[PendingTransactionsSubscription] {292 f.hashes <- e.Tx.Hash()293 }294 case core.ChainEvent:295 for _, f := range filters[BlocksSubscription] {296 f.headers <- e.Block.Header()297 }298 if es.lightMode && len(filters[LogsSubscription]) > 0 {299 es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {300 for _, f := range filters[LogsSubscription] {301 if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {302 f.logs <- matchedLogs303 }304 }305 })306 }307 }308}309func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {310 oldh := es.lastHead311 es.lastHead = newHeader312 if oldh == nil {313 return314 }315 newh := newHeader316 // find common ancestor, create list of rolled back and new block hashes317 var oldHeaders, newHeaders []*types.Header318 for oldh.Hash() != newh.Hash() {319 if oldh.Number.Uint64() >= newh.Number.Uint64() {320 oldHeaders = append(oldHeaders, oldh)321 oldh = core.GetHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)322 }323 if oldh.Number.Uint64() < newh.Number.Uint64() {324 newHeaders = append(newHeaders, newh)325 newh = core.GetHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)326 if newh == nil {327 // happens when CHT syncing, nothing to do328 newh = oldh329 }330 }331 }332 // roll back old blocks333 for _, h := range oldHeaders {334 callBack(h, true)335 }336 // check new blocks (array is in reverse order)337 for i := len(newHeaders) - 1; i >= 0; i-- {338 callBack(newHeaders[i], false)339 }340}341// filter logs of a single header in light client mode342func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {343 if bloomFilter(header.Bloom, addresses, topics) {344 // Get the logs of the block345 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)346 defer cancel()347 receipts, err := es.backend.GetReceipts(ctx, header.Hash())348 if err != nil {349 return nil350 }351 var unfiltered []*types.Log352 for _, receipt := range receipts {353 for _, log := range receipt.Logs {354 logcopy := *log355 logcopy.Removed = remove356 unfiltered = append(unfiltered, &logcopy)357 }358 }359 logs := filterLogs(unfiltered, nil, nil, addresses, topics)360 return logs361 }362 return nil363}364// eventLoop (un)installs filters and processes mux events.365func (es *EventSystem) eventLoop() {366 var (367 index = make(filterIndex)368 sub = es.mux.Subscribe(core.PendingLogsEvent{})369 // Subscribe TxPreEvent form txpool370 txCh = make(chan core.TxPreEvent, txChanSize)371 txSub = es.backend.SubscribeTxPreEvent(txCh)372 // Subscribe RemovedLogsEvent373 rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)374 rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)375 // Subscribe []*types.Log376 logsCh = make(chan []*types.Log, logsChanSize)377 logsSub = es.backend.SubscribeLogsEvent(logsCh)378 // Subscribe ChainEvent379 chainEvCh = make(chan core.ChainEvent, chainEvChanSize)380 chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)381 )382 // Unsubscribe all events383 defer sub.Unsubscribe()384 defer txSub.Unsubscribe()385 defer rmLogsSub.Unsubscribe()386 defer logsSub.Unsubscribe()387 defer chainEvSub.Unsubscribe()388 for i := UnknownSubscription; i < LastIndexSubscription; i++ {389 index[i] = make(map[rpc.ID]*subscription)390 }391 for {392 select {393 case ev, active := <-sub.Chan():394 if !active { // system stopped...

Full Screen

Full Screen

main.go

Source:main.go Github

copy

Full Screen

...19 client, err := examples.CreateChainClientWithSDKConf(sdkConfigOrg1Client1Path)20 if err != nil {21 log.Fatalln(err)22 }23 go testSubscribeBlock(client, false)24 go testSubscribeBlock(client, true)25 go testSubscribeTx(client)26 go testSubscribeContractEvent(client)27 select {}28}29func testSubscribeBlock(client *sdk.ChainClient, onlyHeader bool) {30 ctx, cancel := context.WithCancel(context.Background())31 defer cancel()32 c, err := client.SubscribeBlock(ctx, 0, 10, true, onlyHeader)33 //c, err := client.SubscribeBlock(ctx, 10, -1, true, onlyHeader)34 if err != nil {35 log.Fatalln(err)36 }37 for {38 select {39 case block, ok := <-c:40 if !ok {41 fmt.Println("chan is close!")42 return43 }44 if block == nil {45 log.Fatalln("require not nil")46 }47 if onlyHeader {48 blockHeader, ok := block.(*common.BlockHeader)49 if !ok {50 log.Fatalln("require true")51 }52 fmt.Printf("recv blockHeader [%d] => %+v\n", blockHeader.BlockHeight, blockHeader)53 } else {54 blockInfo, ok := block.(*common.BlockInfo)55 if !ok {56 log.Fatalln("require true")57 }58 fmt.Printf("recv blockInfo [%d] => %+v\n", blockInfo.Block.Header.BlockHeight, blockInfo)59 }60 //if err := client.Stop(); err != nil {61 // return62 //}63 //return64 case <-ctx.Done():65 return66 }67 }68}69func testSubscribeTx(client *sdk.ChainClient) {70 ctx, cancel := context.WithCancel(context.Background())71 defer cancel()72 c, err := client.SubscribeTx(ctx, 10, 30, "", nil)73 //c, err := client.SubscribeTx(ctx, 0, 4, "", nil)74 //c, err := client.SubscribeTx(ctx, 0, -1, "", []string{"1b70bb886c784a0587590da3a0af8fd336aa1a806be4431db31ceeba4a912f93"})75 //c, err := client.SubscribeTx(ctx, 0, -1, syscontract.SystemContract_CERT_MANAGE.String(), nil)76 if err != nil {77 log.Fatalln(err)78 }79 for {80 select {81 case txI, ok := <-c:82 if !ok {83 fmt.Println("chan is close!")84 return85 }86 if txI == nil {87 log.Fatalln("require not nil")88 }89 tx, ok := txI.(*common.Transaction)90 if !ok {91 log.Fatalln("require true")92 }93 fmt.Printf("recv tx [%s] => %+v\n", tx.Payload.TxId, tx)94 //if err := client.Stop(); err != nil {95 // return96 //}97 //return98 case <-ctx.Done():99 return100 }101 }102}103func testSubscribeContractEvent(client *sdk.ChainClient) {104 ctx, cancel := context.WithCancel(context.Background())105 defer cancel()106 //订阅指定合约的合约事件107 // 1. 获取所有历史+实时108 // c, err := client.SubscribeContractEvent(ctx, 0, -1, "claim005", "topic_vx")109 // 2. 获取实时110 //c, err := client.SubscribeContractEvent(ctx, -1, -1, "claim005", "topic_vx")111 // 3. 获取实时(兼容老版本)112 //c, err := client.SubscribeContractEvent(ctx, 0, 0, "claim005", "topic_vx")113 // 4. 获取历史到指定历史高度114 //c, err := client.SubscribeContractEvent(ctx, 0, 10, "claim005", "topic_vx")115 // 5. 获取历史到指定实时高度116 //c, err := client.SubscribeContractEvent(ctx, 0, 28, "claim005", "topic_vx")117 // 6. 获取实时直到指定高度118 //c, err := client.SubscribeContractEvent(ctx, -1, 25, "claim005", "topic_vx")119 // 7. 订阅所有topic120 c, err := client.SubscribeContractEvent(ctx, 0, -1, "claim005", "")121 // 7. 报错:起始高度高于当前区块高度,直接退出122 //c, err := client.SubscribeContractEvent(ctx, 25, 30, "claim005", "topic_vx")123 // 8. 报错:起始高度低于终止高度124 //c, err := client.SubscribeContractEvent(ctx, 25, 20, "claim005", "topic_vx")125 // 9. 报错:起始高度/终止高度低于-1126 //c, err := client.SubscribeContractEvent(ctx, -2, 20, "claim005", "topic_vx")127 //c, err := client.SubscribeContractEvent(ctx, 0, 0, "claim005", "")128 //c, err := client.SubscribeContractEvent(ctx, "64f50d594c2a739c7088f9fc6785e1934030e17b52f1a894baec61b98633a59f", "9c01b4c21d1907ab27aa23343493b3c9872777e3")129 if err != nil {130 log.Fatalln(err)131 }132 for {133 select {134 case event, ok := <-c:135 if !ok {136 fmt.Println("chan is close!")137 return138 }139 if event == nil {140 log.Fatalln("require not nil")141 }142 contractEventInfo, ok := event.(*common.ContractEventInfo)...

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 c := cron.New()4 c.AddFunc("*/1 * * * * *", func() { fmt.Println("Every second") })5 c.Start()6 select {}7}

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3 Subscribers []func(string)4}5func (e *Event) Subscribe(subscriber func(string)) {6 e.Subscribers = append(e.Subscribers, subscriber)7}8func (e *Event) Publish(msg string) {9 for _, subscriber := range e.Subscribers {10 subscriber(msg)11 }12}13func main() {14 e := new(Event)15 e.Subscribe(func(msg string) {16 fmt.Println("Subscriber 1:", msg)17 })18 e.Subscribe(func(msg string) {19 fmt.Println("Subscriber 2:", msg)20 })21 e.Publish("Hello")22}

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3 Subscribers []func(interface{})4}5func (e *Event) Subscribe(sub func(interface{})) {6 e.Subscribers = append(e.Subscribers, sub)7}8func (e *Event) Publish(data interface{}) {9 for _, sub := range e.Subscribers {10 sub(data)11 }12}13func main() {14 e := &Event{}15 e.Subscribe(func(data interface{}) {16 fmt.Println("Subscriber 1:", data)17 })18 e.Subscribe(func(data interface{}) {19 fmt.Println("Subscriber 2:", data)20 })21 e.Publish("Hello World")22}23import (24type Event struct {25 Subscribers []func(interface{})26}27func (e *Event) Subscribe(sub func(interface{})) {28 e.Subscribers = append(e.Subscribers, sub)29}30func (e *Event) Publish(data interface{}) {31 for _, sub := range e.Subscribers {32 sub(data)33 }34}35type Subscriber struct {36}37func (s *Subscriber) HandleEvent(data interface{}) {38 fmt.Println("Subscriber:", s.Name, "Data:", data)39}40func main() {41 e := &Event{}42 s1 := &Subscriber{Name: "S1"}43 s2 := &Subscriber{Name: "S2"}44 e.Subscribe(s1.HandleEvent)45 e.Subscribe(s2.HandleEvent)46 e.Publish("Hello World")47}48import (49type Event struct {50 Subscribers []func(interface{})51}52func (e *Event) Subscribe(sub func(interface{})) {53 e.Subscribers = append(e.Subscribers, sub)54}55func (e *Event) Publish(data interface{}) {56 for _, sub := range e.Subscribers {57 sub(data)58 }59}60type Subscriber struct {61}62func (s *Subscriber) HandleEvent(data interface{}) {63 fmt.Println("Subscriber:", s.Name, "Data:", data)64}65func main() {66 e := &Event{}

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 event := NewEvent()4 event.Subscribe("test", func() {5 fmt.Println("test")6 })7 event.Subscribe("test", func() {8 fmt.Println("test")9 })10 event.Publish("test")11}12type Event struct {13}14func NewEvent() *Event {15 return &Event{16 events: make(map[string][]reflect.Value),17 }18}19func (e *Event) Subscribe(name string, fn interface{}) {20 e.events[name] = append(e.events[name], reflect.ValueOf(fn))21}22func (e *Event) Publish(name string) {23 for _, fn := range e.events[name] {24 fn.Call([]reflect.Value{})25 }26}

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import "fmt"2func main() {3 e := event.NewEvent()4 s := event.NewSubscriber()5 e.Subscribe(s)6 e.Publish("Hello World")7}8import "fmt"9func main() {10 e := event.NewEvent()11 s := event.NewSubscriber()12 e.Subscribe(s)13 e.Publish("Hello World")14}15import "fmt"16func main() {17 e := event.NewEvent()18 s := event.NewSubscriber()19 e.Subscribe(s)20 e.Publish("Hello World")21}22import "fmt"23func main() {24 e := event.NewEvent()25 s := event.NewSubscriber()26 e.Subscribe(s)27 e.Publish("Hello World")28}29import "fmt"30func main() {31 e := event.NewEvent()32 s := event.NewSubscriber()33 e.Subscribe(s)34 e.Publish("Hello World")35}36import "fmt"37func main() {38 e := event.NewEvent()

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 e := NewEvent()4 e.Subscribe(func() {5 fmt.Println("Hello World")6 })7 e.Publish()8 time.Sleep(time.Second)9}10import (11func main() {12 e := NewEvent()13 e.Subscribe(func() {14 fmt.Println("Hello World")15 })16 e.Publish()17 time.Sleep(time.Second)18}19type Event struct {20 subscribers []func()21}22func NewEvent() *Event {23 return &Event{}24}25func (e *Event) Subscribe(f func()) {26 e.subscribers = append(e.subscribers, f)27}28func (e *Event) Publish() {29 for _, f := range e.subscribers {30 f()31 }32}33import (34func main() {35 e := NewEvent()36 e.Subscribe(func() {37 fmt.Println("Hello World")38 })39 e.Subscribe(func() {40 fmt.Println("Hello Go")41 })42 e.Publish()43 time.Sleep(time.Second)44}45type Event struct {46 subscribers []func()47}48func NewEvent() *Event {49 return &Event{}50}51func (e *Event) Subscribe(f func()) {52 e.subscribers = append(e.subscribers, f)53}54func (e *Event) Publish() {55 for _, f := range e.subscribers {56 f()57 }58}59import (60func main() {61 e := NewEvent()62 e.Subscribe(func() {63 fmt.Println("Hello World")64 })

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import "fmt"2import "time"3type Event struct {4 Data interface{}5}6type EventListener struct {7}8func (e *EventListener) Subscribe() {9 e.Events = make(chan *Event)10}11func (e *EventListener) Unsubscribe() {12 close(e.Events)13}14func (e *EventListener) Listen() {15 for {16 fmt.Println(event.Name)17 }18}19func main() {20 listener := EventListener{}21 listener.Subscribe()22 go listener.Listen()23 time.Sleep(time.Second)24 listener.Unsubscribe()25}26main.main()27listener.Events = make(chan *Event)28main.main()29listener.Unsubscribe()30main.main()31listener.Events <- &Event{Name: "test"}32main.main()

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import (2type Event struct {3 subscribers []interface{}4}5func (e *Event) Subscribe(f interface{}) {6 e.subscribers = append(e.subscribers, f)7}8func (e *Event) Unsubscribe(f interface{}) {9 for i, s := range e.subscribers {10 if s == f {11 e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...)12 }13 }14}15func (e *Event) Publish(params ...interface{}) {16 for _, s := range e.subscribers {17 f := reflect.ValueOf(s)18 in := make([]reflect.Value, len(params))19 for k, param := range params {20 in[k] = reflect.ValueOf(param)21 }22 f.Call(in)23 }24}25var event = Event{}26func subscriber1(name string, age int) {27 fmt.Println("Subscriber 1 called with name", name, "and age", age)28}29func subscriber2(name string, age int) {30 fmt.Println("Subscriber 2 called with name", name, "and age", age)31}32func main() {33 event.Subscribe(subscriber1)34 event.Subscribe(subscriber2)35 event.Publish("John", 25)36 event.Unsubscribe(subscriber1)37 event.Publish("Mary", 30)38}39import (40type Event struct {

Full Screen

Full Screen

Subscribe

Using AI Code Generation

copy

Full Screen

1import (2func main() {3e := events.New()4e.Subscribe("event", func() {5fmt.Println("Hello world")6})7e.Call("event")8}9import (10func main() {11e := events.New()12e.Subscribe("event", func() {13fmt.Println("Hello world")14})15e.Call("event")16}17import (18func main() {19e := events.New()20e.Subscribe("event", func() {21fmt.Println("Hello world")22})23e.Call("event")24}25import (26func main() {27e := events.New()28e.Subscribe("event", func() {29fmt.Println("Hello world")30})31e.Call("event")32}33import (34func main() {35e := events.New()36e.Subscribe("event", func() {37fmt.Println("Hello world")38})39e.Call("event")40}41import (42func main() {43e := events.New()44e.Subscribe("event", func() {45fmt.Println("Hello world")46})47e.Call("event")48}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful