This is an extenstion to the existing wiki page on md-sal ping tutorial.
It exposes the use of Future<> with the rpc api generated by yang-tools,
to make the ping service non-blocking; using the uris:
.../ping/async/start/{ipAddress}
.../ping/async/get/{ipAddress}
.../ping/async/stop/{ipAddress}
If this becomes interesting enough, I will update the link https://wiki.opendaylight.org/view/Ping
with the stepping stones.
Example:
for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/start/${x} ; echo ; done
while : ; do for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/get/${x} ; echo ;
done ; sleep 2 ; done
for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/stop/${x} ; echo ; done
[DO NOT MERGE]
Change-Id: Ibfaee34ace0435cac39006a7e9c6577df2bf71a8
Signed-off-by: Flavio Fernandes <ffernand@redhat.com>
@Path("/ping/{ipAddress}")
@PUT
@StatusCodes({
- @ResponseCode(code = 200, condition = "Destination reachable"),
- @ResponseCode(code = 503, condition = "Internal error"),
- @ResponseCode(code = 503, condition = "Destination unreachable") })
+ @ResponseCode(code = 200, condition = "Destination reachable"),
+ // @ResponseCode(code = 206, condition = "Ping in progress"),
+ @ResponseCode(code = 503, condition = "Internal error"),
+ @ResponseCode(code = 503, condition = "Destination unreachable") })
public Response ping(@PathParam(value = "ipAddress") String ipAddress) {
- PingServiceAPI ping = (PingServiceAPI) ServiceHelper.getGlobalInstance(
- PingServiceAPI.class, this);
- if (ping == null) {
+ return pingCommon(ipAddress, true);
+ }
+
+ @Path("/ping/async/start/{ipAddress}")
+ @PUT
+ @StatusCodes({
+ @ResponseCode(code = 200, condition = "Destination reachable"),
+ @ResponseCode(code = 206, condition = "Ping in progress"),
+ @ResponseCode(code = 503, condition = "Internal error"),
+ @ResponseCode(code = 503, condition = "Destination unreachable") })
+ public Response pingAsyncStart(@PathParam(value = "ipAddress") String ipAddress) {
+ return pingCommon(ipAddress, false);
+ }
+
+ @Path("/ping/async/get/{ipAddress}")
+ @PUT
+ @StatusCodes({
+ @ResponseCode(code = 200, condition = "Destination reachable"),
+ @ResponseCode(code = 206, condition = "Ping in progress"),
+ @ResponseCode(code = 503, condition = "Internal error"),
+ @ResponseCode(code = 503, condition = "Destination unreachable") })
+ public Response pingAsyncGet(@PathParam(value = "ipAddress") String ipAddress) {
+ return pingCommon(ipAddress, false);
+ }
+ @Path("/ping/async/stop/{ipAddress}")
+ @PUT
+ @StatusCodes({
+ @ResponseCode(code = 200, condition = "Ping stopped"),
+ @ResponseCode(code = 503, condition = "Internal error")})
+ public Response pingAsyncStop(@PathParam(value = "ipAddress") String ipAddress) {
+ PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+ if (pingServiceAPI != null) { pingServiceAPI.pingAsyncStop(ipAddress); }
+ return Response.ok(new String(ipAddress + " - stopped")).build(); // idem-potent
+ }
+
+ private Response pingCommon(String ipAddress, boolean isSync) {
+ PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+ if (pingServiceAPI == null) {
/* Ping service not found. */
- return Response.ok(new String("No ping service")).status(500)
- .build();
+ return Response.ok(new String("No ping service")).status(500).build();
}
- if (ping.pingDestination(ipAddress))
- return Response.ok(new String(ipAddress + " - reachable")).build();
-
- return Response.ok(new String(ipAddress + " - unreachable")).status(503)
- .build();
+ PingServiceAPI.PingResult pingResult = isSync ?
+ pingServiceAPI.pingDestinationSync(ipAddress) : pingServiceAPI.pingDestinationAsync(ipAddress);
+ if (pingResult == PingServiceAPI.PingResult.InProgress)
+ return Response.ok(new String(ipAddress + " - " + pingResult)).status(206).build();
+ if (pingResult == PingServiceAPI.PingResult.GotResponse)
+ return Response.ok(new String(ipAddress + " - " + pingResult)).build();
+ return Response.ok(new String(ipAddress + " - " + pingResult)).status(503).build();
}
}
package org.opendaylight.controller.ping.plugin.internal;
-import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import com.google.common.util.concurrent.Futures;
-
public class PingImpl implements PingService {
-
- private EchoResult pingHost(InetAddress destination) throws IOException {
- if (destination.isReachable(5000)) {
- return EchoResult.Reachable;
- } else {
- return EchoResult.Unreachable;
- }
+ private final ExecutorService pool = Executors.newFixedThreadPool(2);
+
+ private Future<RpcResult<SendEchoOutput>> startPingHost(final SendEchoInput destination) {
+ return pool.submit(new Callable<RpcResult<SendEchoOutput>>() {
+ @Override
+ public RpcResult<SendEchoOutput> call() throws Exception {
+ SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
+ try {
+ InetAddress dst = InetAddress.getByName(destination.getDestination().getValue());
+ /* Build the result and return it. */
+ ob.setEchoResult(dst.isReachable(5000) ? EchoResult.Reachable : EchoResult.Unreachable);
+ } catch (Exception e) {
+ /* Return error result. */
+ ob.setEchoResult(EchoResult.Error);
+ }
+ return Rpcs.<SendEchoOutput>getRpcResult(true, ob.build(), Collections.<RpcError>emptySet());
+ }
+ });
}
@Override
public Future<RpcResult<SendEchoOutput>> sendEcho(SendEchoInput destination) {
- try {
- InetAddress dst = InetAddress.getByName(destination
- .getDestination().getValue());
- EchoResult result = this.pingHost(dst);
-
- /* Build the result and return it. */
- SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
- ob.setEchoResult(result);
- RpcResult<SendEchoOutput> rpcResult =
- Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
- Collections.<RpcError> emptySet());
-
- return Futures.immediateFuture(rpcResult);
- } catch (Exception e) {
-
- /* Return error result. */
- SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
- ob.setEchoResult(EchoResult.Error);
- RpcResult<SendEchoOutput> rpcResult =
- Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
- Collections.<RpcError> emptySet());
- return Futures.immediateFuture(rpcResult);
- }
+ return this.startPingHost(destination);
}
}
public interface PingServiceAPI {
+ public enum PingResult {
+ InProgress(0),
+ GotResponse(1),
+ NoResponse(2),
+ Error(3);
+
+ int value;
+ static java.util.Map<java.lang.Integer, PingResult> valueMap;
+
+ static {
+ valueMap = new java.util.HashMap<>();
+ for (PingResult enumItem : PingResult.values())
+ {
+ valueMap.put(enumItem.value, enumItem);
+ }
+ }
+
+ private PingResult(int value) {
+ this.value = value;
+ }
+
+ /**
+ * @return integer value
+ */
+ public int getIntValue() {
+ return value;
+ }
+
+ /**
+ * @param valueArg
+ * @return corresponding EchoResult item
+ */
+ public static PingResult forValue(int valueArg) {
+ return valueMap.get(valueArg);
+ }
+ }
+
/**
- * pingDestination
+ * pingDestinationSync
+ *
+ * Will block caller until ping operation is finished.
+ *
+ * @param address An IPv4 address to be pinged
+ * @return PingResult enum. Will never return InProgress.
+ */
+ PingResult pingDestinationSync(String address);
+
+ /**
+ * pingDestinationAsync
+ *
+ * Will return last known state for given address.
+ *
+ * @param address An IPv4 address to be pinged
+ * @return PingResult enum.
+ */
+ PingResult pingDestinationAsync(String address);
+
+ /**
+ * pingAsyncStop
+ *
+ * Will stop async ping for given address.
*
* @param address An IPv4 address to be pinged
- * @return True if address is reachable,
- * false if address is unreachable or error occurs.
*/
- boolean pingDestination(String address);
+ void pingAsyncStop(String address);
}
package org.opendaylight.controller.ping.service.impl;
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.opendaylight.controller.ping.service.api.PingServiceAPI;
import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer;
public class PingServiceImpl extends AbstractBindingAwareConsumer implements
BundleActivator, BindingAwareConsumer, PingServiceAPI {
- private PingService ping;
+ private PingService pingService;
private ConsumerContext session;
+ private class AsyncPingEntry {
+ public Future<RpcResult<SendEchoOutput>> pendingResult;
+ public SendEchoOutput.EchoResult lastEchoResult;
+ public int countSent = 0; // debug use
+ public int countReceived = 0; // debug use
+
+ public AsyncPingEntry(Ipv4Address destination, PingService pingService, AsyncPingEntry previousAsyncPingEntry) {
+ if (previousAsyncPingEntry != null) {
+ this.lastEchoResult = previousAsyncPingEntry.lastEchoResult;
+ this.countSent = previousAsyncPingEntry.countSent;
+ this.countReceived = previousAsyncPingEntry.countReceived;
+ }
+
+ /** Tickle the ping service to go ahead and send the echo packet.
+ */
+ try {
+ SendEchoInputBuilder ib = new SendEchoInputBuilder();
+ ib.setDestination(destination);
+ this.pendingResult = pingService.sendEcho(ib.build());
+ this.countSent++;
+ } catch (Exception e) {
+ this.pendingResult = null;
+ }
+ }
+ }
+ private ConcurrentMap<Ipv4Address, AsyncPingEntry> asyncPingEntryConcurrentMap;
+ private Timer asyncPingTimer;
+ private class AsyncPingTimerHandler extends TimerTask {
+ /** The purpose of AsyncPingTimerHandler's life is to look for async entries that have been
+ * finished with their RPC calls to the ping plugin. When such entries are found, it simply
+ * restarts them, by replacing the 'data' wiht a new AsyncPingEntry instance. While invoking
+ * the AsyncPingEntry's constructor, the new RPC call to the ping plugin backend is executed.
+ */
+ @Override
+ public void run() {
+ Iterator<ConcurrentMap.Entry<Ipv4Address, AsyncPingEntry>> iterator =
+ asyncPingEntryConcurrentMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ try {
+ ConcurrentHashMap.Entry<Ipv4Address, AsyncPingEntry> entry = iterator.next();
+ Ipv4Address destination = entry.getKey();
+ AsyncPingEntry asyncPingEntry = entry.getValue();
+
+ /** Re-initiate ping, as long as last attempt for doing it is finished
+ */
+ if (asyncPingEntry.pendingResult == null || asyncPingEntry.pendingResult.isDone()) {
+ /** Bump receive count if destination was reachable
+ */
+ if (asyncPingEntry != null) {
+ final SendEchoOutput.EchoResult echoResult =
+ asyncPingEntry.pendingResult.get().getResult().getEchoResult();
+
+ asyncPingEntry.lastEchoResult = echoResult;
+ if (echoResult == SendEchoOutput.EchoResult.Reachable) { asyncPingEntry.countReceived++; }
+ }
+
+ /** Replace the entry for a key only if currently mapped to some value.
+ * This will protect the concurrent map against a race where this thread
+ * would be re-adding an entry that just got taken out.
+ *
+ * Note that by constructing a new AsyncPingEntry instance, the ping plugin is
+ * triggered and the cycle restarts by waiting for asyncPingEntry.pendingResult
+ * of the new instance to be done.
+ */
+ asyncPingEntryConcurrentMap.replace(destination,
+ new AsyncPingEntry(destination, pingService, asyncPingEntry));
+ }
+ } catch (InterruptedException ie) {
+ } catch (ExecutionException ee) {
+ }
+ }
+ }
+ }
+
+ private PingResult mapResult(SendEchoOutput.EchoResult echoResult) {
+ // Translate echoResult to pingResult
+ switch (echoResult) {
+ case Reachable:
+ return PingResult.GotResponse;
+ case Unreachable:
+ return PingResult.NoResponse;
+ case Error:
+ default:
+ break;
+ }
+ return PingResult.Error;
+ }
+
@Override
public void onSessionInitialized(ConsumerContext session) {
this.session = session;
+ this.pingService = this.session.getRpcService(PingService.class);
}
@Override
protected void startImpl(BundleContext context) {
+ asyncPingEntryConcurrentMap = new ConcurrentHashMap<Ipv4Address, AsyncPingEntry>();
+
+ /* Async Ping Timer to go off every 1 second for periodic pingService behavior */
+ asyncPingTimer = new Timer();
+ asyncPingTimer.schedule(new AsyncPingTimerHandler(), 1000, 1000);
+
context.registerService(PingServiceAPI.class, this, null);
}
@Override
- public boolean pingDestination(String address) {
+ protected void stopImpl(BundleContext context) {
+ asyncPingTimer.cancel();
+ asyncPingEntryConcurrentMap.clear();
+ }
- if (ping == null) {
- ping = this.session.getRpcService(PingService.class);
- if (ping == null) {
+ @Override
+ public PingResult pingDestinationSync(String address) {
+ if (pingService == null) { return PingResult.Error; } // No pingService service found.
- /* No ping service found. */
- return false;
- }
+ try {
+ Ipv4Address destination = new Ipv4Address(address);
+ SendEchoInputBuilder ib = new SendEchoInputBuilder();
+ ib.setDestination(destination);
+ return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult());
+ } catch (InterruptedException ie) {
+ } catch (ExecutionException ee) {
}
+ return PingResult.Error;
+ }
+ @Override
+ public PingResult pingDestinationAsync(String address) {
+ if (pingService == null) { return PingResult.Error; } // No pingService service found.
+
+ /** Look for destination in asyncPingEntryConcurrentMap. If none is found, create a new entry
+ * and return "in progress". This will happen on the very first time async is requested.
+ */
Ipv4Address destination = new Ipv4Address(address);
+ AsyncPingEntry asyncPingEntry = asyncPingEntryConcurrentMap.get(destination);
+ if (asyncPingEntry == null) {
+ asyncPingEntryConcurrentMap.put(destination, new AsyncPingEntry(destination, pingService, null));
+ return PingResult.InProgress;
+ }
- SendEchoInputBuilder ib = new SendEchoInputBuilder();
- ib.setDestination(destination);
+ /** Pending result may not be ready to be consumed. In such case, use lastResult. If there has
+ * not been a lastResult, then the only choice is to use "in progress".
+ */
+ if (!asyncPingEntry.pendingResult.isDone()) {
+ return asyncPingEntry.lastEchoResult == null ?
+ PingResult.InProgress : mapResult(asyncPingEntry.lastEchoResult);
+ }
+
+ /** If we made it this far, we know that pendingResult contains the latest and greatest result
+ */
try {
- RpcResult<SendEchoOutput> result = ping.sendEcho(ib.build()).get();
- switch (result.getResult().getEchoResult()) {
- case Reachable:
- return true;
- case Unreachable:
- case Error:
- default:
- return false;
- }
+ return mapResult(asyncPingEntry.pendingResult.get().getResult().getEchoResult());
} catch (InterruptedException ie) {
} catch (ExecutionException ee) {
}
-
- return false;
+ return PingResult.Error;
}
+ @Override
+ public void pingAsyncStop(String address) {
+ asyncPingEntryConcurrentMap.remove( new Ipv4Address(address) );
+ }
}