package api import ( "context" "net/http" "runtime" "time" "predictor-refactored/internal/datasets" apirest "predictor-refactored/pkg/rest" ) // ListDatasets implements GET /api/v1/admin/datasets. func (h *Handler) ListDatasets(_ context.Context) (*apirest.DatasetList, error) { stored, err := h.mgr.ListEpochs() if err != nil { return nil, apiError(http.StatusInternalServerError, err.Error()) } loaded := make(map[string]datasets.LoadedDatasetInfo) for _, ld := range h.mgr.LoadedDatasets() { loaded[ld.ID.Filename()] = ld } out := &apirest.DatasetList{Source: h.mgr.Source(), Datasets: make([]apirest.DatasetEntry, 0, len(stored))} for _, id := range stored { entry := apirest.DatasetEntry{ Filename: id.Filename(), Epoch: id.Epoch.UTC(), } if !id.Subset.IsGlobal() { entry.Subset = apirest.NewOptSubsetSpec(subsetToAPI(id.Subset)) } if ld, ok := loaded[id.Filename()]; ok { entry.Loaded = true entry.Coverage = apirest.NewOptCoverage(coverageToAPI(ld.Coverage)) } out.Datasets = append(out.Datasets, entry) } return out, nil } // TriggerDatasetDownload implements POST /api/v1/admin/datasets. func (h *Handler) TriggerDatasetDownload(ctx context.Context, req *apirest.DownloadRequest) (*apirest.DownloadAccepted, error) { if req.Latest.Or(false) { dctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() jobID, err := h.mgr.Refresh(dctx, 0) if err != nil { return nil, apiError(http.StatusInternalServerError, err.Error()) } return &apirest.DownloadAccepted{JobID: jobID}, nil } epoch, ok := req.Epoch.Get() if !ok { return nil, apiError(http.StatusBadRequest, "specify either epoch or latest=true") } id := datasets.DatasetID{Epoch: epoch.UTC()} if s, ok := req.Subset.Get(); ok { id.Subset = subsetFromAPI(s) } return &apirest.DownloadAccepted{JobID: h.mgr.Download(id)}, nil } // DeleteDataset implements DELETE /api/v1/admin/datasets/{name}. func (h *Handler) DeleteDataset(_ context.Context, params apirest.DeleteDatasetParams) error { stored, err := h.mgr.ListEpochs() if err != nil { return apiError(http.StatusInternalServerError, err.Error()) } for _, id := range stored { if id.Filename() == params.Name { if err := h.mgr.Remove(id); err != nil { return apiError(http.StatusInternalServerError, err.Error()) } return nil } } return apiError(http.StatusNotFound, "dataset not found") } // ListDatasetJobs implements GET /api/v1/admin/jobs. func (h *Handler) ListDatasetJobs(_ context.Context) ([]apirest.DownloadJob, error) { jobs := h.mgr.ListJobs() out := make([]apirest.DownloadJob, 0, len(jobs)) for _, j := range jobs { out = append(out, downloadJobToAPI(j)) } return out, nil } // GetDatasetJob implements GET /api/v1/admin/jobs/{id}. func (h *Handler) GetDatasetJob(_ context.Context, params apirest.GetDatasetJobParams) (*apirest.DownloadJob, error) { j, ok := h.mgr.GetJob(params.ID) if !ok { return nil, apiError(http.StatusNotFound, "job not found") } dto := downloadJobToAPI(j) return &dto, nil } // CancelDatasetJob implements DELETE /api/v1/admin/jobs/{id}. func (h *Handler) CancelDatasetJob(_ context.Context, params apirest.CancelDatasetJobParams) error { if !h.mgr.CancelJob(params.ID) { return apiError(http.StatusConflict, "job not found or already terminal") } return nil } // GetServiceStatus implements GET /api/v1/admin/status. func (h *Handler) GetServiceStatus(_ context.Context) (*apirest.StatusResponse, error) { jobs := h.mgr.ListJobs() stored, _ := h.mgr.ListEpochs() loaded := h.mgr.LoadedDatasets() byStatus := apirest.StatusResponseJobsByStatus{} for _, j := range jobs { byStatus[string(j.Status)]++ } var mem runtime.MemStats runtime.ReadMemStats(&mem) return &apirest.StatusResponse{ Source: h.mgr.Source(), Uptime: time.Since(h.started).Round(time.Second).String(), Goroutines: runtime.NumGoroutine(), MemoryMB: int64(mem.Alloc / 1024 / 1024), JobsByStatus: byStatus, StoredDatasets: len(stored), LoadedDatasets: len(loaded), }, nil } // --- dataset mapping helpers ---------------------------------------------- func downloadJobToAPI(j datasets.JobInfo) apirest.DownloadJob { dto := apirest.DownloadJob{ ID: j.ID, Source: j.Source, Dataset: j.Dataset.Filename(), Epoch: j.Dataset.Epoch.UTC(), Status: apirest.DownloadJobStatus(j.Status), StartedAt: j.StartedAt.UTC(), TotalUnits: j.Total, DoneUnits: j.Done, Bytes: j.Bytes, } if j.EndedAt != nil { dto.EndedAt = apirest.NewOptDateTime(j.EndedAt.UTC()) } if j.Err != "" { dto.Error = apirest.NewOptString(j.Err) } return dto } func subsetToAPI(s datasets.SubsetSpec) apirest.SubsetSpec { out := apirest.SubsetSpec{Members: s.Members} if s.Region != nil { out.Region = apirest.NewOptRegion(regionToAPI(*s.Region)) } if s.HourRange != nil { out.HourRange = apirest.NewOptHourRange(apirest.HourRange{MinHour: s.HourRange.MinHour, MaxHour: s.HourRange.MaxHour}) } return out } func subsetFromAPI(s apirest.SubsetSpec) datasets.SubsetSpec { out := datasets.SubsetSpec{Members: s.Members} if r, ok := s.Region.Get(); ok { out.Region = &datasets.Region{MinLat: r.MinLat, MaxLat: r.MaxLat, MinLng: r.MinLng, MaxLng: r.MaxLng} } if hr, ok := s.HourRange.Get(); ok { out.HourRange = &datasets.HourRange{MinHour: hr.MinHour, MaxHour: hr.MaxHour} } return out } func regionToAPI(r datasets.Region) apirest.Region { return apirest.Region{MinLat: r.MinLat, MaxLat: r.MaxLat, MinLng: r.MinLng, MaxLng: r.MaxLng} } func coverageToAPI(c datasets.Coverage) apirest.Coverage { return apirest.Coverage{ Region: regionToAPI(c.Region), StartTime: c.StartTime.UTC(), EndTime: c.EndTime.UTC(), } }