Clustered sources resolution 26/31326/12
authorJakub Morvay <jmorvay@cisco.com>
Thu, 10 Dec 2015 14:40:05 +0000 (15:40 +0100)
committerTomas Cere <tcere@cisco.com>
Mon, 21 Dec 2015 12:05:34 +0000 (13:05 +0100)
Change-Id: I9fe167ab4185a8ac959b5e1aa6737d1eaff1d90c
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
opendaylight/netconf/netconf-topology/pom.xml
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolver.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolverImpl.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProvider.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderImpl.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderOnSameNodeException.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceClusteredDeviceSourcesResolverUp.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterOnSameNodeUp.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterSourceProviderUp.java [new file with mode: 0644]

index 94b0d40e19325ee7a07c22ee54c91e9da211310c..fb36a67f2d5dcbb3d92293b45ea86b3720afb31d 100644 (file)
             <version>1.6.5</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-model-api</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolver.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolver.java
new file mode 100644 (file)
index 0000000..6e0c1d1
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.TypedActor;
+import java.util.Set;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Future;
+
+public interface ClusteredDeviceSourcesResolver extends TypedActor.Receiver, TypedActor.PreStart {
+
+    Future<Set<SourceIdentifier>> getResolvedSources();
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolverImpl.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolverImpl.java
new file mode 100644 (file)
index 0000000..3a74e40
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.netconf.topology.pipeline;
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import java.util.List;
+import java.util.Set;
+import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+
+public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver {
+
+    private static Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolver.class);
+
+    private final String topologyId;
+    private final String nodeId;
+    private final ActorSystem actorSystem;
+    private final SchemaSourceRegistry schemaRegistry;
+    private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations;
+
+    private final Promise<Set<SourceIdentifier>> resolvedSourcesPromise;
+    private MasterSourceProvider remoteYangTextSourceProvider;
+
+    public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem,
+                                              SchemaSourceRegistry schemaRegistry,
+                                              List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations) {
+        this.topologyId = topologyId;
+        this.nodeId = nodeId;
+        this.actorSystem = actorSystem;
+        this.schemaRegistry = schemaRegistry;
+        this.sourceRegistrations = sourceRegistrations;
+        resolvedSourcesPromise = Futures.promise();
+    }
+
+    @Override
+    public void preStart(){
+        Cluster cluster = Cluster.get(actorSystem);
+        for(Member node : cluster.state().getMembers()) {
+            if(!node.address().equals(cluster.selfAddress())) {
+                final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/masterSourceProvider";
+                actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self());
+            }
+        }
+    }
+
+    @Override
+    public void onReceive(Object o, ActorRef actorRef) {
+        if(o instanceof AnnounceMasterSourceProviderUp) {
+            if(remoteYangTextSourceProvider == null) {
+                remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf(
+                        new TypedProps<>(MasterSourceProvider.class,
+                                MasterSourceProviderImpl.class), actorRef);
+                registerProvidedSourcesToSchemaRegistry();
+            }
+        } else if(o instanceof AnnounceMasterOnSameNodeUp) {
+            resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException());
+        }
+    }
+
+    private void registerProvidedSourcesToSchemaRegistry() {
+        Future<Set<SourceIdentifier>> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources();
+        resolvedSourcesPromise.completeWith(sourcesFuture);
+        final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher());
+
+        sourcesFuture.onComplete(new OnComplete<Set<SourceIdentifier>>() {
+            @Override
+            public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
+                for (SourceIdentifier sourceId : sourceIdentifiers) {
+                   sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider,
+                           PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+                }
+            }
+        }, actorSystem.dispatcher());
+    }
+
+    @Override
+    public Future<Set<SourceIdentifier>> getResolvedSources() {
+        return resolvedSourcesPromise.future();
+    }
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProvider.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProvider.java
new file mode 100644 (file)
index 0000000..f9726f4
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.TypedActor;
+import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
+
+public interface MasterSourceProvider
+        extends TypedActor.PreStart, TypedActor.Receiver, RemoteYangTextSourceProvider {
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderImpl.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderImpl.java
new file mode 100644 (file)
index 0000000..42375dc
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.TypedActor;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import java.util.Set;
+import org.opendaylight.controller.cluster.schema.provider.impl.RemoteYangTextSourceProviderImpl;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterSourceProviderImpl extends RemoteYangTextSourceProviderImpl
+        implements MasterSourceProvider {
+
+    private static Logger LOG = LoggerFactory.getLogger(MasterSourceProviderImpl.class);
+
+    private final ActorSystem actorSystem;
+    private final String topologyId;
+    private final String nodeId;
+
+    public MasterSourceProviderImpl(SchemaRepository schemaRepo, Set<SourceIdentifier> providedSources, ActorSystem actorSystem, String topologyId, String nodeId) {
+        super(schemaRepo, providedSources);
+        this.actorSystem = actorSystem;
+        this.topologyId = topologyId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void onReceive(Object o, ActorRef actorRef) {
+        if(o instanceof AnnounceClusteredDeviceSourcesResolverUp) {
+            LOG.debug("Received source resolver up");
+            actorRef.tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self());
+        }
+    }
+
+    @Override
+    public void preStart() {
+        Cluster cluster = Cluster.get(actorSystem);
+        cluster.join(cluster.selfAddress());
+        LOG.debug("Notifying members master schema source provider is up.");
+        for(Member node : cluster.state().getMembers()) {
+            final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/clusteredDeviceSourcesResolver";
+            if(node.address().equals(cluster.selfAddress())) {
+                actorSystem.actorSelection(path).tell(new AnnounceMasterOnSameNodeUp(), TypedActor.context().self());
+                actorSystem.actorSelection(path).tell(PoisonPill.getInstance(), TypedActor.context().self());
+            } else {
+                //TODO extract string constant to util class
+                actorSystem.actorSelection(path).tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self());
+            }
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderOnSameNodeException.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderOnSameNodeException.java
new file mode 100644 (file)
index 0000000..71f3005
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+public class MasterSourceProviderOnSameNodeException extends Exception {
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceClusteredDeviceSourcesResolverUp.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceClusteredDeviceSourcesResolverUp.java
new file mode 100644 (file)
index 0000000..f8328e7
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+public class AnnounceClusteredDeviceSourcesResolverUp implements Serializable {
+    public static final long serialVersionUID = 1L;
+
+    public AnnounceClusteredDeviceSourcesResolverUp() {}
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterOnSameNodeUp.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterOnSameNodeUp.java
new file mode 100644 (file)
index 0000000..793321c
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterOnSameNodeUp  implements Serializable {
+    public static long serialVersionUID = 1L;
+
+    public AnnounceMasterOnSameNodeUp() {
+
+    }
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterSourceProviderUp.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterSourceProviderUp.java
new file mode 100644 (file)
index 0000000..7bec681
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterSourceProviderUp implements Serializable {
+    public static final long serialVersionUID = 1L;
+
+    public AnnounceMasterSourceProviderUp() {
+
+    }
+}