Merge "switch from FindBugs to SpotBugs"
[openflowplugin.git] / applications / reconciliation-framework / src / main / java / org / opendaylight / openflowplugin / applications / reconciliation / impl / ReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.reconciliation.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentSkipListMap;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.stream.Collectors;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
25 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeException;
26 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
27 import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkEvent;
28 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
29 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
30 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class ReconciliationManagerImpl implements ReconciliationManager, ReconciliationFrameworkEvent {
36     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManagerImpl.class);
37
38     private final MastershipChangeServiceManager mastershipChangeServiceManager;
39     private final Map<Integer, List<ReconciliationNotificationListener>> registeredServices =
40             new ConcurrentSkipListMap<>();
41     private final Map<DeviceInfo, ListenableFuture<ResultState>> futureMap = new ConcurrentHashMap<>();
42     private final Map<ResultState, Integer> resultStateMap = new ConcurrentHashMap<>();
43     private final AtomicReference<ResultState> decidedResultState = new AtomicReference<>(ResultState.DONOTHING);
44
45     public ReconciliationManagerImpl(MastershipChangeServiceManager mastershipChangeServiceManager) {
46         this.mastershipChangeServiceManager = Preconditions
47                 .checkNotNull(mastershipChangeServiceManager, "MastershipChangeServiceManager can not be null!");
48     }
49
50     public void start() throws MastershipChangeException {
51         mastershipChangeServiceManager.reconciliationFrameworkRegistration(this);
52         LOG.info("ReconciliationManager has started successfully.");
53     }
54
55     @Override
56     public NotificationRegistration registerService(ReconciliationNotificationListener reconciliationTask) {
57         LOG.debug("Registered service {} with priority {} and intent {}", reconciliationTask.getName(),
58                   reconciliationTask.getPriority(), reconciliationTask.getResultState());
59         registeredServices.computeIfAbsent(reconciliationTask.getPriority(), services -> new ArrayList<>())
60                 .add(reconciliationTask);
61         ReconciliationServiceDelegate registration = new ReconciliationServiceDelegate(() -> {
62             LOG.debug("Service un-registered from Reconciliation framework {}", reconciliationTask.getName());
63             registeredServices.computeIfPresent(reconciliationTask.getPriority(), (priority, services) -> services)
64                     .remove(reconciliationTask);
65             decideResultState(reconciliationTask.getResultState());
66         });
67         decideResultState(reconciliationTask.getResultState());
68         return registration;
69     }
70
71     private void decideResultState(ResultState resultState) {
72         Integer count = resultStateMap.get(resultState);
73         resultStateMap.put(resultState, count = (count == null ? 1 : count + 1));
74         Map.Entry<ResultState, Integer> maxEntry = null;
75         for (Map.Entry<ResultState, Integer> entry : resultStateMap.entrySet()) {
76             if (maxEntry == null || entry.getValue() > maxEntry.getValue()) {
77                 maxEntry = entry;
78             }
79         }
80         decidedResultState.set(maxEntry.getKey());
81     }
82
83     @Override
84     public Map<Integer, List<ReconciliationNotificationListener>> getRegisteredServices() {
85         ImmutableMap.Builder<Integer, List<ReconciliationNotificationListener>> builder = ImmutableMap.builder();
86         builder.putAll(registeredServices);
87         return builder.build();
88     }
89
90     @Override
91     public void close() throws Exception {
92     }
93
94     @Override
95     public ListenableFuture<ResultState> onDevicePrepared(@Nonnull DeviceInfo node) {
96         LOG.debug("Triggering reconciliation for node : {}", node.getNodeId());
97         return futureMap.computeIfAbsent(node, value -> reconcileNode(node));
98     }
99
100     @Override
101     public ListenableFuture<Void> onDeviceDisconnected(@Nonnull DeviceInfo node) {
102         LOG.info("Stopping reconciliation for node {}", node.getNodeId());
103         if (futureMap.containsKey(node)) {
104             return cancelNodeReconciliation(node);
105         }
106         return Futures.immediateFuture(null);
107     }
108
109     private ListenableFuture<ResultState> reconcileNode(DeviceInfo node) {
110         ListenableFuture<ResultState> lastFuture = Futures.immediateFuture(null);
111         for (List<ReconciliationNotificationListener> services : registeredServices.values()) {
112             lastFuture = reconcileServices(lastFuture, services, node);
113         }
114         return lastFuture;
115     }
116
117     private ListenableFuture<ResultState> reconcileServices(ListenableFuture<ResultState> prevFuture,
118                                                             List<ReconciliationNotificationListener>
119                                                                     servicesForPriority,
120                                                             DeviceInfo node) {
121         return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
122                 servicesForPriority.stream().map(service -> service.startReconciliation(node))
123                         .collect(Collectors.toList())), results -> decidedResultState.get(),
124                                                                                   MoreExecutors.directExecutor()),
125                                       MoreExecutors.directExecutor());
126     }
127
128     private ListenableFuture<Void> cancelNodeReconciliation(DeviceInfo node) {
129         ListenableFuture<Void> lastFuture = Futures.immediateFuture(null);
130         futureMap.get(node).cancel(true);
131         futureMap.remove(node);
132         for (List<ReconciliationNotificationListener> services : registeredServices.values()) {
133             lastFuture = cancelServiceReconciliation(lastFuture, services, node);
134         }
135         return lastFuture;
136     }
137
138     private ListenableFuture<Void> cancelServiceReconciliation(ListenableFuture<Void> prevFuture,
139                                                                List<ReconciliationNotificationListener>
140                                                                        servicesForPriority,
141                                                                DeviceInfo node) {
142         return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
143                 servicesForPriority.stream().map(service -> service.endReconciliation(node))
144                         .collect(Collectors.toList())), results -> null, MoreExecutors.directExecutor()),
145                                       MoreExecutors.directExecutor());
146     }
147 }