【kubernetes/k8s源码分析】API go restful 框架源码解析
1 概念
- Route
路由包含两种,一种是RouterJSR311,一种是快速路由CurlyRouter。
CurlyRouter支持正则表达式和动态参数,相比RouterJSR311更加轻量级,k8s使用的是快速路由。
Route包含:http Method,URL Path,输入输出类型(JSON/YAML)以及回调函数restful.RouteFunction,响应内容类型(Accept)等。
- WebService
WebService逻辑上是Route的集合,功能上主要是为一组Route统一设置包括root path,请求响应的数据类型等一些通用的属性。
WebService必须加入到Container中才能生效。
- Container
Container逻辑上是WebService的集合,包括一组restful.WebService和一个http.ServeMux对象,使用RouteSelector进行请求派发。
2 初始化流程
- create container
- create WebService
- create route, filter and bind to route handler
- router -> WebService
- WebService -> container
- new server with Handler=container
- start server
3 结构体
3.1 Container结构体
// Container holds a collection of WebServices and a http.ServeMux to dispatch http requests.
// The requests are further dispatched to routes of WebServices using a RouteSelector
type Container struct {
	// presumably guards webServices — confirm against the Container methods
	webServicesLock sync.RWMutex
	// the WebServices collected in this container
	webServices []*WebService
	// the http.ServeMux used to dispatch http requests
	ServeMux           *http.ServeMux
	isRegisteredOnRoot bool
	// filters registered at container level; NOTE(review): when they run relative to
	// WebService and Route filters is not visible here
	containerFilters []FilterFunction
	doNotRecover     bool // default is true
	// presumably the callback installed through Container.RecoverHandler — confirm
	recoverHandleFunc RecoverHandleFunction
	// presumably the callback installed through Container.ServiceErrorHandler — confirm
	serviceErrorHandleFunc ServiceErrorHandleFunction
	router                 RouteSelector // default is a CurlyRouter (RouterJSR311 is a slower alternative)
	contentEncodingEnabled bool          // default is false
}
3.2 WebService结构体
WebService逻辑上是Route的集合,功能上主要是为一组Route统一设置包括root path,请求响应的数据类型等一些通用的属性。
WebService必须加入到Container中才能生效。
// WebService holds a collection of Route values that bind a Http Method + URL Path to a function.
type WebService struct {
	// root path of this WebService; Route.Path is documented as "webservice root path + described path"
	rootPath string
	pathExpr *pathExpression // cached compilation of rootPath as RegExp
	// the Route values collected by this WebService
	routes []Route
	// presumably the MIME types applied to routes that do not set their own — TODO confirm
	produces []string
	consumes []string
	// path parameters declared at WebService level
	pathParameters []*Parameter
	// filters attached to this WebService
	filters            []FilterFunction
	documentation      string
	apiVersion         string
	typeNameHandleFunc TypeNameHandleFunction
	dynamicRoutes      bool
	// protects 'routes' if dynamic routes are enabled
	routesLock sync.RWMutex
}
3.3 Route结构体
路由包含两种,一种是RouterJSR311,一种是快速路由CurlyRouter。
CurlyRouter支持正则表达式和动态参数,相比RouterJSR311更加轻量级,k8s默认使用的是快速路由。
Route包含:http Method,URL Path,输入输出类型(JSON/YAML)以及回调函数restful.RouteFunction,响应内容类型(Accept)等。
// Route binds a HTTP Method,Path,Consumes combination to a RouteFunction.
type Route struct {
	// HTTP method this route is bound to
	Method string
	// presumably the MIME types the route can produce / accept — confirm against the dispatcher
	Produces []string
	Consumes []string
	Path     string // webservice root path + described path
	// the RouteFunction this route is bound to
	Function RouteFunction
	// filters attached to this route
	Filters []FilterFunction
	// cached values for dispatching
	relativePath string
	pathParts    []string
	pathExpr     *pathExpression // cached compilation of relativePath as RegExp
	// documentation
	Doc                     string
	Notes                   string
	Operation               string
	ParameterDocs           []*Parameter
	ResponseErrors          map[int]ResponseError
	ReadSample, WriteSample interface{} // structs that model an example request or response payload
	// Extra information used to store custom information about the route.
	Metadata map[string]interface{}
}
3.4 Path
3.5 APIGroupInfo
// APIGroupInfo holds info about an API group.
type APIGroupInfo struct {
	// PrioritizedVersions lists the GroupVersions of this group.
	// NOTE(review): presumably ordered by preference — confirm against the consumers of this field.
	PrioritizedVersions []schema.GroupVersion
	// Info about the resources in this group. It's a map from version to resource to the storage.
	VersionedResourcesStorageMap map[string]map[string]rest.Storage
	// OptionsExternalVersion controls the APIVersion used for common objects in the
	// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
	// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
	// If nil, defaults to groupMeta.GroupVersion.
	// TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
	OptionsExternalVersion *schema.GroupVersion
	// MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
	// common API implementations like ListOptions. Future changes will allow this to vary by group
	// version (for when the inevitable meta/v2 group emerges).
	MetaGroupVersion *schema.GroupVersion
	// Scheme includes all of the types used by this group and how to convert between them (or
	// to convert objects from outside of this group that are accepted in this API).
	// TODO: replace with interfaces
	Scheme *runtime.Scheme
	// NegotiatedSerializer controls how this group encodes and decodes data
	NegotiatedSerializer runtime.NegotiatedSerializer
	// ParameterCodec performs conversions for query parameters passed to API calls
	ParameterCodec runtime.ParameterCodec
}
4 Container初始化过程
调用过程: main -> App.Run -> master.Complete.New -> c.Config.GenericConfig.SkipComplete().New()
4.1 New函数
// New builds a Master from the completed config.
//
// It creates the generic API server named "kube-apiserver" in front of
// delegationTarget, installs the logs routes when EnableLogsSupport is set,
// installs the legacy (apiv1) API when that version is enabled, installs the
// remaining API groups in a fixed order, installs the tunneler when one is
// configured, and finally registers the "ca-registration" post-start hook.
//
// An error is returned when KubeletClientConfig is empty or when the generic
// API server cannot be created.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
	var emptyKubeletConfig kubeletclient.KubeletClientConfig
	if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, emptyKubeletConfig) {
		return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
	}

	genericServer, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	if err != nil {
		return nil, err
	}

	if c.ExtraConfig.EnableLogsSupport {
		routes.Logs{}.Install(genericServer.Handler.GoRestfulContainer)
	}

	kubeMaster := &Master{GenericAPIServer: genericServer}

	// The legacy (core) API is only installed when its version is enabled.
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:             c.ExtraConfig.StorageFactory,
			ProxyTransport:             c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:        c.ExtraConfig.KubeletClientConfig,
			EventTTL:                   c.ExtraConfig.EventTTL,
			ServiceIPRange:             c.ExtraConfig.ServiceIPRange,
			ServiceNodePortRange:       c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:       c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:       c.ExtraConfig.ServiceAccountIssuer,
			ServiceAccountAPIAudiences: c.ExtraConfig.ServiceAccountAPIAudiences,
		}
		kubeMaster.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyProvider)
	}

	// The order of this list is preserved in discovery.
	// When resources with identical names exist in more than one group
	// (e.g. "deployments.apps" and "deployments.extensions"), the order decides
	// which group an unqualified resource name (e.g. "deployments") prefers.
	// The order is used for local discovery, but ends up aggregated with specific
	// priorities in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go`.
	// TODO: describe the priority all the way down in the RESTStorageProviders and
	// plumb it back through the various discovery handlers that we have.
	providers := []RESTStorageProvider{
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		extensionsrest.RESTStorageProvider{},
		networkingrest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		settingsrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		// apps stays after extensions so legacy clients resolve the extensions
		// versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.RESTStorageProvider{},
		admissionregistrationrest.RESTStorageProvider{},
		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
	}
	kubeMaster.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, providers...)

	if c.ExtraConfig.Tunneler != nil {
		nodeClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes()
		kubeMaster.installTunneler(c.ExtraConfig.Tunneler, nodeClient)
	}

	kubeMaster.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)

	return kubeMaster, nil
}
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) { if c.Serializer == nil { return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil") } if c.LoopbackClientConfig == nil { return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil") } handlerChainBuilder := func(handler http.Handler) http.Handler { return c.BuildHandlerChainFunc(handler, c.Config) } apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()) s := &GenericAPIServer{ discoveryAddresses: c.DiscoveryAddresses, LoopbackClientConfig: c.LoopbackClientConfig, legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes, admissionControl: c.AdmissionControl, Serializer: c.Serializer, AuditBackend: c.AuditBackend, delegationTarget: delegationTarget, HandlerChainWaitGroup: c.HandlerChainWaitGroup, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, ShutdownTimeout: c.RequestTimeout, SecureServingInfo: c.SecureServing, ExternalAddress: c.ExternalAddress, Handler: apiServerHandler, listedPathProvider: apiServerHandler, swaggerConfig: c.SwaggerConfig, openAPIConfig: c.OpenAPIConfig, postStartHooks: map[string]postStartHookEntry{}, preShutdownHooks: map[string]preShutdownHookEntry{}, disabledPostStartHooks: c.DisabledPostStartHooks, healthzChecks: c.HealthzChecks, DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer), enableAPIResponseCompression: c.EnableAPIResponseCompression, } for k, v := range delegationTarget.PostStartHooks() { s.postStartHooks[k] = v } for k, v := range delegationTarget.PreShutdownHooks() { s.preShutdownHooks[k] = v } genericApiServerHookName := "generic-apiserver-start-informers" if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) { err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error { 
c.SharedInformerFactory.Start(context.StopCh) return nil }) if err != nil { return nil, err } } for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { if existingCheck.Name() == delegateCheck.Name() { skip = true break } } if skip { continue } s.healthzChecks = append(s.healthzChecks, delegateCheck) } s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget} installAPI(s, c.Config) // use the UnprotectedHandler from the delegation target to ensure that we don't attempt to double authenticator, authorize, // or some other part of the filter chain in delegation cases. if delegationTarget.UnprotectedHandler() == nil && c.EnableIndex { s.Handler.NonGoRestfulMux.NotFoundHandler(routes.IndexLister{ StatusCode: http.StatusNotFound, PathProvider: s.listedPathProvider, }) } return s, nil }
4.2 NewAPIServerHandler函数
// NewAPIServerHandler assembles the handler used by an API server.
//
// It creates a path-recording mux named name (with notFoundHandler as its
// not-found handler when one is given) and a go-restful container that uses a
// fresh http.ServeMux and the CurlyRouter. Panics recovered by the container
// and service errors are reported through logStackOnRecover and
// serviceErrorHandler, both using the serializer s. Both muxes are combined
// in a director, which handlerChainBuilder wraps to produce FullHandlerChain.
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	pathMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		pathMux.NotFoundHandler(notFoundHandler)
	}

	container := restful.NewContainer()
	container.ServeMux = http.NewServeMux()
	container.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	container.RecoverHandler(func(reason interface{}, w http.ResponseWriter) {
		logStackOnRecover(s, reason, w)
	})
	container.ServiceErrorHandler(func(svcErr restful.ServiceError, req *restful.Request, resp *restful.Response) {
		serviceErrorHandler(s, svcErr, req, resp)
	})

	dispatch := director{
		name:               name,
		goRestfulContainer: container,
		nonGoRestfulMux:    pathMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(dispatch),
		GoRestfulContainer: container,
		NonGoRestfulMux:    pathMux,
		Director:           dispatch,
	}
}
5 WebService初始化过程
Container已创建并完成初始化,接下来轮到WebService了。
5.1 installAPI函数
func installAPI(s *GenericAPIServer, c *Config) { if c.EnableIndex { routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux) } if c.SwaggerConfig != nil && c.EnableSwaggerUI { routes.SwaggerUI{}.Install(s.Handler.NonGoRestfulMux) } if c.EnableProfiling { routes.Profiling{}.Install(s.Handler.NonGoRestfulMux) if c.EnableContentionProfiling { goruntime.SetBlockProfileRate(1) } } if c.EnableMetrics { if c.EnableProfiling { routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux) } else { routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux) } } routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer) if c.EnableDiscovery { s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService()) } }/api的注册是InstallLegacyAPIGroup()接口
/apis的注册是InstallAPIGroup()
这两个接口都会调用s.installAPIResources(),最后再调用InstallREST()进行API注册
5.2 InstallREST函数
func (g *APIGroupVersion) InstallREST(container *restful.Container) error { prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ group: g, prefix: prefix, minRequestTimeout: g.MinRequestTimeout, enableAPIResponseCompression: g.EnableAPIResponseCompression, } apiResources, ws, registrationErrors := installer.Install() versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}) versionDiscoveryHandler.AddToWebService(ws) container.Add(ws) return utilerrors.NewAggregate(registrationErrors) }
5.3 Install函数
// Install creates a new WebService and registers the handlers of every
// resource found in a.group.Storage on it.
//
// It returns the API resources that were registered, the WebService, and the
// list of errors met while registering; a failing resource does not stop the
// registration of the remaining ones.
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	// Named errs (not errors) so the standard errors package is not shadowed.
	var errs []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	// Map iteration order is random, hence the explicit sort.
	paths := make([]string, 0, len(a.group.Storage))
	for resourcePath := range a.group.Storage {
		paths = append(paths, resourcePath)
	}
	sort.Strings(paths)

	for _, resourcePath := range paths {
		apiResource, err := a.registerResourceHandlers(resourcePath, a.group.Storage[resourcePath], ws)
		if err != nil {
			errs = append(errs, fmt.Errorf("error in registering resource: %s, %v", resourcePath, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, ws, errs
}
6 registerResourceHandlers函数
这个函数涉及的内容比较多,分开一段一段讲解。
6.1 通过类型断言判断storage实现了哪些接口(creater、lister、getter等),对应对etcd的不同操作
// what verbs are supported by the storage, used to know what verbs we support per path creater, isCreater := storage.(rest.Creater) namedCreater, isNamedCreater := storage.(rest.NamedCreater) lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions) gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter) updater, isUpdater := storage.(rest.Updater) patcher, isPatcher := storage.(rest.Patcher) watcher, isWatcher := storage.(rest.Watcher) connecter, isConnecter := storage.(rest.Connecter) storageMeta, isMetadata := storage.(rest.StorageMetadata)
6.1 分为两类,有namespace,无namespace
// Build the list of actions (verb, path, path parameters, namer) supported for
// this resource. Cluster-scoped resources are served under the resource name,
// namespaced resources under namespaces/{namespace}/<resource>. An action is
// only appended when the storage supports the matching operation.
switch {
case !namespaceScoped:
	// Handle non-namespace scoped resources like nodes.
	resourcePath := resource
	resourceParams := params
	itemPath := resourcePath + "/{name}"
	nameParams := append(params, nameParam)
	proxyParams := append(nameParams, pathParam)
	suffix := ""
	if isSubresource {
		suffix = "/" + subresource
		itemPath = itemPath + suffix
		resourcePath = itemPath
		resourceParams = nameParams
	}
	apiResource.Name = path
	apiResource.Namespaced = false
	apiResource.Kind = resourceKind
	namer := handlers.ContextBasedNaming{
		SelfLinker:         a.group.Linker,
		ClusterScoped:      true,
		SelfLinkPathPrefix: gpath.Join(a.prefix, resource) + "/",
		SelfLinkPathSuffix: suffix,
	}

	// Handler for standard REST verbs (GET, PUT, POST and DELETE).
	// Add actions at the resource path: /api/apiVersion/resource
	actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
	actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
	actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
	// DEPRECATED
	actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

	// Add actions at the item path: /api/apiVersion/resource/{name}
	actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
	if getSubpath {
		actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
	}
	actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
	actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
	actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
	actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
	actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
	actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
	// No explicit break: a Go switch case never falls through implicitly.
default:
	namespaceParamName := "namespaces"
	// Handler for standard REST verbs (GET, PUT, POST and DELETE).
	namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
	namespacedPath := namespaceParamName + "/{" + "namespace" + "}/" + resource
	namespaceParams := []*restful.Parameter{namespaceParam}

	resourcePath := namespacedPath
	resourceParams := namespaceParams
	itemPath := namespacedPath + "/{name}"
	nameParams := append(namespaceParams, nameParam)
	proxyParams := append(nameParams, pathParam)
	itemPathSuffix := ""
	if isSubresource {
		itemPathSuffix = "/" + subresource
		itemPath = itemPath + itemPathSuffix
		resourcePath = itemPath
		resourceParams = nameParams
	}
	apiResource.Name = path
	apiResource.Namespaced = true
	apiResource.Kind = resourceKind
	namer := handlers.ContextBasedNaming{
		SelfLinker:         a.group.Linker,
		ClusterScoped:      false,
		SelfLinkPathPrefix: gpath.Join(a.prefix, namespaceParamName) + "/",
		SelfLinkPathSuffix: itemPathSuffix,
	}

	actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
	actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
	actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
	// DEPRECATED
	actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

	actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
	if getSubpath {
		actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
	}
	actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
	actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
	actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
	actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
	actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
	actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

	// list or post across namespace.
	// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
	// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
	if !isSubresource {
		actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
	}
}
6.2 遍历所有action,为每个action创建route并注册到WebService
// Walk over all collected actions. The part elided below is where the routes
// slice for the current action gets filled.
for _, action := range actions {
	..........
	for _, route := range routes {
		// Tag the route with the group/version/kind taken from reqScope.Kind.
		route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
			Group:   reqScope.Kind.Group,
			Version: reqScope.Kind.Version,
			Kind:    reqScope.Kind.Kind,
		})
		// Tag the route with the lower-cased verb of the action.
		route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
		// Register the route on the WebService.
		ws.Route(route)
	}
	// Note: update GetAuthorizerAttributes() when adding a custom handler.
}
routes := []*restful.RouteBuilder{}
6.2.1 创建RouteBuilder对象
routes := []*restful.RouteBuilder{}
6.2.2 只讲解其GET操作,其他操作大同小异
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, exporter, reqScope)
}
if needOverride {
// need change the reported verb
handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler)
} else {
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
}
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler)
}
doc := "read the specified " + kind
if isSubresource {
doc = "read " + subresource + " of the specified " + kind
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
if isGetterWithOptions {
if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, err
}
}
if isExporter {
if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
return nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)