@@ -10,7 +10,7 @@ module.exports = class SlackApiRx {
1010 // Returns an {Observable} that signals completion
1111 static openDms ( slackApi , users ) {
1212 let ret = rx . Observable . fromArray ( users )
13- . flatMap ( user => SlackApiRx . getOrOpenDm ( slackApi , user ) )
13+ . flatMap ( ( user ) => SlackApiRx . getOrOpenDm ( slackApi , user ) )
1414 . reduce ( ( acc , x ) => {
1515 acc [ x . id ] = x . dm ;
1616 return acc ;
@@ -21,33 +21,67 @@ module.exports = class SlackApiRx {
2121 return ret ;
2222 }
2323
24- // Private: Checks for the existence of an open DM channel for the user, and
25- // opens one if necessary.
24+ // Private: Checks for the existence of an open DM channel for the user,
25+ // opens one if necessary, then waits for the `im_open` event and retrieves
26+ // the DM channel.
2627 //
2728 // slackApi - An instance of the Slack client
2829 // user - The user we are trying to DM with
2930 //
30- // Returns an {Observable} of the opened DM channel
31+ // Returns an {Observable} representing the opened channel. This will be an
32+ // object with two keys: `id` and `dm`. DM will be null if the API call
33+ // failed for some reason (e.g., an invalid user).
3134 static getOrOpenDm ( slackApi , user ) {
32- let readySubj = new rx . AsyncSubject ( ) ;
35+ console . log ( `Getting DM channel for ${ user . name } ` ) ;
3336 let dm = slackApi . getDMByName ( user . name ) ;
34- console . log ( `Opening DM channel for ${ user . name } ` ) ;
3537
36- if ( ! dm || ! dm . is_open ) {
37- slackApi . openDM ( user . id , ( result ) => {
38- if ( result . ok ) {
39- dm = slackApi . getDMByName ( user . name ) ;
40- readySubj . onNext ( { id : user . id , dm : dm } ) ;
41- readySubj . onCompleted ( ) ;
42- } else {
43- console . log ( `Unable to open DM for ${ user . name } : ${ result . error } ` ) ;
44- readySubj . onCompleted ( ) ;
45- }
46- } ) ;
47- } else {
48- readySubj . onNext ( { id : user . id , dm : dm } ) ;
49- readySubj . onCompleted ( ) ;
38+ // Bot players don't need DM channels; we only talk to humans
39+ if ( ( dm && dm . is_open ) || user . isBot ) {
40+ return rx . Observable . return ( { id : user . id , dm : dm } ) ;
5041 }
51- return readySubj ;
42+
43+ console . log ( `No open channel found, opening one using ${ user . id } ` ) ;
44+
45+ return SlackApiRx . openDm ( slackApi , user )
46+ . flatMap ( ( ) => SlackApiRx . waitForDmToOpen ( slackApi , user ) )
47+ . flatMap ( ( dm ) => rx . Observable . return ( { id : user . id , dm : dm } ) )
48+ . catch ( rx . Observable . return ( { id : user . id , dm : null } ) ) ;
49+ }
50+
51+ // Private: Maps the `im.open` API call into an {Observable}.
52+ //
53+ // Returns an {Observable} that signals completion, or an error if the API
54+ // call fails
55+ static openDm ( slackApi , user ) {
56+ let calledOpen = new rx . AsyncSubject ( ) ;
57+
58+ slackApi . openDM ( user . id , ( result ) => {
59+ if ( result . ok ) {
60+ calledOpen . onNext ( user . name ) ;
61+ calledOpen . onCompleted ( ) ;
62+ } else {
63+ console . log ( `Unable to open DM for ${ user . name } : ${ result . error } ` ) ;
64+ calledOpen . onError ( new Error ( result . error ) ) ;
65+ }
66+ } )
67+
68+ return calledOpen ;
69+ }
70+
71+ // Private: The `im.open` callback is still not late enough; we need to wait
72+ // for the client's underlying list of DM's to be updated. When that occurs
73+ // we can retrieve the DM.
74+ //
75+ // Returns a replayable {Observable} containing the opened DM channel
76+ static waitForDmToOpen ( slackApi , user ) {
77+ let ret = rx . DOM . fromEvent ( slackApi , 'raw_message' )
78+ . where ( ( m ) => m . type === 'im_open' && m . user === user . id )
79+ . take ( 1 )
80+ . flatMap ( ( ) => rx . Observable . timer ( 100 ) . map ( ( ) =>
81+ slackApi . getDMByName ( user . name ) ) )
82+ . publishLast ( ) ;
83+
84+ ret . connect ( ) ;
85+ return ret ;
5286 }
5387} ;
0 commit comments