// Package api is the HTTP surface of the service. Every REST operation is // defined in the OpenAPI spec (api/rest/predictor.swagger.yml) and served by // the ogen-generated server in pkg/rest; this package implements the // generated Handler interface and wires the server together with the // non-OpenAPI endpoints (Prometheus metrics, ReDoc docs). package api import ( "context" "fmt" "net/http" "time" "go.uber.org/zap" "predictor-refactored/internal/api/async" "predictor-refactored/internal/api/docs" "predictor-refactored/internal/api/middleware" "predictor-refactored/internal/datasets" "predictor-refactored/internal/elevation" "predictor-refactored/internal/metrics" "predictor-refactored/internal/windviz" apirest "predictor-refactored/pkg/rest" ) // Server is the top-level HTTP server. type Server struct { port int mux *http.ServeMux async *async.Manager log *zap.Logger } // Deps are the runtime dependencies the API layer needs. type Deps struct { Manager *datasets.Manager Elevation *elevation.Dataset Metrics metrics.Sink MetricsHandler http.Handler // optional; mounted at MetricsPath when non-nil MetricsPath string EnableWind bool WindCache *windviz.Cache // optional; created if nil and EnableWind AsyncWorkers int AsyncQueueSize int AsyncResultTTL time.Duration Log *zap.Logger } // New wires the HTTP server. The returned Server is not yet started. func New(port int, d Deps) (*Server, error) { if d.Log == nil { d.Log = zap.NewNop() } if d.Metrics == nil { d.Metrics = metrics.Noop() } if d.EnableWind && d.WindCache == nil { d.WindCache = windviz.NewCache(64, 10*time.Minute) } h := &Handler{ mgr: d.Manager, elev: d.Elevation, metrics: d.Metrics, cache: d.WindCache, started: time.Now().UTC(), log: d.Log, } // The async worker pool runs the same prediction core as the synchronous // endpoint; inject it so async stays decoupled from the wire types. h.async = async.New(async.Config{ Workers: d.AsyncWorkers, QueueSize: d.AsyncQueueSize, ResultTTL: d.AsyncResultTTL, }, h.runPredictionV2, d.Metrics, d.Log) ogenSrv, err := apirest.NewServer(h, apirest.WithMiddleware(middleware.OgenLogging(d.Log))) if err != nil { return nil, fmt.Errorf("create ogen server: %w", err) } mux := http.NewServeMux() // Liveness: always 200 while the process is up, independent of whether a // dataset is loaded. Container/orchestrator health checks use this; the // readiness of the data plane is /ready (an OpenAPI operation). mux.HandleFunc("GET /health", func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{"status":"alive"}`)) }) docs.New().Register(mux) if d.MetricsHandler != nil && d.MetricsPath != "" { mux.Handle(d.MetricsPath, d.MetricsHandler) } // The ogen server owns every OpenAPI route; mount it last as the catch-all. mux.Handle("/", ogenSrv) return &Server{port: port, mux: mux, async: h.async, log: d.Log}, nil } // Run starts the HTTP server and blocks until ctx is cancelled or the server // fails. The handler chain is CORS → mux (ogen routes + docs + metrics). func (s *Server) Run(ctx context.Context) error { srv := &http.Server{ Addr: fmt.Sprintf(":%d", s.port), Handler: middleware.CORS(s.mux), } errCh := make(chan error, 1) go func() { s.log.Info("HTTP server starting", zap.Int("port", s.port)) errCh <- srv.ListenAndServe() }() select { case err := <-errCh: return err case <-ctx.Done(): shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() return srv.Shutdown(shutdownCtx) } } // Close releases background resources (the async worker pool). func (s *Server) Close() { if s.async != nil { s.async.Close() } }