2 * Copyright (c) 2013 Plexxi, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.analytics.internal;
11 import java.lang.Short;
12 import java.net.InetAddress;
13 import java.net.UnknownHostException;
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
20 import java.util.Map.Entry;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 import org.opendaylight.affinity.affinity.AffinityGroup;
27 import org.opendaylight.affinity.affinity.AffinityLink;
28 import org.opendaylight.affinity.affinity.IAffinityManager;
29 import org.opendaylight.affinity.analytics.IAnalyticsManager;
30 import org.opendaylight.controller.hosttracker.IfIptoHost;
31 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
32 import org.opendaylight.controller.sal.core.Host;
33 import org.opendaylight.controller.sal.core.Node;
34 import org.opendaylight.controller.sal.core.NodeConnector;
35 import org.opendaylight.controller.sal.flowprogrammer.Flow;
36 import org.opendaylight.controller.sal.match.Match;
37 import org.opendaylight.controller.sal.match.MatchField;
38 import org.opendaylight.controller.sal.match.MatchType;
39 import org.opendaylight.controller.sal.packet.address.EthernetAddress;
40 import org.opendaylight.controller.sal.reader.FlowOnNode;
41 import org.opendaylight.controller.sal.reader.IReadServiceListener;
42 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
43 import org.opendaylight.controller.sal.reader.NodeDescription;
44 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
45 import org.opendaylight.controller.sal.utils.Status;
47 public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager {
49 private static final Logger log = LoggerFactory.getLogger(AnalyticsManager.class);
51 private IAffinityManager affinityManager;
52 private IfIptoHost hostTracker;
54 private Map<MatchField, Host> destinationHostCache;
55 private Map<MatchField, Host> sourceHostCache;
56 private Map<Host, Map<Host, HostStats>> hostsToStats;
58 /* Initialize data structures */
60 this.destinationHostCache = new HashMap<MatchField, Host>();
61 this.sourceHostCache = new HashMap<MatchField, Host>();
62 this.hostsToStats = new HashMap<Host, Map<Host, HostStats>>();
69 log.info("Analytics manager started.!!!");
73 log.info("Analytics manager started.!!!");
79 void setAffinityManager(IAffinityManager a) {
80 this.affinityManager = a;
83 void unsetAffinityManager(IAffinityManager a) {
84 if (this.affinityManager.equals(a))
85 this.affinityManager = null;
88 void setHostTracker(IfIptoHost h) {
92 void unsetHostTracker(IfIptoHost h) {
93 if (this.hostTracker.equals(h))
94 this.hostTracker = null;
97 /* Returns the destination host associated with this flow, if one
98 * exists. Returns null otherwise. */
99 protected Host getDestinationHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
100 Match match = flow.getMatch();
101 MatchField dst = null;
103 // Flow has to have DL_DST field or NW_DST field to proceed
104 if (match.isPresent(MatchType.DL_DST))
105 dst = match.getField(MatchType.DL_DST);
106 else if (match.isPresent(MatchType.NW_DST))
107 dst = match.getField(MatchType.NW_DST);
112 Host cacheHit = this.destinationHostCache.get(dst);
113 if (cacheHit != null)
116 // Find the destination host
118 for (HostNodeConnector h : hosts) {
120 // DL_DST => compare on MAC address strings
121 if (match.isPresent(MatchType.DL_DST)) {
122 String dstMac = MatchType.DL_DST.stringify(dst.getValue());
123 String hostMac = ((EthernetAddress) h.getDataLayerAddress()).getMacAddress();
124 if (dstMac.equals(hostMac)) {
126 this.destinationHostCache.put(dst, dstHost); // Add to cache
131 // NW_DST => compare on IP address (of type InetAddress)
132 else if (match.isPresent(MatchType.NW_DST)) {
133 InetAddress hostIP = h.getNetworkAddress();
134 if (dst.getValue().equals(hostIP)) {
136 this.destinationHostCache.put(dst, dstHost); // Add to cache
145 /* Returns the source Host associated with this flow, if one
146 * exists. Returns null otherwise. */
147 protected Host getSourceHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
150 Match match = flow.getMatch();
152 // Flow must have IN_PORT field (DL_SRC rarely (never?)
154 if (match.isPresent(MatchType.IN_PORT)) {
155 MatchField inPort = match.getField(MatchType.IN_PORT);
158 Host cacheHit = this.sourceHostCache.get(inPort);
159 if (cacheHit != null)
162 // Find the source host by comparing the NodeConnectors
163 NodeConnector inPortNc = (NodeConnector) inPort.getValue();
164 for (HostNodeConnector h : hosts) {
165 NodeConnector hostNc = h.getnodeConnector();
166 if (hostNc.equals(inPortNc)) {
168 this.sourceHostCache.put(inPort, h); // Add to cache
176 /* Return all protocols used between two sets */
177 protected Set<Byte> getProtocols(Set<Host> srcSet, Set<Host> dstSet) {
178 Set<Byte> protocols = new HashSet<Byte>();
179 for (Host src : srcSet) {
180 for (Host dst : dstSet) {
181 if (this.hostsToStats.get(src) != null &&
182 this.hostsToStats.get(src).get(dst) != null) {
183 protocols.addAll(this.hostsToStats.get(src).get(dst).getProtocols());
190 /* Return the number of bytes transferred between two sets,
191 * per-protocol (across all protocols if protocol is null).*/
192 protected long getByteCount(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
194 for (Host src : srcSet) {
195 for (Host dst : dstSet) {
196 if (this.hostsToStats.get(src) != null &&
197 this.hostsToStats.get(src).get(dst) != null) {
198 if (protocol == null)
199 byteCount += this.hostsToStats.get(src).get(dst).getByteCount();
201 byteCount += this.hostsToStats.get(src).get(dst).getByteCount(protocol);
208 /* Returns a map of protocol -> byte counts between two sets */
209 protected Map<Byte, Long> getAllByteCounts(Set<Host> srcSet, Set<Host> dstSet) {
210 Map<Byte, Long> byteCounts = new HashMap<Byte, Long>();
211 Set<Byte> protocols = getProtocols(srcSet, dstSet);
212 for (Byte protocol : protocols)
213 byteCounts.put(protocol, getByteCount(srcSet, dstSet, protocol));
217 /* Returns the packet count between two sets, per-protocol (across
218 * all protocols if protocol is null). */
219 protected long getPacketCount(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
220 long packetCount = 0;
221 for (Host src : srcSet) {
222 for (Host dst : dstSet) {
223 if (this.hostsToStats.get(src) != null &&
224 this.hostsToStats.get(src).get(dst) != null) {
225 if (protocol == null)
226 packetCount += this.hostsToStats.get(src).get(dst).getPacketCount();
228 packetCount += this.hostsToStats.get(src).get(dst).getPacketCount(protocol);
235 /* Returns a map of protocol -> packet counts between two sets */
236 protected Map<Byte, Long> getAllPacketCounts(Set<Host> srcSet, Set<Host> dstSet) {
237 Map<Byte, Long> packetCounts = new HashMap<Byte, Long>();
238 Set<Byte> protocols = getProtocols(srcSet, dstSet);
239 for (Byte protocol : protocols)
240 packetCounts.put(protocol, getPacketCount(srcSet, dstSet, protocol));
244 /* Returns the duration of communication between two sets (max
245 * duration of all flows) for a particular protocol (across all
246 * protocols if protocol is null) */
247 protected double getDuration(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
248 double maxDuration = 0.0;
249 for (Host src : srcSet) {
250 if (this.hostsToStats.get(src) == null)
252 for (Host dst : dstSet) {
254 if (this.hostsToStats.get(src).get(dst) == null)
256 if (protocol == null)
257 duration = this.hostsToStats.get(src).get(dst).getDuration();
259 duration = this.hostsToStats.get(src).get(dst).getDuration(protocol);
260 if (duration > maxDuration)
261 maxDuration = duration;
267 /* Returns a map of protocol -> (max) duration over that protocol. */
268 protected Map<Byte, Double> getAllDurations(Set<Host> srcSet, Set<Host> dstSet) {
269 Map<Byte, Double> durations = new HashMap<Byte, Double>();
270 Set<Byte> protocols = getProtocols(srcSet, dstSet);
271 for (Byte protocol : protocols)
272 durations.put(protocol, getDuration(srcSet, dstSet, protocol));
276 /* Returns the bit rate between two sets */
277 protected double getBitRate(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
278 double duration = getDuration(srcSet, dstSet, protocol);
279 long totalBytes = getByteCount(srcSet, dstSet, protocol);
282 return (totalBytes * 8.0) / duration;
285 /* Returns all bit rates between two sets */
286 protected Map<Byte, Double> getAllBitRates(Set<Host> srcSet, Set<Host> dstSet) {
287 Map<Byte, Double> bitRates = new HashMap<Byte, Double>();
288 Set<Byte> protocols = getProtocols(srcSet, dstSet);
289 for (Byte protocol : protocols)
290 bitRates.put(protocol, getBitRate(srcSet, dstSet, protocol));
294 /* Because the generic stats API relies on having two Set<Host>
295 * arguments, the next series of functions converts various
296 * Objects into Set<Host>. */
298 /* Host -> Set<Host> */
299 private Set<Host> getSet(Host h) {
300 return new HashSet<Host>(Arrays.asList(h));
303 /* AffinityLink -> Set of source Hosts */
304 private Set<Host> getSrcSet(AffinityLink al) {
305 return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getToGroup()));
308 /* AffinityLink -> Set of destination Hosts */
309 private Set<Host> getDstSet(AffinityLink al) {
310 return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getToGroup()));
313 /* srcSubnet, dstSubnet -> Set of Hosts in srcSubnet. If
314 * srcSubnet is null, will return all hosts *not* in dstSubnet. */
315 private Set<Host> getSrcSet(String srcSubnet, String dstSubnet) {
316 if (srcSubnet == null && dstSubnet == null) {
317 log.debug("Source and destination subnets cannot both be null.");
318 return new HashSet<Host>();
320 if (srcSubnet == null)
321 return getHostsNotInSubnet(dstSubnet);
323 return getHostsInSubnet(srcSubnet);
326 /* srcSubnet, dstSubnet -> Set of Hosts in dstSubnet. This has
327 * the same logic as the previous method, so just flip the
329 private Set<Host> getDstSet(String srcSubnet, String dstSubnet) {
330 return getSrcSet(dstSubnet, srcSubnet);
333 /* Basic getters for host pair statistics */
334 public long getByteCount(Host src, Host dst) { return getByteCount(src, dst, null); }
335 public long getByteCount(Host src, Host dst, Byte protocol) { return getByteCount(getSet(src), getSet(dst), protocol); }
336 public Map<Byte, Long> getAllByteCounts(Host src, Host dst) { return getAllByteCounts(getSet(src), getSet(dst)); }
337 public long getPacketCount(Host src, Host dst) { return getPacketCount(src, dst, null); }
338 public long getPacketCount(Host src, Host dst, Byte protocol) { return getPacketCount(getSet(src), getSet(dst), protocol); }
339 public Map<Byte, Long> getAllPacketCounts(Host src, Host dst) { return getAllPacketCounts(getSet(src), getSet(dst)); }
340 public double getDuration(Host src, Host dst) { return getDuration(src, dst, null); }
341 public double getDuration(Host src, Host dst, Byte protocol) { return getDuration(getSet(src), getSet(dst), protocol); }
342 public Map<Byte, Double> getAllDurations(Host src, Host dst) { return getAllDurations(getSet(src), getSet(dst)); }
343 public double getBitRate(Host src, Host dst) { return getBitRate(src, dst, null); }
344 public double getBitRate(Host src, Host dst, Byte protocol) { return getBitRate(getSet(src), getSet(dst), protocol); }
345 public Map<Byte, Double> getAllBitRates(Host src, Host dst) { return getAllBitRates(getSet(src), getSet(dst)); }
347 /* Basic getters for affinity link statistics */
348 public long getByteCount(AffinityLink al) { return getByteCount(al, null); }
349 public long getByteCount(AffinityLink al, Byte protocol) { return getByteCount(getSrcSet(al), getDstSet(al), protocol); }
350 public Map<Byte, Long> getAllByteCounts(AffinityLink al) { return getAllByteCounts(getSrcSet(al), getDstSet(al)); }
351 public long getPacketCount(AffinityLink al) { return getPacketCount(al, null); }
352 public long getPacketCount(AffinityLink al, Byte protocol) { return getPacketCount(getSrcSet(al), getDstSet(al), protocol); }
353 public Map<Byte, Long> getAllPacketCounts(AffinityLink al) { return getAllPacketCounts(getSrcSet(al), getDstSet(al)); }
354 public double getDuration(AffinityLink al) { return getDuration(al, null); }
355 public double getDuration(AffinityLink al, Byte protocol) { return getDuration(getSrcSet(al), getDstSet(al), protocol); }
356 public Map<Byte, Double> getAllDurations(AffinityLink al) { return getAllDurations(getSrcSet(al), getDstSet(al)); }
357 public double getBitRate(AffinityLink al) { return getBitRate(al, null); }
358 public double getBitRate(AffinityLink al, Byte protocol) { return getBitRate(getSrcSet(al), getDstSet(al), protocol); }
359 public Map<Byte, Double> getAllBitRates(AffinityLink al) { return getAllBitRates(getSrcSet(al), getDstSet(al)); }
361 /* Basic getters for subnet statistics */
362 public long getByteCount(String srcSub, String dstSub) { return getByteCount(srcSub, dstSub, null); }
363 public long getByteCount(String srcSub, String dstSub, Byte protocol) { return getByteCount(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
364 public Map<Byte, Long> getAllByteCounts(String srcSub, String dstSub) { return getAllByteCounts(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
365 public long getPacketCount(String srcSub, String dstSub) { return getPacketCount(srcSub, dstSub, null); }
366 public long getPacketCount(String srcSub, String dstSub, Byte protocol) { return getPacketCount(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
367 public Map<Byte, Long> getAllPacketCounts(String srcSub, String dstSub) { return getAllPacketCounts(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
368 public double getDuration(String srcSub, String dstSub) { return getDuration(srcSub, dstSub, null); }
369 public double getDuration(String srcSub, String dstSub, Byte protocol) { return getDuration(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
370 public Map<Byte, Double> getAllDurations(String srcSub, String dstSub) { return getAllDurations(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
371 public double getBitRate(String srcSub, String dstSub) { return getBitRate(srcSub, dstSub, null); }
372 public double getBitRate(String srcSub, String dstSub, Byte protocol) { return getBitRate(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
373 public Map<Byte, Double> getAllBitRates(String srcSub, String dstSub) { return getAllBitRates(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
375 /* Returns all hosts that transferred data into this subnet. */
376 public Map<Host, Long> getIncomingHostByteCounts(String subnet, Byte protocol, Set<HostNodeConnector> allHosts) {
377 Map<Host, Long> hosts = new HashMap<Host, Long>();
378 Set<Host> dstHosts = getHostsInSubnet(subnet, allHosts);
379 Set<Host> otherHosts = getHostsNotInSubnet(subnet, allHosts);
380 for (Host host : otherHosts) {
381 for (Host targetHost : dstHosts) {
382 Long byteCount = getByteCount(host, targetHost, protocol);
384 hosts.put(host, byteCount);
390 public Map<Host, Long> getIncomingHostByteCounts(String subnet) {
391 return getIncomingHostByteCounts(subnet, null);
394 public Map<Host, Long> getIncomingHostByteCounts(String subnet, Byte protocol) {
395 Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
396 return getIncomingHostByteCounts(subnet, protocol, allHosts);
399 /* Hosts in allHosts that are not part of the subnet. This is
400 * useful when we need statistics about, e.g., all data into a
401 * particular subnet (so data from hosts outside of that
403 protected Set<Host> getHostsNotInSubnet(String subnet, Set<HostNodeConnector> allHosts) {
404 Set<Host> hostsInSubnet = getHostsInSubnet(subnet, allHosts);
405 Set<HostNodeConnector> otherHosts = new HashSet<HostNodeConnector>(allHosts); // copy constructor
406 otherHosts.removeAll(hostsInSubnet);
407 Set<Host> hostsNotInSubnet = new HashSet<Host>();
408 for (Host h : otherHosts)
409 hostsNotInSubnet.add(h);
410 return hostsNotInSubnet;
413 private Set<Host> getHostsNotInSubnet(String subnet) {
414 return getHostsNotInSubnet(subnet, this.hostTracker.getAllHosts());
417 /* Returns the set of hosts that are part of this subnet. */
418 protected Set<Host> getHostsInSubnet(String subnet, Set<HostNodeConnector> allHosts) {
421 Set<Host> hosts = new HashSet<Host>();
423 // Split 1.2.3.4/5 format into the subnet (1.2.3.4) and the mask (5)
425 String[] splitSubnet = subnet.split("/");
426 ip = InetAddress.getByName(splitSubnet[0]);
427 mask = (splitSubnet.length == 2) ? Short.valueOf(splitSubnet[1]) : 32;
428 } catch (UnknownHostException e) {
429 log.debug("Incorrect subnet/mask format: " + subnet);
434 InetAddress targetSubnet = getSubnet(ip, mask);
435 for (HostNodeConnector host : allHosts) {
436 InetAddress hostSubnet = getSubnet(host.getNetworkAddress(), mask);
437 if (hostSubnet.equals(targetSubnet))
443 private Set<Host> getHostsInSubnet(String subnet) {
444 return getHostsInSubnet(subnet, this.hostTracker.getAllHosts());
447 /* Get the subnet associated with this IP and mask. */
448 private InetAddress getSubnet(InetAddress ip, Short mask) {
449 byte[] prefix = ip.getAddress();
450 InetAddress newIP = null;
452 int bits = (32 - mask) % 8;
453 int bytes = 4 - ((int) mask / 8);
456 // zero out the bytes
457 for (int i = 1; i <= bytes; i++)
458 prefix[prefix.length - i] = 0x0;
461 prefix[prefix.length - bytes - 1] &= (0xFF << bits);
462 newIP = InetAddress.getByAddress(prefix);
463 } catch (UnknownHostException e) {
469 public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
470 nodeFlowStatisticsUpdated(node, flowStatsList, this.hostTracker.getAllHosts());
473 protected void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList, Set<HostNodeConnector> allHosts) {
474 for (FlowOnNode f : flowStatsList) {
475 Host srcHost = getSourceHostFromFlow(f.getFlow(), allHosts);
476 Host dstHost = getDestinationHostFromFlow(f.getFlow(), allHosts);
478 // Source host being null is okay; it indicates that the
479 // source of this particular flow is a switch, not a host.
481 // TODO: It would be useful, at least for debugging
482 // output, to differentiate between when the source is a
483 // switch and when it's a host that the hosttracker
484 // doesn't know about. The latter would be an error.
485 if (dstHost == null) {
486 log.debug("Error: Destination host is null for Flow " + f.getFlow());
489 else if (srcHost == null) {
490 log.debug("Source host is null for Flow " + f.getFlow() + ". This is NOT necessarily an error.");
494 if (this.hostsToStats.get(srcHost) == null)
495 this.hostsToStats.put(srcHost, new HashMap<Host, HostStats>());
496 if (this.hostsToStats.get(srcHost).get(dstHost) == null)
497 this.hostsToStats.get(srcHost).put(dstHost, new HostStats());
498 this.hostsToStats.get(srcHost).get(dstHost).setStatsFromFlow(f);
503 public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
504 // Not interested in this update
508 public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
509 // Not interested in this update
513 public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
514 // Not interested in this update