diff --git a/go.mod b/go.mod index f4b3b7047..a998f577d 100644 --- a/go.mod +++ b/go.mod @@ -40,3 +40,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/grpc v1.68.0 // indirect ) + +replace github.com/openimsdk/protocol => github.com/rookiewwj/protocol v0.0.1 diff --git a/go.sum b/go.sum index f8a2d6dd6..78d7e4483 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,6 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= -github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.80 h1:Nvt97Vm85CXr633Jf7WjRJeL2nxJJjwlZJFDgWWXkJU= github.com/openimsdk/tools v0.0.50-alpha.80/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= @@ -41,6 +39,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rookiewwj/protocol v0.0.1 h1:Bd9F8FfE/viObERdhEuRm8tfJkE82dl244WvK1fNF2M= +github.com/rookiewwj/protocol v0.0.1/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/internal/conversation_msg/api.go b/internal/conversation_msg/api.go index 0cbc08943..0d3b1279b 100644 --- a/internal/conversation_msg/api.go +++ b/internal/conversation_msg/api.go @@ -781,6 +781,10 @@ func (c *Conversation) RevokeMessage(ctx context.Context, conversationID, client return c.revokeOneMessage(ctx, conversationID, clientMsgID) } +func (c *Conversation) ProjectGroupReadInfo(ctx context.Context, conversationID string, clientMsgIDs []string) error { + return c.projectGroupReadInfo(ctx, conversationID, clientMsgIDs) +} + func (c *Conversation) TypingStatusUpdate(ctx context.Context, recvID, msgTip string) error { return c.typingStatusUpdate(ctx, recvID, msgTip) } diff --git a/internal/conversation_msg/projection.go b/internal/conversation_msg/projection.go new file mode 100644 index 000000000..fe91e5aa0 --- /dev/null +++ b/internal/conversation_msg/projection.go @@ -0,0 +1,97 @@ +package conversation_msg + +import ( + "context" + "sort" + + "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" + "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" + "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" +) + +func (c *Conversation) projectGroupReadInfo(ctx context.Context, conversationID string, clientMsgIDs []string) error { + + if len(clientMsgIDs) == 0 { + return nil + } + + cursorState, err := c.db.GetGroupReadCursorState(ctx, conversationID) + if err != nil { + return errs.WrapMsg(err, "GetGroupReadCursorState failed", "conversationID", conversationID) + } + var cursorVersion int64 + if cursorState != nil { + cursorVersion = cursorState.CursorVersion + } + + var messages []*model_struct.LocalChatLog + for _, clientMsgID := range clientMsgIDs { + message, err := c.waitForMessageSyncSeq(ctx, conversationID, clientMsgID) + if err != nil { + log.ZWarn(ctx, "waitForMessageSyncSeq failed", err, "conversationID", conversationID, "clientMsgID", clientMsgID) + continue + } + messages = append(messages, message) + } + if len(messages) == 0 { + return nil + } + + cursors, err := c.db.GetGroupReadCursorsByConversationID(ctx, conversationID) // []{UserID, MaxReadSeq} + if err != nil { + return errs.WrapMsg(err, "GetGroupReadCursorsByConversationID failed", "conversationID", conversationID) + } + memberCount := len(cursors) + + sortedMaxSeqs := make([]int64, 0, memberCount) + type pair struct { + uid string + seq int64 + } + userSeqPairs := make([]pair, 0, memberCount) + + for _, cur := range cursors { + sortedMaxSeqs = append(sortedMaxSeqs, cur.MaxReadSeq) + userSeqPairs = append(userSeqPairs, pair{uid: cur.UserID, seq: cur.MaxReadSeq}) + } + sort.Slice(sortedMaxSeqs, func(i, j int) bool { return sortedMaxSeqs[i] < sortedMaxSeqs[j] }) + + for _, m := range messages { + if m == nil || m.Seq == 0 { + continue + } + var attach sdk_struct.AttachedInfoElem + utils.JsonStringToStruct(m.AttachedInfo, &attach) + + alreadyFresh := (attach.GroupHasReadInfo.ReadCursorVersion == cursorVersion) + if alreadyFresh { + continue + } + + idx := sort.Search(len(sortedMaxSeqs), func(i int) bool { return sortedMaxSeqs[i] >= m.Seq }) + hasReadCount := len(sortedMaxSeqs) - idx + + attach.GroupHasReadInfo.HasReadCount = int32(hasReadCount) + attach.GroupHasReadInfo.GroupMemberCount = int32(memberCount) + attach.GroupHasReadInfo.ReadCursorVersion = cursorVersion + + if hasReadCount > 0 { + list := make([]string, 0, hasReadCount) + for _, p := range userSeqPairs { + if p.seq >= m.Seq { + list = append(list, p.uid) + } + } + attach.GroupHasReadInfo.HasReadUserIDList = list + } + + m.AttachedInfo = utils.StructToJsonString(attach) + if err := c.db.UpdateMessage(ctx, conversationID, m); err != nil { + log.ZWarn(ctx, "projectGroupReadInfo UpdateMessage err", err, "conversationID", conversationID, "seq", m.Seq, "clientMsgID", m.ClientMsgID) + } + } + + return nil +} diff --git a/internal/conversation_msg/read_drawing.go b/internal/conversation_msg/read_drawing.go index ad97fd57d..12e9aaac2 100644 --- a/internal/conversation_msg/read_drawing.go +++ b/internal/conversation_msg/read_drawing.go @@ -26,6 +26,7 @@ import ( "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" + "gorm.io/gorm" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" @@ -248,7 +249,8 @@ func (c *Conversation) doReadDrawing(ctx context.Context, msg *sdkws.MsgData) er return err } - if conversation.ConversationType == constant.SingleChatType { + switch conversation.ConversationType { + case constant.SingleChatType: latestMsg := &sdk_struct.MsgStruct{} if err := json.Unmarshal([]byte(conversation.LatestMsg), latestMsg); err != nil { log.ZWarn(ctx, "Unmarshal err", err, "conversationID", tips.ConversationID, "latestMsg", conversation.LatestMsg) @@ -277,7 +279,46 @@ func (c *Conversation) doReadDrawing(ctx context.Context, msg *sdkws.MsgData) er var messageReceiptResp = []*sdk_struct.MessageReceipt{{UserID: tips.MarkAsReadUserID, MsgIDList: successMsgIDs, SessionType: conversation.ConversationType, ReadTime: msg.SendTime}} c.msgListener().OnRecvC2CReadReceipt(utils.StructToJsonString(messageReceiptResp)) + + case constant.ReadGroupChatType: + + maxReadSeq := tips.HasReadSeq + if maxReadSeq > 0 { + cursor, err := c.db.GetGroupReadCursor(ctx, tips.ConversationID, tips.MarkAsReadUserID) + if err != nil { + if err == gorm.ErrRecordNotFound { + newCursor := &model_struct.LocalGroupReadCursor{ + ConversationID: tips.ConversationID, + UserID: tips.MarkAsReadUserID, + MaxReadSeq: maxReadSeq, + } + if err = c.db.InsertGroupReadCursor(ctx, newCursor); err != nil { + log.ZWarn(ctx, "InsertGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID) + } + if err = c.db.IncrementGroupReadCursorVersion(ctx, tips.ConversationID); err != nil { + log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID) + } + } else { + log.ZWarn(ctx, "GetGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID) + return err + } + + } else { + if maxReadSeq > cursor.MaxReadSeq { + if err = c.db.UpdateGroupReadCursor(ctx, tips.ConversationID, tips.MarkAsReadUserID, maxReadSeq); err != nil { + log.ZWarn(ctx, "UpdateGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID) + } + if err = c.db.IncrementGroupReadCursorVersion(ctx, tips.ConversationID); err != nil { + log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID) + } + } + } + } + var groupReceiptResp = []*sdk_struct.MessageReceipt{{GroupID: conversation.GroupID, UserID: tips.MarkAsReadUserID, MsgIDList: nil, + SessionType: conversation.ConversationType, ReadTime: msg.SendTime}} + c.msgListener().OnRecvGroupReadReceipt(utils.StructToJsonString(groupReceiptResp)) } + } else { return c.doUnreadCount(ctx, conversation, tips.HasReadSeq, tips.Seqs) } diff --git a/internal/conversation_msg/server_api.go b/internal/conversation_msg/server_api.go index 3eaa13f2e..bc4630383 100644 --- a/internal/conversation_msg/server_api.go +++ b/internal/conversation_msg/server_api.go @@ -74,6 +74,11 @@ func (c *Conversation) getIncrementalConversationFromServer(ctx context.Context, return api.GetIncrementalConversation.Invoke(ctx, req) } +func (c *Conversation) getConversationReadCursors(ctx context.Context, conversationIDs []string) (*pbConversation.GetConversationReadCursorsResp, error) { + req := &pbConversation.GetConversationReadCursorsReq{ConversationIDs: conversationIDs} + return api.GetConversationReadCursors.Invoke(ctx, req) +} + func (c *Conversation) GetActiveConversations(ctx context.Context) ([]*jssdk.ConversationMsg, error) { conf, err := cliconf.GetClientConfig(ctx) if err != nil { diff --git a/internal/conversation_msg/sync.go b/internal/conversation_msg/sync.go index 6da09a55c..c5d6518f6 100644 --- a/internal/conversation_msg/sync.go +++ b/internal/conversation_msg/sync.go @@ -131,6 +131,95 @@ func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) erro log.ZDebug(ctx, "TriggerCmdUpdateConversation completed", "duration", time.Since(stepStartTime).Seconds()) } + stepStartTime = time.Now() + if err := c.syncAllGroupReadCursors(ctx); err != nil { + log.ZWarn(ctx, "syncAllGroupReadCursors failed", err) + } + log.ZDebug(ctx, "syncAllGroupReadCursors completed", "duration", time.Since(stepStartTime).Seconds()) + log.ZDebug(ctx, "SyncAllConversationHashReadSeqs completed", "totalDuration", time.Since(startTime).Seconds()) return nil } + +func (c *Conversation) syncAllGroupReadCursors(ctx context.Context) error { + conversations, err := c.db.GetAllConversations(ctx) + if err != nil { + log.ZWarn(ctx, "GetAllConversations failed", err) + return err + } + + var groupConversationIDs []string + for _, conv := range conversations { + if conv.ConversationType == constant.ReadGroupChatType { + groupConversationIDs = append(groupConversationIDs, conv.ConversationID) + } + } + + if len(groupConversationIDs) == 0 { + log.ZDebug(ctx, "no group conversations to sync cursors") + return nil + } + + log.ZDebug(ctx, "found group conversations", "count", len(groupConversationIDs), "conversationIDs", groupConversationIDs) + + for _, conversationID := range groupConversationIDs { + if _, err := c.db.GetGroupReadCursorState(ctx, conversationID); err != nil { + if ierr := c.db.InsertGroupReadCursorState(ctx, &model_struct.LocalGroupReadCursorState{ConversationID: conversationID, CursorVersion: 1}); ierr != nil { + log.ZWarn(ctx, "InsertGroupReadCursorState failed", ierr, "conversationID", conversationID) + } else { + log.ZDebug(ctx, "initialized LocalGroupReadCursorState", "conversationID", conversationID) + } + } + } + + stepStartTime := time.Now() + resp, err := c.getConversationReadCursors(ctx, groupConversationIDs) + if err != nil { + log.ZWarn(ctx, "getConversationReadCursorsFromServer failed", err) + return err + } + log.ZDebug(ctx, "getConversationReadCursorsFromServer completed", "duration", time.Since(stepStartTime).Seconds()) + + stepStartTime = time.Now() + allCursorCount := 0 + for conversationID, cursorList := range resp.Cursors { + curCursorCount := 0 + if cursorList == nil || len(cursorList.Cursors) == 0 { + continue + } + + for _, cursor := range cursorList.Cursors { + localCursor := &model_struct.LocalGroupReadCursor{ + ConversationID: conversationID, + UserID: cursor.UserID, + MaxReadSeq: cursor.MaxReadSeq, + } + + existingCursor, err := c.db.GetGroupReadCursor(ctx, conversationID, cursor.UserID) + if err != nil { + if err := c.db.InsertGroupReadCursor(ctx, localCursor); err != nil { + log.ZWarn(ctx, "InsertGroupReadCursor failed", err, "conversationID", conversationID, "userID", cursor.UserID) + } else { + curCursorCount++ + } + } else { + if cursor.MaxReadSeq > existingCursor.MaxReadSeq { + if err := c.db.UpdateGroupReadCursor(ctx, conversationID, cursor.UserID, cursor.MaxReadSeq); err != nil { + log.ZWarn(ctx, "UpdateGroupReadCursor failed", err, "conversationID", conversationID, "userID", cursor.UserID) + } else { + curCursorCount++ + } + } + } + } + allCursorCount += curCursorCount + if curCursorCount != 0 { + if err := c.db.IncrementGroupReadCursorVersion(ctx, conversationID); err != nil { + log.ZWarn(ctx, "IncrementGroupReadCursorVersion failed", err, "conversationID", conversationID) + } + } + } + + log.ZDebug(ctx, "syncAllGroupReadCursors completed", "duration", time.Since(stepStartTime).Seconds(), "cursorCount", allCursorCount) + return nil +} diff --git a/internal/group/notification.go b/internal/group/notification.go index d9931dd9e..3f9e19a4e 100644 --- a/internal/group/notification.go +++ b/internal/group/notification.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" + "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" @@ -78,6 +79,23 @@ func (g *Group) doNotification(ctx context.Context, msg *sdkws.MsgData) error { return err } + conversationID := utils.GetConversationIDByGroupID(detail.Group.GroupID) + if err := g.db.InsertGroupReadCursorState(ctx, &model_struct.LocalGroupReadCursorState{ + ConversationID: conversationID, + CursorVersion: 1, + }); err != nil { + log.ZError(ctx, "InsertGroupReadCursorState on GroupCreatedNotification failed", err, "groupID", detail.Group.GroupID) + } + for _, member := range detail.MemberList { + if err := g.db.InsertGroupReadCursor(ctx, &model_struct.LocalGroupReadCursor{ + ConversationID: conversationID, + UserID: member.UserID, + MaxReadSeq: 0, + }); err != nil { + log.ZError(ctx, "InsertGroupReadCursor on GroupCreatedNotification failed", err, "groupID", detail.Group.GroupID, "userID", member.UserID) + } + } + if err := g.IncrSyncJoinGroup(ctx); err != nil { return err } @@ -96,9 +114,23 @@ func (g *Group) doNotification(ctx context.Context, msg *sdkws.MsgData) error { if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil { return err } + conversationID := utils.GetConversationIDByGroupID(detail.Group.GroupID) if detail.QuitUser.UserID == g.loginUserID { + if err := g.db.DeleteGroupReadCursorsByConversationID(ctx, conversationID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursorsByConversationID err", err, "conversationID", conversationID) + } + if err := g.db.DeleteGroupReadCursorState(ctx, conversationID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursorState err", err, "conversationID", conversationID) + } return g.IncrSyncJoinGroup(ctx) } else { + if err := g.db.DeleteGroupReadCursor(ctx, conversationID, detail.QuitUser.UserID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursor err", err, "conversationID", conversationID, "userID", detail.QuitUser.UserID) + } else { + if err := g.db.IncrementGroupReadCursorVersion(ctx, conversationID); err != nil { + log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", conversationID) + } + } return g.onlineSyncGroupAndMember(ctx, detail.Group.GroupID, []*sdkws.GroupMemberFullInfo{detail.QuitUser}, nil, nil, detail.Group, groupSortIDUnchanged, detail.GroupMemberVersion, detail.GroupMemberVersionID) } @@ -125,9 +157,29 @@ func (g *Group) doNotification(ctx context.Context, msg *sdkws.MsgData) error { break } } + conversationID := utils.GetConversationIDByGroupID(detail.Group.GroupID) if self { + if err := g.db.DeleteGroupReadCursorsByConversationID(ctx, conversationID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursorsByConversationID err", err, "conversationID", conversationID) + } + if err := g.db.DeleteGroupReadCursorState(ctx, conversationID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursorState err", err, "conversationID", conversationID) + } return g.IncrSyncJoinGroup(ctx) } else { + deleted := false + for _, info := range detail.KickedUserList { + if err := g.db.DeleteGroupReadCursor(ctx, conversationID, info.UserID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursor err", err, "conversationID", conversationID, "userID", info.UserID) + } else { + deleted = true + } + } + if deleted { + if err := g.db.IncrementGroupReadCursorVersion(ctx, conversationID); err != nil { + log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", conversationID) + } + } return g.onlineSyncGroupAndMember(ctx, detail.Group.GroupID, detail.KickedUserList, nil, nil, detail.Group, groupSortIDUnchanged, detail.GroupMemberVersion, detail.GroupMemberVersionID) } @@ -139,6 +191,21 @@ func (g *Group) doNotification(ctx context.Context, msg *sdkws.MsgData) error { userIDMap := datautil.SliceSetAny(detail.InvitedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) + + conversationID := utils.GetConversationIDByGroupID(detail.Group.GroupID) + for _, member := range detail.InvitedUserList { + if err := g.db.InsertGroupReadCursor(ctx, &model_struct.LocalGroupReadCursor{ + ConversationID: conversationID, + UserID: member.UserID, + MaxReadSeq: 0, + }); err != nil { + log.ZError(ctx, "InsertGroupReadCursor on MemberInvitedNotification failed", err, "groupID", detail.Group.GroupID, "userID", member.UserID) + } + } + if err := g.db.IncrementGroupReadCursorVersion(ctx, conversationID); err != nil { + log.ZError(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", conversationID) + } + //Also invited as a member if _, ok := userIDMap[g.loginUserID]; ok { if err := g.IncrSyncJoinGroup(ctx); err != nil { @@ -154,6 +221,19 @@ func (g *Group) doNotification(ctx context.Context, msg *sdkws.MsgData) error { if err := utils.UnmarshalNotificationElem(msg.Content, &detail); err != nil { return err } + + conversationID := utils.GetConversationIDByGroupID(detail.Group.GroupID) + if err := g.db.InsertGroupReadCursor(ctx, &model_struct.LocalGroupReadCursor{ + ConversationID: conversationID, + UserID: detail.EntrantUser.UserID, + MaxReadSeq: 0, + }); err != nil { + log.ZError(ctx, "InsertGroupReadCursor on MemberEnterNotification failed", err, "groupID", detail.Group.GroupID, "userID", detail.EntrantUser.UserID) + } + if err := g.db.IncrementGroupReadCursorVersion(ctx, conversationID); err != nil { + log.ZError(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", conversationID) + } + if detail.EntrantUser.UserID == g.loginUserID { if err := g.IncrSyncJoinGroup(ctx); err != nil { return err @@ -170,6 +250,14 @@ func (g *Group) doNotification(ctx context.Context, msg *sdkws.MsgData) error { } g.listener().OnGroupDismissed(utils.StructToJsonString(detail.Group)) + conversationID := utils.GetConversationIDByGroupID(detail.Group.GroupID) + if err := g.db.DeleteGroupReadCursorsByConversationID(ctx, conversationID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursorsByConversationID err", err, "conversationID", conversationID) + } + if err := g.db.DeleteGroupReadCursorState(ctx, conversationID); err != nil { + log.ZWarn(ctx, "DeleteGroupReadCursorState err", err, "conversationID", conversationID) + } + return g.IncrSyncJoinGroup(ctx) case constant.GroupMemberMutedNotification: // 1512 var detail sdkws.GroupMemberMutedTips diff --git a/open_im_sdk/conversation_msg.go b/open_im_sdk/conversation_msg.go index 17cb8dc3c..4123362f8 100644 --- a/open_im_sdk/conversation_msg.go +++ b/open_im_sdk/conversation_msg.go @@ -157,6 +157,10 @@ func RevokeMessage(callback open_im_sdk_callback.Base, operationID string, conve call(callback, operationID, IMUserContext.Conversation().RevokeMessage, conversationID, clientMsgID) } +func ProjectGroupReadInfo(callback open_im_sdk_callback.Base, operationID string, conversationID string, clientMsgIDs []string) { + call(callback, operationID, IMUserContext.Conversation().ProjectGroupReadInfo, conversationID, clientMsgIDs) +} + func TypingStatusUpdate(callback open_im_sdk_callback.Base, operationID string, recvID string, msgTip string) { call(callback, operationID, IMUserContext.Conversation().TypingStatusUpdate, recvID, msgTip) } diff --git a/open_im_sdk_callback/callback_client.go b/open_im_sdk_callback/callback_client.go index 11f9889b9..955e57d3e 100644 --- a/open_im_sdk_callback/callback_client.go +++ b/open_im_sdk_callback/callback_client.go @@ -70,6 +70,7 @@ type OnConversationListener interface { type OnAdvancedMsgListener interface { OnRecvNewMessage(message string) OnRecvC2CReadReceipt(msgReceiptList string) + OnRecvGroupReadReceipt(groupMsgReceiptList string) OnNewRecvMessageRevoked(messageRevoked string) OnRecvOfflineNewMessage(message string) OnMsgDeleted(message string) diff --git a/pkg/api/api.go b/pkg/api/api.go index af8825d87..22d3df17f 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -90,6 +90,7 @@ var ( GetIncrementalConversation = newApi[conversation.GetIncrementalConversationReq, conversation.GetIncrementalConversationResp]("/conversation/get_incremental_conversations") GetFullConversationIDs = newApi[conversation.GetFullOwnerConversationIDsReq, conversation.GetFullOwnerConversationIDsResp]("/conversation/get_full_conversation_ids") GetOwnerConversation = newApi[conversation.GetOwnerConversationReq, conversation.GetOwnerConversationResp]("/conversation/get_owner_conversation") + GetConversationReadCursors = newApi[conversation.GetConversationReadCursorsReq, conversation.GetConversationReadCursorsResp]("/conversation/get_conversation_read_cursors") GetActiveConversation = newApi[jssdk.GetActiveConversationsReq, jssdk.GetActiveConversationsResp]("/jssdk/get_active_conversations") ) diff --git a/pkg/db/chat_log_model.go b/pkg/db/chat_log_model.go index 98e3b2538..2dacc603a 100644 --- a/pkg/db/chat_log_model.go +++ b/pkg/db/chat_log_model.go @@ -422,6 +422,59 @@ func (d *DataBase) GetMessagesBySeqs(ctx context.Context, conversationID string, return msgs, err } +func (d *DataBase) InsertGroupReadCursor(ctx context.Context, cursor *model_struct.LocalGroupReadCursor) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + return errs.WrapMsg(d.conn.WithContext(ctx).Create(cursor).Error, "InsertGroupReadCursor failed") +} + +func (d *DataBase) UpdateGroupReadCursor(ctx context.Context, conversationID, userID string, maxReadSeq int64) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + + return errs.WrapMsg( + d.conn.WithContext(ctx). + Model(&model_struct.LocalGroupReadCursor{}). + Where("conversation_id = ? AND user_id = ? AND max_read_seq < ?", conversationID, userID, maxReadSeq). + Update("max_read_seq", maxReadSeq).Error, + "UpdateGroupReadCursor failed", + ) +} + +func (d *DataBase) GetGroupReadCursor(ctx context.Context, conversationID, userID string) (*model_struct.LocalGroupReadCursor, error) { + d.mRWMutex.RLock() + defer d.mRWMutex.RUnlock() + var cursor model_struct.LocalGroupReadCursor + err := d.conn.WithContext(ctx).Where("conversation_id = ? AND user_id = ?", conversationID, userID).First(&cursor).Error + if err != nil { + return nil, errs.WrapMsg(err, "GetGroupReadCursor failed") + } + return &cursor, nil +} + +func (d *DataBase) GetGroupReadCursorsByConversationID(ctx context.Context, conversationID string) ([]*model_struct.LocalGroupReadCursor, error) { + d.mRWMutex.RLock() + defer d.mRWMutex.RUnlock() + var cursors []*model_struct.LocalGroupReadCursor + err := d.conn.WithContext(ctx).Where("conversation_id = ?", conversationID).Find(&cursors).Error + return cursors, errs.WrapMsg(err, "GetGroupReadCursorsByConversationID failed") +} + +func (d *DataBase) DeleteGroupReadCursor(ctx context.Context, conversationID, userID string) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + return errs.WrapMsg(d.conn.WithContext(ctx).Where("conversation_id = ? AND user_id = ?", conversationID, userID). + Delete(&model_struct.LocalGroupReadCursor{}).Error, "DeleteGroupReadCursor failed") +} + +// DeleteGroupReadCursorsByConversationID deletes all group read cursors for the given conversationID. +func (d *DataBase) DeleteGroupReadCursorsByConversationID(ctx context.Context, conversationID string) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + return errs.WrapMsg(d.conn.WithContext(ctx).Where("conversation_id = ?", conversationID). + Delete(&model_struct.LocalGroupReadCursor{}).Error, "DeleteGroupReadCursorsByConversationID failed") +} + func (d *DataBase) GetConversationNormalMsgSeq(ctx context.Context, conversationID string) (int64, error) { err := d.initChatLog(ctx, conversationID) if err != nil { @@ -518,3 +571,34 @@ func (d *DataBase) GetLatestValidServerMessage(ctx context.Context, conversation return &result, nil } + +func (d *DataBase) InsertGroupReadCursorState(ctx context.Context, state *model_struct.LocalGroupReadCursorState) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + return errs.WrapMsg(d.conn.WithContext(ctx).Create(state).Error, "InsertGroupReadCursorState failed") +} + +func (d *DataBase) GetGroupReadCursorState(ctx context.Context, conversationID string) (*model_struct.LocalGroupReadCursorState, error) { + d.mRWMutex.RLock() + defer d.mRWMutex.RUnlock() + var state model_struct.LocalGroupReadCursorState + err := d.conn.WithContext(ctx).Where("conversation_id = ?", conversationID).First(&state).Error + if err != nil { + return nil, errs.WrapMsg(err, "GetGroupReadCursorState failed") + } + return &state, nil +} + +func (d *DataBase) DeleteGroupReadCursorState(ctx context.Context, conversationID string) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + return errs.WrapMsg(d.conn.WithContext(ctx).Where("conversation_id = ?", conversationID).Delete(&model_struct.LocalGroupReadCursorState{}).Error, "DeleteGroupReadCursorState failed") +} + +func (d *DataBase) IncrementGroupReadCursorVersion(ctx context.Context, conversationID string) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + return errs.WrapMsg(d.conn.WithContext(ctx).Model(&model_struct.LocalGroupReadCursorState{}). + Where("conversation_id = ?", conversationID). + UpdateColumn("cursor_version", gorm.Expr("cursor_version + 1")).Error, "IncrementGroupReadCursorVersion failed") +} diff --git a/pkg/db/db_init.go b/pkg/db/db_init.go index c515fe5b6..f161a69f4 100644 --- a/pkg/db/db_init.go +++ b/pkg/db/db_init.go @@ -185,6 +185,8 @@ func (d *DataBase) versionDataMigrate(ctx context.Context) error { &model_struct.LocalStranger{}, &model_struct.LocalSendingMessages{}, &model_struct.LocalVersionSync{}, + &model_struct.LocalGroupReadCursor{}, + &model_struct.LocalGroupReadCursorState{}, ) if err != nil { return err diff --git a/pkg/db/db_interface/databse.go b/pkg/db/db_interface/databse.go index 61384f201..485d59769 100644 --- a/pkg/db/db_interface/databse.go +++ b/pkg/db/db_interface/databse.go @@ -174,6 +174,22 @@ type AppSDKVersion interface { SetAppSDKVersion(ctx context.Context, version *model_struct.LocalAppSDKVersion) error } +type GroupReadCursorModel interface { + InsertGroupReadCursor(ctx context.Context, cursor *model_struct.LocalGroupReadCursor) error + UpdateGroupReadCursor(ctx context.Context, conversationID, userID string, maxReadSeq int64) error + GetGroupReadCursor(ctx context.Context, conversationID, userID string) (*model_struct.LocalGroupReadCursor, error) + GetGroupReadCursorsByConversationID(ctx context.Context, conversationID string) ([]*model_struct.LocalGroupReadCursor, error) + DeleteGroupReadCursor(ctx context.Context, conversationID, userID string) error + DeleteGroupReadCursorsByConversationID(ctx context.Context, conversationID string) error +} + +type GroupReadCursorStateModel interface { + InsertGroupReadCursorState(ctx context.Context, state *model_struct.LocalGroupReadCursorState) error + GetGroupReadCursorState(ctx context.Context, conversationID string) (*model_struct.LocalGroupReadCursorState, error) + DeleteGroupReadCursorState(ctx context.Context, conversationID string) error + IncrementGroupReadCursorVersion(ctx context.Context, conversationID string) error +} + type TableMaster interface { GetExistTables(ctx context.Context) ([]string, error) } @@ -189,5 +205,7 @@ type DataBase interface { SendingMessagesModel VersionSyncModel AppSDKVersion + GroupReadCursorModel + GroupReadCursorStateModel TableMaster } diff --git a/pkg/db/model_struct/data_model_struct.go b/pkg/db/model_struct/data_model_struct.go index 8c5ee887b..8ad4aa9b4 100644 --- a/pkg/db/model_struct/data_model_struct.go +++ b/pkg/db/model_struct/data_model_struct.go @@ -222,6 +222,25 @@ func (LocalConversation) TableName() string { return "local_conversations" } +type LocalGroupReadCursor struct { + ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` + UserID string `gorm:"column:user_id;primary_key;type:char(64)" json:"userID"` + MaxReadSeq int64 `gorm:"column:max_read_seq" json:"maxReadSeq"` +} + +func (LocalGroupReadCursor) TableName() string { + return "local_group_read_cursor" +} + +type LocalGroupReadCursorState struct { + ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` + CursorVersion int64 `gorm:"column:cursor_version" json:"cursorVersion"` +} + +func (LocalGroupReadCursorState) TableName() string { + return "local_group_read_cursor_state" +} + type LocalConversationUnreadMessage struct { ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` ClientMsgID string `gorm:"column:client_msg_id;primary_key;type:char(64)" json:"clientMsgID"` diff --git a/sdk_struct/sdk_struct.go b/sdk_struct/sdk_struct.go index 93ee2b38c..f45b06a00 100644 --- a/sdk_struct/sdk_struct.go +++ b/sdk_struct/sdk_struct.go @@ -289,6 +289,7 @@ type GroupHasReadInfo struct { HasReadUserIDList []string `json:"hasReadUserIDList,omitempty"` HasReadCount int32 `json:"hasReadCount"` GroupMemberCount int32 `json:"groupMemberCount"` + ReadCursorVersion int64 `json:"readCursorVersion"` } type NewMsgList []*MsgStruct