Skip to content
Snippets Groups Projects
Commit d856942b authored by Travis Ralston's avatar Travis Ralston
Browse files

Revert "Be slightly more responsible with channels in download/thumbnails"

This reverts commit 73dfdc84.
parent 73dfdc84
No related branches found
No related tags found
No related merge requests found
......@@ -27,13 +27,12 @@ func GetUploadWaitChannel(origin string, mediaId string) (<-chan *database.DbMed
localUploadWaiters[mxc] = make([]chan *database.DbMedia, 0)
}
ch := make(chan *database.DbMedia, 1)
ch := make(chan *database.DbMedia)
localUploadWaiters[mxc] = append(localUploadWaiters[mxc], ch)
finishFn := func() {
uploadMutex.Lock()
defer uploadMutex.Unlock()
defer close(ch)
if arr, ok := localUploadWaiters[mxc]; ok {
newArr := make([]chan *database.DbMedia, 0)
......@@ -44,6 +43,8 @@ func GetUploadWaitChannel(origin string, mediaId string) (<-chan *database.DbMed
}
localUploadWaiters[mxc] = newArr
}
close(ch)
}
return ch, finishFn
......
......@@ -15,6 +15,7 @@ import (
"github.com/turt2live/matrix-media-repo/pipelines/_steps/download"
"github.com/turt2live/matrix-media-repo/pipelines/_steps/meta"
"github.com/turt2live/matrix-media-repo/pipelines/_steps/quarantine"
"github.com/turt2live/matrix-media-repo/util/readers"
)
var sf = new(sfstreams.Group)
......@@ -34,11 +35,11 @@ func (o DownloadOpts) String() string {
func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts DownloadOpts) (*database.DbMedia, io.ReadCloser, error) {
// Step 1: Make our context a timeout context
var cancel context.CancelFunc
//goland:noinspection GoVetLostCancel - we handle the function in our custom cancelCloser struct
ctx.Context, cancel = context.WithTimeout(ctx.Context, opts.BlockForReadUntil)
defer cancel()
// Step 2: Join the singleflight queue
recordCh := make(chan *database.DbMedia, 1)
recordCh := make(chan *database.DbMedia)
defer close(recordCh)
r, err, _ := sf.Do(fmt.Sprintf("%s/%s?%s", origin, mediaId, opts.String()), func() (io.ReadCloser, error) {
serveRecord := func(recordCh chan *database.DbMedia, record *database.DbMedia) {
......@@ -103,9 +104,11 @@ func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts Do
return r, nil
})
if errors.Is(err, common.ErrMediaQuarantined) {
cancel()
return nil, r, err
}
if err != nil {
cancel()
return nil, nil, err
}
record := <-recordCh
......@@ -116,7 +119,8 @@ func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts Do
sentry.CaptureException(devErr)
r.Close()
}
cancel()
return record, nil, nil
}
return record, r, nil
return record, readers.NewCancelCloser(r, cancel), nil
}
......@@ -15,6 +15,7 @@ import (
"github.com/turt2live/matrix-media-repo/pipelines/_steps/quarantine"
"github.com/turt2live/matrix-media-repo/pipelines/_steps/thumbnails"
"github.com/turt2live/matrix-media-repo/pipelines/pipeline_download"
"github.com/turt2live/matrix-media-repo/util/readers"
)
var sf = new(sfstreams.Group)
......@@ -56,11 +57,11 @@ func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts Th
// Step 2: Make our context a timeout context
var cancel context.CancelFunc
//goland:noinspection GoVetLostCancel - we handle the function in our custom cancelCloser struct
ctx.Context, cancel = context.WithTimeout(ctx.Context, opts.BlockForReadUntil)
defer cancel()
// Step 3: Join the singleflight queue
recordCh := make(chan *database.DbThumbnail, 1)
recordCh := make(chan *database.DbThumbnail)
defer close(recordCh)
r, err, _ := sf.Do(fmt.Sprintf("%s/%s?%s", origin, mediaId, opts.String()), func() (io.ReadCloser, error) {
serveRecord := func(recordCh chan *database.DbThumbnail, record *database.DbThumbnail) {
......@@ -116,9 +117,11 @@ func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts Th
return download.CreateLimitedStream(ctx, r, opts.StartByte, opts.EndByte)
})
if errors.Is(err, common.ErrMediaQuarantined) {
cancel()
return nil, r, err
}
if err != nil {
cancel()
return nil, nil, err
}
record := <-recordCh
......@@ -129,7 +132,8 @@ func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts Th
sentry.CaptureException(devErr)
r.Close()
}
cancel()
return record, nil, nil
}
return record, r, nil
return record, readers.NewCancelCloser(r, cancel), nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment