Change handling of netconf cluster transactions
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
index ba8f11fdc07d69fb006d0db31de6ebf5f6fb9f4a..9828e2476fcf7931295fbad29943b067e623548f 100644 (file)
@@ -22,14 +22,15 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
-import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
@@ -42,16 +43,11 @@ import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMount
 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
@@ -76,11 +72,13 @@ public class NetconfNodeActor extends UntypedActor {
     private final SchemaSourceRegistry schemaRegistry;
     private final SchemaRepository schemaRepository;
 
-    private RemoteOperationTxProcessor operationsProcessor;
     private List<SourceIdentifier> sourceIdentifiers;
     private DOMRpcService deviceRpc;
     private SlaveSalFacade slaveSalManager;
     private final Timeout actorResponseWaitTime;
+    private DOMDataBroker deviceDataBroker;
+    //readTxActor can be shared
+    private ActorRef readTxActor;
 
     public static Props props(final NetconfTopologySetup setup,
                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
@@ -103,11 +101,12 @@ public class NetconfNodeActor extends UntypedActor {
     public void onReceive(final Object message) throws Exception {
         if (message instanceof CreateInitialMasterActorData) { // master
 
-            sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
-            operationsProcessor =
-                    new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
-                            id);
-            this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+            final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
+            sourceIdentifiers = masterActorData.getSourceIndentifiers();
+            this.deviceDataBroker = masterActorData.getDeviceDataBroker();
+            final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
+            readTxActor = context().actorOf(ReadTransactionActor.props(tx));
+            this.deviceRpc = masterActorData.getDeviceRpc();
 
             sender().tell(new MasterActorDataInitialized(), self());
 
@@ -118,21 +117,30 @@ public class NetconfNodeActor extends UntypedActor {
             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
             sender().tell(new MasterActorDataInitialized(), self());
         } else if (message instanceof AskForMasterMountPoint) { // master
-            // only master contains reference to operations processor
-            if (operationsProcessor != null) {
+            // only master contains reference to deviceDataBroker
+            if (deviceDataBroker != null) {
                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
             }
 
-        } else if (message instanceof TransactionRequest) { // master
-
-            resolveProxyCalls(message, sender(), getSelf());
-
         } else if (message instanceof YangTextSchemaSourceRequest) { // master
 
             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
 
-        } else if (message instanceof InvokeRpcMessage) {
+        } else if (message instanceof NewReadTransactionRequest) { // master
+
+            sender().tell(new NewReadTransactionReply(readTxActor), self());
+
+        } else if (message instanceof NewWriteTransactionRequest) { // master
+            try {
+                final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
+                final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+                sender().tell(new NewWriteTransactionReply(txActor), self());
+            } catch (final Throwable t) {
+                sender().tell(t, self());
+            }
+
+        } else if (message instanceof InvokeRpcMessage) { // master
 
             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
@@ -151,43 +159,6 @@ public class NetconfNodeActor extends UntypedActor {
         }
     }
 
-    private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
-        if (message instanceof OpenTransaction) {
-            operationsProcessor.doOpenTransaction(recipient, futureSender);
-        } else if (message instanceof ReadRequest) {
-
-            final ReadRequest readRequest = (ReadRequest) message;
-            operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof ExistsRequest) {
-
-            final ExistsRequest readRequest = (ExistsRequest) message;
-            operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof MergeRequest) {
-
-            final MergeRequest mergeRequest = (MergeRequest) message;
-            operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof PutRequest) {
-
-            final PutRequest putRequest = (PutRequest) message;
-            operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof DeleteRequest) {
-
-            final DeleteRequest deleteRequest = (DeleteRequest) message;
-            operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
-
-        } else if (message instanceof CancelRequest) {
-
-            operationsProcessor.doCancel(recipient, futureSender);
-
-        } else if (message instanceof SubmitRequest) {
-
-            operationsProcessor.doSubmit(recipient, futureSender);
-        }
-    }
 
     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =