package org.opendaylight.netconf.topology.impl;
+import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.TypedActor;
import akka.actor.TypedProps;
import akka.cluster.Cluster;
+import akka.cluster.Member;
import akka.dispatch.OnComplete;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.NodeManager;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.netconf.topology.RoleChangeStrategy;
import org.opendaylight.netconf.topology.TopologyManager;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.util.BaseNodeManager;
import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
+public class NetconfNodeManagerCallback implements NodeManagerCallback {
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
private String nodeId;
private String topologyId;
private TopologyManager topologyManager;
+ private NodeManager nodeManager;
+ // cached context so that we can use it in callbacks from topology
+ private ActorContext cachedContext;
private Node currentConfig;
private Node currentOperationalNode;
private ConnectionStatusListenerRegistration registration = null;
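+ // masterDataBrokerRef is learned from the master's AnnounceMasterMountPoint message;
+ // connected tracks whether the local connector currently has a session to the device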
+ private ActorRef masterDataBrokerRef = null;
+ private boolean connected = false;
+
public NetconfNodeManagerCallback(final String nodeId,
final String topologyId,
final ActorSystem actorSystem,
topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
}
}, actorSystem.dispatcher());
+
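+ // resolve this node's manager actor by path and wrap it in a typed proxy so NodeManager methods can be invoked on it directly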
+ final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
+ nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ LOG.warn("Unable to resolve actor for path: {}", "/user/" + topologyId + "/" + nodeId, throwable);
+ // resolution failed and actorRef is null, so do not create the typed proxy
+ return;
+ }
+ LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId + "/" + nodeId);
+ nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
+ }
+ }, actorSystem.dispatcher());
}
@Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
@Nonnull final Node configNode) {
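+ // TypedActor.context() is only accessible from inside a typed actor call, so cache it here for later use from callbacks that arrive outside the actor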
+ cachedContext = TypedActor.context();
this.nodeId = nodeId.getValue();
this.currentConfig = configNode;
// set initial state before anything happens
Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
- registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+ registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
}
@Override
.setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
.setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
.build())
- .build();
+ .build();
}
});
}
@Override
public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
- if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
- return;
- }
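+ // on any role change, drop the currently registered mount point first; it is re-registered below according to the new role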
+ topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+
isMaster = roleChangeDTO.isOwner();
- //TODO instead of registering mount point, init remote schema repo when its done
if (isMaster) {
- // unregister old mountPoint if ownership changed, register a new one
- topologyDispatcher.registerMountPoint(new NodeId(nodeId));
+ LOG.warn("Gained ownership of node - registering master mount point");
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
} else {
- topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ // even though the mount point is ready, we don't know who the master mount point will be until we receive the announce message
+ // once that message arrives we can go ahead and register the mount point
+ if (connected && masterDataBrokerRef != null) {
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+ } else {
+ LOG.debug("Mount point is ready, still waiting for master mount point");
+ }
}
}
public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
LOG.debug("onDeviceConnected received, registering role candidate");
- roleChangeStrategy.registerRoleCandidate(this);
+ connected = true;
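+ // register the typed NodeManager proxy (rather than 'this') so that ownership callbacks are dispatched through the actor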
+ roleChangeStrategy.registerRoleCandidate(nodeManager);
+ if (!isMaster && masterDataBrokerRef != null) {
+ // if we're not master but one is present already, we need to register mountpoint
+ LOG.warn("Device connected, master already present in topology, registering mount point");
+ topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
+ }
List<String> capabilityList = new ArrayList<>();
capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
.setUnavailableCapabilities(unavailableCapabilities)
.build())
.build();
- // TODO need to implement forwarding of this msg to master
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
@Override
public void onDeviceDisconnected() {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
- // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
LOG.debug("onDeviceDisconnected received, unregistering role candidate");
- topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+ connected = false;
+ if (isMaster) {
+ // announce that master mount point is going down
+ for (final Member member : clusterExtension.state().getMembers()) {
+ actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
+ }
+ // set isMaster to false right away since we are unregistering; the ownershipChanged callback can lag behind, leaving multiple nodes acting as master
+ isMaster = false;
+ // the onRoleChanged() callback can also lag behind, so unregister the mount point as soon as the device disconnects
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ }
roleChangeStrategy.unregisterRoleCandidate();
+
final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
.addAugmentation(NetconfNode.class,
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.build()).build();
- // TODO need to implement forwarding of this msg to master
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
// no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
LOG.debug("onDeviceFailed received");
+ connected = false;
String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
roleChangeStrategy.unregisterRoleCandidate();
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
-
@Override
public void onNotification(DOMNotification domNotification) {
//NOOP
}
@Override
- public void onReceive(Object o, ActorRef actorRef) {
-
+ public void onReceive(Object message, ActorRef actorRef) {
+ LOG.warn("Netconf node callback received message {}", message);
+ if (message instanceof AnnounceMasterMountPoint) {
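+ // the sender ref delivered with the announce message is taken to be the master's data broker actor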
+ masterDataBrokerRef = actorRef;
+ // the candidate gets registered only once the mount point is prepared, so we can go ahead and register it
+ if (roleChangeStrategy.isCandidateRegistered()) {
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+ } else {
+ LOG.warn("Announce master mount point msg received but mount point is not ready yet");
+ }
+ } else if (message instanceof AnnounceMasterMountPointDown) {
+ LOG.warn("Master mountpoint went down");
+ masterDataBrokerRef = null;
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ }
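+ // For reference, a sketch (an assumption, not part of this patch) of how the master side
+ // would broadcast the announce, mirroring the AnnounceMasterMountPointDown broadcast in
+ // onDeviceDisconnected() above; masterDataBroker is a hypothetical ref to the master's
+ // data broker actor:
+ //   for (final Member member : clusterExtension.state().getMembers()) {
+ //       actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId)
+ //           .tell(new AnnounceMasterMountPoint(), masterDataBroker);
+ //   }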
}
}
\ No newline at end of file