1
+ import asyncio
1
2
import pickle
2
3
from logging import getLogger
3
- from typing import Any , AsyncGenerator , Callable , Optional , TypeVar
4
+ from typing import Any , Callable , Coroutine , Optional , TypeVar
4
5
5
6
from redis .asyncio import ConnectionPool , Redis
6
7
from taskiq .abc .broker import AsyncBroker
@@ -49,7 +50,7 @@ def __init__(
49
50
50
51
async def shutdown (self ) -> None :
51
52
"""Closes redis connection pool."""
52
- self .connection_pool .disconnect ()
53
+ await self .connection_pool .disconnect ()
53
54
54
55
async def kick (self , message : BrokerMessage ) -> None :
55
56
"""
@@ -69,26 +70,30 @@ async def kick(self, message: BrokerMessage) -> None:
69
70
pickle .dumps (message ),
70
71
)
71
72
72
- async def listen (self ) -> AsyncGenerator [BrokerMessage , None ]:
73
+ async def listen (
74
+ self ,
75
+ callback : Callable [[BrokerMessage ], Coroutine [Any , Any , None ]],
76
+ ) -> None :
73
77
"""
74
78
Listen redis list for new messages.
75
79
76
- This function listens to list and yields new messages.
80
+ This function listens to list calls callback on
81
+ new messages.
77
82
78
- :yields: parsed broker messages .
83
+ :param callback: function to call on new message .
79
84
"""
85
+ loop = asyncio .get_event_loop ()
80
86
async with Redis (connection_pool = self .connection_pool ) as redis_conn :
81
87
redis_pubsub_channel = redis_conn .pubsub ()
82
88
await redis_pubsub_channel .subscribe (self .redis_pubsub_channel )
83
- while True :
84
- redis_pickled_message = await redis_pubsub_channel .get_message ()
85
- if redis_pickled_message :
89
+ async for message in redis_pubsub_channel .listen ():
90
+ if message :
86
91
try :
87
92
redis_message = pickle .loads (
88
- redis_pickled_message ["data" ],
93
+ message ["data" ],
89
94
)
90
95
if isinstance (redis_message , BrokerMessage ):
91
- yield redis_message
96
+ loop . create_task ( callback ( redis_message ))
92
97
except (
93
98
TypeError ,
94
99
AttributeError ,
0 commit comments