backea/internal/backup/core/executor.go
2025-03-28 19:57:44 +01:00

187 lines
5.5 KiB
Go

package core
import (
"backea/internal/backup/models"
"backea/internal/backup/strategy"
"backea/internal/logging"
"backea/internal/mail"
"context"
"sync"
)
// Executor handles the execution of backups for multiple services
type Executor struct {
Config *models.Configuration
Factory strategy.Factory
Mailer *mail.Mailer
concurrency int
Logger logging.Logger
}
// NewExecutor creates a new backup executor
func NewExecutor(config *models.Configuration, factory strategy.Factory) *Executor {
return &Executor{
Config: config,
Factory: factory,
// Mailer: mailer,
concurrency: 5,
Logger: *logging.GetLogger(),
}
}
// SetConcurrency sets the maximum number of concurrent backups
func (e *Executor) SetConcurrency(n int) {
if n > 0 {
e.concurrency = n
}
}
// PerformBackups executes backups for multiple services based on configuration
func (e *Executor) PerformBackups(ctx context.Context, serviceName string, serviceIndex string) error {
// Process services based on parameters
if serviceName != "" {
// Process single service group or specific service
if serviceIndex != "" {
// Process specific service within a group
return e.processSpecificService(ctx, serviceName, serviceIndex)
} else {
// Process all services in the specified group
return e.processServiceGroup(ctx, serviceName)
}
} else {
// Process all service groups in parallel
return e.processAllServiceGroups(ctx)
}
}
// processAllServiceGroups processes all service groups in parallel
func (e *Executor) processAllServiceGroups(ctx context.Context) error {
var wg sync.WaitGroup
errs := make(chan error, len(e.Config.Services))
// Create a semaphore to limit concurrency
sem := make(chan struct{}, e.concurrency)
for groupName := range e.Config.Services {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(group string) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
if err := e.processServiceGroup(ctx, group); err != nil {
e.Logger.Error("Failed to backup service group %s: %v", group, err)
errs <- NewBackupError("backup failed for group "+group, err)
}
}(groupName)
}
// Wait for all backups to complete
wg.Wait()
close(errs)
// Check if any errors occurred and return the last one
var lastErr error
for err := range errs {
lastErr = err
}
return lastErr
}
// processServiceGroup handles the backup for all services in a group
func (e *Executor) processServiceGroup(ctx context.Context, groupName string) error {
// Get service group configuration
serviceGroup, exists := e.Config.Services[groupName]
if !exists {
e.Logger.Error("Service group not found: %s", groupName)
return NewBackupError("service group not found: "+groupName, nil)
}
// Create hook runner
hooks := NewHookRunner(serviceGroup.Source.Path, serviceGroup.Hooks.BeforeHook, serviceGroup.Hooks.AfterHook)
// Execute the before hook once for the entire group
if err := hooks.RunBeforeHook(); err != nil {
e.Logger.Error("Failed to execute before hook for group %s: %v", groupName, err)
return NewBackupError("before hook failed for group "+groupName, err)
}
// Process all services in the group in parallel
var wg sync.WaitGroup
errs := make(chan error, len(serviceGroup.BackupConfigs))
// Create a semaphore to limit concurrency within the group
sem := make(chan struct{}, e.concurrency)
for configIndex := range serviceGroup.BackupConfigs {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(group, index string) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
if err := e.processSpecificService(ctx, group, index); err != nil {
e.Logger.Error("Failed to backup service %s.%s: %v", group, index, err)
errs <- NewBackupError("backup failed for "+group+"."+index, err)
}
}(groupName, configIndex)
}
// Wait for all backups to complete
wg.Wait()
close(errs)
var lastErr error
for err := range errs {
lastErr = err
}
// Execute the after hook once for the entire group
if err := hooks.RunAfterHook(); err != nil {
e.Logger.Error("Failed to execute after hook for group %s: %v", groupName, err)
// If no backup errors occurred, return the after hook error,
// otherwise prioritize the backup errors
if lastErr == nil {
lastErr = NewBackupError("after hook failed for group "+groupName, err)
}
}
return lastErr
}
// processSpecificService handles the backup for a specific service in a group
func (e *Executor) processSpecificService(ctx context.Context, groupName string, configIndex string) error {
// Get service configuration
serviceGroup, exists := e.Config.Services[groupName]
if !exists {
return NewBackupError("service group not found: "+groupName, nil)
}
// Check if the config index exists
_, exists = serviceGroup.BackupConfigs[configIndex]
if !exists {
return NewBackupError("service index not found: "+groupName+"."+configIndex, nil)
}
// Create the appropriate backup strategy using the factory
backupStrategy, err := e.Factory.CreateBackupStrategyForService(groupName, configIndex)
if err != nil {
e.Logger.Error("Failed to create backup strategy for service %s.%s: %v", groupName, configIndex, err)
return NewBackupError("failed to create backup strategy for "+groupName+"."+configIndex, err)
}
// Create and run service
service := NewService(
groupName+"."+configIndex,
serviceGroup.Source.Path,
backupStrategy,
e.Mailer,
)
if err := service.Backup(ctx); err != nil {
return NewBackupError("backup execution failed for "+groupName+"."+configIndex, err)
}
return nil
}