Skip to content
Snippets Groups Projects
Commit b5e4c88a authored by Travis Ralston's avatar Travis Ralston
Browse files

Remove more dead code to make it easier to see changes

parent ddbdd424
No related branches found
No related tags found
No related merge requests found
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/common/rcontext"
"github.com/turt2live/matrix-media-repo/controllers/maintenance_controller"
"github.com/turt2live/matrix-media-repo/storage"
"github.com/turt2live/matrix-media-repo/storage/datastore"
)
func scanAndStartUnfinishedTasks() error {
ctx := rcontext.Initial().LogWithFields(logrus.Fields{"stage": "startup"})
db := storage.GetDatabase().GetMetadataStore(ctx)
tasks, err := db.GetAllBackgroundTasks()
if err != nil {
return err
}
for _, task := range tasks {
if task.EndTs > 0 {
continue
}
taskCtx := ctx.LogWithFields(logrus.Fields{
"prev_task_id": task.ID,
"prev_task_name": task.Name,
})
if task.Name == "storage_migration" {
beforeTs := int64(task.Params["before_ts"].(float64))
sourceDsId := task.Params["source_datastore_id"].(string)
targetDsId := task.Params["target_datastore_id"].(string)
sourceDs, err := datastore.LocateDatastore(taskCtx, sourceDsId)
if err != nil {
return err
}
targetDs, err := datastore.LocateDatastore(taskCtx, targetDsId)
if err != nil {
return err
}
newTask, err := maintenance_controller.StartStorageMigration(sourceDs, targetDs, beforeTs, taskCtx)
if err != nil {
return err
}
err = db.FinishedBackgroundTask(task.ID)
if err != nil {
return err
}
taskCtx.Log.Infof("Started replacement task ID %d for unfinished task %d (%s)", newTask.ID, task.ID, task.Name)
} else {
taskCtx.Log.Warn(fmt.Sprintf("Unknown task %s at ID %d - ignoring", task.Name, task.ID))
}
}
return nil
}
...@@ -4,9 +4,6 @@ import ( ...@@ -4,9 +4,6 @@ import (
"database/sql" "database/sql"
"os" "os"
"github.com/getsentry/sentry-go"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/common/rcontext"
"github.com/turt2live/matrix-media-repo/controllers/download_controller" "github.com/turt2live/matrix-media-repo/controllers/download_controller"
"github.com/turt2live/matrix-media-repo/storage" "github.com/turt2live/matrix-media-repo/storage"
...@@ -15,153 +12,6 @@ import ( ...@@ -15,153 +12,6 @@ import (
"github.com/turt2live/matrix-media-repo/util" "github.com/turt2live/matrix-media-repo/util"
) )
// Returns an error only if starting up the background task failed.
func StartStorageMigration(sourceDs *datastore.DatastoreRef, targetDs *datastore.DatastoreRef, beforeTs int64, ctx rcontext.RequestContext) (*types.BackgroundTask, error) {
db := storage.GetDatabase().GetMetadataStore(ctx)
task, err := db.CreateBackgroundTask("storage_migration", map[string]interface{}{
"source_datastore_id": sourceDs.DatastoreId,
"target_datastore_id": targetDs.DatastoreId,
"before_ts": beforeTs,
})
if err != nil {
return nil, err
}
go func() {
ctx.Log.Info("Starting transfer")
db := storage.GetDatabase().GetMetadataStore(ctx)
doUpdate := func(records []*types.MinimalMediaMetadata) {
for _, record := range records {
rctx := ctx.LogWithFields(logrus.Fields{"mediaSha256": record.Sha256Hash})
rctx.Log.Info("Starting transfer of media")
sourceStream, err := sourceDs.DownloadFile(record.Location)
if err != nil {
rctx.Log.Error(err)
rctx.Log.Error("Failed to start download from source datastore")
sentry.CaptureException(err)
continue
}
newLocation, err := targetDs.UploadFile(sourceStream, record.SizeBytes, rctx)
if err != nil {
rctx.Log.Error(err)
rctx.Log.Error("Failed to upload file to target datastore")
sentry.CaptureException(err)
continue
}
if newLocation.Sha256Hash != record.Sha256Hash {
rctx.Log.Error("sha256 hash does not match - not moving media")
sentry.CaptureMessage("sha256 hash does not match - not moving media")
targetDs.DeleteObject(newLocation.Location)
continue
}
rctx.Log.Info("Updating media records...")
err = db.ChangeDatastoreOfHash(targetDs.DatastoreId, newLocation.Location, record.Sha256Hash)
if err != nil {
rctx.Log.Error(err)
rctx.Log.Error("Failed to update database records")
sentry.CaptureException(err)
continue
}
rctx.Log.Info("Deleting media from old datastore")
err = sourceDs.DeleteObject(record.Location)
if err != nil {
rctx.Log.Error(err)
rctx.Log.Error("Failed to delete old media")
sentry.CaptureException(err)
continue
}
rctx.Log.Info("Media updated!")
}
}
media, err := db.GetOldMediaInDatastore(sourceDs.DatastoreId, beforeTs)
if err != nil {
ctx.Log.Error(err)
sentry.CaptureException(err)
return
}
doUpdate(media)
thumbs, err := db.GetOldThumbnailsInDatastore(sourceDs.DatastoreId, beforeTs)
if err != nil {
ctx.Log.Error(err)
sentry.CaptureException(err)
return
}
doUpdate(thumbs)
err = db.FinishedBackgroundTask(task.ID)
if err != nil {
ctx.Log.Error(err)
ctx.Log.Error("Failed to flag task as finished")
sentry.CaptureException(err)
}
ctx.Log.Info("Finished transfer")
}()
return task, nil
}
func EstimateDatastoreSizeWithAge(beforeTs int64, datastoreId string, ctx rcontext.RequestContext) (*types.DatastoreMigrationEstimate, error) {
estimates := &types.DatastoreMigrationEstimate{}
seenHashes := make(map[string]bool)
seenMediaHashes := make(map[string]bool)
seenThumbnailHashes := make(map[string]bool)
db := storage.GetDatabase().GetMetadataStore(ctx)
media, err := db.GetOldMediaInDatastore(datastoreId, beforeTs)
if err != nil {
return nil, err
}
for _, record := range media {
estimates.MediaAffected++
if _, found := seenHashes[record.Sha256Hash]; !found {
estimates.TotalBytes += record.SizeBytes
estimates.TotalHashesAffected++
}
if _, found := seenMediaHashes[record.Sha256Hash]; !found {
estimates.MediaBytes += record.SizeBytes
estimates.MediaHashesAffected++
}
seenHashes[record.Sha256Hash] = true
seenMediaHashes[record.Sha256Hash] = true
}
thumbnails, err := db.GetOldThumbnailsInDatastore(datastoreId, beforeTs)
if err != nil {
return nil, err
}
for _, record := range thumbnails {
estimates.ThumbnailsAffected++
if _, found := seenHashes[record.Sha256Hash]; !found {
estimates.TotalBytes += record.SizeBytes
estimates.TotalHashesAffected++
}
if _, found := seenThumbnailHashes[record.Sha256Hash]; !found {
estimates.ThumbnailBytes += record.SizeBytes
estimates.ThumbnailHashesAffected++
}
seenHashes[record.Sha256Hash] = true
seenThumbnailHashes[record.Sha256Hash] = true
}
return estimates, nil
}
func PurgeQuarantined(ctx rcontext.RequestContext) ([]*types.Media, error) { func PurgeQuarantined(ctx rcontext.RequestContext) ([]*types.Media, error) {
mediaDb := storage.GetDatabase().GetMediaStore(ctx) mediaDb := storage.GetDatabase().GetMediaStore(ctx)
records, err := mediaDb.GetAllQuarantinedMedia() records, err := mediaDb.GetAllQuarantinedMedia()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment