X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=a5abd2fc69059f4af377ab85ac161372de15cbed;hb=09b8efdb40105cd4cd3c21c9a9aea2c6687972be;hp=9551c800ca003788de3beaf2898e48e4809cf1bb;hpb=7d03757a62c1909ee74b42e6acf30d3d7b961bfa;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 9551c800ca..a5abd2fc69 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -60,8 +60,10 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -73,6 +75,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; @@ -111,9 +114,9 @@ public class Shard extends RaftActor { private final List delayedListenerRegistrations = Lists.newArrayList(); - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; - private final DataPersistenceProvider dataPersistenceProvider; + private DataPersistenceProvider dataPersistenceProvider; private SchemaContext schemaContext; @@ -121,7 +124,7 @@ public class Shard extends RaftActor { private final ShardCommitCoordinator commitCoordinator; - private final long transactionCommitTimeout; + private long transactionCommitTimeout; private Cancellable txCommitTimeoutCheckSchedule; @@ -137,6 +140,8 @@ public class Shard extends RaftActor { private final Map transactionChains = new HashMap<>(); + private final String txnDispatcherPath; + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), mapPeerAddresses(peerAddresses), @@ -145,7 +150,11 @@ public class Shard extends RaftActor { this.name = name; this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; - this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); + this.dataPersistenceProvider = (datastoreContext.isPersistent()) + ? new PersistentDataProvider() : new NonPersistentRaftDataProvider(); + this.txnDispatcherPath = new Dispatchers(context().system().dispatchers()) + .getDispatcherPath(Dispatchers.DispatcherType.Transaction); + LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); @@ -167,8 +176,7 @@ public class Shard extends RaftActor { commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES), datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString()); - transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( - datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + setTransactionCommitTimeout(); // create a notifier actor for each cluster member roleChangeNotifier = createRoleChangeNotifier(name.toString()); @@ -177,6 +185,11 @@ public class Shard extends RaftActor { getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); } + private void setTransactionCommitTimeout() { + transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + } + private static Map mapPeerAddresses( final Map peerAddresses) { Map map = new HashMap<>(); @@ -203,16 +216,20 @@ public class Shard extends RaftActor { private Optional createRoleChangeNotifier(String shardId) { ActorRef shardRoleChangeNotifier = this.getContext().actorOf( RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); - return Optional.of(shardRoleChangeNotifier); + return Optional.of(shardRoleChangeNotifier); } @Override public void postStop() { + LOG.info("Stopping Shard {}", persistenceId()); + super.postStop(); if(txCommitTimeoutCheckSchedule != null) { txCommitTimeoutCheckSchedule.cancel(); } + + shardMBean.unregisterMBean(); } @Override @@ -270,6 +287,10 @@ public class Shard extends RaftActor { resolved.getPeerAddress()); } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) { handleTransactionCommitTimeoutCheck(); + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); + } else if(message instanceof RegisterRoleChangeListener){ + roleChangeNotifier.get().forward(message, context()); } else { super.onReceiveCommand(message); } @@ -283,6 +304,24 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + + commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity()); + + setTransactionCommitTimeout(); + + if(datastoreContext.isPersistent() && + dataPersistenceProvider instanceof NonPersistentRaftDataProvider) { + dataPersistenceProvider = new PersistentDataProvider(); + } else if(!datastoreContext.isPersistent() && + dataPersistenceProvider instanceof PersistentDataProvider) { + dataPersistenceProvider = new NonPersistentRaftDataProvider(); + } + + updateConfigParams(datastoreContext.getShardRaftConfig()); + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { @@ -515,32 +554,19 @@ public class Shard extends RaftActor { shardMBean.incrementReadOnlyTransactionCount(); - return getContext().actorOf( - ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), - schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion), - transactionId.toString()); + return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { shardMBean.incrementReadWriteTransactionCount(); - return getContext().actorOf( - ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), - schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion), - transactionId.toString()); - + return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion); } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { shardMBean.incrementWriteOnlyTransactionCount(); - return getContext().actorOf( - ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), - schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion), - transactionId.toString()); + return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -548,6 +574,17 @@ public class Shard extends RaftActor { } } + private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId, + short clientVersion){ + return getContext().actorOf( + ShardTransaction.props(transaction, getSelf(), + schemaContext, datastoreContext, shardMBean, + transactionId.getRemoteTransactionId(), clientVersion) + .withDispatcher(txnDispatcherPath), + transactionId.toString()); + + } + private void createTransaction(CreateTransaction createTransaction) { try { ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),