forked from gsn/predictor
feat: cleanup
This commit is contained in:
parent
8e9f117799
commit
82ef1cb3b8
66 changed files with 0 additions and 9521 deletions
|
|
@ -1,23 +0,0 @@
|
|||
package updater
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
env "github.com/caarlos0/env/v11"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Interval time.Duration `env:"INTERVAL" envDefault:"6h"`
|
||||
Timeout time.Duration `env:"TIMEOUT" envDefault:"45m"`
|
||||
}
|
||||
|
||||
func NewConfig() (*Config, error) {
|
||||
cfg := &Config{}
|
||||
if err := env.ParseWithOptions(cfg, env.Options{
|
||||
PrefixTagName: "GSN_PREDICTOR_GRIB_UPDATER_",
|
||||
}); err != nil {
|
||||
return nil, errcodes.Wrap(err, "failed to parse GRIB updater config")
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
|
@ -1,8 +0,0 @@
|
|||
package updater
|
||||
|
||||
import "context"
|
||||
|
||||
// GribService defines the interface for GRIB operations needed by the updater job
|
||||
type GribService interface {
|
||||
Update(ctx context.Context) error
|
||||
}
|
||||
|
|
@ -1,51 +0,0 @@
|
|||
package updater
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Job struct {
|
||||
service GribService
|
||||
config *Config
|
||||
}
|
||||
|
||||
func New(service GribService, config *Config) *Job {
|
||||
return &Job{
|
||||
service: service,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Job) GetInterval() time.Duration {
|
||||
return j.config.Interval
|
||||
}
|
||||
|
||||
func (j *Job) GetTimeout() time.Duration {
|
||||
return j.config.Timeout
|
||||
}
|
||||
|
||||
func (j *Job) GetCount() int {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (j *Job) GetAsync() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (j *Job) Execute(ctx context.Context) error {
|
||||
log := log.Ctx(ctx)
|
||||
log.Info("executing GRIB update job")
|
||||
|
||||
if err := j.service.Update(ctx); err != nil {
|
||||
log.Error("GRIB update failed", zap.Error(err))
|
||||
return errcodes.Wrap(err, "failed to update GRIB data")
|
||||
}
|
||||
|
||||
log.Info("GRIB update completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,96 +0,0 @@
|
|||
package ds
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
api "git.intra.yksa.space/gsn/predictor/pkg/rest"
|
||||
)
|
||||
|
||||
type PredictionParameters struct {
|
||||
LaunchLatitude *float64
|
||||
LaunchLongitude *float64
|
||||
LaunchDatetime *time.Time
|
||||
LaunchAltitude *float64
|
||||
Profile *string
|
||||
AscentRate *float64
|
||||
BurstAltitude *float64
|
||||
DescentRate *float64
|
||||
FloatAltitude *float64
|
||||
StopDatetime *time.Time
|
||||
AscentCurve *string // base64
|
||||
DescentCurve *string // base64
|
||||
SimulateStages []string
|
||||
Interpolate *bool
|
||||
Format *string
|
||||
Dataset *time.Time
|
||||
// Add other parameters as needed
|
||||
}
|
||||
|
||||
type PredicitonResult struct {
|
||||
Latitude *float64
|
||||
Longitude *float64
|
||||
Altitude *float64
|
||||
Timestamp *time.Time
|
||||
WindU *float64
|
||||
WindV *float64
|
||||
// Add other result fields as needed
|
||||
}
|
||||
|
||||
// Converts flat ogen params to internal pointer-based model
|
||||
func ConvertFlatPredictionParams(params api.PerformPredictionParams) *PredictionParameters {
|
||||
out := &PredictionParameters{}
|
||||
if v, ok := params.LaunchLatitude.Get(); ok {
|
||||
out.LaunchLatitude = &v
|
||||
}
|
||||
if v, ok := params.LaunchLongitude.Get(); ok {
|
||||
out.LaunchLongitude = &v
|
||||
}
|
||||
if v, ok := params.LaunchDatetime.Get(); ok {
|
||||
out.LaunchDatetime = &v
|
||||
}
|
||||
if v, ok := params.LaunchAltitude.Get(); ok {
|
||||
out.LaunchAltitude = &v
|
||||
}
|
||||
if v, ok := params.Profile.Get(); ok {
|
||||
s := string(v)
|
||||
out.Profile = &s
|
||||
}
|
||||
if v, ok := params.AscentRate.Get(); ok {
|
||||
out.AscentRate = &v
|
||||
}
|
||||
if v, ok := params.BurstAltitude.Get(); ok {
|
||||
out.BurstAltitude = &v
|
||||
}
|
||||
if v, ok := params.DescentRate.Get(); ok {
|
||||
out.DescentRate = &v
|
||||
}
|
||||
if v, ok := params.FloatAltitude.Get(); ok {
|
||||
out.FloatAltitude = &v
|
||||
}
|
||||
if v, ok := params.StopDatetime.Get(); ok {
|
||||
out.StopDatetime = &v
|
||||
}
|
||||
if v, ok := params.AscentCurve.Get(); ok {
|
||||
out.AscentCurve = &v
|
||||
}
|
||||
if v, ok := params.DescentCurve.Get(); ok {
|
||||
out.DescentCurve = &v
|
||||
}
|
||||
if v, ok := params.Interpolate.Get(); ok {
|
||||
out.Interpolate = &v
|
||||
}
|
||||
if v, ok := params.Format.Get(); ok {
|
||||
s := string(v)
|
||||
out.Format = &s
|
||||
}
|
||||
if v, ok := params.Dataset.Get(); ok {
|
||||
out.Dataset = &v
|
||||
}
|
||||
if len(params.SimulateStages) > 0 {
|
||||
out.SimulateStages = make([]string, len(params.SimulateStages))
|
||||
for i, stage := range params.SimulateStages {
|
||||
out.SimulateStages[i] = string(stage)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
|
@ -1,102 +0,0 @@
|
|||
package errcodes
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type ErrorCode struct {
|
||||
StatusCode int
|
||||
Message string
|
||||
Details string
|
||||
}
|
||||
|
||||
func New(statusCode int, message string, details ...string) *ErrorCode {
|
||||
return &ErrorCode{
|
||||
StatusCode: statusCode,
|
||||
Message: message,
|
||||
Details: strings.Join(details, " "),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ErrorCode) Error() string {
|
||||
return e.Message
|
||||
}
|
||||
|
||||
func IsErr(err error) bool {
|
||||
_, ok := err.(*ErrorCode)
|
||||
return ok
|
||||
}
|
||||
|
||||
func AsErr(err error) (*ErrorCode, bool) {
|
||||
if err == nil {
|
||||
return nil, false
|
||||
}
|
||||
errcode, ok := err.(*ErrorCode)
|
||||
return errcode, ok
|
||||
}
|
||||
|
||||
func Join(errs ...error) error {
|
||||
if len(errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var messages []string
|
||||
var details []string
|
||||
|
||||
for _, err := range errs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if errcode, ok := AsErr(err); ok {
|
||||
messages = append(messages, errcode.Message)
|
||||
if errcode.Details != "" {
|
||||
details = append(details, errcode.Details)
|
||||
}
|
||||
} else {
|
||||
messages = append(messages, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if len(messages) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
statusCode := http.StatusInternalServerError
|
||||
if len(errs) > 0 {
|
||||
if errcode, ok := AsErr(errs[0]); ok {
|
||||
statusCode = errcode.StatusCode
|
||||
}
|
||||
}
|
||||
|
||||
return New(statusCode, strings.Join(messages, "; "), details...)
|
||||
}
|
||||
|
||||
func Wrap(err error, message string) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if errcode, ok := AsErr(err); ok {
|
||||
return New(errcode.StatusCode, message, errcode.Message, errcode.Details)
|
||||
}
|
||||
|
||||
return New(http.StatusInternalServerError, message, err.Error())
|
||||
}
|
||||
|
||||
var (
|
||||
ErrNoDataset = New(http.StatusNotFound, "no grib dataset found")
|
||||
ErrOutOfBounds = New(http.StatusBadRequest, "requested time is out of bounds")
|
||||
ErrConfig = New(http.StatusInternalServerError, "configuration error")
|
||||
ErrConfigInvalidEnv = New(http.StatusInternalServerError, "invalid environment configuration")
|
||||
ErrConfigMissingRequired = New(http.StatusInternalServerError, "missing required configuration")
|
||||
ErrDownload = New(http.StatusInternalServerError, "download error")
|
||||
ErrProcessing = New(http.StatusInternalServerError, "data processing error")
|
||||
ErrNoCubeFilesFound = New(http.StatusNotFound, "no cube files found")
|
||||
ErrNoValidCubeFilesFound = New(http.StatusNotFound, "no valid cube files found")
|
||||
ErrLatestCubeFileIsTooOld = New(http.StatusNotFound, "latest cube file is too old")
|
||||
ErrScheduler = New(http.StatusInternalServerError, "scheduler error")
|
||||
ErrSchedulerInvalidJob = New(http.StatusBadRequest, "invalid job configuration")
|
||||
ErrSchedulerTimeoutTooLong = New(http.StatusBadRequest, "job timeout too long", "timeout cannot exceed interval")
|
||||
)
|
||||
|
|
@ -1,100 +0,0 @@
|
|||
# GRIB Module
|
||||
|
||||
Этот модуль реализует функциональность для работы с GRIB-файлами, аналогичную tawhiri-downloader и tawhiri, но на Go.
|
||||
|
||||
## Основные возможности
|
||||
|
||||
- **Скачивание GRIB-файлов** с NOMADS (GFS прогнозы)
|
||||
- **Сборка 5D-куба** (время, давление, широта, долгота, переменные u/v)
|
||||
- **Эффективное хранение** с использованием mmap
|
||||
- **Интерполяция** ветровых данных для произвольных координат и времени
|
||||
- **Кэширование** результатов (in-memory)
|
||||
- **Распределенные блокировки** для предотвращения дублирования загрузок
|
||||
|
||||
## Архитектура
|
||||
|
||||
### Основные компоненты
|
||||
|
||||
- **Downloader** - скачивает GRIB-файлы с NOMADS
|
||||
- **Cube** - управляет 5D-массивом данных через mmap
|
||||
- **Extractor** - выполняет интерполяцию данных
|
||||
- **Cache** - кэширует результаты запросов
|
||||
- **Service** - основной интерфейс для работы с модулем
|
||||
|
||||
### Структура данных
|
||||
|
||||
5D-куб содержит:
|
||||
- **Время**: 17 временных срезов (0, 3, 6, ..., 48 часов)
|
||||
- **Давление**: 34 уровня давления (1000, 975, 950, ..., 2 hPa)
|
||||
- **Широта**: 361 точка (-90° до +90°)
|
||||
- **Долгота**: 720 точек (0° до 359.5°)
|
||||
- **Переменные**: u-ветер и v-ветер
|
||||
|
||||
## Использование
|
||||
|
||||
```go
|
||||
// Создание сервиса
|
||||
cfg := grib.ServiceConfig{
|
||||
Dir: "/tmp/grib",
|
||||
TTL: 24 * time.Hour,
|
||||
CacheTTL: 1 * time.Hour,
|
||||
Parallel: 4,
|
||||
Client: &http.Client{Timeout: 30 * time.Second},
|
||||
}
|
||||
|
||||
service, err := grib.New(cfg)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer service.Close()
|
||||
|
||||
// Обновление данных
|
||||
err = service.Update(ctx)
|
||||
|
||||
// Извлечение ветровых данных
|
||||
wind, err := service.Extract(ctx, lat, lon, alt, timestamp)
|
||||
// wind[0] - u-компонента ветра
|
||||
// wind[1] - v-компонента ветра
|
||||
```
|
||||
|
||||
## Интерполяция
|
||||
|
||||
Модуль выполняет 16-точечную интерполяцию:
|
||||
1. **Временная интерполяция** между двумя ближайшими срезами
|
||||
2. **Интерполяция по давлению** между двумя ближайшими уровнями
|
||||
3. **Билинейная интерполяция** по широте и долготе
|
||||
|
||||
## Кэширование
|
||||
|
||||
- **In-memory кэш**: быстрый доступ к недавно запрошенным данным
|
||||
|
||||
## Расписание обновлений
|
||||
|
||||
Рекомендуемая частота вызова `Update()`:
|
||||
- **Каждые 6 часов** - для получения свежих GFS прогнозов
|
||||
- **При запуске** - для загрузки начальных данных
|
||||
- **По требованию** - при отсутствии данных для запрашиваемого времени
|
||||
|
||||
## Отличия от tawhiri
|
||||
|
||||
### Преимущества Go-реализации:
|
||||
- **Высокая производительность** (mmap, конкурентные загрузки)
|
||||
- **Эффективное использование памяти** (не загружает весь массив в RAM)
|
||||
- **Горизонтальное масштабирование** (stateless, множество реплик)
|
||||
- **Встроенное кэширование** (in-memory)
|
||||
|
||||
### Особенности:
|
||||
- Использует `github.com/nilsmagnus/grib` вместо pygrib
|
||||
- Реализует собственную логику интерполяции
|
||||
|
||||
## Конфигурация
|
||||
|
||||
### Переменные окружения:
|
||||
- `PREDICTOR_GRIB_DATASET_URL` - URL источника данных (опционально)
|
||||
|
||||
### Параметры ServiceConfig:
|
||||
- `Dir` - директория для хранения файлов
|
||||
- `TTL` - время жизни данных (по умолчанию 24 часа)
|
||||
- `CacheTTL` - время жизни кэша (по умолчанию 1 час)
|
||||
- `Parallel` - количество параллельных загрузок
|
||||
- `Client` - HTTP клиент для загрузок
|
||||
|
|
@ -1,25 +0,0 @@
|
|||
package grib
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestAssembleCubeFromExisting(t *testing.T) {
|
||||
dir := "C:/tmp/grib"
|
||||
run := time.Date(2026, 1, 16, 6, 0, 0, 0, time.UTC)
|
||||
cubePath := dir + "/" + run.Format("20060102_15") + ".cube"
|
||||
|
||||
t.Logf("Assembling cube from existing GRIB files...")
|
||||
t.Logf("Directory: %s", dir)
|
||||
t.Logf("Run: %s", run.Format("2006-01-02 15:04 MST"))
|
||||
t.Logf("Output: %s", cubePath)
|
||||
|
||||
err := assembleCube(dir, run, cubePath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to assemble cube: %v", err)
|
||||
}
|
||||
|
||||
t.Logf("✓ Cube assembled successfully!")
|
||||
t.Logf("Cube file: %s", cubePath)
|
||||
}
|
||||
|
|
@ -1,36 +0,0 @@
|
|||
package grib
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type vec [2]float64
|
||||
|
||||
type item struct {
|
||||
v vec
|
||||
exp time.Time
|
||||
}
|
||||
|
||||
type memCache struct {
|
||||
ttl time.Duration
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
func (c *memCache) get(k uint64) (vec, bool) {
|
||||
if v, ok := c.m.Load(k); ok {
|
||||
it := v.(item)
|
||||
|
||||
if time.Now().Before(it.exp) {
|
||||
return it.v, true
|
||||
}
|
||||
|
||||
c.m.Delete(k)
|
||||
}
|
||||
|
||||
return vec{}, false
|
||||
}
|
||||
|
||||
func (c *memCache) set(k uint64, v vec) {
|
||||
c.m.Store(k, item{v, time.Now().Add(c.ttl)})
|
||||
}
|
||||
|
|
@ -1,139 +0,0 @@
|
|||
package grib
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
env "github.com/caarlos0/env/v11"
|
||||
)
|
||||
|
||||
// DatasetConfig описывает параметры GFS-датасета: сетку, временные шаги,
|
||||
// уровни давления и URL для загрузки.
|
||||
type DatasetConfig struct {
|
||||
// Сетка
|
||||
Resolution float64 // шаг сетки в градусах (0.25 или 0.5)
|
||||
NLat int // точек по широте (721 для 0.25°, 361 для 0.5°)
|
||||
NLon int // точек по долготе (1440 для 0.25°, 720 для 0.5°)
|
||||
|
||||
// Время
|
||||
NT int // кол-во временных шагов (97 для 0–96 ч с шагом 1)
|
||||
MaxHour int // последний час прогноза (96)
|
||||
TimeStep int // интервал между шагами, часы (1 или 3)
|
||||
|
||||
// Вертикаль
|
||||
NP int // кол-во уровней давления
|
||||
Levels []float64 // уровни давления в гПа, по убыванию (1000 … 1)
|
||||
|
||||
// Переменные в кубе (порядок важен: индексы 0, 1, 2, …)
|
||||
NVar int // кол-во переменных
|
||||
Variables []string // GRIB-имена для фильтрации idx (HGT, UGRD, VGRD)
|
||||
|
||||
// URL загрузки (fmt-шаблоны: date, hour, hour, step)
|
||||
URLMask string // основной pgrb2
|
||||
URLMaskB string // дополнительный pgrb2b
|
||||
|
||||
// Имена файлов
|
||||
FileSuffix string // токен разрешения в именах файлов ("0p25", "0p50")
|
||||
}
|
||||
|
||||
// SizePerVar возвращает размер одной переменной в кубе, байт.
|
||||
func (dc *DatasetConfig) SizePerVar() int64 {
|
||||
return int64(dc.NT) * int64(dc.NP) * int64(dc.NLat) * int64(dc.NLon) * 4
|
||||
}
|
||||
|
||||
// CubeSize возвращает полный размер куба, байт.
|
||||
func (dc *DatasetConfig) CubeSize() int64 {
|
||||
return dc.SizePerVar() * int64(dc.NVar)
|
||||
}
|
||||
|
||||
// GridSize возвращает NLat * NLon.
|
||||
func (dc *DatasetConfig) GridSize() int {
|
||||
return dc.NLat * dc.NLon
|
||||
}
|
||||
|
||||
// InvResolution возвращает 1/Resolution — множитель для перевода координат в индексы.
|
||||
func (dc *DatasetConfig) InvResolution() float64 {
|
||||
return 1.0 / dc.Resolution
|
||||
}
|
||||
|
||||
// Steps возвращает список часов прогноза [0, TimeStep, 2*TimeStep, …, MaxHour].
|
||||
func (dc *DatasetConfig) Steps() []int {
|
||||
out := make([]int, 0, dc.NT)
|
||||
for h := 0; h <= dc.MaxHour; h += dc.TimeStep {
|
||||
out = append(out, h)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// FileName возвращает имя основного GRIB-файла (pgrb2).
|
||||
func (dc *DatasetConfig) FileName(run time.Time, step int) string {
|
||||
return fmt.Sprintf("gfs.t%02dz.pgrb2.%s.f%03d", run.Hour(), dc.FileSuffix, step)
|
||||
}
|
||||
|
||||
// FileNameB возвращает имя вторичного GRIB-файла (pgrb2b).
|
||||
func (dc *DatasetConfig) FileNameB(run time.Time, step int) string {
|
||||
return fmt.Sprintf("gfs.t%02dz.pgrb2b.%s.f%03d", run.Hour(), dc.FileSuffix, step)
|
||||
}
|
||||
|
||||
// GribURL возвращает URL основного GRIB-файла.
|
||||
func (dc *DatasetConfig) GribURL(run time.Time, step int) string {
|
||||
return fmt.Sprintf(dc.URLMask, run.Format("20060102"), run.Hour(), run.Hour(), step)
|
||||
}
|
||||
|
||||
// GribURLB возвращает URL вторичного GRIB-файла.
|
||||
func (dc *DatasetConfig) GribURLB(run time.Time, step int) string {
|
||||
return fmt.Sprintf(dc.URLMaskB, run.Format("20060102"), run.Hour(), run.Hour(), step)
|
||||
}
|
||||
|
||||
// DefaultDatasetConfig возвращает конфиг GFS 0.25° / 1 час / 47 уровней.
|
||||
func DefaultDatasetConfig() DatasetConfig {
|
||||
return DatasetConfig{
|
||||
Resolution: 0.25,
|
||||
NLat: 721,
|
||||
NLon: 1440,
|
||||
|
||||
NT: 97,
|
||||
MaxHour: 96,
|
||||
TimeStep: 1,
|
||||
|
||||
NP: 47,
|
||||
Levels: []float64{
|
||||
1000, 975, 950, 925, 900, 875, 850, 825, 800, 775,
|
||||
750, 725, 700, 675, 650, 625, 600, 575, 550, 525,
|
||||
500, 475, 450, 425, 400, 375, 350, 325, 300, 275,
|
||||
250, 225, 200, 175, 150, 125, 100, 70, 50, 30,
|
||||
20, 10, 7, 5, 3, 2, 1,
|
||||
},
|
||||
|
||||
NVar: 3,
|
||||
Variables: []string{"HGT", "UGRD", "VGRD"},
|
||||
|
||||
URLMask: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p25.f%03d",
|
||||
URLMaskB: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2b.0p25.f%03d",
|
||||
|
||||
FileSuffix: "0p25",
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type Config struct {
|
||||
Dir string `env:"DIR" envDefault:"C:/tmp/grib"`
|
||||
TTL time.Duration `env:"TTL" envDefault:"48h"`
|
||||
CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"`
|
||||
Parallel int `env:"PARALLEL" envDefault:"8"`
|
||||
|
||||
Dataset DatasetConfig
|
||||
}
|
||||
|
||||
func NewConfig() (*Config, error) {
|
||||
cfg := &Config{}
|
||||
if err := env.ParseWithOptions(cfg, env.Options{
|
||||
PrefixTagName: "GSN_PREDICTOR_GRIB_",
|
||||
}); err != nil {
|
||||
return nil, errcodes.Wrap(err, "failed to parse GRIB config")
|
||||
}
|
||||
cfg.Dataset = DefaultDatasetConfig()
|
||||
return cfg, nil
|
||||
}
|
||||
|
|
@ -1,56 +0,0 @@
|
|||
package grib
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"os"
|
||||
|
||||
mmap "github.com/edsrzf/mmap-go"
|
||||
)
|
||||
|
||||
type cube struct {
|
||||
mm mmap.MMap
|
||||
t, p, lat, lon int
|
||||
bytesPerVar int64
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func openCube(path string, dc *DatasetConfig) (*cube, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mm, err := mmap.Map(f, mmap.RDONLY, 0)
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &cube{
|
||||
mm: mm,
|
||||
t: dc.NT,
|
||||
p: dc.NP,
|
||||
lat: dc.NLat,
|
||||
lon: dc.NLon,
|
||||
bytesPerVar: dc.SizePerVar(),
|
||||
file: f,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *cube) val(varIdx, ti, pi, y, x int) float32 {
|
||||
idx := (((ti*c.p+pi)*c.lat + y) * c.lon) + x
|
||||
off := int64(varIdx)*c.bytesPerVar + int64(idx)*4
|
||||
bits := binary.LittleEndian.Uint32(c.mm[off : off+4])
|
||||
return math.Float32frombits(bits)
|
||||
}
|
||||
|
||||
func (c *cube) Close() error {
|
||||
if c.mm != nil {
|
||||
c.mm.Unmap()
|
||||
}
|
||||
if c.file != nil {
|
||||
return c.file.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
package grib
|
||||
|
||||
type dataset struct {
|
||||
cube *cube
|
||||
ds *DatasetConfig
|
||||
runUTC int64 // unix seconds
|
||||
}
|
||||
|
||||
func (d *dataset) Close() error {
|
||||
if d.cube != nil {
|
||||
return d.cube.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,126 +0,0 @@
|
|||
package grib
|
||||
|
||||
import "math"
|
||||
|
||||
func lerp(a, b, t float64) float64 { return a + t*(b-a) }
|
||||
|
||||
// ghInterp returns interpolated geopotential height at given time/pressure/lat/lon
|
||||
func (d *dataset) ghInterp(ti, pi int, y0, y1, x0, x1 int, wy, wx float64) float64 {
|
||||
g00 := d.cube.val(0, ti, pi, y0, x0)
|
||||
g10 := d.cube.val(0, ti, pi, y0, x1)
|
||||
g01 := d.cube.val(0, ti, pi, y1, x0)
|
||||
g11 := d.cube.val(0, ti, pi, y1, x1)
|
||||
return (1-wy)*((1-wx)*float64(g00)+wx*float64(g10)) + wy*((1-wx)*float64(g01)+wx*float64(g11))
|
||||
}
|
||||
|
||||
// searchAltLevel uses geopotential height to find pressure level bracket for target altitude.
|
||||
func (d *dataset) searchAltLevel(alt float64, ti, y0, y1, x0, x1 int, wy, wx float64) (int, float64) {
|
||||
levels := d.ds.Levels
|
||||
nLevels := len(levels)
|
||||
|
||||
lo, hi := 0, nLevels-1
|
||||
for lo < hi-1 {
|
||||
mid := (lo + hi) / 2
|
||||
ghMid := d.ghInterp(ti, mid, y0, y1, x0, x1, wy, wx)
|
||||
if ghMid < alt {
|
||||
lo = mid
|
||||
} else {
|
||||
hi = mid
|
||||
}
|
||||
}
|
||||
|
||||
ghLo := d.ghInterp(ti, lo, y0, y1, x0, x1, wy, wx)
|
||||
ghHi := d.ghInterp(ti, hi, y0, y1, x0, x1, wy, wx)
|
||||
|
||||
wp := 0.0
|
||||
if ghHi != ghLo {
|
||||
wp = (alt - ghLo) / (ghHi - ghLo)
|
||||
}
|
||||
if wp < 0 {
|
||||
wp = 0
|
||||
}
|
||||
if wp > 1 {
|
||||
wp = 1
|
||||
}
|
||||
|
||||
return lo, wp
|
||||
}
|
||||
|
||||
// uv выполняет интерполяцию ветра по 4 измерениям (time, pressure, lat, lon).
|
||||
func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) {
|
||||
if lon < 0 {
|
||||
lon += 360
|
||||
}
|
||||
|
||||
inv := d.ds.InvResolution()
|
||||
|
||||
// GRIB scan north→south: index 0 = 90°N
|
||||
iy := (90 - lat) * inv
|
||||
y0 := int(math.Floor(iy))
|
||||
if y0 < 0 {
|
||||
y0 = 0
|
||||
}
|
||||
if y0 >= d.cube.lat-1 {
|
||||
y0 = d.cube.lat - 2
|
||||
}
|
||||
y1 := y0 + 1
|
||||
wy := iy - float64(y0)
|
||||
|
||||
ix := lon * inv
|
||||
x0 := int(math.Floor(ix)) % d.cube.lon
|
||||
x1 := (x0 + 1) % d.cube.lon
|
||||
wx := ix - float64(x0)
|
||||
|
||||
// Время: tHours делим на шаг, чтобы получить индекс в кубе
|
||||
tIdx := tHours / float64(d.ds.TimeStep)
|
||||
it0 := int(math.Floor(tIdx))
|
||||
if it0 < 0 {
|
||||
it0 = 0
|
||||
}
|
||||
if it0 >= d.cube.t-1 {
|
||||
it0 = d.cube.t - 2
|
||||
}
|
||||
wt := tIdx - float64(it0)
|
||||
|
||||
// ISA: высота → давление → индекс уровня
|
||||
levels := d.ds.Levels
|
||||
p := pressureFromAlt(alt)
|
||||
ip0 := 0
|
||||
for ip0+1 < len(levels) && levels[ip0+1] > p {
|
||||
ip0++
|
||||
}
|
||||
ip1 := ip0 + 1
|
||||
if ip1 >= len(levels) {
|
||||
ip1 = len(levels) - 1
|
||||
}
|
||||
wp := 0.0
|
||||
if levels[ip0] != levels[ip1] {
|
||||
wp = (levels[ip0] - p) / (levels[ip0] - levels[ip1])
|
||||
}
|
||||
|
||||
fetch := func(ti, pi int) (float64, float64) {
|
||||
u00 := d.cube.val(1, ti, pi, y0, x0)
|
||||
u10 := d.cube.val(1, ti, pi, y0, x1)
|
||||
u01 := d.cube.val(1, ti, pi, y1, x0)
|
||||
u11 := d.cube.val(1, ti, pi, y1, x1)
|
||||
v00 := d.cube.val(2, ti, pi, y0, x0)
|
||||
v10 := d.cube.val(2, ti, pi, y0, x1)
|
||||
v01 := d.cube.val(2, ti, pi, y1, x0)
|
||||
v11 := d.cube.val(2, ti, pi, y1, x1)
|
||||
uxy := (1-wy)*((1-wx)*float64(u00)+wx*float64(u10)) + wy*((1-wx)*float64(u01)+wx*float64(u11))
|
||||
vxy := (1-wy)*((1-wx)*float64(v00)+wx*float64(v10)) + wy*((1-wx)*float64(v01)+wx*float64(v11))
|
||||
return uxy, vxy
|
||||
}
|
||||
|
||||
u0p0, v0p0 := fetch(it0, ip0)
|
||||
u0p1, v0p1 := fetch(it0, ip1)
|
||||
u1p0, v1p0 := fetch(it0+1, ip0)
|
||||
u1p1, v1p1 := fetch(it0+1, ip1)
|
||||
uLow := lerp(u0p0, u0p1, wp)
|
||||
vLow := lerp(v0p0, v0p1, wp)
|
||||
uHig := lerp(u1p0, u1p1, wp)
|
||||
vHig := lerp(v1p0, v1p1, wp)
|
||||
u := lerp(uLow, uHig, wt)
|
||||
v := lerp(vLow, vHig, wt)
|
||||
return u, v
|
||||
}
|
||||
|
|
@ -1,291 +0,0 @@
|
|||
package grib
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
"github.com/edsrzf/mmap-go"
|
||||
"github.com/nilsmagnus/grib/griblib"
|
||||
)
|
||||
|
||||
type Service interface {
|
||||
Update(ctx context.Context) error
|
||||
Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error)
|
||||
Close() error
|
||||
GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string)
|
||||
}
|
||||
|
||||
type service struct {
|
||||
cfg *Config
|
||||
cache memCache
|
||||
data atomic.Pointer[dataset]
|
||||
}
|
||||
|
||||
func New(cfg *Config) (Service, error) {
|
||||
if cfg.TTL == 0 {
|
||||
cfg.TTL = 24 * time.Hour
|
||||
}
|
||||
if err := os.MkdirAll(cfg.Dir, 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &service{cfg: cfg, cache: memCache{ttl: cfg.CacheTTL}}
|
||||
|
||||
// Try to load existing dataset on startup
|
||||
if err := s.loadExistingDataset(); err != nil {
|
||||
// Log error but don't fail startup - dataset will be loaded on first Update()
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *service) loadExistingDataset() error {
|
||||
pattern := filepath.Join(s.cfg.Dir, "*.cube")
|
||||
matches, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(matches) == 0 {
|
||||
return errcodes.ErrNoCubeFilesFound
|
||||
}
|
||||
|
||||
var latestFile string
|
||||
var latestTime time.Time
|
||||
|
||||
for _, match := range matches {
|
||||
info, err := os.Stat(match)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if info.ModTime().After(latestTime) {
|
||||
latestTime = info.ModTime()
|
||||
latestFile = match
|
||||
}
|
||||
}
|
||||
|
||||
if latestFile == "" {
|
||||
return errcodes.ErrNoValidCubeFilesFound
|
||||
}
|
||||
|
||||
if time.Since(latestTime) > s.cfg.TTL {
|
||||
return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old")
|
||||
}
|
||||
|
||||
dc := &s.cfg.Dataset
|
||||
c, err := openCube(latestFile, dc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
base := filepath.Base(latestFile)
|
||||
runStr := strings.TrimSuffix(base, ".cube")
|
||||
run, err := time.Parse("20060102_15", runStr)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Update(ctx context.Context) error {
|
||||
if d := s.data.Load(); d != nil {
|
||||
runTime := time.Unix(d.runUTC, 0)
|
||||
if time.Since(runTime) < s.cfg.TTL {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if d := s.data.Load(); d != nil {
|
||||
runTime := time.Unix(d.runUTC, 0)
|
||||
if time.Since(runTime) < s.cfg.TTL {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
dc := &s.cfg.Dataset
|
||||
run := nearestRun(time.Now().UTC().Add(-6 * time.Hour))
|
||||
|
||||
cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube"
|
||||
if _, err := os.Stat(cubePath); err == nil {
|
||||
info, err := os.Stat(cubePath)
|
||||
if err == nil && time.Since(info.ModTime()) < s.cfg.TTL {
|
||||
c, err := openCube(cubePath, dc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()})
|
||||
s.cache = memCache{ttl: s.cfg.CacheTTL}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
downloadCtx, cancel := context.WithTimeout(ctx, 60*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
dl := NewPartialDownloader(s.cfg.Dir, s.cfg.Parallel, dc)
|
||||
if err := dl.Run(downloadCtx, run); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := os.Stat(cubePath); err != nil {
|
||||
if err := assembleCube(s.cfg.Dir, run, cubePath, dc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c, err := openCube(cubePath, dc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()})
|
||||
s.cache = memCache{ttl: s.cfg.CacheTTL}
|
||||
return nil
|
||||
}
|
||||
|
||||
func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) error {
|
||||
sizePerVar := dc.SizePerVar()
|
||||
total := dc.CubeSize()
|
||||
gridBytes := int64(dc.GridSize()) * 4
|
||||
|
||||
f, err := os.Create(cubePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.Truncate(total); err != nil {
|
||||
return err
|
||||
}
|
||||
mm, err := mmap.MapRegion(f, int(total), mmap.RDWR, 0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer mm.Unmap()
|
||||
defer f.Close()
|
||||
|
||||
pIndex := make(map[int]int)
|
||||
for i, p := range dc.Levels {
|
||||
pIndex[int(math.Round(p))] = i
|
||||
}
|
||||
|
||||
processFile := func(fn string, ti int) error {
|
||||
file, err := os.Open(fn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
messages, err := griblib.ReadMessages(file)
|
||||
file.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, m := range messages {
|
||||
if m.Section4.ProductDefinitionTemplateNumber != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
product := m.Section4.ProductDefinitionTemplate
|
||||
|
||||
var varIdx int
|
||||
if product.ParameterCategory == 2 {
|
||||
switch product.ParameterNumber {
|
||||
case 2: // u-wind
|
||||
varIdx = 1
|
||||
case 3: // v-wind
|
||||
varIdx = 2
|
||||
default:
|
||||
continue
|
||||
}
|
||||
} else if product.ParameterCategory == 3 && product.ParameterNumber == 5 {
|
||||
varIdx = 0 // geopotential height
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
|
||||
if product.FirstSurface.Type != 100 {
|
||||
continue
|
||||
}
|
||||
|
||||
pressure := float64(product.FirstSurface.Value) / 100.0
|
||||
pIdx, ok := pIndex[int(math.Round(pressure))]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
vals := m.Data()
|
||||
raw := make([]byte, len(vals)*4)
|
||||
for i, v := range vals {
|
||||
binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v)))
|
||||
}
|
||||
base := int64(varIdx)*sizePerVar + (int64(ti)*int64(dc.NP)+int64(pIdx))*gridBytes
|
||||
copy(mm[base:base+int64(len(raw))], raw)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
steps := dc.Steps()
|
||||
for ti, step := range steps {
|
||||
fn := filepath.Join(dir, dc.FileName(run, step))
|
||||
if err := processFile(fn, ti); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fnB := filepath.Join(dir, dc.FileNameB(run, step))
|
||||
if err := processFile(fnB, ti); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return mm.Flush()
|
||||
}
|
||||
|
||||
func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) {
|
||||
var zero [2]float64
|
||||
d := s.data.Load()
|
||||
if d == nil {
|
||||
return zero, errcodes.ErrNoDataset
|
||||
}
|
||||
maxDur := time.Duration(s.cfg.Dataset.MaxHour) * time.Hour
|
||||
if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(maxDur)) {
|
||||
return zero, errcodes.ErrOutOfBounds
|
||||
}
|
||||
|
||||
key := encodeKey(lat, lon, alt, ts)
|
||||
if v, ok := s.cache.get(key); ok {
|
||||
return [2]float64(v), nil
|
||||
}
|
||||
|
||||
td := ts.Sub(time.Unix(d.runUTC, 0)).Hours()
|
||||
u, v := d.uv(lat, lon, alt, td)
|
||||
out := [2]float64{u, v}
|
||||
|
||||
s.cache.set(key, vec(out))
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *service) Close() error {
|
||||
if d := s.data.Load(); d != nil {
|
||||
return d.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string) {
|
||||
d := s.data.Load()
|
||||
if d == nil {
|
||||
return false, time.Time{}, false, "no dataset loaded"
|
||||
}
|
||||
runTime := time.Unix(d.runUTC, 0)
|
||||
fresh := time.Since(runTime) < s.cfg.TTL
|
||||
if !fresh {
|
||||
return false, runTime, false, "dataset is too old"
|
||||
}
|
||||
return true, runTime, true, ""
|
||||
}
|
||||
|
|
@ -1,350 +0,0 @@
|
|||
package grib
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// PartialDownloader загружает только необходимые поля из GRIB файлов
|
||||
// используя HTTP Range requests и .idx индексные файлы
|
||||
type PartialDownloader struct {
|
||||
Dir string
|
||||
Parallel int
|
||||
Client *http.Client
|
||||
Variables []string
|
||||
ds *DatasetConfig
|
||||
}
|
||||
|
||||
// NewPartialDownloader создаёт новый partial downloader
|
||||
func NewPartialDownloader(dir string, parallel int, dc *DatasetConfig) *PartialDownloader {
|
||||
return &PartialDownloader{
|
||||
Dir: dir,
|
||||
Parallel: parallel,
|
||||
Client: &http.Client{
|
||||
Timeout: 60 * time.Second,
|
||||
},
|
||||
Variables: dc.Variables,
|
||||
ds: dc,
|
||||
}
|
||||
}
|
||||
|
||||
// idxEntry представляет запись из .idx файла
|
||||
type idxEntry struct {
|
||||
Index int
|
||||
ByteStart int64
|
||||
Date string
|
||||
Variable string
|
||||
Level string
|
||||
Forecast string
|
||||
}
|
||||
|
||||
type ProgressWriter struct {
|
||||
Total int64
|
||||
Downloaded int64
|
||||
OnProgress func(percent float64)
|
||||
}
|
||||
|
||||
func (pw *ProgressWriter) Write(p []byte) (int, error) {
|
||||
n := len(p)
|
||||
pw.Downloaded += int64(n)
|
||||
if pw.Total > 0 && pw.OnProgress != nil {
|
||||
percent := float64(pw.Downloaded) / float64(pw.Total) * 100
|
||||
pw.OnProgress(percent)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// parseIdx парсит .idx файл и возвращает записи
|
||||
func (d *PartialDownloader) parseIdx(body []byte) []idxEntry {
|
||||
var entries []idxEntry
|
||||
lines := strings.Split(string(body), "\n")
|
||||
|
||||
for _, line := range lines {
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
parts := strings.Split(line, ":")
|
||||
if len(parts) < 7 {
|
||||
continue
|
||||
}
|
||||
|
||||
byteStart, _ := strconv.ParseInt(parts[1], 10, 64)
|
||||
entries = append(entries, idxEntry{
|
||||
Index: len(entries),
|
||||
ByteStart: byteStart,
|
||||
Date: parts[2],
|
||||
Variable: parts[3],
|
||||
Level: parts[4],
|
||||
Forecast: parts[5],
|
||||
})
|
||||
}
|
||||
return entries
|
||||
}
|
||||
|
||||
// filterEntries фильтрует записи по нужным переменным и уровням давления
|
||||
func (d *PartialDownloader) filterEntries(entries []idxEntry) []idxEntry {
|
||||
var filtered []idxEntry
|
||||
|
||||
for _, e := range entries {
|
||||
isNeededVar := false
|
||||
for _, v := range d.Variables {
|
||||
if v == e.Variable {
|
||||
isNeededVar = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
isPressureLevel := strings.HasSuffix(e.Level, " mb")
|
||||
|
||||
if isNeededVar && isPressureLevel {
|
||||
filtered = append(filtered, e)
|
||||
}
|
||||
}
|
||||
|
||||
return filtered
|
||||
}
|
||||
|
||||
// Вспомогательная функция для выполнения запроса с повторами
|
||||
func (d *PartialDownloader) doWithRetry(ctx context.Context, req *http.Request) (*http.Response, error) {
|
||||
var resp *http.Response
|
||||
var err error
|
||||
|
||||
backoff := 1 * time.Second
|
||||
maxRetries := 3
|
||||
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
resp, err = d.Client.Do(req)
|
||||
if err == nil && resp.StatusCode < 500 {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Warn("retry download", zap.Int("attempt", i+1), zap.Error(err))
|
||||
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
backoff *= 2
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// downloadRange загружает диапазон байтов из URL
|
||||
func (d *PartialDownloader) downloadRange(ctx context.Context, url string, start, end int64, out io.Writer) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rangeHeader := fmt.Sprintf("bytes=%d-", start)
|
||||
if end > 0 {
|
||||
rangeHeader = fmt.Sprintf("bytes=%d-%d", start, end)
|
||||
}
|
||||
req.Header.Set("Range", rangeHeader)
|
||||
|
||||
resp, err := d.Client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
|
||||
return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status)
|
||||
}
|
||||
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *PartialDownloader) downloadFieldsFromURL(ctx context.Context, url string, dst string, step int) (err error) {
|
||||
idxURL := url + ".idx"
|
||||
tmp := dst + ".part"
|
||||
|
||||
if info, err := os.Stat(dst); err == nil && info.Size() > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
reqIdx, _ := http.NewRequestWithContext(ctx, http.MethodGet, idxURL, nil)
|
||||
respIdx, err := d.doWithRetry(ctx, reqIdx)
|
||||
if err != nil {
|
||||
return errcodes.Wrap(err, "failed to get idx")
|
||||
}
|
||||
defer respIdx.Body.Close()
|
||||
|
||||
idxBody, _ := io.ReadAll(respIdx.Body)
|
||||
entries := d.parseIdx(idxBody)
|
||||
filtered := d.filterEntries(entries)
|
||||
if len(filtered) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
type chunk struct{ start, end int64 }
|
||||
chunks := make([]chunk, 0, len(filtered))
|
||||
|
||||
for _, entry := range filtered {
|
||||
var endByte int64 = -1
|
||||
for j, e := range entries {
|
||||
if e.ByteStart == entry.ByteStart && j+1 < len(entries) {
|
||||
endByte = entries[j+1].ByteStart - 1
|
||||
break
|
||||
}
|
||||
}
|
||||
chunks = append(chunks, chunk{entry.ByteStart, endByte})
|
||||
if endByte > 0 {
|
||||
totalBytes += (endByte - entry.ByteStart + 1)
|
||||
}
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var downloaded int64
|
||||
|
||||
err = func() error {
|
||||
defer f.Close()
|
||||
bufWriter := bufio.NewWriterSize(f, 1024*1024)
|
||||
|
||||
for i, c := range chunks {
|
||||
countingWriter := &proxyWriter{
|
||||
Writer: bufWriter,
|
||||
OnWrite: func(n int) {
|
||||
downloaded += int64(n)
|
||||
if totalBytes > 0 && i%20 == 0 {
|
||||
pct := float64(downloaded) / float64(totalBytes) * 100
|
||||
log.Ctx(ctx).Debug("download progress",
|
||||
zap.Int("step", step),
|
||||
zap.String("pct", fmt.Sprintf("%.1f%%", pct)))
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
if err := d.downloadRange(ctx, url, c.start, c.end, countingWriter); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return bufWriter.Flush()
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
f.Close()
|
||||
os.Remove(tmp)
|
||||
return err
|
||||
}
|
||||
|
||||
return d.safeRename(tmp, dst)
|
||||
}
|
||||
|
||||
type proxyWriter struct {
|
||||
io.Writer
|
||||
OnWrite func(int)
|
||||
}
|
||||
|
||||
func (p *proxyWriter) Write(data []byte) (int, error) {
|
||||
n, err := p.Writer.Write(data)
|
||||
if n > 0 && p.OnWrite != nil {
|
||||
p.OnWrite(n)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (d *PartialDownloader) safeRename(src, dst string) error {
|
||||
var lastErr error
|
||||
for i := 0; i < 5; i++ {
|
||||
if err := os.Rename(src, dst); err == nil {
|
||||
return nil
|
||||
} else {
|
||||
lastErr = err
|
||||
}
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
}
|
||||
return fmt.Errorf("rename failed: %w", lastErr)
|
||||
}
|
||||
|
||||
// Run запускает загрузку всех необходимых файлов (pgrb2 + pgrb2b)
|
||||
func (d *PartialDownloader) Run(ctx context.Context, run time.Time) error {
|
||||
log.Ctx(ctx).Info("starting partial download",
|
||||
zap.Time("run", run),
|
||||
zap.Strings("variables", d.Variables))
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
sem := make(chan struct{}, d.Parallel)
|
||||
steps := d.ds.Steps()
|
||||
|
||||
for _, step := range steps {
|
||||
step := step
|
||||
|
||||
// Download primary pgrb2
|
||||
sem <- struct{}{}
|
||||
g.Go(func() error {
|
||||
defer func() { <-sem }()
|
||||
url := d.ds.GribURL(run, step)
|
||||
dst := filepath.Join(d.Dir, d.ds.FileName(run, step))
|
||||
return d.downloadFieldsFromURL(ctx, url, dst, step)
|
||||
})
|
||||
|
||||
// Download secondary pgrb2b
|
||||
sem <- struct{}{}
|
||||
g.Go(func() error {
|
||||
defer func() { <-sem }()
|
||||
url := d.ds.GribURLB(run, step)
|
||||
dst := filepath.Join(d.Dir, d.ds.FileNameB(run, step))
|
||||
return d.downloadFieldsFromURL(ctx, url, dst, step)
|
||||
})
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
// GetLatestModelRun находит последний доступный прогноз GFS
|
||||
func GetLatestModelRun(ctx context.Context, dc *DatasetConfig) (time.Time, error) {
|
||||
now := time.Now().UTC()
|
||||
hour := now.Hour() - (now.Hour() % 6)
|
||||
current := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC)
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
|
||||
for i := 0; i < 8; i++ {
|
||||
url := dc.GribURL(current, dc.MaxHour)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
|
||||
if err != nil {
|
||||
current = current.Add(-6 * time.Hour)
|
||||
continue
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err == nil && resp.StatusCode == http.StatusOK {
|
||||
resp.Body.Close()
|
||||
log.Ctx(ctx).Info("found latest model run", zap.Time("run", current))
|
||||
return current, nil
|
||||
}
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
|
||||
current = current.Add(-6 * time.Hour)
|
||||
}
|
||||
|
||||
return time.Time{}, errcodes.Wrap(errcodes.ErrDownload, "no recent GFS forecast found")
|
||||
}
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
package grib
|
||||
|
||||
import "math"
|
||||
|
||||
func pressureFromAlt(alt float64) float64 { // ICAO ISA
|
||||
return 1013.25 * math.Pow(1-alt/44307.69396, 5.255877)
|
||||
}
|
||||
|
|
@ -1,20 +0,0 @@
|
|||
package grib
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func nearestRun(t time.Time) time.Time {
|
||||
h := t.UTC().Hour() - t.UTC().Hour()%6
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), h, 0, 0, 0, time.UTC)
|
||||
}
|
||||
|
||||
func encodeKey(a ...any) uint64 {
|
||||
h := fnv.New64a()
|
||||
for _, v := range a {
|
||||
fmt.Fprint(h, v)
|
||||
}
|
||||
return h.Sum64()
|
||||
}
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
package log
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type ctxLogKey struct{}
|
||||
|
||||
func ToCtx(ctx context.Context, lg *zap.Logger) context.Context {
|
||||
return context.WithValue(ctx, ctxLogKey{}, lg)
|
||||
}
|
||||
|
||||
func Ctx(ctx context.Context) *zap.Logger {
|
||||
lg, ok := ctx.Value(ctxLogKey{}).(*zap.Logger)
|
||||
if !ok || lg == nil {
|
||||
zap.L().Error("no logger in context, using global")
|
||||
return zap.L()
|
||||
}
|
||||
|
||||
return lg
|
||||
}
|
||||
|
|
@ -1,12 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Grib interface {
|
||||
Update(ctx context.Context) error
|
||||
Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error)
|
||||
Close() error
|
||||
}
|
||||
|
|
@ -1,684 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/ds"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var ErrInvalidParameters = errcodes.New(400, "missing required prediction parameters")
|
||||
|
||||
// Stage represents a prediction stage (ascent, descent, float)
|
||||
type Stage struct {
|
||||
Name string
|
||||
Results []ds.PredicitonResult
|
||||
StartTime time.Time
|
||||
EndTime time.Time
|
||||
}
|
||||
|
||||
// shouldSimulateStage checks if a given stage should be simulated based on the SimulateStages filter
|
||||
func shouldSimulateStage(params ds.PredictionParameters, stage string) bool {
|
||||
// If no filter is specified, simulate all stages
|
||||
if len(params.SimulateStages) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if the stage is in the filter list
|
||||
for _, s := range params.SimulateStages {
|
||||
if s == stage {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// CustomCurve represents a custom ascent/descent curve
|
||||
type CustomCurve struct {
|
||||
Altitude []float64 `json:"altitude"`
|
||||
Time []float64 `json:"time"` // seconds from start
|
||||
}
|
||||
|
||||
func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionParameters) ([]ds.PredicitonResult, error) {
|
||||
// Validate required parameters
|
||||
if params.LaunchLatitude == nil || params.LaunchLongitude == nil || params.LaunchAltitude == nil || params.LaunchDatetime == nil {
|
||||
return nil, ErrInvalidParameters
|
||||
}
|
||||
|
||||
// Get default values
|
||||
profile := "standard_profile"
|
||||
if params.Profile != nil {
|
||||
profile = *params.Profile
|
||||
}
|
||||
|
||||
ascentRate := 5.0
|
||||
if params.AscentRate != nil {
|
||||
ascentRate = *params.AscentRate
|
||||
}
|
||||
|
||||
burstAltitude := 30000.0
|
||||
if params.BurstAltitude != nil {
|
||||
burstAltitude = *params.BurstAltitude
|
||||
}
|
||||
|
||||
descentRate := 5.0
|
||||
if params.DescentRate != nil {
|
||||
descentRate = *params.DescentRate
|
||||
}
|
||||
|
||||
floatAltitude := 0.0
|
||||
if params.FloatAltitude != nil {
|
||||
floatAltitude = *params.FloatAltitude
|
||||
}
|
||||
|
||||
// Parse custom curves if provided
|
||||
var ascentCurve, descentCurve *CustomCurve
|
||||
if params.AscentCurve != nil && *params.AscentCurve != "" {
|
||||
if curve, err := parseCustomCurve(*params.AscentCurve); err == nil {
|
||||
ascentCurve = curve
|
||||
}
|
||||
}
|
||||
if params.DescentCurve != nil && *params.DescentCurve != "" {
|
||||
if curve, err := parseCustomCurve(*params.DescentCurve); err == nil {
|
||||
descentCurve = curve
|
||||
}
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Warn("🚀 PREDICTION STARTING",
|
||||
zap.String("profile", profile),
|
||||
zap.Float64("lat", *params.LaunchLatitude),
|
||||
zap.Float64("lon", *params.LaunchLongitude),
|
||||
zap.Float64("alt", *params.LaunchAltitude),
|
||||
zap.Time("time", *params.LaunchDatetime),
|
||||
)
|
||||
|
||||
var allResults []ds.PredicitonResult
|
||||
|
||||
switch profile {
|
||||
case "standard_profile":
|
||||
allResults = s.standardProfile(ctx, params, ascentRate, burstAltitude, descentRate, ascentCurve, descentCurve)
|
||||
case "float_profile":
|
||||
allResults = s.floatProfile(ctx, params, ascentRate, burstAltitude, floatAltitude, descentRate, ascentCurve, descentCurve)
|
||||
case "reverse_profile":
|
||||
allResults = s.reverseProfile(ctx, params, ascentRate, burstAltitude, descentRate, ascentCurve, descentCurve)
|
||||
case "custom_profile":
|
||||
allResults = s.customProfile(ctx, params, ascentCurve, descentCurve)
|
||||
default:
|
||||
return nil, errcodes.New(400, "unsupported profile: "+profile)
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Info("Prediction complete", zap.Int("total_steps", len(allResults)))
|
||||
return allResults, nil
|
||||
}
|
||||
|
||||
func (s *Service) standardProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
var lastResult ds.PredicitonResult
|
||||
|
||||
// Stage 1: Ascent
|
||||
if shouldSimulateStage(params, "ascent") {
|
||||
ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
if len(ascentResults) > 0 {
|
||||
lastResult = ascentResults[len(ascentResults)-1]
|
||||
}
|
||||
} else {
|
||||
// If ascent is skipped, use initial position as starting point
|
||||
lastResult = ds.PredicitonResult{
|
||||
Latitude: params.LaunchLatitude,
|
||||
Longitude: params.LaunchLongitude,
|
||||
Altitude: &burstAltitude,
|
||||
Timestamp: params.LaunchDatetime,
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 2: Descent
|
||||
if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil {
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: lastResult.Latitude,
|
||||
LaunchLongitude: lastResult.Longitude,
|
||||
LaunchAltitude: lastResult.Altitude,
|
||||
LaunchDatetime: lastResult.Timestamp,
|
||||
}
|
||||
|
||||
descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) floatProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, floatAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
var lastResult ds.PredicitonResult
|
||||
|
||||
// Stage 1: Ascent to float altitude
|
||||
if shouldSimulateStage(params, "ascent") {
|
||||
ascentResults := s.simulateAscent(ctx, params, ascentRate, floatAltitude, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
if len(ascentResults) > 0 {
|
||||
lastResult = ascentResults[len(ascentResults)-1]
|
||||
}
|
||||
} else {
|
||||
// If ascent is skipped, use initial position at float altitude as starting point
|
||||
lastResult = ds.PredicitonResult{
|
||||
Latitude: params.LaunchLatitude,
|
||||
Longitude: params.LaunchLongitude,
|
||||
Altitude: &floatAltitude,
|
||||
Timestamp: params.LaunchDatetime,
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 2: Float (simulate for some time)
|
||||
if shouldSimulateStage(params, "float") && lastResult.Latitude != nil {
|
||||
floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute) // Float for 30 minutes
|
||||
results = append(results, floatResults...)
|
||||
if len(floatResults) > 0 {
|
||||
lastResult = floatResults[len(floatResults)-1]
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 3: Descent
|
||||
if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil {
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: lastResult.Latitude,
|
||||
LaunchLongitude: lastResult.Longitude,
|
||||
LaunchAltitude: lastResult.Altitude,
|
||||
LaunchDatetime: lastResult.Timestamp,
|
||||
}
|
||||
|
||||
descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
var lastResult ds.PredicitonResult
|
||||
|
||||
// Stage 1: Ascent
|
||||
if shouldSimulateStage(params, "ascent") {
|
||||
ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
if len(ascentResults) > 0 {
|
||||
lastResult = ascentResults[len(ascentResults)-1]
|
||||
}
|
||||
} else {
|
||||
// If ascent is skipped, use initial position at burst altitude as starting point
|
||||
lastResult = ds.PredicitonResult{
|
||||
Latitude: params.LaunchLatitude,
|
||||
Longitude: params.LaunchLongitude,
|
||||
Altitude: &burstAltitude,
|
||||
Timestamp: params.LaunchDatetime,
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 2: Descent to float altitude
|
||||
floatAlt := 0.0
|
||||
if params.FloatAltitude != nil {
|
||||
floatAlt = *params.FloatAltitude
|
||||
}
|
||||
|
||||
if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil {
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: lastResult.Latitude,
|
||||
LaunchLongitude: lastResult.Longitude,
|
||||
LaunchAltitude: lastResult.Altitude,
|
||||
LaunchDatetime: lastResult.Timestamp,
|
||||
}
|
||||
|
||||
descentResults := s.simulateDescent(ctx, descentParams, descentRate, floatAlt, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
if len(descentResults) > 0 {
|
||||
lastResult = descentResults[len(descentResults)-1]
|
||||
}
|
||||
} else if floatAlt > 0 {
|
||||
// If descent is skipped but we need to float, position at float altitude
|
||||
lastResult = ds.PredicitonResult{
|
||||
Latitude: lastResult.Latitude,
|
||||
Longitude: lastResult.Longitude,
|
||||
Altitude: &floatAlt,
|
||||
Timestamp: lastResult.Timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 3: Float
|
||||
if shouldSimulateStage(params, "float") && floatAlt > 0 && lastResult.Latitude != nil {
|
||||
floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute)
|
||||
results = append(results, floatResults...)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) customProfile(ctx context.Context, params ds.PredictionParameters, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
var lastResult ds.PredicitonResult
|
||||
|
||||
// Custom ascent
|
||||
if shouldSimulateStage(params, "ascent") && ascentCurve != nil {
|
||||
ascentResults := s.simulateCustomAscent(ctx, params, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
if len(ascentResults) > 0 {
|
||||
lastResult = ascentResults[len(ascentResults)-1]
|
||||
}
|
||||
} else if len(results) == 0 {
|
||||
// If ascent is skipped, use initial position
|
||||
lastResult = ds.PredicitonResult{
|
||||
Latitude: params.LaunchLatitude,
|
||||
Longitude: params.LaunchLongitude,
|
||||
Altitude: params.LaunchAltitude,
|
||||
Timestamp: params.LaunchDatetime,
|
||||
}
|
||||
}
|
||||
|
||||
// Custom descent
|
||||
if shouldSimulateStage(params, "descent") && descentCurve != nil && lastResult.Latitude != nil {
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: lastResult.Latitude,
|
||||
LaunchLongitude: lastResult.Longitude,
|
||||
LaunchAltitude: lastResult.Altitude,
|
||||
LaunchDatetime: lastResult.Timestamp,
|
||||
}
|
||||
|
||||
descentResults := s.simulateCustomDescent(ctx, descentParams, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func rk4Step(lat, lon, alt float64, t time.Time, dt float64, windFunc func(lat, lon, alt float64, t time.Time) (float64, float64), altRate float64) (float64, float64, float64) {
|
||||
// Helper for RK4 integration step
|
||||
toRad := math.Pi / 180.0
|
||||
toDeg := 180.0 / math.Pi
|
||||
R := func(alt float64) float64 { return 6371009.0 + alt }
|
||||
|
||||
f := func(lat, lon, alt float64, t time.Time) (float64, float64, float64) {
|
||||
windU, windV := windFunc(lat, lon, alt, t)
|
||||
Rnow := R(alt)
|
||||
dlat := toDeg * windV / Rnow
|
||||
dlon := toDeg * windU / (Rnow * math.Cos(lat*toRad))
|
||||
return dlat, dlon, altRate
|
||||
}
|
||||
|
||||
k1_lat, k1_lon, k1_alt := f(lat, lon, alt, t)
|
||||
k2_lat, k2_lon, k2_alt := f(lat+0.5*k1_lat*dt, lon+0.5*k1_lon*dt, alt+0.5*k1_alt*dt, t.Add(time.Duration(0.5*dt)*time.Second))
|
||||
k3_lat, k3_lon, k3_alt := f(lat+0.5*k2_lat*dt, lon+0.5*k2_lon*dt, alt+0.5*k2_alt*dt, t.Add(time.Duration(0.5*dt)*time.Second))
|
||||
k4_lat, k4_lon, k4_alt := f(lat+k3_lat*dt, lon+k3_lon*dt, alt+k3_alt*dt, t.Add(time.Duration(dt)*time.Second))
|
||||
|
||||
latNew := lat + (dt/6.0)*(k1_lat+2*k2_lat+2*k3_lat+k4_lat)
|
||||
lonNew := lon + (dt/6.0)*(k1_lon+2*k2_lon+2*k3_lon+k4_lon)
|
||||
altNew := alt + (dt/6.0)*(k1_alt+2*k2_alt+2*k3_alt+k4_alt)
|
||||
return latNew, lonNew, altNew
|
||||
}
|
||||
|
||||
func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParameters, ascentRate, targetAltitude float64, customCurve *CustomCurve) []ds.PredicitonResult {
|
||||
const dt = 10.0 // simulation step in seconds
|
||||
const outputInterval = 60.0 // output every 60 seconds
|
||||
|
||||
log.Ctx(ctx).Warn("⬆️ ASCENT SIMULATION STARTING",
|
||||
zap.Float64("ascentRate", ascentRate),
|
||||
zap.Float64("targetAlt", targetAltitude))
|
||||
|
||||
lat := *params.LaunchLatitude
|
||||
lon := *params.LaunchLongitude
|
||||
alt := *params.LaunchAltitude
|
||||
timeCur := *params.LaunchDatetime
|
||||
|
||||
results := make([]ds.PredicitonResult, 0, 1000)
|
||||
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
wind := [2]float64{0, 0}
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
|
||||
nextOutputTime := timeCur.Add(time.Duration(outputInterval) * time.Second)
|
||||
firstExtraction := true
|
||||
windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) {
|
||||
w, err := s.ExtractWind(ctx, lat, lon, alt, t)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error("Wind extraction FAILED during ascent",
|
||||
zap.Error(err),
|
||||
zap.Float64("lat", lat),
|
||||
zap.Float64("lon", lon),
|
||||
zap.Float64("alt", alt),
|
||||
zap.Time("time", t))
|
||||
return 0, 0
|
||||
}
|
||||
// Log only first extraction and when wind is zero
|
||||
if firstExtraction || (w[0] == 0 && w[1] == 0) {
|
||||
log.Ctx(ctx).Warn("Wind data check",
|
||||
zap.Bool("first", firstExtraction),
|
||||
zap.Float64("lat", lat),
|
||||
zap.Float64("lon", lon),
|
||||
zap.Float64("alt", alt),
|
||||
zap.Float64("u", w[0]),
|
||||
zap.Float64("v", w[1]))
|
||||
firstExtraction = false
|
||||
}
|
||||
return w[0], w[1]
|
||||
}
|
||||
|
||||
for alt < targetAltitude {
|
||||
altRate := ascentRate
|
||||
if customCurve != nil {
|
||||
altRate = s.getCustomAltitudeRate(customCurve, alt, ascentRate)
|
||||
}
|
||||
latNew, lonNew, altNew := rk4Step(lat, lon, alt, timeCur, dt, windFunc, altRate)
|
||||
timeCur = timeCur.Add(time.Duration(dt) * time.Second)
|
||||
lat = latNew
|
||||
lon = lonNew
|
||||
alt = altNew
|
||||
|
||||
if alt >= targetAltitude {
|
||||
alt = targetAltitude
|
||||
// Record burst point
|
||||
wU, wV := windFunc(lat, lon, alt, timeCur)
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wU
|
||||
windV := wV
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
if !timeCur.Before(nextOutputTime) {
|
||||
wU, wV := windFunc(lat, lon, alt, timeCur)
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wU
|
||||
windV := wV
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
nextOutputTime = nextOutputTime.Add(time.Duration(outputInterval) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParameters, descentRate, targetAltitude float64, customCurve *CustomCurve) []ds.PredicitonResult {
|
||||
const dt = 10.0 // simulation step in seconds
|
||||
const outputInterval = 60.0 // output every 60 seconds
|
||||
|
||||
lat := *params.LaunchLatitude
|
||||
lon := *params.LaunchLongitude
|
||||
alt := *params.LaunchAltitude
|
||||
timeCur := *params.LaunchDatetime
|
||||
|
||||
results := make([]ds.PredicitonResult, 0, 1000)
|
||||
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
wind := [2]float64{0, 0}
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
|
||||
nextOutputTime := timeCur.Add(time.Duration(outputInterval) * time.Second)
|
||||
windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) {
|
||||
w, err := s.ExtractWind(ctx, lat, lon, alt, t)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error("Wind extraction FAILED during descent",
|
||||
zap.Error(err),
|
||||
zap.Float64("lat", lat),
|
||||
zap.Float64("lon", lon),
|
||||
zap.Float64("alt", alt),
|
||||
zap.Time("time", t))
|
||||
return 0, 0
|
||||
}
|
||||
return w[0], w[1]
|
||||
}
|
||||
|
||||
for alt > targetAltitude {
|
||||
altRate := -descentRateAtAlt(descentRate, alt)
|
||||
if customCurve != nil {
|
||||
altRate = -s.getCustomAltitudeRate(customCurve, alt, descentRate)
|
||||
}
|
||||
latNew, lonNew, altNew := rk4Step(lat, lon, alt, timeCur, dt, windFunc, altRate)
|
||||
timeCur = timeCur.Add(time.Duration(dt) * time.Second)
|
||||
lat = latNew
|
||||
lon = lonNew
|
||||
alt = altNew
|
||||
|
||||
if alt <= targetAltitude {
|
||||
alt = targetAltitude
|
||||
// Record landing point
|
||||
wU, wV := windFunc(lat, lon, alt, timeCur)
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wU
|
||||
windV := wV
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
if !timeCur.Before(nextOutputTime) {
|
||||
wU, wV := windFunc(lat, lon, alt, timeCur)
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wU
|
||||
windV := wV
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
nextOutputTime = nextOutputTime.Add(time.Duration(outputInterval) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) simulateFloat(ctx context.Context, startResult ds.PredicitonResult, duration time.Duration) []ds.PredicitonResult {
|
||||
const dt = 10.0 // simulation step in seconds
|
||||
const outputInterval = 60.0 // output every 60 seconds
|
||||
|
||||
lat := *startResult.Latitude
|
||||
lon := *startResult.Longitude
|
||||
alt := *startResult.Altitude
|
||||
timeCur := *startResult.Timestamp
|
||||
endTime := timeCur.Add(duration)
|
||||
|
||||
results := make([]ds.PredicitonResult, 0, 1000)
|
||||
|
||||
// Always include the initial float point
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
wind := [2]float64{0, 0}
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
|
||||
var nextOutputTime = timeCur.Add(time.Duration(outputInterval) * time.Second)
|
||||
|
||||
for timeCur.Before(endTime) {
|
||||
wind, err := s.ExtractWind(ctx, lat, lon, alt, timeCur)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("Wind extraction failed during float", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
latDot := (wind[1] / 111320.0)
|
||||
lonDot := (wind[0] / (40075000.0 * math.Cos(lat*math.Pi/180) / 360.0))
|
||||
|
||||
lat += latDot * dt
|
||||
lon += lonDot * dt
|
||||
// alt remains constant during float
|
||||
timeCur = timeCur.Add(time.Duration(dt) * time.Second)
|
||||
|
||||
if !timeCur.Before(nextOutputTime) {
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
nextOutputTime = nextOutputTime.Add(time.Duration(outputInterval) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// airDensity returns ISA air density in kg/m³ at given altitude in meters
|
||||
func airDensity(h float64) float64 {
|
||||
var T, p float64
|
||||
switch {
|
||||
case h < 11000:
|
||||
T = 288.15 - 0.0065*h
|
||||
p = 101325 * math.Pow(T/288.15, 5.2561)
|
||||
case h < 20000:
|
||||
T = 216.65
|
||||
p = 22632.1 * math.Exp(-0.00015769*(h-11000))
|
||||
case h < 32000:
|
||||
T = 216.65 + 0.001*(h-20000)
|
||||
p = 5474.89 * math.Pow(T/216.65, -34.1632)
|
||||
default:
|
||||
T = 228.65 + 0.0028*(h-32000)
|
||||
p = 868.019 * math.Pow(T/228.65, -12.2009)
|
||||
}
|
||||
return p / (287.05 * T)
|
||||
}
|
||||
|
||||
// descentRateAtAlt returns descent rate adjusted for air density at altitude.
|
||||
// descent_rate parameter is the sea-level rate. At altitude, thinner air means faster descent.
|
||||
func descentRateAtAlt(seaLevelRate, alt float64) float64 {
|
||||
rho0 := airDensity(0)
|
||||
rhoH := airDensity(alt)
|
||||
if rhoH <= 0 {
|
||||
return seaLevelRate
|
||||
}
|
||||
return seaLevelRate * math.Sqrt(rho0/rhoH)
|
||||
}
|
||||
|
||||
func (s *Service) simulateCustomAscent(ctx context.Context, params ds.PredictionParameters, curve *CustomCurve) []ds.PredicitonResult {
|
||||
// Implementation for custom ascent curve
|
||||
// This would interpolate the altitude rate from the custom curve
|
||||
return s.simulateAscent(ctx, params, 5.0, 30000.0, curve)
|
||||
}
|
||||
|
||||
func (s *Service) simulateCustomDescent(ctx context.Context, params ds.PredictionParameters, curve *CustomCurve) []ds.PredicitonResult {
|
||||
// Implementation for custom descent curve
|
||||
// This would interpolate the altitude rate from the custom curve
|
||||
return s.simulateDescent(ctx, params, 5.0, 0.0, curve)
|
||||
}
|
||||
|
||||
func (s *Service) getCustomAltitudeRate(curve *CustomCurve, currentAltitude, defaultRate float64) float64 {
|
||||
if curve == nil || len(curve.Altitude) < 2 {
|
||||
return defaultRate
|
||||
}
|
||||
|
||||
// Find the two points in the curve that bracket the current altitude
|
||||
for i := 0; i < len(curve.Altitude)-1; i++ {
|
||||
if curve.Altitude[i] <= currentAltitude && currentAltitude <= curve.Altitude[i+1] {
|
||||
// Linear interpolation
|
||||
alt1, alt2 := curve.Altitude[i], curve.Altitude[i+1]
|
||||
time1, time2 := curve.Time[i], curve.Time[i+1]
|
||||
|
||||
if alt2 == alt1 {
|
||||
return defaultRate
|
||||
}
|
||||
|
||||
// Calculate rate (change in altitude per second)
|
||||
if time2 > time1 {
|
||||
return (alt2 - alt1) / (time2 - time1)
|
||||
}
|
||||
return defaultRate
|
||||
}
|
||||
}
|
||||
|
||||
return defaultRate
|
||||
}
|
||||
|
||||
func parseCustomCurve(base64Data string) (*CustomCurve, error) {
|
||||
data, err := base64.StdEncoding.DecodeString(base64Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var curve CustomCurve
|
||||
if err := json.Unmarshal(data, &curve); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &curve, nil
|
||||
}
|
||||
|
|
@ -1,492 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/ds"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockGrib is a mock implementation of the Grib interface
|
||||
type MockGrib struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockGrib) Update(ctx context.Context) error {
|
||||
args := m.Called(ctx)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockGrib) Extract(ctx context.Context, lat, lon, alt float64, t time.Time) ([2]float64, error) {
|
||||
args := m.Called(ctx, lat, lon, alt, t)
|
||||
return args.Get(0).([2]float64), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockGrib) Close() error {
|
||||
args := m.Called()
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
// Helper function to create a test service with mocked GRIB
|
||||
func createTestService() (*Service, *MockGrib) {
|
||||
mockGrib := new(MockGrib)
|
||||
|
||||
// Default mock behavior: return constant wind (5 m/s east, 3 m/s north)
|
||||
mockGrib.On("Extract", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
Return([2]float64{5.0, 3.0}, nil)
|
||||
|
||||
service := &Service{
|
||||
grib: mockGrib,
|
||||
}
|
||||
|
||||
return service, mockGrib
|
||||
}
|
||||
|
||||
// Helper function to create basic prediction parameters
|
||||
func createBasicParams() ds.PredictionParameters {
|
||||
lat := 40.0
|
||||
lon := -105.0
|
||||
alt := 1000.0
|
||||
launchTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||
profile := "standard_profile"
|
||||
ascentRate := 5.0
|
||||
burstAltitude := 10000.0
|
||||
descentRate := 5.0
|
||||
|
||||
return ds.PredictionParameters{
|
||||
LaunchLatitude: &lat,
|
||||
LaunchLongitude: &lon,
|
||||
LaunchAltitude: &alt,
|
||||
LaunchDatetime: &launchTime,
|
||||
Profile: &profile,
|
||||
AscentRate: &ascentRate,
|
||||
BurstAltitude: &burstAltitude,
|
||||
DescentRate: &descentRate,
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_OnlyAscent(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
// Restrict to ascent only
|
||||
params.SimulateStages = []string{"ascent"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// Verify all results are during ascent phase (altitude increasing)
|
||||
for i := 1; i < len(results); i++ {
|
||||
assert.GreaterOrEqual(t, *results[i].Altitude, *results[i-1].Altitude,
|
||||
"Altitude should be increasing or equal during ascent")
|
||||
}
|
||||
|
||||
// Last altitude should be near burst altitude
|
||||
lastAlt := *results[len(results)-1].Altitude
|
||||
burstAlt := *params.BurstAltitude
|
||||
assert.InDelta(t, burstAlt, lastAlt, 500.0, "Last altitude should be near burst altitude")
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_OnlyDescent(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
// Restrict to descent only
|
||||
params.SimulateStages = []string{"descent"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// First result should be at burst altitude (since ascent was skipped)
|
||||
firstAlt := *results[0].Altitude
|
||||
burstAlt := *params.BurstAltitude
|
||||
assert.Equal(t, burstAlt, firstAlt, "Should start at burst altitude when ascent is skipped")
|
||||
|
||||
// Verify all results are during descent phase (altitude decreasing)
|
||||
for i := 1; i < len(results); i++ {
|
||||
assert.LessOrEqual(t, *results[i].Altitude, *results[i-1].Altitude,
|
||||
"Altitude should be decreasing or equal during descent")
|
||||
}
|
||||
|
||||
// Last altitude should be near ground
|
||||
lastAlt := *results[len(results)-1].Altitude
|
||||
assert.Less(t, lastAlt, 1000.0, "Last altitude should be near ground")
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_AscentAndDescent(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
// Include both ascent and descent
|
||||
params.SimulateStages = []string{"ascent", "descent"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// Find the peak altitude (transition point)
|
||||
maxAlt := 0.0
|
||||
maxIdx := 0
|
||||
for i, result := range results {
|
||||
if *result.Altitude > maxAlt {
|
||||
maxAlt = *result.Altitude
|
||||
maxIdx = i
|
||||
}
|
||||
}
|
||||
|
||||
// Verify ascent phase
|
||||
for i := 1; i <= maxIdx; i++ {
|
||||
assert.GreaterOrEqual(t, *results[i].Altitude, *results[i-1].Altitude,
|
||||
"Altitude should increase during ascent phase")
|
||||
}
|
||||
|
||||
// Verify descent phase
|
||||
for i := maxIdx + 1; i < len(results); i++ {
|
||||
assert.LessOrEqual(t, *results[i].Altitude, *results[i-1].Altitude,
|
||||
"Altitude should decrease during descent phase")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_FloatProfile_OnlyFloat(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
profile := "float_profile"
|
||||
floatAlt := 15000.0
|
||||
params.Profile = &profile
|
||||
params.FloatAltitude = &floatAlt
|
||||
|
||||
// Restrict to float only
|
||||
params.SimulateStages = []string{"float"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// All results should be at the float altitude
|
||||
for _, result := range results {
|
||||
assert.Equal(t, floatAlt, *result.Altitude,
|
||||
"Altitude should remain constant at float altitude")
|
||||
}
|
||||
|
||||
// Verify horizontal movement (lat/lon changes due to wind)
|
||||
firstLat := *results[0].Latitude
|
||||
lastLat := *results[len(results)-1].Latitude
|
||||
assert.NotEqual(t, firstLat, lastLat, "Latitude should change during float due to wind")
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_FloatProfile_AllStages(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
profile := "float_profile"
|
||||
floatAlt := 15000.0
|
||||
params.Profile = &profile
|
||||
params.FloatAltitude = &floatAlt
|
||||
|
||||
// Include all stages
|
||||
params.SimulateStages = []string{"ascent", "float", "descent"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// Verify we have ascending, constant, and descending altitude patterns
|
||||
hasAscent := false
|
||||
hasFloat := false
|
||||
hasDescent := false
|
||||
|
||||
const altTolerance = 50.0 // Tolerance for altitude comparison
|
||||
|
||||
for i := 1; i < len(results); i++ {
|
||||
altDiff := *results[i].Altitude - *results[i-1].Altitude
|
||||
|
||||
if altDiff > altTolerance {
|
||||
hasAscent = true
|
||||
} else if altDiff < -altTolerance {
|
||||
hasDescent = true
|
||||
} else if *results[i].Altitude > 10000 { // Float happens at high altitude
|
||||
hasFloat = true
|
||||
}
|
||||
}
|
||||
|
||||
assert.True(t, hasAscent, "Should have ascent phase")
|
||||
assert.True(t, hasFloat, "Should have float phase")
|
||||
assert.True(t, hasDescent, "Should have descent phase")
|
||||
|
||||
// Verify maximum altitude is near float altitude
|
||||
maxAlt := 0.0
|
||||
for _, result := range results {
|
||||
if *result.Altitude > maxAlt {
|
||||
maxAlt = *result.Altitude
|
||||
}
|
||||
}
|
||||
assert.InDelta(t, floatAlt, maxAlt, 1000.0, "Max altitude should be near float altitude")
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_ReverseProfile_OnlyFloat(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
profile := "reverse_profile"
|
||||
floatAlt := 5000.0
|
||||
params.Profile = &profile
|
||||
params.FloatAltitude = &floatAlt
|
||||
|
||||
// Restrict to float only
|
||||
params.SimulateStages = []string{"float"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// All results should be at the float altitude
|
||||
for _, result := range results {
|
||||
assert.InDelta(t, floatAlt, *result.Altitude, 10.0,
|
||||
"Altitude should remain near float altitude")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_EmptyStages_SimulatesAll(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
// Empty SimulateStages should simulate all stages
|
||||
params.SimulateStages = []string{}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// Should have both ascent and descent
|
||||
// Find the peak
|
||||
maxAlt := 0.0
|
||||
hasAscent := false
|
||||
hasDescent := false
|
||||
|
||||
for i := 1; i < len(results); i++ {
|
||||
if *results[i].Altitude > *results[i-1].Altitude {
|
||||
hasAscent = true
|
||||
}
|
||||
if *results[i].Altitude < *results[i-1].Altitude {
|
||||
hasDescent = true
|
||||
}
|
||||
if *results[i].Altitude > maxAlt {
|
||||
maxAlt = *results[i].Altitude
|
||||
}
|
||||
}
|
||||
|
||||
assert.True(t, hasAscent, "Should have ascent phase")
|
||||
assert.True(t, hasDescent, "Should have descent phase")
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_NilStages_SimulatesAll(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
// Nil SimulateStages should simulate all stages
|
||||
params.SimulateStages = nil
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// Should have both ascent and descent
|
||||
maxAlt := 0.0
|
||||
minAltAfterMax := 1000000.0
|
||||
|
||||
for _, result := range results {
|
||||
if *result.Altitude > maxAlt {
|
||||
maxAlt = *result.Altitude
|
||||
}
|
||||
}
|
||||
|
||||
foundMax := false
|
||||
for _, result := range results {
|
||||
if *result.Altitude == maxAlt {
|
||||
foundMax = true
|
||||
}
|
||||
if foundMax && *result.Altitude < minAltAfterMax {
|
||||
minAltAfterMax = *result.Altitude
|
||||
}
|
||||
}
|
||||
|
||||
// Should reach high altitude and come back down
|
||||
assert.Greater(t, maxAlt, 5000.0, "Should reach high altitude")
|
||||
assert.Less(t, minAltAfterMax, maxAlt, "Should descend after reaching max altitude")
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_InvalidStage_IgnoresInvalid(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
params := createBasicParams()
|
||||
|
||||
// Include invalid stage name (should be ignored)
|
||||
params.SimulateStages = []string{"ascent", "invalid_stage", "descent"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
// Should still simulate ascent and descent, ignoring the invalid stage
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_WindImpact(t *testing.T) {
|
||||
service, mockGrib := createTestService()
|
||||
|
||||
// Override mock to return strong eastward wind
|
||||
mockGrib.ExpectedCalls = nil
|
||||
mockGrib.On("Extract", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
Return([2]float64{20.0, 0.0}, nil) // Strong eastward wind
|
||||
|
||||
params := createBasicParams()
|
||||
params.SimulateStages = []string{"ascent"}
|
||||
|
||||
results, err := service.PerformPrediction(context.Background(), params)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, results)
|
||||
|
||||
// Longitude should increase significantly due to eastward wind
|
||||
firstLon := *results[0].Longitude
|
||||
lastLon := *results[len(results)-1].Longitude
|
||||
assert.Greater(t, lastLon, firstLon, "Longitude should increase with eastward wind")
|
||||
|
||||
// Verify wind values are captured in results
|
||||
for _, result := range results {
|
||||
if result.WindU != nil {
|
||||
// Wind values should be present in results
|
||||
assert.NotNil(t, result.WindV, "WindV should be present if WindU is present")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestrictedPrediction_MissingRequiredParams(t *testing.T) {
|
||||
service, _ := createTestService()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
params ds.PredictionParameters
|
||||
}{
|
||||
{
|
||||
name: "Missing latitude",
|
||||
params: ds.PredictionParameters{
|
||||
LaunchLongitude: floatPtr(-105.0),
|
||||
LaunchAltitude: floatPtr(1000.0),
|
||||
LaunchDatetime: timePtr(time.Now()),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Missing longitude",
|
||||
params: ds.PredictionParameters{
|
||||
LaunchLatitude: floatPtr(40.0),
|
||||
LaunchAltitude: floatPtr(1000.0),
|
||||
LaunchDatetime: timePtr(time.Now()),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Missing altitude",
|
||||
params: ds.PredictionParameters{
|
||||
LaunchLatitude: floatPtr(40.0),
|
||||
LaunchLongitude: floatPtr(-105.0),
|
||||
LaunchDatetime: timePtr(time.Now()),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Missing datetime",
|
||||
params: ds.PredictionParameters{
|
||||
LaunchLatitude: floatPtr(40.0),
|
||||
LaunchLongitude: floatPtr(-105.0),
|
||||
LaunchAltitude: floatPtr(1000.0),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
tc.params.SimulateStages = []string{"ascent"}
|
||||
results, err := service.PerformPrediction(context.Background(), tc.params)
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, ErrInvalidParameters, err)
|
||||
assert.Nil(t, results)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldSimulateStage(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
stages []string
|
||||
queryStage string
|
||||
shouldSimulate bool
|
||||
}{
|
||||
{
|
||||
name: "Empty filter simulates all",
|
||||
stages: []string{},
|
||||
queryStage: "ascent",
|
||||
shouldSimulate: true,
|
||||
},
|
||||
{
|
||||
name: "Nil filter simulates all",
|
||||
stages: nil,
|
||||
queryStage: "descent",
|
||||
shouldSimulate: true,
|
||||
},
|
||||
{
|
||||
name: "Stage in filter",
|
||||
stages: []string{"ascent", "descent"},
|
||||
queryStage: "ascent",
|
||||
shouldSimulate: true,
|
||||
},
|
||||
{
|
||||
name: "Stage not in filter",
|
||||
stages: []string{"ascent"},
|
||||
queryStage: "descent",
|
||||
shouldSimulate: false,
|
||||
},
|
||||
{
|
||||
name: "Float stage in filter",
|
||||
stages: []string{"float"},
|
||||
queryStage: "float",
|
||||
shouldSimulate: true,
|
||||
},
|
||||
{
|
||||
name: "Multiple stages excluding one",
|
||||
stages: []string{"ascent", "float"},
|
||||
queryStage: "descent",
|
||||
shouldSimulate: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
params := ds.PredictionParameters{
|
||||
SimulateStages: tc.stages,
|
||||
}
|
||||
result := shouldSimulateStage(params, tc.queryStage)
|
||||
assert.Equal(t, tc.shouldSimulate, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
func floatPtr(f float64) *float64 {
|
||||
return &f
|
||||
}
|
||||
|
||||
func timePtr(t time.Time) *time.Time {
|
||||
return &t
|
||||
}
|
||||
|
|
@ -1,60 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
grib Grib
|
||||
}
|
||||
|
||||
func New(gribService Grib) (*Service, error) {
|
||||
svc := &Service{
|
||||
grib: gribService,
|
||||
}
|
||||
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
// UpdateWeatherData updates weather forecast data using the configured grib service
|
||||
func (s *Service) UpdateWeatherData(ctx context.Context) error {
|
||||
return s.grib.Update(ctx)
|
||||
}
|
||||
|
||||
// ExtractWind extracts wind data for given coordinates and time
|
||||
func (s *Service) ExtractWind(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) {
|
||||
return s.grib.Extract(ctx, lat, lon, alt, ts)
|
||||
}
|
||||
|
||||
// Update updates the GRIB data (implements updater.GribService)
|
||||
func (s *Service) Update(ctx context.Context) error {
|
||||
return s.UpdateWeatherData(ctx)
|
||||
}
|
||||
|
||||
// Start starts the service
|
||||
func (s *Service) Start() {
|
||||
log.Ctx(context.Background()).Info("service started")
|
||||
}
|
||||
|
||||
// Stop stops the service
|
||||
func (s *Service) Stop() {
|
||||
log.Ctx(context.Background()).Info("service stopped")
|
||||
}
|
||||
|
||||
// Close closes the service and releases resources
|
||||
func (s *Service) Close() error {
|
||||
s.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) GetGribStatus(ctx context.Context) (ready bool, lastUpdate time.Time, isFresh bool, errMsg string) {
|
||||
if gribStatus, ok := s.grib.(interface {
|
||||
GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string)
|
||||
}); ok {
|
||||
return gribStatus.GetStatus()
|
||||
}
|
||||
return false, time.Time{}, false, "grib service does not implement GetStatus"
|
||||
}
|
||||
|
|
@ -1,44 +0,0 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
"github.com/ogen-go/ogen/middleware"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func Logging() middleware.Middleware {
|
||||
return func(req middleware.Request, next func(req middleware.Request) (middleware.Response, error)) (middleware.Response, error) {
|
||||
lg := log.Ctx(req.Context).With(
|
||||
zap.String("operationId", req.OperationID),
|
||||
)
|
||||
|
||||
lg.Info("started request")
|
||||
|
||||
req.Context = log.ToCtx(req.Context, lg)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := next(req)
|
||||
dur := time.Since(start).Microseconds()
|
||||
|
||||
if err != nil {
|
||||
if errcode, ok := err.(*errcodes.ErrorCode); ok {
|
||||
lg.Error("request error",
|
||||
zap.Int("status_code", errcode.StatusCode),
|
||||
zap.String("message", errcode.Message),
|
||||
zap.String("details", errcode.Details),
|
||||
)
|
||||
} else {
|
||||
lg.Error("request internal error",
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
lg.Info("done request", zap.Float64("duration_ms", float64(dur)/float64(1000)))
|
||||
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
package rest
|
||||
|
||||
import (
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
env "github.com/caarlos0/env/v11"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Host string `env:"HOST" envDefault:"0.0.0.0"`
|
||||
Port int `env:"PORT" envDefault:"8080"`
|
||||
ReadTimeout string `env:"READ_TIMEOUT" envDefault:"30s"`
|
||||
WriteTimeout string `env:"WRITE_TIMEOUT" envDefault:"30s"`
|
||||
IdleTimeout string `env:"IDLE_TIMEOUT" envDefault:"60s"`
|
||||
}
|
||||
|
||||
func NewConfig() (*Config, error) {
|
||||
cfg := &Config{}
|
||||
if err := env.ParseWithOptions(cfg, env.Options{
|
||||
PrefixTagName: "GSN_PREDICTOR_REST_",
|
||||
}); err != nil {
|
||||
return nil, errcodes.Wrap(err, "failed to parse REST config")
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/ds"
|
||||
)
|
||||
|
||||
type Service interface {
|
||||
UpdateWeatherData(ctx context.Context) error
|
||||
ExtractWind(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error)
|
||||
PerformPrediction(ctx context.Context, params ds.PredictionParameters) ([]ds.PredicitonResult, error)
|
||||
}
|
||||
|
|
@ -1,194 +0,0 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/ds"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
api "git.intra.yksa.space/gsn/predictor/pkg/rest"
|
||||
)
|
||||
|
||||
var (
|
||||
_ api.Handler = (*Handler)(nil)
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
svc Service
|
||||
}
|
||||
|
||||
func New(svc Service) *Handler {
|
||||
return &Handler{
|
||||
svc: svc,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) PerformPrediction(ctx context.Context, params api.PerformPredictionParams) (*api.PredictionResult, error) {
|
||||
internalParams := ds.ConvertFlatPredictionParams(params)
|
||||
if internalParams == nil {
|
||||
return nil, errcodes.New(http.StatusBadRequest, "invalid or missing parameters")
|
||||
}
|
||||
results, err := h.svc.PerformPrediction(ctx, *internalParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return nil, errcodes.New(http.StatusInternalServerError, "no prediction results")
|
||||
}
|
||||
|
||||
// Group results into stages (ascent and descent)
|
||||
stages := h.groupResultsIntoStages(results)
|
||||
|
||||
// Map to OpenAPI schema
|
||||
var predictionItems []api.PredictionResultPredictionItem
|
||||
|
||||
for _, stage := range stages {
|
||||
var trajectory []api.PredictionResultPredictionItemTrajectoryItem
|
||||
|
||||
for _, result := range stage.Results {
|
||||
traj := api.PredictionResultPredictionItemTrajectoryItem{
|
||||
Datetime: *result.Timestamp,
|
||||
Latitude: *result.Latitude,
|
||||
Longitude: *result.Longitude,
|
||||
Altitude: *result.Altitude,
|
||||
}
|
||||
trajectory = append(trajectory, traj)
|
||||
}
|
||||
|
||||
item := api.PredictionResultPredictionItem{
|
||||
Stage: stage.Stage,
|
||||
Trajectory: trajectory,
|
||||
}
|
||||
predictionItems = append(predictionItems, item)
|
||||
}
|
||||
|
||||
metadata := api.PredictionResultMetadata{
|
||||
StartDatetime: *results[0].Timestamp,
|
||||
CompleteDatetime: *results[len(results)-1].Timestamp,
|
||||
}
|
||||
|
||||
resp := &api.PredictionResult{
|
||||
Metadata: metadata,
|
||||
Prediction: predictionItems,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// StageResult represents a stage with its results
|
||||
type StageResult struct {
|
||||
Stage api.PredictionResultPredictionItemStage
|
||||
Results []ds.PredicitonResult
|
||||
}
|
||||
|
||||
// groupResultsIntoStages groups the prediction results into ascent and descent stages
|
||||
func (h *Handler) groupResultsIntoStages(results []ds.PredicitonResult) []StageResult {
|
||||
if len(results) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var stages []StageResult
|
||||
var currentStage []ds.PredicitonResult
|
||||
var currentStageType api.PredictionResultPredictionItemStage
|
||||
|
||||
// Determine if we're in ascent or descent based on altitude changes
|
||||
prevAlt := *results[0].Altitude
|
||||
currentStage = append(currentStage, results[0])
|
||||
currentStageType = api.PredictionResultPredictionItemStageAscent
|
||||
|
||||
for i := 1; i < len(results); i++ {
|
||||
result := results[i]
|
||||
currentAlt := *result.Altitude
|
||||
|
||||
// Determine if we're still in the same stage
|
||||
var stageType api.PredictionResultPredictionItemStage
|
||||
if currentAlt > prevAlt {
|
||||
stageType = api.PredictionResultPredictionItemStageAscent
|
||||
} else if currentAlt < prevAlt {
|
||||
stageType = api.PredictionResultPredictionItemStageDescent
|
||||
} else {
|
||||
// Same altitude - continue with current stage
|
||||
stageType = currentStageType
|
||||
}
|
||||
|
||||
// If stage type changed, finalize current stage and start new one
|
||||
if stageType != currentStageType && len(currentStage) > 0 {
|
||||
stages = append(stages, StageResult{
|
||||
Stage: currentStageType,
|
||||
Results: currentStage,
|
||||
})
|
||||
currentStage = nil
|
||||
currentStageType = stageType
|
||||
}
|
||||
|
||||
currentStage = append(currentStage, result)
|
||||
prevAlt = currentAlt
|
||||
}
|
||||
|
||||
// Add the final stage
|
||||
if len(currentStage) > 0 {
|
||||
stages = append(stages, StageResult{
|
||||
Stage: currentStageType,
|
||||
Results: currentStage,
|
||||
})
|
||||
}
|
||||
|
||||
return stages
|
||||
}
|
||||
|
||||
func (h *Handler) NewError(ctx context.Context, err error) *api.ErrorStatusCode {
|
||||
if errcode, ok := err.(*errcodes.ErrorCode); ok {
|
||||
resp := api.Error{
|
||||
Message: errcode.Message,
|
||||
}
|
||||
|
||||
if errcode.Details != "" {
|
||||
resp.Details = api.NewOptString(errcode.Details)
|
||||
}
|
||||
|
||||
return &api.ErrorStatusCode{
|
||||
StatusCode: errcode.StatusCode,
|
||||
Response: resp,
|
||||
}
|
||||
}
|
||||
|
||||
return &api.ErrorStatusCode{
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
Response: api.Error{
|
||||
Message: "undefined internal error",
|
||||
Details: api.NewOptString(err.Error()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) ReadinessCheck(ctx context.Context) (*api.ReadinessResponse, error) {
|
||||
status := api.ReadinessResponseStatusNotReady
|
||||
var lastUpdate time.Time
|
||||
var isFresh bool
|
||||
var errMsg string
|
||||
|
||||
if s, ok := h.svc.(interface {
|
||||
GetGribStatus(ctx context.Context) (ready bool, lastUpdate time.Time, isFresh bool, errMsg string)
|
||||
}); ok {
|
||||
ready, lu, fresh, em := s.GetGribStatus(ctx)
|
||||
lastUpdate = lu
|
||||
isFresh = fresh
|
||||
errMsg = em
|
||||
if ready {
|
||||
status = api.ReadinessResponseStatusOk
|
||||
} else if em != "" {
|
||||
status = api.ReadinessResponseStatusError
|
||||
}
|
||||
} else {
|
||||
errMsg = "service does not implement GetGribStatus"
|
||||
status = api.ReadinessResponseStatusError
|
||||
}
|
||||
|
||||
resp := &api.ReadinessResponse{
|
||||
Status: status,
|
||||
IsFresh: api.NewOptBool(isFresh),
|
||||
LastUpdate: api.NewOptDateTime(lastUpdate),
|
||||
ErrorMessage: api.NewOptString(errMsg),
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
|
@ -1,47 +0,0 @@
|
|||
package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/transport/middleware"
|
||||
handler "git.intra.yksa.space/gsn/predictor/internal/transport/rest/handler"
|
||||
api "git.intra.yksa.space/gsn/predictor/pkg/rest"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
||||
type Transport struct {
|
||||
cfg *Config
|
||||
srv *api.Server
|
||||
handler *handler.Handler
|
||||
}
|
||||
|
||||
func New(handler *handler.Handler, cfg *Config) (*Transport, error) {
|
||||
srv, err := api.NewServer(
|
||||
handler,
|
||||
api.WithMiddleware(middleware.Logging()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Transport{
|
||||
srv: srv,
|
||||
cfg: cfg,
|
||||
handler: handler,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *Transport) Run() {
|
||||
log.Ctx(context.Background()).Info("started")
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/", t.srv)
|
||||
cors.AllowAll().Handler(mux)
|
||||
|
||||
if err := http.ListenAndServe(fmt.Sprintf(":%d", t.cfg.Port), t.srv); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue