package grib

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"time"

	"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
	"golang.org/x/sync/errgroup"
)

// Downloader fetches the files of one forecast run over HTTP and stores them
// on local disk.
type Downloader struct {
	// Dir is the directory the downloaded files are written to.
	Dir string
	// Parallel is the maximum number of concurrent downloads. Values below 1
	// are treated as 1.
	Parallel int
	// Client performs the HTTP requests. When nil, http.DefaultClient is used.
	Client *http.Client
	// DatasetURL is the base URL that per-file paths are appended to.
	DatasetURL string
}

// fileURL builds the remote URL of a single file from the run date string
// (formatted by Run as YYYYMMDD), the run hour and the forecast step.
func (d *Downloader) fileURL(run string, hour int, step int) string {
	return fmt.Sprintf("%s/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d",
		d.DatasetURL, run, hour, hour, step)
}

// fetch downloads url into dst.
//
// If dst already exists the call is a no-op. Otherwise the body is streamed
// into dst+".part" and renamed to dst only after the whole response has been
// written and the file closed, so dst never holds a partial download. On any
// failure the .part file is removed.
//
// A non-200 response yields errcodes.ErrDownload wrapped with the status line;
// other errors are wrapped with %w and annotated with the URL or path.
func (d *Downloader) fetch(ctx context.Context, url, dst string) (err error) {
	// Nothing to do if the final file is already present.
	if _, statErr := os.Stat(dst); statErr == nil {
		return nil
	}
	// Avoid touching the disk or the network once the caller has given up.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}

	// Build the request before creating any file: a malformed URL would
	// otherwise produce a nil request and a panic inside Client.Do.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return fmt.Errorf("building request for %s: %w", url, err)
	}

	client := d.Client
	if client == nil {
		client = http.DefaultClient
	}

	tmp := dst + ".part"
	// Drop a stale .part left behind by an earlier interrupted attempt; the
	// error is ignored because the file usually does not exist.
	_ = os.Remove(tmp)

	f, err := os.Create(tmp)
	if err != nil {
		return fmt.Errorf("creating %s: %w", tmp, err)
	}
	// On any failure (observed through the named result) release the handle
	// and remove the partial file. On success the file has already been
	// closed and renamed, so there is nothing to clean up.
	defer func() {
		if err != nil {
			_ = f.Close() // may already be closed; the error is irrelevant here
			_ = os.Remove(tmp)
		}
	}()

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("GET %s: %w", url, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status)
	}

	if _, err = io.Copy(f, resp.Body); err != nil {
		return fmt.Errorf("writing %s: %w", tmp, err)
	}

	// Close before renaming so buffered-write errors surface and the rename
	// operates on a fully written file.
	if err = f.Close(); err != nil {
		return fmt.Errorf("closing %s: %w", tmp, err)
	}

	// If the rename fails, err is set and the deferred cleanup removes tmp.
	return os.Rename(tmp, dst)
}

// Run downloads every step of the forecast run identified by run, using at
// most d.Parallel concurrent downloads.
//
// It returns the first download error, if any. The first failure cancels the
// shared context, which aborts in-flight requests and stops further steps from
// being scheduled. If ctx is cancelled before all steps have been scheduled,
// Run returns a non-nil error even when the downloads already in flight
// succeeded.
func (d *Downloader) Run(ctx context.Context, run time.Time) error {
	runStr := run.Format("20060102")
	hour := run.Hour()

	// A zero capacity would make the semaphore unbuffered and block the first
	// acquire forever; a negative one would panic in make.
	limit := d.Parallel
	if limit < 1 {
		limit = 1
	}

	g, ctx := errgroup.WithContext(ctx)
	sem := make(chan struct{}, limit)

	for _, step := range steps {
		step := step // per-iteration copy for toolchains before Go 1.22

		select {
		case <-ctx.Done():
			// Stop scheduling. Prefer the error that caused the cancellation,
			// and fall back to the context error when the parent was cancelled.
			ctxErr := ctx.Err()
			if err := g.Wait(); err != nil {
				return err
			}
			return ctxErr
		case sem <- struct{}{}:
		}

		g.Go(func() error {
			defer func() { <-sem }()
			url := d.fileURL(runStr, hour, step)
			dst := filepath.Join(d.Dir, fileName(run, step))
			return d.fetch(ctx, url, dst)
		})
	}

	return g.Wait()
}