Skip to content
Snippets Groups Projects
Commit 42b9363e authored by Travis Ralston's avatar Travis Ralston
Browse files

Early support for constructing archives

parent 1efadda1
No related branches found
No related tags found
No related merge requests found
package custom
import (
"net/http"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/api"
"github.com/turt2live/matrix-media-repo/common/config"
"github.com/turt2live/matrix-media-repo/controllers/data_controller"
"github.com/turt2live/matrix-media-repo/util"
)
type ExportStarted struct {
ExportID string `json:"export_id"`
TaskID int `json:"task_id"`
}
func ExportUserData(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
if !config.Get().Archiving.Enabled {
return api.BadRequest("archiving is not enabled")
}
isAdmin := util.IsGlobalAdmin(user.UserId) || user.IsShared
if !config.Get().Archiving.SelfService && !isAdmin {
return api.AuthFailed()
}
includeData := r.URL.Query().Get("include_data") != "false"
s3urls := r.URL.Query().Get("s3_urls") != "false"
params := mux.Vars(r)
userId := params["userId"]
if !isAdmin && user.UserId != userId {
return api.BadRequest("cannot export data for another user")
}
log = log.WithFields(logrus.Fields{
"exportUserId": userId,
"includeData": includeData,
"s3urls": s3urls,
})
task, exportId, err := data_controller.StartUserExport(userId, s3urls, includeData, log)
if err != nil {
log.Error(err)
return api.InternalServerError("fatal error starting export")
}
return &api.DoNotCacheResponse{Payload: &ExportStarted{
TaskID: task.ID,
ExportID: exportId,
}}
}
func ViewExport(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
if !config.Get().Archiving.Enabled {
return api.BadRequest("archiving is not enabled")
}
return api.EmptyResponse{}
}
func GetExportMetadata(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
if !config.Get().Archiving.Enabled {
return api.BadRequest("archiving is not enabled")
}
return api.EmptyResponse{}
}
func DownloadExportPart(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
if !config.Get().Archiving.Enabled {
return api.BadRequest("archiving is not enabled")
}
return api.EmptyResponse{}
}
func DeleteExport(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
if !config.Get().Archiving.Enabled {
return api.BadRequest("archiving is not enabled")
}
return api.EmptyResponse{}
}
......@@ -54,6 +54,11 @@ func Init() {
getBackgroundTaskHandler := handler{api.RepoAdminRoute(custom.GetTask), "get_background_task", counter, false}
listAllBackgroundTasksHandler := handler{api.RepoAdminRoute(custom.ListAllTasks), "list_all_background_tasks", counter, false}
listUnfinishedBackgroundTasksHandler := handler{api.RepoAdminRoute(custom.ListUnfinishedTasks), "list_unfinished_background_tasks", counter, false}
exportUserDataHandler := handler{api.AccessTokenRequiredRoute(custom.ExportUserData), "export_user_data", counter, false}
viewExportHandler := handler{api.AccessTokenOptionalRoute(custom.ViewExport), "view_export", counter, false}
getExportMetadataHandler := handler{api.AccessTokenOptionalRoute(custom.GetExportMetadata), "get_export_metadata", counter, false}
downloadExportPartHandler := handler{api.AccessTokenOptionalRoute(custom.DownloadExportPart), "download_export_part", counter, false}
deleteExportHandler := handler{api.AccessTokenOptionalRoute(custom.DeleteExport), "delete_export", counter, false}
routes := make(map[string]route)
versions := []string{"r0", "v1", "unstable"} // r0 is typically clients and v1 is typically servers. v1 is deprecated.
......@@ -90,6 +95,11 @@ func Init() {
routes["/_matrix/media/"+version+"/admin/tasks/{taskId:[0-9]+}"] = route{"GET", getBackgroundTaskHandler}
routes["/_matrix/media/"+version+"/admin/tasks/all"] = route{"GET", listAllBackgroundTasksHandler}
routes["/_matrix/media/"+version+"/admin/tasks/unfinished"] = route{"GET", listUnfinishedBackgroundTasksHandler}
routes["/_matrix/media/"+version+"/admin/user/{userId:[^/]+}/export"] = route{"POST", exportUserDataHandler}
routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/view"] = route{"GET", viewExportHandler}
routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/metadata"] = route{"GET", getExportMetadataHandler}
routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/part/{partId:[0-9]+}"] = route{"GET", downloadExportPartHandler}
routes["/_matrix/media/"+version+"/admin/export/{exportId:[a-zA-Z0-9.:\\-_]+}/delete"] = route{"DELETE", deleteExportHandler}
// Routes that we should handle but aren't in the media namespace (synapse compat)
routes["/_matrix/client/"+version+"/admin/purge_media_cache"] = route{"POST", purgeRemote}
......
......@@ -3,5 +3,6 @@ package common
const KindLocalMedia = "local_media"
const KindRemoteMedia = "remote_media"
const KindThumbnails = "thumbnails"
const KindArchives = "archives"
var AllKinds = []string{KindLocalMedia, KindRemoteMedia, KindThumbnails}
package data_controller
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/common"
"github.com/turt2live/matrix-media-repo/common/config"
"github.com/turt2live/matrix-media-repo/storage"
"github.com/turt2live/matrix-media-repo/storage/datastore"
"github.com/turt2live/matrix-media-repo/types"
"github.com/turt2live/matrix-media-repo/util"
)
type manifestRecord struct {
FileName string `json:"name"`
ArchivedName string `json:"file_name"`
SizeBytes int64 `json:"size_bytes"`
ContentType string `json:"content_type"`
S3Url string `json:"s3_url"`
Sha256 string `json:"sha256"`
Origin string `json:"origin"`
MediaId string `json:"media_id"`
CreatedTs int64 `json:"created_ts"`
}
type manifest struct {
Version int `json:"version"`
UserId string `json:"user_id"`
CreatedTs int64 `json:"created_ts"`
Media map[string]*manifestRecord `json:"media"`
}
func StartUserExport(userId string, s3urls bool, includeData bool, log *logrus.Entry) (*types.BackgroundTask, string, error) {
ctx := context.Background()
exportId, err := util.GenerateRandomString(128)
if err != nil {
return nil, "", err
}
db := storage.GetDatabase().GetMetadataStore(ctx, log)
task, err := db.CreateBackgroundTask("export_data", map[string]interface{}{
"user_id": userId,
"include_s3_urls": s3urls,
"include_data": includeData,
"export_id": exportId,
})
if err != nil {
return nil, "", err
}
go func() {
ds, err := datastore.PickDatastore(common.KindArchives, ctx, log)
if err != nil {
log.Error(err)
return
}
mediaDb := storage.GetDatabase().GetMediaStore(ctx, log)
media, err := mediaDb.GetMediaByUser(userId)
if err != nil {
log.Error(err)
return
}
var currentTar *tar.Writer
var currentTarBytes bytes.Buffer
part := 0
parts := make([]*types.ObjectInfo, 0)
currentSize := int64(0)
persistTar := func() error {
currentTar.Close()
// compress
log.Info("Compressing tar file")
gzipBytes := bytes.Buffer{}
archiver := gzip.NewWriter(&gzipBytes)
archiver.Name = fmt.Sprintf("export-part-%d.tar", part)
_, err := io.Copy(archiver, util.BufferToStream(bytes.NewBuffer(currentTarBytes.Bytes())))
if err != nil {
return err
}
archiver.Close()
log.Info("Uploading compressed tar file")
buf := bytes.NewBuffer(gzipBytes.Bytes())
obj, err := ds.UploadFile(util.BufferToStream(buf), int64(buf.Len()), ctx, log)
if err != nil {
return err
}
parts = append(parts, obj)
return nil
}
newTar := func() error {
if part > 0 {
log.Info("Persisting complete tar file")
err := persistTar()
if err != nil {
return err
}
}
log.Info("Starting new tar file")
currentTarBytes = bytes.Buffer{}
currentTar = tar.NewWriter(&currentTarBytes)
part = part + 1
currentSize = 0
return nil
}
// Start the first tar file
log.Info("Creating first tar file")
err = newTar()
if err != nil {
log.Error(err)
return
}
putFile := func(name string, size int64, creationTime time.Time, file io.Reader) error {
header := &tar.Header{
Name: name,
Size: size,
Mode: int64(0644),
ModTime: creationTime,
}
err := currentTar.WriteHeader(header)
if err != nil {
log.Error("error writing header")
return err
}
i, err := io.Copy(currentTar, file)
if err != nil {
log.Error("error writing file")
return err
}
currentSize += i
return nil
}
archivedName := func(m *types.Media) string {
// TODO: Pick the right extension for the file type
return fmt.Sprintf("%s__%s.obj", m.Origin, m.MediaId)
}
// Build a manifest first (JSON)
log.Info("Building manifest")
mediaManifest := make(map[string]*manifestRecord)
for _, m := range media {
mediaManifest[m.MxcUri()] = &manifestRecord{
ArchivedName: archivedName(m),
FileName: m.UploadName,
SizeBytes: m.SizeBytes,
ContentType: m.ContentType,
S3Url: "TODO",
Sha256: m.Sha256Hash,
Origin: m.Origin,
MediaId: m.MediaId,
CreatedTs: m.CreationTs,
}
}
manifest := &manifest{
Version: 1,
UserId: userId,
CreatedTs: util.NowMillis(),
Media: mediaManifest,
}
b, err := json.Marshal(manifest)
if err != nil {
log.Error(err)
return
}
log.Info("Writing manifest")
err = putFile("manifest.json", int64(len(b)), time.Now(), util.BufferToStream(bytes.NewBuffer(b)))
if err != nil {
log.Error(err)
return
}
if includeData {
log.Info("Including data in the archive")
for _, m := range media {
log.Info("Downloading ", m.MxcUri())
s, err := datastore.DownloadStream(ctx, log, m.DatastoreId, m.Location)
if err != nil {
log.Error(err)
continue
}
log.Infof("Copying %s to memory", m.MxcUri())
b := bytes.Buffer{}
_, err = io.Copy(&b, s)
if err != nil {
log.Error(err)
continue
}
s.Close()
s = util.BufferToStream(bytes.NewBuffer(b.Bytes()))
log.Info("Archiving ", m.MxcUri())
err = putFile(archivedName(m), m.SizeBytes, time.Unix(0, m.CreationTs*int64(time.Millisecond)), s)
if err != nil {
log.Error(err)
return
}
if currentSize >= config.Get().Archiving.TargetBytesPerPart {
log.Info("Rotating tar")
err = newTar()
if err != nil {
log.Error(err)
return
}
}
}
}
if currentSize > 0 {
log.Info("Persisting last tar")
err = persistTar()
if err != nil {
log.Error(err)
return
}
}
log.Info("Finishing export task")
err = db.FinishedBackgroundTask(task.ID)
if err != nil {
log.Error(err)
log.Error("Failed to flag task as finished")
}
log.Info("Finished export")
}()
return task, exportId, nil
}
......@@ -317,17 +317,9 @@ Exports (and therefore imports) are currently done on a per-user basis. This is
#### Exporting data for a user
URL: `POST /_matrix/media/unstable/admin/user/<user ID>/export`
URL: `POST /_matrix/media/unstable/admin/user/<user ID>/export?include_data=true&s3_urls=true`
The request body is:
```json
{
"include_data": true,
"s3_urls": true
}
```
Both fields are optional, and their default values are shown. If `include_data` is false, only metadata will be returned by the export. `s3_urls`, when true, includes the s3 URL to the media in the metadata if one is available.
Both query params are optional, and their default values are shown. If `include_data` is false, only metadata will be returned by the export. `s3_urls`, when true, includes the s3 URL to the media in the metadata if one is available.
The response is a task ID and export ID to put into the 'view export' URL:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment