X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=d75edc7922f54ea46d72e4fd62fc4bc0e0aa3143;hp=f96cb14510a9d3b95ab4acd3820f8704d689af96;hb=refs%2Fchanges%2F70%2F8270%2F5;hpb=324c96119dec46d0fee5e641f0a26caac478c23b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index f96cb14510..d75edc7922 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -13,20 +13,29 @@ import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; +import akka.persistence.Persistent; import akka.persistence.UntypedProcessor; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; /** @@ -43,6 +52,8 @@ public class Shard extends UntypedProcessor { private final InMemoryDOMDataStore store; + private final Map modificationToCohort = new HashMap<>(); + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); private Shard(String name){ @@ -68,7 +79,40 @@ public class Shard extends UntypedProcessor { registerChangeListener((RegisterChangeListener) message); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); + } else if (message instanceof ForwardedCommitTransaction ) { + handleForwardedCommit((ForwardedCommitTransaction) message); + } else if (message instanceof Persistent){ + commit((Persistent) message); + } + } + + private void commit(Persistent message) { + Modification modification = (Modification) message.payload(); + DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification); + if(cohort == null){ + log.error("Could not find cohort for modification : " + modification); + return; } + final ListenableFuture future = cohort.commit(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + sender.tell(new CommitTransactionReply(), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when committing"); + } + } + }, getContext().dispatcher()); + } + + private void handleForwardedCommit(ForwardedCommitTransaction message) { + log.info("received forwarded transaction"); + modificationToCohort.put(message.getModification(), message.getCohort()); + getSelf().forward(Persistent.create(message.getModification()), getContext()); } private void updateSchemaContext(UpdateSchemaContext message) {