Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ export interface Config {
formingNetworkCycleThreshold: number // CODE REVIEW WARNING: never allow this to be set more than 30. we have some trusted execution until this cycle is reached (specifically allowing global tx receipts) - will be refactored to be avoided
maxResponseSize: number
enableDuplicateReceiptsCheck: boolean // To enable duplicate receipts check in storeReceiptData and not allow overwriting of success with failure
receiptSignatureOptimization?: {
enabled: boolean
cacheSize?: number
batchSize?: number
}
}

let config: Config = {
Expand Down Expand Up @@ -295,6 +300,11 @@ let config: Config = {
formingNetworkCycleThreshold: 30,
maxResponseSize: 15 * 1024 * 1024, // 15MB
enableDuplicateReceiptsCheck: true,
receiptSignatureOptimization: {
enabled: false, // Disabled by default, can be enabled via config file
cacheSize: 10000,
batchSize: 100,
},
}
// Override default config params from config file, env vars, and cli args
export async function overrideDefaultConfig(file: string): Promise<void> {
Expand Down
10 changes: 10 additions & 0 deletions src/dbstore/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ export const initializeDB = async (config: Config): Promise<void> => {
checkpointStatusDatabase,
'CREATE INDEX if not exists `checkpoint_status_unified_status` ON `checkpoint_status` (`cycle`)'
)

// Nodes table for receipt signature optimization
await runCreate(
receiptDatabase,
'CREATE TABLE if not exists `nodes` (`node_id` INTEGER PRIMARY KEY AUTOINCREMENT, `public_key` CHAR(64) UNIQUE NOT NULL, `first_seen` BIGINT NOT NULL)'
)
await runCreate(
receiptDatabase,
'CREATE INDEX if not exists `idx_nodes_public_key` ON `nodes` (`public_key`)'
)
}

export const closeDatabase = async (): Promise<void> => {
Expand Down
94 changes: 66 additions & 28 deletions src/dbstore/receipts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { AccountsCopy } from '../dbstore/accounts'
import { ReceiptCheckpointData, calculateBucketID, receiptCheckpointManager } from '../checkpoint/ReceiptData'
import { bulkUpdateCheckpointStatusField, CheckpointStatusType } from './checkpointStatus'
import * as State from '../State'
import { compressReceiptSignatures, decompressReceiptSignatures } from '../middleware/receiptTransformer'

// const superjson = require('superjson')
export type Proposal = {
Expand Down Expand Up @@ -119,6 +120,9 @@ export async function insertReceipt(receipt: Receipt, storeCheckpoints: boolean
receiptCheckpointManager.addData(checkpointData, bucketID)
}

// Compress receipt signatures if optimization is enabled
const processedReceipt = await compressReceiptSignatures(receipt)

// Define the columns to match the database schema
const columns = [
'receiptId',
Expand All @@ -140,9 +144,9 @@ export async function insertReceipt(receipt: Receipt, storeCheckpoints: boolean

// Map the receipt object to match the columns
const values = columns.map((column) =>
typeof receipt[column] === 'object'
? SerializeToJsonString(receipt[column]) // Serialize objects to JSON strings
: receipt[column]
typeof processedReceipt[column] === 'object'
? SerializeToJsonString(processedReceipt[column]) // Serialize objects to JSON strings
: processedReceipt[column]
)

// Execute the query directly
Expand Down Expand Up @@ -172,6 +176,37 @@ export async function bulkInsertReceipts(receipts: Receipt[], storeCheckpoints:
}
}

// Compress all receipts if optimization is enabled
const compressionResults = await Promise.allSettled(
receipts.map(receipt => compressReceiptSignatures(receipt))
)

// Separate successful and failed compressions
const processedReceipts = []
const failedCompressions = []

compressionResults.forEach((result, index) => {
if (result.status === 'fulfilled') {
processedReceipts.push(result.value)
} else {
failedCompressions.push({
receiptId: receipts[index].receiptId,
error: result.reason
})
Logger.mainLogger.error(`Failed to compress receipt ${receipts[index].receiptId}:`, result.reason)
}
})

// If no receipts were successfully processed, throw an error
if (processedReceipts.length === 0) {
throw new Error(`All receipt compressions failed. Total failures: ${failedCompressions.length}`)
}

// Log compression statistics
if (failedCompressions.length > 0) {
Logger.mainLogger.warn(`Receipt compression completed with ${processedReceipts.length} successes and ${failedCompressions.length} failures`)
}

// Define the table columns based on schema
const columns = [
'receiptId',
Expand All @@ -188,11 +223,11 @@ export async function bulkInsertReceipts(receipts: Receipt[], storeCheckpoints:
]

// Construct the SQL query with placeholders
const placeholders = receipts.map(() => `(${columns.map(() => '?').join(', ')})`).join(', ')
const placeholders = processedReceipts.map(() => `(${columns.map(() => '?').join(', ')})`).join(', ')
const sql = `INSERT OR REPLACE INTO receipts (${columns.join(', ')}) VALUES ${placeholders}`

// Flatten the `receipts` array into a single list of values
const values = receipts.flatMap((receipt) =>
const values = processedReceipts.flatMap((receipt) =>
columns.map((column) =>
typeof receipt[column] === 'object'
? SerializeToJsonString(receipt[column]) // Serialize objects to JSON
Expand Down Expand Up @@ -228,11 +263,10 @@ export async function queryReceiptByReceiptId(receiptId: string, timestamp = 0):
const sql = `SELECT * FROM receipts WHERE receiptId=?` + (timestamp ? ` AND timestamp=?` : '')
const value = timestamp ? [receiptId, timestamp] : [receiptId]
const receipt = (await db.get(receiptDatabase, sql, value)) as DbReceipt
if (receipt) deserializeDbReceipt(receipt)
if (config.VERBOSE) {
Logger.mainLogger.debug('Receipt receiptId', receipt)
if (receipt) {
return await deserializeDbReceipt(receipt)
}
return receipt
return null
} catch (e) {
Logger.mainLogger.error(e)
return null
Expand All @@ -248,12 +282,8 @@ export async function queryLatestReceipts(count: number): Promise<Receipt[]> {
const sql = `SELECT * FROM receipts ORDER BY cycle DESC, timestamp DESC LIMIT ${count ? count : 100}`
const receipts = (await db.all(receiptDatabase, sql)) as DbReceipt[]
if (receipts.length > 0) {
receipts.forEach((receipt: DbReceipt) => {
deserializeDbReceipt(receipt)
})
}
if (config.VERBOSE) {
Logger.mainLogger.debug('Receipt latest', receipts)
const deserializedReceipts = await Promise.all(receipts.map((receipt: DbReceipt) => deserializeDbReceipt(receipt)))
return deserializedReceipts
}
return receipts
} catch (e) {
Comment on lines 282 to 289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Returning the raw receipts array when no results are found may cause type inconsistencies, as it is typed as DbReceipt[] but the function promises Receipt[]. Return an empty array instead to maintain type safety. [possible issue, importance: 7]

New proposed code:
 export async function queryLatestReceipts(count: number): Promise<Receipt[]> {
   if (!Number.isInteger(count)) {
     Logger.mainLogger.error('queryLatestReceipts - Invalid count value')
     return null
   }
   try {
     const sql = `SELECT * FROM receipts ORDER BY cycle DESC, timestamp DESC LIMIT ${count ? count : 100}`
     const receipts = (await db.all(receiptDatabase, sql)) as DbReceipt[]
     if (receipts.length > 0) {
       const deserializedReceipts = await Promise.all(receipts.map((receipt: DbReceipt) => deserializeDbReceipt(receipt)))
       return deserializedReceipts
     }
-    return receipts
+    return []
   } catch (e) {
     Logger.mainLogger.error(e)

Expand All @@ -270,11 +300,9 @@ export async function queryReceipts(skip = 0, limit = 10000): Promise<Receipt[]>
}
try {
const sql = `SELECT * FROM receipts ORDER BY cycle ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}`
receipts = (await db.all(receiptDatabase, sql)) as DbReceipt[]
if (receipts.length > 0) {
receipts.forEach((receipt: DbReceipt) => {
deserializeDbReceipt(receipt)
})
const dbReceipts = (await db.all(receiptDatabase, sql)) as DbReceipt[]
if (dbReceipts.length > 0) {
receipts = await Promise.all(dbReceipts.map((receipt: DbReceipt) => deserializeDbReceipt(receipt)))
}
} catch (e) {
Logger.mainLogger.error(e)
Expand Down Expand Up @@ -356,11 +384,9 @@ export async function queryReceiptsBetweenCycles(
}
try {
const sql = `SELECT * FROM receipts WHERE cycle BETWEEN ? AND ? ORDER BY cycle ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}`
receipts = (await db.all(receiptDatabase, sql, [startCycleNumber, endCycleNumber])) as DbReceipt[]
if (receipts.length > 0) {
receipts.forEach((receipt: DbReceipt) => {
deserializeDbReceipt(receipt)
})
const dbReceipts = (await db.all(receiptDatabase, sql, [startCycleNumber, endCycleNumber])) as DbReceipt[]
if (dbReceipts.length > 0) {
receipts = await Promise.all(dbReceipts.map((receipt: DbReceipt) => deserializeDbReceipt(receipt)))
}
} catch (e) {
console.log(e)
Expand All @@ -383,9 +409,10 @@ export async function queryInitNetworkReceiptCountBetweenCycles(
`
const dbReceipts = (await db.all(receiptDatabase, sql, [startCycleNumber, endCycleNumber])) as DbReceipt[]

const filtered = dbReceipts.filter((receipt: DbReceipt) => {
deserializeDbReceipt(receipt)
// Deserialize all receipts first
const deserializedReceipts = await Promise.all(dbReceipts.map((receipt: DbReceipt) => deserializeDbReceipt(receipt)))

const filtered = deserializedReceipts.filter((receipt: Receipt) => {
// Inline type for safely accessing internalTXType
const tx = receipt.tx as {
originalTxData?: {
Expand All @@ -410,12 +437,23 @@ export async function queryInitNetworkReceiptCountBetweenCycles(
return count
}

function deserializeDbReceipt(receipt: DbReceipt): void {
async function deserializeDbReceipt(receipt: DbReceipt): Promise<Receipt> {
if (receipt.tx) receipt.tx = DeSerializeFromJsonString(receipt.tx)
if (receipt.beforeStates) receipt.beforeStates = DeSerializeFromJsonString(receipt.beforeStates)
if (receipt.afterStates) receipt.afterStates = DeSerializeFromJsonString(receipt.afterStates)
if (receipt.appReceiptData) receipt.appReceiptData = DeSerializeFromJsonString(receipt.appReceiptData)
if (receipt.signedReceipt) receipt.signedReceipt = DeSerializeFromJsonString(receipt.signedReceipt)
// globalModification is stored as 0 or 1 in the database, convert it to boolean
receipt.globalModification = (receipt.globalModification as unknown as number) === 1

// Decompress receipt signatures if optimization is enabled
const decompressedReceipt = await decompressReceiptSignatures(receipt as Receipt)

// Ensure we return a proper Receipt object with all required fields
return {
...decompressedReceipt,
receiptId: receipt.receiptId,
timestamp: receipt.timestamp,
applyTimestamp: receipt.applyTimestamp
} as Receipt
}
Loading
Loading