17 #ifndef __TBB_pipeline_H 18 #define __TBB_pipeline_H 25 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT || __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT 26 #include <type_traits> 38 #define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1) 49 namespace interface6 {
50 template<
typename T,
typename U>
class filter_t;
67 static const unsigned char filter_is_serial = 0x1;
72 static const unsigned char filter_is_out_of_order = 0x1<<4;
75 static const unsigned char filter_is_bound = 0x1<<5;
78 static const unsigned char filter_may_emit_null = 0x1<<6;
81 static const unsigned char exact_exception_propagation =
82 #if TBB_USE_CAPTURED_EXCEPTION 89 static const unsigned char version_mask = 0x7<<1;
93 parallel = current_version | filter_is_out_of_order,
95 serial_in_order = current_version | filter_is_serial,
97 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
103 next_filter_in_pipeline(not_in_pipeline()),
104 my_input_buffer(NULL),
105 my_filter_mode(static_cast<unsigned char>((is_serial_ ?
serial : parallel) | exact_exception_propagation)),
106 prev_filter_in_pipeline(not_in_pipeline()),
112 next_filter_in_pipeline(not_in_pipeline()),
113 my_input_buffer(NULL),
114 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
115 prev_filter_in_pipeline(not_in_pipeline()),
126 return bool( my_filter_mode & filter_is_serial );
131 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
136 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
141 return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
146 virtual void* operator()(
void* item ) = 0;
152 #if __TBB_TASK_GROUP_CONTEXT 156 virtual void finalize(
void* ) {};
166 bool has_more_work();
172 friend class internal::stage_task;
173 friend class internal::pipeline_root_task;
227 result_type internal_process_item(
bool is_blocking);
247 #if __TBB_TASK_GROUP_CONTEXT 256 friend class internal::stage_task;
257 friend class internal::pipeline_root_task;
260 friend class internal::pipeline_cleaner;
285 void remove_filter(
filter& filter_ );
290 #if __TBB_TASK_GROUP_CONTEXT 291 void clear_filters();
300 namespace interface6 {
310 template<
typename T,
typename U,
typename Body>
friend class internal::concrete_filter;
//! Records a stop request by setting the is_pipeline_stopped flag to true.
312 void stop() { is_pipeline_stopped =
true; }
321 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT 323 #elif __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT 337 #endif // Obtaining type properties 351 pointer output_t = allocator().allocate(1);
352 return new (output_t) T(source);
//! Returns a reference to the value that the token pointer t points to.
354 static value_type &
token(pointer & t) {
return *t;}
358 allocator().destroy(token);
359 allocator().deallocate(token,1);
//! Produces the token for source by returning source itself; nothing is allocated in this function.
369 static pointer
create_token(
const value_type & source) {
return source; }
//! Returns t itself as the value: in this helper the token and the value are the same object.
370 static value_type &
token(pointer & t) {
return t;}
382 } type_to_void_ptr_map;
//! Returns t itself as the value: in this helper the token and the value are the same object.
388 static value_type &
token(pointer & t) {
return t;}
390 type_to_void_ptr_map mymap;
391 mymap.void_overlay = NULL;
392 mymap.actual_value = ref;
393 return mymap.void_overlay;
396 type_to_void_ptr_map mymap;
397 mymap.void_overlay = ref;
398 return mymap.actual_value;
403 template<
typename T,
typename U,
typename Body>
412 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
413 u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
414 t_helper::destroy_token(temp_input);
415 return u_helper::cast_to_void_ptr(output_u);
419 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
420 t_helper::destroy_token(temp_input);
428 template<
typename U,
typename Body>
436 u_pointer output_u = u_helper::create_token(my_body(control));
438 u_helper::destroy_token(output_u);
442 return u_helper::cast_to_void_ptr(output_u);
452 template<
typename T,
typename Body>
459 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
460 my_body(t_helper::token(temp_input));
461 t_helper::destroy_token(temp_input);
465 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
466 t_helper::destroy_token(temp_input);
473 template<
typename Body>
509 #ifdef __TBB_TEST_FILTER_NODE_COUNT 510 ++(__TBB_TEST_FILTER_NODE_COUNT);
515 virtual void add_to(
pipeline& ) = 0;
525 #ifdef __TBB_TEST_FILTER_NODE_COUNT 526 --(__TBB_TEST_FILTER_NODE_COUNT);
532 template<
typename T,
typename U,
typename Body>
568 template<
typename T,
typename U,
typename Body>
570 return new internal::filter_node_leaf<T,U,Body>(
mode, body);
573 template<
typename T,
typename V,
typename U>
575 __TBB_ASSERT(left.
root,
"cannot use default-constructed filter_t as left argument of '&'");
576 __TBB_ASSERT(right.
root,
"cannot use default-constructed filter_t as right argument of '&'");
577 return new internal::filter_node_join(*left.
root,*right.
root);
581 template<
typename T,
typename U>
588 friend class internal::pipeline_proxy;
589 template<
typename T_,
typename U_,
typename Body>
591 template<
typename T_,
typename V_,
typename U_>
597 if( root ) root->add_ref();
599 template<
typename Body>
601 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
608 filter_node* old = root;
610 if( root ) root->add_ref();
611 if( old ) old->remove_ref();
614 if( root ) root->remove_ref();
619 filter_node* old = root;
627 __TBB_ASSERT( filter_chain.
root,
"cannot apply parallel_pipeline to default-constructed filter_t" );
636 internal::pipeline_proxy pipe(filter_chain);
638 pipe->run(max_number_of_live_tokens
639 #
if __TBB_TASK_GROUP_CONTEXT
645 #if __TBB_TASK_GROUP_CONTEXT 650 #endif // __TBB_TASK_GROUP_CONTEXT
token_helper< U, is_large_object< U >::value > u_helper
void * operator()(void *input) __TBB_override
Operate on an item from the input stream, and return item for output stream.
bool is_bound() const
True if filter is thread-bound.
concrete_filter(tbb::filter::mode filter_mode, const Body &body)
token_helper< U, is_large_object< U >::value > u_helper
static pointer create_token(const value_type &source)
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
#define __TBB_EXPORTED_METHOD
t_helper::pointer t_pointer
static value_type & token(pointer &t)
static pointer cast_from_void_ptr(void *ref)
filter_t(const filter_t< T, U > &rhs)
virtual void add_to(pipeline &)=0
Add concrete_filter to pipeline.
static const unsigned char filter_is_serial
The lowest bit 0 is for parallel vs. serial.
static void destroy_token(pointer)
filter_node_join(filter_node &x, filter_node &y)
static pointer create_token(const value_type &source)
const unsigned char my_filter_mode
Storage for filter mode and dynamically checked implementation version.
filter_t< T, U > operator &(const filter_t< T, V > &left, const filter_t< V, U > &right)
thread_bound_filter(mode filter_mode)
Node in parse tree representing join of two filters.
static value_type & token(pointer &t)
The class that represents an object of the pipeline for parallel_pipeline().
bool is_ordered() const
True if filter must receive stream in order.
pipeline * my_pipeline
Pointer to the pipeline.
token_helper< T, is_large_object< T >::value > t_helper
void * operator()(void *input) __TBB_override
Operate on an item from the input stream, and return item for output stream.
#define __TBB_PIPELINE_VERSION(x)
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
void add_to(pipeline &p) __TBB_override
Add concrete_filter to pipeline.
void add_to(pipeline &p) __TBB_override
Add concrete_filter to pipeline.
void add_ref()
Increment reference count.
filter_node_leaf(tbb::filter::mode m, const Body &b)
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t< void, void > &filter_chain, tbb::task_group_context &context)
void finalize(void *input) __TBB_override
Destroys item if pipeline was cancelled.
t_helper::pointer t_pointer
void remove_ref()
Decrement reference count and delete if it becomes zero.
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
void * operator()(void *) __TBB_override
filter_t< T, U > make_filter(tbb::filter::mode mode, const Body &body)
Create a filter to participate in parallel_pipeline.
tbb::pipeline * operator->()
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
concrete_filter(tbb::filter::mode filter_mode, const Body &body)
filter_t(tbb::filter::mode mode, const Body &body)
u_helper::pointer u_pointer
void finalize(void *input) __TBB_override
Destroys item if pipeline was cancelled.
A processing pipeline that applies filters to items.
Node in parse tree representing result of make_filter.
Class representing a chain of type-safe pipeline filters.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
u_helper::pointer u_pointer
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
void * operator()(void *) __TBB_override
Operate on an item from the input stream, and return item for output stream.
A buffer of input items for a filter.
#define __TBB_TASK_GROUP_CONTEXT
static value_type & token(pointer &t)
concrete_filter(tbb::filter::mode filter_mode, const Body &body)
filter * next_segment
Pointer to the next "segment" of filters, or NULL if not required.
input_filter control to signal end-of-input for parallel_pipeline
internal::filter_node filter_node
Used to form groups of tasks.
static void * cast_to_void_ptr(pointer ref)
A stage in a pipeline served by a user thread.
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
Abstract base class that represents a node in a parse tree underlying a filter_t. ...
concrete_filter(filter::mode filter_mode, const Body &body)
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
static void * cast_to_void_ptr(pointer ref)
atomic< internal::Token > token_counter
Global counter of tokens.
filter_t(filter_node *root_)
Base class for types that should not be copied or assigned.
task * end_counter
Task whose reference count is used to determine when all stages are done.
static void destroy_token(pointer token)
tbb::atomic< intptr_t > ref_count
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
void operator=(const filter_t< T, U > &rhs)
Base class for user-defined tasks.
static pointer cast_from_void_ptr(void *ref)
filter * filter_list
Pointer to first filter in the pipeline.
bool is_serial() const
True if filter is serial.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d int
void const char const char int ITT_FORMAT __itt_group_sync p
const tbb::filter::mode mode
static pointer create_token(const value_type &source)
static pointer cast_from_void_ptr(void *ref)
token_helper< T, is_large_object< T >::value > t_helper
tbb::tbb_allocator< T > allocator
bool object_may_be_null()
True if an input filter can emit null.
bool end_of_input
False until fetch_input returns NULL.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t mode
static void * cast_to_void_ptr(pointer ref)
static void destroy_token(pointer)