177 lines
5.1 KiB
Go
177 lines
5.1 KiB
Go
package core
|
|
|
|
import (
|
|
"backea/internal/backup/models"
|
|
"backea/internal/backup/strategy"
|
|
"backea/internal/mail"
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
// Executor handles the execution of backups for multiple services
|
|
type Executor struct {
|
|
Config *models.Configuration
|
|
Factory strategy.Factory
|
|
Mailer *mail.Mailer
|
|
concurrency int
|
|
}
|
|
|
|
// NewExecutor creates a new backup executor
|
|
func NewExecutor(config *models.Configuration, factory strategy.Factory) *Executor {
|
|
return &Executor{
|
|
Config: config,
|
|
Factory: factory,
|
|
// Mailer: mailer,
|
|
concurrency: 5,
|
|
}
|
|
}
|
|
|
|
// SetConcurrency sets the maximum number of concurrent backups
|
|
func (e *Executor) SetConcurrency(n int) {
|
|
if n > 0 {
|
|
e.concurrency = n
|
|
}
|
|
}
|
|
|
|
// PerformBackups executes backups for multiple services based on configuration
|
|
func (e *Executor) PerformBackups(ctx context.Context, serviceName string, serviceIndex string) error {
|
|
// Process services based on parameters
|
|
if serviceName != "" {
|
|
// Process single service group or specific service
|
|
if serviceIndex != "" {
|
|
// Process specific service within a group
|
|
return e.processSpecificService(ctx, serviceName, serviceIndex)
|
|
} else {
|
|
// Process all services in the specified group
|
|
return e.processServiceGroup(ctx, serviceName)
|
|
}
|
|
} else {
|
|
// Process all service groups in parallel
|
|
return e.processAllServiceGroups(ctx)
|
|
}
|
|
}
|
|
|
|
// processAllServiceGroups processes all service groups in parallel
|
|
func (e *Executor) processAllServiceGroups(ctx context.Context) error {
|
|
var wg sync.WaitGroup
|
|
errs := make(chan error, len(e.Config.Services))
|
|
|
|
// Create a semaphore to limit concurrency
|
|
sem := make(chan struct{}, e.concurrency)
|
|
|
|
for groupName := range e.Config.Services {
|
|
wg.Add(1)
|
|
sem <- struct{}{} // Acquire semaphore
|
|
go func(group string) {
|
|
defer wg.Done()
|
|
defer func() { <-sem }() // Release semaphore
|
|
|
|
if err := e.processServiceGroup(ctx, group); err != nil {
|
|
log.Printf("Failed to backup service group %s: %v", group, err)
|
|
errs <- fmt.Errorf("backup failed for group %s: %w", group, err)
|
|
}
|
|
}(groupName)
|
|
}
|
|
|
|
// Wait for all backups to complete
|
|
wg.Wait()
|
|
close(errs)
|
|
|
|
// Check if any errors occurred and return the last one
|
|
var lastErr error
|
|
for err := range errs {
|
|
lastErr = err
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// processServiceGroup handles the backup for all services in a group
|
|
func (e *Executor) processServiceGroup(ctx context.Context, groupName string) error {
|
|
// Get service group configuration
|
|
serviceGroup, exists := e.Config.Services[groupName]
|
|
if !exists {
|
|
log.Printf("Service group not found: %s", groupName)
|
|
return fmt.Errorf("service group not found: %s", groupName)
|
|
}
|
|
|
|
// Create hook runner
|
|
hooks := NewHookRunner(serviceGroup.Source.Path, serviceGroup.Hooks.BeforeHook, serviceGroup.Hooks.AfterHook)
|
|
|
|
// Execute the before hook once for the entire group
|
|
if err := hooks.RunBeforeHook(); err != nil {
|
|
log.Printf("Failed to execute before hook for group %s: %v", groupName, err)
|
|
return fmt.Errorf("before hook failed: %w", err)
|
|
}
|
|
|
|
// Process all services in the group in parallel
|
|
var wg sync.WaitGroup
|
|
errs := make(chan error, len(serviceGroup.BackupConfigs))
|
|
|
|
// Create a semaphore to limit concurrency within the group
|
|
sem := make(chan struct{}, e.concurrency)
|
|
|
|
for configIndex := range serviceGroup.BackupConfigs {
|
|
wg.Add(1)
|
|
sem <- struct{}{} // Acquire semaphore
|
|
go func(group, index string) {
|
|
defer wg.Done()
|
|
defer func() { <-sem }() // Release semaphore
|
|
|
|
if err := e.processSpecificService(ctx, group, index); err != nil {
|
|
log.Printf("Failed to backup service %s.%s: %v", group, index, err)
|
|
errs <- fmt.Errorf("backup failed for %s.%s: %w", group, index, err)
|
|
}
|
|
}(groupName, configIndex)
|
|
}
|
|
|
|
// Wait for all backups to complete
|
|
wg.Wait()
|
|
close(errs)
|
|
|
|
// Execute the after hook once for the entire group
|
|
if err := hooks.RunAfterHook(); err != nil {
|
|
log.Printf("Failed to execute after hook for group %s: %v", groupName, err)
|
|
// Don't return here because we want to report backup errors as well
|
|
}
|
|
|
|
// Check if any errors occurred
|
|
var lastErr error
|
|
for err := range errs {
|
|
lastErr = err
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// processSpecificService handles the backup for a specific service in a group
|
|
func (e *Executor) processSpecificService(ctx context.Context, groupName string, configIndex string) error {
|
|
// Get service configuration
|
|
serviceGroup, exists := e.Config.Services[groupName]
|
|
if !exists {
|
|
return fmt.Errorf("service group not found: %s", groupName)
|
|
}
|
|
|
|
// Check if the config index exists
|
|
_, exists = serviceGroup.BackupConfigs[configIndex]
|
|
if !exists {
|
|
return fmt.Errorf("service index not found: %s.%s", groupName, configIndex)
|
|
}
|
|
|
|
// Create the appropriate backup strategy using the factory
|
|
backupStrategy, err := e.Factory.CreateBackupStrategyForService(groupName, configIndex)
|
|
if err != nil {
|
|
log.Printf("Failed to create backup strategy for service %s.%s: %v", groupName, configIndex, err)
|
|
return fmt.Errorf("failed to create backup strategy: %w", err)
|
|
}
|
|
|
|
// Create and run service
|
|
service := NewService(
|
|
fmt.Sprintf("%s.%s", groupName, configIndex),
|
|
serviceGroup.Source.Path,
|
|
backupStrategy,
|
|
e.Mailer,
|
|
)
|
|
return service.Backup(ctx)
|
|
}
|