Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
Loading...
Searching...
No Matches
concurrent_priority_queue.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_concurrent_priority_queue_H
18#define __TBB_concurrent_priority_queue_H
19
20#define __TBB_concurrent_priority_queue_H_include_area
22
23#include "atomic.h"
25#include "tbb_exception.h"
26#include "tbb_stddef.h"
27#include "tbb_profiling.h"
31#include <vector>
32#include <iterator>
33#include <functional>
34#include __TBB_STD_SWAP_HEADER
35
36#if __TBB_INITIALIZER_LISTS_PRESENT
37 #include <initializer_list>
38#endif
39
40#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
41 #include <type_traits>
42#endif
43
44namespace tbb {
45namespace interface5 {
46namespace internal {
47#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
48 template<typename T, bool C = std::is_copy_constructible<T>::value>
49 struct use_element_copy_constructor {
51 };
52 template<typename T>
53 struct use_element_copy_constructor <T,false> {
55 };
56#else
57 template<typename>
60 };
61#endif
62} // namespace internal
63
64using namespace tbb::internal;
65
67template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
69 public:
71 typedef T value_type;
72
74 typedef T& reference;
75
77 typedef const T& const_reference;
78
80 typedef size_t size_type;
81
83 typedef ptrdiff_t difference_type;
84
86 typedef A allocator_type;
87
90 {
92 }
93
95 explicit concurrent_priority_queue(const Compare& c, const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(c), data(a)
96 {
98 }
99
101 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
102 mark(0), my_size(0), compare(), data(a)
103 {
104 data.reserve(init_capacity);
106 }
107
109 explicit concurrent_priority_queue(size_type init_capacity, const Compare& c, const allocator_type& a = allocator_type()) :
110 mark(0), my_size(0), compare(c), data(a)
111 {
112 data.reserve(init_capacity);
114 }
115
117 template<typename InputIterator>
118 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
119 mark(0), compare(), data(begin, end, a)
120 {
122 heapify();
123 my_size = data.size();
124 }
125
127 template<typename InputIterator>
128 concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare& c, const allocator_type& a = allocator_type()) :
129 mark(0), compare(c), data(begin, end, a)
130 {
132 heapify();
133 my_size = data.size();
134 }
135
136#if __TBB_INITIALIZER_LISTS_PRESENT
138 concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
139 mark(0), compare(), data(init_list.begin(), init_list.end(), a)
140 {
142 heapify();
143 my_size = data.size();
144 }
145
147 concurrent_priority_queue(std::initializer_list<T> init_list, const Compare& c, const allocator_type &a = allocator_type()) :
148 mark(0), compare(c), data(init_list.begin(), init_list.end(), a)
149 {
151 heapify();
152 my_size = data.size();
153 }
154#endif //# __TBB_INITIALIZER_LISTS_PRESENT
155
157
159 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
160 {
162 heapify();
163 }
164
166
168 my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
169 {
171 heapify();
172 }
173
175
177 if (this != &src) {
178 vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
179 mark = src.mark;
180 my_size = src.my_size;
181 }
182 return *this;
183 }
184
185#if __TBB_CPP11_RVALUE_REF_PRESENT
187
189 my_size(src.my_size), data(std::move(src.data))
190 {
192 }
193
195
197 my_size(src.my_size),
199 data(std::move(src.data), a)
200#else
201 // Some early versions of the C++11 STL vector do not provide a vector(vector&&, allocator) constructor.
202 // The reason appears to be the absence of allocator_traits support (stateful allocators).
203 data(a)
204#endif //__TBB_ALLOCATOR_TRAITS_PRESENT
205 {
207#if !__TBB_ALLOCATOR_TRAITS_PRESENT
208 if (a != src.data.get_allocator()){
209 data.reserve(src.data.size());
210 data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
211 }else{
212 data = std::move(src.data);
213 }
214#endif
215 }
216
218
220 if (this != &src) {
221 mark = src.mark;
222 my_size = src.my_size;
223#if !__TBB_ALLOCATOR_TRAITS_PRESENT
224 if (data.get_allocator() != src.data.get_allocator()){
225 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
226 }else
227#endif
228 {
229 data = std::move(src.data);
230 }
231 }
232 return *this;
233 }
234#endif //__TBB_CPP11_RVALUE_REF_PRESENT
235
237 template<typename InputIterator>
238 void assign(InputIterator begin, InputIterator end) {
239 vector_t(begin, end, data.get_allocator()).swap(data);
240 mark = 0;
241 my_size = data.size();
242 heapify();
243 }
244
245#if __TBB_INITIALIZER_LISTS_PRESENT
// Assigns the queue from a std::initializer_list; not thread-safe.
// Simply forwards the list's [begin(), end()) range to the iterator-range
// assign overload.
247 void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
248
250 concurrent_priority_queue& operator=(std::initializer_list<T> il) {
251 this->assign(il.begin(), il.end());
252 return *this;
253 }
254#endif //# __TBB_INITIALIZER_LISTS_PRESENT
255
257
259 bool empty() const { return size()==0; }
260
262
265
267
269#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
270 __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
271#endif
272 cpq_operation op_data(elem, PUSH_OP);
273 my_aggregator.execute(&op_data);
274 if (op_data.status == FAILED) // exception thrown
276 }
277
278#if __TBB_CPP11_RVALUE_REF_PRESENT
280
281 void push(value_type &&elem) {
282 cpq_operation op_data(elem, PUSH_RVALUE_OP);
283 my_aggregator.execute(&op_data);
284 if (op_data.status == FAILED) // exception thrown
286 }
287
288#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
290
// Constructs a new element using args as the arguments for its construction
// and pushes it onto the queue.
// The element is built as a value_type temporary from the forwarded arguments;
// because that temporary is an rvalue, the call selects push(value_type&&).
291 template<typename... Args>
292 void emplace(Args&&... args) {
293 push(value_type(std::forward<Args>(args)...));
294 }
295#endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
296#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
297
299
302 bool try_pop(reference elem) {
303 cpq_operation op_data(POP_OP);
304 op_data.elem = &elem;
305 my_aggregator.execute(&op_data);
306 return op_data.status==SUCCEEDED;
307 }
308
310
// Clears the queue; not thread-safe.
// Resets all three pieces of state the queue tracks: the element storage, the
// boundary of the heapified prefix, and the reported size.
313 void clear() {
314 data.clear(); // destroy every stored element
315 mark = 0; // no heapified prefix remains
316 my_size = 0; // reported size matches the now-empty storage
317 }
318
320
322 using std::swap;
323 data.swap(q.data);
324 swap(mark, q.mark);
325 swap(my_size, q.my_size);
326 }
327
// Returns the allocator object, obtained from the underlying storage vector (data).
329 allocator_type get_allocator() const { return data.get_allocator(); }
330
331 private:
334
335 class cpq_operation : public aggregated_operation<cpq_operation> {
336 public:
338 union {
341 };
343 type(t), elem(const_cast<value_type*>(&e)) {}
345 };
346
349 public:
// Invoked with a list of pending operations; forwards the whole list to
// handle_operations() on the object cpq points to.
// NOTE(review): cpq is presumably the owning concurrent_priority_queue passed
// to the my_functor_t constructor — confirm against the full class definition.
352 void operator()(cpq_operation* op_list) {
353 cpq->handle_operations(op_list);
354 }
355 };
356
364 Compare compare;
366 char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
368
385 typedef std::vector<value_type, allocator_type> vector_t;
387
389 cpq_operation *tmp, *pop_list=NULL;
390
391 __TBB_ASSERT(mark == data.size(), NULL);
392
393 // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
394 while (op_list) {
395 // ITT note: &(op_list->status) tag is used to cover accesses to op_list
396 // node. This thread is going to handle the operation, and so will acquire it
397 // and perform the associated operation w/o triggering a race condition; the
398 // thread that created the operation is waiting on the status field, so when
399 // this thread is done with the operation, it will perform a
400 // store_with_release to give control back to the waiting thread in
401 // aggregator::insert_operation.
402 call_itt_notify(acquired, &(op_list->status));
403 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
404 tmp = op_list;
405 op_list = itt_hide_load_word(op_list->next);
406 if (tmp->type == POP_OP) {
407 if (mark < data.size() &&
408 compare(data[0], data[data.size()-1])) {
409 // there are newly pushed elems and the last one
410 // is higher than top
411 *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
414 data.pop_back();
415 __TBB_ASSERT(mark<=data.size(), NULL);
416 }
417 else { // no convenient item to pop; postpone
418 itt_hide_store_word(tmp->next, pop_list);
419 pop_list = tmp;
420 }
421 } else { // PUSH_OP or PUSH_RVALUE_OP
422 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
423 __TBB_TRY{
424 if (tmp->type == PUSH_OP) {
426 } else {
427 data.push_back(tbb::internal::move(*(tmp->elem)));
428 }
431 } __TBB_CATCH(...) {
432 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
433 }
434 }
435 }
436
437 // second pass processes pop operations
438 while (pop_list) {
439 tmp = pop_list;
440 pop_list = itt_hide_load_word(pop_list->next);
441 __TBB_ASSERT(tmp->type == POP_OP, NULL);
442 if (data.empty()) {
443 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
444 }
445 else {
446 __TBB_ASSERT(mark<=data.size(), NULL);
447 if (mark < data.size() &&
448 compare(data[0], data[data.size()-1])) {
449 // there are newly pushed elems and the last one is
450 // higher than top
451 *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
454 data.pop_back();
455 }
456 else { // extract top and push last element down heap
457 *(tmp->elem) = tbb::internal::move(data[0]);
460 reheap();
461 }
462 }
463 }
464
465 // heapify any leftover pushed elements before doing the next
466 // batch of operations
467 if (mark<data.size()) heapify();
468 __TBB_ASSERT(mark == data.size(), NULL);
469 }
470
472 void heapify() {
473 if (!mark && data.size()>0) mark = 1;
474 for (; mark<data.size(); ++mark) {
475 // for each unheapified element under size
476 size_type cur_pos = mark;
477 value_type to_place = tbb::internal::move(data[mark]);
478 do { // push to_place up the heap
479 size_type parent = (cur_pos-1)>>1;
480 if (!compare(data[parent], to_place)) break;
481 data[cur_pos] = tbb::internal::move(data[parent]);
482 cur_pos = parent;
483 } while( cur_pos );
484 data[cur_pos] = tbb::internal::move(to_place);
485 }
486 }
487
489
490 void reheap() {
491 size_type cur_pos=0, child=1;
492
493 while (child < mark) {
494 size_type target = child;
495 if (child+1 < mark && compare(data[child], data[child+1]))
496 ++target;
497 // target now has the higher priority child
498 if (compare(data[target], data[data.size()-1])) break;
499 data[cur_pos] = tbb::internal::move(data[target]);
500 cur_pos = target;
501 child = (cur_pos<<1)+1;
502 }
503 if (cur_pos != data.size()-1)
504 data[cur_pos] = tbb::internal::move(data[data.size()-1]);
505 data.pop_back();
506 if (mark > data.size()) mark = data.size();
507 }
508
510 data.push_back(t);
511 }
512
514 __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
515 }
516};
517
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
namespace internal {

// Maps an element type plus the trailing constructor argument types to the
// concurrent_priority_queue specialization that should be deduced:
//  - Compare is the first trailing argument, unless there are no trailing
//    arguments or that argument is an allocator, in which case std::less<T>;
//  - the allocator is the last trailing argument when that argument is an
//    allocator, otherwise cache_aligned_allocator<T>.
template<typename T, typename... Args>
using priority_queue_t = concurrent_priority_queue<
    T,
    std::conditional_t<
        (sizeof...(Args)>0) && !is_allocator_v< pack_element_t<0, Args...> >,
        pack_element_t<0, Args...>,
        std::less<T>
    >,
    std::conditional_t<
        (sizeof...(Args)>0) && is_allocator_v< pack_element_t<sizeof...(Args)-1, Args...> >,
        pack_element_t<sizeof...(Args)-1, Args...>,
        cache_aligned_allocator<T>
    >
>;

} // namespace internal

// Deduction guide for the constructor from two iterators: the element type is
// taken from the iterator's value_type.
template<
    typename InputIterator,
    typename T = typename std::iterator_traits<InputIterator>::value_type,
    typename... Args
>
concurrent_priority_queue(InputIterator, InputIterator, Args...)
    -> internal::priority_queue_t<T, Args...>;

// Deduction guide for the constructor from std::initializer_list followed by
// either a comparator or an allocator.
template<typename T, typename CompareOrAllocator>
concurrent_priority_queue(std::initializer_list<T> init_list, CompareOrAllocator)
    -> internal::priority_queue_t<T, CompareOrAllocator>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
543} // namespace interface5
544
546
547} // namespace tbb
548
550#undef __TBB_concurrent_priority_queue_H_include_area
551
552#endif /* __TBB_concurrent_priority_queue_H */
#define __TBB_CATCH(e)
Definition: tbb_stddef.h:284
#define __TBB_atomic
Definition: tbb_stddef.h:237
#define __TBB_TRY
Definition: tbb_stddef.h:283
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
#define __TBB_ALLOCATOR_TRAITS_PRESENT
Definition: tbb_config.h:335
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Definition: tbb_stddef.h:216
STL namespace.
The graph class.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:319
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:65
T __TBB_load_with_acquire(const volatile T &location)
Definition: tbb_machine.h:709
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void itt_hide_store_word(T &dst, T src)
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
T itt_hide_load_word(const T &src)
void call_itt_notify(notify_type, void *)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare &c, const allocator_type &a=allocator_type())
[begin,end) constructor
concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
concurrent_priority_queue(concurrent_priority_queue &&src, const allocator_type &a)
Move constructor with specific allocator.
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
concurrent_priority_queue & operator=(std::initializer_list< T > il)
Assign from std::initializer_list, not thread-safe.
void clear()
Clear the queue; not thread-safe.
void push_back_helper(const T &, tbb::internal::false_type)
concurrent_priority_queue(const concurrent_priority_queue &src)
Copy constructor.
concurrent_priority_queue(const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with default capacity.
concurrent_priority_queue(size_type init_capacity, const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with an initial capacity of init_capacity.
concurrent_priority_queue & operator=(const concurrent_priority_queue &src)
Assignment operator.
void push(value_type &&elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t
size_type size() const
Returns the current number of elements contained in the queue.
size_t size_type
Integral type for representing size of the queue.
char padding1[NFS_MaxLineSize - sizeof(aggregator_t)]
Padding added to avoid false sharing.
concurrent_priority_queue(size_type init_capacity, const Compare &c, const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with an initial capacity of init_capacity.
bool try_pop(reference elem)
Gets a reference to and removes highest priority element.
concurrent_priority_queue(std::initializer_list< T > init_list, const Compare &c, const allocator_type &a=allocator_type())
Constructor from std::initializer_list.
concurrent_priority_queue(const concurrent_priority_queue &src, const allocator_type &a)
Copy constructor with specific allocator.
void swap(concurrent_priority_queue &q)
Swap this queue with another; not thread-safe.
concurrent_priority_queue(std::initializer_list< T > init_list, const allocator_type &a=allocator_type())
Constructor from std::initializer_list.
concurrent_priority_queue & operator=(concurrent_priority_queue &&src)
Move assignment operator.
void reheap()
Re-heapify after an extraction.
void assign(std::initializer_list< T > il)
Assign the queue from std::initializer_list, not thread-safe.
void heapify()
Merge unsorted elements into heap.
void assign(InputIterator begin, InputIterator end)
Assign the queue from [begin,end) range, not thread-safe.
size_type mark
The point at which unsorted elements begin.
bool empty() const
Returns true if empty, false otherwise.
std::vector< value_type, allocator_type > vector_t
Storage for the heap of elements in queue, plus unheapified elements.
concurrent_priority_queue(const Compare &c, const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with default capacity.
ptrdiff_t difference_type
Difference type for iterator.
concurrent_priority_queue(concurrent_priority_queue &&src)
Move constructor.
void emplace(Args &&... args)
Constructs a new element using args as the arguments for its construction and pushes it onto the queue.
void push_back_helper(const T &t, tbb::internal::true_type)
allocator_type get_allocator() const
Return allocator object.
char padding2[NFS_MaxLineSize -(2 *sizeof(size_type)) - sizeof(Compare)]
Padding added to avoid false sharing.
my_functor_t(concurrent_priority_queue< T, Compare, A > *cpq_)
uintptr_t status
Zero value means "wait" status, all other values are "user" specified values and are defined into the...

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.