X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=9cda3f1aa168ee43c40f715753a49ecdbace8369;hp=999d0f8bafca9639baa70ea7c893656f87704ae7;hb=a4b4ae2880afda94538e0edea05310cd6ace2f4a;hpb=07ba9a998f0b3c3045ed8e31afda5c96de141b3b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 999d0f8baf..9cda3f1aa1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -42,7 +43,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -74,10 +77,12 @@ public class Shard extends RaftActor { private final String name; - private SchemaContext schemaContext; + private volatile SchemaContext schemaContext; private final ShardStats shardMBean; + private final List dataChangeListeners = new ArrayList<>(); + private Shard(String name, Map peerAddresses) { super(name, peerAddresses); @@ -136,13 +141,29 @@ public class Shard extends RaftActor { } } + private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){ + if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){ + return getContext().actorOf( + ShardTransaction.props( store.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId); + + }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){ + return getContext().actorOf( + ShardTransaction.props( store.newReadWriteTransaction(), getSelf(), schemaContext), transactionId); + + + }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){ + return getContext().actorOf( + ShardTransaction.props( store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId); + }else{ + throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ; + } + } + private void createTransaction(CreateTransaction createTransaction) { - DOMStoreReadWriteTransaction transaction = - store.newReadWriteTransaction(); + String transactionId = "shard-" + createTransaction.getTransactionId(); LOG.info("Creating transaction : {} " , transactionId); - ActorRef transactionActor = getContext().actorOf( - ShardTransaction.props(transaction, getSelf(), schemaContext), transactionId); + ActorRef transactionActor = createTypedTransactionActor(createTransaction,transactionId); getSender() .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(), @@ -155,10 +176,23 @@ public class Shard extends RaftActor { modificationToCohort.remove(serialized); if (cohort == null) { LOG.error( - "Could not find cohort for modification : " + modification); + "Could not find cohort for modification : {}", modification); LOG.info("Writing modification using a new transaction"); - modification.apply(store.newReadWriteTransaction()); - return; + DOMStoreReadWriteTransaction transaction = + store.newReadWriteTransaction(); + modification.apply(transaction); + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + ListenableFuture future = + commitCohort.preCommit(); + try { + future.get(); + future = commitCohort.commit(); + future.get(); + } catch (InterruptedException e) { + LOG.error("Failed to commit", e); + } catch (ExecutionException e) { + LOG.error("Failed to commit", e); + } } final ListenableFuture future = cohort.commit(); @@ -213,6 +247,16 @@ public class Shard extends RaftActor { .system().actorSelection( registerChangeListener.getDataChangeListenerPath()); + + // Notify the listener if notifications should be enabled or not + // If this shard is the leader then it will enable notifications else + // it will not + dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf()); + + // Now store a reference to the data change listener so it can be notified + // at a later point if notifications should be enabled or disabled + dataChangeListeners.add(dataChangeListenerPath); + AsyncDataChangeListener> listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath); @@ -248,7 +292,14 @@ public class Shard extends RaftActor { if(data instanceof CompositeModificationPayload){ Object modification = ((CompositeModificationPayload) data).getModification(); - commit(clientActor, modification); + + if(modification != null){ + commit(clientActor, modification); + } else { + LOG.error("modification is null - this is very unexpected"); + } + + } else { LOG.error("Unknown state received {}", data); } @@ -263,6 +314,12 @@ public class Shard extends RaftActor { throw new UnsupportedOperationException("applySnapshot"); } + @Override protected void onStateChanged() { + for(ActorSelection dataChangeListener : dataChangeListeners){ + dataChangeListener.tell(new EnableNotification(isLeader()), getSelf()); + } + } + @Override public String persistenceId() { return this.name; }