当サイトは、アフィリエイト広告を利用しています

ストリームとは何か?

作成日:2026月03月28日
更新日:2026年03月28日

ストリームとは何か

JavaScriptのStream、JavaのStream API、AngularのObservableなど、
さまざまな技術で「ストリーム」という言葉が登場する

ここでは、ストリームという概念が何を意味するのかを整理していく

ストリーム(Stream)とは

  • 順序を持って流れてくるデータを、到着した順に逐次処理するための抽象モデル

です。

ポイントは

  • データを「集合」ではなく「流れ」として扱う

ことにある。

従来のプログラミングでは、多くの場合データを

  • 配列やリストなどの構造(入れ物)として保持してから処理

する

一方ストリームでは

  • データが順番に流れてくるものとして扱い、その流れに沿って処理

する

従来のデータモデルとの違い

データモデルの違いから比較する。

従来のデータモデル(データ構造)

従来の考え方では、データは

  • まとまった集合として存在するもの

と考える。

data_model_structure.txt
データ = 集合(構造)

のようなイメージである。

具体的には

  • 配列
  • リスト
  • テーブル

などが該当する。

traditional_data_structure.py
# データ構造
numbers = [1, 2, 3, 4, 5]
result = []
# 処理
# 新しいデータ構造
for n in numbers:
if n % 2 == 0:
result.append(n * 2)
print(result)

この場合の処理イメージは次のようになる。

traditional_flow.txt
データ構造
処理
新しいデータ構造

つまり

store_then_process.txt
データを保存してから処理する

というモデルである。


ストリームのデータモデル(データフロー)

ストリームでは、データを

  • 時間とともに流れてくるもの

として扱う。

data_flow_model.txt
データ = 流れ

のようなイメージである。

具体的には

  • ログ(次々に出力される)
  • ネットワーク通信(リクエストが順に届く)
  • イベント(クリックなどが発生する)
  • センサー(値が継続的に送られる)

などが該当する。

stream_model_python.py
numbers = [1, 2, 3, 4, 5]
# 処理の仕組みを作る
stream = (n * 2 for n in numbers if n % 2 == 0)
# データを流す
for v in stream:
print(v)

このときの処理イメージは次のようになる。

stream_execution_flow.txt
1 → 条件チェック → skip
2 → 条件チェック → ×2 → 4
3 → 条件チェック → skip
4 → 条件チェック → ×2 → 8
5 → 条件チェック → skip

処理のモデルは次のようになる。

stream_process_model.txt
処理の仕組みを先に作る
データが流れてくる
その処理が実行される

つまりストリームでは

stream_definition_model.txt
データを保持してから処理するのではなく
流れてくるデータを逐次処理する

というモデルである。

ストリームの基本構造

多くのストリーム処理は、次の3つの要素で構成される。

stream_basic_structure.txt
Source → Operator → Sink
  • Source:データが発生する場所

    • 配列
    • ファイル
    • イベント
    • ネットワーク
  • Operator:ストリームを変換する処理

    • filter
    • map
  • Sink:ストリームの最終処理

    • print
    • collect
    • save

Python例

source_operator_sink_python.py
# Source
numbers = [1,2,3,4,5]
# Operator
# filter: 偶数だけ残す
# map: 値を2倍する
stream = (n*2 for n in numbers if n%2==0)
# Sink
for v in stream:
print(v)

ストリームはなぜ必要なのか

ストリームという概念は、主に次のような問題を扱うために使われる。

  1. 大量データを効率よく処理する
  2. データ処理の流れを整理する
  3. 時間とともに発生するデータを扱う

大量データを効率よく処理する

従来のデータ構造モデルでは、まずデータをすべて読み込んでから処理する。

read_all_then_process.py
lines = open("log.txt").readlines()
errors = []
for line in lines:
if "ERROR" in line:
errors.append(line)

この方法では

read_all_flow.txt
ファイル
すべてメモリに読み込む
処理する

という流れになる。

しかしデータが非常に大きい場合

large_data_examples.txt
巨大ログ
大量データ
継続的なデータ

などでは すべてをメモリに読み込むことが難しくなる。

ストリームでは次のように処理できる。

stream_file_processing.py
with open("log.txt") as f:
for line in f:
if "ERROR" in line:
print(line)

イメージ

stream_one_by_one_image.txt
データを1件読む
処理する
次のデータ

このように データを1つずつ処理することでメモリを節約できる。


データ処理の流れを整理する

ストリームは データ処理の流れをパイプラインとして整理するためにも使われる。

従来の処理では、データを中間配列として何度も保存することが多い。

traditional_pipeline_example.py
numbers = [1,2,3,4,5]
filtered = []
for n in numbers:
if n % 2 == 0:
filtered.append(n)
mapped = []
for n in filtered:
mapped.append(n * 2)
print(mapped)

この場合の処理構造は次のようになる。

traditional_pipeline_flow.txt
データ
filter
新しい配列
map
新しい配列

つまり

intermediate_data_structure.txt
中間データ構造を何度も作る

という構造になる。

ストリームでは、これを 処理パイプラインとして表現できる。

stream_pipeline_example.py
numbers = [1,2,3,4,5]
# 処理パイプライン作成
stream = (n * 2 for n in numbers if n % 2 == 0)
# 処理パイプラインにデータを流す
for v in stream:
print(v)

この処理は次のように表現できる。

stream_pipeline_flow.txt
Source
filter
map
Sink

つまりストリームでは

pipeline_processing.txt
処理の流れをパイプラインとして表現できる

という特徴がある。


時間とともに発生するデータを扱う

ストリームは 時間とともに発生するデータを扱うためにも使われる。

多くのシステムでは、データは一度にすべて存在するわけではなく、 時間とともに生成される。

time_based_data_examples.txt
ログ
ユーザーイベント
ネットワーク通信
センサーデータ

このようなデータは

event_stream_image.txt
イベント
イベント
イベント

のように 時間順に発生する。

ストリームモデルでは、このようなデータを

event_stream_processing.txt
イベント発生
処理
次のイベント

という形で処理できる。

このモデルは

event_stream_technologies.txt
Reactive Programming
Observable
Event Stream

などの技術の基礎となっている。

ストリームの分類

ストリームは次の 3つの軸 で分類することができる。

stream_classification_axes.txt
Stream
├ データ取得方式
│ ├ Pull
│ └ Push
├ 実行モデル
│ ├ Sync
│ └ Async
└ データの性質
├ Finite
├ Infinite
└ Event

これらの軸は互いに独立した概念である。 実際のストリームシステムは、これらの組み合わせで構成される。

例えば次のようになる。

stream_examples.txt
Python Generator
→ Pull + Sync + Finite
Angular Observable
→ Push + Async + Event

データ取得方式

データ取得方式とは

データを誰が取得するか

というモデルである。

pull_push_types.txt
Pull
Push

の2種類がある。


Pull型ストリーム

Pull型では

pull_stream_direction.txt
Consumer → Source

の方向でデータを取得する。

つまり

Consumerが必要なタイミングでデータを取りに行く

モデルである。

代表例

pull_stream_examples.txt
Iterator
Generator
Java Stream

Push型ストリーム

Push型では

push_stream_direction.txt
Source → Consumer

の方向でデータが送られる。

つまり

Sourceがデータを生成するとConsumerへ通知される

モデルである。

代表例

push_stream_examples.txt
Observable
Event Stream

実行モデル

実行モデルとは

ストリーム処理が同期か非同期か

という分類である。

execution_models.txt
Sync
Async

Sync(同期)

Syncでは

sync_execution_flow.txt
処理
次の処理
次の処理

のように順番に処理が実行される。

代表例

sync_examples.txt
Python Generator
Java Stream API

Async(非同期)

Asyncでは

async_execution_flow.txt
イベント
非同期処理
次のイベント

のようにイベントやI/Oに応じて処理が進む。

代表例

async_examples.txt
Observable
Async Iterator
Async Generator

データの性質

ストリームはデータの性質によっても分類できる。

stream_data_types.txt
Finite
Infinite
Event

Finite Stream

Finite Streamは

終了するストリーム

である。

finite_stream_examples.txt
配列
ファイル
DB結果

データの終端に到達するとストリームも終了する。


Infinite Stream

Infinite Streamは

終わらないストリーム

である。

infinite_stream_examples.txt
ログ
センサー
リアルタイムデータ

データが継続的に生成されるため理論上ストリームは終了しない。


Event Stream

Event Streamは

イベントが発生したときにだけデータが流れるストリーム

である。

event_stream_examples.txt
クリックイベント
WebSocket
メッセージキュー

イベントが発生するたびにデータがストリームとして流れる。


ストリームパターン

これまで見てきた

  • データ取得方式
  • 実行モデル
  • データの性質

の組み合わせによってストリームの実装パターンが決まる。

代表的なパターンを整理すると次のようになる。

stream_pattern_matrix.txt
Sync Async
-----------------------------------------
Pull Python Generator Async Iterator
Java Stream
Push 実用的でない RxJS Observable
(Consumer処理中に Event Stream
Sourceがブロックされる)

Push + Sync は理論上は可能だが、 Consumerが処理中にSourceがブロックされるため 実質的にPullモデルと同じ振る舞いになる。

そのため実用的なストリーム設計ではほとんど使われない。

実際のストリーム処理では主に次の3パターンが使われる。

main_stream_patterns.txt
Pull + Sync
Pull + Async
Push + Async

Pull + Sync

代表例

pull_sync_examples.txt
Python Generator
Java Stream API
python_generator_stream.py
numbers = [1,2,3,4,5]
stream = (n*2 for n in numbers if n%2==0)
for v in stream:
print(v)
java_stream_api_example.java
List<Integer> numbers = List.of(1,2,3,4,5);
numbers.stream()
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.forEach(System.out::println);

Consumerがデータを1件ずつ取得し、同期的に処理する。


Pull + Async

代表例

pull_async_examples.txt
Async Generator
Async Iterator
python_async_generator_stream.py
import asyncio
async def stream():
for i in range(3):
await asyncio.sleep(1)
yield i
async def main():
async for v in stream():
print(v)
asyncio.run(main())

Consumerがデータを取得するが、処理は非同期に進行する。


Push + Async

代表例

push_async_examples.txt
RxJS Observable
Event Stream
rxjs_observable_example.ts
import { fromEvent } from 'rxjs';
const clicks = fromEvent(document, 'click');
clicks.subscribe(event => {
console.log("click", event);
});

イベントが発生するとデータがConsumerへPushされる。

まとめ

ストリームとは

stream_summary_definition.txt
データを
集合ではなく
流れとして扱う
抽象モデル

である。

ストリーム処理では

stream_summary_structure.txt
Source → Operator → Sink

という構造でデータを処理する。

またストリームは

stream_summary_axes.txt
Pull / Push
Sync / Async
Finite / Infinite / Event

などの軸で分類できる。

この概念を理解すると

stream_use_cases.txt
大量データ処理
ログ処理
イベント処理
リアクティブプログラミング

など、多くのデータ処理モデルを統一的に理解することができる。

新着記事

タグ一覧
top