From 116e5cd52082a8d436e084823c1cce2e65f2ef32 Mon Sep 17 00:00:00 2001 From: Travis Ralston <travpc@gmail.com> Date: Sun, 4 Jun 2023 00:20:00 -0600 Subject: [PATCH] Move pipelines around a bit --- api/r0/download.go | 4 ++-- api/r0/upload.go | 4 ++-- api/v1/create.go | 4 ++-- {pipline => pipelines}/_steps/download/open_stream.go | 0 {pipline => pipelines}/_steps/download/try_download.go | 4 ++-- {pipline => pipelines}/_steps/quota/check.go | 0 {pipline => pipelines}/_steps/upload/blurhash_async.go | 0 {pipline => pipelines}/_steps/upload/deduplicate.go | 0 .../_steps/upload/generate_media_id.go | 0 {pipline => pipelines}/_steps/upload/limit.go | 0 {pipline => pipelines}/_steps/upload/lock.go | 0 {pipline => pipelines}/_steps/upload/quarantine.go | 0 {pipline => pipelines}/_steps/upload/redis_async.go | 0 .../pipeline_create}/pipeline.go | 6 +++--- .../pipeline_download}/pipeline.go | 6 +++--- .../pipeline_upload}/pipeline.go | 10 +++++----- 16 files changed, 19 insertions(+), 19 deletions(-) rename {pipline => pipelines}/_steps/download/open_stream.go (100%) rename {pipline => pipelines}/_steps/download/try_download.go (96%) rename {pipline => pipelines}/_steps/quota/check.go (100%) rename {pipline => pipelines}/_steps/upload/blurhash_async.go (100%) rename {pipline => pipelines}/_steps/upload/deduplicate.go (100%) rename {pipline => pipelines}/_steps/upload/generate_media_id.go (100%) rename {pipline => pipelines}/_steps/upload/limit.go (100%) rename {pipline => pipelines}/_steps/upload/lock.go (100%) rename {pipline => pipelines}/_steps/upload/quarantine.go (100%) rename {pipline => pipelines}/_steps/upload/redis_async.go (100%) rename {pipline/create_pipeline => pipelines/pipeline_create}/pipeline.go (87%) rename {pipline/download_pipeline => pipelines/pipeline_download}/pipeline.go (91%) rename {pipline/upload_pipeline => pipelines/pipeline_upload}/pipeline.go (90%) diff --git a/api/r0/download.go b/api/r0/download.go index 5272e386..7855f73e 100644 --- a/api/r0/download.go +++ b/api/r0/download.go @@ -9,7 +9,7 @@ import ( "github.com/turt2live/matrix-media-repo/api/_apimeta" "github.com/turt2live/matrix-media-repo/api/_responses" "github.com/turt2live/matrix-media-repo/api/_routers" - "github.com/turt2live/matrix-media-repo/pipline/download_pipeline" + "github.com/turt2live/matrix-media-repo/pipelines/pipeline_download" "github.com/turt2live/matrix-media-repo/util" "github.com/sirupsen/logrus" @@ -75,7 +75,7 @@ func DownloadMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta. return _responses.MediaBlocked() } - media, stream, err := download_pipeline.DownloadMedia(rctx, server, mediaId, download_pipeline.DownloadOpts{ + media, stream, err := pipeline_download.Execute(rctx, server, mediaId, pipeline_download.DownloadOpts{ FetchRemoteIfNeeded: downloadRemote, StartByte: -1, EndByte: -1, diff --git a/api/r0/upload.go b/api/r0/upload.go index 3287154f..4734ba0a 100644 --- a/api/r0/upload.go +++ b/api/r0/upload.go @@ -13,7 +13,7 @@ import ( "github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/database" "github.com/turt2live/matrix-media-repo/datastores" - "github.com/turt2live/matrix-media-repo/pipline/upload_pipeline" + "github.com/turt2live/matrix-media-repo/pipelines/pipeline_upload" "github.com/turt2live/matrix-media-repo/util" ) @@ -60,7 +60,7 @@ func UploadMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.Us } // Actually upload - media, err := upload_pipeline.UploadMedia(rctx, r.Host, "", r.Body, contentType, filename, user.UserId, datastores.LocalMediaKind) + media, err := pipeline_upload.Execute(rctx, r.Host, "", r.Body, contentType, filename, user.UserId, datastores.LocalMediaKind) if err != nil { if err == common.ErrQuotaExceeded { return _responses.QuotaExceeded() diff --git a/api/v1/create.go b/api/v1/create.go index 0617093f..2036482f 100644 --- a/api/v1/create.go +++ b/api/v1/create.go @@ -7,7 +7,7 @@ import ( "github.com/turt2live/matrix-media-repo/api/_apimeta" "github.com/turt2live/matrix-media-repo/api/_responses" "github.com/turt2live/matrix-media-repo/common/rcontext" - "github.com/turt2live/matrix-media-repo/pipline/create_pipeline" + "github.com/turt2live/matrix-media-repo/pipelines/pipeline_create" "github.com/turt2live/matrix-media-repo/util" ) @@ -17,7 +17,7 @@ type MediaCreatedResponse struct { } func CreateMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} { - id, err := create_pipeline.Execute(rctx, r.Host, user.UserId, create_pipeline.DefaultExpirationTime) + id, err := pipeline_create.Execute(rctx, r.Host, user.UserId, pipeline_create.DefaultExpirationTime) if err != nil { rctx.Log.Error("Unexpected error creating media ID:", err) sentry.CaptureException(err) diff --git a/pipline/_steps/download/open_stream.go b/pipelines/_steps/download/open_stream.go similarity index 100% rename from pipline/_steps/download/open_stream.go rename to pipelines/_steps/download/open_stream.go diff --git a/pipline/_steps/download/try_download.go b/pipelines/_steps/download/try_download.go similarity index 96% rename from pipline/_steps/download/try_download.go rename to pipelines/_steps/download/try_download.go index a963505a..a892add7 100644 --- a/pipline/_steps/download/try_download.go +++ b/pipelines/_steps/download/try_download.go @@ -19,7 +19,7 @@ import ( "github.com/turt2live/matrix-media-repo/errcache" "github.com/turt2live/matrix-media-repo/matrix" "github.com/turt2live/matrix-media-repo/metrics" - "github.com/turt2live/matrix-media-repo/pipline/upload_pipeline" + "github.com/turt2live/matrix-media-repo/pipelines/pipeline_upload" "github.com/turt2live/matrix-media-repo/pool" "github.com/turt2live/matrix-media-repo/util" ) @@ -147,7 +147,7 @@ func TryDownload(ctx rcontext.RequestContext, origin string, mediaId string) (*d }(dsConf, pr, bufferCh) go func(ctx rcontext.RequestContext, origin string, mediaId string, r io.ReadCloser, upstreamClose func() error, contentType string, fileName string, uploadCh chan uploadResult) { - m, err2 := upload_pipeline.UploadMedia(ctx, origin, mediaId, r, contentType, fileName, "", datastores.RemoteMediaKind) + m, err2 := pipeline_upload.Execute(ctx, origin, mediaId, r, contentType, fileName, "", datastores.RemoteMediaKind) // async the channel update to avoid deadlocks go func(uploadCh chan uploadResult, err2 error, m *database.DbMedia) { uploadCh <- uploadResult{err: err2, m: m} diff --git a/pipline/_steps/quota/check.go b/pipelines/_steps/quota/check.go similarity index 100% rename from pipline/_steps/quota/check.go rename to pipelines/_steps/quota/check.go diff --git a/pipline/_steps/upload/blurhash_async.go b/pipelines/_steps/upload/blurhash_async.go similarity index 100% rename from pipline/_steps/upload/blurhash_async.go rename to pipelines/_steps/upload/blurhash_async.go diff --git a/pipline/_steps/upload/deduplicate.go b/pipelines/_steps/upload/deduplicate.go similarity index 100% rename from pipline/_steps/upload/deduplicate.go rename to pipelines/_steps/upload/deduplicate.go diff --git a/pipline/_steps/upload/generate_media_id.go b/pipelines/_steps/upload/generate_media_id.go similarity index 100% rename from pipline/_steps/upload/generate_media_id.go rename to pipelines/_steps/upload/generate_media_id.go diff --git a/pipline/_steps/upload/limit.go b/pipelines/_steps/upload/limit.go similarity index 100% rename from pipline/_steps/upload/limit.go rename to pipelines/_steps/upload/limit.go diff --git a/pipline/_steps/upload/lock.go b/pipelines/_steps/upload/lock.go similarity index 100% rename from pipline/_steps/upload/lock.go rename to pipelines/_steps/upload/lock.go diff --git a/pipline/_steps/upload/quarantine.go b/pipelines/_steps/upload/quarantine.go similarity index 100% rename from pipline/_steps/upload/quarantine.go rename to pipelines/_steps/upload/quarantine.go diff --git a/pipline/_steps/upload/redis_async.go b/pipelines/_steps/upload/redis_async.go similarity index 100% rename from pipline/_steps/upload/redis_async.go rename to pipelines/_steps/upload/redis_async.go diff --git a/pipline/create_pipeline/pipeline.go b/pipelines/pipeline_create/pipeline.go similarity index 87% rename from pipline/create_pipeline/pipeline.go rename to pipelines/pipeline_create/pipeline.go index 6cbf918f..c2167a70 100644 --- a/pipline/create_pipeline/pipeline.go +++ b/pipelines/pipeline_create/pipeline.go @@ -1,10 +1,10 @@ -package create_pipeline +package pipeline_create import ( "github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/database" - "github.com/turt2live/matrix-media-repo/pipline/_steps/quota" - "github.com/turt2live/matrix-media-repo/pipline/_steps/upload" + "github.com/turt2live/matrix-media-repo/pipelines/_steps/quota" + "github.com/turt2live/matrix-media-repo/pipelines/_steps/upload" "github.com/turt2live/matrix-media-repo/util" ) diff --git a/pipline/download_pipeline/pipeline.go b/pipelines/pipeline_download/pipeline.go similarity index 91% rename from pipline/download_pipeline/pipeline.go rename to pipelines/pipeline_download/pipeline.go index c7b06097..f678b748 100644 --- a/pipline/download_pipeline/pipeline.go +++ b/pipelines/pipeline_download/pipeline.go @@ -1,4 +1,4 @@ -package download_pipeline +package pipeline_download import ( "context" @@ -10,7 +10,7 @@ import ( "github.com/turt2live/matrix-media-repo/common" "github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/database" - "github.com/turt2live/matrix-media-repo/pipline/_steps/download" + "github.com/turt2live/matrix-media-repo/pipelines/_steps/download" ) var sf = new(sfstreams.Group) @@ -41,7 +41,7 @@ func (c *cancelCloser) Close() error { return c.r.Close() } -func DownloadMedia(ctx rcontext.RequestContext, origin string, mediaId string, opts DownloadOpts) (*database.DbMedia, io.ReadCloser, error) { +func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts DownloadOpts) (*database.DbMedia, io.ReadCloser, error) { // Step 1: Make our context a timeout context var cancel context.CancelFunc //goland:noinspection GoVetLostCancel - we handle the function in our custom cancelCloser struct diff --git a/pipline/upload_pipeline/pipeline.go b/pipelines/pipeline_upload/pipeline.go similarity index 90% rename from pipline/upload_pipeline/pipeline.go rename to pipelines/pipeline_upload/pipeline.go index 6a8c8765..15a92456 100644 --- a/pipline/upload_pipeline/pipeline.go +++ b/pipelines/pipeline_upload/pipeline.go @@ -1,4 +1,4 @@ -package upload_pipeline +package pipeline_upload import ( "errors" @@ -8,13 +8,13 @@ import ( "github.com/turt2live/matrix-media-repo/common/rcontext" "github.com/turt2live/matrix-media-repo/database" "github.com/turt2live/matrix-media-repo/datastores" - "github.com/turt2live/matrix-media-repo/pipline/_steps/quota" - "github.com/turt2live/matrix-media-repo/pipline/_steps/upload" + "github.com/turt2live/matrix-media-repo/pipelines/_steps/quota" + "github.com/turt2live/matrix-media-repo/pipelines/_steps/upload" "github.com/turt2live/matrix-media-repo/util" ) -// UploadMedia Media upload. If mediaId is an empty string, one will be generated. -func UploadMedia(ctx rcontext.RequestContext, origin string, mediaId string, r io.ReadCloser, contentType string, fileName string, userId string, kind datastores.Kind) (*database.DbMedia, error) { +// Execute Media upload. If mediaId is an empty string, one will be generated. +func Execute(ctx rcontext.RequestContext, origin string, mediaId string, r io.ReadCloser, contentType string, fileName string, userId string, kind datastores.Kind) (*database.DbMedia, error) { // Step 1: Limit the stream's length r = upload.LimitStream(ctx, r) -- GitLab