package org.opendaylight.netconf.topology.util;
+import akka.actor.ActorContext;
+import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
+import akka.actor.Identify;
import akka.actor.TypedActor;
import akka.actor.TypedActorExtension;
import akka.actor.TypedProps;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.netconf.topology.RoleChangeStrategy;
import org.opendaylight.netconf.topology.StateAggregator;
import org.opendaylight.netconf.topology.TopologyManager;
import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise.DefaultPromise;
public final class BaseTopologyManager
implements TopologyManager {
private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
+ private static final InstanceIdentifier<NetworkTopology> NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build();
+
+ private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
private final ActorSystem system;
private final TypedActorExtension typedExtension;
// election has not yet happened
this.isMaster = isMaster;
+ this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+
LOG.debug("Base manager started ", +id);
}
final String path = member.address() + PATH + topologyId;
LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
- clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
+ // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
+ clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
} else if (message instanceof MemberExited) {
// remove peer
final Member member = ((MemberExited) message).member();
final String path = member.address() + PATH + topologyId;
LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
+ clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
+ } else if (message instanceof ActorIdentity) {
+ LOG.debug("Received ActorIdentity message", message);
+ final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
+ if (((ActorIdentity) message).getRef() == null) {
+ LOG.debug("ActorIdentity has null actor ref, retrying..", message);
+ final ActorRef self = TypedActor.context().self();
+ final ActorContext context = TypedActor.context();
+ system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
+ context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
+
+ }
+ }, system.dispatcher());
+ return;
+ }
+ LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
+
clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
} else if (message instanceof CustomIdentifyMessageReply) {
- LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
+
+ LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
+ if (isMaster) {
+ resyncPeer(peer);
+ }
}
} else if (message instanceof CustomIdentifyMessage) {
- LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
+ LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
+ if (isMaster) {
+ resyncPeer(peer);
+ }
}
actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
}
}
+
+ private void resyncPeer(final TopologyManager peer) {
+ final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
+ final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
+
+ Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
+ @Override
+ public void onSuccess(Optional<Topology> result) {
+ if (result.isPresent()) {
+ for (final Node node : result.get().getNode()) {
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
+ peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+ // we dont care about the future from now on since we will be notified by the onConnected event
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Unable to read from datastore");
+ }
+ });
+
+ }
}
import akka.actor.TypedActor;
import akka.actor.TypedProps;
import akka.cluster.Cluster;
-import akka.cluster.Member;
import akka.dispatch.OnComplete;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
@Nullable final Node configNode) {
final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
- return new NodeBuilder()
+ final Node failedNode = new NodeBuilder()
.setNodeId(nodeId)
.addAugmentation(NetconfNode.class,
new NetconfNodeBuilder()
.build())
.build())
.build();
+
+ if (currentOperationalNode == null) {
+ currentOperationalNode = failedNode;
+ }
+
+ return failedNode;
}
@Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
@Nonnull final Node configNode) {
// first disconnect this node
topologyDispatcher.unregisterMountPoint(nodeId);
- registration.close();
+ if (registration != null) {
+ registration.close();
+ }
topologyDispatcher.disconnectNode(nodeId);
// now reinit this connection with new settings
Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
- registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+ registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
}
@Override
@Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
// cleanup and disconnect
topologyDispatcher.unregisterMountPoint(nodeId);
- registration.close();
+ if (registration != null) {
+ registration.close();
+ }
roleChangeStrategy.unregisterRoleCandidate();
return topologyDispatcher.disconnectNode(nodeId);
}
@Override
public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
- topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
isMaster = roleChangeDTO.isOwner();
if (isMaster) {
connected = false;
if (isMaster) {
// announce that master mount point is going down
- for (final Member member : clusterExtension.state().getMembers()) {
- actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
- }
+// for (final Member member : clusterExtension.state().getMembers()) {
+// actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
+// }
// set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
isMaster = false;
// onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects