近年、RAGの精度改善の手法が数多く提案されています。
「リランキング」はその一つで、ベクトル検索の結果をユーザーの意図に合うように並び替え、回答精度を向上させる手法として知られています。
一方、データ分析基盤として知られているDatabricksにも生成AIに関連する機能が次々と追加されていて、先日、「Databricks Reranker」という機能がパブリックプレビューとしてアップデートされました。
導入がとても簡単なので、少し実験を交えつつ試してみたいと思います。

目次

1.    データセットの準備
2.    RAGの実装
3.    Rerankerを試す

1. データセットの準備

今回は、検索対象のドキュメントと、それに関するQAを全てLLM(Large Language Model)に作らせます。
まずは次のコードを動かして架空の会社の社内ドキュメントを作成し、ボリュームに格納します。
今回は日本語非対応の埋め込みモデルを使用するため、英語のドキュメントを作成します。
※ボリュームなどのパスは実行環境に合わせて置き換えてください。

ライブラリのインストール

%pip install langchain databricks-vectorsearch "mlflow[databricks]>=3.4.0"

dbutils.library.restartPython()

ドキュメントの作成

import os

import time

import glob

from databricks.sdk import WorkspaceClient

volume_path = "/Volumes/workspace/default/rag_eval_dataset/"

w = WorkspaceClient()

client = w.serving_endpoints.get_open_ai_client()

# LLMモデルを指定

model_name = "databricks-meta-llama-3-3-70b-instruct"

# プロンプトで作成するドキュメントの内容を指示

prompts = [

"Please create a detailed internal document of about 10000 characters regarding the work regulations of 〇△× Corporation.",

"Please create a long internal document of about 10000 characters regarding the information security policy of 〇△× Corporation.",

~~~ 省略 ~~~

]

for i, prompt in enumerate(prompts, 1):

response = client.chat.completions.create(

model=model_name,

messages=[

{"role": "system", "content": "You are an HR representative at a major Japanese company."},

{"role": "user", "content": prompt},

],

)

# 1回目の生成結果を取得

initial_text = response.choices[0].message.content

# さらに文章を拡張するようにLLMに指示

expand_prompt = (

"Please expand and enrich the following internal document, adding more details, examples, and explanations to make it even longer and more comprehensive:\n"

+ initial_text

)

expand_response = client.chat.completions.create(

model=model_name,

messages=[

{"role": "system", "content": "You are an HR representative at a major Japanese company."},

{"role": "user", "content": expand_prompt},

],

)

text = expand_response.choices[0].message.content

    # ドキュメントの内容からタイトルを作成するように指示

title_prompt = "Please provide a concise and appropriate title (no more than 50 characters) for the following internal document:\n" + text

title_response = client.chat.completions.create(

model=model_name,

messages=[

{"role": "system", "content": "You are an HR representative at a major Japanese company."},

{"role": "user", "content": title_prompt},

],

)

title = title_response.choices[0].message.content.strip().replace(" ", "_").replace("/", "_")

# ドキュメントの保存

file_path = os.path.join(volume_path, f"{title}_internal_doc_{i}.txt")

with open(file_path, "w", encoding="utf-8") as f:

f.write(text)

time.sleep(1)

# ドキュメントの読み込み

def load_text_files(folder_path, extension="txt"):

docs = []

for filepath in glob.glob(os.path.join(folder_path, f"*.{extension}")):

with open(filepath, "r", encoding="utf-8") as f:

docs.append({"filename": os.path.basename(filepath), "content": f.read()})

return docs

docs = load_text_files(volume_path)

次に、作成したドキュメントをチャンキングして、デルタテーブルに書き込みます。StandardタイプのVector Search Endpointを使用する場合、デルタテーブルはChange Data Feedを有効にしておく必要があります。

ドキュメントのチャンキング

from pyspark.sql.types import StructType, StructField, StringType

from datetime import datetime, timedelta

import uuid

from langchain.text_splitter import CharacterTextSplitter

schema = StructType([

StructField("id", StringType(), False),

StructField("content", StringType(), False),

StructField("title", StringType(), True),

StructField("updated_at", StringType(), True),

])

# rag_chunkテーブルを作成

spark.sql("""

CREATE TABLE IF NOT EXISTS default.rag_chunk (

id STRING NOT NULL,

content STRING NOT NULL,

title STRING,

updated_at STRING

) USING DELTA

""")

# CDFを有効にしておきます

spark.sql("""

ALTER TABLE default.rag_chunk SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

""")

CHUNK_SIZE = 1000

text_splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, separator="\n")

# 各テキストに対してチャンク化

chunks = []

start_date = datetime(2025, 1, 1)

date_counter = 0

for doc in docs:

text_chunks = text_splitter.split_text(doc["content"])

for chunk in text_chunks:

updated_at = (start_date + timedelta(days=date_counter)).strftime("%Y-%m-%d")

chunks.append({

"id": str(uuid.uuid4()),

"content": chunk,

"title": doc["filename"],

"updated_at": updated_at,

})

date_counter += 1

# Spark DataFrameに変換

df_chunks = spark.createDataFrame(chunks, schema=schema)

# rag_chunkテーブルに追加

df_chunks.write.format("delta").mode("append").saveAsTable("default.rag_chunk")

display(df_chunks)

続いて、評価用のQAペアを作成します。
MLflowが扱える形にして保存しておきます。

評価用QAを作成

eval_qa = []

for doc in docs:

prompt = f"""

Based on the following text, generate 10 pairs of questions and answers that test understanding of the content.

Please use the following output format:

Q:

A:

Text:

{doc["content"]}

"""

response = client.chat.completions.create(

# Provide a valid model name for your LLM provider.

model=model_name,

messages=[{"role": "user", "content": prompt}],

temperature=0,

)

qna_text = response.choices[0].message.content

print(qna_text)

# Q&A をRAG用評価データセット形式に整形

current_q = None

current_a = None

for block in qna_text.split("\n"):

if block.startswith("Q:"):

current_q = block.replace("Q:", "").strip()

elif block.startswith("A:"):

current_a = block.replace("A:", "").strip()

if current_q and current_a:

eval_qa.append({

"inputs": {"query": current_q},

"expectations": {"expected_facts": [current_a]},

})

current_q = None

current_a = None

print(eval_qa)

import json

with open("/Volumes/workspace/default/rag_eval_qa/rag_eval_qa.json", "w", encoding="utf-8") as f:

json.dump(eval_qa, f, ensure_ascii=False, indent=2)

print("RAG用評価データセットを作成しました。")

これで、事前準備は完了です。

2. RAGの実装

続いてRAGの実装に入ります。
ベクトル検索を使うためにVector Search Endpoint, Vector Search Indexが必要になります。
コンピュートの画面で上のタブ → Vector Search → 「エンドポイントを作成」ボタンをクリックし、Vector Search Endpointを作成します。

図1:Vector Search Endpoint作成ボタンをクリック
拡大
図1:Vector Search Endpoint作成ボタンをクリック
図2:Vector Search Endpointの作成
拡大
図2:Vector Search Endpointの作成

カタログエクスプローラー上で作成したデルタテーブルを開き、Vector Search Indexを作成します。
埋め込みモデルにはdatabricks-gte-large-enを使用します。

図3:Vector Search Index作成ボタンをクリック
拡大
図3:Vector Search Index作成ボタンをクリック
図4:Vector Search Indexの作成
拡大
図4:Vector Search Indexの作成

次に、RAG評価のためにMLflowエクスペリメントを作成します。

図5:「GenAIアプリとエージェント」をクリック
拡大
図5:「GenAIアプリとエージェント」をクリック
図6:MLflowエクスペリメントを作成
拡大
図6:MLflowエクスペリメントを作成

これで、必要なものは全て用意できたので、いよいよRAGを実装していきます。
まずはMLflowとVector Searchを利用できるようにします。

MLflowとVector Searchの準備

import mlflow

from databricks.sdk import WorkspaceClient

mlflow.openai.autolog()

mlflow.set_experiment("/path/to/MLflow3")

from databricks.vector_search.client import VectorSearchClient

vs_client = VectorSearchClient()

index = vs_client.get_index(endpoint_name="vs_endpoint", index_name="workspace.default.vs_index")

続いて、RAGの処理を実装していきます。

RAGアプリの実装

from mlflow.entities import Document

from typing import List

@mlflow.trace(span_type="RETRIEVER")

def retrieve_docs(query: str) -> List[Document]:

    results = index.similarity_search(

        query_text=query,

        columns=["content"],

        num_results=10,

    )

    docs = []

    data_array = results["result"]["data_array"]

    for i, row in enumerate(data_array):

        content = row[0]

        score = row[1]

        docs.append(

            Document(

                id=f"vs_doc_{i+1}",

                page_content=content,

                metadata={"score": score},

            )

        )

    return docs

@mlflow.trace

def rag_app(query: str):

    retrieved_documents = retrieve_docs(query=query)

    retrieved_docs_text = "\n".join([f"{i+1}. {doc.page_content}" for i, doc in enumerate(retrieved_documents)])

    messages_for_llm = [

        {

            "role": "system",

            "content": f"You are an excellent assistant at 〇△× Corporation. Based on the following context, please answer questions regarding the company: {retrieved_docs_text}. If the information necessary to answer is not included in the context and the question is not of a general nature, please respond with: 'No information was found to answer the question'.",

        },

        {

            "role": "user",

            "content": query,

        },

    ]

    response = client.chat.completions.create(

        model=model_name,

        messages=messages_for_llm,

        temperature=0,

    )

    return response

RAGが実装できたので、予め用意した評価用QAデータセットで実際に評価してみましょう。

評価を実施するコードの実装

import json

with open("/Volumes/workspace/default/rag_eval_qa/rag_eval_qa.json", "r") as f:

    eval_qa = json.load(f)

from mlflow.genai.scorers import (

    Correctness,

    RelevanceToQuery,

    RetrievalRelevance,

    RetrievalSufficiency,

)

# Run predefined scorers that require ground truth

mlflow.genai.evaluate(

    data=eval_qa,

    predict_fn=rag_app,

    scorers=[

        Correctness(),

        RelevanceToQuery(),

        RetrievalRelevance(),

        RetrievalSufficiency(),

    ],

)

これを実行した後、MLflowのログを確認すると、RAGの動きを確認できます。

3. Rerankerを試す

similarity_search()関数の引数rerankerにDatabricksRerankerを指定するだけで、Rerankerを試すことができます。
columns_to_rerankには、リランキング時に考慮する情報を指定します。

DatabricksRerankerの設定

from databricks.vector_search.reranker import DatabricksReranker

results = index.similarity_search(

        query_text=query,

        columns=["id", "content", "title", "updated_at"],

        num_results=5,

        reranker=DatabricksReranker(columns_to_rerank=["content"])

)

実際に、Rerankerを使わない場合と使う場合の結果を比べてみます。
結果は次のようになりました。

図7:Reranker未使用時の結果
拡大
図7:Reranker未使用時の結果
図8:Reranker使用時の結果
拡大
図8:Reranker使用時の結果

コンテキストの十分性は87%→92%に、回答の正確性は74%→78%に精度が向上していることが確認できました。(全190件)
Rerankerを使用して改善したケースを見てみると、未使用時は全く検索に含まれませんが、使用時には検索結果のトップに来ていることが分かります。

図9:Reranker未使用時の結果
拡大
図9:Reranker未使用時の結果
図10:Reranker使用時の結果
拡大
図10:Reranker使用時の結果

次は、似たような内容のドキュメントが2つあるような状況を再現してみます。
ここでは極端な例として、内容の一部だけを書き換え、タイトルが異なる2つのドキュメントを用意します。

タイトル:New_Employee_Training_Manual

**Training Manual** 
~~~省略~~~
Duration and Evaluation: The program typically lasts one to three months. After each module there are comprehension checks and one-on-one meetings with supervisors. Final evaluation is based on achievement of assigned tasks.

 

タイトル:Mid-Career_Training_Manual

**Training Manual**
~~~省略~~~
Duration and Evaluation: The program typically lasts one to two weeks. After each module there are comprehension checks and one-on-one meetings with supervisors. Final evaluation is based on achievement of assigned tasks.

 

次に、リランキングを使用する際にタイトルを考慮するようにします。

columns_to_rerankにtitleを追加

from databricks.vector_search.reranker import DatabricksReranker

results = index.similarity_search(

        query_text=query,

        columns=["id", "content", "title", "updated_at"],

        num_results=5,

        reranker=DatabricksReranker(columns_to_rerank=["content", "title"])

)

そして、以下のような質問文で検索してみます。

According to the Training Manual, how long is the training period for mid-career hires?

結果としては次のようになりました。
ちゃんとタイトルが考慮され、正しいドキュメント(「新人研修」ではなく「中途研修」のドキュメント)が取得できるようになっています。

図11:Reranker未使用時の結果
拡大
図11:Reranker未使用時の結果
図12:Reranker使用時の結果
拡大
図12:Reranker使用時の結果

まとめ

LLMに作らせた簡易的なデータセットでもRerankerの効果がはっきり表れたことには驚きでした。
簡単に導入できることも分かったので、DatabricksでRAGを構築するならぜひ試してみるべき機能だと思います。
ちなみに、更新日付をリランキングのパラメータに入れて、より新しいドキュメントが取得できるようになるか確認する実験も試しましたが、そちらは失敗でした。(むしろ悪化)
もしかしたら得意不得意があるのかもしれません。気になる方はぜひお試しください!