From: Tom Pantelis Date: Thu, 26 Mar 2015 19:22:37 +0000 (+0000) Subject: Merge "Refactor snapshot code" X-Git-Tag: release/lithium~347 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c747a3f05a7f68f0e2e35720db8ffab6a59827fb;hp=9fe7a995204bcfed3ee6b644922b8fe440fe5f5c Merge "Refactor snapshot code" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java index 886c473067..538f2981da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java @@ -15,12 +15,12 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; @@ -91,8 +91,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { final long startTime = System.nanoTime(); + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Boolean result) { @@ -102,9 +103,12 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { new TransactionCommitFailedException( "Can Commit failed, no detailed cause available.")); } else { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we can move on to the preCommit phase doPreCommit(startTime, clientSubmitFuture, transaction, cohorts); + } else { + ListenableFuture canCommitFuture = cohortIterator.next().canCommit(); + Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor); } } } @@ -116,24 +120,26 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture canCommitFuture = cohort.canCommit(); - Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture canCommitFuture = cohortIterator.next().canCommit(); + Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor); } private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataWriteTransaction transaction, final Collection cohorts) { + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Void notUsed) { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we can move on to the commit phase doCommit(startTime, clientSubmitFuture, transaction, cohorts); + } else { + ListenableFuture preCommitFuture = cohortIterator.next().preCommit(); + Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor); } } @@ -144,26 +150,28 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture preCommitFuture = cohort.preCommit(); - Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture preCommitFuture = cohortIterator.next().preCommit(); + Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor); } private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, final DOMDataWriteTransaction transaction, final Collection cohorts) { + final Iterator cohortIterator = cohorts.iterator(); + // Not using Futures.allAsList here to avoid its internal overhead. - final AtomicInteger remaining = new AtomicInteger(cohorts.size()); FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(Void notUsed) { - if(remaining.decrementAndGet() == 0) { + if(!cohortIterator.hasNext()) { // All cohorts completed successfully - we're done. commitStatsTracker.addDuration(System.nanoTime() - startTime); clientSubmitFuture.set(); + } else { + ListenableFuture commitFuture = cohortIterator.next().commit(); + Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor); } } @@ -174,10 +182,8 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker { } }; - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - ListenableFuture commitFuture = cohort.commit(); - Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor); - } + ListenableFuture commitFuture = cohortIterator.next().commit(); + Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor); } private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 681132e660..8ac424a6a8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -116,7 +116,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration DataChangeScope scope) { Future future = actorContext.executeOperationAsync(shard, - new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), + new RegisterChangeListener(path, dataChangeListenerActor, scope), actorContext.getDatastoreContext().getShardInitializationTimeout()); future.onComplete(new OnComplete(){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 66467af130..a30b6f7516 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -677,7 +677,7 @@ public class Shard extends RaftActor { LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", persistenceId(), listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf()); } private ListenerRegistration() { @Override public Object get() { - Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable(); + Object found = new PrimaryFound(info.getSerializedLeaderActor()); if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found primary for {}: {}", shardName, found); + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); } return found; @@ -490,38 +521,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - List members = configuration.getMembersFromShardName(shardName); + for(Map.Entry entry: memberNameToAddress.entrySet()) { + if(!cluster.getCurrentMemberName().equals(entry.getKey())) { + String path = getShardManagerActorPathBuilder(entry.getValue()).toString(); - if(cluster.getCurrentMemberName() != null) { - members.remove(cluster.getCurrentMemberName()); - } + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), + shardName, path); - /** - * FIXME: Instead of sending remote shard actor path back to sender, - * forward FindPrimary message to remote shard manager - */ - // There is no way for us to figure out the primary (for now) so assume - // that one of the remote nodes is a primary - for(String memberName : members) { - Address address = memberNameToAddress.get(memberName); - if(address != null){ - String path = - getShardActorPath(shardName, memberName); - getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + getContext().actorSelection(path).forward(message, getContext()); return; } } - getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); + + LOG.debug("{}: No shard found for {}", persistenceId(), shardName); + + getSender().tell(new PrimaryNotFoundException( + String.format("No primary shard found for %s.", shardName)), getSelf()); + } + + private StringBuilder getShardManagerActorPathBuilder(Address address) { + StringBuilder builder = new StringBuilder(); + builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString); + return builder; } private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { - StringBuilder builder = new StringBuilder(); - builder.append(address.toString()) - .append("/user/") - .append(ShardManagerIdentifier.builder().type(type).build().toString()) - .append("/") + StringBuilder builder = getShardManagerActorPathBuilder(address); + builder.append("/") .append(getShardIdentifier(memberName, shardName)); return builder.toString(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java deleted file mode 100644 index 576010f916..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.messages; - -import java.io.Serializable; - -public class ActorNotInitialized implements Serializable { - private static final long serialVersionUID = 1L; -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java index d51d6800a2..2c18eaa86f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java @@ -9,13 +9,14 @@ package org.opendaylight.controller.cluster.datastore.messages; import com.google.common.base.Preconditions; +import java.io.Serializable; /** * The FindPrimary message is used to locate the primary of any given shard * */ -public class FindPrimary implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = FindPrimary.class; +public class FindPrimary implements Serializable { + private static final long serialVersionUID = 1L; private final String shardName; private final boolean waitUntilReady; @@ -36,15 +37,6 @@ public class FindPrimary implements SerializableMessage{ return waitUntilReady; } - @Override - public Object toSerializable() { - return this; - } - - public static FindPrimary fromSerializable(Object message){ - return (FindPrimary) message; - } - @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java index a5565020ed..4c154d43ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java @@ -8,56 +8,48 @@ package org.opendaylight.controller.cluster.datastore.messages; +import java.io.Serializable; -public class PrimaryFound implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = PrimaryFound.class; - private final String primaryPath; +public class PrimaryFound implements Serializable { + private static final long serialVersionUID = 1L; - public PrimaryFound(final String primaryPath) { - this.primaryPath = primaryPath; - } + private final String primaryPath; - public String getPrimaryPath() { - return primaryPath; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; + public PrimaryFound(final String primaryPath) { + this.primaryPath = primaryPath; } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryFound that = (PrimaryFound) o; - if (!primaryPath.equals(that.primaryPath)) { - return false; + public String getPrimaryPath() { + return primaryPath; } - return true; - } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } - @Override - public int hashCode() { - return primaryPath.hashCode(); - } + PrimaryFound that = (PrimaryFound) o; - @Override - public String toString() { - return "PrimaryFound{" + - "primaryPath='" + primaryPath + '\'' + - '}'; - } + if (!primaryPath.equals(that.primaryPath)) { + return false; + } + return true; + } - @Override - public Object toSerializable() { - return this; - } + @Override + public int hashCode() { + return primaryPath.hashCode(); + } - public static PrimaryFound fromSerializable(final Object message){ - return (PrimaryFound) message; - } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java deleted file mode 100644 index b47c91b6e5..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore.messages; - -import com.google.common.base.Preconditions; - -public class PrimaryNotFound implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = PrimaryNotFound.class; - - private final String shardName; - - public PrimaryNotFound(final String shardName){ - - Preconditions.checkNotNull(shardName, "shardName should not be null"); - - this.shardName = shardName; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryNotFound that = (PrimaryNotFound) o; - - if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return shardName != null ? shardName.hashCode() : 0; - } - - @Override - public Object toSerializable() { - return this; - } - - public static PrimaryNotFound fromSerializable(final Object message){ - return (PrimaryNotFound) message; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java index dea085153b..1d8edece1a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java @@ -9,7 +9,9 @@ package org.opendaylight.controller.cluster.datastore.messages; import akka.actor.ActorPath; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.serialization.Serialization; import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; @@ -20,15 +22,15 @@ public class RegisterChangeListener implements SerializableMessage { ListenerRegistrationMessages.RegisterChangeListener.class; private final YangInstanceIdentifier path; - private final ActorPath dataChangeListenerPath; + private final ActorRef dataChangeListener; private final AsyncDataBroker.DataChangeScope scope; public RegisterChangeListener(YangInstanceIdentifier path, - ActorPath dataChangeListenerPath, + ActorRef dataChangeListener, AsyncDataBroker.DataChangeScope scope) { this.path = path; - this.dataChangeListenerPath = dataChangeListenerPath; + this.dataChangeListener = dataChangeListener; this.scope = scope; } @@ -42,7 +44,7 @@ public class RegisterChangeListener implements SerializableMessage { } public ActorPath getDataChangeListenerPath() { - return dataChangeListenerPath; + return dataChangeListener.path(); } @@ -50,14 +52,14 @@ public class RegisterChangeListener implements SerializableMessage { public ListenerRegistrationMessages.RegisterChangeListener toSerializable() { return ListenerRegistrationMessages.RegisterChangeListener.newBuilder() .setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path)) - .setDataChangeListenerActorPath(dataChangeListenerPath.toString()) + .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener)) .setDataChangeScope(scope.ordinal()).build(); } - public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){ + public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){ ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable; return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()), - actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(), + actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()), AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java index bbfbbaa80b..a2f04851eb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java @@ -9,32 +9,34 @@ package org.opendaylight.controller.cluster.datastore.messages; import akka.actor.ActorPath; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.serialization.Serialization; import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; public class RegisterChangeListenerReply implements SerializableMessage{ public static final Class SERIALIZABLE_CLASS = ListenerRegistrationMessages.RegisterChangeListenerReply.class; - private final ActorPath listenerRegistrationPath; + private final ActorRef listenerRegistration; - public RegisterChangeListenerReply(final ActorPath listenerRegistrationPath) { - this.listenerRegistrationPath = listenerRegistrationPath; + public RegisterChangeListenerReply(final ActorRef listenerRegistration) { + this.listenerRegistration = listenerRegistration; } public ActorPath getListenerRegistrationPath() { - return listenerRegistrationPath; + return listenerRegistration.path(); } @Override public ListenerRegistrationMessages.RegisterChangeListenerReply toSerializable() { return ListenerRegistrationMessages.RegisterChangeListenerReply.newBuilder() - .setListenerRegistrationPath(listenerRegistrationPath.toString()).build(); + .setListenerRegistrationPath(Serialization.serializedActorPath(listenerRegistration)).build(); } public static RegisterChangeListenerReply fromSerializable(final ActorSystem actorSystem,final Object serializable){ ListenerRegistrationMessages.RegisterChangeListenerReply o = (ListenerRegistrationMessages.RegisterChangeListenerReply) serializable; return new RegisterChangeListenerReply( - actorSystem.actorFor(o.getListenerRegistrationPath()).path() + actorSystem.provider().resolveActorRef(o.getListenerRegistrationPath()) ); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 6f9bb7fc9f..b6250fc1cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -41,13 +41,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -209,25 +207,22 @@ public class ActorContext { return ret; } Future future = executeOperationAsync(shardManager, - new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout); + new FindPrimary(shardName, true), shardInitializationTimeout); return future.transform(new Mapper() { @Override public ActorSelection checkedApply(Object response) throws Exception { - if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) { - PrimaryFound found = PrimaryFound.fromSerializable(response); + if(response instanceof PrimaryFound) { + PrimaryFound found = (PrimaryFound)response; LOG.debug("Primary found {}", found.getPrimaryPath()); ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); return actorSelection; - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found primary shard %s but it's not initialized yet. " + - "Please try again later", shardName)); - } else if(response instanceof PrimaryNotFound) { - throw new PrimaryNotFoundException( - String.format("No primary shard found for %S.", shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; + } else if(response instanceof PrimaryNotFoundException) { + throw (PrimaryNotFoundException)response; } else if(response instanceof NoShardLeaderException) { throw (NoShardLeaderException)response; } @@ -274,10 +269,8 @@ public class ActorContext { LocalShardFound found = (LocalShardFound)response; LOG.debug("Local shard found {}", found.getPath()); return found.getPath(); - } else if(response instanceof ActorNotInitialized) { - throw new NotInitializedException( - String.format("Found local shard for %s but it's not initialized yet.", - shardName)); + } else if(response instanceof NotInitializedException) { + throw (NotInitializedException)response; } else if(response instanceof LocalShardNotFound) { throw new LocalShardNotFoundException( String.format("Local shard for %s does not exist.", shardName)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java similarity index 99% rename from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java rename to opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java index c760349b1e..0b166f5ac8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java @@ -47,7 +47,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh * * @author Thomas Pantelis */ -public class DOMConcurrentDataCommitCoordinatorTest { +public class ConcurrentDOMDataBrokerTest { private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class); private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index f6c8f07f6b..57e0e26c11 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -7,10 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore; -import static org.mockito.Mockito.any; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -28,7 +28,7 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -96,7 +96,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { Assert.assertEquals("getPath", path, registerMsg.getPath()); Assert.assertEquals("getScope", scope, registerMsg.getScope()); - reply(new RegisterChangeListenerReply(getRef().path())); + reply(new RegisterChangeListenerReply(getRef())); for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); @@ -173,7 +173,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new ActorNotInitialized()); + reply(new NotInitializedException("not initialized")); new Within(duration("1 seconds")) { @Override @@ -242,7 +242,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { @Override public Future answer(InvocationOnMock invocation) { proxy.close(); - return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path())); + return Futures.successful((Object)new RegisterChangeListenerReply(getRef())); } }; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index ae7a4f96c5..95b1b78a19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -9,16 +9,23 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AddressFromURIString; import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -35,15 +42,16 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; @@ -75,6 +83,11 @@ public class ShardManagerTest extends AbstractActorTest { private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); + } + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -100,21 +113,22 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newPropsShardMgrWithMockShardActor() { + return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), + new MockConfiguration()); + } + + private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, + final ClusterWrapper clusterWrapper, final Configuration config) { Creator creator = new Creator() { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.build(), ready) { - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - return mockShardActor; - } - }; + return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), + ready, name, shardActor); } }; - return Props.create(new DelegatingShardManagerCreator(creator)); + return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } @Test @@ -124,9 +138,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary("non-existent", false), getRef()); - expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable()); + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; } @@ -146,9 +160,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name())), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; @@ -170,9 +184,9 @@ public class ShardManagerTest extends AbstractActorTest { RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); }}; @@ -183,9 +197,9 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; } @@ -197,7 +211,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; @@ -215,15 +229,15 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; @@ -238,7 +252,7 @@ public class ShardManagerTest extends AbstractActorTest { // We're passing waitUntilInitialized = true to FindPrimary so the response should be // delayed until we send ActorInitialized and RoleChangeNotification. - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); @@ -254,7 +268,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); @@ -269,9 +283,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); - expectMsgClass(duration("2 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("2 seconds"), NotInitializedException.class); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -289,7 +303,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, RaftState.Candidate.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; @@ -303,12 +317,78 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } + @Test + public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + + shardManager2.underlyingActor().verifyFindPrimary(); + + Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForMemberRemoved(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -348,7 +428,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; } @@ -371,42 +451,6 @@ public class ShardManagerTest extends AbstractActorTest { }}; } - @Test - public void testOnReceiveMemberUp() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"), - PrimaryFound.SERIALIZABLE_CLASS)); - String path = found.getPrimaryPath(); - assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config")); - }}; - } - - @Test - public void testOnReceiveMemberDown() throws Exception { - - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); - - MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString()); - - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); - - expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); - }}; - } - @Test public void testOnRecoveryJournalIsCleaned() { InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( @@ -804,4 +848,69 @@ public class ShardManagerTest extends AbstractActorTest { return delegate.create(); } } + + private static class ForwardingShardManager extends ShardManager { + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private final ActorRef shardActor; + private final String name; + + protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + ActorRef shardActor) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + this.shardActor = shardActor; + this.name = name; + } + + @Override + public void handleCommand(Object message) throws Exception { + try{ + super.handleCommand(message); + } finally { + if(message instanceof FindPrimary) { + findPrimaryMessageReceived.countDown(); + } else if(message instanceof ClusterEvent.MemberUp) { + String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUpReceived.countDown(); + } + } else if(message instanceof ClusterEvent.MemberRemoved) { + String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberRemovedReceived.countDown(); + } + } + } + } + + @Override + public String persistenceId() { + return name; + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return shardActor; + } + + void waitForMemberUp() { + assertEquals("MemberUp received", true, + Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); + memberUpReceived = new CountDownLatch(1); + } + + void waitForMemberRemoved() { + assertEquals("MemberRemoved received", true, + Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS)); + memberRemovedReceived = new CountDownLatch(1); + } + + void verifyFindPrimary() { + assertEquals("FindPrimary received", true, + Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); + findPrimaryMessageReceived = new CountDownLatch(1); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index adc7f4706c..3e0bc42397 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -120,7 +120,7 @@ public class ShardTest extends AbstractShardTest { "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, - dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef()); + dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterChangeListenerReply.class); @@ -208,7 +208,7 @@ public class ShardTest extends AbstractShardTest { onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); // Now send the RegisterChangeListener and wait for the reply. - shard.tell(new RegisterChangeListener(path, dclActor.path(), + shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java new file mode 100644 index 0000000000..696a898169 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java @@ -0,0 +1,58 @@ +package org.opendaylight.controller.cluster.datastore.messages; + +import static junit.framework.TestCase.assertEquals; +import akka.actor.Actor; +import akka.serialization.Serialization; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; + +public class RegisterChangeListenerReplyTest extends AbstractActorTest { + + private TestActorFactory factory; + + + @Before + public void setUp(){ + factory = new TestActorFactory(getSystem()); + } + + @After + public void shutDown(){ + factory.close(); + } + + @Test + public void testToSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + + RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor); + + ListenerRegistrationMessages.RegisterChangeListenerReply serialized + = registerChangeListenerReply.toSerializable(); + + assertEquals(Serialization.serializedActorPath(testActor), serialized.getListenerRegistrationPath()); + } + + @Test + public void testFromSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + + RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor); + + ListenerRegistrationMessages.RegisterChangeListenerReply serialized + = registerChangeListenerReply.toSerializable(); + + + RegisterChangeListenerReply fromSerialized + = RegisterChangeListenerReply.fromSerializable(getSystem(), serialized); + + assertEquals(testActor.path().toString(), fromSerialized.getListenerRegistrationPath().toString()); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java new file mode 100644 index 0000000000..2354a7946a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java @@ -0,0 +1,67 @@ +package org.opendaylight.controller.cluster.datastore.messages; + +import static junit.framework.TestCase.assertEquals; +import akka.actor.Actor; +import akka.serialization.Serialization; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; + +public class RegisterChangeListenerTest extends AbstractActorTest { + + private TestActorFactory factory; + + @Before + public void setUp(){ + factory = new TestActorFactory(getSystem()); + } + + @After + public void shutDown(){ + factory.close(); + } + + @Test + public void testToSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor + , AsyncDataBroker.DataChangeScope.BASE); + + ListenerRegistrationMessages.RegisterChangeListener serialized + = registerChangeListener.toSerializable(); + + NormalizedNodeMessages.InstanceIdentifier path = serialized.getInstanceIdentifierPath(); + + assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0)); + assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath()); + assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope()); + + } + + @Test + public void testFromSerializable(){ + TestActorRef testActor = factory.createTestActor(MessageCollectorActor.props()); + RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor + , AsyncDataBroker.DataChangeScope.SUBTREE); + + ListenerRegistrationMessages.RegisterChangeListener serialized + = registerChangeListener.toSerializable(); + + + RegisterChangeListener fromSerialized = RegisterChangeListener.fromSerializable(getSystem(), serialized); + + assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath()); + assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString()); + assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope()); + + + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 2746bcf982..6b4f633778 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -37,13 +37,11 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -458,7 +456,7 @@ public class ActorContextTest extends AbstractActorTest{ mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryNotFound("foobar")); + return Futures.successful((Object) new PrimaryNotFoundException("not found")); } }; @@ -491,7 +489,7 @@ public class ActorContextTest extends AbstractActorTest{ mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new ActorNotInitialized()); + return Futures.successful((Object) new NotInitializedException("not iniislized")); } }; @@ -518,8 +516,8 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); - shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable()); - shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable()); + shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); + shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); Configuration mockConfig = mock(Configuration.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index fe40aa0fd4..810b270cfc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -14,14 +14,22 @@ import akka.actor.AddressFromURIString; import akka.cluster.ClusterEvent; import akka.cluster.MemberStatus; import akka.cluster.UniqueAddress; -import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import scala.collection.JavaConversions; import java.util.HashSet; import java.util.Set; +import org.opendaylight.controller.cluster.datastore.ClusterWrapper; +import scala.collection.JavaConversions; public class MockClusterWrapper implements ClusterWrapper{ private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550); + private String currentMemberName = "member-1"; + + public MockClusterWrapper() { + } + + public MockClusterWrapper(String currentMemberName) { + this.currentMemberName = currentMemberName; + } @Override public void subscribeToMemberEvents(ActorRef actorRef) { @@ -29,7 +37,7 @@ public class MockClusterWrapper implements ClusterWrapper{ @Override public String getCurrentMemberName() { - return "member-1"; + return currentMemberName; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java index 4ef7d65857..0bc561f1bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore.utils; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -18,11 +20,23 @@ import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; public class MockConfiguration implements Configuration{ - @Override public List getMemberShardNames(final String memberName) { - return Arrays.asList("default"); + private Map> shardMembers = ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build(); + + public MockConfiguration() { + } + + public MockConfiguration(Map> shardMembers) { + this.shardMembers = shardMembers; } - @Override public Optional getModuleNameFromNameSpace( + @Override + public List getMemberShardNames(final String memberName) { + return new ArrayList<>(shardMembers.keySet()); + } + @Override + public Optional getModuleNameFromNameSpace( final String nameSpace) { return Optional.absent(); } @@ -44,7 +58,8 @@ public class MockConfiguration implements Configuration{ return Arrays.asList("member-2", "member-3"); } - return Collections.emptyList(); + List members = shardMembers.get(shardName); + return members != null ? members : Collections.emptyList(); } @Override public Set getAllShardNames() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index badec6f831..03634627d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -34,3 +34,105 @@ bounded-mailbox { mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } + +Member1 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loglevel = "DEBUG" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2558 + } + } + + cluster { + auto-down-unreachable-after = 100s + + roles = [ + "member-1" + ] + } + } +} + +Member2 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + } + + serialization-bindings { + "com.google.protobuf.Message" = proto + } + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 2559 + } + } + + cluster { + auto-down-unreachable-after = 100s + + roles = [ + "member-2" + ] + } + } +}