Refactor TransactonContext
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / compat / PreLithiumTransactionContextImpl.java
index c3450333a46447d50aa16f6021fc2d48592a768b..2634adaf4cc97b225860cdd5a61c6b5124052bf8 100644 (file)
@@ -9,17 +9,21 @@ package org.opendaylight.controller.cluster.datastore.compat;
 
 import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.OperationCompleter;
-import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
+import org.opendaylight.controller.cluster.datastore.OperationLimiter;
+import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -30,51 +34,58 @@ import scala.concurrent.Future;
  *
  * @author Thomas Pantelis
  */
-public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
+@Deprecated
+public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class);
 
     private final String transactionPath;
 
-    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
-            String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationCompleter operationCompleter) {
-        super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
-                remoteTransactionVersion, operationCompleter);
+    public PreLithiumTransactionContextImpl(TransactionIdentifier identifier, String transactionPath, ActorSelection actor,
+            ActorContext actorContext, boolean isTxActorLocal,
+            short remoteTransactionVersion, OperationLimiter limiter) {
+        super(identifier, actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
         this.transactionPath = transactionPath;
     }
 
     @Override
-    public void deleteData(YangInstanceIdentifier path) {
-        recordOperationFuture(executeOperationAsync(
-                new DeleteData(path, getRemoteTransactionVersion())));
-    }
+    public void executeModification(AbstractModification modification) {
+        final short remoteTransactionVersion = getRemoteTransactionVersion();
+        final YangInstanceIdentifier path = modification.getPath();
+        VersionedExternalizableMessage msg = null;
 
-    @Override
-    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new MergeData(path, data, getRemoteTransactionVersion())));
-    }
+        if(modification instanceof DeleteModification) {
+            msg = new DeleteData(path, remoteTransactionVersion);
+        } else if(modification instanceof WriteModification) {
+            final NormalizedNode<?, ?> data = ((WriteModification) modification).getData();
 
-    @Override
-    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new WriteData(path, data, getRemoteTransactionVersion())));
+            // be sure to check for Merge before Write, since Merge is a subclass of Write
+            if(modification instanceof MergeModification) {
+                msg = new MergeData(path, data, remoteTransactionVersion);
+            } else {
+                msg = new WriteData(path, data, remoteTransactionVersion);
+            }
+        } else {
+            LOG.error("Invalid modification type " + modification.getClass().getName());
+        }
+
+        if(msg != null) {
+            executeOperationAsync(msg);
+        }
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the ReadyTransaction message to the Tx actor.
 
         Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
-        return combineRecordedOperationsFutures(lastReplyFuture);
+        return transformReadyReply(lastReplyFuture);
     }
 
     @Override
-    protected String deserializeCohortPath(String cohortPath) {
+    protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
         // In base Helium we used to return the local path of the actor which represented
         // a remote ThreePhaseCommitCohort. The local path would then be converted to
         // a remote path using this resolvePath method. To maintain compatibility with
@@ -82,10 +93,20 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
         // At some point in the future when upgrades from Helium are not supported
         // we could remove this code to resolvePath and just use the cohortPath as the
         // resolved cohortPath
-        if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            return getActorContext().resolvePath(transactionPath, cohortPath);
+        if (getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+            return PreLithiumTransactionReadyReplyMapper.transform(readyReplyFuture, getActorContext(), getIdentifier(), transactionPath);
+        } else {
+            return super.transformReadyReply(readyReplyFuture);
         }
+    }
+
+    @Override
+    public boolean supportsDirectCommit() {
+        return false;
+    }
 
-        return cohortPath;
+    @Override
+    public Future<Object> directCommit() {
+        throw new UnsupportedOperationException("directCommit is not supported for " + getClass());
     }
 }