package core

import (
	"context"
	"sync"

	"backea/internal/backup/models"
	"backea/internal/backup/strategy"
	"backea/internal/logging"
	"backea/internal/mail"
)

// defaultExecutorConcurrency is the number of concurrent backups used when no
// positive limit has been configured on an Executor.
const defaultExecutorConcurrency = 5

// Executor handles the execution of backups for multiple services.
type Executor struct {
	Config      *models.Configuration
	Factory     strategy.Factory
	Mailer      *mail.Mailer
	concurrency int
	Logger      logging.Logger
}

// NewExecutor creates a new backup executor with the default concurrency
// limit. Mailer is left nil by this constructor.
func NewExecutor(config *models.Configuration, factory strategy.Factory) *Executor {
	return &Executor{
		Config:  config,
		Factory: factory,
		// Mailer: mailer,
		concurrency: defaultExecutorConcurrency,
		Logger:      *logging.GetLogger(),
	}
}

// SetConcurrency sets the maximum number of concurrent backups.
// Non-positive values are ignored and the current limit is kept.
func (e *Executor) SetConcurrency(n int) {
	if n > 0 {
		e.concurrency = n
	}
}

// workerLimit returns the configured concurrency, falling back to the default
// when the Executor was built without NewExecutor (zero-value concurrency).
// A zero limit would create an unbuffered semaphore and block the scheduling
// loop on its first send.
func (e *Executor) workerLimit() int {
	if e.concurrency > 0 {
		return e.concurrency
	}
	return defaultExecutorConcurrency
}

// acquireBackupSlot blocks until a slot in sem is free or ctx is done.
// It returns nil once the slot has been taken, or ctx.Err() if the context
// ended first; in the latter case no slot is held.
func acquireBackupSlot(ctx context.Context, sem chan<- struct{}) error {
	// Check first so an already-cancelled context never starts new work,
	// even when a slot is immediately available.
	if err := ctx.Err(); err != nil {
		return err
	}
	select {
	case sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// PerformBackups executes backups based on the given selectors:
//   - empty serviceName: every configured service group is processed;
//   - serviceName only: every service in that group is processed;
//   - serviceName and serviceIndex: only that one service is processed.
//
// It returns the error reported by the selected processing path, or nil.
func (e *Executor) PerformBackups(ctx context.Context, serviceName string, serviceIndex string) error {
	switch {
	case serviceName == "":
		return e.processAllServiceGroups(ctx)
	case serviceIndex == "":
		return e.processServiceGroup(ctx, serviceName)
	default:
		return e.processSpecificService(ctx, serviceName, serviceIndex)
	}
}

// processAllServiceGroups processes all service groups in parallel, running
// at most workerLimit groups at a time. Scheduling stops once ctx is done;
// groups that were already started are still awaited. It returns the last
// group error received, or a cancellation error if scheduling was cut short
// and no group failed, or nil.
func (e *Executor) processAllServiceGroups(ctx context.Context) error {
	var (
		wg        sync.WaitGroup
		cancelErr error
	)
	// Buffered for one error per group so workers never block on send.
	errs := make(chan error, len(e.Config.Services))
	sem := make(chan struct{}, e.workerLimit())

	for groupName := range e.Config.Services {
		if err := acquireBackupSlot(ctx, sem); err != nil {
			cancelErr = err
			break
		}

		wg.Add(1)
		go func(group string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			if err := e.processServiceGroup(ctx, group); err != nil {
				e.Logger.Error("Failed to backup service group %s: %v", group, err)
				errs <- NewBackupError("backup failed for group "+group, err)
			}
		}(groupName)
	}

	// Wait for all started backups to complete before draining.
	wg.Wait()
	close(errs)

	// Only the last error is reported to the caller; all are logged above.
	var lastErr error
	for err := range errs {
		lastErr = err
	}
	if lastErr == nil && cancelErr != nil {
		lastErr = NewBackupError("backup cancelled before all service groups were started", cancelErr)
	}
	return lastErr
}

// processServiceGroup handles the backup for all services in a group.
//
// The before hook runs once; if it fails, nothing else is attempted. The
// group's services are then backed up in parallel (at most workerLimit at a
// time, no new ones once ctx is done), and the after hook runs once after all
// started backups have finished. Backup errors take priority over an after
// hook error in the returned value.
func (e *Executor) processServiceGroup(ctx context.Context, groupName string) error {
	serviceGroup, exists := e.Config.Services[groupName]
	if !exists {
		e.Logger.Error("Service group not found: %s", groupName)
		return NewBackupError("service group not found: "+groupName, nil)
	}

	hooks := NewHookRunner(serviceGroup.Source.Path, serviceGroup.Hooks.BeforeHook, serviceGroup.Hooks.AfterHook)

	// Execute the before hook once for the entire group.
	if err := hooks.RunBeforeHook(); err != nil {
		e.Logger.Error("Failed to execute before hook for group %s: %v", groupName, err)
		return NewBackupError("before hook failed for group "+groupName, err)
	}

	var (
		wg        sync.WaitGroup
		cancelErr error
	)
	// Buffered for one error per service so workers never block on send.
	errs := make(chan error, len(serviceGroup.BackupConfigs))
	sem := make(chan struct{}, e.workerLimit())

	for configIndex := range serviceGroup.BackupConfigs {
		if err := acquireBackupSlot(ctx, sem); err != nil {
			cancelErr = err
			break
		}

		wg.Add(1)
		go func(group, index string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			if err := e.processSpecificService(ctx, group, index); err != nil {
				e.Logger.Error("Failed to backup service %s.%s: %v", group, index, err)
				errs <- NewBackupError("backup failed for "+group+"."+index, err)
			}
		}(groupName, configIndex)
	}

	// Wait for all started backups to complete before draining.
	wg.Wait()
	close(errs)

	var lastErr error
	for err := range errs {
		lastErr = err
	}
	if lastErr == nil && cancelErr != nil {
		lastErr = NewBackupError("backup cancelled before all services in group "+groupName+" were started", cancelErr)
	}

	// Execute the after hook once for the entire group. It runs even when
	// backups failed or were cancelled, because the before hook already ran.
	if err := hooks.RunAfterHook(); err != nil {
		e.Logger.Error("Failed to execute after hook for group %s: %v", groupName, err)
		// Backup errors take priority; report the hook error only if
		// nothing else went wrong.
		if lastErr == nil {
			lastErr = NewBackupError("after hook failed for group "+groupName, err)
		}
	}

	return lastErr
}

// processSpecificService handles the backup for a specific service in a
// group. It validates that both the group and the config index exist, asks
// the factory for a backup strategy, and runs the resulting service's Backup
// with ctx. Any failure is returned wrapped by NewBackupError.
func (e *Executor) processSpecificService(ctx context.Context, groupName string, configIndex string) error {
	serviceID := groupName + "." + configIndex

	serviceGroup, exists := e.Config.Services[groupName]
	if !exists {
		return NewBackupError("service group not found: "+groupName, nil)
	}

	if _, ok := serviceGroup.BackupConfigs[configIndex]; !ok {
		return NewBackupError("service index not found: "+serviceID, nil)
	}

	// Create the appropriate backup strategy using the factory.
	backupStrategy, err := e.Factory.CreateBackupStrategyForService(groupName, configIndex)
	if err != nil {
		e.Logger.Error("Failed to create backup strategy for service %s.%s: %v", groupName, configIndex, err)
		return NewBackupError("failed to create backup strategy for "+serviceID, err)
	}

	service := NewService(
		serviceID,
		serviceGroup.Source.Path,
		backupStrategy,
		e.Mailer,
	)

	if err := service.Backup(ctx); err != nil {
		return NewBackupError("backup execution failed for "+serviceID, err)
	}

	return nil
}