Java并发编程之LongAdder执行情况是什么

其他教程   发布日期:2025年04月24日   浏览次数:176

这篇文章主要介绍“Java并发编程之LongAdder执行情况是什么”,在日常操作中,相信很多人在Java并发编程之LongAdder执行情况是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java并发编程之LongAdder执行情况是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

longAccumulate方法

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. int h;
  4. if ((h = getProbe()) == 0) {
  5. ThreadLocalRandom.current(); // force initialization
  6. h = getProbe();
  7. wasUncontended = true;
  8. }
  9. boolean collide = false; // True if last slot nonempty
  10. for (;;) {
  11. Cell[] as; Cell a; int n; long v;
  12. if ((as = cells) != null && (n = as.length) > 0) {
  13. if ((a = as[(n - 1) & h]) == null) {
  14. if (cellsBusy == 0) { // Try to attach new Cell
  15. Cell r = new Cell(x); // Optimistically create
  16. if (cellsBusy == 0 && casCellsBusy()) {
  17. boolean created = false;
  18. try { // Recheck under lock
  19. Cell[] rs; int m, j;
  20. if ((rs = cells) != null &&
  21. (m = rs.length) > 0 &&
  22. rs[j = (m - 1) & h] == null) {
  23. rs[j] = r;
  24. created = true;
  25. }
  26. } finally {
  27. cellsBusy = 0;
  28. }
  29. if (created)
  30. break;
  31. continue; // Slot is now non-empty
  32. }
  33. }
  34. collide = false;
  35. }
  36. else if (!wasUncontended) // CAS already known to fail
  37. wasUncontended = true; // Continue after rehash
  38. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  39. fn.applyAsLong(v, x))))
  40. break;
  41. else if (n >= NCPU || cells != as)
  42. collide = false; // At max size or stale
  43. else if (!collide)
  44. collide = true;
  45. else if (cellsBusy == 0 && casCellsBusy()) {
  46. try {
  47. if (cells == as) { // Expand table unless stale
  48. Cell[] rs = new Cell[n << 1];
  49. for (int i = 0; i < n; ++i)
  50. rs[i] = as[i];
  51. cells = rs;
  52. }
  53. } finally {
  54. cellsBusy = 0;
  55. }
  56. collide = false;
  57. continue; // Retry with expanded table
  58. }
  59. h = advanceProbe(h);
  60. }
  61. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  62. boolean init = false;
  63. try { // Initialize table
  64. if (cells == as) {
  65. Cell[] rs = new Cell[2];
  66. rs[h & 1] = new Cell(x);
  67. cells = rs;
  68. init = true;
  69. }
  70. } finally {
  71. cellsBusy = 0;
  72. }
  73. if (init)
  74. break;
  75. }
  76. else if (casBase(v = base, ((fn == null) ? v + x :
  77. fn.applyAsLong(v, x))))
  78. break; // Fall back on using base
  79. }
  80. }

代码较长,我们分段来分析,首先介绍一下各部分的内容

  • 第一部分:

    1. for
    循环之前的代码,主要是获取线程的hash值,如果是0的话就强制初始化
  • 第二部分:

    1. for
    循环中第一个
    1. if
    语句,在
    1. Cell
    数组中进行累加、扩容
  • 第三部分:

    1. for
    循环中第一个
    1. else if
    语句,这部分的作用是创建
    1. Cell
    数组并初始化
  • 第四部分:

    1. for
    循环中第二个
    1. else if
    语句,当
    1. Cell
    数组竞争激烈时尝试在
    1. base
    上进行累加

线程hash值

  1. int h;
  2. if ((h = getProbe()) == 0) {
  3. ThreadLocalRandom.current(); // force initialization
  4. h = getProbe();
  5. wasUncontended = true; // true表示没有竞争
  6. }
  7. boolean collide = false; // True if last slot nonempty 可以理解为是否需要扩容

这部分的核心代码是

  1. getProbe
方法,这个方法的作用就是获取线程的
  1. hash
值,方便后面通过位运算定位到
  1. Cell
数组中某个位置,如果是
  1. 0
的话就会进行强制初始化

初始化Cell数组

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. // 省略...
  4. for (;;) {
  5. Cell[] as; Cell a; int n; long v;
  6. if ((as = cells) != null && (n = as.length) > 0) {
  7. // 省略...
  8. }
  9. else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 获取锁
  10. boolean init = false; // 初始化标志
  11. try { // Initialize table
  12. if (cells == as) {
  13. Cell[] rs = new Cell[2]; // 创建Cell数组
  14. rs[h & 1] = new Cell(x); // 索引1位置创建Cell元素,值为x=1
  15. cells = rs; // cells指向新数组
  16. init = true; // 初始化完成
  17. }
  18. } finally {
  19. cellsBusy = 0; // 释放锁
  20. }
  21. if (init)
  22. break; // 跳出循环
  23. }
  24. else if (casBase(v = base, ((fn == null) ? v + x :
  25. fn.applyAsLong(v, x))))
  26. break; // Fall back on using base
  27. }
  28. }

第一种情况下

  1. Cell
数组为
  1. null
,所以会进入第一个
  1. else if
语句,并且没有其他线程进行操作,所以
  1. cellsBusy==0
  1. cells==as
也是
  1. true
  1. casCellsBusy()
尝试对
  1. cellsBusy
进行
  1. cas
操作改成
  1. 1
也是
  1. true

首先创建了一个有两个元素的

  1. Cell
数组,然后通过线程
  1. h & 1
的位运算在索引
  1. 1
的位置设置一个
  1. value
  1. 1
  1. Cell
,然后重新赋值给
  1. cells
,标记初始化成功,修改
  1. cellsBusy
  1. 0
表示释放锁,最后跳出循环,初始化操作就完成了。

对base进行累加

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. // 省略...
  4. for (;;) {
  5. Cell[] as; Cell a; int n; long v;
  6. if ((as = cells) != null && (n = as.length) > 0) {
  7. // 省略...
  8. }
  9. else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  10. // 省略...
  11. }
  12. else if (casBase(v = base, ((fn == null) ? v + x :
  13. fn.applyAsLong(v, x))))
  14. break; // Fall back on using base
  15. }
  16. }

第二个

  1. else if
语句的意思是当
  1. Cell
数组中所有位置竞争都很激烈时,就尝试在
  1. base
上进行累加,可以理解为最后的保障

Cell数组初始化之后

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. // 省略...
  4. for (;;) {
  5. Cell[] as; Cell a; int n; long v;
  6. if ((as = cells) != null && (n = as.length) > 0) { // as初始化之后满足条件
  7. if ((a = as[(n - 1) & h]) == null) { // as中某个位置的值为null
  8. if (cellsBusy == 0) { // Try to attach new Cell 是否加锁
  9. Cell r = new Cell(x); // Optimistically create 创建新Cell
  10. if (cellsBusy == 0 && casCellsBusy()) { // 双重检查是否有锁,并尝试加锁
  11. boolean created = false; //
  12. try { // Recheck under lock
  13. Cell[] rs; int m, j;
  14. if ((rs = cells) != null &&
  15. (m = rs.length) > 0 &&
  16. rs[j = (m - 1) & h] == null) { // 重新检查该位置是否为null
  17. rs[j] = r; // 该位置添加Cell元素
  18. created = true; // 新Cell创建成功
  19. }
  20. } finally {
  21. cellsBusy = 0; // 释放锁
  22. }
  23. if (created)
  24. break; // 创建成功,跳出循环
  25. continue; // Slot is now non-empty
  26. }
  27. }
  28. collide = false; // 扩容标志
  29. }
  30. else if (!wasUncontended) // 上面定位到的索引位置的值不为null
  31. wasUncontended = true; // 重新计算hash,重新定位其他索引位置重试
  32. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  33. fn.applyAsLong(v, x)))) // 尝试在该索引位置进行累加
  34. break;
  35. else if (n >= NCPU || cells != as) // 如果数组长度大于等于CPU核心数,就不能在扩容
  36. collide = false; // At max size or stale
  37. else if (!collide) // 数组长度没有达到最大值,修改扩容标志可以扩容
  38. collide = true;
  39. else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁
  40. try {
  41. if (cells == as) { // Expand table unless stale
  42. Cell[] rs = new Cell[n << 1]; // 创建一个原来长度2倍的数组
  43. for (int i = 0; i < n; ++i)
  44. rs[i] = as[i]; // 把原来的元素拷贝到新数组中
  45. cells = rs; // cells指向新数组
  46. }
  47. } finally {
  48. cellsBusy = 0; // 释放锁
  49. }
  50. collide = false; // 已经扩容完成,修改标志不用再扩容
  51. continue; // Retry with expanded table
  52. }
  53. h = advanceProbe(h); // 重新获取hash值
  54. }
  55. // 省略...
  56. }

根据代码中的注释分析一遍整体逻辑

  • 首先如果找到数组某个位置上的值为

    1. null
    ,说明可以在这个位置进行操作,就创建一个新的
    1. Cell
    并初始化值为
    1. 1
    放到这个位置,如果失败了就重新计算
    1. hash
    值再重试
  • 定位到的位置已经有值了,说明线程之间产生了竞争,如果

    1. wasUncontended
    1. false
    就修改为
    1. true
    ,并重新计算
    1. hash
    重试
  • 定位的位置有值并且

    1. wasUncontended
    已经是
    1. true
    ,就尝试在该位置进行累加
  • 当累加失败时,判断数组容量是否已经达到最大,如果是就不能进行扩容,只能

    1. rehash
    并重试
  • 如果前面条件都不满足,并且扩容标志

    1. collide
    标记为
    1. false
    的话就修改为
    1. true
    ,表示可以进行扩容,然后
    1. rehash
    重试
  • 首先尝试加锁,成功了就进行扩容操作,每次扩容长度是之前的

    1. 2
    倍,然后把原来数组内容拷贝到新数组,扩容就完成了。

以上就是Java并发编程之LongAdder执行情况是什么的详细内容,更多关于Java并发编程之LongAdder执行情况是什么的资料请关注九品源码其它相关文章!