Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.68.0 // indirect
)

replace github.com/openimsdk/protocol => github.com/rookiewwj/protocol v0.0.1
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.80 h1:Nvt97Vm85CXr633Jf7WjRJeL2nxJJjwlZJFDgWWXkJU=
github.com/openimsdk/tools v0.0.50-alpha.80/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
Expand All @@ -41,6 +39,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rookiewwj/protocol v0.0.1 h1:Bd9F8FfE/viObERdhEuRm8tfJkE82dl244WvK1fNF2M=
github.com/rookiewwj/protocol v0.0.1/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
4 changes: 4 additions & 0 deletions internal/conversation_msg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,10 @@ func (c *Conversation) RevokeMessage(ctx context.Context, conversationID, client
return c.revokeOneMessage(ctx, conversationID, clientMsgID)
}

func (c *Conversation) ProjectGroupReadInfo(ctx context.Context, conversationID string, clientMsgIDs []string) error {
return c.projectGroupReadInfo(ctx, conversationID, clientMsgIDs)
}

func (c *Conversation) TypingStatusUpdate(ctx context.Context, recvID, msgTip string) error {
return c.typingStatusUpdate(ctx, recvID, msgTip)
}
Expand Down
97 changes: 97 additions & 0 deletions internal/conversation_msg/projection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package conversation_msg

import (
"context"
"sort"

"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
)

func (c *Conversation) projectGroupReadInfo(ctx context.Context, conversationID string, clientMsgIDs []string) error {

if len(clientMsgIDs) == 0 {
return nil
}

cursorState, err := c.db.GetGroupReadCursorState(ctx, conversationID)
if err != nil {
return errs.WrapMsg(err, "GetGroupReadCursorState failed", "conversationID", conversationID)
}
var cursorVersion int64
if cursorState != nil {
cursorVersion = cursorState.CursorVersion
}

var messages []*model_struct.LocalChatLog
for _, clientMsgID := range clientMsgIDs {
message, err := c.waitForMessageSyncSeq(ctx, conversationID, clientMsgID)
if err != nil {
log.ZWarn(ctx, "waitForMessageSyncSeq failed", err, "conversationID", conversationID, "clientMsgID", clientMsgID)
continue
}
messages = append(messages, message)
}
if len(messages) == 0 {
return nil
}

cursors, err := c.db.GetGroupReadCursorsByConversationID(ctx, conversationID) // []{UserID, MaxReadSeq}
if err != nil {
return errs.WrapMsg(err, "GetGroupReadCursorsByConversationID failed", "conversationID", conversationID)
}
memberCount := len(cursors)

sortedMaxSeqs := make([]int64, 0, memberCount)
type pair struct {
uid string
seq int64
}
userSeqPairs := make([]pair, 0, memberCount)

for _, cur := range cursors {
sortedMaxSeqs = append(sortedMaxSeqs, cur.MaxReadSeq)
userSeqPairs = append(userSeqPairs, pair{uid: cur.UserID, seq: cur.MaxReadSeq})
}
sort.Slice(sortedMaxSeqs, func(i, j int) bool { return sortedMaxSeqs[i] < sortedMaxSeqs[j] })

for _, m := range messages {
if m == nil || m.Seq == 0 {
continue
}
var attach sdk_struct.AttachedInfoElem
utils.JsonStringToStruct(m.AttachedInfo, &attach)

alreadyFresh := (attach.GroupHasReadInfo.ReadCursorVersion == cursorVersion)
if alreadyFresh {
continue
}

idx := sort.Search(len(sortedMaxSeqs), func(i int) bool { return sortedMaxSeqs[i] >= m.Seq })
hasReadCount := len(sortedMaxSeqs) - idx

attach.GroupHasReadInfo.HasReadCount = int32(hasReadCount)
attach.GroupHasReadInfo.GroupMemberCount = int32(memberCount)
attach.GroupHasReadInfo.ReadCursorVersion = cursorVersion

if hasReadCount > 0 {
list := make([]string, 0, hasReadCount)
for _, p := range userSeqPairs {
if p.seq >= m.Seq {
list = append(list, p.uid)
}
}
attach.GroupHasReadInfo.HasReadUserIDList = list
}

m.AttachedInfo = utils.StructToJsonString(attach)
if err := c.db.UpdateMessage(ctx, conversationID, m); err != nil {
log.ZWarn(ctx, "projectGroupReadInfo UpdateMessage err", err, "conversationID", conversationID, "seq", m.Seq, "clientMsgID", m.ClientMsgID)
}
}

return nil
}
43 changes: 42 additions & 1 deletion internal/conversation_msg/read_drawing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"gorm.io/gorm"

"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -248,7 +249,8 @@ func (c *Conversation) doReadDrawing(ctx context.Context, msg *sdkws.MsgData) er
return err

}
if conversation.ConversationType == constant.SingleChatType {
switch conversation.ConversationType {
case constant.SingleChatType:
latestMsg := &sdk_struct.MsgStruct{}
if err := json.Unmarshal([]byte(conversation.LatestMsg), latestMsg); err != nil {
log.ZWarn(ctx, "Unmarshal err", err, "conversationID", tips.ConversationID, "latestMsg", conversation.LatestMsg)
Expand Down Expand Up @@ -277,7 +279,46 @@ func (c *Conversation) doReadDrawing(ctx context.Context, msg *sdkws.MsgData) er
var messageReceiptResp = []*sdk_struct.MessageReceipt{{UserID: tips.MarkAsReadUserID, MsgIDList: successMsgIDs,
SessionType: conversation.ConversationType, ReadTime: msg.SendTime}}
c.msgListener().OnRecvC2CReadReceipt(utils.StructToJsonString(messageReceiptResp))

case constant.ReadGroupChatType:

maxReadSeq := tips.HasReadSeq
if maxReadSeq > 0 {
cursor, err := c.db.GetGroupReadCursor(ctx, tips.ConversationID, tips.MarkAsReadUserID)
if err != nil {
if err == gorm.ErrRecordNotFound {
newCursor := &model_struct.LocalGroupReadCursor{
ConversationID: tips.ConversationID,
UserID: tips.MarkAsReadUserID,
MaxReadSeq: maxReadSeq,
}
if err = c.db.InsertGroupReadCursor(ctx, newCursor); err != nil {
log.ZWarn(ctx, "InsertGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
}
if err = c.db.IncrementGroupReadCursorVersion(ctx, tips.ConversationID); err != nil {
log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
}
} else {
log.ZWarn(ctx, "GetGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
return err
}

} else {
if maxReadSeq > cursor.MaxReadSeq {
if err = c.db.UpdateGroupReadCursor(ctx, tips.ConversationID, tips.MarkAsReadUserID, maxReadSeq); err != nil {
log.ZWarn(ctx, "UpdateGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
}
if err = c.db.IncrementGroupReadCursorVersion(ctx, tips.ConversationID); err != nil {
log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
}
}
}
}
var groupReceiptResp = []*sdk_struct.MessageReceipt{{GroupID: conversation.GroupID, UserID: tips.MarkAsReadUserID, MsgIDList: nil,
SessionType: conversation.ConversationType, ReadTime: msg.SendTime}}
c.msgListener().OnRecvGroupReadReceipt(utils.StructToJsonString(groupReceiptResp))
}

} else {
return c.doUnreadCount(ctx, conversation, tips.HasReadSeq, tips.Seqs)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/conversation_msg/server_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (c *Conversation) getIncrementalConversationFromServer(ctx context.Context,
return api.GetIncrementalConversation.Invoke(ctx, req)
}

func (c *Conversation) getConversationReadCursors(ctx context.Context, conversationIDs []string) (*pbConversation.GetConversationReadCursorsResp, error) {
req := &pbConversation.GetConversationReadCursorsReq{ConversationIDs: conversationIDs}
return api.GetConversationReadCursors.Invoke(ctx, req)
}

func (c *Conversation) GetActiveConversations(ctx context.Context) ([]*jssdk.ConversationMsg, error) {
conf, err := cliconf.GetClientConfig(ctx)
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions internal/conversation_msg/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,95 @@ func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) erro
log.ZDebug(ctx, "TriggerCmdUpdateConversation completed", "duration", time.Since(stepStartTime).Seconds())
}

stepStartTime = time.Now()
if err := c.syncAllGroupReadCursors(ctx); err != nil {
log.ZWarn(ctx, "syncAllGroupReadCursors failed", err)
}
log.ZDebug(ctx, "syncAllGroupReadCursors completed", "duration", time.Since(stepStartTime).Seconds())

log.ZDebug(ctx, "SyncAllConversationHashReadSeqs completed", "totalDuration", time.Since(startTime).Seconds())
return nil
}

func (c *Conversation) syncAllGroupReadCursors(ctx context.Context) error {
conversations, err := c.db.GetAllConversations(ctx)
if err != nil {
log.ZWarn(ctx, "GetAllConversations failed", err)
return err
}

var groupConversationIDs []string
for _, conv := range conversations {
if conv.ConversationType == constant.ReadGroupChatType {
groupConversationIDs = append(groupConversationIDs, conv.ConversationID)
}
}

if len(groupConversationIDs) == 0 {
log.ZDebug(ctx, "no group conversations to sync cursors")
return nil
}

log.ZDebug(ctx, "found group conversations", "count", len(groupConversationIDs), "conversationIDs", groupConversationIDs)

for _, conversationID := range groupConversationIDs {
if _, err := c.db.GetGroupReadCursorState(ctx, conversationID); err != nil {
if ierr := c.db.InsertGroupReadCursorState(ctx, &model_struct.LocalGroupReadCursorState{ConversationID: conversationID, CursorVersion: 1}); ierr != nil {
log.ZWarn(ctx, "InsertGroupReadCursorState failed", ierr, "conversationID", conversationID)
} else {
log.ZDebug(ctx, "initialized LocalGroupReadCursorState", "conversationID", conversationID)
}
}
}

stepStartTime := time.Now()
resp, err := c.getConversationReadCursors(ctx, groupConversationIDs)
if err != nil {
log.ZWarn(ctx, "getConversationReadCursorsFromServer failed", err)
return err
}
log.ZDebug(ctx, "getConversationReadCursorsFromServer completed", "duration", time.Since(stepStartTime).Seconds())

stepStartTime = time.Now()
allCursorCount := 0
for conversationID, cursorList := range resp.Cursors {
curCursorCount := 0
if cursorList == nil || len(cursorList.Cursors) == 0 {
continue
}

for _, cursor := range cursorList.Cursors {
localCursor := &model_struct.LocalGroupReadCursor{
ConversationID: conversationID,
UserID: cursor.UserID,
MaxReadSeq: cursor.MaxReadSeq,
}

existingCursor, err := c.db.GetGroupReadCursor(ctx, conversationID, cursor.UserID)
if err != nil {
if err := c.db.InsertGroupReadCursor(ctx, localCursor); err != nil {
log.ZWarn(ctx, "InsertGroupReadCursor failed", err, "conversationID", conversationID, "userID", cursor.UserID)
} else {
curCursorCount++
}
} else {
if cursor.MaxReadSeq > existingCursor.MaxReadSeq {
if err := c.db.UpdateGroupReadCursor(ctx, conversationID, cursor.UserID, cursor.MaxReadSeq); err != nil {
log.ZWarn(ctx, "UpdateGroupReadCursor failed", err, "conversationID", conversationID, "userID", cursor.UserID)
} else {
curCursorCount++
}
}
}
}
allCursorCount += curCursorCount
if curCursorCount != 0 {
if err := c.db.IncrementGroupReadCursorVersion(ctx, conversationID); err != nil {
log.ZWarn(ctx, "IncrementGroupReadCursorVersion failed", err, "conversationID", conversationID)
}
}
}

log.ZDebug(ctx, "syncAllGroupReadCursors completed", "duration", time.Since(stepStartTime).Seconds(), "cursorCount", allCursorCount)
return nil
}
Loading