Skip to content

Commit

Permalink
Reschedule next check date for rate limited feeds
Browse files Browse the repository at this point in the history
  • Loading branch information
shizunge committed Dec 4, 2023
1 parent 5b83e53 commit c2db23c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
5 changes: 4 additions & 1 deletion internal/model/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (f *Feed) CheckedNow() {
}

// ScheduleNextCheck set "next_check_at" of a feed based on the scheduler selected from the configuration.
func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int) {
func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int, rateLimited bool) {
f.TTL = newTTL
// Default to the global config Polling Frequency.
var intervalMinutes int
Expand All @@ -123,6 +123,9 @@ func (f *Feed) ScheduleNextCheck(weeklyCount int, newTTL int) {
default:
intervalMinutes = config.Opts.SchedulerRoundRobinMinInterval()
}
if rateLimited {
intervalMinutes += (12 * 60)
}
// If the feed has a TTL defined, we use it to make sure we don't check it too often.
if newTTL > intervalMinutes && newTTL > 0 {
intervalMinutes = newTTL
Expand Down
39 changes: 31 additions & 8 deletions internal/model/feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const (
largeWeeklyCount = 10080
noNewTTL = 0
noRateLimited = false
)

func TestFeedCategorySetter(t *testing.T) {
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestFeedScheduleNextCheckDefault(t *testing.T) {
timeBefore := time.Now()
feed := &Feed{}
weeklyCount := 10
feed.ScheduleNextCheck(weeklyCount, noNewTTL)
feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand All @@ -115,7 +116,7 @@ func TestFeedScheduleNextCheckRoundRobinMinInterval(t *testing.T) {
timeBefore := time.Now()
feed := &Feed{}
weeklyCount := 100
feed.ScheduleNextCheck(weeklyCount, noNewTTL)
feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand Down Expand Up @@ -144,7 +145,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMaxInterval(t *testing.T) {
feed := &Feed{}
// Use a very small weekly count to trigger the max interval
weeklyCount := 1
feed.ScheduleNextCheck(weeklyCount, noNewTTL)
feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand Down Expand Up @@ -173,7 +174,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMaxIntervalZeroWeeklyCount(t *testin
feed := &Feed{}
// Use a very small weekly count to trigger the max interval
weeklyCount := 0
feed.ScheduleNextCheck(weeklyCount, noNewTTL)
feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand Down Expand Up @@ -202,7 +203,7 @@ func TestFeedScheduleNextCheckEntryFrequencyMinInterval(t *testing.T) {
feed := &Feed{}
// Use a very large weekly count to trigger the min interval
weeklyCount := largeWeeklyCount
feed.ScheduleNextCheck(weeklyCount, noNewTTL)
feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand All @@ -228,7 +229,7 @@ func TestFeedScheduleNextCheckEntryFrequencyFactor(t *testing.T) {
timeBefore := time.Now()
feed := &Feed{}
weeklyCount := 7
feed.ScheduleNextCheck(weeklyCount, noNewTTL)
feed.ScheduleNextCheck(weeklyCount, noNewTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand Down Expand Up @@ -260,7 +261,7 @@ func TestFeedScheduleNextCheckEntryFrequencySmallNewTTL(t *testing.T) {
weeklyCount := largeWeeklyCount
// TTL is smaller than minInterval.
newTTL := minInterval / 2
feed.ScheduleNextCheck(weeklyCount, newTTL)
feed.ScheduleNextCheck(weeklyCount, newTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand Down Expand Up @@ -296,7 +297,7 @@ func TestFeedScheduleNextCheckEntryFrequencyLargeNewTTL(t *testing.T) {
weeklyCount := largeWeeklyCount
// TTL is larger than minInterval.
newTTL := minInterval * 2
feed.ScheduleNextCheck(weeklyCount, newTTL)
feed.ScheduleNextCheck(weeklyCount, newTTL, noRateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
Expand All @@ -309,3 +310,25 @@ func TestFeedScheduleNextCheckEntryFrequencyLargeNewTTL(t *testing.T) {
t.Error(`The next_check_at should be after timeBefore + entry frequency min interval`)
}
}

func TestFeedScheduleNextCheckRateLimited(t *testing.T) {
var err error
parser := config.NewParser()
config.Opts, err = parser.ParseEnvironmentVariables()
if err != nil {
t.Fatalf(`Parsing failure: %v`, err)
}

feed := &Feed{}
weeklyCount := 10
rateLimited := true
feed.ScheduleNextCheck(weeklyCount, noNewTTL, rateLimited)

if feed.NextCheckAt.IsZero() {
t.Error(`The next_check_at must be set`)
}

if feed.NextCheckAt.Before(time.Now().Add(time.Minute * time.Duration(60*12))) {
t.Error(`The next_check_at should not be before the now + 12 hours`)
}
}
4 changes: 4 additions & 0 deletions internal/reader/fetcher/response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (r *ResponseHandler) IsModified(lastEtagValue, lastModifiedValue string) bo
return true
}

func (r *ResponseHandler) IsRateLimited() bool {
return r.httpResponse.StatusCode == http.StatusTooManyRequests
}

func (r *ResponseHandler) Close() {
if r.httpResponse != nil && r.httpResponse.Body != nil && r.clientErr == nil {
r.httpResponse.Body.Close()
Expand Down
9 changes: 8 additions & 1 deletion internal/reader/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool

weeklyEntryCount := 0
newTTL := 0
rateLimited := false
if config.Opts.PollingScheduler() == model.SchedulerEntryFrequency {
var weeklyCountErr error
weeklyEntryCount, weeklyCountErr = store.WeeklyFeedEntryCount(userID, feedID)
Expand All @@ -230,12 +231,13 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
// Commit the result to the database at the end of this function.
// If we met an error before entering the defer function, localizedError would not be nil.
defer func() {
originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL)
originalFeed.ScheduleNextCheck(weeklyEntryCount, newTTL, rateLimited)
slog.Debug("Updated next check date",
slog.Int64("user_id", userID),
slog.Int64("feed_id", feedID),
slog.Int("weeklyEntryCount", weeklyEntryCount),
slog.Int("ttl", newTTL),
slog.Bool("rateLimited", rateLimited),
slog.Time("new_next_check_at", originalFeed.NextCheckAt),
)
if localizedError == nil {
Expand Down Expand Up @@ -266,6 +268,11 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool
responseHandler := fetcher.NewResponseHandler(requestBuilder.ExecuteRequest(originalFeed.FeedURL))
defer responseHandler.Close()

rateLimited = responseHandler.IsRateLimited()
if rateLimited {
slog.Warn("Feed is rate limited (429 status code)", slog.String("feed_url", originalFeed.FeedURL))
}

if localizedError = responseHandler.LocalizedError(); localizedError != nil {
slog.Warn("Unable to fetch feed", slog.String("feed_url", originalFeed.FeedURL), slog.Any("error", localizedError.Error()))
return localizedError
Expand Down

0 comments on commit c2db23c

Please sign in to comment.