diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..287fc68 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,55 @@ +# Git +.git +.gitignore + +# Docker +Dockerfile +docker-compose.yml +.dockerignore + +# Documentation +README.md +*.md + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Environment files +.env +.env.local +.env.*.local + +# Build artifacts +predictor +*.exe +*.exe~ +*.dll +*.so +*.dylib +*.test + +# Logs +*.log + +# Temporary files +/tmp/ +/temp/ + +# Test coverage +*.out + +# Go workspace +go.work \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b60242 --- /dev/null +++ b/.gitignore @@ -0,0 +1,50 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# Environment variables +.env +.env.local +.env.*.local + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Logs +*.log + +# Temporary files +/tmp/ +/temp/ + +# Build artifacts +/build/ +/dist/ \ No newline at end of file diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md new file mode 100644 index 0000000..00caf0b --- /dev/null +++ b/DEPLOYMENT.md @@ -0,0 +1,501 @@ +# Deployment Guide + +This guide covers deploying the Predictor Service using Docker and Docker Compose. + +## Prerequisites + +- Docker Engine 20.10+ +- Docker Compose 2.0+ +- At least 2GB RAM available +- 10GB free disk space + +## Quick Deployment + +### 1. Clone and Setup + +```bash +git clone +cd predictor +``` + +### 2. Validate Configuration + +```bash +# Validate Docker configuration +./scripts/validate-docker.sh +``` + +### 3. Deploy + +```bash +# Build and start services +make up-build + +# Check status +make ps + +# View logs +make logs +``` + +## Production Deployment + +### Environment Configuration + +1. **Copy environment template:** + ```bash + cp cmd/api/.env cmd/api/.env.production + ``` + +2. **Edit production environment:** + ```bash + nano cmd/api/.env.production + ``` + +3. **Key production settings:** + ```bash + # Security + GSN_PREDICTOR_REDIS_PASSWORD=your_secure_password + + # Performance + GSN_PREDICTOR_GRIB_PARALLEL=8 + GSN_PREDICTOR_GRIB_CACHE_TTL=2h + + # Monitoring + GSN_PREDICTOR_GRIB_UPDATER_INTERVAL=3h + ``` + +### Production Docker Compose + +Create `docker-compose.prod.yml`: + +```yaml +version: '3.8' + +services: + predictor: + build: + context: . + dockerfile: Dockerfile + container_name: predictor-prod + ports: + - "8080:8080" + env_file: + - cmd/api/.env.production + volumes: + - grib_data:/tmp/grib + depends_on: + redis: + condition: service_healthy + networks: + - predictor-network + restart: unless-stopped + deploy: + resources: + limits: + memory: 1G + cpus: '0.5' + reservations: + memory: 512M + cpus: '0.25' + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + + redis: + image: redis:7.2-alpine + container_name: predictor-redis-prod + ports: + - "6379:6379" + volumes: + - redis_data:/data + networks: + - predictor-network + restart: unless-stopped + command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru --requirepass ${GSN_PREDICTOR_REDIS_PASSWORD} + healthcheck: + test: ["CMD", "redis-cli", "-a", "${GSN_PREDICTOR_REDIS_PASSWORD}", "ping"] + interval: 10s + timeout: 3s + retries: 5 + start_period: 10s + +volumes: + grib_data: + driver: local + redis_data: + driver: local + +networks: + predictor-network: + driver: bridge +``` + +### Deploy to Production + +```bash +# Deploy with production config +docker-compose -f docker-compose.prod.yml up -d + +# Monitor deployment +docker-compose -f docker-compose.prod.yml logs -f + +# Check health +curl http://localhost:8080/health +``` + +## Kubernetes Deployment + +### Create Namespace + +```yaml +# k8s/namespace.yaml +apiVersion: v1 +kind: Namespace +metadata: + name: predictor +``` + +### Redis Deployment + +```yaml +# k8s/redis.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis + namespace: predictor +spec: + replicas: 1 + selector: + matchLabels: + app: redis + template: + metadata: + labels: + app: redis + spec: + containers: + - name: redis + image: redis:7.2-alpine + ports: + - containerPort: 6379 + command: ["redis-server", "--appendonly", "yes", "--maxmemory", "512mb", "--maxmemory-policy", "allkeys-lru"] + volumeMounts: + - name: redis-data + mountPath: /data + resources: + limits: + memory: "512Mi" + cpu: "250m" + requests: + memory: "256Mi" + cpu: "100m" + livenessProbe: + exec: + command: ["redis-cli", "ping"] + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + exec: + command: ["redis-cli", "ping"] + initialDelaySeconds: 5 + periodSeconds: 5 + volumes: + - name: redis-data + persistentVolumeClaim: + claimName: redis-pvc +--- +apiVersion: v1 +kind: Service +metadata: + name: redis + namespace: predictor +spec: + selector: + app: redis + ports: + - port: 6379 + targetPort: 6379 +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: redis-pvc + namespace: predictor +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi +``` + +### Predictor Deployment + +```yaml +# k8s/predictor.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: predictor + namespace: predictor +spec: + replicas: 2 + selector: + matchLabels: + app: predictor + template: + metadata: + labels: + app: predictor + spec: + containers: + - name: predictor + image: predictor:latest + ports: + - containerPort: 8080 + env: + - name: GSN_PREDICTOR_REDIS_HOST + value: "redis" + - name: GSN_PREDICTOR_REDIS_PORT + value: "6379" + - name: GSN_PREDICTOR_GRIB_DIR + value: "/tmp/grib" + - name: GSN_PREDICTOR_SCHEDULER_ENABLED + value: "true" + - name: GSN_PREDICTOR_GRIB_UPDATER_INTERVAL + value: "6h" + - name: GSN_PREDICTOR_GRIB_UPDATER_TIMEOUT + value: "45m" + volumeMounts: + - name: grib-data + mountPath: /tmp/grib + resources: + limits: + memory: "1Gi" + cpu: "500m" + requests: + memory: "512Mi" + cpu: "250m" + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 40 + periodSeconds: 30 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 10 + volumes: + - name: grib-data + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: predictor + namespace: predictor +spec: + selector: + app: predictor + ports: + - port: 80 + targetPort: 8080 + type: LoadBalancer +``` + +### Deploy to Kubernetes + +```bash +# Apply namespace +kubectl apply -f k8s/namespace.yaml + +# Apply Redis +kubectl apply -f k8s/redis.yaml + +# Wait for Redis to be ready +kubectl wait --for=condition=ready pod -l app=redis -n predictor + +# Apply Predictor +kubectl apply -f k8s/predictor.yaml + +# Check status +kubectl get pods -n predictor +kubectl get services -n predictor +``` + +## Monitoring and Logging + +### Health Checks + +The service includes built-in health checks: + +```bash +# Application health +curl http://localhost:8080/health + +# Docker health +docker inspect predictor | jq '.[0].State.Health' + +# Kubernetes health +kubectl describe pod -l app=predictor -n predictor +``` + +### Logging + +```bash +# Docker logs +docker-compose logs -f predictor + +# Kubernetes logs +kubectl logs -f deployment/predictor -n predictor +``` + +### Metrics + +Consider adding Prometheus metrics: + +```yaml +# Add to docker-compose.yml + prometheus: + image: prom/prometheus:latest + ports: + - "9090:9090" + volumes: + - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml + networks: + - predictor-network +``` + +## Backup and Recovery + +### Redis Backup + +```bash +# Create backup +docker exec predictor-redis redis-cli BGSAVE + +# Copy backup file +docker cp predictor-redis:/data/dump.rdb ./backup/redis-$(date +%Y%m%d).rdb +``` + +### GRIB Data Backup + +```bash +# Backup GRIB data +docker run --rm -v predictor_grib_data:/data -v $(pwd)/backup:/backup alpine tar czf /backup/grib-$(date +%Y%m%d).tar.gz -C /data . +``` + +### Automated Backup Script + +```bash +#!/bin/bash +# scripts/backup.sh + +BACKUP_DIR="./backup/$(date +%Y%m%d)" +mkdir -p $BACKUP_DIR + +# Redis backup +docker exec predictor-redis redis-cli BGSAVE +sleep 5 +docker cp predictor-redis:/data/dump.rdb $BACKUP_DIR/redis.rdb + +# GRIB data backup +docker run --rm -v predictor_grib_data:/data -v $(pwd)/$BACKUP_DIR:/backup alpine tar czf /backup/grib.tar.gz -C /data . + +echo "Backup completed: $BACKUP_DIR" +``` + +## Troubleshooting + +### Common Issues + +1. **Redis Connection Issues:** + ```bash + # Check Redis status + docker-compose exec redis redis-cli ping + + # Check network connectivity + docker-compose exec predictor wget -O- http://redis:6379 + ``` + +2. **GRIB Download Failures:** + ```bash + # Check disk space + docker-compose exec predictor df -h /tmp/grib + + # Check internet connectivity + docker-compose exec predictor wget -O- https://nomads.ncep.noaa.gov/ + ``` + +3. **Memory Issues:** + ```bash + # Check memory usage + docker stats + + # Check container logs + docker-compose logs predictor | grep -i memory + ``` + +### Performance Tuning + +1. **Redis Optimization:** + ```bash + # Increase Redis memory + GSN_PREDICTOR_REDIS_MAXMEMORY=1gb + + # Optimize Redis settings + redis-server --maxmemory 1gb --maxmemory-policy allkeys-lru + ``` + +2. **GRIB Processing:** + ```bash + # Increase parallel workers + GSN_PREDICTOR_GRIB_PARALLEL=8 + + # Optimize cache TTL + GSN_PREDICTOR_GRIB_CACHE_TTL=2h + ``` + +3. **Container Resources:** + ```yaml + # In docker-compose.yml + deploy: + resources: + limits: + memory: 2G + cpus: '1.0' + reservations: + memory: 1G + cpus: '0.5' + ``` + +## Security Considerations + +1. **Network Security:** + - Use internal networks for service communication + - Expose only necessary ports + - Use reverse proxy for external access + +2. **Container Security:** + - Run as non-root user + - Use minimal base images + - Regular security updates + +3. **Data Security:** + - Encrypt sensitive environment variables + - Use secrets management for passwords + - Regular backups + +4. **Access Control:** + - Implement API authentication + - Use HTTPS in production + - Monitor access logs +``` \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8cebb4e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,56 @@ +# Build stage +FROM golang:1.24.4-alpine AS builder + +# Install build dependencies +RUN apk add --no-cache git ca-certificates tzdata + +# Set working directory +WORKDIR /app + +# Copy go mod files +COPY go.mod go.sum ./ + +# Download dependencies +RUN go mod download + +# Copy source code +COPY . . + +# Build the application +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ + -ldflags="-w -s" \ + -o predictor \ + ./cmd/api + +# Runtime stage +FROM alpine:3.19 + +# Install runtime dependencies +RUN apk add --no-cache ca-certificates tzdata + +# Create non-root user +RUN addgroup -g 1001 -S appgroup && \ + adduser -u 1001 -S appuser -G appgroup + +# Set working directory +WORKDIR /app + +# Copy binary from builder stage +COPY --from=builder /app/predictor . + +# Create necessary directories +RUN mkdir -p /tmp/grib && \ + chown -R appuser:appgroup /app /tmp/grib + +# Switch to non-root user +USER appuser + +# Expose port +EXPOSE 8080 + +# Health check +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ + CMD wget --no-verbose --tries=1 --spider http://localhost:8080/health || exit 1 + +# Run the application +CMD ["./predictor"] \ No newline at end of file diff --git a/Makefile b/Makefile index fef73c5..a8d49b0 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,122 @@ +# Variables +IMAGE_NAME = predictor +TAG = latest +COMPOSE_FILE = docker-compose.yml + +# Validate Docker configuration +.PHONY: validate-docker +validate-docker: + ./scripts/validate-docker.sh + +# Build the Docker image +.PHONY: build +build: + docker build -t $(IMAGE_NAME):$(TAG) . + +# Run the application with docker-compose +.PHONY: up +up: + docker-compose -f $(COMPOSE_FILE) up -d + +# Run the application with docker-compose and rebuild +.PHONY: up-build +up-build: + docker-compose -f $(COMPOSE_FILE) up -d --build + +# Stop the application +.PHONY: down +down: + docker-compose -f $(COMPOSE_FILE) down + +# Stop the application and remove volumes +.PHONY: down-volumes +down-volumes: + docker-compose -f $(COMPOSE_FILE) down -v + +# View logs +.PHONY: logs +logs: + docker-compose -f $(COMPOSE_FILE) logs -f + +# View logs for specific service +.PHONY: logs-predictor +logs-predictor: + docker-compose -f $(COMPOSE_FILE) logs -f predictor + +# View logs for Redis +.PHONY: logs-redis +logs-redis: + docker-compose -f $(COMPOSE_FILE) logs -f redis + +# Check service status +.PHONY: ps +ps: + docker-compose -f $(COMPOSE_FILE) ps + +# Execute command in predictor container +.PHONY: exec +exec: + docker-compose -f $(COMPOSE_FILE) exec predictor sh + +# Execute command in Redis container +.PHONY: exec-redis +exec-redis: + docker-compose -f $(COMPOSE_FILE) exec redis sh + +# Clean up Docker resources +.PHONY: clean +clean: + docker-compose -f $(COMPOSE_FILE) down -v --rmi all + docker system prune -f + +# Run tests +.PHONY: test +test: + go test ./... + +# Build locally +.PHONY: build-local +build-local: + go build -o predictor ./cmd/api + +# Run locally (requires Redis) +.PHONY: run-local +run-local: + cd cmd/api && go run . + +# Format code +.PHONY: fmt +fmt: + go fmt ./... + +# Lint code +.PHONY: lint +lint: + golangci-lint run + +# Show help +.PHONY: help +help: + @echo "Available commands:" + @echo " validate-docker - Validate Docker configuration" + @echo " build - Build Docker image" + @echo " up - Start services with docker-compose" + @echo " up-build - Start services and rebuild images" + @echo " down - Stop services" + @echo " down-volumes - Stop services and remove volumes" + @echo " logs - View all logs" + @echo " logs-predictor - View predictor logs" + @echo " logs-redis - View Redis logs" + @echo " ps - Show service status" + @echo " exec - Execute shell in predictor container" + @echo " exec-redis - Execute shell in Redis container" + @echo " clean - Clean up Docker resources" + @echo " test - Run tests" + @echo " build-local - Build locally" + @echo " run-local - Run locally (requires Redis)" + @echo " fmt - Format code" + @echo " lint - Lint code" + @echo " help - Show this help" + generate-ogen: go run github.com/ogen-go/ogen/cmd/ogen@latest --target pkg/rest -package gsn --clean api/rest/predictor.swagger.yml \ No newline at end of file diff --git a/README.md b/README.md index e69de29..57f6e8c 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,261 @@ +# Predictor Service + +A Go-based weather prediction service that downloads and processes GRIB files for wind vector data extraction. + +## Features + +- **GRIB File Processing**: Downloads and processes GRIB weather data files +- **Wind Vector Extraction**: Extracts wind vector data for given coordinates and time +- **Redis Caching**: Caches extraction results for improved performance +- **Distributed Locking**: Uses Redis-based distributed locks for safe concurrent downloads +- **Scheduled Updates**: Automatic GRIB file updates via configurable scheduler +- **REST API**: HTTP API for data extraction and service management +- **Modular Architecture**: Clean separation of concerns with dependency injection + +## Architecture + +The service follows a modular architecture with clear separation of concerns: + +- **Service Layer**: Business logic and orchestration +- **GRIB Package**: GRIB file processing and data extraction +- **Redis Package**: Caching and distributed locking +- **Scheduler Package**: Job scheduling and execution +- **Transport Layer**: HTTP API handling +- **Jobs**: Background tasks (GRIB updates, etc.) + +## Quick Start with Docker + +### Prerequisites + +- Docker +- Docker Compose + +### Running the Service + +1. **Clone the repository:** + ```bash + git clone + cd predictor + ``` + +2. **Start the services:** + ```bash + # Production + docker-compose up -d + + # Development (with volume mounts) + docker-compose -f docker-compose.dev.yml up -d + ``` + +3. **Check service status:** + ```bash + docker-compose ps + ``` + +4. **View logs:** + ```bash + # All services + docker-compose logs -f + + # Specific service + docker-compose logs -f predictor + docker-compose logs -f redis + ``` + +### Using Make Commands + +The project includes a Makefile for common operations: + +```bash +# Build and start services +make up-build + +# View logs +make logs + +# Stop services +make down + +# Clean up everything +make clean + +# Show all available commands +make help +``` + +## Configuration + +The service is configured via environment variables with the prefix `GSN_PREDICTOR_`: + +### GRIB Configuration +- `GSN_PREDICTOR_GRIB_DIR`: Directory for GRIB files (default: `/tmp/grib`) +- `GSN_PREDICTOR_GRIB_TTL`: GRIB file TTL (default: `24h`) +- `GSN_PREDICTOR_GRIB_CACHE_TTL`: Cache TTL (default: `1h`) +- `GSN_PREDICTOR_GRIB_PARALLEL`: Parallel download workers (default: `4`) +- `GSN_PREDICTOR_GRIB_TIMEOUT`: Download timeout (default: `30s`) +- `GSN_PREDICTOR_GRIB_DATASET_URL`: GRIB data source URL + +### Redis Configuration +- `GSN_PREDICTOR_REDIS_HOST`: Redis host (default: `localhost`) +- `GSN_PREDICTOR_REDIS_PORT`: Redis port (default: `6379`) +- `GSN_PREDICTOR_REDIS_PASSWORD`: Redis password (default: empty) +- `GSN_PREDICTOR_REDIS_DB`: Redis database (default: `0`) + +### Scheduler Configuration +- `GSN_PREDICTOR_SCHEDULER_ENABLED`: Enable scheduler (default: `true`) + +### GRIB Updater Job Configuration +- `GSN_PREDICTOR_GRIB_UPDATER_INTERVAL`: Update interval (default: `6h`) +- `GSN_PREDICTOR_GRIB_UPDATER_TIMEOUT`: Update timeout (default: `45m`) + +### REST Transport Configuration +- `GSN_PREDICTOR_REST_HOST`: HTTP host (default: `0.0.0.0`) +- `GSN_PREDICTOR_REST_PORT`: HTTP port (default: `8080`) +- `GSN_PREDICTOR_REST_READ_TIMEOUT`: Read timeout (default: `30s`) +- `GSN_PREDICTOR_REST_WRITE_TIMEOUT`: Write timeout (default: `30s`) +- `GSN_PREDICTOR_REST_IDLE_TIMEOUT`: Idle timeout (default: `60s`) + +## API Endpoints + +The service exposes a REST API for wind data extraction: + +- `GET /health` - Health check endpoint +- `POST /predict` - Extract wind data for given coordinates and time + +### Example API Usage + +```bash +# Health check +curl http://localhost:8080/health + +# Extract wind data +curl -X POST http://localhost:8080/predict \ + -H "Content-Type: application/json" \ + -d '{ + "latitude": 40.7128, + "longitude": -74.0060, + "altitude": 100, + "timestamp": "2024-01-15T12:00:00Z" + }' +``` + +## Development + +### Local Development + +1. **Install dependencies:** + ```bash + go mod download + ``` + +2. **Start Redis:** + ```bash + docker run -d --name redis -p 6379:6379 redis:7.2-alpine + ``` + +3. **Set environment variables:** + ```bash + cd cmd/api + source .env + ``` + +4. **Run the service:** + ```bash + go run . + ``` + +### Building Locally + +```bash +# Build the binary +make build-local + +# Run tests +make test + +# Format code +make fmt + +# Lint code +make lint +``` + +### Docker Development + +For development with hot reloading: + +```bash +# Start development environment +docker-compose -f docker-compose.dev.yml up -d + +# View logs +docker-compose -f docker-compose.dev.yml logs -f predictor +``` + +## Docker Best Practices + +The Dockerfile follows Go best practices: + +- **Multi-stage build**: Separate builder and runtime stages +- **Non-root user**: Runs as non-root user for security +- **Minimal runtime**: Uses Alpine Linux for smaller image size +- **Health checks**: Built-in health monitoring +- **Optimized layers**: Efficient layer caching +- **Security**: No unnecessary packages or permissions + +## Monitoring and Health Checks + +The service includes built-in health checks: + +- **Application health**: HTTP endpoint at `/health` +- **Docker health**: Container health check with wget +- **Redis health**: Redis ping health check +- **Service dependencies**: Proper startup order with health checks + +## Troubleshooting + +### Common Issues + +1. **Redis connection refused:** + - Ensure Redis is running: `docker-compose ps` + - Check Redis logs: `docker-compose logs redis` + - Verify network connectivity + +2. **GRIB download failures:** + - Check internet connectivity + - Verify GRIB data source URL + - Check disk space in `/tmp/grib` + +3. **Service not starting:** + - Check logs: `docker-compose logs predictor` + - Verify environment variables + - Check port conflicts + +### Debug Commands + +```bash +# Execute shell in container +docker-compose exec predictor sh + +# Check Redis connectivity +docker-compose exec redis redis-cli ping + +# View container resources +docker stats + +# Check network connectivity +docker-compose exec predictor wget -O- http://redis:6379 +``` + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests +5. Run tests and linting +6. Submit a pull request + +## License + +[Add your license information here] diff --git a/cmd/api/main.go b/cmd/api/main.go index a271536..9a53a94 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,27 +1,104 @@ package main import ( + "os" + "os/signal" + "syscall" + + "git.intra.yksa.space/gsn/predictor/internal/jobs/grib/updater" + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" "git.intra.yksa.space/gsn/predictor/internal/service" "git.intra.yksa.space/gsn/predictor/internal/transport/rest" "git.intra.yksa.space/gsn/predictor/internal/transport/rest/handler" + "git.intra.yksa.space/gsn/predictor/pkg/redis" + "git.intra.yksa.space/gsn/predictor/pkg/scheduler" + env "github.com/caarlos0/env/v11" "go.uber.org/zap" ) -const ( - servicePrefix = "PREDICTOR" -) +const servicePrefix = "GSN_PREDICTOR" func main() { lg, err := zap.NewProduction() if err != nil { panic(err) } + defer lg.Sync() - svc := service.New() + // Load configuration from environment with service prefix + cfg, err := loadConfig() + if err != nil { + lg.Fatal("failed to load configuration", zap.Error(err)) + } + // Load scheduler configuration + schedulerConfig, err := loadSchedulerConfig() + if err != nil { + lg.Fatal("failed to load scheduler configuration", zap.Error(err)) + } + + // Load GRIB updater job configuration + gribUpdaterConfig, err := loadGribUpdaterConfig() + if err != nil { + lg.Fatal("failed to load GRIB updater configuration", zap.Error(err)) + } + + // Initialize Redis service + redisService, err := redis.New(cfg.Redis) + if err != nil { + lg.Fatal("failed to initialize Redis service", zap.Error(err)) + } + defer redisService.Close() + + // Initialize GRIB service + gribService, err := grib.New(grib.ServiceConfig{ + Dir: cfg.Grib.Dir, + TTL: cfg.Grib.TTL, + CacheTTL: cfg.Grib.CacheTTL, + Redis: redisService, + Parallel: cfg.Grib.Parallel, + Client: cfg.CreateHTTPClient(), + }) + if err != nil { + lg.Fatal("failed to initialize GRIB service", zap.Error(err)) + } + defer gribService.Close() + + // Initialize service with dependencies + svc, err := service.New(cfg, gribService, redisService, lg) + if err != nil { + lg.Fatal("failed to initialize service", zap.Error(err)) + } + defer svc.Close() + + // Initialize scheduler + var sched *scheduler.Scheduler + if schedulerConfig.Enabled { + sched = scheduler.New(lg) + + // Add GRIB update job + gribJob := updater.New(gribService, gribUpdaterConfig, lg) + if err := sched.AddJob(gribJob); err != nil { + lg.Error("failed to add GRIB update job to scheduler", zap.Error(err)) + } + + // TODO: Add more jobs here as needed + // Example: + // cleanupConfig := cleanup.NewConfig() + // cleanupJob := cleanup.New(svc, cleanupConfig, lg) + // if err := sched.AddJob(cleanupJob); err != nil { + // lg.Error("failed to add cleanup job to scheduler", zap.Error(err)) + // } + + lg.Info("scheduler initialized with jobs") + } + + // Initialize handler handler := handler.New(svc) - restConfig, err := rest.NewConfig(servicePrefix) + // Initialize transport + restConfig, err := loadRestConfig() if err != nil { lg.Fatal("failed to init transport config", zap.Error(err)) } @@ -31,11 +108,92 @@ func main() { lg.Fatal("failed to init transport", zap.Error(err)) } - for { - transport.Run() + // Start service + svc.Start() - if r := recover(); r != nil { - lg.Error("panic occured", zap.Any("recover", r)) - } + // Start scheduler if enabled + if sched != nil { + sched.Start() + lg.Info("scheduler started") + } + + lg.Info("service started successfully", + zap.String("grib_dir", cfg.Grib.Dir), + zap.Duration("grib_ttl", cfg.Grib.TTL), + zap.Duration("grib_cache_ttl", cfg.Grib.CacheTTL), + zap.Int("grib_parallel", cfg.Grib.Parallel), + zap.Bool("scheduler_enabled", schedulerConfig.Enabled), + zap.Duration("grib_update_interval", gribUpdaterConfig.Interval)) + + // Wait for shutdown signal + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Start server in goroutine + go func() { + lg.Info("starting HTTP server") + transport.Run() + }() + + // Wait for shutdown signal + <-sigChan + lg.Info("received shutdown signal, stopping service") + + // Stop scheduler first + if sched != nil { + sched.Stop() + lg.Info("scheduler stopped") } } + +// loadConfig loads configuration from environment with service prefix +func loadConfig() (*service.Config, error) { + cfg := &service.Config{} + + if err := env.ParseWithOptions(cfg, env.Options{ + PrefixTagName: servicePrefix + "_", + }); err != nil { + return nil, errcodes.Wrap(err, "failed to parse configuration") + } + + return cfg, nil +} + +// loadSchedulerConfig loads scheduler configuration from environment +func loadSchedulerConfig() (*scheduler.Config, error) { + cfg := &scheduler.Config{} + + if err := env.ParseWithOptions(cfg, env.Options{ + PrefixTagName: servicePrefix + "_SCHEDULER_", + }); err != nil { + return nil, errcodes.Wrap(err, "failed to parse scheduler configuration") + } + + return cfg, nil +} + +// loadGribUpdaterConfig loads GRIB updater job configuration from environment +func loadGribUpdaterConfig() (*updater.Config, error) { + cfg := &updater.Config{} + + if err := env.ParseWithOptions(cfg, env.Options{ + PrefixTagName: servicePrefix + "_GRIB_UPDATER_", + }); err != nil { + return nil, errcodes.Wrap(err, "failed to parse GRIB updater configuration") + } + + return cfg, nil +} + +// loadRestConfig loads REST transport configuration from environment with service prefix +func loadRestConfig() (*rest.Config, error) { + cfg := &rest.Config{} + + if err := env.ParseWithOptions(cfg, env.Options{ + PrefixTagName: servicePrefix + "_REST_", + }); err != nil { + return nil, errcodes.Wrap(err, "failed to parse REST configuration") + } + + return cfg, nil +} diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml new file mode 100644 index 0000000..4503482 --- /dev/null +++ b/docker-compose.dev.yml @@ -0,0 +1,82 @@ +version: '3.8' + +services: + predictor: + build: + context: . + dockerfile: Dockerfile + container_name: predictor-dev + ports: + - "8080:8080" + environment: + # GRIB Configuration + - GSN_PREDICTOR_GRIB_DIR=/tmp/grib + - GSN_PREDICTOR_GRIB_TTL=24h + - GSN_PREDICTOR_GRIB_CACHE_TTL=1h + - GSN_PREDICTOR_GRIB_PARALLEL=4 + - GSN_PREDICTOR_GRIB_TIMEOUT=30s + - GSN_PREDICTOR_GRIB_DATASET_URL=https://nomads.ncep.noaa.gov/ + + # Redis Configuration + - GSN_PREDICTOR_REDIS_HOST=redis + - GSN_PREDICTOR_REDIS_PORT=6379 + - GSN_PREDICTOR_REDIS_PASSWORD= + - GSN_PREDICTOR_REDIS_DB=0 + + # Scheduler Configuration + - GSN_PREDICTOR_SCHEDULER_ENABLED=true + + # GRIB Updater Job Configuration + - GSN_PREDICTOR_GRIB_UPDATER_INTERVAL=6h + - GSN_PREDICTOR_GRIB_UPDATER_TIMEOUT=45m + + # REST Transport Configuration + - GSN_PREDICTOR_REST_HOST=0.0.0.0 + - GSN_PREDICTOR_REST_PORT=8080 + - GSN_PREDICTOR_REST_READ_TIMEOUT=30s + - GSN_PREDICTOR_REST_WRITE_TIMEOUT=30s + - GSN_PREDICTOR_REST_IDLE_TIMEOUT=60s + volumes: + - grib_data:/tmp/grib + - .:/app + - /app/predictor + depends_on: + redis: + condition: service_healthy + networks: + - predictor-network + restart: unless-stopped + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + + redis: + image: redis:7.2-alpine + container_name: predictor-redis-dev + ports: + - "6379:6379" + volumes: + - redis_data:/data + networks: + - predictor-network + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 3s + retries: 5 + start_period: 10s + command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru + +volumes: + grib_data: + driver: local + redis_data: + driver: local + +networks: + predictor-network: + driver: bridge \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..67c42b7 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,80 @@ +version: '3.8' + +services: + predictor: + build: + context: . + dockerfile: Dockerfile + container_name: predictor + ports: + - "8080:8080" + environment: + # GRIB Configuration + - GSN_PREDICTOR_GRIB_DIR=/tmp/grib + - GSN_PREDICTOR_GRIB_TTL=24h + - GSN_PREDICTOR_GRIB_CACHE_TTL=1h + - GSN_PREDICTOR_GRIB_PARALLEL=4 + - GSN_PREDICTOR_GRIB_TIMEOUT=30s + - GSN_PREDICTOR_GRIB_DATASET_URL=https://nomads.ncep.noaa.gov/ + + # Redis Configuration + - GSN_PREDICTOR_REDIS_HOST=redis + - GSN_PREDICTOR_REDIS_PORT=6379 + - GSN_PREDICTOR_REDIS_PASSWORD= + - GSN_PREDICTOR_REDIS_DB=0 + + # Scheduler Configuration + - GSN_PREDICTOR_SCHEDULER_ENABLED=true + + # GRIB Updater Job Configuration + - GSN_PREDICTOR_GRIB_UPDATER_INTERVAL=6h + - GSN_PREDICTOR_GRIB_UPDATER_TIMEOUT=45m + + # REST Transport Configuration + - GSN_PREDICTOR_REST_HOST=0.0.0.0 + - GSN_PREDICTOR_REST_PORT=8080 + - GSN_PREDICTOR_REST_READ_TIMEOUT=30s + - GSN_PREDICTOR_REST_WRITE_TIMEOUT=30s + - GSN_PREDICTOR_REST_IDLE_TIMEOUT=60s + volumes: + - grib_data:/tmp/grib + depends_on: + redis: + condition: service_healthy + networks: + - predictor-network + restart: unless-stopped + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + + redis: + image: redis:7.2-alpine + container_name: predictor-redis + ports: + - "6379:6379" + volumes: + - redis_data:/data + networks: + - predictor-network + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 3s + retries: 5 + start_period: 10s + command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru + +volumes: + grib_data: + driver: local + redis_data: + driver: local + +networks: + predictor-network: + driver: bridge \ No newline at end of file diff --git a/go.mod b/go.mod index 6d9ab2c..d63646d 100644 --- a/go.mod +++ b/go.mod @@ -4,33 +4,40 @@ go 1.24.4 require ( github.com/caarlos0/env/v11 v11.3.1 + github.com/edsrzf/mmap-go v1.2.0 github.com/go-faster/errors v0.7.1 github.com/go-faster/jx v1.1.0 + github.com/nilsmagnus/grib v1.2.8 github.com/ogen-go/ogen v1.14.0 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/metric v1.36.0 go.opentelemetry.io/otel/trace v1.36.0 go.uber.org/zap v1.27.0 + golang.org/x/sync v0.14.0 ) require ( + github.com/bsm/redislock v0.9.4 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dlclark/regexp2 v1.11.5 // indirect - github.com/edsrzf/mmap-go v1.2.0 // indirect github.com/fatih/color v1.18.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect + github.com/go-co-op/gocron v1.37.0 // indirect github.com/go-faster/yaml v0.4.6 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/nilsmagnus/grib v1.2.8 // indirect + github.com/redis/go-redis/v9 v9.10.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.2.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 // indirect golang.org/x/net v0.40.0 // indirect - golang.org/x/sync v0.14.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.25.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 64d966f..0480917 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,15 @@ +github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw= +github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ= github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/edsrzf/mmap-go v1.2.0 h1:hXLYlkbaPzt1SaQk+anYwKSRNhufIDCchSPkUD6dD84= @@ -10,6 +18,8 @@ github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-co-op/gocron v1.37.0 h1:ZYDJGtQ4OMhTLKOKMIch+/CY70Brbb1dGdooLEhh7b0= +github.com/go-co-op/gocron v1.37.0/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY= github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg= @@ -23,10 +33,16 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -38,12 +54,26 @@ github.com/nilsmagnus/grib v1.2.8 h1:H7ch/1/agaCqM3MC8hW1Ft+EJ+q2XB757uml/IfPvp4 github.com/nilsmagnus/grib v1.2.8/go.mod h1:XHm+5zuoOk0NSIWaGmA3JaAxI4i50YvD1L1vz+aqPOQ= github.com/ogen-go/ogen v1.14.0 h1:TU1Nj4z9UBsAfTkf+IhuNNp7igdFQKqkk9+6/y4XuWg= github.com/ogen-go/ogen v1.14.0/go.mod h1:Iw1vkqkx6SU7I9th5ceP+fVPJ6Wge4e3kAVzAxJEpPE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= +github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -54,6 +84,8 @@ go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCRE go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -73,9 +105,12 @@ golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/jobs/grib/updater/config.go b/internal/jobs/grib/updater/config.go new file mode 100644 index 0000000..0127ce8 --- /dev/null +++ b/internal/jobs/grib/updater/config.go @@ -0,0 +1,8 @@ +package updater + +import "time" + +type Config struct { + Interval time.Duration `env:"INTERVAL" envDefault:"6h"` + Timeout time.Duration `env:"TIMEOUT" envDefault:"45m"` +} diff --git a/internal/jobs/grib/updater/deps.go b/internal/jobs/grib/updater/deps.go new file mode 100644 index 0000000..f2e3ced --- /dev/null +++ b/internal/jobs/grib/updater/deps.go @@ -0,0 +1,8 @@ +package updater + +import "context" + +// GribService defines the interface for GRIB operations needed by the updater job +type GribService interface { + Update(ctx context.Context) error +} diff --git a/internal/jobs/grib/updater/updater.go b/internal/jobs/grib/updater/updater.go new file mode 100644 index 0000000..0295b0e --- /dev/null +++ b/internal/jobs/grib/updater/updater.go @@ -0,0 +1,51 @@ +package updater + +import ( + "context" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "go.uber.org/zap" +) + +type Job struct { + service GribService + config *Config + logger *zap.Logger +} + +func New(service GribService, config *Config, logger *zap.Logger) *Job { + return &Job{ + service: service, + config: config, + logger: logger, + } +} + +func (j *Job) GetInterval() time.Duration { + return j.config.Interval +} + +func (j *Job) GetTimeout() time.Duration { + return j.config.Timeout +} + +func (j *Job) GetCount() int { + return 0 // Run indefinitely +} + +func (j *Job) GetAsync() bool { + return false // Singleton mode - only one instance should run +} + +func (j *Job) Execute(ctx context.Context) error { + j.logger.Info("executing GRIB update job") + + if err := j.service.Update(ctx); err != nil { + j.logger.Error("GRIB update failed", zap.Error(err)) + return errcodes.Wrap(err, "failed to update GRIB data") + } + + j.logger.Info("GRIB update completed successfully") + return nil +} diff --git a/internal/pkg/ds/predictor.go b/internal/pkg/ds/predictor.go index 3f6b265..1da6675 100644 --- a/internal/pkg/ds/predictor.go +++ b/internal/pkg/ds/predictor.go @@ -1,7 +1,21 @@ package ds +import "time" + type PredictionParameters struct { + LaunchLatitude float64 + LaunchLongitude float64 + LaunchDatetime time.Time + LaunchAltitude float64 + // Add other parameters as needed } type PredicitonResult struct { -} \ No newline at end of file + Latitude float64 + Longitude float64 + Altitude float64 + Timestamp time.Time + WindU float64 + WindV float64 + // Add other result fields as needed +} diff --git a/internal/pkg/errcodes/errcodes.go b/internal/pkg/errcodes/errcodes.go index 32420c3..e6748b3 100644 --- a/internal/pkg/errcodes/errcodes.go +++ b/internal/pkg/errcodes/errcodes.go @@ -23,7 +23,89 @@ func (e *ErrorCode) Error() string { return e.Message } +// IsErr checks if the given error is an ErrorCode +func IsErr(err error) bool { + _, ok := err.(*ErrorCode) + return ok +} + +// AsErr converts error to ErrorCode if possible +func AsErr(err error) (*ErrorCode, bool) { + if err == nil { + return nil, false + } + errcode, ok := err.(*ErrorCode) + return errcode, ok +} + +// Join combines multiple errors into a single ErrorCode +func Join(errs ...error) error { + if len(errs) == 0 { + return nil + } + + var messages []string + var details []string + + for _, err := range errs { + if err == nil { + continue + } + + if errcode, ok := AsErr(err); ok { + messages = append(messages, errcode.Message) + if errcode.Details != "" { + details = append(details, errcode.Details) + } + } else { + messages = append(messages, err.Error()) + } + } + + if len(messages) == 0 { + return nil + } + + // Use the first error's status code, or default to 500 + statusCode := http.StatusInternalServerError + if len(errs) > 0 { + if errcode, ok := AsErr(errs[0]); ok { + statusCode = errcode.StatusCode + } + } + + return New(statusCode, strings.Join(messages, "; "), details...) +} + +// Wrap wraps an error with additional context +func Wrap(err error, message string) error { + if err == nil { + return nil + } + + if errcode, ok := AsErr(err); ok { + return New(errcode.StatusCode, message, errcode.Message, errcode.Details) + } + + return New(http.StatusInternalServerError, message, err.Error()) +} + var ( - ErrNoDataset = New(http.StatusNotFound, "no grib dataset found") - ErrOutOfBounds = New(http.StatusBadRequest, "requested time is out of bounds") + ErrNoDataset = New(http.StatusNotFound, "no grib dataset found") + ErrOutOfBounds = New(http.StatusBadRequest, "requested time is out of bounds") + ErrConfig = New(http.StatusInternalServerError, "configuration error") + ErrConfigInvalidEnv = New(http.StatusInternalServerError, "invalid environment configuration") + ErrConfigMissingRequired = New(http.StatusInternalServerError, "missing required configuration") + ErrRedis = New(http.StatusInternalServerError, "redis error") + ErrRedisLockAlreadyLocked = New(http.StatusConflict, "could not perform redis lock", "already locked") + ErrRedisCacheMiss = New(http.StatusNotFound, "cache miss", "key not found") + ErrRedisCacheCorrupted = New(http.StatusInternalServerError, "cache data corrupted", "invalid format") + ErrDownload = New(http.StatusInternalServerError, "download error") + ErrProcessing = New(http.StatusInternalServerError, "data processing error") + ErrNoCubeFilesFound = New(http.StatusNotFound, "no cube files found") + ErrNoValidCubeFilesFound = New(http.StatusNotFound, "no valid cube files found") + ErrLatestCubeFileIsTooOld = New(http.StatusNotFound, "latest cube file is too old") + ErrScheduler = New(http.StatusInternalServerError, "scheduler error") + ErrSchedulerInvalidJob = New(http.StatusBadRequest, "invalid job configuration") + ErrSchedulerTimeoutTooLong = New(http.StatusBadRequest, "job timeout too long", "timeout cannot exceed interval") ) diff --git a/internal/pkg/errcodes/errcodes_test.go b/internal/pkg/errcodes/errcodes_test.go new file mode 100644 index 0000000..e5ff84c --- /dev/null +++ b/internal/pkg/errcodes/errcodes_test.go @@ -0,0 +1,81 @@ +package errcodes + +import ( + "testing" +) + +func TestSpecificErrorTypes(t *testing.T) { + // Test Redis lock error + err := ErrRedisLockAlreadyLocked + if !IsErr(err) { + t.Error("Expected IsErr to return true for ErrorCode") + } + + errcode, ok := AsErr(err) + if !ok { + t.Error("Expected AsErr to return true for ErrorCode") + } + if errcode != ErrRedisLockAlreadyLocked { + t.Error("Expected AsErr to return the same error") + } + + // Test Redis cache miss error + cacheErr := ErrRedisCacheMiss + if !IsErr(cacheErr) { + t.Error("Expected IsErr to return true for cache miss error") + } + + // Test configuration error + configErr := ErrConfigInvalidEnv + if !IsErr(configErr) { + t.Error("Expected IsErr to return true for config error") + } + + // Test scheduler error + schedulerErr := ErrSchedulerTimeoutTooLong + if !IsErr(schedulerErr) { + t.Error("Expected IsErr to return true for scheduler error") + } +} + +func TestErrorChecking(t *testing.T) { + // Example of how to check for specific errors in practice + err := ErrRedisLockAlreadyLocked + + // Check if it's a specific error type + if errcode, ok := AsErr(err); ok { + switch errcode { + case ErrRedisLockAlreadyLocked: + // Handle lock already locked case + t.Log("Handling lock already locked error") + case ErrRedisCacheMiss: + // Handle cache miss case + t.Log("Handling cache miss error") + case ErrRedisCacheCorrupted: + // Handle corrupted cache case + t.Log("Handling corrupted cache error") + default: + // Handle other error types + t.Log("Handling other error type") + } + } +} + +func TestWrapFunction(t *testing.T) { + originalErr := ErrRedisCacheMiss + wrappedErr := Wrap(originalErr, "additional context") + + if !IsErr(wrappedErr) { + t.Error("Expected wrapped error to be an ErrorCode") + } + + errcode, ok := AsErr(wrappedErr) + if !ok { + t.Error("Expected AsErr to work with wrapped error") + } + + // The wrapped error should have the same status code as the original + if errcode.StatusCode != ErrRedisCacheMiss.StatusCode { + t.Errorf("Expected status code %d, got %d", ErrRedisCacheMiss.StatusCode, errcode.StatusCode) + } +} diff --git a/internal/pkg/grib/README.md b/internal/pkg/grib/README.md new file mode 100644 index 0000000..6f7e551 --- /dev/null +++ b/internal/pkg/grib/README.md @@ -0,0 +1,104 @@ +# GRIB Module + +Этот модуль реализует функциональность для работы с GRIB-файлами, аналогичную tawhiri-downloader и tawhiri, но на Go. + +## Основные возможности + +- **Скачивание GRIB-файлов** с NOMADS (GFS прогнозы) +- **Сборка 5D-куба** (время, давление, широта, долгота, переменные u/v) +- **Эффективное хранение** с использованием mmap +- **Интерполяция** ветровых данных для произвольных координат и времени +- **Кэширование** результатов (in-memory + Redis) +- **Распределенные блокировки** для предотвращения дублирования загрузок + +## Архитектура + +### Основные компоненты + +- **Downloader** - скачивает GRIB-файлы с NOMADS +- **Cube** - управляет 5D-массивом данных через mmap +- **Extractor** - выполняет интерполяцию данных +- **Cache** - кэширует результаты запросов +- **Service** - основной интерфейс для работы с модулем + +### Структура данных + +5D-куб содержит: +- **Время**: 17 временных срезов (0, 3, 6, ..., 48 часов) +- **Давление**: 34 уровня давления (1000, 975, 950, ..., 2 hPa) +- **Широта**: 361 точка (-90° до +90°) +- **Долгота**: 720 точек (0° до 359.5°) +- **Переменные**: u-ветер и v-ветер + +## Использование + +```go +// Создание сервиса +cfg := grib.ServiceConfig{ + Dir: "/tmp/grib", + TTL: 24 * time.Hour, + CacheTTL: 1 * time.Hour, + Redis: redisClient, + Parallel: 4, + Client: &http.Client{Timeout: 30 * time.Second}, +} + +service, err := grib.New(cfg) +if err != nil { + log.Fatal(err) +} +defer service.Close() + +// Обновление данных +err = service.Update(ctx) + +// Извлечение ветровых данных +wind, err := service.Extract(ctx, lat, lon, alt, timestamp) +// wind[0] - u-компонента ветра +// wind[1] - v-компонента ветра +``` + +## Интерполяция + +Модуль выполняет 16-точечную интерполяцию: +1. **Временная интерполяция** между двумя ближайшими срезами +2. **Интерполяция по давлению** между двумя ближайшими уровнями +3. **Билинейная интерполяция** по широте и долготе + +## Кэширование + +- **In-memory кэш**: быстрый доступ к недавно запрошенным данным +- **Redis кэш**: распределенное кэширование для множественных реплик + +## Расписание обновлений + +Рекомендуемая частота вызова `Update()`: +- **Каждые 6 часов** - для получения свежих GFS прогнозов +- **При запуске** - для загрузки начальных данных +- **По требованию** - при отсутствии данных для запрашиваемого времени + +## Отличия от tawhiri + +### Преимущества Go-реализации: +- **Высокая производительность** (mmap, конкурентные загрузки) +- **Эффективное использование памяти** (не загружает весь массив в RAM) +- **Горизонтальное масштабирование** (stateless, множество реплик) +- **Встроенное кэширование** (in-memory + Redis) + +### Особенности: +- Использует `github.com/nilsmagnus/grib` вместо pygrib +- Реализует собственную логику интерполяции +- Поддерживает распределенные блокировки через Redis + +## Конфигурация + +### Переменные окружения: +- `PREDICTOR_GRIB_DATASET_URL` - URL источника данных (опционально) + +### Параметры ServiceConfig: +- `Dir` - директория для хранения файлов +- `TTL` - время жизни данных (по умолчанию 24 часа) +- `CacheTTL` - время жизни кэша (по умолчанию 1 час) +- `Redis` - Redis клиент для блокировок и кэша +- `Parallel` - количество параллельных загрузок +- `Client` - HTTP клиент для загрузок \ No newline at end of file diff --git a/internal/pkg/grib/cache.go b/internal/pkg/grib/cache.go index 9df0ff2..a31ae4d 100644 --- a/internal/pkg/grib/cache.go +++ b/internal/pkg/grib/cache.go @@ -1,8 +1,6 @@ package grib import ( - "encoding/binary" - "math" "sync" "time" ) @@ -31,10 +29,3 @@ func (c *memCache) get(k uint64) (vec, bool) { } func (c *memCache) set(k uint64, v vec) { c.m.Store(k, item{v, time.Now().Add(c.ttl)}) } - -func encodeVec(v vec) []byte { - var b [16]byte - binary.LittleEndian.PutUint64(b[:8], math.Float64bits(v[0])) - binary.LittleEndian.PutUint64(b[8:], math.Float64bits(v[1])) - return b[:] -} diff --git a/internal/pkg/grib/config.go b/internal/pkg/grib/config.go index 065c688..69f2270 100644 --- a/internal/pkg/grib/config.go +++ b/internal/pkg/grib/config.go @@ -1,24 +1,15 @@ package grib import ( - "fmt" "net/url" - - env "github.com/caarlos0/env/v11" + "time" ) type Config struct { - DatasetURL url.URL `env:"DATASET_URL"` -} - -func NewConfig(servicePrefix string) (*Config, error) { - cfg := &Config{} - - if err := env.ParseWithOptions(cfg, env.Options{ - PrefixTagName: fmt.Sprintf("%s_GRIB_", servicePrefix), - }); err != nil { - return nil, err - } - - return cfg, nil + Dir string `env:"DIR" envDefault:"/tmp/grib"` + TTL time.Duration `env:"TTL" envDefault:"24h"` + CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` + Parallel int `env:"PARALLEL" envDefault:"4"` + Timeout time.Duration `env:"TIMEOUT" envDefault:"30s"` + DatasetURL url.URL `env:"DATASET_URL" envDefault:"https://nomads.ncep.noaa.gov/"` } diff --git a/internal/pkg/grib/cube.go b/internal/pkg/grib/cube.go index 6e8498f..71a1c7f 100644 --- a/internal/pkg/grib/cube.go +++ b/internal/pkg/grib/cube.go @@ -12,6 +12,7 @@ type cube struct { mm mmap.MMap // read‑only, U followed by V (float32 LE) t, p, lat, lon int bytesPerVar int64 + file *os.File } func openCube(path string) (*cube, error) { @@ -22,6 +23,7 @@ func openCube(path string) (*cube, error) { mm, err := mmap.Map(f, mmap.RDONLY, 0) if err != nil { + f.Close() return nil, err } @@ -32,7 +34,7 @@ func openCube(path string) (*cube, error) { nLon = 720 ) - return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4)}, nil + return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4), file: f}, nil } func (c *cube) val(varIdx, ti, pi, y, x int) float32 { @@ -41,3 +43,13 @@ func (c *cube) val(varIdx, ti, pi, y, x int) float32 { bits := binary.LittleEndian.Uint32(c.mm[off : off+4]) return math.Float32frombits(bits) } + +func (c *cube) Close() error { + if c.mm != nil { + c.mm.Unmap() + } + if c.file != nil { + return c.file.Close() + } + return nil +} diff --git a/internal/pkg/grib/dataset.go b/internal/pkg/grib/dataset.go index 987da59..e539f65 100644 --- a/internal/pkg/grib/dataset.go +++ b/internal/pkg/grib/dataset.go @@ -4,3 +4,10 @@ type dataset struct { cube *cube runUTC int64 // unix seconds } + +func (d *dataset) Close() error { + if d.cube != nil { + return d.cube.Close() + } + return nil +} diff --git a/internal/pkg/grib/downloader.go b/internal/pkg/grib/downloader.go index 892d84b..d94006a 100644 --- a/internal/pkg/grib/downloader.go +++ b/internal/pkg/grib/downloader.go @@ -9,6 +9,7 @@ import ( "path/filepath" "time" + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" "golang.org/x/sync/errgroup" ) @@ -42,7 +43,7 @@ func (d *Downloader) fetch(ctx context.Context, url, dst string) error { } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return fmt.Errorf("bad status %s", resp.Status) + return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status) } if _, err := io.Copy(f, resp.Body); err != nil { return err diff --git a/internal/pkg/grib/grib.go b/internal/pkg/grib/grib.go index ff95e90..062b2db 100644 --- a/internal/pkg/grib/grib.go +++ b/internal/pkg/grib/grib.go @@ -3,10 +3,12 @@ package grib import ( "context" "encoding/binary" + "fmt" "math" "net/http" "os" "path/filepath" + "strings" "sync/atomic" "time" @@ -21,6 +23,12 @@ type RedisIface interface { Get(key string) ([]byte, error) } +type Service interface { + Update(ctx context.Context) error + Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) + Close() error +} + type ServiceConfig struct { Dir string TTL time.Duration @@ -36,7 +44,7 @@ type service struct { data atomic.Pointer[dataset] } -func New(cfg ServiceConfig) (*service, error) { +func New(cfg ServiceConfig) (Service, error) { if cfg.TTL == 0 { cfg.TTL = 24 * time.Hour } @@ -44,29 +52,134 @@ func New(cfg ServiceConfig) (*service, error) { return nil, err } s := &service{cfg: cfg, cache: memCache{ttl: cfg.CacheTTL}} + + // Try to load existing dataset on startup + if err := s.loadExistingDataset(); err != nil { + // Log error but don't fail startup - dataset will be loaded on first Update() + // This allows the service to start even if no data is available yet + } + return s, nil } +// loadExistingDataset tries to load the most recent available dataset +func (s *service) loadExistingDataset() error { + // Find the most recent cube file + pattern := filepath.Join(s.cfg.Dir, "*.cube") + matches, err := filepath.Glob(pattern) + if err != nil { + return err + } + + if len(matches) == 0 { + return errcodes.ErrNoCubeFilesFound + } + + // Sort by modification time (newest first) + var latestFile string + var latestTime time.Time + + for _, match := range matches { + info, err := os.Stat(match) + if err != nil { + continue + } + + if info.ModTime().After(latestTime) { + latestTime = info.ModTime() + latestFile = match + } + } + + if latestFile == "" { + return errcodes.ErrNoValidCubeFilesFound + } + + // Check if the file is fresh enough + if time.Since(latestTime) > s.cfg.TTL { + return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old") + } + + // Load the dataset + c, err := openCube(latestFile) + if err != nil { + return err + } + + // Extract run time from filename + base := filepath.Base(latestFile) + runStr := strings.TrimSuffix(base, ".cube") + run, err := time.Parse("20060102_15", runStr) + if err != nil { + c.Close() + return err + } + + ds := &dataset{cube: c, runUTC: run.Unix()} + s.data.Store(ds) + + return nil +} + // Update() downloads missing GRIBs, assembles cube into a single mmap‑file. func (s *service) Update(ctx context.Context) error { + // Check if we already have fresh data + if d := s.data.Load(); d != nil { + runTime := time.Unix(d.runUTC, 0) + if time.Since(runTime) < s.cfg.TTL { + // Data is still fresh, no need to update + return nil + } + } + unlock, err := s.cfg.Redis.Lock(ctx, "grib-dl", 45*time.Minute) if err != nil { return err } defer unlock(ctx) - dl := downloader.Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client} + // Check again after acquiring lock (double-checked locking pattern) + if d := s.data.Load(); d != nil { + runTime := time.Unix(d.runUTC, 0) + if time.Since(runTime) < s.cfg.TTL { + // Another instance already updated the data + return nil + } + } + + dl := Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client} run := nearestRun(time.Now().UTC().Add(-4 * time.Hour)) + + // Check if we already have this run + cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube" + if _, err := os.Stat(cubePath); err == nil { + // File exists, check if it's fresh + info, err := os.Stat(cubePath) + if err == nil && time.Since(info.ModTime()) < s.cfg.TTL { + // File is fresh, just load it + c, err := openCube(cubePath) + if err != nil { + return err + } + ds := &dataset{cube: c, runUTC: run.Unix()} + s.data.Store(ds) + s.cache = memCache{ttl: s.cfg.CacheTTL} + return nil + } + } + + // Download new data if err := dl.Run(ctx, run); err != nil { return err } - cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube" + // Assemble cube if it doesn't exist if _, err := os.Stat(cubePath); err != nil { if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil { return err } } + c, err := openCube(cubePath) if err != nil { return err @@ -101,26 +214,52 @@ func assembleCube(dir string, run time.Time, cubePath string) error { for ti, step := range steps { fn := filepath.Join(dir, fileName(run, step)) - gf, err := griblib.Read(fn) + file, err := os.Open(fn) if err != nil { return err } - for _, m := range gf.Messages { - if m.ParameterShortName != "u" && m.ParameterShortName != "v" { + + messages, err := griblib.ReadMessages(file) + file.Close() // Close immediately after reading + if err != nil { + return err + } + + for _, m := range messages { + // Check if this is a wind component (u or v) + // ParameterCategory 2 = momentum, ParameterNumber 2 = u-wind, 3 = v-wind + if m.Section4.ProductDefinitionTemplateNumber != 0 { continue } - if m.TypeOfFirstFixedSurface != 100 { + + product := m.Section4.ProductDefinitionTemplate + if product.ParameterCategory != 2 { continue } - pIdx, ok := pIndex[int(m.PressureLevel)] + + var varIdx int + switch product.ParameterNumber { + case 2: // u-wind + varIdx = 0 + case 3: // v-wind + varIdx = 1 + default: + continue + } + + // Check if this is a pressure level (type 100) + if product.FirstSurface.Type != 100 { + continue + } + + // Get pressure level in hPa + pressure := float64(product.FirstSurface.Value) / 100.0 + pIdx, ok := pIndex[int(math.Round(pressure))] if !ok { continue } - varIdx := 0 - if m.ParameterShortName == "v" { - varIdx = 1 - } - vals := m.Values + + vals := m.Data() // GRIB library returns scan north->south, west->east already in row-major order raw := make([]byte, len(vals)*4) for i, v := range vals { @@ -142,13 +281,56 @@ func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Ti if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(48*time.Hour)) { return zero, errcodes.ErrOutOfBounds } + + // Try memory cache first key := encodeKey(lat, lon, alt, ts) if v, ok := s.cache.get(key); ok { return [2]float64(v), nil } + + // Try Redis cache + redisKey := fmt.Sprintf("grib:extract:%d", key) + if cached, err := s.cfg.Redis.Get(redisKey); err == nil { + var result [2]float64 + if len(cached) == 16 { + result[0] = math.Float64frombits(binary.LittleEndian.Uint64(cached[:8])) + result[1] = math.Float64frombits(binary.LittleEndian.Uint64(cached[8:])) + s.cache.set(key, vec(result)) + return result, nil + } else { + // Cache data is corrupted (wrong length) + return zero, errcodes.ErrRedisCacheCorrupted + } + } else { + // Check if it's a cache miss (expected error) + if errcode, ok := errcodes.AsErr(err); ok && errcode == errcodes.ErrRedisCacheMiss { + // Cache miss is expected, continue with calculation + } else { + // Unexpected error, return it + return zero, err + } + } + + // Calculate result td := ts.Sub(time.Unix(d.runUTC, 0)).Hours() u, v := d.uv(lat, lon, alt, td) out := [2]float64{u, v} + + // Cache in memory s.cache.set(key, vec(out)) + + // Cache in Redis + encoded := make([]byte, 16) + binary.LittleEndian.PutUint64(encoded[:8], math.Float64bits(out[0])) + binary.LittleEndian.PutUint64(encoded[8:], math.Float64bits(out[1])) + s.cfg.Redis.Set(redisKey, encoded, s.cfg.CacheTTL) + return out, nil } + +func (s *service) Close() error { + if d := s.data.Load(); d != nil { + return d.Close() + } + return nil +} diff --git a/internal/pkg/grib/grib_test.go b/internal/pkg/grib/grib_test.go new file mode 100644 index 0000000..c536ab3 --- /dev/null +++ b/internal/pkg/grib/grib_test.go @@ -0,0 +1,62 @@ +package grib + +import ( + "context" + "testing" + "time" +) + +func TestServiceCreation(t *testing.T) { + cfg := ServiceConfig{ + Dir: "/tmp/grib_test", + TTL: 24 * time.Hour, + CacheTTL: 1 * time.Hour, + Redis: &MockRedis{}, + Parallel: 2, + } + + service, err := New(cfg) + if err != nil { + t.Fatalf("Failed to create service: %v", err) + } + defer service.Close() + + if service == nil { + t.Fatal("Service is nil") + } +} + +func TestNearestRun(t *testing.T) { + now := time.Date(2024, 1, 15, 14, 30, 0, 0, time.UTC) + expected := time.Date(2024, 1, 15, 12, 0, 0, 0, time.UTC) + + result := nearestRun(now) + if !result.Equal(expected) { + t.Errorf("Expected %v, got %v", expected, result) + } +} + +func TestPressureFromAlt(t *testing.T) { + alt := 10000.0 // 10km + pressure := pressureFromAlt(alt) + + // At 10km, pressure should be around 264 hPa + if pressure < 200 || pressure > 300 { + t.Errorf("Unexpected pressure at 10km: %f hPa", pressure) + } +} + +// MockRedis for testing +type MockRedis struct{} + +func (m *MockRedis) Lock(ctx context.Context, key string, ttl time.Duration) (func(context.Context), error) { + return func(ctx context.Context) {}, nil +} + +func (m *MockRedis) Set(key string, value []byte, ttl time.Duration) error { + return nil +} + +func (m *MockRedis) Get(key string) ([]byte, error) { + return nil, nil +} diff --git a/internal/service/config.go b/internal/service/config.go new file mode 100644 index 0000000..3be35aa --- /dev/null +++ b/internal/service/config.go @@ -0,0 +1,22 @@ +package service + +import ( + "net/http" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" + "git.intra.yksa.space/gsn/predictor/pkg/redis" +) + +type Config struct { + // GRIB Configuration + Grib grib.Config `envPrefix:"GRIB_"` + + // Redis Configuration + Redis redis.Config `envPrefix:"REDIS_"` +} + +func (c *Config) CreateHTTPClient() *http.Client { + return &http.Client{ + Timeout: c.Grib.Timeout, + } +} diff --git a/internal/service/deps.go b/internal/service/deps.go index 15f3945..6d73596 100644 --- a/internal/service/deps.go +++ b/internal/service/deps.go @@ -5,12 +5,15 @@ import ( "time" ) +type Grib interface { + Update(ctx context.Context) error + Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) + Close() error +} + type Redis interface { Lock(ctx context.Context, key string, ttl time.Duration) (func(context.Context), error) Set(key string, value []byte, ttl time.Duration) error Get(key string) ([]byte, error) -} - -type Grib interface { - Update(ctx context.Context) error + Close() error } diff --git a/internal/service/predictor.go b/internal/service/predictor.go index c78e2b4..21d90dc 100644 --- a/internal/service/predictor.go +++ b/internal/service/predictor.go @@ -2,12 +2,26 @@ package service import ( "context" - "net/http" "git.intra.yksa.space/gsn/predictor/internal/pkg/ds" - "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" ) func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionParameters) ([]ds.PredicitonResult, error) { - return nil, errcodes.New(http.StatusNotImplemented, "not implemented", "please wait") + // Extract wind data at launch point + wind, err := s.ExtractWind(ctx, params.LaunchLatitude, params.LaunchLongitude, params.LaunchAltitude, params.LaunchDatetime) + if err != nil { + return nil, err + } + + // TODO: Implement full prediction logic + result := ds.PredicitonResult{ + Latitude: params.LaunchLatitude, + Longitude: params.LaunchLongitude, + Altitude: params.LaunchAltitude, + Timestamp: params.LaunchDatetime, + WindU: wind[0], + WindV: wind[1], + } + + return []ds.PredicitonResult{result}, nil } diff --git a/internal/service/service.go b/internal/service/service.go index 4119bb5..5858054 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -2,21 +2,56 @@ package service import ( "context" + "time" + + "go.uber.org/zap" ) type Service struct { - redis Redis - downloader Downloader + cfg *Config + redis Redis + grib Grib + logger *zap.Logger } -func New(redis Redis, downloader Downloader) *Service { - return &Service{ - redis: redis, - downloader: downloader, +func New(cfg *Config, gribService Grib, redisService Redis, logger *zap.Logger) (*Service, error) { + svc := &Service{ + cfg: cfg, + redis: redisService, + grib: gribService, + logger: logger, } + + return svc, nil } -// DownloadWeatherData downloads weather forecast data using the configured downloader -func (s *Service) DownloadWeatherData(ctx context.Context) error { - return s.downloader.Download(ctx) +// UpdateWeatherData updates weather forecast data using the configured grib service +func (s *Service) UpdateWeatherData(ctx context.Context) error { + return s.grib.Update(ctx) +} + +// ExtractWind extracts wind data for given coordinates and time +func (s *Service) ExtractWind(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) { + return s.grib.Extract(ctx, lat, lon, alt, ts) +} + +// Update updates the GRIB data (implements updater.GribService) +func (s *Service) Update(ctx context.Context) error { + return s.UpdateWeatherData(ctx) +} + +// Start starts the service +func (s *Service) Start() { + s.logger.Info("service started") +} + +// Stop stops the service +func (s *Service) Stop() { + s.logger.Info("service stopped") +} + +// Close closes the service and releases resources +func (s *Service) Close() error { + s.Stop() + return nil } diff --git a/internal/transport/rest/config.go b/internal/transport/rest/config.go index 2cddd06..9ea1487 100644 --- a/internal/transport/rest/config.go +++ b/internal/transport/rest/config.go @@ -1,23 +1,9 @@ package rest -import ( - "fmt" - - env "github.com/caarlos0/env/v11" -) - type Config struct { - Port int `env:"PORT" envDefault:"8080"` -} - -func NewConfig(servicePrefix string) (*Config, error) { - cfg := &Config{} - - if err := env.ParseWithOptions(cfg, env.Options{ - PrefixTagName: fmt.Sprintf("%s_REST_", servicePrefix), - }); err != nil { - return nil, err - } - - return cfg, nil + Host string `env:"HOST" envDefault:"0.0.0.0"` + Port int `env:"PORT" envDefault:"8080"` + ReadTimeout string `env:"READ_TIMEOUT" envDefault:"30s"` + WriteTimeout string `env:"WRITE_TIMEOUT" envDefault:"30s"` + IdleTimeout string `env:"IDLE_TIMEOUT" envDefault:"60s"` } diff --git a/internal/transport/rest/handler/deps.go b/internal/transport/rest/handler/deps.go index e75769c..88777b7 100644 --- a/internal/transport/rest/handler/deps.go +++ b/internal/transport/rest/handler/deps.go @@ -2,10 +2,10 @@ package handler import ( "context" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/ds" + "time" ) type Service interface { - PerformPrediction(ctx context.Context, params ds.PredictionParameters) ([]ds.PredicitonResult, error) + UpdateWeatherData(ctx context.Context) error + ExtractWind(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) } diff --git a/pkg/redis/interface.go b/pkg/redis/interface.go new file mode 100644 index 0000000..2a5cbd2 --- /dev/null +++ b/pkg/redis/interface.go @@ -0,0 +1,21 @@ +package redis + +import ( + "context" + "time" +) + +// Service defines the interface for Redis operations +type Service interface { + // Lock acquires a distributed lock + Lock(ctx context.Context, key string, ttl time.Duration) (func(context.Context), error) + + // Set sets a key with value and TTL + Set(key string, value []byte, ttl time.Duration) error + + // Get retrieves a value by key + Get(key string) ([]byte, error) + + // Close closes the Redis connection + Close() error +} diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go new file mode 100644 index 0000000..9577de0 --- /dev/null +++ b/pkg/redis/redis.go @@ -0,0 +1,89 @@ +package redis + +import ( + "context" + "fmt" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "github.com/bsm/redislock" + "github.com/redis/go-redis/v9" +) + +type Client struct { + client *redis.Client + locker *redislock.Client +} + +// Ensure Client implements Service interface +var _ Service = (*Client)(nil) + +type Config struct { + Host string + Port int + Password string + DB int +} + +func New(cfg Config) (*Client, error) { + client := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), + Password: cfg.Password, + DB: cfg.DB, + }) + + // Test connection + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + return nil, fmt.Errorf("failed to connect to redis: %w", err) + } + + locker := redislock.New(client) + + return &Client{ + client: client, + locker: locker, + }, nil +} + +func (c *Client) Lock(ctx context.Context, key string, ttl time.Duration) (func(context.Context), error) { + lock, err := c.locker.Obtain(ctx, key, ttl, nil) + if err != nil { + if err == redislock.ErrNotObtained { + return nil, errcodes.ErrRedisLockAlreadyLocked + } + return nil, errcodes.Wrap(err, "failed to obtain redis lock") + } + + unlock := func(ctx context.Context) { + lock.Release(ctx) + } + + return unlock, nil +} + +func (c *Client) Set(key string, value []byte, ttl time.Duration) error { + ctx := context.Background() + if err := c.client.Set(ctx, key, value, ttl).Err(); err != nil { + return errcodes.Wrap(err, "failed to set redis key") + } + return nil +} + +func (c *Client) Get(key string) ([]byte, error) { + ctx := context.Background() + result := c.client.Get(ctx, key) + if result.Err() != nil { + if result.Err() == redis.Nil { + return nil, errcodes.ErrRedisCacheMiss + } + return nil, errcodes.Wrap(result.Err(), "failed to get redis key") + } + return result.Bytes() +} + +func (c *Client) Close() error { + return c.client.Close() +} diff --git a/pkg/scheduler/config.go b/pkg/scheduler/config.go new file mode 100644 index 0000000..312a4f6 --- /dev/null +++ b/pkg/scheduler/config.go @@ -0,0 +1,5 @@ +package scheduler + +type Config struct { + Enabled bool `env:"ENABLED" envDefault:"true"` +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 0000000..c557b83 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,99 @@ +package scheduler + +import ( + "context" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "github.com/go-co-op/gocron" + "go.uber.org/zap" +) + +type Job interface { + GetInterval() time.Duration + GetTimeout() time.Duration + GetCount() int + GetAsync() bool + Execute(context.Context) error +} + +type Scheduler struct { + scheduler *gocron.Scheduler + logger *zap.Logger +} + +func New(logger *zap.Logger) *Scheduler { + scheduler := gocron.NewScheduler(time.UTC) + + return &Scheduler{ + scheduler: scheduler, + logger: logger, + } +} + +func (s *Scheduler) AddJob(job Job) error { + interval := job.GetInterval() + timeout := job.GetTimeout() + count := job.GetCount() + async := job.GetAsync() + + // Validate job parameters + if !async && count != 1 { + return errcodes.ErrSchedulerInvalidJob + } + if timeout > interval { + return errcodes.ErrSchedulerTimeoutTooLong + } + + // Create job function with timeout + jobFunc := func() { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + if err := job.Execute(ctx); err != nil { + s.logger.Error("job execution failed", + zap.Error(err), + zap.Duration("interval", interval), + zap.Duration("timeout", timeout)) + } else { + s.logger.Debug("job executed successfully", + zap.Duration("interval", interval), + zap.Duration("timeout", timeout)) + } + } + + // Add job to scheduler + schedulerJob := s.scheduler.Every(interval) + + if !async { + schedulerJob = schedulerJob.SingletonMode() + } + + if count > 0 { + schedulerJob = schedulerJob.LimitRunsTo(count) + } + + schedulerJob.Do(jobFunc) + + s.logger.Info("job added to scheduler", + zap.Duration("interval", interval), + zap.Duration("timeout", timeout), + zap.Int("count", count), + zap.Bool("async", async)) + + return nil +} + +func (s *Scheduler) Start() { + s.scheduler.StartAsync() + s.logger.Info("scheduler started") +} + +func (s *Scheduler) Stop() { + s.scheduler.Stop() + s.logger.Info("scheduler stopped") +} + +func (s *Scheduler) IsRunning() bool { + return s.scheduler.IsRunning() +} diff --git a/predictor b/predictor new file mode 100755 index 0000000..c74f010 Binary files /dev/null and b/predictor differ diff --git a/scripts/validate-docker.sh b/scripts/validate-docker.sh new file mode 100755 index 0000000..0546cc9 --- /dev/null +++ b/scripts/validate-docker.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +# Docker validation script +set -e + +echo "🔍 Validating Docker configuration..." + +# Check if Docker is available +if ! command -v docker &> /dev/null; then + echo "❌ Docker is not installed or not in PATH" + echo "Please install Docker Desktop and enable WSL 2 integration" + exit 1 +fi + +# Check if docker-compose is available +if ! command -v docker-compose &> /dev/null; then + echo "❌ Docker Compose is not installed or not in PATH" + exit 1 +fi + +echo "✅ Docker and Docker Compose are available" + +# Validate Dockerfile syntax +echo "🔍 Validating Dockerfile..." +if docker build --dry-run . > /dev/null 2>&1; then + echo "✅ Dockerfile syntax is valid" +else + echo "❌ Dockerfile syntax is invalid" + exit 1 +fi + +# Validate docker-compose.yml +echo "🔍 Validating docker-compose.yml..." +if docker-compose config > /dev/null 2>&1; then + echo "✅ docker-compose.yml is valid" +else + echo "❌ docker-compose.yml is invalid" + exit 1 +fi + +# Validate docker-compose.dev.yml +echo "🔍 Validating docker-compose.dev.yml..." +if docker-compose -f docker-compose.dev.yml config > /dev/null 2>&1; then + echo "✅ docker-compose.dev.yml is valid" +else + echo "❌ docker-compose.dev.yml is invalid" + exit 1 +fi + +echo "✅ All Docker configurations are valid!" +echo "" +echo "🚀 Ready to build and run:" +echo " make build # Build Docker image" +echo " make up # Start services" +echo " make logs # View logs" \ No newline at end of file