Skip to content
Snippets Groups Projects
Commit ebbbebdb authored by Travis Ralston's avatar Travis Ralston
Browse files

Add an API to purge old media by access time

parent 069f5175
No related branches found
No related tags found
No related merge requests found
......@@ -120,6 +120,46 @@ func PurgeQuarantined(r *http.Request, log *logrus.Entry, user api.UserInfo) int
return &api.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs}}
}
func PurgeOldMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
var err error
beforeTs := util.NowMillis()
beforeTsStr := r.URL.Query().Get("before_ts")
if beforeTsStr != "" {
beforeTs, err = strconv.ParseInt(beforeTsStr, 10, 64)
if err != nil {
return api.BadRequest("Error parsing before_ts: " + err.Error())
}
}
includeLocal := false
includeLocalStr := r.URL.Query().Get("include_local")
if includeLocalStr != "" {
includeLocal, err = strconv.ParseBool(includeLocalStr)
if err != nil {
return api.BadRequest("Error parsing include_local: " + err.Error())
}
}
log = log.WithFields(logrus.Fields{
"before_ts": beforeTs,
"include_local": includeLocal,
})
affected, err := maintenance_controller.PurgeOldMedia(beforeTs, includeLocal, r.Context(), log)
if err != nil {
log.Error("Error purging media: " + err.Error())
return api.InternalServerError("error purging media")
}
mxcs := make([]string, 0)
for _, a := range affected {
mxcs = append(mxcs, a.MxcUri())
}
return &api.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs}}
}
func PurgeUserMedia(r *http.Request, log *logrus.Entry, user api.UserInfo) interface{} {
isGlobalAdmin, isLocalAdmin := getPurgeRequestInfo(r, log, user)
if !isGlobalAdmin && !isLocalAdmin {
......
......@@ -36,6 +36,7 @@ func Init() {
purgeQuarantinedHandler := handler{api.AccessTokenRequiredRoute(custom.PurgeQuarantined), "purge_quarantined", counter, false}
purgeUserMediaHandler := handler{api.AccessTokenRequiredRoute(custom.PurgeUserMedia), "purge_user_media", counter, false}
purgeRoomHandler := handler{api.AccessTokenRequiredRoute(custom.PurgeRoomMedia), "purge_room_media", counter, false}
purgeOldHandler := handler{api.RepoAdminRoute(custom.PurgeOldMedia), "purge_old_media", counter, false}
quarantineHandler := handler{api.AccessTokenRequiredRoute(custom.QuarantineMedia), "quarantine_media", counter, false}
quarantineRoomHandler := handler{api.AccessTokenRequiredRoute(custom.QuarantineRoomMedia), "quarantine_room", counter, false}
quarantineUserHandler := handler{api.AccessTokenRequiredRoute(custom.QuarantineUserMedia), "quarantine_user", counter, false}
......@@ -74,6 +75,7 @@ func Init() {
routes["/_matrix/media/"+version+"/admin/purge/quarantined"] = route{"POST", purgeQuarantinedHandler}
routes["/_matrix/media/"+version+"/admin/purge/user/{userId:[^/]+}"] = route{"POST", purgeUserMediaHandler}
routes["/_matrix/media/"+version+"/admin/purge/room/{roomId:[^/]+}"] = route{"POST", purgeRoomHandler}
routes["/_matrix/media/"+version+"/admin/purge/old"] = route{"POST", purgeOldHandler}
routes["/_matrix/media/"+version+"/admin/room/{roomId:[^/]+}/quarantine"] = route{"POST", quarantineRoomHandler} // deprecated
routes["/_matrix/media/"+version+"/admin/quarantine/{server:[a-zA-Z0-9.:\\-_]+}/{mediaId:[a-zA-Z0-9.\\-_]+}"] = route{"POST", quarantineHandler}
routes["/_matrix/media/"+version+"/admin/quarantine/room/{roomId:[^/]+}"] = route{"POST", quarantineRoomHandler}
......
......@@ -281,6 +281,40 @@ func PurgeUserMedia(userId string, beforeTs int64, ctx context.Context, log *log
return records, nil
}
func PurgeOldMedia(beforeTs int64, includeLocal bool, ctx context.Context, log *logrus.Entry) ([]*types.Media, error) {
metadataDb := storage.GetDatabase().GetMetadataStore(ctx, log)
mediaDb := storage.GetDatabase().GetMediaStore(ctx, log)
oldHashes, err := metadataDb.GetOldMedia(beforeTs)
if err != nil {
return nil, err
}
purged := make([]*types.Media, 0)
for _, r := range oldHashes {
media, err := mediaDb.GetByHash(r.Sha256Hash)
if err != nil {
return nil, err
}
for _, m := range media {
if !includeLocal && util.IsServerOurs(m.Origin) {
continue
}
err = doPurge(m, ctx, log)
if err != nil {
return nil, err
}
purged = append(purged, m)
}
}
return purged, nil
}
func PurgeRoomMedia(mxcs []string, beforeTs int64, ctx context.Context, log *logrus.Entry) ([]*types.Media, error) {
mediaDb := storage.GetDatabase().GetMediaStore(ctx, log)
......
......@@ -42,6 +42,14 @@ URL: `POST /_matrix/media/unstable/admin/purge/room/<room id>?before_ts=12345678
This will delete all media known to that room, regardless of it being local or remote, before the timestamp specified. If called by a homeserver administrator, only media uploaded to their domain will be deleted.
#### Purge media that hasn't been accessed in a while
URL: `POST /_matrix/media/unstable/admin/purge/old?before_ts=1234567890&include_local=false&access_token=your_access_token` (`before_ts` is in milliseconds)
This will delete all media that hasn't been accessed since `before_ts` (defaults to 'now'). If `include_local` is `false` (the default), only remote media will be deleted.
This endpoint is only available to repository administrators.
## Quarantine media
The quarantine media API allows administrators to quarantine media that may not be appropriate for their server. Using this API will prevent the media from being downloaded any further. It will *not* delete the file from your storage though: that is a task left for the administrator.
......
......@@ -29,6 +29,7 @@ const updateBackgroundTask = "UPDATE background_tasks SET end_ts = $2 WHERE id =
const selectAllBackgroundTasks = "SELECT id, task, params, start_ts, end_ts FROM background_tasks"
const insertReservation = "INSERT INTO reserved_media (origin, media_id, reason) VALUES ($1, $2, $3);"
const selectReservation = "SELECT origin, media_id, reason FROM reserved_media WHERE origin = $1 AND media_id = $2;"
const selectMediaLastAccessed = "SELECT m.sha256_hash, m.size_bytes, m.datastore_id, m.location, m.creation_ts, a.last_access_ts FROM media AS m JOIN last_access AS a ON m.sha256_hash = a.sha256_hash WHERE a.last_access_ts < $1;"
type metadataStoreStatements struct {
upsertLastAccessed *sql.Stmt
......@@ -46,6 +47,7 @@ type metadataStoreStatements struct {
selectAllBackgroundTasks *sql.Stmt
insertReservation *sql.Stmt
selectReservation *sql.Stmt
selectMediaLastAccessed *sql.Stmt
}
type MetadataStoreFactory struct {
......@@ -111,6 +113,9 @@ func InitMetadataStore(sqlDb *sql.DB) (*MetadataStoreFactory, error) {
if store.stmts.selectReservation, err = store.sqlDb.Prepare(selectReservation); err != nil {
return nil, err
}
if store.stmts.selectMediaLastAccessed, err = store.sqlDb.Prepare(selectMediaLastAccessed); err != nil {
return nil, err
}
return &store, nil
}
......@@ -147,6 +152,32 @@ func (s *MetadataStore) GetEstimatedSizeOfDatastore(datastoreId string) (int64,
return r.Size, err
}
func (s *MetadataStore) GetOldMedia(beforeTs int64) ([]*types.MinimalMediaMetadata, error) {
rows, err := s.statements.selectMediaLastAccessed.QueryContext(s.ctx, beforeTs)
if err != nil {
return nil, err
}
var results []*types.MinimalMediaMetadata
for rows.Next() {
obj := &types.MinimalMediaMetadata{}
err = rows.Scan(
&obj.Sha256Hash,
&obj.SizeBytes,
&obj.DatastoreId,
&obj.Location,
&obj.CreationTs,
&obj.LastAccessTs,
)
if err != nil {
return nil, err
}
results = append(results, obj)
}
return results, nil
}
func (s *MetadataStore) GetOldMediaInDatastore(datastoreId string, beforeTs int64) ([]*types.MinimalMediaMetadata, error) {
rows, err := s.statements.selectMediaLastAccessedBeforeInDatastore.QueryContext(s.ctx, beforeTs, datastoreId)
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment