// Package grib downloads GRIB forecast files from AWS S3.
package grib

import (
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"golang.org/x/sync/errgroup"
)

// S3Downloader downloads GRIB files from AWS S3.
type S3Downloader struct {
	Dir      string     // local destination directory for downloaded files
	Parallel int        // maximum number of concurrent downloads
	Bucket   string     // S3 bucket name (public GFS bucket)
	Region   string     // AWS region of the bucket
	Client   *s3.Client // configured S3 client (anonymous credentials)
}

// NewS3Downloader creates a new S3 downloader with anonymous access.
// The bucket is public, so anonymous credentials are used deliberately.
func NewS3Downloader(dir string, parallel int, bucket, region string) (*S3Downloader, error) {
	// Create AWS config with anonymous credentials for the public bucket.
	cfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion(region),
		config.WithCredentialsProvider(aws.AnonymousCredentials{}),
	)
	if err != nil {
		return nil, errcodes.Wrap(err, "failed to load AWS config")
	}

	return &S3Downloader{
		Dir:      dir,
		Parallel: parallel,
		Bucket:   bucket,
		Region:   region,
		Client:   s3.NewFromConfig(cfg),
	}, nil
}

// s3Key generates the S3 key for a GRIB file.
// Path format: gfs.YYYYMMDD/HH/atmos/gfs.tHHz.pgrb2.0p50.fFFF
func (d *S3Downloader) s3Key(run string, hour int, step int) string {
	return fmt.Sprintf("gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", run, hour, hour, step)
}

// CheckFileExists checks if a file exists in S3 using HeadObject.
// It returns (exists, size, error); a NotFound response is reported as
// (false, 0, nil) rather than an error.
func (d *S3Downloader) CheckFileExists(ctx context.Context, key string) (bool, int64, error) {
	input := &s3.HeadObjectInput{
		Bucket: aws.String(d.Bucket),
		Key:    aws.String(key),
	}

	result, err := d.Client.HeadObject(ctx, input)
	if err != nil {
		// Check if error is NotFound.
		// AWS SDK v2 doesn't export specific error types here; match the
		// error string instead.
		if isNotFoundError(err) {
			return false, 0, nil
		}
		return false, 0, errcodes.Wrap(err, "failed to check file existence")
	}

	size := int64(0)
	if result.ContentLength != nil {
		size = *result.ContentLength
	}
	return true, size, nil
}

// isNotFoundError checks if err represents an S3 "object not found"
// condition, based on the error text produced by AWS SDK v2.
func isNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	errStr := err.Error()
	return contains(errStr, "NotFound") || contains(errStr, "404") || contains(errStr, "NoSuchKey")
}

// contains reports whether substr is within s.
// Kept as a package helper for compatibility; delegates to the standard
// library instead of a hand-rolled scan.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}

// findSubstring reports whether substr occurs anywhere in s.
// Kept for compatibility with existing callers; delegates to strings.Index.
func findSubstring(s, substr string) bool {
	return strings.Index(s, substr) >= 0
}

// ListAvailableFiles lists all available files for a given run by paging
// through ListObjectsV2 results under the run's atmos/ prefix.
func (d *S3Downloader) ListAvailableFiles(ctx context.Context, run string, hour int) ([]string, error) {
	prefix := fmt.Sprintf("gfs.%s/%02d/atmos/", run, hour)

	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(d.Bucket),
		Prefix: aws.String(prefix),
	}

	var files []string
	paginator := s3.NewListObjectsV2Paginator(d.Client, input)
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, errcodes.Wrap(err, "failed to list S3 objects")
		}
		for _, obj := range page.Contents {
			if obj.Key != nil {
				files = append(files, *obj.Key)
			}
		}
	}
	return files, nil
}

// fetchFromS3 downloads a file from S3 to local disk with retry logic.
// It is a no-op if dst already exists. Each attempt downloads into a
// temporary "dst.part" file which is renamed into place only on success.
//
// NOTE(review): the middle of this function was corrupted in the source;
// the body below is reconstructed from the surviving fragments (backoff
// expression, .part temp file, fileClosed flag, size-mismatch message,
// rename-cleanup comment) — confirm against version control.
func (d *S3Downloader) fetchFromS3(ctx context.Context, key, dst string) error {
	// Check if final file already exists.
	if _, err := os.Stat(dst); err == nil {
		return nil
	}

	const maxRetries = 3
	var lastErr error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if attempt > 0 {
			// Exponential backoff between attempts: 2s, 4s, ...
			waitTime := time.Duration(1<<attempt) * time.Second
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(waitTime):
			}
		}
		if err := d.downloadOnce(ctx, key, dst); err != nil {
			lastErr = err
			continue
		}
		return nil
	}
	return errcodes.Wrap(lastErr, fmt.Sprintf("download failed after %d attempts", maxRetries))
}

// downloadOnce performs a single GetObject download of key into dst,
// writing through a temporary ".part" file that is removed on any failure
// (via the deferred cleanup keyed on the named return err).
func (d *S3Downloader) downloadOnce(ctx context.Context, key, dst string) (err error) {
	out, err := d.Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(d.Bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return errcodes.Wrap(err, "failed to get S3 object")
	}
	defer out.Body.Close()

	size := int64(0)
	if out.ContentLength != nil {
		size = *out.ContentLength
	}

	tmp := dst + ".part"
	f, err := os.Create(tmp)
	if err != nil {
		return errcodes.Wrap(err, "failed to create temp file")
	}
	fileClosed := false
	defer func() {
		if !fileClosed {
			f.Close()
		}
		if err != nil {
			os.Remove(tmp)
		}
	}()

	written, err := io.Copy(f, out.Body)
	if err != nil {
		return errcodes.Wrap(err, "failed to write temp file")
	}
	// Verify the byte count against Content-Length when the header is present.
	if size > 0 && written != size {
		return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("size mismatch: got %d bytes, expected %d", written, size))
	}

	// Close file before rename.
	if err = f.Close(); err != nil {
		return err
	}
	fileClosed = true

	// If rename fails, err will be set and defer will cleanup .part file.
	return os.Rename(tmp, dst)
}

// Run downloads all required GRIB files for a forecast run.
// It lists the objects actually present for the run first, skipping
// forecast steps that are not yet published instead of failing, and
// downloads the rest with at most d.Parallel concurrent transfers.
func (d *S3Downloader) Run(ctx context.Context, run time.Time) error {
	runStr := run.Format("20060102")
	hour := run.Hour()

	// First, list available files to verify they exist.
	availableFiles, err := d.ListAvailableFiles(ctx, runStr, hour)
	if err != nil {
		return errcodes.Wrap(err, "failed to list available files")
	}
	if len(availableFiles) == 0 {
		return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("no files found for run %s/%02d", runStr, hour))
	}

	// Build a map of available files for quick lookup.
	availableMap := make(map[string]bool, len(availableFiles))
	for _, file := range availableFiles {
		availableMap[file] = true
	}

	g, ctx := errgroup.WithContext(ctx)
	sem := make(chan struct{}, d.Parallel)

	for _, step := range steps {
		step := step // pre-Go 1.22 loop-variable capture
		key := d.s3Key(runStr, hour, step)

		// Some forecast hours might not be available yet; skip rather than fail.
		if !availableMap[key] {
			continue
		}

		sem <- struct{}{}
		g.Go(func() error {
			defer func() { <-sem }()
			dst := filepath.Join(d.Dir, fileName(run, step))
			return d.fetchFromS3(ctx, key, dst)
		})
	}
	return g.Wait()
}