import akka.actor.TypedActor.Receiver;
import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
/**
* Node manager that handles communication between node managers and delegates calls to the customizable NodeManagerCallback
*/
@Beta
-public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener {
+public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener, RemoteDeviceHandler<NetconfSessionPreferences> {
}
import akka.actor.ActorSystem;
import akka.actor.TypedActor.Receiver;
import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
/**
* Customizable layer that handles communication with your application.
*/
@Beta
-public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver {
+public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver, RemoteDeviceHandler<NetconfSessionPreferences> {
interface NodeManagerCallbackFactory<M> {
NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem);
*/
void unregisterRoleCandidate();
+ /**
+ *
+ * @return True/False based on if this candidate is already registered into ownership service
+ */
+ boolean isCandidateRegistered();
+
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class ExampleNodeManagerCallback implements NodeManagerCallback {
public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
}
+
+ @Override
+ public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onNotification(DOMNotification domNotification) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
}
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NodeManager;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
private final String topologyId;
private final ActorSystem actorSystem;
- private boolean isMaster;
+ private boolean isMaster = false;
private NodeManagerCallback delegate;
private BaseNodeManager(final String nodeId,
@Override
public void onReceive(Object o, ActorRef actorRef) {
-
+ delegate.onReceive(o, actorRef);
}
@Override
return null;
}
+ @Override
+ public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+ delegate.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc);
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+ delegate.onDeviceDisconnected();
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+ delegate.onDeviceFailed(throwable);
+ }
+
+ @Override
+ public void onNotification(DOMNotification domNotification) {
+ delegate.onNotification(domNotification);
+ }
+
+ @Override
+ public void close() {
+ // NOOP
+ }
+
/**
* Builder of BaseNodeManager instances that are proxied as TypedActors
*/
private final EntityOwnershipService entityOwnershipService;
private final String entityType;
private final String entityName;
+ private final Entity entity;
private NodeListener ownershipCandidate;
private EntityOwnershipCandidateRegistration candidateRegistration = null;
this.entityOwnershipService = entityOwnershipService;
this.entityType = entityType + "/" + entityName;
this.entityName = entityName;
+ this.entity = new Entity(this.entityType, entityName);
}
@Override
if (candidateRegistration != null) {
unregisterRoleCandidate();
}
- candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+ candidateRegistration = entityOwnershipService.registerCandidate(entity);
ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
} catch (CandidateAlreadyRegisteredException e) {
LOG.error("Candidate already registered for election", e);
ownershipListenerRegistration = null;
}
+ @Override
+ public boolean isCandidateRegistered() {
+ return entityOwnershipService.isCandidateRegistered(entity);
+ }
+
@Override
public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
LOG.debug("Role was changed {}", roleChangeDTO);
}
+ @Override
+ public boolean isCandidateRegistered() {
+ return false;
+ }
+
@Override
public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
private NodeListener ownershipCandidate;
private final String entityType;
// use topologyId as entityName
- private final String entityName;
+ private final Entity entity;
private EntityOwnershipCandidateRegistration candidateRegistration = null;
private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
this.dataBroker = dataBroker;
this.entityOwnershipService = entityOwnershipService;
this.entityType = entityType;
- this.entityName = entityName;
+ this.entity = new Entity(entityType, entityName);
datastoreListenerRegistration = null;
}
if (candidateRegistration != null) {
unregisterRoleCandidate();
}
- candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+ candidateRegistration = entityOwnershipService.registerCandidate(entity);
ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
} catch (CandidateAlreadyRegisteredException e) {
LOG.error("Candidate already registered for election", e);
ownershipListenerRegistration = null;
}
+ @Override
+ public boolean isCandidateRegistered() {
+ return entityOwnershipService.isCandidateRegistered(entity);
+ }
+
@Override
public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
if (roleChangeDTO.isOwner()) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterMountPoint implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public AnnounceMasterMountPoint() {}
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterMountPointDown implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public AnnounceMasterMountPointDown() {
+
+ }
+}
package org.opendaylight.netconf.topology;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
ListenableFuture<Void> disconnectNode(NodeId nodeId);
- void registerMountPoint(NodeId nodeId);
+ /**
+ * register master mount point
+ * @param context
+ * @param nodeId
+ */
+ void registerMountPoint(ActorContext context, NodeId nodeId);
+
+ /**
+ * register slave mountpoint with the provided ActorRef
+ * @param context
+ * @param nodeId
+ * @param masterRef
+ */
+ void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef);
void unregisterMountPoint(NodeId nodeId);
package org.opendaylight.netconf.topology.impl;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.TypedActor;
import akka.actor.TypedActorExtension;
@Override
protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
- return new TopologyMountPointFacade(id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
+ return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
}
@Override
- public void registerMountPoint(NodeId nodeId) {
- ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint();
+ public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
+ ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
}
@Override
- public void unregisterMountPoint(NodeId nodeId) {
+ public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
+ ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
+ }
+
+ @Override
+ public void unregisterMountPoint(final NodeId nodeId) {
Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
}
package org.opendaylight.netconf.topology.impl;
+import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.TypedActor;
import akka.actor.TypedProps;
import akka.cluster.Cluster;
+import akka.cluster.Member;
import akka.dispatch.OnComplete;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.NodeManager;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.netconf.topology.RoleChangeStrategy;
import org.opendaylight.netconf.topology.TopologyManager;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.util.BaseNodeManager;
import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
+public class NetconfNodeManagerCallback implements NodeManagerCallback{
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
private String nodeId;
private String topologyId;
private TopologyManager topologyManager;
+ private NodeManager nodeManager;
+ // cached context so that we can use it in callbacks from topology
+ private ActorContext cachedContext;
private Node currentConfig;
private Node currentOperationalNode;
private ConnectionStatusListenerRegistration registration = null;
+ private ActorRef masterDataBrokerRef = null;
+ private boolean connected = false;
+
public NetconfNodeManagerCallback(final String nodeId,
final String topologyId,
final ActorSystem actorSystem,
topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
}
}, actorSystem.dispatcher());
+
+ final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
+ nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
+ }
+ LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
+ nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
+ }
+ }, actorSystem.dispatcher());
}
@Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
@Nonnull final Node configNode) {
+ cachedContext = TypedActor.context();
this.nodeId = nodeId.getValue();
this.currentConfig = configNode;
// set initial state before anything happens
Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
- registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+ registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
}
@Override
.setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
.setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
.build())
- .build();
+ .build();
}
});
}
@Override
public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
- if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
- return;
- }
+ topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+
isMaster = roleChangeDTO.isOwner();
- //TODO instead of registering mount point, init remote schema repo when its done
if (isMaster) {
- // unregister old mountPoint if ownership changed, register a new one
- topologyDispatcher.registerMountPoint(new NodeId(nodeId));
+ LOG.warn("Gained ownership of node - registering master mount point");
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
} else {
- topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg
+ // after we receive the message we can go ahead and register the mount point
+ if (connected && masterDataBrokerRef != null) {
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+ } else {
+ LOG.debug("Mount point is ready, still waiting for master mount point");
+ }
}
}
public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
LOG.debug("onDeviceConnected received, registering role candidate");
- roleChangeStrategy.registerRoleCandidate(this);
+ connected = true;
+ roleChangeStrategy.registerRoleCandidate(nodeManager);
+ if (!isMaster && masterDataBrokerRef != null) {
+ // if we're not master but one is present already, we need to register mountpoint
+ LOG.warn("Device connected, master already present in topology, registering mount point");
+ topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
+ }
List<String> capabilityList = new ArrayList<>();
capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
.setUnavailableCapabilities(unavailableCapabilities)
.build())
.build();
- // TODO need to implement forwarding of this msg to master
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
@Override
public void onDeviceDisconnected() {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
- // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
LOG.debug("onDeviceDisconnected received, unregistering role candidate");
- topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+ connected = false;
+ if (isMaster) {
+ // announce that master mount point is going down
+ for (final Member member : clusterExtension.state().getMembers()) {
+ actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
+ }
+ // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
+ isMaster = false;
+ // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ }
roleChangeStrategy.unregisterRoleCandidate();
+
final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
.addAugmentation(NetconfNode.class,
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.build()).build();
- // TODO need to implement forwarding of this msg to master
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
// no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
LOG.debug("onDeviceFailed received");
+ connected = false;
String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
roleChangeStrategy.unregisterRoleCandidate();
topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
}
-
@Override
public void onNotification(DOMNotification domNotification) {
//NOOP
}
@Override
- public void onReceive(Object o, ActorRef actorRef) {
-
+ public void onReceive(Object message, ActorRef actorRef) {
+ LOG.warn("Netconf node callback received message {}", message);
+ if (message instanceof AnnounceMasterMountPoint) {
+ masterDataBrokerRef = actorRef;
+ // candidate gets registered when mount point is already prepared so we can go ahead a register it
+ if (roleChangeStrategy.isCandidateRegistered()) {
+ topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+ } else {
+ LOG.warn("Announce master mount point msg received but mount point is not ready yet");
+ }
+ } else if (message instanceof AnnounceMasterMountPointDown) {
+ LOG.warn("Master mountpoint went down");
+ masterDataBrokerRef = null;
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ }
}
}
\ No newline at end of file
package org.opendaylight.netconf.topology.impl;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import io.netty.util.concurrent.EventExecutor;
import java.util.Collection;
import javax.annotation.Nonnull;
}
@Override
- public void registerMountPoint(NodeId nodeId) {
+ public void registerMountPoint(ActorContext context, NodeId nodeId) {
+ throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
+ }
+
+ @Override
+ public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
}
createNodeManager(nodeId);
nodes.put(nodeId, naBaseNodeManager);
- // put initial state into datastore
- naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+ // only master should put initial state into datastore
+ if (isMaster) {
+ naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+ }
// trigger connect on this node
return naBaseNodeManager.onNodeCreated(nodeId, node);
@Override
public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
- // put initial state into datastore
- naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+ // only master should put initial state into datastore
+ if (isMaster) {
+ naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+ }
// Trigger onNodeUpdated only on this node
return nodes.get(nodeId).onNodeUpdated(nodeId, node);
throw new IllegalStateException(id.getValue() + " Transaction(" + txType + ") not committed correctly", t);
}
});
-
}
private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+public class NetconfDeviceMasterDataBroker implements ProxyNetconfDeviceDataBroker {
+
+ private final RemoteDeviceId id;
+
+ private final NetconfDeviceDataBroker delegateBroker;
+ private final ActorSystem actorSystem;
+
+ private DOMDataReadOnlyTransaction readTx;
+ private DOMDataWriteTransaction writeTx;
+
+ public NetconfDeviceMasterDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
+ final SchemaContext schemaContext, final DOMRpcService rpc,
+ final NetconfSessionPreferences netconfSessionPreferences, final long requestTimeoutMillis) {
+ this.id = id;
+ delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences, requestTimeoutMillis);
+ this.actorSystem = actorSystem;
+
+ // only ever need 1 readTx since it doesnt need to be closed
+ readTx = delegateBroker.newReadOnlyTransaction();
+ }
+
+ @Override
+ public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ return new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self());
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self()),
+ newWriteOnlyTransaction());
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ return new ProxyWriteOnlyTransaction(actorSystem, TypedActor.<NetconfDeviceMasterDataBroker>self());
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) {
+ throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
+ throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+ }
+
+ @Nonnull
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
+
+ final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
+ Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+ if (!result.isPresent()) {
+ promise.success(Optional.<NormalizedNodeMessage>absent());
+ } else {
+ promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
+
+ final DefaultPromise<Boolean> promise = new DefaultPromise<>();
+ Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ promise.success(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+ if (writeTx == null) {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ }
+ writeTx.put(store, data.getIdentifier(), data.getNode());
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+ if (writeTx == null) {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ }
+ writeTx.merge(store, data.getIdentifier(), data.getNode());
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ if (writeTx == null) {
+ writeTx = delegateBroker.newWriteOnlyTransaction();
+ }
+ writeTx.delete(store, path);
+ }
+
+ @Override
+ public boolean cancel() {
+ return writeTx.cancel();
+ }
+
+ @Override
+ public Future<Void> submit() {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
+ final DefaultPromise<Void> promise = new DefaultPromise<>();
+ Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ promise.success(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ @Deprecated
+ public Future<RpcResult<TransactionStatus>> commit() {
+ final ListenableFuture<RpcResult<TransactionStatus>> commitFuture = writeTx.commit();
+ final DefaultPromise<RpcResult<TransactionStatus>> promise = new DefaultPromise<>();
+ Futures.addCallback(commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+ @Override
+ public void onSuccess(RpcResult<TransactionStatus> result) {
+ promise.success(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorSystem;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class NetconfDeviceSlaveDataBroker implements DOMDataBroker{
+
+ private final RemoteDeviceId id;
+ private final ProxyNetconfDeviceDataBroker masterDataBroker;
+ private final ActorSystem actorSystem;
+
+ public NetconfDeviceSlaveDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker masterDataBroker) {
+ this.id = id;
+ this.masterDataBroker = masterDataBroker;
+ this.actorSystem = actorSystem;
+ }
+
+ @Override
+ public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ return new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker);
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker), new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker));
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ return new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker);
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) {
+ throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
+ throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+ }
+
+ @Nonnull
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return Collections.emptyMap();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.Future;
+
+public interface ProxyNetconfDeviceDataBroker extends DOMDataBroker{
+ Future<Optional<NormalizedNodeMessage>> read(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+ Future<Boolean> exists(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+ void put(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+ void merge(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+ void delete(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+ boolean cancel();
+
+ Future<Void> submit();
+
+ @Deprecated
+ Future<RpcResult<TransactionStatus>> commit();
+}
package org.opendaylight.netconf.topology.pipeline;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.japi.Creator;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(TopologyMountPointFacade.class);
+ private static final String MOUNT_POINT = "mountpoint";
+
+ private final String topologyId;
private final RemoteDeviceId id;
private final Broker domBroker;
private final BindingAwareBroker bindingBroker;
private DOMRpcService deviceRpc = null;
private final ClusteredNetconfDeviceMountInstanceProxy salProvider;
+ private ActorSystem actorSystem;
+ private DOMDataBroker deviceDataBroker = null;
+
private final ArrayList<RemoteDeviceHandler<NetconfSessionPreferences>> connectionStatusListeners = new ArrayList<>();
- public TopologyMountPointFacade(final RemoteDeviceId id,
+ public TopologyMountPointFacade(final String topologyId,
+ final RemoteDeviceId id,
final Broker domBroker,
final BindingAwareBroker bindingBroker,
long defaultRequestTimeoutMillis) {
-
+ this.topologyId = topologyId;
this.id = id;
this.domBroker = domBroker;
this.bindingBroker = bindingBroker;
salProvider.getMountInstance().publish(domNotification);
}
- public void registerMountPoint() {
+ public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context) {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+ this.actorSystem = actorSystem;
+ final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
- final DOMDataBroker netconfDeviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ LOG.warn("Creating master data broker for device {}", id);
+ deviceDataBroker = TypedActor.get(context).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, new Creator<NetconfDeviceMasterDataBroker>() {
+ @Override
+ public NetconfDeviceMasterDataBroker create() throws Exception {
+ return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ }
+ }), MOUNT_POINT);
+ LOG.warn("Master data broker registered on path {}", TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker).path());
+ salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+ final Cluster cluster = Cluster.get(actorSystem);
+ final Iterable<Member> members = cluster.state().getMembers();
+ final ActorRef deviceDataBrokerRef = TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker);
+ for (final Member member : members) {
+ if (!member.address().equals(cluster.selfAddress())) {
+ final String path = member.address() + "/user/" + topologyId + "/" + id.getName();
+ actorSystem.actorSelection(path).tell(new AnnounceMasterMountPoint(), deviceDataBrokerRef);
+ }
+ }
+ }
+
+ public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context, final ActorRef masterRef) {
+ Preconditions.checkNotNull(id);
+ Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
+ Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+ this.actorSystem = actorSystem;
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
- salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker, deviceRpc, notificationService);
+ LOG.warn("Creating a proxy for master data broker");
+ final ProxyNetconfDeviceDataBroker masterDataBroker = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, NetconfDeviceMasterDataBroker.class), masterRef);
+ LOG.warn("Creating slave data broker for device {}", id);
+ final DOMDataBroker deviceDataBroker = new NetconfDeviceSlaveDataBroker(actorSystem, id, masterDataBroker);
+ salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
}
public void unregisterMountPoint() {
salProvider.getMountInstance().onTopologyDeviceDisconnected();
+ if (deviceDataBroker != null) {
+ LOG.warn("Stopping master data broker for device {}", id.getName());
+ TypedActor.get(actorSystem).stop(deviceDataBroker);
+ deviceDataBroker = null;
+ }
}
public ConnectionStatusListenerRegistration registerConnectionStatusListener(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+public interface NetconfDeviceDataBrokerProxy {
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class ProxyReadOnlyTransaction implements DOMDataReadOnlyTransaction{
+
+ private final RemoteDeviceId id;
+ private final ProxyNetconfDeviceDataBroker delegate;
+ private final ActorSystem actorSystem;
+
+ public ProxyReadOnlyTransaction(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker delegate) {
+ this.id = id;
+ this.delegate = delegate;
+ this.actorSystem = actorSystem;
+ }
+
+ @Override
+ public void close() {
+ //NOOP
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ final Future<Optional<NormalizedNodeMessage>> future = delegate.read(store, path);
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+ @Nullable
+ @Override
+ public ReadFailedException apply(Exception cause) {
+ return new ReadFailedException("Read from transaction failed", cause);
+ }
+ });
+ future.onComplete(new OnComplete<Optional<NormalizedNodeMessage>>() {
+ @Override
+ public void onComplete(Throwable throwable, Optional<NormalizedNodeMessage> normalizedNodeMessage) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(normalizedNodeMessage.transform(new Function<NormalizedNodeMessage, NormalizedNode<?, ?>>() {
+ @Nullable
+ @Override
+ public NormalizedNode<?, ?> apply(NormalizedNodeMessage input) {
+ return input.getNode();
+ }
+ }));
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return checkedFuture;
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ final Future<Boolean> existsFuture = delegate.exists(store, path);
+ final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ final CheckedFuture<Boolean, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+ @Nullable
+ @Override
+ public ReadFailedException apply(Exception cause) {
+ return new ReadFailedException("Read from transaction failed", cause);
+ }
+ });
+ existsFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable throwable, Boolean result) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(result);
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return checkedFuture;
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class ProxyWriteOnlyTransaction implements DOMDataWriteTransaction {
+
+ private final ProxyNetconfDeviceDataBroker delegate;
+ private final ActorSystem actorSystem;
+
+ public ProxyWriteOnlyTransaction(ActorSystem actorSystem, final ProxyNetconfDeviceDataBroker delegate) {
+ this.delegate = delegate;
+ this.actorSystem = actorSystem;
+ }
+
+ @Override
+ public void put (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){
+ delegate.put(store, new NormalizedNodeMessage(path, data));
+ }
+
+ @Override
+ public void merge (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){
+ delegate.merge(store, new NormalizedNodeMessage(path, data));
+ }
+
+ @Override
+ public boolean cancel () {
+ return delegate.cancel();
+ }
+
+ @Override
+ public void delete (final LogicalDatastoreType store, final YangInstanceIdentifier path){
+ delegate.delete(store, path);
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ final Future<Void> submit = delegate.submit();
+ final SettableFuture<Void> settableFuture = SettableFuture.create();
+ final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+ @Nullable
+ @Override
+ public TransactionCommitFailedException apply(Exception input) {
+ return new TransactionCommitFailedException("Transaction commit failed", input);
+ }
+ });
+ submit.onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable throwable, Void aVoid) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(aVoid);
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return checkedFuture;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit () {
+ final Future<RpcResult<TransactionStatus>> commit = delegate.commit();
+ final SettableFuture<RpcResult<TransactionStatus>> settableFuture = SettableFuture.create();
+ commit.onComplete(new OnComplete<RpcResult<TransactionStatus>>() {
+ @Override
+ public void onComplete(Throwable throwable, RpcResult<TransactionStatus> transactionStatusRpcResult) throws Throwable {
+ if (throwable == null) {
+ settableFuture.set(transactionStatusRpcResult);
+ } else {
+ settableFuture.setException(throwable);
+ }
+ }
+ }, actorSystem.dispatcher());
+ return settableFuture;
+ }
+
+ @Override
+ public Object getIdentifier () {
+ return this;
+ }
+}
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
import org.opendaylight.netconf.topology.example.ExampleNodeManagerCallback;
public void onReceive(Object o, ActorRef actorRef) {
}
+
+ @Override
+ public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onNotification(DOMNotification domNotification) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
}
public static class TestingTopologyManagerCallback implements TopologyManagerCallback {
package org.opendaylight.netconf.topology;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
}
@Override
- public void registerMountPoint(NodeId nodeId) {
+ public void registerMountPoint(ActorContext context, NodeId nodeId) {
LOG.debug("Registering mount point for node {}", nodeId.getValue());
+ }
+ @Override
+ public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
+ LOG.debug("Registering mount point for node {}", nodeId.getValue());
}
@Override