diff --git a/pipline/_steps/upload/lock.go b/pipline/_steps/upload/lock.go index 9aaa7529f38604242d0741e8a05eead829cbdbfa..6a9f12d2b584ec3c95e75817d058d8430ac29cd9 100644 --- a/pipline/_steps/upload/lock.go +++ b/pipline/_steps/upload/lock.go @@ -8,11 +8,28 @@ import ( "github.com/turt2live/matrix-media-repo/redislib" ) +const maxLockAttemptTime = 30 * time.Second + func LockForUpload(ctx rcontext.RequestContext, hash string) (func() error, error) { - mutex := redislib.GetMutex(hash, 5*time.Minute) + mutex := redislib.GetMutex(hash, 1*time.Minute) if mutex != nil { - if err := mutex.LockContext(ctx.Context); err != nil { - return nil, errors.New("failed to acquire upload lock: " + err.Error()) + attemptDoneAt := time.Now().Add(maxLockAttemptTime) + acquired := false + for !acquired { + if chErr := ctx.Context.Err(); chErr != nil { + return nil, chErr + } + if err := mutex.LockContext(ctx.Context); err != nil { + ctx.Log.Warn("failed to acquire upload lock") + if time.Now().After(attemptDoneAt) { + return nil, errors.New("failed to acquire upload lock: " + err.Error()) + } + } else { + acquired = true + } + } + if !acquired { + return nil, errors.New("failed to acquire upload lock: timeout") } return func() error { b, err := mutex.UnlockContext(ctx.Context)