Files
MosisService/portal/internal/telemetry/telemetry.go

667 lines
19 KiB
Go

// Package telemetry provides app analytics and crash reporting.
package telemetry
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"regexp"
"strings"
"sync"
"time"
_ "modernc.org/sqlite"
)
// Event type names.
//
// RecordEvents stores Event.Type verbatim in events.event_type, so these are
// the values the aggregation tables end up keyed on. GetAnalyticsOverview
// filters on the literal strings 'app_start' and 'app_crash'.
const (
// Application lifecycle events.
EventAppStart = "app_start"
EventAppStop = "app_stop"
EventAppCrash = "app_crash"
// Script error event (payload schema is not defined in this file).
EventLuaError = "lua_error"
// Performance events (payload schema is not defined in this file).
EventPerfFrame = "perf_frame"
EventPerfMemory = "perf_memory"
EventPerfStartup = "perf_startup"
// Usage events (payload schema is not defined in this file).
EventScreenView = "screen_view"
EventFeatureUsed = "feature_used"
)
// Event represents a single telemetry event carried inside an EventBatch.
type Event struct {
// Type is stored verbatim in events.event_type by RecordEvents; the Event*
// constants list the names declared by this package.
Type string `json:"type"`
// Timestamp is stored verbatim in events.timestamp. aggregateHourly passes
// that column to SQLite's strftime, so it presumably has to be a date/time
// string SQLite can parse — TODO confirm the format clients send.
Timestamp string `json:"timestamp"`
// Data is an optional, uninterpreted JSON payload; RecordEvents stores its
// raw text in events.event_data.
Data json.RawMessage `json:"data,omitempty"`
}
// EventBatch represents a batch of events from a device. It is the input of
// RecordEvents, which copies the batch-level fields onto every stored event
// row.
type EventBatch struct {
// AppID is written to events.app_id; the aggregation queries group on it.
AppID string `json:"app_id"`
// AppVersion is written to events.app_version.
AppVersion string `json:"app_version"`
// MosisVersion is written to events.mosis_version.
MosisVersion string `json:"mosis_version"`
// DeviceID is written to events.device_id; aggregateHourly counts distinct
// values of it to produce unique_devices.
DeviceID string `json:"device_id"`
// SessionID is optional in the JSON payload and written to events.session_id.
SessionID string `json:"session_id,omitempty"`
// Events are the individual events; each one becomes one row.
Events []Event `json:"events"`
}
// CrashReport represents a crash report from a device. It is the input of
// RecordCrash.
type CrashReport struct {
// AppID scopes the crash group lookup (together with the fingerprint).
AppID string `json:"app_id"`
// AppVersion is recorded on the occurrence and merged into the group's
// affected_versions list.
AppVersion string `json:"app_version"`
// MosisVersion is part of the payload; RecordCrash does not store it.
MosisVersion string `json:"mosis_version"`
// DeviceID is recorded on the crash occurrence row.
DeviceID string `json:"device_id"`
// Timestamp is stored verbatim as the occurrence timestamp and as the
// group's first_seen/last_seen.
Timestamp string `json:"timestamp"`
// Crash holds the crash details used for fingerprinting.
Crash CrashDetails `json:"crash"`
}
// CrashDetails contains crash information. Type, Message and StackTrace are
// the inputs of fingerprintCrash and therefore decide which crash group a
// report lands in.
type CrashDetails struct {
// Type is stored as crash_groups.crash_type when a group is created.
Type string `json:"type"`
// Message is stored as crash_groups.message when a group is created.
Message string `json:"message"`
// StackTrace is stored as the group's sample_stack_trace when a group is
// created; it is normalized by normalizeStackTrace before hashing.
StackTrace string `json:"stack_trace"`
// Context is optional free-form data; RecordCrash stores it JSON-encoded in
// crash_occurrences.context.
Context map[string]interface{} `json:"context,omitempty"`
}
// CrashGroup represents a group of similar crashes: one row of the
// crash_groups table, which is unique per (app_id, fingerprint).
type CrashGroup struct {
// ID is the primary key produced by generateID when the group is created.
ID string `json:"id"`
AppID string `json:"app_id"`
// Fingerprint is the value computed by fingerprintCrash.
Fingerprint string `json:"fingerprint"`
CrashType string `json:"crash_type"`
Message string `json:"message"`
// SampleStackTrace is the stack trace of the report that created the group.
SampleStackTrace string `json:"sample_stack_trace"`
// FirstSeen and LastSeen hold report timestamps exactly as received.
FirstSeen string `json:"first_seen"`
LastSeen string `json:"last_seen"`
// OccurrenceCount is incremented by RecordCrash for every matching report.
OccurrenceCount int `json:"occurrence_count"`
// AffectedVersions is decoded from the JSON array stored in the
// affected_versions column.
AffectedVersions []string `json:"affected_versions"`
// RecordCrash flips a 'resolved' group back to 'open' when it recurs.
Status string `json:"status"` // open, resolved, ignored
}
// DailyStats represents aggregated daily statistics: one row of the
// daily_stats table, keyed by (app_id, date, event_type).
type DailyStats struct {
AppID string `json:"app_id"`
// Date is compared against "2006-01-02"-formatted strings by the readers in
// this file.
Date string `json:"date"`
EventType string `json:"event_type"`
// Count is the number of events of EventType on Date.
Count int `json:"count"`
// UniqueDevices is the device count written by aggregateDaily.
UniqueDevices int `json:"unique_devices"`
}
// AnalyticsOverview represents the analytics summary for an app as computed
// by GetAnalyticsOverview from the daily_stats table.
type AnalyticsOverview struct {
// DAU is the average daily unique_devices for 'app_start' over the current
// period, truncated to an integer.
DAU int `json:"dau"`
// DAUChange is the percentage change of DAU against the previous period;
// it stays 0 when the previous period's DAU is 0.
DAUChange float64 `json:"dau_change"`
// TotalCrashes is the sum of 'app_crash' counts over the current period.
TotalCrashes int `json:"total_crashes"`
// CrashChange is the percentage change of TotalCrashes against the previous
// period; it stays 0 when the previous period had no crashes.
CrashChange float64 `json:"crash_change"`
// CrashFreeRate is (1 - crashes/sessions) * 100, clamped at 0, and 100 when
// there were no sessions.
CrashFreeRate float64 `json:"crash_free_rate"`
// TotalSessions is the sum of 'app_start' counts over the current period.
TotalSessions int `json:"total_sessions"`
}
// Service handles telemetry operations: recording events and crashes,
// answering analytics queries and running the background aggregation and
// cleanup workers. Create one with New and release it with Close.
type Service struct {
// db is the telemetry database handle opened by New.
db *sql.DB
// dbPath is the path New was called with.
dbPath string
// mu is held by RecordEvents and RecordCrash for the duration of their
// database writes.
mu sync.Mutex
// stopCh is closed by Close; runPeriodic and runDaily return when it is.
stopCh chan struct{}
}
// New creates a new telemetry service backed by its own SQLite database at
// dbPath and makes sure the schema exists.
//
// The connection is configured with WAL journaling and a 5 second busy
// timeout. On any failure the partially opened database is closed and a
// wrapped error is returned.
func New(dbPath string) (*Service, error) {
	// modernc.org/sqlite only applies pragmas passed as _pragma=name(value).
	// The previous _journal_mode/_busy_timeout keys belong to a different
	// driver and were silently ignored, leaving both settings at their
	// defaults.
	dsn := dbPath + "?_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)"

	db, err := sql.Open("sqlite", dsn)
	if err != nil {
		return nil, fmt.Errorf("open telemetry db: %w", err)
	}

	s := &Service{
		db:     db,
		dbPath: dbPath,
		stopCh: make(chan struct{}),
	}
	if err := s.migrate(); err != nil {
		// The migration error is the one worth reporting; a failure to close
		// the half-initialised handle adds nothing for the caller.
		_ = db.Close()
		return nil, fmt.Errorf("migrate telemetry db: %w", err)
	}
	return s, nil
}
// migrate brings the telemetry database up to the current schema. Every
// statement uses IF NOT EXISTS, so running it against an already initialised
// database is harmless. The whole script is sent in a single Exec call.
func (s *Service) migrate() error {
	const ddl = `
-- Raw events (7-day retention)
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
app_id TEXT NOT NULL,
device_id TEXT NOT NULL,
session_id TEXT,
event_type TEXT NOT NULL,
event_data TEXT,
app_version TEXT,
mosis_version TEXT,
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_app_time ON events(app_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type, timestamp);
CREATE INDEX IF NOT EXISTS idx_events_cleanup ON events(timestamp);
-- Hourly aggregates
CREATE TABLE IF NOT EXISTS hourly_stats (
app_id TEXT NOT NULL,
hour TEXT NOT NULL,
event_type TEXT NOT NULL,
count INTEGER NOT NULL,
unique_devices INTEGER NOT NULL,
PRIMARY KEY (app_id, hour, event_type)
);
-- Daily aggregates
CREATE TABLE IF NOT EXISTS daily_stats (
app_id TEXT NOT NULL,
date TEXT NOT NULL,
event_type TEXT NOT NULL,
count INTEGER NOT NULL,
unique_devices INTEGER NOT NULL,
PRIMARY KEY (app_id, date, event_type)
);
-- Crash groups (deduplicated by fingerprint)
CREATE TABLE IF NOT EXISTS crash_groups (
id TEXT PRIMARY KEY,
app_id TEXT NOT NULL,
fingerprint TEXT NOT NULL,
crash_type TEXT NOT NULL,
message TEXT,
sample_stack_trace TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
occurrence_count INTEGER DEFAULT 1,
affected_versions TEXT,
status TEXT DEFAULT 'open',
UNIQUE(app_id, fingerprint)
);
CREATE INDEX IF NOT EXISTS idx_crashes_app ON crash_groups(app_id, status);
-- Individual crash occurrences (for recent list)
CREATE TABLE IF NOT EXISTS crash_occurrences (
id INTEGER PRIMARY KEY AUTOINCREMENT,
crash_group_id TEXT NOT NULL,
device_id TEXT NOT NULL,
app_version TEXT,
context TEXT,
timestamp TEXT NOT NULL,
FOREIGN KEY (crash_group_id) REFERENCES crash_groups(id)
);
CREATE INDEX IF NOT EXISTS idx_occurrences_group ON crash_occurrences(crash_group_id, timestamp);
`
	if _, err := s.db.Exec(ddl); err != nil {
		return err
	}
	return nil
}
// Close stops the background workers and closes the telemetry database.
//
// It is safe to call more than once: the stop channel is closed only on the
// first call (closing it again would panic), and later calls simply return
// whatever closing the database handle reports.
func (s *Service) Close() error {
	// s.mu keeps two concurrent Close calls from both reaching close().
	s.mu.Lock()
	select {
	case <-s.stopCh:
		// Already signalled by an earlier Close.
	default:
		close(s.stopCh)
	}
	s.mu.Unlock()

	return s.db.Close()
}
// RecordEvents records a batch of events in a single transaction and returns
// how many rows were inserted.
//
// Insertion is best-effort per event: an event that fails to insert is logged
// and skipped while the rest of the batch is still committed. The call fails
// as a whole (returning 0) when batch is nil, when the transaction cannot be
// started, prepared or committed, or when ctx is cancelled part-way through.
// An empty batch returns (0, nil) without touching the database.
func (s *Service) RecordEvents(ctx context.Context, batch *EventBatch) (int, error) {
	if batch == nil {
		return 0, fmt.Errorf("record events: nil batch")
	}
	if len(batch.Events) == 0 {
		return 0, nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return 0, fmt.Errorf("record events: begin transaction: %w", err)
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, `
		INSERT INTO events (app_id, device_id, session_id, event_type, event_data, app_version, mosis_version, timestamp)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
	`)
	if err != nil {
		return 0, fmt.Errorf("record events: prepare insert: %w", err)
	}
	defer stmt.Close()

	inserted := 0
	for _, event := range batch.Events {
		_, err := stmt.ExecContext(ctx,
			batch.AppID,
			batch.DeviceID,
			batch.SessionID,
			event.Type,
			string(event.Data), // empty string when the event carries no payload
			batch.AppVersion,
			batch.MosisVersion,
			event.Timestamp,
		)
		if err != nil {
			// Once the context is done every remaining insert and the commit
			// will fail too; stop instead of logging one line per event.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return 0, fmt.Errorf("record events: %w", ctxErr)
			}
			log.Printf("Failed to insert event: %v", err)
			continue
		}
		inserted++
	}

	if err := tx.Commit(); err != nil {
		return 0, fmt.Errorf("record events: commit: %w", err)
	}
	return inserted, nil
}
// RecordCrash records a crash report and returns the ID of the crash group it
// was filed under.
//
// Reports are grouped by (app ID, fingerprint). The first report for a
// fingerprint creates the group; later ones bump its occurrence count, update
// last_seen, add the app version to the affected list and reopen the group if
// it had been marked resolved. Every report also adds a crash_occurrences row.
//
// The group write and the occurrence insert are committed together, so a
// failure never leaves an incremented count without its occurrence. On error
// the returned ID is empty.
func (s *Service) RecordCrash(ctx context.Context, report *CrashReport) (string, error) {
	if report == nil {
		return "", fmt.Errorf("record crash: nil report")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	fingerprint := s.fingerprintCrash(report)

	// The lookup runs outside the write transaction on purpose: s.mu already
	// serialises RecordCrash calls, and keeping the transaction write-only
	// avoids upgrading a read snapshot to a write lock mid-transaction.
	var (
		groupID        string
		storedVersions sql.NullString // the column is nullable
	)
	lookupErr := s.db.QueryRowContext(ctx, `
		SELECT id, affected_versions FROM crash_groups WHERE app_id = ? AND fingerprint = ?
	`, report.AppID, fingerprint).Scan(&groupID, &storedVersions)
	isNewGroup := lookupErr == sql.ErrNoRows
	if lookupErr != nil && !isNewGroup {
		return "", fmt.Errorf("record crash: look up group: %w", lookupErr)
	}

	var versions []string
	if isNewGroup {
		groupID = generateID()
	} else if storedVersions.Valid && storedVersions.String != "" {
		if err := json.Unmarshal([]byte(storedVersions.String), &versions); err != nil {
			// An unreadable list must not block crash recording; start over.
			log.Printf("Telemetry: resetting unreadable affected_versions for crash group %s: %v", groupID, err)
			versions = nil
		}
	}
	if !contains(versions, report.AppVersion) {
		versions = append(versions, report.AppVersion)
	}

	versionsJSON, err := json.Marshal(versions)
	if err != nil {
		return "", fmt.Errorf("record crash: encode affected versions: %w", err)
	}
	contextJSON, err := json.Marshal(report.Crash.Context)
	if err != nil {
		return "", fmt.Errorf("record crash: encode context: %w", err)
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return "", fmt.Errorf("record crash: begin transaction: %w", err)
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	if isNewGroup {
		_, err = tx.ExecContext(ctx, `
			INSERT INTO crash_groups (id, app_id, fingerprint, crash_type, message, sample_stack_trace, first_seen, last_seen, occurrence_count, affected_versions, status)
			VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, 'open')
		`, groupID, report.AppID, fingerprint, report.Crash.Type, report.Crash.Message,
			report.Crash.StackTrace, report.Timestamp, report.Timestamp, string(versionsJSON))
	} else {
		_, err = tx.ExecContext(ctx, `
			UPDATE crash_groups SET
				last_seen = ?,
				occurrence_count = occurrence_count + 1,
				affected_versions = ?,
				status = CASE WHEN status = 'resolved' THEN 'open' ELSE status END
			WHERE id = ?
		`, report.Timestamp, string(versionsJSON), groupID)
	}
	if err != nil {
		return "", fmt.Errorf("record crash: save group: %w", err)
	}

	_, err = tx.ExecContext(ctx, `
		INSERT INTO crash_occurrences (crash_group_id, device_id, app_version, context, timestamp)
		VALUES (?, ?, ?, ?, ?)
	`, groupID, report.DeviceID, report.AppVersion, string(contextJSON), report.Timestamp)
	if err != nil {
		return "", fmt.Errorf("record crash: save occurrence: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return "", fmt.Errorf("record crash: commit: %w", err)
	}
	return groupID, nil
}
// fingerprintCrash derives the grouping key for a crash report: the first
// 8 bytes (16 hex characters) of the SHA-256 of "type:message:stack", where
// the stack trace has first been passed through normalizeStackTrace so that
// line-number changes do not split a group.
func (s *Service) fingerprintCrash(report *CrashReport) string {
	crash := &report.Crash
	material := fmt.Sprintf(
		"%s:%s:%s",
		crash.Type,
		crash.Message,
		normalizeStackTrace(crash.StackTrace),
	)
	digest := sha256.Sum256([]byte(material))
	return hex.EncodeToString(digest[:8])
}
// stackLineNumberRe matches a run of digits enclosed in colons, such as the
// ":42:" in "main.lua:42: in function". It is compiled once at package
// initialisation instead of on every call.
var stackLineNumberRe = regexp.MustCompile(`:\d+:`)

// normalizeStackTrace replaces every ":<digits>:" sequence in stack with
// ":?:" so that the same crash keeps the same fingerprint when line numbers
// shift. Numbers that are not followed by a colon are left untouched.
func normalizeStackTrace(stack string) string {
	return stackLineNumberRe.ReplaceAllString(stack, ":?:")
}
// GetAnalyticsOverview returns the analytics summary for appID over the last
// `days` days, together with the percentage change against the `days` days
// before that.
//
// All figures are read from daily_stats. The current period covers dates from
// now-days up to and including today; the previous period covers the `days`
// dates before the current period's start. Percentage changes are reported as
// 0 when the previous period has no data to compare against, and the
// crash-free rate is 100 when there were no sessions.
//
// Any query failure is returned; previously only the first query was checked
// and the others silently produced zeroes.
func (s *Service) GetAnalyticsOverview(ctx context.Context, appID string, days int) (*AnalyticsOverview, error) {
	const dateLayout = "2006-01-02"

	now := time.Now()
	periodStart := now.AddDate(0, 0, -days)
	end := now.Format(dateLayout)
	start := periodStart.Format(dateLayout)
	prevStart := periodStart.AddDate(0, 0, -days).Format(dateLayout)

	var (
		currentDAU, prevDAU         float64
		currentCrashes, prevCrashes int
		totalSessions               int
	)

	// Each metric is one aggregate over daily_stats; they differ only in the
	// aggregate, the event type and whether the upper bound is inclusive.
	metrics := []struct {
		name     string
		query    string
		from, to string
		dest     interface{}
	}{
		{
			name: "current DAU",
			query: `SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats
				WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ?`,
			from: start, to: end, dest: &currentDAU,
		},
		{
			name: "previous DAU",
			query: `SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats
				WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date < ?`,
			from: prevStart, to: start, dest: &prevDAU,
		},
		{
			name: "current crashes",
			query: `SELECT COALESCE(SUM(count), 0) FROM daily_stats
				WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date <= ?`,
			from: start, to: end, dest: &currentCrashes,
		},
		{
			name: "previous crashes",
			query: `SELECT COALESCE(SUM(count), 0) FROM daily_stats
				WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date < ?`,
			from: prevStart, to: start, dest: &prevCrashes,
		},
		{
			name: "total sessions",
			query: `SELECT COALESCE(SUM(count), 0) FROM daily_stats
				WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ?`,
			from: start, to: end, dest: &totalSessions,
		},
	}
	for _, m := range metrics {
		if err := s.db.QueryRowContext(ctx, m.query, appID, m.from, m.to).Scan(m.dest); err != nil {
			return nil, fmt.Errorf("analytics overview: %s: %w", m.name, err)
		}
	}

	overview := &AnalyticsOverview{
		DAU:           int(currentDAU),
		TotalCrashes:  currentCrashes,
		CrashFreeRate: 100.0,
		TotalSessions: totalSessions,
	}
	if prevDAU > 0 {
		overview.DAUChange = ((currentDAU - prevDAU) / prevDAU) * 100
	}
	if prevCrashes > 0 {
		overview.CrashChange = ((float64(currentCrashes) - float64(prevCrashes)) / float64(prevCrashes)) * 100
	}
	if totalSessions > 0 {
		rate := (1 - float64(currentCrashes)/float64(totalSessions)) * 100
		if rate < 0 {
			// More crashes than sessions; clamp rather than report a negative rate.
			rate = 0
		}
		overview.CrashFreeRate = rate
	}
	return overview, nil
}
// GetDailyStats lists the daily_stats rows of appID whose date is at or after
// now minus `days` days, oldest first. A non-empty eventType restricts the
// result to that event type. The slice is nil when nothing matches.
func (s *Service) GetDailyStats(ctx context.Context, appID, eventType string, days int) ([]DailyStats, error) {
	since := time.Now().AddDate(0, 0, -days).Format("2006-01-02")

	params := []interface{}{appID, since}
	sqlText := `
SELECT app_id, date, event_type, count, unique_devices
FROM daily_stats
WHERE app_id = ? AND date >= ?
`
	// The event type filter is optional; it is bound as a parameter like the rest.
	if eventType != "" {
		sqlText += " AND event_type = ?"
		params = append(params, eventType)
	}
	sqlText += " ORDER BY date ASC"

	rows, err := s.db.QueryContext(ctx, sqlText, params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []DailyStats
	for rows.Next() {
		var row DailyStats
		scanErr := rows.Scan(&row.AppID, &row.Date, &row.EventType, &row.Count, &row.UniqueDevices)
		if scanErr != nil {
			return nil, scanErr
		}
		result = append(result, row)
	}
	return result, rows.Err()
}
// GetCrashGroups returns one page of the crash groups of appID, most recently
// seen first, along with the total number of groups matching the filter. A
// non-empty status restricts both the page and the total to that status;
// limit and offset are passed straight to the SQL LIMIT/OFFSET clause.
func (s *Service) GetCrashGroups(ctx context.Context, appID, status string, limit, offset int) ([]CrashGroup, int, error) {
	// The optional status filter is shared by the count and the page query.
	statusFilter := ""
	filterArgs := []interface{}{appID}
	if status != "" {
		statusFilter = " AND status = ?"
		filterArgs = append(filterArgs, status)
	}

	// Total number of matching groups, independent of paging.
	var total int
	countSQL := "SELECT COUNT(*) FROM crash_groups WHERE app_id = ?" + statusFilter
	if err := s.db.QueryRowContext(ctx, countSQL, filterArgs...).Scan(&total); err != nil {
		return nil, 0, err
	}

	// The requested page.
	pageSQL := `
SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace,
first_seen, last_seen, occurrence_count, affected_versions, status
FROM crash_groups WHERE app_id = ?
` + statusFilter + " ORDER BY last_seen DESC LIMIT ? OFFSET ?"
	pageArgs := make([]interface{}, 0, len(filterArgs)+2)
	pageArgs = append(pageArgs, filterArgs...)
	pageArgs = append(pageArgs, limit, offset)

	rows, err := s.db.QueryContext(ctx, pageSQL, pageArgs...)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	var page []CrashGroup
	for rows.Next() {
		var (
			group       CrashGroup
			rawVersions string
		)
		scanErr := rows.Scan(
			&group.ID, &group.AppID, &group.Fingerprint, &group.CrashType, &group.Message,
			&group.SampleStackTrace, &group.FirstSeen, &group.LastSeen, &group.OccurrenceCount,
			&rawVersions, &group.Status,
		)
		if scanErr != nil {
			return nil, 0, scanErr
		}
		// An undecodable version list leaves AffectedVersions as decoded so far.
		_ = json.Unmarshal([]byte(rawVersions), &group.AffectedVersions)
		page = append(page, group)
	}
	return page, total, rows.Err()
}
// GetCrashGroup loads the crash group with the given ID, provided it belongs
// to appID. Only the group row itself is returned. When no such group exists
// the error from Scan (sql.ErrNoRows) is passed through unchanged.
func (s *Service) GetCrashGroup(ctx context.Context, appID, groupID string) (*CrashGroup, error) {
	row := s.db.QueryRowContext(ctx, `
SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace,
first_seen, last_seen, occurrence_count, affected_versions, status
FROM crash_groups WHERE app_id = ? AND id = ?
`, appID, groupID)

	group := new(CrashGroup)
	var rawVersions string
	if err := row.Scan(
		&group.ID, &group.AppID, &group.Fingerprint, &group.CrashType, &group.Message,
		&group.SampleStackTrace, &group.FirstSeen, &group.LastSeen, &group.OccurrenceCount,
		&rawVersions, &group.Status,
	); err != nil {
		return nil, err
	}

	// An undecodable version list leaves AffectedVersions as decoded so far.
	_ = json.Unmarshal([]byte(rawVersions), &group.AffectedVersions)
	return group, nil
}
// UpdateCrashGroupStatus sets the status column of the crash group identified
// by groupID, provided it belongs to appID. The status value is written as
// given, and a group that does not exist is not reported as an error.
func (s *Service) UpdateCrashGroupStatus(ctx context.Context, appID, groupID, status string) error {
	const stmt = `
UPDATE crash_groups SET status = ? WHERE app_id = ? AND id = ?
`
	if _, err := s.db.ExecContext(ctx, stmt, status, appID, groupID); err != nil {
		return err
	}
	return nil
}
// StartBackgroundWorkers launches the maintenance goroutines and returns
// immediately: hourly aggregation on a one-hour ticker, daily aggregation at
// hour 2 and event cleanup at hour 3. The goroutines end when ctx is done or
// the service's stop channel is closed.
func (s *Service) StartBackgroundWorkers(ctx context.Context) {
	// Jobs that run once a day at a fixed hour.
	dailyJobs := []struct {
		hour int
		name string
		run  func(context.Context) error
	}{
		{hour: 2, name: "daily aggregation", run: s.aggregateDaily},
		{hour: 3, name: "event cleanup", run: s.cleanupOldEvents},
	}

	go s.runPeriodic(ctx, time.Hour, "hourly aggregation", s.aggregateHourly)
	for _, job := range dailyJobs {
		go s.runDaily(ctx, job.hour, job.name, job.run)
	}

	log.Println("Telemetry background workers started")
}
// runPeriodic calls fn once per interval until ctx is done or the service's
// stop channel is closed. A failing run is logged under name and does not
// stop the loop. It blocks, so callers start it in its own goroutine.
func (s *Service) runPeriodic(ctx context.Context, interval time.Duration, name string, fn func(context.Context) error) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		// Wait for the next tick, or leave on shutdown.
		select {
		case <-tick.C:
		case <-s.stopCh:
			return
		case <-ctx.Done():
			return
		}

		err := fn(ctx)
		if err == nil {
			continue
		}
		log.Printf("Telemetry %s error: %v", name, err)
	}
}
// runDaily calls fn once a day at hour:00 local time until ctx is done or the
// service's stop channel is closed. A failing run is logged under name and
// does not stop the loop. It blocks, so callers start it in its own goroutine.
func (s *Service) runDaily(ctx context.Context, hour int, name string, fn func(context.Context) error) {
	for {
		now := time.Now()
		next := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, now.Location())
		if next.Before(now) {
			// Build tomorrow's run from calendar fields instead of adding
			// 24h: on a daylight-saving transition the day is 23 or 25 hours
			// long and Add would land an hour off. time.Date normalises the
			// day overflow at month end.
			next = time.Date(now.Year(), now.Month(), now.Day()+1, hour, 0, 0, 0, now.Location())
		}

		// An explicit timer (rather than time.After) can be released as soon
		// as the worker is told to stop, instead of lingering for up to a day.
		timer := time.NewTimer(next.Sub(now))
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-s.stopCh:
			timer.Stop()
			return
		case <-timer.C:
			if err := fn(ctx); err != nil {
				log.Printf("Telemetry %s error: %v", name, err)
			}
		}
	}
}
// aggregateHourly rolls the raw events of the previous hour up into
// hourly_stats: one row per (app, hour, event type) holding the event count
// and the number of distinct devices. Re-running it for the same hour
// replaces the existing rows.
//
// The hour key is computed in UTC. SQLite's strftime converts timestamps that
// carry a zone offset to UTC, so a key formatted in the server's local zone
// selected the wrong hour on any host not running in UTC.
func (s *Service) aggregateHourly(ctx context.Context) error {
	hour := time.Now().UTC().Add(-time.Hour).Format("2006-01-02T15")

	_, err := s.db.ExecContext(ctx, `
		INSERT OR REPLACE INTO hourly_stats (app_id, hour, event_type, count, unique_devices)
		SELECT
			app_id,
			strftime('%Y-%m-%dT%H', timestamp) as hour,
			event_type,
			COUNT(*) as count,
			COUNT(DISTINCT device_id) as unique_devices
		FROM events
		WHERE strftime('%Y-%m-%dT%H', timestamp) = ?
		GROUP BY app_id, hour, event_type
	`, hour)
	if err != nil {
		return err
	}

	log.Printf("Telemetry: hourly aggregation completed for %s", hour)
	return nil
}
// aggregateDaily writes yesterday's (UTC) totals into daily_stats: one row
// per (app, event type) holding the event count and the number of distinct
// devices. Re-running it for the same day replaces the existing rows.
//
// The figures are computed from the raw events table rather than by summing
// hourly_stats. Summing the hourly unique_devices counted a device once for
// every hour it was active, which inflated the daily unique-device number
// (and the DAU derived from it). Raw events are kept for seven days, so
// yesterday's rows are still available when this runs.
//
// The day is taken in UTC to match SQLite's strftime, which converts
// timestamps carrying a zone offset to UTC.
func (s *Service) aggregateDaily(ctx context.Context) error {
	yesterday := time.Now().UTC().AddDate(0, 0, -1).Format("2006-01-02")

	_, err := s.db.ExecContext(ctx, `
		INSERT OR REPLACE INTO daily_stats (app_id, date, event_type, count, unique_devices)
		SELECT
			app_id,
			? as date,
			event_type,
			COUNT(*) as count,
			COUNT(DISTINCT device_id) as unique_devices
		FROM events
		WHERE strftime('%Y-%m-%d', timestamp) = ?
		GROUP BY app_id, event_type
	`, yesterday, yesterday)
	if err != nil {
		return err
	}

	log.Printf("Telemetry: daily aggregation completed for %s", yesterday)
	return nil
}
// cleanupOldEvents enforces retention: raw events older than 7 days and crash
// occurrences older than 90 days are deleted. Crash groups themselves are
// kept.
//
// Cutoffs are formatted as UTC RFC 3339 strings and compared as text against
// the stored timestamps. A cutoff carrying the server's local offset does not
// order correctly against timestamps written with a different offset.
// NOTE(review): this still assumes clients send UTC RFC 3339 timestamps —
// confirm against the client implementation.
func (s *Service) cleanupOldEvents(ctx context.Context) error {
	now := time.Now().UTC()

	eventCutoff := now.AddDate(0, 0, -7).Format(time.RFC3339)
	result, err := s.db.ExecContext(ctx, "DELETE FROM events WHERE timestamp < ?", eventCutoff)
	if err != nil {
		return fmt.Errorf("delete old events: %w", err)
	}
	// The count is informational only, so an unavailable RowsAffected is not
	// treated as a failure.
	deleted, _ := result.RowsAffected()
	log.Printf("Telemetry: cleaned up %d old events", deleted)

	// Also clean old crash occurrences (90 days).
	crashCutoff := now.AddDate(0, 0, -90).Format(time.RFC3339)
	if _, err := s.db.ExecContext(ctx, "DELETE FROM crash_occurrences WHERE timestamp < ?", crashCutoff); err != nil {
		return fmt.Errorf("delete old crash occurrences: %w", err)
	}
	return nil
}
// Helper functions

// idSequence is mixed into every generated ID so that two IDs produced from
// the same clock reading still differ.
var idSequence struct {
	sync.Mutex
	n uint64
}

// generateID returns a 16-character lowercase hex identifier: the first
// 8 bytes of the SHA-256 of the current time in nanoseconds combined with a
// process-wide sequence number. The clock alone is not enough, because two
// calls within one clock tick would otherwise hash to the same ID.
//
// IDs are not cryptographically random and must not be used as secrets.
func generateID() string {
	idSequence.Lock()
	idSequence.n++
	seq := idSequence.n
	idSequence.Unlock()

	seed := fmt.Sprintf("%d-%d", time.Now().UnixNano(), seq)
	hash := sha256.Sum256([]byte(seed))
	return hex.EncodeToString(hash[:8])
}
// contains reports whether item is an element of slice. A nil or empty slice
// contains nothing.
func contains(slice []string, item string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] != item {
			continue
		}
		return true
	}
	return false
}
// TriggerAggregation runs the hourly and then the daily aggregation right
// away instead of waiting for the background workers (useful for testing).
// It stops at, and returns, the first error.
func (s *Service) TriggerAggregation(ctx context.Context) error {
	steps := []func(context.Context) error{
		s.aggregateHourly,
		s.aggregateDaily,
	}
	for _, step := range steps {
		if err := step(ctx); err != nil {
			return err
		}
	}
	return nil
}