Add retry mechanism and failure notifications for nCore/qBittorrent setup
- Implemented a retry mechanism for external calls, allowing up to 3 attempts before failing. - Enhanced error handling to send failure notifications when setup steps fail, including detailed error messages. - Updated RunSummary model to include status, error step, and error message fields for better tracking of run outcomes. - Modified database schema to store failure metadata for runs. - Updated README.md to reflect changes in error handling and notification behavior.
This commit is contained in:
@@ -16,52 +16,68 @@ import (
|
||||
"ncore-hnr/internal/store"
|
||||
)
|
||||
|
||||
const retryAttempts = 3
|
||||
const retryDelay = 20 * time.Second
|
||||
|
||||
func Run(cfg config.Config) (model.RunSummary, error) {
|
||||
startedAt := time.Now().UTC()
|
||||
summary := model.RunSummary{StartedAt: startedAt, DryRun: cfg.DryRun}
|
||||
summary := model.RunSummary{StartedAt: startedAt, Status: "success", DryRun: cfg.DryRun}
|
||||
|
||||
db, err := store.Open(cfg.DBPath)
|
||||
if err != nil {
|
||||
return summary, err
|
||||
return failRun(cfg, nil, summary, "open database", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
ncoreClient := ncore.New(newHTTPClient(cfg.HTTPTimeout), cfg.NCoreLoginURL, cfg.NCoreHitRunURL)
|
||||
if err := ncoreClient.Login(cfg.NCoreUsername, cfg.NCorePassword); err != nil {
|
||||
return summary, err
|
||||
}
|
||||
|
||||
page, err := ncoreClient.FetchHitRun()
|
||||
if err != nil {
|
||||
return summary, err
|
||||
var page model.HitRunPage
|
||||
if err := withRetry("fetch nCore HnR page", func() error {
|
||||
ncoreClient := ncore.New(newHTTPClient(cfg.HTTPTimeout), cfg.NCoreLoginURL, cfg.NCoreHitRunURL)
|
||||
if err := ncoreClient.Login(cfg.NCoreUsername, cfg.NCorePassword); err != nil {
|
||||
return fmt.Errorf("login: %w", err)
|
||||
}
|
||||
fetched, err := ncoreClient.FetchHitRun()
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetch: %w", err)
|
||||
}
|
||||
page = fetched
|
||||
return nil
|
||||
}); err != nil {
|
||||
return failRun(cfg, db, summary, "fetch nCore HnR page", err)
|
||||
}
|
||||
summary.TotalRisk = len(page.Torrents)
|
||||
|
||||
var qbitClient *qbit.Client
|
||||
var qbitTorrents []model.QBitTorrent
|
||||
if !cfg.SkipQBit && cfg.QBitURL != "" {
|
||||
if err := withRetry("load qBittorrent torrents", func() error {
|
||||
client := qbit.New(cfg.QBitURL, newHTTPClient(cfg.HTTPTimeout))
|
||||
if err := client.Login(cfg.QBitUsername, cfg.QBitPassword); err != nil {
|
||||
return fmt.Errorf("login: %w", err)
|
||||
}
|
||||
torrents, err := client.Torrents()
|
||||
if err != nil {
|
||||
return fmt.Errorf("list torrents: %w", err)
|
||||
}
|
||||
qbitClient = client
|
||||
qbitTorrents = torrents
|
||||
return nil
|
||||
}); err != nil {
|
||||
return failRun(cfg, db, summary, "load qBittorrent torrents", err)
|
||||
}
|
||||
}
|
||||
|
||||
activeIDs := map[int64]bool{}
|
||||
for _, torrent := range page.Torrents {
|
||||
activeIDs[torrent.ID] = true
|
||||
}
|
||||
if err := db.MarkResolved(activeIDs, startedAt); err != nil {
|
||||
return summary, err
|
||||
}
|
||||
|
||||
var qbitClient *qbit.Client
|
||||
var qbitTorrents []model.QBitTorrent
|
||||
if !cfg.SkipQBit && cfg.QBitURL != "" {
|
||||
qbitClient = qbit.New(cfg.QBitURL, newHTTPClient(cfg.HTTPTimeout))
|
||||
if err := qbitClient.Login(cfg.QBitUsername, cfg.QBitPassword); err != nil {
|
||||
return summary, err
|
||||
}
|
||||
qbitTorrents, err = qbitClient.Torrents()
|
||||
if err != nil {
|
||||
return summary, err
|
||||
}
|
||||
return failRun(cfg, db, summary, "mark resolved torrents", err)
|
||||
}
|
||||
|
||||
for _, torrent := range page.Torrents {
|
||||
state, err := db.UpsertSeen(torrent, startedAt)
|
||||
if err != nil {
|
||||
return summary, err
|
||||
return failRun(cfg, db, summary, "upsert torrent state", err)
|
||||
}
|
||||
|
||||
result := model.ActionResult{
|
||||
@@ -90,28 +106,32 @@ func Run(cfg config.Config) (model.RunSummary, error) {
|
||||
summary.Matched++
|
||||
|
||||
if !cfg.DryRun {
|
||||
if err := qbitClient.ForceStart(matched.Hash); err != nil {
|
||||
return summary, fmt.Errorf("force-start %q: %w", torrent.Name, err)
|
||||
if err := withRetry(fmt.Sprintf("force-start %q", torrent.Name), func() error {
|
||||
return qbitClient.ForceStart(matched.Hash)
|
||||
}); err != nil {
|
||||
return failRun(cfg, db, summary, fmt.Sprintf("force-start %q", torrent.Name), err)
|
||||
}
|
||||
result.ForceStarted = true
|
||||
summary.ForceStarted++
|
||||
|
||||
if err := qbitClient.Reannounce(matched.Hash); err != nil {
|
||||
return summary, fmt.Errorf("reannounce %q: %w", torrent.Name, err)
|
||||
if err := withRetry(fmt.Sprintf("reannounce %q", torrent.Name), func() error {
|
||||
return qbitClient.Reannounce(matched.Hash)
|
||||
}); err != nil {
|
||||
return failRun(cfg, db, summary, fmt.Sprintf("reannounce %q", torrent.Name), err)
|
||||
}
|
||||
result.Reannounced = true
|
||||
summary.Reannounced++
|
||||
}
|
||||
|
||||
if err := db.RecordQBit(torrent, matched, startedAt, result.ForceStarted, result.Reannounced); err != nil {
|
||||
return summary, err
|
||||
return failRun(cfg, db, summary, "record qBittorrent state", err)
|
||||
}
|
||||
|
||||
if startedAt.Sub(state.FirstSeenAt) >= cfg.AlertAfter {
|
||||
result.ManualNeeded = true
|
||||
summary.ManualNeeded++
|
||||
if err := db.MarkManualNeeded(torrent.ID, startedAt); err != nil {
|
||||
return summary, err
|
||||
return failRun(cfg, db, summary, "mark manual-needed torrent", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,7 +139,7 @@ func Run(cfg config.Config) (model.RunSummary, error) {
|
||||
}
|
||||
|
||||
if err := db.InsertRun(summary); err != nil {
|
||||
return summary, err
|
||||
return failRun(cfg, db, summary, "record run", err)
|
||||
}
|
||||
|
||||
if cfg.NotificationDryRun {
|
||||
@@ -129,7 +149,9 @@ func Run(cfg config.Config) (model.RunSummary, error) {
|
||||
|
||||
notifier := newNotifier(cfg)
|
||||
if notifier != nil {
|
||||
if err := notifier.SendManualNeeded(summary.Results); err != nil {
|
||||
if err := withRetry("send manual-needed notification", func() error {
|
||||
return notifier.SendManualNeeded(summary.Results)
|
||||
}); err != nil {
|
||||
return summary, err
|
||||
}
|
||||
}
|
||||
@@ -215,8 +237,10 @@ func PrintStats(snapshot model.StatsSnapshot, asJSON bool) error {
|
||||
fmt.Println("Last run: none")
|
||||
} else {
|
||||
run := snapshot.LastRun
|
||||
fmt.Printf("Last run: %s | total=%d matched=%d unmatched=%d force-started=%d reannounced=%d manual-needed=%d dry-run=%t\n",
|
||||
status := defaultText(run.Status, "success")
|
||||
fmt.Printf("Last run: %s | status=%s total=%d matched=%d unmatched=%d force-started=%d reannounced=%d manual-needed=%d dry-run=%t\n",
|
||||
run.StartedAt,
|
||||
status,
|
||||
run.TotalRisk,
|
||||
run.Matched,
|
||||
run.Unmatched,
|
||||
@@ -225,6 +249,9 @@ func PrintStats(snapshot model.StatsSnapshot, asJSON bool) error {
|
||||
run.ManualNeeded,
|
||||
run.DryRun,
|
||||
)
|
||||
if status == "failed" {
|
||||
fmt.Printf("Last error: %s | %s\n", defaultText(run.ErrorStep, "-"), defaultText(run.ErrorMessage, "-"))
|
||||
}
|
||||
}
|
||||
|
||||
if len(snapshot.ManualNeeded) > 0 {
|
||||
@@ -319,3 +346,58 @@ func printNotificationPreview(notificationType string, results []model.ActionRes
|
||||
fmt.Fprintf(os.Stderr, "notification dry-run: type=%s\n", notificationType)
|
||||
fmt.Fprintf(os.Stderr, "Subject: %s\n\n%s", subject, body)
|
||||
}
|
||||
|
||||
func withRetry(step string, fn func() error) error {
|
||||
var lastErr error
|
||||
for attempt := 1; attempt <= retryAttempts; attempt++ {
|
||||
if err := fn(); err != nil {
|
||||
lastErr = err
|
||||
fmt.Fprintf(os.Stderr, "%s failed (attempt %d/%d): %v\n", step, attempt, retryAttempts, err)
|
||||
if attempt < retryAttempts {
|
||||
time.Sleep(retryDelay)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed after %d attempts: %w", retryAttempts, lastErr)
|
||||
}
|
||||
|
||||
func failRun(cfg config.Config, db *store.Store, summary model.RunSummary, step string, err error) (model.RunSummary, error) {
|
||||
summary.Status = "failed"
|
||||
summary.ErrorStep = step
|
||||
summary.ErrorMessage = err.Error()
|
||||
|
||||
fmt.Fprintf(os.Stderr, "run failed at %s: %v\n", step, err)
|
||||
if db != nil {
|
||||
if insertErr := db.InsertRun(summary); insertErr != nil {
|
||||
fmt.Fprintf(os.Stderr, "record failed run: %v\n", insertErr)
|
||||
}
|
||||
}
|
||||
|
||||
if notifyErr := sendFailureNotification(cfg, step, err); notifyErr != nil {
|
||||
return summary, fmt.Errorf("%s: %w; failure notification failed: %v", step, err, notifyErr)
|
||||
}
|
||||
return summary, fmt.Errorf("%s: %w", step, err)
|
||||
}
|
||||
|
||||
func sendFailureNotification(cfg config.Config, step string, err error) error {
|
||||
if cfg.NotificationDryRun {
|
||||
subject, body := notify.FailureMessage(step, err)
|
||||
notificationType := cfg.NotificationType
|
||||
if notificationType == "" {
|
||||
notificationType = "unconfigured"
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "failure notification dry-run: type=%s\n", notificationType)
|
||||
fmt.Fprintf(os.Stderr, "Subject: %s\n\n%s", subject, body)
|
||||
return nil
|
||||
}
|
||||
|
||||
notifier := newNotifier(cfg)
|
||||
if notifier == nil {
|
||||
return nil
|
||||
}
|
||||
return withRetry("send failure notification", func() error {
|
||||
return notifier.SendFailure(step, err)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -48,6 +48,9 @@ type ActionResult struct {
|
||||
|
||||
type RunSummary struct {
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
Status string `json:"status"`
|
||||
ErrorStep string `json:"error_step,omitempty"`
|
||||
ErrorMessage string `json:"error_message,omitempty"`
|
||||
DryRun bool `json:"dry_run"`
|
||||
TotalRisk int `json:"total_risk"`
|
||||
Matched int `json:"matched"`
|
||||
@@ -65,6 +68,9 @@ type StatusCount struct {
|
||||
|
||||
type RunRecord struct {
|
||||
StartedAt string `json:"started_at"`
|
||||
Status string `json:"status"`
|
||||
ErrorStep string `json:"error_step,omitempty"`
|
||||
ErrorMessage string `json:"error_message,omitempty"`
|
||||
DryRun bool `json:"dry_run"`
|
||||
TotalRisk int `json:"total_risk"`
|
||||
Matched int `json:"matched"`
|
||||
|
||||
@@ -14,9 +14,11 @@ import (
|
||||
)
|
||||
|
||||
const manualNeededSubject = "nCore HnR manual work"
|
||||
const failureSubject = "nCore HnR check failed"
|
||||
|
||||
type Sender interface {
|
||||
SendManualNeeded(results []model.ActionResult) error
|
||||
SendFailure(step string, failure error) error
|
||||
}
|
||||
|
||||
type NotificationNTFY struct {
|
||||
@@ -29,6 +31,18 @@ func (n NotificationNTFY) SendManualNeeded(results []model.ActionResult) error {
|
||||
if strings.TrimSpace(n.URL) == "" || !ok {
|
||||
return nil
|
||||
}
|
||||
return n.sendMessage(manualNeededSubject, body)
|
||||
}
|
||||
|
||||
func (n NotificationNTFY) SendFailure(step string, failure error) error {
|
||||
if strings.TrimSpace(n.URL) == "" {
|
||||
return nil
|
||||
}
|
||||
_, body := FailureMessage(step, failure)
|
||||
return n.sendMessage(failureSubject, body)
|
||||
}
|
||||
|
||||
func (n NotificationNTFY) sendMessage(subject string, body string) error {
|
||||
client := n.HTTPClient
|
||||
if client == nil {
|
||||
client = &http.Client{Timeout: 15 * time.Second}
|
||||
@@ -39,7 +53,7 @@ func (n NotificationNTFY) SendManualNeeded(results []model.ActionResult) error {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||
req.Header.Set("Title", manualNeededSubject)
|
||||
req.Header.Set("Title", subject)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
@@ -66,7 +80,18 @@ func (s NotificationSMTP) SendManualNeeded(results []model.ActionResult) error {
|
||||
if strings.TrimSpace(s.Host) == "" || !ok {
|
||||
return nil
|
||||
}
|
||||
return s.sendMessage(manualNeededSubject, body)
|
||||
}
|
||||
|
||||
func (s NotificationSMTP) SendFailure(step string, failure error) error {
|
||||
if strings.TrimSpace(s.Host) == "" {
|
||||
return nil
|
||||
}
|
||||
_, body := FailureMessage(step, failure)
|
||||
return s.sendMessage(failureSubject, body)
|
||||
}
|
||||
|
||||
func (s NotificationSMTP) sendMessage(subject string, body string) error {
|
||||
host := strings.TrimSpace(s.Host)
|
||||
addr := net.JoinHostPort(host, strings.TrimSpace(s.Port))
|
||||
|
||||
@@ -92,7 +117,7 @@ func (s NotificationSMTP) SendManualNeeded(results []model.ActionResult) error {
|
||||
message := strings.Builder{}
|
||||
message.WriteString(fmt.Sprintf("From: %s\r\n", from.String()))
|
||||
message.WriteString(fmt.Sprintf("To: %s\r\n", formatAddressList(recipients)))
|
||||
message.WriteString(fmt.Sprintf("Subject: %s\r\n", manualNeededSubject))
|
||||
message.WriteString(fmt.Sprintf("Subject: %s\r\n", subject))
|
||||
message.WriteString("MIME-Version: 1.0\r\n")
|
||||
message.WriteString("Content-Type: text/plain; charset=UTF-8\r\n")
|
||||
message.WriteString("\r\n")
|
||||
@@ -109,6 +134,17 @@ func ManualNeededMessage(results []model.ActionResult) (string, string, bool) {
|
||||
return manualNeededSubject, body, ok
|
||||
}
|
||||
|
||||
func FailureMessage(step string, failure error) (string, string) {
|
||||
var body strings.Builder
|
||||
body.WriteString("nCore HnR check failed after retries.\n")
|
||||
body.WriteString(fmt.Sprintf("Step: %s\n", strings.TrimSpace(step)))
|
||||
if failure != nil {
|
||||
body.WriteString(fmt.Sprintf("Error: %s\n", failure.Error()))
|
||||
}
|
||||
body.WriteString("Action: review the CronJob logs. Failed nCore/qBittorrent setup steps do not clear unresolved torrent state; the next cron run will retry.\n")
|
||||
return failureSubject, body.String()
|
||||
}
|
||||
|
||||
func manualNeededText(results []model.ActionResult) (string, bool) {
|
||||
var body strings.Builder
|
||||
manualCount := 0
|
||||
|
||||
@@ -135,10 +135,17 @@ func (s *Store) MarkManualNeeded(id int64, now time.Time) error {
|
||||
}
|
||||
|
||||
func (s *Store) InsertRun(summary model.RunSummary) error {
|
||||
status := summary.Status
|
||||
if status == "" {
|
||||
status = "success"
|
||||
}
|
||||
_, err := s.db.Exec(`
|
||||
INSERT INTO runs (started_at, dry_run, total_risk, matched, unmatched, force_started, reannounced, manual_needed)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`, summary.StartedAt.Format(time.RFC3339), boolInt(summary.DryRun), summary.TotalRisk, summary.Matched, summary.Unmatched, summary.ForceStarted, summary.Reannounced, summary.ManualNeeded)
|
||||
INSERT INTO runs (
|
||||
started_at, status, error_step, error_message, dry_run,
|
||||
total_risk, matched, unmatched, force_started, reannounced, manual_needed
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`, summary.StartedAt.Format(time.RFC3339), status, summary.ErrorStep, summary.ErrorMessage, boolInt(summary.DryRun), summary.TotalRisk, summary.Matched, summary.Unmatched, summary.ForceStarted, summary.Reannounced, summary.ManualNeeded)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -215,11 +222,22 @@ func (s *Store) lastRun() (*model.RunRecord, error) {
|
||||
var run model.RunRecord
|
||||
var dryRun int
|
||||
err := s.db.QueryRow(`
|
||||
SELECT started_at, dry_run, total_risk, matched, unmatched, force_started, reannounced, manual_needed
|
||||
SELECT
|
||||
started_at,
|
||||
status,
|
||||
COALESCE(error_step, ''),
|
||||
COALESCE(error_message, ''),
|
||||
dry_run,
|
||||
total_risk,
|
||||
matched,
|
||||
unmatched,
|
||||
force_started,
|
||||
reannounced,
|
||||
manual_needed
|
||||
FROM runs
|
||||
ORDER BY id DESC
|
||||
LIMIT 1
|
||||
`).Scan(&run.StartedAt, &dryRun, &run.TotalRisk, &run.Matched, &run.Unmatched, &run.ForceStarted, &run.Reannounced, &run.ManualNeeded)
|
||||
`).Scan(&run.StartedAt, &run.Status, &run.ErrorStep, &run.ErrorMessage, &dryRun, &run.TotalRisk, &run.Matched, &run.Unmatched, &run.ForceStarted, &run.Reannounced, &run.ManualNeeded)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -327,6 +345,9 @@ func (s *Store) migrate() error {
|
||||
CREATE TABLE IF NOT EXISTS runs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
started_at TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'success',
|
||||
error_step TEXT,
|
||||
error_message TEXT,
|
||||
dry_run INTEGER NOT NULL,
|
||||
total_risk INTEGER NOT NULL,
|
||||
matched INTEGER NOT NULL,
|
||||
@@ -336,6 +357,47 @@ func (s *Store) migrate() error {
|
||||
manual_needed INTEGER NOT NULL
|
||||
);
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.addColumnIfMissing("runs", "status", "TEXT NOT NULL DEFAULT 'success'"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.addColumnIfMissing("runs", "error_step", "TEXT"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.addColumnIfMissing("runs", "error_message", "TEXT"); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) addColumnIfMissing(table string, column string, definition string) error {
|
||||
rows, err := s.db.Query(fmt.Sprintf(`PRAGMA table_info(%s)`, table))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var cid int
|
||||
var name string
|
||||
var columnType string
|
||||
var notNull int
|
||||
var defaultValue sql.NullString
|
||||
var pk int
|
||||
if err := rows.Scan(&cid, &name, &columnType, ¬Null, &defaultValue, &pk); err != nil {
|
||||
return err
|
||||
}
|
||||
if name == column {
|
||||
return rows.Err()
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = s.db.Exec(fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s %s`, table, column, definition))
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -43,3 +43,42 @@ func TestMarkResolvedResolvesManualNeededTorrent(t *testing.T) {
|
||||
t.Fatalf("expected last_resolved_at %q, got %q", resolvedAt.Format(time.RFC3339), got.LastResolvedAt)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInsertRunRecordsFailureMetadata(t *testing.T) {
|
||||
db, err := Open(t.TempDir() + "/state.sqlite")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
startedAt := time.Date(2026, 5, 7, 11, 0, 0, 0, time.UTC)
|
||||
if err := db.InsertRun(model.RunSummary{
|
||||
StartedAt: startedAt,
|
||||
Status: "failed",
|
||||
ErrorStep: "fetch nCore HnR page",
|
||||
ErrorMessage: "timeout",
|
||||
DryRun: true,
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
stats, err := db.Stats()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if stats.LastRun == nil {
|
||||
t.Fatal("expected last run")
|
||||
}
|
||||
if stats.LastRun.Status != "failed" {
|
||||
t.Fatalf("expected status failed, got %q", stats.LastRun.Status)
|
||||
}
|
||||
if stats.LastRun.ErrorStep != "fetch nCore HnR page" {
|
||||
t.Fatalf("expected error step to be recorded, got %q", stats.LastRun.ErrorStep)
|
||||
}
|
||||
if stats.LastRun.ErrorMessage != "timeout" {
|
||||
t.Fatalf("expected error message to be recorded, got %q", stats.LastRun.ErrorMessage)
|
||||
}
|
||||
if !stats.LastRun.DryRun {
|
||||
t.Fatal("expected dry-run to be recorded")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user