Bug 4940 - correctly implement default-request-timeout-millis 09/33209/1
authoradetalhouet <adetalhouet@inocybe.com>
Thu, 21 Jan 2016 13:30:19 +0000 (08:30 -0500)
committerAlexis de Talhouët <adetalhouet@inocybe.com>
Thu, 21 Jan 2016 13:38:26 +0000 (13:38 +0000)
Change-Id: I5eadfdaff9f67ef79b4fae47135012865f40a9eb
Signed-off-by: adetalhouet <adetalhouet@inocybe.com>
opendaylight/netconf/sal-netconf-connector/pom.xml
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/NetconfDeviceDataBroker.java
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/AbstractWriteTx.java
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/ReadOnlyTx.java
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateRunningTx.java
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteRunningTx.java
opendaylight/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/ReadOnlyTxTest.java

index d53eee48b8c683860a6ca96f82e7053e1511e13b..c00b937d5ddb1f4280570ec34bcf28efe9feee88 100644 (file)
         <artifactId>mockito-core</artifactId>
         <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <version>1.5.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.5.2</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index 9df4ad42efefcd71fdebb277d3599aa112e27d17..dffd7f249ecd92157f66ba6c456060bd03718af3 100644 (file)
@@ -101,8 +101,8 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         checkNotNull(getConnectionTimeoutMillis(), connectionTimeoutMillisJmxAttribute);
         checkCondition(getConnectionTimeoutMillis() > 0, "must be > 0", connectionTimeoutMillisJmxAttribute);
 
-        checkNotNull(getConnectionTimeoutMillis(), defaultRequestTimeoutMillisJmxAttribute);
-        checkCondition(getConnectionTimeoutMillis() > 0, "must be > 0", defaultRequestTimeoutMillisJmxAttribute);
+        checkNotNull(getDefaultRequestTimeoutMillis(), defaultRequestTimeoutMillisJmxAttribute);
+        checkCondition(getDefaultRequestTimeoutMillis() > 0, "must be > 0", defaultRequestTimeoutMillisJmxAttribute);
 
         checkNotNull(getBetweenAttemptsTimeoutMillis(), betweenAttemptsTimeoutMillisJmxAttribute);
         checkCondition(getBetweenAttemptsTimeoutMillis() > 0, "must be > 0", betweenAttemptsTimeoutMillisJmxAttribute);
index 093abde49d4b24c24c7ada62c40019a9f52e55e3..7283cae9b85b9446a8e6b284e465f18ba7fb54c2 100644 (file)
@@ -57,7 +57,7 @@ public final class NetconfDeviceDataBroker implements DOMDataBroker {
 
     @Override
     public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
-        return new ReadOnlyTx(netconfOps, id);
+        return new ReadOnlyTx(netconfOps, id, requestTimeoutMillis);
     }
 
     @Override
index bf0bfaf44b504d398269a5f2566edefa551a4b94..a12ba3a02abbc5b0a920fe03af58428c8d47b3a2 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MixinNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +37,7 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
 
     private static final Logger LOG  = LoggerFactory.getLogger(AbstractWriteTx.class);
 
-    private final long defaultRequestTimeoutMillis;
+    protected final long defaultRequestTimeoutMillis;
     protected final RemoteDeviceId id;
     protected final NetconfBaseOps netOps;
     protected final boolean rollbackSupport;
@@ -64,7 +66,7 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
 
     protected void invokeBlocking(final String msg, final Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>> op) throws NetconfDocumentedException {
         try {
-            final DOMRpcResult compositeNodeRpcResult = op.apply(netOps).get(defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
+            final DOMRpcResult compositeNodeRpcResult = op.apply(netOps).get();
             if(isSuccess(compositeNodeRpcResult) == false) {
                 throw new NetconfDocumentedException(id + ": " + msg + " failed: " + compositeNodeRpcResult.getErrors(), NetconfDocumentedException.ErrorType.application,
                         NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorSeverity.warning);
@@ -72,7 +74,7 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RuntimeException(e);
-        } catch (final ExecutionException | TimeoutException e) {
+        } catch (final ExecutionException e) {
             throw new NetconfDocumentedException(id + ": " + msg + " failed: " + e.getMessage(), e, NetconfDocumentedException.ErrorType.application,
                     NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorSeverity.warning);
         }
@@ -174,4 +176,18 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
     }
 
     protected abstract void editConfig(DataContainerChild<?, ?> editStructure, Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException;
+
+
+    protected ListenableFuture<DOMRpcResult> perfomRequestWithTimeout(String operation, ListenableFuture<DOMRpcResult> future) {
+        try {
+            future.get(defaultRequestTimeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("{}: {} failed with error", operation, id, e);
+            return Futures.immediateFailedCheckedFuture(new RuntimeException(id + ": " + operation + " failed"));
+        } catch (TimeoutException e) {
+            LOG.warn("{}: Unable to {} after {} milliseconds", id, operation, defaultRequestTimeoutMillis, e);
+            return Futures.immediateFailedCheckedFuture(new SchemaSourceException(e.getMessage()));
+        }
+        return future;
+    }
 }
index dda54851094e612bf97a86c3020349427a384e8f..28d75cdaa4dd3210d9686f391e211abb4a22ad5c 100644 (file)
@@ -16,6 +16,9 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
@@ -41,9 +44,12 @@ public final class ReadOnlyTx implements DOMDataReadOnlyTransaction {
     private final RemoteDeviceId id;
     private final FutureCallback<DOMRpcResult> loggingCallback;
 
-    public ReadOnlyTx(final NetconfBaseOps netconfOps, final RemoteDeviceId id) {
+    private final long requestTimeoutMillis;
+
+    public ReadOnlyTx(final NetconfBaseOps netconfOps, final RemoteDeviceId id, final long requestTimeoutMillis) {
         this.netconfOps = netconfOps;
         this.id = id;
+        this.requestTimeoutMillis = requestTimeoutMillis;
 
         // Simple logging callback to log result of read operation
         loggingCallback = new FutureCallback<DOMRpcResult>() {
@@ -78,6 +84,10 @@ public final class ReadOnlyTx implements DOMDataReadOnlyTransaction {
             }
         });
 
+        if(!readWithTimeout("readConfigurationData", configRunning)) {
+            return null;
+        }
+
         return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
     }
 
@@ -109,6 +119,10 @@ public final class ReadOnlyTx implements DOMDataReadOnlyTransaction {
             }
         });
 
+        if(!readWithTimeout("readOperationalData", configCandidate)) {
+            return null;
+        }
+
         return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
     }
 
@@ -147,4 +161,18 @@ public final class ReadOnlyTx implements DOMDataReadOnlyTransaction {
     public Object getIdentifier() {
         return this;
     }
+
+    private boolean readWithTimeout(String operation, ListenableFuture<DOMRpcResult> future) {
+        try {
+            future.get(requestTimeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("{}: {} failed with error", id, operation, e);
+            throw new RuntimeException(id + ": readOperationalData failed");
+        } catch (TimeoutException e) {
+            LOG.warn("{}: Unable to {} after {} milliseconds", id, operation, requestTimeoutMillis, e);
+            future.cancel(true);
+            return false;
+        }
+        return true;
+    }
 }
index 3004801abbeb36aae86ca3c920ead85c0a4e91cd..0b9bb98e9c75b7ad539313c677c418fe666f790b 100644 (file)
@@ -29,7 +29,7 @@ public class WriteCandidateRunningTx extends WriteCandidateTx {
 
     private static final Logger LOG  = LoggerFactory.getLogger(WriteCandidateRunningTx.class);
 
-    public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final boolean rollbackSupport, long requestTimeoutMillis) {
+    public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final boolean rollbackSupport, final long requestTimeoutMillis) {
         super(id, netOps, rollbackSupport, requestTimeoutMillis);
     }
 
@@ -46,11 +46,13 @@ public class WriteCandidateRunningTx extends WriteCandidateTx {
     }
 
     private void lockRunning() {
+        final String operation = "Lock Running";
         try {
-            invokeBlocking("Lock running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+            invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
                 @Override
                 public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
-                    return input.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
+                    return perfomRequestWithTimeout(operation, input.lockRunning(new NetconfRpcFutureCallback(operation, id)));
+
                 }
             });
         } catch (final NetconfDocumentedException e) {
index b40836583db54fbca287bcfc50309cc9eea10a31..f607089dc01b9a1881ed9d0d7fda7e9f621f5195 100644 (file)
@@ -95,11 +95,12 @@ public class WriteCandidateTx extends AbstractWriteTx {
     }
 
     private void lock() throws NetconfDocumentedException {
+        final String operation = "Lock candidate";
         try {
-            invokeBlocking("Lock candidate", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+            invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
                 @Override
                 public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
-                    return input.lockCandidate(new NetconfRpcFutureCallback("Lock candidate", id));
+                    return perfomRequestWithTimeout(operation, input.lockCandidate(new NetconfRpcFutureCallback(operation, id)));
                 }
             });
         } catch (final NetconfDocumentedException e) {
@@ -185,14 +186,16 @@ public class WriteCandidateTx extends AbstractWriteTx {
 
     @Override
     protected void editConfig(final DataContainerChild<?, ?> editStructure, final Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException {
-        invokeBlocking("Edit candidate", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+        final String operation = "Edit candidate";
+        invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
             @Override
             public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
-                    return defaultOperation.isPresent()
-                            ? input.editConfigCandidate(new NetconfRpcFutureCallback("Edit candidate", id), editStructure, defaultOperation.get(),
-                            rollbackSupport)
-                            : input.editConfigCandidate(new NetconfRpcFutureCallback("Edit candidate", id), editStructure,
-                            rollbackSupport);
+
+                        return perfomRequestWithTimeout(operation, defaultOperation.isPresent()
+                                ? input.editConfigCandidate(new NetconfRpcFutureCallback(operation, id), editStructure, defaultOperation.get(),
+                                rollbackSupport)
+                                : input.editConfigCandidate(new NetconfRpcFutureCallback(operation, id), editStructure,
+                                rollbackSupport));
             }
         });
     }
index 1799619048268a464e46fa3a53c517e071311a82..1d0ff653ef0d85abf515da0c409ab0a861c35636 100644 (file)
@@ -61,11 +61,12 @@ public class WriteRunningTx extends AbstractWriteTx {
     }
 
     private void lock() {
+        final String operation = "Lock running";
         try {
-            invokeBlocking("Lock running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+            invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
                 @Override
                 public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
-                    return input.lockRunning(new NetconfRpcFutureCallback("Lock running", id));
+                    return perfomRequestWithTimeout(operation, input.lockRunning(new NetconfRpcFutureCallback(operation, id)));
                 }
             });
         } catch (final NetconfDocumentedException e) {
@@ -96,14 +97,14 @@ public class WriteRunningTx extends AbstractWriteTx {
 
     @Override
     public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        final ListenableFuture<Void> commmitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
+        final ListenableFuture<Void> commitFutureAsVoid = Futures.transform(commit(), new Function<RpcResult<TransactionStatus>, Void>() {
             @Override
             public Void apply(final RpcResult<TransactionStatus> input) {
                 return null;
             }
         });
 
-        return Futures.makeChecked(commmitFutureAsVoid, new Function<Exception, TransactionCommitFailedException>() {
+        return Futures.makeChecked(commitFutureAsVoid, new Function<Exception, TransactionCommitFailedException>() {
             @Override
             public TransactionCommitFailedException apply(final Exception input) {
                 return new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed", input);
@@ -119,24 +120,26 @@ public class WriteRunningTx extends AbstractWriteTx {
 
     @Override
     protected void editConfig(final DataContainerChild<?, ?> editStructure, final Optional<ModifyAction> defaultOperation) throws NetconfDocumentedException {
-        invokeBlocking("Edit running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+        final String operation = "Edit running";
+        invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
             @Override
             public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
-                    return defaultOperation.isPresent()
-                            ? input.editConfigRunning(new NetconfRpcFutureCallback("Edit running", id), editStructure, defaultOperation.get(),
-                            rollbackSupport)
-                            : input.editConfigRunning(new NetconfRpcFutureCallback("Edit running", id), editStructure,
-                            rollbackSupport);
+                        return perfomRequestWithTimeout(operation, defaultOperation.isPresent()
+                                ? input.editConfigRunning(new NetconfRpcFutureCallback(operation, id), editStructure, defaultOperation.get(),
+                                rollbackSupport)
+                                : input.editConfigRunning(new NetconfRpcFutureCallback(operation, id), editStructure,
+                                rollbackSupport));
             }
         });
     }
 
     private void unlock() {
+        final String operation = "Unlocking running";
         try {
-            invokeBlocking("Unlocking running", new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
+            invokeBlocking(operation, new Function<NetconfBaseOps, ListenableFuture<DOMRpcResult>>() {
                 @Override
                 public ListenableFuture<DOMRpcResult> apply(final NetconfBaseOps input) {
-                    return input.unlockRunning(new NetconfRpcFutureCallback("Unlock running", id));
+                    return perfomRequestWithTimeout(operation, input.unlockRunning(new NetconfRpcFutureCallback(operation, id)));
                 }
             });
         } catch (final NetconfDocumentedException e) {
index 2ea94f47a2fbcd5c8da74708e5ef9e6e90181456..c005b5bf232c49730c8c726eda330f695b3a222c 100644 (file)
@@ -13,15 +13,27 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
@@ -31,7 +43,12 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+@PrepareForTest({NetconfBaseOps.class})
+@RunWith(PowerMockRunner.class)
 public class ReadOnlyTxTest {
 
     private static final YangInstanceIdentifier path = YangInstanceIdentifier.create();
@@ -53,11 +70,33 @@ public class ReadOnlyTxTest {
     public void testRead() throws Exception {
         final NetconfBaseOps netconfOps = new NetconfBaseOps(rpc, mock(SchemaContext.class));
 
-        final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)));
+        final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)), 60000L);
 
         readOnlyTx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create());
         verify(rpc).invokeRpc(Mockito.eq(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME)), any(NormalizedNode.class));
         readOnlyTx.read(LogicalDatastoreType.OPERATIONAL, path);
         verify(rpc).invokeRpc(Mockito.eq(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.NETCONF_GET_QNAME)), any(NormalizedNode.class));
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadTimeout() throws Exception {
+        final ListenableFuture<DOMRpcResult> future = mock(ListenableFuture.class);
+
+        Mockito.when(future.get(Mockito.anyLong(), any(TimeUnit.class))).then(new Answer<DOMRpcResult>() {
+            @Override
+            public DOMRpcResult answer(InvocationOnMock invocation)
+                    throws Throwable {
+                throw new TimeoutException("Processing Timeout");
+            }
+        });
+
+        final NetconfBaseOps netconfOps = PowerMockito.mock(NetconfBaseOps.class);
+        Mockito.when(netconfOps.getConfigRunning(any(FutureCallback.class), any(Optional.class))).thenReturn(future);
+
+
+        final ReadOnlyTx readOnlyTx = new ReadOnlyTx(netconfOps, new RemoteDeviceId("a", new InetSocketAddress("localhost", 196)), 100L);
+        Assert.assertNull("Read operation didn't correctly timeout", readOnlyTx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create()));
+        readOnlyTx.close();
+    }
 }
\ No newline at end of file