diff --git a/elements/standard/expunqueue.cc b/elements/standard/expunqueue.cc new file mode 100644 index 0000000000..b7a4b36f94 --- /dev/null +++ b/elements/standard/expunqueue.cc @@ -0,0 +1,222 @@ +// -*- c-basic-offset: 4 -*- +/* + * expunqueue.{cc,hh} -- element pulls as many packets as possible from + * its input, pushes them out its output + * Tom Barbette + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, subject to the conditions + * listed in the Click LICENSE file. These conditions include: you must + * preserve this copyright notice, and you cannot mention the copyright + * holders in advertising related to the Software without their permission. + * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This + * notice is a summary of the Click LICENSE file; the license in that file is + * legally binding. + */ + +#include +#include "expunqueue.hh" +#include +#include +#include +#include +CLICK_DECLS + +EXPUnqueue::EXPUnqueue() + : _task(this), _timer(&_task), _runs(0), _packets(0), _pushes(0), _failed_pulls(0), _empty_runs(0), _burst(32), _active(true) +{ +#if HAVE_BATCH + in_batch_mode = BATCH_MODE_YES; +#endif +} + +int +EXPUnqueue::configure(Vector &conf, ErrorHandler *errh) +{ + if (Args(this, errh).bind(conf) + .read_or_set("BURST", _burst, 32) + .read_or_set("ACTIVE", _active, true) + .consume() < 0) + return -1; + return configure_helper(is_bandwidth(), this, conf, errh); +} + +double +EXPUnqueue::ran_expo(double lambda){ + //DEPRECATED + /*double u; + u = (double)_gen() / (_gen.max() + 1.0); + return -log(1- u) / lambda;*/ + return _poisson(_gen) * (double)cycles_hz(); + +} + +int +EXPUnqueue::configure_helper(bool is_bandwidth, Element *elt, Vector &conf, ErrorHandler *errh) +{ + unsigned r; + unsigned dur_msec = 20; + unsigned tokens; + bool dur_specified, tokens_specified; + const char *burst_size = is_bandwidth ? "BURST_BYTES" : "BURST_SIZE"; + int s = 0; + + Args args(conf, elt, errh); + + if (args.read_mp("RATE", r) + .read("SEED", s) + .read(burst_size, tokens).read_status(tokens_specified) + .complete() < 0) + return -1; + + if (s == 0) + _gen = std::mt19937(rand()); + else + _gen = std::mt19937(s); + + _lambda = (double)1.0f * ((double)r); + click_chatter("Lambda %f, cycles %lu",_lambda,cycles_hz()); + _last =0; + _poisson = std::exponential_distribution<> (_lambda); + _inter_arrival_time = 0;// ran_expo(_lambda); + //click_chatter("First packet at %lu",_inter_arrival_time); + return 0; +} + +int +EXPUnqueue::initialize(ErrorHandler *errh) +{ + ScheduleInfo::initialize_task(this, &_task, errh); + _signal = Notifier::upstream_empty_signal(this, 0, &_task); + _timer.initialize(this); + return 0; +} + + + +void EXPUnqueue::refill(uint64_t now) { + + while (_last < now) { + + _last += _inter_arrival_time; + _bucket++; + _inter_arrival_time = ran_expo(_lambda); + //click_chatter("Next in %lu",_inter_arrival_time); + } +} + +bool +EXPUnqueue::run_task(Task *) +{ + bool worked = false; + _runs++; + + if (!_active) + return false; + uint64_t now = click_get_cycles(); + if (_last == 0) + _last = now; + refill(now); + if (_bucket > 0) { +#if HAVE_BATCH + int burst = _bucket; + if (burst > (int)_burst) + burst = _burst; + PacketBatch* batch = input(0).pull_batch(burst); + if (batch) { + int c = batch->count(); + _bucket -= c; + _packets += c; + _pushes++; + worked = true; + output(0).push_batch(batch); + } else { + _failed_pulls++; + if (!_signal) + return false; // without rescheduling + } +#else + if (Packet *p = input(0).pull()) { + _bucket -= 1; + _packets++; + _pushes++; + worked = true; + output(0).push(p); + } else { // no Packet available + _failed_pulls++; + if (!_signal) + return false; // without rescheduling + } +#endif + } else { + uint64_t now = click_get_cycles(); + refill(now); + + _timer.schedule_after(Timestamp::make_nsec((double)(_last + _inter_arrival_time - now) / (cycles_hz() / 1000000000))); + _empty_runs++; + return false; + } + _task.fast_reschedule(); + return worked; +} + +String +EXPUnqueue::read_handler(Element *e, void *thunk) +{ + EXPUnqueue *ru = (EXPUnqueue *)e; + switch ((uintptr_t) thunk) { + case h_rate: + return String(1.0f/ru->_lambda); + case h_calls: { + StringAccum sa; + sa << ru->_runs << " calls to run_task()\n" + << ru->_empty_runs << " empty runs\n" + << ru->_pushes << " pushes\n" + << ru->_failed_pulls << " failed pulls\n" + << ru->_packets << " packets\n"; + return sa.take_string(); + } + } + return String(); +} + +enum {h_active}; +int +EXPUnqueue::write_param(const String &conf, Element *e, void *user_data, + ErrorHandler *errh) +{ + EXPUnqueue *u = static_cast(e); + switch (reinterpret_cast(user_data)) { + case h_active: + click_chatter("Active handler"); + if (!BoolArg().parse(conf, u->_active)) + return errh->error("syntax error"); + if (u->_active && !u->_task.scheduled()) { + + click_chatter("Scheduling"); + u->_task.reschedule(); + } + + break; + + } + return 0; +} + +void +EXPUnqueue::add_handlers() +{ + add_read_handler("calls", read_handler, h_calls); + add_read_handler("rate", read_handler, h_rate); + add_write_handler("rate", reconfigure_keyword_handler, "0 RATE"); + add_data_handlers("active", Handler::OP_READ | Handler::CHECKBOX, &_active); + add_write_handler("active", write_param, h_active); + add_task_handlers(&_task); + add_read_handler("config", read_handler, h_rate); + set_handler_flags("config", 0, Handler::CALM); +} + +CLICK_ENDDECLS +EXPORT_ELEMENT(EXPUnqueue) +ELEMENT_MT_SAFE(EXPUnqueue) diff --git a/elements/standard/expunqueue.hh b/elements/standard/expunqueue.hh new file mode 100644 index 0000000000..f535a6febe --- /dev/null +++ b/elements/standard/expunqueue.hh @@ -0,0 +1,97 @@ +// -*- c-basic-offset: 4 -*- +#ifndef CLICK_EXPUnqueue_HH +#define CLICK_EXPUnqueue_HH +#include +#include +#include +#include +#include +#include + +CLICK_DECLS + +/* + * =c + * EXPUnqueue(RATE, I[]) + * =s shaping + * pull-to-push converter + * =d + * + * Pulls packets at the given RATE in packets per second, and pushes them out + * its single output. It is implemented with a token bucket. The capacity of + * this token bucket defaults to 20 milliseconds worth of tokens, but can be + * customized by setting BURST_DURATION or BURST_SIZE. + * + * Keyword arguments are: + * + * =over 8 + * + * =item RATE + * + * Integer. Token bucket fill rate in packets per second. + * + * =item BURST_DURATION + * + * Time. If specified, the capacity of the token bucket is calculated as + * rate * burst_duration. + * + * =item BURST_SIZE + * + * Integer. If specified, the capacity of the token bucket is set to this + * value. + * + * =h rate read/write + * + * =a BandwidthEXPUnqueue, Unqueue, Shaper, RatedSplitter */ + +class EXPUnqueue : public BatchElement { public: + + EXPUnqueue() CLICK_COLD; + + const char *class_name() const override { return "EXPUnqueue"; } + const char *port_count() const override { return PORTS_1_1; } + const char *processing() const override { return PULL_TO_PUSH; } + bool is_bandwidth() const { return class_name()[0] == 'B'; } + + int configure(Vector &, ErrorHandler *) CLICK_COLD; + int configure_helper(bool is_bandwidth, Element *elt, Vector &conf, ErrorHandler *errh); + enum { tb_bandwidth_thresh = 131072 }; + + bool can_live_reconfigure() const { return true; } + int initialize(ErrorHandler *) CLICK_COLD; + void add_handlers() CLICK_COLD; + + bool run_task(Task *); + + protected: + + double ran_expo(double lambda); + static int write_param(const String &conf, Element *e, void *user_data, + ErrorHandler *errh); + + std::mt19937 _gen; + std::exponential_distribution<> _poisson; + uint64_t _bucket; + Task _task; + Timer _timer; + NotifierSignal _signal; + uint32_t _runs; + uint32_t _packets; + uint32_t _pushes; + uint32_t _failed_pulls; + uint32_t _empty_runs; + uint32_t _burst; + double _lambda; + uint64_t _inter_arrival_time; + uint64_t _last; + void refill(uint64_t now); + + enum { h_calls, h_rate }; + + static String read_handler(Element *e, void *thunk) CLICK_COLD; + + bool _active; +}; + +CLICK_ENDDECLS +#endif