From 7d762f182f4010a1c1a5dfc8ba48c13e414f128c Mon Sep 17 00:00:00 2001 From: Travis Ralston <travpc@gmail.com> Date: Sat, 16 Jun 2018 17:21:22 -0600 Subject: [PATCH] Minor refactoring to bring the media downloader closer to the handler Alongside some other minor refactorings. Part of #58 --- .../download_resource_handler.go | 96 ++++++++++++++- .../remote_media_downloader.go | 110 ------------------ .../internal_cache/cache_types.go | 23 ---- .../internal_cache/media_cache.go | 15 +++ .../matrix-media-repo/matrix/federation.go | 2 +- 5 files changed, 108 insertions(+), 138 deletions(-) delete mode 100644 src/github.com/turt2live/matrix-media-repo/controllers/download_controller/remote_media_downloader.go delete mode 100644 src/github.com/turt2live/matrix-media-repo/internal_cache/cache_types.go diff --git a/src/github.com/turt2live/matrix-media-repo/controllers/download_controller/download_resource_handler.go b/src/github.com/turt2live/matrix-media-repo/controllers/download_controller/download_resource_handler.go index 17db3393..1d765431 100644 --- a/src/github.com/turt2live/matrix-media-repo/controllers/download_controller/download_resource_handler.go +++ b/src/github.com/turt2live/matrix-media-repo/controllers/download_controller/download_resource_handler.go @@ -2,14 +2,20 @@ package download_controller import ( "context" + "errors" "io" + "mime" "os" + "strconv" "sync" + "time" + "github.com/patrickmn/go-cache" "github.com/ryanuber/go-glob" "github.com/sirupsen/logrus" "github.com/turt2live/matrix-media-repo/common" "github.com/turt2live/matrix-media-repo/common/config" + "github.com/turt2live/matrix-media-repo/matrix" "github.com/turt2live/matrix-media-repo/storage" "github.com/turt2live/matrix-media-repo/types" "github.com/turt2live/matrix-media-repo/util" @@ -30,8 +36,16 @@ type downloadResponse struct { err error } +type downloadedMedia struct { + Contents io.ReadCloser + DesiredFilename string + ContentType string +} + var resHandler *mediaResourceHandler var resHandlerLock = &sync.Once{} +var downloadErrorsCache *cache.Cache +var downloadErrorCacheSingletonLock = &sync.Once{} func getResourceHandler() (*mediaResourceHandler) { if resHandler == nil { @@ -69,15 +83,14 @@ func downloadResourceWorkFn(request *resource_handler.WorkRequest) interface{} { ctx := context.TODO() // TODO: Should we use a real context? - downloader := newRemoteMediaDownloader(ctx, log) - downloaded, err := downloader.Download(info.origin, info.mediaId) + downloaded, err := DownloadRemoteMediaDirect(info.origin, info.mediaId, log) if err != nil { return &downloadResponse{err: err} } defer downloaded.Contents.Close() - media, err := storeMedia(downloaded.Contents, downloaded.ContentType, downloaded.DesiredFilename, info.origin, info.mediaId, ctx, log) + media, err := StoreDirect(downloaded.Contents, downloaded.ContentType, downloaded.DesiredFilename, info.origin, info.mediaId, ctx, log) if err != nil { return &downloadResponse{err: err} } @@ -85,7 +98,7 @@ func downloadResourceWorkFn(request *resource_handler.WorkRequest) interface{} { return &downloadResponse{media, err} } -func storeMedia(contents io.Reader, contentType string, filename string, origin string, mediaId string, ctx context.Context, log *logrus.Entry) (*types.Media, error) { +func StoreDirect(contents io.Reader, contentType string, filename string, origin string, mediaId string, ctx context.Context, log *logrus.Entry) (*types.Media, error) { fileLocation, err := storage.PersistFile(contents, ctx, log) if err != nil { return nil, err @@ -180,4 +193,79 @@ func storeMedia(contents io.Reader, contentType string, filename string, origin } return media, nil +} + +func DownloadRemoteMediaDirect(server string, mediaId string, log *logrus.Entry) (*downloadedMedia, error) { + if downloadErrorsCache == nil { + downloadErrorCacheSingletonLock.Do(func() { + cacheTime := time.Duration(config.Get().Downloads.FailureCacheMinutes) * time.Minute + downloadErrorsCache = cache.New(cacheTime, cacheTime*2) + }) + } + + cacheKey := server + "/" + mediaId + item, found := downloadErrorsCache.Get(cacheKey) + if found { + log.Warn("Returning cached error for remote media download failure") + return nil, item.(error) + } + + baseUrl, err := matrix.GetServerApiUrl(server) + if err != nil { + downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) + return nil, err + } + + downloadUrl := baseUrl + "/_matrix/media/v1/download/" + server + "/" + mediaId + "?allow_remote=false" + resp, err := matrix.FederatedGet(downloadUrl, server) + if err != nil { + downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) + return nil, err + } + + if resp.StatusCode == 404 { + log.Info("Remote media not found") + + err = common.ErrMediaNotFound + downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) + return nil, err + } else if resp.StatusCode != 200 { + log.Info("Unknown error fetching remote media; received status code " + strconv.Itoa(resp.StatusCode)) + + err = errors.New("could not fetch remote media") + downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) + return nil, err + } + + var contentLength int64 = 0 + if resp.Header.Get("Content-Length") != "" { + contentLength, err = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) + if err != nil { + return nil, err + } + } else { + log.Warn("Missing Content-Length header on response - continuing anyway") + } + + if contentLength > 0 && config.Get().Downloads.MaxSizeBytes > 0 && contentLength > config.Get().Downloads.MaxSizeBytes { + log.Warn("Attempted to download media that was too large") + + err = common.ErrMediaTooLarge + downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) + return nil, err + } + + request := &downloadedMedia{ + ContentType: resp.Header.Get("Content-Type"), + Contents: resp.Body, + // DesiredFilename (calculated below) + } + + _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition")) + if err == nil && params["filename"] != "" { + request.DesiredFilename = params["filename"] + } + + log.Info("Persisting downloaded media") + return request, nil } \ No newline at end of file diff --git a/src/github.com/turt2live/matrix-media-repo/controllers/download_controller/remote_media_downloader.go b/src/github.com/turt2live/matrix-media-repo/controllers/download_controller/remote_media_downloader.go deleted file mode 100644 index 15c00c82..00000000 --- a/src/github.com/turt2live/matrix-media-repo/controllers/download_controller/remote_media_downloader.go +++ /dev/null @@ -1,110 +0,0 @@ -package download_controller - -import ( - "context" - "errors" - "io" - "mime" - "strconv" - "sync" - "time" - - "github.com/patrickmn/go-cache" - "github.com/sirupsen/logrus" - "github.com/turt2live/matrix-media-repo/common" - "github.com/turt2live/matrix-media-repo/common/config" - "github.com/turt2live/matrix-media-repo/matrix" -) - -type downloadedMedia struct { - Contents io.ReadCloser - DesiredFilename string - ContentType string -} - -type remoteMediaDownloader struct { - ctx context.Context - log *logrus.Entry -} - -var downloadErrorsCache *cache.Cache -var downloadErrorCacheSingletonLock = &sync.Once{} - -func newRemoteMediaDownloader(ctx context.Context, log *logrus.Entry) *remoteMediaDownloader { - return &remoteMediaDownloader{ctx, log} -} - -func (r *remoteMediaDownloader) Download(server string, mediaId string) (*downloadedMedia, error) { - if downloadErrorsCache == nil { - downloadErrorCacheSingletonLock.Do(func() { - cacheTime := time.Duration(config.Get().Downloads.FailureCacheMinutes) * time.Minute - downloadErrorsCache = cache.New(cacheTime, cacheTime*2) - }) - } - - cacheKey := server + "/" + mediaId - item, found := downloadErrorsCache.Get(cacheKey) - if found { - r.log.Warn("Returning cached error for remote media download failure") - return nil, item.(error) - } - - baseUrl, err := matrix.GetServerApiUrl(server) - if err != nil { - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - downloadUrl := baseUrl + "/_matrix/media/v1/download/" + server + "/" + mediaId + "?allow_remote=false" - resp, err := matrix.FederatedGet(downloadUrl, server) - if err != nil { - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - if resp.StatusCode == 404 { - r.log.Info("Remote media not found") - - err = common.ErrMediaNotFound - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } else if resp.StatusCode != 200 { - r.log.Info("Unknown error fetching remote media; received status code " + strconv.Itoa(resp.StatusCode)) - - err = errors.New("could not fetch remote media") - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - var contentLength int64 = 0 - if resp.Header.Get("Content-Length") != "" { - contentLength, err = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) - if err != nil { - return nil, err - } - } else { - r.log.Warn("Missing Content-Length header on response - continuing anyway") - } - - if contentLength > 0 && config.Get().Downloads.MaxSizeBytes > 0 && contentLength > config.Get().Downloads.MaxSizeBytes { - r.log.Warn("Attempted to download media that was too large") - - err = common.ErrMediaTooLarge - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - request := &downloadedMedia{ - ContentType: resp.Header.Get("Content-Type"), - Contents: resp.Body, - // DesiredFilename (calculated below) - } - - _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition")) - if err == nil && params["filename"] != "" { - request.DesiredFilename = params["filename"] - } - - r.log.Info("Persisting downloaded media") - return request, nil -} diff --git a/src/github.com/turt2live/matrix-media-repo/internal_cache/cache_types.go b/src/github.com/turt2live/matrix-media-repo/internal_cache/cache_types.go deleted file mode 100644 index efbd7b14..00000000 --- a/src/github.com/turt2live/matrix-media-repo/internal_cache/cache_types.go +++ /dev/null @@ -1,23 +0,0 @@ -package internal_cache - -import ( - "bytes" - - "github.com/turt2live/matrix-media-repo/types" - "github.com/turt2live/matrix-media-repo/util" -) - -type cachedFile struct { - media *types.Media - thumbnail *types.Thumbnail - Contents *bytes.Buffer -} - -type cooldown struct { - isEviction bool - expiresTs int64 -} - -func (c *cooldown) IsExpired() bool { - return util.NowMillis() >= c.expiresTs -} diff --git a/src/github.com/turt2live/matrix-media-repo/internal_cache/media_cache.go b/src/github.com/turt2live/matrix-media-repo/internal_cache/media_cache.go index 6b80667e..d025a83d 100644 --- a/src/github.com/turt2live/matrix-media-repo/internal_cache/media_cache.go +++ b/src/github.com/turt2live/matrix-media-repo/internal_cache/media_cache.go @@ -24,6 +24,17 @@ type MediaCache struct { enabled bool } +type cachedFile struct { + media *types.Media + thumbnail *types.Thumbnail + Contents *bytes.Buffer +} + +type cooldown struct { + isEviction bool + expiresTs int64 +} + var instance *MediaCache var lock = &sync.Once{} @@ -271,3 +282,7 @@ func (c *MediaCache) flagCached(recordId string) { duration := int64(config.Get().Downloads.Cache.MinCacheTimeSeconds) * 1000 c.cooldownCache.Set(recordId, &cooldown{isEviction: false, expiresTs: duration}, cache.DefaultExpiration) } + +func (c *cooldown) IsExpired() bool { + return util.NowMillis() >= c.expiresTs +} diff --git a/src/github.com/turt2live/matrix-media-repo/matrix/federation.go b/src/github.com/turt2live/matrix-media-repo/matrix/federation.go index ef03ff23..b3f67918 100644 --- a/src/github.com/turt2live/matrix-media-repo/matrix/federation.go +++ b/src/github.com/turt2live/matrix-media-repo/matrix/federation.go @@ -39,7 +39,7 @@ func GetServerApiUrl(hostname string) (string, error) { // Note: we ignore errors because they are parsing errors. Invalid hostnames will fail through elsewhere. h, p, _ := net.SplitHostPort(hostname) if p != "" { - url := fmt.Sprintf("https://%s:%d", h, p) + url := fmt.Sprintf("https://%s:%s", h, p) apiUrlCacheInstance.Set(hostname, url, cache.DefaultExpiration) logrus.Info("Server API URL for " + hostname + " is " + url + " (explicit port)") return url, nil -- GitLab