[DO NOT MERGE] 92/6992/4
authorFlavio Fernandes <ffernand@redhat.com>
Wed, 14 May 2014 12:56:58 +0000 (08:56 -0400)
committerFlavio Fernandes <ffernand@redhat.com>
Tue, 20 May 2014 15:14:50 +0000 (11:14 -0400)
This is an extension to the existing wiki page on md-sal ping tutorial.
It exposes the use of Future<> with the rpc api generated by yang-tools,
to make the ping service non-blocking; using the uris:

   .../ping/async/start/{ipAddress}
   .../ping/async/get/{ipAddress}
   .../ping/async/clear/{ipAddress}

If this becomes interesting enough, I will update the link https://wiki.opendaylight.org/view/Ping
with the stepping stones.

Example:

for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
   do curl --user "admin":"admin" -X PUT  http://localhost:8080/controller/nb/v2/ping/async/start/${x} ; echo ; done

while : ; do for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
   do curl --user "admin":"admin" -X PUT  http://localhost:8080/controller/nb/v2/ping/async/get/${x} ; echo ;
done ; sleep 2 ; done

for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
   do curl --user "admin":"admin" -X PUT  http://localhost:8080/controller/nb/v2/ping/async/clear/${x} ; echo ; done

[DO NOT MERGE]

Change-Id: Ibfaee34ace0435cac39006a7e9c6577df2bf71a8
Signed-off-by: Flavio Fernandes <ffernand@redhat.com>
opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java
opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java
opendaylight/ping/service/pom.xml
opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java
opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java

index e1e21349bec7e7b7c989031d4ab3b6e774a44be6..62f794d5c87d640b29032b7cf2df2c0d8e510062 100644 (file)
@@ -18,22 +18,59 @@ public class PingNorthbound {
     @Path("/ping/{ipAddress}")
     @PUT
     @StatusCodes({
-        @ResponseCode(code = 200, condition = "Destination reachable"),
-        @ResponseCode(code = 503, condition = "Internal error"),
-        @ResponseCode(code = 503, condition = "Destination unreachable") })
+            @ResponseCode(code = 200, condition = "Destination reachable"),
+            // @ResponseCode(code = 206, condition = "Ping in progress"),
+            @ResponseCode(code = 503, condition = "Internal error"),
+            @ResponseCode(code = 503, condition = "Destination unreachable") })
     public Response ping(@PathParam(value = "ipAddress") String ipAddress) {
-        PingServiceAPI ping = (PingServiceAPI) ServiceHelper.getGlobalInstance(
-                PingServiceAPI.class, this);
-        if (ping == null) {
+        return pingCommon(ipAddress, true);
+    }
+
+    @Path("/ping/async/start/{ipAddress}")
+    @PUT
+    @StatusCodes({
+            @ResponseCode(code = 200, condition = "Destination reachable"),
+            @ResponseCode(code = 206, condition = "Ping in progress"),
+            @ResponseCode(code = 503, condition = "Internal error"),
+            @ResponseCode(code = 503, condition = "Destination unreachable") })
+    public Response pingAsyncStart(@PathParam(value = "ipAddress") String ipAddress) {
+        return pingCommon(ipAddress, false);
+    }
+
+    @Path("/ping/async/get/{ipAddress}")
+    @PUT
+    @StatusCodes({
+            @ResponseCode(code = 200, condition = "Destination reachable"),
+            @ResponseCode(code = 206, condition = "Ping in progress"),
+            @ResponseCode(code = 503, condition = "Internal error"),
+            @ResponseCode(code = 503, condition = "Destination unreachable") })
+    public Response pingAsyncGet(@PathParam(value = "ipAddress") String ipAddress) {
+        return pingCommon(ipAddress, false);
+    }
 
+    @Path("/ping/async/clear/{ipAddress}")
+    @PUT
+    @StatusCodes({
+            @ResponseCode(code = 200, condition = "Async ping removed"),
+            @ResponseCode(code = 503, condition = "Internal error")})
+    public Response pingAsyncClear(@PathParam(value = "ipAddress") String ipAddress) {
+        PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+        if (pingServiceAPI != null) { pingServiceAPI.pingAsyncClear(ipAddress); }
+        return Response.ok(new String(ipAddress + " - removed")).build();  // idem-potent
+    }
+
+    private Response pingCommon(String ipAddress, boolean isSync) {
+        PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+        if (pingServiceAPI == null) {
             /* Ping service not found. */
-            return Response.ok(new String("No ping service")).status(500)
-                    .build();
+            return Response.ok(new String("No ping service")).status(500).build();
         }
-        if (ping.pingDestination(ipAddress))
-            return Response.ok(new String(ipAddress + " - reachable")).build();
-
-        return Response.ok(new String(ipAddress + " - unreachable")).status(503)
-                .build();
+        PingServiceAPI.PingResult pingResult = isSync ?
+                pingServiceAPI.pingDestinationSync(ipAddress) : pingServiceAPI.pingDestinationAsync(ipAddress);
+        if (pingResult == PingServiceAPI.PingResult.InProgress)
+            return Response.ok(new String(ipAddress + " - " + pingResult)).status(206).build();
+        if (pingResult == PingServiceAPI.PingResult.GotResponse)
+            return Response.ok(new String(ipAddress + " - " + pingResult)).build();
+        return Response.ok(new String(ipAddress + " - " + pingResult)).status(503).build();
     }
 }
index 0fe73000f9f8bfba261c6e2462a2eb2fb473c29d..3305503c30e65adb2da5c3e4a8c4af4d710f8c83 100644 (file)
@@ -1,8 +1,10 @@
 package org.opendaylight.controller.ping.plugin.internal;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.opendaylight.controller.sal.common.util.Rpcs;
@@ -14,43 +16,30 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoOutp
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
-import com.google.common.util.concurrent.Futures;
-
 public class PingImpl implements PingService {
-
-    private EchoResult pingHost(InetAddress destination) throws IOException {
-        if (destination.isReachable(5000)) {
-            return EchoResult.Reachable;
-        } else {
-            return EchoResult.Unreachable;
-        }
+    private final ExecutorService pool = Executors.newFixedThreadPool(2);
+
+    private Future<RpcResult<SendEchoOutput>> startPingHost(final SendEchoInput destination) {
+        return pool.submit(new Callable<RpcResult<SendEchoOutput>>() {
+            @Override
+            public RpcResult<SendEchoOutput> call() throws Exception {
+                SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
+                try {
+                    InetAddress dst = InetAddress.getByName(destination.getDestination().getValue());
+                    /* Build the result and return it. */
+                    ob.setEchoResult(dst.isReachable(5000) ? EchoResult.Reachable : EchoResult.Unreachable);
+                } catch (Exception e) {
+                    /* Return error result. */
+                    ob.setEchoResult(EchoResult.Error);
+                }
+                return Rpcs.<SendEchoOutput>getRpcResult(true, ob.build(), Collections.<RpcError>emptySet());
+            }
+        });
     }
 
     @Override
     public Future<RpcResult<SendEchoOutput>> sendEcho(SendEchoInput destination) {
-        try {
-            InetAddress dst = InetAddress.getByName(destination
-                    .getDestination().getValue());
-            EchoResult result = this.pingHost(dst);
-
-            /* Build the result and return it. */
-            SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
-            ob.setEchoResult(result);
-            RpcResult<SendEchoOutput> rpcResult =
-                    Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
-                            Collections.<RpcError> emptySet());
-
-            return Futures.immediateFuture(rpcResult);
-        } catch (Exception e) {
-
-            /* Return error result. */
-            SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
-            ob.setEchoResult(EchoResult.Error);
-            RpcResult<SendEchoOutput> rpcResult =
-                    Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
-                            Collections.<RpcError> emptySet());
-            return Futures.immediateFuture(rpcResult);
-        }
+        return this.startPingHost(destination);
     }
 
 }
index 5d797c8d07ad6daf57aa268e2c90d8dd7a308ccf..33381c14c811029f15cdafe1a38e8334ea006207 100644 (file)
@@ -53,6 +53,7 @@
               org.opendaylight.yangtools.yang.common,
               org.opendaylight.yangtools.yang.binding,
               org.opendaylight.controller.sal.binding.api,
+              org.slf4j,
               org.osgi.framework</Import-Package>
             <Export-Package>org.opendaylight.controller.ping.service.api</Export-Package>
             <Bundle-Activator>org.opendaylight.controller.ping.service.impl.PingServiceImpl</Bundle-Activator>
index 5dea55ae416ffbcb1674c3d14d85807f40fab139..635466e972175fdf6ab79a9f130238dc7f946678 100644 (file)
@@ -3,13 +3,70 @@ package org.opendaylight.controller.ping.service.api;
 
 public interface PingServiceAPI {
 
+    public enum PingResult {
+        InProgress(0),
+        GotResponse(1),
+        NoResponse(2),
+        Error(3);
+
+        int value;
+        static java.util.Map<java.lang.Integer, PingResult> valueMap;
+
+        static {
+            valueMap = new java.util.HashMap<>();
+            for (PingResult enumItem : PingResult.values())
+            {
+                valueMap.put(enumItem.value, enumItem);
+            }
+        }
+
+        private PingResult(int value) {
+            this.value = value;
+        }
+
+        /**
+         * @return integer value
+         */
+        public int getIntValue() {
+            return value;
+        }
+
+        /**
+         * @param valueArg
+         * @return corresponding EchoResult item
+         */
+        public static PingResult forValue(int valueArg) {
+            return valueMap.get(valueArg);
+        }
+    }
+
     /**
-     * pingDestination
+     * pingDestinationSync
+     *
+     * Will block caller until ping operation is finished.
+     *
+     * @param address An IPv4 address to be pinged
+     * @return PingResult enum. Will never return InProgress.
+     */
+    PingResult pingDestinationSync(String address);
+
+    /**
+     * pingDestinationAsync
+     *
+     * Will return last known state for given address.
+     *
+     * @param address An IPv4 address to be pinged
+     * @return PingResult enum.
+     */
+    PingResult pingDestinationAsync(String address);
+
+    /**
+     * pingAsyncClear
+     *
+     * Will remove async ping for given address.
      *
      * @param address An IPv4 address to be pinged
-     * @return True if address is reachable,
-     * false if address is unreachable or error occurs.
      */
-    boolean pingDestination(String address);
+    void pingAsyncClear(String address);
 }
 
index 08229b7200b28d0ec270ee8465edda270eaf8a7e..b34f7a9269dcad255e74b69e48b381c4a1a54e42 100644 (file)
@@ -1,6 +1,9 @@
 package org.opendaylight.controller.ping.service.impl;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.opendaylight.controller.ping.service.api.PingServiceAPI;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer;
@@ -13,54 +16,127 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoOutp
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PingServiceImpl extends AbstractBindingAwareConsumer implements
         BundleActivator, BindingAwareConsumer, PingServiceAPI {
+    private static Logger log = LoggerFactory.getLogger(PingServiceImpl.class);
 
-    private PingService ping;
+    private PingService pingService;
     private ConsumerContext session;
+    private Map<Ipv4Address, Future<RpcResult<SendEchoOutput>>> asyncPingEntryMap;
 
     @Override
     public void onSessionInitialized(ConsumerContext session) {
         this.session = session;
+        this.pingService = this.session.getRpcService(PingService.class);
     }
 
     @Override
     protected void startImpl(BundleContext context) {
+        asyncPingEntryMap = new HashMap<>();
         context.registerService(PingServiceAPI.class, this, null);
     }
 
     @Override
-    public boolean pingDestination(String address) {
+    protected void stopImpl(BundleContext context) {
+        asyncPingEntryMap.clear();
+    }
+
+    @Override
+    public PingResult pingDestinationSync(String address) {
+        return _pingDestinationSync(address);
+    }
 
-        if (ping == null) {
-            ping = this.session.getRpcService(PingService.class);
-            if (ping == null) {
+    @Override
+    public PingResult pingDestinationAsync(String address) {
+        return _pingDestinationAsync(address);
+    }
 
-                /* No ping service found. */
-                return false;
-            }
+    @Override
+    public void pingAsyncClear(String address) {
+        _pingAsyncClear(address);
+    }
+
+    private PingResult _pingDestinationSync(String address) {
+        if (pingService == null) { return PingResult.Error; } // No pingService service found.
+
+        try {
+            Ipv4Address destination = new Ipv4Address(address);
+            SendEchoInputBuilder ib = new SendEchoInputBuilder();
+            ib.setDestination(destination);
+            return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult());
+        } catch (InterruptedException ie) {
+            log.warn("InterruptedException received by pingDestinationSync: {} from: {}",
+                    address, ie.getMessage());
+        } catch (ExecutionException ee) {
+            log.warn("ExecutionException received by pingDestinationSync: {} from: {}",
+                    address, ee.getMessage());
         }
+        return PingResult.Error;
+    }
 
-        Ipv4Address destination = new Ipv4Address(address);
+    private synchronized PingResult _pingDestinationAsync(String address) {
+        if (pingService == null) { return PingResult.Error; } // No pingService service found.
 
-        SendEchoInputBuilder ib = new SendEchoInputBuilder();
-        ib.setDestination(destination);
+        /** Look for destination in asyncPingEntryMap. If none is found, create a new entry
+         * and return "in progress". This will happen on the very first time async is requested.
+         *
+         * NOTE: In a real scenario, you would want to consider a cache which automatically drops
+         * entries, so implementation does not need to rely on users calling async clear to remove
+         * the entries from the map. An example for doing such would be Google's caches or a
+         * weakhash map; which we deliberately chose not to use here for sake of simplicity.
+         */
+        final Ipv4Address destination = new Ipv4Address(address);
+        final Future<RpcResult<SendEchoOutput>> rpcResultFuture = asyncPingEntryMap.get(destination);
+        if (rpcResultFuture == null) {
+            SendEchoInputBuilder ib = new SendEchoInputBuilder();
+            ib.setDestination(destination);
+            asyncPingEntryMap.put(destination, pingService.sendEcho(ib.build()));
+            log.info("Starting pingDestinationAsync: {}", address);
+            return PingResult.InProgress;
+        }
+
+        /** Pending result may not be ready to be consumed. In such case, use "in progress".
+         */
+        if (!rpcResultFuture.isDone()) {
+            log.info("pingDestinationAsync: {} get result is not ready (ie. inProgress)", address);
+            return PingResult.InProgress;
+        }
+
+        /** If we made it this far, we know that rpcResultFuture is ready for consumption.
+         */
         try {
-            RpcResult<SendEchoOutput> result = ping.sendEcho(ib.build()).get();
-            switch (result.getResult().getEchoResult()) {
-            case Reachable:
-                return true;
-            case Unreachable:
-            case Error:
-            default:
-                return false;
-            }
+            PingResult pingResult = mapResult(rpcResultFuture.get().getResult().getEchoResult());
+            log.info("pingDestinationAsync: {} get result is {}", address, pingResult);
+            return pingResult;
         } catch (InterruptedException ie) {
+            log.warn("InterruptedException received by pingDestinationAsync: {} from: {}",
+                    address, ie.getMessage());
         } catch (ExecutionException ee) {
+            log.warn("ExecutionException received by pingDestinationAsync: {} from: {}",
+                    address, ee.getMessage());
         }
+        return PingResult.Error;
+    }
 
-        return false;
+    private synchronized void _pingAsyncClear(String address) {
+        asyncPingEntryMap.remove(new Ipv4Address(address));
+        log.info("Removing pingDestinationAsync: {}", address);
     }
 
+    private static PingResult mapResult(SendEchoOutput.EchoResult echoResult) {
+        // Translate echoResult to pingResult
+        switch (echoResult) {
+            case Reachable:
+                return PingResult.GotResponse;
+            case Unreachable:
+                return PingResult.NoResponse;
+            case Error:
+            default:
+                break;
+        }
+        return PingResult.Error;
+    }
 }