mirror of
https://github.com/steveiliop56/tinyauth.git
synced 2026-05-27 14:40:14 +00:00
feat: use ding for ordered go routine shutdown order (#896)
This commit is contained in:
@@ -13,11 +13,11 @@ import (
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/steveiliop56/ding"
|
||||
|
||||
"github.com/tinyauthapp/tinyauth/internal/model"
|
||||
"github.com/tinyauthapp/tinyauth/internal/repository"
|
||||
@@ -26,6 +26,12 @@ import (
|
||||
"github.com/tinyauthapp/tinyauth/internal/utils/logger"
|
||||
)
|
||||
|
||||
// Shutdown order for go routines
|
||||
// 1. Janitor routines (e.g. database cleanup, heartbeat) - ding.RingMinor
|
||||
// 2. HTTP server listeners - ding.RingNormal
|
||||
// 3. Networking layers, user and label providers (e.g. ailscale service, kubernetes service) - ding.RingMajor
|
||||
// 4. Database connection - ding.RingCritical
|
||||
|
||||
type Services struct {
|
||||
accessControlService *service.AccessControlsService
|
||||
authService *service.AuthService
|
||||
@@ -48,7 +54,7 @@ type BootstrapApp struct {
|
||||
queries repository.Store
|
||||
router *gin.Engine
|
||||
db *sql.DB
|
||||
wg sync.WaitGroup
|
||||
ding *ding.Ding
|
||||
listeners []Listener
|
||||
}
|
||||
|
||||
@@ -64,6 +70,10 @@ func (app *BootstrapApp) Setup() error {
|
||||
app.ctx = ctx
|
||||
app.cancel = cancel
|
||||
|
||||
// Create a ding instance
|
||||
dg := ding.New(ctx)
|
||||
app.ding = dg
|
||||
|
||||
// setup logger
|
||||
log := logger.NewLogger().WithConfig(app.config.Log)
|
||||
log.Init()
|
||||
@@ -186,15 +196,17 @@ func (app *BootstrapApp) Setup() error {
|
||||
return fmt.Errorf("failed to setup database: %w", err)
|
||||
}
|
||||
|
||||
// after this point, we start initializing dependencies so it's a good time to setup a defer
|
||||
// to ensure that resources are cleaned up properly in case of an error during initialization
|
||||
defer func() {
|
||||
app.cancel()
|
||||
app.wg.Wait()
|
||||
if app.db != nil {
|
||||
app.db.Close()
|
||||
app.ding.Go(func(ctx context.Context) {
|
||||
<-ctx.Done()
|
||||
app.log.App.Debug().Msg("Shutting down database connection")
|
||||
if app.db == nil {
|
||||
// using memory store, no db instance
|
||||
return
|
||||
}
|
||||
}()
|
||||
if err := app.db.Close(); err != nil {
|
||||
app.log.App.Error().Err(err).Msg("Failed to close database connection")
|
||||
}
|
||||
}, ding.RingCritical)
|
||||
|
||||
// store
|
||||
app.queries = store
|
||||
@@ -261,12 +273,12 @@ func (app *BootstrapApp) Setup() error {
|
||||
|
||||
// start db cleanup routine
|
||||
app.log.App.Debug().Msg("Starting database cleanup routine")
|
||||
app.wg.Go(app.dbCleanupRoutine)
|
||||
app.ding.Go(app.dbCleanupRoutine, ding.RingMinor)
|
||||
|
||||
// if analytics are not disabled, start heartbeat
|
||||
if app.config.Analytics.Enabled {
|
||||
app.log.App.Debug().Msg("Starting heartbeat routine")
|
||||
app.wg.Go(app.heartbeatRoutine)
|
||||
app.ding.Go(app.heartbeatRoutine, ding.RingMinor)
|
||||
}
|
||||
|
||||
// setup listeners
|
||||
@@ -287,6 +299,7 @@ func (app *BootstrapApp) Setup() error {
|
||||
for {
|
||||
select {
|
||||
case <-app.ctx.Done():
|
||||
app.ding.Wait()
|
||||
app.log.App.Info().Msg("Oh, it's time for me to go, bye!")
|
||||
return nil
|
||||
case err := <-lec:
|
||||
@@ -297,7 +310,7 @@ func (app *BootstrapApp) Setup() error {
|
||||
}
|
||||
}
|
||||
|
||||
func (app *BootstrapApp) heartbeatRoutine() {
|
||||
func (app *BootstrapApp) heartbeatRoutine(ctx context.Context) {
|
||||
ticker := time.NewTicker(time.Duration(12) * time.Hour)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -350,7 +363,7 @@ func (app *BootstrapApp) heartbeatRoutine() {
|
||||
if res.StatusCode != 200 && res.StatusCode != 201 {
|
||||
app.log.App.Debug().Str("status", res.Status).Msg("Heartbeat returned non-200/201 status")
|
||||
}
|
||||
case <-app.ctx.Done():
|
||||
case <-ctx.Done():
|
||||
app.log.App.Debug().Msg("Stopping heartbeat routine")
|
||||
ticker.Stop()
|
||||
return
|
||||
@@ -358,7 +371,7 @@ func (app *BootstrapApp) heartbeatRoutine() {
|
||||
}
|
||||
}
|
||||
|
||||
func (app *BootstrapApp) dbCleanupRoutine() {
|
||||
func (app *BootstrapApp) dbCleanupRoutine(ctx context.Context) {
|
||||
ticker := time.NewTicker(time.Duration(30) * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -367,14 +380,14 @@ func (app *BootstrapApp) dbCleanupRoutine() {
|
||||
case <-ticker.C:
|
||||
app.log.App.Debug().Msg("Running database cleanup")
|
||||
|
||||
err := app.queries.DeleteExpiredSessions(app.ctx, time.Now().Unix())
|
||||
err := app.queries.DeleteExpiredSessions(ctx, time.Now().Unix())
|
||||
|
||||
if err != nil {
|
||||
app.log.App.Error().Err(err).Msg("Failed to delete expired sessions")
|
||||
}
|
||||
|
||||
app.log.App.Debug().Msg("Database cleanup completed")
|
||||
case <-app.ctx.Done():
|
||||
case <-ctx.Done():
|
||||
app.log.App.Debug().Msg("Stopping database cleanup routine")
|
||||
ticker.Stop()
|
||||
return
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/steveiliop56/ding"
|
||||
"github.com/tinyauthapp/tinyauth/internal/controller"
|
||||
"github.com/tinyauthapp/tinyauth/internal/middleware"
|
||||
"github.com/tinyauthapp/tinyauth/internal/model"
|
||||
@@ -80,9 +81,9 @@ func (app *BootstrapApp) runListeners() (chan error, error) {
|
||||
return nil, fmt.Errorf("failed to get listener function: %w", err)
|
||||
}
|
||||
|
||||
app.wg.Go(func() {
|
||||
lec <- listenerFunc()
|
||||
})
|
||||
app.ding.Go(func(ctx context.Context) {
|
||||
lec <- listenerFunc(ctx)
|
||||
}, ding.RingNormal)
|
||||
}
|
||||
|
||||
return lec, nil
|
||||
@@ -125,7 +126,7 @@ func (app *BootstrapApp) calculateListenerPolicy() []Listener {
|
||||
return l
|
||||
}
|
||||
|
||||
func (app *BootstrapApp) listenerFromType(listenerType Listener) (func() error, error) {
|
||||
func (app *BootstrapApp) listenerFromType(listenerType Listener) (func(ctx context.Context) error, error) {
|
||||
switch listenerType {
|
||||
case ListenerHTTP:
|
||||
return app.serveHTTP, nil
|
||||
@@ -138,7 +139,7 @@ func (app *BootstrapApp) listenerFromType(listenerType Listener) (func() error,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *BootstrapApp) serveHTTP() error {
|
||||
func (app *BootstrapApp) serveHTTP(ctx context.Context) error {
|
||||
address := fmt.Sprintf("%s:%d", app.config.Server.Address, app.config.Server.Port)
|
||||
|
||||
app.log.App.Info().Msgf("Starting server on %s", address)
|
||||
@@ -154,10 +155,10 @@ func (app *BootstrapApp) serveHTTP() error {
|
||||
Handler: app.router.Handler(),
|
||||
}
|
||||
|
||||
return app.serve(listener, server, "http")
|
||||
return app.serve(listener, server, ctx, "http")
|
||||
}
|
||||
|
||||
func (app *BootstrapApp) serveUnix() error {
|
||||
func (app *BootstrapApp) serveUnix(ctx context.Context) error {
|
||||
_, err := os.Stat(app.config.Server.SocketPath)
|
||||
|
||||
if err == nil {
|
||||
@@ -181,10 +182,10 @@ func (app *BootstrapApp) serveUnix() error {
|
||||
Handler: app.router.Handler(),
|
||||
}
|
||||
|
||||
return app.serve(listener, server, "unix socket")
|
||||
return app.serve(listener, server, ctx, "unix socket")
|
||||
}
|
||||
|
||||
func (app *BootstrapApp) serveTailscale() error {
|
||||
func (app *BootstrapApp) serveTailscale(ctx context.Context) error {
|
||||
app.log.App.Info().Msgf("Starting Tailscale server on %s", fmt.Sprintf("https://%s", app.services.tailscaleService.GetHostname()))
|
||||
|
||||
listener, err := app.services.tailscaleService.CreateListener()
|
||||
@@ -197,27 +198,23 @@ func (app *BootstrapApp) serveTailscale() error {
|
||||
Handler: app.router.Handler(),
|
||||
}
|
||||
|
||||
return app.serve(listener, server, "tailscale")
|
||||
return app.serve(listener, server, ctx, "tailscale")
|
||||
}
|
||||
|
||||
func (app *BootstrapApp) serve(listener net.Listener, server *http.Server, name string) error {
|
||||
func (app *BootstrapApp) serve(listener net.Listener, server *http.Server, ctx context.Context, name string) error {
|
||||
shutdown := func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), model.GracefulShutdownTimeout*time.Second)
|
||||
// we use a new context for the shutdown since the main one is cancelled
|
||||
sctx, cancel := context.WithTimeout(context.Background(), model.GracefulShutdownTimeout*time.Second)
|
||||
defer cancel()
|
||||
err := server.Shutdown(ctx)
|
||||
if err != nil &&
|
||||
// With tailscale, the goroutine for shutting down the tailscale connection
|
||||
// runs first and causes the connection the tailscale listener is running on to close
|
||||
// first so, the shutdown fails
|
||||
// TODO: add priority to the goroutine shutdowns
|
||||
!errors.Is(err, net.ErrClosed) {
|
||||
err := server.Shutdown(sctx)
|
||||
if err != nil {
|
||||
app.log.App.Error().Err(err).Msgf("Failed to shutdown %s listener gracefully", name)
|
||||
}
|
||||
listener.Close()
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-app.ctx.Done()
|
||||
<-ctx.Done()
|
||||
app.log.App.Debug().Msgf("Shutting down %s listener", name)
|
||||
shutdown()
|
||||
}()
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
func (app *BootstrapApp) setupServices() error {
|
||||
ldapService, err := service.NewLdapService(app.log, app.config, app.ctx, &app.wg)
|
||||
ldapService, err := service.NewLdapService(app.log, app.config, app.ding)
|
||||
|
||||
if err != nil {
|
||||
app.log.App.Warn().Err(err).Msg("Failed to initialize LDAP connection, will continue without it")
|
||||
@@ -22,7 +22,7 @@ func (app *BootstrapApp) setupServices() error {
|
||||
return fmt.Errorf("failed to initialize label provider: %w", err)
|
||||
}
|
||||
|
||||
tailscaleService, err := service.NewTailscaleService(app.log, app.config, app.ctx, &app.wg)
|
||||
tailscaleService, err := service.NewTailscaleService(app.log, app.config, app.ctx, app.ding)
|
||||
|
||||
if err != nil {
|
||||
app.log.App.Warn().Err(err).Msg("Failed to initialize Tailscale connection, will continue without it")
|
||||
@@ -42,10 +42,10 @@ func (app *BootstrapApp) setupServices() error {
|
||||
oauthBrokerService := service.NewOAuthBrokerService(app.log, app.runtime.OAuthProviders, app.ctx)
|
||||
app.services.oauthBrokerService = oauthBrokerService
|
||||
|
||||
authService := service.NewAuthService(app.log, app.config, app.runtime, app.ctx, &app.wg, app.services.ldapService, app.queries, app.services.oauthBrokerService, app.services.tailscaleService, app.services.policyEngine)
|
||||
authService := service.NewAuthService(app.log, app.config, app.runtime, app.ctx, app.ding, app.services.ldapService, app.queries, app.services.oauthBrokerService, app.services.tailscaleService, app.services.policyEngine)
|
||||
app.services.authService = authService
|
||||
|
||||
oidcService, err := service.NewOIDCService(app.log, app.config, app.runtime, app.queries, app.ctx, &app.wg)
|
||||
oidcService, err := service.NewOIDCService(app.log, app.config, app.runtime, app.queries, app.ding)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize oidc service: %w", err)
|
||||
@@ -69,7 +69,7 @@ func (app *BootstrapApp) getLabelProvider() (service.LabelProvider, error) {
|
||||
if useKubernetes {
|
||||
app.log.App.Debug().Msg("Using Kubernetes label provider")
|
||||
|
||||
kubernetesService, err := service.NewKubernetesService(app.log, app.ctx, &app.wg)
|
||||
kubernetesService, err := service.NewKubernetesService(app.log, app.ctx, app.ding)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize kubernetes service: %w", err)
|
||||
@@ -81,7 +81,7 @@ func (app *BootstrapApp) getLabelProvider() (service.LabelProvider, error) {
|
||||
|
||||
app.log.App.Debug().Msg("Using Docker label provider")
|
||||
|
||||
dockerService, err := service.NewDockerService(app.log, app.ctx, &app.wg)
|
||||
dockerService, err := service.NewDockerService(app.log, app.ctx, app.ding)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize docker service: %w", err)
|
||||
|
||||
Reference in New Issue
Block a user