Files
tinyauth/internal/service/kubernetes_service.go
T
Contre 956d2f55c3 feat(access-control): Add support for Kubernetes Label (#627)
* feat(access-control): Add support for Kubernetes Label

* feat(access-control): Defaults to Docker

* feat(access-control): Remove kubeconfig fallback

* feat(watcher): Watcher for kubernetes service

* feat(watcher): Merge with main + remove nightly fix redirect

* fix(go): Go mod + Go sum after sync with main

* fix(config): Ser default value for LabelProvider to Docker

* feat(go): go mod tidy

* feat(k8s_service): Remove logic for deprecated Ingress k8s v1.22

* feat(k8s_service): (Watcher) -> Wait 5s before breaking to outer loop again

* feat(k8s_service): Remove logic for deprecated Ingress k8s v1.22

* feat(k8s_service): Remove logic for deprecated Ingress k8s v1.22

* feat(k8s_service): Remove logic for deprecated Ingress k8s v1.22

* feat(k8s_service): Remove
var _ = unstructured.Unstructured{} + comments + msg edits

* feat(bootstrap): Remove dockerService from bootstrap svc

* feat(auth_svc): Remove dockerService from authservice

* feat(test): Add tests for kubernetes_services

* feat(test): Remove docker serivce form proxy/user test

* fix(refactor): Remove update logic from watcher and resync

* fix(refactor): Split watchGVR to make it more readable

* fix(refactor): Remove discovery + drop K 1.22 completely

* fix(refactor): Move interface to acess_controls_service

* feat: Autodetect labelprovider if TINYAUTH_LABELPROVIDER not set

* fix(test): Match testing scheme to the controllers

* fix: service bootstrap import after merge

* fix: service bootstrap import after merge
2026-04-29 16:16:21 +03:00

304 lines
8.1 KiB
Go

package service
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/tinyauthapp/tinyauth/internal/config"
"github.com/tinyauthapp/tinyauth/internal/utils/decoders"
"github.com/tinyauthapp/tinyauth/internal/utils/tlog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
type ingressKey struct {
namespace string
name string
}
type ingressAppKey struct {
ingressKey
appName string
}
type ingressApp struct {
domain string
appName string
app config.App
}
type KubernetesService struct {
client dynamic.Interface
ctx context.Context
cancel context.CancelFunc
started bool
mu sync.RWMutex
ingressApps map[ingressKey][]ingressApp
domainIndex map[string]ingressAppKey
appNameIndex map[string]ingressAppKey
}
func NewKubernetesService() *KubernetesService {
return &KubernetesService{
ingressApps: make(map[ingressKey][]ingressApp),
domainIndex: make(map[string]ingressAppKey),
appNameIndex: make(map[string]ingressAppKey),
}
}
func (k *KubernetesService) addIngressApps(namespace, name string, apps []ingressApp) {
k.mu.Lock()
defer k.mu.Unlock()
key := ingressKey{namespace, name}
// Remove existing entries for this ingress
if existing, ok := k.ingressApps[key]; ok {
for _, app := range existing {
delete(k.domainIndex, app.domain)
delete(k.appNameIndex, app.appName)
}
}
// Add new entries
k.ingressApps[key] = apps
for _, app := range apps {
appKey := ingressAppKey{key, app.appName}
k.domainIndex[app.domain] = appKey
k.appNameIndex[app.appName] = appKey
}
}
func (k *KubernetesService) removeIngress(namespace, name string) {
k.mu.Lock()
defer k.mu.Unlock()
key := ingressKey{namespace, name}
if apps, ok := k.ingressApps[key]; ok {
for _, app := range apps {
delete(k.domainIndex, app.domain)
delete(k.appNameIndex, app.appName)
}
delete(k.ingressApps, key)
}
}
func (k *KubernetesService) getByDomain(domain string) (config.App, bool) {
k.mu.RLock()
defer k.mu.RUnlock()
if appKey, ok := k.domainIndex[domain]; ok {
if apps, ok := k.ingressApps[appKey.ingressKey]; ok {
for _, app := range apps {
if app.domain == domain && app.appName == appKey.appName {
return app.app, true
}
}
}
}
return config.App{}, false
}
func (k *KubernetesService) getByAppName(appName string) (config.App, bool) {
k.mu.RLock()
defer k.mu.RUnlock()
if appKey, ok := k.appNameIndex[appName]; ok {
if apps, ok := k.ingressApps[appKey.ingressKey]; ok {
for _, app := range apps {
if app.appName == appName {
return app.app, true
}
}
}
}
return config.App{}, false
}
func (k *KubernetesService) updateFromItem(item *unstructured.Unstructured) {
namespace := item.GetNamespace()
name := item.GetName()
annotations := item.GetAnnotations()
if annotations == nil {
k.removeIngress(namespace, name)
return
}
labels, err := decoders.DecodeLabels[config.Apps](annotations, "apps")
if err != nil {
tlog.App.Debug().Err(err).Msg("Failed to decode labels from annotations")
k.removeIngress(namespace, name)
return
}
var apps []ingressApp
for appName, appLabels := range labels.Apps {
if appLabels.Config.Domain == "" {
continue
}
apps = append(apps, ingressApp{
domain: appLabels.Config.Domain,
appName: appName,
app: appLabels,
})
}
if len(apps) == 0 {
k.removeIngress(namespace, name)
} else {
k.addIngressApps(namespace, name, apps)
}
}
func (k *KubernetesService) resyncGVR(gvr schema.GroupVersionResource) error {
ctx, cancel := context.WithTimeout(k.ctx, 30*time.Second)
defer cancel()
list, err := k.client.Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil {
tlog.App.Debug().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to list ingresses during resync")
return err
}
for i := range list.Items {
k.updateFromItem(&list.Items[i])
}
tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Int("count", len(list.Items)).Msg("Resynced ingress cache")
return nil
}
// runWatcher drains events from an active watcher until it closes or the context is done.
// Returns true if the caller should restart the watcher, false if it should exit.
func (k *KubernetesService) runWatcher(gvr schema.GroupVersionResource, w watch.Interface, resyncTicker *time.Ticker) bool {
for {
select {
case <-k.ctx.Done():
w.Stop()
return false
case event, ok := <-w.ResultChan():
if !ok {
tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Watcher channel closed, restarting in 5 seconds")
w.Stop()
time.Sleep(5 * time.Second)
return true
}
item, ok := event.Object.(*unstructured.Unstructured)
if !ok {
tlog.App.Warn().Str("api", gvr.GroupVersion().String()).Msg("Failed to cast watched object")
continue
}
switch event.Type {
case watch.Added, watch.Modified:
k.updateFromItem(item)
case watch.Deleted:
k.removeIngress(item.GetNamespace(), item.GetName())
}
case <-resyncTicker.C:
if err := k.resyncGVR(gvr); err != nil {
tlog.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed")
}
}
}
}
func (k *KubernetesService) watchGVR(gvr schema.GroupVersionResource) {
resyncTicker := time.NewTicker(5 * time.Minute)
defer resyncTicker.Stop()
if err := k.resyncGVR(gvr); err != nil {
tlog.App.Error().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Initial resync failed, retrying in 30 seconds")
time.Sleep(30 * time.Second)
}
for {
select {
case <-k.ctx.Done():
tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Stopping watcher")
return
case <-resyncTicker.C:
if err := k.resyncGVR(gvr); err != nil {
tlog.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed")
}
default:
ctx, cancel := context.WithCancel(k.ctx)
watcher, err := k.client.Resource(gvr).Watch(ctx, metav1.ListOptions{})
if err != nil {
tlog.App.Error().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to start watcher")
cancel()
time.Sleep(10 * time.Second)
continue
}
tlog.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Watcher started")
if !k.runWatcher(gvr, watcher, resyncTicker) {
cancel()
return
}
cancel()
}
}
}
func (k *KubernetesService) Init() error {
var cfg *rest.Config
var err error
cfg, err = rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to get in-cluster Kubernetes config: %w", err)
}
client, err := dynamic.NewForConfig(cfg)
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}
k.client = client
k.ctx, k.cancel = context.WithCancel(context.Background())
gvr := schema.GroupVersionResource{
Group: "networking.k8s.io",
Version: "v1",
Resource: "ingresses",
}
accessCtx, accessCancel := context.WithTimeout(k.ctx, 5*time.Second)
defer accessCancel()
_, err = k.client.Resource(gvr).List(accessCtx, metav1.ListOptions{Limit: 1})
if err != nil {
tlog.App.Warn().Err(err).Msg("Insufficient permissions for networking.k8s.io/v1 Ingress, Kubernetes label provider will not work")
k.started = false
return nil
}
tlog.App.Debug().Msg("networking.k8s.io/v1 Ingress API accessible")
go k.watchGVR(gvr)
k.started = true
tlog.App.Info().Msg("Kubernetes label provider initialized")
return nil
}
func (k *KubernetesService) GetLabels(appDomain string) (config.App, error) {
if !k.started {
tlog.App.Debug().Msg("Kubernetes not connected, returning empty labels")
return config.App{}, nil
}
// First check cache
if app, found := k.getByDomain(appDomain); found {
tlog.App.Debug().Str("domain", appDomain).Msg("Found labels in cache by domain")
return app, nil
}
appName := strings.SplitN(appDomain, ".", 2)[0]
if app, found := k.getByAppName(appName); found {
tlog.App.Debug().Str("domain", appDomain).Str("appName", appName).Msg("Found labels in cache by app name")
return app, nil
}
tlog.App.Debug().Str("domain", appDomain).Msg("Cache miss, no matching ingress found")
return config.App{}, nil
}