作为Android中最常用跨线程手段之一,AsyncTask经常出现在代码中。我也经常使用AsyncTask,有一次遇到一个奇怪的情况:AsyncTask.execute()函数调用后,AsyncTask却未立即执行,而是过了一段时间以后,才开始执行,当时觉得很奇怪,一番Google后,得知是线程池的原因。事后也一直搁着,直到今天工作有点空,花了点时间看看AsyncTask的实现。
AsyncTask的线程池
AsyncTask提供了两个线程池,用以执行后台任务:
当然,开发者也可以通过自定义线程池来执行后台任务:
THREAD_POOL_EXECUTOR
THREAD_POOL_EXECUTOR的定义为:
-
/**
-
* An {@link Executor} that can be used to execute tasks in parallel.
-
*/
-
public static final Executor THREAD_POOL_EXECUTOR
-
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
-
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
其中用到的一些参数如下:
-
private static final int CORE_POOL_SIZE = 5;
-
private static final int MAXIMUM_POOL_SIZE = 128;
-
private static final int KEEP_ALIVE = 1;
-
-
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
-
private final AtomicInteger mCount = new AtomicInteger(1);
-
-
public Thread newThread(Runnable r) {
-
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());//创建一个拥有特定名字的线程
-
}
-
};
-
-
private static final BlockingQueue<Runnable> sPoolWorkQueue =
-
new LinkedBlockingQueue<Runnable>(10);
这里不详细解释ThreadPoolExecutor的实现,而是简单介绍下ThreadPoolExecutor的工作逻辑(线程池的工作逻辑,相信大家都比较清楚)
- 当线程池内无空闲线程,且线程数不足CORE_POOL_SIZE时,创建新的线程来执行任务。
- 当线程池内无空闲线程,且线程数大于等于CORE_POOL_SIZE,且sPoolWorkQueue为满时,把任务放到sPoolWorkQueue中。
- 当线程池内无空闲线程,且线程数大于等于CORE_POOL_SZIE,且sPoolWorkQueue已满,且线程数未达到MAXIMUM_POOL_SIZE时,创建新线程以执行任务。
- 当线程池内无空闲线程,且线程数大于等于CORE_POOL_SZIE,且sPoolWorkQueue已满,且线程数等于MAXIMUM_POOL_SIZE时,抛出异常。
从当前的参数我们可以看到,THREAD_POOL_EXECUTOR最多同时拥有128个线程执行任务,通常情况下(sPoolWorkQueue有任务,且未满),THREAD_POOL_EXECUTOR会拥有5条线程同时处理任务。
SERIAL_EXECUTOR
默认情况下,AsyncTask会在SERIAL_EXECUTOR中执行后台任务(其实,这个说法不太准确,稍后解释):
-
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
SERIAL_EXECUTOR的定义为:
-
/**
-
* An {@link Executor} that executes tasks one at a time in serial
-
* order. This serialization is global to a particular process.
-
*/
-
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
而SerialExecutor的定义为:
-
private static class SerialExecutor implements Executor {
-
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
-
Runnable mActive;
-
-
public synchronized void execute(final Runnable r) {
-
mTasks.offer(new Runnable() {
-
public void run() {
-
try {
-
r.run();
-
} finally {
-
scheduleNext();
-
}
-
}
-
});
-
if (mActive == null) {
-
scheduleNext();
-
}
-
}
-
-
protected synchronized void scheduleNext() {
-
if ((mActive = mTasks.poll()) != null) {
-
THREAD_POOL_EXECUTOR.execute(mActive);
-
}
-
}
-
}
从这里,我们可以看到SerialExecutor并不是一个线程池(所以本文前面说AsyncTask的两个线程池的说法是不准确的),它实际上是提供了一个mTask来储存所有待执行的task,并逐个提交给THREAD_POOL_EXECUTOR执行。
所以,实际上所有的后台任务都是在THREAD_POOL_EXECUTOR中执行的,而
和
-
AsyncTask.executorOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR)
的差别在于,前者会逐个执行任务,同一时间仅有一个任务被执行。
AsyncTask的实现
AsyncTask的实现,需要依赖于两个成员:
-
private final WorkerRunnable<Params, Result> mWorker;
-
private final FutureTask<Result> mFuture;
WorkerRunnable
WorkerRunnable的定义为:
-
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
-
Params[] mParams;
-
}
而Callable的定义为:
-
public interface Callable<V> {
-
/**
-
* Computes a result, or throws an exception if unable to do so.
-
*
-
* @return computed result
-
* @throws Exception if unable to compute a result
-
*/
-
V call() throws Exception;
-
}
WorkerRunnable为抽象类,所以使用的其实是它的子类:
-
/**
-
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
-
*/
-
public AsyncTask() {
-
mWorker = new WorkerRunnable<Params, Result>() {
-
public Result call() throws Exception {
-
mTaskInvoked.set(true);
-
-
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
-
//noinspection unchecked
-
return postResult(doInBackground(mParams));
-
}
-
};
-
-
mFuture = new FutureTask<Result>(mWorker) {
-
@Override
-
protected void done() {
-
try {
-
postResultIfNotInvoked(get());
-
} catch (InterruptedException e) {
-
android.util.Log.w(LOG_TAG, e);
-
} catch (ExecutionException e) {
-
throw new RuntimeException("An error occured while executing doInBackground()",
-
e.getCause());
-
} catch (CancellationException e) {
-
postResultIfNotInvoked(null);
-
}
-
}
-
};
-
}
call函数的重载挺简单,主要就是调用doInBackground函数,执行后台任务。
FutureTask
FutureTask的实现比WorkerRunnable复杂,但是,如果抓住核心去分析也很简单。
首先, FutureTask实现了RunnableFuture接口:
-
public class FutureTask<V> implements RunnableFuture<V> {
然后,RunnableFuture继承自Runnable:
-
public interface RunnableFuture<V> extends Runnable, Future<V> {
-
/**
-
* Sets this Future to the result of its computation
-
* unless it has been cancelled.
-
*/
-
void run();
-
}
所以,FutureTask类,肯定实现了run方法:
-
public void run() {
-
if (state != NEW ||
-
!UNSAFE.compareAndSwapObject(this, runnerOffset,
-
null, Thread.currentThread()))
-
return;
-
try {
-
Callable<V> c = callable;
-
if (c != null && state == NEW) {
-
V result;
-
boolean ran;
-
try {
-
result = c.call();
-
ran = true;
-
} catch (Throwable ex) {
-
result = null;
-
ran = false;
-
setException(ex);
-
}
-
if (ran)
-
set(result);
-
}
-
} finally {
-
// runner must be non-null until state is settled to
-
// prevent concurrent calls to run()
-
runner = null;
-
// state must be re-read after nulling runner to prevent
-
// leaked interrupts
-
int s = state;
-
if (s >= INTERRUPTING)
-
handlePossibleCancellationInterrupt(s);
-
}
-
}
忽略其他处理,run函数执行了callable的call函数。再说说callable是什么东西:
-
public FutureTask(Callable<V> callable) {
-
if (callable == null)
-
throw new NullPointerException();
-
this.callable = callable;
-
this.state = NEW; // ensure visibility of callable
-
}
在看看前面已经介绍过的AsyncTask构造函数:
-
/**
-
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
-
*/
-
public AsyncTask() {
-
mWorker = new WorkerRunnable<Params, Result>() {
-
public Result call() throws Exception {
-
mTaskInvoked.set(true);
-
-
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
-
//noinspection unchecked
-
return postResult(doInBackground(mParams));
-
}
-
};
-
-
mFuture = new FutureTask<Result>(mWorker) {
-
@Override
-
protected void done() {
-
try {
-
postResultIfNotInvoked(get());
-
} catch (InterruptedException e) {
-
android.util.Log.w(LOG_TAG, e);
-
} catch (ExecutionException e) {
-
throw new RuntimeException("An error occured while executing doInBackground()",
-
e.getCause());
-
} catch (CancellationException e) {
-
postResultIfNotInvoked(null);
-
}
-
}
-
};
-
}
原来callable就是mWorker,也就是说,FutureTask的run函数,会调用doInBackground函数。
AsyncTask.execute函数的执行过程
1. AsyncTask.execute
-
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
-
return executeOnExecutor(sDefaultExecutor, params);
-
}
2.AsyncTask.executeOnExecutor
-
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
-
Params... params) {
-
if (mStatus != Status.PENDING) {
-
switch (mStatus) {
-
case RUNNING:
-
throw new IllegalStateException("Cannot execute task:"
-
+ " the task is already running.");
-
case FINISHED:
-
throw new IllegalStateException("Cannot execute task:"
-
+ " the task has already been executed "
-
+ "(a task can be executed only once)");
-
}
-
}
-
-
mStatus = Status.RUNNING;
-
-
onPreExecute();//调用onPreExecute
-
-
mWorker.mParams = params;//保存参数
-
exec.execute(mFuture);//执行task
-
-
return this;
-
}
跳过SERIAL_EXECUTOR和THREAD_POOL_EXECUTOR的实现不谈,我们可以用如下的方式,简单理解exe.execute函数:
-
void execute(Runnable runnable){
-
new Thread(runnable).start();
-
}
3. FutureTask.run()
-
public void run() {
-
if (state != NEW ||
-
!UNSAFE.compareAndSwapObject(this, runnerOffset,
-
null, Thread.currentThread()))
-
return;
-
try {
-
Callable<V> c = callable;
-
if (c != null && state == NEW) {
-
V result;
-
boolean ran;
-
try {
-
result = c.call();
-
ran = true;
-
} catch (Throwable ex) {
-
result = null;
-
ran = false;
-
setException(ex);
-
}
-
if (ran)
-
set(result);
-
}
-
} finally {
-
// runner must be non-null until state is settled to
-
// prevent concurrent calls to run()
-
runner = null;
-
// state must be re-read after nulling runner to prevent
-
// leaked interrupts
-
int s = state;
-
if (s >= INTERRUPTING)
-
handlePossibleCancellationInterrupt(s);
-
}
-
}
前面已经讲过,会调用mWorker.call().
4. WorkerRunnable.call()
-
mWorker = new WorkerRunnable<Params, Result>() {
-
public Result call() throws Exception {
-
mTaskInvoked.set(true);
-
-
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
-
//noinspection unchecked
-
return postResult(doInBackground(mParams));
-
}
-
};
5. AsyncTask.postResult()
-
private Result postResult(Result result) {
-
@SuppressWarnings("unchecked")
-
Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT,
-
new AsyncTaskResult<Result>(this, result));
-
message.sendToTarget();
-
return result;
-
}
通过sHandler,去UI线程执行接下来的工作。
6. sHandler.handleMessage
-
private static class InternalHandler extends Handler {
-
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
-
@Override
-
public void handleMessage(Message msg) {
-
AsyncTaskResult result = (AsyncTaskResult) msg.obj;
-
switch (msg.what) {
-
case MESSAGE_POST_RESULT:
-
// There is only one result
-
result.mTask.finish(result.mData[0]);
-
break;
-
case MESSAGE_POST_PROGRESS:
-
result.mTask.onProgressUpdate(result.mData);
-
break;
-
}
-
}
-
}
7. AsyncTask.finish
-
private void finish(Result result) {
-
if (isCancelled()) {
-
onCancelled(result);
-
} else {
-
onPostExecute(result);
-
}
-
mStatus = Status.FINISHED;
-
}
通常情况下(即尚未调用过cancel(boolean interrupt)),isCannelled()为false,接下来会执行onPostExecute。
总结
从本质上来讲,AsyncTask即为Execute和Handler的组合。AsyncTask利用Executor在后台线程中执行doInBackgroud函数,并利用handler执行onPostExecute函数。
而AsyncTask的行为则由executeOnExecutor函数中参数exec决定。AsyncTask提供的SERIAL_EXECUTOR限定任务逐个执行,而THREAD_POOL_EXECUTOR支持最多128个任务并行执行,但是当待处理任务过多时,会抛出RejectedExecutionException。