2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.pipeline;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorSystem;
13 import akka.actor.TypedActor;
14 import akka.actor.TypedProps;
15 import akka.dispatch.OnComplete;
16 import akka.japi.Creator;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.Sets;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
22 import java.util.concurrent.ExecutorService;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
26 import org.opendaylight.netconf.api.NetconfMessage;
27 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
28 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
29 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
30 import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
31 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
32 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
34 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
35 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
36 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
37 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapabilityBuilder;
40 import org.opendaylight.yangtools.yang.common.QName;
41 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
42 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
45 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
/**
 * {@link NetconfDevice} subclass that also implements {@link EntityOwnershipListener}.
 *
 * <p>Besides the inherited device handling, the visible code manages two Akka typed actors:
 * a {@link MasterSourceProvider} created in {@code handleSalInitializationSuccess} and a
 * {@link ClusteredDeviceSourcesResolver} created in {@code slaveSetupSchema}.
 */
49 public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
51 private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
// Communicator and session preferences remembered by onRemoteSessionUp; sessionPreferences is
// reset to null in onRemoteSessionDown.
53 private NetconfDeviceCommunicator listener;
54 private NetconfSessionPreferences sessionPreferences;
// Schema registry taken from the constructor's SchemaResourcesDTO, cast to SchemaRepository;
// passed to MasterSourceProviderImpl when the master source provider is created.
55 private SchemaRepository schemaRepo;
// Used to stop the typed actors, as the dispatcher for the resolver callback, and passed to
// both actor implementations.
56 private final ActorSystem actorSystem;
// Identifiers passed to both actor implementations; nodeId also appears in log messages.
57 private final String topologyId;
58 private final String nodeId;
// Context through which both typed actors are created (TypedActor.get(cachedContext)).
59 private final ActorContext cachedContext;
// Typed actor handles; null until the corresponding actor has been created.
61 private MasterSourceProvider masterSourceProvider = null;
62 private ClusteredDeviceSourcesResolver resolver = null;
/**
 * Creates the device: the first four arguments and {@code reconnectOnSchemaChanged} are forwarded
 * to the {@link NetconfDevice} constructor, the remaining arguments are stored for actor handling.
 *
 * @param schemaResourcesDTO forwarded to the superclass; its schema registry is additionally kept,
 *                           cast to {@link SchemaRepository}
 * @param id forwarded to the superclass
 * @param salFacade forwarded to the superclass
 * @param globalProcessingExecutor forwarded to the superclass
 * @param actorSystem stored in the field of the same name
 * @param topologyId stored in the field of the same name
 * @param nodeId node identifier (see the review note in the body)
 * @param cachedContext stored in the field of the same name
 * @param reconnectOnSchemaChanged forwarded to the superclass
 */
64 public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
65 final ExecutorService globalProcessingExecutor, final ActorSystem actorSystem, final String topologyId, final String nodeId,
66 final ActorContext cachedContext, final boolean reconnectOnSchemaChanged) {
67 super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, reconnectOnSchemaChanged);
// Unchecked downcast: fails with ClassCastException if the registry object is not a SchemaRepository.
68 this.schemaRepo = (SchemaRepository) schemaResourcesDTO.getSchemaRegistry();
69 this.actorSystem = actorSystem;
70 this.topologyId = topologyId;
// NOTE(review): no assignment of the final field nodeId is visible in this listing (the embedded
// numbering jumps from 70 to 72) - confirm that "this.nodeId = nodeId" exists in the full file.
72 this.cachedContext = cachedContext;
/**
 * Records a newly established session: logs the node id and capabilities, then remembers the
 * communicator and the session preferences in fields.
 *
 * <p>The visible lines do not call {@code super.onRemoteSessionUp}; {@code ownershipChanged}
 * makes that call with the stored values when this instance is the owner.
 * NOTE(review): the listing skips the numbered lines that follow (80-84), so any further
 * statements in this method are not shown here - confirm against the full file.
 *
 * @param remoteSessionCapabilities preferences of the new session, stored in {@code sessionPreferences}
 * @param listener communicator of the new session, stored in {@code listener}
 */
76 public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
// NOTE(review): logged at WARN although nothing here indicates an error - confirm the intended level.
77 LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
78 this.listener = listener;
79 this.sessionPreferences = remoteSessionCapabilities;
/**
 * Runs the superclass handling and then creates the {@link MasterSourceProvider} typed actor.
 *
 * <p>A {@link SourceIdentifier} is derived from every module identifier in {@code result}; the
 * resulting set is handed to {@code MasterSourceProviderImpl} together with the schema repository,
 * actor system, topology id and node id.
 *
 * @param result schema context whose module identifiers become the provided sources
 * @param remoteSessionCapabilities forwarded unchanged to the superclass
 * @param deviceRpc forwarded unchanged to the superclass
 */
85 protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
86 super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
88 final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
89 for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
// A module whose revision is the DEFAULT_DATE_REV object gets an absent revision; any other
// revision is rendered with SimpleDateFormatUtil.getRevisionFormat().
// NOTE(review): the check uses == (reference identity), not equals() - confirm that
// getRevision() returns the very same DEFAULT_DATE_REV instance for unrevisioned modules.
90 sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
91 Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
94 //TODO extract string constant to util class
95 LOG.debug("Creating master source provider");
// The typed actor is created through cachedContext under the name
// NetconfTopologyPathCreator.MASTER_SOURCE_PROVIDER; the handle is kept so that
// onRemoteSessionDown can stop it.
96 masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
97 new TypedProps<>(MasterSourceProvider.class,
98 new Creator<MasterSourceProviderImpl>() {
100 public MasterSourceProviderImpl create() throws Exception {
101 return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
103 }), NetconfTopologyPathCreator.MASTER_SOURCE_PROVIDER);
/**
 * Handles loss of the session: runs the superclass handling, clears the stored session
 * preferences and stops the typed actors created by this class.
 *
 * <p>NOTE(review): the listing skips the numbered line between the master branch and the
 * resolver shutdown (116), so whether the resolver is stopped unconditionally or only when no
 * master source provider exists cannot be told from here - confirm against the full file.
 */
107 public void onRemoteSessionDown() {
108 super.onRemoteSessionDown();
110 sessionPreferences = null;
111 if (masterSourceProvider != null) {
112 // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
113 LOG.debug("Stopping master source provider for node {}", nodeId);
// The actor was created via TypedActor.get(cachedContext) but is stopped via the actor system's extension.
114 TypedActor.get(actorSystem).stop(masterSourceProvider);
// Reset so that a later session-down does not try to stop the same actor again.
115 masterSourceProvider = null;
117 LOG.debug("Stopping slave source resolver for node {}", nodeId);
// NOTE(review): resolver is only assigned in slaveSetupSchema and is not nulled here; it is
// presumably still null if that method never ran - confirm stop() tolerates that.
118 TypedActor.get(actorSystem).stop(resolver);
/**
 * Sets up schema resolution through a {@link ClusteredDeviceSourcesResolver} typed actor.
 *
 * <p>Visible steps: create the resolver actor through {@code cachedContext}, ask it for the
 * resolved source identifiers, and, once they arrive, have {@code schemaContextFactory} build a
 * {@link SchemaContext} from them. A successfully built context has its capabilities recorded on
 * the session's {@link NetconfDeviceCapabilities} and is passed to the {@link NetconfDevice}
 * implementation of {@code handleSalInitializationSuccess}; failures are logged and passed to
 * {@code handleSalInitializationFailure}.
 *
 * <p>{@code id}, {@code schemaRegistry}, {@code sourceRegistrations} and
 * {@code schemaContextFactory} are not declared in the visible part of this class; presumably
 * they are inherited from {@link NetconfDevice}.
 *
 * <p>NOTE(review): this listing skips several numbered lines inside the method (closing braces
 * and possibly {@code else} keywords or comments), so the exact branch nesting in the completion
 * handler must be confirmed against the full file.
 */
123 private void slaveSetupSchema() {
124 //TODO extract string constant to util class
// Typed actor created under the name NetconfTopologyPathCreator.CLUSTERED_DEVICE_SOURCES_RESOLVER;
// the handle is kept so that onRemoteSessionDown can stop it.
125 resolver = TypedActor.get(cachedContext).typedActorOf(
126 new TypedProps<>(ClusteredDeviceSourcesResolver.class,
127 new Creator<ClusteredDeviceSourcesResolverImpl>() {
129 public ClusteredDeviceSourcesResolverImpl create() throws Exception {
130 return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
132 }), NetconfTopologyPathCreator.CLUSTERED_DEVICE_SOURCES_RESOLVER);
// Callback attached further down to the schema-context future.
134 final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
136 public void onSuccess(SchemaContext schemaContext) {
137 LOG.debug("{}: Schema context built successfully.", id);
// NOTE(review): sessionPreferences is set to null by onRemoteSessionDown; if the session drops
// before this callback runs, this dereference would presumably fail - confirm.
139 final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
140 final Set<AvailableCapability> providedSourcesQnames = Sets.newHashSet();
141 final Set<AvailableCapability> providedSourcesNonModuleCaps = Sets.newHashSet();
// One capability per module, rendered as the string form of its QName.
// The loop variable "id" shadows the "id" used in the log statements of this method.
142 for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
143 providedSourcesQnames.add(new AvailableCapabilityBuilder()
144 .setCapability(QName.create(id.getQNameModule(), id.getName()).toString()).build());
// Non-module capabilities of the session are wrapped the same way.
146 sessionPreferences.getNonModuleCaps().forEach(e -> providedSourcesNonModuleCaps.add(new AvailableCapabilityBuilder()
147 .setCapability(e).build()));
148 deviceCap.addNonModuleBasedCapabilities(providedSourcesNonModuleCaps);
149 deviceCap.addCapabilities(providedSourcesQnames);
// Qualified super call: invokes the NetconfDevice implementation directly, bypassing this
// class's override, so no master source provider is created on this path.
151 ClusteredNetconfDevice.super.handleSalInitializationSuccess(
152 schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
156 public void onFailure(Throwable throwable) {
157 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
158 handleSalInitializationFailure(throwable, listener);
// Completion handler for the resolver's answer; it runs on the actor system's dispatcher.
162 resolver.getResolvedSources().onComplete(
163 new OnComplete<Set<SourceIdentifier>>() {
165 public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
166 if(throwable != null) {
// NOTE(review): the numbered lines after this test (168-169) are missing from the listing, so
// what happens for MasterSourceProviderOnSameNodeException, and whether the warn/failure
// handling below belongs to this branch or to an else branch, cannot be told from here.
167 if(throwable instanceof MasterSourceProviderOnSameNodeException) {
170 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
171 handleSalInitializationFailure(throwable, listener);
// NOTE(review): lines 172-173 are missing as well; presumably the statements below form the
// no-error path - confirm.
174 LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
175 Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
178 }, actorSystem.dispatcher());
/**
 * Builds a {@link NetconfDeviceRpc} for the given schema context and communicator.
 *
 * @param result schema context passed to both the RPC object and its message transformer
 * @param listener communicator passed to the RPC object
 * @return a new {@link NetconfDeviceRpc} using a new {@link NetconfMessageTransformer}
 */
181 private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
// NOTE(review): the meaning of the boolean "true" passed to NetconfMessageTransformer is not
// visible in this file - confirm against that class.
182 return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
186 public void ownershipChanged(EntityOwnershipChange ownershipChange) {
187 LOG.debug("Entity ownership change received {}", ownershipChange);
188 if(ownershipChange.isOwner()) {
189 super.onRemoteSessionUp(sessionPreferences, listener);
190 } else if (ownershipChange.wasOwner()) {