Bump MRI upstreams
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainImpl.java
1 /*
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.collect.Lists;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.List;
14 import java.util.Objects;
15 import java.util.concurrent.CopyOnWriteArrayList;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.atomic.AtomicBoolean;
18 import java.util.concurrent.atomic.AtomicReference;
19 import java.util.stream.Collectors;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
22 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
23 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
24 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
25 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
26 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
27 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
28 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
29 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
30 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
31 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
32 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ReconciliationFrameworkStep;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 public class ContextChainImpl implements ContextChain {
39     private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
40     private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
41
42     private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
43     private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
44     private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
45     private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
46     private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
47     private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
48     private final Executor executor;
49     private final ContextChainMastershipWatcher contextChainMastershipWatcher;
50     private final DeviceInfo deviceInfo;
51     private final ConnectionContext primaryConnection;
52     private final AtomicReference<ContextChainState> contextChainState =
53             new AtomicReference<>(ContextChainState.UNDEFINED);
54     private AutoCloseable registration;
55
56     ContextChainImpl(@NonNull final ContextChainMastershipWatcher contextChainMastershipWatcher,
57                      @NonNull final ConnectionContext connectionContext,
58                      @NonNull final Executor executor) {
59         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
60         this.primaryConnection = connectionContext;
61         this.deviceInfo = connectionContext.getDeviceInfo();
62         this.executor = executor;
63     }
64
65     @Override
66     public <T extends OFPContext> void addContext(@NonNull final T context) {
67         contexts.add(new GuardedContextImpl(context));
68     }
69
70     @Override
71     @SuppressWarnings("checkstyle:IllegalCatch")
72     public void instantiateServiceInstance() {
73         OF_EVENT_LOG.debug("Clustering Service Invocation, Node: {}", deviceInfo);
74         try {
75             contexts.forEach(OFPContext::instantiateServiceInstance);
76             LOG.info("Started clustering services for node {}", deviceInfo);
77         } catch (final Exception ex) {
78             LOG.error("Not able to start clustering services for node {}", deviceInfo);
79             executor.execute(() -> contextChainMastershipWatcher
80                     .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
81         }
82     }
83
84     @Override
85     public ListenableFuture<?> closeServiceInstance() {
86
87         contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
88
89         final ListenableFuture<?> servicesToBeClosed = Futures.allAsList(Lists.reverse(contexts).stream()
90             .map(OFPContext::closeServiceInstance)
91             .collect(Collectors.toList()));
92
93         return Futures.transform(servicesToBeClosed, input -> {
94             OF_EVENT_LOG.debug("Closing clustering Services, Node: {}", deviceInfo);
95             LOG.info("Closed clustering services for node {}", deviceInfo);
96             return null;
97         }, executor);
98     }
99
100     @NonNull
101     @Override
102     public ServiceGroupIdentifier getIdentifier() {
103         return deviceInfo.getServiceIdentifier();
104     }
105
106     @Override
107     @SuppressWarnings("checkstyle:IllegalCatch")
108     public void close() {
109         if (ContextChainState.CLOSED.equals(contextChainState.get())) {
110             LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
111             return;
112         }
113
114         contextChainState.set(ContextChainState.CLOSED);
115         unMasterMe();
116
117         // Close all connections to devices
118         auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
119         auxiliaryConnections.clear();
120
121         // If we are still registered and we are not already closing, then close the registration
122         if (registration != null) {
123             try {
124                 registration.close();
125                 registration = null;
126                 LOG.info("Closed clustering services registration for node {}", deviceInfo);
127                 OF_EVENT_LOG.debug("Closed clustering services registration for node {}", deviceInfo);
128             } catch (final Exception e) {
129                 LOG.warn("Failed to close clustering services registration for node {} with exception: ",
130                         deviceInfo, e);
131             }
132         }
133
134
135         // Close all contexts (device, statistics, rpc)
136         contexts.forEach(OFPContext::close);
137         contexts.clear();
138
139         // We are closing, so cleanup all managers now
140         deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
141         deviceRemovedHandlers.clear();
142
143         primaryConnection.closeConnection(false);
144
145     }
146
147     @Override
148     public void makeContextChainStateSlave() {
149         unMasterMe();
150         changeMastershipState(ContextChainState.WORKING_SLAVE);
151     }
152
153     @Override
154     public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
155         registration = Objects.requireNonNull(clusterSingletonServiceProvider
156                 .registerClusterSingletonService(this));
157         LOG.debug("Registered clustering services for node {}", deviceInfo);
158         OF_EVENT_LOG.debug("Registered Clustering Services, Node: {}", deviceInfo);
159     }
160
161     @Override
162     public boolean isMastered(@NonNull final ContextChainMastershipState mastershipState,
163                               final boolean inReconciliationFrameworkStep) {
164         switch (mastershipState) {
165             case INITIAL_SUBMIT:
166                 LOG.debug("Device {}, initial submit OK.", deviceInfo);
167                 OF_EVENT_LOG.debug("Device {}, initial submit OK.", deviceInfo);
168                 this.initialSubmitting.set(true);
169                 break;
170             case MASTER_ON_DEVICE:
171                 LOG.debug("Device {}, master state OK.", deviceInfo);
172                 OF_EVENT_LOG.debug("Device {}, master state OK.", deviceInfo);
173                 this.masterStateOnDevice.set(true);
174                 break;
175             case RPC_REGISTRATION:
176                 LOG.debug("Device {}, RPC registration OK.", deviceInfo);
177                 OF_EVENT_LOG.debug("Device {}, RPC registration OK.", deviceInfo);
178                 this.rpcRegistration.set(true);
179                 break;
180             case CHECK:
181                 // no operation
182                 break;
183             default:
184                 // no operation
185                 break;
186         }
187
188         final boolean result = masterStateOnDevice.get() && rpcRegistration.get()
189                 && inReconciliationFrameworkStep || initialSubmitting.get();
190
191         if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) {
192             LOG.info("Device {} is able to work as master", deviceInfo);
193             changeMastershipState(ContextChainState.WORKING_MASTER);
194         }
195
196         return result;
197     }
198
199     @Override
200     public boolean isClosing() {
201         return ContextChainState.CLOSED.equals(contextChainState.get());
202     }
203
204     @Override
205     public void continueInitializationAfterReconciliation() {
206         contexts.forEach(context -> {
207             if (context.map(ReconciliationFrameworkStep.class::isInstance)) {
208                 context.map(ReconciliationFrameworkStep.class::cast).continueInitializationAfterReconciliation();
209             }
210         });
211     }
212
213     @Override
214     public void initializeDevice() {
215         contexts.forEach(context -> {
216             if (context.map(DeviceInitializationContext.class::isInstance)) {
217                 context.map(DeviceInitializationContext.class::cast).initializeDevice();
218             }
219         });
220     }
221
222     @Override
223     public boolean addAuxiliaryConnection(@NonNull final ConnectionContext connectionContext) {
224         return connectionContext.getFeatures().getAuxiliaryId().toJava() != 0
225                 && !ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())
226                 && auxiliaryConnections.add(connectionContext);
227     }
228
229     @Override
230     public boolean auxiliaryConnectionDropped(@NonNull final ConnectionContext connectionContext) {
231         return auxiliaryConnections.remove(connectionContext);
232     }
233
234     @Override
235     public void registerDeviceRemovedHandler(@NonNull final DeviceRemovedHandler deviceRemovedHandler) {
236         deviceRemovedHandlers.add(deviceRemovedHandler);
237     }
238
239     private void changeMastershipState(final ContextChainState newContextChainState) {
240         if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
241             return;
242         }
243
244         boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
245         this.contextChainState.set(newContextChainState);
246
247         if (propagate) {
248             contexts.forEach(context -> {
249                 if (context.map(ContextChainStateListener.class::isInstance)) {
250                     context.map(ContextChainStateListener.class::cast).onStateAcquired(newContextChainState);
251                 }
252             });
253         }
254     }
255
256     private void unMasterMe() {
257         initialSubmitting.set(false);
258         masterStateOnDevice.set(false);
259         rpcRegistration.set(false);
260     }
261 }