corosync  2.4.4-dirty
exec/cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2006-2015 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Christine Caulfield (ccaulfie@redhat.com)
7  * Author: Jan Friesse (jfriesse@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #ifdef HAVE_ALLOCA_H
39 #include <alloca.h>
40 #endif
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <sys/un.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
46 #include <sys/uio.h>
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <errno.h>
52 #include <time.h>
53 #include <assert.h>
54 #include <arpa/inet.h>
55 #include <sys/mman.h>
56 #include <qb/qbmap.h>
57 
58 #include <corosync/corotypes.h>
59 #include <qb/qbipc_common.h>
60 #include <corosync/corodefs.h>
61 #include <corosync/list.h>
62 #include <corosync/logsys.h>
63 #include <corosync/coroapi.h>
64 
65 #include <corosync/cpg.h>
66 #include <corosync/ipc_cpg.h>
67 
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71 
72 #include "service.h"
73 
74 LOGSYS_DECLARE_SUBSYS ("CPG");
75 
76 #define GROUP_HASH_SIZE 32
77 
86 };
87 
88 struct zcb_mapped {
89  struct list_head list;
90  void *addr;
91  size_t size;
92 };
93 /*
94  * state` exec deliver
95  * match group name, pid -> if matched deliver for YES:
96  * XXX indicates impossible state
97  *
98  * join leave mcast
99  * UNJOINED XXX XXX NO
100  * LEAVE_STARTED XXX YES(unjoined_enter) YES
101  * JOIN_STARTED YES(join_started_enter) XXX NO
102  * JOIN_COMPLETED XXX NO YES
103  *
104  * join_started_enter
105  * set JOIN_COMPLETED
106  * add entry to process_info list
107  * unjoined_enter
108  * set UNJOINED
109  * delete entry from process_info list
110  *
111  *
112  * library accept join error codes
113  * UNJOINED YES(CS_OK) set JOIN_STARTED
114  * LEAVE_STARTED NO(CS_ERR_BUSY)
115  * JOIN_STARTED NO(CS_ERR_EXIST)
116  * JOIN_COMPlETED NO(CS_ERR_EXIST)
117  *
118  * library accept leave error codes
119  * UNJOINED NO(CS_ERR_NOT_EXIST)
120  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
121  * JOIN_STARTED NO(CS_ERR_BUSY)
122  * JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED
123  *
124  * library accept mcast
125  * UNJOINED NO(CS_ERR_NOT_EXIST)
126  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
127  * JOIN_STARTED YES(CS_OK)
128  * JOIN_COMPLETED YES(CS_OK)
129  */
130 enum cpd_state {
135 };
136 
140 };
141 
142 static struct list_head joinlist_messages_head;
143 
144 struct cpg_pd {
145  void *conn;
147  uint32_t pid;
149  unsigned int flags;
151  uint64_t transition_counter; /* These two are used when sending fragmented messages */
153  struct list_head list;
154  struct list_head iteration_instance_list_head;
155  struct list_head zcb_mapped_list_head;
156 };
157 
160  struct list_head list;
161  struct list_head items_list_head; /* List of process_info */
163 };
164 
165 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
166 
167 DECLARE_LIST_INIT(cpg_pd_list_head);
168 
169 static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
170 
171 static unsigned int my_member_list_entries;
172 
173 static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
174 
175 static unsigned int my_old_member_list_entries = 0;
176 
177 static struct corosync_api_v1 *api = NULL;
178 
179 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
180 
181 static mar_cpg_ring_id_t last_sync_ring_id;
182 
183 struct process_info {
184  unsigned int nodeid;
185  uint32_t pid;
187  struct list_head list; /* on the group_info members list */
188 };
189 DECLARE_LIST_INIT(process_info_list_head);
190 
192  uint32_t pid;
194 };
195 
196 /*
197  * Service Interfaces required by service_message_handler struct
198  */
199 static char *cpg_exec_init_fn (struct corosync_api_v1 *);
200 
201 static int cpg_lib_init_fn (void *conn);
202 
203 static int cpg_lib_exit_fn (void *conn);
204 
205 static void message_handler_req_exec_cpg_procjoin (
206  const void *message,
207  unsigned int nodeid);
208 
209 static void message_handler_req_exec_cpg_procleave (
210  const void *message,
211  unsigned int nodeid);
212 
213 static void message_handler_req_exec_cpg_joinlist (
214  const void *message,
215  unsigned int nodeid);
216 
217 static void message_handler_req_exec_cpg_mcast (
218  const void *message,
219  unsigned int nodeid);
220 
221 static void message_handler_req_exec_cpg_partial_mcast (
222  const void *message,
223  unsigned int nodeid);
224 
225 static void message_handler_req_exec_cpg_downlist_old (
226  const void *message,
227  unsigned int nodeid);
228 
229 static void message_handler_req_exec_cpg_downlist (
230  const void *message,
231  unsigned int nodeid);
232 
233 static void exec_cpg_procjoin_endian_convert (void *msg);
234 
235 static void exec_cpg_joinlist_endian_convert (void *msg);
236 
237 static void exec_cpg_mcast_endian_convert (void *msg);
238 
239 static void exec_cpg_partial_mcast_endian_convert (void *msg);
240 
241 static void exec_cpg_downlist_endian_convert_old (void *msg);
242 
243 static void exec_cpg_downlist_endian_convert (void *msg);
244 
245 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
246 
247 static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
248 
249 static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
250 
251 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
252 
253 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
254 
255 static void message_handler_req_lib_cpg_membership (void *conn,
256  const void *message);
257 
258 static void message_handler_req_lib_cpg_local_get (void *conn,
259  const void *message);
260 
261 static void message_handler_req_lib_cpg_iteration_initialize (
262  void *conn,
263  const void *message);
264 
265 static void message_handler_req_lib_cpg_iteration_next (
266  void *conn,
267  const void *message);
268 
269 static void message_handler_req_lib_cpg_iteration_finalize (
270  void *conn,
271  const void *message);
272 
273 static void message_handler_req_lib_cpg_zc_alloc (
274  void *conn,
275  const void *message);
276 
277 static void message_handler_req_lib_cpg_zc_free (
278  void *conn,
279  const void *message);
280 
281 static void message_handler_req_lib_cpg_zc_execute (
282  void *conn,
283  const void *message);
284 
285 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
286 
287 static int cpg_exec_send_downlist(void);
288 
289 static int cpg_exec_send_joinlist(void);
290 
291 static void downlist_inform_clients (void);
292 
293 static void joinlist_inform_clients (void);
294 
295 static void joinlist_messages_delete (void);
296 
297 static void cpg_sync_init (
298  const unsigned int *trans_list,
299  size_t trans_list_entries,
300  const unsigned int *member_list,
301  size_t member_list_entries,
302  const struct memb_ring_id *ring_id);
303 
304 static int cpg_sync_process (void);
305 
306 static void cpg_sync_activate (void);
307 
308 static void cpg_sync_abort (void);
309 
310 static void do_proc_join(
311  const mar_cpg_name_t *name,
312  uint32_t pid,
313  unsigned int nodeid,
314  int reason);
315 
316 static void do_proc_leave(
317  const mar_cpg_name_t *name,
318  uint32_t pid,
319  unsigned int nodeid,
320  int reason);
321 
322 static int notify_lib_totem_membership (
323  void *conn,
324  int member_list_entries,
325  const unsigned int *member_list);
326 
327 static inline int zcb_all_free (
328  struct cpg_pd *cpd);
329 
330 static char *cpg_print_group_name (
331  const mar_cpg_name_t *group);
332 
333 /*
334  * Library Handler Definition
335  */
336 static struct corosync_lib_handler cpg_lib_engine[] =
337 {
338  { /* 0 - MESSAGE_REQ_CPG_JOIN */
339  .lib_handler_fn = message_handler_req_lib_cpg_join,
340  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
341  },
342  { /* 1 - MESSAGE_REQ_CPG_LEAVE */
343  .lib_handler_fn = message_handler_req_lib_cpg_leave,
344  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
345  },
346  { /* 2 - MESSAGE_REQ_CPG_MCAST */
347  .lib_handler_fn = message_handler_req_lib_cpg_mcast,
348  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
349  },
350  { /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
351  .lib_handler_fn = message_handler_req_lib_cpg_membership,
352  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
353  },
354  { /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
355  .lib_handler_fn = message_handler_req_lib_cpg_local_get,
356  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
357  },
358  { /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
359  .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
360  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
361  },
362  { /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
363  .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
364  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
365  },
366  { /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
367  .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
368  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
369  },
370  { /* 8 - MESSAGE_REQ_CPG_FINALIZE */
371  .lib_handler_fn = message_handler_req_lib_cpg_finalize,
372  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
373  },
374  { /* 9 */
375  .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
376  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
377  },
378  { /* 10 */
379  .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
380  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
381  },
382  { /* 11 */
383  .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
384  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
385  },
386  { /* 12 */
387  .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
388  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
389  },
390 
391 };
392 
393 static struct corosync_exec_handler cpg_exec_engine[] =
394 {
395  { /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
396  .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
397  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
398  },
399  { /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
400  .exec_handler_fn = message_handler_req_exec_cpg_procleave,
401  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
402  },
403  { /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
404  .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
405  .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
406  },
407  { /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
408  .exec_handler_fn = message_handler_req_exec_cpg_mcast,
409  .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
410  },
411  { /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
412  .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
413  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
414  },
415  { /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
416  .exec_handler_fn = message_handler_req_exec_cpg_downlist,
417  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
418  },
419  { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
420  .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
421  .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
422  },
423 };
424 
426  .name = "corosync cluster closed process group service v1.01",
427  .id = CPG_SERVICE,
428  .priority = 1,
429  .private_data_size = sizeof (struct cpg_pd),
430  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
431  .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
432  .lib_init_fn = cpg_lib_init_fn,
433  .lib_exit_fn = cpg_lib_exit_fn,
434  .lib_engine = cpg_lib_engine,
435  .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
436  .exec_init_fn = cpg_exec_init_fn,
437  .exec_dump_fn = NULL,
438  .exec_engine = cpg_exec_engine,
439  .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
440  .sync_init = cpg_sync_init,
441  .sync_process = cpg_sync_process,
442  .sync_activate = cpg_sync_activate,
443  .sync_abort = cpg_sync_abort
444 };
445 
447 {
448  return (&cpg_service_engine);
449 }
450 
452  struct qb_ipc_request_header header __attribute__((aligned(8)));
453  mar_cpg_name_t group_name __attribute__((aligned(8)));
454  mar_uint32_t pid __attribute__((aligned(8)));
455  mar_uint32_t reason __attribute__((aligned(8)));
456 };
457 
459  struct qb_ipc_request_header header __attribute__((aligned(8)));
460  mar_cpg_name_t group_name __attribute__((aligned(8)));
461  mar_uint32_t msglen __attribute__((aligned(8)));
462  mar_uint32_t pid __attribute__((aligned(8)));
463  mar_message_source_t source __attribute__((aligned(8)));
464  mar_uint8_t message[] __attribute__((aligned(8)));
465 };
466 
468  struct qb_ipc_request_header header __attribute__((aligned(8)));
469  mar_cpg_name_t group_name __attribute__((aligned(8)));
470  mar_uint32_t msglen __attribute__((aligned(8)));
471  mar_uint32_t fraglen __attribute__((aligned(8)));
472  mar_uint32_t pid __attribute__((aligned(8)));
473  mar_uint32_t type __attribute__((aligned(8)));
474  mar_message_source_t source __attribute__((aligned(8)));
475  mar_uint8_t message[] __attribute__((aligned(8)));
476 };
477 
479  struct qb_ipc_request_header header __attribute__((aligned(8)));
480  mar_uint32_t left_nodes __attribute__((aligned(8)));
481  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
482 };
483 
485  struct qb_ipc_request_header header __attribute__((aligned(8)));
486  /* merge decisions */
487  mar_uint32_t old_members __attribute__((aligned(8)));
488  /* downlist below */
489  mar_uint32_t left_nodes __attribute__((aligned(8)));
490  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
491 };
492 
493 struct joinlist_msg {
495  uint32_t pid;
497  struct list_head list;
498 };
499 
500 static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
501 
502 /*
503  * Function print group name. It's not reentrant
504  */
505 static char *cpg_print_group_name(const mar_cpg_name_t *group)
506 {
507  static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
508  int dest_pos = 0;
509  char c;
510  int i;
511 
512  for (i = 0; i < group->length; i++) {
513  c = group->value[i];
514 
515  if (c >= ' ' && c < 0x7f && c != '\\') {
516  res[dest_pos++] = c;
517  } else {
518  if (c == '\\') {
519  res[dest_pos++] = '\\';
520  res[dest_pos++] = '\\';
521  } else {
522  snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
523  dest_pos += 4;
524  }
525  }
526  }
527  res[dest_pos] = 0;
528 
529  return (res);
530 }
531 
532 static void cpg_sync_init (
533  const unsigned int *trans_list,
534  size_t trans_list_entries,
535  const unsigned int *member_list,
536  size_t member_list_entries,
537  const struct memb_ring_id *ring_id)
538 {
539  int entries;
540  int i, j;
541  int found;
542 
543  my_sync_state = CPGSYNC_DOWNLIST;
544 
545  memcpy (my_member_list, member_list, member_list_entries *
546  sizeof (unsigned int));
547  my_member_list_entries = member_list_entries;
548 
549  last_sync_ring_id.nodeid = ring_id->rep.nodeid;
550  last_sync_ring_id.seq = ring_id->seq;
551 
552  entries = 0;
553  /*
554  * Determine list of nodeids for downlist message
555  */
556  for (i = 0; i < my_old_member_list_entries; i++) {
557  found = 0;
558  for (j = 0; j < trans_list_entries; j++) {
559  if (my_old_member_list[i] == trans_list[j]) {
560  found = 1;
561  break;
562  }
563  }
564  if (found == 0) {
565  g_req_exec_cpg_downlist.nodeids[entries++] =
566  my_old_member_list[i];
567  }
568  }
569  g_req_exec_cpg_downlist.left_nodes = entries;
570 }
571 
572 static int cpg_sync_process (void)
573 {
574  int res = -1;
575 
576  if (my_sync_state == CPGSYNC_DOWNLIST) {
577  res = cpg_exec_send_downlist();
578  if (res == -1) {
579  return (-1);
580  }
581  my_sync_state = CPGSYNC_JOINLIST;
582  }
583  if (my_sync_state == CPGSYNC_JOINLIST) {
584  res = cpg_exec_send_joinlist();
585  }
586  return (res);
587 }
588 
589 static void cpg_sync_activate (void)
590 {
591  memcpy (my_old_member_list, my_member_list,
592  my_member_list_entries * sizeof (unsigned int));
593  my_old_member_list_entries = my_member_list_entries;
594 
595  downlist_inform_clients ();
596 
597  joinlist_inform_clients ();
598 
599  joinlist_messages_delete ();
600 
601  notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
602 }
603 
604 static void cpg_sync_abort (void)
605 {
606 
607  joinlist_messages_delete ();
608 }
609 
610 static int notify_lib_totem_membership (
611  void *conn,
612  int member_list_entries,
613  const unsigned int *member_list)
614 {
615  struct list_head *iter;
616  char *buf;
617  int size;
619 
620  size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
621  sizeof(mar_uint32_t) * (member_list_entries);
622  buf = alloca(size);
623  if (!buf)
624  return CS_ERR_LIBRARY;
625 
626  res = (struct res_lib_cpg_totem_confchg_callback *)buf;
627  res->member_list_entries = member_list_entries;
628  res->header.size = size;
630  res->header.error = CS_OK;
631 
632  memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
633  memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
634 
635  if (conn == NULL) {
636  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
637  struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
638  api->ipc_dispatch_send (cpg_pd->conn, buf, size);
639  }
640  } else {
641  api->ipc_dispatch_send (conn, buf, size);
642  }
643 
644  return CS_OK;
645 }
646 
647 static int notify_lib_joinlist(
648  const mar_cpg_name_t *group_name,
649  void *conn,
650  int joined_list_entries,
651  mar_cpg_address_t *joined_list,
652  int left_list_entries,
653  mar_cpg_address_t *left_list,
654  int id)
655 {
656  int size;
657  char *buf;
658  struct list_head *iter;
659  int count;
660  struct res_lib_cpg_confchg_callback *res;
661  mar_cpg_address_t *retgi;
662 
663  count = 0;
664 
665  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
666  struct process_info *pi = list_entry (iter, struct process_info, list);
667  if (mar_name_compare (&pi->group, group_name) == 0) {
668  int i;
669  int founded = 0;
670 
671  for (i = 0; i < left_list_entries; i++) {
672  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
673  founded++;
674  }
675  }
676 
677  if (!founded)
678  count++;
679  }
680  }
681 
682  size = sizeof(struct res_lib_cpg_confchg_callback) +
683  sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
684  buf = alloca(size);
685  if (!buf)
686  return CS_ERR_LIBRARY;
687 
688  res = (struct res_lib_cpg_confchg_callback *)buf;
689  res->joined_list_entries = joined_list_entries;
690  res->left_list_entries = left_list_entries;
691  res->member_list_entries = count;
692  retgi = res->member_list;
693  res->header.size = size;
694  res->header.id = id;
695  res->header.error = CS_OK;
696  memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
697 
698  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
699  struct process_info *pi=list_entry (iter, struct process_info, list);
700 
701  if (mar_name_compare (&pi->group, group_name) == 0) {
702  int i;
703  int founded = 0;
704 
705  for (i = 0;i < left_list_entries; i++) {
706  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
707  founded++;
708  }
709  }
710 
711  if (!founded) {
712  retgi->nodeid = pi->nodeid;
713  retgi->pid = pi->pid;
714  retgi++;
715  }
716  }
717  }
718 
719  if (left_list_entries) {
720  memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
721  retgi += left_list_entries;
722  }
723 
724  if (joined_list_entries) {
725  memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
726  retgi += joined_list_entries;
727  }
728 
729  if (conn) {
730  api->ipc_dispatch_send (conn, buf, size);
731  } else {
732  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
733  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
734  if (mar_name_compare (&cpd->group_name, group_name) == 0) {
735  assert (joined_list_entries <= 1);
736  if (joined_list_entries) {
737  if (joined_list[0].pid == cpd->pid &&
738  joined_list[0].nodeid == api->totem_nodeid_get()) {
740  }
741  }
742  if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
744 
745  api->ipc_dispatch_send (cpd->conn, buf, size);
746  cpd->transition_counter++;
747  }
748  if (left_list_entries) {
749  if (left_list[0].pid == cpd->pid &&
750  left_list[0].nodeid == api->totem_nodeid_get() &&
751  left_list[0].reason == CONFCHG_CPG_REASON_LEAVE) {
752 
753  cpd->pid = 0;
754  memset (&cpd->group_name, 0, sizeof(cpd->group_name));
756  }
757  }
758  }
759  }
760  }
761 
762 
763  /*
764  * Traverse thru cpds and send totem membership for cpd, where it is not send yet
765  */
766  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
767  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
768 
770  cpd->initial_totem_conf_sent = 1;
771 
772  notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
773  }
774  }
775 
776  return CS_OK;
777 }
778 
779 static void downlist_log(const char *msg, struct req_exec_cpg_downlist *dl)
780 {
781  log_printf (LOG_DEBUG,
782  "%s: members(old:%d left:%d)",
783  msg,
784  dl->old_members,
785  dl->left_nodes);
786 }
787 
788 static void downlist_inform_clients (void)
789 {
790  struct list_head *iter;
791  struct process_info *left_pi;
792  qb_map_t *group_map;
793  struct cpg_name cpg_group;
794  mar_cpg_name_t group;
795  struct confchg_data{
796  struct cpg_name cpg_group;
798  int left_list_entries;
799  struct list_head list;
800  } *pcd;
801  qb_map_iter_t *miter;
802  int i, size;
803 
804  downlist_log("my downlist", &g_req_exec_cpg_downlist);
805 
806  group_map = qb_skiplist_create();
807 
808  /*
809  * only the cpg groups included in left nodes should receive
810  * confchg event, so we will collect these cpg groups and
811  * relative left_lists here.
812  */
813  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
814  struct process_info *pi = list_entry(iter, struct process_info, list);
815  iter = iter->next;
816 
817  left_pi = NULL;
818  for (i = 0; i < g_req_exec_cpg_downlist.left_nodes; i++) {
819 
820  if (pi->nodeid == g_req_exec_cpg_downlist.nodeids[i]) {
821  left_pi = pi;
822  break;
823  }
824  }
825 
826  if (left_pi) {
827  marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
828  cpg_group.value[cpg_group.length] = 0;
829 
830  pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
831  if (pcd == NULL) {
832  pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
833  memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
834  qb_map_put(group_map, pcd->cpg_group.value, pcd);
835  }
836  size = pcd->left_list_entries;
837  pcd->left_list[size].nodeid = left_pi->nodeid;
838  pcd->left_list[size].pid = left_pi->pid;
839  pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
840  pcd->left_list_entries++;
841  list_del (&left_pi->list);
842  free (left_pi);
843  }
844  }
845 
846  /* send only one confchg event per cpg group */
847  miter = qb_map_iter_create(group_map);
848  while (qb_map_iter_next(miter, (void **)&pcd)) {
849  marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
850 
851  log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
852  for (i=0; i<pcd->left_list_entries; i++) {
853  log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
854  i, cpg_print_group_name(&group),
855  (char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
856  pcd->left_list[i].pid);
857  }
858 
859  /* send confchg event */
860  notify_lib_joinlist(&group, NULL,
861  0, NULL,
862  pcd->left_list_entries,
863  pcd->left_list,
865 
866  free(pcd);
867  }
868  qb_map_iter_free(miter);
869  qb_map_destroy(group_map);
870 }
871 
872 /*
873  * Remove processes that might have left the group while we were suspended.
874  */
875 static void joinlist_remove_zombie_pi_entries (void)
876 {
877  struct list_head *pi_iter;
878  struct list_head *jl_iter;
879  struct process_info *pi;
880  struct joinlist_msg *stored_msg;
881  int found;
882 
883  for (pi_iter = process_info_list_head.next; pi_iter != &process_info_list_head; ) {
884  pi = list_entry (pi_iter, struct process_info, list);
885  pi_iter = pi_iter->next;
886 
887  /*
888  * Ignore local node
889  */
890  if (pi->nodeid == api->totem_nodeid_get()) {
891  continue ;
892  }
893 
894  /*
895  * Try to find message in joinlist messages
896  */
897  found = 0;
898  for (jl_iter = joinlist_messages_head.next;
899  jl_iter != &joinlist_messages_head;
900  jl_iter = jl_iter->next) {
901 
902  stored_msg = list_entry(jl_iter, struct joinlist_msg, list);
903 
904  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
905  continue ;
906  }
907 
908  if (pi->nodeid == stored_msg->sender_nodeid &&
909  pi->pid == stored_msg->pid &&
910  mar_name_compare (&pi->group, &stored_msg->group_name) == 0) {
911  found = 1;
912  break ;
913  }
914  }
915 
916  if (!found) {
917  do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN);
918  }
919  }
920 }
921 
922 static void joinlist_inform_clients (void)
923 {
924  struct joinlist_msg *stored_msg;
925  struct list_head *iter;
926  unsigned int i;
927 
928  i = 0;
929  for (iter = joinlist_messages_head.next;
930  iter != &joinlist_messages_head;
931  iter = iter->next) {
932 
933  stored_msg = list_entry(iter, struct joinlist_msg, list);
934 
935  log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
936  i++, cpg_print_group_name(&stored_msg->group_name),
937  (char*)api->totem_ifaces_print(stored_msg->sender_nodeid),
938  stored_msg->pid);
939 
940  /* Ignore our own messages */
941  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
942  continue ;
943  }
944 
945  do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid,
947  }
948 
949  joinlist_remove_zombie_pi_entries ();
950 }
951 
952 static void joinlist_messages_delete (void)
953 {
954  struct joinlist_msg *stored_msg;
955  struct list_head *iter, *iter_next;
956 
957  for (iter = joinlist_messages_head.next;
958  iter != &joinlist_messages_head;
959  iter = iter_next) {
960 
961  iter_next = iter->next;
962 
963  stored_msg = list_entry(iter, struct joinlist_msg, list);
964  list_del (&stored_msg->list);
965  free (stored_msg);
966  }
967  list_init (&joinlist_messages_head);
968 }
969 
970 static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
971 {
972  list_init (&joinlist_messages_head);
973  api = corosync_api;
974  return (NULL);
975 }
976 
977 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
978 {
979  struct list_head *iter, *iter_next;
980  struct process_info *pi;
981 
982  for (iter = cpg_iteration_instance->items_list_head.next;
983  iter != &cpg_iteration_instance->items_list_head;
984  iter = iter_next) {
985 
986  iter_next = iter->next;
987 
988  pi = list_entry (iter, struct process_info, list);
989  list_del (&pi->list);
990  free (pi);
991  }
992 
993  list_del (&cpg_iteration_instance->list);
994  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
995 }
996 
997 static void cpg_pd_finalize (struct cpg_pd *cpd)
998 {
999  struct list_head *iter, *iter_next;
1000  struct cpg_iteration_instance *cpii;
1001 
1002  zcb_all_free(cpd);
1003  for (iter = cpd->iteration_instance_list_head.next;
1004  iter != &cpd->iteration_instance_list_head;
1005  iter = iter_next) {
1006 
1007  iter_next = iter->next;
1008 
1009  cpii = list_entry (iter, struct cpg_iteration_instance, list);
1010 
1011  cpg_iteration_instance_finalize (cpii);
1012  }
1013 
1014  list_del (&cpd->list);
1015 }
1016 
1017 static int cpg_lib_exit_fn (void *conn)
1018 {
1019  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1020 
1021  log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
1022 
1023  if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
1024  cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
1026  }
1027 
1028  cpg_pd_finalize (cpd);
1029 
1030  api->ipc_refcnt_dec (conn);
1031  return (0);
1032 }
1033 
1034 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
1035 {
1037  struct iovec req_exec_cpg_iovec;
1038  int result;
1039 
1040  memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
1041  req_exec_cpg_procjoin.pid = pid;
1042  req_exec_cpg_procjoin.reason = reason;
1043 
1044  req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
1046 
1047  req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
1048  req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
1049 
1050  result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
1051 
1052  return (result);
1053 }
1054 
1055 /* Can byteswap join & leave messages */
1056 static void exec_cpg_procjoin_endian_convert (void *msg)
1057 {
1059 
1060  req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
1061  swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1062  req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
1063 }
1064 
1065 static void exec_cpg_joinlist_endian_convert (void *msg_v)
1066 {
1067  char *msg = msg_v;
1068  struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
1069  struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));
1070 
1071  swab_mar_int32_t (&res->size);
1072 
1073  while ((const char*)jle < msg + res->size) {
1074  jle->pid = swab32(jle->pid);
1075  swab_mar_cpg_name_t (&jle->group_name);
1076  jle++;
1077  }
1078 }
1079 
1080 static void exec_cpg_downlist_endian_convert_old (void *msg)
1081 {
1082 }
1083 
1084 static void exec_cpg_downlist_endian_convert (void *msg)
1085 {
1087  unsigned int i;
1088 
1089  req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
1090  req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
1091 
1092  for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1093  req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
1094  }
1095 }
1096 
1097 
1098 static void exec_cpg_mcast_endian_convert (void *msg)
1099 {
1100  struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
1101 
1102  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1103  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1104  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1105  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1106  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1107 }
1108 
1109 static void exec_cpg_partial_mcast_endian_convert (void *msg)
1110 {
1112 
1113  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1114  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1115  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1116  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1117  req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
1118  req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
1119  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1120 }
1121 
1122 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
1123  struct list_head *iter;
1124 
1125  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1126  struct process_info *pi = list_entry (iter, struct process_info, list);
1127  iter = iter->next;
1128 
1129  if (pi->pid == pid && pi->nodeid == nodeid &&
1130  mar_name_compare (&pi->group, group_name) == 0) {
1131  return pi;
1132  }
1133  }
1134 
1135  return NULL;
1136 }
1137 
1138 static void do_proc_join(
1139  const mar_cpg_name_t *name,
1140  uint32_t pid,
1141  unsigned int nodeid,
1142  int reason)
1143 {
1144  struct process_info *pi;
1145  struct process_info *pi_entry;
1146  mar_cpg_address_t notify_info;
1147  struct list_head *list;
1148  struct list_head *list_to_add = NULL;
1149 
1150  if (process_info_find (name, pid, nodeid) != NULL) {
1151  return ;
1152  }
1153  pi = malloc (sizeof (struct process_info));
1154  if (!pi) {
1155  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
1156  return;
1157  }
1158  pi->nodeid = nodeid;
1159  pi->pid = pid;
1160  memcpy(&pi->group, name, sizeof(*name));
1161  list_init(&pi->list);
1162 
1163  /*
1164  * Insert new process in sorted order so synchronization works properly
1165  */
1166  list_to_add = &process_info_list_head;
1167  for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
1168 
1169  pi_entry = list_entry(list, struct process_info, list);
1170  if (pi_entry->nodeid > pi->nodeid ||
1171  (pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
1172 
1173  break;
1174  }
1175  list_to_add = list;
1176  }
1177  list_add (&pi->list, list_to_add);
1178 
1179  notify_info.pid = pi->pid;
1180  notify_info.nodeid = nodeid;
1181  notify_info.reason = reason;
1182 
1183  notify_lib_joinlist(&pi->group, NULL,
1184  1, &notify_info,
1185  0, NULL,
1187 }
1188 
1189 static void do_proc_leave(
1190  const mar_cpg_name_t *name,
1191  uint32_t pid,
1192  unsigned int nodeid,
1193  int reason)
1194 {
1195  struct process_info *pi;
1196  struct list_head *iter;
1197  mar_cpg_address_t notify_info;
1198 
1199  notify_info.pid = pid;
1200  notify_info.nodeid = nodeid;
1201  notify_info.reason = reason;
1202 
1203  notify_lib_joinlist(name, NULL,
1204  0, NULL,
1205  1, &notify_info,
1207 
1208  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1209  pi = list_entry(iter, struct process_info, list);
1210  iter = iter->next;
1211 
1212  if (pi->pid == pid && pi->nodeid == nodeid &&
1213  mar_name_compare (&pi->group, name)==0) {
1214  list_del (&pi->list);
1215  free (pi);
1216  }
1217  }
1218 }
1219 
1220 static void message_handler_req_exec_cpg_downlist_old (
1221  const void *message,
1222  unsigned int nodeid)
1223 {
1224  log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node 0x%x",
1225  nodeid);
1226 }
1227 
1228 static void message_handler_req_exec_cpg_downlist(
1229  const void *message,
1230  unsigned int nodeid)
1231 {
1232  const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1233 
1234  log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received",
1235  req_exec_cpg_downlist->left_nodes);
1236 }
1237 
1238 
1239 static void message_handler_req_exec_cpg_procjoin (
1240  const void *message,
1241  unsigned int nodeid)
1242 {
1243  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1244 
1245  log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node 0x%x (%s) for pid %u",
1246  nodeid,
1247  api->totem_ifaces_print(nodeid),
1248  (unsigned int)req_exec_cpg_procjoin->pid);
1249 
1250  do_proc_join (&req_exec_cpg_procjoin->group_name,
1251  req_exec_cpg_procjoin->pid, nodeid,
1253 }
1254 
1255 static void message_handler_req_exec_cpg_procleave (
1256  const void *message,
1257  unsigned int nodeid)
1258 {
1259  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1260 
1261  log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node 0x%x (%s) for pid %u",
1262  nodeid,
1263  api->totem_ifaces_print(nodeid),
1264  (unsigned int)req_exec_cpg_procjoin->pid);
1265 
1266  do_proc_leave (&req_exec_cpg_procjoin->group_name,
1267  req_exec_cpg_procjoin->pid, nodeid,
1268  req_exec_cpg_procjoin->reason);
1269 }
1270 
1271 
1272 /* Got a proclist from another node */
1273 static void message_handler_req_exec_cpg_joinlist (
1274  const void *message_v,
1275  unsigned int nodeid)
1276 {
1277  const char *message = message_v;
1278  const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
1279  const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
1280  struct joinlist_msg *stored_msg;
1281 
1282  log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node 0x%x",
1283  nodeid);
1284 
1285  while ((const char*)jle < message + res->size) {
1286  stored_msg = malloc (sizeof (struct joinlist_msg));
1287  memset(stored_msg, 0, sizeof (struct joinlist_msg));
1288  stored_msg->sender_nodeid = nodeid;
1289  stored_msg->pid = jle->pid;
1290  memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
1291  list_init (&stored_msg->list);
1292  list_add (&stored_msg->list, &joinlist_messages_head);
1293  jle++;
1294  }
1295 }
1296 
1297 static void message_handler_req_exec_cpg_mcast (
1298  const void *message,
1299  unsigned int nodeid)
1300 {
1301  const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
1303  int msglen = req_exec_cpg_mcast->msglen;
1304  struct list_head *iter, *pi_iter;
1305  struct cpg_pd *cpd;
1306  struct iovec iovec[2];
1307  int known_node = 0;
1308 
1310  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1311  res_lib_cpg_mcast.msglen = msglen;
1312  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1313  res_lib_cpg_mcast.nodeid = nodeid;
1314 
1315  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1316  sizeof(mar_cpg_name_t));
1317  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1318  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1319 
1320  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1321  iovec[1].iov_len = msglen;
1322 
1323  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
1324  cpd = list_entry(iter, struct cpg_pd, list);
1325  iter = iter->next;
1326 
1328  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1329 
1330  if (!known_node) {
1331  /* Try to find, if we know the node */
1332  for (pi_iter = process_info_list_head.next;
1333  pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
1334 
1335  struct process_info *pi = list_entry (pi_iter, struct process_info, list);
1336 
1337  if (pi->nodeid == nodeid &&
1338  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1339  known_node = 1;
1340  break;
1341  }
1342  }
1343  }
1344 
1345  if (!known_node) {
1346  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1347  return ;
1348  }
1349 
1350  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1351  }
1352  }
1353 }
1354 
1355 static void message_handler_req_exec_cpg_partial_mcast (
1356  const void *message,
1357  unsigned int nodeid)
1358 {
1359  const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
1361  int msglen = req_exec_cpg_mcast->fraglen;
1362  struct list_head *iter, *pi_iter;
1363  struct cpg_pd *cpd;
1364  struct iovec iovec[2];
1365  int known_node = 0;
1366 
1367  log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
1368 
1370  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1371  res_lib_cpg_mcast.fraglen = msglen;
1372  res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
1373  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1374  res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
1375  res_lib_cpg_mcast.nodeid = nodeid;
1376 
1377  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1378  sizeof(mar_cpg_name_t));
1379  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1380  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1381 
1382  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1383  iovec[1].iov_len = msglen;
1384 
1385  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
1386  cpd = list_entry(iter, struct cpg_pd, list);
1387  iter = iter->next;
1388 
1390  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1391 
1392  if (!known_node) {
1393  /* Try to find, if we know the node */
1394  for (pi_iter = process_info_list_head.next;
1395  pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
1396 
1397  struct process_info *pi = list_entry (pi_iter, struct process_info, list);
1398 
1399  if (pi->nodeid == nodeid &&
1400  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1401  known_node = 1;
1402  break;
1403  }
1404  }
1405  }
1406 
1407  if (!known_node) {
1408  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1409  return ;
1410  }
1411 
1412  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1413  }
1414  }
1415 }
1416 
1417 
1418 static int cpg_exec_send_downlist(void)
1419 {
1420  struct iovec iov;
1421 
1422  g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
1423  g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
1424 
1425  g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1426 
1427  iov.iov_base = (void *)&g_req_exec_cpg_downlist;
1428  iov.iov_len = g_req_exec_cpg_downlist.header.size;
1429 
1430  return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
1431 }
1432 
1433 static int cpg_exec_send_joinlist(void)
1434 {
1435  int count = 0;
1436  struct list_head *iter;
1437  struct qb_ipc_response_header *res;
1438  char *buf;
1439  struct join_list_entry *jle;
1440  struct iovec req_exec_cpg_iovec;
1441 
1442  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1443  struct process_info *pi = list_entry (iter, struct process_info, list);
1444 
1445  if (pi->nodeid == api->totem_nodeid_get ()) {
1446  count++;
1447  }
1448  }
1449 
1450  /* Nothing to send */
1451  if (!count)
1452  return 0;
1453 
1454  buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
1455  if (!buf) {
1456  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
1457  return -1;
1458  }
1459 
1460  jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
1461  res = (struct qb_ipc_response_header *)buf;
1462 
1463  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1464  struct process_info *pi = list_entry (iter, struct process_info, list);
1465 
1466  if (pi->nodeid == api->totem_nodeid_get ()) {
1467  memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
1468  jle->pid = pi->pid;
1469  jle++;
1470  }
1471  }
1472 
1474  res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;
1475 
1476  req_exec_cpg_iovec.iov_base = buf;
1477  req_exec_cpg_iovec.iov_len = res->size;
1478 
1479  return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
1480 }
1481 
1482 static int cpg_lib_init_fn (void *conn)
1483 {
1484  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1485  memset (cpd, 0, sizeof(struct cpg_pd));
1486  cpd->conn = conn;
1487  list_add (&cpd->list, &cpg_pd_list_head);
1488 
1489  list_init (&cpd->iteration_instance_list_head);
1490  list_init (&cpd->zcb_mapped_list_head);
1491 
1492  api->ipc_refcnt_inc (conn);
1493  log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);
1494  return (0);
1495 }
1496 
1497 /* Join message from the library */
1498 static void message_handler_req_lib_cpg_join (void *conn, const void *message)
1499 {
1500  const struct req_lib_cpg_join *req_lib_cpg_join = message;
1501  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1503  cs_error_t error = CS_OK;
1504  struct list_head *iter;
1505 
1506  /* Test, if we don't have same pid and group name joined */
1507  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
1508  struct cpg_pd *cpd_item = list_entry (iter, struct cpg_pd, list);
1509 
1510  if (cpd_item->pid == req_lib_cpg_join->pid &&
1511  mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {
1512 
1513  /* We have same pid and group name joined -> return error */
1514  error = CS_ERR_EXIST;
1515  goto response_send;
1516  }
1517  }
1518 
1519  /*
1520  * Same check must be done in process info list, because there may be not yet delivered
1521  * leave of client.
1522  */
1523  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1524  struct process_info *pi = list_entry (iter, struct process_info, list);
1525 
1526  if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
1527  mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
1528  /* We have same pid and group name joined -> return error */
1529  error = CS_ERR_TRY_AGAIN;
1530  goto response_send;
1531  }
1532  }
1533 
1534  if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
1535  error = CS_ERR_NAME_TOO_LONG;
1536  goto response_send;
1537  }
1538 
1539  switch (cpd->cpd_state) {
1540  case CPD_STATE_UNJOINED:
1541  error = CS_OK;
1543  cpd->pid = req_lib_cpg_join->pid;
1544  cpd->flags = req_lib_cpg_join->flags;
1545  memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
1546  sizeof (cpd->group_name));
1547 
1548  cpg_node_joinleave_send (req_lib_cpg_join->pid,
1549  &req_lib_cpg_join->group_name,
1551  break;
1553  error = CS_ERR_BUSY;
1554  break;
1556  error = CS_ERR_EXIST;
1557  break;
1559  error = CS_ERR_EXIST;
1560  break;
1561  }
1562 
1563 response_send:
1564  res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
1566  res_lib_cpg_join.header.error = error;
1567  api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
1568 }
1569 
1570 /* Leave message from the library */
1571 static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
1572 {
1574  cs_error_t error = CS_OK;
1575  struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
1576  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1577 
1578  log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
1579 
1580  switch (cpd->cpd_state) {
1581  case CPD_STATE_UNJOINED:
1582  error = CS_ERR_NOT_EXIST;
1583  break;
1585  error = CS_ERR_NOT_EXIST;
1586  break;
1588  error = CS_ERR_BUSY;
1589  break;
1591  error = CS_OK;
1593  cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1594  &req_lib_cpg_leave->group_name,
1597  break;
1598  }
1599 
1600  /* send return */
1601  res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
1603  res_lib_cpg_leave.header.error = error;
1605 }
1606 
1607 /* Finalize message from library */
1608 static void message_handler_req_lib_cpg_finalize (
1609  void *conn,
1610  const void *message)
1611 {
1612  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1614  cs_error_t error = CS_OK;
1615 
1616  log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
1617 
1618  /*
1619  * We will just remove cpd from list. After this call, connection will be
1620  * closed on lib side, and cpg_lib_exit_fn will be called
1621  */
1622  list_del (&cpd->list);
1623  list_init (&cpd->list);
1624 
1625  res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
1627  res_lib_cpg_finalize.header.error = error;
1628 
1630  sizeof (res_lib_cpg_finalize));
1631 }
1632 
1633 static int
1634 memory_map (
1635  const char *path,
1636  size_t bytes,
1637  void **buf)
1638 {
1639  int32_t fd;
1640  void *addr;
1641  int32_t res;
1642 
1643  fd = open (path, O_RDWR, 0600);
1644 
1645  unlink (path);
1646 
1647  if (fd == -1) {
1648  return (-1);
1649  }
1650 
1651  res = ftruncate (fd, bytes);
1652  if (res == -1) {
1653  goto error_close_unlink;
1654  }
1655 
1656  addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1657  MAP_SHARED, fd, 0);
1658 
1659  if (addr == MAP_FAILED) {
1660  goto error_close_unlink;
1661  }
1662 #ifdef MADV_NOSYNC
1663  madvise(addr, bytes, MADV_NOSYNC);
1664 #endif
1665 
1666  res = close (fd);
1667  if (res) {
1668  munmap (addr, bytes);
1669  return (-1);
1670  }
1671  *buf = addr;
1672  return (0);
1673 
1674 error_close_unlink:
1675  close (fd);
1676  unlink(path);
1677  return -1;
1678 }
1679 
1680 static inline int zcb_alloc (
1681  struct cpg_pd *cpd,
1682  const char *path_to_file,
1683  size_t size,
1684  void **addr)
1685 {
1686  struct zcb_mapped *zcb_mapped;
1687  unsigned int res;
1688 
1689  zcb_mapped = malloc (sizeof (struct zcb_mapped));
1690  if (zcb_mapped == NULL) {
1691  return (-1);
1692  }
1693 
1694  res = memory_map (
1695  path_to_file,
1696  size,
1697  addr);
1698  if (res == -1) {
1699  free (zcb_mapped);
1700  return (-1);
1701  }
1702 
1703  list_init (&zcb_mapped->list);
1704  zcb_mapped->addr = *addr;
1705  zcb_mapped->size = size;
1706  list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);
1707  return (0);
1708 }
1709 
1710 
1711 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
1712 {
1713  unsigned int res;
1714 
1715  res = munmap (zcb_mapped->addr, zcb_mapped->size);
1716  list_del (&zcb_mapped->list);
1717  free (zcb_mapped);
1718  return (res);
1719 }
1720 
1721 static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
1722 {
1723  struct list_head *list;
1724  struct zcb_mapped *zcb_mapped;
1725  unsigned int res = 0;
1726 
1727  for (list = cpd->zcb_mapped_list_head.next;
1728  list != &cpd->zcb_mapped_list_head; list = list->next) {
1729 
1730  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1731 
1732  if (zcb_mapped->addr == addr) {
1733  res = zcb_free (zcb_mapped);
1734  break;
1735  }
1736 
1737  }
1738  return (res);
1739 }
1740 
1741 static inline int zcb_all_free (
1742  struct cpg_pd *cpd)
1743 {
1744  struct list_head *list;
1745  struct zcb_mapped *zcb_mapped;
1746 
1747  for (list = cpd->zcb_mapped_list_head.next;
1748  list != &cpd->zcb_mapped_list_head;) {
1749 
1750  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1751 
1752  list = list->next;
1753 
1754  zcb_free (zcb_mapped);
1755  }
1756  return (0);
1757 }
1758 
1759 union u {
1760  uint64_t server_addr;
1761  void *server_ptr;
1762 };
1763 
1764 static uint64_t void2serveraddr (void *server_ptr)
1765 {
1766  union u u;
1767 
1768  u.server_ptr = server_ptr;
1769  return (u.server_addr);
1770 }
1771 
1772 static void *serveraddr2void (uint64_t server_addr)
1773 {
1774  union u u;
1775 
1777  return (u.server_ptr);
1778 };
1779 
1780 static void message_handler_req_lib_cpg_zc_alloc (
1781  void *conn,
1782  const void *message)
1783 {
1785  struct qb_ipc_response_header res_header;
1786  void *addr = NULL;
1787  struct coroipcs_zc_header *zc_header;
1788  unsigned int res;
1789  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1790 
1791  log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);
1792 
1793  res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1794  &addr);
1795  assert(res == 0);
1796 
1797  zc_header = (struct coroipcs_zc_header *)addr;
1798  zc_header->server_address = void2serveraddr(addr);
1799 
1800  res_header.size = sizeof (struct qb_ipc_response_header);
1801  res_header.id = 0;
1802  api->ipc_response_send (conn,
1803  &res_header,
1804  res_header.size);
1805 }
1806 
1807 static void message_handler_req_lib_cpg_zc_free (
1808  void *conn,
1809  const void *message)
1810 {
1812  struct qb_ipc_response_header res_header;
1813  void *addr = NULL;
1814  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1815 
1816  log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
1817 
1818  addr = serveraddr2void (hdr->server_address);
1819 
1820  zcb_by_addr_free (cpd, addr);
1821 
1822  res_header.size = sizeof (struct qb_ipc_response_header);
1823  res_header.id = 0;
1824  api->ipc_response_send (
1825  conn, &res_header,
1826  res_header.size);
1827 }
1828 
1829 /* Fragmented mcast message from the library */
1830 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
1831 {
1832  const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
1833  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1834  mar_cpg_name_t group_name = cpd->group_name;
1835 
1836  struct iovec req_exec_cpg_iovec[2];
1837  struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
1838  struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1839  int msglen = req_lib_cpg_mcast->fraglen;
1840  int result;
1841  cs_error_t error = CS_ERR_NOT_EXIST;
1842 
1843  log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
1844  log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
1845 
1846  switch (cpd->cpd_state) {
1847  case CPD_STATE_UNJOINED:
1848  error = CS_ERR_NOT_EXIST;
1849  break;
1851  error = CS_ERR_NOT_EXIST;
1852  break;
1854  error = CS_OK;
1855  break;
1857  error = CS_OK;
1858  break;
1859  }
1860 
1861  res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
1862  res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
1863 
1864  if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
1866  }
1868  error = CS_ERR_INTERRUPT;
1869  }
1870 
1871  if (error == CS_OK) {
1872  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1873  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1875  req_exec_cpg_mcast.pid = cpd->pid;
1876  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1877  req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
1878  req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
1879  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1880  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1881  sizeof(mar_cpg_name_t));
1882 
1883  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1884  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1885  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1886  req_exec_cpg_iovec[1].iov_len = msglen;
1887 
1888  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
1889  assert(result == 0);
1890  } else {
1891  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
1892  conn, group_name.value, cpd->cpd_state, error);
1893  }
1894 
1895  res_lib_cpg_partial_send.header.error = error;
1896  api->ipc_response_send (conn, &res_lib_cpg_partial_send,
1897  sizeof (res_lib_cpg_partial_send));
1898 }
1899 
1900 /* Mcast message from the library */
1901 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
1902 {
1903  const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
1904  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1905  mar_cpg_name_t group_name = cpd->group_name;
1906 
1907  struct iovec req_exec_cpg_iovec[2];
1908  struct req_exec_cpg_mcast req_exec_cpg_mcast;
1909  int msglen = req_lib_cpg_mcast->msglen;
1910  int result;
1911  cs_error_t error = CS_ERR_NOT_EXIST;
1912 
1913  log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);
1914 
1915  switch (cpd->cpd_state) {
1916  case CPD_STATE_UNJOINED:
1917  error = CS_ERR_NOT_EXIST;
1918  break;
1920  error = CS_ERR_NOT_EXIST;
1921  break;
1923  error = CS_OK;
1924  break;
1926  error = CS_OK;
1927  break;
1928  }
1929 
1930  if (error == CS_OK) {
1931  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1932  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1934  req_exec_cpg_mcast.pid = cpd->pid;
1935  req_exec_cpg_mcast.msglen = msglen;
1936  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1937  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1938  sizeof(mar_cpg_name_t));
1939 
1940  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1941  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1942  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1943  req_exec_cpg_iovec[1].iov_len = msglen;
1944 
1945  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
1946  assert(result == 0);
1947  } else {
1948  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
1949  conn, group_name.value, cpd->cpd_state, error);
1950  }
1951 }
1952 
1953 static void message_handler_req_lib_cpg_zc_execute (
1954  void *conn,
1955  const void *message)
1956 {
1958  struct qb_ipc_request_header *header;
1959  struct res_lib_cpg_mcast res_lib_cpg_mcast;
1960  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1961  struct iovec req_exec_cpg_iovec[2];
1962  struct req_exec_cpg_mcast req_exec_cpg_mcast;
1964  int result;
1965  cs_error_t error = CS_ERR_NOT_EXIST;
1966 
1967  log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);
1968 
1969  header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
1970  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header;
1971 
1972  switch (cpd->cpd_state) {
1973  case CPD_STATE_UNJOINED:
1974  error = CS_ERR_NOT_EXIST;
1975  break;
1977  error = CS_ERR_NOT_EXIST;
1978  break;
1980  error = CS_OK;
1981  break;
1983  error = CS_OK;
1984  break;
1985  }
1986 
1987  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
1988  res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
1989  if (error == CS_OK) {
1990  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
1991  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1993  req_exec_cpg_mcast.pid = cpd->pid;
1994  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1995  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1996  memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
1997  sizeof(mar_cpg_name_t));
1998 
1999  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2000  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2001  req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
2002  req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2003 
2004  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2005  if (result == 0) {
2006  res_lib_cpg_mcast.header.error = CS_OK;
2007  } else {
2008  res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
2009  }
2010  } else {
2011  res_lib_cpg_mcast.header.error = error;
2012  }
2013 
2014  api->ipc_response_send (conn, &res_lib_cpg_mcast,
2015  sizeof (res_lib_cpg_mcast));
2016 
2017 }
2018 
2019 static void message_handler_req_lib_cpg_membership (void *conn,
2020  const void *message)
2021 {
2023  (struct req_lib_cpg_membership_get *)message;
2024  struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
2025  struct list_head *iter;
2026  int member_count = 0;
2027 
2028  res_lib_cpg_membership_get.header.id = MESSAGE_RES_CPG_MEMBERSHIP;
2029  res_lib_cpg_membership_get.header.error = CS_OK;
2030  res_lib_cpg_membership_get.header.size =
2031  sizeof (struct res_lib_cpg_membership_get);
2032 
2033  for (iter = process_info_list_head.next;
2034  iter != &process_info_list_head; iter = iter->next) {
2035 
2036  struct process_info *pi = list_entry (iter, struct process_info, list);
2037  if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
2038  res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
2039  res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
2040  member_count += 1;
2041  }
2042  }
2043  res_lib_cpg_membership_get.member_count = member_count;
2044 
2045  api->ipc_response_send (conn, &res_lib_cpg_membership_get,
2046  sizeof (res_lib_cpg_membership_get));
2047 }
2048 
2049 static void message_handler_req_lib_cpg_local_get (void *conn,
2050  const void *message)
2051 {
2052  struct res_lib_cpg_local_get res_lib_cpg_local_get;
2053 
2054  res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
2055  res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
2056  res_lib_cpg_local_get.header.error = CS_OK;
2057  res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
2058 
2059  api->ipc_response_send (conn, &res_lib_cpg_local_get,
2060  sizeof (res_lib_cpg_local_get));
2061 }
2062 
2063 static void message_handler_req_lib_cpg_iteration_initialize (
2064  void *conn,
2065  const void *message)
2066 {
2068  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2069  hdb_handle_t cpg_iteration_handle = 0;
2070  struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
2071  struct list_head *iter, *iter2;
2072  struct cpg_iteration_instance *cpg_iteration_instance;
2073  cs_error_t error = CS_OK;
2074  int res;
2075 
2076  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");
2077 
2078  /* Because between calling this function and *next can be some operations which will
2079  * change list, we must do full copy.
2080  */
2081 
2082  /*
2083  * Create new iteration instance
2084  */
2085  res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
2086  &cpg_iteration_handle);
2087 
2088  if (res != 0) {
2089  error = CS_ERR_NO_MEMORY;
2090  goto response_send;
2091  }
2092 
2093  res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
2094 
2095  if (res != 0) {
2096  error = CS_ERR_BAD_HANDLE;
2097  goto error_destroy;
2098  }
2099 
2100  list_init (&cpg_iteration_instance->items_list_head);
2101  cpg_iteration_instance->handle = cpg_iteration_handle;
2102 
2103  /*
2104  * Create copy of process_info list "grouped by" group name
2105  */
2106  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
2107  struct process_info *pi = list_entry (iter, struct process_info, list);
2108  struct process_info *new_pi;
2109 
2110  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2111  /*
2112  * Try to find processed group name in our list new list
2113  */
2114  int found = 0;
2115 
2116  for (iter2 = cpg_iteration_instance->items_list_head.next;
2117  iter2 != &cpg_iteration_instance->items_list_head;
2118  iter2 = iter2->next) {
2119  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2120 
2121  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2122  found = 1;
2123  break;
2124  }
2125  }
2126 
2127  if (found) {
2128  /*
2129  * We have this name in list -> don't add
2130  */
2131  continue ;
2132  }
2133  } else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
2134  /*
2135  * Test pi group name with request
2136  */
2137  if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2138  /*
2139  * Not same -> don't add
2140  */
2141  continue ;
2142  }
2143 
2144  new_pi = malloc (sizeof (struct process_info));
2145  if (!new_pi) {
2146  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
2147 
2148  error = CS_ERR_NO_MEMORY;
2149 
2150  goto error_put_destroy;
2151  }
2152 
2153  memcpy (new_pi, pi, sizeof (struct process_info));
2154  list_init (&new_pi->list);
2155 
2156  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2157  /*
2158  * pid and nodeid -> undefined
2159  */
2160  new_pi->pid = new_pi->nodeid = 0;
2161  }
2162 
2163  /*
2164  * We will return list "grouped" by "group name", so try to find right place to add
2165  */
2166  for (iter2 = cpg_iteration_instance->items_list_head.next;
2167  iter2 != &cpg_iteration_instance->items_list_head;
2168  iter2 = iter2->next) {
2169  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2170 
2171  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2172  break;
2173  }
2174  }
2175 
2176  list_add (&new_pi->list, iter2);
2177  }
2178 
2179  /*
2180  * Now we have a full "grouped by" copy of process_info list
2181  */
2182 
2183  /*
2184  * Add instance to current cpd list
2185  */
2186  list_init (&cpg_iteration_instance->list);
2187  list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head);
2188 
2189  cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
2190 
2191 error_put_destroy:
2192  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2193 error_destroy:
2194  if (error != CS_OK) {
2195  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2196  }
2197 
2198 response_send:
2199  res_lib_cpg_iterationinitialize.header.size = sizeof (res_lib_cpg_iterationinitialize);
2200  res_lib_cpg_iterationinitialize.header.id = MESSAGE_RES_CPG_ITERATIONINITIALIZE;
2201  res_lib_cpg_iterationinitialize.header.error = error;
2202  res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2203 
2204  api->ipc_response_send (conn, &res_lib_cpg_iterationinitialize,
2205  sizeof (res_lib_cpg_iterationinitialize));
2206 }
2207 
2208 static void message_handler_req_lib_cpg_iteration_next (
2209  void *conn,
2210  const void *message)
2211 {
2212  const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
2213  struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
2214  struct cpg_iteration_instance *cpg_iteration_instance;
2215  cs_error_t error = CS_OK;
2216  int res;
2217  struct process_info *pi;
2218 
2219  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");
2220 
2221  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2222  req_lib_cpg_iterationnext->iteration_handle,
2223  (void *)&cpg_iteration_instance);
2224 
2225  if (res != 0) {
2226  error = CS_ERR_LIBRARY;
2227  goto error_exit;
2228  }
2229 
2230  assert (cpg_iteration_instance);
2231 
2232  cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
2233 
2234  if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
2235  error = CS_ERR_NO_SECTIONS;
2236  goto error_put;
2237  }
2238 
2239  pi = list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);
2240 
2241  /*
2242  * Copy iteration data
2243  */
2244  res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
2245  res_lib_cpg_iterationnext.description.pid = pi->pid;
2246  memcpy (&res_lib_cpg_iterationnext.description.group,
2247  &pi->group,
2248  sizeof (mar_cpg_name_t));
2249 
2250 error_put:
2251  hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2252 error_exit:
2253  res_lib_cpg_iterationnext.header.size = sizeof (res_lib_cpg_iterationnext);
2254  res_lib_cpg_iterationnext.header.id = MESSAGE_RES_CPG_ITERATIONNEXT;
2255  res_lib_cpg_iterationnext.header.error = error;
2256 
2257  api->ipc_response_send (conn, &res_lib_cpg_iterationnext,
2258  sizeof (res_lib_cpg_iterationnext));
2259 }
2260 
2261 static void message_handler_req_lib_cpg_iteration_finalize (
2262  void *conn,
2263  const void *message)
2264 {
2266  struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
2267  struct cpg_iteration_instance *cpg_iteration_instance;
2268  cs_error_t error = CS_OK;
2269  int res;
2270 
2271  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
2272 
2273  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2274  req_lib_cpg_iterationfinalize->iteration_handle,
2275  (void *)&cpg_iteration_instance);
2276 
2277  if (res != 0) {
2278  error = CS_ERR_LIBRARY;
2279  goto error_exit;
2280  }
2281 
2282  assert (cpg_iteration_instance);
2283 
2284  cpg_iteration_instance_finalize (cpg_iteration_instance);
2285  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2286 
2287 error_exit:
2288  res_lib_cpg_iterationfinalize.header.size = sizeof (res_lib_cpg_iterationfinalize);
2289  res_lib_cpg_iterationfinalize.header.id = MESSAGE_RES_CPG_ITERATIONFINALIZE;
2290  res_lib_cpg_iterationfinalize.header.error = error;
2291 
2292  api->ipc_response_send (conn, &res_lib_cpg_iterationfinalize,
2293  sizeof (res_lib_cpg_iterationfinalize));
2294 }
void *(* ipc_private_data_get)(void *conn)
Definition: coroapi.h:256
#define TOTEM_AGREED
Definition: coroapi.h:102
int initial_totem_conf_sent
Definition: exec/cpg.c:150
const char * name
Definition: coroapi.h:492
Definition: exec/cpg.c:1759
mar_cpg_address_t member_list[]
Definition: ipc_cpg.h:390
mar_req_coroipcc_zc_free_t struct
Definition: ipc_cpg.h:481
#define CPG_MAX_NAME_LENGTH
Definition: cpg.h:115
mar_cpg_address_t struct
Definition: ipc_cpg.h:155
uint64_t initial_transition_counter
Definition: exec/cpg.c:152
#define LOGSYS_LEVEL_TRACE
Definition: logsys.h:75
mar_uint32_t sender_nodeid
Definition: exec/cpg.c:494
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition: cpg.h:192
The req_lib_cpg_join struct.
Definition: ipc_cpg.h:251
mar_req_coroipcc_zc_alloc_t struct
Definition: ipc_cpg.h:472
The cpg_name struct.
Definition: cpg.h:119
struct list_head * next
Definition: list.h:47
The corosync_service_engine struct.
Definition: coroapi.h:491
struct list_head list
Definition: exec/cpg.c:160
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
Definition: coroapi.h:265
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:258
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
Definition: exec/cpg.c:446
cpg_sync_state
Definition: exec/cpg.c:137
The res_lib_cpg_partial_deliver_callback struct.
Definition: ipc_cpg.h:345
The req_lib_cpg_mcast struct.
Definition: ipc_cpg.h:304
mar_cpg_name_t group
Definition: exec/cpg.c:186
The corosync_lib_handler struct.
Definition: coroapi.h:468
The res_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:375
struct message_header header
Definition: totemsrp.c:60
struct list_head * current_pointer
Definition: exec/cpg.c:162
The res_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:449
hdb_handle_t handle
Definition: exec/cpg.c:159
uint32_t pid
Definition: exec/cpg.c:185
#define CPG_MEMBERS_MAX
Definition: cpg.h:124
The res_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:433
The corosync_exec_handler struct.
Definition: coroapi.h:476
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
Definition: coroapi.h:281
coroipcs_zc_header struct
Definition: ipc_cpg.h:498
uint64_t transition_counter
Definition: exec/cpg.c:151
Definition: list.h:46
#define log_printf(level, format, args...)
Definition: logsys.h:320
struct list_head list
Definition: exec/cpg.c:187
The res_lib_cpg_partial_send struct.
Definition: ipc_cpg.h:297
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
Definition: coroapi.h:477
uint64_t server_address
Definition: ipc_cpg.h:500
void * conn
Definition: exec/cpg.c:145
struct list_head iteration_instance_list_head
Definition: exec/cpg.c:154
#define SERVICE_ID_MAKE(a, b)
Definition: coroapi.h:459
The req_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:424
#define LOGSYS_LEVEL_WARNING
Definition: logsys.h:71
The res_lib_cpg_join struct.
Definition: ipc_cpg.h:261
struct list_head list
Definition: exec/cpg.c:497
unsigned int flags
Definition: exec/cpg.c:149
unsigned int(* totem_nodeid_get)(void)
Definition: coroapi.h:275
uint32_t pid
Definition: exec/cpg.c:495
unsigned int nodeid
Definition: coroapi.h:112
void(* ipc_refcnt_dec)(void *conn)
Definition: coroapi.h:270
mar_req_coroipcc_zc_execute_t struct
Definition: ipc_cpg.h:490
The res_lib_cpg_mcast struct.
Definition: ipc_cpg.h:326
struct list_head list
Definition: exec/cpg.c:89
#define LOGSYS_LEVEL_ERROR
Definition: logsys.h:70
size_t size
Definition: exec/cpg.c:91
Linked list API.
mar_cpg_name_t struct
Definition: ipc_cpg.h:112
void * server_ptr
Definition: exec/cpg.c:1761
struct totem_ip_address rep
Definition: coroapi.h:123
uint32_t pid
Definition: exec/cpg.c:147
cs_error_t
The cs_error_t enum.
Definition: corotypes.h:94
The req_lib_cpg_leave struct.
Definition: ipc_cpg.h:408
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:74
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition: ipc_cpg.h:378
mar_cpg_name_t group_name
Definition: exec/cpg.c:193
The req_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:457
uint8_t mar_uint8_t
Definition: mar_gen.h:51
mar_cpg_name_t group_name
Definition: exec/cpg.c:146
The corosync_api_v1 struct.
Definition: coroapi.h:225
typedef __attribute__
cpg_message_req_types
Definition: exec/cpg.c:78
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
enum cpd_state cpd_state
Definition: exec/cpg.c:148
The res_lib_cpg_finalize struct.
Definition: ipc_cpg.h:275
uint32_t mar_uint32_t
Definition: mar_gen.h:53
unsigned int nodeid
Definition: exec/cpg.c:184
The res_lib_cpg_local_get struct.
Definition: ipc_cpg.h:289
struct list_head list
Definition: exec/cpg.c:153
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
Definition: exec/cpg.c:191
qb_handle_t hdb_handle_t
Definition: hdb.h:52
The memb_ring_id struct.
Definition: coroapi.h:122
The res_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:465
struct corosync_service_engine cpg_service_engine
Definition: exec/cpg.c:425
The req_lib_cpg_partial_mcast struct.
Definition: ipc_cpg.h:314
struct list_head zcb_mapped_list_head
Definition: exec/cpg.c:155
The req_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:441
const char *(* totem_ifaces_print)(unsigned int nodeid)
Definition: coroapi.h:291
uint32_t pid
Definition: exec/cpg.c:192
The res_lib_cpg_confchg_callback struct.
Definition: ipc_cpg.h:384
#define list_entry(ptr, type, member)
Definition: list.h:84
void * addr
Definition: exec/cpg.c:90
mar_cpg_name_t group_name
Definition: exec/cpg.c:496
unsigned long long seq
Definition: coroapi.h:124
void(* lib_handler_fn)(void *conn, const void *msg)
Definition: coroapi.h:469
char type
Definition: totemrrp.c:518
The req_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:367
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:263
struct list_head items_list_head
Definition: exec/cpg.c:161
uint64_t server_addr
Definition: exec/cpg.c:1760
The res_lib_cpg_leave struct.
Definition: ipc_cpg.h:417
unsigned int nodeid
Definition: coroapi.h:75
struct memb_ring_id ring_id
Definition: totemsrp.c:64
mar_cpg_ring_id_t struct
Definition: ipc_cpg.h:230
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
Definition: coroapi.h:252
cpd_state
Definition: exec/cpg.c:130
The res_lib_cpg_totem_confchg_callback struct.
Definition: ipc_cpg.h:398
DECLARE_LIST_INIT(cpg_pd_list_head)
Message from another node.
Definition: ipc_cpg.h:333
The mar_message_source_t struct.
Definition: coroapi.h:50
void(* ipc_refcnt_inc)(void *conn)
Definition: coroapi.h:268