import akka.actor.ActorSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
- public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout
+ private static final String UNKNOWN_TYPE = "unknown";
+
+ private static final long READY_WAIT_FACTOR = 3;
private final ActorContext actorContext;
+ private final long waitTillReadyTimeInMillis;
+
private AutoCloseable closeable;
private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
+ private CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+
+ private final String type;
+
public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
- String type = datastoreContext.getDataStoreType();
+ this.type = datastoreContext.getDataStoreType();
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
- ShardManager.props(cluster, configuration, datastoreContext)
+ ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
.withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
cluster, configuration, datastoreContext);
+ this.waitTillReadyTimeInMillis =
+ actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
+
+
datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
datastoreConfigMXBean.setContext(datastoreContext);
datastoreConfigMXBean.registerMBean();
public DistributedDataStore(ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.type = UNKNOWN_TYPE;
+ this.waitTillReadyTimeInMillis =
+ actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
+
}
public void setCloseable(AutoCloseable closeable) {
ActorContext getActorContext() {
return actorContext;
}
+
+ public void waitTillReady(){
+ LOG.info("Beginning to wait for data store to become ready : {}", type);
+
+ try {
+ waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS);
+
+ LOG.debug("Data store {} is now ready", type);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted when trying to wait for shards to become leader in a reasonable amount of time - giving up");
+ }
+ }
+
+ @VisibleForTesting
+ public CountDownLatch getWaitTillReadyCountDownLatch() {
+ return waitTillReadyCountDownLatch;
+ }
}
schemaService.registerSchemaContextListener(dataStore);
dataStore.setCloseable(overlay);
+ dataStore.waitTillReady();
return dataStore;
}
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
- return Optional.<ActorRef>of(shardRoleChangeNotifier);
+ return Optional.of(shardRoleChangeNotifier);
}
@Override
handleTransactionCommitTimeoutCheck();
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof RegisterRoleChangeListener){
+ roleChangeNotifier.get().forward(message, context());
} else {
super.onReceiveCommand(message);
}
import akka.persistence.RecoveryFailure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final DataPersistenceProvider dataPersistenceProvider;
+ private final CountDownLatch waitTillReadyCountdownLatch;
+
/**
*/
protected ShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext) {
+ DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
this.type = datastoreContext.getDataStoreType();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+ this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
public static Props props(
final ClusterWrapper cluster,
final Configuration configuration,
- final DatastoreContext datastoreContext) {
+ final DatastoreContext datastoreContext,
+ final CountDownLatch waitTillReadyCountdownLatch) {
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
+ Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
- return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
}
@Override
ignoreMessage(message);
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof RoleChangeNotification){
+ onRoleChangeNotification((RoleChangeNotification) message);
} else{
unknownMessage(message);
}
}
+ private void onRoleChangeNotification(RoleChangeNotification message) {
+ RoleChangeNotification roleChanged = message;
+ LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+ roleChanged.getOldRole(), roleChanged.getNewRole());
+
+ ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
+ if(shardInformation != null) {
+ shardInformation.setRole(roleChanged.getNewRole());
+
+ if (isReady()) {
+ LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
+ waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+ }
+ }
+
+
+ private ShardInformation findShardInformation(String memberId) {
+ for(ShardInformation info : localShards.values()){
+ if(info.getShardId().toString().equals(memberId)){
+ return info;
+ }
+ }
+
+ return null;
+ }
+
+ private boolean isReady() {
+ boolean isReady = true;
+ for (ShardInformation info : localShards.values()) {
+ if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+ isReady = false;
+ break;
+ }
+ }
+ return isReady;
+ }
+
private void onActorInitialized(Object message) {
final ActorRef sender = getSender();
for (ShardInformation info : localShards.values()) {
if (info.getActor() == null) {
info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext)
+ info.getPeerAddresses(), datastoreContext, schemaContext)
.withDispatcher(shardDispatcherPath), info.getShardId().toString()));
} else {
info.getActor().tell(message, getSelf());
}
+ info.getActor().tell(new RegisterRoleChangeListener(), self());
}
}
private boolean actorInitialized = false;
private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+ private String role ;
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<ShardIdentifier, String> peerAddresses) {
void addRunnableOnInitialized(Runnable runnable) {
runnablesOnInitialized.add(runnable);
}
+
+ public void setRole(String newRole) {
+ this.role = newRole;
+ }
+
+ public String getRole(){
+ return this.role;
+ }
+
}
private static class ShardManagerCreator implements Creator<ShardManager> {
final ClusterWrapper cluster;
final Configuration configuration;
final DatastoreContext datastoreContext;
+ private final CountDownLatch waitTillReadyCountdownLatch;
ShardManagerCreator(ClusterWrapper cluster,
- Configuration configuration, DatastoreContext datastoreContext) {
+ Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
this.cluster = cluster;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
+ this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
}
@Override
public ShardManager create() throws Exception {
- return new ShardManager(cluster, configuration, datastoreContext);
+ return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
}
}
boolean done = true;
public void reset(){
- Preconditions.checkState(done);
+ Preconditions.checkState(done,
+ String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
done = false;
stopwatch.reset().start();
}
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
public class DistributedDataStoreTest extends AbstractActorTest {
@Mock
private ActorContext actorContext;
+ @Mock
+ private DatastoreContext datastoreContext;
+
+ @Mock
+ private Timeout shardElectionTimeout;
+
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
verify(actorContext, times(0)).acquireTxCreationPermit();
}
+ @Test
+ public void testWaitTillReadyBlocking(){
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
+ doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
+ DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+ long start = System.currentTimeMillis();
+
+ distributedDataStore.waitTillReady();
+
+ long end = System.currentTimeMillis();
+
+ assertTrue("Expected to be blocked for 50 millis", (end-start) >= 50);
+ }
+
+ @Test
+ public void testWaitTillReadyCountDown(){
+ final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
+ doReturn(FiniteDuration.apply(5000, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
+
+ Executors.newSingleThreadExecutor().submit(new Runnable() {
+ @Override
+ public void run() {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
+ }
+ });
+
+ long start = System.currentTimeMillis();
+
+ distributedDataStore.waitTillReady();
+
+ long end = System.currentTimeMillis();
+
+ assertTrue("Expected to be released in 500 millis", (end-start) < 5000);
+
+ }
+
}
\ No newline at end of file
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
import akka.actor.Props;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
+ @Mock
+ private static CountDownLatch ready;
+
private static ActorRef mockShardActor;
@Before
public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
InMemoryJournal.clear();
if(mockShardActor == null) {
}
private Props newShardMgrProps() {
-
DatastoreContext.Builder builder = DatastoreContext.newBuilder();
builder.dataStoreType(shardMrgIDSuffix);
- return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
+ return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ builder.build(), ready);
}
@Test
public void testRecoveryApplicable(){
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
final TestActorRef<ShardManager> persistentShardManager =
TestActorRef.create(getSystem(), persistentProps);
assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
- final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
+ final Props nonPersistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).build(), ready);
final TestActorRef<ShardManager> nonPersistentShardManager =
TestActorRef.create(getSystem(), nonPersistentProps);
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
@Override
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
DataPersistenceProviderMonitor dataPersistenceProviderMonitor
}};
}
+ @Test
+ public void testRoleChangeNotificationReleaseReady() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+
+ verify(ready, times(1)).countDown();
+
+ }};
+ }
+
+ @Test
+ public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ final Props persistentProps = ShardManager.props(
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+
+ verify(ready, never()).countDown();
+
+ }};
+ }
+
private static class TestShardManager extends ShardManager {
TestShardManager(String shardMrgIDSuffix) {
super(new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
}
@Override
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
}};
}
+ @Test
+ public void testRegisterRoleChangeListener() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testRegisterRoleChangeListener");
+
+ waitUntilLeader(shard);
+
+ TestActorRef<MessageCollectorActor> listener =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ shard.tell(new RegisterRoleChangeListener(), listener);
+
+ // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
+ // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
+ // sleep.
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+ List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
+
+ assertEquals(1, allMatching.size());
+ }};
+ }
+
+
private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =