[计算机]实战javaConcurrent.docx

上传人:scccc 文档编号:13384653 上传时间:2021-12-24 格式:DOCX 页数:30 大小:22.97KB
返回 下载 相关 举报
[计算机]实战javaConcurrent.docx_第1页
第1页 / 共30页
[计算机]实战javaConcurrent.docx_第2页
第2页 / 共30页
[计算机]实战javaConcurrent.docx_第3页
第3页 / 共30页
[计算机]实战javaConcurrent.docx_第4页
第4页 / 共30页
[计算机]实战javaConcurrent.docx_第5页
第5页 / 共30页
点击查看更多>>
资源描述

《[计算机]实战javaConcurrent.docx》由会员分享,可在线阅读,更多相关《[计算机]实战javaConcurrent.docx(30页珍藏版)》请在三一文库上搜索。

1、.实战java Concurrent时间:2020-09-24 08:09:55来源:网络 作者:未知 点击:1686次编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。讲到Java多线程,大多数人脑海中

2、跳出来的是Thread、Runnable、synchronized这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。java.util.concurrent包分成了三个部分,分别是java.util.concurrent、 java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。为了便于理解,本文使用一个

3、例子来做说明,交代一下它的场景:假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。1、Executors通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的 ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到 ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了 C

4、allable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用 ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。Java代码  package  service;      import  java.util.ArrayList;   import  java.util.List; 

5、60; import  java.util.concurrent.ExecutionException;   import  java.util.concurrent.ExecutorService;   import  java.util.concurrent.Executors;   import  java.util.concurrent.Future;   import  java.util.concurre

6、nt.TimeUnit;      /*    * 线程池服务类    *     * author DigitalSonic    */    public   class  ThreadPoolService        /*

7、0;       * 默认线程池大小        */        public   static   final   int   DEFAULT_POOL_SIZE    =  5 ;       

8、60;  /*        * 默认一个任务的超时时间,单位为毫秒        */        public   static   final   long  DEFAULT_TASK_TIMEOUT =  1000 ;      

9、    private   int               poolSize             = DEFAULT_POOL_SIZE;       private  ExecutorService 

10、 executorService;          /*        * 根据给定大小创建线程池        */        public  ThreadPoolService( int  poolSize)      

11、      setPoolSize(poolSize);                 /*        * 使用线程池中的线程来执行任务        */        public

12、   void  execute(Runnable task)            executorService.execute(task);                 /*        * 在线程池中执行所有给定的任务并取回运行结果,使

13、用默认超时时间        *         * see #invokeAll(List, long)        */        public  List<Node> invokeAll(List<ValidationTask> tasks)  

14、0;         return  invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size();                 /*        * 在线程池中执行所有给定的任务并取回运行结果    

15、;    *         * param timeout 以毫秒为单位的超时时间,小于0表示不设定超时        * see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)        */     

16、   public  List<Node> invokeAll(List<ValidationTask> tasks,  long  timeout)            List<Node> nodes = new  ArrayList<Node>(tasks.size();         &#

17、160; try                 List<Future<Node>> futures = null ;               if  (timeout <  0 )       

18、;             futures = executorService.invokeAll(tasks);               else                

19、60;    futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);                              for  (Future<Node> fut

20、ure : futures)                    try                         nodes.add(future.get(); 

21、0;                 catch  (ExecutionException e)                        e.printStackTrace();  &

22、#160;                                          catch  (InterruptedException e)    

23、60;           e.printStackTrace();                      return  nodes;             &#

24、160;   /*        * 关闭当前ExecutorService        *         * param timeout 以毫秒为单位的超时时间        */        public

25、60;  void  destoryExecutorService( long  timeout)            if  (executorService !=  null  && !executorService.isShutdown()                tr

26、y                     executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);               catch  (InterruptedException

27、 e)                    e.printStackTrace();                            

28、0; executorService.shutdown();                            /*        * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService   &

29、#160;    */        public   void  createExecutorService()            destoryExecutorService(1000 );           executorService = Executo

30、rs.newFixedThreadPool(poolSize);                 /*        * 调整线程池大小        * see #createExecutorService()        */

31、0;       public   void  setPoolSize( int  poolSize)            this .poolSize = poolSize;           createExecutorService();    &#

32、160;        package service;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurr

33、ent.TimeUnit;/* * 线程池服务类 *  * author DigitalSonic */public class ThreadPoolService     /*     * 默认线程池大小     */    public static final int  DEFAULT_POOL_SIZE    = 5;    /*

34、     * 默认一个任务的超时时间,单位为毫秒     */    public static final long DEFAULT_TASK_TIMEOUT = 1000;    private int              poolSize      

35、0;      = DEFAULT_POOL_SIZE;    private ExecutorService  executorService;    /*     * 根据给定大小创建线程池     */    public ThreadPoolService(int poolSize)       &#

36、160; setPoolSize(poolSize);        /*     * 使用线程池中的线程来执行任务     */    public void execute(Runnable task)         executorService.execute(task);       

37、/*     * 在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间     *      * see #invokeAll(List, long)     */    public List<Node> invokeAll(List<ValidationTask> tasks)        

38、 return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size();        /*     * 在线程池中执行所有给定的任务并取回运行结果     *      * param timeout 以毫秒为单位的超时时间,小于0表示不设定超时     * see java.util.concurrent.Exec

39、utorService#invokeAll(java.util.Collection)     */    public List<Node> invokeAll(List<ValidationTask> tasks, long timeout)         List<Node> nodes = new ArrayList<Node>(tasks.size();   

40、0;    try             List<Future<Node>> futures = null;            if (timeout < 0)             

41、60;   futures = executorService.invokeAll(tasks);            else                 futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);

42、60;                       for (Future<Node> future : futures)                 try      &#

43、160;              nodes.add(future.get();                catch (ExecutionException e)             &#

44、160;       e.printStackTrace();                                    catch (InterruptedException e)  &

45、#160;          e.printStackTrace();                return nodes;    /*     * 关闭当前ExecutorService     *    

46、0; * param timeout 以毫秒为单位的超时时间     */    public void destoryExecutorService(long timeout)         if (executorService != null && !executorService.isShutdown()           

47、60; try                 executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);            catch (InterruptedException e)       &#

48、160;         e.printStackTrace();                        executorService.shutdown();            

49、0; /*     * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService     */    public void createExecutorService()         destoryExecutorService(1000);        executorService = Ex

50、ecutors.newFixedThreadPool(poolSize);        /*     * 调整线程池大小     * see #createExecutorService()     */    public void setPoolSize(int poolSize)         this.po

51、olSize = poolSize;        createExecutorService();      这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用 Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值

52、就是执行结果。还有一个比较诡异的地方 本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是 invokeAll(Collection<? extends Cal

53、lable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,

54、没有执行的任务作为返回值返回。2、Lock多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是 java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock()。使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:Java代码  Lock l = .;    l.lock();  &

55、#160;try         / 执行操作     finally         l.unlock();      Lock l = .; l.lock();try  / 执行操作 finally  l.unlock();java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是

56、ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:Java代码  package  service;      import  java.util.concurrent.locks.Lock;   import  java.util.concurrent.locks.ReentrantLock;      /*    * 节

57、点类    *     * author DigitalSonic    */    public   class  Node        private  String name;       private  String wsdl; 

58、0;     private  String result =  "PASS" ;       private  String dependencies =  new  String ;       private  Lock lock =  new  ReentrantLock();    

59、60;  /*        * 默认构造方法        */        public  Node()                      /*  

60、;      * 构造节点对象,设置名称及WSDL        */        public  Node(String name, String wsdl)            this .name = name;      

61、60;    this .wsdl = wsdl;                 /*        * 返回包含节点名称、WSDL以及验证结果的字符串        */        Override&#

62、160;       public  String toString()            String toString = "Node: "  + name +  " WSDL: "  + wsdl +  " Result: "  + result;     &

63、#160;     return  toString;                     / Getter & Setter        public  String getName()       &#

64、160;    return  name;                 public   void  setName(String name)            this .name = name;      

65、          public  String getWsdl()            return  wsdl;                 public   void  setWsdl(String

66、 wsdl)            this .wsdl = wsdl;                 public  String getResult()            return  result; 

67、60;               public   void  setResult(String result)            this .result = result;              

68、   public  String getDependencies()            return  dependencies;                 public   void  setDependencies(String dependencies) 

69、60;          this .dependencies = dependencies;                 public  Lock getLock()            return  lock; 

70、60;              package service;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/* * 节点类 *  * author DigitalSonic */public class Node  private String name; priv

71、ate String wsdl; private String result = "PASS" private String dependencies = new String ; private Lock lock = new ReentrantLock(); /*  * 默认构造方法  */ public Node()    /*  * 构造节点对象,设置名称及WSDL  */ public Node(String name, String

72、wsdl)   this.name = name;  this.wsdl = wsdl;  /*  * 返回包含节点名称、WSDL以及验证结果的字符串  */ Override public String toString()   String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result;  return

73、 toString;   / Getter & Setter public String getName()   return name;  public void setName(String name)   this.name = name;  public String getWsdl()   return wsdl;  public void setWsdl(String wsdl)   this.w

74、sdl = wsdl;  public String getResult()   return result;  public void setResult(String result)   this.result = result;  public String getDependencies()   return dependencies;  public void setDependencies(String dependencies)  

75、 this.dependencies = dependencies;  public Lock getLock()   return lock; Java代码  package  service;      import  java.util.concurrent.Callable;   import  java.util.concurrent.locks.Lock;  

76、60;import  java.util.logging.Logger;      import  service.mock.MockNodeValidator;      /*    * 执行验证的任务类    *     * author DigitalSonic    */ 

77、;   public   class  ValidationTask  implements  Callable<Node>        private   static  Logger logger = Logger.getLogger( "ValidationTask" );          pri

78、vate  String        wsdl;          /*        * 构造方法,传入节点的WSDL        */        public  ValidationTask(Str

79、ing wsdl)            this .wsdl = wsdl;                 /*        * 执行针对某个节点的验证<br/>        * 如果正有别

80、的线程在执行同一节点的验证则等待其结果,不重复执行验证        */        Override        public  Node call()  throws  Exception            Node node = Validat

81、ionService.NODE_MAP.get(wsdl);           Lock lock = null ;           logger.info("开始验证节点:"  + wsdl);           if  (node != 

82、 null )                lock = node.getLock();               if  (lock.tryLock()             

83、60;      / 当前没有其他线程验证该节点                    logger.info("当前没有其他线程验证节点"  + node.getName() +  ""  + wsdl +  "" );   &

84、#160;               try                         Node result = MockNodeValidator.validateNode(wsdl);  

85、0;                    mergeNode(result, node);                   finally        

86、                 lock.unlock();                                &#

87、160; else                     / 当前有别的线程正在验证该节点,等待结果                    logger.info("当前有别的线程正在验证节点"&#

88、160; + node.getName() +  ""  + wsdl +  ",等待结果" );                   lock.lock();                

89、;   lock.unlock();                          else                 / 从未进行过验证,这种情况应该只出现在系统启动初期

90、0;               / 这时是在做初始化,不应该有冲突发生                logger.info("首次验证节点:"  + wsdl);         

91、60;     node = MockNodeValidator.validateNode(wsdl);               ValidationService.NODE_MAP.put(wsdl, node);                 

92、60;    logger.info("节点"  + node.getName() +  ""  + wsdl +  "验证结束,验证结果:"  + node.getResult();           return  node;          

93、0;      /*        * 将src的内容合并进dest节点中,不进行深度拷贝        */        private  Node mergeNode(Node src, Node dest)          

94、60; dest.setName(src.getName();           dest.setWsdl(src.getWsdl();           dest.setDependencies(src.getDependencies();           dest.setResult(s

95、rc.getResult();           return  dest;             package service;import java.util.concurrent.Callable;import java.util.concurrent.locks.Lock;import java.util.logging.Logger;import se

96、rvice.mock.MockNodeValidator;/* * 执行验证的任务类 *  * author DigitalSonic */public class ValidationTask implements Callable<Node>     private static Logger logger = Logger.getLogger("ValidationTask");    private String   

97、;     wsdl;    /*     * 构造方法,传入节点的WSDL     */    public ValidationTask(String wsdl)         this.wsdl = wsdl;        /*     * 执行针对某个节点的验证<br/>     * 如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证     */    Override    pu

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 社会民生


经营许可证编号:宁ICP备18001539号-1