J~杰's Blog

人生就一条路,走一步有一步的景观

0%

简介

线程任务执行如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

今天我们就来讨论一下Callable、Future和FutureTask三个类的使用方法。

Callable与Runnable

先说一下java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface Future<V> {

/**
* cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
* 参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,
* 如果设置true,则表示可以取消正在执行过程中的任务。
* 如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,
* 即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,
* 则返回true,若mayInterruptIfRunning设置为false,则返回false;
* 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
* @param mayInterruptIfRunning
* @return
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
* @return
*/
boolean isCancelled();

/**
* 表示任务是否已经完成,若任务完成,则返回true;
* @return
*/
boolean isDone();

/**
* 用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
V get() throws InterruptedException, ExecutionException;

/**
* 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
*
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

RunnableFuture

RunnableFuture作为 Runnable 的 Future。成功执行 run 方法可以完成 Future 并允许访问其结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 作为 Runnable 的 Future。成功执行 run 方法可以完成 Future 并允许访问其结果。
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask

我们先来看一下FutureTask的实现:

1
public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
FutureTask提供了2个构造器:

1
2
3
4
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
Read more »

背景

在使用Java编程过程中,我们对于一些代码调用的细节有多种编写方式,但是不确定它们性能时,往往采用重复多次计数的方式来解决。但是随着JVM不断的进化,随着代码执行次数的增加,JVM会不断的进行编译优化,使得重复多少次才能够得到一个稳定的测试结果变得让人疑惑,这时候有经验的同学就会在测试执行前先循环上万次并注释为预热。

没错!这样做确实可以获得一个偏向正确的测试结果,但是我们试想如果每到需要斟酌性能的时候,都要根据场景写一段预热的逻辑吗?当预热完成后,需要多少次迭代来进行正式内容的测量呢?每次测试结果的输出报告是不是都需要用System.out来输出呢?

其实这些工作都可以交给 JMH (the Java Microbenchmark Harness) ,它被作为Java9的一部分来发布,但是我们完全不需要等待Java9,而可以方便的使用它来简化我们测试,它能够照看好JVM的预热、代码优化,让你的测试过程变得更加简单。

Read more »

简介

DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。

堆结构如下图所示:

为什么要使用DelayedWorkQueue呢?
定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。

DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。

排序规则

  • 执行时间距离当前时间越近,越靠前
  • 如果执行时间相同,则先执行插入时间靠前的任务。

新增/获取任务

DelayedWorkQueue通过put或者add来新增一条任务,但其底层都是调用offer来新增任务的。对于获取任务,我们知道在ThreadPoolExecutor中线程根据getTask来获取任务队列中的任务,而在getTask中任务队列通过poll或者take函数来获取任务队列中的任务,由于ScheduleThreadPoolExecutor继承自ThreadPoolExecutor,因此其底层获取任务方式相同,只需要DelayedWorkQueue提供take及pool方法即可。

DelayWorkQueue底层是用最小堆数据结构实现的,需要最先执行的任务在堆的顶部,因此在每次插入或者删除任务时需要调整二叉树节点的顺序,但不同于最小堆的地方在于DelayWorkQueue不关心兄弟节点之间的顺序,只要父节点的任务先于子节点执行即可。

在一个最小堆的队列中,假如索引从0开始,子节点索引值为k,父节点索引值为p,则存在如下规律:

  • 一个节点的左子节点的索引为:k = p * 2 + 1
  • 一个节点的右子节点的索引为:k = (p + 1) * 2
  • 一个节点的父节点的索引为:p = (k - 1) / 2
Read more »

简介

自JDK1.5开始,JDK提供了ScheduledThreadPoolExecutor类来支持周期性任务的调度。在这之前的实现需要依靠Timer和TimerTask或者其它第三方工具来完成。但Timer有不少的缺陷:

  • Timer是单线程模式;
  • 如果在执行任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;
  • Timer的任务调度是基于绝对时间的,对系统时间敏感;
  • Timer不会捕获执行TimerTask时所抛出的异常,由于Timer是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。

ScheduledThreadPoolExecutor继承ThreadPoolExecutor来重用线程池的功能,它的实现方式如下:

  • 将任务封装成ScheduledFutureTask对象,ScheduledFutureTask基于相对时间,不受系统时间的改变所影响;
  • ScheduledFutureTask实现了java.lang.Comparable接口和java.util.concurrent.Delayed接口,所以有两个重要的方法:compareTo和getDelay。compareTo方法用于比较任务之间的优先级关系,如果距离下次执行的时间间隔较短,则优先级高;getDelay方法用于返回距离下次任务执行时间的时间间隔;
  • ScheduledThreadPoolExecutor定义了一个DelayedWorkQueue,它是一个有序队列,会通过每个任务按照距离下次执行时间间隔的大小来排序;
  • ScheduledFutureTask继承自FutureTask,可以通过返回Future对象来获取执行的结果。

类图

从ScheduledThreadPoolExecutor类声明可以看出:

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,并且实现了接口ScheduledExecutorService;

Read more »

简介

Executors 这个类,因为它仅仅是工具类,它的所有方法都是 static 的。

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.fly.learn.executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 线程池test
* @author: peijiepang
* @date 2018/11/23
* @Description:
*/
public class ThreadPoolExecutorTest {
private final static Logger LOGGER = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
//private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
//private static Executor executor = Executors.newFixedThreadPool(5);
//private static Executor executor = Executors.newSingleThreadExecutor();
//private static Executor executor = Executors.newCachedThreadPool();
private static ExecutorService executor = Executors.newScheduledThreadPool(5);

public void executeTask(){
Task1 task1 = new Task1();//构建任务
executor.execute(task1);//执行任务
}

/*
* 基本任务2
*/
class Task1 implements Runnable{
public void run() {
//具体任务的业务
for(int i=0;i<1000;i++){
LOGGER.info("{}...",i);
}
}
}
public static void main(String[] args) {
ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
test.executeTask();
executor.shutdown();
while (!executor.isTerminated()){
LOGGER.info("finish......");
}
}
}
Read more »

本文根据dbaplus社群第156期线上分享整理而成.地址:https://m.qlchat.com/topic/details?topicId=2000001669563722&tracePage=liveCenter

讲解人:赵俊 京东金融高级Java开发工程师

分布式事务的使用场景

ACID

  • Atomicity:原子性

    事务作为整体来执行,要么全部执行,要么全不执行。

  • Consistency:一致性

    事务应确保数据从一个一致的状态转变为另一个一致的状态。

  • Isolation:隔离性

    多个事务并发执行时,一个事务的执行不应影响其他事务的执行。

  • Durability:持久性

    已提交的事务修改数据会被持久保持。

关系型数据库的本地事务完美的提供了对ACID的原生支持。但在分布式的场景下,它却成为系统性能的桎梏。如何让数据库在分布式场景下满足ACID的特性或找寻相应的替代方案,是本文将要阐述的话题。

CAP和Base理论

对于互联网应用而言,随着访问量和数据量的激增,传统的单体架构模式将无法满足业务的高速发展。这时,开发者需要把单体应用拆分为多个独立的小应用,把单个数据库按照分片规则拆分为多个库和多个表。

数据拆分后,如何在多个数据库节点间保证本地事务的ACID特性则成为一个技术难题,并且由此而衍生出了CAP和BASE经典理论。

CAP理论指出,对于分布式的应用而言,不可能同时满足C(一致性),A(可用性),P(分区容错性),由于网络分区是分布式应用的基本要素,因此开发者需要在C和A上做出平衡。

由于C和A互斥性,其权衡的结果就是BASE理论。

对于大部分的分布式应用而言,只要数据在规定的时间内达到最终一致性即可。我们可以把符合传统的ACID叫做刚性事务,把满足BASE理论的最终一致性事务叫做柔性事务。

一味的追求强一致性,并非最佳方案。对于分布式应用来说,刚柔并济是更加合理的设计方案,即在本地服务中采用强一致事务,在跨系统调用中采用最终一致性。如何权衡系统的性能与一致性,是十分考验架构师与开发者的设计功力的。

Read more »

简介

AbstractExecutorService 抽象类派生自 ExecutorService 接口,然后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。ExecutorService又是继承Executor接口。接下来我们就来一一分析。

源码解析

  1. Executor接口
    我们可以看到 Executor 接口非常简单,就一个 void execute(Runnable command) 方法,代表提交一个任务。
    1
    2
    3
    4
    5
    6
    7
    /* 
    * @since 1.5
    * @author Doug Lea
    */
    public interface Executor {
    void execute(Runnable command);
    }
Read more »

简介

线程池可以简单看做是一组线程的集合,通过使用线程池,我们可以方便的复用线程,避免了频繁创建和销毁线程所带来的开销。

类图

如上图,最顶层的接口 Executor 仅声明了一个方法execute。ExecutorService 接口在其父类接口基础上,声明了包含但不限于shutdown、submit、invokeAll、invokeAny 等方法。至于 ScheduledExecutorService 接口,则是声明了一些和定时任务相关的方法,比如 schedule和scheduleAtFixedRate。线程池的核心实现是在 ThreadPoolExecutor 类中,我们使用 Executors 调用newFixedThreadPool、newSingleThreadExecutor和newCachedThreadPool等方法创建线程池均是 ThreadPoolExecutor 类型。

Read more »

定义

Semaphore 主要用于限量控制并发执行代码的工具类, 其内部通过 一个 permit 来进行定义并发执行的数量,本质就是aqs的共享锁.

类图

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.fly.learn.reentrantlock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* @author: peijiepang
* @date 2018/11/17
* @Description:
*/
public class SemaphoreTest extends Thread{

private final static Logger LOGGER = LoggerFactory.getLogger(SemaphoreTest.class);

private Semaphore semaphore;


public SemaphoreTest(String threadName,Semaphore semaphore) {
this.semaphore = semaphore;
this.setName(threadName);
}

@Override
public void run(){
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("{}......",Thread.currentThread().getName());
semaphore.release();
}

public static void main(String[] args) {
//公平锁
Semaphore semaphore = new Semaphore(2,true);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i=0;i<10;i++){
executorService.submit(new SemaphoreTest("thread_"+1,semaphore));
}
executorService.shutdown();
LOGGER.info("finish.......");
}
}

执行结果如下

1
2
3
4
5
6
7
8
9
10
2018-11-17 21:05:46.644 [pool-2-thread-1] INFO  c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-1......
2018-11-17 21:05:46.645 [pool-2-thread-2] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-2......
2018-11-17 21:05:47.647 [pool-2-thread-4] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-4......
2018-11-17 21:05:47.647 [pool-2-thread-3] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-3......
2018-11-17 21:05:48.650 [pool-2-thread-6] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-6......
2018-11-17 21:05:48.650 [pool-2-thread-5] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-5......
2018-11-17 21:05:49.652 [pool-2-thread-7] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-7......
2018-11-17 21:05:49.652 [pool-2-thread-8] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-8......
2018-11-17 21:05:50.657 [pool-2-thread-9] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-9......
2018-11-17 21:05:50.658 [pool-2-thread-10] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-10......

从上面的执行结果来看,每1s都会获取到2个令牌,符合预期Semaphore配置为2的结论。

特点

1.Semaphore方法的实现通过 Sync(AQS的继承类)代理来实现
2.支持公平与非公平模式, 都是在AQS的子类里面进行, 主要区分在 tryAcquire
Read more »

简介

与 CountDownLatch 的实现方式不同,CyclicBarrier 并没有直接通过 AQS 实现同步功能,而是在重入锁 ReentrantLock 的基础上实现的。在 CyclicBarrier 中,线程访问 await 方法需先获取锁才能访问。在最后一个线程访问 await 方法前,其他线程进入 await 方法中后,会调用 Condition 的 await 方法进入等待状态。在最后一个线程进入 CyclicBarrier await 方法后,该线程将会调用 Condition 的 signalAll 方法唤醒所有处于等待状态中的线程。同时,最后一个进入 await 的线程还会重置 CyclicBarrier 的状态,使其可以重复使用。

在创建 CyclicBarrier 对象时,需要转入一个值,用于初始化 CyclicBarrier 的成员变量 parties,该成员变量表示屏障拦截的线程数。当到达屏障的线程数小于 parties 时,这些线程都会被阻塞住。当最后一个线程到达屏障后,此前被阻塞的线程才会被唤醒。

Demo例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.fly.learn.reentrantlock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* @author: peijiepang
* @date 2018/11/14
* @Description:
*/
public class CyclicBarrierTest {
private final static Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierTest.class);

public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") {
public void run() {
LOGGER.info(Thread.currentThread().getName() + " barrier action");
}
});
MyThread t1 = new MyThread("t1", cb);
MyThread t2 = new MyThread("t2", cb);
t1.start();
t2.start();
LOGGER.info(Thread.currentThread().getName() + " going to await");
cb.await();
LOGGER.info(Thread.currentThread().getName() + " continue");
}

}

class MyThread extends Thread {
private final static Logger LOGGER = LoggerFactory.getLogger(MyThread.class);

private CyclicBarrier cb;
public MyThread(String name, CyclicBarrier cb) {
super(name);
this.cb = cb;
}

public void run() {
LOGGER.info(Thread.currentThread().getName() + " going to await");
try {
cb.await();
LOGGER.info(Thread.currentThread().getName() + " continue");
} catch (Exception e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
2018-11-14 23:14:23.641 [main] INFO  c.f.l.r.CyclicBarrierTest - main going to await
2018-11-14 23:14:23.641 [t2] INFO com.fly.learn.reentrantlock.MyThread - t2 going to await
2018-11-14 23:14:23.640 [t1] INFO com.fly.learn.reentrantlock.MyThread - t1 going to await
2018-11-14 23:14:23.648 [t1] INFO c.f.l.r.CyclicBarrierTest - t1 barrier action
2018-11-14 23:14:23.648 [t1] INFO com.fly.learn.reentrantlock.MyThread - t1 continue
2018-11-14 23:14:23.649 [t2] INFO com.fly.learn.reentrantlock.MyThread - t2 continue
2018-11-14 23:14:23.650 [main] INFO c.f.l.r.CyclicBarrierTest - main continue
Read more »