LCOV - code coverage report
Current view: top level - lib - jsonrpc.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 436 516 84.5 %
Date: 2016-09-14 01:02:56 Functions: 53 56 94.6 %
Branches: 191 274 69.7 %

           Branch data     Line data    Source code
       1                 :            : /*
       2                 :            :  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
       3                 :            :  *
       4                 :            :  * Licensed under the Apache License, Version 2.0 (the "License");
       5                 :            :  * you may not use this file except in compliance with the License.
       6                 :            :  * You may obtain a copy of the License at:
       7                 :            :  *
       8                 :            :  *     http://www.apache.org/licenses/LICENSE-2.0
       9                 :            :  *
      10                 :            :  * Unless required by applicable law or agreed to in writing, software
      11                 :            :  * distributed under the License is distributed on an "AS IS" BASIS,
      12                 :            :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      13                 :            :  * See the License for the specific language governing permissions and
      14                 :            :  * limitations under the License.
      15                 :            :  */
      16                 :            : 
      17                 :            : #include <config.h>
      18                 :            : 
      19                 :            : #include "jsonrpc.h"
      20                 :            : 
      21                 :            : #include <errno.h>
      22                 :            : 
      23                 :            : #include "byteq.h"
      24                 :            : #include "openvswitch/dynamic-string.h"
      25                 :            : #include "fatal-signal.h"
      26                 :            : #include "openvswitch/json.h"
      27                 :            : #include "openvswitch/list.h"
      28                 :            : #include "openvswitch/ofpbuf.h"
      29                 :            : #include "ovs-thread.h"
      30                 :            : #include "poll-loop.h"
      31                 :            : #include "reconnect.h"
      32                 :            : #include "stream.h"
      33                 :            : #include "timeval.h"
      34                 :            : #include "openvswitch/vlog.h"
      35                 :            : 
      36                 :      53956 : VLOG_DEFINE_THIS_MODULE(jsonrpc);
      37                 :            : 
      38                 :            : struct jsonrpc {
      39                 :            :     struct stream *stream;
      40                 :            :     char *name;
      41                 :            :     int status;
      42                 :            : 
      43                 :            :     /* Input. */
      44                 :            :     struct byteq input;
      45                 :            :     uint8_t input_buffer[512];
      46                 :            :     struct json_parser *parser;
      47                 :            : 
      48                 :            :     /* Output. */
      49                 :            :     struct ovs_list output;     /* Contains "struct ofpbuf"s. */
      50                 :            :     size_t output_count;        /* Number of elements in "output". */
      51                 :            :     size_t backlog;
      52                 :            : };
      53                 :            : 
      54                 :            : /* Rate limit for error messages. */
      55                 :            : static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
      56                 :            : 
      57                 :            : static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
      58                 :            : static void jsonrpc_cleanup(struct jsonrpc *);
      59                 :            : static void jsonrpc_error(struct jsonrpc *, int error);
      60                 :            : 
      61                 :            : /* This is just the same as stream_open() except that it uses the default
      62                 :            :  * JSONRPC port if none is specified. */
      63                 :            : int
      64                 :       8130 : jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
      65                 :            : {
      66                 :       8130 :     return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp);
      67                 :            : }
      68                 :            : 
      69                 :            : /* This is just the same as pstream_open() except that it uses the default
      70                 :            :  * JSONRPC port if none is specified. */
      71                 :            : int
      72                 :       1297 : jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
      73                 :            : {
      74                 :       1297 :     return pstream_open_with_default_port(name, OVSDB_PORT, pstreamp, dscp);
      75                 :            : }
      76                 :            : 
      77                 :            : /* Returns a new JSON-RPC stream that uses 'stream' for input and output.  The
      78                 :            :  * new jsonrpc object takes ownership of 'stream'. */
      79                 :            : struct jsonrpc *
      80                 :      37850 : jsonrpc_open(struct stream *stream)
      81                 :            : {
      82                 :            :     struct jsonrpc *rpc;
      83                 :            : 
      84         [ -  + ]:      37850 :     ovs_assert(stream != NULL);
      85                 :            : 
      86                 :      37850 :     rpc = xzalloc(sizeof *rpc);
      87                 :      37850 :     rpc->name = xstrdup(stream_get_name(stream));
      88                 :      37850 :     rpc->stream = stream;
      89                 :      37850 :     byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
      90                 :      37850 :     ovs_list_init(&rpc->output);
      91                 :            : 
      92                 :      37850 :     return rpc;
      93                 :            : }
      94                 :            : 
      95                 :            : /* Destroys 'rpc', closing the stream on which it is based, and frees its
      96                 :            :  * memory. */
      97                 :            : void
      98                 :      45953 : jsonrpc_close(struct jsonrpc *rpc)
      99                 :            : {
     100         [ +  + ]:      45953 :     if (rpc) {
     101                 :      37819 :         jsonrpc_cleanup(rpc);
     102                 :      37819 :         free(rpc->name);
     103                 :      37819 :         free(rpc);
     104                 :            :     }
     105                 :      45953 : }
     106                 :            : 
     107                 :            : /* Performs periodic maintenance on 'rpc', such as flushing output buffers. */
     108                 :            : void
     109                 :     381513 : jsonrpc_run(struct jsonrpc *rpc)
     110                 :            : {
     111         [ +  + ]:     381513 :     if (rpc->status) {
     112                 :       8155 :         return;
     113                 :            :     }
     114                 :            : 
     115                 :     373358 :     stream_run(rpc->stream);
     116         [ +  + ]:     468012 :     while (!ovs_list_is_empty(&rpc->output)) {
     117                 :      95321 :         struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
     118                 :            :         int retval;
     119                 :            : 
     120                 :      95321 :         retval = stream_send(rpc->stream, buf->data, buf->size);
     121         [ +  + ]:      95321 :         if (retval >= 0) {
     122                 :      94654 :             rpc->backlog -= retval;
     123                 :      94654 :             ofpbuf_pull(buf, retval);
     124         [ +  + ]:      94654 :             if (!buf->size) {
     125                 :      94279 :                 ovs_list_remove(&buf->list_node);
     126                 :      94279 :                 rpc->output_count--;
     127                 :      94654 :                 ofpbuf_delete(buf);
     128                 :            :             }
     129                 :            :         } else {
     130         [ +  + ]:        667 :             if (retval != -EAGAIN) {
     131         [ +  - ]:         47 :                 VLOG_WARN_RL(&rl, "%s: send error: %s",
     132                 :            :                              rpc->name, ovs_strerror(-retval));
     133                 :         47 :                 jsonrpc_error(rpc, -retval);
     134                 :            :             }
     135                 :        667 :             break;
     136                 :            :         }
     137                 :            :     }
     138                 :            : }
     139                 :            : 
     140                 :            : /* Arranges for the poll loop to wake up when 'rpc' needs to perform
     141                 :            :  * maintenance activities. */
     142                 :            : void
     143                 :     253440 : jsonrpc_wait(struct jsonrpc *rpc)
     144                 :            : {
     145         [ +  + ]:     253440 :     if (!rpc->status) {
     146                 :     245285 :         stream_run_wait(rpc->stream);
     147         [ +  + ]:     245285 :         if (!ovs_list_is_empty(&rpc->output)) {
     148                 :        620 :             stream_send_wait(rpc->stream);
     149                 :            :         }
     150                 :            :     }
     151                 :     253440 : }
     152                 :            : 
     153                 :            : /*
     154                 :            :  * Returns the current status of 'rpc'.  The possible return values are:
     155                 :            :  * - 0: no error yet
     156                 :            :  * - >0: errno value
     157                 :            :  * - EOF: end of file (remote end closed connection; not necessarily an error).
     158                 :            :  *
     159                 :            :  * When this functions nonzero, 'rpc' is effectively out of commission.  'rpc'
     160                 :            :  * will not receive any more messages and any further messages that one
     161                 :            :  * attempts to send with 'rpc' will be discarded.  The caller can keep 'rpc'
     162                 :            :  * around as long as it wants, but it's not going to provide any more useful
     163                 :            :  * services.
     164                 :            :  */
     165                 :            : int
     166                 :     475418 : jsonrpc_get_status(const struct jsonrpc *rpc)
     167                 :            : {
     168                 :     475418 :     return rpc->status;
     169                 :            : }
     170                 :            : 
     171                 :            : /* Returns the number of bytes buffered by 'rpc' to be written to the
     172                 :            :  * underlying stream.  Always returns 0 if 'rpc' has encountered an error or if
     173                 :            :  * the remote end closed the connection. */
     174                 :            : size_t
     175                 :     687907 : jsonrpc_get_backlog(const struct jsonrpc *rpc)
     176                 :            : {
     177         [ +  + ]:     687907 :     return rpc->status ? 0 : rpc->backlog;
     178                 :            : }
     179                 :            : 
     180                 :            : /* Returns the number of bytes that have been received on 'rpc''s underlying
     181                 :            :  * stream.  (The value wraps around if it exceeds UINT_MAX.) */
     182                 :            : unsigned int
     183                 :     584100 : jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
     184                 :            : {
     185                 :     584100 :     return rpc->input.head;
     186                 :            : }
     187                 :            : 
     188                 :            : /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
     189                 :            :  * the stream underlying 'rpc' when 'rpc' was created. */
     190                 :            : const char *
     191                 :       8215 : jsonrpc_get_name(const struct jsonrpc *rpc)
     192                 :            : {
     193                 :       8215 :     return rpc->name;
     194                 :            : }
     195                 :            : 
     196                 :            : static void
     197                 :     188728 : jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
     198                 :            :                 const struct jsonrpc_msg *msg)
     199                 :            : {
     200         [ +  + ]:     188728 :     if (VLOG_IS_DBG_ENABLED()) {
     201                 :        490 :         struct ds s = DS_EMPTY_INITIALIZER;
     202         [ +  + ]:        490 :         if (msg->method) {
     203                 :        283 :             ds_put_format(&s, ", method=\"%s\"", msg->method);
     204                 :            :         }
     205         [ +  + ]:        490 :         if (msg->params) {
     206                 :        283 :             ds_put_cstr(&s, ", params=");
     207                 :        283 :             json_to_ds(msg->params, 0, &s);
     208                 :            :         }
     209         [ +  + ]:        490 :         if (msg->result) {
     210                 :        204 :             ds_put_cstr(&s, ", result=");
     211                 :        204 :             json_to_ds(msg->result, 0, &s);
     212                 :            :         }
     213         [ +  + ]:        490 :         if (msg->error) {
     214                 :          3 :             ds_put_cstr(&s, ", error=");
     215                 :          3 :             json_to_ds(msg->error, 0, &s);
     216                 :            :         }
     217         [ +  + ]:        490 :         if (msg->id) {
     218                 :        412 :             ds_put_cstr(&s, ", id=");
     219                 :        412 :             json_to_ds(msg->id, 0, &s);
     220                 :            :         }
     221         [ +  - ]:        490 :         VLOG_DBG("%s: %s %s%s", rpc->name, title,
     222                 :            :                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
     223                 :        490 :         ds_destroy(&s);
     224                 :            :     }
     225                 :     188728 : }
     226                 :            : 
     227                 :            : /* Schedules 'msg' to be sent on 'rpc' and returns 'rpc''s status (as with
     228                 :            :  * jsonrpc_get_status()).
     229                 :            :  *
     230                 :            :  * If 'msg' cannot be sent immediately, it is appended to a buffer.  The caller
     231                 :            :  * is responsible for ensuring that the amount of buffered data is somehow
     232                 :            :  * limited.  (jsonrpc_get_backlog() returns the amount of data currently
     233                 :            :  * buffered in 'rpc'.)
     234                 :            :  *
     235                 :            :  * Always takes ownership of 'msg', regardless of success. */
     236                 :            : int
     237                 :      94326 : jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
     238                 :            : {
     239                 :            :     struct ofpbuf *buf;
     240                 :            :     struct json *json;
     241                 :      94326 :     struct ds ds = DS_EMPTY_INITIALIZER;
     242                 :            :     size_t length;
     243                 :            : 
     244         [ -  + ]:      94326 :     if (rpc->status) {
     245                 :          0 :         jsonrpc_msg_destroy(msg);
     246                 :          0 :         return rpc->status;
     247                 :            :     }
     248                 :            : 
     249                 :      94326 :     jsonrpc_log_msg(rpc, "send", msg);
     250                 :            : 
     251                 :      94326 :     json = jsonrpc_msg_to_json(msg);
     252                 :      94326 :     json_to_ds(json, 0, &ds);
     253                 :      94326 :     length = ds.length;
     254                 :      94326 :     json_destroy(json);
     255                 :            : 
     256                 :      94326 :     buf = xmalloc(sizeof *buf);
     257                 :      94326 :     ofpbuf_use_ds(buf, &ds);
     258                 :      94326 :     ovs_list_push_back(&rpc->output, &buf->list_node);
     259                 :      94326 :     rpc->output_count++;
     260                 :      94326 :     rpc->backlog += length;
     261                 :            : 
     262         [ -  + ]:      94326 :     if (rpc->output_count >= 50) {
     263         [ #  # ]:          0 :         VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
     264                 :            :                      " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
     265                 :            :                      rpc->output_count, rpc->backlog);
     266                 :            :     }
     267                 :            : 
     268         [ +  - ]:      94326 :     if (rpc->backlog == length) {
     269                 :      94326 :         jsonrpc_run(rpc);
     270                 :            :     }
     271                 :      94326 :     return rpc->status;
     272                 :            : }
     273                 :            : 
     274                 :            : /* Attempts to receive a message from 'rpc'.
     275                 :            :  *
     276                 :            :  * If successful, stores the received message in '*msgp' and returns 0.  The
     277                 :            :  * caller takes ownership of '*msgp' and must eventually destroy it with
     278                 :            :  * jsonrpc_msg_destroy().
     279                 :            :  *
     280                 :            :  * Otherwise, stores NULL in '*msgp' and returns one of the following:
     281                 :            :  *
     282                 :            :  *   - EAGAIN: No message has been received.
     283                 :            :  *
     284                 :            :  *   - EOF: The remote end closed the connection gracefully.
     285                 :            :  *
     286                 :            :  *   - Otherwise an errno value that represents a JSON-RPC protocol violation
     287                 :            :  *     or another error fatal to the connection.  'rpc' will not send or
     288                 :            :  *     receive any more messages.
     289                 :            :  */
     290                 :            : int
     291                 :     532275 : jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
     292                 :            : {
     293                 :            :     int i;
     294                 :            : 
     295                 :     532275 :     *msgp = NULL;
     296         [ +  + ]:     532275 :     if (rpc->status) {
     297                 :       1664 :         return rpc->status;
     298                 :            :     }
     299                 :            : 
     300         [ +  + ]:     908752 :     for (i = 0; i < 50; i++) {
     301                 :            :         size_t n, used;
     302                 :            : 
     303                 :            :         /* Fill our input buffer if it's empty. */
     304         [ +  + ]:     905665 :         if (byteq_is_empty(&rpc->input)) {
     305                 :            :             size_t chunk;
     306                 :            :             int retval;
     307                 :            : 
     308                 :     890319 :             chunk = byteq_headroom(&rpc->input);
     309                 :     890319 :             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
     310         [ +  + ]:     890319 :             if (retval < 0) {
     311         [ +  + ]:     416411 :                 if (retval == -EAGAIN) {
     312                 :     416400 :                     return EAGAIN;
     313                 :            :                 } else {
     314         [ +  - ]:         11 :                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
     315                 :            :                                  rpc->name, ovs_strerror(-retval));
     316                 :         11 :                     jsonrpc_error(rpc, -retval);
     317                 :         11 :                     return rpc->status;
     318                 :            :                 }
     319         [ +  + ]:     473908 :             } else if (retval == 0) {
     320                 :      16711 :                 jsonrpc_error(rpc, EOF);
     321                 :      16711 :                 return EOF;
     322                 :            :             }
     323                 :     457197 :             byteq_advance_head(&rpc->input, retval);
     324                 :            :         }
     325                 :            : 
     326                 :            :         /* We have some input.  Feed it into the JSON parser. */
     327         [ +  + ]:     472543 :         if (!rpc->parser) {
     328                 :      94402 :             rpc->parser = json_parser_create(0);
     329                 :            :         }
     330                 :     472543 :         n = byteq_tailroom(&rpc->input);
     331                 :     472543 :         used = json_parser_feed(rpc->parser,
     332                 :     472543 :                                 (char *) byteq_tail(&rpc->input), n);
     333                 :     472543 :         byteq_advance_tail(&rpc->input, used);
     334                 :            : 
     335                 :            :         /* If we have complete JSON, attempt to parse it as JSON-RPC. */
     336         [ +  + ]:     472543 :         if (json_parser_is_done(rpc->parser)) {
     337                 :      94402 :             *msgp = jsonrpc_parse_received_message(rpc);
     338         [ +  - ]:      94402 :             if (*msgp) {
     339                 :      94402 :                 return 0;
     340                 :            :             }
     341                 :            : 
     342         [ #  # ]:          0 :             if (rpc->status) {
     343                 :          0 :                 const struct byteq *q = &rpc->input;
     344         [ #  # ]:          0 :                 if (q->head <= q->size) {
     345                 :          0 :                     stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
     346                 :          0 :                                           &this_module, rpc->name);
     347                 :            :                 }
     348                 :          0 :                 return rpc->status;
     349                 :            :             }
     350                 :            :         }
     351                 :            :     }
     352                 :            : 
     353                 :       3087 :     return EAGAIN;
     354                 :            : }
     355                 :            : 
     356                 :            : /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
     357                 :            :  * than EAGAIN. */
     358                 :            : void
     359                 :     248949 : jsonrpc_recv_wait(struct jsonrpc *rpc)
     360                 :            : {
     361 [ +  + ][ +  + ]:     248949 :     if (rpc->status || !byteq_is_empty(&rpc->input)) {
     362                 :       9683 :         poll_immediate_wake_at(rpc->name);
     363                 :            :     } else {
     364                 :     239266 :         stream_recv_wait(rpc->stream);
     365                 :            :     }
     366                 :     248949 : }
     367                 :            : 
     368                 :            : /* Sends 'msg' on 'rpc' and waits for it to be successfully queued to the
     369                 :            :  * underlying stream.  Returns 0 if 'msg' was sent successfully, otherwise a
     370                 :            :  * status value (see jsonrpc_get_status()).
     371                 :            :  *
     372                 :            :  * Always takes ownership of 'msg', regardless of success. */
     373                 :            : int
     374                 :      12147 : jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
     375                 :            : {
     376                 :            :     int error;
     377                 :            : 
     378                 :      12147 :     fatal_signal_run();
     379                 :            : 
     380                 :      12147 :     error = jsonrpc_send(rpc, msg);
     381         [ -  + ]:      12147 :     if (error) {
     382                 :          0 :         return error;
     383                 :            :     }
     384                 :            : 
     385                 :            :     for (;;) {
     386                 :      12147 :         jsonrpc_run(rpc);
     387 [ -  + ][ #  # ]:      12147 :         if (ovs_list_is_empty(&rpc->output) || rpc->status) {
     388                 :      12147 :             return rpc->status;
     389                 :            :         }
     390                 :          0 :         jsonrpc_wait(rpc);
     391                 :          0 :         poll_block();
     392                 :          0 :     }
     393                 :            : }
     394                 :            : 
     395                 :            : /* Waits for a message to be received on 'rpc'.  Same semantics as
     396                 :            :  * jsonrpc_recv() except that EAGAIN will never be returned. */
     397                 :            : int
     398                 :      12149 : jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
     399                 :            : {
     400                 :            :     for (;;) {
     401                 :      25818 :         int error = jsonrpc_recv(rpc, msgp);
     402         [ +  + ]:      25818 :         if (error != EAGAIN) {
     403                 :      12149 :             fatal_signal_run();
     404                 :      12149 :             return error;
     405                 :            :         }
     406                 :            : 
     407                 :      13669 :         jsonrpc_run(rpc);
     408                 :      13669 :         jsonrpc_wait(rpc);
     409                 :      13669 :         jsonrpc_recv_wait(rpc);
     410                 :      13669 :         poll_block();
     411                 :      13669 :     }
     412                 :            : }
     413                 :            : 
     414                 :            : /* Sends 'request' to 'rpc' then waits for a reply.  The return value is 0 if
     415                 :            :  * successful, in which case '*replyp' is set to the reply, which the caller
     416                 :            :  * must eventually free with jsonrpc_msg_destroy().  Otherwise returns a status
     417                 :            :  * value (see jsonrpc_get_status()).
     418                 :            :  *
     419                 :            :  * Discards any message received on 'rpc' that is not a reply to 'request'
     420                 :            :  * (based on message id).
     421                 :            :  *
     422                 :            :  * Always takes ownership of 'request', regardless of success. */
     423                 :            : int
     424                 :      12146 : jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
     425                 :            :                        struct jsonrpc_msg **replyp)
     426                 :            : {
     427                 :      12146 :     struct jsonrpc_msg *reply = NULL;
     428                 :            :     struct json *id;
     429                 :            :     int error;
     430                 :            : 
     431                 :      12146 :     id = json_clone(request->id);
     432                 :      12146 :     error = jsonrpc_send_block(rpc, request);
     433         [ +  - ]:      12146 :     if (!error) {
     434                 :            :         for (;;) {
     435                 :      12146 :             error = jsonrpc_recv_block(rpc, &reply);
     436         [ -  + ]:      12146 :             if (error) {
     437                 :          0 :                 break;
     438                 :            :             }
     439 [ +  + ][ +  - ]:      12146 :             if ((reply->type == JSONRPC_REPLY || reply->type == JSONRPC_ERROR)
     440         [ +  - ]:      12146 :                 && json_equal(id, reply->id)) {
     441                 :      12146 :                 break;
     442                 :            :             }
     443                 :          0 :             jsonrpc_msg_destroy(reply);
     444                 :          0 :         }
     445                 :            :     }
     446         [ +  - ]:      12146 :     *replyp = error ? NULL : reply;
     447                 :      12146 :     json_destroy(id);
     448                 :      12146 :     return error;
     449                 :            : }
     450                 :            : 
     451                 :            : /* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a
     452                 :            :  * JSON-RPC message.  If successful, returns the JSON-RPC message.  On failure,
     453                 :            :  * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */
     454                 :            : static struct jsonrpc_msg *
     455                 :      94402 : jsonrpc_parse_received_message(struct jsonrpc *rpc)
     456                 :            : {
     457                 :            :     struct jsonrpc_msg *msg;
     458                 :            :     struct json *json;
     459                 :            :     char *error;
     460                 :            : 
     461                 :      94402 :     json = json_parser_finish(rpc->parser);
     462                 :      94402 :     rpc->parser = NULL;
     463         [ -  + ]:      94402 :     if (json->type == JSON_STRING) {
     464         [ #  # ]:          0 :         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
     465                 :            :                      rpc->name, json_string(json));
     466                 :          0 :         jsonrpc_error(rpc, EPROTO);
     467                 :          0 :         json_destroy(json);
     468                 :          0 :         return NULL;
     469                 :            :     }
     470                 :            : 
     471                 :      94402 :     error = jsonrpc_msg_from_json(json, &msg);
     472         [ -  + ]:      94402 :     if (error) {
     473         [ #  # ]:          0 :         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
     474                 :            :                      rpc->name, error);
     475                 :          0 :         free(error);
     476                 :          0 :         jsonrpc_error(rpc, EPROTO);
     477                 :          0 :         return NULL;
     478                 :            :     }
     479                 :            : 
     480                 :      94402 :     jsonrpc_log_msg(rpc, "received", msg);
     481                 :      94402 :     return msg;
     482                 :            : }
     483                 :            : 
     484                 :            : static void
     485                 :      24928 : jsonrpc_error(struct jsonrpc *rpc, int error)
     486                 :            : {
     487         [ -  + ]:      24928 :     ovs_assert(error);
     488         [ +  + ]:      24928 :     if (!rpc->status) {
     489                 :      16773 :         rpc->status = error;
     490                 :      16773 :         jsonrpc_cleanup(rpc);
     491                 :            :     }
     492                 :      24928 : }
     493                 :            : 
     494                 :            : static void
     495                 :      54592 : jsonrpc_cleanup(struct jsonrpc *rpc)
     496                 :            : {
     497                 :      54592 :     stream_close(rpc->stream);
     498                 :      54592 :     rpc->stream = NULL;
     499                 :            : 
     500                 :      54592 :     json_parser_abort(rpc->parser);
     501                 :      54592 :     rpc->parser = NULL;
     502                 :            : 
     503                 :      54592 :     ofpbuf_list_delete(&rpc->output);
     504                 :      54592 :     rpc->backlog = 0;
     505                 :      54592 :     rpc->output_count = 0;
     506                 :      54592 : }
     507                 :            : 
     508                 :            : static struct jsonrpc_msg *
     509                 :      94326 : jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
     510                 :            :                 struct json *params, struct json *result, struct json *error,
     511                 :            :                 struct json *id)
     512                 :            : {
     513                 :      94326 :     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
     514                 :      94326 :     msg->type = type;
     515                 :      94326 :     msg->method = nullable_xstrdup(method);
     516                 :      94326 :     msg->params = params;
     517                 :      94326 :     msg->result = result;
     518                 :      94326 :     msg->error = error;
     519                 :      94326 :     msg->id = id;
     520                 :      94326 :     return msg;
     521                 :            : }
     522                 :            : 
     523                 :            : static struct json *
     524                 :      38654 : jsonrpc_create_id(void)
     525                 :            : {
     526                 :            :     static atomic_count next_id = ATOMIC_COUNT_INIT(0);
     527                 :            :     unsigned int id;
     528                 :            : 
     529                 :      38654 :     id = atomic_count_inc(&next_id);
     530                 :      38654 :     return json_integer_create(id);
     531                 :            : }
     532                 :            : 
     533                 :            : struct jsonrpc_msg *
     534                 :      38654 : jsonrpc_create_request(const char *method, struct json *params,
     535                 :            :                        struct json **idp)
     536                 :            : {
     537                 :      38654 :     struct json *id = jsonrpc_create_id();
     538         [ +  + ]:      38654 :     if (idp) {
     539                 :      26254 :         *idp = json_clone(id);
     540                 :            :     }
     541                 :      38654 :     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
     542                 :            : }
     543                 :            : 
     544                 :            : struct jsonrpc_msg *
     545                 :      17516 : jsonrpc_create_notify(const char *method, struct json *params)
     546                 :            : {
     547                 :      17516 :     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
     548                 :            : }
     549                 :            : 
     550                 :            : struct jsonrpc_msg *
     551                 :      38081 : jsonrpc_create_reply(struct json *result, const struct json *id)
     552                 :            : {
     553                 :      38081 :     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
     554                 :            :                            json_clone(id));
     555                 :            : }
     556                 :            : 
     557                 :            : struct jsonrpc_msg *
     558                 :         75 : jsonrpc_create_error(struct json *error, const struct json *id)
     559                 :            : {
     560                 :         75 :     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
     561                 :            :                            json_clone(id));
     562                 :            : }
     563                 :            : 
     564                 :            : const char *
     565                 :      94896 : jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
     566                 :            : {
     567   [ +  +  +  +  :      94896 :     switch (type) {
                      - ]
     568                 :            :     case JSONRPC_REQUEST:
     569                 :      38364 :         return "request";
     570                 :            : 
     571                 :            :     case JSONRPC_NOTIFY:
     572                 :      17671 :         return "notification";
     573                 :            : 
     574                 :            :     case JSONRPC_REPLY:
     575                 :      38788 :         return "reply";
     576                 :            : 
     577                 :            :     case JSONRPC_ERROR:
     578                 :         73 :         return "error";
     579                 :            :     }
     580                 :          0 :     return "(null)";
     581                 :            : }
     582                 :            : 
     583                 :            : char *
     584                 :      94406 : jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
     585                 :            : {
     586                 :            :     const char *type_name;
     587                 :            :     unsigned int pattern;
     588                 :            : 
     589 [ +  + ][ -  + ]:      94406 :     if (m->params && m->params->type != JSON_ARRAY) {
     590                 :          0 :         return xstrdup("\"params\" must be JSON array");
     591                 :            :     }
     592                 :            : 
     593   [ +  +  +  +  :      94406 :     switch (m->type) {
                      - ]
     594                 :            :     case JSONRPC_REQUEST:
     595                 :      38159 :         pattern = 0x11001;
     596                 :      38159 :         break;
     597                 :            : 
     598                 :            :     case JSONRPC_NOTIFY:
     599                 :      17593 :         pattern = 0x11000;
     600                 :      17593 :         break;
     601                 :            : 
     602                 :            :     case JSONRPC_REPLY:
     603                 :      38581 :         pattern = 0x00101;
     604                 :      38581 :         break;
     605                 :            : 
     606                 :            :     case JSONRPC_ERROR:
     607                 :         73 :         pattern = 0x00011;
     608                 :         73 :         break;
     609                 :            : 
     610                 :            :     default:
     611                 :          0 :         return xasprintf("invalid JSON-RPC message type %d", m->type);
     612                 :            :     }
     613                 :            : 
     614                 :      94406 :     type_name = jsonrpc_msg_type_to_string(m->type);
     615         [ -  + ]:      94406 :     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
     616         [ #  # ]:          0 :         return xasprintf("%s must%s have \"method\"",
     617                 :          0 :                          type_name, (pattern & 0x10000) ? "" : " not");
     618                 :            : 
     619                 :            :     }
     620         [ -  + ]:      94406 :     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
     621         [ #  # ]:          0 :         return xasprintf("%s must%s have \"params\"",
     622                 :          0 :                          type_name, (pattern & 0x1000) ? "" : " not");
     623                 :            : 
     624                 :            :     }
     625         [ -  + ]:      94406 :     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
     626         [ #  # ]:          0 :         return xasprintf("%s must%s have \"result\"",
     627                 :          0 :                          type_name, (pattern & 0x100) ? "" : " not");
     628                 :            : 
     629                 :            :     }
     630         [ -  + ]:      94406 :     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
     631         [ #  # ]:          0 :         return xasprintf("%s must%s have \"error\"",
     632                 :          0 :                          type_name, (pattern & 0x10) ? "" : " not");
     633                 :            : 
     634                 :            :     }
     635         [ -  + ]:      94406 :     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
     636         [ #  # ]:          0 :         return xasprintf("%s must%s have \"id\"",
     637                 :          0 :                          type_name, (pattern & 0x1) ? "" : " not");
     638                 :            : 
     639                 :            :     }
     640                 :      94406 :     return NULL;
     641                 :            : }
     642                 :            : 
     643                 :            : void
     644                 :      94363 : jsonrpc_msg_destroy(struct jsonrpc_msg *m)
     645                 :            : {
     646         [ +  - ]:      94363 :     if (m) {
     647                 :      94363 :         free(m->method);
     648                 :      94363 :         json_destroy(m->params);
     649                 :      94363 :         json_destroy(m->result);
     650                 :      94363 :         json_destroy(m->error);
     651                 :      94363 :         json_destroy(m->id);
     652                 :      94363 :         free(m);
     653                 :            :     }
     654                 :      94363 : }
     655                 :            : 
     656                 :            : static struct json *
     657                 :     377608 : null_from_json_null(struct json *json)
     658                 :            : {
     659 [ +  + ][ +  + ]:     377608 :     if (json && json->type == JSON_NULL) {
     660                 :      56181 :         json_destroy(json);
     661                 :      56181 :         return NULL;
     662                 :            :     }
     663                 :     321427 :     return json;
     664                 :            : }
     665                 :            : 
     666                 :            : char *
     667                 :      94402 : jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
     668                 :            : {
     669                 :      94402 :     struct json *method = NULL;
     670                 :      94402 :     struct jsonrpc_msg *msg = NULL;
     671                 :            :     struct shash *object;
     672                 :            :     char *error;
     673                 :            : 
     674         [ -  + ]:      94402 :     if (json->type != JSON_OBJECT) {
     675                 :          0 :         error = xstrdup("message is not a JSON object");
     676                 :          0 :         goto exit;
     677                 :            :     }
     678                 :      94402 :     object = json_object(json);
     679                 :            : 
     680                 :      94402 :     method = shash_find_and_delete(object, "method");
     681 [ +  + ][ -  + ]:      94402 :     if (method && method->type != JSON_STRING) {
     682                 :          0 :         error = xstrdup("method is not a JSON string");
     683                 :          0 :         goto exit;
     684                 :            :     }
     685                 :            : 
     686                 :      94402 :     msg = xzalloc(sizeof *msg);
     687         [ +  + ]:      94402 :     msg->method = method ? xstrdup(method->u.string) : NULL;
     688                 :      94402 :     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
     689                 :      94402 :     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
     690                 :      94402 :     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
     691                 :      94402 :     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
     692 [ +  + ][ +  + ]:      94402 :     msg->type = (msg->result ? JSONRPC_REPLY
     693                 :      55821 :                  : msg->error ? JSONRPC_ERROR
     694                 :      55748 :                  : msg->id ? JSONRPC_REQUEST
     695                 :      55748 :                  : JSONRPC_NOTIFY);
     696         [ -  + ]:      94402 :     if (!shash_is_empty(object)) {
     697                 :          0 :         error = xasprintf("message has unexpected member \"%s\"",
     698                 :          0 :                           shash_first(object)->name);
     699                 :          0 :         goto exit;
     700                 :            :     }
     701                 :      94402 :     error = jsonrpc_msg_is_valid(msg);
     702         [ -  + ]:      94402 :     if (error) {
     703                 :          0 :         goto exit;
     704                 :            :     }
     705                 :            : 
     706                 :            : exit:
     707                 :      94402 :     json_destroy(method);
     708                 :      94402 :     json_destroy(json);
     709         [ -  + ]:      94402 :     if (error) {
     710                 :          0 :         jsonrpc_msg_destroy(msg);
     711                 :          0 :         msg = NULL;
     712                 :            :     }
     713                 :      94402 :     *msgp = msg;
     714                 :      94402 :     return error;
     715                 :            : }
     716                 :            : 
     717                 :            : struct json *
     718                 :      94365 : jsonrpc_msg_to_json(struct jsonrpc_msg *m)
     719                 :            : {
     720                 :      94365 :     struct json *json = json_object_create();
     721                 :            : 
     722         [ +  + ]:      94365 :     if (m->method) {
     723                 :      56170 :         json_object_put(json, "method", json_string_create_nocopy(m->method));
     724                 :            :     }
     725                 :            : 
     726         [ +  + ]:      94365 :     if (m->params) {
     727                 :      56170 :         json_object_put(json, "params", m->params);
     728                 :            :     }
     729                 :            : 
     730         [ +  + ]:      94365 :     if (m->result) {
     731                 :      38118 :         json_object_put(json, "result", m->result);
     732         [ +  + ]:      56247 :     } else if (m->type == JSONRPC_ERROR) {
     733                 :          2 :         json_object_put(json, "result", json_null_create());
     734                 :            :     }
     735                 :            : 
     736         [ +  + ]:      94365 :     if (m->error) {
     737                 :         77 :         json_object_put(json, "error", m->error);
     738         [ +  + ]:      94288 :     } else if (m->type == JSONRPC_REPLY) {
     739                 :      38118 :         json_object_put(json, "error", json_null_create());
     740                 :            :     }
     741                 :            : 
     742         [ +  + ]:      94365 :     if (m->id) {
     743                 :      76813 :         json_object_put(json, "id", m->id);
     744         [ +  + ]:      17552 :     } else if (m->type == JSONRPC_NOTIFY) {
     745                 :      17516 :         json_object_put(json, "id", json_null_create());
     746                 :            :     }
     747                 :            : 
     748                 :      94365 :     free(m);
     749                 :            : 
     750                 :      94365 :     return json;
     751                 :            : }
     752                 :            : 
     753                 :            : /* A JSON-RPC session with reconnection. */
     754                 :            : 
     755                 :            : struct jsonrpc_session {
     756                 :            :     struct reconnect *reconnect;
     757                 :            :     struct jsonrpc *rpc;
     758                 :            :     struct stream *stream;
     759                 :            :     struct pstream *pstream;
     760                 :            :     int last_error;
     761                 :            :     unsigned int seqno;
     762                 :            :     uint8_t dscp;
     763                 :            : };
     764                 :            : 
     765                 :            : /* Creates and returns a jsonrpc_session to 'name', which should be a string
     766                 :            :  * acceptable to stream_open() or pstream_open().
     767                 :            :  *
     768                 :            :  * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
     769                 :            :  * jsonrpc_session connects to 'name'.  If 'retry' is true, then the new
     770                 :            :  * session connects and reconnects to 'name', with backoff.  If 'retry' is
     771                 :            :  * false, the new session will only try to connect once and after a connection
     772                 :            :  * failure or a disconnection jsonrpc_session_is_alive() will return false for
     773                 :            :  * the new session.
     774                 :            :  *
     775                 :            :  * If 'name' is a passive connection method, e.g. "ptcp:", the new
     776                 :            :  * jsonrpc_session listens for connections to 'name'.  It maintains at most one
     777                 :            :  * connection at any given time.  Any new connection causes the previous one
     778                 :            :  * (if any) to be dropped. */
     779                 :            : struct jsonrpc_session *
     780                 :       7196 : jsonrpc_session_open(const char *name, bool retry)
     781                 :            : {
     782                 :            :     struct jsonrpc_session *s;
     783                 :            : 
     784                 :       7196 :     s = xmalloc(sizeof *s);
     785                 :       7196 :     s->reconnect = reconnect_create(time_msec());
     786                 :       7196 :     reconnect_set_name(s->reconnect, name);
     787                 :       7196 :     reconnect_enable(s->reconnect, time_msec());
     788                 :       7196 :     s->rpc = NULL;
     789                 :       7196 :     s->stream = NULL;
     790                 :       7196 :     s->pstream = NULL;
     791                 :       7196 :     s->seqno = 0;
     792                 :       7196 :     s->dscp = 0;
     793                 :       7196 :     s->last_error = 0;
     794                 :            : 
     795         [ -  + ]:       7196 :     if (!pstream_verify_name(name)) {
     796                 :          0 :         reconnect_set_passive(s->reconnect, true, time_msec());
     797         [ +  + ]:       7196 :     } else if (!retry) {
     798                 :       6338 :         reconnect_set_max_tries(s->reconnect, 1);
     799                 :       6338 :         reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
     800                 :            :     }
     801                 :            : 
     802         [ +  + ]:       7196 :     if (!stream_or_pstream_needs_probes(name)) {
     803                 :       7188 :         reconnect_set_probe_interval(s->reconnect, 0);
     804                 :            :     }
     805                 :            : 
     806                 :       7196 :     return s;
     807                 :            : }
     808                 :            : 
     809                 :            : /* Creates and returns a jsonrpc_session that is initially connected to
     810                 :            :  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected.
     811                 :            :  *
     812                 :            :  * On the assumption that such connections are likely to be short-lived
     813                 :            :  * (e.g. from ovs-vsctl), informational logging for them is suppressed. */
     814                 :            : struct jsonrpc_session *
     815                 :       8186 : jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
     816                 :            : {
     817                 :            :     struct jsonrpc_session *s;
     818                 :            : 
     819                 :       8186 :     s = xmalloc(sizeof *s);
     820                 :       8186 :     s->reconnect = reconnect_create(time_msec());
     821                 :       8186 :     reconnect_set_quiet(s->reconnect, true);
     822                 :       8186 :     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
     823                 :       8186 :     reconnect_set_max_tries(s->reconnect, 0);
     824                 :       8186 :     reconnect_connected(s->reconnect, time_msec());
     825                 :       8186 :     s->dscp = dscp;
     826                 :       8186 :     s->rpc = jsonrpc;
     827                 :       8186 :     s->stream = NULL;
     828                 :       8186 :     s->pstream = NULL;
     829                 :       8186 :     s->seqno = 0;
     830                 :            : 
     831                 :       8186 :     return s;
     832                 :            : }
     833                 :            : 
     834                 :            : void
     835                 :      15347 : jsonrpc_session_close(struct jsonrpc_session *s)
     836                 :            : {
     837         [ +  - ]:      15347 :     if (s) {
     838                 :      15347 :         jsonrpc_close(s->rpc);
     839                 :      15347 :         reconnect_destroy(s->reconnect);
     840                 :      15347 :         stream_close(s->stream);
     841                 :      15347 :         pstream_close(s->pstream);
     842                 :      15347 :         free(s);
     843                 :            :     }
     844                 :      15347 : }
     845                 :            : 
     846                 :            : static void
     847                 :      15359 : jsonrpc_session_disconnect(struct jsonrpc_session *s)
     848                 :            : {
     849         [ +  + ]:      15359 :     if (s->rpc) {
     850                 :       8159 :         jsonrpc_error(s->rpc, EOF);
     851                 :       8159 :         jsonrpc_close(s->rpc);
     852                 :       8159 :         s->rpc = NULL;
     853                 :       8159 :         s->seqno++;
     854         [ -  + ]:       7200 :     } else if (s->stream) {
     855                 :          0 :         stream_close(s->stream);
     856                 :          0 :         s->stream = NULL;
     857                 :          0 :         s->seqno++;
     858                 :            :     }
     859                 :      15359 : }
     860                 :            : 
     861                 :            : static void
     862                 :       7200 : jsonrpc_session_connect(struct jsonrpc_session *s)
     863                 :            : {
     864                 :       7200 :     const char *name = reconnect_get_name(s->reconnect);
     865                 :            :     int error;
     866                 :            : 
     867                 :       7200 :     jsonrpc_session_disconnect(s);
     868         [ +  - ]:       7200 :     if (!reconnect_is_passive(s->reconnect)) {
     869                 :       7200 :         error = jsonrpc_stream_open(name, &s->stream, s->dscp);
     870         [ +  + ]:       7200 :         if (!error) {
     871                 :       7191 :             reconnect_connecting(s->reconnect, time_msec());
     872                 :            :         } else {
     873                 :       7200 :             s->last_error = error;
     874                 :            :         }
     875                 :            :     } else {
     876         [ #  # ]:          0 :         error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
     877                 :          0 :                                                       s->dscp);
     878         [ #  # ]:          0 :         if (!error) {
     879                 :          0 :             reconnect_listening(s->reconnect, time_msec());
     880                 :            :         }
     881                 :            :     }
     882                 :            : 
     883         [ +  + ]:       7200 :     if (error) {
     884                 :          9 :         reconnect_connect_failed(s->reconnect, time_msec(), error);
     885                 :            :     }
     886                 :       7200 :     s->seqno++;
     887                 :       7200 : }
     888                 :            : 
     889                 :            : void
     890                 :     243584 : jsonrpc_session_run(struct jsonrpc_session *s)
     891                 :            : {
     892         [ -  + ]:     243584 :     if (s->pstream) {
     893                 :            :         struct stream *stream;
     894                 :            :         int error;
     895                 :            : 
     896                 :          0 :         error = pstream_accept(s->pstream, &stream);
     897         [ #  # ]:          0 :         if (!error) {
     898 [ #  # ][ #  # ]:          0 :             if (s->rpc || s->stream) {
     899         [ #  # ]:          0 :                 VLOG_INFO_RL(&rl,
     900                 :            :                              "%s: new connection replacing active connection",
     901                 :            :                              reconnect_get_name(s->reconnect));
     902                 :          0 :                 jsonrpc_session_disconnect(s);
     903                 :            :             }
     904                 :          0 :             reconnect_connected(s->reconnect, time_msec());
     905                 :          0 :             s->rpc = jsonrpc_open(stream);
     906         [ #  # ]:          0 :         } else if (error != EAGAIN) {
     907                 :          0 :             reconnect_listen_error(s->reconnect, time_msec(), error);
     908                 :          0 :             pstream_close(s->pstream);
     909                 :          0 :             s->pstream = NULL;
     910                 :            :         }
     911                 :            :     }
     912                 :            : 
     913         [ +  + ]:     243584 :     if (s->rpc) {
     914                 :            :         size_t backlog;
     915                 :            :         int error;
     916                 :            : 
     917                 :     228971 :         backlog = jsonrpc_get_backlog(s->rpc);
     918                 :     228971 :         jsonrpc_run(s->rpc);
     919         [ +  + ]:     228971 :         if (jsonrpc_get_backlog(s->rpc) < backlog) {
     920                 :            :             /* Data previously caught in a queue was successfully sent (or
     921                 :            :              * there's an error, which we'll catch below.)
     922                 :            :              *
     923                 :            :              * We don't count data that is successfully sent immediately as
     924                 :            :              * activity, because there's a lot of queuing downstream from us,
     925                 :            :              * which means that we can push a lot of data into a connection
     926                 :            :              * that has stalled and won't ever recover.
     927                 :            :              */
     928                 :          1 :             reconnect_activity(s->reconnect, time_msec());
     929                 :            :         }
     930                 :            : 
     931                 :     228971 :         error = jsonrpc_get_status(s->rpc);
     932         [ +  + ]:     228971 :         if (error) {
     933                 :       8155 :             reconnect_disconnected(s->reconnect, time_msec(), error);
     934                 :       8155 :             jsonrpc_session_disconnect(s);
     935                 :     228971 :             s->last_error = error;
     936                 :            :         }
     937         [ +  + ]:      14613 :     } else if (s->stream) {
     938                 :            :         int error;
     939                 :            : 
     940                 :       7194 :         stream_run(s->stream);
     941                 :       7194 :         error = stream_connect(s->stream);
     942         [ +  + ]:       7194 :         if (!error) {
     943                 :       7188 :             reconnect_connected(s->reconnect, time_msec());
     944                 :       7188 :             s->rpc = jsonrpc_open(s->stream);
     945                 :       7188 :             s->stream = NULL;
     946         [ +  + ]:          6 :         } else if (error != EAGAIN) {
     947                 :          3 :             reconnect_connect_failed(s->reconnect, time_msec(), error);
     948                 :          3 :             stream_close(s->stream);
     949                 :          3 :             s->stream = NULL;
     950                 :          3 :             s->last_error = error;
     951                 :            :         }
     952                 :            :     }
     953                 :            : 
     954   [ +  +  +  + ]:     243584 :     switch (reconnect_run(s->reconnect, time_msec())) {
     955                 :            :     case RECONNECT_CONNECT:
     956                 :       7200 :         jsonrpc_session_connect(s);
     957                 :       7200 :         break;
     958                 :            : 
     959                 :            :     case RECONNECT_DISCONNECT:
     960                 :          4 :         reconnect_disconnected(s->reconnect, time_msec(), 0);
     961                 :          4 :         jsonrpc_session_disconnect(s);
     962                 :          4 :         break;
     963                 :            : 
     964                 :            :     case RECONNECT_PROBE:
     965         [ +  - ]:        103 :         if (s->rpc) {
     966                 :            :             struct json *params;
     967                 :            :             struct jsonrpc_msg *request;
     968                 :            : 
     969                 :        103 :             params = json_array_create_empty();
     970                 :        103 :             request = jsonrpc_create_request("echo", params, NULL);
     971                 :        103 :             json_destroy(request->id);
     972                 :        103 :             request->id = json_string_create("echo");
     973                 :        103 :             jsonrpc_send(s->rpc, request);
     974                 :            :         }
     975                 :        103 :         break;
     976                 :            :     }
     977                 :     243584 : }
     978                 :            : 
     979                 :            : void
     980                 :     223638 : jsonrpc_session_wait(struct jsonrpc_session *s)
     981                 :            : {
     982         [ +  + ]:     223638 :     if (s->rpc) {
     983                 :     216114 :         jsonrpc_wait(s->rpc);
     984         [ +  + ]:       7524 :     } else if (s->stream) {
     985                 :       7194 :         stream_run_wait(s->stream);
     986                 :       7194 :         stream_connect_wait(s->stream);
     987                 :            :     }
     988         [ -  + ]:     223638 :     if (s->pstream) {
     989                 :          0 :         pstream_wait(s->pstream);
     990                 :            :     }
     991                 :     223638 :     reconnect_wait(s->reconnect, time_msec());
     992                 :     223638 : }
     993                 :            : 
     994                 :            : size_t
     995                 :     182277 : jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
     996                 :            : {
     997         [ +  + ]:     182277 :     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
     998                 :            : }
     999                 :            : 
    1000                 :            : /* Always returns a pointer to a valid C string, assuming 's' was initialized
    1001                 :            :  * correctly. */
    1002                 :            : const char *
    1003                 :          0 : jsonrpc_session_get_name(const struct jsonrpc_session *s)
    1004                 :            : {
    1005                 :          0 :     return reconnect_get_name(s->reconnect);
    1006                 :            : }
    1007                 :            : 
    1008                 :            : /* Always takes ownership of 'msg', regardless of success. */
    1009                 :            : int
    1010                 :      71440 : jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
    1011                 :            : {
    1012         [ +  - ]:      71440 :     if (s->rpc) {
    1013                 :      71440 :         return jsonrpc_send(s->rpc, msg);
    1014                 :            :     } else {
    1015                 :          0 :         jsonrpc_msg_destroy(msg);
    1016                 :          0 :         return ENOTCONN;
    1017                 :            :     }
    1018                 :            : }
    1019                 :            : 
    1020                 :            : struct jsonrpc_msg *
    1021                 :     300110 : jsonrpc_session_recv(struct jsonrpc_session *s)
    1022                 :            : {
    1023         [ +  + ]:     300110 :     if (s->rpc) {
    1024                 :            :         unsigned int received_bytes;
    1025                 :            :         struct jsonrpc_msg *msg;
    1026                 :            : 
    1027                 :     292050 :         received_bytes = jsonrpc_get_received_bytes(s->rpc);
    1028                 :     292050 :         jsonrpc_recv(s->rpc, &msg);
    1029         [ +  + ]:     292050 :         if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) {
    1030                 :            :             /* Data was successfully received.
    1031                 :            :              *
    1032                 :            :              * Previously we only counted receiving a full message as activity,
    1033                 :            :              * but with large messages or a slow connection that policy could
    1034                 :            :              * time out the session mid-message. */
    1035                 :      60824 :             reconnect_activity(s->reconnect, time_msec());
    1036                 :            :         }
    1037                 :            : 
    1038         [ +  + ]:     292050 :         if (msg) {
    1039 [ +  + ][ +  + ]:      71690 :             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
    1040                 :            :                 /* Echo request.  Send reply. */
    1041                 :            :                 struct jsonrpc_msg *reply;
    1042                 :            : 
    1043                 :        103 :                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
    1044                 :        103 :                 jsonrpc_session_send(s, reply);
    1045         [ +  + ]:      71484 :             } else if (msg->type == JSONRPC_REPLY
    1046 [ +  - ][ +  + ]:      26468 :                        && msg->id && msg->id->type == JSON_STRING
    1047         [ +  - ]:        103 :                        && !strcmp(msg->id->u.string, "echo")) {
    1048                 :            :                 /* It's a reply to our echo request.  Suppress it. */
    1049                 :            :             } else {
    1050                 :      71381 :                 return msg;
    1051                 :            :             }
    1052                 :     220669 :             jsonrpc_msg_destroy(msg);
    1053                 :            :         }
    1054                 :            :     }
    1055                 :     228729 :     return NULL;
    1056                 :            : }
    1057                 :            : 
    1058                 :            : void
    1059                 :     219555 : jsonrpc_session_recv_wait(struct jsonrpc_session *s)
    1060                 :            : {
    1061         [ +  + ]:     219555 :     if (s->rpc) {
    1062                 :     212031 :         jsonrpc_recv_wait(s->rpc);
    1063                 :            :     }
    1064                 :     219555 : }
    1065                 :            : 
    1066                 :            : /* Returns true if 's' is currently connected or trying to connect. */
    1067                 :            : bool
    1068                 :     112038 : jsonrpc_session_is_alive(const struct jsonrpc_session *s)
    1069                 :            : {
    1070 [ +  + ][ +  + ]:     112038 :     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
                 [ +  + ]
    1071                 :            : }
    1072                 :            : 
    1073                 :            : /* Returns true if 's' is currently connected. */
    1074                 :            : bool
    1075                 :     226653 : jsonrpc_session_is_connected(const struct jsonrpc_session *s)
    1076                 :            : {
    1077                 :     226653 :     return s->rpc != NULL;
    1078                 :            : }
    1079                 :            : 
    1080                 :            : /* Returns a sequence number for 's'.  The sequence number increments every
    1081                 :            :  * time 's' connects or disconnects.  Thus, a caller can use the change (or
    1082                 :            :  * lack of change) in the sequence number to figure out whether the underlying
    1083                 :            :  * connection is the same as before. */
    1084                 :            : unsigned int
    1085                 :     316597 : jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
    1086                 :            : {
    1087                 :     316597 :     return s->seqno;
    1088                 :            : }
    1089                 :            : 
    1090                 :            : /* Returns the current status of 's'.  If 's' is NULL or is disconnected, this
    1091                 :            :  * is 0, otherwise it is the status of the connection, as reported by
    1092                 :            :  * jsonrpc_get_status(). */
    1093                 :            : int
    1094                 :          0 : jsonrpc_session_get_status(const struct jsonrpc_session *s)
    1095                 :            : {
    1096 [ #  # ][ #  # ]:          0 :     return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
    1097                 :            : }
    1098                 :            : 
    1099                 :            : /* Returns the last error reported on a connection by 's'.  The return value is
    1100                 :            :  * 0 only if no connection made by 's' has ever encountered an error.  See
    1101                 :            :  * jsonrpc_get_status() for return value interpretation. */
    1102                 :            : int
    1103                 :          5 : jsonrpc_session_get_last_error(const struct jsonrpc_session *s)
    1104                 :            : {
    1105                 :          5 :     return s->last_error;
    1106                 :            : }
    1107                 :            : 
    1108                 :            : /* Populates 'stats' with statistics from 's'. */
    1109                 :            : void
    1110                 :          0 : jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
    1111                 :            :                                     struct reconnect_stats *stats)
    1112                 :            : {
    1113                 :          0 :     reconnect_get_stats(s->reconnect, time_msec(), stats);
    1114                 :          0 : }
    1115                 :            : 
    1116                 :            : /* Enables 's' to reconnect to the peer if the connection drops. */
    1117                 :            : void
    1118                 :       2164 : jsonrpc_session_enable_reconnect(struct jsonrpc_session *s)
    1119                 :            : {
    1120                 :       2164 :     reconnect_set_max_tries(s->reconnect, UINT_MAX);
    1121                 :       2164 :     reconnect_set_backoff(s->reconnect, RECONNECT_DEFAULT_MIN_BACKOFF,
    1122                 :            :                           RECONNECT_DEFAULT_MAX_BACKOFF);
    1123                 :       2164 : }
    1124                 :            : 
    1125                 :            : /* Forces 's' to drop its connection (if any) and reconnect. */
    1126                 :            : void
    1127                 :          4 : jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
    1128                 :            : {
    1129                 :          4 :     reconnect_force_reconnect(s->reconnect, time_msec());
    1130                 :          4 : }
    1131                 :            : 
    1132                 :            : /* Sets 'max_backoff' as the maximum time, in milliseconds, to wait after a
    1133                 :            :  * connection attempt fails before attempting to connect again. */
    1134                 :            : void
    1135                 :      86930 : jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
    1136                 :            : {
    1137                 :      86930 :     reconnect_set_backoff(s->reconnect, 0, max_backoff);
    1138                 :      86930 : }
    1139                 :            : 
    1140                 :            : /* Sets the "probe interval" for 's' to 'probe_interval', in milliseconds.  If
    1141                 :            :  * this is zero, it disables the connection keepalive feature.  Otherwise, if
    1142                 :            :  * 's' is idle for 'probe_interval' milliseconds then 's' will send an echo
    1143                 :            :  * request and, if no reply is received within an additional 'probe_interval'
    1144                 :            :  * milliseconds, close the connection (then reconnect, if that feature is
    1145                 :            :  * enabled). */
    1146                 :            : void
    1147                 :      90097 : jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
    1148                 :            :                                    int probe_interval)
    1149                 :            : {
    1150                 :      90097 :     reconnect_set_probe_interval(s->reconnect, probe_interval);
    1151                 :      90097 : }
    1152                 :            : 
    1153                 :            : /* Sets the DSCP value used for 's''s connection to 'dscp'.  If this is
    1154                 :            :  * different from the DSCP value currently in use then the connection is closed
    1155                 :            :  * and reconnected. */
    1156                 :            : void
    1157                 :      86930 : jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
    1158                 :            : {
    1159         [ -  + ]:      86930 :     if (s->dscp != dscp) {
    1160                 :          0 :         pstream_close(s->pstream);
    1161                 :          0 :         s->pstream = NULL;
    1162                 :            : 
    1163                 :          0 :         s->dscp = dscp;
    1164                 :          0 :         jsonrpc_session_force_reconnect(s);
    1165                 :            :     }
    1166                 :      86930 : }

Generated by: LCOV version 1.12