Skip to content

Commit

Permalink
Add S3 downloader, gunzip
Browse files Browse the repository at this point in the history
  • Loading branch information
bahlo committed May 10, 2022
1 parent 94ede33 commit 2b0d802
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
package main

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"log"
"os"
"sync"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/axiomhq/axiom-cloudfront-lambda/parser"
"github.com/axiomhq/axiom-go/axiom"
"github.com/axiomhq/pkg/cmd"
"go.uber.org/zap"
)

var (
downloader *s3manager.Downloader
downloaderOnce sync.Once
)

func main() {
downloaderOnce.Do(func() {
// TODO: Do we need credentials
sess := session.Must(session.NewSession())
downloader = s3manager.NewDownloader(sess)
})

cmd.Run("axiom-cloudwatch-lambda", run,
cmd.WithRequiredEnvVars("AXIOM_DATASET"),
cmd.WithValidateAxiomCredentials(),
Expand All @@ -38,21 +57,30 @@ func handler(client *axiom.Client, dataset string) func(context.Context, events.
continue
}

s3 := record.S3
// Download file
buf := aws.NewWriteAtBuffer([]byte{})
_, err := downloader.DownloadWithContext(ctx, buf, &s3.GetObjectInput{
Bucket: aws.String(record.S3.Bucket.Name),
Key: aws.String(record.S3.Object.Key),
})
if err != nil {
return fmt.Errorf("failed to download file: %w", err)
}
contents := bytes.NewBuffer(buf.Bytes()) // TODO: Can we stream directly into the reader?

// fetch logs from S3
file, err := os.Open(s3.Object.Key)
// Gunzip
gzipReader, err := gzip.NewReader(contents)
if err != nil {
return err
return fmt.Errorf("failed to gunzip file: %w", err)
}

// parse logs
events, err := parser.ParseCloudfrontLogs(file)
// Parse
events, err := parser.ParseCloudfrontLogs(gzipReader)
if err != nil {
return err
}

// send logs to Axiom
// Send logs to Axiom
if status, err := client.Datasets.IngestEvents(ctx, dataset, axiom.IngestOptions{TimestampField: "time"}, events...); err != nil {
return err
} else {
Expand Down

0 comments on commit 2b0d802

Please sign in to comment.