From 5e51abbdcf9145e323bfa7ca5e29e6a4995c2b18 Mon Sep 17 00:00:00 2001 From: Travis Ralston <travpc@gmail.com> Date: Sun, 2 May 2021 18:51:18 -0600 Subject: [PATCH] Early gzip compression work For https://github.com/turt2live/matrix-media-repo/issues/53 --- CHANGELOG.md | 4 + api/r0/download.go | 2 + api/r0/thumbnail.go | 1 + api/webserver/route_handler.go | 11 ++- common/config/conf_min_shared.go | 4 + common/config/models_domain.go | 14 ++- config.sample.yaml | 14 +++ .../thumbnail_resource_handler.go | 30 ++++++- .../upload_controller/upload_controller.go | 30 +++++-- migrations/19_add_compressed_flag_down.sql | 2 + migrations/19_add_compressed_flag_up.sql | 2 + storage/datastore/datastore_ref.go | 3 +- storage/datastore/ds_file/file_store.go | 50 ++--------- storage/datastore/ds_ipfs/ipfs_store.go | 34 +------- storage/datastore/ds_s3/s3_store.go | 86 ++++++------------- storage/stores/media_store.go | 42 ++++++--- storage/stores/thumbnail_store.go | 18 ++-- types/media.go | 1 + types/object_info.go | 4 +- types/thumbnail.go | 1 + util/compression.go | 43 ++++++++++ 21 files changed, 226 insertions(+), 170 deletions(-) create mode 100644 migrations/19_add_compressed_flag_down.sql create mode 100644 migrations/19_add_compressed_flag_up.sql create mode 100644 util/compression.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 88d467c6..f07a9c38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), In a future version (likely the next), the in-memory cache support will be removed. Instead, please use the Redis caching that is now supported properly by this release, or disable caching if not applicable for your deployment. +### Added + +* Optional configuration for compressing uploads. Enabled by default. + ### Changed * Support the Redis config at the root level of the config, promoting it to a proper feature. 
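At a high level, the change gzips file bytes at the configured level before they are handed to a datastore, records a compressed flag on the media and thumbnail rows, and inflates the stored bytes again when serving them. A minimal standard-library sketch of that round-trip (the hardcoded level stands in for the new uploads.compression.level setting; error handling is trimmed for brevity):

    package main

    import (
    	"bytes"
    	"compress/gzip"
    	"fmt"
    	"io/ioutil"
    )

    func main() {
    	level := 6 // stand-in for uploads.compression.level
    	original := bytes.Repeat([]byte("uploaded media bytes "), 50)

    	// Upload path: compress before the bytes reach the datastore.
    	buf := &bytes.Buffer{}
    	w, err := gzip.NewWriterLevel(buf, level)
    	if err != nil {
    		panic(err)
    	}
    	w.Write(original)
    	w.Close() // the gzip trailer is only written on Close

    	// Download path: inflate the stored bytes before serving them.
    	r, err := gzip.NewReader(bytes.NewReader(buf.Bytes()))
    	if err != nil {
    		panic(err)
    	}
    	restored, _ := ioutil.ReadAll(r)

    	fmt.Println(len(original), "->", buf.Len(), "->", len(restored))
    }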
diff --git a/api/r0/download.go b/api/r0/download.go index d84c74ec..fb75a00d 100644 --- a/api/r0/download.go +++ b/api/r0/download.go @@ -20,6 +20,7 @@ type DownloadMediaResponse struct { SizeBytes int64 Data io.ReadCloser TargetDisposition string + Compressed bool } func DownloadMedia(r *http.Request, rctx rcontext.RequestContext, user api.UserInfo) interface{} { @@ -79,5 +80,6 @@ func DownloadMedia(r *http.Request, rctx rcontext.RequestContext, user api.UserI SizeBytes: streamedMedia.SizeBytes, Data: streamedMedia.Stream, TargetDisposition: targetDisposition, + Compressed: streamedMedia.KnownMedia.Compressed, } } diff --git a/api/r0/thumbnail.go b/api/r0/thumbnail.go index 7c06aca1..62d9a74c 100644 --- a/api/r0/thumbnail.go +++ b/api/r0/thumbnail.go @@ -101,5 +101,6 @@ func ThumbnailMedia(r *http.Request, rctx rcontext.RequestContext, user api.User SizeBytes: streamedThumbnail.Thumbnail.SizeBytes, Data: streamedThumbnail.Stream, Filename: "thumbnail.png", + Compressed: streamedThumbnail.Thumbnail.Compressed, } } diff --git a/api/webserver/route_handler.go b/api/webserver/route_handler.go index 950e70f9..9e6553dc 100644 --- a/api/webserver/route_handler.go +++ b/api/webserver/route_handler.go @@ -163,6 +163,16 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } break case *r0.DownloadMediaResponse: + defer result.Data.Close() + result.Data, err = util.DecompressBytesIfNeeded(result.Data, result.Compressed, rcontext.Initial()) + if err != nil { + sentry.CaptureException(err) + contextLog.Warn("Failed to decompress content: " + err.Error()) + statusCode = http.StatusInternalServerError + res = &api.ErrorResponse{Code: common.ErrCodeUnknown, InternalCode: common.ErrCodeUnknown, Message: "Unexpected error"} + break + } + metrics.HttpResponses.With(prometheus.Labels{ "host": r.Host, "action": h.action, @@ -221,7 +231,6 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } else { w.Header().Set("Content-Disposition", disposition+"; filename*=utf-8''"+url.QueryEscape(fname)) } - defer result.Data.Close() writeResponseData(w, result.Data, result.SizeBytes) return // Prevent sending conflicting responses case *r0.IdenticonResponse: diff --git a/common/config/conf_min_shared.go b/common/config/conf_min_shared.go index 00408488..9eb26090 100644 --- a/common/config/conf_min_shared.go +++ b/common/config/conf_min_shared.go @@ -27,6 +27,10 @@ func NewDefaultMinimumRepoConfig() MinimumRepoConfig { Enabled: false, UserQuotas: []QuotaUserConfig{}, }, + Compression: CompressionConfig{ + Enabled: true, + Level: 6, + }, }, Identicons: IdenticonsConfig{ Enabled: true, diff --git a/common/config/models_domain.go b/common/config/models_domain.go index 14ef0f30..af60bee2 100644 --- a/common/config/models_domain.go +++ b/common/config/models_domain.go @@ -16,11 +16,17 @@ type QuotasConfig struct { UserQuotas []QuotaUserConfig `yaml:"users,flow"` } +type CompressionConfig struct { + Enabled bool `yaml:"enabled"` + Level int `yaml:"level"` +} + type UploadsConfig struct { - MaxSizeBytes int64 `yaml:"maxBytes"` - MinSizeBytes int64 `yaml:"minBytes"` - ReportedMaxSizeBytes int64 `yaml:"reportedMaxBytes"` - Quota QuotasConfig `yaml:"quotas"` + MaxSizeBytes int64 `yaml:"maxBytes"` + MinSizeBytes int64 `yaml:"minBytes"` + ReportedMaxSizeBytes int64 `yaml:"reportedMaxBytes"` + Quota QuotasConfig `yaml:"quotas"` + Compression CompressionConfig `yaml:"compression"` } type DatastoreConfig struct { diff --git a/config.sample.yaml b/config.sample.yaml index e2aca618..606a1820 100644
--- a/config.sample.yaml +++ b/config.sample.yaml @@ -219,6 +219,20 @@ uploads: # Set this to -1 to indicate that there is no limit. Zero will force the use of maxBytes. #reportedMaxBytes: 104857600 + # Options for controlling compression of uploaded files. Compression is done with gzip + # and applied to files as they are stored. When a user downloads a file, the uncompressed + # version will be served. + # + # Compression is not used for IPFS datastores. + compression: + # Set to false to disable all compression attempts. Defaults to true (compression enabled). + enabled: true + + # The gzip compression level, between 1 and 9 inclusive, to apply to files. 6 is a common + # default and balances compression ratio against speed. 9 optimizes for ratio at the cost + # of speed. + level: 6 + # Options for limiting how much content a user can upload. Quotas are applied to content # associated with a user regardless of de-duplication. Quotas which affect remote servers # or users will not take effect. When a user exceeds their quota they will be unable to diff --git a/controllers/thumbnail_controller/thumbnail_resource_handler.go b/controllers/thumbnail_controller/thumbnail_resource_handler.go index 7d084f9e..30954e00 100644 --- a/controllers/thumbnail_controller/thumbnail_resource_handler.go +++ b/controllers/thumbnail_controller/thumbnail_resource_handler.go @@ -4,6 +4,8 @@ import ( "bytes" "fmt" "github.com/getsentry/sentry-go" + "github.com/turt2live/matrix-media-repo/util/cleanup" + "github.com/turt2live/matrix-media-repo/util/util_byte_seeker" "io/ioutil" "strconv" "sync" @@ -46,6 +48,7 @@ type GeneratedThumbnail struct { SizeBytes int64 Animated bool Sha256Hash string + Compressed bool } var resHandlerInstance *thumbnailResourceHandler @@ -97,7 +100,6 @@ func thumbnailWorkFn(request *resource_handler.WorkRequest) (resp *thumbnailResp return &thumbnailResponse{err: err} } - if info.animated != generated.Animated { ctx.Log.Warn("Animation state changed to ", generated.Animated) @@ -118,6 +120,7 @@ func thumbnailWorkFn(request *resource_handler.WorkRequest) (resp *thumbnailResp Location: generated.DatastoreLocation, SizeBytes: generated.SizeBytes, Sha256Hash: generated.Sha256Hash, + Compressed: generated.Compressed, } db := storage.GetDatabase().GetThumbnailStore(ctx) @@ -159,6 +162,13 @@ func GenerateThumbnail(media *types.Media, width int, height int, method string, ctx.Log.Error("Error getting file: ", err) return nil, err } + defer cleanup.DumpAndCloseStream(mediaStream) + + mediaStream, err = util.DecompressBytesIfNeeded(mediaStream, media.Compressed, ctx) + if err != nil { + ctx.Log.Error("Error decompressing file: ", err) + return nil, err + } mediaContentType := util.FixContentType(media.ContentType) @@ -187,6 +197,7 @@ func GenerateThumbnail(media *types.Media, width int, height int, method string, thumb.DatastoreLocation = media.Location thumb.SizeBytes = media.SizeBytes thumb.Sha256Hash = media.Sha256Hash + thumb.Compressed = media.Compressed ctx.Log.Warn("Image too small, returning raw image") metric.Inc() return thumb, nil @@ -198,6 +209,19 @@ func GenerateThumbnail(media *types.Media, width int, height int, method string, return nil, err } + hash, err := util.GetSha256HashOfStream(util_byte_seeker.NewByteSeeker(b)) + if err != nil { + return nil, err + } + + length := int64(len(b)) + + b, compressed, err := util.CompressBytesIfNeeded(b, ctx) + if err != nil { + return nil, err + } + thumb.Compressed = compressed + ds, err := datastore.PickDatastore(common.KindThumbnails, ctx) if err != nil { return
nil, err @@ -212,8 +236,8 @@ func GenerateThumbnail(media *types.Media, width int, height int, method string, thumb.DatastoreLocation = info.Location thumb.DatastoreId = ds.DatastoreId thumb.ContentType = thumbImg.ContentType - thumb.SizeBytes = info.SizeBytes - thumb.Sha256Hash = info.Sha256Hash + thumb.SizeBytes = length + thumb.Sha256Hash = hash metric.Inc() return thumb, nil diff --git a/controllers/upload_controller/upload_controller.go b/controllers/upload_controller/upload_controller.go index bde49d60..ed0e717e 100644 --- a/controllers/upload_controller/upload_controller.go +++ b/controllers/upload_controller/upload_controller.go @@ -198,6 +198,7 @@ func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize in var ds *datastore.DatastoreRef var info *types.ObjectInfo var contentBytes []byte + compressed := false if f == nil { dsPicked, err := datastore.PickDatastore(kind, ctx) if err != nil { @@ -210,7 +211,14 @@ func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize in return nil, err } - fInfo, err := ds.UploadFile(util.BytesToStream(contentBytes), expectedSize, ctx) + // We don't assign to contentBytes because that is used for antispam & hash checks + var dataBytes []byte + dataBytes, compressed, err = util.CompressBytesIfNeeded(contentBytes, ctx) + if err != nil { + return nil, err + } + + fInfo, err := ds.UploadFile(util.BytesToStream(dataBytes), int64(len(dataBytes)), ctx) if err != nil { return nil, err } @@ -230,15 +238,23 @@ func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize in } } + // Do the hash on content, not on the compressed bytes + hash, err := util.GetSha256HashOfStream(util_byte_seeker.NewByteSeeker(contentBytes)) + if err != nil { + ds.DeleteObject(info.Location) // delete temp object + return nil, err + } + ctx.Log.Info("Hash of file is ", hash) + db := storage.GetDatabase().GetMediaStore(ctx) - records, err := db.GetByHash(info.Sha256Hash) + records, err := db.GetByHash(hash) if err != nil { ds.DeleteObject(info.Location) // delete temp object return nil, err } if len(records) > 0 { - ctx.Log.Info("Duplicate media for hash ", info.Sha256Hash) + ctx.Log.Info("Duplicate media for hash ", hash) // If the user is a real user (ie: actually uploaded media), then we'll see if there's // an exact duplicate that we can return.
Otherwise we'll just pick the first record and @@ -289,6 +305,7 @@ func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize in media.UploadName = filename media.ContentType = contentType media.CreationTs = util.NowMillis() + //media.Compressed = compressed // we'll be using the existing record's flag err = db.Insert(media) if err != nil { @@ -323,7 +340,7 @@ func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize in // The media doesn't already exist - save it as new - if info.SizeBytes <= 0 { + if len(contentBytes) <= 0 { ds.DeleteObject(info.Location) return nil, errors.New("file has no contents") } @@ -342,11 +359,12 @@ func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize in UploadName: filename, ContentType: contentType, UserId: userId, - Sha256Hash: info.Sha256Hash, - SizeBytes: info.SizeBytes, + Sha256Hash: hash, + SizeBytes: int64(len(contentBytes)), DatastoreId: ds.DatastoreId, Location: info.Location, CreationTs: util.NowMillis(), + Compressed: compressed, } err = db.Insert(media) diff --git a/migrations/19_add_compressed_flag_down.sql b/migrations/19_add_compressed_flag_down.sql new file mode 100644 index 00000000..41708721 --- /dev/null +++ b/migrations/19_add_compressed_flag_down.sql @@ -0,0 +1,2 @@ +ALTER TABLE media DROP COLUMN compressed; +ALTER TABLE thumbnails DROP COLUMN compressed; diff --git a/migrations/19_add_compressed_flag_up.sql b/migrations/19_add_compressed_flag_up.sql new file mode 100644 index 00000000..5a8702a8 --- /dev/null +++ b/migrations/19_add_compressed_flag_up.sql @@ -0,0 +1,2 @@ +ALTER TABLE media ADD COLUMN compressed BOOL NOT NULL DEFAULT FALSE; +ALTER TABLE thumbnails ADD COLUMN compressed BOOL NOT NULL DEFAULT FALSE; diff --git a/storage/datastore/datastore_ref.go b/storage/datastore/datastore_ref.go index f3b8ee88..c408f151 100644 --- a/storage/datastore/datastore_ref.go +++ b/storage/datastore/datastore_ref.go @@ -112,8 +112,7 @@ func (d *DatastoreRef) ObjectExists(location string) bool { func (d *DatastoreRef) OverwriteObject(location string, stream io.ReadCloser, ctx rcontext.RequestContext) error { if d.Type == "file" { - _, _, err := ds_file.PersistFileAtLocation(path.Join(d.Uri, location), stream, ctx) - return err + return ds_file.PersistFileAtLocation(path.Join(d.Uri, location), stream, ctx) } else if d.Type == "s3" { s3, err := ds_s3.GetOrCreateS3Datastore(d.DatastoreId, d.config) if err != nil { diff --git a/storage/datastore/ds_file/file_store.go b/storage/datastore/ds_file/file_store.go index 14c3bc44..19f900a7 100644 --- a/storage/datastore/ds_file/file_store.go +++ b/storage/datastore/ds_file/file_store.go @@ -3,7 +3,6 @@ package ds_file import ( "errors" "io" - "io/ioutil" "os" "path" @@ -55,68 +54,37 @@ func PersistFile(basePath string, file io.ReadCloser, ctx rcontext.RequestContex return nil, err } - sizeBytes, hash, err := PersistFileAtLocation(targetFile, file, ctx) + err = PersistFileAtLocation(targetFile, file, ctx) if err != nil { return nil, err } locationPath := path.Join(primaryContainer, secondaryContainer, fileName) return &types.ObjectInfo{ - Location: locationPath, - Sha256Hash: hash, - SizeBytes: sizeBytes, + Location: locationPath, }, nil } -func PersistFileAtLocation(targetFile string, file io.ReadCloser, ctx rcontext.RequestContext) (int64, string, error) { +func PersistFileAtLocation(targetFile string, file io.ReadCloser, ctx rcontext.RequestContext) error { defer cleanup.DumpAndCloseStream(file) f, err := os.OpenFile(targetFile, 
os.O_WRONLY|os.O_CREATE, 0644) if err != nil { - return 0, "", err + return err } defer cleanup.DumpAndCloseStream(f) - rfile, wfile := io.Pipe() - tr := io.TeeReader(file, wfile) - - done := make(chan bool) - defer close(done) - - var hash string var sizeBytes int64 - var hashErr error var writeErr error - go func() { - defer wfile.Close() - ctx.Log.Info("Calculating hash of stream...") - hash, hashErr = util.GetSha256HashOfStream(ioutil.NopCloser(tr)) - ctx.Log.Info("Hash of file is ", hash) - done <- true - }() - - go func() { - ctx.Log.Info("Writing file...") - sizeBytes, writeErr = io.Copy(f, rfile) - ctx.Log.Info("Wrote ", sizeBytes, " bytes to file") - done <- true - }() - - for c := 0; c < 2; c++ { - <-done - } - - if hashErr != nil { - defer os.Remove(targetFile) - return 0, "", hashErr - } - + ctx.Log.Info("Writing file...") + sizeBytes, writeErr = io.Copy(f, file) if writeErr != nil { - return 0, "", writeErr + return writeErr } + ctx.Log.Info("Wrote ", sizeBytes, " bytes to file") - return sizeBytes, hash, nil + return nil } func DeletePersistedFile(basePath string, location string) error { diff --git a/storage/datastore/ds_ipfs/ipfs_store.go b/storage/datastore/ds_ipfs/ipfs_store.go index 9f26d029..a77be54e 100644 --- a/storage/datastore/ds_ipfs/ipfs_store.go +++ b/storage/datastore/ds_ipfs/ipfs_store.go @@ -8,7 +8,6 @@ import ( "github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/ipfs_proxy" "github.com/turt2live/matrix-media-repo/types" - "github.com/turt2live/matrix-media-repo/util" "github.com/turt2live/matrix-media-repo/util/cleanup" ) @@ -20,43 +19,18 @@ func UploadFile(file io.ReadCloser, ctx rcontext.RequestContext) (*types.ObjectI return nil, err } - done := make(chan bool) - defer close(done) - var cid string - var hash string - var hashErr error var writeErr error - go func() { - ctx.Log.Info("Writing file...") - cid, writeErr = ipfs_proxy.PutObject(bytes.NewBuffer(b), ctx) - ctx.Log.Info("Wrote file to IPFS") - done <- true - }() - - go func() { - ctx.Log.Info("Calculating hash of stream...") - hash, hashErr = util.GetSha256HashOfStream(ioutil.NopCloser(bytes.NewBuffer(b))) - ctx.Log.Info("Hash of file is ", hash) - done <- true - }() - - for c := 0; c < 2; c++ { - <-done - } - - if hashErr != nil { - return nil, hashErr - } + ctx.Log.Info("Writing file...") + cid, writeErr = ipfs_proxy.PutObject(bytes.NewBuffer(b), ctx) if writeErr != nil { return nil, writeErr } + ctx.Log.Info("Wrote file to IPFS") return &types.ObjectInfo{ - Location: "ipfs/" + cid, - Sha256Hash: hash, - SizeBytes: int64(len(b)), + Location: "ipfs/" + cid, }, nil } diff --git a/storage/datastore/ds_s3/s3_store.go b/storage/datastore/ds_s3/s3_store.go index 79824a3c..c48c75ba 100644 --- a/storage/datastore/ds_s3/s3_store.go +++ b/storage/datastore/ds_s3/s3_store.go @@ -25,7 +25,7 @@ type s3Datastore struct { dsId string client *minio.Client bucket string - region string + region string tempPath string } @@ -70,7 +70,7 @@ func GetOrCreateS3Datastore(dsId string, conf config.DatastoreConfig) (*s3Datast dsId: dsId, client: s3client, bucket: bucket, - region: region, + region: region, tempPath: tempPath, } stores[dsId] = s3ds @@ -129,72 +129,38 @@ func (s *s3Datastore) UploadFile(file io.ReadCloser, expectedLength int64, ctx r return nil, err } - var rs3 io.ReadCloser - var ws3 io.WriteCloser - rs3, ws3 = io.Pipe() - tr := io.TeeReader(file, ws3) - - done := make(chan bool) - defer close(done) - - var hash string var sizeBytes int64 - var hashErr error var 
uploadErr error - go func() { - defer ws3.Close() - ctx.Log.Info("Calculating hash of stream...") - hash, hashErr = util.GetSha256HashOfStream(ioutil.NopCloser(tr)) - ctx.Log.Info("Hash of file is ", hash) - done <- true - }() - - go func() { - if expectedLength <= 0 { - if s.tempPath != "" { - ctx.Log.Info("Buffering file to temp path due to unknown file size") - var f *os.File - f, uploadErr = ioutil.TempFile(s.tempPath, "mr*") - if uploadErr != nil { - io.Copy(ioutil.Discard, rs3) - done <- true - return - } - defer os.Remove(f.Name()) - expectedLength, uploadErr = io.Copy(f, rs3) - cleanup.DumpAndCloseStream(f) - f, uploadErr = os.Open(f.Name()) - if uploadErr != nil { - done <- true - return - } - rs3 = f - defer cleanup.DumpAndCloseStream(f) - } else { - ctx.Log.Warn("Uploading content of unknown length to s3 - this could result in high memory usage") - expectedLength = -1 + if expectedLength <= 0 { + if s.tempPath != "" { + ctx.Log.Info("Buffering file to temp path due to unknown file size") + var f *os.File + f, uploadErr = ioutil.TempFile(s.tempPath, "mr*") + if uploadErr != nil { + io.Copy(ioutil.Discard, file) + return nil, uploadErr + } + defer os.Remove(f.Name()) + expectedLength, uploadErr = io.Copy(f, file) + cleanup.DumpAndCloseStream(f) + f, uploadErr = os.Open(f.Name()) + if uploadErr != nil { + return nil, uploadErr } + file = f + defer cleanup.DumpAndCloseStream(f) + } else { + ctx.Log.Warn("Uploading content of unknown length to s3 - this could result in high memory usage") + expectedLength = -1 } - ctx.Log.Info("Uploading file...") - sizeBytes, uploadErr = s.client.PutObjectWithContext(ctx, s.bucket, objectName, rs3, expectedLength, minio.PutObjectOptions{}) - ctx.Log.Info("Uploaded ", sizeBytes, " bytes to s3") - done <- true - }() - - for c := 0; c < 2; c++ { - <-done } + ctx.Log.Info("Uploading file...") + sizeBytes, uploadErr = s.client.PutObjectWithContext(ctx, s.bucket, objectName, file, expectedLength, minio.PutObjectOptions{}) + ctx.Log.Info("Uploaded ", sizeBytes, " bytes to s3") obj := &types.ObjectInfo{ - Location: objectName, - Sha256Hash: hash, - SizeBytes: sizeBytes, - } - - if hashErr != nil { - s.DeleteObject(obj.Location) - return nil, hashErr + Location: objectName, } if uploadErr != nil { diff --git a/storage/stores/media_store.go b/storage/stores/media_store.go index cf00064c..1505f2bc 100644 --- a/storage/stores/media_store.go +++ b/storage/stores/media_store.go @@ -9,28 +9,28 @@ import ( "github.com/turt2live/matrix-media-repo/types" ) -const selectMedia = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 and media_id = $2;" -const selectMediaByHash = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE sha256_hash = $1;" -const insertMedia = "INSERT INTO media (origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);" -const selectOldMedia = "SELECT m.origin, m.media_id, m.upload_name, m.content_type, m.user_id, m.sha256_hash, m.size_bytes, m.datastore_id, m.location, m.creation_ts, quarantined FROM media AS m WHERE m.origin <> ANY($1) AND m.creation_ts < $2 AND (SELECT COUNT(*) FROM media AS d WHERE d.sha256_hash = m.sha256_hash AND d.creation_ts >= $2) = 0 AND (SELECT COUNT(*) FROM media AS d WHERE 
d.sha256_hash = m.sha256_hash AND d.origin = ANY($1)) = 0;" +const selectMedia = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE origin = $1 and media_id = $2;" +const selectMediaByHash = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE sha256_hash = $1;" +const insertMedia = "INSERT INTO media (origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12);" +const selectOldMedia = "SELECT m.origin, m.media_id, m.upload_name, m.content_type, m.user_id, m.sha256_hash, m.size_bytes, m.datastore_id, m.location, m.creation_ts, m.quarantined, m.compressed FROM media AS m WHERE m.origin <> ANY($1) AND m.creation_ts < $2 AND (SELECT COUNT(*) FROM media AS d WHERE d.sha256_hash = m.sha256_hash AND d.creation_ts >= $2) = 0 AND (SELECT COUNT(*) FROM media AS d WHERE d.sha256_hash = m.sha256_hash AND d.origin = ANY($1)) = 0;" const selectOrigins = "SELECT DISTINCT origin FROM media;" const deleteMedia = "DELETE FROM media WHERE origin = $1 AND media_id = $2;" const updateQuarantined = "UPDATE media SET quarantined = $3 WHERE origin = $1 AND media_id = $2;" const selectDatastore = "SELECT datastore_id, ds_type, uri FROM datastores WHERE datastore_id = $1;" const selectDatastoreByUri = "SELECT datastore_id, ds_type, uri FROM datastores WHERE uri = $1;" const insertDatastore = "INSERT INTO datastores (datastore_id, ds_type, uri) VALUES ($1, $2, $3);" -const selectMediaWithoutDatastore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE datastore_id IS NULL OR datastore_id = '';" +const selectMediaWithoutDatastore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE datastore_id IS NULL OR datastore_id = '';" const updateMediaDatastoreAndLocation = "UPDATE media SET location = $4, datastore_id = $3 WHERE origin = $1 AND media_id = $2;" const selectAllDatastores = "SELECT datastore_id, ds_type, uri FROM datastores;" -const selectAllMediaForServer = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1" -const selectAllMediaForServerUsers = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 AND user_id = ANY($2)" -const selectAllMediaForServerIds = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 AND media_id = ANY($2)" -const selectQuarantinedMedia = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE quarantined = true;" -const selectServerQuarantinedMedia = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE quarantined = true AND origin = $1;" -const selectMediaByUser = "SELECT origin, 
media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE user_id = $1" -const selectMediaByUserBefore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE user_id = $1 AND creation_ts <= $2" -const selectMediaByDomainBefore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 AND creation_ts <= $2" -const selectMediaByLocation = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE datastore_id = $1 AND location = $2" +const selectAllMediaForServer = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE origin = $1" +const selectAllMediaForServerUsers = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE origin = $1 AND user_id = ANY($2)" +const selectAllMediaForServerIds = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE origin = $1 AND media_id = ANY($2)" +const selectQuarantinedMedia = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE quarantined = true;" +const selectServerQuarantinedMedia = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE quarantined = true AND origin = $1;" +const selectMediaByUser = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE user_id = $1" +const selectMediaByUserBefore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE user_id = $1 AND creation_ts <= $2" +const selectMediaByDomainBefore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE origin = $1 AND creation_ts <= $2" +const selectMediaByLocation = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined, compressed FROM media WHERE datastore_id = $1 AND location = $2" const selectIfQuarantined = "SELECT 1 FROM media WHERE sha256_hash = $1 AND quarantined = $2 LIMIT 1;" var dsCacheByPath = sync.Map{} // [string] => Datastore @@ -175,6 +175,7 @@ func (s *MediaStore) Insert(media *types.Media) error { media.Location, media.CreationTs, media.Quarantined, + media.Compressed, ) return err } @@ -200,6 +201,7 @@ func (s *MediaStore) GetByHash(hash string) ([]*types.Media, error) { &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -224,6 +226,7 @@ func (s *MediaStore) Get(origin string, mediaId string) (*types.Media, error) { &m.Location, &m.CreationTs, &m.Quarantined, + &m.Compressed, ) return 
m, err } @@ -249,6 +252,7 @@ func (s *MediaStore) GetOldMedia(exceptOrigins []string, beforeTs int64) ([]*typ &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -396,6 +400,7 @@ func (s *MediaStore) GetAllWithoutDatastore() ([]*types.Media, error) { &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -450,6 +455,7 @@ func (s *MediaStore) GetAllMediaForServer(serverName string) ([]*types.Media, er &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -481,6 +487,7 @@ func (s *MediaStore) GetAllMediaForServerUsers(serverName string, userIds []stri &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -512,6 +519,7 @@ func (s *MediaStore) GetAllMediaInIds(serverName string, mediaIds []string) ([]* &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -543,6 +551,7 @@ func (s *MediaStore) GetAllQuarantinedMedia() ([]*types.Media, error) { &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -574,6 +583,7 @@ func (s *MediaStore) GetQuarantinedMediaFor(serverName string) ([]*types.Media, &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -605,6 +615,7 @@ func (s *MediaStore) GetMediaByUser(userId string) ([]*types.Media, error) { &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -636,6 +647,7 @@ func (s *MediaStore) GetMediaByUserBefore(userId string, beforeTs int64) ([]*typ &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -667,6 +679,7 @@ func (s *MediaStore) GetMediaByDomainBefore(serverName string, beforeTs int64) ( &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err @@ -698,6 +711,7 @@ func (s *MediaStore) GetMediaByLocation(datastoreId string, location string) ([] &obj.Location, &obj.CreationTs, &obj.Quarantined, + &obj.Compressed, ) if err != nil { return nil, err diff --git a/storage/stores/thumbnail_store.go b/storage/stores/thumbnail_store.go index e9fcccc8..d44d6b68 100644 --- a/storage/stores/thumbnail_store.go +++ b/storage/stores/thumbnail_store.go @@ -7,15 +7,15 @@ import ( "github.com/turt2live/matrix-media-repo/types" ) -const selectThumbnail = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash FROM thumbnails WHERE origin = $1 and media_id = $2 and width = $3 and height = $4 and method = $5 and animated = $6;" -const insertThumbnail = "INSERT INTO thumbnails (origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12);" +const selectThumbnail = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash, compressed FROM thumbnails WHERE origin = $1 and media_id = $2 and width = $3 and height = $4 and method = $5 and animated = $6;" +const insertThumbnail = "INSERT INTO thumbnails (origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash, compressed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, 
$13);" const updateThumbnailHash = "UPDATE thumbnails SET sha256_hash = $7 WHERE origin = $1 and media_id = $2 and width = $3 and height = $4 and method = $5 and animated = $6;" -const selectThumbnailsWithoutHash = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash FROM thumbnails WHERE sha256_hash IS NULL OR sha256_hash = '';" -const selectThumbnailsWithoutDatastore = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash FROM thumbnails WHERE datastore_id IS NULL OR datastore_id = '';" +const selectThumbnailsWithoutHash = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash, compressed FROM thumbnails WHERE sha256_hash IS NULL OR sha256_hash = '';" +const selectThumbnailsWithoutDatastore = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash, compressed FROM thumbnails WHERE datastore_id IS NULL OR datastore_id = '';" const updateThumbnailDatastoreAndLocation = "UPDATE thumbnails SET location = $8, datastore_id = $7 WHERE origin = $1 and media_id = $2 and width = $3 and height = $4 and method = $5 and animated = $6;" -const selectThumbnailsForMedia = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash FROM thumbnails WHERE origin = $1 AND media_id = $2;" +const selectThumbnailsForMedia = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash, compressed FROM thumbnails WHERE origin = $1 AND media_id = $2;" const deleteThumbnailsForMedia = "DELETE FROM thumbnails WHERE origin = $1 AND media_id = $2;" -const selectThumbnailsCreatedBefore = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash FROM thumbnails WHERE creation_ts < $1;" +const selectThumbnailsCreatedBefore = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash, compressed FROM thumbnails WHERE creation_ts < $1;" const deleteThumbnailsWithHash = "DELETE FROM thumbnails WHERE sha256_hash = $1;" type thumbnailStatements struct { @@ -105,6 +105,7 @@ func (s *ThumbnailStore) Insert(thumbnail *types.Thumbnail) error { thumbnail.Location, thumbnail.CreationTs, thumbnail.Sha256Hash, + thumbnail.Compressed, ) return err @@ -125,6 +126,7 @@ func (s *ThumbnailStore) Get(origin string, mediaId string, width int, height in &t.Location, &t.CreationTs, &t.Sha256Hash, + &t.Compressed, ) return t, err } @@ -182,6 +184,7 @@ func (s *ThumbnailStore) GetAllWithoutHash() ([]*types.Thumbnail, error) { &obj.Location, &obj.CreationTs, &obj.Sha256Hash, + &obj.Compressed, ) if err != nil { return nil, err @@ -214,6 +217,7 @@ func (s *ThumbnailStore) GetAllWithoutDatastore() ([]*types.Thumbnail, error) { &obj.Location, &obj.CreationTs, &obj.Sha256Hash, + &obj.Compressed, ) if err != nil { return nil, err @@ -246,6 +250,7 @@ func (s *ThumbnailStore) GetAllForMedia(origin string, mediaId string) ([]*types &obj.Location, &obj.CreationTs, &obj.Sha256Hash, + &obj.Compressed, ) if err != nil { return nil, err @@ -286,6 +291,7 @@ func (s *ThumbnailStore) GetOldThumbnails(beforeTs int64) ([]*types.Thumbnail, e &obj.Location, 
&obj.CreationTs, &obj.Sha256Hash, + &obj.Compressed, ) if err != nil { return nil, err diff --git a/types/media.go b/types/media.go index ce8ea0a1..e0bc8230 100644 --- a/types/media.go +++ b/types/media.go @@ -14,6 +14,7 @@ type Media struct { Location string CreationTs int64 Quarantined bool + Compressed bool } type MinimalMedia struct { diff --git a/types/object_info.go b/types/object_info.go index 11fe6a40..05ebdf2c 100644 --- a/types/object_info.go +++ b/types/object_info.go @@ -1,7 +1,5 @@ package types type ObjectInfo struct { - Location string - Sha256Hash string - SizeBytes int64 + Location string } diff --git a/types/thumbnail.go b/types/thumbnail.go index de71be34..9d240636 100644 --- a/types/thumbnail.go +++ b/types/thumbnail.go @@ -17,6 +17,7 @@ type Thumbnail struct { Location string CreationTs int64 Sha256Hash string + Compressed bool } type StreamedThumbnail struct { diff --git a/util/compression.go b/util/compression.go new file mode 100644 index 00000000..94b708bb --- /dev/null +++ b/util/compression.go @@ -0,0 +1,43 @@ +package util + +import ( + "bytes" + "compress/gzip" + "github.com/turt2live/matrix-media-repo/common/rcontext" + "io" +) + +func CompressBytesIfNeeded(b []byte, ctx rcontext.RequestContext) ([]byte, bool, error) { + if !ctx.Config.Uploads.Compression.Enabled { + return b, false, nil + } + + buf := &bytes.Buffer{} + w, err := gzip.NewWriterLevel(buf, ctx.Config.Uploads.Compression.Level) + if err != nil { + return nil, false, err + } + + _, err = w.Write(b) + if err != nil { + return nil, false, err + } + + if err = w.Close(); err != nil { // everything is written: flush the gzip trailer + return nil, false, err + } + + return buf.Bytes(), true, nil +} + +func DecompressBytesIfNeeded(s io.ReadCloser, compressed bool, ctx rcontext.RequestContext) (io.ReadCloser, error) { + if !compressed { + return s, nil + } + + r, err := gzip.NewReader(s) + if err != nil { + return nil, err + } + return r, nil +} -- GitLab
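A note on the hashing change in upload_controller.go and the thumbnail handler: the sha256 is now computed over the uncompressed bytes rather than over what lands in the datastore. gzip output is not canonical (the header's XFL byte alone differs between levels), so hashing the compressed form would give identical uploads different hashes whenever the level changes, silently defeating de-duplication. A small standard-library demonstration, where gz and media are throwaway names for this demo only:

    package main

    import (
    	"bytes"
    	"compress/gzip"
    	"crypto/sha256"
    	"fmt"
    )

    // gz compresses b at the given gzip level; helper for this demo only.
    func gz(b []byte, level int) []byte {
    	buf := &bytes.Buffer{}
    	w, _ := gzip.NewWriterLevel(buf, level)
    	w.Write(b)
    	w.Close()
    	return buf.Bytes()
    }

    func main() {
    	media := bytes.Repeat([]byte("media"), 200)

    	// Same content, different stored bytes, therefore different hashes.
    	fmt.Printf("level 1: %x\n", sha256.Sum256(gz(media, 1)))
    	fmt.Printf("level 9: %x\n", sha256.Sum256(gz(media, 9)))

    	// Hashing the content itself is stable regardless of storage form.
    	fmt.Printf("content: %x\n", sha256.Sum256(media))
    }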
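On the serving side, note the ordering in route_handler.go: the defer result.Data.Close() is moved ahead of the decompression call. The deferred call binds the original datastore stream, and gzip.Reader.Close does not close the reader it wraps, so deferring first is what keeps the underlying stream from leaking. A self-contained sketch of the wrapping pattern (wrapIfCompressed is illustrative and only mirrors the shape of DecompressBytesIfNeeded):

    package main

    import (
    	"bytes"
    	"compress/gzip"
    	"fmt"
    	"io"
    	"io/ioutil"
    )

    // wrapIfCompressed mirrors DecompressBytesIfNeeded: pass-through for
    // uncompressed media, a gzip.Reader for compressed media. The gzip.Reader's
    // Close does NOT close the stream it wraps.
    func wrapIfCompressed(s io.ReadCloser, compressed bool) (io.ReadCloser, error) {
    	if !compressed {
    		return s, nil
    	}
    	return gzip.NewReader(s)
    }

    func main() {
    	// Fabricate a "stored" compressed stream.
    	buf := &bytes.Buffer{}
    	w := gzip.NewWriter(buf)
    	w.Write([]byte("hello media"))
    	w.Close()
    	stored := ioutil.NopCloser(buf)

    	// Close the underlying stream ourselves, as the route handler does.
    	defer stored.Close()

    	r, err := wrapIfCompressed(stored, true)
    	if err != nil {
    		panic(err)
    	}
    	out, _ := ioutil.ReadAll(r)
    	fmt.Println(string(out))
    }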