DT 技術解説 ベクトルデータベース完全ガイド 2025年版

ベクトルデータベース完全ガイド 2025年版 - RAGからレコメンドまで

ベクトルデータベースの基本概念から実装、最適化まで包括的に解説。Pinecone、Weaviate、ChromaDB、Qdrantの比較と、RAG、レコメンドシステム、画像検索の実装例を紹介します。

約5分で読めます
技術記事
実践的

この記事のポイント

ベクトルデータベースの基本概念から実装、最適化まで包括的に解説。Pinecone、Weaviate、ChromaDB、Qdrantの比較と、RAG、レコメンドシステム、画像検索の実装例を紹介します。

この記事では、実践的なアプローチで技術的な課題を解決する方法を詳しく解説します。具体的なコード例とともに、ベストプラクティスを学ぶことができます。

はじめに

ベクトルデータベースは、AIアプリケーション、特に大規模言語モデル(LLM)を活用したアプリケーションにおいて重要な技術基盤となっています。2025年現在、多様なベクトルデータベースが登場し、それぞれ異なる特徴と用途を持っています。

ベクトルデータベースの基本概念

ベクトル化とは

graph TB
    subgraph "データソース"
        T[テキスト文書]
        I[画像]
        A[音声]
        V[動画]
    end
    
    subgraph "エンベディングモデル"
        TM[Text Embedding
OpenAI, BGE, E5] IM[Image Embedding
CLIP, DINOv2] AM[Audio Embedding
Wav2Vec] VM[Video Embedding
VideoBERT] end subgraph "ベクトル表現" TV[テキストベクトル
768次元] IV[画像ベクトル
512次元] AV[音声ベクトル
1024次元] VV[動画ベクトル
2048次元] end subgraph "ベクトルデータベース" VDB[(Vector Database)] SI[類似性検索] IDX[インデックス
HNSW, IVF] end T --> TM --> TV --> VDB I --> IM --> IV --> VDB A --> AM --> AV --> VDB V --> VM --> VV --> VDB VDB --> SI --> IDX

類似性検索の仕組み

# embedding_utils.py
import numpy as np
from typing import List, Tuple, Union
import torch
from sentence_transformers import SentenceTransformer

class EmbeddingManager:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()
    
    def encode_texts(self, texts: List[str]) -> np.ndarray:
        """テキストをベクトルに変換"""
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        return embeddings
    
    def encode_text(self, text: str) -> np.ndarray:
        """単一テキストをベクトルに変換"""
        return self.model.encode([text], convert_to_numpy=True)[0]
    
    @staticmethod
    def cosine_similarity(vector1: np.ndarray, vector2: np.ndarray) -> float:
        """コサイン類似度の計算"""
        dot_product = np.dot(vector1, vector2)
        norm1 = np.linalg.norm(vector1)
        norm2 = np.linalg.norm(vector2)
        
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        return dot_product / (norm1 * norm2)
    
    @staticmethod
    def euclidean_distance(vector1: np.ndarray, vector2: np.ndarray) -> float:
        """ユークリッド距離の計算"""
        return np.linalg.norm(vector1 - vector2)
    
    @staticmethod
    def dot_product_similarity(vector1: np.ndarray, vector2: np.ndarray) -> float:
        """内積による類似度"""
        return np.dot(vector1, vector2)
    
    def find_most_similar(
        self, 
        query_vector: np.ndarray, 
        candidate_vectors: np.ndarray,
        top_k: int = 5,
        metric: str = "cosine"
    ) -> List[Tuple[int, float]]:
        """最も類似したベクトルを検索"""
        similarities = []
        
        for i, candidate in enumerate(candidate_vectors):
            if metric == "cosine":
                sim = self.cosine_similarity(query_vector, candidate)
            elif metric == "euclidean":
                sim = -self.euclidean_distance(query_vector, candidate)  # 距離なので負の値
            elif metric == "dot_product":
                sim = self.dot_product_similarity(query_vector, candidate)
            else:
                raise ValueError(f"Unknown metric: {metric}")
            
            similarities.append((i, sim))
        
        # 類似度でソート
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

# 使用例
def demonstrate_similarity_search():
    embedding_manager = EmbeddingManager()
    
    # サンプルデータ
    documents = [
        "機械学習は人工知能の一分野です",
        "深層学習はニューラルネットワークを使用します",
        "自然言語処理はテキストデータを扱います",
        "コンピュータビジョンは画像認識技術です",
        "データサイエンスは統計学とプログラミングを組み合わせます"
    ]
    
    # ドキュメントをベクトル化
    doc_embeddings = embedding_manager.encode_texts(documents)
    
    # クエリ
    query = "AIと機械学習について教えて"
    query_embedding = embedding_manager.encode_text(query)
    
    # 類似検索
    results = embedding_manager.find_most_similar(
        query_embedding, 
        doc_embeddings, 
        top_k=3,
        metric="cosine"
    )
    
    print(f"クエリ: {query}")
    print("類似度の高い文書:")
    for idx, similarity in results:
        print(f"  {similarity:.3f}: {documents[idx]}")

主要ベクトルデータベースの比較

アーキテクチャと特徴

// vector-db-comparison.ts
interface VectorDatabaseConfig {
    name: string;
    type: 'cloud' | 'self-hosted' | 'hybrid';
    indexAlgorithms: string[];
    maxDimensions: number;
    scalability: 'vertical' | 'horizontal' | 'both';
    consistency: 'eventual' | 'strong';
    specialFeatures: string[];
}

const vectorDatabases: VectorDatabaseConfig[] = [
    {
        name: 'Pinecone',
        type: 'cloud',
        indexAlgorithms: ['HNSW'],
        maxDimensions: 20000,
        scalability: 'horizontal',
        consistency: 'eventual',
        specialFeatures: [
            'マネージドサービス',
            'リアルタイム更新',
            'メタデータフィルタリング',
            'ハイブリッド検索'
        ]
    },
    {
        name: 'Weaviate',
        type: 'hybrid',
        indexAlgorithms: ['HNSW', 'LSH'],
        maxDimensions: 65536,
        scalability: 'both',
        consistency: 'eventual',
        specialFeatures: [
            'GraphQLクエリ',
            'マルチモーダル検索',
            'スキーマ管理',
            'モジュラーアーキテクチャ'
        ]
    },
    {
        name: 'Qdrant',
        type: 'self-hosted',
        indexAlgorithms: ['HNSW', 'IVF'],
        maxDimensions: 65536,
        scalability: 'horizontal',
        consistency: 'strong',
        specialFeatures: [
            'Rust製で高速',
            'ペイロード付きベクトル',
            'フィルタリング最適化',
            '分散クラスタリング'
        ]
    },
    {
        name: 'ChromaDB',
        type: 'self-hosted',
        indexAlgorithms: ['HNSW', 'Brute Force'],
        maxDimensions: 2048,
        scalability: 'vertical',
        consistency: 'strong',
        specialFeatures: [
            '軽量・簡単セットアップ',
            'Python統合',
            'ローカル開発',
            'SQLite基盤'
        ]
    },
    {
        name: 'Milvus',
        type: 'self-hosted',
        indexAlgorithms: ['HNSW', 'IVF', 'ANNOY', 'FAISS'],
        maxDimensions: 32768,
        scalability: 'horizontal',
        consistency: 'eventual',
        specialFeatures: [
            'クラウドネイティブ',
            '高スループット',
            'Kubernetes対応',
            'アルゴリズム豊富'
        ]
    }
];

class VectorDatabaseSelector {
    static selectDatabase(requirements: {
        scale: 'small' | 'medium' | 'large';
        budget: 'low' | 'medium' | 'high';
        maintenance: 'minimal' | 'moderate' | 'full';
        performance: 'standard' | 'high' | 'ultra';
        features: string[];
    }): VectorDatabaseConfig[] {
        
        return vectorDatabases.filter(db => {
            // スケール要件
            if (requirements.scale === 'large' && db.scalability === 'vertical') {
                return false;
            }
            
            // 予算要件(クラウドサービスは高コスト)
            if (requirements.budget === 'low' && db.type === 'cloud') {
                return false;
            }
            
            // メンテナンス要件
            if (requirements.maintenance === 'minimal' && db.type === 'self-hosted') {
                return false;
            }
            
            // 機能要件
            const hasRequiredFeatures = requirements.features.every(feature =>
                db.specialFeatures.some(dbFeature => 
                    dbFeature.toLowerCase().includes(feature.toLowerCase())
                )
            );
            
            return hasRequiredFeatures;
        });
    }
}

Pinecone の実装例

# pinecone_implementation.py
import pinecone
from typing import List, Dict, Any, Optional
import uuid
import time

class PineconeVectorStore:
    def __init__(self, api_key: str, environment: str, index_name: str):
        pinecone.init(api_key=api_key, environment=environment)
        self.index_name = index_name
        self.index = None
        
    def create_index(self, dimension: int, metric: str = "cosine", pod_type: str = "p1.x1"):
        """インデックスの作成"""
        if self.index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=self.index_name,
                dimension=dimension,
                metric=metric,
                pod_type=pod_type
            )
            
            # インデックスの準備完了を待機
            while not pinecone.describe_index(self.index_name).status['ready']:
                time.sleep(1)
        
        self.index = pinecone.Index(self.index_name)
    
    def upsert_vectors(
        self, 
        vectors: List[np.ndarray], 
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None
    ) -> Dict[str, int]:
        """ベクトルの挿入・更新"""
        if not self.index:
            raise ValueError("Index not initialized")
        
        # IDの生成
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in vectors]
        
        # メタデータの準備
        if metadatas is None:
            metadatas = [{}] * len(vectors)
        
        # Pinecone形式に変換
        upsert_data = []
        for i, (vector, metadata, vector_id) in enumerate(zip(vectors, metadatas, ids)):
            upsert_data.append({
                'id': vector_id,
                'values': vector.tolist(),
                'metadata': metadata
            })
        
        # バッチ処理(Pineconeの制限に従って分割)
        batch_size = 100
        upserted_count = 0
        
        for i in range(0, len(upsert_data), batch_size):
            batch = upsert_data[i:i + batch_size]
            response = self.index.upsert(vectors=batch)
            upserted_count += response['upserted_count']
        
        return {'upserted_count': upserted_count}
    
    def query(
        self, 
        query_vector: np.ndarray, 
        top_k: int = 10,
        filter_dict: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True,
        include_values: bool = False
    ) -> List[Dict[str, Any]]:
        """類似ベクトル検索"""
        if not self.index:
            raise ValueError("Index not initialized")
        
        query_response = self.index.query(
            vector=query_vector.tolist(),
            top_k=top_k,
            filter=filter_dict,
            include_metadata=include_metadata,
            include_values=include_values
        )
        
        return query_response['matches']
    
    def hybrid_search(
        self,
        query_vector: np.ndarray,
        query_text: str,
        top_k: int = 10,
        alpha: float = 0.5  # ベクトル検索とテキスト検索の重み
    ) -> List[Dict[str, Any]]:
        """ハイブリッド検索(ベクトル + テキスト)"""
        # ベクトル検索
        vector_results = self.query(
            query_vector=query_vector,
            top_k=top_k * 2  # より多くの候補を取得
        )
        
        # テキスト検索(メタデータ内のテキストフィールドを検索)
        text_filter = {
            "text": {"$regex": f".*{query_text}.*"}
        }
        text_results = self.query(
            query_vector=query_vector,
            top_k=top_k * 2,
            filter_dict=text_filter
        )
        
        # 結果のマージとスコア計算
        combined_results = self._merge_search_results(
            vector_results, text_results, alpha
        )
        
        return combined_results[:top_k]
    
    def _merge_search_results(
        self, 
        vector_results: List[Dict], 
        text_results: List[Dict], 
        alpha: float
    ) -> List[Dict]:
        """検索結果のマージ"""
        result_dict = {}
        
        # ベクトル検索結果の処理
        for i, result in enumerate(vector_results):
            vector_score = result['score']
            vector_rank_score = 1.0 / (i + 1)  # 順位による重み
            
            result_dict[result['id']] = {
                **result,
                'combined_score': alpha * vector_score + (1 - alpha) * vector_rank_score
            }
        
        # テキスト検索結果の処理
        for i, result in enumerate(text_results):
            text_rank_score = 1.0 / (i + 1)
            
            if result['id'] in result_dict:
                # 既存の結果に加算
                result_dict[result['id']]['combined_score'] += (1 - alpha) * text_rank_score
            else:
                # 新しい結果として追加
                result_dict[result['id']] = {
                    **result,
                    'combined_score': alpha * result.get('score', 0) + (1 - alpha) * text_rank_score
                }
        
        # 統合スコアでソート
        sorted_results = sorted(
            result_dict.values(), 
            key=lambda x: x['combined_score'], 
            reverse=True
        )
        
        return sorted_results
    
    def delete_vectors(self, ids: List[str]) -> Dict[str, Any]:
        """ベクトルの削除"""
        if not self.index:
            raise ValueError("Index not initialized")
        
        return self.index.delete(ids=ids)
    
    def get_index_stats(self) -> Dict[str, Any]:
        """インデックス統計の取得"""
        if not self.index:
            raise ValueError("Index not initialized")
        
        return self.index.describe_index_stats()

Qdrant の実装例

# qdrant_implementation.py
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, Range
from typing import List, Dict, Any, Optional
import uuid

class QdrantVectorStore:
    def __init__(self, host: str = "localhost", port: int = 6333):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = None
    
    def create_collection(
        self, 
        collection_name: str, 
        dimension: int, 
        distance: Distance = Distance.COSINE
    ):
        """コレクションの作成"""
        self.collection_name = collection_name
        
        # 既存のコレクションがあるかチェック
        collections = self.client.get_collections().collections
        existing_names = [col.name for col in collections]
        
        if collection_name not in existing_names:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=dimension,
                    distance=distance
                )
            )
    
    def upsert_points(
        self, 
        vectors: List[np.ndarray], 
        payloads: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """ポイントの挿入・更新"""
        if not self.collection_name:
            raise ValueError("Collection not initialized")
        
        # IDの生成
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in vectors]
        
        # ペイロードの準備
        if payloads is None:
            payloads = [{}] * len(vectors)
        
        # PointStruct の作成
        points = []
        for vector, payload, point_id in zip(vectors, payloads, ids):
            points.append(
                PointStruct(
                    id=point_id,
                    vector=vector.tolist(),
                    payload=payload
                )
            )
        
        # バッチアップサート
        operation_info = self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )
        
        return {
            'operation_id': operation_info.operation_id,
            'status': operation_info.status
        }
    
    def search(
        self, 
        query_vector: np.ndarray, 
        top_k: int = 10,
        filter_conditions: Optional[Filter] = None,
        score_threshold: Optional[float] = None
    ) -> List[Dict[str, Any]]:
        """ベクトル検索"""
        if not self.collection_name:
            raise ValueError("Collection not initialized")
        
        search_result = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector.tolist(),
            query_filter=filter_conditions,
            limit=top_k,
            score_threshold=score_threshold
        )
        
        # 結果の整形
        results = []
        for point in search_result:
            results.append({
                'id': point.id,
                'score': point.score,
                'payload': point.payload,
                'vector': point.vector
            })
        
        return results
    
    def filtered_search(
        self,
        query_vector: np.ndarray,
        field_filters: Dict[str, Any],
        top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """フィルタ付き検索"""
        # フィルタ条件の構築
        conditions = []
        for field, value in field_filters.items():
            if isinstance(value, (int, float)):
                if isinstance(value, int):
                    conditions.append(
                        FieldCondition(
                            key=field,
                            match={"value": value}
                        )
                    )
                else:
                    conditions.append(
                        FieldCondition(
                            key=field,
                            range=Range(gte=value, lte=value)
                        )
                    )
            elif isinstance(value, str):
                conditions.append(
                    FieldCondition(
                        key=field,
                        match={"value": value}
                    )
                )
            elif isinstance(value, dict):
                # 範囲指定
                if 'gte' in value or 'lte' in value:
                    conditions.append(
                        FieldCondition(
                            key=field,
                            range=Range(**value)
                        )
                    )
        
        filter_obj = Filter(must=conditions) if conditions else None
        
        return self.search(
            query_vector=query_vector,
            top_k=top_k,
            filter_conditions=filter_obj
        )
    
    def batch_search(
        self,
        query_vectors: List[np.ndarray],
        top_k: int = 10
    ) -> List[List[Dict[str, Any]]]:
        """バッチ検索"""
        if not self.collection_name:
            raise ValueError("Collection not initialized")
        
        # バッチ検索の実行
        search_queries = [
            {
                'vector': vector.tolist(),
                'limit': top_k
            }
            for vector in query_vectors
        ]
        
        batch_results = self.client.search_batch(
            collection_name=self.collection_name,
            requests=search_queries
        )
        
        # 結果の整形
        formatted_results = []
        for result_list in batch_results:
            formatted_result = []
            for point in result_list:
                formatted_result.append({
                    'id': point.id,
                    'score': point.score,
                    'payload': point.payload
                })
            formatted_results.append(formatted_result)
        
        return formatted_results
    
    def recommend_points(
        self,
        positive_ids: List[str],
        negative_ids: Optional[List[str]] = None,
        top_k: int = 10,
        filter_conditions: Optional[Filter] = None
    ) -> List[Dict[str, Any]]:
        """レコメンド機能"""
        if not self.collection_name:
            raise ValueError("Collection not initialized")
        
        recommend_result = self.client.recommend(
            collection_name=self.collection_name,
            positive=positive_ids,
            negative=negative_ids or [],
            limit=top_k,
            query_filter=filter_conditions
        )
        
        results = []
        for point in recommend_result:
            results.append({
                'id': point.id,
                'score': point.score,
                'payload': point.payload
            })
        
        return results
    
    def get_collection_info(self) -> Dict[str, Any]:
        """コレクション情報の取得"""
        if not self.collection_name:
            raise ValueError("Collection not initialized")
        
        info = self.client.get_collection(self.collection_name)
        return {
            'name': info.name,
            'status': info.status,
            'vectors_count': info.vectors_count,
            'segments_count': info.segments_count,
            'disk_data_size': info.disk_data_size,
            'ram_data_size': info.ram_data_size
        }

RAG(Retrieval-Augmented Generation)の実装

エンドツーエンドRAGシステム

# rag_system.py
from typing import List, Dict, Any, Optional
import openai
from dataclasses import dataclass
import asyncio
import logging

@dataclass
class Document:
    id: str
    content: str
    title: str
    metadata: Dict[str, Any]
    embedding: Optional[np.ndarray] = None

@dataclass
class RAGResponse:
    answer: str
    source_documents: List[Document]
    confidence_score: float
    reasoning: str

class RAGSystem:
    def __init__(
        self,
        vector_store: Any,  # VectorStore interface
        embedding_manager: EmbeddingManager,
        llm_client: openai.OpenAI,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager
        self.llm_client = llm_client
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.logger = logging.getLogger(__name__)
    
    def chunk_document(self, document: Document) -> List[Document]:
        """ドキュメントをチャンクに分割"""
        content = document.content
        chunks = []
        
        # 重複を考慮した分割
        start = 0
        chunk_id = 0
        
        while start < len(content):
            end = min(start + self.chunk_size, len(content))
            
            # 文境界で分割調整
            if end < len(content):
                last_period = content.rfind('.', start, end)
                last_newline = content.rfind('\n', start, end)
                boundary = max(last_period, last_newline)
                
                if boundary > start:
                    end = boundary + 1
            
            chunk_content = content[start:end].strip()
            
            if chunk_content:
                chunk = Document(
                    id=f"{document.id}_chunk_{chunk_id}",
                    content=chunk_content,
                    title=document.title,
                    metadata={
                        **document.metadata,
                        'parent_id': document.id,
                        'chunk_index': chunk_id,
                        'chunk_start': start,
                        'chunk_end': end
                    }
                )
                chunks.append(chunk)
                chunk_id += 1
            
            # オーバーラップを考慮して次の開始位置を設定
            start = end - self.chunk_overlap
            if start >= end:
                break
        
        return chunks
    
    async def ingest_documents(self, documents: List[Document]) -> Dict[str, int]:
        """ドキュメントの取り込み"""
        all_chunks = []
        
        # ドキュメントをチャンクに分割
        for doc in documents:
            chunks = self.chunk_document(doc)
            all_chunks.extend(chunks)
        
        # チャンクをベクトル化
        contents = [chunk.content for chunk in all_chunks]
        embeddings = self.embedding_manager.encode_texts(contents)
        
        # ベクトルストアに保存
        metadatas = []
        for chunk in all_chunks:
            metadatas.append({
                'title': chunk.title,
                'content': chunk.content,
                **chunk.metadata
            })
        
        result = self.vector_store.upsert_vectors(
            vectors=embeddings,
            metadatas=metadatas,
            ids=[chunk.id for chunk in all_chunks]
        )
        
        self.logger.info(f"Ingested {len(all_chunks)} chunks from {len(documents)} documents")
        return {
            'documents_processed': len(documents),
            'chunks_created': len(all_chunks),
            'vectors_upserted': result.get('upserted_count', len(all_chunks))
        }
    
    async def retrieve_relevant_documents(
        self, 
        query: str, 
        top_k: int = 5,
        filter_dict: Optional[Dict[str, Any]] = None,
        rerank: bool = True
    ) -> List[Document]:
        """関連ドキュメントの取得"""
        # クエリをベクトル化
        query_embedding = self.embedding_manager.encode_text(query)
        
        # ベクトル検索
        search_results = self.vector_store.query(
            query_vector=query_embedding,
            top_k=top_k * 2 if rerank else top_k,  # リランク用に多めに取得
            filter_dict=filter_dict,
            include_metadata=True
        )
        
        # 結果をDocumentオブジェクトに変換
        documents = []
        for result in search_results:
            metadata = result.get('metadata', {})
            doc = Document(
                id=result['id'],
                content=metadata.get('content', ''),
                title=metadata.get('title', ''),
                metadata=metadata
            )
            documents.append(doc)
        
        # リランク処理
        if rerank and len(documents) > top_k:
            documents = await self.rerank_documents(query, documents, top_k)
        
        return documents[:top_k]
    
    async def rerank_documents(
        self, 
        query: str, 
        documents: List[Document], 
        top_k: int
    ) -> List[Document]:
        """ドキュメントのリランク"""
        # Cross-encoder を使用したリランク(簡略版)
        scores = []
        
        for doc in documents:
            # 簡易的なスコアリング(実際にはCross-encoderモデルを使用)
            query_terms = set(query.lower().split())
            doc_terms = set(doc.content.lower().split())
            
            # Jaccard similarity
            intersection = len(query_terms.intersection(doc_terms))
            union = len(query_terms.union(doc_terms))
            jaccard_score = intersection / union if union > 0 else 0
            
            # BM25スコア(簡略版)
            tf_scores = []
            for term in query_terms:
                tf = doc.content.lower().count(term)
                tf_scores.append(tf)
            
            bm25_score = sum(tf_scores) / len(doc.content.split()) if doc.content else 0
            
            # 総合スコア
            combined_score = 0.7 * jaccard_score + 0.3 * bm25_score
            scores.append((doc, combined_score))
        
        # スコアでソート
        scores.sort(key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in scores[:top_k]]
    
    async def generate_answer(
        self, 
        query: str, 
        context_documents: List[Document],
        system_prompt: Optional[str] = None
    ) -> RAGResponse:
        """回答の生成"""
        # コンテキストの構築
        context = self.build_context(context_documents)
        
        # システムプロンプトの設定
        if system_prompt is None:
            system_prompt = """
            あなたは質問応答アシスタントです。提供されたコンテキスト情報を基に、
            正確で有用な回答を提供してください。コンテキストに情報がない場合は、
            その旨を明確に伝えてください。
            """
        
        # プロンプトの構築
        user_prompt = f"""
        コンテキスト:
        {context}
        
        質問: {query}
        
        上記のコンテキストを基に、質問に対する正確な回答を提供してください。
        回答の根拠となる情報も合わせて説明してください。
        """
        
        try:
            # LLMによる回答生成
            response = await self.llm_client.chat.completions.acreate(
                model="gpt-4-turbo-preview",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.1,
                max_tokens=1000
            )
            
            answer = response.choices[0].message.content
            
            # 信頼度スコアの計算(簡略版)
            confidence_score = self.calculate_confidence_score(
                query, answer, context_documents
            )
            
            return RAGResponse(
                answer=answer,
                source_documents=context_documents,
                confidence_score=confidence_score,
                reasoning="Based on the provided context documents"
            )
            
        except Exception as e:
            self.logger.error(f"Error generating answer: {e}")
            return RAGResponse(
                answer="申し訳ございませんが、回答の生成中にエラーが発生しました。",
                source_documents=context_documents,
                confidence_score=0.0,
                reasoning=f"Error: {str(e)}"
            )
    
    def build_context(self, documents: List[Document]) -> str:
        """ドキュメントからコンテキストを構築"""
        context_parts = []
        
        for i, doc in enumerate(documents, 1):
            context_part = f"[文書{i}] {doc.title}\n{doc.content}\n"
            context_parts.append(context_part)
        
        return "\n".join(context_parts)
    
    def calculate_confidence_score(
        self, 
        query: str, 
        answer: str, 
        documents: List[Document]
    ) -> float:
        """信頼度スコアの計算"""
        # 簡易的な信頼度計算
        query_terms = set(query.lower().split())
        answer_terms = set(answer.lower().split())
        
        # ドキュメントとの関連性
        doc_relevance_scores = []
        for doc in documents:
            doc_terms = set(doc.content.lower().split())
            intersection = len(query_terms.intersection(doc_terms))
            doc_score = intersection / len(query_terms) if query_terms else 0
            doc_relevance_scores.append(doc_score)
        
        avg_doc_relevance = np.mean(doc_relevance_scores) if doc_relevance_scores else 0
        
        # 回答の具体性(長さベース)
        answer_specificity = min(len(answer.split()) / 50, 1.0)
        
        # 総合信頼度スコア
        confidence = 0.6 * avg_doc_relevance + 0.4 * answer_specificity
        return min(confidence, 1.0)
    
    async def query(
        self, 
        question: str,
        top_k: int = 5,
        filter_dict: Optional[Dict[str, Any]] = None,
        system_prompt: Optional[str] = None
    ) -> RAGResponse:
        """エンドツーエンドのクエリ処理"""
        try:
            # 関連ドキュメントの取得
            relevant_docs = await self.retrieve_relevant_documents(
                query=question,
                top_k=top_k,
                filter_dict=filter_dict
            )
            
            if not relevant_docs:
                return RAGResponse(
                    answer="関連する情報が見つかりませんでした。",
                    source_documents=[],
                    confidence_score=0.0,
                    reasoning="No relevant documents found"
                )
            
            # 回答の生成
            response = await self.generate_answer(
                query=question,
                context_documents=relevant_docs,
                system_prompt=system_prompt
            )
            
            return response
            
        except Exception as e:
            self.logger.error(f"Error in RAG query: {e}")
            return RAGResponse(
                answer="エラーが発生しました。",
                source_documents=[],
                confidence_score=0.0,
                reasoning=f"Error: {str(e)}"
            )

画像検索システムの実装

マルチモーダル検索エンジン

# multimodal_search.py
import torch
import clip
from PIL import Image
import requests
from io import BytesIO
from typing import List, Dict, Any, Union, Optional

class MultimodalSearchEngine:
    def __init__(self, model_name: str = "ViT-B/32"):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model, self.preprocess = clip.load(model_name, device=self.device)
        self.vector_store = None
        
    def set_vector_store(self, vector_store):
        """ベクトルストアの設定"""
        self.vector_store = vector_store
    
    def encode_image(self, image: Union[str, Image.Image, bytes]) -> np.ndarray:
        """画像をベクトルに変換"""
        if isinstance(image, str):
            # URL or file path
            if image.startswith(('http://', 'https://')):
                response = requests.get(image)
                image = Image.open(BytesIO(response.content))
            else:
                image = Image.open(image)
        elif isinstance(image, bytes):
            image = Image.open(BytesIO(image))
        
        # 前処理
        image_input = self.preprocess(image).unsqueeze(0).to(self.device)
        
        # エンコード
        with torch.no_grad():
            image_features = self.model.encode_image(image_input)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        
        return image_features.cpu().numpy()[0]
    
    def encode_text(self, text: str) -> np.ndarray:
        """テキストをベクトルに変換"""
        text_input = clip.tokenize([text]).to(self.device)
        
        with torch.no_grad():
            text_features = self.model.encode_text(text_input)
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)
        
        return text_features.cpu().numpy()[0]
    
    async def index_images(
        self, 
        image_data: List[Dict[str, Any]]
    ) -> Dict[str, int]:
        """画像データのインデクシング"""
        vectors = []
        metadatas = []
        ids = []
        
        for item in image_data:
            try:
                # 画像をエンコード
                image_vector = self.encode_image(item['image'])
                vectors.append(image_vector)
                
                # メタデータの準備
                metadata = {
                    'image_url': item.get('image_url', ''),
                    'title': item.get('title', ''),
                    'description': item.get('description', ''),
                    'tags': item.get('tags', []),
                    'category': item.get('category', ''),
                    'created_at': item.get('created_at', ''),
                    'image_format': item.get('image_format', ''),
                    'image_size': item.get('image_size', {})
                }
                metadatas.append(metadata)
                ids.append(item['id'])
                
            except Exception as e:
                print(f"Error processing image {item.get('id', 'unknown')}: {e}")
                continue
        
        # ベクトルストアに保存
        if vectors:
            result = self.vector_store.upsert_vectors(
                vectors=np.array(vectors),
                metadatas=metadatas,
                ids=ids
            )
            return result
        
        return {'upserted_count': 0}
    
    def search_by_image(
        self, 
        query_image: Union[str, Image.Image, bytes],
        top_k: int = 10,
        filter_dict: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """画像による類似画像検索"""
        # クエリ画像をエンコード
        query_vector = self.encode_image(query_image)
        
        # 類似画像を検索
        results = self.vector_store.query(
            query_vector=query_vector,
            top_k=top_k,
            filter_dict=filter_dict
        )
        
        return results
    
    def search_by_text(
        self, 
        query_text: str,
        top_k: int = 10,
        filter_dict: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """テキストによる画像検索"""
        # クエリテキストをエンコード
        query_vector = self.encode_text(query_text)
        
        # 関連画像を検索
        results = self.vector_store.query(
            query_vector=query_vector,
            top_k=top_k,
            filter_dict=filter_dict
        )
        
        return results
    
    def hybrid_image_search(
        self,
        query_image: Optional[Union[str, Image.Image, bytes]] = None,
        query_text: Optional[str] = None,
        image_weight: float = 0.6,
        text_weight: float = 0.4,
        top_k: int = 10,
        filter_dict: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """ハイブリッド画像検索(画像+テキスト)"""
        if not query_image and not query_text:
            raise ValueError("Either query_image or query_text must be provided")
        
        results = []
        
        if query_image and query_text:
            # 両方のクエリがある場合
            image_vector = self.encode_image(query_image)
            text_vector = self.encode_text(query_text)
            
            # 重み付き平均
            combined_vector = (
                image_weight * image_vector + 
                text_weight * text_vector
            )
            
            results = self.vector_store.query(
                query_vector=combined_vector,
                top_k=top_k,
                filter_dict=filter_dict
            )
            
        elif query_image:
            results = self.search_by_image(
                query_image, top_k, filter_dict
            )
        else:
            results = self.search_by_text(
                query_text, top_k, filter_dict
            )
        
        return results
    
    def generate_image_captions(
        self, 
        images: List[Union[str, Image.Image, bytes]],
        caption_templates: List[str] = None
    ) -> List[Dict[str, Any]]:
        """画像キャプションの生成"""
        if caption_templates is None:
            caption_templates = [
                "a photo of a {}",
                "a picture of a {}",
                "an image of a {}",
            ]
        
        results = []
        
        for image in images:
            try:
                image_vector = self.encode_image(image)
                
                # 事前定義されたキャプションテンプレートとの類似度を計算
                best_captions = []
                
                for template in caption_templates:
                    # 各テンプレートに対して最も類似する概念を見つける
                    # (実際の実装では、より sophisticated な手法を使用)
                    concepts = ["person", "animal", "vehicle", "building", "nature", "object"]
                    
                    best_concept = None
                    best_similarity = -1
                    
                    for concept in concepts:
                        text_vector = self.encode_text(template.format(concept))
                        similarity = np.dot(image_vector, text_vector)
                        
                        if similarity > best_similarity:
                            best_similarity = similarity
                            best_concept = concept
                    
                    if best_concept:
                        best_captions.append({
                            'caption': template.format(best_concept),
                            'confidence': float(best_similarity)
                        })
                
                results.append({
                    'image': image,
                    'captions': sorted(best_captions, key=lambda x: x['confidence'], reverse=True)
                })
                
            except Exception as e:
                results.append({
                    'image': image,
                    'captions': [],
                    'error': str(e)
                })
        
        return results
    
    def cluster_images(
        self, 
        image_vectors: np.ndarray,
        n_clusters: int = 5,
        method: str = "kmeans"
    ) -> Dict[str, Any]:
        """画像のクラスタリング"""
        if method == "kmeans":
            from sklearn.cluster import KMeans
            
            clustering = KMeans(n_clusters=n_clusters, random_state=42)
            cluster_labels = clustering.fit_predict(image_vectors)
            
            return {
                'cluster_labels': cluster_labels.tolist(),
                'cluster_centers': clustering.cluster_centers_.tolist(),
                'inertia': clustering.inertia_
            }
        
        elif method == "hierarchical":
            from sklearn.cluster import AgglomerativeClustering
            
            clustering = AgglomerativeClustering(n_clusters=n_clusters)
            cluster_labels = clustering.fit_predict(image_vectors)
            
            return {
                'cluster_labels': cluster_labels.tolist(),
                'n_clusters': n_clusters
            }
        
        else:
            raise ValueError(f"Unknown clustering method: {method}")

レコメンドシステムの実装

協調フィルタリング + ベクトル検索

# recommendation_system.py
import numpy as np
from typing import List, Dict, Any, Tuple, Optional
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity

class HybridRecommendationSystem:
    def __init__(self, vector_store, embedding_manager):
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager
        self.user_item_matrix = None
        self.item_features = None
        self.user_profiles = {}
        self.collaborative_model = None
        
    def build_user_item_matrix(
        self, 
        interactions: List[Dict[str, Any]]
    ) -> csr_matrix:
        """ユーザー-アイテム行列の構築"""
        # ユーザーとアイテムのIDをマッピング
        user_ids = list(set([interaction['user_id'] for interaction in interactions]))
        item_ids = list(set([interaction['item_id'] for interaction in interactions]))
        
        self.user_id_map = {uid: idx for idx, uid in enumerate(user_ids)}
        self.item_id_map = {iid: idx for idx, iid in enumerate(item_ids)}
        self.reverse_user_map = {idx: uid for uid, idx in self.user_id_map.items()}
        self.reverse_item_map = {idx: iid for iid, idx in self.item_id_map.items()}
        
        # 疎行列の作成
        n_users = len(user_ids)
        n_items = len(item_ids)
        
        row_indices = []
        col_indices = []
        data = []
        
        for interaction in interactions:
            user_idx = self.user_id_map[interaction['user_id']]
            item_idx = self.item_id_map[interaction['item_id']]
            rating = interaction.get('rating', 1.0)  # implicit feedback の場合は1
            
            row_indices.append(user_idx)
            col_indices.append(item_idx)
            data.append(rating)
        
        self.user_item_matrix = csr_matrix(
            (data, (row_indices, col_indices)),
            shape=(n_users, n_items)
        )
        
        return self.user_item_matrix
    
    def train_collaborative_filtering(
        self, 
        n_components: int = 50,
        algorithm: str = "svd"
    ):
        """協調フィルタリングモデルの訓練"""
        if self.user_item_matrix is None:
            raise ValueError("User-item matrix not built. Call build_user_item_matrix first.")
        
        if algorithm == "svd":
            self.collaborative_model = TruncatedSVD(
                n_components=n_components,
                random_state=42
            )
            self.user_factors = self.collaborative_model.fit_transform(self.user_item_matrix)
            self.item_factors = self.collaborative_model.components_.T
            
        else:
            raise ValueError(f"Unknown algorithm: {algorithm}")
    
    def build_content_features(
        self, 
        items: List[Dict[str, Any]]
    ):
        """アイテムのコンテンツ特徴量の構築"""
        item_features = {}
        
        for item in items:
            # テキスト特徴量の抽出
            text_content = f"{item.get('title', '')} {item.get('description', '')}"
            text_embedding = self.embedding_manager.encode_text(text_content)
            
            # カテゴリ特徴量
            categories = item.get('categories', [])
            category_features = self._encode_categories(categories)
            
            # 価格、評価などの数値特徴量
            numerical_features = np.array([
                item.get('price', 0),
                item.get('rating', 0),
                item.get('popularity', 0),
                len(item.get('reviews', []))
            ])
            
            # 特徴量の結合
            combined_features = np.concatenate([
                text_embedding,
                category_features,
                numerical_features
            ])
            
            item_features[item['id']] = combined_features
        
        self.item_features = item_features
    
    def _encode_categories(self, categories: List[str]) -> np.ndarray:
        """カテゴリの one-hot エンコーディング"""
        # 簡略化された実装
        all_categories = [
            'electronics', 'books', 'clothing', 'home', 'sports', 
            'automotive', 'health', 'beauty', 'toys', 'groceries'
        ]
        
        encoding = np.zeros(len(all_categories))
        for category in categories:
            if category.lower() in all_categories:
                idx = all_categories.index(category.lower())
                encoding[idx] = 1.0
        
        return encoding
    
    def build_user_profile(
        self, 
        user_id: str, 
        interactions: List[Dict[str, Any]]
    ) -> np.ndarray:
        """ユーザープロファイルの構築"""
        user_interactions = [
            interaction for interaction in interactions 
            if interaction['user_id'] == user_id
        ]
        
        if not user_interactions:
            return np.zeros(self.embedding_manager.dimension)
        
        # インタラクションしたアイテムの特徴量を集約
        item_features = []
        weights = []
        
        for interaction in user_interactions:
            item_id = interaction['item_id']
            if item_id in self.item_features:
                item_features.append(self.item_features[item_id])
                weights.append(interaction.get('rating', 1.0))
        
        if not item_features:
            return np.zeros(self.embedding_manager.dimension)
        
        # 重み付き平均
        weighted_features = np.average(
            np.array(item_features), 
            weights=weights, 
            axis=0
        )
        
        self.user_profiles[user_id] = weighted_features
        return weighted_features
    
    def collaborative_recommendations(
        self, 
        user_id: str, 
        top_k: int = 10,
        exclude_seen: bool = True
    ) -> List[Tuple[str, float]]:
        """協調フィルタリングベースの推薦"""
        if user_id not in self.user_id_map:
            return []
        
        user_idx = self.user_id_map[user_id]
        user_vector = self.user_factors[user_idx]
        
        # 全アイテムに対するスコア計算
        scores = np.dot(self.item_factors, user_vector)
        
        # スコアでソート
        item_scores = [(self.reverse_item_map[idx], score) 
                      for idx, score in enumerate(scores)]
        item_scores.sort(key=lambda x: x[1], reverse=True)
        
        # 既に見たアイテムを除外
        if exclude_seen:
            seen_items = set()
            user_row = self.user_item_matrix[user_idx].toarray()[0]
            for item_idx, rating in enumerate(user_row):
                if rating > 0:
                    seen_items.add(self.reverse_item_map[item_idx])
            
            item_scores = [
                (item_id, score) for item_id, score in item_scores 
                if item_id not in seen_items
            ]
        
        return item_scores[:top_k]
    
    def content_based_recommendations(
        self, 
        user_id: str, 
        top_k: int = 10
    ) -> List[Tuple[str, float]]:
        """コンテンツベースの推薦"""
        if user_id not in self.user_profiles:
            return []
        
        user_profile = self.user_profiles[user_id]
        
        # アイテムとの類似度計算
        item_similarities = []
        for item_id, item_features in self.item_features.items():
            similarity = cosine_similarity(
                user_profile.reshape(1, -1),
                item_features.reshape(1, -1)
            )[0][0]
            item_similarities.append((item_id, similarity))
        
        # 類似度でソート
        item_similarities.sort(key=lambda x: x[1], reverse=True)
        return item_similarities[:top_k]
    
    def vector_search_recommendations(
        self, 
        user_id: str, 
        top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """ベクトル検索ベースの推薦"""
        if user_id not in self.user_profiles:
            return []
        
        user_profile = self.user_profiles[user_id]
        
        # ベクトルデータベースで類似アイテムを検索
        results = self.vector_store.query(
            query_vector=user_profile,
            top_k=top_k,
            include_metadata=True
        )
        
        return results
    
    def hybrid_recommendations(
        self, 
        user_id: str, 
        top_k: int = 10,
        cf_weight: float = 0.4,
        cb_weight: float = 0.3,
        vs_weight: float = 0.3
    ) -> List[Tuple[str, float]]:
        """ハイブリッド推薦"""
        # 各手法からの推薦を取得
        cf_recs = self.collaborative_recommendations(user_id, top_k * 2)
        cb_recs = self.content_based_recommendations(user_id, top_k * 2)
        vs_recs = self.vector_search_recommendations(user_id, top_k * 2)
        
        # スコアの正規化と統合
        combined_scores = {}
        
        # 協調フィルタリングのスコア
        if cf_recs:
            max_cf_score = max(score for _, score in cf_recs)
            for item_id, score in cf_recs:
                normalized_score = score / max_cf_score if max_cf_score > 0 else 0
                combined_scores[item_id] = combined_scores.get(item_id, 0) + cf_weight * normalized_score
        
        # コンテンツベースのスコア
        if cb_recs:
            max_cb_score = max(score for _, score in cb_recs)
            for item_id, score in cb_recs:
                normalized_score = score / max_cb_score if max_cb_score > 0 else 0
                combined_scores[item_id] = combined_scores.get(item_id, 0) + cb_weight * normalized_score
        
        # ベクトル検索のスコア
        if vs_recs:
            max_vs_score = max(result['score'] for result in vs_recs)
            for result in vs_recs:
                item_id = result['id']
                normalized_score = result['score'] / max_vs_score if max_vs_score > 0 else 0
                combined_scores[item_id] = combined_scores.get(item_id, 0) + vs_weight * normalized_score
        
        # 統合スコアでソート
        sorted_recommendations = sorted(
            combined_scores.items(), 
            key=lambda x: x[1], 
            reverse=True
        )
        
        return sorted_recommendations[:top_k]
    
    def evaluate_recommendations(
        self, 
        test_interactions: List[Dict[str, Any]],
        recommendation_method: str = "hybrid",
        top_k: int = 10
    ) -> Dict[str, float]:
        """推薦精度の評価"""
        precisions = []
        recalls = []
        
        # ユーザーごとに評価
        test_users = set([interaction['user_id'] for interaction in test_interactions])
        
        for user_id in test_users:
            # 実際にインタラクションしたアイテム
            actual_items = set([
                interaction['item_id'] for interaction in test_interactions
                if interaction['user_id'] == user_id
            ])
            
            if not actual_items:
                continue
            
            # 推薦結果を取得
            if recommendation_method == "hybrid":
                recommended_items = [
                    item_id for item_id, _ in self.hybrid_recommendations(user_id, top_k)
                ]
            elif recommendation_method == "collaborative":
                recommended_items = [
                    item_id for item_id, _ in self.collaborative_recommendations(user_id, top_k)
                ]
            elif recommendation_method == "content":
                recommended_items = [
                    item_id for item_id, _ in self.content_based_recommendations(user_id, top_k)
                ]
            else:
                continue
            
            recommended_set = set(recommended_items)
            
            # Precision と Recall の計算
            if recommended_set:
                precision = len(actual_items.intersection(recommended_set)) / len(recommended_set)
                precisions.append(precision)
            
            if actual_items:
                recall = len(actual_items.intersection(recommended_set)) / len(actual_items)
                recalls.append(recall)
        
        # 平均値の計算
        avg_precision = np.mean(precisions) if precisions else 0
        avg_recall = np.mean(recalls) if recalls else 0
        f1_score = (2 * avg_precision * avg_recall) / (avg_precision + avg_recall) if (avg_precision + avg_recall) > 0 else 0
        
        return {
            'precision': avg_precision,
            'recall': avg_recall,
            'f1_score': f1_score
        }

パフォーマンス最適化

インデックス最適化

# performance_optimization.py
import time
import psutil
from typing import Dict, Any, List
import numpy as np

class VectorDatabaseOptimizer:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.performance_metrics = {}
    
    def benchmark_search_performance(
        self, 
        query_vectors: List[np.ndarray],
        top_k_values: List[int] = [1, 5, 10, 50, 100],
        num_runs: int = 5
    ) -> Dict[str, Any]:
        """検索パフォーマンスのベンチマーク"""
        results = {}
        
        for top_k in top_k_values:
            latencies = []
            throughputs = []
            
            for run in range(num_runs):
                start_time = time.time()
                
                # バッチ検索の実行
                for query_vector in query_vectors:
                    self.vector_store.query(
                        query_vector=query_vector,
                        top_k=top_k
                    )
                
                end_time = time.time()
                total_time = end_time - start_time
                
                # メトリクスの計算
                latency = total_time / len(query_vectors)  # 平均レイテンシ
                throughput = len(query_vectors) / total_time  # QPS
                
                latencies.append(latency)
                throughputs.append(throughput)
            
            results[f'top_k_{top_k}'] = {
                'avg_latency_ms': np.mean(latencies) * 1000,
                'p95_latency_ms': np.percentile(latencies, 95) * 1000,
                'avg_throughput_qps': np.mean(throughputs),
                'min_throughput_qps': np.min(throughputs)
            }
        
        return results
    
    def monitor_resource_usage(self, duration_seconds: int = 60) -> Dict[str, Any]:
        """リソース使用量の監視"""
        cpu_usage = []
        memory_usage = []
        disk_io = []
        
        start_time = time.time()
        
        while time.time() - start_time < duration_seconds:
            # CPU使用率
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_usage.append(cpu_percent)
            
            # メモリ使用量
            memory_info = psutil.virtual_memory()
            memory_usage.append(memory_info.percent)
            
            # ディスクI/O
            disk_info = psutil.disk_io_counters()
            if disk_info:
                disk_io.append({
                    'read_bytes': disk_info.read_bytes,
                    'write_bytes': disk_info.write_bytes
                })
        
        return {
            'cpu_usage': {
                'avg': np.mean(cpu_usage),
                'max': np.max(cpu_usage),
                'min': np.min(cpu_usage)
            },
            'memory_usage': {
                'avg': np.mean(memory_usage),
                'max': np.max(memory_usage),
                'min': np.min(memory_usage)
            },
            'disk_io': disk_io
        }
    
    def optimize_batch_size(
        self, 
        query_vectors: List[np.ndarray],
        batch_sizes: List[int] = [1, 5, 10, 20, 50, 100]
    ) -> Dict[str, Any]:
        """最適なバッチサイズの特定"""
        results = {}
        
        for batch_size in batch_sizes:
            start_time = time.time()
            
            # バッチ処理
            for i in range(0, len(query_vectors), batch_size):
                batch = query_vectors[i:i + batch_size]
                
                # バッチ検索の実行
                if hasattr(self.vector_store, 'batch_search'):
                    self.vector_store.batch_search(batch)
                else:
                    # 個別検索のフォールバック
                    for vector in batch:
                        self.vector_store.query(query_vector=vector, top_k=10)
            
            end_time = time.time()
            total_time = end_time - start_time
            
            results[f'batch_size_{batch_size}'] = {
                'total_time_seconds': total_time,
                'avg_time_per_vector_ms': (total_time / len(query_vectors)) * 1000,
                'throughput_qps': len(query_vectors) / total_time
            }
        
        # 最適なバッチサイズを特定
        best_batch_size = min(
            results.keys(),
            key=lambda k: results[k]['total_time_seconds']
        )
        
        return {
            'results': results,
            'optimal_batch_size': int(best_batch_size.split('_')[-1]),
            'optimal_performance': results[best_batch_size]
        }
    
    def analyze_index_efficiency(self) -> Dict[str, Any]:
        """インデックス効率の分析"""
        if hasattr(self.vector_store, 'get_index_stats'):
            stats = self.vector_store.get_index_stats()
            
            analysis = {
                'index_size_mb': stats.get('index_size_bytes', 0) / (1024 * 1024),
                'vector_count': stats.get('vector_count', 0),
                'memory_usage_mb': stats.get('memory_usage_bytes', 0) / (1024 * 1024),
                'fragmentation_ratio': self._calculate_fragmentation_ratio(stats)
            }
            
            # 効率性の評価
            if analysis['vector_count'] > 0:
                analysis['bytes_per_vector'] = (
                    stats.get('index_size_bytes', 0) / analysis['vector_count']
                )
                analysis['memory_per_vector'] = (
                    stats.get('memory_usage_bytes', 0) / analysis['vector_count']
                )
            
            return analysis
        
        return {'error': 'Index statistics not available'}
    
    def _calculate_fragmentation_ratio(self, stats: Dict[str, Any]) -> float:
        """フラグメンテーション比率の計算"""
        total_size = stats.get('index_size_bytes', 0)
        used_size = stats.get('used_size_bytes', total_size)
        
        if total_size > 0:
            return 1.0 - (used_size / total_size)
        return 0.0
    
    def recommend_optimizations(
        self, 
        performance_data: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """最適化の推奨事項"""
        recommendations = []
        
        # レイテンシに基づく推奨
        avg_latency = performance_data.get('search_performance', {}).get('top_k_10', {}).get('avg_latency_ms', 0)
        if avg_latency > 100:  # 100ms以上
            recommendations.append({
                'type': 'latency_optimization',
                'issue': 'High search latency detected',
                'recommendation': 'Consider using a more efficient index algorithm (e.g., HNSW) or reducing vector dimensions',
                'priority': 'high'
            })
        
        # メモリ使用量に基づく推奨
        memory_usage = performance_data.get('resource_usage', {}).get('memory_usage', {}).get('avg', 0)
        if memory_usage > 80:  # 80%以上
            recommendations.append({
                'type': 'memory_optimization',
                'issue': 'High memory usage detected',
                'recommendation': 'Consider implementing vector compression or using disk-based storage',
                'priority': 'medium'
            })
        
        # インデックス効率に基づく推奨
        fragmentation = performance_data.get('index_efficiency', {}).get('fragmentation_ratio', 0)
        if fragmentation > 0.3:  # 30%以上のフラグメンテーション
            recommendations.append({
                'type': 'index_maintenance',
                'issue': 'High index fragmentation detected',
                'recommendation': 'Consider rebuilding the index to improve efficiency',
                'priority': 'low'
            })
        
        return recommendations

まとめ

ベクトルデータベースは、2025年のAI時代において重要なインフラストラクチャとなっています。

選択のポイント

  1. スケール要件 - 小規模ならChromaDB、大規模ならPinecone/Milvus
  2. 運用負荷 - マネージドサービスか自己運用か
  3. 機能要件 - マルチモーダル、フィルタリング、ハイブリッド検索
  4. 予算 - クラウドサービスの料金vs運用コスト
  5. パフォーマンス - レイテンシとスループットの要件

実装のベストプラクティス

  • 適切な次元数 - 用途に応じた埋め込み次元の選択
  • インデックス最適化 - データ特性に合ったアルゴリズム選択
  • バッチ処理 - 効率的なデータ投入と検索
  • 監視と最適化 - パフォーマンスメトリクスの継続的な監視

ベクトルデータベースの適切な選択と実装により、高度なAIアプリケーションの構築が可能になります。