@@ -32,6 +32,7 @@ import org.jitsi.utils.event.EventEmitter
3232import org.jitsi.utils.event.SyncEventEmitter
3333import org.jitsi.utils.logging2.createLogger
3434import org.jitsi.utils.observableWhenChanged
35+ import org.jitsi.utils.queue.PacketQueue
3536import org.jivesoftware.smack.PresenceListener
3637import org.jivesoftware.smack.SmackException
3738import org.jivesoftware.smack.XMPPConnection
@@ -86,12 +87,32 @@ class ChatRoomImpl(
8687 JicofoConfig .config.vnodeJoinLatencyInterval
8788 )
8889
90+ /* *
91+ * Latch that is counted down when the room metadata is set.
92+ * This is used to block the join until the room metadata is available.
93+ */
8994 private val roomMetadataLatch = CountDownLatch (1 )
9095
96+ /* *
97+ * Latch that is counted down when the room is joined. Processing presence blocks until this latch is counted down.
98+ */
99+ private val roomJoinedLatch = CountDownLatch (1 )
100+
91101 override fun visitorInvited () {
92102 pendingVisitorsCounter.eventPending()
93103 }
94104
105+ /* *
106+ * Process presence from smack in order in the IO pool.
107+ */
108+ private val presenceQueue = PacketQueue <Presence >(
109+ Integer .MAX_VALUE ,
110+ false ,
111+ " ChatRoomImpl presence queue" ,
112+ { doProcessPresence(it) },
113+ ioPool
114+ )
115+
95116 private val membersMap: MutableMap <EntityFullJid , ChatRoomMemberImpl > = ConcurrentHashMap ()
96117 override val members: List <ChatRoomMember >
97118 get() = synchronized(membersMap) { return membersMap.values.toList() }
@@ -324,6 +345,7 @@ class ChatRoomImpl(
324345 logger.warn(" Timed out waiting for RoomMetadata to be set. Will continue without it." )
325346 }
326347 }
348+ roomJoinedLatch.countDown()
327349
328350 return ChatRoomInfo (
329351 meetingId = config.getField(MucConfigFields .MEETING_ID )?.firstValue,
@@ -375,6 +397,11 @@ class ChatRoomImpl(
375397 muc.removeParticipantListener(this )
376398 leaveCallback(this )
377399
400+ // Unblock any threads waiting on the latches
401+ roomMetadataLatch.countDown()
402+ roomJoinedLatch.countDown()
403+ presenceQueue.close()
404+
378405 // Call MultiUserChat.leave() in an IO thread, because it now (with Smack 4.4.3) blocks waiting for a response
379406 // from the XMPP server (and we want ChatRoom#leave to return immediately).
380407 ioPool.execute {
@@ -614,17 +641,28 @@ class ChatRoomImpl(
614641 }
615642
616643 /* *
617- * Processes an incoming presence packet.
618- *
619- * @param presence the incoming presence.
644+ * Offload processing presence for the room from Smack's thread to a queue running in the jicofo IO pool.
620645 */
621646 override fun processPresence (presence : Presence ? ) {
622- if (presence == null || presence.error != null ) {
623- logger.warn(" Unable to handle packet: ${presence?.toXML()} " )
647+ if (presence == null ) {
648+ logger.warn(" Received null presence packet" )
624649 return
625650 }
651+ presenceQueue.add(presence)
652+ }
653+
654+ private fun doProcessPresence (presence : Presence ): Boolean {
655+ if (presence.error != null ) {
656+ logger.warn(" Received presence with error: ${presence.toXML()} " )
657+ return true
658+ }
626659 logger.trace { " Presence received ${presence.toXML()} " }
627660
661+ if (roomJoinedLatch.count > 0 ) {
662+ logger.info(" Received presence before roomJoinedLatch was counted down, waiting." )
663+ roomJoinedLatch.await()
664+ }
665+
628666 // Should never happen, but log if something is broken
629667 val myOccupantJid = this .myOccupantJid
630668 if (myOccupantJid == null ) {
@@ -635,6 +673,7 @@ class ChatRoomImpl(
635673 } else {
636674 processOtherPresence(presence)
637675 }
676+ return true
638677 }
639678
640679 override fun reloadConfiguration () {
0 commit comments