Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Restream buffered frames with minimum publish rate #88

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions include/web_video_server/image_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ class ImageStreamer
ros::NodeHandle& nh);

virtual void start() = 0;
virtual ~ImageStreamer();

bool isInactive()
{
return inactive_;
}
;

/**
* Restreams the last received image frame if older than max_age.
*/
virtual void restreamFrame(double max_age) = 0;

std::string getTopic()
{
return topic_;
Expand All @@ -45,19 +51,24 @@ class ImageTransportImageStreamer : public ImageStreamer
public:
ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh);

virtual ~ImageTransportImageStreamer();
virtual void start();

protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time) = 0;

virtual void restreamFrame(double max_age);
virtual void initialize(const cv::Mat &);

image_transport::Subscriber image_sub_;
int output_width_;
int output_height_;
bool invert_;
std::string default_transport_;

ros::Time last_frame;
cv::Mat output_size_image;
boost::mutex send_mutex_;

private:
image_transport::ImageTransport it_;
bool initialized_;
Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/jpeg_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MjpegStreamer : public ImageTransportImageStreamer
public:
MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh);

~MjpegStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand All @@ -38,7 +38,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer
public:
JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);

~JpegSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/png_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class PngStreamer : public ImageTransportImageStreamer
public:
PngStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh);

~PngStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand All @@ -38,7 +38,7 @@ class PngSnapshotStreamer : public ImageTransportImageStreamer
public:
PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);

~PngSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand Down
10 changes: 9 additions & 1 deletion include/web_video_server/ros_compressed_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@ class RosCompressedStreamer : public ImageStreamer
public:
RosCompressedStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh);
~RosCompressedStreamer();

virtual void start();
virtual void restreamFrame(double max_age);

protected:
virtual void sendImage(const sensor_msgs::CompressedImageConstPtr &msg, const ros::Time &time);

private:
void imageCallback(const sensor_msgs::CompressedImageConstPtr &msg);

MultipartStream stream_;
ros::Subscriber image_sub_;
ros::Time last_frame;
sensor_msgs::CompressedImageConstPtr last_msg;
boost::mutex send_mutex_;
};

class RosCompressedStreamerType : public ImageStreamerType
Expand Down
2 changes: 2 additions & 0 deletions include/web_video_server/web_video_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class WebVideoServer
async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end);

private:
void restreamFrames(double max_age);
void cleanup_inactive_streams();

ros::NodeHandle nh_;
Expand All @@ -59,6 +60,7 @@ class WebVideoServer
ros::Timer cleanup_timer_;
#endif
int ros_threads_;
double publish_rate_;
int port_;
std::string address_;
boost::shared_ptr<async_web_server_cpp::HttpServer> server_;
Expand Down
46 changes: 44 additions & 2 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "web_video_server/image_streamer.h"
#include <cv_bridge/cv_bridge.h>
#include <iostream>

namespace web_video_server
{
Expand All @@ -11,6 +12,10 @@ ImageStreamer::ImageStreamer(const async_web_server_cpp::HttpRequest &request,
topic_ = request.get_query_param_value_or_default("topic", "");
}

ImageStreamer::~ImageStreamer()
{
}

ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) :
ImageStreamer(request, connection, nh), it_(nh), initialized_(false)
Expand All @@ -21,6 +26,10 @@ ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_
default_transport_ = request.get_query_param_value_or_default("default_transport", "raw");
}

ImageTransportImageStreamer::~ImageTransportImageStreamer()
{
}

void ImageTransportImageStreamer::start()
{
image_transport::TransportHints hints(default_transport_);
Expand All @@ -41,6 +50,37 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &)
{
}

void ImageTransportImageStreamer::restreamFrame(double max_age)
{
if (inactive_ || !initialized_ )
return;
try {
if ( last_frame + ros::Duration(max_age) < ros::Time::now() ) {
boost::mutex::scoped_lock lock(send_mutex_);
sendImage(output_size_image, ros::Time::now() ); // don't update last_frame, it may remain an old value.
}
}
catch (boost::system::system_error &e)
{
// happens when client disconnects
ROS_DEBUG("system_error exception: %s", e.what());
inactive_ = true;
return;
}
catch (std::exception &e)
{
ROS_ERROR_THROTTLE(30, "exception: %s", e.what());
inactive_ = true;
return;
}
catch (...)
{
ROS_ERROR_THROTTLE(30, "exception");
inactive_ = true;
return;
}
}

void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr &msg)
{
if (inactive_)
Expand Down Expand Up @@ -84,7 +124,7 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr
cv::flip(img, img, true);
}

cv::Mat output_size_image;
boost::mutex::scoped_lock lock(send_mutex_); // protects output_size_image
if (output_width_ != input_width || output_height_ != input_height)
{
cv::Mat img_resized;
Expand All @@ -102,7 +142,9 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr
initialize(output_size_image);
initialized_ = true;
}
sendImage(output_size_image, msg->header.stamp);

last_frame = ros::Time::now();
sendImage(output_size_image, last_frame );

}
catch (cv_bridge::Exception &e)
Expand Down
32 changes: 25 additions & 7 deletions src/jpeg_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ MjpegStreamer::MjpegStreamer(const async_web_server_cpp::HttpRequest &request,
stream_.sendInitialHeader();
}

MjpegStreamer::~MjpegStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void MjpegStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand Down Expand Up @@ -48,6 +54,12 @@ JpegSnapshotStreamer::JpegSnapshotStreamer(const async_web_server_cpp::HttpReque
quality_ = request.get_query_param_value_or_default<int>("quality", 95);
}

JpegSnapshotStreamer::~JpegSnapshotStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand All @@ -59,13 +71,19 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)

char stamp[20];
sprintf(stamp, "%.06lf", time.toSec());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
"Server", "web_video_server").header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header(
"X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/jpeg").header(
"Access-Control-Allow-Origin", "*").header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size())).write(
connection_);
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
.header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, "
"max-age=0")
.header("X-Timestamp", stamp)
.header("Pragma", "no-cache")
.header("Content-type", "image/jpeg")
.header("Access-Control-Allow-Origin", "*")
.header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size()))
.write(connection_);
connection_->write_and_clear(encoded_buffer);
inactive_ = true;
}
Expand Down
32 changes: 25 additions & 7 deletions src/png_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ PngStreamer::PngStreamer(const async_web_server_cpp::HttpRequest &request,
stream_.sendInitialHeader();
}

PngStreamer::~PngStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void PngStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand Down Expand Up @@ -48,6 +54,12 @@ PngSnapshotStreamer::PngSnapshotStreamer(const async_web_server_cpp::HttpRequest
quality_ = request.get_query_param_value_or_default<int>("quality", 3);
}

PngSnapshotStreamer::~PngSnapshotStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand All @@ -59,13 +71,19 @@ void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)

char stamp[20];
sprintf(stamp, "%.06lf", time.toSec());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
"Server", "web_video_server").header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header(
"X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/png").header(
"Access-Control-Allow-Origin", "*").header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size())).write(
connection_);
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
.header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, "
"max-age=0")
.header("X-Timestamp", stamp)
.header("Pragma", "no-cache")
.header("Content-type", "image/png")
.header("Access-Control-Allow-Origin", "*")
.header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size()))
.write(connection_);
connection_->write_and_clear(encoded_buffer);
inactive_ = true;
}
Expand Down
30 changes: 28 additions & 2 deletions src/ros_compressed_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,30 @@ RosCompressedStreamer::RosCompressedStreamer(const async_web_server_cpp::HttpReq
stream_.sendInitialHeader();
}

RosCompressedStreamer::~RosCompressedStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void RosCompressedStreamer::start() {
std::string compressed_topic = topic_ + "/compressed";
image_sub_ = nh_.subscribe(compressed_topic, 1, &RosCompressedStreamer::imageCallback, this);
}

void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageConstPtr &msg) {
void RosCompressedStreamer::restreamFrame(double max_age)
{
if (inactive_ || (last_msg == 0))
return;

if ( last_frame + ros::Duration(max_age) < ros::Time::now() ) {
boost::mutex::scoped_lock lock(send_mutex_);
sendImage(last_msg, ros::Time::now() ); // don't update last_frame, it may remain an old value.
}
}

void RosCompressedStreamer::sendImage(const sensor_msgs::CompressedImageConstPtr &msg,
const ros::Time &time) {
try {
std::string content_type;
if(msg->format.find("jpeg") != std::string::npos) {
Expand All @@ -29,7 +47,7 @@ void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageCons
return;
}

stream_.sendPart(msg->header.stamp, content_type, boost::asio::buffer(msg->data), msg);
stream_.sendPart(time, content_type, boost::asio::buffer(msg->data), msg);
}
catch (boost::system::system_error &e)
{
Expand All @@ -53,6 +71,14 @@ void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageCons
}


void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageConstPtr &msg) {
boost::mutex::scoped_lock lock(send_mutex_); // protects last_msg and last_frame
last_msg = msg;
last_frame = ros::Time(msg->header.stamp.sec, msg->header.stamp.nsec);
sendImage(last_msg, last_frame);
}


boost::shared_ptr<ImageStreamer> RosCompressedStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh)
Expand Down
Loading