Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
tbb::internal::micro_queue Struct Reference

A queue using simple locking. More...

Collaboration diagram for tbb::internal::micro_queue:

Public Types

typedef concurrent_queue_base::page page
 

Public Member Functions

void push (const void *item, ticket k, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
 
void abort_push (ticket k, concurrent_queue_base &base)
 
bool pop (void *dst, ticket k, concurrent_queue_base &base)
 
micro_queue & assign (const micro_queue &src, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
 
page * make_copy (concurrent_queue_base &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, concurrent_queue_base::copy_specifics op_type)
 
void make_invalid (ticket k)
 

Public Attributes

atomic< page * > head_page
 
atomic< ticket > head_counter
 
atomic< page * > tail_page
 
atomic< ticket > tail_counter
 
spin_mutex page_mutex
 

Friends

class micro_queue_pop_finalizer
 

Detailed Description

A queue using simple locking.

For efficiency, this class has no constructor. The caller is expected to zero-initialize it.

Definition at line 47 of file concurrent_queue.cpp.

Member Typedef Documentation

◆ page

Member Function Documentation

◆ abort_push()

void tbb::internal::micro_queue::abort_push ( ticket  k,
concurrent_queue_base & base 
)

Definition at line 226 of file concurrent_queue.cpp.

226 {
227 push(NULL, k, base, concurrent_queue_base::copy);
228}
void push(const void *item, ticket k, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)

References tbb::internal::concurrent_queue_base_v3::copy, and push().

Referenced by tbb::internal::concurrent_queue_base_v3::internal_insert_item().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ assign()

micro_queue & tbb::internal::micro_queue::assign ( const micro_queue & src,
concurrent_queue_base & base,
concurrent_queue_base::copy_specifics  op_type 
)

Definition at line 253 of file concurrent_queue.cpp.

255{
256 head_counter = src.head_counter;
257 tail_counter = src.tail_counter;
258
259 const page* srcp = src.head_page;
260 if( srcp ) {
261 ticket g_index = head_counter;
262 __TBB_TRY {
264 size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep::n_queue, base.items_per_page );
265 size_t end_in_first_page = (index+n_items<base.items_per_page)?(index+n_items):base.items_per_page;
266
267 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, op_type );
268 page* cur_page = head_page;
269
270 if( srcp != src.tail_page ) {
271 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
272 cur_page->next = make_copy( base, srcp, 0, base.items_per_page, g_index, op_type );
273 cur_page = cur_page->next;
274 }
275
276 __TBB_ASSERT( srcp==src.tail_page, NULL );
277
278 size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep::n_queue, base.items_per_page );
279 if( last_index==0 ) last_index = base.items_per_page;
280
281 cur_page->next = make_copy( base, srcp, 0, last_index, g_index, op_type );
282 cur_page = cur_page->next;
283 }
284 tail_page = cur_page;
285 } __TBB_CATCH(...) {
286 make_invalid( g_index );
288 }
289 } else {
290 head_page = tail_page = NULL;
291 }
292 return *this;
293}
#define __TBB_CATCH(e)
Definition: tbb_stddef.h:284
#define __TBB_TRY
Definition: tbb_stddef.h:283
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_RETHROW()
Definition: tbb_stddef.h:286
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
Definition: tbb_stddef.h:382
page * make_copy(concurrent_queue_base &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, concurrent_queue_base::copy_specifics op_type)
concurrent_queue_base::page page
static const size_t n_queue
Must be power of 2.

References __TBB_ASSERT, __TBB_CATCH, __TBB_RETHROW, __TBB_TRY, head_counter, head_page, tbb::internal::concurrent_queue_base_v3::items_per_page, make_copy(), make_invalid(), tbb::internal::modulo_power_of_two(), tbb::internal::concurrent_queue_rep::n_queue, tbb::internal::concurrent_queue_base_v3::page::next, tail_counter, and tail_page.

Referenced by tbb::internal::concurrent_queue_base_v3::internal_assign().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ make_copy()

concurrent_queue_base::page * tbb::internal::micro_queue::make_copy ( concurrent_queue_base & base,
const page * src_page,
size_t  begin_in_page,
size_t  end_in_page,
ticket & g_index,
concurrent_queue_base::copy_specifics  op_type 
)

Definition at line 295 of file concurrent_queue.cpp.

298{
299 page* new_page = base.allocate_page();
300 new_page->next = NULL;
301 new_page->mask = src_page->mask;
302 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
303 if( new_page->mask & uintptr_t(1)<<begin_in_page ) {
304 if( concurrent_queue_base::copy == op_type ) {
305 base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
306 } else {
307 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
308 static_cast<concurrent_queue_base_v8&>(base).move_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
309 }
310 }
311 return new_page;
312}

References __TBB_ASSERT, tbb::internal::concurrent_queue_base_v3::allocate_page(), tbb::internal::concurrent_queue_base_v3::copy, tbb::internal::concurrent_queue_base_v3::copy_page_item(), tbb::internal::concurrent_queue_base_v3::page::mask, tbb::internal::concurrent_queue_base_v3::move, and tbb::internal::concurrent_queue_base_v3::page::next.

Referenced by assign().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ make_invalid()

void tbb::internal::micro_queue::make_invalid ( ticket  k)

Definition at line 314 of file concurrent_queue.cpp.

315{
316 static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
317 // mark it so that no more pushes are allowed.
318 static_invalid_page = &dummy;
319 {
322 if( page* q = tail_page )
323 q->next = static_cast<page*>(static_invalid_page);
324 else
325 head_page = static_cast<page*>(static_invalid_page);
326 tail_page = static_cast<page*>(static_invalid_page);
327 }
328}
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
static void * static_invalid_page
friend class scoped_lock
Definition: spin_mutex.h:179

References head_page, lock, tbb::internal::concurrent_queue_rep::n_queue, page_mutex, tbb::internal::static_invalid_page, tail_counter, and tail_page.

Referenced by assign(), and push().

Here is the caller graph for this function:

◆ pop()

bool tbb::internal::micro_queue::pop ( void * dst,
ticket  k,
concurrent_queue_base & base 
)

Definition at line 230 of file concurrent_queue.cpp.

230 {
234 page *p = head_page;
235 __TBB_ASSERT( p, NULL );
236 size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
237 bool success = false;
238 {
239 micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
240 if( p->mask & uintptr_t(1)<<index ) {
241 success = true;
242 ITT_NOTIFY( sync_acquired, dst );
243 ITT_NOTIFY( sync_acquired, head_page );
244 base.assign_and_destroy_item( dst, *p, index );
246 } else {
247 --base.my_rep->n_invalid_entries;
248 }
249 }
250 return success;
251}
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
void const char const char int ITT_FORMAT __itt_group_sync p
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:112
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:399
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391

References __TBB_ASSERT, tbb::internal::concurrent_queue_base_v3::assign_and_destroy_item(), head_counter, head_page, tbb::internal::concurrent_queue_base_v3::items_per_page, ITT_NOTIFY, tbb::internal::modulo_power_of_two(), tbb::internal::concurrent_queue_base_v3::my_rep, tbb::internal::concurrent_queue_rep::n_invalid_entries, tbb::internal::concurrent_queue_rep::n_queue, p, tbb::internal::spin_wait_until_eq(), tbb::internal::spin_wait_while_eq(), sync_releasing, and tail_counter.

Referenced by tbb::internal::concurrent_queue_base_v3::internal_pop(), and tbb::internal::concurrent_queue_base_v3::internal_pop_if_present().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ push()

void tbb::internal::micro_queue::push ( const void * item,
ticket  k,
concurrent_queue_base & base,
concurrent_queue_base::copy_specifics  op_type 
)

Definition at line 161 of file concurrent_queue.cpp.

162 {
164 page* p = NULL;
165 // find index on page where we would put the data
166 size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
167 if( !index ) { // make a new page
168 __TBB_TRY {
169 p = base.allocate_page();
170 } __TBB_CATCH(...) {
171 ++base.my_rep->n_invalid_entries;
172 make_invalid( k );
174 }
175 p->mask = 0;
176 p->next = NULL;
177 }
178
179 // wait for my turn
180 if( tail_counter!=k ) // The developer insisted on keeping first check out of the backoff loop
181 for( atomic_backoff b(true);;b.pause() ) {
183 if( tail==k ) break;
184 else if( tail&0x1 ) {
185 // no memory. throws an exception; assumes concurrent_queue_rep::n_queue>1
186 ++base.my_rep->n_invalid_entries;
188 }
189 }
190
191 if( p ) { // page is newly allocated; insert in micro_queue
193 if( page* q = tail_page )
194 q->next = p;
195 else
196 head_page = p;
197 tail_page = p;
198 }
199
200 if (item) {
201 p = tail_page;
202 ITT_NOTIFY( sync_acquired, p );
203 __TBB_TRY {
204 if( concurrent_queue_base::copy == op_type ) {
205 base.copy_item( *p, index, item );
206 } else {
207 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
208 static_cast<concurrent_queue_base_v8&>(base).move_item( *p, index, item );
209 }
210 } __TBB_CATCH(...) {
211 ++base.my_rep->n_invalid_entries;
214 }
216 // If no exception was thrown, mark item as present.
217 p->mask |= uintptr_t(1)<<index;
218 }
219 else // no item; this was called from abort_push
220 ++base.my_rep->n_invalid_entries;
221
223}
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id tail
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()

References __TBB_ASSERT, __TBB_CATCH, __TBB_RETHROW, __TBB_TRY, tbb::internal::concurrent_queue_base_v3::allocate_page(), tbb::internal::concurrent_queue_base_v3::copy, tbb::internal::concurrent_queue_base_v3::copy_item(), tbb::internal::eid_bad_last_alloc, head_page, tbb::internal::concurrent_queue_base_v3::items_per_page, ITT_NOTIFY, lock, make_invalid(), tbb::internal::modulo_power_of_two(), tbb::internal::concurrent_queue_base_v3::move, tbb::internal::concurrent_queue_base_v3::my_rep, tbb::internal::concurrent_queue_rep::n_invalid_entries, tbb::internal::concurrent_queue_rep::n_queue, p, page_mutex, tbb::internal::atomic_backoff::pause(), sync_releasing, tail, tail_counter, tail_page, and tbb::internal::throw_exception().

Referenced by abort_push(), tbb::internal::concurrent_queue_base_v3::internal_insert_if_not_full(), and tbb::internal::concurrent_queue_base_v3::internal_insert_item().

Here is the call graph for this function:
Here is the caller graph for this function:

Friends And Related Function Documentation

◆ micro_queue_pop_finalizer

friend class micro_queue_pop_finalizer
friend

Definition at line 50 of file concurrent_queue.cpp.

Member Data Documentation

◆ head_counter

atomic<ticket> tbb::internal::micro_queue::head_counter

◆ head_page

◆ page_mutex

spin_mutex tbb::internal::micro_queue::page_mutex

◆ tail_counter

atomic<ticket> tbb::internal::micro_queue::tail_counter

Definition at line 56 of file concurrent_queue.cpp.

Referenced by assign(), make_invalid(), pop(), and push().

◆ tail_page


The documentation for this struct was generated from the following file: concurrent_queue.cpp

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.