<commons.opendaylight.version>1.6.0-SNAPSHOT</commons.opendaylight.version>
<controller.mdsal.version>1.3.0-SNAPSHOT</controller.mdsal.version>
<features.test.version>1.6.0-SNAPSHOT</features.test.version>
- <jersey-servlet.version>1.17</jersey-servlet.version>
<mdsal.version>2.0.0-SNAPSHOT</mdsal.version>
<mdsal.model.version>0.8.0-SNAPSHOT</mdsal.model.version>
<restconf.version>1.3.0-SNAPSHOT</restconf.version>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <version>${typesafe.config.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- <version>1.2.1</version>
- </dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
import com.google.common.annotations.Beta;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
Node getInitialState(@Nonnull final NodeId nodeId, @Nonnull final Node configNode);
@Nonnull
- Node getFailedState(@Nonnull final NodeId nodeId, @Nonnull final Node configNode);
+ Node getFailedState(@Nonnull final NodeId nodeId, @Nullable final Node configNode);
}
import akka.actor.TypedActor.Receiver;
import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
/**
* Node manager that handles communication between node managers and delegates calls to the customizable NodeManagerCallback
*/
@Beta
-public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener {
+public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener, RemoteDeviceHandler<NetconfSessionPreferences> {
}
import akka.actor.ActorSystem;
import akka.actor.TypedActor.Receiver;
import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
/**
* Customizable layer that handles communication with your application.
*/
@Beta
-public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver {
+public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver, RemoteDeviceHandler<NetconfSessionPreferences> {
interface NodeManagerCallbackFactory<M> {
NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem);
*/
void unregisterRoleCandidate();
+ /**
+ *
+ * @return true if this candidate is already registered with the ownership service, false otherwise
+ */
+ boolean isCandidateRegistered();
+
}
* Customizable extension layer between the top level TopologyManager and NodeManager
*/
@Beta
-public interface TopologyManagerCallback extends NodeListener, Receiver, RoleChangeListener {
+public interface TopologyManagerCallback extends InitialStateProvider, NodeListener, Receiver, RoleChangeListener {
interface TopologyManagerCallbackFactory {
TopologyManagerCallback create(ActorSystem actorSystem, String topologyId);
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class ExampleNodeManagerCallback implements NodeManagerCallback {
public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
}
+
+ @Override
+ public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onNotification(DOMNotification domNotification) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
}
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.netconf.topology.NodeManager;
import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
public void onReceive(Object o, ActorRef actorRef) {
}
+
+ @Nonnull
+ @Override
+ public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ return nodes.get(nodeId).getInitialState(nodeId, configNode);
+ }
+
+ @Nonnull
+ @Override
+ public Node getFailedState(@Nonnull NodeId nodeId, @Nullable Node configNode) {
+ return nodes.get(nodeId).getFailedState(nodeId, configNode);
+ }
}
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NodeManager;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
private final String topologyId;
private final ActorSystem actorSystem;
- private boolean isMaster;
+ private boolean isMaster = false;
private NodeManagerCallback delegate;
private BaseNodeManager(final String nodeId,
@Override
public void onReceive(Object o, ActorRef actorRef) {
-
+ delegate.onReceive(o, actorRef);
}
@Override
return null;
}
+ @Override
+ public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+ delegate.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc);
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+ delegate.onDeviceDisconnected();
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+ delegate.onDeviceFailed(throwable);
+ }
+
+ @Override
+ public void onNotification(DOMNotification domNotification) {
+ delegate.onNotification(domNotification);
+ }
+
+ @Override
+ public void close() {
+ // NOOP
+ }
+
/**
* Builder of BaseNodeManager instances that are proxied as TypedActors
*/
package org.opendaylight.netconf.topology.util;
+import akka.actor.ActorContext;
+import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
+import akka.actor.Identify;
import akka.actor.TypedActor;
import akka.actor.TypedActorExtension;
import akka.actor.TypedProps;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.netconf.topology.RoleChangeStrategy;
import org.opendaylight.netconf.topology.StateAggregator;
import org.opendaylight.netconf.topology.TopologyManager;
import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise.DefaultPromise;
public final class BaseTopologyManager
implements TopologyManager {
private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
+ private static final InstanceIdentifier<NetworkTopology> NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build();
+
+ private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
private final ActorSystem system;
private final TypedActorExtension typedExtension;
private final NodeWriter naSalNodeWriter;
private final String topologyId;
private final TopologyManagerCallback delegateTopologyHandler;
+ private final Set<NodeId> created = new HashSet<>();
- private final Map<NodeId, NodeManager> nodes = new HashMap<>();
private final Map<Address, TopologyManager> peers = new HashMap<>();
private TopologyManager masterPeer = null;
private final int id = new Random().nextInt();
// election has not yet happened
this.isMaster = isMaster;
+ this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+
LOG.debug("Base manager started ", +id);
}
public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
+ if (created.contains(nodeId)) {
+ LOG.warn("Node{} already exists, triggering update..", nodeId);
+ return onNodeUpdated(nodeId, node);
+ }
+ created.add(nodeId);
final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
if (isMaster) {
public void onFailure(final Throwable t) {
// If the combined connection attempt failed, set the node to connection failed
LOG.debug("Futures aggregation failed");
- naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
+ naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
// FIXME disconnect those which succeeded
// just issue a delete on delegateTopologyHandler that gets handled on lower level
}
public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
- final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
-
// Master needs to trigger onNodeUpdated on peers and combine results
if (isMaster) {
- futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
- for (TopologyManager topologyManager : peers.values()) {
- // convert binding into NormalizedNode for transfer
- final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
-
- // add a future into our futures that gets its completion status from the converted scala future
- final SettableFuture<Node> settableFuture = SettableFuture.create();
- futures.add(settableFuture);
- final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
- scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
- @Override
- public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
- if (failure != null) {
- settableFuture.setException(failure);
- return;
+ // first cleanup old node
+ final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
+ final SettableFuture<Node> createFuture = SettableFuture.create();
+ final TopologyManager selfProxy = TypedActor.self();
+ final ActorContext context = TypedActor.context();
+ Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ LOG.warn("Delete part of update succesfull, triggering create");
+ // trigger create on all nodes
+ Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ createFuture.set(result);
}
- final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
- codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
- final Node value = (Node) fromNormalizedNode.getValue();
-
- settableFuture.set(value);
- }
- }, TypedActor.context().dispatcher());
- }
- final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
- Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
- @Override
- public void onSuccess(final Node result) {
- // FIXME make this (writing state data for nodes) optional and customizable
- // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
- naSalNodeWriter.update(nodeId, result);
+ @Override
+ public void onFailure(Throwable t) {
+ createFuture.setException(t);
+ }
+ }, context.dispatcher());
}
@Override
- public void onFailure(final Throwable t) {
- // If the combined connection attempt failed, set the node to connection failed
- naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
- // FIXME disconnect those which succeeded
- // just issue a delete on delegateTopologyHandler that gets handled on lower level
+ public void onFailure(Throwable t) {
+ LOG.warn("Delete part of update failed, {}", t);
}
- });
-
- //combine peer futures
- return aggregatedFuture;
+ }, context.dispatcher());
+ return createFuture;
}
// Trigger update on this slave
@Override
public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+ created.remove(nodeId);
// Master needs to trigger delete on peers and combine results
if (isMaster) {
public void onFailure(final Throwable t) {
// If the combined connection attempt failed, set the node to connection failed
LOG.debug("Futures aggregation failed");
- naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, null));
+ naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
// FIXME disconnect those which succeeded
// just issue a delete on delegateTopologyHandler that gets handled on lower level
}
final String path = member.address() + PATH + topologyId;
LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
- clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
+ // Send a plain Identify message first, in case our custom messages have not yet been loaded through OSGi; this prevents crashing Akka.
+ clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
} else if (message instanceof MemberExited) {
// remove peer
final Member member = ((MemberExited) message).member();
final String path = member.address() + PATH + topologyId;
LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
+ clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
+ } else if (message instanceof ActorIdentity) {
+ LOG.debug("Received ActorIdentity message", message);
+ final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
+ if (((ActorIdentity) message).getRef() == null) {
+ LOG.debug("ActorIdentity has null actor ref, retrying..", message);
+ final ActorRef self = TypedActor.context().self();
+ final ActorContext context = TypedActor.context();
+ system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
+ context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
+
+ }
+ }, system.dispatcher());
+ return;
+ }
+ LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
+
clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
} else if (message instanceof CustomIdentifyMessageReply) {
- LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
+
+ LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
+ if (isMaster) {
+ resyncPeer(peer);
+ }
}
} else if (message instanceof CustomIdentifyMessage) {
- LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
+ LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
+ if (isMaster) {
+ resyncPeer(peer);
+ }
}
actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
}
}
+
+ private void resyncPeer(final TopologyManager peer) {
+ final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
+ final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
+
+ Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
+ @Override
+ public void onSuccess(Optional<Topology> result) {
+ if (result.isPresent()) {
+ for (final Node node : result.get().getNode()) {
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
+ peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+ // We don't care about the returned future from now on, since we will be notified by the onConnected event.
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Unable to read from datastore");
+ }
+ });
+
+ }
}
private final EntityOwnershipService entityOwnershipService;
private final String entityType;
private final String entityName;
+ private final Entity entity;
private NodeListener ownershipCandidate;
private EntityOwnershipCandidateRegistration candidateRegistration = null;
this.entityOwnershipService = entityOwnershipService;
this.entityType = entityType + "/" + entityName;
this.entityName = entityName;
+ this.entity = new Entity(this.entityType, entityName);
}
@Override
if (candidateRegistration != null) {
unregisterRoleCandidate();
}
- candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+ candidateRegistration = entityOwnershipService.registerCandidate(entity);
ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
} catch (CandidateAlreadyRegisteredException e) {
LOG.error("Candidate already registered for election", e);
@Override
public void unregisterRoleCandidate() {
LOG.debug("Unregistering role candidate");
- candidateRegistration.close();
- candidateRegistration = null;
- ownershipListenerRegistration.close();
- ownershipListenerRegistration = null;
+ if (candidateRegistration != null) {
+ candidateRegistration.close();
+ candidateRegistration = null;
+ }
+ if (ownershipListenerRegistration != null) {
+ ownershipListenerRegistration.close();
+ ownershipListenerRegistration = null;
+ }
+ }
+
+ @Override
+ public boolean isCandidateRegistered() {
+ return entityOwnershipService.isCandidateRegistered(entity);
}
@Override
}
+ @Override
+ public boolean isCandidateRegistered() {
+ return false;
+ }
+
@Override
public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
private NodeListener ownershipCandidate;
private final String entityType;
// use topologyId as entityName
- private final String entityName;
+ private final Entity entity;
private EntityOwnershipCandidateRegistration candidateRegistration = null;
private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
this.dataBroker = dataBroker;
this.entityOwnershipService = entityOwnershipService;
this.entityType = entityType;
- this.entityName = entityName;
+ this.entity = new Entity(entityType, entityName);
datastoreListenerRegistration = null;
}
if (candidateRegistration != null) {
unregisterRoleCandidate();
}
- candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+ candidateRegistration = entityOwnershipService.registerCandidate(entity);
ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
} catch (CandidateAlreadyRegisteredException e) {
LOG.error("Candidate already registered for election", e);
ownershipListenerRegistration = null;
}
+ @Override
+ public boolean isCandidateRegistered() {
+ return entityOwnershipService.isCandidateRegistered(entity);
+ }
+
@Override
public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
if (roleChangeDTO.isOwner()) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util.messages;
+
+import java.io.Serializable;
+
+/**
+ * Empty serializable marker message. Presumably sent between cluster actors to
+ * announce that the master's mount point is available — TODO(review): confirm
+ * against the actual sender/receiver, which are not visible in this change.
+ */
+public class AnnounceMasterMountPoint implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public AnnounceMasterMountPoint() {}
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util.messages;
+
+import java.io.Serializable;
+
+/**
+ * Empty serializable marker message. Presumably the counterpart of
+ * {@link AnnounceMasterMountPoint}, announcing that the master's mount point
+ * is no longer available — TODO(review): confirm against the actual
+ * sender/receiver, which are not visible in this change.
+ */
+public class AnnounceMasterMountPointDown implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public AnnounceMasterMountPointDown() {
+
+    }
+}
import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
+import java.net.SocketAddress;
import java.util.Set;
import org.opendaylight.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
}
return new NetconfServerSessionNegotiator(proposal, promise, channel, timer,
- getListener(Long.toString(sessionId)), connectionTimeoutMillis);
+ getListener(Long.toString(sessionId), channel.localAddress()), connectionTimeoutMillis);
}
- private NetconfServerSessionListener getListener(final String netconfSessionIdForReporting) {
- final NetconfOperationService service =
- this.aggregatedOpService.createService(netconfSessionIdForReporting);
+ private NetconfServerSessionListener getListener(final String netconfSessionIdForReporting, final SocketAddress socketAddress) {
+ final NetconfOperationService service = getOperationServiceForAddress(netconfSessionIdForReporting, socketAddress);
final NetconfOperationRouter operationRouter =
new NetconfOperationRouterImpl(service, monitoringService, netconfSessionIdForReporting);
return new NetconfServerSessionListener(operationRouter, monitoringService, service);
}
+ protected NetconfOperationService getOperationServiceForAddress(final String netconfSessionIdForReporting, final SocketAddress socketAddress) {
+ return this.aggregatedOpService.createService(netconfSessionIdForReporting);
+ }
+
+ protected final NetconfOperationServiceFactory getOperationServiceFactory() {
+ return aggregatedOpService;
+ }
+
private NetconfHelloMessage createHelloMessage(final long sessionId, final NetconfMonitoringService capabilityProvider) throws NetconfDocumentedException {
return NetconfHelloMessage.createServerHello(Sets.union(transformCapabilities(capabilityProvider.getCapabilities()), baseCapabilities), sessionId);
}
import org.openexi.proc.common.EXIOptionsException;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class EXIParameters {
private static final String EXI_PARAMETER_ALIGNMENT = "alignment";
private static final String EXI_FIDELITY_PREFIXES = "prefixes";
private final EXIOptions options;
+ private static final Logger LOG = LoggerFactory.getLogger(EXIParameters.class);
private EXIParameters(final EXIOptions options) {
this.options = Preconditions.checkNotNull(options);
public static EXIParameters fromXmlElement(final XmlElement root) throws EXIOptionsException {
final EXIOptions options = new EXIOptions();
-
- options.setAlignmentType(AlignmentType.bitPacked);
-
final NodeList alignmentElements = root.getElementsByTagName(EXI_PARAMETER_ALIGNMENT);
if (alignmentElements.getLength() > 0) {
final Element alignmentElement = (Element) alignmentElements.item(0);
final String alignmentTextContent = alignmentElement.getTextContent().trim();
switch (alignmentTextContent) {
- case EXI_PARAMETER_BIT_PACKED:
- options.setAlignmentType(AlignmentType.bitPacked);
- break;
case EXI_PARAMETER_BYTE_ALIGNED:
options.setAlignmentType(AlignmentType.byteAligned);
break;
case EXI_PARAMETER_PRE_COMPRESSION:
options.setAlignmentType(AlignmentType.preCompress);
break;
+ default:
+ LOG.warn("Unexpected value in alignmentTextContent: {} , using default value", alignmentTextContent);
+ case EXI_PARAMETER_BIT_PACKED:
+ options.setAlignmentType(AlignmentType.bitPacked);
+ break;
}
+ } else {
+ options.setAlignmentType(AlignmentType.bitPacked);
}
if (root.getElementsByTagName(EXI_PARAMETER_FIDELITY).getLength() > 0) {
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <version>${typesafe.config.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<version>1.6.5</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-model-api</artifactId>
+ </dependency>
</dependencies>
<build>
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
- private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
- private static final int DEFAULT_KEEPALIVE_DELAY = 0;
- private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
- private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
- private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
- private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
+ protected static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
+ protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
+ protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+ protected static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
+ protected static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
+ protected static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
//keep track of already initialized repositories to avoid adding redundant listeners
protected final String topologyId;
private final NetconfClientDispatcher clientDispatcher;
protected final BindingAwareBroker bindingAwareBroker;
- private final Broker domBroker;
+ protected final Broker domBroker;
private final EventExecutor eventExecutor;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
- private final SharedSchemaRepository sharedSchemaRepository;
+ protected final ScheduledThreadPool keepaliveExecutor;
+ protected final ThreadPool processingExecutor;
+ protected final SharedSchemaRepository sharedSchemaRepository;
- private SchemaSourceRegistry schemaRegistry = null;
- private SchemaContextFactory schemaContextFactory = null;
+ protected SchemaSourceRegistry schemaRegistry = null;
+ protected SchemaContextFactory schemaContextFactory = null;
protected DOMMountPointService mountPointService = null;
protected DataBroker dataBroker = null;
return Futures.immediateFuture(null);
}
- private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
+ protected ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
final Node configNode) {
final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
- final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
+ final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
+ final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(netconfClientSessionListener, netconfNode);
final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
+
activeConnectors.put(nodeId, deviceCommunicatorDTO);
Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
return future;
}
- private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+ protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
final NetconfNode node) {
//setup default values since default value is not supported yet in mdsal
// TODO remove this when mdsal starts supporting default values
return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
}
- public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
+ public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
//setup default values since default value is not supported yet in mdsal
// TODO remove this when mdsal starts supporting default values
}
}
- protected static final class NetconfConnectorDTO {
+ protected static class NetconfConnectorDTO {
private final NetconfDeviceCommunicator communicator;
private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
- private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
+ public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
this.communicator = communicator;
this.facade = facade;
}
public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
return facade;
}
+
+ public NetconfClientSessionListener getSessionListener() {
+ return communicator;
+ }
}
}
package org.opendaylight.netconf.topology;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
ListenableFuture<Void> disconnectNode(NodeId nodeId);
- void registerMountPoint(NodeId nodeId);
+ /**
+ * Registers the master mount point for the given node.
+ * @param context actor context of the actor performing the registration
+ * @param nodeId id of the node whose mount point is being registered
+ */
+ void registerMountPoint(ActorContext context, NodeId nodeId);
+
+ /**
+ * Registers a slave mount point that delegates to the master identified by the provided ActorRef.
+ * @param context actor context of the actor performing the registration
+ * @param nodeId id of the node whose mount point is being registered
+ * @param masterRef reference to the master actor owning the authoritative mount point
+ */
+ void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef);
void unregisterMountPoint(NodeId nodeId);
package org.opendaylight.netconf.topology.impl;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.TypedActor;
import akka.actor.TypedActorExtension;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.netty.util.concurrent.EventExecutor;
+import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import javassist.ClassPool;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.AbstractNetconfTopology;
import org.opendaylight.netconf.topology.NetconfTopology;
import org.opendaylight.netconf.topology.TopologyManagerCallback;
import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
import org.opendaylight.netconf.topology.util.BaseTopologyManager;
import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
import org.opendaylight.netconf.topology.util.NodeWriter;
import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
LOG.warn("Clustered netconf topo started");
}
+
+
@Override
public void onSessionInitiated(final ProviderContext session) {
dataBroker = session.getSALService(DataBroker.class);
public void close() throws Exception {
// close all existing connectors, delete whole topology in datastore?
for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
- connectorDTO.getCommunicator().disconnect();
+ connectorDTO.getCommunicator().close();
}
activeConnectors.clear();
}
+ @Override
+ protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+ final NetconfNode node) {
+ //setup default values since default value is not supported yet in mdsal
+ // TODO remove this when mdsal starts supporting default values
+ final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+ final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+ final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
+ IpAddress ipAddress = node.getHost().getIpAddress();
+ InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
+ ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
+ node.getPort().getValue());
+ RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
+
+ RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
+ createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+
+ if (keepaliveDelay > 0) {
+ LOG.warn("Adding keepalive facade, for device {}", nodeId);
+ salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+ }
+
+ NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
+ new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+
+ NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
+ processingExecutor.getExecutor(), sharedSchemaRepository, actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
+
+ return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
+ }
+
@Override
protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
- return new TopologyMountPointFacade(id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
+ return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
}
@Override
- public void registerMountPoint(NodeId nodeId) {
- ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint();
+ public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
+ ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
}
@Override
- public void unregisterMountPoint(NodeId nodeId) {
+ public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
+ ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
+ }
+
+ @Override
+ public void unregisterMountPoint(final NodeId nodeId) {
Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
}
return Collections.emptySet();
}
+ public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
+ Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
+ return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
+ }
+
static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
private final NetconfTopology netconfTopology;
package org.opendaylight.netconf.topology.impl;
+import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.TypedActor;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.NodeManager;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.netconf.topology.RoleChangeStrategy;
import org.opendaylight.netconf.topology.TopologyManager;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.util.BaseNodeManager;
import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
+public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
private String nodeId;
private String topologyId;
private TopologyManager topologyManager;
+ private NodeManager nodeManager;
+ // cached context so that we can use it in callbacks from topology
+ private ActorContext cachedContext;
private Node currentConfig;
private Node currentOperationalNode;
- private ConnectionStatusListenerRegistration registration = null;
+ private ConnectionStatusListenerRegistration connectionStatusregistration = null;
+ private NetconfClientSessionListenerRegistration sessionListener = null;
+
+ private ActorRef masterDataBrokerRef = null;
+ private boolean connected = false;
public NetconfNodeManagerCallback(final String nodeId,
final String topologyId,
topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
}
}, actorSystem.dispatcher());
+
+ final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
+ nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
+ }
+ LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
+ nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
+ }
+ }, actorSystem.dispatcher());
}
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.setConnectionStatus(ConnectionStatus.Connecting)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
}
@Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
- @Nonnull final Node configNode) {
- final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+ @Nullable final Node configNode) {
+ final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
- return new NodeBuilder()
+ final Node failedNode = new NodeBuilder()
.setNodeId(nodeId)
.addAugmentation(NetconfNode.class,
new NetconfNodeBuilder()
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.build())
.build())
.build();
+
+ if (currentOperationalNode == null) {
+ currentOperationalNode = failedNode;
+ }
+
+ return failedNode;
}
@Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
@Nonnull final Node configNode) {
+ cachedContext = TypedActor.context();
this.nodeId = nodeId.getValue();
this.currentConfig = configNode;
// set initial state before anything happens
Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
- registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+ connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
+ sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
}
@Override
.build())
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
- .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
- .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build()).build();
return currentOperationalNode;
}
@Nonnull final Node configNode) {
// first disconnect this node
topologyDispatcher.unregisterMountPoint(nodeId);
- registration.close();
+
+ if (connectionStatusregistration != null) {
+ connectionStatusregistration.close();
+ }
topologyDispatcher.disconnectNode(nodeId);
// now reinit this connection with new settings
Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
- registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+ connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
}
@Override
.build())
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
- .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
- .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build())
- .build();
+ .build();
}
});
}
@Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
// cleanup and disconnect
topologyDispatcher.unregisterMountPoint(nodeId);
- registration.close();
+
+ if(connectionStatusregistration != null) {
+ connectionStatusregistration.close();
+ }
roleChangeStrategy.unregisterRoleCandidate();
return topologyDispatcher.disconnectNode(nodeId);
}
@Override
public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
- if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
- return;
- }
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+
isMaster = roleChangeDTO.isOwner();
- //TODO instead of registering mount point, init remote schema repo when its done
- if (isMaster) {
- // unregister old mountPoint if ownership changed, register a new one
- topologyDispatcher.registerMountPoint(new NodeId(nodeId));
- } else {
- topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
- }
}
@Override
public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
- LOG.debug("onDeviceConnected received, registering role candidate");
- roleChangeStrategy.registerRoleCandidate(this);
+ connected = true;
+ if (isMaster) {
+ LOG.debug("Master is done with schema resolution, registering mount point");
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
+ } else if (masterDataBrokerRef != null) {
+ LOG.warn("Device connected, master already present in topology, registering mount point");
+ topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
+ }
+
List<String> capabilityList = new ArrayList<>();
capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
.setUnavailableCapabilities(unavailableCapabilities)
.build())
.build();
- // TODO need to implement forwarding of this msg to master
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
@Override
public void onDeviceDisconnected() {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
- // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
- LOG.debug("onDeviceDisconnected received, unregistering role candidate");
- topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
- roleChangeStrategy.unregisterRoleCandidate();
+ LOG.debug("onDeviceDisconnected received, unregistered role candidate");
+ connected = false;
+ if (isMaster) {
+ // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
+ isMaster = false;
+ // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ }
+
final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
.addAugmentation(NetconfNode.class,
new NetconfNodeBuilder()
.setConnectionStatus(ConnectionStatus.Connecting)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.build()).build();
- // TODO need to implement forwarding of this msg to master
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
public void onDeviceFailed(Throwable throwable) {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
// no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
- LOG.debug("onDeviceFailed received");
+ LOG.warn("Netconf node {} failed with {}", nodeId, throwable);
+ connected = false;
String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
- roleChangeStrategy.unregisterRoleCandidate();
currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
.addAugmentation(NetconfNode.class,
new NetconfNodeBuilder()
.setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
-
@Override
public void onNotification(DOMNotification domNotification) {
//NOOP
}
@Override
- public void onReceive(Object o, ActorRef actorRef) {
+ public void onReceive(Object message, ActorRef actorRef) {
+ LOG.debug("Netconf node callback received message {}", message);
+ if (message instanceof AnnounceMasterMountPoint) {
+ masterDataBrokerRef = actorRef;
+ // candidate gets registered when mount point is already prepared so we can go ahead and register it
+ if (connected) {
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+ } else {
+ LOG.debug("Announce master mount point msg received but mount point is not ready yet");
+ }
+ } else if (message instanceof AnnounceMasterMountPointDown) {
+ LOG.debug("Master mountpoint went down");
+ masterDataBrokerRef = null;
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ }
+ }
+ @Override
+ public void onSessionUp(NetconfClientSession netconfClientSession) {
+ //NetconfClientSession is up, we can register role candidate
+ LOG.debug("Netconf client session is up, registering role candidate");
+ roleChangeStrategy.registerRoleCandidate(nodeManager);
+ }
+
+ @Override
+ public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
+ LOG.debug("Netconf client session is down, unregistering role candidate");
+ roleChangeStrategy.unregisterRoleCandidate();
+ }
+
+ @Override
+ public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
+ LOG.debug("Netconf client session is down, unregistering role candidate");
+ roleChangeStrategy.unregisterRoleCandidate();
+ }
+
+ @Override
+ public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {
+ //NOOP
}
}
\ No newline at end of file
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
public void onSuccess(final List<Node> result) {
Node base = null;
NetconfNode baseAugmentation = null;
+ AvailableCapabilities masterCaps = null;
+ UnavailableCapabilities unavailableMasterCaps = null;
final ArrayList<NodeStatus> statusList = new ArrayList<>();
for (final Node node : result) {
final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
base = node;
baseAugmentation = netconfNode;
}
+ // we need to pull out caps from master, since slave does not go through resolution
+ if (masterCaps == null) {
+ masterCaps = netconfNode.getAvailableCapabilities();
+ unavailableMasterCaps = netconfNode.getUnavailableCapabilities();
+ }
+ if (netconfNode.getAvailableCapabilities().getAvailableCapability().size() > masterCaps.getAvailableCapability().size()) {
+ masterCaps = netconfNode.getAvailableCapabilities();
+ unavailableMasterCaps = netconfNode.getUnavailableCapabilities();
+ }
LOG.debug(netconfNode.toString());
statusList.addAll(netconfNode.getClusteredConnectionStatus().getNodeStatus());
}
if (base == null) {
base = result.get(0);
baseAugmentation = result.get(0).getAugmentation(NetconfNode.class);
- LOG.warn("All results {}", result.toString());
+ LOG.debug("All results {}", result.toString());
}
- LOG.warn("Base node: {}", base);
-
final Node aggregatedNode =
new NodeBuilder(base)
.addAugmentation(NetconfNode.class,
new ClusteredConnectionStatusBuilder()
.setNodeStatus(statusList)
.build())
+ .setAvailableCapabilities(masterCaps)
+ .setUnavailableCapabilities(unavailableMasterCaps)
.build())
.build();
+
future.set(aggregatedNode);
}
public void onSuccess(final List<Node> result) {
Node base = null;
NetconfNode baseAugmentation = null;
+ AvailableCapabilities masterCaps = null;
+ UnavailableCapabilities unavailableMasterCaps = null;
final ArrayList<NodeStatus> statusList = new ArrayList<>();
for (final Node node : result) {
final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
base = node;
baseAugmentation = netconfNode;
}
+ // we need to pull out caps from master, since slave does not go through resolution
+ if (masterCaps == null) {
+ masterCaps = netconfNode.getAvailableCapabilities();
+ unavailableMasterCaps = netconfNode.getUnavailableCapabilities();
+ }
+ if (netconfNode.getAvailableCapabilities().getAvailableCapability().size() > masterCaps.getAvailableCapability().size()) {
+ masterCaps = netconfNode.getAvailableCapabilities();
+ unavailableMasterCaps = netconfNode.getUnavailableCapabilities();
+ }
LOG.debug(netconfNode.toString());
statusList.addAll(netconfNode.getClusteredConnectionStatus().getNodeStatus());
}
if (base == null) {
base = result.get(0);
baseAugmentation = result.get(0).getAugmentation(NetconfNode.class);
- LOG.warn("All results {}", result.toString());
+ LOG.debug("All results {}", result.toString());
}
final Node aggregatedNode =
new ClusteredConnectionStatusBuilder()
.setNodeStatus(statusList)
.build())
+ .setAvailableCapabilities(masterCaps)
+ .setUnavailableCapabilities(unavailableMasterCaps)
.build())
.build();
future.set(aggregatedNode);
package org.opendaylight.netconf.topology.impl;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import io.netty.util.concurrent.EventExecutor;
import java.util.Collection;
import javax.annotation.Nonnull;
}
@Override
- public void registerMountPoint(NodeId nodeId) {
+ public void registerMountPoint(ActorContext context, NodeId nodeId) {
+ throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
+ }
+
+ @Override
+ public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
}
createNodeManager(nodeId);
nodes.put(nodeId, naBaseNodeManager);
- // put initial state into datastore
- naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+ // only master should put initial state into datastore
+ if (isMaster) {
+ naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+ }
// trigger connect on this node
return naBaseNodeManager.onNodeCreated(nodeId, node);
@Override
public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
- // put initial state into datastore
- naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+ // only master should put initial state into datastore
+ if (isMaster) {
+ naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+ }
// Trigger onNodeUpdated only on this node
return nodes.get(nodeId).onNodeUpdated(nodeId, node);
@Nonnull
@Override
public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ if (!nodes.containsKey(nodeId)) {
+ nodes.put(nodeId, createNodeManager(nodeId));
+ }
return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
}
public void onReceive(Object o, ActorRef actorRef) {
}
+
+ @Nonnull
+ @Override
+ public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ return nodes.get(nodeId).getInitialState(nodeId, configNode);
+ }
+
+ @Nonnull
+ @Override
+ public Node getFailedState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ return nodes.get(nodeId).getFailedState(nodeId, configNode);
+ }
}
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
private final InstanceIdentifier<NetworkTopology> networkTopologyPath;
private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
+ private final ReentrantLock lock = new ReentrantLock(true);
+
public TopologyNodeWriter(final String topologyId, final DataBroker dataBroker) {
this.topologyId = topologyId;
this.txChain = Preconditions.checkNotNull(dataBroker).createTransactionChain(new TransactionChainListener() {
@Override
    @Override
    public void init(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
-
-        createNetworkTopologyIfNotPresent(writeTx);
-        final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
-
-        LOG.trace("{}: Init device state transaction {} putting if absent operational data started. Putting data on path {}",
-                id.getValue(), writeTx.getIdentifier(), path);
-        writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
-        LOG.trace("{}: Init device state transaction {} putting operational data ended.",
-                id.getValue(), writeTx.getIdentifier());
-
-        commitTransaction(writeTx, "init", id);
+        // Serialize all writes on the shared transaction chain; the fair ReentrantLock
+        // also preserves the order in which init/update/delete requests arrive.
+        lock.lock();
+        try {
+            final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+            createNetworkTopologyIfNotPresent(writeTx);
+            final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
+
+            LOG.trace("{}: Init device state transaction {} putting if absent operational data started. Putting data on path {}",
+                    id.getValue(), writeTx.getIdentifier(), path);
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
+            LOG.trace("{}: Init device state transaction {} putting operational data ended.",
+                    id.getValue(), writeTx.getIdentifier());
+
+            commitTransaction(writeTx, "init", id);
+        } finally {
+            lock.unlock();
+        }
    }
    @Override
    public void update(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
-
-        final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
-        LOG.trace("{}: Update device state transaction {} merging operational data started. Putting data on path {}",
-                id, writeTx.getIdentifier(), operationalDataNode);
-        writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
-        LOG.trace("{}: Update device state transaction {} merging operational data ended.",
-                id, writeTx.getIdentifier());
+        // Guard the transaction chain: concurrent writers would otherwise interleave
+        // newTransaction/submit pairs on the shared chain.
+        lock.lock();
+        try {
+            final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+            final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
+            LOG.trace("{}: Update device state transaction {} merging operational data started. Putting data on path {}",
+                    id, writeTx.getIdentifier(), operationalDataNode);
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
+            LOG.trace("{}: Update device state transaction {} merging operational data ended.",
+                    id, writeTx.getIdentifier());
+
+            commitTransaction(writeTx, "update", id);
+        } finally {
+            lock.unlock();
+        }
-        commitTransaction(writeTx, "update", id);
    }
    @Override
    public void delete(@Nonnull NodeId id) {
-        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
-
-        final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
-
-        LOG.trace(
-                "{}: Close device state transaction {} removing all data started. Path: {}",
-                id, writeTx.getIdentifier(), path);
-        writeTx.delete(LogicalDatastoreType.OPERATIONAL, path);
-        LOG.trace(
-                "{}: Close device state transaction {} removing all data ended.",
-                id, writeTx.getIdentifier());
-
-        commitTransaction(writeTx, "close", id);
+        // Same locking discipline as init/update: one writer at a time on the chain.
+        lock.lock();
+        try {
+            final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+            final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
+
+            LOG.trace(
+                    "{}: Close device state transaction {} removing all data started. Path: {}",
+                    id, writeTx.getIdentifier(), path);
+            writeTx.delete(LogicalDatastoreType.OPERATIONAL, path);
+            LOG.trace(
+                    "{}: Close device state transaction {} removing all data ended.",
+                    id, writeTx.getIdentifier());
+
+            commitTransaction(writeTx, "close", id);
+        } finally {
+            lock.unlock();
+        }
    }
private void commitTransaction(final WriteTransaction transaction, final String txType, final NodeId id) {
throw new IllegalStateException(id.getValue() + " Transaction(" + txType + ") not committed correctly", t);
}
});
-
}
private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.TypedActor;
+import java.util.Set;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Future;
+
+/**
+ * Typed actor contract for the slave-side resolver that discovers the master's
+ * YANG source provider in the cluster and exposes the identifiers of the YANG
+ * sources the master provides.
+ */
+public interface ClusteredDeviceSourcesResolver extends TypedActor.Receiver, TypedActor.PreStart {
+
+    /**
+     * @return future completed with the source identifiers resolved from the master,
+     *         or failed when the master source provider runs on this same node
+     */
+    Future<Set<SourceIdentifier>> getResolvedSources();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import java.util.List;
+import java.util.Set;
+import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+/**
+ * Slave-side resolver: on start it announces itself to the master source provider
+ * actor on every other cluster member; when the master answers, it registers the
+ * master's YANG sources into the local schema registry through a
+ * {@link RemoteSchemaProvider} and completes {@link #getResolvedSources()}.
+ */
+public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver {
+
+    // final + correct class: was a non-final logger bound to the interface class.
+    private static final Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolverImpl.class);
+
+    private final String topologyId;
+    private final String nodeId;
+    private final ActorSystem actorSystem;
+    private final SchemaSourceRegistry schemaRegistry;
+    private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations;
+
+    private final Promise<Set<SourceIdentifier>> resolvedSourcesPromise;
+    private MasterSourceProvider remoteYangTextSourceProvider;
+
+    public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem,
+                                              SchemaSourceRegistry schemaRegistry,
+                                              List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations) {
+        this.topologyId = topologyId;
+        this.nodeId = nodeId;
+        this.actorSystem = actorSystem;
+        this.schemaRegistry = schemaRegistry;
+        this.sourceRegistrations = sourceRegistrations;
+        resolvedSourcesPromise = Futures.promise();
+    }
+
+    @Override
+    public void preStart(){
+        // Announce this resolver to the master source provider on all remote members;
+        // the local member is skipped (the master-on-same-node case is signalled back
+        // via AnnounceMasterOnSameNodeUp instead).
+        Cluster cluster = Cluster.get(actorSystem);
+        for(Member node : cluster.state().getMembers()) {
+            if(!node.address().equals(cluster.selfAddress())) {
+                final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/masterSourceProvider";
+                actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self());
+            }
+        }
+    }
+
+    @Override
+    public void onReceive(Object o, ActorRef actorRef) {
+        if(o instanceof AnnounceMasterSourceProviderUp) {
+            // Only react to the first master announcement; subsequent ones are ignored.
+            if(remoteYangTextSourceProvider == null) {
+                remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf(
+                        new TypedProps<>(MasterSourceProvider.class,
+                                MasterSourceProviderImpl.class), actorRef);
+                registerProvidedSourcesToSchemaRegistry();
+            }
+        } else if(o instanceof AnnounceMasterOnSameNodeUp) {
+            // Master lives on this node: remote resolution is pointless, fail the promise.
+            resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException());
+        }
+    }
+
+    private void registerProvidedSourcesToSchemaRegistry() {
+        Future<Set<SourceIdentifier>> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources();
+        resolvedSourcesPromise.completeWith(sourcesFuture);
+        final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher());
+
+        sourcesFuture.onComplete(new OnComplete<Set<SourceIdentifier>>() {
+            @Override
+            public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
+                if (throwable != null) {
+                    // Failure is already propagated through resolvedSourcesPromise.completeWith();
+                    // without this guard the loop below would NPE on a null sourceIdentifiers.
+                    LOG.warn("Failed to retrieve provided sources from master", throwable);
+                    return;
+                }
+                for (SourceIdentifier sourceId : sourceIdentifiers) {
+                    sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider,
+                            PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class,
+                                    PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+                }
+            }
+        }, actorSystem.dispatcher());
+    }
+
+    @Override
+    public Future<Set<SourceIdentifier>> getResolvedSources() {
+        return resolvedSourcesPromise.future();
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.dispatch.OnComplete;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cluster-aware {@link NetconfDevice}. On session-up it does NOT run the base
+ * schema resolution; instead it sets up the slave-side source resolution
+ * ({@link #slaveSetupSchema()}). When this node becomes the entity owner
+ * ({@link #ownershipChanged}), the base class session-up path runs and a
+ * {@link MasterSourceProviderImpl} actor is spawned to serve sources to slaves.
+ */
+public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
+
+    // NOTE(review): isMaster is never read or written in this class — confirm it is needed.
+    private boolean isMaster = false;
+    // Communicator and capabilities of the currently-up session; cleared on session down.
+    private NetconfDeviceCommunicator listener;
+    private NetconfSessionPreferences sessionPreferences;
+    private SchemaRepository schemaRepo;
+    private final ActorSystem actorSystem;
+    private final String topologyId;
+    private final String nodeId;
+    // Actor context captured at construction, used to spawn child typed actors.
+    private final ActorContext cachedContext;
+
+    // Exactly one of these is non-null at a time: master spawns a source provider,
+    // slaves spawn a sources resolver (see onRemoteSessionDown).
+    private MasterSourceProvider masterSourceProvider = null;
+    private ClusteredDeviceSourcesResolver resolver = null;
+
+    public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+                                  final ExecutorService globalProcessingExecutor, SchemaRepository schemaRepo, ActorSystem actorSystem, String topologyId, String nodeId,
+                                  ActorContext cachedContext) {
+        super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor);
+        this.schemaRepo = schemaRepo;
+        this.actorSystem = actorSystem;
+        this.topologyId = topologyId;
+        this.nodeId = nodeId;
+        this.cachedContext = cachedContext;
+    }
+
+    // Intentionally does not call super.onRemoteSessionUp(): schema setup is deferred
+    // to the slave path here, or to ownershipChanged() when this node is the owner.
+    // NOTE(review): warn level for a routine session-up event looks unintended.
+    @Override
+    public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
+        LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
+        this.listener = listener;
+        this.sessionPreferences = remoteSessionCapabilities;
+        slaveSetupSchema();
+    }
+
+
+    // Master path: after the base SAL initialization succeeds, publish the resolved
+    // module set through a MasterSourceProvider child actor so slaves can fetch sources.
+    @Override
+    protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
+        super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
+
+        // Translate every module of the built schema context into a SourceIdentifier;
+        // the default revision sentinel maps to "no revision".
+        final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
+        for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
+            sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
+                    Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
+        }
+
+        //TODO extract string constant to util class
+        LOG.debug("Creating master source provider");
+        masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
+                new TypedProps<>(MasterSourceProvider.class,
+                        new Creator<MasterSourceProviderImpl>() {
+                            @Override
+                            public MasterSourceProviderImpl create() throws Exception {
+                                return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
+                            }
+                        }), "masterSourceProvider");
+    }
+
+    @Override
+    public void onRemoteSessionDown() {
+        super.onRemoteSessionDown();
+        listener = null;
+        sessionPreferences = null;
+        if (masterSourceProvider != null) {
+            // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
+            LOG.debug("Stopping master source provider for node {}", nodeId);
+            TypedActor.get(actorSystem).stop(masterSourceProvider);
+            masterSourceProvider = null;
+        } else {
+            LOG.debug("Stopping slave source resolver for node {}", nodeId);
+            TypedActor.get(actorSystem).stop(resolver);
+            resolver = null;
+        }
+    }
+
+    // Slave path: spawn a resolver actor, wait for the master's source identifiers,
+    // build a schema context from them and finish SAL initialization with it.
+    private void slaveSetupSchema() {
+        //TODO extract string constant to util class
+        resolver = TypedActor.get(cachedContext).typedActorOf(
+                new TypedProps<>(ClusteredDeviceSourcesResolver.class,
+                        new Creator<ClusteredDeviceSourcesResolverImpl>() {
+                            @Override
+                            public ClusteredDeviceSourcesResolverImpl create() throws Exception {
+                                return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
+                            }
+                        }), "clusteredDeviceSourcesResolver");
+
+
+        // Callback invoked once the schema context is built from the master's sources.
+        final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
+            @Override
+            public void onSuccess(SchemaContext schemaContext) {
+                LOG.debug("{}: Schema context built successfully.", id);
+
+                // Advertise both module-based and non-module capabilities on the device.
+                final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
+                final Set<QName> providedSourcesQnames = Sets.newHashSet();
+                for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
+                    providedSourcesQnames.add(QName.create(id.getQNameModule(), id.getName()));
+                }
+
+                deviceCap.addNonModuleBasedCapabilities(sessionPreferences.getNonModuleCaps());
+                deviceCap.addCapabilities(providedSourcesQnames);
+
+                // Finish with the BASE class success handler on purpose — the override
+                // above would spawn a master source provider, which a slave must not do.
+                ClusteredNetconfDevice.super.handleSalInitializationSuccess(
+                        schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
+                handleSalInitializationFailure(throwable, listener);
+            }
+        };
+
+        resolver.getResolvedSources().onComplete(
+                new OnComplete<Set<SourceIdentifier>>() {
+                    @Override
+                    public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
+                        if(throwable != null) {
+                            if(throwable instanceof MasterSourceProviderOnSameNodeException) {
+                                //do nothing
+                            } else {
+                                LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
+                                handleSalInitializationFailure(throwable, listener);
+                            }
+                        } else {
+                            LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
+                            Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
+                        }
+                    }
+                }, actorSystem.dispatcher());
+    }
+
+    // Build a device RPC service bound to the given schema and communicator.
+    private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
+        return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
+    }
+
+    // NOTE(review): sessionPreferences/listener are nulled in onRemoteSessionDown();
+    // an ownership change arriving after session down would pass nulls to super here —
+    // confirm ordering guarantees with the ownership service.
+    @Override
+    public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+        LOG.debug("Entity ownership change received {}", ownershipChange);
+        if(ownershipChange.isOwner()) {
+            super.onRemoteSessionUp(sessionPreferences, listener);
+        } else if (ownershipChange.wasOwner()) {
+            slaveSetupSchema();
+        }
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import java.util.ArrayList;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+
+/**
+ * {@link NetconfDeviceCommunicator} that additionally (1) registers the device as an
+ * entity-ownership listener while the session is up and (2) fans session events out
+ * to extra {@link NetconfClientSessionListener}s registered via
+ * {@link #registerNetconfClientSessionListener(NetconfClientSessionListener)}.
+ */
+public class ClusteredNetconfDeviceCommunicator extends NetconfDeviceCommunicator {
+
+    private final EntityOwnershipService ownershipService;
+
+    private final ArrayList<NetconfClientSessionListener> netconfClientSessionListeners = new ArrayList<>();
+    // Non-null only between onSessionUp and the matching down/termination event.
+    private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
+
+    public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService) {
+        super(id, remoteDevice);
+        this.ownershipService = ownershipService;
+    }
+
+    @Override
+    public void onMessage(NetconfClientSession session, NetconfMessage message) {
+        super.onMessage(session, message);
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onMessage(session, message);
+        }
+    }
+
+    @Override
+    public void onSessionDown(NetconfClientSession session, Exception e) {
+        super.onSessionDown(session, e);
+        // Guarded: the session may go down before onSessionUp ever registered us,
+        // and down+terminated may both fire — the old unconditional close() NPE'd.
+        closeOwnershipListenerRegistration();
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onSessionDown(session, e);
+        }
+    }
+
+    @Override
+    public void onSessionUp(NetconfClientSession session) {
+        super.onSessionUp(session);
+        // Track ownership of this device's entity for the lifetime of the session.
+        ownershipListenerRegistration = ownershipService.registerListener("netconf-node/" + id.getName(), (ClusteredNetconfDevice) remoteDevice);
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onSessionUp(session);
+        }
+    }
+
+    @Override
+    public void onSessionTerminated(NetconfClientSession session, NetconfTerminationReason reason) {
+        super.onSessionTerminated(session, reason);
+        closeOwnershipListenerRegistration();
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onSessionTerminated(session, reason);
+        }
+    }
+
+    // Close the ownership registration exactly once; safe to call when absent.
+    private void closeOwnershipListenerRegistration() {
+        if (ownershipListenerRegistration != null) {
+            ownershipListenerRegistration.close();
+            ownershipListenerRegistration = null;
+        }
+    }
+
+    /**
+     * Adds a listener that mirrors all session callbacks of this communicator.
+     *
+     * @return registration whose {@code close()} removes the listener again
+     */
+    public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(NetconfClientSessionListener listener) {
+        netconfClientSessionListeners.add(listener);
+        return new NetconfClientSessionListenerRegistration(listener);
+    }
+
+    public class NetconfClientSessionListenerRegistration {
+
+        private final NetconfClientSessionListener listener;
+
+        public NetconfClientSessionListenerRegistration(NetconfClientSessionListener listener) {
+            this.listener = listener;
+        }
+
+        public void close() {
+            netconfClientSessionListeners.remove(listener);
+        }
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.TypedActor;
+import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
+
+/**
+ * Master-side typed actor that serves YANG text sources to slave resolvers,
+ * combining the remote source provider contract with actor lifecycle
+ * (PreStart) and message handling (Receiver) hooks.
+ */
+public interface MasterSourceProvider
+        extends TypedActor.PreStart, TypedActor.Receiver, RemoteYangTextSourceProvider {
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.TypedActor;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import java.util.Set;
+import org.opendaylight.controller.cluster.schema.provider.impl.RemoteYangTextSourceProviderImpl;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Master-side source provider actor. On start it announces itself to the
+ * clustered source resolvers of all members; a resolver on the same node is
+ * told the master is local and is poisoned, remote ones are told the master
+ * is up so they can fetch sources from it.
+ */
+public class MasterSourceProviderImpl extends RemoteYangTextSourceProviderImpl
+        implements MasterSourceProvider {
+
+    // Was a non-final logger; loggers are constants.
+    private static final Logger LOG = LoggerFactory.getLogger(MasterSourceProviderImpl.class);
+
+    private final ActorSystem actorSystem;
+    private final String topologyId;
+    private final String nodeId;
+
+    public MasterSourceProviderImpl(SchemaRepository schemaRepo, Set<SourceIdentifier> providedSources, ActorSystem actorSystem, String topologyId, String nodeId) {
+        super(schemaRepo, providedSources);
+        this.actorSystem = actorSystem;
+        this.topologyId = topologyId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void onReceive(Object o, ActorRef actorRef) {
+        // A resolver that came up after us announces itself; answer directly.
+        if(o instanceof AnnounceClusteredDeviceSourcesResolverUp) {
+            LOG.debug("Received source resolver up");
+            actorRef.tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self());
+        }
+    }
+
+    @Override
+    public void preStart() {
+        Cluster cluster = Cluster.get(actorSystem);
+        cluster.join(cluster.selfAddress());
+        LOG.debug("Notifying members master schema source provider is up.");
+        for(Member node : cluster.state().getMembers()) {
+            final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/clusteredDeviceSourcesResolver";
+            if(node.address().equals(cluster.selfAddress())) {
+                // Local resolver is redundant when the master is here: tell it, then kill it.
+                actorSystem.actorSelection(path).tell(new AnnounceMasterOnSameNodeUp(), TypedActor.context().self());
+                actorSystem.actorSelection(path).tell(PoisonPill.getInstance(), TypedActor.context().self());
+            } else {
+                //TODO extract string constant to util class
+                actorSystem.actorSelection(path).tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self());
+            }
+        }
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+/**
+ * Signals that the master source provider runs on the same cluster node as the
+ * resolver, so remote source resolution is unnecessary and is aborted.
+ */
+public class MasterSourceProviderOnSameNodeException extends Exception {
+
+    // Exceptions are Serializable; pin the id to avoid incidental breakage.
+    private static final long serialVersionUID = 1L;
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+public class NetconfDeviceMasterDataBroker implements ProxyNetconfDeviceDataBroker {
+
+ private final RemoteDeviceId id;
+
+ private final NetconfDeviceDataBroker delegateBroker;
+ private final ActorSystem actorSystem;
+
+ private DOMDataReadOnlyTransaction readTx;
+ private DOMDataWriteTransaction writeTx;
+
+ public NetconfDeviceMasterDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
+ final SchemaContext schemaContext, final DOMRpcService rpc,
+ final NetconfSessionPreferences netconfSessionPreferences, final long requestTimeoutMillis) {
+ this.id = id;
+ delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences, requestTimeoutMillis);
+ this.actorSystem = actorSystem;
+
+ // only ever need 1 readTx since it doesnt need to be closed
+ readTx = delegateBroker.newReadOnlyTransaction();
+ }
+
+ @Override
+ public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ return new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self());
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self()),
+ newWriteOnlyTransaction());
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ return new ProxyWriteOnlyTransaction(actorSystem, TypedActor.<NetconfDeviceMasterDataBroker>self());
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) {
+ throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
+ throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+ }
+
+ @Nonnull
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
+
+ final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
+ Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+ if (!result.isPresent()) {
+ promise.success(Optional.<NormalizedNodeMessage>absent());
+ } else {
+ promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ // Same Guava-to-Scala future bridging as read(), for the existence check.
+ final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
+
+ final DefaultPromise<Boolean> promise = new DefaultPromise<>();
+ Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ promise.success(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+ // Lazily open a device write transaction on the first mutation.
+ // NOTE(review): the lazy init is unsynchronized — presumably all calls arrive on
+ // the typed actor's single dispatcher thread; confirm.
+ if (writeTx == null) {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ }
+ writeTx.put(store, data.getIdentifier(), data.getNode());
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+ // Lazily open a device write transaction on the first mutation (see put()).
+ if (writeTx == null) {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ }
+ writeTx.merge(store, data.getIdentifier(), data.getNode());
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ // Lazily open a device write transaction on the first mutation (see put()).
+ if (writeTx == null) {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ }
+ writeTx.delete(store, path);
+ }
+
+ @Override
+ public boolean cancel() {
+ // No put/merge/delete has been issued yet, so there is nothing to cancel; that
+ // trivially succeeds. (Previously writeTx was dereferenced unconditionally and
+ // a cancel() arriving before any write threw NullPointerException.)
+ if (writeTx == null) {
+ return true;
+ }
+ return writeTx.cancel();
+ }
+
+ @Override
+ public Future<Void> submit() {
+ final DefaultPromise<Void> promise = new DefaultPromise<>();
+ // Submitting with no writes issued is a successful no-op. (Previously writeTx
+ // was dereferenced unconditionally and an empty submit() threw NullPointerException.)
+ if (writeTx == null) {
+ promise.success(null);
+ return promise.future();
+ }
+ // Bridge the Guava CheckedFuture into a Scala Future for the remote caller.
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
+ Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ promise.success(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ @Deprecated
+ public Future<RpcResult<TransactionStatus>> commit() {
+ // NOTE(review): writeTx is dereferenced unconditionally here — a commit() arriving
+ // before any put/merge/delete would NPE; confirm callers always write first.
+ final ListenableFuture<RpcResult<TransactionStatus>> commitFuture = writeTx.commit();
+ final DefaultPromise<RpcResult<TransactionStatus>> promise = new DefaultPromise<>();
+ Futures.addCallback(commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+ @Override
+ public void onSuccess(RpcResult<TransactionStatus> result) {
+ promise.success(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorSystem;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Slave-side {@link DOMDataBroker} for a clustered NETCONF mount point. Every
+ * transaction created here is a lightweight proxy that forwards reads and writes to
+ * the master node's {@link ProxyNetconfDeviceDataBroker} typed actor.
+ */
+public class NetconfDeviceSlaveDataBroker implements DOMDataBroker {
+
+    /** Remote device identity; used here for diagnostic messages only. */
+    private final RemoteDeviceId id;
+    /** Typed-actor handle to the master broker that owns the real device session. */
+    private final ProxyNetconfDeviceDataBroker masterDataBroker;
+    /** Supplies the dispatcher the proxy transactions run their callbacks on. */
+    private final ActorSystem actorSystem;
+
+    public NetconfDeviceSlaveDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker masterDataBroker) {
+        this.id = id;
+        this.masterDataBroker = masterDataBroker;
+        this.actorSystem = actorSystem;
+    }
+
+    @Override
+    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        return new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker);
+    }
+
+    @Override
+    public DOMDataReadWriteTransaction newReadWriteTransaction() {
+        // Compose a read-write view out of the two one-way proxy transactions.
+        return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker), new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker));
+    }
+
+    @Override
+    public DOMDataWriteTransaction newWriteOnlyTransaction() {
+        return new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker);
+    }
+
+    @Override
+    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store, final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
+        // NETCONF mount points do not emit data change notifications; fail fast.
+        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        // Transaction chaining is not implemented for NETCONF mount points; fail fast.
+        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+    }
+
+    @Nonnull
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        // This broker offers no optional DOMDataBroker extensions.
+        return Collections.emptyMap();
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.Future;
+
+/**
+ * Contract of the master-side NETCONF data broker as exposed to remote (slave) nodes.
+ * Extends {@link DOMDataBroker} with message-based read/write operations returning
+ * Scala {@link Future}s so results can be transported across the cluster by actors.
+ */
+public interface ProxyNetconfDeviceDataBroker extends DOMDataBroker {
+    /** Reads the node at {@code path}; the result is absent when no data exists there. */
+    Future<Optional<NormalizedNodeMessage>> read(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    /** Checks whether any data exists at {@code path}. */
+    Future<Boolean> exists(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    /** Stores {@code data} at its embedded path, replacing any existing node. */
+    void put(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    /** Merges {@code data} into the node at its embedded path. */
+    void merge(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    /** Deletes the subtree at {@code path}. */
+    void delete(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    /** Cancels the pending write transaction, if any. */
+    boolean cancel();
+
+    /** Submits all accumulated writes to the device. */
+    Future<Void> submit();
+
+    /**
+     * Submits all accumulated writes, reporting a {@link TransactionStatus}.
+     *
+     * @deprecated use {@link #submit()} instead.
+     */
+    @Deprecated
+    Future<RpcResult<TransactionStatus>> commit();
+}
package org.opendaylight.netconf.topology.pipeline;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.japi.Creator;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(TopologyMountPointFacade.class);
+ private static final String MOUNT_POINT = "mountpoint";
+
+ private final String topologyId;
private final RemoteDeviceId id;
private final Broker domBroker;
private final BindingAwareBroker bindingBroker;
private DOMRpcService deviceRpc = null;
private final ClusteredNetconfDeviceMountInstanceProxy salProvider;
+ private ActorSystem actorSystem;
+ private DOMDataBroker deviceDataBroker = null;
+
private final ArrayList<RemoteDeviceHandler<NetconfSessionPreferences>> connectionStatusListeners = new ArrayList<>();
- public TopologyMountPointFacade(final RemoteDeviceId id,
+ public TopologyMountPointFacade(final String topologyId,
+ final RemoteDeviceId id,
final Broker domBroker,
final BindingAwareBroker bindingBroker,
long defaultRequestTimeoutMillis) {
-
+ this.topologyId = topologyId;
this.id = id;
this.domBroker = domBroker;
this.bindingBroker = bindingBroker;
final NetconfSessionPreferences netconfSessionPreferences,
final DOMRpcService deviceRpc) {
// prepare our prerequisites for mountpoint
+ LOG.debug("Mount point facade onConnected capabilities {}", netconfSessionPreferences);
this.remoteSchemaContext = remoteSchemaContext;
this.netconfSessionPreferences = netconfSessionPreferences;
this.deviceRpc = deviceRpc;
salProvider.getMountInstance().publish(domNotification);
}
- public void registerMountPoint() {
+ public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context) {
+ if (remoteSchemaContext == null || netconfSessionPreferences == null) {
+ LOG.debug("Master mount point does not have schemas ready yet, delaying registration");
+ return;
+ }
+
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+ this.actorSystem = actorSystem;
+ final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
+
+ LOG.warn("Creating master data broker for device {}", id);
+ deviceDataBroker = TypedActor.get(context).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, new Creator<NetconfDeviceMasterDataBroker>() {
+ @Override
+ public NetconfDeviceMasterDataBroker create() throws Exception {
+ return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ }
+ }), MOUNT_POINT);
+ LOG.debug("Master data broker registered on path {}", TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker).path());
+ salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+ final Cluster cluster = Cluster.get(actorSystem);
+ final Iterable<Member> members = cluster.state().getMembers();
+ final ActorRef deviceDataBrokerRef = TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker);
+ for (final Member member : members) {
+ if (!member.address().equals(cluster.selfAddress())) {
+ final String path = member.address() + "/user/" + topologyId + "/" + id.getName();
+ actorSystem.actorSelection(path).tell(new AnnounceMasterMountPoint(), deviceDataBrokerRef);
+ }
+ }
+ }
- final DOMDataBroker netconfDeviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context, final ActorRef masterRef) {
+ if (remoteSchemaContext == null || netconfSessionPreferences == null) {
+ LOG.debug("Slave mount point does not have schemas ready yet, delaying registration");
+ return;
+ }
+
+ Preconditions.checkNotNull(id);
+ Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
+ Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+ this.actorSystem = actorSystem;
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
- salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker, deviceRpc, notificationService);
+ final ProxyNetconfDeviceDataBroker masterDataBroker = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, NetconfDeviceMasterDataBroker.class), masterRef);
+ LOG.warn("Creating slave data broker for device {}", id);
+ final DOMDataBroker deviceDataBroker = new NetconfDeviceSlaveDataBroker(actorSystem, id, masterDataBroker);
+ salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
}
public void unregisterMountPoint() {
salProvider.getMountInstance().onTopologyDeviceDisconnected();
+ if (deviceDataBroker != null) {
+ LOG.debug("Stopping master data broker for device {}", id.getName());
+ for (final Member member : Cluster.get(actorSystem).state().getMembers()) {
+ if (member.address().equals(Cluster.get(actorSystem).selfAddress())) {
+ continue;
+ }
+ actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + id.getName()).tell(new AnnounceMasterMountPointDown(), null);
+ }
+ TypedActor.get(actorSystem).stop(deviceDataBroker);
+ deviceDataBroker = null;
+ }
}
public ConnectionStatusListenerRegistration registerConnectionStatusListener(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+/**
+ * Cluster message announcing that a clustered device-sources resolver instance has
+ * started. Carries no payload; its arrival is the signal.
+ */
+public class AnnounceClusteredDeviceSourcesResolverUp implements Serializable {
+    // Private per the Serializable recommendation; serialization reads it reflectively.
+    private static final long serialVersionUID = 1L;
+
+    public AnnounceClusteredDeviceSourcesResolverUp() {}
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+/**
+ * Cluster message announcing that the master is running on this same node.
+ * Carries no payload; its arrival is the signal.
+ */
+public class AnnounceMasterOnSameNodeUp implements Serializable {
+    // Must be static FINAL: Java serialization ignores a non-final serialVersionUID
+    // and computes a default UID instead. Private per the Serializable recommendation.
+    private static final long serialVersionUID = 1L;
+
+    public AnnounceMasterOnSameNodeUp() {
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+/**
+ * Cluster message announcing that the master schema-source provider is up.
+ * Carries no payload; its arrival is the signal.
+ */
+public class AnnounceMasterSourceProviderUp implements Serializable {
+    // Private per the Serializable recommendation; serialization reads it reflectively.
+    private static final long serialVersionUID = 1L;
+
+    public AnnounceMasterSourceProviderUp() {
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+/**
+ * Marker interface for data-broker proxies in the NETCONF topology pipeline.
+ * NOTE(review): declares no members and has no visible implementors in this patch —
+ * presumably used purely as a type tag elsewhere; confirm before removing.
+ */
+public interface NetconfDeviceDataBrokerProxy {
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class ProxyReadOnlyTransaction implements DOMDataReadOnlyTransaction{
+
+ private final RemoteDeviceId id;
+ private final ProxyNetconfDeviceDataBroker delegate;
+ private final ActorSystem actorSystem;
+
+ public ProxyReadOnlyTransaction(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker delegate) {
+ this.id = id;
+ this.delegate = delegate;
+ this.actorSystem = actorSystem;
+ }
+
+ @Override
+ public void close() {
+ //NOOP
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ final Future<Optional<NormalizedNodeMessage>> future = delegate.read(store, path);
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+ @Nullable
+ @Override
+ public ReadFailedException apply(Exception cause) {
+ return new ReadFailedException("Read from transaction failed", cause);
+ }
+ });
+ future.onComplete(new OnComplete<Optional<NormalizedNodeMessage>>() {
+ @Override
+ public void onComplete(Throwable throwable, Optional<NormalizedNodeMessage> normalizedNodeMessage) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(normalizedNodeMessage.transform(new Function<NormalizedNodeMessage, NormalizedNode<?, ?>>() {
+ @Nullable
+ @Override
+ public NormalizedNode<?, ?> apply(NormalizedNodeMessage input) {
+ return input.getNode();
+ }
+ }));
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return checkedFuture;
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ final Future<Boolean> existsFuture = delegate.exists(store, path);
+ final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ final CheckedFuture<Boolean, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+ @Nullable
+ @Override
+ public ReadFailedException apply(Exception cause) {
+ return new ReadFailedException("Read from transaction failed", cause);
+ }
+ });
+ existsFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable throwable, Boolean result) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(result);
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return checkedFuture;
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class ProxyWriteOnlyTransaction implements DOMDataWriteTransaction {
+
+ private final ProxyNetconfDeviceDataBroker delegate;
+ private final ActorSystem actorSystem;
+
+ public ProxyWriteOnlyTransaction(ActorSystem actorSystem, final ProxyNetconfDeviceDataBroker delegate) {
+ this.delegate = delegate;
+ this.actorSystem = actorSystem;
+ }
+
+ @Override
+ public void put (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){
+ delegate.put(store, new NormalizedNodeMessage(path, data));
+ }
+
+ @Override
+ public void merge (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){
+ delegate.merge(store, new NormalizedNodeMessage(path, data));
+ }
+
+ @Override
+ public boolean cancel () {
+ return delegate.cancel();
+ }
+
+ @Override
+ public void delete (final LogicalDatastoreType store, final YangInstanceIdentifier path){
+ delegate.delete(store, path);
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ final Future<Void> submit = delegate.submit();
+ final SettableFuture<Void> settableFuture = SettableFuture.create();
+ final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+ @Nullable
+ @Override
+ public TransactionCommitFailedException apply(Exception input) {
+ return new TransactionCommitFailedException("Transaction commit failed", input);
+ }
+ });
+ submit.onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable throwable, Void aVoid) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(aVoid);
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return checkedFuture;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit () {
+ final Future<RpcResult<TransactionStatus>> commit = delegate.commit();
+ final SettableFuture<RpcResult<TransactionStatus>> settableFuture = SettableFuture.create();
+ commit.onComplete(new OnComplete<RpcResult<TransactionStatus>>() {
+ @Override
+ public void onComplete(Throwable throwable, RpcResult<TransactionStatus> transactionStatusRpcResult) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(transactionStatusRpcResult);
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return settableFuture;
+ }
+
+ @Override
+ public Object getIdentifier () {
+ return this;
+ }
+}
import akka.actor.TypedActorExtension;
import akka.actor.TypedProps;
import akka.japi.Creator;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
import javassist.ClassPool;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
import org.opendaylight.netconf.topology.example.ExampleNodeManagerCallback;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
@Mock
private DataBroker dataBroker;
+ @Mock
+ private ReadOnlyTransaction mockedReadOnlyTx;
+
private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
static {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
+ final SettableFuture<Optional<Topology>> settableFuture = SettableFuture.create();
+ final CheckedFuture<Optional<Topology>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+ @Nullable
+ @Override
+ public ReadFailedException apply(Exception input) {
+ return new ReadFailedException("Dummy future should never return this");
+ }
+ });
+ settableFuture.set(Optional.<Topology>absent());
+ when(mockedReadOnlyTx.read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class))).thenReturn(checkedFuture);
when(dataBroker.registerDataChangeListener(
any(LogicalDatastoreType.class),
any(InstanceIdentifier.class),
any(DataChangeListener.class),
any(DataChangeScope.class))).thenReturn(null);
+ when(dataBroker.newReadOnlyTransaction()).thenReturn(mockedReadOnlyTx);
}
private void setMaster(final TopologyManager manager) {
}
});
}
+ LOG.debug("Waiting for updates to finish");
+ Futures.allAsList(futures).get();
final List<ListenableFuture<Void>> deleteFutures = new ArrayList<>();
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.setConnectionStatus(ConnectionStatus.Connecting)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(augmentation.getHost())
.setPort(augmentation.getPort())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(augmentation.getHost())
.setPort(augmentation.getPort())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
public void onReceive(Object o, ActorRef actorRef) {
}
+
+ @Override
+ public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onNotification(DOMNotification domNotification) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
}
public static class TestingTopologyManagerCallback implements TopologyManagerCallback {
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
.setPort(new PortNumber(2555))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build())
.build());
}
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
.setPort(new PortNumber(65535))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build())
.build());
}
public void onReceive(Object o, ActorRef actorRef) {
}
+
+ // Builds the node state reported before a connection attempt completes:
+ // status Connecting, fixed localhost/65535 endpoint, and empty (but non-null)
+ // capability lists so consumers need not null-check them.
+ @Nonnull
+ @Override
+ public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ return new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connecting)
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(65535))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
+ .build())
+ .build();
+ }
+
+ // Builds the node state reported after a failed connection attempt:
+ // identical to the initial state except the status is UnableToConnect.
+ // Note: configNode is ignored by this test implementation.
+ @Nonnull
+ @Override
+ public Node getFailedState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ return new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(65535))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
+ .build())
+ .build();
+ }
}
public class TestingSuccesfulStateAggregator implements StateAggregator {
package org.opendaylight.netconf.topology;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
}
@Override
- public void registerMountPoint(NodeId nodeId) {
+ // Test implementation: registration only logs; no real mount point is created.
+ public void registerMountPoint(ActorContext context, NodeId nodeId) {
LOG.debug("Registering mount point for node {}", nodeId.getValue());
+ }
+ @Override
+ // Slave-side variant of the registration above. NOTE(review): masterRef is
+ // presumably the master node-manager actor owning the device session — confirm
+ // against the NodeManagerCallback contract; this test stub ignores it and only logs.
+ public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
+ LOG.debug("Registering mount point for node {}", nodeId.getValue());
}
@Override
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-binding-broker-impl</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>mockito-configuration</artifactId>
- <scope>test</scope>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
*/
package org.opendaylight.netconf.sal.connect.netconf;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
-public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+public class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class);
}
};
- private final RemoteDeviceId id;
+ protected final RemoteDeviceId id;
private final boolean reconnectOnSchemasChange;
- private final SchemaContextFactory schemaContextFactory;
+ protected final SchemaContextFactory schemaContextFactory;
private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final ListeningExecutorService processingExecutor;
- private final SchemaSourceRegistry schemaRegistry;
+ protected final SchemaSourceRegistry schemaRegistry;
private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
- private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
+ protected final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
// Message transformer is constructed once the schemas are available
private MessageTransformer<NetconfMessage> messageTransformer;
return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
}
- @VisibleForTesting
- void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
+ protected void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
messageTransformer = new NetconfMessageTransformer(result, true);
updateTransformer(messageTransformer);
LOG.info("{}: Netconf connector initialized successfully", id);
}
- private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ protected void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
LOG.error("{}: Initialization in sal failed, disconnecting from device", id, t);
listener.close();
onRemoteSessionDown();
Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
}
- private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
+ protected NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
}
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
- private final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
+ protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
private final Optional<NetconfSessionPreferences> overrideNetconfCapabilities;
- private final RemoteDeviceId id;
+ protected final RemoteDeviceId id;
private final Lock sessionLock = new ReentrantLock();
// TODO implement concurrent message limit
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
}
private static QName cachedQName(final String namespace, final String revision, final String moduleName) {
- return QName.cachedReference(QName.create(namespace, revision, moduleName));
+ return QName.create(namespace, revision, moduleName).intern();
}
private static QName cachedQName(final String namespace, final String moduleName) {
- return QName.cachedReference(QName.create(URI.create(namespace), null, moduleName).withoutRevision());
+ return QName.create(URI.create(namespace), null, moduleName).withoutRevision().intern();
}
public static NetconfSessionPreferences fromStrings(final Collection<String> capabilities) {
final String namespace = capability.substring(0, qmark);
final Iterable<String> queryParams = AMP_SPLITTER.split(capability.substring(qmark + 1));
final String moduleName = MODULE_PARAM.from(queryParams);
- if (moduleName == null) {
+ if (Strings.isNullOrEmpty(moduleName)) {
continue;
}
String revision = REVISION_PARAM.from(queryParams);
- if (revision != null) {
+ if (!Strings.isNullOrEmpty(revision)) {
addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, revision, moduleName));
continue;
}
LOG.debug("Netconf device was not reporting revision correctly, trying to get amp;revision=");
revision = BROKEN_REVISON_PARAM.from(queryParams);
- if (revision == null) {
+ if (Strings.isNullOrEmpty(revision)) {
LOG.warn("Netconf device returned revision incorrectly escaped for {}, ignoring it", capability);
addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, moduleName));
} else {
private void scheduleKeepalive() {
Preconditions.checkState(currentDeviceRpc != null);
LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
- currentKeepalive = executor.schedule(new Keepalive(), keepaliveDelaySeconds, TimeUnit.SECONDS);
+ currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
}
@Override
*/
private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
+ private final ScheduledFuture<?> previousKeepalive;
+
+ public Keepalive(final ScheduledFuture<?> previousKeepalive) {
+ this.previousKeepalive = previousKeepalive;
+ }
+
@Override
public void run() {
LOG.trace("{}: Invoking keepalive RPC", id);
try {
- Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this);
+ if(previousKeepalive != null && !previousKeepalive.isDone()) {
+ onFailure(new IllegalStateException("Previous keepalive timed out"));
+ } else {
+ Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this);
+ }
} catch (NullPointerException e) {
LOG.debug("{}: Skipping keepalive while reconnecting", id);
// Empty catch block intentional
"namespace:2?module=module2",
"namespace:2?module=module2&revision=2012-12-12",
"namespace:2?module=module1&RANDOMSTRING;revision=2013-12-12",
- "namespace:2?module=module2&RANDOMSTRING;revision=2013-12-12" // This one should be ignored(same as first), since revision is in wrong format
+ // Revision parameter present, but no revision defined
+ "namespace:2?module=module4&RANDOMSTRING;revision=",
+ // This one should be ignored(same as first), since revision is in wrong format
+ "namespace:2?module=module2&RANDOMSTRING;revision=2013-12-12"
);
final NetconfSessionPreferences sessionCaps1 = NetconfSessionPreferences.fromStrings(caps1);
- assertCaps(sessionCaps1, 0, 3);
+ assertCaps(sessionCaps1, 0, 4);
}
private void assertCaps(final NetconfSessionPreferences sessionCaps1, final int nonModuleCaps, final int moduleCaps) {
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import com.google.common.util.concurrent.Futures;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Matchers;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import com.google.common.util.concurrent.Futures;
+
public class KeepaliveSalFacadeTest {
private static final RemoteDeviceId REMOTE_DEVICE_ID = new RemoteDeviceId("test", new InetSocketAddress("localhost", 22));
@Mock
private RemoteDeviceHandler<NetconfSessionPreferences> underlyingSalFacade;
- private static java.util.concurrent.ScheduledExecutorService executorService;
+ private ScheduledExecutorService executorServiceSpy;
@Mock
private NetconfDeviceCommunicator listener;
private DOMRpcService proxyRpc;
+ @Mock
+ private ScheduledFuture currentKeepalive;
+
@Before
public void setUp() throws Exception {
- executorService = Executors.newScheduledThreadPool(1);
+ executorServiceSpy = Executors.newScheduledThreadPool(1);
MockitoAnnotations.initMocks(this);
doReturn("mockedRpc").when(deviceRpc).toString();
doNothing().when(underlyingSalFacade).onDeviceConnected(
any(SchemaContext.class), any(NetconfSessionPreferences.class), any(DOMRpcService.class));
+
+ ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+ executorServiceSpy = Mockito.spy(executorService);
+ doAnswer(new Answer<ScheduledFuture>() {
+ @Override
+ public ScheduledFuture answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ invocationOnMock.callRealMethod();
+ return currentKeepalive;
+ }
+ }).when(executorServiceSpy).schedule(Mockito.<Runnable> any(),
+ Mockito.anyLong(), Matchers.<TimeUnit> any());
+
+ Mockito.when(currentKeepalive.isDone()).thenReturn(true);
}
@After
public void tearDown() throws Exception {
- executorService.shutdownNow();
+ executorServiceSpy.shutdownNow();
}
@Test
doReturn(Futures.immediateCheckedFuture(result)).when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 1L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 1L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
// 1 failed that results in disconnect, 3 total with previous fail
verify(listener, timeout(15000).times(3)).disconnect();
+
+
+ Mockito.when(currentKeepalive.isDone()).thenReturn(false);
+ keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
+ // 1 failed that results in disconnect, 4 total with previous fail
+ verify(listener, timeout(15000).times(4)).disconnect();
}
@Test
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 100L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 100L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
@Arg(dest = "devices-count")
public int deviceCount;
+ @Arg(dest = "devices-per-port")
+ public int devicesPerPort;
+
@Arg(dest = "starting-port")
public int startingPort;
parser.addArgument("--device-count")
.type(Integer.class)
.setDefault(1)
- .type(Integer.class)
- .help("Number of simulated netconf devices to spin")
+ .help("Number of simulated netconf devices to spin. This is the number of actual ports open for the devices.")
.dest("devices-count");
+ parser.addArgument("--devices-per-port")
+ .type(Integer.class)
+ .setDefault(1)
+ .help("Amount of config files generated per port to spoof more devices then are actually running")
+ .dest("devices-per-port");
+
parser.addArgument("--schemas-dir")
.type(File.class)
.help("Directory containing yang schemas to describe simulated devices. Some schemas e.g. netconf monitoring and inet types are included by default")
void validate() {
checkArgument(deviceCount > 0, "Device count has to be > 0");
checkArgument(startingPort > 1023, "Starting port has to be > 1023");
+ checkArgument(devicesPerPort > 0, "Atleast one device per port needed");
if(schemasDir != null) {
checkArgument(schemasDir.exists(), "Schemas dir has to exist");
}
if(params.distroFolder != null) {
final ConfigGenerator configGenerator = new ConfigGenerator(params.distroFolder, openDevices);
- final List<File> generated = configGenerator.generate(params.ssh, params.generateConfigBatchSize, params.generateConfigsTimeout, params.generateConfigsAddress);
+ final List<File> generated = configGenerator.generate(
+ params.ssh, params.generateConfigBatchSize,
+ params.generateConfigsTimeout, params.generateConfigsAddress,
+ params.devicesPerPort);
configGenerator.updateFeatureFile(generated);
configGenerator.changeLoadOrder();
}
this.openDevices = openDevices;
}
- public List<File> generate(final boolean useSsh, final int batchSize, final int generateConfigsTimeout, final String address) {
+ public List<File> generate(final boolean useSsh, final int batchSize,
+ final int generateConfigsTimeout, final String address,
+ final int devicesPerPort) {
if(configDir.exists() == false) {
Preconditions.checkState(configDir.mkdirs(), "Unable to create directory " + configDir);
}
batchStart = openDevice;
}
- final String name = String.valueOf(openDevice) + SIM_DEVICE_SUFFIX;
- String configContent = String.format(middleBlueprint, name, address, String.valueOf(openDevice), String.valueOf(!useSsh));
- configContent = String.format("%s%s%d%s\n%s\n", configContent, "<connection-timeout-millis>", generateConfigsTimeout, "</connection-timeout-millis>", "</module>");
-
- b.append(configContent);
- connectorCount++;
- if(connectorCount == batchSize) {
- b.append(after);
- final File to = new File(configDir, String.format(SIM_DEVICE_CFG_PREFIX + "%d-%d.xml", batchStart, openDevice));
- generatedConfigs.add(to);
- Files.write(b.toString(), to, Charsets.UTF_8);
- connectorCount = 0;
- b = new StringBuilder();
- b.append(before);
- batchStart = null;
+ for (int i = 0; i < devicesPerPort; i++) {
+ final String name = String.valueOf(openDevice) + SIM_DEVICE_SUFFIX + (i == 0 ? "" : "-" + String.valueOf(i));
+ String configContent = String.format(middleBlueprint, name, address, String.valueOf(openDevice), String.valueOf(!useSsh));
+ configContent = String.format("%s%s%d%s\n%s\n", configContent, "<connection-timeout-millis>", generateConfigsTimeout, "</connection-timeout-millis>", "</module>");
+
+ b.append(configContent);
+ connectorCount++;
+ if(connectorCount == batchSize) {
+ b.append(after);
+ final File to = new File(configDir, String.format(SIM_DEVICE_CFG_PREFIX + "%d-%d.xml", batchStart, openDevice));
+ generatedConfigs.add(to);
+ Files.write(b.toString(), to, Charsets.UTF_8);
+ connectorCount = 0;
+ b = new StringBuilder();
+ b.append(before);
+ batchStart = null;
+ }
}
}
.getLogger(MdsalOperationProvider.class);
private final Set<Capability> caps;
- private final MdsalOperationService mdsalOperationService;
+ private final SchemaContext schemaContext;
public MdsalOperationProvider(final SessionIdProvider idProvider,
final Set<Capability> caps,
final SchemaContext schemaContext) {
this.caps = caps;
- mdsalOperationService = new MdsalOperationService(
- idProvider.getCurrentSessionId(), schemaContext, caps);
+ this.schemaContext = schemaContext;
}
@Override
@Override
public NetconfOperationService createService(String netconfSessionIdForReporting) {
- return mdsalOperationService;
+ return new MdsalOperationService(Long.parseLong(netconfSessionIdForReporting), schemaContext, caps);
}
static class MdsalOperationService implements NetconfOperationService {
private final long currentSessionId;
private final SchemaContext schemaContext;
private final Set<Capability> caps;
+ private final SchemaService schemaService;
+ private final DOMDataBroker dataBroker;
public MdsalOperationService(final long currentSessionId,
final SchemaContext schemaContext,
this.currentSessionId = currentSessionId;
this.schemaContext = schemaContext;
this.caps = caps;
+ this.schemaService = createSchemaService();
+
+ this.dataBroker = createDataStore(schemaService);
+
}
@Override
public Set<NetconfOperation> getNetconfOperations() {
- final SchemaService schemaService = createSchemaService();
-
- final DOMDataBroker db = createDataStore(schemaService);
- TransactionProvider transactionProvider = new TransactionProvider(db, String.valueOf(currentSessionId));
+ TransactionProvider transactionProvider = new TransactionProvider(dataBroker, String.valueOf(currentSessionId));
CurrentSchemaContext currentSchemaContext = new CurrentSchemaContext(schemaService);
ContainerNode netconf = createNetconfState();
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().node(NetconfState.QNAME)
.build();
- final DOMDataWriteTransaction tx = db.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction tx = dataBroker.newWriteOnlyTransaction();
tx.put(LogicalDatastoreType.OPERATIONAL, yangInstanceIdentifier, netconf);
try {
? NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES
: Sets.newHashSet(XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
- final NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
+ final NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new TesttoolNegotiationFactory(
hashedWheelTimer, aggregatedNetconfOperationServiceFactory, idProvider, generateConfigsTimeout, monitoringService1, serverCapabilities);
final NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcherImpl.ServerChannelInitializer(
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.test.tool;
+
+import io.netty.util.Timer;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.impl.NetconfServerSessionNegotiatorFactory;
+import org.opendaylight.netconf.impl.SessionIdProvider;
+import org.opendaylight.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
+
+/**
+ * Session negotiator factory for the netconf testtool that caches one
+ * {@link NetconfOperationService} per client {@link SocketAddress}, so repeated
+ * sessions from the same simulated device reuse the same service instance
+ * instead of creating a fresh one for every session.
+ *
+ * <p>NOTE(review): cache entries are never evicted by this class; acceptable
+ * for a test tool's process lifetime, but confirm this is intended.
+ */
+public class TesttoolNegotiationFactory extends NetconfServerSessionNegotiatorFactory {
+
+    // One cached operation service per remote client address; never cleared here.
+    private final Map<SocketAddress, NetconfOperationService> cachedOperationServices = new HashMap<>();
+
+    public TesttoolNegotiationFactory(final Timer timer, final NetconfOperationServiceFactory netconfOperationProvider,
+                                      final SessionIdProvider idProvider, final long connectionTimeoutMillis,
+                                      final NetconfMonitoringService monitoringService) {
+        super(timer, netconfOperationProvider, idProvider, connectionTimeoutMillis, monitoringService);
+    }
+
+    public TesttoolNegotiationFactory(final Timer timer, final NetconfOperationServiceFactory netconfOperationProvider,
+                                      final SessionIdProvider idProvider, final long connectionTimeoutMillis,
+                                      final NetconfMonitoringService monitoringService, final Set<String> baseCapabilities) {
+        super(timer, netconfOperationProvider, idProvider, connectionTimeoutMillis, monitoringService, baseCapabilities);
+    }
+
+    /**
+     * Returns the operation service cached for {@code socketAddress}, creating
+     * and caching one via the parent factory's operation-service factory on
+     * first use for that address.
+     */
+    @Override
+    protected NetconfOperationService getOperationServiceForAddress(final String netconfSessionIdForReporting, final SocketAddress socketAddress) {
+        if (cachedOperationServices.containsKey(socketAddress)) {
+            return cachedOperationServices.get(socketAddress);
+        } else {
+            final NetconfOperationService service = getOperationServiceFactory().createService(netconfSessionIdForReporting);
+            cachedOperationServices.put(socketAddress, service);
+            return service;
+        }
+    }
+}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
if (cause instanceof IllegalArgumentException) {
throw new RestconfDocumentedException(cause.getMessage(), ErrorType.PROTOCOL,
ErrorTag.INVALID_VALUE);
+ } else if (cause instanceof DOMRpcImplementationNotAvailableException) {
+ throw new RestconfDocumentedException(cause.getMessage(), ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED);
}
throw new RestconfDocumentedException("The operation encountered an unexpected error while executing.",cause);
} else {
restconfImpl.invokeRpc("toaster:cancel-toast", "", uriInfo);
fail("Expected an exception to be thrown.");
} catch (final RestconfDocumentedException e) {
- verifyRestconfDocumentedException(e, 0, ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED,
+ verifyRestconfDocumentedException(e, 0, ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED,
Optional.<String> absent(), Optional.<String> absent());
}
}
!org.apache.maven.project,
!org.opendaylight.yangtools.yang2sources.spi,
*,
- com.sun.jersey.spi.container.servlet, org.eclipse.jetty.servlets
+ com.sun.jersey.spi.container.servlet,
+ org.eclipse.jetty.servlets,
+ org.opendaylight.aaa.shiro.filters,
+ org.opendaylight.aaa.shiro.realm,
+ org.opendaylight.aaa.shiro.web.env,
+ org.apache.shiro.web.env
</Import-Package>
<Bundle-Activator>org.opendaylight.netconf.sal.rest.doc.DocProvider</Bundle-Activator>
<Web-ContextPath>/apidoc</Web-ContextPath>
<param-name>javax.ws.rs.Application</param-name>
<param-value>org.opendaylight.netconf.sal.rest.doc.jaxrs.ApiDocApplication</param-value>
</init-param>
- <!-- AAA Auth Filter -->
- <init-param>
- <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
- <param-value> org.opendaylight.aaa.sts.TokenAuthFilter</param-value>
- </init-param>
<load-on-startup>1</load-on-startup>
</servlet>
+ <context-param>
+ <param-name>shiroEnvironmentClass</param-name>
+ <param-value>org.opendaylight.aaa.shiro.web.env.KarafIniWebEnvironment</param-value>
+ </context-param>
+
+ <listener>
+ <listener-class>org.apache.shiro.web.env.EnvironmentLoaderListener</listener-class>
+ </listener>
+
+ <filter>
+ <filter-name>ShiroFilter</filter-name>
+ <filter-class>org.opendaylight.aaa.shiro.filters.AAAFilter</filter-class>
+ </filter>
+
+ <filter-mapping>
+ <filter-name>ShiroFilter</filter-name>
+ <url-pattern>/*</url-pattern>
+ </filter-mapping>
+
<servlet-mapping>
<servlet-name>JAXRSApiDoc</servlet-name>
<url-pattern>/apis/*</url-pattern>
<ietf-restconf.version>2013.10.19.8-SNAPSHOT</ietf-restconf.version>
<ietf-topology.version>2013.10.21.8-SNAPSHOT</ietf-topology.version>
<ietf-yang-types.version>2010.09.24.8-SNAPSHOT</ietf-yang-types.version>
- <jersey-servlet.version>1.17</jersey-servlet.version>
<mdsal.version>2.0.0-SNAPSHOT</mdsal.version>
<mdsal.model.version>0.8.0-SNAPSHOT</mdsal.model.version>
<scala.major.version>2.10</scala.major.version>
<scala.minor.version>4</scala.minor.version>
<surefire.version>2.15</surefire.version>
- <typesafe.config.version>1.2.1</typesafe.config.version>
<yangtools.version>0.8.0-SNAPSHOT</yangtools.version>
<jmxGeneratorPath>src/main/yang-gen-config</jmxGeneratorPath>