RemoteDeviceDataBroker proxy
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / pipeline / TopologyMountPointFacade.java
index ec06c04b92f9e1fc9ac94885ad7b9e8e64ab7fc0..ec294b268f4528e94f5b9e495980a818a6a29165 100644 (file)
@@ -8,6 +8,14 @@
 
 package org.opendaylight.netconf.topology.pipeline;
 
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.japi.Creator;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
@@ -17,9 +25,9 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +36,9 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand
 
     private static final Logger LOG = LoggerFactory.getLogger(TopologyMountPointFacade.class);
 
+    private static final String MOUNT_POINT = "mountpoint";
+
+    private final String topologyId;
     private final RemoteDeviceId id;
     private final Broker domBroker;
     private final BindingAwareBroker bindingBroker;
@@ -38,13 +49,17 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand
     private DOMRpcService deviceRpc = null;
     private final ClusteredNetconfDeviceMountInstanceProxy salProvider;
 
+    private ActorSystem actorSystem;
+    private DOMDataBroker deviceDataBroker = null;
+
     private final ArrayList<RemoteDeviceHandler<NetconfSessionPreferences>> connectionStatusListeners = new ArrayList<>();
 
-    public TopologyMountPointFacade(final RemoteDeviceId id,
+    public TopologyMountPointFacade(final String topologyId,
+                                    final RemoteDeviceId id,
                                     final Broker domBroker,
                                     final BindingAwareBroker bindingBroker,
                                     long defaultRequestTimeoutMillis) {
-
+        this.topologyId = topologyId;
         this.id = id;
         this.domBroker = domBroker;
         this.bindingBroker = bindingBroker;
@@ -91,19 +106,54 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand
         salProvider.getMountInstance().publish(domNotification);
     }
 
-    public void registerMountPoint() {
+    public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context) {
         Preconditions.checkNotNull(id);
         Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
         Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+        this.actorSystem = actorSystem;
+        final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
-        final DOMDataBroker netconfDeviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+        LOG.warn("Creating master data broker for device {}", id);
+        deviceDataBroker = TypedActor.get(context).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, new Creator<NetconfDeviceMasterDataBroker>() {
+            @Override
+            public NetconfDeviceMasterDataBroker create() throws Exception {
+                return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+            }
+        }), MOUNT_POINT);
+        LOG.warn("Master data broker registered on path {}", TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker).path());
+        salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+        final Cluster cluster = Cluster.get(actorSystem);
+        final Iterable<Member> members = cluster.state().getMembers();
+        final ActorRef deviceDataBrokerRef = TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker);
+        for (final Member member : members) {
+            if (!member.address().equals(cluster.selfAddress())) {
+                final String path = member.address() + "/user/" + topologyId + "/" + id.getName();
+                actorSystem.actorSelection(path).tell(new AnnounceMasterMountPoint(), deviceDataBrokerRef);
+            }
+        }
+    }
+
+    public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context, final ActorRef masterRef) {
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
+        Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+        this.actorSystem = actorSystem;
         final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
-        salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker, deviceRpc, notificationService);
+        LOG.warn("Creating a proxy for master data broker");
+        final ProxyNetconfDeviceDataBroker masterDataBroker = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, NetconfDeviceMasterDataBroker.class), masterRef);
+        LOG.warn("Creating slave data broker for device {}", id);
+        final DOMDataBroker deviceDataBroker = new NetconfDeviceSlaveDataBroker(actorSystem, id, masterDataBroker);
+        salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
     }
 
     public void unregisterMountPoint() {
         salProvider.getMountInstance().onTopologyDeviceDisconnected();
+        if (deviceDataBroker != null) {
+            LOG.warn("Stopping master data broker for device {}", id.getName());
+            TypedActor.get(actorSystem).stop(deviceDataBroker);
+            deviceDataBroker = null;
+        }
     }
 
     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {