f7145b82ccaa23c03351a2a42e1f00b089eaafca
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / AbstractListeningCommiter.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.Collection;
16 import org.eclipse.jdt.annotation.Nullable;
17 import org.opendaylight.mdsal.binding.api.DataBroker;
18 import org.opendaylight.mdsal.binding.api.DataObjectModification;
19 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
20 import org.opendaylight.mdsal.binding.api.DataTreeModification;
21 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
22 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesCommiter;
23 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
24 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
25 import org.opendaylight.serviceutils.srm.RecoverableListener;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
27 import org.opendaylight.yangtools.concepts.ListenerRegistration;
28 import org.opendaylight.yangtools.yang.binding.DataObject;
29 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * AbstractChangeListner implemented basic {@link org.opendaylight.mdsal.binding.api.DataTreeModification}
35  * processing for flow node subDataObject (flows, groups and meters).
36  */
37 public abstract class AbstractListeningCommiter<T extends DataObject>
38         implements ForwardingRulesCommiter<T>, RecoverableListener {
39
40     private static final Logger LOG = LoggerFactory.getLogger(AbstractListeningCommiter.class);
41     final ForwardingRulesManager provider;
42     NodeConfigurator nodeConfigurator;
43     protected final DataBroker dataBroker;
44     protected final ListenerRegistrationHelper registrationHelper;
45     protected ListenerRegistration<AbstractListeningCommiter> listenerRegistration;
46
47     public AbstractListeningCommiter(final ForwardingRulesManager provider, final DataBroker dataBroker,
48                                      final ListenerRegistrationHelper registrationHelper) {
49         this.provider = requireNonNull(provider, "ForwardingRulesManager can not be null!");
50         this.nodeConfigurator = requireNonNull(provider.getNodeConfigurator(), "NodeConfigurator can not be null!");
51         this.dataBroker = requireNonNull(dataBroker, "DataBroker can not be null!");
52         this.registrationHelper = requireNonNull(registrationHelper, "registrationHelper can not be null!");
53         registerListener();
54         provider.addRecoverableListener(this);
55     }
56
57     @SuppressWarnings("checkstyle:IllegalCatch")
58     @Override
59     public void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
60         LOG.trace("Received data changes :{}", requireNonNull(changes, "Changes may not be null!"));
61
62         for (DataTreeModification<T> change : changes) {
63             final InstanceIdentifier<T> key = change.getRootPath().getRootIdentifier();
64             final DataObjectModification<T> mod = change.getRootNode();
65             final InstanceIdentifier<FlowCapableNode> nodeIdent =
66                     key.firstIdentifierOf(FlowCapableNode.class);
67             try {
68                 if (preConfigurationCheck(nodeIdent)) {
69                     switch (mod.getModificationType()) {
70                         case DELETE:
71                             remove(key, mod.getDataBefore(), nodeIdent);
72                             break;
73                         case SUBTREE_MODIFIED:
74                             update(key, mod.getDataBefore(), mod.getDataAfter(), nodeIdent);
75                             break;
76                         case WRITE:
77                             if (mod.getDataBefore() == null) {
78                                 add(key, mod.getDataAfter(), nodeIdent);
79                             } else {
80                                 update(key, mod.getDataBefore(), mod.getDataAfter(), nodeIdent);
81                             }
82                             break;
83                         default:
84                             throw new
85                                     IllegalArgumentException("Unhandled modification type "
86                                     + mod.getModificationType());
87                     }
88                 } else {
89                     if (provider.isStaleMarkingEnabled()) {
90                         LOG.info("Stale-Marking ENABLED and switch {} is NOT connected, storing stale entities",
91                                 nodeIdent.toString());
92                         // Switch is NOT connected
93                         switch (mod.getModificationType()) {
94                             case DELETE:
95                                 createStaleMarkEntity(key, mod.getDataBefore(), nodeIdent);
96                                 break;
97                             case SUBTREE_MODIFIED:
98                                 break;
99                             case WRITE:
100                                 break;
101                             default:
102                                 throw new
103                                         IllegalArgumentException("Unhandled modification type "
104                                         + mod.getModificationType());
105                         }
106                     }
107                 }
108             } catch (RuntimeException e) {
109                 LOG.error("Failed to handle event {} key {} due to error ", mod.getModificationType(), key, e);
110             }
111         }
112     }
113
114     @Override
115     public void registerListener() {
116         final DataTreeIdentifier<T> treeId =
117                 DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, getWildCardPath());
118         Futures.addCallback(registrationHelper.checkedRegisterListener(treeId, this),
119                 new FutureCallback<ListenerRegistration<AbstractListeningCommiter>>() {
120                     @Override
121                     public void onSuccess(
122                             @Nullable final ListenerRegistration<AbstractListeningCommiter> flowListenerRegistration) {
123                         LOG.info("{} registered successfully", flowListenerRegistration.getInstance());
124                         listenerRegistration = flowListenerRegistration;
125                     }
126
127                     @Override
128                     public void onFailure(final Throwable throwable) {
129                         LOG.error("Registration failed ", throwable);
130                     }
131                 }, MoreExecutors.directExecutor());
132     }
133
134     /**
135      * Method return wildCardPath for Listener registration
136      * and for identify the correct KeyInstanceIdentifier from data.
137      */
138     protected abstract InstanceIdentifier<T> getWildCardPath();
139
140     private boolean preConfigurationCheck(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
141         requireNonNull(nodeIdent, "FlowCapableNode identifier can not be null!");
142         // In single node cluster, node should be in local cache before we get any flow/group/meter
143         // data change event from data store. So first check should pass.
144         // In case of 3-node cluster, when shard leader changes, clustering will send blob of data
145         // present in operational data store and config data store. So ideally local node cache
146         // should get populated. But to handle a scenario where flow request comes before the blob
147         // of config/operational data gets processes, it won't find node in local cache and it will
148         // skip the flow/group/meter operational. This requires an addition check, where it reads
149         // node from operational data store and if it's present it calls flowNodeConnected to explicitly
150         // trigger the event of new node connected.
151         return provider.isNodeOwner(nodeIdent);
152     }
153 }