Merge "BUG-1567 Expose sources for submodules in netconf"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 30 Mar 2015 08:20:45 +0000 (08:20 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 30 Mar 2015 08:20:46 +0000 (08:20 +0000)
25 files changed:
opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java

index c5adb28db745ce6c676f21ccf3f10c4edd43f933..4afbedb76e44ada6766aa119c3b63c8f773d25ff 100644 (file)
@@ -70,6 +70,13 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
       <artifactId>${artifactId}-impl</artifactId>
       <version>${symbol_dollar}{project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${symbol_dollar}{project.groupId}</groupId>
+      <artifactId>${artifactId}-impl</artifactId>
+      <version>${symbol_dollar}{project.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+    </dependency>
     <dependency>
       <groupId>${symbol_dollar}{project.groupId}</groupId>
       <artifactId>${artifactId}-api</artifactId>
index 130cb11c5afec8039831cf78d271c91a1e674074..2915a8dfd93de373373476ea717779937673e78d 100644 (file)
     <yang-ext.version>2013.09.07.7-SNAPSHOT</yang-ext.version>
     <yang-jmx-generator.version>1.1.0-SNAPSHOT</yang-jmx-generator.version>
     <yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
-    <sshd-core.version>0.12.0</sshd-core.version>
+    <sshd-core.version>0.14.0</sshd-core.version>
     <jmh.version>0.9.7</jmh.version>
     <lmax.version>3.3.0</lmax.version>
   </properties>
index 2a3653ee91d1bc4ba30dca5a1f229412aba50534..c276d32cce33d5b5bfada40f7f62afb6244a2e07 100644 (file)
@@ -39,6 +39,8 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
+    protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
+
     /**
      * Information about the RaftActor whose behavior this class represents
      */
@@ -254,7 +256,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         // message is sent to itself
         electionCancel =
             context.getActorSystem().scheduler().scheduleOnce(interval,
-                context.getActor(), new ElectionTimeout(),
+                context.getActor(), ELECTION_TIMEOUT,
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
index b36c41abf262b8c24d05f354091f7c4ec92a0827..74bede171f1f6e6ad6b33feef6806d3c77321581 100644 (file)
@@ -58,7 +58,14 @@ public class Candidate extends AbstractRaftActorBehavior {
         votesRequired = getMajorityVoteCount(peers.size());
 
         startNewTerm();
-        scheduleElection(electionDuration());
+
+        if(context.getPeerAddresses().isEmpty()){
+            actor().tell(ELECTION_TIMEOUT, actor());
+        } else {
+            scheduleElection(electionDuration());
+        }
+
+
     }
 
     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
index 6a29a348b8420aa0e77fd9c93f3ada12b4578fa4..a6722e6ff98dbbe9ab68df6c9e04915c23c8721a 100644 (file)
@@ -46,9 +46,14 @@ public class Follower extends AbstractRaftActorBehavior {
     public Follower(RaftActorContext context) {
         super(context, RaftState.Follower);
 
-        scheduleElection(electionDuration());
-
         initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+
+        if(context.getPeerAddresses().isEmpty()){
+            actor().tell(ELECTION_TIMEOUT, actor());
+        } else {
+            scheduleElection(electionDuration());
+        }
+
     }
 
     private boolean isLogEntryPresent(long index){
index 120a3a16a9e8ce95b262c7c01637dcdbe2351013..13445b0b26bb54f2794446647ab494ffb794fd26 100644 (file)
@@ -261,4 +261,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex());
         assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData());
     }
+
+    protected String testActorPath(String id){
+        return "akka://test/user" + id;
+    }
 }
index b93b73958baede1e6e7e51560bed0386d4587a65..0a4a2c7717facfcc9fc883c2234d4577ff49f876 100644 (file)
@@ -31,6 +31,7 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
@@ -158,6 +159,16 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
+
+        public void waitUntilLeader(){
+            for(int i = 0;i < 10; i++){
+                if(isLeader()){
+                    break;
+                }
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            }
+        }
+
         public List<Object> getState() {
             return state;
         }
@@ -177,6 +188,13 @@ public class RaftActorTest extends AbstractActorTest {
             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
         }
 
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                                  Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+                                  DataPersistenceProvider dataPersistenceProvider){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+        }
+
+
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
             delegate.applyState(clientActor, identifier, data);
             LOG.info("{}: applyState called", persistenceId());
@@ -674,11 +692,13 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.waitForInitializeBehaviorComplete();
 
+                mockRaftActor.waitUntilLeader();
+
                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
 
                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
 
-                verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+                verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
             }
         };
     }
@@ -702,9 +722,11 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.waitForInitializeBehaviorComplete();
 
+                mockRaftActor.waitUntilLeader();
+
                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
 
-                verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
+                verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
 
             }
 
@@ -766,7 +788,7 @@ public class RaftActorTest extends AbstractActorTest {
                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -949,7 +971,7 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRaftRoleChangeNotifier() throws Exception {
+    public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
         new JavaTestKit(getSystem()) {{
             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
                     Props.create(MessageCollectorActor.class));
@@ -958,15 +980,17 @@ public class RaftActorTest extends AbstractActorTest {
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
             long heartBeatInterval = 100;
             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
-            config.setElectionTimeoutFactor(1);
+            config.setElectionTimeoutFactor(20);
 
             String persistenceId = factory.generateActorId("notifier-");
 
             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                    Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+                    Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
+                    new NonPersistentProvider()), persistenceId);
 
             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
 
+
             // check if the notifier got a role change from null to Follower
             RoleChanged raftRoleChanged = matches.get(0);
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
@@ -1022,6 +1046,49 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+            MessageCollectorActor.waitUntilReady(notifierActor);
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            long heartBeatInterval = 100;
+            config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+            config.setElectionTimeoutFactor(1);
+
+            String persistenceId = factory.generateActorId("notifier-");
+
+            factory.createActor(MockRaftActor.props(persistenceId,
+                    ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+
+            List<RoleChanged> matches =  null;
+            for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+                matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+                assertNotNull(matches);
+                if(matches.size() == 3) {
+                    break;
+                }
+                Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+            }
+
+            assertEquals(2, matches.size());
+
+            // check if the notifier got a role change from null to Follower
+            RoleChanged raftRoleChanged = matches.get(0);
+            assertEquals(persistenceId, raftRoleChanged.getMemberId());
+            assertNull(raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+            // check if the notifier got a role change from Follower to Candidate
+            raftRoleChanged = matches.get(1);
+            assertEquals(persistenceId, raftRoleChanged.getMemberId());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+        }};
+    }
+
     @Test
     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
         new JavaTestKit(getSystem()) {
index 7a291f364b1ce9e6646d9460c665b4610e3a5f79..bd670fd581153e673ef145ca178211c6f7e60f86 100644 (file)
@@ -62,9 +62,11 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // Create the leader and 2 follower actors and verify initial syncing of the followers after leader
         // persistence recovery.
 
-        follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
 
-        follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
 
         peerAddresses = ImmutableMap.<String, String>builder().
                 put(follower1Id, follower1Actor.path().toString()).
index aca19c0b8b10ada72a2e7064f20da39392fe72c3..d4a9f7701b0bec82a775f14ee439463ce2da2f78 100644 (file)
@@ -56,10 +56,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
 
         // Create the leader and 2 follower actors.
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
 
-        follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
-
-        follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
 
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
                 put(follower1Id, follower1Actor.path().toString()).
index 60f45523cf204d5d246fb14ac01eddc4dcc760ba..63fd530675a78e9f4ec0c35c67e394e210ae9fd3 100644 (file)
@@ -1,12 +1,15 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -184,6 +187,20 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getTerm", 1001, reply.getTerm());
     }
 
+    @Test
+    public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+        MockRaftActorContext context = createActorContext();
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        candidate = createBehavior(context);
+
+        MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class);
+
+        long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+        assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+    }
 
 
     @Override
index 75509bae51573a422fac7594af79788ce63ad79a..26e43648787530c8911bf2139a035df11224f768 100644 (file)
@@ -12,11 +12,13 @@ import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -755,6 +757,21 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
     }
 
+    @Test
+    public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+        MockRaftActorContext context = createActorContext();
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        follower = createBehavior(context);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+        assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+    }
+
     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
         int snapshotLength = bs.size();
         int start = offset;
index 1dda2791c063d48966ec778e98bc462c20521e0f..ba0bd0f29c96237ab758487b719eac959d115edc 100644 (file)
@@ -48,6 +48,7 @@ import scala.concurrent.duration.FiniteDuration;
 public class LeaderTest extends AbstractLeaderTest {
 
     static final String FOLLOWER_ID = "follower";
+    public static final String LEADER_ID = "leader";
 
     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
@@ -948,7 +949,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
     @Override
     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
-        return createActorContext("leader", actorRef);
+        return createActorContext(LEADER_ID, actorRef);
     }
 
     private MockRaftActorContext createActorContextWithFollower() {
@@ -1027,14 +1028,15 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext leaderActorContext = createActorContext();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
 
-        Map<String, String> peerAddresses = new HashMap<>();
-        peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+        Map<String, String> leaderPeerAddresses = new HashMap<>();
+        leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
 
-        leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.setPeerAddresses(leaderPeerAddresses);
 
         leaderActorContext.getReplicatedLog().removeFrom(0);
 
@@ -1269,6 +1271,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
         followerActorContext.setConfigParams(configParams);
+        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
index f315bfdf7a13ad9e2c0b9ecaf6550f0a138763bc..770d709abe79d1fc60bfc705b8bfa94a591991c9 100644 (file)
@@ -28,4 +28,13 @@ public class RoleChanged {
     public String getNewRole() {
         return newRole;
     }
+
+    @Override
+    public String toString() {
+        return "RoleChanged{" +
+                "memberId='" + memberId + '\'' +
+                ", oldRole='" + oldRole + '\'' +
+                ", newRole='" + newRole + '\'' +
+                '}';
+    }
 }
index 55a86ceeea966aab6d67f2b3ba1692d772e74640..52762b4eb352ff9de295e44969b13c4410b7f2f0 100644 (file)
@@ -197,6 +197,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+            if (isReadyWithLeaderId()) {
+                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+                waitTillReadyCountdownLatch.countDown();
+            }
+
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
         }
@@ -241,7 +248,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if(shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
 
-            if (isReady()) {
+            if (isReadyWithLeaderId()) {
                 LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
                         persistenceId(), type, waitTillReadyCountdownLatch.getCount());
 
@@ -263,10 +270,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return null;
     }
 
-    private boolean isReady() {
+    private boolean isReadyWithLeaderId() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(!info.isShardReady()){
+            if(!info.isShardReadyWithLeaderId()){
                 isReady = false;
                 break;
             }
index c479da73127760977d4c27b3ff9873d10c295c57..aeb4062103d1c0fc77c29f0c733447728a60194f 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -117,17 +118,25 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} finishCanCommit", transactionId);
         }
-        // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
-        // their canCommit processing. If any one fails then we'll fail canCommit.
 
-        Future<Iterable<Object>> combinedFuture =
-                invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+        // For empty transactions return immediately
+        if(cohorts.size() == 0){
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+            }
+            returnFuture.set(Boolean.TRUE);
+            return;
+        }
 
-        combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+        final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+        final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+        final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
-                if(failure != null) {
-                    if(LOG.isDebugEnabled()) {
+            public void onComplete(Throwable failure, Object response) throws Throwable {
+                if (failure != null) {
+                    if (LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     }
                     returnFuture.setException(failure);
@@ -135,27 +144,36 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 boolean result = true;
-                for(Object response: responses) {
-                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
-                        CanCommitTransactionReply reply =
-                                CanCommitTransactionReply.fromSerializable(response);
-                        if (!reply.getCanCommit()) {
-                            result = false;
-                            break;
-                        }
-                    } else {
-                        LOG.error("Unexpected response type {}", response.getClass());
-                        returnFuture.setException(new IllegalArgumentException(
-                                String.format("Unexpected response type %s", response.getClass())));
-                        return;
+                if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                    CanCommitTransactionReply reply =
+                            CanCommitTransactionReply.fromSerializable(response);
+                    if (!reply.getCanCommit()) {
+                        result = false;
                     }
+                } else {
+                    LOG.error("Unexpected response type {}", response.getClass());
+                    returnFuture.setException(new IllegalArgumentException(
+                            String.format("Unexpected response type %s", response.getClass())));
+                    return;
                 }
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+                if(iterator.hasNext() && result){
+                    Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                            actorContext.getTransactionCommitOperationTimeout());
+                    future.onComplete(this, actorContext.getClientDispatcher());
+                } else {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+                    }
+                    returnFuture.set(Boolean.valueOf(result));
                 }
-                returnFuture.set(Boolean.valueOf(result));
+
             }
-        }, actorContext.getClientDispatcher());
+        };
+
+        Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                actorContext.getTransactionCommitOperationTimeout());
+        future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
index 378bc717f4da3b22f2516a2c303c43abc9144fab..34f0164504fe666e7cfb76090c57ed5b2362a268 100644 (file)
@@ -16,6 +16,7 @@ import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.dispatch.Dispatchers;
 import akka.japi.Creator;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Function;
@@ -117,7 +118,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         };
 
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRecovery");
 
         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
 
index fdc7e664c2b6cd63b5071e504bebf70c5f0dc05d..a8384d8758a7bc3523bf75f1f3e31480d21ae737 100644 (file)
@@ -468,7 +468,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
         new IntegrationTestKit(getSystem()) {{
             String testName = "testTransactionCommitFailureWithNoShardLeader";
-            String shardName = "test-1";
+            String shardName = "default";
 
             // We don't want the shard to become the leader so prevent shard election from completing
             // by setting the election timeout, which is based on the heartbeat interval, really high.
@@ -497,8 +497,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
                 @Override
                 public void run() {
                     try {
-                        writeTx.write(TestModel.TEST_PATH,
-                                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+                        writeTx.write(TestModel.JUNK_PATH,
+                                ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
 
                         txCohort.set(writeTx.ready());
                     } catch(Exception e) {
index 95b1b78a198e511d15347637f2f1558b86d81e65..b676cf225c801039d1e679298e6d7cb23d2808ac 100644 (file)
@@ -639,7 +639,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRoleChangeNotificationReleaseReady() throws Exception {
+    public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
@@ -648,11 +648,35 @@ public class ShardManagerTest extends AbstractActorTest {
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
 
+                verify(ready, never()).countDown();
+
+                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+
+                verify(ready, times(1)).countDown();
+
+            }};
+    }
+
+    @Test
+    public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        memberId, null, RaftState.Follower.name()));
+
+                verify(ready, never()).countDown();
+
+                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+
                 verify(ready, times(1)).countDown();
 
             }};
     }
 
+
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         new JavaTestKit(getSystem()) {
index efcb0c9bfd6da4ae3cf0dce5681d3931eaa1271e..cc96d0d3b0d070623c737dc8f78340c45a20539f 100644 (file)
@@ -160,8 +160,12 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
+                    // Use a non persistent provider because this test actually invokes persist on the journal
+                    // this will cause all other messages to not be queued properly after that.
+                    // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+                    // it does do a persist)
                     return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -935,7 +939,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Use MBean for verification
                 // Committed transaction count should increase as usual
-                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+                assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 
                 // Commit index should advance as we do not have an empty modification
                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
index 647b6e7b542508953bb9750555aaf193c0ba2864..d595adc8bb80c0a98701526b2240b757c1f2d6c6 100644 (file)
@@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.SerializableMessag
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
@@ -116,6 +117,9 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
                 isA(requestType), any(Timeout.class));
+
+        doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+                .when(actorContext).getTransactionCommitOperationTimeout();
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
@@ -180,9 +184,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+        Boolean actual = future.get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+        assertEquals("canCommit", false, actual);
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
     @Test(expected = TestException.class)
index 9761ed8615a763df20ec6763defc69b55395a120..42406080361601de2d3c53bb91a8f9c62fc7a886 100644 (file)
@@ -21,6 +21,10 @@ public class TestModel {
   public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
           "test");
 
+  public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+          "junk");
+
+
   public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
   public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
   public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
@@ -31,6 +35,7 @@ public class TestModel {
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
   public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+  public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
   public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
           node(OUTER_LIST_QNAME).build();
   public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
index 06c695c25a1941d74877234a7c030a45b34ac7e1..f4017fbe5897521e5589f36e987abdc454cfb895 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.netconf.client;
 
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -45,6 +45,9 @@ public class NetconfClientSessionNegotiator extends
     private static final XPathExpression sessionIdXPath = XMLNetconfUtil
             .compileXPath("/netconf:hello/netconf:session-id");
 
+    private static final XPathExpression sessionIdXPathNoNamespace = XMLNetconfUtil
+            .compileXPath("/hello/session-id");
+
     private static final String EXI_1_0_CAPABILITY_MARKER = "exi:1.0";
 
     protected NetconfClientSessionNegotiator(final NetconfClientSessionPreferences sessionPreferences,
@@ -113,16 +116,22 @@ public class NetconfClientSessionNegotiator extends
     }
 
     private long extractSessionId(final Document doc) {
-        final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
-        Preconditions.checkState(sessionIdNode != null, "");
-        String textContent = sessionIdNode.getTextContent();
-        if (textContent == null || textContent.equals("")) {
-            throw new IllegalStateException("Session id not received from server");
+        String textContent = getSessionIdWithXPath(doc, sessionIdXPath);
+        if (Strings.isNullOrEmpty(textContent)) {
+            textContent = getSessionIdWithXPath(doc, sessionIdXPathNoNamespace);
+            if (Strings.isNullOrEmpty(textContent)) {
+                throw new IllegalStateException("Session id not received from server, hello message: " + XmlUtil.toString(doc));
+            }
         }
 
         return Long.valueOf(textContent);
     }
 
+    private String getSessionIdWithXPath(final Document doc, final XPathExpression sessionIdXPath) {
+        final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
+        return sessionIdNode != null ? sessionIdNode.getTextContent() : null;
+    }
+
     @Override
     protected NetconfClientSession getSession(final NetconfClientSessionListener sessionListener, final Channel channel,
             final NetconfHelloMessage message) throws NetconfDocumentedException {
index ab731260216f4452f6ba3d5e8c6b64c8179bdad2..d7d8660ae49149923cf60ada5b236dff745f9aed 100644 (file)
@@ -61,7 +61,7 @@ public class SSHTest {
     @AfterClass
     public static void tearDown() throws Exception {
         hashedWheelTimer.stop();
-        nettyGroup.shutdownGracefully().await();
+        nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS);
         minaTimerEx.shutdownNow();
         nioExec.shutdownNow();
     }
index 5cd17a2331e293c23d064c9acd535e021de43c8e..404885db7e9a497a243abd12d468df7c48f8b6ae 100644 (file)
@@ -94,9 +94,9 @@ public final class NetconfHelloMessage extends NetconfMessage {
     private static boolean isHelloMessage(final Document document) {
         XmlElement element = XmlElement.fromDomElement(document.getDocumentElement());
         try {
+            // accept even if hello has no namespace
             return element.getName().equals(HELLO_TAG) &&
-                   element.hasNamespace() &&
-                   element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+                    (!element.hasNamespace() || element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0));
         } catch (MissingNameSpaceException e) {
             // Cannot happen, since we check for hasNamespace
             throw new IllegalStateException(e);
index 61b23202c3a134c27d975279e2f29f7d7eefefa0..3c6b6ccab9efa0a0a28b5630da5d5c9f30157610 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.netconf.util.messages;
 
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.collect.Collections2;
 import java.util.Collection;
 import java.util.List;
@@ -59,9 +60,13 @@ public final class NetconfMessageUtil {
 
     public static Collection<String> extractCapabilitiesFromHello(Document doc) throws NetconfDocumentedException {
         XmlElement responseElement = XmlElement.fromDomDocument(doc);
-        XmlElement capabilitiesElement = responseElement
-                .getOnlyChildElementWithSameNamespace(XmlNetconfConstants.CAPABILITIES);
-        List<XmlElement> caps = capabilitiesElement.getChildElements(XmlNetconfConstants.CAPABILITY);
+        // Extract child element <capabilities> from <hello> with or without(fallback) the same namespace
+        Optional<XmlElement> capabilitiesElement = responseElement
+                .getOnlyChildElementWithSameNamespaceOptionally(XmlNetconfConstants.CAPABILITIES)
+                .or(responseElement
+                        .getOnlyChildElementOptionally(XmlNetconfConstants.CAPABILITIES));
+
+        List<XmlElement> caps = capabilitiesElement.get().getChildElements(XmlNetconfConstants.CAPABILITY);
         return Collections2.transform(caps, new Function<XmlElement, String>() {
 
             @Override