当サイトは、アフィリエイト広告を利用しています
pythonからMongoDBの公式クライアントライブラリであり
最も基本的で汎用的な使い方ができる「pymongo」使をつかっての
MongoDBを操作する際の基本的な使い方についてまとめておく。
当記事ではpymongoのコレクションに対する
CRUD操作である
についてをまとめる。
ライブラリとしては
を使うのでpip等でインストールする
MongoDBをpythonから使うためのライブラリ。
CRUD操作するドキュメントデータの作成については
型ヒントを使用してデータのバリデーションを行えて
が簡単にできるpydanticを使う。
※データ作りやすいので。
MongoDBの概要や操作について下記記事でまとめている
mongo-expressはデータをブラウザで確認・編集できるので便利
またpymongo自体のインストールや、CRUD操作までの処理(クライアント取得当)
については下記記事で詳しく紹介している
MongoDBのコレクションに対してCRUD操作を実装する
下記のPythonスクリプトから各クエリーを実行するよう実装する
処理フローとしては下記になる
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: int# コレクション操作関数def main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備# クエリ発行# 結果判定if __name__ == "__main__":main()
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef insert_one_document(collection: Collection, insert_data: User):try:# 辞書に変換insert_data_dict = insert_data.model_dump()# クエリー実行result = collection.insert_one(insert_data_dict)return resultexcept Exception as e :return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備insert_data = User(user_id=100, name="Taro", age=25)# クエリ発行result = insert_one_document(collection=collection, insert_data=insert_data)# 結果判定if result is None:print("データの挿入に失敗しました。")else:print(f"データを挿入しました。挿入ID: {result.inserted_id}")if __name__ == "__main__":main()
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef insert_many_document(collection: Collection, insert_data: list[User]):try:# 辞書に変換insert_data_dict_list = [user.model_dump() for user in insert_data]# クエリー実行result = collection.insert_many(insert_data_dict_list)return resultexcept Exception as e :return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備insert_data_list = [User(user_id=100, name="Taro", age=25),User(user_id=200, name="Jaro", age=20)]# クエリ発行result = insert_many_document(collection=collection, insert_data=insert_data_list)# 結果判定if result is None:print("データの挿入に失敗しました。")else:print("データを挿入しました。挿入ID一覧:")for _id in result.inserted_ids:print(f" - {_id}")if __name__ == "__main__":main()
insertでデータ登録済みの前提。
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef find_one_document(collection: Collection, user_id: int):try:# 抽出条件query = {"user_id":user_id}# 項目指定projection={"_id":0}# クエリ実行result = collection.find_one(query,projection)return resultexcept Exception as e :return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備user_id = 100# クエリ発行result = find_one_document(collection=collection, user_id=user_id)# 結果判定if result is None:print("データの取得に失敗しました。")else:print("データを取得しました。")print(result)if __name__ == "__main__":main()
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef find_many_document(collection: Collection, user_id: int):try:# 抽出条件query = {"user_id":user_id}# 項目指定projection={"_id":0}# クエリ実行result = list(collection.find(query,projection))return resultexcept Exception as e :return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備user_id = 100# クエリ発行result = find_many_document(collection=collection, user_id=user_id)# 結果判定if result is None:print("データの取得に失敗しました。")else:print("データを取得しました。")for data in result:print(data)if __name__ == "__main__":main()
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef update_one_document(collection: Collection, user_id: str, update_data: User):try:# 辞書変換update_data_dict = update_data.model_dump()# クエリ:更新対象となるユーザーIDを指定query = {"user_id":user_id}# 更新内容:$set演算子を使って、update_dataに指定されたフィールドのみ更新update_fields = {"$set": update_data_dict}# クエリ実行result = collection.update_one(query,update_fields)return resultexcept Exception as e :return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備user_id = 100update_data = User(user_id=100, name="Saburo", age=30)# クエリ発行result = update_one_document(collection=collection, user_id=user_id, update_data=update_data)# 結果判定if result is None:print("更新に失敗しました。")elif result.modified_count == 0:print("一致するデータがありませんでした。更新は行われていません。")else:print(f"{result.modified_count} 件のデータを更新しました。")if __name__ == "__main__":main()
updateはMongoDBのコレクションから、一致した最初の1件のドキュメントを更新するメソッド
最初の引数 queryは検索条件を指定するもので、辞書形式で指定
2番目の引数 update_fieldsは、更新するフィールドとその新しい値を含む辞書型で指定
$set
, $inc
)を含む辞書にする更新演算子(例:$set
, $inc
, $unset
など)を使用することで、特定のフィールドのみ更新できる
`update_oneメソッドの戻り値は、更新結果を示す情報などを含むオブジェクト
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef update_many_document(collection: Collection, user_id: str, update_data: User):try:# 辞書変換update_data_dict = update_data.model_dump()# クエリ:更新対象となるユーザーIDを指定query = {"user_id":user_id}# 更新内容:$set演算子を使って、update_dataに指定されたフィールドのみ更新update_fields = {"$set": update_data_dict}# クエリを実行result = collection.update_many(query,update_fields)return resultexcept Exception as e :return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備user_id = 100update_data = User(user_id=100, name="siro", age=30)# クエリ発行result = update_many_document(collection=collection, user_id=user_id, update_data=update_data)# 結果判定if result is None:print("更新に失敗しました。")elif result.modified_count == 0:print("一致するデータがありませんでした。更新は行われていません。")else:print(f"{result.modified_count} 件のデータを更新しました。")if __name__ == "__main__":main()
updateがフィールド単位で部分更新するのに対して
replaceはドキュメントごと置き換える
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef replace_one_document(collection: Collection, user_id: str, replace_data: dict):try:# 条件指定query = {"user_id": user_id}# クエリ実行result = collection.replace_one(query, replace_data)return resultexcept Exception as e:return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備user_id = 100# 置き換えでデータ作成replace_data = {"user_id":100}# クエリ発行result = replace_one_document(collection=collection, user_id=user_id, replace_data=replace_data)# 結果判定if result is None:print("更新に失敗しました。")elif result.matched_count == 0:print("一致するデータが見つかりませんでした。")elif result.modified_count == 0:print("データは見つかりましたが、内容に変更がなかったため更新されませんでした。")else:print(f"{result.modified_count} 件のデータを更新しました。")if __name__ == "__main__":main()
MongoDBのコレクションから、一致した最初1件のドキュメントを完全に置き換えるメソッド
最初の引数 queryは検索条件を指定するもので、辞書形式で指定
2番目の引数 replace_data は、新しいドキュメント全体を表す辞書を指定。
_id
フィールドが必要であれば、明示的に replace_data
に含める必要があるreplace_oneメソッドの戻り値は、更新結果を示すオブジェクト
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef delete_one_document(collection: Collection, user_id: int):try:# 削除条件query = {"user_id": user_id}# クエリ実行result = collection.delete_one(query)return resultexcept Exception as e:print(f"エラーが発生しました: {e}")return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備user_id = 100# クエリ発行result = delete_one_document(collection=collection, user_id=user_id)# 結果判定if result is None:print("削除処理に失敗しました。")elif result.deleted_count == 0:print("一致するデータが見つかりませんでした。")else:print(f"{result.deleted_count} 件のデータを削除しました。")if __name__ == "__main__":main()
MongoDBのコレクションから、一致した最初1件のドキュメントを削除するメソッド
最初の引数 queryは削除条件を指定する辞書形式で指定
delete_oneの戻り値は DeleteResultオブジェクトで、下記の情報を保持する
from pymongo import MongoClientfrom pymongo.collection import Collectionfrom pydantic import BaseModel# MongoDB接続情報MONGO_URI = 'mongodb://admin:admin123@localhost:27017/'DB_NAME = "USER_DB"COLLECTION_NAME = "USER_COLLECTION"# データモデルクラスを定義class User(BaseModel):user_id: intname: strage: intdef delete_many_document(collection: Collection, user_id: int):try:# 削除条件query = {"user_id": user_id}# クエリ実行result = collection.delete_many(query)return resultexcept Exception as e:print(f"エラーが発生しました: {e}")return Nonedef main():# MongoDB接続先client = MongoClient(MONGO_URI)# データベース指定db = client[DB_NAME]# コレクション指定collection = db[COLLECTION_NAME]# データ準備user_id = 100# クエリ発行result = delete_many_document(collection=collection, user_id=user_id)# 結果判定if result is None:print("削除処理に失敗しました。")elif result.deleted_count == 0:print("一致するデータが見つかりませんでした。")else:print(f"{result.deleted_count} 件のデータを削除しました。")if __name__ == "__main__":main()
pythonからpymongoを使ったCRUD処理の基本的なところを
まとめてみた
基本的に
は辞書型なのでpydanticでデータモデルクラスとして扱う場合は
適宜、変換が必要になる