From: Tony Tkacik Date: Wed, 29 Apr 2015 07:04:37 +0000 (+0000) Subject: Merge "BUG 3066 : Optimistic lock failed, on NetconfStateUpdate" into stable/lithium X-Git-Tag: release/lithium~202 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=836834b4bdec55222de4843a144c8800e0e02dce;hp=9a0e32fddf11369466b37b17d02e322c96c55eb5;p=controller.git Merge "BUG 3066 : Optimistic lock failed, on NetconfStateUpdate" into stable/lithium --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1738cc5fe0..d82528c48a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -298,7 +298,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Optional roleChangeNotifier = getRoleChangeNotifier(); if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) { if(roleChangeNotifier.isPresent()) { - roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); + roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); } onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); @@ -311,6 +311,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { + return new LeaderStateChanged(memberId, leaderId); + } + /** * When a derived RaftActor needs to persist something it must call * persistData. diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java index ec35b03b0a..23c95ecc99 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java @@ -7,29 +7,30 @@ */ package org.opendaylight.controller.cluster.notifications; -import java.io.Serializable; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + /** - * A message initiated internally from the RaftActor when some state of a leader has changed + * A local message initiated internally from the RaftActor when some state of a leader has changed. * * @author Thomas Pantelis */ -public class LeaderStateChanged implements Serializable { - private static final long serialVersionUID = 1L; - +public class LeaderStateChanged { private final String memberId; private final String leaderId; - public LeaderStateChanged(String memberId, String leaderId) { - this.memberId = memberId; + public LeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId) { + this.memberId = Preconditions.checkNotNull(memberId); this.leaderId = leaderId; } - public String getMemberId() { + public @Nonnull String getMemberId() { return memberId; } - public String getLeaderId() { + public @Nullable String getLeaderId() { return leaderId; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 62d3259a71..e62e918e5e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -48,12 +48,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; @@ -63,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; @@ -265,6 +268,12 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + @Override + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { + return new ShardLeaderStateChanged(memberId, leaderId, + isLeader() ? Optional.of(store.getDataTree()) : Optional.absent()); + } + private void onDatastoreContext(DatastoreContext context) { datastoreContext = context; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index cff44b13cb..f4fa7b3a97 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; @@ -54,17 +55,20 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; -import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -185,27 +189,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else if(message instanceof ShardNotInitializedTimeout) { onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); - } else if(message instanceof LeaderStateChanged) { - onLeaderStateChanged((LeaderStateChanged)message); + } else if(message instanceof ShardLeaderStateChanged) { + onLeaderStateChanged((ShardLeaderStateChanged)message); } else { unknownMessage(message); } } - private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + private void checkReady(){ + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + } + + private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) { LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { + shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); - if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); - } - + checkReady(); } else { LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); } @@ -249,14 +257,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); if(shardInformation != null) { shardInformation.setRole(roleChanged.getNewRole()); - - if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); - } - + checkReady(); mBean.setSyncStatus(isInSync()); } } @@ -439,6 +440,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), getShardActorPath(shardName, memberName), getSelf()); } + + checkReady(); } private void onDatastoreContext(DatastoreContext context) { @@ -510,6 +513,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: In findPrimary: {}", persistenceId(), message); final String shardName = message.getShardName(); + final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); @@ -517,7 +521,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - Object found = new PrimaryFound(info.getSerializedLeaderActor()); + String primaryPath = info.getSerializedLeaderActor(); + Object found = canReturnLocalShardState && info.isLeader() ? + new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + new RemotePrimaryShardFound(primaryPath); if(LOG.isDebugEnabled()) { LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -537,7 +544,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), shardName, path); - getContext().actorSelection(path).forward(message, getContext()); + getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName, + message.isWaitUntilReady()), getContext()); return; } } @@ -665,6 +673,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ActorRef actor; private ActorPath actorPath; private final Map peerAddresses; + private Optional localShardDataTree; // flag that determines if the actor is ready for business private boolean actorInitialized = false; @@ -703,6 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } + void setLocalDataTree(Optional localShardDataTree) { + this.localShardDataTree = localShardDataTree; + } + + Optional getLocalShardDataTree() { + return localShardDataTree; + } + Map getPeerAddresses() { return peerAddresses; } @@ -731,7 +748,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java index 2c18eaa86f..0b7fcf0ed5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java @@ -21,7 +21,7 @@ public class FindPrimary implements Serializable { private final String shardName; private final boolean waitUntilReady; - public FindPrimary(String shardName, boolean waitUntilReady){ + public FindPrimary(String shardName, boolean waitUntilReady) { Preconditions.checkNotNull(shardName, "shardName should not be null"); @@ -40,8 +40,8 @@ public class FindPrimary implements Serializable { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady) - .append("]"); + builder.append(getClass().getName()).append(" [shardName=").append(shardName).append(", waitUntilReady=") + .append(waitUntilReady).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java new file mode 100644 index 0000000000..e19dcd65b3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.apache.commons.lang3.ObjectUtils; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * Local message sent in reply to FindPrimaryShard to indicate the primary shard is local to the caller. + * + * @author Thomas Pantelis + */ +public class LocalPrimaryShardFound { + + private final String primaryPath; + private final DataTree localShardDataTree; + + public LocalPrimaryShardFound(@Nonnull String primaryPath, @Nonnull DataTree localShardDataTree) { + this.primaryPath = Preconditions.checkNotNull(primaryPath); + this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); + } + + public @Nonnull String getPrimaryPath() { + return primaryPath; + } + + public @Nonnull DataTree getLocalShardDataTree() { + return localShardDataTree; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("LocalPrimaryShardFound [primaryPath=").append(primaryPath).append(", localShardDataTree=") + .append(ObjectUtils.identityToString(localShardDataTree)).append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java new file mode 100644 index 0000000000..820512e096 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * A remote message sent to locate the primary shard. + * + * @author Thomas Pantelis + */ +public class RemoteFindPrimary extends FindPrimary { + private static final long serialVersionUID = 1L; + + public RemoteFindPrimary(String shardName, boolean waitUntilReady) { + super(shardName, waitUntilReady); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java similarity index 53% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java index 4c154d43ae..662eefd9d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java @@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages; import java.io.Serializable; -public class PrimaryFound implements Serializable { +/** + * Local or remote message sent in reply to FindPrimaryShard to indicate the primary shard is remote to the caller. + */ +public class RemotePrimaryShardFound implements Serializable { private static final long serialVersionUID = 1L; private final String primaryPath; - public PrimaryFound(final String primaryPath) { + public RemotePrimaryShardFound(final String primaryPath) { this.primaryPath = primaryPath; } @@ -23,33 +26,10 @@ public class PrimaryFound implements Serializable { return primaryPath; } - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryFound that = (PrimaryFound) o; - - if (!primaryPath.equals(that.primaryPath)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return primaryPath.hashCode(); - } - @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]"); + builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java new file mode 100644 index 0000000000..d9a55ab1e9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * A local message derived from LeaderStateChanged containing additional Shard-specific info that is sent + * when some state of the shard leader has changed. This message is used by the ShardManager to maintain + * current Shard information. + * + * @author Thomas Pantelis + */ +public class ShardLeaderStateChanged extends LeaderStateChanged { + + private final Optional localShardDataTree; + + public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId, + @Nonnull Optional localShardDataTree) { + super(memberId, leaderId); + this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); + } + + public @Nonnull Optional getLocalShardDataTree() { + return localShardDataTree; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index afa773b461..73f1a8f328 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -42,10 +42,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.reporting.MetricsReporter; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -209,14 +210,13 @@ public class ActorContext { return future.transform(new Mapper() { @Override public PrimaryShardInfo checkedApply(Object response) throws Exception { - if(response instanceof PrimaryFound) { - PrimaryFound found = (PrimaryFound)response; - - LOG.debug("Primary found {}", found.getPrimaryPath()); - ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); - PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.absent()); - primaryShardInfoCache.put(shardName, Futures.successful(info)); - return info; + if(response instanceof RemotePrimaryShardFound) { + LOG.debug("findPrimaryShardAsync received: {}", response); + return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null); + } else if(response instanceof LocalPrimaryShardFound) { + LOG.debug("findPrimaryShardAsync received: {}", response); + LocalPrimaryShardFound found = (LocalPrimaryShardFound)response; + return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree()); } else if(response instanceof NotInitializedException) { throw (NotInitializedException)response; } else if(response instanceof PrimaryNotFoundException) { @@ -231,6 +231,14 @@ public class ActorContext { }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } + private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath, + DataTree localShardDataTree) { + ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath); + PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree)); + primaryShardInfoCache.put(shardName, Futures.successful(info)); + return info; + } + /** * Finds a local shard given its shard name and return it's ActorRef * diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index b676cf225c..645890dcb9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -21,6 +22,7 @@ import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; @@ -49,9 +51,11 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; @@ -63,6 +67,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; @@ -154,7 +159,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), @@ -162,9 +168,30 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; } @@ -182,11 +209,11 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent()), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); }}; @@ -233,13 +260,15 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); }}; } @@ -266,11 +295,13 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; @@ -362,7 +393,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager2.tell(new ActorInitialized(), mockShardActor2); String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; - shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class))), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -370,7 +402,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.tell(new FindPrimary("astronauts", false), getRef()); - PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); @@ -639,7 +671,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { + public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); @@ -650,7 +682,8 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + Optional.of(mock(DataTree.class)))); verify(ready, times(1)).countDown(); @@ -658,7 +691,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); @@ -669,13 +702,37 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); verify(ready, times(1)).countDown(); }}; } + @Test + public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); + + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); + + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + + verify(ready, times(1)).countDown(); + + }}; + } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 4cbc121a95..1ecf0971c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; @@ -29,7 +30,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -59,6 +59,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -66,7 +67,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; @@ -83,8 +83,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -2092,14 +2094,27 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterRoleChangeListener(), listener); - // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore - // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary - // sleep. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class); - List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); + ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", true, + leaderStateChanged.getLocalShardDataTree().isPresent()); + assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), + leaderStateChanged.getLocalShardDataTree().get()); - assertEquals(1, allMatching.size()); + MessageCollectorActor.clearMessages(listener); + + // Force a leader change + + shard.tell(new RequestVote(10000, "member2", 50, 50), getRef()); + + leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", false, + leaderStateChanged.getLocalShardDataTree().isPresent()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } }; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index bc80937897..031463b2b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; @@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; @@ -39,10 +41,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -404,7 +408,7 @@ public class ActorContextTest extends AbstractActorTest{ } @Test - public void testFindPrimaryShardAsyncPrimaryFound() throws Exception { + public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception { TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); @@ -418,11 +422,10 @@ public class ActorContextTest extends AbstractActorTest{ mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryFound(expPrimaryPath)); + return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath)); } }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); @@ -444,7 +447,50 @@ public class ActorContextTest extends AbstractActorTest{ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); + } + + @Test + public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + final DataTree mockDataTree = Mockito.mock(DataTree.class); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree)); + } + }; + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + + assertNotNull(actual); + assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent()); + assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedInfo, actual); + + // Wait for 200 Milliseconds. The cached entry should have been removed. + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + + assertNull(cached); } @Test @@ -521,8 +567,8 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); - shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); - shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); + shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString())); + shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString())); shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); Configuration mockConfig = mock(Configuration.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index 810b270cfc..5d44033cd6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -74,7 +74,7 @@ public class MockClusterWrapper implements ClusterWrapper{ } - private static ClusterEvent.MemberUp createMemberUp(String memberName, String address) { + public static ClusterEvent.MemberUp createMemberUp(String memberName, String address) { akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress( AddressFromURIString.parse(address), 55); diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java index 10399ffeff..4d8463adae 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java @@ -90,7 +90,7 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr NormalizedNode partialResult = resultHolder.getResult(); final NormalizedNode result; - // unwrap result from augmentation and choice nodes on PUT + // FIXME: Also II should be updated unwrap result from augmentation and choice nodes on PUT if (!isPost()) { while (partialResult instanceof AugmentationNode || partialResult instanceof ChoiceNode) { final Object childNode = ((DataContainerNode) partialResult).getValue().iterator().next(); @@ -98,7 +98,7 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr } } - if (partialResult instanceof MapNode) { + if (partialResult instanceof MapNode && !isPost()) { result = Iterables.getOnlyElement(((MapNode) partialResult).getValue()); } else { result = partialResult; diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java index 0378ae40ee..6a3adcccd6 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java @@ -39,6 +39,7 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -204,19 +205,36 @@ public class BrokerFacade { final YangInstanceIdentifier parentPath, final NormalizedNode payload, final SchemaContext schemaContext) { // FIXME: This is doing correct post for container and list children // not sure if this will work for choice case - final YangInstanceIdentifier path; - if(payload instanceof MapEntryNode) { - path = parentPath.node(payload.getNodeType()).node(payload.getIdentifier()); + if(payload instanceof MapNode) { + final YangInstanceIdentifier mapPath = parentPath.node(payload.getIdentifier()); + final NormalizedNode emptySubtree = ImmutableNodes.fromInstanceId(schemaContext, mapPath); + rWTransaction.merge(datastore, YangInstanceIdentifier.create(emptySubtree.getIdentifier()), emptySubtree); + ensureParentsByMerge(datastore, mapPath, rWTransaction, schemaContext); + for(final MapEntryNode child : ((MapNode) payload).getValue()) { + final YangInstanceIdentifier childPath = mapPath.node(child.getIdentifier()); + checkItemDoesNotExists(rWTransaction, datastore, childPath); + rWTransaction.put(datastore, childPath, child); + } } else { - path = parentPath.node(payload.getIdentifier()); + final YangInstanceIdentifier path; + if(payload instanceof MapEntryNode) { + path = parentPath.node(payload.getNodeType()).node(payload.getIdentifier()); + } else { + path = parentPath.node(payload.getIdentifier()); + } + checkItemDoesNotExists(rWTransaction,datastore, path); + ensureParentsByMerge(datastore, path, rWTransaction, schemaContext); + rWTransaction.put(datastore, path, payload); } + return rWTransaction.submit(); + } - final ListenableFuture>> futureDatastoreData = rWTransaction.read(datastore, path); + private void checkItemDoesNotExists(final DOMDataReadWriteTransaction rWTransaction,final LogicalDatastoreType store, final YangInstanceIdentifier path) { + final ListenableFuture futureDatastoreData = rWTransaction.exists(store, path); try { - final Optional> optionalDatastoreData = futureDatastoreData.get(); - if (optionalDatastoreData.isPresent() && payload.equals(optionalDatastoreData.get())) { + if (futureDatastoreData.get()) { final String errMsg = "Post Configuration via Restconf was not executed because data already exists"; - LOG.trace(errMsg + ":{}", path); + LOG.debug(errMsg + ":{}", path); rWTransaction.cancel(); throw new RestconfDocumentedException("Data already exists for path: " + path, ErrorType.PROTOCOL, ErrorTag.DATA_EXISTS); @@ -225,10 +243,6 @@ public class BrokerFacade { LOG.trace("It wasn't possible to get data loaded from datastore at path " + path); } - ensureParentsByMerge(datastore, path, rWTransaction, schemaContext); - rWTransaction.merge(datastore, path, payload); - LOG.trace("Post " + datastore.name() + " via Restconf: {}", path); - return rWTransaction.submit(); } private CheckedFuture putDataViaTransaction( diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java index 6542396612..9c5de08c5c 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java @@ -197,13 +197,8 @@ public class BrokerFacadeTest { @SuppressWarnings("unchecked") final CheckedFuture expFuture = mock(CheckedFuture.class); - final NormalizedNode dummyNode2 = createDummyNode("dummy:namespace2", "2014-07-01", "dummy local name2"); - - when(rwTransaction.read(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn( - wrapDummyNode(dummyNode2)); - when(rwTransaction.exists(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn( - wrapExistence(true)); + wrapExistence(false)); when(rwTransaction.submit()).thenReturn(expFuture); @@ -215,14 +210,16 @@ public class BrokerFacadeTest { final InOrder inOrder = inOrder(domDataBroker, rwTransaction); inOrder.verify(domDataBroker).newReadWriteTransaction(); - inOrder.verify(rwTransaction).merge(LogicalDatastoreType.CONFIGURATION, instanceID, dummyNode); + inOrder.verify(rwTransaction).exists(LogicalDatastoreType.CONFIGURATION, instanceID); + inOrder.verify(rwTransaction).put(LogicalDatastoreType.CONFIGURATION, instanceID, dummyNode); inOrder.verify(rwTransaction).submit(); } @Test(expected = RestconfDocumentedException.class) public void testCommitConfigurationDataPostAlreadyExists() { - when(rwTransaction.read(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn( - dummyNodeInFuture); + final CheckedFuture successFuture = Futures.immediateCheckedFuture(Boolean.TRUE); + when(rwTransaction.exists(eq(LogicalDatastoreType.CONFIGURATION), any(YangInstanceIdentifier.class))).thenReturn( + successFuture); try { // Schema context is only necessary for ensuring parent structure brokerFacade.commitConfigurationDataPost((SchemaContext)null, instanceID, dummyNode);