import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private final String topologyId;
+ private final Duration writeTxIdleTimeout;
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
final BindingAwareBroker bindingAwareBroker,
final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
- final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+ final NetconfClientDispatcher clientDispatcher, final String topologyId,
+ final int writeTxIdleTimeout) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
this.topologyId = Preconditions.checkNotNull(topologyId);
+ this.writeTxIdleTimeout = Duration.apply(writeTxIdleTimeout, TimeUnit.SECONDS);
}
// Blueprint init method
.setProcessingExecutor(processingExecutor)
.setTopologyId(topologyId)
.setNetconfClientDispatcher(clientDispatcher)
- .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node));
+ .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
+ .setIdleTimeout(writeTxIdleTimeout);
return builder.build();
}
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
public class NetconfNodeActor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
- private NetconfTopologySetup setup;
- private RemoteDeviceId id;
private final SchemaSourceRegistry schemaRegistry;
private final SchemaRepository schemaRepository;
+ private final Timeout actorResponseWaitTime;
+ private final Duration writeTxIdleTimeout;
+ private RemoteDeviceId id;
+ private NetconfTopologySetup setup;
private List<SourceIdentifier> sourceIdentifiers;
private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
- private final Timeout actorResponseWaitTime;
private DOMDataBroker deviceDataBroker;
//readTxActor can be shared
private ActorRef readTxActor;
this.schemaRegistry = schemaRegistry;
this.schemaRepository = schemaRepository;
this.actorResponseWaitTime = actorResponseWaitTime;
+ this.writeTxIdleTimeout = setup.getIdleTimeout();
}
@Override
} else if (message instanceof NewWriteTransactionRequest) { // master
try {
final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
- final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+ final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
sender().tell(new NewWriteTransactionReply(txActor), self());
} catch (final Throwable t) {
sender().tell(t, self());
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
/**
* WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
*/
public class WriteTransactionActor extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionActor.class);
+
private final DOMDataWriteTransaction tx;
+ private final long idleTimeout;
/**
* Creates new actor Props.
*
* @param tx delegate device write transaction
+ * @param idleTimeout idle time in seconds, after which transaction is closed automatically
* @return props
*/
- static Props props(final DOMDataWriteTransaction tx) {
- return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
+ static Props props(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+ return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
}
- private WriteTransactionActor(final DOMDataWriteTransaction tx) {
+ private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
this.tx = tx;
+ this.idleTimeout = idleTimeout.toSeconds();
+ if (this.idleTimeout > 0) {
+ context().setReceiveTimeout(idleTimeout);
+ }
}
@Override
cancel();
} else if (message instanceof SubmitRequest) {
submit(sender(), self());
+ } else if (message instanceof ReceiveTimeout) {
+ LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
+ idleTimeout);
+ tx.cancel();
+ context().stop(self());
} else {
unhandled(message);
}
import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import scala.concurrent.duration.Duration;
public class NetconfTopologySetup {
private final NetconfClientDispatcher netconfClientDispatcher;
private final String topologyId;
private final NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
+ private final Duration idleTimeout;
private NetconfTopologySetup(final NetconfTopologySetupBuilder builder) {
this.clusterSingletonServiceProvider = builder.getClusterSingletonServiceProvider();
this.netconfClientDispatcher = builder.getNetconfClientDispatcher();
this.topologyId = builder.getTopologyId();
this.schemaResourceDTO = builder.getSchemaResourceDTO();
+ this.idleTimeout = builder.getIdleTimeout();
}
public ClusterSingletonServiceProvider getClusterSingletonServiceProvider() {
return schemaResourceDTO;
}
+ public Duration getIdleTimeout() {
+ return idleTimeout;
+ }
+
public static class NetconfTopologySetupBuilder {
private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
private String topologyId;
private NetconfClientDispatcher netconfClientDispatcher;
private NetconfDevice.SchemaResourcesDTO schemaResourceDTO;
+ private Duration idleTimeout;
public NetconfTopologySetupBuilder(){
}
return bindingAwareBroker;
}
- public NetconfTopologySetupBuilder setBindingAwareBroker(BindingAwareBroker bindingAwareBroker) {
+ public NetconfTopologySetupBuilder setBindingAwareBroker(final BindingAwareBroker bindingAwareBroker) {
this.bindingAwareBroker = bindingAwareBroker;
return this;
}
return keepaliveExecutor;
}
- public NetconfTopologySetupBuilder setKeepaliveExecutor(ScheduledThreadPool keepaliveExecutor) {
+ public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledThreadPool keepaliveExecutor) {
this.keepaliveExecutor = keepaliveExecutor;
return this;
}
return processingExecutor;
}
- public NetconfTopologySetupBuilder setProcessingExecutor(ThreadPool processingExecutor) {
+ public NetconfTopologySetupBuilder setProcessingExecutor(final ThreadPool processingExecutor) {
this.processingExecutor = processingExecutor;
return this;
}
return domBroker;
}
- public NetconfTopologySetupBuilder setDomBroker(Broker domBroker) {
+ public NetconfTopologySetupBuilder setDomBroker(final Broker domBroker) {
this.domBroker = domBroker;
return this;
}
return actorSystem;
}
- public NetconfTopologySetupBuilder setActorSystem(ActorSystem actorSystem) {
+ public NetconfTopologySetupBuilder setActorSystem(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
return this;
}
return eventExecutor;
}
- public NetconfTopologySetupBuilder setEventExecutor(EventExecutor eventExecutor) {
+ public NetconfTopologySetupBuilder setEventExecutor(final EventExecutor eventExecutor) {
this.eventExecutor = eventExecutor;
return this;
}
return topologyId;
}
- public NetconfTopologySetupBuilder setTopologyId(String topologyId) {
+ public NetconfTopologySetupBuilder setTopologyId(final String topologyId) {
this.topologyId = topologyId;
return this;
}
return netconfClientDispatcher;
}
- public NetconfTopologySetupBuilder setNetconfClientDispatcher(NetconfClientDispatcher clientDispatcher) {
+ public NetconfTopologySetupBuilder setNetconfClientDispatcher(final NetconfClientDispatcher clientDispatcher) {
this.netconfClientDispatcher = clientDispatcher;
return this;
}
return schemaResourceDTO;
}
+ public NetconfTopologySetupBuilder setIdleTimeout(final Duration idleTimeout) {
+ this.idleTimeout = idleTimeout;
+ return this;
+ }
+
+ private Duration getIdleTimeout() {
+ return idleTimeout;
+ }
+
public static NetconfTopologySetupBuilder create() {
return new NetconfTopologySetupBuilder();
}
<argument ref="eventExecutor"/>
<argument ref="clientDispatcherDependency"/>
<argument value="topology-netconf"/>
+ <argument value="0"/>
</bean>
<service ref="netconfTopologyManager"
interface="org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService"/>
netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
clusterSingletonServiceProvider, bindingAwareBroker, keepaliveExecutor, processingExecutor, domBroker,
- actorSystemProvider, eventExecutor, clientDispatcher, topologyId);
+ actorSystemProvider, eventExecutor, clientDispatcher, topologyId, 0);
}
+
@Test
public void testWriteConfiguration() throws Exception {
package org.opendaylight.netconf.topology.singleton.impl.actors;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class WriteTransactionActorTest {
private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
node = Builders.containerBuilder()
.withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
.build();
- actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx), "testA");
+ actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx,
+ Duration.apply(2, TimeUnit.SECONDS)), "testA");
}
@After
verify(deviceWriteTx).submit();
}
+ @Test
+ public void testIdleTimeout() throws Exception {
+ final TestProbe probe = new TestProbe(system);
+ probe.watch(actorRef);
+ verify(deviceWriteTx, timeout(3000)).cancel();
+ probe.expectTerminated(actorRef, TIMEOUT.duration());
+ }
+
}
\ No newline at end of file
new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+ doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);