diff --git a/api/r0/upload.go b/api/r0/upload.go index aa1274a23c27c039d2b254ce7d300be771f08e42..8b70d2f3defaf8898401d0d93e9e141c53ec019b 100644 --- a/api/r0/upload.go +++ b/api/r0/upload.go @@ -38,7 +38,9 @@ func UploadMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) interfac return api.RequestTooSmall() } - media, err := upload_controller.UploadMedia(r.Body, contentType, filename, user.UserId, r.Host, r.Context(), log) + contentLength := upload_controller.EstimateContentLength(r.ContentLength, r.Header.Get("Content-Length")) + + media, err := upload_controller.UploadMedia(r.Body, contentLength, contentType, filename, user.UserId, r.Host, r.Context(), log) if err != nil { io.Copy(ioutil.Discard, r.Body) // Ditch the entire request diff --git a/api/unstable/local_copy.go b/api/unstable/local_copy.go index a23efddb5b1f1f17b462073468597603956a10bd..6d17fbc3679725a96c318ea985bb87f2602165e1 100644 --- a/api/unstable/local_copy.go +++ b/api/unstable/local_copy.go @@ -56,7 +56,7 @@ func LocalCopy(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{ return &r0.MediaUploadedResponse{ContentUri: streamedMedia.KnownMedia.MxcUri()} } - newMedia, err := upload_controller.UploadMedia(streamedMedia.Stream, streamedMedia.KnownMedia.ContentType, streamedMedia.KnownMedia.UploadName, user.UserId, r.Host, r.Context(), log) + newMedia, err := upload_controller.UploadMedia(streamedMedia.Stream, streamedMedia.KnownMedia.SizeBytes, streamedMedia.KnownMedia.ContentType, streamedMedia.KnownMedia.UploadName, user.UserId, r.Host, r.Context(), log) if err != nil { if err == common.ErrMediaNotAllowed { return api.BadRequest("Media content type not allowed on this server") diff --git a/cmd/import_synapse/main.go b/cmd/import_synapse/main.go index e099b971d3e109bd6faa07f75a3392fe7c75bf26..30d8ac0c2f67a02834512d5ea2770e2602c18af6 100644 --- a/cmd/import_synapse/main.go +++ b/cmd/import_synapse/main.go @@ -133,7 +133,7 @@ func fetchMedia(req interface{}) interface{} { 
return nil } - _, err = upload_controller.StoreDirect(body, record.ContentType, record.UploadName, record.UserId, payload.serverName, record.MediaId, ctx, log) + _, err = upload_controller.StoreDirect(body, -1, record.ContentType, record.UploadName, record.UserId, payload.serverName, record.MediaId, ctx, log) if err != nil { logrus.Error(err.Error()) return nil diff --git a/config.sample.yaml b/config.sample.yaml index f469492bf2ba0f059a3c38f121a54e2369bc1f8e..fd9cd893a8680521b7d0df08e495c84ee736b4a1 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -50,6 +50,11 @@ datastores: enabled: false forUploads: false # Enable this to put uploads in s3 opts: + # The s3 uploader buffers uploads of unknown size to a temporary location on disk to + # keep memory usage down. If the file size is unknown, the file is written to this directory + # (which must already exist and be writable) before being uploaded to s3, and is then deleted. + # If you aren't concerned about memory usage, set this to an empty string to skip buffering. + tempPath: "/tmp/mediarepo_s3_upload" endpoint: sfo2.digitaloceanspaces.com accessKeyId: "" accessSecret: "" diff --git a/controllers/download_controller/download_resource_handler.go b/controllers/download_controller/download_resource_handler.go index 6503972993d74008b2381e4dd7cc170b7af81dd0..6898f8fcdd5e49ca60fcd6d34413d25cf5712cf2 100644 --- a/controllers/download_controller/download_resource_handler.go +++ b/controllers/download_controller/download_resource_handler.go @@ -61,6 +61,7 @@ type downloadedMedia struct { Contents io.ReadCloser DesiredFilename string ContentType string + ContentLength int64 } var resHandler *mediaResourceHandler @@ -139,7 +140,7 @@ func downloadResourceWorkFn(request *resource_handler.WorkRequest) interface{} { defer fileStream.Close() userId := upload_controller.NoApplicableUploadUser - media, err := upload_controller.StoreDirect(fileStream, downloaded.ContentType, downloaded.DesiredFilename, userId, info.origin, info.mediaId, ctx, log) + media, err := 
upload_controller.StoreDirect(fileStream, downloaded.ContentLength, downloaded.ContentType, downloaded.DesiredFilename, userId, info.origin, info.mediaId, ctx, log) if err != nil { log.Error("Error persisting file: ", err) return &workerDownloadResponse{err: err} @@ -240,8 +241,9 @@ func DownloadRemoteMediaDirect(server string, mediaId string, log *logrus.Entry) } request := &downloadedMedia{ - ContentType: contentType, - Contents: resp.Body, + ContentType: contentType, + Contents: resp.Body, + ContentLength: contentLength, // DesiredFilename (calculated below) } diff --git a/controllers/maintenance_controller/maintainance_controller.go b/controllers/maintenance_controller/maintainance_controller.go index a42627609f6acbbdb516906908e921ac25d44b18..89e0951074e48a50171ed40c8e626db599d38d49 100644 --- a/controllers/maintenance_controller/maintainance_controller.go +++ b/controllers/maintenance_controller/maintainance_controller.go @@ -31,7 +31,7 @@ func StartStorageMigration(sourceDs *datastore.DatastoreRef, targetDs *datastore continue } - newLocation, err := targetDs.UploadFile(sourceStream, ctx, log) + newLocation, err := targetDs.UploadFile(sourceStream, record.SizeBytes, ctx, log) if err != nil { log.Error(err) log.Error("Failed to upload file to target datastore") diff --git a/controllers/preview_controller/preview_resource_handler.go b/controllers/preview_controller/preview_resource_handler.go index acafb1103b9c2ad65c2469797083123e267f362e..6e231b74159a4150a0956995fa0c00d3b1472d4a 100644 --- a/controllers/preview_controller/preview_resource_handler.go +++ b/controllers/preview_controller/preview_resource_handler.go @@ -95,8 +95,10 @@ func urlPreviewWorkFn(request *resource_handler.WorkRequest) interface{} { // Store the thumbnail, if there is one if preview.Image != nil && !upload_controller.IsRequestTooLarge(preview.Image.ContentLength, preview.Image.ContentLengthHeader) { + contentLength := upload_controller.EstimateContentLength(preview.Image.ContentLength, 
preview.Image.ContentLengthHeader) + // UploadMedia will close the read stream for the thumbnail and dedupe the image - media, err := upload_controller.UploadMedia(preview.Image.Data, preview.Image.ContentType, preview.Image.Filename, info.forUserId, info.onHost, ctx, log) + media, err := upload_controller.UploadMedia(preview.Image.Data, contentLength, preview.Image.ContentType, preview.Image.Filename, info.forUserId, info.onHost, ctx, log) if err != nil { log.Warn("Non-fatal error storing preview thumbnail: " + err.Error()) } else { diff --git a/controllers/thumbnail_controller/thumbnail_resource_handler.go b/controllers/thumbnail_controller/thumbnail_resource_handler.go index 1db8ac8d5ed443ab9a7d1d0430825c75841362b9..190965076ab23b8039fa761757cc1e1553a8f9da 100644 --- a/controllers/thumbnail_controller/thumbnail_resource_handler.go +++ b/controllers/thumbnail_controller/thumbnail_resource_handler.go @@ -288,7 +288,7 @@ func GenerateThumbnail(media *types.Media, width int, height int, method string, if err != nil { return nil, err } - info, err := ds.UploadFile(util.BufferToStream(imgData), ctx, log) + info, err := ds.UploadFile(util.BufferToStream(imgData), int64(len(imgData.Bytes())), ctx, log) if err != nil { log.Error("Unexpected error saving thumbnail: " + err.Error()) return nil, err diff --git a/controllers/upload_controller/upload_controller.go b/controllers/upload_controller/upload_controller.go index ff2a3afd73623b2a6c7e1374514a0bc70ce99399..0e91c5ee5d34c1e6aad56221e7d2fa9bf418152e 100644 --- a/controllers/upload_controller/upload_controller.go +++ b/controllers/upload_controller/upload_controller.go @@ -59,7 +59,27 @@ func IsRequestTooSmall(contentLength int64, contentLengthHeader string) bool { return false // We can only assume } -func UploadMedia(contents io.ReadCloser, contentType string, filename string, userId string, origin string, ctx context.Context, log *logrus.Entry) (*types.Media, error) { +// EstimateContentLength returns the best known size, in bytes, of a body that is about to be stored. +// A non-negative contentLength is returned as-is; otherwise contentLengthHeader is parsed as a base-10 integer. +// -1 is returned when the size is unknown: the header is empty, cannot be parsed, or parses to a negative number. +func EstimateContentLength(contentLength int64, 
contentLengthHeader string) int64 { + if contentLength >= 0 { + return contentLength + } + if contentLengthHeader == "" { + return -1 // unknown + } + + parsed, err := strconv.ParseInt(contentLengthHeader, 10, 64) + if err != nil || parsed < 0 { + logrus.Warn("Invalid content length header given. Value received: " + contentLengthHeader) + return -1 // unknown + } + + return parsed +} + +func UploadMedia(contents io.ReadCloser, contentLength int64, contentType string, filename string, userId string, origin string, ctx context.Context, log *logrus.Entry) (*types.Media, error) { defer contents.Close() var data io.ReadCloser @@ -74,7 +94,7 @@ func UploadMedia(contents io.ReadCloser, contentType string, filename string, us return nil, err } - return StoreDirect(data, contentType, filename, userId, origin, mediaId, ctx, log) + return StoreDirect(data, contentLength, contentType, filename, userId, origin, mediaId, ctx, log) } func trackUploadAsLastAccess(ctx context.Context, log *logrus.Entry, media *types.Media) { @@ -127,12 +147,12 @@ func IsAllowed(contentType string, reportedContentType string, userId string, lo return allowed } -func StoreDirect(contents io.ReadCloser, contentType string, filename string, userId string, origin string, mediaId string, ctx context.Context, log *logrus.Entry) (*types.Media, error) { +func StoreDirect(contents io.ReadCloser, expectedSize int64, contentType string, filename string, userId string, origin string, mediaId string, ctx context.Context, log *logrus.Entry) (*types.Media, error) { ds, err := datastore.PickDatastore(ctx, log) if err != nil { return nil, err } - info, err := ds.UploadFile(contents, expectedSize, ctx, log) + info, err := ds.UploadFile(contents, expectedSize, ctx, log) if err != nil { return nil, err } diff --git a/storage/datastore/datastore_ref.go b/storage/datastore/datastore_ref.go index f518edeffdfc56d914aaefc71f3e52284a6f6803..d1d8c7576409bdbeed0c8709ad229da784909a2d 100644 --- a/storage/datastore/datastore_ref.go +++ 
b/storage/datastore/datastore_ref.go @@ -35,7 +35,7 @@ func newDatastoreRef(ds *types.Datastore, config config.DatastoreConfig) *Datast } } -func (d *DatastoreRef) UploadFile(file io.ReadCloser, ctx context.Context, log *logrus.Entry) (*types.ObjectInfo, error) { +func (d *DatastoreRef) UploadFile(file io.ReadCloser, expectedLength int64, ctx context.Context, log *logrus.Entry) (*types.ObjectInfo, error) { log = log.WithFields(logrus.Fields{"datastoreId": d.DatastoreId, "datastoreUri": d.Uri}) if d.Type == "file" { @@ -45,7 +45,7 @@ func (d *DatastoreRef) UploadFile(file io.ReadCloser, ctx context.Context, log * if err != nil { return nil, err } - return s3.UploadFile(file, ctx, log) + return s3.UploadFile(file, expectedLength, ctx, log) } else { return nil, errors.New("unknown datastore type") } diff --git a/storage/datastore/ds_s3/s3_store.go b/storage/datastore/ds_s3/s3_store.go index 314e8496b844db0dfa95e74ca556e8d796d5e029..8cfcecc27bd6a0eda8b2421fcc464330019ffa12 100644 --- a/storage/datastore/ds_s3/s3_store.go +++ b/storage/datastore/ds_s3/s3_store.go @@ -4,6 +4,7 @@ import ( "context" "io" "io/ioutil" + "os" "strconv" "github.com/minio/minio-go" @@ -17,10 +18,11 @@ import ( var stores = make(map[string]*s3Datastore) type s3Datastore struct { - conf config.DatastoreConfig - dsId string - client *minio.Client - bucket string + conf config.DatastoreConfig + dsId string + client *minio.Client + bucket string + tempPath string } func GetOrCreateS3Datastore(dsId string, conf config.DatastoreConfig) (*s3Datastore, error) { @@ -32,9 +34,13 @@ func GetOrCreateS3Datastore(dsId string, conf config.DatastoreConfig) (*s3Datast bucket, bucketFound := conf.Options["bucketName"] accessKeyId, keyFound := conf.Options["accessKeyId"] accessSecret, secretFound := conf.Options["accessSecret"] + tempPath, tempPathFound := conf.Options["tempPath"] if !epFound || !bucketFound || !keyFound || !secretFound { return nil, errors.New("invalid configuration: missing s3 options") } + if 
!tempPathFound { + logrus.Warn("Datastore ", dsId, " (s3) does not have a tempPath set - this could lead to excessive memory usage by the media repo") + } useSsl := true useSslStr, sslFound := conf.Options["ssl"] @@ -48,10 +54,11 @@ func GetOrCreateS3Datastore(dsId string, conf config.DatastoreConfig) (*s3Datast } s3ds := &s3Datastore{ - conf: conf, - dsId: dsId, - client: s3client, - bucket: bucket, + conf: conf, + dsId: dsId, + client: s3client, + bucket: bucket, + tempPath: tempPath, } stores[dsId] = s3ds return s3ds, nil @@ -68,7 +75,7 @@ func (s *s3Datastore) EnsureBucketExists() error { return nil } -func (s *s3Datastore) UploadFile(file io.ReadCloser, ctx context.Context, log *logrus.Entry) (*types.ObjectInfo, error) { +func (s *s3Datastore) UploadFile(file io.ReadCloser, expectedLength int64, ctx context.Context, log *logrus.Entry) (*types.ObjectInfo, error) { defer file.Close() objectName, err := util.GenerateRandomString(512) @@ -76,7 +83,9 @@ func (s *s3Datastore) UploadFile(file io.ReadCloser, ctx context.Context, log *l return nil, err } - rs3, ws3 := io.Pipe() + var rs3 io.ReadCloser + var ws3 io.WriteCloser + rs3, ws3 = io.Pipe() tr := io.TeeReader(file, ws3) done := make(chan bool) @@ -96,8 +105,37 @@ func (s *s3Datastore) UploadFile(file io.ReadCloser, ctx context.Context, log *l }() go func() { + if expectedLength <= 0 { + if s.tempPath != "" { + log.Info("Buffering file to temp path due to unknown file size") + var f *os.File + f, uploadErr = ioutil.TempFile(s.tempPath, "mr*") + if uploadErr != nil { + io.Copy(ioutil.Discard, rs3) + done <- true + return + } + // The temp file is only scratch space: close it, then remove it, once this goroutine finishes + defer os.Remove(f.Name()) + defer f.Close() + expectedLength, uploadErr = io.Copy(f, rs3) + if uploadErr == nil { + // Rewind so the upload reads the buffered bytes instead of starting at EOF + _, uploadErr = f.Seek(0, io.SeekStart) + } + if uploadErr != nil { + io.Copy(ioutil.Discard, rs3) + done <- true + return + } + rs3 = f + } else { + log.Warn("Uploading content of unknown length to s3 - this could result in high memory usage") + expectedLength = -1 + } + } log.Info("Uploading file...") - sizeBytes, uploadErr = 
s.client.PutObjectWithContext(ctx, s.bucket, objectName, rs3, -1, minio.PutObjectOptions{}) + sizeBytes, uploadErr = s.client.PutObjectWithContext(ctx, s.bucket, objectName, rs3, expectedLength, minio.PutObjectOptions{}) log.Info("Uploaded ", sizeBytes, " bytes to s3") done <- true }()