import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
final long startTime = System.nanoTime();
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available."));
} else {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the preCommit phase
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> preCommitFuture = cohort.preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
clientSubmitFuture.set();
+ } else {
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> commitFuture = cohort.commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
DataChangeScope scope) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ new RegisterChangeListener(path, dataChangeListenerActor, scope),
actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
persistenceId(), listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
}
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
// A data store could be of type config/operational
private final String type;
+ private final String shardManagerIdentifierString;
+
private final ClusterWrapper cluster;
private final Configuration configuration;
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
this.type = datastoreContext.getDataStoreType();
+ this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@Override
public void handleCommand(Object message) throws Exception {
- if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
- findPrimary(FindPrimary.fromSerializable(message));
+ if (message instanceof FindPrimary) {
+ findPrimary((FindPrimary)message);
} else if(message instanceof FindLocalShard){
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
ShardInformation shardInfo = message.getShardInfo();
LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
- shardInfo.getShardId());
+ shardInfo.getShardName());
shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
if(!shardInfo.isShardInitialized()) {
- message.getSender().tell(new ActorNotInitialized(), getSelf());
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
} else {
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
}
}
}
private void markShardAsInitialized(String shardName) {
- LOG.debug("Initializing shard [{}]", shardName);
+ LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
shardInformation.addOnShardInitialized(onShardInitialized);
+ LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
onShardInitialized.setTimeoutSchedule(timeoutSchedule);
} else if (!shardInformation.isShardInitialized()) {
- getSender().tell(new ActorNotInitialized(), getSelf());
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+ shardInformation.getShardName());
+ getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
} else {
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+ shardInformation.getShardName());
getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
}
"recovering and a leader is being elected. Try again later.", shardId));
}
+ private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+ return new NotInitializedException(String.format(
+ "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ String memberName = message.member().roles().head();
+
+ LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
memberNameToAddress.remove(message.member().roles().head());
}
private void memberUp(ClusterEvent.MemberUp message) {
String memberName = message.member().roles().head();
+ LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
memberNameToAddress.put(memberName, message.member().address());
for(ShardInformation info : localShards.values()){
}
+ @VisibleForTesting
+ protected ClusterWrapper getCluster() {
+ return cluster;
+ }
+
@VisibleForTesting
protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
return getContext().actorOf(Shard.props(info.getShardId(),
}
private void findPrimary(FindPrimary message) {
+ LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
final String shardName = message.getShardName();
// First see if the there is a local replica for the shard
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+ Object found = new PrimaryFound(info.getSerializedLeaderActor());
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", shardName, found);
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
}
return found;
return;
}
- List<String> members = configuration.getMembersFromShardName(shardName);
+ for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+ if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+ String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
- if(cluster.getCurrentMemberName() != null) {
- members.remove(cluster.getCurrentMemberName());
- }
+ LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+ shardName, path);
- /**
- * FIXME: Instead of sending remote shard actor path back to sender,
- * forward FindPrimary message to remote shard manager
- */
- // There is no way for us to figure out the primary (for now) so assume
- // that one of the remote nodes is a primary
- for(String memberName : members) {
- Address address = memberNameToAddress.get(memberName);
- if(address != null){
- String path =
- getShardActorPath(shardName, memberName);
- getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ getContext().actorSelection(path).forward(message, getContext());
return;
}
}
- getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+ LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+ getSender().tell(new PrimaryNotFoundException(
+ String.format("No primary shard found for %s.", shardName)), getSelf());
+ }
+
+ private StringBuilder getShardManagerActorPathBuilder(Address address) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+ return builder;
}
private String getShardActorPath(String shardName, String memberName) {
Address address = memberNameToAddress.get(memberName);
if(address != null) {
- StringBuilder builder = new StringBuilder();
- builder.append(address.toString())
- .append("/user/")
- .append(ShardManagerIdentifier.builder().type(type).build().toString())
- .append("/")
+ StringBuilder builder = getShardManagerActorPathBuilder(address);
+ builder.append("/")
.append(getShardIdentifier(memberName, shardName));
return builder.toString();
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import java.io.Serializable;
-
-public class ActorNotInitialized implements Serializable {
- private static final long serialVersionUID = 1L;
-}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
/**
* The FindPrimary message is used to locate the primary of any given shard
*
*/
-public class FindPrimary implements SerializableMessage{
- public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+public class FindPrimary implements Serializable {
+ private static final long serialVersionUID = 1L;
private final String shardName;
private final boolean waitUntilReady;
return waitUntilReady;
}
- @Override
- public Object toSerializable() {
- return this;
- }
-
- public static FindPrimary fromSerializable(Object message){
- return (FindPrimary) message;
- }
-
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
package org.opendaylight.controller.cluster.datastore.messages;
+import java.io.Serializable;
-public class PrimaryFound implements SerializableMessage {
- public static final Class<PrimaryFound> SERIALIZABLE_CLASS = PrimaryFound.class;
- private final String primaryPath;
+public class PrimaryFound implements Serializable {
+ private static final long serialVersionUID = 1L;
- public PrimaryFound(final String primaryPath) {
- this.primaryPath = primaryPath;
- }
+ private final String primaryPath;
- public String getPrimaryPath() {
- return primaryPath;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
+ public PrimaryFound(final String primaryPath) {
+ this.primaryPath = primaryPath;
}
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PrimaryFound that = (PrimaryFound) o;
- if (!primaryPath.equals(that.primaryPath)) {
- return false;
+ public String getPrimaryPath() {
+ return primaryPath;
}
- return true;
- }
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
- @Override
- public int hashCode() {
- return primaryPath.hashCode();
- }
+ PrimaryFound that = (PrimaryFound) o;
- @Override
- public String toString() {
- return "PrimaryFound{" +
- "primaryPath='" + primaryPath + '\'' +
- '}';
- }
+ if (!primaryPath.equals(that.primaryPath)) {
+ return false;
+ }
+ return true;
+ }
- @Override
- public Object toSerializable() {
- return this;
- }
+ @Override
+ public int hashCode() {
+ return primaryPath.hashCode();
+ }
- public static PrimaryFound fromSerializable(final Object message){
- return (PrimaryFound) message;
- }
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+ return builder.toString();
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-
-public class PrimaryNotFound implements SerializableMessage {
- public static final Class<PrimaryNotFound> SERIALIZABLE_CLASS = PrimaryNotFound.class;
-
- private final String shardName;
-
- public PrimaryNotFound(final String shardName){
-
- Preconditions.checkNotNull(shardName, "shardName should not be null");
-
- this.shardName = shardName;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PrimaryNotFound that = (PrimaryNotFound) o;
-
- if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return shardName != null ? shardName.hashCode() : 0;
- }
-
- @Override
- public Object toSerializable() {
- return this;
- }
-
- public static PrimaryNotFound fromSerializable(final Object message){
- return (PrimaryNotFound) message;
- }
-}
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
ListenerRegistrationMessages.RegisterChangeListener.class;
private final YangInstanceIdentifier path;
- private final ActorPath dataChangeListenerPath;
+ private final ActorRef dataChangeListener;
private final AsyncDataBroker.DataChangeScope scope;
public RegisterChangeListener(YangInstanceIdentifier path,
- ActorPath dataChangeListenerPath,
+ ActorRef dataChangeListener,
AsyncDataBroker.DataChangeScope scope) {
this.path = path;
- this.dataChangeListenerPath = dataChangeListenerPath;
+ this.dataChangeListener = dataChangeListener;
this.scope = scope;
}
}
public ActorPath getDataChangeListenerPath() {
- return dataChangeListenerPath;
+ return dataChangeListener.path();
}
public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
.setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
- .setDataChangeListenerActorPath(dataChangeListenerPath.toString())
+ .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
.setDataChangeScope(scope.ordinal()).build();
}
- public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){
+ public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
- actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(),
+ actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
}
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
public class RegisterChangeListenerReply implements SerializableMessage{
public static final Class<ListenerRegistrationMessages.RegisterChangeListenerReply> SERIALIZABLE_CLASS =
ListenerRegistrationMessages.RegisterChangeListenerReply.class;
- private final ActorPath listenerRegistrationPath;
+ private final ActorRef listenerRegistration;
- public RegisterChangeListenerReply(final ActorPath listenerRegistrationPath) {
- this.listenerRegistrationPath = listenerRegistrationPath;
+ public RegisterChangeListenerReply(final ActorRef listenerRegistration) {
+ this.listenerRegistration = listenerRegistration;
}
public ActorPath getListenerRegistrationPath() {
- return listenerRegistrationPath;
+ return listenerRegistration.path();
}
@Override
public ListenerRegistrationMessages.RegisterChangeListenerReply toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListenerReply.newBuilder()
- .setListenerRegistrationPath(listenerRegistrationPath.toString()).build();
+ .setListenerRegistrationPath(Serialization.serializedActorPath(listenerRegistration)).build();
}
public static RegisterChangeListenerReply fromSerializable(final ActorSystem actorSystem,final Object serializable){
ListenerRegistrationMessages.RegisterChangeListenerReply o = (ListenerRegistrationMessages.RegisterChangeListenerReply) serializable;
return new RegisterChangeListenerReply(
- actorSystem.actorFor(o.getListenerRegistrationPath()).path()
+ actorSystem.provider().resolveActorRef(o.getListenerRegistrationPath())
);
}
}
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
+ new FindPrimary(shardName, true), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
- PrimaryFound found = PrimaryFound.fromSerializable(response);
+ if(response instanceof PrimaryFound) {
+ PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
return actorSelection;
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found primary shard %s but it's not initialized yet. " +
- "Please try again later", shardName));
- } else if(response instanceof PrimaryNotFound) {
- throw new PrimaryNotFoundException(
- String.format("No primary shard found for %S.", shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
} else if(response instanceof NoShardLeaderException) {
throw (NoShardLeaderException)response;
}
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
return found.getPath();
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found local shard for %s but it's not initialized yet.",
- shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
} else if(response instanceof LocalShardNotFound) {
throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
*
* @author Thomas Pantelis
*/
-public class DOMConcurrentDataCommitCoordinatorTest {
+public class ConcurrentDOMDataBrokerTest {
private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
*/
package org.opendaylight.controller.cluster.datastore;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
Assert.assertEquals("getPath", path, registerMsg.getPath());
Assert.assertEquals("getScope", scope, registerMsg.getScope());
- reply(new RegisterChangeListenerReply(getRef().path()));
+ reply(new RegisterChangeListenerReply(getRef()));
for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
- reply(new ActorNotInitialized());
+ reply(new NotInitializedException("not initialized"));
new Within(duration("1 seconds")) {
@Override
@Override
public Future<Object> answer(InvocationOnMock invocation) {
proxy.close();
- return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
}
};
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
+ private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+ String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+ }
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
private Props newPropsShardMgrWithMockShardActor() {
+ return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
+ new MockConfiguration());
+ }
+
+ private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
+ final ClusterWrapper clusterWrapper, final Configuration config) {
Creator<ShardManager> creator = new Creator<ShardManager>() {
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
- datastoreContextBuilder.build(), ready) {
- @Override
- protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
- return mockShardActor;
- }
- };
+ return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
+ ready, name, shardActor);
}
};
- return Props.create(new DelegatingShardManagerCreator(creator));
+ return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
}
@Test
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("non-existent", false), getRef());
- expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
+ expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
}};
}
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-2-shard-default"));
}};
new JavaTestKit(getSystem()) {{
final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
}};
}
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
}};
shardManager.tell(new RoleChangeNotification(memberId,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
// We're passing waitUntilInitialized = true to FindPrimary so the response should be
// delayed until we send ActorInitialized and RoleChangeNotification.
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
- expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+ expectMsgClass(duration("2 seconds"), NotInitializedException.class);
shardManager.tell(new ActorInitialized(), mockShardActor);
shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
null, RaftState.Candidate.name()), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
}};
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
}};
}
+ @Test
+ public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
+ String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+ // Create an ActorSystem ShardManager actor for member-1.
+
+ final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+ final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+ newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+ new MockConfiguration()), shardManagerID);
+
+ // Create an ActorSystem ShardManager actor for member-2.
+
+ final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+ Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
+
+ MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+ newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+ mockConfig2), shardManagerID);
+
+ new JavaTestKit(system1) {{
+
+ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+ shardManager2.tell(new RoleChangeNotification(memberId2,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+
+ shardManager1.underlyingActor().waitForMemberUp();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ String path = found.getPrimaryPath();
+ assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+
+ shardManager2.underlyingActor().verifyFindPrimary();
+
+ Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ shardManager1.underlyingActor().waitForMemberRemoved();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+ }};
+
+ JavaTestKit.shutdownActorSystem(system1);
+ JavaTestKit.shutdownActorSystem(system2);
+ }
+
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
- expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
}};
}
}};
}
- @Test
- public void testOnReceiveMemberUp() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
- MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
- PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
- PrimaryFound.SERIALIZABLE_CLASS));
- String path = found.getPrimaryPath();
- assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
- }};
- }
-
- @Test
- public void testOnReceiveMemberDown() throws Exception {
-
- new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
- MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
-
- MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
-
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
- }};
- }
-
@Test
public void testOnRecoveryJournalIsCleaned() {
InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
return delegate.create();
}
}
+
+ private static class ForwardingShardManager extends ShardManager {
+ private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
+ private CountDownLatch memberUpReceived = new CountDownLatch(1);
+ private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+ private final ActorRef shardActor;
+ private final String name;
+
+ protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+ ActorRef shardActor) {
+ super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+ this.shardActor = shardActor;
+ this.name = name;
+ }
+
+ @Override
+ public void handleCommand(Object message) throws Exception {
+ try{
+ super.handleCommand(message);
+ } finally {
+ if(message instanceof FindPrimary) {
+ findPrimaryMessageReceived.countDown();
+ } else if(message instanceof ClusterEvent.MemberUp) {
+ String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberUpReceived.countDown();
+ }
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberRemovedReceived.countDown();
+ }
+ }
+ }
+ }
+
+ @Override
+ public String persistenceId() {
+ return name;
+ }
+
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ return shardActor;
+ }
+
+ void waitForMemberUp() {
+ assertEquals("MemberUp received", true,
+ Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
+ memberUpReceived = new CountDownLatch(1);
+ }
+
+ void waitForMemberRemoved() {
+ assertEquals("MemberRemoved received", true,
+ Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
+ memberRemovedReceived = new CountDownLatch(1);
+ }
+
+ void verifyFindPrimary() {
+ assertEquals("FindPrimary received", true,
+ Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
+ findPrimaryMessageReceived = new CountDownLatch(1);
+ }
+ }
}
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor.path(),
+ shard.tell(new RegisterChangeListener(path, dclActor,
AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerReplyTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getListenerRegistrationPath());
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+
+ RegisterChangeListenerReply fromSerialized
+ = RegisterChangeListenerReply.fromSerializable(getSystem(), serialized);
+
+ assertEquals(testActor.path().toString(), fromSerialized.getListenerRegistrationPath().toString());
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.BASE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+ NormalizedNodeMessages.InstanceIdentifier path = serialized.getInstanceIdentifierPath();
+
+ assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
+ assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.SUBTREE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+
+ RegisterChangeListener fromSerialized = RegisterChangeListener.fromSerializable(getSystem(), serialized);
+
+ assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
+ assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
+ assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
+
+
+ }
+}
\ No newline at end of file
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryNotFound("foobar"));
+ return Futures.successful((Object) new PrimaryNotFoundException("not found"));
}
};
mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new ActorNotInitialized());
+ return Futures.successful((Object) new NotInitializedException("not iniislized"));
}
};
TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
- shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
- shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+ shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
Configuration mockConfig = mock(Configuration.class);
import akka.cluster.ClusterEvent;
import akka.cluster.MemberStatus;
import akka.cluster.UniqueAddress;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import scala.collection.JavaConversions;
import java.util.HashSet;
import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
public class MockClusterWrapper implements ClusterWrapper{
private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+ private String currentMemberName = "member-1";
+
+ public MockClusterWrapper() {
+ }
+
+ public MockClusterWrapper(String currentMemberName) {
+ this.currentMemberName = currentMemberName;
+ }
@Override
public void subscribeToMemberEvents(ActorRef actorRef) {
@Override
public String getCurrentMemberName() {
- return "member-1";
+ return currentMemberName;
}
@Override
package org.opendaylight.controller.cluster.datastore.utils;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
public class MockConfiguration implements Configuration{
- @Override public List<String> getMemberShardNames(final String memberName) {
- return Arrays.asList("default");
+ private Map<String, List<String>> shardMembers = ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build();
+
+ public MockConfiguration() {
+ }
+
+ public MockConfiguration(Map<String, List<String>> shardMembers) {
+ this.shardMembers = shardMembers;
}
- @Override public Optional<String> getModuleNameFromNameSpace(
+ @Override
+ public List<String> getMemberShardNames(final String memberName) {
+ return new ArrayList<>(shardMembers.keySet());
+ }
+ @Override
+ public Optional<String> getModuleNameFromNameSpace(
final String nameSpace) {
return Optional.absent();
}
return Arrays.asList("member-2", "member-3");
}
- return Collections.emptyList();
+ List<String> members = shardMembers.get(shardName);
+ return members != null ? members : Collections.<String>emptyList();
}
@Override public Set<String> getAllShardNames() {
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+Member1 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loglevel = "DEBUG"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2558
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-1"
+ ]
+ }
+ }
+}
+
+Member2 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2559
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-2"
+ ]
+ }
+ }
+}