每个 series 都是 16 byte 对齐的, 使得 series 开始处的偏移量能够被 16 整除. 因此,将 series 的 id 设置为 offset/16, offset 指向 series 的开头. 每当要访问某个 series 时, 可以直接通过 id * 16 来获取在 index 中的位置.
Label Offset Table & Label Index
这两个是耦合的, 因此应该放在一起介绍.
但是, 目前这两个已经不再使用, 只是为了向后兼容而编写的. 所以本文暂且没有介绍.
Postings Offset Table & Postings
Postings
posting 其实就是 series id. (之所以叫 posting 其实是因为在倒排索引的 “世界” 里, 文档 id 常被称为 posting, 而在当前的场景下, 一个 series 可以被视为一个文档, 因此把 series id 当做 posting.) 单个 posting 其实代表了一个 posting list, 其格式如下所示:
// Querier returns a new querier over the data partition for the given time range.
func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
	var blocks []BlockReader

	db.mtx.RLock()
	defer db.mtx.RUnlock()

	// Collect every persisted block whose time range overlaps the closed
	// interval [mint, maxt].
	for _, b := range db.blocks {
		if b.OverlapsClosedInterval(mint, maxt) {
			blocks = append(blocks, b)
		}
	}
	// Include the in-memory head (wrapped as a range head) when the query
	// reaches past the head's minimum time.
	if maxt >= db.head.MinTime() {
		blocks = append(blocks, NewRangeHead(db.head, mint, maxt))
	}

	blockQueriers := make([]storage.Querier, 0, len(blocks))
	for _, b := range blocks {
		q, err := NewBlockQuerier(b, mint, maxt) // One querier per block; NewBlockQuerier is shown in the next snippet.
		if err == nil {
			blockQueriers = append(blockQueriers, q)
			continue
		}
		// If we fail, all previously opened queriers must be closed.
		for _, q := range blockQueriers {
			// TODO(bwplotka): Handle error.
			_ = q.Close()
		}
		return nil, errors.Wrapf(err, "open querier for block %s", b)
	}
	// Merge the per-block queriers into a single querier over the whole range.
	return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil
}
1 2 3 4 5 6 7 8
// NewBlockQuerier returns a querier against the block reader and requested min and max time range.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
	q, err := newBlockBaseQuerier(b, mint, maxt) // The newBlockBaseQuerier constructor is shown in the next snippet.
	if err != nil {
		return nil, err
	}
	return &blockQuerier{blockBaseQuerier: q}, nil
}
// Querier provides querying access over time series data of a fixed time range.
type Querier interface {
	LabelQuerier

	// Select returns a set of series that matches the given label matchers.
	// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
	// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
	Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
}
// Select resolves the given matchers to a set of series in the block.
// NOTE(review): this excerpt is truncated in the original post — the function's
// final return statement (building the full series set) is not shown here.
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	mint := q.mint
	maxt := q.maxt
	p, err := PostingsForMatchers(q.index, ms...) // This step yields the postings, i.e. series IDs; see the next snippet.
	if err != nil {
		return storage.ErrSeriesSet(err)
	}
	if sortSeries {
		p = q.index.SortedPostings(p)
	}

	if hints != nil {
		mint = hints.Start
		maxt = hints.End
		if hints.Func == "series" {
			// When you're only looking up metadata (for example series API), you don't need to load any chunks.
			return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt)
		}
	}
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. The resulting postings are not ordered by series.
// NOTE(review): this excerpt is truncated in the original post — the closing
// "return it, nil" of the function is not shown here.
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
	var its, notIts []index.Postings
	// See which label must be non-empty.
	// Optimization for case like {l=~".", l!="1"}.
	labelMustBeSet := make(map[string]bool, len(ms))
	for _, m := range ms {
		if !m.Matches("") {
			labelMustBeSet[m.Name] = true
		}
	}

	for _, m := range ms {
		if labelMustBeSet[m.Name] {
			// If this matcher must be non-empty, we can be smarter.
			matchesEmpty := m.Matches("")
			isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp
			if isNot && matchesEmpty { // l!="foo"
				// If the label can't be empty and is a Not and the inner matcher
				// doesn't match empty, then subtract it out at the end.
				inverse, err := m.Inverse()
				if err != nil {
					return nil, err
				}

				it, err := postingsForMatcher(ix, inverse) // See the next snippet.
				if err != nil {
					return nil, err
				}
				notIts = append(notIts, it)
			} else if isNot && !matchesEmpty { // l!=""
				// If the label can't be empty and is a Not, but the inner matcher can
				// be empty we need to use inversePostingsForMatcher.
				inverse, err := m.Inverse()
				if err != nil {
					return nil, err
				}

				it, err := inversePostingsForMatcher(ix, inverse) // See the snippet after next.
				if err != nil {
					return nil, err
				}
				its = append(its, it)
			} else { // l="a"
				// Non-Not matcher, use normal postingsForMatcher.
				it, err := postingsForMatcher(ix, m) // See the next snippet.
				if err != nil {
					return nil, err
				}
				its = append(its, it)
			}
		} else { // l=""
			// If the matchers for a labelname selects an empty value, it selects all
			// the series which don't have the label name set too. See:
			// https://github.com/prometheus/prometheus/issues/3575 and
			// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
			it, err := inversePostingsForMatcher(ix, m) // See the snippet after next.
			if err != nil {
				return nil, err
			}
			notIts = append(notIts, it)
		}
	}

	// If there's nothing to subtract from, add in everything and remove the notIts later.
	if len(its) == 0 && len(notIts) != 0 {
		k, v := index.AllPostingsKey()
		allPostings, err := ix.Postings(k, v) // Ultimately resolved through the index reader's Postings method.
		if err != nil {
			return nil, err
		}
		its = append(its, allPostings)
	}

	// Intersect the positive postings, then subtract the negated ones.
	it := index.Intersect(its...)

	for _, n := range notIts {
		it = index.Without(it, n)
	}
	// NOTE(review): fragment — the opening of postingsForMatcher (its signature
	// and the lookup that produces vals) is not shown in the original post.

	// Collect every label value the matcher accepts, tracking on the fly
	// whether the collected values are already sorted.
	var res []string
	lastVal, isSorted := "", true
	for _, val := range vals {
		if m.Matches(val) {
			res = append(res, val)
			if isSorted && val < lastVal {
				isSorted = false
			}
			lastVal = val
		}
	}

	// No value matched: nothing can be selected.
	if len(res) == 0 {
		return index.EmptyPostings(), nil
	}

	// Sort only when necessary before handing the values to Postings.
	if !isSorted {
		sort.Strings(res)
	}
	return ix.Postings(m.Name, res...)
}
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) { vals, err := ix.LabelValues(m.Name) if err != nil { return nil, err }
var res []string lastVal, isSorted := "", true for _, val := range vals { if !m.Matches(val) { res = append(res, val) if isSorted && val < lastVal { isSorted = false } lastVal = val } }
if !isSorted { sort.Strings(res) } return ix.Postings(m.Name, res...) }
// Postings returns the postings list iterator for the label pairs. func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { res := make([]index.Postings, 0, len(values)) for _, value := range values { res = append(res, h.head.postings.Get(name, value)) //直接通过 postings.Get 获得 } return index.Merge(res...), nil }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Get returns a postings list for the given label pair.
// NOTE(review): this excerpt is truncated in the original post — the tail of
// the function, which turns lp into a Postings iterator and returns it, is
// not shown here.
func (p *MemPostings) Get(name, value string) Postings {
	// Read the series-ID list for the name/value pair under the read lock.
	var lp []uint64
	p.mtx.RLock()
	l := p.m[name]
	if l != nil {
		lp = l[value]
	}
	p.mtx.RUnlock()
// MemPostings holds postings list for series ID per label pair. They may be written
// to out of order.
// ensureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup.
type MemPostings struct {
	mtx sync.RWMutex
	// m maps label name -> label value -> list of series IDs.
	m map[string]map[string][]uint64
	// ordered records whether the lists have been sorted (via ensureOrder).
	ordered bool
}
	// NOTE(review): fragment of the on-disk index reader's Postings method —
	// the original post omits the function signature and the preceding setup
	// (e, values, valueIndex, skip, res are declared there).

	// Discard values before the start.
	valueIndex++
}
for valueIndex < len(values) {
	value := values[valueIndex]

	// Binary search over the sampled offset-table entries.
	i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
	if i == len(e) {
		// We're past the end.
		break
	}
	if i > 0 && e[i].value != value {
		// Need to look from previous entry.
		i--
	}
	// Don't Crc32 the entire postings offset table, this is very slow
	// so hope any issues were caught at startup.
	d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
	d.Skip(e[i].off)

	// Iterate on the offset table.
	var postingsOff uint64 // The offset into the postings table.
	for d.Err() == nil {
		if skip == 0 {
			// These are always the same number of bytes,
			// and it's faster to skip than parse.
			skip = d.Len()
			d.Uvarint()      // Keycount.
			d.UvarintBytes() // Label name.
			skip -= d.Len()
		} else {
			d.Skip(skip)
		}
		v := d.UvarintBytes()       // Label value.
		postingsOff = d.Uvarint64() // Offset.
		for string(v) >= value {
			if string(v) == value { // The label value matched.
				// Read from the postings table.
				d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
				_, p, err := r.dec.Postings(d2.Get()) // Decode the postings list.
				if err != nil {
					return nil, errors.Wrap(err, "decode postings")
				}
				res = append(res, p) // Add the postings to the result list.
			}
			valueIndex++
			if valueIndex == len(values) {
				break
			}
			value = values[valueIndex]
		}
		if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
			// Need to go to a later postings offset entry, if there is one.
			break
		}
	}
	if d.Err() != nil {
		return nil, errors.Wrap(d.Err(), "get postings offset entry")
	}
}
	// NOTE(review): fragment of the index Reader struct — the type header and
	// the remaining fields are not shown in the original post.

	// Close that releases the underlying resources of the byte slice.
	c io.Closer

	// Map of LabelName to a list of some LabelValues's position in the offset table.
	// The first and last values for each name are always present.
	postings map[string][]postingOffset
	// For the v1 format, labelname -> labelvalue -> offset.
	postingsV1 map[string]map[string]uint64

	symbols     *Symbols
	nameSymbols map[uint32]string // Cache of the label name symbol lookups,
	// as there are not many and they are half of all lookups.