Merge "use annotations instead XML for BP in impl"
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.apache.aries.blueprint.annotation.service.Reference;
34 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
35 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
40 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
41 import org.opendaylight.serviceutils.upgrade.UpgradeState;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
80 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
81 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcError;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 @Singleton
89 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
90         ReconciliationNotificationListener, AutoCloseable {
91
92     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
93     private static final int THREAD_POOL_SIZE = 4;
94     private static final AtomicLong BUNDLE_ID = new AtomicLong();
95     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
96     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
97             .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
98     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
99     private static final String SEPARATOR = ":";
100
101     private final SalBundleService salBundleService;
102     private final ReconciliationManager reconciliationManager;
103     private final RoutedRpcRegistration routedRpcReg;
104     private final UpgradeState upgradeState;
105     private NotificationRegistration registration;
106     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
107     private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
108
109     @Inject
110     public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry,
111             @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) {
112         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
113         this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
114                 "ReconciliationManager cannot be null!");
115         this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
116                 "RPC SalBundlService not found.");
117         this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
118                 this);
119         this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
120     }
121
122     @PostConstruct
123     public void start() {
124         registration = reconciliationManager.registerService(this);
125         LOG.info("ArbitratorReconciliationManager has started successfully.");
126     }
127
128     @Override
129     @PreDestroy
130     public void close() throws Exception {
131         executor.shutdown();
132         if (registration != null) {
133             registration.close();
134             registration = null;
135         }
136     }
137
138     @Override
139     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
140             CommitActiveBundleInput input) {
141         BigInteger nodeId = input.getNodeId();
142         if (bundleIdMap.containsKey(nodeId)) {
143             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
144             if (bundleId != null) {
145                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
146                         .setNode(input.getNode()).setBundleId(bundleId)
147                         .setFlags(BUNDLE_FLAGS)
148                         .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
149                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
150                         .controlBundle(commitBundleInput);
151                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
152                 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
153                         MoreExecutors.directExecutor());
154                 return Futures.transform(
155                         rpcResult,
156                         this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
157                         MoreExecutors.directExecutor());
158             }
159         }
160         return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
161                 .setResult(null).build())
162                 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
163                 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
164     }
165
166     @Override
167     public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
168         BigInteger nodeId = input.getNodeId();
169         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
170         if (bundleDetails != null) {
171             try {
172                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
173                 // pipeline when the commit bundle is ongoing.
174                 bundleDetails.getResult().get();
175                 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
176                         .setResult(bundleDetails.getBundleId()).build()).buildFuture();
177             } catch (InterruptedException | ExecutionException | NullPointerException e) {
178                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
179                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
180                                 null, e.getMessage()))).buildFuture();
181             }
182         }
183         return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
184                 .setResult(null).build()).buildFuture();
185     }
186
187     @Override
188     public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
189         registerRpc(node);
190         if (upgradeState.isUpgradeInProgress()) {
191             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
192             return reconcileConfiguration(node);
193         }
194         LOG.trace("arbitrator reconciliation is disabled");
195         return Futures.immediateFuture(true);
196     }
197
198     @Override
199     public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
200         BigInteger datapathId = node.getDatapathId();
201         LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
202         bundleIdMap.remove(datapathId);
203         deregisterRpc(node);
204         return Futures.immediateFuture(true);
205     }
206
207     @Override
208     public int getPriority() {
209         return ARBITRATOR_RECONCILIATION_PRIORITY;
210     }
211
212     @Override
213     public String getName() {
214         return SERVICE_NAME;
215     }
216
217     @Override
218     public ResultState getResultState() {
219         return ResultState.DONOTHING;
220     }
221
222     private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
223         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
224         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
225         return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
226     }
227
228     private Messages createMessages(final NodeRef nodeRef) {
229         final List<Message> messages = new ArrayList<>();
230         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
231                 new BundleRemoveFlowCaseBuilder()
232                 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
233                 .build()).build());
234
235         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
236                 new BundleRemoveGroupCaseBuilder()
237                 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
238                 .build()).build());
239         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
240         return new MessagesBuilder().setMessage(messages).build();
241     }
242
243     private Flow getDeleteAllFlow() {
244         final FlowBuilder flowBuilder = new FlowBuilder();
245         flowBuilder.setTableId(OFConstants.OFPTT_ALL);
246         return flowBuilder.build();
247     }
248
249     private Group getDeleteAllGroup() {
250         final GroupBuilder groupBuilder = new GroupBuilder();
251         groupBuilder.setGroupType(GroupTypes.GroupAll);
252         groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
253         return groupBuilder.build();
254     }
255
256     private class ArbitratorReconciliationTask implements Callable<Boolean> {
257         final DeviceInfo deviceInfo;
258
259         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
260             this.deviceInfo = deviceInfo;
261         }
262
263         @Override
264         public Boolean call() {
265             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
266                     .augmentation(FlowCapableNode.class);
267             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
268             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
269             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
270             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
271
272             final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
273                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
274                     .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
275
276             final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
277                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
278                     .setType(BundleControlType.ONFBCTOPENREQUEST).build();
279
280             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
281                     .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
282                     .setMessages(createMessages(nodeRef)).build();
283
284             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
285                     .controlBundle(closeBundleInput);
286
287             ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
288                     .transformAsync(closeBundle, rpcResult -> salBundleService
289                             .controlBundle(openBundleInput), MoreExecutors.directExecutor());
290
291             ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
292                     .transformAsync(openBundleMessagesFuture, rpcResult -> {
293                         if (rpcResult.isSuccessful()) {
294                             return salBundleService
295                                     .addBundleMessages(addBundleMessagesInput);
296                         }
297                         return Futures.immediateFuture(null);
298                     }, MoreExecutors.directExecutor());
299             BigInteger nodeId = getDpnIdFromNodeName(node);
300             try {
301                 if (addBundleMessagesFuture.get().isSuccessful()) {
302                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
303                             Futures.immediateFuture(null)));
304                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
305                             + " for application programming.", nodeId);
306                     return true;
307                 } else {
308                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
309                     return false;
310                 }
311             } catch (InterruptedException | ExecutionException e) {
312                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
313                 return false;
314             }
315         }
316     }
317
318     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
319         private final BigInteger nodeId;
320
321         private CommitActiveBundleCallback(final BigInteger nodeId) {
322             this.nodeId = nodeId;
323         }
324
325         @Override
326         public void onSuccess(RpcResult<?> rpcResult) {
327             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
328             bundleIdMap.remove(nodeId);
329         }
330
331         @Override
332         public void onFailure(Throwable throwable) {
333             LOG.error("Error while performing arbitrator reconciliation for device {}",
334                     nodeId, throwable);
335         }
336     }
337
338     private <D> Function<RpcResult<D>,
339             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
340         return input -> {
341             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
342             if (input != null) {
343                 List<RpcError> errors = new ArrayList<>();
344                 if (!input.isSuccessful()) {
345                     errors.addAll(input.getErrors());
346                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
347                 } else {
348                     resultSink = RpcResultBuilder.success();
349                 }
350             } else {
351                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
352                         .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
353             }
354             return resultSink.build();
355         };
356     }
357
358     private void registerRpc(DeviceInfo node) {
359         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
360                 .child(Node.class, new NodeKey(node.getNodeId()));
361         LOG.debug("The path is registered : {}", path);
362         routedRpcReg.registerPath(NodeContext.class, path);
363     }
364
365     private void deregisterRpc(DeviceInfo node) {
366         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
367                 new NodeKey(node.getNodeId()));
368         LOG.debug("The path is unregistered : {}", path);
369         routedRpcReg.unregisterPath(NodeContext.class, path);
370     }
371
372     private static class BundleDetails {
373         private final BundleId bundleId;
374         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
375
376         BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
377             this.bundleId = bundleId;
378             this.result = result;
379         }
380
381         public BundleId getBundleId() {
382             return bundleId;
383         }
384
385         public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
386             return result;
387         }
388     }
389
390     private BigInteger getDpnIdFromNodeName(String nodeName) {
391         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
392         return new BigInteger(dpnId);
393     }
394
395 }