diff --git a/CHANGELOG.md b/CHANGELOG.md index 65c34e58593298c8c6b5649461bc9873bf64629c..ebe6b7c40c866b58e8b0a6536d37f39c1f3773a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased] -*Nothing yet.* +### Added + +* Added a new tool, `export_synapse_for_import`, which can be used to do an offline import from Synapse. + * After running this tool, use the `gdpr_import` tool to bring the export into the media repo. ## [1.2.0] - August 2nd, 2020 diff --git a/cmd/export_synapse_for_import/main.go b/cmd/export_synapse_for_import/main.go new file mode 100644 index 0000000000000000000000000000000000000000..568417a9feda8ced419bb7181f729897fafd09fc --- /dev/null +++ b/cmd/export_synapse_for_import/main.go @@ -0,0 +1,302 @@ +package main + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "encoding/json" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "strconv" + "time" + + "github.com/dustin/go-humanize" + "github.com/sirupsen/logrus" + "github.com/turt2live/matrix-media-repo/common/assets" + "github.com/turt2live/matrix-media-repo/common/config" + "github.com/turt2live/matrix-media-repo/common/logging" + "github.com/turt2live/matrix-media-repo/controllers/data_controller" + "github.com/turt2live/matrix-media-repo/synapse" + "github.com/turt2live/matrix-media-repo/templating" + "github.com/turt2live/matrix-media-repo/util" + "golang.org/x/crypto/ssh/terminal" +) + +func main() { + postgresHost := flag.String("dbHost", "localhost", "The PostgresSQL hostname for your Synapse database") + postgresPort := flag.Int("dbPort", 5432, "The port for your Synapse's PostgreSQL database") + postgresUsername := flag.String("dbUsername", "synapse", "The username for your Synapse's PostgreSQL database") + postgresPassword := flag.String("dbPassword", "", "The password for your Synapse's PostgreSQL database. Can be omitted to be prompted when run") + postgresDatabase := flag.String("dbName", "synapse", "The name of your Synapse database") + serverName := flag.String("serverName", "localhost", "The name of your homeserver (eg: matrix.org)") + templatesPath := flag.String("templates", config.DefaultTemplatesPath, "The absolute path for the templates folder") + exportPath := flag.String("destination", "./media-export", "The directory to export the files to (will be created if needed)") + importPath := flag.String("mediaDirectory", "./media_store", "The media_store_path for Synapse") + partSizeBytes := flag.Int64("partSize", 104857600, "The number of bytes (roughly) to split the export files into.") + flag.Parse() + + assets.SetupTemplates(*templatesPath) + + _ = os.MkdirAll(*exportPath, 0755) + + var realPsqlPassword string + if *postgresPassword == "" { + if !terminal.IsTerminal(int(os.Stdin.Fd())) { + fmt.Println("Sorry, your terminal does not support reading passwords. Please supply a -dbPassword or use a different terminal.") + fmt.Println("If you're on Windows, try using a plain Command Prompt window instead of a bash-like terminal.") + os.Exit(1) + return // for good measure + } + fmt.Printf("Postgres password: ") + pass, err := terminal.ReadPassword(int(os.Stdin.Fd())) + if err != nil { + panic(err) + } + realPsqlPassword = string(pass[:]) + } else { + realPsqlPassword = *postgresPassword + } + + err := logging.Setup(config.Get().General.LogDirectory) + if err != nil { + panic(err) + } + + logrus.Info("Setting up for importing...") + + connectionString := "postgres://" + *postgresUsername + ":" + realPsqlPassword + "@" + *postgresHost + ":" + strconv.Itoa(*postgresPort) + "/" + *postgresDatabase + "?sslmode=disable" + + logrus.Info("Connecting to synapse database...") + synDb, err := synapse.OpenDatabase(connectionString) + if err != nil { + panic(err) + } + + logrus.Info("Fetching all local media records from synapse...") + records, err := synDb.GetAllMedia() + if err != nil { + panic(err) + } + + logrus.Info(fmt.Sprintf("Exporting %d media records", len(records))) + + // TODO: Share this logic with export_controller somehow + var currentTar *tar.Writer + var currentTarBytes bytes.Buffer + part := 0 + currentSize := int64(0) + isManifestTar := false + + persistTar := func() error { + _ = currentTar.Close() + + // compress + logrus.Info("Compressing tar file...") + gzipBytes := bytes.Buffer{} + archiver := gzip.NewWriter(&gzipBytes) + archiver.Name = fmt.Sprintf("export-part-%d.tar", part) + if isManifestTar { + archiver.Name = fmt.Sprintf("export-manifest.tar") + } + _, err := io.Copy(archiver, bytes.NewBuffer(currentTarBytes.Bytes())) + if err != nil { + return err + } + _ = archiver.Close() + + logrus.Info("Writing compressed tar to disk...") + name := fmt.Sprintf("export-part-%d.tgz", part) + if isManifestTar { + name = "export-manifest.tgz" + } + f, err := os.Create(path.Join(*exportPath, name)) + if err != nil { + return err + } + _, _ = io.Copy(f, &gzipBytes) + _ = f.Close() + + return nil + } + + newTar := func() error { + if part > 0 { + logrus.Info("Persisting complete tar file...") + err := persistTar() + if err != nil { + return err + } + } + + logrus.Info("Starting new tar file...") + currentTarBytes = bytes.Buffer{} + currentTar = tar.NewWriter(¤tTarBytes) + part = part + 1 + currentSize = 0 + + return nil + } + + // Start the first tar file + logrus.Info("Preparing first tar file...") + err = newTar() + if err != nil { + logrus.Fatal(err) + } + + putFile := func(name string, size int64, creationTime time.Time, file io.Reader) error { + header := &tar.Header{ + Name: name, + Size: size, + Mode: int64(0644), + ModTime: creationTime, + } + err := currentTar.WriteHeader(header) + if err != nil { + return err + } + + i, err := io.Copy(currentTar, file) + if err != nil { + return err + } + + currentSize += i + + return nil + } + + archivedName := func(origin string, mediaId string) string { + // TODO: Pick the right extension for the file type + return fmt.Sprintf("%s__%s.obj", origin, mediaId) + } + + logrus.Info("Preparing manifest...") + indexModel := &templating.ExportIndexModel{ + Entity: *serverName, + ExportID: "OOB", + Media: make([]*templating.ExportIndexMediaModel, 0), + } + mediaManifest := make(map[string]*data_controller.ManifestRecord) + + for _, r := range records { + // For MediaID AABBCCDD : + // $importPath/local_content/AA/BB/CCDD + mxc := fmt.Sprintf("mxc://%s/%s", *serverName, r.MediaId) + + logrus.Info("Copying " + mxc) + + f, err := os.Open(path.Join(*importPath, "local_content", r.MediaId[0:2], r.MediaId[2:4], r.MediaId[4:])) + if err != nil { + logrus.Fatal(err) + } + + d := &bytes.Buffer{} + _, _ = io.Copy(d, f) + _ = f.Close() + + temp := bytes.NewBuffer(d.Bytes()) + sha256, err := util.GetSha256HashOfStream(ioutil.NopCloser(temp)) + if err != nil { + logrus.Fatal(err) + } + + err = putFile(archivedName(*serverName, r.MediaId), r.SizeBytes, util.FromMillis(r.CreatedTs), d) + if err != nil { + logrus.Fatal(err) + } + + if currentSize >= *partSizeBytes { + logrus.Info("Rotating tar...") + err = newTar() + if err != nil { + logrus.Fatal(err) + } + } + + mediaManifest[mxc] = &data_controller.ManifestRecord{ + ArchivedName: archivedName(*serverName, r.MediaId), + FileName: r.UploadName, + SizeBytes: r.SizeBytes, + ContentType: r.ContentType, + S3Url: "", + Sha256: sha256, + Origin: *serverName, + MediaId: r.MediaId, + CreatedTs: r.CreatedTs, + Uploader: r.UserId, + } + indexModel.Media = append(indexModel.Media, &templating.ExportIndexMediaModel{ + ExportID: "OOB", + ArchivedName: archivedName(*serverName, r.MediaId), + FileName: r.UploadName, + SizeBytes: r.SizeBytes, + SizeBytesHuman: humanize.Bytes(uint64(r.SizeBytes)), + Origin: *serverName, + MediaID: r.MediaId, + Sha256Hash: sha256, + ContentType: r.ContentType, + UploadTs: r.CreatedTs, + UploadDateHuman: util.FromMillis(r.CreatedTs).Format(time.UnixDate), + Uploader: r.UserId, + }) + } + + logrus.Info("Preparing manifest-specific tar...") + err = newTar() + if err != nil { + logrus.Fatal(err) + } + + logrus.Info("Writing manifest...") + isManifestTar = true + manifest := &data_controller.Manifest{ + Version: 2, + EntityId: *serverName, + CreatedTs: util.NowMillis(), + Media: mediaManifest, + } + b, err := json.Marshal(manifest) + if err != nil { + logrus.Fatal(err) + } + err = putFile("manifest.json", int64(len(b)), time.Now(), bytes.NewBuffer(b)) + if err != nil { + logrus.Fatal(err) + } + + logrus.Info("Building and writing index...") + t, err := templating.GetTemplate("export_index") + if err != nil { + logrus.Fatal(err) + return + } + html := bytes.Buffer{} + err = t.Execute(&html, indexModel) + if err != nil { + logrus.Fatal(err) + return + } + err = putFile("index.html", int64(html.Len()), time.Now(), util.BufferToStream(bytes.NewBuffer(html.Bytes()))) + if err != nil { + logrus.Fatal(err) + return + } + + logrus.Info("Writing final tar...") + err = persistTar() + if err != nil { + logrus.Fatal(err) + } + + logrus.Info("Done export - cleaning up...") + + // Clean up + assets.Cleanup() + + logrus.Info("Import completed") +} diff --git a/cmd/gdpr_export/main.go b/cmd/gdpr_export/main.go index f604e30ec1ab3877ac087010cfcf073283c4bbef..c3213de6d364142254523a47af3a9dad20ddf39c 100644 --- a/cmd/gdpr_export/main.go +++ b/cmd/gdpr_export/main.go @@ -41,7 +41,8 @@ func main() { } config.Path = *configPath - assets.SetupTemplatesAndMigrations(*migrationsPath, *templatesPath) + assets.SetupMigrations(*migrationsPath) + assets.SetupTemplates(*templatesPath) var err error err = logging.Setup(config.Get().General.LogDirectory) diff --git a/cmd/gdpr_import/main.go b/cmd/gdpr_import/main.go index 74416951c02b3d3d156e4740ed6501e9fedf39f1..fd420b3a0f42774671d9ab095e8b99b4342a8dd1 100644 --- a/cmd/gdpr_import/main.go +++ b/cmd/gdpr_import/main.go @@ -30,7 +30,7 @@ func main() { } config.Path = *configPath - assets.SetupTemplatesAndMigrations(*migrationsPath, "") + assets.SetupMigrations(*migrationsPath) var err error err = logging.Setup(config.Get().General.LogDirectory) diff --git a/cmd/import_synapse/main.go b/cmd/import_synapse/main.go index 5855e4e2ec2c0a44edacd2cd5af593843b9090a4..958c8b9a69ab9af6e01eddafc26c39ddb3fa3643 100644 --- a/cmd/import_synapse/main.go +++ b/cmd/import_synapse/main.go @@ -52,7 +52,7 @@ func main() { } config.Path = *configPath - assets.SetupTemplatesAndMigrations(*migrationsPath, "") + assets.SetupMigrations(*migrationsPath) var realPsqlPassword string if *postgresPassword == "" { diff --git a/cmd/media_repo/main.go b/cmd/media_repo/main.go index b87cd245535f6c3727802fbb81108746a92bd4ed..0e1da9b26a276b901de906be445838ec430012a3 100644 --- a/cmd/media_repo/main.go +++ b/cmd/media_repo/main.go @@ -37,7 +37,8 @@ func main() { } config.Path = *configPath - assets.SetupTemplatesAndMigrations(*migrationsPath, *templatesPath) + assets.SetupMigrations(*migrationsPath) + assets.SetupTemplates(*templatesPath) assets.SetupAssets(*assetsPath) err := logging.Setup(config.Get().General.LogDirectory) diff --git a/common/assets/process.go b/common/assets/process.go index 4453908c6d900531ee9517444b360fbc98122650..29b066328822afb327414edd602d76c45178ccd9 100644 --- a/common/assets/process.go +++ b/common/assets/process.go @@ -17,7 +17,7 @@ import ( var tempMigrations string var tempTemplates string -func SetupTemplatesAndMigrations(givenMigrationsPath string, givenTemplatesPath string) { +func SetupMigrations(givenMigrationsPath string) { _, err := os.Stat(givenMigrationsPath) exists := err == nil || !os.IsNotExist(err) if !exists { @@ -30,9 +30,13 @@ func SetupTemplatesAndMigrations(givenMigrationsPath string, givenTemplatesPath givenMigrationsPath = tempMigrations } + config.Runtime.MigrationsPath = givenMigrationsPath +} + +func SetupTemplates(givenTemplatesPath string) { if givenTemplatesPath != "" { - _, err = os.Stat(givenTemplatesPath) - exists = err == nil || !os.IsNotExist(err) + _, err := os.Stat(givenTemplatesPath) + exists := err == nil || !os.IsNotExist(err) if !exists { tempTemplates, err = ioutil.TempDir(os.TempDir(), "media-repo-templates") if err != nil { @@ -44,7 +48,6 @@ func SetupTemplatesAndMigrations(givenMigrationsPath string, givenTemplatesPath } } - config.Runtime.MigrationsPath = givenMigrationsPath config.Runtime.TemplatesPath = givenTemplatesPath } diff --git a/controllers/data_controller/export_controller.go b/controllers/data_controller/export_controller.go index 10437c5df2eeb3aac1f3858d002db2c4041bd6aa..b423935ad3a16a7ad5376a98081ec86880ff9c51 100644 --- a/controllers/data_controller/export_controller.go +++ b/controllers/data_controller/export_controller.go @@ -22,7 +22,7 @@ import ( "github.com/turt2live/matrix-media-repo/util/cleanup" ) -type manifestRecord struct { +type ManifestRecord struct { FileName string `json:"name"` ArchivedName string `json:"file_name"` SizeBytes int64 `json:"size_bytes"` @@ -35,11 +35,11 @@ type manifestRecord struct { Uploader string `json:"uploader"` } -type manifest struct { +type Manifest struct { Version int `json:"version"` EntityId string `json:"entity_id"` CreatedTs int64 `json:"created_ts"` - Media map[string]*manifestRecord `json:"media"` + Media map[string]*ManifestRecord `json:"media"` // Deprecated: for v1 manifests UserId string `json:"user_id,omitempty"` @@ -253,7 +253,7 @@ func compileArchive(exportId string, entityId string, archiveDs *datastore.Datas ExportID: exportId, Media: make([]*templating.ExportIndexMediaModel, 0), } - mediaManifest := make(map[string]*manifestRecord) + mediaManifest := make(map[string]*ManifestRecord) for _, m := range media { var s3url string if s3urls { @@ -262,7 +262,7 @@ func compileArchive(exportId string, entityId string, archiveDs *datastore.Datas ctx.Log.Warn(err) } } - mediaManifest[m.MxcUri()] = &manifestRecord{ + mediaManifest[m.MxcUri()] = &ManifestRecord{ ArchivedName: archivedName(m), FileName: m.UploadName, SizeBytes: m.SizeBytes, @@ -289,7 +289,7 @@ func compileArchive(exportId string, entityId string, archiveDs *datastore.Datas Uploader: m.UserId, }) } - manifest := &manifest{ + manifest := &Manifest{ Version: 2, EntityId: entityId, CreatedTs: util.NowMillis(), diff --git a/controllers/data_controller/import_controller.go b/controllers/data_controller/import_controller.go index 7eec6e5bc1ff94b5ecf160e6950ab6be1010099c..01a29be215fd4b3d8e992fd39987e37f0723a8db 100644 --- a/controllers/data_controller/import_controller.go +++ b/controllers/data_controller/import_controller.go @@ -138,7 +138,7 @@ func doImport(updateChannel chan *importUpdate, taskId int, importId string, ctx ctx.Log.Info("Preparing for import...") fileMap := make(map[string]*bytes.Buffer) stopImport := false - archiveManifest := &manifest{} + archiveManifest := &Manifest{} haveManifest := false imported := make(map[string]bool) db := storage.GetDatabase().GetMediaStore(ctx)