import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
public class NetconfTopologyManager
implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private final String topologyId;
+ private final Duration writeTxIdleTimeout;
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
- final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
- final BindingAwareBroker bindingAwareBroker,
- final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
- final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
- final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+ final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
+ final BindingAwareBroker bindingAwareBroker,
+ final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
+ final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
+ final NetconfClientDispatcher clientDispatcher, final String topologyId,
+ final int writeTxIdleTimeout) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
this.topologyId = Preconditions.checkNotNull(topologyId);
+ this.writeTxIdleTimeout = Duration.apply(writeTxIdleTimeout, TimeUnit.SECONDS);
}
// Blueprint init method
@Override
public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
- for (DataTreeModification<Node> change : changes) {
+ for (final DataTreeModification<Node> change : changes) {
final DataObjectModification<Node> rootNode = change.getRootNode();
final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
}
}
- private void refreshNetconfDeviceContext(InstanceIdentifier<Node> instanceIdentifier, Node node) {
+ private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
final NetconfTopologyContext context = contexts.get(instanceIdentifier);
context.refresh(createSetup(instanceIdentifier, node));
}
try {
clusterRegistrations.get(instanceIdentifier).close();
contexts.get(instanceIdentifier).closeFinal();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
}
contexts.remove(instanceIdentifier);
contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
try {
netconfTopologyContext.closeFinal();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
}
});
clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
try {
clusterSingletonServiceRegistration.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
}
});
clusterRegistrations.clear();
}
- private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(String topologyId) {
+ private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
LOG.debug("topology initialization successful");
}
@Override
- public void onFailure(@Nonnull Throwable throwable) {
+ public void onFailure(@Nonnull final Throwable throwable) {
LOG.error("Unable to initialize netconf-topology, {}", throwable);
}
});
NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
- private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, String topologyId) {
+ private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
final InstanceIdentifier<NetworkTopology> networkTopologyId =
InstanceIdentifier.builder(NetworkTopology.class).build();
.setKeepaliveExecutor(keepaliveExecutor)
.setProcessingExecutor(processingExecutor)
.setTopologyId(topologyId)
- .setNetconfClientDispatcher(clientDispatcher);
+ .setNetconfClientDispatcher(clientDispatcher)
+ .setIdleTimeout(writeTxIdleTimeout);
return builder.build();
}
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
public class NetconfNodeActor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
- private NetconfTopologySetup setup;
- private RemoteDeviceId id;
private final SchemaSourceRegistry schemaRegistry;
private final SchemaRepository schemaRepository;
+ private final Duration writeTxIdleTimeout;
+ private RemoteDeviceId id;
+ private NetconfTopologySetup setup;
private List<SourceIdentifier> sourceIdentifiers;
private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
}
private NetconfNodeActor(final NetconfTopologySetup setup,
- final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry,
+ final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
final SchemaRepository schemaRepository) {
this.setup = setup;
this.id = id;
this.schemaRegistry = schemaRegistry;
this.schemaRepository = schemaRepository;
+ this.writeTxIdleTimeout = setup.getIdleTimeout();
}
@Override
} else if (message instanceof NewWriteTransactionRequest) { // master
try {
final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
- final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+ final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
sender().tell(new NewWriteTransactionReply(txActor), self());
} catch (final Throwable t) {
sender().tell(t, self());
public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
try {
sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
- } catch (IOException exception) {
+ } catch (final IOException exception) {
sender.tell(exception.getCause(), getSelf());
}
}
});
}
- private void registerSlaveMountPoint(ActorRef masterReference) {
+ private void registerSlaveMountPoint(final ActorRef masterReference) {
if (this.slaveSalManager != null) {
slaveSalManager.close();
}
});
}
- private DOMRpcService getDOMRpcService(ActorRef masterReference) {
+ private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
}
- private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
+ private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
new ProxyYangTextSourceProvider(masterReference, getContext());
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
/**
* WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
*/
public class WriteTransactionActor extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionActor.class);
+
private final DOMDataWriteTransaction tx;
+ private final long idleTimeout;
/**
* Creates new actor Props.
*
* @param tx delegate device write transaction
+ * @param idleTimeout idle time in seconds, after which transaction is closed automatically
* @return props
*/
- static Props props(final DOMDataWriteTransaction tx) {
- return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
+ static Props props(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+ return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
}
- private WriteTransactionActor(final DOMDataWriteTransaction tx) {
+ private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
this.tx = tx;
+ this.idleTimeout = idleTimeout.toSeconds();
+ if (this.idleTimeout > 0) {
+ context().setReceiveTimeout(idleTimeout);
+ }
}
@Override
cancel();
} else if (message instanceof SubmitRequest) {
submit(sender(), self());
+ } else if (message instanceof ReceiveTimeout) {
+ LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
+ idleTimeout);
+ tx.cancel();
+ context().stop(self());
} else {
unhandled(message);
}
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import scala.concurrent.duration.Duration;
public class NetconfTopologySetup {
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher netconfClientDispatcher;
private final String topologyId;
+ private final Duration idleTimeout;
private NetconfTopologySetup(final NetconfTopologySetupBuilder builder) {
this.clusterSingletonServiceProvider = builder.getClusterSingletonServiceProvider();
this.rpcProviderRegistry = builder.getRpcProviderRegistry();
this.eventExecutor = builder.getEventExecutor();
this.netconfClientDispatcher = builder.getNetconfClientDispatcher();
this.topologyId = builder.getTopologyId();
+ this.idleTimeout = builder.getIdleTimeout();
}
public ClusterSingletonServiceProvider getClusterSingletonServiceProvider() {
return netconfClientDispatcher;
}
+ public Duration getIdleTimeout() {
+ return idleTimeout;
+ }
+
public static class NetconfTopologySetupBuilder {
private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
private EventExecutor eventExecutor;
private String topologyId;
private NetconfClientDispatcher netconfClientDispatcher;
+ private Duration idleTimeout;
public NetconfTopologySetupBuilder(){
}
return bindingAwareBroker;
}
- public NetconfTopologySetupBuilder setBindingAwareBroker(BindingAwareBroker bindingAwareBroker) {
+ public NetconfTopologySetupBuilder setBindingAwareBroker(final BindingAwareBroker bindingAwareBroker) {
this.bindingAwareBroker = bindingAwareBroker;
return this;
}
return keepaliveExecutor;
}
- public NetconfTopologySetupBuilder setKeepaliveExecutor(ScheduledThreadPool keepaliveExecutor) {
+ public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledThreadPool keepaliveExecutor) {
this.keepaliveExecutor = keepaliveExecutor;
return this;
}
return processingExecutor;
}
- public NetconfTopologySetupBuilder setProcessingExecutor(ThreadPool processingExecutor) {
+ public NetconfTopologySetupBuilder setProcessingExecutor(final ThreadPool processingExecutor) {
this.processingExecutor = processingExecutor;
return this;
}
return domBroker;
}
- public NetconfTopologySetupBuilder setDomBroker(Broker domBroker) {
+ public NetconfTopologySetupBuilder setDomBroker(final Broker domBroker) {
this.domBroker = domBroker;
return this;
}
return actorSystem;
}
- public NetconfTopologySetupBuilder setActorSystem(ActorSystem actorSystem) {
+ public NetconfTopologySetupBuilder setActorSystem(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
return this;
}
return eventExecutor;
}
- public NetconfTopologySetupBuilder setEventExecutor(EventExecutor eventExecutor) {
+ public NetconfTopologySetupBuilder setEventExecutor(final EventExecutor eventExecutor) {
this.eventExecutor = eventExecutor;
return this;
}
return topologyId;
}
- public NetconfTopologySetupBuilder setTopologyId(String topologyId) {
+ public NetconfTopologySetupBuilder setTopologyId(final String topologyId) {
this.topologyId = topologyId;
return this;
}
return netconfClientDispatcher;
}
- public NetconfTopologySetupBuilder setNetconfClientDispatcher(NetconfClientDispatcher clientDispatcher) {
+ public NetconfTopologySetupBuilder setNetconfClientDispatcher(final NetconfClientDispatcher clientDispatcher) {
this.netconfClientDispatcher = clientDispatcher;
return this;
}
+ public NetconfTopologySetupBuilder setIdleTimeout(final Duration idleTimeout) {
+ this.idleTimeout = idleTimeout;
+ return this;
+ }
+
+ private Duration getIdleTimeout() {
+ return idleTimeout;
+ }
+
public static NetconfTopologySetupBuilder create() {
return new NetconfTopologySetupBuilder();
}
<argument ref="eventExecutor"/>
<argument ref="clientDispatcherDependency"/>
<argument value="topology-netconf"/>
+ <argument value="0"/>
</bean>
<service ref="netconfTopologyManager"
interface="org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService"/>
netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
clusterSingletonServiceProvider, bindingAwareBroker, keepaliveExecutor, processingExecutor, domBroker,
- actorSystemProvider, eventExecutor, clientDispatcher, topologyId);
+ actorSystemProvider, eventExecutor, clientDispatcher, topologyId, 0);
}
+
@Test
public void testWriteConfiguration() throws Exception {
package org.opendaylight.netconf.topology.singleton.impl.actors;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class WriteTransactionActorTest {
private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
node = Builders.containerBuilder()
.withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
.build();
- actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx), "testA");
+ actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx,
+ Duration.apply(2, TimeUnit.SECONDS)), "testA");
}
@After
verify(deviceWriteTx).submit();
}
+ @Test
+ public void testIdleTimeout() throws Exception {
+ final TestProbe probe = new TestProbe(system);
+ probe.watch(actorRef);
+ verify(deviceWriteTx, timeout(3000)).cancel();
+ probe.expectTerminated(actorRef, TIMEOUT.duration());
+ }
+
}
\ No newline at end of file
new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+ doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
DEFAULT_SCHEMA_REPOSITORY);