6cd8fcf2f5f8071ec942705be33636073a87c6db
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / LifecycleConductorImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.Timeout;
19 import io.netty.util.TimerTask;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
28 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
29 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChangeListener;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
37 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
38 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
39 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
43 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
44 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
45 import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
46 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  */
54 final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener, ExtensionConverterProviderKeeper {
55
56     private static final Logger LOG = LoggerFactory.getLogger(LifecycleConductorImpl.class);
57     private static final int TICKS_PER_WHEEL = 500;
58     private static final long TICK_DURATION = 10; // 0.5 sec.
59
60     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
61     private ExtensionConverterProvider extensionConverterProvider;
62     private DeviceManager deviceManager;
63     private StatisticsManager statisticsManager;
64     private RpcManager rpcManager;
65     private final MessageIntelligenceAgency messageIntelligenceAgency;
66     private final ConvertorExecutor convertorExecutor;
67     private ConcurrentHashMap<DeviceInfo, ServiceChangeListener> serviceChangeListeners = new ConcurrentHashMap<>();
68     private NotificationPublishService notificationPublishService;
69
70     LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency, ConvertorExecutor convertorExecutor) {
71         this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency);
72         this.convertorExecutor = convertorExecutor;
73     }
74
75     @Override
76     public ExtensionConverterProvider getExtensionConverterProvider() {
77         return extensionConverterProvider;
78     }
79
80     @Override
81     public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
82         this.extensionConverterProvider = extensionConverterProvider;
83     }
84
85     @Override
86     public void setSafelyManager(final OFPManager manager){
87         if (manager instanceof RpcManager) {
88             if (rpcManager != null) {
89                 LOG.info("RPC manager {} is already defined in conductor. ", manager);
90                 return;
91             }
92             this.rpcManager = (RpcManager) manager;
93         } else {
94             if (manager instanceof StatisticsManager) {
95                 if (statisticsManager != null) {
96                     LOG.info("Statistics manager {} is already defined in conductor. ", manager);
97                     return;
98                 }
99                 this.statisticsManager = (StatisticsManager) manager;
100             } else {
101                 if (manager instanceof DeviceManager) {
102                     if (deviceManager != null) {
103                         LOG.info("Device manager {} is already defined in conductor. ", manager);
104                         return;
105                     }
106                     this.deviceManager = (DeviceManager) manager;
107                 }
108             }
109         }
110     }
111
112     @Override
113     public void addOneTimeListenerWhenServicesChangesDone(final ServiceChangeListener manager, final DeviceInfo deviceInfo){
114         LOG.debug("Listener {} for service change for node {} registered.", manager, deviceInfo.getNodeId());
115         serviceChangeListeners.put(deviceInfo, manager);
116     }
117
118     @VisibleForTesting
119     void notifyServiceChangeListeners(final DeviceInfo deviceInfo, final boolean success){
120         if (serviceChangeListeners.size() == 0) {
121             return;
122         }
123         LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size());
124         for (final Map.Entry<DeviceInfo, ServiceChangeListener> nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) {
125             if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) {
126                 LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo.getNodeId().getValue(), success);
127                 nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success);
128                 serviceChangeListeners.remove(deviceInfo);
129             }
130         }
131     }
132
133     @Override
134     public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
135         if (!success) {
136             LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
137             closeConnection(deviceInfo);
138         } else {
139             LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo.getNodeId().getValue());
140         }
141     }
142
143     public void closeConnection(final DeviceInfo deviceInfo) {
144         LOG.debug("Close connection called for node {}", deviceInfo);
145         final DeviceContext deviceContext = getDeviceContext(deviceInfo);
146         if (null != deviceContext) {
147             deviceManager.notifyDeviceValidListeners(deviceInfo, false);
148             deviceContext.shutdownConnection();
149         }
150     }
151
152     @Override
153     public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) {
154
155         final DeviceContext deviceContext = Preconditions.checkNotNull(
156                 deviceManager.gainContext(deviceInfo),
157                 "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue()
158         );
159
160         final RpcContext rpcContext =  Preconditions.checkNotNull(
161                 rpcManager.gainContext(deviceInfo),
162                 "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue()
163         );
164
165         LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo.getNodeId().getValue());
166
167         final String logText;
168
169         if (OfpRole.BECOMEMASTER.equals(newRole)) {
170             logText = "Start";
171             fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
172             MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider, convertorExecutor);
173
174             if (rpcContext.isStatisticsRpcEnabled()) {
175                 MdSalRegistrationUtils.registerStatCompatibilityServices(
176                         rpcContext,
177                         deviceContext,
178                         notificationPublishService, convertorExecutor);
179             }
180         } else {
181             logText = "Stopp";
182             statisticsManager.stopScheduling(deviceInfo);
183
184             // Clean device flow registry if we became slave
185             if (OfpRole.BECOMESLAVE.equals(newRole)) {
186                 deviceContext.getDeviceFlowRegistry().close();
187             }
188
189             MdSalRegistrationUtils.unregisterServices(rpcContext);
190         }
191
192         final ListenableFuture<Void> onClusterRoleChange = deviceManager.onClusterRoleChange(deviceInfo, newRole);
193         Futures.addCallback(onClusterRoleChange, new FutureCallback<Void>() {
194             @Override
195             public void onSuccess(@Nullable final Void aVoid) {
196                 LOG.info("{}ing services for node {} was successful", logText, deviceInfo);
197                 if (newRole.equals(OfpRole.BECOMESLAVE)) {
198                     notifyServiceChangeListeners(deviceInfo, true);
199                 }
200             }
201
202             @Override
203             public void onFailure(final Throwable throwable) {
204                 LOG.warn("{}ing services for node {} was NOT successful, closing connection", logText, deviceInfo.getNodeId().getValue());
205                 closeConnection(deviceInfo);
206             }
207         });
208     }
209
210     private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
211         // Fill device flow registry with flows from datastore
212         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
213
214         // Start statistics scheduling only after we finished initializing device flow registry
215         Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
216             @Override
217             public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
218                 if (LOG.isDebugEnabled()) {
219                     // Count all flows we read from datastore for debugging purposes.
220                     // This number do not always represent how many flows were actually added
221                     // to DeviceFlowRegistry, because of possible duplicates.
222                     long flowCount = Optional.fromNullable(result).asSet().stream()
223                             .flatMap(Collection::stream)
224                             .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
225                             .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
226                             .flatMap(table -> table.getFlow().stream())
227                             .count();
228
229                     LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId().getValue());
230                 }
231
232                 statisticsManager.startScheduling(deviceInfo);
233             }
234
235             @Override
236             public void onFailure(Throwable t) {
237                 // If we manually cancelled this future, do not start scheduling of statistics
238                 if (deviceFlowRegistryFill.isCancelled()) {
239                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId().getValue());
240                 } else {
241                     LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId().getValue(), t);
242                     statisticsManager.startScheduling(deviceInfo);
243                 }
244             }
245         });
246     }
247
248     public MessageIntelligenceAgency getMessageIntelligenceAgency() {
249         return messageIntelligenceAgency;
250     }
251
252     @Override
253     public DeviceContext getDeviceContext(DeviceInfo deviceInfo){
254          return deviceManager.gainContext(deviceInfo);
255     }
256
257     @Override
258     public StatisticsContext getStatisticsContext(DeviceInfo deviceInfo){
259         return statisticsManager.gainContext(deviceInfo);
260     }
261
262     public Timeout newTimeout(@Nonnull TimerTask task, long delay, @Nonnull TimeUnit unit) {
263         return hashedWheelTimer.newTimeout(task, delay, unit);
264     }
265
266     @Override
267     public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){
268         return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getConnectionState() : null;
269     }
270
271     @Override
272     public Long reserveXidForDeviceMessage(final DeviceInfo deviceInfo){
273         return null != getDeviceContext(deviceInfo) ? getDeviceContext(deviceInfo).reserveXidForDeviceMessage() : null;
274     }
275
276     @Override
277     public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
278         if (!success) {
279             LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
280             closeConnection(deviceInfo);
281         } else {
282             LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo.getNodeId().getValue());
283         }
284     }
285
286     @Override
287     public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
288         if (!success) {
289             LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
290             closeConnection(deviceInfo);
291         } else {
292             LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo.getNodeId().getValue());
293         }
294     }
295
296     @VisibleForTesting
297     boolean isServiceChangeListenersEmpty() {
298         return this.serviceChangeListeners.isEmpty();
299     }
300
301     @Override
302     public NotificationPublishService getNotificationPublishService() {
303         return notificationPublishService;
304     }
305
306     @Override
307     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
308         this.notificationPublishService = notificationPublishService;
309     }
310 }