Skip to content

Commit

Permalink
clustersynchro: fix duplicate sync conditions
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <wei.cai-nat@daocloud.io>
  • Loading branch information
Iceber committed Jan 26, 2022
1 parent 08d78bf commit 5b6f1d9
Showing 1 changed file with 56 additions and 62 deletions.
118 changes: 56 additions & 62 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,86 +167,32 @@ func (s *ClusterSynchro) SetResources(clusterResources []clustersv1alpha1.Cluste
for _, resources := range clusterResources {
for _, resource := range resources.Resources {
gr := schema.GroupResource{Group: resources.Group, Resource: resource}
gvks, err := s.restmapper.KindsFor(gr.WithVersion(""))

supportedGVKs, err := s.restmapper.KindsFor(gr.WithVersion(""))
if err != nil {
klog.ErrorS(fmt.Errorf("Cluster not supported resource: %v", err), "Skip resource sync", "cluster", s.name, "resource", gr)
continue
}

// filter cluster unsupported resource
if len(gvks) == 0 {
klog.InfoS("Skip resource sync, cluster not supported resource", "cluster", s.name, "resource", gr)
syncVersions, isLegacyResource, err := negotiateSyncVersions(resources.Versions, supportedGVKs)
if err != nil {
klog.InfoS("Skip resource sync", "cluster", s.name, "resource", gr, "reason", err)
continue
}

mapper, err := s.restmapper.RESTMapping(gvks[0].GroupKind(), gvks[0].Version)
mapper, err := s.restmapper.RESTMapping(supportedGVKs[0].GroupKind(), supportedGVKs[0].Version)
if err != nil {
klog.ErrorS(err, "Skip resource sync", "cluster", s.name, "resource", gr)
continue
}

var structuredObject bool
versions := resources.Versions
legacyResourceVersions := resourcescheme.LegacyResourceScheme.VersionsForGroupKind(gvks[0].GroupKind())
switch {
case len(legacyResourceVersions) != 0:
// kube resource
var preferredVersion schema.GroupVersion

// gvks is cluster supported versions
// gvs is cluster pedia supported versions
for _, gvk := range gvks {
for _, gv := range legacyResourceVersions {
if gvk.GroupVersion() == gv {
preferredVersion = gv
}
}
}
// if not get preferred version, skip resource
if preferredVersion.Empty() {
klog.ErrorS(errors.New("Not found preferred version"), "Skip resource sync", "cluster", s.name, "resource", gr)
continue
}

structuredObject = true
versions = append(versions, preferredVersion.Version)
case len(versions) != 0:
// custom resource
// filter resource version using cluster supported resource versions
var filtered []string
for _, version := range versions {
for _, gvk := range gvks {
if gvk.Version == version {
filtered = append(filtered, version)
}
}
}
if len(filtered) == 0 {
// no supported version found for the cluster
// TODO(iceber): add warn log
continue
}

versions = filtered
default:
// For custom resources, if the version to be synchronized is not specified,
// then the first three versions available from the cluster are used
for _, gvk := range gvks {
versions = append(versions, gvk.Version)
}

if len(versions) > 3 {
versions = versions[:3]
}
}

info := &clustersv1alpha1.ClusterResourceStatus{
Kind: mapper.GroupVersionKind.Kind,
Resource: gr.Resource,
Namespaced: mapper.Scope.Name() == meta.RESTScopeNameNamespace,
}

for _, version := range versions {
for _, version := range syncVersions {
syncResource := gr.WithVersion(version)
storageConfig, err := s.resourceStorageConfig.NewConfig(syncResource)
if err != nil {
Expand All @@ -265,7 +211,7 @@ func (s *ClusterSynchro) SetResources(clusterResources []clustersv1alpha1.Cluste
}

if syncResource != storageResource {
if structuredObject {
if isLegacyResource {
config.convertor = resourcescheme.LegacyResourceScheme
} else {
config.convertor = resourcescheme.CustomResourceScheme
Expand Down Expand Up @@ -645,3 +591,51 @@ func checkKubeHealthy(client kubernetes.Interface) (bool, error) {
}
return true, nil
}

func negotiateSyncVersions(syncVersions []string, supportedGVKs []schema.GroupVersionKind) ([]string, bool, error) {
if len(supportedGVKs) == 0 {
return nil, false, errors.New("The supported versions are empty, the resource is not supported")
}

knowns := resourcescheme.LegacyResourceScheme.VersionsForGroupKind(supportedGVKs[0].GroupKind())
if len(knowns) != 0 {
var preferredVersion schema.GroupVersion
for _, gvk := range supportedGVKs {
for _, gv := range knowns {
if gvk.GroupVersion() == gv {
preferredVersion = gv
}
}
}
if preferredVersion.Empty() {
return nil, false, errors.New("The supported versions do not contain any known versions")
}
return []string{preferredVersion.Version}, true, nil
}

// Handles custom resources that specify sync versions
if len(syncVersions) != 0 {
var filtered []string
for _, version := range syncVersions {
for _, gvk := range supportedGVKs {
if gvk.Version == version {
filtered = append(syncVersions, version)
}
}
}
if len(filtered) == 0 {
return nil, false, errors.New("The supported versions do not contain any specified sync version")
}
return filtered, false, nil
}

// For custom resources, if the version to be synchronized is not specified,
// then the first three versions available from the cluster are used
for _, gvk := range supportedGVKs {
syncVersions = append(syncVersions, gvk.Version)
}
if len(syncVersions) > 3 {
syncVersions = syncVersions[:3]
}
return syncVersions, false, nil
}

0 comments on commit 5b6f1d9

Please sign in to comment.