This commit is contained in:
Anatoly Antonov 2026-05-18 03:17:17 +09:00
parent 7a8d5d13fa
commit 9e663db9dc
68 changed files with 5647 additions and 2958 deletions

11
internal/datasets/doc.go Normal file
View file

@ -0,0 +1,11 @@
// Package datasets manages the lifecycle of atmospheric datasets. It exposes:
//
// - A Source interface for pluggable dataset origins (GFS now, ECMWF later).
// - A Storage interface for transactional, resumable on-disk persistence.
// - A Manager that coordinates downloads, tracks job state, and owns the
// currently-active weather.WindField.
//
// The package is the only one in the service that knows about download
// scheduling, manifests, or bandwidth throttling — engine and API layers
// only see WindField + Manager-as-admin.
package datasets

View file

@ -0,0 +1,125 @@
package gfs
import (
"fmt"
"strconv"
"strings"
)
// IdxEntry describes one GRIB message as listed in a NOAA .idx sidecar file.
//
// A typical line: "15:1207405:d=2024010100:HGT:1000 mb:0 hour fcst:"
type IdxEntry struct {
	Index     int    // message number (first field of the line)
	Offset    int64  // byte offset of the message (second field)
	Variable  string // variable short name, e.g. "HGT" (fourth field)
	LevelMB   int    // pressure level in mb; 0 when the level is not isobaric
	Hour      int    // forecast hour; 0 for analysis ("anl"); -1 if unparseable
	EndOffset int64  // Offset of the following entry; -1 for the final entry
}

// Length reports how many bytes the GRIB message occupies. It returns -1
// when the end of the message is unknown, which is the case for the final
// entry of an idx file.
func (e *IdxEntry) Length() int64 {
	if e.EndOffset > 0 {
		return e.EndOffset - e.Offset
	}
	return -1
}
// ParseIdx decodes the body of a .idx file into entries, in file order.
//
// Lines that have fewer than seven colon-separated fields, or whose index or
// offset is not numeric, are dropped without error. Every entry except the
// last gets its EndOffset from the Offset of the entry that follows it; the
// last entry keeps EndOffset == -1.
func ParseIdx(body []byte) []IdxEntry {
	var parsed []IdxEntry
	for _, raw := range strings.Split(string(body), "\n") {
		// A blank line splits into a single empty field and is rejected by
		// the field-count check as well.
		fields := strings.Split(strings.TrimSpace(raw), ":")
		if len(fields) < 7 {
			continue
		}
		index, indexErr := strconv.Atoi(fields[0])
		offset, offsetErr := strconv.ParseInt(fields[1], 10, 64)
		if indexErr != nil || offsetErr != nil {
			continue
		}
		parsed = append(parsed, IdxEntry{
			Index:     index,
			Offset:    offset,
			Variable:  fields[3],
			LevelMB:   parseLevelMB(fields[4]),
			Hour:      parseHour(fields[5]),
			EndOffset: -1,
		})
	}

	// Each message ends where its successor begins.
	for next := 1; next < len(parsed); next++ {
		parsed[next-1].EndOffset = parsed[next].Offset
	}
	return parsed
}
// FilterIdx keeps the entries whose variable is marked true in wanted, whose
// level is a positive pressure level, and whose byte length is known and
// positive. Input order is preserved.
func FilterIdx(entries []IdxEntry, wanted map[string]bool) []IdxEntry {
	var kept []IdxEntry
	for i := range entries {
		candidate := entries[i]
		if wanted[candidate.Variable] && candidate.LevelMB > 0 && candidate.Length() > 0 {
			kept = append(kept, candidate)
		}
	}
	return kept
}
// parseLevelMB extracts the pressure in millibars from a level field such as
// "1000 mb". Any field that does not end in " mb", or whose prefix is not an
// integer, yields 0.
func parseLevelMB(s string) int {
	digits, isobaric := strings.CutSuffix(strings.TrimSpace(s), " mb")
	if !isobaric {
		return 0
	}
	if mb, err := strconv.Atoi(digits); err == nil {
		return mb
	}
	return 0
}
// parseHour extracts the forecast hour from a field such as "3 hour fcst".
// The analysis marker "anl" maps to 0; anything that is not an integer once
// the " hour fcst" suffix is removed maps to -1.
func parseHour(s string) int {
	switch field := strings.TrimSpace(s); field {
	case "anl":
		return 0
	default:
		hour, err := strconv.Atoi(strings.TrimSuffix(field, " hour fcst"))
		if err != nil {
			return -1
		}
		return hour
	}
}
// ByteRange is one HTTP range download corresponding to one GRIB message.
// Both bounds are inclusive, matching the HTTP "bytes=start-end" syntax
// produced by FormatRange.
type ByteRange struct {
Start int64 // first byte of the message
End int64 // last byte of the message (inclusive)
Entry IdxEntry // idx entry this range was derived from
}
// EntriesToRanges turns idx entries into inclusive HTTP byte ranges, one per
// entry. Entries without a positive byte length are left out.
func EntriesToRanges(entries []IdxEntry) []ByteRange {
	ranges := make([]ByteRange, 0, len(entries))
	for i := range entries {
		entry := entries[i]
		if entry.Length() > 0 {
			ranges = append(ranges, ByteRange{
				Start: entry.Offset,
				End:   entry.EndOffset - 1, // EndOffset is exclusive
				Entry: entry,
			})
		}
	}
	return ranges
}
// FormatRange renders the range as the value of an HTTP Range header,
// e.g. "bytes=0-289011".
func (r ByteRange) FormatRange() string {
	first, last := r.Start, r.End
	return fmt.Sprintf("bytes=%d-%d", first, last)
}

View file

@ -0,0 +1,70 @@
package gfs
import "testing"
const sampleIdx = `1:0:d=2024010100:HGT:1000 mb:0 hour fcst:
2:289012:d=2024010100:HGT:975 mb:0 hour fcst:
3:541876:d=2024010100:TMP:1000 mb:0 hour fcst:
4:789012:d=2024010100:UGRD:1000 mb:0 hour fcst:
5:1045678:d=2024010100:VGRD:1000 mb:0 hour fcst:
6:1298765:d=2024010100:UGRD:975 mb:0 hour fcst:
7:1567890:d=2024010100:UGRD:2 m above ground:0 hour fcst:
8:1812345:d=2024010100:VGRD:975 mb:0 hour fcst:
9:2098765:d=2024010100:HGT:500 mb:3 hour fcst:
`
// TestParseIdx checks entry count, field decoding of the first entry, the
// non-isobaric level fallback, and the sentinel EndOffset of the last entry.
func TestParseIdx(t *testing.T) {
	got := ParseIdx([]byte(sampleIdx))
	if n := len(got); n != 9 {
		t.Fatalf("expected 9 entries, got %d", n)
	}

	first := got[0]
	firstOK := first.Index == 1 &&
		first.Offset == 0 &&
		first.Variable == "HGT" &&
		first.LevelMB == 1000 &&
		first.Hour == 0 &&
		first.EndOffset == 289012
	if !firstOK {
		t.Errorf("entry 0: %+v", first)
	}

	if level := got[6].LevelMB; level != 0 {
		t.Errorf("non-pressure level should have LevelMB=0, got %d", level)
	}

	if end := got[len(got)-1].EndOffset; end != -1 {
		t.Errorf("last entry EndOffset: got %d, want -1", end)
	}
}
func TestFilterIdx(t *testing.T) {
entries := ParseIdx([]byte(sampleIdx))
want := map[string]bool{"HGT": true, "UGRD": true, "VGRD": true}
filtered := FilterIdx(entries, want)
// HGT@1000, HGT@975, UGRD@1000, VGRD@1000, UGRD@975, VGRD@975 = 6
// HGT@500 at 3hr is last entry (no EndOffset), so dropped.
if len(filtered) != 6 {
t.Errorf("expected 6, got %d", len(filtered))
}
}
// TestParseLevelMB covers isobaric levels and several non-isobaric labels.
func TestParseLevelMB(t *testing.T) {
	type testCase struct {
		in   string
		want int
	}
	table := []testCase{
		{in: "1000 mb", want: 1000},
		{in: "975 mb", want: 975},
		{in: "1 mb", want: 1},
		{in: "2 m above ground", want: 0},
		{in: "surface", want: 0},
		{in: "tropopause", want: 0},
	}
	for i := range table {
		tc := table[i]
		got := parseLevelMB(tc.in)
		if got == tc.want {
			continue
		}
		t.Errorf("parseLevelMB(%q) = %d, want %d", tc.in, got, tc.want)
	}
}
// TestParseHour covers forecast-hour fields and the analysis marker.
func TestParseHour(t *testing.T) {
	type testCase struct {
		in   string
		want int
	}
	table := []testCase{
		{in: "0 hour fcst", want: 0},
		{in: "3 hour fcst", want: 3},
		{in: "192 hour fcst", want: 192},
		{in: "anl", want: 0},
	}
	for i := range table {
		tc := table[i]
		got := parseHour(tc.in)
		if got == tc.want {
			continue
		}
		t.Errorf("parseHour(%q) = %d, want %d", tc.in, got, tc.want)
	}
}

View file

@ -0,0 +1,430 @@
// Package gfs implements datasets.Source for NOAA GFS 0.5-degree forecasts.
package gfs
import (
"context"
"errors"
"fmt"
"io"
"math"
"net/http"
"os"
"sync"
"time"
"github.com/nilsmagnus/grib/griblib"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"predictor-refactored/internal/datasets"
"predictor-refactored/internal/weather"
wgfs "predictor-refactored/internal/weather/gfs"
)
// Source is the GFS implementation of datasets.Source.
//
// Every field is optional: a non-positive Parallel, a nil Client, or a nil
// Log is replaced by a default through the parallel, client, and log
// accessors.
type Source struct {
Parallel int // max concurrent download units; values <= 0 mean 8
Client *http.Client // optional; defaults to a 2-minute-timeout client
Log *zap.Logger // optional; nil disables logging
}
// NewSource builds a Source with the default settings: 8 parallel units and
// an HTTP client with a 2-minute timeout. log may be nil.
func NewSource(log *zap.Logger) *Source {
	src := new(Source)
	src.Parallel = 8
	src.Client = &http.Client{Timeout: 2 * time.Minute}
	src.Log = log
	return src
}
// ID reports the identifier of this dataset source.
func (s *Source) ID() string {
	const sourceID = "noaa-gfs-0p50"
	return sourceID
}
// log returns the configured logger, or a no-op logger when none is set.
func (s *Source) log() *zap.Logger {
	if logger := s.Log; logger != nil {
		return logger
	}
	return zap.NewNop()
}
// client returns the configured HTTP client, or a fresh client with a
// 2-minute timeout when none is set.
func (s *Source) client() *http.Client {
	if configured := s.Client; configured != nil {
		return configured
	}
	return &http.Client{Timeout: 2 * time.Minute}
}
// parallel returns the concurrency limit, falling back to 8 when Parallel is
// zero or negative.
func (s *Source) parallel() int {
	if limit := s.Parallel; limit > 0 {
		return limit
	}
	return 8
}
// LatestEpoch returns the most recent run NOAA has finished publishing,
// determined by HEAD-ing the .idx for the final forecast hour. It starts at
// the current 6-hourly run slot (UTC) and walks back up to 8 runs (48 hours)
// before giving up.
//
// If ctx is cancelled or expires, ctx.Err() is returned instead of the
// generic "no recent GFS run found" error, so callers can tell an aborted
// probe from a genuinely missing run.
func (s *Source) LatestEpoch(ctx context.Context) (time.Time, error) {
	const maxRuns = 8

	now := time.Now().UTC()
	hour := now.Hour() - (now.Hour() % 6)
	current := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC)

	// published reports whether a HEAD request for url answers 200 OK.
	// Request-construction and transport errors count as "not published".
	published := func(url string) bool {
		req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
		if err != nil {
			return false
		}
		resp, err := s.client().Do(req)
		if err != nil {
			return false
		}
		resp.Body.Close()
		return resp.StatusCode == http.StatusOK
	}

	for range maxRuns {
		// Without this check a cancelled context would make every probe fail
		// immediately and be misreported as "no run found".
		if err := ctx.Err(); err != nil {
			return time.Time{}, err
		}
		url := wgfs.GribURL(current.Format("20060102"), current.Hour(), wgfs.MaxHour) + ".idx"
		if published(url) {
			s.log().Info("latest GFS run discovered",
				zap.Time("run", current),
				zap.String("verified_url", url))
			return current, nil
		}
		current = current.Add(-6 * time.Hour)
	}

	// The last probe may have failed because of cancellation.
	if err := ctx.Err(); err != nil {
		return time.Time{}, err
	}
	return time.Time{}, fmt.Errorf("no recent GFS run found (checked %d runs)", maxRuns)
}
// Open returns the dataset stored for epoch as a WindField. It fails when
// the store has no committed dataset for epoch or the file cannot be opened.
// The context argument is unused.
func (s *Source) Open(_ context.Context, epoch time.Time, store datasets.Storage) (weather.WindField, error) {
	if present := store.Exists(epoch); !present {
		return nil, fmt.Errorf("epoch %s not found", epoch.Format(time.RFC3339))
	}

	path := store.Path(epoch)
	cube, openErr := wgfs.Open(path, epoch.UTC())
	if openErr != nil {
		return nil, openErr
	}
	return wgfs.NewWind(cube), nil
}
// neededVariables is the GRIB variable set we extract. It is passed to
// FilterIdx, so only idx entries for these variable names are downloaded.
var neededVariables = map[string]bool{"HGT": true, "UGRD": true, "VGRD": true}
// Download fetches the full dataset for epoch in parallel, resuming any
// previously-completed work units. Honours ctx cancellation and prog
// (which may be nil).
//
// One work unit is one (forecast step, level set) pair. Completed units are
// recorded in the handle's manifest and skipped on the next attempt.
//
// The manifest is only trusted when the partial cube it describes is still
// on disk with the expected size. If the cube is missing or has the wrong
// size, openOrCreateCube starts a fresh file, so the manifest is reset first.
// Otherwise the "completed" units would be skipped and never written into
// the new cube.
//
// On failure the partial file and manifest are kept for a later resume,
// except when no unit at all has completed, in which case the handle is
// aborted. Cancellation never aborts.
func (s *Source) Download(ctx context.Context, epoch time.Time, store datasets.Storage, prog datasets.ProgressSink, throttle datasets.Throttle) error {
	if prog == nil {
		prog = noopSink{}
	}

	handle, err := store.BeginWrite(epoch)
	if err != nil {
		return fmt.Errorf("begin write: %w", err)
	}
	manifest := handle.Manifest()

	// Drop a manifest that no longer matches the cube on disk. Other stat
	// errors are left for openOrCreateCube to report.
	if done := len(manifest.Units()); done > 0 {
		info, statErr := os.Stat(handle.Path())
		missing := errors.Is(statErr, os.ErrNotExist)
		wrongSize := statErr == nil && info.Size() != wgfs.DatasetSize
		if missing || wrongSize {
			s.log().Warn("discarding stale manifest: partial cube missing or wrong size",
				zap.String("path", handle.Path()),
				zap.Int("units", done))
			if err := manifest.Reset(); err != nil {
				return fmt.Errorf("reset stale manifest: %w", err)
			}
		}
	}

	// Open or create the temp file. If a previous attempt left a partial
	// file of the right size, reuse it (resume); otherwise Create.
	file, err := openOrCreateCube(handle.Path())
	if err != nil {
		_ = handle.Abort()
		return err
	}

	date := epoch.UTC().Format("20060102")
	runHour := epoch.UTC().Hour()
	steps := wgfs.Hours()
	totalUnits := len(steps) * 2 // two level sets per step
	prog.SetTotal(totalUnits)

	// Pre-count already-done units so progress is accurate on resume.
	for range manifest.Units() {
		prog.StepComplete()
	}

	start := time.Now()
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(s.parallel())

	// fileMu serialises concurrent BlitGribData calls because the underlying
	// mmap is shared and SetVal isn't atomic.
	var fileMu sync.Mutex

	for _, step := range steps {
		hourIdx := wgfs.HourIndex(step)
		if hourIdx < 0 {
			continue
		}
		for _, ls := range []wgfs.LevelSet{wgfs.LevelSetA, wgfs.LevelSetB} {
			unit := unitKey(step, ls)
			if manifest.Has(unit) {
				continue
			}
			g.Go(func() error {
				var url string
				switch ls {
				case wgfs.LevelSetA:
					url = wgfs.GribURL(date, runHour, step)
				case wgfs.LevelSetB:
					url = wgfs.GribURLB(date, runHour, step)
				}
				if err := s.downloadAndBlit(ctx, file, &fileMu, url, hourIdx, ls, prog, throttle); err != nil {
					return fmt.Errorf("step %d %s: %w", step, levelSetLabel(ls), err)
				}
				if err := manifest.Mark(unit); err != nil {
					return fmt.Errorf("mark unit: %w", err)
				}
				prog.StepComplete()
				return nil
			})
		}
	}

	if err := g.Wait(); err != nil {
		_ = file.Close()
		// Don't Abort on context cancellation — preserve progress for resume.
		if errors.Is(err, context.Canceled) {
			return err
		}
		// Other errors: abort if no progress was made; otherwise leave for resume.
		if len(manifest.Units()) == 0 {
			_ = handle.Abort()
		}
		return err
	}

	if err := file.Flush(); err != nil {
		_ = file.Close()
		return fmt.Errorf("flush: %w", err)
	}
	if err := file.Close(); err != nil {
		return fmt.Errorf("close: %w", err)
	}
	if err := handle.Commit(); err != nil {
		return fmt.Errorf("commit: %w", err)
	}

	s.log().Info("download complete",
		zap.Time("epoch", epoch),
		zap.Duration("elapsed", time.Since(start)))
	return nil
}
// openOrCreateCube yields a writable cube file at path. An existing file of
// exactly wgfs.DatasetSize bytes is reopened for resume; a missing or
// wrongly-sized file is created from scratch. Any stat failure other than
// "does not exist" is returned as an error.
func openOrCreateCube(path string) (*wgfs.File, error) {
	switch info, err := os.Stat(path); {
	case err == nil && info.Size() == wgfs.DatasetSize:
		return wgfs.OpenWritable(path)
	case err != nil && !errors.Is(err, os.ErrNotExist):
		return nil, fmt.Errorf("stat cube: %w", err)
	default:
		// Wrong-size or missing — truncate-create.
		return wgfs.Create(path)
	}
}
// downloadAndBlit fetches and decodes one (URL, level-set) chunk and writes
// it into the dataset.
//
// Steps:
//  1. Download baseURL+".idx" and keep the entries for neededVariables whose
//     pressure level belongs to level set ls.
//  2. Download each kept message by HTTP byte range and append it to a
//     temporary file.
//  3. Decode the temporary file with griblib and blit every recognised
//     isobaric message into file at (hourIdx, level, variable).
//
// It returns nil without downloading anything when the idx contains no
// relevant entries. fileMu must be the mutex shared by all concurrent callers
// writing to file.
func (s *Source) downloadAndBlit(
ctx context.Context,
file *wgfs.File,
fileMu *sync.Mutex,
baseURL string,
hourIdx int,
ls wgfs.LevelSet,
prog datasets.ProgressSink,
throttle datasets.Throttle,
) error {
idxBody, err := s.httpGet(ctx, baseURL+".idx", throttle, prog)
if err != nil {
return fmt.Errorf("idx: %w", err)
}
entries := ParseIdx(idxBody)
filtered := FilterIdx(entries, neededVariables)
// Keep only the pressure levels that belong to the requested level set.
var relevant []IdxEntry
for _, e := range filtered {
set, ok := wgfs.PressureLevelSet(e.LevelMB)
if ok && set == ls {
relevant = append(relevant, e)
}
}
if len(relevant) == 0 {
return nil
}
ranges := EntriesToRanges(relevant)
// The selected messages are concatenated into one temp file so they can be
// decoded in a single pass below.
tmp, err := os.CreateTemp("", "gfs-msg-*.tmp")
if err != nil {
return fmt.Errorf("temp: %w", err)
}
tmpPath := tmp.Name()
// The temp file is removed on every return path.
defer os.Remove(tmpPath)
for _, r := range ranges {
body, err := s.httpGetRange(ctx, baseURL, r.Start, r.End, throttle, prog)
if err != nil {
tmp.Close()
return fmt.Errorf("range %d-%d: %w", r.Start, r.End, err)
}
if _, err := tmp.Write(body); err != nil {
tmp.Close()
return fmt.Errorf("write tmp: %w", err)
}
}
if err := tmp.Close(); err != nil {
return err
}
// Reopen for reading from the start.
f, err := os.Open(tmpPath)
if err != nil {
return err
}
messages, err := griblib.ReadMessages(f)
f.Close()
if err != nil {
return fmt.Errorf("read grib: %w", err)
}
for _, msg := range messages {
// Only product definition template 0 is handled; other templates are skipped.
if msg.Section4.ProductDefinitionTemplateNumber != 0 {
continue
}
p := msg.Section4.ProductDefinitionTemplate
varIdx := wgfs.VariableIndex(int(p.ParameterCategory), int(p.ParameterNumber))
if varIdx < 0 {
continue
}
if p.FirstSurface.Type != 100 { // isobaric only
continue
}
// NOTE(review): the /100 presumably converts Pa to mb — confirm against griblib.
pressureMB := int(math.Round(float64(p.FirstSurface.Value) / 100.0))
levelIdx := wgfs.PressureIndex(pressureMB)
if levelIdx < 0 {
continue
}
data := msg.Data()
// Writes into the shared cube are serialised across download goroutines.
fileMu.Lock()
err := file.BlitGribData(hourIdx, levelIdx, varIdx, data)
fileMu.Unlock()
if err != nil {
return fmt.Errorf("blit: %w", err)
}
}
return nil
}
// httpGet downloads the whole body at url, making up to 3 attempts, with
// optional throttling. Only a 200 OK response is accepted.
func (s *Source) httpGet(ctx context.Context, url string, throttle datasets.Throttle, prog datasets.ProgressSink) ([]byte, error) {
	return s.fetchWithRetry(ctx, url, nil, throttle, prog)
}

// httpGetRange downloads the inclusive byte range [start, end] of url, making
// up to 3 attempts, with optional throttling.
//
// The returned body is guaranteed to be exactly end-start+1 bytes long. A
// truncated reply, or a 200 OK reply from a server that ignored the Range
// header and sent the whole file, is treated as a failed attempt instead of
// being handed to the GRIB decoder.
func (s *Source) httpGetRange(ctx context.Context, url string, start, end int64, throttle datasets.Throttle, prog datasets.ProgressSink) ([]byte, error) {
	return s.fetchWithRetry(ctx, url, &ByteRange{Start: start, End: end}, throttle, prog)
}

// fetchWithRetry is the retry loop shared by httpGet and httpGetRange. A nil
// rng requests the whole resource. It waits 2s before the second attempt and
// 4s before the third, and stops immediately when ctx is done or the request
// cannot be constructed.
func (s *Source) fetchWithRetry(ctx context.Context, url string, rng *ByteRange, throttle datasets.Throttle, prog datasets.ProgressSink) ([]byte, error) {
	const attempts = 3

	var lastErr error
	for attempt := range attempts {
		if attempt > 0 {
			select {
			case <-time.After(time.Duration(attempt*2) * time.Second):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			// A malformed request will not get better by retrying.
			return nil, err
		}
		if rng != nil {
			req.Header.Set("Range", rng.FormatRange())
		}

		body, err := s.fetchOnce(ctx, req, url, rng, throttle, prog)
		if err == nil {
			return body, nil
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		lastErr = err
	}
	return nil, fmt.Errorf("after %d attempts: %w", attempts, lastErr)
}

// fetchOnce performs a single request. The status is checked before the body
// is read, so error pages are neither throttled nor counted as download
// progress.
func (s *Source) fetchOnce(ctx context.Context, req *http.Request, url string, rng *ByteRange, throttle datasets.Throttle, prog datasets.ProgressSink) ([]byte, error) {
	resp, err := s.client().Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	accepted := resp.StatusCode == http.StatusOK ||
		(rng != nil && resp.StatusCode == http.StatusPartialContent)
	if !accepted {
		// Drain a bounded amount so the connection can be reused.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
		if rng != nil {
			return nil, fmt.Errorf("HTTP %d for range %d-%d of %s", resp.StatusCode, rng.Start, rng.End, url)
		}
		return nil, fmt.Errorf("HTTP %d for %s", resp.StatusCode, url)
	}

	var want int64 = -1
	if rng != nil {
		want = rng.End - rng.Start + 1
		// Reject an obviously wrong reply before downloading it, e.g. a 200
		// carrying the entire file.
		if resp.ContentLength >= 0 && resp.ContentLength != want {
			return nil, fmt.Errorf("range %d-%d of %s: server announced %d bytes, want %d",
				rng.Start, rng.End, url, resp.ContentLength, want)
		}
	}

	body, err := readThrottled(ctx, resp.Body, throttle, prog)
	if err != nil {
		return nil, err
	}
	if rng != nil && int64(len(body)) != want {
		return nil, fmt.Errorf("range %d-%d of %s: got %d bytes, want %d",
			rng.Start, rng.End, url, len(body), want)
	}
	return body, nil
}
// readThrottled drains r into memory in 32 KiB reads. Before every read it
// asks throttle (when non-nil) for permission to consume one buffer's worth
// of bytes, and after every non-empty read it reports the byte count to prog.
// It returns the accumulated bytes on io.EOF and nil plus the error on any
// other failure.
func readThrottled(ctx context.Context, r io.Reader, throttle datasets.Throttle, prog datasets.ProgressSink) ([]byte, error) {
	var (
		collected = make([]byte, 0, 64*1024)
		scratch   = make([]byte, 32*1024)
	)
	for {
		if throttle != nil {
			if waitErr := throttle.Wait(ctx, len(scratch)); waitErr != nil {
				return nil, waitErr
			}
		}

		got, readErr := r.Read(scratch)
		if got > 0 {
			collected = append(collected, scratch[:got]...)
			prog.Bytes(int64(got))
		}

		switch {
		case readErr == nil:
			continue
		case errors.Is(readErr, io.EOF):
			return collected, nil
		default:
			return nil, readErr
		}
	}
}
// unitKey builds the manifest key for one (step, level set) work unit,
// e.g. "step003-A".
func unitKey(step int, ls wgfs.LevelSet) string {
	label := levelSetLabel(ls)
	return fmt.Sprintf("step%03d-%s", step, label)
}
// levelSetLabel names a level set: "B" for wgfs.LevelSetB, "A" for anything
// else.
func levelSetLabel(ls wgfs.LevelSet) string {
	switch ls {
	case wgfs.LevelSetB:
		return "B"
	default:
		return "A"
	}
}
// noopSink is the progress sink used when the caller supplies none; every
// event is dropped.
type noopSink struct{}

// SetTotal ignores the announced unit count.
func (noopSink) SetTotal(_ int) {}

// StepComplete ignores the completion event.
func (noopSink) StepComplete() {}

// Bytes ignores the transferred byte count.
func (noopSink) Bytes(_ int64) {}

View file

@ -0,0 +1,383 @@
package datasets
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"predictor-refactored/internal/weather"
)
// JobStatus is the lifecycle state of a download job.
type JobStatus string
// Job states. A job starts as JobPending, becomes JobRunning when its
// download begins, and ends in exactly one of the three terminal states.
const (
JobPending JobStatus = "pending" // recorded, download not yet started
JobRunning JobStatus = "running" // download in progress
JobComplete JobStatus = "complete" // download succeeded, or dataset was already on disk
JobFailed JobStatus = "failed" // download returned a non-cancellation error
JobCancelled JobStatus = "cancelled" // download ended with context.Canceled
)
// JobInfo is the externally-visible snapshot of a download job. It is a
// plain value copied out of the Manager's internal record.
type JobInfo struct {
ID string // job identifier returned by Manager.Download
Source string // ID of the source performing the download
Epoch time.Time // dataset epoch being downloaded (UTC)
Status JobStatus
StartedAt time.Time
EndedAt *time.Time // nil until the job has finished
Err string // error text; only set for failed jobs
Total int // total number of work units announced by the source
Done int // work units completed so far
Bytes int64 // bytes transferred so far
}
// jobEntry is the Manager's mutable record for one job.
//
// id, source, epoch, startedAt, and cancel are set once at creation. status,
// endedAt, and errStr are guarded by mu. The counters are atomics so the
// progress sink can update them without taking mu.
type jobEntry struct {
id string
source string
epoch time.Time
startedAt time.Time
cancel context.CancelFunc // cancels the job's download context
mu sync.Mutex // guards status, endedAt, errStr
status JobStatus
endedAt time.Time // zero until the job has finished
errStr string
total atomic.Int64 // work units announced via SetTotal
done atomic.Int64 // work units completed
bytes atomic.Int64 // bytes transferred
}
// snapshot copies the entry into a JobInfo. The mutex-guarded fields are
// read under e.mu; the progress counters are read atomically afterwards.
// EndedAt is left nil while the job has no end time.
func (e *jobEntry) snapshot() JobInfo {
	var info JobInfo

	e.mu.Lock()
	info.ID = e.id
	info.Source = e.source
	info.Epoch = e.epoch
	info.StartedAt = e.startedAt
	info.Status = e.status
	info.Err = e.errStr
	if ended := e.endedAt; !ended.IsZero() {
		info.EndedAt = &ended
	}
	e.mu.Unlock()

	info.Total = int(e.total.Load())
	info.Done = int(e.done.Load())
	info.Bytes = e.bytes.Load()
	return info
}
// jobProgress adapts a jobEntry to the ProgressSink interface by forwarding
// every event to the entry's atomic counters.
type jobProgress struct{ e *jobEntry }

// SetTotal records the total number of work units.
func (p jobProgress) SetTotal(n int) {
	total := int64(n)
	p.e.total.Store(total)
}

// StepComplete counts one finished work unit.
func (p jobProgress) StepComplete() {
	p.e.done.Add(1)
}

// Bytes adds n to the transferred byte count.
func (p jobProgress) Bytes(n int64) {
	p.e.bytes.Add(n)
}
// Manager coordinates dataset downloads and exposes the active WindField.
//
// Construct it with New; the jobs map must be initialised before use.
type Manager struct {
src Source
store Storage
throttle Throttle // may be nil
log *zap.Logger
activeMu sync.RWMutex // guards active
active weather.WindField // currently-loaded dataset, or nil
jobsMu sync.RWMutex // guards jobs
jobs map[string]*jobEntry // every job recorded since startup, by job ID
// inFlight maps an epoch's RFC3339 representation to its jobID, enforcing
// single-flight per epoch.
inFlight sync.Map
}
// New builds a Manager from a source, a store, and an optional throttle.
// A nil log is replaced by zap.NewNop(). A mismatch between the source ID and
// the store's source ID is logged as a warning but is not an error.
func New(src Source, store Storage, throttle Throttle, log *zap.Logger) *Manager {
	if log == nil {
		log = zap.NewNop()
	}
	if srcID, storeID := src.ID(), store.SourceID(); srcID != storeID {
		log.Warn("source/store ID mismatch",
			zap.String("src", srcID),
			zap.String("store", storeID))
	}

	m := &Manager{jobs: make(map[string]*jobEntry)}
	m.src, m.store, m.throttle, m.log = src, store, throttle, log
	return m
}
// Source reports the ID of the source this Manager downloads from.
func (m *Manager) Source() string {
	id := m.src.ID()
	return id
}
// Active returns the WindField that is currently loaded, or nil when no
// dataset has been loaded.
func (m *Manager) Active() weather.WindField {
	m.activeMu.RLock()
	field := m.active
	m.activeMu.RUnlock()
	return field
}
// Ready reports whether some dataset is loaded, i.e. Active is non-nil.
func (m *Manager) Ready() bool {
	field := m.Active()
	return field != nil
}
// ListEpochs returns the epochs of all stored datasets as reported by the
// store, newest first.
func (m *Manager) ListEpochs() ([]time.Time, error) {
	epochs, err := m.store.List()
	return epochs, err
}
// ListJobs returns a snapshot of every job recorded since startup. The order
// is unspecified because it follows map iteration.
func (m *Manager) ListJobs() []JobInfo {
	m.jobsMu.RLock()
	defer m.jobsMu.RUnlock()

	infos := make([]JobInfo, 0, len(m.jobs))
	for id := range m.jobs {
		infos = append(infos, m.jobs[id].snapshot())
	}
	return infos
}
// GetJob looks up a job by ID. The boolean is false, and the JobInfo zero,
// when no such job has been recorded.
func (m *Manager) GetJob(id string) (JobInfo, bool) {
	m.jobsMu.RLock()
	entry, known := m.jobs[id]
	m.jobsMu.RUnlock()

	if known {
		return entry.snapshot(), true
	}
	return JobInfo{}, false
}
// CancelJob requests cancellation of a job that has not finished yet. It
// returns false when id is unknown or the job has already reached a terminal
// state (complete, failed, or cancelled), and true once cancellation has been
// requested.
func (m *Manager) CancelJob(id string) bool {
	m.jobsMu.RLock()
	entry, known := m.jobs[id]
	m.jobsMu.RUnlock()
	if !known {
		return false
	}

	entry.mu.Lock()
	status := entry.status
	entry.mu.Unlock()

	switch status {
	case JobComplete, JobFailed, JobCancelled:
		return false
	}
	entry.cancel()
	return true
}
// RemoveEpoch deletes a stored dataset. If epoch is currently active, the
// active field is cleared and, when it implements Close, closed before the
// files are removed, as swapActive and Close do for a field they drop.
//
// The comparison and the clearing happen under one write lock, so a dataset
// that was swapped in concurrently is never cleared by mistake. A failure to
// close the old field is logged and does not prevent removal.
func (m *Manager) RemoveEpoch(epoch time.Time) error {
	epoch = epoch.UTC()

	var dropped weather.WindField
	m.activeMu.Lock()
	if m.active != nil && m.active.Epoch().Equal(epoch) {
		dropped = m.active
		m.active = nil
	}
	m.activeMu.Unlock()

	// Close outside the lock; the assertion fails harmlessly when nothing
	// was dropped.
	if closer, ok := dropped.(interface{ Close() error }); ok {
		if err := closer.Close(); err != nil {
			m.log.Warn("close removed dataset", zap.Error(err))
		}
	}
	return m.store.Remove(epoch)
}
// Download starts (or resumes) a background download job for epoch and
// returns its job ID. When a job for the same epoch is already in flight, the
// ID of that job is returned and nothing new is started.
//
// When the dataset is already on disk no download runs; a job is still
// recorded and marked complete in the background, and its ID is returned.
func (m *Manager) Download(epoch time.Time) string {
	epoch = epoch.UTC()
	flightKey := epoch.Format(time.RFC3339)

	// Fast path: avoid allocating a job ID when one is already in flight.
	if running, found := m.inFlight.Load(flightKey); found {
		return running.(string)
	}
	jobID := uuid.New().String()
	if winner, raced := m.inFlight.LoadOrStore(flightKey, jobID); raced {
		return winner.(string)
	}

	ctx, cancel := context.WithCancel(context.Background())
	entry := &jobEntry{
		id:        jobID,
		source:    m.src.ID(),
		epoch:     epoch,
		startedAt: time.Now().UTC(),
		status:    JobPending,
		cancel:    cancel,
	}

	m.jobsMu.Lock()
	m.jobs[jobID] = entry
	m.jobsMu.Unlock()

	// Skip the download but still record the job for traceability when the
	// dataset is already committed.
	worker := m.runDownload
	if m.store.Exists(epoch) {
		worker = m.completeShortCircuit
	}
	go worker(ctx, entry)
	return jobID
}
// LoadEpoch opens the stored dataset for epoch and makes it the active
// WindField, replacing whatever was active before. It fails when the store
// has no committed dataset for epoch or the source cannot open it.
func (m *Manager) LoadEpoch(ctx context.Context, epoch time.Time) error {
	epoch = epoch.UTC()
	if present := m.store.Exists(epoch); !present {
		return fmt.Errorf("epoch %s not present on disk", epoch.Format(time.RFC3339))
	}

	field, openErr := m.src.Open(ctx, epoch, m.store)
	if openErr != nil {
		return fmt.Errorf("open epoch: %w", openErr)
	}

	m.swapActive(field)
	m.log.Info("loaded dataset",
		zap.Time("epoch", epoch),
		zap.String("source", m.src.ID()))
	return nil
}
// Refresh ensures the most recent upstream dataset is downloaded and active.
//
// If the active dataset's epoch is less than freshnessTTL old, nothing is
// done. Otherwise the stored datasets are tried first: the newest one whose
// epoch is within freshnessTTL is loaded (or left in place if it is already
// active) and no upstream check is performed. Only when no stored dataset is
// fresh enough is the source's LatestEpoch consulted; if it is newer than
// the active dataset, a download is started and on completion the new
// dataset becomes active.
//
// Returns the JobID started, or empty string when nothing was scheduled.
func (m *Manager) Refresh(ctx context.Context, freshnessTTL time.Duration) (string, error) {
if active := m.Active(); active != nil && time.Since(active.Epoch()) < freshnessTTL {
return "", nil
}
// Try loading the freshest existing dataset before going to the network.
// A listing error is ignored and treated as "nothing stored".
if epochs, err := m.store.List(); err == nil {
for _, e := range epochs {
if time.Since(e) > freshnessTTL {
continue
}
if active := m.Active(); active != nil && active.Epoch().Equal(e) {
return "", nil
}
// A dataset that fails to load is skipped in favour of the next one.
if err := m.LoadEpoch(ctx, e); err == nil {
return "", nil
}
}
}
latest, err := m.src.LatestEpoch(ctx)
if err != nil {
return "", fmt.Errorf("latest epoch: %w", err)
}
if active := m.Active(); active != nil && !latest.After(active.Epoch()) {
return "", nil
}
jobID := m.Download(latest)
// Spawn a watcher that loads the dataset on successful completion.
// It polls the job every 2 seconds and exits once the job is terminal.
go func() {
for {
info, ok := m.GetJob(jobID)
if !ok {
return
}
switch info.Status {
case JobComplete:
// The load uses a background context rather than ctx, so it is not tied
// to the lifetime of the Refresh call.
if err := m.LoadEpoch(context.Background(), latest); err != nil {
m.log.Error("load after download", zap.Error(err))
}
return
case JobFailed, JobCancelled:
return
}
time.Sleep(2 * time.Second)
}
}()
return jobID, nil
}
// runDownload performs a single Source.Download call for e and records the
// result on the entry: cancelled when the error wraps context.Canceled,
// failed (with the error text) for any other error, complete otherwise. The
// epoch's in-flight marker is cleared when the function returns.
func (m *Manager) runDownload(ctx context.Context, e *jobEntry) {
	defer m.inFlight.Delete(e.epoch.Format(time.RFC3339))

	e.mu.Lock()
	e.status = JobRunning
	e.mu.Unlock()

	m.log.Info("download started",
		zap.String("job", e.id),
		zap.Time("epoch", e.epoch))

	err := m.src.Download(ctx, e.epoch, m.store, jobProgress{e: e}, m.throttle)
	finishedAt := time.Now().UTC()

	outcome := JobComplete
	if errors.Is(err, context.Canceled) {
		outcome = JobCancelled
	} else if err != nil {
		outcome = JobFailed
	}

	e.mu.Lock()
	e.endedAt = finishedAt
	e.status = outcome
	if outcome == JobFailed {
		e.errStr = err.Error()
	}
	e.mu.Unlock()

	m.log.Info("download finished",
		zap.String("job", e.id),
		zap.String("status", string(outcome)),
		zap.NamedError("err", err))
}
// completeShortCircuit marks e as complete right away, without downloading
// anything, and clears the epoch's in-flight marker. The context is unused.
func (m *Manager) completeShortCircuit(_ context.Context, e *jobEntry) {
	defer m.inFlight.Delete(e.epoch.Format(time.RFC3339))

	finishedAt := time.Now().UTC()

	e.mu.Lock()
	defer e.mu.Unlock()
	e.status = JobComplete
	e.endedAt = finishedAt
}
// swapActive installs f as the active field. The field it replaces is closed
// afterwards, outside the lock, if it has a Close method; a close error is
// logged as a warning.
func (m *Manager) swapActive(f weather.WindField) {
	m.activeMu.Lock()
	previous := m.active
	m.active = f
	m.activeMu.Unlock()

	// The assertion fails for a nil interface, so no separate nil check is
	// needed.
	closer, closable := previous.(interface{ Close() error })
	if !closable {
		return
	}
	if err := closer.Close(); err != nil {
		m.log.Warn("close old dataset", zap.Error(err))
	}
}
// Close cancels every recorded job, clears the active field, and closes the
// field it cleared when that field has a Close method. The error of that
// Close call, if any, is returned.
func (m *Manager) Close() error {
	m.jobsMu.Lock()
	for id := range m.jobs {
		m.jobs[id].cancel()
	}
	m.jobsMu.Unlock()

	m.activeMu.Lock()
	previous := m.active
	m.active = nil
	m.activeMu.Unlock()

	closer, closable := previous.(interface{ Close() error })
	if !closable {
		return nil
	}
	return closer.Close()
}

View file

@ -0,0 +1,118 @@
package datasets
import (
"encoding/json"
"errors"
"fmt"
"os"
"sort"
"sync"
)
// Manifest tracks completed work units for a partial dataset download.
// Units are arbitrary opaque strings; sources choose the format
// (e.g. "step012-A" for "forecast step 12, level set A").
//
// A Manifest is persisted as a JSON object: {"units": ["step000-A", "step000-B", ...]}.
// All methods are safe for concurrent use.
type Manifest struct {
	path  string
	mu    sync.Mutex // guards units and serialises writes to path
	units map[string]struct{}
}

// LoadManifest opens or creates the manifest at path.
//
// A missing or zero-length file yields an empty manifest. A file that exists
// but cannot be read, or whose content is not valid JSON, returns an error.
func LoadManifest(path string) (*Manifest, error) {
	m := &Manifest{path: path, units: make(map[string]struct{})}
	data, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return m, nil
	}
	if err != nil {
		return nil, fmt.Errorf("read manifest %s: %w", path, err)
	}
	if len(data) == 0 {
		return m, nil
	}
	var doc struct {
		Units []string `json:"units"`
	}
	if err := json.Unmarshal(data, &doc); err != nil {
		return nil, fmt.Errorf("parse manifest %s: %w", path, err)
	}
	for _, u := range doc.Units {
		m.units[u] = struct{}{}
	}
	return m, nil
}

// Has reports whether unit has been recorded as completed.
func (m *Manifest) Has(unit string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.units[unit]
	return ok
}

// Mark records unit as completed and persists the manifest to disk. Marking
// an already-recorded unit is a no-op.
//
// If persisting fails the unit is removed again, so the in-memory state
// never claims progress that is not on disk.
func (m *Manifest) Mark(unit string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.units[unit]; ok {
		return nil
	}
	m.units[unit] = struct{}{}
	if err := m.persistLocked(); err != nil {
		delete(m.units, unit)
		return err
	}
	return nil
}

// Units returns the completed units in sorted order. The slice is a copy and
// may be modified by the caller.
func (m *Manifest) Units() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.sortedLocked()
}

// Reset clears all recorded units and removes the manifest file. A manifest
// file that does not exist is not an error.
func (m *Manifest) Reset() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.units = make(map[string]struct{})
	if err := os.Remove(m.path); err != nil && !errors.Is(err, os.ErrNotExist) {
		return fmt.Errorf("remove manifest %s: %w", m.path, err)
	}
	return nil
}

// sortedLocked returns the recorded units as a freshly allocated, sorted,
// non-nil slice. The caller must hold m.mu.
func (m *Manifest) sortedLocked() []string {
	units := make([]string, 0, len(m.units))
	for u := range m.units {
		units = append(units, u)
	}
	sort.Strings(units)
	return units
}

// persistLocked writes the manifest to disk via temp+rename, so readers see
// either the old or the new content. The temp file is removed when either
// step fails. The caller must hold m.mu.
func (m *Manifest) persistLocked() error {
	data, err := json.Marshal(struct {
		Units []string `json:"units"`
	}{Units: m.sortedLocked()})
	if err != nil {
		return err
	}
	tmp := m.path + ".new"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		// Best-effort cleanup of a partially written temp file.
		_ = os.Remove(tmp)
		return fmt.Errorf("write manifest temp: %w", err)
	}
	if err := os.Rename(tmp, m.path); err != nil {
		_ = os.Remove(tmp)
		return fmt.Errorf("rename manifest: %w", err)
	}
	return nil
}

View file

@ -0,0 +1,167 @@
package datasets
import (
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// LocalStore stores dataset files on the local filesystem.
//
// Layout under Root:
//
// <epoch>.bin — committed dataset (binary cube)
// <epoch>.bin.downloading — in-progress dataset
// <epoch>.bin.manifest.json — manifest of completed work units
//
// The .bin suffix exists to differentiate from sidecars in directory listings;
// epoch is formatted as "20060102T150405Z" (UTC).
type LocalStore struct {
Root string // directory that holds all dataset files
Source string // source ID, recorded for safety but currently advisory
Extension string // file suffix of committed datasets; empty means ".bin"
}
// NewLocalStore returns a LocalStore rooted at root for the given source ID,
// using the ".bin" extension. The root directory, including missing parents,
// is created with mode 0o755 when it does not exist.
func NewLocalStore(root, sourceID string) (*LocalStore, error) {
	mkErr := os.MkdirAll(root, 0o755)
	if mkErr != nil {
		return nil, fmt.Errorf("create store root %s: %w", root, mkErr)
	}

	store := new(LocalStore)
	store.Root = root
	store.Source = sourceID
	store.Extension = ".bin"
	return store, nil
}
// SourceID reports the source ID this store was configured with.
func (s *LocalStore) SourceID() string {
	id := s.Source
	return id
}
// epochFormat is the time layout used for the epoch part of dataset file
// names; epochs are converted to UTC before formatting.
const epochFormat = "20060102T150405Z"
// ext returns the configured file extension, or ".bin" when none is set.
func (s *LocalStore) ext() string {
	if configured := s.Extension; configured != "" {
		return configured
	}
	return ".bin"
}
// Path returns the location of the committed dataset file for epoch:
// Root/<epoch in UTC, formatted with epochFormat><extension>.
func (s *LocalStore) Path(epoch time.Time) string {
	fileName := epoch.UTC().Format(epochFormat) + s.ext()
	return filepath.Join(s.Root, fileName)
}
// tempPath returns the location of the in-progress file for epoch: the
// committed path with ".downloading" appended.
func (s *LocalStore) tempPath(epoch time.Time) string {
	const suffix = ".downloading"
	return s.Path(epoch) + suffix
}
// manifestPath returns the location of the manifest sidecar for epoch: the
// committed path with ".manifest.json" appended.
func (s *LocalStore) manifestPath(epoch time.Time) string {
	const suffix = ".manifest.json"
	return s.Path(epoch) + suffix
}
// Exists reports whether a committed dataset file for epoch is present. A
// directory at that path, or any stat error, counts as not present.
func (s *LocalStore) Exists(epoch time.Time) bool {
	info, statErr := os.Stat(s.Path(epoch))
	if statErr != nil {
		return false
	}
	return !info.IsDir()
}
// List returns the epochs of all committed datasets in Root, newest first.
//
// Only regular entries whose name is "<epoch><extension>" are considered.
// Names with an extra dot in the stem, or a stem that does not parse with
// epochFormat, are ignored.
func (s *LocalStore) List() ([]time.Time, error) {
	dirEntries, err := os.ReadDir(s.Root)
	if err != nil {
		return nil, fmt.Errorf("read store: %w", err)
	}

	suffix := s.ext()
	var epochs []time.Time
	for _, de := range dirEntries {
		if de.IsDir() {
			continue
		}
		stem, matched := strings.CutSuffix(de.Name(), suffix)
		// A dot left in the stem means this is not a plain "<epoch><ext>" name.
		if !matched || strings.Contains(stem, ".") {
			continue
		}
		if parsed, parseErr := time.Parse(epochFormat, stem); parseErr == nil {
			epochs = append(epochs, parsed.UTC())
		}
	}

	sort.Slice(epochs, func(a, b int) bool { return epochs[a].After(epochs[b]) })
	return epochs, nil
}
// Remove deletes the committed dataset and any sidecar files (in-progress
// file and manifest) for epoch. Files that do not exist are skipped.
//
// All three removals are attempted even if one fails. The failures are
// combined with errors.Join and wrapped, so callers can still inspect the
// individual causes with errors.Is / errors.As.
func (s *LocalStore) Remove(epoch time.Time) error {
	var errs []error
	for _, p := range []string{s.Path(epoch), s.tempPath(epoch), s.manifestPath(epoch)} {
		if err := os.Remove(p); err != nil && !errors.Is(err, os.ErrNotExist) {
			errs = append(errs, err)
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("remove dataset: %w", errors.Join(errs...))
	}
	return nil
}
// BeginWrite returns a TempHandle for writing epoch's dataset.
//
// The manifest sidecar for epoch is loaded if it exists, so a download that
// was interrupted earlier continues from the units already recorded. An
// error is returned when the manifest cannot be loaded.
func (s *LocalStore) BeginWrite(epoch time.Time) (TempHandle, error) {
	manifest, loadErr := LoadManifest(s.manifestPath(epoch))
	if loadErr != nil {
		return nil, loadErr
	}

	handle := new(localHandle)
	handle.store = s
	handle.epoch = epoch
	handle.manifest = manifest
	return handle, nil
}
// localHandle is the handle LocalStore.BeginWrite returns for one
// in-progress dataset.
type localHandle struct {
	store    *LocalStore
	epoch    time.Time
	manifest *Manifest
	closed   bool // set by the first Commit or Abort; later calls are no-ops
}

// Path returns the location of the in-progress dataset file.
func (h *localHandle) Path() string {
	return h.store.tempPath(h.epoch)
}

// Manifest returns the manifest of completed work units for this download.
func (h *localHandle) Manifest() *Manifest {
	return h.manifest
}
// Commit renames the in-progress file to its final path and then deletes the
// manifest sidecar; a manifest that is already gone is not an error. Calling
// Commit on a handle that was already committed or aborted does nothing.
func (h *localHandle) Commit() error {
	if h.closed {
		return nil
	}
	h.closed = true

	from, to := h.store.tempPath(h.epoch), h.store.Path(h.epoch)
	if err := os.Rename(from, to); err != nil {
		return fmt.Errorf("commit rename: %w", err)
	}

	switch err := os.Remove(h.store.manifestPath(h.epoch)); {
	case err == nil, errors.Is(err, os.ErrNotExist):
		return nil
	default:
		return fmt.Errorf("commit remove manifest: %w", err)
	}
}
// Abort removes the in-progress file and manifest.
//
// Both removals are always attempted. A file that does not exist is not
// treated as a failure. If removals fail, only the first failure is returned.
// The handle is marked closed before anything is removed, so any later call
// to Abort or Commit returns nil without doing anything.
func (h *localHandle) Abort() error {
	if h.closed {
		return nil
	}
	h.closed = true

	tempFile, manifestFile := h.store.tempPath(h.epoch), h.store.manifestPath(h.epoch)

	var result error
	discard := func(path string) {
		err := os.Remove(path)
		if err == nil || errors.Is(err, os.ErrNotExist) {
			return
		}
		if result == nil {
			result = err
		}
	}
	discard(tempFile)
	discard(manifestFile)
	return result
}

View file

@ -0,0 +1,82 @@
package datasets
import (
"os"
"path/filepath"
"testing"
"time"
)
// TestLocalStoreBeginWriteResume walks one epoch through the storage
// lifecycle: begin a write, record progress, resume with a second handle,
// commit, list, and finally remove.
func TestLocalStoreBeginWriteResume(t *testing.T) {
	dir := t.TempDir()
	store, err := NewLocalStore(dir, "gfs-test")
	if err != nil {
		t.Fatalf("NewLocalStore: %v", err)
	}
	epoch := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)

	h, err := store.BeginWrite(epoch)
	if err != nil {
		t.Fatalf("BeginWrite: %v", err)
	}
	if err := os.WriteFile(h.Path(), []byte("partial"), 0o644); err != nil {
		t.Fatalf("write partial: %v", err)
	}
	if err := h.Manifest().Mark("step000-A"); err != nil {
		t.Fatalf("mark: %v", err)
	}

	// Re-open should see the previous manifest entry.
	h2, err := store.BeginWrite(epoch)
	if err != nil {
		t.Fatalf("BeginWrite resume: %v", err)
	}
	if !h2.Manifest().Has("step000-A") {
		t.Errorf("resumed manifest missing step000-A; units = %v", h2.Manifest().Units())
	}

	// manifestPath is used as returned by the store. Joining it onto dir again
	// would point at a path that never exists and make the "removed" check
	// below pass unconditionally.
	manifest := filepath.Clean(store.manifestPath(epoch))
	if _, err := os.Stat(manifest); err != nil {
		t.Fatalf("manifest should exist before commit: %v", err)
	}

	// Commit promotes the temp file and removes the manifest.
	if err := h2.Commit(); err != nil {
		t.Fatalf("Commit: %v", err)
	}
	if !store.Exists(epoch) {
		t.Errorf("Exists after commit returned false")
	}
	if _, err := os.Stat(manifest); !os.IsNotExist(err) {
		t.Errorf("manifest should be removed, got err=%v", err)
	}

	// Listing finds the committed epoch.
	epochs, err := store.List()
	if err != nil {
		t.Fatalf("List: %v", err)
	}
	if len(epochs) != 1 || !epochs[0].Equal(epoch) {
		t.Errorf("List = %v, want [%v]", epochs, epoch)
	}

	// Remove cleans up.
	if err := store.Remove(epoch); err != nil {
		t.Fatalf("Remove: %v", err)
	}
	if store.Exists(epoch) {
		t.Errorf("Exists after remove returned true")
	}
}
// TestLocalStoreAbort checks that Abort discards both the in-progress file
// and its manifest, leaves no committed dataset behind, and can be called a
// second time without error.
func TestLocalStoreAbort(t *testing.T) {
	dir := t.TempDir()
	store, err := NewLocalStore(dir, "gfs-test")
	if err != nil {
		t.Fatalf("NewLocalStore: %v", err)
	}
	epoch := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)

	h, err := store.BeginWrite(epoch)
	if err != nil {
		t.Fatalf("BeginWrite: %v", err)
	}
	// Setup failures must stop the test here; otherwise the assertions below
	// would pass vacuously or fail for the wrong reason.
	if err := os.WriteFile(h.Path(), []byte("x"), 0o644); err != nil {
		t.Fatalf("write partial: %v", err)
	}
	if err := h.Manifest().Mark("step000-A"); err != nil {
		t.Fatalf("mark: %v", err)
	}

	if err := h.Abort(); err != nil {
		t.Fatalf("Abort: %v", err)
	}
	if _, err := os.Stat(h.Path()); !os.IsNotExist(err) {
		t.Errorf("temp file should be removed after abort, got %v", err)
	}
	if _, err := os.Stat(store.manifestPath(epoch)); !os.IsNotExist(err) {
		t.Errorf("manifest should be removed after abort, got %v", err)
	}
	if store.Exists(epoch) {
		t.Errorf("Exists after abort returned true")
	}

	// A second Abort on the same handle is a no-op.
	if err := h.Abort(); err != nil {
		t.Errorf("second Abort: %v", err)
	}
}

View file

@ -0,0 +1,63 @@
package datasets
import (
"context"
"sync"
"time"
)
// TokenBucket is a simple bytes-per-second rate limiter.
//
// The bucket is initialised full (capacity = rate × 1 second). Calls to Wait
// block until enough tokens have accumulated. A request larger than the
// capacity is served in capacity-sized instalments, so it completes after
// roughly n/rate seconds instead of blocking forever.
//
// A TokenBucket must not be copied after first use (it contains a mutex).
type TokenBucket struct {
	mu     sync.Mutex
	rate   float64   // tokens per second; <= 0 disables throttling
	tokens float64   // tokens currently available, never more than cap
	cap    float64   // maximum number of tokens the bucket can hold
	last   time.Time // time of the most recent refill
}

// NewTokenBucket returns a TokenBucket emitting at most bytesPerSecond.
// A non-positive rate disables throttling (Wait becomes a no-op).
func NewTokenBucket(bytesPerSecond int64) *TokenBucket {
	if bytesPerSecond <= 0 {
		return &TokenBucket{rate: 0}
	}
	r := float64(bytesPerSecond)
	return &TokenBucket{rate: r, tokens: r, cap: r, last: time.Now()}
}

// Wait blocks until n tokens are available or ctx is cancelled.
//
// It returns nil immediately when throttling is disabled, when n is not
// positive, or when t is nil (so a nil *TokenBucket stored in an interface
// still means "no throttling"). Otherwise it returns nil once all n tokens
// have been consumed, or ctx.Err() if ctx is done while it is waiting for the
// bucket to refill. Tokens consumed by earlier instalments of a large request
// are not refunded on cancellation.
func (t *TokenBucket) Wait(ctx context.Context, n int) error {
	if t == nil || t.rate <= 0 || n <= 0 {
		return nil
	}
	remaining := float64(n)
	for {
		// The bucket never holds more than cap tokens, so waiting for more
		// than that at once would never succeed; take at most cap per pass.
		// NewTokenBucket guarantees cap > 0 whenever rate > 0.
		grant := remaining
		if grant > t.cap {
			grant = t.cap
		}

		t.mu.Lock()
		now := time.Now()
		t.tokens += now.Sub(t.last).Seconds() * t.rate
		t.last = now
		if t.tokens > t.cap {
			t.tokens = t.cap
		}
		if t.tokens >= grant {
			t.tokens -= grant
			t.mu.Unlock()
			remaining -= grant
			if remaining <= 0 {
				return nil
			}
			continue
		}
		deficit := grant - t.tokens
		t.mu.Unlock()

		// Sleep until the missing tokens are expected to have accumulated.
		// An explicit timer is stopped on cancellation rather than left to
		// expire on its own.
		timer := time.NewTimer(time.Duration(deficit / t.rate * float64(time.Second)))
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
}

View file

@ -0,0 +1,97 @@
package datasets
import (
"context"
"time"
"predictor-refactored/internal/weather"
)
// Source is a pluggable origin for atmospheric datasets.
//
// Implementations download dataset files in a transactional, resumable
// manner and load them as weather.WindField. A Source must be safe for
// concurrent use across multiple Manager calls.
type Source interface {
// ID returns a stable identifier for the source, e.g. "noaa-gfs-0p50".
ID() string
// LatestEpoch returns the most recent dataset epoch this source can provide.
LatestEpoch(ctx context.Context) (time.Time, error)
// Download fetches the dataset for epoch into store. Sources must honour
// any partial progress recorded in the manifest of the TempHandle obtained
// from store.BeginWrite and skip already-completed work, so re-invocation
// after a crash resumes cleanly.
//
// prog receives progress events; nil is acceptable.
// throttle, if non-nil, is consulted before each network read for
// bandwidth limiting; nil means no throttling.
Download(ctx context.Context, epoch time.Time, store Storage, prog ProgressSink, throttle Throttle) error
// Open loads epoch's stored dataset from store and returns it as a
// WindField.
Open(ctx context.Context, epoch time.Time, store Storage) (weather.WindField, error)
}
// Storage abstracts the on-disk location of dataset files and their manifests.
//
// Atomicity: only datasets promoted via TempHandle.Commit appear in Exists or
// List. Aborted or in-progress downloads are invisible to readers.
type Storage interface {
// SourceID identifies the data source these files belong to. Mixing
// sources in one Storage is not supported.
SourceID() string
// Path returns the canonical local path for epoch's committed dataset.
// The path is valid even when the dataset has not been written.
Path(epoch time.Time) string
// Exists reports whether a committed dataset for epoch is present.
Exists(epoch time.Time) bool
// List returns all committed epochs available, newest first.
List() ([]time.Time, error)
// Remove deletes the dataset and any sidecar manifest for epoch.
Remove(epoch time.Time) error
// BeginWrite opens (or resumes) a transactional handle for downloading
// epoch's dataset. Callers must Commit or Abort the returned handle.
BeginWrite(epoch time.Time) (TempHandle, error)
}
// TempHandle is the storage state for one in-progress download. It is
// obtained from Storage.BeginWrite and must be finished with Commit or Abort.
type TempHandle interface {
// Path returns the path of the in-progress file. Sources write directly here.
Path() string
// Manifest returns the tracker of completed work units used for resume
// support.
Manifest() *Manifest
// Commit promotes the temp file to its canonical location and removes
// the manifest. Subsequent calls are no-ops.
Commit() error
// Abort discards the temp file and manifest. Subsequent calls are no-ops.
Abort() error
}
// ProgressSink receives progress events during a download.
//
// All methods must be safe to call concurrently.
type ProgressSink interface {
// SetTotal sets the total number of work units this download expects.
// May be called multiple times if discovery happens incrementally.
SetTotal(n int)
// StepComplete records one work unit as completed.
StepComplete()
// Bytes records n bytes received from the network.
Bytes(n int64)
}
// Throttle is an optional bandwidth limiter consulted by sources before
// each network read. Source.Download treats a nil Throttle as "no
// throttling".
type Throttle interface {
// Wait blocks until n bytes can be consumed from the budget,
// or returns ctx's error if the context is cancelled while waiting.
Wait(ctx context.Context, n int) error
}