设计模式 同步模式—保护性暂停 定义 保护性暂停即Guarded Suspension,用在一个线程等待另一个线程的执行结果,因为要等待另一方的结果,因此归类到同步模式。
要点:
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject,JDK中,join的实现、Future的实现,采用的就是此模式。
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
实现 单个线程相互等待:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class Test01 { public static void main (String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { Object o = guardedObject.get(); System.out.println(o.toString()); },"t1" ).start(); new Thread(() -> { guardedObject.complete("产生的结果" ); },"t2" ).start(); } } class GuardedObject { private Object response; public Object get () { synchronized (this ){ while (response == null ){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public Object get (long timeout) { synchronized (this ){ long begin = System.currentTimeMillis(); long passedTime = 0 ; while (response == null ){ long waitTime = timeout - passedTime; if (waitTime <= 0 ){ break ; } try { this .wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } } public void complete (Object response) { synchronized (this ){ this .response = response; this .notifyAll(); } } }
多个线程相互等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 class Boxes { private static Map<Integer,GuardedObject> boxes = new Hashtable<>(); private static int id = 1 ; private static synchronized int generateId () { return id++; } public static GuardedObject createGuardedObject () { GuardedObject guardedObject = new GuardedObject(generateId()); boxes.put(guardedObject.getId(),guardedObject); return guardedObject; } public static Set<Integer> getIds () { return boxes.keySet(); } public static GuardedObject getGuarObject (int id) { return boxes.remove(id); } } class GuardedObject { private int id; private Object response; public GuardedObject () { } public GuardedObject (int id) { this .id = id; } public int getId () { return id; } public Object get () { synchronized (this ){ while (response == null ){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public Object get (long timeout) { synchronized (this ){ long begin = System.currentTimeMillis(); long passedTime = 0 ; while (response == null ){ long waitTime = timeout - passedTime; if (waitTime <= 0 ){ break ; } try { this .wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } } public void complete (Object response) { synchronized (this ){ this .response = response; this .notifyAll(); } } }
同步模式—顺序控制 有时候需要控制线程的执行顺序,例如下面的例子要保证t2线程在t1线程前执行
wait/notify方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class 顺序控制 { static final Object lock = new Object(); static boolean flag = false ; public static void main (String[] args) { new Thread(() -> { synchronized (lock){ while (!flag){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println(1 ); },"t1" ).start(); new Thread(() -> { synchronized (lock){ System.out.println(2 ); flag = true ; lock.notify(); } },"t2" ).start(); } }
这种方式与ReentrantLock的await和signal一致
park/unpark方式 同样例子:要让t2线程在t1线程前执行
1 2 3 4 5 6 7 8 9 10 11 12 13 public class 顺序控制 { public static void main (String[] args) { Thread t1 = new Thread(() -> { LockSupport.park(); System.out.println(1 ); }, "t1" ); t1.start(); new Thread(()->{ System.out.println(2 ); LockSupport.unpark(t1); },"t2" ).start(); } }
异步模式—生产者/消费者 定义 与前面的保护性暂停中的GuardObject 不同,不需要产生结果和消费结果的线程一一对应,并且是异步的不会立刻对消息进行处理消费;JDK中各种阻塞队列,采用的就是这种模式。
要点:
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 public class xfz { public static void main (String[] args) { MessageQueue queue = new MessageQueue(2 ); for (int i = 0 ; i < 3 ; i++) { int id = i; new Thread(()->{ queue.put(new Message(id,"值" +id )); },"生成者" +i).start(); } new Thread(()->{ while (true ){ try { sleep(1000 ); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者" ).start(); } } class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); private int capcity; public MessageQueue (int capcity) { this .capcity = capcity; } public Message take () { synchronized (list){ while (list.isEmpty()){ System.out.println("队列为空" ); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); list.notifyAll(); return message; } } public void put (Message message) { synchronized (list){ while (list.size() == capcity){ System.out.println("队列已满!!" ); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(message); list.notifyAll(); } } } final class Message { private int id; private Object value; public Message (int id, Object value) { this .id = id; this .value = value; } public int getId () { return id; } public Object getValue () { return value; } @Override public String toString () { return "Message{" + "id=" + id + ", value=" + value + '}' ; } }
交替执行 该模式主要是用于让多线程按照一定的顺序交替执行,案例:让a,b,c线程按照顺序交替执行
wait/notify 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class 交替输出01 { public static void main (String[] args) { WaitNotify waitNotify = new WaitNotify(1 , 3 ); new Thread(()->{ waitNotify.print("aaaa" ,1 , 2 ); },"a" ).start(); new Thread(()->{ waitNotify.print("bbbb" ,2 , 3 ); },"b" ).start(); new Thread(()->{ waitNotify.print("cccc" ,3 , 1 ); },"c" ).start(); } } class WaitNotify { public void print (String str,int waitFlag,int nextFlag) { for (int i = 0 ; i < loopNumber; i++) { synchronized (this ){ while (flag != waitFlag){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); flag = nextFlag; this .notifyAll(); } } } private int flag; private int loopNumber; public WaitNotify (int flag, int loopNumber) { this .flag = flag; this .loopNumber = loopNumber; } }
结果:
await/signal 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class 交替输出02 { public static void main (String[] args) { AwaitSignal awaitSignal = new AwaitSignal(5 ); Condition conditionA = awaitSignal.newCondition(); Condition conditionB = awaitSignal.newCondition(); Condition conditionC = awaitSignal.newCondition(); new Thread(() -> { awaitSignal.print("aaaa" ,conditionA, conditionB); }).start(); new Thread(() -> { awaitSignal.print("bbbb" ,conditionB, conditionC); }).start(); new Thread(() -> { awaitSignal.print("cccc" ,conditionC, conditionA); }).start(); awaitSignal.lock(); try { System.out.println("开始..." ); conditionA.signal(); }finally { awaitSignal.unlock(); } } } class AwaitSignal extends ReentrantLock { private int loopNumber; public AwaitSignal (int loopNumber) { this .loopNumber = loopNumber; } public void print (String str,Condition current,Condition next) { for (int i = 0 ; i < loopNumber; i++) { lock(); try { current.await(); System.out.print(str); next.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } } }
结果:
park/unpark 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class 交替输出03 { static Thread a; static Thread b; static Thread c; public static void main (String[] args) { ParkUnpark parkUnpark = new ParkUnpark(5 ); a = new Thread(() -> { parkUnpark.print("aaaa" ,b); },"a" ); b = new Thread(() -> { parkUnpark.print("bbbb" ,c); },"b" ); c = new Thread(() -> { parkUnpark.print("cccc" ,a); },"c" ); a.start(); b.start(); c.start(); LockSupport.unpark(a); } } class ParkUnpark { public void print (String str, Thread next) { for (int i = 0 ; i < loopNumber; i++) { LockSupport.park(); System.out.print(str); LockSupport.unpark(next); } } private int loopNumber; public ParkUnpark (int loopNumber) { this .loopNumber = loopNumber; } }
结果:
两阶段终止模式 两阶段终止模式是指能够优雅的结束线程,能够进行完善的善后工作。
终止线程的错误想法:
使用stop(),stop方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁,所以该方法已经被废弃!同样的还有suspend()、resume()
使用System.exit(0):这样会让整个程序都停止
打断标记法实现 利用调用interrupt()方法会让线程抛出InterruptedException异常,这样可以捕获标记打断标记,被标记后会进行善后工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class Termination { private Thread monitor; public void start () { monitor = new Thread(() -> { while (true ){ Thread current = Thread.currentThread(); if (current.isInterrupted()){ System.out.println("线程结束中...." +current.isInterrupted()); break ; } try { Thread.sleep(1000 ); System.out.println("执行监控.." ); } catch (InterruptedException e) { e.printStackTrace(); current.interrupt(); } } }); monitor.start(); } public void stop () { monitor.interrupt(); } }
利用volatile优化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class Termination { private Thread monitor; private volatile boolean stop = false ; public void start () { monitor = new Thread(() -> { while (true ){ if (stop){ System.out.println("线程结束中...." ); break ; } try { Thread.sleep(1000 ); System.out.println("执行监控.." ); } catch (InterruptedException e) { } } }); monitor.start(); } public void stop () { stop = true ; monitor.interrupt(); } }
避免共享 如果避免了共享,同样可以解决多线程并发问题,已经知道通过局部变量可以做到避免共享,还可以使用Java 语言提供的线程本地存储(ThreadLocal)
ThreadLocal ThreadLocal 的目标是让不同的线程有不同的变量 V,所以其内部维护了一个Map,叫做 ThreadLocalMap,不过持有 ThreadLocalMap 的不是 ThreadLocal,而是 Thread。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class Thread { ThreadLocal.ThreadLocalMap threadLocals; } class ThreadLocal <T > { public T get () { ThreadLocalMap map = Thread.currentThread() .threadLocals; Entry e = map.getEntry(this ); return e.value; } static class ThreadLocalMap { Entry[] table; Entry getEntry (ThreadLocal key) { } static class Entry extends WeakReference <ThreadLocal > { Object value; } } }
将Map放在每个Thread里面的好处:
ThreadLocal 仅仅是一个代理工具类,内部并不持有任何与线程相关的数据,所有和线程相关的数据都存储在 Thread 里面,这样的设计容易理解。
这样不容易造成内存泄漏,防止Map 中的 Thread 对象就永远不会被回收。(但是如果是线程池使用了就有可能会导致内存泄漏;原因就出在线程池中线程的存活时间太长,往往都是和程序同生共死的,这就意味着 Thread 持有的 ThreadLocalMap 一直都不会被回收)
1 2 3 4 5 6 7 8 9 10 11 12 ExecutorService es; ThreadLocal tl; es.execute(()->{ tl.set(obj); try { }finally { tl.remove(); } });
Balking(犹豫)模式 Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。例如线程安全的单例模式 中的双重检查就是利用的这个模式。
例如:尽管start了多次,始终只有一个线程在执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class 犹豫模式 { public static void main (String[] args) { Termination termination = new Termination(); termination.start(); termination.start(); } } class Termination { private Thread monitor; private volatile boolean stop = false ; private boolean starting = false ; public void start () { synchronized (this ){ if (starting){ return ; } starting = true ; } monitor = new Thread(() -> { while (true ){ Thread current = Thread.currentThread(); if (stop){ System.out.println("线程结束中...." ); starting = false ; break ; } try { Thread.sleep(1000 ); System.out.println("执行监控.." ); } catch (InterruptedException e) { } } }); monitor.start(); } public void stop () { stop = true ; monitor.interrupt(); } }