Skip to content

Commit

Permalink
impl ami resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Jul 3, 2024
1 parent 197cfbd commit 41e94bd
Showing 1 changed file with 71 additions and 108 deletions.
179 changes: 71 additions & 108 deletions pkg/providers/amifamily/ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/aws/aws-sdk-go/service/ssm/ssmiface"
"github.com/mitchellh/hashstructure/v2"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
Expand All @@ -36,6 +34,7 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
"github.com/aws/karpenter-provider-aws/pkg/providers/version"

"github.com/aws/karpenter-provider-aws/pkg/providers/ssm"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/scheduling"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
Expand All @@ -48,10 +47,10 @@ type Provider interface {
type DefaultProvider struct {
sync.Mutex
cache *cache.Cache
ssm ssmiface.SSMAPI
ec2api ec2iface.EC2API
cm *pretty.ChangeMonitor
versionProvider version.Provider
ssmProvider ssm.Provider
}

type AMI struct {
Expand Down Expand Up @@ -134,33 +133,27 @@ func MapToInstanceTypes(instanceTypes []*cloudprovider.InstanceType, amis []v1be
return amiIDs
}

func NewDefaultProvider(versionProvider version.Provider, ssm ssmiface.SSMAPI, ec2api ec2iface.EC2API, cache *cache.Cache) *DefaultProvider {
func NewDefaultProvider(versionProvider version.Provider, ssmProvider ssm.Provider, ec2api ec2iface.EC2API, cache *cache.Cache) *DefaultProvider {
return &DefaultProvider{
cache: cache,
ssm: ssm,
ec2api: ec2api,
cm: pretty.NewChangeMonitor(),
versionProvider: versionProvider,
ssmProvider: ssmProvider,
}
}

// Get Returning a list of AMIs with its associated requirements
func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (AMIs, error) {
p.Lock()
defer p.Unlock()

var err error
var amis AMIs
if len(nodeClass.Spec.AMISelectorTerms) == 0 {
amis, err = p.getDefaultAMIs(ctx, nodeClass)
if err != nil {
return nil, err
}
} else {
amis, err = p.getAMIs(ctx, nodeClass.Spec.AMISelectorTerms)
if err != nil {
return nil, err
}
queries, err := p.GetAMIQueries(ctx, nodeClass)
if err != nil {
return nil, fmt.Errorf("getting AMI queries, %w", err)
}
amis, err := p.getAMIs(ctx, queries)
if err != nil {
return nil, err
}
amis.Sort()
uniqueAMIs := lo.Uniq(lo.Map(amis, func(a AMI, _ int) string { return a.AmiID }))
Expand All @@ -171,58 +164,8 @@ func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1beta1.EC2NodeCl
return amis, nil
}

func (p *DefaultProvider) getDefaultAMIs(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (res AMIs, err error) {
if images, ok := p.cache.Get(lo.FromPtr(nodeClass.Spec.AMIFamily)); ok {
// Ensure what's returned from this function is a deep-copy of AMIs so alterations
// to the data don't affect the original
return append(AMIs{}, images.(AMIs)...), nil
}
amiFamily := GetAMIFamily(nodeClass.Spec.AMIFamily, &Options{})
kubernetesVersion, err := p.versionProvider.Get(ctx)
if err != nil {
return nil, fmt.Errorf("getting kubernetes version %w", err)
}
defaultAMIs := amiFamily.DefaultAMIs(kubernetesVersion)
for _, ami := range defaultAMIs {
if id, err := p.resolveSSMParameter(ctx, ami.Query); err != nil {
log.FromContext(ctx).WithValues("query", ami.Query).Error(err, "failed discovering amis from ssm")
} else {
res = append(res, AMI{AmiID: id, Requirements: ami.Requirements})
}
}
// Resolve Name and CreationDate information into the DefaultAMIs
if err = p.ec2api.DescribeImagesPagesWithContext(ctx, &ec2.DescribeImagesInput{
Filters: []*ec2.Filter{{Name: aws.String("image-id"), Values: aws.StringSlice(lo.Map(res, func(a AMI, _ int) string { return a.AmiID }))}},
MaxResults: aws.Int64(500),
}, func(page *ec2.DescribeImagesOutput, _ bool) bool {
for i := range page.Images {
for j := range res {
if res[j].AmiID == aws.StringValue(page.Images[i].ImageId) {
res[j].Name = aws.StringValue(page.Images[i].Name)
res[j].CreationDate = aws.StringValue(page.Images[i].CreationDate)
}
}
}
return true
}); err != nil {
return nil, fmt.Errorf("describing images, %w", err)
}
p.cache.SetDefault(lo.FromPtr(nodeClass.Spec.AMIFamily), res)
return res, nil
}

func (p *DefaultProvider) resolveSSMParameter(ctx context.Context, ssmQuery string) (string, error) {
output, err := p.ssm.GetParameterWithContext(ctx, &ssm.GetParameterInput{Name: aws.String(ssmQuery)})
if err != nil {
return "", fmt.Errorf("getting ssm parameter %q, %w", ssmQuery, err)
}
ami := aws.StringValue(output.Parameter.Value)
return ami, nil
}

func (p *DefaultProvider) getAMIs(ctx context.Context, terms []v1beta1.AMISelectorTerm) (AMIs, error) {
filterAndOwnerSets := GetFilterAndOwnerSets(terms)
hash, err := hashstructure.Hash(filterAndOwnerSets, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
func (p *DefaultProvider) getAMIs(ctx context.Context, queries []AMIQuery) (AMIs, error) {
hash, err := hashstructure.Hash(queries, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
if err != nil {
return nil, err
}
Expand All @@ -232,36 +175,48 @@ func (p *DefaultProvider) getAMIs(ctx context.Context, terms []v1beta1.AMISelect
return append(AMIs{}, images.(AMIs)...), nil
}
images := map[uint64]AMI{}
for _, filtersAndOwners := range filterAndOwnerSets {
for _, query := range queries {
if err = p.ec2api.DescribeImagesPagesWithContext(ctx, &ec2.DescribeImagesInput{
// Don't include filters in the Describe Images call as EC2 API doesn't allow empty filters.
Filters: lo.Ternary(len(filtersAndOwners.Filters) > 0, filtersAndOwners.Filters, nil),
Owners: lo.Ternary(len(filtersAndOwners.Owners) > 0, aws.StringSlice(filtersAndOwners.Owners), nil),
Filters: lo.Ternary(len(query.Filters) > 0, query.Filters, nil),
Owners: lo.Ternary(len(query.Owners) > 0, aws.StringSlice(query.Owners), nil),
MaxResults: aws.Int64(1000),
}, func(page *ec2.DescribeImagesOutput, _ bool) bool {
for i := range page.Images {
reqs := p.getRequirementsFromImage(page.Images[i])
if !v1beta1.WellKnownArchitectures.Has(reqs.Get(v1.LabelArchStable).Any()) {
for _, image := range page.Images {
archRequirements := p.getRequirementsFromImage(image)
if !v1beta1.WellKnownArchitectures.Has(archRequirements.Get(v1.LabelArchStable).Any()) {
continue
}
reqsHash := lo.Must(hashstructure.Hash(reqs.NodeSelectorRequirements(), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}))
// If the proposed image is newer, store it so that we can return it
if v, ok := images[reqsHash]; ok {
candidateCreationTime, _ := time.Parse(time.RFC3339, lo.FromPtr(page.Images[i].CreationDate))
existingCreationTime, _ := time.Parse(time.RFC3339, v.CreationDate)
if existingCreationTime == candidateCreationTime && lo.FromPtr(page.Images[i].Name) < v.Name {
continue

requirementSets := func() []scheduling.Requirements {
if knownRequirements, ok := query.KnownRequirements[lo.FromPtr(image.ImageId)]; ok {
return lo.Map(knownRequirements, func(r scheduling.Requirements, _ int) scheduling.Requirements {
return lo.Assign(r, archRequirements)
})
}
return []scheduling.Requirements{archRequirements}
}()

for _, reqs := range requirementSets {
reqsHash := lo.Must(hashstructure.Hash(reqs.NodeSelectorRequirements(), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}))
// If the proposed image is newer, store it so that we can return it
if v, ok := images[reqsHash]; ok {
candidateCreationTime, _ := time.Parse(time.RFC3339, lo.FromPtr(image.CreationDate))
existingCreationTime, _ := time.Parse(time.RFC3339, v.CreationDate)
if existingCreationTime == candidateCreationTime && lo.FromPtr(image.Name) < v.Name {
continue
}
if candidateCreationTime.Unix() < existingCreationTime.Unix() {
continue
}
}
if candidateCreationTime.Unix() < existingCreationTime.Unix() {
continue
images[reqsHash] = AMI{
Name: lo.FromPtr(image.Name),
AmiID: lo.FromPtr(image.ImageId),
CreationDate: lo.FromPtr(image.CreationDate),
Requirements: reqs,
}
}
images[reqsHash] = AMI{
Name: lo.FromPtr(page.Images[i].Name),
AmiID: lo.FromPtr(page.Images[i].ImageId),
CreationDate: lo.FromPtr(page.Images[i].CreationDate),
Requirements: reqs,
}
}
return true
}); err != nil {
Expand All @@ -273,58 +228,66 @@ func (p *DefaultProvider) getAMIs(ctx context.Context, terms []v1beta1.AMISelect
}

type AMIQuery struct {
Filters []*ec2.Filter
Owners []string
KnownRequirements map[string][]scheduling.Requirements{}
Filters []*ec2.Filter
Owners []string
KnownRequirements map[string][]scheduling.Requirements
}

type FiltersAndOwners struct {
Filters []*ec2.Filter
Owners []string
}
func (p *DefaultProvider) GetAMIQueries(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) ([]AMIQuery, error) {
// Aliases should be mutually exclusive (enforced via CEL validation), we'll treat it as such
if amiFamilyKey := nodeClass.AMIFamily(); amiFamilyKey != v1beta1.AMIFamilyCustom {
amiVersion := nodeClass.AMIVersion()
amiFamily := GetAMIFamily(&amiFamilyKey, nil)
kubernetesVersion, err := p.versionProvider.Get(ctx)
if err != nil {
return nil, fmt.Errorf("getting kubernetes version, %w", err)
}
query, err := amiFamily.AMIQuery(ctx, p.ssmProvider, kubernetesVersion, amiVersion)
return []AMIQuery{query}, err
}

func GetFilterAndOwnerSets(terms []v1beta1.AMISelectorTerm) (res []FiltersAndOwners) {
idFilter := &ec2.Filter{Name: aws.String("image-id")}
for _, term := range terms {
queries := []AMIQuery{}
for _, term := range nodeClass.Spec.AMISelectorTerms {
switch {
case term.ID != "":
idFilter.Values = append(idFilter.Values, aws.String(term.ID))
default:
elem := FiltersAndOwners{
query := AMIQuery{
Owners: lo.Ternary(term.Owner != "", []string{term.Owner}, []string{}),
}
if term.Name != "" {
// Default owners to self,amazon to ensure Karpenter only discovers cross-account AMIs if the user specifically allows it.
// Removing this default would cause Karpenter to discover publicly shared AMIs passing the name filter.
elem = FiltersAndOwners{
query = AMIQuery{
Owners: lo.Ternary(term.Owner != "", []string{term.Owner}, []string{"self", "amazon"}),
}
elem.Filters = append(elem.Filters, &ec2.Filter{
query.Filters = append(query.Filters, &ec2.Filter{
Name: aws.String("name"),
Values: aws.StringSlice([]string{term.Name}),
})

}
for k, v := range term.Tags {
if v == "*" {
elem.Filters = append(elem.Filters, &ec2.Filter{
query.Filters = append(query.Filters, &ec2.Filter{
Name: aws.String("tag-key"),
Values: []*string{aws.String(k)},
})
} else {
elem.Filters = append(elem.Filters, &ec2.Filter{
query.Filters = append(query.Filters, &ec2.Filter{
Name: aws.String(fmt.Sprintf("tag:%s", k)),
Values: []*string{aws.String(v)},
})
}
}
res = append(res, elem)
queries = append(queries, query)
}
}
if len(idFilter.Values) > 0 {
res = append(res, FiltersAndOwners{Filters: []*ec2.Filter{idFilter}})
queries = append(queries, AMIQuery{Filters: []*ec2.Filter{idFilter}})
}
return res
return queries, nil
}

func (p *DefaultProvider) getRequirementsFromImage(ec2Image *ec2.Image) scheduling.Requirements {
Expand Down

0 comments on commit 41e94bd

Please sign in to comment.