}
void ContractStatsManager::Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) {
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry) {
handleMessage(msgType, msg,
[this](uint32_t table_id) -> flowCounterState_t* {
if (table_id == IntFlowManager::POL_TABLE_ID)
return &contractState;
else
return NULL;
- });
+ }, fentry);
}
} /* namespace opflexagent */
}
void
-FlowExecutor::Handle(SwitchConnection *, int msgType, ofpbuf *msg) {
+FlowExecutor::Handle(SwitchConnection *,
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed*) {
ofp_header *msgHdr = (ofp_header *)msg->data;
ovs_be32 recvXid = msgHdr->xid;
return (err == 0);
}
-void FlowReader::Handle(SwitchConnection*, int msgType, ofpbuf *msg) {
+void FlowReader::Handle(SwitchConnection*,
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed * fentry) {
if (msgType == OFPTYPE_FLOW_STATS_REPLY) {
handleReply<FlowEntryList, FlowCb, FlowCbMap>(msg, flowRequests);
} else if (msgType == OFPTYPE_GROUP_DESC_STATS_REPLY) {
}
void InterfaceStatsManager::Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) {
+ int msgType, ofpbuf *msg,
+ struct ofputil_flow_removed* fentry) {
assert(msgType == OFPTYPE_PORT_STATS_REPLY);
if (!connection || !msg)
return;
* Dispatch packet-in messages to the appropriate handlers
*/
void PacketInHandler::Handle(SwitchConnection* conn,
- int msgType, ofpbuf *msg) {
+ int msgType, ofpbuf *msg,
+ struct ofputil_flow_removed* fentry) {
assert(msgType == OFPTYPE_PACKET_IN);
const struct ofp_header *oh = (ofp_header *)msg->data;
}
void PodSvcStatsManager::Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) {
+ int msgType, ofpbuf *msg,
+ struct ofputil_flow_removed *fentry)
+{
handleMessage(msgType, msg,
[this](uint32_t table_id) -> flowCounterState_t* {
if (table_id == IntFlowManager::STATS_TABLE_ID)
return &statsState;
else
return NULL;
- });
+ }, fentry);
}
} /* namespace opflexagent */
}
}
-void PolicyStatsManager::
-handleMessage(int msgType, ofpbuf *msg, const table_map_t& tableMap) {
+void PolicyStatsManager::handleMessage(int msgType,
+ ofpbuf *msg,
+ const table_map_t& tableMap,
+ struct ofputil_flow_removed *fentry) {
+ bool ret = false;
if (msg == (ofpbuf *)NULL) {
LOG(ERROR) << "Unexpected null message";
return;
}
if (msgType == OFPTYPE_FLOW_STATS_REPLY) {
- handleFlowStats(msg, tableMap);
+ std::lock_guard<std::mutex> lock(pstatMtx);
+ ofp_header *msgHdr = (ofp_header *)msg->data;
+ ovs_be32 recvXid = msgHdr->xid;
+ if(txns.find(recvXid) == txns.end()) {
+ return;
+ }
+ ret = handleFlowStats(msg, tableMap);
+ if(ret) {
+ txns.erase(recvXid);
+ }
} else if (msgType == OFPTYPE_FLOW_REMOVED) {
- handleFlowRemoved(msg, tableMap);
+ std::lock_guard<std::mutex> lock(pstatMtx);
+ if(!fentry)
+ return;
+ flowCounterState_t* counterState = tableMap(fentry->table_id);
+ if (!counterState)
+ return;
+ updateNewFlowCounters((uint32_t)ovs_ntohll(fentry->cookie),
+ fentry->priority,
+ (fentry->match),
+ fentry->packet_count,
+ fentry->byte_count,
+ *counterState, true);
} else {
LOG(ERROR) << "Unexpected message type: " << msgType;
- return;
}
-
}
-void PolicyStatsManager::handleFlowStats(ofpbuf *msg, const table_map_t& tableMap) {
+/**
+ * Call this method holding the lock pstatMtx always. Lock has been
+ * moved out of this method to avoid adding more specific locks in the
+ * code path.
+ */
+bool PolicyStatsManager::handleFlowStats(ofpbuf *msg, const table_map_t& tableMap) {
struct ofputil_flow_stats* fentry, fstat;
-
fentry = &fstat;
- std::lock_guard<std::mutex> lock(pstatMtx);
do {
ofpbuf actsBuf;
if (ret != EOF) {
LOG(ERROR) << "Failed to decode flow stats reply: "
<< ovs_strerror(ret);
+ return true;
+ } else {
+ return !ofpmp_more((ofp_header*)msg->header);
}
- break;
} else {
/**
flowCounterState_t* counterState = tableMap(fentry->table_id);
if (!counterState)
- return;
+ return true;
if ((fentry->flags & OFPUTIL_FF_SEND_FLOW_REM) == 0) {
// skip those flow entries that don't have flag set
}
-void PolicyStatsManager::handleFlowRemoved(ofpbuf *msg, const table_map_t& tableMap) {
-
- const struct ofp_header *oh = (ofp_header *)msg->data;
- struct ofputil_flow_removed* fentry, flow_removed;
-
- fentry = &flow_removed;
- std::lock_guard<std::mutex> lock(pstatMtx);
- int ret;
- bzero(fentry, sizeof(struct ofputil_flow_removed));
-
- ret = ofputil_decode_flow_removed(fentry, oh);
- if (ret != 0) {
- LOG(ERROR) << "Failed to decode flow removed message: "
- << ovs_strerror(ret);
- return;
- } else {
- /* ovs 2.11.2 specific changes. Check handleFlowStats for
- * more comments on below fix */
- if (fentry) {
- fentry->match.flow.packet_type = 0;
- fentry->match.wc.masks.packet_type = 0;
- }
-
- flowCounterState_t* counterState = tableMap(fentry->table_id);
- if (!counterState)
- return;
- updateNewFlowCounters((uint32_t)ovs_ntohll(fentry->cookie),
- fentry->priority,
- (fentry->match),
- fentry->packet_count,
- fentry->byte_count,
- *counterState, true);
- }
-}
-
void PolicyStatsManager::sendRequest(uint32_t table_id, uint64_t _cookie,
uint64_t _cookie_mask) {
// send port stats request again
OfpBuf req(ofputil_encode_flow_stats_request(&fsr, proto));
ofpmsg_update_length(req.get());
+ ovs_be32 reqXid = ((ofp_header *)req->data)->xid;
+ txns.insert(reqXid);
int err = connection->SendMessage(req);
if (err != 0) {
}
void
-PortMapper::Handle(SwitchConnection*, int msgType, ofpbuf *msg) {
+PortMapper::Handle(SwitchConnection*,
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed*) {
switch (msgType) {
case OFPTYPE_PORT_DESC_STATS_REPLY:
HandlePortDescReply(msg);
}
void SecGrpStatsManager::Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) {
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry) {
handleMessage(msgType, msg,
[this](uint32_t table_id) -> flowCounterState_t* {
switch (table_id) {
default:
return NULL;
}
- });
+ }, fentry);
}
} /* namespace opflexagent */
#include <openvswitch/vconn.h>
#include <openvswitch/ofp-msgs.h>
#include <openvswitch/ofp-packet.h>
+#include <openvswitch/ofp-monitor.h>
}
typedef std::lock_guard<std::mutex> mutex_guard;
namespace opflexagent {
+int SwitchConnection::DecodeFlowRemoved(ofpbuf *msg,
+ struct ofputil_flow_removed* fentry) {
+ const struct ofp_header *oh = (ofp_header *)msg->data;
+ int ret;
+ bzero(fentry, sizeof(struct ofputil_flow_removed));
+
+ ret = ofputil_decode_flow_removed(fentry, oh);
+ if (ret != 0) {
+ LOG(ERROR) << "Failed to decode flow removed message: "
+ << ovs_strerror(ret);
+ return ret;
+ } else {
+ /* ovs 2.11.2 specific changes. Check handleFlowStats for
+ * more comments on below fix */
+ if (fentry) {
+ fentry->match.flow.packet_type = 0;
+ fentry->match.wc.masks.packet_type = 0;
+ }
+ }
+ return ret;
+}
+
SwitchConnection::SwitchConnection(const std::string& swName) :
switchName(swName), ofConn(NULL) {
connThread = NULL;
} else {
ofptype type;
if (!ofptype_decode(&type, (ofp_header *)recvMsg->data)) {
+ struct ofputil_flow_removed flow_removed;
+ if(type == OFPTYPE_FLOW_REMOVED) {
+ if(DecodeFlowRemoved(recvMsg, &flow_removed) != 0) {
+ break;
+ }
+ }
HandlerMap::const_iterator itr = msgHandlers.find(type);
if (itr != msgHandlers.end()) {
for (MessageHandler *h : itr->second) {
- h->Handle(this, type, recvMsg);
+ h->Handle(this, type, recvMsg, &flow_removed);
}
}
}
void
SwitchConnection::EchoRequestHandler::Handle(SwitchConnection *swConn,
- int, ofpbuf *msg) {
+ int, ofpbuf *msg,
+ struct ofputil_flow_removed*) {
const ofp_header *rq = (const ofp_header *)msg->data;
OfpBuf echoReplyMsg(ofputil_encode_echo_reply(rq));
swConn->SendMessage(echoReplyMsg);
void
SwitchConnection::EchoReplyHandler::Handle(SwitchConnection *swConn,
- int, ofpbuf *msg) {
+ int, ofpbuf *msg,
+ struct ofputil_flow_removed*) {
mutex_guard lock(swConn->connMtx);
swConn->lastEchoTime = std::chrono::steady_clock::now();
}
void
SwitchConnection::ErrorHandler::Handle(SwitchConnection*, int,
- ofpbuf *msg) {
+ ofpbuf *msg,
+ struct ofputil_flow_removed*) {
const struct ofp_header *oh = (ofp_header *)msg->data;
ofperr err = ofperr_decode_msg(oh, NULL);
LOG(ERROR) << "Got error reply from switch ("
prometheusManager.addTableDropGauge(connection->getSwitchName(),
tbl_it.second.first);
#endif
+ CurrentDropCounterState[tbl_it.first];
+ auto &counter = TableDropCounterState[tbl_it.first];
+ counter.packet_count = boost::make_optional(false, 0);
+ counter.byte_count = boost::make_optional(false, 0);
}
PolicyStatsManager::start(register_listener);
{
}
void BaseTableDropStatsManager::Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) {
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry) {
handleMessage(msgType, msg,
- [this](uint32_t table_id) -> flowCounterState_t* {
- if(tableDescMap.find(table_id)!= tableDescMap.end())
- return &CurrentDropCounterState[table_id];
- else
- return NULL;
- });
+ [this](uint32_t table_id) -> flowCounterState_t* {
+ if(tableDescMap.find(table_id)!= tableDescMap.end())
+ return &CurrentDropCounterState[table_id];
+ else
+ return NULL;
+ }, fentry);
}
} /* namespace opflexagent */
/* Interface: MessageListener */
void Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) override;
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL) override;
void updatePolicyStatsCounters(const std::string& srcEpg,
const std::string& dstEpg,
void UninstallListenersForConnection(SwitchConnection *conn);
/** Interface: MessageHandler */
- void Handle(SwitchConnection *conn, int msgType, ofpbuf *msg);
+ void Handle(SwitchConnection *conn,
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL);
/** Interface: OnConnectListener */
void Connected(SwitchConnection *swConn);
virtual bool getTlvs(const TlvCb& cb);
/* Interface: MessageHandler */
- void Handle(SwitchConnection *c, int msgType, ofpbuf *msg);
+ void Handle(SwitchConnection *c,
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed *fentry=NULL);
/**
* Clear the state of the flow reader
void stop();
// see: MessageHandler
- void Handle(SwitchConnection *swConn, int type, ofpbuf *msg);
+ void Handle(SwitchConnection *swConn,
+ int type,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL);
private:
Agent* agent;
// MessageHandler
// **************
- virtual void Handle(SwitchConnection *swConn, int msgType,
- ofpbuf *msg);
+ virtual void Handle(SwitchConnection *swConn,
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL);
private:
Agent& agent;
void parseLog(unsigned char *buf , std::size_t length);
protected:
+ ///@{
+ /** Member names are self-explanatory */
boost::asio::io_service &server_io;
boost::asio::io_service &client_io;
std::unique_ptr<UdpServer> socketListener;
static const unsigned maxOutstandingEvents=30;
friend UdpServer;
friend LocalClient;
+ ///@}
};
} /* namespace opflexagent */
* Interface: MessageListener
*/
void Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) override;
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL) override;
/** Interface: ObjectListener */
// Note: This is used to delete observer MOs.
#include <unordered_map>
#include <mutex>
#include <functional>
+#include <unordered_set>
#pragma once
#ifndef OPFLEXAGENT_POLICYSTATSMANAGER_H
*/
static const int MAX_AGE = 9;
+ /**
+ * This is a test method. Do not use in production
+ * @param txn_id: transaction id to simulate send
+ */
+ void testInjectTxnId(uint32_t txn_id) {
+ txns.insert(txn_id);
+ }
protected:
/**
* Type used as a key for Policy counter maps
/**
* handle the OpenFlow message provided using the given table map
*/
- void handleMessage(int msgType, ofpbuf *msg, const table_map_t& tableMap);
+ void handleMessage(int msgType,
+ ofpbuf *msg,
+ const table_map_t& tableMap,
+ struct ofputil_flow_removed* fentry=NULL);
/**
* Handle a drop stats message
*/
std::atomic<bool> stopping;
+ /**
+ * Transaction Id set for tracking request/replies
+ */
+ std::unordered_set<uint32_t> txns;
+
+
private:
- void handleFlowStats(ofpbuf *msg, const table_map_t& tableMap);
- void handleFlowRemoved(ofpbuf *msg, const table_map_t& tableMap);
+ bool handleFlowStats(ofpbuf *msg, const table_map_t& tableMap);
};
void UninstallListenersForConnection(SwitchConnection *conn);
/** Interface: MessageHandler */
- void Handle(SwitchConnection *swConn, int type, ofpbuf *msg);
+ void Handle(SwitchConnection *swConn,
+ int type,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL);
/** Interface: OnConnectListener */
void Connected(SwitchConnection *swConn);
/* Interface: MessageListener */
void Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) override;
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL) override;
void updatePolicyStatsCounters(const std::string& l24Classifier,
FlowStats_t& newVals1,
struct vconn;
struct ofpbuf;
class OfpBuf;
+struct ofputil_flow_removed;
namespace opflexagent {
* @param swConn Connection where message was received
* @param msgType Type of the received message
* @param msg The received message
+ * @param fentry decoded flow_removed message
*/
- virtual void Handle(SwitchConnection *swConn, int msgType,
- struct ofpbuf *msg) = 0;
+ virtual void Handle(SwitchConnection *swConn,
+ int msgType,
+ struct ofpbuf *msg,
+ struct ofputil_flow_removed *fentry=NULL) = 0;
};
/**
*/
class SwitchConnection {
public:
+ /**
+ * Parse the received flow_removed message and notify listeners.
+ * This message requires central handling since this is
+ * received asynchronously from the switch. We cannot defer handling
+ * to the listeners, because the first listener will consume this packet.
+ * @param msg the openflow message received
+ * @param fentry structure to hold the parsed flow
+ * @return 0 if decode successful non-zero otherwise
+ */
+ static int DecodeFlowRemoved(ofpbuf *msg,
+ struct ofputil_flow_removed* fentry);
/**
* Construct a new switch connection to the given switch name
* @param swName the name of the OVS bridge to connect to
* Needed to keep the connection to switch alive.
*/
class EchoRequestHandler : public MessageHandler {
- void Handle(SwitchConnection *swConn, int type, struct ofpbuf *msg);
+ void Handle(SwitchConnection *swConn,
+ int type,
+ struct ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL);
};
EchoRequestHandler echoReqHandler;
* Needed to keep the connection to switch alive.
*/
class EchoReplyHandler : public MessageHandler {
- void Handle(SwitchConnection *swConn, int type, struct ofpbuf *msg);
+ void Handle(SwitchConnection *swConn,
+ int type,
+ struct ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL);
};
EchoReplyHandler echoRepHandler;
* @brief Handle errors from the switch by logging.
*/
class ErrorHandler : public MessageHandler {
- void Handle(SwitchConnection *swConn, int type, struct ofpbuf *msg);
+ void Handle(SwitchConnection *swConn,
+ int type,
+ struct ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL);
};
ErrorHandler errorHandler;
/* Interface: MessageListener */
void Handle(SwitchConnection* connection,
- int msgType, ofpbuf *msg) override;
+ int msgType,
+ ofpbuf *msg,
+ struct ofputil_flow_removed* fentry=NULL) override;
void handleTableDropStats(struct ofputil_flow_stats* fentry) override;
class EchoReplyHandler : public MessageHandler {
public:
EchoReplyHandler() : counter(0) {}
- void Handle(SwitchConnection*, int type, ofpbuf*) {
+ void Handle(SwitchConnection*,
+ int type,
+ ofpbuf*,
+ struct ofputil_flow_removed *) {
BOOST_CHECK(type == OFPTYPE_ECHO_REPLY);
++counter;
}
entryList);
LOG(DEBUG) << "1 makeFlowRemovedMessage_2 created";
BOOST_REQUIRE(res_msg!=0);
+ struct ofputil_flow_removed fentry;
+ SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
contractStatsManager.Handle(&integrationPortConn,
- OFPTYPE_FLOW_REMOVED, res_msg);
+ OFPTYPE_FLOW_REMOVED, res_msg,
+ &fentry);
LOG(DEBUG) << "1 makeFlowRemovedMessage_2 handled";
ofpbuf_delete(res_msg);
entryList);
BOOST_REQUIRE(res_msg!=0);
LOG(DEBUG) << "1 makeFlowStatsReplyMessage successful";
+ ofp_header *msgHdr = (ofp_header *)res_msg->data;
+ statsManager->testInjectTxnId(msgHdr->xid);
// send first flow stats reply message
statsManager->Handle(&portConn,
entryList);
BOOST_REQUIRE(res_msg!=0);
LOG(DEBUG) << "2 makeFlowStatReplyMessage successful";
+ msgHdr = (ofp_header *)res_msg->data;
+ statsManager->testInjectTxnId(msgHdr->xid);
// send second flow stats reply message
statsManager->Handle(&portConn,
entryList1);
BOOST_REQUIRE(res_msg!=0);
LOG(DEBUG) << "1 makeFlowRemovedMessage successful";
+ struct ofputil_flow_removed fentry;
+ SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
// send first flow stats reply message
statsManager->Handle(&portConn,
OFPTYPE_FLOW_REMOVED,
- res_msg);
+ res_msg,
+ &fentry);
LOG(DEBUG) << "1 FlowRemovedMessage handling successful";
ofpbuf_delete(res_msg);
entryList2);
BOOST_REQUIRE(res_msg!=0);
LOG(DEBUG) << "2 makeFlowRemovedMessage successful";
+ SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
// send first flow stats reply message
statsManager->Handle(&portConn,
OFPTYPE_FLOW_REMOVED,
- res_msg);
+ res_msg,
+ &fentry);
LOG(DEBUG) << "2 FlowRemovedMessage handling successful";
ofpbuf_delete(res_msg);
AccessFlowManager::SEC_GROUP_IN_TABLE_ID,
entryList);
BOOST_REQUIRE(res_msg!=0);
+ struct ofputil_flow_removed fentry;
+ SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
secGrpStatsManager.Handle(&accPortConn,
- OFPTYPE_FLOW_REMOVED, res_msg);
+ OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
secGrpStatsManager.on_timer(ec);
ofpbuf_delete(res_msg);
BOOST_REQUIRE(res_msg!=0);
secGrpStatsManager.Handle(&accPortConn,
- OFPTYPE_FLOW_REMOVED, res_msg);
+ OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
ofpbuf_delete(res_msg);
res_msg =
makeFlowRemovedMessage_2(&accPortConn,
AccessFlowManager::SEC_GROUP_OUT_TABLE_ID,
entryList1);
BOOST_REQUIRE(res_msg!=0);
+ SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
secGrpStatsManager.Handle(&accPortConn,
- OFPTYPE_FLOW_REMOVED, res_msg);
+ OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
ofpbuf_delete(res_msg);
res_msg =
makeFlowRemovedMessage_2(&accPortConn,
AccessFlowManager::SEC_GROUP_OUT_TABLE_ID,
entryList1);
BOOST_REQUIRE(res_msg!=0);
+ SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
secGrpStatsManager.Handle(&accPortConn,
- OFPTYPE_FLOW_REMOVED, res_msg);
+ OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
ofpbuf_delete(res_msg);
// Call on_timer function to process the stats collected
// and generate Genie objects for stats
entryList);
LOG(DEBUG) << "1 makeFlowStatReplyMessage created";
BOOST_REQUIRE(res_msg!=0);
+ ofp_header *msgHdr = (ofp_header *)res_msg->data;
+ statsManager->testInjectTxnId(msgHdr->xid);
// send first flow stats reply message
statsManager->Handle(&portConn,
(statCount+1),
table_id,
entryList);
+ msgHdr = (ofp_header *)res_msg->data;
+ statsManager->testInjectTxnId(msgHdr->xid);
// send second flow stats reply message
statsManager->Handle(&portConn,
OFPTYPE_FLOW_STATS_REPLY, res_msg);
entryList);
LOG(DEBUG) << "1 makeFlowStatReplyMessage created";
BOOST_REQUIRE(res_msg!=0);
+ ofp_header *msgHdr = (ofp_header *)res_msg->data;
+ statsManager->testInjectTxnId(msgHdr->xid);
// send first flow stats reply message
statsManager->Handle(&portConn,
entryList);
LOG(DEBUG) << "2 makeFlowStatReplyMessage created";
BOOST_REQUIRE(res_msg!=0);
+ msgHdr = (ofp_header *)res_msg->data;
+ statsManager->testInjectTxnId(msgHdr->xid);
// send second flow stats reply message
statsManager->Handle(&portConn,