Skip to content
Snippets Groups Projects
Commit 18f55d15 authored by Frédéric Guillot's avatar Frédéric Guillot
Browse files

Add scheduler to clean old sessions

parent 00257988
No related branches found
No related tags found
No related merge requests found
......@@ -11,17 +11,18 @@ import (
// Default config parameters values
const (
DefaultBaseURL = "http://localhost"
DefaultDatabaseURL = "postgres://postgres:postgres@localhost/miniflux2?sslmode=disable"
DefaultWorkerPoolSize = 5
DefaultPollingFrequency = 60
DefaultBatchSize = 10
DefaultDatabaseMaxConns = 20
DefaultListenAddr = "127.0.0.1:8080"
DefaultCertFile = ""
DefaultKeyFile = ""
DefaultCertDomain = ""
DefaultCertCache = "/tmp/cert_cache"
DefaultBaseURL = "http://localhost"
DefaultDatabaseURL = "postgres://postgres:postgres@localhost/miniflux2?sslmode=disable"
DefaultWorkerPoolSize = 5
DefaultPollingFrequency = 60
DefaultBatchSize = 10
DefaultDatabaseMaxConns = 20
DefaultListenAddr = "127.0.0.1:8080"
DefaultCertFile = ""
DefaultKeyFile = ""
DefaultCertDomain = ""
DefaultCertCache = "/tmp/cert_cache"
DefaultSessionCleanupFrequency = 24
)
// Config manages configuration parameters.
......
......@@ -47,13 +47,15 @@ func run(cfg *config.Config, store *storage.Storage) {
pool := scheduler.NewWorkerPool(feedHandler, cfg.GetInt("WORKER_POOL_SIZE", config.DefaultWorkerPoolSize))
server := server.NewServer(cfg, store, pool, feedHandler)
scheduler.NewScheduler(
scheduler.NewFeedScheduler(
store,
pool,
cfg.GetInt("POLLING_FREQUENCY", config.DefaultPollingFrequency),
cfg.GetInt("BATCH_SIZE", config.DefaultBatchSize),
)
scheduler.NewSessionScheduler(store, config.DefaultSessionCleanupFrequency)
<-stop
logger.Info("Shutting down the server...")
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
......
......@@ -11,18 +11,30 @@ import (
"github.com/miniflux/miniflux/storage"
)
// NewScheduler starts a new scheduler that push jobs to a pool of workers.
func NewScheduler(store *storage.Storage, workerPool *WorkerPool, frequency, batchSize int) {
// NewFeedScheduler starts a new scheduler that push jobs to a pool of workers.
func NewFeedScheduler(store *storage.Storage, workerPool *WorkerPool, frequency, batchSize int) {
go func() {
c := time.Tick(time.Duration(frequency) * time.Minute)
for now := range c {
for _ = range c {
jobs, err := store.NewBatch(batchSize)
if err != nil {
logger.Error("[Scheduler] %v", err)
logger.Error("[FeedScheduler] %v", err)
} else {
logger.Debug("[Scheduler:%v] => Pushing %d jobs", now, len(jobs))
logger.Debug("[FeedScheduler] Pushing %d jobs", len(jobs))
workerPool.Push(jobs)
}
}
}()
}
// NewSessionScheduler starts a new scheduler that clean old sessions.
func NewSessionScheduler(store *storage.Storage, frequency int) {
go func() {
c := time.Tick(time.Duration(frequency) * time.Hour)
for _ = range c {
nbSessions := store.CleanOldSessions()
nbUserSessions := store.CleanOldUserSessions()
logger.Debug("[SessionScheduler] cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions)
}
}()
}
......@@ -75,3 +75,17 @@ func (s *Storage) FlushAllSessions() (err error) {
return nil
}
// CleanOldSessions removes sessions older than 30 days.
func (s *Storage) CleanOldSessions() int64 {
query := `DELETE FROM sessions
WHERE id IN (SELECT id FROM sessions WHERE created_at < now() - interval '30 days')`
result, err := s.db.Exec(query)
if err != nil {
return 0
}
n, _ := result.RowsAffected()
return n
}
......@@ -127,3 +127,17 @@ func (s *Storage) RemoveUserSessionByID(userID, sessionID int64) error {
return nil
}
// CleanOldUserSessions removes user sessions older than 30 days.
func (s *Storage) CleanOldUserSessions() int64 {
query := `DELETE FROM user_sessions
WHERE id IN (SELECT id FROM user_sessions WHERE created_at < now() - interval '30 days')`
result, err := s.db.Exec(query)
if err != nil {
return 0
}
n, _ := result.RowsAffected()
return n
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment