前面通过阅读代码知道了怎样推断各个模块处理某个消息的先后顺序。那么内部是怎样实现的呢?
每当一个模块表示对一个消息感兴趣的时候,就会调用IFloodlightProviderService(详细有Controller类实现)的addOFMessageListener方法进行注冊订阅, 核心工作是由 ListenerDispatcher类来完毕:1) 每次添加一个观察者的时候都会推断其是否是终结点(也就是不被其它的listener所依赖),由于终于确定这些观察者顺序的时候就是由这些终结点開始往前进行DFS遍历而得到。2)比方说ForwardingBase和Distributing (我们自己加的。没有约束其顺序)。当它们注冊packetin消息的时候。会增加到终结点集合terminals中。所以从它们開始深度遍历的时候得到的有序集合ordering=linkdiscovery,topology,devicemanager, forwarding, distributing( 这里进行了两次DFS traverse)。接下来看代码:
-------------Controller中实现IFloodlightProviderService的方法
@Override
public synchronized void addOFMessageListener(OFType type,
IOFMessageListener listener) {
//先推断与type相应的 ListenerDispatcher对象是否存在
ListenerDispatcher< OFType, IOFMessageListener> ldd =
messageListeners.get(type);
if (ldd == null ) {
ldd = new ListenerDispatcher< OFType, IOFMessageListener>();
messageListeners.put(type, ldd);
}
//注冊监听type这个消息。
ldd.addListener(type, listener);
}
--------------ListenerDispatcher实现(维护这些观察者,有依赖关系)
public class ListenerDispatcher <U, T extends IListener<U>> {
protected static Logger logger = LoggerFactory.getLogger(ListenerDispatcher. class );
List<T> listeners = null ;
//每一个OF msg都有唯一的ListenerDispatcher对象。观察者存在listeners链表中
//从listener这个观察者開始,根据有没有监听者在他之前,进行深度优先遍历
//终于有序序列存在ordering中。visited用于存已经訪问过的terminal listener。
private void visit(List<T> newlisteners, U type, HashSet<T> visited,
List<T> ordering, T listener) {
if (!visited.contains(listener)) {
visited.add(listener);
for (T i : newlisteners) {
if (ispre(type, i, listener)) {
visit(newlisteners, type, visited, ordering, i);
}
}
ordering.add(listener);
}
}
//推断观察者l1 是否在 l2 之前(每一个观察者实现了IListener接口)
private boolean ispre(U type, T l1, T l2) {
return (l2.isCallbackOrderingPrereq(type, l1.getName()) ||
l1.isCallbackOrderingPostreq(type, l2.getName()));
}
//订阅type消息。
public void addListener(U type, T listener) {
List<T> newlisteners = new ArrayList<T>();
if ( listeners != null )
newlisteners.addAll( listeners );
newlisteners.add(listener);
// Find nodes without outgoing edges
List<T> terminals = new ArrayList<T>();
for (T i : newlisteners) {
boolean isterm = true ;
for (T j : newlisteners) {
if (ispre(type, i, j)) {
isterm = false ;
break ;
}
}
if (isterm) {
terminals.add(i); //维护终节点集合
}
}
if (terminals.size() == 0) {
logger .error( "No listener dependency solution: " +
"No listeners without incoming dependencies" );
listeners = newlisteners;
return ;
}
//接下来得到有序的listeners;
// visit depth-first traversing in the opposite order from
// the dependencies. Note we will not generally detect cycles
HashSet<T> visited = new HashSet<T>();
List<T> ordering = new ArrayList <T>();
for (T term : terminals) {
visit(newlisteners, type, visited, ordering, term);
}
listeners = ordering;
}
//观察者退出。为何不直接remove??
public void removeListener(T listener) {
if ( listeners != null ) {
List<T> newlisteners = new ArrayList<T>();
newlisteners.addAll( listeners );
newlisteners.remove(listener);
listeners = newlisteners;
}
}
//清除全部listeners;
public void clearListeners() {
listeners = new ArrayList<T>();
}
//
public List<T> getOrderedListeners() {
return listeners ;
}
}
数据结构关联图:
转载注明出处: