executor = MoreExecutors.listeningDecorator(
MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newFixedThreadPool(1), 1L, TimeUnit.SECONDS));
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec,
- MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec,
- MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec);
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec);
Map<LogicalDatastoreType, DOMStore> datastores = ImmutableMap.of(
LogicalDatastoreType.OPERATIONAL, (DOMStore)operStore,
LogicalDatastoreType.CONFIGURATION, configStore);
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.benchmark;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-
-/**
- * Benchmark for testing of performance of write operations for InMemoryDataStore. The instance
- * of benchmark creates InMemoryDataStore with Data Change Listener Executor Service as BlockingBoundedFastThreadPool
- * and DOM Store Executor Service as Blocking Bounded Fast Thread Pool.
- *
- * @see org.opendaylight.yangtools.util.concurrent.SpecialExecutors
- * @see org.opendaylight.controller.md.sal.dom.store.benchmark.AbstractInMemoryDatastoreWriteTransactionBenchmark
- *
- * @author Lukas Sedlak <lsedlak@cisco.com>
- */
-@State(Scope.Thread)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-@Fork(1)
-public class InMemoryDataStoreWithExecutorServiceBenchmark extends AbstractInMemoryDatastoreWriteTransactionBenchmark {
-
- private static final int MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE = 20;
- private static final int MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000;
- private static final int MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE = 5000;
-
- @Override
- @Setup(Level.Trial)
- public void setUp() throws Exception {
- final String name = "DS_BENCHMARK";
- final ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
- MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE, MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE, name + "-DCL");
-
- final ListeningExecutorService domStoreExecutor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(
- MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE, "DOMStore-" + name ));
-
- domStore = new InMemoryDOMDataStore(name, domStoreExecutor,
- dataChangeListenerExecutor);
- schemaContext = BenchmarkModel.createTestContext();
- domStore.onGlobalContextUpdated(schemaContext);
- initTestNode();
- }
-
- @Override
- @TearDown
- public void tearDown() {
- schemaContext = null;
- domStore = null;
- }
-}
*/
package org.opendaylight.controller.md.sal.dom.store.benchmark;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
final ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE, MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE, name + "-DCL");
- domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", MoreExecutors.sameThreadExecutor(),
- dataChangeListenerExecutor);
+ domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", dataChangeListenerExecutor);
schemaContext = BenchmarkModel.createTestContext();
domStore.onGlobalContextUpdated(schemaContext);
initTestNode();
@Setup(Level.Trial)
public void setUp() throws Exception {
- domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", MoreExecutors.sameThreadExecutor(),
- MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("SINGLE_THREADED_DS_BENCHMARK", MoreExecutors.sameThreadExecutor());
schemaContext = BenchmarkModel.createTestContext();
domStore.onGlobalContextUpdated(schemaContext);
initTestNode();
}
public DOMStore createConfigurationDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
public DOMStore createOperationalDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
}
public DOMStore createConfigurationDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
public DOMStore createOperationalDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
package org.opendaylight.controller.sal.binding.test.util;
import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.annotations.Beta;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.ImmutableClassToInstanceMap;
public void startNewDomDataBroker() {
checkState(executor != null, "Executor needs to be set");
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor,
- MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor,
- MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
newDatastores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.OPERATIONAL, operStore)
.put(LogicalDatastoreType.CONFIGURATION, configStore)
private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
- protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
super(name.toString(), mapPeerAddresses(peerAddresses),
Optional.of(datastoreContext.getShardRaftConfig()));
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
- shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
if (isMetricsCaptureEnabled()) {
}
private static Map<String, String> mapPeerAddresses(
- Map<ShardIdentifier, String> peerAddresses) {
+ final Map<ShardIdentifier, String> peerAddresses) {
Map<String, String> map = new HashMap<>();
for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
public static Props props(final ShardIdentifier name,
final Map<ShardIdentifier, String> peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
}
@Override
- public void onReceiveRecover(Object message) throws Exception {
+ public void onReceiveRecover(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("onReceiveRecover: Received message {} from {}",
message.getClass().toString(),
}
@Override
- public void onReceiveCommand(Object message) throws Exception {
+ public void onReceiveCommand(final Object message) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
}
}
}
- private void handleCommitTransaction(CommitTransaction commit) {
+ private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
LOG.debug("Committing transaction {}", transactionID);
commitCoordinator.currentTransactionComplete(transactionID, true);
}
- private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
+ private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
readyTransactionReply, getSelf());
}
- private void handleAbortTransaction(AbortTransaction abort) {
+ private void handleAbortTransaction(final AbortTransaction abort) {
doAbortTransaction(abort.getTransactionID(), getSender());
}
- private void doAbortTransaction(String transactionID, final ActorRef sender) {
+ private void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
LOG.debug("Aborting transaction {}", transactionID);
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void v) {
+ public void onSuccess(final Void v) {
shardMBean.incrementAbortTransactionsCount();
if(sender != null) {
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
LOG.error(t, "An exception happened during abort");
if(sender != null) {
}
}
- private void handleCreateTransaction(Object message) {
+ private void handleCreateTransaction(final Object message) {
if (isLeader()) {
createTransaction(CreateTransaction.fromSerializable(message));
} else if (getLeader() != null) {
}
}
- private void handleReadDataReply(Object message) {
+ private void handleReadDataReply(final Object message) {
// This must be for install snapshot. Don't want to open this up and trigger
// deSerialization
getSender().tell(PoisonPill.getInstance(), self());
}
- private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
+ private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
DOMStoreTransactionChain chain =
transactionChains.remove(closeTransactionChain.getTransactionChainId());
return transactionActor;
}
- private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
+ private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
throws ExecutionException, InterruptedException {
DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
commitCohort.preCommit().get();
commitCohort.commit().get();
}
- private void commitWithNewTransaction(Modification modification) {
+ private void commitWithNewTransaction(final Modification modification) {
DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
modification.apply(tx);
try {
}
}
- private void updateSchemaContext(UpdateSchemaContext message) {
+ private void updateSchemaContext(final UpdateSchemaContext message) {
this.schemaContext = message.getSchemaContext();
updateSchemaContext(message.getSchemaContext());
store.onGlobalContextUpdated(message.getSchemaContext());
}
@VisibleForTesting
- void updateSchemaContext(SchemaContext schemaContext) {
+ void updateSchemaContext(final SchemaContext schemaContext) {
store.onGlobalContextUpdated(schemaContext);
}
- private void registerChangeListener(RegisterChangeListener registerChangeListener) {
+ private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> doChangeListenerRegistration(
- RegisterChangeListener registerChangeListener) {
+ final RegisterChangeListener registerChangeListener) {
ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
registerChangeListener.getDataChangeListenerPath());
@Override
protected
- void startLogRecoveryBatch(int maxBatchSize) {
+ void startLogRecoveryBatch(final int maxBatchSize) {
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
if(LOG.isDebugEnabled()) {
}
@Override
- protected void appendRecoveredLogEntry(Payload data) {
+ protected void appendRecoveredLogEntry(final Payload data) {
if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else {
}
@Override
- protected void applyRecoverySnapshot(ByteString snapshot) {
+ protected void applyRecoverySnapshot(final ByteString snapshot) {
if(recoveryCoordinator == null) {
recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
}
}
@Override
- protected void applyState(ActorRef clientActor, String identifier, Object data) {
+ protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
@VisibleForTesting
@Override
- protected void applySnapshot(ByteString snapshot) {
+ protected void applySnapshot(final ByteString snapshot) {
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
return dataPersistenceProvider;
}
- @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
+ @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.setLeader(newLeader);
}
final DatastoreContext datastoreContext;
final SchemaContext schemaContext;
- ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
this.name = name;
this.peerAddresses = peerAddresses;
this.datastoreContext = datastoreContext;
private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> delegate;
- DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
+ DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
this.registerChangeListener = registerChangeListener;
}
- void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration) {
this.delegate = registration;
}
*/
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
/**
* @author Basheeruddin syedbahm@cisco.com
*
private static final Logger LOG = LoggerFactory.getLogger(ShardMBeanFactory.class);
- private static Cache<String,ShardStats> shardMBeansCache =
+ private static final Cache<String,ShardStats> shardMBeansCache =
CacheBuilder.newBuilder().weakValues().build();
public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType) {
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
-
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
/**
* Maintains statistics for a shard.
*
private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
- private ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
-
private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
private final SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- public ShardStats(String shardName, String mxBeanType) {
+ public ShardStats(final String shardName, final String mxBeanType) {
super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
}
- public void setDataStoreExecutor(ExecutorService dsExecutor) {
- this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor);
- }
-
- public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
+ public void setNotificationManager(final QueuedNotificationManager<?, ?> manager) {
this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
"notification-manager", getMBeanType(), getMBeanCategory());
return abortTransactionsCount.incrementAndGet();
}
- public void setLeader(String leader) {
+ public void setLeader(final String leader) {
this.leader = leader;
}
- public void setRaftState(String raftState) {
+ public void setRaftState(final String raftState) {
this.raftState = raftState;
}
- public void setLastLogTerm(long lastLogTerm) {
+ public void setLastLogTerm(final long lastLogTerm) {
this.lastLogTerm = lastLogTerm;
}
- public void setLastLogIndex(long lastLogIndex) {
+ public void setLastLogIndex(final long lastLogIndex) {
this.lastLogIndex = lastLogIndex;
}
- public void setCurrentTerm(long currentTerm) {
+ public void setCurrentTerm(final long currentTerm) {
this.currentTerm = currentTerm;
}
- public void setCommitIndex(long commitIndex) {
+ public void setCommitIndex(final long commitIndex) {
this.commitIndex = commitIndex;
}
- public void setLastApplied(long lastApplied) {
+ public void setLastApplied(final long lastApplied) {
this.lastApplied = lastApplied;
}
- public void setLastCommittedTransactionTime(long lastCommittedTransactionTime) {
+ public void setLastCommittedTransactionTime(final long lastCommittedTransactionTime) {
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
@Override
public ThreadExecutorStats getDataStoreExecutorStats() {
- return dataStoreExecutorStatsBean == null ? null :
- dataStoreExecutorStatsBean.toThreadExecutorStats();
+ // FIXME: this particular thing does not work, as it really is DS-specific
+ return null;
}
@Override
abortTransactionsCount.set(0);
}
+
+ public void setDataStore(final InMemoryDOMDataStore store) {
+ setNotificationManager(store.getDataChangeListenerNotificationManager());
+ }
}
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import static org.junit.Assert.assertEquals;
-
public class DataChangeListenerRegistrationTest extends AbstractActorTest {
- private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
- private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
- MoreExecutors.sameThreadExecutor());
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
static {
store.onGlobalContextUpdated(TestModel.createTestContext());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
@Override
- protected String match(Object in) {
+ protected String match(final Object in) {
if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
return "match";
} else {
private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
@Override
- public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
}
};
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-
public class ShardTest extends AbstractActorTest {
private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
*/
@Test
public void testInMemoryDataStoreRestore() throws ReadFailedException {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
- MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
store.onGlobalContextUpdated(SCHEMA_CONTEXT);
NormalizedNode<?, ?> actual = readStore(store);
assertEquals(expected, actual);
-
}
@Test
import akka.actor.Props;
import akka.pattern.AskTimeoutException;
import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
* @author Basheeruddin Ahmed <syedbahm@cisco.com>
*/
public class ShardTransactionFailureTest extends AbstractActorTest {
- private static ListeningExecutorService storeExecutor =
- MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
private static final InMemoryDOMDataStore store =
- new InMemoryDOMDataStore("OPER", storeExecutor,
- MoreExecutors.sameThreadExecutor());
+ new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
private static final SchemaContext testSchemaContext =
TestModel.createTestContext();
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
public class ShardTransactionTest extends AbstractActorTest {
- private static ListeningExecutorService storeExecutor =
- MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
private static final InMemoryDOMDataStore store =
- new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
+ new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
private static final SchemaContext testSchemaContext = TestModel.createTestContext();
@Before
public void setUp(){
- store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor(),
- MoreExecutors.sameThreadExecutor());
+ store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
store.onGlobalContextUpdated(TestModel.createTestContext());
}
- protected void commitTransaction(DOMStoreWriteTransaction transaction){
+ protected void commitTransaction(final DOMStoreWriteTransaction transaction){
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
cohort.preCommit();
cohort.commit();
}
- protected Optional<NormalizedNode<?,?>> readData(YangInstanceIdentifier path) throws Exception{
+ protected Optional<NormalizedNode<?,?>> readData(final YangInstanceIdentifier path) throws Exception{
DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(path);
return future.get();
import static org.junit.Assert.assertTrue;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class DOMBrokerPerformanceTest {
private static final Logger log = LoggerFactory.getLogger(DOMBrokerPerformanceTest.class);
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
public void setupStore() {
InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ MoreExecutors.sameThreadExecutor());
InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
import static org.junit.Assert.fail;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class DOMTransactionChainTest {
private SchemaContext schemaContext;
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
getMaxDataStoreExecutorQueueSize()));
- InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore",
- dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
-
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore", dataStore);
dataStore.setCloseable(statsBean);
return dataStore;
getMaxDataStoreExecutorQueueSize()));
- InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore",
- dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore", dataStore);
dataStore.setCloseable(statsBean);
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
private final ExecutorService dataChangeListenerExecutor;
- private final ListeningExecutorService commitExecutor;
private final boolean debugTransactions;
private final String name;
private volatile AutoCloseable closeable;
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
- final ExecutorService dataChangeListenerExecutor) {
- this(name, commitExecutor, dataChangeListenerExecutor,
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
+ public InMemoryDOMDataStore(final String name, final ExecutorService dataChangeListenerExecutor) {
+ this(name, dataChangeListenerExecutor, InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
}
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
- final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
- final boolean debugTransactions) {
+ public InMemoryDOMDataStore(final String name, final ExecutorService dataChangeListenerExecutor,
+ final int maxDataChangeListenerQueueSize, final boolean debugTransactions) {
this.name = Preconditions.checkNotNull(name);
- this.commitExecutor = Preconditions.checkNotNull(commitExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
this.debugTransactions = debugTransactions;
return dataChangeListenerNotificationManager;
}
- public ExecutorService getDomStoreExecutor() {
- return commitExecutor;
- }
-
@Override
public final String getIdentifier() {
return name;
@Override
public void close() {
- ExecutorServiceUtil.tryGracefulShutdown(commitExecutor, 30, TimeUnit.SECONDS);
ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
if(closeable != null) {
@Override
public ListenableFuture<Boolean> canCommit() {
- return commitExecutor.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws TransactionCommitFailedException {
- try {
- dataTree.validate(modification);
- LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
- return true;
- } catch (ConflictingModificationAppliedException e) {
- LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
- e.getPath());
- transaction.warnDebugContext(LOG);
- throw new OptimisticLockFailedException("Optimistic lock failed.",e);
- } catch (DataValidationFailedException e) {
- LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
- e.getPath(), e);
- transaction.warnDebugContext(LOG);
- throw new TransactionCommitFailedException("Data did not pass validation.",e);
- }
- }
- });
+ try {
+ dataTree.validate(modification);
+ LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
+ return CAN_COMMIT_FUTURE;
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
+ e.getPath());
+ transaction.warnDebugContext(LOG);
+ return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+ } catch (DataValidationFailedException e) {
+ LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
+ e.getPath(), e);
+ transaction.warnDebugContext(LOG);
+ return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in validation phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
}
@Override
public ListenableFuture<Void> preCommit() {
- return commitExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() {
- candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
- return null;
- }
- });
+ try {
+ candidate = dataTree.prepare(modification);
+ listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+ return SUCCESSFUL_FUTURE;
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in pre-commit phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
}
@Override
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
- final ListeningExecutorService commitExecutor = MoreExecutors.sameThreadExecutor();
- final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
- commitExecutor, dataChangeListenerExecutor,
+ final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name, dataChangeListenerExecutor,
actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
if (schemaService != null) {
package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
-import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
/**
public class InMemoryDataStoreStats implements AutoCloseable {
private final AbstractMXBean notificationExecutorStatsBean;
- private final AbstractMXBean dataStoreExecutorStatsBean;
private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
- public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager,
- final ExecutorService dataStoreExecutor) {
+ public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager) {
notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
"notification-manager", mBeanType, null);
if (notificationExecutorStatsBean != null) {
notificationExecutorStatsBean.registerMBean();
}
+ }
- dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dataStoreExecutor,
- "data-store-executor", mBeanType, null);
- if (dataStoreExecutorStatsBean != null) {
- dataStoreExecutorStatsBean.registerMBean();
- }
+ public InMemoryDataStoreStats(final String name, final InMemoryDOMDataStore dataStore) {
+ this(name, dataStore.getDataChangeListenerNotificationManager());
}
@Override
notificationExecutorStatsBean.unregisterMBean();
}
- if(dataStoreExecutorStatsBean != null) {
- dataStoreExecutorStatsBean.unregisterMBean();
- }
-
if(notificationManagerStatsBean != null) {
notificationManagerStatsBean.unregisterMBean();
}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import com.google.common.util.concurrent.MoreExecutors;
-
import java.util.Collection;
import java.util.Map;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
dclExecutorService = new TestDCLExecutorService(
SpecialExecutors.newBlockingBoundedFastThreadPool(1, 10, "DCL" ));
- datastore = new InMemoryDOMDataStore("TEST",
- MoreExecutors.sameThreadExecutor(), dclExecutorService );
+ datastore = new InMemoryDOMDataStore("TEST", dclExecutorService);
datastore.onGlobalContextUpdated(schemaContext);
}
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-
import java.util.concurrent.ExecutionException;
-
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
- MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
domStore.onGlobalContextUpdated(schemaContext);
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import static org.junit.Assert.assertNotNull;
-
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class SchemaUpdateForTransactionTest {
private static final YangInstanceIdentifier TOP_PATH = YangInstanceIdentifier.of(Top.QNAME);
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
- MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
loadSchemas(RockTheHouseInput.class);
}
}
public DOMStore createConfigurationDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
public DOMStore createOperationalDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}