package org.opendaylight.netconf.topology.pipeline;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.japi.Creator;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(TopologyMountPointFacade.class);
+ private static final String MOUNT_POINT = "mountpoint";
+
+ private final String topologyId;
private final RemoteDeviceId id;
private final Broker domBroker;
private final BindingAwareBroker bindingBroker;
private DOMRpcService deviceRpc = null;
private final ClusteredNetconfDeviceMountInstanceProxy salProvider;
+ private ActorSystem actorSystem;
+ private DOMDataBroker deviceDataBroker = null;
+
private final ArrayList<RemoteDeviceHandler<NetconfSessionPreferences>> connectionStatusListeners = new ArrayList<>();
- public TopologyMountPointFacade(final RemoteDeviceId id,
+ public TopologyMountPointFacade(final String topologyId,
+ final RemoteDeviceId id,
final Broker domBroker,
final BindingAwareBroker bindingBroker,
long defaultRequestTimeoutMillis) {
-
+ this.topologyId = topologyId;
this.id = id;
this.domBroker = domBroker;
this.bindingBroker = bindingBroker;
salProvider.getMountInstance().publish(domNotification);
}
- public void registerMountPoint() {
+ public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context) {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+ this.actorSystem = actorSystem;
+ final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
- final DOMDataBroker netconfDeviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ LOG.warn("Creating master data broker for device {}", id);
+ deviceDataBroker = TypedActor.get(context).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, new Creator<NetconfDeviceMasterDataBroker>() {
+ @Override
+ public NetconfDeviceMasterDataBroker create() throws Exception {
+ return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+ }
+ }), MOUNT_POINT);
+ LOG.warn("Master data broker registered on path {}", TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker).path());
+ salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+ final Cluster cluster = Cluster.get(actorSystem);
+ final Iterable<Member> members = cluster.state().getMembers();
+ final ActorRef deviceDataBrokerRef = TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker);
+ for (final Member member : members) {
+ if (!member.address().equals(cluster.selfAddress())) {
+ final String path = member.address() + "/user/" + topologyId + "/" + id.getName();
+ actorSystem.actorSelection(path).tell(new AnnounceMasterMountPoint(), deviceDataBrokerRef);
+ }
+ }
+ }
+
+ public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context, final ActorRef masterRef) {
+ Preconditions.checkNotNull(id);
+ Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
+ Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+ this.actorSystem = actorSystem;
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
- salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker, deviceRpc, notificationService);
+ LOG.warn("Creating a proxy for master data broker");
+ final ProxyNetconfDeviceDataBroker masterDataBroker = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, NetconfDeviceMasterDataBroker.class), masterRef);
+ LOG.warn("Creating slave data broker for device {}", id);
+ final DOMDataBroker deviceDataBroker = new NetconfDeviceSlaveDataBroker(actorSystem, id, masterDataBroker);
+ salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
}
public void unregisterMountPoint() {
salProvider.getMountInstance().onTopologyDeviceDisconnected();
+ if (deviceDataBroker != null) {
+ LOG.warn("Stopping master data broker for device {}", id.getName());
+ TypedActor.get(actorSystem).stop(deviceDataBroker);
+ deviceDataBroker = null;
+ }
}
public ConnectionStatusListenerRegistration registerConnectionStatusListener(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {