This issue happens, when for some reason transaction
submit or cancel message isn't delivered to master
node. With current implementation, only one device
transaction can be opened at the time, so submit or
cancel delivery failure will lock device forever.
To prevent this, this patch introduces write trancaction
idle timeout. Write transaction actor will be stopped
and its device transaction cancelled, when no message
is received for given time. Cancellation unlocks device,
so mountpouint is usable again.
Change-Id: I37bef30038cf6fd10fa5149a3fa949540ac16eab
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
(cherry picked from commit
95dc1455a7303eac56c755d01a37ca1f203543c0)
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private final String topologyId;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private final String topologyId;
+ private final Duration writeTxIdleTimeout;
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
final BindingAwareBroker bindingAwareBroker,
final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
final BindingAwareBroker bindingAwareBroker,
final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
- final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+ final NetconfClientDispatcher clientDispatcher, final String topologyId,
+ final int writeTxIdleTimeout) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
this.topologyId = Preconditions.checkNotNull(topologyId);
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
this.topologyId = Preconditions.checkNotNull(topologyId);
+ this.writeTxIdleTimeout = Duration.apply(writeTxIdleTimeout, TimeUnit.SECONDS);
}
// Blueprint init method
}
// Blueprint init method
.setProcessingExecutor(processingExecutor)
.setTopologyId(topologyId)
.setNetconfClientDispatcher(clientDispatcher)
.setProcessingExecutor(processingExecutor)
.setTopologyId(topologyId)
.setNetconfClientDispatcher(clientDispatcher)
- .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node));
+ .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
+ .setIdleTimeout(writeTxIdleTimeout);
return builder.build();
}
return builder.build();
}
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
public class NetconfNodeActor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
public class NetconfNodeActor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
- private NetconfTopologySetup setup;
- private RemoteDeviceId id;
private final SchemaSourceRegistry schemaRegistry;
private final SchemaRepository schemaRepository;
private final SchemaSourceRegistry schemaRegistry;
private final SchemaRepository schemaRepository;
+ private final Timeout actorResponseWaitTime;
+ private final Duration writeTxIdleTimeout;
+ private RemoteDeviceId id;
+ private NetconfTopologySetup setup;
private List<SourceIdentifier> sourceIdentifiers;
private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
private List<SourceIdentifier> sourceIdentifiers;
private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
- private final Timeout actorResponseWaitTime;
private DOMDataBroker deviceDataBroker;
//readTxActor can be shared
private ActorRef readTxActor;
private DOMDataBroker deviceDataBroker;
//readTxActor can be shared
private ActorRef readTxActor;
this.schemaRegistry = schemaRegistry;
this.schemaRepository = schemaRepository;
this.actorResponseWaitTime = actorResponseWaitTime;
this.schemaRegistry = schemaRegistry;
this.schemaRepository = schemaRepository;
this.actorResponseWaitTime = actorResponseWaitTime;
+ this.writeTxIdleTimeout = setup.getIdleTimeout();
} else if (message instanceof NewWriteTransactionRequest) { // master
try {
final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
} else if (message instanceof NewWriteTransactionRequest) { // master
try {
final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
- final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+ final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
sender().tell(new NewWriteTransactionReply(txActor), self());
} catch (final Throwable t) {
sender().tell(t, self());
sender().tell(new NewWriteTransactionReply(txActor), self());
} catch (final Throwable t) {
sender().tell(t, self());
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import akka.actor.UntypedActor;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
/**
* WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
*/
public class WriteTransactionActor extends UntypedActor {
/**
* WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
*/
public class WriteTransactionActor extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionActor.class);
+
private final DOMDataWriteTransaction tx;
private final DOMDataWriteTransaction tx;
+ private final long idleTimeout;
/**
* Creates new actor Props.
*
* @param tx delegate device write transaction
/**
* Creates new actor Props.
*
* @param tx delegate device write transaction
+ * @param idleTimeout idle time in seconds, after which transaction is closed automatically
- static Props props(final DOMDataWriteTransaction tx) {
- return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
+ static Props props(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+ return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
- private WriteTransactionActor(final DOMDataWriteTransaction tx) {
+ private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+ this.idleTimeout = idleTimeout.toSeconds();
+ if (this.idleTimeout > 0) {
+ context().setReceiveTimeout(idleTimeout);
+ }
cancel();
} else if (message instanceof SubmitRequest) {
submit(sender(), self());
cancel();
} else if (message instanceof SubmitRequest) {
submit(sender(), self());
+ } else if (message instanceof ReceiveTimeout) {
+ LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
+ idleTimeout);
+ tx.cancel();
+ context().stop(self());
} else {
unhandled(message);
}
} else {
unhandled(message);
}
import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import scala.concurrent.duration.Duration;
public class NetconfTopologySetup {
public class NetconfTopologySetup {
private final NetconfClientDispatcher netconfClientDispatcher;
private final String topologyId;
private final NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
private final NetconfClientDispatcher netconfClientDispatcher;
private final String topologyId;
private final NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
+ private final Duration idleTimeout;
private NetconfTopologySetup(final NetconfTopologySetupBuilder builder) {
this.clusterSingletonServiceProvider = builder.getClusterSingletonServiceProvider();
private NetconfTopologySetup(final NetconfTopologySetupBuilder builder) {
this.clusterSingletonServiceProvider = builder.getClusterSingletonServiceProvider();
this.netconfClientDispatcher = builder.getNetconfClientDispatcher();
this.topologyId = builder.getTopologyId();
this.schemaResourceDTO = builder.getSchemaResourceDTO();
this.netconfClientDispatcher = builder.getNetconfClientDispatcher();
this.topologyId = builder.getTopologyId();
this.schemaResourceDTO = builder.getSchemaResourceDTO();
+ this.idleTimeout = builder.getIdleTimeout();
}
public ClusterSingletonServiceProvider getClusterSingletonServiceProvider() {
}
public ClusterSingletonServiceProvider getClusterSingletonServiceProvider() {
return schemaResourceDTO;
}
return schemaResourceDTO;
}
+ public Duration getIdleTimeout() {
+ return idleTimeout;
+ }
+
public static class NetconfTopologySetupBuilder {
private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
public static class NetconfTopologySetupBuilder {
private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
private String topologyId;
private NetconfClientDispatcher netconfClientDispatcher;
private NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
private String topologyId;
private NetconfClientDispatcher netconfClientDispatcher;
private NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
+ private Duration idleTimeout;
public NetconfTopologySetupBuilder(){
}
public NetconfTopologySetupBuilder(){
}
return bindingAwareBroker;
}
return bindingAwareBroker;
}
- public NetconfTopologySetupBuilder setBindingAwareBroker(BindingAwareBroker bindingAwareBroker) {
+ public NetconfTopologySetupBuilder setBindingAwareBroker(final BindingAwareBroker bindingAwareBroker) {
this.bindingAwareBroker = bindingAwareBroker;
return this;
}
this.bindingAwareBroker = bindingAwareBroker;
return this;
}
return keepaliveExecutor;
}
return keepaliveExecutor;
}
- public NetconfTopologySetupBuilder setKeepaliveExecutor(ScheduledThreadPool keepaliveExecutor) {
+ public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledThreadPool keepaliveExecutor) {
this.keepaliveExecutor = keepaliveExecutor;
return this;
}
this.keepaliveExecutor = keepaliveExecutor;
return this;
}
return processingExecutor;
}
return processingExecutor;
}
- public NetconfTopologySetupBuilder setProcessingExecutor(ThreadPool processingExecutor) {
+ public NetconfTopologySetupBuilder setProcessingExecutor(final ThreadPool processingExecutor) {
this.processingExecutor = processingExecutor;
return this;
}
this.processingExecutor = processingExecutor;
return this;
}
- public NetconfTopologySetupBuilder setDomBroker(Broker domBroker) {
+ public NetconfTopologySetupBuilder setDomBroker(final Broker domBroker) {
this.domBroker = domBroker;
return this;
}
this.domBroker = domBroker;
return this;
}
- public NetconfTopologySetupBuilder setActorSystem(ActorSystem actorSystem) {
+ public NetconfTopologySetupBuilder setActorSystem(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
return this;
}
this.actorSystem = actorSystem;
return this;
}
- public NetconfTopologySetupBuilder setEventExecutor(EventExecutor eventExecutor) {
+ public NetconfTopologySetupBuilder setEventExecutor(final EventExecutor eventExecutor) {
this.eventExecutor = eventExecutor;
return this;
}
this.eventExecutor = eventExecutor;
return this;
}
- public NetconfTopologySetupBuilder setTopologyId(String topologyId) {
+ public NetconfTopologySetupBuilder setTopologyId(final String topologyId) {
this.topologyId = topologyId;
return this;
}
this.topologyId = topologyId;
return this;
}
return netconfClientDispatcher;
}
return netconfClientDispatcher;
}
- public NetconfTopologySetupBuilder setNetconfClientDispatcher(NetconfClientDispatcher clientDispatcher) {
+ public NetconfTopologySetupBuilder setNetconfClientDispatcher(final NetconfClientDispatcher clientDispatcher) {
this.netconfClientDispatcher = clientDispatcher;
return this;
}
this.netconfClientDispatcher = clientDispatcher;
return this;
}
return schemaResourceDTO;
}
return schemaResourceDTO;
}
+ public NetconfTopologySetupBuilder setIdleTimeout(final Duration idleTimeout) {
+ this.idleTimeout = idleTimeout;
+ return this;
+ }
+
+ private Duration getIdleTimeout() {
+ return idleTimeout;
+ }
+
public static NetconfTopologySetupBuilder create() {
return new NetconfTopologySetupBuilder();
}
public static NetconfTopologySetupBuilder create() {
return new NetconfTopologySetupBuilder();
}
<argument ref="eventExecutor"/>
<argument ref="clientDispatcherDependency"/>
<argument value="topology-netconf"/>
<argument ref="eventExecutor"/>
<argument ref="clientDispatcherDependency"/>
<argument value="topology-netconf"/>
</bean>
<service ref="netconfTopologyManager"
interface="org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService"/>
</bean>
<service ref="netconfTopologyManager"
interface="org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService"/>
netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
clusterSingletonServiceProvider, bindingAwareBroker, keepaliveExecutor, processingExecutor, domBroker,
netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
clusterSingletonServiceProvider, bindingAwareBroker, keepaliveExecutor, processingExecutor, domBroker,
- actorSystemProvider, eventExecutor, clientDispatcher, topologyId);
+ actorSystemProvider, eventExecutor, clientDispatcher, topologyId, 0);
@Test
public void testWriteConfiguration() throws Exception {
@Test
public void testWriteConfiguration() throws Exception {
package org.opendaylight.netconf.topology.singleton.impl.actors;
package org.opendaylight.netconf.topology.singleton.impl.actors;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import scala.concurrent.Await;
import scala.concurrent.Future;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class WriteTransactionActorTest {
private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
public class WriteTransactionActorTest {
private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
node = Builders.containerBuilder()
.withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
.build();
node = Builders.containerBuilder()
.withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
.build();
- actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx), "testA");
+ actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx,
+ Duration.apply(2, TimeUnit.SECONDS)), "testA");
verify(deviceWriteTx).submit();
}
verify(deviceWriteTx).submit();
}
+ @Test
+ public void testIdleTimeout() throws Exception {
+ final TestProbe probe = new TestProbe(system);
+ probe.watch(actorRef);
+ verify(deviceWriteTx, timeout(3000)).cancel();
+ probe.expectTerminated(actorRef, TIMEOUT.duration());
+ }
+
}
\ No newline at end of file
}
\ No newline at end of file
new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+ doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);