From 42b9363ec904f0b29cdcac58ebce75893234ac74 Mon Sep 17 00:00:00 2001
From: Travis Ralston <travpc@gmail.com>
Date: Wed, 23 Oct 2019 23:09:27 -0600
Subject: [PATCH] Early support for constructing archives

---
 api/custom/exports.go                         | 100 +++++++
 api/webserver/webserver.go                    |  10 +
 common/media_kinds.go                         |   1 +
 .../data_controller/export_controller.go      | 280 ++++++++++++++++++
 docs/admin.md                                 |  12 +-
 5 files changed, 393 insertions(+), 10 deletions(-)
 create mode 100644 api/custom/exports.go
 create mode 100644 controllers/data_controller/export_controller.go

diff --git a/api/custom/exports.go b/api/custom/exports.go
new file mode 100644
index 00000000..34a8c45a
--- /dev/null
+++ b/api/custom/exports.go
@@ -0,0 +1,100 @@
+package custom
+
+import (
+	"net/http"
+
+	"github.com/gorilla/mux"
+	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/api"
+	"github.com/turt2live/matrix-media-repo/common/config"
+	"github.com/turt2live/matrix-media-repo/controllers/data_controller"
+	"github.com/turt2live/matrix-media-repo/util"
+)
+
+// ExportStarted is the response payload returned when an export has been started.
+type ExportStarted struct {
+	ExportID string `json:"export_id"`
+	TaskID   int    `json:"task_id"`
+}
+
+// ExportUserData starts an export for the user named by the userId route variable and
+// responds with the background task ID and the export ID.
+//
+// Archiving must be enabled in the config. Callers count as admins when
+// util.IsGlobalAdmin accepts them or user.IsShared is set; non-admins are rejected
+// unless Archiving.SelfService is on, and may only export their own user ID. The
+// include_data and s3_urls query parameters are treated as true unless they are exactly
+// "false".
+func ExportUserData(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	if !config.Get().Archiving.Enabled {
+		return api.BadRequest("archiving is not enabled")
+	}
+
+	isAdmin := util.IsGlobalAdmin(user.UserId) || user.IsShared
+	if !config.Get().Archiving.SelfService && !isAdmin {
+		return api.AuthFailed()
+	}
+
+	includeData := r.URL.Query().Get("include_data") != "false"
+	s3urls := r.URL.Query().Get("s3_urls") != "false"
+
+	params := mux.Vars(r)
+
+	userId := params["userId"]
+
+	if !isAdmin && user.UserId != userId {
+		return api.BadRequest("cannot export data for another user")
+	}
+
+	log = log.WithFields(logrus.Fields{
+		"exportUserId": userId,
+		"includeData":  includeData,
+		"s3urls":       s3urls,
+	})
+	task, exportId, err := data_controller.StartUserExport(userId, s3urls, includeData, log)
+	if err != nil {
+		log.Error(err)
+		return api.InternalServerError("fatal error starting export")
+	}
+
+	return &api.DoNotCacheResponse{Payload: &ExportStarted{
+		TaskID:   task.ID,
+		ExportID: exportId,
+	}}
+}
+
+// ViewExport is a placeholder: it only checks that archiving is enabled and returns an empty response.
+func ViewExport(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	if !config.Get().Archiving.Enabled {
+		return api.BadRequest("archiving is not enabled")
+	}
+
+	return api.EmptyResponse{}
+}
+
+// GetExportMetadata is a placeholder: it only checks that archiving is enabled and returns an empty response.
+func GetExportMetadata(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	if !config.Get().Archiving.Enabled {
+		return api.BadRequest("archiving is not enabled")
+	}
+
+	return api.EmptyResponse{}
+}
+
+// DownloadExportPart is a placeholder: it only checks that archiving is enabled and returns an empty response.
+func DownloadExportPart(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	if !config.Get().Archiving.Enabled {
+		return api.BadRequest("archiving is not enabled")
+	}
+
+	return api.EmptyResponse{}
+}
+
+// DeleteExport is a placeholder: it only checks that archiving is enabled and returns an empty response.
+func DeleteExport(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+	if !config.Get().Archiving.Enabled {
+		return api.BadRequest("archiving is not enabled")
+	}
+
+	return api.EmptyResponse{}
+}
diff --git a/api/webserver/webserver.go b/api/webserver/webserver.go
index bf09ae36..947c6130 100644
--- a/api/webserver/webserver.go
+++ b/api/webserver/webserver.go
@@ -54,6 +54,11 @@ func Init() {
 	getBackgroundTaskHandler := handler{api.RepoAdminRoute(custom.GetTask), "get_background_task", counter, false}
 	listAllBackgroundTasksHandler := handler{api.RepoAdminRoute(custom.ListAllTasks), "list_all_background_tasks", counter, false}
 	listUnfinishedBackgroundTasksHandler := handler{api.RepoAdminRoute(custom.ListUnfinishedTasks), "list_unfinished_background_tasks", counter, false}
+	exportUserDataHandler := handler{api.AccessTokenRequiredRoute(custom.ExportUserData), "export_user_data", counter, false}
+	viewExportHandler := handler{api.AccessTokenOptionalRoute(custom.ViewExport), "view_export", counter, false}
+	getExportMetadataHandler := handler{api.AccessTokenOptionalRoute(custom.GetExportMetadata), "get_export_metadata", counter, false}
+	downloadExportPartHandler := handler{api.AccessTokenOptionalRoute(custom.DownloadExportPart), "download_export_part", counter, false}
+	deleteExportHandler := handler{api.AccessTokenOptionalRoute(custom.DeleteExport), "delete_export", counter, false}
 
 	routes := make(map[string]route)
 	versions := []string{"r0", "v1", "unstable"} // r0 is typically clients and v1 is typically servers. v1 is deprecated.
@@ -90,6 +95,11 @@ func Init() {
 		routes["/_matrix/media/"+version+"/admin/tasks/{taskId:[0-9]+}"] = route{"GET", getBackgroundTaskHandler}
 		routes["/_matrix/media/"+version+"/admin/tasks/all"] = route{"GET", listAllBackgroundTasksHandler}
 		routes["/_matrix/media/"+version+"/admin/tasks/unfinished"] = route{"GET", listUnfinishedBackgroundTasksHandler}
+		routes["/_matrix/media/"+version+"/admin/user/{userId:[^/]+}/export"] = route{"POST", exportUserDataHandler}
+		routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/view"] = route{"GET", viewExportHandler}
+		routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/metadata"] = route{"GET", getExportMetadataHandler}
+		routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/part/{partId:[0-9]+}"] = route{"GET", downloadExportPartHandler}
+		routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/delete"] = route{"DELETE", deleteExportHandler}
 
 		// Routes that we should handle but aren't in the media namespace (synapse compat)
 		routes["/_matrix/client/"+version+"/admin/purge_media_cache"] = route{"POST", purgeRemote}
diff --git a/common/media_kinds.go b/common/media_kinds.go
index 0f1b7661..4ea68567 100644
--- a/common/media_kinds.go
+++ b/common/media_kinds.go
@@ -3,5 +3,6 @@ package common
 const KindLocalMedia = "local_media"
 const KindRemoteMedia = "remote_media"
 const KindThumbnails = "thumbnails"
+const KindArchives = "archives"
 
 var AllKinds = []string{KindLocalMedia, KindRemoteMedia, KindThumbnails}
diff --git a/controllers/data_controller/export_controller.go b/controllers/data_controller/export_controller.go
new file mode 100644
index 00000000..5b2c7a78
--- /dev/null
+++ b/controllers/data_controller/export_controller.go
@@ -0,0 +1,280 @@
+package data_controller
+
+import (
+	"archive/tar"
+	"bytes"
+	"compress/gzip"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/common"
+	"github.com/turt2live/matrix-media-repo/common/config"
+	"github.com/turt2live/matrix-media-repo/storage"
+	"github.com/turt2live/matrix-media-repo/storage/datastore"
+	"github.com/turt2live/matrix-media-repo/types"
+	"github.com/turt2live/matrix-media-repo/util"
+)
+
+// manifestRecord describes one media item listed in an export's manifest.json.
+type manifestRecord struct {
+	FileName     string `json:"name"`
+	ArchivedName string `json:"file_name"`
+	SizeBytes    int64  `json:"size_bytes"`
+	ContentType  string `json:"content_type"`
+	S3Url        string `json:"s3_url"`
+	Sha256       string `json:"sha256"`
+	Origin       string `json:"origin"`
+	MediaId      string `json:"media_id"`
+	CreatedTs    int64  `json:"created_ts"`
+}
+
+// manifest is the JSON document stored as manifest.json in the first tar of an export.
+type manifest struct {
+	Version   int                        `json:"version"`
+	UserId    string                     `json:"user_id"`
+	CreatedTs int64                      `json:"created_ts"`
+	Media     map[string]*manifestRecord `json:"media"`
+}
+
+// StartUserExport records an "export_data" background task for userId and then builds
+// the export in a background goroutine. It returns the created task and a randomly
+// generated export ID once the task has been created; the archive itself is still being
+// built when this function returns.
+//
+// The goroutine writes manifest.json into a tar and, when includeData is true, appends
+// each of the user's media files, starting a new tar whenever the configured
+// Archiving.TargetBytesPerPart is reached. Every tar is gzipped and uploaded to the
+// datastore picked for common.KindArchives. s3urls is only stored in the task
+// parameters for now; the manifest's s3_url field is still the literal "TODO".
+//
+// NOTE(review): every early return inside the goroutine leaves the background task
+// unfinished, because only the success path calls FinishedBackgroundTask.
+func StartUserExport(userId string, s3urls bool, includeData bool, log *logrus.Entry) (*types.BackgroundTask, string, error) {
+	ctx := context.Background()
+
+	exportId, err := util.GenerateRandomString(128)
+	if err != nil {
+		return nil, "", err
+	}
+
+	db := storage.GetDatabase().GetMetadataStore(ctx, log)
+	task, err := db.CreateBackgroundTask("export_data", map[string]interface{}{
+		"user_id":         userId,
+		"include_s3_urls": s3urls,
+		"include_data":    includeData,
+		"export_id":       exportId,
+	})
+
+	if err != nil {
+		return nil, "", err
+	}
+
+	go func() {
+		ds, err := datastore.PickDatastore(common.KindArchives, ctx, log)
+		if err != nil {
+			log.Error(err)
+			return
+		}
+
+		mediaDb := storage.GetDatabase().GetMediaStore(ctx, log)
+		media, err := mediaDb.GetMediaByUser(userId)
+		if err != nil {
+			log.Error(err)
+			return
+		}
+
+		var currentTar *tar.Writer
+		var currentTarBytes bytes.Buffer
+		part := 0
+		parts := make([]*types.ObjectInfo, 0)
+		currentSize := int64(0)
+
+		// persistTar finalizes the current tar, gzips it and uploads it as one export part.
+		persistTar := func() error {
+			// Close writes the tar footer; a failure here means the archive is incomplete.
+			if err := currentTar.Close(); err != nil {
+				return err
+			}
+
+			// compress
+			log.Info("Compressing tar file")
+			gzipBytes := bytes.Buffer{}
+			archiver := gzip.NewWriter(&gzipBytes)
+			archiver.Name = fmt.Sprintf("export-part-%d.tar", part)
+			_, err := io.Copy(archiver, util.BufferToStream(bytes.NewBuffer(currentTarBytes.Bytes())))
+			if err != nil {
+				return err
+			}
+			// Close flushes the remaining compressed data, so its error must be checked
+			// before gzipBytes is uploaded.
+			if err = archiver.Close(); err != nil {
+				return err
+			}
+
+			log.Info("Uploading compressed tar file")
+			buf := bytes.NewBuffer(gzipBytes.Bytes())
+			obj, err := ds.UploadFile(util.BufferToStream(buf), int64(buf.Len()), ctx, log)
+			if err != nil {
+				return err
+			}
+			parts = append(parts, obj)
+
+			return nil
+		}
+
+		// newTar persists the tar in progress (if any) and starts the next part.
+		newTar := func() error {
+			if part > 0 {
+				log.Info("Persisting complete tar file")
+				err := persistTar()
+				if err != nil {
+					return err
+				}
+			}
+
+			log.Info("Starting new tar file")
+			currentTarBytes = bytes.Buffer{}
+			currentTar = tar.NewWriter(&currentTarBytes)
+			part = part + 1
+			currentSize = 0
+
+			return nil
+		}
+
+		// Start the first tar file
+		log.Info("Creating first tar file")
+		err = newTar()
+		if err != nil {
+			log.Error(err)
+			return
+		}
+
+		// putFile appends one entry to the current tar and tracks the bytes written.
+		putFile := func(name string, size int64, creationTime time.Time, file io.Reader) error {
+			header := &tar.Header{
+				Name:    name,
+				Size:    size,
+				Mode:    int64(0644),
+				ModTime: creationTime,
+			}
+			err := currentTar.WriteHeader(header)
+			if err != nil {
+				log.Error("error writing header")
+				return err
+			}
+
+			i, err := io.Copy(currentTar, file)
+			if err != nil {
+				log.Error("error writing file")
+				return err
+			}
+
+			currentSize += i
+
+			return nil
+		}
+
+		archivedName := func(m *types.Media) string {
+			// TODO: Pick the right extension for the file type
+			return fmt.Sprintf("%s__%s.obj", m.Origin, m.MediaId)
+		}
+
+		// Build a manifest first (JSON)
+		log.Info("Building manifest")
+		mediaManifest := make(map[string]*manifestRecord)
+		for _, m := range media {
+			mediaManifest[m.MxcUri()] = &manifestRecord{
+				ArchivedName: archivedName(m),
+				FileName:     m.UploadName,
+				SizeBytes:    m.SizeBytes,
+				ContentType:  m.ContentType,
+				S3Url:        "TODO",
+				Sha256:       m.Sha256Hash,
+				Origin:       m.Origin,
+				MediaId:      m.MediaId,
+				CreatedTs:    m.CreationTs,
+			}
+		}
+		exportManifest := &manifest{
+			Version:   1,
+			UserId:    userId,
+			CreatedTs: util.NowMillis(),
+			Media:     mediaManifest,
+		}
+		b, err := json.Marshal(exportManifest)
+		if err != nil {
+			log.Error(err)
+			return
+		}
+
+		log.Info("Writing manifest")
+		err = putFile("manifest.json", int64(len(b)), time.Now(), util.BufferToStream(bytes.NewBuffer(b)))
+		if err != nil {
+			log.Error(err)
+			return
+		}
+
+		if includeData {
+			log.Info("Including data in the archive")
+			for _, m := range media {
+				log.Info("Downloading ", m.MxcUri())
+				s, err := datastore.DownloadStream(ctx, log, m.DatastoreId, m.Location)
+				if err != nil {
+					log.Error(err)
+					continue
+				}
+
+				log.Infof("Copying %s to memory", m.MxcUri())
+				b := bytes.Buffer{}
+				_, err = io.Copy(&b, s)
+				// Close before checking the copy error so a failed read does not leak the stream.
+				s.Close()
+				if err != nil {
+					log.Error(err)
+					continue
+				}
+
+				log.Info("Archiving ", m.MxcUri())
+				// Size the entry from the bytes actually read; a header that disagrees with
+				// the payload makes the tar writer fail.
+				err = putFile(archivedName(m), int64(b.Len()), time.Unix(0, m.CreationTs*int64(time.Millisecond)), &b)
+				if err != nil {
+					log.Error(err)
+					return
+				}
+
+				if currentSize >= config.Get().Archiving.TargetBytesPerPart {
+					log.Info("Rotating tar")
+					err = newTar()
+					if err != nil {
+						log.Error(err)
+						return
+					}
+				}
+			}
+		}
+
+		if currentSize > 0 {
+			log.Info("Persisting last tar")
+			err = persistTar()
+			if err != nil {
+				log.Error(err)
+				return
+			}
+		}
+
+		log.Info("Finishing export task")
+		err = db.FinishedBackgroundTask(task.ID)
+		if err != nil {
+			log.Error(err)
+			log.Error("Failed to flag task as finished")
+		}
+		log.Info("Finished export")
+	}()
+
+	return task, exportId, nil
+}
diff --git a/docs/admin.md b/docs/admin.md
index 032aa3d1..5278ce1d 100644
--- a/docs/admin.md
+++ b/docs/admin.md
@@ -317,17 +317,9 @@ Exports (and therefore imports) are currently done on a per-user basis. This is
 
 #### Exporting data for a user
 
-URL: `POST /_matrix/media/unstable/admin/user/<user ID>/export`
+URL: `POST /_matrix/media/unstable/admin/user/<user ID>/export?include_data=true&s3_urls=true`
 
-The request body is:
-```json
-{
-  "include_data": true,
-  "s3_urls": true
-}
-```
-
-Both fields are optional, and their default values are shown. If `include_data` is false, only metadata will be returned by the export. `s3_urls`, when true, includes the s3 URL to the media in the metadata if one is available.
+Both query params are optional, and their default values are shown. If `include_data` is false, only metadata will be returned by the export. `s3_urls`, when true, includes the s3 URL to the media in the metadata if one is available.
 
 The response is a task ID and export ID to put into the 'view export' URL:
 
-- 
GitLab