kubernetes之client-go基础包 Indexer
目录
为什么介绍Indexer
如下图示:
可以看出Local Store 在client-go及kubernetes控制器中的重要作用。上图可看出Informer比较复杂,拆解成独立的模块进行分析。
Indexer字面上看是索引器,就是Informer的LocalStore。肯定有人会问索引和存储有啥关系,那数据库建索引不也是存储和索引建立了关系么?索引构建在存储之上,使得按照某些条件查询速度会非常快。
版本是:client-go v11.0.0
Indexer
indexer抽象定义
// Indexer 是能够 使用多个索引函数列举对象 的 存储接口
type Indexer interface {
Store // 组合Store接口 ,定义在cliet-go/tool/cache/store.go中
........
}
indexer在 store存储的基础之上,拓展了对对象查找的索引能力。说白了,其实就是通过map结构实现不通维度的资源查找。
// 代码源自client-go/tools/cache/index.go
type IndexFunc func(obj interface{}) ([]string, error) //索引函数,传入对象,输出字符串索引
type Indexers map[string]IndexFunc // 索引函数name 对应多个索引函数,通过不同的索引函数来计算资源的字符串索引
type Indices map[string]Index // 索引函数name 对应多个索引键 多个对象键 真正对象
type Index map[string]sets.String
何所谓索引,索引目的就是为了快速查找。例如:查找某个节点上所有的pod,对应上面的Index类型就是 map[nodename]sets.podname。查找方式可能不止nodename这一种,查找方式可通过索引函数进行计算,这就是Indexers这个类型作用了。
其实写到这里,我已经明白了这之间的关系。统一再说下:
Indexers和Indices都是按照IndexFunc(名字)分组, 每个IndexFunc输出多个IndexKey,产生相同IndexKey的多个对象存储在一个集合中。
Indexers: 索引函数名 ======> 索引函数 ======> 索引key值
Indices: 索引函数名 ======> 对应多个索引key值 ======> 每个索引key值对应 不同的资源。(可能不同的索引key值对应相同的资源对象)
Indexer与索引相关的接口
// 说实话,k8s代码自注释做的相当棒
type Indexer interface {
Store
// 检索 匹配命名索引函数的对象列表
// indexName索引函数名,obj是对象,计算obj在indexName索引函数中的所有索引键,通过索引键把所有的对象取出来 主要就是获取对象索引键
Index(indexName string, obj interface{}) ([]interface{}, error)
// 索引函数indexName 中索引键为indexKey 的对象
IndexKeys(indexName, indexKey string) ([]string, error)
// 列举索引函数 indexName 所有的索引键
ListIndexFuncValues(indexName string) []string
// 返回值不是对象键,而是所有对象
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// 在存储中添加更多的indexers,增加更多的索引函数
AddIndexers(newIndexers Indexers) error
}
//client-go/tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
// 列举对象键
ListKeys() []string
// 返回obj相同对象键的对象,对象键是通过对象计算出来的字符串
Get(obj interface{}) (item interface{}, exists bool, err error)
// 通过对象键获取对象
GetByKey(key string) (item interface{}, exists bool, err error)
// []interface{}替换Store存储的所有对象,等同于删除全部原有对象在逐一添加新的对象
Replace([]interface{}, string) error
// 重新同步缓存
Resync() error
}
对象存储对应唯一的对象键,对象键的实现需要看代码具体实现。
Indexer实现
cache
通过cache实现Indexer,所有的对象是缓存在内存中的。包内私有类型,外部无法直接使用,必须通过函数初始化。
// client-go/tools/cache/store.go
// 1. 通过 keyFunc函数 计算对象key值
// 2. 调用 ThreadSafeStorage 线程安全存储的接口 方法实现
type cache struct {
cacheStorage ThreadSafeStore // 线程安全的存储 读写锁实现的map
keyFunc KeyFunc // 确定性的key值
}
//keyfunc 对象生成键
type KeyFunc func(obj interface{}) (string, error)
// 初始化 创建是需要执指定 计算对象键的函数
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
ThreadSafeStore
可以看出来cache是通过ThreadSafeStore来实现的,而KeyFunc 仅仅是计算对象键的。
// client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
Resync() error
}
ThreadSafeStore接口和indexer接口基本是一样的,差别是存储接口需要提供键值。所以所有的实现都在这个文件中,下面将会具体分析实现方法。
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex // 读写锁,增加读场景并发性
items map[string]interface{} // 对象键:对象
// indexers maps a name to an IndexFunc
indexers Indexers // 索引函数名 索引函数 ===> 计算索引键
// indices maps a name to an Index
indices Indices // 索引函数名 索引键 对象键 item快速查找对象
}
强调:索引键和对象键不一样,索引键是用于快速查找对象键的;对象键是为对象在存储中的唯一命名的,通过在存储中快速查找对象的。
// 涉及 写操作,需要加互斥锁;读操作,需要加 并发读锁
// 添加对象
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
// 去除老资源
oldObject := c.items[key]
// key 添加新资源
c.items[key] = obj
// 更新索引
c.updateIndices(oldObject, obj, key)
}
// 更新对象,貌似和添加相同
func (c *threadSafeMap) Update(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
// 删除对象
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
// 删除索引
c.deleteFromIndices(obj, key)
// 删除对象
delete(c.items, key)
}
}
// 获取对象
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[key]
return item, exists
}
// 列举对象
func (c *threadSafeMap) List() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
}
// 列举所有对象key值列表
func (c *threadSafeMap) ListKeys() []string {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]string, 0, len(c.items))
for key := range c.items {
list = append(list, key)
}
return list
}
// 替换存储中所有的对象
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
// rebuild any index
// 重新生成新的索引
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}
索引相关函数
下面重点介绍的是索引相关的函数。
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
// 通过指定的索引函数计算对象的索引键,索引键 === > 对象键 ===> 取出所有的对象
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 通过 索引函数名 indexName 获取 索引函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 获取该对象的所有 索引键
indexKeys, err := indexFunc(obj)
if err != nil {
return nil, err
}
// 通过 索引函数名 indexName 获取 索引键对应的所有对象键的map结构
index := c.indices[indexName]
var returnKeySet sets.String
if len(indexKeys) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
returnKeySet = index[indexKeys[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
returnKeySet = sets.String{}
for _, indexKey := range indexKeys {
// 对象键
for key := range index[indexKey] {
returnKeySet.Insert(key)
}
}
}
// 通过对象键获取所有的对象
list := make([]interface{}, 0, returnKeySet.Len())
for absoluteKey := range returnKeySet {
list = append(list, c.items[absoluteKey])
}
return list, nil
}
// 索引键获取对象列表
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 索引函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 索引键
index := c.indices[indexName]
// 对象键
set := index[indexKey]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
// 获取索引键对应的所有对象键
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
index := c.indices[indexName]
set := index[indexKey]
return set.List(), nil
}
// 获取索引函数 下的所有 索引键的键值
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
c.lock.RLock()
defer c.lock.RUnlock()
index := c.indices[indexName]
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
}
更新索引和移除索引函数
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
// 索引函数 ==> 对象 ==> 索引值
indexValues, err := indexFunc(obj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 索引值map
index := c.indices[name]
if index == nil {
continue
}
for _, indexValue := range indexValues {
// 对象键
set := index[indexValue]
if set != nil {
// 删除对象键
set.Delete(key)
}
}
}
}
// 对象添加或者更新时,需要更新索引,因为引用该私有函数的函数已经加锁
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// 移除旧对象的索引键
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
for name, indexFunc := range c.indexers {
// 索引键
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 索引函数 ===> 索引键
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
// 对象键
set.Insert(key)
}
}
}
cache实现
cache的实现主要是调用 threadSafeMap 的方法进行实现,这里就不进行过多介绍。
总结
- 搞明白 索引之间的关系很重要,后面的代码看起来就跟搭积木差不多。
- 第一层关系: 索引函数名称 === > 索引函数 === > 对象 ===> 计算出索引键
- 第二层关系: 索引函数名称 === > 索引键 === > 对象键 ===> 对象
参考链接
https://blog.csdn.net/weixin_42663840/article/details/81530606
https://github.com/kubernetes/client-go/tree/master/tools/cache