Skip to content

Add acquireLock, releaseLock. Fix metadata for multiple queues #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
import PostgresMigrations
import PostgresNIO

struct CreateJobMetadataMigration: DatabaseMigration {

func apply(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.metadata(
key TEXT NOT NULL,
value BYTEA NOT NULL,
expires TIMESTAMPTZ NOT NULL DEFAULT 'infinity',
queue_name TEXT NOT NULL DEFAULT 'default',
CONSTRAINT key_queue_name PRIMARY KEY (key, queue_name)
)
""",
logger: logger
)
}

func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
try await connection.query(
"""
DROP TABLE swift_jobs.metadata
""",
logger: logger
)
}

var description: String { "__JobMetadataMigration__" }

Check warning on line 45 in Sources/JobsPostgres/Migrations/CreateJobMetadataMigration.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/Migrations/CreateJobMetadataMigration.swift#L45

Added line #L45 was not covered by tests
var group: DatabaseMigrationGroup { .jobQueue }
}
19 changes: 0 additions & 19 deletions Sources/JobsPostgres/Migrations/CreateSwiftJobsMigrations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,6 @@ struct CreateSwiftJobsMigrations: DatabaseMigration {
""",
logger: logger
)

try await connection.query(
"""
CREATE TABLE IF NOT EXISTS swift_jobs.queues_metadata(
key TEXT PRIMARY KEY,
value BYTEA NOT NULL,
queue_name TEXT NOT NULL DEFAULT 'default'
)
""",
logger: logger
)

try await connection.query(
"""
CREATE INDEX IF NOT EXISTS queues_metadata_key_queue_name_idx
ON swift_jobs.queues_metadata(key, queue_name)
""",
logger: logger
)
}

func revert(connection: PostgresNIO.PostgresConnection, logger: Logging.Logger) async throws {
Expand Down
103 changes: 77 additions & 26 deletions Sources/JobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
/// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations)
/// }
/// ```
public final class PostgresJobQueue: JobQueueDriver, JobMetadataDriver, CancellableJobQueue, ResumableJobQueue {
public final class PostgresJobQueue: JobQueueDriver, CancellableJobQueue, ResumableJobQueue {

public typealias JobID = UUID

Expand Down Expand Up @@ -158,6 +158,7 @@
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateSwiftJobsMigrations(), skipDuplicates: true)
await migrations.add(CreateJobMetadataMigration(), skipDuplicates: true)
self.registerCleanupJob()
}

Expand Down Expand Up @@ -290,31 +291,6 @@
/// shutdown queue once all active jobs have been processed
public func shutdownGracefully() async {}

@inlinable
public func getMetadata(_ key: String) async throws -> ByteBuffer? {
let stream = try await self.client.query(
"SELECT value FROM swift_jobs.queues_metadata WHERE key = \(key) AND queue_name = \(configuration.queueName)",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
return value
}
return nil
}

@inlinable
public func setMetadata(key: String, value: ByteBuffer) async throws {
try await self.client.query(
"""
INSERT INTO swift_jobs.queues_metadata (key, value, queue_name)
VALUES (\(key), \(value), \(configuration.queueName))
ON CONFLICT (key)
DO UPDATE SET value = \(value)
""",
logger: self.logger
)
}

@usableFromInline
func popFirst() async throws -> JobQueueResult<JobID>? {
enum PopFirstResult {
Expand Down Expand Up @@ -575,6 +551,81 @@
}
}

extension PostgresJobQueue: JobMetadataDriver {
@inlinable
public func getMetadata(_ key: String) async throws -> ByteBuffer? {
let stream = try await self.client.query(
"SELECT value FROM swift_jobs.metadata WHERE key = \(key) AND queue_name = \(self.configuration.queueName)",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
return value
}
return nil

Check warning on line 564 in Sources/JobsPostgres/PostgresJobsQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsPostgres/PostgresJobsQueue.swift#L564

Added line #L564 was not covered by tests
}

@inlinable
public func setMetadata(key: String, value: ByteBuffer) async throws {
try await self.client.query(
"""
INSERT INTO swift_jobs.metadata (key, value, queue_name)
VALUES (\(key), \(value), \(self.configuration.queueName))
ON CONFLICT (key, queue_name)
DO UPDATE SET value = \(value)
""",
logger: self.logger
)
}

/// Acquire metadata lock
///
/// - Parameters:
/// - key: Metadata key
/// - id: Lock identifier
/// - expiresIn: When lock will expire
/// - Returns: If lock was acquired
@inlinable
public func acquireLock(key: String, id: ByteBuffer, expiresIn: TimeInterval) async throws -> Bool {
let expires = Date.now + expiresIn
// insert key, value, expiration into table. On conflict with key and queue_name only set value and
// expiration if expiration is out of date or value is the same
let stream = try await self.client.query(
"""
INSERT INTO swift_jobs.metadata (key, value, expires, queue_name)
VALUES (\(key), \(id), \(expires), \(self.configuration.queueName))
ON CONFLICT (key, queue_name)
DO UPDATE
SET value = \(id), expires = \(expires)
WHERE swift_jobs.metadata.expires <= now()
OR swift_jobs.metadata.value = \(id)
RETURNING value
""",
logger: self.logger
)
for try await value in stream.decode(ByteBuffer.self) {
return value == id
}
return false
}

/// Release metadata lock
///
/// - Parameters:
/// - key: Metadata key
/// - id: Lock identifier
@inlinable
public func releaseLock(key: String, id: ByteBuffer) async throws {
_ = try await self.client.query(
"""
DELETE FROM swift_jobs.metadata
WHERE key = \(key)
AND value = \(id)
AND queue_name = \(self.configuration.queueName)
"""
)
}
}

extension JobQueueDriver where Self == PostgresJobQueue {
/// Return Postgres driver for Job Queue
/// - Parameters:
Expand Down
64 changes: 64 additions & 0 deletions Tests/JobsPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,19 @@ final class JobsTests: XCTestCase {
}
}

func testMultipleQueueMetadata() async throws {
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in
try await jobQueue1.queue.setMetadata(key: "test", value: .init(string: "queue1"))
try await jobQueue2.queue.setMetadata(key: "test", value: .init(string: "queue2"))
let value1 = try await jobQueue1.queue.getMetadata("test")
let value2 = try await jobQueue2.queue.getMetadata("test")
XCTAssertEqual(value1.map { String(buffer: $0) }, "queue1")
XCTAssertEqual(value2.map { String(buffer: $0) }, "queue2")
}
}
}

func testResumableAndPausableJobs() async throws {
struct TestParameters: JobParameters {
static let jobName = "TestJob"
Expand Down Expand Up @@ -993,4 +1006,55 @@ final class JobsTests: XCTestCase {
}
}
}

func testMetadataLock() async throws {
try await self.testJobQueue(numWorkers: 1) { jobQueue in
// 1 - acquire lock
var result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
XCTAssertTrue(result)
// 2 - check I can acquire lock once I already have the lock
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
XCTAssertTrue(result)
// 3 - check I cannot acquire lock if a different identifer has it
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10)
XCTAssertFalse(result)
// 4 - release lock with identifier that doesn own it
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "two"))
// 5 - check I still cannot acquire lock
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 10)
XCTAssertFalse(result)
// 6 - release lock
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one"))
// 7 - check I can acquire lock after it has been released
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "two"), expiresIn: 1)
XCTAssertTrue(result)
// 8 - check I can acquire lock after it has expired
try await Task.sleep(for: .seconds(1.5))
result = try await jobQueue.queue.acquireLock(key: "lock", id: .init(string: "one"), expiresIn: 10)
XCTAssertTrue(result)
// 9 - release lock
try await jobQueue.queue.releaseLock(key: "lock", id: .init(string: "one"))
}
}

func testMultipleQueueMetadataLock() async throws {
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue1")) { jobQueue1 in
try await self.testJobQueue(numWorkers: 1, configuration: .init(queueName: "queue2")) { jobQueue2 in
let result1 = try await jobQueue1.queue.acquireLock(
key: "testMultipleQueueMetadataLock",
id: .init(string: "queue1"),
expiresIn: 60
)
let result2 = try await jobQueue2.queue.acquireLock(
key: "testMultipleQueueMetadataLock",
id: .init(string: "queue2"),
expiresIn: 60
)
XCTAssert(result1)
XCTAssert(result2)
try await jobQueue1.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue1"))
try await jobQueue2.queue.releaseLock(key: "testMultipleQueueMetadataLock", id: .init(string: "queue2"))
}
}
}
}
Loading