reader/cloudtrail: improve sync process #698

Status: Merged (1 commit, Aug 8, 2018)
reader/cloudtrail/cloudtrail.go: 193 changes (101 additions, 92 deletions)
@@ -98,9 +98,9 @@ func (r *Reader) Reset() (err error) {
 }
 
 func (r *Reader) Close() error {
-    log.Debugf("runner[%v] syncMgr.stopSync...", r.Meta.RunnerName)
+    log.Infof("Runner[%v] syncMgr.stopSync...", r.Meta.RunnerName)
     r.syncMgr.stopSync()
-    log.Debugf("runner[%v] syncMgr closed, wait for BufReader closed...", r.Meta.RunnerName)
+    log.Infof("Runner[%v] syncMgr closed, wait for BufReader closed...", r.Meta.RunnerName)
     return r.BufReader.Close()
 }
 
@@ -185,10 +185,17 @@ func buildSyncOptions(conf conf.MapConf) (*syncOptions, error) {
     if opts.interval, err = time.ParseDuration(s); err != nil {
         return nil, invalidConfigError(reader.KeySyncInterval, s, err)
     }
+    if opts.interval.Nanoseconds() <= 0 {
+        opts.interval = 5 * time.Minute
+    }
 
     s, _ = conf.GetStringOr(reader.KeySyncConcurrent, "5")
     if opts.concurrent, err = strconv.Atoi(s); err != nil {
         return nil, invalidConfigError(reader.KeySyncInterval, s, err)
     }
+    if opts.concurrent <= 0 {
+        opts.concurrent = 5
+    }
 
     return &opts, nil
 }
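
The new guards fall back to defaults whenever the parsed values are non-positive. One thing worth flagging: the `strconv.Atoi` failure branch for the concurrent option still reports `reader.KeySyncInterval` rather than `reader.KeySyncConcurrent`, which looks like a copy-paste slip. A minimal, self-contained sketch of the same parse-then-fallback pattern with the key corrected (`parseConcurrent` and `defaultConcurrent` are illustrative names, not from the PR):

```go
package main

import (
	"fmt"
	"strconv"
)

const defaultConcurrent = 5 // mirrors the fallback value in buildSyncOptions

// parseConcurrent shows the pattern the PR adds: invalid input is an
// error, but a non-positive value silently falls back to the default.
func parseConcurrent(s string) (int, error) {
	n, err := strconv.Atoi(s)
	if err != nil {
		// report the concurrent key here, not the interval key
		return 0, fmt.Errorf("invalid sync_concurrent %q: %v", s, err)
	}
	if n <= 0 {
		n = defaultConcurrent
	}
	return n, nil
}

func main() {
	for _, s := range []string{"8", "0", "-3"} {
		n, _ := parseConcurrent(s)
		fmt.Println(s, "->", n) // 8 -> 8, 0 -> 5, -3 -> 5
	}
}
```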
@@ -231,24 +238,18 @@ func makeSyncSource(bucket, prefix string) string {
 func (mgr *syncManager) startSync() {
     ticker := time.NewTicker(mgr.interval)
     defer ticker.Stop()
-
-    if err := mgr.syncOnce(); err != nil {
-        log.Errorf("sync failed: %v", err)
-    }
-
-Sync:
     for {
+        if err := mgr.syncOnce(); err != nil {
+            log.Errorf("Runner[%v] daemon sync once failed: %v", mgr.meta.RunnerName, err)
+        }
+
         select {
-        case <-ticker.C:
-            if err := mgr.syncOnce(); err != nil {
-                log.Errorf("sync failed: %v", err)
-            }
         case <-mgr.quitChan:
-            break Sync
+            log.Infof("Runner[%v] daemon has stopped from running", mgr.meta.RunnerName)
+            return
+        case <-ticker.C:
         }
     }
-
-    log.Info("sync stopped working")
 }
 
 func (mgr *syncManager) syncOnce() error {
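
The reworked loop syncs once immediately on entry and then waits on either the ticker or the quit channel, instead of only syncing on ticks. A self-contained sketch of the same do-then-wait shape (the `runEvery` helper and its parameters are illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"time"
)

// runEvery runs work immediately, then again after every tick,
// until quit is closed, the same shape as the new startSync.
func runEvery(interval time.Duration, quit <-chan struct{}, work func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		work() // first run happens without waiting a full interval
		select {
		case <-quit:
			return
		case <-ticker.C:
		}
	}
}

func main() {
	quit := make(chan struct{})
	go runEvery(10*time.Millisecond, quit, func() { fmt.Println("sync") })
	time.Sleep(35 * time.Millisecond)
	close(quit)
}
```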
@@ -261,8 +262,7 @@ func (mgr *syncManager) syncOnce() error {
         concurrent: mgr.concurrent,
         region:     mgr.region,
     }
-    runner := newSyncRunner(ctx, mgr.quitChan)
-    return runner.Sync()
+    return newSyncRunner(ctx, mgr.quitChan).Sync()
 }
 
 func (mgr *syncManager) stopSync() {
@@ -317,32 +317,85 @@ func validTarget(target string) bool {
     return false
 }
 
+const maximumFlushSyncedFilesOneTime = 10
+
+func storeSyncedFiles(f *os.File, syncedFiles map[string]bool) error {
+    if len(syncedFiles) <= 0 {
+        return nil
+    }
+
+    for path := range syncedFiles {
+        f.WriteString(filepath.Base(path))
+        f.WriteString("\n")
+    }
+
+    return f.Sync()
+}
+
+// Note: not thread-safe; the caller must ensure invocations are serialized
 func (s *syncRunner) syncToDir() error {
-    log.Info("syncing from s3...")
+    log.Infof("Runner[%v] syncing from s3...", s.meta.RunnerName)
+
+    metastore, err := os.OpenFile(s.metastore, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
+    if err != nil {
+        return fmt.Errorf("open metastore: %v", err)
+    }
+    defer metastore.Close()
 
     s3url := newS3Url(s.source)
     bucket, err := lookupBucket(s3url.Bucket(), s.auth, s.region)
     if err != nil {
-        return err
+        return fmt.Errorf("lookup bucket: %v", err)
     }
 
     sourceFiles := make(map[string]bool)
    sourceFiles, err = loadS3Files(bucket, s3url.Path(), sourceFiles, "")
     if err != nil {
-        return err
+        return fmt.Errorf("load s3 files: %v", err)
     }
     if s.syncedFiles == nil {
         s.syncedFiles, err = s.loadSyncedFiles()
         if err != nil {
-            return err
+            return fmt.Errorf("load synced files: %v", err)
         }
     }
 
-    err = s.concurrentSyncToDir(s3url, bucket, sourceFiles)
-    if err != nil {
-        return err
-    }
-    return s.storeSyncedFiles(sourceFiles)
+    syncedChan := make(chan string, s.concurrent)
+    doneChan := make(chan struct{})
+    go func() {
+        syncedFiles := make(map[string]bool, maximumFlushSyncedFilesOneTime)
+        for file := range syncedChan {
+            syncedFiles[file] = true
+
+            if len(syncedFiles) < maximumFlushSyncedFilesOneTime {
+                continue
+            }
+
+            if err := storeSyncedFiles(metastore, syncedFiles); err != nil {
+                log.Errorf("Runner[%v] failed to write synced files to %q: %v", s.meta.RunnerName, s.metastore, err)
+            } else {
+                log.Infof("Runner[%v] wrote %d synced files to %q", s.meta.RunnerName, len(syncedFiles), s.metastore)
+
+                // Note: if a sync fails, some file names may be written to the metastore more
+                // than once, but that is preferable to losing the synced records and
+                // reprocessing the same data all over again
+                syncedFiles = make(map[string]bool, maximumFlushSyncedFilesOneTime)
+            }
+        }
+
+        if err := storeSyncedFiles(metastore, syncedFiles); err != nil {
+            log.Errorf("Runner[%v] failed to write synced files to %q: %v", s.meta.RunnerName, s.metastore, err)
+        } else {
+            log.Infof("Runner[%v] wrote %d synced files to %q", s.meta.RunnerName, len(syncedFiles), s.metastore)
+        }
+
+        doneChan <- struct{}{}
+    }()
+
+    s.concurrentSyncToDir(syncedChan, s3url, bucket, sourceFiles)
+    close(syncedChan)
+
+    <-doneChan
+    log.Infof("Runner[%v] daemon has finished syncing", s.meta.RunnerName)
+    return nil
 }
 
 type s3Url struct {
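
The rewritten `syncToDir` splits the work into a producer/consumer pair: `concurrentSyncToDir` sends each successfully synced key into `syncedChan`, while a single goroutine batches names and flushes them to the metastore every `maximumFlushSyncedFilesOneTime` entries, with a final flush once the channel closes. A stripped-down sketch of that batching consumer, using an in-memory builder instead of the metastore file (`flushBatch` and `batchSize` are illustrative stand-ins for `storeSyncedFiles` and `maximumFlushSyncedFilesOneTime`):

```go
package main

import (
	"fmt"
	"strings"
)

const batchSize = 3 // stands in for maximumFlushSyncedFilesOneTime

// flushBatch stands in for storeSyncedFiles: it persists one batch of names.
func flushBatch(w *strings.Builder, batch map[string]bool) {
	for name := range batch {
		w.WriteString(name + "\n")
	}
}

func main() {
	synced := make(chan string)
	done := make(chan struct{})
	var store strings.Builder

	go func() {
		batch := make(map[string]bool, batchSize)
		for name := range synced {
			batch[name] = true
			if len(batch) < batchSize {
				continue
			}
			flushBatch(&store, batch)
			batch = make(map[string]bool, batchSize) // start a fresh batch
		}
		flushBatch(&store, batch) // final partial batch after channel close
		done <- struct{}{}
	}()

	for _, f := range []string{"a.gz", "b.gz", "c.gz", "d.gz"} {
		synced <- f
	}
	close(synced)
	<-done
	fmt.Print(store.String())
}
```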
@@ -445,51 +498,29 @@ func (s *syncRunner) loadSyncedFiles() (map[string]bool, error) {
     return files, nil
 }
 
-func (s *syncRunner) storeSyncedFiles(files map[string]bool) error {
-    if len(files) <= 0 {
-        return nil
-    }
-
-    f, err := os.OpenFile(s.metastore, os.O_WRONLY|os.O_APPEND, 0644)
-    if err != nil {
-        return err
-    }
-    defer f.Close()
-
-    w := bufio.NewWriter(f)
-    for path := range files {
-        w.WriteString(filepath.Base(path))
-        w.WriteByte('\n')
-    }
-
-    log.Infof("write %d synced files to %q", len(files), s.metastore)
-
-    return w.Flush()
-}
-
 func relativePath(path string, filePath string) string {
     if path == "." {
         return strings.TrimPrefix(filePath, "/")
     }
     return strings.TrimPrefix(strings.TrimPrefix(filePath, path), "/")
 }
 
-func (s *syncRunner) concurrentSyncToDir(s3url s3Url, bucket *s3.Bucket, sourceFiles map[string]bool) error {
-    doneChan := newDoneChan(s.concurrent)
+// concurrentSyncToDir fetches files in the bucket concurrently and reports the files actually synced in this round
+func (s *syncRunner) concurrentSyncToDir(syncedChan chan string, s3url s3Url, bucket *s3.Bucket, sourceFiles map[string]bool) {
     pool := newPool(s.concurrent)
 
     var wg sync.WaitGroup
 
+DONE:
     for s3file := range sourceFiles {
         select {
         case <-s.quitChan:
-            return nil
+            log.Warnf("Runner[%v] daemon has stopped, task is interrupted", s.meta.RunnerName)
+            break DONE
         default:
         }
 
-        //directories are not synced
+        // directories are not synced
         if strings.HasSuffix(s3file, string(os.PathSeparator)) {
             delete(sourceFiles, s3file)
             continue
         }
         basename := filepath.Base(s3file)
@@ -499,38 +530,31 @@ func (s *syncRunner) concurrentSyncToDir(s3url s3Url, bucket *s3.Bucket, sourceF
             if filepath.Dir(filePath) != "." {
                 err := os.MkdirAll(filepath.Dir(filePath), 0755)
                 if err != nil {
-                    return err
+                    log.Errorf("Runner[%v] create local directory %q failed: %v", s.meta.RunnerName, filepath.Dir(filePath), err)
+                    continue
                 }
             }
             <-pool
             s.syncedFiles[basename] = true
 
-            log.Debugf("starting sync: s3://%s/%s -> %s", bucket.Name, s3file, filePath)
+            log.Debugf("Runner[%v] start syncing: s3://%s/%s -> %s", s.meta.RunnerName, bucket.Name, s3file, filePath)
 
             wg.Add(1)
-            go func(doneChan chan error, filePath string, bucket *s3.Bucket, s3file string) {
+            go func(filePath string, bucket *s3.Bucket, s3file string) {
                 defer wg.Done()
-                syncSingleFile(doneChan, filePath, bucket, s3file)
-                pool <- 1
-            }(doneChan, filePath, bucket, s3file)
+                if err := writeFile(filePath, bucket, s3file); err != nil {
+                    log.Errorf("Runner[%v] write file %q to local failed: %v", s.meta.RunnerName, s3file, err)
+                    return
+                }
+                syncedChan <- s3file
+                log.Debugf("Runner[%v] sync completed: s3://%s/%s -> %s", s.meta.RunnerName, bucket.Name, s3file, filePath)
+                pool <- struct{}{}
+            }(filePath, bucket, s3file)
         } else {
             delete(sourceFiles, s3file)
-            log.Debugf("%s already synced, skip it...", unzipPath)
+            log.Debugf("Runner[%v] %q already synced, skipped this time", s.meta.RunnerName, unzipPath)
         }
     }
     wg.Wait()
-
-    log.Info("sync done in this round")
-    return nil
 }
 
-func syncSingleFile(doneChan chan error, filePath string, bucket *s3.Bucket, file string) {
-    err := writeFile(filePath, bucket, file)
-    if err != nil {
-        doneChan <- err
-    }
-    log.Debugf("sync completed: s3://%s/%s -> %s", bucket.Name, file, filePath)
-    doneChan <- nil
-}
-
 func writeToFile(zipf *zip.File, filename string) error {
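
One subtlety in the new goroutine body: the pool token is returned only on the success path, so an early `return` after a failed `writeFile` permanently consumes a slot in the semaphore and later iterations can block on `<-pool`. A defensive variant releases the token with `defer` regardless of outcome; a minimal sketch of that channel-semaphore pattern (the `fetch` function is a hypothetical stand-in for `writeFile`):

```go
package main

import (
	"fmt"
	"sync"
)

// fetch is a stand-in for the real writeFile(filePath, bucket, s3file).
func fetch(name string) error {
	if name == "bad" {
		return fmt.Errorf("simulated failure")
	}
	return nil
}

func main() {
	const concurrent = 2
	pool := make(chan struct{}, concurrent) // channel-based semaphore
	for i := 0; i < concurrent; i++ {
		pool <- struct{}{}
	}

	var wg sync.WaitGroup
	for _, f := range []string{"a", "bad", "b", "c"} {
		<-pool // acquire a slot before spawning
		wg.Add(1)
		go func(f string) {
			defer wg.Done()
			defer func() { pool <- struct{}{} }() // release even on failure
			if err := fetch(f); err != nil {
				fmt.Println("sync failed:", f, err)
				return
			}
			fmt.Println("synced:", f)
		}(f)
	}
	wg.Wait()
}
```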
@@ -574,25 +598,10 @@ func writeFile(filename string, bucket *s3.Bucket, path string) error {
     return ioutil.WriteFile(filename, data, os.FileMode(0644))
 }
 
-func newPool(concurrent int) chan int {
-    pool := make(chan int, concurrent)
+func newPool(concurrent int) chan struct{} {
+    pool := make(chan struct{}, concurrent)
     for x := 0; x < concurrent; x++ {
-        pool <- 1
+        pool <- struct{}{}
    }
     return pool
 }
-
-func newDoneChan(concurrent int) chan error {
-    doneChan := make(chan error, concurrent)
-    go func() {
-        for {
-            select {
-            case err := <-doneChan:
-                if err != nil {
-                    log.Error(err)
-                }
-            }
-        }
-    }()
-    return doneChan
-}
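
Switching the pool from `chan int` to `chan struct{}` makes the intent explicit: the values are pure tokens, and `struct{}` occupies zero bytes. Dropping `newDoneChan` also removes a goroutine that drained errors in an endless loop and was never shut down. A quick illustration of the zero-size property:

```go
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	// struct{} carries no data, so semaphore tokens cost nothing
	fmt.Println(unsafe.Sizeof(struct{}{})) // 0
	fmt.Println(unsafe.Sizeof(int(0)))     // 8 on 64-bit platforms
}
```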