import akka.cluster.Cluster;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
+import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.List;
import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfMasterDOMTransaction;
import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private static final Logger LOG = LoggerFactory.getLogger(MasterSalFacade.class);
private final RemoteDeviceId id;
+ private final Timeout actorResponseWaitTime;
private SchemaContext remoteSchemaContext = null;
private NetconfSessionPreferences netconfSessionPreferences = null;
private DOMDataBroker deviceDataBroker = null;
MasterSalFacade(final RemoteDeviceId id,
- final Broker domBroker,
- final BindingAwareBroker bindingBroker,
- final ActorSystem actorSystem,
- final ActorRef masterActorRef) {
+ final Broker domBroker,
+ final BindingAwareBroker bindingBroker,
+ final ActorSystem actorSystem,
+ final ActorRef masterActorRef,
+ final Timeout actorResponseWaitTime) {
this.id = id;
this.salProvider = new NetconfDeviceSalProvider(id);
this.actorSystem = actorSystem;
this.masterActorRef = masterActorRef;
+ this.actorResponseWaitTime = actorResponseWaitTime;
registerToSal(domBroker, bindingBroker);
}
// We need to create NetconfProxyDOMTransaction so accessing mountpoint
// on leader node would be same as on follower node
final NetconfDOMTransaction proxyDOMTransation =
- new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef);
+ new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
final NetconfDOMDataBroker proxyDataBroker = new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransation);
salProvider.getMountInstance()
.onTopologyDeviceConnected(remoteSchemaContext, proxyDataBroker, deviceRpc, notificationService);
// send initial data to master actor and create actor for providing it
return Patterns.ask(masterActorRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
- deviceRpc), NetconfTopologyUtils.TIMEOUT);
+ deviceRpc), actorResponseWaitTime);
}
private void updateDeviceData() {
- Cluster cluster = Cluster.get(actorSystem);
+ final Cluster cluster = Cluster.get(actorSystem);
salProvider.getTopologyDatastoreAdapter().updateClusteredDeviceData(true, cluster.selfAddress().toString(),
netconfSessionPreferences.getNetconfDeviceCapabilities());
}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
+import akka.util.Timeout;
import java.util.Collection;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
private final SchemaSourceRegistry schemaRegistry;
private final SchemaRepository schemaRepository;
private ActorRef slaveActorRef;
+ private final Timeout actorResponseWaitTime;
NetconfNodeManager(final NetconfTopologySetup setup,
final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
- final SchemaRepository schemaRepository) {
+ final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
this.setup = setup;
this.id = id;
this.schemaRegistry = schemaRegistry;
this.schemaRepository = schemaRepository;
+ this.actorResponseWaitTime = actorResponseWaitTime;
}
@Override
NetconfTopologyUtils.createMasterActorName(id.getName(),
netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode()));
setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef);
- } else { ;
+ } else {
closeActor();
}
}
private void createActorRef() {
if (slaveActorRef == null) {
slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry,
- schemaRepository), id.getName());
+ schemaRepository, actorResponseWaitTime), id.getName());
}
}
import akka.cluster.Cluster;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyContext.class);
private final ServiceGroupIdentifier serviceGroupIdent;
+ private final Timeout actorResponseWaitTime;
private NetconfTopologySetup netconfTopologyDeviceSetup;
private RemoteDeviceId remoteDeviceId;
private RemoteDeviceConnector remoteDeviceConnector;
private ActorRef masterActorRef;
NetconfTopologyContext(final NetconfTopologySetup netconfTopologyDeviceSetup,
- final ServiceGroupIdentifier serviceGroupIdent) {
+ final ServiceGroupIdentifier serviceGroupIdent,
+ final Timeout actorResponseWaitTime) {
this.netconfTopologyDeviceSetup = Preconditions.checkNotNull(netconfTopologyDeviceSetup);
this.serviceGroupIdent = serviceGroupIdent;
+ this.actorResponseWaitTime = actorResponseWaitTime;
remoteDeviceId = NetconfTopologyUtils.createRemoteDeviceId(netconfTopologyDeviceSetup.getNode().getNodeId(),
netconfTopologyDeviceSetup.getNode().getAugmentation(NetconfNode.class));
- remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId);
+ remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId,
+ actorResponseWaitTime);
netconfNodeManager = createNodeDeviceManager();
-
}
@Override
if (!finalClose) {
final String masterAddress = Cluster.get(netconfTopologyDeviceSetup.getActorSystem()).selfAddress().toString();
masterActorRef = netconfTopologyDeviceSetup.getActorSystem().actorOf(NetconfNodeActor.props(
- netconfTopologyDeviceSetup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY),
+ netconfTopologyDeviceSetup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY,
+ actorResponseWaitTime),
NetconfTopologyUtils.createMasterActorName(remoteDeviceId.getName(), masterAddress));
remoteDeviceConnector.startRemoteDeviceConnection(masterActorRef);
private NetconfNodeManager createNodeDeviceManager() {
final NetconfNodeManager ndm =
new NetconfNodeManager(netconfTopologyDeviceSetup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY);
+ DEFAULT_SCHEMA_REPOSITORY, actorResponseWaitTime);
ndm.registerDataTreeChangeListener(netconfTopologyDeviceSetup.getTopologyId(),
netconfTopologyDeviceSetup.getNode().getKey());
if (!isMaster) {
netconfNodeManager.refreshDevice(netconfTopologyDeviceSetup, remoteDeviceId);
}
- remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId);
+ remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId, actorResponseWaitTime);
if (isMaster) {
final Future<Object> future = Patterns.ask(masterActorRef, new RefreshSetupMasterActorData(
- netconfTopologyDeviceSetup, remoteDeviceId), NetconfTopologyUtils.TIMEOUT);
+ netconfTopologyDeviceSetup, remoteDeviceId), actorResponseWaitTime);
future.onComplete(new OnComplete<Object>() {
@Override
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorSystem;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
public class NetconfTopologyManager
implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
private final String topologyId;
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
- final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
- final BindingAwareBroker bindingAwareBroker,
- final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
- final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
- final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+ final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
+ final BindingAwareBroker bindingAwareBroker,
+ final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
+ final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
+ final NetconfClientDispatcher clientDispatcher, final String topologyId) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
@Override
public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
- for (DataTreeModification<Node> change : changes) {
+ for (final DataTreeModification<Node> change : changes) {
final DataObjectModification<Node> rootNode = change.getRootNode();
final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
}
}
- private void refreshNetconfDeviceContext(InstanceIdentifier<Node> instanceIdentifier, Node node) {
+ private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
final NetconfTopologyContext context = contexts.get(instanceIdentifier);
context.refresh(createSetup(instanceIdentifier, node));
}
Preconditions.checkNotNull(netconfNode.getHost());
Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
+ final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
+ "seconds"));
+
final ServiceGroupIdentifier serviceGroupIdent =
ServiceGroupIdentifier.create(instanceIdentifier.toString());
final NetconfTopologyContext newNetconfTopologyContext =
- new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent);
+ new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
+ actorResponseWaitTime);
final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
try {
clusterRegistrations.get(instanceIdentifier).close();
contexts.get(instanceIdentifier).closeFinal();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
}
contexts.remove(instanceIdentifier);
contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
try {
netconfTopologyContext.closeFinal();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
}
});
clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
try {
clusterSingletonServiceRegistration.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
}
});
clusterRegistrations.clear();
}
- private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(String topologyId) {
+ private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
LOG.debug("topology initialization successful");
}
@Override
- public void onFailure(@Nonnull Throwable throwable) {
+ public void onFailure(@Nonnull final Throwable throwable) {
LOG.error("Unable to initialize netconf-topology, {}", throwable);
}
});
NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
- private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, String topologyId) {
+ private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
final InstanceIdentifier<NetworkTopology> networkTopologyId =
InstanceIdentifier.builder(NetworkTopology.class).build();
import akka.actor.ActorSystem;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
+import akka.util.Timeout;
import com.google.common.base.Function;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
private final ActorRef masterActorRef;
private final ActorSystem actorSystem;
private final RemoteDeviceId id;
+ private final Timeout actorResponseWaitTime;
public ProxyDOMRpcService(final ActorSystem actorSystem, final ActorRef masterActorRef,
- final RemoteDeviceId remoteDeviceId) {
+ final RemoteDeviceId remoteDeviceId, final Timeout actorResponseWaitTime) {
this.actorSystem = actorSystem;
this.masterActorRef = masterActorRef;
id = remoteDeviceId;
+ this.actorResponseWaitTime = actorResponseWaitTime;
}
@Nonnull
final Future<Object> scalaFuture =
Patterns.ask(masterActorRef,
new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage),
- NetconfTopologyUtils.TIMEOUT);
+ actorResponseWaitTime);
final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
+import akka.util.Timeout;
import com.google.common.collect.Sets;
import java.util.Set;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import scala.concurrent.Future;
private final ActorRef masterRef;
private final ActorContext actorContext;
+ private final Timeout actorResponseWaitTime;
- public ProxyYangTextSourceProvider(final ActorRef masterRef, final ActorContext actorContext) {
+ public ProxyYangTextSourceProvider(final ActorRef masterRef, final ActorContext actorContext,
+ final Timeout actorResponseWaitTime) {
this.masterRef = masterRef;
this.actorContext = actorContext;
+ this.actorResponseWaitTime = actorResponseWaitTime;
}
@Override
@Nonnull final SourceIdentifier sourceIdentifier) {
final Future<Object> scalaFuture = Patterns.ask(masterRef,
- new YangTextSchemaSourceRequest(sourceIdentifier), NetconfTopologyUtils.TIMEOUT);
+ new YangTextSchemaSourceRequest(sourceIdentifier), actorResponseWaitTime);
final Promise.DefaultPromise<YangTextSchemaSourceSerializationProxy> promise = new Promise.DefaultPromise<>();
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorRef;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
* synchronization locks.
*/
private static final Map<String, NetconfDevice.SchemaResourcesDTO> schemaResourcesDTOs = new HashMap<>();
+ private final Timeout actorResponseWaitTime;
// Initializes default constant instances for the case when the default schema repository
// directory cache/schema is used.
private NetconfConnectorDTO deviceCommunicatorDTO;
public RemoteDeviceConnectorImpl(final NetconfTopologySetup netconfTopologyDeviceSetup,
- final RemoteDeviceId remoteDeviceId) {
+ final RemoteDeviceId remoteDeviceId, final Timeout actorResponseWaitTime) {
this.netconfTopologyDeviceSetup = Preconditions.checkNotNull(netconfTopologyDeviceSetup);
this.remoteDeviceId = remoteDeviceId;
+ this.actorResponseWaitTime = actorResponseWaitTime;
}
@Override
RemoteDeviceHandler<NetconfSessionPreferences> salFacade = new MasterSalFacade(remoteDeviceId,
netconfTopologyDeviceSetup.getDomBroker(), netconfTopologyDeviceSetup.getBindingAwareBroker(),
- netconfTopologyDeviceSetup.getActorSystem(), deviceContextActorRef);
+ netconfTopologyDeviceSetup.getActorSystem(), deviceContextActorRef, actorResponseWaitTime);
if (keepaliveDelay > 0) {
LOG.info("{}: Adding keepalive facade.", remoteDeviceId);
salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade,
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.util.Timeout;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
private final NetconfDeviceSalProvider salProvider;
private final ActorSystem actorSystem;
+ private final Timeout actorResponseWaitTime;
public SlaveSalFacade(final RemoteDeviceId id,
final Broker domBroker,
- final ActorSystem actorSystem) {
+ final ActorSystem actorSystem,
+ final Timeout actorResponseWaitTime) {
this.id = id;
this.salProvider = new NetconfDeviceSalProvider(id);
this.actorSystem = actorSystem;
+ this.actorResponseWaitTime = actorResponseWaitTime;
registerToSal(domBroker);
}
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
final NetconfDOMTransaction proxyDOMTransactions =
- new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef);
+ new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
final NetconfDOMDataBroker netconfDeviceDataBroker =
new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransactions);
unregisterSlaveMountPoint();
try {
salProvider.getMountInstance().close();
- } catch (Exception exception) {
+ } catch (final Exception exception) {
LOG.warn("{}: Exception in closing slave sal facade: {}", id, exception);
}
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
+import akka.util.Timeout;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
private List<SourceIdentifier> sourceIdentifiers;
private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
+ private final Timeout actorResponseWaitTime;
public static Props props(final NetconfTopologySetup setup,
final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
- final SchemaRepository schemaRepository) {
+ final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
return Props.create(NetconfNodeActor.class, () ->
- new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository));
+ new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime));
}
private NetconfNodeActor(final NetconfTopologySetup setup,
- final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry,
- final SchemaRepository schemaRepository) {
+ final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
+ final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
this.setup = setup;
this.id = id;
this.schemaRegistry = schemaRegistry;
this.schemaRepository = schemaRepository;
+ this.actorResponseWaitTime = actorResponseWaitTime;
}
@Override
public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
try {
sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
- } catch (IOException exception) {
+ } catch (final IOException exception) {
sender.tell(exception.getCause(), getSelf());
}
}
});
}
- private void registerSlaveMountPoint(ActorRef masterReference) {
+ private void registerSlaveMountPoint(final ActorRef masterReference) {
if (this.slaveSalManager != null) {
slaveSalManager.close();
}
- slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
+ slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem(), actorResponseWaitTime);
final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
getSchemaContext(masterReference);
});
}
- private DOMRpcService getDOMRpcService(ActorRef masterReference) {
- return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
+ private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
+ return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
}
- private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
+ private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
- new ProxyYangTextSourceProvider(masterReference, getContext());
+ new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
getContext().dispatcher());
import akka.actor.ActorSystem;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
+import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import org.opendaylight.controller.config.util.xml.DocumentedException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
private final RemoteDeviceId id;
private final ActorSystem actorSystem;
private final ActorRef masterContextRef;
+ private final Timeout actorResponseWaitTime;
public NetconfProxyDOMTransaction(final RemoteDeviceId id,
final ActorSystem actorSystem,
- final ActorRef masterContextRef) {
+ final ActorRef masterContextRef,
+ final Timeout actorResponseWaitTime) {
this.id = id;
this.actorSystem = actorSystem;
this.masterContextRef = masterContextRef;
+ this.actorResponseWaitTime = actorResponseWaitTime;
}
@Override
// node.
LOG.debug("{}: Requesting leader {} to open new transaction", id, masterContextRef);
final Future<Object> openTxFuture =
- Patterns.ask(masterContextRef, new OpenTransaction(), NetconfTopologyUtils.TIMEOUT);
+ Patterns.ask(masterContextRef, new OpenTransaction(), actorResponseWaitTime);
try {
// we have to wait here so we can see if tx can be opened
- Await.result(openTxFuture, NetconfTopologyUtils.TIMEOUT.duration());
+ Await.result(openTxFuture, actorResponseWaitTime.duration());
LOG.debug("{}: New transaction opened successfully", id);
} catch (final Exception e) {
LOG.error("{}: Failed to open new transaction", id, e);
final YangInstanceIdentifier path) {
final Future<Object> readScalaFuture =
- Patterns.ask(masterContextRef, new ReadRequest(store, path), NetconfTopologyUtils.TIMEOUT);
+ Patterns.ask(masterContextRef, new ReadRequest(store, path), actorResponseWaitTime);
LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
@Override
public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
final Future<Object> existsScalaFuture =
- Patterns.ask(masterContextRef, new ExistsRequest(store, path), NetconfTopologyUtils.TIMEOUT);
+ Patterns.ask(masterContextRef, new ExistsRequest(store, path), actorResponseWaitTime);
LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
@Override
public boolean cancel() {
final Future<Object> cancelScalaFuture =
- Patterns.ask(masterContextRef, new CancelRequest(), NetconfTopologyUtils.TIMEOUT);
+ Patterns.ask(masterContextRef, new CancelRequest(), actorResponseWaitTime);
LOG.trace("{}: Cancel {} via NETCONF", id);
try {
// here must be Await because AsyncWriteTransaction do not return future
- return (boolean) Await.result(cancelScalaFuture, NetconfTopologyUtils.TIMEOUT.duration());
+ return (boolean) Await.result(cancelScalaFuture, actorResponseWaitTime.duration());
} catch (Exception e) {
return false;
}
@Override
public Future<Void> submit() {
final Future<Object> submitScalaFuture =
- Patterns.ask(masterContextRef, new SubmitRequest(), NetconfTopologyUtils.TIMEOUT);
+ Patterns.ask(masterContextRef, new SubmitRequest(), actorResponseWaitTime);
LOG.trace("{}: Submit {} via NETCONF", id);
package org.opendaylight.netconf.topology.singleton.impl.utils;
-import akka.util.Timeout;
import java.io.File;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
-import scala.concurrent.duration.Duration;
public class NetconfTopologyUtils {
private static final String DEFAULT_SCHEMA_REPOSITORY_NAME = "sal-netconf-connector";
- public static final Timeout TIMEOUT = new Timeout(Duration.create(10, "seconds"));
-
public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
public static final int DEFAULT_KEEPALIVE_DELAY = 0;
public static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
DEFAULT_SCHEMA_REPOSITORY.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
public static RemoteDeviceId createRemoteDeviceId(final NodeId nodeId, final NetconfNode node) {
- IpAddress ipAddress = node.getHost().getIpAddress();
- InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null
+ final IpAddress ipAddress = node.getHost().getIpAddress();
+ final InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null
? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
node.getPort().getValue());
return new RemoteDeviceId(nodeId.getValue(), address);
}
- public static String createActorPath(String masterMember, String name) {
+ public static String createActorPath(final String masterMember, final String name) {
return masterMember + "/user/" + name;
}
- public static String createMasterActorName(String name, String masterAddress) {
+ public static String createMasterActorName(final String name, final String masterAddress) {
return masterAddress.replaceAll("//", "") + "_" + name;
}
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY);
+ DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);
system = ActorSystem.create();
final SchemaRepository schemaRepository = mock(SchemaRepository.class);
final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent());
final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId,
- DEFAULT_SCHEMA_REPOSITORY, schemaRepository);
+ DEFAULT_SCHEMA_REPOSITORY, schemaRepository, TIMEOUT);
final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository");
final ActorContext actorContext = mock(ActorContext.class);
doReturn(system.dispatcher()).when(actorContext).dispatcher();
final ProxyYangTextSourceProvider proxyYang =
- new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext);
+ new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext, TIMEOUT);
// test if asking for source is resolved and sended back
final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) {
@Override
- protected MoreObjects.ToStringHelper addToStringAttributes(MoreObjects.ToStringHelper toStringHelper) {
+ protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) {
return null;
}
// test if slave get right identifiers from master
- final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId);
+ final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId, TIMEOUT);
final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname"));
final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
}
- private String convertStreamToString(java.io.InputStream is) {
- java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
+ private String convertStreamToString(final java.io.InputStream is) {
+ final java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
return s.hasNext() ? s.next() : "";
}
.setBetweenAttemptsTimeoutMillis(100)
.setSchemaless(false)
.setTcpOnly(false)
+ .setActorResponseWaitTime(10)
.build();
final Node node = new NodeBuilder().setNodeId(new NodeId("node-id"))
.addAugmentation(NetconfNode.class, netconfNode).build();
private final DataTreeIdentifier<Node> rootPath;
private final DataObjectModification<Node> rootNode;
- CustomTreeModification(DataTreeIdentifier<Node> rootPath, DataObjectModification<Node> rootNode) {
+ CustomTreeModification(final DataTreeIdentifier<Node> rootPath, final DataObjectModification<Node> rootNode) {
this.rootPath = rootPath;
this.rootNode = rootNode;
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.util.Timeout;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import scala.concurrent.duration.Duration;
public class RemoteDeviceConnectorImplTest {
private static final NodeId NODE_ID = new NodeId("testing-node");
private static final String TOPOLOGY_ID = "testing-topology";
+ private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
@Mock
private DataBroker dataBroker;
final RemoteDeviceHandler salFacade = mock(RemoteDeviceHandler.class);
final TestingRemoteDeviceConnectorImpl remoteDeviceConnection =
- new TestingRemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, communicator, salFacade);
+ new TestingRemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, communicator, salFacade, TIMEOUT);
final ActorRef masterRef = mock(ActorRef.class);
.build();
final RemoteDeviceConnectorImpl remoteDeviceConnection =
- new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+ new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, TIMEOUT);
final ActorRef masterRef = mock(ActorRef.class);
new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
final RemoteDeviceConnectorImpl remoteDeviceConnection =
- new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+ new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, TIMEOUT);
final ActorRef masterRef = mock(ActorRef.class);
.build();
final RemoteDeviceConnectorImpl remoteDeviceConnection =
- new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+ new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, TIMEOUT);
final NetconfReconnectingClientConfiguration defaultClientConfig =
remoteDeviceConnection.getClientConfig(listener, testingNode);
.build();
final RemoteDeviceConnectorImpl remoteDeviceConnection =
- new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+ new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, TIMEOUT);
final ActorRef masterRef = mock(ActorRef.class);
import static org.mockito.Mockito.doReturn;
import akka.actor.ActorRef;
+import akka.util.Timeout;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
private final RemoteDeviceHandler salFacade;
TestingRemoteDeviceConnectorImpl(final NetconfTopologySetup netconfTopologyDeviceSetup,
- final RemoteDeviceId remoteDeviceId,
- final NetconfDeviceCommunicator communicator,
- final RemoteDeviceHandler salFacade) {
- super(netconfTopologyDeviceSetup, remoteDeviceId);
+ final RemoteDeviceId remoteDeviceId,
+ final NetconfDeviceCommunicator communicator,
+ final RemoteDeviceHandler salFacade,
+ final Timeout actorResponseWaitTime) {
+ super(netconfTopologyDeviceSetup, remoteDeviceId, actorResponseWaitTime);
this.communicator = communicator;
this.salFacade = salFacade;
}
@Override
public NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
final ActorRef deviceContextActorRef) {
-
final NetconfConnectorDTO connectorDTO = new NetconfConnectorDTO(communicator, salFacade);
doReturn(Futures.immediateCheckedFuture(null)).when(communicator).initializeRemoteConnection(any(), any());
return connectorDTO;
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY);
+ DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);
masterRef = TestActorRef.create(system, props, "master_read");
// Create slave data broker for testing proxy
final NetconfDOMTransaction proxyDOMTransactions =
- new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef);
+ new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY);
+ DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);
masterRef = TestActorRef.create(system, props, "master_read");
// Create slave data broker for testing proxy
final NetconfDOMTransaction proxyDOMTransactions =
- new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef);
+ new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
description "Limit of concurrent messages that can be send before reply messages are received.
If value <1 is provided, no limit will be enforced";
}
+
+ leaf actor-response-wait-time {
+ config true;
+ type uint16 {
+ range "1..max";
+ }
+ default 5;
+ description "Time that slave actor will wait for response from master.";
+ }
}
grouping netconf-node-connection-status {