import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
* A Shard represents a portion of the logical data tree <br/>
* <p>
- * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
* </p>
*/
public class Shard extends RaftActor {
static final String DEFAULT_NAME = "default";
// The state of this Shard
- private final InMemoryDOMDataStore store;
+ private final ShardDataTree store;
/// The name of this shard
private final String name;
private final MessageTracker appendEntriesReplyTracker;
- private final DOMTransactionFactory domTransactionFactory;
-
private final ShardTransactionActorFactory transactionActorFactory;
private final ShardSnapshotCohort snapshotCohort;
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
- datastoreContext.getDataStoreProperties());
-
- if (schemaContext != null) {
- store.onGlobalContextUpdated(schemaContext);
- }
+ store = new ShardDataTree(schemaContext);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
- shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
shardMBean.setShardActor(getSelf());
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
}
- domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
-
- commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
+ commitCoordinator = new ShardCommitCoordinator(store,
TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
- transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+ transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
new Dispatchers(context().system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
}
private void commitWithNewTransaction(final Modification modification) {
- DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
- modification.apply(tx);
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+ modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- } catch (InterruptedException | ExecutionException e) {
+ } catch (Exception e) {
shardMBean.incrementFailedTransactionsCount();
LOG.error("{}: Failed to commit", persistenceId(), e);
}
@VisibleForTesting
void updateSchemaContext(final SchemaContext schemaContext) {
- store.onGlobalContextUpdated(schemaContext);
+ store.updateSchemaContext(schemaContext);
}
private boolean isMetricsCaptureEnabled() {
persistenceId(), getId());
}
- domTransactionFactory.closeAllTransactionChains();
+ store.closeAllTransactionChains();
}
}
}
@VisibleForTesting
- public InMemoryDOMDataStore getDataStore() {
+ public ShardDataTree getDataStore() {
return store;
}