咨询电话

Java 8中的ConcurrentHashMap 置顶

发表于2017-03-10 09:47:46 次查看
每个Java工程师应该了解微服务:反应微服务架构Lightbend合作为您带来

 

ConcurrentHashMap这些天已经是一个非常受欢迎的数据结构。在本讲座中,我们将介绍Java 8中引入的新功能。

Java 8引入了forEach,search和reduce方法,这些方法非常支持并行性。这三个操作有四种形式:使用键,值,条目和键值对参数接受函数。

所有这些方法都采用称为parallelismThreshold的第一个参数。

我们将以这个地图定义为例:

ConcurrentHashMap < StringInteger >  hashMap  =  new  ConcurrentHashMap <>();
hashMapput“A”1);
hashMapput“B”2);
hashMapput“C”3);
hashMapput“D”4);
hashMapput“E”5);
hashMapput“F”6);
hashMapput“G”7);

 

并行度阈值

这是定义如何执行操作 - 顺序或并行。假设你给出了一个parallelismThreshold为2.所以只要在你的map中少于两个元素,它就是顺序的。否则,它是并行的(取决于JVM)。

hashMap的forEach2,(ķv- >  系统出来的println“键盘>”  +  ķ  +  “与值-相关>”  +  v  + “,由线程>” +  主题currentThread()。getName()));

 

它在我的机器上生成以下o / p(你可以看到两个不同的线程在操作 - main和ForkJoinPool.commonPool-worker-1):

key-> A通过thread-> main与value-> 1相关
key-> D与value-> 4相关,通过thread-> ForkJoinPool.commonPool-worker-1
key-> B通过thread-> main与value-> 2相关
key-> E与value-> 5相关,通过thread-> ForkJoinPool.commonPool-worker-1
key-> C与value-> 3相关,由thread-> main
key-> F与value-> 6相关,通过thread-> ForkJoinPool.commonPool-worker-1
key-> G与value-> 7相关,通过thread-> ForkJoinPool.commonPool-worker-1

 

forEach

此方法具有签名:

public void forEach(long parallelismThreshold, BiConsumer action)

我们已经看到了parallelismThreshold。现在,让我们来看看BiConsumer。它是一个FunctionalInterface,它接受两个输入参数并且不返回任何结果。它有这个定义:

@FunctionalInterface
public  interface  BiConsumer < TU > {
  void  acceptT  tU  u);
}}

 

所以forEach需要parallelismThreshold和BiConsumer,并调用这个方法:

 

 ForEachMappingTaskbatchForparallelismThreshold),00行动)。invoke();

 

batchFor(parallelismThreshold)方法是使用ForkJoinPool.getCommonPoolParallelism()方法来获取并行性,正如我们在parallelismThreshold示例中所见。

这里,  action 是我们在forEach方法中传递的BiConsumer。

ForEachMappingTask是一个扩展BulkTask的静态final类,BulkTask扩展了抽象类CountedCompleter,它扩展了抽象类ForkJoinTask。所以简而言之,ForEachMappingTask是支持ForkJoinTask。

ForEachMappingTask类具有如下计算方法:

public  final  void  compute(){
final  BiConsumer <? 超级 ķ  V >  动作 ;
    如果((行动 =  动作!=  ){
      forint  i  =  baseIndexfh ; batch  >  0  &&
      (h  =((f  =  baseLimit+  i>>>  1>  i ;){
        addToPendingCount1);
        new  ForEachMappingTask < KV >thisbatch  >>> =  1
            baseLimit  =  hftabaction)。fork();
      }}
      forNode < KV >  p ;(p  =  advance())!=  null ;)
      行动接受ppVAL);
 propagateCompletion();
    }}
}}

 

每次我们执行一个子任务,addToPendingCount(int)方法被调用,如上面的代码。这只是一个计数器,这是确定任务是否执行完成。

上面的计算方法负责使用ForkJoinTask的fork方法将子任务添加到队列(共享/非共享队列),如上面的代码所示。

new ForEachMappingTask (this, batch >>>= 1, baseLimit = h, f, tab, action).fork(); 

 

让我们用下面的例子来理解BiConsumer的accept方法:

import  javautil功能BiConsumer ;
public  class  BiConsumerTest {
  public  static  void  mainString [] args){
    BiConsumer < StringString >  biConsumer  =xy- > {
      系统出来println“Key =>”  +  x  +  “,value =>” +  y);
    };
    biConsumeraccept“k”“arun”);
  }}
}}

 

其他ForkJoinTasks,如RecursiveTask和RecursiveAction,不需要任何显式调用来完成他们的工作,而在CountedCompleter的情况下,需要显式调用propagateCompletion()。

这是在计算方法中看到的同样的事情。

正如我们上面已经看到的,调用ForEachMappingTask的invoke方法,因为ForEachMappingTask扩展了ForkJoinTask; 实现如下 -

public  final  V  invoke(){
  int  s ;
  if((s  =  doInvoke() DONE_MASK!=  NORMAL
    reportExceptions);
  return  getRawResult();
}}

 

这里,getRawResult()返回计算结果,默认为null。否则,它应该被覆盖。

搜索

让我们看一个搜索代码段:

String  result  =  hashMapsearch1,(kv {
  系统出来的println线程currentThread()的getName());
  如果ķ等于“A” ))
    return  k  + “ - ”  + v ;
  return  null ;
});
系统出来println“result =>”  + result);

 

O / P:

主要
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-2
result => A-1

 

搜索方法的方法签名为:

public U search(long parallelismThreshold, BiFunction searchFunction)

search方法调用SearchMappingsTask的invoke方法。SearchMappingsTask是BulkTask的子类,我们已经看到。SearchmappingTask有一个如下的计算方法:

public  final  void  compute(){
  最终 BiFunction <? 超级 ķ  V extends  U >  searchFunction ;
  final  AtomicReference < U >  result ;
  如果((searchFunction  =  searchFunction!=   &&
   (结果 =  结果!=  ){
  forint  i  =  baseIndexfh ; batch  >  0  &&
h  =((f  =  baseLimit+  i>>>  1>  i ;){
      如果结果获得()!=  
        返回 ;
      addToPendingCount1);
      new  SearchMappingsTask < KVU >thisbatch  >>> =  1
        baseLimit  =  hftabsearchFunctionresult)。fork();
}}
结果获得()==  ){
      U  u ;
      节点&lt; KV &gt  ; p ;
      if((p  =  advance())==  null){
        propagateCompletion();
        突破 ;
      }}
      如果((û  =  searchFunction适用ppVAL))!=  ){
        如果结果compareAndSetü))
          quietlyCompleteRoot();
        突破 ;
      }}
    }}
  }}
}}

 

在上面的方法中,AtomicReference用于有一个对象引用,它可以被原子地更新,因为它是一个结果。每次执行一个子任务时,addToPendingCount(int)方法就像上面的代码一样被调用。这只是有一个计数器来确定任务执行是否顺利。然后,它调用fork方法使其并行。

其他ForkJoinTasks,如RecursiveTask和RecursiveAction,不需要任何显式调用来完成他们的工作,而在CountedCompleter的情况下,需要显式调用propagateCompletion()。后来,quietlyCompleteRoot()方法正常完成任务。

合并

根据Javadoc,“如果指定的键尚未与一个(非空)值关联,则将它与给定的值相关联,否则,将该值替换为给定重映射函数的结果,否则删除该值。整个方法调用以原子方式执行,在计算正在进行时,其他线程在该映射上的一些尝试更新操作可能被阻塞,因此计算应该是简短且简单的,并且不得尝试更新此映射的任何其他映射。

让我们看看同样的代码片段:

ConcurrentHashMap < StringString >  map  =  new  ConcurrentHashMap <>();
地图put“X”“x”);
系统出来println“1st ==>”  + map);
系统出来的println“第2 ==>”  +  地图合并“X” “×” ,(V1V2- >  ));
系统出来println“3rd ==>”  + map);
地图put“Y”“y”);
地图put“X”“x1”);
系统出来的println“第4 ==>”  + 地图合并“X” “×1” ,(V1V2- >  “Z” ));
系统出来println“5th ==>”  + map);
系统出来的println“6 ==>”  + 地图合并
 “X”“x1”,(v1v2 v2concat“z”)));
系统出来println“7th ==>”  + map);

 

O / P:

1st ==> {X = x}
2nd ==> null
3rd ==> {}
4th ==> z
5th ==> {X = z,Y = y}
6th ==> x1z
7th ==> {X = x1z,Y = y}

 

合并方法的方法签名是:

public V merge(K key, V value, BiFunction remappingFunction)

 

这里,remappingFunction是重新计算值(如果存在)的函数。

getOrDefault

根据Javadoc,“它返回指定键映射到的值,或者如果此映射不包含键的映射,则返回给定的默认值。

让我们看看代码片段:

ConcurrentHashMap < StringInteger >  defaultMap  = 
  new  ConcurrentHashMap < StringInteger >();
defaultMapput“X”30);
系统出来printlndefaultMap);
系统出来的printlndefaultMapgetOrDefault“Y” 21));

 

O / P:

{X = 30}
21

 

计算

通常,我们对映射值进行一些计算并将其存储回来。在并发模型中,它很难管理,这就是Java引入compute方法的原因。整个方法调用以原子方式执行。

compute和computeIfPresent方法将重映射函数作为计算值的参数,并且重映射是类型BiFunction。computeIfAbsent方法接受一个参数作为mappingFunction来计算一个值,因此mappingFunction类型为Function。

让我们看看代码片段来理解这一点:

ConcurrentHashMap < StringInteger >  map1  =  new  ConcurrentHashMap <>();
map1put“A”1);
map1put“B”2);
map1put“C”3);
//计算现有键的新值
系统出来的println“第1次印刷=>”  + MAP1计算“A” 
  (kv- >  v  ==  null   42v  +  40));
系统出来println“2nd print =>”  +  map1);
//这将添加一个新的(键,值)对
系统出来的println“第三打印=>”  +  MAP1计算“X” 
  (kv- >  v  ==  null   42v  +  41));
系统出来println“4th print =>”  +  map1);

// computeIfPresent方法
系统出来的println“第5打印=>”  +  MAP1computeIfPresent“X” ,(ķv- >  v  ==    42v  +  10));
系统出来println“6th print =>”  +  map1);

// computeIfAbsent方法
系统出来的println“第七打印=>”  +  MAP1computeIfAbsent“Y” ķ- >  90));
系统出来println“8th print =>”  +  map1);

 

O / P:

1st print => 41
2nd print => {A = 41,B = 2,C = 3}
3rd print => 42
第四打印=> {A = 41,B = 2,C = 3,X = 42}
第五次打印=> 52
6th print => {A = 41,B = 2,C = 3,X = 52}
7th print => 90
8th print => {A = 41,B = 2,C = 3,X = 52,Y = 90}

 

减少 

reduce方法签名是:

公共 U  减少 并行度阈值BiFunction  变压器BiFunction  减速器

 

这里,变换器是返回元素的变换的函数,或者如果没有变换则返回null(在这种情况下它不被组合),并且reducer是可交换关联组合函数。

reduce方法调用MapReduceMappingsTask的invoke方法。MapReduceMappingsTask扩展了BulkTask,我们已经看到了这一点。

让我们看看下面的例子:

ConcurrentHashMap < StringInteger >  reducedMap  =  new  ConcurrentHashMap <>();
reduceMapput“One”1);
reduceMapput“Two”2);
reduceMapput“Three”3);
系统出来println“reduce example =>” 
 + reducedMap减少2,(kv v * 2,(totalelem total  +  elem));

系统出来println“reduceKeys example =>” 
 + reducedMapreduceKeys2,(KEY1KEY2- >  键1长度()>  键2()  键1  +  “ - ” + KEY2KEY2  +  “ - ” + 键1));

系统出来println“reduceValues example =>” 
 + reducedMapreduceValues2,(v- >  v * 2,(value1value2- >  value1  >  value2   value1  -  value2value2  -  value1
系统出来println“After reduce =>”  + reduceMap);

 

O / P:

reduce example => 12
reduceKeys example => Three-Two-One
reduceValues example => 0
after reduce => {One = 1,Two = 2,Three = 3}

 

有一个伟大的一天学习。请享用!

Java的微服务,解释。通过Reactive Microservices Architecture(一本免费的O'Reilly书)重振您的传统系统(和您的职业生涯)

 

 
在线客服
  • 点击这里给我发消息
  • 点击这里给我发消息
  • 微信扫一扫
  • 官方微信