Prerequisites
Before starting work, I made sure I had the following items in place:
- Ollama - installed on Windows, just to eliminate the headaches of making an NVIDIA card available from Docker. I also made sure I have llama3:latest installed via ollama pull llama3
- Milvus via docker compose. I know this limits its capabilities (GPU access), but it's OK for experimentation
- The Milvus Python SDK (pymilvus) to be able to work with it. It's a shame it doesn't have a Rust SDK natively :(
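Before going further, a quick sanity check that both services are reachable can save some time. This is a minimal sketch only; it assumes Ollama on its default port 11434 and Milvus on 19530, both on localhost, so adjust the hosts to your setup:
import requests
from pymilvus import MilvusClient

OLLAMA_URL = "http://localhost:11434"   # assumed default Ollama port
MILVUS_URI = "http://localhost:19530"   # adjust to where docker compose exposes Milvus

# Ollama lists the locally pulled models at /api/tags
models = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5).json()
print("Ollama models:", [m["name"] for m in models.get("models", [])])

# If Milvus is reachable, listing collections succeeds (possibly with an empty result)
client = MilvusClient(uri=MILVUS_URI)
print("Milvus collections:", client.list_collections())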
Initialise the database
Milvus has a lot of concepts you should get familiar with: database, schema, index, roles and so on. However, you can get away with just collections, the client and the concept of an index. In my mind, I made the following equivalences with a relational database:
- MilvusClient - the Python client for the DB, the same way PostgreSQL has psycopg2
- Collection - a very special table
- CollectionSchema - the table description (fields)
- FieldSchema - the field description
- index_params - the definition of indexes for that collection
In other words, the Collection is a table whose layout is defined by a CollectionSchema. Each FieldSchema represents a column of the CollectionSchema, and different FieldSchemas can be indexed.
Note: You need to index the vector field (the one(s) with the type FLOAT_VECTOR). This is a prerequisite for loading (i.e. being able to use) a collection.
Once these concepts had crystallised, I started by creating a client connected to the URI pointing to my Docker install:
uri = "http://10.10.0.10:19530"
client = MilvusClient(uri)
Then, I defined a minimal schema:
name = "my_schema"
dimensions = 1024  # must match the size of the embeddings produced by the embedding model
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dimensions)
]
schema = CollectionSchema(fields, name)
... and the indexes (see here for more details on preparing indexes):
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="embeddings",
    index_name="_embeddings",
    index_type="IVF_FLAT",
    metric_type="L2",
    params={"nlist": 1024}
)
index_params.add_index(
    index_name="_id",
    field_name="id",
    index_type="STL_SORT"
)
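An IVF_FLAT index clusters the vectors into nlist buckets, and at query time nprobe controls how many buckets are scanned. As a hedged sketch of what a later search against this index could look like (query_vector stands in for an embedding produced elsewhere, and the collection has to be created and loaded first):
# Illustrative only: how the IVF_FLAT index is used at search time.
# query_vector is a placeholder for a 1024-dim embedding from the embedding model.
results = client.search(
    collection_name=name,
    data=[query_vector],
    limit=5,
    search_params={"metric_type": "L2", "params": {"nprobe": 16}},
)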
Once this setup is done, I created the collection and indexes:
client.create_collection(
    collection_name=name,
    schema=schema,
    consistency_level="Strong"
)
client.create_index(collection_name=name, index_params=index_params)
Optionally, you can add a bit of code to wait until the indexes are created:
connections.connect(uri=uri)  # the utility helpers use the ORM connection, not the MilvusClient
utility.wait_for_index_building_complete(
    collection_name=name,
    index_name="_embeddings",
)
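With the index built, the collection can also be loaded, which (per the note above) is what makes it usable for searches. A minimal sketch with the same client:
# Load the collection so it can serve searches/queries.
client.load_collection(collection_name=name)
# Check that it is actually loaded.
print(client.get_load_state(collection_name=name))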
My code looks like this now:
from pymilvus import MilvusClient, FieldSchema, CollectionSchema, DataType, utility, connections

COLLECTION_NAME = "test_collection"
EMBEDDING_DIM = 1024

def init_collection(uri="http://10.0.0.35:19530", name=COLLECTION_NAME, dimensions=EMBEDDING_DIM, schema=None):
    # Create the collection (and its indexes) if it does not exist yet.
    client = MilvusClient(uri=uri)
    if schema is None:
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dimensions)
        ]
        schema = CollectionSchema(fields, name)
    if not client.has_collection(name):
        index_params = client.prepare_index_params()
        index_params.add_index(
            field_name="embeddings",
            index_name="_embeddings",
            index_type="IVF_FLAT",
            metric_type="L2",
            params={"nlist": 1024}
        )
        index_params.add_index(
            index_name="_id",
            field_name="id",
            index_type="STL_SORT"
        )
        client.create_collection(
            collection_name=name,
            schema=schema,
            consistency_level="Strong"
        )
        client.create_index(collection_name=name, index_params=index_params)
    print(client.list_indexes(collection_name=name))
    return client

def reset_collection(uri="http://10.0.0.35:19530", name=COLLECTION_NAME, dimensions=EMBEDDING_DIM, schema=None):
    # Drop the collection if it exists, then recreate it.
    client = MilvusClient(uri=uri)
    if client.has_collection(name):
        client.drop_collection(name)
    return init_collection(uri=uri, name=name, dimensions=dimensions, schema=schema)

if __name__ == "__main__":
    client = reset_collection()
    print(client.get_collection_stats(COLLECTION_NAME))
    # The utility helpers use the ORM connection, so connect explicitly to the same server
    connections.connect(uri="http://10.0.0.35:19530")
    utility.wait_for_index_building_complete(
        collection_name=COLLECTION_NAME,
        index_name="_embeddings",
    )
It has the following characteristics:
- Defines a function to reset a collection (drop and recreate)
- Defines a function to create a collection if not present
- Both functions accept custom URIs (to connect to different Milvus servers), collection names, schemas and dimensions (see the sketch after this list)
- The main block can be used to create the default collection (an ID field and a vector embedding field), and it waits until the index for the embedding has been created.
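For example, pointing the helper at a different Milvus server, collection name and embedding size is just a matter of arguments. The values below are made up for illustration:
# Hypothetical values, for illustration only.
client = init_collection(
    uri="http://localhost:19530",
    name="recipes",
    dimensions=768,
)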
Real-world-ish schema
My documents are stored in paperless-ngx right now, so they have URIs, tags, document types and other metadata. My schema right now is:
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=1024),
    FieldSchema(name="uri", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2048),
]
Fields are:
- id - the unique ID for the record. It is auto-generated (auto_id=True), because it's difficult to keep track of the last ID obtained (since I'll most likely launch this multiple times to index different documents)
- text - the text (chunk)
- embeddings - the vector containing the embedding for the text
- title - a displayable (and meaningful) title
- uri - the URL pointing to that document
Note: This schema is in a state of flux right now and can be updated in the future.
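To get a feel for how a record maps onto this schema, an insert could look roughly like the sketch below. The values are made up, get_embedding is a placeholder for whatever produces the 1024-dimensional vector (e.g. via Ollama), and id is omitted because auto_id=True generates it:
# Illustrative insert against the schema above.
# get_embedding() is a placeholder for the function that produces the 1024-dim vector.
doc = {
    "embeddings": get_embedding("First page of my home insurance policy"),
    "uri": "http://paperless.local/documents/42/",  # made-up URL
    "title": "Home insurance policy",
    "text": "First page of my home insurance policy",
}
client.insert(collection_name=COLLECTION_NAME, data=[doc])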