Bug 8152: Transaction is already opened 66/55866/2
authorAndrej Mak <andrej.mak@pantheon.tech>
Wed, 19 Apr 2017 06:12:36 +0000 (08:12 +0200)
committerAndrej Mak <andrej.mak@pantheon.tech>
Mon, 24 Apr 2017 08:10:12 +0000 (08:10 +0000)
This issue happens, when for some reason transaction
submit or cancel message isn't delivered to master
node. With current implementation, only one device
transaction can be opened at the time, so submit or
cancel delivery failure will lock device forever.

To prevent this, this patch introduces write trancaction
idle timeout. Write transaction actor will be stopped
and its device transaction cancelled, when no message
is received for given time. Cancellation unlocks device,
so mountpouint is usable again.

Change-Id: I37bef30038cf6fd10fa5149a3fa949540ac16eab
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
(cherry picked from commit 95dc1455a7303eac56c755d01a37ca1f203543c0)

netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologySetup.java
netconf/netconf-topology-singleton/src/main/resources/org/opendaylight/blueprint/netconf-topology-singleton.xml
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/actors/WriteTransactionActorTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java

index b932cd4f44a73bfe8468f43f012b926b816351aa..734461ac9c00c14abd980958f119dd5095f8405d 100644 (file)
@@ -17,6 +17,7 @@ import io.netty.util.concurrent.EventExecutor;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
@@ -35,7 +36,6 @@ import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvid
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
@@ -77,13 +77,15 @@ public class NetconfTopologyManager
     private final EventExecutor eventExecutor;
     private final NetconfClientDispatcher clientDispatcher;
     private final String topologyId;
+    private final Duration writeTxIdleTimeout;
 
     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
                                   final BindingAwareBroker bindingAwareBroker,
                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
                                   final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
-                                  final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+                                  final NetconfClientDispatcher clientDispatcher, final String topologyId,
+                                  final int writeTxIdleTimeout) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
@@ -95,6 +97,7 @@ public class NetconfTopologyManager
         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
         this.topologyId = Preconditions.checkNotNull(topologyId);
+        this.writeTxIdleTimeout = Duration.apply(writeTxIdleTimeout, TimeUnit.SECONDS);
     }
 
     // Blueprint init method
@@ -244,7 +247,8 @@ public class NetconfTopologyManager
                 .setProcessingExecutor(processingExecutor)
                 .setTopologyId(topologyId)
                 .setNetconfClientDispatcher(clientDispatcher)
-                .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node));
+                .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
+                .setIdleTimeout(writeTxIdleTimeout);
 
         return builder.build();
     }
index 9828e2476fcf7931295fbad29943b067e623548f..8dd5e39d1e57078d92a656173a8b2038bc5fa09c 100644 (file)
@@ -62,20 +62,22 @@ import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 public class NetconfNodeActor extends UntypedActor {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
 
-    private NetconfTopologySetup setup;
-    private RemoteDeviceId id;
     private final SchemaSourceRegistry schemaRegistry;
     private final SchemaRepository schemaRepository;
+    private final Timeout actorResponseWaitTime;
+    private final Duration writeTxIdleTimeout;
 
+    private RemoteDeviceId id;
+    private NetconfTopologySetup setup;
     private List<SourceIdentifier> sourceIdentifiers;
     private DOMRpcService deviceRpc;
     private SlaveSalFacade slaveSalManager;
-    private final Timeout actorResponseWaitTime;
     private DOMDataBroker deviceDataBroker;
     //readTxActor can be shared
     private ActorRef readTxActor;
@@ -95,6 +97,7 @@ public class NetconfNodeActor extends UntypedActor {
         this.schemaRegistry = schemaRegistry;
         this.schemaRepository = schemaRepository;
         this.actorResponseWaitTime = actorResponseWaitTime;
+        this.writeTxIdleTimeout = setup.getIdleTimeout();
     }
 
     @Override
@@ -134,7 +137,7 @@ public class NetconfNodeActor extends UntypedActor {
         } else if (message instanceof NewWriteTransactionRequest) { // master
             try {
                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
-                final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+                final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
                 sender().tell(new NewWriteTransactionReply(txActor), self());
             } catch (final Throwable t) {
                 sender().tell(t, self());
index 5350b6478ed317e6be5e18e6b6f58a7a76435912..008559ec9bfc39b4c0b930f07b5c836ec7bb50c2 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.netconf.topology.singleton.impl.actors;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
 import akka.actor.UntypedActor;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
@@ -25,26 +26,37 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRe
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 /**
  * WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
  */
 public class WriteTransactionActor extends UntypedActor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionActor.class);
+
     private final DOMDataWriteTransaction tx;
+    private final long idleTimeout;
 
     /**
      * Creates new actor Props.
      *
      * @param tx delegate device write transaction
+     * @param idleTimeout idle time in seconds, after which transaction is closed automatically
      * @return props
      */
-    static Props props(final DOMDataWriteTransaction tx) {
-        return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
+    static Props props(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+        return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
     }
 
-    private WriteTransactionActor(final DOMDataWriteTransaction tx) {
+    private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
         this.tx = tx;
+        this.idleTimeout = idleTimeout.toSeconds();
+        if (this.idleTimeout > 0) {
+            context().setReceiveTimeout(idleTimeout);
+        }
     }
 
     @Override
@@ -64,6 +76,11 @@ public class WriteTransactionActor extends UntypedActor {
             cancel();
         } else if (message instanceof SubmitRequest) {
             submit(sender(), self());
+        } else if (message instanceof ReceiveTimeout) {
+            LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
+                    idleTimeout);
+            tx.cancel();
+            context().stop(self());
         } else {
             unhandled(message);
         }
index 9ee287d97dfa092a6306c88e2ad8be7502d69e04..be57ce35aa7b0ae6dcf4608f696653cc8d263e5a 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import scala.concurrent.duration.Duration;
 
 public class NetconfTopologySetup {
 
@@ -38,6 +39,7 @@ public class NetconfTopologySetup {
     private final NetconfClientDispatcher netconfClientDispatcher;
     private final String topologyId;
     private final NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
+    private final Duration idleTimeout;
 
     private NetconfTopologySetup(final NetconfTopologySetupBuilder builder) {
         this.clusterSingletonServiceProvider = builder.getClusterSingletonServiceProvider();
@@ -54,6 +56,7 @@ public class NetconfTopologySetup {
         this.netconfClientDispatcher = builder.getNetconfClientDispatcher();
         this.topologyId = builder.getTopologyId();
         this.schemaResourceDTO = builder.getSchemaResourceDTO();
+        this.idleTimeout = builder.getIdleTimeout();
     }
 
     public ClusterSingletonServiceProvider getClusterSingletonServiceProvider() {
@@ -112,6 +115,10 @@ public class NetconfTopologySetup {
         return  schemaResourceDTO;
     }
 
+    public Duration getIdleTimeout() {
+        return idleTimeout;
+    }
+
     public static class NetconfTopologySetupBuilder {
 
         private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
@@ -128,6 +135,7 @@ public class NetconfTopologySetup {
         private String topologyId;
         private NetconfClientDispatcher netconfClientDispatcher;
         private NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
+        private Duration idleTimeout;
 
         public NetconfTopologySetupBuilder(){
         }
@@ -186,7 +194,7 @@ public class NetconfTopologySetup {
             return bindingAwareBroker;
         }
 
-        public NetconfTopologySetupBuilder setBindingAwareBroker(BindingAwareBroker bindingAwareBroker) {
+        public NetconfTopologySetupBuilder setBindingAwareBroker(final BindingAwareBroker bindingAwareBroker) {
             this.bindingAwareBroker = bindingAwareBroker;
             return this;
         }
@@ -195,7 +203,7 @@ public class NetconfTopologySetup {
             return keepaliveExecutor;
         }
 
-        public NetconfTopologySetupBuilder setKeepaliveExecutor(ScheduledThreadPool keepaliveExecutor) {
+        public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledThreadPool keepaliveExecutor) {
             this.keepaliveExecutor = keepaliveExecutor;
             return this;
         }
@@ -204,7 +212,7 @@ public class NetconfTopologySetup {
             return processingExecutor;
         }
 
-        public NetconfTopologySetupBuilder setProcessingExecutor(ThreadPool processingExecutor) {
+        public NetconfTopologySetupBuilder setProcessingExecutor(final ThreadPool processingExecutor) {
             this.processingExecutor = processingExecutor;
             return this;
         }
@@ -213,7 +221,7 @@ public class NetconfTopologySetup {
             return domBroker;
         }
 
-        public NetconfTopologySetupBuilder setDomBroker(Broker domBroker) {
+        public NetconfTopologySetupBuilder setDomBroker(final Broker domBroker) {
             this.domBroker = domBroker;
             return this;
         }
@@ -222,7 +230,7 @@ public class NetconfTopologySetup {
             return actorSystem;
         }
 
-        public NetconfTopologySetupBuilder setActorSystem(ActorSystem actorSystem) {
+        public NetconfTopologySetupBuilder setActorSystem(final ActorSystem actorSystem) {
             this.actorSystem = actorSystem;
             return this;
         }
@@ -231,7 +239,7 @@ public class NetconfTopologySetup {
             return eventExecutor;
         }
 
-        public NetconfTopologySetupBuilder setEventExecutor(EventExecutor eventExecutor) {
+        public NetconfTopologySetupBuilder setEventExecutor(final EventExecutor eventExecutor) {
             this.eventExecutor = eventExecutor;
             return this;
         }
@@ -240,7 +248,7 @@ public class NetconfTopologySetup {
             return topologyId;
         }
 
-        public NetconfTopologySetupBuilder setTopologyId(String topologyId) {
+        public NetconfTopologySetupBuilder setTopologyId(final String topologyId) {
             this.topologyId = topologyId;
             return this;
         }
@@ -249,7 +257,7 @@ public class NetconfTopologySetup {
             return netconfClientDispatcher;
         }
 
-        public NetconfTopologySetupBuilder setNetconfClientDispatcher(NetconfClientDispatcher clientDispatcher) {
+        public NetconfTopologySetupBuilder setNetconfClientDispatcher(final NetconfClientDispatcher clientDispatcher) {
             this.netconfClientDispatcher = clientDispatcher;
             return this;
         }
@@ -264,6 +272,15 @@ public class NetconfTopologySetup {
             return schemaResourceDTO;
         }
 
+        public NetconfTopologySetupBuilder setIdleTimeout(final Duration idleTimeout) {
+            this.idleTimeout = idleTimeout;
+            return this;
+        }
+
+        private Duration getIdleTimeout() {
+            return idleTimeout;
+        }
+
         public static NetconfTopologySetupBuilder create() {
             return new NetconfTopologySetupBuilder();
         }
index ea9ed8131ac4f0bb650d6ec2d1c423175588caaf..0997ea5af2daa94ee7114a4079f7564946ac94f1 100644 (file)
@@ -50,6 +50,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
         <argument ref="eventExecutor"/>
         <argument ref="clientDispatcherDependency"/>
         <argument value="topology-netconf"/>
+        <argument value="0"/>
     </bean>
     <service ref="netconfTopologyManager"
              interface="org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService"/>
index df398f3fb26181cd0d1205f71e17ecd4b79208ef..99ef75f2a4a488b304269a0c1a061bf09b1d8a17 100644 (file)
@@ -86,8 +86,9 @@ public class NetconfTopologyManagerTest {
 
         netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
                 clusterSingletonServiceProvider, bindingAwareBroker, keepaliveExecutor, processingExecutor, domBroker,
-                actorSystemProvider, eventExecutor, clientDispatcher, topologyId);
+                actorSystemProvider, eventExecutor, clientDispatcher, topologyId, 0);
     }
+
     @Test
     public void testWriteConfiguration() throws Exception {
 
index d95d15840b5d7ece72fffcc5353911749e1a431e..158ee468ba2ef37bd4236d3e8b60e653865c1ab8 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.netconf.topology.singleton.impl.actors;
 
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -44,6 +45,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 
 public class WriteTransactionActorTest {
     private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
@@ -65,7 +67,8 @@ public class WriteTransactionActorTest {
         node = Builders.containerBuilder()
                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
                 .build();
-        actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx), "testA");
+        actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx,
+                Duration.apply(2, TimeUnit.SECONDS)), "testA");
     }
 
     @After
@@ -124,4 +127,12 @@ public class WriteTransactionActorTest {
         verify(deviceWriteTx).submit();
     }
 
+    @Test
+    public void testIdleTimeout() throws Exception {
+        final TestProbe probe = new TestProbe(system);
+        probe.watch(actorRef);
+        verify(deviceWriteTx, timeout(3000)).cancel();
+        probe.expectTerminated(actorRef, TIMEOUT.duration());
+    }
+
 }
\ No newline at end of file
index 3e86bd2572da3208bb965464d1dea8813a36637d..0e1d474a5da18ffdf350b2b4d8625c3decfc9ecc 100644 (file)
@@ -91,6 +91,7 @@ public class WriteOnlyTransactionTest {
                 new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
 
         final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+        doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
                 DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);