diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c216909..fdb6121 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,33 @@ Changelog for package web_video_server ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.2.1 (2019-06-05) +------------------ +* Restream buffered frames with minimum publish rate (`#88 `_) + * Restream buffered frames with minimum publish rate + * Implement restreaming for ros_compressed_streamer +* Update travis config (`#89 `_) +* Fall back to mjpeg if ros_compressed is unavailable (`#87 `_) +* Contributors: Jihoon Lee, Viktor Kunovski, sfalexrog + +0.2.0 (2019-01-30) +------------------ +* Add "default_stream_type" parameter (`#84 `_) + This allows users to specify default stream type in their .launch files. Using a "ros_compressed" stream type sometimes + results in a much lower resource consumption, and having it set as a default is much nicer for end users. +* Add a workaround for MultipartStream constant busy state (`#83 `_) + * Add a workaround for MultipartStream constant busy state + * Remove C++11 features +* lax rule for topic name (`#77 `_) +* Add PngStreamer (`#74 `_) +* fix SteadyTimer check for backported ROS versions (`#71 `_) + i.e. on current kinetic +* Pkg format 2 (`#68 `_) + * use package format 2 + * add missing dependency on sensor_msgs +* fixed undeclared CODEC_FLAG_GLOBAL_HEADER (`#65 `_) +* Contributors: Andreas Klintberg, Dirk Thomas, Felix Ruess, Kazuto Murase, Viktor Kunovski, sfalexrog + 0.1.0 (2018-07-01) ------------------ * Avoid queuing of images on slow ethernet connection (`#64 `_) diff --git a/CMakeLists.txt b/CMakeLists.txt index d291cbc..8d756a4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,7 +49,9 @@ add_executable(${PROJECT_NAME} src/vp9_streamer.cpp src/multipart_stream.cpp src/ros_compressed_streamer.cpp - src/jpeg_streamers.cpp) + src/jpeg_streamers.cpp + src/png_streamers.cpp +) ament_target_dependencies(${PROJECT_NAME} async_web_server_cpp cv_bridge image_transport rclcpp sensor_msgs) diff --git a/include/web_video_server/image_streamer.h b/include/web_video_server/image_streamer.h index 23ca030..c2f5a07 100644 --- a/include/web_video_server/image_streamer.h +++ b/include/web_video_server/image_streamer.h @@ -19,6 +19,7 @@ class ImageStreamer rclcpp::Node::SharedPtr nh); virtual void start() = 0; + virtual ~ImageStreamer(); bool isInactive() { @@ -26,6 +27,11 @@ class ImageStreamer } ; + /** + * Restreams the last received image frame if older than max_age. + */ + virtual void restreamFrame(double max_age) = 0; + std::string getTopic() { return topic_; @@ -46,12 +52,13 @@ class ImageTransportImageStreamer : public ImageStreamer public: ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh); + virtual ~ImageTransportImageStreamer(); virtual void start(); protected: virtual void sendImage(const cv::Mat &, const rclcpp::Time &time) = 0; - + virtual void restreamFrame(double max_age); virtual void initialize(const cv::Mat &); image_transport::Subscriber image_sub_; @@ -59,6 +66,11 @@ class ImageTransportImageStreamer : public ImageStreamer int output_height_; bool invert_; std::string default_transport_; + + rclcpp::Time last_frame; + cv::Mat output_size_image; + boost::mutex send_mutex_; + private: image_transport::ImageTransport it_; bool initialized_; diff --git a/include/web_video_server/jpeg_streamers.h b/include/web_video_server/jpeg_streamers.h index 8dead91..30a202b 100644 --- a/include/web_video_server/jpeg_streamers.h +++ b/include/web_video_server/jpeg_streamers.h @@ -15,7 +15,7 @@ class MjpegStreamer : public ImageTransportImageStreamer public: MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh); - + ~MjpegStreamer(); protected: virtual void sendImage(const cv::Mat &, const rclcpp::Time &time); @@ -38,7 +38,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer public: JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh); - + ~JpegSnapshotStreamer(); protected: virtual void sendImage(const cv::Mat &, const rclcpp::Time &time); diff --git a/include/web_video_server/multipart_stream.h b/include/web_video_server/multipart_stream.h index 0a867ec..d6ea359 100644 --- a/include/web_video_server/multipart_stream.h +++ b/include/web_video_server/multipart_stream.h @@ -9,15 +9,21 @@ namespace web_video_server { +struct PendingFooter { + rclcpp::Time timestamp; + std::weak_ptr contents; +}; + class MultipartStream { public: - MultipartStream(async_web_server_cpp::HttpConnectionPtr& connection, + MultipartStream(std::function get_now, + async_web_server_cpp::HttpConnectionPtr& connection, const std::string& boundry="boundarydonotcross", std::size_t max_queue_size=1); void sendInitialHeader(); void sendPartHeader(const rclcpp::Time &time, const std::string& type, size_t payload_size); - void sendPartFooter(); + void sendPartFooter(const rclcpp::Time &time); void sendPartAndClear(const rclcpp::Time &time, const std::string& type, std::vector &data); void sendPart(const rclcpp::Time &time, const std::string& type, const boost::asio::const_buffer &buffer, async_web_server_cpp::HttpConnection::ResourcePtr resource); @@ -26,10 +32,11 @@ class MultipartStream { bool isBusy(); private: + std::function get_now_; const std::size_t max_queue_size_; async_web_server_cpp::HttpConnectionPtr connection_; std::string boundry_; - std::queue > pending_footers_; + std::queue pending_footers_; }; } diff --git a/include/web_video_server/png_streamers.h b/include/web_video_server/png_streamers.h new file mode 100644 index 0000000..197ecc3 --- /dev/null +++ b/include/web_video_server/png_streamers.h @@ -0,0 +1,51 @@ +#ifndef PNG_STREAMERS_H_ +#define PNG_STREAMERS_H_ + +#include +#include "web_video_server/image_streamer.h" +#include "async_web_server_cpp/http_request.hpp" +#include "async_web_server_cpp/http_connection.hpp" +#include "web_video_server/multipart_stream.h" + +namespace web_video_server +{ + +class PngStreamer : public ImageTransportImageStreamer +{ +public: + PngStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::SharedPtr nh); + ~PngStreamer(); +protected: + virtual void sendImage(const cv::Mat &, const rclcpp::Time &time); + +private: + MultipartStream stream_; + int quality_; +}; + +class PngStreamerType : public ImageStreamerType +{ +public: + boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::SharedPtr nh); + std::string create_viewer(const async_web_server_cpp::HttpRequest &request); +}; + +class PngSnapshotStreamer : public ImageTransportImageStreamer +{ +public: + PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh); + ~PngSnapshotStreamer(); +protected: + virtual void sendImage(const cv::Mat &, const rclcpp::Time &time); + +private: + int quality_; +}; + +} + +#endif diff --git a/include/web_video_server/ros_compressed_streamer.h b/include/web_video_server/ros_compressed_streamer.h index 2b1d9e3..7219dfd 100644 --- a/include/web_video_server/ros_compressed_streamer.h +++ b/include/web_video_server/ros_compressed_streamer.h @@ -15,13 +15,20 @@ class RosCompressedStreamer : public ImageStreamer public: RosCompressedStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh); + ~RosCompressedStreamer(); virtual void start(); + virtual void restreamFrame(double max_age); + +protected: + virtual void sendImage(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, const rclcpp::Time &time); private: void imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg); - MultipartStream stream_; rclcpp::Subscription::SharedPtr image_sub_; + rclcpp::Time last_frame; + sensor_msgs::msg::CompressedImage::ConstSharedPtr last_msg; + boost::mutex send_mutex_; }; class RosCompressedStreamerType : public ImageStreamerType diff --git a/include/web_video_server/web_video_server.h b/include/web_video_server/web_video_server.h index 50c5dfc..1da6015 100644 --- a/include/web_video_server/web_video_server.h +++ b/include/web_video_server/web_video_server.h @@ -50,11 +50,13 @@ class WebVideoServer async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end); private: + void restreamFrames(double max_age); void cleanup_inactive_streams(); rclcpp::Node::SharedPtr nh_; rclcpp::WallTimer::SharedPtr cleanup_timer_; int ros_threads_; + double publish_rate_; int port_; std::string address_; boost::shared_ptr server_; diff --git a/package.xml b/package.xml index 60de407..1154d29 100644 --- a/package.xml +++ b/package.xml @@ -1,7 +1,7 @@ web_video_server - 0.1.0 + 0.2.1 HTTP Streaming of ROS Image Topics in Multiple Formats Russell Toris diff --git a/src/image_streamer.cpp b/src/image_streamer.cpp index 603d85f..f0a30f1 100644 --- a/src/image_streamer.cpp +++ b/src/image_streamer.cpp @@ -1,5 +1,6 @@ #include "web_video_server/image_streamer.h" #include +#include namespace web_video_server { @@ -11,6 +12,10 @@ ImageStreamer::ImageStreamer(const async_web_server_cpp::HttpRequest &request, topic_ = request.get_query_param_value_or_default("topic", ""); } +ImageStreamer::~ImageStreamer() +{ +} + ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh) : ImageStreamer(request, connection, nh), it_(nh), initialized_(false) @@ -21,6 +26,10 @@ ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_ default_transport_ = request.get_query_param_value_or_default("default_transport", "raw"); } +ImageTransportImageStreamer::~ImageTransportImageStreamer() +{ +} + void ImageTransportImageStreamer::start() { image_transport::TransportHints hints(nh_.get(), default_transport_); @@ -32,7 +41,7 @@ void ImageTransportImageStreamer::start() break; } auto & topic_name = topic_and_types.first; - if(topic_name == topic_){ + if(topic_name == topic_ || (topic_name.find("/") == 0 && topic_name.substr(1) == topic_)){ inactive_ = false; break; } @@ -44,6 +53,39 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &) { } +void ImageTransportImageStreamer::restreamFrame(double max_age) +{ + if (inactive_ || !initialized_ ) + return; + try { + if ( last_frame + rclcpp::Duration(max_age) < nh_->now() ) { + boost::mutex::scoped_lock lock(send_mutex_); + sendImage(output_size_image, nh_->now() ); // don't update last_frame, it may remain an old value. + } + } + catch (boost::system::system_error &e) + { + // happens when client disconnects + RCLCPP_DEBUG(nh_->get_logger(), "system_error exception: %s", e.what()); + inactive_ = true; + return; + } + catch (std::exception &e) + { + // TODO THROTTLE with 30 + RCLCPP_ERROR(nh_->get_logger(), "exception: %s", e.what()); + inactive_ = true; + return; + } + catch (...) + { + // TODO THROTTLE with 30 + RCLCPP_ERROR(nh_->get_logger(), "exception"); + inactive_ = true; + return; + } +} + void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::ConstSharedPtr &msg) { if (inactive_) @@ -87,7 +129,7 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C cv::flip(img, img, true); } - cv::Mat output_size_image; + boost::mutex::scoped_lock lock(send_mutex_); // protects output_size_image if (output_width_ != input_width || output_height_ != input_height) { cv::Mat img_resized; @@ -105,7 +147,9 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C initialize(output_size_image); initialized_ = true; } - sendImage(output_size_image, msg->header.stamp); + + last_frame = nh_->now(); + sendImage(output_size_image, last_frame ); } catch (cv_bridge::Exception &e) diff --git a/src/jpeg_streamers.cpp b/src/jpeg_streamers.cpp index c05feb3..029cb96 100644 --- a/src/jpeg_streamers.cpp +++ b/src/jpeg_streamers.cpp @@ -6,12 +6,18 @@ namespace web_video_server MjpegStreamer::MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh) : - ImageTransportImageStreamer(request, connection, nh), stream_(connection) + ImageTransportImageStreamer(request, connection, nh), stream_(std::bind(&rclcpp::Node::now, nh), connection) { quality_ = request.get_query_param_value_or_default("quality", 95); stream_.sendInitialHeader(); } +MjpegStreamer::~MjpegStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + void MjpegStreamer::sendImage(const cv::Mat &img, const rclcpp::Time &time) { std::vector encode_params; @@ -48,6 +54,12 @@ JpegSnapshotStreamer::JpegSnapshotStreamer(const async_web_server_cpp::HttpReque quality_ = request.get_query_param_value_or_default("quality", 95); } +JpegSnapshotStreamer::~JpegSnapshotStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const rclcpp::Time &time) { std::vector encode_params; @@ -59,13 +71,19 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const rclcpp::Time &tim char stamp[20]; sprintf(stamp, "%.06lf", time.seconds()); - async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header( - "Server", "web_video_server").header("Cache-Control", - "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header( - "X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/jpeg").header( - "Access-Control-Allow-Origin", "*").header("Content-Length", - boost::lexical_cast(encoded_buffer.size())).write( - connection_); + async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok) + .header("Connection", "close") + .header("Server", "web_video_server") + .header("Cache-Control", + "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, " + "max-age=0") + .header("X-Timestamp", stamp) + .header("Pragma", "no-cache") + .header("Content-type", "image/jpeg") + .header("Access-Control-Allow-Origin", "*") + .header("Content-Length", + boost::lexical_cast(encoded_buffer.size())) + .write(connection_); connection_->write_and_clear(encoded_buffer); inactive_ = true; } diff --git a/src/multipart_stream.cpp b/src/multipart_stream.cpp index 8afed99..caf60b7 100644 --- a/src/multipart_stream.cpp +++ b/src/multipart_stream.cpp @@ -5,10 +5,11 @@ namespace web_video_server { MultipartStream::MultipartStream( + std::function get_now, async_web_server_cpp::HttpConnectionPtr& connection, const std::string& boundry, std::size_t max_queue_size) - : connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size) + : get_now_(get_now), connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size) {} void MultipartStream::sendInitialHeader() { @@ -32,10 +33,13 @@ void MultipartStream::sendPartHeader(const rclcpp::Time &time, const std::string connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers); } -void MultipartStream::sendPartFooter() { +void MultipartStream::sendPartFooter(const rclcpp::Time &time) { std::shared_ptr str(new std::string("\r\n--"+boundry_+"\r\n")); + PendingFooter pf; + pf.timestamp = time; + pf.contents = str; connection_->write(boost::asio::buffer(*str), str); - if (max_queue_size_ > 0) pending_footers_.push(str); + if (max_queue_size_ > 0) pending_footers_.push(pf); } void MultipartStream::sendPartAndClear(const rclcpp::Time &time, const std::string& type, @@ -44,7 +48,7 @@ void MultipartStream::sendPartAndClear(const rclcpp::Time &time, const std::stri { sendPartHeader(time, type, data.size()); connection_->write_and_clear(data); - sendPartFooter(); + sendPartFooter(time); } } @@ -55,14 +59,24 @@ void MultipartStream::sendPart(const rclcpp::Time &time, const std::string& type { sendPartHeader(time, type, boost::asio::buffer_size(buffer)); connection_->write(buffer, resource); - sendPartFooter(); + sendPartFooter(time); } } bool MultipartStream::isBusy() { - while (!pending_footers_.empty() && pending_footers_.front().expired()) + rclcpp::Time currentTime = get_now_(); + while (!pending_footers_.empty()) { - pending_footers_.pop(); + if (pending_footers_.front().contents.expired()) { + pending_footers_.pop(); + } else { + rclcpp::Time footerTime = pending_footers_.front().timestamp; + if ((currentTime - footerTime).seconds() > 0.5) { + pending_footers_.pop(); + } else { + break; + } + } } return !(max_queue_size_ == 0 || pending_footers_.size() < max_queue_size_); } diff --git a/src/png_streamers.cpp b/src/png_streamers.cpp new file mode 100644 index 0000000..692e48b --- /dev/null +++ b/src/png_streamers.cpp @@ -0,0 +1,91 @@ +#include "web_video_server/png_streamers.h" +#include "async_web_server_cpp/http_reply.hpp" + +namespace web_video_server +{ + +PngStreamer::PngStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh) : + ImageTransportImageStreamer(request, connection, nh), stream_(std::bind(&rclcpp::Node::now, nh), connection) +{ + quality_ = request.get_query_param_value_or_default("quality", 3); + stream_.sendInitialHeader(); +} + +PngStreamer::~PngStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + +void PngStreamer::sendImage(const cv::Mat &img, const rclcpp::Time &time) +{ + std::vector encode_params; + encode_params.push_back(CV_IMWRITE_PNG_COMPRESSION); + encode_params.push_back(quality_); + + std::vector encoded_buffer; + cv::imencode(".png", img, encoded_buffer, encode_params); + + stream_.sendPartAndClear(time, "image/png", encoded_buffer); +} + +boost::shared_ptr PngStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::SharedPtr nh) +{ + return boost::shared_ptr(new PngStreamer(request, connection, nh)); +} + +std::string PngStreamerType::create_viewer(const async_web_server_cpp::HttpRequest &request) +{ + std::stringstream ss; + ss << ""; + return ss.str(); +} + +PngSnapshotStreamer::PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::SharedPtr nh) : + ImageTransportImageStreamer(request, connection, nh) +{ + quality_ = request.get_query_param_value_or_default("quality", 3); +} + +PngSnapshotStreamer::~PngSnapshotStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + +void PngSnapshotStreamer::sendImage(const cv::Mat &img, const rclcpp::Time &time) +{ + std::vector encode_params; + encode_params.push_back(CV_IMWRITE_PNG_COMPRESSION); + encode_params.push_back(quality_); + + std::vector encoded_buffer; + cv::imencode(".png", img, encoded_buffer, encode_params); + + char stamp[20]; + sprintf(stamp, "%.06lf", time.seconds()); + async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok) + .header("Connection", "close") + .header("Server", "web_video_server") + .header("Cache-Control", + "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, " + "max-age=0") + .header("X-Timestamp", stamp) + .header("Pragma", "no-cache") + .header("Content-type", "image/png") + .header("Access-Control-Allow-Origin", "*") + .header("Content-Length", + boost::lexical_cast(encoded_buffer.size())) + .write(connection_); + connection_->write_and_clear(encoded_buffer); + inactive_ = true; +} + +} diff --git a/src/ros_compressed_streamer.cpp b/src/ros_compressed_streamer.cpp index 1797da7..5ef2031 100644 --- a/src/ros_compressed_streamer.cpp +++ b/src/ros_compressed_streamer.cpp @@ -5,18 +5,36 @@ namespace web_video_server RosCompressedStreamer::RosCompressedStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh) : - ImageStreamer(request, connection, nh), stream_(connection) + ImageStreamer(request, connection, nh), stream_(std::bind(&rclcpp::Node::now, nh), connection) { stream_.sendInitialHeader(); } +RosCompressedStreamer::~RosCompressedStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + void RosCompressedStreamer::start() { std::string compressed_topic = topic_ + "/compressed"; image_sub_ = nh_->create_subscription( compressed_topic, std::bind(&RosCompressedStreamer::imageCallback, this, std::placeholders::_1), 1); } -void RosCompressedStreamer::imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg) { +void RosCompressedStreamer::restreamFrame(double max_age) +{ + if (inactive_ || (last_msg == 0)) + return; + + if ( last_frame + rclcpp::Duration(max_age) < nh_->now() ) { + boost::mutex::scoped_lock lock(send_mutex_); + sendImage(last_msg, nh_->now() ); // don't update last_frame, it may remain an old value. + } +} + +void RosCompressedStreamer::sendImage(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, + const rclcpp::Time &time) { try { std::string content_type; if(msg->format.find("jpeg") != std::string::npos) { @@ -30,7 +48,7 @@ void RosCompressedStreamer::imageCallback(const sensor_msgs::msg::CompressedImag return; } - stream_.sendPart(rclcpp::Time(msg->header.stamp), content_type, boost::asio::buffer(msg->data), msg); + stream_.sendPart(time, content_type, boost::asio::buffer(msg->data), msg); } catch (boost::system::system_error &e) { @@ -56,6 +74,14 @@ void RosCompressedStreamer::imageCallback(const sensor_msgs::msg::CompressedImag } +void RosCompressedStreamer::imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg) { + boost::mutex::scoped_lock lock(send_mutex_); // protects last_msg and last_frame + last_msg = msg; + last_frame = rclcpp::Time(msg->header.stamp.sec, msg->header.stamp.nanosec); + sendImage(last_msg, last_frame); +} + + boost::shared_ptr RosCompressedStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh) diff --git a/src/web_video_server.cpp b/src/web_video_server.cpp index 89787ba..a442165 100644 --- a/src/web_video_server.cpp +++ b/src/web_video_server.cpp @@ -9,6 +9,7 @@ #include "web_video_server/web_video_server.h" #include "web_video_server/ros_compressed_streamer.h" #include "web_video_server/jpeg_streamers.h" +#include "web_video_server/png_streamers.h" #include "web_video_server/vp8_streamer.h" #include "web_video_server/h264_streamer.h" #include "web_video_server/vp9_streamer.h" @@ -21,6 +22,8 @@ namespace web_video_server static bool __verbose; +static std::string __default_stream_type; + static bool ros_connection_logger(async_web_server_cpp::HttpServerRequestHandler forward, const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, const char* begin, @@ -79,8 +82,20 @@ WebVideoServer::WebVideoServer(rclcpp::Node::SharedPtr &nh, rclcpp::Node::Shared } else { ros_threads_ = 2; } + if (private_nh->get_parameter("publish_rate", parameter)) { + publish_rate_ = parameter.as_double(); + } else { + publish_rate_ = -1.0; + } + + if (private_nh->get_parameter("default_stream_type", parameter)) { + __default_stream_type = parameter.as_string(); + } else { + __default_stream_type = "mjpeg"; + } stream_types_["mjpeg"] = boost::shared_ptr(new MjpegStreamerType()); + stream_types_["png"] = boost::shared_ptr(new PngStreamerType()); stream_types_["ros_compressed"] = boost::shared_ptr(new RosCompressedStreamerType()); stream_types_["vp8"] = boost::shared_ptr(new Vp8StreamerType()); stream_types_["h264"] = boost::shared_ptr(new H264StreamerType()); @@ -122,10 +137,25 @@ void WebVideoServer::spin() RCLCPP_INFO(nh_->get_logger(), "Waiting For connections on %s:%d", address_.c_str(), port_); rclcpp::executors::MultiThreadedExecutor spinner(rclcpp::executor::create_default_executor_arguments(), ros_threads_); spinner.add_node(nh_); + if ( publish_rate_ > 0 ) { + nh_->create_wall_timer(1s / publish_rate_, [this](){restreamFrames(1.0 / publish_rate_);}); + } spinner.spin(); server_->stop(); } +void WebVideoServer::restreamFrames( double max_age ) +{ + boost::mutex::scoped_lock lock(subscriber_mutex_); + + typedef std::vector >::iterator itr_type; + + for (itr_type itr = image_subscribers_.begin(); itr < image_subscribers_.end(); ++itr) + { + (*itr)->restreamFrame( max_age ); + } +} + void WebVideoServer::cleanup_inactive_streams() { boost::mutex::scoped_lock lock(subscriber_mutex_, boost::try_to_lock); @@ -149,9 +179,33 @@ bool WebVideoServer::handle_stream(const async_web_server_cpp::HttpRequest &requ async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end) { - std::string type = request.get_query_param_value_or_default("type", "mjpeg"); + std::string type = request.get_query_param_value_or_default("type", __default_stream_type); if (stream_types_.find(type) != stream_types_.end()) { + std::string topic = request.get_query_param_value_or_default("topic", ""); + // Fallback for topics without corresponding compressed topics + if (type == std::string("ros_compressed")) + { + std::string compressed_topic_name = topic + "/compressed"; + auto tnat = nh_->get_topic_names_and_types(); + bool did_find_compressed_topic = false; + for (auto topic_and_types : tnat) { + if (topic_and_types.second.size() > 1) { + // explicitly avoid topics with more than one type + break; + } + auto & topic_name = topic_and_types.first; + if(topic_name == compressed_topic_name || (topic_name.find("/") == 0 && topic_name.substr(1) == compressed_topic_name)){ + did_find_compressed_topic = true; + break; + } + } + if (!did_find_compressed_topic) + { + RCLCPP_WARN(nh_->get_logger(), "Could not find compressed image topic for %s, falling back to mjpeg", topic.c_str()); + type = "mjpeg"; + } + } boost::shared_ptr streamer = stream_types_[type]->create_streamer(request, connection, nh_); streamer->start(); boost::mutex::scoped_lock lock(subscriber_mutex_); @@ -181,10 +235,34 @@ bool WebVideoServer::handle_stream_viewer(const async_web_server_cpp::HttpReques async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end) { - std::string type = request.get_query_param_value_or_default("type", "mjpeg"); + std::string type = request.get_query_param_value_or_default("type", __default_stream_type); if (stream_types_.find(type) != stream_types_.end()) { std::string topic = request.get_query_param_value_or_default("topic", ""); + // Fallback for topics without corresponding compressed topics + if (type == std::string("ros_compressed")) + { + + std::string compressed_topic_name = topic + "/compressed"; + auto tnat = nh_->get_topic_names_and_types(); + bool did_find_compressed_topic = false; + for (auto topic_and_types : tnat) { + if (topic_and_types.second.size() > 1) { + // explicitly avoid topics with more than one type + break; + } + auto & topic_name = topic_and_types.first; + if(topic_name == compressed_topic_name || (topic_name.find("/") == 0 && topic_name.substr(1) == compressed_topic_name)){ + did_find_compressed_topic = true; + break; + } + } + if (!did_find_compressed_topic) + { + RCLCPP_WARN(nh_->get_logger(), "Could not find compressed image topic for %s, falling back to mjpeg", topic.c_str()); + type = "mjpeg"; + } + } async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header( "Server", "web_video_server").header("Content-type", "text/html;").write(connection);