Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
observer_proxy.cpp
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/tbb_config.h"
18
19#if __TBB_SCHEDULER_OBSERVER
20
21#include "observer_proxy.h"
22#include "tbb_main.h"
23#include "governor.h"
24#include "scheduler.h"
25#include "arena.h"
26
27namespace tbb {
28namespace internal {
29
30padded<observer_list> the_global_observer_list;
31
32#if TBB_USE_ASSERT
33static atomic<int> observer_proxy_count;
34
35struct check_observer_proxy_count {
36 ~check_observer_proxy_count() {
37 if( observer_proxy_count!=0 ) {
38 runtime_warning( "Leaked %ld observer_proxy objects\n", long(observer_proxy_count) );
39 }
40 }
41};
42
43static check_observer_proxy_count the_check_observer_proxy_count;
44#endif /* TBB_USE_ASSERT */
45
46#if __TBB_ARENA_OBSERVER
47interface6::task_scheduler_observer* observer_proxy::get_v6_observer() {
48 if(my_version != 6) return NULL;
49 return static_cast<interface6::task_scheduler_observer*>(my_observer);
50}
51#endif
52
53#if __TBB_ARENA_OBSERVER
54bool observer_proxy::is_global() {
55 return !get_v6_observer() || get_v6_observer()->my_context_tag == interface6::task_scheduler_observer::global_tag;
56}
57#endif /* __TBB_ARENA_OBSERVER */
58
59observer_proxy::observer_proxy( task_scheduler_observer_v3& tso )
60 : my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso)
61{
62#if TBB_USE_ASSERT
63 ++observer_proxy_count;
64#endif /* TBB_USE_ASSERT */
65 // 1 for observer
66 my_ref_count = 1;
67 my_version =
68#if __TBB_ARENA_OBSERVER
69 load<relaxed>(my_observer->my_busy_count)
70 == interface6::task_scheduler_observer::v6_trait ? 6 :
71#endif
72 0;
73 __TBB_ASSERT( my_version >= 6 || !load<relaxed>(my_observer->my_busy_count), NULL );
74}
75
76#if TBB_USE_ASSERT
77observer_proxy::~observer_proxy () {
78 __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
79 poison_value(my_ref_count);
80 poison_pointer(my_prev);
81 poison_pointer(my_next);
82 --observer_proxy_count;
83}
84#endif /* TBB_USE_ASSERT */
85
86template<memory_semantics M, class T, class V>
87T atomic_fetch_and_store ( T* addr, const V& val ) {
88 return (T)atomic_traits<sizeof(T), M>::fetch_and_store( addr, (T)val );
89}
90
91void observer_list::clear () {
92 __TBB_ASSERT( this != &the_global_observer_list, "Method clear() cannot be used on the list of global observers" );
93 // Though the method will work fine for the empty list, we require the caller
94 // to check for the list emptiness before invoking it to avoid extra overhead.
95 __TBB_ASSERT( !empty(), NULL );
96 {
97 scoped_lock lock(mutex(), /*is_writer=*/true);
98 observer_proxy *next = my_head;
99 while ( observer_proxy *p = next ) {
100 __TBB_ASSERT( p->my_version >= 6, NULL );
101 next = p->my_next;
102 // Both proxy p and observer p->my_observer (if non-null) are guaranteed
103 // to be alive while the list is locked.
104 task_scheduler_observer_v3 *obs = p->my_observer;
105 // Make sure that possible concurrent observer destruction does not
106 // conflict with the proxy list cleanup.
107 if ( !obs || !(p = (observer_proxy*)__TBB_FetchAndStoreW(&obs->my_proxy, 0)) )
108 continue;
109 // accessing 'obs' after detaching of obs->my_proxy leads to the race with observer destruction
110 __TBB_ASSERT( !next || p == next->my_prev, NULL );
111 __TBB_ASSERT( is_alive(p->my_ref_count), "Observer's proxy died prematurely" );
112 __TBB_ASSERT( p->my_ref_count == 1, "Reference for observer is missing" );
113#if TBB_USE_ASSERT
114 p->my_observer = NULL;
115 p->my_ref_count = 0;
116#endif /* TBB_USE_ASSERT */
117 remove(p);
118 delete p;
119 }
120 }
121 while( my_head )
122 __TBB_Yield();
123}
124
125void observer_list::insert ( observer_proxy* p ) {
126 scoped_lock lock(mutex(), /*is_writer=*/true);
127 if ( my_head ) {
128 p->my_prev = my_tail;
129 my_tail->my_next = p;
130 }
131 else
132 my_head = p;
133 my_tail = p;
134}
135
136void observer_list::remove ( observer_proxy* p ) {
137 __TBB_ASSERT( my_head, "Attempt to remove an item from an empty list" );
138 __TBB_ASSERT( !my_tail->my_next, "Last item's my_next must be NULL" );
139 if( p == my_tail ) {
140 __TBB_ASSERT( !p->my_next, NULL );
141 my_tail = p->my_prev;
142 }
143 else {
144 __TBB_ASSERT( p->my_next, NULL );
145 p->my_next->my_prev = p->my_prev;
146 }
147 if ( p == my_head ) {
148 __TBB_ASSERT( !p->my_prev, NULL );
149 my_head = p->my_next;
150 }
151 else {
152 __TBB_ASSERT( p->my_prev, NULL );
153 p->my_prev->my_next = p->my_next;
154 }
155 __TBB_ASSERT( (my_head && my_tail) || (!my_head && !my_tail), NULL );
156}
157
158void observer_list::remove_ref( observer_proxy* p ) {
159 int r = p->my_ref_count;
160 __TBB_ASSERT( is_alive(r), NULL );
161 while(r>1) {
162 __TBB_ASSERT( r!=0, NULL );
163 int r_old = p->my_ref_count.compare_and_swap(r-1,r);
164 if( r_old==r ) {
165 // Successfully decremented count.
166 return;
167 }
168 r = r_old;
169 }
170 __TBB_ASSERT( r==1, NULL );
171 // Reference count might go to zero
172 {
173 // Use lock to avoid resurrection by a thread concurrently walking the list
174 observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
175 r = --p->my_ref_count;
176 if( !r )
177 remove(p);
178 }
179 __TBB_ASSERT( r || !p->my_ref_count, NULL );
180 if( !r )
181 delete p;
182}
183
184void observer_list::do_notify_entry_observers( observer_proxy*& last, bool worker ) {
185 // Pointer p marches though the list from last (exclusively) to the end.
186 observer_proxy *p = last, *prev = p;
187 for(;;) {
188 task_scheduler_observer_v3* tso=NULL;
189 // Hold lock on list only long enough to advance to the next proxy in the list.
190 {
191 scoped_lock lock(mutex(), /*is_writer=*/false);
192 do {
193 if( p ) {
194 // We were already processing the list.
195 if( observer_proxy* q = p->my_next ) {
196 if( p == prev )
197 remove_ref_fast(prev); // sets prev to NULL if successful
198 p = q;
199 }
200 else {
201 // Reached the end of the list.
202 if( p == prev ) {
203 // Keep the reference as we store the 'last' pointer in scheduler
204 __TBB_ASSERT(p->my_ref_count >= 1 + (p->my_observer?1:0), NULL);
205 } else {
206 // The last few proxies were empty
207 __TBB_ASSERT(p->my_ref_count, NULL);
208 ++p->my_ref_count;
209 if( prev ) {
210 lock.release();
211 remove_ref(prev);
212 }
213 }
214 last = p;
215 return;
216 }
217 } else {
218 // Starting pass through the list
219 p = my_head;
220 if( !p )
221 return;
222 }
223 tso = p->my_observer;
224 } while( !tso );
225 ++p->my_ref_count;
226 ++tso->my_busy_count;
227 }
228 __TBB_ASSERT( !prev || p!=prev, NULL );
229 // Release the proxy pinned before p
230 if( prev )
231 remove_ref(prev);
232 // Do not hold any locks on the list while calling user's code.
233 // Do not intercept any exceptions that may escape the callback so that
234 // they are either handled by the TBB scheduler or passed to the debugger.
235 tso->on_scheduler_entry(worker);
236 __TBB_ASSERT(p->my_ref_count, NULL);
237 intptr_t bc = --tso->my_busy_count;
238 __TBB_ASSERT_EX( bc>=0, "my_busy_count underflowed" );
239 prev = p;
240 }
241}
242
243void observer_list::do_notify_exit_observers( observer_proxy* last, bool worker ) {
244 // Pointer p marches though the list from the beginning to last (inclusively).
245 observer_proxy *p = NULL, *prev = NULL;
246 for(;;) {
247 task_scheduler_observer_v3* tso=NULL;
248 // Hold lock on list only long enough to advance to the next proxy in the list.
249 {
250 scoped_lock lock(mutex(), /*is_writer=*/false);
251 do {
252 if( p ) {
253 // We were already processing the list.
254 if( p != last ) {
255 __TBB_ASSERT( p->my_next, "List items before 'last' must have valid my_next pointer" );
256 if( p == prev )
257 remove_ref_fast(prev); // sets prev to NULL if successful
258 p = p->my_next;
259 } else {
260 // remove the reference from the last item
261 remove_ref_fast(p);
262 if( p ) {
263 lock.release();
264 remove_ref(p);
265 }
266 return;
267 }
268 } else {
269 // Starting pass through the list
270 p = my_head;
271 __TBB_ASSERT( p, "Nonzero 'last' must guarantee that the global list is non-empty" );
272 }
273 tso = p->my_observer;
274 } while( !tso );
275 // The item is already refcounted
276 if ( p != last ) // the last is already referenced since entry notification
277 ++p->my_ref_count;
278 ++tso->my_busy_count;
279 }
280 __TBB_ASSERT( !prev || p!=prev, NULL );
281 if( prev )
282 remove_ref(prev);
283 // Do not hold any locks on the list while calling user's code.
284 // Do not intercept any exceptions that may escape the callback so that
285 // they are either handled by the TBB scheduler or passed to the debugger.
286 tso->on_scheduler_exit(worker);
287 __TBB_ASSERT(p->my_ref_count || p == last, NULL);
288 intptr_t bc = --tso->my_busy_count;
289 __TBB_ASSERT_EX( bc>=0, "my_busy_count underflowed" );
290 prev = p;
291 }
292}
293
294void task_scheduler_observer_v3::observe( bool enable ) {
295 if( enable ) {
296 if( !my_proxy ) {
297 my_proxy = new observer_proxy( *this );
298 my_busy_count = 0; // proxy stores versioning information, clear it
299#if __TBB_ARENA_OBSERVER
300 if ( !my_proxy->is_global() ) {
301 // Local observer activation
302 generic_scheduler* s = governor::local_scheduler_if_initialized();
303 __TBB_ASSERT( my_proxy->get_v6_observer(), NULL );
304 intptr_t tag = my_proxy->get_v6_observer()->my_context_tag;
305 if( tag != interface6::task_scheduler_observer::implicit_tag ) { // explicit arena
306 task_arena *a = reinterpret_cast<task_arena*>(tag);
307 if ( a->my_arena==NULL ) // Avoid recursion during arena initialization
308 a->initialize();
309 my_proxy->my_list = &a->my_arena->my_observers;
310 } else {
311 if( !(s && s->my_arena) )
312 s = governor::init_scheduler( task_scheduler_init::automatic, 0, true );
313 __TBB_ASSERT( __TBB_InitOnce::initialization_done(), NULL );
314 __TBB_ASSERT( s && s->my_arena, NULL );
315 my_proxy->my_list = &s->my_arena->my_observers;
316 }
317 my_proxy->my_list->insert(my_proxy);
318 // Notify newly activated observer and other pending ones if it belongs to current arena
319 if(s && &s->my_arena->my_observers == my_proxy->my_list )
320 my_proxy->my_list->notify_entry_observers( s->my_last_local_observer, s->is_worker() );
321 } else
322#endif /* __TBB_ARENA_OBSERVER */
323 {
324 // Obsolete. Global observer activation
325 if( !__TBB_InitOnce::initialization_done() )
327 my_proxy->my_list = &the_global_observer_list;
328 my_proxy->my_list->insert(my_proxy);
329 if( generic_scheduler* s = governor::local_scheduler_if_initialized() ) {
330 // Notify newly created observer of its own thread.
331 // Any other pending observers are notified too.
332 the_global_observer_list.notify_entry_observers( s->my_last_global_observer, s->is_worker() );
333 }
334 }
335 }
336 } else {
337 // Make sure that possible concurrent proxy list cleanup does not conflict
338 // with the observer destruction here.
339 if ( observer_proxy* proxy = (observer_proxy*)__TBB_FetchAndStoreW(&my_proxy, 0) ) {
340 // List destruction should not touch this proxy after we've won the above interlocked exchange.
341 __TBB_ASSERT( proxy->my_observer == this, NULL );
342 __TBB_ASSERT( is_alive(proxy->my_ref_count), "Observer's proxy died prematurely" );
343 __TBB_ASSERT( proxy->my_ref_count >= 1, "reference for observer missing" );
344 observer_list &list = *proxy->my_list;
345 {
346 // Ensure that none of the list walkers relies on observer pointer validity
347 observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
348 proxy->my_observer = NULL;
349 // Proxy may still be held by other threads (to track the last notified observer)
350 if( !--proxy->my_ref_count ) {// nobody can increase it under exclusive lock
351 list.remove(proxy);
352 __TBB_ASSERT( !proxy->my_ref_count, NULL );
353 delete proxy;
354 }
355 }
356 while( my_busy_count ) // other threads are still accessing the callback
357 __TBB_Yield();
358 }
359 }
360}
361
362} // namespace internal
363} // namespace tbb
364
365#endif /* __TBB_SCHEDULER_OBSERVER */
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_Yield()
Definition: ibm_aix51.h:44
void const char const char int ITT_FORMAT __itt_group_sync s
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void * addr
void const char const char int ITT_FORMAT __itt_group_sync p
#define poison_value(g)
The graph class.
void __TBB_EXPORTED_FUNC runtime_warning(const char *format,...)
Report a runtime warning.
void poison_pointer(T *__TBB_atomic &)
Definition: tbb_stddef.h:305
void DoOneTimeInitializations()
Performs thread-safe lazy one-time general TBB initialization.
Definition: tbb_main.cpp:215
auto last(Container &c) -> decltype(begin(c))

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.