add telemetry system with analytics and crash reporting (M08)
This commit is contained in:
666
portal/internal/telemetry/telemetry.go
Normal file
666
portal/internal/telemetry/telemetry.go
Normal file
@@ -0,0 +1,666 @@
|
||||
// Package telemetry provides app analytics and crash reporting
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// Event types.
//
// String identifiers for the Event.Type field. RecordEvents stores the type
// verbatim in the events.event_type column; it does not check that the value
// is one of these constants.
const (
	// EventAppStart is the type the analytics overview filters on for DAU and
	// session totals.
	EventAppStart = "app_start"
	EventAppStop  = "app_stop"
	// EventAppCrash is the type the analytics overview filters on for crash
	// totals.
	EventAppCrash    = "app_crash"
	EventLuaError    = "lua_error"
	EventPerfFrame   = "perf_frame"
	EventPerfMemory  = "perf_memory"
	EventPerfStartup = "perf_startup"
	EventScreenView  = "screen_view"
	EventFeatureUsed = "feature_used"
)
|
||||
|
||||
// Event represents a telemetry event. Events are submitted inside an
// EventBatch and persisted one row each by RecordEvents.
type Event struct {
	// Type is written to the events.event_type column as-is.
	Type string `json:"type"`
	// Timestamp is written to the events.timestamp column as-is; it is not
	// parsed or validated on ingestion.
	// NOTE(review): the aggregation queries run SQLite strftime over this
	// value, so it presumably must be in a format SQLite's date functions
	// accept — confirm against the clients.
	Timestamp string `json:"timestamp"`
	// Data is an optional raw JSON payload. It is stored as text without
	// being decoded.
	Data json.RawMessage `json:"data,omitempty"`
}
|
||||
|
||||
// EventBatch represents a batch of events from a device.
//
// RecordEvents copies AppID, DeviceID, SessionID, AppVersion and MosisVersion
// onto every row it inserts for the batch.
type EventBatch struct {
	AppID        string `json:"app_id"`
	AppVersion   string `json:"app_version"`
	MosisVersion string `json:"mosis_version"`
	DeviceID     string `json:"device_id"`
	// SessionID is optional in the JSON payload.
	SessionID string `json:"session_id,omitempty"`
	// Events are the individual events to persist, in submission order.
	Events []Event `json:"events"`
}
|
||||
|
||||
// CrashReport represents a crash report from a device. RecordCrash groups
// reports by (AppID, fingerprint of Crash) and stores one occurrence row per
// report.
type CrashReport struct {
	AppID string `json:"app_id"`
	// AppVersion is added to the owning crash group's affected-versions list
	// and stored on the occurrence row.
	AppVersion string `json:"app_version"`
	// MosisVersion is accepted in the payload; RecordCrash does not persist it.
	MosisVersion string `json:"mosis_version"`
	DeviceID     string `json:"device_id"`
	// Timestamp is stored as-is: it becomes first_seen/last_seen on the crash
	// group and the timestamp of the occurrence row.
	Timestamp string       `json:"timestamp"`
	Crash     CrashDetails `json:"crash"`
}
|
||||
|
||||
// CrashDetails contains crash information.
//
// Type, Message and StackTrace (with line numbers normalized) are the inputs
// to the crash fingerprint used for grouping.
type CrashDetails struct {
	Type       string `json:"type"`
	Message    string `json:"message"`
	StackTrace string `json:"stack_trace"`
	// Context is optional free-form data; RecordCrash JSON-encodes it into
	// the crash_occurrences.context column.
	Context map[string]interface{} `json:"context,omitempty"`
}
|
||||
|
||||
// CrashGroup represents a group of similar crashes. It mirrors one row of the
// crash_groups table, which is unique per (app_id, fingerprint).
type CrashGroup struct {
	// ID is assigned by RecordCrash when the group is first created.
	ID    string `json:"id"`
	AppID string `json:"app_id"`
	// Fingerprint is the hex digest produced by fingerprintCrash.
	Fingerprint string `json:"fingerprint"`
	CrashType   string `json:"crash_type"`
	Message     string `json:"message"`
	// SampleStackTrace is the stack trace of the report that created the
	// group; later reports do not overwrite it.
	SampleStackTrace string `json:"sample_stack_trace"`
	// FirstSeen and LastSeen hold report timestamps exactly as received.
	FirstSeen string `json:"first_seen"`
	LastSeen  string `json:"last_seen"`
	// OccurrenceCount is incremented by one for every report matched to the
	// group.
	OccurrenceCount int `json:"occurrence_count"`
	// AffectedVersions is persisted as a JSON array in the affected_versions
	// column.
	AffectedVersions []string `json:"affected_versions"`
	// Status is one of open, resolved, ignored. A resolved group is reopened
	// when a new matching crash arrives.
	Status string `json:"status"` // open, resolved, ignored
}
|
||||
|
||||
// DailyStats represents aggregated daily statistics. It mirrors one row of
// the daily_stats table, keyed by (app_id, date, event_type).
type DailyStats struct {
	AppID string `json:"app_id"`
	// Date is a YYYY-MM-DD string.
	Date      string `json:"date"`
	EventType string `json:"event_type"`
	// Count is the number of events of EventType recorded for the date.
	Count int `json:"count"`
	// UniqueDevices is the device count stored by the daily aggregation job.
	UniqueDevices int `json:"unique_devices"`
}
|
||||
|
||||
// AnalyticsOverview represents the analytics summary for an app, as computed
// by GetAnalyticsOverview from the daily_stats table.
type AnalyticsOverview struct {
	// DAU is the average of daily unique_devices for app_start over the
	// requested period, truncated to an integer.
	DAU int `json:"dau"`
	// DAUChange is the percentage change of the average against the preceding
	// period; it is 0 when the preceding period has no data.
	DAUChange float64 `json:"dau_change"`
	// TotalCrashes is the sum of app_crash counts over the requested period.
	TotalCrashes int `json:"total_crashes"`
	// CrashChange is the percentage change of crashes against the preceding
	// period; it is 0 when the preceding period has no crashes.
	CrashChange float64 `json:"crash_change"`
	// CrashFreeRate is (1 - crashes/sessions) * 100, floored at 0, and 100
	// when there are no sessions.
	CrashFreeRate float64 `json:"crash_free_rate"`
	// TotalSessions is the sum of app_start counts over the requested period.
	TotalSessions int `json:"total_sessions"`
}
|
||||
|
||||
// Service handles telemetry operations: event and crash ingestion, analytics
// queries, and the background aggregation/cleanup workers. Create one with
// New and release it with Close.
type Service struct {
	// db is the handle to the telemetry SQLite database.
	db *sql.DB
	// dbPath is the path that was passed to New.
	dbPath string
	// mu is held by RecordEvents and RecordCrash for their whole duration, so
	// the two ingestion paths never run concurrently with each other.
	mu sync.Mutex
	// stopCh is closed by Close; the background workers return when they
	// observe it.
	stopCh chan struct{}
}
|
||||
|
||||
// New creates a new telemetry service with a separate database
|
||||
func New(dbPath string) (*Service, error) {
|
||||
db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open telemetry db: %w", err)
|
||||
}
|
||||
|
||||
s := &Service{
|
||||
db: db,
|
||||
dbPath: dbPath,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
if err := s.migrate(); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("migrate telemetry db: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// migrate creates the telemetry database schema
|
||||
func (s *Service) migrate() error {
|
||||
schema := `
|
||||
-- Raw events (7-day retention)
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
app_id TEXT NOT NULL,
|
||||
device_id TEXT NOT NULL,
|
||||
session_id TEXT,
|
||||
event_type TEXT NOT NULL,
|
||||
event_data TEXT,
|
||||
app_version TEXT,
|
||||
mosis_version TEXT,
|
||||
timestamp TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_events_app_time ON events(app_id, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_events_cleanup ON events(timestamp);
|
||||
|
||||
-- Hourly aggregates
|
||||
CREATE TABLE IF NOT EXISTS hourly_stats (
|
||||
app_id TEXT NOT NULL,
|
||||
hour TEXT NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
count INTEGER NOT NULL,
|
||||
unique_devices INTEGER NOT NULL,
|
||||
PRIMARY KEY (app_id, hour, event_type)
|
||||
);
|
||||
|
||||
-- Daily aggregates
|
||||
CREATE TABLE IF NOT EXISTS daily_stats (
|
||||
app_id TEXT NOT NULL,
|
||||
date TEXT NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
count INTEGER NOT NULL,
|
||||
unique_devices INTEGER NOT NULL,
|
||||
PRIMARY KEY (app_id, date, event_type)
|
||||
);
|
||||
|
||||
-- Crash groups (deduplicated by fingerprint)
|
||||
CREATE TABLE IF NOT EXISTS crash_groups (
|
||||
id TEXT PRIMARY KEY,
|
||||
app_id TEXT NOT NULL,
|
||||
fingerprint TEXT NOT NULL,
|
||||
crash_type TEXT NOT NULL,
|
||||
message TEXT,
|
||||
sample_stack_trace TEXT,
|
||||
first_seen TEXT NOT NULL,
|
||||
last_seen TEXT NOT NULL,
|
||||
occurrence_count INTEGER DEFAULT 1,
|
||||
affected_versions TEXT,
|
||||
status TEXT DEFAULT 'open',
|
||||
UNIQUE(app_id, fingerprint)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_crashes_app ON crash_groups(app_id, status);
|
||||
|
||||
-- Individual crash occurrences (for recent list)
|
||||
CREATE TABLE IF NOT EXISTS crash_occurrences (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
crash_group_id TEXT NOT NULL,
|
||||
device_id TEXT NOT NULL,
|
||||
app_version TEXT,
|
||||
context TEXT,
|
||||
timestamp TEXT NOT NULL,
|
||||
FOREIGN KEY (crash_group_id) REFERENCES crash_groups(id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_occurrences_group ON crash_occurrences(crash_group_id, timestamp);
|
||||
`
|
||||
|
||||
_, err := s.db.Exec(schema)
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closes the telemetry database
|
||||
func (s *Service) Close() error {
|
||||
close(s.stopCh)
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
// RecordEvents records a batch of events
|
||||
func (s *Service) RecordEvents(ctx context.Context, batch *EventBatch) (int, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
stmt, err := tx.PrepareContext(ctx, `
|
||||
INSERT INTO events (app_id, device_id, session_id, event_type, event_data, app_version, mosis_version, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
count := 0
|
||||
for _, event := range batch.Events {
|
||||
var eventData string
|
||||
if event.Data != nil {
|
||||
eventData = string(event.Data)
|
||||
}
|
||||
|
||||
_, err := stmt.ExecContext(ctx,
|
||||
batch.AppID,
|
||||
batch.DeviceID,
|
||||
batch.SessionID,
|
||||
event.Type,
|
||||
eventData,
|
||||
batch.AppVersion,
|
||||
batch.MosisVersion,
|
||||
event.Timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Failed to insert event: %v", err)
|
||||
continue
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// RecordCrash records a crash report
|
||||
func (s *Service) RecordCrash(ctx context.Context, report *CrashReport) (string, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Generate fingerprint for crash grouping
|
||||
fingerprint := s.fingerprintCrash(report)
|
||||
groupID := generateID()
|
||||
|
||||
// Try to find existing crash group
|
||||
var existingID string
|
||||
var affectedVersions string
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
SELECT id, affected_versions FROM crash_groups WHERE app_id = ? AND fingerprint = ?
|
||||
`, report.AppID, fingerprint).Scan(&existingID, &affectedVersions)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
// Create new crash group
|
||||
versions, _ := json.Marshal([]string{report.AppVersion})
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
INSERT INTO crash_groups (id, app_id, fingerprint, crash_type, message, sample_stack_trace, first_seen, last_seen, occurrence_count, affected_versions, status)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, 'open')
|
||||
`, groupID, report.AppID, fingerprint, report.Crash.Type, report.Crash.Message, report.Crash.StackTrace, report.Timestamp, report.Timestamp, string(versions))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else if err != nil {
|
||||
return "", err
|
||||
} else {
|
||||
// Update existing crash group
|
||||
groupID = existingID
|
||||
|
||||
// Add version if not already in list
|
||||
var versions []string
|
||||
json.Unmarshal([]byte(affectedVersions), &versions)
|
||||
if !contains(versions, report.AppVersion) {
|
||||
versions = append(versions, report.AppVersion)
|
||||
}
|
||||
versionsJSON, _ := json.Marshal(versions)
|
||||
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
UPDATE crash_groups SET
|
||||
last_seen = ?,
|
||||
occurrence_count = occurrence_count + 1,
|
||||
affected_versions = ?,
|
||||
status = CASE WHEN status = 'resolved' THEN 'open' ELSE status END
|
||||
WHERE id = ?
|
||||
`, report.Timestamp, string(versionsJSON), groupID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
// Record individual occurrence
|
||||
contextJSON, _ := json.Marshal(report.Crash.Context)
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
INSERT INTO crash_occurrences (crash_group_id, device_id, app_version, context, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
`, groupID, report.DeviceID, report.AppVersion, string(contextJSON), report.Timestamp)
|
||||
|
||||
return groupID, err
|
||||
}
|
||||
|
||||
// fingerprintCrash generates a unique fingerprint for crash grouping
|
||||
func (s *Service) fingerprintCrash(report *CrashReport) string {
|
||||
// Normalize stack trace (remove line numbers)
|
||||
normalized := normalizeStackTrace(report.Crash.StackTrace)
|
||||
|
||||
// Create fingerprint from type, message, and normalized stack
|
||||
key := fmt.Sprintf("%s:%s:%s", report.Crash.Type, report.Crash.Message, normalized)
|
||||
hash := sha256.Sum256([]byte(key))
|
||||
return hex.EncodeToString(hash[:8])
|
||||
}
|
||||
|
||||
// stackLineNumberRe matches a run of digits enclosed in colons, e.g. the
// ":42:" in "main.lua:42: attempt to index nil". It is compiled once at
// package initialization instead of on every call.
var stackLineNumberRe = regexp.MustCompile(`:\d+:`)

// normalizeStackTrace removes line numbers for consistent fingerprinting.
//
// Every ":<digits>:" sequence in stack is replaced with ":?:", so two traces
// that differ only in line numbers normalize to the same string. Input
// without such sequences, including the empty string, is returned unchanged.
func normalizeStackTrace(stack string) string {
	return stackLineNumberRe.ReplaceAllString(stack, ":?:")
}
|
||||
|
||||
// GetAnalyticsOverview returns analytics summary for an app
|
||||
func (s *Service) GetAnalyticsOverview(ctx context.Context, appID string, days int) (*AnalyticsOverview, error) {
|
||||
endDate := time.Now()
|
||||
startDate := endDate.AddDate(0, 0, -days)
|
||||
prevStartDate := startDate.AddDate(0, 0, -days)
|
||||
|
||||
// Current period DAU (average)
|
||||
var currentDAU float64
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats
|
||||
WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ?
|
||||
`, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(¤tDAU)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Previous period DAU
|
||||
var prevDAU float64
|
||||
s.db.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats
|
||||
WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date < ?
|
||||
`, appID, prevStartDate.Format("2006-01-02"), startDate.Format("2006-01-02")).Scan(&prevDAU)
|
||||
|
||||
// Current crashes
|
||||
var currentCrashes int
|
||||
s.db.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(SUM(count), 0) FROM daily_stats
|
||||
WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date <= ?
|
||||
`, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(¤tCrashes)
|
||||
|
||||
// Previous crashes
|
||||
var prevCrashes int
|
||||
s.db.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(SUM(count), 0) FROM daily_stats
|
||||
WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date < ?
|
||||
`, appID, prevStartDate.Format("2006-01-02"), startDate.Format("2006-01-02")).Scan(&prevCrashes)
|
||||
|
||||
// Total sessions
|
||||
var totalSessions int
|
||||
s.db.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(SUM(count), 0) FROM daily_stats
|
||||
WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ?
|
||||
`, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(&totalSessions)
|
||||
|
||||
// Calculate changes
|
||||
dauChange := 0.0
|
||||
if prevDAU > 0 {
|
||||
dauChange = ((currentDAU - prevDAU) / prevDAU) * 100
|
||||
}
|
||||
|
||||
crashChange := 0.0
|
||||
if prevCrashes > 0 {
|
||||
crashChange = ((float64(currentCrashes) - float64(prevCrashes)) / float64(prevCrashes)) * 100
|
||||
}
|
||||
|
||||
crashFreeRate := 100.0
|
||||
if totalSessions > 0 {
|
||||
crashFreeRate = (1 - float64(currentCrashes)/float64(totalSessions)) * 100
|
||||
if crashFreeRate < 0 {
|
||||
crashFreeRate = 0
|
||||
}
|
||||
}
|
||||
|
||||
return &AnalyticsOverview{
|
||||
DAU: int(currentDAU),
|
||||
DAUChange: dauChange,
|
||||
TotalCrashes: currentCrashes,
|
||||
CrashChange: crashChange,
|
||||
CrashFreeRate: crashFreeRate,
|
||||
TotalSessions: totalSessions,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetDailyStats returns daily statistics for an app
|
||||
func (s *Service) GetDailyStats(ctx context.Context, appID, eventType string, days int) ([]DailyStats, error) {
|
||||
startDate := time.Now().AddDate(0, 0, -days).Format("2006-01-02")
|
||||
|
||||
query := `
|
||||
SELECT app_id, date, event_type, count, unique_devices
|
||||
FROM daily_stats
|
||||
WHERE app_id = ? AND date >= ?
|
||||
`
|
||||
args := []interface{}{appID, startDate}
|
||||
|
||||
if eventType != "" {
|
||||
query += " AND event_type = ?"
|
||||
args = append(args, eventType)
|
||||
}
|
||||
query += " ORDER BY date ASC"
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var stats []DailyStats
|
||||
for rows.Next() {
|
||||
var s DailyStats
|
||||
if err := rows.Scan(&s.AppID, &s.Date, &s.EventType, &s.Count, &s.UniqueDevices); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stats = append(stats, s)
|
||||
}
|
||||
|
||||
return stats, rows.Err()
|
||||
}
|
||||
|
||||
// GetCrashGroups returns crash groups for an app
|
||||
func (s *Service) GetCrashGroups(ctx context.Context, appID, status string, limit, offset int) ([]CrashGroup, int, error) {
|
||||
// Count total
|
||||
countQuery := "SELECT COUNT(*) FROM crash_groups WHERE app_id = ?"
|
||||
args := []interface{}{appID}
|
||||
if status != "" {
|
||||
countQuery += " AND status = ?"
|
||||
args = append(args, status)
|
||||
}
|
||||
|
||||
var total int
|
||||
if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Fetch groups
|
||||
query := `
|
||||
SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace,
|
||||
first_seen, last_seen, occurrence_count, affected_versions, status
|
||||
FROM crash_groups WHERE app_id = ?
|
||||
`
|
||||
args = []interface{}{appID}
|
||||
if status != "" {
|
||||
query += " AND status = ?"
|
||||
args = append(args, status)
|
||||
}
|
||||
query += " ORDER BY last_seen DESC LIMIT ? OFFSET ?"
|
||||
args = append(args, limit, offset)
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var groups []CrashGroup
|
||||
for rows.Next() {
|
||||
var g CrashGroup
|
||||
var versionsJSON string
|
||||
if err := rows.Scan(&g.ID, &g.AppID, &g.Fingerprint, &g.CrashType, &g.Message,
|
||||
&g.SampleStackTrace, &g.FirstSeen, &g.LastSeen, &g.OccurrenceCount,
|
||||
&versionsJSON, &g.Status); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
json.Unmarshal([]byte(versionsJSON), &g.AffectedVersions)
|
||||
groups = append(groups, g)
|
||||
}
|
||||
|
||||
return groups, total, rows.Err()
|
||||
}
|
||||
|
||||
// GetCrashGroup returns a single crash group with recent occurrences
|
||||
func (s *Service) GetCrashGroup(ctx context.Context, appID, groupID string) (*CrashGroup, error) {
|
||||
var g CrashGroup
|
||||
var versionsJSON string
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace,
|
||||
first_seen, last_seen, occurrence_count, affected_versions, status
|
||||
FROM crash_groups WHERE app_id = ? AND id = ?
|
||||
`, appID, groupID).Scan(&g.ID, &g.AppID, &g.Fingerprint, &g.CrashType, &g.Message,
|
||||
&g.SampleStackTrace, &g.FirstSeen, &g.LastSeen, &g.OccurrenceCount,
|
||||
&versionsJSON, &g.Status)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
json.Unmarshal([]byte(versionsJSON), &g.AffectedVersions)
|
||||
return &g, nil
|
||||
}
|
||||
|
||||
// UpdateCrashGroupStatus updates the status of a crash group
|
||||
func (s *Service) UpdateCrashGroupStatus(ctx context.Context, appID, groupID, status string) error {
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
UPDATE crash_groups SET status = ? WHERE app_id = ? AND id = ?
|
||||
`, status, appID, groupID)
|
||||
return err
|
||||
}
|
||||
|
||||
// StartBackgroundWorkers starts the aggregation and cleanup workers
|
||||
func (s *Service) StartBackgroundWorkers(ctx context.Context) {
|
||||
// Hourly aggregation
|
||||
go s.runPeriodic(ctx, time.Hour, "hourly aggregation", s.aggregateHourly)
|
||||
|
||||
// Daily aggregation at 2am
|
||||
go s.runDaily(ctx, 2, "daily aggregation", s.aggregateDaily)
|
||||
|
||||
// Cleanup old events at 3am
|
||||
go s.runDaily(ctx, 3, "event cleanup", s.cleanupOldEvents)
|
||||
|
||||
log.Println("Telemetry background workers started")
|
||||
}
|
||||
|
||||
func (s *Service) runPeriodic(ctx context.Context, interval time.Duration, name string, fn func(context.Context) error) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.stopCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := fn(ctx); err != nil {
|
||||
log.Printf("Telemetry %s error: %v", name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) runDaily(ctx context.Context, hour int, name string, fn func(context.Context) error) {
|
||||
for {
|
||||
now := time.Now()
|
||||
next := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, now.Location())
|
||||
if next.Before(now) {
|
||||
next = next.Add(24 * time.Hour)
|
||||
}
|
||||
wait := next.Sub(now)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.stopCh:
|
||||
return
|
||||
case <-time.After(wait):
|
||||
if err := fn(ctx); err != nil {
|
||||
log.Printf("Telemetry %s error: %v", name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) aggregateHourly(ctx context.Context) error {
|
||||
hour := time.Now().Add(-time.Hour).Format("2006-01-02T15")
|
||||
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
INSERT OR REPLACE INTO hourly_stats (app_id, hour, event_type, count, unique_devices)
|
||||
SELECT
|
||||
app_id,
|
||||
strftime('%Y-%m-%dT%H', timestamp) as hour,
|
||||
event_type,
|
||||
COUNT(*) as count,
|
||||
COUNT(DISTINCT device_id) as unique_devices
|
||||
FROM events
|
||||
WHERE strftime('%Y-%m-%dT%H', timestamp) = ?
|
||||
GROUP BY app_id, hour, event_type
|
||||
`, hour)
|
||||
|
||||
if err == nil {
|
||||
log.Printf("Telemetry: hourly aggregation completed for %s", hour)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) aggregateDaily(ctx context.Context) error {
|
||||
yesterday := time.Now().AddDate(0, 0, -1).Format("2006-01-02")
|
||||
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
INSERT OR REPLACE INTO daily_stats (app_id, date, event_type, count, unique_devices)
|
||||
SELECT
|
||||
app_id,
|
||||
? as date,
|
||||
event_type,
|
||||
SUM(count) as count,
|
||||
SUM(unique_devices) as unique_devices
|
||||
FROM hourly_stats
|
||||
WHERE hour LIKE ? || 'T%'
|
||||
GROUP BY app_id, event_type
|
||||
`, yesterday, yesterday)
|
||||
|
||||
if err == nil {
|
||||
log.Printf("Telemetry: daily aggregation completed for %s", yesterday)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) cleanupOldEvents(ctx context.Context) error {
|
||||
cutoff := time.Now().AddDate(0, 0, -7).Format(time.RFC3339)
|
||||
|
||||
result, err := s.db.ExecContext(ctx, "DELETE FROM events WHERE timestamp < ?", cutoff)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deleted, _ := result.RowsAffected()
|
||||
log.Printf("Telemetry: cleaned up %d old events", deleted)
|
||||
|
||||
// Also clean old crash occurrences (90 days)
|
||||
crashCutoff := time.Now().AddDate(0, 0, -90).Format(time.RFC3339)
|
||||
s.db.ExecContext(ctx, "DELETE FROM crash_occurrences WHERE timestamp < ?", crashCutoff)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
// generateIDMu guards generateIDSeq.
var (
	generateIDMu  sync.Mutex
	generateIDSeq uint64
)

// generateID returns a 16-character lowercase hex identifier: the first 8
// bytes of a SHA-256 digest over the current time in nanoseconds and a
// per-process sequence number.
//
// The sequence number makes the hashed input different for every call in this
// process, including calls that read the same clock value. Hashing the
// timestamp alone gave such calls identical IDs.
func generateID() string {
	generateIDMu.Lock()
	generateIDSeq++
	seq := generateIDSeq
	generateIDMu.Unlock()

	sum := sha256.Sum256([]byte(fmt.Sprintf("%d-%d", time.Now().UnixNano(), seq)))
	return hex.EncodeToString(sum[:8])
}
|
||||
|
||||
// contains reports whether item is an element of slice. A nil or empty slice
// contains nothing.
func contains(slice []string, item string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] != item {
			continue
		}
		return true
	}
	return false
}
|
||||
|
||||
// TriggerAggregation manually triggers aggregation (useful for testing)
|
||||
func (s *Service) TriggerAggregation(ctx context.Context) error {
|
||||
if err := s.aggregateHourly(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.aggregateDaily(ctx)
|
||||
}
|
||||
Reference in New Issue
Block a user