greptimedb: [bug] Possible memory leak when ingesting Nginx access logs generated by Vector at 100 QPS, regardless of whether the data is stored as metrics or logs

What type of bug is this?

Performance issue

What subsystems are affected?

Standalone mode

Minimal reproduce step

// BulkInsert writes one multi-row INSERT per call. db, Message and the imports
// come from the full program shown further below in the comments.
func BulkInsert(data []Message) error {
	if len(data) == 0 {
		return nil // guard: an empty VALUES list would be a syntax error
	}
	logs.Info("log batch length", len(data))
	tx, err := db.Begin()
	if err != nil {
		log.Println("Error beginning transaction:", err)
		return err
	}
	defer tx.Rollback()

	stmt, err := tx.Prepare("INSERT INTO log_test (host, datetime, method, request, protocol, bytes, referer) VALUES (?, ?, ?, ?, ?, ?, ?)")
	if err != nil {
		log.Println("Error preparing statement:", err)
		return err
	}
	defer stmt.Close()

	// Flatten all rows into a single argument slice for the batched insert.
	values := []interface{}{}
	for _, row := range data {
		values = append(values, row.Host, row.Datetime, row.Method, row.Request, row.Protocol, row.Bytes, row.Referer)
	}

	query := "INSERT INTO log_test (host, datetime, method, request, protocol, bytes, referer) VALUES "
	placeholders := "(?, ?, ?, ?, ?, ?, ?)"
	queryValues := []string{}

	for i := 0; i < len(data); i++ {
		queryValues = append(queryValues, placeholders)
	}

	query += strings.Join(queryValues, ",")

	_, err = tx.Exec(query, values...)
	if err != nil {
		log.Println("Error executing bulk insert:", err)
		return err
	}

	err = tx.Commit()
	if err != nil {
		log.Println("Error committing transaction:", err)
		return err
	}

	return nil
}
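
A hypothetical driver for the function above, to reproduce the write pattern without Vector or gin: it pushes one 200-row batch of synthetic Nginx-style rows every two seconds (about 100 rows per second). It assumes db has already been opened against GreptimeDB's MySQL port, as InitDB does in the full program further below; the field values are made up.

func driveBulkInsert() {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		// Build a synthetic 200-row batch, mirroring the batchSize used later.
		batch := make([]Message, 0, 200)
		for i := 0; i < 200; i++ {
			batch = append(batch, Message{
				Host:     "10.0.0.1",
				Datetime: time.Now().Format("2006-01-02 15:04:05"),
				Method:   "GET",
				Request:  "/index.html",
				Protocol: "HTTP/1.1",
				Bytes:    512,
				Referer:  "-",
			})
		}
		if err := BulkInsert(batch); err != nil {
			log.Println("bulk insert failed:", err)
		}
	}
}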

What did you expect to see?

Memory usage should stay within a bounded range instead of growing continuously. If it keeps growing like this, the process won't run for more than a few hours before memory is exhausted.

What did you see instead?

Within a few dozen minutes, memory grew from a few dozen MB at startup to over 3 GB, and it keeps climbing without stopping. After shutting GreptimeDB down with docker-compose down, CPU usage drops, but the memory is not released. Insert performance is also much worse than MySQL and far below PostgreSQL.

What operating system did you use?

Ubuntu 18.04

Relevant log output and stack trace

There appears to be a memory leak: Vector generates the logs at roughly 100 QPS, the rows are inserted with batched SQL, and memory keeps growing.
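
To quantify that growth, the small stand-alone sketch below (not part of the original report) polls the resident set size of the greptimedb process from /proc/<pid>/status on the Ubuntu host; the process ID is passed as the only argument, and the 5-second sampling interval is arbitrary.

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"time"
)

func main() {
	if len(os.Args) != 2 {
		fmt.Println("usage: rssmon <greptimedb-pid>")
		return
	}
	pid := os.Args[1]
	for {
		// /proc/<pid>/status contains a "VmRSS:" line with the resident set size.
		f, err := os.Open("/proc/" + pid + "/status")
		if err != nil {
			fmt.Println("open status:", err)
			return
		}
		sc := bufio.NewScanner(f)
		for sc.Scan() {
			if strings.HasPrefix(sc.Text(), "VmRSS:") {
				fmt.Println(time.Now().Format(time.RFC3339), sc.Text())
			}
		}
		f.Close()
		time.Sleep(5 * time.Second)
	}
}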

About this issue

  • Original URL
  • State: closed
  • Created 6 months ago
  • Comments: 23 (8 by maintainers)

Most upvoted comments

CREATE TABLE IF NOT EXISTS "log" (
  "host" STRING NULL,
  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
  "func" STRING NULL,
  "log_level" STRING NULL,
  "message" STRING NULL,
  "service" STRING NULL,
  TIME INDEX ("ts"),
  PRIMARY KEY ("host")
)

package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/didi/gendry/manager"
	"github.com/gin-gonic/gin"
	_ "github.com/go-sql-driver/mysql"
)

var (
	db *sql.DB
	ch = make(chan *Message, 1000)
)

func InitDB() {
	var (
		err error
	)
	db, err = manager.New("public", "", "", "192.168.0.220").Set(
		manager.SetCharset("utf8"),
		manager.SetAllowCleartextPasswords(true),
		manager.SetInterpolateParams(true),
		manager.SetTimeout(30*time.Second),
		manager.SetReadTimeout(30*time.Second),
	).Port(4002).Open(true)
	if err != nil {
		panic(err)
	}

}
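
// For comparison only (hypothetical sketch, not in the original report): the
// same connection opened with plain database/sql and a go-sql-driver/mysql DSN,
// mirroring the gendry options above (empty user/password, as in InitDB).
func InitDBPlain() {
	var err error
	db, err = sql.Open("mysql",
		"tcp(192.168.0.220:4002)/public?charset=utf8&allowCleartextPasswords=true&interpolateParams=true&timeout=30s&readTimeout=30s")
	if err != nil {
		panic(err)
	}
}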

type Message struct {
	Host     string
	Datetime string
	Method   string
	Request  string
	Protocol string
	Bytes    int
	Referer  string
}

type Response struct {
	Msg    string      `json:"msg"`
	Status int         `json:"status"`
	Data   interface{} `json:"data"`
}

// BulkInsert writes one multi-row INSERT per batch handed over by BatchTicker.
func BulkInsert(data []*Message) error {
	if len(data) == 0 {
		return nil // guard: an empty VALUES list would be a syntax error
	}
	logs.Info("log batch length", len(data))
	tx, err := db.Begin()
	if err != nil {
		fmt.Println("Error beginning transaction:", err)
		return err
	}
	defer tx.Rollback()

	stmt, err := tx.Prepare("INSERT INTO log (host, datetime, method, request, protocol, bytes, referer) VALUES (?, ?, ?, ?, ?, ?, ?)")
	if err != nil {
		fmt.Println("Error preparing statement:", err)
		return err
	}
	defer stmt.Close()

	// Flatten all rows into a single argument slice for the batched insert.
	values := []interface{}{}
	for _, row := range data {
		values = append(values, row.Host, row.Datetime, row.Method, row.Request, row.Protocol, row.Bytes, row.Referer)
	}

	query := "INSERT INTO log (host, datetime, method, request, protocol, bytes, referer) VALUES "
	placeholders := "(?, ?, ?, ?, ?, ?, ?)"
	queryValues := []string{}

	for i := 0; i < len(data); i++ {
		queryValues = append(queryValues, placeholders)
	}

	query += strings.Join(queryValues, ",")

	_, err = tx.Exec(query, values...)
	if err != nil {
		fmt.Println("Error executing bulk insert:", err)
		return err
	}

	err = tx.Commit()
	if err != nil {
		fmt.Println("Error committing transaction:", err)
		return err
	}

	return nil
}

func BatchTicker[T any](ctx context.Context, duration time.Duration, ch chan T, batchSize int, fn func([]T)) {
	ticker := time.NewTicker(duration)
	defer ticker.Stop()
	var batch = make([]T, 0, batchSize)
	for {
		select {
		case <-ctx.Done():
			if len(batch) > 0 {
				fn(batch)
			}
			return
		case v, ok := <-ch:
			if !ok { // channel closed: flush any remaining rows and stop
				if len(batch) > 0 {
					fn(batch)
				}
				return
			}
			batch = append(batch, v)
			if len(batch) == batchSize { // full
				fn(batch)
				batch = make([]T, 0, batchSize) // reset
			}
		case <-ticker.C:
			if len(batch) > 0 {
				fn(batch)
				batch = make([]T, 0, batchSize) // reset
			}
		}
	}
}
func main() {

	InitDB()
	r := gin.New()
	r.Use(gin.Recovery())
	r.Use(gin.Logger())
	go BatchTicker(context.Background(), 2*time.Second, ch, 200, func(data []*Message) {
		err := BulkInsert(data)
		if err != nil {
			logs.Error(err)
		}
	})
	r.POST("/receive", func(c *gin.Context) {


		list := make([]*Message, 0)
		err := c.BindJSON(&list)
		if err != nil {
			fmt.Println(err.Error())
			c.JSON(400, Response{
				Msg:    "invalid request payload",
				Status: -1,
			})
			return
		}
		for _, request := range list {
			ch <- request
		}

		c.JSON(200, gin.H{})
	})
	r.Run(":9100")
}

Every optimization I could think of has been applied, yet performance is still very poor. At 100 QPS the program inserts about 100 rows every 2 seconds. With Go's native database/sql, memory climbed from 80 MB to 4.5 GB within 40 minutes, rising by several MB per second. Four months have passed without any performance improvement, which is disappointing. I filed an issue before: in my benchmarks, standalone insert and query performance is far below MySQL, the gap with PostgreSQL is even larger, and memory usage is higher than MySQL's. I tried to push GreptimeDB at my company again precisely because it supports both SQL and PromQL and can store logs and metrics together for unified queries. I hope the memory and query-speed problems get serious attention; without that, it is simply not usable.
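
For anyone trying to reproduce this without Vector, the sketch below (an illustrative stand-in with made-up field values, not part of the original setup) posts batches of synthetic Nginx-style messages to the /receive endpoint of the program above at roughly 100 messages per second.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type Message struct {
	Host     string
	Datetime string
	Method   string
	Request  string
	Protocol string
	Bytes    int
	Referer  string
}

func main() {
	// Send 10 messages every 100 ms, i.e. roughly 100 messages per second.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		batch := make([]Message, 0, 10)
		for i := 0; i < 10; i++ {
			batch = append(batch, Message{
				Host:     "10.0.0.1",
				Datetime: time.Now().Format("2006-01-02 15:04:05"),
				Method:   "GET",
				Request:  "/index.html",
				Protocol: "HTTP/1.1",
				Bytes:    512,
				Referer:  "-",
			})
		}
		body, _ := json.Marshal(batch)
		resp, err := http.Post("http://127.0.0.1:9100/receive", "application/json", bytes.NewReader(body))
		if err != nil {
			fmt.Println("post failed:", err)
			continue
		}
		resp.Body.Close()
	}
}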