From b6cb5744a99883e2c1953021344e165da19bcc9c Mon Sep 17 00:00:00 2001 From: Travis Ralston <travpc@gmail.com> Date: Mon, 2 Sep 2019 16:17:15 -0600 Subject: [PATCH] Continue transfers that didn't finish before the restart --- cmd/media_repo/main.go | 58 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/cmd/media_repo/main.go b/cmd/media_repo/main.go index 193a430a..4d72505d 100644 --- a/cmd/media_repo/main.go +++ b/cmd/media_repo/main.go @@ -9,6 +9,7 @@ import ( "github.com/turt2live/matrix-media-repo/api/webserver" "github.com/turt2live/matrix-media-repo/common/config" "github.com/turt2live/matrix-media-repo/common/logging" + "github.com/turt2live/matrix-media-repo/controllers/maintenance_controller" "github.com/turt2live/matrix-media-repo/metrics" "github.com/turt2live/matrix-media-repo/storage" "github.com/turt2live/matrix-media-repo/storage/datastore" @@ -96,7 +97,64 @@ func main() { logrus.Warn("You are using `storagePaths` in your configuration - in a future update, this will be removed. Please use datastores instead (see sample config).") } + logrus.Info("Checking background tasks...") + err = scanAndStartUnfinishedTasks() + if err != nil { + logrus.Fatal(err) + } + logrus.Info("Starting media repository...") metrics.Init() webserver.Init() // blocks to listen for requests } + +func scanAndStartUnfinishedTasks() error { + ctx := context.Background() + log := logrus.WithFields(logrus.Fields{"stage": "startup"}) + db := storage.GetDatabase().GetMetadataStore(ctx, log) + tasks, err := db.GetAllBackgroundTasks() + if err != nil { + return err + } + for _, task := range tasks { + if task.EndTs > 0 { + continue + } + + taskLog := log.WithFields(logrus.Fields{ + "prev_task_id": task.ID, + "prev_task_name": task.Name, + }) + + if task.Name == "storage_migration" { + beforeTs := int64(task.Params["before_ts"].(float64)) + sourceDsId := task.Params["source_datastore_id"].(string) + targetDsId := task.Params["target_datastore_id"].(string) + + sourceDs, err := datastore.LocateDatastore(ctx, taskLog, sourceDsId) + if err != nil { + return err + } + targetDs, err := datastore.LocateDatastore(ctx, taskLog, targetDsId) + if err != nil { + return err + } + + newTask, err := maintenance_controller.StartStorageMigration(sourceDs, targetDs, beforeTs, taskLog) + if err != nil { + return err + } + + err = db.FinishedBackgroundTask(task.ID) + if err != nil { + return err + } + + taskLog.Infof("Started replacement task ID %d for unfinished task %d (%s)", newTask.ID, task.ID, task.Name) + } else { + taskLog.Warn(fmt.Sprintf("Unknown task %s at ID %d - ignoring", task.Name, task.ID)) + } + } + + return nil +} -- GitLab