Merge "Add signature marker and version to NormalizedNode streaming"
authorMoiz Raja <moraja@cisco.com>
Tue, 10 Feb 2015 01:43:11 +0000 (01:43 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 10 Feb 2015 01:43:11 +0000 (01:43 +0000)
80 files changed:
karaf/karaf-parent/pom.xml
karaf/opendaylight-karaf-empty/pom.xml
karaf/opendaylight-karaf/pom.xml
opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml
opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-impl/src/main/config/default-config.xml
opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/pom.xml
opendaylight/config/config-plugin-parent/pom.xml
opendaylight/config/logback-config-loader/pom.xml
opendaylight/config/logback-config/pom.xml
opendaylight/config/netty-config-api/pom.xml
opendaylight/config/netty-event-executor-config/pom.xml
opendaylight/config/netty-threadgroup-config/pom.xml
opendaylight/config/netty-timer-config/pom.xml
opendaylight/config/pom.xml
opendaylight/config/threadpool-config-api/pom.xml
opendaylight/config/threadpool-config-impl/pom.xml
opendaylight/config/yang-test-plugin/pom.xml
opendaylight/config/yang-test/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java
opendaylight/md-sal/sal-binding-it/pom.xml
opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java
opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/AbstractTest.java
opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/DataServiceTest.java
opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NotificationTest.java
opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/RoutedServiceTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/md-sal/sal-test-model/pom.xml
opendaylight/md-sal/sal-test-model/src/main/yang/opendaylight-test-notification.yang [new file with mode: 0644]
opendaylight/netconf/ietf-netconf-notifications/pom.xml [new file with mode: 0644]
opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang [new file with mode: 0644]
opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang [new file with mode: 0644]
opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang [new file with mode: 0644]
opendaylight/netconf/ietf-netconf/pom.xml [new file with mode: 0644]
opendaylight/netconf/ietf-netconf/src/main/yang/ietf-netconf@2011-06-01.yang [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedLock.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedUnLock.java [new file with mode: 0644]
opendaylight/netconf/pom.xml
pom.xml

index 06d8c8d99b7d9d222002f70207c17b7ea9c7ae1e..baf67302e0d49f68affda9850d15cd6910177970 100644 (file)
@@ -19,9 +19,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
   <artifactId>karaf-parent</artifactId>
   <name>${project.artifactId}</name>
   <packaging>pom</packaging>
-  <prerequisites>
-    <maven>3.1.1</maven>
-  </prerequisites>
+
   <properties>
     <branding.version>1.1.0-SNAPSHOT</branding.version>
     <karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
@@ -320,6 +318,26 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       </plugins>
     </pluginManagement>
     <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>${enforcer.version}</version>
+        <executions>
+          <execution>
+            <id>enforce-maven</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireMavenVersion>
+                  <version>3.1.1</version>
+                </requireMavenVersion>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <artifactId>maven-resources-plugin</artifactId>
       </plugin>
index a13023cbeebdc74a47a7741933202075deef4b1a..aa772096cd958307341bbff1e652224adec0aa6f 100644 (file)
@@ -9,9 +9,6 @@
   </parent>
   <artifactId>opendaylight-karaf-empty</artifactId>
   <packaging>pom</packaging>
-  <prerequisites>
-    <maven>3.0</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 9bf2309b6f034e9ea3ac37a7b06fe8d2a14b9f28..3b29fa276ec4c9b353603781946adcacd921a812 100644 (file)
@@ -9,9 +9,6 @@
   </parent>
   <artifactId>distribution.opendaylight-karaf</artifactId>
   <packaging>pom</packaging>
-  <prerequisites>
-    <maven>3.0</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 67cc60ab0ffa2ca4a6532d2d7056df269d5fa1bd..49b43f442edba2810196bf470a146df77ba83a71 100644 (file)
@@ -13,6 +13,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
     <groupId>org.opendaylight.odlparent</groupId>
     <artifactId>features-parent</artifactId>
     <version>1.5.0-SNAPSHOT</version>
+    <relativePath/>
   </parent>
   <groupId>${groupId}</groupId>
   <artifactId>${artifactId}-features</artifactId>
index db4efb83e38705650574bf6ce482f102b45f429f..e777fd25bcf00e1fc64246a6785f5f823b67f1c3 100644 (file)
@@ -12,6 +12,8 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
 -->
 <snapshot>
   <required-capabilities>
+      <capability>urn:opendaylight:params:xml:ns:yang:${artifactId}:impl?module=${artifactId}-impl&amp;revision=2014-12-10</capability>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding?module=opendaylight-md-sal-binding&amp;revision=2013-10-28</capability>
   </required-capabilities>
   <configuration>
 
index ae209c1bbdd9f9284dc2a97cab1a0dcd92726278..3221efd3628b833a1fd71622d4c1b942041c3e96 100644 (file)
@@ -22,4 +22,23 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
     <module>${artifactId}-features</module>
     <module>${artifactId}-artifacts</module>
   </modules>
+  <!-- DO NOT install or deploy the repo root pom as it's only needed to initiate a build -->
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-install-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 </project>
index 67370e1e2f345b9a6b20fc72625234a202fd7b50..081df0c52de3e3b5b5fd44ad221c69516e703072 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>config-plugin-parent</artifactId>
   <packaging>pom</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <build>
     <pluginManagement>
index 0f379fbe21808b351a3f37fd20ea6d578474f9bb..94f7f8fc4932f0a7f7f6ac543b7cf607cf44455f 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>logback-config-loader</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index d918fd7ab7ad383796002509d506f8d3e353820a..d4537387aad1021913459ca14f6139d537f69dbf 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>logback-config</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index a5c0831fb82aa16ca7bdb8175a709b7690f8c110..2d8145723d1b51f5be061b9ce384d84bdf563b01 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>netty-config-api</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 6188aed898be9ade5430e01635b01291c6779072..31940b91fed2a5f59947a7a845950076e0ae6d0b 100644 (file)
@@ -12,9 +12,6 @@
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
   <description>Configuration Wrapper around netty's event executor</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 2f3d26dd2fe3a7379b1fb60422689229a3444c61..0f645015e1c453fc2e0527c962d62645f6ea6651 100644 (file)
@@ -14,9 +14,6 @@
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
   <description>Configuration Wrapper around netty's event group</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 75b4709da26349014ef4f83923bbcb909a2c8fb5..181c1d01513537f7ece1d9a5c868d47abfdfdc29 100644 (file)
@@ -12,9 +12,6 @@
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
   <description>Configuration Wrapper around netty's timer</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 4c4c5b3378f1bfb87f64544248e052ca43053cfa..6d14ad3957c6e03a51bd728b054ba6388d245075 100644 (file)
@@ -13,9 +13,7 @@
   <version>0.3.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
+
   <modules>
     <module>config-api</module>
     <module>config-manager</module>
index 5f0c941a19a7ed44ba89d792a9eb69ea0c1c68e9..9dc7bf59767d901556b0fc1e3a86d3b3282707c4 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>threadpool-config-api</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 2787b30df442d98ea0d2ae91aea4ab6eaada34fe..b875f5f3e4198f5d1dfe3b4ebe06c2cbca7d83ad 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>threadpool-config-impl</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index d03cff305b68bc39cddf9f75f7996fd98d3fc5b8..690f8d24e6746671c4032360885670f3ff2e2467 100644 (file)
@@ -12,9 +12,6 @@
   <name>${project.artifactId}</name>
 
   <description>Remove generated source files, after new files generation, implementation is inserted.</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 5977325574202c3edd39125dc6ca60f4ebd5fd71..f5c966d5a6e9472d8c98211b087a9131171b8d23 100644 (file)
@@ -14,9 +14,6 @@
   <name>${project.artifactId}</name>
 
   <description>Artifact that contains only generated code from yang files. Suitable for testing.</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index c256c822a420e3a22b5a351778d58a88e73a9e8d..766b80e73dd12c890df3ed493e397a7cd144aab4 100644 (file)
@@ -107,14 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private CaptureSnapshot captureSnapshot = null;
 
-    private volatile boolean hasSnapshotCaptureInitiated = false;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
 
-
-
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -179,7 +175,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.debug("{}: SnapshotOffer called..", persistenceId());
         }
 
         initRecoveryTimer();
@@ -209,7 +205,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
         replicatedLog.append(logEntry);
@@ -217,8 +213,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getLastApplied() + 1, ale.getToIndex());
+            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
         }
 
         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
@@ -289,8 +285,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyState applyState = (ApplyState) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Applying state for log index {} data {}",
-                    applyState.getReplicatedLogEntry().getIndex(),
+                LOG.debug("{}: Applying state for log index {} data {}",
+                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
                     applyState.getReplicatedLogEntry().getData());
             }
 
@@ -300,7 +296,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof ApplyLogEntries){
             ApplyLogEntries ale = (ApplyLogEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
             }
             persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
@@ -312,8 +308,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
                     snapshot.getLastAppliedTerm()
                 );
             }
@@ -333,7 +329,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("SaveSnapshotSuccess received for snapshot");
+            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
 
             long sequenceNumber = success.metadata().sequenceNr();
 
@@ -342,19 +338,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
 
-            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
-            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+            LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+                    persistenceId());
 
             context.getReplicatedLog().snapshotRollback();
 
-            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
                 context.getReplicatedLog().getSnapshotIndex(),
                 context.getReplicatedLog().getSnapshotTerm(),
                 context.getReplicatedLog().size());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.info("CaptureSnapshot received by actor");
+            LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
 
             if(captureSnapshot == null) {
                 captureSnapshot = (CaptureSnapshot)message;
@@ -368,7 +364,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
+                    LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
                 }
             }
 
@@ -414,7 +410,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             context.getTermInformation().getCurrentTerm(), data);
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Persist data {}", replicatedLogEntry);
+            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
         }
 
         final RaftActorContext raftContext = getRaftActorContext();
@@ -436,12 +432,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!hasSnapshotCaptureInitiated){
+                            if(!context.isSnapshotCaptureInitiated()){
                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
                                         raftContext.getTermInformation().getCurrentTerm());
                                 raftContext.getReplicatedLog().snapshotCommit();
                             } else {
-                                LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+                                        persistenceId(), getId());
                             }
                         } else if (clientActor != null) {
                             // Send message for replication
@@ -652,15 +649,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
-                    leaderId, peerAddress);
+            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+                    persistenceId(), leaderId, peerAddress);
         }
 
         return peerAddress;
     }
 
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+        LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
         // create a snapshot object from the state provided and save it
         // when snapshot is saved async, SaveSnapshotSuccess is raised.
@@ -672,7 +669,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         persistence().saveSnapshot(sn);
 
-        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
         //be greedy and remove entries from in-mem journal which are in the snapshot
         // and update snapshotIndex and snapshotTerm without waiting for the success,
@@ -681,8 +678,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
-        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
-            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
@@ -692,7 +689,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         captureSnapshot = null;
-        hasSnapshotCaptureInitiated = false;
+        context.setSnapshotCaptureInitiated(false);
     }
 
     protected boolean hasFollowers(){
@@ -751,7 +748,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             final Procedure<ReplicatedLogEntry> callback)  {
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
             }
 
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
@@ -793,13 +790,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
                         // when a snaphsot is being taken, captureSnapshot != null
-                        if (hasSnapshotCaptureInitiated == false &&
+                        if (!context.isSnapshotCaptureInitiated() &&
                                 ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
                                         dataSizeForCheck > dataThreshold)) {
 
                             dataSizeSinceLastSnapshot = 0;
 
-                            LOG.info("Initiating Snapshot Capture..");
+                            LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
@@ -813,18 +810,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             if(LOG.isDebugEnabled()) {
-                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
-                                LOG.debug("Snapshot Capture lastApplied:{} ",
-                                    context.getLastApplied());
-                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
-                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+                                        persistenceId(), context.getLastApplied());
+                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+                                        lastAppliedIndex);
+                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+                                        lastAppliedTerm);
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
                             getSelf().tell(new CaptureSnapshot(
                                 lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
                                 null);
-                            hasSnapshotCaptureInitiated = true;
+                            context.setSnapshotCaptureInitiated(true);
                         }
                         if(callback != null){
                             callback.apply(replicatedLogEntry);
@@ -869,7 +868,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         @Override public void update(long currentTerm, String votedFor) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
             }
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;
index 0eb4b7377976526ac48eaefa9a9922b6a70c0076..0e1f20b24681ed6a0cd0644b513251114f225745 100644 (file)
@@ -89,7 +89,7 @@ public interface RaftActorContext {
      *
      * @param replicatedLog
      */
-    public void setReplicatedLog(ReplicatedLog replicatedLog);
+    void setReplicatedLog(ReplicatedLog replicatedLog);
 
     /**
      * @return A representation of the log
@@ -137,7 +137,7 @@ public interface RaftActorContext {
      *
      * @param name
      */
-    public void removePeer(String name);
+    void removePeer(String name);
 
     /**
      * Given a peerId return the corresponding actor
@@ -165,5 +165,10 @@ public interface RaftActorContext {
     /**
      * @return ConfigParams
      */
-    public ConfigParams getConfigParams();
+    ConfigParams getConfigParams();
+
+    void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
+
+    boolean isSnapshotCaptureInitiated();
+
 }
index e4aef0a8445d2400e54fff5c1d09c9044747134f..5438fe7c4840ed4c91f17c98ebc1022ace720d7c 100644 (file)
@@ -14,7 +14,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
 import akka.event.LoggingAdapter;
-
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -41,6 +40,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final ConfigParams configParams;
 
+    private boolean snapshotCaptureInitiated;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
         ElectionTerm termInformation, long commitIndex,
@@ -130,6 +131,16 @@ public class RaftActorContextImpl implements RaftActorContext {
         return configParams;
     }
 
+    @Override
+    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+    }
+
+    @Override
+    public boolean isSnapshotCaptureInitiated() {
+        return snapshotCaptureInitiated;
+    }
+
     @Override public void addToPeers(String name, String address) {
         peerAddresses.put(name, address);
     }
index 77bf10370184894d6a510990e08fb1a8bc785a84..feccea7edba37b5faff9782cf986146f0a784af9 100644 (file)
@@ -12,7 +12,8 @@ import java.util.List;
 
 
 public class Snapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -8298574936724056236L;
+
     private final byte[] state;
     private final List<ReplicatedLogEntry> unAppliedEntries;
     private final long lastIndex;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java
deleted file mode 100644 (file)
index 6335e32..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to commit an entry to the log
- */
-public class CommitEntry implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java
deleted file mode 100644 (file)
index 68ecc12..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to Persist an entry into the transaction journal
- */
-public class PersistEntry implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java
deleted file mode 100644 (file)
index 7b7f085..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * This message is sent by a RaftActor to itself so that a subclass can process
- * it and use it to save it's state
- */
-public class SaveSnapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
index 462c94ec8a40736cc005c994b74520d9111c3430..410dcee5e5e811e9b9cb485b30420a4d21311a12 100644 (file)
@@ -93,6 +93,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
+    private long replicatedToAllIndex = -1;
+
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
@@ -109,7 +111,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         leaderId = context.getId();
 
-        LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
+        LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
 
         minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
@@ -153,7 +155,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug(appendEntries.toString());
+            LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
         }
 
         return this;
@@ -165,7 +167,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if(! appendEntriesReply.isSuccess()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntriesReply.toString());
+                LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
             }
         }
 
@@ -175,7 +177,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             followerToLog.get(followerId);
 
         if(followerLogInformation == null){
-            LOG.error("Unknown follower {}", followerId);
+            LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
             return this;
         }
 
@@ -226,9 +228,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
+        if (!context.isSnapshotCaptureInitiated()) {
+            purgeInMemoryLog();
+        }
+
         return this;
     }
 
+    private void purgeInMemoryLog() {
+        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        // we would delete the in-mem log from that index on, in-order to minimize mem usage
+        // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        for (FollowerLogInformation info : followerToLog.values()) {
+            minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+        }
+
+        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+    }
+
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
         final Iterator<ClientRequestTracker> it = trackerList.iterator();
@@ -276,6 +294,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+                        rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
                 return switchBehavior(new Follower(context));
@@ -312,19 +333,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot == null) {
+            LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+                    context.getId(), followerId);
+            return;
+        }
+
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
         followerLogInformation.markFollowerActive();
 
-        if (followerToSnapshot != null &&
-            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+        if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
             if (reply.isSuccess()) {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("InstallSnapshotReply received, " +
+                        LOG.debug("{}: InstallSnapshotReply received, " +
                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
-                            reply.getChunkIndex(), followerId,
+                                context.getId(), reply.getChunkIndex(), followerId,
                             context.getReplicatedLog().getSnapshotIndex() + 1
                         );
                     }
@@ -336,8 +362,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
-                            followerToLog.get(followerId).getNextIndex());
+                        LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
+                                context.getId(), followerToLog.get(followerId).getNextIndex());
                     }
 
                     if (mapFollowerToSnapshot.isEmpty()) {
@@ -350,20 +376,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     followerToSnapshot.markSendStatus(true);
                 }
             } else {
-                LOG.info("InstallSnapshotReply received, " +
-                        "sending snapshot chunk failed, Will retry, Chunk:{}",
-                    reply.getChunkIndex()
-                );
+                LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+                        context.getId(), reply.getChunkIndex());
 
                 followerToSnapshot.markSendStatus(false);
             }
-
         } else {
-            LOG.error("ERROR!!" +
-                    "FollowerId in InstallSnapshotReply not known to Leader" +
-                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
-            );
+            LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+                    context.getId(), reply.getChunkIndex(), followerId,
+                    followerToSnapshot.getChunkIndex());
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
@@ -377,7 +398,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message {}", logIndex);
+            LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
         }
 
         // Create a tracker entry we will use this later to notify the
@@ -398,6 +419,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -407,14 +429,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
 
-                if (mapFollowerToSnapshot.get(followerId) != null) {
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+                if (followerToSnapshot != null) {
                     // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
                         sendSnapshotChunk(followerActor, followerId);
                     } else {
                         // we send a heartbeat even if we have not received a reply for the last chunk
                         sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList());
+                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
                     }
 
                 } else {
@@ -422,8 +445,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
                     final List<ReplicatedLogEntry> entries;
 
-                    if (isFollowerActive &&
-                        context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+                            context.getId(), leaderLastIndex, leaderSnapShotIndex);
+
+                    if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
+                                followerNextIndex, followerId);
+
                         // FIXME : Sending one entry at a time
                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 
@@ -434,11 +462,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         // then snapshot should be sent
 
                         if(LOG.isDebugEnabled()) {
-                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
-                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                    "leader-last-index:{}", followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                            );
+                            LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+                                    "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
+                                    "leader-last-index: %s", context.getId(), followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
                         }
                         actor().tell(new InitiateInstallSnapshot(), actor());
 
@@ -451,22 +478,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         entries =  Collections.<ReplicatedLogEntry>emptyList();
                     }
 
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
                 }
             }
         }
     }
 
     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries) {
-        followerActor.tell(
-            new AppendEntries(currentTerm(), context.getId(),
-                prevLogIndex(followerNextIndex),
-                prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex()).toSerializable(),
-            actor()
-        );
+        List<ReplicatedLogEntry> entries, String followerId) {
+        AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+            prevLogIndex(followerNextIndex),
+            prevLogTerm(followerNextIndex), entries,
+            context.getCommitIndex(), replicatedToAllIndex);
+
+        if(!entries.isEmpty()) {
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+                    appendEntries);
+        }
+
+        followerActor.tell(appendEntries.toSerializable(), actor());
     }
 
     /**
@@ -486,6 +516,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+        }
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
@@ -493,8 +527,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", e.getKey());
+                        context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
@@ -516,7 +550,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     // on every install snapshot, we try to capture the snapshot.
     // Once a capture is going on, another one issued will get ignored by RaftActor.
     private void initiateCaptureSnapshot() {
-        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+        LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
         ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
         long lastAppliedIndex = -1;
         long lastAppliedTerm = -1;
@@ -558,23 +592,30 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
             if (snapshot.isPresent()) {
+                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+                // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+                // followerId to the followerToSnapshot map.
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
                 followerActor.tell(
                     new InstallSnapshot(currentTerm(), context.getId(),
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
-                        getNextSnapshotChunk(followerId,snapshot.get()),
-                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
-                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+                        nextSnapshotChunk,
+                        followerToSnapshot.incrementChunkIndex(),
+                        followerToSnapshot.getTotalChunks(),
+                        Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
-                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
+                LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                        context.getId(), followerActor.path(),
+                        followerToSnapshot.getChunkIndex(),
+                        followerToSnapshot.getTotalChunks());
             }
         } catch (IOException e) {
-            LOG.error(e, "InstallSnapshot failed for Leader.");
+            LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
         }
     }
 
@@ -590,7 +631,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
         ByteString nextChunk = followerToSnapshot.getNextChunk();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+            LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
         }
         return nextChunk;
     }
@@ -654,14 +695,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * snapshot chunks
      */
     protected class FollowerToSnapshot {
-        private ByteString snapshotBytes;
+        private final ByteString snapshotBytes;
         private int offset = 0;
         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
         private int replyReceivedForOffset;
         // if replyStatus is false, the previous chunk is attempted
         private boolean replyStatus = false;
         private int chunkIndex;
-        private int totalChunks;
+        private final int totalChunks;
         private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
         private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
@@ -671,8 +712,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
-                    size, totalChunks);
+                LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+                        context.getId(), size, totalChunks);
             }
             replyReceivedForOffset = -1;
             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
@@ -741,7 +782,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("length={}, offset={},size={}",
+                LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
                     snapshotLength, start, size);
             }
             ByteString substring = getSnapshotBytes().substring(start, start + size);
index 04462be0420eaa3f0504be0523b7c9371128273d..99824b0bb4e6235a56d12cb719384fee85fd309a 100644 (file)
@@ -94,8 +94,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         // 1. Reply false if term < currentTerm (§5.1)
         if (appendEntries.getTerm() < currentTerm()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Cannot append entries because sender term {} is less than {}",
-                        appendEntries.getTerm(), currentTerm());
+                LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
+                        context.getId(), appendEntries.getTerm(), currentTerm());
             }
 
             sender.tell(
@@ -136,7 +136,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         RequestVote requestVote) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug(requestVote.toString());
+            LOG.debug("{}: Received {}", context.getId(), requestVote);
         }
 
         boolean grantVote = false;
@@ -350,12 +350,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
                 LOG.warning(
-                        "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
+                        "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+                        context.getId(), i, i, index);
                 break;
             }
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Setting last applied to {}", newLastApplied);
+            LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
         }
         context.setLastApplied(newLastApplied);
 
@@ -393,7 +394,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         try {
             close();
         } catch (Exception e) {
-            LOG.error(e, "Failed to close behavior : {}", this.state());
+            LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
         }
 
         return behavior;
@@ -421,4 +422,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return numMajority;
 
     }
+
+    protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+
+        //  we would want to keep the lastApplied as its used while capturing snapshots
+        long tempMin = Math.min(minReplicatedToAllIndex,
+                (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+
+        if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+            context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+            context.getReplicatedLog().snapshotCommit();
+            return tempMin;
+        }
+        return currentReplicatedIndex;
+    }
 }
index 702417273ff586fc635ebcb65e15fa4c8bd64885..09ffe056c3e94fcd4592f8f44f4e29123967f918 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import java.util.Set;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -19,8 +20,6 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.Set;
-
 /**
  * The behavior of a RaftActor when it is in the CandidateState
  * <p/>
@@ -53,7 +52,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         peers = context.getPeerAddresses().keySet();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Candidate has following peers: {}", peers);
+            LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers);
         }
 
         votesRequired = getMajorityVoteCount(peers.size());
@@ -66,7 +65,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug(appendEntries.toString());
+            LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
         }
 
         return this;
@@ -106,7 +105,8 @@ public class Candidate extends AbstractRaftActorBehavior {
             RaftRPC rpc = (RaftRPC) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+                LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc,
+                        context.getTermInformation().getCurrentTerm());
             }
 
             // If RPC request or response contains term T > currentTerm:
@@ -150,7 +150,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             context.getId());
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Starting new term {}", (currentTerm + 1));
+            LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1));
         }
 
         // Request for a vote
index cc2e55d51b115cc124890c3e643ed810244a7c24..410b3c266c87066cef3828732f62ca154a580df3 100644 (file)
@@ -77,7 +77,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntries.toString());
+                LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
             }
         }
 
@@ -109,8 +109,8 @@ public class Follower extends AbstractRaftActorBehavior {
             // it's log.
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
-                        appendEntries.getPrevLogIndex());
+                LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+                        context.getId(), appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
@@ -121,8 +121,8 @@ public class Follower extends AbstractRaftActorBehavior {
             // prevLogIndex entry was not found in it's log
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
-                        appendEntries.getPrevLogIndex());
+                LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+                        context.getId(), appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
@@ -135,8 +135,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                        "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
-                        , prevLogTerm
+                        "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                        , context.getId(), prevLogTerm
                         , appendEntries.getPrevLogTerm());
             }
         } else {
@@ -147,9 +147,9 @@ public class Follower extends AbstractRaftActorBehavior {
             // We found that the log was out of sync so just send a negative
             // reply and return
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Follower ({}) is out-of-sync, " +
+                LOG.debug("{}: Follower ({}) is out-of-sync, " +
                         "so sending negative reply, lastIndex():{}, lastTerm():{}",
-                        context.getId(), lastIndex(), lastTerm()
+                        context.getId(), context.getId(), lastIndex(), lastTerm()
                 );
             }
             sender.tell(
@@ -162,9 +162,8 @@ public class Follower extends AbstractRaftActorBehavior {
         if (appendEntries.getEntries() != null
             && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "Number of entries to be appended = {}", appendEntries.getEntries().size()
-                );
+                LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+                        appendEntries.getEntries().size());
             }
 
             // 3. If an existing entry conflicts with a new one (same index
@@ -189,9 +188,8 @@ public class Follower extends AbstractRaftActorBehavior {
                     }
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug(
-                            "Removing entries from log starting at {}", matchEntry.getIndex()
-                        );
+                        LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+                                matchEntry.getIndex());
                     }
 
                     // Entries do not match so remove all subsequent entries
@@ -202,8 +200,8 @@ public class Follower extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
-                );
+                LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+                        (addEntriesFrom + lastIndex()));
             }
 
             // 4. Append any new entries not already in the log
@@ -211,13 +209,14 @@ public class Follower extends AbstractRaftActorBehavior {
                  i < appendEntries.getEntries().size(); i++) {
 
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+                    LOG.debug("{}: Append entry to log {}", context.getId(),
+                            appendEntries.getEntries().get(i).getData());
                 }
                 context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+                LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
             }
         }
 
@@ -232,7 +231,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (prevCommitIndex != context.getCommitIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Commit index set to {}", context.getCommitIndex());
+                LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
             }
         }
 
@@ -242,9 +241,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
             context.getLastApplied() < lastIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("applyLogToStateMachine, " +
+                LOG.debug("{}: applyLogToStateMachine, " +
                         "appendEntries.getLeaderCommit():{}," +
-                        "context.getLastApplied():{}, lastIndex():{}",
+                        "context.getLastApplied():{}, lastIndex():{}", context.getId(),
                     appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
                 );
             }
@@ -255,6 +254,10 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
+        if (!context.isSnapshotCaptureInitiated()) {
+            fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+        }
+
         return this;
     }
 
@@ -302,8 +305,8 @@ public class Follower extends AbstractRaftActorBehavior {
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("InstallSnapshot received by follower " +
-                    "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            LOG.debug("{}: InstallSnapshot received by follower " +
+                    "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
                 installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
             );
         }
@@ -339,8 +342,7 @@ public class Follower extends AbstractRaftActorBehavior {
             snapshotTracker = null;
 
         } catch (Exception e){
-
-            LOG.error(e, "Exception in InstallSnapshot of follower:");
+            LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     installSnapshot.getChunkIndex(), false), actor());
index ee3cc65dddb90815aecd50771c834d3ab461ecd4..fcfaee36033f3eba278f1f351d3c8cb3e974feb1 100644 (file)
@@ -57,8 +57,8 @@ public class Leader extends AbstractLeader {
 
         if (originalMessage instanceof IsolatedLeaderCheck) {
             if (isLeaderIsolated()) {
-                LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
-                    minIsolatedLeaderPeerCount, leaderId);
+                LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                        context.getId(), minIsolatedLeaderPeerCount, leaderId);
                 return switchBehavior(new IsolatedLeader(context));
             }
         }
index 81981062177510641c08edb243016075f388ae0f..97bcd6a708b7c8f11646b026e7e1653a2713ff40 100644 (file)
@@ -50,14 +50,18 @@ public class AppendEntries extends AbstractRaftRPC {
     // leader's commitIndex
     private final long leaderCommit;
 
+    // index which has been replicated successfully to all followers, -1 if none
+    private final long replicatedToAllIndex;
+
     public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
         this.entries = entries;
         this.leaderCommit = leaderCommit;
+        this.replicatedToAllIndex = replicatedToAllIndex;
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -102,6 +106,10 @@ public class AppendEntries extends AbstractRaftRPC {
         return leaderCommit;
     }
 
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb =
@@ -112,6 +120,7 @@ public class AppendEntries extends AbstractRaftRPC {
         sb.append(", prevLogTerm=").append(prevLogTerm);
         sb.append(", entries=").append(entries);
         sb.append(", leaderCommit=").append(leaderCommit);
+        sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
         sb.append('}');
         return sb.toString();
     }
@@ -203,7 +212,7 @@ public class AppendEntries extends AbstractRaftRPC {
             from.getPrevLogIndex(),
             from.getPrevLogTerm(),
             logEntryList,
-            from.getLeaderCommit());
+            from.getLeaderCommit(), -1);
 
         return to;
     }
index a782eda5659d9389606ea4e7606a5f61dd949c07..32ed85b281fee0feb4b0533c0b72cb5ade40093a 100644 (file)
@@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages;
  * Reply for the AppendEntriesRpc message
  */
 public class AppendEntriesReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -7487547356392536683L;
 
     // true if follower contained entry matching
     // prevLogIndex and prevLogTerm
@@ -38,6 +38,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.logLastTerm = logLastTerm;
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index 71e7ecc189fc882eaa0917b19af99376ed05f1b7..15621bf894b14f75ec845821654202877f4b4382 100644 (file)
@@ -9,13 +9,13 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 public class InstallSnapshotReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 642227896390779503L;
 
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
     private final int chunkIndex;
-    private boolean success;
+    private final boolean success;
 
     public InstallSnapshotReply(long term, String followerId, int chunkIndex,
         boolean success) {
index 8321d0c25bcecce32617c55fe652c2b7be189541..9ba5acb664700f4873d51fdb9f2637ef882c360e 100644 (file)
@@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages;
  * Invoked by candidates to gather votes (§5.2).
  */
 public class RequestVote extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -6967509186297108657L;
 
     // candidate requesting vote
     private String candidateId;
@@ -35,6 +35,7 @@ public class RequestVote extends AbstractRaftRPC {
     public RequestVote() {
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index da3ba5c39f6989b86c1f76b7830d2c0426302ddb..b3c95d6ecaab2b19524fb4ae06c3c8f6491658c0 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 public class RequestVoteReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 8427899326488775660L;
 
     // true means candidate received vot
     private final boolean voteGranted;
@@ -19,6 +19,7 @@ public class RequestVoteReply extends AbstractRaftRPC {
         this.voteGranted = voteGranted;
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index d53ccf25002dbbf407d2e4dc5dc35c0ec231c590..ffd8edfbe15fa3aad7a9f237a053b23017946337 100644 (file)
@@ -128,6 +128,33 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
+    @Test
+    public void testSnapshotPreCommit() {
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+        replicatedLogImpl.snapshotPreCommit(4, 3);
+        assertEquals(3, replicatedLogImpl.size());
+        assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+
+        replicatedLogImpl.snapshotPreCommit(6, 3);
+        assertEquals(1, replicatedLogImpl.size());
+        assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+
+        replicatedLogImpl.snapshotPreCommit(7, 3);
+        assertEquals(0, replicatedLogImpl.size());
+        assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+        //running it again on an empty list should not throw exception
+        replicatedLogImpl.snapshotPreCommit(7, 3);
+        assertEquals(0, replicatedLogImpl.size());
+        assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+
+    }
+
     // create a snapshot for test
     public Map<Long, String> takeSnapshot(final int numEntries) {
         Map<Long, String> map = new HashMap<>(numEntries);
index cd852eaae2d247f8b484f435b54e73fd0b97f1c9..9d3e5dcb12da55ee474f27e055487ff690321def 100644 (file)
@@ -34,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap<>();
     private ConfigParams configParams;
+    private boolean snapshotCaptureInitiated;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -185,6 +186,16 @@ public class MockRaftActorContext implements RaftActorContext {
         return configParams;
     }
 
+    @Override
+    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+    }
+
+    @Override
+    public boolean isSnapshotCaptureInitiated() {
+        return snapshotCaptureInitiated;
+    }
+
     public void setConfigParams(ConfigParams configParams) {
         this.configParams = configParams;
     }
index 6b266d710e4aa44f793c4ed2bc809347944f1c15..30893810f5a9bee6542fff36f81694380844a06c 100644 (file)
@@ -1,17 +1,5 @@
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
@@ -41,6 +29,7 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -61,6 +50,8 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
@@ -70,6 +61,20 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class RaftActorTest extends AbstractActorTest {
 
 
@@ -86,6 +91,7 @@ public class RaftActorTest extends AbstractActorTest {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
+        private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
 
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private static final long serialVersionUID = 1L;
@@ -114,7 +120,8 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                             DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
             this.delegate = mock(RaftActor.class);
@@ -133,6 +140,14 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
+        public void waitForInitializeBehaviorComplete() {
+            try {
+                assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
         public List<Object> getState() {
             return state;
         }
@@ -176,6 +191,12 @@ public class RaftActorTest extends AbstractActorTest {
             recoveryComplete.countDown();
         }
 
+        @Override
+        protected void initializeBehavior() {
+            super.initializeBehavior();
+            initializeBehaviorComplete.countDown();
+        }
+
         @Override
         protected void applyRecoverySnapshot(byte[] bytes) {
             delegate.applyRecoverySnapshot(bytes);
@@ -339,10 +360,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 // 4 messages as part of snapshot, which are applied to state
             ByteString snapshotBytes  = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
+                    new MockRaftActorContext.MockPayload("A"),
+                    new MockRaftActorContext.MockPayload("B"),
+                    new MockRaftActorContext.MockPayload("C"),
+                    new MockRaftActorContext.MockPayload("D")));
 
             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
@@ -909,6 +930,195 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "leader1";
+
+                ActorRef followerActor1 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+                config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1", followerActor1.path().toString());
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, peerAddresses,
+                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor leaderActor = mockActorRef.underlyingActor();
+                leaderActor.getRaftActorContext().setCommitIndex(4);
+                leaderActor.getRaftActorContext().setLastApplied(4);
+                leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+                leaderActor.waitForInitializeBehaviorComplete();
+
+                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
+                Leader leader = new Leader(leaderActor.getRaftActorContext());
+                leaderActor.setCurrentBehavior(leader);
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+                leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
+                leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+                verify(leaderActor.delegate).createSnapshot();
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+                //fake snapshot on index 5
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                //fake snapshot on index 6
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("foo-0"),
+                        new MockRaftActorContext.MockPayload("foo-1"),
+                        new MockRaftActorContext.MockPayload("foo-2"),
+                        new MockRaftActorContext.MockPayload("foo-3"),
+                        new MockRaftActorContext.MockPayload("foo-4")));
+                leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                // capture snapshot reply should remove the snapshotted entries only
+                assertEquals(3, leaderActor.getReplicatedLog().size());
+                assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
+
+                // add another non-replicated entry
+                leaderActor.getReplicatedLog().append(
+                        new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+
+                //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
+                assertEquals(2, leaderActor.getReplicatedLog().size());
+                assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "follower1";
+
+                ActorRef leaderActor1 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+                config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("leader", leaderActor1.path().toString());
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, peerAddresses,
+                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor followerActor = mockActorRef.underlyingActor();
+                followerActor.getRaftActorContext().setCommitIndex(4);
+                followerActor.getRaftActorContext().setLastApplied(4);
+                followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+                followerActor.waitForInitializeBehaviorComplete();
+
+                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+                Follower follower = new Follower(followerActor.getRaftActorContext());
+                followerActor.setCurrentBehavior(follower);
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+                MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+                followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
+
+                // log as indices 0-5
+                assertEquals(6, followerActor.getReplicatedLog().size());
+
+                //snapshot on 4
+                followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
+                followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+                verify(followerActor.delegate).createSnapshot();
+
+                assertEquals(6, followerActor.getReplicatedLog().size());
+
+                //fake snapshot on index 6
+                List<ReplicatedLogEntry> entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                                        new MockRaftActorContext.MockPayload("foo-6"))
+                        );
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
+                assertEquals(7, followerActor.getReplicatedLog().size());
+
+                //fake snapshot on index 7
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+                entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+                                        new MockRaftActorContext.MockPayload("foo-7"))
+                        );
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
+                assertEquals(8, followerActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("foo-0"),
+                        new MockRaftActorContext.MockPayload("foo-1"),
+                        new MockRaftActorContext.MockPayload("foo-2"),
+                        new MockRaftActorContext.MockPayload("foo-3"),
+                        new MockRaftActorContext.MockPayload("foo-4")));
+                followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+                assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                // capture snapshot reply should remove the snapshotted entries only
+                assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
+                assertEquals(7, followerActor.getReplicatedLog().lastIndex());
+
+                entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
+                                        new MockRaftActorContext.MockPayload("foo-7"))
+                        );
+                // send an additional entry 8 with leaderCommit = 7
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
+
+                // 7 and 8, as lastapplied is 7
+                assertEquals(2, followerActor.getReplicatedLog().size());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
index 38930180082fdcb80958c1b9adbe04f5062769bf..42a7911be31f3411c3467435bec446dfef15ecb7 100644 (file)
@@ -74,7 +74,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
             context.getTermInformation().update(1000, "test");
 
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -131,7 +131,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
 
                 AppendEntries appendEntries =
-                    new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+                    new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
 
                 RaftActorBehavior behavior = createBehavior(context);
 
@@ -301,6 +301,39 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testFakeSnapshots() {
+        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+        AbstractRaftActorBehavior behavior = new Leader(context);
+        context.getTermInformation().update(1, "leader");
+
+        //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        context.setLastApplied(0);
+        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        assertEquals(1, context.getReplicatedLog().size());
+
+        //2 entries, lastApplied still 0, no purging.
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setLastApplied(0);
+        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        assertEquals(2, context.getReplicatedLog().size());
+
+        //2 entries, lastApplied still 0, no purging.
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setLastApplied(1);
+        assertEquals(0, behavior.fakeSnapshot(0, -1));
+        assertEquals(1, context.getReplicatedLog().size());
+
+        //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+        context.setLastApplied(2);
+        assertEquals(1, behavior.fakeSnapshot(3, 1));
+        assertEquals(3, context.getReplicatedLog().size());
+
+
+    }
+
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
         ActorRef actorRef, RaftRPC rpc) {
 
@@ -347,7 +380,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     protected AppendEntries createAppendEntriesWithNewerTerm() {
-        return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+        return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
     }
 
     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
index 485ee4b316d2506b35d31cfed5f6aa252f16bff3..0dc68c2461c2235b22e663b39ad51220e96c80b5 100644 (file)
@@ -3,6 +3,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -16,9 +19,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+
 import static org.junit.Assert.assertEquals;
 
 public class CandidateTest extends AbstractRaftActorBehaviorTest {
@@ -167,7 +168,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
-                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0));
+                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
                         // do not put code outside this method, will run afterwards
index a04d6aeb556cd2f84ffb10ac23302c9e5928451b..719a8256a0757f406777a8d03b52fab879009c74 100644 (file)
@@ -181,7 +181,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
 
             RaftActorBehavior raftBehavior =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
@@ -217,7 +217,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // AppendEntries is now sent with a bigger term
             // this will set the receivers term to be the same as the sender's term
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -293,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // This will not work for a Candidate because as soon as a Candidate
             // is created it increments the term
             AppendEntries appendEntries =
-                new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+                new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -373,7 +373,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // This will not work for a Candidate because as soon as a Candidate
             // is created it increments the term
             AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+                new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -446,7 +446,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
 
             AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -502,7 +502,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
 
             AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
 
             RaftActorBehavior behavior = createBehavior(context);
 
index abde51bde592951b404835b7d3529b5c8150257a..5f5d73dbe6b126028fdce788d29f3a8cf7cc75c3 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft.messages;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -21,6 +19,9 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
 /**
  * Unit tests for AppendEntries.
  *
@@ -34,7 +35,7 @@ public class AppendEntriesTest {
 
         ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
 
-        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L);
+        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
@@ -44,7 +45,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromSerializable() {
         AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
-                Collections.<ReplicatedLogEntry>emptyList(), 10L);
+                Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
 
         assertSame("toSerializable", entries, entries.toSerializable());
         assertSame("fromSerializable", entries,
@@ -54,7 +55,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromLegacySerializable() {
         ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
-        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L);
+        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
 
         Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
         Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
@@ -71,6 +72,7 @@ public class AppendEntriesTest {
         assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
         assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
         assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
+        assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
 
         assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
         Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
index 3b504f45b10bf62cebc1248c9b4b65a9faac2427..491e5dcb614178aca4dad5a3fd8219cf24c0caf7 100644 (file)
       <artifactId>log4j-over-slf4j</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.opendaylight.controller.model</groupId>
-      <artifactId>model-flow-service</artifactId>
-      <scope>provided</scope>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-test-model</artifactId>
+      <version>${mdsal.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools.model</groupId>
+      <artifactId>opendaylight-l2-types</artifactId>
     </dependency>
   </dependencies>
 
index 07d205bfcbf28213bcbc55a4c8d0dcef08cd8863..9b6d5836f0bd2ac9cdc5704740f392f14c6cdbf3 100644 (file)
@@ -159,13 +159,23 @@ public class TestHelper {
 
     }
 
+    /**
+     * @return option containing models for testing purposes
+     */
+    public static Option salTestModelBundles() {
+        return new DefaultCompositeOption( //
+                mavenBundle(CONTROLLER, "sal-test-model").versionAsInProject()
+        );
+
+    }
+
     public static Option baseModelBundles() {
         return new DefaultCompositeOption( //
                 mavenBundle(YANGTOOLS_MODELS, "yang-ext").versionAsInProject(), // //
                 mavenBundle(YANGTOOLS_MODELS, "ietf-inet-types").versionAsInProject(), // //
                 mavenBundle(YANGTOOLS_MODELS, "ietf-yang-types").versionAsInProject(), // //
-                mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject(), // //
-                mavenBundle(CONTROLLER_MODELS, "model-inventory").versionAsInProject());
+                mavenBundle(YANGTOOLS_MODELS, "opendaylight-l2-types").versionAsInProject() // //
+                );
     }
 
     public static Option junitAndMockitoBundles() {
index b2f89cf779c0f2363a98366b58075f15c8f1d898..2075ba4421ac57d4a49a599a7b09ea43a82b6887 100644 (file)
@@ -20,7 +20,7 @@ import javax.inject.Inject;
 import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles;
 import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles;
 import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles;
-import static org.opendaylight.controller.test.sal.binding.it.TestHelper.flowCapableModelBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.salTestModelBundles;
 import static org.opendaylight.controller.test.sal.binding.it.TestHelper.junitAndMockitoBundles;
 import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles;
 import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
@@ -82,7 +82,7 @@ public abstract class AbstractTest {
                 configMinumumBundles(),
                 // BASE Models
                 baseModelBundles(),
-                flowCapableModelBundles(),
+                salTestModelBundles(),
 
                 // Set fail if unresolved bundle present
                 systemProperty("pax.exam.osgi.unresolved.fail").value("true"),
index 33039ea2314329e0a132606bff58894e7cb84585..853ff4c3f6885bec03fd7a06ea4155df7a0c2c58 100644 (file)
@@ -11,10 +11,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
-import com.google.inject.Inject;
 import java.util.concurrent.Future;
-import org.junit.Before;
-import org.junit.Ignore;
+
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
@@ -22,36 +20,36 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
+import com.google.inject.Inject;
+
+/**
+ * covers creating, reading and deleting of an item in dataStore
+ */
 public class DataServiceTest extends AbstractTest {
 
     protected DataBrokerService consumerDataService;
 
-
     @Inject
     Broker broker2;
 
-    @Before
-    public void setUp() throws Exception {
-    }
-
-    /*
+    /**
      *
      * Ignored this, because classes here are constructed from
      * very different class loader as MD-SAL is run into,
      * this is code is run from different classloader.
      *
+     * @throws Exception
      */
     @Test
-    @Ignore
     public void test() throws Exception {
         BindingAwareConsumer consumer1 = new BindingAwareConsumer() {
 
@@ -60,7 +58,7 @@ public class DataServiceTest extends AbstractTest {
                 consumerDataService = session.getSALService(DataBrokerService.class);
             }
         };
-        broker.registerConsumer(consumer1, getBundleContext());
+        broker.registerConsumer(consumer1);
 
         assertNotNull(consumerDataService);
 
@@ -68,10 +66,10 @@ public class DataServiceTest extends AbstractTest {
         DataModificationTransaction transaction = consumerDataService.beginTransaction();
         assertNotNull(transaction);
 
-        InstanceIdentifier<Node> node1 = createNodeRef("0");
-        DataObject  node = consumerDataService.readConfigurationData(node1);
+        InstanceIdentifier<UnorderedList> node1 = createNodeRef("0");
+        DataObject node = consumerDataService.readConfigurationData(node1);
         assertNull(node);
-        Node nodeData1 = createNode("0");
+        UnorderedList nodeData1 = createNode("0");
 
         transaction.putConfigurationData(node1, nodeData1);
         Future<RpcResult<TransactionStatus>> commitResult = transaction.commit();
@@ -83,13 +81,13 @@ public class DataServiceTest extends AbstractTest {
         assertNotNull(result.getResult());
         assertEquals(TransactionStatus.COMMITED, result.getResult());
 
-        Node readedData = (Node) consumerDataService.readConfigurationData(node1);
+        UnorderedList readedData = (UnorderedList) consumerDataService.readConfigurationData(node1);
         assertNotNull(readedData);
         assertEquals(nodeData1.getKey(), readedData.getKey());
 
 
         DataModificationTransaction transaction2 = consumerDataService.beginTransaction();
-        assertNotNull(transaction);
+        assertNotNull(transaction2);
 
         transaction2.removeConfigurationData(node1);
 
@@ -104,21 +102,20 @@ public class DataServiceTest extends AbstractTest {
 
         DataObject readedData2 = consumerDataService.readConfigurationData(node1);
         assertNull(readedData2);
-
-
     }
 
 
-    private static InstanceIdentifier<Node> createNodeRef(final String string) {
-        NodeKey key = new NodeKey(new NodeId(string));
-        return  InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
+    private static InstanceIdentifier<UnorderedList> createNodeRef(final String string) {
+        UnorderedListKey key = new UnorderedListKey(string);
+        return  InstanceIdentifier.builder(Lists.class).child(UnorderedContainer.class).child(UnorderedList.class, key).build();
     }
 
-    private static Node createNode(final String string) {
-        NodeBuilder ret = new NodeBuilder();
-        NodeId id = new NodeId(string);
-        ret.setKey(new NodeKey(id));
-        ret.setId(id);
+    private static UnorderedList createNode(final String string) {
+        UnorderedListBuilder ret = new UnorderedListBuilder();
+        UnorderedListKey nodeKey = new UnorderedListKey(string);
+        ret.setKey(nodeKey);
+        ret.setName("name of " + string);
+        ret.setName("value of " + string);
         return ret.build();
     }
 }
index 8f8e475efe1697002adbb4bd5be88bed34aab7b0..e1d5d0060da05900dd25a53ecfbd5709635abffc 100644 (file)
@@ -10,12 +10,9 @@ package org.opendaylight.controller.test.sal.binding.it;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
@@ -23,40 +20,37 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.NotificationService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OpendaylightTestNotificationListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.bi.ba.notification.rev150205.OutOfPixieDustNotificationBuilder;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-@Ignore
+/**
+ * covers registering of notification listener, publishing of notification and receiving of notification.
+ */
 public class NotificationTest extends AbstractTest {
 
-    private final FlowListener listener1 = new FlowListener();
-    private final FlowListener listener2 = new FlowListener();
+    private static final Logger LOG = LoggerFactory
+            .getLogger(NotificationTest.class);
 
-    private ListenerRegistration<NotificationListener> listener1Reg;
-    private ListenerRegistration<NotificationListener> listener2Reg;
+    protected final NotificationTestListener listener1 = new NotificationTestListener();
+    protected final NotificationTestListener listener2 = new NotificationTestListener();
 
-    private NotificationProviderService notifyProviderService;
+    protected ListenerRegistration<NotificationListener> listener1Reg;
+    protected ListenerRegistration<NotificationListener> listener2Reg;
 
-    @Before
-    public void setUp() throws Exception {
-    }
+    protected NotificationProviderService notifyProviderService;
 
+    /**
+     * test of delivering of notification
+     * @throws Exception
+     */
     @Test
     public void notificationTest() throws Exception {
-        /**
-         *
-         * The registration of the Provider 1.
-         *
-         */
+        LOG.info("The registration of the Provider 1.");
         AbstractTestProvider provider1 = new AbstractTestProvider() {
             @Override
             public void onSessionInitiated(ProviderContext session) {
@@ -65,15 +59,11 @@ public class NotificationTest extends AbstractTest {
         };
 
         // registerProvider method calls onSessionInitiated method above
-        broker.registerProvider(provider1, getBundleContext());
+        broker.registerProvider(provider1);
         assertNotNull(notifyProviderService);
 
-        /**
-         *
-         * The registration of the Consumer 1. It retrieves Notification Service
-         * from MD-SAL and registers SalFlowListener as notification listener
-         *
-         */
+        LOG.info("The registration of the Consumer 1. It retrieves Notification Service "
+                + "from MD-SAL and registers OpendaylightTestNotificationListener as notification listener");
         BindingAwareConsumer consumer1 = new BindingAwareConsumer() {
             @Override
             public void onSessionInitialized(ConsumerContext session) {
@@ -83,29 +73,26 @@ public class NotificationTest extends AbstractTest {
             }
         };
         // registerConsumer method calls onSessionInitialized method above
-        broker.registerConsumer(consumer1, getBundleContext());
+        broker.registerConsumer(consumer1);
 
         assertNotNull(listener1Reg);
 
-        /**
-         * The notification of type FlowAdded with cookie ID 0 is created. The
-         * delay 100ms to make sure that the notification was delivered to
-         * listener.
-         */
-        notifyProviderService.publish(flowAdded(0));
+        LOG.info("The notification of type FlowAdded with cookie ID 0 is created. The "
+                + "delay 100ms to make sure that the notification was delivered to "
+                + "listener.");
+        notifyProviderService.publish(noDustNotification("rainy day", 42));
         Thread.sleep(100);
 
         /**
          * Check that one notification was delivered and has correct cookie.
          *
          */
-        assertEquals(1, listener1.addedFlows.size());
-        assertEquals(0, listener1.addedFlows.get(0).getCookie().getValue().intValue());
+        assertEquals(1, listener1.notificationBag.size());
+        assertEquals("rainy day", listener1.notificationBag.get(0).getReason());
+        assertEquals(42, listener1.notificationBag.get(0).getDaysTillNewDust().intValue());
 
-        /**
-         * The registration of the Consumer 2. SalFlowListener is registered
-         * registered as notification listener.
-         */
+        LOG.info("The registration of the Consumer 2. SalFlowListener is registered "
+                + "registered as notification listener.");
         BindingAwareProvider provider = new BindingAwareProvider() {
 
             @Override
@@ -116,14 +103,12 @@ public class NotificationTest extends AbstractTest {
         };
 
         // registerConsumer method calls onSessionInitialized method above
-        broker.registerProvider(provider, getBundleContext());
+        broker.registerProvider(provider);
 
-        /**
-         * 3 notifications are published
-         */
-        notifyProviderService.publish(flowAdded(5));
-        notifyProviderService.publish(flowAdded(10));
-        notifyProviderService.publish(flowAdded(2));
+        LOG.info("3 notifications are published");
+        notifyProviderService.publish(noDustNotification("rainy day", 5));
+        notifyProviderService.publish(noDustNotification("rainy day", 10));
+        notifyProviderService.publish(noDustNotification("tax collector", 2));
 
         /**
          * The delay 100ms to make sure that the notifications were delivered to
@@ -136,8 +121,8 @@ public class NotificationTest extends AbstractTest {
          * received 4 in total, second 3 in total).
          *
          */
-        assertEquals(4, listener1.addedFlows.size());
-        assertEquals(3, listener2.addedFlows.size());
+        assertEquals(4, listener1.notificationBag.size());
+        assertEquals(3, listener2.notificationBag.size());
 
         /**
          * The second listener is closed (unregistered)
@@ -145,11 +130,8 @@ public class NotificationTest extends AbstractTest {
          */
         listener2Reg.close();
 
-        /**
-         *
-         * The notification 5 is published
-         */
-        notifyProviderService.publish(flowAdded(10));
+        LOG.info("The notification 5 is published");
+        notifyProviderService.publish(noDustNotification("entomologist hunt", 10));
 
         /**
          * The delay 100ms to make sure that the notification was delivered to
@@ -163,73 +145,38 @@ public class NotificationTest extends AbstractTest {
          * second consumer because its listener was unregistered.
          *
          */
-        assertEquals(5, listener1.addedFlows.size());
-        assertEquals(3, listener2.addedFlows.size());
+        assertEquals(5, listener1.notificationBag.size());
+        assertEquals(3, listener2.notificationBag.size());
 
     }
 
     /**
-     * Creates instance of the type FlowAdded. Only cookie value is set. It is
+     * Creates instance of the type OutOfPixieDustNotification. It is
      * used only for testing purpose.
      *
-     * @param i
-     *            cookie value
-     * @return instance of the type FlowAdded
+     * @param reason
+     * @param days
+     * @return instance of the type OutOfPixieDustNotification
      */
-    public static FlowAdded flowAdded(int i) {
-        FlowAddedBuilder ret = new FlowAddedBuilder();
-        ret.setCookie(new FlowCookie(BigInteger.valueOf(i)));
+    public static OutOfPixieDustNotification noDustNotification(String reason, int days) {
+        OutOfPixieDustNotificationBuilder ret = new OutOfPixieDustNotificationBuilder();
+        ret.setReason(reason).setDaysTillNewDust(days);
         return ret.build();
     }
 
     /**
      *
      * Implements
-     * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
-     * SalFlowListener} and contains attributes which keep lists of objects of
-     * the type
-     * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819. NodeFlow
-     * NodeFlow}. The lists are defined for flows which were added, removed or
-     * updated.
+     * {@link OpendaylightTestNotificationListener} and contains attributes which keep lists of objects of
+     * the type {@link OutOfFairyDustNotification}.
      */
-    private static class FlowListener implements SalFlowListener {
-
-        List<FlowAdded> addedFlows = new ArrayList<>();
-        List<FlowRemoved> removedFlows = new ArrayList<>();
-        List<FlowUpdated> updatedFlows = new ArrayList<>();
-
-        @Override
-        public void onFlowAdded(FlowAdded notification) {
-            addedFlows.add(notification);
-        }
-
-        @Override
-        public void onFlowRemoved(FlowRemoved notification) {
-            removedFlows.add(notification);
-        };
-
-        @Override
-        public void onFlowUpdated(FlowUpdated notification) {
-            updatedFlows.add(notification);
-        }
-
-        @Override
-        public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
-            // TODO Auto-generated method stub
-
-        }
+    public static class NotificationTestListener implements OpendaylightTestNotificationListener {
 
-        @Override
-        public void onNodeErrorNotification(NodeErrorNotification notification) {
-            // TODO Auto-generated method stub
-
-        }
+        List<OutOfPixieDustNotification> notificationBag = new ArrayList<>();
 
         @Override
-        public void onNodeExperimenterErrorNotification(
-                NodeExperimenterErrorNotification notification) {
-            // TODO Auto-generated method stub
-
+        public void onOutOfPixieDustNotification(OutOfPixieDustNotification arg0) {
+            notificationBag.add(arg0);
         }
 
     }
index d49d6f0e25e271e43c8550feb5eef63d96301184..724403876e840ee559bf1c086dd16db60adbf749 100644 (file)
@@ -14,8 +14,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import java.math.BigInteger;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
@@ -23,32 +21,41 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.OpendaylightTestRoutedRpcService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.RoutedSimpleRouteInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.rpc.routing.rev140701.TestContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.Lists;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.UnorderedContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.store.rev140422.lists.unordered.container.UnorderedListKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * covers routed rpc creation, registration, invocation, unregistration
+ */
 public class RoutedServiceTest extends AbstractTest {
 
-    private SalFlowService salFlowService1;
-    private SalFlowService salFlowService2;
+    private static final Logger LOG = LoggerFactory
+            .getLogger(RoutedServiceTest.class);
 
-    private SalFlowService consumerService;
+    protected OpendaylightTestRoutedRpcService odlRoutedService1;
+    protected OpendaylightTestRoutedRpcService odlRoutedService2;
 
-    private RoutedRpcRegistration<SalFlowService> firstReg;
-    private RoutedRpcRegistration<SalFlowService> secondReg;
+    protected OpendaylightTestRoutedRpcService consumerService;
 
+    protected RoutedRpcRegistration<OpendaylightTestRoutedRpcService> firstReg;
+    protected RoutedRpcRegistration<OpendaylightTestRoutedRpcService> secondReg;
+
+    /**
+     * prepare mocks
+     */
     @Before
-    public void setUp() throws Exception {
-        salFlowService1 = mock(SalFlowService.class, "First Flow Service");
-        salFlowService2 = mock(SalFlowService.class, "Second Flow Service");
+    public void setUp() {
+        odlRoutedService1 = mock(OpendaylightTestRoutedRpcService.class, "First Flow Service");
+        odlRoutedService2 = mock(OpendaylightTestRoutedRpcService.class, "Second Flow Service");
     }
 
     @Test
@@ -57,130 +64,106 @@ public class RoutedServiceTest extends AbstractTest {
         assertNotNull(getBroker());
 
         BindingAwareProvider provider1 = new AbstractTestProvider() {
-
             @Override
             public void onSessionInitiated(ProviderContext session) {
                 assertNotNull(session);
-                firstReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService1);
+                firstReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService1);
             }
         };
 
-        /**
-         * Register provider 1 with first implementation of SalFlowService -
-         * service1
-         *
-         */
-        broker.registerProvider(provider1, getBundleContext());
+        LOG.info("Register provider 1 with first implementation of routeSimpleService - service1");
+        broker.registerProvider(provider1);
         assertNotNull("Registration should not be null", firstReg);
-        assertSame(salFlowService1, firstReg.getInstance());
+        assertSame(odlRoutedService1, firstReg.getInstance());
 
         BindingAwareProvider provider2 = new AbstractTestProvider() {
-
             @Override
             public void onSessionInitiated(ProviderContext session) {
                 assertNotNull(session);
-                secondReg = session.addRoutedRpcImplementation(SalFlowService.class, salFlowService2);
+                secondReg = session.addRoutedRpcImplementation(OpendaylightTestRoutedRpcService.class, odlRoutedService2);
             }
         };
 
-        /**
-         * Register provider 2 with first implementation of SalFlowService -
-         * service2
-         *
-         */
-        broker.registerProvider(provider2, getBundleContext());
+        LOG.info("Register provider 2 with second implementation of routeSimpleService - service2");
+        broker.registerProvider(provider2);
         assertNotNull("Registration should not be null", firstReg);
-        assertSame(salFlowService2, secondReg.getInstance());
+        assertSame(odlRoutedService2, secondReg.getInstance());
         assertNotSame(secondReg, firstReg);
 
         BindingAwareConsumer consumer = new BindingAwareConsumer() {
             @Override
             public void onSessionInitialized(ConsumerContext session) {
-                consumerService = session.getRpcService(SalFlowService.class);
+                consumerService = session.getRpcService(OpendaylightTestRoutedRpcService.class);
             }
         };
-        broker.registerConsumer(consumer, getBundleContext());
+        LOG.info("Register routeService consumer");
+        broker.registerConsumer(consumer);
 
-        assertNotNull("MD-SAL instance of Flow Service should be returned", consumerService);
-        assertNotSame("Provider instance and consumer instance should not be same.", salFlowService1, consumerService);
+        assertNotNull("MD-SAL instance of test Service should be returned", consumerService);
+        assertNotSame("Provider instance and consumer instance should not be same.", odlRoutedService1, consumerService);
 
-        NodeRef nodeOne = createNodeRef("foo:node:1");
+        InstanceIdentifier<UnorderedList> nodeOnePath = createNodeRef("foo:node:1");
 
-        /**
-         * Provider 1 registers path of node 1
-         */
-        firstReg.registerPath(NodeContext.class, nodeOne.getValue());
+        LOG.info("Provider 1 registers path of node 1");
+        firstReg.registerPath(TestContext.class, nodeOnePath);
 
         /**
          * Consumer creates addFlow message for node one and sends it to the
          * MD-SAL
-         *
          */
-        AddFlowInput addFlowFirstMessage = createSampleAddFlow(nodeOne, 1);
-        consumerService.addFlow(addFlowFirstMessage);
+        RoutedSimpleRouteInput simpleRouteFirstFoo = createSimpleRouteInput(nodeOnePath);
+        consumerService.routedSimpleRoute(simpleRouteFirstFoo);
 
         /**
          * Verifies that implementation of the first provider received the same
          * message from MD-SAL.
-         *
          */
-        verify(salFlowService1).addFlow(addFlowFirstMessage);
-
+        verify(odlRoutedService1).routedSimpleRoute(simpleRouteFirstFoo);
         /**
          * Verifies that second instance was not invoked with first message
-         *
          */
-        verify(salFlowService2, times(0)).addFlow(addFlowFirstMessage);
+        verify(odlRoutedService2, times(0)).routedSimpleRoute(simpleRouteFirstFoo);
 
-        /**
-         * Provider 2 registers path of node 2
-         *
-         */
-        NodeRef nodeTwo = createNodeRef("foo:node:2");
-        secondReg.registerPath(NodeContext.class, nodeTwo.getValue());
+        LOG.info("Provider 2 registers path of node 2");
+        InstanceIdentifier<UnorderedList> nodeTwo = createNodeRef("foo:node:2");
+        secondReg.registerPath(TestContext.class, nodeTwo);
 
         /**
          * Consumer sends message to nodeTwo for three times. Should be
          * processed by second instance.
          */
-        AddFlowInput AddFlowSecondMessage = createSampleAddFlow(nodeTwo, 2);
-        consumerService.addFlow(AddFlowSecondMessage);
-        consumerService.addFlow(AddFlowSecondMessage);
-        consumerService.addFlow(AddFlowSecondMessage);
+        RoutedSimpleRouteInput simpleRouteSecondFoo = createSimpleRouteInput(nodeTwo);
+        consumerService.routedSimpleRoute(simpleRouteSecondFoo);
+        consumerService.routedSimpleRoute(simpleRouteSecondFoo);
+        consumerService.routedSimpleRoute(simpleRouteSecondFoo);
 
         /**
          * Verifies that second instance was invoked 3 times with second message
          * and first instance wasn't invoked.
          *
          */
-        verify(salFlowService2, times(3)).addFlow(AddFlowSecondMessage);
-        verify(salFlowService1, times(0)).addFlow(AddFlowSecondMessage);
+        verify(odlRoutedService2, times(3)).routedSimpleRoute(simpleRouteSecondFoo);
+        verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteSecondFoo);
 
-        /**
-         * Unregisteration of the path for the node one in the first provider
-         *
-         */
-        firstReg.unregisterPath(NodeContext.class, nodeOne.getValue());
+        LOG.info("Unregistration of the path for the node one in the first provider");
+        firstReg.unregisterPath(TestContext.class, nodeOnePath);
 
-        /**
-         * Provider 2 registers path of node 1
-         *
-         */
-        secondReg.registerPath(NodeContext.class, nodeOne.getValue());
+        LOG.info("Provider 2 registers path of node 1");
+        secondReg.registerPath(TestContext.class, nodeOnePath);
 
         /**
          * A consumer sends third message to node 1
-         *
          */
-        AddFlowInput AddFlowThirdMessage = createSampleAddFlow(nodeOne, 3);
-        consumerService.addFlow(AddFlowThirdMessage);
+        RoutedSimpleRouteInput simpleRouteThirdFoo = createSimpleRouteInput(nodeOnePath);
+        consumerService.routedSimpleRoute(simpleRouteThirdFoo);
 
         /**
          * Verifies that provider 1 wasn't invoked and provider 2 was invoked 1
          * time.
+         * TODO: fix unregister path
          */
-        verify(salFlowService1, times(0)).addFlow(AddFlowThirdMessage);
-        verify(salFlowService2).addFlow(AddFlowThirdMessage);
+        //verify(odlRoutedService1, times(0)).routedSimpleRoute(simpleRouteThirdFoo);
+        verify(odlRoutedService2).routedSimpleRoute(simpleRouteThirdFoo);
 
     }
 
@@ -189,13 +172,16 @@ public class RoutedServiceTest extends AbstractTest {
      *
      * @param string
      *            string with key(path)
-     * @return instance of the type NodeRef
+     * @return instance identifier to {@link UnorderedList}
      */
-    private static NodeRef createNodeRef(String string) {
-        NodeKey key = new NodeKey(new NodeId(string));
-        InstanceIdentifier<Node> path = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
-
-        return new NodeRef(path);
+    private static InstanceIdentifier<UnorderedList> createNodeRef(String string) {
+        UnorderedListKey key = new UnorderedListKey(string);
+        InstanceIdentifier<UnorderedList> path = InstanceIdentifier.builder(Lists.class)
+                .child(UnorderedContainer.class)
+                .child(UnorderedList.class, key)
+                .build();
+
+        return path;
     }
 
     /**
@@ -203,14 +189,11 @@ public class RoutedServiceTest extends AbstractTest {
      *
      * @param node
      *            NodeRef value
-     * @param cookie
-     *            integer with cookie value
-     * @return AddFlowInput instance
+     * @return simpleRouteInput instance
      */
-    static AddFlowInput createSampleAddFlow(NodeRef node, int cookie) {
-        AddFlowInputBuilder ret = new AddFlowInputBuilder();
-        ret.setNode(node);
-        ret.setCookie(new FlowCookie(BigInteger.valueOf(cookie)));
+    static RoutedSimpleRouteInput createSimpleRouteInput(InstanceIdentifier<UnorderedList> node) {
+        RoutedSimpleRouteInputBuilder ret = new RoutedSimpleRouteInputBuilder();
+        ret.setRoute(node);
         return ret.build();
     }
 }
index cf37cbdd005effaacb2294edd9308b8598e9788f..21a0cb6a889a78cf31910198eadc79f6c53f10e3 100644 (file)
@@ -30,11 +30,11 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received message {}", messageType);
+//            LOG.debug("Received message {}", messageType);
         }
         handleReceive(message);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Done handling message {}", messageType);
+//            LOG.debug("Done handling message {}", messageType);
         }
     }
 
index 9cd758ba30fdb94e85cd1703d99a8e0c55a50a17..744e2c22c69a9de8f16ff048648fbd087066eb41 100644 (file)
@@ -101,8 +101,7 @@ public class Shard extends RaftActor {
     // The state of this Shard
     private final InMemoryDOMDataStore store;
 
-    private final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
 
     /// The name of this shard
     private final ShardIdentifier name;
@@ -148,7 +147,7 @@ public class Shard extends RaftActor {
         this.schemaContext = schemaContext;
         this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
 
-        LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
+        LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
@@ -166,7 +165,7 @@ public class Shard extends RaftActor {
         }
 
         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
-                datastoreContext.getShardTransactionCommitQueueCapacity());
+                datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
 
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
@@ -216,13 +215,13 @@ public class Shard extends RaftActor {
     @Override
     public void onReceiveRecover(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("onReceiveRecover: Received message {} from {}",
-                message.getClass().toString(),
-                getSender());
+            LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
+                message.getClass().toString(), getSender());
         }
 
         if (message instanceof RecoveryFailure){
-            LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+            LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
+                    persistenceId());
 
             // Even though recovery failed, we still need to finish our recovery, eg send the
             // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
@@ -235,7 +234,7 @@ public class Shard extends RaftActor {
     @Override
     public void onReceiveCommand(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
+            LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
         }
 
         if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
@@ -275,8 +274,8 @@ public class Shard extends RaftActor {
         if(cohortEntry != null) {
             long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
             if(elapsed > transactionCommitTimeout) {
-                LOG.warning("Current transaction {} has timed out after {} ms - aborting",
-                        cohortEntry.getTransactionID(), transactionCommitTimeout);
+                LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+                        persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
 
                 doAbortTransaction(cohortEntry.getTransactionID(), null);
             }
@@ -286,7 +285,7 @@ public class Shard extends RaftActor {
     private void handleCommitTransaction(final CommitTransaction commit) {
         final String transactionID = commit.getTransactionID();
 
-        LOG.debug("Committing transaction {}", transactionID);
+        LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
 
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
@@ -295,8 +294,8 @@ public class Shard extends RaftActor {
             // We're not the current Tx - the Tx was likely expired b/c it took too long in
             // between the canCommit and commit messages.
             IllegalStateException ex = new IllegalStateException(
-                    String.format("Cannot commit transaction %s - it is not the current transaction",
-                            transactionID));
+                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+                            persistenceId(), transactionID));
             LOG.error(ex.getMessage());
             shardMBean.incrementFailedTransactionsCount();
             getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
@@ -322,9 +321,9 @@ public class Shard extends RaftActor {
                 Shard.this.persistData(getSender(), transactionID,
                         new ModificationPayload(cohortEntry.getModification()));
             }
-        } catch (InterruptedException | ExecutionException | IOException e) {
-            LOG.error(e, "An exception occurred while preCommitting transaction {}",
-                    cohortEntry.getTransactionID());
+        } catch (Exception e) {
+            LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
+                    persistenceId(), cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -352,8 +351,8 @@ public class Shard extends RaftActor {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
                 IllegalStateException ex = new IllegalStateException(
-                        String.format("Could not finish committing transaction %s - no CohortEntry found",
-                                transactionID));
+                        String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
+                                persistenceId(), transactionID));
                 LOG.error(ex.getMessage());
                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
             }
@@ -361,7 +360,7 @@ public class Shard extends RaftActor {
             return;
         }
 
-        LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
             // We block on the future here so we don't have to worry about possibly accessing our
@@ -374,24 +373,24 @@ public class Shard extends RaftActor {
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
 
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (Exception e) {
             sender.tell(new akka.actor.Status.Failure(e), getSelf());
 
-            LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+            LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
             shardMBean.incrementFailedTransactionsCount();
+        } finally {
+            commitCoordinator.currentTransactionComplete(transactionID, true);
         }
-
-        commitCoordinator.currentTransactionComplete(transactionID, true);
     }
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
-        LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+        LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
     }
 
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
-                ready.getTxnClientVersion());
+        LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+                ready.getTransactionID(), ready.getTxnClientVersion());
 
         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
         // commitCoordinator in preparation for the subsequent three phase commit initiated by
@@ -406,7 +405,7 @@ public class Shard extends RaftActor {
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
         if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
         }
@@ -424,7 +423,7 @@ public class Shard extends RaftActor {
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
-            LOG.debug("Aborting transaction {}", transactionID);
+            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
 
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
             // aborted during replication in which case we may still commit locally if replication
@@ -446,7 +445,7 @@ public class Shard extends RaftActor {
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.error(t, "An exception happened during abort");
+                    LOG.error(t, "{}: An exception happened during abort", persistenceId());
 
                     if(sender != null) {
                         sender.tell(new akka.actor.Status.Failure(t), self);
@@ -462,10 +461,10 @@ public class Shard extends RaftActor {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
-                "Could not find shard leader so transaction cannot be created. This typically happens" +
+            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+                "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
                 " when the system is coming up or recovering and a leader is being elected. Try again" +
-                " later.")), getSelf());
+                " later.", persistenceId()))), getSelf());
         }
     }
 
@@ -556,7 +555,7 @@ public class Shard extends RaftActor {
                 .build();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Creating transaction : {} ", transactionId);
+            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
         }
 
         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
@@ -581,7 +580,7 @@ public class Shard extends RaftActor {
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
             shardMBean.incrementFailedTransactionsCount();
-            LOG.error(e, "Failed to commit");
+            LOG.error(e, "{}: Failed to commit", persistenceId());
         }
     }
 
@@ -598,14 +597,14 @@ public class Shard extends RaftActor {
 
     private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
 
-        LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+        LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
 
         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                      NormalizedNode<?, ?>>> registration;
         if(isLeader()) {
             registration = doChangeListenerRegistration(registerChangeListener);
         } else {
-            LOG.debug("Shard is not the leader - delaying registration");
+            LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
             DelayedListenerRegistration delayedReg =
                     new DelayedListenerRegistration(registerChangeListener);
@@ -616,8 +615,8 @@ public class Shard extends RaftActor {
         ActorRef listenerRegistration = getContext().actorOf(
                 DataChangeListenerRegistration.props(registration));
 
-        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
-                    listenerRegistration.path());
+        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+                persistenceId(), listenerRegistration.path());
 
         getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
     }
@@ -641,7 +640,7 @@ public class Shard extends RaftActor {
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                 new DataChangeListenerProxy(dataChangeListenerPath);
 
-        LOG.debug("Registering for path {}", registerChangeListener.getPath());
+        LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
 
         return store.registerChangeListener(registerChangeListener.getPath(), listener,
                 registerChangeListener.getScope());
@@ -658,7 +657,7 @@ public class Shard extends RaftActor {
         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
         }
     }
 
@@ -668,40 +667,42 @@ public class Shard extends RaftActor {
             try {
                 currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
-                LOG.error(e, "Error extracting ModificationPayload");
+                LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
             }
         } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else if (data instanceof CompositeModificationByteStringPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
-            LOG.error("Unknown state received {} during recovery", data);
+            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
         }
     }
 
     @Override
     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
         if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+                    LOG, name.toString());
         }
 
         recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{} : submitted recovery sbapshot", persistenceId());
+            LOG.debug("{}: submitted recovery sbapshot", persistenceId());
         }
     }
 
     @Override
     protected void applyCurrentLogRecoveryBatch() {
         if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
+            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+                    LOG, name.toString());
         }
 
         recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
+            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
                     currentLogRecoveryBatch.size());
         }
     }
@@ -712,7 +713,7 @@ public class Shard extends RaftActor {
             Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
+                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
             }
 
             for(DOMStoreWriteTransaction tx: txList) {
@@ -721,7 +722,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementCommittedTransactionCount();
                 } catch (InterruptedException | ExecutionException e) {
                     shardMBean.incrementFailedTransactionsCount();
-                    LOG.error(e, "Failed to commit");
+                    LOG.error(e, "{}: Failed to commit", persistenceId());
                 }
             }
         }
@@ -751,7 +752,7 @@ public class Shard extends RaftActor {
             try {
                 applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
-                LOG.error(e, "Error extracting ModificationPayload");
+                LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
             }
         }
         else if (data instanceof CompositeModificationPayload) {
@@ -763,8 +764,8 @@ public class Shard extends RaftActor {
 
             applyModificationToState(clientActor, identifier, modification);
         } else {
-            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
-                    data, data.getClass().getClassLoader(),
+            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+                    persistenceId(), data, data.getClass().getClassLoader(),
                     CompositeModificationPayload.class.getClassLoader());
         }
 
@@ -775,8 +776,8 @@ public class Shard extends RaftActor {
     private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
         if(modification == null) {
             LOG.error(
-                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    identifier, clientActor != null ? clientActor.path().toString() : null);
+                    "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
@@ -821,7 +822,7 @@ public class Shard extends RaftActor {
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
 
-        LOG.info("Applying snapshot");
+        LOG.info("{}: Applying snapshot", persistenceId());
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
@@ -834,9 +835,9 @@ public class Shard extends RaftActor {
             transaction.write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
         } catch (InterruptedException | ExecutionException e) {
-            LOG.error(e, "An exception occurred when applying snapshot");
+            LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
         } finally {
-            LOG.info("Done applying snapshot");
+            LOG.info("{}: Done applying snapshot", persistenceId());
         }
     }
 
@@ -865,8 +866,8 @@ public class Shard extends RaftActor {
             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
                 if(LOG.isDebugEnabled()) {
                     LOG.debug(
-                        "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
-                        entry.getKey(), getId());
+                        "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+                        persistenceId(), entry.getKey(), getId());
                 }
                 entry.getValue().close();
             }
index 19fa26682e2a4cea7b637fda85064a3aea0226e5..165e272d8b09ca1631a94e7a1e7d14a4370bb595 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
+import akka.event.LoggingAdapter;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.util.LinkedList;
@@ -19,8 +20,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -29,8 +28,6 @@ import org.slf4j.LoggerFactory;
  */
 public class ShardCommitCoordinator {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
     private final Cache<String, CohortEntry> cohortCache;
 
     private CohortEntry currentCohortEntry;
@@ -39,11 +36,18 @@ public class ShardCommitCoordinator {
 
     private final int queueCapacity;
 
-    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+    private final LoggingAdapter log;
+
+    private final String name;
+
+    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+            String name) {
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
 
         this.queueCapacity = queueCapacity;
+        this.log = log;
+        this.name = name;
 
         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
         // since this should only be accessed on the shard's dispatcher.
@@ -74,9 +78,9 @@ public class ShardCommitCoordinator {
     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
             final ActorRef shard) {
         String transactionID = canCommit.getTransactionID();
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Processing canCommit for transaction {} for shard {}",
-                    transactionID, shard.path());
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Processing canCommit for transaction {} for shard {}",
+                    name, transactionID, shard.path());
         }
 
         // Lookup the cohort entry that was cached previously (or should have been) by
@@ -86,8 +90,8 @@ public class ShardCommitCoordinator {
             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
             // between canCommit and ready and the entry was expired from the cache.
             IllegalStateException ex = new IllegalStateException(
-                    String.format("No cohort entry found for transaction %s", transactionID));
-            LOG.error(ex.getMessage());
+                    String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+            log.error(ex.getMessage());
             sender.tell(new Status.Failure(ex), shard);
             return;
         }
@@ -98,8 +102,8 @@ public class ShardCommitCoordinator {
         if(currentCohortEntry != null) {
             // There's already a Tx commit in progress - attempt to queue this entry to be
             // committed after the current Tx completes.
-            LOG.debug("Transaction {} is already in progress - queueing transaction {}",
-                    currentCohortEntry.getTransactionID(), transactionID);
+            log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
+                    name, currentCohortEntry.getTransactionID(), transactionID);
 
             if(queuedCohortEntries.size() < queueCapacity) {
                 queuedCohortEntries.offer(cohortEntry);
@@ -107,10 +111,10 @@ public class ShardCommitCoordinator {
                 removeCohortEntry(transactionID);
 
                 RuntimeException ex = new RuntimeException(
-                        String.format("Could not enqueue transaction %s - the maximum commit queue"+
+                        String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
                                       " capacity %d has been reached.",
-                                transactionID, queueCapacity));
-                LOG.error(ex.getMessage());
+                                      name, transactionID, queueCapacity));
+                log.error(ex.getMessage());
                 sender.tell(new Status.Failure(ex), shard);
             }
         } else {
@@ -140,7 +144,7 @@ public class ShardCommitCoordinator {
                 removeCohortEntry(cohortEntry.getTransactionID());
             }
         } catch (InterruptedException | ExecutionException e) {
-            LOG.debug("An exception occurred during canCommit", e);
+            log.debug("{}: An exception occurred during canCommit: {}", name, e);
 
             // Remove the entry from the cache now since the Tx will be aborted.
             removeCohortEntry(cohortEntry.getTransactionID());
@@ -201,6 +205,7 @@ public class ShardCommitCoordinator {
             // Dequeue the next cohort entry waiting in the queue.
             currentCohortEntry = queuedCohortEntries.poll();
             if(currentCohortEntry != null) {
+                currentCohortEntry.updateLastAccessTime();
                 doCanCommit(currentCohortEntry);
             }
         }
index 10876045ae272c436e54143c8a0da8bf1c2e41e7..22e2dbd47d4d7148c8e41cfe05ce8532df527466 100644 (file)
@@ -27,6 +27,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -46,15 +55,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * The ShardManager has the following jobs,
@@ -555,7 +555,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     static class SchemaContextModules implements Serializable {
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = -8884620101025936590L;
+
         private final Set<String> modules;
 
         SchemaContextModules(Set<String> modules){
index 6f8d0567d9aa162d4080361162fe709decd544ce..2e66ef918e6e7541592449e9b51405610f4826a6 100644 (file)
@@ -96,4 +96,9 @@ public class ShardReadTransaction extends ShardTransaction {
     protected DOMStoreTransaction getDOMStoreTransaction() {
         return transaction;
     }
+
+    @Override
+    protected boolean returnCloseTransactionReply() {
+        return false;
+    }
 }
index 238b4e46dce041add47117503fcb68feb54e8e27..2a97036883b272d3f6757d9657417c0891567f7f 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.event.LoggingAdapter;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
@@ -21,8 +22,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
@@ -37,16 +36,19 @@ class ShardRecoveryCoordinator {
 
     private static final int TIME_OUT = 10;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryCoordinator.class);
-
     private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
     private final SchemaContext schemaContext;
     private final String shardName;
     private final ExecutorService executor;
+    private final LoggingAdapter log;
+    private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext) {
+    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+            String name) {
         this.schemaContext = schemaContext;
         this.shardName = shardName;
+        this.log = log;
+        this.name = name;
 
         executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
                 new ThreadFactoryBuilder().setDaemon(true)
@@ -85,7 +87,7 @@ class ShardRecoveryCoordinator {
             if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
                 return resultingTxList;
             } else {
-                LOG.error("Recovery for shard {} timed out after {} minutes", shardName, TIME_OUT);
+                log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
index 678b7815693382179328a8c13adb567d80d61b13..8a37dfee4d8ccc698a5bb9096d4ecdf9ed228771 100644 (file)
@@ -39,12 +39,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
  * </p>
  * <p>
- * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
- * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
- * time there are no known advantages for creating a read-only or write-only transaction which may change over time
- * at which point we can optimize things in the distributed store as well.
- * </p>
- * <p>
  * Handles Messages <br/>
  * ---------------- <br/>
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
@@ -114,10 +108,14 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         }
     }
 
+    protected boolean returnCloseTransactionReply() {
+        return true;
+    }
+
     private void closeTransaction(boolean sendReply) {
         getDOMStoreTransaction().close();
 
-        if(sendReply) {
+        if(sendReply && returnCloseTransactionReply()) {
             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
         }
 
index 5053d47f84e6fff6f0ab8aa33521dc23fbf45e4d..03bae2d99dedec37619e63c8b2924c1c7acddc20 100644 (file)
@@ -20,6 +20,7 @@ public class ShardIdentifier {
     private final String shardName;
     private final String memberName;
     private final String type;
+    private final String fullName;
 
     public ShardIdentifier(String shardName, String memberName, String type) {
 
@@ -30,6 +31,9 @@ public class ShardIdentifier {
         this.shardName = shardName;
         this.memberName = memberName;
         this.type = type;
+
+        fullName = new StringBuilder(memberName).append("-shard-").append(shardName).append("-")
+                .append(type).toString();
     }
 
     @Override
@@ -64,14 +68,10 @@ public class ShardIdentifier {
         return result;
     }
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
         //ensure the output of toString matches the pattern above
-        return new StringBuilder(memberName)
-                    .append("-shard-")
-                    .append(shardName)
-                    .append("-")
-                    .append(type)
-                    .toString();
+        return fullName;
     }
 
     public static Builder builder(){
index 5b7002eda2aafe923c0ad5d3b20addfd095b8a3e..ce7d6303ad13985fa4c08b4fd152cdae52cbd360 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang.SerializationUtils;
@@ -24,6 +22,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
 @Deprecated
 public class CompositeModificationByteStringPayloadTest {
 
@@ -69,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest {
 
         entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
-        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
     }
 }
index a55f6b865d127b9f6f4abe33cd9cbb227ab2839f..90b978821f2a10860e6006a4047a97ed7b667fa3 100644 (file)
@@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest {
         });
 
         AppendEntries appendEntries =
-            new AppendEntries(1, "member-1", 0, 100, entries, 1);
+            new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
 
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
index 6f8035e2d1bda329dd975fd1d5d87fd5856e4581..58aec30a8470035bfd7111c5b4c5badebf26b401 100644 (file)
@@ -7,7 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
@@ -16,6 +20,9 @@ import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -36,16 +43,9 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
 import scala.concurrent.ExecutionContextExecutor;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
 
 /**
  * Unit tests for DataChangeListenerRegistrationProxy.
@@ -207,6 +207,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             doReturn(Futures.failed(new RuntimeException("mock"))).
                     when(actorContext).executeOperationAsync(any(ActorRef.class),
                             any(Object.class), any(Timeout.class));
+            doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
 
             proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
                     AsyncDataBroker.DataChangeScope.ONE);
index 69dd706f37cb3bd8c1e7c75d91ec05e2b0fe1f13..851fb0114b3a64c7d211a5c6375c14c41e715e69 100644 (file)
@@ -412,10 +412,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveCloseTransaction() throws Exception {
+    public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testCloseTransaction");
+                    "testReadWriteTxOnReceiveCloseTransaction");
 
             watch(transaction);
 
@@ -426,6 +426,35 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testWriteTxOnReceiveCloseTransaction");
+
+            watch(transaction);
+
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+            expectTerminated(duration("3 seconds"), transaction);
+        }};
+    }
+
+    @Test
+    public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+                    "testReadOnlyTxOnReceiveCloseTransaction");
+
+            watch(transaction);
+
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), Terminated.class);
+        }};
+    }
+
     @Test(expected=UnknownMessageException.class)
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
index 79c1bb4720e9790599b99bb17c6cf6d823275b86..28fc6b0f57ed950479ba2737a4a2aef26e450475 100644 (file)
@@ -98,7 +98,7 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
     }
 
     public static AppendEntries keyValueAppendEntries() {
@@ -123,6 +123,6 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
     }
 }
index 4bb5258b4005f21eabc62c2707f2d3edca59b229..5d780be641e2cce6d2b48a2628cd675f9a24c8ee 100644 (file)
@@ -9,29 +9,28 @@ package org.opendaylight.controller.remote.rpc.messages;
 
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-
 import java.io.Serializable;
+import org.opendaylight.yangtools.yang.common.QName;
 
 public class ExecuteRpc implements Serializable {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1128904894827335676L;
 
-  private final String inputCompositeNode;
-  private final QName rpc;
+    private final String inputCompositeNode;
+    private final QName rpc;
 
-  public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
-    Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
-    Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
+    public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
+        Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
+        Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
 
-    this.inputCompositeNode = inputCompositeNode;
-    this.rpc = rpc;
-  }
+        this.inputCompositeNode = inputCompositeNode;
+        this.rpc = rpc;
+    }
 
-  public String getInputCompositeNode() {
-    return inputCompositeNode;
-  }
+    public String getInputCompositeNode() {
+        return inputCompositeNode;
+    }
 
-  public QName getRpc() {
-    return rpc;
-  }
+    public QName getRpc() {
+        return rpc;
+    }
 }
index 652569b7baf705ea0efa5c4d35f530edf85942f0..9c40dbfc58a556bc28f337c9ee683b947846c045 100644 (file)
@@ -8,37 +8,36 @@
 package org.opendaylight.controller.remote.rpc.messages;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.io.Serializable;
-
 public class InvokeRpc implements Serializable {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -2813459607858108953L;
 
-  private final QName rpc;
-  private final YangInstanceIdentifier identifier;
-  private final CompositeNode input;
+    private final QName rpc;
+    private final YangInstanceIdentifier identifier;
+    private final CompositeNode input;
 
-  public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
-    Preconditions.checkNotNull(rpc, "rpc qname should not be null");
-    Preconditions.checkNotNull(input, "rpc input should not be null");
+    public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
+        Preconditions.checkNotNull(rpc, "rpc qname should not be null");
+        Preconditions.checkNotNull(input, "rpc input should not be null");
 
-    this.rpc = rpc;
-    this.identifier = identifier;
-    this.input = input;
-  }
+        this.rpc = rpc;
+        this.identifier = identifier;
+        this.input = input;
+    }
 
-  public QName getRpc() {
-    return rpc;
-  }
+    public QName getRpc() {
+        return rpc;
+    }
 
-  public YangInstanceIdentifier getIdentifier() {
-    return identifier;
-  }
+    public YangInstanceIdentifier getIdentifier() {
+        return identifier;
+    }
 
-  public CompositeNode getInput() {
-    return input;
-  }
+    public CompositeNode getInput() {
+        return input;
+    }
 }
index 387cb90112dad6ffbab6fb7cce7bf7c1d92442f1..e6b208cb6fe63d64eca185b9d90d2d19144ed440 100644 (file)
@@ -10,14 +10,15 @@ package org.opendaylight.controller.remote.rpc.messages;
 import java.io.Serializable;
 
 public class RpcResponse implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private final String resultCompositeNode;
+    private static final long serialVersionUID = -4211279498688989245L;
 
-  public RpcResponse(final String resultCompositeNode) {
-    this.resultCompositeNode = resultCompositeNode;
-  }
+    private final String resultCompositeNode;
 
-  public String getResultCompositeNode() {
-    return resultCompositeNode;
-  }
+    public RpcResponse(final String resultCompositeNode) {
+        this.resultCompositeNode = resultCompositeNode;
+    }
+
+    public String getResultCompositeNode() {
+        return resultCompositeNode;
+    }
 }
index 52b1106c873872609e4300abe52d558ee89a6011..f67657f6927801931fae2fd0434481169f2f3de8 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
 public class RoutingTable implements Copier<RoutingTable>, Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 5592610415175278760L;
 
     private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
     private ActorRef router;
index b81175e9a253176870ddf92901d8d2debaa4d698..4c4573d909491f0414d6bd00ebac5eb35a2c7b94 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import java.io.Serializable;
 
 public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 294779770032719196L;
 
     private Long version = System.currentTimeMillis();
 
index b05bd7d0f6f314c990e4a1d7909115b6c1ad1348..00437e7e5639fc382e8b1434d4b02495ef1ac56e 100644 (file)
@@ -46,7 +46,8 @@ public class Messages {
         }
 
         public static class ContainsBuckets implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -4940160367495308286L;
+
             private final Map<Address, Bucket> buckets;
 
             public ContainsBuckets(Map<Address, Bucket> buckets){
@@ -87,7 +88,8 @@ public class Messages {
         }
 
         public static class ContainsBucketVersions implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -8172148925383801613L;
+
             Map<Address, Long> versions;
 
             public ContainsBucketVersions(Map<Address, Long> versions) {
@@ -119,15 +121,16 @@ public class Messages {
 
     public static class GossiperMessages{
         public static class Tick implements Serializable {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -4770935099506366773L;
         }
 
         public static final class GossipTick extends Tick {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 5803354404380026143L;
         }
 
         public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -593037395143883265L;
+
             private final Address from;
 
             public GossipStatus(Address from, Map<Address, Long> versions) {
@@ -141,7 +144,8 @@ public class Messages {
         }
 
         public static final class GossipEnvelope extends ContainsBuckets implements Serializable {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 8346634072582438818L;
+
             private final Address from;
             private final Address to;
 
index 2a8a80da09131fed382d4daa06e6533d0085c17d..852e99e14605ced0935bcacd9fd795237f486cda 100644 (file)
@@ -9,6 +9,9 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
+    <artifactId>sal-test-model</artifactId>
+    <packaging>bundle</packaging>
+
     <dependencies>
         <dependency>
             <groupId>org.opendaylight.yangtools</groupId>
@@ -20,7 +23,6 @@
         </dependency>
     </dependencies>
 
-    <artifactId>sal-test-model</artifactId>
     <build>
         <plugins>
             <plugin>
diff --git a/opendaylight/md-sal/sal-test-model/src/main/yang/opendaylight-test-notification.yang b/opendaylight/md-sal/sal-test-model/src/main/yang/opendaylight-test-notification.yang
new file mode 100644 (file)
index 0000000..31ec7ae
--- /dev/null
@@ -0,0 +1,25 @@
+module opendaylight-test-notification {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:test:bi:ba:notification";
+    prefix "ntf";
+
+    description
+        "Test model for testing of registering notification listener and publishing of notification.";
+
+    revision "2015-02-05" {
+        description
+            "Initial revision";
+    }
+
+    notification out-of-pixie-dust-notification {
+        description "Just a testing notification that we can not fly for now.";
+
+        leaf reason {
+            type string;
+        }
+
+        leaf days-till-new-dust {
+            type uint16;
+        }
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/ietf-netconf-notifications/pom.xml b/opendaylight/netconf/ietf-netconf-notifications/pom.xml
new file mode 100644 (file)
index 0000000..1ce3b03
--- /dev/null
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>netconf-subsystem</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>ietf-netconf-notifications</artifactId>
+  <packaging>bundle</packaging>
+  <name>${project.artifactId}</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>ietf-netconf</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools.model</groupId>
+      <artifactId>ietf-yang-types</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <configuration>
+          <instructions>
+            <Export-Package>
+              org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.*,
+              org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.*,
+              org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.*
+            </Export-Package>
+          </instructions>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yang-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang
new file mode 100644 (file)
index 0000000..a04799d
--- /dev/null
@@ -0,0 +1,363 @@
+module ietf-netconf-notifications {
+
+   namespace
+     "urn:ietf:params:xml:ns:yang:ietf-netconf-notifications";
+
+   prefix ncn;
+
+   import ietf-inet-types { prefix inet; }
+   import ietf-netconf { prefix nc; }
+
+   organization
+     "IETF NETCONF (Network Configuration Protocol) Working Group";
+
+   contact
+     "WG Web:   <http://tools.ietf.org/wg/netconf/>
+      WG List:  <mailto:netconf@ietf.org>
+
+      WG Chair: Bert Wijnen
+                <mailto:bertietf@bwijnen.net>
+
+      WG Chair: Mehmet Ersue
+                <mailto:mehmet.ersue@nsn.com>
+
+      Editor:   Andy Bierman
+                <mailto:andy@netconfcentral.org>";
+
+   description
+     "This module defines a YANG data model for use with the
+      NETCONF protocol that allows the NETCONF client to
+      receive common NETCONF base event notifications.
+
+      Copyright (c) 2012 IETF Trust and the persons identified as
+      the document authors.  All rights reserved.
+
+      Redistribution and use in source and binary forms, with or
+      without modification, is permitted pursuant to, and subject
+      to the license terms contained in, the Simplified BSD License
+
+
+
+      set forth in Section 4.c of the IETF Trust's Legal Provisions
+      Relating to IETF Documents
+      (http://trustee.ietf.org/license-info).
+
+      This version of this YANG module is part of RFC 6470; see
+      the RFC itself for full legal notices.";
+
+   revision "2012-02-06" {
+     description
+       "Initial version. Errata 3957 added.";
+     reference
+       "RFC 6470: NETCONF Base Notifications";
+   }
+
+  grouping common-session-parms {
+    description
+      "Common session parameters to identify a
+       management session.";
+
+    leaf username {
+      type string;
+      mandatory true;
+      description
+        "Name of the user for the session.";
+    }
+
+    leaf session-id {
+      type nc:session-id-or-zero-type;
+      mandatory true;
+      description
+        "Identifier of the session.
+         A NETCONF session MUST be identified by a non-zero value.
+         A non-NETCONF session MAY be identified by the value zero.";
+    }
+
+    leaf source-host {
+      type inet:ip-address;
+      description
+        "Address of the remote host for the session.";
+    }
+  }
+
+
+
+
+
+
+
+
+   grouping changed-by-parms {
+    description
+      "Common parameters to identify the source
+       of a change event, such as a configuration
+       or capability change.";
+
+    container changed-by {
+      description
+        "Indicates the source of the change.
+         If caused by internal action, then the
+         empty leaf 'server' will be present.
+         If caused by a management session, then
+         the name, remote host address, and session ID
+         of the session that made the change will be reported.";
+      choice server-or-user {
+        mandatory true;
+        leaf server {
+          type empty;
+          description
+            "If present, the change was caused
+             by the server.";
+        }
+
+        case by-user {
+          uses common-session-parms;
+        }
+      } // choice server-or-user
+    } // container changed-by-parms
+  }
+
+
+  notification netconf-config-change {
+    description
+      "Generated when the NETCONF server detects that the
+       <running> or <startup> configuration datastore
+       has been changed by a management session.
+       The notification summarizes the edits that
+       have been detected.
+
+       The server MAY choose to also generate this
+       notification while loading a datastore during the
+       boot process for the device.";
+
+    uses changed-by-parms;
+
+
+
+
+
+    leaf datastore {
+      type enumeration {
+        enum running {
+          description "The <running> datastore has changed.";
+        }
+        enum startup {
+          description "The <startup> datastore has changed";
+        }
+      }
+      default "running";
+      description
+        "Indicates which configuration datastore has changed.";
+    }
+
+    list edit {
+      description
+        "An edit record SHOULD be present for each distinct
+         edit operation that the server has detected on
+         the target datastore.  This list MAY be omitted
+         if the detailed edit operations are not known.
+         The server MAY report entries in this list for
+         changes not made by a NETCONF session (e.g., CLI).";
+
+      leaf target {
+        type instance-identifier;
+        description
+          "Topmost node associated with the configuration change.
+           A server SHOULD set this object to the node within
+           the datastore that is being altered.  A server MAY
+           set this object to one of the ancestors of the actual
+           node that was changed, or omit this object, if the
+           exact node is not known.";
+      }
+
+      leaf operation {
+        type nc:edit-operation-type;
+        description
+          "Type of edit operation performed.
+           A server MUST set this object to the NETCONF edit
+           operation performed on the target datastore.";
+      }
+    } // list edit
+  } // notification netconf-config-change
+
+
+
+
+
+
+  notification netconf-capability-change {
+    description
+      "Generated when the NETCONF server detects that
+       the server capabilities have changed.
+       Indicates which capabilities have been added, deleted,
+       and/or modified.  The manner in which a server
+       capability is changed is outside the scope of this
+       document.";
+
+    uses changed-by-parms;
+
+    leaf-list added-capability {
+      type inet:uri;
+      description
+        "List of capabilities that have just been added.";
+    }
+
+    leaf-list deleted-capability {
+      type inet:uri;
+      description
+        "List of capabilities that have just been deleted.";
+    }
+
+    leaf-list modified-capability {
+      type inet:uri;
+      description
+        "List of capabilities that have just been modified.
+         A capability is considered to be modified if the
+         base URI for the capability has not changed, but
+         one or more of the parameters encoded at the end of
+         the capability URI have changed.
+         The new modified value of the complete URI is returned.";
+    }
+  } // notification netconf-capability-change
+
+
+  notification netconf-session-start {
+    description
+      "Generated when a NETCONF server detects that a
+       NETCONF session has started.  A server MAY generate
+       this event for non-NETCONF management sessions.
+       Indicates the identity of the user that started
+       the session.";
+    uses common-session-parms;
+  } // notification netconf-session-start
+
+
+
+
+  notification netconf-session-end {
+    description
+      "Generated when a NETCONF server detects that a
+       NETCONF session has terminated.
+       A server MAY optionally generate this event for
+       non-NETCONF management sessions.  Indicates the
+       identity of the user that owned the session,
+       and why the session was terminated.";
+
+    uses common-session-parms;
+
+    leaf killed-by {
+      when "../termination-reason = 'killed'";
+      type nc:session-id-type;
+      description
+        "The ID of the session that directly caused this session
+         to be abnormally terminated.  If this session was abnormally
+         terminated by a non-NETCONF session unknown to the server,
+         then this leaf will not be present.";
+    }
+
+    leaf termination-reason {
+      type enumeration {
+        enum "closed" {
+          description
+            "The session was terminated by the client in normal
+             fashion, e.g., by the NETCONF <close-session>
+             protocol operation.";
+        }
+        enum "killed" {
+          description
+            "The session was terminated in abnormal
+             fashion, e.g., by the NETCONF <kill-session>
+             protocol operation.";
+        }
+        enum "dropped" {
+          description
+            "The session was terminated because the transport layer
+             connection was unexpectedly closed.";
+        }
+        enum "timeout" {
+          description
+            "The session was terminated because of inactivity,
+             e.g., waiting for the <hello> message or <rpc>
+             messages.";
+        }
+
+
+
+        enum "bad-hello" {
+          description
+            "The client's <hello> message was invalid.";
+        }
+        enum "other" {
+          description
+            "The session was terminated for some other reason.";
+        }
+      }
+      mandatory true;
+      description
+        "Reason the session was terminated.";
+    }
+  } // notification netconf-session-end
+
+
+  notification netconf-confirmed-commit {
+    description
+      "Generated when a NETCONF server detects that a
+       confirmed-commit event has occurred.  Indicates the event
+       and the current state of the confirmed-commit procedure
+       in progress.";
+    reference
+      "RFC 6241, Section 8.4";
+
+    uses common-session-parms {
+      when "confirm-event != 'timeout'";
+    }
+
+    leaf confirm-event {
+      type enumeration {
+        enum "start" {
+          description
+            "The confirmed-commit procedure has started.";
+        }
+        enum "cancel" {
+          description
+            "The confirmed-commit procedure has been canceled,
+             e.g., due to the session being terminated, or an
+             explicit <cancel-commit> operation.";
+        }
+        enum "timeout" {
+          description
+            "The confirmed-commit procedure has been canceled
+             due to the confirm-timeout interval expiring.
+             The common session parameters will not be present
+             in this sub-mode.";
+        }
+
+        enum "extend" {
+          description
+            "The confirmed-commit timeout has been extended,
+             e.g., by a new <confirmed-commit> operation.";
+        }
+        enum "complete" {
+          description
+            "The confirmed-commit procedure has been completed.";
+        }
+      }
+      mandatory true;
+      description
+        "Indicates the event that caused the notification.";
+    }
+
+    leaf timeout {
+      when
+        "../confirm-event = 'start' or ../confirm-event = 'extend'";
+      type uint32;
+      units "seconds";
+      description
+        "The configured timeout value if the event type
+         is 'start' or 'extend'.  This value represents
+         the approximate number of seconds from the event
+         time when the 'timeout' event might occur.";
+    }
+  } // notification netconf-confirmed-commit
+
+}
diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang
new file mode 100644 (file)
index 0000000..fb9aac1
--- /dev/null
@@ -0,0 +1,95 @@
+module nc-notifications {
+
+    namespace "urn:ietf:params:xml:ns:netmod:notification";
+    prefix "manageEvent";
+
+    import ietf-yang-types{ prefix yang; }
+    import notifications { prefix ncEvent; }
+
+    organization
+      "IETF NETCONF WG";
+
+    contact
+      "netconf@ietf.org";
+
+    description
+      "Conversion of the 'manageEvent' XSD in the NETCONF
+       Notifications RFC.";
+
+    reference
+      "RFC 5277";
+
+    revision 2008-07-14 {
+      description "RFC 5277 version.";
+    }
+
+    container netconf {
+      description "Top-level element in the notification namespace";
+
+      config false;
+
+      container streams {
+        description
+          "The list of event streams supported by the system. When
+           a query is issued, the returned set of streams is
+           determined based on user privileges.";
+
+        list stream {
+          description
+            "Stream name, description and other information.";
+          key name;
+          min-elements 1;
+
+          leaf name {
+            description
+              "The name of the event stream. If this is the default
+               NETCONF stream, this must have the value 'NETCONF'.";
+            type ncEvent:streamNameType;
+          }
+
+          leaf description {
+            description
+              "A description of the event stream, including such
+               information as the type of events that are sent over
+               this stream.";
+            type string;
+            mandatory true;
+          }
+
+          leaf replaySupport {
+            description
+              "A description of the event stream, including such
+               information as the type of events that are sent over
+               this stream.";
+            type boolean;
+            mandatory true;
+          }
+
+          leaf replayLogCreationTime {
+            description
+              "The timestamp of the creation of the log used to support
+               the replay function on this stream. Note that this might
+               be earlier then the earliest available notification in
+               the log. This object is updated if the log resets for
+               some reason.  This object MUST be present if replay is
+               supported.";
+            type yang:date-and-time;   // xsd:dateTime is wrong!
+          }
+        }
+      }
+    }
+
+    notification replayComplete {
+      description
+        "This notification is sent to signal the end of a replay
+         portion of a subscription.";
+    }
+
+    notification notificationComplete {
+      description
+        "This notification is sent to signal the end of a notification
+         subscription. It is sent in the case that stopTime was
+         specified during the creation of the subscription..";
+    }
+
+}
diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang
new file mode 100644 (file)
index 0000000..f107c2a
--- /dev/null
@@ -0,0 +1,83 @@
+module notifications {
+
+    namespace "urn:ietf:params:xml:ns:netconf:notification:1.0";
+    prefix "ncEvent";
+
+    import ietf-yang-types { prefix yang; }
+
+    organization
+      "IETF NETCONF WG";
+
+    contact
+      "netconf@ops.ietf.org";
+
+    description
+      "Conversion of the 'ncEvent' XSD in the
+       NETCONF Notifications RFC.";
+
+    reference
+      "RFC 5277.";
+
+    revision 2008-07-14 {
+      description "RFC 5277 version.";
+    }
+
+    typedef streamNameType {
+      description
+        "The name of an event stream.";
+      type string;
+    }
+
+    rpc create-subscription {
+      description
+        "The command to create a notification subscription. It
+         takes as argument the name of the notification stream
+         and filter. Both of those options limit the content of
+         the subscription. In addition, there are two time-related
+         parameters, startTime and stopTime, which can be used to
+         select the time interval of interest to the notification
+         replay feature.";
+
+      input {
+        leaf stream {
+          description
+            "An optional parameter that indicates which stream of events
+             is of interest. If not present, then events in the default
+             NETCONF stream will be sent.";
+          type streamNameType;
+          default "NETCONF";
+        }
+
+        anyxml filter {
+          description
+            "An optional parameter that indicates which subset of all
+             possible events is of interest. The format of this
+             parameter is the same as that of the filter parameter
+             in the NETCONF protocol operations. If not present,
+             all events not precluded by other parameters will
+             be sent.";
+        }
+
+        leaf startTime {
+          description
+            "A parameter used to trigger the replay feature and
+             indicates that the replay should start at the time
+             specified. If start time is not present, this is not a
+             replay subscription.";
+          type yang:date-and-time;
+        }
+
+        leaf stopTime {
+          // must ". >= ../startTime";
+          description
+            "An optional parameter used with the optional replay
+             feature to indicate the newest notifications of
+             interest. If stop time is not present, the notifications
+             will continue until the subscription is terminated.
+             Must be used with startTime.";
+          type yang:date-and-time;
+        }
+      }
+    }
+}
+
diff --git a/opendaylight/netconf/ietf-netconf/pom.xml b/opendaylight/netconf/ietf-netconf/pom.xml
new file mode 100644 (file)
index 0000000..6ed7a5f
--- /dev/null
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>netconf-subsystem</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>ietf-netconf</artifactId>
+    <packaging>bundle</packaging>
+    <name>${project.artifactId}</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.model</groupId>
+            <artifactId>ietf-inet-types</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <Export-Package>org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.*</Export-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/opendaylight/netconf/ietf-netconf/src/main/yang/ietf-netconf@2011-06-01.yang b/opendaylight/netconf/ietf-netconf/src/main/yang/ietf-netconf@2011-06-01.yang
new file mode 100644 (file)
index 0000000..4bbb1c2
--- /dev/null
@@ -0,0 +1,928 @@
+module ietf-netconf {
+
+  // the namespace for NETCONF XML definitions is unchanged
+  // from RFC 4741, which this document replaces
+  namespace "urn:ietf:params:xml:ns:netconf:base:1.0";
+
+  prefix nc;
+
+  import ietf-inet-types {
+    prefix inet;
+  }
+
+  organization
+    "IETF NETCONF (Network Configuration) Working Group";
+
+  contact
+    "WG Web:   <http://tools.ietf.org/wg/netconf/>
+     WG List:  <netconf@ietf.org>
+
+     WG Chair: Bert Wijnen
+               <bertietf@bwijnen.net>
+
+     WG Chair: Mehmet Ersue
+               <mehmet.ersue@nsn.com>
+
+     Editor:   Martin Bjorklund
+               <mbj@tail-f.com>
+
+     Editor:   Juergen Schoenwaelder
+               <j.schoenwaelder@jacobs-university.de>
+
+     Editor:   Andy Bierman
+               <andy.bierman@brocade.com>";
+  description
+    "NETCONF Protocol Data Types and Protocol Operations.
+
+     Copyright (c) 2011 IETF Trust and the persons identified as
+     the document authors.  All rights reserved.
+
+     Redistribution and use in source and binary forms, with or
+     without modification, is permitted pursuant to, and subject
+     to the license terms contained in, the Simplified BSD License
+     set forth in Section 4.c of the IETF Trust's Legal Provisions
+     Relating to IETF Documents
+     (http://trustee.ietf.org/license-info).
+
+     This version of this YANG module is part of RFC 6241; see
+     the RFC itself for full legal notices.";
+
+  revision 2011-06-01 {
+    description
+      "Initial revision;";
+    reference
+      "RFC 6241: Network Configuration Protocol";
+  }
+
+  extension get-filter-element-attributes {
+    description
+      "If this extension is present within an 'anyxml'
+       statement named 'filter', which must be conceptually
+       defined within the RPC input section for the <get>
+       and <get-config> protocol operations, then the
+       following unqualified XML attribute is supported
+       within the <filter> element, within a <get> or
+       <get-config> protocol operation:
+
+         type : optional attribute with allowed
+                value strings 'subtree' and 'xpath'.
+                If missing, the default value is 'subtree'.
+
+       If the 'xpath' feature is supported, then the
+       following unqualified XML attribute is
+       also supported:
+
+         select: optional attribute containing a
+                 string representing an XPath expression.
+                 The 'type' attribute must be equal to 'xpath'
+                 if this attribute is present.";
+  }
+
+  // NETCONF capabilities defined as features
+  feature writable-running {
+    description
+      "NETCONF :writable-running capability;
+       If the server advertises the :writable-running
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+    reference "RFC 6241, Section 8.2";
+  }
+
+  feature candidate {
+    description
+      "NETCONF :candidate capability;
+       If the server advertises the :candidate
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+    reference "RFC 6241, Section 8.3";
+  }
+
+  feature confirmed-commit {
+    if-feature candidate;
+    description
+      "NETCONF :confirmed-commit:1.1 capability;
+       If the server advertises the :confirmed-commit:1.1
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+
+    reference "RFC 6241, Section 8.4";
+  }
+
+  feature rollback-on-error {
+    description
+      "NETCONF :rollback-on-error capability;
+       If the server advertises the :rollback-on-error
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+    reference "RFC 6241, Section 8.5";
+  }
+
+  feature validate {
+    description
+      "NETCONF :validate:1.1 capability;
+       If the server advertises the :validate:1.1
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+    reference "RFC 6241, Section 8.6";
+  }
+
+  feature startup {
+    description
+      "NETCONF :startup capability;
+       If the server advertises the :startup
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+    reference "RFC 6241, Section 8.7";
+  }
+
+  feature url {
+    description
+      "NETCONF :url capability;
+       If the server advertises the :url
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+    reference "RFC 6241, Section 8.8";
+  }
+
+  feature xpath {
+    description
+      "NETCONF :xpath capability;
+       If the server advertises the :xpath
+       capability for a session, then this feature must
+       also be enabled for that session.  Otherwise,
+       this feature must not be enabled.";
+    reference "RFC 6241, Section 8.9";
+  }
+
+  // NETCONF Simple Types
+
+  typedef session-id-type {
+    type uint32 {
+      range "1..max";
+    }
+    description
+      "NETCONF Session Id";
+  }
+
+  typedef session-id-or-zero-type {
+    type uint32;
+    description
+      "NETCONF Session Id or Zero to indicate none";
+  }
+  typedef error-tag-type {
+    type enumeration {
+       enum in-use {
+         description
+           "The request requires a resource that
+            already is in use.";
+       }
+       enum invalid-value {
+         description
+           "The request specifies an unacceptable value for one
+            or more parameters.";
+       }
+       enum too-big {
+         description
+           "The request or response (that would be generated) is
+            too large for the implementation to handle.";
+       }
+       enum missing-attribute {
+         description
+           "An expected attribute is missing.";
+       }
+       enum bad-attribute {
+         description
+           "An attribute value is not correct; e.g., wrong type,
+            out of range, pattern mismatch.";
+       }
+       enum unknown-attribute {
+         description
+           "An unexpected attribute is present.";
+       }
+       enum missing-element {
+         description
+           "An expected element is missing.";
+       }
+       enum bad-element {
+         description
+           "An element value is not correct; e.g., wrong type,
+            out of range, pattern mismatch.";
+       }
+       enum unknown-element {
+         description
+           "An unexpected element is present.";
+       }
+       enum unknown-namespace {
+         description
+           "An unexpected namespace is present.";
+       }
+       enum access-denied {
+         description
+           "Access to the requested protocol operation or
+            data model is denied because authorization failed.";
+       }
+       enum lock-denied {
+         description
+           "Access to the requested lock is denied because the
+            lock is currently held by another entity.";
+       }
+       enum resource-denied {
+         description
+           "Request could not be completed because of
+            insufficient resources.";
+       }
+       enum rollback-failed {
+         description
+           "Request to roll back some configuration change (via
+            rollback-on-error or <discard-changes> operations)
+            was not completed for some reason.";
+
+       }
+       enum data-exists {
+         description
+           "Request could not be completed because the relevant
+            data model content already exists.  For example,
+            a 'create' operation was attempted on data that
+            already exists.";
+       }
+       enum data-missing {
+         description
+           "Request could not be completed because the relevant
+            data model content does not exist.  For example,
+            a 'delete' operation was attempted on
+            data that does not exist.";
+       }
+       enum operation-not-supported {
+         description
+           "Request could not be completed because the requested
+            operation is not supported by this implementation.";
+       }
+       enum operation-failed {
+         description
+           "Request could not be completed because the requested
+            operation failed for some reason not covered by
+            any other error condition.";
+       }
+       enum partial-operation {
+         description
+           "This error-tag is obsolete, and SHOULD NOT be sent
+            by servers conforming to this document.";
+       }
+       enum malformed-message {
+         description
+           "A message could not be handled because it failed to
+            be parsed correctly.  For example, the message is not
+            well-formed XML or it uses an invalid character set.";
+       }
+     }
+     description "NETCONF Error Tag";
+     reference "RFC 6241, Appendix A";
+  }
+
+  typedef error-severity-type {
+    type enumeration {
+      enum error {
+        description "Error severity";
+      }
+      enum warning {
+        description "Warning severity";
+      }
+    }
+    description "NETCONF Error Severity";
+    reference "RFC 6241, Section 4.3";
+  }
+
+  typedef edit-operation-type {
+    type enumeration {
+      enum merge {
+        description
+          "The configuration data identified by the
+           element containing this attribute is merged
+           with the configuration at the corresponding
+           level in the configuration datastore identified
+           by the target parameter.";
+      }
+      enum replace {
+        description
+          "The configuration data identified by the element
+           containing this attribute replaces any related
+           configuration in the configuration datastore
+           identified by the target parameter.  If no such
+           configuration data exists in the configuration
+           datastore, it is created.  Unlike a
+           <copy-config> operation, which replaces the
+           entire target configuration, only the configuration
+           actually present in the config parameter is affected.";
+      }
+      enum create {
+        description
+          "The configuration data identified by the element
+           containing this attribute is added to the
+           configuration if and only if the configuration
+           data does not already exist in the configuration
+           datastore.  If the configuration data exists, an
+           <rpc-error> element is returned with an
+           <error-tag> value of 'data-exists'.";
+      }
+      enum delete {
+        description
+          "The configuration data identified by the element
+           containing this attribute is deleted from the
+           configuration if and only if the configuration
+           data currently exists in the configuration
+           datastore.  If the configuration data does not
+           exist, an <rpc-error> element is returned with
+           an <error-tag> value of 'data-missing'.";
+      }
+      enum remove {
+        description
+          "The configuration data identified by the element
+           containing this attribute is deleted from the
+           configuration if the configuration
+           data currently exists in the configuration
+           datastore.  If the configuration data does not
+           exist, the 'remove' operation is silently ignored
+           by the server.";
+      }
+    }
+    default "merge";
+    description "NETCONF 'operation' attribute values";
+    reference "RFC 6241, Section 7.2";
+  }
+
+  // NETCONF Standard Protocol Operations
+
+  rpc get-config {
+    description
+      "Retrieve all or part of a specified configuration.";
+
+    reference "RFC 6241, Section 7.1";
+
+    input {
+      container source {
+        description
+          "Particular configuration to retrieve.";
+
+        choice config-source {
+          mandatory true;
+          description
+            "The configuration to retrieve.";
+          leaf candidate {
+            if-feature candidate;
+            type empty;
+            description
+              "The candidate configuration is the config source.";
+          }
+          leaf running {
+            type empty;
+            description
+              "The running configuration is the config source.";
+          }
+          leaf startup {
+            if-feature startup;
+            type empty;
+            description
+              "The startup configuration is the config source.
+               This is optional-to-implement on the server because
+               not all servers will support filtering for this
+               datastore.";
+          }
+        }
+      }
+
+      anyxml filter {
+        description
+          "Subtree or XPath filter to use.";
+        nc:get-filter-element-attributes;
+      }
+    }
+
+    output {
+      anyxml data {
+        description
+          "Copy of the source datastore subset that matched
+           the filter criteria (if any).  An empty data container
+           indicates that the request did not produce any results.";
+      }
+    }
+  }
+
+  rpc edit-config {
+    description
+      "The <edit-config> operation loads all or part of a specified
+       configuration to the specified target configuration.";
+
+    reference "RFC 6241, Section 7.2";
+
+    input {
+      container target {
+        description
+          "Particular configuration to edit.";
+
+        choice config-target {
+          mandatory true;
+          description
+            "The configuration target.";
+
+          leaf candidate {
+            if-feature candidate;
+            type empty;
+            description
+              "The candidate configuration is the config target.";
+          }
+          leaf running {
+            if-feature writable-running;
+            type empty;
+            description
+              "The running configuration is the config source.";
+          }
+        }
+      }
+
+      leaf default-operation {
+        type enumeration {
+          enum merge {
+            description
+              "The default operation is merge.";
+          }
+          enum replace {
+            description
+              "The default operation is replace.";
+          }
+          enum none {
+            description
+              "There is no default operation.";
+          }
+        }
+        default "merge";
+        description
+          "The default operation to use.";
+      }
+
+      leaf test-option {
+        if-feature validate;
+        type enumeration {
+          enum test-then-set {
+            description
+              "The server will test and then set if no errors.";
+          }
+          enum set {
+            description
+              "The server will set without a test first.";
+          }
+
+          enum test-only {
+            description
+              "The server will only test and not set, even
+               if there are no errors.";
+          }
+        }
+        default "test-then-set";
+        description
+          "The test option to use.";
+      }
+
+      leaf error-option {
+        type enumeration {
+          enum stop-on-error {
+            description
+              "The server will stop on errors.";
+          }
+          enum continue-on-error {
+            description
+              "The server may continue on errors.";
+          }
+          enum rollback-on-error {
+            description
+              "The server will roll back on errors.
+               This value can only be used if the 'rollback-on-error'
+               feature is supported.";
+          }
+        }
+        default "stop-on-error";
+        description
+          "The error option to use.";
+      }
+
+      choice edit-content {
+        mandatory true;
+        description
+          "The content for the edit operation.";
+
+        anyxml config {
+          description
+            "Inline Config content.";
+        }
+        leaf url {
+          if-feature url;
+          type inet:uri;
+          description
+            "URL-based config content.";
+        }
+      }
+    }
+  }
+
+  rpc copy-config {
+    description
+      "Create or replace an entire configuration datastore with the
+       contents of another complete configuration datastore.";
+
+    reference "RFC 6241, Section 7.3";
+
+    input {
+      container target {
+        description
+          "Particular configuration to copy to.";
+
+        choice config-target {
+          mandatory true;
+          description
+            "The configuration target of the copy operation.";
+
+          leaf candidate {
+            if-feature candidate;
+            type empty;
+            description
+              "The candidate configuration is the config target.";
+          }
+          leaf running {
+            if-feature writable-running;
+            type empty;
+            description
+              "The running configuration is the config target.
+               This is optional-to-implement on the server.";
+          }
+          leaf startup {
+            if-feature startup;
+            type empty;
+            description
+              "The startup configuration is the config target.";
+          }
+          leaf url {
+            if-feature url;
+            type inet:uri;
+            description
+              "The URL-based configuration is the config target.";
+          }
+        }
+      }
+
+      container source {
+        description
+          "Particular configuration to copy from.";
+
+        choice config-source {
+          mandatory true;
+          description
+            "The configuration source for the copy operation.";
+
+          leaf candidate {
+            if-feature candidate;
+            type empty;
+            description
+              "The candidate configuration is the config source.";
+          }
+          leaf running {
+            type empty;
+            description
+              "The running configuration is the config source.";
+          }
+          leaf startup {
+            if-feature startup;
+            type empty;
+            description
+              "The startup configuration is the config source.";
+          }
+          leaf url {
+            if-feature url;
+            type inet:uri;
+            description
+              "The URL-based configuration is the config source.";
+          }
+          anyxml config {
+            description
+              "Inline Config content: <config> element.  Represents
+               an entire configuration datastore, not
+               a subset of the running datastore.";
+          }
+        }
+      }
+    }
+  }
+
+  rpc delete-config {
+    description
+      "Delete a configuration datastore.";
+
+    reference "RFC 6241, Section 7.4";
+
+    input {
+      container target {
+        description
+          "Particular configuration to delete.";
+
+        choice config-target {
+          mandatory true;
+          description
+            "The configuration target to delete.";
+
+          leaf startup {
+            if-feature startup;
+            type empty;
+            description
+              "The startup configuration is the config target.";
+          }
+          leaf url {
+            if-feature url;
+            type inet:uri;
+            description
+              "The URL-based configuration is the config target.";
+          }
+        }
+      }
+    }
+  }
+
+  rpc lock {
+    description
+      "The lock operation allows the client to lock the configuration
+       system of a device.";
+
+    reference "RFC 6241, Section 7.5";
+
+    input {
+      container target {
+        description
+          "Particular configuration to lock.";
+
+        choice config-target {
+          mandatory true;
+          description
+            "The configuration target to lock.";
+
+          leaf candidate {
+            if-feature candidate;
+            type empty;
+            description
+              "The candidate configuration is the config target.";
+          }
+          leaf running {
+            type empty;
+            description
+              "The running configuration is the config target.";
+          }
+          leaf startup {
+            if-feature startup;
+            type empty;
+            description
+              "The startup configuration is the config target.";
+          }
+        }
+      }
+    }
+  }
+
+  rpc unlock {
+    description
+      "The unlock operation is used to release a configuration lock,
+       previously obtained with the 'lock' operation.";
+
+    reference "RFC 6241, Section 7.6";
+
+    input {
+      container target {
+        description
+          "Particular configuration to unlock.";
+
+        choice config-target {
+          mandatory true;
+          description
+            "The configuration target to unlock.";
+
+          leaf candidate {
+            if-feature candidate;
+            type empty;
+            description
+              "The candidate configuration is the config target.";
+          }
+          leaf running {
+            type empty;
+            description
+              "The running configuration is the config target.";
+          }
+          leaf startup {
+            if-feature startup;
+            type empty;
+            description
+              "The startup configuration is the config target.";
+          }
+        }
+      }
+    }
+  }
+
+  rpc get {
+    description
+      "Retrieve running configuration and device state information.";
+
+    reference "RFC 6241, Section 7.7";
+
+    input {
+      anyxml filter {
+        description
+          "This parameter specifies the portion of the system
+           configuration and state data to retrieve.";
+        nc:get-filter-element-attributes;
+      }
+    }
+
+    output {
+      anyxml data {
+        description
+          "Copy of the running datastore subset and/or state
+           data that matched the filter criteria (if any).
+           An empty data container indicates that the request did not
+           produce any results.";
+      }
+    }
+  }
+
+  rpc close-session {
+    description
+      "Request graceful termination of a NETCONF session.";
+
+    reference "RFC 6241, Section 7.8";
+  }
+
+  rpc kill-session {
+    description
+      "Force the termination of a NETCONF session.";
+
+    reference "RFC 6241, Section 7.9";
+
+    input {
+      leaf session-id {
+        type session-id-type;
+        mandatory true;
+        description
+          "Particular session to kill.";
+      }
+    }
+  }
+
+  rpc commit {
+    if-feature candidate;
+
+    description
+      "Commit the candidate configuration as the device's new
+       current configuration.";
+
+    reference "RFC 6241, Section 8.3.4.1";
+
+    input {
+      leaf confirmed {
+        if-feature confirmed-commit;
+        type empty;
+        description
+          "Requests a confirmed commit.";
+        reference "RFC 6241, Section 8.3.4.1";
+      }
+
+      leaf confirm-timeout {
+        if-feature confirmed-commit;
+        type uint32 {
+          range "1..max";
+        }
+        units "seconds";
+        default "600";   // 10 minutes
+        description
+          "The timeout interval for a confirmed commit.";
+        reference "RFC 6241, Section 8.3.4.1";
+      }
+
+      leaf persist {
+        if-feature confirmed-commit;
+        type string;
+        description
+          "This parameter is used to make a confirmed commit
+           persistent.  A persistent confirmed commit is not aborted
+           if the NETCONF session terminates.  The only way to abort
+           a persistent confirmed commit is to let the timer expire,
+           or to use the <cancel-commit> operation.
+
+           The value of this parameter is a token that must be given
+           in the 'persist-id' parameter of <commit> or
+           <cancel-commit> operations in order to confirm or cancel
+           the persistent confirmed commit.
+
+           The token should be a random string.";
+        reference "RFC 6241, Section 8.3.4.1";
+      }
+
+      leaf persist-id {
+        if-feature confirmed-commit;
+        type string;
+        description
+          "This parameter is given in order to commit a persistent
+           confirmed commit.  The value must be equal to the value
+           given in the 'persist' parameter to the <commit> operation.
+           If it does not match, the operation fails with an
+          'invalid-value' error.";
+        reference "RFC 6241, Section 8.3.4.1";
+      }
+
+    }
+  }
+
+  rpc discard-changes {
+    if-feature candidate;
+
+    description
+      "Revert the candidate configuration to the current
+       running configuration.";
+    reference "RFC 6241, Section 8.3.4.2";
+  }
+
+  rpc cancel-commit {
+    if-feature confirmed-commit;
+    description
+      "This operation is used to cancel an ongoing confirmed commit.
+       If the confirmed commit is persistent, the parameter
+       'persist-id' must be given, and it must match the value of the
+       'persist' parameter.";
+    reference "RFC 6241, Section 8.4.4.1";
+
+    input {
+      leaf persist-id {
+        type string;
+        description
+          "This parameter is given in order to cancel a persistent
+           confirmed commit.  The value must be equal to the value
+           given in the 'persist' parameter to the <commit> operation.
+           If it does not match, the operation fails with an
+          'invalid-value' error.";
+      }
+    }
+  }
+
+  rpc validate {
+    if-feature validate;
+
+    description
+      "Validates the contents of the specified configuration.";
+
+    reference "RFC 6241, Section 8.6.4.1";
+
+    input {
+      container source {
+        description
+          "Particular configuration to validate.";
+
+        choice config-source {
+          mandatory true;
+          description
+            "The configuration source to validate.";
+
+          leaf candidate {
+            if-feature candidate;
+            type empty;
+            description
+              "The candidate configuration is the config source.";
+          }
+          leaf running {
+            type empty;
+            description
+              "The running configuration is the config source.";
+          }
+          leaf startup {
+            if-feature startup;
+            type empty;
+            description
+              "The startup configuration is the config source.";
+          }
+          leaf url {
+            if-feature url;
+            type inet:uri;
+            description
+              "The URL-based configuration is the config source.";
+          }
+          anyxml config {
+            description
+              "Inline Config content: <config> element.  Represents
+               an entire configuration datastore, not
+               a subset of the running datastore.";
+          }
+        }
+      }
+    }
+  }
+
+}
index 287ff2dca77a60a4a8f3cac0569c96fad233aac4..83e1f9129b731063128406f0e26e133289ddee3b 100644 (file)
@@ -444,7 +444,9 @@ public class NetconfDeviceSimulator implements Closeable {
                 final SimulatedEditConfig sEditConfig = new SimulatedEditConfig(String.valueOf(currentSessionId), storage);
                 final SimulatedGetConfig sGetConfig = new SimulatedGetConfig(String.valueOf(currentSessionId), storage);
                 final SimulatedCommit sCommit = new SimulatedCommit(String.valueOf(currentSessionId));
-                return Sets.<NetconfOperation>newHashSet(sGet,  sGetConfig, sEditConfig, sCommit);
+                final SimulatedLock sLock = new SimulatedLock(String.valueOf(currentSessionId));
+                final SimulatedUnLock sUnlock = new SimulatedUnLock(String.valueOf(currentSessionId));
+                return Sets.<NetconfOperation>newHashSet(sGet,  sGetConfig, sEditConfig, sCommit, sLock, sUnlock);
             }
 
             @Override
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedLock.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedLock.java
new file mode 100644 (file)
index 0000000..4717e54
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+class SimulatedLock extends AbstractConfigNetconfOperation {
+
+    SimulatedLock(final String netconfSessionIdForReporting) {
+        super(null, netconfSessionIdForReporting);
+    }
+
+    @Override
+    protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException {
+        return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
+    }
+
+    @Override
+    protected String getOperationName() {
+        return "lock";
+    }
+}
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedUnLock.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedUnLock.java
new file mode 100644 (file)
index 0000000..31f9fc1
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+class SimulatedUnLock extends AbstractConfigNetconfOperation {
+
+    SimulatedUnLock(final String netconfSessionIdForReporting) {
+        super(null, netconfSessionIdForReporting);
+    }
+
+    @Override
+    protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException {
+        return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
+    }
+
+    @Override
+    protected String getOperationName() {
+        return "unlock";
+    }
+}
index e1aa6ce3ed8184d979682df63281fe41311004e4..2a5ba096730728b6773e2c5d6d235837c77a378f 100644 (file)
@@ -13,9 +13,6 @@
   <version>0.3.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <modules>
     <module>netconf-api</module>
diff --git a/pom.xml b/pom.xml
index 1217d720663706fd12f4117a124a86d66d7dd8a7..f588f3f17c68bca7891f1d2856287104f908a308 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   <packaging>pom</packaging>
   <name>controller</name> <!-- Used by Sonar to set project name -->
 
-  <prerequisites>
-    <maven>3.0</maven>
-  </prerequisites>
-
   <modules>
 
     <!-- md-sal -->