refactor: rework app logging, dependency injection and cancellation (#844)

* feat: add new logger

* refactor: use one struct for context handling and cancellation

* refactor: rework logging and config in controllers

* refactor: rework logging and config in middlewares

* refactor: rework logging and cancellation in services

* refactor: rework cli logging

* fix: improve logging in routines

* feat: use sync groups for better cancellation (see the first sketch after this list)

* refactor: simplify middleware, controller and service init

* tests: fix controller tests

* tests: use require instead of assert where previous step is required (see the test sketch after this list)

* tests: fix middleware tests

* tests: fix service tests

* tests: fix context tests

* fix: fix typos

* feat: add option to enable or disable concurrent listeners

* fix: assign public key correctly in oidc server

* tests: don't try to test logger with char size

* fix: address coderabbit review comments

* tests: use filepath join instead of path join

* fix: ensure unix socket shutdown doesn't run twice

* chore: remove temp lint file
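
The cancellation model behind the bullets above (one context plus a sync.WaitGroup injected into constructors, background work registered via wg.Go) reduces to a small standalone pattern. A minimal sketch, assuming Go 1.25+ for sync.WaitGroup.Go; Service, NewService and run are illustrative names, not code from this PR:

package main

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"
)

// Service keeps no goroutine bookkeeping of its own; the caller's context
// and wait group control its lifetime. (Illustrative type, not from the PR.)
type Service struct {
	ctx context.Context
}

// NewService fails fast with an error (no separate Init step) and starts
// its background loop on the injected wait group.
func NewService(ctx context.Context, wg *sync.WaitGroup) (*Service, error) {
	if ctx == nil {
		return nil, errors.New("nil context")
	}
	s := &Service{ctx: ctx}
	// wg.Go (Go 1.25+) increments the counter, runs the function in a new
	// goroutine and calls Done when it returns.
	wg.Go(s.run)
	return s, nil
}

func (s *Service) run() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-s.ctx.Done():
			log.Println("service: shutting down")
			return
		case <-ticker.C:
			log.Println("service: tick")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	if _, err := NewService(ctx, &wg); err != nil {
		log.Fatal(err)
	}

	time.Sleep(2 * time.Second)
	cancel()  // signal every service to stop
	wg.Wait() // block until all tracked goroutines have returned
}

The payoff is visible in the diff below: NewKubernetesService replaces the old Init method, returns an error instead of silently marking itself unstarted, and hands its watcher goroutine to the caller's wait group.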
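The require-versus-assert bullet is about failure semantics in testify: require.* aborts the test immediately, while assert.* records the failure and continues. A hypothetical test (not from this PR) showing why a failed prerequisite should use require:

package service_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// newConfig is a stand-in for any setup step later assertions depend on.
func newConfig() (map[string]string, error) {
	return map[string]string{"addr": "127.0.0.1:8080"}, nil
}

func TestConfig(t *testing.T) {
	cfg, err := newConfig()
	require.NoError(t, err) // stop here: using cfg after an error would panic
	require.NotNil(t, cfg)

	// Independent checks can use assert so one failure doesn't hide the rest.
	assert.Equal(t, "127.0.0.1:8080", cfg["addr"])
	assert.Len(t, cfg, 1)
}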
Stavros committed 2026-05-10 16:10:36 +03:00 (committed by GitHub)
parent 1b18e68ce0
commit 4f7335ed73
50 changed files with 1883 additions and 1716 deletions
+64 -60
@@ -9,7 +9,7 @@ import (
 	"github.com/tinyauthapp/tinyauth/internal/model"
 	"github.com/tinyauthapp/tinyauth/internal/utils/decoders"
-	"github.com/tinyauthapp/tinyauth/internal/utils/tlog"
+	"github.com/tinyauthapp/tinyauth/internal/utils/logger"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -36,9 +36,10 @@ type ingressApp struct {
 }
 
 type KubernetesService struct {
+	log          *logger.Logger
+	ctx          context.Context
 	client       dynamic.Interface
-	ctx          context.Context
-	cancel       context.CancelFunc
 	started      bool
 	mu           sync.RWMutex
 	ingressApps  map[ingressKey][]ingressApp
@@ -46,12 +47,55 @@ type KubernetesService struct {
 	appNameIndex map[string]ingressAppKey
 }
 
-func NewKubernetesService() *KubernetesService {
-	return &KubernetesService{
+func NewKubernetesService(
+	log *logger.Logger,
+	ctx context.Context,
+	wg *sync.WaitGroup,
+) (*KubernetesService, error) {
+	cfg, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get in-cluster kubernetes config: %w", err)
+	}
+
+	client, err := dynamic.NewForConfig(cfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
+	}
+
+	gvr := schema.GroupVersionResource{
+		Group:    "networking.k8s.io",
+		Version:  "v1",
+		Resource: "ingresses",
+	}
+
+	accessCtx, accessCancel := context.WithTimeout(ctx, 5*time.Second)
+	defer accessCancel()
+
+	_, err = client.Resource(gvr).List(accessCtx, metav1.ListOptions{Limit: 1})
+	if err != nil {
+		log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to access Ingress API, Kubernetes label provider will be disabled")
+		return nil, fmt.Errorf("failed to access ingress api: %w", err)
+	}
+
+	log.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Successfully accessed Ingress API, starting watcher")
+
+	service := &KubernetesService{
+		log:          log,
+		ctx:          ctx,
+		client:       client,
 		ingressApps:  make(map[ingressKey][]ingressApp),
 		domainIndex:  make(map[string]ingressAppKey),
 		appNameIndex: make(map[string]ingressAppKey),
 	}
+
+	wg.Go(func() {
+		service.watchGVR(gvr)
+	})
+
+	service.started = true
+	log.App.Debug().Msg("Kubernetes label provider started successfully")
+
+	return service, nil
 }
 
 func (k *KubernetesService) addIngressApps(namespace, name string, apps []ingressApp) {
@@ -133,7 +177,7 @@ func (k *KubernetesService) updateFromItem(item *unstructured.Unstructured) {
 	}
 	labels, err := decoders.DecodeLabels[model.Apps](annotations, "apps")
 	if err != nil {
-		tlog.App.Debug().Err(err).Msg("Failed to decode labels from annotations")
+		k.log.App.Warn().Err(err).Str("namespace", namespace).Str("name", name).Msg("Failed to decode ingress labels, skipping")
 		k.removeIngress(namespace, name)
 		return
 	}
@@ -161,13 +205,13 @@ func (k *KubernetesService) resyncGVR(gvr schema.GroupVersionResource) error {
 	list, err := k.client.Resource(gvr).List(ctx, metav1.ListOptions{})
 	if err != nil {
-		tlog.App.Debug().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to list ingresses during resync")
+		k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to list resources for resync")
 		return err
 	}
 
 	for i := range list.Items {
 		k.updateFromItem(&list.Items[i])
 	}
 
-	tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Int("count", len(list.Items)).Msg("Resynced ingress cache")
+	k.log.App.Debug().Str("api", gvr.GroupVersion().String()).Int("count", len(list.Items)).Msg("Resync complete")
 	return nil
 }
@@ -181,14 +225,14 @@ func (k *KubernetesService) runWatcher(gvr schema.GroupVersionResource, w watch.
 		return false
 	case event, ok := <-w.ResultChan():
 		if !ok {
-			tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Watcher channel closed, restarting in 5 seconds")
+			k.log.App.Warn().Str("api", gvr.GroupVersion().String()).Msg("Watcher channel closed, restarting watcher")
 			w.Stop()
 			time.Sleep(5 * time.Second)
 			return true
 		}
 		item, ok := event.Object.(*unstructured.Unstructured)
 		if !ok {
-			tlog.App.Warn().Str("api", gvr.GroupVersion().String()).Msg("Failed to cast watched object")
+			k.log.App.Warn().Str("api", gvr.GroupVersion().String()).Msg("Received unexpected event object, skipping")
 			continue
 		}
 		switch event.Type {
@@ -199,7 +243,7 @@
 		}
 	case <-resyncTicker.C:
 		if err := k.resyncGVR(gvr); err != nil {
-			tlog.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed")
+			k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed during watcher run")
 		}
 	}
 }
@@ -210,29 +254,29 @@ func (k *KubernetesService) watchGVR(gvr schema.GroupVersionResource) {
 	defer resyncTicker.Stop()
 
 	if err := k.resyncGVR(gvr); err != nil {
-		tlog.App.Error().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Initial resync failed, retrying in 30 seconds")
+		k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Initial resync failed, will retry")
 		time.Sleep(30 * time.Second)
 	}
 
 	for {
 		select {
 		case <-k.ctx.Done():
-			tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Stopping watcher")
+			k.log.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Shutting down kubernetes watcher")
 			return
 		case <-resyncTicker.C:
 			if err := k.resyncGVR(gvr); err != nil {
-				tlog.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed")
+				k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed, will retry")
 			}
 		default:
 			ctx, cancel := context.WithCancel(k.ctx)
 			watcher, err := k.client.Resource(gvr).Watch(ctx, metav1.ListOptions{})
 			if err != nil {
-				tlog.App.Error().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to start watcher")
+				k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to start watcher, will retry")
 				cancel()
 				time.Sleep(10 * time.Second)
 				continue
 			}
-			tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Watcher started")
+			k.log.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Watcher started successfully")
 			if !k.runWatcher(gvr, watcher, resyncTicker) {
 				cancel()
 				return
@@ -242,65 +286,25 @@ func (k *KubernetesService) watchGVR(gvr schema.GroupVersionResource) {
 		}
 	}
 }
 
-func (k *KubernetesService) Init() error {
-	var cfg *rest.Config
-	var err error
-
-	cfg, err = rest.InClusterConfig()
-	if err != nil {
-		return fmt.Errorf("failed to get in-cluster Kubernetes config: %w", err)
-	}
-
-	client, err := dynamic.NewForConfig(cfg)
-	if err != nil {
-		return fmt.Errorf("failed to create Kubernetes client: %w", err)
-	}
-
-	k.client = client
-	k.ctx, k.cancel = context.WithCancel(context.Background())
-
-	gvr := schema.GroupVersionResource{
-		Group:    "networking.k8s.io",
-		Version:  "v1",
-		Resource: "ingresses",
-	}
-
-	accessCtx, accessCancel := context.WithTimeout(k.ctx, 5*time.Second)
-	defer accessCancel()
-
-	_, err = k.client.Resource(gvr).List(accessCtx, metav1.ListOptions{Limit: 1})
-	if err != nil {
-		tlog.App.Warn().Err(err).Msg("Insufficient permissions for networking.k8s.io/v1 Ingress, Kubernetes label provider will not work")
-		k.started = false
-		return nil
-	}
-
-	tlog.App.Debug().Msg("networking.k8s.io/v1 Ingress API accessible")
-
-	go k.watchGVR(gvr)
-	k.started = true
-
-	tlog.App.Info().Msg("Kubernetes label provider initialized")
-	return nil
-}
-
 func (k *KubernetesService) GetLabels(appDomain string) (*model.App, error) {
 	if !k.started {
-		tlog.App.Debug().Msg("Kubernetes not connected, returning empty labels")
+		k.log.App.Debug().Str("domain", appDomain).Msg("Kubernetes label provider not started, skipping")
 		return nil, nil
 	}
 
 	// First check cache
 	app := k.getByDomain(appDomain)
 	if app != nil {
-		tlog.App.Debug().Str("domain", appDomain).Msg("Found labels in cache by domain")
+		k.log.App.Debug().Str("domain", appDomain).Msg("Found labels in cache by domain")
 		return app, nil
 	}
 
 	appName := strings.SplitN(appDomain, ".", 2)[0]
 	app = k.getByAppName(appName)
 	if app != nil {
-		tlog.App.Debug().Str("domain", appDomain).Str("appName", appName).Msg("Found labels in cache by app name")
+		k.log.App.Debug().Str("domain", appDomain).Str("appName", appName).Msg("Found labels in cache by app name")
 		return app, nil
 	}
 
-	tlog.App.Debug().Str("domain", appDomain).Msg("Cache miss, no matching ingress found")
+	k.log.App.Debug().Str("domain", appDomain).Msg("No labels found for domain")
 	return nil, nil
 }