diff --git a/controllers/data_controller/import_controller.go b/controllers/data_controller/import_controller.go index 57587dc207398e5a6a76a216828219dd192c125e..a8bd419a511ebe6f2af8330b243fece81a09a6bc 100644 --- a/controllers/data_controller/import_controller.go +++ b/controllers/data_controller/import_controller.go @@ -34,44 +34,6 @@ type importUpdate struct { var openImports = &sync.Map{} // importId => updateChan -func VerifyImport(data io.Reader, ctx rcontext.RequestContext) (int, int, []string, error) { - // Prepare the first update for the import (sync, so we can error) - // We do this before anything else because if the archive is invalid then we shouldn't - // even bother with an import. - results, err := processArchive(data) - if err != nil { - return 0, 0, nil, err - } - - manifestFile, ok := results["manifest.json"] - if !ok { - return 0, 0, nil, errors.New("no manifest provided in data package") - } - - archiveManifest := &Manifest{} - err = json.Unmarshal(manifestFile.Bytes(), archiveManifest) - if err != nil { - return 0, 0, nil, err - } - - expected := 0 - found := 0 - missing := make([]string, 0) - db := storage.GetDatabase().GetMediaStore(ctx) - for mxc, r := range archiveManifest.Media { - ctx.Log.Info("Checking file: ", mxc) - expected++ - _, err = db.Get(r.Origin, r.MediaId) - if err == nil { - found++ - } else { - missing = append(missing, mxc) - } - } - - return found, expected, missing, nil -} - func StartImport(data io.Reader, ctx rcontext.RequestContext) (*types.BackgroundTask, string, error) { // Prepare the first update for the import (sync, so we can error) // We do this before anything else because if the archive is invalid then we shouldn't @@ -104,14 +66,6 @@ func StartImport(data io.Reader, ctx rcontext.RequestContext) (*types.Background return task, importId, nil } -func IsImportWaiting(importId string) bool { - runningImport, ok := openImports.Load(importId) - if !ok || runningImport == nil { - return false - } - return true -} - func AppendToImport(importId string, data io.Reader, withReturnChan bool) (chan bool, error) { runningImport, ok := openImports.Load(importId) if !ok || runningImport == nil { @@ -151,38 +105,6 @@ func StopImport(importId string) error { return nil } -func GetFileNames(data io.Reader) ([]string, error) { - archiver, err := gzip.NewReader(data) - if err != nil { - return nil, err - } - - defer archiver.Close() - - tarFile := tar.NewReader(archiver) - names := make([]string, 0) - for { - header, err := tarFile.Next() - if err == io.EOF { - break // we're done - } - if err != nil { - return nil, err - } - - if header == nil { - continue // skip this weird file - } - if header.Typeflag != tar.TypeReg { - continue // skip directories and other stuff - } - - names = append(names, header.Name) - } - - return names, nil -} - func processArchive(data io.Reader) (map[string]*bytes.Buffer, error) { archiver, err := gzip.NewReader(data) if err != nil { diff --git a/controllers/download_controller/download_controller.go b/controllers/download_controller/download_controller.go deleted file mode 100644 index 5519d36d3824dd8a3bb14d7c23c5da323989cdfb..0000000000000000000000000000000000000000 --- a/controllers/download_controller/download_controller.go +++ /dev/null @@ -1,311 +0,0 @@ -package download_controller - -import ( - "bytes" - "database/sql" - "errors" - "fmt" - "io" - "time" - - "github.com/getsentry/sentry-go" - "github.com/turt2live/matrix-media-repo/util/stream_util" - - "github.com/disintegration/imaging" - "github.com/patrickmn/go-cache" - "github.com/turt2live/matrix-media-repo/common" - "github.com/turt2live/matrix-media-repo/common/globals" - "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/controllers/quarantine_controller" - "github.com/turt2live/matrix-media-repo/internal_cache" - "github.com/turt2live/matrix-media-repo/storage" - "github.com/turt2live/matrix-media-repo/storage/datastore" - "github.com/turt2live/matrix-media-repo/types" - "github.com/turt2live/matrix-media-repo/util" -) - -var localCache = cache.New(30*time.Second, 60*time.Second) - -func GetMedia(origin string, mediaId string, downloadRemote bool, blockForMedia bool, ctx rcontext.RequestContext) (*types.MinimalMedia, error) { - cacheKey := fmt.Sprintf("%s/%s?r=%t&b=%t", origin, mediaId, downloadRemote, blockForMedia) - v, _, err := globals.DefaultRequestGroup.Do(cacheKey, func() (interface{}, error) { - var media *types.Media - var minMedia *types.MinimalMedia - var err error - if blockForMedia { - media, err = FindMediaRecord(origin, mediaId, downloadRemote, ctx) - if media != nil { - minMedia = &types.MinimalMedia{ - Origin: media.Origin, - MediaId: media.MediaId, - ContentType: media.ContentType, - UploadName: media.UploadName, - SizeBytes: media.SizeBytes, - Stream: nil, // we'll populate this later if we need to - KnownMedia: media, - } - } - } else { - minMedia, err = FindMinimalMediaRecord(origin, mediaId, downloadRemote, ctx) - if minMedia != nil { - media = minMedia.KnownMedia - } - } - if err != nil { - return nil, err - } - if minMedia == nil { - ctx.Log.Warn("Unexpected error while fetching media: no minimal media record") - return nil, common.ErrMediaNotFound - } - if media == nil && blockForMedia { - ctx.Log.Warn("Unexpected error while fetching media: no regular media record (block for media in place)") - return nil, common.ErrMediaNotFound - } - - // if we have a known media record, we might as well set it - // if we don't, this won't do anything different - minMedia.KnownMedia = media - - if media != nil { - if media.Quarantined { - ctx.Log.Warn("Quarantined media accessed") - defer stream_util.DumpAndCloseStream(minMedia.Stream) - - if ctx.Config.Quarantine.ReplaceDownloads { - ctx.Log.Info("Replacing thumbnail with a quarantined one") - - img, err := quarantine_controller.GenerateQuarantineThumbnail(512, 512, ctx) - if err != nil { - return nil, err - } - - data := &bytes.Buffer{} - imaging.Encode(data, img, imaging.PNG) - return &types.MinimalMedia{ - // Lie about all the details - Stream: stream_util.BufferToStream(data), - ContentType: "image/png", - UploadName: "quarantine.png", - SizeBytes: int64(data.Len()), - MediaId: mediaId, - Origin: origin, - KnownMedia: media, - }, nil - } - - return nil, common.ErrMediaQuarantined - } - - err = storage.GetDatabase().GetMetadataStore(ctx).UpsertLastAccess(media.Sha256Hash, util.NowMillis()) - if err != nil { - sentry.CaptureException(err) - ctx.Log.Warn("Failed to upsert the last access time: ", err) - } - - localCache.Set(origin+"/"+mediaId, media, cache.DefaultExpiration) - } - - if minMedia.Stream != nil { - ctx.Log.Info("Returning minimal media record with a viable stream") - return minMedia, nil - } - - if media == nil { - ctx.Log.Error("Failed to locate media") - return nil, errors.New("failed to locate media") - } - - ctx.Log.Info("Reading media from disk") - mediaStream, err := datastore.DownloadStream(ctx, media.DatastoreId, media.Location) - if err != nil { - return nil, err - } - - minMedia.Stream = mediaStream - return minMedia, nil - }, func(v interface{}, count int, err error) []interface{} { - if err != nil { - return nil - } - - rv := v.(*types.MinimalMedia) - vals := make([]interface{}, 0) - streams := stream_util.CloneReader(rv.Stream, count) - - for i := 0; i < count; i++ { - if rv.KnownMedia != nil { - internal_cache.Get().MarkDownload(rv.KnownMedia.Sha256Hash) - } - vals = append(vals, &types.MinimalMedia{ - Origin: rv.Origin, - MediaId: rv.MediaId, - UploadName: rv.UploadName, - ContentType: rv.ContentType, - SizeBytes: rv.SizeBytes, - KnownMedia: rv.KnownMedia, - Stream: streams[i], - }) - } - - return vals - }) - - var value *types.MinimalMedia - if v != nil { - value = v.(*types.MinimalMedia) - } - - return value, err -} - -func FindMinimalMediaRecord(origin string, mediaId string, downloadRemote bool, ctx rcontext.RequestContext) (*types.MinimalMedia, error) { - db := storage.GetDatabase().GetMediaStore(ctx) - - var media *types.Media - item, found := localCache.Get(origin + "/" + mediaId) - if found { - media = item.(*types.Media) - } else { - ctx.Log.Info("Getting media record from database") - dbMedia, err := db.Get(origin, mediaId) - if err != nil { - if err == sql.ErrNoRows { - if util.IsServerOurs(origin) { - ctx.Log.Warn("Media not found") - return nil, common.ErrMediaNotFound - } - } else { - // We don't even want to attempt a download - something very wrong happened - ctx.Log.Error("Database error trying to get media:", err.Error()) - return nil, err - } - - if !downloadRemote { - ctx.Log.Warn("Remote media not being downloaded") - return nil, common.ErrMediaNotFound - } - - mediaChan := getResourceHandler().DownloadRemoteMedia(origin, mediaId, true) - defer close(mediaChan) - - result := <-mediaChan - if result.err != nil { - return nil, result.err - } - if result.stream == nil { - ctx.Log.Info("No stream returned from remote download - attempting to create one") - if result.media == nil { - ctx.Log.Error("Fatal error: No stream and no media. Cannot acquire a stream for media") - return nil, errors.New("no stream available") - } - - stream, err := datastore.DownloadStream(ctx, result.media.DatastoreId, result.media.Location) - if err != nil { - return nil, err - } - - result.stream = stream - } - return &types.MinimalMedia{ - Origin: origin, - MediaId: mediaId, - ContentType: result.contentType, - UploadName: result.filename, - SizeBytes: -1, // unknown - Stream: result.stream, - KnownMedia: nil, // unknown - }, nil - } else { - media = dbMedia - } - } - - if media == nil { - ctx.Log.Warn("Despite all efforts, a media record could not be found") - return nil, common.ErrMediaNotFound - } - - var mediaStream io.ReadCloser - - cached, err := internal_cache.Get().GetMedia(media.Sha256Hash, internal_cache.StreamerForMedia(media), ctx) - if err != nil { - return nil, err - } - if cached != nil && cached.Contents != nil { - mediaStream = io.NopCloser(cached.Contents) - } else { - mediaStream, err = datastore.DownloadStream(ctx, media.DatastoreId, media.Location) - if err != nil { - return nil, err - } - } - - return &types.MinimalMedia{ - Origin: media.Origin, - MediaId: media.MediaId, - ContentType: media.ContentType, - UploadName: media.UploadName, - SizeBytes: media.SizeBytes, - Stream: mediaStream, - KnownMedia: media, - }, nil -} - -func FindMediaRecord(origin string, mediaId string, downloadRemote bool, ctx rcontext.RequestContext) (*types.Media, error) { - cacheKey := origin + "/" + mediaId - v, _, err := globals.DefaultRequestGroup.DoWithoutPost(cacheKey, func() (interface{}, error) { - db := storage.GetDatabase().GetMediaStore(ctx) - - var media *types.Media - item, found := localCache.Get(cacheKey) - if found { - media = item.(*types.Media) - } else { - ctx.Log.Info("Getting media record from database") - dbMedia, err := db.Get(origin, mediaId) - if err != nil { - if err == sql.ErrNoRows { - if util.IsServerOurs(origin) { - ctx.Log.Warn("Media not found") - return nil, common.ErrMediaNotFound - } - } else { - // We don't even want to attempt a download - something very wrong happened - ctx.Log.Error("Database error trying to get media:", err.Error()) - return nil, err - } - - if !downloadRemote { - ctx.Log.Warn("Remote media not being downloaded") - return nil, common.ErrMediaNotFound - } - - mediaChan := getResourceHandler().DownloadRemoteMedia(origin, mediaId, true) - defer close(mediaChan) - - result := <-mediaChan - if result.err != nil { - return nil, result.err - } - media = result.media - } else { - media = dbMedia - } - } - - if media == nil { - ctx.Log.Warn("Despite all efforts, a media record could not be found") - return nil, common.ErrMediaNotFound - } - - return media, nil - }) - - var value *types.Media - if v != nil { - value = v.(*types.Media) - } - - return value, err -} diff --git a/controllers/download_controller/download_resource_handler.go b/controllers/download_controller/download_resource_handler.go deleted file mode 100644 index fb15be23c66711af8095f3a4ff91d228a45de878..0000000000000000000000000000000000000000 --- a/controllers/download_controller/download_resource_handler.go +++ /dev/null @@ -1,289 +0,0 @@ -package download_controller - -import ( - "errors" - "io" - "mime" - "strconv" - "sync" - "time" - - "github.com/getsentry/sentry-go" - "github.com/turt2live/matrix-media-repo/util/stream_util" - - "github.com/djherbis/stream" - "github.com/patrickmn/go-cache" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" - "github.com/turt2live/matrix-media-repo/common" - "github.com/turt2live/matrix-media-repo/common/config" - "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/controllers/upload_controller" - "github.com/turt2live/matrix-media-repo/matrix" - "github.com/turt2live/matrix-media-repo/metrics" - "github.com/turt2live/matrix-media-repo/types" - "github.com/turt2live/matrix-media-repo/util/resource_handler" -) - -type mediaResourceHandler struct { - resourceHandler *resource_handler.ResourceHandler -} - -type downloadRequest struct { - origin string - mediaId string - blockForMedia bool -} - -type downloadResponse struct { - err error - - // This is only populated if the request was blocked pending this object - media *types.Media - - // These properties are populated if `media` is nil - filename string - contentType string - stream io.ReadCloser -} - -type workerDownloadResponse struct { - err error - - // This is only populated if the request was blocked pending this object - media *types.Media - - // These properties are populated if `media` is nil - filename string - contentType string - stream *stream.Stream -} - -type downloadedMedia struct { - Contents io.ReadCloser - DesiredFilename string - ContentType string - ContentLength int64 -} - -var resHandler *mediaResourceHandler -var resHandlerLock = &sync.Once{} -var downloadErrorsCache *cache.Cache -var downloadErrorCacheSingletonLock = &sync.Once{} - -func getResourceHandler() *mediaResourceHandler { - if resHandler == nil { - resHandlerLock.Do(func() { - handler, err := resource_handler.New(config.Get().Downloads.NumWorkers, func(r *resource_handler.WorkRequest) interface{} { - return downloadResourceWorkFn(r) - }) - if err != nil { - sentry.CaptureException(err) - panic(err) - } - - resHandler = &mediaResourceHandler{handler} - }) - } - - return resHandler -} - -func (h *mediaResourceHandler) DownloadRemoteMedia(origin string, mediaId string, blockForMedia bool) chan *downloadResponse { - resultChan := make(chan *downloadResponse) - go func() { - reqId := "remote_download:" + origin + "_" + mediaId - c := h.resourceHandler.GetResource(reqId, &downloadRequest{origin, mediaId, blockForMedia}) - defer close(c) - result := <-c - - // Translate the response stream into something that is safe to support multiple readers - resp := result.(*workerDownloadResponse) - respValue := &downloadResponse{ - err: resp.err, - media: resp.media, - contentType: resp.contentType, - filename: resp.filename, - } - if resp.stream != nil { - s, err := resp.stream.NextReader() - if err != nil { - logrus.Error("Unexpected error in processing response for remote media download: ", err) - respValue = &downloadResponse{err: err} - sentry.CaptureException(err) - } else { - respValue.stream = s - } - } - - resultChan <- respValue - }() - return resultChan -} - -func downloadResourceWorkFn(request *resource_handler.WorkRequest) (resp *workerDownloadResponse) { - info := request.Metadata.(*downloadRequest) - ctx := rcontext.Initial().LogWithFields(logrus.Fields{ - "worker_requestId": request.Id, - "worker_requestOrigin": info.origin, - "worker_requestMediaId": info.mediaId, - "worker_blockForMedia": info.blockForMedia, - }) - - resp = &workerDownloadResponse{} - defer func() { - if err := recover(); err != nil { - ctx.Log.Error("Caught panic: ", err) - sentry.CurrentHub().Recover(err) - resp.stream = nil - resp.filename = "" - resp.contentType = "" - resp.media = nil - resp.err = nil - } - }() - - ctx.Log.Info("Downloading remote media") - - downloaded, err := DownloadRemoteMediaDirect(info.origin, info.mediaId, ctx) - if err != nil { - resp.err = err - return resp - } - - persistFile := func(fileStream io.ReadCloser, r *workerDownloadResponse) *workerDownloadResponse { - defer stream_util.DumpAndCloseStream(fileStream) - userId := upload_controller.NoApplicableUploadUser - - ms := stream.NewMemStream() - io.Copy(ms, fileStream) - ms.Close() - - st, err := ms.NextReader() - if err != nil { - ctx.Log.Error("Unexpected error persisting file: ", err) - r.err = err - return r - } - - media, err := upload_controller.StoreDirect(nil, st, downloaded.ContentLength, downloaded.ContentType, downloaded.DesiredFilename, userId, info.origin, info.mediaId, common.KindRemoteMedia, ctx, true) - if err != nil { - ctx.Log.Error("Error persisting file: ", err) - r.err = err - return r - } - - ctx.Log.Info("Remote media persisted under datastore ", media.DatastoreId, " at ", media.Location) - r.media = media - r.contentType = media.ContentType - r.filename = media.UploadName - r.stream = ms - return r - } - - if info.blockForMedia { - ctx.Log.Warn("Not streaming remote media download request due to request for a block") - persistFile(downloaded.Contents, resp) - return resp - } - - ctx.Log.Info("Streaming remote media to filesystem and requesting party at the same time") - - reader, writer := io.Pipe() - tr := io.TeeReader(downloaded.Contents, writer) - - go persistFile(io.NopCloser(tr), &workerDownloadResponse{}) - - ms := stream.NewMemStream() - defer ms.Close() - io.Copy(ms, reader) - - resp.err = nil - resp.contentType = downloaded.ContentType - resp.filename = downloaded.DesiredFilename - resp.stream = ms - return resp -} - -func DownloadRemoteMediaDirect(server string, mediaId string, ctx rcontext.RequestContext) (*downloadedMedia, error) { - if downloadErrorsCache == nil { - downloadErrorCacheSingletonLock.Do(func() { - cacheTime := time.Duration(ctx.Config.Downloads.FailureCacheMinutes) * time.Minute - downloadErrorsCache = cache.New(cacheTime, cacheTime*2) - }) - } - - cacheKey := server + "/" + mediaId - item, found := downloadErrorsCache.Get(cacheKey) - if found { - ctx.Log.Warn("Returning cached error for remote media download failure") - return nil, item.(error) - } - - baseUrl, realHost, err := matrix.GetServerApiUrl(server) - if err != nil { - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - downloadUrl := baseUrl + "/_matrix/media/r0/download/" + server + "/" + mediaId + "?allow_remote=false" - resp, err := matrix.FederatedGet(downloadUrl, realHost, ctx) - if err != nil { - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - if resp.StatusCode == 404 { - ctx.Log.Info("Remote media not found") - - err = common.ErrMediaNotFound - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } else if resp.StatusCode != 200 { - ctx.Log.Info("Unknown error fetching remote media; received status code " + strconv.Itoa(resp.StatusCode)) - - err = errors.New("could not fetch remote media") - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - var contentLength int64 = 0 - if resp.Header.Get("Content-Length") != "" { - contentLength, err = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) - if err != nil { - return nil, err - } - } else { - ctx.Log.Warn("Missing Content-Length header on response - continuing anyway") - } - - if contentLength > 0 && ctx.Config.Downloads.MaxSizeBytes > 0 && contentLength > ctx.Config.Downloads.MaxSizeBytes { - ctx.Log.Warn("Attempted to download media that was too large") - - err = common.ErrMediaTooLarge - downloadErrorsCache.Set(cacheKey, err, cache.DefaultExpiration) - return nil, err - } - - contentType := resp.Header.Get("Content-Type") - if contentType == "" { - ctx.Log.Warn("Remote media has no content type; Assuming application/octet-stream") - contentType = "application/octet-stream" // binary - } - - request := &downloadedMedia{ - ContentType: contentType, - Contents: resp.Body, - ContentLength: contentLength, - // DesiredFilename (calculated below) - } - - _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition")) - if err == nil && params["filename"] != "" { - request.DesiredFilename = params["filename"] - } - - ctx.Log.Info("Persisting downloaded media") - metrics.MediaDownloaded.With(prometheus.Labels{"origin": server}).Inc() - return request, nil -} diff --git a/controllers/maintenance_controller/maintainance_controller.go b/controllers/maintenance_controller/maintainance_controller.go deleted file mode 100644 index ebf35bfc47623640a140350faff46ed6f906ec3c..0000000000000000000000000000000000000000 --- a/controllers/maintenance_controller/maintainance_controller.go +++ /dev/null @@ -1,238 +0,0 @@ -package maintenance_controller - -import ( - "database/sql" - "os" - - "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/controllers/download_controller" - "github.com/turt2live/matrix-media-repo/storage" - "github.com/turt2live/matrix-media-repo/storage/datastore" - "github.com/turt2live/matrix-media-repo/types" - "github.com/turt2live/matrix-media-repo/util" -) - -func PurgeQuarantined(ctx rcontext.RequestContext) ([]*types.Media, error) { - mediaDb := storage.GetDatabase().GetMediaStore(ctx) - records, err := mediaDb.GetAllQuarantinedMedia() - if err != nil { - return nil, err - } - - for _, r := range records { - err = doPurge(r, ctx) - if err != nil { - return nil, err - } - } - - return records, nil -} - -func PurgeQuarantinedFor(serverName string, ctx rcontext.RequestContext) ([]*types.Media, error) { - mediaDb := storage.GetDatabase().GetMediaStore(ctx) - records, err := mediaDb.GetQuarantinedMediaFor(serverName) - if err != nil { - return nil, err - } - - for _, r := range records { - err = doPurge(r, ctx) - if err != nil { - return nil, err - } - } - - return records, nil -} - -func PurgeUserMedia(userId string, beforeTs int64, ctx rcontext.RequestContext) ([]*types.Media, error) { - mediaDb := storage.GetDatabase().GetMediaStore(ctx) - records, err := mediaDb.GetMediaByUserBefore(userId, beforeTs) - if err != nil { - return nil, err - } - - for _, r := range records { - err = doPurge(r, ctx) - if err != nil { - return nil, err - } - } - - return records, nil -} - -func PurgeOldMedia(beforeTs int64, includeLocal bool, ctx rcontext.RequestContext) ([]*types.Media, error) { - metadataDb := storage.GetDatabase().GetMetadataStore(ctx) - mediaDb := storage.GetDatabase().GetMediaStore(ctx) - - oldHashes, err := metadataDb.GetOldMedia(beforeTs) - if err != nil { - return nil, err - } - - purged := make([]*types.Media, 0) - - for _, r := range oldHashes { - media, err := mediaDb.GetByHash(r.Sha256Hash) - if err != nil { - return nil, err - } - - for _, m := range media { - if !includeLocal && util.IsServerOurs(m.Origin) { - continue - } - - err = doPurge(m, ctx) - if err != nil { - return nil, err - } - - purged = append(purged, m) - } - } - - return purged, nil -} - -func PurgeRoomMedia(mxcs []string, beforeTs int64, ctx rcontext.RequestContext) ([]*types.Media, error) { - mediaDb := storage.GetDatabase().GetMediaStore(ctx) - - purged := make([]*types.Media, 0) - - // we have to manually find each record because the SQL query is too complex - for _, mxc := range mxcs { - domain, mediaId, err := util.SplitMxc(mxc) - if err != nil { - return nil, err - } - - record, err := mediaDb.Get(domain, mediaId) - if err == sql.ErrNoRows { - continue - } - if err != nil { - return nil, err - } - - if record.CreationTs > beforeTs { - continue - } - - err = doPurge(record, ctx) - if err != nil { - return nil, err - } - - purged = append(purged, record) - } - - return purged, nil -} - -func PurgeDomainMedia(serverName string, beforeTs int64, ctx rcontext.RequestContext) ([]*types.Media, error) { - mediaDb := storage.GetDatabase().GetMediaStore(ctx) - records, err := mediaDb.GetMediaByDomainBefore(serverName, beforeTs) - if err != nil { - return nil, err - } - - for _, r := range records { - err = doPurge(r, ctx) - if err != nil { - return nil, err - } - } - - return records, nil -} - -func PurgeMedia(origin string, mediaId string, ctx rcontext.RequestContext) error { - media, err := download_controller.FindMediaRecord(origin, mediaId, false, ctx) - if err != nil { - return err - } - - return doPurge(media, ctx) -} - -func doPurge(media *types.Media, ctx rcontext.RequestContext) error { - // Delete all the thumbnails first - thumbsDb := storage.GetDatabase().GetThumbnailStore(ctx) - thumbs, err := thumbsDb.GetAllForMedia(media.Origin, media.MediaId) - if err != nil { - return err - } - for _, thumb := range thumbs { - ctx.Log.Info("Deleting thumbnail with hash: ", thumb.Sha256Hash) - ds, err := datastore.LocateDatastore(ctx, thumb.DatastoreId) - if err != nil { - return err - } - - err = ds.DeleteObject(thumb.Location) - if err != nil { - return err - } - } - err = thumbsDb.DeleteAllForMedia(media.Origin, media.MediaId) - if err != nil { - return err - } - - ds, err := datastore.LocateDatastore(ctx, media.DatastoreId) - if err != nil { - return err - } - - mediaDb := storage.GetDatabase().GetMediaStore(ctx) - similarMedia, err := mediaDb.GetByHash(media.Sha256Hash) - if err != nil { - return err - } - hasSimilar := false - for _, m := range similarMedia { - if m.Origin != media.Origin && m.MediaId != media.MediaId { - hasSimilar = true - break - } - } - - if !hasSimilar || media.Quarantined { - err = ds.DeleteObject(media.Location) - if err != nil && !os.IsNotExist(err) { - return err - } - } else { - ctx.Log.Warnf("Not deleting media from datastore: media is shared over %d objects", len(similarMedia)) - } - - metadataDb := storage.GetDatabase().GetMetadataStore(ctx) - - reserved, err := metadataDb.IsReserved(media.Origin, media.MediaId) - if err != nil { - return err - } - - if !reserved { - err = metadataDb.ReserveMediaId(media.Origin, media.MediaId, "purged / deleted") - if err != nil { - return err - } - } - - // Don't delete the media record itself if it is quarantined. If we delete it, the media - // becomes not-quarantined so we'll leave it and let it 404 in the datastores. - if media.Quarantined { - return nil - } - - err = mediaDb.Delete(media.Origin, media.MediaId) - if err != nil { - return err - } - - return nil -} diff --git a/controllers/quarantine_controller/quarantine_controller.go b/controllers/quarantine_controller/quarantine_controller.go deleted file mode 100644 index 9521e8f0002bf046264f03e490fad130c8fc6438..0000000000000000000000000000000000000000 --- a/controllers/quarantine_controller/quarantine_controller.go +++ /dev/null @@ -1,91 +0,0 @@ -package quarantine_controller - -import ( - "bytes" - "github.com/getsentry/sentry-go" - "image" - "image/color" - "math" - - "github.com/disintegration/imaging" - "github.com/fogleman/gg" - "github.com/golang/freetype/truetype" - "github.com/turt2live/matrix-media-repo/common/rcontext" - "golang.org/x/image/font/gofont/gosmallcaps" -) - -func GenerateQuarantineThumbnail(width int, height int, ctx rcontext.RequestContext) (image.Image, error) { - var centerImage image.Image - var err error - if ctx.Config.Quarantine.ThumbnailPath != "" { - centerImage, err = imaging.Open(ctx.Config.Quarantine.ThumbnailPath) - } else { - centerImage, err = generateDefaultQuarantineThumbnail() - } - if err != nil { - return nil, err - } - - c := gg.NewContext(width, height) - - centerImage = imaging.Fit(centerImage, width, height, imaging.Lanczos) - - c.DrawImageAnchored(centerImage, width/2, height/2, 0.5, 0.5) - - buf := &bytes.Buffer{} - c.EncodePNG(buf) - - return imaging.Decode(buf) -} - -func generateDefaultQuarantineThumbnail() (image.Image, error) { - c := gg.NewContext(700, 700) - c.Clear() - - red := color.RGBA{R: 190, G: 26, B: 25, A: 255} - orange := color.RGBA{R: 255, G: 186, B: 73, A: 255} - x := 350.0 - y := 300.0 - r := 256.0 - w := 55.0 - p := 64.0 - m := "media not allowed" - - c.SetColor(orange) - c.DrawRectangle(0, 0, 700, 700) - c.Fill() - - c.SetColor(red) - c.DrawCircle(x, y, r) - c.Fill() - - c.SetColor(color.White) - c.DrawCircle(x, y, r-w) - c.Fill() - - lr := r - (w / 2) - sx := x + (lr * math.Cos(gg.Radians(225.0))) - sy := y + (lr * math.Sin(gg.Radians(225.0))) - ex := x + (lr * math.Cos(gg.Radians(45.0))) - ey := y + (lr * math.Sin(gg.Radians(45.0))) - c.SetLineCap(gg.LineCapButt) - c.SetLineWidth(w) - c.SetColor(red) - c.DrawLine(sx, sy, ex, ey) - c.Stroke() - - f, err := truetype.Parse(gosmallcaps.TTF) - if err != nil { - sentry.CaptureException(err) - panic(err) - } - - c.SetColor(color.Black) - c.SetFontFace(truetype.NewFace(f, &truetype.Options{Size: 64})) - c.DrawStringAnchored(m, x, y+r+p, 0.5, 0.5) - - buf := &bytes.Buffer{} - c.EncodePNG(buf) - - return imaging.Decode(buf) -} diff --git a/controllers/upload_controller/upload_controller.go b/controllers/upload_controller/upload_controller.go index 2cf96b2c4eade45fd2f2c54f14f2b3862eef3583..2f684c92d7ddd21ecf1d629c72a665169b4d6b20 100644 --- a/controllers/upload_controller/upload_controller.go +++ b/controllers/upload_controller/upload_controller.go @@ -4,154 +4,27 @@ import ( "bytes" "errors" "io" - "strconv" - "time" "github.com/getsentry/sentry-go" - "github.com/turt2live/matrix-media-repo/util/ids" "github.com/turt2live/matrix-media-repo/util/stream_util" - "github.com/patrickmn/go-cache" "github.com/sirupsen/logrus" "github.com/turt2live/matrix-media-repo/common" "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/internal_cache" "github.com/turt2live/matrix-media-repo/plugins" "github.com/turt2live/matrix-media-repo/storage" "github.com/turt2live/matrix-media-repo/storage/datastore" "github.com/turt2live/matrix-media-repo/types" "github.com/turt2live/matrix-media-repo/util" - "github.com/turt2live/matrix-media-repo/util/util_byte_seeker" ) const NoApplicableUploadUser = "" -var recentMediaIds = cache.New(30*time.Second, 60*time.Second) - type AlreadyUploadedFile struct { DS *datastore.DatastoreRef ObjectInfo *types.ObjectInfo } -func IsRequestTooLarge(contentLength int64, contentLengthHeader string, ctx rcontext.RequestContext) bool { - if ctx.Config.Uploads.MaxSizeBytes <= 0 { - return false - } - if contentLength >= 0 { - return contentLength > ctx.Config.Uploads.MaxSizeBytes - } - if contentLengthHeader != "" { - parsed, err := strconv.ParseInt(contentLengthHeader, 10, 64) - if err != nil { - ctx.Log.Warn("Invalid content length header given; assuming too large. Value received: " + contentLengthHeader) - sentry.CaptureException(err) - return true // Invalid header - } - - return parsed > ctx.Config.Uploads.MaxSizeBytes - } - - return false // We can only assume -} - -func IsRequestTooSmall(contentLength int64, contentLengthHeader string, ctx rcontext.RequestContext) bool { - if ctx.Config.Uploads.MinSizeBytes <= 0 { - return false - } - if contentLength >= 0 { - return contentLength < ctx.Config.Uploads.MinSizeBytes - } - if contentLengthHeader != "" { - parsed, err := strconv.ParseInt(contentLengthHeader, 10, 64) - if err != nil { - ctx.Log.Warn("Invalid content length header given; assuming too small. Value received: " + contentLengthHeader) - sentry.CaptureException(err) - return true // Invalid header - } - - return parsed < ctx.Config.Uploads.MinSizeBytes - } - - return false // We can only assume -} - -func EstimateContentLength(contentLength int64, contentLengthHeader string) int64 { - if contentLength >= 0 { - return contentLength - } - if contentLengthHeader != "" { - parsed, err := strconv.ParseInt(contentLengthHeader, 10, 64) - if err != nil { - logrus.Warn("Invalid content length header given. Value received: " + contentLengthHeader) - sentry.CaptureException(err) - return -1 // unknown - } - - return parsed - } - - return -1 // unknown -} - -func UploadMedia(contents io.ReadCloser, contentLength int64, contentType string, filename string, userId string, origin string, ctx rcontext.RequestContext) (*types.Media, error) { - defer stream_util.DumpAndCloseStream(contents) - - var data io.ReadCloser - if ctx.Config.Uploads.MaxSizeBytes > 0 { - data = io.NopCloser(io.LimitReader(contents, ctx.Config.Uploads.MaxSizeBytes)) - } else { - data = contents - } - - dataBytes, err := io.ReadAll(data) - if err != nil { - return nil, err - } - - metadataDb := storage.GetDatabase().GetMetadataStore(ctx) - - mediaTaken := true - var mediaId string - attempts := 0 - for mediaTaken { - attempts += 1 - if attempts > 10 { - return nil, errors.New("failed to generate a media ID after 10 rounds") - } - - mediaId, err = ids.NewUniqueId() - if err != nil { - return nil, err - } - - // Because we use the current time in the media ID, we don't need to worry about - // collisions from the database. - if _, present := recentMediaIds.Get(mediaId); present { - mediaTaken = true - continue - } - - mediaTaken, err = metadataDb.IsReserved(origin, mediaId) - if err != nil { - return nil, err - } - } - - _ = recentMediaIds.Add(mediaId, true, cache.DefaultExpiration) - - m, err := StoreDirect(nil, util_byte_seeker.NewByteSeeker(dataBytes), contentLength, contentType, filename, userId, origin, mediaId, common.KindLocalMedia, ctx, true) - if err != nil { - return m, err - } - if m != nil { - err = internal_cache.Get().UploadMedia(m.Sha256Hash, util_byte_seeker.NewByteSeeker(dataBytes), ctx) - if err != nil { - ctx.Log.Warn("Unexpected error trying to cache media: ", err) - } - } - return m, err -} - func trackUploadAsLastAccess(ctx rcontext.RequestContext, media *types.Media) { err := storage.GetDatabase().GetMetadataStore(ctx).UpsertLastAccess(media.Sha256Hash, util.NowMillis()) if err != nil { diff --git a/internal_cache/LEGACY.txt b/internal_cache/LEGACY.txt deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/internal_cache/cache.go b/internal_cache/cache.go deleted file mode 100644 index b871a07bd82eeb0a74ef6441d6bac5f57adfff53..0000000000000000000000000000000000000000 --- a/internal_cache/cache.go +++ /dev/null @@ -1,21 +0,0 @@ -package internal_cache - -import ( - "io" - - "github.com/turt2live/matrix-media-repo/common/rcontext" -) - -type CachedContent struct { - Contents io.ReadSeeker -} - -type FetchFunction func() (io.ReadCloser, error) - -type ContentCache interface { - Reset() - Stop() - MarkDownload(fileHash string) - GetMedia(sha256hash string, contents FetchFunction, ctx rcontext.RequestContext) (*CachedContent, error) - UploadMedia(sha256hash string, content io.ReadCloser, ctx rcontext.RequestContext) error -} diff --git a/internal_cache/instance.go b/internal_cache/instance.go deleted file mode 100644 index 0b1f7d1a6cc02e8788faad452e1270fb94f0434c..0000000000000000000000000000000000000000 --- a/internal_cache/instance.go +++ /dev/null @@ -1,39 +0,0 @@ -package internal_cache - -import ( - "sync" - - "github.com/sirupsen/logrus" - "github.com/turt2live/matrix-media-repo/common/config" -) - -var instance ContentCache -var lock = &sync.Once{} - -func Get() ContentCache { - if instance != nil { - return instance - } - - lock.Do(func() { - if config.Get().Redis.Enabled { - logrus.Info("Setting up Redis cache") - instance = NewRedisCache(config.Get().Redis) - } else { - logrus.Warn("Cache is disabled - setting up a dummy instance") - instance = NewNoopCache() - } - }) - - return instance -} - -func ReplaceInstance() { - if instance != nil { - instance.Reset() - instance.Stop() - instance = nil - } - - Get() // initializes new cache -} diff --git a/internal_cache/noop.go b/internal_cache/noop.go deleted file mode 100644 index 8a4f936270cb5d386777704ab201f37635a878ee..0000000000000000000000000000000000000000 --- a/internal_cache/noop.go +++ /dev/null @@ -1,37 +0,0 @@ -package internal_cache - -import ( - "io" - - "github.com/prometheus/client_golang/prometheus" - "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/metrics" -) - -type NoopCache struct{} - -func NewNoopCache() *NoopCache { - return &NoopCache{} -} - -func (n *NoopCache) Reset() { - // do nothing -} - -func (n *NoopCache) Stop() { - // do nothing -} - -func (n *NoopCache) MarkDownload(fileHash string) { - // do nothing -} - -func (n *NoopCache) GetMedia(sha256hash string, contents FetchFunction, ctx rcontext.RequestContext) (*CachedContent, error) { - metrics.CacheMisses.With(prometheus.Labels{"cache": "media"}).Inc() - return nil, nil -} - -func (n *NoopCache) UploadMedia(sha256hash string, content io.ReadCloser, ctx rcontext.RequestContext) error { - // do nothing - return nil -} diff --git a/internal_cache/redis.go b/internal_cache/redis.go deleted file mode 100644 index c83a86389724ea6fa76c1b8c259624c5b50ba958..0000000000000000000000000000000000000000 --- a/internal_cache/redis.go +++ /dev/null @@ -1,68 +0,0 @@ -package internal_cache - -import ( - "bytes" - "io" - - "github.com/prometheus/client_golang/prometheus" - "github.com/turt2live/matrix-media-repo/common/config" - "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/metrics" - "github.com/turt2live/matrix-media-repo/redis_cache" - "github.com/turt2live/matrix-media-repo/util/util_byte_seeker" -) - -type RedisCache struct { - redis *redis_cache.RedisCache -} - -func NewRedisCache(conf config.RedisConfig) *RedisCache { - return &RedisCache{redis: redis_cache.NewCache(conf)} -} - -func (c *RedisCache) Reset() { - // No-op -} - -func (c *RedisCache) Stop() { - _ = c.redis.Close() -} - -func (c *RedisCache) MarkDownload(fileHash string) { - // No-op -} - -func (c *RedisCache) GetMedia(sha256hash string, contents FetchFunction, ctx rcontext.RequestContext) (*CachedContent, error) { - return c.updateItemInCache(sha256hash, contents, ctx) -} - -func (c *RedisCache) updateItemInCache(sha256hash string, fetchFn FetchFunction, ctx rcontext.RequestContext) (*CachedContent, error) { - b, err := c.redis.GetBytes(ctx, sha256hash) - if err == redis_cache.ErrCacheMiss || err == redis_cache.ErrCacheDown { - metrics.CacheMisses.With(prometheus.Labels{"cache": "media"}).Inc() - s, err := fetchFn() - if err != nil { - return nil, err - } - defer s.Close() - fb, err := io.ReadAll(s) - if err != nil { - return nil, err - } - err = c.redis.SetStream(ctx, sha256hash, bytes.NewReader(fb)) - if err != nil && err != redis_cache.ErrCacheDown { - return nil, err - } - - metrics.CacheHits.With(prometheus.Labels{"cache": "media"}).Inc() - return &CachedContent{Contents: util_byte_seeker.NewByteSeeker(fb)}, nil - } - - metrics.CacheHits.With(prometheus.Labels{"cache": "media"}).Inc() - return &CachedContent{Contents: util_byte_seeker.NewByteSeeker(b)}, nil -} - -func (c *RedisCache) UploadMedia(sha256hash string, content io.ReadCloser, ctx rcontext.RequestContext) error { - defer content.Close() - return c.redis.SetStream(ctx, sha256hash, content) -} diff --git a/internal_cache/streamer.go b/internal_cache/streamer.go deleted file mode 100644 index 71d106a9bbe78905b9501a30ebf0d4a5660986c6..0000000000000000000000000000000000000000 --- a/internal_cache/streamer.go +++ /dev/null @@ -1,21 +0,0 @@ -package internal_cache - -import ( - "io" - - "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/storage/datastore" - "github.com/turt2live/matrix-media-repo/types" -) - -func StreamerForMedia(media *types.Media) FetchFunction { - return func() (io.ReadCloser, error) { - return datastore.DownloadStream(rcontext.Initial(), media.DatastoreId, media.Location) - } -} - -func StreamerForThumbnail(media *types.Thumbnail) FetchFunction { - return func() (io.ReadCloser, error) { - return datastore.DownloadStream(rcontext.Initial(), media.DatastoreId, media.Location) - } -} diff --git a/matrix/admin.go b/matrix/admin.go index c8bf328ae43f33c721d4eee02cf8795122d54349..8d52819199848f28c230a1846ce033e839c9d0a0 100644 --- a/matrix/admin.go +++ b/matrix/admin.go @@ -37,10 +37,10 @@ func IsUserAdmin(ctx rcontext.RequestContext, serverName string, accessToken str return isAdmin, replyError } -func ListMedia(ctx rcontext.RequestContext, serverName string, accessToken string, roomId string, ipAddr string) (*mediaListResponse, error) { +func ListMedia(ctx rcontext.RequestContext, serverName string, accessToken string, roomId string, ipAddr string) (*MediaListResponse, error) { hs, cb := getBreakerAndConfig(serverName) - response := &mediaListResponse{} + response := &MediaListResponse{} var replyError error replyError = cb.CallContext(ctx, func() error { path := "" diff --git a/matrix/responses.go b/matrix/responses.go index 48b4ab335d94f1056ce3e51235e7c6f43eda0055..77efc65dd9b01a2a898b4b37842841030f4c0e29 100644 --- a/matrix/responses.go +++ b/matrix/responses.go @@ -17,7 +17,7 @@ type whoisResponse struct { // We don't actually care about any of the fields here } -type mediaListResponse struct { +type MediaListResponse struct { LocalMxcs []string `json:"local"` RemoteMxcs []string `json:"remote"` } diff --git a/pipelines/_steps/download/try_download.go b/pipelines/_steps/download/try_download.go index b6de06d9cccdd2ca1fb3f9c2e9ef804739e8aa1b..549333888348464a3c86295c65fe389dfca0a235 100644 --- a/pipelines/_steps/download/try_download.go +++ b/pipelines/_steps/download/try_download.go @@ -29,11 +29,6 @@ type downloadResult struct { err error } -type uploadResult struct { - m *database.DbMedia - err error -} - func TryDownload(ctx rcontext.RequestContext, origin string, mediaId string) (*database.DbMedia, io.ReadCloser, error) { if util.IsServerOurs(origin) { return nil, nil, common.ErrMediaNotFound diff --git a/storage/storage.go b/storage/storage.go index 5d7ae718222bb647c7f0d9f4fd1a290e86659ed8..6a84272375c309758662ab49457939beee1e1feb 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -2,7 +2,6 @@ package storage import ( "database/sql" - "github.com/getsentry/sentry-go" "sync" "github.com/DavidHuie/gomigrate" @@ -45,19 +44,6 @@ func GetDatabase() *Database { return dbInstance } -func ReloadDatabase() { - if dbInstance != nil { - if err := dbInstance.db.Close(); err != nil { - logrus.Error(err) - sentry.CaptureException(err) - } - } - - dbInstance = nil - singletonDbLock = &sync.Once{} - GetDatabase() -} - func OpenDatabase(connectionString string, maxConns int, maxIdleConns int) error { d := &Database{} var err error diff --git a/util/stream_util/streams.go b/util/stream_util/streams.go index d92b4600b0787479facd09cd15fa8b76e750626e..3e921e5d218a5323198eb2ae6f816195e93af648 100644 --- a/util/stream_util/streams.go +++ b/util/stream_util/streams.go @@ -8,8 +8,6 @@ import ( "fmt" "io" "math" - - "github.com/turt2live/matrix-media-repo/util/util_byte_seeker" ) func BufferToStream(buf *bytes.Buffer) io.ReadCloser { @@ -21,30 +19,6 @@ func BytesToStream(b []byte) io.ReadCloser { return io.NopCloser(bytes.NewBuffer(b)) } -func CloneReader(input io.ReadCloser, numReaders int) []io.ReadCloser { - readers := make([]io.ReadCloser, 0) - writers := make([]io.WriteCloser, 0) - - for i := 0; i < numReaders; i++ { - r, w := io.Pipe() - readers = append(readers, r) - writers = append(writers, w) - } - - go func() { - plainWriters := make([]io.Writer, 0) - for _, w := range writers { - defer w.Close() - plainWriters = append(plainWriters, w) - } - - mw := io.MultiWriter(plainWriters...) - io.Copy(mw, input) - }() - - return readers -} - func GetSha256HashOfStream(r io.ReadCloser) (string, error) { defer DumpAndCloseStream(r) @@ -57,10 +31,6 @@ func GetSha256HashOfStream(r io.ReadCloser) (string, error) { return hex.EncodeToString(hasher.Sum(nil)), nil } -func ClonedBufReader(buf bytes.Buffer) util_byte_seeker.ByteSeeker { - return util_byte_seeker.NewByteSeeker(buf.Bytes()) -} - func ForceDiscard(r io.Reader, nBytes int64) error { if nBytes == 0 { return nil // weird call, but ok @@ -99,21 +69,6 @@ func ForceDiscard(r io.Reader, nBytes int64) error { return nil } -func ManualSeekStream(r io.Reader, bytesStart int64, bytesToRead int64) (io.Reader, error) { - if sr, ok := r.(io.ReadSeeker); ok { - _, err := sr.Seek(bytesStart, io.SeekStart) - if err != nil { - return nil, err - } - } else { - err := ForceDiscard(r, bytesStart) - if err != nil { - return nil, err - } - } - return io.LimitReader(r, bytesToRead), nil -} - func DumpAndCloseStream(r io.ReadCloser) { if r == nil { return // nothing to dump or close