当サイトは、アフィリエイト広告を利用しています

【Python × MongoDB】pymongoのCRUD操作

作成日:2025月04月13日
更新日:2025年04月13日

pythonからMongoDBの公式クライアントライブラリであり
最も基本的で汎用的な使い方ができる「pymongo」使をつかっての
MongoDBを操作する際の基本的な使い方についてまとめておく。

当記事ではpymongoのコレクションに対する
CRUD操作である

  • insert系操作
  • find系操作
  • update系操作
  • replace系操作
  • delete系操作

についてをまとめる。

ライブラリ

ライブラリとしては

  • pymongo==4.9.2
  • pydantic==2.10.2

を使うのでpip等でインストールする

pymongo

MongoDBをpythonから使うためのライブラリ。

pydantic

CRUD操作するドキュメントデータの作成については
型ヒントを使用してデータのバリデーションを行えて

  • インスタンスへのデシリアライズ
  • インスタンスからのシリアライズ

が簡単にできるpydanticを使う。
※データ作りやすいので。

MongoDBの概要や操作について下記記事でまとめている

mongo-expressはデータをブラウザで確認・編集できるので便利

またpymongo自体のインストールや、CRUD操作までの処理(クライアント取得当)
については下記記事で詳しく紹介している

※flaskでpymongoを使用しているがpymongo自体はフレームワークに依存する ものではないので、pythonスクリプトからでも実行可能

コレクション操作

MongoDBのコレクションに対してCRUD操作を実装する
下記のPythonスクリプトから各クエリーを実行するよう実装する

pymongoのクエリ実行スクリプト

処理フローとしては下記になる

  • クライアントインスタンスを取得する
  • データベースの作成
  • データベースの選択
  • データベースのコレクションの選択
  • クエリーの実行
pymongoのクエリ実行スクリプト
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
# コレクション操作関数
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
# クエリ発行
# 結果判定
if __name__ == "__main__":
main()
  • コレクション操作関数を実装する
  • コレクション操作関数をmain()から使う

insert系操作

insert_one():1件挿入

insert_one()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def insert_one_document(collection: Collection, insert_data: User):
try:
# 辞書に変換
insert_data_dict = insert_data.model_dump()
# クエリー実行
result = collection.insert_one(insert_data_dict)
return result
except Exception as e :
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
insert_data = User(user_id=100, name="Taro", age=25)
# クエリ発行
result = insert_one_document(collection=collection, insert_data=insert_data)
# 結果判定
if result is None:
print("データの挿入に失敗しました。")
else:
print(f"データを挿入しました。挿入ID: {result.inserted_id}")
if __name__ == "__main__":
main()
  • 1つのドキュメントを追加するメソッド
  • 引数のデータは辞書型で渡す必要がある
  • insertの失敗はExceptionになるのでエラーハンドリングをする

insert_many():複数件挿入

insert_many()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def insert_many_document(collection: Collection, insert_data: list[User]):
try:
# 辞書に変換
insert_data_dict_list = [
user.model_dump() for user in insert_data
]
# クエリー実行
result = collection.insert_many(insert_data_dict_list)
return result
except Exception as e :
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
insert_data_list = [
User(user_id=100, name="Taro", age=25),
User(user_id=200, name="Jaro", age=20)
]
# クエリ発行
result = insert_many_document(collection=collection, insert_data=insert_data_list)
# 結果判定
if result is None:
print("データの挿入に失敗しました。")
else:
print("データを挿入しました。挿入ID一覧:")
for _id in result.inserted_ids:
print(f" - {_id}")
if __name__ == "__main__":
main()
  • 複数のドキュメントを追加するメソッド
  • そのほかはinsert_oneメソッドと同様

find系操作

insertでデータ登録済みの前提。

find_one():1件取得

find_one()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def find_one_document(collection: Collection, user_id: int):
try:
# 抽出条件
query = {"user_id":user_id}
# 項目指定
projection={"_id":0}
# クエリ実行
result = collection.find_one(query,projection)
return result
except Exception as e :
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
user_id = 100
# クエリ発行
result = find_one_document(collection=collection, user_id=user_id)
# 結果判定
if result is None:
print("データの取得に失敗しました。")
else:
print("データを取得しました。")
print(result)
if __name__ == "__main__":
main()
  • MongoDBのコレクションから、1件のドキュメントを検索するためのメソッド
  • 最初の引数 クエリは検索条件を指定するもので、辞書形式で指定する
  • 2番目の引数 プロジェクション は、返すフィールドを0,1で指定できる※0は返さない
    • 何も指定しなかった場合: 全項目(フィールド)が返される。
    • 0 のみ指定した場合: 指定したフィールドだけが除外され、それ以外は返される。
    • 1 の指定がある場合: 指定されたフィールドだけが返される。_id フィールドはデフォルトで返されるが、明示的に "_id": 0 を追加すると除外できる。

find():複数取得(Cursor型)

find()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def find_many_document(collection: Collection, user_id: int):
try:
# 抽出条件
query = {"user_id":user_id}
# 項目指定
projection={"_id":0}
# クエリ実行
result = list(collection.find(query,projection))
return result
except Exception as e :
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
user_id = 100
# クエリ発行
result = find_many_document(collection=collection, user_id=user_id)
# 結果判定
if result is None:
print("データの取得に失敗しました。")
else:
print("データを取得しました。")
for data in result:
print(data)
if __name__ == "__main__":
main()
  • MongoDBのコレクションから、複数のドキュメントを検索するためのメソッド
  • 最初の引数 クエリ は検索条件を指定するもので、辞書形式で指定
  • 2番目の引数 プロジェクション は返すフィールドを0,1で指定できる
    • 何も指定しなかった場合: 全項目(フィールド)が返される。
    • 0 のみ指定した場合: 指定したフィールドだけが除外され、それ以外は返される。
    • 1 の指定がある場合: 指定されたフィールドだけが返される。_id フィールドはデフォルトで返されるが、明示的に "_id": 0 を追加すると除外できる。
  • find() は遅延評価を行うCursorオブジェクトを返す
  • そのためlistやforで評価して使う

update系操作

update_one():1件更新

update_one()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def update_one_document(collection: Collection, user_id: str, update_data: User):
try:
# 辞書変換
update_data_dict = update_data.model_dump()
# クエリ:更新対象となるユーザーIDを指定
query = {"user_id":user_id}
# 更新内容:$set演算子を使って、update_dataに指定されたフィールドのみ更新
update_fields = {"$set": update_data_dict}
# クエリ実行
result = collection.update_one(
query,
update_fields
)
return result
except Exception as e :
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
user_id = 100
update_data = User(user_id=100, name="Saburo", age=30)
# クエリ発行
result = update_one_document(collection=collection, user_id=user_id, update_data=update_data)
# 結果判定
if result is None:
print("更新に失敗しました。")
elif result.modified_count == 0:
print("一致するデータがありませんでした。更新は行われていません。")
else:
print(f"{result.modified_count} 件のデータを更新しました。")
if __name__ == "__main__":
main()
  • updateはMongoDBのコレクションから、一致した最初の1件のドキュメントを更新するメソッド

    • 部分的に更新することができる
    • フィールド単位で変更が可能で、指定したフィールドのみが更新される
  • 最初の引数 queryは検索条件を指定するもので、辞書形式で指定

  • 2番目の引数 update_fieldsは、更新するフィールドとその新しい値を含む辞書型で指定

    • update_fieldsは、更新演算子(例:$set, $inc)を含む辞書にする
  • 更新演算子(例:$set, $inc, $unset など)を使用することで、特定のフィールドのみ更新できる

    • "$set" は指定されたフィールドを更新。もしフィールドが存在しない場合は、新たに追加
    • "$inc" は数値フィールドをインクリメントする
    • "$unset" は指定されたフィールドを削除する
  • `update_oneメソッドの戻り値は、更新結果を示す情報などを含むオブジェクト

    • modified_count(変更されたドキュメント数)
    • matched_count`(一致したドキュメント数)

update_many():複数件更新

update_many()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def update_many_document(collection: Collection, user_id: str, update_data: User):
try:
# 辞書変換
update_data_dict = update_data.model_dump()
# クエリ:更新対象となるユーザーIDを指定
query = {"user_id":user_id}
# 更新内容:$set演算子を使って、update_dataに指定されたフィールドのみ更新
update_fields = {"$set": update_data_dict}
# クエリを実行
result = collection.update_many(
query,
update_fields
)
return result
except Exception as e :
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
user_id = 100
update_data = User(user_id=100, name="siro", age=30)
# クエリ発行
result = update_many_document(collection=collection, user_id=user_id, update_data=update_data)
# 結果判定
if result is None:
print("更新に失敗しました。")
elif result.modified_count == 0:
print("一致するデータがありませんでした。更新は行われていません。")
else:
print(f"{result.modified_count} 件のデータを更新しました。")
if __name__ == "__main__":
main()
  • updateはMongoDBのコレクションから、一致したすべてのドキュメントを更新するメソッド
  • そのほかの動作はupdate_oneメソッドと同様

replace系操作

updateがフィールド単位で部分更新するのに対して
replaceはドキュメントごと置き換える

replace_one():1件をまるごと置き換え

replace_one()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def replace_one_document(collection: Collection, user_id: str, replace_data: dict):
try:
# 条件指定
query = {"user_id": user_id}
# クエリ実行
result = collection.replace_one(query, replace_data)
return result
except Exception as e:
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
user_id = 100
# 置き換えでデータ作成
replace_data = {"user_id":100}
# クエリ発行
result = replace_one_document(collection=collection, user_id=user_id, replace_data=replace_data)
# 結果判定
if result is None:
print("更新に失敗しました。")
elif result.matched_count == 0:
print("一致するデータが見つかりませんでした。")
elif result.modified_count == 0:
print("データは見つかりましたが、内容に変更がなかったため更新されませんでした。")
else:
print(f"{result.modified_count} 件のデータを更新しました。")
if __name__ == "__main__":
main()
  • MongoDBのコレクションから、一致した最初1件のドキュメントを完全に置き換えるメソッド

  • 最初の引数 queryは検索条件を指定するもので、辞書形式で指定

  • 2番目の引数 replace_data は、新しいドキュメント全体を表す辞書を指定。

    • 元のドキュメント全体を置き換えるため、古いフィールドはすべて削除される
    • _id フィールドが必要であれば、明示的に replace_data に含める必要がある
  • replace_oneメソッドの戻り値は、更新結果を示すオブジェクト

    • matched_count(一致したドキュメントの数)
    • modified_count(実際に置き換えが行われたドキュメントの数)

delete系操作

delete_one():1件削除

delete_one()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def delete_one_document(collection: Collection, user_id: int):
try:
# 削除条件
query = {"user_id": user_id}
# クエリ実行
result = collection.delete_one(query)
return result
except Exception as e:
print(f"エラーが発生しました: {e}")
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
user_id = 100
# クエリ発行
result = delete_one_document(collection=collection, user_id=user_id)
# 結果判定
if result is None:
print("削除処理に失敗しました。")
elif result.deleted_count == 0:
print("一致するデータが見つかりませんでした。")
else:
print(f"{result.deleted_count} 件のデータを削除しました。")
if __name__ == "__main__":
main()
  • MongoDBのコレクションから、一致した最初1件のドキュメントを削除するメソッド

  • 最初の引数 queryは削除条件を指定する辞書形式で指定

  • delete_oneの戻り値は DeleteResultオブジェクトで、下記の情報を保持する

    • deleted_count(削除された件数)

delete_many():複数件削除

delete_many()
from pymongo import MongoClient
from pymongo.collection import Collection
from pydantic import BaseModel
# MongoDB接続情報
MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'
DB_NAME = "USER_DB"
COLLECTION_NAME = "USER_COLLECTION"
# データモデルクラスを定義
class User(BaseModel):
user_id: int
name: str
age: int
def delete_many_document(collection: Collection, user_id: int):
try:
# 削除条件
query = {"user_id": user_id}
# クエリ実行
result = collection.delete_many(query)
return result
except Exception as e:
print(f"エラーが発生しました: {e}")
return None
def main():
# MongoDB接続先
client = MongoClient(MONGO_URI)
# データベース指定
db = client[DB_NAME]
# コレクション指定
collection = db[COLLECTION_NAME]
# データ準備
user_id = 100
# クエリ発行
result = delete_many_document(collection=collection, user_id=user_id)
# 結果判定
if result is None:
print("削除処理に失敗しました。")
elif result.deleted_count == 0:
print("一致するデータが見つかりませんでした。")
else:
print(f"{result.deleted_count} 件のデータを削除しました。")
if __name__ == "__main__":
main()
  • MongoDBのコレクションから、一致したすべてのドキュメントを削除するメソッド
  • そのほかの動作はdelete_oneメソッドと同様

まとめ

pythonからpymongoを使ったCRUD処理の基本的なところを
まとめてみた

基本的に

  • クエリ条件
  • データ
  • 戻り値

は辞書型なのでpydanticでデータモデルクラスとして扱う場合は
適宜、変換が必要になる

新着記事

タグ別一覧
top