2 * Implementation of FlowReader class
4 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
6 * This program and the accompanying materials are made available under the
7 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
8 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 #include <opflexagent/logging.h>
12 #include "FlowReader.h"
13 #include "ActionBuilder.h"
15 #include "ovs-ofputil.h"
17 #include <openvswitch/list.h>
20 #include <openvswitch/ofp-msgs.h>
// Scoped lock alias; in this file it is always constructed on reqMtx to guard
// the pending-request maps.
23 typedef std::lock_guard<std::mutex> mutex_guard;
25 namespace opflexagent {
// Constructs a reader that is not yet attached to any switch connection
// (swConn starts out NULL).
27 FlowReader::FlowReader() : swConn(NULL) {
// Destructor. No cleanup statements are visible in this listing.
30 FlowReader::~FlowReader() {
// Registers this reader as the message handler for flow-stats replies and
// group-description replies on the given connection.
//
// conn: connection to register with; it is dereferenced without a null check.
// NOTE(review): one line of this function is not shown in this listing;
// presumably it stores conn in swConn -- confirm against the full file.
33 void FlowReader::installListenersForConnection(SwitchConnection *conn) {
35 conn->RegisterMessageHandler(OFPTYPE_FLOW_STATS_REPLY, this);
36 conn->RegisterMessageHandler(OFPTYPE_GROUP_DESC_STATS_REPLY, this);
// Removes this reader as the message handler for flow-stats replies and
// group-description replies on the given connection; the counterpart of
// installListenersForConnection().
//
// conn: connection to unregister from; it is dereferenced without a null check.
39 void FlowReader::uninstallListenersForConnection(SwitchConnection *conn) {
40 conn->UnregisterMessageHandler(OFPTYPE_FLOW_STATS_REPLY, this);
// Fixed: this previously called RegisterMessageHandler, which left (and
// re-added) the group-description handler on a connection being torn down.
41 conn->UnregisterMessageHandler(OFPTYPE_GROUP_DESC_STATS_REPLY, this);
// Drops pending request callbacks while holding reqMtx.
// NOTE(review): only the groupRequests clear is visible in this listing; one
// line between the lock and it is not shown (presumably flowRequests) -- confirm.
44 void FlowReader::clear() {
45 mutex_guard lock(reqMtx);
47 groupRequests.clear();
// Convenience overload: requests the flows of a table without supplying a
// match, by forwarding to the three-argument overload with a NULL match.
//
// tableId: table to read.
// cb:      callback to associate with the request.
// Returns whatever the three-argument getFlows() returns.
50 bool FlowReader::getFlows(uint8_t tableId, const FlowCb& cb) {
51 return getFlows(tableId, NULL, cb);
// Builds a flow-stats request for the given table (optionally restricted by
// a match) and sends it, tracking the callback in flowRequests.
//
// tableId: table to read.
// m:       match to apply to the request; may be NULL (the two-argument
//          overload passes NULL).
// cb:      callback to associate with the request.
// Returns the result of sendRequest().
54 bool FlowReader::getFlows(uint8_t tableId, match* m, const FlowCb& cb) {
55 OfpBuf req(createFlowRequest(tableId, m));
56 return sendRequest<FlowCb, FlowCbMap>(req, cb, flowRequests);
// Builds a group-description request and sends it, tracking the callback in
// groupRequests.
//
// cb: callback to associate with the request.
// Returns the result of sendRequest().
59 bool FlowReader::getGroups(const GroupCb& cb) {
60 OfpBuf req(createGroupRequest());
61 return sendRequest<GroupCb, GroupCbMap>(req, cb, groupRequests);
// Encodes a non-aggregate flow-stats request for one table.
//
// tableId: table whose flows are requested.
// m:       match copied into the request when used.
// Returns an OfpBuf wrapping the encoded request.
// Requires swConn to be set: it is dereferenced without a null check.
64 OfpBuf FlowReader::createFlowRequest(uint8_t tableId, match* m) {
// Derive the encoding protocol from the connection's OpenFlow version.
65 ofp_version ofVer = (ofp_version)swConn->GetProtocolVersion();
66 ofputil_protocol proto = ofputil_protocol_from_ofp_version(ofVer);
68 ofputil_flow_stats_request fsr;
69 fsr.aggregate = false;
// NOTE(review): the condition selecting between the next two statements is
// not shown in this listing -- presumably the caller's match is copied when
// m is non-NULL and a catch-all match is used otherwise; confirm.
71 memcpy(&fsr.match, m, sizeof(fsr.match));
73 match_init_catchall(&fsr.match);
// No restriction on output port or group, and no cookie filtering.
75 fsr.table_id = tableId;
76 fsr.out_port = OFPP_ANY;
77 fsr.out_group = OFPG_ANY;
78 fsr.cookie = fsr.cookie_mask = (uint64_t)0;
80 return OfpBuf(ofputil_encode_flow_stats_request(&fsr, proto));
// Encodes a group-description request for OFPG_ALL using the connection's
// OpenFlow version.
//
// Returns an OfpBuf wrapping the encoded request.
// Requires swConn to be set: it is dereferenced without a null check.
83 OfpBuf FlowReader::createGroupRequest() {
84 return OfpBuf(ofputil_encode_group_desc_request
85 ((ofp_version)swConn->GetProtocolVersion(), OFPG_ALL));
88 // XXX TODO need a way to time out requests
// Sends an already-encoded request over swConn and tracks it by transaction id.
//
// U:      callback type (FlowCb or GroupCb at the call sites in this file).
// V:      request map type (FlowCbMap or GroupCbMap at the call sites).
// req:    encoded request; its OpenFlow header supplies the xid.
// cb:     callback for the reply.
// reqMap: map of pending requests, accessed under reqMtx.
// NOTE(review): several lines are not shown in this listing (what is done
// under the first lock, the error check on err, the scoping braces and the
// return statements). Presumably cb is stored in reqMap under reqXid before
// sending and the function returns false on send failure -- confirm.
89 template <typename U, typename V>
90 bool FlowReader::sendRequest(OfpBuf& req, const U& cb, V& reqMap) {
// The xid is read straight from the header as an ovs_be32 and used as the key.
91 ovs_be32 reqXid = ((ofp_header *)req->data)->xid;
92 LOG(DEBUG) << "Sending flow/group read request xid=" << reqXid;
95 mutex_guard lock(reqMtx);
98 int err = swConn->SendMessage(req);
// Failure handling: log the error and drop the pending entry for this xid.
100 LOG(ERROR) << "Failed to send flow/group read request: "
101 << ovs_strerror(err);
102 mutex_guard lock(reqMtx);
103 reqMap.erase(reqXid);
// Message callback: routes a reply to the handleReply instantiation that
// matches its message type. The connection argument is unnamed and unused in
// the visible code.
//
// msgType: decoded OpenFlow message type of msg.
// msg:     the received message.
108 void FlowReader::Handle(SwitchConnection*, int msgType, ofpbuf *msg) {
109 if (msgType == OFPTYPE_FLOW_STATS_REPLY) {
// Flow-stats replies are matched against the pending flow requests.
110 handleReply<FlowEntryList, FlowCb, FlowCbMap>(msg, flowRequests);
111 } else if (msgType == OFPTYPE_GROUP_DESC_STATS_REPLY) {
// NOTE(review): the rest of this call's argument list is on a line not shown
// in this listing -- presumably groupRequests; confirm.
112 handleReply<GroupEdit::EntryList, GroupCb, GroupCbMap>(msg,
// Common reply processing: finds the pending request for the reply's xid,
// decodes the reply body, and removes the pending entry.
//
// T:      list type filled by decodeReply (FlowEntryList or
//         GroupEdit::EntryList at the call sites in this file).
// U:      callback type (FlowCb or GroupCb at the call sites).
// V:      request map type (FlowCbMap or GroupCbMap at the call sites).
// msg:    received reply.
// reqMap: map of pending requests, accessed under reqMtx.
// NOTE(review): several lines are not shown in this listing (the body of the
// not-found branch, the declaration of recv, any callback invocation and the
// condition guarding the erase). Presumably unknown xids are ignored and the
// entry is erased only once replyDone is true -- confirm.
117 template<typename T, typename U, typename V>
118 void FlowReader::handleReply(ofpbuf *msg, V& reqMap) {
// The reply's xid is the lookup key into reqMap.
119 ofp_header *msgHdr = (ofp_header *)msg->data;
120 ovs_be32 recvXid = msgHdr->xid;
124 mutex_guard lock(reqMtx);
125 typename V::iterator itr = reqMap.find(recvXid);
126 if (itr == reqMap.end()) {
// decodeReply fills recv and sets replyDone through its reference argument.
133 bool replyDone = false;
135 decodeReply<T>(msg, recv, replyDone);
138 mutex_guard lock(reqMtx);
139 reqMap.erase(recvXid);
// Decodes flow-stats reply content from msg into FlowEntry objects appended
// to recvFlows, and reports through replyDone whether more reply parts follow.
//
// msg:       received flow-stats reply.
// recvFlows: list that decoded flows are appended to.
// NOTE(review): the end of the signature and the control flow around these
// statements (loop, checks on ret) are not shown in this listing; the
// comments below describe only the visible statements.
//
// Visible steps: allocate an entry; decode one flow-stats record using a
// 32-byte-initial scratch buffer for the actions; hand the decoded actions
// to the entry via ActionBuilder::getActionsFromBuffer and release the
// scratch buffer; normalize the actions' "raw" field; set replyDone from
// the negation of ofpmp_more() on the message header; log decode failures
// with ovs_strerror(ret); append the entry to recvFlows and log it.
145 void FlowReader::decodeReply(ofpbuf *msg, FlowEntryList& recvFlows,
148 FlowEntryPtr entry(new FlowEntry());
151 ofpbuf_init(&actsBuf, 32);
152 int ret = ofputil_decode_flow_stats_reply(entry->entry, msg, false,
154 entry->entry->ofpacts = ActionBuilder::getActionsFromBuffer(&actsBuf,
155 entry->entry->ofpacts_len);
156 ofpbuf_uninit(&actsBuf);
158 /* HACK: override the "raw" field so that our comparisons work
159 * properly XXX TODO See if ActionBuilder can construct
160 * actions with proper "raw" type
162 override_raw_actions(entry->entry->ofpacts, entry->entry->ofpacts_len);
166 replyDone = !ofpmp_more((ofp_header*)msg->header);
168 LOG(ERROR) << "Failed to decode flow stats reply: "
169 << ovs_strerror(ret);
174 recvFlows.push_back(entry);
175 LOG(DEBUG) << "Got flow: " << *entry;
// Decodes group-description reply content from msg into GroupEdit entries
// appended to recv, and reports through replyDone whether more reply parts
// follow.
//
// msg:  received group-description reply.
// recv: list that decoded group entries are appended to.
// NOTE(review): the end of the signature and the control flow around these
// statements (loop, checks on ret) are not shown in this listing; the
// comments below describe only the visible statements.
180 void FlowReader::decodeReply(ofpbuf *msg, GroupEdit::EntryList& recv,
// The OpenFlow version is taken from the reply's own header.
182 ofp_version ver = (ofp_version)((ofp_header *)msg->data)->version;
184 ofputil_group_desc gd;
185 int ret = ofputil_decode_group_desc_reply(&gd, msg, ver);
// replyDone is the negation of ofpmp_more() on the message header.
188 replyDone = !ofpmp_more((ofp_header*)msg->header);
190 LOG(ERROR) << "Failed to decode group desc reply: "
191 << ovs_strerror(ret);
// Build a GroupMod-backed entry from the decoded description: copy id and
// type, and move the bucket list out of gd rather than copying it.
197 GroupEdit::Entry entry(new GroupEdit::GroupMod());
198 entry->mod->group_id = gd.group_id;
199 entry->mod->type = gd.type;
200 ovs_list_move(&entry->mod->buckets, &gd.buckets);
201 recv.push_back(entry);
203 LOG(DEBUG) << "Got group: " << entry;
207 } // namespace opflexagent