Provide a way to disable reporting of specific classes of
[opflex.git] / agent-ovs / ovs / FlowReader.cpp
1 /*
2  * Implementation of FlowReader class
3  *
4  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
5  *
6  * This program and the accompanying materials are made available under the
7  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
8  * and is available at http://www.eclipse.org/legal/epl-v10.html
9  */
10
11 #include <opflexagent/logging.h>
12 #include "FlowReader.h"
13 #include "ActionBuilder.h"
14 #include "ovs-shim.h"
15 #include "ovs-ofputil.h"
16
17 #include <openvswitch/list.h>
18 #include <lib/util.h>
19 extern "C" {
20 #include <openvswitch/ofp-msgs.h>
21 }
22
23 typedef std::lock_guard<std::mutex> mutex_guard;
24
25 namespace opflexagent {
26
27 FlowReader::FlowReader() : swConn(NULL) {
28 }
29
30 FlowReader::~FlowReader() {
31 }
32
33 void FlowReader::installListenersForConnection(SwitchConnection *conn) {
34     swConn = conn;
35     conn->RegisterMessageHandler(OFPTYPE_FLOW_STATS_REPLY, this);
36     conn->RegisterMessageHandler(OFPTYPE_GROUP_DESC_STATS_REPLY, this);
37 }
38
39 void FlowReader::uninstallListenersForConnection(SwitchConnection *conn) {
40     conn->UnregisterMessageHandler(OFPTYPE_FLOW_STATS_REPLY, this);
41     conn->RegisterMessageHandler(OFPTYPE_GROUP_DESC_STATS_REPLY, this);
42 }
43
44 void FlowReader::clear() {
45     mutex_guard lock(reqMtx);
46     flowRequests.clear();
47     groupRequests.clear();
48 }
49
50 bool FlowReader::getFlows(uint8_t tableId, const FlowCb& cb) {
51     return getFlows(tableId, NULL, cb);
52 }
53
54 bool FlowReader::getFlows(uint8_t tableId, match* m, const FlowCb& cb) {
55     OfpBuf req(createFlowRequest(tableId, m));
56     return sendRequest<FlowCb, FlowCbMap>(req, cb, flowRequests);
57 }
58
59 bool FlowReader::getGroups(const GroupCb& cb) {
60     OfpBuf req(createGroupRequest());
61     return sendRequest<GroupCb, GroupCbMap>(req, cb, groupRequests);
62 }
63
64 OfpBuf FlowReader::createFlowRequest(uint8_t tableId, match* m) {
65     ofp_version ofVer = (ofp_version)swConn->GetProtocolVersion();
66     ofputil_protocol proto = ofputil_protocol_from_ofp_version(ofVer);
67
68     ofputil_flow_stats_request fsr;
69     fsr.aggregate = false;
70     if (m) {
71         memcpy(&fsr.match, m, sizeof(fsr.match));
72     } else {
73         match_init_catchall(&fsr.match);
74     }
75     fsr.table_id = tableId;
76     fsr.out_port = OFPP_ANY;
77     fsr.out_group = OFPG_ANY;
78     fsr.cookie = fsr.cookie_mask = (uint64_t)0;
79
80     return OfpBuf(ofputil_encode_flow_stats_request(&fsr, proto));
81 }
82
83 OfpBuf FlowReader::createGroupRequest() {
84     return OfpBuf(ofputil_encode_group_desc_request
85                   ((ofp_version)swConn->GetProtocolVersion(), OFPG_ALL));
86 }
87
88 // XXX TODO need a way to time out requests
89 template <typename U, typename V>
90 bool FlowReader::sendRequest(OfpBuf& req, const U& cb, V& reqMap) {
91     ovs_be32 reqXid = ((ofp_header *)req->data)->xid;
92     LOG(DEBUG) << "Sending flow/group read request xid=" << reqXid;
93
94     {
95         mutex_guard lock(reqMtx);
96         reqMap[reqXid] = cb;
97     }
98     int err = swConn->SendMessage(req);
99     if (err != 0) {
100         LOG(ERROR) << "Failed to send flow/group read request: "
101             << ovs_strerror(err);
102         mutex_guard lock(reqMtx);
103         reqMap.erase(reqXid);
104     }
105     return (err == 0);
106 }
107
108 void FlowReader::Handle(SwitchConnection*, int msgType, ofpbuf *msg) {
109     if (msgType == OFPTYPE_FLOW_STATS_REPLY) {
110         handleReply<FlowEntryList, FlowCb, FlowCbMap>(msg, flowRequests);
111     } else if (msgType == OFPTYPE_GROUP_DESC_STATS_REPLY) {
112         handleReply<GroupEdit::EntryList, GroupCb, GroupCbMap>(msg,
113                                                                groupRequests);
114     }
115 }
116
117 template<typename T, typename U, typename V>
118 void FlowReader::handleReply(ofpbuf *msg, V& reqMap) {
119     ofp_header *msgHdr = (ofp_header *)msg->data;
120     ovs_be32 recvXid = msgHdr->xid;
121
122     U cb;
123     {
124         mutex_guard lock(reqMtx);
125         typename V::iterator itr = reqMap.find(recvXid);
126         if (itr == reqMap.end()) {
127             return;
128         }
129         cb = itr->second;
130     }
131
132     T recv;
133     bool replyDone = false;
134
135     decodeReply<T>(msg, recv, replyDone);
136
137     if (replyDone) {
138         mutex_guard lock(reqMtx);
139         reqMap.erase(recvXid);
140     }
141     cb(recv, replyDone);
142 }
143
144 template<>
145 void FlowReader::decodeReply(ofpbuf *msg, FlowEntryList& recvFlows,
146         bool& replyDone) {
147     do {
148         FlowEntryPtr entry(new FlowEntry());
149
150         ofpbuf actsBuf;
151         ofpbuf_init(&actsBuf, 32);
152         int ret = ofputil_decode_flow_stats_reply(entry->entry, msg, false,
153                 &actsBuf);
154         entry->entry->ofpacts = ActionBuilder::getActionsFromBuffer(&actsBuf,
155                 entry->entry->ofpacts_len);
156         ofpbuf_uninit(&actsBuf);
157
158         /* HACK: override the "raw" field so that our comparisons work
159          * properly XXX TODO See if ActionBuilder can construct
160          * actions with proper "raw" type
161          */
162         override_raw_actions(entry->entry->ofpacts, entry->entry->ofpacts_len);
163
164         if (ret != 0) {
165             if (ret == EOF) {
166                 replyDone = !ofpmp_more((ofp_header*)msg->header);
167             } else {
168                 LOG(ERROR) << "Failed to decode flow stats reply: "
169                     << ovs_strerror(ret);
170                 replyDone = true;
171             }
172             break;
173         }
174         recvFlows.push_back(entry);
175         LOG(DEBUG) << "Got flow: " << *entry;
176     } while (true);
177 }
178
179 template<>
180 void FlowReader::decodeReply(ofpbuf *msg, GroupEdit::EntryList& recv,
181         bool& replyDone) {
182     ofp_version ver = (ofp_version)((ofp_header *)msg->data)->version;
183     while (true) {
184         ofputil_group_desc gd;
185         int ret = ofputil_decode_group_desc_reply(&gd, msg, ver);
186         if (ret != 0) {
187             if (ret == EOF) {
188                 replyDone = !ofpmp_more((ofp_header*)msg->header);
189             } else {
190                 LOG(ERROR) << "Failed to decode group desc reply: "
191                     << ovs_strerror(ret);
192                 replyDone = true;
193             }
194             break;
195         }
196
197         GroupEdit::Entry entry(new GroupEdit::GroupMod());
198         entry->mod->group_id = gd.group_id;
199         entry->mod->type = gd.type;
200         ovs_list_move(&entry->mod->buckets, &gd.buckets);
201         recv.push_back(entry);
202
203         LOG(DEBUG) << "Got group: " << entry;
204      }
205 }
206
207 }   // namespace opflexagent
208