667 lines
19 KiB
Go
667 lines
19 KiB
Go
// Package telemetry provides app analytics and crash reporting
|
|
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"database/sql"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
_ "modernc.org/sqlite"
|
|
)
|
|
|
|
// Event types
|
|
const (
|
|
EventAppStart = "app_start"
|
|
EventAppStop = "app_stop"
|
|
EventAppCrash = "app_crash"
|
|
EventLuaError = "lua_error"
|
|
EventPerfFrame = "perf_frame"
|
|
EventPerfMemory = "perf_memory"
|
|
EventPerfStartup = "perf_startup"
|
|
EventScreenView = "screen_view"
|
|
EventFeatureUsed = "feature_used"
|
|
)
|
|
|
|
// Event represents a telemetry event
|
|
type Event struct {
|
|
Type string `json:"type"`
|
|
Timestamp string `json:"timestamp"`
|
|
Data json.RawMessage `json:"data,omitempty"`
|
|
}
|
|
|
|
// EventBatch represents a batch of events from a device
|
|
type EventBatch struct {
|
|
AppID string `json:"app_id"`
|
|
AppVersion string `json:"app_version"`
|
|
MosisVersion string `json:"mosis_version"`
|
|
DeviceID string `json:"device_id"`
|
|
SessionID string `json:"session_id,omitempty"`
|
|
Events []Event `json:"events"`
|
|
}
|
|
|
|
// CrashReport represents a crash report from a device
|
|
type CrashReport struct {
|
|
AppID string `json:"app_id"`
|
|
AppVersion string `json:"app_version"`
|
|
MosisVersion string `json:"mosis_version"`
|
|
DeviceID string `json:"device_id"`
|
|
Timestamp string `json:"timestamp"`
|
|
Crash CrashDetails `json:"crash"`
|
|
}
|
|
|
|
// CrashDetails contains crash information
|
|
type CrashDetails struct {
|
|
Type string `json:"type"`
|
|
Message string `json:"message"`
|
|
StackTrace string `json:"stack_trace"`
|
|
Context map[string]interface{} `json:"context,omitempty"`
|
|
}
|
|
|
|
// CrashGroup represents a group of similar crashes
|
|
type CrashGroup struct {
|
|
ID string `json:"id"`
|
|
AppID string `json:"app_id"`
|
|
Fingerprint string `json:"fingerprint"`
|
|
CrashType string `json:"crash_type"`
|
|
Message string `json:"message"`
|
|
SampleStackTrace string `json:"sample_stack_trace"`
|
|
FirstSeen string `json:"first_seen"`
|
|
LastSeen string `json:"last_seen"`
|
|
OccurrenceCount int `json:"occurrence_count"`
|
|
AffectedVersions []string `json:"affected_versions"`
|
|
Status string `json:"status"` // open, resolved, ignored
|
|
}
|
|
|
|
// DailyStats represents aggregated daily statistics
|
|
type DailyStats struct {
|
|
AppID string `json:"app_id"`
|
|
Date string `json:"date"`
|
|
EventType string `json:"event_type"`
|
|
Count int `json:"count"`
|
|
UniqueDevices int `json:"unique_devices"`
|
|
}
|
|
|
|
// AnalyticsOverview represents the analytics summary for an app
|
|
type AnalyticsOverview struct {
|
|
DAU int `json:"dau"`
|
|
DAUChange float64 `json:"dau_change"`
|
|
TotalCrashes int `json:"total_crashes"`
|
|
CrashChange float64 `json:"crash_change"`
|
|
CrashFreeRate float64 `json:"crash_free_rate"`
|
|
TotalSessions int `json:"total_sessions"`
|
|
}
|
|
|
|
// Service handles telemetry operations
|
|
type Service struct {
|
|
db *sql.DB
|
|
dbPath string
|
|
mu sync.Mutex
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// New creates a new telemetry service with a separate database
|
|
func New(dbPath string) (*Service, error) {
|
|
db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open telemetry db: %w", err)
|
|
}
|
|
|
|
s := &Service{
|
|
db: db,
|
|
dbPath: dbPath,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
if err := s.migrate(); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("migrate telemetry db: %w", err)
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// migrate creates the telemetry database schema
|
|
func (s *Service) migrate() error {
|
|
schema := `
|
|
-- Raw events (7-day retention)
|
|
CREATE TABLE IF NOT EXISTS events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
app_id TEXT NOT NULL,
|
|
device_id TEXT NOT NULL,
|
|
session_id TEXT,
|
|
event_type TEXT NOT NULL,
|
|
event_data TEXT,
|
|
app_version TEXT,
|
|
mosis_version TEXT,
|
|
timestamp TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_events_app_time ON events(app_id, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_events_cleanup ON events(timestamp);
|
|
|
|
-- Hourly aggregates
|
|
CREATE TABLE IF NOT EXISTS hourly_stats (
|
|
app_id TEXT NOT NULL,
|
|
hour TEXT NOT NULL,
|
|
event_type TEXT NOT NULL,
|
|
count INTEGER NOT NULL,
|
|
unique_devices INTEGER NOT NULL,
|
|
PRIMARY KEY (app_id, hour, event_type)
|
|
);
|
|
|
|
-- Daily aggregates
|
|
CREATE TABLE IF NOT EXISTS daily_stats (
|
|
app_id TEXT NOT NULL,
|
|
date TEXT NOT NULL,
|
|
event_type TEXT NOT NULL,
|
|
count INTEGER NOT NULL,
|
|
unique_devices INTEGER NOT NULL,
|
|
PRIMARY KEY (app_id, date, event_type)
|
|
);
|
|
|
|
-- Crash groups (deduplicated by fingerprint)
|
|
CREATE TABLE IF NOT EXISTS crash_groups (
|
|
id TEXT PRIMARY KEY,
|
|
app_id TEXT NOT NULL,
|
|
fingerprint TEXT NOT NULL,
|
|
crash_type TEXT NOT NULL,
|
|
message TEXT,
|
|
sample_stack_trace TEXT,
|
|
first_seen TEXT NOT NULL,
|
|
last_seen TEXT NOT NULL,
|
|
occurrence_count INTEGER DEFAULT 1,
|
|
affected_versions TEXT,
|
|
status TEXT DEFAULT 'open',
|
|
UNIQUE(app_id, fingerprint)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_crashes_app ON crash_groups(app_id, status);
|
|
|
|
-- Individual crash occurrences (for recent list)
|
|
CREATE TABLE IF NOT EXISTS crash_occurrences (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
crash_group_id TEXT NOT NULL,
|
|
device_id TEXT NOT NULL,
|
|
app_version TEXT,
|
|
context TEXT,
|
|
timestamp TEXT NOT NULL,
|
|
FOREIGN KEY (crash_group_id) REFERENCES crash_groups(id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_occurrences_group ON crash_occurrences(crash_group_id, timestamp);
|
|
`
|
|
|
|
_, err := s.db.Exec(schema)
|
|
return err
|
|
}
|
|
|
|
// Close closes the telemetry database
|
|
func (s *Service) Close() error {
|
|
close(s.stopCh)
|
|
return s.db.Close()
|
|
}
|
|
|
|
// RecordEvents records a batch of events
|
|
func (s *Service) RecordEvents(ctx context.Context, batch *EventBatch) (int, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
stmt, err := tx.PrepareContext(ctx, `
|
|
INSERT INTO events (app_id, device_id, session_id, event_type, event_data, app_version, mosis_version, timestamp)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
`)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer stmt.Close()
|
|
|
|
count := 0
|
|
for _, event := range batch.Events {
|
|
var eventData string
|
|
if event.Data != nil {
|
|
eventData = string(event.Data)
|
|
}
|
|
|
|
_, err := stmt.ExecContext(ctx,
|
|
batch.AppID,
|
|
batch.DeviceID,
|
|
batch.SessionID,
|
|
event.Type,
|
|
eventData,
|
|
batch.AppVersion,
|
|
batch.MosisVersion,
|
|
event.Timestamp,
|
|
)
|
|
if err != nil {
|
|
log.Printf("Failed to insert event: %v", err)
|
|
continue
|
|
}
|
|
count++
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return count, nil
|
|
}
|
|
|
|
// RecordCrash records a crash report
|
|
func (s *Service) RecordCrash(ctx context.Context, report *CrashReport) (string, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
// Generate fingerprint for crash grouping
|
|
fingerprint := s.fingerprintCrash(report)
|
|
groupID := generateID()
|
|
|
|
// Try to find existing crash group
|
|
var existingID string
|
|
var affectedVersions string
|
|
err := s.db.QueryRowContext(ctx, `
|
|
SELECT id, affected_versions FROM crash_groups WHERE app_id = ? AND fingerprint = ?
|
|
`, report.AppID, fingerprint).Scan(&existingID, &affectedVersions)
|
|
|
|
if err == sql.ErrNoRows {
|
|
// Create new crash group
|
|
versions, _ := json.Marshal([]string{report.AppVersion})
|
|
_, err = s.db.ExecContext(ctx, `
|
|
INSERT INTO crash_groups (id, app_id, fingerprint, crash_type, message, sample_stack_trace, first_seen, last_seen, occurrence_count, affected_versions, status)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, 'open')
|
|
`, groupID, report.AppID, fingerprint, report.Crash.Type, report.Crash.Message, report.Crash.StackTrace, report.Timestamp, report.Timestamp, string(versions))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
} else if err != nil {
|
|
return "", err
|
|
} else {
|
|
// Update existing crash group
|
|
groupID = existingID
|
|
|
|
// Add version if not already in list
|
|
var versions []string
|
|
json.Unmarshal([]byte(affectedVersions), &versions)
|
|
if !contains(versions, report.AppVersion) {
|
|
versions = append(versions, report.AppVersion)
|
|
}
|
|
versionsJSON, _ := json.Marshal(versions)
|
|
|
|
_, err = s.db.ExecContext(ctx, `
|
|
UPDATE crash_groups SET
|
|
last_seen = ?,
|
|
occurrence_count = occurrence_count + 1,
|
|
affected_versions = ?,
|
|
status = CASE WHEN status = 'resolved' THEN 'open' ELSE status END
|
|
WHERE id = ?
|
|
`, report.Timestamp, string(versionsJSON), groupID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
|
|
// Record individual occurrence
|
|
contextJSON, _ := json.Marshal(report.Crash.Context)
|
|
_, err = s.db.ExecContext(ctx, `
|
|
INSERT INTO crash_occurrences (crash_group_id, device_id, app_version, context, timestamp)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
`, groupID, report.DeviceID, report.AppVersion, string(contextJSON), report.Timestamp)
|
|
|
|
return groupID, err
|
|
}
|
|
|
|
// fingerprintCrash generates a unique fingerprint for crash grouping
|
|
func (s *Service) fingerprintCrash(report *CrashReport) string {
|
|
// Normalize stack trace (remove line numbers)
|
|
normalized := normalizeStackTrace(report.Crash.StackTrace)
|
|
|
|
// Create fingerprint from type, message, and normalized stack
|
|
key := fmt.Sprintf("%s:%s:%s", report.Crash.Type, report.Crash.Message, normalized)
|
|
hash := sha256.Sum256([]byte(key))
|
|
return hex.EncodeToString(hash[:8])
|
|
}
|
|
|
|
// normalizeStackTrace removes line numbers for consistent fingerprinting
|
|
func normalizeStackTrace(stack string) string {
|
|
re := regexp.MustCompile(`:\d+:`)
|
|
return re.ReplaceAllString(stack, ":?:")
|
|
}
|
|
|
|
// GetAnalyticsOverview returns analytics summary for an app
|
|
func (s *Service) GetAnalyticsOverview(ctx context.Context, appID string, days int) (*AnalyticsOverview, error) {
|
|
endDate := time.Now()
|
|
startDate := endDate.AddDate(0, 0, -days)
|
|
prevStartDate := startDate.AddDate(0, 0, -days)
|
|
|
|
// Current period DAU (average)
|
|
var currentDAU float64
|
|
err := s.db.QueryRowContext(ctx, `
|
|
SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats
|
|
WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ?
|
|
`, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(¤tDAU)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Previous period DAU
|
|
var prevDAU float64
|
|
s.db.QueryRowContext(ctx, `
|
|
SELECT COALESCE(AVG(unique_devices), 0) FROM daily_stats
|
|
WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date < ?
|
|
`, appID, prevStartDate.Format("2006-01-02"), startDate.Format("2006-01-02")).Scan(&prevDAU)
|
|
|
|
// Current crashes
|
|
var currentCrashes int
|
|
s.db.QueryRowContext(ctx, `
|
|
SELECT COALESCE(SUM(count), 0) FROM daily_stats
|
|
WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date <= ?
|
|
`, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(¤tCrashes)
|
|
|
|
// Previous crashes
|
|
var prevCrashes int
|
|
s.db.QueryRowContext(ctx, `
|
|
SELECT COALESCE(SUM(count), 0) FROM daily_stats
|
|
WHERE app_id = ? AND event_type = 'app_crash' AND date >= ? AND date < ?
|
|
`, appID, prevStartDate.Format("2006-01-02"), startDate.Format("2006-01-02")).Scan(&prevCrashes)
|
|
|
|
// Total sessions
|
|
var totalSessions int
|
|
s.db.QueryRowContext(ctx, `
|
|
SELECT COALESCE(SUM(count), 0) FROM daily_stats
|
|
WHERE app_id = ? AND event_type = 'app_start' AND date >= ? AND date <= ?
|
|
`, appID, startDate.Format("2006-01-02"), endDate.Format("2006-01-02")).Scan(&totalSessions)
|
|
|
|
// Calculate changes
|
|
dauChange := 0.0
|
|
if prevDAU > 0 {
|
|
dauChange = ((currentDAU - prevDAU) / prevDAU) * 100
|
|
}
|
|
|
|
crashChange := 0.0
|
|
if prevCrashes > 0 {
|
|
crashChange = ((float64(currentCrashes) - float64(prevCrashes)) / float64(prevCrashes)) * 100
|
|
}
|
|
|
|
crashFreeRate := 100.0
|
|
if totalSessions > 0 {
|
|
crashFreeRate = (1 - float64(currentCrashes)/float64(totalSessions)) * 100
|
|
if crashFreeRate < 0 {
|
|
crashFreeRate = 0
|
|
}
|
|
}
|
|
|
|
return &AnalyticsOverview{
|
|
DAU: int(currentDAU),
|
|
DAUChange: dauChange,
|
|
TotalCrashes: currentCrashes,
|
|
CrashChange: crashChange,
|
|
CrashFreeRate: crashFreeRate,
|
|
TotalSessions: totalSessions,
|
|
}, nil
|
|
}
|
|
|
|
// GetDailyStats returns daily statistics for an app
|
|
func (s *Service) GetDailyStats(ctx context.Context, appID, eventType string, days int) ([]DailyStats, error) {
|
|
startDate := time.Now().AddDate(0, 0, -days).Format("2006-01-02")
|
|
|
|
query := `
|
|
SELECT app_id, date, event_type, count, unique_devices
|
|
FROM daily_stats
|
|
WHERE app_id = ? AND date >= ?
|
|
`
|
|
args := []interface{}{appID, startDate}
|
|
|
|
if eventType != "" {
|
|
query += " AND event_type = ?"
|
|
args = append(args, eventType)
|
|
}
|
|
query += " ORDER BY date ASC"
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var stats []DailyStats
|
|
for rows.Next() {
|
|
var s DailyStats
|
|
if err := rows.Scan(&s.AppID, &s.Date, &s.EventType, &s.Count, &s.UniqueDevices); err != nil {
|
|
return nil, err
|
|
}
|
|
stats = append(stats, s)
|
|
}
|
|
|
|
return stats, rows.Err()
|
|
}
|
|
|
|
// GetCrashGroups returns crash groups for an app
|
|
func (s *Service) GetCrashGroups(ctx context.Context, appID, status string, limit, offset int) ([]CrashGroup, int, error) {
|
|
// Count total
|
|
countQuery := "SELECT COUNT(*) FROM crash_groups WHERE app_id = ?"
|
|
args := []interface{}{appID}
|
|
if status != "" {
|
|
countQuery += " AND status = ?"
|
|
args = append(args, status)
|
|
}
|
|
|
|
var total int
|
|
if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
// Fetch groups
|
|
query := `
|
|
SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace,
|
|
first_seen, last_seen, occurrence_count, affected_versions, status
|
|
FROM crash_groups WHERE app_id = ?
|
|
`
|
|
args = []interface{}{appID}
|
|
if status != "" {
|
|
query += " AND status = ?"
|
|
args = append(args, status)
|
|
}
|
|
query += " ORDER BY last_seen DESC LIMIT ? OFFSET ?"
|
|
args = append(args, limit, offset)
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var groups []CrashGroup
|
|
for rows.Next() {
|
|
var g CrashGroup
|
|
var versionsJSON string
|
|
if err := rows.Scan(&g.ID, &g.AppID, &g.Fingerprint, &g.CrashType, &g.Message,
|
|
&g.SampleStackTrace, &g.FirstSeen, &g.LastSeen, &g.OccurrenceCount,
|
|
&versionsJSON, &g.Status); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
json.Unmarshal([]byte(versionsJSON), &g.AffectedVersions)
|
|
groups = append(groups, g)
|
|
}
|
|
|
|
return groups, total, rows.Err()
|
|
}
|
|
|
|
// GetCrashGroup returns a single crash group with recent occurrences
|
|
func (s *Service) GetCrashGroup(ctx context.Context, appID, groupID string) (*CrashGroup, error) {
|
|
var g CrashGroup
|
|
var versionsJSON string
|
|
err := s.db.QueryRowContext(ctx, `
|
|
SELECT id, app_id, fingerprint, crash_type, message, sample_stack_trace,
|
|
first_seen, last_seen, occurrence_count, affected_versions, status
|
|
FROM crash_groups WHERE app_id = ? AND id = ?
|
|
`, appID, groupID).Scan(&g.ID, &g.AppID, &g.Fingerprint, &g.CrashType, &g.Message,
|
|
&g.SampleStackTrace, &g.FirstSeen, &g.LastSeen, &g.OccurrenceCount,
|
|
&versionsJSON, &g.Status)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
json.Unmarshal([]byte(versionsJSON), &g.AffectedVersions)
|
|
return &g, nil
|
|
}
|
|
|
|
// UpdateCrashGroupStatus updates the status of a crash group
|
|
func (s *Service) UpdateCrashGroupStatus(ctx context.Context, appID, groupID, status string) error {
|
|
_, err := s.db.ExecContext(ctx, `
|
|
UPDATE crash_groups SET status = ? WHERE app_id = ? AND id = ?
|
|
`, status, appID, groupID)
|
|
return err
|
|
}
|
|
|
|
// StartBackgroundWorkers starts the aggregation and cleanup workers
|
|
func (s *Service) StartBackgroundWorkers(ctx context.Context) {
|
|
// Hourly aggregation
|
|
go s.runPeriodic(ctx, time.Hour, "hourly aggregation", s.aggregateHourly)
|
|
|
|
// Daily aggregation at 2am
|
|
go s.runDaily(ctx, 2, "daily aggregation", s.aggregateDaily)
|
|
|
|
// Cleanup old events at 3am
|
|
go s.runDaily(ctx, 3, "event cleanup", s.cleanupOldEvents)
|
|
|
|
log.Println("Telemetry background workers started")
|
|
}
|
|
|
|
func (s *Service) runPeriodic(ctx context.Context, interval time.Duration, name string, fn func(context.Context) error) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-s.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
if err := fn(ctx); err != nil {
|
|
log.Printf("Telemetry %s error: %v", name, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) runDaily(ctx context.Context, hour int, name string, fn func(context.Context) error) {
|
|
for {
|
|
now := time.Now()
|
|
next := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, now.Location())
|
|
if next.Before(now) {
|
|
next = next.Add(24 * time.Hour)
|
|
}
|
|
wait := next.Sub(now)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-s.stopCh:
|
|
return
|
|
case <-time.After(wait):
|
|
if err := fn(ctx); err != nil {
|
|
log.Printf("Telemetry %s error: %v", name, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) aggregateHourly(ctx context.Context) error {
|
|
hour := time.Now().Add(-time.Hour).Format("2006-01-02T15")
|
|
|
|
_, err := s.db.ExecContext(ctx, `
|
|
INSERT OR REPLACE INTO hourly_stats (app_id, hour, event_type, count, unique_devices)
|
|
SELECT
|
|
app_id,
|
|
strftime('%Y-%m-%dT%H', timestamp) as hour,
|
|
event_type,
|
|
COUNT(*) as count,
|
|
COUNT(DISTINCT device_id) as unique_devices
|
|
FROM events
|
|
WHERE strftime('%Y-%m-%dT%H', timestamp) = ?
|
|
GROUP BY app_id, hour, event_type
|
|
`, hour)
|
|
|
|
if err == nil {
|
|
log.Printf("Telemetry: hourly aggregation completed for %s", hour)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *Service) aggregateDaily(ctx context.Context) error {
|
|
yesterday := time.Now().AddDate(0, 0, -1).Format("2006-01-02")
|
|
|
|
_, err := s.db.ExecContext(ctx, `
|
|
INSERT OR REPLACE INTO daily_stats (app_id, date, event_type, count, unique_devices)
|
|
SELECT
|
|
app_id,
|
|
? as date,
|
|
event_type,
|
|
SUM(count) as count,
|
|
SUM(unique_devices) as unique_devices
|
|
FROM hourly_stats
|
|
WHERE hour LIKE ? || 'T%'
|
|
GROUP BY app_id, event_type
|
|
`, yesterday, yesterday)
|
|
|
|
if err == nil {
|
|
log.Printf("Telemetry: daily aggregation completed for %s", yesterday)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *Service) cleanupOldEvents(ctx context.Context) error {
|
|
cutoff := time.Now().AddDate(0, 0, -7).Format(time.RFC3339)
|
|
|
|
result, err := s.db.ExecContext(ctx, "DELETE FROM events WHERE timestamp < ?", cutoff)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
deleted, _ := result.RowsAffected()
|
|
log.Printf("Telemetry: cleaned up %d old events", deleted)
|
|
|
|
// Also clean old crash occurrences (90 days)
|
|
crashCutoff := time.Now().AddDate(0, 0, -90).Format(time.RFC3339)
|
|
s.db.ExecContext(ctx, "DELETE FROM crash_occurrences WHERE timestamp < ?", crashCutoff)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Helper functions
|
|
func generateID() string {
|
|
hash := sha256.Sum256([]byte(fmt.Sprintf("%d", time.Now().UnixNano())))
|
|
return hex.EncodeToString(hash[:8])
|
|
}
|
|
|
|
func contains(slice []string, item string) bool {
|
|
for _, s := range slice {
|
|
if s == item {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// TriggerAggregation manually triggers aggregation (useful for testing)
|
|
func (s *Service) TriggerAggregation(ctx context.Context) error {
|
|
if err := s.aggregateHourly(ctx); err != nil {
|
|
return err
|
|
}
|
|
return s.aggregateDaily(ctx)
|
|
}
|