package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorSystem;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
public class NetconfTopologyManager
implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
private final String topologyId;
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
- final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
- final BindingAwareBroker bindingAwareBroker,
- final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
- final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
- final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+ final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
+ final BindingAwareBroker bindingAwareBroker,
+ final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
+ final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
+ final NetconfClientDispatcher clientDispatcher, final String topologyId) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
@Override
public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
- for (DataTreeModification<Node> change : changes) {
+ for (final DataTreeModification<Node> change : changes) {
final DataObjectModification<Node> rootNode = change.getRootNode();
final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
}
}
- private void refreshNetconfDeviceContext(InstanceIdentifier<Node> instanceIdentifier, Node node) {
+ private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
final NetconfTopologyContext context = contexts.get(instanceIdentifier);
context.refresh(createSetup(instanceIdentifier, node));
}
Preconditions.checkNotNull(netconfNode.getHost());
Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
+ final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
+ "seconds"));
+
final ServiceGroupIdentifier serviceGroupIdent =
ServiceGroupIdentifier.create(instanceIdentifier.toString());
final NetconfTopologyContext newNetconfTopologyContext =
- new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent);
+ new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
+ actorResponseWaitTime);
final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
try {
clusterRegistrations.get(instanceIdentifier).close();
contexts.get(instanceIdentifier).closeFinal();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
}
contexts.remove(instanceIdentifier);
contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
try {
netconfTopologyContext.closeFinal();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
}
});
clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
try {
clusterSingletonServiceRegistration.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
}
});
clusterRegistrations.clear();
}
- private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(String topologyId) {
+ private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
LOG.debug("topology initialization successful");
}
@Override
- public void onFailure(@Nonnull Throwable throwable) {
+ public void onFailure(@Nonnull final Throwable throwable) {
LOG.error("Unable to initialize netconf-topology, {}", throwable);
}
});
NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
- private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, String topologyId) {
+ private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
final InstanceIdentifier<NetworkTopology> networkTopologyId =
InstanceIdentifier.builder(NetworkTopology.class).build();