Implementing RAG in a Django Application: A Simple Guide
The goal of this post is to implement one of the main current uses of Large Language Models (LLMs) in a common Django application: the technique called Retrieval-Augmented Generation (RAG), which allows users to chat with their documents. Before diving into the implementation, I need to explain one of the basic building blocks of RAG: embeddings.
Embeddings
Embeddings are essentially fixed-size, multidimensional vectors of floating-point numbers that encode the meaning and context of a piece of content.
[[0.011513561010360718,
-0.02314218506217003,
-0.0171588733792305,
-0.03912165388464928,
-0.021027889102697372,
0.02629205398261547,
-0.03854633495211601,
...]]
Truncated embedding of the word "Amazon"
An interesting aspect of embeddings is that, by storing them in a database, we can perform operations between them to measure how similar their contents are. For example, if we convert the term "Amazon" into an embedding and compare it with the embeddings of "Brazil" and "Food" in the same vector space, the distance between "Amazon" and "Brazil" is smaller than the distance between "Amazon" and "Food", indicating that "Amazon" and "Brazil" are more similar.
from langchain_openai import OpenAIEmbeddings
from pgvector.django import CosineDistance
from project.app.models import MyDocument

embeddings_function = OpenAIEmbeddings()
embedding1 = embeddings_function.embed_documents(["Brazil"])[0]
embedding2 = embeddings_function.embed_documents(["Food"])[0]

# Distance between the closest stored document (the "Amazon" embedding) and "Brazil"
MyDocument.objects.annotate(distance=CosineDistance("embedding", embedding1)).order_by("distance")[0].distance
0.17931762111467797

# Distance between the same document and "Food" is larger, i.e. less similar
MyDocument.objects.annotate(distance=CosineDistance("embedding", embedding2)).order_by("distance")[0].distance
0.1892012486575828
This is called semantic similarity search and will be responsible for the retrieval part of RAG. In essence, we store every document as an embedding and then retrieve it through similarity search later. However, to store these embeddings and perform the retrieval, we need a vector database; for that, we'll use pgvector.
Pgvector
Currently, there are several vector databases in use, such as Chroma or LanceDB. Since Postgres is one of the most common databases in Django applications, a good alternative to adding a new database to your application is pgvector. Pgvector is a Postgres extension that gives it the capabilities of a vector database, allowing you to store embeddings and perform semantic similarity searches. To install pgvector, follow these instructions, and then run this command inside your Postgres instance:
CREATE EXTENSION vector;
and then install the pgvector Python module:
pip install pgvector
After that, it's already possible to store the embeddings. However, instead of doing that manually, we're going to save them using Django models. Saving embeddings in the database is simple: just create or update a model and add a VectorField with the number of dimensions your embeddings will have, like this:
from django.db import models
from pgvector.django import VectorField
class MyDocument(models.Model):
# ...
embedding = VectorField(dimensions=1536) # Assuming we are using the text-embedding-ada-002 embedding model
After that, just create and run the database migration using the following commands:
python manage.py makemigrations
python manage.py migrate
Now that we can store embeddings through Django's ORM, we can leverage pgvector to save them and compute their similarity directly in the Postgres database.
However, before we can do that, we need to convert the content of documents into embeddings in the first place. For this task, we'll use Langchain.
Langchain
Langchain is a framework that provides tools for building applications that interact with LLMs, including modules and classes that can generate embeddings. One of them is OpenAIEmbeddings, which uses one of OpenAI's models to generate the embeddings. For this example, we'll use the default model (text-embedding-ada-002), which generates a vector with 1536 dimensions.
pip install langchain langchain-openai
from langchain_openai import OpenAIEmbeddings
embeddings_function = OpenAIEmbeddings()
# Replace 'example text' with your actual content
embeddings = embeddings_function.embed_documents(['example text'])
Now that we've set up our embedding generation pipeline using Langchain, we need to load the documents themselves.
To do this, we'll use one of the PDF loaders provided by Langchain, PyMuPDFLoader, which is built on top of PyMuPDF. The loader reads the content of the PDF and generates a list of documents (instances of Langchain's Document class) that we can then convert into embeddings.
pip install langchain-community PyMuPDF
from langchain_community.document_loaders import PyMuPDFLoader
from project.app.models import MyDocument
file_path = "example.pdf"
loader = PyMuPDFLoader(file_path)
documents = loader.load()
The output of loader.load() will look like this:
[Document(metadata={'source': 'example.pdf', 'file_path': 'example.pdf', 'page': 0, 'total_pages': 1, 'format': 'PDF 2.0', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'creator': '', 'producer': '', 'creationDate': 'D:20241009145551Z', 'modDate': 'D:20241009145551Z', 'trapped': ''}, page_content='Just an example \n')]
Now we can iterate over the documents list to extract each page_content and generate the embeddings:
documents_content = [document.page_content for document in documents]
embeddings = embeddings_function.embed_documents(documents_content)
Then we can save the embeddings in the model like this:
for embedding, document in zip(embeddings, documents):
MyDocument.objects.create(
embedding=embedding,
source=file_path,
content=document.page_content
)
As we can see, we've added two new fields, source and content, to the MyDocument model. These will allow us to retrieve the original source (the file path) and content of each document later.
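For reference, the updated model could look something like this. The exact field types (and the max_length) are assumptions; only the embedding, source, and content fields matter for the rest of the post.

from django.db import models
from pgvector.django import VectorField


class MyDocument(models.Model):
    source = models.CharField(max_length=255)  # file path of the original document
    content = models.TextField()  # raw page content extracted by the loader
    embedding = VectorField(dimensions=1536)  # text-embedding-ada-002 produces 1536 dimensions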
After populating the database with the embeddings, we can start implementing the main feature of this application: the RAG.
RAG
The RAG technique involves injecting the content of documents into the context of the LLM models. However, due to the limitations on the number of tokens that can be included in the context, it's necessary to retrieve only the most relevant documents related to the user's input — this is the retrieval phase of RAG.
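To make this concrete, here is a minimal, non-conversational sketch of the full cycle (retrieve the closest documents, stuff their content into the prompt, generate an answer). It assumes the MyDocument model populated above and an OPENAI_API_KEY setting; answer_question is just an illustrative name, not part of the final application.

from django.conf import settings
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pgvector.django import CosineDistance

from project.app.models import MyDocument


def answer_question(question: str) -> str:
    # Retrieval: embed the question and fetch the closest documents from Postgres.
    embeddings_function = OpenAIEmbeddings(api_key=settings.OPENAI_API_KEY)
    question_embedding = embeddings_function.embed_documents([question])[0]
    documents = MyDocument.objects.annotate(
        distance=CosineDistance("embedding", question_embedding)
    ).order_by("distance")[:4]

    # Augmentation: inject the retrieved content into the prompt as context.
    context = "\n\n".join(document.content for document in documents)
    prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )

    # Generation: let the LLM answer based on that context.
    llm = ChatOpenAI(api_key=settings.OPENAI_API_KEY)
    return llm.invoke(prompt).content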
Since we're building a chatbot application, we'll create a conversational RAG. To do this, we'll use the appropriate Langchain modules to develop our function.
from langchain_openai import ChatOpenAI
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains.history_aware_retriever import create_history_aware_retriever
from langchain.chains.retrieval import create_retrieval_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from django.conf import settings
from project.app.retrievers import DocumentRetriever
class InMemoryHistory(BaseChatMessageHistory, BaseModel):
"""In memory implementation of chat message history."""
messages: list[BaseMessage] = Field(default_factory=list)
def add_message(self, message: BaseMessage) -> None:
"""Add a self-created message to the store"""
self.messages.append(message)
def clear(self) -> None:
self.messages = []
store = {}
def conversational_rag(
condense_question_prompt_template,
system_prompt_template,
):
llm = ChatOpenAI(api_key=settings.OPENAI_API_KEY)
condense_question_prompt = ChatPromptTemplate.from_messages(
[
("system", condense_question_prompt_template),
("placeholder", "{chat_history}"),
("human", "{input}"),
],
)
history_aware_retriever = create_history_aware_retriever(
llm,
DocumentRetriever(max_results=4),
condense_question_prompt,
)
qa_prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt_template),
("placeholder", "{chat_history}"),
("human", "{input}"),
],
)
qa_chain = create_stuff_documents_chain(
llm=llm,
prompt=qa_prompt,
)
convo_qa_chain = create_retrieval_chain(history_aware_retriever, qa_chain)
def get_session_history(session_id):
if session_id not in store:
store[session_id] = InMemoryHistory()
return store[session_id]
return RunnableWithMessageHistory(
runnable=convo_qa_chain,
get_session_history=get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="answer",
)
Here we can see that we're receiving two parameters for prompting: condense_question_prompt_template and system_prompt_template. The first one is a prompt used to condense or rephrase the original question from the user, achieved through an LLM call.
condense_question_prompt_template = """
Given a chat history and the latest user question
which might reference context in the chat history,
formulate a standalone question which can be understood
without the chat history. Do NOT answer the question,
just reformulate it if needed and otherwise return it as is.
"""
The second one defines how the model should answer questions based on the context (the retrieved documents).
system_prompt_template = """
You are an assistant for answering questions.
You should answer based on the provided context, and the conversation history.
If you don't have any context, just say "I don't know".
Context: {context}
"""
The use of condense_question_prompt_template is necessary in cases where the input lacks a specific term for a subject, such as "Give me more information about this." Without knowing the context, it's impossible to determine what "this" refers to, making it harder to find similar documents in the database. To solve this issue, the LLM uses the current conversation context to rephrase the user's input and retrieve relevant documents.
This prompt is used in the call to history_aware_retriever, which receives an instance of DocumentRetriever.
history_aware_retriever = create_history_aware_retriever(
llm,
DocumentRetriever(max_results=4),
condense_question_prompt,
)
The retriever is implemented in this way:
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from pgvector.django import CosineDistance
from langchain_openai import OpenAIEmbeddings
from django.conf import settings
from project.app.models import MyDocument
class DocumentRetriever(BaseRetriever):
max_results: int
class Config:
arbitrary_types_allowed = True
def _get_relevant_documents(
self,
query: str,
*,
run_manager: CallbackManagerForRetrieverRun,
) -> list[Document]:
embeddings_function = OpenAIEmbeddings(api_key=settings.OPENAI_API_KEY)
embeddings = embeddings_function.embed_documents([query])
documents = MyDocument.objects.annotate(
distance=CosineDistance("embedding", embeddings[0])
).order_by("distance")
return [
Document(
page_content=document.content,
)
for document in documents[:self.max_results]
]
This class defines how documents are retrieved and how the returned Document objects are built; it is called internally by the history_aware_retriever.
In this implementation, a queryset is created using the CosineDistance metric to calculate the distance (or similarity) between the question's embedding and the embeddings stored in the database. The results are ordered based on the distance value, where smaller distances indicate greater similarity between the documents. (Note: to better understand why cosine distance is used, refer to this article.)
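For intuition, the cosine distance pgvector computes is simply 1 minus the cosine similarity of the two vectors. Here is a plain-Python illustration (not part of the application code):

import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    # 1 - (a · b) / (|a| * |b|): 0 means same direction, 1 means orthogonal, 2 means opposite
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1 - dot / (norm_a * norm_b)


print(cosine_distance([1.0, 0.0], [1.0, 0.0]))  # 0.0 -> identical direction (most similar)
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # 1.0 -> orthogonal (unrelated)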
The other parameter, max_results, determines how many documents are retrieved, and in this case, we're using the top 4 documents.
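Because BaseRetriever subclasses are runnables, the retriever can also be exercised on its own, which is handy for debugging. A quick check could look like this, assuming the database has already been populated:

from project.app.retrievers import DocumentRetriever

retriever = DocumentRetriever(max_results=4)
# Returns the 4 Document objects whose embeddings are closest to the query
documents = retriever.invoke("What does the example PDF talk about?")
for document in documents:
    print(document.page_content)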
After defining the history_aware_retriever, we need to define the function responsible for actually responding to the user's input, the callable qa_chain, which uses the system_prompt_template parameter.
qa_prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt_template),
("placeholder", "{chat_history}"),
("human", "{input}"),
],
)
qa_chain = create_stuff_documents_chain(
llm=llm,
prompt=qa_prompt,
)
Next, we will combine both callables into a single one called convo_qa_chain, as follows:
convo_qa_chain = create_retrieval_chain(history_aware_retriever, qa_chain)
In the final part, we use convo_qa_chain as an argument for the RunnableWithMessageHistory class. This class is responsible for adding a conversation history to the LLM's context.
RunnableWithMessageHistory(
runnable=convo_qa_chain,
get_session_history=get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="answer",
)
The second parameter, get_session_history, takes a function responsible for retrieving the conversation history based on the session_id, which is passed in later, when the chain returned by conversational_rag is invoked.
def get_session_history(session_id):
if session_id not in store:
store[session_id] = InMemoryHistory()
return store[session_id]
In this example, we are storing the conversation in memory, but in production applications, it is recommended to use a database like Redis.
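As a sketch of what that could look like, langchain_community provides a RedisChatMessageHistory class that can replace the in-memory store. The REDIS_URL setting here is an assumption, and you should verify the class against your installed version:

from django.conf import settings
from langchain_community.chat_message_histories import RedisChatMessageHistory


def get_session_history(session_id: str) -> RedisChatMessageHistory:
    # Each chat session gets its own history, persisted in Redis instead of process memory
    return RedisChatMessageHistory(session_id, url=settings.REDIS_URL)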
The history_messages_key parameter defines the name of the template variable that contains the conversation history. The parameters input_messages_key and output_messages_key correspond to the input variable of the template (the user's question) and to the key of the dict that holds the LLM's response.
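Putting these keys together, a simple non-streaming call to the chain could look like this (the question and session id are just illustrative values):

rag_chain = conversational_rag(
    condense_question_prompt_template,
    system_prompt_template,
)

result = rag_chain.invoke(
    {"input": "What is this document about?"},  # input_messages_key
    {"configurable": {"session_id": "my-session-id"}},  # used by get_session_history
)
print(result["answer"])  # output_messages_key: the LLM's response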
With the instance ready, we can now go back to Django and show how to use it in a view function.
View
To finish, we define a simple view function that receives a POST request with the user's question and the chat session id, and passes the session_id to the chain as an argument.
from django.http import HttpResponseBadRequest, StreamingHttpResponse

# plus an import for the conversational_rag function defined in the previous section


def ask_ai(request):
user_question = request.POST.get("question")
chat_session_id = request.POST.get("chat_session_id")
if not user_question or not chat_session_id:
return HttpResponseBadRequest()
condense_question_prompt_template = """
Given a chat history and the latest user question
which might reference context in the chat history,
formulate a standalone question which can be understood
without the chat history. Do NOT answer the question,
just reformulate it if needed and otherwise return it as is.
"""
system_prompt_template = """
You are an assistant for answering questions.
You should answer based on the provided context, and the conversation history.
If you don't have any context, just say "I don't know".
Context: {context}
"""
async def message_stream():
data = {
"input": user_question,
}
rag_chain = conversational_rag(
            condense_question_prompt_template,
system_prompt_template,
)
async for chunk in rag_chain.astream(
data,
{"configurable": {"session_id": chat_session_id}},
):
yield chunk.get("answer", "")
response = StreamingHttpResponse(message_stream(), content_type="text/event-stream")
response["Cache-Control"] = "no-cache"
response["X-Accel-Buffering"] = "no"
return response
In this view, we are returning the result as a StreamingHttpResponse, which enhances the user experience by allowing parts of the response to be sent as they are generated, instead of waiting for the entire process to complete. This behavior is similar to how ChatGPT operates.
To achieve this effect, we need to pass a function that returns a generator. The function message_stream is responsible for calling the conversational_rag function, which returns the instance of RunnableWithMessageHistory that we defined earlier. We then call the astream method on this instance, which returns the response in chunks.
With all this defined, we can now write some JavaScript to consume the response in the frontend and achieve the desired effect.
async function askChatbot(question) {
const formData = new FormData();
formData.append("question", question);
formData.append("chat_session_id", `{{ chat_session_id }}`);
const response = await fetch(`{% url "app:ask_ai" %}`, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'X-CSRFToken': `{{ csrf_token }}`
},
body: new URLSearchParams(formData)
});
if (!response.ok) {
throw new Error('Network response was not ok');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let answer = '';
while (true) {
const {
done,
value
} = await reader.read();
if (done) break;
let messageChunk = decoder.decode(value, {
stream: true
});
// Concatenate the chunks of the answer
answer += messageChunk;
    }
    // Return the full answer (a real UI would also render each chunk as it arrives)
    return answer;
}
RAG using the file of the book "The Pragmatic Programmer"
Conclusion
The application architecture we've constructed is relatively simple and provides a good foundation for future development. However, there's still plenty of room for improvement, and this implementation doesn't cover the construction of the frontend.
That being said, by following these steps, you should be able to add some interesting features to an existing Django application, including conversational interfaces, natural language processing, and more advanced user interaction capabilities.