// predictor/internal/datasets/manager.go
//
// 466 lines
// 11 KiB
// Go

package datasets
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"predictor-refactored/internal/weather"
)
// JobStatus names the lifecycle state of a download job.
type JobStatus string

// Lifecycle states a download job can be in.
const (
	// JobPending is the status a job is created with, before its worker starts.
	JobPending = JobStatus("pending")
	// JobRunning marks a job whose download worker has started.
	JobRunning = JobStatus("running")
	// JobComplete marks a job that finished without error.
	JobComplete = JobStatus("complete")
	// JobFailed marks a job that ended with an error other than cancellation.
	JobFailed = JobStatus("failed")
	// JobCancelled marks a job that ended because its context was cancelled.
	JobCancelled = JobStatus("cancelled")
)
// JobInfo is a point-in-time, externally visible copy of a download job's
// state.
type JobInfo struct {
	// ID identifies the job; Source is the ID of the source that ran it.
	ID, Source string
	// Dataset is the dataset the job downloads.
	Dataset DatasetID
	// Status is the lifecycle state at snapshot time.
	Status JobStatus
	// StartedAt is when the job was created.
	StartedAt time.Time
	// EndedAt is nil until the job has recorded an end time.
	EndedAt *time.Time
	// Err holds the error text of a failed job and is empty otherwise.
	Err string
	// Total and Done are progress counters: the total last announced for the
	// job and the number of steps reported complete so far.
	Total, Done int
	// Bytes is the running sum of the byte counts reported for the job.
	Bytes int64
}
// jobEntry is the Manager's internal, mutable record of one download job.
type jobEntry struct {
	// Identity fields; set when the entry is built and only read afterwards.
	id, source string
	dataset    DatasetID
	startedAt  time.Time
	// cancel cancels the context the job's worker runs under.
	cancel context.CancelFunc

	// mu guards status, endedAt and errStr.
	mu      sync.Mutex
	status  JobStatus
	endedAt time.Time
	errStr  string

	// Progress counters; updated atomically, so they are read without mu.
	total, done, bytes atomic.Int64
}
// snapshot captures the entry's current state as a JobInfo value.
//
// The mutex-guarded fields are copied under e.mu; the progress counters are
// read atomically after the lock is released, so they may be slightly newer
// than the status they are reported with.
func (e *jobEntry) snapshot() JobInfo {
	var ended *time.Time

	e.mu.Lock()
	status, errText := e.status, e.errStr
	if at := e.endedAt; !at.IsZero() {
		// Hand out a pointer to a private copy, never to the entry's own field.
		ended = &at
	}
	e.mu.Unlock()

	return JobInfo{
		ID:        e.id,
		Source:    e.source,
		Dataset:   e.dataset,
		Status:    status,
		StartedAt: e.startedAt,
		EndedAt:   ended,
		Err:       errText,
		Total:     int(e.total.Load()),
		Done:      int(e.done.Load()),
		Bytes:     e.bytes.Load(),
	}
}
// jobProgress forwards progress callbacks to the atomic counters of a single
// jobEntry.
type jobProgress struct {
	e *jobEntry
}

// SetTotal overwrites the job's total with n.
func (jp jobProgress) SetTotal(n int) {
	jp.e.total.Store(int64(n))
}

// StepComplete increments the job's completed-step counter by one.
func (jp jobProgress) StepComplete() {
	jp.e.done.Add(1)
}

// Bytes adds n to the job's byte counter.
func (jp jobProgress) Bytes(n int64) {
	jp.e.bytes.Add(n)
}
// loadedDataset bundles a loaded WindField with its identity and coverage.
type loadedDataset struct {
	// ID identifies the dataset the field was opened from.
	ID DatasetID
	// Field is the opened wind field handed out to callers.
	Field weather.WindField
	// Coverage is the coverage the source reported for ID at load time.
	Coverage Coverage
}
// Manager coordinates dataset downloads and exposes the active WindFields.
type Manager struct {
	// src supplies dataset identity, coverage, opening and downloading.
	src Source
	// store is the dataset storage that src opens from and downloads into.
	store Storage
	// throttle is handed to src.Download unchanged.
	throttle Throttle
	// log is never nil when the Manager is built by New.
	log *zap.Logger
	// activeMu guards active.
	activeMu sync.RWMutex
	// active lists the loaded datasets; Load appends new entries to it.
	active []loadedDataset
	// jobsMu guards jobs.
	jobsMu sync.RWMutex
	// jobs indexes every job created by Download, keyed by job ID.
	jobs map[string]*jobEntry
	// inFlight tracks downloads that have not finished yet.
	inFlight sync.Map // key: dataset filename, value: jobID
}
// New builds a Manager around the given source, storage and throttle.
//
// A nil log is replaced with a no-op logger. If the source and the store
// report different IDs a warning is logged, but the Manager is still returned.
func New(src Source, store Storage, throttle Throttle, log *zap.Logger) *Manager {
	logger := log
	if logger == nil {
		logger = zap.NewNop()
	}

	if srcID, storeID := src.ID(), store.SourceID(); srcID != storeID {
		logger.Warn("source/store ID mismatch",
			zap.String("src", srcID),
			zap.String("store", storeID))
	}

	m := &Manager{jobs: make(map[string]*jobEntry)}
	m.src = src
	m.store = store
	m.throttle = throttle
	m.log = logger
	return m
}
// Source reports the ID of the source this Manager downloads from.
func (m *Manager) Source() string {
	id := m.src.ID()
	return id
}
// Active returns the WindField of the most recently loaded global dataset
// (one whose Subset reports IsGlobal).
//
// When no global dataset is loaded it falls back to the first loaded dataset,
// whatever its subset; it returns nil only when nothing is loaded at all. In
// cluster setups with only regional subsets, callers should use SelectFor.
func (m *Manager) Active() weather.WindField {
	m.activeMu.RLock()
	defer m.activeMu.RUnlock()

	// Load appends new datasets to m.active, so walking backwards visits the
	// most recently loaded dataset first. Scanning forwards would keep
	// returning a stale global dataset after a newer one has been loaded.
	for i := len(m.active) - 1; i >= 0; i-- {
		if m.active[i].ID.Subset.IsGlobal() {
			return m.active[i].Field
		}
	}
	if len(m.active) > 0 {
		return m.active[0].Field
	}
	return nil
}
// Ready reports whether Active currently yields a field, i.e. whether at
// least one dataset is loaded.
func (m *Manager) Ready() bool {
	field := m.Active()
	return field != nil
}
// SelectFor picks a loaded WindField for the query (t, lat, lng).
//
// The first loaded dataset whose coverage contains the query wins. If none
// covers it, the first loaded global dataset is returned as a permissive
// fallback. The result is nil when neither exists.
func (m *Manager) SelectFor(t time.Time, lat, lng float64) weather.WindField {
	m.activeMu.RLock()
	defer m.activeMu.RUnlock()

	// First pass: exact coverage match.
	for i := range m.active {
		if ds := &m.active[i]; ds.Coverage.Covers(t, lat, lng) {
			return ds.Field
		}
	}

	// Second pass: any global dataset is permissive about region.
	for i := range m.active {
		if ds := &m.active[i]; ds.ID.Subset.IsGlobal() {
			return ds.Field
		}
	}
	return nil
}
// LoadedDatasets reports the ID and coverage of every currently loaded
// dataset, in load-list order. The result is a fresh, non-nil slice.
func (m *Manager) LoadedDatasets() []LoadedDatasetInfo {
	m.activeMu.RLock()
	defer m.activeMu.RUnlock()

	infos := make([]LoadedDatasetInfo, len(m.active))
	for i := range m.active {
		infos[i] = LoadedDatasetInfo{
			ID:       m.active[i].ID,
			Coverage: m.active[i].Coverage,
		}
	}
	return infos
}
// LoadedDatasetInfo is a serializable snapshot of one active dataset.
type LoadedDatasetInfo struct {
	// ID identifies the loaded dataset.
	ID DatasetID
	// Coverage is the coverage recorded for the dataset when it was loaded.
	Coverage Coverage
}
// ListEpochs returns the datasets present in storage, exactly as the store
// lists them.
//
// NOTE(review): the intended order is newest first; that ordering comes from
// the Storage implementation and is not enforced here.
func (m *Manager) ListEpochs() ([]DatasetID, error) {
	ids, err := m.store.List()
	return ids, err
}
// ListJobs returns a snapshot of every job recorded since startup. The order
// follows map iteration and is therefore unspecified.
func (m *Manager) ListJobs() []JobInfo {
	m.jobsMu.RLock()
	defer m.jobsMu.RUnlock()

	snaps := make([]JobInfo, len(m.jobs))
	next := 0
	for _, entry := range m.jobs {
		snaps[next] = entry.snapshot()
		next++
	}
	return snaps
}
// GetJob looks up the job with the given ID and returns its snapshot. The
// boolean is false, and the JobInfo zero-valued, when no such job exists.
func (m *Manager) GetJob(id string) (JobInfo, bool) {
	m.jobsMu.RLock()
	entry, found := m.jobs[id]
	m.jobsMu.RUnlock()

	// The snapshot is taken outside jobsMu; the entry has its own lock.
	if found {
		return entry.snapshot(), true
	}
	return JobInfo{}, false
}
// CancelJob requests cancellation of the job with the given ID.
//
// It returns false when the job is unknown or already in a terminal state
// (complete, failed or cancelled). Otherwise it invokes the job's cancel
// function and returns true; the job's status changes only once its worker
// observes the cancellation.
func (m *Manager) CancelJob(id string) bool {
	m.jobsMu.RLock()
	entry, found := m.jobs[id]
	m.jobsMu.RUnlock()
	if !found {
		return false
	}

	entry.mu.Lock()
	status := entry.status
	entry.mu.Unlock()

	switch status {
	case JobComplete, JobFailed, JobCancelled:
		return false
	}
	entry.cancel()
	return true
}
// Remove deletes a stored dataset. If the dataset is currently loaded, it is
// unloaded and its field closed before the store is asked to remove it.
//
// The returned error is whatever the store's Remove reports.
func (m *Manager) Remove(id DatasetID) error {
	var removed []loadedDataset

	m.activeMu.Lock()
	// Filter in place, reusing the backing array of m.active.
	kept := m.active[:0]
	for i := range m.active {
		d := m.active[i]
		if d.ID.Equals(id) {
			removed = append(removed, d)
			continue
		}
		kept = append(kept, d)
	}
	// Zero the vacated tail so the backing array no longer references the
	// removed datasets' fields and they can be garbage collected.
	for i := len(kept); i < len(m.active); i++ {
		m.active[i] = loadedDataset{}
	}
	m.active = kept
	m.activeMu.Unlock()

	// Close outside the lock so a slow Close cannot stall readers. Every
	// matching entry is closed, not only the last one found.
	for i := range removed {
		closeField(removed[i].Field, m.log)
	}
	return m.store.Remove(id)
}
// Download ensures a background job exists for id and returns its job ID.
//
// If a job for the same dataset filename is already in flight, that job's ID
// is returned and nothing new is started. Otherwise a new job is recorded and
// a goroutine is launched: when the store already holds the dataset the job is
// simply marked complete, otherwise the real download runs.
func (m *Manager) Download(id DatasetID) string {
	filename := id.Filename()

	// Cheap early exit before generating a job ID.
	if v, found := m.inFlight.Load(filename); found {
		return v.(string)
	}

	newID := uuid.New().String()
	// LoadOrStore settles the race between concurrent callers: one wins, the
	// others adopt the winner's ID.
	if v, raced := m.inFlight.LoadOrStore(filename, newID); raced {
		return v.(string)
	}

	ctx, cancel := context.WithCancel(context.Background())
	entry := &jobEntry{
		id:        newID,
		source:    m.src.ID(),
		dataset:   id,
		startedAt: time.Now().UTC(),
		status:    JobPending,
		cancel:    cancel,
	}

	m.jobsMu.Lock()
	m.jobs[newID] = entry
	m.jobsMu.Unlock()

	worker := m.runDownload
	if m.store.Exists(id) {
		worker = m.completeShortCircuit
	}
	go worker(ctx, entry)
	return newID
}
// Load opens id's stored dataset and makes it available to predictions.
//
// It returns an error when the dataset is not in the store or the source
// cannot open it. If a dataset with the same ID is already loaded, the new
// field takes its slot and the old field is closed; otherwise the new dataset
// is appended to the load list. Both outcomes are logged.
func (m *Manager) Load(ctx context.Context, id DatasetID) error {
	if !m.store.Exists(id) {
		return fmt.Errorf("dataset %s not present on disk", id.Filename())
	}
	field, err := m.src.Open(ctx, id, m.store)
	if err != nil {
		return fmt.Errorf("open dataset: %w", err)
	}
	next := loadedDataset{ID: id, Field: field, Coverage: m.src.Coverage(id)}

	var (
		prev     weather.WindField
		replaced bool
	)
	m.activeMu.Lock()
	for i := range m.active {
		if m.active[i].ID.Equals(id) {
			prev = m.active[i].Field
			m.active[i] = next
			replaced = true
			break
		}
	}
	if !replaced {
		m.active = append(m.active, next)
	}
	m.activeMu.Unlock()

	// Close the superseded field only after it is out of the list and the
	// lock is released, matching Remove, so a slow Close cannot block readers.
	if replaced {
		closeField(prev, m.log)
	}

	m.log.Info("loaded dataset",
		zap.String("filename", id.Filename()),
		zap.String("source", m.src.ID()),
		zap.Bool("replaced", replaced))
	return nil
}
// Refresh ensures the freshest global dataset is downloaded and active.
//
// It proceeds in order and stops at the first step that succeeds:
//  1. If the active global dataset's epoch is younger than freshnessTTL,
//     nothing is done.
//  2. Stored global datasets within freshnessTTL are tried in the order the
//     store lists them; one that is already active, or that loads
//     successfully, ends the refresh.
//  3. Otherwise the source's latest epoch is downloaded (unless the active
//     global dataset is at least as new) and loaded once the job completes.
//
// It returns the ID of the download job started, or an empty string when
// nothing was scheduled. The only error returned is a failure to determine
// the latest epoch; listing and loading failures are logged and skipped.
func (m *Manager) Refresh(ctx context.Context, freshnessTTL time.Duration) (string, error) {
	if a := m.activeGlobal(); a != nil && time.Since(a.ID.Epoch) < freshnessTTL {
		return "", nil
	}

	datasets, err := m.store.List()
	if err != nil {
		// Best effort: fall through to the remote source, but leave a trace.
		m.log.Warn("list stored datasets", zap.Error(err))
		datasets = nil
	}
	for _, id := range datasets {
		if !id.Subset.IsGlobal() {
			continue
		}
		if time.Since(id.Epoch) > freshnessTTL {
			continue
		}
		if a := m.activeGlobal(); a != nil && a.ID.Equals(id) {
			return "", nil
		}
		if err := m.Load(ctx, id); err != nil {
			m.log.Warn("load stored dataset",
				zap.String("filename", id.Filename()),
				zap.Error(err))
			continue
		}
		return "", nil
	}

	latest, err := m.src.LatestEpoch(ctx)
	if err != nil {
		return "", fmt.Errorf("latest epoch: %w", err)
	}
	if a := m.activeGlobal(); a != nil && !latest.After(a.ID.Epoch) {
		return "", nil
	}

	id := DatasetID{Epoch: latest}
	jobID := m.Download(id)
	go m.loadAfterCompletion(jobID, id)
	return jobID, nil
}
// activeGlobal returns a copy of the most recently loaded global dataset, or
// nil when no global dataset is loaded.
func (m *Manager) activeGlobal() *loadedDataset {
	m.activeMu.RLock()
	defer m.activeMu.RUnlock()

	// Load appends new datasets, so the newest global entry sits nearest the
	// end. Scanning forwards would keep reporting an older global dataset
	// after a refresh, making Refresh judge freshness against a stale epoch.
	for i := len(m.active) - 1; i >= 0; i-- {
		if m.active[i].ID.Subset.IsGlobal() {
			d := m.active[i]
			return &d
		}
	}
	return nil
}
// loadAfterCompletion polls the job every two seconds until it reaches a
// terminal state, then loads id if the job completed.
//
// It returns without loading when the job is unknown, failed or was
// cancelled. A load failure is logged, not returned.
func (m *Manager) loadAfterCompletion(jobID string, id DatasetID) {
	const pollEvery = 2 * time.Second

	for ; ; time.Sleep(pollEvery) {
		job, found := m.GetJob(jobID)
		if !found {
			return
		}
		if job.Status == JobFailed || job.Status == JobCancelled {
			return
		}
		if job.Status == JobComplete {
			if err := m.Load(context.Background(), id); err != nil {
				m.log.Error("load after download", zap.Error(err))
			}
			return
		}
	}
}
// runDownload is the worker for a real download job. It marks the job
// running, performs the download under the storage lock, records the terminal
// status and end time, and finally clears the dataset's in-flight marker.
//
// Terminal status: cancelled when the error matches context.Canceled, failed
// (with the error text stored) for any other error, complete otherwise.
func (m *Manager) runDownload(ctx context.Context, e *jobEntry) {
	name := e.dataset.Filename()
	defer m.inFlight.Delete(name)

	e.mu.Lock()
	e.status = JobRunning
	e.mu.Unlock()

	m.log.Info("download started",
		zap.String("job", e.id),
		zap.String("dataset", name))

	err := m.downloadLocked(ctx, e)
	finished := time.Now().UTC()

	outcome := JobComplete
	if err != nil {
		if errors.Is(err, context.Canceled) {
			outcome = JobCancelled
		} else {
			outcome = JobFailed
		}
	}

	e.mu.Lock()
	e.endedAt = finished
	e.status = outcome
	if outcome == JobFailed {
		e.errStr = err.Error()
	}
	e.mu.Unlock()

	m.log.Info("download finished",
		zap.String("job", e.id),
		zap.String("status", string(outcome)),
		zap.NamedError("err", err))
}
// downloadLocked performs the source download while holding the lock obtained
// from the storage, releasing it on return.
//
// The lock is the storage's cross-process lock: it lets multiple replicas
// sharing a node-local dataset volume coordinate instead of each fetching
// ~9 GB. Once the lock is held, existence is checked again; if another
// replica committed the dataset while this one waited, the download is
// skipped and nil is returned so the caller can load the committed file.
func (m *Manager) downloadLocked(ctx context.Context, e *jobEntry) error {
	unlock, lockErr := m.store.Lock(ctx)
	if lockErr != nil {
		return fmt.Errorf("acquire download lock: %w", lockErr)
	}
	defer unlock()

	if !m.store.Exists(e.dataset) {
		return m.src.Download(ctx, e.dataset, m.store, jobProgress{e: e}, m.throttle)
	}

	m.log.Info("dataset committed by another instance while waiting; skipping download",
		zap.String("dataset", e.dataset.Filename()))
	return nil
}
// completeShortCircuit is the worker used when the dataset is already in the
// store: it marks the job complete with the current UTC time and clears the
// dataset's in-flight marker. The context is accepted only so the signature
// matches runDownload; it is not used.
func (m *Manager) completeShortCircuit(ctx context.Context, e *jobEntry) {
	// Deferred calls run last-in-first-out: the entry is unlocked first, then
	// the in-flight marker is removed.
	defer m.inFlight.Delete(e.dataset.Filename())
	_ = ctx

	finished := time.Now().UTC()

	e.mu.Lock()
	defer e.mu.Unlock()
	e.status, e.endedAt = JobComplete, finished
}
// Close cancels every recorded job's context, closes every loaded field and
// empties the load list. It always returns nil; close failures are logged by
// closeField.
func (m *Manager) Close() error {
	m.jobsMu.Lock()
	for id := range m.jobs {
		m.jobs[id].cancel()
	}
	m.jobsMu.Unlock()

	m.activeMu.Lock()
	for i := range m.active {
		closeField(m.active[i].Field, m.log)
	}
	m.active = nil
	m.activeMu.Unlock()

	return nil
}
// closeField closes f when its dynamic type provides a Close() error method;
// fields without one are left alone. A Close error is logged as a warning
// when log is non-nil and is otherwise dropped.
func closeField(f weather.WindField, log *zap.Logger) {
	closer, ok := f.(interface{ Close() error })
	if !ok || closer == nil {
		return
	}

	err := closer.Close()
	if err == nil || log == nil {
		return
	}
	log.Warn("close dataset", zap.Error(err))
}