当サイトは、アフィリエイト広告を利用しています
JavaScriptのStream、JavaのStream API、AngularのObservableなど、
さまざまな技術で「ストリーム」という言葉が登場する
ここでは、ストリームという概念が何を意味するのかを整理していく
ストリーム(Stream)とは
です。
ポイントは
ことにある。
従来のプログラミングでは、多くの場合データを
する
一方ストリームでは
する
データモデルの違いから比較する。
従来の考え方では、データは
と考える。
データ = 集合(構造)
のようなイメージである。
具体的には
などが該当する。
# データ構造numbers = [1, 2, 3, 4, 5]result = []# 処理# 新しいデータ構造for n in numbers:if n % 2 == 0:result.append(n * 2)print(result)
この場合の処理イメージは次のようになる。
データ構造↓処理↓新しいデータ構造
つまり
データを保存してから処理する
というモデルである。
ストリームでは、データを
として扱う。
データ = 流れ
のようなイメージである。
具体的には
などが該当する。
numbers = [1, 2, 3, 4, 5]# 処理の仕組みを作るstream = (n * 2 for n in numbers if n % 2 == 0)# データを流すfor v in stream:print(v)
このときの処理イメージは次のようになる。
1 → 条件チェック → skip2 → 条件チェック → ×2 → 43 → 条件チェック → skip4 → 条件チェック → ×2 → 85 → 条件チェック → skip
処理のモデルは次のようになる。
処理の仕組みを先に作る↓データが流れてくる↓その処理が実行される
つまりストリームでは
データを保持してから処理するのではなく流れてくるデータを逐次処理する
というモデルである。
多くのストリーム処理は、次の3つの要素で構成される。
Source → Operator → Sink
Source:データが発生する場所
Operator:ストリームを変換する処理
Sink:ストリームの最終処理
# Sourcenumbers = [1,2,3,4,5]# Operator# filter: 偶数だけ残す# map: 値を2倍するstream = (n*2 for n in numbers if n%2==0)# Sinkfor v in stream:print(v)
ストリームという概念は、主に次のような問題を扱うために使われる。
従来のデータ構造モデルでは、まずデータをすべて読み込んでから処理する。
lines = open("log.txt").readlines()errors = []for line in lines:if "ERROR" in line:errors.append(line)
この方法では
ファイル↓すべてメモリに読み込む↓処理する
という流れになる。
しかしデータが非常に大きい場合
巨大ログ大量データ継続的なデータ
などでは すべてをメモリに読み込むことが難しくなる。
ストリームでは次のように処理できる。
with open("log.txt") as f:for line in f:if "ERROR" in line:print(line)
イメージ
データを1件読む↓処理する↓次のデータ
このように データを1つずつ処理することでメモリを節約できる。
ストリームは データ処理の流れをパイプラインとして整理するためにも使われる。
従来の処理では、データを中間配列として何度も保存することが多い。
numbers = [1,2,3,4,5]filtered = []for n in numbers:if n % 2 == 0:filtered.append(n)mapped = []for n in filtered:mapped.append(n * 2)print(mapped)
この場合の処理構造は次のようになる。
データ↓filter↓新しい配列↓map↓新しい配列
つまり
中間データ構造を何度も作る
という構造になる。
ストリームでは、これを 処理パイプラインとして表現できる。
numbers = [1,2,3,4,5]# 処理パイプライン作成stream = (n * 2 for n in numbers if n % 2 == 0)# 処理パイプラインにデータを流すfor v in stream:print(v)
この処理は次のように表現できる。
Source↓filter↓map↓Sink
つまりストリームでは
処理の流れをパイプラインとして表現できる
という特徴がある。
ストリームは 時間とともに発生するデータを扱うためにも使われる。
多くのシステムでは、データは一度にすべて存在するわけではなく、 時間とともに生成される。
例
ログユーザーイベントネットワーク通信センサーデータ
このようなデータは
イベント↓イベント↓イベント
のように 時間順に発生する。
ストリームモデルでは、このようなデータを
イベント発生↓処理↓次のイベント
という形で処理できる。
このモデルは
Reactive ProgrammingObservableEvent Stream
などの技術の基礎となっている。
ストリームは次の 3つの軸 で分類することができる。
Stream├ データ取得方式│ ├ Pull│ └ Push│├ 実行モデル│ ├ Sync│ └ Async│└ データの性質├ Finite├ Infinite└ Event
これらの軸は互いに独立した概念である。 実際のストリームシステムは、これらの組み合わせで構成される。
例えば次のようになる。
Python Generator→ Pull + Sync + FiniteAngular Observable→ Push + Async + Event
データ取得方式とは
データを誰が取得するか
というモデルである。
PullPush
の2種類がある。
Pull型では
Consumer → Source
の方向でデータを取得する。
つまり
Consumerが必要なタイミングでデータを取りに行く
モデルである。
代表例
IteratorGeneratorJava Stream
Push型では
Source → Consumer
の方向でデータが送られる。
つまり
Sourceがデータを生成するとConsumerへ通知される
モデルである。
代表例
ObservableEvent Stream
実行モデルとは
ストリーム処理が同期か非同期か
という分類である。
SyncAsync
Syncでは
処理↓次の処理↓次の処理
のように順番に処理が実行される。
代表例
Python GeneratorJava Stream API
Asyncでは
イベント↓非同期処理↓次のイベント
のようにイベントやI/Oに応じて処理が進む。
代表例
ObservableAsync IteratorAsync Generator
ストリームはデータの性質によっても分類できる。
FiniteInfiniteEvent
Finite Streamは
終了するストリーム
である。
例
配列ファイルDB結果
データの終端に到達するとストリームも終了する。
Infinite Streamは
終わらないストリーム
である。
例
ログセンサーリアルタイムデータ
データが継続的に生成されるため理論上ストリームは終了しない。
Event Streamは
イベントが発生したときにだけデータが流れるストリーム
である。
例
クリックイベントWebSocketメッセージキュー
イベントが発生するたびにデータがストリームとして流れる。
これまで見てきた
の組み合わせによってストリームの実装パターンが決まる。
代表的なパターンを整理すると次のようになる。
Sync Async-----------------------------------------Pull Python Generator Async IteratorJava StreamPush 実用的でない RxJS Observable(Consumer処理中に Event StreamSourceがブロックされる)
Push + Sync は理論上は可能だが、 Consumerが処理中にSourceがブロックされるため 実質的にPullモデルと同じ振る舞いになる。
そのため実用的なストリーム設計ではほとんど使われない。
実際のストリーム処理では主に次の3パターンが使われる。
Pull + SyncPull + AsyncPush + Async
代表例
Python GeneratorJava Stream API
numbers = [1,2,3,4,5]stream = (n*2 for n in numbers if n%2==0)for v in stream:print(v)
List<Integer> numbers = List.of(1,2,3,4,5);numbers.stream().filter(n -> n % 2 == 0).map(n -> n * 2).forEach(System.out::println);
Consumerがデータを1件ずつ取得し、同期的に処理する。
代表例
Async GeneratorAsync Iterator
import asyncioasync def stream():for i in range(3):await asyncio.sleep(1)yield iasync def main():async for v in stream():print(v)asyncio.run(main())
Consumerがデータを取得するが、処理は非同期に進行する。
代表例
RxJS ObservableEvent Stream
import { fromEvent } from 'rxjs';const clicks = fromEvent(document, 'click');clicks.subscribe(event => {console.log("click", event);});
イベントが発生するとデータがConsumerへPushされる。
ストリームとは
データを集合ではなく流れとして扱う抽象モデル
である。
ストリーム処理では
Source → Operator → Sink
という構造でデータを処理する。
またストリームは
Pull / PushSync / AsyncFinite / Infinite / Event
などの軸で分類できる。
この概念を理解すると
大量データ処理ログ処理イベント処理リアクティブプログラミング
など、多くのデータ処理モデルを統一的に理解することができる。