Module utilities.milvus_utilities

Expand source code
import numpy as np
import os
from shutil import copyfile
from milvus import Milvus, IndexType, MetricType, Status

def create_collection(client, collection_name, embedding_dim, reset=False):
    """Creates a milvus collection.

    Args:
        client (object): milvus client.
        collection_name (str): given name for the collection to create.
        embedding_dim (int): dimensionality of the vectors to be hosted in the collection.
        reset (bool, optional): If True, the collection will be removed and re-created if it already exists. Defaults to False.

    Returns:
        None
    """    
    status, ok = client.has_collection(collection_name)
    param = {
        'collection_name': collection_name,
        'dimension': embedding_dim,
        'metric_type': MetricType.L2  # optional
    }
    if ok:
        print("Collection already exists!")
        if reset:
            print("Resetting collection...")
            status = client.drop_collection(collection_name)
            client.create_collection(param)
            print("Succesfully created collection!")
    else:
        client.create_collection(param)
        print("Succesfully created collection!")
    return None


def insert_embeddings(client, collection_name, embedding_vectors, buffer_size=256):
    """Given a milvus client, the embedding_vectors will be inserted into the given collection.

    Args:
        client (object): milvus client.
        collection_name (str): name of the collection.
        embedding_vectors (np.array): numpy array of vectors to insert into the collection.
        buffer_size (int, optional): buffer size specified in the server_config.yaml file. Defaults to 256.

    Returns:
        list: milvus ids of all inserted vectors.
    """    
    embedding_size_mb = embedding_vectors.nbytes * 1e-6
    if embedding_size_mb > buffer_size:
        chunks = np.ceil(embedding_size_mb/buffer_size)
        print("Warning: Embeddings size are above the buffer size. Will insert recursively.")
        array_chunks = np.array_split(embedding_vectors, chunks)
        
        all_ids = []
        for i in array_chunks:
            status, ids = client.insert(collection_name=collection_name, records=i)
            if not status.OK():
                print("Insert failed: {}".format(status))  
                raise
            else:
                print("Insertion succesfull.")
                all_ids.extend(ids)
    else:
        status, all_ids = client.insert(collection_name=collection_name, records=embedding_vectors)
        if not status.OK():
            print("Insert failed: {}".format(status))
            raise
        else:
            print("Insertion succesfull.")
    return all_ids


def download_nearest_files(results, inventory, path):
    """Downloads the nearest neighbor files for a given result.

    The inventory argument must be a pandas DataFrame and must at least contain two features named
    image_path and milvus_id. The former representing the path to the images in the filesystem and 
    the latter representing the assigned milvus ids.

    Args:
        results (milvus.client.abstract.TopKQueryResult): resulting object from milvus query.
        inventory (pd.DataFrame): Dataframe containing the image inventory. Read above for more information.
        path (str): Path-like string indicating directory where the files will be saved to.

    Returns:
        None
    """    
    if not os.path.isdir(path):
        os.makedirs(path)

    for i in results.id_array[0]:
        resulting_df = inventory[inventory.milvus_ids == i]
        image_path = resulting_df.image_path.values[0]
        image_name = os.path.basename(image_path)
        new_path = os.path.join(path, image_name)
        copyfile(image_path, new_path)
    return None

Functions

def create_collection(client, collection_name, embedding_dim, reset=False)

Creates a milvus collection.

Args

client : object
milvus client.
collection_name : str
given name for the collection to create.
embedding_dim : int
dimensionality of the vectors to be hosted in the collection.
reset : bool, optional
If True, the collection will be removed and re-created if it already exists. Defaults to False.

Returns

None

Expand source code
def create_collection(client, collection_name, embedding_dim, reset=False):
    """Creates a milvus collection.

    Args:
        client (object): milvus client.
        collection_name (str): given name for the collection to create.
        embedding_dim (int): dimensionality of the vectors to be hosted in the collection.
        reset (bool, optional): If True, the collection will be removed and re-created if it already exists. Defaults to False.

    Returns:
        None
    """    
    status, ok = client.has_collection(collection_name)
    param = {
        'collection_name': collection_name,
        'dimension': embedding_dim,
        'metric_type': MetricType.L2  # optional
    }
    if ok:
        print("Collection already exists!")
        if reset:
            print("Resetting collection...")
            status = client.drop_collection(collection_name)
            client.create_collection(param)
            print("Succesfully created collection!")
    else:
        client.create_collection(param)
        print("Succesfully created collection!")
    return None
def download_nearest_files(results, inventory, path)

Downloads the nearest neighbor files for a given result.

The inventory argument must be a pandas DataFrame and must at least contain two features named image_path and milvus_id. The former representing the path to the images in the filesystem and the latter representing the assigned milvus ids.

Args

results : milvus.client.abstract.TopKQueryResult
resulting object from milvus query.
inventory : pd.DataFrame
Dataframe containing the image inventory. Read above for more information.
path : str
Path-like string indicating directory where the files will be saved to.

Returns

None

Expand source code
def download_nearest_files(results, inventory, path):
    """Downloads the nearest neighbor files for a given result.

    The inventory argument must be a pandas DataFrame and must at least contain two features named
    image_path and milvus_id. The former representing the path to the images in the filesystem and 
    the latter representing the assigned milvus ids.

    Args:
        results (milvus.client.abstract.TopKQueryResult): resulting object from milvus query.
        inventory (pd.DataFrame): Dataframe containing the image inventory. Read above for more information.
        path (str): Path-like string indicating directory where the files will be saved to.

    Returns:
        None
    """    
    if not os.path.isdir(path):
        os.makedirs(path)

    for i in results.id_array[0]:
        resulting_df = inventory[inventory.milvus_ids == i]
        image_path = resulting_df.image_path.values[0]
        image_name = os.path.basename(image_path)
        new_path = os.path.join(path, image_name)
        copyfile(image_path, new_path)
    return None
def insert_embeddings(client, collection_name, embedding_vectors, buffer_size=256)

Given a milvus client, the embedding_vectors will be inserted into the given collection.

Args

client : object
milvus client.
collection_name : str
name of the collection.
embedding_vectors : np.array
numpy array of vectors to insert into the collection.
buffer_size : int, optional
buffer size specified in the server_config.yaml file. Defaults to 256.

Returns

list
milvus ids of all inserted vectors.
Expand source code
def insert_embeddings(client, collection_name, embedding_vectors, buffer_size=256):
    """Given a milvus client, the embedding_vectors will be inserted into the given collection.

    Args:
        client (object): milvus client.
        collection_name (str): name of the collection.
        embedding_vectors (np.array): numpy array of vectors to insert into the collection.
        buffer_size (int, optional): buffer size specified in the server_config.yaml file. Defaults to 256.

    Returns:
        list: milvus ids of all inserted vectors.
    """    
    embedding_size_mb = embedding_vectors.nbytes * 1e-6
    if embedding_size_mb > buffer_size:
        chunks = np.ceil(embedding_size_mb/buffer_size)
        print("Warning: Embeddings size are above the buffer size. Will insert recursively.")
        array_chunks = np.array_split(embedding_vectors, chunks)
        
        all_ids = []
        for i in array_chunks:
            status, ids = client.insert(collection_name=collection_name, records=i)
            if not status.OK():
                print("Insert failed: {}".format(status))  
                raise
            else:
                print("Insertion succesfull.")
                all_ids.extend(ids)
    else:
        status, all_ids = client.insert(collection_name=collection_name, records=embedding_vectors)
        if not status.OK():
            print("Insert failed: {}".format(status))
            raise
        else:
            print("Insertion succesfull.")
    return all_ids