55 #include <sys/types.h> 57 #include <sys/socket.h> 60 #include <sys/ioctl.h> 61 #include <sys/param.h> 62 #include <netinet/in.h> 63 #include <arpa/inet.h> 76 #include <qb/qblist.h> 77 #include <qb/qbdefs.h> 78 #include <qb/qbutil.h> 79 #include <qb/qbloop.h> 84 #define LOGSYS_UTILS_ONLY 1 95 #define LOCALHOST_IP inet_addr("127.0.0.1") 96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 100 #define RETRANSMIT_ENTRIES_MAX 30 101 #define TOKEN_SIZE_MAX 64000 102 #define LEAVE_DUMMY_NODEID 0 121 #define SEQNO_START_MSG 0x0 122 #define SEQNO_START_TOKEN 0x0 144 #define ENDIAN_LOCAL 0xff22 170 struct qb_list_head list;
269 unsigned int msg_len;
377 struct sq regular_sort_queue;
379 struct sq recovery_sort_queue;
388 struct qb_list_head token_callback_received_listhead;
390 struct qb_list_head token_callback_sent_listhead;
438 void (*totemsrp_log_printf) (
441 const char *
function,
444 const char *format, ...)
__attribute__((format(printf, 6, 7)));;
454 void (*totemsrp_deliver_fn) (
457 unsigned int msg_len,
458 int endian_conversion_required);
460 void (*totemsrp_confchg_fn) (
462 const unsigned int *member_list,
size_t member_list_entries,
463 const unsigned int *left_list,
size_t left_list_entries,
464 const unsigned int *joined_list,
size_t joined_list_entries,
467 void (*totemsrp_service_ready_fn) (void);
469 void (*totemsrp_waiting_trans_ack_cb_fn) (
470 int waiting_trans_ack);
472 void (*memb_ring_id_create_or_load) (
476 void (*memb_ring_id_store) (
477 const struct memb_ring_id *memb_ring_id,
530 char commit_token_storage[40000];
535 int (*handler_functions[6]) (
539 int endian_conversion_needed);
584 static int message_handler_orf_token (
588 int endian_conversion_needed);
590 static int message_handler_mcast (
594 int endian_conversion_needed);
596 static int message_handler_memb_merge_detect (
600 int endian_conversion_needed);
602 static int message_handler_memb_join (
606 int endian_conversion_needed);
608 static int message_handler_memb_commit_token (
612 int endian_conversion_needed);
614 static int message_handler_token_hold_cancel (
618 int endian_conversion_needed);
622 static void srp_addr_to_nodeid (
624 unsigned int *nodeid_out,
626 unsigned int entries);
628 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
634 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
636 int fcc_mcasts_allowed);
637 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
641 static void target_set_completed (
void *context);
643 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
648 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
649 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out);
650 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
651 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
652 static void memb_merge_detect_endian_convert (
655 static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656 static void timer_function_orf_token_timeout (
void *data);
657 static void timer_function_orf_token_warning (
void *data);
658 static void timer_function_pause_timeout (
void *data);
659 static void timer_function_heartbeat_timeout (
void *data);
660 static void timer_function_token_retransmit_timeout (
void *data);
661 static void timer_function_token_hold_retransmit_timeout (
void *data);
662 static void timer_function_merge_detect_timeout (
void *data);
664 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
670 unsigned int msg_len,
676 unsigned int iface_no);
681 message_handler_orf_token,
682 message_handler_mcast,
683 message_handler_memb_merge_detect,
684 message_handler_memb_join,
685 message_handler_memb_commit_token,
686 message_handler_token_hold_cancel
690 #define log_printf(level, format, args...) \ 692 instance->totemsrp_log_printf ( \ 693 level, instance->totemsrp_subsys_id, \ 694 __FUNCTION__, __FILE__, __LINE__, \ 697 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 699 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 700 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 701 instance->totemsrp_log_printf ( \ 702 level, instance->totemsrp_subsys_id, \ 703 __FUNCTION__, __FILE__, __LINE__, \ 704 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ 710 return gather_state_from_desc[gsfrom];
751 uint64_t timestamp_msec;
754 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
759 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
774 unsigned long long nano_secs = qb_util_nano_current_get ();
776 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
805 static void totempg_mtu_changed(
void *context,
int net_mtu)
812 "Net MTU changed to %d, new value is %d",
820 qb_loop_t *poll_handle,
828 unsigned int msg_len,
829 int endian_conversion_required),
833 const unsigned int *member_list,
size_t member_list_entries,
834 const unsigned int *left_list,
size_t left_list_entries,
835 const unsigned int *joined_list,
size_t joined_list_entries,
837 void (*waiting_trans_ack_cb_fn) (
838 int waiting_trans_ack))
844 if (instance == NULL) {
848 totemsrp_instance_initialize (instance);
886 "Token Timeout (%d ms) retransmit timeout (%d ms)",
891 "Token warning every %d ms (%d%% of Token Timeout)",
893 if (token_warning_ms < totem_config->token_retransmit_timeout)
895 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) " 896 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
900 "Token warnings disabled");
903 "token hold (%d ms) retransmits before loss (%d retrans)",
906 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
913 "downcheck (%d ms) fail to recv const (%d msgs)",
919 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
923 "missed count const (%d messages)",
927 "send threads (%d threads)", totem_config->
threads);
951 timer_function_pause_timeout (instance);
955 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
966 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
970 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
972 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
988 main_iface_change_fn,
990 target_set_completed);
1012 token_event_stats_collector,
1018 token_event_stats_collector,
1020 *srp_context = instance;
1032 memb_leave_message_send (instance);
1044 unsigned int nodeid,
1073 unsigned int nodeid,
1074 unsigned int *interface_id,
1076 unsigned int interfaces_size,
1078 unsigned int *iface_count)
1094 interface_id[num_ifs] = i;
1096 if (++num_ifs > interfaces_size) {
1105 *iface_count = num_ifs;
1111 const char *cipher_type,
1112 const char *hash_type)
1149 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1157 static void srp_addr_to_nodeid (
1159 unsigned int *nodeid_out,
1160 struct srp_addr *srp_addr_in,
1161 unsigned int entries)
1165 for (i = 0; i < entries; i++) {
1166 nodeid_out[i] = srp_addr_in[i].
nodeid;
1170 static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1172 struct srp_addr res;
1184 static void memb_set_subtract (
1185 struct srp_addr *out_list,
int *out_list_entries,
1186 struct srp_addr *one_list,
int one_list_entries,
1187 struct srp_addr *two_list,
int two_list_entries)
1193 *out_list_entries = 0;
1195 for (i = 0; i < one_list_entries; i++) {
1196 for (j = 0; j < two_list_entries; j++) {
1197 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1203 out_list[*out_list_entries] = one_list[i];
1204 *out_list_entries = *out_list_entries + 1;
1213 static void memb_consensus_set (
1215 const struct srp_addr *
addr)
1237 static int memb_consensus_isset (
1239 const struct srp_addr *addr)
1254 static int memb_consensus_agreed (
1257 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1258 int token_memb_entries = 0;
1262 memb_set_subtract (token_memb, &token_memb_entries,
1266 for (i = 0; i < token_memb_entries; i++) {
1267 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1282 assert (token_memb_entries >= 1);
1287 static void memb_consensus_notset (
1289 struct srp_addr *no_consensus_list,
1290 int *no_consensus_list_entries,
1291 struct srp_addr *comparison_list,
1292 int comparison_list_entries)
1296 *no_consensus_list_entries = 0;
1299 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1300 no_consensus_list[*no_consensus_list_entries] = instance->
my_proc_list[i];
1301 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1309 static int memb_set_equal (
1310 struct srp_addr *set1,
int set1_entries,
1311 struct srp_addr *set2,
int set2_entries)
1318 if (set1_entries != set2_entries) {
1321 for (i = 0; i < set2_entries; i++) {
1322 for (j = 0; j < set1_entries; j++) {
1323 if (srp_addr_equal (&set1[j], &set2[i])) {
1339 static int memb_set_subset (
1340 const struct srp_addr *subset,
int subset_entries,
1341 const struct srp_addr *fullset,
int fullset_entries)
1347 if (subset_entries > fullset_entries) {
1350 for (i = 0; i < subset_entries; i++) {
1351 for (j = 0; j < fullset_entries; j++) {
1352 if (srp_addr_equal (&subset[i], &fullset[j])) {
1366 static void memb_set_merge (
1367 const struct srp_addr *subset,
int subset_entries,
1368 struct srp_addr *fullset,
int *fullset_entries)
1374 for (i = 0; i < subset_entries; i++) {
1375 for (j = 0; j < *fullset_entries; j++) {
1376 if (srp_addr_equal (&fullset[j], &subset[i])) {
1382 fullset[*fullset_entries] = subset[i];
1383 *fullset_entries = *fullset_entries + 1;
1390 static void memb_set_and_with_ring_id (
1391 struct srp_addr *set1,
1394 struct srp_addr *set2,
1397 struct srp_addr *and,
1406 for (i = 0; i < set2_entries; i++) {
1407 for (j = 0; j < set1_entries; j++) {
1408 if (srp_addr_equal (&set1[j], &set2[i])) {
1409 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1416 and[*and_entries] = set1[j];
1417 *and_entries = *and_entries + 1;
1424 static void memb_set_log(
1428 struct srp_addr *list,
1435 memset(list_str, 0,
sizeof(list_str));
1437 for (i = 0; i < list_entries; i++) {
1439 snprintf(int_buf,
sizeof(int_buf),
CS_PRI_NODE_ID, list[i].nodeid);
1441 snprintf(int_buf,
sizeof(int_buf),
"," CS_PRI_NODE_ID, list[i].nodeid);
1444 if (strlen(list_str) + strlen(int_buf) >=
sizeof(list_str)) {
1447 strcat(list_str, int_buf);
1450 log_printf(level,
"List '%s' contains %d entries: %s",
string, list_entries, list_str);
1453 static void my_leave_memb_clear(
1460 static unsigned int my_leave_memb_match(
1462 unsigned int nodeid)
1465 unsigned int ret = 0;
1476 static void my_leave_memb_set(
1478 unsigned int nodeid)
1502 assert (instance != NULL);
1506 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1508 assert (instance != NULL);
1522 timer_function_token_retransmit_timeout,
1525 log_printf(instance->totemsrp_log_level_error,
"reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1539 timer_function_merge_detect_timeout,
1542 log_printf(instance->totemsrp_log_level_error,
"start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1545 instance->my_merge_detect_timeout_outstanding = 1;
1568 "Saving state aru %x high seq received %x",
1578 "Restoring instance->my_aru %x my high seq received %x",
1585 "Resetting old ring state");
1598 timer_function_pause_timeout,
1601 log_printf(instance->totemsrp_log_level_error,
"reset_pause_timeout - qb_loop_timer_add error : %d", res);
1613 timer_function_orf_token_warning,
1616 log_printf(instance->totemsrp_log_level_error,
"reset_token_warning - qb_loop_timer_add error : %d", res);
1628 timer_function_orf_token_timeout,
1631 log_printf(instance->totemsrp_log_level_error,
"reset_token_timeout - qb_loop_timer_add error : %d", res);
1634 if (instance->totem_config->token_warning)
1635 reset_token_warning(instance);
1646 timer_function_heartbeat_timeout,
1649 log_printf(instance->totemsrp_log_level_error,
"reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1662 cancel_token_warning(instance);
1669 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1674 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1682 timer_function_token_hold_retransmit_timeout,
1685 log_printf(instance->totemsrp_log_level_error,
"start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1689 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1695 static void memb_state_consensus_timeout_expired (
1698 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1699 int no_consensus_list_entries;
1702 if (memb_consensus_agreed (instance)) {
1703 memb_consensus_reset (instance);
1705 memb_consensus_set (instance, &instance->
my_id);
1707 reset_token_timeout (instance);
1709 memb_consensus_notset (
1712 &no_consensus_list_entries,
1716 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1729 static void timer_function_pause_timeout (
void *data)
1734 reset_pause_timeout (instance);
1739 old_ring_state_restore (instance);
1744 static void timer_function_orf_token_warning (
void *data)
1751 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1754 "Token has not been received in %d ms ", (
unsigned int) tv_diff);
1755 reset_token_warning(instance);
1757 cancel_token_warning(instance);
1761 static void timer_function_orf_token_timeout (
void *data)
1768 "The token was lost in the OPERATIONAL state.");
1770 "A processor failed, forming new configuration:" 1771 " token timed out (%ums), waiting %ums for consensus.",
1781 "The consensus timeout expired (%ums).",
1783 memb_state_consensus_timeout_expired (instance);
1790 "The token was lost in the COMMIT state.");
1797 "The token was lost in the RECOVERY state.");
1798 memb_recovery_state_token_loss (instance);
1804 static void timer_function_heartbeat_timeout (
void *data)
1808 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1809 timer_function_orf_token_timeout(data);
1812 static void memb_timer_function_state_gather (
void *data)
1824 memb_join_message_send (instance);
1835 memb_timer_function_state_gather,
1836 &instance->memb_timer_state_gather_join_timeout);
1839 log_printf(instance->totemsrp_log_level_error,
"memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1845 static void memb_timer_function_gather_consensus_timeout (
void *data)
1848 memb_state_consensus_timeout_expired (instance);
1851 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1856 unsigned int range = 0;
1869 for (i = 1; i <= range; i++) {
1875 recovery_message_item = ptr;
1880 mcast = recovery_message_item->
mcast;
1886 regular_message_item.
mcast =
1887 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1888 regular_message_item.
msg_len =
1889 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1890 mcast = regular_message_item.
mcast;
1900 (uint64_t)mcast->
seq);
1913 ®ular_message_item, mcast->
seq);
1930 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1931 int joined_list_entries = 0;
1932 unsigned int aru_save;
1939 char left_node_msg[1024];
1940 char joined_node_msg[1024];
1941 char failed_node_msg[1024];
1945 memb_consensus_reset (instance);
1947 old_ring_state_reset (instance);
1949 deliver_messages_from_recovery_to_regular (instance);
1952 "Delivering to app %x to %x",
1955 aru_save = instance->
my_aru;
1968 memb_set_subtract (joined_list, &joined_list_entries,
1986 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
2013 instance->
my_aru = aru_save;
2018 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2020 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2021 joined_list_entries);
2025 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2087 regular_message = ptr;
2088 free (regular_message->
mcast);
2094 if (joined_list_entries) {
2096 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2097 for (i=0; i< joined_list_entries; i++) {
2098 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" " CS_PRI_NODE_ID, joined_list_totemip[i]);
2102 joined_node_msg[0] =
'\0';
2108 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2110 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" " CS_PRI_NODE_ID, left_list[i]);
2113 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2115 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
2117 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(left_node_msg)-sptr2,
" " CS_PRI_NODE_ID, left_list[i]);
2121 failed_node_msg[0] =
'\0';
2125 left_node_msg[0] =
'\0';
2126 failed_node_msg[0] =
'\0';
2129 my_leave_memb_clear(instance);
2132 "entering OPERATIONAL state.");
2140 if (strlen(failed_node_msg)) {
2142 "Failed to receive the leave message.%s",
2153 reset_pause_timeout (instance);
2166 static void memb_state_gather_enter (
2177 &instance->
my_id, 1,
2180 memb_join_message_send (instance);
2191 memb_timer_function_state_gather,
2192 &instance->memb_timer_state_gather_join_timeout);
2194 log_printf(instance->totemsrp_log_level_error,
"memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2200 qb_loop_timer_del (instance->totemsrp_poll_handle,
2201 instance->memb_timer_state_gather_consensus_timeout);
2203 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2205 instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2207 memb_timer_function_gather_consensus_timeout,
2208 &instance->memb_timer_state_gather_consensus_timeout);
2210 log_printf(instance->totemsrp_log_level_error,
"memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2216 cancel_token_retransmit_timeout (instance);
2217 cancel_token_timeout (instance);
2218 cancel_merge_detect_timeout (instance);
2220 memb_consensus_reset (instance);
2222 memb_consensus_set (instance, &instance->my_id);
2224 log_printf (instance->totemsrp_log_level_debug,
2225 "entering GATHER state from %d(%s).",
2226 gather_from, gsfrom_to_msg(gather_from));
2229 instance->stats.gather_entered++;
2235 instance->stats.continuous_gather++;
2241 static void timer_function_token_retransmit_timeout (
void *data);
2243 static void target_set_completed (
2248 memb_state_commit_token_send (instance);
2252 static void memb_state_commit_enter (
2255 old_ring_state_save (instance);
2257 memb_state_commit_token_update (instance);
2259 memb_state_commit_token_target_set (instance);
2276 "entering COMMIT state.");
2279 reset_token_retransmit_timeout (instance);
2280 reset_token_timeout (instance);
2296 static void memb_state_recovery_enter (
2301 int local_received_flg = 1;
2302 unsigned int low_ring_aru;
2303 unsigned int range = 0;
2304 unsigned int messages_originated = 0;
2305 const struct srp_addr *
addr;
2307 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2313 "entering RECOVERY state.");
2324 memb_state_commit_token_send_recovery (instance, commit_token);
2339 memcpy (&my_new_memb_ring_id_list[i],
2340 &memb_list[i].ring_id,
2343 memb_set_and_with_ring_id (
2345 my_new_memb_ring_id_list,
2365 "aru %x high delivered %x received flag %d",
2383 local_received_flg = 0;
2387 if (local_received_flg == 1) {
2403 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2405 low_ring_aru = memb_list[i].
aru;
2426 "copying all old ring messages from %x-%x.",
2429 for (i = 1; i <= range; i++) {
2436 low_ring_aru + i, &ptr);
2440 sort_queue_item = ptr;
2441 messages_originated++;
2442 memset (&message_item, 0,
sizeof (
struct message_item));
2444 message_item.
mcast = totemsrp_buffer_alloc (instance);
2445 assert (message_item.
mcast);
2446 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2458 memcpy (((
char *)message_item.
mcast) + sizeof (
struct mcast),
2459 sort_queue_item->
mcast,
2464 "Originated %d messages in RECOVERY.", messages_originated);
2469 "Did not need to originate any messages in recovery.");
2479 reset_token_timeout (instance);
2480 reset_token_retransmit_timeout (instance);
2493 token_hold_cancel_send (instance);
2500 struct iovec *iovec,
2501 unsigned int iov_len,
2508 unsigned int addr_idx;
2517 if (cs_queue_is_full (queue_use)) {
2522 memset (&message_item, 0,
sizeof (
struct message_item));
2527 message_item.
mcast = totemsrp_buffer_alloc (instance);
2528 if (message_item.
mcast == 0) {
2535 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2547 addr = (
char *)message_item.
mcast;
2548 addr_idx = sizeof (
struct mcast);
2549 for (i = 0; i < iov_len; i++) {
2550 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2551 addr_idx += iovec[i].iov_len;
2554 message_item.
msg_len = addr_idx;
2558 cs_queue_item_add (queue_use, &message_item);
2580 cs_queue_avail (queue_use, &avail);
2591 static int orf_token_remcast (
2599 struct sq *sort_queue;
2607 res = sq_in_range (sort_queue, seq);
2616 res = sq_item_get (sort_queue, seq, &ptr);
2621 sort_queue_item = ptr;
2625 sort_queue_item->
mcast,
2635 static void messages_free (
2637 unsigned int token_aru)
2642 int log_release = 0;
2643 unsigned int release_to;
2644 unsigned int range = 0;
2646 release_to = token_aru;
2647 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2667 for (i = 1; i <= range; i++) {
2673 regular_message = ptr;
2674 totemsrp_buffer_release (instance, regular_message->
mcast);
2685 "releasing messages up to and including %x", release_to);
2689 static void update_aru (
2694 struct sq *sort_queue;
2696 unsigned int my_aru_saved = 0;
2706 my_aru_saved = instance->
my_aru;
2707 for (i = 1; i <= range; i++) {
2711 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2719 instance->
my_aru += i - 1;
2725 static int orf_token_mcast (
2728 int fcc_mcasts_allowed)
2732 struct sq *sort_queue;
2735 unsigned int fcc_mcast_current;
2740 reset_token_retransmit_timeout (instance);
2751 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2752 if (cs_queue_is_empty (mcast_queue)) {
2755 message_item = (
struct message_item *)cs_queue_item_get (mcast_queue);
2763 memset (&sort_queue_item, 0,
sizeof (
struct sort_queue_item));
2767 mcast = sort_queue_item.
mcast;
2774 sq_item_add (sort_queue, &sort_queue_item, message_item->
mcast->
seq);
2778 message_item->
mcast,
2784 cs_queue_item_remove (mcast_queue);
2792 update_aru (instance);
2797 return (fcc_mcast_current);
2804 static int orf_token_rtr (
2807 unsigned int *fcc_allowed)
2812 struct sq *sort_queue;
2814 unsigned int range = 0;
2815 char retransmit_msg[1024];
2824 rtr_list = &orf_token->
rtr_list[0];
2826 strcpy (retransmit_msg,
"Retransmit List: ");
2831 sprintf (value,
"%x ", rtr_list[i].seq);
2832 strcat (retransmit_msg, value);
2834 strcat (retransmit_msg,
"");
2836 "%s", retransmit_msg);
2849 if (memcmp (&rtr_list[i].ring_id, &instance->
my_ring_id,
2856 res = orf_token_remcast (instance, rtr_list[i].seq);
2863 memmove (&rtr_list[i], &rtr_list[i + 1],
2879 range = orf_token->
seq - instance->
my_aru;
2883 (i <= range); i++) {
2888 res = sq_in_range (sort_queue, instance->
my_aru + i);
2896 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2907 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2917 if (instance->
my_aru + i == rtr_list[j].
seq) {
2947 static void timer_function_token_retransmit_timeout (
void *data)
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance);
2963 static void timer_function_token_hold_retransmit_timeout (
void *data)
2974 token_retransmit (instance);
2979 static void timer_function_merge_detect_timeout(
void *data)
2988 memb_merge_detect_transmit (instance);
3001 static int token_send (
3003 struct orf_token *orf_token,
3007 unsigned int orf_token_size;
3009 orf_token_size =
sizeof (
struct orf_token) +
3010 (orf_token->rtr_list_entries *
sizeof (
struct rtr_item));
3017 if (forward_token == 0) {
3055 sizeof (
struct token_hold_cancel));
3062 struct orf_token orf_token;
3095 res = token_send (instance, &orf_token, 1);
3100 static void memb_state_commit_token_update (
3103 struct srp_addr *
addr;
3105 unsigned int high_aru;
3141 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3142 high_aru = memb_list[i].
aru;
3152 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3167 static void memb_state_commit_token_target_set (
3170 struct srp_addr *
addr;
3181 static int memb_state_commit_token_send_recovery (
3183 struct memb_commit_token *commit_token)
3185 unsigned int commit_token_size;
3189 commit_token_size =
sizeof (
struct memb_commit_token) +
3190 ((sizeof (struct srp_addr) +
3191 sizeof (struct memb_commit_token_memb_entry)) * commit_token->
addr_entries);
3207 reset_token_retransmit_timeout (instance);
3211 static int memb_state_commit_token_send (
3214 unsigned int commit_token_size;
3218 commit_token_size =
sizeof (
struct memb_commit_token) +
3219 ((sizeof (struct srp_addr) +
3236 reset_token_retransmit_timeout (instance);
3243 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3244 int token_memb_entries = 0;
3246 unsigned int lowest_nodeid;
3248 memb_set_subtract (token_memb, &token_memb_entries,
3255 assert(token_memb_entries > 0);
3257 lowest_nodeid = token_memb[0].
nodeid;
3258 for (i = 1; i < token_memb_entries; i++) {
3259 if (lowest_nodeid > token_memb[i].nodeid) {
3260 lowest_nodeid = token_memb[i].
nodeid;
3266 static int srp_addr_compare (
const void *a,
const void *b)
3268 const struct srp_addr *srp_a = (
const struct srp_addr *)a;
3269 const struct srp_addr *srp_b = (
const struct srp_addr *)b;
3280 static void memb_state_commit_token_create (
3283 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3284 struct srp_addr *
addr;
3286 int token_memb_entries = 0;
3289 "Creating commit token because I am the rep.");
3291 memb_set_subtract (token_memb, &token_memb_entries,
3295 memset (instance->
commit_token, 0, sizeof (
struct memb_commit_token));
3310 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3319 memcpy (addr, token_memb,
3320 token_memb_entries *
sizeof (
struct srp_addr));
3321 memset (memb_list, 0,
3327 char memb_join_data[40000];
3330 unsigned int addr_idx;
3340 msg_len =
sizeof(
struct memb_join) +
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3343 if (msg_len >
sizeof(memb_join_data)) {
3344 log_printf (instance->totemsrp_log_level_error,
3345 "memb_join_message too long. Ignoring message.");
3350 memb_join->
ring_seq = instance->my_ring_id.seq;
3359 addr = (
char *)memb_join;
3360 addr_idx =
sizeof (
struct memb_join);
3361 memcpy (&addr[addr_idx],
3362 instance->my_proc_list,
3363 instance->my_proc_list_entries *
3364 sizeof (
struct srp_addr));
3366 instance->my_proc_list_entries *
3367 sizeof (
struct srp_addr);
3368 memcpy (&addr[addr_idx],
3369 instance->my_failed_list,
3370 instance->my_failed_list_entries *
3371 sizeof (
struct srp_addr));
3373 instance->my_failed_list_entries *
3374 sizeof (
struct srp_addr);
3376 if (instance->totem_config->send_join_timeout) {
3377 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3380 instance->stats.memb_join_tx++;
3383 instance->totemnet_context,
3390 char memb_join_data[40000];
3393 unsigned int addr_idx;
3394 int active_memb_entries;
3395 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3399 "sending join/leave message");
3406 &instance->
my_id, 1,
3409 memb_set_subtract (active_memb, &active_memb_entries,
3411 &instance->
my_id, 1);
3413 msg_len =
sizeof(
struct memb_join) +
3414 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3416 if (msg_len >
sizeof(memb_join_data)) {
3417 log_printf (instance->totemsrp_log_level_error,
3418 "memb_leave message too long. Ignoring message.");
3429 memb_join->
ring_seq = instance->my_ring_id.seq;
3439 addr = (
char *)memb_join;
3440 addr_idx =
sizeof (
struct memb_join);
3441 memcpy (&addr[addr_idx],
3443 active_memb_entries *
3444 sizeof (
struct srp_addr));
3446 active_memb_entries *
3447 sizeof (
struct srp_addr);
3448 memcpy (&addr[addr_idx],
3449 instance->my_failed_list,
3450 instance->my_failed_list_entries *
3451 sizeof (
struct srp_addr));
3453 instance->my_failed_list_entries *
3454 sizeof (
struct srp_addr);
3457 if (instance->totem_config->send_join_timeout) {
3458 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3460 instance->stats.memb_join_tx++;
3463 instance->totemnet_context,
3485 sizeof (
struct memb_merge_detect));
3488 static void memb_ring_id_set (
3507 token_hold_cancel_send (instance);
3510 if (callback_handle == 0) {
3513 *handle_out = (
void *)callback_handle;
3514 qb_list_init (&callback_handle->
list);
3516 callback_handle->
data = (
void *) data;
3518 callback_handle->
delete =
delete;
3537 qb_list_del (&h->
list);
3544 static void token_callbacks_execute (
3548 struct qb_list_head *list, *tmp_iter;
3549 struct qb_list_head *callback_listhead = 0;
3565 qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3566 token_callback_instance = qb_list_entry (list,
struct token_callback_instance, list);
3567 del = token_callback_instance->
delete;
3574 token_callback_instance->
data);
3578 if (res == -1 && del == 1) {
3579 qb_list_add (list, callback_listhead);
3581 free (token_callback_instance);
3605 if (queue_use != NULL) {
3606 backlog = cs_queue_used (queue_use);
3613 static int fcc_calculate (
3615 struct orf_token *token)
3617 unsigned int transmits_allowed;
3618 unsigned int backlog_calc;
3626 instance->
my_cbl = backlog_get (instance);
3635 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3636 transmits_allowed = backlog_calc;
3640 return (transmits_allowed);
3646 static void fcc_rtr_limit (
3648 struct orf_token *token,
3649 unsigned int *transmits_allowed)
3653 assert (check >= 0);
3660 *transmits_allowed = 0;
3664 static void fcc_token_update (
3666 struct orf_token *token,
3667 unsigned int msgs_transmitted)
3669 token->
fcc += msgs_transmitted - instance->
my_trc;
3671 instance->
my_trc = msgs_transmitted;
3678 static int check_orf_token_sanity(
3682 int endian_conversion_needed)
3685 const struct orf_token *token = (
const struct orf_token *)msg;
3686 size_t required_len;
3688 if (msg_len <
sizeof(
struct orf_token)) {
3690 "Received orf_token message is too short... ignoring.");
3695 if (endian_conversion_needed) {
3701 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3702 if (msg_len < required_len) {
3704 "Received orf_token message is too short... ignoring.");
3712 static int check_mcast_sanity(
3716 int endian_conversion_needed)
3719 if (msg_len <
sizeof(
struct mcast)) {
3721 "Received mcast message is too short... ignoring.");
3729 static int check_memb_merge_detect_sanity(
3733 int endian_conversion_needed)
3738 "Received memb_merge_detect message is too short... ignoring.");
3746 static int check_memb_join_sanity(
3750 int endian_conversion_needed)
3755 size_t required_len;
3757 if (msg_len <
sizeof(
struct memb_join)) {
3759 "Received memb_join message is too short... ignoring.");
3767 if (endian_conversion_needed) {
3768 proc_list_entries =
swab32(proc_list_entries);
3769 failed_list_entries =
swab32(failed_list_entries);
3772 required_len =
sizeof(
struct memb_join) + ((proc_list_entries + failed_list_entries) *
sizeof(
struct srp_addr));
3773 if (msg_len < required_len) {
3775 "Received memb_join message is too short... ignoring.");
3783 static int check_memb_commit_token_sanity(
3787 int endian_conversion_needed)
3789 const struct memb_commit_token *mct_msg = (
const struct memb_commit_token *)msg;
3791 size_t required_len;
3793 if (msg_len <
sizeof(
struct memb_commit_token)) {
3795 "Received memb_commit_token message is too short... ignoring.");
3801 if (endian_conversion_needed) {
3802 addr_entries =
swab32(addr_entries);
3805 required_len =
sizeof(
struct memb_commit_token) +
3806 (addr_entries * (
sizeof(
struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3807 if (msg_len < required_len) {
3809 "Received memb_commit_token message is too short... ignoring.");
3817 static int check_token_hold_cancel_sanity(
3821 int endian_conversion_needed)
3826 "Received token_hold_cancel message is too short... ignoring.");
3842 static int message_handler_orf_token (
3846 int endian_conversion_needed)
3848 char token_storage[1500];
3849 char token_convert[1500];
3850 struct orf_token *token = NULL;
3852 unsigned int transmits_allowed;
3853 unsigned int mcasted_retransmit;
3854 unsigned int mcasted_regular;
3855 unsigned int last_aru;
3858 unsigned long long tv_current;
3859 unsigned long long tv_diff;
3861 tv_current = qb_util_nano_current_get ();
3862 tv_diff = tv_current -
tv_old;
3863 tv_old = tv_current;
3866 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3869 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3876 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE 3877 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3882 if (endian_conversion_needed) {
3883 orf_token_endian_convert ((
struct orf_token *)msg,
3884 (
struct orf_token *)token_convert);
3885 msg = (
struct orf_token *)token_convert;
3892 token = (
struct orf_token *)token_storage;
3893 memcpy (token, msg,
sizeof (
struct orf_token));
3894 memcpy (&token->
rtr_list[0], (
char *)msg + sizeof (
struct orf_token),
3902 start_merge_detect_timeout (instance);
3905 cancel_merge_detect_timeout (instance);
3906 cancel_token_hold_retransmit_timeout (instance);
3912 #ifdef TEST_RECOVERY_MSG_COUNT 3952 messages_free (instance, token->
aru);
3971 reset_heartbeat_timeout(instance);
3974 cancel_heartbeat_timeout(instance);
3995 transmits_allowed = fcc_calculate (instance, token);
3996 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4005 fcc_rtr_limit (instance, token, &transmits_allowed);
4006 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4013 fcc_token_update (instance, token, mcasted_retransmit +
4016 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
4021 if (token->
aru == token->
seq) {
4027 if (token->
aru == last_aru && token->
aru_addr != 0) {
4042 "FAILED TO RECEIVE");
4046 memb_set_merge (&instance->
my_id, 1,
4073 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4086 "install seq %x aru %x high seq received %x",
4104 "retrans flag count %x token aru %x install seq %x aru %x %x",
4108 memb_state_operational_enter (instance);
4115 token_send (instance, token, forward_token);
4118 tv_current = qb_util_nano_current_get ();
4119 tv_diff = tv_current -
tv_old;
4120 tv_old = tv_current;
4123 ((
float)tv_diff) / 1000000.0);
4126 messages_deliver_to_app (instance, 0,
4134 reset_token_timeout (instance);
4135 reset_token_retransmit_timeout (instance);
4139 start_token_hold_retransmit_timeout (instance);
4149 reset_heartbeat_timeout(instance);
4152 cancel_heartbeat_timeout(instance);
4158 static void messages_deliver_to_app (
4161 unsigned int end_point)
4166 struct mcast *mcast_in;
4167 struct mcast mcast_header;
4168 unsigned int range = 0;
4169 int endian_conversion_required;
4170 unsigned int my_high_delivered_stored = 0;
4171 struct srp_addr aligned_system_from;
4186 for (i = 1; i <= range; i++) {
4194 my_high_delivered_stored + i);
4200 my_high_delivered_stored + i, &ptr);
4204 if (res != 0 && skip == 0) {
4215 sort_queue_item_p = ptr;
4217 mcast_in = sort_queue_item_p->
mcast;
4218 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4220 endian_conversion_required = 0;
4222 endian_conversion_required = 1;
4223 mcast_endian_convert (mcast_in, &mcast_header);
4225 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4234 memb_set_subset (&aligned_system_from,
4248 "Delivering MCAST message with seq %x to pending delivery queue",
4256 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4258 endian_conversion_required);
4265 static int message_handler_mcast (
4269 int endian_conversion_needed)
4272 struct sq *sort_queue;
4273 struct mcast mcast_header;
4274 struct srp_addr aligned_system_from;
4276 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4280 if (endian_conversion_needed) {
4281 mcast_endian_convert (msg, &mcast_header);
4283 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4294 #ifdef TEST_DROP_MCAST_PERCENTAGE 4295 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4311 &aligned_system_from, 1,
4317 if (!memb_set_subset (
4318 &aligned_system_from,
4323 memb_set_merge (&aligned_system_from, 1,
4354 sq_in_range (sort_queue, mcast_header.
seq) &&
4355 sq_item_inuse (sort_queue, mcast_header.
seq) == 0) {
4361 sort_queue_item.
mcast = totemsrp_buffer_alloc (instance);
4362 if (sort_queue_item.
mcast == NULL) {
4365 memcpy (sort_queue_item.
mcast, msg, msg_len);
4366 sort_queue_item.
msg_len = msg_len;
4369 mcast_header.
seq)) {
4373 sq_item_add (sort_queue, &sort_queue_item, mcast_header.
seq);
4376 update_aru (instance);
4385 static int message_handler_memb_merge_detect (
4389 int endian_conversion_needed)
4392 struct srp_addr aligned_system_from;
4394 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4398 if (endian_conversion_needed) {
4399 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4401 memcpy (&memb_merge_detect, msg,
4402 sizeof (
struct memb_merge_detect));
4414 aligned_system_from = memb_merge_detect.
system_from;
4421 memb_set_merge (&aligned_system_from, 1,
4427 if (!memb_set_subset (
4428 &aligned_system_from,
4433 memb_set_merge (&aligned_system_from, 1,
4451 static void memb_join_process (
4455 struct srp_addr *proc_list;
4456 struct srp_addr *failed_list;
4457 int gather_entered = 0;
4458 int fail_minus_memb_entries = 0;
4459 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4460 struct srp_addr aligned_system_from;
4502 if (memb_set_equal (proc_list,
4507 memb_set_equal (failed_list,
4513 memb_consensus_set (instance, &aligned_system_from);
4516 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4522 memb_state_commit_token_create (instance);
4524 memb_state_commit_enter (instance);
4527 if (memb_consensus_agreed (instance) &&
4528 memb_lowest_in_config (instance)) {
4530 memb_state_commit_token_create (instance);
4532 memb_state_commit_enter (instance);
4537 if (memb_set_subset (proc_list,
4542 memb_set_subset (failed_list,
4549 if (memb_set_subset (&aligned_system_from, 1,
4554 memb_set_merge (proc_list,
4558 if (memb_set_subset (
4559 &instance->
my_id, 1,
4563 &aligned_system_from, 1,
4566 if (memb_set_subset (
4567 &aligned_system_from, 1,
4571 if (memb_set_subset (
4572 &aligned_system_from, 1,
4576 memb_set_merge (failed_list,
4580 memb_set_subtract (fail_minus_memb,
4581 &fail_minus_memb_entries,
4587 memb_set_merge (fail_minus_memb,
4588 fail_minus_memb_entries,
4599 if (gather_entered == 0 &&
4606 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4609 struct srp_addr *in_proc_list;
4610 struct srp_addr *in_failed_list;
4611 struct srp_addr *out_proc_list;
4612 struct srp_addr *out_failed_list;
4629 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4632 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4636 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out)
4658 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4663 if (in_memb_list[i].ring_id.
rep != 0) {
4675 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4700 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4717 static void memb_merge_detect_endian_convert (
4730 static int ignore_join_under_operational (
4732 const struct memb_join *memb_join)
4734 struct srp_addr *proc_list;
4735 struct srp_addr *failed_list;
4737 struct srp_addr aligned_system_from;
4744 if (memb_set_subset (&instance->
my_id, 1,
4753 if ((memb_set_subset (&aligned_system_from, 1,
4762 static int message_handler_memb_join (
4766 int endian_conversion_needed)
4768 const struct memb_join *memb_join;
4769 struct memb_join *memb_join_convert = alloca (msg_len);
4770 struct srp_addr aligned_system_from;
4772 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4776 if (endian_conversion_needed) {
4777 memb_join = memb_join_convert;
4778 memb_join_endian_convert (msg, memb_join_convert);
4791 if (pause_flush (instance)) {
4800 if (!ignore_join_under_operational (instance, memb_join)) {
4801 memb_join_process (instance, memb_join);
4806 memb_join_process (instance, memb_join);
4810 if (memb_set_subset (&aligned_system_from,
4817 memb_join_process (instance, memb_join);
4823 if (memb_set_subset (&aligned_system_from,
4830 memb_join_process (instance, memb_join);
4831 memb_recovery_state_token_loss (instance);
4839 static int message_handler_memb_commit_token (
4843 int endian_conversion_needed)
4845 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4846 struct memb_commit_token *memb_commit_token;
4847 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4850 struct srp_addr *
addr;
4853 "got commit token");
4855 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4859 if (endian_conversion_needed) {
4860 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4862 memcpy (memb_commit_token_convert, msg, msg_len);
4864 memb_commit_token = memb_commit_token_convert;
4867 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4868 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4878 memb_set_subtract (sub, &sub_entries,
4882 if (memb_set_equal (addr,
4888 memcpy (instance->
commit_token, memb_commit_token, msg_len);
4889 memb_state_commit_enter (instance);
4903 memb_state_recovery_enter (instance, memb_commit_token);
4918 "Sending initial ORF token");
4921 orf_token_send_initial (instance);
4922 reset_token_timeout (instance);
4923 reset_token_retransmit_timeout (instance);
4930 static int message_handler_token_hold_cancel (
4934 int endian_conversion_needed)
4938 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4947 timer_function_token_retransmit_timeout (instance);
4953 static int check_message_header_validity(
4956 unsigned int msg_len,
4957 const struct sockaddr_storage *system_from)
4961 const char *guessed_str;
4962 const char *msg_byte = msg;
4966 "Message received from %s is too short... Ignoring %u.",
4980 if (message_header->
magic == 0xFFFF) {
4984 guessed_str =
"Corosync 2.2";
4985 }
else if (message_header->
magic == 0xFEFE) {
4989 guessed_str =
"Corosync 2.3+";
4990 }
else if (msg_byte[0] == 0x01) {
4994 guessed_str =
"unencrypted Kronosnet";
4995 }
else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
5000 guessed_str =
"unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5011 guessed_str =
"encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5015 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5024 "Message received from %s has unsupported version %u... Ignoring",
5038 unsigned int msg_len,
5039 const struct sockaddr_storage *system_from)
5044 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5048 switch (message_header->
type) {
5069 "Message received from %s has wrong type... ignoring %d.\n",
5071 (
int)message_header->
type);
5089 unsigned short ip_port,
5090 unsigned int iface_no)
5110 unsigned int iface_no)
5134 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5170 void (*totem_service_ready) (
void))
5249 timer_function_orf_token_timeout(context);
void(* totemsrp_service_ready_fn)(void)
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
void(*) enum memb_stat memb_state)
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
struct totem_message_header header
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
cfg_message_crypto_reconfig_phase_t
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
struct srp_addr system_from
struct memb_ring_id ring_id
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
uint32_t waiting_trans_ack
struct srp_addr system_from
struct memb_ring_id ring_id
int totemsrp_log_level_debug
struct memb_ring_id my_ring_id
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
qb_loop_timer_handle timer_orf_token_warning
int my_leave_memb_entries
unsigned int old_ring_state_high_seq_received
unsigned int proc_list_entries
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
struct totem_interface * interfaces
int totemsrp_my_family_get(void *srp_context)
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
The totem_ip_address struct.
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
int totemsrp_log_level_error
#define LEAVE_DUMMY_NODEID
struct memb_ring_id ring_id
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int failed_list_entries
unsigned char end_of_memb_join[0]
unsigned long long int tv_old
void totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
#define SEQNO_START_TOKEN
void totemnet_stats_clear(void *net_context)
unsigned int token_hold_timeout
struct memb_ring_id ring_id
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
void * token_sent_event_handle
struct srp_addr system_from
totem_configuration_type
The totem_configuration_type enum.
int totemsrp_log_level_notice
unsigned int proc_list_entries
unsigned int totemsrp_my_nodeid_get(void *srp_context)
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
#define TOTEM_TOKEN_STATS_MAX
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
int totemsrp_log_level_warning
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
uint64_t memb_merge_detect_rx
struct cs_queue new_message_queue_trans
unsigned char end_of_commit_token[0]
unsigned char addr[TOTEMIP_ADDRLEN]
char commit_token_storage[40000]
struct cs_queue new_message_queue
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
uint64_t gather_token_lost
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
#define CS_PRI_RING_ID_SEQ
int totemsrp_log_level_trace
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct memb_ring_id my_old_ring_id
icmap_map_t icmap_get_global_map(void)
Return global icmap.
unsigned int downcheck_timeout
struct qb_list_head token_callback_received_listhead
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
uint64_t memb_commit_token_tx
int my_deliver_memb_entries
unsigned int max_network_delay
unsigned int heartbeat_failures_allowed
#define swab64(x)
The swab64 macro.
struct message_item __attribute__
unsigned long long token_ring_id_seq
struct totem_ip_address mcast_address
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
unsigned int send_join_timeout
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
uint64_t operational_entered
void(*) in log_level_security)
unsigned long long ring_seq
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
uint64_t operational_token_lost
unsigned int received_flg
uint64_t consensus_timeouts
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
struct message_handlers totemsrp_message_handlers
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
uint64_t recovery_token_lost
int totemnet_recv_flush(void *net_context)
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
unsigned char end_of_memb_join[0]
unsigned int token_retransmits_before_loss_const
struct rtr_item rtr_list[0]
struct memb_ring_id ring_id
unsigned int seqno_unchanged_const
uint64_t commit_token_lost
unsigned int miss_count_const
uint64_t token_hold_cancel_rx
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
unsigned int join_timeout
uint32_t originated_orf_token
void * totemnet_buffer_alloc(void *net_context)
struct totem_ip_address mcast_addr
#define MESSAGE_QUEUE_MAX
int totemnet_recv_mcast_empty(void *net_context)
unsigned int received_flg
unsigned int last_released
int orf_token_retransmit_size
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
#define LOGSYS_LEVEL_DEBUG
unsigned int fail_to_recv_const
int totemnet_send_flush(void *net_context)
void totemsrp_stats_clear(void *context, int flags)
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
void * token_recv_event_handle
struct totem_ip_address boundto
unsigned int my_high_seq_received
qb_loop_t * totemsrp_poll_handle
int totemnet_finalize(void *net_context)
qb_loop_timer_handle timer_pause_timeout
qb_loop_timer_handle timer_merge_detect_timeout
int my_merge_detect_timeout_outstanding
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
int totemsrp_log_level_security
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
int(* callback_fn)(enum totem_callback_token_type type, const void *)
struct totem_message_header header
#define swab32(x)
The swab32 macro.
qb_loop_timer_handle timer_orf_token_timeout
uint32_t continuous_gather
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
void totemsrp_threaded_mode_enable(void *context)
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
struct totem_message_header header
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
uint32_t orf_token_discard
int my_failed_list_entries
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
uint64_t token_hold_cancel_tx
const char * totemip_sa_print(const struct sockaddr *sa)
unsigned int token_timeout
struct totem_message_header header
unsigned int high_delivered
unsigned int consensus_timeout
void totemsrp_force_gather(void *context)
#define PROCESSOR_COUNT_MAX
struct totem_ip_address my_addrs[INTERFACE_MAX]
char orf_token_retransmit[TOKEN_SIZE_MAX]
unsigned int token_warning
struct sq regular_sort_queue
#define swab16(x)
The swab16 macro.
struct totem_message_header header
void totemsrp_finalize(void *srp_context)
struct totem_message_header header
#define QUEUE_RTR_ITEMS_SIZE_MAX
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
struct cs_queue retrans_message_queue
const char * gather_state_from_desc[]
qb_loop_timer_handle memb_timer_state_gather_join_timeout
int my_trans_memb_entries
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
uint64_t memb_merge_detect_tx
unsigned int high_delivered
struct rtr_item rtr_list[0]
int consensus_list_entries
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
struct totem_interface * orig_interfaces
unsigned char end_of_commit_token[0]
unsigned int cancel_token_hold_on_retransmit
uint32_t threaded_mode_enabled
enum totem_callback_token_type callback_type
unsigned long long ring_seq
struct totem_message_header header
struct totem_logging_configuration totem_logging_configuration
struct memb_ring_id ring_id
#define log_printf(level, format, args...)
void totemsrp_trans_ack(void *context)
unsigned int max_messages
uint64_t recovery_entered
qb_loop_timer_handle memb_timer_state_commit_timeout
void totemnet_buffer_release(void *net_context, void *ptr)
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
struct memb_commit_token * commit_token
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
struct srp_addr system_from
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
unsigned int merge_timeout
unsigned int use_heartbeat
int totemnet_iface_check(void *net_context)
unsigned int token_retransmit_timeout
struct qb_list_head token_callback_sent_listhead
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
#define RETRANSMIT_ENTRIES_MAX
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
unsigned int my_token_seq
struct memb_ring_id ring_id
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
struct memb_ring_id ring_id
unsigned int my_install_seq
unsigned int failed_list_entries
struct sq recovery_sort_queue
#define TOTEMPG_STATS_CLEAR_TRANSPORT
totem_callback_token_type
The totem_callback_token_type enum.
unsigned int my_high_ring_delivered
int totemnet_processor_count_set(void *net_context, int processor_count)