Skip to content
Snippets Groups Projects
Commit a8b142cf authored by Travis Ralston's avatar Travis Ralston
Browse files

Merge branch 'travis/usage-api'

parents 8796cc24 b9cacba2
No related branches found
No related tags found
No related merge requests found
Showing
with 535 additions and 19 deletions
......@@ -6,7 +6,7 @@ jobs:
working_directory: /go/src/github.com/turt2live/matrix-media-repo
steps:
- checkout
- run:
- run:
name: build binaries
command: 'GOBIN=$PWD/`dirname $0`/bin go install -v ./cmd/...'
- store_artifacts:
......
......@@ -4,6 +4,7 @@ import (
"net/http"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/common/config"
"github.com/turt2live/matrix-media-repo/matrix"
"github.com/turt2live/matrix-media-repo/util"
)
......@@ -61,7 +62,7 @@ func AccessTokenOptionalRoute(next func(r *http.Request, log *logrus.Entry, user
}
func RepoAdminRoute(next func(r *http.Request, log *logrus.Entry, user UserInfo) interface{}) func(*http.Request, *logrus.Entry) interface{} {
return AccessTokenRequiredRoute(func(r *http.Request, log *logrus.Entry, user UserInfo) interface{} {
regularFunc := AccessTokenRequiredRoute(func(r *http.Request, log *logrus.Entry, user UserInfo) interface{} {
if user.UserId == "" {
log.Warn("Could not identify user for this admin route")
return AuthFailed()
......@@ -74,4 +75,17 @@ func RepoAdminRoute(next func(r *http.Request, log *logrus.Entry, user UserInfo)
log = log.WithFields(logrus.Fields{"isRepoAdmin": true})
return next(r, log, user)
})
return func(r *http.Request, log *logrus.Entry) interface{} {
if config.Get().SharedSecret.Enabled {
accessToken := util.GetAccessTokenFromRequest(r)
if accessToken == config.Get().SharedSecret.Token {
log = log.WithFields(logrus.Fields{"isRepoAdmin": true})
log.Info("User authed using shared secret")
return next(r, log, UserInfo{UserId: "@sharedsecret", AccessToken: accessToken})
}
}
return regularFunc(r, log)
}
}
......@@ -29,7 +29,7 @@ func GetDatastores(r *http.Request, log *logrus.Entry, user api.UserInfo) interf
response[ds.DatastoreId] = dsMap
}
return response
return &api.DoNotCacheResponse{Payload: response}
}
func MigrateBetweenDatastores(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
......@@ -79,7 +79,7 @@ func MigrateBetweenDatastores(r *http.Request, log *logrus.Entry, user api.UserI
return api.InternalServerError("Unexpected error getting storage estimate")
}
return estimate
return &api.DoNotCacheResponse{Payload: estimate}
}
func GetDatastoreStorageEstimate(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
......@@ -107,5 +107,5 @@ func GetDatastoreStorageEstimate(r *http.Request, log *logrus.Entry, user api.Us
log.Error(err)
return api.InternalServerError("Unexpected error getting storage estimate")
}
return result
return &api.DoNotCacheResponse{Payload: result}
}
......@@ -50,5 +50,5 @@ func GetFederationInfo(r *http.Request, log *logrus.Entry, user api.UserInfo) in
resp["base_url"] = url
resp["hostname"] = hostname
resp["versions_response"] = out
return resp
return &api.DoNotCacheResponse{Payload: resp}
}
......@@ -13,8 +13,10 @@ type HealthzResponse struct {
}
func GetHealthz(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
return &HealthzResponse{
OK: true,
Status: "Probably not dead",
return &api.DoNotCacheResponse{
Payload: &HealthzResponse{
OK: true,
Status: "Probably not dead",
},
}
}
......@@ -34,7 +34,5 @@ func PurgeRemoteMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) int
return api.InternalServerError("Error purging remote media")
}
return &MediaPurgedResponse{
NumRemoved: removed,
}
return &api.DoNotCacheResponse{Payload: &MediaPurgedResponse{NumRemoved: removed}}
}
......@@ -69,7 +69,7 @@ func QuarantineRoomMedia(r *http.Request, log *logrus.Entry, user api.UserInfo)
total += resp.(*MediaQuarantinedResponse).NumQuarantined
}
return &MediaQuarantinedResponse{NumQuarantined: total}
return &api.DoNotCacheResponse{Payload: &MediaQuarantinedResponse{NumQuarantined: total}}
}
func QuarantineMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
......@@ -94,7 +94,7 @@ func QuarantineMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) inte
}
resp, _ := doQuarantine(r.Context(), log, server, mediaId, allowOtherHosts)
return resp
return &api.DoNotCacheResponse{Payload: resp}
}
func doQuarantine(ctx context.Context, log *logrus.Entry, origin string, mediaId string, allowOtherHosts bool) (interface{}, bool) {
......
package custom
import (
"net/http"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/api"
"github.com/turt2live/matrix-media-repo/storage"
"github.com/turt2live/matrix-media-repo/types"
"github.com/turt2live/matrix-media-repo/util"
)
type MinimalUsageInfo struct {
Total int64 `json:"total"`
Media int64 `json:"media"`
}
type UsageInfo struct {
*MinimalUsageInfo
Thumbnails int64 `json:"thumbnails"`
}
type CountsUsageResponse struct {
RawBytes *UsageInfo `json:"raw_bytes"`
RawCounts *UsageInfo `json:"raw_counts"`
}
type UserUsageEntry struct {
RawBytes *MinimalUsageInfo `json:"raw_bytes"`
RawCounts *MinimalUsageInfo `json:"raw_counts"`
UploadedMxcs []string `json:"uploaded,flow"`
}
type MediaUsageEntry struct {
SizeBytes int64 `json:"size_bytes"`
UploadedBy string `json:"uploaded_by"`
DatastoreId string `json:"datastore_id"`
DatastoreLocation string `json:"datastore_location"`
Sha256Hash string `json:"sha256_hash"`
Quarantined bool `json:"quarantined"`
UploadName string `json:"upload_name"`
ContentType string `json:"content_type"`
CreatedTs int64 `json:"created_ts"`
}
func GetDomainUsage(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
params := mux.Vars(r)
serverName := params["serverName"]
log = log.WithFields(logrus.Fields{
"serverName": serverName,
})
db := storage.GetDatabase().GetMetadataStore(r.Context(), log)
mediaBytes, thumbBytes, err := db.GetByteUsageForServer(serverName)
if err != nil {
log.Error(err)
return api.InternalServerError("Failed to get byte usage for server")
}
mediaCount, thumbCount, err := db.GetCountUsageForServer(serverName)
if err != nil {
log.Error(err)
return api.InternalServerError("Failed to get count usage for server")
}
return &api.DoNotCacheResponse{
Payload: &CountsUsageResponse{
RawBytes: &UsageInfo{
MinimalUsageInfo: &MinimalUsageInfo{
Total: mediaBytes + thumbBytes,
Media: mediaBytes,
},
Thumbnails: thumbBytes,
},
RawCounts: &UsageInfo{
MinimalUsageInfo: &MinimalUsageInfo{
Total: mediaCount + thumbCount,
Media: mediaCount,
},
Thumbnails: thumbCount,
},
},
}
}
func GetUserUsage(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
params := mux.Vars(r)
serverName := params["serverName"]
userIds := r.URL.Query()["user_id"]
log = log.WithFields(logrus.Fields{
"serverName": serverName,
})
db := storage.GetDatabase().GetMediaStore(r.Context(), log)
var records []*types.Media
var err error
if userIds == nil || len(userIds) == 0 {
records, err = db.GetAllMediaForServer(serverName)
} else {
records, err = db.GetAllMediaForServerUsers(serverName, userIds)
}
if err != nil {
log.Error(err)
return api.InternalServerError("Failed to get media records for users")
}
parsed := make(map[string]*UserUsageEntry)
for _, media := range records {
var entry *UserUsageEntry
var ok bool
if entry, ok = parsed[media.UserId]; !ok {
entry = &UserUsageEntry{
UploadedMxcs: make([]string, 0),
RawCounts: &MinimalUsageInfo{
Total: 0,
Media: 0,
},
RawBytes: &MinimalUsageInfo{
Total: 0,
Media: 0,
},
}
parsed[media.UserId] = entry
}
entry.RawBytes.Total += media.SizeBytes
entry.RawBytes.Media += media.SizeBytes
entry.RawCounts.Total += 1
entry.RawCounts.Media += 1
entry.UploadedMxcs = append(entry.UploadedMxcs, media.MxcUri())
}
return &api.DoNotCacheResponse{Payload: parsed}
}
func GetUploadsUsage(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
params := mux.Vars(r)
serverName := params["serverName"]
mxcs := r.URL.Query()["mxc"]
log = log.WithFields(logrus.Fields{
"serverName": serverName,
})
db := storage.GetDatabase().GetMediaStore(r.Context(), log)
var records []*types.Media
var err error
if mxcs == nil || len(mxcs) == 0 {
records, err = db.GetAllMediaForServer(serverName)
} else {
split := make([]string, 0)
for _, mxc := range mxcs {
o, i, err := util.SplitMxc(mxc)
if err != nil {
log.Error(err)
return api.InternalServerError("Error parsing MXC " + mxc)
}
if o != serverName {
return api.BadRequest("MXC URIs must match the requested server")
}
split = append(split, i)
}
records, err = db.GetAllMediaInIds(serverName, split)
}
if err != nil {
log.Error(err)
return api.InternalServerError("Failed to get media records for users")
}
parsed := make(map[string]*MediaUsageEntry)
for _, media := range records {
parsed[media.MxcUri()] = &MediaUsageEntry{
SizeBytes: media.SizeBytes,
UploadName: media.UploadName,
ContentType: media.ContentType,
CreatedTs: media.CreationTs,
DatastoreId: media.DatastoreId,
DatastoreLocation: media.Location,
Quarantined: media.Quarantined,
Sha256Hash: media.Sha256Hash,
UploadedBy: media.UserId,
}
}
return &api.DoNotCacheResponse{Payload: parsed}
}
......@@ -4,6 +4,10 @@ import "github.com/turt2live/matrix-media-repo/common"
type EmptyResponse struct{}
type DoNotCacheResponse struct {
Payload interface{}
}
type ErrorResponse struct {
Code string `json:"errcode"`
Message string `json:"error"`
......
......@@ -62,7 +62,6 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Security-Policy", "sandbox; default-src 'none'; script-src 'none'; plugin-types application/pdf; style-src 'unsafe-inline'; media-src 'self'; object-src 'self';")
w.Header().Set("Cache-Control", "public,max-age=86400,s-maxage=86400")
w.Header().Set("Server", "matrix-media-repo")
// Process response
......@@ -89,6 +88,15 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
res = api.InternalServerError("Error processing response")
}
switch result := res.(type) {
case *api.DoNotCacheResponse:
res = result.Payload
break
default:
w.Header().Set("Cache-Control", "public,max-age=86400,s-maxage=86400")
break
}
contextLog.Info(fmt.Sprintf("Replying with result: %T %+v", res, res))
statusCode := http.StatusOK
......
......@@ -42,6 +42,9 @@ func Init() {
dsTransferHandler := handler{api.RepoAdminRoute(custom.MigrateBetweenDatastores), "datastore_transfer", counter, false}
fedTestHandler := handler{api.RepoAdminRoute(custom.GetFederationInfo), "federation_test", counter, false}
healthzHandler := handler{api.AccessTokenOptionalRoute(custom.GetHealthz), "healthz", counter, true}
domainUsageHandler := handler{api.RepoAdminRoute(custom.GetDomainUsage), "domain_usage", counter, false}
userUsageHandler := handler{api.RepoAdminRoute(custom.GetUserUsage), "user_usage", counter, false}
uploadsUsageHandler := handler{api.RepoAdminRoute(custom.GetUploadsUsage), "uploads_usage", counter, false}
routes := make(map[string]route)
versions := []string{"r0", "v1", "unstable"} // r0 is typically clients and v1 is typically servers. v1 is deprecated.
......@@ -64,6 +67,9 @@ func Init() {
routes["/_matrix/media/"+version+"/admin/datastores"] = route{"GET", datastoreListHandler}
routes["/_matrix/media/"+version+"/admin/datastores/{sourceDsId:[^/]+}/transfer_to/{targetDsId:[^/]+}"] = route{"POST", dsTransferHandler}
routes["/_matrix/media/"+version+"/admin/federation/test/{serverName:[a-zA-Z0-9.:\\-_]+}"] = route{"GET", fedTestHandler}
routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}"] = route{"GET", domainUsageHandler}
routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}/users"] = route{"GET", userUsageHandler}
routes["/_matrix/media/"+version+"/admin/usage/{serverName:[a-zA-Z0-9.:\\-_]+}/uploads"] = route{"GET", uploadsUsageHandler}
// Routes that we should handle but aren't in the media namespace (synapse compat)
routes["/_matrix/client/"+version+"/admin/purge_media_cache"] = route{"POST", purgeHandler}
......
......@@ -5,7 +5,7 @@ clone_folder: c:\gopath\src\github.com\turt2live\matrix-media-repo
environment:
GOPATH: c:\gopath
GOVERSION: 1.12
branches:
only: [master, develop]
......
......@@ -128,6 +128,11 @@ type MetricsConfig struct {
Port int `yaml:"port"`
}
type SharedSecretConfig struct {
Enabled bool `yaml:"enabled"`
Token string `yaml:"token"`
}
type MediaRepoConfig struct {
General *GeneralConfig `yaml:"repo"`
Homeservers []*HomeserverConfig `yaml:"homeservers,flow"`
......@@ -143,6 +148,7 @@ type MediaRepoConfig struct {
Quarantine *QuarantineConfig `yaml:"quarantine"`
TimeoutSeconds *TimeoutsConfig `yaml:"timeouts"`
Metrics *MetricsConfig `yaml:"metrics"`
SharedSecret *SharedSecretConfig `yaml:"sharedSecretAuth"`
}
var instance *MediaRepoConfig
......@@ -313,5 +319,9 @@ func NewDefaultConfig() *MediaRepoConfig {
BindAddress: "localhost",
Port: 9000,
},
SharedSecret: &SharedSecretConfig{
Enabled: false,
Token: "ReplaceMe",
},
}
}
......@@ -36,6 +36,18 @@ homeservers:
admins:
- "@your_username:example.org"
# Shared secret auth is useful for applications building on top of the media repository, such
# as a management interface. The `token` provided here is treated as a repository administrator
# when shared secret auth is enabled: if the `token` is used in place of an access token, the'
# request will be authorized. This is not limited to any particular domain, giving applications
# the ability to use it on any configured hostname.
sharedSecretAuth:
# Set this to true to enable shared secret auth.
enabled: false
# Use a secure value here to prevent unauthorized access to the media repository.
token: "PutSomeRandomSecureValueHere"
# Datastores are places where media should be persisted. This isn't dedicated for just uploads:
# thumbnails and other misc data is also stored in these places. When the media repo is looking
# to store new media (such as user uploads, thumbnails, etc) it will look for a datastore which
......@@ -87,7 +99,7 @@ uploads:
# placed anywhere to match everything (eg: "image/*" matches all images). This will also
# restrict which file types are downloaded from remote servers.
allowedTypes:
- "*/*"
- "*/*"
# Specific users can have their own set of allowed file types. These are applied instead
# of those listed in the allowedTypes list when a user is found. Much like allowedTypes,
......
......@@ -40,7 +40,7 @@ func GetPreview(urlStr string, onHost string, forUserId string, atTs int64, ctx
now := util.NowMillis()
atTsBk := stores.GetBucketTs(atTs)
nowBk := stores.GetBucketTs(now)
if (now - atTs) > 60000 && atTsBk != nowBk {
if (now-atTs) > 60000 && atTsBk != nowBk {
// Because we don't have a cached preview, we'll use the current time as the preview time.
// We also give a 60 second buffer so we don't cause an infinite loop (considering we're
// calling ourselves), and to give a lenient opportunity for slow execution.
......
......@@ -83,3 +83,89 @@ The response is the estimated amount of data being transferred:
"total_bytes": 366601489
}
```
## Data usage for servers/users
Individual servers and users can often hoard data in the media repository. These endpoints will tell you how much. These endpoints can only be called by repository admins - they are not available to admins of the homeservers.
**Caution**: These endpoints may return *lots* of data. Making very specific requests is recommended.
#### Per-server usage
URL: `GET /_matrix/media/unstable/admin/usage/<server name>?access_token=your_access_token`
The response is how much data the server is using:
```json
{
"raw_bytes": {
"total": 1594009,
"media": 1392009,
"thumbnails": 202000
},
"raw_counts": {
"total": 7,
"media": 4,
"thumbnails": 3
}
}
```
**Note**: The endpoint may return values which represent duplicated media across itself and other hosts.
#### Per-user usage (all known users)
URL: `GET /_matrix/media/unstable/admin/usage/<server name>/users?access_token=your_access_token`
The response is how much data the server is using:
```json
{
"@alice:example.org": {
"raw_bytes": {
"total": 1392009,
"media": 1392009
},
"raw_counts": {
"total": 4,
"media": 4
},
"uploaded": [
"mxc://example.org/abc123",
"mxc://example.org/abc124",
"mxc://example.org/abc125"
]
}
}
```
**Note**: The endpoint may return values which represent duplicated media across itself and other hosts.
**Note**: Thumbnails are not associated with users and therefore are not included by this endpoint.
#### Per-user usage (batch of users / single user)
Use the same endpoint as above, but specifying one or more `?user_id=@alice:example.org` query parameters. Note that encoding the values may be required (not shown here). Users that are unknown to the media repo will not be returned.
#### Per-upload usage (all uploads)
URL: `GET /_matrix/media/unstable/admin/usage/<server name>/uploads?access_token=your_access_token`
The response is how much data the server is using:
```json
{
"mxc://example.org/abc123": {
"size_bytes": 102400,
"uploaded_by": "@alice:example.org",
"datastore_id": "def456",
"datastore_location": "/var/media-repo/ab/cd/12345",
"sha256_hash": "ghi789",
"quarantined": false,
"upload_name": "info.txt",
"content_type": "text/plain",
"created_ts": 1561514528225
}
}
```
#### Per-upload usage (batch of uploads / single upload)
Use the same endpoint as above, but specifying one or more `?mxc=mxc://example.org/abc123` query parameters. Note that encoding the values may be required (not shown here).
DROP INDEX IF EXISTS idx_origin_media;
DROP INDEX IF EXISTS idx_origin_thumbnails;
DROP INDEX IF EXISTS idx_origin_user_id_media;
CREATE INDEX IF NOT EXISTS idx_origin_media ON media(origin);
CREATE INDEX IF NOT EXISTS idx_origin_thumbnails ON thumbnails(origin);
CREATE INDEX IF NOT EXISTS idx_origin_user_id_media ON media(origin, user_id);
......@@ -22,7 +22,10 @@ const selectDatastoreByUri = "SELECT datastore_id, ds_type, uri FROM datastores
const insertDatastore = "INSERT INTO datastores (datastore_id, ds_type, uri) VALUES ($1, $2, $3);"
const selectMediaWithoutDatastore = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE datastore_id IS NULL OR datastore_id = '';"
const updateMediaDatastoreAndLocation = "UPDATE media SET location = $4, datastore_id = $3 WHERE origin = $1 AND media_id = $2;"
const selectAllDatastores = "SELECT datastore_id, ds_type, uri FROM datastores;";
const selectAllDatastores = "SELECT datastore_id, ds_type, uri FROM datastores;"
const selectAllMediaForServer = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1"
const selectAllMediaForServerUsers = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 AND user_id = ANY($2)"
const selectAllMediaForServerIds = "SELECT origin, media_id, upload_name, content_type, user_id, sha256_hash, size_bytes, datastore_id, location, creation_ts, quarantined FROM media WHERE origin = $1 AND media_id = ANY($2)"
var dsCacheByPath = sync.Map{} // [string] => Datastore
var dsCacheById = sync.Map{} // [string] => Datastore
......@@ -42,6 +45,9 @@ type mediaStoreStatements struct {
updateMediaDatastoreAndLocation *sql.Stmt
selectAllDatastores *sql.Stmt
selectMediaInDatastoreOlderThan *sql.Stmt
selectAllMediaForServer *sql.Stmt
selectAllMediaForServerUsers *sql.Stmt
selectAllMediaForServerIds *sql.Stmt
}
type MediaStoreFactory struct {
......@@ -101,6 +107,15 @@ func InitMediaStore(sqlDb *sql.DB) (*MediaStoreFactory, error) {
if store.stmts.selectAllDatastores, err = store.sqlDb.Prepare(selectAllDatastores); err != nil {
return nil, err
}
if store.stmts.selectAllMediaForServer, err = store.sqlDb.Prepare(selectAllMediaForServer); err != nil {
return nil, err
}
if store.stmts.selectAllMediaForServerUsers, err = store.sqlDb.Prepare(selectAllMediaForServerUsers); err != nil {
return nil, err
}
if store.stmts.selectAllMediaForServerIds, err = store.sqlDb.Prepare(selectAllMediaForServerIds); err != nil {
return nil, err
}
return &store, nil
}
......@@ -381,3 +396,96 @@ func (s *MediaStore) GetAllDatastores() ([]*types.Datastore, error) {
return results, nil
}
func (s *MediaStore) GetAllMediaForServer(serverName string) ([]*types.Media, error) {
rows, err := s.statements.selectAllMediaForServer.QueryContext(s.ctx, serverName)
if err != nil {
return nil, err
}
var results []*types.Media
for rows.Next() {
obj := &types.Media{}
err = rows.Scan(
&obj.Origin,
&obj.MediaId,
&obj.UploadName,
&obj.ContentType,
&obj.UserId,
&obj.Sha256Hash,
&obj.SizeBytes,
&obj.DatastoreId,
&obj.Location,
&obj.CreationTs,
&obj.Quarantined,
)
if err != nil {
return nil, err
}
results = append(results, obj)
}
return results, nil
}
func (s *MediaStore) GetAllMediaForServerUsers(serverName string, userIds []string) ([]*types.Media, error) {
rows, err := s.statements.selectAllMediaForServerUsers.QueryContext(s.ctx, serverName, pq.Array(userIds))
if err != nil {
return nil, err
}
var results []*types.Media
for rows.Next() {
obj := &types.Media{}
err = rows.Scan(
&obj.Origin,
&obj.MediaId,
&obj.UploadName,
&obj.ContentType,
&obj.UserId,
&obj.Sha256Hash,
&obj.SizeBytes,
&obj.DatastoreId,
&obj.Location,
&obj.CreationTs,
&obj.Quarantined,
)
if err != nil {
return nil, err
}
results = append(results, obj)
}
return results, nil
}
func (s *MediaStore) GetAllMediaInIds(serverName string, mediaIds []string) ([]*types.Media, error) {
rows, err := s.statements.selectAllMediaForServerIds.QueryContext(s.ctx, serverName, pq.Array(mediaIds))
if err != nil {
return nil, err
}
var results []*types.Media
for rows.Next() {
obj := &types.Media{}
err = rows.Scan(
&obj.Origin,
&obj.MediaId,
&obj.UploadName,
&obj.ContentType,
&obj.UserId,
&obj.Sha256Hash,
&obj.SizeBytes,
&obj.DatastoreId,
&obj.Location,
&obj.CreationTs,
&obj.Quarantined,
)
if err != nil {
return nil, err
}
results = append(results, obj)
}
return results, nil
}
......@@ -18,6 +18,9 @@ const selectMediaLastAccessedBeforeInDatastore = "SELECT m.sha256_hash, m.size_b
const selectThumbnailsLastAccessedBeforeInDatastore = "SELECT m.sha256_hash, m.size_bytes, m.datastore_id, m.location, m.creation_ts, a.last_access_ts FROM thumbnails AS m JOIN last_access AS a ON m.sha256_hash = a.sha256_hash WHERE a.last_access_ts < $1 AND m.datastore_id = $2"
const changeDatastoreOfMediaHash = "UPDATE media SET datastore_id = $1, location = $2 WHERE sha256_hash = $3"
const changeDatastoreOfThumbnailHash = "UPDATE thumbnails SET datastore_id = $1, location = $2 WHERE sha256_hash = $3"
const selectUploadCountsForServer = "SELECT COALESCE((SELECT COUNT(origin) FROM media WHERE origin = $1), 0) AS media, COALESCE((SELECT COUNT(origin) FROM thumbnails WHERE origin = $1), 0) AS thumbnails"
const selectUploadSizesForServer = "SELECT COALESCE((SELECT SUM(size_bytes) FROM media WHERE origin = $1), 0) AS media, COALESCE((SELECT SUM(size_bytes) FROM thumbnails WHERE origin = $1), 0) AS thumbnails"
const selectUsersForServer = "SELECT DISTINCT user_id FROM media WHERE origin = $1 AND user_id IS NOT NULL AND LENGTH(user_id) > 0"
type metadataStoreStatements struct {
upsertLastAccessed *sql.Stmt
......@@ -26,6 +29,9 @@ type metadataStoreStatements struct {
selectThumbnailsLastAccessedBeforeInDatastore *sql.Stmt
changeDatastoreOfMediaHash *sql.Stmt
changeDatastoreOfThumbnailHash *sql.Stmt
selectUploadCountsForServer *sql.Stmt
selectUploadSizesForServer *sql.Stmt
selectUsersForServer *sql.Stmt
}
type MetadataStoreFactory struct {
......@@ -64,6 +70,15 @@ func InitMetadataStore(sqlDb *sql.DB) (*MetadataStoreFactory, error) {
if store.stmts.changeDatastoreOfThumbnailHash, err = store.sqlDb.Prepare(changeDatastoreOfThumbnailHash); err != nil {
return nil, err
}
if store.stmts.selectUsersForServer, err = store.sqlDb.Prepare(selectUsersForServer); err != nil {
return nil, err
}
if store.stmts.selectUploadSizesForServer, err = store.sqlDb.Prepare(selectUploadSizesForServer); err != nil {
return nil, err
}
if store.stmts.selectUploadCountsForServer, err = store.sqlDb.Prepare(selectUploadCountsForServer); err != nil {
return nil, err
}
return &store, nil
}
......@@ -151,3 +166,48 @@ func (s *MetadataStore) GetOldThumbnailsInDatastore(datastoreId string, beforeTs
return results, nil
}
func (s *MetadataStore) GetUsersForServer(serverName string) ([]string, error) {
rows, err := s.statements.selectUsersForServer.QueryContext(s.ctx, serverName)
if err != nil {
return nil, err
}
results := make([]string, 0)
for rows.Next() {
v := ""
err = rows.Scan(&v)
if err != nil {
return nil, err
}
results = append(results, v)
}
return results, nil
}
func (s *MetadataStore) GetByteUsageForServer(serverName string) (int64, int64, error) {
row := s.statements.selectUploadSizesForServer.QueryRowContext(s.ctx, serverName)
media := int64(0)
thumbs := int64(0)
err := row.Scan(&media, &thumbs)
if err != nil {
return 0, 0, err
}
return media, thumbs, nil
}
func (s *MetadataStore) GetCountUsageForServer(serverName string) (int64, int64, error) {
row := s.statements.selectUploadCountsForServer.QueryRowContext(s.ctx, serverName)
media := int64(0)
thumbs := int64(0)
err := row.Scan(&media, &thumbs)
if err != nil {
return 0, 0, err
}
return media, thumbs, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment