From 5bba07f8b98d3bc5253d8db7de761aff5da005df Mon Sep 17 00:00:00 2001
From: Travis Ralston <travpc@gmail.com>
Date: Mon, 2 Sep 2019 16:04:33 -0600
Subject: [PATCH] Add a simple background tasks API for storage migrations

To be expanded further.
---
 api/custom/datastores.go                      |  19 +++-
 api/custom/tasks.go                           | 103 ++++++++++++++++++
 api/webserver/webserver.go                    |   6 +
 .../maintainance_controller.go                |  21 +++-
 docs/admin.md                                 |  91 ++++++++++++++++
 .../10_add_background_tasks_table_down.sql    |   1 +
 .../10_add_background_tasks_table_up.sql      |   7 ++
 storage/stores/metadata_store.go              | 103 ++++++++++++++++++
 types/background_task.go                      |   9 ++
 9 files changed, 357 insertions(+), 3 deletions(-)
 create mode 100644 api/custom/tasks.go
 create mode 100644 migrations/10_add_background_tasks_table_down.sql
 create mode 100644 migrations/10_add_background_tasks_table_up.sql
 create mode 100644 types/background_task.go

diff --git a/api/custom/datastores.go b/api/custom/datastores.go
index f7828a38..3285a854 100644
--- a/api/custom/datastores.go
+++ b/api/custom/datastores.go
@@ -10,9 +10,15 @@ import (
 	"github.com/turt2live/matrix-media-repo/controllers/maintenance_controller"
 	"github.com/turt2live/matrix-media-repo/storage"
 	"github.com/turt2live/matrix-media-repo/storage/datastore"
+	"github.com/turt2live/matrix-media-repo/types"
 	"github.com/turt2live/matrix-media-repo/util"
 )
 
+type DatastoreMigration struct {
+	*types.DatastoreMigrationEstimate
+	TaskID int `json:"task_id"`
+}
+
 func GetDatastores(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
 	datastores, err := storage.GetDatabase().GetMediaStore(r.Context(), log).GetAllDatastores()
 	if err != nil {
@@ -71,7 +77,11 @@ func MigrateBetweenDatastores(r *http.Request, log *logrus.Entry, user api.UserI
 	}
 
 	log.Info("User ", user.UserId, " has started a datastore media transfer")
-	maintenance_controller.StartStorageMigration(sourceDatastore, targetDatastore, beforeTs, log)
+	task, err := maintenance_controller.StartStorageMigration(sourceDatastore, targetDatastore, beforeTs, log)
+	if err != nil {
+		log.Error(err)
+		return api.InternalServerError("Unexpected error starting migration")
+	}
 
 	estimate, err := maintenance_controller.EstimateDatastoreSizeWithAge(beforeTs, sourceDsId, r.Context(), log)
 	if err != nil {
@@ -79,7 +89,12 @@ func MigrateBetweenDatastores(r *http.Request, log *logrus.Entry, user api.UserI
 		return api.InternalServerError("Unexpected error getting storage estimate")
 	}
 
-	return &api.DoNotCacheResponse{Payload: estimate}
+	migration := &DatastoreMigration{
+		DatastoreMigrationEstimate: estimate,
+		TaskID:                     task.ID,
+	}
+
+	return &api.DoNotCacheResponse{Payload: migration}
 }
 
 func GetDatastoreStorageEstimate(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
diff --git a/api/custom/tasks.go b/api/custom/tasks.go
new file mode 100644
index 00000000..63d7a984
--- /dev/null
+++ b/api/custom/tasks.go
@@ -0,0 +1,103 @@
+package custom
+
+import (
+	"net/http"
+	"strconv"
+
+	"github.com/gorilla/mux"
+	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/api"
+	"github.com/turt2live/matrix-media-repo/storage"
+)
+
+type TaskStatus struct {
+	TaskID     int                    `json:"task_id"`
+	Name       string                 `json:"task_name"`
+	Params     map[string]interface{} `json:"params"`
+	StartTs    int64                  `json:"start_ts"`
+	EndTs      int64                  `json:"end_ts"`
+	IsFinished bool                   `json:"is_finished"`
+}
+
+func GetTask(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	params := mux.Vars(r)
+
+	taskIdStr := params["taskId"]
+	taskId, err := strconv.Atoi(taskIdStr)
+	if err != nil {
+		log.Error(err)
+		return api.BadRequest("invalid task ID")
+	}
+
+	log = log.WithFields(logrus.Fields{
+		"taskId": taskId,
+	})
+
+	db := storage.GetDatabase().GetMetadataStore(r.Context(), log)
+
+	task, err := db.GetBackgroundTask(taskId)
+	if err != nil {
+		log.Error(err)
+		return api.InternalServerError("failed to get task information")
+	}
+
+	return &api.DoNotCacheResponse{Payload: &TaskStatus{
+		TaskID:     task.ID,
+		Name:       task.Name,
+		Params:     task.Params,
+		StartTs:    task.StartTs,
+		EndTs:      task.EndTs,
+		IsFinished: task.EndTs > 0,
+	}}
+}
+
+func ListAllTasks(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	db := storage.GetDatabase().GetMetadataStore(r.Context(), log)
+
+	tasks, err := db.GetAllBackgroundTasks()
+	if err != nil {
+		logrus.Error(err)
+		return api.InternalServerError("Failed to get background tasks")
+	}
+
+	statusObjs := make([]*TaskStatus, 0)
+	for _, task := range tasks {
+		statusObjs = append(statusObjs, &TaskStatus{
+			TaskID:     task.ID,
+			Name:       task.Name,
+			Params:     task.Params,
+			StartTs:    task.StartTs,
+			EndTs:      task.EndTs,
+			IsFinished: task.EndTs > 0,
+		})
+	}
+
+	return &api.DoNotCacheResponse{Payload: statusObjs}
+}
+
+func ListUnfinishedTasks(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	db := storage.GetDatabase().GetMetadataStore(r.Context(), log)
+
+	tasks, err := db.GetAllBackgroundTasks()
+	if err != nil {
+		logrus.Error(err)
+		return api.InternalServerError("Failed to get background tasks")
+	}
+
+	statusObjs := make([]*TaskStatus, 0)
+	for _, task := range tasks {
+		if task.EndTs > 0 {
+			continue
+		}
+		statusObjs = append(statusObjs, &TaskStatus{
+			TaskID:     task.ID,
+			Name:       task.Name,
+			Params:     task.Params,
+			StartTs:    task.StartTs,
+			EndTs:      task.EndTs,
+			IsFinished: task.EndTs > 0,
+		})
+	}
+
+	return &api.DoNotCacheResponse{Payload: statusObjs}
+}
diff --git a/api/webserver/webserver.go b/api/webserver/webserver.go
index e259cfd1..8478501d 100644
--- a/api/webserver/webserver.go
+++ b/api/webserver/webserver.go
@@ -45,6 +45,9 @@ func Init() {
 	domainUsageHandler := handler{api.RepoAdminRoute(custom.GetDomainUsage), "domain_usage", counter, false}
 	userUsageHandler := handler{api.RepoAdminRoute(custom.GetUserUsage), "user_usage", counter, false}
 	uploadsUsageHandler := handler{api.RepoAdminRoute(custom.GetUploadsUsage), "uploads_usage", counter, false}
+	getBackgroundTaskHandler := handler{api.RepoAdminRoute(custom.GetTask), "get_background_task", counter, false}
+	listAllBackgroundTasksHandler := handler{api.RepoAdminRoute(custom.ListAllTasks), "list_all_background_tasks", counter, false}
+	listUnfinishedBackgroundTasksHandler := handler{api.RepoAdminRoute(custom.ListUnfinishedTasks), "list_unfinished_background_tasks", counter, false}
 
 	routes := make(map[string]route)
 	versions := []string{"r0", "v1", "unstable"} // r0 is typically clients and v1 is typically servers. v1 is deprecated.
@@ -70,6 +73,9 @@ func Init() {
 		routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}"] = route{"GET", domainUsageHandler}
 		routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}/users"] = route{"GET", userUsageHandler}
 		routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}/uploads"] = route{"GET", uploadsUsageHandler}
+		routes["/_matrix/media/"+version+"/admin/tasks/{taskId:[0-9]+}"] = route{"GET", getBackgroundTaskHandler}
+		routes["/_matrix/media/"+version+"/admin/tasks/all"] = route{"GET", listAllBackgroundTasksHandler}
+		routes["/_matrix/media/"+version+"/admin/tasks/unfinished"] = route{"GET", listUnfinishedBackgroundTasksHandler}
 
 		// Routes that we should handle but aren't in the media namespace (synapse compat)
 		routes["/_matrix/client/"+version+"/admin/purge_media_cache"] = route{"POST", purgeHandler}
diff --git a/controllers/maintenance_controller/maintainance_controller.go b/controllers/maintenance_controller/maintainance_controller.go
index 89e09510..8bc56af5 100644
--- a/controllers/maintenance_controller/maintainance_controller.go
+++ b/controllers/maintenance_controller/maintainance_controller.go
@@ -11,8 +11,20 @@ import (
 	"github.com/turt2live/matrix-media-repo/util"
 )
 
-func StartStorageMigration(sourceDs *datastore.DatastoreRef, targetDs *datastore.DatastoreRef, beforeTs int64, log *logrus.Entry) {
+// Returns an error only if starting up the background task failed.
+func StartStorageMigration(sourceDs *datastore.DatastoreRef, targetDs *datastore.DatastoreRef, beforeTs int64, log *logrus.Entry) (*types.BackgroundTask, error) {
 	ctx := context.Background()
+
+	db := storage.GetDatabase().GetMetadataStore(ctx, log)
+	task, err := db.CreateBackgroundTask("storage_migration", map[string]interface{}{
+		"source_datastore_id": sourceDs.DatastoreId,
+		"target_datastore_id": targetDs.DatastoreId,
+		"before_ts": beforeTs,
+	})
+	if err != nil {
+		return nil, err
+	}
+
 	go func() {
 		log.Info("Starting transfer")
 
@@ -72,8 +84,15 @@ func StartStorageMigration(sourceDs *datastore.DatastoreRef, targetDs *datastore
 		}
 		doUpdate(thumbs)
 
+		err = db.FinishedBackgroundTask(task.ID)
+		if err != nil {
+			log.Error(err)
+			log.Error("Failed to flag task as finished")
+		}
 		log.Info("Finished transfer")
 	}()
+
+	return task, nil
 }
 
 func EstimateDatastoreSizeWithAge(beforeTs int64, datastoreId string, ctx context.Context, log *logrus.Entry) (*types.DatastoreMigrationEstimate, error) {
diff --git a/docs/admin.md b/docs/admin.md
index 09a221ad..9b563bcc 100644
--- a/docs/admin.md
+++ b/docs/admin.md
@@ -73,6 +73,7 @@ URL: `POST /_matrix/media/unstable/admin/datastores/<source datastore id>/transf
 The response is the estimated amount of data being transferred:
 ```json
 {
+  "task_id": 12,
   "thumbnails_affected": 672,
   "thumbnail_hashes_affected": 598,
   "thumbnail_bytes": 49087657,
@@ -84,6 +85,8 @@ The response is the estimated amount of data being transferred:
 }
 ```
 
+The `task_id` can be given to the Background Tasks API described below.
+
 ## Data usage for servers/users
 
 Individual servers and users can often hoard data in the media repository. These endpoints will tell you how much. These endpoints can only be called by repository admins - they are not available to admins of the homeservers.
@@ -169,3 +172,91 @@ The response is how much data the server is using:
 #### Per-upload usage (batch of uploads / single upload)
 
 Use the same endpoint as above, but specifying one or more `?mxc=mxc://example.org/abc123` query parameters. Note that encoding the values may be required (not shown here).
+
+Only repository administrators can use these endpoints.
+
+## Background Tasks API
+
+The media repo keeps track of tasks that were started and did not block the request. For example, transferring media or quarantining large amounts of media may result in a background task. A `task_id` will be returned by those endpoints which can then be used here to get the status of a task.
+
+#### Listing all tasks
+
+URL: `GET /_matrix/media/unstable/admin/tasks/all`
+
+The response is a list of all known tasks:
+```json
+[
+  {
+    "task_id": 1,
+    "task_name": "storage_migration",
+    "params": {
+      "before_ts": 1567460189817,
+      "source_datastore_id": "abc123",
+      "target_datastore_id": "def456"
+    },
+    "start_ts": 1567460189913,
+    "end_ts": 1567460190502,
+    "is_finished": true
+  },
+  {
+    "task_id": 2,
+    "task_name": "storage_migration",
+    "params": {
+      "before_ts": 1567460189817,
+      "source_datastore_id": "ghi789",
+      "target_datastore_id": "123abc"
+    },
+    "start_ts": 1567460189913,
+    "end_ts": 0,
+    "is_finished": false
+  }
+]
+```
+
+**Note**: The `params` vary depending on the task.
+
+#### Listing unfinished tasks
+
+URL: `GET /_matrix/media/unstable/admin/tasks/unfinished`
+
+The response is a list of all unfinished tasks:
+```json
+[
+  {
+    "task_id": 2,
+    "task_name": "storage_migration",
+    "params": {
+      "before_ts": 1567460189817,
+      "source_datastore_id": "ghi789",
+      "target_datastore_id": "123abc"
+    },
+    "start_ts": 1567460189913,
+    "end_ts": 0,
+    "is_finished": false
+  }
+]
+```
+
+**Note**: The `params` vary depending on the task.
+
+#### Getting information on a specific task
+
+URL: `GET /_matrix/media/unstable/admin/tasks/<task ID>`
+
+The response is the status of the task:
+```json
+{
+  "task_id": 1,
+  "task_name": "storage_migration",
+  "params": {
+    "before_ts": 1567460189817,
+    "source_datastore_id": "abc123",
+    "target_datastore_id": "def456"
+  },
+  "start_ts": 1567460189913,
+  "end_ts": 1567460190502,
+  "is_finished": true
+}
+```
+
+**Note**: The `params` vary depending on the task.
diff --git a/migrations/10_add_background_tasks_table_down.sql b/migrations/10_add_background_tasks_table_down.sql
new file mode 100644
index 00000000..b9afa25f
--- /dev/null
+++ b/migrations/10_add_background_tasks_table_down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS background_tasks;
diff --git a/migrations/10_add_background_tasks_table_up.sql b/migrations/10_add_background_tasks_table_up.sql
new file mode 100644
index 00000000..37e491e3
--- /dev/null
+++ b/migrations/10_add_background_tasks_table_up.sql
@@ -0,0 +1,7 @@
+CREATE TABLE IF NOT EXISTS background_tasks (
+    id SERIAL PRIMARY KEY,
+    task TEXT NOT NULL,
+    params JSON NOT NULL,
+    start_ts BIGINT NOT NULL,
+    end_ts BIGINT NULL
+);
diff --git a/storage/stores/metadata_store.go b/storage/stores/metadata_store.go
index 12a3cf32..6dfd90e2 100644
--- a/storage/stores/metadata_store.go
+++ b/storage/stores/metadata_store.go
@@ -3,9 +3,11 @@ package stores
 import (
 	"context"
 	"database/sql"
+	"encoding/json"
 
 	"github.com/sirupsen/logrus"
 	"github.com/turt2live/matrix-media-repo/types"
+	"github.com/turt2live/matrix-media-repo/util"
 )
 
 type folderSize struct {
@@ -21,6 +23,10 @@ const changeDatastoreOfThumbnailHash = "UPDATE thumbnails SET datastore_id = $1,
 const selectUploadCountsForServer = "SELECT COALESCE((SELECT COUNT(origin) FROM media WHERE origin = $1), 0) AS media, COALESCE((SELECT COUNT(origin) FROM thumbnails WHERE origin = $1), 0) AS thumbnails"
 const selectUploadSizesForServer = "SELECT COALESCE((SELECT SUM(size_bytes) FROM media WHERE origin = $1), 0) AS media, COALESCE((SELECT SUM(size_bytes) FROM thumbnails WHERE origin = $1), 0) AS thumbnails"
 const selectUsersForServer = "SELECT DISTINCT user_id FROM media WHERE origin = $1 AND user_id IS NOT NULL AND LENGTH(user_id) > 0"
+const insertNewBackgroundTask = "INSERT INTO background_tasks (task, params, start_ts) VALUES ($1, $2, $3) RETURNING id;"
+const selectBackgroundTask = "SELECT id, task, params, start_ts, end_ts FROM background_tasks WHERE id = $1"
+const updateBackgroundTask = "UPDATE background_tasks SET end_ts = $2 WHERE id = $1"
+const selectAllBackgroundTasks = "SELECT id, task, params, start_ts, end_ts FROM background_tasks"
 
 type metadataStoreStatements struct {
 	upsertLastAccessed                            *sql.Stmt
@@ -32,6 +38,10 @@ type metadataStoreStatements struct {
 	selectUploadCountsForServer                   *sql.Stmt
 	selectUploadSizesForServer                    *sql.Stmt
 	selectUsersForServer                          *sql.Stmt
+	insertNewBackgroundTask                       *sql.Stmt
+	selectBackgroundTask                          *sql.Stmt
+	updateBackgroundTask                          *sql.Stmt
+	selectAllBackgroundTasks                      *sql.Stmt
 }
 
 type MetadataStoreFactory struct {
@@ -79,6 +89,18 @@ func InitMetadataStore(sqlDb *sql.DB) (*MetadataStoreFactory, error) {
 	if store.stmts.selectUploadCountsForServer, err = store.sqlDb.Prepare(selectUploadCountsForServer); err != nil {
 		return nil, err
 	}
+	if store.stmts.insertNewBackgroundTask, err = store.sqlDb.Prepare(insertNewBackgroundTask); err != nil {
+		return nil, err
+	}
+	if store.stmts.selectBackgroundTask, err = store.sqlDb.Prepare(selectBackgroundTask); err != nil {
+		return nil, err
+	}
+	if store.stmts.updateBackgroundTask, err = store.sqlDb.Prepare(updateBackgroundTask); err != nil {
+		return nil, err
+	}
+	if store.stmts.selectAllBackgroundTasks, err = store.sqlDb.Prepare(selectAllBackgroundTasks); err != nil {
+		return nil, err
+	}
 
 	return &store, nil
 }
@@ -211,3 +233,84 @@ func (s *MetadataStore) GetCountUsageForServer(serverName string) (int64, int64,
 
 	return media, thumbs, nil
 }
+
+func (s *MetadataStore) CreateBackgroundTask(name string, params map[string]interface{}) (*types.BackgroundTask, error) {
+	now := util.NowMillis()
+	b, err := json.Marshal(params)
+	if err != nil {
+		return nil, err
+	}
+	r := s.statements.insertNewBackgroundTask.QueryRowContext(s.ctx, name, string(b), now)
+	var id int
+	err = r.Scan(&id)
+	if err != nil {
+		return nil, err
+	}
+	return &types.BackgroundTask{
+		ID:      id,
+		Name:    name,
+		StartTs: now,
+		EndTs:   0,
+	}, nil
+}
+
+func (s *MetadataStore) FinishedBackgroundTask(id int) error {
+	now := util.NowMillis()
+	_, err := s.statements.updateBackgroundTask.ExecContext(s.ctx, id, now)
+	return err
+}
+
+func (s *MetadataStore) GetBackgroundTask(id int) (*types.BackgroundTask, error) {
+	r := s.statements.selectBackgroundTask.QueryRowContext(s.ctx, id)
+	task := &types.BackgroundTask{}
+	var paramsStr string
+	var endTs sql.NullInt64
+
+	err := r.Scan(&task.ID, &task.Name, &paramsStr, &task.StartTs, &endTs)
+	if err != nil {
+		return nil, err
+	}
+
+	err = json.Unmarshal([]byte(paramsStr), &task.Params)
+	if err != nil {
+		return nil, err
+	}
+
+	if endTs.Valid {
+		task.EndTs = endTs.Int64
+	}
+
+	return task, nil
+}
+
+func (s *MetadataStore) GetAllBackgroundTasks() ([]*types.BackgroundTask, error) {
+	rows, err := s.statements.selectAllBackgroundTasks.QueryContext(s.ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	results := make([]*types.BackgroundTask, 0)
+	for rows.Next() {
+		task := &types.BackgroundTask{}
+		var paramsStr string
+		var endTs sql.NullInt64
+
+		err := rows.Scan(&task.ID, &task.Name, &paramsStr, &task.StartTs, &endTs)
+		if err != nil {
+			return nil, err
+		}
+
+		err = json.Unmarshal([]byte(paramsStr), &task.Params)
+		if err != nil {
+			return nil, err
+		}
+
+		if endTs.Valid {
+			task.EndTs = endTs.Int64
+		}
+
+		results = append(results, task)
+	}
+
+	return results, nil
+}
diff --git a/types/background_task.go b/types/background_task.go
new file mode 100644
index 00000000..9f7111dd
--- /dev/null
+++ b/types/background_task.go
@@ -0,0 +1,9 @@
+package types
+
+type BackgroundTask struct {
+	ID      int
+	Name    string
+	Params  map[string]interface{}
+	StartTs int64
+	EndTs   int64
+}
-- 
GitLab