Merge "Remove RoleManager and RoleContext" into stable/boron
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / LifecycleServiceImpl.java
1 /**
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Objects;
19 import javax.annotation.Nullable;
20 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
21 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
22 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
23 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
24 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
25 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
26 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
27 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
28 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public class LifecycleServiceImpl implements LifecycleService {
34
35     private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
36
37     private boolean inClosing = false;
38     private DeviceContext deviceContext;
39     private RpcContext rpcContext;
40     private StatisticsContext statContext;
41     private ClusterSingletonServiceRegistration registration;
42     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
43
44
45     @Override
46     public void instantiateServiceInstance() {
47         LOG.info("Starting clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
48
49         if (!this.clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
50             this.closeConnection();
51         }
52
53     }
54
55     @Override
56     public ListenableFuture<Void> closeServiceInstance() {
57         LOG.info("Closing clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
58
59         final boolean connectionInterrupted =
60                 this.deviceContext
61                         .getPrimaryConnectionContext()
62                         .getConnectionState()
63                         .equals(ConnectionContext.CONNECTION_STATE.RIP);
64
65         // If connection was interrupted and we are not trying to close service, then we received something
66         // we do not wanted to receive, so do not continue
67         if (connectionInterrupted && !inClosing) {
68             LOG.warn("Failed to close clustering MASTER services for node {} because they are already closed",
69                     LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
70
71             return Futures.immediateCancelledFuture();
72         }
73
74         // Chain all jobs that will stop our services
75         final List<ListenableFuture<Void>> futureList = new ArrayList<>();
76         futureList.add(statContext.stopClusterServices(connectionInterrupted));
77         futureList.add(rpcContext.stopClusterServices(connectionInterrupted));
78         futureList.add(deviceContext.stopClusterServices(connectionInterrupted));
79
80         // When we stopped all jobs then we are not in closing state anymore (at least from plugin perspective)
81         return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
82             @Nullable
83             @Override
84             public Void apply(@Nullable List<Void> input) {
85                 LOG.debug("Closed clustering MASTER services for node {}",
86                         LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
87                 return null;
88             }
89         });
90     }
91
92     @Override
93     public ServiceGroupIdentifier getIdentifier() {
94         return deviceContext.getServiceIdentifier();
95     }
96
97
98     @Override
99     public void close() throws Exception {
100         // If we are still registered and we are not already closing, then close the registration
101         if (Objects.nonNull(registration) && !inClosing) {
102             inClosing = true;
103             registration.close();
104             registration = null;
105         }
106     }
107
108     @Override
109     public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
110         LOG.info("Registering clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
111
112         //lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
113         this.clusterInitializationPhaseHandler = deviceContext;
114         this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
115         this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
116         this.rpcContext.setLifecycleInitializationPhaseHandler(this);
117         //Set initial submit handler
118         this.statContext.setInitialSubmitHandler(this.deviceContext);
119         //Register cluster singleton service
120         this.registration = singletonServiceProvider.registerClusterSingletonService(this);
121     }
122
123     @Override
124     public void setDeviceContext(final DeviceContext deviceContext) {
125         this.deviceContext = deviceContext;
126     }
127
128     @Override
129     public void setRpcContext(final RpcContext rpcContext) {
130         this.rpcContext = rpcContext;
131     }
132
133     @Override
134     public void setStatContext(final StatisticsContext statContext) {
135         this.statContext = statContext;
136     }
137
138     @Override
139     public DeviceContext getDeviceContext() {
140         return this.deviceContext;
141     }
142
143     @Override
144     public void closeConnection() {
145         this.deviceContext.shutdownConnection();
146     }
147
148     private void fillDeviceFlowRegistry() {
149         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
150         Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
151     }
152
153     @Override
154     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
155         this.clusterInitializationPhaseHandler = handler;
156     }
157
158     @Override
159     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
160
161         if (ConnectionContext.CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
162             if (LOG.isDebugEnabled()) {
163                 LOG.debug("Connection to the device {} was interrupted.", this.deviceContext.getDeviceInfo().getLOGValue());
164             }
165             return false;
166         }
167
168         fillDeviceFlowRegistry();
169         return true;
170     }
171
172     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
173         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
174
175         public DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
176             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
177         }
178
179         @Override
180         public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
181             if (LOG.isDebugEnabled()) {
182                 // Count all flows we read from datastore for debugging purposes.
183                 // This number do not always represent how many flows were actually added
184                 // to DeviceFlowRegistry, because of possible duplicates.
185                 long flowCount = Optional.fromNullable(result).asSet().stream()
186                         .flatMap(Collection::stream)
187                         .filter(Objects::nonNull)
188                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
189                         .filter(Objects::nonNull)
190                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
191                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
192                         .filter(Objects::nonNull)
193                         .filter(table -> Objects.nonNull(table.getFlow()))
194                         .flatMap(table -> table.getFlow().stream())
195                         .filter(Objects::nonNull)
196                         .count();
197
198                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceContext.getDeviceInfo().getLOGValue());
199             }
200         }
201
202         @Override
203         public void onFailure(Throwable t) {
204             if (deviceFlowRegistryFill.isCancelled()) {
205                 if (LOG.isDebugEnabled()) {
206                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceContext.getDeviceInfo().getLOGValue());
207                 }
208             } else {
209                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceContext.getDeviceInfo().getLOGValue(), t);
210             }
211         }
212     }
213 }