predictor/internal/api/transport.go

131 lines
3.8 KiB
Go

// Package api is the HTTP surface of the service. Every REST operation is
// defined in the OpenAPI spec (api/rest/predictor.swagger.yml) and served by
// the ogen-generated server in pkg/rest; this package implements the
// generated Handler interface and wires the server together with the
// non-OpenAPI endpoints (Prometheus metrics, ReDoc docs).
package api
import (
"context"
"fmt"
"net/http"
"time"
"go.uber.org/zap"
"predictor-refactored/internal/api/async"
"predictor-refactored/internal/api/docs"
"predictor-refactored/internal/api/middleware"
"predictor-refactored/internal/datasets"
"predictor-refactored/internal/elevation"
"predictor-refactored/internal/metrics"
"predictor-refactored/internal/windviz"
apirest "predictor-refactored/pkg/rest"
)
// Server is the top-level HTTP server.
type Server struct {
port int
mux *http.ServeMux
async *async.Manager
log *zap.Logger
}
// Deps are the runtime dependencies the API layer needs.
type Deps struct {
Manager *datasets.Manager
Elevation *elevation.Dataset
Metrics metrics.Sink
MetricsHandler http.Handler // optional; mounted at MetricsPath when non-nil
MetricsPath string
EnableWind bool
WindCache *windviz.Cache // optional; created if nil and EnableWind
AsyncWorkers int
AsyncQueueSize int
AsyncResultTTL time.Duration
Log *zap.Logger
}
// New wires the HTTP server. The returned Server is not yet started.
func New(port int, d Deps) (*Server, error) {
if d.Log == nil {
d.Log = zap.NewNop()
}
if d.Metrics == nil {
d.Metrics = metrics.Noop()
}
if d.EnableWind && d.WindCache == nil {
d.WindCache = windviz.NewCache(64, 10*time.Minute)
}
h := &Handler{
mgr: d.Manager,
elev: d.Elevation,
metrics: d.Metrics,
cache: d.WindCache,
started: time.Now().UTC(),
log: d.Log,
}
// The async worker pool runs the same prediction core as the synchronous
// endpoint; inject it so async stays decoupled from the wire types.
h.async = async.New(async.Config{
Workers: d.AsyncWorkers,
QueueSize: d.AsyncQueueSize,
ResultTTL: d.AsyncResultTTL,
}, h.runPredictionV2, d.Metrics, d.Log)
ogenSrv, err := apirest.NewServer(h, apirest.WithMiddleware(middleware.OgenLogging(d.Log)))
if err != nil {
return nil, fmt.Errorf("create ogen server: %w", err)
}
mux := http.NewServeMux()
// Liveness: always 200 while the process is up, independent of whether a
// dataset is loaded. Container/orchestrator health checks use this; the
// readiness of the data plane is /ready (an OpenAPI operation).
mux.HandleFunc("GET /health", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"alive"}`))
})
docs.New().Register(mux)
if d.MetricsHandler != nil && d.MetricsPath != "" {
mux.Handle(d.MetricsPath, d.MetricsHandler)
}
// The ogen server owns every OpenAPI route; mount it last as the catch-all.
mux.Handle("/", ogenSrv)
return &Server{port: port, mux: mux, async: h.async, log: d.Log}, nil
}
// Run starts the HTTP server and blocks until ctx is cancelled or the server
// fails. The handler chain is CORS → mux (ogen routes + docs + metrics).
func (s *Server) Run(ctx context.Context) error {
srv := &http.Server{
Addr: fmt.Sprintf(":%d", s.port),
Handler: middleware.CORS(s.mux),
}
errCh := make(chan error, 1)
go func() {
s.log.Info("HTTP server starting", zap.Int("port", s.port))
errCh <- srv.ListenAndServe()
}()
select {
case err := <-errCh:
return err
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return srv.Shutdown(shutdownCtx)
}
}
// Close releases background resources (the async worker pool).
func (s *Server) Close() {
if s.async != nil {
s.async.Close()
}
}