diff --git a/reader/cloudtrail/cloudtrail.go b/reader/cloudtrail/cloudtrail.go
index e71682015..ec98fcfac 100644
--- a/reader/cloudtrail/cloudtrail.go
+++ b/reader/cloudtrail/cloudtrail.go
@@ -98,9 +98,9 @@ func (r *Reader) Reset() (err error) {
 }
 
 func (r *Reader) Close() error {
-	log.Debugf("runner[%v] syncMgr.stopSync...", r.Meta.RunnerName)
+	log.Infof("Runner[%v] syncMgr.stopSync...", r.Meta.RunnerName)
 	r.syncMgr.stopSync()
-	log.Debugf("runner[%v] syncMgr closed, wait for BufReader closed...", r.Meta.RunnerName)
+	log.Infof("Runner[%v] syncMgr closed, waiting for BufReader to close...", r.Meta.RunnerName)
 	return r.BufReader.Close()
 }
 
@@ -185,10 +185,17 @@ func buildSyncOptions(conf conf.MapConf) (*syncOptions, error) {
 	if opts.interval, err = time.ParseDuration(s); err != nil {
 		return nil, invalidConfigError(reader.KeySyncInterval, s, err)
 	}
+	if opts.interval.Nanoseconds() <= 0 {
+		opts.interval = 5 * time.Minute
+	}
+
 	s, _ = conf.GetStringOr(reader.KeySyncConcurrent, "5")
 	if opts.concurrent, err = strconv.Atoi(s); err != nil {
-		return nil, invalidConfigError(reader.KeySyncInterval, s, err)
+		return nil, invalidConfigError(reader.KeySyncConcurrent, s, err)
 	}
+	if opts.concurrent <= 0 {
+		opts.concurrent = 5
+	}
 	return &opts, nil
 }
 
@@ -231,24 +238,18 @@ func makeSyncSource(bucket, prefix string) string {
 func (mgr *syncManager) startSync() {
 	ticker := time.NewTicker(mgr.interval)
 	defer ticker.Stop()
-
-	if err := mgr.syncOnce(); err != nil {
-		log.Errorf("sync failed: %v", err)
-	}
-
-Sync:
 	for {
+		if err := mgr.syncOnce(); err != nil {
+			log.Errorf("Runner[%v] daemon sync failed: %v", mgr.meta.RunnerName, err)
+		}
+
 		select {
-		case <-ticker.C:
-			if err := mgr.syncOnce(); err != nil {
-				log.Errorf("sync failed: %v", err)
-			}
 		case <-mgr.quitChan:
-			break Sync
+			log.Infof("Runner[%v] daemon has stopped running", mgr.meta.RunnerName)
+			return
+		case <-ticker.C:
 		}
 	}
-
-	log.Info("sync stopped working")
 }
 
 func (mgr *syncManager) syncOnce() error {
@@ -261,8 +262,7 @@ func (mgr *syncManager) syncOnce() error {
 		concurrent: mgr.concurrent,
 		region:     mgr.region,
 	}
-	runner := newSyncRunner(ctx, mgr.quitChan)
-	return runner.Sync()
+	return newSyncRunner(ctx, mgr.quitChan).Sync()
 }
 
 func (mgr *syncManager) stopSync() {
@@ -317,32 +317,86 @@ func validTarget(target string) bool {
 	return false
 }
 
+const maximumFlushSyncedFilesOneTime = 10
+
+func storeSyncedFiles(f *os.File, syncedFiles map[string]bool) error {
+	if len(syncedFiles) == 0 {
+		return nil
+	}
+
+	for path := range syncedFiles {
+		if _, err := f.WriteString(filepath.Base(path) + "\n"); err != nil {
+			return err
+		}
+	}
+
+	return f.Sync()
+}
+
+// Note: not thread-safe; the caller must ensure invocations are serialized.
 func (s *syncRunner) syncToDir() error {
-	log.Info("syncing from s3...")
+	log.Infof("Runner[%v] syncing from s3...", s.meta.RunnerName)
+
+	metastore, err := os.OpenFile(s.metastore, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
+	if err != nil {
+		return fmt.Errorf("open metastore: %v", err)
+	}
+	defer metastore.Close()
 
 	s3url := newS3Url(s.source)
 	bucket, err := lookupBucket(s3url.Bucket(), s.auth, s.region)
 	if err != nil {
-		return err
+		return fmt.Errorf("lookup bucket: %v", err)
 	}
 
 	sourceFiles := make(map[string]bool)
 	sourceFiles, err = loadS3Files(bucket, s3url.Path(), sourceFiles, "")
 	if err != nil {
-		return err
+		return fmt.Errorf("load s3 files: %v", err)
 	}
 
 	if s.syncedFiles == nil {
 		s.syncedFiles, err = s.loadSyncedFiles()
 		if err != nil {
-			return err
+			return fmt.Errorf("load synced files: %v", err)
 		}
 	}
 
-	err = s.concurrentSyncToDir(s3url, bucket, sourceFiles)
-	if err != nil {
-		return err
-	}
-	return s.storeSyncedFiles(sourceFiles)
+	syncedChan := make(chan string, s.concurrent)
+	doneChan := make(chan struct{})
+	go func() {
+		syncedFiles := make(map[string]bool, maximumFlushSyncedFilesOneTime)
+		for file := range syncedChan {
+			syncedFiles[file] = true
+
+			if len(syncedFiles) < maximumFlushSyncedFilesOneTime {
+				continue
+			}
+
+			if err := storeSyncedFiles(metastore, syncedFiles); err != nil {
+				log.Errorf("Runner[%v] failed to write synced files to %q: %v", s.meta.RunnerName, s.metastore, err)
+			} else {
+				log.Infof("Runner[%v] wrote %d synced files to %q", s.meta.RunnerName, len(syncedFiles), s.metastore)
+
+				// Note: if a later sync fails, some file names may be appended to the metastore twice,
+				// which is more reasonable than losing the synced records and reprocessing the same data.
+				syncedFiles = make(map[string]bool, maximumFlushSyncedFilesOneTime)
+			}
+		}
+
+		if err := storeSyncedFiles(metastore, syncedFiles); err != nil {
+			log.Errorf("Runner[%v] failed to write synced files to %q: %v", s.meta.RunnerName, s.metastore, err)
+		} else {
+			log.Infof("Runner[%v] wrote %d synced files to %q", s.meta.RunnerName, len(syncedFiles), s.metastore)
+		}
+
+		doneChan <- struct{}{}
+	}()
+
+	s.concurrentSyncToDir(syncedChan, s3url, bucket, sourceFiles)
+	close(syncedChan)
+
+	<-doneChan
+	log.Infof("Runner[%v] daemon has finished syncing", s.meta.RunnerName)
+	return nil
 }
 
 type s3Url struct {
@@ -445,28 +498,6 @@ func (s *syncRunner) loadSyncedFiles() (map[string]bool, error) {
 	return files, nil
 }
 
-func (s *syncRunner) storeSyncedFiles(files map[string]bool) error {
-	if len(files) <= 0 {
-		return nil
-	}
-
-	f, err := os.OpenFile(s.metastore, os.O_WRONLY|os.O_APPEND, 0644)
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-
-	w := bufio.NewWriter(f)
-	for path := range files {
-		w.WriteString(filepath.Base(path))
-		w.WriteByte('\n')
-	}
-
-	log.Infof("write %d synced files to %q", len(files), s.metastore)
-
-	return w.Flush()
-}
-
 func relativePath(path string, filePath string) string {
 	if path == "." {
 		return strings.TrimPrefix(filePath, "/")
@@ -474,22 +505,23 @@ func relativePath(path string, filePath string) string {
 	return strings.TrimPrefix(strings.TrimPrefix(filePath, path), "/")
 }
 
-func (s *syncRunner) concurrentSyncToDir(s3url s3Url, bucket *s3.Bucket, sourceFiles map[string]bool) error {
-	doneChan := newDoneChan(s.concurrent)
+// concurrentSyncToDir fetches files in the bucket concurrently and reports every file actually synced in this round on syncedChan
+func (s *syncRunner) concurrentSyncToDir(syncedChan chan string, s3url s3Url, bucket *s3.Bucket, sourceFiles map[string]bool) {
 	pool := newPool(s.concurrent)
 	var wg sync.WaitGroup
+
+DONE:
 	for s3file := range sourceFiles {
 		select {
 		case <-s.quitChan:
 			log.Warnf("Runner[%v] daemon has stopped, task is interrupted", s.meta.RunnerName)
-			return nil
+			break DONE
 		default:
 		}
 
-		//对于目录不同步
+		// directories themselves are not synced, skip them
 		if strings.HasSuffix(s3file, string(os.PathSeparator)) {
-			delete(sourceFiles, s3file)
 			continue
 		}
 		basename := filepath.Base(s3file)
@@ -499,38 +530,34 @@ func (s *syncRunner) concurrentSyncToDir(s3url s3Url, bucket *s3.Bucket, sourceF
 			if filepath.Dir(filePath) != "." {
 				err := os.MkdirAll(filepath.Dir(filePath), 0755)
 				if err != nil {
-					return err
+					log.Errorf("Runner[%v] failed to create local directory %q: %v", s.meta.RunnerName, filepath.Dir(filePath), err)
+					continue
 				}
 			}
 			<-pool
 			s.syncedFiles[basename] = true
-			log.Debugf("starting sync: s3://%s/%s -> %s", bucket.Name, s3file, filePath)
+			log.Debugf("Runner[%v] start syncing: s3://%s/%s -> %s", s.meta.RunnerName, bucket.Name, s3file, filePath)
 			wg.Add(1)
-			go func(doneChan chan error, filePath string, bucket *s3.Bucket, s3file string) {
+			go func(filePath string, bucket *s3.Bucket, s3file string) {
 				defer wg.Done()
+				// Return the worker slot even when the write fails, so the pool never leaks
+				defer func() {
+					pool <- struct{}{}
+				}()
-				syncSingleFile(doneChan, filePath, bucket, s3file)
-				pool <- 1
-			}(doneChan, filePath, bucket, s3file)
+				if err := writeFile(filePath, bucket, s3file); err != nil {
+					log.Errorf("Runner[%v] failed to write s3 file %q to local: %v", s.meta.RunnerName, s3file, err)
+					return
+				}
+				syncedChan <- s3file
+				log.Debugf("Runner[%v] sync completed: s3://%s/%s -> %s", s.meta.RunnerName, bucket.Name, s3file, filePath)
+			}(filePath, bucket, s3file)
 		} else {
-			delete(sourceFiles, s3file)
-			log.Debugf("%s already synced, skip it...", unzipPath)
+			log.Debugf("Runner[%v] %q already synced, skipped this time", s.meta.RunnerName, unzipPath)
 		}
 	}
 	wg.Wait()
-
-	log.Info("sync done in this round")
-	return nil
-}
-
-func syncSingleFile(doneChan chan error, filePath string, bucket *s3.Bucket, file string) {
-	err := writeFile(filePath, bucket, file)
-	if err != nil {
-		doneChan <- err
-	}
-	log.Debugf("sync completed: s3://%s/%s -> %s", bucket.Name, file, filePath)
-	doneChan <- nil
-}
 
 func writeToFile(zipf *zip.File, filename string) error {
@@ -574,25 +598,10 @@ func writeFile(filename string, bucket *s3.Bucket, path string) error {
 	return ioutil.WriteFile(filename, data, os.FileMode(0644))
 }
 
-func newPool(concurrent int) chan int {
-	pool := make(chan int, concurrent)
+func newPool(concurrent int) chan struct{} {
+	pool := make(chan struct{}, concurrent)
 	for x := 0; x < concurrent; x++ {
-		pool <- 1
+		pool <- struct{}{}
 	}
 	return pool
 }
-
-func newDoneChan(concurrent int) chan error {
-	doneChan := make(chan error, concurrent)
-	go func() {
-		for {
-			select {
-			case err := <-doneChan:
-				if err != nil {
-					log.Error(err)
-				}
-			}
-		}
-	}()
-	return doneChan
-}
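For reviewers, here is a minimal standalone sketch of the concurrency pattern this patch adopts: a token pool bounds worker concurrency, workers report finished items on a channel, and a single collector goroutine flushes them in batches so an interrupted run loses at most one unflushed batch. Names such as `task`, `flushBatch`, and `batchSize` are illustrative stand-ins, not identifiers from the patch; only the structure mirrors `concurrentSyncToDir` and the metastore flush goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

const batchSize = 10 // mirrors maximumFlushSyncedFilesOneTime

// flushBatch is a stand-in for storeSyncedFiles: persist the batch,
// e.g. append the file names to a metastore file and fsync.
func flushBatch(batch map[string]bool) {
	fmt.Printf("flushed %d items\n", len(batch))
}

func main() {
	concurrent := 5
	tasks := []string{"a.gz", "b.gz", "c.gz", "d.gz"}

	// Token pool: holding a token grants the right to run one worker.
	pool := make(chan struct{}, concurrent)
	for i := 0; i < concurrent; i++ {
		pool <- struct{}{}
	}

	syncedChan := make(chan string, concurrent)
	doneChan := make(chan struct{})

	// Collector: batch finished items and flush them periodically.
	go func() {
		batch := make(map[string]bool, batchSize)
		for file := range syncedChan {
			batch[file] = true
			if len(batch) >= batchSize {
				flushBatch(batch)
				batch = make(map[string]bool, batchSize)
			}
		}
		if len(batch) > 0 {
			flushBatch(batch) // flush the remainder once syncedChan is closed
		}
		close(doneChan)
	}()

	var wg sync.WaitGroup
	for _, task := range tasks {
		<-pool // block until a worker slot frees up
		wg.Add(1)
		go func(task string) {
			defer wg.Done()
			defer func() { pool <- struct{}{} }() // always return the slot
			// Stand-in for writeFile: download and unpack the object here.
			syncedChan <- task
		}(task)
	}
	wg.Wait()

	close(syncedChan) // lets the collector drain and exit
	<-doneChan
}
```

The defer that returns the pool token is the detail the patched goroutine needs as well: if the token is only sent back on the success path, every failed download permanently shrinks the worker pool until the loop deadlocks on `<-pool`.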