#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>

#define LOGSYS_UTILS_ONLY 1

#if !(defined(__i386__) || defined(__x86_64__))
/*
 * Need align on architectures other than i386 or x86_64
 */
#define TOTEMPG_NEED_ALIGN 1
#endif

struct totempg_mcast_header {
	short version;
	short type;
};

/*
 * totempg_mcast structure
 *
 * header:        identifies the mcast
 * fragmented:    set if this message continues into the next packet
 * continuation:  set if this message is a continuation from the last packet
 * msg_count:     how many packed messages this mcast contains
 *
 * The size of each packed message and the messages themselves are
 * appended to the end of this structure when sent.
 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;
	unsigned char continuation;
	unsigned short msg_count;
};

#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))

static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
static int mcast_packed_msg_count = 0;
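/*
 * Reservation bookkeeping for flow control: totempg_reserved tracks the
 * totemsrp queue slots promised to services through send_reserve() and
 * returned through send_release() below.
 */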
static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static uint32_t totempg_threaded_mode = 0;

static void *totemsrp_context;

static struct totem_config *totempg_totem_config;

static totempg_stats_t totempg_stats;

static void *callback_token_received_handle;

static totem_queue_level_changed_fn totem_queue_level_changed = NULL;

static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));
struct assembly {
	unsigned int nodeid;
	unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE];
	int index;
	unsigned char last_frag_num;
	enum throw_away_mode throw_away_mode;
	struct qb_list_head list;
};

QB_LIST_DECLARE(assembly_list_inuse);
QB_LIST_DECLARE(assembly_list_inuse_trans);
QB_LIST_DECLARE(assembly_list_free);
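/*
 * Staging buffer for packed messages.  Messages are staged in this buffer
 * before sending.  Multiple messages may fit, which cuts down on the number
 * of mcasts sent.  If a message doesn't completely fit, the mcast header
 * has the fragment field set, and the remainder is reassembled at the
 * receiving end.
 */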
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;

static int totempg_waiting_transack = 0;
struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;
	int groups_cnt;
	int32_t q_level;

	struct qb_list_head list;
};

QB_LIST_DECLARE(totempg_groups_list);
static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
#define log_printf(level, format, args...)		\
do {							\
	totempg_log_printf(level,			\
		totempg_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0);

static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);
static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
	totempg_waiting_transack = waiting_trans_ack;
}
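/*
 * Assembly buffers live on two in-use lists: the normal list and a
 * transitional list used while the cluster is waiting for a transitional
 * configuration ack.  assembly_ref() picks the active list based on
 * totempg_waiting_transack, so fragments from the old and new
 * configurations are never reassembled into the same buffer.
 */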
static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct qb_list_head *list;
	struct qb_list_head *active_assembly_list_inuse;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
	}

	/*
	 * Search inuse list for node id and return assembly buffer if found
	 */
	qb_list_for_each(list, active_assembly_list_inuse) {
		assembly = qb_list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}

	/*
	 * Nothing found in inuse list, get one from free list if available
	 */
	if (qb_list_empty (&assembly_list_free) == 0) {
		assembly = qb_list_first_entry (&assembly_list_free,
			struct assembly, list);
		qb_list_del (&assembly->list);
		qb_list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		assembly->throw_away_mode = THROW_AWAY_INACTIVE;
		return (assembly);
	}

	/*
	 * Nothing available in inuse or free list, so allocate a new one
	 */
	assembly = malloc (sizeof (struct assembly));
	/*
	 * TODO handle memory allocation failure here
	 */
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
	qb_list_init (&assembly->list);
	qb_list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}
static void assembly_deref (struct assembly *assembly)
{
	qb_list_del (&assembly->list);
	qb_list_add (&assembly->list, &assembly_list_free);
}
static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct qb_list_head *list, *tmp_iter;
	struct qb_list_head *active_assembly_list_inuse;
	struct assembly *assembly;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
		}

		qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
			assembly = qb_list_entry (list, struct assembly, list);

			if (nodeid == assembly->nodeid) {
				qb_list_del (&assembly->list);
				qb_list_add (&assembly->list, &assembly_list_free);
			}
		}
	}
}
static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	struct totempg_group_instance *instance;
	struct qb_list_head *list;

	/*
	 * Drop held assembly buffers for nodes that left the membership
	 */
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);

		if (instance->confchg_fn) {
			instance->confchg_fn (
				configuration_type,
				member_list, member_list_entries,
				left_list, left_list_entries,
				joined_list, joined_list_entries,
				ring_id);
		}
	}
}
static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	int i;
	char *aligned_msg;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures that require it
	 */
	if ((size_t)msg % 4 != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}
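/*
 * Every mcast from this layer prefixes the application payload with a
 * group header: group_len[0] holds the number of groups, group_len[1..n]
 * hold each group name length, and the group names themselves follow the
 * length array.  group_matches() parses this header and reports, via
 * adjust_iovec, how many bytes to strip before delivery to the app.
 */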
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures that require it
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	/*
	 * Calculate amount to adjust the iovec by before delivering to the app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to any of the groups
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
			    (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}
static inline void app_deliver_fn (
	unsigned int nodeid,
	void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_group_instance *instance;
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct iovec *iovec;
	struct qb_list_head *list;

	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures that require it
	 */
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);

		if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;

			/*
			 * Align data structure for architectures that require it
			 */
			if (((uintptr_t)iovec->iov_base + adjust_iovec) % 4 != 0) {
				/*
				 * Deal with misalignment
				 */
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
			instance->deliver_fn (
				nodeid,
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
		}
	}
}
static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}
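/*
 * Each received packet carries a struct totempg_mcast header, followed by
 * msg_count 16-bit message lengths, followed by the packed message bodies.
 * For example, a packet holding one 7-byte and one 2-byte message carries
 * two length entries (7 and 2) and 9 bytes of packed data after the header.
 */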
static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;
	size_t expected_msg_len;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	if (msg_len < sizeof(struct totempg_mcast)) {
		log_printf(totempg_log_level_warning,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID " is too short...  Ignoring.", nodeid);
		return;
	}

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */
	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	if (msg_len < datasize) {
		log_printf(totempg_log_level_warning,
			"Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
			" is too short...  Ignoring.", nodeid);
		return;
	}

	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	expected_msg_len = datasize;
	for (i = 0; i < msg_count; i++) {
		if (endian_conversion_required) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}

		expected_msg_len += msg_lens[i];
	}

	if (msg_len != expected_msg_len) {
		log_printf(totempg_log_level_warning,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID
			" doesn't have expected length of %zu (has %u) bytes...  Ignoring.",
			nodeid, expected_msg_len, msg_len);
		return;
	}

	assert((assembly->index+msg_len) < sizeof(assembly->data));
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment it can't be
	 * delivered yet, so only deliver the full messages.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, it matches the
	 * sequence number of the previous fragment.  If the first packed
	 * message is a continuation but the assembly buffer is empty, it
	 * must be discarded since a complete message can't be assembled.
	 */
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/*
		 * Throw away the first msg block
		 */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			log_printf (LOG_DEBUG,
				"fragmented continuation %u is not equal to assembly last_frag_num %u",
				continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * Not fragmented; delivery is complete, recycle the buffer
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Fragmented: move the trailing fragment to the front of the
		 * assembly buffer and keep the buffer in use
		 */
		memmove (&assembly->data[0],
			&assembly->data[assembly->index],
			msg_lens[msg_count]);

		assembly->index = msg_lens[msg_count];
	}
}
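/*
 * Flush any messages still staged in the fragmentation buffer.  This runs
 * every time the token is received, so small messages packed together by
 * mcast_msg() go out at the next token visit rather than waiting for a
 * full packet.
 */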
static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	if (totemsrp_avail(totemsrp_context) == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0);

	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}
/*
 * Initialize the totem process groups abstraction
 */
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	totemsrp_net_mtu_adjust (totem_config);

	res = totemsrp_initialize (
		poll_handle,
		&totemsrp_context,
		totem_config,
		&totempg_stats,
		totempg_deliver_fn,
		totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);
	if (res == -1) {
		goto error_exit;
	}

	totemsrp_callback_token_create (
		totemsrp_context,
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totempg_size_limit = (totemsrp_avail(totemsrp_context) - 2) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	qb_list_init (&totempg_groups_list);

error_exit:
	return (res);
}
void totempg_finalize (void)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}
	totemsrp_finalize (totemsrp_context);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
}
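/*
 * Multicast a message.  Messages are packed into the staging buffer until
 * a full TOTEMPG_PACKET_SIZE packet is available; full packets are handed
 * to totemsrp_mcast() immediately, with mcast.fragmented carrying the
 * sequence number (next_fragment) that ties a trailing fragment to the
 * continuation field of the packet that completes it.
 */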
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1);

	/*
	 * Remove zero length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;

	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	/*
	 * Check if we would overwrite the new message queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return(-1);
	}

	memset(&mcast, 0, sizeof(mcast));

	mcast.header.version = 0;
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment buffer on the next call.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;

		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size) {
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			} else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			/*
			 * If we're not on the last iovec, or the iovec is too
			 * large to fit, then indicate a fragment.  This also
			 * means that the next message will carry the
			 * continuation of this one.
			 */
			if ((i < (iov_len - 1)) ||
					((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
			}

			/*
			 * Assemble the message and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof(struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;
			res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee);
			if (res == -1) {
				goto error_exit;
			}

			/*
			 * Recalculate counts and indexes for the next packet
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));

			/*
			 * If the iovec all fit, go to the next iovec
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;

			/*
			 * Continue with the rest of the current iovec
			 */
			} else {
				copy_base += copy_len;
			}
		}
	}

	/*
	 * Bump only if we added message data.  This may be zero if the last
	 * buffer just fit into the fragmentation_data buffer and we were at
	 * the last iovec.
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}
/*
 * Determine if a message of msg_count could be queued
 */
static int msg_count_send_ok (
	int msg_count)
{
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);
	totempg_stats.msg_queue_avail = avail;

	return ((avail - totempg_reserved) > msg_count);
}
static int byte_count_send_ok (
	int byte_count)
{
	unsigned int msg_count = 0;
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);

	msg_count = (byte_count / (totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16)) + 1;

	return (avail >= msg_count);
}
static int send_reserve (
	int msg_size)
{
	unsigned int msg_count = 0;

	msg_count = (msg_size / (totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16)) + 1;
	totempg_reserved += msg_count;
	totempg_stats.msg_reserved = totempg_reserved;

	return (msg_count);
}
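/*
 * Both byte_count_send_ok() and send_reserve() convert a byte count into a
 * worst-case packet count with the same per-packet payload estimate:
 * net_mtu less the totempg_mcast header and a fixed 16-byte slack for the
 * packed message length entries.
 */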
static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
	totempg_stats.msg_reserved = totempg_reserved;
}

#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */

static uint32_t q_level_precent_used(void)
{
	return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}
int totempg_callback_token_create (
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	res = totemsrp_callback_token_create (totemsrp_context, handle_out,
		type, delete, callback_fn, data);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
	return (res);
}
void totempg_callback_token_destroy (
	void *handle_out)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	totemsrp_callback_token_destroy (totemsrp_context, &handle_out);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
}
/*
 * Initialize a groups instance
 */
int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;
	instance->q_level = QB_LOOP_MED;
	qb_list_init (&instance->list);
	qb_list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}
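/*
 * Typical use from a service engine, sketched with placeholder callbacks
 * (my_deliver_fn and my_confchg_fn are illustrative, not part of this file):
 *
 *	void *instance;
 *	struct totempg_group group = {
 *		.group = "svc", .group_len = 3
 *	};
 *
 *	totempg_groups_initialize (&instance, my_deliver_fn, my_confchg_fn);
 *	totempg_groups_join (instance, &group, 1);
 *	...
 *	totempg_groups_mcast_joined (instance, iovec, iov_len, guarantee);
 */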
int totempg_groups_join (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	struct totempg_group *new_groups;
	int res = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,
		sizeof (struct totempg_group) *
		(instance->groups_cnt + group_cnt));
	if (new_groups == 0) {
		res = -1;
		goto error_exit;
	}
	memcpy (&new_groups[instance->groups_cnt],
		groups, group_cnt * sizeof (struct totempg_group));
	instance->groups = new_groups;
	instance->groups_cnt += group_cnt;

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
int totempg_groups_leave (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}

#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32
int totempg_groups_mcast_joined (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = instance->groups_cnt;
	for (i = 0; i < instance->groups_cnt; i++) {
		group_len[i + 1] = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
	}
	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}
static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_precent_used();

	if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
		instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
	} else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
		instance->q_level = TOTEM_Q_LEVEL_LOW;
	} else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
		instance->q_level = TOTEM_Q_LEVEL_GOOD;
	} else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
		instance->q_level = TOTEM_Q_LEVEL_HIGH;
	}

	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}
void totempg_check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;

	check_q_level(instance);
}
int totempg_groups_joined_reserve (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned int size = 0;
	unsigned int i;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < instance->groups_cnt; i++) {
		size += instance->groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {
		reserved = -1;
		goto error_exit;
	}

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);
	} else {
		reserved = 0;
	}

error_exit:
	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (reserved);
}
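/*
 * Reservation protocol: a service calls totempg_groups_joined_reserve()
 * before queueing a message, sends with totempg_groups_mcast_joined(), and
 * returns the reservation through totempg_groups_joined_release() using
 * the count that reserve returned.
 */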
int totempg_groups_joined_release (int msg_count)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return 0;
}
int totempg_groups_mcast_groups (
	void *totempg_groups_instance,
	int guarantee,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		group_len[i + 1] = groups[i].group_len;
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}
	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
/*
 * Returns -1 if error, 0 if can't send, 1 if can send the message
 */
int totempg_groups_send_ok_groups (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned int size = 0;
	unsigned int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {
		size += groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
int totempg_iface_set (
	struct totem_ip_address *interface_addr,
	unsigned short ip_port,
	unsigned int iface_no)
{
	int res;

	res = totemsrp_iface_set (
		totemsrp_context,
		interface_addr,
		ip_port,
		iface_no);

	return (res);
}
int totempg_ifaces_get (
	unsigned int nodeid,
	unsigned int *interface_id,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemsrp_ifaces_get (
		totemsrp_context,
		nodeid,
		interface_id,
		interfaces,
		interfaces_size,
		status,
		iface_count);

	return (res);
}
void *totempg_get_stats (void)
{
	return &totempg_stats;
}
int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	int res;

	res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type);

	return (res);
}
#define ONE_IFACE_LEN 63
const char *totempg_ifaces_print (unsigned int nodeid)
{
	static char iface_string[256 * INTERFACE_MAX];
	char one_iface[ONE_IFACE_LEN+1];
	struct totem_ip_address interfaces[INTERFACE_MAX];
	unsigned int iface_count;
	unsigned int iface_ids[INTERFACE_MAX];
	unsigned int i;
	int res;

	iface_string[0] = '\0';

	res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
	if (res == -1) {
		return ("no interface found for nodeid");
	}

	for (i = 0; i < iface_count; i++) {
		if (!interfaces[i].family) {
			continue;
		}
		snprintf (one_iface, ONE_IFACE_LEN+1, "r(%d) ip(%s) ",
			i, totemip_print (&interfaces[i]));
		strcat (iface_string, one_iface);
	}
	return (iface_string);
}
void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemsrp_service_ready_register (totemsrp_context, totem_service_ready);
}

void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}
void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
	totemsrp_threaded_mode_enable (totemsrp_context);
}
void totempg_get_config (struct totem_config *config)
{
	memcpy(config, totempg_totem_config, sizeof(struct totem_config));
}
void totempg_put_config (struct totem_config *config)
{
	struct totem_interface *temp_if = totempg_totem_config->interfaces;
	struct totem_interface *temp_orig_if = totempg_totem_config->orig_interfaces;

	/*
	 * Do not overwrite the interfaces and orig_interfaces pointers owned
	 * by the running configuration
	 */
	memcpy(totempg_totem_config, config, sizeof(struct totem_config));
	totempg_totem_config->interfaces = temp_if;
	totempg_totem_config->orig_interfaces = temp_orig_if;
}