Bug 6110: Fixed bugs in statistics manager due to race condition.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / LifecycleServiceImpl.java
1 /**
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Verify;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import javax.annotation.Nullable;
21 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
22 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
23 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
24 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
25 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
26 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
28 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
29 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
30 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
31 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
32 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 public class LifecycleServiceImpl implements LifecycleService {
38
39     private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
40     private DeviceContext deviceContext;
41     private RpcContext rpcContext;
42     private StatisticsContext statContext;
43     private ClusterSingletonServiceRegistration registration;
44     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
45     private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
46     private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
47
48
49     @Override
50     public void instantiateServiceInstance() {
51         LOG.info("Starting clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
52
53         if (!clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
54             closeConnection();
55         }
56     }
57
58     @Override
59     public ListenableFuture<Void> closeServiceInstance() {
60         final boolean connectionInterrupted =
61                 this.deviceContext
62                         .getPrimaryConnectionContext()
63                         .getConnectionState()
64                         .equals(ConnectionContext.CONNECTION_STATE.RIP);
65
66         // Chain all jobs that will stop our services
67         final List<ListenableFuture<Void>> futureList = new ArrayList<>();
68         futureList.add(statContext.stopClusterServices(connectionInterrupted));
69         futureList.add(rpcContext.stopClusterServices(connectionInterrupted));
70         futureList.add(deviceContext.stopClusterServices(connectionInterrupted));
71
72         return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
73             @Nullable
74             @Override
75             public Void apply(@Nullable List<Void> input) {
76                 LOG.debug("Closed clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
77                 return null;
78             }
79         });
80     }
81
82     @Override
83     public ServiceGroupIdentifier getIdentifier() {
84         return getServiceIdentifier();
85     }
86
87     @Override
88     public CONTEXT_STATE getState() {
89         return this.state;
90     }
91
92     @Override
93     public ServiceGroupIdentifier getServiceIdentifier() {
94         return deviceContext.getServiceIdentifier();
95     }
96
97     @Override
98     public DeviceInfo getDeviceInfo() {
99         return deviceContext.getDeviceInfo();
100     }
101
102     @Override
103     public void close() {
104         if (CONTEXT_STATE.TERMINATION.equals(getState())){
105             if (LOG.isDebugEnabled()) {
106                 LOG.debug("LifecycleService is already in TERMINATION state.");
107             }
108         } else {
109             this.state = CONTEXT_STATE.TERMINATION;
110
111             // We are closing, so cleanup all managers now
112             deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(getDeviceInfo()));
113
114             // If we are still registered and we are not already closing, then close the registration
115             if (Objects.nonNull(registration)) {
116                 try {
117                     LOG.debug("Closing clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
118                     registration.close();
119                 } catch (Exception e) {
120                     LOG.debug("Failed to close clustering MASTER services for node {} with exception: ",
121                             getDeviceInfo().getLOGValue(), e);
122                 }
123             }
124         }
125     }
126
127     @Override
128     public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
129         LOG.debug("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
130
131         // lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
132         this.clusterInitializationPhaseHandler = deviceContext;
133         this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
134         this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
135         this.rpcContext.setLifecycleInitializationPhaseHandler(this);
136         //Set initial submit handler
137         this.statContext.setInitialSubmitHandler(this.deviceContext);
138
139         // Register cluster singleton service
140         try {
141             this.registration = Verify.verifyNotNull(singletonServiceProvider.registerClusterSingletonService(this));
142             LOG.info("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
143         } catch (Exception e) {
144             LOG.warn("Failed to register cluster singleton service for node {}, with exception: {}", getDeviceInfo(), e);
145             closeConnection();
146         }
147     }
148
149     @Override
150     public void registerDeviceRemovedHandler(final DeviceRemovedHandler deviceRemovedHandler) {
151         if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
152             deviceRemovedHandlers.add(deviceRemovedHandler);
153         }
154     }
155
156     @Override
157     public void setDeviceContext(final DeviceContext deviceContext) {
158         this.deviceContext = deviceContext;
159     }
160
161     @Override
162     public void setRpcContext(final RpcContext rpcContext) {
163         this.rpcContext = rpcContext;
164     }
165
166     @Override
167     public void setStatContext(final StatisticsContext statContext) {
168         this.statContext = statContext;
169     }
170
171     @Override
172     public DeviceContext getDeviceContext() {
173         return this.deviceContext;
174     }
175
176     @Override
177     public void closeConnection() {
178         if (LOG.isDebugEnabled()) {
179             LOG.debug("Closing connection for node {}.", getDeviceInfo().getLOGValue());
180         }
181
182         this.deviceContext.shutdownConnection();
183     }
184
185     private void fillDeviceFlowRegistry() {
186         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
187         Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
188     }
189
190     @Override
191     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
192         this.clusterInitializationPhaseHandler = handler;
193     }
194
195     @Override
196     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
197         if (CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
198             if (LOG.isDebugEnabled()) {
199                 LOG.debug("Connection to the device {} was interrupted.", getDeviceInfo().getLOGValue());
200             }
201
202             return false;
203         }
204
205         fillDeviceFlowRegistry();
206         return true;
207     }
208
209     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
210         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
211
212         DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
213             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
214         }
215
216         @Override
217         public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
218             if (LOG.isDebugEnabled()) {
219                 // Count all flows we read from datastore for debugging purposes.
220                 // This number do not always represent how many flows were actually added
221                 // to DeviceFlowRegistry, because of possible duplicates.
222                 long flowCount = Optional.fromNullable(result).asSet().stream()
223                         .flatMap(Collection::stream)
224                         .filter(Objects::nonNull)
225                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
226                         .filter(Objects::nonNull)
227                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
228                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
229                         .filter(Objects::nonNull)
230                         .filter(table -> Objects.nonNull(table.getFlow()))
231                         .flatMap(table -> table.getFlow().stream())
232                         .filter(Objects::nonNull)
233                         .count();
234
235                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, getDeviceInfo().getLOGValue());
236             }
237         }
238
239         @Override
240         public void onFailure(Throwable t) {
241             if (deviceFlowRegistryFill.isCancelled()) {
242                 if (LOG.isDebugEnabled()) {
243                     LOG.debug("Cancelled filling flow registry with flows for node: {}", getDeviceInfo().getLOGValue());
244                 }
245             } else {
246                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", getDeviceInfo().getLOGValue(), t);
247             }
248         }
249     }
250 }