private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
+ private FiniteDuration electionTimeOutInterval;
// 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
// in-memory journal can use before it needs to snapshot
public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
+ electionTimeOutInterval = null;
}
public void setSnapshotBatchCount(long snapshotBatchCount) {
public void setElectionTimeoutFactor(long electionTimeoutFactor){
this.electionTimeoutFactor = electionTimeoutFactor;
+ electionTimeOutInterval = null;
}
@Override
@Override
public FiniteDuration getElectionTimeOutInterval() {
- return getHeartBeatInterval().$times(electionTimeoutFactor);
+ // Lazily compute heartBeatInterval * electionTimeoutFactor and cache it; the
+ // setters for both inputs invalidate the cache by resetting the field to null.
+ // NOTE(review): the cached field is not volatile and this check-then-set is not
+ // synchronized — assumes single-threaded (actor-confined) access; confirm.
+ if(electionTimeOutInterval == null) {
+ electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
+ }
+
+ return electionTimeOutInterval;
}
@Override
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import scala.concurrent.duration.FiniteDuration;
public class FollowerLogInformationImpl implements FollowerLogInformation {
private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
- private final long followerTimeoutMillis;
+ private final RaftActorContext context;
private volatile long nextIndex;
private volatile long matchIndex;
- public FollowerLogInformationImpl(String id, long nextIndex,
- long matchIndex, FiniteDuration followerTimeoutDuration) {
+ public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
this.id = id;
- this.nextIndex = nextIndex;
+ this.nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
- this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
+ this.context = context;
}
@Override
@Override
public boolean isFollowerActive() {
// Snapshot elapsed time first; only meaningful when the stopwatch is running.
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+ // Timeout is now read from the (mutable) config params on every call instead of
+ // a value captured at construction, so runtime config updates take effect here.
+ return (stopwatch.isRunning()) &&
+ (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
}
@Override
builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
.append(", matchIndex=").append(matchIndex).append(", stopwatch=")
.append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
- .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]");
+ .append(", followerTimeoutMillis=")
+ .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
return builder.toString();
}
* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
*/
- private final RaftActorContext context;
+ private final RaftActorContextImpl context;
/**
* The in-memory journal
super.preStart();
}
+ @Override
+ public void postStop() {
+ // Close the current behavior (cancels its timers etc.) before the actor stops.
+ if(currentBehavior != null) {
+ try {
+ currentBehavior.close();
+ } catch (Exception e) {
+ // Pass the exception as the last argument so SLF4J logs its stack trace
+ // instead of silently dropping it.
+ LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state(), e);
+ }
+ }
+
+ super.postStop();
+ }
+
@Override
public void handleRecover(Object message) {
if(persistence().isRecoveryApplicable()) {
return context;
}
+ protected void updateConfigParams(ConfigParams configParams) {
+ context.setConfigParams(configParams);
+ }
+
/**
* setPeerAddress sets the address of a known peer at a later time.
* <p>
private final Logger LOG;
- private final ConfigParams configParams;
+ private ConfigParams configParams;
private boolean snapshotCaptureInitiated;
this.LOG = logger;
}
+ void setConfigParams(ConfigParams configParams) {
+ this.configParams = configParams;
+ }
+
@Override
public ActorRef actorOf(Props props){
return context.actorOf(props);
final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId,
- context.getCommitIndex(), -1,
- context.getConfigParams().getElectionTimeOutInterval());
+ new FollowerLogInformationImpl(followerId, -1, context);
ftlBuilder.put(followerId, followerLogInformation);
}
*/
package org.opendaylight.controller.cluster.raft;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class FollowerLogInformationImplTest {
@Test
public void testIsFollowerActive() {
- FiniteDuration timeoutDuration =
- new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(
- "follower1", 10, 9, timeoutDuration);
+ MockRaftActorContext context = new MockRaftActorContext();
+ context.setCommitIndex(10);
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(1);
+ context.setConfigParams(configParams);
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl("follower1", 9, context);
assertFalse("Follower should be termed inactive before stopwatch starts",
followerLogInformation.isFollowerActive());
private boolean snapshotCaptureInitiated;
public MockRaftActorContext(){
- electionTerm = null;
-
- initReplicatedLog();
- }
-
- public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
- this.id = id;
- this.system = system;
- this.actor = actor;
-
- final String id1 = id;
electionTerm = new ElectionTerm() {
- /**
- * Identifier of the actor whose election term information this is
- */
- private final String id = id1;
private long currentTerm = 1;
private String votedFor = "";
};
configParams = new DefaultConfigParamsImpl();
+ }
+
+ public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
+ this();
+ this.id = id;
+ this.system = system;
+ this.actor = actor;
initReplicatedLog();
}
import java.util.Dictionary;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
+ public static interface Listener {
+ void onDatastoreContextUpdated(DatastoreContext context);
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
private final DatastoreContextIntrospector introspector;
private final BundleContext bundleContext;
+ private ServiceRegistration<?> configListenerServiceRef;
+ private Listener listener;
- public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) {
+ public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector,
+ BundleContext bundleContext) {
this.introspector = introspector;
this.bundleContext = bundleContext;
LOG.warn("No ConfigurationAdmin service found");
} else {
overlaySettings(configAdminServiceReference);
+
+ configListenerServiceRef = bundleContext.registerService(ConfigurationListener.class.getName(),
+ new DatastoreConfigurationListener(), null);
}
}
+ public void setListener(Listener listener) {
+ this.listener = listener;
+ }
+
private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
try {
ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
LOG.debug("Overlaying settings: {}", properties);
- introspector.update(properties);
+ if(introspector.update(properties)) {
+ if(listener != null) {
+ listener.onDatastoreContextUpdated(introspector.getContext());
+ }
+ }
} else {
LOG.debug("No Configuration found for {}", CONFIG_ID);
}
@Override
public void close() {
+ listener = null;
+
+ if(configListenerServiceRef != null) {
+ configListenerServiceRef.unregister();
+ }
+ }
+
+ // Reacts to Config Admin update events for our PID by re-reading and
+ // re-applying the overlay settings.
+ private class DatastoreConfigurationListener implements ConfigurationListener {
+ @Override
+ public void configurationEvent(ConfigurationEvent event) {
+ // Only react to updates of our own configuration PID; deletes and
+ // location changes are intentionally ignored.
+ if(CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) {
+ LOG.debug("configurationEvent: config {} was updated", CONFIG_ID);
+ overlaySettings(event.getReference());
+ }
+ }
}
}
* @param properties the properties to apply
* @return true if the cached DatastoreContext was updated, false otherwise.
*/
- public boolean update(Dictionary<String, Object> properties) {
+ public synchronized boolean update(Dictionary<String, Object> properties) {
if(properties == null || properties.isEmpty()) {
return false;
}
+ LOG.debug("In update: properties: {}", properties);
+
Builder builder = DatastoreContext.newBuilderFrom(context);
final String dataStoreTypePrefix = context.getDataStoreType() + '.';
}
private Object constructorValueRecursively(Class<?> toType, Object fromValue) throws Exception {
- LOG.debug("convertValueRecursively - toType: {}, fromValue {} ({})",
+ LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})",
toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
Constructor<?> ctor = constructors.get(toType);
- LOG.debug("Found {}", ctor);
+ LOG.trace("Found {}", ctor);
if(ctor == null) {
throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType));
/**
*
*/
-public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
+public class DistributedDataStore implements DOMStore, SchemaContextListener,
+ DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout
actorContext.setSchemaContext(schemaContext);
}
+ @Override
+ public void onDatastoreContextUpdated(DatastoreContext context) {
+ LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
+
+ actorContext.setDatastoreContext(context);
+ datastoreConfigMXBean.setContext(context);
+ }
+
@Override
public void close() {
datastoreConfigMXBean.unregisterMBean();
final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
+ overlay.setListener(dataStore);
+
ShardStrategyFactory.setConfiguration(config);
schemaService.registerSchemaContextListener(dataStore);
private final List<DelayedListenerRegistration> delayedListenerRegistrations =
Lists.newArrayList();
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
- private final DataPersistenceProvider dataPersistenceProvider;
+ private DataPersistenceProvider dataPersistenceProvider;
private SchemaContext schemaContext;
private final ShardCommitCoordinator commitCoordinator;
- private final long transactionCommitTimeout;
+ private long transactionCommitTimeout;
private Cancellable txCommitTimeoutCheckSchedule;
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
- transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ setTransactionCommitTimeout();
// create a notifier actor for each cluster member
roleChangeNotifier = createRoleChangeNotifier(name.toString());
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
}
+ private void setTransactionCommitTimeout() {
+ transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ }
+
private static Map<String, String> mapPeerAddresses(
final Map<ShardIdentifier, String> peerAddresses) {
Map<String, String> map = new HashMap<>();
@Override
public void postStop() {
+ LOG.info("Stopping Shard {}", persistenceId());
+
super.postStop();
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
+
+ shardMBean.unregisterMBean();
}
@Override
resolved.getPeerAddress());
} else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
handleTransactionCommitTimeoutCheck();
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
} else {
super.onReceiveCommand(message);
}
return roleChangeNotifier;
}
+ // Applies a runtime-updated DatastoreContext: refreshes cached settings and,
+ // if the persistence flag flipped, swaps the data persistence provider.
+ private void onDatastoreContext(DatastoreContext context) {
+ datastoreContext = context;
+
+ commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+ setTransactionCommitTimeout();
+
+ // Swap the persistence provider only when the configured mode actually
+ // changed (instanceof guards make repeated identical updates a no-op).
+ if(datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+ dataPersistenceProvider = new PersistentDataProvider();
+ } else if(!datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof PersistentDataProvider) {
+ dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ }
+
+ // Push the new raft config down into the RaftActorContext.
+ updateConfigParams(datastoreContext.getShardRaftConfig());
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
private final Queue<CohortEntry> queuedCohortEntries;
- private final int queueCapacity;
+ private int queueCapacity;
private final Logger log;
queuedCohortEntries = new LinkedList<>();
}
+ public void setQueueCapacity(int queueCapacity) {
+ this.queueCapacity = queueCapacity;
+ }
+
/**
* This method caches a cohort entry for the given transactions ID in preparation for the
* subsequent 3-phase commit.
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
private final String shardDispatcherPath;
- private ShardManagerInfoMBean mBean;
+ private ShardManagerInfo mBean;
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
private Collection<String> knownModules = Collections.emptySet();
return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
}
+ @Override
+ public void postStop() {
+ LOG.info("Stopping ShardManager");
+
+ mBean.unregisterMBean();
+
+ // Invoke the superclass cleanup as well, consistent with the other postStop
+ // overrides in this change set (RaftActor, Shard) which all call super.
+ super.postStop();
+ }
+
@Override
public void handleCommand(Object message) throws Exception {
if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
memberRemoved((ClusterEvent.MemberRemoved) message);
} else if(message instanceof ClusterEvent.UnreachableMember) {
ignoreMessage(message);
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
} else{
unknownMessage(message);
}
}
}
+ private void onDatastoreContext(DatastoreContext context) {
+ datastoreContext = context;
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() != null) {
+ info.getActor().tell(datastoreContext, getSelf());
+ }
+ }
+ }
+
/**
* Notifies all the local shards of a change in the schema context
*
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
- private final DatastoreContext datastoreContext;
- private final FiniteDuration operationDuration;
- private final Timeout operationTimeout;
+ private DatastoreContext datastoreContext;
+ private FiniteDuration operationDuration;
+ private Timeout operationTimeout;
private final String selfAddressHostPort;
- private final RateLimiter txRateLimiter;
+ private RateLimiter txRateLimiter;
private final MetricRegistry metricRegistry = new MetricRegistry();
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
- private final Timeout transactionCommitOperationTimeout;
+ private Timeout transactionCommitOperationTimeout;
private final Dispatchers dispatchers;
private volatile SchemaContext schemaContext;
+ private volatile boolean updated;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
- this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
- operationTimeout = new Timeout(operationDuration);
- transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
- TimeUnit.SECONDS));
-
+ setCachedProperties();
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
+ private void setCachedProperties() {
+ txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+
+ transactionCommitOperationTimeout = new Timeout(Duration.create(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+ }
+
public DatastoreContext getDatastoreContext() {
return datastoreContext;
}
this.schemaContext = schemaContext;
if(shardManager != null) {
- shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+ }
+ }
+
+ public void setDatastoreContext(DatastoreContext context) {
+ this.datastoreContext = context;
+ setCachedProperties();
+
+ // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
+ // will be published immediately even though they may not be immediately visible to other
+ // threads due to unsynchronized reads. That's OK though - we're going for eventual
+ // consistency here as immediately visible updates to these members aren't critical. These
+ // members could've been made volatile but wanted to avoid volatile reads as these are
+ // accessed often and updates will be infrequent.
+
+ updated = true;
+
+ if(shardManager != null) {
+ shardManager.tell(context, ActorRef.noSender());
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
/**
* Unit tests for DatastoreContextConfigAdminOverlay.
*
* @author Thomas Pantelis
*/
+@SuppressWarnings("unchecked")
public class DatastoreContextConfigAdminOverlayTest {
- @SuppressWarnings("unchecked")
- @Test
- public void test() throws IOException {
- BundleContext mockBundleContext = mock(BundleContext.class);
- ServiceReference<ConfigurationAdmin> mockServiceRef = mock(ServiceReference.class);
- ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class);
- Configuration mockConfig = mock(Configuration.class);
- DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class);
+ @Mock
+ private BundleContext mockBundleContext;
+
+ @Mock
+ private ServiceReference<ConfigurationAdmin> mockConfigAdminServiceRef;
+
+ @Mock
+ private ConfigurationAdmin mockConfigAdmin;
+
+ @Mock
+ private Configuration mockConfig;
+
+ @Mock
+ private DatastoreContextIntrospector mockIntrospector;
+
+ @Mock
+ private ServiceRegistration<?> configListenerServiceReg;
+
+ @Before
+ public void setup() throws IOException {
+ MockitoAnnotations.initMocks(this);
- doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
- doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef);
+ doReturn(mockConfigAdminServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
+ doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockConfigAdminServiceRef);
+ doReturn(configListenerServiceReg).when(mockBundleContext).registerService(
+ eq(ConfigurationListener.class.getName()), any(), any(Dictionary.class));
doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID);
doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid();
+ }
+
+ @Test
+ public void testUpdateOnConstruction() {
+ Dictionary<String, Object> properties = new Hashtable<>();
+ properties.put("property", "value");
+ doReturn(properties).when(mockConfig).getProperties();
+
+ DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+ mockIntrospector, mockBundleContext);
+
+ verify(mockIntrospector).update(properties);
+
+ verify(mockBundleContext).ungetService(mockConfigAdminServiceRef);
+
+ overlay.close();
+ }
+
+ @Test
+ public void testUpdateOnConfigurationEvent() {
+ DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+ mockIntrospector, mockBundleContext);
+
+ reset(mockIntrospector);
+
+ DatastoreContext context = DatastoreContext.newBuilder().build();
+ doReturn(context).when(mockIntrospector).getContext();
+
+ DatastoreContextConfigAdminOverlay.Listener mockListener =
+ mock(DatastoreContextConfigAdminOverlay.Listener.class);
+
+ overlay.setListener(mockListener);
+
Dictionary<String, Object> properties = new Hashtable<>();
properties.put("property", "value");
doReturn(properties).when(mockConfig).getProperties();
- try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
- mockIntrospector, mockBundleContext)) {
- }
+ doReturn(true).when(mockIntrospector).update(properties);
+
+ ArgumentCaptor<ConfigurationListener> configListener =
+ ArgumentCaptor.forClass(ConfigurationListener.class);
+ verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+ configListener.capture(), any(Dictionary.class));
+
+ ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+ doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+ doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+ doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+ configListener.getValue().configurationEvent(configEvent);
verify(mockIntrospector).update(properties);
- verify(mockBundleContext).ungetService(mockServiceRef);
+ verify(mockListener).onDatastoreContextUpdated(context);
+
+ verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef);
+
+ overlay.close();
+
+ verify(configListenerServiceReg).unregister();
+ }
+
+ @Test
+ public void testConfigurationEventWithDifferentPid() {
+ DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+ mockIntrospector, mockBundleContext);
+
+ reset(mockIntrospector);
+
+ ArgumentCaptor<ConfigurationListener> configListener =
+ ArgumentCaptor.forClass(ConfigurationListener.class);
+ verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+ configListener.capture(), any(Dictionary.class));
+
+ ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+ doReturn("other-pid").when(configEvent).getPid();
+ doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+ doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+ configListener.getValue().configurationEvent(configEvent);
+
+ verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+ overlay.close();
+ }
+
+ @Test
+ public void testConfigurationEventWithNonUpdateEventType() {
+ DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+ mockIntrospector, mockBundleContext);
+
+ reset(mockIntrospector);
+
+ ArgumentCaptor<ConfigurationListener> configListener =
+ ArgumentCaptor.forClass(ConfigurationListener.class);
+ verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+ configListener.capture(), any(Dictionary.class));
+
+ ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+ doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+ doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+ doReturn(ConfigurationEvent.CM_DELETED).when(configEvent).getType();
+
+ configListener.getValue().configurationEvent(configEvent);
+
+ verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+ overlay.close();
}
}
}
+ @Test
+ public void testOnDatastoreContext() {
+ new ShardTestKit(getSystem()) {{
+ dataStoreContextBuilder.persistent(true);
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
+
+ assertEquals("isRecoveryApplicable", true,
+ shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ waitUntilLeader(shard);
+
+ shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
+
+ assertEquals("isRecoveryApplicable", false,
+ shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
+
+ assertEquals("isRecoveryApplicable", true,
+ shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
+import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
}
+ @Test
+ public void testSetDatastoreContext() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
+ mock(Configuration.class), DatastoreContext.newBuilder().
+ operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+
+ assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 7,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+
+ DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
+ shardTransactionCommitTimeoutInSeconds(8).build();
+
+ actorContext.setDatastoreContext(newContext);
+
+ expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+
+ Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+
+ assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 8,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+ }};
+ }
}
org.slf4j.simpleLogger.logFile=System.out
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=trace
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
\ No newline at end of file