2 * Copyright (c) 2011,2012 Big Switch Networks, Inc.
4 * Licensed under the Eclipse Public License, Version 1.0 (the
5 * "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at
8 * http://www.eclipse.org/legal/epl-v10.html
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 * implied. See the License for the specific language governing
14 * permissions and limitations under the License.
16 * This file incorporates work covered by the following copyright and
19 * Originally created by David Erickson, Stanford University
21 * Licensed under the Apache License, Version 2.0 (the "License");
22 * you may not use this file except in compliance with the
23 * License. You may obtain a copy of the License at
25 * http://www.apache.org/licenses/LICENSE-2.0
27 * Unless required by applicable law or agreed to in writing,
28 * software distributed under the License is distributed on an "AS
29 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
30 * express or implied. See the License for the specific language
31 * governing permissions and limitations under the License.
34 package org.opendaylight.controller.hosttracker.internal;
36 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.ADD;
37 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.CHANGE;
38 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.DELETE;
40 import java.net.InetAddress;
41 import java.net.UnknownHostException;
42 import java.util.ArrayList;
43 import java.util.Calendar;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.Comparator;
47 import java.util.Date;
48 import java.util.EnumSet;
49 import java.util.HashMap;
50 import java.util.HashSet;
51 import java.util.Iterator;
52 import java.util.LinkedList;
53 import java.util.List;
55 import java.util.Queue;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.Future;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.TimeUnit;
63 import org.opendaylight.controller.hosttracker.Entity;
64 import org.opendaylight.controller.hosttracker.IDevice;
65 import org.opendaylight.controller.hosttracker.IDeviceListener;
66 import org.opendaylight.controller.hosttracker.IDeviceService;
67 import org.opendaylight.controller.hosttracker.IEntityClass;
68 import org.opendaylight.controller.hosttracker.IEntityClassListener;
69 import org.opendaylight.controller.hosttracker.IEntityClassifierService;
70 import org.opendaylight.controller.hosttracker.IfIptoHost;
71 import org.opendaylight.controller.hosttracker.IfNewHostNotify;
72 import org.opendaylight.controller.hosttracker.SwitchPort;
73 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
74 import org.opendaylight.controller.sal.core.ConstructionException;
75 import org.opendaylight.controller.sal.core.Edge;
76 import org.opendaylight.controller.sal.core.Host;
77 import org.opendaylight.controller.sal.core.Node;
78 import org.opendaylight.controller.sal.core.NodeConnector;
79 import org.opendaylight.controller.sal.core.NodeConnector.NodeConnectorIDType;
80 import org.opendaylight.controller.sal.core.Tier;
81 import org.opendaylight.controller.sal.core.UpdateType;
82 import org.opendaylight.controller.sal.packet.ARP;
83 import org.opendaylight.controller.sal.packet.Ethernet;
84 import org.opendaylight.controller.sal.packet.IDataPacketService;
85 import org.opendaylight.controller.sal.packet.IListenDataPacket;
86 import org.opendaylight.controller.sal.packet.Packet;
87 import org.opendaylight.controller.sal.packet.PacketResult;
88 import org.opendaylight.controller.sal.packet.RawPacket;
89 import org.opendaylight.controller.sal.packet.address.DataLinkAddress;
90 import org.opendaylight.controller.sal.packet.address.EthernetAddress;
91 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
92 import org.opendaylight.controller.sal.utils.HexEncode;
93 import org.opendaylight.controller.sal.utils.ListenerDispatcher;
94 import org.opendaylight.controller.sal.utils.MultiIterator;
95 import org.opendaylight.controller.sal.utils.NetUtils;
96 import org.opendaylight.controller.sal.utils.SingletonTask;
97 import org.opendaylight.controller.sal.utils.Status;
98 import org.opendaylight.controller.sal.utils.StatusCode;
99 import org.opendaylight.controller.switchmanager.ISwitchManager;
100 import org.opendaylight.controller.topologymanager.ITopologyManager;
101 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
102 import org.slf4j.Logger;
103 import org.slf4j.LoggerFactory;
106 * DeviceManager creates Devices based upon MAC addresses seen in the network.
107 * It tracks any network addresses mapped to the Device, and its location within
112 public class DeviceManagerImpl implements IDeviceService, IEntityClassListener,
113 IListenDataPacket, ITopologyManagerAware, IfIptoHost {
114 protected static Logger logger = LoggerFactory
115 .getLogger(DeviceManagerImpl.class);
117 public static final String MODULE_NAME = "devicemanager";
119 // protected ITopologyService topology;
120 // protected IStorageSourceService storageSource;
121 // protected IRestApiService restApi;
122 // protected IThreadPoolService threadPool;
123 // protected IFlowReconcileService flowReconcileMgr;
124 // protected IFlowReconcileEngineService flowReconcileEngine;
125 // protected IDebugCounterService debugCounters;
126 // private ISyncService syncService;
127 // private IStoreClient<String,DeviceSyncRepresentation> storeClient;
128 // private DeviceSyncManager deviceSyncManager;
130 private ITopologyManager topology;
131 private ISwitchManager switchManager = null;
132 private IDataPacketService dataPacketService = null;
134 public static final String CNT_INCOMING = MODULE_NAME + "-incoming";
135 public static final String CNT_RECONCILE_REQUEST = MODULE_NAME
136 + "-reconcileRequest";
137 public static final String CNT_RECONCILE_NO_SOURCE = MODULE_NAME
138 + "-reconcileNoSourceDevice";
139 public static final String CNT_RECONCILE_NO_DEST = MODULE_NAME
140 + "-reconcileNoDestDevice";
141 public static final String CNT_BROADCAST_SOURCE = MODULE_NAME
142 + "-broadcastSource";
143 public static final String CNT_NO_SOURCE = MODULE_NAME + "-noSourceDevice";
144 public static final String CNT_NO_DEST = MODULE_NAME + "-noDestDevice";
145 public static final String CNT_DHCP_CLIENT_NAME_SNOOPED = MODULE_NAME
146 + "-dhcpClientNameSnooped";
147 public static final String CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED = MODULE_NAME
148 + "-deviceOnInternalPortNotLearned";
149 public static final String CNT_PACKET_NOT_ALLOWED = MODULE_NAME
150 + "-packetNotAllowed";
151 public static final String CNT_NEW_DEVICE = MODULE_NAME + "-newDevice";
152 public static final String CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE = MODULE_NAME
153 + "-packetOnInternalPortForKnownDevice";
154 public static final String CNT_NEW_ENTITY = MODULE_NAME + "-newEntity";
155 public static final String CNT_DEVICE_CHANGED = MODULE_NAME
157 public static final String CNT_DEVICE_MOVED = MODULE_NAME + "-deviceMoved";
158 public static final String CNT_CLEANUP_ENTITIES_RUNS = MODULE_NAME
159 + "-cleanupEntitiesRuns";
160 public static final String CNT_ENTITY_REMOVED_TIMEOUT = MODULE_NAME
161 + "-entityRemovedTimeout";
162 public static final String CNT_DEVICE_DELETED = MODULE_NAME
164 public static final String CNT_DEVICE_RECLASSIFY_DELETE = MODULE_NAME
165 + "-deviceReclassifyDelete";
166 public static final String CNT_DEVICE_STORED = MODULE_NAME
168 public static final String CNT_DEVICE_STORE_THROTTLED = MODULE_NAME
169 + "-deviceStoreThrottled";
170 public static final String CNT_DEVICE_REMOVED_FROM_STORE = MODULE_NAME
171 + "-deviceRemovedFromStore";
172 public static final String CNT_SYNC_EXCEPTION = MODULE_NAME
174 public static final String CNT_DEVICES_FROM_STORE = MODULE_NAME
175 + "-devicesFromStore";
176 public static final String CNT_CONSOLIDATE_STORE_RUNS = MODULE_NAME
177 + "-consolidateStoreRuns";
178 public static final String CNT_CONSOLIDATE_STORE_DEVICES_REMOVED = MODULE_NAME
179 + "-consolidateStoreDevicesRemoved";
181 static final String DEVICE_SYNC_STORE_NAME = DeviceManagerImpl.class
182 .getCanonicalName() + ".stateStore";
185 * Time interval between writes of entries for the same device to the sync
188 // static final int DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS =
189 // 5*60*1000; // 5 min
190 // private int syncStoreWriteIntervalMs =
191 // DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS;
194 * Time after SLAVE->MASTER until we run the consolidate store code.
196 // static final int DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS =
197 // 15*1000; // 15 sec
198 // private int initialSyncStoreConsolidateMs =
199 // DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS;
202 * Time interval between consolidate store runs.
204 // static final int DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS =
205 // 75*60*1000; // 75 min
206 // private final int syncStoreConsolidateIntervalMs =
207 // DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS;
210 * Time in milliseconds before entities will expire
212 protected static final int ENTITY_TIMEOUT = 60 * 60 * 1000;
215 * Time in seconds between cleaning up old entities/devices
217 protected static final int ENTITY_CLEANUP_INTERVAL = 60 * 60;
220 * This is the master device map that maps device IDs to {@link Device}
223 protected ConcurrentHashMap<Long, Device> deviceMap;
226 * Counter used to generate device keys
228 protected long deviceKeyCounter = 0;
231 * Lock for incrementing the device key counter
233 protected Object deviceKeyLock = new Object();
236 * This is the primary entity index that contains all entities
238 protected DeviceUniqueIndex primaryIndex;
241 * This stores secondary indices over the fields in the devices
243 protected Map<EnumSet<DeviceField>, DeviceIndex> secondaryIndexMap;
246 * This map contains state for each of the {@ref IEntityClass} that exist
248 protected ConcurrentHashMap<String, ClassState> classStateMap;
251 * This is the list of indices we want on a per-class basis
253 protected Set<EnumSet<DeviceField>> perClassIndices;
256 * The entity classifier currently in use
258 protected IEntityClassifierService entityClassifier;
261 * Used to cache state about specific entity classes
263 protected class ClassState {
268 protected DeviceUniqueIndex classIndex;
271 * This stores secondary indices over the fields in the device for the
274 protected Map<EnumSet<DeviceField>, DeviceIndex> secondaryIndexMap;
277 * Allocate a new {@link ClassState} object for the class
280 * the class to use for the state
282 public ClassState(IEntityClass clazz) {
283 EnumSet<DeviceField> keyFields = clazz.getKeyFields();
284 EnumSet<DeviceField> primaryKeyFields = entityClassifier
286 boolean keyFieldsMatchPrimary = primaryKeyFields.equals(keyFields);
288 if (!keyFieldsMatchPrimary)
289 classIndex = new DeviceUniqueIndex(keyFields);
291 secondaryIndexMap = new HashMap<EnumSet<DeviceField>, DeviceIndex>();
292 for (EnumSet<DeviceField> fields : perClassIndices) {
293 secondaryIndexMap.put(fields, new DeviceMultiIndex(fields));
299 * Device manager event listeners reclassifyDeviceListeners are notified
300 * first before reconcileDeviceListeners. This is to make sure devices are
301 * correctly reclassified before reconciliation.
303 protected ListenerDispatcher<String, IDeviceListener> deviceListeners;
306 * Using the IfNewHostNotify to notify listeners of host changes.
308 private Set<IfNewHostNotify> newHostNotify = Collections.synchronizedSet(new HashSet<IfNewHostNotify>());
310 * A device update event to be dispatched
312 protected static class DeviceUpdate {
318 * The affected device
320 protected Device device;
323 * The change that was made
325 protected Change change;
328 * If not added, then this is the list of fields changed
330 protected EnumSet<DeviceField> fieldsChanged;
332 public DeviceUpdate(Device device, Change change,
333 EnumSet<DeviceField> fieldsChanged) {
335 this.device = device;
336 this.change = change;
337 this.fieldsChanged = fieldsChanged;
341 public String toString() {
342 String devIdStr = device.getEntityClass().getName() + "::"
343 + device.getMACAddressString();
344 return "DeviceUpdate [device=" + devIdStr + ", change=" + change
345 + ", fieldsChanged=" + fieldsChanged + "]";
351 * AttachmentPointComparator
353 * Compares two attachment points and returns the latest one. It is assumed
354 * that the two attachment points are in the same L2 domain.
358 protected class AttachmentPointComparator implements
359 Comparator<AttachmentPoint> {
360 public AttachmentPointComparator() {
365 public int compare(AttachmentPoint oldAP, AttachmentPoint newAP) {
366 // First compare based on L2 domain ID;
368 // XXX - missing functionality -- need topology
369 // long oldDomain = topology.getL2DomainId(oldSw);
370 // boolean oldBD = topology.isBroadcastDomainPort(oldSw, oldPort);
372 boolean oldBD = false;
374 // XXX - missing functionality -- need topology
375 // long newDomain = topology.getL2DomainId(newSw);
376 // boolean newBD = topology.isBroadcastDomainPort(newSw, newPort);
378 boolean newBD = false;
380 if (oldDomain < newDomain)
382 else if (oldDomain > newDomain)
385 // Give preference to OFPP_LOCAL always
386 if (!oldAP.getPort().getType().equals(NodeConnectorIDType.SWSTACK)
387 && newAP.getPort().getType()
388 .equals(NodeConnectorIDType.SWSTACK)) {
390 } else if (oldAP.getPort().getType()
391 .equals(NodeConnectorIDType.SWSTACK)
392 && !newAP.getPort().getType()
393 .equals(NodeConnectorIDType.SWSTACK)) {
397 // We expect that the last seen of the new AP is higher than
398 // old AP, if it is not, just reverse and send the negative
400 if (oldAP.getActiveSince() > newAP.getActiveSince())
401 return -compare(newAP, oldAP);
403 long activeOffset = 0;
404 // XXX - missing functionality -- need topology
405 // if (!topology.isConsistent(oldSw, oldPort, newSw, newPort)) {
406 if (!newBD && oldBD) {
409 if (newBD && oldBD) {
410 activeOffset = AttachmentPoint.EXTERNAL_TO_EXTERNAL_TIMEOUT;
411 } else if (newBD && !oldBD) {
412 activeOffset = AttachmentPoint.OPENFLOW_TO_EXTERNAL_TIMEOUT;
416 // // The attachment point is consistent.
417 // activeOffset = AttachmentPoint.CONSISTENT_TIMEOUT;
420 if ((newAP.getActiveSince() > oldAP.getLastSeen() + activeOffset)
421 || (newAP.getLastSeen() > oldAP.getLastSeen()
422 + AttachmentPoint.INACTIVITY_INTERVAL)) {
430 * Comparator for sorting by cluster ID
432 public AttachmentPointComparator apComparator;
435 * Switch ports where attachment points shouldn't be learned
437 private Set<SwitchPort> suppressAPs;
440 * Periodic task to clean up expired entities
442 public SingletonTask entityCleanupTask;
444 // ********************
445 // Dependency injection
446 // ********************
448 void setNewHostNotify(IfNewHostNotify obj){
449 this.newHostNotify.add(obj);
452 void unsetNewHostNotify(IfNewHostNotify obj){
453 this.newHostNotify.remove(obj);
456 void setDataPacketService(IDataPacketService s) {
457 this.dataPacketService = s;
460 void unsetDataPacketService(IDataPacketService s) {
461 if (this.dataPacketService == s) {
462 this.dataPacketService = null;
466 public void setTopologyManager(ITopologyManager s) {
470 public void unsetTopologyManager(ITopologyManager s) {
471 if (this.topology == s) {
472 logger.debug("Topology Manager Service removed!");
473 this.topology = null;
477 private volatile boolean stopped = true;
478 private ScheduledExecutorService ses;
486 public void start() {
487 this.perClassIndices = new HashSet<EnumSet<DeviceField>>();
489 // XXX - TODO need to make it possible to register a non-default
491 entityClassifier = new DefaultEntityClassifier();
492 this.deviceListeners = new ListenerDispatcher<String, IDeviceListener>();
493 this.suppressAPs = Collections
494 .newSetFromMap(new ConcurrentHashMap<SwitchPort, Boolean>());
495 primaryIndex = new DeviceUniqueIndex(entityClassifier.getKeyFields());
496 secondaryIndexMap = new HashMap<EnumSet<DeviceField>, DeviceIndex>();
498 deviceMap = new ConcurrentHashMap<Long, Device>();
499 classStateMap = new ConcurrentHashMap<String, ClassState>();
500 apComparator = new AttachmentPointComparator();
502 addIndex(true, EnumSet.of(DeviceField.IPV4));
504 // floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
505 // floodlightProvider.addHAListener(this.haListenerDelegate);
506 // if (topology != null)
507 // topology.addListener(this);
508 // flowReconcileMgr.addFlowReconcileListener(this);
509 // entityClassifier.addListener(this);
512 // XXX - Should use a common threadpool but this doesn't currently exist
513 ses = Executors.newScheduledThreadPool(1);
514 Runnable ecr = new Runnable() {
519 entityCleanupTask.reschedule(ENTITY_CLEANUP_INTERVAL,
523 entityCleanupTask = new SingletonTask(ses, ecr);
524 entityCleanupTask.reschedule(ENTITY_CLEANUP_INTERVAL, TimeUnit.SECONDS);
527 * XXX Missing functionality if (restApi != null) {
528 * restApi.addRestletRoutable(new DeviceRoutable()); } else {
529 * logger.debug("Could not instantiate REST API"); }
532 registerDeviceManagerDebugCounters();
535 * XXX Missing functionality try {
536 * this.syncService.registerStore(DEVICE_SYNC_STORE_NAME, Scope.LOCAL);
537 * this.storeClient = this.syncService
538 * .getStoreClient(DEVICE_SYNC_STORE_NAME, String.class,
539 * DeviceSyncRepresentation.class); } catch (SyncException e) { throw
540 * new FloodlightModuleException("Error while setting up sync service",
543 * Runnable consolidateStoreRunner = new Runnable() {
545 * @Override public void run() { deviceSyncManager.consolidateStore();
546 * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
547 * TimeUnit.MILLISECONDS); debugCounters.flushCounters(); } };
548 * storeConsolidateTask = new SingletonTask(ses,
549 * consolidateStoreRunner); if (isMaster)
550 * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
551 * TimeUnit.MILLISECONDS);
556 * Periodic task to consolidate entries in the store. I.e., delete entries
557 * in the store that are not known to DeviceManager
559 // XXX - Missing functionality
560 // private SingletonTask storeConsolidateTask;
562 // *********************
563 // IDeviceManagerService
564 // *********************
566 void setSwitchManager(ISwitchManager s) {
567 logger.debug("SwitchManager set");
568 this.switchManager = s;
571 void unsetSwitchManager(ISwitchManager s) {
572 if (this.switchManager == s) {
573 logger.debug("SwitchManager removed!");
574 this.switchManager = null;
579 public IDevice getDevice(Long deviceKey) {
580 return deviceMap.get(deviceKey);
584 public IDevice findDevice(long macAddress, Short vlan, Integer ipv4Address,
585 NodeConnector port) throws IllegalArgumentException {
586 if (vlan != null && vlan.shortValue() <= 0)
588 if (ipv4Address != null && ipv4Address == 0)
590 Entity e = new Entity(macAddress, vlan, ipv4Address, port, null);
591 if (!allKeyFieldsPresent(e, entityClassifier.getKeyFields())) {
592 throw new IllegalArgumentException("Not all key fields specified."
593 + " Required fields: " + entityClassifier.getKeyFields());
595 return findDeviceByEntity(e);
599 public IDevice findClassDevice(IEntityClass entityClass, long macAddress,
600 Short vlan, Integer ipv4Address) throws IllegalArgumentException {
601 if (vlan != null && vlan.shortValue() <= 0)
603 if (ipv4Address != null && ipv4Address == 0)
605 Entity e = new Entity(macAddress, vlan, ipv4Address, null, null);
606 if (entityClass == null
607 || !allKeyFieldsPresent(e, entityClass.getKeyFields())) {
608 throw new IllegalArgumentException("Not all key fields and/or "
609 + " no source device specified. Required fields: "
610 + entityClassifier.getKeyFields());
612 return findDestByEntity(entityClass, e);
616 public Collection<? extends IDevice> getAllDevices() {
617 return Collections.unmodifiableCollection(deviceMap.values());
621 public void addIndex(boolean perClass, EnumSet<DeviceField> keyFields) {
623 perClassIndices.add(keyFields);
625 secondaryIndexMap.put(keyFields, new DeviceMultiIndex(keyFields));
630 public Iterator<? extends IDevice> queryDevices(Long macAddress,
631 Short vlan, Integer ipv4Address, NodeConnector port) {
632 DeviceIndex index = null;
633 if (secondaryIndexMap.size() > 0) {
634 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
636 index = secondaryIndexMap.get(keys);
639 Iterator<Device> deviceIterator = null;
641 // Do a full table scan
642 deviceIterator = deviceMap.values().iterator();
645 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
646 vlan, ipv4Address, port, null);
647 deviceIterator = new DeviceIndexInterator(this,
648 index.queryByEntity(entity));
651 DeviceIterator di = new DeviceIterator(deviceIterator, null,
652 macAddress, vlan, ipv4Address, port);
657 public Iterator<? extends IDevice> queryClassDevices(
658 IEntityClass entityClass, Long macAddress, Short vlan,
659 Integer ipv4Address, NodeConnector port) {
660 ArrayList<Iterator<Device>> iterators = new ArrayList<Iterator<Device>>();
661 ClassState classState = getClassState(entityClass);
663 DeviceIndex index = null;
664 if (classState.secondaryIndexMap.size() > 0) {
665 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
667 index = classState.secondaryIndexMap.get(keys);
670 Iterator<Device> iter;
672 index = classState.classIndex;
675 return new DeviceIterator(deviceMap.values().iterator(),
676 new IEntityClass[] { entityClass }, macAddress, vlan,
679 // scan the entire class
680 iter = new DeviceIndexInterator(this, index.getAll());
684 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
685 vlan, ipv4Address, port, null);
686 iter = new DeviceIndexInterator(this, index.queryByEntity(entity));
690 return new MultiIterator<Device>(iterators.iterator());
693 protected Iterator<Device> getDeviceIteratorForQuery(Long macAddress,
694 Short vlan, Integer ipv4Address, NodeConnector port) {
695 DeviceIndex index = null;
696 if (secondaryIndexMap.size() > 0) {
697 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
699 index = secondaryIndexMap.get(keys);
702 Iterator<Device> deviceIterator = null;
704 // Do a full table scan
705 deviceIterator = deviceMap.values().iterator();
708 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
709 vlan, ipv4Address, port, null);
710 deviceIterator = new DeviceIndexInterator(this,
711 index.queryByEntity(entity));
714 DeviceIterator di = new DeviceIterator(deviceIterator, null,
715 macAddress, vlan, ipv4Address, port);
720 public void addListener(IDeviceListener listener) {
721 deviceListeners.addListener("device", listener);
726 public void addSuppressAPs(NodeConnector port) {
727 suppressAPs.add(new SwitchPort(port));
731 public void removeSuppressAPs(NodeConnector port) {
732 suppressAPs.remove(new SwitchPort(port));
736 public Set<SwitchPort> getSuppressAPs() {
737 return Collections.unmodifiableSet(suppressAPs);
740 private void logListeners() {
741 List<IDeviceListener> listeners = deviceListeners.getOrderedListeners();
742 if (listeners != null) {
743 StringBuffer sb = new StringBuffer();
744 sb.append("DeviceListeners: ");
745 for (IDeviceListener l : listeners) {
746 sb.append(l.getName());
749 logger.debug(sb.toString());
754 // IFlowReconcileListener
757 * XXX - Missing functionality
759 * @Override public Command reconcileFlows(ArrayList<OFMatchReconcile>
760 * ofmRcList) { ListIterator<OFMatchReconcile> iter =
761 * ofmRcList.listIterator(); while (iter.hasNext()) { OFMatchReconcile ofm =
764 * // Remove the STOPPed flow. if (Command.STOP == reconcileFlow(ofm)) {
767 * if (ofmRcList.size() > 0) { return Command.CONTINUE; } else { return
770 * protected Command reconcileFlow(OFMatchReconcile ofm) {
771 * debugCounters.updateCounter(CNT_RECONCILE_REQUEST); // Extract source
772 * entity information Entity srcEntity =
773 * getEntityFromFlowMod(ofm.ofmWithSwDpid, true); if (srcEntity == null) {
774 * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
777 * // Find the device by source entity Device srcDevice =
778 * findDeviceByEntity(srcEntity); if (srcDevice == null) {
779 * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
780 * Command.STOP; } // Store the source device in the context
781 * fcStore.put(ofm.cntx, CONTEXT_SRC_DEVICE, srcDevice);
783 * // Find the device matching the destination from the entity // classes of
784 * the source. Entity dstEntity = getEntityFromFlowMod(ofm.ofmWithSwDpid,
785 * false); Device dstDevice = null; if (dstEntity != null) { dstDevice =
786 * findDestByEntity(srcDevice.getEntityClass(), dstEntity); if (dstDevice !=
787 * null) fcStore.put(ofm.cntx, CONTEXT_DST_DEVICE, dstDevice); else
788 * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } else {
789 * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } if
790 * (logger.isTraceEnabled()) {
791 * logger.trace("Reconciling flow: match={}, srcEntity={}, srcDev={}, " +
792 * "dstEntity={}, dstDev={}", new Object[] {ofm.ofmWithSwDpid.getOfMatch(),
793 * srcEntity, srcDevice, dstEntity, dstDevice } ); } return
794 * Command.CONTINUE; }
802 public PacketResult receiveDataPacket(RawPacket inPkt) {
803 // XXX - Can this really pass in null? Why would you ever want that?
805 return PacketResult.IGNORED;
808 // throw new Exception("Sample");
809 // } catch (Exception e) {
810 // logger.error("Sample stack trace", e);
813 Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
815 if (formattedPak instanceof Ethernet) {
816 eth = (Ethernet) formattedPak;
818 return PacketResult.IGNORED;
821 // Extract source entity information
822 NodeConnector inPort = inPkt.getIncomingNodeConnector();
823 Entity srcEntity = getSourceEntityFromPacket(eth, inPort);
824 if (srcEntity == null) {
825 // debugCounters.updateCounter(CNT_BROADCAST_SOURCE);
826 return PacketResult.CONSUME;
829 // Learn from ARP packet for special VRRP settings.
830 // In VRRP settings, the source MAC address and sender MAC
831 // addresses can be different. In such cases, we need to learn
832 // the IP to MAC mapping of the VRRP IP address. The source
833 // entity will not have that information. Hence, a separate call
834 // to learn devices in such cases.
835 learnDeviceFromArpResponseData(eth, inPort);
837 // Learn/lookup device information
838 Device srcDevice = learnDeviceByEntity(srcEntity);
839 if (srcDevice == null) {
840 // debugCounters.updateCounter(CNT_NO_SOURCE);
841 return PacketResult.CONSUME;
843 logger.trace("Saw packet from device {}", srcDevice);
845 // // Store the source device in the context
846 // fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);
848 // // Find the device matching the destination from the entity
849 // // classes of the source.
850 // Entity dstEntity = getDestEntityFromPacket(eth);
851 // Device dstDevice = null;
852 // if (dstEntity != null) {
854 // findDestByEntity(srcDevice.getEntityClass(), dstEntity);
855 // if (dstDevice != null)
856 // fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
858 // //debugCounters.updateCounter(CNT_NO_DEST);
860 // //debugCounters.updateCounter(CNT_NO_DEST);
863 // if (logger.isTraceEnabled()) {
864 // logger.trace("Received PI: {} on switch {}, port {} *** eth={}" +
865 // " *** srcDev={} *** dstDev={} *** ",
866 // new Object[] { pi, sw.getStringId(), pi.getInPort(), eth,
867 // srcDevice, dstDevice });
870 // snoopDHCPClientName(eth, srcDevice);
872 return PacketResult.KEEP_PROCESSING;
880 * Snoop and record client-provided host name from DHCP requests
885 // private void snoopDHCPClientName(Ethernet eth, Device srcDevice) {
886 // if (! (eth.getPayload() instanceof IPv4) )
888 // IPv4 ipv4 = (IPv4) eth.getPayload();
889 // if (! (ipv4.getPayload() instanceof UDP) )
891 // UDP udp = (UDP) ipv4.getPayload();
892 // if (!(udp.getPayload() instanceof DHCP))
894 // DHCP dhcp = (DHCP) udp.getPayload();
895 // byte opcode = dhcp.getOpCode();
896 // if (opcode == DHCP.OPCODE_REQUEST) {
897 // DHCPOption dhcpOption = dhcp.getOption(
898 // DHCPOptionCode.OptionCode_Hostname);
899 // if (dhcpOption != null) {
900 // debugCounters.updateCounter(CNT_DHCP_CLIENT_NAME_SNOOPED);
901 // srcDevice.dhcpClientName = new String(dhcpOption.getData());
907 * Check whether the given attachment point is valid given the current
914 * @return true if it's a valid attachment point
916 public boolean isValidAttachmentPoint(NodeConnector port) {
917 // XXX - missing functionality -- need topology module
918 // if (topology.isAttachmentPointPort(port) == false)
920 if (topology.isInternal(port))
922 if (!switchManager.isNodeConnectorEnabled(port))
924 if (suppressAPs.contains(new SwitchPort(port)))
931 * Get sender IP address from packet if the packet is either an ARP packet.
937 private int getSrcNwAddr(Ethernet eth, long dlAddr) {
938 if (eth.getPayload() instanceof ARP) {
939 ARP arp = (ARP) eth.getPayload();
940 if ((arp.getProtocolType() == ARP.PROTO_TYPE_IP)
941 && (toLong(arp.getSenderHardwareAddress()) == dlAddr)) {
942 return toIPv4Address(arp.getSenderProtocolAddress());
949 * Parse an entity from an {@link Ethernet} packet.
952 * the packet to parse
954 * the switch on which the packet arrived
956 * the original packetin
957 * @return the entity from the packet
959 protected Entity getSourceEntityFromPacket(Ethernet eth, NodeConnector port) {
960 byte[] dlAddrArr = eth.getSourceMACAddress();
961 long dlAddr = toLong(dlAddrArr);
963 // Ignore broadcast/multicast source
964 if ((dlAddrArr[0] & 0x1) != 0)
967 // XXX missing functionality
969 int nwSrc = getSrcNwAddr(eth, dlAddr);
970 return new Entity(dlAddr, null, ((nwSrc != 0) ? nwSrc : null), port,
975 * Learn device from ARP data in scenarios where the Ethernet source MAC is
976 * different from the sender hardware address in ARP data.
978 protected void learnDeviceFromArpResponseData(Ethernet eth,
979 NodeConnector port) {
981 if (!(eth.getPayload() instanceof ARP))
983 ARP arp = (ARP) eth.getPayload();
985 byte[] dlAddrArr = eth.getSourceMACAddress();
986 long dlAddr = toLong(dlAddrArr);
988 byte[] senderHardwareAddr = arp.getSenderHardwareAddress();
989 long senderAddr = toLong(senderHardwareAddr);
991 if (dlAddr == senderAddr)
994 // Ignore broadcast/multicast source
995 if ((senderHardwareAddr[0] & 0x1) != 0)
998 // short vlan = eth.getVlanID();
999 int nwSrc = toIPv4Address(arp.getSenderProtocolAddress());
1001 Entity e = new Entity(senderAddr, null, ((nwSrc != 0) ? nwSrc : null),
1004 learnDeviceByEntity(e);
1008 * Get a (partial) entity for the destination from the packet.
1013 // protected Entity getDestEntityFromPacket(Ethernet eth) {
1014 // byte[] dlAddrArr = eth.getDestinationMACAddress();
1015 // long dlAddr = Ethernet.toLong(dlAddrArr);
1016 // short vlan = eth.getVlanID();
1019 // // Ignore broadcast/multicast destination
1020 // if ((dlAddrArr[0] & 0x1) != 0)
1023 // if (eth.getPayload() instanceof IPv4) {
1024 // IPv4 ipv4 = (IPv4) eth.getPayload();
1025 // nwDst = ipv4.getDestinationAddress();
1028 // return new Entity(dlAddr,
1029 // ((vlan >= 0) ? vlan : null),
1030 // ((nwDst != 0) ? nwDst : null),
1037 * Parse an entity from an OFMatchWithSwDpid.
1039 * @param ofmWithSwDpid
1040 * @return the entity from the packet
1042 // private Entity getEntityFromFlowMod(OFMatchWithSwDpid ofmWithSwDpid,
1043 // boolean isSource) {
1044 // byte[] dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerSource();
1045 // int nwSrc = ofmWithSwDpid.getOfMatch().getNetworkSource();
1047 // dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerDestination();
1048 // nwSrc = ofmWithSwDpid.getOfMatch().getNetworkDestination();
1051 // long dlAddr = Ethernet.toLong(dlAddrArr);
1053 // // Ignore broadcast/multicast source
1054 // if ((dlAddrArr[0] & 0x1) != 0)
1057 // Long swDpid = null;
1058 // Short inPort = null;
1061 // swDpid = ofmWithSwDpid.getSwitchDataPathId();
1062 // inPort = ofmWithSwDpid.getOfMatch().getInputPort();
1065 // /**for the new flow cache design, the flow mods retrived are not always
1066 // from the source, learn AP should be disabled --meiyang*/
1067 // boolean learnap = false;
1069 // * if (swDpid == null ||
1070 // inPort == null ||
1071 // !isValidAttachmentPoint(swDpid, inPort)) {
1072 // // If this is an internal port or we otherwise don't want
1073 // // to learn on these ports. In the future, we should
1074 // // handle this case by labeling flows with something that
1075 // // will give us the entity class. For now, we'll do our
1076 // // best assuming attachment point information isn't used
1077 // // as a key field.
1082 // short vlan = ofmWithSwDpid.getOfMatch().getDataLayerVirtualLan();
1083 // return new Entity(dlAddr,
1084 // ((vlan >= 0) ? vlan : null),
1085 // ((nwSrc != 0) ? nwSrc : null),
1086 // (learnap ? swDpid : null),
1087 // (learnap ? (int)inPort : null),
1092 * Look up a {@link Device} based on the provided {@link Entity}. We first
1093 * check the primary index. If we do not find an entry there we classify the
1094 * device into its IEntityClass and query the classIndex. This implies that
1095 * all key field of the current IEntityClassifier must be present in the
1096 * entity for the lookup to succeed!
1099 * the entity to search for
1100 * @return The {@link Device} object if found
1102 protected Device findDeviceByEntity(Entity entity) {
1103 // Look up the fully-qualified entity to see if it already
1104 // exists in the primary entity index.
1105 Long deviceKey = primaryIndex.findByEntity(entity);
1106 IEntityClass entityClass = null;
1108 if (deviceKey == null) {
1109 // If the entity does not exist in the primary entity index,
1110 // use the entity classifier for find the classes for the
1111 // entity. Look up the entity in the returned class'
1112 // class entity index.
1113 entityClass = entityClassifier.classifyEntity(entity);
1114 if (entityClass == null) {
1117 ClassState classState = getClassState(entityClass);
1119 if (classState.classIndex != null) {
1120 deviceKey = classState.classIndex.findByEntity(entity);
1123 if (deviceKey == null)
1125 return deviceMap.get(deviceKey);
1129 * Get a destination device using entity fields that corresponds with the
1130 * given source device. The source device is important since there could be
1131 * ambiguity in the destination device without the attachment point
1135 * the source device's entity class. The returned destination
1136 * will be in the same entity class as the source.
1138 * the entity to look up
1139 * @return an {@link Device} or null if no device is found.
1141 protected Device findDestByEntity(IEntityClass reference, Entity dstEntity) {
1143 // Look up the fully-qualified entity to see if it
1144 // exists in the primary entity index
1145 Long deviceKey = primaryIndex.findByEntity(dstEntity);
1147 if (deviceKey == null) {
1148 // This could happen because:
1149 // 1) no destination known, or a broadcast destination
1150 // 2) if we have attachment point key fields since
1151 // attachment point information isn't available for
1152 // destination devices.
1153 // For the second case, we'll need to match up the
1154 // destination device with the class of the source
1156 ClassState classState = getClassState(reference);
1157 if (classState.classIndex == null) {
1160 deviceKey = classState.classIndex.findByEntity(dstEntity);
1162 if (deviceKey == null)
1164 return deviceMap.get(deviceKey);
1168 * Look up a {@link Device} within a particular entity class based on the
1169 * provided {@link Entity}.
1172 * the entity class to search for the entity
1174 * the entity to search for
1175 * @return The {@link Device} object if found private Device
1176 * findDeviceInClassByEntity(IEntityClass clazz, Entity entity) { //
1177 * XXX - TODO throw new UnsupportedOperationException(); }
1181 * Look up a {@link Device} based on the provided {@link Entity}. Also
1182 * learns based on the new entity, and will update existing devices as
1186 * the {@link Entity}
1187 * @return The {@link Device} object if found
1189 protected Device learnDeviceByEntity(Entity entity) {
1190 logger.info("Primary index {}", primaryIndex);
1191 ArrayList<Long> deleteQueue = null;
1192 LinkedList<DeviceUpdate> deviceUpdates = null;
1193 Device oldDevice = null;
1194 Device device = null;
1196 // we may need to restart the learning process if we detect
1197 // concurrent modification. Note that we ensure that at least
1198 // one thread should always succeed so we don't get into infinite
1201 deviceUpdates = null;
1203 // Look up the fully-qualified entity to see if it already
1204 // exists in the primary entity index.
1205 Long deviceKey = primaryIndex.findByEntity(entity);
1206 IEntityClass entityClass = null;
1208 if (deviceKey == null) {
1209 // If the entity does not exist in the primary entity index,
1210 // use the entity classifier for find the classes for the
1211 // entity. Look up the entity in the returned class'
1212 // class entity index.
1213 entityClass = entityClassifier.classifyEntity(entity);
1214 if (entityClass == null) {
1215 // could not classify entity. No device
1219 ClassState classState = getClassState(entityClass);
1221 if (classState.classIndex != null) {
1222 deviceKey = classState.classIndex.findByEntity(entity);
1225 if (deviceKey != null) {
1226 // If the primary or secondary index contains the entity
1227 // use resulting device key to look up the device in the
1228 // device map, and use the referenced Device below.
1229 device = deviceMap.get(deviceKey);
1230 if (device == null) {
1231 // This can happen due to concurrent modification
1232 if (logger.isDebugEnabled()) {
1233 logger.debug("No device for deviceKey {} while "
1234 + "while processing entity {}", deviceKey,
1237 // if so, then try again till we don't even get the device
1239 // and so we recreate the device
1243 // If the secondary index does not contain the entity,
1244 // create a new Device object containing the entity, and
1245 // generate a new device ID if the the entity is on an
1246 // attachment point port. Otherwise ignore.
1247 if (entity.hasSwitchPort()
1248 && !isValidAttachmentPoint(entity.getPort())) {
1249 // debugCounters.updateCounter(CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED);
1250 if (logger.isDebugEnabled()) {
1251 logger.debug("Not learning new device on internal"
1252 + " link: {}", entity);
1257 // Before we create the new device also check if
1258 // the entity is allowed (e.g., for spoofing protection)
1259 if (!isEntityAllowed(entity, entityClass)) {
1260 // debugCounters.updateCounter(CNT_PACKET_NOT_ALLOWED);
1261 if (logger.isDebugEnabled()) {
1262 logger.debug("PacketIn is not allowed {} {}",
1263 entityClass.getName(), entity);
1268 synchronized (deviceKeyLock) {
1269 deviceKey = Long.valueOf(deviceKeyCounter++);
1271 device = allocateDevice(deviceKey, entity, entityClass);
1273 // Add the new device to the primary map with a simple put
1274 deviceMap.put(deviceKey, device);
1277 if (!updateIndices(device, deviceKey)) {
1278 if (deleteQueue == null)
1279 deleteQueue = new ArrayList<Long>();
1280 deleteQueue.add(deviceKey);
1284 updateSecondaryIndices(entity, entityClass, deviceKey);
1286 // We need to count and log here. If we log earlier we could
1287 // hit a concurrent modification and restart the dev creation
1288 // and potentially count the device twice.
1289 // debugCounters.updateCounter(CNT_NEW_DEVICE);
1290 if (logger.isDebugEnabled()) {
1292 "New device created: {} deviceKey={}, entity={}",
1293 new Object[] { device, deviceKey, entity });
1295 // generate new device update
1296 deviceUpdates = updateUpdates(deviceUpdates, new DeviceUpdate(
1297 device, ADD, null));
1301 // if it gets here, we have a pre-existing Device for this Entity
1302 if (!isEntityAllowed(entity, device.getEntityClass())) {
1303 // debugCounters.updateCounter(CNT_PACKET_NOT_ALLOWED);
1304 if (logger.isDebugEnabled()) {
1305 logger.info("PacketIn is not allowed {} {}", device
1306 .getEntityClass().getName(), entity);
1310 // If this is not an attachment point port we don't learn the new
1312 // and don't update indexes. But we do allow the device to continue
1315 if (entity.hasSwitchPort()
1316 && !isValidAttachmentPoint(entity.getPort())) {
1317 // debugCounters.updateCounter(CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE);
1320 int entityindex = -1;
1321 if ((entityindex = device.entityIndex(entity)) >= 0) {
1322 // Entity already exists
1323 // update timestamp on the found entity
1324 Date lastSeen = entity.getLastSeenTimestamp();
1325 if (lastSeen == null) {
1326 lastSeen = new Date();
1327 entity.setLastSeenTimestamp(lastSeen);
1329 device.entities[entityindex].setLastSeenTimestamp(lastSeen);
1330 // we break the loop after checking for changes to the AP
1332 // New entity for this device
1333 // compute the insertion point for the entity.
1334 // see Arrays.binarySearch()
1335 entityindex = -(entityindex + 1);
1336 Device newDevice = allocateDevice(device, entity, entityindex);
1339 EnumSet<DeviceField> changedFields = findChangedFields(device,
1342 // update the device map with a replace call
1343 boolean res = deviceMap.replace(deviceKey, device, newDevice);
1344 // If replace returns false, restart the process from the
1345 // beginning (this implies another thread concurrently
1346 // modified this Device).
1352 if (!updateIndices(device, deviceKey)) {
1355 updateSecondaryIndices(entity, device.getEntityClass(),
1358 // We need to count here after all the possible "continue"
1359 // statements in this branch
1360 // debugCounters.updateCounter(CNT_NEW_ENTITY);
1361 if (changedFields.size() > 0) {
1362 // debugCounters.updateCounter(CNT_DEVICE_CHANGED);
1363 deviceUpdates = updateUpdates(deviceUpdates,
1364 new DeviceUpdate(newDevice, CHANGE, changedFields));
1366 // we break the loop after checking for changed AP
1368 // Update attachment point (will only be hit if the device
1369 // already existed and no concurrent modification)
1370 if (entity.hasSwitchPort()) {
1371 boolean moved = device.updateAttachmentPoint(entity.getPort(),
1372 entity.getLastSeenTimestamp().getTime());
1373 // TODO: use update mechanism instead of sending the
1374 // notification directly
1376 // we count device moved events in
1377 // sendDeviceMovedNotification()
1378 sendDeviceMovedNotification(device, oldDevice);
1379 if (logger.isTraceEnabled()) {
1380 logger.trace("Device moved: attachment points {},"
1381 + "entities {}", device.attachmentPoints,
1385 if (logger.isTraceEnabled()) {
1386 logger.trace("Device attachment point updated: "
1387 + "attachment points {}," + "entities {}",
1388 device.attachmentPoints, device.entities);
1395 if (deleteQueue != null) {
1396 for (Long l : deleteQueue) {
1397 Device dev = deviceMap.get(l);
1398 this.deleteDevice(dev);
1402 processUpdates(deviceUpdates);
1403 // deviceSyncManager.storeDeviceThrottled(device);
1408 protected boolean isEntityAllowed(Entity entity, IEntityClass entityClass) {
1412 protected EnumSet<DeviceField> findChangedFields(Device device,
1414 EnumSet<DeviceField> changedFields = EnumSet.of(DeviceField.IPV4,
1415 DeviceField.VLAN, DeviceField.SWITCHPORT);
1417 if (newEntity.getIpv4Address() == null)
1418 changedFields.remove(DeviceField.IPV4);
1419 if (newEntity.getVlan() == null)
1420 changedFields.remove(DeviceField.VLAN);
1421 if (newEntity.getPort() == null)
1422 changedFields.remove(DeviceField.SWITCHPORT);
1424 if (changedFields.size() == 0)
1425 return changedFields;
1427 for (Entity entity : device.getEntities()) {
1428 if (newEntity.getIpv4Address() == null
1429 || (entity.getIpv4Address() != null && entity
1431 .equals(newEntity.getIpv4Address())))
1432 changedFields.remove(DeviceField.IPV4);
1433 if (newEntity.getVlan() == null
1434 || (entity.getVlan() != null && entity.getVlan().equals(
1435 newEntity.getVlan())))
1436 changedFields.remove(DeviceField.VLAN);
1437 if (newEntity.getPort() == null
1438 || (entity.getPort() != null && entity.getPort().equals(
1439 newEntity.getPort())))
1440 changedFields.remove(DeviceField.SWITCHPORT);
1443 return changedFields;
1447 * Send update notifications to listeners
1450 * the updates to process.
1452 protected void processUpdates(Queue<DeviceUpdate> updates) {
1453 if (updates == null)
1455 DeviceUpdate update = null;
1456 while (null != (update = updates.poll())) {
1457 if (logger.isTraceEnabled()) {
1458 logger.trace("Dispatching device update: {}", update);
1460 // if (update.change == DeviceUpdate.Change.DELETE)
1461 // deviceSyncManager.removeDevice(update.device);
1463 // deviceSyncManager.storeDevice(update.device);
1464 List<IDeviceListener> listeners = deviceListeners
1465 .getOrderedListeners();
1466 notifyListeners(listeners, update);
1470 protected void notifyListeners(List<IDeviceListener> listeners,
1471 DeviceUpdate update) {
1472 // Topology update is for some reason outside of listeners registry
1474 Entity[] ents = update.device.getEntities();
1475 Entity e = ents[ents.length-1];
1476 NodeConnector p = e.getPort();
1477 Node node = p.getNode();
1480 byte[] mac = NetUtils.longToByteArray6(e.getMacAddress());
1481 DataLinkAddress dla = new EthernetAddress(
1484 InetAddress.getAllByName(e.getIpv4Address().toString());
1485 h = new org.opendaylight.controller.sal.core.Host(dla,
1486 InetAddress.getByName(e.getIpv4Address().toString()));
1487 } catch (ConstructionException ce) {
1490 } catch (UnknownHostException ue){
1495 if (topology != null && p != null && h != null) {
1496 if (update.change.equals(DeviceUpdate.Change.ADD)) {
1497 Tier tier = new Tier(1);
1498 switchManager.setNodeProp(node, tier);
1499 topology.updateHostLink(p, h, UpdateType.ADDED, null);
1501 // No need to reset the tiering if no other hosts are currently
1503 // If this switch was discovered to be an access switch, it
1504 // still is even if the host is down
1505 Tier tier = new Tier(0);
1506 switchManager.setNodeProp(node, tier);
1507 topology.updateHostLink(p, h, UpdateType.REMOVED, null);
1511 if (listeners == null && newHostNotify.isEmpty()) {
1515 * TODO: IfNewHostNotify is needed for current controller API.
1516 * Adding logic so that existing apps (like SimpleForwardingManager)
1517 * work. IDeviceListener adds additional methods and uses IListener's
1518 * callback ordering. The two interfaces need to be merged.
1521 for (IfNewHostNotify notify : newHostNotify){
1522 switch (update.change) {
1524 notify.notifyHTClient(update.device.toHostNodeConnector());
1527 notify.notifyHTClientHostRemoved(update.device.toHostNodeConnector());
1534 * TODO: Remove this section as IDeviceListener functionality gets
1535 * merged with IfNewHostNotify
1537 for (IDeviceListener listener : listeners) {
1538 switch (update.change) {
1540 listener.deviceAdded(update.device);
1543 listener.deviceRemoved(update.device);
1546 for (DeviceField field : update.fieldsChanged) {
1549 listener.deviceIPV4AddrChanged(update.device);
1552 // listener.deviceMoved(update.device);
1555 listener.deviceVlanChanged(update.device);
1558 logger.debug("Unknown device field changed {}",
1559 update.fieldsChanged.toString());
1569 * Check if the entity e has all the keyFields set. Returns false if not
1574 * the key fields to check e against
1577 protected boolean allKeyFieldsPresent(Entity e,
1578 EnumSet<DeviceField> keyFields) {
1579 for (DeviceField f : keyFields) {
1582 // MAC address is always present
1585 if (e.getIpv4Address() == null)
1589 if (e.getPort() == null)
1593 // FIXME: vlan==null is ambiguous: it can mean: not present
1595 // if (e.vlan == null) return false;
1598 // we should never get here. unless somebody extended
1600 throw new IllegalStateException();
1606 private LinkedList<DeviceUpdate> updateUpdates(
1607 LinkedList<DeviceUpdate> list, DeviceUpdate update) {
1611 list = new LinkedList<DeviceUpdate>();
1618 * Get the secondary index for a class. Will return null if the secondary
1619 * index was created concurrently in another thread.
1622 * the class for the index
1625 private ClassState getClassState(IEntityClass clazz) {
1626 ClassState classState = classStateMap.get(clazz.getName());
1627 if (classState != null)
1630 classState = new ClassState(clazz);
1631 ClassState r = classStateMap.putIfAbsent(clazz.getName(), classState);
1640 * Update both the primary and class indices for the provided device. If the
1641 * update fails because of an concurrent update, will return false.
1644 * the device to update
1646 * the device key for the device
1647 * @return true if the update succeeded, false otherwise.
1649 private boolean updateIndices(Device device, Long deviceKey) {
1650 if (!primaryIndex.updateIndex(device, deviceKey)) {
1653 IEntityClass entityClass = device.getEntityClass();
1654 ClassState classState = getClassState(entityClass);
1656 if (classState.classIndex != null) {
1657 if (!classState.classIndex.updateIndex(device, deviceKey))
1664 * Update the secondary indices for the given entity and associated entity
1668 * the entity to update
1669 * @param entityClass
1670 * the entity class for the entity
1672 * the device key to set up
1674 private void updateSecondaryIndices(Entity entity,
1675 IEntityClass entityClass, Long deviceKey) {
1676 for (DeviceIndex index : secondaryIndexMap.values()) {
1677 index.updateIndex(entity, deviceKey);
1679 ClassState state = getClassState(entityClass);
1680 for (DeviceIndex index : state.secondaryIndexMap.values()) {
1681 index.updateIndex(entity, deviceKey);
1686 * Clean up expired entities/devices
1688 protected void cleanupEntities() {
1689 // debugCounters.updateCounter(CNT_CLEANUP_ENTITIES_RUNS);
1691 Calendar c = Calendar.getInstance();
1692 c.add(Calendar.MILLISECOND, -ENTITY_TIMEOUT);
1693 Date cutoff = c.getTime();
1695 ArrayList<Entity> toRemove = new ArrayList<Entity>();
1696 ArrayList<Entity> toKeep = new ArrayList<Entity>();
1698 Iterator<Device> diter = deviceMap.values().iterator();
1699 LinkedList<DeviceUpdate> deviceUpdates = new LinkedList<DeviceUpdate>();
1701 while (diter.hasNext()) {
1702 Device d = diter.next();
1705 deviceUpdates.clear();
1708 for (Entity e : d.getEntities()) {
1709 if (e.getLastSeenTimestamp() != null
1710 && 0 > e.getLastSeenTimestamp().compareTo(cutoff)) {
1711 // individual entity needs to be removed
1717 if (toRemove.size() == 0) {
1721 // debugCounters.updateCounter(CNT_ENTITY_REMOVED_TIMEOUT);
1722 for (Entity e : toRemove) {
1723 removeEntity(e, d.getEntityClass(), d.getDeviceKey(),
1727 if (toKeep.size() > 0) {
1728 Device newDevice = allocateDevice(d.getDeviceKey(),
1729 d.getDHCPClientName(), d.oldAPs,
1730 d.attachmentPoints, toKeep, d.getEntityClass());
1732 EnumSet<DeviceField> changedFields = EnumSet
1733 .noneOf(DeviceField.class);
1734 for (Entity e : toRemove) {
1735 changedFields.addAll(findChangedFields(newDevice, e));
1737 DeviceUpdate update = null;
1738 if (changedFields.size() > 0) {
1739 update = new DeviceUpdate(d, CHANGE, changedFields);
1742 if (!deviceMap.replace(newDevice.getDeviceKey(), d,
1744 // concurrent modification; try again
1745 // need to use device that is the map now for the next
1747 d = deviceMap.get(d.getDeviceKey());
1751 if (update != null) {
1752 // need to count after all possibly continue stmts in
1754 // debugCounters.updateCounter(CNT_DEVICE_CHANGED);
1755 deviceUpdates.add(update);
1758 DeviceUpdate update = new DeviceUpdate(d, DELETE, null);
1759 if (!deviceMap.remove(d.getDeviceKey(), d)) {
1760 // concurrent modification; try again
1761 // need to use device that is the map now for the next
1763 d = deviceMap.get(d.getDeviceKey());
1766 // debugCounters.updateCounter(CNT_DEVICE_DELETED);
1768 deviceUpdates.add(update);
1770 processUpdates(deviceUpdates);
1776 protected void removeEntity(Entity removed, IEntityClass entityClass,
1777 Long deviceKey, Collection<Entity> others) {
1778 // Don't count in this method. This method CAN BE called to clean-up
1779 // after concurrent device adds/updates and thus counting here
1781 for (DeviceIndex index : secondaryIndexMap.values()) {
1782 index.removeEntityIfNeeded(removed, deviceKey, others);
1784 ClassState classState = getClassState(entityClass);
1785 for (DeviceIndex index : classState.secondaryIndexMap.values()) {
1786 index.removeEntityIfNeeded(removed, deviceKey, others);
1789 primaryIndex.removeEntityIfNeeded(removed, deviceKey, others);
1791 if (classState.classIndex != null) {
1792 classState.classIndex.removeEntityIfNeeded(removed, deviceKey,
1798 * method to delete a given device, remove all entities first and then
1799 * finally delete the device itself.
1803 protected void deleteDevice(Device device) {
1804 // Don't count in this method. This method CAN BE called to clean-up
1805 // after concurrent device adds/updates and thus counting here
1807 ArrayList<Entity> emptyToKeep = new ArrayList<Entity>();
1808 for (Entity entity : device.getEntities()) {
1809 this.removeEntity(entity, device.getEntityClass(),
1810 device.getDeviceKey(), emptyToKeep);
1812 if (!deviceMap.remove(device.getDeviceKey(), device)) {
1813 if (logger.isDebugEnabled())
1814 logger.debug("device map does not have this device -"
1815 + device.toString());
1819 private EnumSet<DeviceField> getEntityKeys(Long macAddress, Short vlan,
1820 Integer ipv4Address, NodeConnector port) {
1821 // FIXME: vlan==null is a valid search. Need to handle this
1822 // case correctly. Note that the code will still work correctly.
1823 // But we might do a full device search instead of using an index.
1824 EnumSet<DeviceField> keys = EnumSet.noneOf(DeviceField.class);
1825 if (macAddress != null)
1826 keys.add(DeviceField.MAC);
1828 keys.add(DeviceField.VLAN);
1829 if (ipv4Address != null)
1830 keys.add(DeviceField.IPV4);
1832 keys.add(DeviceField.SWITCHPORT);
1836 protected Iterator<Device> queryClassByEntity(IEntityClass clazz,
1837 EnumSet<DeviceField> keyFields, Entity entity) {
1838 ClassState classState = getClassState(clazz);
1839 DeviceIndex index = classState.secondaryIndexMap.get(keyFields);
1841 return Collections.<Device> emptySet().iterator();
1842 return new DeviceIndexInterator(this, index.queryByEntity(entity));
1845 protected Device allocateDevice(Long deviceKey, Entity entity,
1846 IEntityClass entityClass) {
1847 return new Device(this, deviceKey, entity, entityClass);
1851 protected Device allocateDevice(Long deviceKey, String dhcpClientName,
1852 List<AttachmentPoint> aps, List<AttachmentPoint> trueAPs,
1853 Collection<Entity> entities, IEntityClass entityClass) {
1854 return new Device(this, deviceKey, dhcpClientName, aps, trueAPs,
1855 entities, entityClass);
1858 protected Device allocateDevice(Device device, Entity entity,
1859 int insertionpoint) {
1860 return new Device(device, entity, insertionpoint);
1864 protected Device allocateDevice(Device device, Set<Entity> entities) {
1865 List<AttachmentPoint> newPossibleAPs = new ArrayList<AttachmentPoint>();
1866 List<AttachmentPoint> newAPs = new ArrayList<AttachmentPoint>();
1867 for (Entity entity : entities) {
1868 if (entity.getPort() != null) {
1869 AttachmentPoint aP = new AttachmentPoint(entity.getPort(), 0);
1870 newPossibleAPs.add(aP);
1873 if (device.attachmentPoints != null) {
1874 for (AttachmentPoint oldAP : device.attachmentPoints) {
1875 if (newPossibleAPs.contains(oldAP)) {
1880 if (newAPs.isEmpty())
1882 Device d = new Device(this, device.getDeviceKey(),
1883 device.getDHCPClientName(), newAPs, null, entities,
1884 device.getEntityClass());
1885 d.updateAttachmentPoint();
1889 // *********************
1890 // ITopologyManagerAware
1891 // *********************
1894 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
1895 Iterator<Device> diter = deviceMap.values().iterator();
1897 while (diter.hasNext()) {
1898 Device d = diter.next();
1899 if (d.updateAttachmentPoint()) {
1900 if (logger.isDebugEnabled()) {
1901 logger.debug("Attachment point changed for device: {}", d);
1903 sendDeviceMovedNotification(d);
1909 public void edgeOverUtilized(Edge edge) {
1914 public void edgeUtilBackToNormal(Edge edge) {
1918 // *********************
1919 // IEntityClassListener
1920 // *********************
1923 public void entityClassChanged(Set<String> entityClassNames) {
1925 * iterate through the devices, reclassify the devices that belong to
1926 * these entity class names
1928 Iterator<Device> diter = deviceMap.values().iterator();
1929 while (diter.hasNext()) {
1930 Device d = diter.next();
1931 if (d.getEntityClass() == null
1932 || entityClassNames.contains(d.getEntityClass().getName()))
1933 reclassifyDevice(d);
1941 * Send update notifications to listeners
1944 * the updates to process.
1946 protected void sendDeviceMovedNotification(Device d) {
1947 // debugCounters.updateCounter(CNT_DEVICE_MOVED);
1948 // deviceSyncManager.storeDevice(d);
1949 List<IDeviceListener> listeners = deviceListeners.getOrderedListeners();
1950 if (listeners != null) {
1951 for (IDeviceListener listener : listeners) {
1952 listener.deviceMoved(d);
1957 * Send update notifications to listeners.
1958 * IfNewHostNotify listeners need to remove old device and add new device.
1962 protected void sendDeviceMovedNotification(Device device, Device oldDevice){
1963 for (IfNewHostNotify notify : newHostNotify){
1964 notify.notifyHTClientHostRemoved(oldDevice.toHostNodeConnector());
1965 notify.notifyHTClient(device.toHostNodeConnector());
1967 sendDeviceMovedNotification(device);
1971 * this method will reclassify and reconcile a device - possibilities are -
1972 * create new device(s), remove entities from this device. If the device
1973 * entity class did not change then it returns false else true.
1977 protected boolean reclassifyDevice(Device device) {
1978 // first classify all entities of this device
1979 if (device == null) {
1980 logger.debug("In reclassify for null device");
1983 boolean needToReclassify = false;
1984 for (Entity entity : device.entities) {
1985 IEntityClass entityClass = this.entityClassifier
1986 .classifyEntity(entity);
1987 if (entityClass == null || device.getEntityClass() == null) {
1988 needToReclassify = true;
1991 if (!entityClass.getName()
1992 .equals(device.getEntityClass().getName())) {
1993 needToReclassify = true;
1997 if (needToReclassify == false) {
2001 // debugCounters.updateCounter(CNT_DEVICE_RECLASSIFY_DELETE);
2002 LinkedList<DeviceUpdate> deviceUpdates = new LinkedList<DeviceUpdate>();
2003 // delete this device and then re-learn all the entities
2004 this.deleteDevice(device);
2005 deviceUpdates.add(new DeviceUpdate(device, DeviceUpdate.Change.DELETE,
2007 if (!deviceUpdates.isEmpty())
2008 processUpdates(deviceUpdates);
2009 for (Entity entity : device.entities) {
2010 this.learnDeviceByEntity(entity);
2016 * For testing: sets the interval between writes of the same device to the
2021 // void setSyncStoreWriteInterval(int intervalMs) {
2022 // this.syncStoreWriteIntervalMs = intervalMs;
2026 * For testing: sets the time between transition to MASTER and consolidate
2031 // void setInitialSyncStoreConsolidateMs(int intervalMs) {
2032 // this.initialSyncStoreConsolidateMs = intervalMs;
2035 private long toLong(byte[] address) {
2037 for (int i = 0; i < 6; i++) {
2038 long t = (address[i] & 0xffL) << ((5 - i) * 8);
2045 * Accepts an IPv4 address in a byte array and returns the corresponding
2046 * 32-bit integer value.
2051 private static int toIPv4Address(byte[] ipAddress) {
2053 for (int i = 0; i < 4; i++) {
2054 int t = (ipAddress[i] & 0xff) << ((3 - i) * 8);
2060 private void registerDeviceManagerDebugCounters() {
2062 * XXX Missing functionality if (debugCounters == null) {
2063 * logger.error("Debug Counter Service not found."); debugCounters = new
2064 * NullDebugCounter(); return; }
2065 * debugCounters.registerCounter(CNT_INCOMING,
2066 * "All incoming packets seen by this module",
2067 * CounterType.ALWAYS_COUNT);
2068 * debugCounters.registerCounter(CNT_RECONCILE_REQUEST,
2069 * "Number of flows that have been received for reconciliation by " +
2070 * "this module", CounterType.ALWAYS_COUNT);
2071 * debugCounters.registerCounter(CNT_RECONCILE_NO_SOURCE,
2072 * "Number of flow reconcile events that failed because no source " +
2073 * "device could be identified", CounterType.WARN); // is this really a
2074 * warning debugCounters.registerCounter(CNT_RECONCILE_NO_DEST,
2075 * "Number of flow reconcile events that failed because no " +
2076 * "destination device could be identified", CounterType.WARN); // is
2077 * this really a warning
2078 * debugCounters.registerCounter(CNT_BROADCAST_SOURCE,
2079 * "Number of packetIns that were discarded because the source " +
2080 * "MAC was broadcast or multicast", CounterType.WARN);
2081 * debugCounters.registerCounter(CNT_NO_SOURCE,
2082 * "Number of packetIns that were discarded because the " +
2083 * "could not identify a source device. This can happen if a " +
2084 * "packet is not allowed, appears on an illegal port, does not " +
2085 * "have a valid address space, etc.", CounterType.WARN);
2086 * debugCounters.registerCounter(CNT_NO_DEST,
2087 * "Number of packetIns that did not have an associated " +
2088 * "destination device. E.g., because the destination MAC is " +
2089 * "broadcast/multicast or is not yet known to the controller.",
2090 * CounterType.ALWAYS_COUNT);
2091 * debugCounters.registerCounter(CNT_DHCP_CLIENT_NAME_SNOOPED,
2092 * "Number of times a DHCP client name was snooped from a " +
2093 * "packetIn.", CounterType.ALWAYS_COUNT);
2094 * debugCounters.registerCounter(CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED,
2095 * "Number of times packetIn was received on an internal port and" +
2096 * "no source device is known for the source MAC. The packetIn is " +
2097 * "discarded.", CounterType.WARN);
2098 * debugCounters.registerCounter(CNT_PACKET_NOT_ALLOWED,
2099 * "Number of times a packetIn was not allowed due to spoofing " +
2100 * "protection configuration.", CounterType.WARN); // is this really a
2101 * warning? debugCounters.registerCounter(CNT_NEW_DEVICE,
2102 * "Number of times a new device was learned",
2103 * CounterType.ALWAYS_COUNT); debugCounters.registerCounter(
2104 * CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE,
2105 * "Number of times a packetIn was received on an internal port " +
2106 * "for a known device.", CounterType.ALWAYS_COUNT);
2107 * debugCounters.registerCounter(CNT_NEW_ENTITY,
2108 * "Number of times a new entity was learned for an existing device",
2109 * CounterType.ALWAYS_COUNT);
2110 * debugCounters.registerCounter(CNT_DEVICE_CHANGED,
2111 * "Number of times device properties have changed",
2112 * CounterType.ALWAYS_COUNT);
2113 * debugCounters.registerCounter(CNT_DEVICE_MOVED,
2114 * "Number of times devices have moved", CounterType.ALWAYS_COUNT);
2115 * debugCounters.registerCounter(CNT_CLEANUP_ENTITIES_RUNS,
2116 * "Number of times the entity cleanup task has been run",
2117 * CounterType.ALWAYS_COUNT);
2118 * debugCounters.registerCounter(CNT_ENTITY_REMOVED_TIMEOUT,
2119 * "Number of times entities have been removed due to timeout " +
2120 * "(entity has been inactive for " + ENTITY_TIMEOUT/1000 + "s)",
2121 * CounterType.ALWAYS_COUNT);
2122 * debugCounters.registerCounter(CNT_DEVICE_DELETED,
2123 * "Number of devices that have been removed due to inactivity",
2124 * CounterType.ALWAYS_COUNT);
2125 * debugCounters.registerCounter(CNT_DEVICE_RECLASSIFY_DELETE,
2126 * "Number of devices that required reclassification and have been " +
2127 * "temporarily delete for reclassification", CounterType.ALWAYS_COUNT);
2128 * debugCounters.registerCounter(CNT_DEVICE_STORED,
2129 * "Number of device entries written or updated to the sync store",
2130 * CounterType.ALWAYS_COUNT);
2131 * debugCounters.registerCounter(CNT_DEVICE_STORE_THROTTLED,
2132 * "Number of times a device update to the sync store was " +
2133 * "requested but not performed because the same device entities " +
2134 * "have recently been updated already", CounterType.ALWAYS_COUNT);
2135 * debugCounters.registerCounter(CNT_DEVICE_REMOVED_FROM_STORE,
2136 * "Number of devices that were removed from the sync store " +
2137 * "because the local controller removed the device due to " +
2138 * "inactivity", CounterType.ALWAYS_COUNT);
2139 * debugCounters.registerCounter(CNT_SYNC_EXCEPTION,
2140 * "Number of times an operation on the sync store resulted in " +
2141 * "sync exception", CounterType.WARN); // it this an error?
2142 * debugCounters.registerCounter(CNT_DEVICES_FROM_STORE,
2143 * "Number of devices that were read from the sync store after " +
2144 * "the local controller transitioned from SLAVE to MASTER",
2145 * CounterType.ALWAYS_COUNT);
2146 * debugCounters.registerCounter(CNT_CONSOLIDATE_STORE_RUNS,
2147 * "Number of times the task to consolidate entries in the " +
2148 * "store witch live known devices has been run",
2149 * CounterType.ALWAYS_COUNT);
2150 * debugCounters.registerCounter(CNT_CONSOLIDATE_STORE_DEVICES_REMOVED,
2151 * "Number of times a device has been removed from the sync " +
2152 * "store because no corresponding live device is known. " +
2153 * "This indicates a remote controller still writing device " +
2154 * "entries despite the local controller being MASTER or an " +
2155 * "incosistent store update from the local controller.",
2156 * CounterType.WARN);
2157 * debugCounters.registerCounter(CNT_TRANSITION_TO_MASTER,
2158 * "Number of times this controller has transitioned from SLAVE " +
2159 * "to MASTER role. Will be 0 or 1.", CounterType.ALWAYS_COUNT);
2164 public HostNodeConnector hostFind(InetAddress networkAddress) {
2165 // TODO Auto-generated method stub
2170 public HostNodeConnector hostQuery(InetAddress networkAddress) {
2171 // TODO Auto-generated method stub
2176 public Future<HostNodeConnector> discoverHost(InetAddress networkAddress) {
2177 // TODO Auto-generated method stub
2182 public List<List<String>> getHostNetworkHierarchy(InetAddress hostAddress) {
2183 // TODO Auto-generated method stub
2188 public Set<HostNodeConnector> getAllHosts() {
2189 Collection<Device> devices = Collections
2190 .unmodifiableCollection(deviceMap.values());
2191 Iterator<Device> i = devices.iterator();
2192 Set<HostNodeConnector> nc = new HashSet<HostNodeConnector>();
2193 while (i.hasNext()) {
2194 Device device = i.next();
2195 nc.add(device.toHostNodeConnector());
2201 public Set<HostNodeConnector> getActiveStaticHosts() {
2202 Collection<Device> devices = Collections
2203 .unmodifiableCollection(deviceMap.values());
2204 Iterator<Device> i = devices.iterator();
2205 Set<HostNodeConnector> nc = new HashSet<HostNodeConnector>();
2206 while (i.hasNext()) {
2207 Device device = i.next();
2208 if(device.isStaticHost())
2209 nc.add(device.toHostNodeConnector());
2215 public Set<HostNodeConnector> getInactiveStaticHosts() {
2216 // TODO Auto-generated method stub
2221 public Status addStaticHost(String networkAddress, String dataLayerAddress,
2222 NodeConnector nc, String vlan) {
2223 Long mac = HexEncode.stringToLong(dataLayerAddress);
2225 InetAddress addr = InetAddress.getByName(networkAddress);
2226 int ip = toIPv4Address(addr.getAddress());
2227 Entity e = new Entity(mac, Short.valueOf(vlan), ip, nc, new Date());
2228 Device d = this.learnDeviceByEntity(e);
2229 d.setStaticHost(true);
2230 return new Status(StatusCode.SUCCESS);
2231 }catch(UnknownHostException e){
2232 return new Status(StatusCode.INTERNALERROR);
2237 public Status removeStaticHost(String networkAddress) {
2240 addr = toIPv4Address(InetAddress.getByName(networkAddress).getAddress());
2241 } catch (UnknownHostException e) {
2242 return new Status(StatusCode.NOTFOUND, "Host does not exist");
2244 Iterator<Device> di = this.getDeviceIteratorForQuery(null, null, addr, null);
2245 List<IDeviceListener> listeners = deviceListeners
2246 .getOrderedListeners();
2247 while(di.hasNext()){
2248 Device d = di.next();
2249 if(d.isStaticHost()){
2251 for (IfNewHostNotify notify : newHostNotify)
2252 notify.notifyHTClientHostRemoved(d.toHostNodeConnector());
2253 for (IDeviceListener listener : listeners)
2254 listener.deviceRemoved(d);
2257 return new Status(StatusCode.SUCCESS);
2261 * For testing: consolidate the store NOW
2263 // void scheduleConsolidateStoreNow() {
2264 // this.storeConsolidateTask.reschedule(0, TimeUnit.MILLISECONDS);
2267 // private class DeviceSyncManager {
2268 // // maps (opaque) deviceKey to the time in System.nanoTime() when we
2269 // // last wrote the device to the sync store
2270 // private ConcurrentMap<Long, Long> lastWriteTimes =
2271 // new ConcurrentHashMap<Long, Long>();
2274 // * Write the given device to storage if we are MASTER.
2275 // * Use this method if the device has significantly changed (e.g.,
2276 // * new AP, new IP, entities removed).
2277 // * @param d the device to store
2279 // public void storeDevice(Device d) {
2284 // long now = System.nanoTime();
2285 // writeUpdatedDeviceToStorage(d);
2286 // lastWriteTimes.put(d.getDeviceKey(), now);
2290 // * Write the given device to storage if we are MASTER and if the
2291 // * last write for the device was more than this.syncStoreIntervalNs
2293 // * Use this method to updated last active times in the store.
2294 // * @param d the device to store
2296 // public void storeDeviceThrottled(Device d) {
2297 // long intervalNs = syncStoreWriteIntervalMs*1000L*1000L;
2302 // long now = System.nanoTime();
2303 // Long last = lastWriteTimes.get(d.getDeviceKey());
2304 // if (last == null ||
2305 // now - last > intervalNs) {
2306 // writeUpdatedDeviceToStorage(d);
2307 // lastWriteTimes.put(d.getDeviceKey(), now);
2309 // debugCounters.updateCounter(CNT_DEVICE_STORE_THROTTLED);
2314 // * Remove the given device from the store. If only some entities have
2315 // * been removed the updated device should be written using
2316 // * {@link #storeDevice(Device)}
2319 // public void removeDevice(Device d) {
2322 // // FIXME: could we have a problem with concurrent put to the
2323 // // hashMap? I.e., we write a stale entry to the map after the
2324 // // delete and now are left with an entry we'll never clean up
2325 // lastWriteTimes.remove(d.getDeviceKey());
2327 // // TODO: should probably do versioned delete. OTOH, even
2328 // // if we accidentally delete, we'll write it again after
2329 // // the next entity ....
2330 // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
2331 // storeClient.delete(DeviceSyncRepresentation.computeKey(d));
2332 // } catch(ObsoleteVersionException e) {
2334 // } catch (SyncException e) {
2335 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2336 // logger.error("Could not remove device " + d + " from store", e);
2341 // * Remove the given Versioned device from the store. If the device
2342 // * was locally modified ignore the delete request.
2343 // * @param syncedDeviceKey
2345 // private void removeDevice(Versioned<DeviceSyncRepresentation> dev) {
2347 // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
2348 // storeClient.delete(dev.getValue().getKey(),
2349 // dev.getVersion());
2350 // } catch(ObsoleteVersionException e) {
2351 // // Key was locally modified by another thread.
2352 // // Do not delete and ignore.
2353 // } catch(SyncException e) {
2354 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2355 // logger.error("Failed to remove device entry for " +
2356 // dev.toString() + " from store.", e);
2361 // * Synchronously transition from SLAVE to MASTER. By iterating through
2362 // * the store and learning all devices from the store
2364 // private void goToMaster() {
2365 // if (logger.isDebugEnabled()) {
2366 // logger.debug("Transitioning to MASTER role");
2368 // debugCounters.updateCounter(CNT_TRANSITION_TO_MASTER);
2369 // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
2372 // iter = storeClient.entries();
2373 // } catch (SyncException e) {
2374 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2375 // logger.error("Failed to read devices from sync store", e);
2379 // while(iter.hasNext()) {
2380 // Versioned<DeviceSyncRepresentation> versionedDevice =
2381 // iter.next().getValue();
2382 // DeviceSyncRepresentation storedDevice =
2383 // versionedDevice.getValue();
2384 // if (storedDevice == null)
2386 // debugCounters.updateCounter(CNT_DEVICES_FROM_STORE);
2387 // for(SyncEntity se: storedDevice.getEntities()) {
2388 // learnDeviceByEntity(se.asEntity());
2392 // if (iter != null)
2395 // storeConsolidateTask.reschedule(initialSyncStoreConsolidateMs,
2396 // TimeUnit.MILLISECONDS);
2400 // * Actually perform the write of the device to the store
2401 // * FIXME: concurrent modification behavior
2402 // * @param device The device to write
2404 // private void writeUpdatedDeviceToStorage(Device device) {
2406 // debugCounters.updateCounter(CNT_DEVICE_STORED);
2407 // // FIXME: use a versioned put
2408 // DeviceSyncRepresentation storeDevice =
2409 // new DeviceSyncRepresentation(device);
2410 // storeClient.put(storeDevice.getKey(), storeDevice);
2411 // } catch (ObsoleteVersionException e) {
2412 // // FIXME: what's the right behavior here. Can the store client
2413 // // even throw this error?
2414 // } catch (SyncException e) {
2415 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2416 // logger.error("Could not write device " + device +
2417 // " to sync store:", e);
2422 // * Iterate through all entries in the sync store. For each device
2423 // * in the store check if any stored entity matches a live device. If
2424 // * no entities match a live device we remove the entry from the store.
2426 // * Note: we do not check if all devices known to device manager are
2427 // * in the store. We rely on regular packetIns for that.
2428 // * Note: it's possible that multiple entries in the store map to the
2429 // * same device. We don't check or handle this case.
2431 // * We need to perform this check after a SLAVE->MASTER transition to
2432 // * get rid of all entries the old master might have written to the
2433 // * store after we took over. We also run it regularly in MASTER
2434 // * state to ensure we don't have stale entries in the store
2436 // private void consolidateStore() {
2439 // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_RUNS);
2440 // if (logger.isDebugEnabled()) {
2441 // logger.debug("Running consolidateStore.");
2443 // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
2446 // iter = storeClient.entries();
2447 // } catch (SyncException e) {
2448 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2449 // logger.error("Failed to read devices from sync store", e);
2453 // while(iter.hasNext()) {
2454 // boolean found = false;
2455 // Versioned<DeviceSyncRepresentation> versionedDevice =
2456 // iter.next().getValue();
2457 // DeviceSyncRepresentation storedDevice =
2458 // versionedDevice.getValue();
2459 // if (storedDevice == null)
2461 // for(SyncEntity se: storedDevice.getEntities()) {
2463 // // Do we have a device for this entity??
2464 // IDevice d = findDevice(se.macAddress, se.vlan,
2472 // } catch (IllegalArgumentException e) {
2473 // // not all key fields provided. Skip entity
2477 // // We currently DO NOT have a live device that
2478 // // matches the current device from the store.
2479 // // Delete device from store.
2480 // if (logger.isDebugEnabled()) {
2481 // logger.debug("Removing device {} from store. No "
2482 // + "corresponding live device",
2483 // storedDevice.getKey());
2485 // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_DEVICES_REMOVED);
2486 // removeDevice(versionedDevice);
2490 // if (iter != null)
2498 // * For testing. Sets the syncService. Only call after init but before
2499 // * startUp. Used by MockDeviceManager
2500 // * @param syncService
2502 // protected void setSyncServiceIfNotSet(ISyncService syncService) {
2503 // if (this.syncService == null)
2504 // this.syncService = syncService;