import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.math.BigInteger;
+import java.util.Objects;
+import java.util.function.Function;
import javax.annotation.Nonnull;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
return deviceContext;
}
- protected DeviceRegistry getDeviceRegistry() {return deviceContext;}
+ protected DeviceRegistry getDeviceRegistry() {
+ return deviceContext;
+ }
- public DeviceInfo getDeviceInfo() {return deviceContext.getDeviceInfo();}
+ public DeviceInfo getDeviceInfo() {
+ return deviceContext.getDeviceInfo();
+ }
- public TxFacade getTxFacade() {return deviceContext;}
+ public TxFacade getTxFacade() {
+ return deviceContext;
+ }
public MessageSpy getMessageSpy() {
return messageSpy;
protected abstract FutureCallback<OfHeader> createCallback(RequestContext<O> context, Class<?> requestType);
public ListenableFuture<RpcResult<O>> handleServiceCall(@Nonnull final I input) {
+ return handleServiceCall(input, null);
+ }
+
+ public ListenableFuture<RpcResult<O>> handleServiceCall(@Nonnull final I input,
+ @Nonnull final Function<OfHeader, Boolean> isComplete) {
Preconditions.checkNotNull(input);
final Class<?> requestType;
try {
request = buildRequest(xid, input);
Verify.verify(xid.getValue().equals(request.getXid()), "Expected XID %s got %s", xid.getValue(), request.getXid());
- } catch (Exception e) {
- LOG.error("Failed to build request for {}, forfeiting request {}", input, xid.getValue(), e);
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, "failed to build request input: " + e.getMessage());
+ } catch (Exception ex) {
+ LOG.error("Failed to build request for {}, forfeiting request {}", input, xid.getValue(), ex);
+ RequestContextUtil.closeRequestContextWithRpcError(requestContext, "failed to build request input: " + ex.getMessage());
} finally {
final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider();
- outboundQueue.commitEntry(xid.getValue(), request, createCallback(requestContext, requestType));
+
+ if (Objects.nonNull(isComplete)) {
+ outboundQueue.commitEntry(xid.getValue(), request, createCallback(requestContext, requestType), isComplete);
+ } else {
+ outboundQueue.commitEntry(xid.getValue(), request, createCallback(requestContext, requestType));
+ }
}
return requestContext.getFuture();
protected static <T> ListenableFuture<RpcResult<T>> failedFuture() {
final RpcResult<T> rpcResult = RpcResultBuilder.<T>failed()
- .withError(RpcError.ErrorType.APPLICATION, "", "Request quota exceeded").build();
+ .withError(RpcError.ErrorType.APPLICATION, "", "Request quota exceeded").build();
return Futures.immediateFuture(rpcResult);
}
}