diff --git a/Sources/PerfectPostgreSQL/PerfectPostgreSQL.swift b/Sources/PerfectPostgreSQL/PerfectPostgreSQL.swift index ebbf057..5487f87 100644 --- a/Sources/PerfectPostgreSQL/PerfectPostgreSQL.swift +++ b/Sources/PerfectPostgreSQL/PerfectPostgreSQL.swift @@ -327,74 +327,100 @@ public final class PGResult { /// connection management class public final class PGConnection { - + /// Connection Status enum public enum StatusType { case ok case bad } - + + // Take care that conn is not thread-safe. var conn = OpaquePointer(bitPattern: 0) var connectInfo: String = "" - + + // Acquire this lock before accessing conn. + // Need to be recursive to support transaction. + private var lock = NSRecursiveLock() + /// empty init public init() { - + } - + deinit { close() } - + /// Makes a new connection to the database server. public func connectdb(_ info: String) -> StatusType { + lock.lock() + defer { lock.unlock() } + conn = PQconnectdb(info) connectInfo = info return status() } - + /// Close db connection public func close() { finish() } - + /// Closes the connection to the server. Also frees memory used by the PGconn object. public func finish() { + lock.lock() + defer { lock.unlock() } + if conn != nil { PQfinish(conn) conn = OpaquePointer(bitPattern: 0) } } - + /// Returns the status of the connection. public func status() -> StatusType { + lock.lock() + defer { lock.unlock() } + let status = PQstatus(conn) return status == CONNECTION_OK ? .ok : .bad } - + /// Returns the error message most recently generated by an operation on the connection. public func errorMessage() -> String { + lock.lock() + defer { lock.unlock() } + return String(validatingUTF8: PQerrorMessage(conn)) ?? "" } - + /// Submits a command to the server and waits for the result. public func exec(statement: String) -> PGResult { + lock.lock() + defer { lock.unlock() } + return PGResult(PQexec(conn, statement)) } - + /// Sends data to the server during COPY_IN state. public func putCopyData(data: String) { + lock.lock() + defer { lock.unlock() } + PQputCopyData(self.conn, data, Int32(data.count)) } - + /// Sends end-of-data indication to the server during COPY_IN state. /// If withError is set, the copy is forced to fail with the error description supplied. public func putCopyEnd(withError: String? = nil) -> PGResult { + lock.lock() + defer { lock.unlock() } + PQputCopyEnd(self.conn, withError) let result = PGResult(PQgetResult(self.conn)) return result } - + // !FIX! does not handle binary data /// Submits a command to the server and waits for the result, with the ability to pass parameters separately from the SQL command text. public func exec(statement: String, params: [Any?]) -> PGResult { @@ -456,10 +482,14 @@ public final class PGConnection { formats[idx] = 0 } } + + lock.lock() + defer { lock.unlock() } + let r = PQexecParams(conn, statement, Int32(count), nil, values, lengths, formats, Int32(0)) return PGResult(r) } - + /// Assert that the connection status is OK /// /// - throws: If the connection status is bad @@ -498,13 +528,16 @@ public final class PGConnection { throw PostgreSQLError.error("Failed to execute statement. status: \(status)") } } - + /// Executes a BEGIN, calls the provided closure and executes a ROLLBACK if an exception occurs or a COMMIT if no exception occurs. /// /// - parameter closure: Block to be executed inside transaction /// - throws: If the provided closure fails /// - returns: If successful then the return value from the `closure` public func doWithTransaction(closure: () throws -> Result) throws -> Result { + lock.lock() + defer { lock.unlock() } + try ensureStatusIsOk() try execute(statement: "BEGIN") do { @@ -516,23 +549,26 @@ public final class PGConnection { throw error } } - + /// Handler for receiving a PGResult public typealias ReceiverProc = (PGResult) -> Void - + /// Handler for processing a text message public typealias ProcessorProc = (String) -> Void - + /// internal callback for notice receiving internal var receiver: ReceiverProc = { _ in } - + /// internal callback for notice processing internal var processor: ProcessorProc = { _ in } - + /// Set a new notice receiver /// - parameter handler: a closure to handle the incoming notice /// - returns: a C convention function pointer; would be nil if failed to set. public func setReceiver(_ handler: @escaping ReceiverProc) -> PQnoticeReceiver? { + lock.lock() + defer { lock.unlock() } + guard let cn = self.conn else { return nil } @@ -547,11 +583,14 @@ public final class PGConnection { this.receiver(pgresult) }, me) } - + /// Set a new notice processor /// - parameter handler: a closure to handle the incoming notice /// - returns: a C convention function pointer; would be nil if failed to set. public func setProcessor(_ handler: @escaping ProcessorProc) -> PQnoticeProcessor?{ + lock.lock() + defer { lock.unlock() } + guard let cn = self.conn else { return nil }