2018年4月22日日曜日

[JavaSE8 Goldへの道] その11 並列ストリーム

JavaSE8 Goldへの道(Upgrade to Java SE 8 Programmer 1Z0-810 試験対策)11回目です。

一連の記事は「JavaSE8Gold」ラベルを付けていきます。

今回は並列ストリームです。



並列ストリーム

ストリームには直列(sequential)と並列(parallel)があります。
これまで紹介してきたのは直列の方でした。(その5で少しだけ並列の方にも触れました)

直列ストリームは1つのスレッドで処理されます。対して並列ストリームではマルチスレッドで処理されます。
スレッドの管理は内部で自動的にやってくれるため、プログラマが意識する必要はありません。

ストリームAPIではデフォルトでは直列ストリームが生成されます。並列ストリームを使うには、生成後にBaseStream#parallelメソッドを呼ぶか、ストリーム生成時にCollection#parallelStreamメソッドを呼ぶことで生成できます。
BaseStreamStreamのスーパーインタフェースです。

逆にBaseStream#sequentialメソッドを呼ぶと並列ストリームから直列ストリームに変換できます。

並列かどうかというのは内部的な状態で、型はStreamで変わらないのでコーディングは全く同じにできます。

文末に載せている試験の参考サイトでは、以下のような例が載っています。

直列ストリームで1~10までの数値が入ったストリームを出力すると、当然ながら順番通りに出てきます。
System.out.print("sequential:");
IntStream.rangeClosed(1, 10)
  .forEach(i -> System.out.print(i + " "));
実行結果
sequential:1 2 3 4 5 6 7 8 9 10

parallelメソッドで並列ストリームにすると、複数スレッドで処理されるため順序が不定になります。

System.out.print("parallel:");
IntStream.rangeClosed(1, 10)
  .parallel()
  .forEach(i -> System.out.print(i + " "));
実行結果
parallel:7 6 8 3 9 10 5 4 2 1

スレッド名を出力させてみると、今使っている2コア2スレッドのPCでは4スレッドが動いているのがわかります。

IntStream.rangeClosed(1, 10)
  .parallel()
  .forEach(i -> System.out.println(Thread.currentThread().getName() + ":" + i));

実行結果
main:7
ForkJoinPool.commonPool-worker-2:3
ForkJoinPool.commonPool-worker-1:9
ForkJoinPool.commonPool-worker-2:5
main:6
ForkJoinPool.commonPool-worker-2:4
main:2
ForkJoinPool.commonPool-worker-1:10
ForkJoinPool.commonPool-worker-2:1
ForkJoinPool.commonPool-worker-3:8

試験範囲としてはこれくらいのようですが、もう少し考察してみます。


どういうときに使うべきか

ストリームが内部で持つフラグ

当然ながら、全てのストリームを並列にすればいいというのは間違いです。
CPUコアが1つの場合並列化しても意味ないですし、要素数が少ない・中間操作の処理負荷が低いと、並列化でスレッドを管理するオーバーヘッドの方が高くなってしまいます。

要素数がある程度多い、または中間操作が複雑で時間がかかるほど並列化の効果は上がると思いますが、実際のところ一度測ってみるのが確実です。

「この中間操作・終端操作は並列ストリームの方が良い」みたいなのがあればいいのですが、APIドキュメントに一部書いてあるものの網羅しきってはいません。

例えばfindFirstfindAnyという終端操作があります。どちらも基本は中間操作filterと組み合わせて、条件を満たす最初の要素/いずれかの要素が見つかった時点で返します。
「いずれか」ってなんだよという話ですが、findFirstの方は要素が現れる順番を意識していて、仮に並列ストリームであっても出てきた順番の中で最初の要素を返します。
findAnyの方は順番を意識しないため、並列ストリームだと結果が不定になる代わりに、パフォーマンスが上がります。

long st = System.currentTimeMillis();
OptionalDouble o = LongStream.range(1, 10_000_000_000L)
  .parallel()
  .mapToDouble(Math::sqrt)
  .filter(l -> l > 30000)
  .findAny();
//  .findFirst();
System.out.println(System.currentTimeMillis() - st);
System.out.println(o);

実行結果(findFirst)
3240
OptionalDouble[30000.000016666665]
→常に同じ結果

実行結果(findAny 何度か実行)
1056
OptionalDouble[37238.67277441558]
933
OptionalDouble[58356.28965244449]
918
OptionalDouble[49632.43804207083]
など

APIドキュメントを眺めてみると、ストリームの性質として直列/並列以外に*順序付けされているか*があるのがわかります。
また速度もfindAnyの方が早く終わっています。
BaseStream#unorderdメソッドを呼ぶと順序付けを解除することができるとあるほか、IntStream#rangeは「順序付けされた」、IntStream#generateは「順序付けされていない」ストリームを返すとあります。
上のコードをIntStream#generateに置き換えてみると、

long st = System.currentTimeMillis();
AtomicLong al = new AtomicLong(1);
OptionalDouble o = LongStream.generate(() -> al.getAndAdd(1))
  .parallel()
  .mapToDouble(Math::sqrt)
  .filter(l -> l > 30000)
  .findFirst();
System.out.println(o);
System.out.println(System.currentTimeMillis() - st);

実行結果(findFirstfindAnyどちらもほぼ同じ)
18150
OptionalDouble[30000.0001]
15054
OptionalDouble[30000.000133333335]
15444
OptionalDouble[30000.00015]
となり、findFirstでも結果が不定でfindAnyを使っても速くなりませんでした。

※ラムダ式から外部変数を参照する場合、finalもしくは実質的final(finalは付いていないけど変更されていない)である必要があります。またこの例ではマルチスレッドでgenerateに渡しているSupplierが実行されるため、生成される値がおかしくならないようにjava.util.concurrent.atomic.AtomicLongを使っています。

しかし、IntStream#rangeと同じく「順序付けされた」ストリームを生成するIntStream#iterateを使ってみると、

OptionalDouble o = LongStream.iterate(1, l -> l + 1)
//あとは同じ
実行結果(findFirstfindAnyどちらもほぼ同じ)
3147
OptionalDouble[30000.000016666665]
と、常に同じ結果になりました。

ストリームAPIの実装はとても複雑で全部把握できていませんが、内部でいくつかの状態を持っているようです。
非公開でStreamOpFlagというenumがあり、これには以下の値が定義されています。
  • DISTINCT
  • SORTED
  • ORDERED
  • SIZED
  • SHORT_CIRCUIT
ソースを見ても具体的にそれぞれが立っている時どうなるまでは書いてないのですが、これらを見て最適な処理方法を選択しているようです。

LongStream#rangeLongStream#iterateを比べたとき、おそらく前者はサイズが決まっているからSIZEDが立っていそうです。

peekメソッドでスレッド名と値を出力してみると、LongStream#rangeの方はスレッドに割り当てられる値が突然大きくなったりしており、複数スレッドに割り当てる要素をサイズに応じて変えてるように見えます。

LongStream#iterateの方はサイズがわからない無限ストリームなので、出力を見ると1024個ずつ割り振ってるようでした。
そこでLongStream#iterateを使ったコードでfilterの条件をもっと小さくしてみると、findAnyで結果がばらつくようになりました。


終端操作の並列性

前回その10で紹介したCollectors#toMapCollectors#groupingByには並列化バージョンがあり、それぞれCollectors#toConcurrentMapCollectors#groupingByConcurrentです。
Mapの代わりにjava.util.concurrent.ConcurrentMapを生成します。

しかし、APIドキュメントを見ると「並列ストリームの場合Concurrentが付く方を使わなければならない」とは書いていません

これは通常並列ストリームでは各スレッドごとにコンテナが用意され最後に集約されるのですが、Mapを集約するには他のコレクションよりコストが高くなる可能性があるため、Concurrent付きの方ではConcurrentMapを1つだけ生成して全スレッドが値を入れていくということのようです。
現にtoListtoSetにはConcurrentバージョンがありません。

プログラマは、ストリームのソースの特性(ListなのかSetなのか、サイズが決まっているか、順序付けされているか)を考慮した上で、適切な終端操作を選んで並列ストリームを使う必要がありそうです。


というわけで

後半は試験から外れましたが、並列ストリーム奥が深いです。
いろいろ試してみて経験値を積む必要がありそうです。

今回も以下のサイトとGoldの通常試験の参考書を参考にしています。
一連の記事は「JavaSE8Gold」ラベルを付けていきます。

それではみなさまよきガジェットライフを(´∀`)ノ


0 件のコメント :
コメントを投稿

▼こちらの記事もどうぞ

▼ブログを気に入っていただけたらRSS登録をお願いします!
▼ブログランキング参加中!応援よろしくお願いします。

スポンサーリンク