Skip to content

Commit bd77418

Browse files
authored
Add acquireLock, releaseLock. Fix metadata for multiple queues (#39)
* Add acquireLock, releaseLock. Fix metadata for multiple queues * Comment * Use infinity not +infinity * Revert to using main branch of swift-jobs
1 parent 489e671 commit bd77418

File tree

4 files changed

+188
-45
lines changed

4 files changed

+188
-45
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Hummingbird server framework project
4+
//
5+
// Copyright (c) 2025 the Hummingbird authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import PostgresMigrations
17+
import PostgresNIO
18+
19+
struct CreateJobMetadataMigration: DatabaseMigration {
20+
21+
func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
22+
try await connection.query(
23+
"""
24+
CREATE TABLE IF NOT EXISTS swift_jobs.metadata(
25+
key TEXT NOT NULL,
26+
value BYTEA NOT NULL,
27+
expires TIMESTAMPTZ NOT NULL DEFAULT 'infinity',
28+
queue_name TEXT NOT NULL DEFAULT 'default',
29+
CONSTRAINT key_queue_name PRIMARY KEY (key, queue_name)
30+
)
31+
""",
32+
logger: logger
33+
)
34+
}
35+
36+
func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
37+
try await connection.query(
38+
"""
39+
DROP TABLE swift_jobs.metadata
40+
""",
41+
logger: logger
42+
)
43+
}
44+
45+
var description: String { "__JobMetadataMigration__" }
46+
var group: DatabaseMigrationGroup { .jobQueue }
47+
}

Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,25 +55,6 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
5555
""",
5656
logger: logger
5757
)
58-
59-
try await connection.query(
60-
"""
61-
CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata(
62-
key TEXT PRIMARY KEY,
63-
value BYTEA NOT NULL,
64-
queue_name TEXT NOT NULL DEFAULT 'default'
65-
)
66-
""",
67-
logger: logger
68-
)
69-
70-
try await connection.query(
71-
"""
72-
CREATE INDEX IF NOT EXISTS queues_metadata_key_queue_name_idx
73-
ON swift_jobs.queues_metadata(key, queue_name)
74-
""",
75-
logger: logger
76-
)
7758
}
7859

7960
func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {

Sources/JobsPostgres/PostgresJobsQueue.swift

Lines changed: 77 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import PostgresNIO
4242
/// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations)
4343
/// }
4444
/// ```
45-
public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, CancellableJobQueue, ResumableJobQueue {
45+
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue {
4646

4747
public typealias JobID = UUID
4848

@@ -158,6 +158,7 @@ public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, Cancella
158158
self.isStopped = .init(false)
159159
self.migrations = migrations
160160
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
161+
await migrations.add(CreateJobMetadataMigration(), skipDuplicates: true)
161162
self.registerCleanupJob()
162163
}
163164

@@ -290,31 +291,6 @@ public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, Cancella
290291
/// shutdown queue once all active jobs have been processed
291292
public func shutdownGracefully() async {}
292293

293-
@inlinable
294-
public func getMetadata(_ key: String) async throws -> ByteBuffer? {
295-
let stream = try await self.client.query(
296-
"SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)",
297-
logger: self.logger
298-
)
299-
for try await value in stream.decode(ByteBuffer.self) {
300-
return value
301-
}
302-
return nil
303-
}
304-
305-
@inlinable
306-
public func setMetadata(key: String, value: ByteBuffer) async throws {
307-
try await self.client.query(
308-
"""
309-
INSERT INTO swift_jobs.queues_metadata (key, value, queue_name)
310-
VALUES (\(key), \(value), \(configuration.queueName))
311-
ON CONFLICT (key)
312-
DO UPDATE SET value = \(value)
313-
""",
314-
logger: self.logger
315-
)
316-
}
317-
318294
@usableFromInline
319295
func popFirst() async throws -> JobQueueResult<JobID>? {
320296
enum PopFirstResult {
@@ -575,6 +551,81 @@ extension PostgresJobQueue {
575551
}
576552
}
577553

554+
extension PostgresJobQueue: JobMetadataDriver {
555+
@inlinable
556+
public func getMetadata(_ key: String) async throws -> ByteBuffer? {
557+
let stream = try await self.client.query(
558+
"SELECT value FROM swift_jobs.metadata WHERE key = \(key) AND queue_name = \(self.configuration.queueName)",
559+
logger: self.logger
560+
)
561+
for try await value in stream.decode(ByteBuffer.self) {
562+
return value
563+
}
564+
return nil
565+
}
566+
567+
@inlinable
568+
public func setMetadata(key: String, value: ByteBuffer) async throws {
569+
try await self.client.query(
570+
"""
571+
INSERT INTO swift_jobs.metadata (key, value, queue_name)
572+
VALUES (\(key), \(value), \(self.configuration.queueName))
573+
ON CONFLICT (key, queue_name)
574+
DO UPDATE SET value = \(value)
575+
""",
576+
logger: self.logger
577+
)
578+
}
579+
580+
/// Acquire metadata lock
581+
///
582+
/// - Parameters:
583+
/// - key: Metadata key
584+
/// - id: Lock identifier
585+
/// - expiresIn: When lock will expire
586+
/// - Returns: If lock was acquired
587+
@inlinable
588+
public func acquireLock(key: String, id: ByteBuffer, expiresIn: TimeInterval) async throws -> Bool {
589+
let expires = Date.now + expiresIn
590+
// insert key, value, expiration into table. On conflict with key and queue_name only set value and
591+
// expiration if expiration is out of date or value is the same
592+
let stream = try await self.client.query(
593+
"""
594+
INSERT INTO swift_jobs.metadata (key, value, expires, queue_name)
595+
VALUES (\(key), \(id), \(expires), \(self.configuration.queueName))
596+
ON CONFLICT (key, queue_name)
597+
DO UPDATE
598+
SET value = \(id), expires = \(expires)
599+
WHERE swift_jobs.metadata.expires <= now()
600+
OR swift_jobs.metadata.value = \(id)
601+
RETURNING value
602+
""",
603+
logger: self.logger
604+
)
605+
for try await value in stream.decode(ByteBuffer.self) {
606+
return value == id
607+
}
608+
return false
609+
}
610+
611+
/// Release metadata lock
612+
///
613+
/// - Parameters:
614+
/// - key: Metadata key
615+
/// - id: Lock identifier
616+
@inlinable
617+
public func releaseLock(key: String, id: ByteBuffer) async throws {
618+
_ = try await self.client.query(
619+
"""
620+
DELETE FROM swift_jobs.metadata
621+
WHERE key = \(key)
622+
AND value = \(id)
623+
AND queue_name = \(self.configuration.queueName)
624+
"""
625+
)
626+
}
627+
}
628+
578629
extension JobQueueDriver where Self == PostgresJobQueue {
579630
/// Return Postgres driver for Job Queue
580631
/// - Parameters:

Tests/JobsPostgresTests/JobsTests.swift

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,19 @@ final class JobsTests: XCTestCase {
697697
}
698698
}
699699

700+
func testMultipleQueueMetadata() async throws {
701+
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in
702+
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in
703+
try await jobQueue1.queue.setMetadata(key: "test", value: .init(string: "queue1"))
704+
try await jobQueue2.queue.setMetadata(key: "test", value: .init(string: "queue2"))
705+
let value1 = try await jobQueue1.queue.getMetadata("test")
706+
let value2 = try await jobQueue2.queue.getMetadata("test")
707+
XCTAssertEqual(value1.map { String(buffer: $0) }, "queue1")
708+
XCTAssertEqual(value2.map { String(buffer: $0) }, "queue2")
709+
}
710+
}
711+
}
712+
700713
func testResumableAndPausableJobs() async throws {
701714
struct TestParameters: JobParameters {
702715
static let jobName = "TestJob"
@@ -993,4 +1006,55 @@ final class JobsTests: XCTestCase {
9931006
}
9941007
}
9951008
}
1009+
1010+
func testMetadataLock() async throws {
1011+
try await self.testJobQueue(numWorkers: 1) { jobQueue in
1012+
// 1 - acquire lock
1013+
var result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
1014+
XCTAssertTrue(result)
1015+
// 2 - check I can acquire lock once I already have the lock
1016+
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
1017+
XCTAssertTrue(result)
1018+
// 3 - check I cannot acquire lock if a different identifer has it
1019+
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10)
1020+
XCTAssertFalse(result)
1021+
// 4 - release lock with identifier that doesn own it
1022+
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "two"))
1023+
// 5 - check I still cannot acquire lock
1024+
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10)
1025+
XCTAssertFalse(result)
1026+
// 6 - release lock
1027+
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one"))
1028+
// 7 - check I can acquire lock after it has been released
1029+
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 1)
1030+
XCTAssertTrue(result)
1031+
// 8 - check I can acquire lock after it has expired
1032+
try await Task.sleep(for: .seconds(1.5))
1033+
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
1034+
XCTAssertTrue(result)
1035+
// 9 - release lock
1036+
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one"))
1037+
}
1038+
}
1039+
1040+
func testMultipleQueueMetadataLock() async throws {
1041+
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in
1042+
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in
1043+
let result1 = try await jobQueue1.queue.acquireLock(
1044+
key: "testMultipleQueueMetadataLock",
1045+
id: .init(string: "queue1"),
1046+
expiresIn: 60
1047+
)
1048+
let result2 = try await jobQueue2.queue.acquireLock(
1049+
key: "testMultipleQueueMetadataLock",
1050+
id: .init(string: "queue2"),
1051+
expiresIn: 60
1052+
)
1053+
XCTAssert(result1)
1054+
XCTAssert(result2)
1055+
try await jobQueue1.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue1"))
1056+
try await jobQueue2.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue2"))
1057+
}
1058+
}
1059+
}
9961060
}

0 commit comments

Comments
 (0)