feat: add ding for ordered go routine shutdown

This commit is contained in:
Stavros
2026-05-24 18:44:31 +03:00
parent 2737a25227
commit 33ee4f8b15
11 changed files with 107 additions and 100 deletions
+5 -4
View File
@@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/steveiliop56/ding"
"github.com/tinyauthapp/tinyauth/internal/model"
"github.com/tinyauthapp/tinyauth/internal/repository"
"github.com/tinyauthapp/tinyauth/internal/utils"
@@ -96,7 +97,7 @@ func NewAuthService(
config model.Config,
runtime model.RuntimeConfig,
ctx context.Context,
wg *sync.WaitGroup,
dg *ding.Ding,
ldap *LdapService,
queries repository.Store,
oauthBroker *OAuthBrokerService,
@@ -116,7 +117,7 @@ func NewAuthService(
tailscale: tailscale,
}
wg.Go(service.CleanupOAuthSessionsRoutine)
dg.Go(service.cleanupOAuthSessions, ding.RingMinor)
return service
}
@@ -584,7 +585,7 @@ func (auth *AuthService) EndOAuthSession(sessionId string) {
auth.oauthMutex.Unlock()
}
func (auth *AuthService) CleanupOAuthSessionsRoutine() {
func (auth *AuthService) cleanupOAuthSessions(ctx context.Context) {
auth.log.App.Debug().Msg("Starting OAuth session cleanup routine")
ticker := time.NewTicker(30 * time.Minute)
@@ -607,7 +608,7 @@ func (auth *AuthService) CleanupOAuthSessionsRoutine() {
auth.oauthMutex.Unlock()
auth.log.App.Debug().Msg("OAuth session cleanup completed")
case <-auth.context.Done():
case <-ctx.Done():
auth.log.App.Debug().Msg("Stopping OAuth session cleanup routine")
return
}
+5 -5
View File
@@ -3,8 +3,8 @@ package service
import (
"context"
"strings"
"sync"
"github.com/steveiliop56/ding"
"github.com/tinyauthapp/tinyauth/internal/model"
"github.com/tinyauthapp/tinyauth/internal/utils/decoders"
"github.com/tinyauthapp/tinyauth/internal/utils/logger"
@@ -24,7 +24,7 @@ type DockerService struct {
func NewDockerService(
log *logger.Logger,
ctx context.Context,
wg *sync.WaitGroup,
dg *ding.Ding,
) (*DockerService, error) {
client, err := client.NewClientWithOpts(client.FromEnv)
@@ -50,7 +50,7 @@ func NewDockerService(
service.isConnected = true
service.log.App.Debug().Msg("Docker connected successfully")
wg.Go(service.watchAndClose)
dg.Go(service.watchAndClose, ding.RingMajor)
return service, nil
}
@@ -108,8 +108,8 @@ func (docker *DockerService) GetLabels(appDomain string) (*model.App, error) {
return nil, nil
}
func (docker *DockerService) watchAndClose() {
<-docker.context.Done()
func (docker *DockerService) watchAndClose(ctx context.Context) {
<-ctx.Done()
docker.log.App.Debug().Msg("Closing Docker client")
if docker.client != nil {
err := docker.client.Close()
+16 -17
View File
@@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/steveiliop56/ding"
"github.com/tinyauthapp/tinyauth/internal/model"
"github.com/tinyauthapp/tinyauth/internal/utils/decoders"
"github.com/tinyauthapp/tinyauth/internal/utils/logger"
@@ -38,7 +39,6 @@ type ingressApp struct {
type KubernetesService struct {
log *logger.Logger
ctx context.Context
client dynamic.Interface
started bool
@@ -51,7 +51,7 @@ type KubernetesService struct {
func NewKubernetesService(
log *logger.Logger,
ctx context.Context,
wg *sync.WaitGroup,
dg *ding.Ding,
) (*KubernetesService, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
@@ -82,16 +82,15 @@ func NewKubernetesService(
service := &KubernetesService{
log: log,
ctx: ctx,
client: client,
ingressApps: make(map[ingressKey][]ingressApp),
domainIndex: make(map[string]ingressAppKey),
appNameIndex: make(map[string]ingressAppKey),
}
wg.Go(func() {
service.watchGVR(gvr)
})
dg.Go(func(ctx context.Context) {
service.watchGVR(gvr, ctx)
}, ding.RingMajor)
service.started = true
log.App.Debug().Msg("Kubernetes label provider started successfully")
@@ -271,8 +270,8 @@ func (k *KubernetesService) updateFromItem(item *unstructured.Unstructured) {
}
}
func (k *KubernetesService) resyncGVR(gvr schema.GroupVersionResource) error {
ctx, cancel := context.WithTimeout(k.ctx, 30*time.Second)
func (k *KubernetesService) resyncGVR(gvr schema.GroupVersionResource, ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
list, err := k.client.Resource(gvr).List(ctx, metav1.ListOptions{})
@@ -289,10 +288,10 @@ func (k *KubernetesService) resyncGVR(gvr schema.GroupVersionResource) error {
// runWatcher drains events from an active watcher until it closes or the context is done.
// Returns true if the caller should restart the watcher, false if it should exit.
func (k *KubernetesService) runWatcher(gvr schema.GroupVersionResource, w watch.Interface, resyncTicker *time.Ticker) bool {
func (k *KubernetesService) runWatcher(gvr schema.GroupVersionResource, w watch.Interface, resyncTicker *time.Ticker, ctx context.Context) bool {
for {
select {
case <-k.ctx.Done():
case <-ctx.Done():
w.Stop()
return false
case event, ok := <-w.ResultChan():
@@ -314,33 +313,33 @@ func (k *KubernetesService) runWatcher(gvr schema.GroupVersionResource, w watch.
k.removeIngress(item.GetNamespace(), item.GetName())
}
case <-resyncTicker.C:
if err := k.resyncGVR(gvr); err != nil {
if err := k.resyncGVR(gvr, ctx); err != nil {
k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed during watcher run")
}
}
}
}
func (k *KubernetesService) watchGVR(gvr schema.GroupVersionResource) {
func (k *KubernetesService) watchGVR(gvr schema.GroupVersionResource, ctx context.Context) {
resyncTicker := time.NewTicker(5 * time.Minute)
defer resyncTicker.Stop()
if err := k.resyncGVR(gvr); err != nil {
if err := k.resyncGVR(gvr, ctx); err != nil {
k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Initial resync failed, will retry")
time.Sleep(30 * time.Second)
}
for {
select {
case <-k.ctx.Done():
case <-ctx.Done():
k.log.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Shutting down kubernetes watcher")
return
case <-resyncTicker.C:
if err := k.resyncGVR(gvr); err != nil {
if err := k.resyncGVR(gvr, ctx); err != nil {
k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Periodic resync failed, will retry")
}
default:
ctx, cancel := context.WithCancel(k.ctx)
ctx, cancel := context.WithCancel(ctx)
watcher, err := k.client.Resource(gvr).Watch(ctx, metav1.ListOptions{})
if err != nil {
k.log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to start watcher, will retry")
@@ -349,7 +348,7 @@ func (k *KubernetesService) watchGVR(gvr schema.GroupVersionResource) {
continue
}
k.log.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Watcher started successfully")
if !k.runWatcher(gvr, watcher, resyncTicker) {
if !k.runWatcher(gvr, watcher, resyncTicker, ctx) {
cancel()
return
}
+9 -11
View File
@@ -9,14 +9,14 @@ import (
"github.com/cenkalti/backoff/v5"
ldapgo "github.com/go-ldap/ldap/v3"
"github.com/steveiliop56/ding"
"github.com/tinyauthapp/tinyauth/internal/model"
"github.com/tinyauthapp/tinyauth/internal/utils/logger"
)
type LdapService struct {
log *logger.Logger
config model.Config
context context.Context
log *logger.Logger
config model.Config
conn *ldapgo.Conn
mutex sync.RWMutex
@@ -26,17 +26,15 @@ type LdapService struct {
func NewLdapService(
log *logger.Logger,
config model.Config,
ctx context.Context,
wg *sync.WaitGroup,
dg *ding.Ding,
) (*LdapService, error) {
if config.LDAP.Address == "" {
return nil, nil
}
ldap := &LdapService{
log: log,
config: config,
context: ctx,
log: log,
config: config,
}
// Check whether authentication with client certificate is possible
@@ -69,7 +67,7 @@ func NewLdapService(
return nil, fmt.Errorf("failed to connect to ldap server: %w", err)
}
wg.Go(func() {
dg.Go(func(ctx context.Context) {
ldap.log.App.Debug().Msg("Starting LDAP connection heartbeat routine")
ticker := time.NewTicker(5 * time.Minute)
@@ -87,12 +85,12 @@ func NewLdapService(
}
ldap.log.App.Info().Msg("Successfully reconnected to LDAP server")
}
case <-ldap.context.Done():
case <-ctx.Done():
ldap.log.App.Debug().Msg("LDAP service context cancelled, stopping heartbeat")
return
}
}
})
}, ding.RingMajor)
return ldap, nil
}
+11 -14
View File
@@ -15,13 +15,13 @@ import (
"net/url"
"os"
"strings"
"sync"
"time"
"slices"
"github.com/gin-gonic/gin"
"github.com/go-jose/go-jose/v4"
"github.com/steveiliop56/ding"
"github.com/tinyauthapp/tinyauth/internal/model"
"github.com/tinyauthapp/tinyauth/internal/repository"
"github.com/tinyauthapp/tinyauth/internal/utils"
@@ -116,7 +116,6 @@ type OIDCService struct {
config model.Config
runtime model.RuntimeConfig
queries repository.Store
context context.Context
clients map[string]model.OIDCClientConfig
privateKey *rsa.PrivateKey
@@ -129,8 +128,7 @@ func NewOIDCService(
config model.Config,
runtime model.RuntimeConfig,
queries repository.Store,
ctx context.Context,
wg *sync.WaitGroup) (*OIDCService, error) {
dg *ding.Ding) (*OIDCService, error) {
// If not configured, skip init
if len(runtime.OIDCClients) == 0 {
return nil, nil
@@ -276,7 +274,6 @@ func NewOIDCService(
config: config,
runtime: runtime,
queries: queries,
context: ctx,
clients: clients,
privateKey: privateKey,
@@ -285,7 +282,7 @@ func NewOIDCService(
}
// Start cleanup routine
wg.Go(service.cleanupRoutine)
dg.Go(service.cleanupRoutine, ding.RingMinor)
return service, nil
}
@@ -759,7 +756,7 @@ func (service *OIDCService) DeleteOldSession(ctx context.Context, sub string) er
}
// Cleanup routine - Resource heavy due to the linked tables
func (service *OIDCService) cleanupRoutine() {
func (service *OIDCService) cleanupRoutine(ctx context.Context) {
service.log.App.Debug().Msg("Starting OIDC cleanup routine")
ticker := time.NewTicker(time.Duration(30) * time.Minute)
defer ticker.Stop()
@@ -772,7 +769,7 @@ func (service *OIDCService) cleanupRoutine() {
currentTime := time.Now().Unix()
// For the OIDC tokens, if they are expired we delete the userinfo and codes
expiredTokens, err := service.queries.DeleteExpiredOidcTokens(service.context, repository.DeleteExpiredOidcTokensParams{
expiredTokens, err := service.queries.DeleteExpiredOidcTokens(ctx, repository.DeleteExpiredOidcTokensParams{
TokenExpiresAt: currentTime,
RefreshTokenExpiresAt: currentTime,
})
@@ -782,21 +779,21 @@ func (service *OIDCService) cleanupRoutine() {
}
for _, expiredToken := range expiredTokens {
err := service.DeleteOldSession(service.context, expiredToken.Sub)
err := service.DeleteOldSession(ctx, expiredToken.Sub)
if err != nil {
service.log.App.Warn().Err(err).Msg("Failed to delete session for expired token")
}
}
// For expired codes, we need to get the sub, check if tokens are expired and if they are remove everything
expiredCodes, err := service.queries.DeleteExpiredOidcCodes(service.context, currentTime)
expiredCodes, err := service.queries.DeleteExpiredOidcCodes(ctx, currentTime)
if err != nil {
service.log.App.Warn().Err(err).Msg("Failed to delete expired codes")
service.log.App.Warn().Err(err).Msg("Failed to delete expired codes")
}
for _, expiredCode := range expiredCodes {
token, err := service.queries.GetOidcTokenBySub(service.context, expiredCode.Sub)
token, err := service.queries.GetOidcTokenBySub(ctx, expiredCode.Sub)
if err != nil {
if !errors.Is(err, repository.ErrNotFound) {
@@ -806,7 +803,7 @@ func (service *OIDCService) cleanupRoutine() {
}
if token.TokenExpiresAt < currentTime && token.RefreshTokenExpiresAt < currentTime {
err := service.DeleteOldSession(service.context, expiredCode.Sub)
err := service.DeleteOldSession(ctx, expiredCode.Sub)
if err != nil {
service.log.App.Warn().Err(err).Msg("Failed to delete session for expired code")
}
@@ -814,7 +811,7 @@ func (service *OIDCService) cleanupRoutine() {
}
service.log.App.Debug().Msg("Finished OIDC cleanup routine")
case <-service.context.Done():
case <-ctx.Done():
service.log.App.Debug().Msg("Stopping OIDC cleanup routine")
return
}
+5 -6
View File
@@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/steveiliop56/ding"
"github.com/tinyauthapp/tinyauth/internal/model"
"github.com/tinyauthapp/tinyauth/internal/utils/logger"
"tailscale.com/client/local"
@@ -25,7 +26,6 @@ type TailscaleWhoisResponse struct {
type TailscaleService struct {
log *logger.Logger
wg *sync.WaitGroup
config model.Config
ctx context.Context
@@ -35,7 +35,7 @@ type TailscaleService struct {
mu sync.Mutex
}
func NewTailscaleService(log *logger.Logger, config model.Config, ctx context.Context, wg *sync.WaitGroup) (*TailscaleService, error) {
func NewTailscaleService(log *logger.Logger, config model.Config, ctx context.Context, dg *ding.Ding) (*TailscaleService, error) {
if !config.Tailscale.Enabled {
return nil, nil
}
@@ -67,7 +67,6 @@ func NewTailscaleService(log *logger.Logger, config model.Config, ctx context.Co
service := &TailscaleService{
log: log,
wg: wg,
config: config,
ctx: ctx,
srv: srv,
@@ -84,13 +83,13 @@ func NewTailscaleService(log *logger.Logger, config model.Config, ctx context.Co
return nil, fmt.Errorf("failed to connect to tailscale network: %w", err)
}
wg.Go(service.watchAndClose)
dg.Go(service.watchAndClose, ding.RingMajor)
return service, nil
}
func (ts *TailscaleService) watchAndClose() {
<-ts.ctx.Done()
func (ts *TailscaleService) watchAndClose(ctx context.Context) {
<-ctx.Done()
ts.log.App.Debug().Msg("Shutting down Tailscale service")
ts.mu.Lock()
srv := ts.srv