From 987f9cc0023f2c62e5123f47590df370cd8af42d Mon Sep 17 00:00:00 2001
From: Travis Ralston <travpc@gmail.com>
Date: Sat, 27 May 2023 21:45:44 -0600
Subject: [PATCH] Re-route direct uploads through new pipeline

For https://github.com/turt2live/matrix-media-repo/issues/411
---
 api/r0/upload.go                              |  61 +++-----
 cmd/media_repo/reloads.go                     |  20 +++
 common/config/watch.go                        |   6 +-
 common/errors.go                              |   1 +
 common/globals/reload.go                      |   1 +
 common/logging/logger.go                      |  11 +-
 common/media_kinds.go                         |  18 +--
 common/runtime/init.go                        |   4 +
 .../data_controller/export_controller.go      |   4 +-
 .../thumbnail_resource_handler.go             |   2 +-
 .../upload_controller/upload_controller.go    |   4 +-
 database/db.go                                |   6 +-
 database/table_media.go                       |  57 ++++++-
 database/virtualtable_metadata.go             |  48 ++++++
 datastores/buffer.go                          | 114 ++++++++++++++
 datastores/delete.go                          |  37 +++++
 datastores/kind.go                            |  21 +++
 datastores/pick.go                            |  41 ++++++
 datastores/s3.go                              |  60 ++++++++
 datastores/upload.go                          | 139 ++++++++++++++++++
 go.mod                                        |  30 ++--
 go.sum                                        | 128 +++++++++++++---
 pipline/_steps/quota/check.go                 |  95 +++++++-----
 pipline/_steps/upload/deduplicate.go          |  30 ++++
 .../upload/limit.go}                          |   4 +-
 pipline/_steps/upload/lock.go                 |  28 ++++
 .../upload/quarantine.go}                     |   9 +-
 pipline/upload_pipeline/pipeline.go           | 117 ++++++++++-----
 pipline/upload_pipeline/step_buffer.go        |  11 --
 pipline/upload_pipeline/step_gen_media_id.go  |  45 ------
 pipline/upload_pipeline/step_hash.go          |  12 --
 pool/init.go                                  |  26 ++++
 pool/queue.go                                 |  36 +++++
 redis_cache/redis.go                          |   2 +-
 redislib/connection.go                        |  68 +++++++++
 redislib/locking.go                           |  16 ++
 storage/datastore/datastore.go                |   2 +-
 storage/datastore/ds_s3/s3_store.go           |   2 +-
 38 files changed, 1077 insertions(+), 239 deletions(-)
 create mode 100644 database/virtualtable_metadata.go
 create mode 100644 datastores/buffer.go
 create mode 100644 datastores/delete.go
 create mode 100644 datastores/kind.go
 create mode 100644 datastores/pick.go
 create mode 100644 datastores/s3.go
 create mode 100644 datastores/upload.go
 create mode 100644 pipline/_steps/upload/deduplicate.go
 rename pipline/{upload_pipeline/step_limited.go => _steps/upload/limit.go} (66%)
 create mode 100644 pipline/_steps/upload/lock.go
 rename pipline/{upload_pipeline/step_check_quarantine.go => _steps/upload/quarantine.go} (50%)
 delete mode 100644 pipline/upload_pipeline/step_buffer.go
 delete mode 100644 pipline/upload_pipeline/step_gen_media_id.go
 delete mode 100644 pipline/upload_pipeline/step_hash.go
 create mode 100644 pool/init.go
 create mode 100644 pool/queue.go
 create mode 100644 redislib/connection.go
 create mode 100644 redislib/locking.go

diff --git a/api/r0/upload.go b/api/r0/upload.go
index 245b15ee..f9003166 100644
--- a/api/r0/upload.go
+++ b/api/r0/upload.go
@@ -4,6 +4,9 @@ import (
 	"github.com/getsentry/sentry-go"
 	"github.com/turt2live/matrix-media-repo/api/_apimeta"
 	"github.com/turt2live/matrix-media-repo/api/_responses"
+	"github.com/turt2live/matrix-media-repo/datastores"
+	"github.com/turt2live/matrix-media-repo/pipline/upload_pipeline"
+	"github.com/turt2live/matrix-media-repo/util"
 	"github.com/turt2live/matrix-media-repo/util/stream_util"
 
 	"io"
@@ -13,9 +16,7 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/turt2live/matrix-media-repo/common"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/controllers/info_controller"
 	"github.com/turt2live/matrix-media-repo/controllers/upload_controller"
-	"github.com/turt2live/matrix-media-repo/quota"
 )
 
 type MediaUploadedResponse struct {
@@ -36,57 +37,43 @@ func UploadMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.Us
 		contentType = "application/octet-stream" // binary
 	}
 
+	// TODO: Move to new function - https://github.com/turt2live/matrix-media-repo/issues/411
 	if upload_controller.IsRequestTooLarge(r.ContentLength, r.Header.Get("Content-Length"), rctx) {
 		io.Copy(io.Discard, r.Body) // Ditch the entire request
 		return _responses.RequestTooLarge()
 	}
 
+	// TODO: Move to new function - https://github.com/turt2live/matrix-media-repo/issues/411
 	if upload_controller.IsRequestTooSmall(r.ContentLength, r.Header.Get("Content-Length"), rctx) {
 		io.Copy(io.Discard, r.Body) // Ditch the entire request
 		return _responses.RequestTooSmall()
 	}
 
-	inQuota, err := quota.IsUserWithinQuota(rctx, user.UserId)
+	media, err := upload_pipeline.UploadMedia(rctx, r.Host, "", r.Body, contentType, filename, user.UserId, datastores.LocalMediaKind)
 	if err != nil {
-		io.Copy(io.Discard, r.Body) // Ditch the entire request
-		rctx.Log.Error("Unexpected error checking quota: " + err.Error())
-		sentry.CaptureException(err)
-		return _responses.InternalServerError("Unexpected Error")
-	}
-	if !inQuota {
-		io.Copy(io.Discard, r.Body) // Ditch the entire request
-		return _responses.QuotaExceeded()
-	}
-
-	contentLength := upload_controller.EstimateContentLength(r.ContentLength, r.Header.Get("Content-Length"))
-
-	media, err := upload_controller.UploadMedia(r.Body, contentLength, contentType, filename, user.UserId, r.Host, rctx)
-	if err != nil {
-		io.Copy(io.Discard, r.Body) // Ditch the entire request
-
-		if err == common.ErrMediaQuarantined {
-			return _responses.BadRequest("This file is not permitted on this server")
+		if err == common.ErrQuotaExceeded {
+			return _responses.QuotaExceeded()
 		}
-
-		rctx.Log.Error("Unexpected error storing media: " + err.Error())
+		rctx.Log.Error("Unexpected error uploading media: " + err.Error())
 		sentry.CaptureException(err)
 		return _responses.InternalServerError("Unexpected Error")
 	}
 
-	if rctx.Config.Features.MSC2448Blurhash.Enabled && r.URL.Query().Get("xyz.amorgan.generate_blurhash") == "true" {
-		hash, err := info_controller.GetOrCalculateBlurhash(media, rctx)
-		if err != nil {
-			rctx.Log.Warn("Failed to calculate blurhash: " + err.Error())
-			sentry.CaptureException(err)
-		}
-
-		return &MediaUploadedResponse{
-			ContentUri: media.MxcUri(),
-			Blurhash:   hash,
-		}
-	}
-
 	return &MediaUploadedResponse{
-		ContentUri: media.MxcUri(),
+		ContentUri: util.MxcUri(media.Origin, media.MediaId),
+		//Blurhash:   "", // TODO: Re-add blurhash support - https://github.com/turt2live/matrix-media-repo/issues/411
 	}
+
+	//if rctx.Config.Features.MSC2448Blurhash.Enabled && r.URL.Query().Get("xyz.amorgan.generate_blurhash") == "true" {
+	//	hash, err := info_controller.GetOrCalculateBlurhash(media, rctx)
+	//	if err != nil {
+	//		rctx.Log.Warn("Failed to calculate blurhash: " + err.Error())
+	//		sentry.CaptureException(err)
+	//	}
+	//
+	//	return &MediaUploadedResponse{
+	//		ContentUri: media.MxcUri(),
+	//		Blurhash:   hash,
+	//	}
+	//}
 }
diff --git a/cmd/media_repo/reloads.go b/cmd/media_repo/reloads.go
index f7881eed..ebae6e7b 100644
--- a/cmd/media_repo/reloads.go
+++ b/cmd/media_repo/reloads.go
@@ -9,6 +9,8 @@ import (
 	"github.com/turt2live/matrix-media-repo/internal_cache"
 	"github.com/turt2live/matrix-media-repo/metrics"
 	"github.com/turt2live/matrix-media-repo/plugins"
+	"github.com/turt2live/matrix-media-repo/pool"
+	"github.com/turt2live/matrix-media-repo/redislib"
 	"github.com/turt2live/matrix-media-repo/tasks"
 )
 
@@ -21,6 +23,7 @@ func setupReloads() {
 	reloadAccessTokensOnChan(globals.AccessTokenReloadChan)
 	reloadCacheOnChan(globals.CacheReplaceChan)
 	reloadPluginsOnChan(globals.PluginReloadChan)
+	reloadPoolOnChan(globals.PoolReloadChan)
 }
 
 func stopReloads() {
@@ -33,6 +36,7 @@ func stopReloads() {
 	globals.RecurringTasksReloadChan <- false
 	globals.CacheReplaceChan <- false
 	globals.PluginReloadChan <- false
+	globals.PoolReloadChan <- false
 }
 
 func reloadWebOnChan(reloadChan chan bool) {
@@ -126,8 +130,10 @@ func reloadCacheOnChan(reloadChan chan bool) {
 		for {
 			shouldReload := <-reloadChan
 			if shouldReload {
+				redislib.Reconnect()
 				internal_cache.ReplaceInstance()
 			} else {
+				redislib.Stop()
 				internal_cache.Get().Stop()
 			}
 		}
@@ -147,3 +153,17 @@ func reloadPluginsOnChan(reloadChan chan bool) {
 		}
 	}()
 }
+
+func reloadPoolOnChan(reloadChan chan bool) {
+	go func() {
+		defer close(reloadChan)
+		for {
+			shouldReload := <-reloadChan
+			if shouldReload {
+				pool.AdjustSize()
+			} else {
+				pool.Drain()
+			}
+		}
+	}()
+}
diff --git a/common/config/watch.go b/common/config/watch.go
index bf455c43..843037d2 100644
--- a/common/config/watch.go
+++ b/common/config/watch.go
@@ -3,10 +3,9 @@ package config
 import (
 	"time"
 
-	"github.com/getsentry/sentry-go"
-
 	"github.com/bep/debounce"
 	"github.com/fsnotify/fsnotify"
+	"github.com/getsentry/sentry-go"
 	"github.com/sirupsen/logrus"
 	"github.com/turt2live/matrix-media-repo/common/globals"
 )
@@ -62,6 +61,9 @@ func onFileChanged() {
 	PrintDomainInfo()
 	CheckDeprecations()
 
+	logrus.Info("Reloading pool configuration")
+	globals.PoolReloadChan <- true
+
 	bindAddressChange := configNew.General.BindAddress != configNow.General.BindAddress
 	bindPortChange := configNew.General.Port != configNow.General.Port
 	forwardAddressChange := configNew.General.TrustAnyForward != configNow.General.TrustAnyForward
diff --git a/common/errors.go b/common/errors.go
index 5fc0e690..c7712e84 100644
--- a/common/errors.go
+++ b/common/errors.go
@@ -10,3 +10,4 @@ var ErrInvalidHost = errors.New("invalid host")
 var ErrHostNotFound = errors.New("host not found")
 var ErrHostBlacklisted = errors.New("host not allowed")
 var ErrMediaQuarantined = errors.New("media quarantined")
+var ErrQuotaExceeded = errors.New("quota exceeded")
diff --git a/common/globals/reload.go b/common/globals/reload.go
index ae2870d4..f09e1fdf 100644
--- a/common/globals/reload.go
+++ b/common/globals/reload.go
@@ -8,3 +8,4 @@ var RecurringTasksReloadChan = make(chan bool)
 var AccessTokenReloadChan = make(chan bool)
 var CacheReplaceChan = make(chan bool)
 var PluginReloadChan = make(chan bool)
+var PoolReloadChan = make(chan bool)
diff --git a/common/logging/logger.go b/common/logging/logger.go
index d9d5c098..16553fcd 100644
--- a/common/logging/logger.go
+++ b/common/logging/logger.go
@@ -78,22 +78,23 @@ func Setup(dir string, colors bool, json bool, level string) error {
 	return nil
 }
 
-type GoMigrateLogger struct {
+type SendToDebugLogger struct {
 	gomigrate.Logger
+	//ants.Logger
 }
 
-func (*GoMigrateLogger) Print(v ...interface{}) {
+func (*SendToDebugLogger) Print(v ...interface{}) {
 	logrus.Debug(v...)
 }
 
-func (*GoMigrateLogger) Printf(format string, v ...interface{}) {
+func (*SendToDebugLogger) Printf(format string, v ...interface{}) {
 	logrus.Debugf(format, v...)
 }
 
-func (*GoMigrateLogger) Println(v ...interface{}) {
+func (*SendToDebugLogger) Println(v ...interface{}) {
 	logrus.Debugln(v...)
 }
 
-func (*GoMigrateLogger) Fatalf(format string, v ...interface{}) {
+func (*SendToDebugLogger) Fatalf(format string, v ...interface{}) {
 	logrus.Fatalf(format, v...)
 }
diff --git a/common/media_kinds.go b/common/media_kinds.go
index 5421b051..a4395434 100644
--- a/common/media_kinds.go
+++ b/common/media_kinds.go
@@ -1,20 +1,20 @@
 package common
 
-const KindLocalMedia = "local_media"
-const KindRemoteMedia = "remote_media"
-const KindThumbnails = "thumbnails"
-const KindArchives = "archives"
-const KindAll = "all"
+type Kind string
 
-var AllKinds = []string{KindLocalMedia, KindRemoteMedia, KindThumbnails}
+const KindLocalMedia Kind = "local_media"
+const KindRemoteMedia Kind = "remote_media"
+const KindThumbnails Kind = "thumbnails"
+const KindArchives Kind = "archives"
+const KindAll Kind = "all"
 
-func IsKind(have string, want string) bool {
+func IsKind(have Kind, want Kind) bool {
 	return have == want || have == KindAll
 }
 
-func HasKind(have []string, want string) bool {
+func HasKind(have []string, want Kind) bool {
 	for _, k := range have {
-		if IsKind(k, want) {
+		if IsKind(Kind(k), want) {
 			return true
 		}
 	}
diff --git a/common/runtime/init.go b/common/runtime/init.go
index e79b745d..ed6875a3 100644
--- a/common/runtime/init.go
+++ b/common/runtime/init.go
@@ -3,6 +3,8 @@ package runtime
 import (
 	"github.com/getsentry/sentry-go"
 	"github.com/turt2live/matrix-media-repo/database"
+	"github.com/turt2live/matrix-media-repo/pool"
+	"github.com/turt2live/matrix-media-repo/redislib"
 	"github.com/turt2live/matrix-media-repo/util/ids"
 
 	"github.com/sirupsen/logrus"
@@ -20,6 +22,8 @@ func RunStartupSequence() {
 	LoadDatabase()
 	LoadDatastores()
 	plugins.ReloadPlugins()
+	pool.Init()
+	redislib.Reconnect()
 }
 
 func LoadDatabase() {
diff --git a/controllers/data_controller/export_controller.go b/controllers/data_controller/export_controller.go
index 2b50affc..95e2053d 100644
--- a/controllers/data_controller/export_controller.go
+++ b/controllers/data_controller/export_controller.go
@@ -71,7 +71,7 @@ func StartServerExport(serverName string, s3urls bool, includeData bool, ctx rco
 		ctx.Context = context.Background()
 		db := storage.GetDatabase().GetMetadataStore(ctx)
 
-		ds, err := datastore.PickDatastore(common.KindArchives, ctx)
+		ds, err := datastore.PickDatastore(string(common.KindArchives), ctx)
 		if err != nil {
 			ctx.Log.Error(err)
 			sentry.CaptureException(err)
@@ -124,7 +124,7 @@ func StartUserExport(userId string, s3urls bool, includeData bool, ctx rcontext.
 		ctx.Context = context.Background()
 		db := storage.GetDatabase().GetMetadataStore(ctx)
 
-		ds, err := datastore.PickDatastore(common.KindArchives, ctx)
+		ds, err := datastore.PickDatastore(string(common.KindArchives), ctx)
 		if err != nil {
 			ctx.Log.Error(err)
 			sentry.CaptureException(err)
diff --git a/controllers/thumbnail_controller/thumbnail_resource_handler.go b/controllers/thumbnail_controller/thumbnail_resource_handler.go
index d7afe9f0..51c5e5c2 100644
--- a/controllers/thumbnail_controller/thumbnail_resource_handler.go
+++ b/controllers/thumbnail_controller/thumbnail_resource_handler.go
@@ -198,7 +198,7 @@ func GenerateThumbnail(media *types.Media, width int, height int, method string,
 		return nil, err
 	}
 
-	ds, err := datastore.PickDatastore(common.KindThumbnails, ctx)
+	ds, err := datastore.PickDatastore(string(common.KindThumbnails), ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/controllers/upload_controller/upload_controller.go b/controllers/upload_controller/upload_controller.go
index 99c8c58e..69eec8ff 100644
--- a/controllers/upload_controller/upload_controller.go
+++ b/controllers/upload_controller/upload_controller.go
@@ -171,13 +171,13 @@ func checkSpam(contents []byte, filename string, contentType string, userId stri
 	return nil
 }
 
-func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize int64, contentType string, filename string, userId string, origin string, mediaId string, kind string, ctx rcontext.RequestContext, filterUserDuplicates bool) (*types.Media, error) {
+func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize int64, contentType string, filename string, userId string, origin string, mediaId string, kind common.Kind, ctx rcontext.RequestContext, filterUserDuplicates bool) (*types.Media, error) {
 	var err error
 	var ds *datastore.DatastoreRef
 	var info *types.ObjectInfo
 	var contentBytes []byte
 	if f == nil {
-		dsPicked, err := datastore.PickDatastore(kind, ctx)
+		dsPicked, err := datastore.PickDatastore(string(kind), ctx)
 		if err != nil {
 			return nil, err
 		}
diff --git a/database/db.go b/database/db.go
index b6b05202..477c96de 100644
--- a/database/db.go
+++ b/database/db.go
@@ -19,6 +19,7 @@ type Database struct {
 	ExpiringMedia *expiringMediaTableStatements
 	UserStats     *userStatsTableStatements
 	ReservedMedia *reservedMediaTableStatements
+	MetadataView  *metadataVirtualTableStatements
 }
 
 var instance *Database
@@ -64,7 +65,7 @@ func openDatabase(connectionString string, maxConns int, maxIdleConns int) error
 
 	// Run migrations
 	var migrator *gomigrate.Migrator
-	if migrator, err = gomigrate.NewMigratorWithLogger(d.conn, gomigrate.Postgres{}, config.Runtime.MigrationsPath, &logging.GoMigrateLogger{}); err != nil {
+	if migrator, err = gomigrate.NewMigratorWithLogger(d.conn, gomigrate.Postgres{}, config.Runtime.MigrationsPath, &logging.SendToDebugLogger{}); err != nil {
 		return errors.New("error setting up migrator: " + err.Error())
 	}
 	if err = migrator.Migrate(); err != nil {
@@ -84,6 +85,9 @@ func openDatabase(connectionString string, maxConns int, maxIdleConns int) error
 	if d.ReservedMedia, err = prepareReservedMediaTables(d.conn); err != nil {
 		return errors.New("failed to create reserved media table accessor: " + err.Error())
 	}
+	if d.MetadataView, err = prepareMetadataVirtualTables(d.conn); err != nil {
+		return errors.New("failed to create metadata virtual table accessor: " + err.Error())
+	}
 
 	instance = d
 	return nil
diff --git a/database/table_media.go b/database/table_media.go
index e068ee0a..1c2da8ac 100644
--- a/database/table_media.go
+++ b/database/table_media.go
@@ -22,9 +22,15 @@ type DbMedia struct {
 }
 
 const selectDistinctMediaDatastoreIds = "SELECT DISTINCT datastore_id FROM media;"
+const selectMediaIsQuarantinedByHash = "SELECT quarantined FROM media WHERE quarantined = TRUE AND sha256_hash = $1;"
+const selectMediaByHash = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, creation_ts, quarantined, datastore_id, location FROM media WHERE sha256_hash = $1;"
+const insertMedia = "INSERT INTO media (origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, creation_ts, quarantined, datastore_id, location) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);"
 
 type mediaTableStatements struct {
 	selectDistinctMediaDatastoreIds *sql.Stmt
+	selectMediaIsQuarantinedByHash  *sql.Stmt
+	selectMediaByHash               *sql.Stmt
+	insertMedia                     *sql.Stmt
 }
 
 type mediaTableWithContext struct {
@@ -39,6 +45,15 @@ func prepareMediaTables(db *sql.DB) (*mediaTableStatements, error) {
 	if stmts.selectDistinctMediaDatastoreIds, err = db.Prepare(selectDistinctMediaDatastoreIds); err != nil {
 		return nil, errors.New("error preparing selectDistinctMediaDatastoreIds: " + err.Error())
 	}
+	if stmts.selectMediaIsQuarantinedByHash, err = db.Prepare(selectMediaIsQuarantinedByHash); err != nil {
+		return nil, errors.New("error preparing selectMediaIsQuarantinedByHash: " + err.Error())
+	}
+	if stmts.selectMediaByHash, err = db.Prepare(selectMediaByHash); err != nil {
+		return nil, errors.New("error preparing selectMediaByHash: " + err.Error())
+	}
+	if stmts.insertMedia, err = db.Prepare(insertMedia); err != nil {
+		return nil, errors.New("error preparing insertMedia: " + err.Error())
+	}
 
 	return stmts, nil
 }
@@ -51,12 +66,15 @@ func (s *mediaTableStatements) Prepare(ctx rcontext.RequestContext) *mediaTableW
 }
 
 func (s *mediaTableWithContext) GetDistinctDatastoreIds() ([]string, error) {
+	results := make([]string, 0)
 	rows, err := s.statements.selectDistinctMediaDatastoreIds.QueryContext(s.ctx)
 	if err != nil {
+		if err == sql.ErrNoRows {
+			return results, nil
+		}
 		return nil, err
 	}
 
-	var results []string
 	for rows.Next() {
 		val := ""
 		if err = rows.Scan(&val); err != nil {
@@ -67,3 +85,40 @@ func (s *mediaTableWithContext) GetDistinctDatastoreIds() ([]string, error) {
 
 	return results, nil
 }
+
+func (s *mediaTableWithContext) IsHashQuarantined(sha256hash string) (bool, error) {
+	// TODO: https://github.com/turt2live/matrix-media-repo/issues/410
+	row := s.statements.selectMediaIsQuarantinedByHash.QueryRowContext(s.ctx, sha256hash)
+	val := false
+	err := row.Scan(&val)
+	if err == sql.ErrNoRows {
+		err = nil
+		val = false
+	}
+	return val, err
+}
+
+func (s *mediaTableWithContext) GetByHash(sha256hash string) ([]*DbMedia, error) {
+	results := make([]*DbMedia, 0)
+	rows, err := s.statements.selectMediaByHash.QueryContext(s.ctx, sha256hash)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return results, nil
+		}
+		return nil, err
+	}
+	for rows.Next() {
+		val := &DbMedia{}
+		if err = rows.Scan(&val.Origin, &val.MediaId, &val.UploadName, &val.ContentType, &val.UserId, &val.Sha256Hash, &val.SizeBytes, &val.CreationTs, &val.Quarantined, &val.DatastoreId, &val.Location); err != nil {
+			return nil, err
+		}
+		results = append(results, val)
+	}
+
+	return results, nil
+}
+
+func (s *mediaTableWithContext) Insert(record *DbMedia) error {
+	_, err := s.statements.insertMedia.ExecContext(s.ctx, record.Origin, record.MediaId, record.UploadName, record.ContentType, record.UserId, record.Sha256Hash, record.SizeBytes, record.CreationTs, record.Quarantined, record.DatastoreId, record.Location)
+	return err
+}
diff --git a/database/virtualtable_metadata.go b/database/virtualtable_metadata.go
new file mode 100644
index 00000000..bbed7f1d
--- /dev/null
+++ b/database/virtualtable_metadata.go
@@ -0,0 +1,48 @@
+package database
+
+import (
+	"database/sql"
+	"errors"
+
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+)
+
+const selectEstimatedDatastoreSize = "SELECT COALESCE(SUM(m2.size_bytes), 0) + COALESCE((SELECT SUM(t2.size_bytes) FROM (SELECT DISTINCT t.sha256_hash, MAX(t.size_bytes) AS size_bytes FROM thumbnails AS t WHERE t.datastore_id = $1 GROUP BY t.sha256_hash) AS t2), 0) AS size_total FROM (SELECT DISTINCT m.sha256_hash, MAX(m.size_bytes) AS size_bytes FROM media AS m WHERE m.datastore_id = $1 GROUP BY m.sha256_hash) AS m2;"
+
+type metadataVirtualTableStatements struct {
+	selectEstimatedDatastoreSize *sql.Stmt
+}
+
+type metadataVirtualTableWithContext struct {
+	statements *metadataVirtualTableStatements
+	ctx        rcontext.RequestContext
+}
+
+func prepareMetadataVirtualTables(db *sql.DB) (*metadataVirtualTableStatements, error) {
+	var err error
+	var stmts = &metadataVirtualTableStatements{}
+
+	if stmts.selectEstimatedDatastoreSize, err = db.Prepare(selectEstimatedDatastoreSize); err != nil {
+		return nil, errors.New("error preparing selectEstimatedDatastoreSize: " + err.Error())
+	}
+
+	return stmts, nil
+}
+
+func (s *metadataVirtualTableStatements) Prepare(ctx rcontext.RequestContext) *metadataVirtualTableWithContext {
+	return &metadataVirtualTableWithContext{
+		statements: s,
+		ctx:        ctx,
+	}
+}
+
+func (s *metadataVirtualTableWithContext) EstimateDatastoreSize(datastoreId string) (int64, error) {
+	row := s.statements.selectEstimatedDatastoreSize.QueryRowContext(s.ctx, datastoreId)
+	val := int64(0)
+	err := row.Scan(&val)
+	if err == sql.ErrNoRows {
+		err = nil
+		val = 0
+	}
+	return val, err
+}
diff --git a/datastores/buffer.go b/datastores/buffer.go
new file mode 100644
index 00000000..3ef1221d
--- /dev/null
+++ b/datastores/buffer.go
@@ -0,0 +1,114 @@
+package datastores
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/hex"
+	"errors"
+	"io"
+	"os"
+	"path"
+
+	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/common/config"
+	"github.com/turt2live/matrix-media-repo/util/ids"
+)
+
+func BufferTemp(datastore config.DatastoreConfig, contents io.ReadCloser) (string, int64, io.ReadCloser, error) {
+	fpath := ""
+	var err error
+	if datastore.Type == "s3" {
+		fpath = datastore.Options["tempPath"]
+	} else if datastore.Type == "file" {
+		var id string
+		id, err = ids.NewUniqueId()
+		if err != nil {
+			return "", 0, nil, errors.New("error generating temporary file ID: " + err.Error())
+		}
+		fpath = path.Join(os.TempDir(), id)
+		fpath, err = os.MkdirTemp(fpath, "mmr")
+		if err != nil {
+			return "", 0, nil, errors.New("error generating temporary directory: " + err.Error())
+		}
+	} else {
+		return "", 0, nil, errors.New("unknown datastore type - contact developer")
+	}
+
+	var target io.Writer
+	if fpath == "" {
+		logrus.Warnf("Datastore %s does not have a valid temporary path configured. This will lead to increased memory usage.", datastore.Id)
+		target = &bytes.Buffer{}
+	} else {
+		var file *os.File
+		file, err = os.CreateTemp(fpath, "mmr")
+		target = file
+	}
+
+	// Prepare a sha256 calculation
+	hasher := sha256.New()
+
+	// Build a multi writer, so we can calculate the hash while we also write to a temporary directory
+	mw := io.MultiWriter(hasher, target)
+
+	// Actually copy to the temp file
+	var sizeBytes int64
+	if sizeBytes, err = io.Copy(mw, contents); err != nil {
+		return "", 0, nil, err
+	}
+	if err = contents.Close(); err != nil {
+		return "", 0, nil, err
+	}
+
+	// Utility function for finalizing the hash
+	hash := func() string {
+		return hex.EncodeToString(hasher.Sum(nil))
+	}
+
+	// Close out the file and return a read stream (with cleanup function), or return a copy of the byte buffer
+	if f, ok := target.(*os.File); ok {
+		if err = f.Close(); err != nil {
+			return "", 0, nil, err
+		}
+		f, err = os.Open(f.Name())
+		if err != nil {
+			return "", 0, nil, err
+		}
+		return hash(), sizeBytes, &tempFileCloser{
+			fname:    f.Name(),
+			fpath:    fpath,
+			upstream: f,
+			closed:   false,
+		}, nil
+	} else if b, ok := target.(*bytes.Buffer); ok {
+		return hash(), sizeBytes, io.NopCloser(b), nil
+	} else {
+		return "", 0, nil, errors.New("developer error - did not account for possible stream writer type")
+	}
+}
+
+type tempFileCloser struct {
+	io.ReadCloser
+	fname    string
+	fpath    string
+	upstream io.ReadCloser
+	closed   bool
+}
+
+func (c *tempFileCloser) Close() error {
+	if c.closed {
+		return nil
+	}
+	var err error
+	if err = os.Remove(c.fname); err != nil {
+		return err
+	}
+	if err = os.Remove(c.fpath); err != nil {
+		return err
+	}
+	c.closed = true
+	return c.upstream.Close()
+}
+
+func (c *tempFileCloser) Read(p []byte) (n int, err error) {
+	return c.upstream.Read(p)
+}
diff --git a/datastores/delete.go b/datastores/delete.go
new file mode 100644
index 00000000..5efaab8a
--- /dev/null
+++ b/datastores/delete.go
@@ -0,0 +1,37 @@
+package datastores
+
+import (
+	"errors"
+	"os"
+	"path"
+
+	"github.com/minio/minio-go/v7"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/turt2live/matrix-media-repo/common/config"
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/metrics"
+)
+
+func Remove(ctx rcontext.RequestContext, ds config.DatastoreConfig, location string) error {
+	var err error
+	if ds.Type == "s3" {
+		var s3c *s3
+		s3c, err = getS3(ds)
+		if err != nil {
+			return err
+		}
+
+		metrics.S3Operations.With(prometheus.Labels{"operation": "RemoveObject"}).Inc()
+		err = s3c.client.RemoveObject(ctx.Context, s3c.bucket, location, minio.RemoveObjectOptions{})
+	} else if ds.Type == "file" {
+		basePath := ds.Options["path"]
+		err = os.Remove(path.Join(basePath, location))
+		if err != nil && os.IsNotExist(err) {
+			return nil // not existing means it was deleted, as far as we care
+		}
+	} else {
+		return errors.New("unknown datastore type - contact developer")
+	}
+
+	return err
+}
diff --git a/datastores/kind.go b/datastores/kind.go
new file mode 100644
index 00000000..9fecef93
--- /dev/null
+++ b/datastores/kind.go
@@ -0,0 +1,21 @@
+package datastores
+
+type Kind string
+
+const (
+	LocalMediaKind  Kind = "local_media"
+	RemoteMediaKind Kind = "remote_media"
+	ThumbnailsKind  Kind = "thumbnails"
+	ArchivesKind    Kind = "archives"
+	AllKind         Kind = "all"
+)
+
+func HasListedKind(have []string, want Kind) bool {
+	for _, k := range have {
+		k2 := Kind(k)
+		if k2 == want || k2 == AllKind {
+			return true
+		}
+	}
+	return false
+}
diff --git a/datastores/pick.go b/datastores/pick.go
new file mode 100644
index 00000000..748ea0f4
--- /dev/null
+++ b/datastores/pick.go
@@ -0,0 +1,41 @@
+package datastores
+
+import (
+	"errors"
+
+	"github.com/turt2live/matrix-media-repo/common/config"
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/database"
+)
+
+func Pick(ctx rcontext.RequestContext, kind Kind) (config.DatastoreConfig, error) {
+	usable := make([]config.DatastoreConfig, 0)
+	for _, conf := range ctx.Config.DataStores {
+		if !HasListedKind(conf.MediaKinds, kind) {
+			continue
+		}
+		usable = append(usable, conf)
+	}
+
+	if len(usable) < 0 {
+		return config.DatastoreConfig{}, errors.New("unable to locate a usable datastore")
+	}
+	if len(usable) == 1 {
+		return usable[0], nil
+	}
+
+	// Find the smallest datastore, by relative size
+	dsSize := int64(-1)
+	idx := 0
+	db := database.GetInstance().MetadataView.Prepare(ctx)
+	for i, ds := range usable {
+		size, err := db.EstimateDatastoreSize(ds.Id)
+		if err != nil {
+			return config.DatastoreConfig{}, err
+		}
+		if dsSize < 0 || size > dsSize {
+			idx = i
+		}
+	}
+	return usable[idx], nil
+}
diff --git a/datastores/s3.go b/datastores/s3.go
new file mode 100644
index 00000000..38d412e8
--- /dev/null
+++ b/datastores/s3.go
@@ -0,0 +1,60 @@
+package datastores
+
+import (
+	"strconv"
+	"sync"
+
+	"github.com/minio/minio-go/v7"
+	"github.com/minio/minio-go/v7/pkg/credentials"
+	"github.com/turt2live/matrix-media-repo/common/config"
+)
+
+var s3clients = &sync.Map{}
+
+type s3 struct {
+	client       *minio.Client
+	storageClass string
+	bucket       string
+}
+
+func getS3(ds config.DatastoreConfig) (*s3, error) {
+	if val, ok := s3clients.Load(ds.Id); ok {
+		return val.(*s3), nil
+	}
+
+	endpoint := ds.Options["endpoint"]
+	bucket := ds.Options["bucketName"]
+	accessKeyId := ds.Options["accessKeyId"]
+	accessSecret := ds.Options["accessSecret"]
+	region := ds.Options["region"]
+	storageClass, hasStorageClass := ds.Options["storageClass"]
+	useSslStr, hasSsl := ds.Options["ssl"]
+
+	if !hasStorageClass {
+		storageClass = "STANDARD"
+	}
+
+	useSsl := true
+	if hasSsl && useSslStr != "" {
+		useSsl, _ = strconv.ParseBool(useSslStr)
+	}
+
+	var err error
+	var client *minio.Client
+	client, err = minio.New(endpoint, &minio.Options{
+		Region: region,
+		Secure: useSsl,
+		Creds:  credentials.NewStaticV4(accessKeyId, accessSecret, ""),
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	s3c := &s3{
+		client:       client,
+		storageClass: storageClass,
+		bucket:       bucket,
+	}
+	s3clients.Store(ds.Id, s3c)
+	return s3c, nil
+}
diff --git a/datastores/upload.go b/datastores/upload.go
new file mode 100644
index 00000000..d23c52f1
--- /dev/null
+++ b/datastores/upload.go
@@ -0,0 +1,139 @@
+package datastores
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"path"
+
+	"github.com/minio/minio-go/v7"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/turt2live/matrix-media-repo/common/config"
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/metrics"
+	"github.com/turt2live/matrix-media-repo/util/ids"
+)
+
+func Upload(ctx rcontext.RequestContext, ds config.DatastoreConfig, data io.ReadCloser, size int64, contentType string, sha256hash string) (string, error) {
+	defer data.Close()
+	hasher := sha256.New()
+	tee := io.TeeReader(data, hasher)
+	var objectName string
+	var err error
+
+	var uploadedBytes int64
+	if ds.Type == "s3" {
+		var s3c *s3
+		s3c, err = getS3(ds)
+		if err != nil {
+			return "", err
+		}
+
+		// Ensure unique ID
+		exists := true
+		attempts := 0
+		for exists {
+			objectName, err = ids.NewUniqueId()
+			if err != nil {
+				return "", err
+			}
+
+			attempts++
+			if attempts > 10 {
+				return "", errors.New("failed to generate suitable object name for S3 store")
+			}
+			metrics.S3Operations.With(prometheus.Labels{"operation": "StatObject"}).Inc()
+			_, err = s3c.client.StatObject(ctx.Context, s3c.bucket, objectName, minio.StatObjectOptions{})
+			if err != nil {
+				if merr, ok := err.(minio.ErrorResponse); ok {
+					if merr.Code == "NoSuchKey" || merr.StatusCode == http.StatusNotFound {
+						exists = false
+					}
+				} else {
+					return "", err
+				}
+			}
+		}
+
+		metrics.S3Operations.With(prometheus.Labels{"operation": "PutObject"}).Inc()
+		var info minio.UploadInfo
+		info, err = s3c.client.PutObject(ctx.Context, s3c.bucket, objectName, tee, size, minio.PutObjectOptions{StorageClass: s3c.storageClass, ContentType: contentType})
+		uploadedBytes = info.Size
+	} else if ds.Type == "file" {
+		basePath := ds.Options["path"]
+
+		var firstContainer string
+		var secondContainer string
+		var fileName string
+		var targetDir string
+
+		// Ensure unique ID
+		exists := true
+		attempts := 0
+		for exists {
+			objectName, err = ids.NewUniqueId()
+			if err != nil {
+				return "", err
+			}
+
+			attempts++
+			if attempts > 10 {
+				return "", errors.New("failed to generate suitable file name for persistence")
+			}
+
+			firstContainer = objectName[0:2]
+			secondContainer = objectName[2:4]
+			fileName = objectName[4:]
+			targetDir = path.Join(basePath, firstContainer, secondContainer)
+			objectName = path.Join(targetDir, fileName)
+
+			_, err = os.Stat(objectName)
+			if err != nil && !os.IsNotExist(err) {
+				return "", err
+			} else if err != nil && os.IsNotExist(err) {
+				exists = false
+			}
+		}
+
+		// Persist file
+		var file *os.File
+		if err = os.MkdirAll(targetDir, 0755); err != nil {
+			return "", err
+		}
+		file, err = os.OpenFile(objectName, os.O_WRONLY|os.O_CREATE, 0644)
+		if err != nil {
+			return "", err
+		}
+		uploadedBytes, err = io.Copy(file, tee)
+		if err != nil {
+			return "", err
+		}
+		err = file.Close()
+	} else {
+		return "", errors.New("unknown datastore type - contact developer")
+	}
+
+	if err != nil {
+		return "", err
+	}
+	if uploadedBytes != size {
+		if err = Remove(ctx, ds, objectName); err != nil {
+			ctx.Log.Warn("Error deleting upload (delete attempted due to persistence error): ", err)
+		}
+		return "", errors.New(fmt.Sprintf("upload size mismatch: expected %d got %d bytes", size, uploadedBytes))
+	}
+
+	uploadedHash := hex.EncodeToString(hasher.Sum(nil))
+	if uploadedHash != sha256hash {
+		if err = Remove(ctx, ds, objectName); err != nil {
+			ctx.Log.Warn("Error deleting upload (delete attempted due to persistence error): ", err)
+		}
+		return "", errors.New(fmt.Sprintf("upload hash mismatch: expected %s got %s", sha256hash, uploadedHash))
+	}
+
+	return objectName, nil
+}
diff --git a/go.mod b/go.mod
index 2dcda79f..94c78b1a 100644
--- a/go.mod
+++ b/go.mod
@@ -25,7 +25,6 @@ require (
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/gabriel-vasile/mimetype v1.4.1
 	github.com/getsentry/sentry-go v0.18.0
-	github.com/go-redis/redis/v9 v9.0.0-rc.2
 	github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0
 	github.com/hashicorp/go-hclog v1.4.0
 	github.com/hashicorp/go-plugin v1.4.8
@@ -43,13 +42,19 @@ require (
 	github.com/ryanuber/go-glob v1.0.0
 	github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d
 	github.com/sebest/xff v0.0.0-20210106013422-671bd2870b3a
-	github.com/sirupsen/logrus v1.9.0
-	golang.org/x/crypto v0.6.0
+	github.com/sirupsen/logrus v1.9.2
+	golang.org/x/crypto v0.9.0
 	golang.org/x/image v0.5.0
-	golang.org/x/net v0.7.0
+	golang.org/x/net v0.10.0
 )
 
-require github.com/julienschmidt/httprouter v1.3.0
+require (
+	github.com/go-redsync/redsync/v4 v4.8.1
+	github.com/julienschmidt/httprouter v1.3.0
+	github.com/minio/minio-go/v7 v7.0.55
+	github.com/panjf2000/ants/v2 v2.7.4
+	github.com/redis/go-redis/v9 v9.0.4
+)
 
 require (
 	github.com/Jeffail/gabs v1.4.0 // indirect
@@ -62,14 +67,18 @@ require (
 	github.com/go-errors/errors v1.4.2 // indirect
 	github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
+	github.com/google/uuid v1.3.0 // indirect
 	github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
 	github.com/hajimehoshi/go-mp3 v0.3.4 // indirect
+	github.com/hashicorp/errwrap v1.1.0 // indirect
+	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/icza/bitio v1.1.0 // indirect
 	github.com/jfreymuth/oggvorbis v1.0.4 // indirect
 	github.com/jfreymuth/vorbis v1.0.2 // indirect
 	github.com/jonboulle/clockwork v0.1.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
-	github.com/klauspost/cpuid/v2 v2.2.3 // indirect
+	github.com/klauspost/compress v1.16.5 // indirect
+	github.com/klauspost/cpuid/v2 v2.2.4 // indirect
 	github.com/kr/pretty v0.3.0 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.17 // indirect
@@ -82,11 +91,12 @@ require (
 	github.com/prometheus/client_model v0.3.0 // indirect
 	github.com/prometheus/common v0.39.0 // indirect
 	github.com/prometheus/procfs v0.9.0 // indirect
+	github.com/rs/xid v1.5.0 // indirect
 	github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd // indirect
 	github.com/smartystreets/assertions v1.0.0 // indirect
-	golang.org/x/sys v0.5.0 // indirect
-	golang.org/x/term v0.5.0 // indirect
-	golang.org/x/text v0.7.0 // indirect
+	golang.org/x/sys v0.8.0 // indirect
+	golang.org/x/term v0.8.0 // indirect
+	golang.org/x/text v0.9.0 // indirect
 	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 )
@@ -104,7 +114,7 @@ require (
 	github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
 	github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
 	github.com/minio/md5-simd v1.1.2 // indirect
-	github.com/minio/sha256-simd v1.0.0 // indirect
+	github.com/minio/sha256-simd v1.0.1 // indirect
 	github.com/mitchellh/go-testing-interface v1.14.1 // indirect
 	github.com/oklog/run v1.1.0 // indirect
 	github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea // indirect
diff --git a/go.sum b/go.sum
index aab53bae..628e0a40 100644
--- a/go.sum
+++ b/go.sum
@@ -22,12 +22,17 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY=
 github.com/bep/debounce v1.2.1/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
+github.com/bsm/ginkgo/v2 v2.5.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
+github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
+github.com/bsm/gomega v1.20.0/go.mod h1:JifAceMQ4crZIWYUKrlGcmbN3bqHogVTADMD2ATsbwk=
+github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
 github.com/buckket/go-blurhash v1.1.0 h1:X5M6r0LIvwdvKiUtiNcRL2YlmOfMzYobI3VCKCZc9Do=
 github.com/buckket/go-blurhash v1.1.0/go.mod h1:aT2iqo5W9vu9GpyoLErKfTHwgODsZp3bQfXjXJUxNb8=
 github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
 github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
 github.com/cenk/backoff v2.2.1+incompatible h1:djdFT7f4gF2ttuzRKPbMOWgZajgesItGLwG5FTQKmmE=
 github.com/cenk/backoff v2.2.1+incompatible/go.mod h1:7FtoeaSnHoZnmZzz47cM35Y9nSW7tNyaidugnHTaFDE=
+github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
 github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@@ -82,6 +87,8 @@ github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=
 github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
 github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
 github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
 github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
 github.com/gabriel-vasile/mimetype v1.4.1 h1:TRWk7se+TOjCYgRth7+1/OYLNiRNIotknkFtf/dnN7Q=
@@ -98,10 +105,17 @@ github.com/go-errors/errors v1.0.2/go.mod h1:psDX2osz5VnTOnFWbDeWwS7yejl+uV3FEWE
 github.com/go-errors/errors v1.1.1/go.mod h1:psDX2osz5VnTOnFWbDeWwS7yejl+uV3FEWEp4lssFEs=
 github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
 github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
-github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0=
-github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
+github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
+github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
+github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4=
+github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
+github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
+github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
+github.com/go-redsync/redsync/v4 v4.8.1 h1:rq2RvdTI0obznMdxKUWGdmmulo7lS9yCzb8fgDKOlbM=
+github.com/go-redsync/redsync/v4 v4.8.1/go.mod h1:LmUAsQuQxhzZAoGY7JS6+dNhNmZyonMZiiEDY9plotM=
 github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
 github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
 github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
 github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI=
@@ -109,13 +123,28 @@ github.com/golang/geo v0.0.0-20200319012246-673a6f80352d/go.mod h1:QZ0nwyI2jOfgR
 github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 h1:gtexQ/VGyN+VVFRXSFiguSNcXmS6rkKT+X7FdIrTtfo=
 github.com/golang/geo v0.0.0-20210211234256-740aa86cb551/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k=
+github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
 github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c h1:7lF+Vz0LqiRidnzC1Oq86fpX1q/iEv2KJdrCtttYjT4=
 github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -125,12 +154,18 @@ github.com/hajimehoshi/go-mp3 v0.3.4/go.mod h1:fRtZraRFcWb0pu7ok0LqyFhCUrPeMsGRS
 github.com/hajimehoshi/oto v0.6.1/go.mod h1:0QXGEkbuJRohbJaxr7ZQSxnju7hEhseiPx2hrh6raOI=
 github.com/hajimehoshi/oto v0.7.1/go.mod h1:wovJ8WWMfFKvP587mhHgot/MBr4DnNy9m6EepeVGnos=
 github.com/hajimehoshi/oto/v2 v2.3.1/go.mod h1:seWLbgHH7AyUMYKfKYT9pg7PhUu9/SisyJvNTT+ASQo=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
+github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-hclog v1.4.0 h1:ctuWFGrhFha8BnnzxqeRGidlEcQkDyL5u8J8t5eA11I=
 github.com/hashicorp/go-hclog v1.4.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/hashicorp/go-plugin v1.4.8 h1:CHGwpxYDOttQOY7HOWgETU9dyVjOXzniXDqJcYJE1zM=
 github.com/hashicorp/go-plugin v1.4.8/go.mod h1:viDMjcLJuDui6pXb8U4HVfb8AamCWhHGUjr2IrTF67s=
 github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
 github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/icza/bitio v1.0.0/go.mod h1:0jGnlLAx8MKMr9VGnn/4YrvZiprkvBelsVIbA9Jjr9A=
 github.com/icza/bitio v1.1.0 h1:ysX4vtldjdi3Ygai5m1cWy4oLkhWTAi+SyO6HC8L9T0=
 github.com/icza/bitio v1.1.0/go.mod h1:0jGnlLAx8MKMr9VGnn/4YrvZiprkvBelsVIbA9Jjr9A=
@@ -162,11 +197,12 @@ github.com/k3a/html2text v1.1.0/go.mod h1:ieEXykM67iT8lTvEWBh6fhpH4B23kB9OMKPdIB
 github.com/kettek/apng v0.0.0-20220823221153-ff692776a607 h1:8tP9cdXzcGX2AvweVVG/lxbI7BSjWbNNUustwJ9dQVA=
 github.com/kettek/apng v0.0.0-20220823221153-ff692776a607/go.mod h1:x78/VRQYKuCftMWS0uK5e+F5RJ7S4gSlESRWI0Prl6Q=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
+github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
 github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
-github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
-github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU=
-github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
+github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
+github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
@@ -208,9 +244,11 @@ github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
 github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
 github.com/minio/minio-go/v6 v6.0.57 h1:ixPkbKkyD7IhnluRgQpGSpHdpvNVaW6OD5R9IAO/9Tw=
 github.com/minio/minio-go/v6 v6.0.57/go.mod h1:5+R/nM9Pwrh0vqF+HbYYDQ84wdUFPyXHkrdT4AIkifM=
+github.com/minio/minio-go/v7 v7.0.55 h1:ZXqUO/8cgfHzI+08h/zGuTTFpISSA32BZmBE3FCLJas=
+github.com/minio/minio-go/v7 v7.0.55/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE=
 github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
-github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
-github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
+github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
+github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
 github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
 github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
@@ -221,13 +259,22 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
-github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
 github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
 github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
 github.com/olebedev/emitter v0.0.0-20190110104742-e8d1457e6aee h1:IquUs3fIykn10zWDIyddanhpTqBvAHMaPnFhQuyYw5U=
 github.com/olebedev/emitter v0.0.0-20190110104742-e8d1457e6aee/go.mod h1:eT2/Pcsim3XBjbvldGiJBvvgiqZkAFyiOJJsDKXs/ts=
-github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
-github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
+github.com/panjf2000/ants/v2 v2.7.4 h1:mJqMDtMckZltyL458pq81IGNfiDhEgzX5s/lhjwPWIM=
+github.com/panjf2000/ants/v2 v2.7.4/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
 github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
 github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea h1:sKwxy1H95npauwu8vtF95vG/syrL0p8fSZo/XlDg5gk=
@@ -246,10 +293,15 @@ github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8u
 github.com/prometheus/common v0.39.0/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
 github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
 github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
+github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
+github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc=
+github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM=
 github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
+github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
+github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
 github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk=
 github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A=
 github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd h1:CmH9+J6ZSsIjUK3dcGsnCnO41eRBOnY12zwkn5qVwgc=
@@ -261,8 +313,8 @@ github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj
 github.com/sebest/xff v0.0.0-20210106013422-671bd2870b3a h1:iLcLb5Fwwz7g/DLK89F+uQBDeAhHhwdzB5fSlVdhGcM=
 github.com/sebest/xff v0.0.0-20210106013422-671bd2870b3a/go.mod h1:wozgYq9WEBQBaIJe4YZ0qTSFAMxmcwBhQH0fO0R34Z0=
 github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
-github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
-github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
+github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
 github.com/smartystreets/assertions v1.0.0 h1:UVQPSSmc3qtTi+zPPkCXvZX9VvW/xT/NsRvKfwY81a8=
 github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
@@ -270,11 +322,19 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s
 github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
+github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
 github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto=
 github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -284,8 +344,8 @@ golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8U
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
-golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
-golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
+golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
+golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/image v0.0.0-20190220214146-31aff87c08e9/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
@@ -295,39 +355,53 @@ golang.org/x/image v0.5.0/go.mod h1:FVC7BI/5Ym8R25iw5OLsgshdUBbT1h5jZTpA+mvAdZ4=
 golang.org/x/mobile v0.0.0-20190415191353-3e0bab5405d6/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200320220750-118fecf932d8/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
 golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
-golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
 golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190626150813-e07cf5db2756/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -345,23 +419,28 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
 golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
 golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
 golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -372,18 +451,29 @@ google.golang.org/genproto v0.0.0-20230216225411-c8e22ba71e44 h1:EfLuoKW5WfkgVdD
 google.golang.org/genproto v0.0.0-20230216225411-c8e22ba71e44/go.mod h1:8B0gmkoRebU8ukX6HP+4wrVQUY1+6PkQ44BSyIlflHA=
 google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
 google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
 google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
 gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
-gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
diff --git a/pipline/_steps/quota/check.go b/pipline/_steps/quota/check.go
index 4cfff0d1..878f2850 100644
--- a/pipline/_steps/quota/check.go
+++ b/pipline/_steps/quota/check.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 
 	"github.com/ryanuber/go-glob"
+	"github.com/turt2live/matrix-media-repo/common"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
 	"github.com/turt2live/matrix-media-repo/database"
 )
@@ -15,58 +16,80 @@ const (
 	MaxPending Type = 1
 )
 
-var ErrQuotaExceeded = errors.New("quota exceeded")
-
 func Check(ctx rcontext.RequestContext, userId string, quotaType Type) error {
+	limit, err := Limit(ctx, userId, quotaType)
+	if err != nil {
+		return err
+	}
+
+	var count int64
+	if quotaType == MaxBytes {
+		if limit < 0 {
+			return nil
+		}
+		count, err = database.GetInstance().UserStats.Prepare(ctx).UserUploadedBytes(userId)
+	} else if quotaType == MaxPending {
+		count, err = database.GetInstance().ExpiringMedia.Prepare(ctx).ByUserCount(userId)
+	} else {
+		return errors.New("missing check for quota type - contact developer")
+	}
+
+	if err != nil {
+		return err
+	}
+	if count < limit {
+		return nil
+	} else {
+		return common.ErrQuotaExceeded
+	}
+}
+
+func CanUpload(ctx rcontext.RequestContext, userId string, bytes int64) error {
+	limit, err := Limit(ctx, userId, MaxBytes)
+	if err != nil {
+		return err
+	}
+	if limit < 0 {
+		return nil
+	}
+
+	count, err := database.GetInstance().UserStats.Prepare(ctx).UserUploadedBytes(userId)
+	if err != nil {
+		return err
+	}
+
+	if (count + bytes) > limit {
+		return common.ErrQuotaExceeded
+	}
+
+	return nil
+}
+
+func Limit(ctx rcontext.RequestContext, userId string, quotaType Type) (int64, error) {
 	if !ctx.Config.Uploads.Quota.Enabled {
-		return checkDefault(ctx, userId, quotaType)
+		return defaultLimit(ctx, quotaType)
 	}
 
 	for _, q := range ctx.Config.Uploads.Quota.UserQuotas {
 		if glob.Glob(q.Glob, userId) {
 			if quotaType == MaxBytes {
-				if q.MaxBytes == 0 {
-					return nil
-				}
-				total, err := database.GetInstance().UserStats.Prepare(ctx).UserUploadedBytes(userId)
-				if err != nil {
-					return err
-				}
-				if total >= q.MaxBytes {
-					return ErrQuotaExceeded
-				}
-				return nil
+				return q.MaxBytes, nil
 			} else if quotaType == MaxPending {
-				count, err := database.GetInstance().ExpiringMedia.Prepare(ctx).ByUserCount(userId)
-				if err != nil {
-					return err
-				}
-				if count < ctx.Config.Uploads.MaxPending {
-					return nil
-				}
-				return ErrQuotaExceeded
+				return q.MaxPending, nil
 			} else {
-				return errors.New("no default for quota type - contact developer")
+				return 0, errors.New("missing glob switch for quota type - contact developer")
 			}
 		}
 	}
 
-	return checkDefault(ctx, userId, quotaType)
+	return defaultLimit(ctx, quotaType)
 }
 
-func checkDefault(ctx rcontext.RequestContext, userId string, quotaType Type) error {
+func defaultLimit(ctx rcontext.RequestContext, quotaType Type) (int64, error) {
 	if quotaType == MaxBytes {
-		return nil
+		return -1, nil
 	} else if quotaType == MaxPending {
-		count, err := database.GetInstance().ExpiringMedia.Prepare(ctx).ByUserCount(userId)
-		if err != nil {
-			return err
-		}
-		if count < ctx.Config.Uploads.MaxPending {
-			return nil
-		}
-		return ErrQuotaExceeded
+		return ctx.Config.Uploads.MaxPending, nil
 	}
-
-	return errors.New("no default for quota type - contact developer")
+	return 0, errors.New("no default for quota type - contact developer")
 }
diff --git a/pipline/_steps/upload/deduplicate.go b/pipline/_steps/upload/deduplicate.go
new file mode 100644
index 00000000..337739e5
--- /dev/null
+++ b/pipline/_steps/upload/deduplicate.go
@@ -0,0 +1,30 @@
+package upload
+
+import (
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/database"
+)
+
+func FindRecord(ctx rcontext.RequestContext, hash string, userId string, contentType string, fileName string) (*database.DbMedia, bool, error) {
+	mediaDb := database.GetInstance().Media.Prepare(ctx)
+	records, err := mediaDb.GetByHash(hash)
+	if err != nil {
+		return nil, false, err
+	}
+	var perfectMatch *database.DbMedia = nil
+	var hashMatch *database.DbMedia = nil
+	for _, r := range records {
+		if hashMatch == nil {
+			hashMatch = r
+		}
+		if r.UserId == userId && r.ContentType == r.ContentType && r.UploadName == fileName {
+			perfectMatch = r
+			break
+		}
+	}
+	if perfectMatch != nil {
+		return perfectMatch, true, nil
+	} else {
+		return hashMatch, false, nil
+	}
+}
diff --git a/pipline/upload_pipeline/step_limited.go b/pipline/_steps/upload/limit.go
similarity index 66%
rename from pipline/upload_pipeline/step_limited.go
rename to pipline/_steps/upload/limit.go
index a110e7a6..df62c265 100644
--- a/pipline/upload_pipeline/step_limited.go
+++ b/pipline/_steps/upload/limit.go
@@ -1,4 +1,4 @@
-package upload_pipeline
+package upload
 
 import (
 	"io"
@@ -6,7 +6,7 @@ import (
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
 )
 
-func limitStreamLength(ctx rcontext.RequestContext, r io.ReadCloser) io.ReadCloser {
+func LimitStream(ctx rcontext.RequestContext, r io.ReadCloser) io.ReadCloser {
 	if ctx.Config.Uploads.MaxSizeBytes > 0 {
 		return io.NopCloser(io.LimitReader(r, ctx.Config.Uploads.MaxSizeBytes))
 	} else {
diff --git a/pipline/_steps/upload/lock.go b/pipline/_steps/upload/lock.go
new file mode 100644
index 00000000..9aaa7529
--- /dev/null
+++ b/pipline/_steps/upload/lock.go
@@ -0,0 +1,28 @@
+package upload
+
+import (
+	"errors"
+	"time"
+
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/redislib"
+)
+
+func LockForUpload(ctx rcontext.RequestContext, hash string) (func() error, error) {
+	mutex := redislib.GetMutex(hash, 5*time.Minute)
+	if mutex != nil {
+		if err := mutex.LockContext(ctx.Context); err != nil {
+			return nil, errors.New("failed to acquire upload lock: " + err.Error())
+		}
+		return func() error {
+			b, err := mutex.UnlockContext(ctx.Context)
+			if !b {
+				ctx.Log.Warn("Did not get quorum on unlock")
+			}
+			return err
+		}, nil
+	} else {
+		ctx.Log.Warn("Continuing upload without lock! Set up Redis to make this warning go away.")
+		return func() error { return nil }, nil
+	}
+}
diff --git a/pipline/upload_pipeline/step_check_quarantine.go b/pipline/_steps/upload/quarantine.go
similarity index 50%
rename from pipline/upload_pipeline/step_check_quarantine.go
rename to pipline/_steps/upload/quarantine.go
index a840ade0..a2daed6a 100644
--- a/pipline/upload_pipeline/step_check_quarantine.go
+++ b/pipline/_steps/upload/quarantine.go
@@ -1,14 +1,13 @@
-package upload_pipeline
+package upload
 
 import (
 	"github.com/turt2live/matrix-media-repo/common"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/storage"
+	"github.com/turt2live/matrix-media-repo/database"
 )
 
-func checkQuarantineStatus(ctx rcontext.RequestContext, hash string) error {
-	db := storage.GetDatabase().GetMediaStore(ctx)
-	q, err := db.IsQuarantined(hash)
+func CheckQuarantineStatus(ctx rcontext.RequestContext, hash string) error {
+	q, err := database.GetInstance().Media.Prepare(ctx).IsHashQuarantined(hash)
 	if err != nil {
 		return err
 	}
diff --git a/pipline/upload_pipeline/pipeline.go b/pipline/upload_pipeline/pipeline.go
index 6bd4e240..2a125394 100644
--- a/pipline/upload_pipeline/pipeline.go
+++ b/pipline/upload_pipeline/pipeline.go
@@ -1,65 +1,110 @@
 package upload_pipeline
 
 import (
-	"bytes"
-	"errors"
 	"io"
 
+	"github.com/getsentry/sentry-go"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/types"
-	"github.com/turt2live/matrix-media-repo/util/stream_util"
+	"github.com/turt2live/matrix-media-repo/database"
+	"github.com/turt2live/matrix-media-repo/datastores"
+	"github.com/turt2live/matrix-media-repo/pipline/_steps/quota"
+	"github.com/turt2live/matrix-media-repo/pipline/_steps/upload"
+	"github.com/turt2live/matrix-media-repo/util"
 )
 
-func UploadMedia(ctx rcontext.RequestContext, origin string, mediaId string, r io.ReadCloser, contentType string, fileName string, userId string) (*types.Media, error) {
-	defer stream_util.DumpAndCloseStream(r)
-
+// UploadMedia Media upload. If mediaId is an empty string, one will be generated.
+func UploadMedia(ctx rcontext.RequestContext, origin string, mediaId string, r io.ReadCloser, contentType string, fileName string, userId string, kind datastores.Kind) (*database.DbMedia, error) {
 	// Step 1: Limit the stream's length
-	r = limitStreamLength(ctx, r)
+	r = upload.LimitStream(ctx, r)
+
+	// Step 2: Create a media ID (if needed)
+	if mediaId == "" {
+		var err error
+		mediaId, err = upload.GenerateMediaId(ctx, origin)
+		if err != nil {
+			return nil, err
+		}
+	}
 
-	// Step 2: Buffer the stream
-	b, err := bufferStream(ctx, r)
+	// Step 3: Pick a datastore
+	dsConf, err := datastores.Pick(ctx, kind)
 	if err != nil {
 		return nil, err
 	}
 
-	// Create a utility function for getting at the buffer easily
-	stream := func() io.ReadCloser {
-		return io.NopCloser(bytes.NewBuffer(b))
+	// Step 4: Buffer to the datastore's temporary path
+	sha256hash, sizeBytes, reader, err := datastores.BufferTemp(dsConf, r)
+	if err != nil {
+		return nil, err
+	}
+	defer reader.Close()
+
+	// Step 5: Check quarantine
+	if err = upload.CheckQuarantineStatus(ctx, sha256hash); err != nil {
+		return nil, err
 	}
 
-	// Step 3: Get a hash of the file
-	hash, err := hashFile(ctx, stream())
+	// Step 6: Ensure user can upload within quota
+	err = quota.CanUpload(ctx, userId, sizeBytes)
 	if err != nil {
 		return nil, err
 	}
 
-	// Step 4: Check if the media is quarantined
-	err = checkQuarantineStatus(ctx, hash)
+	// Step 7: Acquire a lock on the media hash for uploading
+	unlockFn, err := upload.LockForUpload(ctx, sha256hash)
+	//goland:noinspection GoUnhandledErrorResult
+	defer unlockFn()
 	if err != nil {
 		return nil, err
 	}
 
-	// Step 5: Generate a media ID if we need to
-	if mediaId == "" {
-		mediaId, err = generateMediaID(ctx, origin)
-		if err != nil {
-			return nil, err
+	// Step 8: Pull all upload records (to check if an upload has already happened)
+	newRecord := &database.DbMedia{
+		Origin:      origin,
+		MediaId:     mediaId,
+		UploadName:  fileName,
+		ContentType: contentType,
+		UserId:      userId,
+		Sha256Hash:  sha256hash,
+		SizeBytes:   sizeBytes,
+		CreationTs:  util.NowMillis(),
+		Quarantined: false,
+		DatastoreId: "", // Populated later
+		Location:    "", // Populated later
+	}
+	record, perfect, err := upload.FindRecord(ctx, sha256hash, userId, contentType, fileName)
+	if record != nil {
+		// We already had this record in some capacity
+		if perfect {
+			// Exact match - deduplicate, skip upload to datastore
+			return record, nil
+		} else {
+			// We already uploaded it somewhere else - use the datastore ID and location
+			newRecord.Quarantined = record.Quarantined // just in case (shouldn't be a different value by here)
+			newRecord.DatastoreId = record.DatastoreId
+			newRecord.Location = record.Location
+			if err = database.GetInstance().Media.Prepare(ctx).Insert(newRecord); err != nil {
+				return nil, err
+			}
+			return newRecord, nil
 		}
 	}
 
-	// Step 6: De-duplicate the media
-	// TODO: Implementation. Check to see if uploading is required, also if the user has already uploaded a copy.
-
-	// Step 7: Cache the file before uploading
-	// TODO
-
-	// Step 8: Prepare an async job to persist the media
-	// TODO: Implementation. Limit the number of concurrent jobs on this to avoid queue flooding.
-	// TODO: Should this be configurable?
-	// TODO: Handle partial uploads/incomplete uploads.
-
-	// Step 9: Return the media while it gets persisted
-	// TODO
+	// Step 9: Since we didn't find a duplicate, upload it to the datastore
+	dsLocation, err := datastores.Upload(ctx, dsConf, reader, sizeBytes, contentType, sha256hash)
+	if err != nil {
+		return nil, err
+	}
 
-	return nil, errors.New("not yet implemented")
+	// Step 10: Everything finally looks good - return some stuff
+	newRecord.DatastoreId = dsConf.Id
+	newRecord.Location = dsLocation
+	if err = database.GetInstance().Media.Prepare(ctx).Insert(newRecord); err != nil {
+		if err2 := datastores.Remove(ctx, dsConf, dsLocation); err2 != nil {
+			sentry.CaptureException(err2)
+			ctx.Log.Warn("Error deleting upload (delete attempted due to persistence error): ", err2)
+		}
+		return nil, err
+	}
+	return newRecord, nil
 }
diff --git a/pipline/upload_pipeline/step_buffer.go b/pipline/upload_pipeline/step_buffer.go
deleted file mode 100644
index 858b9933..00000000
--- a/pipline/upload_pipeline/step_buffer.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package upload_pipeline
-
-import (
-	"io"
-
-	"github.com/turt2live/matrix-media-repo/common/rcontext"
-)
-
-func bufferStream(ctx rcontext.RequestContext, r io.ReadCloser) ([]byte, error) {
-	return io.ReadAll(r)
-}
diff --git a/pipline/upload_pipeline/step_gen_media_id.go b/pipline/upload_pipeline/step_gen_media_id.go
deleted file mode 100644
index 9f09aa4b..00000000
--- a/pipline/upload_pipeline/step_gen_media_id.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package upload_pipeline
-
-import (
-	"errors"
-	"github.com/turt2live/matrix-media-repo/util/ids"
-	"time"
-
-	"github.com/patrickmn/go-cache"
-	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/storage"
-)
-
-var recentMediaIds = cache.New(30*time.Second, 60*time.Second)
-
-func generateMediaID(ctx rcontext.RequestContext, origin string) (string, error) {
-	metadataDb := storage.GetDatabase().GetMetadataStore(ctx)
-	mediaTaken := true
-	var mediaId string
-	var err error
-	attempts := 0
-	for mediaTaken {
-		attempts += 1
-		if attempts > 10 {
-			return "", errors.New("failed to generate a media ID after 10 rounds")
-		}
-
-		mediaId, err = ids.NewUniqueId()
-
-		// Because we use the current time in the media ID, we don't need to worry about
-		// collisions from the database.
-		if _, present := recentMediaIds.Get(mediaId); present {
-			mediaTaken = true
-			continue
-		}
-
-		mediaTaken, err = metadataDb.IsReserved(origin, mediaId)
-		if err != nil {
-			return "", err
-		}
-	}
-
-	_ = recentMediaIds.Add(mediaId, true, cache.DefaultExpiration)
-
-	return mediaId, nil
-}
diff --git a/pipline/upload_pipeline/step_hash.go b/pipline/upload_pipeline/step_hash.go
deleted file mode 100644
index eedab08c..00000000
--- a/pipline/upload_pipeline/step_hash.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package upload_pipeline
-
-import (
-	"io"
-
-	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/util/stream_util"
-)
-
-func hashFile(ctx rcontext.RequestContext, r io.ReadCloser) (string, error) {
-	return stream_util.GetSha256HashOfStream(r)
-}
diff --git a/pool/init.go b/pool/init.go
new file mode 100644
index 00000000..4b683249
--- /dev/null
+++ b/pool/init.go
@@ -0,0 +1,26 @@
+package pool
+
+import (
+	"github.com/getsentry/sentry-go"
+	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/common/config"
+)
+
+var DownloadQueue *Queue
+
+func Init() {
+	var err error
+	if DownloadQueue, err = NewQueue(config.Get().Downloads.NumWorkers, "downloads"); err != nil {
+		sentry.CaptureException(err)
+		logrus.Error("Error setting up downloads queue")
+		logrus.Fatal(err)
+	}
+}
+
+func AdjustSize() {
+	DownloadQueue.pool.Tune(config.Get().Downloads.NumWorkers)
+}
+
+func Drain() {
+	DownloadQueue.pool.Release()
+}
diff --git a/pool/queue.go b/pool/queue.go
new file mode 100644
index 00000000..aec5a09d
--- /dev/null
+++ b/pool/queue.go
@@ -0,0 +1,36 @@
+package pool
+
+import (
+	"time"
+
+	"github.com/getsentry/sentry-go"
+	"github.com/panjf2000/ants/v2"
+	"github.com/sirupsen/logrus"
+	"github.com/turt2live/matrix-media-repo/common/logging"
+)
+
+type Queue struct {
+	pool *ants.Pool
+}
+
+func NewQueue(workers int, name string) (*Queue, error) {
+	p, err := ants.NewPool(workers, ants.WithOptions(ants.Options{
+		ExpiryDuration:   1 * time.Minute, // worker lifespan when unused
+		PreAlloc:         false,
+		MaxBlockingTasks: 0, // no limit on tasks we can submit
+		Nonblocking:      false,
+		PanicHandler: func(err interface{}) {
+			logrus.Errorf("Panic from internal queue %s", name)
+			logrus.Error(err)
+			if e, ok := err.(error); ok {
+				sentry.CaptureException(e)
+			}
+		},
+		Logger:       &logging.SendToDebugLogger{},
+		DisablePurge: false,
+	}))
+	if err != nil {
+		return nil, err
+	}
+	return &Queue{pool: p}, nil
+}
diff --git a/redis_cache/redis.go b/redis_cache/redis.go
index dadbb879..d9d65fd2 100644
--- a/redis_cache/redis.go
+++ b/redis_cache/redis.go
@@ -7,7 +7,7 @@ import (
 	"io"
 	"time"
 
-	"github.com/go-redis/redis/v9"
+	"github.com/redis/go-redis/v9"
 	"github.com/sirupsen/logrus"
 	"github.com/turt2live/matrix-media-repo/common/config"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
diff --git a/redislib/connection.go b/redislib/connection.go
new file mode 100644
index 00000000..de03604f
--- /dev/null
+++ b/redislib/connection.go
@@ -0,0 +1,68 @@
+package redislib
+
+import (
+	"sync"
+	"time"
+
+	"github.com/go-redsync/redsync/v4"
+	rsredis "github.com/go-redsync/redsync/v4/redis"
+	"github.com/go-redsync/redsync/v4/redis/goredis/v9"
+	"github.com/redis/go-redis/v9"
+	"github.com/turt2live/matrix-media-repo/common/config"
+)
+
+var connectionLock = &sync.Once{}
+var ring *redis.Ring
+var rs *redsync.Redsync
+var pools = make([]rsredis.Pool, 0)
+var clients = make([]*redis.Client, 0)
+
+func makeConnection() {
+	if ring != nil {
+		return
+	}
+
+	connectionLock.Do(func() {
+		conf := config.Get().Redis
+		if !conf.Enabled {
+			return
+		}
+		addresses := make(map[string]string)
+		for _, c := range conf.Shards {
+			addresses[c.Name] = c.Address
+
+			client := redis.NewClient(&redis.Options{
+				DialTimeout: 10 * time.Second,
+				DB:          conf.DbNum,
+				Addr:        c.Address,
+			})
+			clients = append(clients, client)
+			pools = append(pools, goredis.NewPool(client))
+		}
+		ring = redis.NewRing(&redis.RingOptions{
+			Addrs:       addresses,
+			DialTimeout: 10 * time.Second,
+			DB:          conf.DbNum,
+		})
+		rs = redsync.New(pools...)
+	})
+}
+
+func Reconnect() {
+	Stop()
+	makeConnection()
+}
+
+func Stop() {
+	if ring != nil {
+		_ = ring.Close()
+	}
+	for _, c := range clients {
+		_ = c.Close()
+	}
+	ring = nil
+	rs = nil
+	pools = make([]rsredis.Pool, 0)
+	clients = make([]*redis.Client, 0)
+	connectionLock = &sync.Once{}
+}
diff --git a/redislib/locking.go b/redislib/locking.go
new file mode 100644
index 00000000..9f60a74e
--- /dev/null
+++ b/redislib/locking.go
@@ -0,0 +1,16 @@
+package redislib
+
+import (
+	"time"
+
+	"github.com/go-redsync/redsync/v4"
+)
+
+func GetMutex(key string, expiration time.Duration) *redsync.Mutex {
+	makeConnection()
+	if rs == nil {
+		return nil
+	}
+
+	return rs.NewMutex(key, redsync.WithExpiry(expiration))
+}
diff --git a/storage/datastore/datastore.go b/storage/datastore/datastore.go
index dd6faa6e..f482e77e 100644
--- a/storage/datastore/datastore.go
+++ b/storage/datastore/datastore.go
@@ -102,7 +102,7 @@ func PickDatastore(forKind string, ctx rcontext.RequestContext) (*DatastoreRef,
 	// size of the datastore).
 	var possibleDatastores = make([]config.DatastoreConfig, 0)
 	for _, dsConf := range confDatastores {
-		allowed := common.HasKind(dsConf.MediaKinds, forKind)
+		allowed := common.HasKind(dsConf.MediaKinds, common.Kind(forKind))
 		if !allowed {
 			continue
 		}
diff --git a/storage/datastore/ds_s3/s3_store.go b/storage/datastore/ds_s3/s3_store.go
index 2bf84179..df7b3725 100644
--- a/storage/datastore/ds_s3/s3_store.go
+++ b/storage/datastore/ds_s3/s3_store.go
@@ -7,10 +7,10 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/minio/minio-go/v6"
 	"github.com/turt2live/matrix-media-repo/util/ids"
 	"github.com/turt2live/matrix-media-repo/util/stream_util"
 
-	"github.com/minio/minio-go/v6"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
-- 
GitLab