Infra fix for drop stats missing
[opflex.git] / agent-ovs / ovs / SwitchConnection.cpp
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 #include "SwitchConnection.h"
10 #include <opflexagent/logging.h>
11
12 #include <sys/eventfd.h>
13 #include <string>
14 #include <fstream>
15
16 #include <boost/scope_exit.hpp>
17
18 #include <unordered_map>
19
20 #include "ovs-ofputil.h"
21 #include <lib/util.h>
22 #include <openvswitch/poll-loop.h>
23 #define HAVE_STRUCT_MMSGHDR_MSG_LEN
24 #define HAVE_SENDMMSG
25
26 extern "C" {
27 #include <lib/dirs.h>
28 #include <lib/socket-util.h>
29 #include <lib/stream.h>
30 #include <openvswitch/vconn.h>
31 #include <openvswitch/ofp-msgs.h>
32 #include <openvswitch/ofp-packet.h>
33 #include <openvswitch/ofp-monitor.h>
34 }
35
36 typedef std::lock_guard<std::mutex> mutex_guard;
37
38 const int LOST_CONN_BACKOFF_MSEC = 5000;
39 const std::chrono::seconds ECHO_INTERVAL(5);
40 const std::chrono::seconds MAX_ECHO_INTERVAL(30);
41
42 namespace opflexagent {
43
44 int SwitchConnection::DecodeFlowRemoved(ofpbuf *msg,
45         struct ofputil_flow_removed* fentry) {
46     const struct ofp_header *oh = (ofp_header *)msg->data;
47     int ret;
48     bzero(fentry, sizeof(struct ofputil_flow_removed));
49
50     ret = ofputil_decode_flow_removed(fentry, oh);
51     if (ret != 0) {
52         LOG(ERROR) << "Failed to decode flow removed message: "
53                    << ovs_strerror(ret);
54         return ret;
55     } else {
56         /* ovs 2.11.2 specific changes. Check handleFlowStats for
57          * more comments on below fix */
58         if (fentry) {
59             fentry->match.flow.packet_type = 0;
60             fentry->match.wc.masks.packet_type = 0;
61         }
62     }
63     return ret;
64 }
65
66 SwitchConnection::SwitchConnection(const std::string& swName) :
67     switchName(swName), ofConn(NULL) {
68     connThread = NULL;
69     ofProtoVersion = OFP10_VERSION;
70     isDisconnecting = false;
71
72     pollEventFd = eventfd(0, 0);
73
74     RegisterMessageHandler(OFPTYPE_ECHO_REQUEST, &echoReqHandler);
75     RegisterMessageHandler(OFPTYPE_ECHO_REPLY, &echoRepHandler);
76     RegisterMessageHandler(OFPTYPE_ERROR, &errorHandler);
77 }
78
79 SwitchConnection::~SwitchConnection() {
80     Disconnect();
81 }
82
83 void
84 SwitchConnection::RegisterOnConnectListener(OnConnectListener *l) {
85     if (l) {
86         mutex_guard lock(connMtx);
87         onConnectListeners.push_back(l);
88     }
89 }
90
91 void
92 SwitchConnection::UnregisterOnConnectListener(OnConnectListener *l) {
93     mutex_guard lock(connMtx);
94     onConnectListeners.remove(l);
95 }
96
97 void
98 SwitchConnection::RegisterMessageHandler(int msgType, MessageHandler *handler)
99 {
100     if (handler) {
101         mutex_guard lock(connMtx);
102         msgHandlers[msgType].push_back(handler);
103     }
104 }
105
106 void
107 SwitchConnection::UnregisterMessageHandler(int msgType,
108         MessageHandler *handler)
109 {
110     mutex_guard lock(connMtx);
111     HandlerMap::iterator itr = msgHandlers.find(msgType);
112     if (itr != msgHandlers.end()) {
113         itr->second.remove(handler);
114     }
115 }
116
117 int
118 SwitchConnection::Connect(int protoVer) {
119     if (ofConn != NULL) {    // connection already created
120         return true;
121     }
122
123     ofProtoVersion = protoVer;
124     int err = doConnectOF();
125     if (err != 0) {
126         LOG(ERROR) << "Failed to connect to " << switchName << ": "
127             << ovs_strerror(err);
128     }
129     connThread.reset(new std::thread(std::ref(*this)));
130     return err;
131 }
132
133 int
134 SwitchConnection::doConnectOF() {
135     std::string swPath;
136     swPath.append("unix:").append(ovs_rundir()).append("/")
137             .append(switchName).append(".mgmt");
138
139     uint32_t versionBitmap = 1u << ofProtoVersion;
140     vconn *newConn;
141     int error;
142     error = vconn_open_block(swPath.c_str(), versionBitmap, DSCP_DEFAULT,
143             0, &newConn);
144     if (error) {
145         return error;
146     }
147
148     /* Verify we have the correct protocol version */
149     int connVersion = vconn_get_version(newConn);
150     if (connVersion != ofProtoVersion) {
151         LOG(WARNING) << "Remote supports version " << connVersion <<
152                 ", wanted " << ofProtoVersion;
153     }
154     LOG(INFO) << "Connected to switch " << swPath
155             << " using protocol version " << ofProtoVersion;
156     {
157         mutex_guard lock(connMtx);
158         lastEchoTime = std::chrono::steady_clock::now();
159         cleanupOFConn();
160         ofConn = newConn;
161         ofProtoVersion = connVersion;
162     }
163     return 0;
164 }
165
166 void SwitchConnection::cleanupOFConn() {
167     if (ofConn != NULL) {
168         vconn_close(ofConn);
169         ofConn = NULL;
170     }
171 }
172
173 void
174 SwitchConnection::Disconnect() {
175     isDisconnecting = true;
176     if (connThread && SignalPollEvent()) {
177         connThread->join();
178         connThread.reset();
179     }
180
181     mutex_guard lock(connMtx);
182     cleanupOFConn();
183     isDisconnecting = false;
184 }
185
186 bool
187 SwitchConnection::IsConnected() {
188     mutex_guard lock(connMtx);
189     return IsConnectedLocked();
190 }
191
192 bool
193 SwitchConnection::IsConnectedLocked() {
194     return ofConn != NULL && vconn_get_status(ofConn) == 0;
195 }
196
197 int
198 SwitchConnection::GetProtocolVersion() {
199     return ofProtoVersion;
200 }
201
202 std::string SwitchConnection::getSwitchName() {
203     return switchName;
204 }
205
206 void
207 SwitchConnection::operator()() {
208     Monitor();
209 }
210
211 bool
212 SwitchConnection::SignalPollEvent() {
213     uint64_t data = 1;
214     ssize_t szWrote = write(pollEventFd, &data, sizeof(data));
215     if (szWrote != sizeof(data)) {
216         LOG(ERROR) << "Failed to send event to poll loop: " << strerror(errno);
217         return false;
218     }
219     return true;
220 }
221
222 void
223 SwitchConnection::WatchPollEvent() {
224     poll_fd_wait(pollEventFd, POLLIN);
225 }
226
227 void
228 SwitchConnection::Monitor() {
229     LOG(DEBUG) << "Connection monitor started ...";
230
231     bool connLost = (IsConnected() == false);
232     if (!connLost) {
233         FireOnConnectListeners();
234     }
235
236     while (true) {
237         if (connLost) {
238             LOG(ERROR) << "Connection lost, trying to auto reconnect";
239             mutex_guard lock(connMtx);
240             cleanupOFConn();
241         }
242         bool oldConnLost = connLost;
243         while (connLost && !isDisconnecting) {
244             WatchPollEvent();
245             poll_timer_wait(LOST_CONN_BACKOFF_MSEC);
246             poll_block(); // block till timer expires or disconnect is requested
247             if (!isDisconnecting) {
248                 connLost = (doConnectOF() != 0);
249             }
250         }
251         if (isDisconnecting) {
252             return;
253         }
254         if (oldConnLost != connLost) {
255             FireOnConnectListeners();
256         }
257         WatchPollEvent();
258         if (!connLost) {
259             {
260                 mutex_guard lock(connMtx);
261                 poll_timer_wait(LOST_CONN_BACKOFF_MSEC);
262
263                 vconn_run(ofConn);
264                 vconn_run_wait(ofConn);
265                 vconn_recv_wait(ofConn);
266             }
267             poll_block();
268         }
269         connLost = (EOF == receiveOFMessage());
270
271         if (!connLost) {
272             std::chrono::time_point<std::chrono::steady_clock> echoTime;
273             {
274                 mutex_guard lock(connMtx);
275                 echoTime = lastEchoTime;
276             }
277
278             auto diff = std::chrono::steady_clock::now() - echoTime;
279             if (diff >= MAX_ECHO_INTERVAL) {
280                 LOG(ERROR) << "Timed out reading from switch socket";
281                 connLost = true;
282             } else if (diff >= ECHO_INTERVAL) {
283                 OfpBuf msg(
284                     ofputil_encode_echo_request((ofp_version)GetProtocolVersion()));
285                 SendMessage(msg);
286             }
287         }
288     }
289 }
290
291 int
292 SwitchConnection::receiveOFMessage() {
293     do {
294         int err;
295         ofpbuf *recvMsg;
296         {
297             mutex_guard lock(connMtx);
298             err = vconn_recv(ofConn, &recvMsg);
299         }
300         if (err == EAGAIN) {
301             return 0;
302         } else if (err != 0) {
303             LOG(ERROR) << "Error while receiving message: "
304                     << ovs_strerror(err);
305             ofpbuf_delete(recvMsg);
306             return err;
307         } else {
308             ofptype type;
309             if (!ofptype_decode(&type, (ofp_header *)recvMsg->data)) {
310                 struct ofputil_flow_removed flow_removed;
311                 if(type == OFPTYPE_FLOW_REMOVED) {
312                     if(DecodeFlowRemoved(recvMsg, &flow_removed) != 0) {
313                         break;
314                     }
315                 }
316                 HandlerMap::const_iterator itr = msgHandlers.find(type);
317                 if (itr != msgHandlers.end()) {
318                     for (MessageHandler *h : itr->second) {
319                         h->Handle(this, type, recvMsg, &flow_removed);
320                     }
321                 }
322             }
323             ofpbuf_delete(recvMsg);
324         }
325     } while (true);
326     return 0;
327 }
328
329 int
330 SwitchConnection::SendMessage(OfpBuf& msg) {
331     while(true) {
332         mutex_guard lock(connMtx);
333         if (!IsConnectedLocked()) {
334             return ENOTCONN;
335         }
336         int err = vconn_send(ofConn, msg.get());
337         if (err == 0) {
338             // vconn_send takes ownership
339             msg.release();
340             return 0;
341         } else if (err != EAGAIN) {
342             LOG(ERROR) << "Error sending OF message: " << ovs_strerror(err);
343             return err;
344         } else {
345             vconn_run(ofConn);
346             vconn_send_wait(ofConn);
347         }
348     }
349 }
350
351 void
352 SwitchConnection::FireOnConnectListeners() {
353     if (GetProtocolVersion() >= OFP12_VERSION) {
354         // Set controller role to MASTER
355         ofp12_role_request *rr;
356         OfpBuf b0(ofpraw_alloc(OFPRAW_OFPT12_ROLE_REQUEST,
357                                GetProtocolVersion(), sizeof *rr));
358         rr = (ofp12_role_request*)b0.put_zeros(sizeof *rr);
359         rr->role = htonl(OFPCR12_ROLE_MASTER);
360         SendMessage(b0);
361     }
362     {
363         // Set default miss length to non-zero value to enable
364         // asynchronous messages
365         ofp_switch_config *osc;
366         OfpBuf b1(ofpraw_alloc(OFPRAW_OFPT_SET_CONFIG,
367                                GetProtocolVersion(), sizeof *osc));
368         osc = (ofp_switch_config*)b1.put_zeros(sizeof *osc);
369         osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN);
370         SendMessage(b1);
371     }
372     {
373         // Set packet-in format to nicira-extended format
374         OfpBuf b2(ofputil_encode_set_packet_in_format((ofp_version)GetProtocolVersion(),
375                   OFPUTIL_PACKET_IN_NXT));
376         SendMessage(b2);
377     }
378     notifyConnectListeners();
379 }
380
381 void
382 SwitchConnection::notifyConnectListeners() {
383     for (OnConnectListener *l : onConnectListeners) {
384         l->Connected(this);
385     }
386 }
387
388 void
389 SwitchConnection::EchoRequestHandler::Handle(SwitchConnection *swConn,
390                                              int, ofpbuf *msg,
391                                              struct ofputil_flow_removed*) {
392     const ofp_header *rq = (const ofp_header *)msg->data;
393     OfpBuf echoReplyMsg(ofputil_encode_echo_reply(rq));
394     swConn->SendMessage(echoReplyMsg);
395 }
396
397 void
398 SwitchConnection::EchoReplyHandler::Handle(SwitchConnection *swConn,
399                                     int, ofpbuf *msg,
400                                     struct ofputil_flow_removed*) {
401     mutex_guard lock(swConn->connMtx);
402     swConn->lastEchoTime = std::chrono::steady_clock::now();
403 }
404
405 void
406 SwitchConnection::ErrorHandler::Handle(SwitchConnection*, int,
407                                        ofpbuf *msg,
408                                        struct ofputil_flow_removed*) {
409     const struct ofp_header *oh = (ofp_header *)msg->data;
410     ofperr err = ofperr_decode_msg(oh, NULL);
411     LOG(ERROR) << "Got error reply from switch ("
412                << ntohl(oh->xid) << "): "
413                << ofperr_get_name(err) << ": "
414                << ofperr_get_description(err);
415 }
416
417 } // namespace opflexagent