package core

import (
	"context"
	"fmt"
	"log"
	"sync"

	"backea/internal/backup/config"
	"backea/internal/backup/strategy"
	"backea/internal/mail"
)

// defaultExecutorConcurrency is the parallelism used when no positive limit
// has been configured on an Executor.
const defaultExecutorConcurrency = 5

// Executor handles the execution of backups for multiple services.
type Executor struct {
	// Config holds the service groups (Config.Services) that can be backed up.
	Config *config.Configuration

	// Factory creates the backup strategy for each group/index pair.
	Factory strategy.Factory

	// Mailer is handed to every Service this executor creates. NewExecutor
	// leaves it nil; set the field directly if a mailer should be used.
	Mailer *mail.Mailer

	// concurrency bounds each parallel fan-out; see SetConcurrency.
	concurrency int
}

// NewExecutor creates a new backup executor with the default concurrency
// limit. The Mailer field is not populated by this constructor.
func NewExecutor(config *config.Configuration, factory strategy.Factory) *Executor {
	return &Executor{
		Config:      config,
		Factory:     factory,
		concurrency: defaultExecutorConcurrency,
	}
}

// SetConcurrency sets the parallelism limit. Values of n that are zero or
// negative are ignored and the current limit is kept.
//
// The limit is applied independently at each fan-out level: at most n service
// groups run at once, and within every running group at most n services run
// at once.
func (e *Executor) SetConcurrency(n int) {
	if n > 0 {
		e.concurrency = n
	}
}

// limit returns the effective parallelism. It falls back to the default when
// the field is not positive (for example on an Executor built as a struct
// literal), because a zero-capacity semaphore would block forever.
func (e *Executor) limit() int {
	if e.concurrency > 0 {
		return e.concurrency
	}
	return defaultExecutorConcurrency
}

// PerformBackups executes backups based on the given selectors:
//
//   - serviceName empty: every configured service group is processed
//     (serviceIndex is ignored in this case);
//   - serviceName set, serviceIndex empty: every service of that group is
//     processed, surrounded by the group's before/after hooks;
//   - both set: only that single service is processed, without running the
//     group's hooks.
//
// ctx must be non-nil. It returns nil on success or an error describing a
// failure; every individual failure is also logged.
func (e *Executor) PerformBackups(ctx context.Context, serviceName string, serviceIndex string) error {
	switch {
	case serviceName == "":
		return e.processAllServiceGroups(ctx)
	case serviceIndex == "":
		return e.processServiceGroup(ctx, serviceName)
	default:
		return e.processSpecificService(ctx, serviceName, serviceIndex)
	}
}

// runBounded calls task once for every key, running at most e.limit() calls
// concurrently, and waits for every started call to finish.
//
// If ctx is cancelled before all keys have been scheduled, no further tasks
// are started and the returned error wraps ctx.Err(). Otherwise it returns
// one of the errors produced by the tasks (which one is unspecified when
// several fail), or nil if none failed. Tasks are expected to log their own
// failures.
func (e *Executor) runBounded(ctx context.Context, keys []string, task func(key string) error) error {
	var wg sync.WaitGroup
	sem := make(chan struct{}, e.limit())
	// One slot per task, so a failing task never blocks on send.
	errs := make(chan error, len(keys))

	started := 0
schedule:
	for _, key := range keys {
		// Checked separately because select chooses at random when both a
		// free slot and cancellation are ready.
		if ctx.Err() != nil {
			break
		}
		// Acquire the slot before spawning so that the number of live
		// goroutines is bounded as well.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			break schedule
		}

		started++
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := task(key); err != nil {
				errs <- err
			}
		}(key)
	}

	wg.Wait()
	close(errs)

	var lastErr error
	for err := range errs {
		lastErr = err
	}

	if started < len(keys) {
		return fmt.Errorf("backup scheduling stopped after %d of %d tasks: %w", started, len(keys), ctx.Err())
	}
	return lastErr
}

// processAllServiceGroups processes all service groups in parallel, bounded
// by the executor's concurrency limit.
func (e *Executor) processAllServiceGroups(ctx context.Context) error {
	groups := make([]string, 0, len(e.Config.Services))
	for name := range e.Config.Services {
		groups = append(groups, name)
	}

	return e.runBounded(ctx, groups, func(group string) error {
		if err := e.processServiceGroup(ctx, group); err != nil {
			log.Printf("Failed to backup service group %s: %v", group, err)
			return fmt.Errorf("backup failed for group %s: %w", group, err)
		}
		return nil
	})
}

// processServiceGroup handles the backup for all services in a group.
//
// The group's before hook runs once up front; if it fails nothing is backed
// up and the after hook is not run. Otherwise all services of the group are
// backed up in parallel and the after hook runs once afterwards, even when
// backups failed or ctx was cancelled. A failing after hook is reported in the
// returned error, alongside any backup error.
func (e *Executor) processServiceGroup(ctx context.Context, groupName string) error {
	serviceGroup, exists := e.Config.Services[groupName]
	if !exists {
		log.Printf("Service group not found: %s", groupName)
		return fmt.Errorf("service group not found: %s", groupName)
	}

	hooks := NewHookRunner(serviceGroup.Source.Path, serviceGroup.Hooks.BeforeHook, serviceGroup.Hooks.AfterHook)

	if err := hooks.RunBeforeHook(); err != nil {
		log.Printf("Failed to execute before hook for group %s: %v", groupName, err)
		return fmt.Errorf("before hook failed: %w", err)
	}

	indexes := make([]string, 0, len(serviceGroup.BackupConfigs))
	for index := range serviceGroup.BackupConfigs {
		indexes = append(indexes, index)
	}

	backupErr := e.runBounded(ctx, indexes, func(index string) error {
		if err := e.processSpecificService(ctx, groupName, index); err != nil {
			log.Printf("Failed to backup service %s.%s: %v", groupName, index, err)
			return fmt.Errorf("backup failed for %s.%s: %w", groupName, index, err)
		}
		return nil
	})

	// The after hook is not skipped on backup failure: the before hook has
	// already run, so its counterpart must run too.
	hookErr := hooks.RunAfterHook()
	if hookErr != nil {
		log.Printf("Failed to execute after hook for group %s: %v", groupName, hookErr)
	}

	switch {
	case backupErr != nil && hookErr != nil:
		return fmt.Errorf("%w (after hook also failed: %v)", backupErr, hookErr)
	case hookErr != nil:
		return fmt.Errorf("after hook failed: %w", hookErr)
	}
	return backupErr
}

// processSpecificService handles the backup for a specific service in a
// group. It verifies that both the group and the config index exist, asks the
// factory for the matching backup strategy, and runs a Service named
// "<group>.<index>" with it. Group hooks are not executed here.
func (e *Executor) processSpecificService(ctx context.Context, groupName string, configIndex string) error {
	serviceGroup, exists := e.Config.Services[groupName]
	if !exists {
		return fmt.Errorf("service group not found: %s", groupName)
	}

	if _, ok := serviceGroup.BackupConfigs[configIndex]; !ok {
		return fmt.Errorf("service index not found: %s.%s", groupName, configIndex)
	}

	backupStrategy, err := e.Factory.CreateBackupStrategyForService(groupName, configIndex)
	if err != nil {
		log.Printf("Failed to create backup strategy for service %s.%s: %v", groupName, configIndex, err)
		return fmt.Errorf("failed to create backup strategy: %w", err)
	}

	service := NewService(
		fmt.Sprintf("%s.%s", groupName, configIndex),
		serviceGroup.Source.Path,
		backupStrategy,
		e.Mailer,
	)

	return service.Backup(ctx)
}