Skip to content

Commit

Permalink
fix: Bug fix for clearing unread messages. (#677)
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
  • Loading branch information
FGadvancer authored Aug 29, 2024
1 parent fc60b8e commit 9effc2b
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 139 deletions.
71 changes: 2 additions & 69 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/page"
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/syncer"
pbConversation "github.com/openimsdk/protocol/conversation"
Expand Down Expand Up @@ -152,7 +151,7 @@ func (c *Conversation) initSyncer() {
"update_unread_count_time": serverConversation.UpdateUnreadCountTime,
"attached_info": serverConversation.AttachedInfo, "ex": serverConversation.Ex, "msg_destruct_time": serverConversation.MsgDestructTime,
"is_msg_destruct": serverConversation.IsMsgDestruct,
"max_seq": serverConversation.MaxSeq, "min_seq": serverConversation.MinSeq, "has_read_seq": serverConversation.HasReadSeq})
"max_seq": serverConversation.MaxSeq, "min_seq": serverConversation.MinSeq})
}),
syncer.WithUUID[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string](func(value *model_struct.LocalConversation) string {
return value.ConversationID
Expand Down Expand Up @@ -666,7 +665,7 @@ func (c *Conversation) batchUpdateMessageList(ctx context.Context, updateMsg map
conversation.LatestMsg = utils.StructToJsonString(latestMsg)

c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{ConID: conversation.ConversationID,
Action: constant.AddConOrUpLatMsg, Args: *conversation}})
Action: constant.AddConOrUpLatMsg, Args: *conversation, Caller: "batchUpdateMessageList"}})

}
}
Expand Down Expand Up @@ -898,72 +897,6 @@ func (c *Conversation) batchNewMessages(ctx context.Context, newMessagesList sdk

}

func (c *Conversation) doMsgReadState(ctx context.Context, msgReadList []*sdk_struct.MsgStruct) {
var messageReceiptResp []*sdk_struct.MessageReceipt
var msgIdList []string
chrsList := make(map[string][]string)
var conversationID string

for _, rd := range msgReadList {
err := json.Unmarshal([]byte(rd.Content), &msgIdList)
if err != nil {
// log.Error("internal", "unmarshal failed, err : ", err.Error())
return
}
var msgIdListStatusOK []string
for _, v := range msgIdList {
//m, err := c.db.GetMessage(ctx, v)
//if err != nil {
// log.Error("internal", "GetMessage err:", err, "ClientMsgID", v)
// continue
//}
//attachInfo := sdk_struct.AttachedInfoElem{}
//_ = utils.JsonStringToStruct(m.AttachedInfo, &attachInfo)
//attachInfo.HasReadTime = rd.SendTime
//m.AttachedInfo = utils.StructToJsonString(attachInfo)
//m.IsRead = true
//err = c.db.UpdateMessage(ctx, m)
//if err != nil {
// log.Error("internal", "setMessageHasReadByMsgID err:", err, "ClientMsgID", v)
// continue
//}

msgIdListStatusOK = append(msgIdListStatusOK, v)
}
if len(msgIdListStatusOK) > 0 {
msgRt := new(sdk_struct.MessageReceipt)
msgRt.ContentType = rd.ContentType
msgRt.MsgFrom = rd.MsgFrom
msgRt.ReadTime = rd.SendTime
msgRt.UserID = rd.SendID
msgRt.SessionType = constant.SingleChatType
msgRt.MsgIDList = msgIdListStatusOK
messageReceiptResp = append(messageReceiptResp, msgRt)
}
if rd.SendID == c.loginUserID {
conversationID = c.getConversationIDBySessionType(rd.RecvID, constant.SingleChatType)
} else {
conversationID = c.getConversationIDBySessionType(rd.SendID, constant.SingleChatType)
}
if v, ok := chrsList[conversationID]; ok {
chrsList[conversationID] = append(v, msgIdListStatusOK...)
} else {
chrsList[conversationID] = msgIdListStatusOK
}
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.ConversationLatestMsgHasRead, Args: chrsList}})
}
if len(messageReceiptResp) > 0 {

// log.Info("internal", "OnRecvC2CReadReceipt: ", utils.StructToJsonString(messageReceiptResp))
c.msgListener().OnRecvC2CReadReceipt(utils.StructToJsonString(messageReceiptResp))
}
}

type messageKvList struct {
ClientMsgID string `json:"clientMsgID"`
ChangedKvList []*sdk.SingleTypeKeyInfoSum `json:"changedKvList"`
}

func (c *Conversation) msgConvert(msg *sdk_struct.MsgStruct) (err error) {
err = c.msgHandleByContentType(msg)
if err != nil {
Expand Down
65 changes: 27 additions & 38 deletions internal/conversation_msg/conversation_notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,8 @@ func (c *Conversation) doNotificationManager(c2v common.Cmd2Value) {

for conversationID, msgs := range allMsg {
log.ZDebug(ctx, "notification handling", "conversationID", conversationID, "msgs", msgs)
if len(msgs.Msgs) != 0 {
lastMsg := msgs.Msgs[len(msgs.Msgs)-1]
log.ZDebug(ctx, "SetNotificationSeq", "conversationID", conversationID, "seq", lastMsg.Seq)
if lastMsg.Seq != 0 {
if err := c.db.SetNotificationSeq(ctx, conversationID, lastMsg.Seq); err != nil {
log.ZError(ctx, "SetNotificationSeq err", err, "conversationID", conversationID, "lastMsg", lastMsg)
}
}
}

// First, process all the notifications
for _, msg := range msgs.Msgs {
if msg.ContentType > constant.FriendNotificationBegin && msg.ContentType < constant.FriendNotificationEnd {
c.friend.DoNotification(ctx, msg)
Expand All @@ -147,13 +140,26 @@ func (c *Conversation) doNotificationManager(c2v common.Cmd2Value) {
c.DoNotification(ctx, msg)
}
}

// After all notifications are processed, update the sequence number
if len(msgs.Msgs) != 0 {
lastMsg := msgs.Msgs[len(msgs.Msgs)-1]
log.ZDebug(ctx, "SetNotificationSeq", "conversationID", conversationID, "seq", lastMsg.Seq)
if lastMsg.Seq != 0 {
if err := c.db.SetNotificationSeq(ctx, conversationID, lastMsg.Seq); err != nil {
// Log an error if setting the sequence number fails
log.ZError(ctx, "SetNotificationSeq err", err, "conversationID", conversationID, "lastMsg", lastMsg)
}
}
}
}

}

func (c *Conversation) DoNotification(ctx context.Context, msg *sdkws.MsgData) {
go func() {
if err := c.doNotification(ctx, msg); err != nil {
log.ZError(ctx, "DoGroupNotification failed", err)
log.ZWarn(ctx, "DoGroupNotification failed", err)
}
}()
}
Expand Down Expand Up @@ -187,6 +193,7 @@ func (c *Conversation) getConversationLatestMsgClientID(latestMsg string) string
func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) {
ctx := c2v.Ctx
node := c2v.Value.(common.UpdateConNode)
log.ZInfo(ctx, "doUpdateConversation", "node", node)
switch node.Action {
case constant.AddConOrUpLatMsg:
var list []*model_struct.LocalConversation
Expand All @@ -206,38 +213,20 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) {
}
}
} else {
// log.Info("this is new conversation", lc)
log.ZDebug(ctx, "new conversation", "lc", lc)
err4 := c.db.InsertConversation(ctx, &lc)
if err4 != nil {
// log.Error("internal", "insert new conversation err:", err4.Error())
log.ZWarn(ctx, "insert new conversation err", err4)
} else {
list = append(list, &lc)
c.ConversationListener().OnNewConversation(utils.StructToJsonString(list))
}
}

case constant.UnreadCountSetZero:
if err := c.db.UpdateColumnsConversation(ctx, node.ConID, map[string]interface{}{"unread_count": 0}); err != nil {
log.ZError(ctx, "updateConversationUnreadCountModel err", err, "conversationID", node.ConID)
} else {
totalUnreadCount, err := c.db.GetTotalUnreadMsgCountDB(ctx)
if err == nil {
c.ConversationListener().OnTotalUnreadMessageCountChanged(totalUnreadCount)
} else {
log.ZError(ctx, "getTotalUnreadMsgCountDB err", err)
}

}
case constant.IncrUnread:
err := c.db.IncrConversationUnreadCount(ctx, node.ConID)
if err != nil {
// log.Error("internal", "incrConversationUnreadCount database err:", err.Error())
return
}
case constant.TotalUnreadMessageChanged:
totalUnreadCount, err := c.db.GetTotalUnreadMsgCountDB(ctx)
if err != nil {
// log.Error("internal", "TotalUnreadMessageChanged database err:", err.Error())
log.ZWarn(ctx, "GetTotalUnreadMsgCountDB err", err)
} else {
c.ConversationListener().OnTotalUnreadMessageCountChanged(totalUnreadCount)
}
Expand Down Expand Up @@ -312,10 +301,10 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) {
cidList := node.Args.([]string)
cLists, err := c.db.GetMultipleConversationDB(ctx, cidList)
if err != nil {
// log.Error("internal", "getMultipleConversationModel err :", err.Error())
log.ZWarn(ctx, "getMultipleConversationModel err", err)
} else {
if cLists != nil {
// log.Info("internal", "getMultipleConversationModel success :", cLists)
log.ZDebug(ctx, "getMultipleConversationModel success", "cLists", cLists)
c.ConversationListener().OnNewConversation(utils.StructToJsonString(cLists))
}
}
Expand All @@ -325,7 +314,7 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) {

case constant.NewConDirect:
cidList := node.Args.(string)
// log.Debug("internal", "NewConversation", cidList)
log.ZDebug(ctx, "NewConversation", "cidList", cidList)
c.ConversationListener().OnNewConversation(cidList)

case constant.ConversationLatestMsgHasRead:
Expand All @@ -336,12 +325,12 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) {
for conversationID, msgIDList := range hasReadMsgList {
LocalConversation, err := c.db.GetConversation(ctx, conversationID)
if err != nil {
// log.Error("internal", "get conversation err", err.Error(), conversationID)
log.ZWarn(ctx, "get conversation err", err, "conversationID", conversationID)
continue
}
err = utils.JsonStringToStruct(LocalConversation.LatestMsg, &latestMsg)
if err != nil {
// log.Error("internal", "JsonStringToStruct err", err.Error(), conversationID)
log.ZWarn(ctx, "JsonStringToStruct err", err, "conversationID", conversationID)
continue
}
if utils.IsContain(latestMsg.ClientMsgID, msgIDList) {
Expand All @@ -351,15 +340,15 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) {
LocalConversation.LatestMsg = utils.StructToJsonString(latestMsg)
err := c.db.UpdateConversation(ctx, &lc)
if err != nil {
// log.Error("internal", "UpdateConversation database err:", err.Error())
log.ZWarn(ctx, "UpdateConversation err", err)
continue
} else {
result = append(result, LocalConversation)
}
}
}
if result != nil {
// log.Info("internal", "getMultipleConversationModel success :", result)
log.ZDebug(ctx, "getMultipleConversationModel success", "result", result)
c.ConversationListener().OnNewConversation(utils.StructToJsonString(result))
}
case constant.SyncConversation:
Expand Down
51 changes: 27 additions & 24 deletions internal/conversation_msg/read_drawing.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,13 @@ func (c *Conversation) getConversationMaxSeqAndSetHasRead(ctx context.Context, c
if maxSeq == 0 {
return nil
}
if err := c.setConversationHasReadSeq(ctx, conversationID, maxSeq); err != nil {
return err
}
if err := c.db.UpdateColumnsConversation(ctx, conversationID, map[string]interface{}{"has_read_seq": maxSeq}); err != nil {
return err
}
return nil
return c.setConversationHasReadSeq(ctx, conversationID, maxSeq)
}

// mark a conversation's all message as read
func (c *Conversation) markConversationMessageAsRead(ctx context.Context, conversationID string) error {
c.conversationSyncMutex.Lock()
defer c.conversationSyncMutex.Unlock()
conversation, err := c.db.GetConversation(ctx, conversationID)
if err != nil {
return err
Expand Down Expand Up @@ -181,8 +177,6 @@ func (c *Conversation) getAsReadMsgMapAndList(ctx context.Context,
}

func (c *Conversation) unreadChangeTrigger(ctx context.Context, conversationID string, latestMsgIsRead bool) {
c.conversationSyncMutex.Lock()
defer c.conversationSyncMutex.Unlock()
if latestMsgIsRead {
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{ConID: conversationID,
Action: constant.UpdateLatestMessageChange, Args: []string{conversationID}}, Ctx: ctx})
Expand All @@ -196,23 +190,33 @@ func (c *Conversation) unreadChangeTrigger(ctx context.Context, conversationID s
func (c *Conversation) doUnreadCount(ctx context.Context, conversation *model_struct.LocalConversation, hasReadSeq int64, seqs []int64) error {
if conversation.ConversationType == constant.SingleChatType {
if len(seqs) != 0 {
_, err := c.db.MarkConversationMessageAsReadBySeqs(ctx, conversation.ConversationID, seqs)
hasReadMessage, err := c.db.GetMessageBySeq(ctx, conversation.ConversationID, hasReadSeq)
if err != nil {
log.ZWarn(ctx, "MarkConversationMessageAsReadBySeqs err", err, "conversationID", conversation.ConversationID, "seqs", seqs)
return err
}
if hasReadMessage.IsRead {
return errs.New("read info from self can be ignored").Wrap()

} else {
_, err := c.db.MarkConversationMessageAsReadBySeqs(ctx, conversation.ConversationID, seqs)
if err != nil {
return err
}
}

} else {
log.ZWarn(ctx, "seqs is empty", nil, "conversationID", conversation.ConversationID, "hasReadSeq", hasReadSeq)
return errs.New("seqs is empty", "conversationID", conversation.ConversationID, "hasReadSeq", hasReadSeq).Wrap()
return errs.New("seqList is empty", "conversationID", conversation.ConversationID, "hasReadSeq", hasReadSeq).Wrap()
}
if hasReadSeq > conversation.HasReadSeq {
decrUnreadCount := hasReadSeq - conversation.HasReadSeq
if err := c.db.DecrConversationUnreadCount(ctx, conversation.ConversationID, decrUnreadCount); err != nil {
log.ZError(ctx, "DecrConversationUnreadCount err", err, "conversationID", conversation.ConversationID, "decrUnreadCount", decrUnreadCount)
return err
currentMaxSeq := c.maxSeqRecorder.Get(conversation.ConversationID)
if currentMaxSeq == 0 {
return errs.New("currentMaxSeq is 0", "conversationID", conversation.ConversationID).Wrap()
} else {
unreadCount := currentMaxSeq - hasReadSeq
if unreadCount < 0 {
log.ZWarn(ctx, "unread count is less than 0", nil, "conversationID", conversation.ConversationID, "currentMaxSeq", currentMaxSeq, "hasReadSeq", hasReadSeq)
unreadCount = 0
}
if err := c.db.UpdateColumnsConversation(ctx, conversation.ConversationID, map[string]interface{}{"has_read_seq": hasReadSeq}); err != nil {
log.ZError(ctx, "UpdateColumnsConversation err", err, "conversationID", conversation.ConversationID)
if err := c.db.UpdateColumnsConversation(ctx, conversation.ConversationID, map[string]interface{}{"unread_count": unreadCount}); err != nil {
return err
}
}
Expand All @@ -222,9 +226,8 @@ func (c *Conversation) doUnreadCount(ctx context.Context, conversation *model_st
return err
}
if (!latestMsg.IsRead) && datautil.Contain(latestMsg.Seq, seqs...) {
latestMsg.IsRead = true
conversation.LatestMsg = utils.StructToJsonString(&latestMsg)
_ = common.TriggerCmdUpdateConversation(ctx, common.UpdateConNode{ConID: conversation.ConversationID, Action: constant.AddConOrUpLatMsg, Args: *conversation}, c.GetCh())
c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{ConID: conversation.ConversationID,
Action: constant.UpdateLatestMessageChange, Args: []string{conversation.ConversationID}, Caller: "doUnreadCount"}, Ctx: ctx})
}
} else {
if err := c.db.UpdateColumnsConversation(ctx, conversation.ConversationID, map[string]interface{}{"unread_count": 0}); err != nil {
Expand Down Expand Up @@ -293,7 +296,7 @@ func (c *Conversation) doReadDrawing(ctx context.Context, msg *sdkws.MsgData) er
c.msgListener().OnRecvC2CReadReceipt(utils.StructToJsonString(messageReceiptResp))
}
} else {
c.doUnreadCount(ctx, conversation, tips.HasReadSeq, tips.Seqs)
return c.doUnreadCount(ctx, conversation, tips.HasReadSeq, tips.Seqs)
}
return nil
}
5 changes: 2 additions & 3 deletions internal/conversation_msg/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) erro
unreadCount = int32(v.MaxSeq - v.HasReadSeq)
}
if conversation, ok := conversationsOnLocalMap[conversationID]; ok {
if conversation.UnreadCount != unreadCount || conversation.HasReadSeq != v.HasReadSeq {
if err := c.db.UpdateColumnsConversation(ctx, conversationID, map[string]interface{}{"unread_count": unreadCount, "has_read_seq": v.HasReadSeq}); err != nil {
if conversation.UnreadCount != unreadCount {
if err := c.db.UpdateColumnsConversation(ctx, conversationID, map[string]interface{}{"unread_count": unreadCount}); err != nil {
log.ZWarn(ctx, "UpdateColumnsConversation err", err, "conversationID", conversationID)
continue
}
Expand Down Expand Up @@ -108,7 +108,6 @@ func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) erro
unreadCount = int32(v.MaxSeq - v.HasReadSeq)
}
conversation.UnreadCount = unreadCount
conversation.HasReadSeq = v.HasReadSeq
}

stepStartTime = time.Now()
Expand Down
6 changes: 3 additions & 3 deletions internal/interaction/msg_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,14 @@ func (m *MsgSyncer) doPushMsg(ctx context.Context, push *sdkws.PushMessages) {
m.pushTriggerAndSync(ctx, push.NotificationMsgs, m.triggerNotification)
}

func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pullMsgs map[string]*sdkws.PullMsgs, triggerFunc func(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error) {
if len(pullMsgs) == 0 {
func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pushMessages map[string]*sdkws.PullMsgs, triggerFunc func(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error) {
if len(pushMessages) == 0 {
return
}
needSyncSeqMap := make(map[string][2]int64)
var lastSeq int64
var storageMsgs []*sdkws.MsgData
for conversationID, msgs := range pullMsgs {
for conversationID, msgs := range pushMessages {
for _, msg := range msgs.Msgs {
if msg.Seq == 0 {
_ = triggerFunc(ctx, map[string]*sdkws.PullMsgs{conversationID: {Msgs: []*sdkws.MsgData{msg}}})
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/trigger_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ type UpdateConNode struct {
ConID string
Action int //1 Delete the conversation; 2 Update the latest news in the conversation or add a conversation; 3 Put a conversation on the top;
// 4 Cancel a conversation on the top, 5 Messages are not read and set to 0, 6 New conversations
Args interface{}
Args interface{}
Caller string
}
type UpdateMessageNode struct {
Action int
Expand Down
Loading

0 comments on commit 9effc2b

Please sign in to comment.