Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
Loading...
Searching...
No Matches
_flow_graph_cache_impl.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB__flow_graph_cache_impl_H
18#define __TBB__flow_graph_cache_impl_H
19
20#ifndef __TBB_flow_graph_H
21#error Do not #include this internal file directly; use public TBB headers instead.
22#endif
23
24// included in namespace tbb::flow::interfaceX (in flow_graph.h)
25
26namespace internal {
27
29template< typename T, typename M=spin_mutex >
31 public:
32
33 typedef size_t size_type;
34
// Returns whether the queue is empty; my_mutex is held while the check runs.
35 bool empty() {
36 typename mutex_type::scoped_lock lock( my_mutex );
37 return internal_empty();
38 }
39
// Acquires my_mutex for the duration of the call.
// NOTE(review): listing line 42 (the statement executed under the lock) is absent
// from this extract — presumably it enqueues n; confirm against the original header.
40 void add( T &n ) {
41 typename mutex_type::scoped_lock lock( my_mutex );
43 }
44
// Under my_mutex, pops at most internal_size() elements (the count is sampled once,
// before the loop) and returns as soon as a popped element has the same address as n.
// NOTE(review): listing line 50 is absent from this extract — presumably it pushes the
// non-matching element back so the queue is rotated; confirm against the original header.
45 void remove( T &n ) {
46 typename mutex_type::scoped_lock lock( my_mutex );
47 for ( size_t i = internal_size(); i != 0; --i ) {
48 T &s = internal_pop();
49 if ( &s == &n ) return; // only remove one predecessor per request
51 }
52 }
53
// Pops every element of my_q; when TBB_DEPRECATED_FLOW_NODE_EXTRACTION is enabled it
// also clears my_built_predecessors. Unlike empty()/remove(), no lock is taken here.
54 void clear() {
55 while( !my_q.empty()) (void)my_q.pop();
56#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
57 my_built_predecessors.clear();
58#endif
59 }
60
61#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
62 typedef edge_container<T> built_predecessors_type;
63 built_predecessors_type &built_predecessors() { return my_built_predecessors; }
64
65 typedef typename edge_container<T>::edge_list_type predecessor_list_type;
// Passes n to my_built_predecessors.add_edge while holding my_mutex.
66 void internal_add_built_predecessor( T &n ) {
67 typename mutex_type::scoped_lock lock( my_mutex );
68 my_built_predecessors.add_edge(n);
69 }
70
// Passes n to my_built_predecessors.delete_edge while holding my_mutex.
71 void internal_delete_built_predecessor( T &n ) {
72 typename mutex_type::scoped_lock lock( my_mutex );
73 my_built_predecessors.delete_edge(n);
74 }
75
// Hands v to my_built_predecessors.copy_edges while holding my_mutex.
// Presumably copy_edges fills v with the recorded edges — confirm in edge_container.
76 void copy_predecessors( predecessor_list_type &v) {
77 typename mutex_type::scoped_lock lock( my_mutex );
78 my_built_predecessors.copy_edges(v);
79 }
80
// Returns my_built_predecessors.edge_count(), cast to size_t, read under my_mutex.
81 size_t predecessor_count() {
82 typename mutex_type::scoped_lock lock(my_mutex);
83 return (size_t)(my_built_predecessors.edge_count());
84 }
85#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
86
87protected:
88
89 typedef M mutex_type;
91 std::queue< T * > my_q;
92#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
93 built_predecessors_type my_built_predecessors;
94#endif
95
96 // Assumes lock is held
// Returns true when my_q holds no elements. Performs no locking of its own.
97 inline bool internal_empty( ) {
98 return my_q.empty();
99 }
100
101 // Assumes lock is held
103 return my_q.size();
104 }
105
106 // Assumes lock is held
// Appends the address of n to the back of my_q. Performs no locking of its own.
107 inline void internal_push( T &n ) {
108 my_q.push(&n);
109 }
110
111 // Assumes lock is held
// Removes the front pointer from my_q and returns a reference to the object it points to.
// There is no emptiness check in this function.
112 inline T &internal_pop() {
113 T *v = my_q.front();
114 my_q.pop();
115 return *v;
116 }
117
118};
119
// Cache of predecessor nodes layered on node_cache. The cached element type is
// untyped_sender when __TBB_PREVIEW_ASYNC_MSG is set and sender<T> otherwise.
// NOTE(review): listing lines 139, 143 and 191 are absent from this extract; the
// member my_owner used below is declared on one of the missing lines.
121template< typename T, typename M=spin_mutex >
122#if __TBB_PREVIEW_ASYNC_MSG
123// TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
124class predecessor_cache : public node_cache< untyped_sender, M > {
125#else
126class predecessor_cache : public node_cache< sender<T>, M > {
127#endif // __TBB_PREVIEW_ASYNC_MSG
128public:
129 typedef M mutex_type;
130 typedef T output_type;
131#if __TBB_PREVIEW_ASYNC_MSG
132 typedef untyped_sender predecessor_type;
133 typedef untyped_receiver successor_type;
134#else
135 typedef sender<output_type> predecessor_type;
136 typedef receiver<output_type> successor_type;
137#endif // __TBB_PREVIEW_ASYNC_MSG
138
140
// Stores the receiver that is handed to register_successor() when an edge is given back.
141 void set_owner( successor_type *owner ) { my_owner = owner; }
142
144
// NOTE(review): the signature line of the function whose body follows (listing line 143)
// is missing from this extract. The body repeatedly pops one cached predecessor and calls
// try_get(v) on it until one succeeds or the cache is empty, and returns whether a get succeeded.
145 bool msg = false;
146
147 do {
148 predecessor_type *src;
149 {
// The lock covers only the pop; it is released before try_get is called on src.
150 typename mutex_type::scoped_lock lock(this->my_mutex);
151 if ( this->internal_empty() ) {
152 break;
153 }
154 src = &this->internal_pop();
155 }
156
157 // Try to get from this sender
158 msg = src->try_get( v );
159
160 if (msg == false) {
161 // Relinquish ownership of the edge
162 if (my_owner)
163 src->register_successor( *my_owner );
164 } else {
165 // Retain ownership of the edge
166 this->add(*src);
167 }
168 } while ( msg == false );
169 return msg;
170 }
171
172 // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
// Pops every cached predecessor and calls register_successor(*my_owner) on each one.
// Does nothing when my_owner is null. No lock is acquired anywhere in this function.
173 void reset() {
174 if (my_owner) {
175 for(;;) {
176 predecessor_type *src;
177 {
178 if (this->internal_empty()) break;
179 src = &this->internal_pop();
180 }
181 src->register_successor( *my_owner );
182 }
183 }
184 }
185
186protected:
187
188#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
189 using node_cache< predecessor_type, M >::my_built_predecessors;
190#endif
192};
193
195// TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
196template< typename T, typename M=spin_mutex >
198public:
199 typedef M mutex_type;
200 typedef T output_type;
201#if __TBB_PREVIEW_ASYNC_MSG
202 typedef untyped_sender predecessor_type;
203 typedef untyped_receiver successor_type;
204#else
205 typedef sender<T> predecessor_type;
206 typedef receiver<T> successor_type;
207#endif // __TBB_PREVIEW_ASYNC_MSG
208
// Constructs the cache with no reserved source (reserved_src == NULL).
209 reservable_predecessor_cache( ) : reserved_src(NULL) { }
210
// NOTE(review): the line carrying this function's name and parameters (listing line 212)
// is missing from this extract; the body calls try_reserve(v) on cached predecessors.
// Returns false immediately if a source is already reserved or the cache is empty;
// otherwise pops predecessors one at a time until one of them accepts try_reserve(v).
211 bool
213 bool msg = false;
214
215 do {
216 {
// The lock covers the check and the pop only; try_reserve below runs without it.
217 typename mutex_type::scoped_lock lock(this->my_mutex);
218 if ( reserved_src || this->internal_empty() )
219 return false;
220
221 reserved_src = &this->internal_pop();
222 }
223
224 // Try to get from this sender
225 msg = reserved_src->try_reserve( v );
226
227 if (msg == false) {
228 typename mutex_type::scoped_lock lock(this->my_mutex);
229 // Relinquish ownership of the edge
// NOTE(review): my_owner is dereferenced here without a null check — confirm it is always set.
230 reserved_src->register_successor( *this->my_owner );
231 reserved_src = NULL;
232 } else {
233 // Retain ownership of the edge
// The source is queued again while reserved_src continues to point at it.
234 this->add( *reserved_src );
235 }
236 } while ( msg == false );
237
238 return msg;
239 }
240
// NOTE(review): the line carrying this function's name (listing line 242) is missing
// from this extract. The body calls try_release() on reserved_src, discards its result,
// clears reserved_src and always returns true. reserved_src is not checked for NULL first.
241 bool
243 reserved_src->try_release( );
244 reserved_src = NULL;
245 return true;
246 }
247
// NOTE(review): the line carrying this function's name (listing line 249) is missing
// from this extract. The body calls try_consume() on reserved_src, discards its result,
// clears reserved_src and always returns true. reserved_src is not checked for NULL first.
248 bool
250 reserved_src->try_consume( );
251 reserved_src = NULL;
252 return true;
253 }
254
// Drops the current reservation by setting reserved_src to NULL.
// NOTE(review): listing line 257 is absent from this extract — presumably it forwards
// to the base class reset(); confirm against the original header.
255 void reset( ) {
256 reserved_src = NULL;
258 }
259
// Drops the current reservation by setting reserved_src to NULL.
// NOTE(review): listing line 262 is absent from this extract — presumably it forwards
// to the base class clear(); confirm against the original header.
260 void clear() {
261 reserved_src = NULL;
263 }
264
265private:
267};
268
269
271// TODO: make successor_cache type T-independent when async_msg becomes regular feature
272template<typename T, typename M=spin_rw_mutex >
274protected:
275
276 typedef M mutex_type;
278
279#if __TBB_PREVIEW_ASYNC_MSG
280 typedef untyped_receiver successor_type;
281 typedef untyped_receiver *pointer_type;
282 typedef untyped_sender owner_type;
283#else
284 typedef receiver<T> successor_type;
285 typedef receiver<T> *pointer_type;
286 typedef sender<T> owner_type;
287#endif // __TBB_PREVIEW_ASYNC_MSG
288 typedef std::list< pointer_type > successors_type;
289#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
290 edge_container<successor_type> my_built_successors;
291#endif
293
295
296public:
297#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
298 typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
299
300 edge_container<successor_type> &built_successors() { return my_built_successors; }
301
// Passes r to my_built_successors.add_edge while holding my_mutex. The lock is requested
// with second argument true (writer mode, judging by the upgrade_to_writer() usage in this file).
302 void internal_add_built_successor( successor_type &r) {
303 typename mutex_type::scoped_lock l(my_mutex, true);
304 my_built_successors.add_edge( r );
305 }
306
// Passes r to my_built_successors.delete_edge while holding my_mutex. The lock is requested
// with second argument true (writer mode, judging by the upgrade_to_writer() usage in this file).
307 void internal_delete_built_successor( successor_type &r) {
308 typename mutex_type::scoped_lock l(my_mutex, true);
309 my_built_successors.delete_edge(r);
310 }
311
// Hands v to my_built_successors.copy_edges while holding my_mutex
// (lock requested with second argument false).
// Presumably copy_edges fills v with the recorded edges — confirm in edge_container.
312 void copy_successors( successor_list_type &v) {
313 typename mutex_type::scoped_lock l(my_mutex, false);
314 my_built_successors.copy_edges(v);
315 }
316
// Returns my_built_successors.edge_count(), read under my_mutex
// (lock requested with second argument false).
317 size_t successor_count() {
318 typename mutex_type::scoped_lock l(my_mutex,false);
319 return my_built_successors.edge_count();
320 }
321
322#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
323
// Constructs the cache with no owner (my_owner == NULL).
324 successor_cache( ) : my_owner(NULL) {}
325
// Stores the owner pointer; derived caches pass *my_owner to register_predecessor().
326 void set_owner( owner_type *owner ) { my_owner = owner; }
327
// Virtual destructor; the class declares a pure virtual try_put_task() and is used as a base.
328 virtual ~successor_cache() {}
329
// NOTE(review): the signature line (listing line 330) is missing from this extract; the
// documentation index of this page lists it as `void register_successor(successor_type &r)`.
// Appends the address of r to my_successors under my_mutex (lock requested with second argument true).
331 typename mutex_type::scoped_lock l(my_mutex, true);
332 my_successors.push_back( &r );
333 }
334
// NOTE(review): the signature line (listing line 335) is missing from this extract; the
// documentation index of this page lists it as `void remove_successor(successor_type &r)`.
// Under my_mutex, erases the first entry of my_successors whose pointer equals &r;
// at most one entry is removed, and nothing happens if r is not present.
336 typename mutex_type::scoped_lock l(my_mutex, true);
337 for ( typename successors_type::iterator i = my_successors.begin();
338 i != my_successors.end(); ++i ) {
339 if ( *i == & r ) {
340 my_successors.erase(i);
341 break;
342 }
343 }
344 }
345
// Returns whether my_successors is empty, read under my_mutex
// (lock requested with second argument false).
346 bool empty() {
347 typename mutex_type::scoped_lock l(my_mutex, false);
348 return my_successors.empty();
349 }
350
// Empties my_successors; when TBB_DEPRECATED_FLOW_NODE_EXTRACTION is enabled it also
// clears my_built_successors. Unlike empty(), no lock is taken here.
351 void clear() {
352 my_successors.clear();
353#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
354 my_built_successors.clear();
355#endif
356 }
357
358#if !__TBB_PREVIEW_ASYNC_MSG
// Pure virtual hook implemented by derived caches; only declared when
// __TBB_PREVIEW_ASYNC_MSG is off (see the surrounding #if).
359 virtual task * try_put_task( const T &t ) = 0;
360#endif // __TBB_PREVIEW_ASYNC_MSG
361 }; // successor_cache<T>
362
// Specialization of successor_cache for continue_msg. In addition to maintaining the
// successor list, it calls register_predecessor()/remove_predecessor() on successors
// with *my_owner when edges are added or removed (see the bodies below).
// NOTE(review): listing lines 369, 379, 421 and 429 are absent from this extract; the members
// my_mutex and my_successors and two function signatures are declared on missing lines.
364template<typename M>
365class successor_cache< continue_msg, M > : tbb::internal::no_copy {
366protected:
367
368 typedef M mutex_type;
370
371#if __TBB_PREVIEW_ASYNC_MSG
372 typedef untyped_receiver successor_type;
373 typedef untyped_receiver *pointer_type;
374#else
375 typedef receiver<continue_msg> successor_type;
376 typedef receiver<continue_msg> *pointer_type;
377#endif // __TBB_PREVIEW_ASYNC_MSG
378 typedef std::list< pointer_type > successors_type;
380#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
381 edge_container<successor_type> my_built_successors;
382 typedef edge_container<successor_type>::edge_list_type successor_list_type;
383#endif
384
385 sender<continue_msg> *my_owner;
386
387public:
388
389#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
390
391 edge_container<successor_type> &built_successors() { return my_built_successors; }
392
// Passes r to my_built_successors.add_edge under my_mutex.
393 void internal_add_built_successor( successor_type &r) {
394 typename mutex_type::scoped_lock l(my_mutex, true);
395 my_built_successors.add_edge( r );
396 }
397
// Passes r to my_built_successors.delete_edge under my_mutex.
398 void internal_delete_built_successor( successor_type &r) {
399 typename mutex_type::scoped_lock l(my_mutex, true);
400 my_built_successors.delete_edge(r);
401 }
402
// Hands v to my_built_successors.copy_edges under my_mutex.
403 void copy_successors( successor_list_type &v) {
404 typename mutex_type::scoped_lock l(my_mutex, false);
405 my_built_successors.copy_edges(v);
406 }
407
// Returns my_built_successors.edge_count(), read under my_mutex.
408 size_t successor_count() {
409 typename mutex_type::scoped_lock l(my_mutex,false);
410 return my_built_successors.edge_count();
411 }
412
413#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
414
415 successor_cache( ) : my_owner(NULL) {}
416
417 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
418
419 virtual ~successor_cache() {}
420
// NOTE(review): signature line (listing line 421) missing — presumably register_successor(r).
// Appends &r to my_successors and, when my_owner is set and r.is_continue_receiver() is true,
// calls r.register_predecessor(*my_owner); all of this happens while my_mutex is held.
422 typename mutex_type::scoped_lock l(my_mutex, true);
423 my_successors.push_back( &r );
424 if ( my_owner && r.is_continue_receiver() ) {
425 r.register_predecessor( *my_owner );
426 }
427 }
428
// NOTE(review): signature line (listing line 429) missing — presumably remove_successor(r).
// Erases the first list entry equal to &r; before erasing, calls r.remove_predecessor(*my_owner)
// when my_owner is set (no is_continue_receiver() test here, see the TODO below).
430 typename mutex_type::scoped_lock l(my_mutex, true);
431 for ( successors_type::iterator i = my_successors.begin();
432 i != my_successors.end(); ++i ) {
433 if ( *i == & r ) {
434 // TODO: Check if we need to test for continue_receiver before
435 // removing from r.
436 if ( my_owner )
437 r.remove_predecessor( *my_owner );
438 my_successors.erase(i);
439 break;
440 }
441 }
442 }
443
// Returns whether my_successors is empty, read under my_mutex.
444 bool empty() {
445 typename mutex_type::scoped_lock l(my_mutex, false);
446 return my_successors.empty();
447 }
448
// Empties my_successors (and my_built_successors when extraction is enabled) without
// taking a lock and without calling remove_predecessor() on the dropped successors.
449 void clear() {
450 my_successors.clear();
451#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
452 my_built_successors.clear();
453#endif
454 }
455
456#if !__TBB_PREVIEW_ASYNC_MSG
457 virtual task * try_put_task( const continue_msg &t ) = 0;
458#endif // __TBB_PREVIEW_ASYNC_MSG
459
460}; // successor_cache< continue_msg >
461
463// TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
// Successor cache whose try_put_task offers the item to every successor in the list.
// NOTE(review): listing lines 467, 471 and 478 are absent from this extract; the page index
// lists the non-template overload as `task * try_put_task(const T &t) __TBB_override`.
464template<typename T, typename M=spin_rw_mutex>
465class broadcast_cache : public successor_cache<T, M> {
466 typedef M mutex_type;
468
469public:
470
472
473 // as above, but call try_put_task instead, and return the last task we received (if any)
474#if __TBB_PREVIEW_ASYNC_MSG
475 template<typename X>
476 task * try_put_task( const X &t ) {
477#else
479#endif // __TBB_PREVIEW_ASYNC_MSG
480 task * last_task = NULL;
// The lock is requested with upgraded == true from the start, so the
// `if (!upgraded)` branch further down never executes as written.
481 bool upgraded = true;
482 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
483 typename successors_type::iterator i = this->my_successors.begin();
484 while ( i != this->my_successors.end() ) {
485 task *new_task = (*i)->try_put_task(t);
486 // workaround for icc bug
487 graph& graph_ref = (*i)->graph_reference();
488 last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
489 if(new_task) {
490 ++i;
491 }
492 else { // failed
// A successor that returned no task and accepts register_predecessor(*my_owner)
// is erased from the list; otherwise it stays and iteration moves on.
493 if ( (*i)->register_predecessor(*this->my_owner) ) {
494 if (!upgraded) {
495 l.upgrade_to_writer();
496 upgraded = true;
497 }
498 i = this->my_successors.erase(i);
499 } else {
500 ++i;
501 }
502 }
503 }
504 return last_task;
505 }
506
507 // call try_put_task and return list of received tasks
// Every non-null result other than SUCCESSFULLY_ENQUEUED is appended to tasks.
// Returns true if at least one successor returned a non-null task.
508#if __TBB_PREVIEW_ASYNC_MSG
509 template<typename X>
510 bool gather_successful_try_puts( const X &t, task_list &tasks ) {
511#else
512 bool gather_successful_try_puts( const T &t, task_list &tasks ) {
513#endif // __TBB_PREVIEW_ASYNC_MSG
514 bool upgraded = true;
515 bool is_at_least_one_put_successful = false;
516 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
517 typename successors_type::iterator i = this->my_successors.begin();
518 while ( i != this->my_successors.end() ) {
519 task * new_task = (*i)->try_put_task(t);
520 if(new_task) {
521 ++i;
522 if(new_task != SUCCESSFULLY_ENQUEUED) {
523 tasks.push_back(*new_task);
524 }
525 is_at_least_one_put_successful = true;
526 }
527 else { // failed
// Same failure handling as in try_put_task: erase the successor if it accepts
// register_predecessor(*my_owner), otherwise keep it and advance.
528 if ( (*i)->register_predecessor(*this->my_owner) ) {
529 if (!upgraded) {
530 l.upgrade_to_writer();
531 upgraded = true;
532 }
533 i = this->my_successors.erase(i);
534 } else {
535 ++i;
536 }
537 }
538 }
539 return is_at_least_one_put_successful;
540 }
541};
542
544// TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
// Successor cache whose try_put_task stops at the first successor that accepts the item.
// NOTE(review): listing lines 549, 553, 555 and 564 are absent from this extract; the page
// index lists the non-template overload as `task * try_put_task(const T &t) __TBB_override`.
545template<typename T, typename M=spin_rw_mutex >
546class round_robin_cache : public successor_cache<T, M> {
547 typedef size_t size_type;
548 typedef M mutex_type;
550
551public:
552
554
// NOTE(review): signature line (listing line 555) missing. The body returns
// my_successors.size(), read under my_mutex (lock requested with second argument false).
556 typename mutex_type::scoped_lock l(this->my_mutex, false);
557 return this->my_successors.size();
558 }
559
// Offers t to successors in list order, always starting from begin(), and returns the first
// non-null task. A successor that returns no task is erased from the list if it accepts
// register_predecessor(*my_owner); otherwise it is skipped. Returns NULL if nobody accepts.
560#if __TBB_PREVIEW_ASYNC_MSG
561 template<typename X>
562 task * try_put_task( const X &t ) {
563#else
565#endif // __TBB_PREVIEW_ASYNC_MSG
// The lock is requested with upgraded == true from the start, so the
// `if (!upgraded)` branch below never executes as written.
566 bool upgraded = true;
567 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
568 typename successors_type::iterator i = this->my_successors.begin();
569 while ( i != this->my_successors.end() ) {
570 task *new_task = (*i)->try_put_task(t);
571 if ( new_task ) {
572 return new_task;
573 } else {
574 if ( (*i)->register_predecessor(*this->my_owner) ) {
575 if (!upgraded) {
576 l.upgrade_to_writer();
577 upgraded = true;
578 }
579 i = this->my_successors.erase(i);
580 }
581 else {
582 ++i;
583 }
584 }
585 }
586 return NULL;
587 }
588};
589
590} // namespace internal
591
592#endif // __TBB__flow_graph_cache_impl_H
#define __TBB_override
Definition: tbb_stddef.h:240
void const char const char int ITT_FORMAT __itt_group_sync s
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
A cache of predecessors that only supports try_get.
sender< output_type > predecessor_type
receiver< output_type > successor_type
void set_owner(successor_type *owner)
A cache of predecessors that supports requests and reservations.
An abstract cache of successors.
void register_successor(successor_type &r)
void set_owner(owner_type *owner)
void remove_successor(successor_type &r)
std::list< pointer_type > successors_type
virtual task * try_put_task(const T &t)=0
void set_owner(sender< continue_msg > *owner)
virtual task * try_put_task(const continue_msg &t)=0
A cache of successors that are broadcast to.
successor_cache< T, M >::successors_type successors_type
task * try_put_task(const T &t) __TBB_override
bool gather_successful_try_puts(const T &t, task_list &tasks)
A cache of successors that are put in a round-robin fashion.
successor_cache< T, M >::successors_type successors_type
task * try_put_task(const T &t) __TBB_override
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.