c0ea15a01bcf8e2a30e88d09b984d1f8ca37681f
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / LifecycleConductorImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.Timeout;
19 import io.netty.util.TimerTask;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
28 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
29 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChangeListener;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
37 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
38 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
39 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
42 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
43 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
44 import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  */
52 final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener, ExtensionConverterProviderKeeper {
53
54     private static final Logger LOG = LoggerFactory.getLogger(LifecycleConductorImpl.class);
55     private static final int TICKS_PER_WHEEL = 500;
56     private static final long TICK_DURATION = 10; // 0.5 sec.
57
58     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
59     private ExtensionConverterProvider extensionConverterProvider;
60     private DeviceManager deviceManager;
61     private StatisticsManager statisticsManager;
62     private RpcManager rpcManager;
63     private final MessageIntelligenceAgency messageIntelligenceAgency;
64     private ConcurrentHashMap<DeviceInfo, ServiceChangeListener> serviceChangeListeners = new ConcurrentHashMap<>();
65     private NotificationPublishService notificationPublishService;
66
67     LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency) {
68         this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency);
69     }
70
71     @Override
72     public ExtensionConverterProvider getExtensionConverterProvider() {
73         return extensionConverterProvider;
74     }
75
76     @Override
77     public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
78         this.extensionConverterProvider = extensionConverterProvider;
79     }
80
81     @Override
82     public void setSafelyManager(final OFPManager manager){
83         if (manager instanceof RpcManager) {
84             if (rpcManager != null) {
85                 LOG.info("RPC manager {} is already defined in conductor. ", manager);
86                 return;
87             }
88             this.rpcManager = (RpcManager) manager;
89         } else {
90             if (manager instanceof StatisticsManager) {
91                 if (statisticsManager != null) {
92                     LOG.info("Statistics manager {} is already defined in conductor. ", manager);
93                     return;
94                 }
95                 this.statisticsManager = (StatisticsManager) manager;
96             } else {
97                 if (manager instanceof DeviceManager) {
98                     if (deviceManager != null) {
99                         LOG.info("Device manager {} is already defined in conductor. ", manager);
100                         return;
101                     }
102                     this.deviceManager = (DeviceManager) manager;
103                 }
104             }
105         }
106     }
107
108     @Override
109     public void addOneTimeListenerWhenServicesChangesDone(final ServiceChangeListener manager, final DeviceInfo deviceInfo){
110         LOG.debug("Listener {} for service change for node {} registered.", manager, deviceInfo.getNodeId());
111         serviceChangeListeners.put(deviceInfo, manager);
112     }
113
114     @VisibleForTesting
115     void notifyServiceChangeListeners(final DeviceInfo deviceInfo, final boolean success){
116         if (serviceChangeListeners.size() == 0) {
117             return;
118         }
119         LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size());
120         for (final Map.Entry<DeviceInfo, ServiceChangeListener> nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) {
121             if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) {
122                 LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo, success);
123                 nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success);
124                 serviceChangeListeners.remove(deviceInfo);
125             }
126         }
127     }
128
129     @Override
130     public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
131         if (!success) {
132             LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo);
133             closeConnection(deviceInfo);
134         } else {
135             LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo);
136         }
137     }
138
139     public void closeConnection(final DeviceInfo deviceInfo) {
140         LOG.debug("Close connection called for node {}", deviceInfo);
141         final DeviceContext deviceContext = getDeviceContext(deviceInfo);
142         if (null != deviceContext) {
143             deviceManager.notifyDeviceValidListeners(deviceInfo, false);
144             deviceContext.shutdownConnection();
145         }
146     }
147
148     @Override
149     public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) {
150
151         final DeviceContext deviceContext = Preconditions.checkNotNull(
152                 deviceManager.gainContext(deviceInfo),
153                 "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId()
154         );
155
156         final RpcContext rpcContext =  Preconditions.checkNotNull(
157                 rpcManager.gainContext(deviceInfo),
158                 "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId()
159         );
160
161         LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo);
162
163         final String logText;
164
165         if (OfpRole.BECOMEMASTER.equals(newRole)) {
166             logText = "Start";
167
168             // Fill device flow registry with flows from datastore
169             final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
170                     deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
171
172             // Start statistics scheduling only after we finished initializing device flow registry
173             Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
174                 @Override
175                 public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
176                     if (LOG.isDebugEnabled()) {
177                         // Count all flows we read from datastore for debugging purposes.
178                         // This number do not always represent how many flows were actually added
179                         // to DeviceFlowRegistry, because of possible duplicates.
180                         long flowCount = Optional.fromNullable(result).asSet().stream()
181                                 .flatMap(Collection::stream)
182                                 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
183                                 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
184                                 .flatMap(table -> table.getFlow().stream())
185                                 .count();
186
187                         LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
188                     }
189
190                     statisticsManager.startScheduling(deviceInfo);
191                 }
192
193                 @Override
194                 public void onFailure(Throwable t) {
195                     // If we manually cancelled this future, do not start scheduling of statistics
196                     if (deviceFlowRegistryFill.isCancelled()) {
197                         LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
198                     } else {
199                         LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
200                         statisticsManager.startScheduling(deviceInfo);
201                     }
202                 }
203             });
204
205             MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider);
206
207             if (rpcContext.isStatisticsRpcEnabled()) {
208                 MdSalRegistrationUtils.registerStatCompatibilityServices(
209                         rpcContext,
210                         deviceContext,
211                         notificationPublishService);
212             }
213
214             // Fill flow registry with flows found in operational and config datastore
215             deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
216         } else {
217             logText = "Stopp";
218             statisticsManager.stopScheduling(deviceInfo);
219
220             // Clean device flow registry if we became slave
221             if (OfpRole.BECOMESLAVE.equals(newRole)) {
222                 deviceContext.getDeviceFlowRegistry().close();
223             }
224
225             MdSalRegistrationUtils.unregisterServices(rpcContext);
226         }
227
228         final ListenableFuture<Void> onClusterRoleChange = deviceManager.onClusterRoleChange(deviceInfo, newRole);
229         Futures.addCallback(onClusterRoleChange, new FutureCallback<Void>() {
230             @Override
231             public void onSuccess(@Nullable final Void aVoid) {
232                 LOG.info("{}ing services for node {} was successful", logText, deviceInfo);
233                 if (newRole.equals(OfpRole.BECOMESLAVE)) {
234                     notifyServiceChangeListeners(deviceInfo, true);
235                 }
236             }
237
238             @Override
239             public void onFailure(final Throwable throwable) {
240                 LOG.warn("{}ing services for node {} was NOT successful, closing connection", logText, deviceInfo);
241                 closeConnection(deviceInfo);
242             }
243         });
244     }
245
246     public MessageIntelligenceAgency getMessageIntelligenceAgency() {
247         return messageIntelligenceAgency;
248     }
249
250     @Override
251     public DeviceContext getDeviceContext(DeviceInfo deviceInfo){
252          return deviceManager.gainContext(deviceInfo);
253     }
254
255     @Override
256     public StatisticsContext getStatisticsContext(DeviceInfo deviceInfo){
257         return statisticsManager.gainContext(deviceInfo);
258     }
259
260     public Timeout newTimeout(@Nonnull TimerTask task, long delay, @Nonnull TimeUnit unit) {
261         return hashedWheelTimer.newTimeout(task, delay, unit);
262     }
263
264     @Override
265     public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){
266         return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getConnectionState() : null;
267     }
268
269     @Override
270     public Long reserveXidForDeviceMessage(final DeviceInfo deviceInfo){
271         return null != getDeviceContext(deviceInfo) ? getDeviceContext(deviceInfo).reserveXidForDeviceMessage() : null;
272     }
273
274     @Override
275     public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
276         if (!success) {
277             LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
278             closeConnection(deviceInfo);
279         } else {
280             LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo);
281         }
282     }
283
284     @Override
285     public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
286         if (!success) {
287             LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
288             closeConnection(deviceInfo);
289         } else {
290             LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo);
291         }
292     }
293
294     @VisibleForTesting
295     boolean isServiceChangeListenersEmpty() {
296         return this.serviceChangeListeners.isEmpty();
297     }
298
299     @Override
300     public NotificationPublishService getNotificationPublishService() {
301         return notificationPublishService;
302     }
303
304     @Override
305     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
306         this.notificationPublishService = notificationPublishService;
307     }
308 }