RAG Sample - 2. Build the RAG system
3 min read

Prerequisites

Before starting, I made sure the following items were in place:

  1. Ollama - installed natively on Windows, just to eliminate the headaches of exposing an NVIDIA card to Docker. I also made sure I have llama3:latest installed via ollama pull llama3
  2. Milvus - via Docker Compose. I know this limits its capabilities (no GPU access), but it's OK for experimentation
  3. The Milvus Python SDK, to be able to work with it. It's a shame there is no native Rust SDK :(

Initialise the database

Milvus has a lot of concepts you should get familiar with: database, schema, index, roles ... However, you can get by with just collections, the client and the concept of an index. In my mind, I drew the following equivalences with a relational database:

  • MilvusClient - the Python client for the DB, in the same way PostgreSQL has psycopg2
  • Collection - a very special table
  • CollectionSchema - the table description (fields)
  • FieldSchema - the field description
  • index_params - the definition of indexes for that collection

In other words, the Collection is a table whose layout is defined by a CollectionSchema. Each FieldSchema represents a column of the CollectionSchema, and different FieldSchemas can be indexed.

Note: You need to index the vector field (the one(s) with type FLOAT_VECTOR). This is a prerequisite for loading (i.e. being able to use) a collection.

Once these concepts had crystallised, I started by creating a client connected to the URI pointing to my Docker install:

uri = "http://10.10.0.10:19530"
client = MilvusClient(uri)

Then, I defined a minimal schema:

name = "my_schema"
dimensions = 1024  # must match the output size of your embedding model
fields = [
  FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
  FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dimensions)
]
schema = CollectionSchema(fields, name)

... and the indexes (the Milvus documentation on preparing index parameters has more details):

index_params = client.prepare_index_params()
index_params.add_index(
  field_name="embeddings",
  index_name="_embeddings",
  index_type="IVF_FLAT",
  metric_type="L2",
  params={"nlist": 1024}
)

index_params.add_index(
  index_name="_id",
  field_name="id",
  index_type="STL_SORT"
)
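For intuition about these choices: L2 is plain Euclidean distance, and IVF_FLAT speeds up search by clustering the vectors into nlist buckets and scanning only the closest buckets at query time. Here is a tiny stdlib-only sketch of the exhaustive ("FLAT") baseline that IVF accelerates — my own illustration, not Milvus code:

```python
import math

def l2(a, b):
    """Euclidean (L2) distance between two vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def flat_search(query, vectors, top_k=2):
    """Exhaustive search: compare the query against every stored vector.
    IVF_FLAT avoids this full scan by visiting only the nearest clusters."""
    scored = sorted(enumerate(vectors), key=lambda iv: l2(query, iv[1]))
    return [(i, l2(query, v)) for i, v in scored[:top_k]]

vectors = [[0.0, 0.0], [1.0, 1.0], [5.0, 5.0]]
print(flat_search([0.9, 1.1], vectors, top_k=2))
```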

Once this setup was done, I created the collection and indexes:

client.create_collection(
  collection_name=name,
  schema = schema,
  consistency_level = "Strong"
)

client.create_index(collection_name=name, index_params=index_params)

Optionally, you can add a bit of code that waits until the indexes are built. Note that the utility helpers work on a connection registered via connections.connect, not on the MilvusClient itself:

connections.connect(uri=uri)

utility.wait_for_index_building_complete(
  collection_name=COLLECTION_NAME,
  index_name="_embeddings",
)

My code looks like this now:

from pymilvus import MilvusClient, FieldSchema, CollectionSchema, DataType, utility, connections


COLLECTION_NAME = "test_collection"
EMBEDDING_DIM = 1024

def init_collection(uri="http://10.0.0.35:19530", name=COLLECTION_NAME, dimensions=EMBEDDING_DIM, schema=None):
    client = MilvusClient(uri=uri)

    if schema is None:
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dimensions)
        ]
        schema = CollectionSchema(fields, name)

    if not client.has_collection(name):
        index_params = client.prepare_index_params()
        index_params.add_index(
            field_name="embeddings",
            index_name="_embeddings",
            index_type="IVF_FLAT",
            metric_type="L2",
            params={"nlist": 1024}
        )
        index_params.add_index(
            index_name="_id",
            field_name="id",
            index_type="STL_SORT"
        )

        client.create_collection(
            collection_name=name,
            schema=schema,
            consistency_level="Strong"
        )

        client.create_index(collection_name=name, index_params=index_params)
        print(client.list_indexes(collection_name=name))

    return client

def reset_collection(uri="http://10.0.0.35:19530", name=COLLECTION_NAME, dimensions=EMBEDDING_DIM, schema = None):
    client = MilvusClient(uri=uri)

    if client.has_collection(name):
        client.drop_collection(name)

    return init_collection(uri=uri, name=name, dimensions=dimensions, schema=schema)

if __name__ == "__main__":
    client = reset_collection()
    print(client.get_collection_stats(COLLECTION_NAME))
    connections.connect(uri="http://10.0.0.35:19530")
    utility.wait_for_index_building_complete(
        collection_name=COLLECTION_NAME,
        index_name="_embeddings",
    )

It has the following characteristics:

  • Defines a function to reset a collection (drop and recreate)
  • Defines a function to create a collection if not present
  • Both functions accept custom URIs (to connect to Milvus servers), collection names, schemas and dimensions
  • The main block creates the default collection (with an ID field and a vector embedding field) and waits until the index for the embedding has been built.

Real-world-ish schema

My documents are currently stored in paperless-ngx, so they have URIs, tags, document types and other metadata. My schema looks like this:

fields = [
	FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
	FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=1024),
	FieldSchema(name="uri",  dtype=DataType.VARCHAR, max_length=512),
	FieldSchema(name="title",  dtype=DataType.VARCHAR, max_length=512),
	FieldSchema(name="text",  dtype=DataType.VARCHAR, max_length=2048),
]

Fields are:

  • id - the unique ID for the record. Milvus generates it automatically (auto_id=True), because it's difficult to keep track of the last ID used (I'll most likely run this multiple times to index different documents).
  • embeddings - the vector containing the embedding for the text
  • uri - the URL pointing to the document
  • title - a displayable (and meaningful) title
  • text - the text chunk itself
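To make the schema concrete, here is a sketch of how rows matching it might be assembled before insertion. The chunk_text helper and the zero-filled embeddings are placeholders of my own (a real pipeline would call an embedding model), and the 2048/512 truncations mirror the max_length values above:

```python
def chunk_text(text, max_len=2048):
    """Naive fixed-size chunker so each piece fits the VARCHAR(2048) 'text'
    field. Placeholder for a proper sentence/paragraph-aware splitter."""
    return [text[i:i + max_len] for i in range(0, len(text), max_len)]

def build_rows(document, title, uri, dim=1024):
    """Turn one document into Milvus-insertable rows. The 'id' key is
    omitted because the schema uses auto_id=True."""
    rows = []
    for chunk in chunk_text(document):
        rows.append({
            "embeddings": [0.0] * dim,  # placeholder: embed(chunk) in reality
            "uri": uri[:512],
            "title": title[:512],
            "text": chunk,
        })
    return rows

# Hypothetical document, title and URL, purely for illustration.
rows = build_rows("some long document " * 200, "Invoice 42", "http://paperless.local/doc/42")
print(len(rows), len(rows[0]["text"]))
```

Rows shaped like this can then be passed to the client's insert call for the collection.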

Note: This schema is in a state of flux right now and can be updated in the future.