From 76d8a66814be207ade5e4cba867a21bb5e06054a Mon Sep 17 00:00:00 2001 From: Flavio Fernandes Date: Wed, 14 May 2014 08:56:58 -0400 Subject: [PATCH] [DO NOT MERGE] This is an extension to the existing wiki page on md-sal ping tutorial. It exposes the use of Future<> with the rpc api generated by yang-tools, to make the ping service non-blocking; using the uris: .../ping/async/start/{ipAddress} .../ping/async/get/{ipAddress} .../ping/async/clear/{ipAddress} If this becomes interesting enough, I will update the link https://wiki.opendaylight.org/view/Ping with the stepping stones. Example: for x in 127.0.0.1 128.0.0.1 192.168.1.1 ; do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/start/${x} ; echo ; done while : ; do for x in 127.0.0.1 128.0.0.1 192.168.1.1 ; do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/get/${x} ; echo ; done ; sleep 2 ; done for x in 127.0.0.1 128.0.0.1 192.168.1.1 ; do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/clear/${x} ; echo ; done [DO NOT MERGE] Change-Id: Ibfaee34ace0435cac39006a7e9c6577df2bf71a8 Signed-off-by: Flavio Fernandes --- .../ping/northbound/PingNorthbound.java | 63 ++++++++-- .../ping/plugin/internal/PingImpl.java | 55 ++++---- opendaylight/ping/service/pom.xml | 1 + .../ping/service/api/PingServiceAPI.java | 65 +++++++++- .../ping/service/impl/PingServiceImpl.java | 118 ++++++++++++++---- 5 files changed, 231 insertions(+), 71 deletions(-) diff --git a/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java b/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java index e1e21349be..62f794d5c8 100644 --- a/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java +++ b/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java @@ -18,22 +18,59 @@ public class PingNorthbound { @Path("/ping/{ipAddress}") @PUT @StatusCodes({ - @ResponseCode(code = 200, condition = "Destination reachable"), - @ResponseCode(code = 503, condition = "Internal error"), - @ResponseCode(code = 503, condition = "Destination unreachable") }) + @ResponseCode(code = 200, condition = "Destination reachable"), + // @ResponseCode(code = 206, condition = "Ping in progress"), + @ResponseCode(code = 503, condition = "Internal error"), + @ResponseCode(code = 503, condition = "Destination unreachable") }) public Response ping(@PathParam(value = "ipAddress") String ipAddress) { - PingServiceAPI ping = (PingServiceAPI) ServiceHelper.getGlobalInstance( - PingServiceAPI.class, this); - if (ping == null) { + return pingCommon(ipAddress, true); + } + + @Path("/ping/async/start/{ipAddress}") + @PUT + @StatusCodes({ + @ResponseCode(code = 200, condition = "Destination reachable"), + @ResponseCode(code = 206, condition = "Ping in progress"), + @ResponseCode(code = 503, condition = "Internal error"), + @ResponseCode(code = 503, condition = "Destination unreachable") }) + public Response pingAsyncStart(@PathParam(value = "ipAddress") String ipAddress) { + return pingCommon(ipAddress, false); + } + + @Path("/ping/async/get/{ipAddress}") + @PUT + @StatusCodes({ + @ResponseCode(code = 200, condition = "Destination reachable"), + @ResponseCode(code = 206, condition = "Ping in progress"), + @ResponseCode(code = 503, condition = "Internal error"), + @ResponseCode(code = 503, condition = "Destination unreachable") }) + public Response pingAsyncGet(@PathParam(value = "ipAddress") String ipAddress) { + return pingCommon(ipAddress, false); + } + @Path("/ping/async/clear/{ipAddress}") + @PUT + @StatusCodes({ + @ResponseCode(code = 200, condition = "Async ping removed"), + @ResponseCode(code = 503, condition = "Internal error")}) + public Response pingAsyncClear(@PathParam(value = "ipAddress") String ipAddress) { + PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this); + if (pingServiceAPI != null) { pingServiceAPI.pingAsyncClear(ipAddress); } + return Response.ok(new String(ipAddress + " - removed")).build(); // idem-potent + } + + private Response pingCommon(String ipAddress, boolean isSync) { + PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this); + if (pingServiceAPI == null) { /* Ping service not found. */ - return Response.ok(new String("No ping service")).status(500) - .build(); + return Response.ok(new String("No ping service")).status(500).build(); } - if (ping.pingDestination(ipAddress)) - return Response.ok(new String(ipAddress + " - reachable")).build(); - - return Response.ok(new String(ipAddress + " - unreachable")).status(503) - .build(); + PingServiceAPI.PingResult pingResult = isSync ? + pingServiceAPI.pingDestinationSync(ipAddress) : pingServiceAPI.pingDestinationAsync(ipAddress); + if (pingResult == PingServiceAPI.PingResult.InProgress) + return Response.ok(new String(ipAddress + " - " + pingResult)).status(206).build(); + if (pingResult == PingServiceAPI.PingResult.GotResponse) + return Response.ok(new String(ipAddress + " - " + pingResult)).build(); + return Response.ok(new String(ipAddress + " - " + pingResult)).status(503).build(); } } diff --git a/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java b/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java index 0fe73000f9..3305503c30 100644 --- a/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java +++ b/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java @@ -1,8 +1,10 @@ package org.opendaylight.controller.ping.plugin.internal; -import java.io.IOException; import java.net.InetAddress; import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.opendaylight.controller.sal.common.util.Rpcs; @@ -14,43 +16,30 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoOutp import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; -import com.google.common.util.concurrent.Futures; - public class PingImpl implements PingService { - - private EchoResult pingHost(InetAddress destination) throws IOException { - if (destination.isReachable(5000)) { - return EchoResult.Reachable; - } else { - return EchoResult.Unreachable; - } + private final ExecutorService pool = Executors.newFixedThreadPool(2); + + private Future> startPingHost(final SendEchoInput destination) { + return pool.submit(new Callable>() { + @Override + public RpcResult call() throws Exception { + SendEchoOutputBuilder ob = new SendEchoOutputBuilder(); + try { + InetAddress dst = InetAddress.getByName(destination.getDestination().getValue()); + /* Build the result and return it. */ + ob.setEchoResult(dst.isReachable(5000) ? EchoResult.Reachable : EchoResult.Unreachable); + } catch (Exception e) { + /* Return error result. */ + ob.setEchoResult(EchoResult.Error); + } + return Rpcs.getRpcResult(true, ob.build(), Collections.emptySet()); + } + }); } @Override public Future> sendEcho(SendEchoInput destination) { - try { - InetAddress dst = InetAddress.getByName(destination - .getDestination().getValue()); - EchoResult result = this.pingHost(dst); - - /* Build the result and return it. */ - SendEchoOutputBuilder ob = new SendEchoOutputBuilder(); - ob.setEchoResult(result); - RpcResult rpcResult = - Rpcs. getRpcResult(true, ob.build(), - Collections. emptySet()); - - return Futures.immediateFuture(rpcResult); - } catch (Exception e) { - - /* Return error result. */ - SendEchoOutputBuilder ob = new SendEchoOutputBuilder(); - ob.setEchoResult(EchoResult.Error); - RpcResult rpcResult = - Rpcs. getRpcResult(true, ob.build(), - Collections. emptySet()); - return Futures.immediateFuture(rpcResult); - } + return this.startPingHost(destination); } } diff --git a/opendaylight/ping/service/pom.xml b/opendaylight/ping/service/pom.xml index 5d797c8d07..33381c14c8 100644 --- a/opendaylight/ping/service/pom.xml +++ b/opendaylight/ping/service/pom.xml @@ -53,6 +53,7 @@ org.opendaylight.yangtools.yang.common, org.opendaylight.yangtools.yang.binding, org.opendaylight.controller.sal.binding.api, + org.slf4j, org.osgi.framework org.opendaylight.controller.ping.service.api org.opendaylight.controller.ping.service.impl.PingServiceImpl diff --git a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java index 5dea55ae41..635466e972 100644 --- a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java +++ b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java @@ -3,13 +3,70 @@ package org.opendaylight.controller.ping.service.api; public interface PingServiceAPI { + public enum PingResult { + InProgress(0), + GotResponse(1), + NoResponse(2), + Error(3); + + int value; + static java.util.Map valueMap; + + static { + valueMap = new java.util.HashMap<>(); + for (PingResult enumItem : PingResult.values()) + { + valueMap.put(enumItem.value, enumItem); + } + } + + private PingResult(int value) { + this.value = value; + } + + /** + * @return integer value + */ + public int getIntValue() { + return value; + } + + /** + * @param valueArg + * @return corresponding EchoResult item + */ + public static PingResult forValue(int valueArg) { + return valueMap.get(valueArg); + } + } + /** - * pingDestination + * pingDestinationSync + * + * Will block caller until ping operation is finished. + * + * @param address An IPv4 address to be pinged + * @return PingResult enum. Will never return InProgress. + */ + PingResult pingDestinationSync(String address); + + /** + * pingDestinationAsync + * + * Will return last known state for given address. + * + * @param address An IPv4 address to be pinged + * @return PingResult enum. + */ + PingResult pingDestinationAsync(String address); + + /** + * pingAsyncClear + * + * Will remove async ping for given address. * * @param address An IPv4 address to be pinged - * @return True if address is reachable, - * false if address is unreachable or error occurs. */ - boolean pingDestination(String address); + void pingAsyncClear(String address); } diff --git a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java index 08229b7200..b34f7a9269 100644 --- a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java +++ b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java @@ -1,6 +1,9 @@ package org.opendaylight.controller.ping.service.impl; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.opendaylight.controller.ping.service.api.PingServiceAPI; import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer; @@ -13,54 +16,127 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoOutp import org.opendaylight.yangtools.yang.common.RpcResult; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PingServiceImpl extends AbstractBindingAwareConsumer implements BundleActivator, BindingAwareConsumer, PingServiceAPI { + private static Logger log = LoggerFactory.getLogger(PingServiceImpl.class); - private PingService ping; + private PingService pingService; private ConsumerContext session; + private Map>> asyncPingEntryMap; @Override public void onSessionInitialized(ConsumerContext session) { this.session = session; + this.pingService = this.session.getRpcService(PingService.class); } @Override protected void startImpl(BundleContext context) { + asyncPingEntryMap = new HashMap<>(); context.registerService(PingServiceAPI.class, this, null); } @Override - public boolean pingDestination(String address) { + protected void stopImpl(BundleContext context) { + asyncPingEntryMap.clear(); + } + + @Override + public PingResult pingDestinationSync(String address) { + return _pingDestinationSync(address); + } - if (ping == null) { - ping = this.session.getRpcService(PingService.class); - if (ping == null) { + @Override + public PingResult pingDestinationAsync(String address) { + return _pingDestinationAsync(address); + } - /* No ping service found. */ - return false; - } + @Override + public void pingAsyncClear(String address) { + _pingAsyncClear(address); + } + + private PingResult _pingDestinationSync(String address) { + if (pingService == null) { return PingResult.Error; } // No pingService service found. + + try { + Ipv4Address destination = new Ipv4Address(address); + SendEchoInputBuilder ib = new SendEchoInputBuilder(); + ib.setDestination(destination); + return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult()); + } catch (InterruptedException ie) { + log.warn("InterruptedException received by pingDestinationSync: {} from: {}", + address, ie.getMessage()); + } catch (ExecutionException ee) { + log.warn("ExecutionException received by pingDestinationSync: {} from: {}", + address, ee.getMessage()); } + return PingResult.Error; + } - Ipv4Address destination = new Ipv4Address(address); + private synchronized PingResult _pingDestinationAsync(String address) { + if (pingService == null) { return PingResult.Error; } // No pingService service found. - SendEchoInputBuilder ib = new SendEchoInputBuilder(); - ib.setDestination(destination); + /** Look for destination in asyncPingEntryMap. If none is found, create a new entry + * and return "in progress". This will happen on the very first time async is requested. + * + * NOTE: In a real scenario, you would want to consider a cache which automatically drops + * entries, so implementation does not need to rely on users calling async clear to remove + * the entries from the map. An example for doing such would be Google's caches or a + * weakhash map; which we deliberately chose not to use here for sake of simplicity. + */ + final Ipv4Address destination = new Ipv4Address(address); + final Future> rpcResultFuture = asyncPingEntryMap.get(destination); + if (rpcResultFuture == null) { + SendEchoInputBuilder ib = new SendEchoInputBuilder(); + ib.setDestination(destination); + asyncPingEntryMap.put(destination, pingService.sendEcho(ib.build())); + log.info("Starting pingDestinationAsync: {}", address); + return PingResult.InProgress; + } + + /** Pending result may not be ready to be consumed. In such case, use "in progress". + */ + if (!rpcResultFuture.isDone()) { + log.info("pingDestinationAsync: {} get result is not ready (ie. inProgress)", address); + return PingResult.InProgress; + } + + /** If we made it this far, we know that rpcResultFuture is ready for consumption. + */ try { - RpcResult result = ping.sendEcho(ib.build()).get(); - switch (result.getResult().getEchoResult()) { - case Reachable: - return true; - case Unreachable: - case Error: - default: - return false; - } + PingResult pingResult = mapResult(rpcResultFuture.get().getResult().getEchoResult()); + log.info("pingDestinationAsync: {} get result is {}", address, pingResult); + return pingResult; } catch (InterruptedException ie) { + log.warn("InterruptedException received by pingDestinationAsync: {} from: {}", + address, ie.getMessage()); } catch (ExecutionException ee) { + log.warn("ExecutionException received by pingDestinationAsync: {} from: {}", + address, ee.getMessage()); } + return PingResult.Error; + } - return false; + private synchronized void _pingAsyncClear(String address) { + asyncPingEntryMap.remove(new Ipv4Address(address)); + log.info("Removing pingDestinationAsync: {}", address); } + private static PingResult mapResult(SendEchoOutput.EchoResult echoResult) { + // Translate echoResult to pingResult + switch (echoResult) { + case Reachable: + return PingResult.GotResponse; + case Unreachable: + return PingResult.NoResponse; + case Error: + default: + break; + } + return PingResult.Error; + } } -- 2.36.6