diff --git a/CHANGELOG.md b/CHANGELOG.md index dc9a297315dbe8ee7125d0d42b56cd1d6206eb86..915fc40123e34e65a4d3bd8c4fbc9e3c61be4d62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,6 +83,7 @@ path/server, for example, then you can simply update the path in the config for ### Added * Added a `federation.ignoredHosts` config option to block media from individual homeservers. +* Support for MSC2246 (async uploads) is added, with per-user quota limiting options. ### Removed @@ -93,6 +94,8 @@ path/server, for example, then you can simply update the path in the config for * **Mandatory configuration change**: You must add datastore IDs to your datastore configuration, as matrix-media-repo will no longer manage datastores for you. * Datastores no longer use the `enabled` flag set on them. Use `forKinds: []` instead. * Some admin endpoints for purging media, quarantining media, and background task information now require additional path components. See [docs/admin.md](./docs/admin.md) for more information. +* Per-user upload quotas now do not allow users to exceed the maximum values, even by 1 byte. Previously, users could exceed the limits by a little bit. +* All uploads are now async (using MSC2246) internally. This should not have user-facing impact. * Updated to Go 1.19 ## [1.2.13] - February 12, 2023 diff --git a/README.md b/README.md index 3ff213ad0ca172d2caa8db5f5e6bfbfaacfc844c..a86dc80128b4ca625625e57c3a57f5bb5ecbe204 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # matrix-media-repo +# 🚨🚨 DANGER: MAJOR REFACTORING IN PROGRESS 🚨🚨 + +**This project is undergoing a major refactoring on the main branch.** This may lead to inconsistent database or storage +state while the project is effectively rewritten in large part. *Use `:latest` and the main branch at your own risk.* No +support is provided to people using `:latest` or `master`. + matrix-media-repo is a highly customizable multi-domain media repository for Matrix. Intended for medium to large environments consisting of several homeservers, this media repo de-duplicates media (including remote media) while being fully compliant with the specification. diff --git a/api/routes.go b/api/routes.go index ff6378de1a1568ae4aef5ca5665b455fff54f376..766cfc30585a30306384af7870ed6bdae2dfe6e7 100644 --- a/api/routes.go +++ b/api/routes.go @@ -13,6 +13,7 @@ import ( "github.com/turt2live/matrix-media-repo/api/custom" "github.com/turt2live/matrix-media-repo/api/r0" "github.com/turt2live/matrix-media-repo/api/unstable" + v1 "github.com/turt2live/matrix-media-repo/api/v1" ) const PrefixMedia = "/_matrix/media" @@ -39,6 +40,7 @@ func buildRoutes() http.Handler { register([]string{"GET"}, PrefixMedia, "config", false, router, makeRoute(_routers.RequireAccessToken(r0.PublicConfig), "config", false, counter)) register([]string{"POST"}, PrefixClient, "logout", false, router, makeRoute(_routers.RequireAccessToken(r0.Logout), "logout", false, counter)) register([]string{"POST"}, PrefixClient, "logout/all", false, router, makeRoute(_routers.RequireAccessToken(r0.LogoutAll), "logout_all", false, counter)) + register([]string{"POST"}, PrefixMedia, "create", false, router, makeRoute(_routers.RequireAccessToken(v1.CreateMedia), "create", false, counter)) // Custom features register([]string{"GET"}, PrefixMedia, "local_copy/:server/:mediaId", true, router, makeRoute(_routers.RequireAccessToken(unstable.LocalCopy), "local_copy", false, counter)) diff --git a/api/v1/create.go b/api/v1/create.go new file mode 100644 index 0000000000000000000000000000000000000000..0617093f809928103e9e594b07c3f6d7a9a5e0c6 --- /dev/null +++ b/api/v1/create.go @@ -0,0 +1,31 @@ +package v1 + +import ( + "net/http" + + "github.com/getsentry/sentry-go" + "github.com/turt2live/matrix-media-repo/api/_apimeta" + "github.com/turt2live/matrix-media-repo/api/_responses" + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/pipline/create_pipeline" + "github.com/turt2live/matrix-media-repo/util" +) + +type MediaCreatedResponse struct { + ContentUri string `json:"content_uri"` + ExpiresTs int64 `json:"unused_expires_at"` +} + +func CreateMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} { + id, err := create_pipeline.Execute(rctx, r.Host, user.UserId, create_pipeline.DefaultExpirationTime) + if err != nil { + rctx.Log.Error("Unexpected error creating media ID:", err) + sentry.CaptureException(err) + return _responses.InternalServerError("unexpected error") + } + + return &MediaCreatedResponse{ + ContentUri: util.MxcUri(id.Origin, id.MediaId), + ExpiresTs: id.ExpiresTs, + } +} diff --git a/common/config/conf_min_shared.go b/common/config/conf_min_shared.go index 09eb8212303ad8305105b130374d00606d30ebea..02628743f63055853424f0c82eba521292123dd4 100644 --- a/common/config/conf_min_shared.go +++ b/common/config/conf_min_shared.go @@ -23,6 +23,8 @@ func NewDefaultMinimumRepoConfig() MinimumRepoConfig { MaxSizeBytes: 104857600, // 100mb MinSizeBytes: 100, ReportedMaxSizeBytes: 0, + MaxPending: 5, + MaxAgeSeconds: 1800, // 30 minutes Quota: QuotasConfig{ Enabled: false, UserQuotas: []QuotaUserConfig{}, diff --git a/common/config/models_domain.go b/common/config/models_domain.go index 40adc886a842828d83a613993b965a1bd58a1df9..1c69276ececd8ced0512ba76e4cb657becd99910 100644 --- a/common/config/models_domain.go +++ b/common/config/models_domain.go @@ -7,8 +7,9 @@ type ArchivingConfig struct { } type QuotaUserConfig struct { - Glob string `yaml:"glob"` - MaxBytes int64 `yaml:"maxBytes"` + Glob string `yaml:"glob"` + MaxBytes int64 `yaml:"maxBytes"` + MaxPending int64 `yaml:"maxPending"` } type QuotasConfig struct { @@ -20,6 +21,8 @@ type UploadsConfig struct { MaxSizeBytes int64 `yaml:"maxBytes"` MinSizeBytes int64 `yaml:"minBytes"` ReportedMaxSizeBytes int64 `yaml:"reportedMaxBytes"` + MaxPending int64 `yaml:"maxPending"` + MaxAgeSeconds int64 `yaml:"maxAgeSeconds"` Quota QuotasConfig `yaml:"quotas"` } diff --git a/config.sample.yaml b/config.sample.yaml index 13118b0905919111e8cbb6c6038570b7574bbe92..fefc5f3f29caf57f87f40ae319ec46c13c080121 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -227,24 +227,38 @@ uploads: # Set this to -1 to indicate that there is no limit. Zero will force the use of maxBytes. #reportedMaxBytes: 104857600 + # The number of pending uploads a user is permitted to have at a given time. They must cancel, + # complete, or otherwise let pending requests expire before uploading any more media. Set to + # zero to disable. + maxPending: 5 + + # The duration the server will wait to receive media that was asynchronously uploaded before + # expiring it entirely. This should be set sufficiently high for a client on poor connectivity + # to upload something. The Matrix specification recommends 24 hours (86400 seconds), however + # this project recommends 30 minutes (1800 seconds). + maxAgeSeconds: 1800 + # Options for limiting how much content a user can upload. Quotas are applied to content # associated with a user regardless of de-duplication. Quotas which affect remote servers # or users will not take effect. When a user exceeds their quota they will be unable to # upload any more media. quotas: - # Whether or not quotas are enabled/enforced. Note that even when disabled the media repo - # will track how much media a user has uploaded. This is disabled by default. + # Whether quotas are enabled/enforced. Note that even when disabled the media repo will + # track how much media a user has uploaded. Quotas are disabled by default. enabled: false - # The quota rules that affect users. The first rule to match the uploader will take effect. - # An implied rule which matches all users and has no quota is always last in this list, - # meaning that if no rules are supplied then users will be able to upload anything. Similarly, - # if no rules match a user then the implied rule will match, allowing the user to have no - # quota. The quota will let the user upload to 1 media past their quota, meaning that from - # a statistics perspective the user might exceed their quota however only by a small amount. + # The upload quota rules which affect users. The first rule to match the user ID will take + # effect. If a user does not match a rule, the defaults implied by the above config will + # take effect instead. The user will not be permitted to upload anything above these quota + # values, but can match them exactly. users: - glob: "@*:*" # Affect all users. Use asterisks (*) to match any character. - maxBytes: 53687063712 # 50GB default, 0 to disable + # The maximum number of TOTAL bytes a user can upload. Defaults to zero (no limit). + maxBytes: 53687063712 # 50gb + # The same as maxPending above - the number of uploads the user can have waiting to + # complete before starting another one. Defaults to maxPending above. Set to 0 to + # disable. + maxPending: 5 # Settings related to downloading files from the media repository downloads: diff --git a/database/db.go b/database/db.go index 319ccc8aedcc56b715b47a9d5cf0fb9aacf2c948..b6b05202d737cc9d15c38cb4eb8b07f9affbdf80 100644 --- a/database/db.go +++ b/database/db.go @@ -14,8 +14,11 @@ import ( ) type Database struct { - conn *sql.DB - Media *mediaTableStatements + conn *sql.DB + Media *mediaTableStatements + ExpiringMedia *expiringMediaTableStatements + UserStats *userStatsTableStatements + ReservedMedia *reservedMediaTableStatements } var instance *Database @@ -70,7 +73,16 @@ func openDatabase(connectionString string, maxConns int, maxIdleConns int) error // Prepare the table accessors if d.Media, err = prepareMediaTables(d.conn); err != nil { - return errors.New("failed to create media table accessor") + return errors.New("failed to create media table accessor: " + err.Error()) + } + if d.ExpiringMedia, err = prepareExpiringMediaTables(d.conn); err != nil { + return errors.New("failed to create expiring media table accessor: " + err.Error()) + } + if d.UserStats, err = prepareUserStatsTables(d.conn); err != nil { + return errors.New("failed to create user stats table accessor: " + err.Error()) + } + if d.ReservedMedia, err = prepareReservedMediaTables(d.conn); err != nil { + return errors.New("failed to create reserved media table accessor: " + err.Error()) } instance = d diff --git a/database/table_expiring_media.go b/database/table_expiring_media.go new file mode 100644 index 0000000000000000000000000000000000000000..daac75e9e41120de3575c8b90ca1a96797e5309d --- /dev/null +++ b/database/table_expiring_media.go @@ -0,0 +1,66 @@ +package database + +import ( + "database/sql" + "errors" + + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/util" +) + +type DbExpiringMedia struct { + Origin string + MediaId string + UserId string + ExpiresTs int64 +} + +const insertExpiringMedia = "INSERT INTO expiring_media (origin, media_id, user_id, expires_ts) VALUES ($1, $2, $3, $4);" +const selectExpiringMediaByUserCount = "SELECT COUNT(*) FROM expiring_media WHERE user_id = $1 AND expires_ts >= $2;" + +type expiringMediaTableStatements struct { + insertExpiringMedia *sql.Stmt + selectExpiringMediaByUserCount *sql.Stmt +} + +type expiringMediaTableWithContext struct { + statements *expiringMediaTableStatements + ctx rcontext.RequestContext +} + +func prepareExpiringMediaTables(db *sql.DB) (*expiringMediaTableStatements, error) { + var err error + var stmts = &expiringMediaTableStatements{} + + if stmts.insertExpiringMedia, err = db.Prepare(insertExpiringMedia); err != nil { + return nil, errors.New("error preparing insertExpiringMedia: " + err.Error()) + } + if stmts.selectExpiringMediaByUserCount, err = db.Prepare(selectExpiringMediaByUserCount); err != nil { + return nil, errors.New("error preparing selectExpiringMediaByUserCount: " + err.Error()) + } + + return stmts, nil +} + +func (s *expiringMediaTableStatements) Prepare(ctx rcontext.RequestContext) *expiringMediaTableWithContext { + return &expiringMediaTableWithContext{ + statements: s, + ctx: ctx, + } +} + +func (s *expiringMediaTableWithContext) Insert(origin string, mediaId string, userId string, expiresTs int64) error { + _, err := s.statements.insertExpiringMedia.ExecContext(s.ctx, origin, mediaId, userId, expiresTs) + return err +} + +func (s *expiringMediaTableWithContext) ByUserCount(userId string) (int64, error) { + row := s.statements.selectExpiringMediaByUserCount.QueryRowContext(s.ctx, userId, util.NowMillis()) + val := int64(0) + err := row.Scan(&val) + if err == sql.ErrNoRows { + err = nil + val = 0 + } + return val, err +} diff --git a/database/table_reserved_media.go b/database/table_reserved_media.go new file mode 100644 index 0000000000000000000000000000000000000000..807ccdfd0ba5394f48de11ec0ca0b319c242f4fe --- /dev/null +++ b/database/table_reserved_media.go @@ -0,0 +1,54 @@ +package database + +import ( + "database/sql" + "errors" + + "github.com/turt2live/matrix-media-repo/common/rcontext" +) + +type DbReservedMedia struct { + Origin string + MediaId string + Reason string +} + +type ReserveReason string + +const ( + ForCreateReserveReason ReserveReason = "media_create" +) + +const insertReservedMedia = "INSERT INTO reserved_media (origin, media_id, reason) VALUES ($1, $2, $3);" + +type reservedMediaTableStatements struct { + insertReservedMedia *sql.Stmt +} + +type reservedMediaTableWithContext struct { + statements *reservedMediaTableStatements + ctx rcontext.RequestContext +} + +func prepareReservedMediaTables(db *sql.DB) (*reservedMediaTableStatements, error) { + var err error + var stmts = &reservedMediaTableStatements{} + + if stmts.insertReservedMedia, err = db.Prepare(insertReservedMedia); err != nil { + return nil, errors.New("error preparing insertReservedMedia: " + err.Error()) + } + + return stmts, nil +} + +func (s *reservedMediaTableStatements) Prepare(ctx rcontext.RequestContext) *reservedMediaTableWithContext { + return &reservedMediaTableWithContext{ + statements: s, + ctx: ctx, + } +} + +func (s *reservedMediaTableWithContext) TryInsert(origin string, mediaId string, reason ReserveReason) error { + _, err := s.statements.insertReservedMedia.ExecContext(s.ctx, origin, mediaId, reason) + return err +} diff --git a/database/table_user_stats.go b/database/table_user_stats.go new file mode 100644 index 0000000000000000000000000000000000000000..3840ef25823aaedc077605fb05147a439020ec36 --- /dev/null +++ b/database/table_user_stats.go @@ -0,0 +1,53 @@ +package database + +import ( + "database/sql" + "errors" + + "github.com/turt2live/matrix-media-repo/common/rcontext" +) + +type DbUserStats struct { + UserId string + UploadedBytes int64 +} + +const selectUserStatsUploadedBytes = "SELECT uploaded_bytes FROM user_stats WHERE user_id = $1;" + +type userStatsTableStatements struct { + selectUserStatsUploadedBytes *sql.Stmt +} + +type userStatsTableWithContext struct { + statements *userStatsTableStatements + ctx rcontext.RequestContext +} + +func prepareUserStatsTables(db *sql.DB) (*userStatsTableStatements, error) { + var err error + var stmts = &userStatsTableStatements{} + + if stmts.selectUserStatsUploadedBytes, err = db.Prepare(selectUserStatsUploadedBytes); err != nil { + return nil, errors.New("error preparing selectUserStatsUploadedBytes: " + err.Error()) + } + + return stmts, nil +} + +func (s *userStatsTableStatements) Prepare(ctx rcontext.RequestContext) *userStatsTableWithContext { + return &userStatsTableWithContext{ + statements: s, + ctx: ctx, + } +} + +func (s *userStatsTableWithContext) UserUploadedBytes(userId string) (int64, error) { + row := s.statements.selectUserStatsUploadedBytes.QueryRowContext(s.ctx, userId) + val := int64(0) + err := row.Scan(&val) + if err == sql.ErrNoRows { + err = nil + val = 0 + } + return val, err +} diff --git a/migrations/19_create_expiring_media_table_down.sql b/migrations/19_create_expiring_media_table_down.sql new file mode 100644 index 0000000000000000000000000000000000000000..7dae08cf6b42b5e560c185bed0e39636017b3aa9 --- /dev/null +++ b/migrations/19_create_expiring_media_table_down.sql @@ -0,0 +1,4 @@ +DROP INDEX idx_expiring_media; +DROP INDEX idx_expiring_media_user_id; +DROP INDEX idx_expiring_media_expires_ts; +DROP TABLE expiring_media; \ No newline at end of file diff --git a/migrations/19_create_expiring_media_table_up.sql b/migrations/19_create_expiring_media_table_up.sql new file mode 100644 index 0000000000000000000000000000000000000000..97fa6b6c9b65234e05952a044ea53e0eef956e18 --- /dev/null +++ b/migrations/19_create_expiring_media_table_up.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS expiring_media ( + origin TEXT NOT NULL, + media_id TEXT NOT NULL, + user_id TEXT NOT NULL, + expires_ts BIGINT NOT NULL +); +CREATE UNIQUE INDEX IF NOT EXISTS idx_expiring_media ON expiring_media (media_id, origin); +CREATE INDEX IF NOT EXISTS idx_expiring_media_user_id ON expiring_media (user_id); +CREATE INDEX IF NOT EXISTS idx_expiring_media_expires_ts ON expiring_media (expires_ts); \ No newline at end of file diff --git a/pipline/_steps/quota/check.go b/pipline/_steps/quota/check.go new file mode 100644 index 0000000000000000000000000000000000000000..4cfff0d15c1e145a21f7c954bc76779fca3f0d0b --- /dev/null +++ b/pipline/_steps/quota/check.go @@ -0,0 +1,72 @@ +package quota + +import ( + "errors" + + "github.com/ryanuber/go-glob" + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/database" +) + +type Type int64 + +const ( + MaxBytes Type = 0 + MaxPending Type = 1 +) + +var ErrQuotaExceeded = errors.New("quota exceeded") + +func Check(ctx rcontext.RequestContext, userId string, quotaType Type) error { + if !ctx.Config.Uploads.Quota.Enabled { + return checkDefault(ctx, userId, quotaType) + } + + for _, q := range ctx.Config.Uploads.Quota.UserQuotas { + if glob.Glob(q.Glob, userId) { + if quotaType == MaxBytes { + if q.MaxBytes == 0 { + return nil + } + total, err := database.GetInstance().UserStats.Prepare(ctx).UserUploadedBytes(userId) + if err != nil { + return err + } + if total >= q.MaxBytes { + return ErrQuotaExceeded + } + return nil + } else if quotaType == MaxPending { + count, err := database.GetInstance().ExpiringMedia.Prepare(ctx).ByUserCount(userId) + if err != nil { + return err + } + if count < ctx.Config.Uploads.MaxPending { + return nil + } + return ErrQuotaExceeded + } else { + return errors.New("no default for quota type - contact developer") + } + } + } + + return checkDefault(ctx, userId, quotaType) +} + +func checkDefault(ctx rcontext.RequestContext, userId string, quotaType Type) error { + if quotaType == MaxBytes { + return nil + } else if quotaType == MaxPending { + count, err := database.GetInstance().ExpiringMedia.Prepare(ctx).ByUserCount(userId) + if err != nil { + return err + } + if count < ctx.Config.Uploads.MaxPending { + return nil + } + return ErrQuotaExceeded + } + + return errors.New("no default for quota type - contact developer") +} diff --git a/pipline/_steps/upload/generate_media_id.go b/pipline/_steps/upload/generate_media_id.go new file mode 100644 index 0000000000000000000000000000000000000000..6e4782f02d5ae8c5646339ff1ac6b9fa52947f5f --- /dev/null +++ b/pipline/_steps/upload/generate_media_id.go @@ -0,0 +1,33 @@ +package upload + +import ( + "errors" + + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/database" + "github.com/turt2live/matrix-media-repo/util/ids" +) + +func GenerateMediaId(ctx rcontext.RequestContext, origin string) (string, error) { + db := database.GetInstance().ReservedMedia.Prepare(ctx) + var mediaId string + var err error + attempts := 0 + for true { + attempts += 1 + if attempts > 10 { + return "", errors.New("internal limit reached: unable to generate media ID") + } + + mediaId, err = ids.NewUniqueId() + + err = db.TryInsert(origin, mediaId, database.ForCreateReserveReason) + if err != nil { + return "", err + } + + // Check if there's a media table record for this media as well (there shouldn't be) + return mediaId, nil // TODO: @@TR - This + } + return "", errors.New("internal limit reached: fell out of media ID generation loop") +} diff --git a/pipline/create_pipeline/pipeline.go b/pipline/create_pipeline/pipeline.go new file mode 100644 index 0000000000000000000000000000000000000000..6cbf918ff4f8b19ed6fba32c50f5b16dc290751b --- /dev/null +++ b/pipline/create_pipeline/pipeline.go @@ -0,0 +1,41 @@ +package create_pipeline + +import ( + "github.com/turt2live/matrix-media-repo/common/rcontext" + "github.com/turt2live/matrix-media-repo/database" + "github.com/turt2live/matrix-media-repo/pipline/_steps/quota" + "github.com/turt2live/matrix-media-repo/pipline/_steps/upload" + "github.com/turt2live/matrix-media-repo/util" +) + +const DefaultExpirationTime = 0 + +func Execute(ctx rcontext.RequestContext, origin string, userId string, expirationTime int64) (*database.DbExpiringMedia, error) { + // Step 1: Check quota + if err := quota.Check(ctx, userId, quota.MaxPending); err != nil { + return nil, err + } + + // Step 2: Generate media ID + mediaId, err := upload.GenerateMediaId(ctx, origin) + if err != nil { + return nil, err + } + + // Step 3: Insert record of expiration + if expirationTime == DefaultExpirationTime { + expirationTime = ctx.Config.Uploads.MaxAgeSeconds * 1000 + } + expiresTs := util.NowMillis() + expirationTime + if err = database.GetInstance().ExpiringMedia.Prepare(ctx).Insert(origin, mediaId, userId, expiresTs); err != nil { + return nil, err + } + + // Step 4: Return database record + return &database.DbExpiringMedia{ + Origin: origin, + MediaId: mediaId, + UserId: userId, + ExpiresTs: expiresTs, + }, nil +} diff --git a/util/mxc.go b/util/mxc.go new file mode 100644 index 0000000000000000000000000000000000000000..83d57337ee143e8a073cc96a85f0fae4b50f8417 --- /dev/null +++ b/util/mxc.go @@ -0,0 +1,5 @@ +package util + +func MxcUri(origin string, mediaId string) string { + return "mxc://" + origin + "/" + mediaId +}