// forked from gsn/predictor
package grib
|
|
|
|
import (
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"golang.org/x/sync/errgroup"
)
|
|
|
|
// S3Downloader downloads GRIB files from AWS S3.
// It is configured for anonymous access (see NewS3Downloader), so the
// target bucket is assumed to be publicly readable.
type S3Downloader struct {
	Dir string // local directory downloaded files are written into
	Parallel int // maximum number of concurrent downloads in Run
	Bucket string // S3 bucket name
	Region string // AWS region the bucket lives in
	Client *s3.Client // S3 client built with anonymous credentials
}
|
|
|
|
// NewS3Downloader creates a new S3 downloader with anonymous access
|
|
func NewS3Downloader(dir string, parallel int, bucket, region string) (*S3Downloader, error) {
|
|
// Create AWS config with anonymous credentials for public bucket
|
|
cfg, err := config.LoadDefaultConfig(context.Background(),
|
|
config.WithRegion(region),
|
|
config.WithCredentialsProvider(aws.AnonymousCredentials{}),
|
|
)
|
|
if err != nil {
|
|
return nil, errcodes.Wrap(err, "failed to load AWS config")
|
|
}
|
|
|
|
client := s3.NewFromConfig(cfg)
|
|
|
|
return &S3Downloader{
|
|
Dir: dir,
|
|
Parallel: parallel,
|
|
Bucket: bucket,
|
|
Region: region,
|
|
Client: client,
|
|
}, nil
|
|
}
|
|
|
|
// s3Key generates the S3 key for a GRIB file
|
|
// Path format: gfs.YYYYMMDD/HH/atmos/gfs.tHHz.pgrb2.0p50.fFFF
|
|
func (d *S3Downloader) s3Key(run string, hour int, step int) string {
|
|
return fmt.Sprintf("gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", run, hour, hour, step)
|
|
}
|
|
|
|
// CheckFileExists checks if a file exists in S3 using HeadObject
|
|
func (d *S3Downloader) CheckFileExists(ctx context.Context, key string) (bool, int64, error) {
|
|
input := &s3.HeadObjectInput{
|
|
Bucket: aws.String(d.Bucket),
|
|
Key: aws.String(key),
|
|
}
|
|
|
|
result, err := d.Client.HeadObject(ctx, input)
|
|
if err != nil {
|
|
// Check if error is NotFound
|
|
// AWS SDK v2 doesn't export specific error types, check error string
|
|
if isNotFoundError(err) {
|
|
return false, 0, nil
|
|
}
|
|
return false, 0, errcodes.Wrap(err, "failed to check file existence")
|
|
}
|
|
|
|
size := int64(0)
|
|
if result.ContentLength != nil {
|
|
size = *result.ContentLength
|
|
}
|
|
|
|
return true, size, nil
|
|
}
|
|
|
|
// isNotFoundError checks if error is a NotFound error.
// AWS SDK v2 surfaces missing objects differently per operation
// (HeadObject: "NotFound"/"404", GetObject: "NoSuchKey"), so we match
// on the error text rather than a concrete error type.
func isNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "NotFound") ||
		strings.Contains(msg, "404") ||
		strings.Contains(msg, "NoSuchKey")
}
|
|
|
|
// contains reports whether substr occurs within s.
// Thin wrapper over the standard library's strings.Contains, kept so
// existing callers in this package remain unchanged.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
|
|
|
|
// findSubstring reports whether substr occurs anywhere in s by
// sliding a window of len(substr) over s and comparing each slice.
func findSubstring(s, substr string) bool {
	n := len(substr)
	for end := n; end <= len(s); end++ {
		if s[end-n:end] == substr {
			return true
		}
	}
	return false
}
|
|
|
|
// ListAvailableFiles lists all available files for a given run
|
|
func (d *S3Downloader) ListAvailableFiles(ctx context.Context, run string, hour int) ([]string, error) {
|
|
prefix := fmt.Sprintf("gfs.%s/%02d/atmos/", run, hour)
|
|
|
|
input := &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(d.Bucket),
|
|
Prefix: aws.String(prefix),
|
|
}
|
|
|
|
var files []string
|
|
paginator := s3.NewListObjectsV2Paginator(d.Client, input)
|
|
|
|
for paginator.HasMorePages() {
|
|
page, err := paginator.NextPage(ctx)
|
|
if err != nil {
|
|
return nil, errcodes.Wrap(err, "failed to list S3 objects")
|
|
}
|
|
|
|
for _, obj := range page.Contents {
|
|
if obj.Key != nil {
|
|
files = append(files, *obj.Key)
|
|
}
|
|
}
|
|
}
|
|
|
|
return files, nil
|
|
}
|
|
|
|
// fetchFromS3 downloads a file from S3 to local disk with retry logic
|
|
func (d *S3Downloader) fetchFromS3(ctx context.Context, key, dst string) (err error) {
|
|
// Check if final file already exists
|
|
if _, err := os.Stat(dst); err == nil {
|
|
return nil
|
|
}
|
|
|
|
const maxRetries = 3
|
|
var lastErr error
|
|
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
if attempt > 0 {
|
|
// Exponential backoff: 2s, 4s, 8s
|
|
waitTime := time.Duration(1<<uint(attempt)) * time.Second
|
|
time.Sleep(waitTime)
|
|
}
|
|
|
|
lastErr = d.fetchFromS3Once(ctx, key, dst)
|
|
if lastErr == nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return errcodes.Wrap(lastErr, fmt.Sprintf("failed after %d retries", maxRetries))
|
|
}
|
|
|
|
// fetchFromS3Once performs a single download attempt.
// It writes to a temporary "<dst>.part" file and renames it into place
// only after the full body has been copied (and size-verified when a
// size is available), so dst is never left half-written. The named
// return value `err` is deliberate: the deferred cleanup below reads it
// to decide whether the .part file must be removed.
func (d *S3Downloader) fetchFromS3Once(ctx context.Context, key, dst string) (err error) {
	tmp := dst + ".part"

	// Remove old .part file if it exists (best-effort; Create truncates anyway)
	os.Remove(tmp)

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}

	// fileClosed tracks whether f was already closed explicitly before
	// the rename, so the deferred cleanup does not double-close it.
	fileClosed := false
	// Cleanup .part file on any error (using named return value)
	defer func() {
		if !fileClosed {
			f.Close()
		}
		if err != nil {
			os.Remove(tmp)
		}
	}()

	// Check if file exists in S3. This also yields the expected size,
	// used for the integrity check after the copy.
	exists, size, checkErr := d.CheckFileExists(ctx, key)
	if checkErr != nil {
		return errcodes.Wrap(checkErr, "failed to check S3 file existence")
	}
	if !exists {
		return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("file not found in S3: %s", key))
	}

	// Download from S3
	input := &s3.GetObjectInput{
		Bucket: aws.String(d.Bucket),
		Key:    aws.String(key),
	}

	result, err := d.Client.GetObject(ctx, input)
	if err != nil {
		return errcodes.Wrap(err, "failed to get S3 object")
	}
	defer result.Body.Close()

	// Copy to local file (streamed; constant memory regardless of object size)
	written, err := io.Copy(f, result.Body)
	if err != nil {
		return errcodes.Wrap(err, fmt.Sprintf("failed to write S3 object to file %s", dst))
	}

	// Verify size if available (size == 0 means HeadObject reported no length)
	if size > 0 && written != size {
		return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("size mismatch: got %d bytes, expected %d", written, size))
	}

	// Close file before rename so all buffered data is flushed to disk.
	if err := f.Close(); err != nil {
		return err
	}
	fileClosed = true

	// If rename fails, err will be set and defer will cleanup .part file
	return os.Rename(tmp, dst)
}
|
|
|
|
// Run downloads all required GRIB files for a forecast run
|
|
func (d *S3Downloader) Run(ctx context.Context, run time.Time) error {
|
|
runStr := run.Format("20060102")
|
|
hour := run.Hour()
|
|
|
|
// First, list available files to verify they exist
|
|
availableFiles, err := d.ListAvailableFiles(ctx, runStr, hour)
|
|
if err != nil {
|
|
return errcodes.Wrap(err, "failed to list available files")
|
|
}
|
|
|
|
if len(availableFiles) == 0 {
|
|
return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("no files found for run %s/%02d", runStr, hour))
|
|
}
|
|
|
|
// Build a map of available files for quick lookup
|
|
availableMap := make(map[string]bool)
|
|
for _, file := range availableFiles {
|
|
availableMap[file] = true
|
|
}
|
|
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
sem := make(chan struct{}, d.Parallel)
|
|
|
|
for _, step := range steps {
|
|
step := step
|
|
key := d.s3Key(runStr, hour, step)
|
|
|
|
// Check if file is available in S3
|
|
if !availableMap[key] {
|
|
// Log warning but don't fail - some forecast hours might not be available yet
|
|
continue
|
|
}
|
|
|
|
sem <- struct{}{}
|
|
g.Go(func() error {
|
|
defer func() { <-sem }()
|
|
dst := filepath.Join(d.Dir, fileName(run, step))
|
|
return d.fetchFromS3(ctx, key, dst)
|
|
})
|
|
}
|
|
|
|
return g.Wait()
|
|
}
|