diff --git a/archival/v2_export.go b/archival/v2_export.go
deleted file mode 100644
index 0e504fe65dacbe29bbd0bde9055cb9d3225efb1a..0000000000000000000000000000000000000000
--- a/archival/v2_export.go
+++ /dev/null
@@ -1,218 +0,0 @@
-package archival
-
-import (
-	"archive/tar"
-	"bytes"
-	"compress/gzip"
-	"encoding/json"
-	"fmt"
-	"io"
-	"time"
-
-	"github.com/dustin/go-humanize"
-	"github.com/gabriel-vasile/mimetype"
-	"github.com/sirupsen/logrus"
-	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/templating"
-	"github.com/turt2live/matrix-media-repo/util"
-	"github.com/turt2live/matrix-media-repo/util/stream_util"
-)
-
-type V2ArchiveWriter interface {
-	WritePart(part int, fileName string, archive io.Reader, size int64) error
-}
-
-type V2ArchiveExport struct {
-	exportId      string
-	entity        string
-	indexModel    *templating.ExportIndexModel
-	writer        V2ArchiveWriter
-	mediaManifest map[string]*V2ManifestRecord
-	partSize      int64
-	ctx           rcontext.RequestContext
-
-	// state variables
-	currentPart     int
-	currentTar      *tar.Writer
-	currentTarBytes *bytes.Buffer
-	currentSize     int64
-	writingManifest bool
-}
-
-func NewV2Export(exportId string, entity string, partSize int64, writer V2ArchiveWriter, ctx rcontext.RequestContext) (*V2ArchiveExport, error) {
-	ctx = ctx.LogWithFields(logrus.Fields{
-		"v2_export-id":       exportId,
-		"v2_export-entity":   entity,
-		"v2_export-partSize": partSize,
-	})
-	archiver := &V2ArchiveExport{
-		exportId:      exportId,
-		entity:        entity,
-		writer:        writer,
-		partSize:      partSize,
-		ctx:           ctx,
-		indexModel: &templating.ExportIndexModel{
-			Entity:   entity,
-			ExportID: exportId,
-			Media:    make([]*templating.ExportIndexMediaModel, 0),
-		},
-		mediaManifest: make(map[string]*V2ManifestRecord),
-		currentPart:   0,
-	}
-	ctx.Log.Info("Preparing first tar file...")
-	err := archiver.newTar()
-	return archiver, err
-}
-
-func (e *V2ArchiveExport) newTar() error {
-	if e.currentPart > 0 {
-		e.ctx.Log.Info("Persisting complete tar file...")
-		if err := e.persistTar(); err != nil {
-			return err
-		}
-	}
-
-	e.ctx.Log.Info("Starting new tar file...")
-	e.currentTarBytes = &bytes.Buffer{}
-	e.currentTar = tar.NewWriter(e.currentTarBytes)
-	e.currentPart = e.currentPart + 1
-	e.currentSize = 0
-
-	return nil
-}
-
-func (e *V2ArchiveExport) persistTar() error {
-	_ = e.currentTar.Close()
-
-	e.ctx.Log.Info("Compressing tar file...")
-	gzipBytes := &bytes.Buffer{}
-	archiver := gzip.NewWriter(gzipBytes)
-	archiver.Name = fmt.Sprintf("export-part-%d.tar", e.currentPart)
-	if e.writingManifest {
-		archiver.Name = "export-manifest.tar"
-	}
-
-	if _, err := io.Copy(archiver, stream_util.ClonedBufReader(*e.currentTarBytes)); err != nil {
-		return err
-	}
-	_ = archiver.Close()
-
-	e.ctx.Log.Info("Writing compressed tar")
-	name := fmt.Sprintf("export-part-%d.tgz", e.currentPart)
-	if e.writingManifest {
-		name = "export-manifest.tgz"
-	}
-	return e.writer.WritePart(e.currentPart, name, gzipBytes, int64(len(gzipBytes.Bytes())))
-}
-
-func (e *V2ArchiveExport) putFile(buf *bytes.Buffer, name string, creationTime time.Time) (int64, error) {
-	length := int64(len(buf.Bytes()))
-	header := &tar.Header{
-		Name:    name,
-		Size:    length,
-		Mode:    int64(0644),
-		ModTime: creationTime,
-	}
-	if err := e.currentTar.WriteHeader(header); err != nil {
-		return 0, err
-	}
-
-	i, err := io.Copy(e.currentTar, buf)
-	if err != nil {
-		return 0, err
-	}
-	e.currentSize += i
-
-	return length, nil
-}
-
-func (e *V2ArchiveExport) AppendMedia(origin string, mediaId string, originalName string, contentType string, creationTime time.Time, file io.Reader, sha256 string, s3Url string, userId string) error {
-	// buffer the entire file into memory
-	buf := &bytes.Buffer{}
-	if _, err := io.Copy(buf, file); err != nil {
-		return err
-	}
-
-	mime := mimetype.Detect(buf.Bytes())
-	internalName := fmt.Sprintf("%s__%s%s", origin, mediaId, mime.Extension())
-
-	length, err := e.putFile(buf, internalName, creationTime)
-	if err != nil {
-		return err
-	}
-
-	mxc := fmt.Sprintf("mxc://%s/%s", origin, mediaId)
-	e.mediaManifest[mxc] = &V2ManifestRecord{
-		ArchivedName: internalName,
-		FileName:     originalName,
-		SizeBytes:    length,
-		ContentType:  contentType,
-		S3Url:        s3Url,
-		Sha256:       sha256,
-		Origin:       origin,
-		MediaId:      mediaId,
-		CreatedTs:    creationTime.UnixNano() / 1000000,
-		Uploader:     userId,
-	}
-	e.indexModel.Media = append(e.indexModel.Media, &templating.ExportIndexMediaModel{
-		ExportID:        e.exportId,
-		ArchivedName:    internalName,
-		FileName:        originalName,
-		SizeBytes:       length,
-		SizeBytesHuman:  humanize.Bytes(uint64(length)),
-		Origin:          origin,
-		MediaID:         mediaId,
-		Sha256Hash:      sha256,
-		ContentType:     contentType,
-		UploadTs:        creationTime.UnixNano() / 1000000,
-		UploadDateHuman: creationTime.Format(time.UnixDate),
-		Uploader:        userId,
-	})
-
-	if e.currentSize >= e.partSize {
-		e.ctx.Log.Info("Rotating tar...")
-		return e.newTar()
-	}
-
-	return nil
-}
-
-func (e *V2ArchiveExport) Finish() error {
-	if err := e.newTar(); err != nil {
-		return err
-	}
-
-	e.ctx.Log.Info("Writing manifest...")
-	e.writingManifest = true
-	defer (func() { e.writingManifest = false })()
-	manifest := &V2Manifest{
-		Version:   2,
-		EntityId:  e.entity,
-		CreatedTs: util.NowMillis(),
-		Media:     e.mediaManifest,
-	}
-	b, err := json.Marshal(manifest)
-	if err != nil {
-		e.writingManifest = false
-		return err
-	}
-	if _, err := e.putFile(bytes.NewBuffer(b), "manifest.json", time.Now()); err != nil {
-		return err
-	}
-
-	e.ctx.Log.Info("Writing index...")
-	t, err := templating.GetTemplate("export_index")
-	if err != nil {
-		return err
-	}
-	html := bytes.Buffer{}
-	if err := t.Execute(&html, e.indexModel); err != nil {
-		return err
-	}
-	if _, err := e.putFile(bytes.NewBuffer(html.Bytes()), "index.html", time.Now()); err != nil {
-		return err
-	}
-
-	e.ctx.Log.Info("Writing manifest tar...")
-	return e.persistTar()
-}
diff --git a/archival/v2_export_disk_writer.go b/archival/v2_export_disk_writer.go
deleted file mode 100644
index 6d6cd1a3937959e8b992fa6f7ec330353b8a2b61..0000000000000000000000000000000000000000
--- a/archival/v2_export_disk_writer.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package archival
-
-import (
-	"io"
-	"os"
-	"path"
-)
-
-type V2ArchiveDiskWriter struct {
-	directory string
-}
-
-func NewV2ArchiveDiskWriter(directory string) *V2ArchiveDiskWriter {
-	return &V2ArchiveDiskWriter{directory: directory}
-}
-
-func (w V2ArchiveDiskWriter) WritePart(part int, fileName string, archive io.Reader, size int64) error {
-	f, err := os.Create(path.Join(w.directory, fileName))
-	if err != nil {
-		return err
-	}
-	if _, err := io.Copy(f, archive); err != nil {
-		return err
-	}
-	return f.Close()
-}
diff --git a/archival/v2_manifest.go b/archival/v2archive/manifest.go
similarity index 56%
rename from archival/v2_manifest.go
rename to archival/v2archive/manifest.go
index 249e28f22c3ec111e2c6ca425eb8a015a155ff6d..3ebfedfd20d573ac7a0b52aa0e62e7a98b460448 100644
--- a/archival/v2_manifest.go
+++ b/archival/v2archive/manifest.go
@@ -1,6 +1,13 @@
-package archival
+package v2archive
 
-type V2ManifestRecord struct {
+type Manifest struct {
+	Version   int    `json:"version"`
+	EntityId  string `json:"entity_id"`
+	CreatedTs int64  `json:"created_ts"`
+	Media     map[string]*ManifestRecord `json:"media"`
+}
+
+type ManifestRecord struct {
 	FileName     string `json:"name"`
 	ArchivedName string `json:"file_name"`
 	SizeBytes    int64  `json:"size_bytes"`
@@ -12,10 +19,3 @@ type V2ManifestRecord struct {
 	CreatedTs    int64  `json:"created_ts"`
 	Uploader     string `json:"uploader"`
 }
-
-type V2Manifest struct {
-	Version   int    `json:"version"`
-	EntityId  string `json:"entity_id"`
-	CreatedTs int64  `json:"created_ts"`
-	Media     map[string]*V2ManifestRecord `json:"media"`
-}
diff --git a/archival/v2archive/writer.go b/archival/v2archive/writer.go
new file mode 100644
index 0000000000000000000000000000000000000000..7d934078ae984762de49ea7594ed462aae412ac0
--- /dev/null
+++ b/archival/v2archive/writer.go
@@ -0,0 +1,276 @@
+package v2archive
+
+import (
+	"archive/tar"
+	"compress/gzip"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"html"
+	"io"
+	"os"
+	"time"
+
+	"github.com/dustin/go-humanize"
+	"github.com/gabriel-vasile/mimetype"
+	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/templating"
+	"github.com/turt2live/matrix-media-repo/util"
+	"github.com/turt2live/matrix-media-repo/util/readers"
+)
+
+type MediaInfo struct {
+	Origin      string
+	MediaId     string
+	FileName    string
+	ContentType string
+	CreationTs  int64
+	S3Url       string
+	UserId      string
+}
+
+type PartPersister func(part int, fileName string, data io.ReadCloser) error
+
+type ArchiveWriter struct {
+	ctx rcontext.RequestContext
+
+	exportId      string
+	entity        string
+	index         *templating.ExportIndexModel
+	mediaManifest map[string]*ManifestRecord
+	partSize      int64
+	writeFn       PartPersister
+
+	// state machine variables
+	currentPart     int
+	currentTar      *tar.Writer
+	currentTempFile *os.File
+	currentSize     int64
+	writingManifest bool
+}
+
+func NewWriter(ctx rcontext.RequestContext, exportId string, entity string, partSize int64, writeFn PartPersister) (*ArchiveWriter, error) {
+	ctx = ctx.LogWithFields(logrus.Fields{
+		"v2archive-id":     exportId,
+		"v2archive-entity": entity,
+	})
+	archiver := &ArchiveWriter{
+		ctx:      ctx,
+		exportId: exportId,
+		entity:   entity,
+		index: &templating.ExportIndexModel{
+			ExportID: exportId,
+			Entity:   entity,
+			Media:    make([]*templating.ExportIndexMediaModel, 0),
+		},
+		mediaManifest: make(map[string]*ManifestRecord),
+		partSize:      partSize,
+		writeFn:       writeFn,
+		currentPart:   0,
+	}
+	err := archiver.beginTar()
+	return archiver, err
+}
+
+func (w *ArchiveWriter) rotateTar() error {
+	if w.currentPart > 0 {
+		if err := w.writeTar(); err != nil {
+			return err
+		}
+	}
+
+	return w.beginTar()
+}
+
+func (w *ArchiveWriter) beginTar() error {
+	w.currentSize = 0
+	w.currentPart = w.currentPart + 1
+
+	file, err := os.CreateTemp(os.TempDir(), "mmr-archive")
+	if err != nil {
+		return err
+	}
+
+	w.currentTempFile = file
+	w.currentTar = tar.NewWriter(file)
+	return nil
+}
+
+func (w *ArchiveWriter) writeTar() error {
+	_ = w.currentTar.Close()
+
+	tempFilePath := w.currentTempFile.Name()
+	if err := w.currentTempFile.Close(); err != nil {
+		return err
+	}
+	f, err := os.Open(tempFilePath)
+	if err != nil {
+		return err
+	}
+
+	pr, pw := io.Pipe()
+	archiver := gzip.NewWriter(pw)
+	fname := fmt.Sprintf("export-part-%d", w.currentPart)
+	if w.writingManifest {
+		fname = "export-manifest"
+	}
+	archiver.Name = fname + ".tar"
+
+	w.ctx.Log.Debug("Writing tar file to gzip container: ", archiver.Name)
+
+	go func() {
+		_, err := io.Copy(archiver, f)
+		if err != nil {
+			_ = pw.CloseWithError(err)
+		} else {
+			err = archiver.Close()
+			if err != nil {
+				_ = pw.CloseWithError(err)
+			} else {
+				_ = pw.Close()
+			}
+		}
+	}()
+
+	closerStack := readers.NewCancelCloser(pr, func() {
+		_ = readers.NewTempFileCloser("", f.Name(), f).Close()
+	})
+	return w.writeFn(w.currentPart, fname+".tgz", closerStack)
+}
+
+// AppendMedia / returns (sha256hash, error)
+func (w *ArchiveWriter) AppendMedia(file io.ReadCloser, info MediaInfo) (string, error) {
+	defer file.Close()
+	br := readers.NewBufferReadsReader(file)
+	mime, err := mimetype.DetectReader(br)
+	if err != nil {
+		return "", err
+	}
+	internalName := fmt.Sprintf("%s__%s%s", info.Origin, info.MediaId, mime.Extension())
+
+	createTime := util.FromMillis(info.CreationTs)
+
+	size, sha256hash, err := w.putFile(br.GetRewoundReader(), internalName, createTime)
+	w.mediaManifest[util.MxcUri(info.Origin, info.MediaId)] = &ManifestRecord{
+		FileName:     info.FileName,
+		ArchivedName: internalName,
+		SizeBytes:    size,
+		ContentType:  info.ContentType,
+		S3Url:        info.S3Url,
+		Sha256:       sha256hash,
+		Origin:       info.Origin,
+		MediaId:      info.MediaId,
+		CreatedTs:    info.CreationTs,
+		Uploader:     info.UserId,
+	}
+	w.index.Media = append(w.index.Media, &templating.ExportIndexMediaModel{
+		ExportID:        w.exportId,
+		ArchivedName:    internalName,
+		FileName:        html.EscapeString(info.FileName),
+		Origin:          info.Origin,
+		MediaID:         info.MediaId,
+		SizeBytes:       size,
+		SizeBytesHuman:  humanize.Bytes(uint64(size)),
+		UploadTs:        info.CreationTs,
+		UploadDateHuman: createTime.UTC().Format(time.UnixDate),
+		Sha256Hash:      sha256hash,
+		ContentType:     info.ContentType,
+		Uploader:        info.UserId,
+	})
+
+	if w.currentSize >= w.partSize {
+		return sha256hash, w.rotateTar()
+	}
+
+	return sha256hash, nil
+}
+
+func (w *ArchiveWriter) putFile(r io.Reader, name string, creationTime time.Time) (int64, string, error) {
+	f, err := os.CreateTemp(os.TempDir(), "mmr-archive-put")
+	if err != nil {
+		return 0, "", err
+	}
+	defer func() {
+		f.Close()
+		os.Remove(f.Name())
+	}()
+	i1, err := io.Copy(f, r)
+	if err != nil {
+		return 0, "", err
+	}
+	if _, err = f.Seek(0, io.SeekStart); err != nil {
+		return 0, "", err
+	}
+
+	hasher := sha256.New()
+	header := &tar.Header{
+		Name:    name,
+		Mode:    int64(0644),
+		ModTime: creationTime,
+		Size:    i1,
+	}
+	if err := w.currentTar.WriteHeader(header); err != nil {
+		return 0, "", err
+	}
+
+	mw := io.MultiWriter(hasher, w.currentTar)
+	i2, err := io.Copy(mw, f)
+	if err != nil {
+		return 0, "", err
+	}
+	w.currentSize = w.currentSize + i2
+
+	if i1 != i2 {
+		w.ctx.Log.Warnf("Size mismatch! Expected %d bytes but wrote %d instead", i1, i2)
+	}
+
+	return i2, hex.EncodeToString(hasher.Sum(nil)), nil
+}
+
+func (w *ArchiveWriter) Finish() error {
+	if err := w.rotateTar(); err != nil {
+		return err
+	}
+
+	w.writingManifest = true
+	defer func() { w.writingManifest = false }()
+	manifest := &Manifest{
+		Version:   2,
+		EntityId:  w.entity,
+		CreatedTs: util.NowMillis(),
+		Media:     w.mediaManifest,
+	}
+	pr, pw := io.Pipe()
+	jenc := json.NewEncoder(pw)
+	go func() {
+		if err := jenc.Encode(manifest); err != nil {
+			_ = pw.CloseWithError(err)
+		} else {
+			_ = pw.Close()
+		}
+	}()
+	if _, _, err := w.putFile(pr, "manifest.json", time.Now()); err != nil {
+		return err
+	}
+
+	t, err := templating.GetTemplate("export_index")
+	if err != nil {
+		return err
+	}
+
+	pr, pw = io.Pipe()
+	go func() {
+		if err := t.Execute(pw, w.index); err != nil {
+			_ = pw.CloseWithError(err)
+		} else {
+			_ = pw.Close()
+		}
+	}()
+	if _, _, err := w.putFile(pr, "index.html", time.Now()); err != nil {
+		return err
+	}
+
+	return w.writeTar()
+}
diff --git a/cmd/export_synapse_for_import/main.go b/cmd/export_synapse_for_import/main.go
index 76eb5acd9a4093a4111143e68921553cf9ccf84a..f58afbf4f5d1c4458790fb930680d83645adbdd8 100644
--- a/cmd/export_synapse_for_import/main.go
+++ b/cmd/export_synapse_for_import/main.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"bytes"
 	"flag"
 	"fmt"
 	"io"
@@ -10,16 +9,14 @@ import (
 	"strconv"
 	"strings"
 
-	"github.com/sirupsen/logrus"
-	"github.com/turt2live/matrix-media-repo/archival"
+	"github.com/turt2live/matrix-media-repo/archival/v2archive"
 	"github.com/turt2live/matrix-media-repo/common/assets"
 	"github.com/turt2live/matrix-media-repo/common/config"
 	"github.com/turt2live/matrix-media-repo/common/logging"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/common/runtime"
+	"github.com/turt2live/matrix-media-repo/common/version"
 	"github.com/turt2live/matrix-media-repo/homeserver_interop/synapse"
 	"github.com/turt2live/matrix-media-repo/util"
-	"github.com/turt2live/matrix-media-repo/util/stream_util"
 	"golang.org/x/crypto/ssh/terminal"
 )
 
@@ -35,8 +32,12 @@ func main() {
 	importPath := flag.String("mediaDirectory", "./media_store", "The media_store_path for Synapse")
 	partSizeBytes := flag.Int64("partSize", 104857600, "The number of bytes (roughly) to split the export files into.")
 	skipMissing := flag.Bool("skipMissing", false, "If a media file can't be found, skip it.")
+	debug := flag.Bool("debug", false, "Enables debug logging.")
+	prettyLog := flag.Bool("prettyLog", false, "Enables pretty logging (colours).")
 	flag.Parse()
 
+	version.SetDefaults()
+	version.Print(true)
 	assets.SetupTemplates(*templatesPath)
 
 	_ = os.MkdirAll(*exportPath, 0755)
@@ -59,39 +60,51 @@ func main() {
 		realPsqlPassword = *postgresPassword
 	}
 
-	err := logging.Setup(
-		config.Get().General.LogDirectory,
-		config.Get().General.LogColors,
-		config.Get().General.JsonLogs,
-		config.Get().General.LogLevel,
-	)
-	if err != nil {
+	level := "info"
+	if *debug {
+		level = "debug"
+	}
+	if err := logging.Setup(
+		"-",
+		*prettyLog,
+		false,
+		level,
+	); err != nil {
 		panic(err)
 	}
 
-	logrus.Info("Setting up for importing...")
-	runtime.CheckIdGenerator()
+	ctx := rcontext.InitialNoConfig()
 
 	connectionString := "postgres://" + *postgresUsername + ":" + realPsqlPassword + "@" + *postgresHost + ":" + strconv.Itoa(*postgresPort) + "/" + *postgresDatabase + "?sslmode=disable"
 
-	logrus.Info("Connecting to synapse database...")
+	ctx.Log.Debug("Connecting to synapse database...")
 	synDb, err := synapse.OpenDatabase(connectionString)
 	if err != nil {
 		panic(err)
 	}
 
-	logrus.Info("Fetching all local media records from synapse...")
+	ctx.Log.Info("Fetching all local media records from Synapse...")
 	records, err := synDb.GetAllMedia()
 	if err != nil {
 		panic(err)
 	}
 
-	logrus.Info(fmt.Sprintf("Exporting %d media records", len(records)))
+	ctx.Log.Info(fmt.Sprintf("Exporting %d media records", len(records)))
 
-	writer := archival.NewV2ArchiveDiskWriter(*exportPath)
-	exporter, err := archival.NewV2Export("OOB", *serverName, *partSizeBytes, writer, rcontext.Initial())
+	archiver, err := v2archive.NewWriter(ctx, "OOB", *serverName, *partSizeBytes, func(part int, fileName string, data io.ReadCloser) error {
+		defer data.Close()
+		f, errf := os.Create(path.Join(*exportPath, fileName))
+		if errf != nil {
+			return errf
+		}
+		_, errf = io.Copy(f, data)
+		if errf != nil {
+			return errf
+		}
+		return nil
+	})
 	if err != nil {
-		logrus.Fatal(err)
+		ctx.Log.Fatal(err)
 	}
 
 	missing := make([]string, 0)
@@ -103,9 +116,9 @@ func main() {
 		// For a URL MediaID 2020-08-17_AABBCCDD:
 		// $importPath/url_cache/2020-08-17/AABBCCDD
 
-		mxc := fmt.Sprintf("mxc://%s/%s", *serverName, r.MediaId)
+		mxc := util.MxcUri(*serverName, r.MediaId)
 
-		logrus.Info("Copying " + mxc)
+		ctx.Log.Info("Copying " + mxc)
 
 		filePath := path.Join(*importPath, "local_content", r.MediaId[0:2], r.MediaId[2:4], r.MediaId[4:])
 		if r.UrlCache != "" {
@@ -115,36 +128,34 @@ func main() {
 
 		f, err := os.Open(filePath)
 		if os.IsNotExist(err) && *skipMissing {
-			logrus.Warn("File does not appear to exist, skipping: " + filePath)
+			ctx.Log.Warn("File does not appear to exist, skipping: " + filePath)
 			missing = append(missing, filePath)
 			continue
 		}
 		if err != nil {
-			logrus.Fatal(err)
-		}
-
-		d := &bytes.Buffer{}
-		_, _ = io.Copy(d, f)
-		_ = f.Close()
-
-		temp := bytes.NewBuffer(d.Bytes())
-		sha256, err := stream_util.GetSha256HashOfStream(io.NopCloser(temp))
-		if err != nil {
-			logrus.Fatal(err)
+			ctx.Log.Fatal(err)
 		}
 
-		err = exporter.AppendMedia(*serverName, r.MediaId, r.UploadName, r.ContentType, util.FromMillis(r.CreatedTs), d, sha256, "", r.UserId)
+		_, err = archiver.AppendMedia(f, v2archive.MediaInfo{
+			Origin:      *serverName,
+			MediaId:     r.MediaId,
+			FileName:    r.UploadName,
+			ContentType: r.ContentType,
+			CreationTs:  r.CreatedTs,
+			S3Url:       "",
+			UserId:      r.UserId,
+		})
 		if err != nil {
-			logrus.Fatal(err)
+			ctx.Log.Fatal(err)
 		}
 	}
 
-	err = exporter.Finish()
+	err = archiver.Finish()
 	if err != nil {
-		logrus.Fatal(err)
+		ctx.Log.Fatal(err)
 	}
 
-	logrus.Info("Done export - cleaning up...")
+	ctx.Log.Info("Done export - cleaning up...")
 
 	// Clean up
 	assets.Cleanup()
@@ -152,9 +163,9 @@ func main() {
 	// Report missing files
 	if len(missing) > 0 {
 		for _, m := range missing {
-			logrus.Warn("Was not able to find " + m)
+			ctx.Log.Warn("Was not able to find " + m)
 		}
 	}
 
-	logrus.Info("Export completed")
+	ctx.Log.Info("Export completed")
 }
diff --git a/cmd/import_synapse/main.go b/cmd/import_synapse/main.go
index b72affc28e53393ece740384e16c4ce19a308160..4eabd0421f30c6ad6f80100b165b9624ea967584 100644
--- a/cmd/import_synapse/main.go
+++ b/cmd/import_synapse/main.go
@@ -90,8 +90,6 @@ func main() {
 	logrus.Info("Starting up...")
 	runtime.RunStartupSequence()
 
-	logrus.Debug("Setting up for importing...")
-
 	connectionString := "postgres://" + *postgresUsername + ":" + realPsqlPassword + "@" + *postgresHost + ":" + strconv.Itoa(*postgresPort) + "/" + *postgresDatabase + "?sslmode=disable"
 	csApiUrl := *baseUrl
 	if csApiUrl[len(csApiUrl)-1:] == "/" {
diff --git a/common/rcontext/request_context.go b/common/rcontext/request_context.go
index 9c1bf8c4e28b84e6624416eb11c1d5f5b69a62e2..d5e151d846e56b2c1bb94ecb728bb0cfd5bcd7cf 100644
--- a/common/rcontext/request_context.go
+++ b/common/rcontext/request_context.go
@@ -11,7 +11,7 @@ import (
 func Initial() RequestContext {
 	return RequestContext{
 		Context: context.Background(),
-		Log:     logrus.WithFields(logrus.Fields{"nocontext": true}),
+		Log:     logrus.WithFields(logrus.Fields{"internal_flag": 1}),
 		Config: config.DomainRepoConfig{
 			MinimumRepoConfig: config.Get().MinimumRepoConfig,
 			Downloads:         config.Get().Downloads.DownloadsConfig,
@@ -22,6 +22,15 @@ func Initial() RequestContext {
 	}.populate()
 }
 
+func InitialNoConfig() RequestContext {
+	return RequestContext{
+		Context: context.Background(),
+		Log:     logrus.WithFields(logrus.Fields{"internal_flag": 2}),
+		Config:  config.DomainRepoConfig{},
+		Request: nil,
+	}.populate()
+}
+
 type RequestContext struct {
 	context.Context