Skip to content

Commit

Permalink
fix: replace rate-limiter (#2356)
Browse files Browse the repository at this point in the history
[rate-limiter-flexible](https://npmjs.com/package/rate-limiter-flexible) is a CJS module with a single export of all it's various implementations.

This defeats tree shaking resulting in the addition of 42KB to the bundle size.

animir/node-rate-limiter-flexible#249

This PR brings the source & tests for the in-memory rate limiter into `@libp2p/utils` which reduces the bundle size increase to a few KBs.

---------

Co-authored-by: chad <[email protected]>
  • Loading branch information
achingbrain and maschad committed Jan 12, 2024
1 parent 4691f41 commit ddaa59a
Show file tree
Hide file tree
Showing 8 changed files with 571 additions and 9 deletions.
1 change: 0 additions & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
"merge-options": "^3.0.4",
"multiformats": "^13.0.0",
"private-ip": "^3.0.1",
"rate-limiter-flexible": "^4.0.0",
"uint8arrays": "^5.0.0"
},
"devDependencies": {
Expand Down
6 changes: 3 additions & 3 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { CodeError, KEEP_ALIVE } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { RateLimiterMemory } from 'rate-limiter-flexible'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
Expand Down Expand Up @@ -167,7 +167,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
public readonly dialQueue: DialQueue
public readonly autoDial: AutoDial
public readonly connectionPruner: ConnectionPruner
private readonly inboundConnectionRateLimiter: RateLimiterMemory
private readonly inboundConnectionRateLimiter: RateLimiter

private readonly peerStore: PeerStore
private readonly metrics?: Metrics
Expand Down Expand Up @@ -206,7 +206,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
this.maxIncomingPendingConnections = init.maxIncomingPendingConnections ?? defaultOptions.maxIncomingPendingConnections

// controls individual peers trying to dial us too quickly
this.inboundConnectionRateLimiter = new RateLimiterMemory({
this.inboundConnectionRateLimiter = new RateLimiter({
points: init.inboundConnectionThreshold ?? defaultOptions.inboundConnectionThreshold,
duration: 1
})
Expand Down
1 change: 0 additions & 1 deletion packages/stream-multiplexer-mplex/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.1",
"it-stream-types": "^2.0.1",
"rate-limiter-flexible": "^4.0.0",
"uint8-varint": "^2.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^5.0.0"
Expand Down
6 changes: 3 additions & 3 deletions packages/stream-multiplexer-mplex/src/mplex.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { CodeError } from '@libp2p/interface'
import { closeSource } from '@libp2p/utils/close-source'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
import { pipe } from 'it-pipe'
import { type Pushable, pushable } from 'it-pushable'
import { RateLimiterMemory } from 'rate-limiter-flexible'
import { toString as uint8ArrayToString } from 'uint8arrays'
import { Decoder } from './decode.js'
import { encode } from './encode.js'
Expand Down Expand Up @@ -59,7 +59,7 @@ export class MplexStreamMuxer implements StreamMuxer {
private readonly _init: MplexStreamMuxerInit
private readonly _source: Pushable<Message>
private readonly closeController: AbortController
private readonly rateLimiter: RateLimiterMemory
private readonly rateLimiter: RateLimiter
private readonly closeTimeout: number
private readonly logger: ComponentLogger

Expand Down Expand Up @@ -114,7 +114,7 @@ export class MplexStreamMuxer implements StreamMuxer {
*/
this.closeController = new AbortController()

this.rateLimiter = new RateLimiterMemory({
this.rateLimiter = new RateLimiter({
points: init.disconnectThreshold ?? DISCONNECT_THRESHOLD,
duration: 1
})
Expand Down
5 changes: 5 additions & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@
"types": "./dist/src/peer-queue.d.ts",
"import": "./dist/src/peer-queue.js"
},
"./rate-limiter": {
"types": "./dist/src/rate-limiter.d.ts",
"import": "./dist/src/rate-limiter.js"
},
"./stream-to-ma-conn": {
"types": "./dist/src/stream-to-ma-conn.d.ts",
"import": "./dist/src/stream-to-ma-conn.js"
Expand Down Expand Up @@ -119,6 +123,7 @@
},
"dependencies": {
"@chainsafe/is-ip": "^2.0.2",
"delay": "^6.0.0",
"@libp2p/interface": "^1.1.1",
"@libp2p/logger": "^4.0.4",
"@multiformats/multiaddr": "^12.1.10",
Expand Down
287 changes: 287 additions & 0 deletions packages/utils/src/rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import { CodeError } from '@libp2p/interface'
import delay from 'delay'

export interface RateLimiterInit {
/**
* Number of points
*
* @default 4
*/
points?: number

/**
* Per seconds
*
* @default 1
*/
duration?: number

/**
* Block if consumed more than points in current duration for blockDuration seconds
*
* @default 0
*/
blockDuration?: number

/**
* Execute allowed actions evenly over duration
*
* @default false
*/
execEvenly?: boolean

/**
* ms, works with execEvenly=true option
*
* @default duration * 1000 / points
*/
execEvenlyMinDelayMs?: number

/**
* @default rlflx
*/
keyPrefix?: string
}

export interface GetKeySecDurationOptions {
customDuration?: number
}

export interface RateLimiterResult {
remainingPoints: number
msBeforeNext: number
consumedPoints: number
isFirstInDuration: boolean
}

export interface RateRecord {
value: number
expiresAt?: Date
timeoutId?: ReturnType<typeof setTimeout>
}

export class RateLimiter {
public readonly memoryStorage: MemoryStorage
protected points: number
protected duration: number
protected blockDuration: number
protected execEvenly: boolean
protected execEvenlyMinDelayMs: number
protected keyPrefix: string

constructor (opts: RateLimiterInit = {}) {
this.points = opts.points ?? 4
this.duration = opts.duration ?? 1
this.blockDuration = opts.blockDuration ?? 0
this.execEvenly = opts.execEvenly ?? false
this.execEvenlyMinDelayMs = opts.execEvenlyMinDelayMs ?? (this.duration * 1000 / this.points)
this.keyPrefix = opts.keyPrefix ?? 'rlflx'
this.memoryStorage = new MemoryStorage()
}

async consume (key: string, pointsToConsume: number = 1, options: GetKeySecDurationOptions = {}): Promise<RateLimiterResult> {
const rlKey = this.getKey(key)
const secDuration = this._getKeySecDuration(options)
let res = this.memoryStorage.incrby(rlKey, pointsToConsume, secDuration)
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0)

if (res.consumedPoints > this.points) {
// Block only first time when consumed more than points
if (this.blockDuration > 0 && res.consumedPoints <= (this.points + pointsToConsume)) {
// Block key
res = this.memoryStorage.set(rlKey, res.consumedPoints, this.blockDuration)
}

throw new CodeError('Rate limit exceeded', 'ERR_RATE_LIMIT_EXCEEDED', res)
} else if (this.execEvenly && res.msBeforeNext > 0 && !res.isFirstInDuration) {
// Execute evenly
let delayMs = Math.ceil(res.msBeforeNext / (res.remainingPoints + 2))
if (delayMs < this.execEvenlyMinDelayMs) {
delayMs = res.consumedPoints * this.execEvenlyMinDelayMs
}

await delay(delayMs)
}

return res
}

penalty (key: string, points: number = 1, options: GetKeySecDurationOptions = {}): RateLimiterResult {
const rlKey = this.getKey(key)
const secDuration = this._getKeySecDuration(options)
const res = this.memoryStorage.incrby(rlKey, points, secDuration)
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0)

return res
}

reward (key: string, points: number = 1, options: GetKeySecDurationOptions = {}): RateLimiterResult {
const rlKey = this.getKey(key)
const secDuration = this._getKeySecDuration(options)
const res = this.memoryStorage.incrby(rlKey, -points, secDuration)
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0)

return res
}

/**
* Block any key for secDuration seconds
*
* @param key
* @param secDuration
*/
block (key: string, secDuration: number): RateLimiterResult {
const msDuration = secDuration * 1000
const initPoints = this.points + 1

this.memoryStorage.set(this.getKey(key), initPoints, secDuration)

return {
remainingPoints: 0,
msBeforeNext: msDuration === 0 ? -1 : msDuration,
consumedPoints: initPoints,
isFirstInDuration: false
}
}

set (key: string, points: number, secDuration: number = 0): RateLimiterResult {
const msDuration = (secDuration >= 0 ? secDuration : this.duration) * 1000

this.memoryStorage.set(this.getKey(key), points, secDuration)

return {
remainingPoints: 0,
msBeforeNext: msDuration === 0 ? -1 : msDuration,
consumedPoints: points,
isFirstInDuration: false
}
}

get (key: string): RateLimiterResult | undefined {
const res = this.memoryStorage.get(this.getKey(key))

if (res != null) {
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0)
}

return res
}

delete (key: string): void {
this.memoryStorage.delete(this.getKey(key))
}

private _getKeySecDuration (options?: GetKeySecDurationOptions): number {
if (options?.customDuration != null && options.customDuration >= 0) {
return options.customDuration
}

return this.duration
}

getKey (key: string): string {
return this.keyPrefix.length > 0 ? `${this.keyPrefix}:${key}` : key
}

parseKey (rlKey: string): string {
return rlKey.substring(this.keyPrefix.length)
}
}

class MemoryStorage {
public readonly storage: Map<string, RateRecord>

constructor () {
this.storage = new Map()
}

incrby (key: string, value: number, durationSec: number): RateLimiterResult {
const existing = this.storage.get(key)

if (existing != null) {
const msBeforeExpires = existing.expiresAt != null
? existing.expiresAt.getTime() - new Date().getTime()
: -1

if (existing.expiresAt == null || msBeforeExpires > 0) {
// Change value
existing.value += value

return {
remainingPoints: 0,
msBeforeNext: msBeforeExpires,
consumedPoints: existing.value,
isFirstInDuration: false
}
}

return this.set(key, value, durationSec)
}

return this.set(key, value, durationSec)
}

set (key: string, value: number, durationSec: number): RateLimiterResult {
const durationMs = durationSec * 1000
const existing = this.storage.get(key)

if (existing != null) {
clearTimeout(existing.timeoutId)
}

const record: RateRecord = {
value,
expiresAt: durationMs > 0 ? new Date(Date.now() + durationMs) : undefined
}

this.storage.set(key, record)

if (durationMs > 0) {
record.timeoutId = setTimeout(() => {
this.storage.delete(key)
}, durationMs)

if (record.timeoutId.unref != null) {
record.timeoutId.unref()
}
}

return {
remainingPoints: 0,
msBeforeNext: durationMs === 0 ? -1 : durationMs,
consumedPoints: record.value,
isFirstInDuration: true
}
}

get (key: string): RateLimiterResult | undefined {
const existing = this.storage.get(key)

if (existing != null) {
const msBeforeExpires = existing.expiresAt != null
? existing.expiresAt.getTime() - new Date().getTime()
: -1
return {
remainingPoints: 0,
msBeforeNext: msBeforeExpires,
consumedPoints: existing.value,
isFirstInDuration: false
}
}
}

delete (key: string): boolean {
const record = this.storage.get(key)

if (record != null) {
if (record.timeoutId != null) {
clearTimeout(record.timeoutId)
}

this.storage.delete(key)

return true
}
return false
}
}

0 comments on commit ddaa59a

Please sign in to comment.