Best Testkube code snippet using client.TailPodLogs
manager.go
Source:manager.go  
1package kubelogs2import (3	"bufio"4	"context"5	"errors"6	"fmt"7	"strings"8	"sync"9	"time"10	v1 "k8s.io/api/core/v1"11	apierrors "k8s.io/apimachinery/pkg/api/errors"12	"k8s.io/apimachinery/pkg/api/meta"13	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"14	"k8s.io/client-go/informers"15	"k8s.io/client-go/kubernetes"16	listerv1 "k8s.io/client-go/listers/core/v1"17	"k8s.io/client-go/tools/cache"18	"github.com/ripta/axe/pkg/logger"19)20var ErrInformerNeverSynced = errors.New("informer cache never completed syncing")21type Manager struct {22	kubernetes.Interface23	debug bool24	l     logger.Interface25	logCh chan logger.LogLine26	mu    sync.Mutex27	nsCancelers     map[string]context.CancelFunc28	nsInformers     map[string]informers.SharedInformerFactory29	podLogCancelers map[string]context.CancelFunc30	containerTails map[string]bool31	lookback time.Duration32	resync   time.Duration33}34func NewManager(l logger.Interface, cs kubernetes.Interface, lookback, resync time.Duration, debug bool) *Manager {35	if lookback > 0 {36		lookback = -lookback37	}38	if lookback == 0 {39		lookback = -5 * time.Minute40	}41	return &Manager{42		Interface: cs,43		debug:     debug,44		l:         l,45		mu:        sync.Mutex{},46		logCh:     make(chan logger.LogLine, 1000),47		containerTails:  make(map[string]bool),48		nsCancelers:     make(map[string]context.CancelFunc),49		nsInformers:     make(map[string]informers.SharedInformerFactory),50		podLogCancelers: make(map[string]context.CancelFunc),51		lookback: lookback,52		resync:   resync,53	}54}55func (m *Manager) ContainerCount() (int, int) {56	var active, all int57	m.mu.Lock()58	defer m.mu.Unlock()59	for _, up := range m.containerTails {60		all += 161		if up {62			active += 163		}64	}65	return active, all66}67func (m *Manager) Logs() <-chan logger.LogLine {68	return m.logCh69}70func (m *Manager) NamespaceCount() int {71	m.mu.Lock()72	defer m.mu.Unlock()73	return len(m.nsInformers)74}75func (m *Manager) Run(ctx context.Context) error {76	m.mu.Lock()77	defer m.mu.Unlock()78	for ns, inf := range m.nsInformers {79		ctx, cancel := context.WithCancel(ctx)80		m.nsCancelers[ns] = cancel81		inf.Start(ctx.Done())82	}83	return m.unsafeWaitForCacheSync(ctx.Done())84}85func (m *Manager) WaitForCacheSync(stopCh <-chan struct{}) error {86	m.mu.Lock()87	defer m.mu.Unlock()88	return m.unsafeWaitForCacheSync(stopCh)89}90func (m *Manager) unsafeWaitForCacheSync(stopCh <-chan struct{}) error {91	for ns, inf := range m.nsInformers {92		for typ, ok := range inf.WaitForCacheSync(stopCh) {93			if !ok {94				return fmt.Errorf("%w for type %s in namespace %s", ErrInformerNeverSynced, typ.String(), ns)95			}96		}97		m.l.Printf("cache synced for namespace %s", ns)98	}99	return nil100}101func (m *Manager) Watch(namespace string) {102	m.mu.Lock()103	defer m.mu.Unlock()104	if _, ok := m.nsInformers[namespace]; !ok {105		inf := informers.NewSharedInformerFactoryWithOptions(m.Interface, m.resync, informers.WithNamespace(namespace))106		inf.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{107			AddFunc: func(newobj interface{}) {108				om, err := meta.Accessor(newobj)109				if err != nil {110					m.l.Printf("could not retrieve meta information from new object during add: %+v", err)111					return112				}113				m.startPodLogs(om.GetNamespace(), om.GetName())114			},115			UpdateFunc: func(_, newobj interface{}) {116				om, err := meta.Accessor(newobj)117				if err != nil {118					m.l.Printf("could not retrieve meta information from new object during update: %+v", err)119					return120				}121				m.startPodLogs(om.GetNamespace(), om.GetName())122			},123			DeleteFunc: func(oldobj interface{}) {124				om, err := meta.Accessor(oldobj)125				if err != nil {126					m.l.Printf("could not retrieve meta information from old object during delete: %+v", err)127					return128				}129				m.stopPodLogs(om.GetNamespace(), om.GetName())130			},131		})132		m.l.Printf("registered watch for namespace %s", namespace)133		m.nsInformers[namespace] = inf134	}135}136func (m *Manager) Unwatch(namespace string) {137	m.mu.Lock()138	defer m.mu.Unlock()139	if _, ok := m.nsInformers[namespace]; ok {140		if cancel, ok := m.nsCancelers[namespace]; ok {141			cancel()142		}143		m.l.Printf("stopped watching namespace %s", namespace)144		delete(m.nsInformers, namespace)145		delete(m.nsCancelers, namespace)146		stops := make([]string, 0)147		for key := range m.podLogCancelers {148			if strings.HasPrefix(key, namespace+"/") {149				stops = append(stops, key)150			}151		}152		for _, key := range stops {153			if cancel, ok := m.podLogCancelers[key]; ok {154				cancel()155			}156			m.l.Printf("stopped tailing logs for %s", key)157			delete(m.podLogCancelers, key)158		}159	}160}161func (m *Manager) stopPodLogs(ns, name string) {162	m.mu.Lock()163	defer m.mu.Unlock()164	m.l.Printf("stopping pod logs for %s/%s", ns, name)165	key := fmt.Sprintf("%s/%s", ns, name)166	if cancel, ok := m.podLogCancelers[key]; ok {167		cancel()168	}169	delete(m.podLogCancelers, key)170}171func (m *Manager) startPodLogs(ns, name string) {172	m.mu.Lock()173	defer m.mu.Unlock()174	key := fmt.Sprintf("%s/%s", ns, name)175	if _, ok := m.podLogCancelers[key]; ok {176		return177	}178	ctx, cancel := context.WithCancel(context.Background())179	m.podLogCancelers[key] = cancel180	go m.tailPodLogs(ctx, ns, name)181}182func (m *Manager) tailPodLogs(ctx context.Context, ns, name string) {183	m.l.Printf("starting tail of logs for pod %s/%s", ns, name)184	defer m.stopPodLogs(ns, name)185	inf, ok := m.nsInformers[ns]186	if !ok {187		m.l.Printf("could not tail logs for %s/%s, because its namespace informer is missing", ns, name)188		return189	}190	pl := inf.Core().V1().Pods().Lister()191	pod, err := pl.Pods(ns).Get(name)192	if err != nil {193		if apierrors.IsNotFound(err) {194			m.l.Printf("ignoring deleted pod %s/%s", ns, name)195			return196		}197	}198	wg := sync.WaitGroup{}199	// TODO(ripta): handle init containers, which means we need to re-enter tailPodLogs200	for _, container := range pod.Spec.Containers {201		wg.Add(1)202		go m.tailPodContainerLogs(ctx, pl, ns, name, container.Name)203	}204	wg.Wait()205}206func (m *Manager) tailPodContainerLogs(ctx context.Context, pl listerv1.PodLister, ns, name, cn string) {207	key := fmt.Sprintf("%s/%s/%s", ns, name, cn)208	m.mu.Lock()209	m.containerTails[key] = true210	m.mu.Unlock()211	defer func() {212		m.mu.Lock()213		m.containerTails[key] = false214		m.mu.Unlock()215	}()216	m.l.Printf("starting tail of logs for container %s", key)217	plo := v1.PodLogOptions{218		Container: cn,219		Follow:    true,220		SinceTime: &metav1.Time{221			Time: time.Now().Add(m.lookback),222		},223	}224	for {225		if _, err := pl.Pods(ns).Get(name); err != nil {226			if apierrors.IsTooManyRequests(err) {227				// TODO(ripta): add jitter228				time.Sleep(5 * time.Second)229				m.l.Printf("got throttled by apiserver while asking about container %s", key)230				continue231			}232			if apierrors.IsNotFound(err) {233				m.l.Printf("ignoring container %s belonging to deleted pod %s/%s", cn, ns, name)234				return235			}236		}237		req := m.Interface.CoreV1().Pods(ns).GetLogs(name, &plo)238		stream, err := req.Context(ctx).Stream()239		if err != nil {240			// TODO(ripta): add jitter241			time.Sleep(5 * time.Second)242			m.l.Printf("could not tail container %s: %+v", key, err)243			continue244		}245		defer stream.Close()246		// lag := time.NewTimer(time.Millisecond)247		// defer lag.Stop()248		m.l.Printf("streaming logs for container %s", key)249		m.logCh <- logger.LogLine{250			Type: logger.LogLineTypeAxe,251			Text: fmt.Sprintf("streaming logs for container %s", key),252		}253		scanner := bufio.NewScanner(stream)254		for scanner.Scan() {255			line := logger.LogLine{256				Type:      logger.LogLineTypeContainer,257				Namespace: ns,258				Name:      name,259				Text:      scanner.Text(),260			}261			select {262			case m.logCh <- line:263			// case <-lag.C:264			// 	m.l.Printf("event buffer full, dropping logs for %s/%s", ns, name)265			case <-ctx.Done():266				m.l.Printf("stopped tailing container %s", key)267				return268			}269		}270		plo.SinceTime.Time = time.Now()271		m.l.Printf("end of logs for container %s", key)272		if err := scanner.Err(); err != nil {273			m.l.Printf("error tailing container %s: %+v", key, err)274		}275		// TODO(ripta): add jitter276		time.Sleep(5 * time.Second)277	}278}...inspect.go
Source:inspect.go  
...78			ui.Debug("pod", pod.GetNamespace()+"/"+pod.GetName(), "status", string(pod.Status.Phase))79			switch pod.Status.Phase {80			case corev1.PodRunning:81				logrus.Debug("tailing pod logs: immediately")82				return c.executor.TailPodLogs(ctx, pod, logs)83			case corev1.PodFailed:84				err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)85				logrus.Error(err.Error())86				return c.GetLastLogLineError(ctx, pod)87			default:88				logrus.Debug("tailing job logs: waiting for pod to be ready")89				if err = wait.PollImmediate(pollInterval, pollTimeout, IsPodReady(ctx, c.client, client.ObjectKeyFromObject(&pod))); err != nil {90					ui.ExitOnError("poll immediate error when tailing logs", err)91					return c.GetLastLogLineError(ctx, pod)92				}93				logrus.Debug("tailing pod logs")94				return c.executor.TailPodLogs(ctx, pod, logs)95			}96		}97	}98	return99}100// GetLastLogLineError return error if last line is failed101func (c TestInspectionClient) GetLastLogLineError(ctx context.Context, pod corev1.Pod) error {102	log, err := c.GetPodLogError(ctx, pod)103	if err != nil {104		return fmt.Errorf("getPodLogs error: %w", err)105	}106	logrus.Debug("log", "got last log bytes", string(log)) // in case distorted log bytes107	entry, err := output.GetLogEntry(log)108	if err != nil {...executor.go
Source:executor.go  
...136	}137	return logs, nil138}139*/140func (e *Executor) TailPodLogs(ctx context.Context, pod corev1.Pod, logs chan []byte) (err error) {141	count := int64(1)142	var containers []string143	for _, container := range pod.Spec.InitContainers {144		containers = append(containers, container.Name)145	}146	for _, container := range pod.Spec.Containers {147		containers = append(containers, container.Name)148	}149	// go func() {150	defer close(logs)151	for _, container := range containers {152		podLogOptions := corev1.PodLogOptions{153			Follow:    true,154			TailLines: &count,155			Container: container,156		}157		podLogRequest := e.KubeClient.CoreV1().158			Pods(pod.GetNamespace()).159			GetLogs(pod.GetName(), &podLogOptions)160		stream, err := podLogRequest.Stream(ctx)161		if err != nil {162			logrus.Error("stream error", "error", err)163			continue164		}165		scanner := bufio.NewScanner(stream)166		// set default bufio scanner buffer (to limit bufio.Scanner: token too long errors on very long lines)167		buf := make([]byte, 0, 64*1024)168		scanner.Buffer(buf, 1024*1024)169		for scanner.Scan() {170			logrus.Debug("TailPodLogs stream scan", "out", scanner.Text(), "pod", pod.Name)171			logs <- scanner.Bytes()172		}173		if scanner.Err() != nil {174			return errors.Wrapf(scanner.Err(), "scanner error")175		}176	}177	// }()178	return179}...TailPodLogs
Using AI Code Generation
1import (2func main() {3	clientset := createClientSet()4	clientset := createFakeClientSet()5	clientset := createFakeClientSetWithRecorder()6	clientset := createFakeClientSetWithRESTConfig()7	clientset := createFakeClientSetWithRESTConfigAndScheme()8	clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorder()9	clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorderAndCorev1()10	clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorderAndCorev1AndFakeClientset()11	clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorderAndCorev1AndFakeClientsetAndPod()12	clientset := createClientSetWithRESTConfig()13	clientset := createFakeClientSetWithRESTConfig()TailPodLogs
Using AI Code Generation
1func main() {2    config, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")3    if err != nil {4        panic(err.Error())5    }6    clientset, err := kubernetes.NewForConfig(config)7    if err != nil {8        panic(err.Error())9    }10    client := clientset.CoreV1().RESTClient()11    request := client.Get().Namespace("default").Resource("pods").Name("my-pod").SubResource("log").Param("follow", "true")12    watch, err := request.Stream()13    if err != nil {14        panic(err.Error())15    }16    defer watch.Close()17    buf := new(bytes.Buffer)18    scanner := bufio.NewScanner(watch)19    for scanner.Scan() {20        buf.Write(scanner.Bytes())21        fmt.Println(buf.String())22        buf.Reset()23    }24}TailPodLogs
Using AI Code Generation
1import (2func main() {3	config, err := rest.InClusterConfig()4	if err != nil {5		config, err = clientcmd.BuildConfigFromFlags("", "C:\\Users\\sudhanshu\\.kube\\config")6		if err != nil {7			panic(err.Error())8		}9	}10	clientset, err := kubernetes.NewForConfig(config)11	if err != nil {12		panic(err.Error())13	}14	logs, err := clientset.CoreV1().Pods("default").GetLogs("myapp-pod", &v1.PodLogOptions{}).Stream()15	if err != nil {16		panic(err.Error())17	}18	defer logs.Close()19	buf := new(bytes.Buffer)20	buf.ReadFrom(logs)21	newStr := buf.String()22	fmt.Println(newStr)23}24import (25func main() {26	config, err := rest.InClusterConfig()27	if err != nil {28		config, err = clientcmd.BuildConfigFromFlags("", "C:\\Users\\sudhanshu\\.kube\\config")29		if err != nil {30			panic(err.Error())31		}32	}33	clientset, err := kubernetes.NewForConfig(config)34	if err != nil {35		panic(err.Error())36	}37	watcher, err := watch.NewStreamWatcher(38		&watch.HTTPWatcher{39			Client: clientset.CoreV1().RESTClient(),40			Codec:  runtime.NewParameterCodec(clientset.CoreV1().RESTClient().GetConfig()),41		},TailPodLogs
Using AI Code Generation
1import (2func main() {3	config, err := rest.InClusterConfig()4	if err != nil {5		panic(err.Error())6	}7	clientset, err := kubernetes.NewForConfig(config)8	if err != nil {9		panic(err.Error())10	}11	for {12		watcher, err := clientset.CoreV1().Pods("").Watch(context.Background(), v1.ListOptions{})13		if err != nil {14			panic(err.Error())15		}16		for event := range watcher.ResultChan() {17			if event.Type == watch.Added {18				pod := event.Object.(*v1.Pod)19				fmt.Printf("%s %s %s20				if pod.Status.Phase == "Running" {21					fmt.Printf("Pod %s is running22					go func(pod *v1.Pod) {23						req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{})24						podLogs, err := req.Stream(context.Background())25						if err != nil {26							panic(err.Error())27						}28						defer podLogs.Close()29						buf := make([]byte, 1024)30						for {31							n, err := podLogs.Read(buf)32							if err != nil && err != io.EOF {33								log.Fatal(err)34							}35							if n == 0 {36							}37							fmt.Printf("%s", string(buf[:n]))38						}39					}(pod)40				}41			}42		}43		time.Sleep(5 * time.Second)44	}45}TailPodLogs
Using AI Code Generation
1import (2func main() {3	config, err := rest.InClusterConfig()4	if err != nil {5		fmt.Println("Failed to get in-cluster config. Trying to load admin.kubeconfig file.")6		kubeconfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(7			&clientcmd.ClientConfigLoadingRules{ExplicitPath: "/root/admin.kubeconfig"},8			&clientcmd.ConfigOverrides{},9		config, err = kubeconfig.ClientConfig()10		if err != nil {11			panic(err)12		}13	}14	appsClient, err := versioned.NewForConfig(config)15	if err != nil {16		panic(err)17	}18	coreClient, err := kubernetes.NewForConfig(config)19	if err != nil {20		panic(err)21	}22	dep, err := appsClient.AppsV1().DeploymentConfigs("default").Get(context.Background(), "myapp", metav1.GetOptions{})23	if err != nil {TailPodLogs
Using AI Code Generation
1import (2func main() {3	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)4	if err != nil {5		log.Fatal(err)6	}7	clientset, err := kubernetes.NewForConfig(config)8	if err != nil {9		log.Fatal(err)10	}11	pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})12	if err != nil {13		log.Fatal(err)14	}15	for _, pod := range pods.Items {16		fmt.Printf("Pod Name: %s17	}18	clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{}).Stream()19}20import (21func main() {22	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)23	if err != nil {24		log.Fatal(err)25	}26	clientset, err := kubernetes.NewForConfig(config)27	if err != nil {28		log.Fatal(err)29	}30	pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})31	if err != nil {32		log.Fatal(err)33	}TailPodLogs
Using AI Code Generation
1import (2func main() {3	config, err := rest.InClusterConfig()4	if err != nil {5		kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file")6		flag.Parse()7		config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)8		if err != nil {9			panic(err.Error())10		}11	}12	clientset, err := kubernetes.NewForConfig(config)13	if err != nil {14		panic(err.Error())15	}16	logs, err := clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{}).DoRaw()17	if err != nil {18		panic(err.Error())19	}20	fmt.Println(string(logs))21	logs, err = clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{22	}).DoRaw()23	if err != nil {24		panic(err.Error())25	}26	fmt.Println(string(logs))27	logFile, err := os.Create("pod-logs.txt")28	if err != nil {29		log.Fatal(err)30	}31	defer logFile.Close()32	logs, err = clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{33	}).Stream()34	if err != nil {35		panic(err.Error())36	}37	defer logs.Close()38	_, err = io.Copy(logFile, logs)39	if err != nil {40		log.Fatal(err)41	}42}Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
