2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 #include "SwitchConnection.h"
10 #include <opflexagent/logging.h>
12 #include <sys/eventfd.h>
16 #include <boost/scope_exit.hpp>
18 #include <unordered_map>
20 #include "ovs-ofputil.h"
22 #include <openvswitch/poll-loop.h>
23 #define HAVE_STRUCT_MMSGHDR_MSG_LEN
28 #include <lib/socket-util.h>
29 #include <lib/stream.h>
30 #include <openvswitch/vconn.h>
31 #include <openvswitch/ofp-msgs.h>
32 #include <openvswitch/ofp-packet.h>
33 #include <openvswitch/ofp-monitor.h>
// Shorthand for a scoped lock on a std::mutex.
36 typedef std::lock_guard<std::mutex> mutex_guard;
// Delay, in milliseconds, handed to poll_timer_wait() by the connection monitor.
38 const int LOST_CONN_BACKOFF_MSEC = 5000;
// Once this much time has passed since lastEchoTime, the monitor builds an
// echo request.
39 const std::chrono::seconds ECHO_INTERVAL(5);
// Once this much time has passed since lastEchoTime, the monitor logs a
// read timeout on the switch socket.
40 const std::chrono::seconds MAX_ECHO_INTERVAL(30);
42 namespace opflexagent {
// Decodes the OpenFlow flow-removed message held in msg into *fentry.
//
// msg    - buffer whose data starts with the OpenFlow header of the message.
// fentry - output structure; it is zeroed before decoding.
//
// The result of ofputil_decode_flow_removed() is kept in ret, and a decode
// failure is logged at ERROR level.
// NOTE(review): presumably ret is also the return value — confirm.
44 int SwitchConnection::DecodeFlowRemoved(ofpbuf *msg,
45 struct ofputil_flow_removed* fentry) {
46 const struct ofp_header *oh = (ofp_header *)msg->data;
// Start from an all-zero structure so no stale field survives the decode.
48 bzero(fentry, sizeof(struct ofputil_flow_removed));
50 ret = ofputil_decode_flow_removed(fentry, oh);
52 LOG(ERROR) << "Failed to decode flow removed message: "
56 /* ovs 2.11.2 specific changes. Check handleFlowStats for
57 * more comments on below fix */
// Clear packet_type in both the decoded match value and its wildcard mask.
59 fentry->match.flow.packet_type = 0;
60 fentry->match.wc.masks.packet_type = 0;
// Creates a connection object for the switch named swName. No OpenFlow
// connection is opened here: ofConn starts out as NULL.
66 SwitchConnection::SwitchConnection(const std::string& swName) :
67 switchName(swName), ofConn(NULL) {
// Default to OpenFlow 1.0; Connect() overwrites this with the caller's version.
69 ofProtoVersion = OFP10_VERSION;
70 isDisconnecting = false;
// Event fd that SignalPollEvent() writes to and WatchPollEvent() waits on.
72 pollEventFd = eventfd(0, 0);
// Install the built-in handlers for echo requests, echo replies and errors.
74 RegisterMessageHandler(OFPTYPE_ECHO_REQUEST, &echoReqHandler);
75 RegisterMessageHandler(OFPTYPE_ECHO_REPLY, &echoRepHandler);
76 RegisterMessageHandler(OFPTYPE_ERROR, &errorHandler);
// Destructor for the switch connection object.
79 SwitchConnection::~SwitchConnection() {
// Appends l to the list of connect listeners while holding connMtx.
84 SwitchConnection::RegisterOnConnectListener(OnConnectListener *l) {
86 mutex_guard lock(connMtx);
87 onConnectListeners.push_back(l);
// Removes l from the list of connect listeners while holding connMtx.
92 SwitchConnection::UnregisterOnConnectListener(OnConnectListener *l) {
93 mutex_guard lock(connMtx);
94 onConnectListeners.remove(l);
// Appends handler to the handler list kept for OpenFlow message type msgType,
// while holding connMtx.
98 SwitchConnection::RegisterMessageHandler(int msgType, MessageHandler *handler)
101 mutex_guard lock(connMtx);
// NOTE(review): if HandlerMap is a standard map type, indexing also creates
// the list for msgType when none exists yet — confirm against the header.
102 msgHandlers[msgType].push_back(handler);
// Removes handler from the handler list kept for msgType, while holding
// connMtx. Nothing is removed when no list is registered for msgType.
107 SwitchConnection::UnregisterMessageHandler(int msgType,
108 MessageHandler *handler)
110 mutex_guard lock(connMtx);
111 HandlerMap::iterator itr = msgHandlers.find(msgType);
112 if (itr != msgHandlers.end()) {
113 itr->second.remove(handler);
// Opens the OpenFlow connection using protocol version protoVer and creates
// the connection thread. A failed connect attempt is logged together with the
// OVS error string.
118 SwitchConnection::Connect(int protoVer) {
119 if (ofConn != NULL) { // connection already created
// Remember the requested version; doConnectOF() reads it from ofProtoVersion.
123 ofProtoVersion = protoVer;
124 int err = doConnectOF();
126 LOG(ERROR) << "Failed to connect to " << switchName << ": "
127 << ovs_strerror(err);
// std::ref makes the new thread call operator() on this object, not on a copy.
129 connThread.reset(new std::thread(std::ref(*this)));
// Opens an OpenFlow connection to the switch's management socket,
// "unix:<ovs_rundir>/<switchName>.mgmt", using vconn_open_block() and
// offering only the version currently stored in ofProtoVersion.
// The result is compared against 0 by the callers in this file.
134 SwitchConnection::doConnectOF() {
136 swPath.append("unix:").append(ovs_rundir()).append("/")
137 .append(switchName).append(".mgmt");
// Bitmap with only the bit for the requested protocol version set.
139 uint32_t versionBitmap = 1u << ofProtoVersion;
142 error = vconn_open_block(swPath.c_str(), versionBitmap, DSCP_DEFAULT,
148 /* Verify we have the correct protocol version */
149 int connVersion = vconn_get_version(newConn);
150 if (connVersion != ofProtoVersion) {
151 LOG(WARNING) << "Remote supports version " << connVersion <<
152 ", wanted " << ofProtoVersion;
// NOTE(review): this message prints ofProtoVersion (the requested version);
// the version reported by the connection is only stored further below.
154 LOG(INFO) << "Connected to switch " << swPath
155 << " using protocol version " << ofProtoVersion;
// Restart the echo timer for the new connection, under connMtx.
157 mutex_guard lock(connMtx);
158 lastEchoTime = std::chrono::steady_clock::now();
// Record the version reported by the connection.
161 ofProtoVersion = connVersion;
// Cleans up the OpenFlow connection; acts only when one exists
// (ofConn != NULL).
166 void SwitchConnection::cleanupOFConn() {
167 if (ofConn != NULL) {
// Requests shutdown of the connection. isDisconnecting is raised for the
// duration of the call; the monitor loop checks that flag to stop retrying.
174 SwitchConnection::Disconnect() {
175 isDisconnecting = true;
// Only when a connection thread exists: wake its poll loop so it can observe
// isDisconnecting.
176 if (connThread && SignalPollEvent()) {
181 mutex_guard lock(connMtx);
183 isDisconnecting = false;
// Returns the result of IsConnectedLocked(), evaluated while holding connMtx.
187 SwitchConnection::IsConnected() {
188 mutex_guard lock(connMtx);
189 return IsConnectedLocked();
// Returns true when an OpenFlow connection exists and vconn_get_status()
// reports 0 for it. Does not take connMtx itself; the callers in this file
// (IsConnected, SendMessage) acquire it before calling.
193 SwitchConnection::IsConnectedLocked() {
194 return ofConn != NULL && vconn_get_status(ofConn) == 0;
// Returns the current value of ofProtoVersion.
198 SwitchConnection::GetProtocolVersion() {
199 return ofProtoVersion;
// Accessor for the switch name.
// NOTE(review): presumably returns switchName — confirm.
202 std::string SwitchConnection::getSwitchName() {
// Function-call operator. This is what the std::thread created in Connect()
// invokes, since that thread is constructed from std::ref(*this).
207 SwitchConnection::operator()() {
// Writes to pollEventFd to signal the poll loop. An error is logged when
// write() does not transfer exactly sizeof(data) bytes.
// NOTE(review): Disconnect() uses the result as a boolean; presumably false
// is returned on a failed write — confirm.
212 SwitchConnection::SignalPollEvent() {
214 ssize_t szWrote = write(pollEventFd, &data, sizeof(data));
215 if (szWrote != sizeof(data)) {
216 LOG(ERROR) << "Failed to send event to poll loop: " << strerror(errno);
// Registers pollEventFd with the OVS poll loop for POLLIN, so that a
// subsequent poll_block() wakes up when the event fd becomes readable.
223 SwitchConnection::WatchPollEvent() {
224 poll_fd_wait(pollEventFd, POLLIN);
// Connection monitor. It tracks whether the connection is lost, retries
// doConnectOF() with a LOST_CONN_BACKOFF_MSEC backoff until it succeeds or a
// disconnect is requested, receives messages from the switch, and uses the
// time since lastEchoTime to decide when to probe the switch or report a
// timeout.
228 SwitchConnection::Monitor() {
229 LOG(DEBUG) << "Connection monitor started ...";
231 bool connLost = (IsConnected() == false);
233 FireOnConnectListeners();
238 LOG(ERROR) << "Connection lost, trying to auto reconnect";
239 mutex_guard lock(connMtx);
// Remember the state before retrying so that a change can be detected below.
242 bool oldConnLost = connLost;
// Keep retrying until the connect succeeds or a disconnect is requested.
243 while (connLost && !isDisconnecting) {
245 poll_timer_wait(LOST_CONN_BACKOFF_MSEC);
246 poll_block(); // block till timer expires or disconnect is requested
// Re-check the flag: a disconnect may have been requested while blocked.
247 if (!isDisconnecting) {
248 connLost = (doConnectOF() != 0);
251 if (isDisconnecting) {
// The connection state changed during the retry loop: run the on-connect
// sequence.
254 if (oldConnLost != connLost) {
255 FireOnConnectListeners();
260 mutex_guard lock(connMtx);
// Bound the next wait with a timer and register interest in the connection.
261 poll_timer_wait(LOST_CONN_BACKOFF_MSEC);
264 vconn_run_wait(ofConn);
265 vconn_recv_wait(ofConn);
// An EOF result from receiveOFMessage() is treated as a lost connection.
269 connLost = (EOF == receiveOFMessage());
272 std::chrono::time_point<std::chrono::steady_clock> echoTime;
// Copy lastEchoTime under connMtx into a local for the comparison below.
274 mutex_guard lock(connMtx);
275 echoTime = lastEchoTime;
278 auto diff = std::chrono::steady_clock::now() - echoTime;
// Nothing recorded for MAX_ECHO_INTERVAL: log a read timeout.
279 if (diff >= MAX_ECHO_INTERVAL) {
280 LOG(ERROR) << "Timed out reading from switch socket";
// Nothing recorded for ECHO_INTERVAL: build an echo request for the current
// protocol version.
282 } else if (diff >= ECHO_INTERVAL) {
284 ofputil_encode_echo_request((ofp_version)GetProtocolVersion()));
// Receives an OpenFlow message from ofConn and passes it to every handler
// registered for its message type. Monitor() treats an EOF result from this
// function as a lost connection.
292 SwitchConnection::receiveOFMessage() {
// vconn_recv() is called with connMtx held.
297 mutex_guard lock(connMtx);
298 err = vconn_recv(ofConn, &recvMsg);
// Receive errors handled by this branch are logged with the OVS error string.
302 } else if (err != 0) {
303 LOG(ERROR) << "Error while receiving message: "
304 << ovs_strerror(err);
305 ofpbuf_delete(recvMsg);
// Dispatch happens only when ofptype_decode() reports 0.
// NOTE(review): 0 presumably means the type was decoded successfully — confirm.
309 if (!ofptype_decode(&type, (ofp_header *)recvMsg->data)) {
// Declared without an initializer; it is filled in only for flow-removed
// messages, so for other types handlers receive a pointer to uninitialized
// storage.
310 struct ofputil_flow_removed flow_removed;
311 if(type == OFPTYPE_FLOW_REMOVED) {
312 if(DecodeFlowRemoved(recvMsg, &flow_removed) != 0) {
// NOTE(review): msgHandlers appears to be read here without connMtx, while
// the register/unregister functions modify it under that mutex — confirm the
// lock scope above.
316 HandlerMap::const_iterator itr = msgHandlers.find(type);
317 if (itr != msgHandlers.end()) {
318 for (MessageHandler *h : itr->second) {
319 h->Handle(this, type, recvMsg, &flow_removed);
// Free the received buffer.
323 ofpbuf_delete(recvMsg);
// Sends msg over the OpenFlow connection. The connectivity check and the
// vconn_send() call are made with connMtx held.
330 SwitchConnection::SendMessage(OfpBuf& msg) {
332 mutex_guard lock(connMtx);
333 if (!IsConnectedLocked()) {
336 int err = vconn_send(ofConn, msg.get());
338 // vconn_send takes ownership
// Any failure other than EAGAIN is logged as an error.
341 } else if (err != EAGAIN) {
342 LOG(ERROR) << "Error sending OF message: " << ovs_strerror(err);
// Register interest in the connection becoming able to send.
// NOTE(review): presumably this is the EAGAIN retry path — confirm.
346 vconn_send_wait(ofConn);
// Prepares the initial configuration messages for a newly established
// connection and then calls notifyConnectListeners(). The messages built are:
// a role request (OpenFlow 1.2 and later only), a set-config message carrying
// the miss-send length, and a set-packet-in-format message.
352 SwitchConnection::FireOnConnectListeners() {
// The role request is built only for OpenFlow 1.2 and later.
353 if (GetProtocolVersion() >= OFP12_VERSION) {
354 // Set controller role to MASTER
355 ofp12_role_request *rr;
356 OfpBuf b0(ofpraw_alloc(OFPRAW_OFPT12_ROLE_REQUEST,
357 GetProtocolVersion(), sizeof *rr));
358 rr = (ofp12_role_request*)b0.put_zeros(sizeof *rr);
// The role field is stored in network byte order.
359 rr->role = htonl(OFPCR12_ROLE_MASTER);
363 // Set default miss length to non-zero value to enable
364 // asynchronous messages
365 ofp_switch_config *osc;
366 OfpBuf b1(ofpraw_alloc(OFPRAW_OFPT_SET_CONFIG,
367 GetProtocolVersion(), sizeof *osc));
368 osc = (ofp_switch_config*)b1.put_zeros(sizeof *osc);
369 osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN);
373 // Set packet-in format to nicira-extended format
374 OfpBuf b2(ofputil_encode_set_packet_in_format((ofp_version)GetProtocolVersion(),
375 OFPUTIL_PACKET_IN_NXT));
// Finally hand over to the registered connect listeners.
378 notifyConnectListeners();
// Visits every registered connect listener in list order.
// NOTE(review): connMtx is not taken in the visible part of this function,
// while registration and removal modify the list under it — confirm the
// locking expectations.
382 SwitchConnection::notifyConnectListeners() {
383 for (OnConnectListener *l : onConnectListeners) {
// Answers an OpenFlow echo request: builds an echo reply from the request
// header found at the start of msg and sends it on swConn. The flow-removed
// argument is unused.
389 SwitchConnection::EchoRequestHandler::Handle(SwitchConnection *swConn,
391 struct ofputil_flow_removed*) {
392 const ofp_header *rq = (const ofp_header *)msg->data;
393 OfpBuf echoReplyMsg(ofputil_encode_echo_reply(rq));
394 swConn->SendMessage(echoReplyMsg);
// Records the arrival of an echo reply by setting swConn->lastEchoTime to the
// current steady-clock time, under swConn->connMtx. The flow-removed argument
// is unused.
398 SwitchConnection::EchoReplyHandler::Handle(SwitchConnection *swConn,
400 struct ofputil_flow_removed*) {
401 mutex_guard lock(swConn->connMtx);
402 swConn->lastEchoTime = std::chrono::steady_clock::now();
// Logs an OpenFlow error message received from the switch: decodes the error
// code from msg and logs the transaction id together with the error's name
// and description. The connection, message-type and flow-removed arguments
// are unused.
406 SwitchConnection::ErrorHandler::Handle(SwitchConnection*, int,
408 struct ofputil_flow_removed*) {
409 const struct ofp_header *oh = (ofp_header *)msg->data;
410 ofperr err = ofperr_decode_msg(oh, NULL);
// xid is converted from network to host byte order for display.
411 LOG(ERROR) << "Got error reply from switch ("
412 << ntohl(oh->xid) << "): "
413 << ofperr_get_name(err) << ": "
414 << ofperr_get_description(err);
417 } // namespace opflexagent