- Implemented a retry mechanism for external calls, allowing up to 3 attempts before failing. - Enhanced error handling to send failure notifications when setup steps fail, including detailed error messages. - Updated RunSummary model to include status, error step, and error message fields for better tracking of run outcomes. - Modified database schema to store failure metadata for runs. - Updated README.md to reflect changes in error handling and notification behavior.
410 lines
9.8 KiB
Go
410 lines
9.8 KiB
Go
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
_ "modernc.org/sqlite"
|
|
|
|
"ncore-hnr/internal/model"
|
|
)
|
|
|
|
type Store struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
type State struct {
|
|
FirstSeenAt time.Time
|
|
LastSeenAt time.Time
|
|
}
|
|
|
|
func Open(path string) (*Store, error) {
|
|
if dir := filepath.Dir(path); dir != "." && dir != "" {
|
|
if err := os.MkdirAll(dir, 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
db, err := sql.Open("sqlite", path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
store := &Store{db: db}
|
|
if err := store.migrate(); err != nil {
|
|
_ = db.Close()
|
|
return nil, err
|
|
}
|
|
return store, nil
|
|
}
|
|
|
|
func (s *Store) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
func (s *Store) UpsertSeen(torrent model.Torrent, now time.Time) (State, error) {
|
|
existing, err := s.state(torrent.ID)
|
|
if err != nil {
|
|
return State{}, err
|
|
}
|
|
|
|
firstSeen := now
|
|
if !existing.FirstSeenAt.IsZero() {
|
|
firstSeen = existing.FirstSeenAt
|
|
}
|
|
|
|
_, err = s.db.Exec(`
|
|
INSERT INTO torrents (
|
|
ncore_id, name, first_seen_at, last_seen_at, status, hnr_marked,
|
|
uploaded_text, downloaded_text, remaining_text, ratio_text
|
|
)
|
|
VALUES (?, ?, ?, ?, 'active', ?, ?, ?, ?, ?)
|
|
ON CONFLICT(ncore_id) DO UPDATE SET
|
|
name = excluded.name,
|
|
last_seen_at = excluded.last_seen_at,
|
|
status = 'active',
|
|
hnr_marked = excluded.hnr_marked,
|
|
uploaded_text = excluded.uploaded_text,
|
|
downloaded_text = excluded.downloaded_text,
|
|
remaining_text = excluded.remaining_text,
|
|
ratio_text = excluded.ratio_text
|
|
`, torrent.ID, torrent.Name, firstSeen.Format(time.RFC3339), now.Format(time.RFC3339), boolInt(torrent.HnRMarked), torrent.Uploaded, torrent.Downloaded, torrent.Remaining, torrent.Ratio)
|
|
if err != nil {
|
|
return State{}, err
|
|
}
|
|
|
|
return State{FirstSeenAt: firstSeen, LastSeenAt: now}, nil
|
|
}
|
|
|
|
func (s *Store) MarkResolved(activeIDs map[int64]bool, now time.Time) error {
|
|
rows, err := s.db.Query(`SELECT ncore_id FROM torrents WHERE status IN ('active', 'manual_needed')`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var resolved []int64
|
|
for rows.Next() {
|
|
var id int64
|
|
if err := rows.Scan(&id); err != nil {
|
|
return err
|
|
}
|
|
if !activeIDs[id] {
|
|
resolved = append(resolved, id)
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, id := range resolved {
|
|
if _, err := s.db.Exec(`UPDATE torrents SET status = 'resolved', last_resolved_at = ? WHERE ncore_id = ?`, now.Format(time.RFC3339), id); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) RecordQBit(torrent model.Torrent, qbit model.QBitTorrent, now time.Time, forceStarted bool, reannounced bool) error {
|
|
_, err := s.db.Exec(`
|
|
UPDATE torrents SET
|
|
qbit_hash = ?,
|
|
qbit_name = ?,
|
|
qbit_state = ?,
|
|
qbit_progress = ?,
|
|
qbit_ratio = ?,
|
|
qbit_uploaded = ?,
|
|
qbit_downloaded = ?,
|
|
qbit_last_activity = ?,
|
|
last_action_at = CASE WHEN ? OR ? THEN ? ELSE last_action_at END
|
|
WHERE ncore_id = ?
|
|
`, qbit.Hash, qbit.Name, qbit.State, qbit.Progress, qbit.Ratio, qbit.Uploaded, qbit.Downloaded, qbit.LastActivity, forceStarted, reannounced, now.Format(time.RFC3339), torrent.ID)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) MarkManualNeeded(id int64, now time.Time) error {
|
|
_, err := s.db.Exec(`
|
|
UPDATE torrents
|
|
SET status = 'manual_needed',
|
|
manual_needed_at = COALESCE(manual_needed_at, ?)
|
|
WHERE ncore_id = ?
|
|
`, now.Format(time.RFC3339), id)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) InsertRun(summary model.RunSummary) error {
|
|
status := summary.Status
|
|
if status == "" {
|
|
status = "success"
|
|
}
|
|
_, err := s.db.Exec(`
|
|
INSERT INTO runs (
|
|
started_at, status, error_step, error_message, dry_run,
|
|
total_risk, matched, unmatched, force_started, reannounced, manual_needed
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`, summary.StartedAt.Format(time.RFC3339), status, summary.ErrorStep, summary.ErrorMessage, boolInt(summary.DryRun), summary.TotalRisk, summary.Matched, summary.Unmatched, summary.ForceStarted, summary.Reannounced, summary.ManualNeeded)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) Status() (model.StatusSnapshot, error) {
|
|
torrents, err := s.torrentStatuses("")
|
|
if err != nil {
|
|
return model.StatusSnapshot{}, err
|
|
}
|
|
return model.StatusSnapshot{Torrents: torrents}, nil
|
|
}
|
|
|
|
func (s *Store) Stats() (model.StatsSnapshot, error) {
|
|
counts, err := s.statusCounts()
|
|
if err != nil {
|
|
return model.StatsSnapshot{}, err
|
|
}
|
|
lastRun, err := s.lastRun()
|
|
if err != nil {
|
|
return model.StatsSnapshot{}, err
|
|
}
|
|
manualNeeded, err := s.torrentStatuses("manual_needed")
|
|
if err != nil {
|
|
return model.StatsSnapshot{}, err
|
|
}
|
|
|
|
return model.StatsSnapshot{
|
|
Counts: counts,
|
|
LastRun: lastRun,
|
|
ManualNeeded: manualNeeded,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Store) state(id int64) (State, error) {
|
|
var firstText string
|
|
var lastText string
|
|
err := s.db.QueryRow(`SELECT first_seen_at, last_seen_at FROM torrents WHERE ncore_id = ?`, id).Scan(&firstText, &lastText)
|
|
if err == sql.ErrNoRows {
|
|
return State{}, nil
|
|
}
|
|
if err != nil {
|
|
return State{}, err
|
|
}
|
|
|
|
firstSeen, err := time.Parse(time.RFC3339, firstText)
|
|
if err != nil {
|
|
return State{}, fmt.Errorf("parse first_seen_at for %d: %w", id, err)
|
|
}
|
|
lastSeen, err := time.Parse(time.RFC3339, lastText)
|
|
if err != nil {
|
|
return State{}, fmt.Errorf("parse last_seen_at for %d: %w", id, err)
|
|
}
|
|
return State{FirstSeenAt: firstSeen, LastSeenAt: lastSeen}, nil
|
|
}
|
|
|
|
func (s *Store) statusCounts() ([]model.StatusCount, error) {
|
|
rows, err := s.db.Query(`SELECT status, COUNT(*) FROM torrents GROUP BY status ORDER BY status`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var counts []model.StatusCount
|
|
for rows.Next() {
|
|
var count model.StatusCount
|
|
if err := rows.Scan(&count.Status, &count.Count); err != nil {
|
|
return nil, err
|
|
}
|
|
counts = append(counts, count)
|
|
}
|
|
return counts, rows.Err()
|
|
}
|
|
|
|
func (s *Store) lastRun() (*model.RunRecord, error) {
|
|
var run model.RunRecord
|
|
var dryRun int
|
|
err := s.db.QueryRow(`
|
|
SELECT
|
|
started_at,
|
|
status,
|
|
COALESCE(error_step, ''),
|
|
COALESCE(error_message, ''),
|
|
dry_run,
|
|
total_risk,
|
|
matched,
|
|
unmatched,
|
|
force_started,
|
|
reannounced,
|
|
manual_needed
|
|
FROM runs
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
`).Scan(&run.StartedAt, &run.Status, &run.ErrorStep, &run.ErrorMessage, &dryRun, &run.TotalRisk, &run.Matched, &run.Unmatched, &run.ForceStarted, &run.Reannounced, &run.ManualNeeded)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
run.DryRun = dryRun != 0
|
|
return &run, nil
|
|
}
|
|
|
|
func (s *Store) torrentStatuses(status string) ([]model.TorrentStatus, error) {
|
|
query := `
|
|
SELECT
|
|
ncore_id,
|
|
name,
|
|
status,
|
|
first_seen_at,
|
|
last_seen_at,
|
|
COALESCE(last_resolved_at, ''),
|
|
hnr_marked,
|
|
COALESCE(qbit_name, ''),
|
|
COALESCE(qbit_state, ''),
|
|
COALESCE(qbit_progress, 0),
|
|
COALESCE(qbit_ratio, 0),
|
|
COALESCE(last_action_at, ''),
|
|
COALESCE(manual_needed_at, '')
|
|
FROM torrents
|
|
`
|
|
var args []any
|
|
if status != "" {
|
|
query += ` WHERE status = ?`
|
|
args = append(args, status)
|
|
}
|
|
query += `
|
|
ORDER BY
|
|
CASE status
|
|
WHEN 'manual_needed' THEN 0
|
|
WHEN 'active' THEN 1
|
|
WHEN 'resolved' THEN 2
|
|
ELSE 3
|
|
END,
|
|
last_seen_at DESC,
|
|
name ASC
|
|
`
|
|
|
|
rows, err := s.db.Query(query, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var torrents []model.TorrentStatus
|
|
for rows.Next() {
|
|
var torrent model.TorrentStatus
|
|
var hnrMarked int
|
|
if err := rows.Scan(
|
|
&torrent.ID,
|
|
&torrent.Name,
|
|
&torrent.Status,
|
|
&torrent.FirstSeenAt,
|
|
&torrent.LastSeenAt,
|
|
&torrent.LastResolvedAt,
|
|
&hnrMarked,
|
|
&torrent.QBitName,
|
|
&torrent.QBitState,
|
|
&torrent.QBitProgress,
|
|
&torrent.QBitRatio,
|
|
&torrent.LastActionAt,
|
|
&torrent.ManualNeededAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
torrent.HnRMarked = hnrMarked != 0
|
|
torrents = append(torrents, torrent)
|
|
}
|
|
return torrents, rows.Err()
|
|
}
|
|
|
|
func (s *Store) migrate() error {
|
|
_, err := s.db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS torrents (
|
|
ncore_id INTEGER PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
first_seen_at TEXT NOT NULL,
|
|
last_seen_at TEXT NOT NULL,
|
|
last_resolved_at TEXT,
|
|
status TEXT NOT NULL,
|
|
hnr_marked INTEGER NOT NULL DEFAULT 0,
|
|
uploaded_text TEXT,
|
|
downloaded_text TEXT,
|
|
remaining_text TEXT,
|
|
ratio_text TEXT,
|
|
qbit_hash TEXT,
|
|
qbit_name TEXT,
|
|
qbit_state TEXT,
|
|
qbit_progress REAL,
|
|
qbit_ratio REAL,
|
|
qbit_uploaded INTEGER,
|
|
qbit_downloaded INTEGER,
|
|
qbit_last_activity INTEGER,
|
|
last_action_at TEXT,
|
|
manual_needed_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS runs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
started_at TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'success',
|
|
error_step TEXT,
|
|
error_message TEXT,
|
|
dry_run INTEGER NOT NULL,
|
|
total_risk INTEGER NOT NULL,
|
|
matched INTEGER NOT NULL,
|
|
unmatched INTEGER NOT NULL,
|
|
force_started INTEGER NOT NULL,
|
|
reannounced INTEGER NOT NULL,
|
|
manual_needed INTEGER NOT NULL
|
|
);
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := s.addColumnIfMissing("runs", "status", "TEXT NOT NULL DEFAULT 'success'"); err != nil {
|
|
return err
|
|
}
|
|
if err := s.addColumnIfMissing("runs", "error_step", "TEXT"); err != nil {
|
|
return err
|
|
}
|
|
if err := s.addColumnIfMissing("runs", "error_message", "TEXT"); err != nil {
|
|
return err
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *Store) addColumnIfMissing(table string, column string, definition string) error {
|
|
rows, err := s.db.Query(fmt.Sprintf(`PRAGMA table_info(%s)`, table))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var cid int
|
|
var name string
|
|
var columnType string
|
|
var notNull int
|
|
var defaultValue sql.NullString
|
|
var pk int
|
|
if err := rows.Scan(&cid, &name, &columnType, ¬Null, &defaultValue, &pk); err != nil {
|
|
return err
|
|
}
|
|
if name == column {
|
|
return rows.Err()
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = s.db.Exec(fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s %s`, table, column, definition))
|
|
return err
|
|
}
|
|
|
|
func boolInt(value bool) int {
|
|
if value {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|