Chunk and index text

This is part of the Use RAG with Continue.dev series.
Once we have the dependencies installed, we can start filling up the vector db.
Set up
The first part is to create the LanceDB database and the embedding model:
from lancedb.embeddings import get_registry
from lancedb import connect
OLLAMA_HOST = "http://10.0.0.35:11434"
OLLAMA_MODEL = "mxbai-embed-large:latest"
db = connect("lance")
embed_model = get_registry().get("ollama").create(
    name=OLLAMA_MODEL,
    base_url=OLLAMA_HOST,
)
embed_model.host = OLLAMA_HOST
As you can see, I have defined:
- OLLAMA_HOST - the host where Ollama is installed
- OLLAMA_MODEL - the embedding model we want to use
- db - the database (warning: the parameter is actually a path). In my case, the DB will be located in the lance directory, at the same level as the script
- embed_model - the LanceDB object that defines the embedding we want to use
The embed_model approach in LanceDB is quite useful because we can define the vector field size based on the embedding model's parameters.
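For example, a quick sanity check looks like this (a sketch; the printed value depends on the model, and 1024 for mxbai-embed-large is my assumption here):
# Sketch: the registry-created embedding model exposes the vector size,
# so the schema below can reuse it instead of hard-coding a dimension.
print(embed_model.ndims())  # e.g. 1024 for mxbai-embed-large (assumed value)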
I have also defined a text splitter, using llama_index:
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.packs.node_parser_semantic_chunking.base import SemanticChunker
ollama_embeddings = OllamaEmbedding(
    model_name=OLLAMA_MODEL,
    base_url=OLLAMA_HOST,
    ollama_additional_kwargs={"mirostat": 0},
)
splitter = SemanticChunker(embed_model=ollama_embeddings)
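To get a feel for what the splitter produces, here is a minimal usage sketch (the sample text is made up; split_text() returns a list of chunk strings, as used later in index_file()):
# Sketch: SemanticChunker.split_text() returns a list of strings, one per semantic chunk.
sample = "def add(a, b):\n    return a + b\n\ndef sub(a, b):\n    return a - b\n"
chunks = splitter.split_text(sample)
print(len(chunks), chunks[0][:40])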
The LanceDB model
The LanceDB model I defined is quite basic. It has:
- the file name
- the repository name
- the text of the chunk
- the vector embedding
from lancedb.pydantic import LanceModel, Vector

class CodeChunks(LanceModel):
    filename: str
    repository: str
    text: str = embed_model.SourceField()
    vector: Vector(embed_model.ndims()) = embed_model.VectorField()
Create necessary tables
When you perform a query in LanceDB you must tell it where to query from, and that's a table. The function to initialise a table is:
def create_table(name: str):
    result = None
    try:
        result = db.create_table(f"{name}", schema=CodeChunks)
    except Exception:
        result = db.open_table(f"{name}")
    return result
As you can see, db.create_table() throws an exception if the table already exists (you can give it an extra parameter to overwrite, but that's not what we want). In that case, we just open the table.
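For reference, the overwriting variant would look roughly like this (a sketch; I'm assuming LanceDB's mode="overwrite" option, and "my_repo" is just a placeholder name):
# Sketch only: recreate the table from scratch instead of reusing it.
# mode="overwrite" drops any existing table with the same name.
table = db.create_table("my_repo", schema=CodeChunks, mode="overwrite")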
Index a file
Once I have all the components above, file indexing is quite simple:
def index_file(project_name, repo_name, file_name, file_content):
    repo_table = create_table(repo_name)
    project_table = create_table(project_name)

    chunks = splitter.split_text(file_content)
    array = []
    for chunk in chunks:
        array.append({"text": chunk, "repository": repo_name, "filename": file_name})
    # debug
    print(f" {len(array)} chunks")

    repo_table.add(array)
    project_table.add(array)
    # debug
    print(f"repo rows: {repo_table.count_rows()}, project_rows: {project_table.count_rows()}")
First, split the input file into chunks using the splitter initialised above. Then, build an array of records matching the model and add it to both the repository-specific and the project-specific table. This way, the project-specific table will hold embeddings from the whole code base.
When xxx_table.add(array) happens, the embedding model is triggered to create the vectors for each chunk. Much nicer than writing boilerplate!
I chose to create one table containing all chunks from my Bitbucket project, plus one table per repository containing only that repository's embeddings. I did that to see which variant would give me better results.
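A quick way to compare the two is sketched below; it assumes that searching with a raw string triggers the same embedding function (which LanceDB does when the schema declares a SourceField). The query string and table names are placeholders.
# Sketch: compare retrieval between the per-repository and per-project tables.
repo_hits = db.open_table("my_repo").search("where is the retry logic?").limit(3).to_list()
project_hits = db.open_table("MYPROJECT").search("where is the retry logic?").limit(3).to_list()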
Index a project
Great! Now we can use the function to index the codebase. The indexing code would look something along the lines of:
files = get_all_files_from_bitbucket(PROJECT_NAME, repo, main_branch)
print(files)
for filename in files:
    if filename.lower().endswith(IGNORE_EXTENSIONS):
        print(f"Skipping {filename} because of extension")
    else:
        print(f"Indexing {filename}:")
        content = get_file_from_bitbucket(PROJECT_NAME, repo, main_branch, filename)
        # This is the important line
        index_file(PROJECT_NAME, repo, filename, content)
Practically, for each file from the project:
- get the file's content
- index the file
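For completeness, the loop above runs per repository; a sketch of the outer loop is below, where get_repositories_from_bitbucket() is a hypothetical helper (not shown in this post) in the same spirit as the other Bitbucket functions:
# Sketch only: where `repo` and `main_branch` come from.
# get_repositories_from_bitbucket() is a hypothetical helper, not part of the code above.
for repo, main_branch in get_repositories_from_bitbucket(PROJECT_NAME):
    files = get_all_files_from_bitbucket(PROJECT_NAME, repo, main_branch)
    # ... the per-file loop shown above ...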
Bonus: Do it with Qdrant
The Qdrant code follows the same approach:
- create a table per repository
- create a table per project
- index a file
- query the vector database
For all of these, I've strived to keep the same signatures as for LanceDB, so it'll be just an import change :)
Prerequisites
First, we need to create the Qdrant client:
import os
from dotenv import load_dotenv
from qdrant_client import QdrantClient

load_dotenv()
client = QdrantClient(host=os.getenv("QDRANT_HOST"), port=int(os.getenv("QDRANT_PORT")))
Then we determine the embedding size and create the semantic chunker:
# Get the number of dimensions
sample_embedding = ollama_embeddings.get_text_embedding("test")
ollama_embeddings_size = len(sample_embedding)
semantic_chunker = SemanticChunker(embed_model=ollama_embeddings)
Creating the tables
Here is the function to create a table (a collection, in Qdrant terms):
from qdrant_client.models import Distance, VectorParams

def create_table_per_repo(repo_name: str):
    if not client.collection_exists(repo_name):
        client.create_collection(
            collection_name=repo_name,
            vectors_config=VectorParams(size=ollama_embeddings_size, distance=Distance.COSINE),
        )
Index a file
The function to add a source code file to the database (with chunking) is:
import uuid
from typing import List

from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct

def add_git_file_to_qdrant(
    text: str,
    file_name: str,
    branch: str,
    repository_name: str
):
    """
    Splits a file's text into semantic chunks, embeds each chunk, and stores them in Qdrant with Git metadata.
    If the file already exists in the collection, its previous embeddings are removed before inserting new ones.

    Args:
        text (str): The content of the file.
        file_name (str): The name of the file.
        branch (str): The Git branch.
        repository_name (str): The name of the Git repository.
    """
    create_table_per_repo(repository_name)
    create_table_per_repo(os.getenv("BITBUCKET_PROJECT"))

    # Step 1: Remove existing records for the same file
    points_selector = Filter(
        must=[
            FieldCondition(key="file_name", match=MatchValue(value=file_name)),
            FieldCondition(key="repository_name", match=MatchValue(value=repository_name)),
        ]
    )
    # from the repo collection
    client.delete(
        collection_name=repository_name,
        points_selector=points_selector,
    )
    # from the project collection
    client.delete(
        collection_name=os.getenv("BITBUCKET_PROJECT"),
        points_selector=points_selector,
    )
    # debug
    # print(f"Removed previous entries for {file_name} in Qdrant.")

    # Step 2: Chunk the text using SemanticChunker
    chunks = semantic_chunker.split_text(text)

    # Step 3: Embed each chunk and prepare data for Qdrant
    points: List[PointStruct] = []
    for chunk in chunks:
        embedding = ollama_embeddings.get_text_embedding(chunk)  # Generate embedding
        point_id = str(uuid.uuid4())  # Generate a unique ID
        points.append(
            PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "chunk": chunk,
                    "file_name": file_name,
                    "branch": branch,
                    "repository_name": repository_name,
                },
            )
        )

    # Step 4: Insert data into Qdrant
    try:
        client.upsert(collection_name=repository_name, points=points)
        client.upsert(collection_name=os.getenv("BITBUCKET_PROJECT"), points=points)
    except Exception as e:
        print(f"[ERROR] Cannot add file {e}")
It's important to note that add_git_file_to_qdrant() does an upsert (i.e. it removes the existing entries for the file, if any, before re-adding them).
Simple! Now all we have to do is actually call it :)
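As a sketch, the call site mirrors the Bitbucket loop from the LanceDB section (same caveats about the helper functions and the PROJECT_NAME, repo and main_branch variables):
# Sketch: the same Bitbucket loop as before, now feeding Qdrant.
files = get_all_files_from_bitbucket(PROJECT_NAME, repo, main_branch)
for filename in files:
    if filename.lower().endswith(IGNORE_EXTENSIONS):
        continue
    content = get_file_from_bitbucket(PROJECT_NAME, repo, main_branch, filename)
    add_git_file_to_qdrant(content, filename, main_branch, repo)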
HTH,