Fix context chain initialization and SLAVE change
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainHolderImpl.java
1 /*
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import io.netty.util.HashedWheelTimer;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Objects;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
26 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
27 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
28 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
32 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
35 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
36 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
37 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
38 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
39 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
40 import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
44 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 public class ContextChainHolderImpl implements ContextChainHolder {
50     private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
51
52     private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
53     private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
54     private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
55     private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
56     private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
57
58     private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
59     private final EntityOwnershipListenerRegistration eosListenerRegistration;
60     private final HashedWheelTimer timer;
61     private final ClusterSingletonServiceProvider singletonServiceProvider;
62     private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
63     private final ExecutorService executorService;
64     private DeviceManager deviceManager;
65     private RpcManager rpcManager;
66     private StatisticsManager statisticsManager;
67
68     public ContextChainHolderImpl(final HashedWheelTimer timer,
69                                   final ExecutorService executorService,
70                                   final ClusterSingletonServiceProvider singletonServiceProvider,
71                                   final EntityOwnershipService entityOwnershipService) {
72         this.timer = timer;
73         this.singletonServiceProvider = singletonServiceProvider;
74         this.executorService = executorService;
75         this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService
76                 .registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));
77
78         this.scheduler = new ItemScheduler<>(
79                 timer,
80                 CHECK_ROLE_MASTER_TIMEOUT,
81                 CHECK_ROLE_MASTER_TOLERANCE,
82                 ContextChain::makeDeviceSlave);
83     }
84
85     @Override
86     public <T extends OFPManager> void addManager(final T manager) {
87         if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
88             LOG.trace("Context chain holder: Device manager OK.");
89             deviceManager = (DeviceManager) manager;
90         } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
91             LOG.trace("Context chain holder: RPC manager OK.");
92             rpcManager = (RpcManager) manager;
93         } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
94             LOG.trace("Context chain holder: Statistics manager OK.");
95             statisticsManager = (StatisticsManager) manager;
96         }
97     }
98
99     @VisibleForTesting
100     ContextChain createContextChain(final ConnectionContext connectionContext) {
101         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
102
103         final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
104         deviceContext.registerMastershipChangeListener(this);
105         LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
106
107         final RpcContext rpcContext = rpcManager.createContext(deviceContext);
108         rpcContext.registerMastershipChangeListener(this);
109         LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
110
111         final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
112         statisticsContext.registerMastershipChangeListener(this);
113         LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
114
115         final ContextChain contextChain = new ContextChainImpl(this, connectionContext,
116                 executorService);
117         contextChain.registerDeviceRemovedHandler(deviceManager);
118         contextChain.registerDeviceRemovedHandler(rpcManager);
119         contextChain.registerDeviceRemovedHandler(statisticsManager);
120         contextChain.registerDeviceRemovedHandler(this);
121         contextChain.addContext(deviceContext);
122         contextChain.addContext(rpcContext);
123         contextChain.addContext(statisticsContext);
124         contextChainMap.put(deviceInfo, contextChain);
125         LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
126
127         deviceContext.onPublished();
128         scheduler.add(deviceInfo, contextChain);
129         scheduler.startIfNotRunning();
130         LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.",
131                 deviceInfo,
132                 CHECK_ROLE_MASTER_TIMEOUT / 1000L);
133
134         contextChain.registerServices(singletonServiceProvider);
135         return contextChain;
136     }
137
138     @Override
139     public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
140         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
141         final ContextChain contextChain = contextChainMap.get(deviceInfo);
142         LOG.info("Device {} connected.", deviceInfo);
143
144         if (Objects.nonNull(contextChain)) {
145             if (contextChain.isClosing()) {
146                 LOG.warn("Device {} is already in termination state, closing all incoming connections.", deviceInfo);
147                 return ConnectionStatus.CLOSING;
148             }
149
150             if (contextChain.addAuxiliaryConnection(connectionContext)) {
151                 LOG.info("An auxiliary connection was added to device: {}", deviceInfo);
152                 return ConnectionStatus.MAY_CONTINUE;
153             }
154
155             LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo);
156             destroyContextChain(deviceInfo);
157             return ConnectionStatus.ALREADY_CONNECTED;
158         }
159
160         LOG.debug("No context chain found for device: {}, creating new.", deviceInfo);
161         final ContextChain newContextChain = createContextChain(connectionContext);
162         LOG.debug("Successfully created context chain with identifier: {}", newContextChain.getIdentifier());
163         return ConnectionStatus.MAY_CONTINUE;
164     }
165
166     @Override
167     public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, @Nonnull final String reason, final boolean mandatory) {
168         LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason);
169
170         if (!mandatory) {
171             return;
172         }
173
174         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
175             LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo);
176             destroyContextChain(deviceInfo);
177         });
178     }
179
180     @Override
181     public void onMasterRoleAcquired(final DeviceInfo deviceInfo, @Nonnull final ContextChainMastershipState mastershipState) {
182         scheduler.remove(deviceInfo);
183
184         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
185             if (contextChain.isMastered(mastershipState)) {
186                 LOG.info("Role MASTER was granted to device {}", deviceInfo);
187                 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
188             }
189         });
190     }
191
192     @Override
193     public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
194         scheduler.remove(deviceInfo);
195         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
196     }
197
198     @Override
199     public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
200         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
201     }
202
203     @Override
204     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
205         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
206
207         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
208             if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
209                 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo);
210             } else {
211                 LOG.info("Device {} disconnected.", deviceInfo);
212                 destroyContextChain(deviceInfo);
213             }
214         });
215     }
216
217     @VisibleForTesting
218     boolean checkAllManagers() {
219         return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
220     }
221
222     @Override
223     public void close() throws Exception {
224         scheduler.close();
225         contextChainMap.keySet().forEach(this::destroyContextChain);
226         contextChainMap.clear();
227         eosListenerRegistration.close();
228     }
229
230     @Override
231     public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
232         if (entityOwnershipChange.hasOwner()) {
233             return;
234         }
235
236         final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange);
237
238         if (Objects.nonNull(entityName)) {
239             LOG.debug("Entity {} has no owner", entityName);
240             final NodeId nodeId = new NodeId(entityName);
241
242             try {
243                 final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
244                         DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
245
246                 deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
247
248                 LOG.info("Removing device {} from operational DS", nodeId);
249                 deviceManager
250                         .removeDeviceFromOperationalDS(nodeInstanceIdentifier)
251                         .checkedGet(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
252             } catch (TimeoutException | TransactionCommitFailedException e) {
253                 LOG.info("Not able to remove device {} from operational DS. Probably removed by another cluster node.",
254                         nodeId);
255             }
256         }
257     }
258
259     private synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
260         scheduler.remove(deviceInfo);
261
262         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
263             deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
264             contextChain.close();
265         });
266     }
267
268     private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
269         final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
270                 (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
271                         .getEntity()
272                         .getId()
273                         .getLastPathArgument();
274
275         return lastIdArgument
276                 .getKeyValues()
277                 .values()
278                 .iterator()
279                 .next()
280                 .toString();
281     }
282
283     @Override
284     public void onDeviceRemoved(final DeviceInfo deviceInfo) {
285         scheduler.remove(deviceInfo);
286         contextChainMap.remove(deviceInfo);
287         LOG.debug("Context chain removed for node {}", deviceInfo);
288     }
289 }