pymilvus: Error occurred. Socket closed

以512线程运行milvus搜索向量,运行几个小时后,出现搜索接口不能使用,其他接口正常。

Environment details

  • docker milvusdb/milvus 0.10.1-cpu-d072020-bd02b1
  • pymilvus 0.2.14

code


from milvus import Milvus, IndexType, MetricType, Status
import os

# Connection settings; each value can be overridden through the environment.
MILVUS_HOST = os.getenv("MILVUS_HOST", "127.0.0.1")
# os.getenv returns a str whenever the variable is set, so coerce the numeric
# settings to int; otherwise their type would depend on the environment.
MILVUS_PORT = int(os.getenv("MILVUS_PORT", 19530))
VECTOR_DIMENSION = int(os.getenv("VECTOR_DIMENSION", 2048))


def milvus_client():
    """
    Open a connection to the Milvus server.

    The address comes from the module-level MILVUS_HOST / MILVUS_PORT.

    :return: the ``Milvus`` client on success; ``None`` when creating the
             client (or printing the success message) raised, in which
             case the exception is printed instead of propagated.
    """
    try:
        connection = Milvus(host=MILVUS_HOST, port=MILVUS_PORT)
        print("连接成功。。。")
    except Exception as err:
        print(err)
        return None
    return connection


def create_table(client, table_name=None, dimension=VECTOR_DIMENSION,
                 index_file_size=1024, metric_type=MetricType.L2):
    """
    Create a collection ("table") on the Milvus server.

    :param client: Milvus client
    :param table_name: name of the collection to create
    :param dimension: vector dimension
    :param index_file_size: index file size
    :param metric_type: distance metric, see
        https://www.milvus.io/cn/docs/v0.9.0/guides/metric.md
    :return: the status returned by ``client.create_collection``; ``None``
             when that call raised (the exception is printed)
    """
    schema = dict(
        collection_name=table_name,
        dimension=dimension,
        index_file_size=index_file_size,
        metric_type=metric_type,
    )
    try:
        return client.create_collection(schema)
    except Exception as err:
        print(err)
        return None

def insert_vectors(client, table_name, vectors):
    """
    Insert vectors into an existing collection.

    :param client: Milvus client
    :param table_name: collection name
    :param vectors: feature vectors to insert
    :return: ``(status, ids)`` as returned by ``client.insert``; ``None``
             when the collection does not exist or the insert raised
             (the exception is printed)
    """
    lookup = client.has_collection(collection_name=table_name)
    # has_collection answers with a (status, exists) pair; testing the pair
    # itself is always truthy, so look at the flag. A bare bool is accepted
    # as well in case the client returns one.
    exists = lookup[1] if isinstance(lookup, tuple) else lookup
    if not exists:
        print("collection %s not exist" % table_name)
        return None
    try:
        status, ids = client.insert(collection_name=table_name, records=vectors)
        return status, ids
    except Exception as e:
        print(e)
        return None


def create_index(client, table_name):
    """
    Build an IVF_FLAT index (``nlist`` = 2048) on a collection.

    :param client: Milvus client
    :param table_name: collection name
    :return: the status returned by ``client.create_index``
    """
    index_params = dict(nlist=2048)
    return client.create_index(table_name, IndexType.IVF_FLAT, index_params)


def delete_table(client, table_name):
    """
    Drop a collection and print the resulting status.

    :param client: Milvus client
    :param table_name: name of the collection to drop
    :return: the status returned by ``client.drop_collection``
    """
    outcome = client.drop_collection(collection_name=table_name)
    print(outcome)
    return outcome


def search_vectors(client, table_name, vectors, top_k):
    """
    Run a vector search with ``nprobe`` fixed at 64.

    :param client: Milvus client
    :param table_name: collection name
    :param vectors: query vectors
    :param top_k: number of results requested per query vector
    :return: the ``(status, results)`` pair returned by ``client.search``
    """
    response = client.search(
        collection_name=table_name,
        query_records=vectors,
        top_k=top_k,
        params={'nprobe': 64},
    )
    status, hits = response
    return status, hits


def has_table(client, table_name):
    """
    Ask the server whether a collection exists.

    :param client: Milvus client
    :param table_name: collection name
    :return: whatever ``client.has_collection`` returns, unchanged
    """
    return client.has_collection(collection_name=table_name)


def count_table(client, table_name):
    """
    Return the number of entities (vectors) stored in a collection.

    :param client: Milvus client
    :param table_name: collection name
    :return: the count from ``client.count_entities``; the accompanying
             status is discarded
    """
    _status, total = client.count_entities(collection_name=table_name)
    return total


def get_info(client, table_name):
    """
    Fetch the description (schema) of a collection.

    :param client: Milvus client
    :param table_name: collection name
    :return: the second element returned by
             ``client.get_collection_info``; the status is discarded
    """
    _status, info = client.get_collection_info(collection_name=table_name)
    return info


功能测试

get_info


 # Connect, then print the info returned for the "loan_feature_1012" collection.
 print(get_info(milvus_client(), "loan_feature_1012"))

# 连接成功。。。
# CollectionSchema(collection_name='loan_feature_1012', dimension=25600, index_file_size=1024, metric_type=<MetricType: L2>)

insert


import numpy as np

table_name = "test"

# A single all-ones vector of length 2048.
feature = np.ones((2048,))

# insert() takes the Milvus client as its first argument.
vids = insert(milvus_client(), table_name, feature, 2048)

# 连接成功。。。
# insert into: test
# insert vectors: Status(code=0, message='Add vectors successfully!')
# Train finished
# 数据库: test, 数据总量: 3

search


import numpy as np

table_name = "test"

# A single all-ones query vector of length 2048.
feature = np.ones((2048,))

# search() takes the Milvus client as its first argument.
vids = search(milvus_client(), table_name, feature)

连接成功。。。
Addr [127.0.0.1:19530] search
Rpc error: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket closed"
        debug_error_string = "{"created":"@1602658342.190640049","description":"Error received from peer ipv4:127.0.0.1:19530","file":"src/core/lib/surface/call.cc","file_line":1061,"grpc_message":"Socket closed","grpc_status":14}"
>
        {'API start': '2020-10-14 14:30:56.707296', 'RPC start': '2020-10-14 14:30:56.707487', 'RPC error': '2020-10-14 14:52:22.190905'}
Status(code=<StatusCode.UNAVAILABLE: (14, 'unavailable')>, message='Error occurred. Socket closed')

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 23 (14 by maintainers)

Most upvoted comments

This is a server-side bug. I have created an issue in the milvus project: https://github.com/milvus-io/milvus/issues/4031

docker run command:


docker run -d --name milvus_cpu \
    --net host \
    -v /home/dm/milvus/db:/var/lib/milvus/db \
    -v /home/dm/milvus/conf:/var/lib/milvus/conf \
    -v /home/dm/milvus/logs:/var/lib/milvus/logs \
    -v /home/dm/milvus/wal:/var/lib/milvus/wal \
    milvusdb/milvus:0.10.1-cpu-d072020-bd02b1

python code


#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# File    :   utils.py
# Time    :   2020/05/27 11:05:54
# Author  :   seven 
# Desc    :   milvus管理脚本
from milvus import Milvus, IndexType, MetricType, Status
import os

# Connection settings; each value can be overridden through the environment.
MILVUS_HOST = os.getenv("MILVUS_HOST", "127.0.0.1")
# os.getenv returns a str whenever the variable is set, so coerce the numeric
# settings to int; otherwise their type would depend on the environment.
MILVUS_PORT = int(os.getenv("MILVUS_PORT", 19530))
VECTOR_DIMENSION = int(os.getenv("VECTOR_DIMENSION", 2048))


def milvus_client():
    """
    Open a connection to the Milvus server.

    The address comes from the module-level MILVUS_HOST / MILVUS_PORT.

    :return: the ``Milvus`` client on success; ``None`` when creating the
             client (or printing the success message) raised, in which
             case the exception is printed instead of propagated.
    """
    try:
        connection = Milvus(host=MILVUS_HOST, port=MILVUS_PORT)
        print("连接成功。。。")
    except Exception as err:
        print(err)
        return None
    return connection


def create_table(client, table_name=None, dimension=VECTOR_DIMENSION,
                 index_file_size=1024, metric_type=MetricType.L2):
    """
    Create a collection ("table") on the Milvus server.

    :param client: Milvus client
    :param table_name: name of the collection to create
    :param dimension: vector dimension
    :param index_file_size: index file size
    :param metric_type: distance metric, see
        https://www.milvus.io/cn/docs/v0.9.0/guides/metric.md
    :return: the status returned by ``client.create_collection``; ``None``
             when that call raised (the exception is printed)
    """
    schema = dict(
        collection_name=table_name,
        dimension=dimension,
        index_file_size=index_file_size,
        metric_type=metric_type,
    )
    try:
        return client.create_collection(schema)
    except Exception as err:
        print(err)
        return None


def insert_vectors(client, table_name, vectors):
    """
    Insert vectors into an existing collection.

    :param client: Milvus client
    :param table_name: collection name
    :param vectors: feature vectors to insert
    :return: ``(status, ids)`` as returned by ``client.insert``; ``None``
             when the collection does not exist or the insert raised
             (the exception is printed)
    """
    lookup = client.has_collection(collection_name=table_name)
    # has_collection answers with a (status, exists) pair; testing the pair
    # itself is always truthy, so look at the flag. A bare bool is accepted
    # as well in case the client returns one.
    exists = lookup[1] if isinstance(lookup, tuple) else lookup
    if not exists:
        print("collection %s not exist" % table_name)
        return None
    try:
        status, ids = client.insert(collection_name=table_name, records=vectors)
        return status, ids
    except Exception as e:
        print(e)
        return None


def create_index(client, table_name):
    """
    Build an IVF_FLAT index (``nlist`` = 2048) on a collection.

    :param client: Milvus client
    :param table_name: collection name
    :return: the status returned by ``client.create_index``
    """
    index_params = dict(nlist=2048)
    return client.create_index(table_name, IndexType.IVF_FLAT, index_params)


def delete_table(client, table_name):
    """
    Drop a collection and print the resulting status.

    :param client: Milvus client
    :param table_name: name of the collection to drop
    :return: the status returned by ``client.drop_collection``
    """
    outcome = client.drop_collection(collection_name=table_name)
    print(outcome)
    return outcome


def search_vectors(client, table_name, vectors, top_k):
    """
    Run a vector search with ``nprobe`` fixed at 64.

    :param client: Milvus client
    :param table_name: collection name
    :param vectors: query vectors
    :param top_k: number of results requested per query vector
    :return: the ``(status, results)`` pair returned by ``client.search``
    """
    response = client.search(
        collection_name=table_name,
        query_records=vectors,
        top_k=top_k,
        params={'nprobe': 64},
    )
    status, hits = response
    return status, hits


def has_table(client, table_name):
    """
    Ask the server whether a collection exists.

    :param client: Milvus client
    :param table_name: collection name
    :return: whatever ``client.has_collection`` returns, unchanged
    """
    return client.has_collection(collection_name=table_name)


def count_table(client, table_name):
    """
    Return the number of entities (vectors) stored in a collection.

    :param client: Milvus client
    :param table_name: collection name
    :return: the count from ``client.count_entities``; the accompanying
             status is discarded
    """
    _status, total = client.count_entities(collection_name=table_name)
    return total


def get_info(client, table_name):
    """
    Fetch the description (schema) of a collection.

    :param client: Milvus client
    :param table_name: collection name
    :return: the second element returned by
             ``client.get_collection_info``; the status is discarded
    """
    _status, info = client.get_collection_info(collection_name=table_name)
    return info


def search(index_client, table_name, feature, top_k=5):
    """
    Search a collection for the nearest neighbours of one feature vector.

    :param index_client: Milvus client
    :param table_name: collection name
    :param feature: query feature; must provide ``tolist()``
    :param top_k: number of results to request
    :return: ``{"distance": [...], "ids": [...]}`` built from the hits of
             the single query vector
    """
    query_batch = [feature.tolist()]  # the search call takes a list of vectors
    status, hits_per_query = search_vectors(index_client, table_name, query_batch, top_k=top_k)
    print(status)
    return {
        "distance": [hit.distance for hit in hits_per_query[0]],
        "ids": [hit.id for hit in hits_per_query[0]],
    }


def insert(index_client, table_name, vectors, dimension):
    """
    Store one feature vector in Milvus, creating the collection if needed.

    After the insert, the index is (re)built and the collection's entity
    count is printed.

    :param index_client: Milvus client
    :param table_name: collection name
    :param vectors: the feature to store; must provide ``tolist()``
    :param dimension: vector dimension used if the collection is created
    :return: the ids returned by the insert
    """
    status, exists = has_table(index_client, table_name)
    if not exists:
        print("create table.")
        create_table(index_client, table_name=table_name, dimension=dimension)

    print("insert into:", table_name)
    records = [vectors.tolist()]  # the insert call takes a list of vectors
    status, ids = insert_vectors(index_client, table_name, records)
    print(f"insert vectors: {status}")

    create_index(index_client, table_name)
    print("Train finished")

    number = count_table(index_client, table_name)
    print(f"数据库: {table_name}, 数据总量: {number}")
    return ids


if __name__ == '__main__':
    import numpy as np

    # Smoke test: each step opens its own connection via milvus_client().
    table_name = "test"
    feature = np.ones(2048)
    print(get_info(milvus_client(), "loan_feature_1012"))
    vids = insert(milvus_client(), table_name, feature, 2048)
    res = search(milvus_client(), table_name, feature)
    

连接成功。。。
CollectionSchema(collection_name='loan_feature_1012', dimension=25600, index_file_size=1024, metric_type=<MetricType: L2>)
连接成功。。。
insert into: test
insert vectors: Status(code=0, message='Add vectors successfully!')
Train finished
数据库: test, 数据总量: 6
连接成功。。。

其他接口是正常的,就是搜索接口一直堵塞,没有反应!重启server容器后就正常

造成这种现象的原因是:我用大小为512的线程池对大约20万张图片进行特征搜索匹配,几个小时后任务出现堵塞;经过排查是search接口堵塞,但其余接口可以正常访问,docker日志里也没有错误日志。

So far, 600,000 queries have finished and the test is still running; I will let it run overnight.