Skip to content

Commit

Permalink
Open recording file from the recorder thread
Browse files Browse the repository at this point in the history
The recorder opened the target file from the packet sink open()
callback, called by the demuxer. Only then the recorder thread was
started.

One golden rule for the recorder is to never block the demuxer for I/O,
because it would impact mirroring. This rule is respected on recording
packets, but not for the initial recorder opening.

Therefore, start the recorder thread from sc_recorder_init(), open the
file immediately from the recorder thread, then make it wait for the
stream to start (on packet sink open()).

Now that the recorder can report errors directly (rather than making the
demuxer call fail), it is possible to report file opening error even
before the packet sink is open.
  • Loading branch information
rom1v committed Mar 10, 2023
1 parent 6b5dfef commit a039124
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 91 deletions.
250 changes: 161 additions & 89 deletions app/src/recorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,76 @@ sc_recorder_write(struct sc_recorder *recorder, AVPacket *packet) {
return av_write_frame(recorder->ctx, packet) >= 0;
}

static int
run_recorder(void *data) {
struct sc_recorder *recorder = data;
static bool
sc_recorder_open_output_file(struct sc_recorder *recorder) {
const char *format_name = sc_recorder_get_format_name(recorder->format);
assert(format_name);
const AVOutputFormat *format = find_muxer(format_name);
if (!format) {
LOGE("Could not find muxer");
return false;
}

recorder->ctx = avformat_alloc_context();
if (!recorder->ctx) {
LOG_OOM();
return false;
}

int ret = avio_open(&recorder->ctx->pb, recorder->filename,
AVIO_FLAG_WRITE);
if (ret < 0) {
LOGE("Failed to open output file: %s", recorder->filename);
avformat_free_context(recorder->ctx);
return false;
}

// contrary to the deprecated API (av_oformat_next()), av_muxer_iterate()
// returns (on purpose) a pointer-to-const, but AVFormatContext.oformat
// still expects a pointer-to-non-const (it has not be updated accordingly)
// <https://github.com/FFmpeg/FFmpeg/commit/0694d8702421e7aff1340038559c438b61bb30dd>
recorder->ctx->oformat = (AVOutputFormat *) format;

av_dict_set(&recorder->ctx->metadata, "comment",
"Recorded by scrcpy " SCRCPY_VERSION, 0);

LOGI("Recording started to %s file: %s", format_name, recorder->filename);
return true;
}

static void
sc_recorder_close_output_file(struct sc_recorder *recorder) {
avio_close(recorder->ctx->pb);
avformat_free_context(recorder->ctx);
}

static bool
sc_recorder_wait_video_stream(struct sc_recorder *recorder) {
sc_mutex_lock(&recorder->mutex);
while (!recorder->codec && !recorder->stopped) {
sc_cond_wait(&recorder->stream_cond, &recorder->mutex);
}
const AVCodec *codec = recorder->codec;
sc_mutex_unlock(&recorder->mutex);

if (codec) {
AVStream *ostream = avformat_new_stream(recorder->ctx, codec);
if (!ostream) {
return false;
}

ostream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
ostream->codecpar->codec_id = codec->id;
ostream->codecpar->format = AV_PIX_FMT_YUV420P;
ostream->codecpar->width = recorder->declared_frame_size.width;
ostream->codecpar->height = recorder->declared_frame_size.height;
}

return true;
}

static bool
sc_recorder_process_packets(struct sc_recorder *recorder) {
int64_t pts_origin = AV_NOPTS_VALUE;

// We can write a packet only once we received the next one so that we can
Expand Down Expand Up @@ -206,42 +272,70 @@ run_recorder(void *data) {
if (!ok) {
LOGE("Could not record packet");

sc_mutex_lock(&recorder->mutex);
recorder->failed = true;
// discard pending packets
sc_recorder_queue_clear(&recorder->queue);
sc_mutex_unlock(&recorder->mutex);
break;
return false;
}

previous = rec;
}

if (!recorder->failed) {
if (recorder->header_written) {
int ret = av_write_trailer(recorder->ctx);
if (ret < 0) {
LOGE("Failed to write trailer to %s", recorder->filename);
recorder->failed = true;
}
} else {
// the recorded file is empty
recorder->failed = true;
}
if (!recorder->header_written) {
// the recorded file is empty
return false;
}

if (recorder->failed) {
LOGE("Recording failed to %s", recorder->filename);
} else {
int ret = av_write_trailer(recorder->ctx);
if (ret < 0) {
LOGE("Failed to write trailer to %s", recorder->filename);
return false;
}

return true;
}

static bool
sc_recorder_record(struct sc_recorder *recorder) {
bool ok = sc_recorder_open_output_file(recorder);
if (!ok) {
return false;
}

ok = sc_recorder_wait_video_stream(recorder);
if (!ok) {
sc_recorder_close_output_file(recorder);
return false;
}

// If recorder->stopped, process any queued packet anyway

ok = sc_recorder_process_packets(recorder);
sc_recorder_close_output_file(recorder);
return ok;
}

static int
run_recorder(void *data) {
struct sc_recorder *recorder = data;

bool success = sc_recorder_record(recorder);

sc_mutex_lock(&recorder->mutex);
// Prevent the producer to push any new packet
recorder->stopped = true;
// Discard pending packets
sc_recorder_queue_clear(&recorder->queue);
sc_mutex_unlock(&recorder->mutex);

if (success) {
const char *format_name = sc_recorder_get_format_name(recorder->format);
LOGI("Recording complete to %s file: %s", format_name,
recorder->filename);
} else {
LOGE("Recording failed to %s", recorder->filename);
}

LOGD("Recorder thread ended");

recorder->cbs->on_ended(recorder, !recorder->failed,
recorder->cbs_userdata);
recorder->cbs->on_ended(recorder, success, recorder->cbs_userdata);

return 0;
}
Expand All @@ -252,81 +346,28 @@ sc_recorder_packet_sink_open(struct sc_packet_sink *sink,
struct sc_recorder *recorder = DOWNCAST(sink);
assert(codec);

const char *format_name = sc_recorder_get_format_name(recorder->format);
assert(format_name);
const AVOutputFormat *format = find_muxer(format_name);
if (!format) {
LOGE("Could not find muxer");
return false;
}

recorder->ctx = avformat_alloc_context();
if (!recorder->ctx) {
LOG_OOM();
sc_mutex_lock(&recorder->mutex);
if (recorder->stopped) {
sc_mutex_unlock(&recorder->mutex);
return false;
}

// contrary to the deprecated API (av_oformat_next()), av_muxer_iterate()
// returns (on purpose) a pointer-to-const, but AVFormatContext.oformat
// still expects a pointer-to-non-const (it has not be updated accordingly)
// <https://github.com/FFmpeg/FFmpeg/commit/0694d8702421e7aff1340038559c438b61bb30dd>
recorder->ctx->oformat = (AVOutputFormat *) format;

av_dict_set(&recorder->ctx->metadata, "comment",
"Recorded by scrcpy " SCRCPY_VERSION, 0);

AVStream *ostream = avformat_new_stream(recorder->ctx, codec);
if (!ostream) {
goto error_avformat_free_context;
}

ostream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
ostream->codecpar->codec_id = codec->id;
ostream->codecpar->format = AV_PIX_FMT_YUV420P;
ostream->codecpar->width = recorder->declared_frame_size.width;
ostream->codecpar->height = recorder->declared_frame_size.height;

int ret = avio_open(&recorder->ctx->pb, recorder->filename,
AVIO_FLAG_WRITE);
if (ret < 0) {
LOGE("Failed to open output file: %s", recorder->filename);
// ostream will be cleaned up during context cleaning
goto error_avformat_free_context;
}

LOGD("Starting recorder thread");
bool ok = sc_thread_create(&recorder->thread, run_recorder,
"scrcpy-recorder", recorder);
if (!ok) {
LOGE("Could not start recorder thread");
goto error_avio_close;
}

LOGI("Recording started to %s file: %s", format_name, recorder->filename);
recorder->codec = codec;
sc_cond_signal(&recorder->stream_cond);
sc_mutex_unlock(&recorder->mutex);

return true;

error_avio_close:
avio_close(recorder->ctx->pb);
error_avformat_free_context:
avformat_free_context(recorder->ctx);

return false;
}

static void
sc_recorder_packet_sink_close(struct sc_packet_sink *sink) {
struct sc_recorder *recorder = DOWNCAST(sink);

sc_mutex_lock(&recorder->mutex);
// EOS also stops the recorder
recorder->stopped = true;
sc_cond_signal(&recorder->queue_cond);
sc_mutex_unlock(&recorder->mutex);

sc_thread_join(&recorder->thread, NULL);

avio_close(recorder->ctx->pb);
avformat_free_context(recorder->ctx);
}

static bool
Expand All @@ -335,10 +376,9 @@ sc_recorder_packet_sink_push(struct sc_packet_sink *sink,
struct sc_recorder *recorder = DOWNCAST(sink);

sc_mutex_lock(&recorder->mutex);
assert(!recorder->stopped);

if (recorder->failed) {
// reject any new packet (this will stop the stream)
if (recorder->stopped) {
// reject any new packet
sc_mutex_unlock(&recorder->mutex);
return false;
}
Expand Down Expand Up @@ -378,11 +418,17 @@ sc_recorder_init(struct sc_recorder *recorder, const char *filename,
goto error_mutex_destroy;
}

ok = sc_cond_init(&recorder->stream_cond);
if (!ok) {
goto error_queue_cond_destroy;
}

sc_queue_init(&recorder->queue);
recorder->stopped = false;
recorder->failed = false;
recorder->header_written = false;

recorder->codec = NULL;

recorder->format = format;
recorder->declared_frame_size = declared_frame_size;

Expand All @@ -398,8 +444,19 @@ sc_recorder_init(struct sc_recorder *recorder, const char *filename,

recorder->packet_sink.ops = &ops;

ok = sc_thread_create(&recorder->thread, run_recorder, "scrcpy-recorder",
recorder);
if (!ok) {
LOGE("Could not start recorder thread");
goto error_stream_cond_destroy;
}

return true;

error_stream_cond_destroy:
sc_cond_destroy(&recorder->stream_cond);
error_queue_cond_destroy:
sc_cond_destroy(&recorder->queue_cond);
error_mutex_destroy:
sc_mutex_destroy(&recorder->mutex);
error_free_filename:
Expand All @@ -408,8 +465,23 @@ sc_recorder_init(struct sc_recorder *recorder, const char *filename,
return false;
}

void
sc_recorder_stop(struct sc_recorder *recorder) {
sc_mutex_lock(&recorder->mutex);
recorder->stopped = true;
sc_cond_signal(&recorder->queue_cond);
sc_cond_signal(&recorder->stream_cond);
sc_mutex_unlock(&recorder->mutex);
}

void
sc_recorder_join(struct sc_recorder *recorder) {
sc_thread_join(&recorder->thread, NULL);
}

void
sc_recorder_destroy(struct sc_recorder *recorder) {
sc_cond_destroy(&recorder->stream_cond);
sc_cond_destroy(&recorder->queue_cond);
sc_mutex_destroy(&recorder->mutex);
free(recorder->filename);
Expand Down
14 changes: 12 additions & 2 deletions app/src/recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ struct sc_recorder {
sc_thread thread;
sc_mutex mutex;
sc_cond queue_cond;
bool stopped; // set on recorder_close()
bool failed; // set on packet write failure
// set on sc_recorder_stop(), packet_sink close or recording failure
bool stopped;
struct sc_recorder_queue queue;

// wake up the recorder thread once the codec in known
sc_cond stream_cond;
const AVCodec *codec;

const struct sc_recorder_callbacks *cbs;
void *cbs_userdata;
};
Expand All @@ -50,6 +54,12 @@ sc_recorder_init(struct sc_recorder *recorder, const char *filename,
struct sc_size declared_frame_size,
const struct sc_recorder_callbacks *cbs, void *cbs_userdata);

void
sc_recorder_stop(struct sc_recorder *recorder);

void
sc_recorder_join(struct sc_recorder *recorder);

void
sc_recorder_destroy(struct sc_recorder *recorder);

Expand Down
4 changes: 4 additions & 0 deletions app/src/scrcpy.c
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,9 @@ scrcpy(struct scrcpy_options *options) {
if (file_pusher_initialized) {
sc_file_pusher_stop(&s->file_pusher);
}
if (recorder_initialized) {
sc_recorder_stop(&s->recorder);
}
if (screen_initialized) {
sc_screen_interrupt(&s->screen);
}
Expand Down Expand Up @@ -706,6 +709,7 @@ scrcpy(struct scrcpy_options *options) {
}

if (recorder_initialized) {
sc_recorder_join(&s->recorder);
sc_recorder_destroy(&s->recorder);
}

Expand Down
1 change: 1 addition & 0 deletions app/src/trait/packet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct sc_packet_sink {
};

struct sc_packet_sink_ops {
/* The codec instance is static, it is valid until the end of the program */
bool (*open)(struct sc_packet_sink *sink, const AVCodec *codec);
void (*close)(struct sc_packet_sink *sink);
bool (*push)(struct sc_packet_sink *sink, const AVPacket *packet);
Expand Down

0 comments on commit a039124

Please # to comment.