readerwriterqueue.h
// ©2013-2016 Cameron Desrochers.
// Distributed under the simplified BSD license (see the license file that
// should have come with this header).

#pragma once

#include "atomicops.h"
#include <type_traits>
#include <utility>
#include <cassert>
#include <stdexcept>
#include <cstdint>
#include <cstdlib>	// For malloc/free/abort & size_t


// A lock-free queue for a single-consumer, single-producer architecture.
// The queue is also wait-free in the common path (except if more memory
// needs to be allocated, in which case malloc is called).
// Allocates memory sparingly (O(lg(n)) times, amortized), and only once if
// the original maximum size estimate is never exceeded.
// Tested on x86/x64 processors, but semantics should be correct for all
// architectures (given the right implementations in atomicops.h), provided
// that aligned integer and pointer accesses are naturally atomic.
// Note that there should only be one consumer thread and one producer thread;
// switching the roles of the threads, or using multiple consecutive threads for
// one role, is not safe unless properly synchronized.
// Using the queue exclusively from one thread is fine, though a bit silly.
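//
// A minimal usage sketch (illustrative only; the surrounding thread code is
// assumed, not part of this header):
//
//     moodycamel::ReaderWriterQueue<int> q(100);  // room for at least 100 ints
//
//     // Producer thread:
//     q.enqueue(17);       // allocates another block if the queue is full
//     q.try_enqueue(18);   // returns false instead of allocating
//
//     // Consumer thread:
//     int item;
//     if (q.try_dequeue(item)) {
//         // use item
//     }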

#ifndef MOODYCAMEL_CACHE_LINE_SIZE
#define MOODYCAMEL_CACHE_LINE_SIZE 64
#endif

#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif

#ifdef AE_VCPP
#pragma warning(push)
#pragma warning(disable: 4324)	// structure was padded due to __declspec(align())
#pragma warning(disable: 4820)	// padding was added
#pragma warning(disable: 4127)	// conditional expression is constant
#endif

namespace moodycamel {

template<typename T, size_t MAX_BLOCK_SIZE = 512>
class ReaderWriterQueue
{
	// Design: Based on a queue-of-queues. The low-level queues are just
	// circular buffers with front and tail indices indicating where the
	// next element to dequeue is and where the next element can be enqueued,
	// respectively. Each low-level queue is called a "block". Each block
	// wastes exactly one element's worth of space to keep the design simple
	// (if front == tail then the queue is empty, and can't be full).
	// The high-level queue is a circular linked list of blocks; again there
	// is a front and tail, but this time they are pointers to the blocks.
	// The front block is where the next element to be dequeued is, provided
	// the block is not empty. The back block is where elements are to be
	// enqueued, provided the block is not full.
	// The producer thread owns all the tail indices/pointers. The consumer
	// thread owns all the front indices/pointers. Both threads read each
	// other's variables, but only the owning thread updates them. E.g., after
	// the consumer reads the producer's tail, the tail may change before the
	// consumer is done dequeuing an object, but the consumer knows the tail
	// will never go backwards, only forwards.
	// If there is no room to enqueue an object, an additional block (of
	// equal size to the last block) is added. Blocks are never removed.
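	//
	// A rough sketch of that structure (illustrative only; '#' marks occupied
	// slots, '.' marks free ones):
	//
	//	  frontBlock                           tailBlock
	//	      v                                    v
	//	 +--------+  next   +--------+  next  +--------+  next
	//	 | ..#### | ------> | .###### | -----> | ##.... | ------> (wraps back
	//	 +--------+         +--------+         +--------+          to frontBlock)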

public:
	// Constructs a queue that can hold maxSize elements without further
	// allocations. If more than MAX_BLOCK_SIZE elements are requested,
	// then several blocks of MAX_BLOCK_SIZE each are reserved (including
	// at least one extra buffer block).
	explicit ReaderWriterQueue(size_t maxSize = 15)
#ifndef NDEBUG
		: enqueuing(false)
		, dequeuing(false)
#endif
	{
		assert(maxSize > 0);
		assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
		assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");

		Block* firstBlock = nullptr;

		largestBlockSize = ceilToPow2(maxSize + 1);	// We need a spare slot to fit maxSize elements in the block
		if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
			// We need a spare block in case the producer is writing to a different block
			// than the one the consumer is reading from, and wants to enqueue the maximum
			// number of elements. We also need a spare element in each block to avoid the
			// ambiguity between front == tail meaning "empty" and "full".
			// So the effective number of slots that are guaranteed to be usable at any time
			// is (block size - 1) * (number of blocks - 1). Solving for maxSize and applying
			// a ceiling to the division gives us (after simplifying):
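			// For example (a sketch, with the default MAX_BLOCK_SIZE of 512): requesting
			// maxSize = 2000 gives (2000 + 1024 - 3) / 511 = 5 blocks, of which
			// (5 - 1) * (512 - 1) = 2044 slots are guaranteed usable, covering the 2000.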
			size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
			largestBlockSize = MAX_BLOCK_SIZE;
			Block* lastBlock = nullptr;
			for (size_t i = 0; i != initialBlockCount; ++i) {
				auto block = make_block(largestBlockSize);
				if (block == nullptr) {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
					throw std::bad_alloc();
#else
					abort();
#endif
				}
				if (firstBlock == nullptr) {
					firstBlock = block;
				}
				else {
					lastBlock->next = block;
				}
				lastBlock = block;
				block->next = firstBlock;
			}
		}
		else {
			firstBlock = make_block(largestBlockSize);
			if (firstBlock == nullptr) {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
				throw std::bad_alloc();
#else
				abort();
#endif
			}
			firstBlock->next = firstBlock;
		}
		frontBlock = firstBlock;
		tailBlock = firstBlock;

		// Make sure the reader/writer threads will see the memory initialized above:
		fence(memory_order_sync);
	}

	// Note: The queue should not be accessed concurrently while it's
	// being deleted. It's up to the user to synchronize this.
	~ReaderWriterQueue()
	{
		// Make sure we get the latest version of all variables from other CPUs:
		fence(memory_order_sync);

		// Destroy any remaining objects in queue and free memory
		Block* frontBlock_ = frontBlock;
		Block* block = frontBlock_;
		do {
			Block* nextBlock = block->next;
			size_t blockFront = block->front;
			size_t blockTail = block->tail;

			for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
				auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
				element->~T();
				(void)element;
			}

			auto rawBlock = block->rawThis;
			block->~Block();
			std::free(rawBlock);
			block = nextBlock;
		} while (block != frontBlock_);
	}


	// Enqueues a copy of element if there is room in the queue.
	// Returns true if the element was enqueued, false otherwise.
	// Does not allocate memory.
	AE_FORCEINLINE bool try_enqueue(T const& element)
	{
		return inner_enqueue<CannotAlloc>(element);
	}

	// Enqueues a moved copy of element if there is room in the queue.
	// Returns true if the element was enqueued, false otherwise.
	// Does not allocate memory.
	AE_FORCEINLINE bool try_enqueue(T&& element)
	{
		return inner_enqueue<CannotAlloc>(std::forward<T>(element));
	}


	// Enqueues a copy of element on the queue.
	// Allocates an additional block of memory if needed.
	// Only fails (returns false) if memory allocation fails.
	AE_FORCEINLINE bool enqueue(T const& element)
	{
		return inner_enqueue<CanAlloc>(element);
	}

	// Enqueues a moved copy of element on the queue.
	// Allocates an additional block of memory if needed.
	// Only fails (returns false) if memory allocation fails.
	AE_FORCEINLINE bool enqueue(T&& element)
	{
		return inner_enqueue<CanAlloc>(std::forward<T>(element));
	}


	// Attempts to dequeue an element; if the queue is empty,
	// returns false instead. If the queue has at least one element,
	// moves the front element into result using operator=, then returns true.
	template<typename U>
	bool try_dequeue(U& result)
	{
#ifndef NDEBUG
		ReentrantGuard guard(this->dequeuing);
#endif

		// High-level pseudocode:
		// Remember where the tail block is
		// If the front block has an element in it, dequeue it
		// Else
		//     If front block was the tail block when we entered the function, return false
		//     Else advance to next block and dequeue the item there

		// Note that we have to use the value of the tail block from before we check if the front
		// block is full or not, in case the front block is empty and then, before we check if the
		// tail block is at the front block or not, the producer fills up the front block *and
		// moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
		// reproducible in practice.
		// In order to avoid overhead in the common case, though, we do a double-checked pattern
		// where we have the fast path if the front block is not empty, then read the tail block,
		// then re-read the front block and check if it's not empty again, then check if the tail
		// block has advanced.

		Block* frontBlock_ = frontBlock.load();
		size_t blockTail = frontBlock_->localTail;
		size_t blockFront = frontBlock_->front.load();

		if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
			fence(memory_order_acquire);

		non_empty_front_block:
			// Front block not empty, dequeue from here
			auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
			result = std::move(*element);
			element->~T();

			blockFront = (blockFront + 1) & frontBlock_->sizeMask;

			fence(memory_order_release);
			frontBlock_->front = blockFront;
		}
		else if (frontBlock_ != tailBlock.load()) {
			fence(memory_order_acquire);

			frontBlock_ = frontBlock.load();
			blockTail = frontBlock_->localTail = frontBlock_->tail.load();
			blockFront = frontBlock_->front.load();
			fence(memory_order_acquire);

			if (blockFront != blockTail) {
				// Oh look, the front block isn't empty after all
				goto non_empty_front_block;
			}

			// Front block is empty but there's another block ahead, advance to it
			Block* nextBlock = frontBlock_->next;
			// Don't need an acquire fence here since next can only ever be set on the tailBlock,
			// and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
			// ensures next is up-to-date on this CPU in case we recently were at tailBlock.

			size_t nextBlockFront = nextBlock->front.load();
			size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
			fence(memory_order_acquire);

			// Since the tailBlock is only ever advanced after being written to,
			// we know there's for sure an element to dequeue on it
			assert(nextBlockFront != nextBlockTail);
			AE_UNUSED(nextBlockTail);

			// We're done with this block, let the producer use it if it needs
			fence(memory_order_release);	// Expose possibly pending changes to frontBlock->front from last dequeue
			frontBlock = frontBlock_ = nextBlock;

			compiler_fence(memory_order_release);	// Not strictly needed

			auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));

			result = std::move(*element);
			element->~T();

			nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

			fence(memory_order_release);
			frontBlock_->front = nextBlockFront;
		}
		else {
			// No elements in current block and no other block to advance to
			return false;
		}

		return true;
	}


	// Returns a pointer to the front element in the queue (the one that
	// would be removed next by a call to `try_dequeue` or `pop`). If the
	// queue appears empty at the time the method is called, nullptr is
	// returned instead.
	// Must be called only from the consumer thread.
	T* peek()
	{
#ifndef NDEBUG
		ReentrantGuard guard(this->dequeuing);
#endif
		// See try_dequeue() for reasoning

		Block* frontBlock_ = frontBlock.load();
		size_t blockTail = frontBlock_->localTail;
		size_t blockFront = frontBlock_->front.load();

		if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
			fence(memory_order_acquire);
		non_empty_front_block:
			return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
		}
		else if (frontBlock_ != tailBlock.load()) {
			fence(memory_order_acquire);
			frontBlock_ = frontBlock.load();
			blockTail = frontBlock_->localTail = frontBlock_->tail.load();
			blockFront = frontBlock_->front.load();
			fence(memory_order_acquire);

			if (blockFront != blockTail) {
				goto non_empty_front_block;
			}

			Block* nextBlock = frontBlock_->next;

			size_t nextBlockFront = nextBlock->front.load();
			fence(memory_order_acquire);

			assert(nextBlockFront != nextBlock->tail.load());
			return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
		}

		return nullptr;
	}

	// Removes the front element from the queue, if any, without returning it.
	// Returns true on success, or false if the queue appeared empty at the time
	// `pop` was called.
	bool pop()
	{
#ifndef NDEBUG
		ReentrantGuard guard(this->dequeuing);
#endif
		// See try_dequeue() for reasoning

		Block* frontBlock_ = frontBlock.load();
		size_t blockTail = frontBlock_->localTail;
		size_t blockFront = frontBlock_->front.load();

		if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
			fence(memory_order_acquire);

		non_empty_front_block:
			auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
			element->~T();

			blockFront = (blockFront + 1) & frontBlock_->sizeMask;

			fence(memory_order_release);
			frontBlock_->front = blockFront;
		}
		else if (frontBlock_ != tailBlock.load()) {
			fence(memory_order_acquire);
			frontBlock_ = frontBlock.load();
			blockTail = frontBlock_->localTail = frontBlock_->tail.load();
			blockFront = frontBlock_->front.load();
			fence(memory_order_acquire);

			if (blockFront != blockTail) {
				goto non_empty_front_block;
			}

			// Front block is empty but there's another block ahead, advance to it
			Block* nextBlock = frontBlock_->next;

			size_t nextBlockFront = nextBlock->front.load();
			size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
			fence(memory_order_acquire);

			assert(nextBlockFront != nextBlockTail);
			AE_UNUSED(nextBlockTail);

			fence(memory_order_release);
			frontBlock = frontBlock_ = nextBlock;

			compiler_fence(memory_order_release);

			auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
			element->~T();

			nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

			fence(memory_order_release);
			frontBlock_->front = nextBlockFront;
		}
		else {
			// No elements in current block and no other block to advance to
			return false;
		}

		return true;
	}

	// Returns the approximate number of items currently in the queue.
	// Safe to call from both the producer and consumer threads.
	inline size_t size_approx() const
	{
		size_t result = 0;
		Block* frontBlock_ = frontBlock.load();
		Block* block = frontBlock_;
		do {
			fence(memory_order_acquire);
			size_t blockFront = block->front.load();
			size_t blockTail = block->tail.load();
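			// Masking handles wrap-around: blockTail may be numerically smaller
			// than blockFront, but the difference mod the block size is the count.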
			result += (blockTail - blockFront) & block->sizeMask;
			block = block->next.load();
		} while (block != frontBlock_);
		return result;
	}


private:
	enum AllocationMode { CanAlloc, CannotAlloc };

	template<AllocationMode canAlloc, typename U>
	bool inner_enqueue(U&& element)
	{
#ifndef NDEBUG
		ReentrantGuard guard(this->enqueuing);
#endif

		// High-level pseudocode (assuming we're allowed to alloc a new block):
		// If room in tail block, add to tail
		// Else check next block
		//     If next block is not the head block, enqueue on next block
		//     Else create a new block and enqueue there
		//     Advance tail to the block we just enqueued to

		Block* tailBlock_ = tailBlock.load();
		size_t blockFront = tailBlock_->localFront;
		size_t blockTail = tailBlock_->tail.load();

		size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
		if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
			fence(memory_order_acquire);
			// This block has room for at least one more element
			char* location = tailBlock_->data + blockTail * sizeof(T);
			new (location) T(std::forward<U>(element));

			fence(memory_order_release);
			tailBlock_->tail = nextBlockTail;
		}
		else {
			fence(memory_order_acquire);
			if (tailBlock_->next.load() != frontBlock) {
				// Note that the reason we can't advance to the frontBlock and start adding new entries there
				// is because if we did, then dequeue would stay in that block, eventually reading the new values,
				// instead of advancing to the next full block (whose values were enqueued first and so should be
				// consumed first).

				fence(memory_order_acquire);	// Ensure we get latest writes if we got the latest frontBlock

				// tailBlock is full, but there's a free block ahead, use it
				Block* tailBlockNext = tailBlock_->next.load();
				size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
				nextBlockTail = tailBlockNext->tail.load();
				fence(memory_order_acquire);

				// This block must be empty since it's not the head block and we
				// go through the blocks in a circle
				assert(nextBlockFront == nextBlockTail);
				tailBlockNext->localFront = nextBlockFront;

				char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
				new (location) T(std::forward<U>(element));

				tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;

				fence(memory_order_release);
				tailBlock = tailBlockNext;
			}
			else if (canAlloc == CanAlloc) {
				// tailBlock is full and there's no free block ahead; create a new block
				auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
				auto newBlock = make_block(newBlockSize);
				if (newBlock == nullptr) {
					// Could not allocate a block!
					return false;
				}
				largestBlockSize = newBlockSize;

				new (newBlock->data) T(std::forward<U>(element));

				assert(newBlock->front == 0);
				newBlock->tail = newBlock->localTail = 1;

				newBlock->next = tailBlock_->next.load();
				tailBlock_->next = newBlock;

				// Might be possible for the dequeue thread to see the new tailBlock->next
				// *without* seeing the new tailBlock value, but this is OK since it can't
				// advance to the next block until tailBlock is set anyway (because the only
				// case where it could try to read the next is if it's already at the tailBlock,
				// and it won't advance past tailBlock in any circumstance).

				fence(memory_order_release);
				tailBlock = newBlock;
			}
			else if (canAlloc == CannotAlloc) {
				// Would have had to allocate a new block to enqueue, but not allowed
				return false;
			}
			else {
				assert(false && "Should be unreachable code");
				return false;
			}
		}

		return true;
	}


	// Disable copying
	ReaderWriterQueue(ReaderWriterQueue const&) { }

	// Disable assignment
	ReaderWriterQueue& operator=(ReaderWriterQueue const&) { }


	AE_FORCEINLINE static size_t ceilToPow2(size_t x)
	{
		// From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
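		// e.g. ceilToPow2(15) == 16, ceilToPow2(16) == 16, ceilToPow2(17) == 32.
		// The loop below extends the bit-smearing to however many bytes size_t has.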
		--x;
		x |= x >> 1;
		x |= x >> 2;
		x |= x >> 4;
		for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
			x |= x >> (i << 3);
		}
		++x;
		return x;
	}

	template<typename U>
	static AE_FORCEINLINE char* align_for(char* ptr)
	{
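		// Rounds ptr up to the next multiple of U's alignment (e.g. with an
		// alignment of 8 and ptr % 8 == 3, the offset added is 5; it works
		// out to 0 when ptr is already aligned).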
		const std::size_t alignment = std::alignment_of<U>::value;
		return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
	}
private:
#ifndef NDEBUG
	struct ReentrantGuard
	{
		ReentrantGuard(bool& _inSection)
			: inSection(_inSection)
		{
			assert(!inSection && "ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
			inSection = true;
		}

		~ReentrantGuard() { inSection = false; }

	private:
		ReentrantGuard& operator=(ReentrantGuard const&);

	private:
		bool& inSection;
	};
#endif

	struct Block
	{
		// Avoid false-sharing by putting highly contended variables on their own cache lines
		weak_atomic<size_t> front;	// (Atomic) Elements are read from here
		size_t localTail;			// An uncontended shadow copy of tail, owned by the consumer

		char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
		weak_atomic<size_t> tail;	// (Atomic) Elements are enqueued here
		size_t localFront;			// An uncontended shadow copy of front, owned by the producer

		char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];	// next isn't very contended, but we don't want it on the same cache line as tail (which is)
		weak_atomic<Block*> next;	// (Atomic)

		char* data;		// Contents (on heap) are aligned to T's alignment

		const size_t sizeMask;


		// size must be a power of two (and greater than 0)
		Block(size_t const& _size, char* _rawThis, char* _data)
			: front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
		{
		}

	private:
		// C4512 - Assignment operator could not be generated
		Block& operator=(Block const&);

	public:
		char* rawThis;
	};


	static Block* make_block(size_t capacity)
	{
		// Allocate enough memory for the block itself, as well as all the elements it will contain
		auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
		size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
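		// Layout of the raw allocation: [alignment slack][Block header][alignment slack][capacity * T].
		// rawThis keeps the original pointer so the destructor can hand it back to std::free.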
		auto newBlockRaw = static_cast<char*>(std::malloc(size));
		if (newBlockRaw == nullptr) {
			return nullptr;
		}

		auto newBlockAligned = align_for<Block>(newBlockRaw);
		auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
		return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
	}

private:
	weak_atomic<Block*> frontBlock;	// (Atomic) Elements are dequeued from this block

	char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
	weak_atomic<Block*> tailBlock;	// (Atomic) Elements are enqueued to this block

	size_t largestBlockSize;

#ifndef NDEBUG
	bool enqueuing;
	bool dequeuing;
#endif
};

// Like ReaderWriterQueue, but also provides blocking operations
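//
// A minimal consumer-loop sketch (illustrative only; Task and the shutdown
// condition are hypothetical):
//
//     moodycamel::BlockingReaderWriterQueue<Task> q;
//
//     // Consumer thread:
//     Task t;
//     for (;;) {
//         q.wait_dequeue(t);   // blocks until the producer signals an element
//         // process t; break on a sentinel value to shut down
//     }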
template<typename T, size_t MAX_BLOCK_SIZE = 512>
class BlockingReaderWriterQueue
{
private:
	typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;

public:
	explicit BlockingReaderWriterQueue(size_t maxSize = 15)
		: inner(maxSize)
	{ }


	// Enqueues a copy of element if there is room in the queue.
	// Returns true if the element was enqueued, false otherwise.
	// Does not allocate memory.
	AE_FORCEINLINE bool try_enqueue(T const& element)
	{
		if (inner.try_enqueue(element)) {
			sema.signal();
			return true;
		}
		return false;
	}

	// Enqueues a moved copy of element if there is room in the queue.
	// Returns true if the element was enqueued, false otherwise.
	// Does not allocate memory.
	AE_FORCEINLINE bool try_enqueue(T&& element)
	{
		if (inner.try_enqueue(std::forward<T>(element))) {
			sema.signal();
			return true;
		}
		return false;
	}


	// Enqueues a copy of element on the queue.
	// Allocates an additional block of memory if needed.
	// Only fails (returns false) if memory allocation fails.
	AE_FORCEINLINE bool enqueue(T const& element)
	{
		if (inner.enqueue(element)) {
			sema.signal();
			return true;
		}
		return false;
	}

	// Enqueues a moved copy of element on the queue.
	// Allocates an additional block of memory if needed.
	// Only fails (returns false) if memory allocation fails.
	AE_FORCEINLINE bool enqueue(T&& element)
	{
		if (inner.enqueue(std::forward<T>(element))) {
			sema.signal();
			return true;
		}
		return false;
	}


	// Attempts to dequeue an element; if the queue is empty,
	// returns false instead. If the queue has at least one element,
	// moves the front element into result using operator=, then returns true.
	template<typename U>
	bool try_dequeue(U& result)
	{
		if (sema.tryWait()) {
			bool success = inner.try_dequeue(result);
			assert(success);
			AE_UNUSED(success);
			return true;
		}
		return false;
	}


	// Attempts to dequeue an element; if the queue is empty,
	// waits until an element is available, then dequeues it.
	template<typename U>
	void wait_dequeue(U& result)
	{
		sema.wait();
		bool success = inner.try_dequeue(result);
		AE_UNUSED(result);
		assert(success);
		AE_UNUSED(success);
	}


	// Returns a pointer to the front element in the queue (the one that
	// would be removed next by a call to `try_dequeue` or `pop`). If the
	// queue appears empty at the time the method is called, nullptr is
	// returned instead.
	// Must be called only from the consumer thread.
	AE_FORCEINLINE T* peek()
	{
		return inner.peek();
	}

	// Removes the front element from the queue, if any, without returning it.
	// Returns true on success, or false if the queue appeared empty at the time
	// `pop` was called.
	AE_FORCEINLINE bool pop()
	{
		if (sema.tryWait()) {
			bool result = inner.pop();
			assert(result);
			AE_UNUSED(result);
			return true;
		}
		return false;
	}

	// Returns the approximate number of items currently in the queue.
	// Safe to call from both the producer and consumer threads.
	AE_FORCEINLINE size_t size_approx() const
	{
		return sema.availableApprox();
	}


private:
	// Disable copying & assignment
	BlockingReaderWriterQueue(BlockingReaderWriterQueue const&) { }
	BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue const&) { }

private:
	ReaderWriterQueue inner;
	spsc_sema::LightweightSemaphore sema;
};

}	// end namespace moodycamel

#ifdef AE_VCPP
#pragma warning(pop)
#endif