diff --git a/Sources/JobsPostgres/Migrations/CreateJobMetadataMigration.swift b/Sources/JobsPostgres/Migrations/CreateJobMetadataMigration.swift new file mode 100644 index 0000000..bd723e5 --- /dev/null +++ b/Sources/JobsPostgres/Migrations/CreateJobMetadataMigration.swift @@ -0,0 +1,47 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import PostgresMigrations +import PostgresNIO + +struct CreateJobMetadataMigration: DatabaseMigration { + + func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws { + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS swift_jobs.metadata( + key TEXT NOT NULL, + value BYTEA NOT NULL, + expires TIMESTAMPTZ NOT NULL DEFAULT 'infinity', + queue_name TEXT NOT NULL DEFAULT 'default', + CONSTRAINT key_queue_name PRIMARY KEY (key, queue_name) + ) + """, + logger: logger + ) + } + + func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws { + try await connection.query( + """ + DROP TABLE swift_jobs.metadata + """, + logger: logger + ) + } + + var description: String { "__JobMetadataMigration__" } + var group: DatabaseMigrationGroup { .jobQueue } +} diff --git a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift index 414f62a..5204dfe 100644 --- a/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift +++ b/Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift @@ -55,25 +55,6 @@ struct CreateSwiftJobsMigrations: DatabaseMigration { """, logger: logger ) - - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata( - key TEXT PRIMARY KEY, - value BYTEA NOT NULL, - queue_name TEXT NOT NULL DEFAULT 'default' - ) - """, - logger: logger - ) - - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS queues_metadata_key_queue_name_idx - ON swift_jobs.queues_metadata(key, queue_name) - """, - logger: logger - ) } func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws { diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift index 3b67110..6fd465b 100644 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ b/Sources/JobsPostgres/PostgresJobsQueue.swift @@ -42,7 +42,7 @@ import PostgresNIO /// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations) /// } /// ``` -public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, CancellableJobQueue, ResumableJobQueue { +public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue { public typealias JobID = UUID @@ -158,6 +158,7 @@ public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, Cancella self.isStopped = .init(false) self.migrations = migrations await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true) + await migrations.add(CreateJobMetadataMigration(), skipDuplicates: true) self.registerCleanupJob() } @@ -290,31 +291,6 @@ public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, Cancella /// shutdown queue once all active jobs have been processed public func shutdownGracefully() async {} - @inlinable - public func getMetadata(_ key: String) async throws -> ByteBuffer? { - let stream = try await self.client.query( - "SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)", - logger: self.logger - ) - for try await value in stream.decode(ByteBuffer.self) { - return value - } - return nil - } - - @inlinable - public func setMetadata(key: String, value: ByteBuffer) async throws { - try await self.client.query( - """ - INSERT INTO swift_jobs.queues_metadata (key, value, queue_name) - VALUES (\(key), \(value), \(configuration.queueName)) - ON CONFLICT (key) - DO UPDATE SET value = \(value) - """, - logger: self.logger - ) - } - @usableFromInline func popFirst() async throws -> JobQueueResult? { enum PopFirstResult { @@ -575,6 +551,81 @@ extension PostgresJobQueue { } } +extension PostgresJobQueue: JobMetadataDriver { + @inlinable + public func getMetadata(_ key: String) async throws -> ByteBuffer? { + let stream = try await self.client.query( + "SELECT value FROM swift_jobs.metadata WHERE key = \(key) AND queue_name = \(self.configuration.queueName)", + logger: self.logger + ) + for try await value in stream.decode(ByteBuffer.self) { + return value + } + return nil + } + + @inlinable + public func setMetadata(key: String, value: ByteBuffer) async throws { + try await self.client.query( + """ + INSERT INTO swift_jobs.metadata (key, value, queue_name) + VALUES (\(key), \(value), \(self.configuration.queueName)) + ON CONFLICT (key, queue_name) + DO UPDATE SET value = \(value) + """, + logger: self.logger + ) + } + + /// Acquire metadata lock + /// + /// - Parameters: + /// - key: Metadata key + /// - id: Lock identifier + /// - expiresIn: When lock will expire + /// - Returns: If lock was acquired + @inlinable + public func acquireLock(key: String, id: ByteBuffer, expiresIn: TimeInterval) async throws -> Bool { + let expires = Date.now + expiresIn + // insert key, value, expiration into table. On conflict with key and queue_name only set value and + // expiration if expiration is out of date or value is the same + let stream = try await self.client.query( + """ + INSERT INTO swift_jobs.metadata (key, value, expires, queue_name) + VALUES (\(key), \(id), \(expires), \(self.configuration.queueName)) + ON CONFLICT (key, queue_name) + DO UPDATE + SET value = \(id), expires = \(expires) + WHERE swift_jobs.metadata.expires <= now() + OR swift_jobs.metadata.value = \(id) + RETURNING value + """, + logger: self.logger + ) + for try await value in stream.decode(ByteBuffer.self) { + return value == id + } + return false + } + + /// Release metadata lock + /// + /// - Parameters: + /// - key: Metadata key + /// - id: Lock identifier + @inlinable + public func releaseLock(key: String, id: ByteBuffer) async throws { + _ = try await self.client.query( + """ + DELETE FROM swift_jobs.metadata + WHERE key = \(key) + AND value = \(id) + AND queue_name = \(self.configuration.queueName) + """ + ) + } +} + extension JobQueueDriver where Self == PostgresJobQueue { /// Return Postgres driver for Job Queue /// - Parameters: diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift index 70e34d9..9cb8e67 100644 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ b/Tests/JobsPostgresTests/JobsTests.swift @@ -697,6 +697,19 @@ final class JobsTests: XCTestCase { } } + func testMultipleQueueMetadata() async throws { + try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in + try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in + try await jobQueue1.queue.setMetadata(key: "test", value: .init(string: "queue1")) + try await jobQueue2.queue.setMetadata(key: "test", value: .init(string: "queue2")) + let value1 = try await jobQueue1.queue.getMetadata("test") + let value2 = try await jobQueue2.queue.getMetadata("test") + XCTAssertEqual(value1.map { String(buffer: $0) }, "queue1") + XCTAssertEqual(value2.map { String(buffer: $0) }, "queue2") + } + } + } + func testResumableAndPausableJobs() async throws { struct TestParameters: JobParameters { static let jobName = "TestJob" @@ -993,4 +1006,55 @@ final class JobsTests: XCTestCase { } } } + + func testMetadataLock() async throws { + try await self.testJobQueue(numWorkers: 1) { jobQueue in + // 1 - acquire lock + var result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10) + XCTAssertTrue(result) + // 2 - check I can acquire lock once I already have the lock + result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10) + XCTAssertTrue(result) + // 3 - check I cannot acquire lock if a different identifer has it + result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10) + XCTAssertFalse(result) + // 4 - release lock with identifier that doesn own it + try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "two")) + // 5 - check I still cannot acquire lock + result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10) + XCTAssertFalse(result) + // 6 - release lock + try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one")) + // 7 - check I can acquire lock after it has been released + result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 1) + XCTAssertTrue(result) + // 8 - check I can acquire lock after it has expired + try await Task.sleep(for: .seconds(1.5)) + result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10) + XCTAssertTrue(result) + // 9 - release lock + try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one")) + } + } + + func testMultipleQueueMetadataLock() async throws { + try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in + try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in + let result1 = try await jobQueue1.queue.acquireLock( + key: "testMultipleQueueMetadataLock", + id: .init(string: "queue1"), + expiresIn: 60 + ) + let result2 = try await jobQueue2.queue.acquireLock( + key: "testMultipleQueueMetadataLock", + id: .init(string: "queue2"), + expiresIn: 60 + ) + XCTAssert(result1) + XCTAssert(result2) + try await jobQueue1.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue1")) + try await jobQueue2.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue2")) + } + } + } }