badger: Discard invalid versions of keys during compaction

I’m hoping this is a configuration-related issue, but I’ve played around with the settings and I keep getting the same behavior. Tested on an i3.4XL in AWS, RAID0 across the two SSD drives.

Expected behavior of the code below:

  • keys/data are stored for 1hr, after a few hours the badger directory should stay fairly constant as you write/expire keys
  • I would expect to see sst files written and multiple size levels each level a larger file size
  • memory should stay fairly consistent

Actual behavior seen:

  • OOM’s after 12 hours
  • all sst files at 67MB (thousands of them)
  • disk fills up on a 4TB drive, no data appears to ttl out
  • file counts steadily increase until oom (there’s no leveling off)
  • every hour the process stalls (assuming the stall count is being hit according to profiler)

Please advise on what is wrong in the code below — thanks!

3HRs of runtime you can see just linear growth https://imgur.com/a/2UUfIrG

UPDATE: I’ve also tried the settings below. Memory doesn’t grow as fast, but it still climbs linearly until OOM, with the same behavior as above.

dir := "/raid0/badgertest"
opts := badger.DefaultOptions
opts.Dir = dir
opts.ValueDir = dir
opts.TableLoadingMode = options.FileIO
opts.SyncWrites = false
db, err := badger.Open(opts)
package usecases

import (
	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/options"
	"time"
	"fmt"
	"encoding/binary"
	"github.com/spaolacci/murmur3"
	"path/filepath"
	"os"
	"github.com/Sirupsen/logrus"
)

// writable is a single key/value pair queued for insertion into badger.
type writable struct {
	key   []byte // 24-byte key built in write(): timestamp | murmur3 | counter
	value []byte // JSON payload (same fixed document for every row in this test)
}


// BadgerTest drives a soak test against a single badger database:
// several writer goroutines plus a periodic GC/purge loop.
type BadgerTest struct {
	db *badger.DB // shared handle; badger.DB is safe for concurrent use
}

// NewBadgerTest opens a badger database under /raid0/badgertest with the
// soak-test tuning used in this issue (memory-mapped tables, a single
// compactor, relaxed level-zero limits, async writes) and starts a
// background goroutine that reports .sst/.vlog file counts once a minute.
// It panics if the database cannot be opened.
func NewBadgerTest() *BadgerTest {
	dir := "/raid0/badgertest"

	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	opts.TableLoadingMode = options.MemoryMap
	opts.NumCompactors = 1
	opts.NumLevelZeroTables = 20
	opts.NumLevelZeroTablesStall = 50
	opts.SyncWrites = false

	db, err := badger.Open(opts)
	if err != nil {
		panic(fmt.Sprintf("unable to open badger db; %s", err))
	}

	bt := &BadgerTest{db: db}
	go bt.filecounts(dir)
	return bt
}

// Start launches four concurrent writer goroutines plus the background
// GC loop. It returns immediately; the goroutines run until the process
// exits (there is no per-goroutine stop signal in this test harness).
func (b *BadgerTest) Start() {
	const workers = 4
	for i := 0; i < workers; i++ {
		go b.write()
	}
	go b.badgerGC()
}
// Stop closes the database and then sleeps briefly, presumably to let
// in-flight log output flush before the process exits — TODO confirm;
// note the writer goroutines are NOT stopped and will keep erroring
// against the closed DB.
func (b *BadgerTest) Stop() {

	b.db.Close()
	logrus.Infof("shut down badger test")
	time.Sleep(1 * time.Second)
}

// filecounts walks dir once a minute and logs how many .sst (LSM table)
// and .vlog (value log) files badger currently keeps on disk. It runs
// until the process exits.
//
// Fixes over the original: the Walk callback now propagates its incoming
// err instead of silently ignoring it (dereferencing a path after a walk
// error risks counting garbage), the walk's own error is logged rather
// than discarded, `for _ = range` is the idiomatic `for range`, and the
// ticker is stopped on return.
func (b *BadgerTest) filecounts(dir string) {

	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		logFiles := int64(0)
		sstFiles := int64(0)
		err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				// Propagate walk errors (permission, races with compaction)
				// instead of counting past them.
				return err
			}
			switch filepath.Ext(path) {
			case ".sst":
				sstFiles++
			case ".vlog":
				logFiles++
			}
			return nil
		})
		if err != nil {
			logrus.Errorf("unable to walk %s; %s", dir, err)
		}

		logrus.Infof("updated gauges vlog=%d sst=%d", logFiles, sstFiles)
	}

}

// write generates synthetic rows forever and flushes them to badger in
// batches of batchSize+1. Each 24-byte key is, big-endian:
//
//	[0:8]   nanosecond timestamp
//	[8:16]  murmur3 hash of "<timestamp><counter>"
//	[16:24] a per-goroutine monotonically increasing counter
//
// Fix over the original: the batch slice is allocated once with the
// target capacity and reset with rows[:0] after each flush, instead of
// being reallocated and re-grown on every batch.
func (b *BadgerTest) write() {

	data := `{"randomstring":"6446D58D6DFAFD58586D3EA85A53F4A6B3CC057F933A22BB58E188A74AC8F663","refID":12495494,"testfield1234":"foo bar baz","date":"2018-01-01 12:00:00"}`
	batchSize := 20000
	rows := make([]writable, 0, batchSize+1)
	var cnt uint64
	for {
		cnt++
		ts := time.Now().UnixNano()
		// buf must be freshly allocated per row: saveRows retains it as the key.
		buf := make([]byte, 24)
		binary.BigEndian.PutUint64(buf[0:8], uint64(ts))
		key := fmt.Sprintf("%d%d", ts, cnt)
		binary.BigEndian.PutUint64(buf[8:16], murmur3.Sum64([]byte(key)))
		binary.BigEndian.PutUint64(buf[16:24], cnt)

		rows = append(rows, writable{key: buf, value: []byte(data)})
		if len(rows) > batchSize {
			b.saveRows(rows)
			// saveRows returns only after committing, so the backing
			// array is safe to reuse.
			rows = rows[:0]
		}
	}

}

// saveRows writes the given rows to badger with a 1-hour TTL, splitting
// the batch across multiple transactions whenever one grows past
// badger's per-transaction limit (ErrTxnTooBig).
//
// Fixes over the original: the old code ran inside db.Update but, on
// ErrTxnTooBig, committed the managed txn and swapped in a fresh
// db.NewTransaction that Update knew nothing about — that replacement
// txn was never committed or discarded, so its rows were silently
// dropped and the txn leaked. The inner `err :=` also shadowed the
// returned err, so failures were never reported. Transactions are now
// managed explicitly and every commit error is logged.
func (b *BadgerTest) saveRows(rows []writable) {
	ttl := 1 * time.Hour

	txn := b.db.NewTransaction(true)
	// Discard via closure so it applies to the CURRENT txn, not the
	// first one; Discard is a no-op after a successful Commit.
	defer func() { txn.Discard() }()

	for _, row := range rows {
		testMsgMeter.Mark(1)
		err := txn.SetWithTTL(row.key, row.value, ttl)
		if err == badger.ErrTxnTooBig {
			logrus.Infof("TX too big, committing...")
			if cerr := txn.Commit(nil); cerr != nil {
				logrus.Errorf("unable to commit full transaction; %s", cerr)
			}
			// Start a fresh transaction and retry the row that overflowed.
			txn = b.db.NewTransaction(true)
			err = txn.SetWithTTL(row.key, row.value, ttl)
		}
		if err != nil {
			logrus.Errorf("unable to set row; %s", err)
		}
	}

	if err := txn.Commit(nil); err != nil {
		logrus.Errorf("unable to commit transaction; %s", err)
	}
}

// badgerGC runs housekeeping once a minute, forever: it purges older
// versions of keys and then garbage-collects the value log with a 0.5
// discard ratio. Errors from either step are logged and the loop keeps
// going.
func (b *BadgerTest) badgerGC() {

	ticker := time.NewTicker(1 * time.Minute)
	for range ticker.C {
		logrus.Infof("CLEANUP starting to purge keys %s", time.Now())

		if err := b.db.PurgeOlderVersions(); err != nil {
			logrus.Errorf("badgerOps unable to purge older versions; %s", err)
		}
		if err := b.db.RunValueLogGC(0.5); err != nil {
			logrus.Errorf("badgerOps unable to RunValueLogGC; %s", err)
		}

		logrus.Infof("CLEANUP purge complete %s", time.Now())
	}
}


About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 26 (26 by maintainers)

Commits related to this issue

Most upvoted comments

boom! 12+ hour soak test looked good. Memory remained fairly constant and disk space was fairly constant. I may have a couple other things I’ll fire off in new tickets. Thanks again Manish, this is a big win!

image

Submitted as e597fb7. Based on my testing, this PR works. Would be good to hear from your end as well, considering you’re running tests for a lot longer.

Btw, modified your code a bit to allow ctrl+c to be captured to close the DB gracefully, and a couple other changes. You can see my version here: https://gist.github.com/manishrjain/137b9849f2672a5fc3a106815d9d41d7

Keys aren’t really deleted from the LSM tree, they’re only tombstoned (marked as deleted). I’ll see if we can have a way by which the keys can be removed too, but that’d be beyond the scope of this issue.