41 #include <sys/types.h> 42 #include <sys/socket.h> 44 #include <sys/ioctl.h> 45 #include <netinet/in.h> 54 #include <arpa/inet.h> 57 #include <qb/qblist.h> 61 #include <qb/qbipc_common.h> 70 #define MAP_ANONYMOUS MAP_ANON 77 #define GROUP_HASH_SIZE 32 143 static struct qb_list_head joinlist_messages_head;
155 struct qb_list_head iteration_instance_list_head;
156 struct qb_list_head zcb_mapped_list_head;
162 struct qb_list_head items_list_head;
172 static unsigned int my_member_list_entries;
176 static unsigned int my_old_member_list_entries = 0;
208 static int cpg_lib_init_fn (
void *conn);
210 static int cpg_lib_exit_fn (
void *conn);
212 static void message_handler_req_exec_cpg_procjoin (
216 static void message_handler_req_exec_cpg_procleave (
220 static void message_handler_req_exec_cpg_joinlist (
224 static void message_handler_req_exec_cpg_mcast (
228 static void message_handler_req_exec_cpg_partial_mcast (
232 static void message_handler_req_exec_cpg_downlist_old (
236 static void message_handler_req_exec_cpg_downlist (
240 static void exec_cpg_procjoin_endian_convert (
void *msg);
242 static void exec_cpg_joinlist_endian_convert (
void *msg);
244 static void exec_cpg_mcast_endian_convert (
void *msg);
246 static void exec_cpg_partial_mcast_endian_convert (
void *msg);
248 static void exec_cpg_downlist_endian_convert_old (
void *msg);
250 static void exec_cpg_downlist_endian_convert (
void *msg);
252 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message);
254 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message);
256 static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message);
258 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message);
260 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message);
262 static void message_handler_req_lib_cpg_membership (
void *conn,
263 const void *message);
265 static void message_handler_req_lib_cpg_local_get (
void *conn,
266 const void *message);
268 static void message_handler_req_lib_cpg_iteration_initialize (
270 const void *message);
272 static void message_handler_req_lib_cpg_iteration_next (
274 const void *message);
276 static void message_handler_req_lib_cpg_iteration_finalize (
278 const void *message);
280 static void message_handler_req_lib_cpg_zc_alloc (
282 const void *message);
284 static void message_handler_req_lib_cpg_zc_free (
286 const void *message);
288 static void message_handler_req_lib_cpg_zc_execute (
290 const void *message);
292 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason);
294 static int cpg_exec_send_downlist(
void);
296 static int cpg_exec_send_joinlist(
void);
298 static void downlist_inform_clients (
void);
300 static void joinlist_inform_clients (
void);
302 static void joinlist_messages_delete (
void);
304 static void cpg_sync_init (
305 const unsigned int *trans_list,
306 size_t trans_list_entries,
307 const unsigned int *member_list,
308 size_t member_list_entries,
311 static int cpg_sync_process (
void);
313 static void cpg_sync_activate (
void);
315 static void cpg_sync_abort (
void);
317 static void do_proc_join(
322 qb_map_t *group_notify_map);
324 static void do_proc_leave(
330 static int notify_lib_totem_membership (
332 int member_list_entries,
333 const unsigned int *member_list);
335 static inline int zcb_all_free (
338 static char *cpg_print_group_name (
351 .lib_handler_fn = message_handler_req_lib_cpg_leave,
355 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
359 .lib_handler_fn = message_handler_req_lib_cpg_membership,
363 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
367 .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
371 .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
375 .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
379 .lib_handler_fn = message_handler_req_lib_cpg_finalize,
383 .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
387 .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
391 .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
395 .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
405 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
408 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
409 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
412 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
413 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
416 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
417 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
420 .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
421 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
424 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
425 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
428 .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
429 .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
434 .
name =
"corosync cluster closed process group service v1.01",
437 .private_data_size =
sizeof (
struct cpg_pd),
440 .lib_init_fn = cpg_lib_init_fn,
441 .lib_exit_fn = cpg_lib_exit_fn,
442 .lib_engine = cpg_lib_engine,
444 .exec_init_fn = cpg_exec_init_fn,
445 .exec_dump_fn = NULL,
446 .exec_engine = cpg_exec_engine,
448 .sync_init = cpg_sync_init,
449 .sync_process = cpg_sync_process,
450 .sync_activate = cpg_sync_activate,
451 .sync_abort = cpg_sync_abort
456 return (&cpg_service_engine);
460 struct qb_ipc_request_header header __attribute__((aligned(8)));
467 struct qb_ipc_request_header header __attribute__((aligned(8)));
476 struct qb_ipc_request_header header __attribute__((aligned(8)));
487 struct qb_ipc_request_header header __attribute__((aligned(8)));
493 struct qb_ipc_request_header header __attribute__((aligned(8)));
520 for (i = 0; i < group->length; i++) {
523 if (c >=
' ' && c < 0x7f && c !=
'\\') {
527 res[dest_pos++] =
'\\';
528 res[dest_pos++] =
'\\';
530 snprintf(res + dest_pos,
sizeof(res) - dest_pos,
"\\x%02X", c);
540 static void cpg_sync_init (
541 const unsigned int *trans_list,
542 size_t trans_list_entries,
543 const unsigned int *member_list,
544 size_t member_list_entries,
553 memcpy (my_member_list, member_list, member_list_entries *
554 sizeof (
unsigned int));
555 my_member_list_entries = member_list_entries;
557 last_sync_ring_id.nodeid = ring_id->
nodeid;
558 last_sync_ring_id.seq = ring_id->
seq;
564 for (i = 0; i < my_old_member_list_entries; i++) {
566 for (j = 0; j < trans_list_entries; j++) {
567 if (my_old_member_list[i] == trans_list[j]) {
573 g_req_exec_cpg_downlist.nodeids[entries++] =
574 my_old_member_list[i];
577 g_req_exec_cpg_downlist.left_nodes = entries;
580 static int cpg_sync_process (
void)
585 res = cpg_exec_send_downlist();
592 res = cpg_exec_send_joinlist();
597 static void cpg_sync_activate (
void)
599 memcpy (my_old_member_list, my_member_list,
600 my_member_list_entries *
sizeof (
unsigned int));
601 my_old_member_list_entries = my_member_list_entries;
603 downlist_inform_clients ();
605 joinlist_inform_clients ();
607 joinlist_messages_delete ();
609 notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
612 static void cpg_sync_abort (
void)
615 joinlist_messages_delete ();
618 static int notify_lib_totem_membership (
620 int member_list_entries,
621 const unsigned int *member_list)
623 struct qb_list_head *iter;
635 res->member_list_entries = member_list_entries;
636 res->header.size =
size;
638 res->header.error =
CS_OK;
644 qb_list_for_each(iter, &cpg_pd_list_head) {
663 static void notify_lib_joinlist_fill_member_list(
665 int left_list_entries,
667 int *member_list_entries,
670 struct qb_list_head *iter;
673 if (member_list_entries != NULL) {
674 *member_list_entries = 0;
677 qb_list_for_each(iter, &process_info_list_head) {
680 if (mar_name_compare (&pi->
group, group_name) == 0) {
681 int in_left_list = 0;
683 for (i = 0; i < left_list_entries; i++) {
684 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
691 if (member_list_entries != NULL) {
692 (*member_list_entries)++;
695 if (member_list != NULL) {
696 (*member_list)->nodeid = pi->
nodeid;
697 (*member_list)->pid = pi->
pid;
706 static int notify_lib_joinlist(
708 int joined_list_entries,
710 int left_list_entries,
716 struct qb_list_head *iter;
717 int member_list_entries;
725 notify_lib_joinlist_fill_member_list(group_name, left_list_entries, left_list,
726 &member_list_entries, NULL);
729 sizeof(
mar_cpg_address_t) * (member_list_entries + left_list_entries + joined_list_entries);
735 res->joined_list_entries = joined_list_entries;
736 res->left_list_entries = left_list_entries;
737 res->member_list_entries = member_list_entries;
739 res->header.size =
size;
741 res->header.error =
CS_OK;
747 notify_lib_joinlist_fill_member_list(group_name, left_list_entries, left_list,
753 if (left_list_entries) {
755 retgi += left_list_entries;
758 if (joined_list_entries) {
763 retgi += joined_list_entries;
768 for (i = 0; i < joined_list_entries; i++) {
770 qb_list_for_each(iter, &cpg_pd_list_head) {
772 if (joined_list[i].
pid == cpd->
pid &&
773 mar_name_compare (&cpd->
group_name, group_name) == 0) {
784 qb_list_for_each(iter, &cpg_pd_list_head) {
786 if (mar_name_compare (&cpd->
group_name, group_name) == 0) {
796 if (left_list_entries) {
803 for (i = 0; i < joined_list_entries; i++) {
806 qb_list_for_each(iter, &cpg_pd_list_head) {
808 if (left_list[i].
pid == cpd->
pid &&
809 mar_name_compare (&cpd->
group_name, group_name) == 0) {
822 qb_list_for_each(iter, &cpg_pd_list_head) {
828 notify_lib_totem_membership (cpd->
conn, my_old_member_list_entries, my_old_member_list);
838 "%s: members(old:%d left:%d)",
844 static void downlist_inform_clients (
void)
846 struct qb_list_head *iter, *tmp_iter;
854 int left_list_entries;
855 struct qb_list_head
list;
857 qb_map_iter_t *miter;
860 downlist_log(
"my downlist", &g_req_exec_cpg_downlist);
862 group_map = qb_skiplist_create();
869 qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
873 for (i = 0; i < g_req_exec_cpg_downlist.left_nodes; i++) {
875 if (pi->
nodeid == g_req_exec_cpg_downlist.nodeids[i]) {
882 marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->
group);
883 cpg_group.value[cpg_group.length] = 0;
885 pcd = (
struct confchg_data *)qb_map_get(group_map, cpg_group.value);
887 pcd = (
struct confchg_data *)calloc(1,
sizeof(
struct confchg_data));
888 memcpy(&pcd->cpg_group, &cpg_group,
sizeof(
struct cpg_name));
889 qb_map_put(group_map, pcd->cpg_group.value, pcd);
891 size = pcd->left_list_entries;
892 pcd->left_list[
size].nodeid = left_pi->
nodeid;
893 pcd->left_list[
size].pid = left_pi->
pid;
895 pcd->left_list_entries++;
896 qb_list_del (&left_pi->
list);
902 miter = qb_map_iter_create(group_map);
903 while (qb_map_iter_next(miter, (
void **)&pcd)) {
904 marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
906 log_printf (LOG_DEBUG,
"left_list_entries:%d", pcd->left_list_entries);
907 for (i=0; i<pcd->left_list_entries; i++) {
908 log_printf (LOG_DEBUG,
"left_list[%d] group:%s, ip:%s, pid:%d",
909 i, cpg_print_group_name(&group),
911 pcd->left_list[i].pid);
915 notify_lib_joinlist(&group,
917 pcd->left_list_entries,
923 qb_map_iter_free(miter);
924 qb_map_destroy(group_map);
930 static void joinlist_remove_zombie_pi_entries (
void)
932 struct qb_list_head *pi_iter, *tmp_iter;
933 struct qb_list_head *jl_iter;
938 qb_list_for_each_safe(pi_iter, tmp_iter, &process_info_list_head) {
952 qb_list_for_each(jl_iter, &joinlist_messages_head) {
960 pi->
pid == stored_msg->
pid &&
973 static void joinlist_inform_clients (
void)
976 struct qb_list_head *iter;
978 qb_map_t *group_notify_map;
979 qb_map_iter_t *miter;
982 group_notify_map = qb_skiplist_create();
985 qb_list_for_each(iter, &joinlist_messages_head) {
988 log_printf (LOG_DEBUG,
"joinlist_messages[%u] group:%s, ip:%s, pid:%d",
989 i++, cpg_print_group_name(&stored_msg->
group_name),
1002 miter = qb_map_iter_create(group_notify_map);
1003 while (qb_map_iter_next(miter, (
void **)&jld)) {
1004 notify_lib_joinlist(&jld->cpg_group,
1005 jld->join_list_entries, jld->join_list,
1010 qb_map_iter_free(miter);
1011 qb_map_destroy(group_notify_map);
1013 joinlist_remove_zombie_pi_entries ();
1016 static void joinlist_messages_delete (
void)
1019 struct qb_list_head *iter, *tmp_iter;
1021 qb_list_for_each_safe(iter, tmp_iter, &joinlist_messages_head) {
1023 qb_list_del (&stored_msg->
list);
1026 qb_list_init (&joinlist_messages_head);
1031 qb_list_init (&joinlist_messages_head);
1038 struct qb_list_head *iter, *tmp_iter;
1041 qb_list_for_each_safe(iter, tmp_iter, &(cpg_iteration_instance->
items_list_head)) {
1043 qb_list_del (&pi->
list);
1047 qb_list_del (&cpg_iteration_instance->
list);
1048 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
1051 static void cpg_pd_finalize (
struct cpg_pd *cpd)
1053 struct qb_list_head *iter, *tmp_iter;
1054 struct cpg_iteration_instance *cpii;
1058 cpii = qb_list_entry (iter,
struct cpg_iteration_instance,
list);
1060 cpg_iteration_instance_finalize (cpii);
1063 qb_list_del (&cpd->
list);
1066 static int cpg_lib_exit_fn (
void *conn)
1077 cpg_pd_finalize (cpd);
1083 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason)
1086 struct iovec req_exec_cpg_iovec;
1107 static void exec_cpg_procjoin_endian_convert (
void *msg)
1111 req_exec_cpg_procjoin->pid =
swab32(req_exec_cpg_procjoin->pid);
1112 swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1113 req_exec_cpg_procjoin->reason =
swab32(req_exec_cpg_procjoin->reason);
1116 static void exec_cpg_joinlist_endian_convert (
void *msg_v)
1119 struct qb_ipc_response_header *res = (
struct qb_ipc_response_header *)msg;
1122 swab_mar_int32_t (&res->size);
1124 while ((
const char*)jle < msg + res->size) {
1131 static void exec_cpg_downlist_endian_convert_old (
void *msg)
1135 static void exec_cpg_downlist_endian_convert (
void *msg)
1140 req_exec_cpg_downlist->left_nodes =
swab32(req_exec_cpg_downlist->left_nodes);
1141 req_exec_cpg_downlist->old_members =
swab32(req_exec_cpg_downlist->old_members);
1143 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1144 req_exec_cpg_downlist->nodeids[i] =
swab32(req_exec_cpg_downlist->nodeids[i]);
1149 static void exec_cpg_mcast_endian_convert (
void *msg)
1153 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1154 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1155 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1156 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1157 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1160 static void exec_cpg_partial_mcast_endian_convert (
void *msg)
1164 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1165 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1166 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1167 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1168 req_exec_cpg_mcast->fraglen =
swab32(req_exec_cpg_mcast->fraglen);
1169 req_exec_cpg_mcast->type =
swab32(req_exec_cpg_mcast->type);
1170 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1174 struct qb_list_head *iter;
1176 qb_list_for_each(iter, &process_info_list_head) {
1179 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1180 mar_name_compare (&pi->
group, group_name) == 0) {
1188 static void do_proc_join(
1191 unsigned int nodeid,
1193 qb_map_t *group_notify_map)
1198 struct qb_list_head *
list;
1199 struct qb_list_head *list_to_add = NULL;
1202 if (process_info_find (name, pid, nodeid) != NULL) {
1212 memcpy(&pi->
group, name,
sizeof(*name));
1213 qb_list_init(&pi->
list);
1218 list_to_add = &process_info_list_head;
1219 qb_list_for_each(list, &process_info_list_head) {
1220 pi_entry = qb_list_entry(list,
struct process_info, list);
1228 qb_list_add (&pi->
list, list_to_add);
1230 notify_info.pid = pi->
pid;
1231 notify_info.nodeid =
nodeid;
1232 notify_info.reason = reason;
1234 if (group_notify_map == NULL) {
1235 notify_lib_joinlist(&pi->
group,
1244 qb_map_put(group_notify_map, jld->
cpg_group.value, jld);
1254 static void do_proc_leave(
1257 unsigned int nodeid,
1261 struct qb_list_head *iter, *tmp_iter;
1264 notify_info.pid = pid;
1265 notify_info.nodeid =
nodeid;
1266 notify_info.reason = reason;
1268 notify_lib_joinlist(name,
1273 qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
1276 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1277 mar_name_compare (&pi->
group, name)==0) {
1278 qb_list_del (&pi->
list);
1284 static void message_handler_req_exec_cpg_downlist_old (
1285 const void *message,
1286 unsigned int nodeid)
1292 static void message_handler_req_exec_cpg_downlist(
1293 const void *message,
1294 unsigned int nodeid)
1296 const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1299 req_exec_cpg_downlist->left_nodes);
1303 static void message_handler_req_exec_cpg_procjoin (
1304 const void *message,
1305 unsigned int nodeid)
1312 (
unsigned int)req_exec_cpg_procjoin->pid);
1314 do_proc_join (&req_exec_cpg_procjoin->group_name,
1315 req_exec_cpg_procjoin->pid, nodeid,
1319 static void message_handler_req_exec_cpg_procleave (
1320 const void *message,
1321 unsigned int nodeid)
1328 (
unsigned int)req_exec_cpg_procjoin->pid);
1330 do_proc_leave (&req_exec_cpg_procjoin->group_name,
1331 req_exec_cpg_procjoin->pid, nodeid,
1332 req_exec_cpg_procjoin->reason);
1337 static void message_handler_req_exec_cpg_joinlist (
1338 const void *message_v,
1339 unsigned int nodeid)
1341 const char *message = message_v;
1342 const struct qb_ipc_response_header *res = (
const struct qb_ipc_response_header *)message;
1349 while ((
const char*)jle < message + res->size) {
1353 stored_msg->
pid = jle->
pid;
1355 qb_list_init (&stored_msg->
list);
1356 qb_list_add (&stored_msg->
list, &joinlist_messages_head);
1361 static void message_handler_req_exec_cpg_mcast (
1362 const void *message,
1363 unsigned int nodeid)
1367 int msglen = req_exec_cpg_mcast->msglen;
1368 struct qb_list_head *iter, *pi_iter, *tmp_iter;
1370 struct iovec iovec[2];
1384 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1385 iovec[1].iov_len = msglen;
1387 qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1388 cpd = qb_list_entry(iter,
struct cpg_pd, list);
1390 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1394 qb_list_for_each(pi_iter, &process_info_list_head) {
1397 if (pi->
nodeid == nodeid &&
1398 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1415 static void message_handler_req_exec_cpg_partial_mcast (
1416 const void *message,
1417 unsigned int nodeid)
1421 int msglen = req_exec_cpg_mcast->fraglen;
1422 struct qb_list_head *iter, *pi_iter, *tmp_iter;
1424 struct iovec iovec[2];
1442 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1443 iovec[1].iov_len = msglen;
1445 qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1446 cpd = qb_list_entry(iter,
struct cpg_pd, list);
1449 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1453 qb_list_for_each(pi_iter, &process_info_list_head) {
1456 if (pi->
nodeid == nodeid &&
1457 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1475 static int cpg_exec_send_downlist(
void)
1480 g_req_exec_cpg_downlist.header.size =
sizeof(
struct req_exec_cpg_downlist);
1482 g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1484 iov.iov_base = (
void *)&g_req_exec_cpg_downlist;
1485 iov.iov_len = g_req_exec_cpg_downlist.header.size;
1490 static int cpg_exec_send_joinlist(
void)
1493 struct qb_list_head *iter;
1494 struct qb_ipc_response_header *res;
1498 struct iovec req_exec_cpg_iovec;
1500 qb_list_for_each(iter, &process_info_list_head) {
1512 buf_size =
sizeof(
struct qb_ipc_response_header) + sizeof(struct
join_list_entry) * count;
1513 buf = alloca(buf_size);
1518 memset(buf, 0, buf_size);
1520 jle = (
struct join_list_entry *)(buf +
sizeof(
struct qb_ipc_response_header));
1521 res = (
struct qb_ipc_response_header *)buf;
1523 qb_list_for_each(iter, &process_info_list_head) {
1534 res->size =
sizeof(
struct qb_ipc_response_header)+sizeof(struct
join_list_entry) * count;
1536 req_exec_cpg_iovec.iov_base = buf;
1537 req_exec_cpg_iovec.iov_len = res->size;
1542 static int cpg_lib_init_fn (
void *conn)
1545 memset (cpd, 0,
sizeof(
struct cpg_pd));
1547 qb_list_add (&cpd->
list, &cpg_pd_list_head);
1558 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message)
1564 struct qb_list_head *iter;
1567 qb_list_for_each(iter, &cpg_pd_list_head) {
1568 struct cpg_pd *cpd_item = qb_list_entry (iter,
struct cpg_pd, list);
1570 if (cpd_item->
pid == req_lib_cpg_join->pid &&
1571 mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->
group_name) == 0) {
1583 qb_list_for_each(iter, &process_info_list_head) {
1587 mar_name_compare(&req_lib_cpg_join->group_name, &pi->
group) == 0) {
1603 cpd->
pid = req_lib_cpg_join->pid;
1604 cpd->
flags = req_lib_cpg_join->flags;
1605 memcpy (&cpd->
group_name, &req_lib_cpg_join->group_name,
1608 cpg_node_joinleave_send (req_lib_cpg_join->pid,
1609 &req_lib_cpg_join->group_name,
1631 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message)
1653 cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1654 &req_lib_cpg_leave->group_name,
1668 static void message_handler_req_lib_cpg_finalize (
1670 const void *message)
1682 qb_list_del (&cpd->
list);
1683 qb_list_init (&cpd->
list);
1703 fd = open (path, O_RDWR, 0600);
1711 res = ftruncate (fd, bytes);
1713 goto error_close_unlink;
1716 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1719 if (addr == MAP_FAILED) {
1720 goto error_close_unlink;
1723 madvise(addr, bytes, MADV_NOSYNC);
1728 munmap (addr, bytes);
1740 static inline int zcb_alloc (
1742 const char *path_to_file,
1749 zcb_mapped = malloc (
sizeof (
struct zcb_mapped));
1750 if (zcb_mapped == NULL) {
1763 qb_list_init (&zcb_mapped->
list);
1771 static inline int zcb_free (
struct zcb_mapped *zcb_mapped)
1775 res = munmap (zcb_mapped->
addr, zcb_mapped->
size);
1776 qb_list_del (&zcb_mapped->
list);
1781 static inline int zcb_by_addr_free (
struct cpg_pd *cpd,
void *addr)
1783 struct qb_list_head *
list, *tmp_iter;
1784 struct zcb_mapped *zcb_mapped;
1785 unsigned int res = 0;
1788 zcb_mapped = qb_list_entry (list,
struct zcb_mapped, list);
1790 if (zcb_mapped->
addr == addr) {
1791 res = zcb_free (zcb_mapped);
1799 static inline int zcb_all_free (
1802 struct qb_list_head *
list, *tmp_iter;
1803 struct zcb_mapped *zcb_mapped;
1806 zcb_mapped = qb_list_entry (list,
struct zcb_mapped, list);
1808 zcb_free (zcb_mapped);
1818 static uint64_t void2serveraddr (
void *server_ptr)
1826 static void *serveraddr2void (uint64_t
server_addr)
1834 static void message_handler_req_lib_cpg_zc_alloc (
1836 const void *message)
1839 struct qb_ipc_response_header res_header;
1847 res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1854 res_header.size =
sizeof (
struct qb_ipc_response_header);
1861 static void message_handler_req_lib_cpg_zc_free (
1863 const void *message)
1866 struct qb_ipc_response_header res_header;
1872 addr = serveraddr2void (hdr->server_address);
1874 zcb_by_addr_free (cpd, addr);
1876 res_header.size =
sizeof (
struct qb_ipc_response_header);
1884 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message)
1890 struct iovec req_exec_cpg_iovec[2];
1893 int msglen = req_lib_cpg_mcast->fraglen;
1915 res_lib_cpg_partial_send.header.size =
sizeof(res_lib_cpg_partial_send);
1925 if (error ==
CS_OK) {
1926 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
1929 req_exec_cpg_mcast.pid = cpd->
pid;
1930 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1931 req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
1932 req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
1934 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1937 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1938 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1939 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
1940 req_exec_cpg_iovec[1].iov_len = msglen;
1943 assert(result == 0);
1946 conn, group_name.value, cpd->
cpd_state, error);
1949 res_lib_cpg_partial_send.header.error = error;
1951 sizeof (res_lib_cpg_partial_send));
1955 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message)
1961 struct iovec req_exec_cpg_iovec[2];
1962 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1963 int msglen = req_lib_cpg_mcast->msglen;
1984 if (error ==
CS_OK) {
1985 memset(&req_exec_cpg_mcast, 0,
sizeof(req_exec_cpg_mcast));
1987 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
1990 req_exec_cpg_mcast.pid = cpd->
pid;
1991 req_exec_cpg_mcast.msglen = msglen;
1993 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1996 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1997 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1998 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
1999 req_exec_cpg_iovec[1].iov_len = msglen;
2002 assert(result == 0);
2005 conn, group_name.value, cpd->
cpd_state, error);
2009 static void message_handler_req_lib_cpg_zc_execute (
2011 const void *message)
2014 struct qb_ipc_request_header *
header;
2017 struct iovec req_exec_cpg_iovec[2];
2018 struct req_exec_cpg_mcast req_exec_cpg_mcast;
2025 header = (
struct qb_ipc_request_header *)(((
char *)serveraddr2void(hdr->server_address) + sizeof (
struct coroipcs_zc_header)));
2026 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)header;
2043 res_lib_cpg_mcast.header.size =
sizeof(res_lib_cpg_mcast);
2045 if (error ==
CS_OK) {
2046 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
2049 req_exec_cpg_mcast.pid = cpd->
pid;
2050 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2052 memcpy(&req_exec_cpg_mcast.group_name, &cpd->
group_name,
2055 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2056 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2057 req_exec_cpg_iovec[1].iov_base = (
char *)header +
sizeof(
struct req_lib_cpg_mcast);
2058 req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2062 res_lib_cpg_mcast.header.error =
CS_OK;
2067 res_lib_cpg_mcast.header.error = error;
2071 sizeof (res_lib_cpg_mcast));
2075 static void message_handler_req_lib_cpg_membership (
void *conn,
2076 const void *message)
2079 (
struct req_lib_cpg_membership_get *)message;
2081 struct qb_list_head *iter;
2082 int member_count = 0;
2085 res_lib_cpg_membership_get.header.error =
CS_OK;
2086 res_lib_cpg_membership_get.header.size =
2087 sizeof (
struct res_lib_cpg_membership_get);
2089 qb_list_for_each(iter, &process_info_list_head) {
2091 if (mar_name_compare (&pi->
group, &req_lib_cpg_membership_get->group_name) == 0) {
2093 res_lib_cpg_membership_get.
member_list[member_count].pid = pi->
pid;
2097 res_lib_cpg_membership_get.member_count = member_count;
2100 sizeof (res_lib_cpg_membership_get));
2103 static void message_handler_req_lib_cpg_local_get (
void *conn,
2104 const void *message)
2108 res_lib_cpg_local_get.header.size =
sizeof (res_lib_cpg_local_get);
2110 res_lib_cpg_local_get.header.error =
CS_OK;
2114 sizeof (res_lib_cpg_local_get));
2117 static void message_handler_req_lib_cpg_iteration_initialize (
2119 const void *message)
2125 struct qb_list_head *iter, *iter2;
2126 struct cpg_iteration_instance *cpg_iteration_instance;
2139 res = hdb_handle_create (&cpg_iteration_handle_t_db,
sizeof (
struct cpg_iteration_instance),
2140 &cpg_iteration_handle);
2147 res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (
void *)&cpg_iteration_instance);
2154 qb_list_init (&cpg_iteration_instance->items_list_head);
2155 cpg_iteration_instance->handle = cpg_iteration_handle;
2160 qb_list_for_each(iter, &process_info_list_head) {
2170 qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2173 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2189 if (mar_name_compare (&pi->
group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2202 goto error_put_destroy;
2206 qb_list_init (&new_pi->
list);
2218 qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2221 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2226 qb_list_add (&new_pi->
list, iter2);
2236 qb_list_init (&cpg_iteration_instance->list);
2239 cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
2242 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2244 if (error !=
CS_OK) {
2245 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2249 res_lib_cpg_iterationinitialize.header.size =
sizeof (res_lib_cpg_iterationinitialize);
2251 res_lib_cpg_iterationinitialize.header.error = error;
2252 res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2255 sizeof (res_lib_cpg_iterationinitialize));
2258 static void message_handler_req_lib_cpg_iteration_next (
2260 const void *message)
2264 struct cpg_iteration_instance *cpg_iteration_instance;
2271 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2272 req_lib_cpg_iterationnext->iteration_handle,
2273 (
void *)&cpg_iteration_instance);
2280 assert (cpg_iteration_instance);
2282 cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
2284 if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
2289 pi = qb_list_entry (cpg_iteration_instance->current_pointer,
struct process_info, list);
2294 res_lib_cpg_iterationnext.description.nodeid = pi->
nodeid;
2295 res_lib_cpg_iterationnext.description.pid = pi->
pid;
2296 memcpy (&res_lib_cpg_iterationnext.description.group,
2301 hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2303 res_lib_cpg_iterationnext.header.size =
sizeof (res_lib_cpg_iterationnext);
2305 res_lib_cpg_iterationnext.header.error = error;
2308 sizeof (res_lib_cpg_iterationnext));
2311 static void message_handler_req_lib_cpg_iteration_finalize (
2313 const void *message)
2317 struct cpg_iteration_instance *cpg_iteration_instance;
2323 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2324 req_lib_cpg_iterationfinalize->iteration_handle,
2325 (
void *)&cpg_iteration_instance);
2332 assert (cpg_iteration_instance);
2334 cpg_iteration_instance_finalize (cpg_iteration_instance);
2335 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2338 res_lib_cpg_iterationfinalize.header.size =
sizeof (res_lib_cpg_iterationfinalize);
2340 res_lib_cpg_iterationfinalize.header.error = error;
2343 sizeof (res_lib_cpg_iterationfinalize));
void *(* ipc_private_data_get)(void *conn)
int initial_totem_conf_sent
mar_cpg_address_t member_list[]
mar_req_coroipcc_zc_free_t struct
#define CPG_MAX_NAME_LENGTH
mar_cpg_address_t join_list[CPG_MEMBERS_MAX]
uint64_t initial_transition_counter
#define LOGSYS_LEVEL_TRACE
mar_uint32_t sender_nodeid
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
The req_lib_cpg_join struct.
mar_req_coroipcc_zc_alloc_t struct
The corosync_service_engine struct.
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
The res_lib_cpg_partial_deliver_callback struct.
The req_lib_cpg_mcast struct.
The corosync_lib_handler struct.
The res_lib_cpg_membership_get struct.
The res_lib_cpg_iterationnext struct.
The res_lib_cpg_iterationinitialize struct.
The corosync_exec_handler struct.
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
uint64_t transition_counter
#define log_printf(level, format, args...)
The res_lib_cpg_partial_send struct.
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
#define SERVICE_ID_MAKE(a, b)
The req_lib_cpg_iterationinitialize struct.
#define LOGSYS_LEVEL_WARNING
The res_lib_cpg_join struct.
unsigned int(* totem_nodeid_get)(void)
void(* ipc_refcnt_dec)(void *conn)
struct qb_list_head * current_pointer
mar_req_coroipcc_zc_execute_t struct
The res_lib_cpg_mcast struct.
#define LOGSYS_LEVEL_ERROR
mar_uint32_t member_list[]
cs_error_t
The cs_error_t enum.
The req_lib_cpg_leave struct.
#define LOGSYS_LEVEL_DEBUG
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
mar_cpg_name_t group_name
The req_lib_cpg_iterationfinalize struct.
mar_cpg_name_t group_name
The corosync_api_v1 struct.
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
struct totem_message_header header
#define swab32(x)
The swab32 macro.
The res_lib_cpg_finalize struct.
struct qb_list_head items_list_head
struct qb_list_head zcb_mapped_list_head
The res_lib_cpg_local_get struct.
#define PROCESSOR_COUNT_MAX
The res_lib_cpg_iterationfinalize struct.
struct corosync_service_engine cpg_service_engine
The req_lib_cpg_partial_mcast struct.
The req_lib_cpg_iterationnext struct.
const char *(* totem_ifaces_print)(unsigned int nodeid)
The res_lib_cpg_confchg_callback struct.
mar_cpg_name_t group_name
struct qb_list_head iteration_instance_list_head
void(* lib_handler_fn)(void *conn, const void *msg)
The req_lib_cpg_membership_get struct.
QB_LIST_DECLARE(cpg_pd_list_head)
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
The res_lib_cpg_leave struct.
struct memb_ring_id ring_id
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
The res_lib_cpg_totem_confchg_callback struct.
Message from another node.
The mar_message_source_t struct.
void(* ipc_refcnt_inc)(void *conn)