diff --git a/archival/entity_export.go b/archival/entity_export.go
index c3e748aa56d9b20b1a0247ae5f8d0e6a71e01695..262c28d4759647215933774125347a193f7e493b 100644
--- a/archival/entity_export.go
+++ b/archival/entity_export.go
@@ -61,7 +61,7 @@ func ExportEntityData(ctx rcontext.RequestContext, exportId string, entityId str
 			// "should never happen" because we downloaded the file, in theory
 			ctx.Log.Warnf("Cannot populate S3 URL for %s because datastore for media could not be found", mxc)
 		} else {
-			s3url, err = datastores.GetS3Url(dsConf)
+			s3url, err = datastores.GetS3Url(dsConf, media.Location)
 			if err != nil {
 				ctx.Log.Warnf("Cannot populate S3 URL for %s because there was an error getting S3 information: %s", mxc, err)
 			}
diff --git a/archival/v2archive/manifest.go b/archival/v2archive/manifest.go
index 3ebfedfd20d573ac7a0b52aa0e62e7a98b460448..15a4e1772e646f2ff34f3662c56ab93a715a19d4 100644
--- a/archival/v2archive/manifest.go
+++ b/archival/v2archive/manifest.go
@@ -1,10 +1,19 @@
 package v2archive
 
+const (
+	ManifestVersionV1 = 1
+	ManifestVersionV2 = 2
+)
+const ManifestVersion = ManifestVersionV2
+
 type Manifest struct {
 	Version   int                        `json:"version"`
 	EntityId  string                     `json:"entity_id"`
 	CreatedTs int64                      `json:"created_ts"`
 	Media     map[string]*ManifestRecord `json:"media"`
+
+	// Deprecated: for v1 manifests, now called EntityId
+	UserId string `json:"user_id,omitempty"`
 }
 
 type ManifestRecord struct {
diff --git a/archival/v2archive/reader.go b/archival/v2archive/reader.go
new file mode 100644
index 0000000000000000000000000000000000000000..5ee6cd0741a3955663164835ad49aba372fac0ef
--- /dev/null
+++ b/archival/v2archive/reader.go
@@ -0,0 +1,264 @@
+package v2archive
+
+import (
+	"archive/tar"
+	"compress/gzip"
+	"encoding/json"
+	"errors"
+	"io"
+
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/database"
+	"github.com/turt2live/matrix-media-repo/datastores"
+	"github.com/turt2live/matrix-media-repo/pipelines/pipeline_upload"
+	"github.com/turt2live/matrix-media-repo/util"
+)
+
+type ProcessOpts struct {
+	LockedEntityId    string
+	CheckUploadedOnly bool
+}
+
+type ArchiveReader struct {
+	ctx rcontext.RequestContext
+
+	manifest        *Manifest
+	uploaded        map[string]bool
+	fileNamesToMxcs map[string][]string
+}
+
+func NewReader(ctx rcontext.RequestContext) *ArchiveReader {
+	return &ArchiveReader{
+		ctx:             ctx,
+		manifest:        nil,
+		uploaded:        make(map[string]bool),
+		fileNamesToMxcs: make(map[string][]string),
+	}
+}
+
+type archiveWorkFn = func(header *tar.Header, f io.Reader) error
+
+func readArchive(file io.ReadCloser, workFn archiveWorkFn) error {
+	archiver, err := gzip.NewReader(file)
+	if err != nil {
+		return err
+	}
+	defer func(archiver *gzip.Reader) {
+		_ = archiver.Close()
+	}(archiver)
+
+	tarFile := tar.NewReader(archiver)
+	for {
+		header, err := tarFile.Next()
+		if err == io.EOF {
+			break // we're done
+		}
+		if err != nil {
+			return err
+		}
+
+		if header == nil {
+			continue // skip weird file
+		}
+		if header.Typeflag != tar.TypeReg {
+			continue // skip directories and other stuff
+		}
+
+		err = workFn(header, tarFile)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *ArchiveReader) GetEntityId() string {
+	if r.manifest != nil {
+		return r.manifest.EntityId
+	}
+	return ""
+}
+
+func (r *ArchiveReader) GetNotUploadedMxcUris() []string {
+	uris := make([]string, 0)
+	for k, v := range r.uploaded {
+		if !v {
+			uris = append(uris, k)
+		}
+	}
+	return uris
+}
+
+func (r *ArchiveReader) TryGetManifestFrom(file io.ReadCloser) (bool, error) {
+	defer file.Close()
+	if r.manifest != nil {
+		return false, errors.New("manifest already discovered")
+	}
+
+	err := readArchive(file, func(header *tar.Header, f io.Reader) error {
+		if header.Name == "manifest.json" {
+			manifest := &Manifest{}
+			decoder := json.NewDecoder(f)
+			err := decoder.Decode(&manifest)
+			if err != nil {
+				return err
+			}
+			if manifest.Version == ManifestVersionV1 {
+				manifest.EntityId = manifest.UserId
+				manifest.Version = ManifestVersionV2
+				r.ctx.Log.Debug("Upgraded manifest to v2")
+			}
+			if manifest.Version != ManifestVersionV2 {
+				// We only support the one version for now.
+				return errors.New("unsupported manifest version")
+			}
+			if manifest.EntityId == "" {
+				return errors.New("invalid manifest: no entity")
+			}
+			if manifest.Media == nil {
+				return errors.New("invalid manifest: no media")
+			}
+			r.ctx.Log.Infof("Using manifest for %s (v%d) created %d", manifest.EntityId, manifest.Version, manifest.CreatedTs)
+			r.manifest = manifest
+
+			for k, v := range r.manifest.Media {
+				r.uploaded[k] = false
+				if _, ok := r.fileNamesToMxcs[v.ArchivedName]; !ok {
+					r.fileNamesToMxcs[v.ArchivedName] = make([]string, 0)
+				}
+				r.fileNamesToMxcs[v.ArchivedName] = append(r.fileNamesToMxcs[v.ArchivedName], k)
+			}
+
+			return nil
+		}
+		return nil
+	})
+	return r.manifest != nil, err
+}
+
+func (r *ArchiveReader) ProcessS3Files(opts ProcessOpts) error {
+	if r.manifest == nil {
+		return errors.New("missing manifest")
+	}
+
+	missing := r.GetNotUploadedMxcUris()
+	for _, mxc := range missing {
+		metadata := r.manifest.Media[mxc]
+		if metadata.S3Url != "" {
+			dsConf, location, err := datastores.ParseS3Url(metadata.S3Url)
+			if err != nil {
+				r.ctx.Log.Warn("Error while parsing S3 URL for ", mxc, err)
+				continue
+			}
+			f, err := datastores.Download(r.ctx, dsConf, location)
+			if err != nil {
+				r.ctx.Log.Warn("Error while downloading from S3 URL for ", mxc, err)
+				continue
+			}
+			err = r.importFileFromStream(metadata.FileName, f, opts)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func (r *ArchiveReader) ProcessFile(file io.ReadCloser, opts ProcessOpts) error {
+	defer file.Close()
+	if r.manifest == nil {
+		return errors.New("missing manifest")
+	}
+
+	err := readArchive(file, func(header *tar.Header, f io.Reader) error {
+		r.ctx.Log.Debugf("Processing file in tar: %s", header.Name)
+		return r.importFileFromStream(header.Name, io.NopCloser(f), opts)
+	})
+	return err
+}
+
+func (r *ArchiveReader) importFileFromStream(fileName string, f io.ReadCloser, opts ProcessOpts) error {
+	mxcs, ok := r.fileNamesToMxcs[fileName]
+	if !ok {
+		r.ctx.Log.Debugf("File %s does not map to an MXC URI", fileName)
+		return nil // ignore file
+	}
+
+	db := database.GetInstance().Media.Prepare(r.ctx)
+	for _, mxc := range mxcs {
+		if r.uploaded[mxc] {
+			continue // ignore duplicate file
+		}
+
+		metadata := r.manifest.Media[mxc]
+		genMxc := util.MxcUri(metadata.Origin, metadata.MediaId)
+		if genMxc != mxc {
+			r.ctx.Log.Warnf("File name maps to %s but expected %s from metadata - skipping file", genMxc, mxc)
+			continue
+		}
+		if metadata.Uploader != "" {
+			_, s, err := util.SplitUserId(metadata.Uploader)
+			if err != nil {
+				r.ctx.Log.Warnf("Invalid user ID in metadata: %s (media %s)", metadata.Uploader, mxc)
+				metadata.Uploader = ""
+			} else {
+				if s != metadata.Origin {
+					r.ctx.Log.Warnf("File has uploader on %s but MXC URI is for %s - skipping file", s, metadata.Origin)
+					continue
+				}
+			}
+		}
+
+		if opts.LockedEntityId != "" {
+			if opts.LockedEntityId[0] == '@' && metadata.Uploader != opts.LockedEntityId {
+				r.ctx.Log.Warnf("Found media uploaded by %s but locked to %s - skipping file", metadata.Uploader, opts.LockedEntityId)
+				continue
+			}
+			if opts.LockedEntityId[0] != '@' && metadata.Origin != opts.LockedEntityId {
+				r.ctx.Log.Warnf("Found media uploaded by server %s but locked to %s - skipping file", metadata.Origin, opts.LockedEntityId)
+				continue
+			}
+		}
+
+		record, err := db.GetById(metadata.Origin, metadata.MediaId)
+		if err != nil {
+			return err
+		}
+		if record != nil {
+			r.uploaded[mxc] = true
+			continue
+		}
+
+		if opts.CheckUploadedOnly {
+			continue
+		}
+
+		serverName := r.manifest.EntityId
+		userId := metadata.Uploader
+		if userId == "" || userId[0] != '@' {
+			userId = ""
+		} else {
+			_, s, err := util.SplitUserId(userId)
+			if err != nil {
+				r.ctx.Log.Warnf("Invalid user ID: %s (media %s)", userId, mxc)
+				serverName = ""
+			} else {
+				serverName = s
+			}
+		}
+		kind := datastores.LocalMediaKind
+		if !util.IsServerOurs(serverName) {
+			kind = datastores.RemoteMediaKind
+		}
+
+		r.ctx.Log.Debugf("Importing file %s as kind %s", mxc, kind)
+		if _, err = pipeline_upload.Execute(r.ctx, metadata.Origin, metadata.MediaId, f, metadata.ContentType, metadata.FileName, metadata.Uploader, kind); err != nil {
+			return err
+		}
+		r.uploaded[mxc] = true
+	}
+
+	return nil
+}
diff --git a/archival/v2archive/writer.go b/archival/v2archive/writer.go
index 7d934078ae984762de49ea7594ed462aae412ac0..7e11138aedb0f841b3cb024d0d29bbd5cb069e4c 100644
--- a/archival/v2archive/writer.go
+++ b/archival/v2archive/writer.go
@@ -237,7 +237,7 @@ func (w *ArchiveWriter) Finish() error {
 	w.writingManifest = true
 	defer func() { w.writingManifest = false }()
 	manifest := &Manifest{
-		Version:   2,
+		Version:   ManifestVersion,
 		EntityId:  w.entity,
 		CreatedTs: util.NowMillis(),
 		Media:     w.mediaManifest,
diff --git a/cmd/gdpr_import/main.go b/cmd/gdpr_import/main.go
index ee75110322c6c785f0847de10031d959cd7356d3..6e1afea3b12b9a692fe106afad390a35d3bfe239 100644
--- a/cmd/gdpr_import/main.go
+++ b/cmd/gdpr_import/main.go
@@ -4,17 +4,14 @@ import (
 	"flag"
 	"os"
 	"path"
-	"time"
 
 	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/archival/v2archive"
 	"github.com/turt2live/matrix-media-repo/common/assets"
 	"github.com/turt2live/matrix-media-repo/common/config"
 	"github.com/turt2live/matrix-media-repo/common/logging"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
 	"github.com/turt2live/matrix-media-repo/common/runtime"
-	"github.com/turt2live/matrix-media-repo/controllers/data_controller"
-	"github.com/turt2live/matrix-media-repo/storage"
-	"github.com/turt2live/matrix-media-repo/util"
 )
 
 func main() {
@@ -22,6 +19,7 @@ func main() {
 	migrationsPath := flag.String("migrations", config.DefaultMigrationsPath, "The absolute path for the migrations folder")
 	filesDir := flag.String("directory", "./gdpr-data", "The directory for where the entity's exported files are")
 	verifyMode := flag.Bool("verify", false, "If set, no media will be imported and instead be tested to see if they've been imported already")
+	onlyEntity := flag.String("onlyEntity", "", "The entity (user ID or server name) to import for")
 	flag.Parse()
 
 	// Override config path with config for Docker users
@@ -61,116 +59,58 @@ func main() {
 		files = append(files, path.Join(*filesDir, f.Name()))
 	}
 
-	// Find the manifest so we can import as soon as possible
-	manifestIdx := 0
-	for i, fname := range files {
-		logrus.Infof("Checking %s for export manifest", fname)
+	// Make an archive reader
+	archiver := v2archive.NewReader(rcontext.Initial())
+
+	// Find the manifest
+	for _, fname := range files {
+		logrus.Debugf("Scanning %s for manifest", fname)
 		f, err := os.Open(fname)
 		if err != nil {
 			panic(err)
 		}
-		defer f.Close()
 
-		names, err := data_controller.GetFileNames(f)
-		if err != nil {
+		if ok, err := archiver.TryGetManifestFrom(f); err != nil {
 			panic(err)
-		}
-
-		if util.ArrayContains(names, "manifest.json") {
-			manifestIdx = i
+		} else if ok {
 			break
 		}
 	}
-
-	ctx := rcontext.Initial().LogWithFields(logrus.Fields{"flagDir": *filesDir})
-
-	f, err := os.Open(files[manifestIdx])
-	if err != nil {
-		panic(err)
-	}
-	defer f.Close()
-
-	if *verifyMode {
-		found, expected, missingIds, err := data_controller.VerifyImport(f, ctx)
-		if err != nil {
-			panic(err)
-		}
-		logrus.Info("Known imported media IDs: ", found)
-		logrus.Info("Expected media IDs: ", expected)
-
-		if len(missingIds) > 0 {
-			for _, mxc := range missingIds {
-				logrus.Error("Expected media ID but was not present: ", mxc)
-			}
-			logrus.Warn("Not all media is present. See logs for details.")
-			os.Exit(1)
-		}
-		logrus.Info("All media present!")
-		return // exit 0
+	if len(archiver.GetNotUploadedMxcUris()) <= 0 {
+		logrus.Warn("Found zero or fewer MXC URIs to import. This usually means there was no manifest found.")
+		return
 	}
+	logrus.Debugf("Importing media for %s", archiver.GetEntityId())
 
-	logrus.Info("Starting import...")
-	task, importId, err := data_controller.StartImport(f, ctx)
-	if err != nil {
-		panic(err)
+	// Re-process all the files properly this time
+	opts := v2archive.ProcessOpts{
+		LockedEntityId:    *onlyEntity,
+		CheckUploadedOnly: *verifyMode,
 	}
-
-	logrus.Info("Appending all other files to import")
-	for i, fname := range files {
-		if i == manifestIdx {
-			continue // already imported
-		}
-
-		logrus.Info("Appending ", fname)
-
-		if !data_controller.IsImportWaiting(importId) {
-			logrus.Info("Import claimed closed - ignoring file")
-			continue
-		}
-
+	for _, fname := range files {
+		logrus.Debugf("Processing %s for media", fname)
 		f, err := os.Open(fname)
 		if err != nil {
 			panic(err)
 		}
-		defer f.Close()
 
-		ch, err := data_controller.AppendToImport(importId, f, true)
-		if err != nil {
+		if err = archiver.ProcessFile(f, opts); err != nil {
 			panic(err)
 		}
-
-		if ch == nil {
-			logrus.Info("No channel returned by data controller - moving on to next file")
-			continue
+	}
+	if !opts.CheckUploadedOnly {
+		if err = archiver.ProcessS3Files(opts); err != nil {
+			panic(err)
 		}
-
-		logrus.Info("Waiting for file to be processed before moving on")
-		<-ch
-		close(ch)
 	}
 
-	logrus.Info("Waiting for import to complete")
-	waitChan := make(chan bool)
-	defer close(waitChan)
-	go func() {
-		// Initial sleep to let the caches fill
-		time.Sleep(1 * time.Second)
-
-		ctx := rcontext.Initial().LogWithFields(logrus.Fields{"async": true})
-		db := storage.GetDatabase().GetMetadataStore(ctx)
-		for true {
-			ctx.Log.Info("Checking if task is complete")
-
-			task, err := db.GetBackgroundTask(task.ID)
-			if err != nil {
-				logrus.Error(err)
-			} else if task.EndTs > 0 {
-				waitChan <- true
-				return
-			}
-
-			time.Sleep(1 * time.Second)
+	missing := archiver.GetNotUploadedMxcUris()
+	if len(missing) > 0 {
+		for _, mxc := range missing {
+			logrus.Warnf("%s has not been uploaded yet - was it included in the package?", mxc)
 		}
-	}()
-	<-waitChan
-
-	logrus.Infof("Import complete!")
+		logrus.Warnf("%d MXC URIs have not been imported.", len(missing))
+	} else if *verifyMode {
+		logrus.Info("All MXC URIs have been imported.")
+	} else {
+		logrus.Info("Import complete.")
+	}
 }
diff --git a/datastores/s3.go b/datastores/s3.go
index 3a0cb1e9b36946695964396b2a052c897b70400f..067664ac21470ed036e2024dc20bba54ef264ff2 100644
--- a/datastores/s3.go
+++ b/datastores/s3.go
@@ -1,7 +1,10 @@
 package datastores
 
 import (
+	"errors"
+	"fmt"
 	"strconv"
+	"strings"
 	"sync"
 
 	"github.com/minio/minio-go/v7"
@@ -63,7 +66,7 @@ func getS3(ds config.DatastoreConfig) (*s3, error) {
 	return s3c, nil
 }
 
-func GetS3Url(ds config.DatastoreConfig) (string, error) {
+func GetS3Url(ds config.DatastoreConfig, location string) (string, error) {
 	if ds.Type != "s3" {
 		return "", nil
 	}
@@ -73,5 +76,34 @@ func GetS3Url(ds config.DatastoreConfig) (string, error) {
 		return "", err
 	}
 
-	return s3c.client.EndpointURL().String(), nil
+	// HACK: Surely there's a better way...
+	return fmt.Sprintf("https://%s/%s/%s", s3c.client.EndpointURL().Host, s3c.bucket, location), nil
+}
+
+func ParseS3Url(s3url string) (config.DatastoreConfig, string, error) {
+	parts := strings.Split(strings.TrimPrefix(s3url, "https://"), "/")
+	if len(parts) < 3 {
+		return config.DatastoreConfig{}, "", errors.New("invalid url")
+	}
+
+	endpoint := parts[0]
+	location := parts[len(parts)-1]
+	bucket := strings.Join(parts[1:len(parts)-1], "/")
+
+	for _, c := range config.Get().DataStores {
+		if c.Type != "s3" {
+			continue
+		}
+
+		s3c, err := getS3(c)
+		if err != nil {
+			return config.DatastoreConfig{}, "", err
+		}
+
+		if s3c.client.EndpointURL().Host == endpoint && s3c.bucket == bucket {
+			return c, location, nil
+		}
+	}
+
+	return config.DatastoreConfig{}, "", errors.New("could not locate datastore")
+}