Branch data Line data Source code
1 : : /*
2 : : * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
3 : : * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
4 : : *
5 : : * Licensed under the Apache License, Version 2.0 (the "License");
6 : : * you may not use this file except in compliance with the License.
7 : : * You may obtain a copy of the License at:
8 : : *
9 : : * http://www.apache.org/licenses/LICENSE-2.0
10 : : *
11 : : * Unless required by applicable law or agreed to in writing, software
12 : : * distributed under the License is distributed on an "AS IS" BASIS,
13 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : : * See the License for the specific language governing permissions and
15 : : * limitations under the License.
16 : : */
17 : :
18 : : #include <config.h>
19 : :
20 : :
21 : : #include "condition.h"
22 : : #include "jsonrpc.h"
23 : : #include "openvswitch/dynamic-string.h"
24 : : #include "openvswitch/hmap.h"
25 : : #include "openvswitch/json.h"
26 : : #include "openvswitch/vlog.h"
27 : : #include "ovsdb-error.h"
28 : : #include "ovsdb.h"
29 : : #include "query.h"
30 : : #include "replication.h"
31 : : #include "row.h"
32 : : #include "sset.h"
33 : : #include "stream.h"
34 : : #include "svec.h"
35 : : #include "table.h"
36 : : #include "transaction.h"
37 : :
38 : 2594 : VLOG_DEFINE_THIS_MODULE(replication);
39 : :
40 : : static char *sync_from;
41 : : static struct jsonrpc_session *session = NULL;
42 : : static unsigned int session_seqno = UINT_MAX;
43 : :
44 : : static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
45 : : static void add_monitored_table(struct ovsdb_table_schema *table,
46 : : struct json *monitor_requests);
47 : :
48 : : static struct ovsdb_error *reset_database(struct ovsdb *db);
49 : :
50 : : static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
51 : : static struct ovsdb_error *process_table_update(struct json *table_update,
52 : : const char *table_name,
53 : : struct ovsdb *database,
54 : : struct ovsdb_txn *txn);
55 : :
56 : : static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
57 : : const struct uuid *row_uuid,
58 : : struct ovsdb_table *table,
59 : : struct json *new);
60 : : static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
61 : : const struct uuid *row_uuid,
62 : : struct ovsdb_table *table);
63 : : static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
64 : : const struct uuid *row_uuid,
65 : : struct ovsdb_table *table,
66 : : struct json *new);
67 : :
68 : : /* Maps from db name to sset of table names. */
69 : : static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables);
70 : :
71 : : static void blacklist_tables_clear(void);
72 : : static void blacklist_tables_add(const char *database, const char *table);
73 : : static bool blacklist_tables_find(const char *database, const char* table);
74 : :
75 : :
76 : : /* Keep track of request IDs of all outstanding OVSDB requests. */
77 : : static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
78 : :
79 : : struct request_ids_hmap_node {
80 : : struct hmap_node hmap;
81 : : struct json *request_id;
82 : : struct ovsdb *db; /* associated database */
83 : : };
84 : : void request_ids_add(const struct json *id, struct ovsdb *db);
85 : : bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
86 : : static void request_ids_destroy(void);
87 : : void request_ids_clear(void);
88 : :
89 : : enum ovsdb_replication_state {
90 : : RPL_S_INIT,
91 : : RPL_S_DB_REQUESTED,
92 : : RPL_S_SCHEMA_REQUESTED,
93 : : RPL_S_MONITOR_REQUESTED,
94 : : RPL_S_REPLICATING,
95 : : RPL_S_ERR /* Error, no longer replicating. */
96 : : };
97 : : static enum ovsdb_replication_state state;
98 : :
99 : :
100 : : /* All DBs known to ovsdb-server. The actual replication dbs are stored
101 : : * in 'replication dbs', which is a subset of all dbs and remote dbs whose
102 : : * schema matches. */
103 : : static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
104 : : static struct shash *replication_dbs = NULL;
105 : :
106 : : static struct shash *replication_db_clone(struct shash *dbs);
107 : : /* Find 'struct ovsdb' by name within 'replication_dbs' */
108 : : static struct ovsdb* find_db(const char *db_name);
109 : :
110 : :
111 : : void
112 : 35 : replication_init(const char *sync_from_, const char *exclude_tables)
113 : : {
114 : 35 : free(sync_from);
115 : 35 : sync_from = xstrdup(sync_from_);
116 : 35 : char *err = set_blacklist_tables(exclude_tables, false);
117 : : /* Caller should have verified that the 'exclude_tables' is
118 : : * parseable. An error here is unexpected. */
119 [ - + ]: 35 : ovs_assert(!err);
120 : :
121 : 35 : shash_destroy(replication_dbs);
122 : 35 : replication_dbs = NULL;
123 : :
124 : 35 : shash_clear(&local_dbs);
125 [ + + ]: 35 : if (session) {
126 : 1 : jsonrpc_session_close(session);
127 : : }
128 : :
129 : 35 : session = jsonrpc_session_open(sync_from, true);
130 : 35 : session_seqno = UINT_MAX;
131 : 35 : state = RPL_S_INIT;
132 : 35 : }
133 : :
134 : : void
135 : 35 : replication_add_local_db(const char *database, struct ovsdb *db)
136 : : {
137 : 35 : shash_add_assert(&local_dbs, database, db);
138 : 35 : }
139 : :
140 : : void
141 : 542 : replication_run(void)
142 : : {
143 [ - + ]: 542 : if (!session) {
144 : 0 : return;
145 : : }
146 : :
147 : 542 : jsonrpc_session_run(session);
148 : :
149 [ + + ][ + + ]: 21492 : for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
150 : : struct jsonrpc_msg *msg;
151 : : unsigned int seqno;
152 : :
153 : 20950 : seqno = jsonrpc_session_get_seqno(session);
154 [ + + ][ - + ]: 20950 : if (seqno != session_seqno || state == RPL_S_INIT) {
155 : 35 : session_seqno = seqno;
156 : 35 : request_ids_clear();
157 : : struct jsonrpc_msg *request;
158 : 35 : request = jsonrpc_create_request("list_dbs",
159 : : json_array_create_empty(), NULL);
160 : 35 : request_ids_add(request->id, NULL);
161 : 35 : jsonrpc_session_send(session, request);
162 : :
163 : 35 : shash_destroy(replication_dbs);
164 : 35 : replication_dbs = replication_db_clone(&local_dbs);
165 : :
166 : 35 : state = RPL_S_DB_REQUESTED;
167 [ - + ]: 35 : VLOG_DBG("Send list_dbs request");
168 : : }
169 : :
170 : 20950 : msg = jsonrpc_session_recv(session);
171 [ + + ]: 20950 : if (!msg) {
172 : 20784 : continue;
173 : : }
174 : :
175 [ + + ][ + - ]: 166 : if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
176 [ + - ]: 61 : && !strcmp(msg->method, "update")) {
177 [ + - ]: 122 : if (msg->params->type == JSON_ARRAY
178 [ + - ]: 61 : && msg->params->u.array.n == 2
179 [ + - ]: 61 : && msg->params->u.array.elems[0]->type == JSON_STRING) {
180 : 61 : char *db_name = msg->params->u.array.elems[0]->u.string;
181 : 61 : struct ovsdb *db = find_db(db_name);
182 [ + - ]: 61 : if (db) {
183 : : struct ovsdb_error *error;
184 : 61 : error = process_notification(msg->params->u.array.elems[1],
185 : : db);
186 [ - + ]: 61 : if (error) {
187 : 0 : ovsdb_error_assert(error);
188 : 0 : state = RPL_S_ERR;
189 : : }
190 : : }
191 : : }
192 [ + - ]: 105 : } else if (msg->type == JSONRPC_REPLY) {
193 : : struct ovsdb *db;
194 [ - + ]: 105 : if (!request_ids_lookup_and_free(msg->id, &db)) {
195 [ # # ]: 0 : VLOG_WARN("received unexpected reply");
196 : 0 : goto next;
197 : : }
198 : :
199 [ + + + - : 105 : switch (state) {
- ]
200 : : case RPL_S_DB_REQUESTED:
201 [ - + ]: 35 : if (msg->result->type != JSON_ARRAY) {
202 : : struct ovsdb_error *error;
203 : 0 : error = ovsdb_error("list-dbs failed",
204 : : "list_dbs response is not array");
205 : 0 : ovsdb_error_assert(error);
206 : 0 : state = RPL_S_ERR;
207 : : } else {
208 : : size_t i;
209 [ + + ]: 70 : for (i = 0; i < msg->result->u.array.n; i++) {
210 : 35 : const struct json *name = msg->result->u.array.elems[i];
211 [ + - ]: 35 : if (name->type == JSON_STRING) {
212 : : /* Send one schema request for each remote DB. */
213 : 35 : const char *db_name = json_string(name);
214 : 35 : struct ovsdb *db = find_db(db_name);
215 [ + - ]: 35 : if (db) {
216 : 35 : struct jsonrpc_msg *request =
217 : 35 : jsonrpc_create_request(
218 : : "get_schema",
219 : : json_array_create_1(
220 : : json_string_create(db_name)),
221 : : NULL);
222 : :
223 : 35 : request_ids_add(request->id, db);
224 : 35 : jsonrpc_session_send(session, request);
225 : : }
226 : : }
227 : : }
228 [ - + ]: 35 : VLOG_DBG("Send schema requests");
229 : 35 : state = RPL_S_SCHEMA_REQUESTED;
230 : : }
231 : 35 : break;
232 : :
233 : : case RPL_S_SCHEMA_REQUESTED: {
234 : : struct ovsdb_schema *schema;
235 : : struct ovsdb_error *error;
236 : :
237 : 35 : error = ovsdb_schema_from_json(msg->result, &schema);
238 [ - + ]: 35 : if (error) {
239 : 0 : ovsdb_error_assert(error);
240 : 0 : state = RPL_S_ERR;
241 : : }
242 : :
243 [ - + ]: 35 : if (db != find_db(schema->name)) {
244 : : /* Unexpected schema. */
245 [ # # ]: 0 : VLOG_WARN("unexpected schema %s", schema->name);
246 : 0 : state = RPL_S_ERR;
247 [ - + ]: 35 : } else if (!ovsdb_schema_equal(schema, db->schema)) {
248 : : /* Schmea version mismatch. */
249 [ # # ]: 0 : VLOG_INFO("Schema version mismatch, %s not replicated",
250 : : schema->name);
251 : 0 : shash_find_and_delete(replication_dbs, schema->name);
252 : : }
253 : 35 : ovsdb_schema_destroy(schema);
254 : :
255 : : /* After receiving schemas, reset the local databases that
256 : : * will be monitored and send out monitor requests for them. */
257 [ + - ]: 35 : if (hmap_is_empty(&request_ids)) {
258 : : struct shash_node *node, *next;
259 : :
260 [ + + ][ - + ]: 70 : SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
[ + + ]
261 : 35 : db = node->data;
262 : 35 : struct ovsdb_error *error = reset_database(db);
263 [ - + ]: 35 : if (error) {
264 : 0 : const char *db_name = db->schema->name;
265 : 0 : shash_find_and_delete(replication_dbs, db_name);
266 : 0 : ovsdb_error_assert(error);
267 [ # # ]: 0 : VLOG_WARN("Failed to reset database, "
268 : : "%s not replicated.", db_name);
269 : : }
270 : : }
271 : :
272 [ - + ]: 35 : if (shash_is_empty(replication_dbs)) {
273 [ # # ]: 0 : VLOG_WARN("Nothing to replicate.");
274 : 0 : state = RPL_S_ERR;
275 : : } else {
276 [ + + ][ - + ]: 70 : SHASH_FOR_EACH (node, replication_dbs) {
277 : 35 : db = node->data;
278 : 35 : struct ovsdb *db = node->data;
279 : 35 : struct jsonrpc_msg *request =
280 : : create_monitor_request(db);
281 : :
282 : 35 : request_ids_add(request->id, db);
283 : 35 : jsonrpc_session_send(session, request);
284 [ - + ]: 35 : VLOG_DBG("Send monitor requests");
285 : 35 : state = RPL_S_MONITOR_REQUESTED;
286 : : }
287 : : }
288 : : }
289 : 35 : break;
290 : : }
291 : :
292 : : case RPL_S_MONITOR_REQUESTED: {
293 : : /* Reply to monitor requests. */
294 : : struct ovsdb_error *error;
295 : 35 : error = process_notification(msg->result, db);
296 [ - + ]: 35 : if (error) {
297 : 0 : ovsdb_error_assert(error);
298 : 0 : state = RPL_S_ERR;
299 : : } else {
300 : : /* Transition to replicating state after receiving
301 : : * all replies of "monitor" requests. */
302 [ + - ]: 35 : if (hmap_is_empty(&request_ids)) {
303 [ - + ]: 35 : VLOG_DBG("Listening to monitor updates");
304 : 35 : state = RPL_S_REPLICATING;
305 : : }
306 : : }
307 : 35 : break;
308 : : }
309 : :
310 : : case RPL_S_ERR:
311 : : /* Ignore all messages */
312 : 0 : break;
313 : :
314 : : case RPL_S_INIT:
315 : : case RPL_S_REPLICATING:
316 : : default:
317 : 105 : OVS_NOT_REACHED();
318 : : }
319 : : }
320 : : next:
321 : 166 : jsonrpc_msg_destroy(msg);
322 : : }
323 : : }
324 : :
325 : : void
326 : 542 : replication_wait(void)
327 : : {
328 [ + - ]: 542 : if (session) {
329 : 542 : jsonrpc_session_wait(session);
330 : 542 : jsonrpc_session_recv_wait(session);
331 : : }
332 : 542 : }
333 : :
334 : : /* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the
335 : : * current black list tables will be wiped out, regardless of whether
336 : : * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and
337 : : * reports any errors, without modifying the blacklist.
338 : : *
339 : : * On error, returns the error string, which the caller is
340 : : * responsible for freeing. Returns NULL otherwise. */
341 : : char * OVS_WARN_UNUSED_RESULT
342 : 44 : set_blacklist_tables(const char *blacklist, bool dryrun)
343 : : {
344 : 44 : struct sset set = SSET_INITIALIZER(&set);
345 : 44 : char *err = NULL;
346 : :
347 [ + + ]: 44 : if (blacklist) {
348 : : const char *longname;
349 : :
350 [ + + ]: 16 : if (!dryrun) {
351 : : /* Can only add to an empty shash. */
352 : 15 : blacklist_tables_clear();
353 : : }
354 : :
355 : 16 : sset_from_delimited_string(&set, blacklist, " ,");
356 [ + - ][ + + ]: 33 : SSET_FOR_EACH (longname, &set) {
[ + + ]
357 : 17 : char *database = xstrdup(longname), *table = NULL;
358 : 17 : strtok_r(database, ":", &table);
359 [ + - ][ + + ]: 17 : if (table && !dryrun) {
360 : 16 : blacklist_tables_add(database, table);
361 : : }
362 : :
363 : 17 : free(database);
364 [ - + ]: 17 : if (!table) {
365 : 0 : err = xasprintf("Can't parse black list table: %s", longname);
366 : 0 : goto done;
367 : : }
368 : : }
369 : : }
370 : :
371 : : done:
372 : 44 : sset_destroy(&set);
373 [ - + ][ # # ]: 44 : if (err && !dryrun) {
374 : : /* On error, destroy the partially built 'blacklist_tables'. */
375 : 0 : blacklist_tables_clear();
376 : : }
377 : 44 : return err;
378 : : }
379 : :
380 : : char * OVS_WARN_UNUSED_RESULT
381 : 0 : get_blacklist_tables(void)
382 : : {
383 : : struct shash_node *node;
384 : 0 : struct sset set = SSET_INITIALIZER(&set);
385 : :
386 [ # # ][ # # ]: 0 : SHASH_FOR_EACH (node, &blacklist_tables) {
387 : 0 : const char *database = node->name;
388 : : const char *table;
389 : 0 : struct sset *tables = node->data;
390 : :
391 [ # # ][ # # ]: 0 : SSET_FOR_EACH (table, tables) {
[ # # ]
392 : 0 : sset_add_and_free(&set, xasprintf("%s:%s", database, table));
393 : : }
394 : : }
395 : :
396 : : /* Output the table list in an sorted order, so that
397 : : * the output string will not depend on the hash function
398 : : * that used to implement the hmap data structure. This is
399 : : * only useful for writting unit tests. */
400 : 0 : const char **sorted = sset_sort(&set);
401 : 0 : struct ds ds = DS_EMPTY_INITIALIZER;
402 : : size_t i;
403 [ # # ]: 0 : for (i = 0; i < sset_count(&set); i++) {
404 : 0 : ds_put_format(&ds, "%s,", sorted[i]);
405 : : }
406 : :
407 : 0 : ds_chomp(&ds, ',');
408 : :
409 : 0 : free(sorted);
410 : 0 : sset_destroy(&set);
411 : :
412 : 0 : return ds_steal_cstr(&ds);
413 : : }
414 : :
415 : : static void
416 : 1273 : blacklist_tables_clear(void)
417 : : {
418 : : struct shash_node *node;
419 [ + + ][ - + ]: 1287 : SHASH_FOR_EACH (node, &blacklist_tables) {
420 : 14 : struct sset *tables = node->data;
421 : 14 : sset_destroy(tables);
422 : : }
423 : :
424 : 1273 : shash_clear_free_data(&blacklist_tables);
425 : 1273 : }
426 : :
427 : : static void
428 : 16 : blacklist_tables_add(const char *database, const char *table)
429 : : {
430 : 16 : struct sset *tables = shash_find_data(&blacklist_tables, database);
431 : :
432 [ + + ]: 16 : if (!tables) {
433 : 15 : tables = xmalloc(sizeof *tables);
434 : 15 : sset_init(tables);
435 : 15 : shash_add(&blacklist_tables, database, tables);
436 : : }
437 : :
438 : 16 : sset_add(tables, table);
439 : 16 : }
440 : :
441 : : static bool
442 : 108 : blacklist_tables_find(const char *database, const char *table)
443 : : {
444 : 108 : struct sset *tables = shash_find_data(&blacklist_tables, database);
445 [ + + ][ + + ]: 108 : return tables && sset_contains(tables, table);
446 : : }
447 : :
448 : : void
449 : 1 : disconnect_active_server(void)
450 : : {
451 : 1 : jsonrpc_session_close(session);
452 : 1 : session = NULL;
453 : 1 : }
454 : :
455 : : void
456 : 1258 : replication_destroy(void)
457 : : {
458 : 1258 : blacklist_tables_clear();
459 : 1258 : shash_destroy(&blacklist_tables);
460 : :
461 [ + + ]: 1258 : if (sync_from) {
462 : 34 : free(sync_from);
463 : 34 : sync_from = NULL;
464 : : }
465 : :
466 : 1258 : request_ids_destroy();
467 : 1258 : shash_destroy(replication_dbs);
468 : 1258 : replication_dbs = NULL;
469 : :
470 : 1258 : shash_destroy(&local_dbs);
471 : 1258 : }
472 : :
473 : : static struct ovsdb *
474 : 131 : find_db(const char *db_name)
475 : : {
476 : 131 : return shash_find_data(replication_dbs, db_name);
477 : : }
478 : :
479 : : static struct ovsdb_error *
480 : 35 : reset_database(struct ovsdb *db)
481 : : {
482 : 35 : struct ovsdb_txn *txn = ovsdb_txn_create(db);
483 : : struct shash_node *table_node;
484 : :
485 [ + + ][ - + ]: 89 : SHASH_FOR_EACH (table_node, &db->tables) {
486 : : /* Delete all rows if the table is not blacklisted. */
487 [ + + ]: 54 : if (!blacklist_tables_find(db->schema->name, table_node->name)) {
488 : 47 : struct ovsdb_table *table = table_node->data;
489 : : struct ovsdb_row *row;
490 [ - + ][ - + ]: 47 : HMAP_FOR_EACH (row, hmap_node, &table->rows) {
491 : 0 : ovsdb_txn_row_delete(txn, row);
492 : : }
493 : : }
494 : : }
495 : :
496 : 35 : return ovsdb_txn_commit(txn, false);
497 : : }
498 : :
499 : : /* Create a monitor request for 'db'. The monitor request will include
500 : : * any tables from 'blacklisted_tables'
501 : : *
502 : : * Caller is responsible for disposing 'request'.
503 : : */
504 : : static struct jsonrpc_msg *
505 : 35 : create_monitor_request(struct ovsdb *db)
506 : : {
507 : : struct jsonrpc_msg *request;
508 : : struct json *monitor;
509 : 35 : struct ovsdb_schema *schema = db->schema;
510 : 35 : const char *db_name = schema->name;
511 : :
512 : 35 : struct json *monitor_request = json_object_create();
513 : 35 : size_t n = shash_count(&schema->tables);
514 : 35 : const struct shash_node **nodes = shash_sort(&schema->tables);
515 : :
516 [ + + ]: 89 : for (int j = 0; j < n; j++) {
517 : 54 : struct ovsdb_table_schema *table = nodes[j]->data;
518 : :
519 : : /* Monitor all tables not blacklisted. */
520 [ + + ]: 54 : if (!blacklist_tables_find(db_name, table->name)) {
521 : 47 : add_monitored_table(table, monitor_request);
522 : : }
523 : : }
524 : 35 : free(nodes);
525 : :
526 : : /* Create a monitor request. */
527 : 35 : monitor = json_array_create_3(
528 : : json_string_create(db_name),
529 : : json_string_create(db_name),
530 : : monitor_request);
531 : 35 : request = jsonrpc_create_request("monitor", monitor, NULL);
532 : :
533 : 35 : return request;
534 : : }
535 : :
536 : : static void
537 : 47 : add_monitored_table(struct ovsdb_table_schema *table,
538 : : struct json *monitor_request)
539 : : {
540 : : struct json *monitor_request_array;
541 : :
542 : 47 : monitor_request_array = json_array_create_empty();
543 : 47 : json_array_add(monitor_request_array, json_object_create());
544 : :
545 : 47 : json_object_put(monitor_request, table->name, monitor_request_array);
546 : 47 : }
547 : :
548 : :
549 : : static struct ovsdb_error *
550 : 96 : process_notification(struct json *table_updates, struct ovsdb *db)
551 : : {
552 : 96 : struct ovsdb_error *error = NULL;
553 : : struct ovsdb_txn *txn;
554 : :
555 [ + - ]: 96 : if (table_updates->type == JSON_OBJECT) {
556 : 96 : txn = ovsdb_txn_create(db);
557 : :
558 : : /* Process each table update. */
559 : : struct shash_node *node;
560 [ + + ][ - + ]: 163 : SHASH_FOR_EACH (node, json_object(table_updates)) {
561 : 67 : struct json *table_update = node->data;
562 [ + - ]: 67 : if (table_update) {
563 : 67 : error = process_table_update(table_update, node->name, db, txn);
564 [ - + ]: 67 : if (error) {
565 : 0 : break;
566 : : }
567 : : }
568 : : }
569 : :
570 [ - + ]: 96 : if (error) {
571 : 0 : ovsdb_txn_abort(txn);
572 : 0 : return error;
573 : : } else {
574 : : /* Commit transaction. */
575 : 96 : error = ovsdb_txn_commit(txn, false);
576 : : }
577 : : }
578 : :
579 : 96 : return error;
580 : : }
581 : :
582 : : static struct ovsdb_error *
583 : 67 : process_table_update(struct json *table_update, const char *table_name,
584 : : struct ovsdb *database, struct ovsdb_txn *txn)
585 : : {
586 : 67 : struct ovsdb_table *table = ovsdb_get_table(database, table_name);
587 [ - + ]: 67 : if (!table) {
588 : 0 : return ovsdb_error("unknown table", "unknown table %s", table_name);
589 : : }
590 : :
591 [ - + ]: 67 : if (table_update->type != JSON_OBJECT) {
592 : 0 : return ovsdb_error("Not a JSON object",
593 : : "<table-update> for table is not object");
594 : : }
595 : :
596 : : struct shash_node *node;
597 [ + + ][ - + ]: 157 : SHASH_FOR_EACH (node, json_object(table_update)) {
598 : 90 : struct json *row_update = node->data;
599 : : struct json *old, *new;
600 : :
601 [ - + ]: 90 : if (row_update->type != JSON_OBJECT) {
602 : 0 : return ovsdb_error("Not a JSON object",
603 : : "<row-update> is not object");
604 : : }
605 : :
606 : : struct uuid uuid;
607 [ - + ]: 90 : if (!uuid_from_string(&uuid, node->name)) {
608 : 0 : return ovsdb_syntax_error(table_update, "bad row UUID",
609 : : "<table-update> names must be UUIDs");
610 : : }
611 : :
612 : 90 : old = shash_find_data(json_object(row_update), "old");
613 : 90 : new = shash_find_data(json_object(row_update), "new");
614 : :
615 : : struct ovsdb_error *error;
616 : 90 : error = (!new ? execute_delete(txn, &uuid, table)
617 [ + + ]: 161 : : !old ? execute_insert(txn, &uuid, table, new)
618 [ + + ]: 71 : : execute_update(txn, &uuid, table, new));
619 [ - + ]: 90 : if (error) {
620 : 0 : return error;
621 : : }
622 : : }
623 : 67 : return NULL;
624 : : }
625 : :
626 : : static struct ovsdb_error *
627 : 60 : execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid,
628 : : struct ovsdb_table *table, struct json *json_row)
629 : : {
630 : 60 : struct ovsdb_row *row = ovsdb_row_create(table);
631 : 60 : struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL);
632 [ + - ]: 60 : if (!error) {
633 : 60 : *ovsdb_row_get_uuid_rw(row) = *row_uuid;
634 : 60 : ovsdb_txn_row_insert(txn, row);
635 : : } else {
636 : : static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
637 [ # # ]: 0 : VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s",
638 : : UUID_ARGS(row_uuid), table->schema->name);
639 : 0 : ovsdb_row_destroy(row);
640 : : }
641 : :
642 : 60 : return error;
643 : : }
644 : :
645 : : static struct ovsdb_error *
646 : 19 : execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid,
647 : : struct ovsdb_table *table)
648 : : {
649 : 19 : const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
650 [ + - ]: 19 : if (row) {
651 : 19 : ovsdb_txn_row_delete(txn, row);
652 : : } else {
653 : : static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
654 [ # # ]: 0 : VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s",
655 : : UUID_ARGS(row_uuid), table->schema->name);
656 : : }
657 : 19 : return NULL;
658 : : }
659 : :
660 : : static struct ovsdb_error *
661 : 11 : execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid,
662 : : struct ovsdb_table *table, struct json *json_row)
663 : : {
664 : 11 : const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
665 [ - + ]: 11 : if (!row) {
666 : : static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
667 [ # # ]: 0 : VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s",
668 : : UUID_ARGS(row_uuid), table->schema->name);
669 : 0 : return NULL;
670 : : }
671 : :
672 : 11 : struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
673 : 11 : struct ovsdb_row *update = ovsdb_row_create(table);
674 : 11 : struct ovsdb_error *error = ovsdb_row_from_json(update, json_row,
675 : : NULL, &columns);
676 : :
677 [ + - ][ + - ]: 11 : if (!error && !ovsdb_row_equal_columns(row, update, &columns)) {
678 : 11 : ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row),
679 : : update, &columns);
680 : : }
681 : :
682 : 11 : ovsdb_column_set_destroy(&columns);
683 : 11 : ovsdb_row_destroy(update);
684 : 11 : return error;
685 : : }
686 : :
687 : : void
688 : 105 : request_ids_add(const struct json *id, struct ovsdb *db)
689 : : {
690 : 105 : struct request_ids_hmap_node *node = xmalloc(sizeof *node);
691 : :
692 : 105 : node->request_id = json_clone(id);
693 : 105 : node->db = db;
694 : 105 : hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
695 : 105 : }
696 : :
697 : : /* Look up 'id' from 'request_ids', if found, remove the found id from
698 : : * 'request_ids' and free its memory. If not found, 'request_ids' does
699 : : * not change. Sets '*db' to the database for the request (NULL if not
700 : : * found).
701 : : *
702 : : * Return true if 'id' is found, false otherwise.
703 : : */
704 : : bool
705 : 105 : request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
706 : : {
707 : : struct request_ids_hmap_node *node;
708 : :
709 [ + - ][ # # ]: 105 : HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
710 [ + - ]: 105 : if (json_equal(id, node->request_id)) {
711 : 105 : hmap_remove(&request_ids, &node->hmap);
712 : 105 : *db = node->db;
713 : 105 : json_destroy(node->request_id);
714 : 105 : free(node);
715 : 105 : return true;
716 : : }
717 : : }
718 : :
719 : 0 : *db = NULL;
720 : 0 : return false;
721 : : }
722 : :
723 : : static void
724 : 1293 : request_ids_destroy(void)
725 : : {
726 : : struct request_ids_hmap_node *node;
727 : :
728 [ + - ][ - + ]: 1293 : HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
[ - + ]
729 : 0 : json_destroy(node->request_id);
730 : 0 : free(node);
731 : : }
732 : 1293 : hmap_destroy(&request_ids);
733 : 1293 : }
734 : :
735 : : void
736 : 35 : request_ids_clear(void)
737 : : {
738 : 35 : request_ids_destroy();
739 : 35 : hmap_init(&request_ids);
740 : 35 : }
741 : :
742 : : static struct shash *
743 : 35 : replication_db_clone(struct shash *dbs)
744 : : {
745 : 35 : struct shash *new = xmalloc(sizeof *new);
746 : 35 : shash_init(new);
747 : :
748 : : struct shash_node *node;
749 [ + + ][ - + ]: 70 : SHASH_FOR_EACH (node, dbs) {
750 : 35 : shash_add(new, node->name, node->data);
751 : : }
752 : :
753 : 35 : return new;
754 : : }
755 : :
756 : : /* Return true if replication just started or is ongoing.
757 : : * Return false if the connection failed, or the replication
758 : : * was not able to start. */
759 : : bool
760 : 542 : replication_is_alive(void)
761 : : {
762 [ + - ]: 542 : if (session) {
763 [ + - ][ + - ]: 542 : return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
764 : : }
765 : 0 : return false;
766 : : }
767 : :
768 : : /* Return the last error reported on a connection by 'session'. The
769 : : * return value is 0 if replication is not currently running, or
770 : : * if replication session has not encountered any error.
771 : : *
772 : : * Return a negative value if replication session has error, or the
773 : : * replication was not able to start. */
774 : : int
775 : 0 : replication_get_last_error(void)
776 : : {
777 : 0 : int err = 0;
778 : :
779 [ # # ]: 0 : if (session) {
780 : 0 : err = jsonrpc_session_get_last_error(session);
781 [ # # ]: 0 : if (!err) {
782 [ # # ]: 0 : err = (state == RPL_S_ERR) ? ENOENT : 0;
783 : : }
784 : : }
785 : :
786 : 0 : return err;
787 : : }
788 : :
789 : : char *
790 : 0 : replication_status(void)
791 : : {
792 [ # # ][ # # ]: 0 : bool alive = session && jsonrpc_session_is_alive(session);
793 : 0 : struct ds ds = DS_EMPTY_INITIALIZER;
794 : :
795 [ # # ]: 0 : if (alive) {
796 [ # # # # ]: 0 : switch(state) {
797 : : case RPL_S_INIT:
798 : : case RPL_S_DB_REQUESTED:
799 : : case RPL_S_SCHEMA_REQUESTED:
800 : : case RPL_S_MONITOR_REQUESTED:
801 : 0 : ds_put_format(&ds, "connecting: %s", sync_from);
802 : 0 : break;
803 : : case RPL_S_REPLICATING: {
804 : : struct shash_node *node;
805 : :
806 : 0 : ds_put_format(&ds, "replicating: %s\n", sync_from);
807 : 0 : ds_put_cstr(&ds, "database:");
808 [ # # ][ # # ]: 0 : SHASH_FOR_EACH (node, replication_dbs) {
809 : 0 : ds_put_format(&ds, " %s,", node->name);
810 : : }
811 : 0 : ds_chomp(&ds, ',');
812 : :
813 [ # # ]: 0 : if (!shash_is_empty(&blacklist_tables)) {
814 : 0 : ds_put_char(&ds, '\n');
815 : 0 : ds_put_cstr(&ds, "exclude: ");
816 : 0 : ds_put_and_free_cstr(&ds, get_blacklist_tables());
817 : : }
818 : 0 : break;
819 : : }
820 : : case RPL_S_ERR:
821 : 0 : ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
822 : 0 : break;
823 : : default:
824 : 0 : OVS_NOT_REACHED();
825 : : break;
826 : : }
827 : : } else {
828 : 0 : ds_put_format(&ds, "not connected to %s", sync_from);
829 : : }
830 : 0 : return ds_steal_cstr(&ds);
831 : : }
832 : :
833 : : void
834 : 0 : replication_usage(void)
835 : : {
836 : 0 : printf("\n\
837 : : Syncing options:\n\
838 : : --sync-from=SERVER sync DATABASE from active SERVER and start in\n\
839 : : backup mode (except with --active)\n\
840 : : --sync-exclude-tables=DB:TABLE,...\n\
841 : : exclude the TABLE in DB from syncing\n\
842 : : --active with --sync-from, start in active mode\n");
843 : 0 : }
|