Commit c932315a authored by Adrien Béraud's avatar Adrien Béraud

connectionmanager: do not call callback while locked

Change-Id: I605b6ccccb83ee482a34640ba71cb9c0d548502f
parent 7ae5cbf5
......@@ -183,7 +183,20 @@ public:
onICERequestCallback iceReqCb_ {};
std::mutex connectCbsMtx_ {};
std::map<std::pair<std::string, dht::Value::Id>, ConnectCallback> pendingCbs_ {};
using CallbackId = std::pair<std::string, dht::Value::Id>;
std::map<CallbackId, ConnectCallback> pendingCbs_ {};
ConnectCallback getPendingCallback(const CallbackId& cbId)
{
ConnectCallback ret;
std::lock_guard<std::mutex> lk(connectCbsMtx_);
auto cbIt = pendingCbs_.find(cbId);
if (cbIt != pendingCbs_.end()) {
ret = std::move(cbIt->second);
pendingCbs_.erase(cbIt);
}
return ret;
}
std::shared_ptr<ConnectionManager::Impl> shared()
{
......@@ -214,19 +227,13 @@ ConnectionManager::Impl::connectDeviceStartIce(const std::string& deviceId,
return;
}
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
std::unique_lock<std::mutex> lk(info->mutex_);
auto& ice = info->ice_;
auto onError = [&]() {
ice.reset();
std::lock_guard<std::mutex> lk(connectCbsMtx_);
auto cbIt = pendingCbs_.find(cbId);
if (cbIt != pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
pendingCbs_.erase(cbIt);
}
if (auto cb = getPendingCallback({deviceId, vid}))
cb(nullptr);
};
if (!ice) {
......@@ -295,30 +302,17 @@ ConnectionManager::Impl::connectDeviceOnNegoDone(
if (!info)
return;
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
std::unique_lock<std::mutex> lk {info->mutex_};
auto& ice = info->ice_;
auto onError = [&]() {
std::lock_guard<std::mutex> lk(connectCbsMtx_);
auto cbIt = pendingCbs_.find(cbId);
if (cbIt != pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
pendingCbs_.erase(cbIt);
}
};
if (!ice || !ice->isRunning()) {
JAMI_ERR("No ICE detected or not running");
onError();
if (auto cb = getPendingCallback({deviceId, vid}))
cb(nullptr);
return;
}
// Build socket
auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
std::move(ice)),
true);
auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(std::move(ice)), true);
// Negotiate a TLS session
JAMI_DBG() << account << "Start TLS session";
......@@ -338,14 +332,8 @@ ConnectionManager::Impl::connectDeviceOnNegoDone(
return;
if (!ok) {
JAMI_ERR() << "TLS connection failure for peer " << deviceId;
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
if (auto cb = sthis->getPendingCallback({deviceId, vid}))
cb(nullptr);
} else {
// The socket is ready, store it
sthis->addNewMultiplexedSocket(deviceId, vid);
......@@ -433,22 +421,15 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
return;
if (!ok) {
JAMI_ERR("Cannot initialize ICE session.");
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
if (auto cb = sthis->getPendingCallback(cbId))
cb(nullptr);
return;
}
dht::ThreadPool::io().run(
[w = std::move(w), deviceId = std::move(deviceId), vid = std::move(vid)] {
auto sthis = w.lock();
if (!sthis)
return;
sthis->connectDeviceStartIce(deviceId, vid);
if (auto sthis = w.lock())
sthis->connectDeviceStartIce(deviceId, vid);
});
};
ice_config.onNegoDone = [w,
......@@ -462,13 +443,8 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
return;
if (!ok) {
JAMI_ERR("ICE negotiation failed.");
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
if (auto cb = sthis->getPendingCallback(cbId))
cb(nullptr);
return;
}
......@@ -498,13 +474,8 @@ ConnectionManager::Impl::connectDevice(const std::string& deviceId,
if (!info->ice_) {
JAMI_ERR("Cannot initialize ICE session.");
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
auto cbIt = sthis->pendingCbs_.find(cbId);
if (cbIt != sthis->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(nullptr);
sthis->pendingCbs_.erase(cbIt);
}
if (auto cb = sthis->getPendingCallback(cbId))
cb(nullptr);
return;
}
});
......@@ -526,16 +497,9 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>&
msgpack::pack(ss, val);
auto toSend = ss.str();
sock->setOnChannelReady(channelSock->channel(), [channelSock, deviceId, vid, w = weak()]() {
auto shared = w.lock();
if (!shared)
return;
std::lock_guard<std::mutex> lk(shared->connectCbsMtx_);
std::pair<std::string, dht::Value::Id> cbId(deviceId, vid);
auto cbIt = shared->pendingCbs_.find(cbId);
if (cbIt != shared->pendingCbs_.end()) {
if (cbIt->second)
cbIt->second(channelSock);
shared->pendingCbs_.erase(cbIt);
if (auto shared = w.lock()) {
if (auto cb = shared->getPendingCallback({deviceId, vid}))
cb(channelSock);
}
});
std::error_code ec;
......@@ -842,25 +806,13 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId,
if (!sthis)
return;
// Cancel current outgoing connections
{
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
if (!sthis->pendingCbs_.empty()) {
auto it = sthis->pendingCbs_.begin();
while (it != sthis->pendingCbs_.end()) {
if (it->first.first == deviceId && it->first.second == vid) {
it->second(nullptr);
it = sthis->pendingCbs_.erase(it);
} else {
++it;
}
}
}
}
dht::ThreadPool::io().run([w, deviceId, vid] {
if (auto cb = sthis->getPendingCallback({deviceId, vid}))
cb(nullptr);
dht::ThreadPool::io().run([w, deviceId = dht::InfoHash(deviceId), vid] {
auto sthis = w.lock();
if (!sthis)
return;
auto info = sthis->getInfo(dht::InfoHash(deviceId), vid);
auto info = sthis->getInfo(deviceId, vid);
if (!info)
return;
......@@ -871,7 +823,7 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::string& deviceId,
info->ice_->cancelOperations();
std::lock_guard<std::mutex> lk(sthis->infosMtx_);
sthis->infos_.erase({dht::InfoHash(deviceId), vid});
sthis->infos_.erase({deviceId, vid});
});
});
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment