diff --git a/lib/util/cache/package/decorator.ts b/lib/util/cache/package/decorator.ts index 052ac79c79d3490651c37bb06e5a99b76ee1e736..00c96640df67c859609a83129f5ee161ba7017ad 100644 --- a/lib/util/cache/package/decorator.ts +++ b/lib/util/cache/package/decorator.ts @@ -3,6 +3,7 @@ import { DateTime } from 'luxon'; import { GlobalConfig } from '../../../config/global'; import { logger } from '../../../logger'; import { Decorator, decorate } from '../../decorator'; +import { acquireLock } from '../../mutex'; import type { DecoratorCachedRecord, PackageCacheNamespace } from './types'; import * as packageCache from '.'; @@ -79,63 +80,71 @@ export function cache<T>({ } finalKey = `cache-decorator:${finalKey}`; - const oldRecord = await packageCache.get<DecoratorCachedRecord>( - finalNamespace, - finalKey, - ); - const ttlOverride = getTtlOverride(finalNamespace); - const softTtl = ttlOverride ?? ttlMinutes; + // prevent concurrent processing and cache writes + const releaseLock = await acquireLock(finalKey, finalNamespace); + + try { + const oldRecord = await packageCache.get<DecoratorCachedRecord>( + finalNamespace, + finalKey, + ); + + const ttlOverride = getTtlOverride(finalNamespace); + const softTtl = ttlOverride ?? ttlMinutes; + + const cacheHardTtlMinutes = GlobalConfig.get( + 'cacheHardTtlMinutes', + 7 * 24 * 60, + ); + let hardTtl = softTtl; + if (methodName === 'getReleases' || methodName === 'getDigest') { + hardTtl = Math.max(softTtl, cacheHardTtlMinutes); + } - const cacheHardTtlMinutes = GlobalConfig.get( - 'cacheHardTtlMinutes', - 7 * 24 * 60, - ); - let hardTtl = softTtl; - if (methodName === 'getReleases' || methodName === 'getDigest') { - hardTtl = Math.max(softTtl, cacheHardTtlMinutes); - } + let oldData: unknown; + if (oldRecord) { + const now = DateTime.local(); + const cachedAt = DateTime.fromISO(oldRecord.cachedAt); - let oldData: unknown; - if (oldRecord) { - const now = DateTime.local(); - const cachedAt = DateTime.fromISO(oldRecord.cachedAt); + const softDeadline = cachedAt.plus({ minutes: softTtl }); + if (now < softDeadline) { + return oldRecord.value; + } - const softDeadline = cachedAt.plus({ minutes: softTtl }); - if (now < softDeadline) { - return oldRecord.value; + const hardDeadline = cachedAt.plus({ minutes: hardTtl }); + if (now < hardDeadline) { + oldData = oldRecord.value; + } } - const hardDeadline = cachedAt.plus({ minutes: hardTtl }); - if (now < hardDeadline) { - oldData = oldRecord.value; + let newData: unknown; + if (oldData) { + try { + newData = (await callback()) as T | undefined; + } catch (err) { + logger.debug( + { err }, + 'Package cache decorator: callback error, returning old data', + ); + return oldData; + } + } else { + newData = (await callback()) as T | undefined; } - } - let newData: unknown; - if (oldData) { - try { - newData = (await callback()) as T | undefined; - } catch (err) { - logger.debug( - { err }, - 'Package cache decorator: callback error, returning old data', - ); - return oldData; + if (!is.undefined(newData)) { + const newRecord: DecoratorCachedRecord = { + cachedAt: DateTime.local().toISO(), + value: newData, + }; + await packageCache.set(finalNamespace, finalKey, newRecord, hardTtl); } - } else { - newData = (await callback()) as T | undefined; - } - if (!is.undefined(newData)) { - const newRecord: DecoratorCachedRecord = { - cachedAt: DateTime.local().toISO(), - value: newData, - }; - await packageCache.set(finalNamespace, finalKey, newRecord, hardTtl); + return newData; + } finally { + releaseLock(); } - - return newData; }); } diff --git a/lib/util/mutex.spec.ts b/lib/util/mutex.spec.ts new file mode 100644 index 0000000000000000000000000000000000000000..13ac8984461df057bc9afebcc9e72401ffc02d3b --- /dev/null +++ b/lib/util/mutex.spec.ts @@ -0,0 +1,28 @@ +import { afterEach } from '@jest/globals'; +import { acquireLock, getMutex } from './mutex'; + +describe('util/mutex', () => { + describe('getMutex', () => { + it('returns mutex with default namespace', () => { + expect(getMutex('test')).toBeDefined(); + }); + }); + + describe('acquireLock', () => { + afterEach(() => { + getMutex('test').release(); + }); + + it('return lock function with default namespace', async () => { + await expect(acquireLock('test')).resolves.toBeFunction(); + }); + + it('should lock if already used', async () => { + const mutex = getMutex('test'); + const releaseLock = await acquireLock('test'); + expect(mutex.isLocked()).toBeTrue(); + releaseLock(); + expect(mutex.isLocked()).toBeFalse(); + }); + }); +}); diff --git a/lib/util/mutex.ts b/lib/util/mutex.ts new file mode 100644 index 0000000000000000000000000000000000000000..310f48eb4c1862dcb298581b1eff7eb8eb61f2d3 --- /dev/null +++ b/lib/util/mutex.ts @@ -0,0 +1,21 @@ +import { Mutex, type MutexInterface, withTimeout } from 'async-mutex'; + +const DEFAULT_NAMESPACE = 'default'; +const mutexes: Record<string, Record<string, MutexInterface>> = {}; + +export function getMutex( + key: string, + namespace: string = DEFAULT_NAMESPACE, +): MutexInterface { + mutexes[namespace] ??= {}; + // create a new mutex if it doesn't exist with a timeout of 2 minutes + mutexes[namespace][key] ??= withTimeout(new Mutex(), 1000 * 60 * 2); + return mutexes[namespace][key]; +} + +export function acquireLock( + key: string, + namespace: string = DEFAULT_NAMESPACE, +): Promise<MutexInterface.Releaser> { + return getMutex(key, namespace).acquire(); +} diff --git a/package.json b/package.json index c05fd221ba5b4756dca9d235d4f8e031eadd81a1..ab980d6f3aa4fe0559d029d1346ca85f85ef4059 100644 --- a/package.json +++ b/package.json @@ -168,6 +168,7 @@ "@yarnpkg/parsers": "3.0.2", "agentkeepalive": "4.5.0", "aggregate-error": "3.1.0", + "async-mutex": "0.5.0", "auth-header": "1.0.0", "aws4": "1.13.1", "azure-devops-node-api": "14.0.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 98fe683f99a985700d19b08c902e933d9da5b168..5b5dd74e92925934f504b70f8908946b2da93bfa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -98,6 +98,9 @@ importers: aggregate-error: specifier: 3.1.0 version: 3.1.0 + async-mutex: + specifier: 0.5.0 + version: 0.5.0 auth-header: specifier: 1.0.0 version: 1.0.0 @@ -1169,7 +1172,6 @@ packages: '@ls-lint/ls-lint@2.2.3': resolution: {integrity: sha512-ekM12jNm/7O2I/hsRv9HvYkRdfrHpiV1epVuI2NP+eTIcEgdIdKkKCs9KgQydu/8R5YXTov9aHdOgplmCHLupw==} - cpu: [x64, arm64, s390x] os: [darwin, linux, win32] hasBin: true @@ -2406,6 +2408,9 @@ packages: asn1.js@5.4.1: resolution: {integrity: sha512-+I//4cYPccV8LdmBLiX8CYvf9Sp3vQsrqu2QNXRcrbiWvcx/UdlFiqUJJzxRQxgsZmvhXhn4cSKeSmoFjVdupA==} + async-mutex@0.5.0: + resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==} + async@3.2.5: resolution: {integrity: sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==} @@ -8982,6 +8987,10 @@ snapshots: safer-buffer: 2.1.2 optional: true + async-mutex@0.5.0: + dependencies: + tslib: 2.6.3 + async@3.2.5: {} auth-header@1.0.0: {}