From: Flavio Fernandes Date: Wed, 14 May 2014 12:56:58 +0000 (-0400) Subject: [DO NOT MERGE] X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e936e98189184f4d1268b89b040ca558d8e805c4;hp=6835e2211a435560be6f10d79aeeb237eb0d04e6 [DO NOT MERGE] This is an extenstion to the existing wiki page on md-sal ping tutorial. It exposes the use of Future<> with the rpc api generated by yang-tools, to make the ping service non-blocking; using the uris: .../ping/async/start/{ipAddress} .../ping/async/get/{ipAddress} .../ping/async/stop/{ipAddress} If this becomes interesting enough, I will update the link https://wiki.opendaylight.org/view/Ping with the stepping stones. Example: for x in 127.0.0.1 128.0.0.1 192.168.1.1 ; do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/start/${x} ; echo ; done while : ; do for x in 127.0.0.1 128.0.0.1 192.168.1.1 ; do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/get/${x} ; echo ; done ; sleep 2 ; done for x in 127.0.0.1 128.0.0.1 192.168.1.1 ; do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/stop/${x} ; echo ; done [DO NOT MERGE] Change-Id: Ibfaee34ace0435cac39006a7e9c6577df2bf71a8 Signed-off-by: Flavio Fernandes --- diff --git a/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java b/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java index e1e21349be..acc0b01b09 100644 --- a/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java +++ b/opendaylight/ping/northbound/src/main/java/org/opendaylight/controller/ping/northbound/PingNorthbound.java @@ -18,22 +18,59 @@ public class PingNorthbound { @Path("/ping/{ipAddress}") @PUT @StatusCodes({ - @ResponseCode(code = 200, condition = "Destination reachable"), - @ResponseCode(code = 503, condition = "Internal error"), - @ResponseCode(code = 503, condition = "Destination unreachable") }) + @ResponseCode(code = 200, condition = "Destination reachable"), + // @ResponseCode(code = 206, condition = "Ping in progress"), + @ResponseCode(code = 503, condition = "Internal error"), + @ResponseCode(code = 503, condition = "Destination unreachable") }) public Response ping(@PathParam(value = "ipAddress") String ipAddress) { - PingServiceAPI ping = (PingServiceAPI) ServiceHelper.getGlobalInstance( - PingServiceAPI.class, this); - if (ping == null) { + return pingCommon(ipAddress, true); + } + + @Path("/ping/async/start/{ipAddress}") + @PUT + @StatusCodes({ + @ResponseCode(code = 200, condition = "Destination reachable"), + @ResponseCode(code = 206, condition = "Ping in progress"), + @ResponseCode(code = 503, condition = "Internal error"), + @ResponseCode(code = 503, condition = "Destination unreachable") }) + public Response pingAsyncStart(@PathParam(value = "ipAddress") String ipAddress) { + return pingCommon(ipAddress, false); + } + + @Path("/ping/async/get/{ipAddress}") + @PUT + @StatusCodes({ + @ResponseCode(code = 200, condition = "Destination reachable"), + @ResponseCode(code = 206, condition = "Ping in progress"), + @ResponseCode(code = 503, condition = "Internal error"), + @ResponseCode(code = 503, condition = "Destination unreachable") }) + public Response pingAsyncGet(@PathParam(value = "ipAddress") String ipAddress) { + return pingCommon(ipAddress, false); + } + @Path("/ping/async/stop/{ipAddress}") + @PUT + @StatusCodes({ + @ResponseCode(code = 200, condition = "Ping stopped"), + @ResponseCode(code = 503, condition = "Internal error")}) + public Response pingAsyncStop(@PathParam(value = "ipAddress") String ipAddress) { + PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this); + if (pingServiceAPI != null) { pingServiceAPI.pingAsyncStop(ipAddress); } + return Response.ok(new String(ipAddress + " - stopped")).build(); // idem-potent + } + + private Response pingCommon(String ipAddress, boolean isSync) { + PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this); + if (pingServiceAPI == null) { /* Ping service not found. */ - return Response.ok(new String("No ping service")).status(500) - .build(); + return Response.ok(new String("No ping service")).status(500).build(); } - if (ping.pingDestination(ipAddress)) - return Response.ok(new String(ipAddress + " - reachable")).build(); - - return Response.ok(new String(ipAddress + " - unreachable")).status(503) - .build(); + PingServiceAPI.PingResult pingResult = isSync ? + pingServiceAPI.pingDestinationSync(ipAddress) : pingServiceAPI.pingDestinationAsync(ipAddress); + if (pingResult == PingServiceAPI.PingResult.InProgress) + return Response.ok(new String(ipAddress + " - " + pingResult)).status(206).build(); + if (pingResult == PingServiceAPI.PingResult.GotResponse) + return Response.ok(new String(ipAddress + " - " + pingResult)).build(); + return Response.ok(new String(ipAddress + " - " + pingResult)).status(503).build(); } } diff --git a/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java b/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java index 0fe73000f9..3305503c30 100644 --- a/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java +++ b/opendaylight/ping/plugin/src/main/java/org/opendaylight/controller/ping/plugin/internal/PingImpl.java @@ -1,8 +1,10 @@ package org.opendaylight.controller.ping.plugin.internal; -import java.io.IOException; import java.net.InetAddress; import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.opendaylight.controller.sal.common.util.Rpcs; @@ -14,43 +16,30 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.ping.rev130911.SendEchoOutp import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; -import com.google.common.util.concurrent.Futures; - public class PingImpl implements PingService { - - private EchoResult pingHost(InetAddress destination) throws IOException { - if (destination.isReachable(5000)) { - return EchoResult.Reachable; - } else { - return EchoResult.Unreachable; - } + private final ExecutorService pool = Executors.newFixedThreadPool(2); + + private Future> startPingHost(final SendEchoInput destination) { + return pool.submit(new Callable>() { + @Override + public RpcResult call() throws Exception { + SendEchoOutputBuilder ob = new SendEchoOutputBuilder(); + try { + InetAddress dst = InetAddress.getByName(destination.getDestination().getValue()); + /* Build the result and return it. */ + ob.setEchoResult(dst.isReachable(5000) ? EchoResult.Reachable : EchoResult.Unreachable); + } catch (Exception e) { + /* Return error result. */ + ob.setEchoResult(EchoResult.Error); + } + return Rpcs.getRpcResult(true, ob.build(), Collections.emptySet()); + } + }); } @Override public Future> sendEcho(SendEchoInput destination) { - try { - InetAddress dst = InetAddress.getByName(destination - .getDestination().getValue()); - EchoResult result = this.pingHost(dst); - - /* Build the result and return it. */ - SendEchoOutputBuilder ob = new SendEchoOutputBuilder(); - ob.setEchoResult(result); - RpcResult rpcResult = - Rpcs. getRpcResult(true, ob.build(), - Collections. emptySet()); - - return Futures.immediateFuture(rpcResult); - } catch (Exception e) { - - /* Return error result. */ - SendEchoOutputBuilder ob = new SendEchoOutputBuilder(); - ob.setEchoResult(EchoResult.Error); - RpcResult rpcResult = - Rpcs. getRpcResult(true, ob.build(), - Collections. emptySet()); - return Futures.immediateFuture(rpcResult); - } + return this.startPingHost(destination); } } diff --git a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java index 5dea55ae41..8a9da4d012 100644 --- a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java +++ b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/api/PingServiceAPI.java @@ -3,13 +3,70 @@ package org.opendaylight.controller.ping.service.api; public interface PingServiceAPI { + public enum PingResult { + InProgress(0), + GotResponse(1), + NoResponse(2), + Error(3); + + int value; + static java.util.Map valueMap; + + static { + valueMap = new java.util.HashMap<>(); + for (PingResult enumItem : PingResult.values()) + { + valueMap.put(enumItem.value, enumItem); + } + } + + private PingResult(int value) { + this.value = value; + } + + /** + * @return integer value + */ + public int getIntValue() { + return value; + } + + /** + * @param valueArg + * @return corresponding EchoResult item + */ + public static PingResult forValue(int valueArg) { + return valueMap.get(valueArg); + } + } + /** - * pingDestination + * pingDestinationSync + * + * Will block caller until ping operation is finished. + * + * @param address An IPv4 address to be pinged + * @return PingResult enum. Will never return InProgress. + */ + PingResult pingDestinationSync(String address); + + /** + * pingDestinationAsync + * + * Will return last known state for given address. + * + * @param address An IPv4 address to be pinged + * @return PingResult enum. + */ + PingResult pingDestinationAsync(String address); + + /** + * pingAsyncStop + * + * Will stop async ping for given address. * * @param address An IPv4 address to be pinged - * @return True if address is reachable, - * false if address is unreachable or error occurs. */ - boolean pingDestination(String address); + void pingAsyncStop(String address); } diff --git a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java index 08229b7200..eebf01b5a9 100644 --- a/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java +++ b/opendaylight/ping/service/src/main/java/org/opendaylight/controller/ping/service/impl/PingServiceImpl.java @@ -1,6 +1,12 @@ package org.opendaylight.controller.ping.service.impl; +import java.util.Iterator; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.opendaylight.controller.ping.service.api.PingServiceAPI; import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer; @@ -17,50 +23,169 @@ import org.osgi.framework.BundleContext; public class PingServiceImpl extends AbstractBindingAwareConsumer implements BundleActivator, BindingAwareConsumer, PingServiceAPI { - private PingService ping; + private PingService pingService; private ConsumerContext session; + private class AsyncPingEntry { + public Future> pendingResult; + public SendEchoOutput.EchoResult lastEchoResult; + public int countSent = 0; // debug use + public int countReceived = 0; // debug use + + public AsyncPingEntry(Ipv4Address destination, PingService pingService, AsyncPingEntry previousAsyncPingEntry) { + if (previousAsyncPingEntry != null) { + this.lastEchoResult = previousAsyncPingEntry.lastEchoResult; + this.countSent = previousAsyncPingEntry.countSent; + this.countReceived = previousAsyncPingEntry.countReceived; + } + + /** Tickle the ping service to go ahead and send the echo packet. + */ + try { + SendEchoInputBuilder ib = new SendEchoInputBuilder(); + ib.setDestination(destination); + this.pendingResult = pingService.sendEcho(ib.build()); + this.countSent++; + } catch (Exception e) { + this.pendingResult = null; + } + } + } + private ConcurrentMap asyncPingEntryConcurrentMap; + private Timer asyncPingTimer; + private class AsyncPingTimerHandler extends TimerTask { + /** The purpose of AsyncPingTimerHandler's life is to look for async entries that have been + * finished with their RPC calls to the ping plugin. When such entries are found, it simply + * restarts them, by replacing the 'data' wiht a new AsyncPingEntry instance. While invoking + * the AsyncPingEntry's constructor, the new RPC call to the ping plugin backend is executed. + */ + @Override + public void run() { + Iterator> iterator = + asyncPingEntryConcurrentMap.entrySet().iterator(); + while (iterator.hasNext()) { + try { + ConcurrentHashMap.Entry entry = iterator.next(); + Ipv4Address destination = entry.getKey(); + AsyncPingEntry asyncPingEntry = entry.getValue(); + + /** Re-initiate ping, as long as last attempt for doing it is finished + */ + if (asyncPingEntry.pendingResult == null || asyncPingEntry.pendingResult.isDone()) { + /** Bump receive count if destination was reachable + */ + if (asyncPingEntry != null) { + final SendEchoOutput.EchoResult echoResult = + asyncPingEntry.pendingResult.get().getResult().getEchoResult(); + + asyncPingEntry.lastEchoResult = echoResult; + if (echoResult == SendEchoOutput.EchoResult.Reachable) { asyncPingEntry.countReceived++; } + } + + /** Replace the entry for a key only if currently mapped to some value. + * This will protect the concurrent map against a race where this thread + * would be re-adding an entry that just got taken out. + * + * Note that by constructing a new AsyncPingEntry instance, the ping plugin is + * triggered and the cycle restarts by waiting for asyncPingEntry.pendingResult + * of the new instance to be done. + */ + asyncPingEntryConcurrentMap.replace(destination, + new AsyncPingEntry(destination, pingService, asyncPingEntry)); + } + } catch (InterruptedException ie) { + } catch (ExecutionException ee) { + } + } + } + } + + private PingResult mapResult(SendEchoOutput.EchoResult echoResult) { + // Translate echoResult to pingResult + switch (echoResult) { + case Reachable: + return PingResult.GotResponse; + case Unreachable: + return PingResult.NoResponse; + case Error: + default: + break; + } + return PingResult.Error; + } + @Override public void onSessionInitialized(ConsumerContext session) { this.session = session; + this.pingService = this.session.getRpcService(PingService.class); } @Override protected void startImpl(BundleContext context) { + asyncPingEntryConcurrentMap = new ConcurrentHashMap(); + + /* Async Ping Timer to go off every 1 second for periodic pingService behavior */ + asyncPingTimer = new Timer(); + asyncPingTimer.schedule(new AsyncPingTimerHandler(), 1000, 1000); + context.registerService(PingServiceAPI.class, this, null); } @Override - public boolean pingDestination(String address) { + protected void stopImpl(BundleContext context) { + asyncPingTimer.cancel(); + asyncPingEntryConcurrentMap.clear(); + } - if (ping == null) { - ping = this.session.getRpcService(PingService.class); - if (ping == null) { + @Override + public PingResult pingDestinationSync(String address) { + if (pingService == null) { return PingResult.Error; } // No pingService service found. - /* No ping service found. */ - return false; - } + try { + Ipv4Address destination = new Ipv4Address(address); + SendEchoInputBuilder ib = new SendEchoInputBuilder(); + ib.setDestination(destination); + return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult()); + } catch (InterruptedException ie) { + } catch (ExecutionException ee) { } + return PingResult.Error; + } + @Override + public PingResult pingDestinationAsync(String address) { + if (pingService == null) { return PingResult.Error; } // No pingService service found. + + /** Look for destination in asyncPingEntryConcurrentMap. If none is found, create a new entry + * and return "in progress". This will happen on the very first time async is requested. + */ Ipv4Address destination = new Ipv4Address(address); + AsyncPingEntry asyncPingEntry = asyncPingEntryConcurrentMap.get(destination); + if (asyncPingEntry == null) { + asyncPingEntryConcurrentMap.put(destination, new AsyncPingEntry(destination, pingService, null)); + return PingResult.InProgress; + } - SendEchoInputBuilder ib = new SendEchoInputBuilder(); - ib.setDestination(destination); + /** Pending result may not be ready to be consumed. In such case, use lastResult. If there has + * not been a lastResult, then the only choice is to use "in progress". + */ + if (!asyncPingEntry.pendingResult.isDone()) { + return asyncPingEntry.lastEchoResult == null ? + PingResult.InProgress : mapResult(asyncPingEntry.lastEchoResult); + } + + /** If we made it this far, we know that pendingResult contains the latest and greatest result + */ try { - RpcResult result = ping.sendEcho(ib.build()).get(); - switch (result.getResult().getEchoResult()) { - case Reachable: - return true; - case Unreachable: - case Error: - default: - return false; - } + return mapResult(asyncPingEntry.pendingResult.get().getResult().getEchoResult()); } catch (InterruptedException ie) { } catch (ExecutionException ee) { } - - return false; + return PingResult.Error; } + @Override + public void pingAsyncStop(String address) { + asyncPingEntryConcurrentMap.remove( new Ipv4Address(address) ); + } }