Merge "BUG 3066 : Optimistic lock failed, on NetconfStateUpdate" into stable/lithium
authorTony Tkacik <ttkacik@cisco.com>
Wed, 29 Apr 2015 07:04:37 +0000 (07:04 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 29 Apr 2015 07:04:38 +0000 (07:04 +0000)
17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java with 53% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java

index 1738cc5fe001ab8b30cfd105f67dc4d9adcc8200..d82528c48a92dee74feffd3a47a91e0bafa863c5 100644 (file)
@@ -298,7 +298,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
         if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
             if(roleChangeNotifier.isPresent()) {
-                roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+                roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
             }
 
             onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
@@ -311,6 +311,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+        return new LeaderStateChanged(memberId, leaderId);
+    }
+
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
index ec35b03b0a5956176aeafadbdaa56fa990e23699..23c95ecc99c0dea3ca144092018fd303d6dd7511 100644 (file)
@@ -7,29 +7,30 @@
  */
 package org.opendaylight.controller.cluster.notifications;
 
-import java.io.Serializable;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 
 /**
- * A message initiated internally from the RaftActor when some state of a leader has changed
+ * A local message initiated internally from the RaftActor when some state of a leader has changed.
  *
  * @author Thomas Pantelis
  */
-public class LeaderStateChanged implements Serializable {
-    private static final long serialVersionUID = 1L;
-
+public class LeaderStateChanged {
     private final String memberId;
     private final String leaderId;
 
-    public LeaderStateChanged(String memberId, String leaderId) {
-        this.memberId = memberId;
+    public LeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId) {
+        this.memberId = Preconditions.checkNotNull(memberId);
         this.leaderId = leaderId;
     }
 
-    public String getMemberId() {
+    public @Nonnull String getMemberId() {
         return memberId;
     }
 
-    public String getLeaderId() {
+    public @Nullable String getLeaderId() {
         return leaderId;
     }
 
index 62d3259a714059f497ca3e97ad08166a3a1106bf..e62e918e5e7eff53b8f7e00dbe1e8300d270b59f 100644 (file)
@@ -48,12 +48,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
@@ -63,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -265,6 +268,12 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    @Override
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+        return new ShardLeaderStateChanged(memberId, leaderId,
+                isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent());
+    }
+
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
 
index cff44b13cb3edb1756ed6159e95d8fe259f91fde..f4fa7b3a97e8617f8ac474b494a08deb330a0b6d 100644 (file)
@@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
@@ -54,17 +55,20 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -185,27 +189,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
-        } else if(message instanceof LeaderStateChanged) {
-            onLeaderStateChanged((LeaderStateChanged)message);
+        } else if(message instanceof ShardLeaderStateChanged) {
+            onLeaderStateChanged((ShardLeaderStateChanged)message);
         } else {
             unknownMessage(message);
         }
 
     }
 
-    private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+    private void checkReady(){
+        if (isReadyWithLeaderId()) {
+            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+            waitTillReadyCountdownLatch.countDown();
+        }
+    }
+
+    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
+            shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
             shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
-            if (isReadyWithLeaderId()) {
-                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-                waitTillReadyCountdownLatch.countDown();
-            }
-
+            checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
         }
@@ -249,14 +257,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
-
-            if (isReadyWithLeaderId()) {
-                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-                waitTillReadyCountdownLatch.countDown();
-            }
-
+            checkReady();
             mBean.setSyncStatus(isInSync());
         }
     }
@@ -439,6 +440,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
                 getShardActorPath(shardName, memberName), getSelf());
         }
+
+        checkReady();
     }
 
     private void onDatastoreContext(DatastoreContext context) {
@@ -510,6 +513,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 
         final String shardName = message.getShardName();
+        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
@@ -517,7 +521,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
+                    String primaryPath = info.getSerializedLeaderActor();
+                    Object found = canReturnLocalShardState && info.isLeader() ?
+                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                                new RemotePrimaryShardFound(primaryPath);
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
@@ -537,7 +544,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
                         shardName, path);
 
-                getContext().actorSelection(path).forward(message, getContext());
+                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+                        message.isWaitUntilReady()), getContext());
                 return;
             }
         }
@@ -665,6 +673,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorRef actor;
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
+        private Optional<DataTree> localShardDataTree;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -703,6 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardId;
         }
 
+        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+            this.localShardDataTree = localShardDataTree;
+        }
+
+        Optional<DataTree> getLocalShardDataTree() {
+            return localShardDataTree;
+        }
+
         Map<String, String> getPeerAddresses() {
             return peerAddresses;
         }
@@ -731,7 +748,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
index 2c18eaa86fff9c3d6449068b723906defc930427..0b7fcf0ed507160866b2e220f5e21b93d43bf3fb 100644 (file)
@@ -21,7 +21,7 @@ public class FindPrimary implements Serializable {
     private final String shardName;
     private final boolean waitUntilReady;
 
-    public FindPrimary(String shardName, boolean waitUntilReady){
+    public FindPrimary(String shardName, boolean waitUntilReady) {
 
         Preconditions.checkNotNull(shardName, "shardName should not be null");
 
@@ -40,8 +40,8 @@ public class FindPrimary implements Serializable {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
-                .append("]");
+        builder.append(getClass().getName()).append(" [shardName=").append(shardName).append(", waitUntilReady=")
+               .append(waitUntilReady).append("]");
         return builder.toString();
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java
new file mode 100644 (file)
index 0000000..e19dcd6
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.apache.commons.lang3.ObjectUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Local message sent in reply to FindPrimaryShard to indicate the primary shard is local to the caller.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalPrimaryShardFound {
+
+    private final String primaryPath;
+    private final DataTree localShardDataTree;
+
+    public LocalPrimaryShardFound(@Nonnull String primaryPath, @Nonnull DataTree localShardDataTree) {
+        this.primaryPath = Preconditions.checkNotNull(primaryPath);
+        this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+    }
+
+    public @Nonnull String getPrimaryPath() {
+        return primaryPath;
+    }
+
+    public @Nonnull DataTree getLocalShardDataTree() {
+        return localShardDataTree;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("LocalPrimaryShardFound [primaryPath=").append(primaryPath).append(", localShardDataTree=")
+                .append(ObjectUtils.identityToString(localShardDataTree)).append("]");
+        return builder.toString();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java
new file mode 100644 (file)
index 0000000..820512e
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * A remote message sent to locate the primary shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoteFindPrimary extends FindPrimary {
+    private static final long serialVersionUID = 1L;
+
+    public RemoteFindPrimary(String shardName, boolean waitUntilReady) {
+        super(shardName, waitUntilReady);
+    }
+}
@@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import java.io.Serializable;
 
-public class PrimaryFound implements Serializable {
+/**
+ * Local or remote message sent in reply to FindPrimaryShard to indicate the primary shard is remote to the caller.
+ */
+public class RemotePrimaryShardFound implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String primaryPath;
 
-    public PrimaryFound(final String primaryPath) {
+    public RemotePrimaryShardFound(final String primaryPath) {
         this.primaryPath = primaryPath;
     }
 
@@ -23,33 +26,10 @@ public class PrimaryFound implements Serializable {
         return primaryPath;
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        PrimaryFound that = (PrimaryFound) o;
-
-        if (!primaryPath.equals(that.primaryPath)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return primaryPath.hashCode();
-    }
-
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+        builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]");
         return builder.toString();
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java
new file mode 100644 (file)
index 0000000..d9a55ab
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * A local message derived from LeaderStateChanged containing additional Shard-specific info that is sent
+ * when some state of the shard leader has changed. This message is used by the ShardManager to maintain
+ * current Shard information.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardLeaderStateChanged extends LeaderStateChanged {
+
+    private final Optional<DataTree> localShardDataTree;
+
+    public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId,
+            @Nonnull Optional<DataTree> localShardDataTree) {
+        super(memberId, leaderId);
+        this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+    }
+
+    public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+        return localShardDataTree;
+    }
+}
index afa773b4615e8905fffea10d42dbabd1cb046b0e..73f1a8f328d2a671c5699d770b782a860afcc7f7 100644 (file)
@@ -42,10 +42,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -209,14 +210,13 @@ public class ActorContext {
         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
             @Override
             public PrimaryShardInfo checkedApply(Object response) throws Exception {
-                if(response instanceof PrimaryFound) {
-                    PrimaryFound found = (PrimaryFound)response;
-
-                    LOG.debug("Primary found {}", found.getPrimaryPath());
-                    ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
-                    PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
-                    primaryShardInfoCache.put(shardName, Futures.successful(info));
-                    return info;
+                if(response instanceof RemotePrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
+                } else if(response instanceof LocalPrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
+                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
                 } else if(response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
                 } else if(response instanceof PrimaryNotFoundException) {
@@ -231,6 +231,14 @@ public class ActorContext {
         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
+    private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
+            DataTree localShardDataTree) {
+        ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
+        PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
+        primaryShardInfoCache.put(shardName, Futures.successful(info));
+        return info;
+    }
+
     /**
      * Finds a local shard given its shard name and return it's ActorRef
      *
index b676cf225c801039d1e679298e6d7cb23d2808ac..645890dcb9c0f058bddbca5d06c4bd6f6bdcabe7 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -21,6 +22,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
@@ -49,9 +51,11 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -63,6 +67,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
@@ -154,7 +159,8 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+            DataTree mockDataTree = mock(DataTree.class);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef());
 
             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
@@ -162,9 +168,30 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId1,
+                    RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+            shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
         }};
     }
 
@@ -182,11 +209,11 @@ public class ShardManagerTest extends AbstractActorTest {
             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
             shardManager.tell(new RoleChangeNotification(memberId1,
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
-            shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+            shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent()), mockShardActor);
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
         }};
@@ -233,13 +260,15 @@ public class ShardManagerTest extends AbstractActorTest {
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
-            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+            DataTree mockDataTree = mock(DataTree.class);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
         }};
     }
 
@@ -266,11 +295,13 @@ public class ShardManagerTest extends AbstractActorTest {
 
             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
-            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+            DataTree mockDataTree = mock(DataTree.class);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
 
             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
         }};
@@ -362,7 +393,8 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager2.tell(new ActorInitialized(), mockShardActor2);
 
             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
-            shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
+                    Optional.of(mock(DataTree.class))), mockShardActor2);
             shardManager2.tell(new RoleChangeNotification(memberId2,
                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
 
@@ -370,7 +402,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
 
-            PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             String path = found.getPrimaryPath();
             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
 
@@ -639,7 +671,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
+    public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
@@ -650,7 +682,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 verify(ready, never()).countDown();
 
-                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+                        Optional.of(mock(DataTree.class))));
 
                 verify(ready, times(1)).countDown();
 
@@ -658,7 +691,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+    public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
@@ -669,13 +702,37 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 verify(ready, never()).countDown();
 
-                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+
+                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
 
                 verify(ready, times(1)).countDown();
 
             }};
     }
 
+    @Test
+    public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        memberId, null, RaftState.Follower.name()));
+
+                verify(ready, never()).countDown();
+
+                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
+
+                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+
+                verify(ready, times(1)).countDown();
+
+            }};
+    }
 
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
index 4cbc121a950e87d22228e66418d5a77c96de3d0f..1ecf0971c10d3f0a429f3c56ea36d84eaa44befc 100644 (file)
@@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
@@ -29,7 +30,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +59,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -66,7 +67,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
@@ -83,8 +83,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -2092,14 +2094,27 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new RegisterRoleChangeListener(), listener);
 
-                // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
-                // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
-                // sleep.
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+                MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
 
-                List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
+                ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
+                        ShardLeaderStateChanged.class);
+                assertEquals("getLocalShardDataTree present", true,
+                        leaderStateChanged.getLocalShardDataTree().isPresent());
+                assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
+                        leaderStateChanged.getLocalShardDataTree().get());
 
-                assertEquals(1, allMatching.size());
+                MessageCollectorActor.clearMessages(listener);
+
+                // Force a leader change
+
+                shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
+
+                leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
+                        ShardLeaderStateChanged.class);
+                assertEquals("getLocalShardDataTree present", false,
+                        leaderStateChanged.getLocalShardDataTree().isPresent());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
             }
         };
     }
index bc80937897c97104a7b8a17ac1d0cf9eff672143..031463b2b958efc9803830a56ee4436eacd32674 100644 (file)
@@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
@@ -39,10 +41,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -404,7 +408,7 @@ public class ActorContextTest extends AbstractActorTest{
     }
 
     @Test
-    public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+    public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
 
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
@@ -418,11 +422,10 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
+                            return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
                         }
                     };
 
-
             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
 
@@ -444,7 +447,50 @@ public class ActorContextTest extends AbstractActorTest{
             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
+            final DataTree mockDataTree = Mockito.mock(DataTree.class);
+            final String expPrimaryPath = "akka://test-system/find-primary-shard";
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), dataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+                        }
+                    };
+
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+            assertNotNull(actual);
+            assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+            assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+            assertEquals(cachedInfo, actual);
+
+            // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            assertNull(cached);
     }
 
     @Test
@@ -521,8 +567,8 @@ public class ActorContextTest extends AbstractActorTest{
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
-            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);
index 810b270cfcee82aab53ca55f96a777e87bdc3141..5d44033cd609cf97a3ac14522e6cd201a46cbea6 100644 (file)
@@ -74,7 +74,7 @@ public class MockClusterWrapper implements ClusterWrapper{
     }
 
 
-    private static ClusterEvent.MemberUp createMemberUp(String memberName, String address) {
+    public static ClusterEvent.MemberUp createMemberUp(String memberName, String address) {
         akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
             AddressFromURIString.parse(address), 55);
 
index 10399ffeffa85a20b7bb0c3c8de77aff069a86ac..4d8463adae66cb1e53ba06b2014b0bc8702fc4b4 100644 (file)
@@ -90,7 +90,7 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr
             NormalizedNode<?, ?> partialResult = resultHolder.getResult();
             final NormalizedNode<?, ?> result;
 
-            // unwrap result from augmentation and choice nodes on PUT
+            // FIXME: Also II should be updated unwrap result from augmentation and choice nodes on PUT
             if (!isPost()) {
                 while (partialResult instanceof AugmentationNode || partialResult instanceof ChoiceNode) {
                     final Object childNode = ((DataContainerNode) partialResult).getValue().iterator().next();
@@ -98,7 +98,7 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr
                 }
             }
 
-            if (partialResult instanceof MapNode) {
+            if (partialResult instanceof MapNode && !isPost()) {
                 result = Iterables.getOnlyElement(((MapNode) partialResult).getValue());
             } else {
                 result = partialResult;
index 0378ae40ee39aa09a2c2a40199b4a5161964bd50..6a3adcccd6576b8f7ac766eb521264b05a04f6b8 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -204,19 +205,36 @@ public class BrokerFacade {
             final YangInstanceIdentifier parentPath, final NormalizedNode<?, ?> payload, final SchemaContext schemaContext) {
         // FIXME: This is doing correct post for container and list children
         //        not sure if this will work for choice case
-        final YangInstanceIdentifier path;
-        if(payload instanceof MapEntryNode) {
-            path = parentPath.node(payload.getNodeType()).node(payload.getIdentifier());
+        if(payload instanceof MapNode) {
+            final YangInstanceIdentifier mapPath = parentPath.node(payload.getIdentifier());
+            final NormalizedNode<?, ?> emptySubtree = ImmutableNodes.fromInstanceId(schemaContext, mapPath);
+            rWTransaction.merge(datastore, YangInstanceIdentifier.create(emptySubtree.getIdentifier()), emptySubtree);
+            ensureParentsByMerge(datastore, mapPath, rWTransaction, schemaContext);
+            for(final MapEntryNode child : ((MapNode) payload).getValue()) {
+                final YangInstanceIdentifier childPath = mapPath.node(child.getIdentifier());
+                checkItemDoesNotExists(rWTransaction, datastore, childPath);
+                rWTransaction.put(datastore, childPath, child);
+            }
         } else {
-            path = parentPath.node(payload.getIdentifier());
+            final YangInstanceIdentifier path;
+            if(payload instanceof MapEntryNode) {
+                path = parentPath.node(payload.getNodeType()).node(payload.getIdentifier());
+            } else {
+                path = parentPath.node(payload.getIdentifier());
+            }
+            checkItemDoesNotExists(rWTransaction,datastore, path);
+            ensureParentsByMerge(datastore, path, rWTransaction, schemaContext);
+            rWTransaction.put(datastore, path, payload);
         }
+        return rWTransaction.submit();
+    }
 
-        final ListenableFuture<Optional<NormalizedNode<?, ?>>> futureDatastoreData = rWTransaction.read(datastore, path);
+    private void checkItemDoesNotExists(final DOMDataReadWriteTransaction rWTransaction,final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final ListenableFuture<Boolean> futureDatastoreData = rWTransaction.exists(store, path);
         try {
-            final Optional<NormalizedNode<?, ?>> optionalDatastoreData = futureDatastoreData.get();
-            if (optionalDatastoreData.isPresent() && payload.equals(optionalDatastoreData.get())) {
+            if (futureDatastoreData.get()) {
                 final String errMsg = "Post Configuration via Restconf was not executed because data already exists";
-                LOG.trace(errMsg + ":{}", path);
+                LOG.debug(errMsg + ":{}", path);
                 rWTransaction.cancel();
                 throw new RestconfDocumentedException("Data already exists for path: " + path, ErrorType.PROTOCOL,
                         ErrorTag.DATA_EXISTS);
@@ -225,10 +243,6 @@ public class BrokerFacade {
             LOG.trace("It wasn't possible to get data loaded from datastore at path " + path);
         }
 
-        ensureParentsByMerge(datastore, path, rWTransaction, schemaContext);
-        rWTransaction.merge(datastore, path, payload);
-        LOG.trace("Post " + datastore.name() + " via Restconf: {}", path);
-        return rWTransaction.submit();
     }
 
     private CheckedFuture<Void, TransactionCommitFailedException> putDataViaTransaction(
index 6542396612f6a2c4d57bc0226567e4ac507681c3..9c5de08c5ce1d3346c713986d3d1ce0d31ee84b0 100644 (file)
@@ -197,13 +197,8 @@ public class BrokerFacadeTest {
         @SuppressWarnings("unchecked")
         final CheckedFuture<Void, TransactionCommitFailedException> expFuture = mock(CheckedFuture.class);
 
-        final NormalizedNode<?, ?> dummyNode2 = createDummyNode("dummy:namespace2", "2014-07-01", "dummy local name2");
-
-        when(rwTransaction.read(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn(
-                wrapDummyNode(dummyNode2));
-
         when(rwTransaction.exists(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn(
-            wrapExistence(true));
+            wrapExistence(false));
 
 
         when(rwTransaction.submit()).thenReturn(expFuture);
@@ -215,14 +210,16 @@ public class BrokerFacadeTest {
 
         final InOrder inOrder = inOrder(domDataBroker, rwTransaction);
         inOrder.verify(domDataBroker).newReadWriteTransaction();
-        inOrder.verify(rwTransaction).merge(LogicalDatastoreType.CONFIGURATION, instanceID, dummyNode);
+        inOrder.verify(rwTransaction).exists(LogicalDatastoreType.CONFIGURATION, instanceID);
+        inOrder.verify(rwTransaction).put(LogicalDatastoreType.CONFIGURATION, instanceID, dummyNode);
         inOrder.verify(rwTransaction).submit();
     }
 
     @Test(expected = RestconfDocumentedException.class)
     public void testCommitConfigurationDataPostAlreadyExists() {
-        when(rwTransaction.read(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn(
-                dummyNodeInFuture);
+        final CheckedFuture<Boolean, ReadFailedException> successFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
+        when(rwTransaction.exists(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn(
+                successFuture);
         try {
             // Schema context is only necessary for ensuring parent structure
             brokerFacade.commitConfigurationDataPost((SchemaContext)null, instanceID, dummyNode);