The Complete Guide to Vector Databases, 2025 Edition: From RAG to Recommendations
A comprehensive walkthrough of vector databases, from core concepts to implementation and optimization. It compares Pinecone, Weaviate, ChromaDB, and Qdrant, with worked examples for RAG, recommendation systems, and image search.
Introduction
Vector databases have become a key piece of technical infrastructure for AI applications, especially those built on large language models (LLMs). As of 2025, a wide range of vector databases is available, each with its own characteristics and target use cases.
Fundamentals of Vector Databases
What Is Vectorization?
graph TB
    subgraph "Data sources"
        T[Text documents]
        I[Images]
        A[Audio]
        V[Video]
    end
    subgraph "Embedding models"
        TM[Text Embedding<br/>OpenAI, BGE, E5]
        IM[Image Embedding<br/>CLIP, DINOv2]
        AM[Audio Embedding<br/>Wav2Vec]
        VM[Video Embedding<br/>VideoBERT]
    end
    subgraph "Vector representations"
        TV[Text vector<br/>768 dims]
        IV[Image vector<br/>512 dims]
        AV[Audio vector<br/>1024 dims]
        VV[Video vector<br/>2048 dims]
    end
    subgraph "Vector database"
        VDB[(Vector Database)]
        SI[Similarity search]
        IDX[Index<br/>HNSW, IVF]
    end
    T --> TM --> TV --> VDB
    I --> IM --> IV --> VDB
    A --> AM --> AV --> VDB
    V --> VM --> VV --> VDB
    VDB --> SI --> IDX
How Similarity Search Works
# embedding_utils.py
from typing import List, Tuple

import numpy as np
from sentence_transformers import SentenceTransformer
class EmbeddingManager:
def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
self.model = SentenceTransformer(model_name)
self.dimension = self.model.get_sentence_embedding_dimension()
def encode_texts(self, texts: List[str]) -> np.ndarray:
"""テキストをベクトルに変換"""
embeddings = self.model.encode(texts, convert_to_numpy=True)
return embeddings
def encode_text(self, text: str) -> np.ndarray:
"""単一テキストをベクトルに変換"""
return self.model.encode([text], convert_to_numpy=True)[0]
@staticmethod
def cosine_similarity(vector1: np.ndarray, vector2: np.ndarray) -> float:
"""コサイン類似度の計算"""
dot_product = np.dot(vector1, vector2)
norm1 = np.linalg.norm(vector1)
norm2 = np.linalg.norm(vector2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
@staticmethod
def euclidean_distance(vector1: np.ndarray, vector2: np.ndarray) -> float:
"""ユークリッド距離の計算"""
return np.linalg.norm(vector1 - vector2)
@staticmethod
def dot_product_similarity(vector1: np.ndarray, vector2: np.ndarray) -> float:
"""内積による類似度"""
return np.dot(vector1, vector2)
def find_most_similar(
self,
query_vector: np.ndarray,
candidate_vectors: np.ndarray,
top_k: int = 5,
metric: str = "cosine"
) -> List[Tuple[int, float]]:
"""最も類似したベクトルを検索"""
similarities = []
for i, candidate in enumerate(candidate_vectors):
if metric == "cosine":
sim = self.cosine_similarity(query_vector, candidate)
elif metric == "euclidean":
                sim = -self.euclidean_distance(query_vector, candidate)  # negate distance so larger = more similar
elif metric == "dot_product":
sim = self.dot_product_similarity(query_vector, candidate)
else:
raise ValueError(f"Unknown metric: {metric}")
similarities.append((i, sim))
        # Sort by similarity (descending)
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:top_k]
# Usage example
def demonstrate_similarity_search():
embedding_manager = EmbeddingManager()
    # Sample data
    documents = [
        "Machine learning is a branch of artificial intelligence",
        "Deep learning uses neural networks",
        "Natural language processing deals with text data",
        "Computer vision is the technology of image recognition",
        "Data science combines statistics and programming"
    ]
    # Vectorize the documents
    doc_embeddings = embedding_manager.encode_texts(documents)
    # Query
    query = "Tell me about AI and machine learning"
    query_embedding = embedding_manager.encode_text(query)
    # Similarity search
results = embedding_manager.find_most_similar(
query_embedding,
doc_embeddings,
top_k=3,
metric="cosine"
)
print(f"クエリ: {query}")
print("類似度の高い文書:")
for idx, similarity in results:
print(f" {similarity:.3f}: {documents[idx]}")
Comparing the Major Vector Databases
Architecture and Features
// vector-db-comparison.ts
interface VectorDatabaseConfig {
name: string;
type: 'cloud' | 'self-hosted' | 'hybrid';
indexAlgorithms: string[];
maxDimensions: number;
scalability: 'vertical' | 'horizontal' | 'both';
consistency: 'eventual' | 'strong';
specialFeatures: string[];
}
const vectorDatabases: VectorDatabaseConfig[] = [
{
name: 'Pinecone',
type: 'cloud',
indexAlgorithms: ['HNSW'],
maxDimensions: 20000,
scalability: 'horizontal',
consistency: 'eventual',
    specialFeatures: [
      'Managed service',
      'Real-time updates',
      'Metadata filtering',
      'Hybrid search'
    ]
},
{
name: 'Weaviate',
type: 'hybrid',
indexAlgorithms: ['HNSW', 'LSH'],
maxDimensions: 65536,
scalability: 'both',
consistency: 'eventual',
    specialFeatures: [
      'GraphQL queries',
      'Multimodal search',
      'Schema management',
      'Modular architecture'
    ]
},
{
name: 'Qdrant',
type: 'self-hosted',
indexAlgorithms: ['HNSW', 'IVF'],
maxDimensions: 65536,
scalability: 'horizontal',
consistency: 'strong',
    specialFeatures: [
      'Written in Rust for speed',
      'Vectors with payloads',
      'Optimized filtering',
      'Distributed clustering'
    ]
},
{
name: 'ChromaDB',
type: 'self-hosted',
indexAlgorithms: ['HNSW', 'Brute Force'],
maxDimensions: 2048,
scalability: 'vertical',
consistency: 'strong',
    specialFeatures: [
      'Lightweight, easy setup',
      'Python integration',
      'Local development',
      'SQLite-backed'
    ]
},
{
name: 'Milvus',
type: 'self-hosted',
indexAlgorithms: ['HNSW', 'IVF', 'ANNOY', 'FAISS'],
maxDimensions: 32768,
scalability: 'horizontal',
consistency: 'eventual',
    specialFeatures: [
      'Cloud native',
      'High throughput',
      'Kubernetes support',
      'Wide choice of index algorithms'
    ]
}
];
class VectorDatabaseSelector {
static selectDatabase(requirements: {
scale: 'small' | 'medium' | 'large';
budget: 'low' | 'medium' | 'high';
maintenance: 'minimal' | 'moderate' | 'full';
performance: 'standard' | 'high' | 'ultra';
features: string[];
}): VectorDatabaseConfig[] {
return vectorDatabases.filter(db => {
      // Scale requirement
if (requirements.scale === 'large' && db.scalability === 'vertical') {
return false;
}
      // Budget requirement (cloud services cost more)
if (requirements.budget === 'low' && db.type === 'cloud') {
return false;
}
      // Maintenance requirement
if (requirements.maintenance === 'minimal' && db.type === 'self-hosted') {
return false;
}
      // Feature requirements
const hasRequiredFeatures = requirements.features.every(feature =>
db.specialFeatures.some(dbFeature =>
dbFeature.toLowerCase().includes(feature.toLowerCase())
)
);
return hasRequiredFeatures;
});
}
}
Example Implementation: Pinecone
# pinecone_implementation.py
import time
import uuid
from typing import List, Dict, Any, Optional

import numpy as np
import pinecone  # classic pinecone-client (v2.x) API; newer SDK versions use pinecone.Pinecone(...)
class PineconeVectorStore:
def __init__(self, api_key: str, environment: str, index_name: str):
pinecone.init(api_key=api_key, environment=environment)
self.index_name = index_name
self.index = None
def create_index(self, dimension: int, metric: str = "cosine", pod_type: str = "p1.x1"):
"""インデックスの作成"""
if self.index_name not in pinecone.list_indexes():
pinecone.create_index(
name=self.index_name,
dimension=dimension,
metric=metric,
pod_type=pod_type
)
            # Wait until the index is ready
while not pinecone.describe_index(self.index_name).status['ready']:
time.sleep(1)
self.index = pinecone.Index(self.index_name)
def upsert_vectors(
self,
vectors: List[np.ndarray],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None
) -> Dict[str, int]:
"""ベクトルの挿入・更新"""
if not self.index:
raise ValueError("Index not initialized")
        # Generate IDs if none were supplied
if ids is None:
ids = [str(uuid.uuid4()) for _ in vectors]
        # Prepare metadata
if metadatas is None:
metadatas = [{}] * len(vectors)
        # Convert to the Pinecone upsert format
upsert_data = []
        for vector, metadata, vector_id in zip(vectors, metadatas, ids):
upsert_data.append({
'id': vector_id,
'values': vector.tolist(),
'metadata': metadata
})
        # Batch the upserts to respect Pinecone's request-size limits
batch_size = 100
upserted_count = 0
for i in range(0, len(upsert_data), batch_size):
batch = upsert_data[i:i + batch_size]
response = self.index.upsert(vectors=batch)
upserted_count += response['upserted_count']
return {'upserted_count': upserted_count}
def query(
self,
query_vector: np.ndarray,
top_k: int = 10,
filter_dict: Optional[Dict[str, Any]] = None,
include_metadata: bool = True,
include_values: bool = False
) -> List[Dict[str, Any]]:
"""類似ベクトル検索"""
if not self.index:
raise ValueError("Index not initialized")
query_response = self.index.query(
vector=query_vector.tolist(),
top_k=top_k,
filter=filter_dict,
include_metadata=include_metadata,
include_values=include_values
)
return query_response['matches']
def hybrid_search(
self,
query_vector: np.ndarray,
query_text: str,
top_k: int = 10,
        alpha: float = 0.5  # weighting between vector and text scores
) -> List[Dict[str, Any]]:
"""ハイブリッド検索(ベクトル + テキスト)"""
        # Vector search
vector_results = self.query(
query_vector=query_vector,
            top_k=top_k * 2  # fetch extra candidates for merging
)
        # Text search via a metadata filter. NOTE: Pinecone's filter language
        # has no $regex operator, so this filter is illustrative only; real
        # hybrid search would use sparse-dense vectors or a keyword index.
        text_filter = {
            "text": {"$regex": f".*{query_text}.*"}
        }
text_results = self.query(
query_vector=query_vector,
top_k=top_k * 2,
filter_dict=text_filter
)
        # Merge the result sets and compute combined scores
combined_results = self._merge_search_results(
vector_results, text_results, alpha
)
return combined_results[:top_k]
def _merge_search_results(
self,
vector_results: List[Dict],
text_results: List[Dict],
alpha: float
) -> List[Dict]:
"""検索結果のマージ"""
result_dict = {}
        # Process vector search results
for i, result in enumerate(vector_results):
vector_score = result['score']
            vector_rank_score = 1.0 / (i + 1)  # reciprocal-rank weight
result_dict[result['id']] = {
**result,
'combined_score': alpha * vector_score + (1 - alpha) * vector_rank_score
}
        # Process text search results
for i, result in enumerate(text_results):
text_rank_score = 1.0 / (i + 1)
if result['id'] in result_dict:
                # Already present: add the text-search contribution
result_dict[result['id']]['combined_score'] += (1 - alpha) * text_rank_score
else:
                # Add as a new result
result_dict[result['id']] = {
**result,
'combined_score': alpha * result.get('score', 0) + (1 - alpha) * text_rank_score
}
        # Sort by combined score
sorted_results = sorted(
result_dict.values(),
key=lambda x: x['combined_score'],
reverse=True
)
return sorted_results
def delete_vectors(self, ids: List[str]) -> Dict[str, Any]:
"""ベクトルの削除"""
if not self.index:
raise ValueError("Index not initialized")
return self.index.delete(ids=ids)
def get_index_stats(self) -> Dict[str, Any]:
"""インデックス統計の取得"""
if not self.index:
raise ValueError("Index not initialized")
return self.index.describe_index_stats()
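A minimal usage sketch tying the store to the EmbeddingManager from earlier. The API key, environment, and index name are placeholders, and it assumes the classic v2 client used above:
# pinecone_usage_example.py
import os

from embedding_utils import EmbeddingManager
from pinecone_implementation import PineconeVectorStore

embedder = EmbeddingManager()
store = PineconeVectorStore(
    api_key=os.environ["PINECONE_API_KEY"],  # placeholder: set in your environment
    environment="us-east1-gcp",              # placeholder environment
    index_name="articles"
)
store.create_index(dimension=embedder.dimension, metric="cosine")

texts = [
    "Vector databases power semantic search.",
    "HNSW is a popular index for approximate nearest-neighbor search."
]
store.upsert_vectors(
    vectors=list(embedder.encode_texts(texts)),
    metadatas=[{"text": t} for t in texts]
)

for match in store.query(embedder.encode_text("semantic search"), top_k=2):
    print(match["id"], match["score"])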
Example Implementation: Qdrant
# qdrant_implementation.py
import uuid
from typing import List, Dict, Any, Optional

import numpy as np
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, Range, SearchRequest
)
class QdrantVectorStore:
def __init__(self, host: str = "localhost", port: int = 6333):
self.client = QdrantClient(host=host, port=port)
self.collection_name = None
def create_collection(
self,
collection_name: str,
dimension: int,
distance: Distance = Distance.COSINE
):
"""コレクションの作成"""
self.collection_name = collection_name
        # Check whether the collection already exists
collections = self.client.get_collections().collections
existing_names = [col.name for col in collections]
if collection_name not in existing_names:
self.client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=dimension,
distance=distance
)
)
def upsert_points(
self,
vectors: List[np.ndarray],
payloads: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None
) -> Dict[str, Any]:
"""ポイントの挿入・更新"""
if not self.collection_name:
raise ValueError("Collection not initialized")
        # Generate IDs if none were supplied
if ids is None:
ids = [str(uuid.uuid4()) for _ in vectors]
        # Prepare payloads
if payloads is None:
payloads = [{}] * len(vectors)
        # Build PointStruct objects
points = []
for vector, payload, point_id in zip(vectors, payloads, ids):
points.append(
PointStruct(
id=point_id,
vector=vector.tolist(),
payload=payload
)
)
        # Batch upsert
operation_info = self.client.upsert(
collection_name=self.collection_name,
points=points
)
return {
'operation_id': operation_info.operation_id,
'status': operation_info.status
}
def search(
self,
query_vector: np.ndarray,
top_k: int = 10,
filter_conditions: Optional[Filter] = None,
score_threshold: Optional[float] = None
) -> List[Dict[str, Any]]:
"""ベクトル検索"""
if not self.collection_name:
raise ValueError("Collection not initialized")
        search_result = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector.tolist(),
            query_filter=filter_conditions,
            limit=top_k,
            score_threshold=score_threshold,
            with_vectors=True  # requested explicitly; the formatted results below include vectors
        )
        # Format the results
results = []
for point in search_result:
results.append({
'id': point.id,
'score': point.score,
'payload': point.payload,
'vector': point.vector
})
return results
def filtered_search(
self,
query_vector: np.ndarray,
field_filters: Dict[str, Any],
top_k: int = 10
) -> List[Dict[str, Any]]:
"""フィルタ付き検索"""
        # Build the filter conditions
conditions = []
        for field, value in field_filters.items():
            if isinstance(value, (int, str)):
                # Exact match for integers and strings
                conditions.append(
                    FieldCondition(
                        key=field,
                        match={"value": value}
                    )
                )
            elif isinstance(value, float):
                # Floats are matched via a degenerate range
                conditions.append(
                    FieldCondition(
                        key=field,
                        range=Range(gte=value, lte=value)
                    )
                )
            elif isinstance(value, dict):
                # Explicit range, e.g. {"gte": 10, "lte": 100}
                if 'gte' in value or 'lte' in value:
                    conditions.append(
                        FieldCondition(
                            key=field,
                            range=Range(**value)
                        )
                    )
filter_obj = Filter(must=conditions) if conditions else None
return self.search(
query_vector=query_vector,
top_k=top_k,
filter_conditions=filter_obj
)
def batch_search(
self,
query_vectors: List[np.ndarray],
top_k: int = 10
) -> List[List[Dict[str, Any]]]:
"""バッチ検索"""
if not self.collection_name:
raise ValueError("Collection not initialized")
        # Build one SearchRequest per query vector
        search_queries = [
            SearchRequest(
                vector=vector.tolist(),
                limit=top_k,
                with_payload=True  # payloads are not returned by default in batch search
            )
            for vector in query_vectors
        ]
batch_results = self.client.search_batch(
collection_name=self.collection_name,
requests=search_queries
)
        # Format the results
formatted_results = []
for result_list in batch_results:
formatted_result = []
for point in result_list:
formatted_result.append({
'id': point.id,
'score': point.score,
'payload': point.payload
})
formatted_results.append(formatted_result)
return formatted_results
def recommend_points(
self,
positive_ids: List[str],
negative_ids: Optional[List[str]] = None,
top_k: int = 10,
filter_conditions: Optional[Filter] = None
) -> List[Dict[str, Any]]:
"""レコメンド機能"""
if not self.collection_name:
raise ValueError("Collection not initialized")
recommend_result = self.client.recommend(
collection_name=self.collection_name,
positive=positive_ids,
negative=negative_ids or [],
limit=top_k,
query_filter=filter_conditions
)
results = []
for point in recommend_result:
results.append({
'id': point.id,
'score': point.score,
'payload': point.payload
})
return results
def get_collection_info(self) -> Dict[str, Any]:
"""コレクション情報の取得"""
if not self.collection_name:
raise ValueError("Collection not initialized")
        info = self.client.get_collection(self.collection_name)
        # Some of these fields vary across qdrant-client versions,
        # hence the getattr fallbacks
        return {
            'name': self.collection_name,  # CollectionInfo itself carries no name
            'status': info.status,
            'vectors_count': info.vectors_count,
            'segments_count': info.segments_count,
            'disk_data_size': getattr(info, 'disk_data_size', None),
            'ram_data_size': getattr(info, 'ram_data_size', None)
        }
Implementing RAG (Retrieval-Augmented Generation)
An End-to-End RAG System
# rag_system.py
import asyncio
import logging
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

import numpy as np
import openai

from embedding_utils import EmbeddingManager
@dataclass
class Document:
id: str
content: str
title: str
metadata: Dict[str, Any]
embedding: Optional[np.ndarray] = None
@dataclass
class RAGResponse:
answer: str
source_documents: List[Document]
confidence_score: float
reasoning: str
class RAGSystem:
def __init__(
self,
vector_store: Any, # VectorStore interface
embedding_manager: EmbeddingManager,
        llm_client: openai.AsyncOpenAI,  # async client, matching the awaited call in generate_answer
chunk_size: int = 1000,
chunk_overlap: int = 200
):
self.vector_store = vector_store
self.embedding_manager = embedding_manager
self.llm_client = llm_client
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.logger = logging.getLogger(__name__)
def chunk_document(self, document: Document) -> List[Document]:
"""ドキュメントをチャンクに分割"""
content = document.content
chunks = []
        # Split with overlap between consecutive chunks
start = 0
chunk_id = 0
while start < len(content):
end = min(start + self.chunk_size, len(content))
            # Adjust the cut to the nearest sentence boundary
if end < len(content):
last_period = content.rfind('.', start, end)
last_newline = content.rfind('\n', start, end)
boundary = max(last_period, last_newline)
if boundary > start:
end = boundary + 1
chunk_content = content[start:end].strip()
if chunk_content:
chunk = Document(
id=f"{document.id}_chunk_{chunk_id}",
content=chunk_content,
title=document.title,
metadata={
**document.metadata,
'parent_id': document.id,
'chunk_index': chunk_id,
'chunk_start': start,
'chunk_end': end
}
)
chunks.append(chunk)
chunk_id += 1
            # Advance, keeping chunk_overlap characters of context;
            # max() guarantees forward progress even for short chunks
            start = max(end - self.chunk_overlap, start + 1)
            if end >= len(content):
                break
return chunks
async def ingest_documents(self, documents: List[Document]) -> Dict[str, int]:
"""ドキュメントの取り込み"""
all_chunks = []
        # Split documents into chunks
for doc in documents:
chunks = self.chunk_document(doc)
all_chunks.extend(chunks)
        # Vectorize the chunks
contents = [chunk.content for chunk in all_chunks]
embeddings = self.embedding_manager.encode_texts(contents)
        # Store in the vector store
metadatas = []
for chunk in all_chunks:
metadatas.append({
'title': chunk.title,
'content': chunk.content,
**chunk.metadata
})
result = self.vector_store.upsert_vectors(
vectors=embeddings,
metadatas=metadatas,
ids=[chunk.id for chunk in all_chunks]
)
self.logger.info(f"Ingested {len(all_chunks)} chunks from {len(documents)} documents")
return {
'documents_processed': len(documents),
'chunks_created': len(all_chunks),
'vectors_upserted': result.get('upserted_count', len(all_chunks))
}
async def retrieve_relevant_documents(
self,
query: str,
top_k: int = 5,
filter_dict: Optional[Dict[str, Any]] = None,
rerank: bool = True
) -> List[Document]:
"""関連ドキュメントの取得"""
        # Vectorize the query
query_embedding = self.embedding_manager.encode_text(query)
        # Vector search
search_results = self.vector_store.query(
query_vector=query_embedding,
            top_k=top_k * 2 if rerank else top_k,  # over-fetch when reranking
filter_dict=filter_dict,
include_metadata=True
)
        # Convert results into Document objects
documents = []
for result in search_results:
metadata = result.get('metadata', {})
doc = Document(
id=result['id'],
content=metadata.get('content', ''),
title=metadata.get('title', ''),
metadata=metadata
)
documents.append(doc)
        # Rerank if requested
if rerank and len(documents) > top_k:
documents = await self.rerank_documents(query, documents, top_k)
return documents[:top_k]
async def rerank_documents(
self,
query: str,
documents: List[Document],
top_k: int
) -> List[Document]:
"""ドキュメントのリランク"""
        # Lightweight lexical rerank (a production system would use a cross-encoder)
scores = []
for doc in documents:
            # Simple scoring (a stand-in for a cross-encoder model)
query_terms = set(query.lower().split())
doc_terms = set(doc.content.lower().split())
# Jaccard similarity
intersection = len(query_terms.intersection(doc_terms))
union = len(query_terms.union(doc_terms))
jaccard_score = intersection / union if union > 0 else 0
            # Term-frequency score (a very rough BM25 stand-in)
tf_scores = []
for term in query_terms:
tf = doc.content.lower().count(term)
tf_scores.append(tf)
bm25_score = sum(tf_scores) / len(doc.content.split()) if doc.content else 0
            # Combined score
combined_score = 0.7 * jaccard_score + 0.3 * bm25_score
scores.append((doc, combined_score))
        # Sort by score
scores.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, _ in scores[:top_k]]
async def generate_answer(
self,
query: str,
context_documents: List[Document],
system_prompt: Optional[str] = None
) -> RAGResponse:
"""回答の生成"""
        # Build the context string
context = self.build_context(context_documents)
        # Default system prompt
        if system_prompt is None:
            system_prompt = """
            You are a question-answering assistant. Answer accurately and
            helpfully based on the provided context. If the context does not
            contain the needed information, say so explicitly.
            """
        # Build the user prompt
        user_prompt = f"""
        Context:
        {context}

        Question: {query}

        Based on the context above, give an accurate answer to the question
        and explain the evidence that supports it.
        """
        try:
            # Generate the answer with the LLM (async client)
            response = await self.llm_client.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.1,
                max_tokens=1000
            )
answer = response.choices[0].message.content
            # Estimate a confidence score (simplified)
confidence_score = self.calculate_confidence_score(
query, answer, context_documents
)
return RAGResponse(
answer=answer,
source_documents=context_documents,
confidence_score=confidence_score,
reasoning="Based on the provided context documents"
)
except Exception as e:
self.logger.error(f"Error generating answer: {e}")
return RAGResponse(
answer="申し訳ございませんが、回答の生成中にエラーが発生しました。",
source_documents=context_documents,
confidence_score=0.0,
reasoning=f"Error: {str(e)}"
)
def build_context(self, documents: List[Document]) -> str:
"""ドキュメントからコンテキストを構築"""
context_parts = []
for i, doc in enumerate(documents, 1):
context_part = f"[文書{i}] {doc.title}\n{doc.content}\n"
context_parts.append(context_part)
return "\n".join(context_parts)
def calculate_confidence_score(
self,
query: str,
answer: str,
documents: List[Document]
) -> float:
"""信頼度スコアの計算"""
        # Simplified confidence estimate
query_terms = set(query.lower().split())
answer_terms = set(answer.lower().split())
        # Relevance of the retrieved documents to the query
doc_relevance_scores = []
for doc in documents:
doc_terms = set(doc.content.lower().split())
intersection = len(query_terms.intersection(doc_terms))
doc_score = intersection / len(query_terms) if query_terms else 0
doc_relevance_scores.append(doc_score)
avg_doc_relevance = np.mean(doc_relevance_scores) if doc_relevance_scores else 0
        # Specificity of the answer (length-based proxy)
answer_specificity = min(len(answer.split()) / 50, 1.0)
        # Weighted combination
confidence = 0.6 * avg_doc_relevance + 0.4 * answer_specificity
return min(confidence, 1.0)
async def query(
self,
question: str,
top_k: int = 5,
filter_dict: Optional[Dict[str, Any]] = None,
system_prompt: Optional[str] = None
) -> RAGResponse:
"""エンドツーエンドのクエリ処理"""
try:
            # Retrieve relevant documents
relevant_docs = await self.retrieve_relevant_documents(
query=question,
top_k=top_k,
filter_dict=filter_dict
)
if not relevant_docs:
return RAGResponse(
answer="関連する情報が見つかりませんでした。",
source_documents=[],
confidence_score=0.0,
reasoning="No relevant documents found"
)
            # Generate the answer
response = await self.generate_answer(
query=question,
context_documents=relevant_docs,
system_prompt=system_prompt
)
return response
except Exception as e:
self.logger.error(f"Error in RAG query: {e}")
return RAGResponse(
answer="エラーが発生しました。",
source_documents=[],
confidence_score=0.0,
reasoning=f"Error: {str(e)}"
)
Implementing an Image Search System
A Multimodal Search Engine
# multimodal_search.py
from io import BytesIO
from typing import List, Dict, Any, Union, Optional

import clip  # OpenAI CLIP: pip install git+https://github.com/openai/CLIP.git
import numpy as np
import requests
import torch
from PIL import Image
class MultimodalSearchEngine:
def __init__(self, model_name: str = "ViT-B/32"):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model, self.preprocess = clip.load(model_name, device=self.device)
self.vector_store = None
def set_vector_store(self, vector_store):
"""ベクトルストアの設定"""
self.vector_store = vector_store
def encode_image(self, image: Union[str, Image.Image, bytes]) -> np.ndarray:
"""画像をベクトルに変換"""
if isinstance(image, str):
# URL or file path
if image.startswith(('http://', 'https://')):
response = requests.get(image)
image = Image.open(BytesIO(response.content))
else:
image = Image.open(image)
elif isinstance(image, bytes):
image = Image.open(BytesIO(image))
        # Preprocess
image_input = self.preprocess(image).unsqueeze(0).to(self.device)
        # Encode and L2-normalize
with torch.no_grad():
image_features = self.model.encode_image(image_input)
image_features = image_features / image_features.norm(dim=-1, keepdim=True)
return image_features.cpu().numpy()[0]
def encode_text(self, text: str) -> np.ndarray:
"""テキストをベクトルに変換"""
text_input = clip.tokenize([text]).to(self.device)
with torch.no_grad():
text_features = self.model.encode_text(text_input)
text_features = text_features / text_features.norm(dim=-1, keepdim=True)
return text_features.cpu().numpy()[0]
async def index_images(
self,
image_data: List[Dict[str, Any]]
) -> Dict[str, int]:
"""画像データのインデクシング"""
vectors = []
metadatas = []
ids = []
for item in image_data:
try:
                # Encode the image
image_vector = self.encode_image(item['image'])
vectors.append(image_vector)
                # Prepare the metadata
metadata = {
'image_url': item.get('image_url', ''),
'title': item.get('title', ''),
'description': item.get('description', ''),
'tags': item.get('tags', []),
'category': item.get('category', ''),
'created_at': item.get('created_at', ''),
'image_format': item.get('image_format', ''),
'image_size': item.get('image_size', {})
}
metadatas.append(metadata)
ids.append(item['id'])
except Exception as e:
print(f"Error processing image {item.get('id', 'unknown')}: {e}")
continue
        # Store in the vector store
if vectors:
result = self.vector_store.upsert_vectors(
vectors=np.array(vectors),
metadatas=metadatas,
ids=ids
)
return result
return {'upserted_count': 0}
def search_by_image(
self,
query_image: Union[str, Image.Image, bytes],
top_k: int = 10,
filter_dict: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""画像による類似画像検索"""
        # Encode the query image
query_vector = self.encode_image(query_image)
        # Search for similar images
results = self.vector_store.query(
query_vector=query_vector,
top_k=top_k,
filter_dict=filter_dict
)
return results
def search_by_text(
self,
query_text: str,
top_k: int = 10,
filter_dict: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""テキストによる画像検索"""
        # Encode the query text
query_vector = self.encode_text(query_text)
        # Search for matching images
results = self.vector_store.query(
query_vector=query_vector,
top_k=top_k,
filter_dict=filter_dict
)
return results
def hybrid_image_search(
self,
query_image: Optional[Union[str, Image.Image, bytes]] = None,
query_text: Optional[str] = None,
image_weight: float = 0.6,
text_weight: float = 0.4,
top_k: int = 10,
filter_dict: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""ハイブリッド画像検索(画像+テキスト)"""
if not query_image and not query_text:
raise ValueError("Either query_image or query_text must be provided")
results = []
if query_image and query_text:
            # Both queries provided
image_vector = self.encode_image(query_image)
text_vector = self.encode_text(query_text)
            # Weighted average of the two query vectors, re-normalized
            # so cosine scores stay on the same scale
            combined_vector = (
                image_weight * image_vector +
                text_weight * text_vector
            )
            combined_vector = combined_vector / np.linalg.norm(combined_vector)
results = self.vector_store.query(
query_vector=combined_vector,
top_k=top_k,
filter_dict=filter_dict
)
elif query_image:
results = self.search_by_image(
query_image, top_k, filter_dict
)
else:
results = self.search_by_text(
query_text, top_k, filter_dict
)
return results
def generate_image_captions(
self,
images: List[Union[str, Image.Image, bytes]],
caption_templates: List[str] = None
) -> List[Dict[str, Any]]:
"""画像キャプションの生成"""
if caption_templates is None:
caption_templates = [
"a photo of a {}",
"a picture of a {}",
"an image of a {}",
]
results = []
for image in images:
try:
image_vector = self.encode_image(image)
                # Score each predefined caption template against the image
best_captions = []
for template in caption_templates:
                    # Find the concept most similar to the image for this
                    # template (a real system would use a captioning model)
concepts = ["person", "animal", "vehicle", "building", "nature", "object"]
best_concept = None
best_similarity = -1
for concept in concepts:
text_vector = self.encode_text(template.format(concept))
similarity = np.dot(image_vector, text_vector)
if similarity > best_similarity:
best_similarity = similarity
best_concept = concept
if best_concept:
best_captions.append({
'caption': template.format(best_concept),
'confidence': float(best_similarity)
})
results.append({
'image': image,
'captions': sorted(best_captions, key=lambda x: x['confidence'], reverse=True)
})
except Exception as e:
results.append({
'image': image,
'captions': [],
'error': str(e)
})
return results
def cluster_images(
self,
image_vectors: np.ndarray,
n_clusters: int = 5,
method: str = "kmeans"
) -> Dict[str, Any]:
"""画像のクラスタリング"""
if method == "kmeans":
from sklearn.cluster import KMeans
clustering = KMeans(n_clusters=n_clusters, random_state=42)
cluster_labels = clustering.fit_predict(image_vectors)
return {
'cluster_labels': cluster_labels.tolist(),
'cluster_centers': clustering.cluster_centers_.tolist(),
'inertia': clustering.inertia_
}
elif method == "hierarchical":
from sklearn.cluster import AgglomerativeClustering
clustering = AgglomerativeClustering(n_clusters=n_clusters)
cluster_labels = clustering.fit_predict(image_vectors)
return {
'cluster_labels': cluster_labels.tolist(),
'n_clusters': n_clusters
}
else:
raise ValueError(f"Unknown clustering method: {method}")
Implementing a Recommendation System
Collaborative Filtering + Vector Search
# recommendation_system.py
import numpy as np
from typing import List, Dict, Any, Tuple, Optional
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity
class HybridRecommendationSystem:
def __init__(self, vector_store, embedding_manager):
self.vector_store = vector_store
self.embedding_manager = embedding_manager
self.user_item_matrix = None
self.item_features = None
self.user_profiles = {}
self.collaborative_model = None
def build_user_item_matrix(
self,
interactions: List[Dict[str, Any]]
) -> csr_matrix:
"""ユーザー-アイテム行列の構築"""
        # Map user and item IDs to matrix indices
user_ids = list(set([interaction['user_id'] for interaction in interactions]))
item_ids = list(set([interaction['item_id'] for interaction in interactions]))
self.user_id_map = {uid: idx for idx, uid in enumerate(user_ids)}
self.item_id_map = {iid: idx for idx, iid in enumerate(item_ids)}
self.reverse_user_map = {idx: uid for uid, idx in self.user_id_map.items()}
self.reverse_item_map = {idx: iid for iid, idx in self.item_id_map.items()}
        # Build the sparse matrix
n_users = len(user_ids)
n_items = len(item_ids)
row_indices = []
col_indices = []
data = []
for interaction in interactions:
user_idx = self.user_id_map[interaction['user_id']]
item_idx = self.item_id_map[interaction['item_id']]
            rating = interaction.get('rating', 1.0)  # 1.0 for implicit feedback
row_indices.append(user_idx)
col_indices.append(item_idx)
data.append(rating)
self.user_item_matrix = csr_matrix(
(data, (row_indices, col_indices)),
shape=(n_users, n_items)
)
return self.user_item_matrix
def train_collaborative_filtering(
self,
n_components: int = 50,
algorithm: str = "svd"
):
"""協調フィルタリングモデルの訓練"""
if self.user_item_matrix is None:
raise ValueError("User-item matrix not built. Call build_user_item_matrix first.")
if algorithm == "svd":
self.collaborative_model = TruncatedSVD(
n_components=n_components,
random_state=42
)
self.user_factors = self.collaborative_model.fit_transform(self.user_item_matrix)
self.item_factors = self.collaborative_model.components_.T
else:
raise ValueError(f"Unknown algorithm: {algorithm}")
def build_content_features(
self,
items: List[Dict[str, Any]]
):
"""アイテムのコンテンツ特徴量の構築"""
item_features = {}
for item in items:
            # Text features
text_content = f"{item.get('title', '')} {item.get('description', '')}"
text_embedding = self.embedding_manager.encode_text(text_content)
            # Category features
categories = item.get('categories', [])
category_features = self._encode_categories(categories)
            # Numerical features such as price and rating
numerical_features = np.array([
item.get('price', 0),
item.get('rating', 0),
item.get('popularity', 0),
len(item.get('reviews', []))
])
            # Concatenate all features
combined_features = np.concatenate([
text_embedding,
category_features,
numerical_features
])
item_features[item['id']] = combined_features
self.item_features = item_features
def _encode_categories(self, categories: List[str]) -> np.ndarray:
"""カテゴリの one-hot エンコーディング"""
        # Simplified: fixed category vocabulary
all_categories = [
'electronics', 'books', 'clothing', 'home', 'sports',
'automotive', 'health', 'beauty', 'toys', 'groceries'
]
encoding = np.zeros(len(all_categories))
for category in categories:
if category.lower() in all_categories:
idx = all_categories.index(category.lower())
encoding[idx] = 1.0
return encoding
def build_user_profile(
self,
user_id: str,
interactions: List[Dict[str, Any]]
) -> np.ndarray:
"""ユーザープロファイルの構築"""
user_interactions = [
interaction for interaction in interactions
if interaction['user_id'] == user_id
]
        # NOTE: profiles live in the combined item-feature space
        # (text embedding + 10 category dims + 4 numerical dims)
        feature_dim = self.embedding_manager.dimension + 14
        if not user_interactions:
            return np.zeros(feature_dim)
        # Aggregate the features of interacted items
item_features = []
weights = []
for interaction in user_interactions:
item_id = interaction['item_id']
if item_id in self.item_features:
item_features.append(self.item_features[item_id])
weights.append(interaction.get('rating', 1.0))
        if not item_features:
            return np.zeros(feature_dim)
        # Weighted average, weighted by rating
weighted_features = np.average(
np.array(item_features),
weights=weights,
axis=0
)
self.user_profiles[user_id] = weighted_features
return weighted_features
def collaborative_recommendations(
self,
user_id: str,
top_k: int = 10,
exclude_seen: bool = True
) -> List[Tuple[str, float]]:
"""協調フィルタリングベースの推薦"""
if user_id not in self.user_id_map:
return []
user_idx = self.user_id_map[user_id]
user_vector = self.user_factors[user_idx]
        # Score every item for this user
scores = np.dot(self.item_factors, user_vector)
        # Sort by score
item_scores = [(self.reverse_item_map[idx], score)
for idx, score in enumerate(scores)]
item_scores.sort(key=lambda x: x[1], reverse=True)
        # Exclude items the user has already interacted with
if exclude_seen:
seen_items = set()
user_row = self.user_item_matrix[user_idx].toarray()[0]
for item_idx, rating in enumerate(user_row):
if rating > 0:
seen_items.add(self.reverse_item_map[item_idx])
item_scores = [
(item_id, score) for item_id, score in item_scores
if item_id not in seen_items
]
return item_scores[:top_k]
def content_based_recommendations(
self,
user_id: str,
top_k: int = 10
) -> List[Tuple[str, float]]:
"""コンテンツベースの推薦"""
if user_id not in self.user_profiles:
return []
user_profile = self.user_profiles[user_id]
        # Similarity between the user profile and each item
item_similarities = []
for item_id, item_features in self.item_features.items():
similarity = cosine_similarity(
user_profile.reshape(1, -1),
item_features.reshape(1, -1)
)[0][0]
item_similarities.append((item_id, similarity))
        # Sort by similarity
item_similarities.sort(key=lambda x: x[1], reverse=True)
return item_similarities[:top_k]
def vector_search_recommendations(
self,
user_id: str,
top_k: int = 10
) -> List[Dict[str, Any]]:
"""ベクトル検索ベースの推薦"""
if user_id not in self.user_profiles:
return []
user_profile = self.user_profiles[user_id]
        # Query the vector database for similar items
results = self.vector_store.query(
query_vector=user_profile,
top_k=top_k,
include_metadata=True
)
return results
def hybrid_recommendations(
self,
user_id: str,
top_k: int = 10,
cf_weight: float = 0.4,
cb_weight: float = 0.3,
vs_weight: float = 0.3
) -> List[Tuple[str, float]]:
"""ハイブリッド推薦"""
        # Gather candidates from each method
cf_recs = self.collaborative_recommendations(user_id, top_k * 2)
cb_recs = self.content_based_recommendations(user_id, top_k * 2)
vs_recs = self.vector_search_recommendations(user_id, top_k * 2)
        # Normalize and combine the scores
combined_scores = {}
        # Collaborative-filtering scores
if cf_recs:
max_cf_score = max(score for _, score in cf_recs)
for item_id, score in cf_recs:
normalized_score = score / max_cf_score if max_cf_score > 0 else 0
combined_scores[item_id] = combined_scores.get(item_id, 0) + cf_weight * normalized_score
        # Content-based scores
if cb_recs:
max_cb_score = max(score for _, score in cb_recs)
for item_id, score in cb_recs:
normalized_score = score / max_cb_score if max_cb_score > 0 else 0
combined_scores[item_id] = combined_scores.get(item_id, 0) + cb_weight * normalized_score
        # Vector-search scores
if vs_recs:
max_vs_score = max(result['score'] for result in vs_recs)
for result in vs_recs:
item_id = result['id']
normalized_score = result['score'] / max_vs_score if max_vs_score > 0 else 0
combined_scores[item_id] = combined_scores.get(item_id, 0) + vs_weight * normalized_score
        # Sort by combined score
sorted_recommendations = sorted(
combined_scores.items(),
key=lambda x: x[1],
reverse=True
)
return sorted_recommendations[:top_k]
def evaluate_recommendations(
self,
test_interactions: List[Dict[str, Any]],
recommendation_method: str = "hybrid",
top_k: int = 10
) -> Dict[str, float]:
"""推薦精度の評価"""
precisions = []
recalls = []
        # Evaluate per user
test_users = set([interaction['user_id'] for interaction in test_interactions])
for user_id in test_users:
            # Items the user actually interacted with
actual_items = set([
interaction['item_id'] for interaction in test_interactions
if interaction['user_id'] == user_id
])
if not actual_items:
continue
            # Get recommendations
if recommendation_method == "hybrid":
recommended_items = [
item_id for item_id, _ in self.hybrid_recommendations(user_id, top_k)
]
elif recommendation_method == "collaborative":
recommended_items = [
item_id for item_id, _ in self.collaborative_recommendations(user_id, top_k)
]
elif recommendation_method == "content":
recommended_items = [
item_id for item_id, _ in self.content_based_recommendations(user_id, top_k)
]
else:
continue
recommended_set = set(recommended_items)
            # Precision and recall
if recommended_set:
precision = len(actual_items.intersection(recommended_set)) / len(recommended_set)
precisions.append(precision)
if actual_items:
recall = len(actual_items.intersection(recommended_set)) / len(actual_items)
recalls.append(recall)
        # Averages
avg_precision = np.mean(precisions) if precisions else 0
avg_recall = np.mean(recalls) if recalls else 0
f1_score = (2 * avg_precision * avg_recall) / (avg_precision + avg_recall) if (avg_precision + avg_recall) > 0 else 0
return {
'precision': avg_precision,
'recall': avg_recall,
'f1_score': f1_score
}
Performance Optimization
Index Optimization
# performance_optimization.py
import time
import psutil
from typing import Dict, Any, List
import numpy as np
class VectorDatabaseOptimizer:
def __init__(self, vector_store):
self.vector_store = vector_store
self.performance_metrics = {}
def benchmark_search_performance(
self,
query_vectors: List[np.ndarray],
top_k_values: List[int] = [1, 5, 10, 50, 100],
num_runs: int = 5
) -> Dict[str, Any]:
"""検索パフォーマンスのベンチマーク"""
results = {}
for top_k in top_k_values:
latencies = []
throughputs = []
for run in range(num_runs):
start_time = time.time()
                # Issue the queries one at a time
for query_vector in query_vectors:
self.vector_store.query(
query_vector=query_vector,
top_k=top_k
)
end_time = time.time()
total_time = end_time - start_time
                # Compute the metrics
                latency = total_time / len(query_vectors)     # mean latency per query
                throughput = len(query_vectors) / total_time  # queries per second
latencies.append(latency)
throughputs.append(throughput)
results[f'top_k_{top_k}'] = {
'avg_latency_ms': np.mean(latencies) * 1000,
'p95_latency_ms': np.percentile(latencies, 95) * 1000,
'avg_throughput_qps': np.mean(throughputs),
'min_throughput_qps': np.min(throughputs)
}
return results
def monitor_resource_usage(self, duration_seconds: int = 60) -> Dict[str, Any]:
"""リソース使用量の監視"""
cpu_usage = []
memory_usage = []
disk_io = []
start_time = time.time()
while time.time() - start_time < duration_seconds:
            # CPU utilization
cpu_percent = psutil.cpu_percent(interval=1)
cpu_usage.append(cpu_percent)
            # Memory usage
memory_info = psutil.virtual_memory()
memory_usage.append(memory_info.percent)
            # Disk I/O
disk_info = psutil.disk_io_counters()
if disk_info:
disk_io.append({
'read_bytes': disk_info.read_bytes,
'write_bytes': disk_info.write_bytes
})
return {
'cpu_usage': {
'avg': np.mean(cpu_usage),
'max': np.max(cpu_usage),
'min': np.min(cpu_usage)
},
'memory_usage': {
'avg': np.mean(memory_usage),
'max': np.max(memory_usage),
'min': np.min(memory_usage)
},
'disk_io': disk_io
}
def optimize_batch_size(
self,
query_vectors: List[np.ndarray],
batch_sizes: List[int] = [1, 5, 10, 20, 50, 100]
) -> Dict[str, Any]:
"""最適なバッチサイズの特定"""
results = {}
for batch_size in batch_sizes:
start_time = time.time()
            # Process in batches
for i in range(0, len(query_vectors), batch_size):
batch = query_vectors[i:i + batch_size]
                # Use native batch search when available
if hasattr(self.vector_store, 'batch_search'):
self.vector_store.batch_search(batch)
else:
                    # Fall back to individual queries
for vector in batch:
self.vector_store.query(query_vector=vector, top_k=10)
end_time = time.time()
total_time = end_time - start_time
results[f'batch_size_{batch_size}'] = {
'total_time_seconds': total_time,
'avg_time_per_vector_ms': (total_time / len(query_vectors)) * 1000,
'throughput_qps': len(query_vectors) / total_time
}
        # Pick the fastest batch size
best_batch_size = min(
results.keys(),
key=lambda k: results[k]['total_time_seconds']
)
return {
'results': results,
'optimal_batch_size': int(best_batch_size.split('_')[-1]),
'optimal_performance': results[best_batch_size]
}
def analyze_index_efficiency(self) -> Dict[str, Any]:
"""インデックス効率の分析"""
if hasattr(self.vector_store, 'get_index_stats'):
stats = self.vector_store.get_index_stats()
analysis = {
'index_size_mb': stats.get('index_size_bytes', 0) / (1024 * 1024),
'vector_count': stats.get('vector_count', 0),
'memory_usage_mb': stats.get('memory_usage_bytes', 0) / (1024 * 1024),
'fragmentation_ratio': self._calculate_fragmentation_ratio(stats)
}
            # Efficiency metrics
if analysis['vector_count'] > 0:
analysis['bytes_per_vector'] = (
stats.get('index_size_bytes', 0) / analysis['vector_count']
)
analysis['memory_per_vector'] = (
stats.get('memory_usage_bytes', 0) / analysis['vector_count']
)
return analysis
return {'error': 'Index statistics not available'}
def _calculate_fragmentation_ratio(self, stats: Dict[str, Any]) -> float:
"""フラグメンテーション比率の計算"""
total_size = stats.get('index_size_bytes', 0)
used_size = stats.get('used_size_bytes', total_size)
if total_size > 0:
return 1.0 - (used_size / total_size)
return 0.0
def recommend_optimizations(
self,
performance_data: Dict[str, Any]
) -> List[Dict[str, str]]:
"""最適化の推奨事項"""
recommendations = []
        # Latency-based recommendations
avg_latency = performance_data.get('search_performance', {}).get('top_k_10', {}).get('avg_latency_ms', 0)
        if avg_latency > 100:  # more than 100 ms
recommendations.append({
'type': 'latency_optimization',
'issue': 'High search latency detected',
'recommendation': 'Consider using a more efficient index algorithm (e.g., HNSW) or reducing vector dimensions',
'priority': 'high'
})
        # Memory-based recommendations
memory_usage = performance_data.get('resource_usage', {}).get('memory_usage', {}).get('avg', 0)
        if memory_usage > 80:  # more than 80%
recommendations.append({
'type': 'memory_optimization',
'issue': 'High memory usage detected',
'recommendation': 'Consider implementing vector compression or using disk-based storage',
'priority': 'medium'
})
        # Fragmentation-based recommendations
fragmentation = performance_data.get('index_efficiency', {}).get('fragmentation_ratio', 0)
        if fragmentation > 0.3:  # more than 30% fragmentation
recommendations.append({
'type': 'index_maintenance',
'issue': 'High index fragmentation detected',
'recommendation': 'Consider rebuilding the index to improve efficiency',
'priority': 'low'
})
return recommendations
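Running a quick benchmark against any store that exposes the query interface (a sketch; `store` stands for one of the wrappers built in the earlier sections, and random vectors stand in for real queries):
# optimizer_usage_example.py
import numpy as np

from performance_optimization import VectorDatabaseOptimizer

optimizer = VectorDatabaseOptimizer(vector_store=store)  # store: see earlier sections

rng = np.random.default_rng(42)
queries = [rng.standard_normal(384).astype(np.float32) for _ in range(20)]

perf = optimizer.benchmark_search_performance(queries, top_k_values=[1, 10], num_runs=3)
for config, metrics in perf.items():
    print(config, f"{metrics['avg_latency_ms']:.1f} ms", f"{metrics['avg_throughput_qps']:.0f} qps")

for advice in optimizer.recommend_optimizations({"search_performance": perf}):
    print(advice["priority"], "-", advice["recommendation"])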
Summary
Vector databases have become a critical piece of infrastructure for the AI era of 2025.
How to Choose
- Scale requirements - ChromaDB for small workloads; Pinecone or Milvus at large scale
- Operational burden - a managed service vs. running it yourself
- Feature requirements - multimodal search, filtering, hybrid search
- Budget - cloud service pricing vs. the cost of self-hosting
- Performance - latency and throughput targets
Implementation Best Practices
- Appropriate dimensionality - choose an embedding dimension that fits the use case
- Index optimization - pick an algorithm suited to your data's characteristics
- Batch processing - ingest and query data efficiently
- Monitoring and tuning - track performance metrics continuously
Choosing and implementing the right vector database makes it possible to build sophisticated AI applications.