ac85dbe6ef4b0112b951215de06db367a07ba549
[affinity.git] / analytics / implementation / src / main / java / org / opendaylight / affinity / analytics / internal / AnalyticsManager.java
1 /*
2  * Copyright (c) 2013 Plexxi, Inc.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.affinity.analytics.internal;
10
11 import java.lang.Short;
12 import java.net.InetAddress;
13 import java.net.UnknownHostException;
14 import java.util.ArrayList;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Map.Entry;
20 import java.util.Set;
21
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import org.opendaylight.affinity.affinity.AffinityGroup;
26 import org.opendaylight.affinity.affinity.AffinityLink;
27 import org.opendaylight.affinity.affinity.IAffinityManager;
28 import org.opendaylight.affinity.analytics.IAnalyticsManager;
29 import org.opendaylight.controller.hosttracker.IfIptoHost;
30 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
31 import org.opendaylight.controller.sal.core.Host;
32 import org.opendaylight.controller.sal.core.Node;
33 import org.opendaylight.controller.sal.core.NodeConnector;
34 import org.opendaylight.controller.sal.flowprogrammer.Flow;
35 import org.opendaylight.controller.sal.match.Match;
36 import org.opendaylight.controller.sal.match.MatchField;
37 import org.opendaylight.controller.sal.match.MatchType;
38 import org.opendaylight.controller.sal.packet.address.EthernetAddress;
39 import org.opendaylight.controller.sal.reader.FlowOnNode;
40 import org.opendaylight.controller.sal.reader.IReadServiceListener;
41 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
42 import org.opendaylight.controller.sal.reader.NodeDescription;
43 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
44 import org.opendaylight.controller.sal.utils.Status;
45
46 public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager {
47
48     private static final Logger log = LoggerFactory.getLogger(AnalyticsManager.class);
49
50     private IAffinityManager affinityManager;
51     private IfIptoHost hostTracker;
52
53     private Map<MatchField, Host> destinationHostCache;
54     private Map<MatchField, Host> sourceHostCache;
55     private Map<Host, Map<Host, HostStats>> hostsToStats;
56
57     /* Initialize data structures */
58     void init() {
59         this.destinationHostCache = new HashMap<MatchField, Host>();
60         this.sourceHostCache = new HashMap<MatchField, Host>();
61         this.hostsToStats = new HashMap<Host, Map<Host, HostStats>>();
62     }
63
64     void destroy() {
65     }
66
67     void start() {
68     }
69
70     void started(){
71     }
72
73     void stop() {
74     }
75
76     void setAffinityManager(IAffinityManager a) {
77         this.affinityManager = a;
78     }
79
80     void unsetAffinityManager(IAffinityManager a) {
81         if (this.affinityManager.equals(a))
82             this.affinityManager = null;
83     }
84
85     void setHostTracker(IfIptoHost h) {
86         this.hostTracker = h;
87     }
88
89     void unsetHostTracker(IfIptoHost h) {
90         if (this.hostTracker.equals(h))
91             this.hostTracker = null;
92     }
93
94     /* Returns the destination host associated with this flow, if one
95      * exists.  Returns null otherwise. */
96     protected Host getDestinationHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
97         Match match = flow.getMatch();
98         MatchField dst = null;
99
100         // Flow has to have DL_DST field or NW_DST field to proceed
101         if (match.isPresent(MatchType.DL_DST))
102             dst = match.getField(MatchType.DL_DST);
103         else if (match.isPresent(MatchType.NW_DST))
104             dst = match.getField(MatchType.NW_DST);
105         else
106             return null;
107
108         // Check cache
109         Host cacheHit = this.destinationHostCache.get(dst);
110         if (cacheHit != null) 
111             return cacheHit;
112
113         // Find the destination host
114         Host dstHost = null;
115         for (HostNodeConnector h : hosts) {
116             
117             // DL_DST => compare on MAC address strings
118             if (match.isPresent(MatchType.DL_DST)) {
119                 String dstMac = MatchType.DL_DST.stringify(dst.getValue());
120                 String hostMac = ((EthernetAddress) h.getDataLayerAddress()).getMacAddress();
121                 if (dstMac.equals(hostMac)) {
122                     dstHost = h;
123                     this.destinationHostCache.put(dst, dstHost); // Add to cache
124                     break;
125                 }
126             }
127           
128             // NW_DST => compare on IP address (of type InetAddress)
129             else if (match.isPresent(MatchType.NW_DST)) {
130                 InetAddress hostIP = h.getNetworkAddress();
131                 if (dst.getValue().equals(hostIP)) {
132                     dstHost = h;
133                     this.destinationHostCache.put(dst, dstHost); // Add to cache
134                     break;
135                 }
136             }
137         }
138
139         return dstHost;
140     }
141
142     /* Returns the source Host associated with this flow, if one
143      * exists.  Returns null otherwise. */
144     protected Host getSourceHostFromFlow(Flow flow, Set<HostNodeConnector> hosts) {
145
146         Host srcHost = null;
147         Match match = flow.getMatch();
148
149         // Flow must have IN_PORT field (DL_SRC rarely (never?)
150         // exists).
151         if (match.isPresent(MatchType.IN_PORT)) {
152             MatchField inPort = match.getField(MatchType.IN_PORT);
153
154             // Check cache
155             Host cacheHit = this.sourceHostCache.get(inPort);
156             if (cacheHit != null)
157                 return cacheHit;
158
159             // Find the source host by comparing the NodeConnectors
160             NodeConnector inPortNc = (NodeConnector) inPort.getValue();
161             for (HostNodeConnector h : hosts) {
162                 NodeConnector hostNc = h.getnodeConnector();
163                 if (hostNc.equals(inPortNc)) {
164                     srcHost = h;
165                     this.sourceHostCache.put(inPort, h); // Add to cache
166                     break;
167                 }
168             }
169         }
170         return srcHost;
171     }
172
173     /* These are all basic getters/setters, most of which are required
174      * by IAnalyticsManager */
175     public long getByteCount(Host src, Host dst) {
176         return getByteCountBetweenHostsInternal(src, dst, null);
177     }
178
179     public long getByteCount(Host src, Host dst, Byte protocol) {
180         return getByteCountBetweenHostsInternal(src, dst, protocol);
181     }
182
183     public Map<Byte, Long> getAllByteCounts(Host src, Host dst) {
184         if (this.hostsToStats.get(src) == null ||
185             this.hostsToStats.get(src).get(dst) == null)
186             return new HashMap<Byte, Long>();
187         return this.hostsToStats.get(src).get(dst).getAllByteCounts();
188     }
189
190     public double getBitRate(Host src, Host dst) {
191         return getBitRateBetweenHostsInternal(src, dst, null);
192     }
193
194     public double getBitRate(Host src, Host dst, Byte protocol) {
195         return getBitRateBetweenHostsInternal(src, dst, protocol);
196     }
197
198     public Map<Byte, Double> getAllBitRates(Host src, Host dst) {
199         if (this.hostsToStats.get(src) == null ||
200             this.hostsToStats.get(src).get(dst) == null)
201             return new HashMap<Byte, Double>();
202         return this.hostsToStats.get(src).get(dst).getAllBitRates();
203     }
204
205     public long getByteCount(AffinityLink al) {
206         return getByteCountOnAffinityLinkInternal(al, null);
207     }
208
209     public long getByteCount(AffinityLink al, Byte protocol) {
210         return getByteCountOnAffinityLinkInternal(al, protocol);
211     }
212
213     public Map<Byte, Long> getAllByteCounts(AffinityLink al) {
214         Map<Byte, Long> byteCounts = new HashMap<Byte, Long>();
215         Set<Byte> protocols = getProtocols(al);
216         for (Byte protocol : protocols) {
217             Long thisByteCounts = getByteCount(al, protocol);
218             byteCounts.put(protocol, thisByteCounts);
219         }
220         return byteCounts;
221     }
222
223     public double getBitRate(AffinityLink al) {
224         return getBitRateOnAffinityLinkInternal(al, null);
225     }
226
227     public double getBitRate(AffinityLink al, Byte protocol) {
228         return getBitRateOnAffinityLinkInternal(al, protocol);
229     }
230
231     public Map<Byte, Double> getAllBitRates(AffinityLink al) {
232         Map<Byte, Double> bitRates = new HashMap<Byte, Double>();
233         Set<Byte> protocols = getProtocols(al);
234         for (Byte protocol : protocols)
235             bitRates.put(protocol, getBitRate(al, protocol));
236         return bitRates;
237     }
238
239     public long getByteCount(String srcSubnet, String dstSubnet) {
240         return getByteCountBySubnetInternal(srcSubnet, dstSubnet, null);
241     }
242
243     public long getByteCount(String srcSubnet, String dstSubnet, Byte protocol) {
244         return getByteCountBySubnetInternal(srcSubnet, dstSubnet, protocol);
245     }
246
247     public Map<Byte, Long> getAllByteCounts(String srcSubnet, String dstSubnet) {
248         Map<Byte, Long> byteCounts = new HashMap<Byte, Long>();
249         Set<Byte> protocols = getProtocols(srcSubnet, dstSubnet);
250         for (Byte protocol : protocols) {
251             byteCounts.put(protocol, getByteCount(srcSubnet, dstSubnet, protocol));
252         }
253         return byteCounts;
254     }
255
256     public double getBitRate(String srcSubnet, String dstSubnet) {
257         return getBitRateBySubnetInternal(srcSubnet, dstSubnet, null);
258     }
259
260     public double getBitRate(String srcSubnet, String dstSubnet, Byte protocol) {
261         return getBitRateBySubnetInternal(srcSubnet, dstSubnet, protocol);
262     }
263
264     public Map<Byte, Double> getAllBitRates(String srcSubnet, String dstSubnet) {
265         Map<Byte, Double> bitRates = new HashMap<Byte, Double>();
266         Set<Byte> protocols = getProtocols(srcSubnet, dstSubnet);
267         for (Byte protocol : protocols)
268             bitRates.put(protocol, getBitRate(srcSubnet, dstSubnet, protocol));
269         return bitRates;
270     }
271
272     public Map<Host, Long> getIncomingHostByteCounts(String subnet) {
273         return getIncomingHostByteCountsInternal(subnet, null);
274     }
275
276     public Map<Host, Long> getIncomingHostByteCounts(String subnet, Byte protocol) {
277         return getIncomingHostByteCountsInternal(subnet, protocol);
278     }
279
280     /* Return byte count between two hosts, either per-protocol or not */
281     private long getByteCountBetweenHostsInternal(Host src, Host dst, Byte protocol) {
282         long byteCount = 0;
283         if (this.hostsToStats.get(src) != null &&
284             this.hostsToStats.get(src).get(dst) != null) {
285             if (protocol == null)
286                 byteCount = this.hostsToStats.get(src).get(dst).getByteCount();
287             else
288                 byteCount = this.hostsToStats.get(src).get(dst).getByteCount(protocol);
289         }
290         return byteCount;
291     }
292
293     /* Return the total bit rate between two hosts, either per-protocol or not */
294     private double getBitRateBetweenHostsInternal(Host src, Host dst, Byte protocol) {
295         double bitRate = 0;
296         if (this.hostsToStats.get(src) != null &&
297             this.hostsToStats.get(src).get(dst) != null) {
298             if (protocol == null)
299                 bitRate = this.hostsToStats.get(src).get(dst).getBitRate();
300             else
301                 bitRate = this.hostsToStats.get(src).get(dst).getBitRate(protocol);
302         }
303         return bitRate;
304     }
305
306     /* Return the duration between two hosts, either per-protocol or not */
307     private double getDurationBetweenHostsInternal(Host src, Host dst, Byte protocol) {
308         double duration = 0.0;
309         if (this.hostsToStats.get(src) != null &&
310             this.hostsToStats.get(src).get(dst) !=null) {
311             if (protocol == null)
312                 duration = this.hostsToStats.get(src).get(dst).getDuration();
313             else
314                 duration = this.hostsToStats.get(src).get(dst).getDuration(protocol);
315         }
316         return duration;
317     }
318
319     /* Return the byte count on an affinity link, per-protocol or not */
320     private long getByteCountOnAffinityLinkInternal(AffinityLink al, Byte protocol) {
321         long b = 0;
322         List<Entry<Host, Host>> flows = this.affinityManager.getAllFlowsByHost(al);
323         for (Entry<Host, Host> flow : flows) {
324             Host h1 = flow.getKey();
325             Host h2 = flow.getValue();
326             // This will handle protocol being null
327             b += getByteCountBetweenHostsInternal(h1, h2, protocol);
328         }
329         return b;
330     }
331
332     /* Returns bit rate in bits-per-second on an affinity link, per-protocol or not */
333     private double getBitRateOnAffinityLinkInternal(AffinityLink al, Byte protocol) {
334         double duration = getDurationOnAffinityLinkInternal(al, protocol);
335         long totalBytes = getByteCountOnAffinityLinkInternal(al, protocol);
336         if (duration == 0.0)
337             return 0.0;
338         return (totalBytes * 8.0) / duration;
339     }
340
341     /* Returns the duration of communication on an affinity link, per-protocol or not */
342     private double getDurationOnAffinityLinkInternal(AffinityLink al, Byte protocol) {
343         double maxDuration = 0.0;
344         for (Entry<Host, Host> flow : this.affinityManager.getAllFlowsByHost(al)) {
345             Host h1 = flow.getKey();
346             Host h2 = flow.getValue();
347             // This will handle protocol being null
348             double duration = getDurationBetweenHostsInternal(h1, h2, protocol);
349             if (duration > maxDuration)
350                 maxDuration = duration;
351         }
352         return maxDuration;
353     }
354
355     /* Return the total bytes for a particular protocol between these subnets. */
356     private long getByteCountBySubnetInternal(String srcSubnet, String dstSubnet, Byte protocol) {
357         long totalBytes = 0;
358         if (srcSubnet == null && dstSubnet == null) {
359             log.debug("Source and destination subnets cannot both be null.");
360             return totalBytes;
361         }
362         Set<Host> srcHosts;
363         Set<Host> dstHosts;
364         if (srcSubnet == null) {
365             dstHosts = getHostsInSubnet(dstSubnet);
366             srcHosts = getHostsNotInSubnet(dstSubnet);
367         } else if (dstSubnet == null) {
368             srcHosts = getHostsInSubnet(srcSubnet);
369             dstHosts = getHostsNotInSubnet(srcSubnet);
370         } else {
371             srcHosts = getHostsInSubnet(srcSubnet);
372             dstHosts = getHostsInSubnet(dstSubnet);
373         }
374
375         for (Host srcHost : srcHosts)
376             for (Host dstHost : dstHosts)
377                 totalBytes += getByteCount(srcHost, dstHost, protocol);
378         return totalBytes;
379     }
380
381     /* Returns the duration of communication between two subnetes, per-protocol or not */
382     private double getDurationBySubnetInternal(String srcSubnet, String dstSubnet, Byte protocol) {
383         double maxDuration = 0.0;
384         if (srcSubnet == null && dstSubnet == null) {
385             log.debug("Source and destination subnet cannot both be null.");
386             return maxDuration;
387         }
388         Set<Host> srcHosts;
389         Set<Host> dstHosts;
390         if (srcSubnet == null) {
391             dstHosts = getHostsInSubnet(dstSubnet);
392             srcHosts = getHostsNotInSubnet(dstSubnet);
393         } else if (dstSubnet == null) {
394             srcHosts = getHostsInSubnet(srcSubnet);
395             dstHosts = getHostsNotInSubnet(srcSubnet);
396         } else {
397             srcHosts = getHostsInSubnet(srcSubnet);
398             dstHosts = getHostsInSubnet(dstSubnet);
399         }
400         for (Host srcHost : srcHosts) {
401             for (Host dstHost : dstHosts) {
402                 double duration = getDurationBetweenHostsInternal(srcHost, dstHost, protocol);
403                 if (duration > maxDuration)
404                     maxDuration = duration;
405             }
406         }
407         return maxDuration;
408     }
409
410     /* Returns the bit rate between these subnects, per-protocol or not. */
411     private double getBitRateBySubnetInternal(String srcSubnet, String dstSubnet, Byte protocol) {
412         double duration = getDurationBySubnetInternal(srcSubnet, dstSubnet, protocol);
413         long totalBytes = getByteCountBySubnetInternal(srcSubnet, dstSubnet, protocol);
414         if (duration == 0.0)
415             return 0.0;
416         return (totalBytes * 8.0) / duration;
417     }
418
419     /* Returns all hosts that transferred data into this subnet. */
420     private Map<Host, Long> getIncomingHostByteCountsInternal(String subnet, Byte protocol) {
421         Map<Host, Long> hosts = new HashMap<Host, Long>();
422         Set<Host> dstHosts = getHostsInSubnet(subnet);
423         Set<Host> otherHosts = getHostsNotInSubnet(subnet);
424         for (Host host : otherHosts) {
425             for (Host targetHost : dstHosts) {
426                 Long byteCount = getByteCount(host, targetHost, protocol);
427                 if (byteCount > 0)
428                     hosts.put(host, byteCount);
429             }
430         }
431         return hosts;
432     }
433
434     private Set<Byte> getProtocols(Host src, Host dst) {
435         if (this.hostsToStats.get(src) == null ||
436             this.hostsToStats.get(src).get(dst) == null)
437             return new HashSet<Byte>();
438         return this.hostsToStats.get(src).get(dst).getProtocols();
439     }
440
441     private Set<Byte> getProtocols(AffinityLink al) {
442         Set<Byte> protocols = new HashSet<Byte>();
443         for (Entry<Host, Host> flow : this.affinityManager.getAllFlowsByHost(al)) {
444             Host h1 = flow.getKey();
445             Host h2 = flow.getValue();
446             Set<Byte> thisProtocols = getProtocols(h1, h2);
447             protocols.addAll(thisProtocols);
448         }
449         return protocols;
450     }
451
452     private Set<Byte> getProtocols(String srcSubnet, String dstSubnet) {
453         if (srcSubnet == null && dstSubnet == null) {
454             log.debug("Source and destination subnets cannot both be null.");
455             return null;
456         }
457         Set<Byte> protocols = new HashSet<Byte>();
458         Set<Host> srcHosts;
459         Set<Host> dstHosts;
460         if (srcSubnet == null) {
461             dstHosts = getHostsInSubnet(dstSubnet);
462             srcHosts = getHostsNotInSubnet(dstSubnet);
463         } else if (dstSubnet == null) {
464             srcHosts = getHostsInSubnet(srcSubnet);
465             dstHosts = getHostsNotInSubnet(srcSubnet);
466         } else {
467             srcHosts = getHostsInSubnet(srcSubnet);
468             dstHosts = getHostsInSubnet(dstSubnet);
469         }
470
471         for (Host srcHost : srcHosts)
472             for (Host dstHost : dstHosts)
473                 protocols.addAll(getProtocols(srcHost, dstHost));
474         return protocols;
475     }
476
477     private Set<Host> getHostsNotInSubnet(String subnet) {
478         Set<Host> hostsInSubnet = getHostsInSubnet(subnet);
479         Set<HostNodeConnector> otherHosts = this.hostTracker.getAllHosts();
480         otherHosts.removeAll(hostsInSubnet);
481         Set<Host> hostsNotInSubnet = new HashSet<Host>();
482         for (Host h : otherHosts)
483             hostsNotInSubnet.add(h);
484         return hostsNotInSubnet;
485     }
486
487     private Set<Host> getHostsInSubnet(String subnet) {
488         InetAddress ip;
489         Short mask;
490         Set<Host> hosts = new HashSet<Host>();
491
492         // Split 1.2.3.4/5 format into the subnet (1.2.3.4) and the mask (5)
493         try {
494             String[] splitSubnet = subnet.split("/");
495             ip = InetAddress.getByName(splitSubnet[0]);
496             mask = (splitSubnet.length == 2) ? Short.valueOf(splitSubnet[1]) : 32;
497         } catch (UnknownHostException e) {
498             log.debug("Incorrect subnet/mask format: " + subnet);
499             return hosts;
500         }
501
502         // Match on subnetes
503         InetAddress targetSubnet = getSubnet(ip, mask);
504         Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
505         for (HostNodeConnector host : allHosts) {
506             InetAddress hostSubnet = getSubnet(host.getNetworkAddress(), mask);
507             if (hostSubnet.equals(targetSubnet))
508                 hosts.add(host);
509         }
510         return hosts;
511     }
512
513     private InetAddress getSubnet(InetAddress ip, Short mask) {
514         byte[] prefix = ip.getAddress();
515         InetAddress newIP = null;
516         try {
517             int bits = (32 - mask) % 8;
518             int bytes = 4 - ((int) mask / 8);
519             if (bits > 0)
520                 bytes--;
521             // zero out the bytes
522             for (int i = 1; i <= bytes; i++)
523                 prefix[prefix.length - i] = 0x0;
524             // zero out the bits
525             if (bits > 0)
526                 prefix[prefix.length - bytes - 1] &= (0xFF << bits);
527             newIP = InetAddress.getByAddress(prefix);
528         } catch (UnknownHostException e) {
529         }
530         return newIP;
531     }
532
533     @Override
534     public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
535         Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
536
537         for (FlowOnNode f : flowStatsList) {
538             Host srcHost = getSourceHostFromFlow(f.getFlow(), allHosts);
539             Host dstHost = getDestinationHostFromFlow(f.getFlow(), allHosts);
540
541             // Source host being null is okay; it indicates that the
542             // source of this particular flow is a switch, not a host.
543             //
544             // TODO: It would be useful, at least for debugging
545             // output, to differentiate between when the source is a
546             // switch and when it's a host that the hosttracker
547             // doesn't know about. The latter would be an error.
548             if (dstHost == null) {
549                 log.debug("Error: Destination host is null for Flow " + f.getFlow());
550                 continue;
551             }
552             else if (srcHost == null) {
553                 log.debug("Source host is null for Flow " + f.getFlow() + ". This is NOT necessarily an error.");
554                 continue;
555             }
556
557             if (this.hostsToStats.get(srcHost) == null)
558                 this.hostsToStats.put(srcHost, new HashMap<Host, HostStats>());
559             if (this.hostsToStats.get(srcHost).get(dstHost) == null)
560                 this.hostsToStats.get(srcHost).put(dstHost, new HostStats());
561             this.hostsToStats.get(srcHost).get(dstHost).setStatsFromFlow(f);
562         }
563     }
564
565     @Override
566     public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
567         // Not interested in this update
568     }
569
570     @Override
571     public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
572         // Not interested in this update
573     }
574
575     @Override
576     public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
577         // Not interested in this update
578     }
579 }