+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.datastore.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.raft.utils.EchoActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
assertEquals(expected, actual);
}
- @Test
- public void testRateLimiting(){
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- transactionCreationInitialRateLimit(155L).build();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext);
-
- // Check that the initial value is being picked up from DataStoreContext
- assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
-
- actorContext.setTxCreationLimit(1.0);
-
- assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
-
-
- StopWatch watch = new StopWatch();
-
- watch.start();
-
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
-
- watch.stop();
-
- assertTrue("did not take as much time as expected", watch.getTime() > 1000);
- }
@Test
public void testClientDispatcherIsGlobalDispatcher(){
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
ActorContext actorContext =
new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
new JavaTestKit(getSystem()) {{
ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
mock(Configuration.class), DatastoreContext.newBuilder().
- operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+ operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
assertEquals("getTransactionCommitOperationTimeout", 7,
DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
shardTransactionCommitTimeoutInSeconds(8).build();
- actorContext.setDatastoreContext(newContext);
+ DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
+ Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
- expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+ actorContext.setDatastoreContext(mockContextFactory);
+
+ expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
}
@Test
- public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+ public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+ logicalStoreType(LogicalDatastoreType.CONFIGURATION).
shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
+ return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
}
};
-
Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertEquals(cachedInfo, actual);
- // Wait for 200 Milliseconds. The cached entry should have been removed.
-
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
-
}
@Test
- public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+ public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+ logicalStoreType(LogicalDatastoreType.CONFIGURATION).
shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+ final DataTree mockDataTree = Mockito.mock(DataTree.class);
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryNotFoundException("not found"));
+ return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
}
};
-
Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected PrimaryNotFoundException");
- } catch(PrimaryNotFoundException e){
-
- }
+ assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+ assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedInfo, actual);
+
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
+
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
assertNull(cached);
}
@Test
- public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+ public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+ testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
+ }
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+ @Test
+ public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+ testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+ }
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+ private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new NotInitializedException("not iniislized"));
- }
- };
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+ logicalStoreType(LogicalDatastoreType.CONFIGURATION).
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful(expectedException);
+ }
+ };
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected NotInitializedException");
- } catch(NotInitializedException e){
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected" + expectedException.getClass().toString());
+ } catch(Exception e){
+ if(!expectedException.getClass().isInstance(e)) {
+ fail("Expected Exception of type " + expectedException.getClass().toString());
}
+ }
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- assertNull(cached);
+ assertNull(cached);
}
@Test
TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
- shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
- shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
+ DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
+ DataStoreVersions.CURRENT_VERSION));
shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
Configuration mockConfig = mock(Configuration.class);
ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mockConfig,
- DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
actorContext.broadcast(new TestMessage());
- expectFirstMatching(shardActorRef1, TestMessage.class);
- expectFirstMatching(shardActorRef2, TestMessage.class);
+ MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
+ MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
}};
}
- private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
- int count = 5000 / 50;
- for(int i = 0; i < count; i++) {
- try {
- T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
- if(message != null) {
- return message;
- }
- } catch (Exception e) {}
-
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
-
- Assert.fail("Did not receive message of type " + clazz);
- return null;
- }
}