This is an extension to the existing wiki page on md-sal ping tutorial.
It exposes the use of Future<> with the rpc api generated by yang-tools,
to make the ping service non-blocking; using the uris:
.../ping/async/start/{ipAddress}
.../ping/async/get/{ipAddress}
.../ping/async/clear/{ipAddress}
If this becomes interesting enough, I will update the link https://wiki.opendaylight.org/view/Ping
with the stepping stones.
Example:
for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/start/${x} ; echo ; done
while : ; do for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/get/${x} ; echo ;
done ; sleep 2 ; done
for x in 127.0.0.1 128.0.0.1 192.168.1.1 ;
do curl --user "admin":"admin" -X PUT http://localhost:8080/controller/nb/v2/ping/async/clear/${x} ; echo ; done
[DO NOT MERGE]
Change-Id: Ibfaee34ace0435cac39006a7e9c6577df2bf71a8
Signed-off-by: Flavio Fernandes <ffernand@redhat.com>
@Path("/ping/{ipAddress}")
@PUT
@StatusCodes({
- @ResponseCode(code = 200, condition = "Destination reachable"),
- @ResponseCode(code = 503, condition = "Internal error"),
- @ResponseCode(code = 503, condition = "Destination unreachable") })
+ @ResponseCode(code = 200, condition = "Destination reachable"),
+ // @ResponseCode(code = 206, condition = "Ping in progress"),
+ @ResponseCode(code = 503, condition = "Internal error"),
+ @ResponseCode(code = 503, condition = "Destination unreachable") })
public Response ping(@PathParam(value = "ipAddress") String ipAddress) {
- PingServiceAPI ping = (PingServiceAPI) ServiceHelper.getGlobalInstance(
- PingServiceAPI.class, this);
- if (ping == null) {
+ return pingCommon(ipAddress, true);
+ }
+
+ @Path("/ping/async/start/{ipAddress}")
+ @PUT
+ @StatusCodes({
+ @ResponseCode(code = 200, condition = "Destination reachable"),
+ @ResponseCode(code = 206, condition = "Ping in progress"),
+ @ResponseCode(code = 503, condition = "Internal error"),
+ @ResponseCode(code = 503, condition = "Destination unreachable") })
+ public Response pingAsyncStart(@PathParam(value = "ipAddress") String ipAddress) {
+ return pingCommon(ipAddress, false);
+ }
+
+ @Path("/ping/async/get/{ipAddress}")
+ @PUT
+ @StatusCodes({
+ @ResponseCode(code = 200, condition = "Destination reachable"),
+ @ResponseCode(code = 206, condition = "Ping in progress"),
+ @ResponseCode(code = 503, condition = "Internal error"),
+ @ResponseCode(code = 503, condition = "Destination unreachable") })
+ public Response pingAsyncGet(@PathParam(value = "ipAddress") String ipAddress) {
+ return pingCommon(ipAddress, false);
+ }
+ @Path("/ping/async/clear/{ipAddress}")
+ @PUT
+ @StatusCodes({
+ @ResponseCode(code = 200, condition = "Async ping removed"),
+ @ResponseCode(code = 503, condition = "Internal error")})
+ public Response pingAsyncClear(@PathParam(value = "ipAddress") String ipAddress) {
+ PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+ if (pingServiceAPI != null) { pingServiceAPI.pingAsyncClear(ipAddress); }
+ return Response.ok(new String(ipAddress + " - removed")).build(); // idem-potent
+ }
+
+ private Response pingCommon(String ipAddress, boolean isSync) {
+ PingServiceAPI pingServiceAPI = (PingServiceAPI) ServiceHelper.getGlobalInstance(PingServiceAPI.class, this);
+ if (pingServiceAPI == null) {
/* Ping service not found. */
- return Response.ok(new String("No ping service")).status(500)
- .build();
+ return Response.ok(new String("No ping service")).status(500).build();
}
- if (ping.pingDestination(ipAddress))
- return Response.ok(new String(ipAddress + " - reachable")).build();
-
- return Response.ok(new String(ipAddress + " - unreachable")).status(503)
- .build();
+ PingServiceAPI.PingResult pingResult = isSync ?
+ pingServiceAPI.pingDestinationSync(ipAddress) : pingServiceAPI.pingDestinationAsync(ipAddress);
+ if (pingResult == PingServiceAPI.PingResult.InProgress)
+ return Response.ok(new String(ipAddress + " - " + pingResult)).status(206).build();
+ if (pingResult == PingServiceAPI.PingResult.GotResponse)
+ return Response.ok(new String(ipAddress + " - " + pingResult)).build();
+ return Response.ok(new String(ipAddress + " - " + pingResult)).status(503).build();
}
}
package org.opendaylight.controller.ping.plugin.internal;
-import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import com.google.common.util.concurrent.Futures;
-
public class PingImpl implements PingService {
-
- private EchoResult pingHost(InetAddress destination) throws IOException {
- if (destination.isReachable(5000)) {
- return EchoResult.Reachable;
- } else {
- return EchoResult.Unreachable;
- }
+ private final ExecutorService pool = Executors.newFixedThreadPool(2);
+
+ private Future<RpcResult<SendEchoOutput>> startPingHost(final SendEchoInput destination) {
+ return pool.submit(new Callable<RpcResult<SendEchoOutput>>() {
+ @Override
+ public RpcResult<SendEchoOutput> call() throws Exception {
+ SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
+ try {
+ InetAddress dst = InetAddress.getByName(destination.getDestination().getValue());
+ /* Build the result and return it. */
+ ob.setEchoResult(dst.isReachable(5000) ? EchoResult.Reachable : EchoResult.Unreachable);
+ } catch (Exception e) {
+ /* Return error result. */
+ ob.setEchoResult(EchoResult.Error);
+ }
+ return Rpcs.<SendEchoOutput>getRpcResult(true, ob.build(), Collections.<RpcError>emptySet());
+ }
+ });
}
@Override
public Future<RpcResult<SendEchoOutput>> sendEcho(SendEchoInput destination) {
- try {
- InetAddress dst = InetAddress.getByName(destination
- .getDestination().getValue());
- EchoResult result = this.pingHost(dst);
-
- /* Build the result and return it. */
- SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
- ob.setEchoResult(result);
- RpcResult<SendEchoOutput> rpcResult =
- Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
- Collections.<RpcError> emptySet());
-
- return Futures.immediateFuture(rpcResult);
- } catch (Exception e) {
-
- /* Return error result. */
- SendEchoOutputBuilder ob = new SendEchoOutputBuilder();
- ob.setEchoResult(EchoResult.Error);
- RpcResult<SendEchoOutput> rpcResult =
- Rpcs.<SendEchoOutput> getRpcResult(true, ob.build(),
- Collections.<RpcError> emptySet());
- return Futures.immediateFuture(rpcResult);
- }
+ return this.startPingHost(destination);
}
}
org.opendaylight.yangtools.yang.common,
org.opendaylight.yangtools.yang.binding,
org.opendaylight.controller.sal.binding.api,
+ org.slf4j,
org.osgi.framework</Import-Package>
<Export-Package>org.opendaylight.controller.ping.service.api</Export-Package>
<Bundle-Activator>org.opendaylight.controller.ping.service.impl.PingServiceImpl</Bundle-Activator>
public interface PingServiceAPI {
+ public enum PingResult {
+ InProgress(0),
+ GotResponse(1),
+ NoResponse(2),
+ Error(3);
+
+ int value;
+ static java.util.Map<java.lang.Integer, PingResult> valueMap;
+
+ static {
+ valueMap = new java.util.HashMap<>();
+ for (PingResult enumItem : PingResult.values())
+ {
+ valueMap.put(enumItem.value, enumItem);
+ }
+ }
+
+ private PingResult(int value) {
+ this.value = value;
+ }
+
+ /**
+ * @return integer value
+ */
+ public int getIntValue() {
+ return value;
+ }
+
+ /**
+ * @param valueArg
+ * @return corresponding EchoResult item
+ */
+ public static PingResult forValue(int valueArg) {
+ return valueMap.get(valueArg);
+ }
+ }
+
/**
- * pingDestination
+ * pingDestinationSync
+ *
+ * Will block caller until ping operation is finished.
+ *
+ * @param address An IPv4 address to be pinged
+ * @return PingResult enum. Will never return InProgress.
+ */
+ PingResult pingDestinationSync(String address);
+
+ /**
+ * pingDestinationAsync
+ *
+ * Will return last known state for given address.
+ *
+ * @param address An IPv4 address to be pinged
+ * @return PingResult enum.
+ */
+ PingResult pingDestinationAsync(String address);
+
+ /**
+ * pingAsyncClear
+ *
+ * Will remove async ping for given address.
*
* @param address An IPv4 address to be pinged
- * @return True if address is reachable,
- * false if address is unreachable or error occurs.
*/
- boolean pingDestination(String address);
+ void pingAsyncClear(String address);
}
package org.opendaylight.controller.ping.service.impl;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.opendaylight.controller.ping.service.api.PingServiceAPI;
import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareConsumer;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PingServiceImpl extends AbstractBindingAwareConsumer implements
BundleActivator, BindingAwareConsumer, PingServiceAPI {
+ private static Logger log = LoggerFactory.getLogger(PingServiceImpl.class);
- private PingService ping;
+ private PingService pingService;
private ConsumerContext session;
+ private Map<Ipv4Address, Future<RpcResult<SendEchoOutput>>> asyncPingEntryMap;
@Override
public void onSessionInitialized(ConsumerContext session) {
this.session = session;
+ this.pingService = this.session.getRpcService(PingService.class);
}
@Override
protected void startImpl(BundleContext context) {
+ asyncPingEntryMap = new HashMap<>();
context.registerService(PingServiceAPI.class, this, null);
}
@Override
- public boolean pingDestination(String address) {
+ protected void stopImpl(BundleContext context) {
+ asyncPingEntryMap.clear();
+ }
+
+ @Override
+ public PingResult pingDestinationSync(String address) {
+ return _pingDestinationSync(address);
+ }
- if (ping == null) {
- ping = this.session.getRpcService(PingService.class);
- if (ping == null) {
+ @Override
+ public PingResult pingDestinationAsync(String address) {
+ return _pingDestinationAsync(address);
+ }
- /* No ping service found. */
- return false;
- }
+ @Override
+ public void pingAsyncClear(String address) {
+ _pingAsyncClear(address);
+ }
+
+ private PingResult _pingDestinationSync(String address) {
+ if (pingService == null) { return PingResult.Error; } // No pingService service found.
+
+ try {
+ Ipv4Address destination = new Ipv4Address(address);
+ SendEchoInputBuilder ib = new SendEchoInputBuilder();
+ ib.setDestination(destination);
+ return mapResult(pingService.sendEcho(ib.build()).get().getResult().getEchoResult());
+ } catch (InterruptedException ie) {
+ log.warn("InterruptedException received by pingDestinationSync: {} from: {}",
+ address, ie.getMessage());
+ } catch (ExecutionException ee) {
+ log.warn("ExecutionException received by pingDestinationSync: {} from: {}",
+ address, ee.getMessage());
}
+ return PingResult.Error;
+ }
- Ipv4Address destination = new Ipv4Address(address);
+ private synchronized PingResult _pingDestinationAsync(String address) {
+ if (pingService == null) { return PingResult.Error; } // No pingService service found.
- SendEchoInputBuilder ib = new SendEchoInputBuilder();
- ib.setDestination(destination);
+ /** Look for destination in asyncPingEntryMap. If none is found, create a new entry
+ * and return "in progress". This will happen on the very first time async is requested.
+ *
+ * NOTE: In a real scenario, you would want to consider a cache which automatically drops
+ * entries, so implementation does not need to rely on users calling async clear to remove
+ * the entries from the map. An example for doing such would be Google's caches or a
+ * weakhash map; which we deliberately chose not to use here for sake of simplicity.
+ */
+ final Ipv4Address destination = new Ipv4Address(address);
+ final Future<RpcResult<SendEchoOutput>> rpcResultFuture = asyncPingEntryMap.get(destination);
+ if (rpcResultFuture == null) {
+ SendEchoInputBuilder ib = new SendEchoInputBuilder();
+ ib.setDestination(destination);
+ asyncPingEntryMap.put(destination, pingService.sendEcho(ib.build()));
+ log.info("Starting pingDestinationAsync: {}", address);
+ return PingResult.InProgress;
+ }
+
+ /** Pending result may not be ready to be consumed. In such case, use "in progress".
+ */
+ if (!rpcResultFuture.isDone()) {
+ log.info("pingDestinationAsync: {} get result is not ready (ie. inProgress)", address);
+ return PingResult.InProgress;
+ }
+
+ /** If we made it this far, we know that rpcResultFuture is ready for consumption.
+ */
try {
- RpcResult<SendEchoOutput> result = ping.sendEcho(ib.build()).get();
- switch (result.getResult().getEchoResult()) {
- case Reachable:
- return true;
- case Unreachable:
- case Error:
- default:
- return false;
- }
+ PingResult pingResult = mapResult(rpcResultFuture.get().getResult().getEchoResult());
+ log.info("pingDestinationAsync: {} get result is {}", address, pingResult);
+ return pingResult;
} catch (InterruptedException ie) {
+ log.warn("InterruptedException received by pingDestinationAsync: {} from: {}",
+ address, ie.getMessage());
} catch (ExecutionException ee) {
+ log.warn("ExecutionException received by pingDestinationAsync: {} from: {}",
+ address, ee.getMessage());
}
+ return PingResult.Error;
+ }
- return false;
+ private synchronized void _pingAsyncClear(String address) {
+ asyncPingEntryMap.remove(new Ipv4Address(address));
+ log.info("Removing pingDestinationAsync: {}", address);
}
+ private static PingResult mapResult(SendEchoOutput.EchoResult echoResult) {
+ // Translate echoResult to pingResult
+ switch (echoResult) {
+ case Reachable:
+ return PingResult.GotResponse;
+ case Unreachable:
+ return PingResult.NoResponse;
+ case Error:
+ default:
+ break;
+ }
+ return PingResult.Error;
+ }
}