Skip to content

Commit

Permalink
Sync ROS 2 branch up to version 0.2.1 (#95)
Browse files Browse the repository at this point in the history
* fix SteadyTimer check for backported ROS versions (#71)

i.e. on current kinetic

* Add PngStreamer (#74)

* lax rule for topic name (#77)

* Add a workaround for MultipartStream constant busy state (#83)

* Add a workaround for MultipartStream constant busy state
* Remove C++11 features

* Add "default_stream_type" parameter (#84)

This allows users to specify default stream type in their .launch files. Using a "ros_compressed" stream type sometimes
results in a much lower resource consumption, and having it set as a default is much nicer for end users.

* update changelog

* 0.2.0

* Fall back to mjpeg if ros_compressed is unavailable (#87)

* Update travis config (#89)

* Restream buffered frames with minimum publish rate (#88)

* Restream buffered frames with minimum publish rate

* Implement restreaming for ros_compressed_streamer

* update changelog

* 0.2.1
  • Loading branch information
dirk-thomas authored Sep 19, 2019
1 parent 84b410b commit ffdf25c
Show file tree
Hide file tree
Showing 15 changed files with 411 additions and 32 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,33 @@
Changelog for package web_video_server
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

0.2.1 (2019-06-05)
------------------
* Restream buffered frames with minimum publish rate (`#88 <https://github.com/RobotWebTools/web_video_server/issues/88>`_)
* Restream buffered frames with minimum publish rate
* Implement restreaming for ros_compressed_streamer
* Update travis config (`#89 <https://github.com/RobotWebTools/web_video_server/issues/89>`_)
* Fall back to mjpeg if ros_compressed is unavailable (`#87 <https://github.com/RobotWebTools/web_video_server/issues/87>`_)
* Contributors: Jihoon Lee, Viktor Kunovski, sfalexrog

0.2.0 (2019-01-30)
------------------
* Add "default_stream_type" parameter (`#84 <https://github.com/RobotWebTools/web_video_server/issues/84>`_)
This allows users to specify default stream type in their .launch files. Using a "ros_compressed" stream type sometimes
results in a much lower resource consumption, and having it set as a default is much nicer for end users.
* Add a workaround for MultipartStream constant busy state (`#83 <https://github.com/RobotWebTools/web_video_server/issues/83>`_)
* Add a workaround for MultipartStream constant busy state
* Remove C++11 features
* lax rule for topic name (`#77 <https://github.com/RobotWebTools/web_video_server/issues/77>`_)
* Add PngStreamer (`#74 <https://github.com/RobotWebTools/web_video_server/issues/74>`_)
* fix SteadyTimer check for backported ROS versions (`#71 <https://github.com/RobotWebTools/web_video_server/issues/71>`_)
i.e. on current kinetic
* Pkg format 2 (`#68 <https://github.com/RobotWebTools/web_video_server/issues/68>`_)
* use package format 2
* add missing dependency on sensor_msgs
* fixed undeclared CODEC_FLAG_GLOBAL_HEADER (`#65 <https://github.com/RobotWebTools/web_video_server/issues/65>`_)
* Contributors: Andreas Klintberg, Dirk Thomas, Felix Ruess, Kazuto Murase, Viktor Kunovski, sfalexrog

0.1.0 (2018-07-01)
------------------
* Avoid queuing of images on slow ethernet connection (`#64 <https://github.com/RobotWebTools/web_video_server/issues/64>`_)
Expand Down
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ add_executable(${PROJECT_NAME}
src/vp9_streamer.cpp
src/multipart_stream.cpp
src/ros_compressed_streamer.cpp
src/jpeg_streamers.cpp)
src/jpeg_streamers.cpp
src/png_streamers.cpp
)

ament_target_dependencies(${PROJECT_NAME}
async_web_server_cpp cv_bridge image_transport rclcpp sensor_msgs)
Expand Down
14 changes: 13 additions & 1 deletion include/web_video_server/image_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ class ImageStreamer
rclcpp::Node::SharedPtr nh);

virtual void start() = 0;
virtual ~ImageStreamer();

bool isInactive()
{
return inactive_;
}
;

/**
* Restreams the last received image frame if older than max_age.
*/
virtual void restreamFrame(double max_age) = 0;

std::string getTopic()
{
return topic_;
Expand All @@ -46,19 +52,25 @@ class ImageTransportImageStreamer : public ImageStreamer
public:
ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
rclcpp::Node::SharedPtr nh);
virtual ~ImageTransportImageStreamer();

virtual void start();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time &time) = 0;

virtual void restreamFrame(double max_age);
virtual void initialize(const cv::Mat &);

image_transport::Subscriber image_sub_;
int output_width_;
int output_height_;
bool invert_;
std::string default_transport_;

rclcpp::Time last_frame;
cv::Mat output_size_image;
boost::mutex send_mutex_;

private:
image_transport::ImageTransport it_;
bool initialized_;
Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/jpeg_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MjpegStreamer : public ImageTransportImageStreamer
public:
MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
rclcpp::Node::SharedPtr nh);

~MjpegStreamer();
protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time &time);

Expand All @@ -38,7 +38,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer
public:
JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh);

~JpegSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time &time);

Expand Down
13 changes: 10 additions & 3 deletions include/web_video_server/multipart_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@
namespace web_video_server
{

struct PendingFooter {
rclcpp::Time timestamp;
std::weak_ptr<std::string> contents;
};

class MultipartStream {
public:
MultipartStream(async_web_server_cpp::HttpConnectionPtr& connection,
MultipartStream(std::function<rclcpp::Time()> get_now,
async_web_server_cpp::HttpConnectionPtr& connection,
const std::string& boundry="boundarydonotcross",
std::size_t max_queue_size=1);

void sendInitialHeader();
void sendPartHeader(const rclcpp::Time &time, const std::string& type, size_t payload_size);
void sendPartFooter();
void sendPartFooter(const rclcpp::Time &time);
void sendPartAndClear(const rclcpp::Time &time, const std::string& type, std::vector<unsigned char> &data);
void sendPart(const rclcpp::Time &time, const std::string& type, const boost::asio::const_buffer &buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource);
Expand All @@ -26,10 +32,11 @@ class MultipartStream {
bool isBusy();

private:
std::function<rclcpp::Time()> get_now_;
const std::size_t max_queue_size_;
async_web_server_cpp::HttpConnectionPtr connection_;
std::string boundry_;
std::queue<std::weak_ptr<const void> > pending_footers_;
std::queue<PendingFooter> pending_footers_;
};

}
Expand Down
51 changes: 51 additions & 0 deletions include/web_video_server/png_streamers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#ifndef PNG_STREAMERS_H_
#define PNG_STREAMERS_H_

#include <image_transport/image_transport.h>
#include "web_video_server/image_streamer.h"
#include "async_web_server_cpp/http_request.hpp"
#include "async_web_server_cpp/http_connection.hpp"
#include "web_video_server/multipart_stream.h"

namespace web_video_server
{

class PngStreamer : public ImageTransportImageStreamer
{
public:
PngStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
rclcpp::Node::SharedPtr nh);
~PngStreamer();
protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time &time);

private:
MultipartStream stream_;
int quality_;
};

class PngStreamerType : public ImageStreamerType
{
public:
boost::shared_ptr<ImageStreamer> create_streamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection,
rclcpp::Node::SharedPtr nh);
std::string create_viewer(const async_web_server_cpp::HttpRequest &request);
};

class PngSnapshotStreamer : public ImageTransportImageStreamer
{
public:
PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh);
~PngSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time &time);

private:
int quality_;
};

}

#endif
9 changes: 8 additions & 1 deletion include/web_video_server/ros_compressed_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ class RosCompressedStreamer : public ImageStreamer
public:
RosCompressedStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
rclcpp::Node::SharedPtr nh);
~RosCompressedStreamer();
virtual void start();
virtual void restreamFrame(double max_age);

protected:
virtual void sendImage(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, const rclcpp::Time &time);

private:
void imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg);

MultipartStream stream_;
rclcpp::Subscription<sensor_msgs::msg::CompressedImage>::SharedPtr image_sub_;
rclcpp::Time last_frame;
sensor_msgs::msg::CompressedImage::ConstSharedPtr last_msg;
boost::mutex send_mutex_;
};

class RosCompressedStreamerType : public ImageStreamerType
Expand Down
2 changes: 2 additions & 0 deletions include/web_video_server/web_video_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ class WebVideoServer
async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end);

private:
void restreamFrames(double max_age);
void cleanup_inactive_streams();

rclcpp::Node::SharedPtr nh_;
rclcpp::WallTimer<rclcpp::VoidCallbackType>::SharedPtr cleanup_timer_;
int ros_threads_;
double publish_rate_;
int port_;
std::string address_;
boost::shared_ptr<async_web_server_cpp::HttpServer> server_;
Expand Down
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package format="2">
<name>web_video_server</name>
<version>0.1.0</version>
<version>0.2.1</version>
<description>HTTP Streaming of ROS Image Topics in Multiple Formats</description>

<maintainer email="rctoris@wpi.edu">Russell Toris</maintainer>
Expand Down
50 changes: 47 additions & 3 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "web_video_server/image_streamer.h"
#include <cv_bridge/cv_bridge.h>
#include <iostream>

namespace web_video_server
{
Expand All @@ -11,6 +12,10 @@ ImageStreamer::ImageStreamer(const async_web_server_cpp::HttpRequest &request,
topic_ = request.get_query_param_value_or_default("topic", "");
}

ImageStreamer::~ImageStreamer()
{
}

ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr nh) :
ImageStreamer(request, connection, nh), it_(nh), initialized_(false)
Expand All @@ -21,6 +26,10 @@ ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_
default_transport_ = request.get_query_param_value_or_default("default_transport", "raw");
}

ImageTransportImageStreamer::~ImageTransportImageStreamer()
{
}

void ImageTransportImageStreamer::start()
{
image_transport::TransportHints hints(nh_.get(), default_transport_);
Expand All @@ -32,7 +41,7 @@ void ImageTransportImageStreamer::start()
break;
}
auto & topic_name = topic_and_types.first;
if(topic_name == topic_){
if(topic_name == topic_ || (topic_name.find("/") == 0 && topic_name.substr(1) == topic_)){
inactive_ = false;
break;
}
Expand All @@ -44,6 +53,39 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &)
{
}

void ImageTransportImageStreamer::restreamFrame(double max_age)
{
if (inactive_ || !initialized_ )
return;
try {
if ( last_frame + rclcpp::Duration(max_age) < nh_->now() ) {
boost::mutex::scoped_lock lock(send_mutex_);
sendImage(output_size_image, nh_->now() ); // don't update last_frame, it may remain an old value.
}
}
catch (boost::system::system_error &e)
{
// happens when client disconnects
RCLCPP_DEBUG(nh_->get_logger(), "system_error exception: %s", e.what());
inactive_ = true;
return;
}
catch (std::exception &e)
{
// TODO THROTTLE with 30
RCLCPP_ERROR(nh_->get_logger(), "exception: %s", e.what());
inactive_ = true;
return;
}
catch (...)
{
// TODO THROTTLE with 30
RCLCPP_ERROR(nh_->get_logger(), "exception");
inactive_ = true;
return;
}
}

void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::ConstSharedPtr &msg)
{
if (inactive_)
Expand Down Expand Up @@ -87,7 +129,7 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C
cv::flip(img, img, true);
}

cv::Mat output_size_image;
boost::mutex::scoped_lock lock(send_mutex_); // protects output_size_image
if (output_width_ != input_width || output_height_ != input_height)
{
cv::Mat img_resized;
Expand All @@ -105,7 +147,9 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C
initialize(output_size_image);
initialized_ = true;
}
sendImage(output_size_image, msg->header.stamp);

last_frame = nh_->now();
sendImage(output_size_image, last_frame );

}
catch (cv_bridge::Exception &e)
Expand Down
Loading

0 comments on commit ffdf25c

Please # to comment.