From a11df192b34c34d9f197c551ee56fc266e549fcc Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 16 May 2014 10:26:38 +0200 Subject: [PATCH] Improve cache interactions Do not interact with the XID request cache until the request completes. Change-Id: I39a8b0bac94474a5a52ef461ed596e70bddfd087 Signed-off-by: Robert Varga Signed-off-by: Michal Polkorab --- .../impl/connection/AbstractRpcListener.java | 69 ++++++ .../connection/ConnectionAdapterImpl.java | 214 +++++------------- .../ResponseExpectedRpcListener.java | 46 ++++ .../impl/connection/SimpleRpcListener.java | 19 ++ 4 files changed, 192 insertions(+), 156 deletions(-) create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/AbstractRpcListener.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ResponseExpectedRpcListener.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SimpleRpcListener.java diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/AbstractRpcListener.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/AbstractRpcListener.java new file mode 100644 index 00000000..28555923 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/AbstractRpcListener.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.impl.connection; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +import java.util.Collections; + +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +abstract class AbstractRpcListener implements GenericFutureListener> { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRpcListener.class); + private final SettableFuture> result = SettableFuture.create(); + private final String failureInfo; + + AbstractRpcListener(final String failureInfo) { + this.failureInfo = failureInfo; + } + + public final ListenableFuture> getResult() { + return result; + } + + @Override + public final void operationComplete(final Future future) { + if (!future.isSuccess()) { + LOG.debug("operation failed"); + failedRpc(future.cause()); + } else { + LOG.debug("operation complete"); + operationSuccessful(); + } + } + + abstract protected void operationSuccessful(); + + protected final void failedRpc(final Throwable cause) { + final RpcError rpcError = ConnectionAdapterImpl.buildRpcError( + failureInfo, ErrorSeverity.ERROR, "check switch connection", cause); + + result.set(Rpcs.getRpcResult( + false, + (T)null, + Collections.singletonList(rpcError))); + } + + protected final void successfulRpc(final T value) { + + result.set(Rpcs.getRpcResult( + true, + value, + Collections.emptyList())); + } + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java index e3822dd6..070d7d5e 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java @@ -13,14 +13,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; -import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.sal.common.util.RpcErrors; -import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; @@ -75,7 +71,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; /** @@ -96,7 +92,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { private Channel channel; private OpenflowProtocolListener messageListener; /** expiring cache for future rpcResponses */ - protected Cache> responseCache; + protected Cache> responseCache; private SystemNotificationsListener systemListener; private boolean disconnectOccured = false; @@ -116,112 +112,112 @@ public class ConnectionAdapterImpl implements ConnectionFacade { /** * @param channel the channel to be set - used for communication */ - public void setChannel(Channel channel) { + public void setChannel(final Channel channel) { this.channel = channel; } @Override - public Future> barrier(BarrierInput input) { + public Future> barrier(final BarrierInput input) { return sendToSwitchExpectRpcResultFuture( input, BarrierOutput.class, "barrier-input sending failed"); } @Override - public Future> echo(EchoInput input) { + public Future> echo(final EchoInput input) { return sendToSwitchExpectRpcResultFuture( input, EchoOutput.class, "echo-input sending failed"); } @Override - public Future> echoReply(EchoReplyInput input) { + public Future> echoReply(final EchoReplyInput input) { return sendToSwitchFuture(input, "echo-reply sending failed"); } @Override - public Future> experimenter(ExperimenterInput input) { + public Future> experimenter(final ExperimenterInput input) { return sendToSwitchFuture(input, "experimenter sending failed"); } @Override - public Future> flowMod(FlowModInput input) { + public Future> flowMod(final FlowModInput input) { return sendToSwitchFuture(input, "flow-mod sending failed"); } @Override - public Future> getConfig(GetConfigInput input) { + public Future> getConfig(final GetConfigInput input) { return sendToSwitchExpectRpcResultFuture( input, GetConfigOutput.class, "get-config-input sending failed"); } @Override public Future> getFeatures( - GetFeaturesInput input) { + final GetFeaturesInput input) { return sendToSwitchExpectRpcResultFuture( input, GetFeaturesOutput.class, "get-features-input sending failed"); } @Override public Future> getQueueConfig( - GetQueueConfigInput input) { + final GetQueueConfigInput input) { return sendToSwitchExpectRpcResultFuture( input, GetQueueConfigOutput.class, "get-queue-config-input sending failed"); } @Override - public Future> groupMod(GroupModInput input) { + public Future> groupMod(final GroupModInput input) { return sendToSwitchFuture(input, "group-mod-input sending failed"); } @Override - public Future> hello(HelloInput input) { + public Future> hello(final HelloInput input) { return sendToSwitchFuture(input, "hello-input sending failed"); } @Override - public Future> meterMod(MeterModInput input) { + public Future> meterMod(final MeterModInput input) { return sendToSwitchFuture(input, "meter-mod-input sending failed"); } @Override - public Future> packetOut(PacketOutInput input) { + public Future> packetOut(final PacketOutInput input) { return sendToSwitchFuture(input, "packet-out-input sending failed"); } @Override - public Future> multipartRequest(MultipartRequestInput input) { + public Future> multipartRequest(final MultipartRequestInput input) { return sendToSwitchFuture(input, "multi-part-request sending failed"); } @Override - public Future> portMod(PortModInput input) { + public Future> portMod(final PortModInput input) { return sendToSwitchFuture(input, "port-mod-input sending failed"); } @Override public Future> roleRequest( - RoleRequestInput input) { + final RoleRequestInput input) { return sendToSwitchExpectRpcResultFuture( input, RoleRequestOutput.class, "role-request-config-input sending failed"); } @Override - public Future> setConfig(SetConfigInput input) { + public Future> setConfig(final SetConfigInput input) { return sendToSwitchFuture(input, "set-config-input sending failed"); } @Override - public Future> tableMod(TableModInput input) { + public Future> tableMod(final TableModInput input) { return sendToSwitchFuture(input, "table-mod-input sending failed"); } @Override - public Future> getAsync(GetAsyncInput input) { + public Future> getAsync(final GetAsyncInput input) { return sendToSwitchExpectRpcResultFuture( input, GetAsyncOutput.class, "get-async-input sending failed"); } @Override - public Future> setAsync(SetAsyncInput input) { + public Future> setAsync(final SetAsyncInput input) { return sendToSwitchFuture(input, "set-async-input sending failed"); } @@ -243,12 +239,12 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } @Override - public void setMessageListener(OpenflowProtocolListener messageListener) { + public void setMessageListener(final OpenflowProtocolListener messageListener) { this.messageListener = messageListener; } @Override - public void consume(DataObject message) { + public void consume(final DataObject message) { LOG.debug("ConsumeIntern msg"); if (disconnectOccured ) { return; @@ -281,25 +277,23 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else if (message instanceof PortStatusMessage) { messageListener.onPortStatusMessage((PortStatusMessage) message); } else { - LOG.warn("message listening not supported for type: "+message.getClass()); + LOG.warn("message listening not supported for type: {}", message.getClass()); } } else { if (message instanceof OfHeader) { LOG.debug("OFheader msg received"); RpcResponseKey key = createRpcResponseKey((OfHeader) message); - final SettableFuture> rpcFuture = findRpcResponse(key); - if (rpcFuture != null) { + final ResponseExpectedRpcListener listener = findRpcResponse(key); + if (listener != null) { LOG.debug("corresponding rpcFuture found"); - List errors = Collections.emptyList(); - LOG.debug("before setting rpcFuture"); - rpcFuture.set(Rpcs.getRpcResult(true, message, errors)); + listener.completed((OfHeader)message); LOG.debug("after setting rpcFuture"); responseCache.invalidate(key); } else { - LOG.warn("received unexpected rpc response: "+key); + LOG.warn("received unexpected rpc response: {}", key); } } else { - LOG.warn("message listening not supported for type: "+message.getClass()); + LOG.warn("message listening not supported for type: {}", message.getClass()); } } } @@ -314,15 +308,15 @@ public class ConnectionAdapterImpl implements ConnectionFacade { *
  • else {@link RpcResult} will contain errors and failed status
  • * */ - private SettableFuture> sendToSwitchFuture( - DataObject input, final String failureInfo) { + private ListenableFuture> sendToSwitchFuture( + final DataObject input, final String failureInfo) { + final SimpleRpcListener listener = new SimpleRpcListener(failureInfo); + LOG.debug("going to flush"); - ChannelFuture resultFuture = channel.writeAndFlush(input); + channel.writeAndFlush(input).addListener(listener); LOG.debug("flushed"); - ErrorSeverity errorSeverity = ErrorSeverity.ERROR; - String errorMessage = "check switch connection"; - return handleRpcChannelFuture(resultFuture, failureInfo, errorSeverity, errorMessage); + return listener.getResult(); } /** @@ -340,104 +334,17 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * * */ - private SettableFuture> sendToSwitchExpectRpcResultFuture( - IN input, Class responseClazz, final String failureInfo) { + private ListenableFuture> sendToSwitchExpectRpcResultFuture( + final IN input, final Class responseClazz, final String failureInfo) { + final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName()); + final ResponseExpectedRpcListener listener = + new ResponseExpectedRpcListener<>(failureInfo, responseCache, key); + LOG.debug("going to flush"); - SettableFuture> rpcResult = SettableFuture.create(); - RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName()); - responseCache.put(key, rpcResult); - ChannelFuture resultFuture = channel.writeAndFlush(input); + channel.writeAndFlush(input).addListener(listener); LOG.debug("flushed"); - ErrorSeverity errorSeverity = ErrorSeverity.ERROR; - String errorMessage = "check switch connection"; - - return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, - errorMessage, input, responseClazz, rpcResult, key); - } - - /** - * @param resultFuture - * @param failureInfo - * @return - */ - private SettableFuture> handleRpcChannelFuture( - ChannelFuture resultFuture, final String failureInfo, - final ErrorSeverity errorSeverity, final String errorMessage) { - - final SettableFuture> rpcResult = SettableFuture.create(); - LOG.debug("handlerpcchannelfuture"); - resultFuture.addListener(new GenericFutureListener>() { - - @Override - public void operationComplete( - io.netty.util.concurrent.Future future) - throws Exception { - LOG.debug("operation complete"); - Collection errors = Collections.emptyList(); - - if (future.cause() != null) { - LOG.debug("future.cause != null"); - RpcError rpcError = buildRpcError(failureInfo, - errorSeverity, errorMessage, future.cause()); - errors = Lists.newArrayList(rpcError); - } - - rpcResult.set(Rpcs.getRpcResult( - future.isSuccess(), - (Void) null, - errors) - ); - } - }); - return rpcResult; - } - - /** - * @param resultFuture - * @param failureInfo - * @param errorSeverity - * @param errorMessage - * @param input - * @param responseClazz - * @param key of rpcResponse - * @return - */ - private SettableFuture> handleRpcChannelFutureWithResponse( - ChannelFuture resultFuture, final String failureInfo, - final ErrorSeverity errorSeverity, final String errorMessage, - final IN input, Class responseClazz, final SettableFuture> rpcResult, final RpcResponseKey key) { - LOG.debug("handleRpcchanfuture with response"); - - resultFuture.addListener(new GenericFutureListener>() { - - @Override - public void operationComplete( - io.netty.util.concurrent.Future future) - throws Exception { - - LOG.debug("operation complete"); - Collection errors = Collections.emptyList(); - if (future.cause() != null) { - LOG.debug("ChannelFuture.cause != null"); - RpcError rpcError = buildRpcError(failureInfo, - errorSeverity, errorMessage, future.cause()); - errors = Lists.newArrayList(rpcError); - rpcResult.set(Rpcs.getRpcResult( - future.isSuccess(), - (OUT) null, - errors) - ); - responseCache.invalidate(key); - } else { - LOG.debug("ChannelFuture.cause == null"); - if (responseCache.getIfPresent(key) == null) { - LOG.debug("responcache: key wasn't present"); - } - } - } - }); - return rpcResult; + return listener.getResult(); } /** @@ -448,7 +355,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * @return */ private static SettableFuture handleTransportChannelFuture( - ChannelFuture resultFuture, final String failureInfo, + final ChannelFuture resultFuture, final String failureInfo, final ErrorSeverity errorSeverity, final String message) { final SettableFuture transportResult = SettableFuture.create(); @@ -457,7 +364,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public void operationComplete( - io.netty.util.concurrent.Future future) + final io.netty.util.concurrent.Future future) throws Exception { transportResult.set(future.isSuccess()); if (!future.isSuccess()) { @@ -472,8 +379,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * @param cause * @return */ - protected RpcError buildRpcError(String info, ErrorSeverity severity, String message, - Throwable cause) { + static RpcError buildRpcError(final String info, final ErrorSeverity severity, final String message, + final Throwable cause) { RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, ErrorType.RPC, cause); return error; @@ -483,8 +390,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * @param cause * @return */ - protected RpcError buildTransportError(String info, ErrorSeverity severity, String message, - Throwable cause) { + protected static RpcError buildTransportError(final String info, final ErrorSeverity severity, final String message, + final Throwable cause) { RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, ErrorType.TRANSPORT, cause); return error; @@ -494,20 +401,19 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * @param message * @return */ - private static RpcResponseKey createRpcResponseKey(OfHeader message) { + private static RpcResponseKey createRpcResponseKey(final OfHeader message) { return new RpcResponseKey(message.getXid(), message.getImplementedInterface().getName()); } /** * @return */ - @SuppressWarnings("unchecked") - private SettableFuture> findRpcResponse(RpcResponseKey key) { - return (SettableFuture>) responseCache.getIfPresent(key); + private ResponseExpectedRpcListener findRpcResponse(final RpcResponseKey key) { + return responseCache.getIfPresent(key); } @Override - public void setSystemListener(SystemNotificationsListener systemListener) { + public void setSystemListener(final SystemNotificationsListener systemListener) { this.systemListener = systemListener; } @@ -529,15 +435,11 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } } - static class ResponseRemovalListener implements RemovalListener> { + static class ResponseRemovalListener implements RemovalListener> { @Override public void onRemoval( - RemovalNotification> notification) { - SettableFuture future = notification.getValue(); - if (!future.isDone()) { - LOG.warn("rpc response discarded: " + notification.getKey()); - future.cancel(true); - } + final RemovalNotification> notification) { + notification.getValue().discard(); } } @@ -565,7 +467,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public void setConnectionReadyListener( - ConnectionReadyListener connectionReadyListener) { + final ConnectionReadyListener connectionReadyListener) { this.connectionReadyListener = connectionReadyListener; } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ResponseExpectedRpcListener.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ResponseExpectedRpcListener.java new file mode 100644 index 00000000..ea43d7e0 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ResponseExpectedRpcListener.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.impl.connection; + +import java.util.concurrent.TimeoutException; + +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; + +final class ResponseExpectedRpcListener extends AbstractRpcListener { + private static final Logger LOG = LoggerFactory.getLogger(ResponseExpectedRpcListener.class); + private final Cache> cache; + private final RpcResponseKey key; + + ResponseExpectedRpcListener(final String failureInfo, + final Cache> cache, final RpcResponseKey key) { + super(failureInfo); + this.cache = Preconditions.checkNotNull(cache); + this.key = Preconditions.checkNotNull(key); + } + + public void discard() { + LOG.warn("Request for {} did not receive a response", key); + failedRpc(new TimeoutException("Request timed out")); + } + + @SuppressWarnings("unchecked") + public void completed(final OfHeader message) { + successfulRpc((T)message); + } + + @Override + protected void operationSuccessful() { + LOG.debug("Request for {} sent successfully", key); + cache.put(key, this); + } +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SimpleRpcListener.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SimpleRpcListener.java new file mode 100644 index 00000000..1dc1f566 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SimpleRpcListener.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.impl.connection; + +final class SimpleRpcListener extends AbstractRpcListener { + public SimpleRpcListener(final String failureInfo) { + super(failureInfo); + } + + @Override + protected void operationSuccessful() { + successfulRpc(null); + } +} -- 2.36.6