From ac1f200335795550cab0ff9d8020f777ec6423c3 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 9 Dec 2025 09:43:09 +0000 Subject: [PATCH 1/4] feat: add KinD integration tests Add integration tests that run against a real Kubernetes cluster using KinD (Kubernetes in Docker). This addresses #58. Changes: - Add integration_test.go with tests for: - Pod events (create/delete) - ReplicaSet events - Multi-namespace support - Label selector filtering - Add .github/workflows/integration.yaml CI workflow that: - Runs lint and unit tests first - Then runs integration tests with KinD - 10 minute timeout - Add scripts/kind-setup.sh for local development - Update README.md with integration test documentation The integration tests use the existing fakeAgentAPI to mock the Coder server, focusing on validating real Kubernetes informer behavior. --- .github/workflows/integration.yaml | 61 ++++ README.md | 39 +++ integration_test.go | 543 +++++++++++++++++++++++++++++ scripts/kind-setup.sh | 109 ++++++ 4 files changed, 752 insertions(+) create mode 100644 .github/workflows/integration.yaml create mode 100644 integration_test.go create mode 100755 scripts/kind-setup.sh diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml new file mode 100644 index 0000000..9666564 --- /dev/null +++ b/.github/workflows/integration.yaml @@ -0,0 +1,61 @@ +name: integration + +on: + push: + branches: + - main + pull_request: + +permissions: + actions: none + checks: none + contents: read + deployments: none + issues: none + packages: none + pull-requests: none + repository-projects: none + security-events: none + statuses: none + +# Cancel in-progress runs for pull requests when developers push +# additional changes +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + +jobs: + unit-test: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: "~1.22" + + - name: Test + run: go test ./... -race + + integration-test: + needs: [unit-test] + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: "~1.22" + + - name: Create KinD cluster + uses: helm/kind-action@v1 + with: + cluster_name: integration-test + + - name: Run integration tests + run: go test -tags=integration -v -timeout=8m ./... diff --git a/README.md b/README.md index bf55d8d..6a4f300 100644 --- a/README.md +++ b/README.md @@ -64,3 +64,42 @@ Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers - [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate. - [`SSL_CERT_DIR`](https://go.dev/src/crypto/x509/root_unix.go#L25): Identifies which directory to check for SSL certificate files. + +## Development + +### Running Tests + +Unit tests can be run with: + +```console +go test ./... -race +``` + +### Integration Tests + +Integration tests run against a real Kubernetes cluster using [KinD (Kubernetes in Docker)](https://kind.sigs.k8s.io/). 
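+The tests are guarded by the `integration` build tag, create a uniquely named namespace per test for isolation, and locate the cluster via `KUBECONFIG` (falling back to `~/.kube/config`), so they can be pointed at any existing cluster — for example, using an illustrative kubeconfig path:
+
+```console
+# Path is illustrative; any kubeconfig that reaches the target cluster works
+KUBECONFIG=/tmp/kind-kubeconfig go test -tags=integration -v ./...
+```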
+ +**Prerequisites:** +- [Docker](https://docs.docker.com/get-docker/) +- [KinD](https://kind.sigs.k8s.io/docs/user/quick-start/#installation) +- [kubectl](https://kubernetes.io/docs/tasks/tools/) + +**Setup and run:** + +```console +# Create a KinD cluster +./scripts/kind-setup.sh create + +# Run integration tests +go test -tags=integration -v ./... + +# Clean up when done +./scripts/kind-setup.sh delete +``` + +The integration tests validate: +- Pod event streaming with real Kubernetes informers +- ReplicaSet event handling +- Multi-namespace support +- Label selector filtering + diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..e08da9f --- /dev/null +++ b/integration_test.go @@ -0,0 +1,543 @@ +//go:build integration + +package main + +import ( + "context" + "fmt" + "net/url" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/quartz" +) + +// getKubeClient creates a Kubernetes client from the default kubeconfig. +// It will use KUBECONFIG env var if set, otherwise ~/.kube/config. +func getKubeClient(t *testing.T) kubernetes.Interface { + t.Helper() + + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + home, err := os.UserHomeDir() + require.NoError(t, err, "failed to get user home dir") + kubeconfig = home + "/.kube/config" + } + + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + require.NoError(t, err, "failed to build kubeconfig") + + client, err := kubernetes.NewForConfig(config) + require.NoError(t, err, "failed to create kubernetes client") + + return client +} + +// createTestNamespace creates a unique namespace for test isolation. +// It registers cleanup to delete the namespace after the test. +func createTestNamespace(t *testing.T, ctx context.Context, client kubernetes.Interface) string { + t.Helper() + + name := fmt.Sprintf("logstream-test-%d", time.Now().UnixNano()) + + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + + _, err := client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) + require.NoError(t, err, "failed to create test namespace") + + t.Cleanup(func() { + // Use a fresh context for cleanup in case the test context is cancelled + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := client.CoreV1().Namespaces().Delete(cleanupCtx, name, metav1.DeleteOptions{}) + if err != nil { + t.Logf("warning: failed to delete test namespace %s: %v", name, err) + } + }) + + return name +} + +// waitForLogs waits for logs to be received on the channel with a timeout. +func waitForLogs(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration) []string { + t.Helper() + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + select { + case logs := <-api.logs: + var outputs []string + for _, log := range logs { + outputs = append(outputs, log.Output) + } + return outputs + case <-timeoutCtx.Done(): + t.Fatal("timeout waiting for logs") + return nil + } +} + +// waitForLogSource waits for log source registration with a timeout. 
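+// The fakeAgentAPI is the existing mock Coder server from the unit tests; it is assumed to signal on its logSource channel once the agent registers a log source.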
+func waitForLogSource(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration) { + t.Helper() + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + select { + case <-api.logSource: + return + case <-timeoutCtx.Done(): + t.Fatal("timeout waiting for log source registration") + } +} + +func TestIntegration_PodEvents(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger + clock := quartz.NewMock(t) + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 30 * time.Second, + clock: clock, + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait a bit for informers to sync + time.Sleep(500 * time.Millisecond) + + // Create a pod with CODER_AGENT_TOKEN + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-integration", + }, + }, + }, + }, + // Use a non-existent node to keep the pod in Pending state + // This avoids needing to actually run the container + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for the "Created pod" log + logs := waitForLogs(t, ctx, api, 30*time.Second) + require.NotEmpty(t, logs) + + var foundCreatedPod bool + for _, log := range logs { + if strings.Contains(log, "Created pod") { + foundCreatedPod = true + break + } + } + require.True(t, foundCreatedPod, "expected 'Created pod' log, got: %v", logs) + + // Delete the pod and verify deletion event + err = client.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the "Deleted pod" log + logs = waitForLogs(t, ctx, api, 30*time.Second) + require.NotEmpty(t, logs) + + var foundDeletedPod bool + for _, log := range logs { + if strings.Contains(log, "Deleted pod") { + foundDeletedPod = true + break + } + } + require.True(t, foundDeletedPod, "expected 'Deleted pod' log, got: %v", logs) +} + +func TestIntegration_ReplicaSetEvents(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger + clock := quartz.NewMock(t) + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 30 * time.Second, + 
clock: clock, + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait a bit for informers to sync + time.Sleep(500 * time.Millisecond) + + // Create a ReplicaSet with CODER_AGENT_TOKEN + replicas := int32(1) + rs := &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rs", + Namespace: namespace, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-rs", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "test-rs", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-rs-integration", + }, + }, + }, + }, + // Use a non-existent node to keep pods in Pending state + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + }, + }, + } + + _, err = client.AppsV1().ReplicaSets(namespace).Create(ctx, rs, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for the "Queued pod from ReplicaSet" log + logs := waitForLogs(t, ctx, api, 30*time.Second) + require.NotEmpty(t, logs) + + var foundQueuedPod bool + for _, log := range logs { + if strings.Contains(log, "Queued pod from ReplicaSet") { + foundQueuedPod = true + break + } + } + require.True(t, foundQueuedPod, "expected 'Queued pod from ReplicaSet' log, got: %v", logs) + + // Delete the ReplicaSet + err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Wait for the "Deleted ReplicaSet" log + logs = waitForLogs(t, ctx, api, 30*time.Second) + require.NotEmpty(t, logs) + + var foundDeletedRS bool + for _, log := range logs { + if strings.Contains(log, "Deleted ReplicaSet") { + foundDeletedRS = true + break + } + } + require.True(t, foundDeletedRS, "expected 'Deleted ReplicaSet' log, got: %v", logs) +} + +func TestIntegration_MultiNamespace(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + + // Create two namespaces + namespace1 := createTestNamespace(t, ctx, client) + namespace2 := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger watching both namespaces + clock := quartz.NewMock(t) + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace1, namespace2}, + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 30 * time.Second, + clock: clock, + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait for informers to sync + time.Sleep(500 * time.Millisecond) + + // Create a pod in namespace1 + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-ns1", + Namespace: namespace1, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-ns1", + }, + }, + }, + }, + NodeSelector: map[string]string{ + 
"non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace1).Create(ctx, pod1, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source and logs from first pod + waitForLogSource(t, ctx, api, 30*time.Second) + logs := waitForLogs(t, ctx, api, 30*time.Second) + require.NotEmpty(t, logs) + + // Create a pod in namespace2 + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-ns2", + Namespace: namespace2, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-ns2", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace2).Create(ctx, pod2, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source and logs from second pod + waitForLogSource(t, ctx, api, 30*time.Second) + logs = waitForLogs(t, ctx, api, 30*time.Second) + require.NotEmpty(t, logs) + + // Both namespaces should have received events + t.Log("Successfully received events from both namespaces") +} + +func TestIntegration_LabelSelector(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + client := getKubeClient(t) + namespace := createTestNamespace(t, ctx, client) + + // Start fake Coder API server + api := newFakeAgentAPI(t) + defer api.server.Close() + + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + + // Create the pod event logger with a label selector + clock := quartz.NewMock(t) + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespaces: []string{namespace}, + labelSelector: "coder-workspace=true", + logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), + logDebounce: 30 * time.Second, + clock: clock, + }) + require.NoError(t, err) + defer reporter.Close() + + // Wait for informers to sync + time.Sleep(500 * time.Millisecond) + + // Create a pod WITHOUT the matching label - should be ignored + podNoLabel := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-no-label", + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-no-label", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = client.CoreV1().Pods(namespace).Create(ctx, podNoLabel, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait a bit to ensure the pod without label is not picked up + time.Sleep(2 * time.Second) + + // Create a pod WITH the matching label - should be tracked + podWithLabel := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-with-label", + Namespace: namespace, + Labels: map[string]string{ + "coder-workspace": "true", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox:latest", + Command: []string{"sleep", "3600"}, + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token-with-label", + }, + }, + }, + }, + NodeSelector: map[string]string{ + "non-existent-label": "non-existent-value", + }, + }, + } + + _, err = 
client.CoreV1().Pods(namespace).Create(ctx, podWithLabel, metav1.CreateOptions{}) + require.NoError(t, err) + + // Wait for log source registration - this should only happen for the labeled pod + waitForLogSource(t, ctx, api, 30*time.Second) + + // Wait for logs + logs := waitForLogs(t, ctx, api, 30*time.Second) + require.NotEmpty(t, logs) + + // Verify that the log is for the labeled pod, not the unlabeled one + var foundLabeledPod bool + for _, log := range logs { + if strings.Contains(log, "Created pod") && strings.Contains(log, "test-pod-with-label") { + foundLabeledPod = true + break + } + // Make sure we didn't get logs for the unlabeled pod + require.NotContains(t, log, "test-pod-no-label", "should not receive logs for unlabeled pod") + } + require.True(t, foundLabeledPod, "expected 'Created pod' log for labeled pod, got: %v", logs) +} diff --git a/scripts/kind-setup.sh b/scripts/kind-setup.sh new file mode 100755 index 0000000..31a2eda --- /dev/null +++ b/scripts/kind-setup.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash + +# This script sets up a KinD cluster for running integration tests locally. +# Usage: ./scripts/kind-setup.sh [create|delete] + +set -euo pipefail + +CLUSTER_NAME="${KIND_CLUSTER_NAME:-logstream-integration-test}" + +usage() { + echo "Usage: $0 [create|delete|status]" + echo "" + echo "Commands:" + echo " create - Create a KinD cluster for integration tests" + echo " delete - Delete the KinD cluster" + echo " status - Check if the cluster exists and is running" + echo "" + echo "Environment variables:" + echo " KIND_CLUSTER_NAME - Name of the cluster (default: logstream-integration-test)" + exit 1 +} + +check_kind() { + if ! command -v kind &> /dev/null; then + echo "Error: 'kind' is not installed." + echo "Install it from: https://kind.sigs.k8s.io/docs/user/quick-start/#installation" + exit 1 + fi +} + +check_kubectl() { + if ! command -v kubectl &> /dev/null; then + echo "Error: 'kubectl' is not installed." + echo "Install it from: https://kubernetes.io/docs/tasks/tools/" + exit 1 + fi +} + +cluster_exists() { + kind get clusters 2>/dev/null | grep -q "^${CLUSTER_NAME}$" +} + +create_cluster() { + check_kind + check_kubectl + + if cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' already exists." + echo "Use '$0 delete' to remove it first, or '$0 status' to check its status." + exit 0 + fi + + echo "Creating KinD cluster '${CLUSTER_NAME}'..." + kind create cluster --name "${CLUSTER_NAME}" --wait 60s + + echo "" + echo "Cluster created successfully!" + echo "" + echo "To run integration tests:" + echo " go test -tags=integration -v ./..." + echo "" + echo "To delete the cluster when done:" + echo " $0 delete" +} + +delete_cluster() { + check_kind + + if ! cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' does not exist." + exit 0 + fi + + echo "Deleting KinD cluster '${CLUSTER_NAME}'..." + kind delete cluster --name "${CLUSTER_NAME}" + echo "Cluster deleted successfully!" +} + +status_cluster() { + check_kind + + if cluster_exists; then + echo "Cluster '${CLUSTER_NAME}' exists." + echo "" + echo "Cluster info:" + kubectl cluster-info --context "kind-${CLUSTER_NAME}" 2>/dev/null || echo " (unable to get cluster info)" + echo "" + echo "Nodes:" + kubectl get nodes --context "kind-${CLUSTER_NAME}" 2>/dev/null || echo " (unable to get nodes)" + else + echo "Cluster '${CLUSTER_NAME}' does not exist." + echo "Use '$0 create' to create it." 
+ fi +} + +case "${1:-}" in + create) + create_cluster + ;; + delete) + delete_cluster + ;; + status) + status_cluster + ;; + *) + usage + ;; +esac From 6558412c48abb3835518ec1b4552d9bf6106f7e7 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 9 Dec 2025 10:06:25 +0000 Subject: [PATCH 2/4] fix: use real clock for integration tests The integration tests were using quartz.NewMock(t) which creates a mock clock that doesn't advance automatically. This caused timeouts when waiting for log source registration because the timers in the log queuer never fired. Changes: - Remove mock clock usage from all integration tests - Use real clock (nil) which is the default - Reduce logDebounce to 5s for faster test execution - Increase informer sync wait to 1s for reliability --- integration_test.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/integration_test.go b/integration_test.go index e08da9f..94d66cc 100644 --- a/integration_test.go +++ b/integration_test.go @@ -20,7 +20,6 @@ import ( "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/quartz" ) // getKubeClient creates a Kubernetes client from the default kubeconfig. @@ -126,20 +125,19 @@ func TestIntegration_PodEvents(t *testing.T) { require.NoError(t, err) // Create the pod event logger - clock := quartz.NewMock(t) + // Note: We don't set clock, so it uses a real clock for integration tests reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ client: client, coderURL: agentURL, namespaces: []string{namespace}, logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), - logDebounce: 30 * time.Second, - clock: clock, + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests }) require.NoError(t, err) defer reporter.Close() // Wait a bit for informers to sync - time.Sleep(500 * time.Millisecond) + time.Sleep(1 * time.Second) // Create a pod with CODER_AGENT_TOKEN pod := &corev1.Pod{ @@ -223,20 +221,19 @@ func TestIntegration_ReplicaSetEvents(t *testing.T) { require.NoError(t, err) // Create the pod event logger - clock := quartz.NewMock(t) + // Note: We don't set clock, so it uses a real clock for integration tests reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ client: client, coderURL: agentURL, namespaces: []string{namespace}, logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), - logDebounce: 30 * time.Second, - clock: clock, + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests }) require.NoError(t, err) defer reporter.Close() // Wait a bit for informers to sync - time.Sleep(500 * time.Millisecond) + time.Sleep(1 * time.Second) // Create a ReplicaSet with CODER_AGENT_TOKEN replicas := int32(1) @@ -338,20 +335,19 @@ func TestIntegration_MultiNamespace(t *testing.T) { require.NoError(t, err) // Create the pod event logger watching both namespaces - clock := quartz.NewMock(t) + // Note: We don't set clock, so it uses a real clock for integration tests reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ client: client, coderURL: agentURL, namespaces: []string{namespace1, namespace2}, logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), - logDebounce: 30 * time.Second, - clock: clock, + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests }) require.NoError(t, err) defer reporter.Close() // Wait for informers to sync - time.Sleep(500 * time.Millisecond) + time.Sleep(1 * time.Second) // Create a pod in namespace1 pod1 := &corev1.Pod{ @@ -442,21 +438,20 @@ func 
TestIntegration_LabelSelector(t *testing.T) { require.NoError(t, err) // Create the pod event logger with a label selector - clock := quartz.NewMock(t) + // Note: We don't set clock, so it uses a real clock for integration tests reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ client: client, coderURL: agentURL, namespaces: []string{namespace}, labelSelector: "coder-workspace=true", logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), - logDebounce: 30 * time.Second, - clock: clock, + logDebounce: 5 * time.Second, // Use shorter debounce for faster tests }) require.NoError(t, err) defer reporter.Close() // Wait for informers to sync - time.Sleep(500 * time.Millisecond) + time.Sleep(1 * time.Second) // Create a pod WITHOUT the matching label - should be ignored podNoLabel := &corev1.Pod{ From c731c5fb76cd8209d255397cc1ab65527dda3acc Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 9 Dec 2025 10:37:35 +0000 Subject: [PATCH 3/4] fix: remove duplicate unit-test job from integration workflow --- .github/workflows/integration.yaml | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 9666564..319edbb 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -25,22 +25,7 @@ concurrency: cancel-in-progress: ${{ github.event_name == 'pull_request' }} jobs: - unit-test: - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Setup Go - uses: actions/setup-go@v5 - with: - go-version: "~1.22" - - - name: Test - run: go test ./... -race - integration-test: - needs: [unit-test] runs-on: ubuntu-latest timeout-minutes: 10 steps: From 7630edab2f62aa1f3ac3c10142d9576c79cee5c3 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Thu, 11 Dec 2025 09:06:54 +0000 Subject: [PATCH 4/4] fix: make integration tests more robust against event ordering The tests now use waitForLogContaining which continuously collects logs until finding the expected message, rather than expecting specific messages in the first batch of logs received. This fixes flaky tests caused by Kubernetes scheduling events arriving before pod lifecycle events. --- integration_test.go | 102 ++++++++++++++------------------------------ 1 file changed, 31 insertions(+), 71 deletions(-) diff --git a/integration_test.go b/integration_test.go index 94d66cc..453efe6 100644 --- a/integration_test.go +++ b/integration_test.go @@ -73,23 +73,26 @@ func createTestNamespace(t *testing.T, ctx context.Context, client kubernetes.In return name } -// waitForLogs waits for logs to be received on the channel with a timeout. -func waitForLogs(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration) []string { +// waitForLogContaining waits until a log containing the given substring is received. +// It collects all logs seen and returns them along with whether the target was found. 
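+// Collecting until a match keeps the assertion robust when unrelated events (such as scheduling warnings) arrive before the expected lifecycle log.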
+func waitForLogContaining(t *testing.T, ctx context.Context, api *fakeAgentAPI, timeout time.Duration, substring string) (allLogs []string, found bool) { t.Helper() timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - select { - case logs := <-api.logs: - var outputs []string - for _, log := range logs { - outputs = append(outputs, log.Output) + for { + select { + case logs := <-api.logs: + for _, log := range logs { + allLogs = append(allLogs, log.Output) + if strings.Contains(log.Output, substring) { + return allLogs, true + } + } + case <-timeoutCtx.Done(): + return allLogs, false } - return outputs - case <-timeoutCtx.Done(): - t.Fatal("timeout waiting for logs") - return nil } } @@ -173,35 +176,17 @@ func TestIntegration_PodEvents(t *testing.T) { // Wait for log source registration waitForLogSource(t, ctx, api, 30*time.Second) - // Wait for the "Created pod" log - logs := waitForLogs(t, ctx, api, 30*time.Second) - require.NotEmpty(t, logs) - - var foundCreatedPod bool - for _, log := range logs { - if strings.Contains(log, "Created pod") { - foundCreatedPod = true - break - } - } - require.True(t, foundCreatedPod, "expected 'Created pod' log, got: %v", logs) + // Wait for the "Created pod" log (may receive other logs first like scheduling warnings) + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log, got: %v", logs) // Delete the pod and verify deletion event err = client.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) require.NoError(t, err) // Wait for the "Deleted pod" log - logs = waitForLogs(t, ctx, api, 30*time.Second) - require.NotEmpty(t, logs) - - var foundDeletedPod bool - for _, log := range logs { - if strings.Contains(log, "Deleted pod") { - foundDeletedPod = true - break - } - } - require.True(t, foundDeletedPod, "expected 'Deleted pod' log, got: %v", logs) + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted pod") + require.True(t, found, "expected 'Deleted pod' log, got: %v", logs) } func TestIntegration_ReplicaSetEvents(t *testing.T) { @@ -285,34 +270,16 @@ func TestIntegration_ReplicaSetEvents(t *testing.T) { waitForLogSource(t, ctx, api, 30*time.Second) // Wait for the "Queued pod from ReplicaSet" log - logs := waitForLogs(t, ctx, api, 30*time.Second) - require.NotEmpty(t, logs) - - var foundQueuedPod bool - for _, log := range logs { - if strings.Contains(log, "Queued pod from ReplicaSet") { - foundQueuedPod = true - break - } - } - require.True(t, foundQueuedPod, "expected 'Queued pod from ReplicaSet' log, got: %v", logs) + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Queued pod from ReplicaSet") + require.True(t, found, "expected 'Queued pod from ReplicaSet' log, got: %v", logs) // Delete the ReplicaSet err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}) require.NoError(t, err) // Wait for the "Deleted ReplicaSet" log - logs = waitForLogs(t, ctx, api, 30*time.Second) - require.NotEmpty(t, logs) - - var foundDeletedRS bool - for _, log := range logs { - if strings.Contains(log, "Deleted ReplicaSet") { - foundDeletedRS = true - break - } - } - require.True(t, foundDeletedRS, "expected 'Deleted ReplicaSet' log, got: %v", logs) + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Deleted ReplicaSet") + require.True(t, found, "expected 'Deleted ReplicaSet' log, got: %v", logs) } func TestIntegration_MultiNamespace(t *testing.T) { @@ -380,8 +347,8 @@ 
func TestIntegration_MultiNamespace(t *testing.T) { // Wait for log source and logs from first pod waitForLogSource(t, ctx, api, 30*time.Second) - logs := waitForLogs(t, ctx, api, 30*time.Second) - require.NotEmpty(t, logs) + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for first pod, got: %v", logs) // Create a pod in namespace2 pod2 := &corev1.Pod{ @@ -414,8 +381,8 @@ func TestIntegration_MultiNamespace(t *testing.T) { // Wait for log source and logs from second pod waitForLogSource(t, ctx, api, 30*time.Second) - logs = waitForLogs(t, ctx, api, 30*time.Second) - require.NotEmpty(t, logs) + logs, found = waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for second pod, got: %v", logs) // Both namespaces should have received events t.Log("Successfully received events from both namespaces") @@ -520,19 +487,12 @@ func TestIntegration_LabelSelector(t *testing.T) { // Wait for log source registration - this should only happen for the labeled pod waitForLogSource(t, ctx, api, 30*time.Second) - // Wait for logs - logs := waitForLogs(t, ctx, api, 30*time.Second) - require.NotEmpty(t, logs) + // Wait for logs - look specifically for "Created pod" with the labeled pod name + logs, found := waitForLogContaining(t, ctx, api, 30*time.Second, "Created pod") + require.True(t, found, "expected 'Created pod' log for labeled pod, got: %v", logs) - // Verify that the log is for the labeled pod, not the unlabeled one - var foundLabeledPod bool + // Verify that none of the logs mention the unlabeled pod for _, log := range logs { - if strings.Contains(log, "Created pod") && strings.Contains(log, "test-pod-with-label") { - foundLabeledPod = true - break - } - // Make sure we didn't get logs for the unlabeled pod require.NotContains(t, log, "test-pod-no-label", "should not receive logs for unlabeled pod") } - require.True(t, foundLabeledPod, "expected 'Created pod' log for labeled pod, got: %v", logs) }