LCOV - code coverage report
Current view: top level - ovsdb - replication.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 254 346 73.4 %
Date: 2016-09-14 01:02:56 Functions: 26 30 86.7 %
Branches: 124 243 51.0 %

           Branch data     Line data    Source code
       1                 :            : /*
       2                 :            :  * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
       3                 :            :  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
       4                 :            :  *
       5                 :            :  * Licensed under the Apache License, Version 2.0 (the "License");
       6                 :            :  * you may not use this file except in compliance with the License.
       7                 :            :  * You may obtain a copy of the License at:
       8                 :            :  *
       9                 :            :  *     http://www.apache.org/licenses/LICENSE-2.0
      10                 :            :  *
      11                 :            :  * Unless required by applicable law or agreed to in writing, software
      12                 :            :  * distributed under the License is distributed on an "AS IS" BASIS,
      13                 :            :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14                 :            :  * See the License for the specific language governing permissions and
      15                 :            :  * limitations under the License.
      16                 :            :  */
      17                 :            : 
      18                 :            : #include <config.h>
      19                 :            : 
      20                 :            : 
      21                 :            : #include "condition.h"
      22                 :            : #include "jsonrpc.h"
      23                 :            : #include "openvswitch/dynamic-string.h"
      24                 :            : #include "openvswitch/hmap.h"
      25                 :            : #include "openvswitch/json.h"
      26                 :            : #include "openvswitch/vlog.h"
      27                 :            : #include "ovsdb-error.h"
      28                 :            : #include "ovsdb.h"
      29                 :            : #include "query.h"
      30                 :            : #include "replication.h"
      31                 :            : #include "row.h"
      32                 :            : #include "sset.h"
      33                 :            : #include "stream.h"
      34                 :            : #include "svec.h"
      35                 :            : #include "table.h"
      36                 :            : #include "transaction.h"
      37                 :            : 
      38                 :       2594 : VLOG_DEFINE_THIS_MODULE(replication);
      39                 :            : 
      40                 :            : static char *sync_from;
      41                 :            : static struct jsonrpc_session *session = NULL;
      42                 :            : static unsigned int session_seqno = UINT_MAX;
      43                 :            : 
      44                 :            : static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
      45                 :            : static void add_monitored_table(struct ovsdb_table_schema *table,
      46                 :            :                                 struct json *monitor_requests);
      47                 :            : 
      48                 :            : static struct ovsdb_error *reset_database(struct ovsdb *db);
      49                 :            : 
      50                 :            : static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
      51                 :            : static struct ovsdb_error *process_table_update(struct json *table_update,
      52                 :            :                                                 const char *table_name,
      53                 :            :                                                 struct ovsdb *database,
      54                 :            :                                                 struct ovsdb_txn *txn);
      55                 :            : 
      56                 :            : static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
      57                 :            :                                           const struct uuid *row_uuid,
      58                 :            :                                           struct ovsdb_table *table,
      59                 :            :                                           struct json *new);
      60                 :            : static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
      61                 :            :                                           const struct uuid *row_uuid,
      62                 :            :                                           struct ovsdb_table *table);
      63                 :            : static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
      64                 :            :                                           const struct uuid *row_uuid,
      65                 :            :                                           struct ovsdb_table *table,
      66                 :            :                                           struct json *new);
      67                 :            : 
      68                 :            : /* Maps from db name to sset of table names. */
      69                 :            : static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables);
      70                 :            : 
      71                 :            : static void blacklist_tables_clear(void);
      72                 :            : static void blacklist_tables_add(const char *database, const char *table);
      73                 :            : static bool blacklist_tables_find(const char *database, const char* table);
      74                 :            : 
      75                 :            : 
      76                 :            : /* Keep track of request IDs of all outstanding OVSDB requests. */
      77                 :            : static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
      78                 :            : 
      79                 :            : struct request_ids_hmap_node {
      80                 :            :     struct hmap_node hmap;
      81                 :            :     struct json *request_id;
      82                 :            :     struct ovsdb *db;          /* associated database */
      83                 :            : };
      84                 :            : void request_ids_add(const struct json *id, struct ovsdb *db);
      85                 :            : bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
      86                 :            : static void request_ids_destroy(void);
      87                 :            : void request_ids_clear(void);
      88                 :            : 
      89                 :            : enum ovsdb_replication_state {
      90                 :            :     RPL_S_INIT,
      91                 :            :     RPL_S_DB_REQUESTED,
      92                 :            :     RPL_S_SCHEMA_REQUESTED,
      93                 :            :     RPL_S_MONITOR_REQUESTED,
      94                 :            :     RPL_S_REPLICATING,
      95                 :            :     RPL_S_ERR /* Error, no longer replicating. */
      96                 :            : };
      97                 :            : static enum ovsdb_replication_state state;
      98                 :            : 
      99                 :            : 
     100                 :            : /* All DBs known to ovsdb-server.  The actual replication dbs are stored
     101                 :            :  * in 'replication dbs', which is a subset of all dbs and remote dbs whose
     102                 :            :  * schema matches.  */
     103                 :            : static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
     104                 :            : static struct shash *replication_dbs = NULL;
     105                 :            : 
     106                 :            : static struct shash *replication_db_clone(struct shash *dbs);
     107                 :            : /* Find 'struct ovsdb' by name within 'replication_dbs' */
     108                 :            : static struct ovsdb* find_db(const char *db_name);
     109                 :            : 
     110                 :            : 
     111                 :            : void
     112                 :         35 : replication_init(const char *sync_from_, const char *exclude_tables)
     113                 :            : {
     114                 :         35 :     free(sync_from);
     115                 :         35 :     sync_from = xstrdup(sync_from_);
     116                 :         35 :     char *err = set_blacklist_tables(exclude_tables, false);
     117                 :            :     /* Caller should have verified that the 'exclude_tables' is
     118                 :            :      * parseable. An error here is unexpected. */
     119         [ -  + ]:         35 :     ovs_assert(!err);
     120                 :            : 
     121                 :         35 :     shash_destroy(replication_dbs);
     122                 :         35 :     replication_dbs = NULL;
     123                 :            : 
     124                 :         35 :     shash_clear(&local_dbs);
     125         [ +  + ]:         35 :     if (session) {
     126                 :          1 :         jsonrpc_session_close(session);
     127                 :            :     }
     128                 :            : 
     129                 :         35 :     session = jsonrpc_session_open(sync_from, true);
     130                 :         35 :     session_seqno = UINT_MAX;
     131                 :         35 :     state = RPL_S_INIT;
     132                 :         35 : }
     133                 :            : 
     134                 :            : void
     135                 :         35 : replication_add_local_db(const char *database, struct ovsdb *db)
     136                 :            : {
     137                 :         35 :     shash_add_assert(&local_dbs, database, db);
     138                 :         35 : }
     139                 :            : 
     140                 :            : void
     141                 :        542 : replication_run(void)
     142                 :            : {
     143         [ -  + ]:        542 :     if (!session) {
     144                 :          0 :         return;
     145                 :            :     }
     146                 :            : 
     147                 :        542 :     jsonrpc_session_run(session);
     148                 :            : 
     149 [ +  + ][ +  + ]:      21492 :     for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
     150                 :            :         struct jsonrpc_msg *msg;
     151                 :            :         unsigned int seqno;
     152                 :            : 
     153                 :      20950 :         seqno = jsonrpc_session_get_seqno(session);
     154 [ +  + ][ -  + ]:      20950 :         if (seqno != session_seqno || state == RPL_S_INIT) {
     155                 :         35 :             session_seqno = seqno;
     156                 :         35 :             request_ids_clear();
     157                 :            :             struct jsonrpc_msg *request;
     158                 :         35 :             request = jsonrpc_create_request("list_dbs",
     159                 :            :                                              json_array_create_empty(), NULL);
     160                 :         35 :             request_ids_add(request->id, NULL);
     161                 :         35 :             jsonrpc_session_send(session, request);
     162                 :            : 
     163                 :         35 :             shash_destroy(replication_dbs);
     164                 :         35 :             replication_dbs = replication_db_clone(&local_dbs);
     165                 :            : 
     166                 :         35 :             state = RPL_S_DB_REQUESTED;
     167         [ -  + ]:         35 :             VLOG_DBG("Send list_dbs request");
     168                 :            :         }
     169                 :            : 
     170                 :      20950 :         msg = jsonrpc_session_recv(session);
     171         [ +  + ]:      20950 :         if (!msg) {
     172                 :      20784 :             continue;
     173                 :            :         }
     174                 :            : 
     175 [ +  + ][ +  - ]:        166 :         if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
     176         [ +  - ]:         61 :             && !strcmp(msg->method, "update")) {
     177         [ +  - ]:        122 :             if (msg->params->type == JSON_ARRAY
     178         [ +  - ]:         61 :                 && msg->params->u.array.n == 2
     179         [ +  - ]:         61 :                 && msg->params->u.array.elems[0]->type == JSON_STRING) {
     180                 :         61 :                 char *db_name = msg->params->u.array.elems[0]->u.string;
     181                 :         61 :                 struct ovsdb *db = find_db(db_name);
     182         [ +  - ]:         61 :                 if (db) {
     183                 :            :                     struct ovsdb_error *error;
     184                 :         61 :                     error = process_notification(msg->params->u.array.elems[1],
     185                 :            :                                                  db);
     186         [ -  + ]:         61 :                     if (error) {
     187                 :          0 :                         ovsdb_error_assert(error);
     188                 :          0 :                         state = RPL_S_ERR;
     189                 :            :                     }
     190                 :            :                 }
     191                 :            :             }
     192         [ +  - ]:        105 :         } else if (msg->type == JSONRPC_REPLY) {
     193                 :            :             struct ovsdb *db;
     194         [ -  + ]:        105 :             if (!request_ids_lookup_and_free(msg->id, &db)) {
     195         [ #  # ]:          0 :                 VLOG_WARN("received unexpected reply");
     196                 :          0 :                 goto next;
     197                 :            :             }
     198                 :            : 
     199   [ +  +  +  -  :        105 :             switch (state) {
                      - ]
     200                 :            :             case RPL_S_DB_REQUESTED:
     201         [ -  + ]:         35 :                 if (msg->result->type != JSON_ARRAY) {
     202                 :            :                     struct ovsdb_error *error;
     203                 :          0 :                     error = ovsdb_error("list-dbs failed",
     204                 :            :                                         "list_dbs response is not array");
     205                 :          0 :                     ovsdb_error_assert(error);
     206                 :          0 :                     state = RPL_S_ERR;
     207                 :            :                 } else {
     208                 :            :                     size_t i;
     209         [ +  + ]:         70 :                     for (i = 0; i < msg->result->u.array.n; i++) {
     210                 :         35 :                         const struct json *name = msg->result->u.array.elems[i];
     211         [ +  - ]:         35 :                         if (name->type == JSON_STRING) {
     212                 :            :                             /* Send one schema request for each remote DB. */
     213                 :         35 :                             const char *db_name = json_string(name);
     214                 :         35 :                             struct ovsdb *db = find_db(db_name);
     215         [ +  - ]:         35 :                             if (db) {
     216                 :         35 :                                 struct jsonrpc_msg *request =
     217                 :         35 :                                     jsonrpc_create_request(
     218                 :            :                                         "get_schema",
     219                 :            :                                         json_array_create_1(
     220                 :            :                                             json_string_create(db_name)),
     221                 :            :                                         NULL);
     222                 :            : 
     223                 :         35 :                                 request_ids_add(request->id, db);
     224                 :         35 :                                 jsonrpc_session_send(session, request);
     225                 :            :                             }
     226                 :            :                         }
     227                 :            :                     }
     228         [ -  + ]:         35 :                     VLOG_DBG("Send schema requests");
     229                 :         35 :                     state = RPL_S_SCHEMA_REQUESTED;
     230                 :            :                 }
     231                 :         35 :                 break;
     232                 :            : 
     233                 :            :             case RPL_S_SCHEMA_REQUESTED: {
     234                 :            :                 struct ovsdb_schema *schema;
     235                 :            :                 struct ovsdb_error *error;
     236                 :            : 
     237                 :         35 :                 error = ovsdb_schema_from_json(msg->result, &schema);
     238         [ -  + ]:         35 :                 if (error) {
     239                 :          0 :                     ovsdb_error_assert(error);
     240                 :          0 :                     state = RPL_S_ERR;
     241                 :            :                 }
     242                 :            : 
     243         [ -  + ]:         35 :                 if (db != find_db(schema->name)) {
     244                 :            :                     /* Unexpected schema. */
     245         [ #  # ]:          0 :                     VLOG_WARN("unexpected schema %s", schema->name);
     246                 :          0 :                     state = RPL_S_ERR;
     247         [ -  + ]:         35 :                 } else if (!ovsdb_schema_equal(schema, db->schema)) {
     248                 :            :                     /* Schmea version mismatch. */
     249         [ #  # ]:          0 :                     VLOG_INFO("Schema version mismatch, %s not replicated",
     250                 :            :                               schema->name);
     251                 :          0 :                     shash_find_and_delete(replication_dbs, schema->name);
     252                 :            :                 }
     253                 :         35 :                 ovsdb_schema_destroy(schema);
     254                 :            : 
     255                 :            :                 /* After receiving schemas, reset the local databases that
     256                 :            :                  * will be monitored and send out monitor requests for them. */
     257         [ +  - ]:         35 :                 if (hmap_is_empty(&request_ids)) {
     258                 :            :                     struct shash_node *node, *next;
     259                 :            : 
     260 [ +  + ][ -  + ]:         70 :                     SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
                 [ +  + ]
     261                 :         35 :                         db = node->data;
     262                 :         35 :                         struct ovsdb_error *error = reset_database(db);
     263         [ -  + ]:         35 :                         if (error) {
     264                 :          0 :                             const char *db_name = db->schema->name;
     265                 :          0 :                             shash_find_and_delete(replication_dbs, db_name);
     266                 :          0 :                             ovsdb_error_assert(error);
     267         [ #  # ]:          0 :                             VLOG_WARN("Failed to reset database, "
     268                 :            :                                       "%s not replicated.", db_name);
     269                 :            :                         }
     270                 :            :                     }
     271                 :            : 
     272         [ -  + ]:         35 :                     if (shash_is_empty(replication_dbs)) {
     273         [ #  # ]:          0 :                         VLOG_WARN("Nothing to replicate.");
     274                 :          0 :                         state = RPL_S_ERR;
     275                 :            :                     } else {
     276 [ +  + ][ -  + ]:         70 :                         SHASH_FOR_EACH (node, replication_dbs) {
     277                 :         35 :                             db = node->data;
     278                 :         35 :                             struct ovsdb *db = node->data;
     279                 :         35 :                             struct jsonrpc_msg *request =
     280                 :            :                                 create_monitor_request(db);
     281                 :            : 
     282                 :         35 :                             request_ids_add(request->id, db);
     283                 :         35 :                             jsonrpc_session_send(session, request);
     284         [ -  + ]:         35 :                             VLOG_DBG("Send monitor requests");
     285                 :         35 :                             state = RPL_S_MONITOR_REQUESTED;
     286                 :            :                         }
     287                 :            :                     }
     288                 :            :                 }
     289                 :         35 :                 break;
     290                 :            :             }
     291                 :            : 
     292                 :            :             case RPL_S_MONITOR_REQUESTED: {
     293                 :            :                 /* Reply to monitor requests. */
     294                 :            :                 struct ovsdb_error *error;
     295                 :         35 :                 error = process_notification(msg->result, db);
     296         [ -  + ]:         35 :                 if (error) {
     297                 :          0 :                     ovsdb_error_assert(error);
     298                 :          0 :                     state = RPL_S_ERR;
     299                 :            :                 } else {
     300                 :            :                     /* Transition to replicating state after receiving
     301                 :            :                      * all replies of "monitor" requests. */
     302         [ +  - ]:         35 :                     if (hmap_is_empty(&request_ids)) {
     303         [ -  + ]:         35 :                         VLOG_DBG("Listening to monitor updates");
     304                 :         35 :                         state = RPL_S_REPLICATING;
     305                 :            :                     }
     306                 :            :                 }
     307                 :         35 :                 break;
     308                 :            :             }
     309                 :            : 
     310                 :            :             case RPL_S_ERR:
     311                 :            :                 /* Ignore all messages */
     312                 :          0 :                 break;
     313                 :            : 
     314                 :            :             case RPL_S_INIT:
     315                 :            :             case RPL_S_REPLICATING:
     316                 :            :             default:
     317                 :        105 :                 OVS_NOT_REACHED();
     318                 :            :             }
     319                 :            :         }
     320                 :            :     next:
     321                 :        166 :         jsonrpc_msg_destroy(msg);
     322                 :            :     }
     323                 :            : }
     324                 :            : 
     325                 :            : void
     326                 :        542 : replication_wait(void)
     327                 :            : {
     328         [ +  - ]:        542 :     if (session) {
     329                 :        542 :         jsonrpc_session_wait(session);
     330                 :        542 :         jsonrpc_session_recv_wait(session);
     331                 :            :     }
     332                 :        542 : }
     333                 :            : 
     334                 :            : /* Parse 'blacklist' to rebuild 'blacklist_tables'.  If 'dryrun' is false, the
     335                 :            :  * current black list tables will be wiped out, regardless of whether
     336                 :            :  * 'blacklist' can be parsed.  If 'dryrun' is true, only parses 'blacklist' and
     337                 :            :  * reports any errors, without modifying the blacklist.
     338                 :            :  *
     339                 :            :  * On error, returns the error string, which the caller is
     340                 :            :  * responsible for freeing. Returns NULL otherwise. */
     341                 :            : char * OVS_WARN_UNUSED_RESULT
     342                 :         44 : set_blacklist_tables(const char *blacklist, bool dryrun)
     343                 :            : {
     344                 :         44 :     struct sset set = SSET_INITIALIZER(&set);
     345                 :         44 :     char *err = NULL;
     346                 :            : 
     347         [ +  + ]:         44 :     if (blacklist) {
     348                 :            :         const char *longname;
     349                 :            : 
     350         [ +  + ]:         16 :         if (!dryrun) {
     351                 :            :             /* Can only add to an empty shash. */
     352                 :         15 :             blacklist_tables_clear();
     353                 :            :         }
     354                 :            : 
     355                 :         16 :         sset_from_delimited_string(&set, blacklist, " ,");
     356 [ +  - ][ +  + ]:         33 :         SSET_FOR_EACH (longname, &set) {
                 [ +  + ]
     357                 :         17 :             char *database = xstrdup(longname), *table = NULL;
     358                 :         17 :             strtok_r(database, ":", &table);
     359 [ +  - ][ +  + ]:         17 :             if (table && !dryrun) {
     360                 :         16 :                 blacklist_tables_add(database, table);
     361                 :            :             }
     362                 :            : 
     363                 :         17 :             free(database);
     364         [ -  + ]:         17 :             if (!table) {
     365                 :          0 :                 err = xasprintf("Can't parse black list table: %s", longname);
     366                 :          0 :                 goto done;
     367                 :            :             }
     368                 :            :         }
     369                 :            :     }
     370                 :            : 
     371                 :            : done:
     372                 :         44 :     sset_destroy(&set);
     373 [ -  + ][ #  # ]:         44 :     if (err && !dryrun) {
     374                 :            :         /* On error, destroy the partially built 'blacklist_tables'. */
     375                 :          0 :         blacklist_tables_clear();
     376                 :            :     }
     377                 :         44 :     return err;
     378                 :            : }
     379                 :            : 
     380                 :            : char * OVS_WARN_UNUSED_RESULT
     381                 :          0 : get_blacklist_tables(void)
     382                 :            : {
     383                 :            :     struct shash_node *node;
     384                 :          0 :     struct sset set = SSET_INITIALIZER(&set);
     385                 :            : 
     386 [ #  # ][ #  # ]:          0 :     SHASH_FOR_EACH (node, &blacklist_tables) {
     387                 :          0 :         const char *database = node->name;
     388                 :            :         const char *table;
     389                 :          0 :         struct sset *tables = node->data;
     390                 :            : 
     391 [ #  # ][ #  # ]:          0 :         SSET_FOR_EACH (table, tables) {
                 [ #  # ]
     392                 :          0 :             sset_add_and_free(&set, xasprintf("%s:%s", database, table));
     393                 :            :         }
     394                 :            :     }
     395                 :            : 
     396                 :            :     /* Output the table list in an sorted order, so that
     397                 :            :      * the output string will not depend on the hash function
     398                 :            :      * that used to implement the hmap data structure. This is
     399                 :            :      * only useful for writting unit tests.  */
     400                 :          0 :     const char **sorted = sset_sort(&set);
     401                 :          0 :     struct ds ds = DS_EMPTY_INITIALIZER;
     402                 :            :     size_t i;
     403         [ #  # ]:          0 :     for (i = 0; i < sset_count(&set); i++) {
     404                 :          0 :         ds_put_format(&ds, "%s,", sorted[i]);
     405                 :            :     }
     406                 :            : 
     407                 :          0 :     ds_chomp(&ds, ',');
     408                 :            : 
     409                 :          0 :     free(sorted);
     410                 :          0 :     sset_destroy(&set);
     411                 :            : 
     412                 :          0 :     return ds_steal_cstr(&ds);
     413                 :            : }
     414                 :            : 
     415                 :            : static void
     416                 :       1273 : blacklist_tables_clear(void)
     417                 :            : {
     418                 :            :     struct shash_node *node;
     419 [ +  + ][ -  + ]:       1287 :     SHASH_FOR_EACH (node, &blacklist_tables) {
     420                 :         14 :         struct sset *tables = node->data;
     421                 :         14 :         sset_destroy(tables);
     422                 :            :     }
     423                 :            : 
     424                 :       1273 :     shash_clear_free_data(&blacklist_tables);
     425                 :       1273 : }
     426                 :            : 
     427                 :            : static void
     428                 :         16 : blacklist_tables_add(const char *database, const char *table)
     429                 :            : {
     430                 :         16 :     struct sset *tables = shash_find_data(&blacklist_tables, database);
     431                 :            : 
     432         [ +  + ]:         16 :     if (!tables) {
     433                 :         15 :         tables = xmalloc(sizeof *tables);
     434                 :         15 :         sset_init(tables);
     435                 :         15 :         shash_add(&blacklist_tables, database, tables);
     436                 :            :     }
     437                 :            : 
     438                 :         16 :     sset_add(tables, table);
     439                 :         16 : }
     440                 :            : 
     441                 :            : static bool
     442                 :        108 : blacklist_tables_find(const char *database, const char *table)
     443                 :            : {
     444                 :        108 :     struct sset *tables = shash_find_data(&blacklist_tables, database);
     445 [ +  + ][ +  + ]:        108 :     return tables && sset_contains(tables, table);
     446                 :            : }
     447                 :            : 
     448                 :            : void
     449                 :          1 : disconnect_active_server(void)
     450                 :            : {
     451                 :          1 :     jsonrpc_session_close(session);
     452                 :          1 :     session = NULL;
     453                 :          1 : }
     454                 :            : 
     455                 :            : void
     456                 :       1258 : replication_destroy(void)
     457                 :            : {
     458                 :       1258 :     blacklist_tables_clear();
     459                 :       1258 :     shash_destroy(&blacklist_tables);
     460                 :            : 
     461         [ +  + ]:       1258 :     if (sync_from) {
     462                 :         34 :         free(sync_from);
     463                 :         34 :         sync_from = NULL;
     464                 :            :     }
     465                 :            : 
     466                 :       1258 :     request_ids_destroy();
     467                 :       1258 :     shash_destroy(replication_dbs);
     468                 :       1258 :     replication_dbs = NULL;
     469                 :            : 
     470                 :       1258 :     shash_destroy(&local_dbs);
     471                 :       1258 : }
     472                 :            : 
     473                 :            : static struct ovsdb *
     474                 :        131 : find_db(const char *db_name)
     475                 :            : {
     476                 :        131 :     return shash_find_data(replication_dbs, db_name);
     477                 :            : }
     478                 :            : 
     479                 :            : static struct ovsdb_error *
     480                 :         35 : reset_database(struct ovsdb *db)
     481                 :            : {
     482                 :         35 :     struct ovsdb_txn *txn = ovsdb_txn_create(db);
     483                 :            :     struct shash_node *table_node;
     484                 :            : 
     485 [ +  + ][ -  + ]:         89 :     SHASH_FOR_EACH (table_node, &db->tables) {
     486                 :            :         /* Delete all rows if the table is not blacklisted. */
     487         [ +  + ]:         54 :         if (!blacklist_tables_find(db->schema->name, table_node->name)) {
     488                 :         47 :             struct ovsdb_table *table = table_node->data;
     489                 :            :             struct ovsdb_row *row;
     490 [ -  + ][ -  + ]:         47 :             HMAP_FOR_EACH (row, hmap_node, &table->rows) {
     491                 :          0 :                 ovsdb_txn_row_delete(txn, row);
     492                 :            :             }
     493                 :            :         }
     494                 :            :     }
     495                 :            : 
     496                 :         35 :     return ovsdb_txn_commit(txn, false);
     497                 :            : }
     498                 :            : 
     499                 :            : /* Create a monitor request for 'db'. The monitor request will include
     500                 :            :  * any tables from 'blacklisted_tables'
     501                 :            :  *
     502                 :            :  * Caller is responsible for disposing 'request'.
     503                 :            :  */
     504                 :            : static struct jsonrpc_msg *
     505                 :         35 : create_monitor_request(struct ovsdb *db)
     506                 :            : {
     507                 :            :     struct jsonrpc_msg *request;
     508                 :            :     struct json *monitor;
     509                 :         35 :     struct ovsdb_schema *schema = db->schema;
     510                 :         35 :     const char *db_name = schema->name;
     511                 :            : 
     512                 :         35 :     struct json *monitor_request = json_object_create();
     513                 :         35 :     size_t n = shash_count(&schema->tables);
     514                 :         35 :     const struct shash_node **nodes = shash_sort(&schema->tables);
     515                 :            : 
     516         [ +  + ]:         89 :     for (int j = 0; j < n; j++) {
     517                 :         54 :         struct ovsdb_table_schema *table = nodes[j]->data;
     518                 :            : 
     519                 :            :         /* Monitor all tables not blacklisted. */
     520         [ +  + ]:         54 :         if (!blacklist_tables_find(db_name, table->name)) {
     521                 :         47 :             add_monitored_table(table, monitor_request);
     522                 :            :         }
     523                 :            :     }
     524                 :         35 :     free(nodes);
     525                 :            : 
     526                 :            :     /* Create a monitor request. */
     527                 :         35 :     monitor = json_array_create_3(
     528                 :            :         json_string_create(db_name),
     529                 :            :         json_string_create(db_name),
     530                 :            :         monitor_request);
     531                 :         35 :     request = jsonrpc_create_request("monitor", monitor, NULL);
     532                 :            : 
     533                 :         35 :     return request;
     534                 :            : }
     535                 :            : 
     536                 :            : static void
     537                 :         47 : add_monitored_table(struct ovsdb_table_schema *table,
     538                 :            :                     struct json *monitor_request)
     539                 :            : {
     540                 :            :     struct json *monitor_request_array;
     541                 :            : 
     542                 :         47 :     monitor_request_array = json_array_create_empty();
     543                 :         47 :     json_array_add(monitor_request_array, json_object_create());
     544                 :            : 
     545                 :         47 :     json_object_put(monitor_request, table->name, monitor_request_array);
     546                 :         47 : }
     547                 :            : 
     548                 :            : 
     549                 :            : static struct ovsdb_error *
     550                 :         96 : process_notification(struct json *table_updates, struct ovsdb *db)
     551                 :            : {
     552                 :         96 :     struct ovsdb_error *error = NULL;
     553                 :            :     struct ovsdb_txn *txn;
     554                 :            : 
     555         [ +  - ]:         96 :     if (table_updates->type == JSON_OBJECT) {
     556                 :         96 :         txn = ovsdb_txn_create(db);
     557                 :            : 
     558                 :            :         /* Process each table update. */
     559                 :            :         struct shash_node *node;
     560 [ +  + ][ -  + ]:        163 :         SHASH_FOR_EACH (node, json_object(table_updates)) {
     561                 :         67 :             struct json *table_update = node->data;
     562         [ +  - ]:         67 :             if (table_update) {
     563                 :         67 :                 error = process_table_update(table_update, node->name, db, txn);
     564         [ -  + ]:         67 :                 if (error) {
     565                 :          0 :                     break;
     566                 :            :                 }
     567                 :            :             }
     568                 :            :         }
     569                 :            : 
     570         [ -  + ]:         96 :         if (error) {
     571                 :          0 :             ovsdb_txn_abort(txn);
     572                 :          0 :             return error;
     573                 :            :         } else {
     574                 :            :             /* Commit transaction. */
     575                 :         96 :             error = ovsdb_txn_commit(txn, false);
     576                 :            :         }
     577                 :            :     }
     578                 :            : 
     579                 :         96 :     return error;
     580                 :            : }
     581                 :            : 
     582                 :            : static struct ovsdb_error *
     583                 :         67 : process_table_update(struct json *table_update, const char *table_name,
     584                 :            :                      struct ovsdb *database, struct ovsdb_txn *txn)
     585                 :            : {
     586                 :         67 :     struct ovsdb_table *table = ovsdb_get_table(database, table_name);
     587         [ -  + ]:         67 :     if (!table) {
     588                 :          0 :         return ovsdb_error("unknown table", "unknown table %s", table_name);
     589                 :            :     }
     590                 :            : 
     591         [ -  + ]:         67 :     if (table_update->type != JSON_OBJECT) {
     592                 :          0 :         return ovsdb_error("Not a JSON object",
     593                 :            :                            "<table-update> for table is not object");
     594                 :            :     }
     595                 :            : 
     596                 :            :     struct shash_node *node;
     597 [ +  + ][ -  + ]:        157 :     SHASH_FOR_EACH (node, json_object(table_update)) {
     598                 :         90 :         struct json *row_update = node->data;
     599                 :            :         struct json *old, *new;
     600                 :            : 
     601         [ -  + ]:         90 :         if (row_update->type != JSON_OBJECT) {
     602                 :          0 :             return ovsdb_error("Not a JSON object",
     603                 :            :                                "<row-update> is not object");
     604                 :            :         }
     605                 :            : 
     606                 :            :         struct uuid uuid;
     607         [ -  + ]:         90 :         if (!uuid_from_string(&uuid, node->name)) {
     608                 :          0 :             return ovsdb_syntax_error(table_update, "bad row UUID",
     609                 :            :                                       "<table-update> names must be UUIDs");
     610                 :            :         }
     611                 :            : 
     612                 :         90 :         old = shash_find_data(json_object(row_update), "old");
     613                 :         90 :         new = shash_find_data(json_object(row_update), "new");
     614                 :            : 
     615                 :            :         struct ovsdb_error *error;
     616                 :         90 :         error = (!new ? execute_delete(txn, &uuid, table)
     617         [ +  + ]:        161 :                  : !old ? execute_insert(txn, &uuid, table, new)
     618         [ +  + ]:         71 :                  : execute_update(txn, &uuid, table, new));
     619         [ -  + ]:         90 :         if (error) {
     620                 :          0 :             return error;
     621                 :            :         }
     622                 :            :     }
     623                 :         67 :     return NULL;
     624                 :            : }
     625                 :            : 
     626                 :            : static struct ovsdb_error *
     627                 :         60 : execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid,
     628                 :            :                struct ovsdb_table *table, struct json *json_row)
     629                 :            : {
     630                 :         60 :     struct ovsdb_row *row = ovsdb_row_create(table);
     631                 :         60 :     struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL);
     632         [ +  - ]:         60 :     if (!error) {
     633                 :         60 :         *ovsdb_row_get_uuid_rw(row) = *row_uuid;
     634                 :         60 :         ovsdb_txn_row_insert(txn, row);
     635                 :            :     } else {
     636                 :            :         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
     637         [ #  # ]:          0 :         VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s",
     638                 :            :                      UUID_ARGS(row_uuid), table->schema->name);
     639                 :          0 :         ovsdb_row_destroy(row);
     640                 :            :     }
     641                 :            : 
     642                 :         60 :     return error;
     643                 :            : }
     644                 :            : 
     645                 :            : static struct ovsdb_error *
     646                 :         19 : execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid,
     647                 :            :                struct ovsdb_table *table)
     648                 :            : {
     649                 :         19 :     const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
     650         [ +  - ]:         19 :     if (row) {
     651                 :         19 :         ovsdb_txn_row_delete(txn, row);
     652                 :            :     } else {
     653                 :            :         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
     654         [ #  # ]:          0 :         VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s",
     655                 :            :                      UUID_ARGS(row_uuid), table->schema->name);
     656                 :            :     }
     657                 :         19 :     return NULL;
     658                 :            : }
     659                 :            : 
     660                 :            : static struct ovsdb_error *
     661                 :         11 : execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid,
     662                 :            :                struct ovsdb_table *table, struct json *json_row)
     663                 :            : {
     664                 :         11 :     const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
     665         [ -  + ]:         11 :     if (!row) {
     666                 :            :         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
     667         [ #  # ]:          0 :         VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s",
     668                 :            :                      UUID_ARGS(row_uuid), table->schema->name);
     669                 :          0 :         return NULL;
     670                 :            :     }
     671                 :            : 
     672                 :         11 :     struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
     673                 :         11 :     struct ovsdb_row *update = ovsdb_row_create(table);
     674                 :         11 :     struct ovsdb_error *error = ovsdb_row_from_json(update, json_row,
     675                 :            :                                                     NULL, &columns);
     676                 :            : 
     677 [ +  - ][ +  - ]:         11 :     if (!error && !ovsdb_row_equal_columns(row, update, &columns)) {
     678                 :         11 :         ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row),
     679                 :            :                                  update, &columns);
     680                 :            :     }
     681                 :            : 
     682                 :         11 :     ovsdb_column_set_destroy(&columns);
     683                 :         11 :     ovsdb_row_destroy(update);
     684                 :         11 :     return error;
     685                 :            : }
     686                 :            : 
     687                 :            : void
     688                 :        105 : request_ids_add(const struct json *id, struct ovsdb *db)
     689                 :            : {
     690                 :        105 :     struct request_ids_hmap_node *node = xmalloc(sizeof *node);
     691                 :            : 
     692                 :        105 :     node->request_id = json_clone(id);
     693                 :        105 :     node->db = db;
     694                 :        105 :     hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
     695                 :        105 : }
     696                 :            : 
     697                 :            : /* Look up 'id' from 'request_ids', if found, remove the found id from
     698                 :            :  * 'request_ids' and free its memory. If not found, 'request_ids' does
     699                 :            :  * not change.  Sets '*db' to the database for the request (NULL if not
     700                 :            :  * found).
     701                 :            :  *
     702                 :            :  * Return true if 'id' is found, false otherwise.
     703                 :            :  */
     704                 :            : bool
     705                 :        105 : request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
     706                 :            : {
     707                 :            :     struct request_ids_hmap_node *node;
     708                 :            : 
     709 [ +  - ][ #  # ]:        105 :     HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
     710         [ +  - ]:        105 :         if (json_equal(id, node->request_id)) {
     711                 :        105 :             hmap_remove(&request_ids, &node->hmap);
     712                 :        105 :             *db = node->db;
     713                 :        105 :             json_destroy(node->request_id);
     714                 :        105 :             free(node);
     715                 :        105 :             return true;
     716                 :            :         }
     717                 :            :     }
     718                 :            : 
     719                 :          0 :     *db = NULL;
     720                 :          0 :     return false;
     721                 :            : }
     722                 :            : 
     723                 :            : static void
     724                 :       1293 : request_ids_destroy(void)
     725                 :            : {
     726                 :            :     struct request_ids_hmap_node *node;
     727                 :            : 
     728 [ +  - ][ -  + ]:       1293 :     HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
                 [ -  + ]
     729                 :          0 :         json_destroy(node->request_id);
     730                 :          0 :         free(node);
     731                 :            :     }
     732                 :       1293 :     hmap_destroy(&request_ids);
     733                 :       1293 : }
     734                 :            : 
     735                 :            : void
     736                 :         35 : request_ids_clear(void)
     737                 :            : {
     738                 :         35 :     request_ids_destroy();
     739                 :         35 :     hmap_init(&request_ids);
     740                 :         35 : }
     741                 :            : 
     742                 :            : static struct shash *
     743                 :         35 : replication_db_clone(struct shash *dbs)
     744                 :            : {
     745                 :         35 :     struct shash *new = xmalloc(sizeof *new);
     746                 :         35 :     shash_init(new);
     747                 :            : 
     748                 :            :     struct shash_node *node;
     749 [ +  + ][ -  + ]:         70 :     SHASH_FOR_EACH (node, dbs) {
     750                 :         35 :         shash_add(new, node->name, node->data);
     751                 :            :     }
     752                 :            : 
     753                 :         35 :     return new;
     754                 :            : }
     755                 :            : 
     756                 :            : /* Return true if replication just started or is ongoing.
     757                 :            :  * Return false if the connection failed, or the replication
     758                 :            :  * was not able to start. */
     759                 :            : bool
     760                 :        542 : replication_is_alive(void)
     761                 :            : {
     762         [ +  - ]:        542 :     if (session) {
     763 [ +  - ][ +  - ]:        542 :         return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
     764                 :            :     }
     765                 :          0 :     return false;
     766                 :            : }
     767                 :            : 
     768                 :            : /* Return the last error reported on a connection by 'session'. The
     769                 :            :  * return value is 0 if replication is not currently running, or
     770                 :            :  * if replication session has not encountered any error.
     771                 :            :  *
     772                 :            :  * Return a negative value if replication session has error, or the
     773                 :            :  * replication was not able to start.  */
     774                 :            : int
     775                 :          0 : replication_get_last_error(void)
     776                 :            : {
     777                 :          0 :     int err = 0;
     778                 :            : 
     779         [ #  # ]:          0 :     if (session) {
     780                 :          0 :         err = jsonrpc_session_get_last_error(session);
     781         [ #  # ]:          0 :         if (!err) {
     782         [ #  # ]:          0 :             err = (state == RPL_S_ERR) ? ENOENT : 0;
     783                 :            :         }
     784                 :            :     }
     785                 :            : 
     786                 :          0 :     return err;
     787                 :            : }
     788                 :            : 
     789                 :            : char *
     790                 :          0 : replication_status(void)
     791                 :            : {
     792 [ #  # ][ #  # ]:          0 :     bool alive = session && jsonrpc_session_is_alive(session);
     793                 :          0 :     struct ds ds = DS_EMPTY_INITIALIZER;
     794                 :            : 
     795         [ #  # ]:          0 :     if (alive) {
     796   [ #  #  #  # ]:          0 :         switch(state) {
     797                 :            :         case RPL_S_INIT:
     798                 :            :         case RPL_S_DB_REQUESTED:
     799                 :            :         case RPL_S_SCHEMA_REQUESTED:
     800                 :            :         case RPL_S_MONITOR_REQUESTED:
     801                 :          0 :             ds_put_format(&ds, "connecting: %s", sync_from);
     802                 :          0 :             break;
     803                 :            :         case RPL_S_REPLICATING: {
     804                 :            :             struct shash_node *node;
     805                 :            : 
     806                 :          0 :             ds_put_format(&ds, "replicating: %s\n", sync_from);
     807                 :          0 :             ds_put_cstr(&ds, "database:");
     808 [ #  # ][ #  # ]:          0 :             SHASH_FOR_EACH (node, replication_dbs) {
     809                 :          0 :                 ds_put_format(&ds, " %s,", node->name);
     810                 :            :             }
     811                 :          0 :             ds_chomp(&ds, ',');
     812                 :            : 
     813         [ #  # ]:          0 :             if (!shash_is_empty(&blacklist_tables)) {
     814                 :          0 :                 ds_put_char(&ds, '\n');
     815                 :          0 :                 ds_put_cstr(&ds, "exclude: ");
     816                 :          0 :                 ds_put_and_free_cstr(&ds, get_blacklist_tables());
     817                 :            :             }
     818                 :          0 :             break;
     819                 :            :         }
     820                 :            :         case RPL_S_ERR:
     821                 :          0 :             ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
     822                 :          0 :             break;
     823                 :            :         default:
     824                 :          0 :             OVS_NOT_REACHED();
     825                 :            :             break;
     826                 :            :         }
     827                 :            :     } else {
     828                 :          0 :         ds_put_format(&ds, "not connected to %s", sync_from);
     829                 :            :     }
     830                 :          0 :     return ds_steal_cstr(&ds);
     831                 :            : }
     832                 :            : 
     833                 :            : void
     834                 :          0 : replication_usage(void)
     835                 :            : {
     836                 :          0 :     printf("\n\
     837                 :            : Syncing options:\n\
     838                 :            :   --sync-from=SERVER      sync DATABASE from active SERVER and start in\n\
     839                 :            :                           backup mode (except with --active)\n\
     840                 :            :   --sync-exclude-tables=DB:TABLE,...\n\
     841                 :            :                           exclude the TABLE in DB from syncing\n\
     842                 :            :   --active                with --sync-from, start in active mode\n");
     843                 :          0 : }

Generated by: LCOV version 1.12