Skip to content

Commit 15a6186

Browse files
authored
Simplify yaml parsing and improve its preformance (#192)
* create decoder only once * parse resources file-by-file * a more accurate error message * resourceFinder now accumulates resources it finds * Using resource accumulation to simplify code * inlineConfigMapRefs() as resourceFinder method * Fixes to documentation * avoid redundant type conversions * fully parsing each yaml doc in one iteration
1 parent f497a98 commit 15a6186

5 files changed

+151
-194
lines changed

pkg/controller/error_types.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (err *NoYamlsFoundError) Error() string {
5656
}
5757

5858
func (err *NoK8sResourcesFoundError) Error() string {
59-
return "no relevant Kubernetes resources found"
59+
return "could not find any Kubernetes workload resources"
6060
}
6161

6262
func (err *ConfigMapNotFoundError) Error() string {

pkg/controller/policies_synthesizer.go

+7-10
Original file line numberDiff line numberDiff line change
@@ -134,32 +134,29 @@ func (ps *PoliciesSynthesizer) ConnectionsFromFolderPaths(dirPaths []string) ([]
134134
func (ps *PoliciesSynthesizer) extractConnections(dirPaths []string) ([]common.Resource, []*common.Connections, []FileProcessingError) {
135135
// 1. Get all relevant resources from the repo
136136
resFinder := resourceFinder{logger: ps.logger, stopOn1stErr: ps.stopOnError, walkFn: ps.walkFn}
137-
rawResources := []rawResourcesInFile{}
138137
fileErrors := []FileProcessingError{}
139138
for _, dirPath := range dirPaths {
140-
dObjs, errs := resFinder.getRelevantK8sResources(dirPath)
141-
rawResources = append(rawResources, dObjs...)
139+
errs := resFinder.getRelevantK8sResources(dirPath)
142140
fileErrors = append(fileErrors, errs...)
143141
if stopProcessing(ps.stopOnError, errs) {
144142
return nil, nil, fileErrors
145143
}
146144
}
147-
if len(rawResources) == 0 {
145+
if len(resFinder.workloads) == 0 {
148146
fileErrors = appendAndLogNewError(fileErrors, noK8sResourcesFound(), ps.logger)
149147
return []common.Resource{}, []*common.Connections{}, fileErrors
150148
}
151149

152-
// 2. Parse them into internal structs
153-
resParser := resourceParser{logger: ps.logger}
154-
resources, links, parseErrors := resParser.parseResources(rawResources)
155-
fileErrors = append(fileErrors, parseErrors...)
150+
// 2. Inline configmaps values as workload envs
151+
errs := resFinder.inlineConfigMapRefsAsEnvs()
152+
fileErrors = append(fileErrors, errs...)
156153
if stopProcessing(ps.stopOnError, fileErrors) {
157154
return nil, nil, fileErrors
158155
}
159156

160157
// 3. Discover all connections between resources
161-
connections := discoverConnections(resources, links, ps.logger)
162-
return resources, connections, fileErrors
158+
connections := discoverConnections(resFinder.workloads, resFinder.services, ps.logger)
159+
return resFinder.workloads, connections, fileErrors
163160
}
164161

165162
func hasFatalError(errs []FileProcessingError) error {

pkg/controller/resource_finder.go

+119-51
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010

1111
"gopkg.in/yaml.v3"
1212
"k8s.io/client-go/kubernetes/scheme"
13+
14+
"github.com/np-guard/cluster-topology-analyzer/pkg/analyzer"
15+
"github.com/np-guard/cluster-topology-analyzer/pkg/common"
1316
)
1417

1518
// K8s resources that are relevant for connectivity analysis
@@ -18,8 +21,8 @@ const (
1821
replicaSet string = "ReplicaSet"
1922
replicationController string = "ReplicationController"
2023
deployment string = "Deployment"
21-
statefulset string = "StatefulSet"
22-
daemonset string = "DaemonSet"
24+
statefulSet string = "StatefulSet"
25+
daemonSet string = "DaemonSet"
2326
job string = "Job"
2427
cronJob string = "CronTab"
2528
service string = "Service"
@@ -28,57 +31,48 @@ const (
2831

2932
var (
3033
acceptedK8sTypesRegex = fmt.Sprintf("(^%s$|^%s$|^%s$|^%s$|^%s$|^%s$|^%s$|^%s$|^%s$|^%s$)",
31-
pod, replicaSet, replicationController, deployment, daemonset, statefulset, job, cronJob, service, configmap)
32-
acceptedK8sTypes = regexp.MustCompile(acceptedK8sTypesRegex)
33-
yamlSuffix = regexp.MustCompile(".ya?ml$")
34+
pod, replicaSet, replicationController, deployment, daemonSet, statefulSet, job, cronJob, service, configmap)
35+
acceptedK8sTypes = regexp.MustCompile(acceptedK8sTypesRegex)
36+
yamlSuffix = regexp.MustCompile(".ya?ml$")
37+
k8sResourceDecoder = scheme.Codecs.UniversalDeserializer()
3438
)
3539

36-
// rawResourcesInFile represents a single YAML file with multiple K8s resources
37-
type rawResourcesInFile struct {
38-
ManifestFilepath string
39-
rawK8sResources []rawK8sResource
40-
}
41-
42-
// rawK8sResource stores a raw K8s resource and its kind for later parsing
43-
type rawK8sResource struct {
44-
GroupKind string
45-
RuntimeObject []byte
46-
}
47-
48-
// resourceFinder is used to locate all relevant K8s resources in a given file-system directory
40+
// resourceFinder is used to locate all relevant K8s resources in given file-system directories
41+
// and to convert them into the internal structs, used for later processing.
4942
type resourceFinder struct {
5043
logger Logger
5144
stopOn1stErr bool
5245
walkFn WalkFunction // for customizing directory scan
46+
47+
workloads []common.Resource // accumulates all workload resources found
48+
services []common.Service // accumulates all service resources found
49+
configmaps []common.CfgMap // accumulates all ConfigMap resources found
5350
}
5451

5552
// getRelevantK8sResources is the main function of resourceFinder.
5653
// It scans a given directory using walkFn, looking for all yaml files. It then breaks each yaml into its documents
5754
// and extracts all K8s resources that are relevant for connectivity analysis.
58-
// The result is stored in a slice of rawResourcesInFile (one per yaml file), each containing a slice of rawK8sResource
59-
func (rf *resourceFinder) getRelevantK8sResources(repoDir string) ([]rawResourcesInFile, []FileProcessingError) {
55+
// The resources are stored in the struct, separated to workloads, services and configmaps
56+
func (rf *resourceFinder) getRelevantK8sResources(repoDir string) []FileProcessingError {
6057
manifestFiles, fileScanErrors := rf.searchForManifests(repoDir)
6158
if stopProcessing(rf.stopOn1stErr, fileScanErrors) {
62-
return nil, fileScanErrors
59+
return fileScanErrors
6360
}
6461
if len(manifestFiles) == 0 {
6562
fileScanErrors = appendAndLogNewError(fileScanErrors, noYamlsFound(), rf.logger)
66-
return nil, fileScanErrors
63+
return fileScanErrors
6764
}
6865

69-
parsedObjs := []rawResourcesInFile{}
7066
for _, mfp := range manifestFiles {
71-
rawK8sResources, err := rf.parseK8sYaml(mfp)
72-
fileScanErrors = append(fileScanErrors, err...)
67+
relMfp := pathWithoutBaseDir(mfp, repoDir)
68+
errs := rf.parseK8sYaml(mfp, relMfp)
69+
fileScanErrors = append(fileScanErrors, errs...)
7370
if stopProcessing(rf.stopOn1stErr, fileScanErrors) {
74-
return nil, fileScanErrors
75-
}
76-
if len(rawK8sResources) > 0 {
77-
manifestFilePath := pathWithoutBaseDir(mfp, repoDir)
78-
parsedObjs = append(parsedObjs, rawResourcesInFile{rawK8sResources: rawK8sResources, ManifestFilepath: manifestFilePath})
71+
return fileScanErrors
7972
}
8073
}
81-
return parsedObjs, fileScanErrors
74+
75+
return fileScanErrors
8276
}
8377

8478
// searchForManifests returns a list of YAML files under a given directory (recursively)
@@ -105,14 +99,14 @@ func (rf *resourceFinder) searchForManifests(repoDir string) ([]string, []FilePr
10599
}
106100

107101
// splitByYamlDocuments takes a YAML file and returns a slice containing its documents as raw text
108-
func (rf *resourceFinder) splitByYamlDocuments(mfp string) ([]string, []FileProcessingError) {
102+
func (rf *resourceFinder) splitByYamlDocuments(mfp string) ([][]byte, []FileProcessingError) {
109103
fileBuf, err := os.ReadFile(mfp)
110104
if err != nil {
111-
return []string{}, appendAndLogNewError(nil, failedReadingFile(mfp, err), rf.logger)
105+
return nil, appendAndLogNewError(nil, failedReadingFile(mfp, err), rf.logger)
112106
}
113107

114108
decoder := yaml.NewDecoder(bytes.NewBuffer(fileBuf))
115-
documents := []string{}
109+
documents := [][]byte{}
116110
documentID := 0
117111
for {
118112
var doc yaml.Node
@@ -127,39 +121,67 @@ func (rf *resourceFinder) splitByYamlDocuments(mfp string) ([]string, []FileProc
127121
if err != nil {
128122
return documents, appendAndLogNewError(nil, malformedYamlDoc(mfp, doc.Line, documentID, err), rf.logger)
129123
}
130-
documents = append(documents, string(out))
124+
documents = append(documents, out)
131125
}
132126
documentID += 1
133127
}
134128
return documents, nil
135129
}
136130

137-
// parseK8sYaml takes a YAML document and checks if it stands for a relevant K8s resource.
138-
// If yes, it puts it into a rawK8sResource and appends it to the result.
139-
func (rf *resourceFinder) parseK8sYaml(mfp string) ([]rawK8sResource, []FileProcessingError) {
140-
dObjs := []rawK8sResource{}
141-
sepYamlFiles, fileProcessingErrors := rf.splitByYamlDocuments(mfp)
131+
// parseK8sYaml takes a YAML file and attempts to parse each of its documents into
132+
// one of the relevant k8s resources
133+
func (rf *resourceFinder) parseK8sYaml(mfp, relMfp string) []FileProcessingError {
134+
yamlDocs, fileProcessingErrors := rf.splitByYamlDocuments(mfp)
142135
if stopProcessing(rf.stopOn1stErr, fileProcessingErrors) {
143-
return nil, fileProcessingErrors
136+
return fileProcessingErrors
144137
}
145138

146-
for docID, doc := range sepYamlFiles {
147-
decode := scheme.Codecs.UniversalDeserializer().Decode
148-
_, groupVersionKind, err := decode([]byte(doc), nil, nil)
139+
for docID, doc := range yamlDocs {
140+
_, groupVersionKind, err := k8sResourceDecoder.Decode(doc, nil, nil)
149141
if err != nil {
150-
fileProcessingErrors = appendAndLogNewError(fileProcessingErrors, notK8sResource(mfp, docID, err), rf.logger)
142+
fileProcessingErrors = appendAndLogNewError(fileProcessingErrors, notK8sResource(relMfp, docID, err), rf.logger)
151143
continue
152144
}
153145
if !acceptedK8sTypes.MatchString(groupVersionKind.Kind) {
154-
rf.logger.Infof("in file: %s, document: %d, skipping object with type: %s", mfp, docID, groupVersionKind.Kind)
146+
rf.logger.Infof("in file: %s, document: %d, skipping object with type: %s", relMfp, docID, groupVersionKind.Kind)
155147
} else {
156-
d := rawK8sResource{}
157-
d.GroupKind = groupVersionKind.Kind
158-
d.RuntimeObject = []byte(doc)
159-
dObjs = append(dObjs, d)
148+
kind := groupVersionKind.Kind
149+
err = rf.parseResource(kind, doc, relMfp)
150+
if err != nil {
151+
fileProcessingErrors = appendAndLogNewError(fileProcessingErrors, failedScanningResource(kind, relMfp, err), rf.logger)
152+
}
153+
}
154+
}
155+
return fileProcessingErrors
156+
}
157+
158+
// parseResource takes a yaml document, parses it into a K8s resource and puts it into one of the 3 struct slices:
159+
// the workload resource slice, the Service resource slice, and the ConfigMaps resource slice
160+
func (rf *resourceFinder) parseResource(kind string, yamlDoc []byte, manifestFilePath string) error {
161+
switch kind {
162+
case service:
163+
res, err := analyzer.ScanK8sServiceObject(kind, yamlDoc)
164+
if err != nil {
165+
return err
160166
}
167+
res.Resource.FilePath = manifestFilePath
168+
rf.services = append(rf.services, res)
169+
case configmap:
170+
res, err := analyzer.ScanK8sConfigmapObject(kind, yamlDoc)
171+
if err != nil {
172+
return err
173+
}
174+
rf.configmaps = append(rf.configmaps, res)
175+
default:
176+
res, err := analyzer.ScanK8sWorkloadObject(kind, yamlDoc)
177+
if err != nil {
178+
return err
179+
}
180+
res.Resource.FilePath = manifestFilePath
181+
rf.workloads = append(rf.workloads, res)
161182
}
162-
return dObjs, fileProcessingErrors
183+
184+
return nil
163185
}
164186

165187
// returns a file path without its prefix base dir
@@ -174,3 +196,49 @@ func pathWithoutBaseDir(path, baseDir string) string {
174196
}
175197
return relPath
176198
}
199+
200+
// inlineConfigMapRefsAsEnvs appends to the Envs of each given resource the ConfigMap values it is referring to
201+
// It should only be called after ALL calls to getRelevantK8sResources successfully returned
202+
func (rf *resourceFinder) inlineConfigMapRefsAsEnvs() []FileProcessingError {
203+
cfgMapsByName := map[string]*common.CfgMap{}
204+
for cm := range rf.configmaps {
205+
cfgMapsByName[rf.configmaps[cm].FullName] = &rf.configmaps[cm]
206+
}
207+
208+
parseErrors := []FileProcessingError{}
209+
for idx := range rf.workloads {
210+
res := &rf.workloads[idx]
211+
212+
// inline the envFrom field in PodSpec->containers
213+
for _, cfgMapRef := range res.Resource.ConfigMapRefs {
214+
configmapFullName := res.Resource.Namespace + "/" + cfgMapRef
215+
if cfgMap, ok := cfgMapsByName[configmapFullName]; ok {
216+
for _, v := range cfgMap.Data {
217+
if analyzer.IsNetworkAddressValue(v) {
218+
res.Resource.NetworkAddrs = append(res.Resource.NetworkAddrs, v)
219+
}
220+
}
221+
} else {
222+
parseErrors = appendAndLogNewError(parseErrors, configMapNotFound(configmapFullName, res.Resource.Name), rf.logger)
223+
}
224+
}
225+
226+
// inline PodSpec->container->env->valueFrom->configMapKeyRef
227+
for _, cfgMapKeyRef := range res.Resource.ConfigMapKeyRefs {
228+
configmapFullName := res.Resource.Namespace + "/" + cfgMapKeyRef.Name
229+
if cfgMap, ok := cfgMapsByName[configmapFullName]; ok {
230+
if val, ok := cfgMap.Data[cfgMapKeyRef.Key]; ok {
231+
if analyzer.IsNetworkAddressValue(val) {
232+
res.Resource.NetworkAddrs = append(res.Resource.NetworkAddrs, val)
233+
}
234+
} else {
235+
err := configMapKeyNotFound(cfgMapKeyRef.Name, cfgMapKeyRef.Key, res.Resource.Name)
236+
parseErrors = appendAndLogNewError(parseErrors, err, rf.logger)
237+
}
238+
} else {
239+
parseErrors = appendAndLogNewError(parseErrors, configMapNotFound(configmapFullName, res.Resource.Name), rf.logger)
240+
}
241+
}
242+
}
243+
return parseErrors
244+
}

pkg/controller/resource_finder_test.go

+24-14
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
func TestGetRelevantK8sResourcesBadYamlDocument(t *testing.T) {
1414
dirPath := filepath.Join(getTestsDir(), "bad_yamls", "document_with_syntax_error.yaml")
1515
resFinder := resourceFinder{logger: NewDefaultLogger(), stopOn1stErr: false, walkFn: filepath.WalkDir}
16-
objs, errs := resFinder.getRelevantK8sResources(dirPath)
16+
errs := resFinder.getRelevantK8sResources(dirPath)
1717
require.Len(t, errs, 1)
1818
badDoc := &MalformedYamlDocError{}
1919
require.True(t, errors.As(errs[0].Error(), &badDoc))
@@ -22,14 +22,15 @@ func TestGetRelevantK8sResourcesBadYamlDocument(t *testing.T) {
2222
require.Equal(t, 6, docID)
2323
require.Nil(t, err)
2424

25-
require.Len(t, objs, 1)
26-
require.Len(t, objs[0].rawK8sResources, 6)
25+
require.Len(t, resFinder.workloads, 3)
26+
require.Len(t, resFinder.services, 3)
27+
require.Empty(t, resFinder.configmaps)
2728
}
2829

2930
func TestGetRelevantK8sResourcesBadYamlDocumentFailFast(t *testing.T) {
3031
dirPath := filepath.Join(getTestsDir(), "bad_yamls", "document_with_syntax_error.yaml")
3132
resFinder := resourceFinder{logger: NewDefaultLogger(), stopOn1stErr: true, walkFn: filepath.WalkDir}
32-
objs, errs := resFinder.getRelevantK8sResources(dirPath)
33+
errs := resFinder.getRelevantK8sResources(dirPath)
3334
require.Len(t, errs, 1)
3435
badDoc := &MalformedYamlDocError{}
3536
require.True(t, errors.As(errs[0].Error(), &badDoc))
@@ -38,48 +39,57 @@ func TestGetRelevantK8sResourcesBadYamlDocumentFailFast(t *testing.T) {
3839
require.Equal(t, 6, docID)
3940
require.Nil(t, err)
4041

41-
require.Empty(t, objs)
42+
require.Empty(t, resFinder.workloads)
43+
require.Empty(t, resFinder.services)
44+
require.Empty(t, resFinder.configmaps)
4245
}
4346

4447
func TestGetRelevantK8sResourcesNoK8sResource(t *testing.T) {
4548
dirPath := filepath.Join(getTestsDir(), "bad_yamls", "not_a_k8s_resource.yaml")
4649
resFinder := resourceFinder{logger: NewDefaultLogger(), stopOn1stErr: false, walkFn: filepath.WalkDir}
47-
objs, errs := resFinder.getRelevantK8sResources(dirPath)
50+
errs := resFinder.getRelevantK8sResources(dirPath)
4851
require.Len(t, errs, 1)
4952
notK8sRes := &NotK8sResourceError{}
5053
require.True(t, errors.As(errs[0].Error(), &notK8sRes))
51-
require.Len(t, objs, 1)
52-
require.Len(t, objs[0].rawK8sResources, 1)
54+
require.Empty(t, resFinder.workloads)
55+
require.Len(t, resFinder.services, 1)
56+
require.Empty(t, resFinder.configmaps)
5357
}
5458

5559
func TestGetRelevantK8sResourcesNoYAMLs(t *testing.T) {
5660
dirPath := filepath.Join(getTestsDir(), "bad_yamls", "subdir2")
5761
resFinder := resourceFinder{logger: NewDefaultLogger(), stopOn1stErr: false, walkFn: filepath.WalkDir}
58-
objs, errs := resFinder.getRelevantK8sResources(dirPath)
62+
errs := resFinder.getRelevantK8sResources(dirPath)
5963
require.Len(t, errs, 1)
6064
noYamls := &NoYamlsFoundError{}
6165
require.True(t, errors.As(errs[0].Error(), &noYamls))
62-
require.Empty(t, objs)
66+
require.Empty(t, resFinder.workloads)
67+
require.Empty(t, resFinder.services)
68+
require.Empty(t, resFinder.configmaps)
6369
}
6470

6571
func TestGetRelevantK8sResourcesBadDir(t *testing.T) {
6672
dirPath := filepath.Join(getTestsDir(), "bad_yamls", "subdir3") // doesn't exist
6773
resFinder := resourceFinder{logger: NewDefaultLogger(), stopOn1stErr: false, walkFn: filepath.WalkDir}
68-
objs, errs := resFinder.getRelevantK8sResources(dirPath)
74+
errs := resFinder.getRelevantK8sResources(dirPath)
6975
require.Len(t, errs, 1)
7076
badDir := &FailedAccessingDirError{}
7177
require.True(t, errors.As(errs[0].Error(), &badDir))
72-
require.Empty(t, objs)
78+
require.Empty(t, resFinder.workloads)
79+
require.Empty(t, resFinder.services)
80+
require.Empty(t, resFinder.configmaps)
7381
}
7482

7583
func TestGetRelevantK8sResourcesBadDirFailFast(t *testing.T) {
7684
dirPath := filepath.Join(getTestsDir(), "bad_yamls", "subdir3") // doesn't exist
7785
resFinder := resourceFinder{logger: NewDefaultLogger(), stopOn1stErr: true, walkFn: filepath.WalkDir}
78-
objs, errs := resFinder.getRelevantK8sResources(dirPath)
86+
errs := resFinder.getRelevantK8sResources(dirPath)
7987
require.Len(t, errs, 1)
8088
badDir := &FailedAccessingDirError{}
8189
require.True(t, errors.As(errs[0].Error(), &badDir))
82-
require.Empty(t, objs)
90+
require.Empty(t, resFinder.workloads)
91+
require.Empty(t, resFinder.services)
92+
require.Empty(t, resFinder.configmaps)
8393
}
8494

8595
func TestSearchForManifests(t *testing.T) {

0 commit comments

Comments
 (0)