36 #include <sys/types.h> 37 #include <sys/socket.h> 39 #include <sys/ioctl.h> 40 #include <netinet/in.h> 49 #include <netinet/in.h> 50 #include <arpa/inet.h> 57 #include <qb/qbipc_common.h> 65 #define MESSAGE_REQ_SYNC_BARRIER 0 66 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1 67 #define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2 84 const unsigned int *trans_list,
85 size_t trans_list_entries,
86 const unsigned int *member_list,
87 size_t member_list_entries,
102 struct qb_ipc_request_header header __attribute__((aligned(8)));
103 struct memb_ring_id ring_id __attribute__((aligned(8)));
107 struct qb_ipc_request_header header __attribute__((aligned(8)));
108 struct memb_ring_id ring_id __attribute__((aligned(8)));
114 struct qb_ipc_request_header header __attribute__((aligned(8)));
115 struct memb_ring_id ring_id __attribute__((aligned(8)));
124 static int my_memb_determine = 0;
128 static unsigned int my_memb_determine_list_entries = 0;
130 static int my_processing_idx = 0;
140 static size_t my_member_list_entries = 0;
142 static size_t my_trans_list_entries = 0;
144 static int my_processor_list_entries = 0;
146 static struct service_entry my_service_list[SERVICES_COUNT_MAX];
148 static int my_service_list_entries = 0;
152 static void (*sync_synchronization_completed) (void);
154 static void sync_deliver_fn (
157 unsigned int msg_len,
158 int endian_conversion_required);
160 static int schedwrk_processor (
const void *context);
162 static void sync_process_enter (
void);
169 static void *sync_group_handle;
176 int (*sync_callbacks_retrieve) (
179 void (*synchronization_completed) (
void))
189 "Couldn't initialize groups interface.");
202 sync_synchronization_completed = synchronization_completed;
208 static void sync_barrier_handler (
unsigned int nodeid,
const void *msg)
212 int barrier_reached = 1;
214 if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
220 for (i = 0; i < my_processor_list_entries; i++) {
221 if (my_processor_list[i].nodeid == nodeid) {
225 for (i = 0; i < my_processor_list_entries; i++) {
226 if (my_processor_list[i].received == 0) {
230 if (barrier_reached) {
232 my_service_list[my_processing_idx].
name);
239 my_processing_idx += 1;
240 if (my_service_list_entries == my_processing_idx) {
241 my_memb_determine_list_entries = 0;
242 sync_synchronization_completed ();
244 sync_process_enter ();
249 static void dummy_sync_init (
250 const unsigned int *trans_list,
251 size_t trans_list_entries,
252 const unsigned int *member_list,
253 size_t member_list_entries,
258 static void dummy_sync_abort (
void)
262 static int dummy_sync_process (
void)
267 static void dummy_sync_activate (
void)
271 static int service_entry_compare (
const void *a,
const void *b)
279 static void sync_memb_determine (
unsigned int nodeid,
const void *msg)
285 if (memcmp (&req_exec_memb_determine_message->ring_id,
286 &my_memb_determine_ring_id, sizeof (
struct memb_ring_id)) != 0) {
292 my_memb_determine = 1;
293 for (i = 0; i < my_memb_determine_list_entries; i++) {
294 if (my_memb_determine_list[i] == nodeid) {
299 my_memb_determine_list[my_memb_determine_list_entries] =
nodeid;
300 my_memb_determine_list_entries += 1;
304 static void sync_service_build_handler (
unsigned int nodeid,
const void *msg)
308 int barrier_reached = 1;
310 int qsort_trigger = 0;
312 if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
317 for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
320 for (j = 0; j < my_service_list_entries; j++) {
321 if (req_exec_service_build_message->service_list[i] ==
328 my_service_list[my_service_list_entries].
state =
330 my_service_list[my_service_list_entries].
service_id =
331 req_exec_service_build_message->service_list[i];
332 sprintf (my_service_list[my_service_list_entries].
name,
333 "Unknown External Service (id = %d)\n",
334 req_exec_service_build_message->service_list[i]);
335 my_service_list[my_service_list_entries].
sync_init =
337 my_service_list[my_service_list_entries].
sync_abort =
343 my_service_list_entries += 1;
349 qsort (my_service_list, my_service_list_entries,
352 for (i = 0; i < my_processor_list_entries; i++) {
353 if (my_processor_list[i].nodeid == nodeid) {
357 for (i = 0; i < my_processor_list_entries; i++) {
358 if (my_processor_list[i].received == 0) {
362 if (barrier_reached) {
363 sync_process_enter ();
367 static void sync_deliver_fn (
370 unsigned int msg_len,
371 int endian_conversion_required)
373 struct qb_ipc_request_header *
header = (
struct qb_ipc_request_header *)msg;
375 switch (header->id) {
377 sync_barrier_handler (nodeid, msg);
380 sync_service_build_handler (nodeid, msg);
383 sync_memb_determine (nodeid, msg);
388 static void memb_determine_message_transmit (
void)
393 req_exec_memb_determine_message.header.size =
sizeof (
struct req_exec_memb_determine_message);
396 memcpy (&req_exec_memb_determine_message.ring_id,
397 &my_memb_determine_ring_id,
400 iovec.iov_base = (
char *)&req_exec_memb_determine_message;
401 iovec.iov_len =
sizeof (req_exec_memb_determine_message);
407 static void barrier_message_transmit (
void)
412 req_exec_barrier_message.header.size =
sizeof (
struct req_exec_barrier_message);
415 memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
418 iovec.iov_base = (
char *)&req_exec_barrier_message;
419 iovec.iov_len =
sizeof (req_exec_barrier_message);
432 memcpy (&service_build_message->ring_id, &my_ring_id,
435 iovec.iov_base = (
void *)service_build_message;
442 static void sync_barrier_enter (
void)
445 barrier_message_transmit ();
448 static void sync_process_enter (
void)
457 if (my_service_list_entries == 0) {
459 my_memb_determine_list_entries = 0;
460 sync_synchronization_completed ();
463 for (i = 0; i < my_processor_list_entries; i++) {
471 static void sync_servicelist_build_enter (
472 const unsigned int *member_list,
473 size_t member_list_entries,
482 for (i = 0; i < member_list_entries; i++) {
483 my_processor_list[i].
nodeid = member_list[i];
486 my_processor_list_entries = member_list_entries;
488 memcpy (my_member_list, member_list,
489 member_list_entries *
sizeof (
unsigned int));
490 my_member_list_entries = member_list_entries;
492 my_processing_idx = 0;
494 memset(my_service_list, 0,
sizeof (
struct service_entry) * SERVICES_COUNT_MAX);
495 my_service_list_entries = 0;
505 my_service_list[my_service_list_entries].
state =
INIT;
506 my_service_list[my_service_list_entries].
service_id = i;
507 strcpy (my_service_list[my_service_list_entries].
name,
508 sync_callbacks.
name);
513 my_service_list_entries += 1;
516 for (i = 0; i < my_service_list_entries; i++) {
517 service_build.service_list[i] =
520 service_build.service_list_entries = my_service_list_entries;
522 service_build_message_transmit (&service_build);
525 static int schedwrk_processor (
const void *context)
529 if (my_service_list[my_processing_idx].
state ==
INIT) {
531 size_t old_trans_list_entries = 0;
535 memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
536 sizeof (
unsigned int));
537 old_trans_list_entries = my_trans_list_entries;
539 my_trans_list_entries = 0;
540 for (o = 0; o < old_trans_list_entries; o++) {
541 for (m = 0; m < my_member_list_entries; m++) {
542 if (old_trans_list[o] == my_member_list[m]) {
543 my_trans_list[my_trans_list_entries] = my_member_list[m];
544 my_trans_list_entries++;
551 my_service_list[my_processing_idx].
sync_init (my_trans_list,
552 my_trans_list_entries, my_member_list,
553 my_member_list_entries,
557 if (my_service_list[my_processing_idx].
state ==
PROCESS) {
560 res = my_service_list[my_processing_idx].
sync_process ();
565 sync_barrier_enter();
574 const unsigned int *member_list,
575 size_t member_list_entries,
579 memcpy (&my_ring_id, ring_id,
sizeof (
struct memb_ring_id));
581 if (my_memb_determine) {
582 my_memb_determine = 0;
583 sync_servicelist_build_enter (my_memb_determine_list,
584 my_memb_determine_list_entries, ring_id);
586 sync_servicelist_build_enter (member_list, member_list_entries,
592 const unsigned int *member_list,
593 size_t member_list_entries,
597 memcpy (my_trans_list, member_list, member_list_entries *
598 sizeof (
unsigned int));
599 my_trans_list_entries = member_list_entries;
608 my_service_list[my_processing_idx].
sync_abort ();
621 memcpy (&my_memb_determine_ring_id, ring_id,
624 memb_determine_message_transmit ();
630 my_memb_determine_list_entries = 0;
631 memset (&my_memb_determine_ring_id, 0,
sizeof (
struct memb_ring_id));
void sync_start(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
Totem Single Ring Protocol.
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
void(* sync_activate)(void)
#define MESSAGE_REQ_SYNC_MEMB_DETERMINE
struct message_header header
int totempg_groups_initialize(void **instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
#define log_printf(level, format, args...)
void(* sync_activate)(void)
void schedwrk_destroy(hdb_handle_t handle)
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
#define LOGSYS_LEVEL_ERROR
void sync_save_transitional(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
int totempg_groups_mcast_joined(void *instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
#define LOGSYS_LEVEL_DEBUG
enum sync_process_state state
#define MESSAGE_REQ_SYNC_SERVICE_BUILD
#define PROCESSOR_COUNT_MAX
void sync_memb_list_abort(void)
#define MESSAGE_REQ_SYNC_BARRIER
#define SERVICES_COUNT_MAX
int(* my_sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks)
int totempg_groups_join(void *instance, const struct totempg_group *groups, size_t group_cnt)
LOGSYS_DECLARE_SUBSYS("SYNC")
struct memb_ring_id ring_id
void sync_memb_list_determine(const struct memb_ring_id *ring_id)
int(* sync_process)(void)
int schedwrk_create(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
int(* sync_process)(void)