Abstract topology base implementation 51/28451/9
authorTomas Cere <tcere@cisco.com>
Fri, 27 Nov 2015 13:31:13 +0000 (14:31 +0100)
committerTomas Cere <tcere@cisco.com>
Fri, 27 Nov 2015 14:51:18 +0000 (15:51 +0100)
Change-Id: I52b5d2f309789402dc72319d59fb75a3ac4c8df1
Signed-off-by: Tomas Cere <tcere@cisco.com>
12 files changed:
opendaylight/netconf/abstract-topology/pom.xml
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleSingleStateAggregator.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleTopology.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleTopologyManagerCallback.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/LoggingSalNodeWriter.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/SalNodeWriter.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java [new file with mode: 0644]

index 4ae07be4193bb6b17f28c7ecd1a31d215512e88f..703ee4e9bfa5fb349db5a27050151563bd2f8cc9 100644 (file)
         </dependency>
         <dependency>
             <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-cluster_2.11</artifactId>
-            <version>2.3.14</version>
+            <artifactId>akka-cluster_${scala.major.version}</artifactId>
+            <version>${akka.version}</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-osgi_2.11</artifactId>
-            <version>2.3.14</version>
+            <artifactId>akka-osgi_${scala.major.version}</artifactId>
+            <version>${akka.version}</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java
new file mode 100644 (file)
index 0000000..05b69fc
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+
+public class ExampleNodeManagerCallback implements NodeManagerCallback {
+
+    public ExampleNodeManagerCallback() {
+    }
+
+    @Nonnull
+    @Override public Node getInitialState(@Nonnull final NodeId nodeId,
+                                          @Nonnull final Node configNode) {
+        return new NodeBuilder().addAugmentation(NetconfNode.class,
+                new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.Connecting).build()).build();
+    }
+
+    @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
+                                                  @Nonnull final Node configNode) {
+        return new NodeBuilder().addAugmentation(NetconfNode.class,
+                new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.UnableToConnect).build()).build();
+    }
+
+    @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
+                                                                   @Nonnull final Node configNode) {
+        return Futures.immediateFuture(new NodeBuilder().addAugmentation(NetconfNode.class,
+                new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.Connected).build()).build());
+    }
+
+    @Nonnull @Override public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
+                                                                   @Nonnull final Node configNode) {
+        // update magic
+        return Futures.immediateFuture(new NodeBuilder().addAugmentation(NetconfNode.class,
+                new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.Connected).build()).build());
+    }
+
+    @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
+        return Futures.immediateFuture(null);
+    }
+
+    @Nonnull
+    @Override
+    public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+        return null;
+    }
+
+    @Override
+    public void onReceive(Object message, ActorRef sender) {
+
+    }
+
+    @Override
+    public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleSingleStateAggregator.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleSingleStateAggregator.java
new file mode 100644 (file)
index 0000000..6d27c26
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import org.opendaylight.netconf.topology.StateAggregator;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+
+/**
+ * Aggregator implementation expecting just a single state
+ */
+public final class ExampleSingleStateAggregator implements StateAggregator {
+
+    @Override public ListenableFuture<Node> combineCreateAttempts(final List<ListenableFuture<Node>> stateFutures) {
+        return getSingleFuture(stateFutures);
+    }
+
+    private <T> ListenableFuture<T> getSingleFuture(final List<ListenableFuture<T>> stateFutures) {
+        Preconditions.checkArgument(stateFutures.size() == 1, "Recieved multiple results, Single result is enforced here");
+        return stateFutures.get(0);
+    }
+
+    @Override public ListenableFuture<Node> combineUpdateAttempts(final List<ListenableFuture<Node>> stateFutures) {
+        return getSingleFuture(stateFutures);
+    }
+
+    @Override public ListenableFuture<Void> combineDeleteAttempts(final List<ListenableFuture<Void>> stateFutures) {
+        return getSingleFuture(stateFutures);
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleTopology.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleTopology.java
new file mode 100644 (file)
index 0000000..c31e18e
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
+import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+
+public class ExampleTopology {
+
+    private static final String TOPOLOGY_NETCONF = "topology-netconf";
+    private BaseTopologyManager netconfNodeBaseTopologyManager;
+    private final DataBroker dataBroker;
+
+    public ExampleTopology(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker) {
+        final ActorSystem actorSystem = ActorSystem.create("netconf-cluster");
+
+        this.dataBroker = dataBroker;
+
+        final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
+            @Override
+            public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
+                return new ExampleNodeManagerCallback();
+            }
+        };
+
+        final TopologyManagerCallbackFactory topologyManagerCallbackFactory = new TopologyManagerCallbackFactory() {
+            @Override
+            public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
+                return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory);
+            }
+        };
+
+//        netconfNodeBaseTopologyManager = new BaseTopologyManager<>(dataBroker, TOPOLOGY_NETCONF,
+//                topologyManagerCallbackFactory,
+//                new SingleStateAggregator(),
+//                new SalNodeWriter(dataBroker, TOPOLOGY_NETCONF),
+//                new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "netconf", TOPOLOGY_NETCONF));
+
+    }
+
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleTopologyManagerCallback.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleTopologyManagerCallback.java
new file mode 100644 (file)
index 0000000..76e55ac
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.util.BaseNodeManager.BaseNodeManagerBuilder;
+import org.opendaylight.netconf.topology.util.NodeWriter;
+import org.opendaylight.netconf.topology.util.NoopRoleChangeStrategy;
+import org.opendaylight.netconf.topology.util.SalNodeWriter;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExampleTopologyManagerCallback implements TopologyManagerCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ExampleTopologyManagerCallback.class);
+
+    private final DataBroker dataBroker;
+    private final ActorSystem actorSystem;
+    private boolean isMaster;
+
+    private final String topologyId;
+    private final NodeWriter naSalNodeWriter;
+    private final Map<NodeId, NodeManager> nodes = new HashMap<>();
+    private final NodeManagerCallbackFactory nodeHandlerFactory;
+
+    public ExampleTopologyManagerCallback(final ActorSystem actorSystem,
+                                          final DataBroker dataBroker,
+                                          final String topologyId,
+                                          final NodeManagerCallbackFactory nodeHandlerFactory) {
+        this(actorSystem, dataBroker, topologyId, nodeHandlerFactory, new SalNodeWriter(dataBroker, topologyId));
+    }
+
+    public ExampleTopologyManagerCallback(final ActorSystem actorSystem,
+                                          final DataBroker dataBroker,
+                                          final String topologyId,
+                                          final NodeManagerCallbackFactory nodeHandlerFactory,
+                                          final NodeWriter naSalNodeWriter) {
+        this(actorSystem, dataBroker, topologyId, nodeHandlerFactory, naSalNodeWriter, false);
+
+    }
+
+    public ExampleTopologyManagerCallback(final ActorSystem actorSystem,
+                                          final DataBroker dataBroker,
+                                          final String topologyId,
+                                          final NodeManagerCallbackFactory nodeHandlerFactory,
+                                          final NodeWriter naSalNodeWriter,
+                                          boolean isMaster) {
+        this.dataBroker = dataBroker;
+        this.actorSystem = actorSystem;
+        this.topologyId = topologyId;
+        this.nodeHandlerFactory = nodeHandlerFactory;
+        this.naSalNodeWriter = naSalNodeWriter;
+
+        this.isMaster = isMaster;
+    }
+
+    @Override
+    public ListenableFuture<Node> onNodeCreated(NodeId nodeId, Node node) {
+        // Init node admin and a writer for it
+
+        // TODO let end user code notify the baseNodeManager about state changes and handle them here on topology level
+        final NodeManager naBaseNodeManager =
+                createNodeManager(nodeId);
+
+        nodes.put(nodeId, naBaseNodeManager);
+
+        // Set initial state ? in every peer or just master ? TODO
+        if (isMaster) {
+            naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+        }
+
+        // trigger connect on this node
+        return naBaseNodeManager.onNodeCreated(nodeId, node);
+    }
+
+    @Override
+    public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
+        // Set initial state
+        naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+
+        // Trigger onNodeUpdated only on this node
+        return nodes.get(nodeId).onNodeUpdated(nodeId, node);
+    }
+
+    @Override
+    public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
+        // Trigger delete only on this node
+        final ListenableFuture<Void> future = nodes.get(nodeId).onNodeDeleted(nodeId);
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                // remove proxy from node list and stop the actor
+                final NodeManager remove = nodes.remove(nodeId);
+                TypedActor.get(actorSystem).stop(remove);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                // NOOP will be handled on higher level
+            }
+        });
+        return future;
+    }
+
+    @Nonnull
+    @Override
+    public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+        return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
+    }
+
+    @Override
+    public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+        isMaster = roleChangeDTO.isOwner();
+        // our post-election logic
+    }
+
+    private NodeManager createNodeManager(NodeId nodeId) {
+        return new BaseNodeManagerBuilder().setNodeId(nodeId.getValue())
+                .setActorContext(TypedActor.context())
+                .setDelegateFactory(nodeHandlerFactory)
+                .setRoleChangeStrategy(new NoopRoleChangeStrategy())
+                .setTopologyId(topologyId)
+                .build();
+    }
+
+    @Override
+    public void onReceive(Object o, ActorRef actorRef) {
+
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/LoggingSalNodeWriter.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/LoggingSalNodeWriter.java
new file mode 100644 (file)
index 0000000..e4ead18
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import javax.annotation.Nonnull;
+import org.opendaylight.netconf.topology.util.NodeWriter;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggingSalNodeWriter implements NodeWriter{
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingSalNodeWriter.class);
+
+    private final ArrayList<NodeWriter> delegates;
+
+    public LoggingSalNodeWriter(final NodeWriter... delegates) {
+        this.delegates = new ArrayList<>(Arrays.asList(delegates));
+    }
+
+    @Override
+    public void init(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
+        LOG.warn("Init recieved");
+        LOG.warn("NodeId: {}", id.getValue());
+        LOG.warn("Node: {}", operationalDataNode);
+        for (final NodeWriter delegate : delegates) {
+            delegate.init(id, operationalDataNode);
+        }
+    }
+
+    @Override
+    public void update(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
+        LOG.warn("Update recieved");
+        LOG.warn("NodeId: {}", id.getValue());
+        LOG.warn("Node: {}", operationalDataNode);
+        for (final NodeWriter delegate : delegates) {
+            delegate.update(id, operationalDataNode);
+        }
+    }
+
+    @Override
+    public void delete(@Nonnull NodeId id) {
+        LOG.warn("Delete recieved");
+        LOG.warn("NodeId: {}", id.getValue());
+        for (final NodeWriter delegate : delegates) {
+            delegate.delete(id);
+        }
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java
new file mode 100644 (file)
index 0000000..0315150
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+public final class BaseNodeManager implements NodeManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseNodeManager.class);
+
+    private final String nodeId;
+    private final String topologyId;
+    private final ActorSystem actorSystem;
+
+    private boolean isMaster;
+    private NodeManagerCallback delegate;
+
+    private BaseNodeManager(final String nodeId,
+                            final String topologyId,
+                            final ActorSystem actorSystem,
+                            final NodeManagerCallbackFactory delegateFactory,
+                            final RoleChangeStrategy roleChangeStrategy) {
+        LOG.debug("Creating BaseNodeManager, id: {}, {}", topologyId, nodeId );
+        this.nodeId = nodeId;
+        this.topologyId = topologyId;
+        this.actorSystem = actorSystem;
+        this.delegate = delegateFactory.create(nodeId, topologyId, actorSystem);
+        // if we want to override the place election happens,
+        // we need to override this with noop election strategy and implement election in callback
+        // cannot leak "this" here! have to use TypedActor.self()
+        roleChangeStrategy.registerRoleCandidate((NodeManager) TypedActor.self());
+    }
+
+    @Nonnull @Override public Node getInitialState(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+        LOG.trace("Retrieving Node {} initial state", nodeId);
+        return delegate.getInitialState(nodeId, configNode);
+    }
+
+    @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+        LOG.trace("Retrieving Node {} failed state", nodeId);
+        return delegate.getFailedState(nodeId, configNode);
+    }
+
+    @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+        LOG.debug("Creating Node {}, with configuration: {}", nodeId.getValue(), configNode);
+        return delegate.onNodeCreated(nodeId, configNode);
+    }
+
+    @Nonnull @Override public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+        LOG.debug("Updating Node {}, with configuration: {}", nodeId.getValue(), configNode);
+        return delegate.onNodeUpdated(nodeId, configNode);
+    }
+
+    @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
+        LOG.debug("Deleting Node {}", nodeId.getValue());
+        return delegate.onNodeDeleted(nodeId);
+    }
+
+    @Nonnull
+    @Override
+    public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+        LOG.debug("Getting current status for node: {}", nodeId.getValue());
+        return delegate.getCurrentStatusForNode(nodeId);
+    }
+
+    @Override
+    public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+        LOG.debug("Node {} role has changed from: {} to {}", nodeId,
+                (roleChangeDTO.wasOwner() ? "master" : "slave"),
+                (roleChangeDTO.isOwner() ? "master" : "slave"));
+
+        isMaster = roleChangeDTO.isOwner();
+        delegate.onRoleChanged(roleChangeDTO);
+    }
+
+    @Override
+    public void onReceive(Object o, ActorRef actorRef) {
+
+    }
+
+    @Override
+    public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
+        return null;
+    }
+
+    @Override
+    public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
+        return null;
+    }
+
+    @Override
+    public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
+        return null;
+    }
+
+    @Override
+    public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
+        return null;
+    }
+
+    /**
+     * Builder of BaseNodeManager instances that are proxied as TypedActors
+     */
+    public static class BaseNodeManagerBuilder {
+        private String nodeId;
+        private String topologyId;
+        private NodeManagerCallbackFactory delegateFactory;
+        private RoleChangeStrategy roleChangeStrategy;
+        private ActorContext actorContext;
+
+
+        public BaseNodeManagerBuilder setNodeId(final String nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+
+        public BaseNodeManagerBuilder setTopologyId(final String topologyId) {
+            this.topologyId = topologyId;
+            return this;
+        }
+
+        public BaseNodeManagerBuilder setDelegateFactory(final NodeManagerCallbackFactory delegateFactory) {
+            this.delegateFactory = delegateFactory;
+            return this;
+        }
+
+        public BaseNodeManagerBuilder setRoleChangeStrategy(final RoleChangeStrategy roleChangeStrategy) {
+            this.roleChangeStrategy = roleChangeStrategy;
+            return this;
+        }
+
+        public BaseNodeManagerBuilder setActorContext(final ActorContext actorContext) {
+            this.actorContext = actorContext;
+            return this;
+        }
+
+        public NodeManager build() {
+            Preconditions.checkNotNull(nodeId);
+            Preconditions.checkNotNull(topologyId);
+            Preconditions.checkNotNull(delegateFactory);
+            Preconditions.checkNotNull(roleChangeStrategy);
+            Preconditions.checkNotNull(actorContext);
+            LOG.debug("Creating typed actor with id: {}", nodeId);
+
+            return TypedActor.get(actorContext).typedActorOf(new TypedProps<>(NodeManager.class, new Creator<BaseNodeManager>() {
+                @Override
+                public BaseNodeManager create() throws Exception {
+                    return new BaseNodeManager(nodeId, topologyId, actorContext.system(), delegateFactory, roleChangeStrategy);
+                }
+            }), nodeId);
+        }
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseTopologyManager.java
new file mode 100644 (file)
index 0000000..ca55bb0
--- /dev/null
@@ -0,0 +1,565 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.TypedActor;
+import akka.actor.TypedActorExtension;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberEvent;
+import akka.cluster.ClusterEvent.MemberExited;
+import akka.cluster.ClusterEvent.MemberRemoved;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.ClusterEvent.ReachableMember;
+import akka.cluster.ClusterEvent.UnreachableMember;
+import akka.cluster.Member;
+import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.netconf.topology.StateAggregator;
+import org.opendaylight.netconf.topology.TopologyManager;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
+import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
+import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+public final class BaseTopologyManager
+        implements TopologyManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
+
+    private final ActorSystem system;
+    private final TypedActorExtension typedExtension;
+    private final Cluster clusterExtension;
+
+    private final BindingNormalizedNodeCodecRegistry codecRegistry;
+
+    private static final String PATH = "/user/";
+
+    private final DataBroker dataBroker;
+    private final RoleChangeStrategy roleChangeStrategy;
+    private final StateAggregator aggregator;
+
+    private final NodeWriter naSalNodeWriter;
+    private final String topologyId;
+    private final TopologyManagerCallback delegateTopologyHandler;
+
+    private final Map<NodeId, NodeManager> nodes = new HashMap<>();
+    private final Map<Address, TopologyManager> peers = new HashMap<>();
+    private TopologyManager masterPeer = null;
+    private final int id = new Random().nextInt();
+
+    private boolean isMaster;
+
+    public BaseTopologyManager(final ActorSystem system,
+                               final BindingNormalizedNodeCodecRegistry codecRegistry,
+                               final DataBroker dataBroker,
+                               final String topologyId,
+                               final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
+                               final StateAggregator aggregator,
+                               final NodeWriter naSalNodeWriter,
+                               final RoleChangeStrategy roleChangeStrategy) {
+        this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
+    }
+
+    public BaseTopologyManager(final ActorSystem system,
+                               final BindingNormalizedNodeCodecRegistry codecRegistry,
+                               final DataBroker dataBroker,
+                               final String topologyId,
+                               final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
+                               final StateAggregator aggregator,
+                               final NodeWriter naSalNodeWriter,
+                               final RoleChangeStrategy roleChangeStrategy,
+                               final boolean isMaster) {
+
+        this.system = system;
+        this.typedExtension = TypedActor.get(system);
+        this.clusterExtension = Cluster.get(system);
+        this.dataBroker = dataBroker;
+        this.topologyId = topologyId;
+        this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
+        this.aggregator = aggregator;
+        this.naSalNodeWriter = naSalNodeWriter;
+        this.roleChangeStrategy = roleChangeStrategy;
+        this.codecRegistry = codecRegistry;
+
+        // election has not yet happened
+        this.isMaster = isMaster;
+
+        LOG.debug("Base manager started ", +id);
+    }
+
+    @Override
+    public void preStart() {
+        LOG.debug("preStart called");
+        // TODO change to enum, master/slave active/standby
+        roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
+        LOG.debug("candidate registered");
+        clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
+    }
+
+    @Override
+    public void postStop() {
+        LOG.debug("postStop called");
+        clusterExtension.leave(clusterExtension.selfAddress());
+        clusterExtension.unsubscribe(TypedActor.context().self());
+    }
+
+    @Override
+    public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
+        LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
+
+        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
+
+        if (isMaster) {
+
+            futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
+            // only master should call connect on peers and aggregate futures
+            for (TopologyManager topologyManager : peers.values()) {
+                // convert binding into NormalizedNode for transfer
+                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
+
+                LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
+                LOG.debug("Value {}", normalizedNodeEntry.getValue());
+
+                // add a future into our futures that gets its completion status from the converted scala future
+                final SettableFuture<Node> settableFuture = SettableFuture.create();
+                futures.add(settableFuture);
+                final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
+                scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
+                    @Override
+                    public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
+                        if (failure != null) {
+                            settableFuture.setException(failure);
+                            return;
+                        }
+                        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+                                codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
+                        final Node value = (Node) fromNormalizedNode.getValue();
+
+                        settableFuture.set(value);
+                    }
+                }, TypedActor.context().dispatcher());
+            }
+
+            final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
+            Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
+                @Override
+                public void onSuccess(final Node result) {
+                    LOG.debug("Futures aggregated succesfully");
+                    naSalNodeWriter.init(nodeId, result);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    // If the combined connection attempt failed, set the node to connection failed
+                    LOG.debug("Futures aggregation failed");
+                    naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
+                    // FIXME disconnect those which succeeded
+                    // just issue a delete on delegateTopologyHandler that gets handled on lower level
+                }
+            }, TypedActor.context().dispatcher());
+
+            //combine peer futures
+            return aggregatedFuture;
+        }
+
+        // trigger create on this slave
+        return delegateTopologyHandler.onNodeCreated(nodeId, node);
+    }
+
+    @Override
+    public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
+        LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
+
+        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
+
+        // Master needs to trigger onNodeUpdated on peers and combine results
+        if (isMaster) {
+            futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
+            for (TopologyManager topologyManager : peers.values()) {
+                // convert binding into NormalizedNode for transfer
+                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
+
+                // add a future into our futures that gets its completion status from the converted scala future
+                final SettableFuture<Node> settableFuture = SettableFuture.create();
+                futures.add(settableFuture);
+                final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
+                scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
+                    @Override
+                    public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
+                        if (failure != null) {
+                            settableFuture.setException(failure);
+                            return;
+                        }
+                        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+                                codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
+                        final Node value = (Node) fromNormalizedNode.getValue();
+
+                        settableFuture.set(value);
+                    }
+                }, TypedActor.context().dispatcher());
+            }
+
+            final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
+            Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
+                @Override
+                public void onSuccess(final Node result) {
+                    // FIXME make this (writing state data for nodes) optional and customizable
+                    // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
+                    naSalNodeWriter.update(nodeId, result);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    // If the combined connection attempt failed, set the node to connection failed
+                    naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
+                    // FIXME disconnect those which succeeded
+                    // just issue a delete on delegateTopologyHandler that gets handled on lower level
+                }
+            });
+
+            //combine peer futures
+            return aggregatedFuture;
+        }
+
+        // Trigger update on this slave
+        return delegateTopologyHandler.onNodeUpdated(nodeId, node);
+    }
+
+    private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
+        final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+        return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
+    }
+
+    @Override
+    public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
+        final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+
+        // Master needs to trigger delete on peers and combine results
+        if (isMaster) {
+            futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
+            for (TopologyManager topologyManager : peers.values()) {
+                // add a future into our futures that gets its completion status from the converted scala future
+                final SettableFuture<Void> settableFuture = SettableFuture.create();
+                futures.add(settableFuture);
+                final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
+                scalaFuture.onComplete(new OnComplete<Void>() {
+                    @Override
+                    public void onComplete(Throwable failure, Void success) throws Throwable {
+                        if (failure != null) {
+                            settableFuture.setException(failure);
+                            return;
+                        }
+
+                        settableFuture.set(success);
+                    }
+                }, TypedActor.context().dispatcher());
+            }
+
+            final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
+            Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void result) {
+                    naSalNodeWriter.delete(nodeId);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    // FIXME unable to disconnect all the connections, what do we do now ?
+                }
+            });
+
+            return aggregatedFuture;
+        }
+
+        // Trigger delete
+        return delegateTopologyHandler.onNodeDeleted(nodeId);
+    }
+
+    @Nonnull
+    @Override
+    public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
+        return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
+    }
+
+    @Override
+    public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
+        isMaster = roleChangeDTO.isOwner();
+        delegateTopologyHandler.onRoleChanged(roleChangeDTO);
+        if (isMaster) {
+            LOG.debug("Node {} is master now", clusterExtension.selfAddress());
+            clusterExtension.join(clusterExtension.selfAddress());
+        }
+    }
+
+    @Override
+    public Future<Boolean> isMaster() {
+        return new DefaultPromise<Boolean>().success(isMaster).future();
+    }
+
+    @Override
+    public void notifyNodeStatusChange(final NodeId nodeId) {
+        LOG.debug("Connection status has changed on node {}", nodeId.getValue());
+        if (isMaster) {
+            // grab status from all peers and aggregate
+            final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
+            futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
+            // only master should call connect on peers and aggregate futures
+            for (TopologyManager topologyManager : peers.values()) {
+                // add a future into our futures that gets its completion status from the converted scala future
+                final SettableFuture<Node> settableFuture = SettableFuture.create();
+                futures.add(settableFuture);
+                final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
+                scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
+                    @Override
+                    public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
+                        if (failure != null) {
+                            settableFuture.setException(failure);
+                            return;
+                        }
+                        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+                                codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
+                        final Node value = (Node) fromNormalizedNode.getValue();
+
+                        settableFuture.set(value);
+                    }
+                }, TypedActor.context().dispatcher());
+            }
+
+            final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
+            Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
+                @Override
+                public void onSuccess(final Node result) {
+                    LOG.debug("Futures aggregated succesfully");
+                    naSalNodeWriter.update(nodeId, result);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    // If the combined connection attempt failed, set the node to connection failed
+                    LOG.debug("Futures aggregation failed");
+                    naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, null));
+                    // FIXME disconnect those which succeeded
+                    // just issue a delete on delegateTopologyHandler that gets handled on lower level
+                }
+            });
+            return;
+        }
+        LOG.debug("Not master, forwarding..");
+        for (final TopologyManager manager : peers.values()) {
+            // asynchronously find out which peer is master
+            final Future<Boolean> future = manager.isMaster();
+            future.onComplete(new OnComplete<Boolean>() {
+                @Override
+                public void onComplete(Throwable failure, Boolean success) throws Throwable {
+                    if (failure == null && success) {
+                        LOG.debug("Found master peer");
+                        // forward to master
+                        manager.notifyNodeStatusChange(nodeId);
+                        return;
+                    }
+                    if (failure != null) {
+                        LOG.debug("Retrieving master peer failed, {}", failure);
+                    }
+                }
+            }, TypedActor.context().dispatcher());
+        }
+    }
+
+    @Override
+    public boolean hasAllPeersUp() {
+        LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
+        LOG.warn(clusterExtension.state().toString());
+        LOG.warn(peers.toString());
+        return peers.size() == 2;
+    }
+
+    @Override
+    public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
+        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+                codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
+        final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
+        final Node value = (Node) fromNormalizedNode.getValue();
+
+        LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
+        final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
+        final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
+        Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
+            @Override
+            public void onSuccess(Node result) {
+                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
+                promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+
+        return promise.future();
+    }
+
+    @Override
+    public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
+        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+                codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
+        final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
+        final Node value = (Node) fromNormalizedNode.getValue();
+
+        LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
+
+        final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
+        final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
+        Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
+            @Override
+            public void onSuccess(Node result) {
+                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
+                promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+        return promise.future();
+    }
+
+    @Override
+    public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
+        LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
+
+        final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
+        final DefaultPromise<Void> promise = new DefaultPromise<>();
+        Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                promise.success(null);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+
+        return promise.future();
+    }
+
+    public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
+        LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
+
+        final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
+        final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
+        Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
+            @Override
+            public void onSuccess(Node result) {
+                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
+                promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+        return promise.future();
+    }
+
+    @Override
+    public void onReceive(final Object message, final ActorRef actorRef) {
+        LOG.debug("message received {}", message);
+        if (message instanceof MemberUp) {
+            final Member member = ((MemberUp) message).member();
+            LOG.info("Member is Up: {}", member);
+            if (member.address().equals(clusterExtension.selfAddress())) {
+                return;
+            }
+
+            final String path = member.address() + PATH + topologyId;
+            LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
+
+            clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
+        } else if (message instanceof MemberExited) {
+            // remove peer
+            final Member member = ((MemberExited) message).member();
+            LOG.info("Member exited cluster: {}", member);
+            peers.remove(member.address());
+        } else if (message instanceof MemberRemoved) {
+            // remove peer
+            final Member member = ((MemberRemoved) message).member();
+            LOG.info("Member was removed from cluster: {}", member);
+            peers.remove(member.address());
+        } else if (message instanceof UnreachableMember) {
+            // remove peer
+            final Member member = ((UnreachableMember) message).member();
+            LOG.info("Member is unreachable: {}", member);
+            peers.remove(member.address());
+        } else if (message instanceof ReachableMember) {
+            // resync peer
+            final Member member = ((ReachableMember) message).member();
+            LOG.info("Member is reachable again: {}", member);
+
+            if (member.address().equals(clusterExtension.selfAddress())) {
+                return;
+            }
+
+            final String path = member.address() + PATH + topologyId;
+            LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
+
+            clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
+        } else if (message instanceof CustomIdentifyMessageReply) {
+            LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
+            if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
+                final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
+                peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
+            }
+        } else if (message instanceof CustomIdentifyMessage) {
+            LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
+            if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
+                final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
+                peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
+            }
+            actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
+        }
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java
new file mode 100644 (file)
index 0000000..55bdb2e
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.netconf.topology.NodeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnershipListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NodeRoleChangeStrategy.class);
+
+    private final EntityOwnershipService entityOwnershipService;
+    private final String entityType;
+    private final String entityName;
+    private NodeListener ownershipCandidate;
+
+    private EntityOwnershipCandidateRegistration candidateRegistration = null;
+    private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
+
+    public NodeRoleChangeStrategy(final EntityOwnershipService entityOwnershipService,
+                                  final String entityType,
+                                  final String entityName) {
+        this.entityOwnershipService = entityOwnershipService;
+        this.entityType = entityType + "/" + entityName;
+        this.entityName = entityName;
+    }
+
+    @Override
+    public void registerRoleCandidate(NodeListener electionCandidate) {
+        LOG.debug("Registering role candidate type: {} , name: {}", entityType, entityName);
+        this.ownershipCandidate = electionCandidate;
+        try {
+            if (candidateRegistration != null) {
+                unregisterRoleCandidate();
+            }
+            candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+            ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
+        } catch (CandidateAlreadyRegisteredException e) {
+            LOG.error("Candidate already registered for election", e);
+            throw new IllegalStateException("Candidate already registered for election", e);
+        }
+    }
+
+    @Override
+    public void unregisterRoleCandidate() {
+        LOG.debug("Unregistering role candidate");
+        candidateRegistration.close();
+        candidateRegistration = null;
+        ownershipListenerRegistration.close();
+        ownershipListenerRegistration = null;
+    }
+
+    @Override
+    public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+        LOG.debug("Role was changed {}", roleChangeDTO);
+        ownershipCandidate.onRoleChanged(roleChangeDTO);
+    }
+
+    @Override
+    public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+        LOG.debug("Ownership has changed {}", ownershipChange);
+        ownershipCandidate.onRoleChanged(new RoleChangeDTO(ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner()));
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java
new file mode 100644 (file)
index 0000000..ab76cc2
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import org.opendaylight.netconf.topology.NodeListener;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+
+/**
+ * Use this strategy to override the default roleChange registration's in BaseTopologyManager|BaseNodeManager
+ * If you use this, you will need to execute your own election in your implemented callbacks.
+ */
+public class NoopRoleChangeStrategy implements RoleChangeStrategy {
+
+    @Override
+    public void registerRoleCandidate(NodeListener electionCandidate) {
+
+    }
+
+    @Override
+    public void unregisterRoleCandidate() {
+
+    }
+
+    @Override
+    public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/SalNodeWriter.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/SalNodeWriter.java
new file mode 100644 (file)
index 0000000..8d63e89
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class SalNodeWriter implements NodeWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SalNodeWriter.class);
+
+    private final DataBroker dataBroker;
+    private final String topologyId;
+
+    private final InstanceIdentifier<Topology> topologyListPath;
+
+    public SalNodeWriter(final DataBroker dataBroker, final String topologyId) {
+        this.dataBroker = dataBroker;
+        this.topologyId = topologyId;
+        this.topologyListPath = createTopologyId(this.topologyId);
+    }
+
+    //FIXME change to txChains
+    @Override public void init(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
+        // put into Datastore
+        final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
+        wTx.put(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id), operationalDataNode);
+        commitTransaction(wTx, id, "init");
+    }
+
+    @Override public void update(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
+        // merge
+        final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
+        wTx.put(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id), operationalDataNode);
+        commitTransaction(wTx, id, "update");
+    }
+
+    @Override public void delete(@Nonnull final NodeId id) {
+        // delete
+        final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
+        wTx.delete(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id));
+        commitTransaction(wTx, id, "delete");
+    }
+
+    private void commitTransaction(final WriteTransaction transaction, final NodeId id, final String txType) {
+        LOG.debug("{}: Committing Transaction {}:{}", id.getValue(), txType,
+                transaction.getIdentifier());
+        final CheckedFuture<Void, TransactionCommitFailedException> result = transaction.submit();
+
+        Futures.addCallback(result, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.debug("{}: Transaction({}) {} SUCCESSFUL", id.getValue(), txType,
+                        transaction.getIdentifier());
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("{}: Transaction({}) {} FAILED!", id.getValue(), txType,
+                        transaction.getIdentifier(), t);
+                throw new IllegalStateException(id + "  Transaction(" + txType + ") not committed correctly", t);
+            }
+        });
+    }
+
+    private InstanceIdentifier<Node> createBindingPathForTopology(final NodeId id) {
+        return topologyListPath.child(Node.class, new NodeKey(id));
+    }
+
+    private InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
+        final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+        return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+    }
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java
new file mode 100644 (file)
index 0000000..de9f7ac
--- /dev/null
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.NodeListener;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopologyRoleChangeStrategy implements RoleChangeStrategy, ClusteredDataTreeChangeListener<Node>, EntityOwnershipListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyRoleChangeStrategy.class);
+
+    private final DataBroker dataBroker;
+
+    private final EntityOwnershipService entityOwnershipService;
+    private NodeListener ownershipCandidate;
+    private final String entityType;
+    // use topologyId as entityName
+    private final String entityName;
+
+    private EntityOwnershipCandidateRegistration candidateRegistration = null;
+    private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
+
+    private ListenerRegistration<TopologyRoleChangeStrategy> datastoreListenerRegistration;
+
+    public TopologyRoleChangeStrategy(final DataBroker dataBroker,
+                                      final EntityOwnershipService entityOwnershipService,
+                                      final String entityType,
+                                      final String entityName) {
+        this.dataBroker = dataBroker;
+        this.entityOwnershipService = entityOwnershipService;
+        this.entityType = entityType;
+        this.entityName = entityName;
+
+        datastoreListenerRegistration = null;
+    }
+
+    @Override
+    public void registerRoleCandidate(NodeListener electionCandidate) {
+        LOG.warn("Registering candidate");
+        ownershipCandidate = electionCandidate;
+        try {
+            if (candidateRegistration != null) {
+                unregisterRoleCandidate();
+            }
+            candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+            ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
+        } catch (CandidateAlreadyRegisteredException e) {
+            LOG.error("Candidate already registered for election", e);
+            throw new IllegalStateException("Candidate already registered for election", e);
+        }
+    }
+
+    @Override
+    public void unregisterRoleCandidate() {
+        candidateRegistration.close();
+        candidateRegistration = null;
+        ownershipListenerRegistration.close();
+        ownershipListenerRegistration = null;
+    }
+
+    @Override
+    public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+        if (roleChangeDTO.isOwner()) {
+            LOG.warn("Gained ownership of entity, registering datastore listener");
+
+            if (datastoreListenerRegistration == null) {
+                LOG.debug("Listener on path {}", createTopologyId(entityType).child(Node.class).getPathArguments());
+                datastoreListenerRegistration = dataBroker.registerDataTreeChangeListener(
+                        new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(entityType).child(Node.class)), this);
+            }
+        } else if (datastoreListenerRegistration != null) {
+            LOG.warn("No longer owner of entity, unregistering datastore listener");
+            datastoreListenerRegistration.close();
+            datastoreListenerRegistration = null;
+        }
+        ownershipCandidate.onRoleChanged(roleChangeDTO);
+    }
+
+    @Override
+    public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
+        onRoleChanged(new RoleChangeDTO(ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner()));
+    }
+
+    @Override
+    public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> changes) {
+        for (DataTreeModification<Node> change : changes) {
+            final DataObjectModification<Node> rootNode = change.getRootNode();
+            switch (rootNode.getModificationType()) {
+                case WRITE:
+                    LOG.debug("Data was Created {}, {}", rootNode.getIdentifier(), rootNode.getDataAfter());
+                    ownershipCandidate.onNodeCreated(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+                    break;
+                case SUBTREE_MODIFIED:
+                    LOG.debug("Data was Updated {}, {}", rootNode.getIdentifier(), rootNode.getDataAfter());
+                    ownershipCandidate.onNodeUpdated(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+                    break;
+                case DELETE:
+                    LOG.debug("Data was Deleted {}", rootNode.getIdentifier());
+                    ownershipCandidate.onNodeDeleted(getNodeId(rootNode.getIdentifier()));
+            }
+        }
+    }
+
+    /**
+     * Determines the Netconf Node Node ID, given the node's instance
+     * identifier.
+     *
+     * @param pathArgument Node's path arument
+     * @return     NodeId for the node
+     */
+    private NodeId getNodeId(final PathArgument pathArgument) {
+        if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
+
+            final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
+            if(key instanceof NodeKey) {
+                return ((NodeKey) key).getNodeId();
+            }
+        }
+        throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
+    }
+
+    private static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
+        final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+        return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+    }
+}