predictor/internal/api/datasets.go

189 lines
5.6 KiB
Go

package api
import (
"context"
"net/http"
"runtime"
"time"
"predictor-refactored/internal/datasets"
apirest "predictor-refactored/pkg/rest"
)
// ListDatasets implements GET /api/v1/admin/datasets.
//
// Every dataset reported by the manager's ListEpochs becomes one entry in the
// response. Entries whose filename also appears in LoadedDatasets are flagged
// as loaded and carry that dataset's coverage. The subset is attached only
// when the dataset's subset is not global. A ListEpochs failure is returned
// as a 500 API error carrying the error text.
func (h *Handler) ListDatasets(_ context.Context) (*apirest.DatasetList, error) {
	epochs, err := h.mgr.ListEpochs()
	if err != nil {
		return nil, apiError(http.StatusInternalServerError, err.Error())
	}

	// Index loaded datasets by filename so each stored entry needs one lookup.
	loadedByName := make(map[string]datasets.LoadedDatasetInfo)
	for _, info := range h.mgr.LoadedDatasets() {
		loadedByName[info.ID.Filename()] = info
	}

	result := &apirest.DatasetList{
		Source:   h.mgr.Source(),
		Datasets: make([]apirest.DatasetEntry, 0, len(epochs)),
	}
	for _, id := range epochs {
		name := id.Filename()

		var entry apirest.DatasetEntry
		entry.Filename = name
		entry.Epoch = id.Epoch.UTC()
		if !id.Subset.IsGlobal() {
			entry.Subset = apirest.NewOptSubsetSpec(subsetToAPI(id.Subset))
		}
		if info, isLoaded := loadedByName[name]; isLoaded {
			entry.Loaded = true
			entry.Coverage = apirest.NewOptCoverage(coverageToAPI(info.Coverage))
		}
		result.Datasets = append(result.Datasets, entry)
	}
	return result, nil
}
// TriggerDatasetDownload implements POST /api/v1/admin/datasets.
//
// Two request shapes are accepted:
//   - latest=true: the manager's Refresh is invoked under a 30-second
//     timeout derived from ctx; a Refresh error is returned as a 500.
//   - an explicit epoch (optionally with a subset): the manager's Download
//     is invoked for that dataset ID, with the epoch normalised to UTC.
//
// When neither latest nor epoch is supplied, a 400 API error is returned.
// On success the response carries the job ID reported by the manager.
func (h *Handler) TriggerDatasetDownload(ctx context.Context, req *apirest.DownloadRequest) (*apirest.DownloadAccepted, error) {
	const refreshTimeout = 30 * time.Second

	if req.Latest.Or(false) {
		// NOTE(review): cancel runs as soon as this handler returns. If Refresh
		// keeps the context for the job it starts, that job would be cancelled
		// early — confirm Refresh only uses it for its synchronous part.
		refreshCtx, cancel := context.WithTimeout(ctx, refreshTimeout)
		defer cancel()

		// The meaning of the second argument is defined by the manager and is
		// not visible from this file.
		jobID, err := h.mgr.Refresh(refreshCtx, 0)
		if err != nil {
			return nil, apiError(http.StatusInternalServerError, err.Error())
		}
		return &apirest.DownloadAccepted{JobID: jobID}, nil
	}

	epoch, hasEpoch := req.Epoch.Get()
	if !hasEpoch {
		return nil, apiError(http.StatusBadRequest, "specify either epoch or latest=true")
	}

	var target datasets.DatasetID
	target.Epoch = epoch.UTC()
	if subset, hasSubset := req.Subset.Get(); hasSubset {
		target.Subset = subsetFromAPI(subset)
	}

	accepted := &apirest.DownloadAccepted{JobID: h.mgr.Download(target)}
	return accepted, nil
}
// DeleteDataset implements DELETE /api/v1/admin/datasets/{name}.
//
// The name parameter is matched against the filename of each dataset returned
// by the manager's ListEpochs. The first match is passed to Remove. Listing or
// removal failures are returned as 500 API errors; if no stored dataset has
// the requested filename, a 404 API error is returned.
func (h *Handler) DeleteDataset(_ context.Context, params apirest.DeleteDatasetParams) error {
	epochs, err := h.mgr.ListEpochs()
	if err != nil {
		return apiError(http.StatusInternalServerError, err.Error())
	}

	for _, candidate := range epochs {
		if candidate.Filename() != params.Name {
			continue
		}
		// Only the first matching dataset is removed; the loop ends here.
		if rmErr := h.mgr.Remove(candidate); rmErr != nil {
			return apiError(http.StatusInternalServerError, rmErr.Error())
		}
		return nil
	}

	return apiError(http.StatusNotFound, "dataset not found")
}
// ListDatasetJobs implements GET /api/v1/admin/jobs.
//
// It converts every job reported by the manager's ListJobs into its API
// representation, preserving order. The returned slice is always non-nil and
// the error is always nil.
func (h *Handler) ListDatasetJobs(_ context.Context) ([]apirest.DownloadJob, error) {
	infos := h.mgr.ListJobs()

	dtos := make([]apirest.DownloadJob, len(infos))
	for i, info := range infos {
		dtos[i] = downloadJobToAPI(info)
	}
	return dtos, nil
}
// GetDatasetJob implements GET /api/v1/admin/jobs/{id}.
//
// It looks the job up through the manager's GetJob and returns its API
// representation, or a 404 API error when the manager reports no such job.
func (h *Handler) GetDatasetJob(_ context.Context, params apirest.GetDatasetJobParams) (*apirest.DownloadJob, error) {
	info, found := h.mgr.GetJob(params.ID)
	if !found {
		return nil, apiError(http.StatusNotFound, "job not found")
	}

	// Convert into a local so its address can be returned.
	result := downloadJobToAPI(info)
	return &result, nil
}
// CancelDatasetJob implements DELETE /api/v1/admin/jobs/{id}.
//
// It asks the manager to cancel the job. When CancelJob reports false, a 409
// API error is returned; per its message this covers both an unknown job and
// one that is already terminal.
func (h *Handler) CancelDatasetJob(_ context.Context, params apirest.CancelDatasetJobParams) error {
	if cancelled := h.mgr.CancelJob(params.ID); !cancelled {
		return apiError(http.StatusConflict, "job not found or already terminal")
	}
	return nil
}
// GetServiceStatus implements GET /api/v1/admin/status.
//
// The response combines manager state (source, job counts grouped by status,
// number of stored and loaded datasets) with process statistics (uptime since
// h.started rounded to the second, goroutine count, allocated heap in MiB).
// The returned error is always nil.
func (h *Handler) GetServiceStatus(_ context.Context) (*apirest.StatusResponse, error) {
	jobs := h.mgr.ListJobs()
	// The ListEpochs error is intentionally discarded, so a listing failure does
	// not fail the status call. StoredDatasets then counts whatever ListEpochs
	// returned alongside the error (presumably nil, i.e. 0 — confirm).
	stored, _ := h.mgr.ListEpochs()
	loaded := h.mgr.LoadedDatasets()

	// Tally jobs per status string.
	counts := apirest.StatusResponseJobsByStatus{}
	for _, job := range jobs {
		key := string(job.Status)
		counts[key]++
	}

	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)

	const bytesPerMiB = 1024 * 1024

	var resp apirest.StatusResponse
	resp.Source = h.mgr.Source()
	resp.Uptime = time.Since(h.started).Round(time.Second).String()
	resp.Goroutines = runtime.NumGoroutine()
	resp.MemoryMB = int64(stats.Alloc / bytesPerMiB)
	resp.JobsByStatus = counts
	resp.StoredDatasets = len(stored)
	resp.LoadedDatasets = len(loaded)
	return &resp, nil
}
// --- dataset mapping helpers ----------------------------------------------
// downloadJobToAPI converts a manager job record into its API representation.
//
// Timestamps are normalised to UTC. EndedAt is set only when the job has an
// end time, and Error only when the job's error text is non-empty.
func downloadJobToAPI(j datasets.JobInfo) apirest.DownloadJob {
	var dto apirest.DownloadJob
	dto.ID = j.ID
	dto.Source = j.Source
	dto.Dataset = j.Dataset.Filename()
	dto.Epoch = j.Dataset.Epoch.UTC()
	dto.Status = apirest.DownloadJobStatus(j.Status)
	dto.StartedAt = j.StartedAt.UTC()
	dto.TotalUnits = j.Total
	dto.DoneUnits = j.Done
	dto.Bytes = j.Bytes

	// Optional fields stay unset unless the job actually has a value for them.
	if ended := j.EndedAt; ended != nil {
		dto.EndedAt = apirest.NewOptDateTime(ended.UTC())
	}
	if msg := j.Err; msg != "" {
		dto.Error = apirest.NewOptString(msg)
	}
	return dto
}
// subsetToAPI converts an internal subset spec into its API representation.
//
// Members is passed through as-is (no copy is made). Region and HourRange are
// set on the result only when the corresponding pointer is non-nil.
func subsetToAPI(s datasets.SubsetSpec) apirest.SubsetSpec {
	var spec apirest.SubsetSpec
	spec.Members = s.Members

	if region := s.Region; region != nil {
		spec.Region = apirest.NewOptRegion(regionToAPI(*region))
	}
	if hours := s.HourRange; hours != nil {
		spec.HourRange = apirest.NewOptHourRange(apirest.HourRange{
			MinHour: hours.MinHour,
			MaxHour: hours.MaxHour,
		})
	}
	return spec
}
// subsetFromAPI converts an API subset spec into the internal representation.
//
// Members is passed through as-is (no copy is made). Region and HourRange are
// populated with freshly allocated values only when the API spec has them set;
// otherwise the pointers stay nil.
func subsetFromAPI(s apirest.SubsetSpec) datasets.SubsetSpec {
	var spec datasets.SubsetSpec
	spec.Members = s.Members

	if region, hasRegion := s.Region.Get(); hasRegion {
		spec.Region = &datasets.Region{
			MinLat: region.MinLat,
			MaxLat: region.MaxLat,
			MinLng: region.MinLng,
			MaxLng: region.MaxLng,
		}
	}
	if hours, hasHours := s.HourRange.Get(); hasHours {
		spec.HourRange = &datasets.HourRange{
			MinHour: hours.MinHour,
			MaxHour: hours.MaxHour,
		}
	}
	return spec
}
// regionToAPI copies the four latitude/longitude bounds of an internal region
// into the API region type.
func regionToAPI(r datasets.Region) apirest.Region {
	var out apirest.Region
	out.MinLat, out.MaxLat = r.MinLat, r.MaxLat
	out.MinLng, out.MaxLng = r.MinLng, r.MaxLng
	return out
}
// coverageToAPI converts an internal coverage record into its API
// representation, normalising the start and end times to UTC.
func coverageToAPI(c datasets.Coverage) apirest.Coverage {
	var out apirest.Coverage
	out.Region = regionToAPI(c.Region)
	out.StartTime = c.StartTime.UTC()
	out.EndTime = c.EndTime.UTC()
	return out
}