esync

Directory watching and remote syncing
Log | Files | Refs | README | LICENSE

sync.go (14310B)


      1 package cmd
      2 
      3 import (
      4 	"context"
      5 	"fmt"
      6 	"os"
      7 	"os/signal"
      8 	"path/filepath"
      9 	"regexp"
     10 	"strings"
     11 	"sync"
     12 	"syscall"
     13 	"time"
     14 
     15 	tea "github.com/charmbracelet/bubbletea"
     16 	"github.com/spf13/cobra"
     17 
     18 	"github.com/louloulibs/esync/internal/config"
     19 	"github.com/louloulibs/esync/internal/logger"
     20 	"github.com/louloulibs/esync/internal/syncer"
     21 	"github.com/louloulibs/esync/internal/tui"
     22 	"github.com/louloulibs/esync/internal/watcher"
     23 )
     24 
     25 // ---------------------------------------------------------------------------
     26 // Flags
     27 // ---------------------------------------------------------------------------
     28 
     29 var (
     30 	localPath   string
     31 	remotePath  string
     32 	daemon      bool
     33 	dryRun      bool
     34 	initialSync bool
     35 	verbose     bool
     36 )
     37 
     38 // ---------------------------------------------------------------------------
     39 // Command
     40 // ---------------------------------------------------------------------------
     41 
     42 var syncCmd = &cobra.Command{
     43 	Use:   "sync",
     44 	Short: "Watch and sync files to a remote destination",
     45 	Long:  "Watch a local directory for changes and automatically sync them to a remote destination using rsync.",
     46 	RunE:  runSync,
     47 }
     48 
     49 func init() {
     50 	syncCmd.Flags().StringVarP(&localPath, "local", "l", "", "local path to watch")
     51 	syncCmd.Flags().StringVarP(&remotePath, "remote", "r", "", "remote destination path")
     52 	syncCmd.Flags().BoolVar(&daemon, "daemon", false, "run in daemon mode (no TUI)")
     53 	syncCmd.Flags().BoolVar(&dryRun, "dry-run", false, "show what would be synced without syncing")
     54 	syncCmd.Flags().BoolVar(&initialSync, "initial-sync", false, "force a full sync on startup")
     55 	syncCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "verbose output")
     56 
     57 	rootCmd.AddCommand(syncCmd)
     58 }
     59 
     60 // reProgress2 matches the percentage in rsync --info=progress2 output lines.
     61 var reProgress2 = regexp.MustCompile(`(\d+)%`)
     62 
     63 // ---------------------------------------------------------------------------
     64 // Config loading
     65 // ---------------------------------------------------------------------------
     66 
     67 // loadOrBuildConfig resolves configuration from CLI flags, a config file, or
     68 // builds a minimal config in memory when --local and --remote are both given.
     69 func loadOrBuildConfig() (*config.Config, error) {
     70 	// 1. Explicit config file via -c flag
     71 	if cfgFile != "" {
     72 		cfg, err := config.Load(cfgFile)
     73 		if err != nil {
     74 			return nil, fmt.Errorf("loading config %s: %w", cfgFile, err)
     75 		}
     76 		applyCLIOverrides(cfg)
     77 		return cfg, nil
     78 	}
     79 
     80 	// 2. Quick mode: both --local and --remote provided
     81 	if localPath != "" && remotePath != "" {
     82 		cfg := &config.Config{
     83 			Sync: config.SyncSection{
     84 				Local:    localPath,
     85 				Remote:   remotePath,
     86 				Interval: 1,
     87 			},
     88 			Settings: config.Settings{
     89 				WatcherDebounce: 500,
     90 				InitialSync:     initialSync,
     91 				Ignore:          []string{".git", "node_modules", ".DS_Store"},
     92 				Rsync: config.RsyncSettings{
     93 					Archive:  true,
     94 					Compress: true,
     95 				},
     96 			},
     97 		}
     98 		return cfg, nil
     99 	}
    100 
    101 	// 3. Auto-detect config file
    102 	path := config.FindConfigFile()
    103 	if path == "" {
    104 		return nil, fmt.Errorf("no config file found; use -c, or provide both -l and -r")
    105 	}
    106 
    107 	cfg, err := config.Load(path)
    108 	if err != nil {
    109 		return nil, fmt.Errorf("loading config %s: %w", path, err)
    110 	}
    111 	applyCLIOverrides(cfg)
    112 	return cfg, nil
    113 }
    114 
    115 // applyCLIOverrides applies command-line flag values onto a loaded config.
    116 func applyCLIOverrides(cfg *config.Config) {
    117 	if localPath != "" {
    118 		cfg.Sync.Local = localPath
    119 	}
    120 	if remotePath != "" {
    121 		cfg.Sync.Remote = remotePath
    122 	}
    123 	if initialSync {
    124 		cfg.Settings.InitialSync = true
    125 	}
    126 }
    127 
    128 // ---------------------------------------------------------------------------
    129 // Run entry point
    130 // ---------------------------------------------------------------------------
    131 
    132 func runSync(cmd *cobra.Command, args []string) error {
    133 	cfg, err := loadOrBuildConfig()
    134 	if err != nil {
    135 		return err
    136 	}
    137 
    138 	s := syncer.New(cfg)
    139 	s.DryRun = dryRun
    140 
    141 	// Optional initial sync
    142 	if cfg.Settings.InitialSync {
    143 		if verbose {
    144 			fmt.Println("Running initial sync...")
    145 		}
    146 		result, err := s.Run()
    147 		if err != nil {
    148 			fmt.Fprintf(os.Stderr, "Initial sync error: %s\n", result.ErrorMessage)
    149 		} else if verbose {
    150 			fmt.Printf("Initial sync complete: %d files, %s\n", result.FilesCount, formatSize(result.BytesTotal))
    151 		}
    152 	}
    153 
    154 	if daemon {
    155 		return runDaemon(cfg, s)
    156 	}
    157 	return runTUI(cfg, s)
    158 }
    159 
    160 // ---------------------------------------------------------------------------
    161 // TUI mode
    162 // ---------------------------------------------------------------------------
    163 
    164 // watchState holds the watcher and syncer that can be torn down and rebuilt.
    165 type watchState struct {
    166 	watcher  *watcher.Watcher
    167 	cancel   context.CancelFunc
    168 	inflight sync.WaitGroup
    169 }
    170 
    171 // startWatching creates a syncer, watcher, and sync handler from the given config.
    172 func startWatching(cfg *config.Config, syncCh chan<- tui.SyncEvent, logCh chan<- tui.LogEntry) (*watchState, error) {
    173 	ctx, cancel := context.WithCancel(context.Background())
    174 
    175 	s := syncer.New(cfg)
    176 	s.DryRun = dryRun
    177 
    178 	ws := &watchState{cancel: cancel}
    179 
    180 	handler := func() {
    181 		ws.inflight.Add(1)
    182 		defer ws.inflight.Done()
    183 
    184 		syncCh <- tui.SyncEvent{Status: "status:syncing"}
    185 
    186 		var lastPct string
    187 		onLine := func(line string) {
    188 			trimmed := strings.TrimSpace(line)
    189 			if trimmed == "" {
    190 				return
    191 			}
    192 			select {
    193 			case logCh <- tui.LogEntry{Time: time.Now(), Level: "INF", Message: trimmed}:
    194 			default:
    195 			}
    196 			if m := reProgress2.FindStringSubmatch(trimmed); len(m) > 1 {
    197 				pct := m[1]
    198 				if pct != lastPct {
    199 					lastPct = pct
    200 					select {
    201 					case syncCh <- tui.SyncEvent{Status: "status:syncing " + pct + "%"}:
    202 					default:
    203 					}
    204 				}
    205 			}
    206 		}
    207 
    208 		result, err := s.RunWithProgress(ctx, onLine)
    209 		now := time.Now()
    210 
    211 		if err != nil {
    212 			syncCh <- tui.SyncEvent{
    213 				File:   "sync error",
    214 				Status: "error",
    215 				Time:   now,
    216 			}
    217 			syncCh <- tui.SyncEvent{Status: "status:watching"}
    218 			return
    219 		}
    220 
    221 		groups := groupFilesByTopLevel(result.Files)
    222 
    223 		totalGroupBytes := int64(0)
    224 		totalGroupFiles := 0
    225 		for _, g := range groups {
    226 			totalGroupBytes += g.bytes
    227 			totalGroupFiles += g.count
    228 		}
    229 
    230 		for _, g := range groups {
    231 			file := g.name
    232 			bytes := g.bytes
    233 			if totalGroupBytes == 0 && result.BytesTotal > 0 && totalGroupFiles > 0 {
    234 				bytes = result.BytesTotal * int64(g.count) / int64(totalGroupFiles)
    235 			}
    236 			size := formatSize(bytes)
    237 			syncCh <- tui.SyncEvent{
    238 				File:      file,
    239 				Size:      size,
    240 				Duration:  result.Duration,
    241 				Status:    "synced",
    242 				Time:      now,
    243 				Files:     truncateFiles(g.files, 10),
    244 				FileCount: g.count,
    245 			}
    246 		}
    247 
    248 		if len(groups) == 0 && result.FilesCount > 0 {
    249 			syncCh <- tui.SyncEvent{
    250 				File:     fmt.Sprintf("%d files", result.FilesCount),
    251 				Size:     formatSize(result.BytesTotal),
    252 				Duration: result.Duration,
    253 				Status:   "synced",
    254 				Time:     now,
    255 			}
    256 		}
    257 
    258 		syncCh <- tui.SyncEvent{Status: "status:watching"}
    259 	}
    260 
    261 	w, err := watcher.New(
    262 		cfg.Sync.Local,
    263 		cfg.Settings.WatcherDebounce,
    264 		cfg.AllIgnorePatterns(),
    265 		cfg.Settings.Include,
    266 		handler,
    267 	)
    268 	if err != nil {
    269 		cancel()
    270 		return nil, fmt.Errorf("creating watcher: %w", err)
    271 	}
    272 
    273 	if err := w.Start(); err != nil {
    274 		cancel()
    275 		return nil, fmt.Errorf("starting watcher: %w", err)
    276 	}
    277 
    278 	ws.watcher = w
    279 	return ws, nil
    280 }
    281 
    282 // reportBrokenSymlinks sends prominent warnings about broken symlinks
    283 // to the TUI log channel.
    284 func reportBrokenSymlinks(broken []watcher.BrokenSymlink, logCh chan<- tui.LogEntry) {
    285 	if len(broken) == 0 {
    286 		return
    287 	}
    288 	now := time.Now()
    289 	logCh <- tui.LogEntry{
    290 		Time:    now,
    291 		Level:   "WRN",
    292 		Message: fmt.Sprintf("Found %d broken symlink(s) — directories containing them are not watched:", len(broken)),
    293 	}
    294 	for _, b := range broken {
    295 		logCh <- tui.LogEntry{
    296 			Time:    now,
    297 			Level:   "WRN",
    298 			Message: fmt.Sprintf("  %s -> %s", b.Path, b.Target),
    299 		}
    300 	}
    301 }
    302 
    303 func runTUI(cfg *config.Config, s *syncer.Syncer) error {
    304 	app := tui.NewApp(cfg.Sync.Local, cfg.Sync.Remote)
    305 	syncCh := app.SyncEventChan()
    306 	logCh := app.LogEntryChan()
    307 
    308 	ws, err := startWatching(cfg, syncCh, logCh)
    309 	if err != nil {
    310 		return err
    311 	}
    312 	reportBrokenSymlinks(ws.watcher.BrokenSymlinks, logCh)
    313 	app.SetWarnings(len(ws.watcher.BrokenSymlinks))
    314 
    315 	var wsMu sync.Mutex
    316 
    317 	// Handle resync requests
    318 	resyncCh := app.ResyncChan()
    319 	go func() {
    320 		for range resyncCh {
    321 			wsMu.Lock()
    322 			w := ws
    323 			wsMu.Unlock()
    324 			w.watcher.TriggerSync()
    325 		}
    326 	}()
    327 
    328 	p := tea.NewProgram(app, tea.WithAltScreen())
    329 
    330 	// Handle config reload
    331 	configCh := app.ConfigReloadChan()
    332 	go func() {
    333 		for newCfg := range configCh {
    334 			wsMu.Lock()
    335 			oldWs := ws
    336 			wsMu.Unlock()
    337 
    338 			// Tear down: stop watcher, wait for in-flight syncs
    339 			oldWs.watcher.Stop()
    340 			oldWs.inflight.Wait()
    341 			oldWs.cancel()
    342 
    343 			// Rebuild with new config
    344 			newWs, err := startWatching(newCfg, syncCh, logCh)
    345 			if err != nil {
    346 				select {
    347 				case syncCh <- tui.SyncEvent{Status: "status:error"}:
    348 				default:
    349 				}
    350 				continue
    351 			}
    352 
    353 			wsMu.Lock()
    354 			ws = newWs
    355 			wsMu.Unlock()
    356 
    357 			// Update paths via Bubbletea message (safe — goes through Update loop)
    358 			p.Send(tui.ConfigReloadedMsg{
    359 				Local:  newCfg.Sync.Local,
    360 				Remote: newCfg.Sync.Remote,
    361 			})
    362 
    363 			select {
    364 			case syncCh <- tui.SyncEvent{Status: "status:watching"}:
    365 			default:
    366 			}
    367 		}
    368 	}()
    369 
    370 	if _, err := p.Run(); err != nil {
    371 		wsMu.Lock()
    372 		w := ws
    373 		wsMu.Unlock()
    374 		w.watcher.Stop()
    375 		w.cancel()
    376 		return fmt.Errorf("TUI error: %w", err)
    377 	}
    378 
    379 	wsMu.Lock()
    380 	w := ws
    381 	wsMu.Unlock()
    382 	w.watcher.Stop()
    383 	w.cancel()
    384 	return nil
    385 }
    386 
    387 // ---------------------------------------------------------------------------
    388 // Daemon mode
    389 // ---------------------------------------------------------------------------
    390 
    391 func runDaemon(cfg *config.Config, s *syncer.Syncer) error {
    392 	// Write PID file so `esync status` can find us
    393 	pidPath := filepath.Join(os.TempDir(), "esync.pid")
    394 	os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", os.Getpid())), 0644)
    395 	defer os.Remove(pidPath)
    396 
    397 	var log *logger.Logger
    398 	if cfg.Settings.Log.File != "" {
    399 		var err error
    400 		log, err = logger.New(cfg.Settings.Log.File, cfg.Settings.Log.Format)
    401 		if err != nil {
    402 			return fmt.Errorf("creating logger: %w", err)
    403 		}
    404 		defer log.Close()
    405 	}
    406 
    407 	fmt.Printf("esync daemon started (PID %d)\n", os.Getpid())
    408 	fmt.Printf("Watching: %s -> %s\n", cfg.Sync.Local, cfg.Sync.Remote)
    409 
    410 	if log != nil {
    411 		log.Info("started", map[string]interface{}{
    412 			"local":  cfg.Sync.Local,
    413 			"remote": cfg.Sync.Remote,
    414 			"pid":    os.Getpid(),
    415 		})
    416 	}
    417 
    418 	handler := func() {
    419 		result, err := s.Run()
    420 
    421 		if err != nil {
    422 			msg := result.ErrorMessage
    423 			if verbose {
    424 				fmt.Fprintf(os.Stderr, "Sync error: %s\n", msg)
    425 			}
    426 			if log != nil {
    427 				log.Error("sync_failed", map[string]interface{}{
    428 					"error": msg,
    429 				})
    430 			}
    431 			// Terminal bell on error
    432 			fmt.Print("\a")
    433 			return
    434 		}
    435 
    436 		if verbose {
    437 			fmt.Printf("Synced %d files (%s) in %s\n",
    438 				result.FilesCount,
    439 				formatSize(result.BytesTotal),
    440 				result.Duration.Truncate(time.Millisecond),
    441 			)
    442 		}
    443 		if log != nil {
    444 			log.Info("sync_complete", map[string]interface{}{
    445 				"files":    result.FilesCount,
    446 				"bytes":    result.BytesTotal,
    447 				"duration": result.Duration.String(),
    448 			})
    449 		}
    450 	}
    451 
    452 	w, err := watcher.New(
    453 		cfg.Sync.Local,
    454 		cfg.Settings.WatcherDebounce,
    455 		cfg.AllIgnorePatterns(),
    456 		cfg.Settings.Include,
    457 		handler,
    458 	)
    459 	if err != nil {
    460 		return fmt.Errorf("creating watcher: %w", err)
    461 	}
    462 
    463 	if err := w.Start(); err != nil {
    464 		return fmt.Errorf("starting watcher: %w", err)
    465 	}
    466 	defer w.Stop()
    467 
    468 	if len(w.BrokenSymlinks) > 0 {
    469 		fmt.Fprintf(os.Stderr, "Warning: %d broken symlink(s) found — directories containing them are not watched:\n", len(w.BrokenSymlinks))
    470 		for _, b := range w.BrokenSymlinks {
    471 			fmt.Fprintf(os.Stderr, "  %s -> %s\n", b.Path, b.Target)
    472 		}
    473 	}
    474 
    475 	// Block until SIGINT or SIGTERM
    476 	sigCh := make(chan os.Signal, 1)
    477 	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    478 	<-sigCh
    479 
    480 	if log != nil {
    481 		log.Info("stopping", nil)
    482 	}
    483 	fmt.Println("\nesync daemon stopped.")
    484 	return nil
    485 }
    486 
    487 // ---------------------------------------------------------------------------
    488 // Helpers
    489 // ---------------------------------------------------------------------------
    490 
    491 // formatSize converts a byte count to a human-readable string (B, KB, MB, GB).
    492 func formatSize(bytes int64) string {
    493 	switch {
    494 	case bytes >= 1<<30:
    495 		return fmt.Sprintf("%.1fGB", float64(bytes)/float64(1<<30))
    496 	case bytes >= 1<<20:
    497 		return fmt.Sprintf("%.1fMB", float64(bytes)/float64(1<<20))
    498 	case bytes >= 1<<10:
    499 		return fmt.Sprintf("%.1fKB", float64(bytes)/float64(1<<10))
    500 	default:
    501 		return fmt.Sprintf("%dB", bytes)
    502 	}
    503 }
    504 
    505 // groupedEvent represents a top-level directory or root file for the TUI.
    506 type groupedEvent struct {
    507 	name  string   // "cmd/" or "main.go"
    508 	count int      // number of files (1 for root files)
    509 	bytes int64    // total bytes
    510 	files []string // individual file paths within the group
    511 }
    512 
    513 // groupFilesByTopLevel collapses file entries into top-level directories
    514 // and root files. "cmd/sync.go" + "cmd/init.go" become one entry "cmd/" with count=2.
    515 // When a directory contains only one file, the full relative path is kept.
    516 func groupFilesByTopLevel(files []syncer.FileEntry) []groupedEvent {
    517 	dirMap := make(map[string]*groupedEvent)
    518 	// Track the original filename for single-file groups.
    519 	dirFirstFile := make(map[string]string)
    520 	var rootFiles []groupedEvent
    521 	var dirOrder []string
    522 
    523 	for _, f := range files {
    524 		parts := strings.SplitN(f.Name, "/", 2)
    525 		if len(parts) == 1 {
    526 			// Root-level file
    527 			rootFiles = append(rootFiles, groupedEvent{
    528 				name:  f.Name,
    529 				count: 1,
    530 				bytes: f.Bytes,
    531 			})
    532 		} else {
    533 			dir := parts[0] + "/"
    534 			if g, ok := dirMap[dir]; ok {
    535 				g.count++
    536 				g.bytes += f.Bytes
    537 				g.files = append(g.files, f.Name)
    538 			} else {
    539 				dirMap[dir] = &groupedEvent{
    540 					name:  dir,
    541 					count: 1,
    542 					bytes: f.Bytes,
    543 					files: []string{f.Name},
    544 				}
    545 				dirFirstFile[dir] = f.Name
    546 				dirOrder = append(dirOrder, dir)
    547 			}
    548 		}
    549 	}
    550 
    551 	var out []groupedEvent
    552 	for _, dir := range dirOrder {
    553 		g := *dirMap[dir]
    554 		if g.count == 1 {
    555 			g.name = dirFirstFile[dir]
    556 			g.files = nil
    557 		}
    558 		out = append(out, g)
    559 	}
    560 	out = append(out, rootFiles...)
    561 	return out
    562 }
    563 
    564 // truncateFiles returns at most n elements from files.
    565 func truncateFiles(files []string, n int) []string {
    566 	if len(files) <= n {
    567 		return files
    568 	}
    569 	return files[:n]
    570 }