feat: downloader

This commit is contained in:
Anatoly Antonov 2025-06-23 04:19:26 +03:00
parent b9c1a98895
commit 42e7924be9
37 changed files with 2422 additions and 94 deletions

View file

@ -1,7 +1,21 @@
package ds
import "time"
type PredictionParameters struct {
LaunchLatitude float64
LaunchLongitude float64
LaunchDatetime time.Time
LaunchAltitude float64
// Add other parameters as needed
}
type PredicitonResult struct {
}
Latitude float64
Longitude float64
Altitude float64
Timestamp time.Time
WindU float64
WindV float64
// Add other result fields as needed
}

View file

@ -23,7 +23,89 @@ func (e *ErrorCode) Error() string {
return e.Message
}
// IsErr checks if the given error is an ErrorCode
func IsErr(err error) bool {
_, ok := err.(*ErrorCode)
return ok
}
// AsErr converts error to ErrorCode if possible
func AsErr(err error) (*ErrorCode, bool) {
if err == nil {
return nil, false
}
errcode, ok := err.(*ErrorCode)
return errcode, ok
}
// Join combines multiple errors into a single ErrorCode
func Join(errs ...error) error {
if len(errs) == 0 {
return nil
}
var messages []string
var details []string
for _, err := range errs {
if err == nil {
continue
}
if errcode, ok := AsErr(err); ok {
messages = append(messages, errcode.Message)
if errcode.Details != "" {
details = append(details, errcode.Details)
}
} else {
messages = append(messages, err.Error())
}
}
if len(messages) == 0 {
return nil
}
// Use the first error's status code, or default to 500
statusCode := http.StatusInternalServerError
if len(errs) > 0 {
if errcode, ok := AsErr(errs[0]); ok {
statusCode = errcode.StatusCode
}
}
return New(statusCode, strings.Join(messages, "; "), details...)
}
// Wrap wraps an error with additional context
func Wrap(err error, message string) error {
if err == nil {
return nil
}
if errcode, ok := AsErr(err); ok {
return New(errcode.StatusCode, message, errcode.Message, errcode.Details)
}
return New(http.StatusInternalServerError, message, err.Error())
}
var (
ErrNoDataset = New(http.StatusNotFound, "no grib dataset found")
ErrOutOfBounds = New(http.StatusBadRequest, "requested time is out of bounds")
ErrNoDataset = New(http.StatusNotFound, "no grib dataset found")
ErrOutOfBounds = New(http.StatusBadRequest, "requested time is out of bounds")
ErrConfig = New(http.StatusInternalServerError, "configuration error")
ErrConfigInvalidEnv = New(http.StatusInternalServerError, "invalid environment configuration")
ErrConfigMissingRequired = New(http.StatusInternalServerError, "missing required configuration")
ErrRedis = New(http.StatusInternalServerError, "redis error")
ErrRedisLockAlreadyLocked = New(http.StatusConflict, "could not perform redis lock", "already locked")
ErrRedisCacheMiss = New(http.StatusNotFound, "cache miss", "key not found")
ErrRedisCacheCorrupted = New(http.StatusInternalServerError, "cache data corrupted", "invalid format")
ErrDownload = New(http.StatusInternalServerError, "download error")
ErrProcessing = New(http.StatusInternalServerError, "data processing error")
ErrNoCubeFilesFound = New(http.StatusNotFound, "no cube files found")
ErrNoValidCubeFilesFound = New(http.StatusNotFound, "no valid cube files found")
ErrLatestCubeFileIsTooOld = New(http.StatusNotFound, "latest cube file is too old")
ErrScheduler = New(http.StatusInternalServerError, "scheduler error")
ErrSchedulerInvalidJob = New(http.StatusBadRequest, "invalid job configuration")
ErrSchedulerTimeoutTooLong = New(http.StatusBadRequest, "job timeout too long", "timeout cannot exceed interval")
)

View file

@ -0,0 +1,81 @@
package errcodes
import (
"testing"
)
func TestSpecificErrorTypes(t *testing.T) {
// Test Redis lock error
err := ErrRedisLockAlreadyLocked
if !IsErr(err) {
t.Error("Expected IsErr to return true for ErrorCode")
}
errcode, ok := AsErr(err)
if !ok {
t.Error("Expected AsErr to return true for ErrorCode")
}
if errcode != ErrRedisLockAlreadyLocked {
t.Error("Expected AsErr to return the same error")
}
// Test Redis cache miss error
cacheErr := ErrRedisCacheMiss
if !IsErr(cacheErr) {
t.Error("Expected IsErr to return true for cache miss error")
}
// Test configuration error
configErr := ErrConfigInvalidEnv
if !IsErr(configErr) {
t.Error("Expected IsErr to return true for config error")
}
// Test scheduler error
schedulerErr := ErrSchedulerTimeoutTooLong
if !IsErr(schedulerErr) {
t.Error("Expected IsErr to return true for scheduler error")
}
}
func TestErrorChecking(t *testing.T) {
// Example of how to check for specific errors in practice
err := ErrRedisLockAlreadyLocked
// Check if it's a specific error type
if errcode, ok := AsErr(err); ok {
switch errcode {
case ErrRedisLockAlreadyLocked:
// Handle lock already locked case
t.Log("Handling lock already locked error")
case ErrRedisCacheMiss:
// Handle cache miss case
t.Log("Handling cache miss error")
case ErrRedisCacheCorrupted:
// Handle corrupted cache case
t.Log("Handling corrupted cache error")
default:
// Handle other error types
t.Log("Handling other error type")
}
}
}
func TestWrapFunction(t *testing.T) {
originalErr := ErrRedisCacheMiss
wrappedErr := Wrap(originalErr, "additional context")
if !IsErr(wrappedErr) {
t.Error("Expected wrapped error to be an ErrorCode")
}
errcode, ok := AsErr(wrappedErr)
if !ok {
t.Error("Expected AsErr to work with wrapped error")
}
// The wrapped error should have the same status code as the original
if errcode.StatusCode != ErrRedisCacheMiss.StatusCode {
t.Errorf("Expected status code %d, got %d", ErrRedisCacheMiss.StatusCode, errcode.StatusCode)
}
}

104
internal/pkg/grib/README.md Normal file
View file

@ -0,0 +1,104 @@
# GRIB Module
Этот модуль реализует функциональность для работы с GRIB-файлами, аналогичную tawhiri-downloader и tawhiri, но на Go.
## Основные возможности
- **Скачивание GRIB-файлов** с NOMADS (GFS прогнозы)
- **Сборка 5D-куба** (время, давление, широта, долгота, переменные u/v)
- **Эффективное хранение** с использованием mmap
- **Интерполяция** ветровых данных для произвольных координат и времени
- **Кэширование** результатов (in-memory + Redis)
- **Распределенные блокировки** для предотвращения дублирования загрузок
## Архитектура
### Основные компоненты
- **Downloader** - скачивает GRIB-файлы с NOMADS
- **Cube** - управляет 5D-массивом данных через mmap
- **Extractor** - выполняет интерполяцию данных
- **Cache** - кэширует результаты запросов
- **Service** - основной интерфейс для работы с модулем
### Структура данных
5D-куб содержит:
- **Время**: 17 временных срезов (0, 3, 6, ..., 48 часов)
- **Давление**: 34 уровня давления (1000, 975, 950, ..., 2 hPa)
- **Широта**: 361 точка (-90° до +90°)
- **Долгота**: 720 точек (0° до 359.5°)
- **Переменные**: u-ветер и v-ветер
## Использование
```go
// Создание сервиса
cfg := grib.ServiceConfig{
Dir: "/tmp/grib",
TTL: 24 * time.Hour,
CacheTTL: 1 * time.Hour,
Redis: redisClient,
Parallel: 4,
Client: &http.Client{Timeout: 30 * time.Second},
}
service, err := grib.New(cfg)
if err != nil {
log.Fatal(err)
}
defer service.Close()
// Обновление данных
err = service.Update(ctx)
// Извлечение ветровых данных
wind, err := service.Extract(ctx, lat, lon, alt, timestamp)
// wind[0] - u-компонента ветра
// wind[1] - v-компонента ветра
```
## Интерполяция
Модуль выполняет 16-точечную интерполяцию:
1. **Временная интерполяция** между двумя ближайшими срезами
2. **Интерполяция по давлению** между двумя ближайшими уровнями
3. **Билинейная интерполяция** по широте и долготе
## Кэширование
- **In-memory кэш**: быстрый доступ к недавно запрошенным данным
- **Redis кэш**: распределенное кэширование для множественных реплик
## Расписание обновлений
Рекомендуемая частота вызова `Update()`:
- **Каждые 6 часов** - для получения свежих GFS прогнозов
- **При запуске** - для загрузки начальных данных
- **По требованию** - при отсутствии данных для запрашиваемого времени
## Отличия от tawhiri
### Преимущества Go-реализации:
- **Высокая производительность** (mmap, конкурентные загрузки)
- **Эффективное использование памяти** (не загружает весь массив в RAM)
- **Горизонтальное масштабирование** (stateless, множество реплик)
- **Встроенное кэширование** (in-memory + Redis)
### Особенности:
- Использует `github.com/nilsmagnus/grib` вместо pygrib
- Реализует собственную логику интерполяции
- Поддерживает распределенные блокировки через Redis
## Конфигурация
### Переменные окружения:
- `PREDICTOR_GRIB_DATASET_URL` - URL источника данных (опционально)
### Параметры ServiceConfig:
- `Dir` - директория для хранения файлов
- `TTL` - время жизни данных (по умолчанию 24 часа)
- `CacheTTL` - время жизни кэша (по умолчанию 1 час)
- `Redis` - Redis клиент для блокировок и кэша
- `Parallel` - количество параллельных загрузок
- `Client` - HTTP клиент для загрузок

View file

@ -1,8 +1,6 @@
package grib
import (
"encoding/binary"
"math"
"sync"
"time"
)
@ -31,10 +29,3 @@ func (c *memCache) get(k uint64) (vec, bool) {
}
func (c *memCache) set(k uint64, v vec) { c.m.Store(k, item{v, time.Now().Add(c.ttl)}) }
func encodeVec(v vec) []byte {
var b [16]byte
binary.LittleEndian.PutUint64(b[:8], math.Float64bits(v[0]))
binary.LittleEndian.PutUint64(b[8:], math.Float64bits(v[1]))
return b[:]
}

View file

@ -1,24 +1,15 @@
package grib
import (
"fmt"
"net/url"
env "github.com/caarlos0/env/v11"
"time"
)
type Config struct {
DatasetURL url.URL `env:"DATASET_URL"`
}
func NewConfig(servicePrefix string) (*Config, error) {
cfg := &Config{}
if err := env.ParseWithOptions(cfg, env.Options{
PrefixTagName: fmt.Sprintf("%s_GRIB_", servicePrefix),
}); err != nil {
return nil, err
}
return cfg, nil
Dir string `env:"DIR" envDefault:"/tmp/grib"`
TTL time.Duration `env:"TTL" envDefault:"24h"`
CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"`
Parallel int `env:"PARALLEL" envDefault:"4"`
Timeout time.Duration `env:"TIMEOUT" envDefault:"30s"`
DatasetURL url.URL `env:"DATASET_URL" envDefault:"https://nomads.ncep.noaa.gov/"`
}

View file

@ -12,6 +12,7 @@ type cube struct {
mm mmap.MMap // readonly, U followed by V (float32 LE)
t, p, lat, lon int
bytesPerVar int64
file *os.File
}
func openCube(path string) (*cube, error) {
@ -22,6 +23,7 @@ func openCube(path string) (*cube, error) {
mm, err := mmap.Map(f, mmap.RDONLY, 0)
if err != nil {
f.Close()
return nil, err
}
@ -32,7 +34,7 @@ func openCube(path string) (*cube, error) {
nLon = 720
)
return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4)}, nil
return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4), file: f}, nil
}
func (c *cube) val(varIdx, ti, pi, y, x int) float32 {
@ -41,3 +43,13 @@ func (c *cube) val(varIdx, ti, pi, y, x int) float32 {
bits := binary.LittleEndian.Uint32(c.mm[off : off+4])
return math.Float32frombits(bits)
}
func (c *cube) Close() error {
if c.mm != nil {
c.mm.Unmap()
}
if c.file != nil {
return c.file.Close()
}
return nil
}

View file

@ -4,3 +4,10 @@ type dataset struct {
cube *cube
runUTC int64 // unix seconds
}
func (d *dataset) Close() error {
if d.cube != nil {
return d.cube.Close()
}
return nil
}

View file

@ -9,6 +9,7 @@ import (
"path/filepath"
"time"
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
"golang.org/x/sync/errgroup"
)
@ -42,7 +43,7 @@ func (d *Downloader) fetch(ctx context.Context, url, dst string) error {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("bad status %s", resp.Status)
return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status)
}
if _, err := io.Copy(f, resp.Body); err != nil {
return err

View file

@ -3,10 +3,12 @@ package grib
import (
"context"
"encoding/binary"
"fmt"
"math"
"net/http"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
@ -21,6 +23,12 @@ type RedisIface interface {
Get(key string) ([]byte, error)
}
type Service interface {
Update(ctx context.Context) error
Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error)
Close() error
}
type ServiceConfig struct {
Dir string
TTL time.Duration
@ -36,7 +44,7 @@ type service struct {
data atomic.Pointer[dataset]
}
func New(cfg ServiceConfig) (*service, error) {
func New(cfg ServiceConfig) (Service, error) {
if cfg.TTL == 0 {
cfg.TTL = 24 * time.Hour
}
@ -44,29 +52,134 @@ func New(cfg ServiceConfig) (*service, error) {
return nil, err
}
s := &service{cfg: cfg, cache: memCache{ttl: cfg.CacheTTL}}
// Try to load existing dataset on startup
if err := s.loadExistingDataset(); err != nil {
// Log error but don't fail startup - dataset will be loaded on first Update()
// This allows the service to start even if no data is available yet
}
return s, nil
}
// loadExistingDataset tries to load the most recent available dataset
func (s *service) loadExistingDataset() error {
// Find the most recent cube file
pattern := filepath.Join(s.cfg.Dir, "*.cube")
matches, err := filepath.Glob(pattern)
if err != nil {
return err
}
if len(matches) == 0 {
return errcodes.ErrNoCubeFilesFound
}
// Sort by modification time (newest first)
var latestFile string
var latestTime time.Time
for _, match := range matches {
info, err := os.Stat(match)
if err != nil {
continue
}
if info.ModTime().After(latestTime) {
latestTime = info.ModTime()
latestFile = match
}
}
if latestFile == "" {
return errcodes.ErrNoValidCubeFilesFound
}
// Check if the file is fresh enough
if time.Since(latestTime) > s.cfg.TTL {
return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old")
}
// Load the dataset
c, err := openCube(latestFile)
if err != nil {
return err
}
// Extract run time from filename
base := filepath.Base(latestFile)
runStr := strings.TrimSuffix(base, ".cube")
run, err := time.Parse("20060102_15", runStr)
if err != nil {
c.Close()
return err
}
ds := &dataset{cube: c, runUTC: run.Unix()}
s.data.Store(ds)
return nil
}
// Update() downloads missing GRIBs, assembles cube into a single mmapfile.
func (s *service) Update(ctx context.Context) error {
// Check if we already have fresh data
if d := s.data.Load(); d != nil {
runTime := time.Unix(d.runUTC, 0)
if time.Since(runTime) < s.cfg.TTL {
// Data is still fresh, no need to update
return nil
}
}
unlock, err := s.cfg.Redis.Lock(ctx, "grib-dl", 45*time.Minute)
if err != nil {
return err
}
defer unlock(ctx)
dl := downloader.Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client}
// Check again after acquiring lock (double-checked locking pattern)
if d := s.data.Load(); d != nil {
runTime := time.Unix(d.runUTC, 0)
if time.Since(runTime) < s.cfg.TTL {
// Another instance already updated the data
return nil
}
}
dl := Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client}
run := nearestRun(time.Now().UTC().Add(-4 * time.Hour))
// Check if we already have this run
cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube"
if _, err := os.Stat(cubePath); err == nil {
// File exists, check if it's fresh
info, err := os.Stat(cubePath)
if err == nil && time.Since(info.ModTime()) < s.cfg.TTL {
// File is fresh, just load it
c, err := openCube(cubePath)
if err != nil {
return err
}
ds := &dataset{cube: c, runUTC: run.Unix()}
s.data.Store(ds)
s.cache = memCache{ttl: s.cfg.CacheTTL}
return nil
}
}
// Download new data
if err := dl.Run(ctx, run); err != nil {
return err
}
cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube"
// Assemble cube if it doesn't exist
if _, err := os.Stat(cubePath); err != nil {
if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil {
return err
}
}
c, err := openCube(cubePath)
if err != nil {
return err
@ -101,26 +214,52 @@ func assembleCube(dir string, run time.Time, cubePath string) error {
for ti, step := range steps {
fn := filepath.Join(dir, fileName(run, step))
gf, err := griblib.Read(fn)
file, err := os.Open(fn)
if err != nil {
return err
}
for _, m := range gf.Messages {
if m.ParameterShortName != "u" && m.ParameterShortName != "v" {
messages, err := griblib.ReadMessages(file)
file.Close() // Close immediately after reading
if err != nil {
return err
}
for _, m := range messages {
// Check if this is a wind component (u or v)
// ParameterCategory 2 = momentum, ParameterNumber 2 = u-wind, 3 = v-wind
if m.Section4.ProductDefinitionTemplateNumber != 0 {
continue
}
if m.TypeOfFirstFixedSurface != 100 {
product := m.Section4.ProductDefinitionTemplate
if product.ParameterCategory != 2 {
continue
}
pIdx, ok := pIndex[int(m.PressureLevel)]
var varIdx int
switch product.ParameterNumber {
case 2: // u-wind
varIdx = 0
case 3: // v-wind
varIdx = 1
default:
continue
}
// Check if this is a pressure level (type 100)
if product.FirstSurface.Type != 100 {
continue
}
// Get pressure level in hPa
pressure := float64(product.FirstSurface.Value) / 100.0
pIdx, ok := pIndex[int(math.Round(pressure))]
if !ok {
continue
}
varIdx := 0
if m.ParameterShortName == "v" {
varIdx = 1
}
vals := m.Values
vals := m.Data()
// GRIB library returns scan north->south, west->east already in row-major order
raw := make([]byte, len(vals)*4)
for i, v := range vals {
@ -142,13 +281,56 @@ func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Ti
if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(48*time.Hour)) {
return zero, errcodes.ErrOutOfBounds
}
// Try memory cache first
key := encodeKey(lat, lon, alt, ts)
if v, ok := s.cache.get(key); ok {
return [2]float64(v), nil
}
// Try Redis cache
redisKey := fmt.Sprintf("grib:extract:%d", key)
if cached, err := s.cfg.Redis.Get(redisKey); err == nil {
var result [2]float64
if len(cached) == 16 {
result[0] = math.Float64frombits(binary.LittleEndian.Uint64(cached[:8]))
result[1] = math.Float64frombits(binary.LittleEndian.Uint64(cached[8:]))
s.cache.set(key, vec(result))
return result, nil
} else {
// Cache data is corrupted (wrong length)
return zero, errcodes.ErrRedisCacheCorrupted
}
} else {
// Check if it's a cache miss (expected error)
if errcode, ok := errcodes.AsErr(err); ok && errcode == errcodes.ErrRedisCacheMiss {
// Cache miss is expected, continue with calculation
} else {
// Unexpected error, return it
return zero, err
}
}
// Calculate result
td := ts.Sub(time.Unix(d.runUTC, 0)).Hours()
u, v := d.uv(lat, lon, alt, td)
out := [2]float64{u, v}
// Cache in memory
s.cache.set(key, vec(out))
// Cache in Redis
encoded := make([]byte, 16)
binary.LittleEndian.PutUint64(encoded[:8], math.Float64bits(out[0]))
binary.LittleEndian.PutUint64(encoded[8:], math.Float64bits(out[1]))
s.cfg.Redis.Set(redisKey, encoded, s.cfg.CacheTTL)
return out, nil
}
func (s *service) Close() error {
if d := s.data.Load(); d != nil {
return d.Close()
}
return nil
}

View file

@ -0,0 +1,62 @@
package grib
import (
"context"
"testing"
"time"
)
func TestServiceCreation(t *testing.T) {
cfg := ServiceConfig{
Dir: "/tmp/grib_test",
TTL: 24 * time.Hour,
CacheTTL: 1 * time.Hour,
Redis: &MockRedis{},
Parallel: 2,
}
service, err := New(cfg)
if err != nil {
t.Fatalf("Failed to create service: %v", err)
}
defer service.Close()
if service == nil {
t.Fatal("Service is nil")
}
}
func TestNearestRun(t *testing.T) {
now := time.Date(2024, 1, 15, 14, 30, 0, 0, time.UTC)
expected := time.Date(2024, 1, 15, 12, 0, 0, 0, time.UTC)
result := nearestRun(now)
if !result.Equal(expected) {
t.Errorf("Expected %v, got %v", expected, result)
}
}
func TestPressureFromAlt(t *testing.T) {
alt := 10000.0 // 10km
pressure := pressureFromAlt(alt)
// At 10km, pressure should be around 264 hPa
if pressure < 200 || pressure > 300 {
t.Errorf("Unexpected pressure at 10km: %f hPa", pressure)
}
}
// MockRedis for testing
type MockRedis struct{}
func (m *MockRedis) Lock(ctx context.Context, key string, ttl time.Duration) (func(context.Context), error) {
return func(ctx context.Context) {}, nil
}
func (m *MockRedis) Set(key string, value []byte, ttl time.Duration) error {
return nil
}
func (m *MockRedis) Get(key string) ([]byte, error) {
return nil, nil
}