36 #include <sys/types.h>
37 #include <sys/socket.h>
39 #include <sys/ioctl.h>
40 #include <netinet/in.h>
48 #include <arpa/inet.h>
55 #include <qb/qbipc_common.h>
63 #define MESSAGE_REQ_SYNC_BARRIER 0
64 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
80 const unsigned int *trans_list,
81 size_t trans_list_entries,
82 const unsigned int *member_list,
83 size_t member_list_entries,
/* Index into my_service_list of the service currently being processed. */
113 static int my_processing_idx = 0;
/* Number of valid entries in my_member_list. */
123 static size_t my_member_list_entries = 0;
/* Number of valid entries in my_trans_list. */
125 static size_t my_trans_list_entries = 0;
/* Number of valid entries in my_processor_list. */
127 static int my_processor_list_entries = 0;
/* Number of valid entries in my_service_list. */
131 static int my_service_list_entries = 0;
/*
 * Completion callback. It is invoked when my_processing_idx reaches
 * my_service_list_entries, and also when the service list is empty on
 * entry to sync_process_enter().
 */
133 static void (*sync_synchronization_completed) (void);
/*
 * Forward declarations of static functions that are defined further down
 * in this file.
 */
135 static void sync_deliver_fn (
138 unsigned int msg_len,
139 int endian_conversion_required);
141 static int schedwrk_processor (
const void *context);
143 static void sync_process_enter (
void);
145 static void sync_process_call_init (
void);
152 static void *sync_group_handle;
159 int (*sync_callbacks_retrieve) (
162 void (*synchronization_completed) (
void))
172 "Couldn't initialize groups interface.");
185 sync_synchronization_completed = synchronization_completed;
/*
 * Barrier message handler; sync_deliver_fn() calls it with the sender's
 * node id and the raw message.
 *
 * Visible flow: walk my_processor_list, check whether any entry still has
 * received == 0, and once the barrier is reached advance to the next
 * service in my_service_list.
 *
 * NOTE(review): several lines of this function are not visible here
 * (locals, loop bodies, closing braces); the notes below describe only
 * what the visible lines show.
 */
191 static void sync_barrier_handler (
unsigned int nodeid,
const void *msg)
195 int barrier_reached = 1;
/* First pass over the processor list; loop body not visible here. */
203 for (i = 0; i < my_processor_list_entries; i++) {
/*
 * Second pass: look for processors that have not been received yet.
 * Presumably this clears barrier_reached -- TODO confirm.
 */
208 for (i = 0; i < my_processor_list_entries; i++) {
209 if (my_processor_list[i].received == 0) {
213 if (barrier_reached) {
215 my_service_list[my_processing_idx].name);
/* Move on to the next service; signal completion once all are done. */
222 my_processing_idx += 1;
223 if (my_service_list_entries == my_processing_idx) {
224 sync_synchronization_completed ();
/* Presumably the else branch: start processing the next service. */
226 sync_process_enter ();
231 static void dummy_sync_abort (
void)
235 static int dummy_sync_process (
void)
240 static void dummy_sync_activate (
void)
244 static int service_entry_compare (
const void *a,
const void *b)
/*
 * Service-build message handler; sync_deliver_fn() calls it with the
 * sender's node id and the raw message.
 *
 * Visible flow: compare service ids against my_service_list, append an
 * entry for a service that is not yet listed, sort the list, then check
 * whether every processor has been received and, if so, call
 * sync_process_enter().
 *
 * NOTE(review): many lines of this function are not visible here,
 * including the right-hand sides of several assignments.
 */
252 static void sync_service_build_handler (
unsigned int nodeid,
const void *msg)
256 int barrier_reached = 1;
258 int qsort_trigger = 0;
/* Search the local service list for a matching service id. */
268 for (j = 0; j < my_service_list_entries; j++) {
270 my_service_list[j].service_id) {
/*
 * Append a new entry at the end of my_service_list, marked PROCESS and
 * given a generated "Unknown External Service" name.
 *
 * NOTE(review): the sprintf below is unbounded and its format string ends
 * in '\n', which becomes part of the stored name; consider snprintf with
 * sizeof(name) -- confirm the size of the name buffer first.
 */
276 my_service_list[my_service_list_entries].
state =
PROCESS;
277 my_service_list[my_service_list_entries].
service_id =
279 sprintf (my_service_list[my_service_list_entries].name,
280 "Unknown External Service (id = %d)\n",
282 my_service_list[my_service_list_entries].
sync_init =
284 my_service_list[my_service_list_entries].
sync_abort =
290 my_service_list_entries += 1;
/*
 * Re-sort the service list. Presumably guarded by qsort_trigger and using
 * service_entry_compare -- TODO confirm, the remaining arguments are not
 * visible here.
 */
296 qsort (my_service_list, my_service_list_entries,
/* First pass over the processor list; loop body not visible here. */
299 for (i = 0; i < my_processor_list_entries; i++) {
/* Second pass: look for processors that have not been received yet. */
304 for (i = 0; i < my_processor_list_entries; i++) {
305 if (my_processor_list[i].received == 0) {
/* Start processing once the barrier is reached. */
309 if (barrier_reached) {
311 sync_process_enter ();
/*
 * Delivery callback for incoming sync messages.
 *
 * Treats the message as a struct qb_ipc_request_header and hands it,
 * together with the sender's node id, to sync_barrier_handler() or
 * sync_service_build_handler().
 *
 * NOTE(review): the dispatch construct itself is not visible here;
 * presumably it switches on the header id against
 * MESSAGE_REQ_SYNC_BARRIER / MESSAGE_REQ_SYNC_SERVICE_BUILD -- TODO confirm.
 */
315 static void sync_deliver_fn (
318 unsigned int msg_len,
319 int endian_conversion_required)
/* View the raw message through the common request header. */
321 struct qb_ipc_request_header *
header = (
struct qb_ipc_request_header *)msg;
325 sync_barrier_handler (
nodeid, msg);
328 sync_service_build_handler (
nodeid, msg);
333 static void barrier_message_transmit (
void)
360 memcpy (&service_build_message->ring_id, &my_ring_id,
363 iovec.iov_base = (
void *)service_build_message;
/*
 * Enters the barrier step by calling barrier_message_transmit().
 *
 * NOTE(review): any statements before the transmit call are not visible
 * here.
 */
370 static void sync_barrier_enter (
void)
373 barrier_message_transmit ();
/*
 * Prunes my_trans_list to the nodes that are also in my_member_list, then
 * calls the sync_init callback of services in my_service_list with the
 * pruned transitional list and the member list.
 *
 * NOTE(review): local declarations, any per-service condition around the
 * sync_init call, and the call's trailing arguments are not visible here.
 */
376 static void sync_process_call_init (
void)
379 size_t old_trans_list_entries = 0;
/* Snapshot the current transitional list before rebuilding it in place. */
383 memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
384 sizeof (
unsigned int));
385 old_trans_list_entries = my_trans_list_entries;
/*
 * Rebuild my_trans_list as the intersection of the old transitional list
 * and the current member list, preserving the old list's order.
 */
387 my_trans_list_entries = 0;
388 for (o = 0; o < old_trans_list_entries; o++) {
389 for (m = 0; m < my_member_list_entries; m++) {
390 if (old_trans_list[o] == my_member_list[m]) {
391 my_trans_list[my_trans_list_entries] = my_member_list[m];
392 my_trans_list_entries++;
/* Hand both lists to the services' sync_init callbacks. */
398 for (i = 0; i < my_service_list_entries; i++) {
400 my_service_list[i].
sync_init (my_trans_list,
401 my_trans_list_entries, my_member_list,
402 my_member_list_entries,
/*
 * Begins processing of the current service.
 *
 * If there are no services at all, the completion callback is invoked
 * straight away. Otherwise the function walks my_processor_list.
 *
 * NOTE(review): the loop body and the remainder of the function are not
 * visible here; presumably the loop resets the per-processor received
 * flags -- TODO confirm.
 */
408 static void sync_process_enter (
void)
/* Nothing to synchronize: report completion. */
417 if (my_service_list_entries == 0) {
419 sync_synchronization_completed ();
422 for (i = 0; i < my_processor_list_entries; i++) {
/*
 * Starts a synchronization round for the given membership.
 *
 * Records the member list in my_processor_list and my_member_list, rebuilds
 * my_service_list from the beginning, sends the resulting service ids via
 * service_build_message_transmit(), and then calls sync_process_call_init().
 *
 * member_list          node ids of the current members
 * member_list_entries  number of entries in member_list
 *
 * NOTE(review): the remaining parameter(s), the local declarations and the
 * loop that populates my_service_list are not visible here.
 */
431 static void sync_servicelist_build_enter (
432 const unsigned int *member_list,
433 size_t member_list_entries,
/* Start from an all-zero service-build message. */
441 memset(&service_build, 0,
sizeof(service_build));
/* One processor entry per member. */
444 for (i = 0; i < member_list_entries; i++) {
445 my_processor_list[i].
nodeid = member_list[i];
/* NOTE(review): size_t is narrowed to int here. */
448 my_processor_list_entries = member_list_entries;
/* Keep a private copy of the member list. */
450 memcpy (my_member_list, member_list,
451 member_list_entries *
sizeof (
unsigned int));
452 my_member_list_entries = member_list_entries;
/* Restart processing at the first service with an empty service list. */
454 my_processing_idx = 0;
457 my_service_list_entries = 0;
/* Append a service entry: marked PROCESS, with id i and the callback name. */
467 my_service_list[my_service_list_entries].
state =
PROCESS;
468 my_service_list[my_service_list_entries].
service_id = i;
/* The strcpy below relies on this assert to guarantee the name fits. */
470 assert(strlen(
sync_callbacks.
name) <
sizeof(my_service_list[my_service_list_entries].name));
472 strcpy (my_service_list[my_service_list_entries].
name,
478 my_service_list_entries += 1;
/* Fill the outgoing message, one slot per local service entry. */
481 for (i = 0; i < my_service_list_entries; i++) {
482 service_build.service_list[i] =
485 service_build.service_list_entries = my_service_list_entries;
487 service_build_message_transmit (&service_build);
490 sync_process_call_init ();
/*
 * Work callback that drives the service at my_processing_idx.
 *
 * When that service's state is PROCESS, its sync_process callback is called
 * and the result stored in res. The function also calls
 * sync_barrier_enter().
 *
 * NOTE(review): how res is used, the condition under which the barrier is
 * entered, and the return value are not visible here.
 */
493 static int schedwrk_processor (
const void *context)
497 if (my_service_list[my_processing_idx].state ==
PROCESS) {
499 res = my_service_list[my_processing_idx].
sync_process ();
504 sync_barrier_enter();
513 const unsigned int *member_list,
514 size_t member_list_entries,
520 sync_servicelist_build_enter (member_list, member_list_entries,
525 const unsigned int *member_list,
526 size_t member_list_entries,
530 memcpy (my_trans_list, member_list, member_list_entries *
531 sizeof (
unsigned int));
532 my_trans_list_entries = member_list_entries;
541 my_service_list[my_processing_idx].
sync_abort ();