• 0

  • 473

Handler 史诗级分析

6天前

1 Handler 用法总计

1.1 更新Ui

因为Android不允许在子线程更新Ui,所以系统提供一个Handler用于在子线程操作,进行切换线程的机制。也是Handler最重要的功能。

一般我们的用法是这样的,我们使用Handler发送一个Message,然后在Handler的handleMessage进行更新Ui的操作。

1.2 同步屏障

上面我们讲了Handler的常用用法,我们知道Handler的使用需要配合Message一起使用。

在Android系统中,Handler系统支持两种Message,一种是同步消息,一种是异步消息。在系统中,Message是通过链表进存储的。

同步屏障分成两个部分:1 设置同步屏障 2 发生异步消息

当系统读取到一个同步屏障后,会遍历整个消息链表,优先处理存在的异步消息

在系统中的应用

void scheduleTraversals() {
    if (!mTraversalScheduled) {
        mTraversalScheduled = true;
        //设置同步障碍,确保mTraversalRunnable优先被执行
        mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
        //内部通过Handler发送了一个异步消息
        mChoreographer.postCallback(
                Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
        if (!mUnbufferedInputDispatch) {
            scheduleConsumeBatchedInput();
        }
        notifyRendererOfFramePending();
        pokeDrawLockIfNeeded();
    }
}
复制代码

mTraversalRunnable调用了performTraversals执行measure、layout、draw

为了让mTraversalRunnable尽快被执行,在发消息之前调用MessageQueue.postSyncBarrier设置了同步屏障

1.2.1 同步蔽障使用方法

github.com/bighk/testH…

1.3 IdleHandler

IdleHandler 是 Handler 提供的一种在消息队列空闲时,执行任务的时机,我们常见可以使用用于Ui渲染完成执行后的机制。我们通常可以用来获取控件的宽高

handler?.looper?.queue?.addIdleHandler {
  //增加需要的操作
}
复制代码

系统的使用机制

系统在ActivityThread中,使用idleHandler来更新ActivityClientRecord

@Override
public void handleResumeActivity(IBinder token, boolean finalStateRequest, boolean isForward, String reason){  
   
   unscheduleGcIdler();
   mSomeActivitiesChanged = true;
   ...
   ...
     
   Looper.myQueue().addIdleHandler(new Idler());  
 }
复制代码

2 Handler 初始化

Handler 示意图

通常我们都是在主线程初始化Handler,初始化Handler的方法如下

public Handler() {
        this(null, false);
}
复制代码

会调用到其他的构造方法

public Handler(Callback callback, boolean async) {
    
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur:klass.getCanonicalName());}
                      }
                
     mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    
    mQueue = mLooper.mQueue;
                      
                      
    mCallback = callback;
    mAsynchronous = async;
}
复制代码

因为callback传的是null ,async 传的是false,所以构造函数这里只有下面的两句干了活

 mLooper = Looper.myLooper();
 mQueue = mLooper.mQueue;
复制代码

3 Looper

Handler 可以想象成一个邮局,用来处理各地发送过来的信件,邮局内部有一个机器人,一个超级大的信箱,它一直坐在邮局内部,一但有信件到达,它就会去邮局的信箱中取出信件,根据时间和收件人处理。

那么Looper就是这个机器人的角色。在Handler的内部有调用一个方法

Looper.myLooper();
复制代码
public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}
复制代码

这是一个静态的方法,方法内部使用去一个使用ThreadLocal 修饰的内部变量中获取了一个Looper

 static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
复制代码

3.1 ThreadLocal 关键字

我们看看ThreadLocal的源码

Each thread holds an implicit reference to its copy of a thread-local variable as long as the thread is alive and the {@code ThreadLocal} instance is accessible; after a thread goes away, all of its copies of thread-local instances are subject to garbage collection (unless other references to these copies exist).

大概意思就是如果被这个关键字修饰的变量,每个线程都会持有一份这个变量的拷贝,如果线程在,这个变量就可以访问,否则就会被GC。

简单来说就是一个线程内部的存储类,可以在指定线程内存储数据,数据存储以后,只有指定线程可以得到存储数据 。

3.2 ThreadLocal 用法

ThreadLocal 初始化

 ThreadLocal<String> mLocal = new ThreadLocal<String>();
复制代码

ThreadLocal 存值

mLocal.set("aaaa");
复制代码

ThreadLocal 取值

aLocal.get();
复制代码
  • 在主线程存一次值->"test"
  • 在thread存一次值->"aaaa"
  • 看看主线程的test 会不会更改
public class Demo {

    public static void main(String[] args) {
    
         ThreadLocal<String> aLocal = new ThreadLocal<String>();
         
          aLocal.put("test");
         
          System.out.println(Thread.currentThread().getName()+" content = "+aLocal.get());
         
         Thread thread  = new Thread() {
             public void run() {
                 aLocal.set("aaaa");
                 System.out.println(Thread.currentThread().getName()+"content = "+aLocal.get());
             };
         };
         
         thread.start();
         
         try {
            Thread.sleep(1000);
             } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         System.out.println(Thread.currentThread().getName()+" content = "+aLocal.get());
    }
 
}
复制代码

结果如下:

main content = test

Thread-0 content = aaaa

main content = test

复制代码

结果显示,被ThreadLocal 修饰的变量只和线程有关

3.3 ThreadLocal 源码分析

在Thread 类的内部有一个成员变量

/* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
复制代码

这个是一个ThreadLocal 的 Map类型的内部类,这个类是每个线程都有的成员变量

3.3.1 ThreadLocalMap

static class ThreadLocalMap {

    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

...

}    
复制代码

ThreadLocalMap 的内部存储对象是WeakReference,也就是说非强引用的类型。根据Entry可知道

我们知道这个Map就是存放以ThreadLocal为key的值就可以了。

3.3.2 ThreadLocalMap 的初始化

在线程中的threadLocals变量默认是空的对象,那它是什么时候被初始化的呢?

/* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
复制代码

只有一处地方初始化,就在ThreadLocal 内部初始化的

void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
 }
复制代码

这个代码也没有什么特别的地方,只是调用了构造函数进行初始化。我们看看createMap 在哪里被调用

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
复制代码

就在ThreadLocal 插入值的时候被调用了。

3.3.3 为什么ThreadLocal 可以在指定线程存储?

我们刚刚在之前的例子里面看到,在主线程程存储的值,和在线程1存储的值,是不一样的,但是对于程序来说,只有一个ThreadLocal。也就是说这个ThreadLocal 的值,对于每个线程都不一样了。为什么可以实现呢?

因为构造函数内部没有代码,我们直接看set函数是怎么实现的

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
 }
复制代码

现在假设有一个ThreadLocal (local)的变量,两个线程使用(线程1 线程2 )

  1. 线程1 开始对local 进行赋值,调用set方法
  2. 获取当前线程,当前线程是线程1,并获取当前线程的ThreadLocalMap。因为线程1默认没有ThreadLocalMap,所以为空,所以创建了一个new ThreadLocalMap(this, firstValue);这个ThreadLocalMap是属于线程1的一个变量
  3. 线程2 开始对local 进行赋值,调用set方法
  4. 获取当前线程,当前线程是线程2,并获取当前线程的ThreadLocalMap。因为线程2默认没有ThreadLocalMap,所以为空,所以创建了一个new ThreadLocalMap(this, firstValue);这个ThreadLocalMap是属于线程2的一个变量
  5. 到现在,我们分别在线程1 和线程2上分别创建了一个ThreadLocalMap。
  6. 如果我们修改线程1上的ThreadLocal的值,会再次调用set方法
  7. 这次因为已经创建了ThreadLocalMap,直接根据key修改就可以了,这时修改的是线程1的,不会影响到线程2上的值。

3.4 为什么要使用ThreadLocal 关键字

我们知道Handler是用来切换线程的,当我们需要在子线程上进行Ui操作,必须切换到主线程去,所以我们必须有一个机制能够获取到主线程的操作符,这个时候我们就需要使用ThreadLocal ,会使用ThreadLocal +Looper 这个组合来标记主线程和主线程的操作句柄。

3.5 Looper 什么时候创建的

我们创建一个Handler 的时候,会调用Looper的myLooper()方法

public Handler(Callback callback, boolean async) {
    
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur:klass.getCanonicalName());}
                      }
     // 1           
     mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    
    mQueue = mLooper.mQueue;
                      
                      
    mCallback = callback;
    mAsynchronous = async;
}
复制代码

如果为空的话,就会抛一个异常出去。但是我们使用Handler 通常都是直接 Handler()。我们没有主动创建,那Looper 是在那里初始化的呢?

在开启一个新的应用的时候,会启动一个类ActivityThread,这个类就是我们说的主线程。我们知道,Java程序执行的入口在Main 方法。我们看看ActivityThread的main 方法

public static void main(String[] args) {

    Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain");

    SamplingProfilerIntegration.start();

    CloseGuard.setEnabled(false);

    Environment.initForCurrentUser();

    EventLogger.setReporter(new EventLoggingReporter());

    final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId());
    TrustedCertificateStore.setDefaultUserDirectory(configDir);

    Process.setArgV0("<pre-initialized>");
    
    //创建主线程的Looper
    Looper.prepareMainLooper();
    
    ActivityThread thread = new ActivityThread();
    thread.attach(false);

    if (sMainThreadHandler == null) {
            sMainThreadHandler = thread.getHandler();
     }
    if (false) {
            Looper.myLooper().setMessageLogging(new
                    LogPrinter(Log.DEBUG, "ActivityThread"));
     }
     Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
     //开始loop循环
     Looper.loop();

     throw new RuntimeException("Main thread loop unexpectedly exited");    
复制代码

main() 方法内部有两句比较重要的代码,第一句

Looper.prepareMainLooper();
复制代码

我们看看这个方法的内部实现

//准备主线程的Looper
public static void prepareMainLooper() {
        //生成一个Looper
        prepare(false);
    
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            //保存主线程的Looper
            sMainLooper = myLooper();
        }
 }
复制代码
private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
 }
复制代码
// 私有构造函数
private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
复制代码

实际上,prepareMainLooper 就是在主线程创建一个Looper,那么如果我们在OnCreate() 等生命周期方法创建Hnadler(),因为是在主线程运行,最后获取到的就是这个主线程Looper,

第二句

Looper.loop();
复制代码
public static void loop() {
        // 获取当前线程的Looper
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
    
        //获取到消息队列
        final MessageQueue queue = me.mQueue;

       
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();
        
        //开始循环取数据
        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }

            final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;

            final long traceTag = me.mTraceTag;
            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }
            final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
            final long end;
            try {
                msg.target.dispatchMessage(msg);
                end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            if (slowDispatchThresholdMs > 0) {
                final long time = end - start;
                if (time > slowDispatchThresholdMs) {
                    Slog.w(TAG, "Dispatch took " + time + "ms on "
                            + Thread.currentThread().getName() + ", h=" +
                            msg.target + " cb=" + msg.callback + " msg=" + msg.what);
                }
            }

            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // Make sure that during the course of dispatching the
            // identity of the thread wasn't corrupted.
            final long newIdent = Binder.clearCallingIdentity();
            if (ident != newIdent) {
                Log.wtf(TAG, "Thread identity changed from 0x"
                        + Long.toHexString(ident) + " to 0x"
                        + Long.toHexString(newIdent) + " while dispatching to "
                        + msg.target.getClass().getName() + " "
                        + msg.callback + " what=" + msg.what);
            }

            msg.recycleUnchecked();
        }
    }
复制代码

Looper 内部的原理很简单:

  1. 创建一个消息队列(MessageQueue)
  2. 不停的去消息队列中获取入队的消息 loop()

3.6 loop() 是否会ANR

public static void loop() {
        // 获取当前线程的Looper
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
    
        //获取到消息队列
        final MessageQueue queue = me.mQueue;

       
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();
        
        //开始循环取数据
        for (;;) {
            // might block
            Message msg = queue.next(); 
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }

            final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;

            final long traceTag = me.mTraceTag;
            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }
            final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
            final long end;
            try {
                msg.target.dispatchMessage(msg);
                end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            if (slowDispatchThresholdMs > 0) {
                final long time = end - start;
                if (time > slowDispatchThresholdMs) {
                    Slog.w(TAG, "Dispatch took " + time + "ms on "
                            + Thread.currentThread().getName() + ", h=" +
                            msg.target + " cb=" + msg.callback + " msg=" + msg.what);
                }
            }

            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // Make sure that during the course of dispatching the
            // identity of the thread wasn't corrupted.
            final long newIdent = Binder.clearCallingIdentity();
            if (ident != newIdent) {
                Log.wtf(TAG, "Thread identity changed from 0x"
                        + Long.toHexString(ident) + " to 0x"
                        + Long.toHexString(newIdent) + " while dispatching to "
                        + msg.target.getClass().getName() + " "
                        + msg.callback + " what=" + msg.what);
            }

            msg.recycleUnchecked();
        }
    }
复制代码

这个就是Looper 的loop方法,处理所有的消息,但是这个处理有点粗暴,直接一个for循环进行处理,而且这个for循环还没有退出的条件?那这个会不会阻塞主线程呢?

答案是不会的,

Message msg = queue.next(); 
复制代码

这句代码调用了消息队列进行获取最新的消息,如果没有,就进行阻塞,让出cpu。因为涉及到底层的内容,我们分析到MessageQueue再继续分析。

3.7 非主线程创建Handler

刚刚我们分析了主线程在创建的时候,会创建一个对应主线的Looper, 如果非主线程要创建Handler 怎么办呢?

class LooperThread extends Thread {
      public Handler mHandler;
 
      public void run() {
          Looper.prepare();

           mHandler = new Handler() {
               public void handleMessage(Message msg) {
                   // process incoming messages here
               }
           };
 
           Looper.loop();
  }
复制代码

注意,这个Handler 是无法更新UI的,因为这个mHandler的handleMessage实际运行还是LooperThread不是主线程。

如果不想自己创建,可以使用Android提供给我们的一个API 来直接创建一个带Handler的线程 HandlerThread。

4 MessageQueue

根据上一节,我们知道了Handler获取一条信息是通过Lopper的loop()方法进行的,loop()内部调用了MessageQueue的next()方法。

MessageQueue 是Handler 和核心类,用来管理消息。在Looper 创建的时候,就会创建一个对应这个Looper 的MessageQueue。MessageQueue负责对Message进行入队和出队的操作。

因为Handler 架构涉及到java 层和native层,架构如下图所示

4.1 MessageQueue 构造函数

MessageQueue(boolean quitAllowed) {
        //是否能够退出队列
        mQuitAllowed = quitAllowed;
        //native初始化 并记录nativeMessageQueue的指针地址
        mPtr = nativeInit();
    }
复制代码
private native static long nativeInit();
复制代码

在构造函数中调用了jni 方法进行初始化。进行jni 方法前,有必要和大家讲一下Handler 的分层结构

Handler 除了有java 层的 实现外,也有c++ 层的实现,源码的位置

framework/base/core/java/andorid/os/MessageQueue.java framework/base/core/java/andorid/os/Looper.java 

framework/base/core/jni/android_os_MessageQueue.cpp 
system/core/libutils/Looper.cpp 
system/core/include/utils/Looper.h 
system/core/libutils/RefBase.cpp 

复制代码

4.1.1 nativeInit

nativeInit 方法注册在android_os_MessageQueue.cpp 使用jni进行编写

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    //new 了一个 NativeMessageQueue对象出来 (4.1.1节)
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    //如果生成该对象失败,抛异常
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }
    //强引用计数加1,用于内存管理(4.1.2节)
    nativeMessageQueue->incStrong(env);
  
    //强制转换指针,保存在java层,进行后续的操作
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
复制代码

4.1.2 NativeMessageQueue

为什么要生成一个NativeMessageQueue 对象,NativeMessageQueue 它继承MessageQueue这个类,实际的操作还是通过NativeMessageQueue来进行的

class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();

    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

    void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
    void wake();
    void setFileDescriptorEvents(int fd, int events);

    virtual int handleEvent(int fd, int events, void* data);

private:
    JNIEnv* mPollEnv;
    jobject mPollObj;
    jthrowable mExceptionObj;
};
复制代码

4.1.3 Android native 智能指针

我们知道Java是jvm 进行gc的,c++内部是需要自动进行销毁的。为了方便android 给我们提供了一个RefBase 来进行对象的管理,在Android Native体系架构中,通过sp(strong pointer),wp(weak pointer) 这一系列强弱引用计数实现对对象生命周期的管理。

他的出现类似于Java中的回收器,或者是OC中的自动释放池的功能等,他内部实现也很简单,就是用两个变量来控制,一个是强引用计数变量,一个是弱引用计数变量,这两个变量都是int类型的,表示一个对象被引用多少次,两个变量会依据具体的生命管理周期模式来决定是否释放对象。

举个例子,以两个对象A,B为例,A持有B,B也持有A,当A不用的时候,准备回收A的时候,发现B持有A对象的引用,所以系统判断A还是有用的,无法进行回收,同理B也可能出现上述的情况。

所以Android 引进了智能指针

还是上面的例子,A持有B , B也持有A,A通过强指针持有B,B通过弱指针持有A,当A生命周期结束时,因为B是弱引用,所以不影响A的回收,当A回收后,强指针也被回收了,不影响B。

源码位置:

system/core/libutils/RefBase.cpp
system/core/include/utils/RefBase.h
system/core/include/utils/StrongPointer.h
frameworks/base/core/jni/android_os_MessageQueue.h
复制代码
class MessageQueue : public virtual RefBase {
复制代码
// 实现了RefBase后可以使用智能智能指针,强引用+1 
nativeMessageQueue->incStrong(env);
复制代码

简单看看RefBase的源码

class RefBase::weakref_impl : public RefBase::weakref_type{
    
    //原子计数
    public:
      //对于强引用计数控制是通过这个变
        std::atomic<int32_t>    mStrong;
      //对于弱引用的计数控制是通过这个变量
        std::atomic<int32_t>    mWeak;
        RefBase* const          mBase;
        std::atomic<int32_t>    mFlags;

 
     ...
     
     void addStrongRef(const void* id) {
        //记录操作数的方法
        addRef(&mStrongRefs, id, mStrong.load(std::memory_order_relaxed));
    }
    
     //使用链表进行记录
     void addRef(ref_entry** refs, const void* id, int32_t mRef)
    {
        if (mTrackEnabled) {
            AutoMutex _l(mMutex);

            ref_entry* ref = new ref_entry;
            ref->ref = mRef;
            ref->id = id;
            ref->next = *refs;
            *refs = ref;
        }
    }
    ...
}
复制代码
void RefBase::incStrong(const void* id) const
{   
    //内部的实现类
    weakref_impl* const refs = mRefs;
    
    //weakref_impl 没有实现incWeak,调用了weakref_impl 的父类 weakref_type的incWeak
    refs->incWeak(id);
    
    //空实现
    refs->addStrongRef(id);
    
    //mStrong 原子+1
    const int32_t c = refs->mStrong.fetch_add(1, std::memory_order_relaxed);
   
    if (c != INITIAL_STRONG_VALUE)  {
        return;
    }

     //mStrong 计数+1
    int32_t old = refs->mStrong.fetch_sub(INITIAL_STRONG_VALUE,
            std::memory_order_relaxed);
  
    refs->mBase->onFirstRef();
}
复制代码
void RefBase::decStrong(const void* id) const
{
    weakref_impl* const refs = mRefs;
    //清除强引用
    refs->removeStrongRef(id);
    //强引用-1
    const int32_t c = refs->mStrong.fetch_sub(1, std::memory_order_release);
 
    LOG_ALWAYS_FATAL_IF(BAD_STRONG(c), "decStrong() called on %p too many times",
            refs);
    if (c == 1) {
        std::atomic_thread_fence(std::memory_order_acquire);
        refs->mBase->onLastStrongRef(id);
        int32_t flags = refs->mFlags.load(std::memory_order_relaxed);
        if ((flags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) {
            delete this;
            // The destructor does not delete refs in this case.
        }
    }
    // Note that even with only strong reference operations, the thread
    // deallocating this may not be the same as the thread deallocating refs.
    // That's OK: all accesses to this happen before its deletion here,
    // and all accesses to refs happen before its deletion in the final decWeak.
    // The destructor can safely access mRefs because either it's deleting
    // mRefs itself, or it's running entirely before the final mWeak decrement.
    //清除弱引用
    refs->decWeak(id);
}
复制代码
void RefBase::weakref_type::decWeak(const void* id)
{
    weakref_impl* const impl = static_cast<weakref_impl*>(this);
    //弱引用解除
    impl->removeWeakRef(id);
    //弱引用-1
    const int32_t c = impl->mWeak.fetch_sub(1, std::memory_order_release);
    LOG_ALWAYS_FATAL_IF(BAD_WEAK(c), "decWeak called on %p too many times",
            this);
    if (c != 1) return;
    atomic_thread_fence(std::memory_order_acquire);

    int32_t flags = impl->mFlags.load(std::memory_order_relaxed);
    if ((flags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) {
        // This is the regular lifetime case. The object is destroyed
        // when the last strong reference goes away. Since weakref_impl
        // outlives the object, it is not destroyed in the dtor, and
        // we'll have to do it here.
        if (impl->mStrong.load(std::memory_order_relaxed)
                == INITIAL_STRONG_VALUE) {
            // Decrementing a weak count to zero when object never had a strong
            // reference.  We assume it acquired a weak reference early, e.g.
            // in the constructor, and will eventually be properly destroyed,
            // usually via incrementing and decrementing the strong count.
            // Thus we no longer do anything here.  We log this case, since it
            // seems to be extremely rare, and should not normally occur. We
            // used to deallocate mBase here, so this may now indicate a leak.
            ALOGW("RefBase: Object at %p lost last weak reference "
                    "before it had a strong reference", impl->mBase);
        } else {
            // ALOGV("Freeing refs %p of old RefBase %p\n", this, impl->mBase);
            delete impl;
        }
    } else {
        // This is the OBJECT_LIFETIME_WEAK case. The last weak-reference
        // is gone, we can destroy the object.
        impl->mBase->onLastWeakRef(id);
        delete impl->mBase;
    }
}
复制代码
void RefBase::weakref_type::incWeak(const void* id)
{
    weakref_impl* const impl = static_cast<weakref_impl*>(this);
    //空实现
    impl->addWeakRef(id);
    //mWeak 计数+1
    const int32_t c __unused = impl->mWeak.fetch_add(1,
            std::memory_order_relaxed);
    ALOG_ASSERT(c >= 0, "incWeak called on %p after last weak ref", this);
}
复制代码

所以总结如下:

引用类型 强引用计数 弱引用计数
sp构造 +1 +1
wp构造   +1
sp析构 -1 -1
wp析构   -1

3 指针转换

将 new出来的nativeMessageQueue指针转换成java long 型,在java 层进行保存

reinterpret_cast<jlong>(nativeMessageQueue)
复制代码

总结一下 nativeInit 干了什么事情

  1. 创建一个C++ 层的NativeMessageQueue
  2. 使用智能指针对这个NativeMessageQueue的生命周期进行管理
  3. 将这个NativeMessageQueue的指针返回java层,供后续操作

4.2 next() 方法分析

根据之前的分析我们知道了获取下一条消息的方法是next()方法,我们来看看它的实现

4.2.1 消息出队示意图

为了防止大家晕菜,先给大家看看一个示意图

1 消息队列中有消息的情况

(按顺序取出一条条消息)

2 消息队列中没有消息的情况

​ (如果没有消息的话,Looper 阻塞 在取消息的方法,然后等新的消息进来后,被唤醒在取消息的方法)

3 消息队列中有一个延期消息

(如果队列中是一条延迟消息的话,延迟n秒后唤醒)

4.2.2 next 分析

Message next() {
  
    //如果没有初始化的native指针,立即返回
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    //IdleHandler的数量
    int pendingIdleHandlerCount = -1;  
    //下一条消息的delay
    int nextPollTimeoutMillis = 0;
    
    //死循环
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        //native方法,可能会照成阻塞,第一次进去是0,0就立马返回了,-1会一直阻塞
        nativePollOnce(ptr, nextPollTimeoutMillis);

       //native 方法返回了,可能有以下几种情况
       // 1 第一次 nextPollTimeoutMillis = 0 ,直接返回了
       // 2 nextPollTimeoutMillis 的阻塞时间到了,返回了
       // 3 新来了一条message,唤醒了阻塞的native方法
        synchronized (this) {
            
            //当前的时间 
            final long now = SystemClock.uptimeMillis();
            
            Message prevMsg = null;
            Message msg = mMessages;
          
            //处理同步壁障
            //如果消息不为空,而且没有target,这个消息就是一个 同步壁障
            if (msg != null && msg.target == null) {
                //如果有同步壁障的话,会沿着链表查找到所有的异步消息
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
          
            //开始处理这条消息
            if (msg != null) {
               //如果这个消息的时间还没有到的话
                if (now < msg.when) {
                    //记录阻塞时间 
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                     //返回这个消息,并在链表删除
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // 链表没有消息,nextPollTimeoutMillis为-1的话,会一直阻塞等待到下一条消息来
                nextPollTimeoutMillis = -1;
            }

            //处理退出的情况
            if (mQuitting) {
                dispose();
                return null;
            }

          
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                //获取IdleHandler的个数
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            
            //如果没有IdleHandler就重新 
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            //用一个临时数组保存IdleHandler
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

         //处理IdleHandler
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler
            
            //如果为空的话,当空闲的时候继续提示
            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

       
        pendingIdleHandlerCount = 0;
        nextPollTimeoutMillis = 0;
    }
}
复制代码

4.2.3 IdleHandler分析

当每次next()方法时,消息队列没有消息的时候,Looper会去尝试读取IdleHandler的队列。如果IdleHandler有配置

当没有消息的时候,会去idleHandler的队列的对象。然后根据IdleHandler的返回值进行处理,如果是true就继续下次空闲处理,false就删除。

当处理完idleHandler 后仍没有消息来的话,会阻塞在nativePollOnce(),当新的消息来了,返回Looper.loop(),重新执行next方法,会重新判断是否有空闲处理。

4.3 nativePollOnce

根据上面的分析,我们知道了next的核心机制就是 nativePollOnce 因为它类似java的wait 和 notify 机制。能够阻塞和唤醒代码,为啥不直接使用java的wait 和 notify,其实在2.2之前,都是使用wait 和 notify,后面为了native方面的通信,才使用native的方法,nativePollOnce最早使用的select之后才使用的Epoll 机制。

这部分涉及到的代码:

frameworks\base\core\java\android\os\Looper.java
frameworks\base\core\java\android\os\MessageQueue.java

frameworks\base\core\jni\android_os_MessageQueue.cpp
frameworks\base\core\jni\android_os_MessageQueue.h

system\core\libutils\Looper.cpp
复制代码

4.3.1Epoll 机制

分析native代码前,要和大家普及一下Linux下的进程的状态

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

为什么Handler 能做到阻塞和唤醒,是使用了java 的wait 和notify 吗?实际上不是的,是通过了Linux的一个Epoll 机制实现的。

首先在Linux 中所有的东西都是文件,包括设备。

文件描述符(File descriptor),是一个用于表述指向文件的引用的抽象化概念。你可以简单理解为是指向文件的指针。 文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

Epoll是Linux 内部的一个编程函数,用于监听多个IO,当任何一个IO,发生数据读写的话,会通知监听的对象。

Epoll 的使用也很简单,涉及到的函数与定义

#include <sys/epoll.h>
1int epoll_create(int size):用于创建一个epoll的文件描述符,创建的文件描述符可监听size个文件描述符;
参数介绍:
size:size是指监听的描述符个数

2int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event): 用于对需要监听的文件描述符fd执行op操作,比如将fd添加到epoll文件描述符epfd;
参数介绍:
epfd:是epoll_create()的返回值
op:表示op操作,用三个宏来表示,分别为EPOLL_CTL_ADD(添加)、EPOLL_CTL_DEL(删除)和EPOLL_CTL_MOD(修改)
fd:需要监听的文件描述符
epoll_event:需要监听的事件,有4种类型的事件,分别为EPOLLIN(文件描述符可读)、EPOLLOUT(文件描述符可写), EPOLLERR(文件描述符错误)和EPOLLHUP(文件描述符断)

3int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout): 等待事件的上报, 该函数返回需要处理的事件数目,如返回0表示已超时;
参数介绍:
epfd:等待epfd上的io事件,最多返回maxevents个事件
events:用来从内核得到事件的集合
maxevents:events数量,该maxevents值不能大于创建epoll_create()时的size
timeout:超时时间(毫秒,0会立即返回,-1永久阻塞)


复制代码
int enentfd(unsigned int initval,int flags)
  创建一个文件描述符
复制代码

下面是一个epoll的简单的例子

#define MAX_EVENTS 10
//接受的事件 和 返回事件的数组
struct epoll_event ev, events[MAX_EVENTS];

//eventfd 创建的文件描述符
int listen_sock, conn_sock, nfds, epollfd;

//1 创建一个epoll的文件描述符
epollfd = epoll_create(10);

//创建epoll的文件描述符失败
if (epollfd == -1) {
    perror("epoll_create");
    exit(EXIT_FAILURE);
}

//监听数据写入
ev.events = EPOLLIN;
//事件的文件描述符是listen_sock
ev.data.fd = listen_sock;

//2 将文件描述符添加进入epoll的文件描述符
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}

//开启循环监听数据写入
for (;;) {
    
    //3 一直阻塞监听epoll文件描述符的写入,events 是用来返回接受到的信息,
    //nfds是接收到的事件数量
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_pwait");
        exit(EXIT_FAILURE);
    }
  
//遍历接收到的事件  
for (n = 0; n < nfds; ++n) {
     //如果这个事件的描述符是listen_sock
    if (events[n].data.fd == listen_sock) {
        conn_sock = accept(listen_sock,
                        (struct sockaddr *) &local, &addrlen);
        if (conn_sock == -1) {
            perror("accept");
            exit(EXIT_FAILURE);
        }
        setnonblocking(conn_sock);
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = conn_sock;
        //增加事件conn_sock进入监听
        if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                    &ev) == -1) {
            perror("epoll_ctl: conn_sock");
            exit(EXIT_FAILURE);
        }
    } else {
        do_use_fd(events[n].data.fd);
    }
}
}
复制代码

经过上面的学习,我们知道了Epoll的六个步骤

  1. 使用enentfd 创建一个文件描述符
  2. epoll_create 传进一个epoll 文件描述符
  3. 创建一个epoll_event,指定这个event的文件描述符是enentfd创建的文件描述符
  4. 使用epoll_ctl 创建的epoll_event绑定到epoll_create的epoll 文件描述符
  5. 创建一个epoll_event数组,用于接收返回的数据
  6. 使用epoll_wait监听接收到的事件

4.3.2 nativePollOnce 分析

private native void nativePollOnce(long ptr, int timeoutMillis);
复制代码

这个是nativePollOnce的方法签名

对应的nativie的实现在android_os_MessgeQueue.cpp内部

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    //将Java层传递下来的mPtr转换为nativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 
}
复制代码

ptr 是在MessageQueue 创建的时候保存的指针,指针指向创建的NativeMessageQueue

nativePollOnce调用到NativeMessageQueue 的 pollOnce方法

4.3.3 pollOnce

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    //核心
    mLooper->pollOnce(timeoutMillis); 【4】
    mPollObj = NULL;
    mPollEnv = NULL;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
复制代码

我们发现 pollOnce 调用到了 mLooper的 pollOnce 方法。这个mLooper是什么呢?

4.3.4 Looper.cpp

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    
    //当前的线程获取一个Looper
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        //为当前线程创建一个Looper
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
复制代码

Looper构造函数

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
          
    //创建了一个文件描述符      
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));

    AutoMutex _l(mLock);
    
    //创建epoll文件符      
    rebuildEpollLocked();
}
复制代码
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    //如果已经存在了epoll描述符,关闭
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }

    //创建一个epoll描述符 
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    
    //创建监听事件 和 回调事件的数组
    struct epoll_event eventItem;
    
    //数据写入监听,文件描述符是之前创建的mWakeEventFd
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
  
    //将事件添加到到epoll的文件描述符上
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}
复制代码

Looper的析构函数

Looper::~Looper() {
     //关闭了文件描述符
    close(mWakeEventFd);
    mWakeEventFd = -1;
    // 关闭了epoll文件描述符
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }
}
复制代码

Looper的构造函数主要就是初始化了epoll的文件描述符

4.3.5 Looper.cpp -> pollOnce

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;

                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

         //实际上的逻辑
        result = pollInner(timeoutMillis);
    }
}
复制代码

4.3.6 Looper.cpp -> pollInner

int Looper::pollInner(int timeoutMillis) {
   
   //调整等待的时间
   if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
 
    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // 进入阻塞状态.
    mPolling = true;

    //使用 epoll_wait 来阻塞获取文件输入
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // 取消阻塞状态.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // 如果需要重建epoll
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

   
    //epoll-wait 返回负数,出错了
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

     //epoll-wait 返回0,阻塞到时间了
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

     //遍历所有的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        
        //如果事件的描述符是mWakeEventFd
        if (fd == mWakeEventFd) {
            //而且是一个写入事件
            if (epollEvents & EPOLLIN) {
                //读取管道内的数据,清空数据,为下一次做准备
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            //将native的事件push到response队列中
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { 
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;

            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
复制代码
  • POLL_WAKE: 表示由wake()触发,即pipe写端的write事件触发;
  • POLL_CALLBACK: 表示某个被监听fd被触发。
  • POLL_TIMEOUT: 表示等待超时;
  • POLL_ERROR:表示等待期间发生错误

这些值是Looper返回的

上面这个图很好的介绍了一个消息出队的流程。

当队列是空的时候,因为timeoutMIlls传递的值是-1,所以会阻塞在epoll_wait,当有新的消息来的时候,eopll_wait会接收到对应的事件,并返回到Java层的next方法,next方法return 到loop方法,进行处理这条消息。接着在进一次for循环。

5 发送一条Message

上面我们分析了接收消息的方法,我们知道了当消息队列是空的时候,MessageQueue会阻塞等待一条消息到来,当新消息到来的时候,epoll_wait会接到注册的事件。那接下来我们来分析一下发送一条消息的流程。

public final boolean sendEmptyMessage(int what)  {
        return sendEmptyMessageDelayed(what, 0);
    }
复制代码
public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
        Message msg = Message.obtain();
        msg.what = what;
        return sendMessageDelayed(msg, delayMillis);
    }
复制代码
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
复制代码
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}
复制代码

Handler 发送一条消息,会调用到MessageQueue的enqueueMessage(Message msg, long when)

boolean enqueueMessage(Message msg, long when) {
    //普通消息必须有target
    //没有target是消息壁障
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        //正在退出了,就回收该信息了
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        
        //消息入队 
      
       //插入一个新的队头
       //1 当前的消息为空
       //2 消息立马执行
       //3 当前消息的执行时间小于队头的时间
        if (p == null || when == 0 || when < p.when) {
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
             //插入队列中,不一定要唤醒,只有是一个消息壁障的时候才会唤醒
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

         //唤醒服务
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
复制代码

这里核心的处理就是Message入队的服务,根据MessageQueue的状态判断是不是需要唤醒,如果需要唤醒就调用nativeWake方法

private native static void nativeWake(long ptr);
复制代码

这个是一个native方法

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}
复制代码
void NativeMessageQueue::wake() {
    mLooper->wake();
}
复制代码
void Looper::wake() {
    uint64_t inc = 1;
    //核心的方法就是往这个文件里面写一个字节
    //因为已经注册到epoll事件里面了,epoll_wait会受到事件
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}
复制代码

收到事件后,Looper会清空这个文件内部的内容

 //如果事件的描述符是mWakeEventFd
        if (fd == mWakeEventFd) {
            //而且是一个写入事件
            if (epollEvents & EPOLLIN) {
                //读取管道内的数据,清空数据,为下一次做准备
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } 
复制代码
void Looper::awoken() {
    uint64_t counter;
    TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}
复制代码

发送消息这个流程也是很简单的。

如果这个消息是普通消息,插入到消息队列中,如果是在队头插入的话,就唤醒队列,唤醒的方法也是很简单,就是往文档符号中写入一个字符,就会唤醒epoll_wait。

epoll_wait 被唤醒后,先清除文件描述符先处理native的消息,就会到Java层next方法,读取消息队列中的第一个。

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

程序员

473

相关文章推荐

未登录头像

暂无评论