import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import akka.persistence.Persistent;
import akka.persistence.UntypedProcessor;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
/**
private final InMemoryDOMDataStore store;
+ private final Map<Modification, DOMStoreThreePhaseCommitCohort> modificationToCohort = new HashMap<>();
+
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private Shard(String name){
registerChangeListener((RegisterChangeListener) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof ForwardedCommitTransaction ) {
+ handleForwardedCommit((ForwardedCommitTransaction) message);
+ } else if (message instanceof Persistent){
+ commit((Persistent) message);
+ }
+ }
+
+ private void commit(Persistent message) {
+ Modification modification = (Modification) message.payload();
+ DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification);
+ if(cohort == null){
+ log.error("Could not find cohort for modification : " + modification);
+ return;
}
+ final ListenableFuture<Void> future = cohort.commit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new CommitTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when committing");
+ }
+ }
+ }, getContext().dispatcher());
+ }
+
+ private void handleForwardedCommit(ForwardedCommitTransaction message) {
+ log.info("received forwarded transaction");
+ modificationToCohort.put(message.getModification(), message.getCohort());
+ getSelf().forward(Persistent.create(message.getModification()), getContext());
}
private void updateSchemaContext(UpdateSchemaContext message) {