[DO NOT MERGE] 92/6992/2
authorFlavio Fernandes <ffernand@redhat.com>
Wed, 14 May 2014 12:56:58 +0000 (08:56 -0400)
committerFlavio Fernandes <ffernand@redhat.com>
Thu, 15 May 2014 19:30:15 +0000 (15:30 -0400)
This is an extenstion to the existing wiki page on md-sal ping tutorial.
It exposes the use of Future<> with the rpc api generated by yang-tools,
to make the ping service non-blocking; using the uris:

   .../ping/async/start/{ipAddress}
   .../ping/async/get/{ipAddress}
   .../ping/async/stop/{ipAddress}

If this becomes interesting enough, I will update the link https://wiki.opendaylight.org/view/Ping
with the stepping stones.

Example:

for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
   do curl --user "admin":"admin" -X PUT  http://localhost:8080/controller/nb/v2/ping/async/start/${x} ; echo ; done

while : ; do for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
   do curl --user "admin":"admin" -X PUT  http://localhost:8080/controller/nb/v2/ping/async/get/${x} ; echo ;
done ; sleep 2 ; done

for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
   do curl --user "admin":"admin" -X PUT  http://localhost:8080/controller/nb/v2/ping/async/stop/${x} ; echo ; done

[DO NOT MERGE]

Change-Id: Ibfaee34ace0435cac39006a7e9c6577df2bf71a8
Signed-off-by: Flavio Fernandes <ffernand@redhat.com>
opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java
opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java
opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java
opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java

index e1e21349bec7e7b7c989031d4ab3b6e774a44be6..acc0b01b09c3399c374f6bc9732ac6f623d0c198 100644 (file)
@@ -18,22 +18,59 @@ public class PingNorthbound {
     @Path("/ping/{ipAddress}")
     @PUT
     @StatusCodes({
-        @ResponseCode(code = 200, condition = "Destination reachable"),
-        @ResponseCode(code = 503, condition = "Internal error"),
-        @ResponseCode(code = 503, condition = "Destination unreachable") })
+            @ResponseCode(code = 200, condition = "Destination reachable"),
+            // @ResponseCode(code = 206, condition = "Ping in progress"),
+            @ResponseCode(code = 503, condition = "Internal error"),
+            @ResponseCode(code = 503, condition = "Destination unreachable") })
     public Response ping(@PathParam(value = "ipAddress") String ipAddress) {
-        PingServiceAPI ping = (PingServiceAPI) ServiceHelper.getGlobalInstance(
-                PingServiceAPI.class, this);
-        if (ping == null) {
+        return pingCommon(ipAddress, true);
+    }
+
+    @Path("/ping/async/start/{ipAddress}")
+    @PUT
+    @StatusCodes({
+            @ResponseCode(code = 200, condition = "Destination reachable"),
+            @ResponseCode(code = 206, condition = "Ping in progress"),
+            @ResponseCode(code = 503, condition = "Internal error"),
+            @ResponseCode(code = 503, condition = "Destination unreachable") })
+    public Response pingAsyncStart(@PathParam(value = "ipAddress") String ipAddress) {
+        return pingCommon(ipAddress, false);
+    }
+
+    @Path("/ping/async/get/{ipAddress}")
+    @PUT
+    @StatusCodes({
+            @ResponseCode(code = 200, condition = "Destination reachable"),
+            @ResponseCode(code = 206, condition = "Ping in progress"),
+            @ResponseCode(code = 503, condition = "Internal error"),
+            @ResponseCode(code = 503, condition = "Destination unreachable") })
+    public Response pingAsyncGet(@PathParam(value = "ipAddress") String ipAddress) {
+        return pingCommon(ipAddress, false);
+    }
 
+    @Path("/ping/async/stop/{ipAddress}")
+    @PUT
+    @StatusCodes({
+            @ResponseCode(code = 200, condition = "Ping stopped"),
+            @ResponseCode(code = 503, condition = "Internal error")})
+    public Response pingAsyncStop(@PathParam(value = "ipAddress") String ipAddress) {
+        PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+        if (pingServiceAPI != null) { pingServiceAPI.pingAsyncStop(ipAddress); }
+        return Response.ok(new String(ipAddress + " - stopped")).build();  // idem-potent
+    }
+
+    private Response pingCommon(String ipAddress, boolean isSync) {
+        PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+        if (pingServiceAPI == null) {
             /* Ping service not found. */
-            return Response.ok(new String("No ping service")).status(500)
-                    .build();
+            return Response.ok(new String("No ping service")).status(500).build();
         }
-        if (ping.pingDestination(ipAddress))
-            return Response.ok(new String(ipAddress + " - reachable")).build();
-
-        return Response.ok(new String(ipAddress + " - unreachable")).status(503)
-                .build();
+        PingServiceAPI.PingResult pingResult = isSync ?
+                pingServiceAPI.pingDestinationSync(ipAddress) : pingServiceAPI.pingDestinationAsync(ipAddress);
+        if (pingResult == PingServiceAPI.PingResult.InProgress)
+            return Response.ok(new String(ipAddress + " - " + pingResult)).status(206).build();
+        if (pingResult == PingServiceAPI.PingResult.GotResponse)
+            return Response.ok(new String(ipAddress + " - " + pingResult)).build();
+        return Response.ok(new String(ipAddress + " - " + pingResult)).status(503).build();
     }
 }
index 0fe73000f9f8bfba261c6e2462a2eb2fb473c29d..3305503c30e65adb2da5c3e4a8c4af4d710f8c83 100644 (file)
@@ -1,8 +1,10 @@
 package org.opendaylight.controller.ping.plugin.internal;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.opendaylight.controller.sal.common.util.Rpcs;
@@ -14,43 +16,30 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoOutp
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
-import com.google.common.util.concurrent.Futures;
-
 public class PingImpl implements PingService {
-
-    private EchoResult pingHost(InetAddress destination) throws IOException {
-        if (destination.isReachable(5000)) {
-            return EchoResult.Reachable;
-        } else {
-            return EchoResult.Unreachable;
-        }
+    private final ExecutorService pool = Executors.newFixedThreadPool(2);
+
+    private Future<RpcResult<SendEchoOutput>> startPingHost(final SendEchoInput destination) {
+        return pool.submit(new Callable<RpcResult<SendEchoOutput>>() {
+            @Override
+            public RpcResult<SendEchoOutput> call() throws Exception {
+                SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
+                try {
+                    InetAddress dst = InetAddress.getByName(destination.getDestination().getValue());
+                    /* Build the result and return it. */
+                    ob.setEchoResult(dst.isReachable(5000) ? EchoResult.Reachable : EchoResult.Unreachable);
+                } catch (Exception e) {
+                    /* Return error result. */
+                    ob.setEchoResult(EchoResult.Error);
+                }
+                return Rpcs.<SendEchoOutput>getRpcResult(true, ob.build(), Collections.<RpcError>emptySet());
+            }
+        });
     }
 
     @Override
     public Future<RpcResult<SendEchoOutput>> sendEcho(SendEchoInput destination) {
-        try {
-            InetAddress dst = InetAddress.getByName(destination
-                    .getDestination().getValue());
-            EchoResult result = this.pingHost(dst);
-
-            /* Build the result and return it. */
-            SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
-            ob.setEchoResult(result);
-            RpcResult<SendEchoOutput> rpcResult =
-                    Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
-                            Collections.<RpcError> emptySet());
-
-            return Futures.immediateFuture(rpcResult);
-        } catch (Exception e) {
-
-            /* Return error result. */
-            SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
-            ob.setEchoResult(EchoResult.Error);
-            RpcResult<SendEchoOutput> rpcResult =
-                    Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
-                            Collections.<RpcError> emptySet());
-            return Futures.immediateFuture(rpcResult);
-        }
+        return this.startPingHost(destination);
     }
 
 }
index 5dea55ae416ffbcb1674c3d14d85807f40fab139..8a9da4d01287c7cd4cae711ed48ca401bfc6d538 100644 (file)
@@ -3,13 +3,70 @@ package org.opendaylight.controller.ping.service.api;
 
 public interface PingServiceAPI {
 
+    public enum PingResult {
+        InProgress(0),
+        GotResponse(1),
+        NoResponse(2),
+        Error(3);
+
+        int value;
+        static java.util.Map<java.lang.Integer, PingResult> valueMap;
+
+        static {
+            valueMap = new java.util.HashMap<>();
+            for (PingResult enumItem : PingResult.values())
+            {
+                valueMap.put(enumItem.value, enumItem);
+            }
+        }
+
+        private PingResult(int value) {
+            this.value = value;
+        }
+
+        /**
+         * @return integer value
+         */
+        public int getIntValue() {
+            return value;
+        }
+
+        /**
+         * @param valueArg
+         * @return corresponding EchoResult item
+         */
+        public static PingResult forValue(int valueArg) {
+            return valueMap.get(valueArg);
+        }
+    }
+
     /**
-     * pingDestination
+     * pingDestinationSync
+     *
+     * Will block caller until ping operation is finished.
+     *
+     * @param address An IPv4 address to be pinged
+     * @return PingResult enum. Will never return InProgress.
+     */
+    PingResult pingDestinationSync(String address);
+
+    /**
+     * pingDestinationAsync
+     *
+     * Will return last known state for given address.
+     *
+     * @param address An IPv4 address to be pinged
+     * @return PingResult enum.
+     */
+    PingResult pingDestinationAsync(String address);
+
+    /**
+     * pingAsyncStop
+     *
+     * Will stop async ping for given address.
      *
      * @param address An IPv4 address to be pinged
-     * @return True if address is reachable,
-     * false if address is unreachable or error occurs.
      */
-    boolean pingDestination(String address);
+    void pingAsyncStop(String address);
 }
 
index 08229b7200b28d0ec270ee8465edda270eaf8a7e..eebf01b5a9cb0e573082825e020d2e80db6ceaa1 100644 (file)
@@ -1,6 +1,12 @@
 package org.opendaylight.controller.ping.service.impl;
 
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.opendaylight.controller.ping.service.api.PingServiceAPI;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer;
@@ -17,50 +23,169 @@ import org.osgi.framework.BundleContext;
 public class PingServiceImpl extends AbstractBindingAwareConsumer implements
         BundleActivator, BindingAwareConsumer, PingServiceAPI {
 
-    private PingService ping;
+    private PingService pingService;
     private ConsumerContext session;
 
+    private class AsyncPingEntry {
+        public Future<RpcResult<SendEchoOutput>> pendingResult;
+        public SendEchoOutput.EchoResult lastEchoResult;
+        public int countSent = 0;     // debug use
+        public int countReceived = 0; // debug use
+
+        public AsyncPingEntry(Ipv4Address destination, PingService pingService, AsyncPingEntry previousAsyncPingEntry) {
+            if (previousAsyncPingEntry != null) {
+                this.lastEchoResult = previousAsyncPingEntry.lastEchoResult;
+                this.countSent = previousAsyncPingEntry.countSent;
+                this.countReceived = previousAsyncPingEntry.countReceived;
+            }
+
+            /** Tickle the ping service to go ahead and send the echo packet.
+             */
+            try {
+                SendEchoInputBuilder ib = new SendEchoInputBuilder();
+                ib.setDestination(destination);
+                this.pendingResult = pingService.sendEcho(ib.build());
+                this.countSent++;
+            } catch (Exception e) {
+                this.pendingResult = null;
+            }
+        }
+    }
+    private ConcurrentMap<Ipv4Address, AsyncPingEntry> asyncPingEntryConcurrentMap;
+    private Timer asyncPingTimer;
+    private class AsyncPingTimerHandler extends TimerTask {
+        /** The purpose of AsyncPingTimerHandler's life is to look for async entries that have been
+         * finished with their RPC calls to the ping plugin. When such entries are found, it simply
+         * restarts them, by replacing the 'data' wiht a new AsyncPingEntry instance. While invoking
+         * the AsyncPingEntry's constructor, the new RPC call to the ping plugin backend is executed.
+         */
+        @Override
+        public void run() {
+            Iterator<ConcurrentMap.Entry<Ipv4Address, AsyncPingEntry>> iterator =
+                    asyncPingEntryConcurrentMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                try {
+                    ConcurrentHashMap.Entry<Ipv4Address, AsyncPingEntry> entry = iterator.next();
+                    Ipv4Address destination = entry.getKey();
+                    AsyncPingEntry asyncPingEntry = entry.getValue();
+
+                    /** Re-initiate ping, as long as last attempt for doing it is finished
+                     */
+                    if (asyncPingEntry.pendingResult == null || asyncPingEntry.pendingResult.isDone()) {
+                        /** Bump receive count if destination was reachable
+                         */
+                        if (asyncPingEntry != null) {
+                            final SendEchoOutput.EchoResult echoResult =
+                                    asyncPingEntry.pendingResult.get().getResult().getEchoResult();
+
+                            asyncPingEntry.lastEchoResult = echoResult;
+                            if (echoResult == SendEchoOutput.EchoResult.Reachable) { asyncPingEntry.countReceived++; }
+                        }
+
+                        /** Replace the entry for a key only if currently mapped to some value.
+                         * This will protect the concurrent map against a race where this thread
+                         * would be re-adding an entry that just got taken out.
+                         *
+                         * Note that by constructing a new AsyncPingEntry instance, the ping plugin is
+                         * triggered and the cycle restarts by waiting for asyncPingEntry.pendingResult
+                         * of the new instance to be done.
+                         */
+                        asyncPingEntryConcurrentMap.replace(destination,
+                                new AsyncPingEntry(destination, pingService, asyncPingEntry));
+                    }
+                } catch (InterruptedException ie) {
+                } catch (ExecutionException ee) {
+                }
+            }
+        }
+    }
+
+    private PingResult mapResult(SendEchoOutput.EchoResult echoResult) {
+        // Translate echoResult to pingResult
+        switch (echoResult) {
+            case Reachable:
+                return PingResult.GotResponse;
+            case Unreachable:
+                return PingResult.NoResponse;
+            case Error:
+            default:
+                break;
+        }
+        return PingResult.Error;
+    }
+
     @Override
     public void onSessionInitialized(ConsumerContext session) {
         this.session = session;
+        this.pingService = this.session.getRpcService(PingService.class);
     }
 
     @Override
     protected void startImpl(BundleContext context) {
+        asyncPingEntryConcurrentMap = new ConcurrentHashMap<Ipv4Address, AsyncPingEntry>();
+
+        /* Async Ping Timer to go off every 1 second for periodic pingService behavior */
+        asyncPingTimer = new Timer();
+        asyncPingTimer.schedule(new AsyncPingTimerHandler(), 1000, 1000);
+
         context.registerService(PingServiceAPI.class, this, null);
     }
 
     @Override
-    public boolean pingDestination(String address) {
+    protected void stopImpl(BundleContext context) {
+        asyncPingTimer.cancel();
+        asyncPingEntryConcurrentMap.clear();
+    }
 
-        if (ping == null) {
-            ping = this.session.getRpcService(PingService.class);
-            if (ping == null) {
+    @Override
+    public PingResult pingDestinationSync(String address) {
+        if (pingService == null) { return PingResult.Error; } // No pingService service found.
 
-                /* No ping service found. */
-                return false;
-            }
+        try {
+            Ipv4Address destination = new Ipv4Address(address);
+            SendEchoInputBuilder ib = new SendEchoInputBuilder();
+            ib.setDestination(destination);
+            return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult());
+        } catch (InterruptedException ie) {
+        } catch (ExecutionException ee) {
         }
+        return PingResult.Error;
+    }
 
+    @Override
+    public PingResult pingDestinationAsync(String address) {
+        if (pingService == null) { return PingResult.Error; } // No pingService service found.
+
+        /** Look for destination in asyncPingEntryConcurrentMap. If none is found, create a new entry
+         * and return "in progress". This will happen on the very first time async is requested.
+         */
         Ipv4Address destination = new Ipv4Address(address);
+        AsyncPingEntry asyncPingEntry = asyncPingEntryConcurrentMap.get(destination);
+        if (asyncPingEntry == null) {
+            asyncPingEntryConcurrentMap.put(destination, new AsyncPingEntry(destination, pingService, null));
+            return PingResult.InProgress;
+        }
 
-        SendEchoInputBuilder ib = new SendEchoInputBuilder();
-        ib.setDestination(destination);
+        /** Pending result may not be ready to be consumed. In such case, use lastResult. If there has
+         * not been a lastResult, then the only choice is to use "in progress".
+         */
+        if (!asyncPingEntry.pendingResult.isDone()) {
+            return asyncPingEntry.lastEchoResult == null ?
+                    PingResult.InProgress : mapResult(asyncPingEntry.lastEchoResult);
+        }
+
+        /** If we made it this far, we know that pendingResult contains the latest and greatest result
+         */
         try {
-            RpcResult<SendEchoOutput> result = ping.sendEcho(ib.build()).get();
-            switch (result.getResult().getEchoResult()) {
-            case Reachable:
-                return true;
-            case Unreachable:
-            case Error:
-            default:
-                return false;
-            }
+            return mapResult(asyncPingEntry.pendingResult.get().getResult().getEchoResult());
         } catch (InterruptedException ie) {
         } catch (ExecutionException ee) {
         }
-
-        return false;
+        return PingResult.Error;
     }
 
+    @Override
+    public void pingAsyncStop(String address) {
+        asyncPingEntryConcurrentMap.remove( new Ipv4Address(address) );
+    }
 }