Commit e26b621b authored by Andreas Traczyk's avatar Andreas Traczyk Committed by Kateryna Kostiuk

conversations: fix race condition in message status update

Change-Id: I7c7393a1a85ede1a78c0a5311f9edd3f023274cd
Reviewed-by: Kateryna Kostiuk's avatarKateryna Kostiuk <kateryna.kostiuk@savoirfairelinux.com>
parent 2e779ee3
......@@ -126,9 +126,12 @@ class ConversationsManager: MessagesAdapterDelegate {
func messageStatusChanged(_ status: MessageStatus, for messageId: UInt64, from accountId: String,
to uri: String) {
guard let account = self.accountsService.getAccount(fromAccountId: accountId) else {
return
}
self.conversationService.messageStatusChanged(status,
for: messageId,
from: accountId,
fromAccount: account,
to: uri)
}
}
......@@ -37,6 +37,8 @@ class ConversationsService {
var conversations = Variable([ConversationModel]())
var messagesSemaphore = DispatchSemaphore(value: 1)
lazy var conversationsForCurrentAccount: Observable<[ConversationModel]> = {
return self.conversations.asObservable()
}()
......@@ -146,6 +148,7 @@ class ConversationsService {
shouldRefreshConversations: Bool) -> Completable {
return Completable.create(subscribe: { [unowned self] completable in
self.messagesSemaphore.wait()
self.dbManager.saveMessage(for: toAccountUri,
with: recipientRingId,
message: message,
......@@ -153,6 +156,12 @@ class ConversationsService {
interactionType: InteractionType.text)
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onCompleted: { [weak self] in
// append new message so it can be found if a status update is received before the DB finishes reload
self?.conversations.value.filter({ conversation in
return conversation.recipientRingId == recipientRingId &&
conversation.accountId == toAccountId
}).first?.messages.append(message)
self?.messagesSemaphore.signal()
if shouldRefreshConversations {
self?.dbManager.getConversationsObservable(for: toAccountId, accountURI: toAccountUri)
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
......@@ -163,10 +172,11 @@ class ConversationsService {
}
completable(.completed)
}, onError: { error in
self.messagesSemaphore.signal()
completable(.error(error))
}).disposed(by: self.disposeBag)
return Disposables.create { }
return Disposables.create { }
})
}
......@@ -260,38 +270,42 @@ class ConversationsService {
func messageStatusChanged(_ status: MessageStatus,
for messageId: UInt64,
from accountId: String,
fromAccount account: AccountModel,
to uri: String) {
self.messagesSemaphore.wait()
//Get conversations for this sender
let conversation = self.conversations.value.filter({ conversation in
return conversation.recipientRingId == uri &&
conversation.accountId == accountId
conversation.accountId == account.id
}).first
//Find message
if let messages: [MessageModel] = conversation?.messages.filter({ (messages) -> Bool in
return !messages.daemonId.isEmpty && messages.daemonId == String(messageId) &&
((status.rawValue > messages.status.rawValue && status != .failure) ||
(status == .failure && messages.status == .sending))
if let messages: [MessageModel] = conversation?.messages.filter({ (message) -> Bool in
return !message.daemonId.isEmpty && message.daemonId == String(messageId) &&
((status.rawValue > message.status.rawValue && status != .failure) ||
(status == .failure && message.status == .sending))
}) {
if let message = messages.first {
self.dbManager
.updateMessageStatus(daemonID: message.daemonId, withStatus: status)
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onCompleted: { [unowned self] in
self.log.info("Message status updated")
self.messagesSemaphore.signal()
self.log.info("messageStatusChanged: Message status updated")
var event = ServiceEvent(withEventType: .messageStateChanged)
event.addEventInput(.messageStatus, value: status)
event.addEventInput(.messageId, value: String(messageId))
event.addEventInput(.id, value: accountId)
event.addEventInput(.id, value: account.id)
event.addEventInput(.uri, value: uri)
self.responseStream.onNext(event)
})
.disposed(by: disposeBag)
.disposed(by: self.disposeBag)
} else {
self.log.warning("messageStatusChanged: Message not found")
self.messagesSemaphore.signal()
}
}
log.debug("messageStatusChanged: \(status.rawValue) for: \(messageId) from: \(accountId) to: \(uri)")
log.debug("messageStatusChanged: \(status.rawValue) for: \(messageId) from: \(account.id) to: \(uri)")
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment