From 0167c87e4278acb596d57164c8a52eef248b4d7d Mon Sep 17 00:00:00 2001 From: Travis Ralston <travpc@gmail.com> Date: Sun, 29 Dec 2019 01:39:36 -0700 Subject: [PATCH] Add recurring tasks to purge removable content Fixes https://github.com/turt2live/matrix-media-repo/issues/39 --- cmd/media_repo/main.go | 7 +++ cmd/media_repo/reloads.go | 17 +++++++ common/config/conf_main.go | 4 +- common/config/models_main.go | 3 ++ common/config/watch.go | 3 ++ common/globals/reload.go | 1 + config.sample.yaml | 15 ++++++ docs/config.md | 3 ++ storage/stores/thumbnail_store.go | 50 +++++++++++++++++++ storage/stores/url_store.go | 14 +++++- tasks/all.go | 13 +++++ tasks/purge_previews.go | 55 +++++++++++++++++++++ tasks/purge_remote_media.go | 54 +++++++++++++++++++++ tasks/purge_thumbnails.go | 79 +++++++++++++++++++++++++++++++ 14 files changed, 315 insertions(+), 3 deletions(-) create mode 100644 tasks/all.go create mode 100644 tasks/purge_previews.go create mode 100644 tasks/purge_remote_media.go create mode 100644 tasks/purge_thumbnails.go diff --git a/cmd/media_repo/main.go b/cmd/media_repo/main.go index ab8b7792..2db4bc18 100644 --- a/cmd/media_repo/main.go +++ b/cmd/media_repo/main.go @@ -12,6 +12,7 @@ import ( "github.com/turt2live/matrix-media-repo/common/logging" "github.com/turt2live/matrix-media-repo/common/version" "github.com/turt2live/matrix-media-repo/metrics" + "github.com/turt2live/matrix-media-repo/tasks" ) func printVersion(usingLogger bool) { @@ -66,6 +67,9 @@ func main() { logrus.Fatal(err) } + logrus.Info("Starting recurring tasks...") + tasks.StartAll() + logrus.Info("Starting config watcher...") watcher := config.Watch() defer watcher.Close() @@ -82,6 +86,9 @@ func main() { logrus.Info("Stopping metrics...") metrics.Stop() + + logrus.Info("Stopping recurring tasks...") + tasks.StopAll() } // Set up a listener for SIGINT diff --git a/cmd/media_repo/reloads.go b/cmd/media_repo/reloads.go index e2097146..93679a8d 100644 --- a/cmd/media_repo/reloads.go +++ b/cmd/media_repo/reloads.go @@ -5,6 +5,7 @@ import ( "github.com/turt2live/matrix-media-repo/common/globals" "github.com/turt2live/matrix-media-repo/metrics" "github.com/turt2live/matrix-media-repo/storage" + "github.com/turt2live/matrix-media-repo/tasks" ) func setupReloads() { @@ -12,6 +13,7 @@ func setupReloads() { reloadMetricsOnChan(globals.MetricsReloadChan) reloadDatabaseOnChan(globals.DatabaseReloadChan) reloadDatastoresOnChan(globals.DatastoresReloadChan) + reloadRecurringTasksOnChan(globals.RecurringTasksReloadChan) } func stopReloads() { @@ -20,6 +22,7 @@ func stopReloads() { globals.MetricsReloadChan <- false globals.DatabaseReloadChan <- false globals.DatastoresReloadChan <- false + globals.RecurringTasksReloadChan <- false } func reloadWebOnChan(reloadChan chan bool) { @@ -75,3 +78,17 @@ func reloadDatastoresOnChan(reloadChan chan bool) { } }() } + +func reloadRecurringTasksOnChan(reloadChan chan bool) { + go func() { + for { + shouldReload := <-reloadChan + if shouldReload { + tasks.StopAll() + tasks.StartAll() + } else { + return // received stop + } + } + }() +} diff --git a/common/config/conf_main.go b/common/config/conf_main.go index 0bae01a8..0495fdc4 100644 --- a/common/config/conf_main.go +++ b/common/config/conf_main.go @@ -49,6 +49,7 @@ func NewDefaultMainConfig() MainRepoConfig { MinCacheTimeSeconds: 300, // 5min MinEvictedTimeSeconds: 60, }, + ExpireDays: 0, }, UrlPreviews: MainUrlPreviewsConfig{ UrlPreviewsConfig: UrlPreviewsConfig{ @@ -76,8 +77,8 @@ func NewDefaultMainConfig() MainRepoConfig { "0.0.0.0/0", // "Everything" }, }, - NumWorkers: 10, + ExpireDays: 0, }, Thumbnails: MainThumbnailsConfig{ ThumbnailsConfig: ThumbnailsConfig{ @@ -101,6 +102,7 @@ func NewDefaultMainConfig() MainRepoConfig { }, }, NumWorkers: 10, + ExpireDays: 0, }, RateLimit: RateLimitConfig{ Enabled: true, diff --git a/common/config/models_main.go b/common/config/models_main.go index 86081c3f..86649ffe 100644 --- a/common/config/models_main.go +++ b/common/config/models_main.go @@ -29,6 +29,7 @@ type MainDownloadsConfig struct { DownloadsConfig `yaml:",inline"` NumWorkers int `yaml:"numWorkers"` Cache CacheConfig `yaml:"cache"` + ExpireDays int `yaml:"expireAfterDays"` } type CacheConfig struct { @@ -44,11 +45,13 @@ type CacheConfig struct { type MainThumbnailsConfig struct { ThumbnailsConfig `yaml:",inline"` NumWorkers int `yaml:"numWorkers"` + ExpireDays int `yaml:"expireAfterDays"` } type MainUrlPreviewsConfig struct { UrlPreviewsConfig `yaml:",inline"` NumWorkers int `yaml:"numWorkers"` + ExpireDays int `yaml:"expireAfterDays"` } type RateLimitConfig struct { diff --git a/common/config/watch.go b/common/config/watch.go index 8b6dc80c..1e64ff8c 100644 --- a/common/config/watch.go +++ b/common/config/watch.go @@ -89,4 +89,7 @@ func onFileChanged() { // Always update the datastores logrus.Warn("Updating datastores to ensure accuracy") globals.DatastoresReloadChan <- true + + logrus.Info("Restarting recurring tasks") + globals.RecurringTasksReloadChan <- true } diff --git a/common/globals/reload.go b/common/globals/reload.go index 086c1539..6b680dbb 100644 --- a/common/globals/reload.go +++ b/common/globals/reload.go @@ -4,3 +4,4 @@ var WebReloadChan = make(chan bool) var MetricsReloadChan = make(chan bool) var DatabaseReloadChan = make(chan bool) var DatastoresReloadChan = make(chan bool) +var RecurringTasksReloadChan = make(chan bool) diff --git a/config.sample.yaml b/config.sample.yaml index 786bc0cb..81d11087 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -208,6 +208,11 @@ downloads: # The minimum amount of time an item should remain outside the cache once it is removed. minEvictedTimeSeconds: 60 + # How many days after a piece of remote content is downloaded before it expires. It can be + # re-downloaded on demand, this just helps free up space in your datastore. Set to zero or + # negative to disable. Defaults to disabled. + expireAfterDays: 0 + # URL Preview settings urlPreviews: enabled: true # If enabled, the preview_url routes will be accessible @@ -256,6 +261,11 @@ urlPreviews: - "0.0.0.0/0" # "Everything". The blacklist will help limit this. # This is the default value for this field. + # How many days after a preview is generated before it expires and is deleted. The preview + # can be regenerated safely - this just helps free up some space in your database. Set to + # zero or negative to disable. Defaults to disabled. + expireAfterDays: 0 + # The thumbnail configuration for the media repository. thumbnails: # The maximum number of bytes an image can be before the thumbnailer refuses. @@ -310,6 +320,11 @@ thumbnails: # and thumbnail animated content? Defaults to 0.5 (middle of animation). stillFrame: 0.5 + # How many days after a thumbnail is generated before it expires and is deleted. The thumbnail + # can be regenerated safely - this just helps free up some space in your datastores. Set to + # zero or negative to disable. Defaults to disabled. + expireAfterDays: 0 + # Controls for the rate limit functionality rateLimit: # Set this to false if rate limiting is handled at a higher level or you don't want it enabled. diff --git a/docs/config.md b/docs/config.md index f9342737..f316436b 100644 --- a/docs/config.md +++ b/docs/config.md @@ -51,6 +51,9 @@ Any options from the main config can then be overridden per-domain with the exce * `urlPreviews.numWorkers` - because workers are configured repo-wide. * `thumbnails.numWorkers` - because workers are configured repo-wide. * `federation` - because the federation options are repo-wide. +* `downloads.expireAfterDays` - because remote media downloads are not for any particular domain. +* `thumbnails.expireAfterDays` - because thumbnails aren't associated with any particular domain. +* `urlPreviews.expireAfterDays` - because previews aren't associated with any particular domain. To override a value, simply provide it in any valid per-domain config: diff --git a/storage/stores/thumbnail_store.go b/storage/stores/thumbnail_store.go index f5fa028b..e9fcccc8 100644 --- a/storage/stores/thumbnail_store.go +++ b/storage/stores/thumbnail_store.go @@ -15,6 +15,8 @@ const selectThumbnailsWithoutDatastore = "SELECT origin, media_id, width, height const updateThumbnailDatastoreAndLocation = "UPDATE thumbnails SET location = $8, datastore_id = $7 WHERE origin = $1 and media_id = $2 and width = $3 and height = $4 and method = $5 and animated = $6;" const selectThumbnailsForMedia = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash FROM thumbnails WHERE origin = $1 AND media_id = $2;" const deleteThumbnailsForMedia = "DELETE FROM thumbnails WHERE origin = $1 AND media_id = $2;" +const selectThumbnailsCreatedBefore = "SELECT origin, media_id, width, height, method, animated, content_type, size_bytes, datastore_id, location, creation_ts, sha256_hash FROM thumbnails WHERE creation_ts < $1;" +const deleteThumbnailsWithHash = "DELETE FROM thumbnails WHERE sha256_hash = $1;" type thumbnailStatements struct { selectThumbnail *sql.Stmt @@ -25,6 +27,8 @@ type thumbnailStatements struct { updateThumbnailDatastoreAndLocation *sql.Stmt selectThumbnailsForMedia *sql.Stmt deleteThumbnailsForMedia *sql.Stmt + selectThumbnailsCreatedBefore *sql.Stmt + deleteThumbnailsWithHash *sql.Stmt } type ThumbnailStoreFactory struct { @@ -68,6 +72,12 @@ func InitThumbnailStore(sqlDb *sql.DB) (*ThumbnailStoreFactory, error) { if store.stmts.deleteThumbnailsForMedia, err = store.sqlDb.Prepare(deleteThumbnailsForMedia); err != nil { return nil, err } + if store.stmts.selectThumbnailsCreatedBefore, err = store.sqlDb.Prepare(selectThumbnailsCreatedBefore); err != nil { + return nil, err + } + if store.stmts.deleteThumbnailsWithHash, err = store.sqlDb.Prepare(deleteThumbnailsWithHash); err != nil { + return nil, err + } return &store, nil } @@ -253,3 +263,43 @@ func (s *ThumbnailStore) DeleteAllForMedia(origin string, mediaId string) error } return nil } + +func (s *ThumbnailStore) GetOldThumbnails(beforeTs int64) ([]*types.Thumbnail, error) { + rows, err := s.statements.selectThumbnailsCreatedBefore.QueryContext(s.ctx, beforeTs) + if err != nil { + return nil, err + } + + var results []*types.Thumbnail + for rows.Next() { + obj := &types.Thumbnail{} + err = rows.Scan( + &obj.Origin, + &obj.MediaId, + &obj.Width, + &obj.Height, + &obj.Method, + &obj.Animated, + &obj.ContentType, + &obj.SizeBytes, + &obj.DatastoreId, + &obj.Location, + &obj.CreationTs, + &obj.Sha256Hash, + ) + if err != nil { + return nil, err + } + results = append(results, obj) + } + + return results, nil +} + +func (s *ThumbnailStore) DeleteWithHash(sha256hash string) error { + _, err := s.statements.deleteThumbnailsWithHash.ExecContext(s.ctx, sha256hash) + if err != nil { + return err + } + return nil +} diff --git a/storage/stores/url_store.go b/storage/stores/url_store.go index 53163dac..5a6d9fdb 100644 --- a/storage/stores/url_store.go +++ b/storage/stores/url_store.go @@ -10,10 +10,12 @@ import ( const selectUrlPreview = "SELECT url, error_code, bucket_ts, site_url, site_name, resource_type, description, title, image_mxc, image_type, image_size, image_width, image_height FROM url_previews WHERE url = $1 AND bucket_ts = $2;" const insertUrlPreview = "INSERT INTO url_previews (url, error_code, bucket_ts, site_url, site_name, resource_type, description, title, image_mxc, image_type, image_size, image_width, image_height) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13);" +const deletePreviewsOlderThan = "DELETE FROM url_previews WHERE bucket_ts <= $1;" type urlStatements struct { - selectUrlPreview *sql.Stmt - insertUrlPreview *sql.Stmt + selectUrlPreview *sql.Stmt + insertUrlPreview *sql.Stmt + deletePreviewsOlderThan *sql.Stmt } type UrlStoreFactory struct { @@ -39,6 +41,9 @@ func InitUrlStore(sqlDb *sql.DB) (*UrlStoreFactory, error) { if store.stmts.insertUrlPreview, err = store.sqlDb.Prepare(insertUrlPreview); err != nil { return nil, err } + if store.stmts.deletePreviewsOlderThan, err = store.sqlDb.Prepare(deletePreviewsOlderThan); err != nil { + return nil, err + } return &store, nil } @@ -104,6 +109,11 @@ func (s *UrlStore) InsertPreviewError(url string, errorCode string) error { }) } +func (s *UrlStore) DeleteOlderThan(beforeTs int64) error { + _, err := s.statements.deletePreviewsOlderThan.ExecContext(s.ctx, beforeTs) + return err +} + func GetBucketTs(ts int64) int64 { // 1 hour buckets return (ts / 3600000) * 3600000 diff --git a/tasks/all.go b/tasks/all.go new file mode 100644 index 00000000..87d8dcfa --- /dev/null +++ b/tasks/all.go @@ -0,0 +1,13 @@ +package tasks + +func StartAll() { + StartRemoteMediaPurgeRecurring() + StartThumbnailPurgeRecurring() + StartPreviewsPurgeRecurring() +} + +func StopAll() { + StopRemoteMediaPurgeRecurring() + StopThumbnailPurgeRecurring() + StopPreviewsPurgeRecurring() +} diff --git a/tasks/purge_previews.go b/tasks/purge_previews.go new file mode 100644 index 00000000..aea32abd --- /dev/null +++ b/tasks/purge_previews.go @@ -0,0 +1,55 @@ +package tasks + +import ( + "math/rand" + "time" + + "github.com/sirupsen/logrus" + "github.com/turt2live/matrix-media-repo/common/config" + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/storage" + "github.com/turt2live/matrix-media-repo/util" +) + +var previewsPurgeDone chan bool + +func StartPreviewsPurgeRecurring() { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + ticker := time.NewTicker((1 * time.Hour) + (time.Duration(r.Intn(15)) * time.Minute)) + previewsPurgeDone = make(chan bool) + + go func() { + for { + select { + case <-previewsPurgeDone: + ticker.Stop() + return + case <-ticker.C: + if config.Get().UrlPreviews.ExpireDays <= 0 { + continue + } + + doRecurringPreviewPurge() + } + } + }() +} + +func StopPreviewsPurgeRecurring() { + previewsPurgeDone <- true +} + +func doRecurringPreviewPurge() { + ctx := rcontext.Initial().LogWithFields(logrus.Fields{"task": "recurring_purge_url_previews"}) + ctx.Log.Info("Starting URL preview purge task") + + // We get media that is N days old to make sure it gets cleared safely. + beforeTs := util.NowMillis() - int64(config.Get().UrlPreviews.ExpireDays*24*60*60*1000) + + db := storage.GetDatabase().GetUrlStore(ctx) + err := db.DeleteOlderThan(beforeTs) + if err != nil { + ctx.Log.Error(err) + } + ctx.Log.Info("Purge task completed") +} diff --git a/tasks/purge_remote_media.go b/tasks/purge_remote_media.go new file mode 100644 index 00000000..978cfe04 --- /dev/null +++ b/tasks/purge_remote_media.go @@ -0,0 +1,54 @@ +package tasks + +import ( + "math/rand" + "time" + + "github.com/sirupsen/logrus" + "github.com/turt2live/matrix-media-repo/common/config" + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/controllers/maintenance_controller" + "github.com/turt2live/matrix-media-repo/util" +) + +var mediaPurgeDone chan bool + +func StartRemoteMediaPurgeRecurring() { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + ticker := time.NewTicker((1 * time.Hour) + (time.Duration(r.Intn(15)) * time.Minute)) + mediaPurgeDone = make(chan bool) + + go func() { + for { + select { + case <-mediaPurgeDone: + ticker.Stop() + return + case <-ticker.C: + if config.Get().Downloads.ExpireDays <= 0 { + continue + } + + doRecurringRemoteMediaPurge() + } + } + }() +} + +func StopRemoteMediaPurgeRecurring() { + mediaPurgeDone <- true +} + +func doRecurringRemoteMediaPurge() { + ctx := rcontext.Initial().LogWithFields(logrus.Fields{"task": "recurring_purge_remote_media"}) + ctx.Log.Info("Starting remote media purge task") + + // We get media that is N days old to make sure it gets cleared safely. + beforeTs := util.NowMillis() - int64(config.Get().Downloads.ExpireDays*24*60*60*1000) + + _, err := maintenance_controller.PurgeRemoteMediaBefore(beforeTs, ctx) + if err != nil { + ctx.Log.Error(err) + } + ctx.Log.Info("Purge task completed") +} diff --git a/tasks/purge_thumbnails.go b/tasks/purge_thumbnails.go new file mode 100644 index 00000000..86549e27 --- /dev/null +++ b/tasks/purge_thumbnails.go @@ -0,0 +1,79 @@ +package tasks + +import ( + "math/rand" + "time" + + "github.com/sirupsen/logrus" + "github.com/turt2live/matrix-media-repo/common/config" + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/storage" + "github.com/turt2live/matrix-media-repo/storage/datastore" + "github.com/turt2live/matrix-media-repo/util" +) + +var thumbnailsPurgeDone chan bool + +func StartThumbnailPurgeRecurring() { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + ticker := time.NewTicker((1 * time.Hour) + (time.Duration(r.Intn(15)) * time.Minute)) + thumbnailsPurgeDone = make(chan bool) + + go func() { + for { + select { + case <-thumbnailsPurgeDone: + ticker.Stop() + return + case <-ticker.C: + if config.Get().Thumbnails.ExpireDays <= 0 { + continue + } + + doRecurringThumbnailPurge() + } + } + }() +} + +func StopThumbnailPurgeRecurring() { + thumbnailsPurgeDone <- true +} + +func doRecurringThumbnailPurge() { + ctx := rcontext.Initial().LogWithFields(logrus.Fields{"task": "recurring_purge_thumbnails"}) + ctx.Log.Info("Starting thumbnail purge task") + + // We get media that is N days old to make sure it gets cleared safely. + beforeTs := util.NowMillis() - int64(config.Get().Thumbnails.ExpireDays*24*60*60*1000) + + db := storage.GetDatabase().GetThumbnailStore(ctx) + thumbs, err := db.GetOldThumbnails(beforeTs) + if err != nil { + ctx.Log.Error(err) + return + } + + for _, thumb := range thumbs { + ctx.Log.Info("Deleting thumbnails with hash: ", thumb.Sha256Hash) + err = db.DeleteWithHash(thumb.Sha256Hash) + if err != nil { + ctx.Log.Error(err) + return + } + + ds, err := datastore.LocateDatastore(ctx, thumb.DatastoreId) + if err != nil { + ctx.Log.Error(err) + return + } + + err = ds.DeleteObject(thumb.Location) + if err != nil { + ctx.Log.Error(err) + // don't return on this one - we'll continue otherwise + } + } + + ctx.Log.Info("Purge task completed") +} -- GitLab