X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=23e27c9f5ff909b2c9b887aa5f7687b92d7cda10;hb=eff56082a308f3be21cd4eefd03e0799a6b04714;hp=999d0f8bafca9639baa70ea7c893656f87704ae7;hpb=5ffd4b46ca00fc8f3d801050670c890117dc0811;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 999d0f8baf..23e27c9f5f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -15,9 +15,11 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.serialization.Serialization; + import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; + import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -25,6 +27,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -42,7 +45,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -74,10 +79,12 @@ public class Shard extends RaftActor { private final String name; - private SchemaContext schemaContext; + private volatile SchemaContext schemaContext; private final ShardStats shardMBean; + private final List dataChangeListeners = new ArrayList<>(); + private Shard(String name, Map peerAddresses) { super(name, peerAddresses); @@ -155,10 +162,23 @@ public class Shard extends RaftActor { modificationToCohort.remove(serialized); if (cohort == null) { LOG.error( - "Could not find cohort for modification : " + modification); + "Could not find cohort for modification : {}", modification); LOG.info("Writing modification using a new transaction"); - modification.apply(store.newReadWriteTransaction()); - return; + DOMStoreReadWriteTransaction transaction = + store.newReadWriteTransaction(); + modification.apply(transaction); + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + ListenableFuture future = + commitCohort.preCommit(); + try { + future.get(); + future = commitCohort.commit(); + future.get(); + } catch (InterruptedException e) { + LOG.error("Failed to commit", e); + } catch (ExecutionException e) { + LOG.error("Failed to commit", e); + } } final ListenableFuture future = cohort.commit(); @@ -213,6 +233,16 @@ public class Shard extends RaftActor { .system().actorSelection( registerChangeListener.getDataChangeListenerPath()); + + // Notify the listener if notifications should be enabled or not + // If this shard is the leader then it will enable notifications else + // it will not + dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf()); + + // Now store a reference to the data change listener so it can be notified + // at a later point if notifications should be enabled or disabled + dataChangeListeners.add(dataChangeListenerPath); + AsyncDataChangeListener> listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath); @@ -248,7 +278,14 @@ public class Shard extends RaftActor { if(data instanceof CompositeModificationPayload){ Object modification = ((CompositeModificationPayload) data).getModification(); - commit(clientActor, modification); + + if(modification != null){ + commit(clientActor, modification); + } else { + LOG.error("modification is null - this is very unexpected"); + } + + } else { LOG.error("Unknown state received {}", data); } @@ -263,6 +300,12 @@ public class Shard extends RaftActor { throw new UnsupportedOperationException("applySnapshot"); } + @Override protected void onStateChanged() { + for(ActorSelection dataChangeListener : dataChangeListeners){ + dataChangeListener.tell(new EnableNotification(isLeader()), getSelf()); + } + } + @Override public String persistenceId() { return this.name; }