Skip to content
Snippets Groups Projects
Commit 3dc8e5eb authored by Frédéric Guillot's avatar Frédéric Guillot
Browse files

Refresh feeds in the cronjob in parallel

parent c85b1909
No related branches found
No related tags found
No related merge requests found
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"miniflux.app/database" "miniflux.app/database"
"miniflux.app/locale" "miniflux.app/locale"
"miniflux.app/logger" "miniflux.app/logger"
feedHandler "miniflux.app/reader/handler"
"miniflux.app/storage" "miniflux.app/storage"
"miniflux.app/ui/static" "miniflux.app/ui/static"
"miniflux.app/version" "miniflux.app/version"
...@@ -192,18 +191,7 @@ func Parse() { ...@@ -192,18 +191,7 @@ func Parse() {
} }
if flagCronjob { if flagCronjob {
jobs, err := store.NewBatch(config.Opts.BatchSize()) runCronjob(store)
if err != nil {
logger.Error("[Cronjob] %v", err)
}
logger.Info("[Cronjob]] Processing %d jobs", len(jobs))
for _, job := range jobs {
if err := feedHandler.RefreshFeed(store, job.UserID, job.FeedID); err != nil {
logger.Error("[Cronjob] Refreshing the feed #%d returned this error: %v", job.FeedID, err)
}
}
return return
} }
......
// SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package cli // import "miniflux.app/cli"
import (
"sync"
"time"
"miniflux.app/config"
"miniflux.app/logger"
"miniflux.app/model"
feedHandler "miniflux.app/reader/handler"
"miniflux.app/storage"
)
func runCronjob(store *storage.Storage) {
var wg sync.WaitGroup
startTime := time.Now()
jobs, err := store.NewBatch(config.Opts.BatchSize())
if err != nil {
logger.Error("[Cronjob] %v", err)
}
nbJobs := len(jobs)
logger.Info("[Cronjob]] Created %d jobs from a batch size of %d", nbJobs, config.Opts.BatchSize())
var jobQueue = make(chan model.Job, nbJobs)
logger.Info("[Cronjob] Starting a pool of %d workers", config.Opts.WorkerPoolSize())
for i := 0; i < config.Opts.WorkerPoolSize(); i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobQueue {
logger.Info("[Cronjob] Refreshing feed #%d for user #%d in worker #%d", job.FeedID, job.UserID, workerID)
if err := feedHandler.RefreshFeed(store, job.UserID, job.FeedID); err != nil {
logger.Error("[Cronjob] Refreshing the feed #%d returned this error: %v", job.FeedID, err)
}
}
}(i)
}
for _, job := range jobs {
jobQueue <- job
}
close(jobQueue)
wg.Wait()
logger.Info("[Cronjob] Refreshed %d feed(s) in %s", nbJobs, time.Since(startTime))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment