// Package telemetry provides app analytics and crash reporting package telemetry import ( "context" "crypto/sha256" "database/sql" "encoding/hex" "encoding/json" "fmt" "log" "regexp" "strings" "sync" "time" _ "modernc.org/sqlite" ) // Event types const ( EventAppStart = "app_start" EventAppStop = "app_stop" EventAppCrash = "app_crash" EventLuaError = "lua_error" EventPerfFrame = "perf_frame" EventPerfMemory = "perf_memory" EventPerfStartup = "perf_startup" EventScreenView = "screen_view" EventFeatureUsed = "feature_used" ) // Event represents a telemetry event type Event struct { Type string `json:"type"` Timestamp string `json:"timestamp"` Data json.RawMessage `json:"data,omitempty"` } // EventBatch represents a batch of events from a device type EventBatch struct { AppID string `json:"app_id"` AppVersion string `json:"app_version"` MosisVersion string `json:"mosis_version"` DeviceID string `json:"device_id"` SessionID string `json:"session_id,omitempty"` Events []Event `json:"events"` } // CrashReport represents a crash report from a device type CrashReport struct { AppID string `json:"app_id"` AppVersion string `json:"app_version"` MosisVersion string `json:"mosis_version"` DeviceID string `json:"device_id"` Timestamp string `json:"timestamp"` Crash CrashDetails `json:"crash"` } // CrashDetails contains crash information type CrashDetails struct { Type string `json:"type"` Message string `json:"message"` StackTrace string `json:"stack_trace"` Context map[string]interface{} `json:"context,omitempty"` } // CrashGroup represents a group of similar crashes type CrashGroup struct { ID string `json:"id"` AppID string `json:"app_id"` Fingerprint string `json:"fingerprint"` CrashType string `json:"crash_type"` Message string `json:"message"` SampleStackTrace string `json:"sample_stack_trace"` FirstSeen string `json:"first_seen"` LastSeen string `json:"last_seen"` OccurrenceCount int `json:"occurrence_count"` AffectedVersions []string `json:"affected_versions"` Status string `json:"status"` // open, resolved, ignored } // DailyStats represents aggregated daily statistics type DailyStats struct { AppID string `json:"app_id"` Date string `json:"date"` EventType string `json:"event_type"` Count int `json:"count"` UniqueDevices int `json:"unique_devices"` } // AnalyticsOverview represents the analytics summary for an app type AnalyticsOverview struct { DAU int `json:"dau"` DAUChange float64 `json:"dau_change"` TotalCrashes int `json:"total_crashes"` CrashChange float64 `json:"crash_change"` CrashFreeRate float64 `json:"crash_free_rate"` TotalSessions int `json:"total_sessions"` } // Service handles telemetry operations type Service struct { db *sql.DB dbPath string mu sync.Mutex stopCh chan struct{} } // New creates a new telemetry service with a separate database func New(dbPath string) (*Service, error) { db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("open telemetry db: %w", err) } s := &Service{ db: db, dbPath: dbPath, stopCh: make(chan struct{}), } if err := s.migrate(); err != nil { db.Close() return nil, fmt.Errorf("migrate telemetry db: %w", err) } return s, nil } // migrate creates the telemetry database schema func (s *Service) migrate() error { schema := ` -- Raw events (7-day retention) CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, app_id TEXT NOT NULL, device_id TEXT NOT NULL, session_id TEXT, event_type TEXT NOT NULL, event_data TEXT, app_version TEXT, mosis_version TEXT, timestamp TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_events_app_time ON events(app_id, timestamp); CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type, timestamp); CREATE INDEX IF NOT EXISTS idx_events_cleanup ON events(timestamp); -- Hourly aggregates CREATE TABLE IF NOT EXISTS hourly_stats ( app_id TEXT NOT NULL, hour TEXT NOT NULL, event_type TEXT NOT NULL, count INTEGER NOT NULL, unique_devices INTEGER NOT NULL, PRIMARY KEY (app_id, hour, event_type) ); -- Daily aggregates CREATE TABLE IF NOT EXISTS daily_stats ( app_id TEXT NOT NULL, date TEXT NOT NULL, event_type TEXT NOT NULL, count INTEGER NOT NULL, unique_devices INTEGER NOT NULL, PRIMARY KEY (app_id, date, event_type) ); -- Crash groups (deduplicated by fingerprint) CREATE TABLE IF NOT EXISTS crash_groups ( id TEXT PRIMARY KEY, app_id TEXT NOT NULL, fingerprint TEXT NOT NULL, crash_type TEXT NOT NULL, message TEXT, sample_stack_trace TEXT, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, occurrence_count INTEGER DEFAULT 1, affected_versions TEXT, status TEXT DEFAULT 'open', UNIQUE(app_id, fingerprint) ); CREATE INDEX IF NOT EXISTS idx_crashes_app ON crash_groups(app_id, status); -- Individual crash occurrences (for recent list) CREATE TABLE IF NOT EXISTS crash_occurrences ( id INTEGER PRIMARY KEY AUTOINCREMENT, crash_group_id TEXT NOT NULL, device_id TEXT NOT NULL, app_version TEXT, context TEXT, timestamp TEXT NOT NULL, FOREIGN KEY (crash_group_id) REFERENCES crash_groups(id) ); CREATE INDEX IF NOT EXISTS idx_occurrences_group ON crash_occurrences(crash_group_id, timestamp); ` _, err := s.db.Exec(schema) return err } // Close closes the telemetry database func (s *Service) Close() error { close(s.stopCh) return s.db.Close() } // RecordEvents records a batch of events func (s *Service) RecordEvents(ctx context.Context, batch *EventBatch) (int, error) { s.mu.Lock() defer s.mu.Unlock() tx, err := s.db.BeginTx(ctx, nil) if err != nil { return 0, err } defer tx.Rollback() stmt, err := tx.PrepareContext(ctx, ` INSERT INTO events (app_id, device_id, session_id, event_type, event_data, app_version, mosis_version, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return 0, err } defer stmt.Close() count := 0 for _, event := range batch.Events { var eventData string if event.Data != nil { eventData = string(event.Data) } _, err := stmt.ExecContext(ctx, batch.AppID, batch.DeviceID, batch.SessionID, event.Type, eventData, batch.AppVersion, batch.MosisVersion, event.Timestamp, ) if err != nil { log.Printf("Failed to insert event: %v", err) continue } count++ } if err := tx.Commit(); err != nil { return 0, err } return count, nil } // RecordCrash records a crash report func (s *Service) RecordCrash(ctx context.Context, report *CrashReport) (string, error) { s.mu.Lock() defer s.mu.Unlock() // Generate fingerprint for crash grouping fingerprint := s.fingerprintCrash(report) groupID := generateID() // Try to find existing crash group var existingID string var affectedVersions string err := s.db.QueryRowContext(ctx, ` SELECT id, affected_versions FROM crash_groups WHERE app_id = ? AND fingerprint = ? `, report.AppID, fingerprint).Scan(&existingID, &affectedVersions) if err == sql.ErrNoRows { // Create new crash group versions, _ := json.Marshal([]string{report.AppVersion}) _, err = s.db.ExecContext(ctx, ` INSERT INTO crash_groups (id, app_id, fingerprint, crash_type, message, sample_stack_trace, first_seen, last_seen, occurrence_count, affected_versions, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, 'open') `, groupID, report.AppID, fingerprint, report.Crash.Type, report.Crash.Message, report.Crash.StackTrace, report.Timestamp, report.Timestamp, string(versions)) if err != nil { return "", err } } else if err != nil { return "", err } else { // Update existing crash group groupID = existingID // Add version if not already in list var versions []string json.Unmarshal([]byte(affectedVersions), &versions) if !contains(versions, report.AppVersion) { versions = append(versions, report.AppVersion) } versionsJSON, _ := json.Marshal(versions) _, err = s.db.ExecContext(ctx, ` UPDATE crash_groups SET last_seen = ?, occurrence_count = occurrence_count + 1, affected_versions = ?, status = CASE WHEN status = 'resolved' THEN 'open' ELSE status END WHERE id = ? `, report.Timestamp, string(versionsJSON), groupID) if err != nil { return "", err } } // Record individual occurrence contextJSON, _ := json.Marshal(report.Crash.Context) _, err = s.db.ExecContext(ctx, ` INSERT INTO crash_occurrences (crash_group_id, device_id, app_version, context, timestamp) VALUES (?, ?, ?, ?, ?) `, groupID, report.DeviceID, report.AppVersion, string(contextJSON), report.Timestamp) return groupID, err } // fingerprintCrash generates a unique fingerprint for crash grouping func (s *Service) fingerprintCrash(report *CrashReport) string { // Normalize stack trace (remove line numbers) normalized := normalizeStackTrace(report.Crash.StackTrace) // Create fingerprint from type, message, and normalized stack key := fmt.Sprintf("%s:%s:%s", report.Crash.Type, report.Crash.Message, normalized) hash := sha256.Sum256([]byte(key)) return hex.EncodeToString(hash[:8]) } // normalizeStackTrace removes line numbers for consistent fingerprinting func normalizeStackTrace(stack string) string { re := regexp.MustCompile(`:\d+:`) return re.ReplaceAllString(stack, ":?:") } // GetAnalyticsOverview returns analytics summary for an app func (s *Service) GetAnalyticsOverview(ctx context.Context, appID string, days int) (*AnalyticsOverview, error) { endDate := time.Now() startDate := endDate.AddDate(0, 0, -days) prevStartDate := startDate.AddDate(0, 0, -days) // Current period DAU (average) var currentDAU float64 err := s.db.QueryRowContext(ctx, ` SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ? `, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(¤tDAU) if err != nil { return nil, err } // Previous period DAU var prevDAU float64 s.db.QueryRowContext(ctx, ` SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date < ? `, appID, prevStartDate.Format("2006-01-02"), startDate.Format("2006-01-02")).Scan(&prevDAU) // Current crashes var currentCrashes int s.db.QueryRowContext(ctx, ` SELECT COALESCE(SUM(count), 0) FROM daily_stats WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date <= ? `, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(¤tCrashes) // Previous crashes var prevCrashes int s.db.QueryRowContext(ctx, ` SELECT COALESCE(SUM(count), 0) FROM daily_stats WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date < ? `, appID, prevStartDate.Format("2006-01-02"), startDate.Format("2006-01-02")).Scan(&prevCrashes) // Total sessions var totalSessions int s.db.QueryRowContext(ctx, ` SELECT COALESCE(SUM(count), 0) FROM daily_stats WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ? `, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(&totalSessions) // Calculate changes dauChange := 0.0 if prevDAU > 0 { dauChange = ((currentDAU - prevDAU) / prevDAU) * 100 } crashChange := 0.0 if prevCrashes > 0 { crashChange = ((float64(currentCrashes) - float64(prevCrashes)) / float64(prevCrashes)) * 100 } crashFreeRate := 100.0 if totalSessions > 0 { crashFreeRate = (1 - float64(currentCrashes)/float64(totalSessions)) * 100 if crashFreeRate < 0 { crashFreeRate = 0 } } return &AnalyticsOverview{ DAU: int(currentDAU), DAUChange: dauChange, TotalCrashes: currentCrashes, CrashChange: crashChange, CrashFreeRate: crashFreeRate, TotalSessions: totalSessions, }, nil } // GetDailyStats returns daily statistics for an app func (s *Service) GetDailyStats(ctx context.Context, appID, eventType string, days int) ([]DailyStats, error) { startDate := time.Now().AddDate(0, 0, -days).Format("2006-01-02") query := ` SELECT app_id, date, event_type, count, unique_devices FROM daily_stats WHERE app_id = ? AND date >= ? ` args := []interface{}{appID, startDate} if eventType != "" { query += " AND event_type = ?" args = append(args, eventType) } query += " ORDER BY date ASC" rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, err } defer rows.Close() var stats []DailyStats for rows.Next() { var s DailyStats if err := rows.Scan(&s.AppID, &s.Date, &s.EventType, &s.Count, &s.UniqueDevices); err != nil { return nil, err } stats = append(stats, s) } return stats, rows.Err() } // GetCrashGroups returns crash groups for an app func (s *Service) GetCrashGroups(ctx context.Context, appID, status string, limit, offset int) ([]CrashGroup, int, error) { // Count total countQuery := "SELECT COUNT(*) FROM crash_groups WHERE app_id = ?" args := []interface{}{appID} if status != "" { countQuery += " AND status = ?" args = append(args, status) } var total int if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil { return nil, 0, err } // Fetch groups query := ` SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace, first_seen, last_seen, occurrence_count, affected_versions, status FROM crash_groups WHERE app_id = ? ` args = []interface{}{appID} if status != "" { query += " AND status = ?" args = append(args, status) } query += " ORDER BY last_seen DESC LIMIT ? OFFSET ?" args = append(args, limit, offset) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, 0, err } defer rows.Close() var groups []CrashGroup for rows.Next() { var g CrashGroup var versionsJSON string if err := rows.Scan(&g.ID, &g.AppID, &g.Fingerprint, &g.CrashType, &g.Message, &g.SampleStackTrace, &g.FirstSeen, &g.LastSeen, &g.OccurrenceCount, &versionsJSON, &g.Status); err != nil { return nil, 0, err } json.Unmarshal([]byte(versionsJSON), &g.AffectedVersions) groups = append(groups, g) } return groups, total, rows.Err() } // GetCrashGroup returns a single crash group with recent occurrences func (s *Service) GetCrashGroup(ctx context.Context, appID, groupID string) (*CrashGroup, error) { var g CrashGroup var versionsJSON string err := s.db.QueryRowContext(ctx, ` SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace, first_seen, last_seen, occurrence_count, affected_versions, status FROM crash_groups WHERE app_id = ? AND id = ? `, appID, groupID).Scan(&g.ID, &g.AppID, &g.Fingerprint, &g.CrashType, &g.Message, &g.SampleStackTrace, &g.FirstSeen, &g.LastSeen, &g.OccurrenceCount, &versionsJSON, &g.Status) if err != nil { return nil, err } json.Unmarshal([]byte(versionsJSON), &g.AffectedVersions) return &g, nil } // UpdateCrashGroupStatus updates the status of a crash group func (s *Service) UpdateCrashGroupStatus(ctx context.Context, appID, groupID, status string) error { _, err := s.db.ExecContext(ctx, ` UPDATE crash_groups SET status = ? WHERE app_id = ? AND id = ? `, status, appID, groupID) return err } // StartBackgroundWorkers starts the aggregation and cleanup workers func (s *Service) StartBackgroundWorkers(ctx context.Context) { // Hourly aggregation go s.runPeriodic(ctx, time.Hour, "hourly aggregation", s.aggregateHourly) // Daily aggregation at 2am go s.runDaily(ctx, 2, "daily aggregation", s.aggregateDaily) // Cleanup old events at 3am go s.runDaily(ctx, 3, "event cleanup", s.cleanupOldEvents) log.Println("Telemetry background workers started") } func (s *Service) runPeriodic(ctx context.Context, interval time.Duration, name string, fn func(context.Context) error) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-s.stopCh: return case <-ticker.C: if err := fn(ctx); err != nil { log.Printf("Telemetry %s error: %v", name, err) } } } } func (s *Service) runDaily(ctx context.Context, hour int, name string, fn func(context.Context) error) { for { now := time.Now() next := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, now.Location()) if next.Before(now) { next = next.Add(24 * time.Hour) } wait := next.Sub(now) select { case <-ctx.Done(): return case <-s.stopCh: return case <-time.After(wait): if err := fn(ctx); err != nil { log.Printf("Telemetry %s error: %v", name, err) } } } } func (s *Service) aggregateHourly(ctx context.Context) error { hour := time.Now().Add(-time.Hour).Format("2006-01-02T15") _, err := s.db.ExecContext(ctx, ` INSERT OR REPLACE INTO hourly_stats (app_id, hour, event_type, count, unique_devices) SELECT app_id, strftime('%Y-%m-%dT%H', timestamp) as hour, event_type, COUNT(*) as count, COUNT(DISTINCT device_id) as unique_devices FROM events WHERE strftime('%Y-%m-%dT%H', timestamp) = ? GROUP BY app_id, hour, event_type `, hour) if err == nil { log.Printf("Telemetry: hourly aggregation completed for %s", hour) } return err } func (s *Service) aggregateDaily(ctx context.Context) error { yesterday := time.Now().AddDate(0, 0, -1).Format("2006-01-02") _, err := s.db.ExecContext(ctx, ` INSERT OR REPLACE INTO daily_stats (app_id, date, event_type, count, unique_devices) SELECT app_id, ? as date, event_type, SUM(count) as count, SUM(unique_devices) as unique_devices FROM hourly_stats WHERE hour LIKE ? || 'T%' GROUP BY app_id, event_type `, yesterday, yesterday) if err == nil { log.Printf("Telemetry: daily aggregation completed for %s", yesterday) } return err } func (s *Service) cleanupOldEvents(ctx context.Context) error { cutoff := time.Now().AddDate(0, 0, -7).Format(time.RFC3339) result, err := s.db.ExecContext(ctx, "DELETE FROM events WHERE timestamp < ?", cutoff) if err != nil { return err } deleted, _ := result.RowsAffected() log.Printf("Telemetry: cleaned up %d old events", deleted) // Also clean old crash occurrences (90 days) crashCutoff := time.Now().AddDate(0, 0, -90).Format(time.RFC3339) s.db.ExecContext(ctx, "DELETE FROM crash_occurrences WHERE timestamp < ?", crashCutoff) return nil } // Helper functions func generateID() string { hash := sha256.Sum256([]byte(fmt.Sprintf("%d", time.Now().UnixNano()))) return hex.EncodeToString(hash[:8]) } func contains(slice []string, item string) bool { for _, s := range slice { if s == item { return true } } return false } // TriggerAggregation manually triggers aggregation (useful for testing) func (s *Service) TriggerAggregation(ctx context.Context) error { if err := s.aggregateHourly(ctx); err != nil { return err } return s.aggregateDaily(ctx) }