From 3ab6d898480e00a63d78e1579196cc3856cc4242 Mon Sep 17 00:00:00 2001 From: Peter Philipp Date: Tue, 30 Jul 2024 15:51:45 +0200 Subject: [PATCH 1/3] feat: Adjust prune compression mode to reduce calls to redis. --- Adapter/RedisTagAwareAdapter.php | 98 ++++++++++++++++++++++++++------ 1 file changed, 81 insertions(+), 17 deletions(-) diff --git a/Adapter/RedisTagAwareAdapter.php b/Adapter/RedisTagAwareAdapter.php index eb416ab..a5c659b 100644 --- a/Adapter/RedisTagAwareAdapter.php +++ b/Adapter/RedisTagAwareAdapter.php @@ -66,8 +66,9 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements PruneableI * @param \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|RedisProxy|RedisClusterProxy $redis The redis client * @param string $namespace The default namespace * @param int $defaultLifetime The default lifetime + * @param bool $pruneWithCompression Enable compressed prune. Way more resource intensive. */ - public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null) + public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null, bool $pruneWithCompression = true) { if ($redis instanceof \Predis\ClientInterface && $redis->getConnection() instanceof ClusterInterface && !$redis->getConnection() instanceof PredisCluster) { throw new InvalidArgumentException(sprintf('Unsupported Predis cluster connection: only "%s" is, "%s" given.', PredisCluster::class, get_debug_type($redis->getConnection()))); @@ -85,6 +86,7 @@ public function __construct($redis, string $namespace = '', int $defaultLifetime $this->init($redis, $namespace, $defaultLifetime, new TagAwareMarshaller($marshaller)); $this->namespace = $namespace; + $this->pruneWithCompression = $pruneWithCompression; } /** @@ -362,9 +364,15 @@ protected function getAllTagKeys(): array }); $setKeys = $results->valid() ? iterator_to_array($results) : []; - [$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null]; - // merge the fetched ids together - $tagKeys = array_merge($tagKeys, $ids); + // $setKeys[$tagsPrefix] might be an RedisException object - + // check before just using it. + if (is_array($setKeys[$tagsPrefix])) { + [$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null]; + // merge the fetched ids together + $tagKeys = array_merge($tagKeys, $ids); + } elseif (isset($setKeys[$tagsPrefix]) && $setKeys[$tagsPrefix] instanceof \Throwable) { + $this->logger->error($setKeys[$tagsPrefix]->getMessage()); + } } while ($cursor = (int) $cursor); return $tagKeys; @@ -425,15 +433,9 @@ private function getOrphanedTagsStats(bool $compressMode = false): array // referenced and existing cache keys differs collect the // missing references. if ($compressMode && \count($referencedCacheKeys) > $existingCacheKeysCount) { - // In order to create the delta each single reference - // has to be checked. - foreach ($referencedCacheKeys as $cacheKey) { - $existingCacheKeyResult = $this->pipeline(function () use ($cacheKey) { - yield 'exists' => [$cacheKey]; - }); - if ($existingCacheKeyResult->valid() && !$existingCacheKeyResult->current()) { - $orphanedTagReferenceKeys[$tagKey][] = $cacheKey; - } + $orphanedTagReferenceKeysInHash = $this->getOrphanedCacheKeys($referencedCacheKeys); + if (!empty($orphanedTagReferenceKeysInHash)) { + $orphanedTagReferenceKeys[$tagKey] = $orphanedTagReferenceKeysInHash; } } // Stop processing cursors in case compression mode is @@ -456,6 +458,65 @@ private function getOrphanedTagsStats(bool $compressMode = false): array return $stats; } + /** + * Accepts a list of cache keys and returns a list with orphaned keys. + * + * The method attempts to reduced optimize the testing of the keys by + * batching the key tests and reduce the amount of redis calls. + * + * @param array $referencedCacheKeys + * + * @return array + */ + private function getOrphanedCacheKeys($cacheKeys, $chunks = 2) + { + static $existsSupportsMultiKeys; + if (!isset($existsSupportsMultiKeys)) { + $hosts = $this->getHosts(); + $host = reset($hosts); + $info = $host->info('Server'); + $info = !$info instanceof ErrorInterface ? $info['Server'] ?? $info : ['redis_version' => '2.0']; + $existsSupportsMultiKeys = version_compare($info['redis_version'], '2.0.3', '>='); + } + $orphanedCacheKeys = []; + + if ($existsSupportsMultiKeys) { + // If we can check multiple keys at once divide and conquer to have + // faster execution. + $cacheKeysChunks = array_chunk($cacheKeys, floor(count($cacheKeys) / $chunks), true); + foreach ($cacheKeysChunks as $cacheKeysChunk) { + $result = $this->pipeline(function () use ($cacheKeysChunk) { + yield 'exists' => [$cacheKeysChunk]; + }); + if ($result->valid()) { + $existingKeys = $result->current(); + if ($existingKeys === 0) { + // None of the chunk exists - register all. + $orphanedCacheKeys = array_merge($orphanedCacheKeys, $cacheKeysChunk); + } elseif ($existingKeys !== count($cacheKeysChunk)) { + // Some exists some don't - trigger another batch of chunks. + // @TODO At what chunk size is a single item comparison more efficient? + // @TODO The call could set an optimized number of chunks. At this point the number of existing keys and the number + // of keys to check is known - this could allow to guesstimate the optimal fragmentation. + $orphanedCacheKeys = array_merge($orphanedCacheKeys, $this->getOrphanedCacheKeys($cacheKeysChunk)); + } + } + } + } else { + // Without multi-key support in exists each single reference + // has to be checked individually to create the delta. + foreach ($cacheKeys as $cacheKey) { + $result = $this->pipeline(function () use ($cacheKey) { + yield 'exists' => [$cacheKey]; + }); + if ($result->valid() && !$result->current()) { + $orphanedCacheKeys[] = $cacheKey; + } + } + } + return $orphanedCacheKeys; + } + /** * @TODO Verify the LUA scripts are redis-cluster safe. */ @@ -497,11 +558,14 @@ private function pruneOrphanedTags(bool $compressMode = false): bool return $success; } - /** - * @TODO Make compression mode flag configurable. - */ public function prune(): bool { - return $this->pruneOrphanedTags(true); + // First run without compression enabled to reduce data that is + // processed by the compression handling. + $result = $this->pruneOrphanedTags(); + if ($result && $this->pruneWithCompression) { + $result = $this->pruneOrphanedTags(true); + } + return $result; } } From 692237d38f4bda4769f7b341d10725183a47a39a Mon Sep 17 00:00:00 2001 From: Peter Philipp Date: Wed, 31 Jul 2024 09:58:18 +0200 Subject: [PATCH 2/3] chore: Added dedicated method the get redis version. Added flag to indicate if prune was run on an instance - avoiding double processing. --- Adapter/RedisTagAwareAdapter.php | 69 +++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/Adapter/RedisTagAwareAdapter.php b/Adapter/RedisTagAwareAdapter.php index a5c659b..fc7054e 100644 --- a/Adapter/RedisTagAwareAdapter.php +++ b/Adapter/RedisTagAwareAdapter.php @@ -60,6 +60,14 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements PruneableI * @var string|null detected eviction policy used on Redis server */ private $redisEvictionPolicy; + /** + * @var string|null detected redis version of Redis server + */ + private $redisVersion; + /** + * @var bool|null Indicate whether this "namespace" has been pruned and what the result was. + */ + private $pruneResult; private $namespace; /** @@ -298,6 +306,36 @@ protected function doInvalidate(array $tagIds): bool return $success; } + /** + * @TODO Move to RedisTrait? It already has a version check - this would be handy. + * + * @return string + */ + private function getRedisVersion(): string + { + if (null !== $this->redisVersion) { + return $this->redisVersion; + } + + $hosts = $this->getHosts(); + $host = reset($hosts); + if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) { + // Predis supports info command only on the master in replication environments + $hosts = [$host->getClientFor('master')]; + } + + foreach ($hosts as $host) { + $info = $host->info('Server'); + + if ($info instanceof ErrorInterface) { + continue; + } + return $this->redisVersion = $info['redis_version']; + } + // Fallback to 2.0 like RedisTrait does. + return $this->redisVersion = '2.0'; + } + private function getRedisEvictionPolicy(): string { if (null !== $this->redisEvictionPolicy) { @@ -464,23 +502,15 @@ private function getOrphanedTagsStats(bool $compressMode = false): array * The method attempts to reduced optimize the testing of the keys by * batching the key tests and reduce the amount of redis calls. * - * @param array $referencedCacheKeys + * @param array $cacheKeys + * @param int $chunks Number of chunks to create when processing cacheKeys. * * @return array */ - private function getOrphanedCacheKeys($cacheKeys, $chunks = 2) + private function getOrphanedCacheKeys(array $cacheKeys, int $chunks = 2) { - static $existsSupportsMultiKeys; - if (!isset($existsSupportsMultiKeys)) { - $hosts = $this->getHosts(); - $host = reset($hosts); - $info = $host->info('Server'); - $info = !$info instanceof ErrorInterface ? $info['Server'] ?? $info : ['redis_version' => '2.0']; - $existsSupportsMultiKeys = version_compare($info['redis_version'], '2.0.3', '>='); - } $orphanedCacheKeys = []; - - if ($existsSupportsMultiKeys) { + if (version_compare($this->getRedisVersion(), '2.0.3', '>=')) { // If we can check multiple keys at once divide and conquer to have // faster execution. $cacheKeysChunks = array_chunk($cacheKeys, floor(count($cacheKeys) / $chunks), true); @@ -560,12 +590,15 @@ private function pruneOrphanedTags(bool $compressMode = false): bool public function prune(): bool { - // First run without compression enabled to reduce data that is - // processed by the compression handling. - $result = $this->pruneOrphanedTags(); - if ($result && $this->pruneWithCompression) { - $result = $this->pruneOrphanedTags(true); + // Only prune once per prune run. + if (!isset($this->pruneResult)) { + // First run without compression enabled to reduce data that is + // processed by the compression handling. + $this->pruneResult = $this->pruneOrphanedTags(); + if ($this->pruneResult && $this->pruneWithCompression) { + $this->pruneResult = $this->pruneOrphanedTags(true); + } } - return $result; + return $this->pruneResult; } } From 3476f73538188101feb974ab1a7983a694237505 Mon Sep 17 00:00:00 2001 From: Peter Philipp Date: Wed, 31 Jul 2024 10:27:19 +0200 Subject: [PATCH 3/3] chore: Disable compression on prune by default. --- Adapter/RedisTagAwareAdapter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Adapter/RedisTagAwareAdapter.php b/Adapter/RedisTagAwareAdapter.php index fc7054e..6ec1038 100644 --- a/Adapter/RedisTagAwareAdapter.php +++ b/Adapter/RedisTagAwareAdapter.php @@ -76,7 +76,7 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements PruneableI * @param int $defaultLifetime The default lifetime * @param bool $pruneWithCompression Enable compressed prune. Way more resource intensive. */ - public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null, bool $pruneWithCompression = true) + public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null, bool $pruneWithCompression = false) { if ($redis instanceof \Predis\ClientInterface && $redis->getConnection() instanceof ClusterInterface && !$redis->getConnection() instanceof PredisCluster) { throw new InvalidArgumentException(sprintf('Unsupported Predis cluster connection: only "%s" is, "%s" given.', PredisCluster::class, get_debug_type($redis->getConnection())));