diff --git a/cmd/media_repo/main.go b/cmd/media_repo/main.go index 88c1509f6ce0902e543c55844f4a1e46a14bbc71..4f4f58c8d6147b403623d58349e6449bea1b1e42 100644 --- a/cmd/media_repo/main.go +++ b/cmd/media_repo/main.go @@ -73,13 +73,6 @@ func main() { logrus.Info("Starting up...") runtime.RunStartupSequence() - logrus.Info("Checking background tasks...") - err = scanAndStartUnfinishedTasks() - if err != nil { - sentry.CaptureException(err) - logrus.Fatal(err) - } - logrus.Info("Starting recurring tasks...") tasks.StartAll() diff --git a/tasks/all.go b/tasks/all.go index 713ef16b4ce5fa9b1a2e7e59a9967d70b5eb07c0..a4eb7e01a4548984d7403680a5084f668ca4bad2 100644 --- a/tasks/all.go +++ b/tasks/all.go @@ -10,6 +10,8 @@ func StartAll() { scheduleHourly(RecurringTaskPurgeRemoteMedia, task_runner.PurgeRemoteMedia) scheduleHourly(RecurringTaskPurgeThumbnails, task_runner.PurgeThumbnails) scheduleHourly(RecurringTaskPurgePreviews, task_runner.PurgePreviews) + + scheduleUnfinished() } func StopAll() { diff --git a/tasks/exec.go b/tasks/exec.go index dc9d3ca82253476bda03e2ebacd709382ad81c3c..a8eab10c85d3c1b9b16d582358888bdf2db12dbe 100644 --- a/tasks/exec.go +++ b/tasks/exec.go @@ -56,6 +56,10 @@ func tryBeginTask(id int, recur bool) { } func beginTask(task *database.DbTask) { + if task.EndTs > 0 { + return // just skip it + } + // TODO: Worker group: https://github.com/turt2live/matrix-media-repo/issues/425 runnerCtx := rcontext.Initial().LogWithFields(logrus.Fields{"task_id": task.TaskId}) if task.Name == string(TaskDatastoreMigrate) { go task_runner.DatastoreMigrate(runnerCtx, task) diff --git a/tasks/schedule.go b/tasks/schedule.go index c1f394b2902291d1580be07d09161cf05202b4db..9821a7cb4e252ff52b2925c29a19ad637258cfab 100644 --- a/tasks/schedule.go +++ b/tasks/schedule.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/getsentry/sentry-go" "github.com/sirupsen/logrus" "github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/database" @@ -97,6 +98,23 @@ func stopRecurring() { } } +func scheduleUnfinished() { + if ids.GetMachineId() != 0 { + return // don't schedule here + } + ctx := rcontext.Initial().LogWithFields(logrus.Fields{"startup": true}) + taskDb := database.GetInstance().Tasks.Prepare(ctx) + tasks, err := taskDb.GetAll(false) + if err != nil { + sentry.CaptureException(err) + ctx.Log.Fatal("Error getting unfinished tasks: ", err) + return + } + for _, task := range tasks { + beginTask(task) + } +} + func RunDatastoreMigration(ctx rcontext.RequestContext, sourceDsId string, targetDsId string, beforeTs int64) (*database.DbTask, error) { return scheduleTask(ctx, TaskDatastoreMigrate, task_runner.DatastoreMigrateParams{ SourceDsId: sourceDsId,