Fixed bugs while getting demo.py to work.
[affinity.git] / analytics / implementation / src / main / java / org / opendaylight / affinity / analytics / internal / AnalyticsManager.java
1 /*
2  * Copyright (c) 2013 Plexxi, Inc.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.affinity.analytics.internal;
10
11 import java.lang.Short;
12 import java.net.InetAddress;
13 import java.net.UnknownHostException;
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Map.Entry;
21 import java.util.Set;
22
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import org.opendaylight.affinity.affinity.AffinityGroup;
27 import org.opendaylight.affinity.affinity.AffinityLink;
28 import org.opendaylight.affinity.affinity.IAffinityManager;
29 import org.opendaylight.affinity.analytics.IAnalyticsManager;
30 import org.opendaylight.controller.hosttracker.IfIptoHost;
31 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
32 import org.opendaylight.controller.sal.core.Host;
33 import org.opendaylight.controller.sal.core.Node;
34 import org.opendaylight.controller.sal.core.NodeConnector;
35 import org.opendaylight.controller.sal.flowprogrammer.Flow;
36 import org.opendaylight.controller.sal.match.Match;
37 import org.opendaylight.controller.sal.match.MatchField;
38 import org.opendaylight.controller.sal.match.MatchType;
39 import org.opendaylight.controller.sal.packet.address.EthernetAddress;
40 import org.opendaylight.controller.sal.reader.FlowOnNode;
41 import org.opendaylight.controller.sal.reader.IReadServiceListener;
42 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
43 import org.opendaylight.controller.sal.reader.NodeDescription;
44 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
45 import org.opendaylight.controller.sal.utils.Status;
46
47 public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager {
48
49     private static final Logger log = LoggerFactory.getLogger(AnalyticsManager.class);
50
51     private IAffinityManager affinityManager;
52     private IfIptoHost hostTracker;
53
54     private Map<MatchField, Host> destinationHostCache;
55     private Map<MatchField, Host> sourceHostCache;
56     private Map<Host, Map<Host, HostStats>> hostsToStats;
57
58     /* Initialize data structures */
59     void init() {
60         this.destinationHostCache = new HashMap<MatchField, Host>();
61         this.sourceHostCache = new HashMap<MatchField, Host>();
62         this.hostsToStats = new HashMap<Host, Map<Host, HostStats>>();
63     }
64
65     void destroy() {
66     }
67
68     void start() {
69         log.info("Analytics manager started.!!!");
70     }
71
72     void started(){
73         log.info("Analytics manager started.!!!");
74     }
75
76     void stop() {
77     }
78
79     void setAffinityManager(IAffinityManager a) {
80         this.affinityManager = a;
81     }
82
83     void unsetAffinityManager(IAffinityManager a) {
84         if (this.affinityManager.equals(a))
85             this.affinityManager = null;
86     }
87
88     void setHostTracker(IfIptoHost h) {
89         this.hostTracker = h;
90     }
91
92     void unsetHostTracker(IfIptoHost h) {
93         if (this.hostTracker.equals(h))
94             this.hostTracker = null;
95     }
96
97     /* Returns the destination host associated with this flow, if one
98      * exists.  Returns null otherwise. */
99     protected Host getDestinationHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
100         Match match = flow.getMatch();
101         MatchField dst = null;
102
103         // Flow has to have DL_DST field or NW_DST field to proceed
104         if (match.isPresent(MatchType.DL_DST))
105             dst = match.getField(MatchType.DL_DST);
106         else if (match.isPresent(MatchType.NW_DST))
107             dst = match.getField(MatchType.NW_DST);
108         else
109             return null;
110
111         // Check cache
112         Host cacheHit = this.destinationHostCache.get(dst);
113         if (cacheHit != null) 
114             return cacheHit;
115
116         // Find the destination host
117         Host dstHost = null;
118         for (HostNodeConnector h : hosts) {
119             
120             // DL_DST => compare on MAC address strings
121             if (match.isPresent(MatchType.DL_DST)) {
122                 String dstMac = MatchType.DL_DST.stringify(dst.getValue());
123                 String hostMac = ((EthernetAddress) h.getDataLayerAddress()).getMacAddress();
124                 if (dstMac.equals(hostMac)) {
125                     dstHost = h;
126                     this.destinationHostCache.put(dst, dstHost); // Add to cache
127                     break;
128                 }
129             }
130           
131             // NW_DST => compare on IP address (of type InetAddress)
132             else if (match.isPresent(MatchType.NW_DST)) {
133                 InetAddress hostIP = h.getNetworkAddress();
134                 if (dst.getValue().equals(hostIP)) {
135                     dstHost = h;
136                     this.destinationHostCache.put(dst, dstHost); // Add to cache
137                     break;
138                 }
139             }
140         }
141
142         return dstHost;
143     }
144
145     /* Returns the source Host associated with this flow, if one
146      * exists.  Returns null otherwise. */
147     protected Host getSourceHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
148
149         Host srcHost = null;
150         Match match = flow.getMatch();
151
152         // Flow must have IN_PORT field (DL_SRC rarely (never?)
153         // exists).
154         if (match.isPresent(MatchType.IN_PORT)) {
155             MatchField inPort = match.getField(MatchType.IN_PORT);
156
157             // Check cache
158             Host cacheHit = this.sourceHostCache.get(inPort);
159             if (cacheHit != null)
160                 return cacheHit;
161
162             // Find the source host by comparing the NodeConnectors
163             NodeConnector inPortNc = (NodeConnector) inPort.getValue();
164             for (HostNodeConnector h : hosts) {
165                 NodeConnector hostNc = h.getnodeConnector();
166                 if (hostNc.equals(inPortNc)) {
167                     srcHost = h;
168                     this.sourceHostCache.put(inPort, h); // Add to cache
169                     break;
170                 }
171             }
172         }
173         return srcHost;
174     }
175
176     /* Return all protocols used between two sets */
177     protected Set<Byte> getProtocols(Set<Host> srcSet, Set<Host> dstSet) {
178         Set<Byte> protocols = new HashSet<Byte>();
179         for (Host src : srcSet) {
180             for (Host dst : dstSet) {
181                 if (this.hostsToStats.get(src) != null &&
182                     this.hostsToStats.get(src).get(dst) != null) {
183                     protocols.addAll(this.hostsToStats.get(src).get(dst).getProtocols());
184                 }
185             }
186         }
187         return protocols;
188     }
189
190     /* Return the number of bytes transferred between two sets,
191      * per-protocol (across all protocols if protocol is null).*/
192     protected long getByteCount(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
193         long byteCount = 0;
194         for (Host src : srcSet) {
195             for (Host dst : dstSet) {
196                 if (this.hostsToStats.get(src) != null &&
197                     this.hostsToStats.get(src).get(dst) != null) {
198                     if (protocol == null)
199                         byteCount += this.hostsToStats.get(src).get(dst).getByteCount();
200                     else
201                         byteCount += this.hostsToStats.get(src).get(dst).getByteCount(protocol);
202                 }
203             }
204         }
205         return byteCount;
206     }
207
208     /* Returns a map of protocol -> byte counts between two sets */
209     protected Map<Byte, Long> getAllByteCounts(Set<Host> srcSet, Set<Host> dstSet) {
210         Map<Byte, Long> byteCounts = new HashMap<Byte, Long>();
211         Set<Byte> protocols = getProtocols(srcSet, dstSet);
212         for (Byte protocol : protocols)
213             byteCounts.put(protocol, getByteCount(srcSet, dstSet, protocol));
214         return byteCounts;
215     }
216
217     /* Returns the packet count between two sets, per-protocol (across
218      * all protocols if protocol is null). */
219     protected long getPacketCount(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
220         long packetCount = 0;
221         for (Host src : srcSet) {
222             for (Host dst : dstSet) {
223                 if (this.hostsToStats.get(src) != null &&
224                     this.hostsToStats.get(src).get(dst) != null) {
225                     if (protocol == null)
226                         packetCount += this.hostsToStats.get(src).get(dst).getPacketCount();
227                     else
228                         packetCount += this.hostsToStats.get(src).get(dst).getPacketCount(protocol);
229                 }
230             }
231         }
232         return packetCount;
233     }
234
235     /* Returns a map of protocol -> packet counts between two sets */
236     protected Map<Byte, Long> getAllPacketCounts(Set<Host> srcSet, Set<Host> dstSet) {
237         Map<Byte, Long> packetCounts = new HashMap<Byte, Long>();
238         Set<Byte> protocols = getProtocols(srcSet, dstSet);
239         for (Byte protocol : protocols)
240             packetCounts.put(protocol, getPacketCount(srcSet, dstSet, protocol));
241         return packetCounts;
242     }
243
244     /* Returns the duration of communication between two sets (max
245      * duration of all flows) for a particular protocol (across all
246      * protocols if protocol is null) */
247     protected double getDuration(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
248         double maxDuration = 0.0;
249         for (Host src : srcSet) {
250             if (this.hostsToStats.get(src) == null)
251                 continue;
252             for (Host dst : dstSet) {
253                 double duration;
254                 if (this.hostsToStats.get(src).get(dst) == null)
255                     continue;
256                 if (protocol == null)
257                     duration = this.hostsToStats.get(src).get(dst).getDuration();
258                 else
259                     duration = this.hostsToStats.get(src).get(dst).getDuration(protocol);
260                 if (duration > maxDuration)
261                     maxDuration = duration;
262             }
263         }
264         return maxDuration;
265     }
266
267     /* Returns a map of protocol -> (max) duration over that protocol. */
268     protected Map<Byte, Double> getAllDurations(Set<Host> srcSet, Set<Host> dstSet) {
269         Map<Byte, Double> durations = new HashMap<Byte, Double>();
270         Set<Byte> protocols = getProtocols(srcSet, dstSet);
271         for (Byte protocol : protocols)
272             durations.put(protocol, getDuration(srcSet, dstSet, protocol));
273         return durations;
274     }
275
276     /* Returns the bit rate between two sets */
277     protected double getBitRate(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
278         double duration = getDuration(srcSet, dstSet, protocol);
279         long totalBytes = getByteCount(srcSet, dstSet, protocol);
280         if (duration == 0.0)
281             return 0.0;
282         return (totalBytes * 8.0) / duration;
283     }
284
285     /* Returns all bit rates between two sets */
286     protected Map<Byte, Double> getAllBitRates(Set<Host> srcSet, Set<Host> dstSet) {
287         Map<Byte, Double> bitRates = new HashMap<Byte, Double>();
288         Set<Byte> protocols = getProtocols(srcSet, dstSet);
289         for (Byte protocol : protocols)
290             bitRates.put(protocol, getBitRate(srcSet, dstSet, protocol));
291         return bitRates;
292     }
293
294     /* Because the generic stats API relies on having two Set<Host>
295      * arguments, the next series of functions converts various
296      * Objects into Set<Host>.  */
297
298     /* Host -> Set<Host> */
299     private Set<Host> getSet(Host h) {
300         return new HashSet<Host>(Arrays.asList(h));
301     }
302
303     /* AffinityLink -> Set of source Hosts */
304     private Set<Host> getSrcSet(AffinityLink al) {
305         return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getToGroup()));
306     }
307
308     /* AffinityLink -> Set of destination Hosts */
309     private Set<Host> getDstSet(AffinityLink al) {
310         return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getToGroup()));
311     }
312
313     /* srcSubnet, dstSubnet -> Set of Hosts in srcSubnet.  If
314      * srcSubnet is null, will return all hosts *not* in dstSubnet. */
315     private Set<Host> getSrcSet(String srcSubnet, String dstSubnet) {
316         if (srcSubnet == null && dstSubnet == null) {
317             log.debug("Source and destination subnets cannot both be null.");
318             return new HashSet<Host>();
319         }
320         if (srcSubnet == null)
321             return getHostsNotInSubnet(dstSubnet);
322         else
323             return getHostsInSubnet(srcSubnet);
324     }
325
326     /* srcSubnet, dstSubnet -> Set of Hosts in dstSubnet.  This has
327      * the same logic as the previous method, so just flip the
328      * arguments. */
329     private Set<Host> getDstSet(String srcSubnet, String dstSubnet) {
330         return getSrcSet(dstSubnet, srcSubnet);
331     }
332
333     /* Basic getters for host pair statistics */
334     public long getByteCount(Host src, Host dst) { return getByteCount(src, dst, null); }
335     public long getByteCount(Host src, Host dst, Byte protocol) { return getByteCount(getSet(src), getSet(dst), protocol); }
336     public Map<Byte, Long> getAllByteCounts(Host src, Host dst) { return getAllByteCounts(getSet(src), getSet(dst)); }
337     public long getPacketCount(Host src, Host dst) { return getPacketCount(src, dst, null); }
338     public long getPacketCount(Host src, Host dst, Byte protocol) { return getPacketCount(getSet(src), getSet(dst), protocol); }
339     public Map<Byte, Long> getAllPacketCounts(Host src, Host dst) { return getAllPacketCounts(getSet(src), getSet(dst)); }
340     public double getDuration(Host src, Host dst) { return getDuration(src, dst, null); }
341     public double getDuration(Host src, Host dst, Byte protocol) { return getDuration(getSet(src), getSet(dst), protocol); }
342     public Map<Byte, Double> getAllDurations(Host src, Host dst) { return getAllDurations(getSet(src), getSet(dst)); }
343     public double getBitRate(Host src, Host dst) { return getBitRate(src, dst, null); }
344     public double getBitRate(Host src, Host dst, Byte protocol) { return getBitRate(getSet(src), getSet(dst), protocol); }
345     public Map<Byte, Double> getAllBitRates(Host src, Host dst) { return getAllBitRates(getSet(src), getSet(dst)); }
346
347     /* Basic getters for affinity link statistics */
348     public long getByteCount(AffinityLink al) { return getByteCount(al, null); }
349     public long getByteCount(AffinityLink al, Byte protocol) { return getByteCount(getSrcSet(al), getDstSet(al), protocol); }
350     public Map<Byte, Long> getAllByteCounts(AffinityLink al) { return getAllByteCounts(getSrcSet(al), getDstSet(al)); }
351     public long getPacketCount(AffinityLink al) { return getPacketCount(al, null); }
352     public long getPacketCount(AffinityLink al, Byte protocol) { return getPacketCount(getSrcSet(al), getDstSet(al), protocol); }
353     public Map<Byte, Long> getAllPacketCounts(AffinityLink al) { return getAllPacketCounts(getSrcSet(al), getDstSet(al)); }
354     public double getDuration(AffinityLink al) { return getDuration(al, null); }
355     public double getDuration(AffinityLink al, Byte protocol) { return getDuration(getSrcSet(al), getDstSet(al), protocol); }
356     public Map<Byte, Double> getAllDurations(AffinityLink al) { return getAllDurations(getSrcSet(al), getDstSet(al)); }
357     public double getBitRate(AffinityLink al) { return getBitRate(al, null); }
358     public double getBitRate(AffinityLink al, Byte protocol) { return getBitRate(getSrcSet(al), getDstSet(al), protocol); }
359     public Map<Byte, Double> getAllBitRates(AffinityLink al) { return getAllBitRates(getSrcSet(al), getDstSet(al)); }
360
361     /* Basic getters for subnet statistics */
362     public long getByteCount(String srcSub, String dstSub) { return getByteCount(srcSub, dstSub, null); }
363     public long getByteCount(String srcSub, String dstSub, Byte protocol) { return getByteCount(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
364     public Map<Byte, Long> getAllByteCounts(String srcSub, String dstSub) { return getAllByteCounts(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
365     public long getPacketCount(String srcSub, String dstSub) { return getPacketCount(srcSub, dstSub, null); }
366     public long getPacketCount(String srcSub, String dstSub, Byte protocol) { return getPacketCount(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
367     public Map<Byte, Long> getAllPacketCounts(String srcSub, String dstSub) { return getAllPacketCounts(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
368     public double getDuration(String srcSub, String dstSub) { return getDuration(srcSub, dstSub, null); }
369     public double getDuration(String srcSub, String dstSub, Byte protocol) { return getDuration(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
370     public Map<Byte, Double> getAllDurations(String srcSub, String dstSub) { return getAllDurations(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
371     public double getBitRate(String srcSub, String dstSub) { return getBitRate(srcSub, dstSub, null); }
372     public double getBitRate(String srcSub, String dstSub, Byte protocol) { return getBitRate(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub), protocol); }
373     public Map<Byte, Double> getAllBitRates(String srcSub, String dstSub) { return getAllBitRates(getSrcSet(srcSub, dstSub), getDstSet(srcSub, dstSub)); }
374     
375     /* Returns all hosts that transferred data into this subnet. */
376     public Map<Host, Long> getIncomingHostByteCounts(String subnet, Byte protocol, Set<HostNodeConnector> allHosts) {
377         Map<Host, Long> hosts = new HashMap<Host, Long>();
378         Set<Host> dstHosts = getHostsInSubnet(subnet, allHosts);
379         Set<Host> otherHosts = getHostsNotInSubnet(subnet, allHosts);
380         for (Host host : otherHosts) {
381             for (Host targetHost : dstHosts) {
382                 Long byteCount = getByteCount(host, targetHost, protocol);
383                 if (byteCount > 0)
384                     hosts.put(host, byteCount);
385             }
386         }
387         return hosts;
388     }
389
390     public Map<Host, Long> getIncomingHostByteCounts(String subnet) {
391         return getIncomingHostByteCounts(subnet, null);
392     }
393
394     public Map<Host, Long> getIncomingHostByteCounts(String subnet, Byte protocol) {
395          Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
396          return getIncomingHostByteCounts(subnet, protocol, allHosts);
397     }
398
399     /* Hosts in allHosts that are not part of the subnet. This is
400      * useful when we need statistics about, e.g., all data into a
401      * particular subnet (so data from hosts outside of that
402      * subnet).*/
403     protected Set<Host> getHostsNotInSubnet(String subnet, Set<HostNodeConnector> allHosts) {
404         Set<Host> hostsInSubnet = getHostsInSubnet(subnet, allHosts);
405         Set<HostNodeConnector> otherHosts = new HashSet<HostNodeConnector>(allHosts); // copy constructor
406         otherHosts.removeAll(hostsInSubnet);
407         Set<Host> hostsNotInSubnet = new HashSet<Host>();
408         for (Host h : otherHosts)
409             hostsNotInSubnet.add(h);
410         return hostsNotInSubnet;
411     }
412
413     private Set<Host> getHostsNotInSubnet(String subnet) {
414         return getHostsNotInSubnet(subnet, this.hostTracker.getAllHosts());
415     }
416
417     /* Returns the set of hosts that are part of this subnet. */
418     protected Set<Host> getHostsInSubnet(String subnet, Set<HostNodeConnector> allHosts) {
419         InetAddress ip;
420         Short mask;
421         Set<Host> hosts = new HashSet<Host>();
422
423         // Split 1.2.3.4/5 format into the subnet (1.2.3.4) and the mask (5)
424         try {
425             String[] splitSubnet = subnet.split("/");
426             ip = InetAddress.getByName(splitSubnet[0]);
427             mask = (splitSubnet.length == 2) ? Short.valueOf(splitSubnet[1]) : 32;
428         } catch (UnknownHostException e) {
429             log.debug("Incorrect subnet/mask format: " + subnet);
430             return hosts;
431         }
432
433         // Match on subnets
434         InetAddress targetSubnet = getSubnet(ip, mask);
435         for (HostNodeConnector host : allHosts) {
436             InetAddress hostSubnet = getSubnet(host.getNetworkAddress(), mask);
437             if (hostSubnet.equals(targetSubnet))
438                 hosts.add(host);
439         }
440         return hosts;
441     }
442
443     private Set<Host> getHostsInSubnet(String subnet) {
444         return getHostsInSubnet(subnet, this.hostTracker.getAllHosts());
445     }
446
447     /* Get the subnet associated with this IP and mask. */
448     private InetAddress getSubnet(InetAddress ip, Short mask) {
449         byte[] prefix = ip.getAddress();
450         InetAddress newIP = null;
451         try {
452             int bits = (32 - mask) % 8;
453             int bytes = 4 - ((int) mask / 8);
454             if (bits > 0)
455                 bytes--;
456             // zero out the bytes
457             for (int i = 1; i <= bytes; i++)
458                 prefix[prefix.length - i] = 0x0;
459             // zero out the bits
460             if (bits > 0)
461                 prefix[prefix.length - bytes - 1] &= (0xFF << bits);
462             newIP = InetAddress.getByAddress(prefix);
463         } catch (UnknownHostException e) {
464         }
465         return newIP;
466     }
467
468     @Override
469     public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
470         nodeFlowStatisticsUpdated(node, flowStatsList, this.hostTracker.getAllHosts());
471     }
472
473     protected void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList, Set<HostNodeConnector> allHosts) {
474         for (FlowOnNode f : flowStatsList) {
475             Host srcHost = getSourceHostFromFlow(f.getFlow(), allHosts);
476             Host dstHost = getDestinationHostFromFlow(f.getFlow(), allHosts);
477
478             // Source host being null is okay; it indicates that the
479             // source of this particular flow is a switch, not a host.
480             //
481             // TODO: It would be useful, at least for debugging
482             // output, to differentiate between when the source is a
483             // switch and when it's a host that the hosttracker
484             // doesn't know about. The latter would be an error.
485             if (dstHost == null) {
486                 log.debug("Error: Destination host is null for Flow " + f.getFlow());
487                 continue;
488             }
489             else if (srcHost == null) {
490                 log.debug("Source host is null for Flow " + f.getFlow() + ". This is NOT necessarily an error.");
491                 continue;
492             }
493
494             if (this.hostsToStats.get(srcHost) == null)
495                 this.hostsToStats.put(srcHost, new HashMap<Host, HostStats>());
496             if (this.hostsToStats.get(srcHost).get(dstHost) == null)
497                 this.hostsToStats.get(srcHost).put(dstHost, new HostStats());
498             this.hostsToStats.get(srcHost).get(dstHost).setStatsFromFlow(f);
499         }
500     }
501
502     @Override
503     public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
504         // Not interested in this update
505     }
506
507     @Override
508     public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
509         // Not interested in this update
510     }
511
512     @Override
513     public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
514         // Not interested in this update
515     }
516 }