<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.5.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.5.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
checkNotNull(getConnectionTimeoutMillis(), connectionTimeoutMillisJmxAttribute);
checkCondition(getConnectionTimeoutMillis() > 0, "must be > 0", connectionTimeoutMillisJmxAttribute);
- checkNotNull(getConnectionTimeoutMillis(), defaultRequestTimeoutMillisJmxAttribute);
- checkCondition(getConnectionTimeoutMillis() > 0, "must be > 0", defaultRequestTimeoutMillisJmxAttribute);
+ checkNotNull(getDefaultRequestTimeoutMillis(), defaultRequestTimeoutMillisJmxAttribute);
+ checkCondition(getDefaultRequestTimeoutMillis() > 0, "must be > 0", defaultRequestTimeoutMillisJmxAttribute);
checkNotNull(getBetweenAttemptsTimeoutMillis(), betweenAttemptsTimeoutMillisJmxAttribute);
checkCondition(getBetweenAttemptsTimeoutMillis() > 0, "must be > 0", betweenAttemptsTimeoutMillisJmxAttribute);
@Override
public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
- return new ReadOnlyTx(netconfOps, id);
+ return new ReadOnlyTx(netconfOps, id, requestTimeoutMillis);
}
@Override
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MixinNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteTx.class);
- private final long defaultRequestTimeoutMillis;
+ protected final long defaultRequestTimeoutMillis;
protected final RemoteDeviceId id;
protected final NetconfBaseOps netOps;
protected final boolean rollbackSupport;
protected void invokeBlocking(final String msg, final Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>> op) throws NetconfDocumentedException {
try {
- final DOMRpcResult compositeNodeRpcResult = op.apply(netOps).get(defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
+ final DOMRpcResult compositeNodeRpcResult = op.apply(netOps).get();
if(isSuccess(compositeNodeRpcResult) == false) {
throw new NetconfDocumentedException(id + ": " + msg + " failed: " + compositeNodeRpcResult.getErrors(), NetconfDocumentedException.ErrorType.application,
NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorSeverity.warning);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
- } catch (final ExecutionException | TimeoutException e) {
+ } catch (final ExecutionException e) {
throw new NetconfDocumentedException(id + ": " + msg + " failed: " + e.getMessage(), e, NetconfDocumentedException.ErrorType.application,
NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorSeverity.warning);
}
}
protected abstract void editConfig(DataContainerChild<?, ?> editStructure, Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException;
+
+ protected ListenableFuture<DOMRpcResult> perfomRequestWithTimeout(String operation, ListenableFuture<DOMRpcResult> future) {
+ try {
+ future.get(defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("{}: {} failed with error", operation, id, e);
+ return Futures.immediateFailedCheckedFuture(new RuntimeException(id + ": " + operation + " failed"));
+ } catch (TimeoutException e) {
+ LOG.warn("{}: Unable to {} after {} milliseconds", id, operation, defaultRequestTimeoutMillis, e);
+ return Futures.immediateFailedCheckedFuture(new SchemaSourceException(e.getMessage()));
+ }
+ return future;
+ }
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
private final RemoteDeviceId id;
private final FutureCallback<DOMRpcResult> loggingCallback;
- public ReadOnlyTx(final NetconfBaseOps netconfOps, final RemoteDeviceId id) {
+ private final long requestTimeoutMillis;
+
+ public ReadOnlyTx(final NetconfBaseOps netconfOps, final RemoteDeviceId id, final long requestTimeoutMillis) {
this.netconfOps = netconfOps;
this.id = id;
+ this.requestTimeoutMillis = requestTimeoutMillis;
// Simple logging callback to log result of read operation
loggingCallback = new FutureCallback<DOMRpcResult>() {
} else {
LOG.warn("{}: Reading data unsuccessful: {}", id, result.getErrors());
}
-
}
@Override
}
});
+
+ if(!readWithTimeout("readConfigurationData", configRunning)) {
+ return null;
+ }
+
return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
}
}
});
+ if(!readWithTimeout("readOperationalData", configCandidate)) {
+ return null;
+ }
+
return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
}
public Object getIdentifier() {
return this;
}
+
+ private boolean readWithTimeout(String operation, ListenableFuture<DOMRpcResult> future) {
+ try {
+ future.get(requestTimeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("{}: {} failed with error", id, operation, e);
+ throw new RuntimeException(id + ": readOperationalData failed");
+ } catch (TimeoutException e) {
+ LOG.warn("{}: Unable to {} after {} milliseconds", id, operation, requestTimeoutMillis, e);
+ future.cancel(true);
+ return false;
+ }
+ return true;
+ }
}
package org.opendaylight.controller.sal.connect.netconf.sal.tx;
+
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
private static final Logger LOG = LoggerFactory.getLogger(WriteCandidateRunningTx.class);
- public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final boolean rollbackSupport, long requestTimeoutMillis) {
+ public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final boolean rollbackSupport, final long requestTimeoutMillis) {
super(id, netOps, rollbackSupport, requestTimeoutMillis);
}
}
private void lockRunning() {
+ final String operation = "Lock Running";
try {
- invokeBlocking("Lock running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return input.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
+ return perfomRequestWithTimeout(operation, input.lockRunning(new NetconfRpcFutureCallback(operation, id)));
+
}
});
} catch (final NetconfDocumentedException e) {
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
}
private void lock() throws NetconfDocumentedException {
+ final String operation = "Lock candidate";
try {
- invokeBlocking("Lock candidate", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return input.lockCandidate(new NetconfRpcFutureCallback("Lock candidate", id));
+ return perfomRequestWithTimeout(operation, input.lockCandidate(new NetconfRpcFutureCallback(operation, id)));
}
});
} catch (final NetconfDocumentedException e) {
@Override
protected void editConfig(final DataContainerChild<?, ?> editStructure, final Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException {
- invokeBlocking("Edit candidate", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ final String operation = "Edit candidate";
+ invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return defaultOperation.isPresent()
- ? input.editConfigCandidate(new NetconfRpcFutureCallback("Edit candidate", id), editStructure, defaultOperation.get(),
+
+ return perfomRequestWithTimeout(operation, defaultOperation.isPresent()
+ ? input.editConfigCandidate(new NetconfRpcFutureCallback(operation, id), editStructure, defaultOperation.get(),
rollbackSupport)
- : input.editConfigCandidate(new NetconfRpcFutureCallback("Edit candidate", id), editStructure,
- rollbackSupport);
+ : input.editConfigCandidate(new NetconfRpcFutureCallback(operation, id), editStructure,
+ rollbackSupport));
}
});
}
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
}
private void lock() {
+ final String operation = "Lock running";
try {
- invokeBlocking("Lock running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return input.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
+ return perfomRequestWithTimeout(operation, input.lockRunning(new NetconfRpcFutureCallback(operation, id)));
}
});
} catch (final NetconfDocumentedException e) {
@Override
public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
- final ListenableFuture<Void> commmitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
+ final ListenableFuture<Void> commitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
@Override
public Void apply(final RpcResult<TransactionStatus> input) {
return null;
}
});
- return Futures.makeChecked(commmitFutureAsVoid, new Function<Exception, TransactionCommitFailedException>() {
+ return Futures.makeChecked(commitFutureAsVoid, new Function<Exception, TransactionCommitFailedException>() {
@Override
public TransactionCommitFailedException apply(final Exception input) {
return new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed", input);
@Override
protected void editConfig(final DataContainerChild<?, ?> editStructure, final Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException {
- invokeBlocking("Edit running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ final String operation = "Edit running";
+ invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return defaultOperation.isPresent()
- ? input.editConfigRunning(new NetconfRpcFutureCallback("Edit running", id), editStructure, defaultOperation.get(),
+ return perfomRequestWithTimeout(operation, defaultOperation.isPresent()
+ ? input.editConfigRunning(new NetconfRpcFutureCallback(operation, id), editStructure, defaultOperation.get(),
rollbackSupport)
- : input.editConfigRunning(new NetconfRpcFutureCallback("Edit running", id), editStructure,
- rollbackSupport);
+ : input.editConfigRunning(new NetconfRpcFutureCallback(operation, id), editStructure,
+ rollbackSupport));
}
});
}
private void unlock() {
+ final String operation = "Unlocking running";
try {
- invokeBlocking("Unlocking running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+ invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
@Override
public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
- return input.unlockRunning(new NetconfRpcFutureCallback("Unlock running", id));
+ return perfomRequestWithTimeout(operation, input.unlockRunning(new NetconfRpcFutureCallback(operation, id)));
}
});
} catch (final NetconfDocumentedException e) {
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+@PrepareForTest({NetconfBaseOps.class})
+@RunWith(PowerMockRunner.class)
public class ReadOnlyTxTest {
private static final YangInstanceIdentifier path = YangInstanceIdentifier.create();
public void testRead() throws Exception {
final NetconfBaseOps netconfOps = new NetconfBaseOps(rpc, mock(SchemaContext.class));
- final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)));
+ final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)), 60000L);
readOnlyTx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create());
verify(rpc).invokeRpc(Mockito.eq(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME)), any(NormalizedNode.class));
readOnlyTx.read(LogicalDatastoreType.OPERATIONAL, path);
verify(rpc).invokeRpc(Mockito.eq(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.NETCONF_GET_QNAME)), any(NormalizedNode.class));
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadTimeout() throws Exception {
+ final ListenableFuture<DOMRpcResult> future = mock(ListenableFuture.class);
+
+ Mockito.when(future.get(Mockito.anyLong(), any(TimeUnit.class))).then(new Answer<DOMRpcResult>() {
+ @Override
+ public DOMRpcResult answer(InvocationOnMock invocation)
+ throws Throwable {
+ throw new TimeoutException("Processing Timeout");
+ }
+ });
+
+ final NetconfBaseOps netconfOps = PowerMockito.mock(NetconfBaseOps.class);
+ Mockito.when(netconfOps.getConfigRunning(any(FutureCallback.class), any(Optional.class))).thenReturn(future);
+
+
+ final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)), 100L);
+ Assert.assertNull("Read operation didn't correctly timeout", readOnlyTx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create()));
+ readOnlyTx.close();
+ }
}
\ No newline at end of file