Change return type of events
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainImpl.java
1 /*
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import static org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState;
11
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.stream.Collectors;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
25 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
26 import org.opendaylight.openflowplugin.api.ConnectionException;
27 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
28 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
31 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
32 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
37 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
39 import org.opendaylight.yangtools.yang.common.RpcResult;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 public class ContextChainImpl implements ContextChain {
44     private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
45     private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
46     private final AtomicBoolean initialGathering = new AtomicBoolean(false);
47     private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
48     private final AtomicBoolean registryFilling = new AtomicBoolean(false);
49     private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
50     private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
51     private final List<OFPContext> contexts = new CopyOnWriteArrayList<>();
52     private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
53     private final ExecutorService executorService;
54     private final ContextChainMastershipWatcher contextChainMastershipWatcher;
55     private final DeviceInfo deviceInfo;
56     private final ConnectionContext primaryConnection;
57     private AutoCloseable registration;
58     private ContextState state = ContextState.INITIALIZATION;
59
60     private volatile ContextChainState contextChainState = ContextChainState.UNDEFINED;
61
62     ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
63                      @Nonnull final ConnectionContext connectionContext,
64                      @Nonnull final ExecutorService executorService) {
65         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
66         this.primaryConnection = connectionContext;
67         this.deviceInfo = connectionContext.getDeviceInfo();
68         this.executorService = executorService;
69     }
70
71     @Override
72     public <T extends OFPContext> void addContext(@Nonnull final T context) {
73         contexts.add(context);
74     }
75
76     @Override
77     public void instantiateServiceInstance() {
78         LOG.info("Starting clustering services for node {}", deviceInfo);
79
80         try {
81             contexts.forEach(this::initializeContextService);
82             LOG.info("Started clustering services for node {}", deviceInfo);
83         } catch (final Exception ex) {
84             executorService.submit(() -> contextChainMastershipWatcher
85                     .onNotAbleToStartMastershipMandatory(deviceInfo, ex.getMessage()));
86         }
87     }
88
89     @Override
90     public ListenableFuture<Void> closeServiceInstance() {
91         LOG.info("Closing clustering services for node {}", deviceInfo);
92         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
93
94         final ListenableFuture<List<Void>> servicesToBeClosed = Futures
95                 .successfulAsList(Lists.reverse(contexts)
96                         .stream()
97                         .map(this::closeContextService)
98                         .collect(Collectors.toList()));
99
100         return Futures.transform(servicesToBeClosed, (input) -> {
101             LOG.info("Closed clustering services for node {}", deviceInfo);
102             return null;
103         }, executorService);
104     }
105
106     @Nonnull
107     @Override
108     public ServiceGroupIdentifier getIdentifier() {
109         return deviceInfo.getServiceIdentifier();
110     }
111
112     @Override
113     public void close() {
114         if (ContextState.TERMINATION.equals(state)) {
115             LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
116             return;
117         }
118
119         state = ContextState.TERMINATION;
120         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
121
122         // Close all connections to devices
123         auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
124         auxiliaryConnections.clear();
125         primaryConnection.closeConnection(true);
126
127         // Close all contexts (device, statistics, rpc)
128         contexts.forEach(OFPContext::close);
129         contexts.clear();
130
131         // If we are still registered and we are not already closing, then close the registration
132         if (Objects.nonNull(registration)) {
133             try {
134                 LOG.info("Closing clustering services registration for node {}", deviceInfo);
135                 registration.close();
136                 registration = null;
137                 LOG.info("Closed clustering services registration for node {}", deviceInfo);
138             } catch (final Exception e) {
139                 LOG.warn("Failed to close clustering services registration for node {} with exception: ",
140                         deviceInfo, e);
141             }
142         }
143
144         // We are closing, so cleanup all managers now
145         deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
146         deviceRemovedHandlers.clear();
147     }
148
149     @Override
150     public void makeContextChainStateSlave() {
151         unMasterMe();
152         changeState(ContextChainState.WORKING_SLAVE);
153     }
154
155     @Override
156     public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
157         LOG.info("Registering clustering services for node {}", deviceInfo);
158         state = ContextState.WORKING;
159         registration = Objects.requireNonNull(clusterSingletonServiceProvider
160                 .registerClusterSingletonService(this));
161         LOG.info("Registered clustering services for node {}", deviceInfo);
162     }
163
164     @Override
165     public void makeDeviceSlave() {
166         unMasterMe();
167
168         contexts.stream()
169                 .filter(DeviceContext.class::isInstance)
170                 .map(DeviceContext.class::cast)
171                 .findAny()
172                 .ifPresent(deviceContext -> Futures
173                         .addCallback(
174                                 deviceContext.makeDeviceSlave(),
175                                 new DeviceSlaveCallback(),
176                                 executorService));
177     }
178
179     @Override
180     public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState) {
181         switch (mastershipState) {
182             case INITIAL_SUBMIT:
183                 LOG.debug("Device {}, initial submit OK.", deviceInfo);
184                 this.initialSubmitting.set(true);
185                 break;
186             case MASTER_ON_DEVICE:
187                 LOG.debug("Device {}, master state OK.", deviceInfo);
188                 this.masterStateOnDevice.set(true);
189                 break;
190             case INITIAL_GATHERING:
191                 LOG.debug("Device {}, initial gathering OK.", deviceInfo);
192                 this.initialGathering.set(true);
193                 break;
194             case RPC_REGISTRATION:
195                 LOG.debug("Device {}, RPC registration OK.", deviceInfo);
196                 this.rpcRegistration.set(true);
197                 break;
198             case INITIAL_FLOW_REGISTRY_FILL:
199                 // Flow registry fill is not mandatory to work as a master
200                 LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
201                 this.registryFilling.set(true);
202             case CHECK:
203             default:
204         }
205
206         final boolean result = initialGathering.get() &&
207                 masterStateOnDevice.get() &&
208                 initialSubmitting.get() &&
209                 rpcRegistration.get();
210
211         if (result && mastershipState != ContextChainMastershipState.CHECK) {
212             LOG.info("Device {} is able to work as master{}",
213                     deviceInfo,
214                     registryFilling.get() ? "." : " WITHOUT flow registry !!!");
215             changeState(ContextChainState.WORKING_MASTER);
216         }
217
218         return result;
219     }
220
221     @Override
222     public boolean isClosing() {
223         return ContextState.TERMINATION.equals(state);
224     }
225
226     @Override
227     public boolean isPrepared() {
228         return this.initialGathering.get() &&
229                 this.masterStateOnDevice.get() &&
230                 this.rpcRegistration.get();
231     }
232
233     @Override
234     public boolean continueInitializationAfterReconciliation() {
235         return contexts.stream()
236                 .filter(StatisticsContext.class::isInstance)
237                 .map(StatisticsContext.class::cast)
238                 .findAny()
239                 .map(StatisticsContext::initialSubmitAfterReconciliation)
240                 .orElse(false) &&
241         isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
242     }
243
244     @Override
245     public boolean addAuxiliaryConnection(@Nonnull ConnectionContext connectionContext) {
246         return (connectionContext.getFeatures().getAuxiliaryId() != 0)
247                 && (!ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState()))
248                 && auxiliaryConnections.add(connectionContext);
249     }
250
251     @Override
252     public boolean auxiliaryConnectionDropped(@Nonnull ConnectionContext connectionContext) {
253         return auxiliaryConnections.remove(connectionContext);
254     }
255
256     @Override
257     public void registerDeviceRemovedHandler(@Nonnull final DeviceRemovedHandler deviceRemovedHandler) {
258         deviceRemovedHandlers.add(deviceRemovedHandler);
259     }
260
261     private void changeState(final ContextChainState contextChainState) {
262         boolean propagate = this.contextChainState == ContextChainState.UNDEFINED;
263         this.contextChainState = contextChainState;
264
265         if (propagate) {
266             contexts.stream()
267                     .filter(ContextChainStateListener.class::isInstance)
268                     .map(ContextChainStateListener.class::cast)
269                     .forEach(listener -> listener.onStateAcquired(contextChainState));
270         }
271     }
272
273     private void initializeContextService(final OFPContext context) {
274         if (ConnectionContext.CONNECTION_STATE.WORKING.equals(primaryConnection.getConnectionState())) {
275             context.instantiateServiceInstance();
276         } else {
277             LOG.warn("Device connection for node {} doesn't exist anymore. Primary connection status: {}",
278                     deviceInfo,
279                     primaryConnection.getConnectionState());
280         }
281     }
282
283     private ListenableFuture<Void> closeContextService(final OFPContext context) {
284         if (ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())) {
285             final String errMsg = String
286                     .format("Device connection for node %s doesn't exist anymore. Primary connection status: %s",
287                             deviceInfo.toString(),
288                             primaryConnection.getConnectionState());
289
290             return Futures.immediateFailedFuture(new ConnectionException(errMsg));
291         }
292
293         return context.closeServiceInstance();
294     }
295
296     private void unMasterMe() {
297         registryFilling.set(false);
298         initialSubmitting.set(false);
299         initialGathering.set(false);
300         masterStateOnDevice.set(false);
301         rpcRegistration.set(false);
302     }
303
304     private final class DeviceSlaveCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
305         @Override
306         public void onSuccess(@Nullable final RpcResult<SetRoleOutput> result) {
307             contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
308         }
309
310         @Override
311         public void onFailure(@Nonnull final Throwable t) {
312             contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo);
313         }
314     }
315 }