[DO NOT MERGE]
[controller.git] / opendaylight / ping / service / src / main / java / org / opendaylight / controller / ping / service / impl / PingServiceImpl.java
1 package org.opendaylight.controller.ping.service.impl;
2
3 import java.util.Iterator;
4 import java.util.Timer;
5 import java.util.TimerTask;
6 import java.util.concurrent.ConcurrentHashMap;
7 import java.util.concurrent.ConcurrentMap;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.Future;
10
11 import org.opendaylight.controller.ping.service.api.PingServiceAPI;
12 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer;
13 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
14 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
15 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.PingService;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoInputBuilder;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoOutput;
19 import org.opendaylight.yangtools.yang.common.RpcResult;
20 import org.osgi.framework.BundleActivator;
21 import org.osgi.framework.BundleContext;
22
23 public class PingServiceImpl extends AbstractBindingAwareConsumer implements
24         BundleActivator, BindingAwareConsumer, PingServiceAPI {
25
26     private PingService pingService;
27     private ConsumerContext session;
28
29     private class AsyncPingEntry {
30         public Future<RpcResult<SendEchoOutput>> pendingResult;
31         public SendEchoOutput.EchoResult lastEchoResult;
32         public int countSent = 0;     // debug use
33         public int countReceived = 0; // debug use
34
35         public AsyncPingEntry(Ipv4Address destination, PingService pingService, AsyncPingEntry previousAsyncPingEntry) {
36             if (previousAsyncPingEntry != null) {
37                 this.lastEchoResult = previousAsyncPingEntry.lastEchoResult;
38                 this.countSent = previousAsyncPingEntry.countSent;
39                 this.countReceived = previousAsyncPingEntry.countReceived;
40             }
41
42             /** Tickle the ping service to go ahead and send the echo packet.
43              */
44             try {
45                 SendEchoInputBuilder ib = new SendEchoInputBuilder();
46                 ib.setDestination(destination);
47                 this.pendingResult = pingService.sendEcho(ib.build());
48                 this.countSent++;
49             } catch (Exception e) {
50                 this.pendingResult = null;
51             }
52         }
53     }
54     private ConcurrentMap<Ipv4Address, AsyncPingEntry> asyncPingEntryConcurrentMap;
55     private Timer asyncPingTimer;
56     private class AsyncPingTimerHandler extends TimerTask {
57         /** The purpose of AsyncPingTimerHandler's life is to look for async entries that have been
58          * finished with their RPC calls to the ping plugin. When such entries are found, it simply
59          * restarts them, by replacing the 'data' wiht a new AsyncPingEntry instance. While invoking
60          * the AsyncPingEntry's constructor, the new RPC call to the ping plugin backend is executed.
61          */
62         @Override
63         public void run() {
64             Iterator<ConcurrentMap.Entry<Ipv4Address, AsyncPingEntry>> iterator =
65                     asyncPingEntryConcurrentMap.entrySet().iterator();
66             while (iterator.hasNext()) {
67                 try {
68                     ConcurrentHashMap.Entry<Ipv4Address, AsyncPingEntry> entry = iterator.next();
69                     Ipv4Address destination = entry.getKey();
70                     AsyncPingEntry asyncPingEntry = entry.getValue();
71
72                     /** Re-initiate ping, as long as last attempt for doing it is finished
73                      */
74                     if (asyncPingEntry.pendingResult == null || asyncPingEntry.pendingResult.isDone()) {
75                         /** Bump receive count if destination was reachable
76                          */
77                         if (asyncPingEntry != null) {
78                             final SendEchoOutput.EchoResult echoResult =
79                                     asyncPingEntry.pendingResult.get().getResult().getEchoResult();
80
81                             asyncPingEntry.lastEchoResult = echoResult;
82                             if (echoResult == SendEchoOutput.EchoResult.Reachable) { asyncPingEntry.countReceived++; }
83                         }
84
85                         /** Replace the entry for a key only if currently mapped to some value.
86                          * This will protect the concurrent map against a race where this thread
87                          * would be re-adding an entry that just got taken out.
88                          *
89                          * Note that by constructing a new AsyncPingEntry instance, the ping plugin is
90                          * triggered and the cycle restarts by waiting for asyncPingEntry.pendingResult
91                          * of the new instance to be done.
92                          */
93                         asyncPingEntryConcurrentMap.replace(destination,
94                                 new AsyncPingEntry(destination, pingService, asyncPingEntry));
95                     }
96                 } catch (InterruptedException ie) {
97                 } catch (ExecutionException ee) {
98                 }
99             }
100         }
101     }
102
103     private PingResult mapResult(SendEchoOutput.EchoResult echoResult) {
104         // Translate echoResult to pingResult
105         switch (echoResult) {
106             case Reachable:
107                 return PingResult.GotResponse;
108             case Unreachable:
109                 return PingResult.NoResponse;
110             case Error:
111             default:
112                 break;
113         }
114         return PingResult.Error;
115     }
116
117     @Override
118     public void onSessionInitialized(ConsumerContext session) {
119         this.session = session;
120         this.pingService = this.session.getRpcService(PingService.class);
121     }
122
123     @Override
124     protected void startImpl(BundleContext context) {
125         asyncPingEntryConcurrentMap = new ConcurrentHashMap<Ipv4Address, AsyncPingEntry>();
126
127         /* Async Ping Timer to go off every 1 second for periodic pingService behavior */
128         asyncPingTimer = new Timer();
129         asyncPingTimer.schedule(new AsyncPingTimerHandler(), 1000, 1000);
130
131         context.registerService(PingServiceAPI.class, this, null);
132     }
133
134     @Override
135     protected void stopImpl(BundleContext context) {
136         asyncPingTimer.cancel();
137         asyncPingEntryConcurrentMap.clear();
138     }
139
140     @Override
141     public PingResult pingDestinationSync(String address) {
142         if (pingService == null) { return PingResult.Error; } // No pingService service found.
143
144         try {
145             Ipv4Address destination = new Ipv4Address(address);
146             SendEchoInputBuilder ib = new SendEchoInputBuilder();
147             ib.setDestination(destination);
148             return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult());
149         } catch (InterruptedException ie) {
150         } catch (ExecutionException ee) {
151         }
152         return PingResult.Error;
153     }
154
155     @Override
156     public PingResult pingDestinationAsync(String address) {
157         if (pingService == null) { return PingResult.Error; } // No pingService service found.
158
159         /** Look for destination in asyncPingEntryConcurrentMap. If none is found, create a new entry
160          * and return "in progress". This will happen on the very first time async is requested.
161          */
162         Ipv4Address destination = new Ipv4Address(address);
163         AsyncPingEntry asyncPingEntry = asyncPingEntryConcurrentMap.get(destination);
164         if (asyncPingEntry == null) {
165             asyncPingEntryConcurrentMap.put(destination, new AsyncPingEntry(destination, pingService, null));
166             return PingResult.InProgress;
167         }
168
169         /** Pending result may not be ready to be consumed. In such case, use lastResult. If there has
170          * not been a lastResult, then the only choice is to use "in progress".
171          */
172         if (!asyncPingEntry.pendingResult.isDone()) {
173             return asyncPingEntry.lastEchoResult == null ?
174                     PingResult.InProgress : mapResult(asyncPingEntry.lastEchoResult);
175         }
176
177         /** If we made it this far, we know that pendingResult contains the latest and greatest result
178          */
179         try {
180             return mapResult(asyncPingEntry.pendingResult.get().getResult().getEchoResult());
181         } catch (InterruptedException ie) {
182         } catch (ExecutionException ee) {
183         }
184         return PingResult.Error;
185     }
186
187     @Override
188     public void pingAsyncStop(String address) {
189         asyncPingEntryConcurrentMap.remove( new Ipv4Address(address) );
190     }
191 }