kubernetes之client-go基础包 Indexer

目录

为什么介绍Indexer

Indexer

indexer抽象定义

Indexer与索引相关的接口

Indexer实现

cache

ThreadSafeStore

索引相关函数

cache实现

总结

参考链接


为什么介绍Indexer

如下图示:

kubernetes之client-go基础包 Indexer

可以看出Local Store 在client-go及kubernetes控制器中的重要作用。上图可看出Informer比较复杂,拆解成独立的模块进行分析。

Indexer字面上看是索引器,就是Informer的LocalStore。肯定有人会问索引和存储有啥关系,那数据库建索引不也是存储和索引建立了关系么?索引构建在存储之上,使得按照某些条件查询速度会非常快。

版本是:client-go  v11.0.0

Indexer

indexer抽象定义

// Indexer 是能够 使用多个索引函数列举对象 的 存储接口
type Indexer interface {
	Store    // 组合Store接口 ,定义在cliet-go/tool/cache/store.go中
	........
}

indexer在 store存储的基础之上,拓展了对对象查找的索引能力。说白了,其实就是通过map结构实现不通维度的资源查找。

// 代码源自client-go/tools/cache/index.go
type IndexFunc func(obj interface{}) ([]string, error) //索引函数,传入对象,输出字符串索引
type Indexers map[string]IndexFunc    // 索引函数name  对应多个索引函数,通过不同的索引函数来计算资源的字符串索引
type Indices map[string]Index         // 索引函数name   对应多个索引键   多个对象键   真正对象                          
type Index map[string]sets.String                       

何所谓索引,索引目的就是为了快速查找。例如:查找某个节点上所有的pod,对应上面的Index类型就是 map[nodename]sets.podname。查找方式可能不止nodename这一种,查找方式可通过索引函数进行计算,这就是Indexers这个类型作用了。

其实写到这里,我已经明白了这之间的关系。统一再说下:

Indexers和Indices都是按照IndexFunc(名字)分组, 每个IndexFunc输出多个IndexKey,产生相同IndexKey的多个对象存储在一个集合中。

Indexers: 索引函数名  ======>  索引函数  ======> 索引key值

Indices:   索引函数名  ======> 对应多个索引key值 ======> 每个索引key值对应 不同的资源。(可能不同的索引key值对应相同的资源对象)

Indexer与索引相关的接口

// 说实话,k8s代码自注释做的相当棒
type Indexer interface {
	Store

    // 检索 匹配命名索引函数的对象列表
    // indexName索引函数名,obj是对象,计算obj在indexName索引函数中的所有索引键,通过索引键把所有的对象取出来  主要就是获取对象索引键
	Index(indexName string, obj interface{}) ([]interface{}, error)
	
    // 索引函数indexName  中索引键为indexKey 的对象
	IndexKeys(indexName, indexKey string) ([]string, error)
	// 列举索引函数 indexName 所有的索引键
	ListIndexFuncValues(indexName string) []string
	// 返回值不是对象键,而是所有对象
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers

    // 在存储中添加更多的indexers,增加更多的索引函数
	AddIndexers(newIndexers Indexers) error
}
//client-go/tools/cache/store.go
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
    // 列举对象键
	ListKeys() []string
    // 返回obj相同对象键的对象,对象键是通过对象计算出来的字符串
	Get(obj interface{}) (item interface{}, exists bool, err error)
    // 通过对象键获取对象
	GetByKey(key string) (item interface{}, exists bool, err error)

    // []interface{}替换Store存储的所有对象,等同于删除全部原有对象在逐一添加新的对象
	Replace([]interface{}, string) error
    // 重新同步缓存
	Resync() error
}

对象存储对应唯一的对象键,对象键的实现需要看代码具体实现。

Indexer实现

cache

通过cache实现Indexer,所有的对象是缓存在内存中的。包内私有类型,外部无法直接使用,必须通过函数初始化。

// client-go/tools/cache/store.go
// 1. 通过 keyFunc函数 计算对象key值
// 2. 调用 ThreadSafeStorage 线程安全存储的接口 方法实现
type cache struct {
	cacheStorage ThreadSafeStore  // 线程安全的存储 读写锁实现的map
	keyFunc KeyFunc  // 确定性的key值
}

//keyfunc 对象生成键
type KeyFunc func(obj interface{}) (string, error)

// 初始化  创建是需要执指定 计算对象键的函数
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

ThreadSafeStore

可以看出来cache是通过ThreadSafeStore来实现的,而KeyFunc 仅仅是计算对象键的。

// client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	Resync() error
}

ThreadSafeStore接口和indexer接口基本是一样的,差别是存储接口需要提供键值。所以所有的实现都在这个文件中,下面将会具体分析实现方法。

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock  sync.RWMutex          // 读写锁,增加读场景并发性
	items map[string]interface{}  // 对象键:对象

	// indexers maps a name to an IndexFunc
	indexers Indexers            // 索引函数名  索引函数  ===> 计算索引键
	// indices maps a name to an Index
	indices Indices              // 索引函数名  索引键   对象键   item快速查找对象
}

强调:索引键和对象键不一样,索引键是用于快速查找对象键的;对象键是为对象在存储中的唯一命名的,通过在存储中快速查找对象的。

// 涉及 写操作,需要加互斥锁;读操作,需要加 并发读锁
// 添加对象
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
    // 去除老资源
	oldObject := c.items[key]
    // key 添加新资源
	c.items[key] = obj
    // 更新索引
	c.updateIndices(oldObject, obj, key)
}
// 更新对象,貌似和添加相同
func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}
// 删除对象
func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if obj, exists := c.items[key]; exists {
        // 删除索引
		c.deleteFromIndices(obj, key)
        // 删除对象
		delete(c.items, key)
	}
}
// 获取对象
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	item, exists = c.items[key]
	return item, exists
}
// 列举对象
func (c *threadSafeMap) List() []interface{} {
	c.lock.RLock()
	defer c.lock.RUnlock()
	list := make([]interface{}, 0, len(c.items))
	for _, item := range c.items {
		list = append(list, item)
	}
	return list
}

// 列举所有对象key值列表
func (c *threadSafeMap) ListKeys() []string {
	c.lock.RLock()
	defer c.lock.RUnlock()
	list := make([]string, 0, len(c.items))
	for key := range c.items {
		list = append(list, key)
	}
	return list
}
// 替换存储中所有的对象
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.items = items

	// rebuild any index
    // 重新生成新的索引
 	c.indices = Indices{}
	for key, item := range c.items {
		c.updateIndices(nil, item, key)
	}
}

索引相关函数

下面重点介绍的是索引相关的函数。

// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
// 通过指定的索引函数计算对象的索引键,索引键 === > 对象键  ===>  取出所有的对象
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
    // 通过 索引函数名 indexName 获取 索引函数
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
    // 获取该对象的所有 索引键
	indexKeys, err := indexFunc(obj)
	if err != nil {
		return nil, err
	}
    // 通过 索引函数名 indexName 获取 索引键对应的所有对象键的map结构
	index := c.indices[indexName]

	var returnKeySet sets.String
	if len(indexKeys) == 1 {
		// In majority of cases, there is exactly one value matching.
		// Optimize the most common path - deduping is not needed here.
		returnKeySet = index[indexKeys[0]]
	} else {
		// Need to de-dupe the return list.
		// Since multiple keys are allowed, this can happen.
		returnKeySet = sets.String{}
		for _, indexKey := range indexKeys {
            // 对象键
			for key := range index[indexKey] {
				returnKeySet.Insert(key)
			}
		}
	}
    // 通过对象键获取所有的对象
	list := make([]interface{}, 0, returnKeySet.Len())
	for absoluteKey := range returnKeySet {
		list = append(list, c.items[absoluteKey])
	}
	return list, nil
}
// 索引键获取对象列表
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
    // 索引函数
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
    // 索引键
	index := c.indices[indexName]
    // 对象键
	set := index[indexKey]
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}

	return list, nil
}
// 获取索引键对应的所有对象键
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	index := c.indices[indexName]

	set := index[indexKey]
	return set.List(), nil
}
// 获取索引函数 下的所有 索引键的键值
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
	c.lock.RLock()
	defer c.lock.RUnlock()

	index := c.indices[indexName]
	names := make([]string, 0, len(index))
	for key := range index {
		names = append(names, key)
	}
	return names
}

更新索引和移除索引函数

// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
	for name, indexFunc := range c.indexers {
        // 索引函数 ==> 对象 ==> 索引值
		indexValues, err := indexFunc(obj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
        // 索引值map
		index := c.indices[name]
		if index == nil {
			continue
		}
		for _, indexValue := range indexValues {
            // 对象键
			set := index[indexValue]
			if set != nil {
                // 删除对象键
				set.Delete(key)
			}
		}
	}
}
// 对象添加或者更新时,需要更新索引,因为引用该私有函数的函数已经加锁
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    // 移除旧对象的索引键
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
	for name, indexFunc := range c.indexers {
        // 索引键
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
        // 索引函数  ===> 索引键
		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}

		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
            // 对象键
			set.Insert(key)
		}
	}
}

cache实现

cache的实现主要是调用 threadSafeMap 的方法进行实现,这里就不进行过多介绍。

总结

  • 搞明白 索引之间的关系很重要,后面的代码看起来就跟搭积木差不多。   
  • 第一层关系: 索引函数名称   === > 索引函数  === > 对象  ===> 计算出索引键
  • 第二层关系: 索引函数名称   === > 索引键     === > 对象键   ===> 对象

参考链接

https://blog.csdn.net/weixin_42663840/article/details/81530606

https://github.com/kubernetes/client-go/tree/master/tools/cache