Commit dbcb0e6d authored by Sébastien Blin's avatar Sébastien Blin

jamiaccount: use p2p socket for all outgoing calls

Note: this patch changes the fallback logic. Because now, it only send a new
request when the previous connection is closed, not after 2 seconds

Change-Id: Ia99e1d6bfbf75fd87bb841f6ee4c9e9c3273e959
parent 5e74c021
......@@ -91,65 +91,40 @@ Call::Call(Account& account,
{
updateDetails(details);
addStateListener([this](Call::CallState call_state,
Call::ConnectionState cnx_state,
UNUSED int code) {
if (cnx_state == ConnectionState::PROGRESSING && startFallback_.exchange(false)
&& not isSubcall()) {
// If the other peer lose the connectivity during the progressing
// this means that the other peer had a connectivity change we didn't
// detect for now.
// In this case, we let two secs before sending a request via the DHT
// just to bypass the CONNECTING status
addStateListener(
[this](Call::CallState call_state, Call::ConnectionState cnx_state, UNUSED int code) {
checkPendingIM();
std::weak_ptr<Call> callWkPtr = shared_from_this();
Manager::instance().scheduler().scheduleIn(
[callWkPtr] {
if (auto callShPtr = callWkPtr.lock()) {
if (callShPtr->getConnectionState() == Call::ConnectionState::PROGRESSING) {
JAMI_WARN("Call %s is still connecting after timeout, sending fallback "
"request",
callShPtr->getCallId().c_str());
if (callShPtr->onNeedFallback_)
callShPtr->onNeedFallback_();
}
}
},
std::chrono::seconds(2));
}
checkPendingIM();
std::weak_ptr<Call> callWkPtr = shared_from_this();
runOnMainThread([callWkPtr] {
if (auto call = callWkPtr.lock())
call->checkAudio();
});
// if call just started ringing, schedule call timeout
if (type_ == CallType::INCOMING and cnx_state == ConnectionState::RINGING) {
auto timeout = Manager::instance().getRingingTimeout();
JAMI_DBG("Scheduling call timeout in %d seconds", timeout);
runOnMainThread([callWkPtr] {
if (auto call = callWkPtr.lock())
call->checkAudio();
});
std::weak_ptr<Call> callWkPtr = shared_from_this();
Manager::instance().scheduler().scheduleIn(
[callWkPtr] {
if (auto callShPtr = callWkPtr.lock()) {
if (callShPtr->getConnectionState() == Call::ConnectionState::RINGING) {
JAMI_DBG(
"Call %s is still ringing after timeout, setting state to BUSY",
callShPtr->getCallId().c_str());
callShPtr->hangup(PJSIP_SC_BUSY_HERE);
Manager::instance().callFailure(*callShPtr);
// if call just started ringing, schedule call timeout
if (type_ == CallType::INCOMING and cnx_state == ConnectionState::RINGING) {
auto timeout = Manager::instance().getRingingTimeout();
JAMI_DBG("Scheduling call timeout in %d seconds", timeout);
std::weak_ptr<Call> callWkPtr = shared_from_this();
Manager::instance().scheduler().scheduleIn(
[callWkPtr] {
if (auto callShPtr = callWkPtr.lock()) {
if (callShPtr->getConnectionState() == Call::ConnectionState::RINGING) {
JAMI_DBG(
"Call %s is still ringing after timeout, setting state to BUSY",
callShPtr->getCallId().c_str());
callShPtr->hangup(PJSIP_SC_BUSY_HERE);
Manager::instance().callFailure(*callShPtr);
}
}
}
},
std::chrono::seconds(timeout));
}
},
std::chrono::seconds(timeout));
}
// kill pending subcalls at disconnect
if (call_state == CallState::OVER)
hangupCalls(safePopSubcalls(), 0);
});
// kill pending subcalls at disconnect
if (call_state == CallState::OVER)
hangupCalls(safePopSubcalls(), 0);
});
time(&timestamp_start_);
account_.attachCall(id_);
......
......@@ -63,7 +63,6 @@ class Call : public Recordable, public std::enable_shared_from_this<Call>
{
public:
using SubcallSet = std::set<std::shared_ptr<Call>, std::owner_less<std::shared_ptr<Call>>>;
using OnNeedFallbackCb = std::function<void()>;
using OnReadyCb = std::function<void(bool)>;
static const char* const DEFAULT_ID;
......@@ -272,8 +271,6 @@ public:
return parent_ != nullptr;
}
void setOnNeedFallback(OnNeedFallbackCb&& cb) { onNeedFallback_ = std::move(cb); }
public: // media management
virtual bool toggleRecording();
......@@ -412,10 +409,6 @@ private:
///< MultiDevice: message received by subcall to merged yet
MsgList pendingInMessages_;
// If the call is blocked during the progressing state
OnNeedFallbackCb onNeedFallback_;
std::atomic_bool startFallback_ {true};
mutable std::mutex confInfoMutex_ {};
mutable ConfInfo confInfo_ {};
};
......
......@@ -508,160 +508,31 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
#endif
dht::InfoHash peer_account(toUri);
auto sendDhtRequest = [this, wCall, toUri, peer_account](const std::string& deviceId) {
auto sendRequest = [this, wCall, toUri](const std::string& deviceId) {
auto call = wCall.lock();
if (not call)
return;
JAMI_DBG("[call %s] calling device %s", call->getCallId().c_str(), deviceId.c_str());
auto& manager = Manager::instance();
auto dev_call = manager.callFactory.newCall<SIPCall, JamiAccount>(*this,
manager.getNewCallID(),
Call::CallType::OUTGOING,
call->getDetails());
auto state = call->getConnectionState();
if (state > Call::ConnectionState::PROGRESSING)
return;
auto callId = dev_call->getCallId();
auto onNegoDone = [callId, w = weak()](bool) {
runOnMainThread([callId, w]() {
if (auto shared = w.lock())
shared->checkPendingCall(callId);
});
};
auto dev_call = Manager::instance().callFactory.newCall<SIPCall, JamiAccount>(
*this, Manager::instance().getNewCallID(), Call::CallType::OUTGOING, call->getDetails());
std::weak_ptr<SIPCall> weak_dev_call = dev_call;
auto iceOptions = getIceOptions();
iceOptions.onNegoDone = onNegoDone;
dev_call->setIPToIP(true);
dev_call->setSecure(isTlsEnabled());
auto ice = createIceTransport(("sip:" + dev_call->getCallId()).c_str(),
ICE_COMPONENTS,
true,
iceOptions);
if (not ice) {
JAMI_WARN("[call %s] Can't create ICE", call->getCallId().c_str());
dev_call->removeCall();
return;
}
iceOptions.tcpEnable = true;
auto ice_tcp = createIceTransport(("sip:" + dev_call->getCallId()).c_str(),
ICE_COMPONENTS,
true,
iceOptions);
if (not ice_tcp) {
JAMI_WARN("Can't create ICE over TCP, will only use UDP");
}
dev_call->setState(Call::ConnectionState::TRYING);
call->addSubCall(*dev_call);
{
std::lock_guard<std::mutex> lk(pendingCallsMutex_);
pendingCalls_[deviceId].emplace_back(dev_call);
}
manager.addTask([w = weak(), weak_dev_call, ice, ice_tcp, deviceId, toUri, peer_account] {
auto sthis = w.lock();
if (not sthis) {
dht::ThreadPool::io().run([ice = std::move(ice), ice_tcp = std::move(ice_tcp)]() {});
return false;
}
auto call = weak_dev_call.lock();
// call aborted?
if (not call) {
dht::ThreadPool::io().run([ice = std::move(ice), ice_tcp = std::move(ice_tcp)]() {});
return false;
}
if (ice->isFailed()) {
JAMI_ERR("[call:%s] ice init failed", call->getCallId().c_str());
call->onFailure(EIO);
dht::ThreadPool::io().run([ice = std::move(ice), ice_tcp = std::move(ice_tcp)]() {});
return false;
}
if (ice_tcp && ice_tcp->isFailed()) {
JAMI_WARN("[call:%s] ice tcp init failed, will only use UDP",
call->getCallId().c_str());
}
// Loop until ICE transport is initialized.
// Note: we suppose that ICE init routine has a an internal timeout (bounded in time)
// and we let upper layers decide when the call shall be aborded (our first check upper).
if ((not ice->isInitialized()) || (ice_tcp && !ice_tcp->isInitialized()))
return true;
sthis->registerDhtAddress(*ice);
if (ice_tcp)
sthis->registerDhtAddress(*ice_tcp);
// Next step: sent the ICE data to peer through DHT
const dht::Value::Id callvid = ValueIdDist()(sthis->rand);
const auto callkey = dht::InfoHash::get("callto:" + deviceId);
auto blob = ice->packIceMsg();
if (ice_tcp) {
auto ice_tcp_msg = ice_tcp->packIceMsg(2);
blob.insert(blob.end(), ice_tcp_msg.begin(), ice_tcp_msg.end());
}
dht::Value val {dht::IceCandidates(callvid, blob)};
dht::InfoHash dev(deviceId);
sthis->dht_->putEncrypted(callkey,
dev,
std::move(val),
[weak_dev_call](bool ok) { // Put complete callback
if (!ok) {
JAMI_WARN("Can't put ICE descriptor on DHT");
if (auto call = weak_dev_call.lock())
call->onFailure();
} else
JAMI_DBG("Successfully put ICE descriptor on DHT");
});
auto listenKey = sthis->dht_->listen<dht::IceCandidates>(
callkey, [weak_dev_call, ice, ice_tcp, callvid, deviceId](dht::IceCandidates&& msg) {
if (msg.id != callvid or msg.from.toString() != deviceId)
return true;
auto call = weak_dev_call.lock();
if (!call)
return false;
// remove unprintable characters
auto iceData = std::string(msg.ice_data.cbegin(), msg.ice_data.cend());
iceData.erase(std::remove_if(iceData.begin(),
iceData.end(),
[](unsigned char c) {
return !std::isprint(c) && !std::isspace(c);
}),
iceData.end());
JAMI_WARN("ICE request for call %s replied from DHT peer %s\nData: %s",
call->getCallId().c_str(),
deviceId.c_str(),
iceData.c_str());
call->setState(Call::ConnectionState::PROGRESSING);
auto udp_failed = true, tcp_failed = true;
initICE(msg.ice_data, ice, ice_tcp, udp_failed, tcp_failed);
if (udp_failed && tcp_failed) {
call->onFailure();
return true;
}
return false;
});
std::lock_guard<std::mutex> lock(sthis->callsMutex_);
sthis->pendingCalls_
.emplace(call->getCallId(),
PendingCall {std::chrono::steady_clock::now(),
std::move(ice),
std::move(ice_tcp),
weak_dev_call,
std::move(listenKey),
callkey,
dev,
peer_account,
tls::CertificateStore::instance().getCertificate(toUri)});
Manager::instance().scheduleTask(
[w, callId = call->getCallId()]() {
if (auto shared = w.lock())
shared->checkPendingCall(callId);
},
std::chrono::steady_clock::now() + ICE_NEGOTIATION_TIMEOUT);
return false;
});
JAMI_WARN("[call %s] No channeled socket with this peer. Send request",
call->getCallId().c_str());
// Else, ask for a channel (for future calls/text messages)
requestSIPConnection(toUri, deviceId);
};
// Call connected devices
......@@ -708,6 +579,23 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
// and avoid to get an active call in a TRYING state.
dev_call->setState(Call::ConnectionState::PROGRESSING);
{
std::lock_guard<std::mutex> lk(onConnectionClosedMtx_);
onConnectionClosed_[deviceConnIt->first] = sendRequest;
}
call->addStateListener(
[w = weak(),
deviceId = deviceConnIt->first](Call::CallState, Call::ConnectionState state, int) {
if (state >= Call::ConnectionState::PROGRESSING) {
if (auto shared = w.lock()) {
JAMI_ERR("@@@ ERASE");
std::lock_guard<std::mutex> lk(shared->onConnectionClosedMtx_);
shared->onConnectionClosed_.erase(deviceId);
}
}
});
auto remoted_address = it.channel->underlyingICE()->getRemoteAddress(ICE_COMP_SIP_TRANSPORT);
try {
onConnectedOutgoingCall(*dev_call, toUri, remoted_address);
......@@ -720,26 +608,16 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
continue;
}
devices.emplace(deviceConnIt->first);
call->setOnNeedFallback(
[sendDhtRequest, deviceId = deviceConnIt->first]() { sendDhtRequest(deviceId); });
}
// Find listening devices for this account
accountManager_->forEachDevice(
peer_account,
[this, toUri, devices, sendDhtRequest, callId = call->getCallId()](
const dht::InfoHash& dev) {
[this, toUri, devices, call, sendRequest](const dht::InfoHash& dev) {
// Test if already sent via a SIP transport
if (devices.find(dev.toString()) != devices.end())
return;
JAMI_WARN("[call %s] No channeled socket with this peer. Send request + DHT request",
callId.c_str());
// Else, ask for a channel (for future calls/text messages) and send a DHT message
requestSIPConnection(toUri, dev.toString());
sendDhtRequest(dev.toString());
sendRequest(dev.toString());
},
[wCall, dummyCall](bool ok) {
// Mark the temp call as failed to stop the main call if necessary
......@@ -1618,8 +1496,8 @@ JamiAccount::checkPendingCall(const std::string& callId)
// Note only one check at a time. In fact, the UDP and TCP negotiation
// can finish at the same time and we need to avoid potential race conditions.
std::lock_guard<std::mutex> lk(callsMutex_);
auto it = pendingCalls_.find(callId);
if (it == pendingCalls_.end())
auto it = pendingCallsDht_.find(callId);
if (it == pendingCallsDht_.end())
return;
bool incoming = !it->second.call_key;
......@@ -1636,7 +1514,7 @@ JamiAccount::checkPendingCall(const std::string& callId)
// Cancel pending listen (outgoing call)
dht_->cancelListen(it->second.call_key, std::move(it->second.listen_key));
}
pendingCalls_.erase(it);
pendingCallsDht_.erase(it);
}
}
......@@ -2535,16 +2413,16 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call,
// Let the call handled by the PendingCall handler loop
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.emplace(call->getCallId(),
PendingCall {/*.start = */ started_time,
/*.ice_sp = */ udp_failed ? nullptr : ice,
/*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp,
/*.call = */ wcall,
/*.listen_key = */ {},
/*.call_key = */ {},
/*.from = */ peer_ice_msg.from,
/*.from_account = */ from_id,
/*.from_cert = */ from_cert});
pendingCallsDht_.emplace(call->getCallId(),
PendingCall {/*.start = */ started_time,
/*.ice_sp = */ udp_failed ? nullptr : ice,
/*.ice_tcp_sp = */ tcp_failed ? nullptr : ice_tcp,
/*.call = */ wcall,
/*.listen_key = */ {},
/*.call_key = */ {},
/*.from = */ peer_ice_msg.from,
/*.from_account = */ from_id,
/*.from_cert = */ from_cert});
Manager::instance().scheduleTask(
[w = weak(), callId = call->getCallId()]() {
......@@ -2573,10 +2451,15 @@ JamiAccount::doUnregister(std::function<void(bool)> released_cb)
{
std::lock_guard<std::mutex> lock(callsMutex_);
pendingCalls_.clear();
pendingCallsDht_.clear();
pendingSipCalls_.clear();
}
{
std::lock_guard<std::mutex> lk(pendingCallsMutex_);
pendingCalls_.clear();
}
dht_->join();
if (upnp_)
......@@ -3793,19 +3676,36 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket,
auto shared = w.lock();
if (!shared)
return;
std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_);
auto& connections = shared->sipConnections_[peerId][deviceId];
auto conn = connections.begin();
while (conn != connections.end()) {
if (conn->channel == socket)
conn = connections.erase(conn);
else
conn++;
{
std::lock_guard<std::mutex> lk(shared->sipConnectionsMtx_);
auto& connections = shared->sipConnections_[peerId][deviceId];
auto conn = connections.begin();
while (conn != connections.end()) {
if (conn->channel == socket)
conn = connections.erase(conn);
else
conn++;
}
}
// The connection can be closed during the SIP initialization, so
// if this happens, the request should be re-sent to ask for a new
// SIP channel to make the call pass through
std::function<void(const std::string&)> cb;
{
std::lock_guard<std::mutex> lk(shared->onConnectionClosedMtx_);
if (shared->onConnectionClosed_[deviceId]) {
cb = std::move(shared->onConnectionClosed_[deviceId]);
shared->onConnectionClosed_.erase(deviceId);
}
}
if (cb) {
JAMI_WARN("An outgoing call was in progress while shutdown, relaunch the request");
cb(deviceId);
}
};
auto sip_tr = link_.sipTransportBroker->getChanneledTransport(socket, std::move(onShutdown));
// Store the connection
sipConnections_[peerId][deviceId].emplace_back(SipConnection {std::move(sip_tr), socket});
sipConnections_[peerId][deviceId].emplace_back(SipConnection {sip_tr, socket});
JAMI_WARN("New SIP channel opened with %s", deviceId.c_str());
lk.unlock();
......@@ -3813,6 +3713,30 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket,
// Retry messages
messageEngine_.onPeerOnline(peerId);
// Connect pending calls
std::vector<std::shared_ptr<SIPCall>> pc;
{
std::lock_guard<std::mutex> lk(pendingCallsMutex_);
pc = std::move(pendingCalls_[deviceId]);
}
for (auto& pendingCall : pc) {
pendingCall->setTransport(sip_tr);
pendingCall->setState(Call::ConnectionState::PROGRESSING);
if (auto ice = socket->underlyingICE()) {
auto remoted_address = ice->getRemoteAddress(ICE_COMP_SIP_TRANSPORT);
try {
onConnectedOutgoingCall(*pendingCall, peerId, remoted_address);
} catch (const VoipLinkException&) {
// In this case, the main scenario is that SIPStartCall failed because
// the ICE is dead and the TLS session didn't send any packet on that dead
// link (connectivity change, killed by the os, etc)
// Here, we don't need to do anything, the TLS will fail and will delete
// the cached transport
continue;
}
}
}
}
} // namespace jami
......@@ -602,7 +602,7 @@ private:
/**
* DHT calls waiting for ICE negotiation
*/
std::map<std::string, PendingCall> pendingCalls_;
std::map<std::string, PendingCall> pendingCallsDht_;
/**
* Incoming DHT calls that are not yet actual SIP calls.
......@@ -729,6 +729,12 @@ private:
std::set<std::pair<std::string /* accountId */, std::string /* deviceId */>>
pendingSipConnections_ {};
std::mutex pendingCallsMutex_;
std::map<std::string, std::vector<std::shared_ptr<SIPCall>>> pendingCalls_;
std::mutex onConnectionClosedMtx_ {};
std::map<std::string, std::function<void(const std::string&)>> onConnectionClosed_ {};
/**
* Ask a device to open a channeled SIP socket
* @param peerId The contact who owns the device
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment