From c3b4828a5c294cb81e8347e265c9bcd784bf89a5 Mon Sep 17 00:00:00 2001
From: Travis Ralston <travpc@gmail.com>
Date: Tue, 4 Jul 2023 22:11:27 -0600
Subject: [PATCH] Implement new URL preview pipeline engine

---
 api/r0/preview_url.go                        |  31 +++--
 database/db.go                               |   4 +
 database/table_url_previews.go               | 106 +++++++++++++++
 pipelines/_steps/url_preview/upload_image.go |  69 ++++++++++
 pipelines/pipeline_preview/pipeline.go       | 136 +++++++++++++++++++
 util/time.go                                 |   5 +
 6 files changed, 342 insertions(+), 9 deletions(-)
 create mode 100644 database/table_url_previews.go
 create mode 100644 pipelines/_steps/url_preview/upload_image.go
 create mode 100644 pipelines/pipeline_preview/pipeline.go

diff --git a/api/r0/preview_url.go b/api/r0/preview_url.go
index c761a4f0..2f42a291 100644
--- a/api/r0/preview_url.go
+++ b/api/r0/preview_url.go
@@ -1,17 +1,18 @@
 package r0
 
 import (
-	"github.com/getsentry/sentry-go"
-	"github.com/turt2live/matrix-media-repo/api/_apimeta"
-	"github.com/turt2live/matrix-media-repo/api/_responses"
-
+	"errors"
 	"net/http"
 	"strconv"
 	"strings"
 
+	"github.com/getsentry/sentry-go"
+	"github.com/turt2live/matrix-media-repo/api/_apimeta"
+	"github.com/turt2live/matrix-media-repo/api/_responses"
+	"github.com/turt2live/matrix-media-repo/pipelines/pipeline_preview"
+
 	"github.com/turt2live/matrix-media-repo/common"
 	"github.com/turt2live/matrix-media-repo/common/rcontext"
-	"github.com/turt2live/matrix-media-repo/controllers/preview_controller"
 	"github.com/turt2live/matrix-media-repo/util"
 )
 
@@ -61,7 +62,19 @@ func PreviewUrl(r *http.Request, rctx rcontext.RequestContext, user _apimeta.Use
 		languageHeader = r.Header.Get("Accept-Language")
 	}
 
-	preview, err := preview_controller.GetPreview(urlStr, r.Host, user.UserId, ts, languageHeader, rctx)
+	preview, err := pipeline_preview.Execute(rctx, r.Host, urlStr, user.UserId, pipeline_preview.PreviewOpts{
+		Timestamp:      ts,
+		LanguageHeader: languageHeader,
+	})
+	if err == nil && preview != nil && preview.ErrorCode != "" {
+		if preview.ErrorCode == common.ErrCodeInvalidHost {
+			err = common.ErrInvalidHost
+		} else if preview.ErrorCode == common.ErrCodeNotFound {
+			err = common.ErrMediaNotFound
+		} else {
+			err = errors.New("url previews: unknown error code: " + preview.ErrorCode)
+		}
+	}
 	if err != nil {
 		if err == common.ErrMediaNotFound || err == common.ErrHostNotFound {
 			return _responses.NotFoundError()
@@ -69,14 +82,14 @@ func PreviewUrl(r *http.Request, rctx rcontext.RequestContext, user _apimeta.Use
 			return _responses.BadRequest(err.Error())
 		} else {
 			sentry.CaptureException(err)
-			return _responses.InternalServerError("unexpected error during request")
+			return _responses.InternalServerError("Unexpected Error")
 		}
 	}
 
 	return &MatrixOpenGraph{
-		Url:         preview.Url,
+		Url:         preview.SiteUrl,
 		SiteName:    preview.SiteName,
-		Type:        preview.Type,
+		Type:        preview.ResourceType,
 		Description: preview.Description,
 		Title:       preview.Title,
 		ImageMxc:    preview.ImageMxc,
diff --git a/database/db.go b/database/db.go
index d4fbbfa2..e738f566 100644
--- a/database/db.go
+++ b/database/db.go
@@ -24,6 +24,7 @@ type Database struct {
 	HeldMedia     *heldMediaTableStatements
 	Thumbnails    *thumbnailsTableStatements
 	LastAccess    *lastAccessTableStatements
+	UrlPreviews   *urlPreviewsTableStatements
 }
 
 var instance *Database
@@ -104,6 +105,9 @@ func openDatabase(connectionString string, maxConns int, maxIdleConns int) error
 	if d.LastAccess, err = prepareLastAccessTables(d.conn); err != nil {
 		return errors.New("failed to create last access table accessor: " + err.Error())
 	}
+	if d.UrlPreviews, err = prepareUrlPreviewsTables(d.conn); err != nil {
+		return errors.New("failed to create url previews table accessor: " + err.Error())
+	}
 
 	instance = d
 	return nil
diff --git a/database/table_url_previews.go b/database/table_url_previews.go
new file mode 100644
index 00000000..23514eb9
--- /dev/null
+++ b/database/table_url_previews.go
@@ -0,0 +1,106 @@
+package database
+
+import (
+	"database/sql"
+	"errors"
+
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/util"
+)
+
+// DbUrlPreview is a row of the url_previews table: the cached outcome of previewing Url for one hour bucket and one
+// language header. A non-empty ErrorCode marks a cached failure; InsertError/InsertErrorAt leave the other fields zero.
+type DbUrlPreview struct {
+	Url            string
+	ErrorCode      string
+	BucketTs       int64
+	SiteUrl        string
+	SiteName       string
+	ResourceType   string
+	Description    string
+	Title          string
+	ImageMxc       string
+	ImageType      string
+	ImageSize      int64
+	ImageWidth     int
+	ImageHeight    int
+	LanguageHeader string
+}
+
+const selectUrlPreview = "SELECT url, error_code, bucket_ts, site_url, site_name, resource_type, description, title, image_mxc, image_type, image_size, image_width, image_height, language_header FROM url_previews WHERE url = $1 AND bucket_ts = $2 AND language_header = $3;"
+const insertUrlPreview = "INSERT INTO url_previews (url, error_code, bucket_ts, site_url, site_name, resource_type, description, title, image_mxc, image_type, image_size, image_width, image_height, language_header) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14);"
+
+type urlPreviewsTableStatements struct {
+	selectUrlPreview *sql.Stmt
+	insertUrlPreview *sql.Stmt
+}
+
+type urlPreviewsTableWithContext struct {
+	statements *urlPreviewsTableStatements
+	ctx        rcontext.RequestContext
+}
+
+// prepareUrlPreviewsTables prepares the statements used to read and write the url_previews table.
+func prepareUrlPreviewsTables(db *sql.DB) (*urlPreviewsTableStatements, error) {
+	var err error
+	var stmts = &urlPreviewsTableStatements{}
+
+	if stmts.selectUrlPreview, err = db.Prepare(selectUrlPreview); err != nil {
+		return nil, errors.New("error preparing selectUrlPreview: " + err.Error())
+	}
+	if stmts.insertUrlPreview, err = db.Prepare(insertUrlPreview); err != nil {
+		return nil, errors.New("error preparing insertUrlPreview: " + err.Error())
+	}
+
+	return stmts, nil
+}
+
+// Prepare binds the prepared statements to a request context.
+func (s *urlPreviewsTableStatements) Prepare(ctx rcontext.RequestContext) *urlPreviewsTableWithContext {
+	return &urlPreviewsTableWithContext{
+		statements: s,
+		ctx:        ctx,
+	}
+}
+
+// Get returns the cached record for the URL, bucket and language header, or (nil, nil) when nothing is cached. The
+// ts value is matched against bucket_ts as-is, so callers need to pass an already-bucketed timestamp.
+func (s *urlPreviewsTableWithContext) Get(url string, ts int64, languageHeader string) (*DbUrlPreview, error) {
+	row := s.statements.selectUrlPreview.QueryRowContext(s.ctx, url, ts, languageHeader)
+	val := &DbUrlPreview{}
+	err := row.Scan(&val.Url, &val.ErrorCode, &val.BucketTs, &val.SiteUrl, &val.SiteName, &val.ResourceType, &val.Description, &val.Title, &val.ImageMxc, &val.ImageType, &val.ImageSize, &val.ImageWidth, &val.ImageHeight, &val.LanguageHeader)
+	if errors.Is(err, sql.ErrNoRows) {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err // never hand back a partially scanned record alongside an error
+	}
+	return val, nil
+}
+
+// Insert stores the record exactly as given.
+func (s *urlPreviewsTableWithContext) Insert(p *DbUrlPreview) error {
+	_, err := s.statements.insertUrlPreview.ExecContext(s.ctx, p.Url, p.ErrorCode, p.BucketTs, p.SiteUrl, p.SiteName, p.ResourceType, p.Description, p.Title, p.ImageMxc, p.ImageType, p.ImageSize, p.ImageWidth, p.ImageHeight, p.LanguageHeader)
+	return err
+}
+
+// InsertError caches a failure for url under the current hour bucket with an empty language header. Get also matches
+// on the language header, so prefer InsertErrorAt whenever the request's bucket and language are known.
+func (s *urlPreviewsTableWithContext) InsertError(url string, errorCode string) {
+	s.InsertErrorAt(url, errorCode, util.GetHourBucket(util.NowMillis()), "")
+}
+
+// InsertErrorAt caches a failure for url under the given hour bucket and language header, i.e. the same key Get
+// matches on. It is best-effort: a failed insert is logged and otherwise ignored.
+func (s *urlPreviewsTableWithContext) InsertErrorAt(url string, errorCode string, bucketTs int64, languageHeader string) {
+	err := s.Insert(&DbUrlPreview{
+		Url:            url,
+		ErrorCode:      errorCode,
+		BucketTs:       bucketTs,
+		LanguageHeader: languageHeader,
+		// remainder of fields don't matter
+	})
+	if err != nil {
+		s.ctx.Log.Warn("Non-fatal error caching URL preview error: ", err)
+	}
+}
diff --git a/pipelines/_steps/url_preview/upload_image.go b/pipelines/_steps/url_preview/upload_image.go
new file mode 100644
index 00000000..4694c925
--- /dev/null
+++ b/pipelines/_steps/url_preview/upload_image.go
@@ -0,0 +1,69 @@
+package url_preview
+
+import (
+	"io"
+
+	"github.com/getsentry/sentry-go"
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/controllers/preview_controller/preview_types"
+	"github.com/turt2live/matrix-media-repo/database"
+	"github.com/turt2live/matrix-media-repo/datastores"
+	"github.com/turt2live/matrix-media-repo/pipelines/pipeline_upload"
+	"github.com/turt2live/matrix-media-repo/thumbnailing"
+	"github.com/turt2live/matrix-media-repo/util"
+)
+
+// UploadImage stores a preview's image as local media and copies the resulting MXC URI, content type, size and
+// dimensions onto forRecord. It is best-effort: a nil image is a no-op, and forRecord is left untouched when the
+// thumbnail generator lookup fails or the upload yields no media record.
+func UploadImage(ctx rcontext.RequestContext, image *preview_types.PreviewImage, onHost string, userId string, forRecord *database.DbUrlPreview) {
+	if image == nil {
+		return
+	}
+
+	// The image is read once, by the uploader; the tee mirrors everything it reads into the pipe so the thumbnail
+	// generator can inspect the same bytes.
+	pr, pw := io.Pipe()
+	tee := io.TeeReader(image.Data, pw)
+	// Buffered so the uploader can always deliver its result and exit, even if we return before receiving it.
+	mediaChan := make(chan *database.DbMedia, 1)
+	go func() {
+		media, err := pipeline_upload.Execute(ctx, onHost, "", io.NopCloser(tee), image.ContentType, image.Filename, userId, datastores.LocalMediaKind)
+		if err != nil {
+			ctx.Log.Warn("Non-fatal error uploading URL preview thumbnail: ", err)
+			_ = pw.CloseWithError(err)
+		} else {
+			_ = pw.Close()
+		}
+		mediaChan <- media
+	}()
+
+	w := 0
+	h := 0
+	g, r, err := thumbnailing.GetGenerator(pr, image.ContentType, false)
+	// Drain whatever is left in the pipe: pipe writes block until read, and the uploader writes through the tee.
+	_, _ = io.Copy(io.Discard, pr)
+	if err != nil {
+		ctx.Log.Warn("Non-fatal error handling URL preview thumbnail: ", err)
+		sentry.CaptureException(err)
+		return
+	}
+	if g != nil {
+		_, w, h, err = g.GetOriginDimensions(r, image.ContentType, ctx)
+		if err != nil {
+			ctx.Log.Warn("Non-fatal error getting URL preview thumbnail dimensions: ", err)
+			sentry.CaptureException(err)
+		}
+	}
+
+	record := <-mediaChan
+	if record == nil {
+		return
+	}
+
+	forRecord.ImageMxc = util.MxcUri(record.Origin, record.MediaId)
+	forRecord.ImageType = record.ContentType
+	forRecord.ImageSize = record.SizeBytes
+	forRecord.ImageWidth = w
+	forRecord.ImageHeight = h
+}
diff --git a/pipelines/pipeline_preview/pipeline.go b/pipelines/pipeline_preview/pipeline.go
new file mode 100644
index 00000000..330428ac
--- /dev/null
+++ b/pipelines/pipeline_preview/pipeline.go
@@ -0,0 +1,136 @@
+package pipeline_preview
+
+import (
+	"errors"
+	"fmt"
+	"net/url"
+
+	"github.com/getsentry/sentry-go"
+	"github.com/turt2live/matrix-media-repo/common"
+	"github.com/turt2live/matrix-media-repo/common/rcontext"
+	"github.com/turt2live/matrix-media-repo/controllers/preview_controller/preview_types"
+	"github.com/turt2live/matrix-media-repo/controllers/preview_controller/previewers"
+	"github.com/turt2live/matrix-media-repo/database"
+	"github.com/turt2live/matrix-media-repo/pipelines/_steps/url_preview"
+	"github.com/turt2live/matrix-media-repo/util"
+	"golang.org/x/sync/singleflight"
+)
+
+// sf collapses concurrent requests for the same preview into a single generation attempt.
+var sf = new(singleflight.Group)
+
+// PreviewOpts carries the optional inputs to Execute: the millisecond timestamp the preview is requested for, and
+// the Accept-Language value to generate (and cache) it under.
+type PreviewOpts struct {
+	Timestamp      int64
+	LanguageHeader string
+}
+
+// Execute returns the URL preview for previewUrl, serving it from the database cache when a record exists for the
+// requested hour bucket and language, and generating (then caching) it otherwise. A returned record with a
+// non-empty ErrorCode is a cached failure; translating it is left to the caller.
+func Execute(ctx rcontext.RequestContext, onHost string, previewUrl string, userId string, opts PreviewOpts) (*database.DbUrlPreview, error) {
+	// Step 1: Check database cache. Records are keyed by hour bucket, so look up the bucket rather than the raw
+	// timestamp (bucketing an already-bucketed timestamp is a no-op).
+	bucketTs := util.GetHourBucket(opts.Timestamp)
+	previewDb := database.GetInstance().UrlPreviews.Prepare(ctx)
+	record, err := previewDb.Get(previewUrl, bucketTs, opts.LanguageHeader)
+	if err != nil || record != nil {
+		return record, err
+	}
+
+	// Step 2: Fix timestamp bucket. If we're within 60 seconds of a bucket, just assume we're okay, so we don't
+	// infinitely recurse into ourselves.
+	now := util.NowMillis()
+	if (now-opts.Timestamp) > 60000 && bucketTs != util.GetHourBucket(now) {
+		return Execute(ctx, onHost, previewUrl, userId, PreviewOpts{
+			Timestamp:      now,
+			LanguageHeader: opts.LanguageHeader,
+		})
+	}
+
+	// Step 3: Process the URL
+	parsedUrl, err := url.Parse(previewUrl)
+	if err != nil {
+		previewDb.InsertErrorAt(previewUrl, common.ErrCodeInvalidHost, bucketTs, opts.LanguageHeader)
+		return nil, common.ErrInvalidHost
+	}
+	parsedUrl.Fragment = "" // remove fragments because they're not useful to servers
+
+	// Step 4: Join the singleflight queue. The key uses the bucket so that requests sharing a cache entry also
+	// share one generation attempt.
+	sfKey := fmt.Sprintf("%s:%s_%d/%s", onHost, previewUrl, bucketTs, opts.LanguageHeader)
+	r, err, _ := sf.Do(sfKey, func() (interface{}, error) {
+		payload := &preview_types.UrlPayload{
+			UrlString: previewUrl,
+			ParsedUrl: parsedUrl,
+		}
+		var preview preview_types.PreviewResult
+		err = preview_types.ErrPreviewUnsupported
+
+		// Step 5: Try oEmbed
+		if ctx.Config.UrlPreviews.OEmbed {
+			ctx.Log.Debug("Trying oEmbed previewer")
+			preview, err = previewers.GenerateOEmbedPreview(payload, opts.LanguageHeader, ctx)
+		}
+
+		// Step 6: Try OpenGraph
+		if err == preview_types.ErrPreviewUnsupported {
+			ctx.Log.Debug("Trying OpenGraph previewer")
+			preview, err = previewers.GenerateOpenGraphPreview(payload, opts.LanguageHeader, ctx)
+		}
+
+		// Step 7: Try scraping
+		if err == preview_types.ErrPreviewUnsupported {
+			ctx.Log.Debug("Trying built-in previewer")
+			preview, err = previewers.GenerateCalculatedPreview(payload, opts.LanguageHeader, ctx)
+		}
+
+		// Step 8: Finish processing. Failures are cached under the same key the lookup in step 1 uses.
+		if err != nil {
+			if err == preview_types.ErrPreviewUnsupported {
+				err = common.ErrMediaNotFound
+			}
+
+			errorCode := common.ErrCodeUnknown
+			if err == common.ErrMediaNotFound {
+				errorCode = common.ErrCodeNotFound
+			}
+			previewDb.InsertErrorAt(previewUrl, errorCode, bucketTs, opts.LanguageHeader)
+			return nil, err
+		}
+
+		result := &database.DbUrlPreview{
+			Url:            previewUrl,
+			ErrorCode:      "",
+			BucketTs:       bucketTs,
+			SiteUrl:        preview.Url,
+			SiteName:       preview.SiteName,
+			ResourceType:   preview.Type,
+			Description:    preview.Description,
+			Title:          preview.Title,
+			LanguageHeader: opts.LanguageHeader,
+		}
+
+		// Step 9: Store the thumbnail, if needed
+		url_preview.UploadImage(ctx, preview.Image, onHost, userId, result)
+
+		// Step 10: Insert the record
+		err = previewDb.Insert(result)
+		if err != nil {
+			ctx.Log.Warn("Non-fatal error caching URL preview: ", err)
+			sentry.CaptureException(err)
+		}
+
+		return result, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	val, ok := r.(*database.DbUrlPreview)
+	if !ok {
+		return nil, errors.New("runtime error: expected DbUrlPreview, got something else")
+	}
+	return val, nil
+}
diff --git a/util/time.go b/util/time.go
index d972bafb..2b8d5a30 100644
--- a/util/time.go
+++ b/util/time.go
@@ -30,3 +30,8 @@ func CalcBlockForDuration(timeoutMs string) (time.Duration, error) {
 	}
 	return blockFor, nil
 }
+
+// GetHourBucket truncates a millisecond timestamp to a multiple of one hour (3600000 ms).
+func GetHourBucket(ts int64) int64 {
+	return (ts / 3600000) * 3600000
+}
-- 
GitLab