predictor/internal/engine/events.go
2026-05-23 00:55:35 +09:00

89 lines
2.4 KiB
Go

package engine
import "sync"
// Event is a non-fatal observation made during integration.
//
// Events generalise the warnings counter from the original Tawhiri port:
// any model or constraint can emit them, the EventSink aggregates by Type,
// and each Result carries a summary slice for the API to surface.
//
// NOTE(review): nothing in this file constructs an Event; EventSink.Emit
// takes the same four values as separate arguments. Confirm where Event
// itself is used.
type Event struct {
Type string // short identifier, e.g. "above_model"; the key EventSink aggregates on
Time float64 // UNIX seconds when the event was emitted
State State // State value attached to the event; presumably the integrator state at emission (confirm)
Message string // free-form text accompanying the event
}
// EventSummary is the per-type aggregation of repeated emissions.
//
// EventSink.Emit creates one summary the first time a Type is seen and
// updates that same summary on every later emission of the Type. The struct
// tags name the fields for JSON encoding.
type EventSummary struct {
Type string `json:"type"` // event type identifier; the aggregation key
Count int64 `json:"count"` // number of emissions recorded for this Type
FirstTime float64 `json:"first_time"` // t passed to the first emission
LastTime float64 `json:"last_time"` // t passed to the most recent emission
FirstState State `json:"first_state"` // state passed to the first emission
LastState State `json:"last_state"` // state passed to the most recent emission
Message string `json:"message"` // first non-empty message emitted for this Type
}
// EventSink collects events from models and the integrator, aggregating
// duplicate types into a single EventSummary. Safe for concurrent use:
// Emit and Snapshot each hold mu for the whole of their work.
//
// Emit and Snapshot also accept a nil *EventSink; Emit then records nothing
// and Snapshot returns nil.
type EventSink struct {
mu sync.Mutex // guards summaries
summaries map[string]*EventSummary // one entry per event Type, keyed by Type; allocated by NewEventSink
}
// NewEventSink builds a ready-to-use sink: its summary map is allocated and
// contains no entries.
func NewEventSink() *EventSink {
	sink := new(EventSink)
	sink.summaries = map[string]*EventSummary{}
	return sink
}
// Emit records one occurrence of typ at time t with the given state and
// message.
//
// The first emission of a type creates its summary with Count 1 and with the
// First* and Last* fields both taken from this call. Every later emission of
// the same type increments Count and overwrites LastTime/LastState. Message
// keeps the first non-empty text seen for the type; later messages are
// dropped once one has been stored.
//
// Emit is a no-op on a nil sink. On a sink that was not built by
// NewEventSink (for example a zero-value EventSink) the summaries map is
// allocated on first use instead of panicking.
func (s *EventSink) Emit(typ string, t float64, state State, message string) {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.summaries == nil {
		// Assigning into a nil map panics; allocate lazily so zero-value
		// sinks behave like ones returned by NewEventSink.
		s.summaries = make(map[string]*EventSummary)
	}

	sum, ok := s.summaries[typ]
	if !ok {
		s.summaries[typ] = &EventSummary{
			Type:       typ,
			Count:      1,
			FirstTime:  t,
			LastTime:   t,
			FirstState: state,
			LastState:  state,
			Message:    message,
		}
		return
	}

	sum.Count++
	sum.LastTime = t
	sum.LastState = state
	// Only backfill the message: the first non-empty text wins.
	if sum.Message == "" && message != "" {
		sum.Message = message
	}
}
// Snapshot copies every aggregated summary out of the sink and returns the
// copies ordered by Type, so the result does not depend on map iteration
// order. The returned elements are values, not pointers into the sink, so
// callers may modify them freely.
//
// A nil sink yields a nil slice; a sink with no summaries yields an empty,
// non-nil slice.
func (s *EventSink) Snapshot() []EventSummary {
	if s == nil {
		return nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	snapshot := make([]EventSummary, len(s.summaries))
	next := 0
	for _, entry := range s.summaries {
		snapshot[next] = *entry
		next++
	}
	sortEventSummaries(snapshot)
	return snapshot
}
// sortEventSummaries orders s in place by ascending Type using a stable
// insertion sort. The slice usually holds only one or two entries, so the
// quadratic worst case is not a concern.
func sortEventSummaries(s []EventSummary) {
	for next := 1; next < len(s); next++ {
		pending := s[next]
		slot := next
		// Shift larger entries one place right until pending's slot opens up.
		for ; slot > 0 && s[slot-1].Type > pending.Type; slot-- {
			s[slot] = s[slot-1]
		}
		s[slot] = pending
	}
}