Feature: Parallel message processing #796 #832
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
The goal of this PR is to process messages in parallel based on address
Related Clickup or Jira tickets : ALEPH-XXX
Self proofreading checklist
Changes
This pull request introduces significant changes to migrate the database session handling from synchronous to asynchronous operations across multiple modules. Additionally, a new column and indexes are added to the
pending_messages
table to optimize querying. Below are the most important changes grouped by theme:Database Migration
content_address
column to thepending_messages
table, which is computed from thecontent
JSON field, and created two new indexes (ix_pending_messages_content_address
andix_pending_messages_content_address_attempt
) to improve query performance.Transition to Asynchronous Database Sessions
DbSessionFactory
andDbSession
to their asynchronous counterparts (AsyncDbSessionFactory
andAsyncDbSession
) across all modules, includingsrc/aleph/api_entrypoint.py
,src/aleph/chains/bsc.py
,src/aleph/chains/ethereum.py
,src/aleph/chains/nuls2.py
,src/aleph/chains/connector.py
, andsrc/aleph/chains/indexer_reader.py
. [1] [2] [3] [4] [5] [6]Refactoring to Support Asynchronous Operations
with self.session_factory() as session
) with asynchronous context managers (async with self.session_factory() as session
) and updated database operations to useawait
for methods likecommit
,upsert
, and query execution. [1] [2] [3] [4]Updates to Chain Connectors and Services
BscConnector
,EthereumConnector
,Nuls2Connector
) and services (e.g.,ChainDataService
,AlephIndexerReader
) to useAsyncDbSessionFactory
and updated their methods to handle asynchronous database interactions. [1] [2] [3] [4]Asynchronous Pending Transaction Handling
add_pending_tx
,add_and_publish_pending_tx
) to use asynchronous database operations and ensure consistency between the database and message queue. [1] [2]These changes collectively improve the scalability and responsiveness of the system by enabling non-blocking database operations and optimizing query performance.
How to test
Explain how to test your PR.
Full node syncing to ensure no issue
Print screen / video
Notes
If a node have lot's of pending messages the migrations might take some times since it's need to create new index and and computed the value of the address (based on the content address) to simplify access during the processing