Commit 11585ed2 authored by Philippe Gorley's avatar Philippe Gorley Committed by Philippe Gorley

recorder: refactor pipeline

Moves the recorder up one level to the VideoInput, VideoReceiveThread
and AudioReceiveThread, instead of the MediaDecoder (there's no
equivalent to the VideoInput in the audio layer).

Emits the RecordPlaybackStopped when the recording is stopped, so the
client can sync its recording state with the daemon, in case the daemon
stops recording by itself (rather than user intervention).

Change-Id: I743b080cb354273ec074fec51caf2a4328fc1c58
parent e2de2c2d
......@@ -233,6 +233,9 @@ class AudioReceiveThread
void openDecoder();
bool decodeFrame();
std::weak_ptr<MediaRecorder> recorder_;
bool recordingStarted_{false};
/*-----------------------------------------------------------------*/
/* These variables should be used in thread (i.e. process()) only! */
/*-----------------------------------------------------------------*/
......@@ -273,6 +276,8 @@ AudioReceiveThread::AudioReceiveThread(const std::string& id,
AudioReceiveThread::~AudioReceiveThread()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
loop_.join();
}
......@@ -309,6 +314,20 @@ AudioReceiveThread::process()
switch (audioDecoder_->decode(decodedFrame)) {
case MediaDecoder::Status::FrameFinished:
if (auto rec = recorder_.lock()) {
if (!recordingStarted_) {
if (rec->addStream(false, true, audioDecoder_->getStream()) >= 0) {
recordingStarted_ = true;
} else {
recorder_ = std::weak_ptr<MediaRecorder>();
}
}
if (recordingStarted_)
rec->recordData(decodedFrame.pointer(), false, true);
} else {
recordingStarted_ = false;
recorder_ = std::weak_ptr<MediaRecorder>();
}
audioDecoder_->writeToRingBuffer(decodedFrame, *ringbuffer_,
mainBuffFormat);
// Refresh the remote audio codec in the callback SmartInfo
......@@ -384,8 +403,8 @@ AudioReceiveThread::startLoop()
void
AudioReceiveThread::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{
if (audioDecoder_)
audioDecoder_->initRecorder(rec);
rec->incrementStreams(1);
recorder_ = rec;
}
AudioRtpSession::AudioRtpSession(const std::string& id)
......
......@@ -28,7 +28,6 @@
#include "audio/resampler.h"
#include "decoder_finder.h"
#include "manager.h"
#include "media_recorder.h"
#ifdef RING_ACCEL
#include "video/accel.h"
......@@ -60,8 +59,6 @@ MediaDecoder::MediaDecoder() :
MediaDecoder::~MediaDecoder()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
#ifdef RING_ACCEL
if (decoderCtx_ && decoderCtx_->hw_device_ctx)
av_buffer_unref(&decoderCtx_->hw_device_ctx);
......@@ -293,20 +290,7 @@ MediaDecoder::decode(VideoFrame& result)
frame->pts = av_rescale_q_rnd(av_gettime() - startTime_,
{1, AV_TIME_BASE}, decoderCtx_->time_base,
static_cast<AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
if (auto rec = recorder_.lock()) {
bool fromPeer = (inputCtx_->iformat->name == std::string("sdp"));
if (!recordingStarted_) {
auto ms = MediaStream("", decoderCtx_, frame->pts);
ms.format = frame->format; // might not match avStream_ if accel is used
if (rec->addStream(true, fromPeer, ms) >= 0)
recordingStarted_ = true;
else
recorder_ = std::weak_ptr<MediaRecorder>();
}
if (recordingStarted_)
rec->recordData(frame, true, fromPeer);
}
lastTimestamp_ = frame->pts;
if (emulateRate_ and packetTimestamp != AV_NOPTS_VALUE) {
auto frame_time = getTimeBase()*(packetTimestamp - avStream_->start_time);
......@@ -365,18 +349,7 @@ MediaDecoder::decode(const AudioFrame& decodedFrame)
// NOTE don't use clock to rescale audio pts, it may create artifacts
frame->pts = av_rescale_q_rnd(frame->pts, avStream_->time_base, decoderCtx_->time_base,
static_cast<AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
if (auto rec = recorder_.lock()) {
if (!recordingStarted_) {
auto ms = MediaStream("", decoderCtx_, frame->pts);
if (rec->addStream(false, true, ms) >= 0)
recordingStarted_ = true;
else
recorder_ = std::weak_ptr<MediaRecorder>();
}
if (recordingStarted_)
rec->recordData(frame, false, true);
}
lastTimestamp_ = frame->pts;
if (emulateRate_ and packetTimestamp != AV_NOPTS_VALUE) {
auto frame_time = getTimeBase()*(packetTimestamp - avStream_->start_time);
......@@ -525,15 +498,15 @@ MediaDecoder::correctPixFmt(int input_pix_fmt) {
return pix_fmt;
}
void
MediaDecoder::initRecorder(std::shared_ptr<MediaRecorder>& rec)
MediaStream
MediaDecoder::getStream() const
{
// recording will start once we can send an AVPacket to the recorder
recordingStarted_ = false;
recorder_ = rec;
if (auto r = recorder_.lock()) {
r->incrementStreams(1);
}
auto ms = MediaStream("", decoderCtx_, lastTimestamp_);
#ifdef RING_ACCEL
if (decoderCtx_->codec_type == AVMEDIA_TYPE_VIDEO && enableAccel_ && !accel_.name.empty())
ms.format = AV_PIX_FMT_NV12; // TODO option me!
#endif
return ms;
}
} // namespace ring
......@@ -34,6 +34,7 @@
#include "audio/audiobuffer.h"
#include "media_stream.h"
#include "rational.h"
#include "noncopyable.h"
......@@ -57,7 +58,6 @@ class RingBuffer;
class Resampler;
class MediaIOHandle;
struct DeviceParams;
class MediaRecorder;
class MediaDecoder {
public:
......@@ -100,7 +100,7 @@ class MediaDecoder {
void enableAccel(bool enableAccel);
#endif
void initRecorder(std::shared_ptr<MediaRecorder>& rec);
MediaStream getStream() const;
private:
NON_COPYABLE(MediaDecoder);
......@@ -115,6 +115,7 @@ class MediaDecoder {
int streamIndex_ = -1;
bool emulateRate_ = false;
int64_t startTime_;
int64_t lastTimestamp_;
AudioBuffer decBuff_;
AudioBuffer resamplingBuff_;
......@@ -131,9 +132,6 @@ class MediaDecoder {
unsigned short accelFailures_ = 0;
#endif
std::weak_ptr<MediaRecorder> recorder_;
bool recordingStarted_ = false;
protected:
AVDictionary *options_ = nullptr;
};
......
......@@ -446,13 +446,17 @@ int MediaEncoder::encode_audio(const AudioBuffer &buffer)
if (auto rec = recorder_.lock()) {
if (!recordingStarted_) {
auto ms = MediaStream("", encoders_[currentStreamIdx_], frame->pts);
if (rec->addStream(false, false, ms) >= 0)
if (rec->addStream(false, false, ms) >= 0) {
recordingStarted_ = true;
else
} else {
recorder_ = std::weak_ptr<MediaRecorder>();
}
}
if (recordingStarted_)
rec->recordData(frame, false, false);
} else {
recordingStarted_ = false;
recorder_ = std::weak_ptr<MediaRecorder>();
}
encode(frame, currentStreamIdx_);
......
......@@ -19,6 +19,7 @@
*/
#include "libav_deps.h" // MUST BE INCLUDED FIRST
#include "client/ring_signal.h"
#include "fileutils.h"
#include "logger.h"
#include "media_io_handle.h"
......@@ -179,6 +180,7 @@ MediaRecorder::stopRecording()
isRecording_ = false;
loop_.join();
flush();
emitSignal<DRing::CallSignal::RecordPlaybackStopped>(getPath());
}
resetToDefaults();
}
......@@ -220,7 +222,7 @@ MediaRecorder::recordData(AVFrame* frame, bool isVideo, bool fromPeer)
return 0;
// save a copy of the frame, will be filtered/encoded in another thread
const MediaStream& ms = streams_[isVideo][fromPeer];
MediaStream& ms = streams_[isVideo][fromPeer];
AVFrame* input = av_frame_clone(frame);
input->pts = input->pts - ms.firstTimestamp; // stream has to start at 0
......
......@@ -63,6 +63,8 @@ VideoInput::VideoInput()
VideoInput::~VideoInput()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
#if defined(__ANDROID__) || defined(RING_UWP) || (defined(TARGET_OS_IOS) && TARGET_OS_IOS)
/* we need to stop the loop and notify the condition variable
* to unblock the process loop */
......@@ -206,7 +208,8 @@ bool VideoInput::captureFrame()
if (not decoder_)
return false;
const auto ret = decoder_->decode(getNewFrame());
auto& frame = getNewFrame();
const auto ret = decoder_->decode(frame);
switch (ret) {
case MediaDecoder::Status::ReadError:
return false;
......@@ -229,6 +232,20 @@ bool VideoInput::captureFrame()
return static_cast<bool>(decoder_);
case MediaDecoder::Status::FrameFinished:
if (auto rec = recorder_.lock()) {
if (!recordingStarted_) {
if (rec->addStream(true, false, decoder_->getStream()) >= 0) {
recordingStarted_ = true;
} else {
recorder_ = std::weak_ptr<MediaRecorder>();
}
}
if (recordingStarted_)
rec->recordData(frame.pointer(), true, false);
} else {
recordingStarted_ = false;
recorder_ = std::weak_ptr<MediaRecorder>();
}
publishFrame();
return true;
// continue decoding
......@@ -590,8 +607,8 @@ VideoInput::foundDecOpts(const DeviceParams& params)
void
VideoInput::initRecorder(std::shared_ptr<MediaRecorder>& rec)
{
if (decoder_)
decoder_->initRecorder(rec);
rec->incrementStreams(1);
recorder_ = rec;
}
}} // namespace ring::video
......@@ -143,6 +143,9 @@ private:
void releaseBufferCb(uint8_t* ptr);
std::array<struct VideoFrameBuffer, 8> buffers_;
#endif
std::weak_ptr<MediaRecorder> recorder_;
bool recordingStarted_{false};
};
}} // namespace ring::video
......
......@@ -56,6 +56,8 @@ VideoReceiveThread::VideoReceiveThread(const std::string& id,
VideoReceiveThread::~VideoReceiveThread()
{
if (auto rec = recorder_.lock())
rec->stopRecording();
loop_.join();
}
......@@ -166,10 +168,25 @@ void VideoReceiveThread::addIOContext(SocketPair& socketPair)
bool VideoReceiveThread::decodeFrame()
{
const auto ret = videoDecoder_->decode(getNewFrame());
auto& frame = getNewFrame();
const auto ret = videoDecoder_->decode(frame);
switch (ret) {
case MediaDecoder::Status::FrameFinished:
if (auto rec = recorder_.lock()) {
if (!recordingStarted_) {
if (rec->addStream(true, true, videoDecoder_->getStream()) >= 0) {
recordingStarted_ = true;
} else {
recorder_ = std::weak_ptr<MediaRecorder>();
}
}
if (recordingStarted_)
rec->recordData(frame.pointer(), true, true);
} else {
recordingStarted_ = false;
recorder_ = std::weak_ptr<MediaRecorder>();
}
publishFrame();
return true;
......@@ -241,8 +258,8 @@ VideoReceiveThread::triggerKeyFrameRequest()
void
VideoReceiveThread::initRecorder(std::shared_ptr<ring::MediaRecorder>& rec)
{
if (videoDecoder_)
videoDecoder_->initRecorder(rec);
rec->incrementStreams(1);
recorder_ = rec;
}
}} // namespace ring::video
......@@ -89,6 +89,9 @@ private:
static int interruptCb(void *ctx);
static int readFunction(void *opaque, uint8_t *buf, int buf_size);
std::weak_ptr<MediaRecorder> recorder_;
bool recordingStarted_{false};
ThreadLoop loop_;
// used by ThreadLoop
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment