From: Tomas Cere Date: Mon, 30 Nov 2015 16:15:42 +0000 (+0100) Subject: RemoteDeviceDataBroker proxy X-Git-Tag: release/beryllium~45^2~7 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=1145d5eeda7ec266e65ec00dc43f8136b1e762ac;p=netconf.git RemoteDeviceDataBroker proxy Change-Id: Ifdb7b905f95dc2af5e31003e59405ebc446d85d3 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManager.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManager.java index b4b26a859a..c8ca14d252 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManager.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManager.java @@ -10,11 +10,13 @@ package org.opendaylight.netconf.topology; import akka.actor.TypedActor.Receiver; import com.google.common.annotations.Beta; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; /** * Node manager that handles communication between node managers and delegates calls to the customizable NodeManagerCallback */ @Beta -public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener { +public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener, RemoteDeviceHandler { } diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManagerCallback.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManagerCallback.java index fd0444aa5c..670bb81ebe 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManagerCallback.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManagerCallback.java @@ -11,12 +11,14 @@ package org.opendaylight.netconf.topology; import akka.actor.ActorSystem; import akka.actor.TypedActor.Receiver; import com.google.common.annotations.Beta; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; /** * Customizable layer that handles communication with your application. */ @Beta -public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver { +public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver, RemoteDeviceHandler { interface NodeManagerCallbackFactory { NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem); diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/RoleChangeStrategy.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/RoleChangeStrategy.java index 39cccf653c..f4f3013b7f 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/RoleChangeStrategy.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/RoleChangeStrategy.java @@ -30,4 +30,10 @@ public interface RoleChangeStrategy extends RoleChangeListener { */ void unregisterRoleCandidate(); + /** + * + * @return True/False based on if this candidate is already registered into ownership service + */ + boolean isCandidateRegistered(); + } diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java index 05b69fc79a..823c44b47a 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java @@ -12,6 +12,9 @@ import akka.actor.ActorRef; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.topology.NodeManagerCallback; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; @@ -19,6 +22,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev15 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ExampleNodeManagerCallback implements NodeManagerCallback { @@ -70,4 +74,29 @@ public class ExampleNodeManagerCallback implements NodeManagerCallback { public void onRoleChanged(RoleChangeDTO roleChangeDTO) { } + + @Override + public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) { + + } + + @Override + public void onDeviceDisconnected() { + + } + + @Override + public void onDeviceFailed(Throwable throwable) { + + } + + @Override + public void onNotification(DOMNotification domNotification) { + + } + + @Override + public void close() { + + } } diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java index 03151505f7..2f8b1b7f62 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java @@ -17,6 +17,9 @@ import akka.japi.Creator; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.topology.NodeManager; import org.opendaylight.netconf.topology.NodeManagerCallback; import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory; @@ -24,6 +27,7 @@ import org.opendaylight.netconf.topology.RoleChangeStrategy; import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -36,7 +40,7 @@ public final class BaseNodeManager implements NodeManager { private final String topologyId; private final ActorSystem actorSystem; - private boolean isMaster; + private boolean isMaster = false; private NodeManagerCallback delegate; private BaseNodeManager(final String nodeId, @@ -99,7 +103,7 @@ public final class BaseNodeManager implements NodeManager { @Override public void onReceive(Object o, ActorRef actorRef) { - + delegate.onReceive(o, actorRef); } @Override @@ -122,6 +126,31 @@ public final class BaseNodeManager implements NodeManager { return null; } + @Override + public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) { + delegate.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc); + } + + @Override + public void onDeviceDisconnected() { + delegate.onDeviceDisconnected(); + } + + @Override + public void onDeviceFailed(Throwable throwable) { + delegate.onDeviceFailed(throwable); + } + + @Override + public void onNotification(DOMNotification domNotification) { + delegate.onNotification(domNotification); + } + + @Override + public void close() { + // NOOP + } + /** * Builder of BaseNodeManager instances that are proxied as TypedActors */ diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java index 55bdb2e2dd..3cafd35b55 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java @@ -27,6 +27,7 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh private final EntityOwnershipService entityOwnershipService; private final String entityType; private final String entityName; + private final Entity entity; private NodeListener ownershipCandidate; private EntityOwnershipCandidateRegistration candidateRegistration = null; @@ -38,6 +39,7 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh this.entityOwnershipService = entityOwnershipService; this.entityType = entityType + "/" + entityName; this.entityName = entityName; + this.entity = new Entity(this.entityType, entityName); } @Override @@ -48,7 +50,7 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh if (candidateRegistration != null) { unregisterRoleCandidate(); } - candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName)); + candidateRegistration = entityOwnershipService.registerCandidate(entity); ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this); } catch (CandidateAlreadyRegisteredException e) { LOG.error("Candidate already registered for election", e); @@ -65,6 +67,11 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh ownershipListenerRegistration = null; } + @Override + public boolean isCandidateRegistered() { + return entityOwnershipService.isCandidateRegistered(entity); + } + @Override public void onRoleChanged(RoleChangeDTO roleChangeDTO) { LOG.debug("Role was changed {}", roleChangeDTO); diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java index ab76cc2d64..ea6e5d5eeb 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java @@ -27,6 +27,11 @@ public class NoopRoleChangeStrategy implements RoleChangeStrategy { } + @Override + public boolean isCandidateRegistered() { + return false; + } + @Override public void onRoleChanged(RoleChangeDTO roleChangeDTO) { diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java index de9f7aca39..94cd8799af 100644 --- a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java @@ -49,7 +49,7 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered private NodeListener ownershipCandidate; private final String entityType; // use topologyId as entityName - private final String entityName; + private final Entity entity; private EntityOwnershipCandidateRegistration candidateRegistration = null; private EntityOwnershipListenerRegistration ownershipListenerRegistration = null; @@ -63,7 +63,7 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered this.dataBroker = dataBroker; this.entityOwnershipService = entityOwnershipService; this.entityType = entityType; - this.entityName = entityName; + this.entity = new Entity(entityType, entityName); datastoreListenerRegistration = null; } @@ -76,7 +76,7 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered if (candidateRegistration != null) { unregisterRoleCandidate(); } - candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName)); + candidateRegistration = entityOwnershipService.registerCandidate(entity); ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this); } catch (CandidateAlreadyRegisteredException e) { LOG.error("Candidate already registered for election", e); @@ -92,6 +92,11 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered ownershipListenerRegistration = null; } + @Override + public boolean isCandidateRegistered() { + return entityOwnershipService.isCandidateRegistered(entity); + } + @Override public void onRoleChanged(RoleChangeDTO roleChangeDTO) { if (roleChangeDTO.isOwner()) { diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPoint.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPoint.java new file mode 100644 index 0000000000..6624a28d61 --- /dev/null +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPoint.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.util.messages; + +import java.io.Serializable; + +public class AnnounceMasterMountPoint implements Serializable { + private static final long serialVersionUID = 1L; + + public AnnounceMasterMountPoint() {} +} diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPointDown.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPointDown.java new file mode 100644 index 0000000000..a7c3f8714e --- /dev/null +++ b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPointDown.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.util.messages; + +import java.io.Serializable; + +public class AnnounceMasterMountPointDown implements Serializable { + private static final long serialVersionUID = 1L; + + public AnnounceMasterMountPointDown() { + + } +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/NetconfTopology.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/NetconfTopology.java index c9b5409d20..a8743ec56d 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/NetconfTopology.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/NetconfTopology.java @@ -8,6 +8,8 @@ package org.opendaylight.netconf.topology; +import akka.actor.ActorContext; +import akka.actor.ActorRef; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; @@ -28,7 +30,20 @@ public interface NetconfTopology { ListenableFuture disconnectNode(NodeId nodeId); - void registerMountPoint(NodeId nodeId); + /** + * register master mount point + * @param context + * @param nodeId + */ + void registerMountPoint(ActorContext context, NodeId nodeId); + + /** + * register slave mountpoint with the provided ActorRef + * @param context + * @param nodeId + * @param masterRef + */ + void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef); void unregisterMountPoint(NodeId nodeId); diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java index 25b4e2d2f7..bf03fe1eaa 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java @@ -8,6 +8,8 @@ package org.opendaylight.netconf.topology.impl; +import akka.actor.ActorContext; +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.TypedActor; import akka.actor.TypedActorExtension; @@ -123,16 +125,21 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements @Override protected RemoteDeviceHandler createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) { - return new TopologyMountPointFacade(id, domBroker, bindingBroker, defaultRequestTimeoutMillis); + return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis); } @Override - public void registerMountPoint(NodeId nodeId) { - ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(); + public void registerMountPoint(final ActorContext context, final NodeId nodeId) { + ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context); } @Override - public void unregisterMountPoint(NodeId nodeId) { + public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) { + ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef); + } + + @Override + public void unregisterMountPoint(final NodeId nodeId) { Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint"); ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint(); } diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java index a17650a6fb..5f0eb2e179 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java @@ -8,11 +8,13 @@ package org.opendaylight.netconf.topology.impl; +import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.TypedActor; import akka.actor.TypedProps; import akka.cluster.Cluster; +import akka.cluster.Member; import akka.dispatch.OnComplete; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; @@ -29,15 +31,18 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.topology.NetconfTopology; +import org.opendaylight.netconf.topology.NodeManager; import org.opendaylight.netconf.topology.NodeManagerCallback; import org.opendaylight.netconf.topology.RoleChangeStrategy; import org.opendaylight.netconf.topology.TopologyManager; import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration; +import org.opendaylight.netconf.topology.util.BaseNodeManager; import org.opendaylight.netconf.topology.util.BaseTopologyManager; +import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint; +import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus; @@ -60,7 +65,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler{ +public class NetconfNodeManagerCallback implements NodeManagerCallback{ private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class); @@ -92,12 +97,18 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe private String nodeId; private String topologyId; private TopologyManager topologyManager; + private NodeManager nodeManager; + // cached context so that we can use it in callbacks from topology + private ActorContext cachedContext; private Node currentConfig; private Node currentOperationalNode; private ConnectionStatusListenerRegistration registration = null; + private ActorRef masterDataBrokerRef = null; + private boolean connected = false; + public NetconfNodeManagerCallback(final String nodeId, final String topologyId, final ActorSystem actorSystem, @@ -123,6 +134,18 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef); } }, actorSystem.dispatcher()); + + final Future nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS)); + nodeRefFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { + if (throwable != null) { + LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable); + } + LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId); + nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef); + } + }, actorSystem.dispatcher()); } @@ -183,6 +206,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe @Nonnull @Override public ListenableFuture onNodeCreated(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) { + cachedContext = TypedActor.context(); this.nodeId = nodeId.getValue(); this.currentConfig = configNode; // set initial state before anything happens @@ -194,7 +218,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe Futures.addCallback(connectionFuture, new FutureCallback() { @Override public void onSuccess(@Nullable NetconfDeviceCapabilities result) { - registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this); + registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager); } @Override @@ -284,7 +308,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build()) .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build()) .build()) - .build(); + .build(); } }); } @@ -306,16 +330,20 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe @Override public void onRoleChanged(final RoleChangeDTO roleChangeDTO) { - if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) { - return; - } + topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId()); + isMaster = roleChangeDTO.isOwner(); - //TODO instead of registering mount point, init remote schema repo when its done if (isMaster) { - // unregister old mountPoint if ownership changed, register a new one - topologyDispatcher.registerMountPoint(new NodeId(nodeId)); + LOG.warn("Gained ownership of node - registering master mount point"); + topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId)); } else { - topologyDispatcher.unregisterMountPoint(new NodeId(nodeId)); + // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg + // after we receive the message we can go ahead and register the mount point + if (connected && masterDataBrokerRef != null) { + topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef); + } else { + LOG.debug("Mount point is ready, still waiting for master mount point"); + } } } @@ -323,7 +351,13 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) { // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result LOG.debug("onDeviceConnected received, registering role candidate"); - roleChangeStrategy.registerRoleCandidate(this); + connected = true; + roleChangeStrategy.registerRoleCandidate(nodeManager); + if (!isMaster && masterDataBrokerRef != null) { + // if we're not master but one is present already, we need to register mountpoint + LOG.warn("Device connected, master already present in topology, registering mount point"); + topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef); + } List capabilityList = new ArrayList<>(); capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities()); capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList()); @@ -354,17 +388,26 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe .setUnavailableCapabilities(unavailableCapabilities) .build()) .build(); - // TODO need to implement forwarding of this msg to master topologyManager.notifyNodeStatusChange(new NodeId(nodeId)); } @Override public void onDeviceDisconnected() { // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result - // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint LOG.debug("onDeviceDisconnected received, unregistering role candidate"); - topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId()); + connected = false; + if (isMaster) { + // announce that master mount point is going down + for (final Member member : clusterExtension.state().getMembers()) { + actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null); + } + // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters + isMaster = false; + // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects + topologyDispatcher.unregisterMountPoint(new NodeId(nodeId)); + } roleChangeStrategy.unregisterRoleCandidate(); + final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class); currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId)) .addAugmentation(NetconfNode.class, @@ -382,7 +425,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe .setHost(netconfNode.getHost()) .setPort(netconfNode.getPort()) .build()).build(); - // TODO need to implement forwarding of this msg to master topologyManager.notifyNodeStatusChange(new NodeId(nodeId)); } @@ -391,6 +433,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint LOG.debug("onDeviceFailed received"); + connected = false; String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON; roleChangeStrategy.unregisterRoleCandidate(); @@ -412,7 +455,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe topologyManager.notifyNodeStatusChange(new NodeId(nodeId)); } - @Override public void onNotification(DOMNotification domNotification) { //NOOP @@ -424,7 +466,20 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe } @Override - public void onReceive(Object o, ActorRef actorRef) { - + public void onReceive(Object message, ActorRef actorRef) { + LOG.warn("Netconf node callback received message {}", message); + if (message instanceof AnnounceMasterMountPoint) { + masterDataBrokerRef = actorRef; + // candidate gets registered when mount point is already prepared so we can go ahead a register it + if (roleChangeStrategy.isCandidateRegistered()) { + topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef); + } else { + LOG.warn("Announce master mount point msg received but mount point is not ready yet"); + } + } else if (message instanceof AnnounceMasterMountPointDown) { + LOG.warn("Master mountpoint went down"); + masterDataBrokerRef = null; + topologyDispatcher.unregisterMountPoint(new NodeId(nodeId)); + } } } \ No newline at end of file diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyImpl.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyImpl.java index 4329063b8a..73d238d1d3 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyImpl.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyImpl.java @@ -8,6 +8,8 @@ package org.opendaylight.netconf.topology.impl; +import akka.actor.ActorContext; +import akka.actor.ActorRef; import io.netty.util.concurrent.EventExecutor; import java.util.Collection; import javax.annotation.Nonnull; @@ -72,7 +74,12 @@ public class NetconfTopologyImpl extends AbstractNetconfTopology implements Data } @Override - public void registerMountPoint(NodeId nodeId) { + public void registerMountPoint(ActorContext context, NodeId nodeId) { + throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline"); + } + + @Override + public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) { throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline"); } diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java index fecc93f72b..eaa5d6cab7 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java @@ -83,8 +83,10 @@ public class NetconfTopologyManagerCallback implements TopologyManagerCallback { createNodeManager(nodeId); nodes.put(nodeId, naBaseNodeManager); - // put initial state into datastore - naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node)); + // only master should put initial state into datastore + if (isMaster) { + naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node)); + } // trigger connect on this node return naBaseNodeManager.onNodeCreated(nodeId, node); @@ -92,8 +94,10 @@ public class NetconfTopologyManagerCallback implements TopologyManagerCallback { @Override public ListenableFuture onNodeUpdated(final NodeId nodeId, final Node node) { - // put initial state into datastore - naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node)); + // only master should put initial state into datastore + if (isMaster) { + naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node)); + } // Trigger onNodeUpdated only on this node return nodes.get(nodeId).onNodeUpdated(nodeId, node); diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/TopologyNodeWriter.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/TopologyNodeWriter.java index d652b114d0..563ce977b2 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/TopologyNodeWriter.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/TopologyNodeWriter.java @@ -134,7 +134,6 @@ public class TopologyNodeWriter implements NodeWriter{ throw new IllegalStateException(id.getValue() + " Transaction(" + txType + ") not committed correctly", t); } }); - } private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) { diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceMasterDataBroker.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceMasterDataBroker.java new file mode 100644 index 0000000000..f210820ccd --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceMasterDataBroker.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline; + +import akka.actor.ActorSystem; +import akka.actor.TypedActor; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; +import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker; +import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction; +import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction; +import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Future; +import scala.concurrent.impl.Promise.DefaultPromise; + +public class NetconfDeviceMasterDataBroker implements ProxyNetconfDeviceDataBroker { + + private final RemoteDeviceId id; + + private final NetconfDeviceDataBroker delegateBroker; + private final ActorSystem actorSystem; + + private DOMDataReadOnlyTransaction readTx; + private DOMDataWriteTransaction writeTx; + + public NetconfDeviceMasterDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id, + final SchemaContext schemaContext, final DOMRpcService rpc, + final NetconfSessionPreferences netconfSessionPreferences, final long requestTimeoutMillis) { + this.id = id; + delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences, requestTimeoutMillis); + this.actorSystem = actorSystem; + + // only ever need 1 readTx since it doesnt need to be closed + readTx = delegateBroker.newReadOnlyTransaction(); + } + + @Override + public DOMDataReadOnlyTransaction newReadOnlyTransaction() { + return new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.self()); + } + + @Override + public DOMDataReadWriteTransaction newReadWriteTransaction() { + return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.self()), + newWriteOnlyTransaction()); + } + + @Override + public DOMDataWriteTransaction newWriteOnlyTransaction() { + writeTx = delegateBroker.newWriteOnlyTransaction(); + return new ProxyWriteOnlyTransaction(actorSystem, TypedActor.self()); + } + + @Override + public ListenerRegistration registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) { + throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point"); + } + + @Override + public DOMTransactionChain createTransactionChain(TransactionChainListener listener) { + throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point"); + } + + @Nonnull + @Override + public Map, DOMDataBrokerExtension> getSupportedExtensions() { + return Collections.emptyMap(); + } + + @Override + public Future> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + final CheckedFuture>, ReadFailedException> readFuture = readTx.read(store, path); + + final DefaultPromise> promise = new DefaultPromise<>(); + Futures.addCallback(readFuture, new FutureCallback>>() { + @Override + public void onSuccess(Optional> result) { + if (!result.isPresent()) { + promise.success(Optional.absent()); + } else { + promise.success(Optional.of(new NormalizedNodeMessage(path, result.get()))); + } + } + + @Override + public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + + @Override + public Future exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + final CheckedFuture existsFuture = readTx.exists(store, path); + + final DefaultPromise promise = new DefaultPromise<>(); + Futures.addCallback(existsFuture, new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + promise.success(result); + } + + @Override + public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + + @Override + public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) { + if (writeTx == null) { + writeTx = delegateBroker.newWriteOnlyTransaction(); + } + writeTx.put(store, data.getIdentifier(), data.getNode()); + } + + @Override + public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) { + if (writeTx == null) { + writeTx = delegateBroker.newWriteOnlyTransaction(); + } + writeTx.merge(store, data.getIdentifier(), data.getNode()); + } + + @Override + public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + if (writeTx == null) { + writeTx = delegateBroker.newWriteOnlyTransaction(); + } + writeTx.delete(store, path); + } + + @Override + public boolean cancel() { + return writeTx.cancel(); + } + + @Override + public Future submit() { + final CheckedFuture submitFuture = writeTx.submit(); + final DefaultPromise promise = new DefaultPromise<>(); + Futures.addCallback(submitFuture, new FutureCallback() { + @Override + public void onSuccess(Void result) { + promise.success(result); + } + + @Override + public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + + @Override + @Deprecated + public Future> commit() { + final ListenableFuture> commitFuture = writeTx.commit(); + final DefaultPromise> promise = new DefaultPromise<>(); + Futures.addCallback(commitFuture, new FutureCallback>() { + @Override + public void onSuccess(RpcResult result) { + promise.success(result); + } + + @Override + public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceSlaveDataBroker.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceSlaveDataBroker.java new file mode 100644 index 0000000000..23d7e10ae8 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceSlaveDataBroker.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline; + +import akka.actor.ActorSystem; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction; +import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +public class NetconfDeviceSlaveDataBroker implements DOMDataBroker{ + + private final RemoteDeviceId id; + private final ProxyNetconfDeviceDataBroker masterDataBroker; + private final ActorSystem actorSystem; + + public NetconfDeviceSlaveDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker masterDataBroker) { + this.id = id; + this.masterDataBroker = masterDataBroker; + this.actorSystem = actorSystem; + } + + @Override + public DOMDataReadOnlyTransaction newReadOnlyTransaction() { + return new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker); + } + + @Override + public DOMDataReadWriteTransaction newReadWriteTransaction() { + return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker), new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker)); + } + + @Override + public DOMDataWriteTransaction newWriteOnlyTransaction() { + return new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker); + } + + @Override + public ListenerRegistration registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) { + throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point"); + } + + @Override + public DOMTransactionChain createTransactionChain(TransactionChainListener listener) { + throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point"); + } + + @Nonnull + @Override + public Map, DOMDataBrokerExtension> getSupportedExtensions() { + return Collections.emptyMap(); + } +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ProxyNetconfDeviceDataBroker.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ProxyNetconfDeviceDataBroker.java new file mode 100644 index 0000000000..3ef76884f6 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ProxyNetconfDeviceDataBroker.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline; + +import com.google.common.base.Optional; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import scala.concurrent.Future; + +public interface ProxyNetconfDeviceDataBroker extends DOMDataBroker{ + Future> read(LogicalDatastoreType store, YangInstanceIdentifier path); + + Future exists(LogicalDatastoreType store, YangInstanceIdentifier path); + + void put(LogicalDatastoreType store, NormalizedNodeMessage data); + + void merge(LogicalDatastoreType store, NormalizedNodeMessage data); + + void delete(LogicalDatastoreType store, YangInstanceIdentifier path); + + boolean cancel(); + + Future submit(); + + @Deprecated + Future> commit(); +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java index ec06c04b92..ec294b268f 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java @@ -8,6 +8,14 @@ package org.opendaylight.netconf.topology.pipeline; +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.TypedActor; +import akka.actor.TypedProps; +import akka.cluster.Cluster; +import akka.cluster.Member; +import akka.japi.Creator; import com.google.common.base.Preconditions; import java.util.ArrayList; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; @@ -17,9 +25,9 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; -import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker; import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +36,9 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand private static final Logger LOG = LoggerFactory.getLogger(TopologyMountPointFacade.class); + private static final String MOUNT_POINT = "mountpoint"; + + private final String topologyId; private final RemoteDeviceId id; private final Broker domBroker; private final BindingAwareBroker bindingBroker; @@ -38,13 +49,17 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand private DOMRpcService deviceRpc = null; private final ClusteredNetconfDeviceMountInstanceProxy salProvider; + private ActorSystem actorSystem; + private DOMDataBroker deviceDataBroker = null; + private final ArrayList> connectionStatusListeners = new ArrayList<>(); - public TopologyMountPointFacade(final RemoteDeviceId id, + public TopologyMountPointFacade(final String topologyId, + final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) { - + this.topologyId = topologyId; this.id = id; this.domBroker = domBroker; this.bindingBroker = bindingBroker; @@ -91,19 +106,54 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand salProvider.getMountInstance().publish(domNotification); } - public void registerMountPoint() { + public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context) { Preconditions.checkNotNull(id); Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected."); Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected."); + this.actorSystem = actorSystem; + final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService(); - final DOMDataBroker netconfDeviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis); + LOG.warn("Creating master data broker for device {}", id); + deviceDataBroker = TypedActor.get(context).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, new Creator() { + @Override + public NetconfDeviceMasterDataBroker create() throws Exception { + return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis); + } + }), MOUNT_POINT); + LOG.warn("Master data broker registered on path {}", TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker).path()); + salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService); + final Cluster cluster = Cluster.get(actorSystem); + final Iterable members = cluster.state().getMembers(); + final ActorRef deviceDataBrokerRef = TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker); + for (final Member member : members) { + if (!member.address().equals(cluster.selfAddress())) { + final String path = member.address() + "/user/" + topologyId + "/" + id.getName(); + actorSystem.actorSelection(path).tell(new AnnounceMasterMountPoint(), deviceDataBrokerRef); + } + } + } + + public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context, final ActorRef masterRef) { + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected."); + Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected."); + this.actorSystem = actorSystem; final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService(); - salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker, deviceRpc, notificationService); + LOG.warn("Creating a proxy for master data broker"); + final ProxyNetconfDeviceDataBroker masterDataBroker = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, NetconfDeviceMasterDataBroker.class), masterRef); + LOG.warn("Creating slave data broker for device {}", id); + final DOMDataBroker deviceDataBroker = new NetconfDeviceSlaveDataBroker(actorSystem, id, masterDataBroker); + salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService); } public void unregisterMountPoint() { salProvider.getMountInstance().onTopologyDeviceDisconnected(); + if (deviceDataBroker != null) { + LOG.warn("Stopping master data broker for device {}", id.getName()); + TypedActor.get(actorSystem).stop(deviceDataBroker); + deviceDataBroker = null; + } } public ConnectionStatusListenerRegistration registerConnectionStatusListener(final RemoteDeviceHandler listener) { diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/NetconfDeviceDataBrokerProxy.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/NetconfDeviceDataBrokerProxy.java new file mode 100644 index 0000000000..23fe120be1 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/NetconfDeviceDataBrokerProxy.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline.tx; + +public interface NetconfDeviceDataBrokerProxy { +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyReadOnlyTransaction.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyReadOnlyTransaction.java new file mode 100644 index 0000000000..c67673b05b --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyReadOnlyTransaction.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline.tx; + +import akka.actor.ActorSystem; +import akka.dispatch.OnComplete; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import javax.annotation.Nullable; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker; +import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.Future; + +public class ProxyReadOnlyTransaction implements DOMDataReadOnlyTransaction{ + + private final RemoteDeviceId id; + private final ProxyNetconfDeviceDataBroker delegate; + private final ActorSystem actorSystem; + + public ProxyReadOnlyTransaction(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker delegate) { + this.id = id; + this.delegate = delegate; + this.actorSystem = actorSystem; + } + + @Override + public void close() { + //NOOP + } + + @Override + public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + final Future> future = delegate.read(store, path); + final SettableFuture>> settableFuture = SettableFuture.create(); + final CheckedFuture>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function() { + @Nullable + @Override + public ReadFailedException apply(Exception cause) { + return new ReadFailedException("Read from transaction failed", cause); + } + }); + future.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable throwable, Optional normalizedNodeMessage) throws Throwable { + if (throwable == null) { + settableFuture.set(normalizedNodeMessage.transform(new Function>() { + @Nullable + @Override + public NormalizedNode apply(NormalizedNodeMessage input) { + return input.getNode(); + } + })); + } else { + settableFuture.setException(throwable); + } + } + }, actorSystem.dispatcher()); + return checkedFuture; + } + + @Override + public CheckedFuture exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + final Future existsFuture = delegate.exists(store, path); + final SettableFuture settableFuture = SettableFuture.create(); + final CheckedFuture checkedFuture = Futures.makeChecked(settableFuture, new Function() { + @Nullable + @Override + public ReadFailedException apply(Exception cause) { + return new ReadFailedException("Read from transaction failed", cause); + } + }); + existsFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable throwable, Boolean result) throws Throwable { + if (throwable == null) { + settableFuture.set(result); + } else { + settableFuture.setException(throwable); + } + } + }, actorSystem.dispatcher()); + return checkedFuture; + } + + @Override + public Object getIdentifier() { + return this; + } +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyWriteOnlyTransaction.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyWriteOnlyTransaction.java new file mode 100644 index 0000000000..d7a3c87348 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyWriteOnlyTransaction.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline.tx; + +import akka.actor.ActorSystem; +import akka.dispatch.OnComplete; +import com.google.common.base.Function; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import javax.annotation.Nullable; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker; +import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.Future; + +public class ProxyWriteOnlyTransaction implements DOMDataWriteTransaction { + + private final ProxyNetconfDeviceDataBroker delegate; + private final ActorSystem actorSystem; + + public ProxyWriteOnlyTransaction(ActorSystem actorSystem, final ProxyNetconfDeviceDataBroker delegate) { + this.delegate = delegate; + this.actorSystem = actorSystem; + } + + @Override + public void put (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){ + delegate.put(store, new NormalizedNodeMessage(path, data)); + } + + @Override + public void merge (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){ + delegate.merge(store, new NormalizedNodeMessage(path, data)); + } + + @Override + public boolean cancel () { + return delegate.cancel(); + } + + @Override + public void delete (final LogicalDatastoreType store, final YangInstanceIdentifier path){ + delegate.delete(store, path); + } + + @Override + public CheckedFuture submit() { + final Future submit = delegate.submit(); + final SettableFuture settableFuture = SettableFuture.create(); + final CheckedFuture checkedFuture = Futures.makeChecked(settableFuture, new Function() { + @Nullable + @Override + public TransactionCommitFailedException apply(Exception input) { + return new TransactionCommitFailedException("Transaction commit failed", input); + } + }); + submit.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable throwable, Void aVoid) throws Throwable { + if (throwable == null) { + settableFuture.set(aVoid); + } else { + settableFuture.setException(throwable); + } + } + }, actorSystem.dispatcher()); + return checkedFuture; + } + + @Override + public ListenableFuture> commit () { + final Future> commit = delegate.commit(); + final SettableFuture> settableFuture = SettableFuture.create(); + commit.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable throwable, RpcResult transactionStatusRpcResult) throws Throwable { + if (throwable == null) { + settableFuture.set(transactionStatusRpcResult); + } else { + settableFuture.setException(throwable); + } + } + }, actorSystem.dispatcher()); + return settableFuture; + } + + @Override + public Object getIdentifier () { + return this; + } +} diff --git a/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java b/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java index 392838c4b8..544cb87d21 100644 --- a/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java +++ b/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java @@ -45,6 +45,9 @@ import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory; import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory; import org.opendaylight.netconf.topology.example.ExampleNodeManagerCallback; @@ -486,6 +489,31 @@ public class ActorTest { public void onReceive(Object o, ActorRef actorRef) { } + + @Override + public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) { + + } + + @Override + public void onDeviceDisconnected() { + + } + + @Override + public void onDeviceFailed(Throwable throwable) { + + } + + @Override + public void onNotification(DOMNotification domNotification) { + + } + + @Override + public void close() { + + } } public static class TestingTopologyManagerCallback implements TopologyManagerCallback { diff --git a/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/TestingTopologyDispatcher.java b/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/TestingTopologyDispatcher.java index 7931bb8c39..28a1f97122 100644 --- a/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/TestingTopologyDispatcher.java +++ b/opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/TestingTopologyDispatcher.java @@ -8,6 +8,8 @@ package org.opendaylight.netconf.topology; +import akka.actor.ActorContext; +import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -108,9 +110,13 @@ public class TestingTopologyDispatcher implements NetconfTopology{ } @Override - public void registerMountPoint(NodeId nodeId) { + public void registerMountPoint(ActorContext context, NodeId nodeId) { LOG.debug("Registering mount point for node {}", nodeId.getValue()); + } + @Override + public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) { + LOG.debug("Registering mount point for node {}", nodeId.getValue()); } @Override