libfilezilla
rate_limiter.hpp
Go to the documentation of this file.
1 #ifndef LIBFILEZILLA_RATE_LIMITER_HEADER
2 #define LIBFILEZILLA_RATE_LIMITER_HEADER
3 
12 #include "event_handler.hpp"
13 
14 #include <atomic>
15 #include <vector>
16 
17 namespace fz {
18 
19 namespace rate {
20 using type = uint64_t;
21 enum : type {
22  unlimited = static_cast<type>(-1)
23 };
24 }
25 
26 namespace direction {
27 enum type : size_t {
28  inbound,
29  outbound
30 };
31 }
32 
33 class rate_limiter;
34 
45 class FZ_PUBLIC_SYMBOL rate_limit_manager final : public event_handler
46 {
47 public:
48  explicit rate_limit_manager(event_loop & loop);
49  virtual ~rate_limit_manager();
50 
57  void add(rate_limiter* limiter);
58 
60  void set_burst_tolerance(rate::type tolerance);
61 
62 private:
63  friend class rate_limiter;
64  friend class bucket_base;
65  friend class bucket;
66 
67  void FZ_PRIVATE_SYMBOL record_activity();
68 
69  void FZ_PRIVATE_SYMBOL operator()(event_base const& ev);
70  void FZ_PRIVATE_SYMBOL on_timer(timer_id const&);
71 
72  void FZ_PRIVATE_SYMBOL process(rate_limiter* limiter, bool locked);
73 
74  std::atomic<int> activity_{2};
75  mutex mtx_{false};
76  std::vector<rate_limiter*> limiters_;
77 
78  std::atomic<timer_id> timer_{};
79 
80  std::atomic<rate::type> burst_tolerance_{1};
81 };
82 
84 class FZ_PUBLIC_SYMBOL bucket_base
85 {
86 public:
87  virtual ~bucket_base() noexcept = default;
88 
94  virtual void remove_bucket();
95 
96 protected:
97  friend class rate_limiter;
98 
104  virtual void lock_tree() { mtx_.lock(); }
105 
111  virtual void update_stats(bool & active) = 0;
112 
118  virtual size_t weight() const { return 1; }
119 
125  virtual size_t unsaturated(direction::type const /*d*/) const { return 0; }
126 
133 
143  virtual rate::type add_tokens(direction::type const /*d*/, rate::type /*tokens*/, rate::type /*limit*/) = 0;
144 
154  virtual rate::type distribute_overflow(direction::type const /*d*/, rate::type /*tokens*/) { return 0; }
155 
159  virtual void unlock_tree() { mtx_.unlock(); }
160 
166  virtual std::array<rate::type, 2> gather_unspent_for_removal() = 0;
167 
168  mutex mtx_{false};
169  rate_limit_manager * mgr_{};
170  void * parent_{};
171  size_t idx_{static_cast<size_t>(-1)};
172 };
173 
184 class FZ_PUBLIC_SYMBOL rate_limiter final : public bucket_base
185 {
186 public:
187  rate_limiter() = default;
188  explicit rate_limiter(rate_limit_manager * mgr);
189  virtual ~rate_limiter();
190 
198 
206  void set_limits(rate::type download_limit, rate::type upload_limit);
207 
209  rate::type limit(direction::type const d);
210 
211 private:
212  friend class bucket_base;
213  friend class rate_limit_manager;
214 
215  virtual void FZ_PRIVATE_SYMBOL lock_tree() override;
216 
217  bool FZ_PRIVATE_SYMBOL do_set_limit(direction::type const d, rate::type limit);
218 
219  virtual void FZ_PRIVATE_SYMBOL update_stats(bool & active) override;
220  virtual size_t FZ_PRIVATE_SYMBOL weight() const override { return weight_; }
221  virtual size_t FZ_PRIVATE_SYMBOL unsaturated(direction::type const d) const override { return data_[d].unused_capacity_ ? data_[d].unsaturated_ : 0; }
222  virtual void FZ_PRIVATE_SYMBOL set_mgr_recursive(rate_limit_manager * mgr) override;
223 
224  virtual rate::type FZ_PRIVATE_SYMBOL add_tokens(direction::type const d, rate::type tokens, rate::type limit) override;
225  virtual rate::type FZ_PRIVATE_SYMBOL distribute_overflow(direction::type const d, rate::type tokens) override;
226 
227  virtual void FZ_PRIVATE_SYMBOL unlock_tree() override;
228 
229  void FZ_PRIVATE_SYMBOL pay_debt(direction::type const d);
230 
231  virtual std::array<rate::type, 2> FZ_PRIVATE_SYMBOL gather_unspent_for_removal() override;
232 
233  std::vector<bucket_base*> buckets_;
234  std::vector<size_t> scratch_buffer_;
235  size_t weight_{};
236 
237  struct FZ_PRIVATE_SYMBOL data_t {
238  rate::type limit_{rate::unlimited};
239  rate::type merged_tokens_{};
240  rate::type overflow_{};
241  rate::type debt_{};
242  rate::type unused_capacity_{};
243  rate::type carry_{};
244  size_t unsaturated_{};
245  };
246  data_t data_[2];
247 };
248 
252 class FZ_PUBLIC_SYMBOL bucket : public bucket_base
253 {
254 public:
255  virtual ~bucket();
256 
257  virtual void remove_bucket() override;
258 
264  rate::type available(direction::type const d);
265 
272  void consume(direction::type const d, rate::type amount);
273 
274 protected:
280  virtual void wakeup(direction::type /*d*/) {}
281 
283  bool waiting(scoped_lock & l, direction::type d);
284 
285 private:
286  virtual void update_stats(bool & active) override;
287  virtual size_t unsaturated(direction::type const d) const override { return data_[d].unsaturated_ ? 1 : 0; }
288 
289  virtual rate::type add_tokens(direction::type const d, rate::type tokens, rate::type limit) override;
290  virtual rate::type distribute_overflow(direction::type const d, rate::type tokens) override;
291 
292  virtual void unlock_tree() override;
293 
294  virtual std::array<rate::type, 2> gather_unspent_for_removal() override;
295 
296  struct data_t {
297  rate::type available_{rate::unlimited};
298  rate::type overflow_multiplier_{1};
299  rate::type bucket_size_{rate::unlimited};
300  bool waiting_{};
301  bool unsaturated_{};
302  } data_[2];
303 };
304 
305 }
306 
307 #endif
Base class for buckets.
Definition: rate_limiter.hpp:85
virtual rate::type add_tokens(direction::type const, rate::type, rate::type)=0
Recursively adds tokens.
virtual size_t weight() const
Returns the weight of the tree.
Definition: rate_limiter.hpp:118
virtual void update_stats(bool &active)=0
Updates weight and usage statistics.
virtual rate::type distribute_overflow(direction::type const, rate::type)
Recursively distributes overflow.
Definition: rate_limiter.hpp:154
virtual std::array< rate::type, 2 > gather_unspent_for_removal()=0
Gather unspent tokens during removal to repay debt.
virtual size_t unsaturated(direction::type const) const
Returns the number of buckets not yet full.
Definition: rate_limiter.hpp:125
virtual void remove_bucket()
virtual void set_mgr_recursive(rate_limit_manager *mgr)
Recursively sets the manager.
virtual void lock_tree()
Recursively locks the mutexes of self and all children.
Definition: rate_limiter.hpp:104
virtual void unlock_tree()
Recursively unlocks the mutexes of self and all children.
Definition: rate_limiter.hpp:159
A rate-limited token bucket.
Definition: rate_limiter.hpp:253
rate::type available(direction::type const d)
Returns available octets.
virtual void remove_bucket() override
void consume(direction::type const d, rate::type amount)
Consumes octets.
virtual void wakeup(direction::type)
Called in response to unlock_tree if tokens have become available.
Definition: rate_limiter.hpp:280
bool waiting(scoped_lock &l, direction::type d)
Call with the bucket_base mutex lock.
Common base class for all events.
Definition: event.hpp:23
Simple handler for asynchronous event processing.
Definition: event_handler.hpp:55
A threaded event loop that supports sending events and timers.
Definition: event_loop.hpp:34
Lean replacement for std::(recursive_)mutex.
Definition: mutex.hpp:52
The process class manages an asynchronous process with redirected IO.
Definition: process.hpp:61
Context for rate_limiters.
Definition: rate_limiter.hpp:46
void add(rate_limiter *limiter)
Adds a limiter to the manager.
void set_burst_tolerance(rate::type tolerance)
Burst tolerance, a multiplier to bucket size, helps achieving the average rate on bursty connections.
A limiter for the attached buckets.
Definition: rate_limiter.hpp:185
void add(bucket_base *bucket)
Adds a bucket to the limiter.
void set_limits(rate::type download_limit, rate::type upload_limit)
Sets the number of octets all buckets combined may consume each second.
rate::type limit(direction::type const d)
Returns current limit.
A simple scoped lock.
Definition: mutex.hpp:93
Declares the event_handler class.
type
Definition: logger.hpp:16
The namespace used by libfilezilla.
Definition: apply.hpp:17