Infra fix for drop stats missing 67/88567/7
authorKiran Shastri <shastrinator@gmail.com>
Sat, 21 Mar 2020 06:50:11 +0000 (23:50 -0700)
committerTom Flynn <tom.flynn@gmail.com>
Sun, 22 Mar 2020 19:00:05 +0000 (19:00 +0000)
Any flow stats reply should be consumed by the corrresponding listener
who sent a flow stats request.
Flow stats removed notification is asynchronous and should be delivered to
all stats clients.
Fixed some doxygen warnings.

TODO: Add test cases for DropStats
Change-Id: I2be16c348741d7e37f226919a213ee6a84860c1d
Signed-off-by: Kiran Shastri <shastrinator@gmail.com>
28 files changed:
agent-ovs/ovs/ContractStatsManager.cpp
agent-ovs/ovs/FlowExecutor.cpp
agent-ovs/ovs/FlowReader.cpp
agent-ovs/ovs/InterfaceStatsManager.cpp
agent-ovs/ovs/PacketInHandler.cpp
agent-ovs/ovs/PodSvcStatsManager.cpp
agent-ovs/ovs/PolicyStatsManager.cpp
agent-ovs/ovs/PortMapper.cpp
agent-ovs/ovs/SecGrpStatsManager.cpp
agent-ovs/ovs/SwitchConnection.cpp
agent-ovs/ovs/TableDropStatsManager.cpp
agent-ovs/ovs/include/ContractStatsManager.h
agent-ovs/ovs/include/FlowExecutor.h
agent-ovs/ovs/include/FlowReader.h
agent-ovs/ovs/include/InterfaceStatsManager.h
agent-ovs/ovs/include/PacketInHandler.h
agent-ovs/ovs/include/PacketLogHandler.h
agent-ovs/ovs/include/PodSvcStatsManager.h
agent-ovs/ovs/include/PolicyStatsManager.h
agent-ovs/ovs/include/PortMapper.h
agent-ovs/ovs/include/SecGrpStatsManager.h
agent-ovs/ovs/include/SwitchConnection.h
agent-ovs/ovs/include/TableDropStatsManager.h
agent-ovs/ovs/integration-test/connection_test.cpp
agent-ovs/ovs/test/ContractStatsManager_test.cpp
agent-ovs/ovs/test/PodSvcStatsManager_test.cpp
agent-ovs/ovs/test/SecGrpStatsManager_test.cpp
agent-ovs/ovs/test/include/PolicyStatsManagerFixture.h

index 0be8d2d4c03193a4be07a6d772c381f7b8595804..92685770aaf1e4b6b4e781658f5cdc1fd35daefa 100644 (file)
@@ -303,14 +303,16 @@ void ContractStatsManager::objectUpdated(opflex::modb::class_id_t class_id,
 }
 
 void ContractStatsManager::Handle(SwitchConnection* connection,
-                                  int msgType, ofpbuf *msg) {
+                                  int msgType,
+                                  ofpbuf *msg,
+                                  struct ofputil_flow_removed* fentry) {
     handleMessage(msgType, msg,
                   [this](uint32_t table_id) -> flowCounterState_t* {
                       if (table_id == IntFlowManager::POL_TABLE_ID)
                           return &contractState;
                       else
                           return NULL;
-                  });
+                  }, fentry);
 }
 
 } /* namespace opflexagent */
index 95b0fac2310977d9badb9ae05b03b4d6364f9e42..cc22a5a1aa6569b192ef489b1bac156880767b35 100644 (file)
@@ -246,7 +246,10 @@ FlowExecutor::WaitOnBarrier(OfpBuf& barrReq) {
 }
 
 void
-FlowExecutor::Handle(SwitchConnection *, int msgType, ofpbuf *msg) {
+FlowExecutor::Handle(SwitchConnection *,
+                     int msgType,
+                     ofpbuf *msg,
+                     struct ofputil_flow_removed*) {
     ofp_header *msgHdr = (ofp_header *)msg->data;
     ovs_be32 recvXid = msgHdr->xid;
 
index ad2e7358299808a5f3fadc79dcf3d53530aaa0f4..3adef785a828ada0096e7844dff01b7e514f0f33 100644 (file)
@@ -117,7 +117,10 @@ bool FlowReader::sendRequest(OfpBuf& req, const U& cb, V& reqMap) {
     return (err == 0);
 }
 
-void FlowReader::Handle(SwitchConnection*, int msgType, ofpbuf *msg) {
+void FlowReader::Handle(SwitchConnection*,
+                        int msgType,
+                        ofpbuf *msg,
+                        struct ofputil_flow_removed * fentry) {
     if (msgType == OFPTYPE_FLOW_STATS_REPLY) {
         handleReply<FlowEntryList, FlowCb, FlowCbMap>(msg, flowRequests);
     } else if (msgType == OFPTYPE_GROUP_DESC_STATS_REPLY) {
index 30a24bfb0cd2f98e670ee13950fc0d3b411353c2..4cf922aa71e8473edadedc56b216c71f210ebab7 100644 (file)
@@ -163,7 +163,8 @@ updateEndpointCounters(const std::string& uuid,
 }
 
 void InterfaceStatsManager::Handle(SwitchConnection* connection,
-                                   int msgType, ofpbuf *msg) {
+                                   int msgType, ofpbuf *msg,
+                                   struct ofputil_flow_removed* fentry) {
     assert(msgType == OFPTYPE_PORT_STATS_REPLY);
     if (!connection || !msg)
         return;
index 76b80e7685f351561fd95dc6342449b6cebc48bf..b0cc8daded300f747ae42233f4e1abb1c31b22de 100644 (file)
@@ -896,7 +896,8 @@ static void handleICMPEchoPktIn(bool v4,
  * Dispatch packet-in messages to the appropriate handlers
  */
 void PacketInHandler::Handle(SwitchConnection* conn,
-                             int msgType, ofpbuf *msg) {
+                             int msgType, ofpbuf *msg,
+                             struct ofputil_flow_removed* fentry) {
     assert(msgType == OFPTYPE_PACKET_IN);
 
     const struct ofp_header *oh = (ofp_header *)msg->data;
index f1670203b580b0f38d893c5b2d131ed0a14de0b4..6e9ae4e2af98137933326328d8a183694fb1ca7c 100644 (file)
@@ -282,14 +282,16 @@ operator()(const PodSvcStatsManager::PodSvcFlowMatchKey_t& k) const noexcept {
 }
 
 void PodSvcStatsManager::Handle(SwitchConnection* connection,
-                                  int msgType, ofpbuf *msg) {
+                                int msgType, ofpbuf *msg,
+                                struct ofputil_flow_removed *fentry)
+{
     handleMessage(msgType, msg,
                   [this](uint32_t table_id) -> flowCounterState_t* {
                       if (table_id == IntFlowManager::STATS_TABLE_ID)
                           return &statsState;
                       else
                           return NULL;
-                  });
+                  }, fentry);
 }
 
 } /* namespace opflexagent */
index be9416e695a44e7e5c9bcb57e7e82f4c5b457a89..71530bbb9af7cf40f7f3ff26fe53afa5a6d3f2fd 100644 (file)
@@ -396,29 +396,53 @@ void PolicyStatsManager::updateNewFlowCounters(uint32_t cookie,
     }
 }
 
-void PolicyStatsManager::
-handleMessage(int msgType, ofpbuf *msg, const table_map_t& tableMap) {
+void PolicyStatsManager::handleMessage(int msgType,
+                                       ofpbuf *msg,
+                                       const table_map_t& tableMap,
+                                       struct ofputil_flow_removed *fentry) {
+    bool ret = false;
     if (msg == (ofpbuf *)NULL) {
         LOG(ERROR) << "Unexpected null message";
         return;
     }
     if (msgType == OFPTYPE_FLOW_STATS_REPLY) {
-        handleFlowStats(msg, tableMap);
+        std::lock_guard<std::mutex> lock(pstatMtx);
+        ofp_header *msgHdr = (ofp_header *)msg->data;
+        ovs_be32 recvXid = msgHdr->xid;
+        if(txns.find(recvXid) == txns.end()) {
+            return;
+        }
+        ret = handleFlowStats(msg, tableMap);
+        if(ret) {
+            txns.erase(recvXid);
+        }
     } else if (msgType == OFPTYPE_FLOW_REMOVED) {
-        handleFlowRemoved(msg, tableMap);
+        std::lock_guard<std::mutex> lock(pstatMtx);
+        if(!fentry)
+            return;
+        flowCounterState_t* counterState = tableMap(fentry->table_id);
+        if (!counterState)
+            return;
+        updateNewFlowCounters((uint32_t)ovs_ntohll(fentry->cookie),
+                              fentry->priority,
+                              (fentry->match),
+                              fentry->packet_count,
+                              fentry->byte_count,
+                              *counterState, true);
     } else {
         LOG(ERROR) << "Unexpected message type: " << msgType;
-        return;
     }
-
 }
 
-void PolicyStatsManager::handleFlowStats(ofpbuf *msg, const table_map_t& tableMap) {
+/**
+ * Call this method holding the lock pstatMtx always. Lock has been
+ * moved out of this method to avoid adding more specific locks in the
+ * code path.
+ */
+bool PolicyStatsManager::handleFlowStats(ofpbuf *msg, const table_map_t& tableMap) {
 
     struct ofputil_flow_stats* fentry, fstat;
-
     fentry = &fstat;
-    std::lock_guard<std::mutex> lock(pstatMtx);
 
     do {
         ofpbuf actsBuf;
@@ -433,8 +457,10 @@ void PolicyStatsManager::handleFlowStats(ofpbuf *msg, const table_map_t& tableMa
             if (ret != EOF) {
                 LOG(ERROR) << "Failed to decode flow stats reply: "
                            << ovs_strerror(ret);
+                return true;
+            } else {
+                return !ofpmp_more((ofp_header*)msg->header);
             }
-            break;
         } else {
 
             /**
@@ -473,7 +499,7 @@ void PolicyStatsManager::handleFlowStats(ofpbuf *msg, const table_map_t& tableMa
 
             flowCounterState_t* counterState = tableMap(fentry->table_id);
             if (!counterState)
-                return;
+                return true;
 
             if ((fentry->flags & OFPUTIL_FF_SEND_FLOW_REM) == 0) {
                 // skip those flow entries that don't have flag set
@@ -505,41 +531,6 @@ void PolicyStatsManager::handleFlowStats(ofpbuf *msg, const table_map_t& tableMa
 
 }
 
-void PolicyStatsManager::handleFlowRemoved(ofpbuf *msg, const table_map_t& tableMap) {
-
-    const struct ofp_header *oh = (ofp_header *)msg->data;
-    struct ofputil_flow_removed* fentry, flow_removed;
-
-    fentry = &flow_removed;
-    std::lock_guard<std::mutex> lock(pstatMtx);
-    int ret;
-    bzero(fentry, sizeof(struct ofputil_flow_removed));
-
-    ret = ofputil_decode_flow_removed(fentry, oh);
-    if (ret != 0) {
-        LOG(ERROR) << "Failed to decode flow removed message: "
-                   << ovs_strerror(ret);
-        return;
-    } else {
-        /* ovs 2.11.2 specific changes. Check handleFlowStats for
-         * more comments on below fix */
-        if (fentry) {
-            fentry->match.flow.packet_type = 0;
-            fentry->match.wc.masks.packet_type = 0;
-        }
-
-        flowCounterState_t* counterState = tableMap(fentry->table_id);
-        if (!counterState)
-                return;
-        updateNewFlowCounters((uint32_t)ovs_ntohll(fentry->cookie),
-                              fentry->priority,
-                              (fentry->match),
-                              fentry->packet_count,
-                              fentry->byte_count,
-                              *counterState, true);
-    }
-}
-
 void PolicyStatsManager::sendRequest(uint32_t table_id, uint64_t _cookie,
         uint64_t _cookie_mask) {
     // send port stats request again
@@ -558,6 +549,8 @@ void PolicyStatsManager::sendRequest(uint32_t table_id, uint64_t _cookie,
 
     OfpBuf req(ofputil_encode_flow_stats_request(&fsr, proto));
     ofpmsg_update_length(req.get());
+    ovs_be32 reqXid = ((ofp_header *)req->data)->xid;
+    txns.insert(reqXid);
 
     int err = connection->SendMessage(req);
     if (err != 0) {
index 5d518623efdde0e6cf37e055da68fa20fc39d77a..14ef6d49c6e191e024686aae4a638b3e82554635 100644 (file)
@@ -82,7 +82,10 @@ PortMapper::Connected(SwitchConnection *conn) {
 }
 
 void
-PortMapper::Handle(SwitchConnection*, int msgType, ofpbuf *msg) {
+PortMapper::Handle(SwitchConnection*,
+                   int msgType,
+                   ofpbuf *msg,
+                   struct ofputil_flow_removed*) {
     switch (msgType) {
     case OFPTYPE_PORT_DESC_STATS_REPLY:
         HandlePortDescReply(msg);
index 9abe84465633573378c648f884d791653cf9cd16..23cee48052c55c4a46f47d9fbd80a31442af64a3 100644 (file)
@@ -176,7 +176,9 @@ void SecGrpStatsManager::objectUpdated(opflex::modb::class_id_t class_id,
 }
 
 void SecGrpStatsManager::Handle(SwitchConnection* connection,
-                                int msgType, ofpbuf *msg) {
+                                int msgType,
+                                ofpbuf *msg,
+                                struct ofputil_flow_removed* fentry) {
     handleMessage(msgType, msg,
                   [this](uint32_t table_id) -> flowCounterState_t* {
                       switch (table_id) {
@@ -187,7 +189,7 @@ void SecGrpStatsManager::Handle(SwitchConnection* connection,
                       default:
                           return NULL;
                       }
-                  });
+                  }, fentry);
 }
 
 } /* namespace opflexagent */
index f50c63ac0feee33edbe95b1629dbc1210f7fa749..c1c4d93d85be2b4d0a1c1f51cb84ed1e81ee152c 100644 (file)
@@ -30,6 +30,7 @@ extern "C" {
 #include <openvswitch/vconn.h>
 #include <openvswitch/ofp-msgs.h>
 #include <openvswitch/ofp-packet.h>
+#include <openvswitch/ofp-monitor.h>
 }
 
 typedef std::lock_guard<std::mutex> mutex_guard;
@@ -40,6 +41,28 @@ const std::chrono::seconds MAX_ECHO_INTERVAL(30);
 
 namespace opflexagent {
 
+int SwitchConnection::DecodeFlowRemoved(ofpbuf *msg,
+        struct ofputil_flow_removed* fentry) {
+    const struct ofp_header *oh = (ofp_header *)msg->data;
+    int ret;
+    bzero(fentry, sizeof(struct ofputil_flow_removed));
+
+    ret = ofputil_decode_flow_removed(fentry, oh);
+    if (ret != 0) {
+        LOG(ERROR) << "Failed to decode flow removed message: "
+                   << ovs_strerror(ret);
+        return ret;
+    } else {
+        /* ovs 2.11.2 specific changes. Check handleFlowStats for
+         * more comments on below fix */
+        if (fentry) {
+            fentry->match.flow.packet_type = 0;
+            fentry->match.wc.masks.packet_type = 0;
+        }
+    }
+    return ret;
+}
+
 SwitchConnection::SwitchConnection(const std::string& swName) :
     switchName(swName), ofConn(NULL) {
     connThread = NULL;
@@ -284,10 +307,16 @@ SwitchConnection::receiveOFMessage() {
         } else {
             ofptype type;
             if (!ofptype_decode(&type, (ofp_header *)recvMsg->data)) {
+                struct ofputil_flow_removed flow_removed;
+                if(type == OFPTYPE_FLOW_REMOVED) {
+                    if(DecodeFlowRemoved(recvMsg, &flow_removed) != 0) {
+                        break;
+                    }
+                }
                 HandlerMap::const_iterator itr = msgHandlers.find(type);
                 if (itr != msgHandlers.end()) {
                     for (MessageHandler *h : itr->second) {
-                        h->Handle(this, type, recvMsg);
+                        h->Handle(this, type, recvMsg, &flow_removed);
                     }
                 }
             }
@@ -358,7 +387,8 @@ SwitchConnection::notifyConnectListeners() {
 
 void
 SwitchConnection::EchoRequestHandler::Handle(SwitchConnection *swConn,
-                                             int, ofpbuf *msg) {
+                                             int, ofpbuf *msg,
+                                             struct ofputil_flow_removed*) {
     const ofp_header *rq = (const ofp_header *)msg->data;
     OfpBuf echoReplyMsg(ofputil_encode_echo_reply(rq));
     swConn->SendMessage(echoReplyMsg);
@@ -366,14 +396,16 @@ SwitchConnection::EchoRequestHandler::Handle(SwitchConnection *swConn,
 
 void
 SwitchConnection::EchoReplyHandler::Handle(SwitchConnection *swConn,
-                                    int, ofpbuf *msg) {
+                                    int, ofpbuf *msg,
+                                    struct ofputil_flow_removed*) {
     mutex_guard lock(swConn->connMtx);
     swConn->lastEchoTime = std::chrono::steady_clock::now();
 }
 
 void
 SwitchConnection::ErrorHandler::Handle(SwitchConnection*, int,
-                                       ofpbuf *msg) {
+                                       ofpbuf *msg,
+                                       struct ofputil_flow_removed*) {
     const struct ofp_header *oh = (ofp_header *)msg->data;
     ofperr err = ofperr_decode_msg(oh, NULL);
     LOG(ERROR) << "Got error reply from switch ("
index 2a92accbad30176991efa7491a5cad2168f5b18c..8c61a957d5e2cd8cfa79d39660a6deb1a4a2951f 100644 (file)
@@ -106,6 +106,10 @@ void BaseTableDropStatsManager::start(bool register_listener) {
        prometheusManager.addTableDropGauge(connection->getSwitchName(),
                                             tbl_it.second.first);
 #endif
+       CurrentDropCounterState[tbl_it.first];
+       auto &counter = TableDropCounterState[tbl_it.first];
+       counter.packet_count = boost::make_optional(false, 0);
+       counter.byte_count = boost::make_optional(false, 0);
     }
     PolicyStatsManager::start(register_listener);
     {
@@ -348,14 +352,16 @@ void BaseTableDropStatsManager::objectUpdated(opflex::modb::class_id_t class_id,
 }
 
 void BaseTableDropStatsManager::Handle(SwitchConnection* connection,
-                                  int msgType, ofpbuf *msg) {
+                                  int msgType,
+                                  ofpbuf *msg,
+                                  struct ofputil_flow_removed* fentry) {
     handleMessage(msgType, msg,
-                  [this](uint32_t table_id) -> flowCounterState_t* {
-                      if(tableDescMap.find(table_id)!= tableDescMap.end())
-                          return &CurrentDropCounterState[table_id];
-                      else
-                          return NULL;
-                  });
+        [this](uint32_t table_id) -> flowCounterState_t* {
+            if(tableDescMap.find(table_id)!= tableDescMap.end())
+                return &CurrentDropCounterState[table_id];
+            else
+                return NULL;
+        }, fentry);
 }
 
 } /* namespace opflexagent */
index a3dd45de0633fa56df5d381f3f3fa4b7ad5e81c2..5104dfb886775c6e1e6288a77371f7f3ecf53abf 100644 (file)
@@ -85,7 +85,9 @@ public:
 
     /* Interface: MessageListener */
     void Handle(SwitchConnection* connection,
-                int msgType, ofpbuf *msg) override;
+                int msgType,
+                ofpbuf *msg,
+                struct ofputil_flow_removed* fentry=NULL) override;
 
     void updatePolicyStatsCounters(const std::string& srcEpg,
                                    const std::string& dstEpg,
index ede516e80844a719e677fd1f400559d714feaa10..68f90bbecfc4ce4afd94132f41dc54f2b41fccff 100644 (file)
@@ -106,7 +106,10 @@ public:
     void UninstallListenersForConnection(SwitchConnection *conn);
 
     /** Interface: MessageHandler */
-    void Handle(SwitchConnection *conn, int msgType, ofpbuf *msg);
+    void Handle(SwitchConnection *conn,
+                int msgType,
+                ofpbuf *msg,
+                struct ofputil_flow_removed* fentry=NULL);
 
     /** Interface: OnConnectListener */
     void Connected(SwitchConnection *swConn);
index 5eea62707b17d4f8f341681f06b9428137b182dd..633d7ec85cab6d1b39409ee69254f541896c240e 100644 (file)
@@ -102,7 +102,10 @@ public:
     virtual bool getTlvs(const TlvCb& cb);
 
     /* Interface: MessageHandler */
-    void Handle(SwitchConnection *c, int msgType, ofpbuf *msg);
+    void Handle(SwitchConnection *c,
+                int msgType,
+                ofpbuf *msg,
+                struct ofputil_flow_removed *fentry=NULL);
 
     /**
      * Clear the state of the flow reader
index eca3333649ea865a64149ed8f69007c43a27919d..6dc83e9f83851b73279260cf354b99f6c671a6cd 100644 (file)
@@ -94,7 +94,10 @@ public:
     void stop();
 
     // see: MessageHandler
-    void Handle(SwitchConnection *swConn, int type, ofpbuf *msg);
+    void Handle(SwitchConnection *swConn,
+                int type,
+                ofpbuf *msg,
+                struct ofputil_flow_removed* fentry=NULL);
 
 private:
     Agent* agent;
index 382c65bda6bac9c1fdce493651b4de3c674bec41..ec3fc67b6fc1800fb80b73a9ce345b94324fc53c 100644 (file)
@@ -75,8 +75,10 @@ public:
     // MessageHandler
     // **************
 
-    virtual void Handle(SwitchConnection *swConn, int msgType,
-                        ofpbuf *msg);
+    virtual void Handle(SwitchConnection *swConn,
+                        int msgType,
+                        ofpbuf *msg,
+                        struct ofputil_flow_removed* fentry=NULL);
 
 private:
     Agent& agent;
index ace763d1b099134f294e6196c1355865c2439be5..230b6cb91a5b8d7466de27df938110fcd28a6e21 100644 (file)
@@ -200,6 +200,8 @@ public:
     void parseLog(unsigned char *buf , std::size_t length);
 
 protected:
+    ///@{
+    /** Member names are self-explanatory */
     boost::asio::io_service &server_io;
     boost::asio::io_service &client_io;
     std::unique_ptr<UdpServer> socketListener;
@@ -217,6 +219,7 @@ protected:
     static const unsigned maxOutstandingEvents=30;
     friend UdpServer;
     friend LocalClient;
+    ///@}
 };
 
 } /* namespace opflexagent */
index a0156e9fb415645eb4dd48e8e4f9c8263c953150..5ce815ea973d9b5fc87b7d23b1765388cd112f27 100644 (file)
@@ -67,7 +67,9 @@ public:
      * Interface: MessageListener 
      */
     void Handle(SwitchConnection* connection,
-                int msgType, ofpbuf *msg) override;
+                int msgType,
+                ofpbuf *msg,
+                struct ofputil_flow_removed* fentry=NULL) override;
 
     /** Interface: ObjectListener */
     // Note: This is used to delete observer MOs.
index 3663b6b1ba9a432eb5703924961049785ac5ee0c..19a7a5226e7e18dae8307d73a92620fdfe268036 100644 (file)
@@ -19,6 +19,7 @@
 #include <unordered_map>
 #include <mutex>
 #include <functional>
+#include <unordered_set>
 
 #pragma once
 #ifndef OPFLEXAGENT_POLICYSTATSMANAGER_H
@@ -140,6 +141,13 @@ public:
      */
     static const int MAX_AGE = 9;
 
+    /**
+     * This is a test method. Do not use in production
+     * @param txn_id: transaction id to simulate send
+     */
+    void testInjectTxnId(uint32_t txn_id) {
+        txns.insert(txn_id);
+    }
 protected:
     /**
      * Type used as a key for Policy counter maps
@@ -415,7 +423,10 @@ protected:
     /**
      * handle the OpenFlow message provided using the given table map
      */
-    void handleMessage(int msgType, ofpbuf *msg, const table_map_t& tableMap);
+    void handleMessage(int msgType,
+                       ofpbuf *msg,
+                       const table_map_t& tableMap,
+                       struct ofputil_flow_removed* fentry=NULL);
 
     /**
      * Handle a drop stats message
@@ -474,9 +485,14 @@ protected:
      */
     std::atomic<bool> stopping;
 
+    /**
+     * Transaction Id set for tracking request/replies
+     */
+    std::unordered_set<uint32_t> txns;
+
+
 private:
-    void handleFlowStats(ofpbuf *msg, const table_map_t& tableMap);
-    void handleFlowRemoved(ofpbuf *msg, const table_map_t& tableMap);
+    bool handleFlowStats(ofpbuf *msg, const table_map_t& tableMap);
 
 };
 
index 0a56c646c7998ef0d104e86085209d3bb3e40812..261cb49d5238e1a68a6e7c7e04231ae0b5080a4e 100644 (file)
@@ -88,7 +88,10 @@ public:
     void UninstallListenersForConnection(SwitchConnection *conn);
 
     /** Interface: MessageHandler */
-    void Handle(SwitchConnection *swConn, int type, ofpbuf *msg);
+    void Handle(SwitchConnection *swConn,
+                int type,
+                ofpbuf *msg,
+                struct ofputil_flow_removed* fentry=NULL);
 
     /** Interface: OnConnectListener */
     void Connected(SwitchConnection *swConn);
index 3a8fa8f2710387eb625a696b70a8e8fc3701ff57..41d909cbfd7e8fe49b9246098881b4a4e8ad2105 100644 (file)
@@ -74,7 +74,9 @@ public:
 
     /* Interface: MessageListener */
     void Handle(SwitchConnection* connection,
-                int msgType, ofpbuf *msg) override;
+                int msgType,
+                ofpbuf *msg,
+                struct ofputil_flow_removed* fentry=NULL) override;
 
     void updatePolicyStatsCounters(const std::string& l24Classifier,
                                    FlowStats_t& newVals1,
index 68d26b0e404793d8698a9e6269fa5be55a2b4ebd..fb83afa6e63d0a68c9c16a1cd959e009aafcfd42 100644 (file)
@@ -20,6 +20,7 @@
 struct vconn;
 struct ofpbuf;
 class OfpBuf;
+struct ofputil_flow_removed;
 
 namespace opflexagent {
 
@@ -35,9 +36,12 @@ public:
      * @param swConn Connection where message was received
      * @param msgType Type of the received message
      * @param msg The received message
+     * @param fentry decoded flow_removed message
      */
-    virtual void Handle(SwitchConnection *swConn, int msgType,
-                        struct ofpbuf *msg) = 0;
+    virtual void Handle(SwitchConnection *swConn,
+                        int msgType,
+                        struct ofpbuf *msg,
+                        struct ofputil_flow_removed *fentry=NULL) = 0;
 };
 
 /**
@@ -62,6 +66,17 @@ public:
  */
 class SwitchConnection {
 public:
+    /**
+     * Parse the received flow_removed message and notify listeners.
+     * This message requires central handling since this is
+     * received asynchronously from the switch. We cannot defer handling
+     * to the listeners, because the first listener will consume this packet.
+     * @param msg the openflow message received
+     * @param fentry structure to hold the parsed flow
+     * @return 0 if decode successful non-zero otherwise
+     */
+    static int DecodeFlowRemoved(ofpbuf *msg,
+            struct ofputil_flow_removed* fentry);
     /**
      * Construct a new switch connection to the given switch name
      * @param swName the name of the OVS bridge to connect to
@@ -212,7 +227,10 @@ private:
      * Needed to keep the connection to switch alive.
      */
     class EchoRequestHandler : public MessageHandler {
-        void Handle(SwitchConnection *swConn, int type, struct ofpbuf *msg);
+        void Handle(SwitchConnection *swConn,
+                    int type,
+                    struct ofpbuf *msg,
+                    struct ofputil_flow_removed* fentry=NULL);
     };
 
     EchoRequestHandler echoReqHandler;
@@ -222,7 +240,10 @@ private:
      * Needed to keep the connection to switch alive.
      */
     class EchoReplyHandler : public MessageHandler {
-        void Handle(SwitchConnection *swConn, int type, struct ofpbuf *msg);
+        void Handle(SwitchConnection *swConn,
+                    int type,
+                    struct ofpbuf *msg,
+                    struct ofputil_flow_removed* fentry=NULL);
     };
 
     EchoReplyHandler echoRepHandler;
@@ -231,7 +252,10 @@ private:
      * @brief Handle errors from the switch by logging.
      */
     class ErrorHandler : public MessageHandler {
-        void Handle(SwitchConnection *swConn, int type, struct ofpbuf *msg);
+        void Handle(SwitchConnection *swConn,
+                    int type,
+                    struct ofpbuf *msg,
+                    struct ofputil_flow_removed* fentry=NULL);
     };
 
     ErrorHandler errorHandler;
index 1b2e4fff8da93cd910f40164df1ebe44b5327204..d7c83babfa0ddf6845f0b23d5d0ec06612788b96 100644 (file)
@@ -71,7 +71,9 @@ public:
 
     /* Interface: MessageListener */
     void Handle(SwitchConnection* connection,
-                int msgType, ofpbuf *msg) override;
+                int msgType,
+                ofpbuf *msg,
+                struct ofputil_flow_removed* fentry=NULL) override;
 
     void handleTableDropStats(struct ofputil_flow_stats* fentry) override;
 
index 5a2506d4fd55b2280e17519051a3373805abc1ff..39c09910c03243779f619c91fa53c9da3a688bca 100644 (file)
@@ -24,7 +24,10 @@ using namespace opflexagent;
 class EchoReplyHandler : public MessageHandler {
 public:
     EchoReplyHandler() : counter(0) {}
-    void Handle(SwitchConnection*, int type, ofpbuf*) {
+    void Handle(SwitchConnection*,
+                int type,
+                ofpbuf*,
+                struct ofputil_flow_removed *) {
         BOOST_CHECK(type == OFPTYPE_ECHO_REPLY);
         ++counter;
     }
index 8ed1d173b24f4ab7bc59a330eaa6fc6b51ef5cc8..7942ea6e9565800c22b26c3489c15a4b35c61544 100644 (file)
@@ -232,9 +232,12 @@ BOOST_FIXTURE_TEST_CASE(testFlowRemoved, ContractStatsManagerFixture) {
                                  entryList);
     LOG(DEBUG) << "1 makeFlowRemovedMessage_2 created";
     BOOST_REQUIRE(res_msg!=0);
+    struct ofputil_flow_removed fentry;
+    SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
 
     contractStatsManager.Handle(&integrationPortConn,
-                                OFPTYPE_FLOW_REMOVED, res_msg);
+                                OFPTYPE_FLOW_REMOVED, res_msg,
+                                &fentry);
     LOG(DEBUG) << "1 makeFlowRemovedMessage_2 handled";
     ofpbuf_delete(res_msg);
 
index eae88b61f9fa7b126d16b8d90066fd062fc371bd..0f78ee2bc5600509c25c9d933bb846f84fc8faf6 100644 (file)
@@ -378,6 +378,8 @@ PodSvcStatsManagerFixture::testFlowStats (MockConnection& portConn,
                                    entryList);
     BOOST_REQUIRE(res_msg!=0);
     LOG(DEBUG) << "1 makeFlowStatsReplyMessage successful";
+    ofp_header *msgHdr = (ofp_header *)res_msg->data;
+    statsManager->testInjectTxnId(msgHdr->xid);
 
     // send first flow stats reply message
     statsManager->Handle(&portConn,
@@ -403,6 +405,8 @@ PodSvcStatsManagerFixture::testFlowStats (MockConnection& portConn,
                                      entryList);
     BOOST_REQUIRE(res_msg!=0);
     LOG(DEBUG) << "2 makeFlowStatReplyMessage successful";
+    msgHdr = (ofp_header *)res_msg->data;
+    statsManager->testInjectTxnId(msgHdr->xid);
 
     // send second flow stats reply message
     statsManager->Handle(&portConn,
@@ -486,11 +490,14 @@ PodSvcStatsManagerFixture::testFlowRemoved (MockConnection& portConn,
                                    entryList1);
     BOOST_REQUIRE(res_msg!=0);
     LOG(DEBUG) << "1 makeFlowRemovedMessage successful";
+    struct ofputil_flow_removed fentry;
+    SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
 
     // send first flow stats reply message
     statsManager->Handle(&portConn,
                          OFPTYPE_FLOW_REMOVED,
-                         res_msg);
+                         res_msg,
+                         &fentry);
     LOG(DEBUG) << "1 FlowRemovedMessage handling successful";
     ofpbuf_delete(res_msg);
 
@@ -501,11 +508,13 @@ PodSvcStatsManagerFixture::testFlowRemoved (MockConnection& portConn,
                                    entryList2);
     BOOST_REQUIRE(res_msg!=0);
     LOG(DEBUG) << "2 makeFlowRemovedMessage successful";
+    SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
 
     // send first flow stats reply message
     statsManager->Handle(&portConn,
                          OFPTYPE_FLOW_REMOVED,
-                         res_msg);
+                         res_msg,
+                         &fentry);
     LOG(DEBUG) << "2 FlowRemovedMessage handling successful";
     ofpbuf_delete(res_msg);
 
index 6cbe4f9cc2163f58075601f9781ebb8284ccb822..2cdf6f22a16974fd78525236ea3b282687da865f 100644 (file)
@@ -151,9 +151,11 @@ BOOST_FIXTURE_TEST_CASE(testFlowRemoved, SecGrpStatsManagerFixture) {
                                  AccessFlowManager::SEC_GROUP_IN_TABLE_ID,
                                  entryList);
     BOOST_REQUIRE(res_msg!=0);
+    struct ofputil_flow_removed fentry;
+    SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
 
     secGrpStatsManager.Handle(&accPortConn,
-                              OFPTYPE_FLOW_REMOVED, res_msg);
+                              OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
     secGrpStatsManager.on_timer(ec);
 
     ofpbuf_delete(res_msg);
@@ -164,7 +166,7 @@ BOOST_FIXTURE_TEST_CASE(testFlowRemoved, SecGrpStatsManagerFixture) {
     BOOST_REQUIRE(res_msg!=0);
 
     secGrpStatsManager.Handle(&accPortConn,
-                              OFPTYPE_FLOW_REMOVED, res_msg);
+                              OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
     ofpbuf_delete(res_msg);
     res_msg =
         makeFlowRemovedMessage_2(&accPortConn,
@@ -172,9 +174,10 @@ BOOST_FIXTURE_TEST_CASE(testFlowRemoved, SecGrpStatsManagerFixture) {
                                  AccessFlowManager::SEC_GROUP_OUT_TABLE_ID,
                                  entryList1);
     BOOST_REQUIRE(res_msg!=0);
+    SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
 
     secGrpStatsManager.Handle(&accPortConn,
-                              OFPTYPE_FLOW_REMOVED, res_msg);
+                              OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
     ofpbuf_delete(res_msg);
     res_msg =
         makeFlowRemovedMessage_2(&accPortConn,
@@ -182,9 +185,10 @@ BOOST_FIXTURE_TEST_CASE(testFlowRemoved, SecGrpStatsManagerFixture) {
                                  AccessFlowManager::SEC_GROUP_OUT_TABLE_ID,
                                  entryList1);
     BOOST_REQUIRE(res_msg!=0);
+    SwitchConnection::DecodeFlowRemoved(res_msg, &fentry);
 
     secGrpStatsManager.Handle(&accPortConn,
-                              OFPTYPE_FLOW_REMOVED, res_msg);
+                              OFPTYPE_FLOW_REMOVED, res_msg, &fentry);
     ofpbuf_delete(res_msg);
     // Call on_timer function to process the stats collected
     // and generate Genie objects for stats
index 9e874a336a150d1276bceb574c62ed65f2f62732..f539b791ea68b666fca69abfd915f0c4dd2bd079 100644 (file)
@@ -290,6 +290,8 @@ public:
                                        entryList);
         LOG(DEBUG) << "1 makeFlowStatReplyMessage created";
         BOOST_REQUIRE(res_msg!=0);
+        ofp_header *msgHdr = (ofp_header *)res_msg->data;
+        statsManager->testInjectTxnId(msgHdr->xid);
 
         // send first flow stats reply message
         statsManager->Handle(&portConn,
@@ -306,6 +308,8 @@ public:
                                                  (statCount+1),
                                                  table_id,
                                                  entryList);
+            msgHdr = (ofp_header *)res_msg->data;
+            statsManager->testInjectTxnId(msgHdr->xid);
             // send second flow stats reply message
             statsManager->Handle(&portConn,
                                  OFPTYPE_FLOW_STATS_REPLY, res_msg);
@@ -368,6 +372,8 @@ public:
                                        entryList);
         LOG(DEBUG) << "1 makeFlowStatReplyMessage created";
         BOOST_REQUIRE(res_msg!=0);
+        ofp_header *msgHdr = (ofp_header *)res_msg->data;
+        statsManager->testInjectTxnId(msgHdr->xid);
 
         // send first flow stats reply message
         statsManager->Handle(&portConn,
@@ -382,6 +388,8 @@ public:
                                              entryList);
         LOG(DEBUG) << "2 makeFlowStatReplyMessage created";
         BOOST_REQUIRE(res_msg!=0);
+        msgHdr = (ofp_header *)res_msg->data;
+        statsManager->testInjectTxnId(msgHdr->xid);
 
         // send second flow stats reply message
         statsManager->Handle(&portConn,