2025-03-25 21:57:23 +01:00

631 lines
18 KiB
Go

package kopia
import (
"archive/zip"
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
"backea/internal/backup/models"
"backea/internal/backup/security"
)
// Strategy implements the backup strategy using Kopia
type Strategy struct {
Retention Retention
Provider Provider
ConfigPath string
SourcePath string
}
// Retention represents retention policy for backups
type Retention struct {
KeepLatest int
KeepHourly int
KeepDaily int
KeepWeekly int
KeepMonthly int
KeepYearly int
}
// NewStrategy creates a new kopia strategy with specified provider
func NewStrategy(retention Retention, provider Provider, configPath string, sourcePath string) *Strategy {
return &Strategy{
Retention: retention,
Provider: provider,
ConfigPath: configPath,
SourcePath: sourcePath,
}
}
// Execute performs the kopia backup
func (s *Strategy) Execute(ctx context.Context, serviceName, directory string) error {
log.Printf("Performing kopia backup for service %s: %s", serviceName, directory)
// Get or create password for this service
password, err := security.GetOrCreatePassword(serviceName, 48)
if err != nil {
return fmt.Errorf("failed to get or create password: %w", err)
}
// Ensure Kopia config directory exists
kopiaConfigDir := filepath.Join(os.Getenv("HOME"), ".kopia")
if err := os.MkdirAll(kopiaConfigDir, 0755); err != nil {
return fmt.Errorf("failed to create kopia config directory: %w", err)
}
// Repository not connected, connect or create via provider
log.Printf("Connecting to repository for %s", serviceName)
if err := s.Provider.Connect(ctx, serviceName, password, s.ConfigPath); err != nil {
return fmt.Errorf("failed to connect to repository: %w", err)
}
// Create snapshot
log.Printf("Creating snapshot for directory: %s", directory)
snapshotCmd := exec.Command("kopia", "--config-file", s.ConfigPath, "snapshot", "create", directory)
snapshotOutput, err := snapshotCmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to create snapshot: %w\nOutput: %s", err, snapshotOutput)
}
// Set retention policy
log.Printf("Setting retention policy for %s", serviceName)
args := []string{
"--config-file", s.ConfigPath,
"policy", "set",
"--keep-latest", fmt.Sprintf("%d", s.Retention.KeepLatest),
"--keep-hourly", fmt.Sprintf("%d", s.Retention.KeepHourly),
"--keep-daily", fmt.Sprintf("%d", s.Retention.KeepDaily),
"--keep-weekly", fmt.Sprintf("%d", s.Retention.KeepWeekly),
"--keep-monthly", fmt.Sprintf("%d", s.Retention.KeepMonthly),
"--keep-annual", fmt.Sprintf("%d", s.Retention.KeepYearly),
directory,
}
policyCmd := exec.Command("kopia", args...)
policyOutput, err := policyCmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to set policy: %w\nOutput: %s", err, policyOutput)
}
log.Printf("Snapshot and policy set successfully for %s", serviceName)
return nil
}
// ListBackups returns information about existing backups
func (s *Strategy) ListBackups(ctx context.Context, serviceName string) ([]models.BackupInfo, error) {
// Parse service group and index from service name
_, _, err := parseServiceName(serviceName)
if err != nil {
return nil, fmt.Errorf("invalid service name format: %w", err)
}
// Use the source path from the Strategy instead of trying to get it from the factory
directoryPath := s.SourcePath
if directoryPath == "" {
return nil, fmt.Errorf("source path not specified")
}
// Trim trailing slash if any
directoryPath = strings.TrimRight(directoryPath, "/")
// Ensure we're connected to the repository
err = s.EnsureRepositoryConnected(ctx, serviceName)
if err != nil {
return nil, fmt.Errorf("failed to connect to repository: %w", err)
}
// Run kopia snapshot list command with JSON output
cmd := exec.CommandContext(
ctx,
"kopia",
"--config-file", s.ConfigPath,
"snapshot",
"list",
"--json",
)
output, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
// Parse the JSON output
var snapshots []map[string]interface{}
if err := json.Unmarshal(output, &snapshots); err != nil {
return nil, fmt.Errorf("failed to parse snapshot list: %w", err)
}
// Convert to BackupInfo
var result []models.BackupInfo
for _, snap := range snapshots {
// Get basic info
id, _ := snap["id"].(string)
endTime, _ := snap["endTime"].(string)
// Only include snapshots for the requested service by directory path
source, ok := snap["source"].(map[string]interface{})
if !ok {
continue
}
sourcePath, _ := source["path"].(string)
// Match by exact directory path, not service name
if sourcePath != directoryPath {
continue
}
// Parse time
var creationTime time.Time
if endTime != "" {
t, err := time.Parse(time.RFC3339, endTime)
if err == nil {
creationTime = t
}
}
// Get size information from stats
var size int64
if stats, ok := snap["stats"].(map[string]interface{}); ok {
if totalSize, ok := stats["totalSize"].(float64); ok {
size = int64(totalSize)
}
}
// Determine retention tag
retentionTag := "none"
if reasons, ok := snap["retentionReason"].([]interface{}); ok && len(reasons) > 0 {
// Get the first reason which indicates the highest priority retention
if reason, ok := reasons[0].(string); ok {
parts := strings.SplitN(reason, "-", 2)
if len(parts) > 0 {
retentionTag = parts[0]
}
}
}
result = append(result, models.BackupInfo{
ID: id,
CreationTime: creationTime,
Size: size,
Source: sourcePath,
Type: "kopia",
RetentionTag: retentionTag,
})
}
return result, nil
}
// GetStorageUsage returns information about the total storage used by the repository
func (s *Strategy) GetStorageUsage(ctx context.Context, serviceName string) (*models.StorageUsageInfo, error) {
// Ensure we're connected to the repository
err := s.EnsureRepositoryConnected(ctx, serviceName)
if err != nil {
return nil, fmt.Errorf("failed to connect to repository: %w", err)
}
// Get provider type (b2, local, sftp)
providerType := "unknown"
providerType = s.Provider.GetProviderType()
// Initialize storage info
info := &models.StorageUsageInfo{
Provider: providerType,
ProviderID: s.Provider.GetBucketName(serviceName),
}
// Calculate logical size by summing up the sizes of all snapshots
backups, err := s.ListBackups(ctx, serviceName)
if err == nil {
var totalLogicalBytes int64
for _, backup := range backups {
totalLogicalBytes += backup.Size
}
info.TotalBytes = totalLogicalBytes
}
// Try to get physical storage stats using blob stats command
blobStatsCmd := exec.CommandContext(
ctx,
"kopia",
"--config-file", s.ConfigPath,
"blob",
"stats",
)
blobStatsOutput, err := blobStatsCmd.CombinedOutput()
if err == nil {
// Parse the text output
outputStr := string(blobStatsOutput)
log.Printf("Blob stats output: %s", outputStr)
// Look for the line with "Total:"
lines := strings.Split(outputStr, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "Total:") {
parts := strings.SplitN(line, ":", 2)
if len(parts) == 2 {
sizeStr := strings.TrimSpace(parts[1])
size, err := parseHumanSize(sizeStr)
if err == nil {
info.TotalBytes = size
log.Printf("Got physical size from blob stats: %d bytes", size)
break
} else {
log.Printf("Failed to parse size '%s': %v", sizeStr, err)
}
}
}
}
} else {
log.Printf("Blob stats command failed: %v - %s", err, string(blobStatsOutput))
}
return info, nil
}
// RestoreBackup restores a backup with the given ID
func (s *Strategy) RestoreBackup(ctx context.Context, backupID string, serviceName string) error {
// Ensure repository is connected
err := s.EnsureRepositoryConnected(ctx, serviceName)
if err != nil {
return fmt.Errorf("failed to connect to repository: %w", err)
}
// Get source directory from strategy instead of importing factory
targetDir := s.SourcePath
if targetDir == "" {
return fmt.Errorf("source path not specified")
}
// Create a temporary directory for restore
restoreDir := filepath.Join(os.TempDir(), fmt.Sprintf("backea-restore-%s-%d", serviceName, time.Now().Unix()))
if err := os.MkdirAll(restoreDir, 0755); err != nil {
return fmt.Errorf("failed to create restore directory: %w", err)
}
log.Printf("Restoring backup %s to temporary directory %s", backupID, restoreDir)
// Run kopia restore command to restore the snapshot to the temporary directory
restoreCmd := exec.CommandContext(
ctx,
"kopia",
"--config-file", s.ConfigPath,
"snapshot",
"restore",
backupID,
restoreDir,
)
restoreOutput, err := restoreCmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to restore snapshot: %w\nOutput: %s", err, restoreOutput)
}
// Now we need to sync the restored data to the original directory
log.Printf("Syncing restored data from %s to %s", restoreDir, targetDir)
// Use rsync for the final transfer to avoid permissions issues
syncCmd := exec.CommandContext(
ctx,
"rsync",
"-av",
"--delete", // Delete extraneous files from target
restoreDir+"/", // Source directory with trailing slash to copy contents
targetDir, // Target directory
)
syncOutput, err := syncCmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to sync restored data: %w\nOutput: %s", err, syncOutput)
}
// Clean up the temporary directory
go func() {
time.Sleep(5 * time.Minute) // Wait 5 minutes before cleaning up
log.Printf("Cleaning up temporary restore directory %s", restoreDir)
os.RemoveAll(restoreDir)
}()
log.Printf("Successfully restored backup %s to %s", backupID, targetDir)
return nil
}
// DownloadBackup provides a reader with backup summary information in text format
func (s *Strategy) DownloadBackup(ctx context.Context, backupID string, serviceName string) (io.ReadCloser, error) {
// Ensure repository is connected
err := s.EnsureRepositoryConnected(ctx, serviceName)
if err != nil {
return nil, fmt.Errorf("failed to connect to repository: %w", err)
}
// Create a single temporary directory for restore and ZIP creation
// Note: We're not creating a nested "restore" subdirectory anymore
tempDir, err := os.MkdirTemp("", "backea-download-*")
if err != nil {
return nil, fmt.Errorf("failed to create temporary directory: %w", err)
}
zipFile := filepath.Join(tempDir, "backup.zip")
// Restore the snapshot directly to the temporary directory
log.Printf("Restoring snapshot %s to temporary directory %s", backupID, tempDir)
restoreCmd := exec.CommandContext(
ctx,
"kopia",
"--config-file", s.ConfigPath,
"snapshot",
"restore",
backupID,
tempDir, // Restore directly to tempDir instead of a subdirectory
)
restoreOutput, err := restoreCmd.CombinedOutput()
if err != nil {
os.RemoveAll(tempDir)
return nil, fmt.Errorf("failed to restore snapshot: %w\nOutput: %s", err, restoreOutput)
}
// Create ZIP archive of the restored files
log.Printf("Creating ZIP archive at %s", zipFile)
// Use Go's zip package instead of command line tools
zipWriter, err := os.Create(zipFile)
if err != nil {
os.RemoveAll(tempDir)
return nil, fmt.Errorf("failed to create zip file: %w", err)
}
defer zipWriter.Close()
// Create ZIP writer
archive := zip.NewWriter(zipWriter)
defer archive.Close()
// Walk the restore directory and add files to ZIP with error handling
err = filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
// Handle errors accessing files/directories
if err != nil {
log.Printf("Warning: Error accessing path %s: %v", path, err)
return nil // Skip this file but continue walking
}
// Skip the zip file itself to avoid recursion
if path == zipFile {
return nil
}
// Skip directories, we only want to add files
if info.IsDir() {
return nil
}
// Create ZIP header
header, err := zip.FileInfoHeader(info)
if err != nil {
log.Printf("Warning: Couldn't create header for %s: %v", path, err)
return nil // Skip this file but continue walking
}
// Make the path relative to the temp directory
relPath, err := filepath.Rel(tempDir, path)
if err != nil {
log.Printf("Warning: Couldn't get relative path for %s: %v", path, err)
return nil // Skip this file but continue walking
}
// Set the name in the archive to the relative path
header.Name = relPath
// Create the file in the ZIP
writer, err := archive.CreateHeader(header)
if err != nil {
log.Printf("Warning: Couldn't create zip entry for %s: %v", path, err)
return nil // Skip this file but continue walking
}
// Open the file
file, err := os.Open(path)
if err != nil {
log.Printf("Warning: Couldn't open file %s: %v", path, err)
return nil // Skip this file but continue walking
}
defer file.Close()
// Copy the file content to the ZIP
_, err = io.Copy(writer, file)
if err != nil {
log.Printf("Warning: Error copying content from %s: %v", path, err)
}
return nil // Continue to next file regardless of error
})
// Even if we had some file errors, don't fail the whole process
// as long as we created the zip file
if err != nil {
log.Printf("Some files may have been skipped during zip creation: %v", err)
}
// Close the ZIP writer before opening it for reading
archive.Close()
zipWriter.Close()
// Open the ZIP file for reading
zipReader, err := os.Open(zipFile)
if err != nil {
os.RemoveAll(tempDir)
return nil, fmt.Errorf("failed to open zip archive: %w", err)
}
// Return a reader that will clean up when closed
return &cleanupReadCloser{
ReadCloser: zipReader,
cleanup: func() {
zipReader.Close()
os.RemoveAll(tempDir)
},
}, nil
}
// GetBackupInfo returns detailed information about a specific backup
func (s *Strategy) GetBackupInfo(ctx context.Context, backupID string, serviceName string) (*models.BackupInfo, error) {
err := s.EnsureRepositoryConnected(ctx, serviceName)
if err != nil {
return nil, fmt.Errorf("failed to connect to repository: %w", err)
}
// Run kopia snapshot describe command with JSON output
cmd := exec.CommandContext(
ctx,
"kopia",
"--config-file", s.ConfigPath,
"snapshot",
"describe",
"--json",
backupID,
)
output, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("failed to describe snapshot: %w", err)
}
// Parse the JSON output
var snap map[string]interface{}
if err := json.Unmarshal(output, &snap); err != nil {
return nil, fmt.Errorf("failed to parse snapshot info: %w", err)
}
// Extract the relevant information
id, _ := snap["id"].(string)
endTime, _ := snap["endTime"].(string)
// Parse time
var creationTime time.Time
if endTime != "" {
t, err := time.Parse(time.RFC3339, endTime)
if err == nil {
creationTime = t
}
}
// Get size information from stats
var size int64
if stats, ok := snap["stats"].(map[string]interface{}); ok {
if totalSize, ok := stats["totalSize"].(float64); ok {
size = int64(totalSize)
}
}
// Get source path
sourcePath := ""
if source, ok := snap["source"].(map[string]interface{}); ok {
if path, ok := source["path"].(string); ok {
sourcePath = path
}
}
// Determine retention tag
retentionTag := "none"
if reasons, ok := snap["retentionReason"].([]interface{}); ok && len(reasons) > 0 {
// Get the first reason which indicates the highest priority retention
if reason, ok := reasons[0].(string); ok {
parts := strings.SplitN(reason, "-", 2)
if len(parts) > 0 {
retentionTag = parts[0]
}
}
}
return &models.BackupInfo{
ID: id,
CreationTime: creationTime,
Size: size,
Source: sourcePath,
Type: "kopia",
RetentionTag: retentionTag,
}, nil
}
// Helper method to ensure the repository is connected before operations
func (s *Strategy) EnsureRepositoryConnected(ctx context.Context, serviceName string) error {
// Check if kopia repository is already connected with config file
cmd := exec.Command("kopia", "--config-file", s.ConfigPath, "repository", "status")
err := cmd.Run()
if err != nil {
// Repository not connected, try to connect
password, err := security.GetOrCreatePassword(serviceName, 48)
if err != nil {
return fmt.Errorf("failed to get password: %w", err)
}
// Connect using provider
if err := s.Provider.Connect(ctx, serviceName, password, s.ConfigPath); err != nil {
return fmt.Errorf("failed to connect to repository: %w", err)
}
}
return nil
}
// parseServiceName splits a service name in the format "group.index" into its components
func parseServiceName(serviceName string) (string, string, error) {
parts := strings.SplitN(serviceName, ".", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("service name must be in format 'group.index', got '%s'", serviceName)
}
return parts[0], parts[1], nil
}
// parseHumanSize parses a human-readable size string (e.g., "32.8 MB") into bytes
func parseHumanSize(sizeStr string) (int64, error) {
parts := strings.Fields(sizeStr)
if len(parts) != 2 {
return 0, fmt.Errorf("invalid size format: %s", sizeStr)
}
value, err := strconv.ParseFloat(parts[0], 64)
if err != nil {
return 0, fmt.Errorf("invalid size value: %w", err)
}
unit := strings.ToUpper(parts[1])
switch unit {
case "B":
return int64(value), nil
case "KB", "KIB":
return int64(value * 1024), nil
case "MB", "MIB":
return int64(value * 1024 * 1024), nil
case "GB", "GIB":
return int64(value * 1024 * 1024 * 1024), nil
case "TB", "TIB":
return int64(value * 1024 * 1024 * 1024 * 1024), nil
default:
return 0, fmt.Errorf("unknown size unit: %s", unit)
}
}
// cleanupReadCloser is a wrapper around io.ReadCloser that performs cleanup when closed
type cleanupReadCloser struct {
io.ReadCloser
cleanup func()
}
// Close closes the underlying ReadCloser and performs cleanup
func (c *cleanupReadCloser) Close() error {
err := c.ReadCloser.Close()
c.cleanup()
return err
}