首页 > 编程 > Java > 正文

《Java高并发程序设计》总结--2.Java并行程序基础

2019-11-06 07:26:28
字体:
来源:转载
供稿:网友
2.1 基本概念1)进程进程(PRocess)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。2)线程线程是轻量级进程,是程序执行的最小单位。使用多线程而不是多进程去进行并发程序设计,是因为线程间的切换或调度的成本远小于进程。3)线程的状态:NEW状态表示刚刚创建的线程,这种线程还没开始执行。等到线程的start()方法调用时,才表示开始执行。当线程执行时,处于RUNNABLE状态,表示线程所需的一切资源都已经准备好了。如果线程在执行过程中遇到了synchronized同步块,就会进入BLOCKED阻塞状态。这事线程就会暂停执行,直到获得请求的锁。WAITING和TIMED_WAITING都表示等待状态,它们的区别是WAITING会进入一个无时间限制的等待,TIME_WATING会进入一个有时限的等待。一旦等到了期望的事件,线程会再次执行,进入RUNNABLE状态。当线程执行完毕后,则进入TERMINATED状态,表示结束。2.2 初始线程:线程的基本操作1)新建线程java提供了线程类Thread来创建多线程的程序。其实,创建线程与创建普通的类的对象的操作是一样的,而线程就是Thread类或其子类的实例对象。每个Thread对象描述了一个单独的线程。要产生一个线程,有两种方法:◆ 需要从Java.lang.Thread类派生一个新的线程类,重载它的run()方法; ◆ 实现Runnalbe接口,重载Runnalbe接口中的run()方法。2)终止线程Thread.stop()方法在结束线程时,会直接终止线程,并且会立即释放这个线程所持有的锁。而这些锁恰恰是用来维持对象一致性的。如果此时,写线程写入数据正写到一半,并强行终止,那么对象就会被破坏,同时,由于锁已经被释放,另外一个等待该锁的读线程就顺理成章的读到了这个不一致的对象。这个过程可以用一下代码模拟package%20cn.guet.parallel;public%20class%20StopThreadUnsafe%20{public%20static%20User%20u%20=%20new%20User();public%20static%20class%20User%20{private%20int%20id;private%20String%20name;public%20User()%20{id%20=%200;name%20=%20"0";}public%20int%20getId()%20{return%20id;}public%20void%20setId(int%20id)%20{this.id%20=%20id;}public%20String%20getName()%20{return%20name;}public%20void%20setName(String%20name)%20{this.name%20=%20name;}@Overridepublic%20String%20toString()%20{//%20TODO%20Auto-generated%20method%20stubreturn%20"User%20[id="%20+%20id%20+%20",name="%20+%20name%20+%20"]";}}public%20static%20class%20ChangeObjectThread%20extends%20Thread%20{@Overridepublic%20void%20run()%20{while(true)%20{synchronized%20(u)%20{int%20v%20=%20(int)(System.currentTimeMillis()/1000);u.setId(v);try%20{Thread.sleep(100);}%20catch%20(Exception%20e)%20{e.printStackTrace();}u.setName(String.valueOf(v));}Thread.yield();}}}public%20static%20class%20ReadObjectThread%20extends%20Thread%20{@Overridepublic%20void%20run()%20{while%20(true)%20{synchronized%20(u)%20{if(u.getId()%20!=%20Integer.parseInt(u.getName()))%20{System.out.println(u.toString());}};%20}}}public%20static%20void%20main(String[]%20args)%20throws%20Exception%20{new%20ReadObjectThread().start();while%20(true)%20{Thread%20t%20=%20new%20ChangeObjectThread();t.start();Thread.sleep(150);t.stop();}}}如果需要停止一个线程,只是需要自行决定线程何时退出就可以。用上述例子说明,只需将ChangeObjectTread线程增加一个stopMe()方法即可。public%20static%20class%20ChangeObjectThread%20extends%20Thread%20{volatile%20boolean%20stopme%20=%20false;public%20void%20stopMe()%20{stopme%20=%20true;}@Overridepublic%20void%20run()%20{while(true)%20{if(stopme)%20{System.out.println("exit%20by%20stop%20me");break;}synchronized%20(u)%20{int%20v%20=%20(int)(System.currentTimeMillis()/1000);u.setId(v);try%20{Thread.sleep(100);}%20catch%20(Exception%20e)%20{e.printStackTrace();}u.setName(String.valueOf(v));}Thread.yield();}}}3)线程中断Thread.interrupt()方法是一个实例方法。它通知目标线程中断,也就是设置中断标志位。中断标志位表示当前线程已经被中断。Thread.isInterrupted()方法也是实例方法,它判断当前线程是否有被中断(通过检查中断标志位)。最后的静态方法Thread.interrupted()也是用来判断当前线程的中断状态,但同时会清除当前线程的中断标志位状态。public%20static%20void%20main(String[]%20args)%20throws%20Exception%20{new%20ReadObjectThread().start();Thread%20t%20=%20new%20Thread()%20{@Overridepublic%20void%20run()%20{while%20(true)%20{if(Thread.currentThread().isInterrupted())%20{System.out.println("Interruted!");}try%20{Thread.sleep(2000);}%20catch%20(InterruptedException%20e)%20{System.out.println("Interruted%20When%20Sleep");Thread.currentThread().interrupt();}Thread.yield();}}};t.start();Thread.sleep(2000);t.interrupt();}4)等待和通知当一个对象实例上调用wait()方法后,当前线程就会在这个对象上等待。比如,线程A中,调用了obj.wait()方法,那么线程A就会停止继续执行,而转为等待状态。线程A一直等到其他线程调用了obj.notify()方法为止。这时,obj对象就俨然成为多个线程之间的有效通信手段。一个简单地使用wait()和notify()的案例:public class SimpleWN {final static Object object = new Object();public static class T1 extends Thread {@Overridepublic void run() {synchronized (object) {System.out.println(System.currentTimeMillis() + "T1 start!");try {System.out.println(System.currentTimeMillis() + "T1 wait for object");object.wait();} catch (Exception e) {e.printStackTrace();}System.out.println(System.currentTimeMillis() + "T1 end!");}}}public static class T2 extends Thread {@Overridepublic void run() {synchronized (object) {System.out.println(System.currentTimeMillis() + "T2 start! notify one thread");object.notify();System.out.println(System.currentTimeMillis() + "T2 end!");try {Thread.sleep(2000);} catch (InterruptedException e) {// TODO: handle exception}}}}public static void main(String[] args) {Thread t1 = new T1();Thread t2 = new T2();t1.start();t2.start();}}5)挂起和继续执行线程线程挂起(suspend)和继续执行(resume)是一对相反的操作,被挂起的线程,必须要等到resume()操作后,才能继续执行。并不推荐使用suspend()去挂起线程,因为suspend()在导致线程暂停的同时,并不会去释放任何资源。此时,其他线程想要访问被它暂用的锁时,都会被牵连,导致无法正常继续运行。为理解suspend()的问题,演示程序如下:public class BadSuspend {public static Object u = new Object();static ChangeObjectThread t1 = new ChangeObjectThread("t1");static ChangeObjectThread t2 = new ChangeObjectThread("t2");public static class ChangeObjectThread extends Thread {public ChangeObjectThread(String name) {super.setName(name);}@Overridepublic void run() {synchronized (u) {System.out.println("in " + getName());Thread.currentThread().suspend();}}}public static void main(String[] args) throws InterruptedException {t1.start();Thread.sleep(100);t2.start();t1.resume();t2.resume();t1.join();t2.join();}}改进后的代码如下:public class GoodSuspend {public static Object u = new Object();public static class ChangeObjectThread extends Thread {volatile boolean suspendme = false;public void suspendMe() {suspendme = true;}public void resumeMe() {suspendme = false;synchronized (this) {notify();}}@Overridepublic void run() {while (true) {synchronized (this) {while (suspendme) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}synchronized (u) {System.out.println("in ChangeObjectThread");}Thread.yield();}}}public static class ReadObjectThread extends Thread {@Overridepublic void run() {while(true) {synchronized (u) {System.out.println("in ReadObjectThread");}Thread.yield();}}}public static void main(String[] args) throws InterruptedException {ChangeObjectThread t1 = new ChangeObjectThread();ReadObjectThread t2 = new ReadObjectThread();t1.start();t2.start();Thread.sleep(1000);t1.suspendMe();System.out.println("suspend t1 2 second");Thread.sleep(2000);System.out.println("resume t1");t1.resumeMe();}}}6)等待线程结束(join)和谦让(yield)第一个join()方法表示无限等待,它会一直阻塞当前线程,直到目标线程执行完毕。第二个方法给出了一个最大等待时间,如果超过给定时间目标线程还在执行,当前线程也会因为“等不及了”,而继续往下执行。这里提供一个简单的join实例:public class JoinMain {public volatile static int i = 0;public static class AddThread extends Thread {@Overridepublic void run() {for (i = 0; i < 10000000; i++) {}}}public static void main(String[] args) throws InterruptedException {AddThread at = new AddThread();at.start();at.join();System.out.println(i);}}主函数中,如果不使用join()等待AddThread,那么得到i很可能是0或者一个非常小的数字。因为AddThread还没开始执行,i的值就已经被输出了。但在使用join()方法后,表示主线程愿意等待AddThread执行完毕,跟着AddTread一起往前走,故在join()返回时,AddThread已经执行完成,故i总是10000000。2.3 volatile与Java内存模型(JMM)volatile的定义如下: java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量。volatile对于保证操作的原子性是有非常大的帮助的。但是,volatile并不能代替锁,它无法保证一些复合操作的原子性,例如,volatile是无法保证i++的原子性操作的:public class PlusTask {static volatile int i = 0;public static class Plus implements Runnable {public void run() {for (int k = 0; k < 10000; k++) {i++;}}}public static void main(String[] args) throws InterruptedException {Thread[] threads = new Thread[10];for (int i = 0; i < 10; i++) {threads[i] = new Thread(new Plus());threads[i].start();}for (int i = 0; i < 10; i++) {threads[i].join();}System.out.println(i);}}此外,volatile也能保证数据的可见性和有序性。public class NoVisibility {private static boolean ready;private static int number;private static class ReaderThread extends Thread {@Overridepublic void run() {while(!ready);System.out.println(number);}}public static void main(String[] args) throws InterruptedException {new ReaderThread().start();Thread.sleep(1000);number = 42;ready = true;Thread.sleep(10000);}}在虚拟机的Client模式下,由于JIT并没有做足够的优化,在主线程修改ready变量的状态后,ReaderThread可以发现这个改动,并退出程序。但是在Server模式下,由于系统优化的结果,ReaderThread线程无法“看到”主线程中的修改,导致ReaderThread永远无法退出。这个问题就是一个典型的可见性问题。2.4 分门别类的管理:线程组线程组的使用如下:public class ThreadGroupName implements Runnable {@Overridepublic void run() {String groupAndName = Thread.currentThread().getThreadGroup().getName()+ "-" + Thread.currentThread().getName();while(true) {System.out.println("I am " + groupAndName);try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {ThreadGroup tg = new ThreadGroup("PrintGroup");Thread t1 = new Thread(tg,new ThreadGroupName(),"T1");Thread t2 = new Thread(tg,new ThreadGroupName(),"T2");t1.start();t2.start();System.out.println(tg.activeCount());tg.list();}}2.5 守护线程(Daemon)只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作。Daemon的作用是为其他线程的运行提供便利服务,守护线程最典型的应用就是 GC (垃圾回收器),它就是一个很称职的守护者。这里有几点需要注意: (1) thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。(2) 在Daemon线程中产生的新线程也是Daemon的。 (3) 不要认为所有的应用都可以分配给Daemon来进行服务,比如读写操作或者计算逻辑。2.6 线程优先级下面的代码展示了优先级的作用:public class PriorityDemo {public static class HightPriority extends Thread {static int count = 0;@Overridepublic void run() {while(true) {synchronized (PriorityDemo.class) {count++;if(count > 10000000) {System.out.println("HightPriority is complete");break;}}}}}public static class LowPriority extends Thread {static int count = 0;@Overridepublic void run() {while (true) {synchronized (PriorityDemo.class) {count++;if(count > 10000000) {System.out.println("LowPriority is complete");break;}}}}}public static void main(String[] args) {Thread high = new HightPriority();LowPriority low = new LowPriority();high.setPriority(Thread.MAX_PRIORITY);low.setPriority(Thread.MIN_PRIORITY);low.start();high.start();}}2.7 线程安全的概念与synchronized下面的代码演示了一个计数器,两个线程同时对i进行累加操作,各执行10000000次。在很多时候,i的最终值会小于20000000。这就是因为两个线程同时对i进行写入时,其中一个线程的结果会覆盖另一个。public class AccountingVol implements Runnable {static AccountingVol instance = new AccountingVol();static volatile int i =0;public static void increase() {i++;}@Overridepublic void run() {for(int j=0; j<10000000; j++) {increase();}}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(instance);Thread t2 = new Thread(instance);t1.start();t2.start();t1.join();t2.join();System.out.println(i);}}要从根本上解决这个问题,我们就必须保证对个线程对i进行操作时完全同步。也就是说,当线程A在写入时,线程B不仅不能写,同时也不能读。关键字synchronized可以有多种用法。1)指定加锁对象:对给定对象加锁,进入同步代码前要获得给定对象的锁。2)直接作用于实例方法:相当于对当前实例加锁,进入同步代码前要获得当前实例的锁。3)直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获得当前类的锁。下述代码中,将synchronized作用于一个给定对象instance,因此,每次当线程进入被synchronized包裹的代码段,就都会要求请求instance实例的锁。如果当前有其他线程正持有这把锁,那么新到的线程就必须等待。这样,就保证了每次只能有一个线程执行i++操作。public class AccountingSync implements Runnable{static AccountingSync instance = new AccountingSync();static int i = 0;@Overridepublic void run() {for(int j=0; j<10000000; j++) {synchronized (instance) {i++;}}}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(instance);Thread t2 = new Thread(instance);t1.start();t2.start();t1.join();t2.join();System.out.println(i);}}2.8 隐蔽的错误1)无提示的错误案例int v1 = 1073741827;int v2 = 1431655768;int ave = (v1+v2)/2;System.out.println(ave);上述代码中,视图计算v1和v2的均值。这是一个典型的溢出问题。v1+v2的结果已经导致了int的溢出。2)并发下的ArrayListArrayList是一个线程不安全的容器。public class ArrayListMultiThread {static ArrayList<Integer> al = new ArrayList<Integer>();public static class AddThread implements Runnable {@Overridepublic void run() {for (int i = 0; i < 1000000; i++) {al.add(i);}}}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(new AddThread());Thread t2 = new Thread(new AddThread());t1.start();t2.start();t1.join();t2.join();System.out.println(al.size());}}执行这段代码,可能出现三种结果。第一,程序正常结束。第二,程序抛出异常:Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException: 6246这是因为ArrayList在扩容过程中,内部一致性被破坏,但由于没有锁的保护,另外一个线程访问到了不一致的内部状态,导致出现越界问题。第三,出现了一个非常隐蔽的错误比如打印如下值作为结果:1425166这是由于多线程访问冲突,使得彼此保存容器大小的变量被多线程不正常的访问,同时两个线程也同时对ArrayList中的同一个位置进行赋值导致的。3)并发下诡异的HashMapHashMap同样不是线程安全的。代码如下:import java.util.HashMap;import java.util.Map;public class HashMapMultiThread {static Map<String, String> map = new HashMap<String, String>();public static class AddThread implements Runnable {int start = 0;public AddThread(int start) {this.start = start;}@Overridepublic void run() {for (int i = start; i < 100000; i+=2) {map.put(Integer.toString(i), Integer.toBinaryString(i));}}}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(new HashMapMultiThread.AddThread(0));Thread t2 = new Thread(new HashMapMultiThread.AddThread(1));t1.start();t2.start();t1.join();t2.join();System.out.println(map.size());}}第一,程序正常结束,并且结果也是符合预期的。HashMap的大小为100000。第二,程序正常结束,但结果不符合预期,而是一个小于100000的数字。第三,程序永远无法结束。前两种情况,和ArrayList的情况非常相似。而第三种情况,通过查看HashMap.put()方法,可知,由于多线程的冲突,这个链表结构已经遭到破坏,链表成环了,下述的迭代就等同于一个死循环。但这个死循环的问题在JDK8中已经不存在了。由于JDK8对HashMap的内部做了大规模调整,规避了这个问题。但即使这样,贸然在多线程环境下使用HashMap依然会导致内部数据不一致。最简单的解决方案就是使用ConcurrentHashMap代替HashMap。for (Entry<K,V> e = table[i]; e != null; e = e.next) {Object k;if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {V oldValue = e.value;e.value = value;e.recordaccess(this);return oldValue;}}4)错误的加锁假设我们需要一个计数器,这个计数器会被多个线程同时访问。为了确保数据正确性,我们需要对计数器加锁。代码如下:public class BadLockInteger implements Runnable {public static Integer i = 0;static BadLockInteger instance = new BadLockInteger();@Overridepublic void run() {for (int j = 0; j < 10000000; j++) {synchronized (i) {i++;}}}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(instance);Thread t2 = new Thread(instance);t1.start();t2.start();t1.join();t2.join();System.out.println(i);}}结果我们得到了一个比20000000小很多的数字。要解释这个问题,得从Integer说起。在Java中,Integer属于不变对象。也就是说对象一旦被创建,就不可能被修改。i++在真实执行时变成了:i = Integer.valueOf(i.intValue()+1)。进一步查看 Integer.valueOf():public static Integer valueOf(int i) {assert IntegerCache.high >= 127;if (i >= IntegerCache.low && i <= IntegerCache.high)return IntegerCache.cache[i + (-IntegerCache.low)];return new Integer(i);}Integer.valueOf()实际上是一个工厂方法,它会倾向于返回一个代表指定数值的Integer实例。因此,i++的本质是,创建一个新的Integer对象,并将它的引用赋值给i。如此一来,我们就明白问题所在,由于在多个线程间,并不一定能够看到同一个对象(因为i对象一直在变),因此,两个线程每次加锁可能都加在了不同的对象实例上,从而导致对临界区代码控制出现问题。注:本篇博客内容摘自《Java高并发程序设计》
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表