Merge "Use ClassToInstanceMap instead of a HashMap"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainImpl.java
/*
 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.collect.Lists;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.List;
14 import java.util.Objects;
15 import java.util.concurrent.CopyOnWriteArrayList;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.atomic.AtomicBoolean;
18 import java.util.concurrent.atomic.AtomicReference;
19 import java.util.stream.Collectors;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
22 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
23 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
24 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
25 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
26 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
27 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
28 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
29 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
30 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
31 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
32 import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ReconciliationFrameworkStep;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 public class ContextChainImpl implements ContextChain {
38     private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
39
40     private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
41     private final AtomicBoolean initialGathering = new AtomicBoolean(false);
42     private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
43     private final AtomicBoolean registryFilling = new AtomicBoolean(false);
44     private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
45     private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
46     private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
47     private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
48     private final ExecutorService executorService;
49     private final ContextChainMastershipWatcher contextChainMastershipWatcher;
50     private final DeviceInfo deviceInfo;
51     private final ConnectionContext primaryConnection;
52     private final AtomicReference<ContextChainState> contextChainState =
53             new AtomicReference<>(ContextChainState.UNDEFINED);
54     private AutoCloseable registration;
55
56     ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
57                      @Nonnull final ConnectionContext connectionContext,
58                      @Nonnull final ExecutorService executorService) {
59         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
60         this.primaryConnection = connectionContext;
61         this.deviceInfo = connectionContext.getDeviceInfo();
62         this.executorService = executorService;
63     }
64
65     @Override
66     public <T extends OFPContext> void addContext(@Nonnull final T context) {
67         contexts.add(new GuardedContextImpl(context));
68     }
69
70     @Override
71     @SuppressWarnings("checkstyle:IllegalCatch")
72     public void instantiateServiceInstance() {
73         try {
74             contexts.forEach(OFPContext::instantiateServiceInstance);
75             LOG.info("Started clustering services for node {}", deviceInfo);
76         } catch (final Exception ex) {
77             LOG.warn("Not able to start clustering services for node {}", deviceInfo);
78             executorService.execute(() -> contextChainMastershipWatcher
79                     .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
80         }
81     }
82
83     @Override
84     public ListenableFuture<?> closeServiceInstance() {
85
86         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
87
88         final ListenableFuture<?> servicesToBeClosed = Futures.allAsList(Lists.reverse(contexts).stream()
89             .map(OFPContext::closeServiceInstance)
90             .collect(Collectors.toList()));
91
92         return Futures.transform(servicesToBeClosed, (input) -> {
93             LOG.info("Closed clustering services for node {}", deviceInfo);
94             return null;
95         }, executorService);
96     }
97
98     @Nonnull
99     @Override
100     public ServiceGroupIdentifier getIdentifier() {
101         return deviceInfo.getServiceIdentifier();
102     }
103
104     @Override
105     @SuppressWarnings("checkstyle:IllegalCatch")
106     public void close() {
107         if (ContextChainState.CLOSED.equals(contextChainState.get())) {
108             LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
109             return;
110         }
111
112         contextChainState.set(ContextChainState.CLOSED);
113         unMasterMe();
114
115         // Close all connections to devices
116         auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
117         auxiliaryConnections.clear();
118
119         // If we are still registered and we are not already closing, then close the registration
120         if (registration != null) {
121             try {
122                 registration.close();
123                 registration = null;
124                 LOG.info("Closed clustering services registration for node {}", deviceInfo);
125             } catch (final Exception e) {
126                 LOG.warn("Failed to close clustering services registration for node {} with exception: ",
127                         deviceInfo, e);
128             }
129         }
130
131
132         // Close all contexts (device, statistics, rpc)
133         contexts.forEach(OFPContext::close);
134         contexts.clear();
135
136         // We are closing, so cleanup all managers now
137         deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
138         deviceRemovedHandlers.clear();
139
140         primaryConnection.closeConnection(false);
141
142     }
143
144     @Override
145     public void makeContextChainStateSlave() {
146         unMasterMe();
147         changeMastershipState(ContextChainState.WORKING_SLAVE);
148     }
149
150     @Override
151     public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
152         registration = Objects.requireNonNull(clusterSingletonServiceProvider
153                 .registerClusterSingletonService(this));
154         LOG.debug("Registered clustering services for node {}", deviceInfo);
155     }
156
157     @Override
158     public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState,
159                               boolean inReconciliationFrameworkStep) {
160         switch (mastershipState) {
161             case INITIAL_SUBMIT:
162                 LOG.debug("Device {}, initial submit OK.", deviceInfo);
163                 this.initialSubmitting.set(true);
164                 break;
165             case MASTER_ON_DEVICE:
166                 LOG.debug("Device {}, master state OK.", deviceInfo);
167                 this.masterStateOnDevice.set(true);
168                 break;
169             case INITIAL_GATHERING:
170                 LOG.debug("Device {}, initial gathering OK.", deviceInfo);
171                 this.initialGathering.set(true);
172                 break;
173             case RPC_REGISTRATION:
174                 LOG.debug("Device {}, RPC registration OK.", deviceInfo);
175                 this.rpcRegistration.set(true);
176                 break;
177             case INITIAL_FLOW_REGISTRY_FILL:
178                 // Flow registry fill is not mandatory to work as a master
179                 LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
180                 this.registryFilling.set(true);
181                 break;
182             case CHECK:
183                 // no operation
184                 break;
185             default:
186                 // no operation
187                 break;
188         }
189
190         final boolean result = initialGathering.get() && masterStateOnDevice.get() && rpcRegistration.get()
191                 && inReconciliationFrameworkStep || initialSubmitting.get();
192
193         if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) {
194             LOG.info("Device {} is able to work as master{}", deviceInfo,
195                      registryFilling.get() ? "." : " WITHOUT flow registry !!!");
196             changeMastershipState(ContextChainState.WORKING_MASTER);
197         }
198
199         return result;
200     }
201
202     @Override
203     public boolean isClosing() {
204         return ContextChainState.CLOSED.equals(contextChainState.get());
205     }
206
207     @Override
208     public void continueInitializationAfterReconciliation() {
209         contexts.forEach(context -> {
210             if (context.map(ReconciliationFrameworkStep.class::isInstance)) {
211                 context.map(ReconciliationFrameworkStep.class::cast).continueInitializationAfterReconciliation();
212             }
213         });
214     }
215
216     @Override
217     public boolean addAuxiliaryConnection(@Nonnull ConnectionContext connectionContext) {
218         return connectionContext.getFeatures().getAuxiliaryId() != 0
219                 && !ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())
220                 && auxiliaryConnections.add(connectionContext);
221     }
222
223     @Override
224     public boolean auxiliaryConnectionDropped(@Nonnull ConnectionContext connectionContext) {
225         return auxiliaryConnections.remove(connectionContext);
226     }
227
228     @Override
229     public void registerDeviceRemovedHandler(@Nonnull final DeviceRemovedHandler deviceRemovedHandler) {
230         deviceRemovedHandlers.add(deviceRemovedHandler);
231     }
232
233     private void changeMastershipState(final ContextChainState newContextChainState) {
234         if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
235             return;
236         }
237
238         boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
239         this.contextChainState.set(newContextChainState);
240
241         if (propagate) {
242             contexts.forEach(context -> {
243                 if (context.map(ContextChainStateListener.class::isInstance)) {
244                     context.map(ContextChainStateListener.class::cast).onStateAcquired(newContextChainState);
245                 }
246             });
247         }
248     }
249
250     private void unMasterMe() {
251         registryFilling.set(false);
252         initialSubmitting.set(false);
253         initialGathering.set(false);
254         masterStateOnDevice.set(false);
255         rpcRegistration.set(false);
256     }
257 }