predictor/internal/api/prediction.go

239 lines
8.5 KiB
Go

package api
import (
"context"
"net/http"
"time"
"predictor-refactored/internal/engine"
"predictor-refactored/internal/weather"
apirest "predictor-refactored/pkg/rest"
)
// ReadinessCheck implements GET /ready.
func (h *Handler) ReadinessCheck(_ context.Context) (*apirest.ReadinessResponse, error) {
resp := &apirest.ReadinessResponse{}
if field := h.mgr.Active(); field != nil {
resp.Status = apirest.ReadinessResponseStatusOk
resp.DatasetTime = apirest.NewOptDateTime(field.Epoch())
} else {
resp.Status = apirest.ReadinessResponseStatusNotReady
resp.ErrorMessage = apirest.NewOptString("no dataset loaded")
}
return resp, nil
}
// PerformPredictionV2 implements POST /api/v2/prediction.
func (h *Handler) PerformPredictionV2(_ context.Context, req *apirest.PredictionV2Request) (*apirest.PredictionV2Response, error) {
resp, err := h.runPredictionV2(req)
if err == nil {
h.metrics.Prediction("v2", resp.CompletedAt.Sub(resp.StartedAt), nil)
}
return resp, err
}
// CreatePredictionJob implements POST /api/v1/predictions.
func (h *Handler) CreatePredictionJob(_ context.Context, req *apirest.PredictionV2Request) (*apirest.PredictionJob, error) {
info, accepted := h.async.Enqueue(req)
if !accepted {
return nil, apiError(http.StatusServiceUnavailable, info.Error)
}
return asyncJobToAPI(info), nil
}
// GetPredictionJob implements GET /api/v1/predictions/{id}.
func (h *Handler) GetPredictionJob(_ context.Context, params apirest.GetPredictionJobParams) (*apirest.PredictionJob, error) {
info, ok := h.async.Get(params.ID)
if !ok {
return nil, apiError(http.StatusNotFound, "prediction job not found")
}
return asyncJobToAPI(info), nil
}
// CancelPredictionJob implements DELETE /api/v1/predictions/{id}.
func (h *Handler) CancelPredictionJob(_ context.Context, params apirest.CancelPredictionJobParams) error {
if !h.async.Cancel(params.ID) {
return apiError(http.StatusConflict, "job not found or already terminal")
}
return nil
}
// runPredictionV2 is the synchronous prediction core, shared by the v2
// endpoint and the async worker pool.
func (h *Handler) runPredictionV2(req *apirest.PredictionV2Request) (*apirest.PredictionV2Response, error) {
// Validate the request shape before checking dataset availability, so a
// malformed request is a 400 regardless of startup state.
lat := req.Launch.Latitude
rawLng := req.Launch.Longitude
alt := req.Launch.Altitude.Or(0)
if lat < -90 || lat > 90 {
return nil, apiError(http.StatusBadRequest, "launch.latitude must be in [-90, 90]")
}
if rawLng < -180 || rawLng >= 360 {
return nil, apiError(http.StatusBadRequest, "launch.longitude must be in [-180, 360)")
}
lng := normalizeLng(rawLng)
field := h.mgr.Active()
if field == nil {
return nil, apiError(http.StatusServiceUnavailable, "no dataset loaded, service is starting up")
}
events := engine.NewEventSink()
deps := engine.BuildDeps{Wind: field, Events: events, Terrain: h.terrain()}
prof, err := buildProfile(req, deps)
if err != nil {
return nil, apiError(http.StatusBadRequest, err.Error())
}
started := time.Now().UTC()
results := prof.Run(float64(req.Launch.Time.Unix()), engine.State{Lat: lat, Lng: lng, Altitude: alt}, events)
completed := time.Now().UTC()
resp := &apirest.PredictionV2Response{
Stages: make([]apirest.StageResult, 0, len(results)),
Events: eventsToAPI(events.Snapshot()),
Dataset: apirest.DatasetInfo{Source: field.Source(), Epoch: field.Epoch()},
StartedAt: started,
CompletedAt: completed,
}
for _, r := range results {
resp.Stages = append(resp.Stages, stageResultToAPI(r))
}
return resp, nil
}
// PerformPrediction implements GET /api/v1/prediction (Tawhiri-compatible).
func (h *Handler) PerformPrediction(_ context.Context, params apirest.PerformPredictionParams) (*apirest.PredictionResponse, error) {
field := h.mgr.Active()
if field == nil {
return nil, apiError(http.StatusServiceUnavailable, "no dataset loaded, service is starting up")
}
profileKind := "standard_profile"
if p, ok := params.Profile.Get(); ok {
profileKind = string(p)
}
ascentRate := params.AscentRate.Or(5)
descentRate := params.DescentRate.Or(5)
launchAlt := params.LaunchAltitude.Or(0)
lng := normalizeLng(params.LaunchLongitude)
launchTime := float64(params.LaunchDatetime.Unix())
events := engine.NewEventSink()
var stageNames []string
var prof engine.Profile
switch profileKind {
case "standard_profile":
stageNames = []string{"ascent", "descent"}
prof = standardProfile(field, h.terrain(), events, ascentRate, params.BurstAltitude.Or(28000), descentRate)
case "float_profile":
stopTime := params.LaunchDatetime.Add(24 * time.Hour)
if v, ok := params.StopDatetime.Get(); ok {
stopTime = v
}
stageNames = []string{"ascent", "float"}
prof = floatProfile(field, events, ascentRate, params.FloatAltitude.Or(25000), stopTime)
default:
return nil, apiError(http.StatusBadRequest, "unknown profile: "+profileKind)
}
started := time.Now().UTC()
results := prof.Run(launchTime, engine.State{Lat: params.LaunchLatitude, Lng: lng, Altitude: launchAlt}, events)
completed := time.Now().UTC()
h.metrics.Prediction(profileKind, completed.Sub(started), nil)
resp := &apirest.PredictionResponse{
Metadata: apirest.PredictionResponseMetadata{StartDatetime: started, CompleteDatetime: completed},
}
for i, r := range results {
name := "ascent"
if i < len(stageNames) {
name = stageNames[i]
}
resp.Prediction = append(resp.Prediction, tawhiriItem(name, r))
}
resp.Request = apirest.NewOptPredictionResponseRequest(apirest.PredictionResponseRequest{
Dataset: apirest.NewOptString(field.Epoch().Format("2006-01-02T15:04:05Z")),
LaunchLatitude: apirest.NewOptFloat64(params.LaunchLatitude),
LaunchLongitude: apirest.NewOptFloat64(params.LaunchLongitude),
LaunchDatetime: apirest.NewOptString(params.LaunchDatetime.Format(time.RFC3339)),
LaunchAltitude: params.LaunchAltitude,
})
if ev := events.Snapshot(); len(ev) > 0 {
resp.Warnings = apirest.NewOptPredictionResponseWarnings(apirest.PredictionResponseWarnings{})
}
return resp, nil
}
// standardProfile builds the Tawhiri ascent → descent chain.
func standardProfile(field weather.WindField, elev engine.TerrainProvider, events *engine.EventSink, ascentRate, burst, descentRate float64) engine.Profile {
wind := engine.WindTransport(field, events)
descentTerm := []engine.Constraint{engine.Altitude{Op: engine.OpLessEqual, Limit: 0, On: engine.ActionStop}}
if elev != nil {
descentTerm = []engine.Constraint{engine.TerrainContact{Provider: elev, On: engine.ActionStop}}
}
return engine.Profile{
Direction: engine.Forward,
Stages: []*engine.Propagator{
{
Name: "ascent",
Step: 60,
Model: engine.Sum(engine.ConstantRate(ascentRate), wind),
Constraints: []engine.Constraint{engine.Altitude{Op: engine.OpGreaterEqual, Limit: burst, On: engine.ActionStop}},
},
{
Name: "descent",
Step: 60,
Model: engine.Sum(engine.ParachuteDescent(descentRate), wind),
Constraints: descentTerm,
},
},
}
}
// floatProfile builds the Tawhiri ascent → float chain.
func floatProfile(field weather.WindField, events *engine.EventSink, ascentRate, floatAlt float64, stopTime time.Time) engine.Profile {
wind := engine.WindTransport(field, events)
return engine.Profile{
Direction: engine.Forward,
Stages: []*engine.Propagator{
{
Name: "ascent",
Step: 60,
Model: engine.Sum(engine.ConstantRate(ascentRate), wind),
Constraints: []engine.Constraint{engine.Altitude{Op: engine.OpGreaterEqual, Limit: floatAlt, On: engine.ActionStop}},
},
{
Name: "float",
Step: 60,
Model: wind,
Constraints: []engine.Constraint{engine.Time{Op: engine.OpGreater, Limit: float64(stopTime.Unix()), On: engine.ActionStop}},
},
},
}
}
// tawhiriItem maps one engine stage result to a v1 prediction item.
func tawhiriItem(name string, r engine.Result) apirest.PredictionResponsePredictionItem {
stage := apirest.PredictionResponsePredictionItemStageAscent
switch name {
case "descent":
stage = apirest.PredictionResponsePredictionItemStageDescent
case "float":
stage = apirest.PredictionResponsePredictionItemStageFloat
}
n := r.Path.Len()
traj := make([]apirest.TawhiriPoint, 0, n)
for i := range n {
t, p := r.Path.At(i)
traj = append(traj, apirest.TawhiriPoint{
Datetime: time.Unix(int64(t), 0).UTC(),
Latitude: p.Lat,
Longitude: signedLng(p.Lng),
Altitude: p.Altitude,
})
}
return apirest.PredictionResponsePredictionItem{Stage: stage, Trajectory: traj}
}