2 * Copyright (c) 2013 Plexxi, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.analytics.internal;
11 import java.lang.Short;
12 import java.net.InetAddress;
13 import java.net.UnknownHostException;
14 import java.util.ArrayList;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
19 import java.util.Map.Entry;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 import org.opendaylight.affinity.affinity.AffinityGroup;
26 import org.opendaylight.affinity.affinity.AffinityLink;
27 import org.opendaylight.affinity.affinity.IAffinityManager;
28 import org.opendaylight.affinity.analytics.IAnalyticsManager;
29 import org.opendaylight.controller.hosttracker.IfIptoHost;
30 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
31 import org.opendaylight.controller.sal.core.Host;
32 import org.opendaylight.controller.sal.core.Node;
33 import org.opendaylight.controller.sal.core.NodeConnector;
34 import org.opendaylight.controller.sal.flowprogrammer.Flow;
35 import org.opendaylight.controller.sal.match.Match;
36 import org.opendaylight.controller.sal.match.MatchField;
37 import org.opendaylight.controller.sal.match.MatchType;
38 import org.opendaylight.controller.sal.packet.address.EthernetAddress;
39 import org.opendaylight.controller.sal.reader.FlowOnNode;
40 import org.opendaylight.controller.sal.reader.IReadServiceListener;
41 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
42 import org.opendaylight.controller.sal.reader.NodeDescription;
43 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
44 import org.opendaylight.controller.sal.utils.Status;
46 public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager {
48 private static final Logger log = LoggerFactory.getLogger(AnalyticsManager.class);
50 private IAffinityManager affinityManager;
51 private IfIptoHost hostTracker;
53 private Map<MatchField, Host> destinationHostCache;
54 private Map<MatchField, Host> sourceHostCache;
55 private Map<Host, Map<Host, HostStats>> hostsToStats;
57 /* Initialize data structures */
59 this.destinationHostCache = new HashMap<MatchField, Host>();
60 this.sourceHostCache = new HashMap<MatchField, Host>();
61 this.hostsToStats = new HashMap<Host, Map<Host, HostStats>>();
76 void setAffinityManager(IAffinityManager a) {
77 this.affinityManager = a;
80 void unsetAffinityManager(IAffinityManager a) {
81 if (this.affinityManager.equals(a))
82 this.affinityManager = null;
85 void setHostTracker(IfIptoHost h) {
89 void unsetHostTracker(IfIptoHost h) {
90 if (this.hostTracker.equals(h))
91 this.hostTracker = null;
94 /* Returns the destination host associated with this flow, if one
95 * exists. Returns null otherwise. */
96 protected Host getDestinationHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
97 Match match = flow.getMatch();
98 MatchField dst = null;
100 // Flow has to have DL_DST field or NW_DST field to proceed
101 if (match.isPresent(MatchType.DL_DST))
102 dst = match.getField(MatchType.DL_DST);
103 else if (match.isPresent(MatchType.NW_DST))
104 dst = match.getField(MatchType.NW_DST);
109 Host cacheHit = this.destinationHostCache.get(dst);
110 if (cacheHit != null)
113 // Find the destination host
115 for (HostNodeConnector h : hosts) {
117 // DL_DST => compare on MAC address strings
118 if (match.isPresent(MatchType.DL_DST)) {
119 String dstMac = MatchType.DL_DST.stringify(dst.getValue());
120 String hostMac = ((EthernetAddress) h.getDataLayerAddress()).getMacAddress();
121 if (dstMac.equals(hostMac)) {
123 this.destinationHostCache.put(dst, dstHost); // Add to cache
128 // NW_DST => compare on IP address (of type InetAddress)
129 else if (match.isPresent(MatchType.NW_DST)) {
130 InetAddress hostIP = h.getNetworkAddress();
131 if (dst.getValue().equals(hostIP)) {
133 this.destinationHostCache.put(dst, dstHost); // Add to cache
142 /* Returns the source Host associated with this flow, if one
143 * exists. Returns null otherwise. */
144 protected Host getSourceHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
147 Match match = flow.getMatch();
149 // Flow must have IN_PORT field (DL_SRC rarely (never?)
151 if (match.isPresent(MatchType.IN_PORT)) {
152 MatchField inPort = match.getField(MatchType.IN_PORT);
155 Host cacheHit = this.sourceHostCache.get(inPort);
156 if (cacheHit != null)
159 // Find the source host by comparing the NodeConnectors
160 NodeConnector inPortNc = (NodeConnector) inPort.getValue();
161 for (HostNodeConnector h : hosts) {
162 NodeConnector hostNc = h.getnodeConnector();
163 if (hostNc.equals(inPortNc)) {
165 this.sourceHostCache.put(inPort, h); // Add to cache
173 /* These are all basic getters/setters, most of which are required
174 * by IAnalyticsManager */
175 public long getByteCount(Host src, Host dst) {
176 return getByteCountBetweenHostsInternal(src, dst, null);
179 public long getByteCount(Host src, Host dst, Byte protocol) {
180 return getByteCountBetweenHostsInternal(src, dst, protocol);
183 public Map<Byte, Long> getAllByteCounts(Host src, Host dst) {
184 if (this.hostsToStats.get(src) == null ||
185 this.hostsToStats.get(src).get(dst) == null)
186 return new HashMap<Byte, Long>();
187 return this.hostsToStats.get(src).get(dst).getAllByteCounts();
190 public double getBitRate(Host src, Host dst) {
191 return getBitRateBetweenHostsInternal(src, dst, null);
194 public double getBitRate(Host src, Host dst, Byte protocol) {
195 return getBitRateBetweenHostsInternal(src, dst, protocol);
198 public Map<Byte, Double> getAllBitRates(Host src, Host dst) {
199 if (this.hostsToStats.get(src) == null ||
200 this.hostsToStats.get(src).get(dst) == null)
201 return new HashMap<Byte, Double>();
202 return this.hostsToStats.get(src).get(dst).getAllBitRates();
205 public long getByteCount(AffinityLink al) {
206 return getByteCountOnAffinityLinkInternal(al, null);
209 public long getByteCount(AffinityLink al, Byte protocol) {
210 return getByteCountOnAffinityLinkInternal(al, protocol);
213 public Map<Byte, Long> getAllByteCounts(AffinityLink al) {
214 Map<Byte, Long> byteCounts = new HashMap<Byte, Long>();
215 Set<Byte> protocols = getProtocols(al);
216 for (Byte protocol : protocols) {
217 Long thisByteCounts = getByteCount(al, protocol);
218 byteCounts.put(protocol, thisByteCounts);
223 public double getBitRate(AffinityLink al) {
224 return getBitRateOnAffinityLinkInternal(al, null);
227 public double getBitRate(AffinityLink al, Byte protocol) {
228 return getBitRateOnAffinityLinkInternal(al, protocol);
231 public Map<Byte, Double> getAllBitRates(AffinityLink al) {
232 Map<Byte, Double> bitRates = new HashMap<Byte, Double>();
233 Set<Byte> protocols = getProtocols(al);
234 for (Byte protocol : protocols)
235 bitRates.put(protocol, getBitRate(al, protocol));
239 public long getByteCount(String srcSubnet, String dstSubnet) {
240 return getByteCountBySubnetInternal(srcSubnet, dstSubnet, null);
243 public long getByteCount(String srcSubnet, String dstSubnet, Byte protocol) {
244 return getByteCountBySubnetInternal(srcSubnet, dstSubnet, protocol);
247 public Map<Byte, Long> getAllByteCounts(String srcSubnet, String dstSubnet) {
248 Map<Byte, Long> byteCounts = new HashMap<Byte, Long>();
249 Set<Byte> protocols = getProtocols(srcSubnet, dstSubnet);
250 for (Byte protocol : protocols) {
251 byteCounts.put(protocol, getByteCount(srcSubnet, dstSubnet, protocol));
256 public double getBitRate(String srcSubnet, String dstSubnet) {
257 return getBitRateBySubnetInternal(srcSubnet, dstSubnet, null);
260 public double getBitRate(String srcSubnet, String dstSubnet, Byte protocol) {
261 return getBitRateBySubnetInternal(srcSubnet, dstSubnet, protocol);
264 public Map<Byte, Double> getAllBitRates(String srcSubnet, String dstSubnet) {
265 Map<Byte, Double> bitRates = new HashMap<Byte, Double>();
266 Set<Byte> protocols = getProtocols(srcSubnet, dstSubnet);
267 for (Byte protocol : protocols)
268 bitRates.put(protocol, getBitRate(srcSubnet, dstSubnet, protocol));
272 public Map<Host, Long> getIncomingHostByteCounts(String subnet) {
273 return getIncomingHostByteCountsInternal(subnet, null);
276 public Map<Host, Long> getIncomingHostByteCounts(String subnet, Byte protocol) {
277 return getIncomingHostByteCountsInternal(subnet, protocol);
280 /* Return byte count between two hosts, either per-protocol or not */
281 private long getByteCountBetweenHostsInternal(Host src, Host dst, Byte protocol) {
283 if (this.hostsToStats.get(src) != null &&
284 this.hostsToStats.get(src).get(dst) != null) {
285 if (protocol == null)
286 byteCount = this.hostsToStats.get(src).get(dst).getByteCount();
288 byteCount = this.hostsToStats.get(src).get(dst).getByteCount(protocol);
293 /* Return the total bit rate between two hosts, either per-protocol or not */
294 private double getBitRateBetweenHostsInternal(Host src, Host dst, Byte protocol) {
296 if (this.hostsToStats.get(src) != null &&
297 this.hostsToStats.get(src).get(dst) != null) {
298 if (protocol == null)
299 bitRate = this.hostsToStats.get(src).get(dst).getBitRate();
301 bitRate = this.hostsToStats.get(src).get(dst).getBitRate(protocol);
306 /* Return the duration between two hosts, either per-protocol or not */
307 private double getDurationBetweenHostsInternal(Host src, Host dst, Byte protocol) {
308 double duration = 0.0;
309 if (this.hostsToStats.get(src) != null &&
310 this.hostsToStats.get(src).get(dst) !=null) {
311 if (protocol == null)
312 duration = this.hostsToStats.get(src).get(dst).getDuration();
314 duration = this.hostsToStats.get(src).get(dst).getDuration(protocol);
319 /* Return the byte count on an affinity link, per-protocol or not */
320 private long getByteCountOnAffinityLinkInternal(AffinityLink al, Byte protocol) {
322 List<Entry<Host, Host>> flows = this.affinityManager.getAllFlowsByHost(al);
323 for (Entry<Host, Host> flow : flows) {
324 Host h1 = flow.getKey();
325 Host h2 = flow.getValue();
326 // This will handle protocol being null
327 b += getByteCountBetweenHostsInternal(h1, h2, protocol);
332 /* Returns bit rate in bits-per-second on an affinity link, per-protocol or not */
333 private double getBitRateOnAffinityLinkInternal(AffinityLink al, Byte protocol) {
334 double duration = getDurationOnAffinityLinkInternal(al, protocol);
335 long totalBytes = getByteCountOnAffinityLinkInternal(al, protocol);
338 return (totalBytes * 8.0) / duration;
341 /* Returns the duration of communication on an affinity link, per-protocol or not */
342 private double getDurationOnAffinityLinkInternal(AffinityLink al, Byte protocol) {
343 double maxDuration = 0.0;
344 for (Entry<Host, Host> flow : this.affinityManager.getAllFlowsByHost(al)) {
345 Host h1 = flow.getKey();
346 Host h2 = flow.getValue();
347 // This will handle protocol being null
348 double duration = getDurationBetweenHostsInternal(h1, h2, protocol);
349 if (duration > maxDuration)
350 maxDuration = duration;
355 /* Return the total bytes for a particular protocol between these subnets. */
356 private long getByteCountBySubnetInternal(String srcSubnet, String dstSubnet, Byte protocol) {
358 if (srcSubnet == null && dstSubnet == null) {
359 log.debug("Source and destination subnets cannot both be null.");
364 if (srcSubnet == null) {
365 dstHosts = getHostsInSubnet(dstSubnet);
366 srcHosts = getHostsNotInSubnet(dstSubnet);
367 } else if (dstSubnet == null) {
368 srcHosts = getHostsInSubnet(srcSubnet);
369 dstHosts = getHostsNotInSubnet(srcSubnet);
371 srcHosts = getHostsInSubnet(srcSubnet);
372 dstHosts = getHostsInSubnet(dstSubnet);
375 for (Host srcHost : srcHosts)
376 for (Host dstHost : dstHosts)
377 totalBytes += getByteCount(srcHost, dstHost, protocol);
381 /* Returns the duration of communication between two subnetes, per-protocol or not */
382 private double getDurationBySubnetInternal(String srcSubnet, String dstSubnet, Byte protocol) {
383 double maxDuration = 0.0;
384 if (srcSubnet == null && dstSubnet == null) {
385 log.debug("Source and destination subnet cannot both be null.");
390 if (srcSubnet == null) {
391 dstHosts = getHostsInSubnet(dstSubnet);
392 srcHosts = getHostsNotInSubnet(dstSubnet);
393 } else if (dstSubnet == null) {
394 srcHosts = getHostsInSubnet(srcSubnet);
395 dstHosts = getHostsNotInSubnet(srcSubnet);
397 srcHosts = getHostsInSubnet(srcSubnet);
398 dstHosts = getHostsInSubnet(dstSubnet);
400 for (Host srcHost : srcHosts) {
401 for (Host dstHost : dstHosts) {
402 double duration = getDurationBetweenHostsInternal(srcHost, dstHost, protocol);
403 if (duration > maxDuration)
404 maxDuration = duration;
410 /* Returns the bit rate between these subnects, per-protocol or not. */
411 private double getBitRateBySubnetInternal(String srcSubnet, String dstSubnet, Byte protocol) {
412 double duration = getDurationBySubnetInternal(srcSubnet, dstSubnet, protocol);
413 long totalBytes = getByteCountBySubnetInternal(srcSubnet, dstSubnet, protocol);
416 return (totalBytes * 8.0) / duration;
419 /* Returns all hosts that transferred data into this subnet. */
420 private Map<Host, Long> getIncomingHostByteCountsInternal(String subnet, Byte protocol) {
421 Map<Host, Long> hosts = new HashMap<Host, Long>();
422 Set<Host> dstHosts = getHostsInSubnet(subnet);
423 Set<Host> otherHosts = getHostsNotInSubnet(subnet);
424 for (Host host : otherHosts) {
425 for (Host targetHost : dstHosts) {
426 Long byteCount = getByteCount(host, targetHost, protocol);
428 hosts.put(host, byteCount);
434 private Set<Byte> getProtocols(Host src, Host dst) {
435 if (this.hostsToStats.get(src) == null ||
436 this.hostsToStats.get(src).get(dst) == null)
437 return new HashSet<Byte>();
438 return this.hostsToStats.get(src).get(dst).getProtocols();
441 private Set<Byte> getProtocols(AffinityLink al) {
442 Set<Byte> protocols = new HashSet<Byte>();
443 for (Entry<Host, Host> flow : this.affinityManager.getAllFlowsByHost(al)) {
444 Host h1 = flow.getKey();
445 Host h2 = flow.getValue();
446 Set<Byte> thisProtocols = getProtocols(h1, h2);
447 protocols.addAll(thisProtocols);
452 private Set<Byte> getProtocols(String srcSubnet, String dstSubnet) {
453 if (srcSubnet == null && dstSubnet == null) {
454 log.debug("Source and destination subnets cannot both be null.");
457 Set<Byte> protocols = new HashSet<Byte>();
460 if (srcSubnet == null) {
461 dstHosts = getHostsInSubnet(dstSubnet);
462 srcHosts = getHostsNotInSubnet(dstSubnet);
463 } else if (dstSubnet == null) {
464 srcHosts = getHostsInSubnet(srcSubnet);
465 dstHosts = getHostsNotInSubnet(srcSubnet);
467 srcHosts = getHostsInSubnet(srcSubnet);
468 dstHosts = getHostsInSubnet(dstSubnet);
471 for (Host srcHost : srcHosts)
472 for (Host dstHost : dstHosts)
473 protocols.addAll(getProtocols(srcHost, dstHost));
477 private Set<Host> getHostsNotInSubnet(String subnet) {
478 Set<Host> hostsInSubnet = getHostsInSubnet(subnet);
479 Set<HostNodeConnector> otherHosts = this.hostTracker.getAllHosts();
480 otherHosts.removeAll(hostsInSubnet);
481 Set<Host> hostsNotInSubnet = new HashSet<Host>();
482 for (Host h : otherHosts)
483 hostsNotInSubnet.add(h);
484 return hostsNotInSubnet;
487 private Set<Host> getHostsInSubnet(String subnet) {
490 Set<Host> hosts = new HashSet<Host>();
492 // Split 1.2.3.4/5 format into the subnet (1.2.3.4) and the mask (5)
494 String[] splitSubnet = subnet.split("/");
495 ip = InetAddress.getByName(splitSubnet[0]);
496 mask = (splitSubnet.length == 2) ? Short.valueOf(splitSubnet[1]) : 32;
497 } catch (UnknownHostException e) {
498 log.debug("Incorrect subnet/mask format: " + subnet);
503 InetAddress targetSubnet = getSubnet(ip, mask);
504 Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
505 for (HostNodeConnector host : allHosts) {
506 InetAddress hostSubnet = getSubnet(host.getNetworkAddress(), mask);
507 if (hostSubnet.equals(targetSubnet))
513 private InetAddress getSubnet(InetAddress ip, Short mask) {
514 byte[] prefix = ip.getAddress();
515 InetAddress newIP = null;
517 int bits = (32 - mask) % 8;
518 int bytes = 4 - ((int) mask / 8);
521 // zero out the bytes
522 for (int i = 1; i <= bytes; i++)
523 prefix[prefix.length - i] = 0x0;
526 prefix[prefix.length - bytes - 1] &= (0xFF << bits);
527 newIP = InetAddress.getByAddress(prefix);
528 } catch (UnknownHostException e) {
534 public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
535 Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
537 for (FlowOnNode f : flowStatsList) {
538 Host srcHost = getSourceHostFromFlow(f.getFlow(), allHosts);
539 Host dstHost = getDestinationHostFromFlow(f.getFlow(), allHosts);
541 // Source host being null is okay; it indicates that the
542 // source of this particular flow is a switch, not a host.
544 // TODO: It would be useful, at least for debugging
545 // output, to differentiate between when the source is a
546 // switch and when it's a host that the hosttracker
547 // doesn't know about. The latter would be an error.
548 if (dstHost == null) {
549 log.debug("Error: Destination host is null for Flow " + f.getFlow());
552 else if (srcHost == null) {
553 log.debug("Source host is null for Flow " + f.getFlow() + ". This is NOT necessarily an error.");
557 if (this.hostsToStats.get(srcHost) == null)
558 this.hostsToStats.put(srcHost, new HashMap<Host, HostStats>());
559 if (this.hostsToStats.get(srcHost).get(dstHost) == null)
560 this.hostsToStats.get(srcHost).put(dstHost, new HostStats());
561 this.hostsToStats.get(srcHost).get(dstHost).setStatsFromFlow(f);
566 public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
567 // Not interested in this update
571 public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
572 // Not interested in this update
576 public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
577 // Not interested in this update