corosync  3.1.0
vsf_ykd.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2005 MontaVista Software, Inc.
3  * Copyright (c) 2006-2012 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #include <assert.h>
39 #include <pwd.h>
40 #include <grp.h>
41 #include <sys/types.h>
42 #include <sys/poll.h>
43 #include <sys/uio.h>
44 #include <sys/mman.h>
45 #include <sys/socket.h>
46 #include <sys/un.h>
47 #include <sys/time.h>
48 #include <sys/resource.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <unistd.h>
52 #include <fcntl.h>
53 #include <stdlib.h>
54 #include <stdio.h>
55 #include <errno.h>
56 #include <sched.h>
57 #include <time.h>
58 
59 #include "quorum.h"
60 #include <corosync/logsys.h>
61 #include <corosync/corotypes.h>
62 #include <qb/qbipc_common.h>
63 #include <corosync/mar_gen.h>
64 #include <corosync/coroapi.h>
65 #include <corosync/swab.h>
66 
67 #include "vsf_ykd.h"
68 
69 LOGSYS_DECLARE_SUBSYS ("YKD");
70 
/*
 * Capacity of every fixed-size YKD array: member lists, per-member
 * received state, and session lists. The matching *_entries counters are
 * plain ints, and several are copied from network messages.
 * NOTE(review): most uses in this file do not check those counters
 * against this bound — confirm before trusting remote input.
 */
71 #define YKD_PROCESSOR_COUNT_MAX 32
72 
76 };
77 
78 enum ykd_mode {
81 };
82 
/*
 * Prefix of every message multicast to the "ykd" group.
 *
 * In this file, id is set to YKD_HEADER_SENDSTATE by ykd_state_send_msg(),
 * which appends a raw struct ykd_state after the header. It is set to
 * YKD_HEADER_ATTEMPT by ykd_attempt_send_msg(), which sends the header
 * alone.
 *
 * NOTE(review): ykd_deliver_fn() byte-swaps only the trailing state, never
 * id, so id is compared in the sender's byte order — confirm for
 * mixed-endian clusters.
 */
83 struct ykd_header {
84  int id;
85 };
86 
87 struct ykd_session {
88  unsigned int member_list[YKD_PROCESSOR_COUNT_MAX];
91 };
92 
93 struct ykd_state {
94  struct ykd_session last_primary;
95 
96  struct ykd_session last_formed[YKD_PROCESSOR_COUNT_MAX];
97 
99 
100  struct ykd_session ambiguous_sessions[YKD_PROCESSOR_COUNT_MAX];
101 
103 
105 };
106 
108  unsigned int nodeid;
109  int received;
111 };
112 
/*
 * This node's own YKD state. ykd_state_send_msg() multicasts it verbatim
 * (raw struct bytes). NOTE(review): not declared static, unlike the other
 * globals in this file.
 */
113 struct ykd_state ykd_state;
114 
/* Process-group handle filled by api->tpg_init() in ykd_init(). */
115 static void *ykd_group_handle;
116 
/* One entry per member of the current regular configuration (ykd_confchg_fn). */
117 static struct state_received state_received_confchg[YKD_PROCESSOR_COUNT_MAX];
118 
119 static int state_received_confchg_entries;
120 
/*
 * Working copy of state_received_confchg. ykd_deliver_fn() sets the
 * received flags and stores each member's reported ykd_state here.
 */
121 static struct state_received state_received_process[YKD_PROCESSOR_COUNT_MAX];
122 
123 static int state_received_process_entries;
124 
/* Which message type ykd_deliver_fn() currently accepts. */
125 static enum ykd_mode ykd_mode;
126 
/* Member list of the current regular configuration. */
127 static unsigned int ykd_view_list[YKD_PROCESSOR_COUNT_MAX];
128 
129 static int ykd_view_list_entries;
130 
/* Outputs of compute(), consumed by decide() and ykd_deliver_fn(). */
131 static int session_id_max;
132 
133 static struct ykd_session *last_primary_max;
134 
135 static struct ykd_session ambiguous_sessions_max[YKD_PROCESSOR_COUNT_MAX];
136 
137 static int ambiguous_sessions_max_entries;
138 
/* Last primary flag passed to ykd_primary_callback_fn. */
139 static int ykd_primary_designated = 0;
140 
/* Ring id of the most recent regular configuration (ykd_confchg_fn). */
141 static struct memb_ring_id ykd_ring_id;
142 
/*
 * NOTE(review): original lines 143 and 145 are absent from this listing.
 * The cross-reference lists hdb_handle_t schedwrk_attempt_send_callback_handle
 * and schedwrk_state_send_callback_handle there — confirm.
 */
144 
146 
/* corosync services, stored by ykd_init(). */
147 static struct corosync_api_v1 *api;
148 
/*
 * Primary-component notification callback, stored by ykd_init(). It is
 * invoked from ykd_confchg_fn() and ykd_deliver_fn().
 */
149 static void (*ykd_primary_callback_fn) (
150  const unsigned int *view_list,
151  size_t view_list_entries,
152  int primary_designated,
153  struct memb_ring_id *ring_id) = NULL;
154 
155 static void ykd_state_init (void)
156 {
157  ykd_state.session_id = 0;
158  ykd_state.last_formed_entries = 0;
159  ykd_state.ambiguous_sessions_entries = 0;
160  ykd_state.last_primary.session_id = 0;
161  ykd_state.last_primary.member_list_entries = 0;
162 }
163 
164 static int ykd_state_send_msg (const void *context)
165 {
166  struct iovec iovec[2];
167  struct ykd_header header;
168  int res;
169 
170  header.id = YKD_HEADER_SENDSTATE;
171 
172  iovec[0].iov_base = (char *)&header;
173  iovec[0].iov_len = sizeof (struct ykd_header);
174  iovec[1].iov_base = (char *)&ykd_state;
175  iovec[1].iov_len = sizeof (struct ykd_state);
176 
177  res = api->tpg_joined_mcast (ykd_group_handle, iovec, 2,
178  TOTEM_AGREED);
179 
180  return (res);
181 }
182 
/*
 * Queue ykd_state_send_msg() through api->schedwrk_create() instead of
 * multicasting directly; the callback receives a NULL context.
 * NOTE(review): the first schedwrk_create() argument (original line 186)
 * is missing from this listing. It is presumably
 * &schedwrk_state_send_callback_handle — confirm.
 */
183 static void ykd_state_send (void)
184 {
185  api->schedwrk_create (
187  ykd_state_send_msg,
188  NULL);
189 }
190 
191 static int ykd_attempt_send_msg (const void *context)
192 {
193  struct iovec iovec;
194  struct ykd_header header;
195  int res;
196 
197  header.id = YKD_HEADER_ATTEMPT;
198 
199  iovec.iov_base = (char *)&header;
200  iovec.iov_len = sizeof (struct ykd_header);
201 
202  res = api->tpg_joined_mcast (ykd_group_handle, &iovec, 1,
203  TOTEM_AGREED);
204 
205  return (res);
206 }
207 
/*
 * Queue ykd_attempt_send_msg() through api->schedwrk_create(); the
 * callback receives a NULL context.
 * NOTE(review): the first schedwrk_create() argument (original line 211)
 * is missing from this listing. It is presumably
 * &schedwrk_attempt_send_callback_handle — confirm.
 */
208 static void ykd_attempt_send (void)
209 {
210  api->schedwrk_create (
212  ykd_attempt_send_msg,
213  NULL);
214 }
215 
216 static void compute (void)
217 {
218  int i;
219  int j;
220 
221  session_id_max = 0;
222  last_primary_max = &state_received_process[0].ykd_state.last_primary;
223  ambiguous_sessions_max_entries = 0;
224 
225  for (i = 0; i < state_received_process_entries; i++) {
226  /*
227  * Calculate maximum session id
228  */
229  if (state_received_process[i].ykd_state.session_id > session_id_max) {
230  session_id_max = state_received_process[i].ykd_state.session_id;
231  }
232 
233  /*
234  * Calculate maximum primary id
235  */
236  if (state_received_process[i].ykd_state.last_primary.session_id > last_primary_max->session_id) {
237  last_primary_max = &state_received_process[i].ykd_state.last_primary;
238  }
239 
240  /*
241  * generate the maximum ambiguous sessions list
242  */
243  for (j = 0; j < state_received_process[i].ykd_state.ambiguous_sessions_entries; j++) {
244  if (state_received_process[i].ykd_state.ambiguous_sessions[j].session_id > last_primary_max->session_id) {
245  memcpy (&ambiguous_sessions_max[ambiguous_sessions_max_entries],
246  &state_received_process[i].ykd_state.ambiguous_sessions[j],
247  sizeof (struct ykd_session));
248  ambiguous_sessions_max_entries += 1;
249  }
250  }
251  }
252 }
253 
254 static int subquorum (
255  unsigned int *member_list,
256  int member_list_entries,
257  struct ykd_session *session)
258 {
259  int intersections = 0;
260  int i;
261  int j;
262 
263  for (i = 0; i < member_list_entries; i++) {
264  for (j = 0; j < session->member_list_entries; j++) {
265  if (member_list[i] == session->member_list[j]) {
266  intersections += 1;
267  }
268  }
269  }
270 
271  /*
272  * even split
273  */
274  if (intersections == (session->member_list_entries - intersections)) {
275  return (1);
276  } else
277 
278  /*
279  * majority split
280  */
281  if (intersections > (session->member_list_entries - intersections)) {
282  return (1);
283  }
284  return (0);
285 }
286 
287 static int decide (void)
288 {
289  int i;
290 
291  /*
292  * Determine if there is a subquorum
293  */
294  if (subquorum (ykd_view_list, ykd_view_list_entries, last_primary_max) == 0) {
295  return (0);
296  }
297 
298  for (i = 0; i < ambiguous_sessions_max_entries; i++) {
299  if (subquorum (ykd_view_list, ykd_view_list_entries, &ambiguous_sessions_max[i]) == 0) {
300  return (0);
301  }
302 
303  }
304  return (1);
305 }
306 
307 static void ykd_session_endian_convert (struct ykd_session *ykd_session)
308 {
309  int i;
310 
311  ykd_session->member_list_entries =
312  swab32 (ykd_session->member_list_entries);
313  ykd_session->session_id = swab32 (ykd_session->session_id);
314  for (i = 0; i < ykd_session->member_list_entries; i++) {
315  ykd_session->member_list[i] =
316  swab32 (ykd_session->member_list[i]);
317  }
318 }
319 
/*
 * Byte-swap a received struct ykd_state in place. This covers its last
 * primary, its session id, and each session in last_formed[] and
 * ambiguous_sessions[].
 *
 * NOTE(review): original lines 325-326 are missing from this listing. The
 * loops below use last_formed_entries and ambiguous_sessions_entries as
 * bounds, so those counters must already be in host order at that point.
 * They are presumably swapped in the missing lines — confirm. The counters
 * are also not checked against YKD_PROCESSOR_COUNT_MAX before indexing.
 */
320 static void ykd_state_endian_convert (struct ykd_state *state)
321 {
322  int i;
323 
324  ykd_session_endian_convert (&state->last_primary);
327  state->session_id = swab32 (state->session_id);
328 
329  for (i = 0; i < state->last_formed_entries; i++) {
330  ykd_session_endian_convert (&state->last_formed[i]);
331  }
332 
333  for (i = 0; i < state->ambiguous_sessions_entries; i++) {
334  ykd_session_endian_convert (&state->ambiguous_sessions[i]);
335  }
336 }
337 
/*
 * Delivery callback for the "ykd" process group (registered via
 * api->tpg_init() in ykd_init()).
 *
 * It first marks the sender as having reported in the current
 * configuration, then acts according to ykd_mode:
 *  - YKD_MODE_SENDSTATE: stores the sender's struct ykd_state. Once every
 *    member has reported, it runs compute() and decide(). If decide()
 *    succeeds, it records the current view as an ambiguous session and
 *    queues an ATTEMPT message.
 *  - YKD_MODE_ATTEMPT: once every member has reported, it reports this node
 *    as primary through ykd_primary_callback_fn. It then stores the current
 *    view as the last primary component and clears the ambiguous sessions.
 *
 * @nodeid   sender's node id
 * @msg      struct ykd_header, followed by a struct ykd_state for SENDSTATE
 * @msg_len  length of msg in bytes
 * @endian_conversion_required  presumably non-zero when the sender's byte
 *           order differs from ours — confirm. The state payload is then
 *           converted in place.
 *
 * NOTE(review): msg is const, but the state payload is converted through a
 * cast that drops const (msg_state below).
 */
338 static void ykd_deliver_fn (
339  unsigned int nodeid,
340  const void *msg,
341  unsigned int msg_len,
342  int endian_conversion_required)
343 {
344  int all_received = 1;
345  int state_position = 0;
346  int i;
347  struct ykd_header *header = (struct ykd_header *)msg;
348  char *msg_state = (char *)msg + sizeof (struct ykd_header);
349 
350  /*
351  * If this is a localhost address, this node is always primary
352  */
353 #ifdef TODO
354  if (totemip_localhost_check (source_addr)) {
356  "This processor is within the primary component.");
357  primary_designated = 1;
358 
359  ykd_primary_callback_fn (
360  ykd_view_list,
361  ykd_view_list_entries,
362  primary_designated,
363  &ykd_ring_id);
364  return;
365  }
366 #endif
	/*
	 * NOTE(review): only the trailing state is converted; header->id is
	 * used below in the sender's byte order. The length check also does
	 * not ensure msg_len covers a whole struct ykd_state.
	 */
367  if (endian_conversion_required &&
368  (msg_len > sizeof (struct ykd_header))) {
369  ykd_state_endian_convert ((struct ykd_state *)msg_state);
370  }
371 
372  /*
373  * Set completion for source_addr's address
374  */
	/*
	 * NOTE(review): if nodeid is not found, state_position ends equal to
	 * state_received_confchg_entries. The SENDSTATE memcpy below then writes
	 * to that index, which is out of bounds when the count equals
	 * YKD_PROCESSOR_COUNT_MAX.
	 */
375  for (state_position = 0; state_position < state_received_confchg_entries; state_position++) {
376  if (nodeid == state_received_process[state_position].nodeid) {
377  /*
378  * State position contains the address of the state to modify
379  * This may be used later by the other algorithms
380  */
381  state_received_process[state_position].received = 1;
382  break;
383  }
384  }
385 
386  /*
387  * Test if all nodes have submitted their state data
388  */
389  for (i = 0; i < state_received_confchg_entries; i++) {
390  if (state_received_process[i].received == 0) {
391  all_received = 0;
392  }
393  }
394 
395  /*
396  * Ignore messages from a different state
397  */
	/*
	 * NOTE(review): the sender was already marked received above, even if
	 * its message is dropped here. The second half of this condition
	 * (original line 399) is missing from this listing.
	 */
398  if ((ykd_mode == YKD_MODE_SENDSTATE && header->id == YKD_HEADER_ATTEMPT) ||
400  return;
401 
402  switch (ykd_mode) {
403  case YKD_MODE_SENDSTATE:
	/*
	 * NOTE(review): this only requires a non-empty payload, yet
	 * sizeof (struct ykd_state) bytes are copied next. The check also
	 * disappears under NDEBUG.
	 */
404  assert (msg_len > sizeof (struct ykd_header));
405  /*
406  * Copy state information for the sending processor
407  */
408  memcpy (&state_received_process[state_position].ykd_state,
409  msg_state, sizeof (struct ykd_state));
410 
411  /*
412  * Try to form a component
413  */
414  if (all_received) {
	/* Reset the flags so the next round (ATTEMPT) can be counted. */
415  for (i = 0; i < state_received_confchg_entries; i++) {
416  state_received_process[i].received = 0;
417  }
419 
420 // TODO resolve optimizes for failure conditions during ykd calculation
421 // resolve();
422  compute();
423 
424  if (decide ()) {
	/*
	 * NOTE(review): in the visible lines, the new ambiguous entry gets a
	 * member list and count but its session_id is never assigned.
	 * compute() filters ambiguous sessions by session_id.
	 * ambiguous_sessions_entries is also incremented without a check
	 * against YKD_PROCESSOR_COUNT_MAX — confirm.
	 */
425  ykd_state.session_id = session_id_max + 1;
426  memcpy (ykd_state.ambiguous_sessions[ykd_state.ambiguous_sessions_entries].member_list,
427  ykd_view_list, sizeof (unsigned int) * ykd_view_list_entries);
428  ykd_state.ambiguous_sessions[ykd_state.ambiguous_sessions_entries].member_list_entries = ykd_view_list_entries;
429  ykd_state.ambiguous_sessions_entries += 1;
430  ykd_attempt_send();
431  }
432  }
433  break;
434 
435  case YKD_MODE_ATTEMPT:
436  if (all_received) {
438  "This processor is within the primary component.");
439  ykd_primary_designated = 1;
440 
441  ykd_primary_callback_fn (
442  ykd_view_list,
443  ykd_view_list_entries,
444  ykd_primary_designated,
445  &ykd_ring_id);
446 
	/* The attempted view is now the last primary; ambiguity is resolved. */
447  memcpy (ykd_state.last_primary.member_list, ykd_view_list, sizeof (ykd_view_list));
448  ykd_state.last_primary.member_list_entries = ykd_view_list_entries;
449  ykd_state.last_primary.session_id = ykd_state.session_id;
450  ykd_state.ambiguous_sessions_entries = 0;
451  }
452  break;
453  }
454 }
455 
/*
 * Set to 0 after the first regular configuration has been handled.
 * NOTE(review): not declared static.
 */
456 int first_run = 1;
/*
 * Configuration-change callback for the "ykd" process group. Transitional
 * (non-regular) configurations are ignored.
 *
 * For a regular configuration it does the following:
 *  - Records the ring id and copies member_list into ykd_view_list.
 *  - On the first run only, seeds last_primary as a single-member component
 *    containing this node (session id 0).
 *  - Reports the view as non-primary through ykd_primary_callback_fn.
 *  - Resets the per-member received tables.
 *  - Queues this node's state for multicast.
 *
 * NOTE(review): member_list_entries is not checked against
 * YKD_PROCESSOR_COUNT_MAX before the memcpy into ykd_view_list and the
 * loop over state_received_confchg. Original line 482 is missing from this
 * listing; since ykd_deliver_fn() dispatches on ykd_mode, it presumably
 * sets ykd_mode — confirm.
 */
457 static void ykd_confchg_fn (
458  enum totem_configuration_type configuration_type,
459  const unsigned int *member_list, size_t member_list_entries,
460  const unsigned int *left_list, size_t left_list_entries,
461  const unsigned int *joined_list, size_t joined_list_entries,
462  const struct memb_ring_id *ring_id)
463 {
464  int i;
465 
466  if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
467  return;
468  }
469 
470  memcpy (&ykd_ring_id, ring_id, sizeof (struct memb_ring_id));
471 
472  if (first_run) {
473  ykd_state.last_primary.member_list[0] = api->totem_nodeid_get();
474  ykd_state.last_primary.member_list_entries = 1;
475  ykd_state.last_primary.session_id = 0;
476  first_run = 0;
477  }
478  memcpy (ykd_view_list, member_list,
479  member_list_entries * sizeof (unsigned int));
480  ykd_view_list_entries = member_list_entries;
481 
483 
484  ykd_primary_designated = 0;
485 
486  ykd_primary_callback_fn (
487  ykd_view_list,
488  ykd_view_list_entries,
489  ykd_primary_designated,
490  &ykd_ring_id);
491 
492  memset (&state_received_confchg, 0, sizeof (state_received_confchg));
493  for (i = 0; i < member_list_entries; i++) {
494  state_received_confchg[i].nodeid = member_list[i];
495  state_received_confchg[i].received = 0;
496  }
497  memcpy (state_received_process, state_received_confchg,
498  sizeof (state_received_confchg));
499 
500  state_received_confchg_entries = member_list_entries;
501  state_received_process_entries = member_list_entries;
502 
503  ykd_state_send ();
504 }
505 
507  .group = "ykd",
508  .group_len = 3
509 };
510 
511 char *ykd_init (
512  struct corosync_api_v1 *corosync_api,
513  quorum_set_quorate_fn_t set_primary)
514 {
515  const char *error = NULL;
516 
517  ykd_primary_callback_fn = set_primary;
518  api = corosync_api;
519 
520  if (set_primary == 0) {
521  error = (char *)"set primary not set";
522  }
523 
524  api->tpg_init (
525  &ykd_group_handle,
526  ykd_deliver_fn,
527  ykd_confchg_fn);
528 
529  api->tpg_join (
530  ykd_group_handle,
531  &ykd_group,
532  1);
533 
534  ykd_state_init ();
535 
536  return ((char *)error);
537 }
#define TOTEM_AGREED
Definition: coroapi.h:102
int session_id
Definition: vsf_ykd.c:104
hdb_handle_t schedwrk_attempt_send_callback_handle
Definition: vsf_ykd.c:143
struct ykd_session ambiguous_sessions[YKD_PROCESSOR_COUNT_MAX]
Definition: vsf_ykd.c:100
char * ykd_init(struct corosync_api_v1 *corosync_api, quorum_set_quorate_fn_t set_primary)
Definition: vsf_ykd.c:511
struct ykd_session last_primary
Definition: vsf_ykd.c:94
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
struct ykd_state ykd_state
Definition: vsf_ykd.c:110
struct ykd_session last_formed[YKD_PROCESSOR_COUNT_MAX]
Definition: vsf_ykd.c:96
#define YKD_PROCESSOR_COUNT_MAX
Definition: vsf_ykd.c:71
LOGSYS_DECLARE_SUBSYS("YKD")
const void * group
Definition: coroapi.h:82
ykd_header_values
Definition: vsf_ykd.c:73
#define log_printf(level, format, args...)
Definition: logsys.h:323
struct corosync_tpg_group ykd_group
Definition: vsf_ykd.c:506
unsigned int member_list[YKD_PROCESSOR_COUNT_MAX]
Definition: vsf_ykd.c:88
The corosync_tpg_group struct.
Definition: coroapi.h:81
hdb_handle_t schedwrk_state_send_callback_handle
Definition: vsf_ykd.c:145
void(* quorum_set_quorate_fn_t)(const unsigned int *view_list, size_t view_list_entries, int quorate, struct memb_ring_id *)
Definition: exec/quorum.h:42
int first_run
Definition: vsf_ykd.c:456
int(* tpg_join)(void *instance, const struct corosync_tpg_group *groups, size_t group_cnt)
Definition: coroapi.h:330
unsigned int(* totem_nodeid_get)(void)
Definition: coroapi.h:275
int session_id
Definition: vsf_ykd.c:90
int(* tpg_init)(void **instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Definition: coroapi.h:308
The corosync_api_v1 struct.
Definition: coroapi.h:225
int id
Definition: vsf_ykd.c:84
struct totem_message_header header
Definition: totemsrp.c:260
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
int totemip_localhost_check(const struct totem_ip_address *addr)
Definition: totemip.c:225
int(* schedwrk_create)(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
Definition: coroapi.h:372
unsigned int nodeid
Definition: vsf_ykd.c:108
qb_handle_t hdb_handle_t
Definition: hdb.h:52
The memb_ring_id struct.
Definition: coroapi.h:122
int ambiguous_sessions_entries
Definition: vsf_ykd.c:102
ykd_mode
Definition: vsf_ykd.c:78
int member_list_entries
Definition: vsf_ykd.c:89
int last_formed_entries
Definition: vsf_ykd.c:98
#define LOGSYS_LEVEL_NOTICE
Definition: logsys.h:74
unsigned int nodeid
Definition: coroapi.h:75
struct memb_ring_id ring_id
Definition: totemsrp.c:264
int(* tpg_joined_mcast)(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
Definition: coroapi.h:340