[DO NOT MERGE]
[controller.git] / opendaylight / ping / service / src / main / java / org / opendaylight / controller / ping / service / impl / PingServiceImpl.java
index 08229b7200b28d0ec270ee8465edda270eaf8a7e..eebf01b5a9cb0e573082825e020d2e80db6ceaa1 100644 (file)
@@ -1,6 +1,12 @@
 package org.opendaylight.controller.ping.service.impl;
 
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.opendaylight.controller.ping.service.api.PingServiceAPI;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer;
@@ -17,50 +23,169 @@ import org.osgi.framework.BundleContext;
 public class PingServiceImpl extends AbstractBindingAwareConsumer implements
         BundleActivator, BindingAwareConsumer, PingServiceAPI {
 
-    private PingService ping;
+    private PingService pingService;
     private ConsumerContext session;
 
+    private class AsyncPingEntry {
+        public Future<RpcResult<SendEchoOutput>> pendingResult;
+        public SendEchoOutput.EchoResult lastEchoResult;
+        public int countSent = 0;     // debug use
+        public int countReceived = 0; // debug use
+
+        public AsyncPingEntry(Ipv4Address destination, PingService pingService, AsyncPingEntry previousAsyncPingEntry) {
+            if (previousAsyncPingEntry != null) {
+                this.lastEchoResult = previousAsyncPingEntry.lastEchoResult;
+                this.countSent = previousAsyncPingEntry.countSent;
+                this.countReceived = previousAsyncPingEntry.countReceived;
+            }
+
+            /** Tickle the ping service to go ahead and send the echo packet.
+             */
+            try {
+                SendEchoInputBuilder ib = new SendEchoInputBuilder();
+                ib.setDestination(destination);
+                this.pendingResult = pingService.sendEcho(ib.build());
+                this.countSent++;
+            } catch (Exception e) {
+                this.pendingResult = null;
+            }
+        }
+    }
+    private ConcurrentMap<Ipv4Address, AsyncPingEntry> asyncPingEntryConcurrentMap;
+    private Timer asyncPingTimer;
+    private class AsyncPingTimerHandler extends TimerTask {
+        /** The purpose of AsyncPingTimerHandler's life is to look for async entries that have been
+         * finished with their RPC calls to the ping plugin. When such entries are found, it simply
+         * restarts them, by replacing the 'data' wiht a new AsyncPingEntry instance. While invoking
+         * the AsyncPingEntry's constructor, the new RPC call to the ping plugin backend is executed.
+         */
+        @Override
+        public void run() {
+            Iterator<ConcurrentMap.Entry<Ipv4Address, AsyncPingEntry>> iterator =
+                    asyncPingEntryConcurrentMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                try {
+                    ConcurrentHashMap.Entry<Ipv4Address, AsyncPingEntry> entry = iterator.next();
+                    Ipv4Address destination = entry.getKey();
+                    AsyncPingEntry asyncPingEntry = entry.getValue();
+
+                    /** Re-initiate ping, as long as last attempt for doing it is finished
+                     */
+                    if (asyncPingEntry.pendingResult == null || asyncPingEntry.pendingResult.isDone()) {
+                        /** Bump receive count if destination was reachable
+                         */
+                        if (asyncPingEntry != null) {
+                            final SendEchoOutput.EchoResult echoResult =
+                                    asyncPingEntry.pendingResult.get().getResult().getEchoResult();
+
+                            asyncPingEntry.lastEchoResult = echoResult;
+                            if (echoResult == SendEchoOutput.EchoResult.Reachable) { asyncPingEntry.countReceived++; }
+                        }
+
+                        /** Replace the entry for a key only if currently mapped to some value.
+                         * This will protect the concurrent map against a race where this thread
+                         * would be re-adding an entry that just got taken out.
+                         *
+                         * Note that by constructing a new AsyncPingEntry instance, the ping plugin is
+                         * triggered and the cycle restarts by waiting for asyncPingEntry.pendingResult
+                         * of the new instance to be done.
+                         */
+                        asyncPingEntryConcurrentMap.replace(destination,
+                                new AsyncPingEntry(destination, pingService, asyncPingEntry));
+                    }
+                } catch (InterruptedException ie) {
+                } catch (ExecutionException ee) {
+                }
+            }
+        }
+    }
+
+    private PingResult mapResult(SendEchoOutput.EchoResult echoResult) {
+        // Translate echoResult to pingResult
+        switch (echoResult) {
+            case Reachable:
+                return PingResult.GotResponse;
+            case Unreachable:
+                return PingResult.NoResponse;
+            case Error:
+            default:
+                break;
+        }
+        return PingResult.Error;
+    }
+
     @Override
     public void onSessionInitialized(ConsumerContext session) {
         this.session = session;
+        this.pingService = this.session.getRpcService(PingService.class);
     }
 
     @Override
     protected void startImpl(BundleContext context) {
+        asyncPingEntryConcurrentMap = new ConcurrentHashMap<Ipv4Address, AsyncPingEntry>();
+
+        /* Async Ping Timer to go off every 1 second for periodic pingService behavior */
+        asyncPingTimer = new Timer();
+        asyncPingTimer.schedule(new AsyncPingTimerHandler(), 1000, 1000);
+
         context.registerService(PingServiceAPI.class, this, null);
     }
 
     @Override
-    public boolean pingDestination(String address) {
+    protected void stopImpl(BundleContext context) {
+        asyncPingTimer.cancel();
+        asyncPingEntryConcurrentMap.clear();
+    }
 
-        if (ping == null) {
-            ping = this.session.getRpcService(PingService.class);
-            if (ping == null) {
+    @Override
+    public PingResult pingDestinationSync(String address) {
+        if (pingService == null) { return PingResult.Error; } // No pingService service found.
 
-                /* No ping service found. */
-                return false;
-            }
+        try {
+            Ipv4Address destination = new Ipv4Address(address);
+            SendEchoInputBuilder ib = new SendEchoInputBuilder();
+            ib.setDestination(destination);
+            return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult());
+        } catch (InterruptedException ie) {
+        } catch (ExecutionException ee) {
         }
+        return PingResult.Error;
+    }
 
+    @Override
+    public PingResult pingDestinationAsync(String address) {
+        if (pingService == null) { return PingResult.Error; } // No pingService service found.
+
+        /** Look for destination in asyncPingEntryConcurrentMap. If none is found, create a new entry
+         * and return "in progress". This will happen on the very first time async is requested.
+         */
         Ipv4Address destination = new Ipv4Address(address);
+        AsyncPingEntry asyncPingEntry = asyncPingEntryConcurrentMap.get(destination);
+        if (asyncPingEntry == null) {
+            asyncPingEntryConcurrentMap.put(destination, new AsyncPingEntry(destination, pingService, null));
+            return PingResult.InProgress;
+        }
 
-        SendEchoInputBuilder ib = new SendEchoInputBuilder();
-        ib.setDestination(destination);
+        /** Pending result may not be ready to be consumed. In such case, use lastResult. If there has
+         * not been a lastResult, then the only choice is to use "in progress".
+         */
+        if (!asyncPingEntry.pendingResult.isDone()) {
+            return asyncPingEntry.lastEchoResult == null ?
+                    PingResult.InProgress : mapResult(asyncPingEntry.lastEchoResult);
+        }
+
+        /** If we made it this far, we know that pendingResult contains the latest and greatest result
+         */
         try {
-            RpcResult<SendEchoOutput> result = ping.sendEcho(ib.build()).get();
-            switch (result.getResult().getEchoResult()) {
-            case Reachable:
-                return true;
-            case Unreachable:
-            case Error:
-            default:
-                return false;
-            }
+            return mapResult(asyncPingEntry.pendingResult.get().getResult().getEchoResult());
         } catch (InterruptedException ie) {
         } catch (ExecutionException ee) {
         }
-
-        return false;
+        return PingResult.Error;
     }
 
+    @Override
+    public void pingAsyncStop(String address) {
+        asyncPingEntryConcurrentMap.remove( new Ipv4Address(address) );
+    }
 }