Merge "Stop scheduling the Install Snapshot check"
authorTom Pantelis <tpanteli@brocade.com>
Thu, 12 Feb 2015 21:15:39 +0000 (21:15 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 12 Feb 2015 21:15:40 +0000 (21:15 +0000)
55 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/config/config-manager/src/main/java/org/opendaylight/controller/config/manager/impl/dynamicmbean/AnnotationsHelper.java
opendaylight/config/config-manager/src/test/java/org/opendaylight/controller/config/manager/impl/util/InterfacesHelperTest.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-binding-it/pom.xml
opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeChangeService.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreTreeChangePublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeState.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerTree.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerWalker.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/pom.xml
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NotificationHandler.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferences.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/util/NetconfMessageTransformUtil.java
opendaylight/md-sal/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java
opendaylight/netconf/config-netconf-connector/pom.xml
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/get/Get.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/getconfig/GetConfig.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/runtimerpc/RuntimeRpc.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/Activator.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationProvider.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationServiceFactoryImpl.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationServiceImpl.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreContext.java [new file with mode: 0644]
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreException.java [deleted file]
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreServiceImpl.java [deleted file]
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshotImpl.java [deleted file]
opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/NetconfMappingTest.java
opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfigTest.java
opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationServiceImplTest.java [deleted file]
opendaylight/netconf/netconf-connector-config/src/main/resources/initial/99-netconf-connector.xml
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationRouterImpl.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/AbstractNetconfConfigTest.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/networkconfiguration/neutron/northbound/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronNorthboundRSApplication.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronPort.java

index 6a9b4be..6cc363b 100644 (file)
         <artifactId>nagasena-rta</artifactId>
         <version>${exi.nagasena.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.osgi</groupId>
-        <artifactId>org.osgi.compendium</artifactId>
-        <version>${osgi.compendium.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.osgi</groupId>
-        <artifactId>org.osgi.core</artifactId>
-        <version>${osgi.core.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.reflections</groupId>
         <artifactId>reflections</artifactId>
index efb3574..fba9844 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.config.manager.impl.dynamicmbean;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -29,9 +31,9 @@ public class AnnotationsHelper {
      * @return list of found annotations
      */
     static <T extends Annotation> List<T> findMethodAnnotationInSuperClassesAndIfcs(
-            final Method setter, Class<T> annotationType,
-            Set<Class<?>> inspectedInterfaces) {
-        List<T> result = new ArrayList<T>();
+            final Method setter, final Class<T> annotationType,
+            final Set<Class<?>> inspectedInterfaces) {
+        Builder<T> result = ImmutableSet.builder();
         Class<?> inspectedClass = setter.getDeclaringClass();
         do {
             try {
@@ -46,7 +48,8 @@ public class AnnotationsHelper {
             } catch (NoSuchMethodException e) {
                 inspectedClass = Object.class; // no need to go further
             }
-        } while (inspectedClass.equals(Object.class) == false);
+        } while (!inspectedClass.equals(Object.class));
+
         // inspect interfaces
         for (Class<?> ifc : inspectedInterfaces) {
             if (ifc.isInterface() == false) {
@@ -63,7 +66,7 @@ public class AnnotationsHelper {
 
             }
         }
-        return result;
+        return new ArrayList<>(result.build());
     }
 
     /**
@@ -74,7 +77,7 @@ public class AnnotationsHelper {
      * @return list of found annotations
      */
     static <T extends Annotation> List<T> findClassAnnotationInSuperClassesAndIfcs(
-            Class<?> clazz, Class<T> annotationType, Set<Class<?>> interfaces) {
+            final Class<?> clazz, final Class<T> annotationType, final Set<Class<?>> interfaces) {
         List<T> result = new ArrayList<T>();
         Class<?> declaringClass = clazz;
         do {
@@ -101,7 +104,7 @@ public class AnnotationsHelper {
      * @return empty string if no annotation is found, or list of descriptions
      *         separated by newline
      */
-    static String aggregateDescriptions(List<Description> descriptions) {
+    static String aggregateDescriptions(final List<Description> descriptions) {
         StringBuilder builder = new StringBuilder();
         for (Description d : descriptions) {
             if (builder.length() != 0) {
index 34039ce..5656163 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.config.manager.impl.util;
 
 import static org.junit.Assert.assertEquals;
-
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import java.util.Collections;
@@ -25,37 +24,37 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 
 public class InterfacesHelperTest {
 
-    interface SuperA {
+    public interface SuperA {
 
     }
 
-    interface SuperBMXBean {
+    public interface SuperBMXBean {
 
     }
 
-    interface SuperC extends SuperA, SuperBMXBean {
+    public interface SuperC extends SuperA, SuperBMXBean {
 
     }
 
-    class SuperClass implements SuperC {
+    public class SuperClass implements SuperC {
 
     }
 
     @MXBean
-    interface SubA {
+    public interface SubA {
 
     }
 
     @ServiceInterfaceAnnotation(value = "a", osgiRegistrationType = SuperA.class, namespace = "n", revision = "r", localName = "l")
-    interface Service extends AbstractServiceInterface{}
+    public interface Service extends AbstractServiceInterface{}
     @ServiceInterfaceAnnotation(value = "b", osgiRegistrationType = SuperC.class, namespace = "n", revision = "r", localName = "l")
-    interface SubService extends Service{}
+    public interface SubService extends Service{}
 
-    abstract class SubClass extends SuperClass implements SubA, Module {
+    public abstract class SubClass extends SuperClass implements SubA, Module {
 
     }
 
-    abstract class SubClassWithService implements SubService, Module {
+    public abstract class SubClassWithService implements SubService, Module {
 
     }
 
index 31464c5..9b6c088 100644 (file)
@@ -26,7 +26,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -129,7 +128,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // Upon election: send initial empty AppendEntries RPCs
         // (heartbeat) to each server; repeat during idle periods to
         // prevent election timeouts (ยง5.2)
-        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+        sendAppendEntries(0);
     }
 
     /**
@@ -425,18 +424,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         } else {
-            sendAppendEntries();
+            sendAppendEntries(0);
         }
     }
 
-    private void sendAppendEntries() {
+    private void sendAppendEntries(long timeSinceLastActivityInterval) {
         // Send an AppendEntries to all followers
-        long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
             final FollowerLogInformation followerLogInformation = e.getValue();
             // This checks helps not to send a repeat message to the follower
-            if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+            if(!followerLogInformation.isFollowerActive() ||
+                    followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
                 sendUpdatesToFollower(followerId, followerLogInformation, true);
             }
         }
@@ -661,7 +660,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendHeartBeat() {
         if (!followerToLog.isEmpty()) {
-            sendAppendEntries();
+            sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
         }
     }
 
index c490cb2..4d33152 100644 (file)
@@ -88,7 +88,8 @@ public class MockRaftActorContext implements RaftActorContext {
 
     public void initReplicatedLog(){
         this.replicatedLog = new SimpleReplicatedLog();
-        this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("")));
+        this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1")));
+        this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2")));
     }
 
     @Override public ActorRef actorOf(Props props) {
index 666cea6..3f551b3 100644 (file)
@@ -1,5 +1,8 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -41,14 +44,16 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.impl.SimpleLogger;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
+    static {
+        // This enables trace logging for the tests.
+        System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
+    }
+
     private final ActorRef leaderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
     private final ActorRef senderActor =
@@ -70,47 +75,50 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
-
                     ActorRef followerActor = getTestActor();
 
                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap<>();
 
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                    String followerId = "follower";
+                    peerAddresses.put(followerId, followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
+                    long term = 1;
+                    actorContext.getTermInformation().update(term, "");
+
                     Leader leader = new Leader(actorContext);
-                    leader.markFollowerActive(followerActor.path().toString());
-                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                        TimeUnit.MILLISECONDS);
-                    leader.handleMessage(senderActor, new SendHeartBeat());
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            @Override
-                            protected String match(Object in) {
-                                Object msg = fromSerializableMessage(in);
-                                if (msg instanceof AppendEntries) {
-                                    if (((AppendEntries)msg).getTerm() == 0) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
+                    // Leader should send an immediate heartbeat with no entries as follower is inactive.
+                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
+                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getTerm", term, appendEntries.getTerm());
+                    assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 0, appendEntries.getEntries().size());
 
-                    assertEquals("match", out);
+                    // The follower would normally reply - simulate that explicitly here.
+                    leader.handleMessage(followerActor, new AppendEntriesReply(
+                            followerId, term, true, lastIndex - 1, term));
+                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+                    // Sleep for the heartbeat interval so AppendEntries is sent.
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+                            getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
+                    leader.handleMessage(senderActor, new SendHeartBeat());
+
+                    appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
+                    assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
                 }
             };
         }};
@@ -119,55 +127,51 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
-
                     ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap<>();
 
-                    peerAddresses.put(followerActor.path().toString(),
-                            followerActor.path().toString());
+                    String followerId = "follower";
+                    peerAddresses.put(followerId, followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
+                    long term = 1;
+                    actorContext.getTermInformation().update(term, "");
+
                     Leader leader = new Leader(actorContext);
-                    leader.markFollowerActive(followerActor.path().toString());
-                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                        TimeUnit.MILLISECONDS);
-                    RaftActorBehavior raftBehavior = leader
-                        .handleMessage(senderActor, new Replicate(null, null,
-                            new MockRaftActorContext.MockReplicatedLogEntry(1,
-                                100,
-                                new MockRaftActorContext.MockPayload("foo"))
-                        ));
+
+                    // Leader will send an immediate heartbeat - ignore it.
+                    expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
+                    // The follower would normally reply - simulate that explicitly here.
+                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
+                    leader.handleMessage(followerActor, new AppendEntriesReply(
+                            followerId, term, true, lastIndex, term));
+                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+                    MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+                    MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                            1, lastIndex + 1, payload);
+                    actorContext.getReplicatedLog().append(newEntry);
+                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                            new Replicate(null, null, newEntry));
 
                     // State should not change
                     assertTrue(raftBehavior instanceof Leader);
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            @Override
-                            protected String match(Object in) {
-                                Object msg = fromSerializableMessage(in);
-                                if (msg instanceof AppendEntries) {
-                                    if (((AppendEntries)msg).getTerm() == 0) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    assertEquals("match", out);
+                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
+                    assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+                    assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
                 }
             };
         }};
@@ -176,7 +180,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
@@ -282,7 +285,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
+            AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
                 followerActor, AppendEntries.class);
 
             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
@@ -297,9 +300,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(senderActor, new SendHeartBeat());
 
-            InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
-                MessageCollectorActor.getFirstMatching(followerActor,
-                    InstallSnapshot.SERIALIZABLE_CLASS);
+            InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
+                InstallSnapshot.SERIALIZABLE_CLASS);
 
             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
                 isproto);
@@ -435,7 +437,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 leaderActor, new InitiateInstallSnapshot());
 
-            CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+            CaptureSnapshot cs = MessageCollectorActor.
                 getFirstMatching(leaderActor, CaptureSnapshot.class);
 
             assertNotNull(cs);
@@ -491,6 +493,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             Leader leader = new Leader(actorContext);
 
+            // Ignore initial heartbeat.
+            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
             // new entry
             ReplicatedLogImplEntry entry =
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
@@ -558,6 +563,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             MockLeader leader = new MockLeader(actorContext);
 
+            // Ignore initial heartbeat.
+            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
             Map<String, String> leadersSnapshot = new HashMap<>();
             leadersSnapshot.put("1", "A");
             leadersSnapshot.put("2", "B");
@@ -734,15 +742,17 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+            MessageCollectorActor.getAllMatching(followerActor,
+                    InstallSnapshotMessages.InstallSnapshot.class);
 
-            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
-            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+            assertNotNull(installSnapshot);
 
             assertEquals(1, installSnapshot.getChunkIndex());
             assertEquals(3, installSnapshot.getTotalChunks());
 
+            followerActor.underlyingActor().clear();
 
             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 followerActor.path().toString(), -1, false));
@@ -752,11 +762,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
-
-            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
-            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+            installSnapshot = MessageCollectorActor.getFirstMatching(
+                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+            assertNotNull(installSnapshot);
 
             assertEquals(1, installSnapshot.getChunkIndex());
             assertEquals(3, installSnapshot.getTotalChunks());
@@ -769,7 +777,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-
                 TestActorRef<MessageCollectorActor> followerActor =
                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
 
@@ -811,11 +818,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
-
-                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
-                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+                assertNotNull(installSnapshot);
 
                 assertEquals(1, installSnapshot.getChunkIndex());
                 assertEquals(3, installSnapshot.getTotalChunks());
@@ -823,17 +828,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 int hashCode = installSnapshot.getData().hashCode();
 
-                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
-
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-
-                leader.handleMessage(leaderActor, new SendHeartBeat());
+                followerActor.underlyingActor().clear();
 
-                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
-
-                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
 
-                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+                installSnapshot = MessageCollectorActor.getFirstMatching(
+                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+                assertNotNull(installSnapshot);
 
                 assertEquals(2, installSnapshot.getChunkIndex());
                 assertEquals(3, installSnapshot.getTotalChunks());
@@ -902,7 +903,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
-        return new MockRaftActorContext("test", getSystem(), actorRef);
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+        context.setConfigParams(configParams);
+        return context;
     }
 
     private ByteString toByteString(Map<String, String> state) {
@@ -931,43 +937,41 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
-        private static AbstractRaftActorBehavior behavior;
-
-        public ForwardMessageToBehaviorActor(){
-
-        }
+        AbstractRaftActorBehavior behavior;
 
         @Override public void onReceive(Object message) throws Exception {
+            if(behavior != null) {
+                behavior.handleMessage(sender(), message);
+            }
+
             super.onReceive(message);
-            behavior.handleMessage(sender(), message);
         }
 
-        public static void setBehavior(AbstractRaftActorBehavior behavior){
-            ForwardMessageToBehaviorActor.behavior = behavior;
+        public static Props props() {
+            return Props.create(ForwardMessageToBehaviorActor.class);
         }
     }
 
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(ForwardMessageToBehaviorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
-            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower", getSystem(), followerActor);
+                    new MockRaftActorContext("follower", getSystem(), followerActor);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+            peerAddresses.put("follower", followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
@@ -975,7 +979,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             //create 3 entries
             leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             leaderActorContext.setCommitIndex(1);
 
@@ -983,37 +987,29 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             // follower too has the exact same log entries and has the same commit index
             followerActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             followerActorContext.setCommitIndex(1);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive(followerActor.path().toString());
-
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
-
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntries.class);
 
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getEntries().size());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
                     leaderActor, AppendEntriesReply.class);
-
             assertNotNull(appendEntriesReply);
 
-            // follower returns its next index
             assertEquals(2, appendEntriesReply.getLogLastIndex());
             assertEquals(1, appendEntriesReply.getLogLastTerm());
 
+            // follower returns its next index
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
         }};
     }
 
@@ -1021,68 +1017,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(ForwardMessageToBehaviorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
-            ActorRef followerActor = getSystem().actorOf(
-                Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower", getSystem(), followerActor);
+                    new MockRaftActorContext("follower", getSystem(), followerActor);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+            peerAddresses.put("follower", followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
             leaderActorContext.getReplicatedLog().removeFrom(0);
 
             leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             leaderActorContext.setCommitIndex(1);
 
             followerActorContext.getReplicatedLog().removeFrom(0);
 
             followerActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             // follower has the same log entries but its commit index > leaders commit index
             followerActorContext.setCommitIndex(2);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive(followerActor.path().toString());
-
-            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
-
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntries.class);
 
+            // Initial heartbeat
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getEntries().size());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
                     leaderActor, AppendEntriesReply.class);
+            assertNotNull(appendEntriesReply);
+
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+            leaderActor.underlyingActor().behavior = leader;
+            leader.handleMessage(followerActor, appendEntriesReply);
+
+            leaderActor.underlyingActor().clear();
+            followerActor.underlyingActor().clear();
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                    TimeUnit.MILLISECONDS);
 
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(0, appendEntries.getEntries().size());
+            assertEquals(2, appendEntries.getPrevLogIndex());
+
+            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
             assertNotNull(appendEntriesReply);
 
             assertEquals(2, appendEntriesReply.getLogLastIndex());
             assertEquals(1, appendEntriesReply.getLogLastTerm());
 
+            assertEquals(1, followerActorContext.getCommitIndex());
         }};
     }
 
@@ -1156,8 +1167,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 assertEquals(2, leaderActorContext.getCommitIndex());
 
                 ApplyLogEntries applyLogEntries =
-                    (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
-                        ApplyLogEntries.class);
+                    MessageCollectorActor.getFirstMatching(leaderActor,
+                    ApplyLogEntries.class);
 
                 assertNotNull(applyLogEntries);
 
@@ -1295,78 +1306,91 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(MessageCollectorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
             leaderActorContext.setConfigParams(configParams);
 
-            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+                    new MockRaftActorContext("follower-reply", getSystem(), followerActor);
 
             followerActorContext.setConfigParams(configParams);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put("follower-reply",
-                followerActor.path().toString());
+                    followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
             leaderActorContext.getReplicatedLog().removeFrom(0);
+            leaderActorContext.setCommitIndex(-1);
+            leaderActorContext.setLastApplied(-1);
 
-            //create 3 entries
-            leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-
-            leaderActorContext.setCommitIndex(1);
+            followerActorContext.getReplicatedLog().removeFrom(0);
+            followerActorContext.setCommitIndex(-1);
+            followerActorContext.setLastApplied(-1);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive("follower-reply");
+
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+            assertNotNull(appendEntriesReply);
+            System.out.println("appendEntriesReply: "+appendEntriesReply);
+            leader.handleMessage(followerActor, appendEntriesReply);
+
+            // Clear initial heartbeat messages
+
+            leaderActor.underlyingActor().clear();
+            followerActor.underlyingActor().clear();
+
+            // create 3 entries
+            leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+            leaderActorContext.setCommitIndex(1);
+            leaderActorContext.setLastApplied(1);
 
             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
+                    TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
-                .getFirstMatching(followerActor, AppendEntries.class);
-
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
+            // Should send first log entry
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-            assertEquals(0, appendEntries.getPrevLogIndex());
-
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+            assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(-1, appendEntries.getPrevLogIndex());
 
+            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
             assertNotNull(appendEntriesReply);
 
-            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
-            List<Object> entries = ForwardMessageToBehaviorActor
-                .getAllMatching(followerActor, AppendEntries.class);
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+            assertEquals(0, appendEntriesReply.getLogLastIndex());
 
-            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+            followerActor.underlyingActor().clear();
 
-            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
 
-            assertEquals(1, appendEntriesSecond.getLeaderCommit());
-            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
-            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+            assertNotNull(appendEntries);
 
+            // Should send second log entry
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
         }};
     }
 
index 3469a95..c5acb1f 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -23,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 
 public class MessageCollectorActor extends UntypedActor {
-    private List<Object> messages = new ArrayList<>();
+    private final List<Object> messages = new ArrayList<>();
 
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
@@ -35,6 +36,10 @@ public class MessageCollectorActor extends UntypedActor {
         }
     }
 
+    public void clear() {
+        messages.clear();
+    }
+
     public static List<Object> getAllMessages(ActorRef actor) throws Exception {
         FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
         Timeout operationTimeout = new Timeout(operationDuration);
@@ -53,13 +58,17 @@ public class MessageCollectorActor extends UntypedActor {
      * @param clazz
      * @return
      */
-    public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
-        List<Object> allMessages = getAllMessages(actor);
+    public static <T> T getFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
+        for(int i = 0; i < 50; i++) {
+            List<Object> allMessages = getAllMessages(actor);
 
-        for(Object message : allMessages){
-            if(message.getClass().equals(clazz)){
-                return message;
+            for(Object message : allMessages){
+                if(message.getClass().equals(clazz)){
+                    return (T) message;
+                }
             }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
         }
 
         return null;
index 491e5dc..7c6710f 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-monitoring</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-notifications-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-binding-broker-impl</artifactId>
@@ -77,6 +81,7 @@
       <groupId>org.opendaylight.yangtools.thirdparty</groupId>
       <artifactId>antlr4-runtime-osgi-nohead</artifactId>
     </dependency>
+
     <!--Compile scopes for all testing dependencies are intentional-->
     <!--This way, all testing dependencies can be transitively used by other integration test modules-->
     <!--If the dependencies are test scoped, they are not visible to other maven modules depending on sal-binding-it-->
index a64e360..96f52bd 100644 (file)
@@ -83,6 +83,9 @@ public class TestHelper {
                 mavenBundle("org.eclipse.birt.runtime.3_7_1", "org.apache.xml.resolver", "1.2.0"),
 
                 mavenBundle(CONTROLLER, "config-netconf-connector").versionAsInProject(), //
+                mavenBundle(CONTROLLER, "netconf-notifications-api").versionAsInProject(), //
+                mavenBundle(CONTROLLER, "ietf-netconf").versionAsInProject(), //
+                mavenBundle(CONTROLLER, "ietf-netconf-notifications").versionAsInProject(), //
                 mavenBundle(CONTROLLER, "netconf-impl").versionAsInProject(), //
 
                 mavenBundle(CONTROLLER, "config-persister-file-xml-adapter").versionAsInProject().noStart(),
index f196ad1..10f6a42 100644 (file)
@@ -39,7 +39,7 @@ odl-cluster-data {
     cluster {
       seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
-      auto-down-unreachable-after = 10s
+      auto-down-unreachable-after = 300s
 
       roles = [
         "member-1"
@@ -77,7 +77,7 @@ odl-cluster-rpc {
     cluster {
       seed-nodes = ["akka.tcp://odl-cluster-rpc@127.0.0.1:2551"]
 
-      auto-down-unreachable-after = 10s
+      auto-down-unreachable-after = 300s
     }
   }
 }
index 8a37dfe..1e2386a 100644 (file)
@@ -124,29 +124,20 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
 
     protected void readData(DOMStoreReadTransaction transaction, ReadData message,
             final boolean returnSerialized) {
-        final ActorRef sender = getSender();
-        final ActorRef self = getSelf();
-        final YangInstanceIdentifier path = message.getPath();
-        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-                transaction.read(path);
-
-        future.addListener(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-                    ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
 
-                    sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
-                        readDataReply), self);
+        final YangInstanceIdentifier path = message.getPath();
+        try {
+            final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
+            Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
+            ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
 
-                } catch (Exception e) {
-                    shardStats.incrementFailedReadTransactionsCount();
-                    sender.tell(new akka.actor.Status.Failure(e), self);
-                }
+            sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
 
-            }
-        }, getContext().dispatcher());
+        } catch (Exception e) {
+            LOG.error(String.format("Unexpected error reading path %s", path), e);
+            shardStats.incrementFailedReadTransactionsCount();
+            sender().tell(new akka.actor.Status.Failure(e), self());
+        }
     }
 
     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeChangeListener.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeChangeListener.java
new file mode 100644 (file)
index 0000000..2578790
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import java.util.Collection;
+import java.util.EventListener;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Interface implemented by classes interested in receiving notifications about
+ * data tree changes. This interface differs from {@link DOMDataChangeListener}
+ * in that it provides a cursor-based view of the change, which has potentially
+ * lower overhead.
+ */
+public interface DOMDataTreeChangeListener extends EventListener {
+    /**
+     * Invoked when there was data change for the supplied path, which was used
+     * to register this listener.
+     *
+     * <p>
+     * This method may be also invoked during registration of the listener if
+     * there is any pre-existing data in the conceptual data tree for supplied
+     * path. This initial event will contain all pre-existing data as created.
+     *
+     * <p>
+     * A data change event may be triggered spuriously, e.g. such that data before
+     * and after compare as equal. Implementations of this interface are expected
+     * to recover from such events. Event producers are expected to exert reasonable
+     * effort to suppress such events.
+     *
+     * In other words, it is completely acceptable to observe
+     * a {@link org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode},
+     * which reports a {@link org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType}
+     * other than UNMODIFIED, while the before- and after- data items compare as
+     * equal.
+     *
+     * @param changes Collection of change events, may not be null or empty.
+     */
+    void onDataTreeChanged(@Nonnull Collection<DataTreeCandidate> changes);
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeChangeService.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeChangeService.java
new file mode 100644 (file)
index 0000000..e001dbb
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * A {@link DOMService} which allows users to register for changes to a
+ * subtree.
+ */
+public interface DOMDataTreeChangeService extends DOMService {
+    /**
+     * Registers a {@link DOMDataTreeChangeListener} to receive
+     * notifications when data changes under a given path in the conceptual data
+     * tree.
+     * <p>
+     * You are able to register for notifications  for any node or subtree
+     * which can be represented using {@link DOMDataTreeIdentifier}.
+     * <p>
+     *
+     * You are able to register for data change notifications for a subtree or leaf
+     * even if it does not exist. You will receive notification once that node is
+     * created.
+     * <p>
+     * If there is any pre-existing data in the data tree for the path for which you are
+     * registering, you will receive an initial data change event, which will
+     * contain all pre-existing data, marked as created.
+     *
+     * <p>
+     * This method returns a {@link ListenerRegistration} object. To
+     * "unregister" your listener for changes call the {@link ListenerRegistration#close()}
+     * method on the returned object.
+     * <p>
+     * You MUST explicitly unregister your listener when you no longer want to receive
+     * notifications. This is especially true in OSGi environments, where failure to
+     * do so during bundle shutdown can lead to stale listeners being still registered.
+     *
+     * @param treeId
+     *            Data tree identifier of the subtree which should be watched for
+     *            changes.
+     * @param listener
+     *            Listener instance which is being registered
+     * @return Listener registration object, which may be used to unregister
+     *         your listener using {@link ListenerRegistration#close()} to stop
+     *         delivery of change events.
+     */
+    @Nonnull <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(@Nonnull DOMDataTreeIdentifier treeId, @Nonnull L listener);
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeIdentifier.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeIdentifier.java
new file mode 100644 (file)
index 0000000..7370ebe
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.concepts.Immutable;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * A unique identifier for a particular subtree. It is composed of the logical
+ * data store type and the instance identifier of the root node.
+ */
+public final class DOMDataTreeIdentifier implements Immutable, Path<DOMDataTreeIdentifier>, Serializable {
+    private static final long serialVersionUID = 1L;
+    private final YangInstanceIdentifier rootIdentifier;
+    private final LogicalDatastoreType datastoreType;
+
+    public DOMDataTreeIdentifier(final LogicalDatastoreType datastoreType, final YangInstanceIdentifier rootIdentifier) {
+        this.datastoreType = Preconditions.checkNotNull(datastoreType);
+        this.rootIdentifier = Preconditions.checkNotNull(rootIdentifier);
+    }
+
+    /**
+     * Return the logical data store type.
+     *
+     * @return Logical data store type. Guaranteed to be non-null.
+     */
+    public @Nonnull LogicalDatastoreType getDatastoreType() {
+        return datastoreType;
+    }
+
+    /**
+     * Return the {@link YangInstanceIdentifier} of the root node.
+     *
+     * @return Instance identifier corresponding to the root node.
+     */
+    public @Nonnull YangInstanceIdentifier getRootIdentifier() {
+        return rootIdentifier;
+    }
+
+    @Override
+    public boolean contains(final DOMDataTreeIdentifier other) {
+        return datastoreType == other.datastoreType && rootIdentifier.contains(other.rootIdentifier);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + datastoreType.hashCode();
+        result = prime * result + rootIdentifier.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof DOMDataTreeIdentifier)) {
+            return false;
+        }
+        DOMDataTreeIdentifier other = (DOMDataTreeIdentifier) obj;
+        if (datastoreType != other.datastoreType) {
+            return false;
+        }
+        return rootIdentifier.equals(other.rootIdentifier);
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreTreeChangePublisher.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreTreeChangePublisher.java
new file mode 100644 (file)
index 0000000..5d75f88
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Interface implemented by DOMStore implementations which allow registration
+ * of {@link DOMDataTreeChangeListener} instances.
+ */
+public interface DOMStoreTreeChangePublisher {
+    /**
+     * Registers a {@link DOMDataTreeChangeListener} to receive
+     * notifications when data changes under a given path in the conceptual data
+     * tree.
+     * <p>
+     * You are able to register for notifications  for any node or subtree
+     * which can be represented using {@link YangInstanceIdentifier}.
+     * <p>
+     *
+     * You are able to register for data change notifications for a subtree or leaf
+     * even if it does not exist. You will receive notification once that node is
+     * created.
+     * <p>
+     * If there is any pre-existing data in data tree on path for which you are
+     * registering, you will receive initial data change event, which will
+     * contain all pre-existing data, marked as created.
+     *
+     * <p>
+     * This method returns a {@link ListenerRegistration} object. To
+     * "unregister" your listener for changes call the {@link ListenerRegistration#close()}
+     * method on this returned object.
+     * <p>
+     * You MUST explicitly unregister your listener when you no longer want to receive
+     * notifications. This is especially true in OSGi environments, where failure to
+     * do so during bundle shutdown can lead to stale listeners being still registered.
+     *
+     * @param treeId
+     *            Data tree identifier of the subtree which should be watched for
+     *            changes.
+     * @param listener
+     *            Listener instance which is being registered
+     * @return Listener registration object, which may be used to unregister
+     *         your listener using {@link ListenerRegistration#close()} to stop
+     *         delivery of change events.
+     */
+    @Nonnull <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(@Nonnull YangInstanceIdentifier treeId, @Nonnull L listener);
+}
index 25ddbf5..14d565c 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerWalker;
 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -51,7 +51,7 @@ final class ResolveDataChangeEventsTask {
      * Resolves and submits notification tasks to the specified manager.
      */
     public synchronized void resolve(final NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> manager) {
-        try (final Walker w = listenerRoot.getWalker()) {
+        try (final ListenerWalker w = listenerRoot.getWalker()) {
             // Defensive: reset internal state
             collectedEvents = ArrayListMultimap.create();
 
index 3db4115..6bbed57 100644 (file)
@@ -7,11 +7,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -22,7 +17,7 @@ import java.util.Map.Entry;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
@@ -32,6 +27,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+
 /**
  * Recursion state used in {@link ResolveDataChangeEventsTask}. Instances of this
  * method track which listeners are affected by a particular change node. It takes
@@ -49,7 +49,7 @@ final class ResolveDataChangeState {
      */
     private final Collection<Builder> inheritedOne;
     private final YangInstanceIdentifier nodeId;
-    private final Collection<Node> nodes;
+    private final Collection<ListenerNode> nodes;
 
     private final Map<DataChangeListenerRegistration<?>, Builder> subBuilders;
     private final Map<DataChangeListenerRegistration<?>, Builder> oneBuilders;
@@ -57,7 +57,7 @@ final class ResolveDataChangeState {
 
     private ResolveDataChangeState(final YangInstanceIdentifier nodeId,
             final Iterable<Builder> inheritedSub, final Collection<Builder> inheritedOne,
-            final Collection<Node> nodes) {
+            final Collection<ListenerNode> nodes) {
         this.nodeId = Preconditions.checkNotNull(nodeId);
         this.nodes = Preconditions.checkNotNull(nodes);
         this.inheritedSub = Preconditions.checkNotNull(inheritedSub);
@@ -69,7 +69,7 @@ final class ResolveDataChangeState {
         final Map<DataChangeListenerRegistration<?>, Builder> sub = new HashMap<>();
         final Map<DataChangeListenerRegistration<?>, Builder> one = new HashMap<>();
         final Map<DataChangeListenerRegistration<?>, Builder> base = new HashMap<>();
-        for (Node n : nodes) {
+        for (ListenerNode n : nodes) {
             for (DataChangeListenerRegistration<?> l : n.getListeners()) {
                 final Builder b = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE);
                 switch (l.getScope()) {
@@ -105,7 +105,7 @@ final class ResolveDataChangeState {
      * @param root root node
      * @return
      */
-    public static ResolveDataChangeState initial(final YangInstanceIdentifier rootId, final Node root) {
+    public static ResolveDataChangeState initial(final YangInstanceIdentifier rootId, final ListenerNode root) {
         return new ResolveDataChangeState(rootId, Collections.<Builder>emptyList(),
             Collections.<Builder>emptyList(), Collections.singletonList(root));
     }
@@ -257,13 +257,13 @@ final class ResolveDataChangeState {
         LOG.trace("Collected events {}", map);
     }
 
-    private static Collection<Node> getListenerChildrenWildcarded(final Collection<Node> parentNodes,
+    private static Collection<ListenerNode> getListenerChildrenWildcarded(final Collection<ListenerNode> parentNodes,
             final PathArgument child) {
         if (parentNodes.isEmpty()) {
             return Collections.emptyList();
         }
 
-        final List<Node> result = new ArrayList<>();
+        final List<ListenerNode> result = new ArrayList<>();
         if (child instanceof NodeWithValue || child instanceof NodeIdentifierWithPredicates) {
             NodeIdentifier wildcardedIdentifier = new NodeIdentifier(child.getNodeType());
             addChildNodes(result, parentNodes, wildcardedIdentifier);
@@ -272,9 +272,9 @@ final class ResolveDataChangeState {
         return result;
     }
 
-    private static void addChildNodes(final List<Node> result, final Collection<Node> parentNodes, final PathArgument childIdentifier) {
-        for (Node node : parentNodes) {
-            Optional<Node> child = node.getChild(childIdentifier);
+    private static void addChildNodes(final List<ListenerNode> result, final Collection<ListenerNode> parentNodes, final PathArgument childIdentifier) {
+        for (ListenerNode node : parentNodes) {
+            Optional<ListenerNode> child = node.getChild(childIdentifier);
             if (child.isPresent()) {
                 result.add(child.get());
             }
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerNode.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerNode.java
new file mode 100644 (file)
index 0000000..0aef142
--- /dev/null
@@ -0,0 +1,106 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl.tree;
+
+import com.google.common.base.Optional;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.DataChangeListenerRegistrationImpl;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.StoreTreeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a single node within the listener tree. Note that the data returned from
+ * and instance of this class is guaranteed to have any relevance or consistency
+ * only as long as the {@link ListenerWalker} instance through which it is reached remains
+ * unclosed.
+ *
+ * @author Robert Varga
+ */
+public class ListenerNode implements StoreTreeNode<ListenerNode>, Identifiable<PathArgument> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ListenerNode.class);
+
+    private final Collection<DataChangeListenerRegistration<?>> listeners = new ArrayList<>();
+    private final Map<PathArgument, ListenerNode> children = new HashMap<>();
+    private final PathArgument identifier;
+    private final Reference<ListenerNode> parent;
+
+    ListenerNode(final ListenerNode parent, final PathArgument identifier) {
+        this.parent = new WeakReference<>(parent);
+        this.identifier = identifier;
+    }
+
+    @Override
+    public PathArgument getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public Optional<ListenerNode> getChild(final PathArgument child) {
+        return Optional.fromNullable(children.get(child));
+    }
+
+    /**
+     * Return the list of current listeners. This collection is guaranteed
+     * to be immutable only while the walker, through which this node is
+     * reachable remains unclosed.
+     *
+     * @return the list of current listeners
+     */
+    public Collection<DataChangeListenerRegistration<?>> getListeners() {
+        return listeners;
+    }
+
+    ListenerNode ensureChild(final PathArgument child) {
+        ListenerNode potential = children.get(child);
+        if (potential == null) {
+            potential = new ListenerNode(this, child);
+            children.put(child, potential);
+        }
+        return potential;
+    }
+
+    void addListener(final DataChangeListenerRegistration<?> listener) {
+        listeners.add(listener);
+        LOG.debug("Listener {} registered", listener);
+    }
+
+    void removeListener(final DataChangeListenerRegistrationImpl<?> listener) {
+        listeners.remove(listener);
+        LOG.debug("Listener {} unregistered", listener);
+
+        // We have been called with the write-lock held, so we can perform some cleanup.
+        removeThisIfUnused();
+    }
+
+    private void removeThisIfUnused() {
+        final ListenerNode p = parent.get();
+        if (p != null && listeners.isEmpty() && children.isEmpty()) {
+            p.removeChild(identifier);
+        }
+    }
+
+    private void removeChild(final PathArgument arg) {
+        children.remove(arg);
+        removeThisIfUnused();
+    }
+
+    @Override
+    public String toString() {
+        return "Node [identifier=" + identifier + ", listeners=" + listeners.size() + ", children=" + children.size() + "]";
+    }
+}
index ac7a318..dcff643 100644 (file)
@@ -7,41 +7,29 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl.tree;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import java.lang.ref.Reference;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
-import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.StoreTreeNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * A set of listeners organized as a tree by node to which they listen. This class
  * allows for efficient lookup of listeners when we walk the DataTreeCandidate.
+ *
+ * @author Robert Varga
  */
 public final class ListenerTree  {
     private static final Logger LOG = LoggerFactory.getLogger(ListenerTree.class);
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-    private final Node rootNode = new Node(null, null);
+    private final ListenerNode rootNode = new ListenerNode(null, null);
 
     private ListenerTree() {
         // Private to disallow direct instantiation
@@ -71,12 +59,12 @@ public final class ListenerTree  {
         rwLock.writeLock().lock();
 
         try {
-            Node walkNode = rootNode;
+            ListenerNode walkNode = rootNode;
             for (final PathArgument arg : path.getPathArguments()) {
                 walkNode = walkNode.ensureChild(arg);
             }
 
-            final Node node = walkNode;
+            final ListenerNode node = walkNode;
             DataChangeListenerRegistration<L> reg = new DataChangeListenerRegistrationImpl<L>(listener) {
                 @Override
                 public DataChangeScope getScope() {
@@ -128,7 +116,7 @@ public final class ListenerTree  {
      *
      * @return A walker instance.
      */
-    public Walker getWalker() {
+    public ListenerWalker getWalker() {
         /*
          * TODO: The only current user of this method is local to the datastore.
          *       Since this class represents a read-lock, losing a reference to
@@ -137,127 +125,12 @@ public final class ListenerTree  {
          *       external user exist, make the Walker a phantom reference, which
          *       will cleanup the lock if not told to do so.
          */
-        final Walker ret = new Walker(rwLock.readLock(), rootNode);
+        final ListenerWalker ret = new ListenerWalker(rwLock.readLock(), rootNode);
         rwLock.readLock().lock();
         return ret;
     }
 
-    /**
-     * A walking context, pretty much equivalent to an iterator, but it
-     * exposes the underlying tree structure.
-     */
-    /*
-     * FIXME: BUG-1511: split this class out as ListenerWalker.
-     */
-    public static final class Walker implements AutoCloseable {
-        private final Lock lock;
-        private final Node node;
-
-        @GuardedBy("this")
-        private boolean valid = true;
-
-        private Walker(final Lock lock, final Node node) {
-            this.lock = Preconditions.checkNotNull(lock);
-            this.node = Preconditions.checkNotNull(node);
-        }
-
-        public Node getRootNode() {
-            return node;
-        }
-
-        @Override
-        public synchronized void close() {
-            if (valid) {
-                lock.unlock();
-                valid = false;
-            }
-        }
-    }
-
-    /**
-     * This is a single node within the listener tree. Note that the data returned from
-     * and instance of this class is guaranteed to have any relevance or consistency
-     * only as long as the {@link org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker} instance through which it is reached remains
-     * unclosed.
-     */
-    /*
-     * FIXME: BUG-1511: split this class out as ListenerNode.
-     */
-    public static final class Node implements StoreTreeNode<Node>, Identifiable<PathArgument> {
-        private final Collection<DataChangeListenerRegistration<?>> listeners = new ArrayList<>();
-        private final Map<PathArgument, Node> children = new HashMap<>();
-        private final PathArgument identifier;
-        private final Reference<Node> parent;
-
-        private Node(final Node parent, final PathArgument identifier) {
-            this.parent = new WeakReference<>(parent);
-            this.identifier = identifier;
-        }
-
-        @Override
-        public PathArgument getIdentifier() {
-            return identifier;
-        }
-
-        @Override
-        public Optional<Node> getChild(final PathArgument child) {
-            return Optional.fromNullable(children.get(child));
-        }
-
-        /**
-         * Return the list of current listeners. This collection is guaranteed
-         * to be immutable only while the walker, through which this node is
-         * reachable remains unclosed.
-         *
-         * @return the list of current listeners
-         */
-        public Collection<DataChangeListenerRegistration<?>> getListeners() {
-            return listeners;
-        }
-
-        private Node ensureChild(final PathArgument child) {
-            Node potential = children.get(child);
-            if (potential == null) {
-                potential = new Node(this, child);
-                children.put(child, potential);
-            }
-            return potential;
-        }
-
-        private void addListener(final DataChangeListenerRegistration<?> listener) {
-            listeners.add(listener);
-            LOG.debug("Listener {} registered", listener);
-        }
-
-        private void removeListener(final DataChangeListenerRegistrationImpl<?> listener) {
-            listeners.remove(listener);
-            LOG.debug("Listener {} unregistered", listener);
-
-            // We have been called with the write-lock held, so we can perform some cleanup.
-            removeThisIfUnused();
-        }
-
-        private void removeThisIfUnused() {
-            final Node p = parent.get();
-            if (p != null && listeners.isEmpty() && children.isEmpty()) {
-                p.removeChild(identifier);
-            }
-        }
-
-        private void removeChild(final PathArgument arg) {
-            children.remove(arg);
-            removeThisIfUnused();
-        }
-
-        @Override
-        public String toString() {
-            return "Node [identifier=" + identifier + ", listeners=" + listeners.size() + ", children=" + children.size() + "]";
-        }
-
-
-    }
-
-    private abstract static class DataChangeListenerRegistrationImpl<T extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> extends AbstractListenerRegistration<T> //
+    abstract static class DataChangeListenerRegistrationImpl<T extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> extends AbstractListenerRegistration<T> //
     implements DataChangeListenerRegistration<T> {
         public DataChangeListenerRegistrationImpl(final T listener) {
             super(listener);
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerWalker.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerWalker.java
new file mode 100644 (file)
index 0000000..0c297a2
--- /dev/null
@@ -0,0 +1,44 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl.tree;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A walking context, pretty much equivalent to an iterator, but it
+ * exposes the underlying tree structure.
+ *
+ * @author Robert Varga
+ */
+public class ListenerWalker implements AutoCloseable {
+    private static final AtomicIntegerFieldUpdater<ListenerWalker> CLOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ListenerWalker.class, "closed");
+    private final Lock lock;
+    private final ListenerNode node;
+
+    // Used via CLOSED_UPDATER
+    @SuppressWarnings("unused")
+    private volatile int closed = 0;
+
+    ListenerWalker(final Lock lock, final ListenerNode node) {
+        this.lock = Preconditions.checkNotNull(lock);
+        this.node = Preconditions.checkNotNull(node);
+    }
+
+    public ListenerNode getRootNode() {
+        return node;
+    }
+
+    @Override
+    public void close() {
+        if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
+            lock.unlock();
+        }
+    }
+}
\ No newline at end of file
index add889f..61c83a6 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>ietf-netconf-monitoring</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>ietf-netconf-notifications</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-client</artifactId>
index 460e072..b966fae 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.config.yang.md.sal.connector.netconf;
 
 import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition;
 import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull;
+
 import com.google.common.base.Optional;
 import io.netty.util.concurrent.EventExecutor;
 import java.math.BigDecimal;
@@ -87,7 +88,6 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         }
 
         userCapabilities = getUserCapabilities();
-
     }
 
     private boolean isHostAddressPresent(final Host address) {
@@ -111,17 +111,17 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
                 new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
 
         final NetconfDevice device =
-                new NetconfDevice(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, new NetconfMessageTransformer());
+                new NetconfDevice(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, new NetconfMessageTransformer(), true);
 
         final NetconfDeviceCommunicator listener = userCapabilities.isPresent() ?
                 new NetconfDeviceCommunicator(id, device, userCapabilities.get()) : new NetconfDeviceCommunicator(id, device);
 
         final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(listener);
-
         final NetconfClientDispatcher dispatcher = getClientDispatcherDependency();
+
         listener.initializeRemoteConnection(dispatcher, clientConfig);
 
-        return new MyAutoCloseable(listener, salFacade);
+        return new SalConnectorCloseable(listener, salFacade);
     }
 
     private Optional<NetconfSessionPreferences> getUserCapabilities() {
@@ -152,7 +152,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         final InetSocketAddress socketAddress = getSocketAddress();
         final long clientConnectionTimeoutMillis = getConnectionTimeoutMillis();
 
-        final ReconnectStrategyFactory sf = new MyReconnectStrategyFactory(
+        final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(
             getEventExecutorDependency(), getMaxConnectionAttempts(), getBetweenAttemptsTimeoutMillis(), getSleepFactor());
         final ReconnectStrategy strategy = sf.createReconnectStrategy();
 
@@ -160,21 +160,21 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         .withAddress(socketAddress)
         .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
         .withReconnectStrategy(strategy)
-        .withSessionListener(listener)
         .withAuthHandler(new LoginPassword(getUsername(),getPassword()))
         .withProtocol(getTcpOnly() ?
                 NetconfClientConfiguration.NetconfClientProtocol.TCP :
                 NetconfClientConfiguration.NetconfClientProtocol.SSH)
         .withConnectStrategyFactory(sf)
+        .withSessionListener(listener)
         .build();
     }
 
-    private static final class MyAutoCloseable implements AutoCloseable {
+    private static final class SalConnectorCloseable implements AutoCloseable {
         private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
         private final NetconfDeviceCommunicator listener;
 
-        public MyAutoCloseable(final NetconfDeviceCommunicator listener,
-                final RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
+        public SalConnectorCloseable(final NetconfDeviceCommunicator listener,
+                                     final RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
             this.listener = listener;
             this.salFacade = salFacade;
         }
@@ -186,13 +186,13 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         }
     }
 
-    private static final class MyReconnectStrategyFactory implements ReconnectStrategyFactory {
+    private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
         private final Long connectionAttempts;
         private final EventExecutor executor;
         private final double sleepFactor;
         private final int minSleep;
 
-        MyReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
+        TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
             if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
                 connectionAttempts = maxConnectionAttempts;
             } else {
index 9423dbf..ca12e59 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.controller.sal.connect.api;
 /**
  *
  */
-public interface RemoteDevice<PREF, M> {
+public interface RemoteDevice<PREF, M, LISTENER extends RemoteDeviceCommunicator<M>> {
 
-    void onRemoteSessionUp(PREF remoteSessionCapabilities, RemoteDeviceCommunicator<M> listener);
+    void onRemoteSessionUp(PREF remoteSessionCapabilities, LISTENER listener);
 
     void onRemoteSessionDown();
 
index 39340fa..9a5b239 100644 (file)
@@ -32,12 +32,17 @@ import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
@@ -54,7 +59,7 @@ import org.slf4j.LoggerFactory;
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
  */
-public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage> {
+public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
 
     private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
 
@@ -66,6 +71,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
     };
 
     private final RemoteDeviceId id;
+    private final boolean reconnectOnSchemasChange;
 
     private final SchemaContextFactory schemaContextFactory;
     private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
@@ -78,7 +84,14 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
 
     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
                          final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
+        this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false);
+    }
+
+    // FIXME reduce parameters
+    public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+                         final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer, final boolean reconnectOnSchemasChange) {
         this.id = id;
+        this.reconnectOnSchemasChange = reconnectOnSchemasChange;
         this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
         this.messageTransformer = messageTransformer;
         this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
@@ -90,7 +103,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
 
     @Override
     public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
-                                  final RemoteDeviceCommunicator<NetconfMessage> listener) {
+                                  final NetconfDeviceCommunicator listener) {
         // SchemaContext setup has to be performed in a dedicated thread since
         // we are in a netty thread in this method
         // Yang models are being downloaded in this method and it would cause a
@@ -103,6 +116,10 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
         final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver);
         final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
 
+        if(shouldListenOnSchemaChange(remoteSessionCapabilities)) {
+           registerToBaseNetconfStream(deviceRpc, listener);
+        }
+
         final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
             @Override
             public void onSuccess(final DeviceSources result) {
@@ -125,12 +142,49 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
 
     }
 
+    private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
+       final ListenableFuture<RpcResult<CompositeNode>> rpcResultListenableFuture =
+                deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+
+        final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
+            @Override
+            public Optional<CompositeNode> filterNotification(final CompositeNode notification) {
+                if (isCapabilityChanged(notification)) {
+                    logger.info("{}: Schemas change detected, reconnecting", id);
+                    // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
+                    listener.disconnect();
+                    return Optional.absent();
+                }
+                return Optional.of(notification);
+            }
+
+            private boolean isCapabilityChanged(final CompositeNode notification) {
+                return notification.getNodeType().equals(NetconfCapabilityChange.QNAME);
+            }
+        };
+
+        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+            @Override
+            public void onSuccess(final RpcResult<CompositeNode> result) {
+                notificationHandler.addNotificationFilter(filter);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t);
+            }
+        });
+    }
+
+    private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
+        return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
+    }
+
     private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
         updateMessageTransformer(result);
         salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
         notificationHandler.onRemoteSchemaUp();
 
-        logger.debug("{}: Initialization in sal successful", id);
         logger.info("{}: Netconf connector initialized successfully", id);
     }
 
@@ -150,7 +204,6 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
 
     /**
      * Update initial message transformer to use retrieved schema
-     * @param currentSchemaContext
      */
     private void updateMessageTransformer(final SchemaContext currentSchemaContext) {
         messageTransformer.onGlobalContextUpdated(currentSchemaContext);
@@ -346,7 +399,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
                     if (t instanceof MissingSchemaSourceException) {
                         final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
                         logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
-                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), FailureReason.MissingSource);
+                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource);
                         setUpSchema(stripMissingSource(requiredSources, missingSource));
 
                     // In case resolution error, try only with resolved sources
@@ -354,7 +407,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
                         // TODO check for infinite loop
                         final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
                         final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
-                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), FailureReason.UnableToResolve);
+                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve);
                         logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
                         setUpSchema(resolutionException.getResolvedSources());
                     // unknown error, fail
index cc8960f..b5927f0 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import java.util.LinkedList;
 import java.util.List;
@@ -31,6 +32,7 @@ final class NotificationHandler {
     private final MessageTransformer<NetconfMessage> messageTransformer;
     private final RemoteDeviceId id;
     private boolean passNotifications = false;
+    private NotificationFilter filter;
 
     NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
         this.salFacade = Preconditions.checkNotNull(salFacade);
@@ -70,9 +72,21 @@ final class NotificationHandler {
         queue.add(notification);
     }
 
-    private void passNotification(final CompositeNode parsedNotification) {
+    private synchronized void passNotification(final CompositeNode parsedNotification) {
         logger.debug("{}: Forwarding notification {}", id, parsedNotification);
         Preconditions.checkNotNull(parsedNotification);
-        salFacade.onNotification(parsedNotification);
+
+        if(filter == null || filter.filterNotification(parsedNotification).isPresent()) {
+            salFacade.onNotification(parsedNotification);
+        }
+    }
+
+    synchronized void addNotificationFilter(final NotificationFilter filter) {
+        this.filter = filter;
+    }
+
+    static interface NotificationFilter {
+
+        Optional<CompositeNode> filterNotification(CompositeNode notification);
     }
 }
index 556fc2f..8553820 100644 (file)
@@ -47,7 +47,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
 
     private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
 
-    private final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice;
+    private final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
     private final Optional<NetconfSessionPreferences> overrideNetconfCapabilities;
     private final RemoteDeviceId id;
     private final Lock sessionLock = new ReentrantLock();
@@ -57,17 +57,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
     private NetconfClientSession session;
     private Future<?> initFuture;
 
-    public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice,
-            final NetconfSessionPreferences netconfSessionPreferences) {
-        this(id, remoteDevice, Optional.of(netconfSessionPreferences));
+    public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
+            final NetconfSessionPreferences NetconfSessionPreferences) {
+        this(id, remoteDevice, Optional.of(NetconfSessionPreferences));
     }
 
     public NetconfDeviceCommunicator(final RemoteDeviceId id,
-                                     final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice) {
+                                     final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice) {
         this(id, remoteDevice, Optional.<NetconfSessionPreferences>absent());
     }
 
-    private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice,
+    private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
             final Optional<NetconfSessionPreferences> overrideNetconfCapabilities) {
         this.id = id;
         this.remoteDevice = remoteDevice;
@@ -97,14 +97,15 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
         }
     }
 
-    public void initializeRemoteConnection(final NetconfClientDispatcher dispatch,
-                                           final NetconfClientConfiguration config) {
+    public void initializeRemoteConnection(final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
+        // TODO 2313 extract listener from configuration
         if(config instanceof NetconfReconnectingClientConfiguration) {
-            initFuture = dispatch.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+            initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
         } else {
-            initFuture = dispatch.createClient(config);
+            initFuture = dispatcher.createClient(config);
         }
 
+
         initFuture.addListener(new GenericFutureListener<Future<Object>>(){
 
             @Override
@@ -115,6 +116,13 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
                 }
             }
         });
+
+    }
+
+    public void disconnect() {
+        if(session != null) {
+            session.close();
+        }
     }
 
     private void tearDown( String reason ) {
@@ -158,18 +166,14 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
         }
     }
 
-    private RpcResult<NetconfMessage> createSessionDownRpcResult()
-    {
+    private RpcResult<NetconfMessage> createSessionDownRpcResult() {
         return createErrorRpcResult( RpcError.ErrorType.TRANSPORT,
                              String.format( "The netconf session to %1$s is disconnected", id.getName() ) );
     }
 
-    private RpcResult<NetconfMessage> createErrorRpcResult( RpcError.ErrorType errorType, String message )
-    {
+    private RpcResult<NetconfMessage> createErrorRpcResult( RpcError.ErrorType errorType, String message ) {
         return RpcResultBuilder.<NetconfMessage>failed()
-                .withError( errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(),
-                            message )
-                .build();
+                .withError(errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), message).build();
     }
 
     @Override
@@ -194,6 +198,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
         if(session != null) {
             session.close();
         }
+
         tearDown(id + ": Netconf session closed");
     }
 
@@ -232,14 +237,12 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
             logger.debug("{}: Message received {}", id, message);
 
             if(logger.isTraceEnabled()) {
-                logger.trace( "{}: Matched request: {} to response: {}", id,
-                              msgToS( request.request ), msgToS( message ) );
+                logger.trace( "{}: Matched request: {} to response: {}", id, msgToS( request.request ), msgToS( message ) );
             }
 
             try {
                 NetconfMessageTransformUtil.checkValidReply( request.request, message );
-            }
-            catch (final NetconfDocumentedException e) {
+            } catch (final NetconfDocumentedException e) {
                 logger.warn( "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}",
                              id, msgToS( request.request ), msgToS( message ), e );
 
@@ -250,8 +253,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
 
             try {
                 NetconfMessageTransformUtil.checkSuccessReply(message);
-            }
-            catch(final NetconfDocumentedException e) {
+            } catch(final NetconfDocumentedException e) {
                 logger.warn( "{}: Error reply from remote device, request: {}, response: {}", id,
                              msgToS( request.request ), msgToS( message ), e );
 
@@ -269,13 +271,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
     }
 
     @Override
-    public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(
-                                               final NetconfMessage message, final QName rpc) {
+    public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
         sessionLock.lock();
         try {
             return sendRequestWithLock( message, rpc );
-        }
-        finally {
+        } finally {
             sessionLock.unlock();
         }
     }
index 572885b..89211ed 100644 (file)
@@ -105,6 +105,11 @@ public final class NetconfSessionPreferences {
         return containsNonModuleCapability(NetconfMessageTransformUtil.NETCONF_RUNNING_WRITABLE_URI.toString());
     }
 
+    public boolean isNotificationsSupported() {
+        return containsNonModuleCapability(NetconfMessageTransformUtil.NETCONF_NOTIFICATONS_URI.toString())
+                || containsModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_NOTIFICATIONS);
+    }
+
     public boolean isMonitoringSupported() {
         return containsModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING)
                 || containsNonModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING.getNamespace().toString());
index 9eba241..5e3ad2c 100644 (file)
@@ -26,7 +26,9 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
@@ -51,6 +53,7 @@ import org.w3c.dom.Element;
 public class NetconfMessageTransformUtil {
 
     public static final String MESSAGE_ID_ATTR = "message-id";
+    public static final QName CREATE_SUBSCRIPTION_RPC_QNAME = QName.cachedReference(QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
 
     private NetconfMessageTransformUtil() {}
 
@@ -61,6 +64,8 @@ public class NetconfMessageTransformUtil {
     public static final QName IETF_NETCONF_MONITORING_SCHEMA_VERSION = QName.create(IETF_NETCONF_MONITORING, "version");
     public static final QName IETF_NETCONF_MONITORING_SCHEMA_NAMESPACE = QName.create(IETF_NETCONF_MONITORING, "namespace");
 
+    public static final QName IETF_NETCONF_NOTIFICATIONS = QName.create(NetconfCapabilityChange.QNAME, "ietf-netconf-notifications");
+
     public static URI NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0");
     public static QName NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf");
     public static QName NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
@@ -91,6 +96,9 @@ public class NetconfMessageTransformUtil {
     public static URI NETCONF_CANDIDATE_URI = URI
             .create("urn:ietf:params:netconf:capability:candidate:1.0");
 
+    public static URI NETCONF_NOTIFICATONS_URI = URI
+            .create("urn:ietf:params:netconf:capability:notification:1.0");
+
     public static URI NETCONF_RUNNING_WRITABLE_URI = URI
             .create("urn:ietf:params:netconf:capability:writable-running:1.0");
 
@@ -105,6 +113,10 @@ public class NetconfMessageTransformUtil {
     public static final CompositeNode COMMIT_RPC_CONTENT =
             NodeFactory.createImmutableCompositeNode(NETCONF_COMMIT_QNAME, null, Collections.<Node<?>>emptyList());
 
+    // Create-subscription changes message
+    public static final CompositeNode CREATE_SUBSCRIPTION_RPC_CONTENT =
+            NodeFactory.createImmutableCompositeNode(CREATE_SUBSCRIPTION_RPC_QNAME, null, Collections.<Node<?>>emptyList());
+
     public static Node<?> toFilterStructure(final YangInstanceIdentifier identifier) {
         Node<?> previous = null;
         if (Iterables.isEmpty(identifier.getPathArguments())) {
index e13398b..7059a14 100644 (file)
@@ -66,6 +66,13 @@ module odl-sal-netconf-connector-cfg {
                 }
             }
 
+            leaf reconnect-on-changed-schema {
+                type boolean;
+                default false;
+                description "If true, the connector would auto disconnect/reconnect when schemas are changed in the remote device.
+                             The connector subscribes (right after connect) to base netconf notifications and listens for netconf-capability-change notification";
+            }
+
             container dom-registry {
                 uses config:service-ref {
                     refine type {
index 0ddafa3..ec945e0 100644 (file)
@@ -37,9 +37,9 @@ import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
-import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
@@ -101,7 +101,7 @@ public class NetconfDeviceTest {
         final ArrayList<String> capList = Lists.newArrayList(TEST_CAPABILITY);
 
         final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
-        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+        final NetconfDeviceCommunicator listener = getListener();
 
         final SchemaContextFactory schemaFactory = getSchemaFactory();
 
@@ -115,7 +115,7 @@ public class NetconfDeviceTest {
 
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
                 = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver);
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer());
+        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer(), true);
         // Monitoring not supported
         final NetconfSessionPreferences sessionCaps = getSessionCaps(false, capList);
         device.onRemoteSessionUp(sessionCaps, listener);
@@ -128,7 +128,7 @@ public class NetconfDeviceTest {
     @Test
     public void testNetconfDeviceMissingSource() throws Exception {
         final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
-        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+        final NetconfDeviceCommunicator listener = getListener();
 
         final SchemaContextFactory schemaFactory = getSchemaFactory();
 
@@ -147,7 +147,7 @@ public class NetconfDeviceTest {
 
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
                 = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver);
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer());
+        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer(), true);
         // Monitoring supported
         final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2));
         device.onRemoteSessionUp(sessionCaps, listener);
@@ -167,13 +167,13 @@ public class NetconfDeviceTest {
     @Test
     public void testNotificationBeforeSchema() throws Exception {
         final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
-        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+        final NetconfDeviceCommunicator listener = getListener();
 
         final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
 
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
                 = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaFactory(), stateSchemasResolver);
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), messageTransformer);
+        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), messageTransformer, true);
 
         device.onNotification(netconfMessage);
         device.onNotification(netconfMessage);
@@ -196,14 +196,14 @@ public class NetconfDeviceTest {
     @Test
     public void testNetconfDeviceReconnect() throws Exception {
         final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
-        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+        final NetconfDeviceCommunicator listener = getListener();
 
         final SchemaContextFactory schemaContextProviderFactory = getSchemaFactory();
         final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
 
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
                 = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaContextProviderFactory, stateSchemasResolver);
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), messageTransformer);
+        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), messageTransformer, true);
         final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
                 Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&amp;revision=" + TEST_REVISION));
         device.onRemoteSessionUp(sessionCaps, listener);
@@ -299,8 +299,8 @@ public class NetconfDeviceTest {
                 capabilities);
     }
 
-    public RemoteDeviceCommunicator<NetconfMessage> getListener() throws Exception {
-        final RemoteDeviceCommunicator<NetconfMessage> remoteDeviceCommunicator = mockCloseableClass(RemoteDeviceCommunicator.class);
+    public NetconfDeviceCommunicator getListener() throws Exception {
+        final NetconfDeviceCommunicator remoteDeviceCommunicator = mockCloseableClass(NetconfDeviceCommunicator.class);
         doReturn(Futures.immediateFuture(rpcResult)).when(remoteDeviceCommunicator).sendRequest(any(NetconfMessage.class), any(QName.class));
         return remoteDeviceCommunicator;
     }
index fad3d8e..68fe87f 100644 (file)
@@ -56,10 +56,10 @@ import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
 import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
-import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
@@ -77,7 +77,7 @@ public class NetconfDeviceCommunicatorTest {
     NetconfClientSession mockSession;
 
     @Mock
-    RemoteDevice<NetconfSessionPreferences, NetconfMessage> mockDevice;
+    RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> mockDevice;
 
     NetconfDeviceCommunicator communicator;
 
@@ -85,16 +85,15 @@ public class NetconfDeviceCommunicatorTest {
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks( this );
 
-        communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test" ), mockDevice );
+        communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test" ), mockDevice);
     }
 
     @SuppressWarnings("unchecked")
-    void setupSession()
-    {
-        doReturn( Collections.<String>emptySet() ).when( mockSession ).getServerCapabilities();
-        doNothing().when( mockDevice ).onRemoteSessionUp( any( NetconfSessionPreferences.class ),
-                                                          any( RemoteDeviceCommunicator.class ) );
-        communicator.onSessionUp( mockSession );
+    void setupSession() {
+        doReturn(Collections.<String>emptySet()).when(mockSession).getServerCapabilities();
+        doNothing().when(mockDevice).onRemoteSessionUp(any(NetconfSessionPreferences.class),
+                any(NetconfDeviceCommunicator.class));
+        communicator.onSessionUp(mockSession);
     }
 
     private ListenableFuture<RpcResult<NetconfMessage>> sendRequest() throws Exception {
@@ -130,16 +129,16 @@ public class NetconfDeviceCommunicatorTest {
                                  testCapability );
         doReturn( serverCapabilities ).when( mockSession ).getServerCapabilities();
 
-        ArgumentCaptor<NetconfSessionPreferences> netconfSessionCapabilities =
+        ArgumentCaptor<NetconfSessionPreferences> NetconfSessionPreferences =
                                               ArgumentCaptor.forClass( NetconfSessionPreferences.class );
-        doNothing().when( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) );
+        doNothing().when( mockDevice ).onRemoteSessionUp( NetconfSessionPreferences.capture(), eq( communicator ) );
 
         communicator.onSessionUp( mockSession );
 
         verify( mockSession ).getServerCapabilities();
-        verify( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) );
+        verify( mockDevice ).onRemoteSessionUp( NetconfSessionPreferences.capture(), eq( communicator ) );
 
-        NetconfSessionPreferences actualCapabilites = netconfSessionCapabilities.getValue();
+        NetconfSessionPreferences actualCapabilites = NetconfSessionPreferences.getValue();
         assertEquals( "containsModuleCapability", true, actualCapabilites.containsNonModuleCapability(
                 NetconfMessageTransformUtil.NETCONF_ROLLBACK_ON_ERROR_URI.toString()) );
         assertEquals( "containsModuleCapability", false, actualCapabilites.containsNonModuleCapability(testCapability) );
@@ -340,7 +339,7 @@ public class NetconfDeviceCommunicatorTest {
      */
     @Test
     public void testNetconfDeviceReconnectInCommunicator() throws Exception {
-        final RemoteDevice<NetconfSessionPreferences, NetconfMessage> device = mock(RemoteDevice.class);
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device = mock(RemoteDevice.class);
 
         final TimedReconnectStrategy timedReconnectStrategy = new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, 10000, 0, 1.0, null, 100L, null);
         final ReconnectStrategy reconnectStrategy = spy(new ReconnectStrategy() {
@@ -360,11 +359,11 @@ public class NetconfDeviceCommunicatorTest {
             }
         });
 
-        final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test"), device);
         final EventLoopGroup group = new NioEventLoopGroup();
         final Timer time = new HashedWheelTimer();
         try {
-            final NetconfClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create()
+            final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test"), device);
+            final NetconfReconnectingClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create()
                     .withAddress(new InetSocketAddress("localhost", 65000))
                     .withReconnectStrategy(reconnectStrategy)
                     .withConnectStrategyFactory(new ReconnectStrategyFactory() {
@@ -379,7 +378,6 @@ public class NetconfDeviceCommunicatorTest {
                     .withSessionListener(listener)
                     .build();
 
-
             listener.initializeRemoteConnection(new NetconfClientDispatcherImpl(group, group, time), cfg);
 
             verify(reconnectStrategy, timeout((int) TimeUnit.MINUTES.toMillis(3)).times(101)).scheduleReconnect(any(Throwable.class));
index 2b30152..3a94969 100644 (file)
       <groupId>${project.groupId}</groupId>
       <artifactId>netconf-mapping-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>netconf-notifications-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>netconf-util</artifactId>
                             org.opendaylight.controller.netconf.confignetconfconnector.util,
                             org.opendaylight.controller.netconf.confignetconfconnector.osgi,
                             org.opendaylight.controller.netconf.confignetconfconnector.exception,</Private-Package>
-            <Import-Package>*</Import-Package>
-            <Export-Package></Export-Package>
           </instructions>
         </configuration>
       </plugin>
index f526d92..ca6a8c4 100644 (file)
@@ -37,7 +37,7 @@ import org.opendaylight.controller.netconf.confignetconfconnector.mapping.config
 import org.opendaylight.controller.netconf.confignetconfconnector.mapping.config.Services;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.editconfig.EditConfigXmlParser.EditConfigExecution;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreContext;
 import org.opendaylight.controller.netconf.confignetconfconnector.transactions.TransactionProvider;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -52,12 +52,12 @@ public class EditConfig extends AbstractConfigNetconfOperation {
 
     private static final Logger LOG = LoggerFactory.getLogger(EditConfig.class);
 
-    private final YangStoreSnapshot yangStoreSnapshot;
+    private final YangStoreContext yangStoreSnapshot;
 
     private final TransactionProvider transactionProvider;
     private EditConfigXmlParser editConfigXmlParser;
 
-    public EditConfig(YangStoreSnapshot yangStoreSnapshot, TransactionProvider transactionProvider,
+    public EditConfig(YangStoreContext yangStoreSnapshot, TransactionProvider transactionProvider,
             ConfigRegistryClient configRegistryClient, String netconfSessionIdForReporting) {
         super(configRegistryClient, netconfSessionIdForReporting);
         this.yangStoreSnapshot = yangStoreSnapshot;
@@ -204,7 +204,7 @@ public class EditConfig extends AbstractConfigNetconfOperation {
         }
     }
 
-    public static Config getConfigMapping(ConfigRegistryClient configRegistryClient, YangStoreSnapshot yangStoreSnapshot) {
+    public static Config getConfigMapping(ConfigRegistryClient configRegistryClient, YangStoreContext yangStoreSnapshot) {
         Map<String, Map<String, ModuleConfig>> factories = transformMbeToModuleConfigs(configRegistryClient,
                 yangStoreSnapshot.getModuleMXBeanEntryMap());
         Map<String, Map<Date, IdentityMapping>> identitiesMap = transformIdentities(yangStoreSnapshot.getModules());
index b504cbf..27d53cd 100644 (file)
@@ -26,7 +26,7 @@ import org.opendaylight.controller.netconf.confignetconfconnector.mapping.runtim
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.Datastore;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.editconfig.EditConfig;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreContext;
 import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
 import org.opendaylight.controller.netconf.util.exception.UnexpectedElementException;
 import org.opendaylight.controller.netconf.util.exception.UnexpectedNamespaceException;
@@ -38,10 +38,10 @@ import org.w3c.dom.Element;
 
 public class Get extends AbstractConfigNetconfOperation {
 
-    private final YangStoreSnapshot yangStoreSnapshot;
+    private final YangStoreContext yangStoreSnapshot;
     private static final Logger LOG = LoggerFactory.getLogger(Get.class);
 
-    public Get(YangStoreSnapshot yangStoreSnapshot, ConfigRegistryClient configRegistryClient,
+    public Get(YangStoreContext yangStoreSnapshot, ConfigRegistryClient configRegistryClient,
                String netconfSessionIdForReporting) {
         super(configRegistryClient, netconfSessionIdForReporting);
         this.yangStoreSnapshot = yangStoreSnapshot;
index 2ff4dd6..350ace5 100644 (file)
@@ -20,7 +20,7 @@ import org.opendaylight.controller.netconf.confignetconfconnector.mapping.config
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.Datastore;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.editconfig.EditConfig;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreContext;
 import org.opendaylight.controller.netconf.confignetconfconnector.transactions.TransactionProvider;
 import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
 import org.opendaylight.controller.netconf.util.exception.UnexpectedElementException;
@@ -36,14 +36,14 @@ public class GetConfig extends AbstractConfigNetconfOperation {
 
     public static final String GET_CONFIG = "get-config";
 
-    private final YangStoreSnapshot yangStoreSnapshot;
+    private final YangStoreContext yangStoreSnapshot;
     private final Optional<String> maybeNamespace;
 
     private final TransactionProvider transactionProvider;
 
     private static final Logger LOG = LoggerFactory.getLogger(GetConfig.class);
 
-    public GetConfig(YangStoreSnapshot yangStoreSnapshot, Optional<String> maybeNamespace,
+    public GetConfig(YangStoreContext yangStoreSnapshot, Optional<String> maybeNamespace,
             TransactionProvider transactionProvider, ConfigRegistryClient configRegistryClient,
             String netconfSessionIdForReporting) {
         super(configRegistryClient, netconfSessionIdForReporting);
index 937a2ad..ebbc0e5 100644 (file)
@@ -30,7 +30,7 @@ import org.opendaylight.controller.netconf.confignetconfconnector.mapping.rpc.In
 import org.opendaylight.controller.netconf.confignetconfconnector.mapping.rpc.ModuleRpcs;
 import org.opendaylight.controller.netconf.confignetconfconnector.mapping.rpc.Rpcs;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreContext;
 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
 import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
@@ -45,9 +45,9 @@ public class RuntimeRpc extends AbstractConfigNetconfOperation {
     private static final Logger LOG = LoggerFactory.getLogger(RuntimeRpc.class);
     public static final String CONTEXT_INSTANCE = "context-instance";
 
-    private final YangStoreSnapshot yangStoreSnapshot;
+    private final YangStoreContext yangStoreSnapshot;
 
-    public RuntimeRpc(final YangStoreSnapshot yangStoreSnapshot, ConfigRegistryClient configRegistryClient,
+    public RuntimeRpc(final YangStoreContext yangStoreSnapshot, ConfigRegistryClient configRegistryClient,
             String netconfSessionIdForReporting) {
         super(configRegistryClient, netconfSessionIdForReporting);
         this.yangStoreSnapshot = yangStoreSnapshot;
index faaa17d..1579d19 100644 (file)
@@ -43,7 +43,7 @@ public class Activator implements BundleActivator {
 
                 SchemaContextProvider schemaContextProvider = reference.getBundle().getBundleContext().getService(reference);
 
-                YangStoreServiceImpl yangStoreService = new YangStoreServiceImpl(schemaContextProvider);
+                YangStoreService yangStoreService = new YangStoreService(schemaContextProvider, context);
                 configRegistryLookup = new ConfigRegistryLookupThread(yangStoreService);
                 configRegistryLookup.start();
                 return configRegistryLookup;
@@ -79,9 +79,9 @@ public class Activator implements BundleActivator {
     }
 
     private class ConfigRegistryLookupThread extends Thread {
-        private final YangStoreServiceImpl yangStoreService;
+        private final YangStoreService yangStoreService;
 
-        private ConfigRegistryLookupThread(YangStoreServiceImpl yangStoreService) {
+        private ConfigRegistryLookupThread(YangStoreService yangStoreService) {
             super("config-registry-lookup");
             this.yangStoreService = yangStoreService;
         }
index 04d5d4b..612bd85 100644 (file)
@@ -27,7 +27,7 @@ import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
 final class NetconfOperationProvider {
     private final Set<NetconfOperation> operations;
 
-    NetconfOperationProvider(YangStoreSnapshot yangStoreSnapshot, ConfigRegistryClient configRegistryClient,
+    NetconfOperationProvider(YangStoreContext yangStoreSnapshot, ConfigRegistryClient configRegistryClient,
             TransactionProvider transactionProvider, String netconfSessionIdForReporting) {
 
         operations = setUpOperations(yangStoreSnapshot, configRegistryClient, transactionProvider,
@@ -38,7 +38,7 @@ final class NetconfOperationProvider {
         return operations;
     }
 
-    private static Set<NetconfOperation> setUpOperations(YangStoreSnapshot yangStoreSnapshot,
+    private static Set<NetconfOperation> setUpOperations(YangStoreContext yangStoreSnapshot,
             ConfigRegistryClient configRegistryClient, TransactionProvider transactionProvider,
             String netconfSessionIdForReporting) {
         Set<NetconfOperation> ops = Sets.newHashSet();
index b5ae66d..82c04a5 100644 (file)
@@ -66,10 +66,6 @@ public class NetconfOperationServiceFactoryImpl implements NetconfOperationServi
 
     @Override
     public NetconfOperationServiceImpl createService(String netconfSessionIdForReporting) {
-        try {
-            return new NetconfOperationServiceImpl(yangStoreService, jmxClient, netconfSessionIdForReporting);
-        } catch (YangStoreException e) {
-            throw new IllegalStateException(e);
-        }
+        return new NetconfOperationServiceImpl(yangStoreService, jmxClient, netconfSessionIdForReporting);
     }
 }
index 902be44..ef0a72c 100644 (file)
@@ -8,18 +8,12 @@
 
 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
-import org.opendaylight.controller.config.api.LookupRegistry;
 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
-import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
 import org.opendaylight.controller.netconf.confignetconfconnector.transactions.TransactionProvider;
 import org.opendaylight.controller.netconf.confignetconfconnector.util.Util;
 import org.opendaylight.controller.netconf.mapping.api.Capability;
@@ -28,61 +22,32 @@ import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
 import org.opendaylight.yangtools.yang.model.api.Module;
 
 /**
- * Manages life cycle of {@link YangStoreSnapshot}.
+ * Manages life cycle of {@link YangStoreContext}.
  */
 public class NetconfOperationServiceImpl implements NetconfOperationService {
 
-    private final YangStoreSnapshot yangStoreSnapshot;
     private final NetconfOperationProvider operationProvider;
-    private final Set<Capability> capabilities;
     private final TransactionProvider transactionProvider;
+    private final YangStoreService yangStoreService;
 
     public NetconfOperationServiceImpl(final YangStoreService yangStoreService, final ConfigRegistryJMXClient jmxClient,
-            final String netconfSessionIdForReporting) throws YangStoreException {
+            final String netconfSessionIdForReporting) {
 
-        yangStoreSnapshot = yangStoreService.getYangStoreSnapshot();
-        checkConsistencyBetweenYangStoreAndConfig(jmxClient, yangStoreSnapshot);
+        this.yangStoreService = yangStoreService;
 
         transactionProvider = new TransactionProvider(jmxClient, netconfSessionIdForReporting);
-        operationProvider = new NetconfOperationProvider(yangStoreSnapshot, jmxClient, transactionProvider,
+        operationProvider = new NetconfOperationProvider(yangStoreService, jmxClient, transactionProvider,
                 netconfSessionIdForReporting);
-        capabilities = setupCapabilities(yangStoreSnapshot);
-    }
-
-
-    @VisibleForTesting
-    static void checkConsistencyBetweenYangStoreAndConfig(final LookupRegistry jmxClient, final YangStoreSnapshot yangStoreSnapshot) {
-        Set<String> missingModulesFromConfig = Sets.newHashSet();
-
-        Set<String> modulesSeenByConfig = jmxClient.getAvailableModuleFactoryQNames();
-        Map<String, Map<String, ModuleMXBeanEntry>> moduleMXBeanEntryMap = yangStoreSnapshot.getModuleMXBeanEntryMap();
-
-        for (Map<String, ModuleMXBeanEntry> moduleNameToMBE : moduleMXBeanEntryMap.values()) {
-            for (ModuleMXBeanEntry moduleMXBeanEntry : moduleNameToMBE.values()) {
-                String moduleSeenByYangStore = moduleMXBeanEntry.getYangModuleQName().toString();
-                if(!modulesSeenByConfig.contains(moduleSeenByYangStore)){
-                    missingModulesFromConfig.add(moduleSeenByYangStore);
-                }
-            }
-        }
-
-        Preconditions
-                .checkState(
-                        missingModulesFromConfig.isEmpty(),
-                        "There are inconsistencies between configuration subsystem and yangstore in terms of discovered yang modules, yang modules missing from config subsystem but present in yangstore: %s, %sAll modules present in config: %s",
-                        missingModulesFromConfig, System.lineSeparator(), modulesSeenByConfig);
-
     }
 
     @Override
     public void close() {
-        yangStoreSnapshot.close();
         transactionProvider.close();
     }
 
     @Override
     public Set<Capability> getCapabilities() {
-        return capabilities;
+        return setupCapabilities(yangStoreService);
     }
 
     @Override
@@ -90,7 +55,7 @@ public class NetconfOperationServiceImpl implements NetconfOperationService {
         return operationProvider.getOperations();
     }
 
-    private static Set<Capability> setupCapabilities(final YangStoreSnapshot yangStoreSnapshot) {
+    private static Set<Capability> setupCapabilities(final YangStoreContext yangStoreSnapshot) {
         Set<Capability> capabilities = new HashSet<>();
         // [RFC6241] 8.3.  Candidate Configuration Capability
         capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0"));
diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreContext.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreContext.java
new file mode 100644 (file)
index 0000000..6a38a9a
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
+
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+
+public interface YangStoreContext {
+
+    /**
+     * @deprecated Use {@link #getQNamesToIdentitiesToModuleMXBeanEntries()} instead. This method return only one
+     * module representation even if multiple revisions are available.
+     */
+    @Deprecated
+    Map<String/* Namespace from yang file */,
+            Map<String /* Name of module entry from yang file */, ModuleMXBeanEntry>> getModuleMXBeanEntryMap();
+
+
+    Map<QName, Map<String /* identity local name */, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries();
+
+    /**
+     * Get all modules discovered when this snapshot was created.
+     * @return all modules discovered. If one module exists with two different revisions, return both.
+     */
+    Set<Module> getModules();
+
+    String getModuleSource(ModuleIdentifier moduleIdentifier);
+
+}
diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreException.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreException.java
deleted file mode 100644 (file)
index 18558b3..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
-
-public class YangStoreException extends Exception {
-
-    private static final long serialVersionUID = 2841238836278528836L;
-
-    public YangStoreException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}
index 969d7cf..de151a8 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
+
 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
 
-/**
- * Yang store OSGi service
- */
-public interface YangStoreService {
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.lang.ref.SoftReference;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
+import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class YangStoreService implements YangStoreContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(YangStoreService.class);
 
     /**
-     * Module entry objects mapped to module names and namespaces.
+     * This is a rather interesting locking model. We need to guard against both the
+     * cache expiring from GC and being invalidated by schema context change. The
+     * context can change while we are doing processing, so we do not want to block
+     * it, so no synchronization can happen on the methods.
+     *
+     * So what we are doing is the following:
      *
-     * @return actual view of what is available in OSGi service registry.
+     * We synchronize with GC as usual, using a SoftReference.
+     *
+     * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
+     * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
+     * that may happen while the getter is already busy acting on the old schema context,
+     * so it needs to understand that a refresh has happened and retry. To do that, it
+     * attempts a CAS operation -- if it fails, in knows that the SoftReference has
+     * been replaced and thus it needs to retry.
+     *
+     * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
+     * to stop multiple threads doing the same work.
      */
-    YangStoreSnapshot getYangStoreSnapshot() throws YangStoreException;
+    private final AtomicReference<SoftReference<YangStoreSnapshot>> ref =
+            new AtomicReference<>(new SoftReference<YangStoreSnapshot>(null));
+
+    private final SchemaContextProvider schemaContextProvider;
+    private final BaseNetconfNotificationListener notificationPublisher;
+
+    private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(final Runnable r) {
+            return new Thread(r, "config-netconf-connector-capability-notifications");
+        }
+    });
+
+    public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) {
+        this(schemaContextProvider, new NotificationCollectorTracker(context));
+    }
+
+    public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) {
+        this.schemaContextProvider = schemaContextProvider;
+        this.notificationPublisher = notificationHandler;
+    }
+
+    private synchronized YangStoreContext getYangStoreSnapshot() {
+        SoftReference<YangStoreSnapshot> r = ref.get();
+        YangStoreSnapshot ret = r.get();
+
+        while (ret == null) {
+            // We need to be compute a new value
+            ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext());
+
+            if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
+                LOG.debug("Concurrent refresh detected, recomputing snapshot");
+                r = ref.get();
+                ret = null;
+            }
+        }
+
+        return ret;
+    }
+
+    @Override
+    public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
+        return getYangStoreSnapshot().getModuleMXBeanEntryMap();
+    }
+
+    @Override
+    public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
+        return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries();
+    }
+
+    @Override
+    public Set<Module> getModules() {
+        return getYangStoreSnapshot().getModules();
+    }
+
+    @Override
+    public String getModuleSource(final ModuleIdentifier moduleIdentifier) {
+        return getYangStoreSnapshot().getModuleSource(moduleIdentifier);
+    }
+
+    public void refresh() {
+        final YangStoreSnapshot previous = ref.get().get();
+        ref.set(new SoftReference<YangStoreSnapshot>(null));
+        notificationExecutor.submit(new CapabilityChangeNotifier(previous));
+    }
+
+    private final class CapabilityChangeNotifier implements Runnable {
+        private final YangStoreSnapshot previous;
+
+        public CapabilityChangeNotifier(final YangStoreSnapshot previous) {
+            this.previous = previous;
+        }
+
+        @Override
+        public void run() {
+            final YangStoreContext current = getYangStoreSnapshot();
+
+            if(current.equals(previous) == false) {
+                notificationPublisher.onCapabilityChanged(computeDiff(previous, current));
+            }
+        }
+    }
+
+    private static final Function<Module, Uri> MODULE_TO_URI = new Function<Module, Uri>() {
+        @Override
+        public Uri apply(final Module input) {
+            final QName qName = QName.cachedReference(QName.create(input.getQNameModule(), input.getName()));
+            return new Uri(qName.toString());
+        }
+    };
+
+    static NetconfCapabilityChange computeDiff(final YangStoreContext previous, final YangStoreContext current) {
+        final Sets.SetView<Module> removed = Sets.difference(previous.getModules(), current.getModules());
+        final Sets.SetView<Module> added = Sets.difference(current.getModules(), previous.getModules());
+
+        final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
+        netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
+        netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI)));
+        netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI)));
+        // TODO modified should be computed ... but why ?
+        netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
+        return netconfCapabilityChangeBuilder.build();
+    }
+
+
+    /**
+     * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible
+     */
+    private static class NotificationCollectorTracker implements ServiceTrackerCustomizer<NetconfNotificationCollector, NetconfNotificationCollector>, BaseNetconfNotificationListener, AutoCloseable {
+
+        private final BundleContext context;
+        private final ServiceTracker<NetconfNotificationCollector, NetconfNotificationCollector> listenerTracker;
+        private BaseNotificationPublisherRegistration publisherReg;
+
+        public NotificationCollectorTracker(final BundleContext context) {
+            this.context = context;
+            listenerTracker = new ServiceTracker<>(context, NetconfNotificationCollector.class, this);
+            listenerTracker.open();
+        }
+
+        @Override
+        public synchronized NetconfNotificationCollector addingService(final ServiceReference<NetconfNotificationCollector> reference) {
+            closePublisherRegistration();
+            publisherReg = context.getService(reference).registerBaseNotificationPublisher();
+            return null;
+        }
+
+        @Override
+        public synchronized void modifiedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
+            closePublisherRegistration();
+            publisherReg = context.getService(reference).registerBaseNotificationPublisher();
+        }
+
+        @Override
+        public synchronized void removedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
+            closePublisherRegistration();
+            publisherReg = null;
+        }
+
+        private void closePublisherRegistration() {
+            if(publisherReg != null) {
+                publisherReg.close();
+            }
+        }
+
+        @Override
+        public synchronized void close() {
+            closePublisherRegistration();
+            listenerTracker.close();
+        }
+
+        @Override
+        public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
+            if(publisherReg == null) {
+                LOG.warn("Omitting notification due to missing notification service: {}", capabilityChange);
+                return;
+            }
 
+            publisherReg.onCapabilityChanged(capabilityChange);
+        }
+    }
 }
diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreServiceImpl.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreServiceImpl.java
deleted file mode 100644 (file)
index 958af54..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
-
-import java.lang.ref.SoftReference;
-import java.util.concurrent.atomic.AtomicReference;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class YangStoreServiceImpl implements YangStoreService {
-    private static final Logger LOG = LoggerFactory.getLogger(YangStoreServiceImpl.class);
-
-    /**
-     * This is a rather interesting locking model. We need to guard against both the
-     * cache expiring from GC and being invalidated by schema context change. The
-     * context can change while we are doing processing, so we do not want to block
-     * it, so no synchronization can happen on the methods.
-     *
-     * So what we are doing is the following:
-     *
-     * We synchronize with GC as usual, using a SoftReference.
-     *
-     * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
-     * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
-     * that may happen while the getter is already busy acting on the old schema context,
-     * so it needs to understand that a refresh has happened and retry. To do that, it
-     * attempts a CAS operation -- if it fails, in knows that the SoftReference has
-     * been replaced and thus it needs to retry.
-     *
-     * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
-     * to stop multiple threads doing the same work.
-     */
-    private final AtomicReference<SoftReference<YangStoreSnapshotImpl>> ref = new AtomicReference<>(new SoftReference<YangStoreSnapshotImpl>(null));
-    private final SchemaContextProvider service;
-
-    public YangStoreServiceImpl(final SchemaContextProvider service) {
-        this.service = service;
-    }
-
-    @Override
-    public synchronized YangStoreSnapshotImpl getYangStoreSnapshot() throws YangStoreException {
-        SoftReference<YangStoreSnapshotImpl> r = ref.get();
-        YangStoreSnapshotImpl ret = r.get();
-
-        while (ret == null) {
-            // We need to be compute a new value
-            ret = new YangStoreSnapshotImpl(service.getSchemaContext());
-
-            if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
-                LOG.debug("Concurrent refresh detected, recomputing snapshot");
-                r = ref.get();
-                ret = null;
-            }
-        }
-
-        return ret;
-    }
-
-    /**
-     * Called when schema context changes, invalidates cache.
-     */
-    public void refresh() {
-        ref.set(new SoftReference<YangStoreSnapshotImpl>(null));
-    }
-}
index 8ec4fdd..0d33705 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
+
 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
 
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
+import org.opendaylight.controller.config.yangjmxgenerator.PackageTranslator;
+import org.opendaylight.controller.config.yangjmxgenerator.ServiceInterfaceEntry;
+import org.opendaylight.controller.config.yangjmxgenerator.TypeProviderWrapper;
+import org.opendaylight.yangtools.sal.binding.yang.types.TypeProviderImpl;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class YangStoreSnapshot implements YangStoreContext {
+    private static final Logger LOG = LoggerFactory.getLogger(YangStoreSnapshot.class);
+
+
+    private final Map<String /* Namespace from yang file */,
+    Map<String /* Name of module entry from yang file */, ModuleMXBeanEntry>> moduleMXBeanEntryMap;
+
+
+    private final Map<QName, Map<String, ModuleMXBeanEntry>> qNamesToIdentitiesToModuleMXBeanEntries;
+
+    private final SchemaContext schemaContext;
+
+    public YangStoreSnapshot(final SchemaContext resolveSchemaContext) {
+        LOG.trace("Resolved modules:{}", resolveSchemaContext.getModules());
+        this.schemaContext = resolveSchemaContext;
+        // JMX generator
+
+        Map<String, String> namespaceToPackageMapping = Maps.newHashMap();
+        PackageTranslator packageTranslator = new PackageTranslator(namespaceToPackageMapping);
+        Map<QName, ServiceInterfaceEntry> qNamesToSIEs = new HashMap<>();
+        Map<IdentitySchemaNode, ServiceInterfaceEntry> knownSEITracker = new HashMap<>();
+        // create SIE structure qNamesToSIEs
+        for (Module module : resolveSchemaContext.getModules()) {
+            String packageName = packageTranslator.getPackageName(module);
+            Map<QName, ServiceInterfaceEntry> namesToSIEntries = ServiceInterfaceEntry
+                    .create(module, packageName, knownSEITracker);
+            for (Entry<QName, ServiceInterfaceEntry> sieEntry : namesToSIEntries.entrySet()) {
+                // merge value into qNamesToSIEs
+                if (qNamesToSIEs.containsKey(sieEntry.getKey()) == false) {
+                    qNamesToSIEs.put(sieEntry.getKey(), sieEntry.getValue());
+                } else {
+                    throw new IllegalStateException("Cannot add two SIE with same qname "
+                            + sieEntry.getValue());
+                }
+            }
+        }
+
+        Map<String, Map<String, ModuleMXBeanEntry>> moduleMXBeanEntryMap = Maps.newHashMap();
+
+        Map<QName, Map<String /* identity local name */, ModuleMXBeanEntry>> qNamesToIdentitiesToModuleMXBeanEntries = new HashMap<>();
+
 
-public interface YangStoreSnapshot extends AutoCloseable {
+        for (Module module : schemaContext.getModules()) {
+            String packageName = packageTranslator.getPackageName(module);
+            TypeProviderWrapper typeProviderWrapper = new TypeProviderWrapper(
+                    new TypeProviderImpl(resolveSchemaContext));
 
-    /**
-     * @deprecated Use {@link #getQNamesToIdentitiesToModuleMXBeanEntries()} instead. This method return only one
-     * module representation even if multiple revisions are available.
-     */
-    @Deprecated
-    Map<String/* Namespace from yang file */,
-            Map<String /* Name of module entry from yang file */, ModuleMXBeanEntry>> getModuleMXBeanEntryMap();
+            QName qName = QName.create(module.getNamespace(), module.getRevision(), module.getName());
 
+            Map<String /* MB identity local name */, ModuleMXBeanEntry> namesToMBEs =
+                    Collections.unmodifiableMap(ModuleMXBeanEntry.create(module, qNamesToSIEs, resolveSchemaContext,
+                            typeProviderWrapper, packageName));
+            moduleMXBeanEntryMap.put(module.getNamespace().toString(), namesToMBEs);
+
+            qNamesToIdentitiesToModuleMXBeanEntries.put(qName, namesToMBEs);
+        }
+        this.moduleMXBeanEntryMap = Collections.unmodifiableMap(moduleMXBeanEntryMap);
+        this.qNamesToIdentitiesToModuleMXBeanEntries = Collections.unmodifiableMap(qNamesToIdentitiesToModuleMXBeanEntries);
+
+    }
+
+    @Override
+    public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
+        return moduleMXBeanEntryMap;
+    }
+
+    @Override
+    public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
+        return qNamesToIdentitiesToModuleMXBeanEntries;
+    }
+
+    @Override
+    public Set<Module> getModules() {
+        return schemaContext.getModules();
+    }
+
+    @Override
+    public String getModuleSource(final org.opendaylight.yangtools.yang.model.api.ModuleIdentifier moduleIdentifier) {
+        return schemaContext.getModuleSource(moduleIdentifier).get();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
 
-    Map<QName, Map<String /* identity local name */, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries();
+        final YangStoreSnapshot that = (YangStoreSnapshot) o;
 
-    /**
-     * Get all modules discovered when this snapshot was created.
-     * @return all modules discovered. If one module exists with two different revisions, return both.
-     */
-    Set<Module> getModules();
+        if (schemaContext != null ? !schemaContext.equals(that.schemaContext) : that.schemaContext != null)
+            return false;
 
-    String getModuleSource(ModuleIdentifier moduleIdentifier);
+        return true;
+    }
 
     @Override
-    void close();
+    public int hashCode() {
+        return schemaContext != null ? schemaContext.hashCode() : 0;
+    }
 }
diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshotImpl.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshotImpl.java
deleted file mode 100644 (file)
index 075ae63..0000000
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
-
-import com.google.common.collect.Maps;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
-import org.opendaylight.controller.config.yangjmxgenerator.PackageTranslator;
-import org.opendaylight.controller.config.yangjmxgenerator.ServiceInterfaceEntry;
-import org.opendaylight.controller.config.yangjmxgenerator.TypeProviderWrapper;
-import org.opendaylight.yangtools.sal.binding.yang.types.TypeProviderImpl;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class YangStoreSnapshotImpl implements YangStoreSnapshot {
-    private static final Logger LOG = LoggerFactory.getLogger(YangStoreSnapshotImpl.class);
-
-
-    private final Map<String /* Namespace from yang file */,
-    Map<String /* Name of module entry from yang file */, ModuleMXBeanEntry>> moduleMXBeanEntryMap;
-
-
-    private final Map<QName, Map<String, ModuleMXBeanEntry>> qNamesToIdentitiesToModuleMXBeanEntries;
-
-    private final SchemaContext schemaContext;
-
-
-    public YangStoreSnapshotImpl(final SchemaContext resolveSchemaContext) {
-        LOG.trace("Resolved modules:{}", resolveSchemaContext.getModules());
-        this.schemaContext = resolveSchemaContext;
-        // JMX generator
-
-        Map<String, String> namespaceToPackageMapping = Maps.newHashMap();
-        PackageTranslator packageTranslator = new PackageTranslator(namespaceToPackageMapping);
-        Map<QName, ServiceInterfaceEntry> qNamesToSIEs = new HashMap<>();
-        Map<IdentitySchemaNode, ServiceInterfaceEntry> knownSEITracker = new HashMap<>();
-        // create SIE structure qNamesToSIEs
-        for (Module module : resolveSchemaContext.getModules()) {
-            String packageName = packageTranslator.getPackageName(module);
-            Map<QName, ServiceInterfaceEntry> namesToSIEntries = ServiceInterfaceEntry
-                    .create(module, packageName, knownSEITracker);
-            for (Entry<QName, ServiceInterfaceEntry> sieEntry : namesToSIEntries.entrySet()) {
-                // merge value into qNamesToSIEs
-                if (qNamesToSIEs.containsKey(sieEntry.getKey()) == false) {
-                    qNamesToSIEs.put(sieEntry.getKey(), sieEntry.getValue());
-                } else {
-                    throw new IllegalStateException("Cannot add two SIE with same qname "
-                            + sieEntry.getValue());
-                }
-            }
-        }
-
-        Map<String, Map<String, ModuleMXBeanEntry>> moduleMXBeanEntryMap = Maps.newHashMap();
-
-        Map<QName, Map<String /* identity local name */, ModuleMXBeanEntry>> qNamesToIdentitiesToModuleMXBeanEntries = new HashMap<>();
-
-
-        for (Module module : schemaContext.getModules()) {
-            String packageName = packageTranslator.getPackageName(module);
-            TypeProviderWrapper typeProviderWrapper = new TypeProviderWrapper(
-                    new TypeProviderImpl(resolveSchemaContext));
-
-            QName qName = QName.create(module.getNamespace(), module.getRevision(), module.getName());
-
-            Map<String /* MB identity local name */, ModuleMXBeanEntry> namesToMBEs =
-                    Collections.unmodifiableMap(ModuleMXBeanEntry.create(module, qNamesToSIEs, resolveSchemaContext,
-                            typeProviderWrapper, packageName));
-            moduleMXBeanEntryMap.put(module.getNamespace().toString(), namesToMBEs);
-
-            qNamesToIdentitiesToModuleMXBeanEntries.put(qName, namesToMBEs);
-        }
-        this.moduleMXBeanEntryMap = Collections.unmodifiableMap(moduleMXBeanEntryMap);
-        this.qNamesToIdentitiesToModuleMXBeanEntries = Collections.unmodifiableMap(qNamesToIdentitiesToModuleMXBeanEntries);
-
-    }
-
-    @Override
-    public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
-        return moduleMXBeanEntryMap;
-    }
-
-    @Override
-    public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
-        return qNamesToIdentitiesToModuleMXBeanEntries;
-    }
-
-    @Override
-    public Set<Module> getModules() {
-        return schemaContext.getModules();
-    }
-
-    @Override
-    public String getModuleSource(final org.opendaylight.yangtools.yang.model.api.ModuleIdentifier moduleIdentifier) {
-        return schemaContext.getModuleSource(moduleIdentifier).get();
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
index 6f9a62a..f1fc277 100644 (file)
@@ -13,6 +13,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -89,8 +91,8 @@ import org.opendaylight.controller.netconf.confignetconfconnector.operations.edi
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.get.Get;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.getconfig.GetConfig;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.runtimerpc.RuntimeRpc;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreServiceImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreContext;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreService;
 import org.opendaylight.controller.netconf.confignetconfconnector.transactions.TransactionProvider;
 import org.opendaylight.controller.netconf.impl.mapping.operations.DefaultCloseSession;
 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationRouter;
@@ -109,6 +111,9 @@ import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+import org.osgi.framework.Filter;
+import org.osgi.framework.ServiceListener;
+import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -132,7 +137,7 @@ public class NetconfMappingTest extends AbstractConfigTest {
     private TestImplModuleFactory factory4;
 
     @Mock
-    YangStoreSnapshot yangStoreSnapshot;
+    YangStoreContext yangStoreSnapshot;
     @Mock
     NetconfOperationRouter netconfOperationRouter;
     @Mock
@@ -143,6 +148,13 @@ public class NetconfMappingTest extends AbstractConfigTest {
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
+
+
+        final Filter filter = mock(Filter.class);
+        doReturn(filter).when(mockedContext).createFilter(anyString());
+        doNothing().when(mockedContext).addServiceListener(any(ServiceListener.class), anyString());
+        doReturn(new ServiceReference<?>[]{}).when(mockedContext).getServiceReferences(anyString(), anyString());
+
         doReturn(getMbes()).when(this.yangStoreSnapshot).getModuleMXBeanEntryMap();
         doReturn(getModules()).when(this.yangStoreSnapshot).getModules();
         doNothing().when(netconfOperationServiceSnapshot).close();
@@ -151,6 +163,8 @@ public class NetconfMappingTest extends AbstractConfigTest {
         this.factory2 = new DepTestImplModuleFactory();
         this.factory3 = new IdentityTestModuleFactory();
         factory4 = new TestImplModuleFactory();
+
+
         super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext, this.factory, this.factory2,
                 this.factory3, factory4));
 
@@ -629,13 +643,13 @@ public class NetconfMappingTest extends AbstractConfigTest {
 
         YangParserImpl yangParser = new YangParserImpl();
         final SchemaContext schemaContext = yangParser.resolveSchemaContext(new HashSet<>(yangParser.parseYangModelsFromStreamsMapped(yangDependencies).values()));
-        YangStoreServiceImpl yangStoreService = new YangStoreServiceImpl(new SchemaContextProvider() {
+        YangStoreService yangStoreService = new YangStoreService(new SchemaContextProvider() {
             @Override
             public SchemaContext getSchemaContext() {
                 return schemaContext ;
             }
-        });
-        mBeanEntries.putAll(yangStoreService.getYangStoreSnapshot().getModuleMXBeanEntryMap());
+        }, mockedContext);
+        mBeanEntries.putAll(yangStoreService.getModuleMXBeanEntryMap());
 
         return mBeanEntries;
     }
index 817bedf..ad57f89 100644 (file)
@@ -41,14 +41,14 @@ import org.opendaylight.controller.netconf.confignetconfconnector.mapping.config
 import org.opendaylight.controller.netconf.confignetconfconnector.mapping.config.Services;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.ValidateTest;
 import org.opendaylight.controller.netconf.confignetconfconnector.operations.editconfig.EditConfigXmlParser.EditConfigExecution;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreContext;
 import org.opendaylight.controller.netconf.confignetconfconnector.transactions.TransactionProvider;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 
 public class EditConfigTest {
 
     @Mock
-    private YangStoreSnapshot yangStoreSnapshot;
+    private YangStoreContext yangStoreSnapshot;
     @Mock
     private TransactionProvider provider;
     @Mock
diff --git a/opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationServiceImplTest.java b/opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationServiceImplTest.java
deleted file mode 100644 (file)
index 413aa5c..0000000
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.net.URI;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.config.api.LookupRegistry;
-import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
-import org.opendaylight.yangtools.yang.common.QName;
-
-public class NetconfOperationServiceImplTest {
-
-    private static final Date date1970_01_01;
-
-    static {
-        try {
-            date1970_01_01 = new SimpleDateFormat("yyyy-MM-dd").parse("1970-01-01");
-        } catch (ParseException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Test
-    public void testCheckConsistencyBetweenYangStoreAndConfig_ok() throws Exception {
-        NetconfOperationServiceImpl.checkConsistencyBetweenYangStoreAndConfig(
-                mockJmxClient("qname1", "qname2"),
-                mockYangStoreSnapshot("qname2", "qname1"));
-    }
-
-    @Test
-    public void testCheckConsistencyBetweenYangStoreAndConfig_ok2() throws Exception {
-        NetconfOperationServiceImpl.checkConsistencyBetweenYangStoreAndConfig(
-                mockJmxClient("qname1", "qname2", "qname4", "qname5"),
-                mockYangStoreSnapshot("qname2", "qname1"));
-    }
-
-    @Test
-    public void testCheckConsistencyBetweenYangStoreAndConfig_ok3() throws Exception {
-        NetconfOperationServiceImpl.checkConsistencyBetweenYangStoreAndConfig(
-                mockJmxClient(),
-                mockYangStoreSnapshot());
-    }
-
-    @Test
-    public void testCheckConsistencyBetweenYangStoreAndConfig_yangStoreMore() throws Exception {
-        try {
-            NetconfOperationServiceImpl.checkConsistencyBetweenYangStoreAndConfig(mockJmxClient("qname1"),
-                    mockYangStoreSnapshot("qname2", "qname1"));
-            fail("An exception of type " + IllegalStateException.class + " was expected");
-        } catch (IllegalStateException e) {
-            String message = e.getMessage();
-            Assert.assertThat(
-                    message,
-                    CoreMatchers
-                    .containsString("missing from config subsystem but present in yangstore: [(namespace?revision=1970-01-01)qname2]"));
-            Assert.assertThat(
-                    message,
-                    CoreMatchers
-                    .containsString("All modules present in config: [(namespace?revision=1970-01-01)qname1]"));
-        }
-    }
-
-    private YangStoreSnapshot mockYangStoreSnapshot(final String... qnames) {
-        YangStoreSnapshot mock = mock(YangStoreSnapshot.class);
-
-        Map<String, Map<String, ModuleMXBeanEntry>> map = Maps.newHashMap();
-
-        Map<String, ModuleMXBeanEntry> innerMap = Maps.newHashMap();
-
-        int i = 1;
-        for (String qname : qnames) {
-            innerMap.put(Integer.toString(i++), mockMBeanEntry(qname));
-        }
-
-        map.put("1", innerMap);
-
-        doReturn(map).when(mock).getModuleMXBeanEntryMap();
-
-        return mock;
-    }
-
-    private ModuleMXBeanEntry mockMBeanEntry(final String qname) {
-        ModuleMXBeanEntry mock = mock(ModuleMXBeanEntry.class);
-        QName q = getQName(qname);
-        doReturn(q).when(mock).getYangModuleQName();
-        return mock;
-    }
-
-    private QName getQName(final String qname) {
-        return QName.create(URI.create("namespace"), date1970_01_01, qname);
-    }
-
-    private LookupRegistry mockJmxClient(final String... visibleQNames) {
-        LookupRegistry mock = mock(LookupRegistry.class);
-        Set<String> qnames = Sets.newHashSet();
-        for (String visibleQName : visibleQNames) {
-            QName q = getQName(visibleQName);
-            qnames.add(q.toString());
-        }
-        doReturn(qnames).when(mock).getAvailableModuleFactoryQNames();
-        return mock;
-    }
-}
index 7155eb8..2bd919d 100644 (file)
@@ -20,6 +20,7 @@
           <username xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</username>
           <password xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</password>
           <tcp-only xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">false</tcp-only>
+          <reconnect-on-changed-schema>true</reconnect-on-changed-schema>
           <event-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
             <name>global-event-executor</name>
index aeab13f..2178d4e 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceSnapshot;
+import org.opendaylight.controller.netconf.mapping.api.SessionAwareNetconfOperation;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,6 +148,9 @@ public class NetconfOperationRouterImpl implements NetconfOperationRouter {
             if (netconfOperation instanceof DefaultNetconfOperation) {
                 ((DefaultNetconfOperation) netconfOperation).setNetconfSession(session);
             }
+            if(netconfOperation instanceof SessionAwareNetconfOperation) {
+                ((SessionAwareNetconfOperation) netconfOperation).setSession(session);
+            }
             if (!handlingPriority.equals(HandlingPriority.CANNOT_HANDLE)) {
 
                 Preconditions.checkState(!sortedPriority.containsKey(handlingPriority),
index fd362f8..bf13853 100644 (file)
@@ -52,10 +52,7 @@ import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionList
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
 import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
 import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreService;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreServiceImpl;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
 import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
 import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
 import org.opendaylight.controller.netconf.impl.NetconfServerSessionNegotiatorFactory;
@@ -67,8 +64,10 @@ import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener;
 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
@@ -176,7 +175,7 @@ public abstract class AbstractNetconfConfigTest extends AbstractConfigTest {
         return clientDispatcher;
     }
 
-    private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
+    private HardcodedYangStoreService getYangStore() throws IOException {
         final Collection<InputStream> yangDependencies = getBasicYangs();
         return new HardcodedYangStoreService(yangDependencies);
     }
@@ -246,22 +245,35 @@ public abstract class AbstractNetconfConfigTest extends AbstractConfigTest {
         return b.build();
     }
 
-    public static final class HardcodedYangStoreService implements YangStoreService {
-
-        private final List<InputStream> byteArrayInputStreams;
+    public static final class HardcodedYangStoreService extends YangStoreService {
+        public HardcodedYangStoreService(final Collection<? extends InputStream> inputStreams) throws IOException {
+            super(new SchemaContextProvider() {
+                @Override
+                public SchemaContext getSchemaContext() {
+                    return getSchema(inputStreams);
+                }
+            }, new BaseNetconfNotificationListener() {
+                @Override
+                public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
+                    // NOOP
+                }
+            });
+        }
 
-        public HardcodedYangStoreService(final Collection<? extends InputStream> inputStreams) throws YangStoreException, IOException {
-            byteArrayInputStreams = new ArrayList<>();
+        private static SchemaContext getSchema(final Collection<? extends InputStream> inputStreams) {
+            final ArrayList<InputStream> byteArrayInputStreams = new ArrayList<>();
             for (final InputStream inputStream : inputStreams) {
                 assertNotNull(inputStream);
-                final byte[] content = IOUtils.toByteArray(inputStream);
+                final byte[] content;
+                try {
+                    content = IOUtils.toByteArray(inputStream);
+                } catch (IOException e) {
+                    throw new IllegalStateException("Cannot read " + inputStream, e);
+                }
                 final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
                 byteArrayInputStreams.add(byteArrayInputStream);
             }
-        }
 
-        @Override
-        public YangStoreSnapshot getYangStoreSnapshot() throws YangStoreException {
             for (final InputStream inputStream : byteArrayInputStreams) {
                 try {
                     inputStream.reset();
@@ -271,14 +283,7 @@ public abstract class AbstractNetconfConfigTest extends AbstractConfigTest {
             }
 
             final YangParserImpl yangParser = new YangParserImpl();
-            final SchemaContext schemaContext = yangParser.resolveSchemaContext(new HashSet<>(yangParser.parseYangModelsFromStreamsMapped(byteArrayInputStreams).values()));
-            final YangStoreServiceImpl yangStoreService = new YangStoreServiceImpl(new SchemaContextProvider() {
-                @Override
-                public SchemaContext getSchemaContext() {
-                    return schemaContext ;
-                }
-            });
-            return yangStoreService.getYangStoreSnapshot();
+            return yangParser.resolveSchemaContext(new HashSet<>(yangParser.parseYangModelsFromStreamsMapped(byteArrayInputStreams).values()));
         }
     }
 }
index a938fbf..029aeff 100644 (file)
@@ -58,7 +58,6 @@ import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
 import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
-import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -199,8 +198,8 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
     }
 
     static NetconfDeviceCommunicator getSessionListener() {
-        RemoteDevice<NetconfSessionPreferences, NetconfMessage> mockedRemoteDevice = mock(RemoteDevice.class);
-        doNothing().when(mockedRemoteDevice).onRemoteSessionUp(any(NetconfSessionPreferences.class), any(RemoteDeviceCommunicator.class));
+        RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> mockedRemoteDevice = mock(RemoteDevice.class);
+        doNothing().when(mockedRemoteDevice).onRemoteSessionUp(any(NetconfSessionPreferences.class), any(NetconfDeviceCommunicator.class));
         doNothing().when(mockedRemoteDevice).onRemoteSessionDown();
         return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test"), mockedRemoteDevice);
     }
index 96d72cb..052d3dc 100644 (file)
@@ -43,6 +43,7 @@ public class NeutronNorthboundRSApplication extends Application {
         classes.add(NeutronLoadBalancerPoolNorthbound.class);
         classes.add(NeutronLoadBalancerHealthMonitorNorthbound.class);
         classes.add(NeutronLoadBalancerPoolMembersNorthbound.class);
+      classes.add(MOXyJsonProvider.class);
         return classes;
     }
 
@@ -56,9 +57,10 @@ public class NeutronNorthboundRSApplication extends Application {
         moxyJsonProvider.setMarshalEmptyCollections(true);
         moxyJsonProvider.setValueWrapper("$");
 
-        Map<String, String> namespacePrefixMapper = new HashMap<String, String>(1);
+        Map<String, String> namespacePrefixMapper = new HashMap<String, String>(3);
         namespacePrefixMapper.put("router", "router");        // FIXME: fill in with XSD
         namespacePrefixMapper.put("provider", "provider");    // FIXME: fill in with XSD
+        namespacePrefixMapper.put("binding", "binding");
         moxyJsonProvider.setNamespacePrefixMapper(namespacePrefixMapper);
         moxyJsonProvider.setNamespaceSeparator(':');
 
index a529599..3853988 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.networkconfig.neutron;
 
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -62,6 +63,16 @@ public class NeutronPort implements Serializable, INeutronObject {
     @XmlElement (name="security_groups")
     List<NeutronSecurityGroup> securityGroups;
 
+    @XmlElement (namespace= "binding", name="host_id")
+    String bindinghostID;
+
+    @XmlElement (namespace= "binding", name="vnic_type")
+    String bindingvnicType;
+
+    @XmlElement (namespace= "binding", name="vif_type")
+    String bindingvifType;
+
+
     /* this attribute stores the floating IP address assigned to
      * each fixed IP address
      */
@@ -169,6 +180,30 @@ public class NeutronPort implements Serializable, INeutronObject {
         this.securityGroups = securityGroups;
     }
 
+    public String getBindinghostID() {
+      return bindinghostID;
+    }
+
+    public void setBindinghostID(String bindinghostID) {
+      this.bindinghostID = bindinghostID;
+    }
+
+  public String getBindingvnicType() {
+    return bindingvnicType;
+  }
+
+  public void setBindingvnicType(String bindingvnicType) {
+    this.bindingvnicType = bindingvnicType;
+  }
+
+  public String getBindingvifType() {
+    return bindingvifType;
+  }
+
+  public void setBindingvifType(String bindingvifType) {
+    this.bindingvifType = bindingvifType;
+  }
+
     public NeutronFloatingIP getFloatingIP(String key) {
         if (!floatingIPMap.containsKey(key)) {
             return null;
@@ -271,6 +306,8 @@ public class NeutronPort implements Serializable, INeutronObject {
         return "NeutronPort [portUUID=" + portUUID + ", networkUUID=" + networkUUID + ", name=" + name
                 + ", adminStateUp=" + adminStateUp + ", status=" + status + ", macAddress=" + macAddress
                 + ", fixedIPs=" + fixedIPs + ", deviceID=" + deviceID + ", deviceOwner=" + deviceOwner + ", tenantID="
-                + tenantID + ", floatingIPMap=" + floatingIPMap + ", securityGroups=" + securityGroups + "]";
+                + tenantID + ", floatingIPMap=" + floatingIPMap + ", securityGroups=" + securityGroups
+                + ", bindinghostID=" + bindinghostID + ", bindingvnicType=" + bindingvnicType
+                + ", bindingvnicType=" + bindingvnicType + "]";
     }
 }