Skip to content
Snippets Groups Projects
Commit ddbdd424 authored by Travis Ralston's avatar Travis Ralston
Browse files

Fix unfinished task scheduling

parent 06781c8f
No related branches found
No related tags found
No related merge requests found
...@@ -73,13 +73,6 @@ func main() { ...@@ -73,13 +73,6 @@ func main() {
logrus.Info("Starting up...") logrus.Info("Starting up...")
runtime.RunStartupSequence() runtime.RunStartupSequence()
logrus.Info("Checking background tasks...")
err = scanAndStartUnfinishedTasks()
if err != nil {
sentry.CaptureException(err)
logrus.Fatal(err)
}
logrus.Info("Starting recurring tasks...") logrus.Info("Starting recurring tasks...")
tasks.StartAll() tasks.StartAll()
......
...@@ -10,6 +10,8 @@ func StartAll() { ...@@ -10,6 +10,8 @@ func StartAll() {
scheduleHourly(RecurringTaskPurgeRemoteMedia, task_runner.PurgeRemoteMedia) scheduleHourly(RecurringTaskPurgeRemoteMedia, task_runner.PurgeRemoteMedia)
scheduleHourly(RecurringTaskPurgeThumbnails, task_runner.PurgeThumbnails) scheduleHourly(RecurringTaskPurgeThumbnails, task_runner.PurgeThumbnails)
scheduleHourly(RecurringTaskPurgePreviews, task_runner.PurgePreviews) scheduleHourly(RecurringTaskPurgePreviews, task_runner.PurgePreviews)
scheduleUnfinished()
} }
func StopAll() { func StopAll() {
......
...@@ -56,6 +56,10 @@ func tryBeginTask(id int, recur bool) { ...@@ -56,6 +56,10 @@ func tryBeginTask(id int, recur bool) {
} }
func beginTask(task *database.DbTask) { func beginTask(task *database.DbTask) {
if task.EndTs > 0 {
return // just skip it
}
// TODO: Worker group: https://github.com/turt2live/matrix-media-repo/issues/425
runnerCtx := rcontext.Initial().LogWithFields(logrus.Fields{"task_id": task.TaskId}) runnerCtx := rcontext.Initial().LogWithFields(logrus.Fields{"task_id": task.TaskId})
if task.Name == string(TaskDatastoreMigrate) { if task.Name == string(TaskDatastoreMigrate) {
go task_runner.DatastoreMigrate(runnerCtx, task) go task_runner.DatastoreMigrate(runnerCtx, task)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/getsentry/sentry-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/common/rcontext"
"github.com/turt2live/matrix-media-repo/database" "github.com/turt2live/matrix-media-repo/database"
...@@ -97,6 +98,23 @@ func stopRecurring() { ...@@ -97,6 +98,23 @@ func stopRecurring() {
} }
} }
func scheduleUnfinished() {
if ids.GetMachineId() != 0 {
return // don't schedule here
}
ctx := rcontext.Initial().LogWithFields(logrus.Fields{"startup": true})
taskDb := database.GetInstance().Tasks.Prepare(ctx)
tasks, err := taskDb.GetAll(false)
if err != nil {
sentry.CaptureException(err)
ctx.Log.Fatal("Error getting unfinished tasks: ", err)
return
}
for _, task := range tasks {
beginTask(task)
}
}
func RunDatastoreMigration(ctx rcontext.RequestContext, sourceDsId string, targetDsId string, beforeTs int64) (*database.DbTask, error) { func RunDatastoreMigration(ctx rcontext.RequestContext, sourceDsId string, targetDsId string, beforeTs int64) (*database.DbTask, error) {
return scheduleTask(ctx, TaskDatastoreMigrate, task_runner.DatastoreMigrateParams{ return scheduleTask(ctx, TaskDatastoreMigrate, task_runner.DatastoreMigrateParams{
SourceDsId: sourceDsId, SourceDsId: sourceDsId,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment