diff --git a/.circleci/config.yml b/.circleci/config.yml
index 3db58f54fff56161dbce0514d3fe696430d1cad9..57ac2299d6d6b34217e578e73bfbd1a068284871 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -6,7 +6,7 @@ jobs:
     working_directory: /go/src/github.com/turt2live/matrix-media-repo
     steps:
       - checkout
-      - run:
+      - run:
           name: build binaries
           command: 'GOBIN=$PWD/`dirname $0`/bin go install -v ./cmd/...'
       - store_artifacts:
diff --git a/api/auth.go b/api/auth.go
index 8b62a57e5c4d95b712896d04b5d60671af20acef..56b76a334c8e83b48a9fb8afccb19d7fdb705df6 100644
--- a/api/auth.go
+++ b/api/auth.go
@@ -4,6 +4,7 @@ import (
     "net/http"

     "github.com/sirupsen/logrus"
+    "github.com/turt2live/matrix-media-repo/common/config"
     "github.com/turt2live/matrix-media-repo/matrix"
     "github.com/turt2live/matrix-media-repo/util"
 )
@@ -61,7 +62,7 @@ func AccessTokenOptionalRoute(next func(r *http.Request, log *logrus.Entry, user
 }

 func RepoAdminRoute(next func(r *http.Request, log *logrus.Entry, user UserInfo) interface{}) func(*http.Request, *logrus.Entry) interface{} {
-    return AccessTokenRequiredRoute(func(r *http.Request, log *logrus.Entry, user UserInfo) interface{} {
+    regularFunc := AccessTokenRequiredRoute(func(r *http.Request, log *logrus.Entry, user UserInfo) interface{} {
         if user.UserId == "" {
             log.Warn("Could not identify user for this admin route")
             return AuthFailed()
@@ -74,4 +75,17 @@ func RepoAdminRoute(next func(r *http.Request, log *logrus.Entry, user UserInfo)
         log = log.WithFields(logrus.Fields{"isRepoAdmin": true})
         return next(r, log, user)
     })
+
+    return func(r *http.Request, log *logrus.Entry) interface{} {
+        if config.Get().SharedSecret.Enabled {
+            accessToken := util.GetAccessTokenFromRequest(r)
+            if accessToken == config.Get().SharedSecret.Token {
+                log = log.WithFields(logrus.Fields{"isRepoAdmin": true})
+                log.Info("User authed using shared secret")
+                return next(r, log, UserInfo{UserId: "@sharedsecret", AccessToken: accessToken})
+            }
+        }
+
+        return regularFunc(r, log)
+    }
 }
diff --git a/api/custom/datastores.go b/api/custom/datastores.go
index 1552840c4f045a901edbe6181dc134bf58b85421..f7828a38c1fbe4c9b2da6662ea81e051054c4ec2 100644
--- a/api/custom/datastores.go
+++ b/api/custom/datastores.go
@@ -29,7 +29,7 @@ func GetDatastores(r *http.Request, log *logrus.Entry, user api.UserInfo) interf
         response[ds.DatastoreId] = dsMap
     }

-    return response
+    return &api.DoNotCacheResponse{Payload: response}
 }

 func MigrateBetweenDatastores(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
@@ -79,7 +79,7 @@ func MigrateBetweenDatastores(r *http.Request, log *logrus.Entry, user api.UserI
         return api.InternalServerError("Unexpected error getting storage estimate")
     }

-    return estimate
+    return &api.DoNotCacheResponse{Payload: estimate}
 }

 func GetDatastoreStorageEstimate(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
@@ -107,5 +107,5 @@ func GetDatastoreStorageEstimate(r *http.Request, log *logrus.Entry, user api.Us
         log.Error(err)
         return api.InternalServerError("Unexpected error getting storage estimate")
     }
-    return result
+    return &api.DoNotCacheResponse{Payload: result}
 }
diff --git a/api/custom/federation.go b/api/custom/federation.go
index ae931195ce47806ef7459b054e6bb2761b8ba14f..728b3423efaf8f60b0b9b5eee0c6b0c4d9e77a9f 100644
--- a/api/custom/federation.go
+++ b/api/custom/federation.go
@@ -50,5 +50,5 @@ func GetFederationInfo(r *http.Request, log *logrus.Entry, user api.UserInfo) in
     resp["base_url"] = url
     resp["hostname"] = hostname
     resp["versions_response"] = out
-    return resp
+    return &api.DoNotCacheResponse{Payload: resp}
 }
diff --git a/api/custom/health.go b/api/custom/health.go
index 859e5f42d4908b610bfb7b1c2741458658efbb82..d9d067db8decb8f0ba055343dd72553b01e6577f 100644
--- a/api/custom/health.go
+++ b/api/custom/health.go
@@ -13,8 +13,10 @@ type HealthzResponse struct {
 }

 func GetHealthz(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
-    return &HealthzResponse{
-        OK:     true,
-        Status: "Probably not dead",
+    return &api.DoNotCacheResponse{
+        Payload: &HealthzResponse{
+            OK:     true,
+            Status: "Probably not dead",
+        },
     }
 }
diff --git a/api/custom/purge.go b/api/custom/purge.go
index 7d69667c785ed911ea360c3e9282bc22fb07a1e8..a5be8e0a547f131b23eb24751bbd4b70a884893b 100644
--- a/api/custom/purge.go
+++ b/api/custom/purge.go
@@ -34,7 +34,5 @@ func PurgeRemoteMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) int
         return api.InternalServerError("Error purging remote media")
     }

-    return &MediaPurgedResponse{
-        NumRemoved: removed,
-    }
+    return &api.DoNotCacheResponse{Payload: &MediaPurgedResponse{NumRemoved: removed}}
 }
diff --git a/api/custom/quarantine.go b/api/custom/quarantine.go
index a1cfff12d7df5ec06d56c7a449ca7d6218efe70d..f28a5c01f03bb569370c697adb2b07f68df96140 100644
--- a/api/custom/quarantine.go
+++ b/api/custom/quarantine.go
@@ -69,7 +69,7 @@ func QuarantineRoomMedia(r *http.Request, log *logrus.Entry, user api.UserInfo)
         total += resp.(*MediaQuarantinedResponse).NumQuarantined
     }

-    return &MediaQuarantinedResponse{NumQuarantined: total}
+    return &api.DoNotCacheResponse{Payload: &MediaQuarantinedResponse{NumQuarantined: total}}
 }

 func QuarantineMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
@@ -94,7 +94,7 @@ func QuarantineMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) inte
     }

     resp, _ := doQuarantine(r.Context(), log, server, mediaId, allowOtherHosts)
-    return resp
+    return &api.DoNotCacheResponse{Payload: resp}
 }

 func doQuarantine(ctx context.Context, log *logrus.Entry, origin string, mediaId string, allowOtherHosts bool) (interface{}, bool) {
diff --git a/api/custom/usage.go b/api/custom/usage.go
new file mode 100644
index 0000000000000000000000000000000000000000..379893bee8434ec90ccd8f81fb399baff0da2806
--- /dev/null
+++ b/api/custom/usage.go
@@ -0,0 +1,202 @@
+package custom
+
+import (
+    "net/http"
+
+    "github.com/gorilla/mux"
+    "github.com/sirupsen/logrus"
+    "github.com/turt2live/matrix-media-repo/api"
+    "github.com/turt2live/matrix-media-repo/storage"
+    "github.com/turt2live/matrix-media-repo/types"
+    "github.com/turt2live/matrix-media-repo/util"
+)
+
+type MinimalUsageInfo struct {
+    Total int64 `json:"total"`
+    Media int64 `json:"media"`
+}
+
+type UsageInfo struct {
+    *MinimalUsageInfo
+    Thumbnails int64 `json:"thumbnails"`
+}
+
+type CountsUsageResponse struct {
+    RawBytes  *UsageInfo `json:"raw_bytes"`
+    RawCounts *UsageInfo `json:"raw_counts"`
+}
+
+type UserUsageEntry struct {
+    RawBytes     *MinimalUsageInfo `json:"raw_bytes"`
+    RawCounts    *MinimalUsageInfo `json:"raw_counts"`
+    UploadedMxcs []string          `json:"uploaded,flow"`
+}
+
+type MediaUsageEntry struct {
+    SizeBytes         int64  `json:"size_bytes"`
+    UploadedBy        string `json:"uploaded_by"`
+    DatastoreId       string `json:"datastore_id"`
+    DatastoreLocation string `json:"datastore_location"`
+    Sha256Hash        string `json:"sha256_hash"`
+    Quarantined       bool   `json:"quarantined"`
+    UploadName        string `json:"upload_name"`
+    ContentType       string `json:"content_type"`
+    CreatedTs         int64  `json:"created_ts"`
+}
+
+func GetDomainUsage(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+    params := mux.Vars(r)
+
+    serverName := params["serverName"]
+
+    log = log.WithFields(logrus.Fields{
+        "serverName": serverName,
+    })
+
+    db := storage.GetDatabase().GetMetadataStore(r.Context(), log)
+
+    mediaBytes, thumbBytes, err := db.GetByteUsageForServer(serverName)
+    if err != nil {
+        log.Error(err)
+        return api.InternalServerError("Failed to get byte usage for server")
+    }
+
+    mediaCount, thumbCount, err := db.GetCountUsageForServer(serverName)
+    if err != nil {
+        log.Error(err)
+        return api.InternalServerError("Failed to get count usage for server")
+    }
+
+    return &api.DoNotCacheResponse{
+        Payload: &CountsUsageResponse{
+            RawBytes: &UsageInfo{
+                MinimalUsageInfo: &MinimalUsageInfo{
+                    Total: mediaBytes + thumbBytes,
+                    Media: mediaBytes,
+                },
+                Thumbnails: thumbBytes,
+            },
+            RawCounts: &UsageInfo{
+                MinimalUsageInfo: &MinimalUsageInfo{
+                    Total: mediaCount + thumbCount,
+                    Media: mediaCount,
+                },
+                Thumbnails: thumbCount,
+            },
+        },
+    }
+}
+
+func GetUserUsage(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+    params := mux.Vars(r)
+
+    serverName := params["serverName"]
+    userIds := r.URL.Query()["user_id"]
+
+    log = log.WithFields(logrus.Fields{
+        "serverName": serverName,
+    })
+
+    db := storage.GetDatabase().GetMediaStore(r.Context(), log)
+
+    var records []*types.Media
+    var err error
+    if userIds == nil || len(userIds) == 0 {
+        records, err = db.GetAllMediaForServer(serverName)
+    } else {
+        records, err = db.GetAllMediaForServerUsers(serverName, userIds)
+    }
+
+    if err != nil {
+        log.Error(err)
+        return api.InternalServerError("Failed to get media records for users")
+    }
+
+    parsed := make(map[string]*UserUsageEntry)
+
+    for _, media := range records {
+        var entry *UserUsageEntry
+        var ok bool
+        if entry, ok = parsed[media.UserId]; !ok {
+            entry = &UserUsageEntry{
+                UploadedMxcs: make([]string, 0),
+                RawCounts: &MinimalUsageInfo{
+                    Total: 0,
+                    Media: 0,
+                },
+                RawBytes: &MinimalUsageInfo{
+                    Total: 0,
+                    Media: 0,
+                },
+            }
+            parsed[media.UserId] = entry
+        }
+
+        entry.RawBytes.Total += media.SizeBytes
+        entry.RawBytes.Media += media.SizeBytes
+        entry.RawCounts.Total += 1
+        entry.RawCounts.Media += 1
+
+        entry.UploadedMxcs = append(entry.UploadedMxcs, media.MxcUri())
+    }
+
+    return &api.DoNotCacheResponse{Payload: parsed}
+}
+
+func GetUploadsUsage(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
+    params := mux.Vars(r)
+
+    serverName := params["serverName"]
+    mxcs := r.URL.Query()["mxc"]
+
+    log = log.WithFields(logrus.Fields{
+        "serverName": serverName,
+    })
+
+    db := storage.GetDatabase().GetMediaStore(r.Context(), log)
+
+    var records []*types.Media
+    var err error
+    if mxcs == nil || len(mxcs) == 0 {
+        records, err = db.GetAllMediaForServer(serverName)
+    } else {
+        split := make([]string, 0)
+        for _, mxc := range mxcs {
+            o, i, err := util.SplitMxc(mxc)
+            if err != nil {
+                log.Error(err)
+                return api.InternalServerError("Error parsing MXC " + mxc)
+            }
+
+            if o != serverName {
+                return api.BadRequest("MXC URIs must match the requested server")
+            }
+
+            split = append(split, i)
+        }
+        records, err = db.GetAllMediaInIds(serverName, split)
+    }
+
+    if err != nil {
+        log.Error(err)
+        return api.InternalServerError("Failed to get media records for users")
+    }
+
+    parsed := make(map[string]*MediaUsageEntry)
+
+    for _, media := range records {
+        parsed[media.MxcUri()] = &MediaUsageEntry{
+            SizeBytes:         media.SizeBytes,
+            UploadName:        media.UploadName,
+            ContentType:       media.ContentType,
+            CreatedTs:         media.CreationTs,
+            DatastoreId:       media.DatastoreId,
+            DatastoreLocation: media.Location,
+            Quarantined:       media.Quarantined,
+            Sha256Hash:        media.Sha256Hash,
+            UploadedBy:        media.UserId,
+        }
+    }
+
+    return &api.DoNotCacheResponse{Payload: parsed}
+}
diff --git a/api/responses.go b/api/responses.go
index 7bcad3f78b54b0db1afc13dec16844aca8f1d998..5fd04ef0184b6b339b6cc3860dd0df65d4e7161f 100644
--- a/api/responses.go
+++ b/api/responses.go
@@ -4,6 +4,10 @@ import "github.com/turt2live/matrix-media-repo/common"

 type EmptyResponse struct{}

+type DoNotCacheResponse struct {
+    Payload interface{}
+}
+
 type ErrorResponse struct {
     Code    string `json:"errcode"`
     Message string `json:"error"`
diff --git a/api/webserver/route_handler.go b/api/webserver/route_handler.go
index c3983d0a8a0e0e6fba04136a6368d0c9d316319b..c1efdb6026ff2a13f402dd52789c3f79d51ab802 100644
--- a/api/webserver/route_handler.go
+++ b/api/webserver/route_handler.go
@@ -62,7 +62,6 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
     w.Header().Set("Access-Control-Allow-Origin", "*")
     w.Header().Set("Content-Security-Policy", "sandbox; default-src 'none'; script-src 'none'; plugin-types application/pdf; style-src 'unsafe-inline'; media-src 'self'; object-src 'self';")
-    w.Header().Set("Cache-Control", "public,max-age=86400,s-maxage=86400")
     w.Header().Set("Server", "matrix-media-repo")

     // Process response
@@ -89,6 +88,15 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
         res = api.InternalServerError("Error processing response")
     }

+    switch result := res.(type) {
+    case *api.DoNotCacheResponse:
+        res = result.Payload
+        break
+    default:
+        w.Header().Set("Cache-Control", "public,max-age=86400,s-maxage=86400")
+        break
+    }
+
     contextLog.Info(fmt.Sprintf("Replying with result: %T %+v", res, res))

     statusCode := http.StatusOK
diff --git a/api/webserver/webserver.go b/api/webserver/webserver.go
index 8890bac75fb171f0efb969153da588fa238c34c6..e259cfd189ae97642448670783f163db05e54d5d 100644
--- a/api/webserver/webserver.go
+++ b/api/webserver/webserver.go
@@ -42,6 +42,9 @@ func Init() {
     dsTransferHandler := handler{api.RepoAdminRoute(custom.MigrateBetweenDatastores), "datastore_transfer", counter, false}
     fedTestHandler := handler{api.RepoAdminRoute(custom.GetFederationInfo), "federation_test", counter, false}
     healthzHandler := handler{api.AccessTokenOptionalRoute(custom.GetHealthz), "healthz", counter, true}
+    domainUsageHandler := handler{api.RepoAdminRoute(custom.GetDomainUsage), "domain_usage", counter, false}
+    userUsageHandler := handler{api.RepoAdminRoute(custom.GetUserUsage), "user_usage", counter, false}
+    uploadsUsageHandler := handler{api.RepoAdminRoute(custom.GetUploadsUsage), "uploads_usage", counter, false}

     routes := make(map[string]route)
     versions := []string{"r0", "v1", "unstable"} // r0 is typically clients and v1 is typically servers. v1 is deprecated.
@@ -64,6 +67,9 @@ func Init() {
         routes["/_matrix/media/"+version+"/admin/datastores"] = route{"GET", datastoreListHandler}
         routes["/_matrix/media/"+version+"/admin/datastores/{sourceDsId:[^/]+}/transfer_to/{targetDsId:[^/]+}"] = route{"POST", dsTransferHandler}
         routes["/_matrix/media/"+version+"/admin/federation/test/{serverName:[a-zA-Z0-9.:\\-_]+}"] = route{"GET", fedTestHandler}
+        routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}"] = route{"GET", domainUsageHandler}
+        routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}/users"] = route{"GET", userUsageHandler}
+        routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}/uploads"] = route{"GET", uploadsUsageHandler}

         // Routes that we should handle but aren't in the media namespace (synapse compat)
         routes["/_matrix/client/"+version+"/admin/purge_media_cache"] = route{"POST", purgeHandler}
diff --git a/appveyor.yml b/appveyor.yml
index b3b2081265cbcfbc4ba10c0f98e23d56531a5ff8..a1d424a2eeb00288af6a7374532629948e34ee29 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -5,7 +5,7 @@ clone_folder: c:\gopath\src\github.com\turt2live\matrix-media-repo
 environment:
   GOPATH: c:\gopath
   GOVERSION: 1.12
-
+
 branches:
   only: [master, develop]
diff --git a/common/config/config.go b/common/config/config.go
index 4113c9d4ba8dd8292bcb680d66c56b9b7c41397a..c01f25adeb4fba441476383372d80793ea7ee138 100644
--- a/common/config/config.go
+++ b/common/config/config.go
@@ -128,6 +128,11 @@ type MetricsConfig struct {
     Port        int    `yaml:"port"`
 }

+type SharedSecretConfig struct {
+    Enabled bool   `yaml:"enabled"`
+    Token   string `yaml:"token"`
+}
+
 type MediaRepoConfig struct {
     General     *GeneralConfig      `yaml:"repo"`
     Homeservers []*HomeserverConfig `yaml:"homeservers,flow"`
@@ -143,6 +148,7 @@ type MediaRepoConfig struct {
     Quarantine     *QuarantineConfig   `yaml:"quarantine"`
     TimeoutSeconds *TimeoutsConfig     `yaml:"timeouts"`
     Metrics        *MetricsConfig      `yaml:"metrics"`
+    SharedSecret   *SharedSecretConfig `yaml:"sharedSecretAuth"`
 }

 var instance *MediaRepoConfig
@@ -313,5 +319,9 @@ func NewDefaultConfig() *MediaRepoConfig {
             BindAddress: "localhost",
             Port:        9000,
         },
+        SharedSecret: &SharedSecretConfig{
+            Enabled: false,
+            Token:   "ReplaceMe",
+        },
     }
 }
diff --git a/config.sample.yaml b/config.sample.yaml
index fd9cd893a8680521b7d0df08e495c84ee736b4a1..05ae3b86e18e029dc3df5756039ff203d09cdce2 100644
--- a/config.sample.yaml
+++ b/config.sample.yaml
@@ -36,6 +36,18 @@ homeservers:
     admins:
       - "@your_username:example.org"

+# Shared secret auth is useful for applications building on top of the media repository, such
+# as a management interface. When shared secret auth is enabled, any request presenting the
+# `token` below in place of an access token is authorized and treated as a repository
+# administrator. This is not limited to any particular domain, giving applications the
+# ability to use it on any configured hostname.
+sharedSecretAuth:
+  # Set this to true to enable shared secret auth.
+  enabled: false
+
+  # Use a secure value here to prevent unauthorized access to the media repository.
+  token: "PutSomeRandomSecureValueHere"
+
 # Datastores are places where media should be persisted. This isn't dedicated for just uploads:
 # thumbnails and other misc data is also stored in these places. When the media repo is looking
 # to store new media (such as user uploads, thumbnails, etc) it will look for a datastore which
@@ -87,7 +99,7 @@ uploads:
   # placed anywhere to match everything (eg: "image/*" matches all images). This will also
   # restrict which file types are downloaded from remote servers.
   allowedTypes:
-   - "*/*"
+    - "*/*"

   # Specific users can have their own set of allowed file types. These are applied instead
   # of those listed in the allowedTypes list when a user is found. Much like allowedTypes,
diff --git a/controllers/preview_controller/preview_controller.go b/controllers/preview_controller/preview_controller.go
index 02a6bf8be1be52793654615084cd8d23ede4b289..e0acd5afdc3151b08bccd64505b795b1b041e1bd 100644
--- a/controllers/preview_controller/preview_controller.go
+++ b/controllers/preview_controller/preview_controller.go
@@ -40,7 +40,7 @@ func GetPreview(urlStr string, onHost string, forUserId string, atTs int64, ctx
     now := util.NowMillis()
     atTsBk := stores.GetBucketTs(atTs)
     nowBk := stores.GetBucketTs(now)
-    if (now - atTs) > 60000 && atTsBk != nowBk {
+    if (now-atTs) > 60000 && atTsBk != nowBk {
         // Because we don't have a cached preview, we'll use the current time as the preview time.
         // We also give a 60 second buffer so we don't cause an infinite loop (considering we're
         // calling ourselves), and to give a lenient opportunity for slow execution.
diff --git a/docs/admin.md b/docs/admin.md
index 9e45867c8e0c5f74fe4086d227b9a950f4cb77c4..09a221ad16f12b7e95a401dc29f884870c7cce5a 100644
--- a/docs/admin.md
+++ b/docs/admin.md
@@ -83,3 +83,89 @@ The response is the estimated amount of data being transferred:
   "total_bytes": 366601489
 }
 ```
+
+## Data usage for servers/users
+
+Individual servers and users can often hoard data in the media repository. These endpoints report how much. They can only be called by repository admins - they are not available to admins of the individual homeservers.
+
+**Caution**: These endpoints may return *lots* of data. Making very specific requests is recommended.
+
+#### Per-server usage
+
+URL: `GET /_matrix/media/unstable/admin/usage/<server name>?access_token=your_access_token`
+
+The response is how much data the server is using:
+```json
+{
+  "raw_bytes": {
+    "total": 1594009,
+    "media": 1392009,
+    "thumbnails": 202000
+  },
+  "raw_counts": {
+    "total": 7,
+    "media": 4,
+    "thumbnails": 3
+  }
+}
+```
+
+**Note**: The endpoint may return values which represent duplicated media across itself and other hosts.
+
+#### Per-user usage (all known users)
+
+URL: `GET /_matrix/media/unstable/admin/usage/<server name>/users?access_token=your_access_token`
+
+The response is how much data each known user is using:
+```json
+{
+  "@alice:example.org": {
+    "raw_bytes": {
+      "total": 1392009,
+      "media": 1392009
+    },
+    "raw_counts": {
+      "total": 4,
+      "media": 4
+    },
+    "uploaded": [
+      "mxc://example.org/abc123",
+      "mxc://example.org/abc124",
+      "mxc://example.org/abc125"
+    ]
+  }
+}
+```
+
+**Note**: The endpoint may return values which represent duplicated media across itself and other hosts.
+
+**Note**: Thumbnails are not associated with users and therefore are not included by this endpoint.
+
+#### Per-user usage (batch of users / single user)
+
+Use the same endpoint as above, but specify one or more `?user_id=@alice:example.org` query parameters. Note that encoding the values may be required (not shown here). Users that are unknown to the media repo will not be returned.
+
+#### Per-upload usage (all uploads)
+
+URL: `GET /_matrix/media/unstable/admin/usage/<server name>/uploads?access_token=your_access_token`
+
+The response is the information stored about each upload:
+```json
+{
+  "mxc://example.org/abc123": {
+    "size_bytes": 102400,
+    "uploaded_by": "@alice:example.org",
+    "datastore_id": "def456",
+    "datastore_location": "/var/media-repo/ab/cd/12345",
+    "sha256_hash": "ghi789",
+    "quarantined": false,
+    "upload_name": "info.txt",
+    "content_type": "text/plain",
+    "created_ts": 1561514528225
+  }
+}
+```
+
+#### Per-upload usage (batch of uploads / single upload)
+
+Use the same endpoint as above, but specify one or more `?mxc=mxc://example.org/abc123` query parameters. Note that encoding the values may be required (not shown here).
diff --git a/migrations/9_origin_indexes_down.sql b/migrations/9_origin_indexes_down.sql
new file mode 100644
index 0000000000000000000000000000000000000000..fee15b35dbebcf2082a4d084947ed688e63084e6
--- /dev/null
+++ b/migrations/9_origin_indexes_down.sql
@@ -0,0 +1,3 @@
+DROP INDEX IF EXISTS idx_origin_media;
+DROP INDEX IF EXISTS idx_origin_thumbnails;
+DROP INDEX IF EXISTS idx_origin_user_id_media;
diff --git a/migrations/9_origin_indexes_up.sql b/migrations/9_origin_indexes_up.sql
new file mode 100644
index 0000000000000000000000000000000000000000..326ef7090b142395596683d7d5abb42227567c01
--- /dev/null
+++ b/migrations/9_origin_indexes_up.sql
@@ -0,0 +1,3 @@
+CREATE INDEX IF NOT EXISTS idx_origin_media ON media(origin);
+CREATE INDEX IF NOT EXISTS idx_origin_thumbnails ON thumbnails(origin);
+CREATE INDEX IF NOT EXISTS idx_origin_user_id_media ON media(origin, user_id);
diff --git a/storage/stores/media_store.go b/storage/stores/media_store.go
index 376674b46e55e43ca18c5865b0993d4c88adf441..d2e86a2fbe0c25ba839bedcb039da74c6af5e58b 100644
--- a/storage/stores/media_store.go
+++ b/storage/stores/media_store.go
@@ -22,7 +22,10 @@ const selectDatastoreByUri = "SELECT datastore_id, ds_type, uri FROM datastores
 const insertDatastore = "INSERT INTO datastores (datastore_id, ds_type, uri) VALUES ($1, $2, $3);"
 const selectMediaWithoutDatastore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE datastore_id IS NULL OR datastore_id = '';"
 const updateMediaDatastoreAndLocation = "UPDATE media SET location = $4, datastore_id = $3 WHERE origin = $1 AND media_id = $2;"
-const selectAllDatastores = "SELECT datastore_id, ds_type, uri FROM datastores;";
+const selectAllDatastores = "SELECT datastore_id, ds_type, uri FROM datastores;"
+const selectAllMediaForServer = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1"
+const selectAllMediaForServerUsers = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 AND user_id = ANY($2)"
+const selectAllMediaForServerIds = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 AND media_id = ANY($2)"

 var dsCacheByPath = sync.Map{} // [string] => Datastore
 var dsCacheById = sync.Map{}   // [string] => Datastore
@@ -42,6 +45,9 @@ type mediaStoreStatements struct {
     updateMediaDatastoreAndLocation *sql.Stmt
     selectAllDatastores             *sql.Stmt
     selectMediaInDatastoreOlderThan *sql.Stmt
+    selectAllMediaForServer         *sql.Stmt
+    selectAllMediaForServerUsers    *sql.Stmt
+    selectAllMediaForServerIds      *sql.Stmt
 }

 type MediaStoreFactory struct {
@@ -101,6 +107,15 @@ func InitMediaStore(sqlDb *sql.DB) (*MediaStoreFactory, error) {
     if store.stmts.selectAllDatastores, err = store.sqlDb.Prepare(selectAllDatastores); err != nil {
         return nil, err
     }
+    if store.stmts.selectAllMediaForServer, err = store.sqlDb.Prepare(selectAllMediaForServer); err != nil {
+        return nil, err
+    }
+    if store.stmts.selectAllMediaForServerUsers, err = store.sqlDb.Prepare(selectAllMediaForServerUsers); err != nil {
+        return nil, err
+    }
+    if store.stmts.selectAllMediaForServerIds, err = store.sqlDb.Prepare(selectAllMediaForServerIds); err != nil {
+        return nil, err
+    }

     return &store, nil
 }
@@ -381,3 +396,96 @@ func (s *MediaStore) GetAllDatastores() ([]*types.Datastore, error) {

     return results, nil
 }
+
+func (s *MediaStore) GetAllMediaForServer(serverName string) ([]*types.Media, error) {
+    rows, err := s.statements.selectAllMediaForServer.QueryContext(s.ctx, serverName)
+    if err != nil {
+        return nil, err
+    }
+
+    var results []*types.Media
+    for rows.Next() {
+        obj := &types.Media{}
+        err = rows.Scan(
+            &obj.Origin,
+            &obj.MediaId,
+            &obj.UploadName,
+            &obj.ContentType,
+            &obj.UserId,
+            &obj.Sha256Hash,
+            &obj.SizeBytes,
+            &obj.DatastoreId,
+            &obj.Location,
+            &obj.CreationTs,
+            &obj.Quarantined,
+        )
+        if err != nil {
+            return nil, err
+        }
+        results = append(results, obj)
+    }
+
+    return results, nil
+}
+
+func (s *MediaStore) GetAllMediaForServerUsers(serverName string, userIds []string) ([]*types.Media, error) {
+    rows, err := s.statements.selectAllMediaForServerUsers.QueryContext(s.ctx, serverName, pq.Array(userIds))
+    if err != nil {
+        return nil, err
+    }
+
+    var results []*types.Media
+    for rows.Next() {
+        obj := &types.Media{}
+        err = rows.Scan(
+            &obj.Origin,
+            &obj.MediaId,
+            &obj.UploadName,
+            &obj.ContentType,
+            &obj.UserId,
+            &obj.Sha256Hash,
+            &obj.SizeBytes,
+            &obj.DatastoreId,
+            &obj.Location,
+            &obj.CreationTs,
+            &obj.Quarantined,
+        )
+        if err != nil {
+            return nil, err
+        }
+        results = append(results, obj)
+    }
+
+    return results, nil
+}
+
+func (s *MediaStore) GetAllMediaInIds(serverName string, mediaIds []string) ([]*types.Media, error) {
+    rows, err := s.statements.selectAllMediaForServerIds.QueryContext(s.ctx, serverName, pq.Array(mediaIds))
+    if err != nil {
+        return nil, err
+    }
+
+    var results []*types.Media
+    for rows.Next() {
+        obj := &types.Media{}
+        err = rows.Scan(
+            &obj.Origin,
+            &obj.MediaId,
+            &obj.UploadName,
+            &obj.ContentType,
+            &obj.UserId,
+            &obj.Sha256Hash,
+            &obj.SizeBytes,
+            &obj.DatastoreId,
+            &obj.Location,
+            &obj.CreationTs,
+            &obj.Quarantined,
+        )
+        if err != nil {
+            return nil, err
+        }
+        results = append(results, obj)
+    }
+
+    return results, nil
+}
diff --git a/storage/stores/metadata_store.go b/storage/stores/metadata_store.go
index 61347caaccd226568e49ea9e1ba4d3f6db280914..12a3cf32fa19107d06675fe9edddfb312ac298cc 100644
--- a/storage/stores/metadata_store.go
+++ b/storage/stores/metadata_store.go
@@ -18,6 +18,9 @@ const selectMediaLastAccessedBeforeInDatastore = "SELECT m.sha256_hash, m.size_b
 const selectThumbnailsLastAccessedBeforeInDatastore = "SELECT m.sha256_hash, m.size_bytes, m.datastore_id, m.location, m.creation_ts, a.last_access_ts FROM thumbnails AS m JOIN last_access AS a ON m.sha256_hash = a.sha256_hash WHERE a.last_access_ts < $1 AND m.datastore_id = $2"
 const changeDatastoreOfMediaHash = "UPDATE media SET datastore_id = $1, location = $2 WHERE sha256_hash = $3"
 const changeDatastoreOfThumbnailHash = "UPDATE thumbnails SET datastore_id = $1, location = $2 WHERE sha256_hash = $3"
+const selectUploadCountsForServer = "SELECT COALESCE((SELECT COUNT(origin) FROM media WHERE origin = $1), 0) AS media, COALESCE((SELECT COUNT(origin) FROM thumbnails WHERE origin = $1), 0) AS thumbnails"
+const selectUploadSizesForServer = "SELECT COALESCE((SELECT SUM(size_bytes) FROM media WHERE origin = $1), 0) AS media, COALESCE((SELECT SUM(size_bytes) FROM thumbnails WHERE origin = $1), 0) AS thumbnails"
+const selectUsersForServer = "SELECT DISTINCT user_id FROM media WHERE origin = $1 AND user_id IS NOT NULL AND LENGTH(user_id) > 0"

 type metadataStoreStatements struct {
     upsertLastAccessed                            *sql.Stmt
@@ -26,6 +29,9 @@ type metadataStoreStatements struct {
     selectThumbnailsLastAccessedBeforeInDatastore *sql.Stmt
     changeDatastoreOfMediaHash                    *sql.Stmt
     changeDatastoreOfThumbnailHash                *sql.Stmt
+    selectUploadCountsForServer                   *sql.Stmt
+    selectUploadSizesForServer                    *sql.Stmt
+    selectUsersForServer                          *sql.Stmt
 }

 type MetadataStoreFactory struct {
@@ -64,6 +70,15 @@ func InitMetadataStore(sqlDb *sql.DB) (*MetadataStoreFactory, error) {
     if store.stmts.changeDatastoreOfThumbnailHash, err = store.sqlDb.Prepare(changeDatastoreOfThumbnailHash); err != nil {
         return nil, err
     }
+    if store.stmts.selectUsersForServer, err = store.sqlDb.Prepare(selectUsersForServer); err != nil {
+        return nil, err
+    }
+    if store.stmts.selectUploadSizesForServer, err = store.sqlDb.Prepare(selectUploadSizesForServer); err != nil {
+        return nil, err
+    }
+    if store.stmts.selectUploadCountsForServer, err = store.sqlDb.Prepare(selectUploadCountsForServer); err != nil {
+        return nil, err
+    }

     return &store, nil
 }
@@ -151,3 +166,48 @@ func (s *MetadataStore) GetOldThumbnailsInDatastore(datastoreId string, beforeTs

     return results, nil
 }
+
+func (s *MetadataStore) GetUsersForServer(serverName string) ([]string, error) {
+    rows, err := s.statements.selectUsersForServer.QueryContext(s.ctx, serverName)
+    if err != nil {
+        return nil, err
+    }
+
+    results := make([]string, 0)
+    for rows.Next() {
+        v := ""
+        err = rows.Scan(&v)
+        if err != nil {
+            return nil, err
+        }
+        results = append(results, v)
+    }
+
+    return results, nil
+}
+
+func (s *MetadataStore) GetByteUsageForServer(serverName string) (int64, int64, error) {
+    row := s.statements.selectUploadSizesForServer.QueryRowContext(s.ctx, serverName)
+
+    media := int64(0)
+    thumbs := int64(0)
+    err := row.Scan(&media, &thumbs)
+    if err != nil {
+        return 0, 0, err
+    }
+
+    return media, thumbs, nil
+}
+
+func (s *MetadataStore) GetCountUsageForServer(serverName string) (int64, int64, error) {
+    row := s.statements.selectUploadCountsForServer.QueryRowContext(s.ctx, serverName)
+
+    media := int64(0)
+    thumbs := int64(0)
+    err := row.Scan(&media, &thumbs)
+    if err != nil {
+        return 0, 0, err
+    }
+
+    return media, thumbs, nil
+}
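
To illustrate how the pieces above fit together, below is a minimal, hypothetical Go client for the per-server usage route registered in `api/webserver/webserver.go`, authenticating with the `sharedSecretAuth` token in place of an access token (as `RepoAdminRoute` now permits). The base URL, server name, and token value are placeholders, and the local response structs simply mirror the JSON shape of `CountsUsageResponse` from `api/custom/usage.go`; treat this as a sketch rather than a supported client.

```go
// Illustrative sketch only: a tiny client for the new per-server usage endpoint.
// All concrete values (URL, server name, token) are placeholders.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// usageInfo and countsUsageResponse mirror the JSON emitted by GetDomainUsage.
type usageInfo struct {
	Total      int64 `json:"total"`
	Media      int64 `json:"media"`
	Thumbnails int64 `json:"thumbnails"`
}

type countsUsageResponse struct {
	RawBytes  *usageInfo `json:"raw_bytes"`
	RawCounts *usageInfo `json:"raw_counts"`
}

// getDomainUsage calls GET /_matrix/media/unstable/admin/usage/<serverName>,
// passing the shared secret (or a repo admin's access token) as access_token.
func getDomainUsage(baseURL, serverName, token string) (*countsUsageResponse, error) {
	endpoint := fmt.Sprintf("%s/_matrix/media/unstable/admin/usage/%s?access_token=%s",
		baseURL, url.PathEscape(serverName), url.QueryEscape(token))

	resp, err := http.Get(endpoint)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}

	out := &countsUsageResponse{}
	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	// Placeholder values: substitute the real media repo URL and the configured shared secret.
	usage, err := getDomainUsage("http://localhost:8000", "example.org", "PutSomeRandomSecureValueHere")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes across %d media records\n", usage.RawBytes.Total, usage.RawCounts.Total)
}
```

The `/users` and `/uploads` variants documented in `docs/admin.md` can be queried the same way, with `user_id` or `mxc` query parameters narrowing the result set.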