Skip to content

Commit

Permalink
Merge pull request #1966 from jedevc/rework-driver-resolution
Browse files Browse the repository at this point in the history
build: rework node resolution
  • Loading branch information
tonistiigi committed Nov 14, 2023
2 parents 0408f3a + 47eb405 commit 629b555
Show file tree
Hide file tree
Showing 3 changed files with 653 additions and 235 deletions.
270 changes: 35 additions & 235 deletions build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,218 +135,6 @@ func filterAvailableNodes(nodes []builder.Node) ([]builder.Node, error) {
return nil, err
}

type driverPair struct {
driverIndex int
platforms []specs.Platform
so *client.SolveOpt
bopts gateway.BuildOpts
}

func driverIndexes(m map[string][]driverPair) []int {
out := make([]int, 0, len(m))
visited := map[int]struct{}{}
for _, dp := range m {
for _, d := range dp {
if _, ok := visited[d.driverIndex]; ok {
continue
}
visited[d.driverIndex] = struct{}{}
out = append(out, d.driverIndex)
}
}
return out
}

func allIndexes(l int) []int {
out := make([]int, 0, l)
for i := 0; i < l; i++ {
out = append(out, i)
}
return out
}

func ensureBooted(ctx context.Context, nodes []builder.Node, idxs []int, pw progress.Writer) ([]*client.Client, error) {
clients := make([]*client.Client, len(nodes))

baseCtx := ctx
eg, ctx := errgroup.WithContext(ctx)

for _, i := range idxs {
func(i int) {
eg.Go(func() error {
c, err := driver.Boot(ctx, baseCtx, nodes[i].Driver, pw)
if err != nil {
return err
}
clients[i] = c
return nil
})
}(i)
}

if err := eg.Wait(); err != nil {
return nil, err
}

return clients, nil
}

func splitToDriverPairs(availablePlatforms map[string]int, opt map[string]Options) map[string][]driverPair {
m := map[string][]driverPair{}
for k, opt := range opt {
mm := map[int][]specs.Platform{}
for _, p := range opt.Platforms {
k := platforms.Format(p)
idx := availablePlatforms[k] // default 0
pp := mm[idx]
pp = append(pp, p)
mm[idx] = pp
}
// if no platform is specified, use first driver
if len(mm) == 0 {
mm[0] = nil
}
dps := make([]driverPair, 0, 2)
for idx, pp := range mm {
dps = append(dps, driverPair{driverIndex: idx, platforms: pp})
}
m[k] = dps
}
return m
}

func resolveDrivers(ctx context.Context, nodes []builder.Node, opt map[string]Options, pw progress.Writer) (map[string][]driverPair, []*client.Client, error) {
dps, clients, err := resolveDriversBase(ctx, nodes, opt, pw)
if err != nil {
return nil, nil, err
}

bopts := make([]gateway.BuildOpts, len(clients))

span, ctx := tracing.StartSpan(ctx, "load buildkit capabilities", trace.WithSpanKind(trace.SpanKindInternal))

eg, ctx := errgroup.WithContext(ctx)
for i, c := range clients {
if c == nil {
continue
}

func(i int, c *client.Client) {
eg.Go(func() error {
clients[i].Build(ctx, client.SolveOpt{
Internal: true,
}, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
bopts[i] = c.BuildOpts()
return nil, nil
}, nil)
return nil
})
}(i, c)
}

err = eg.Wait()
tracing.FinishWithError(span, err)
if err != nil {
return nil, nil, err
}
for key := range dps {
for i, dp := range dps[key] {
dps[key][i].bopts = bopts[dp.driverIndex]
}
}

return dps, clients, nil
}

func resolveDriversBase(ctx context.Context, nodes []builder.Node, opt map[string]Options, pw progress.Writer) (map[string][]driverPair, []*client.Client, error) {
availablePlatforms := map[string]int{}
for i, node := range nodes {
for _, p := range node.Platforms {
availablePlatforms[platforms.Format(p)] = i
}
}

undetectedPlatform := false
allPlatforms := map[string]struct{}{}
for _, opt := range opt {
for _, p := range opt.Platforms {
k := platforms.Format(p)
allPlatforms[k] = struct{}{}
if _, ok := availablePlatforms[k]; !ok {
undetectedPlatform = true
}
}
}

// fast path
if len(nodes) == 1 || len(allPlatforms) == 0 {
m := map[string][]driverPair{}
for k, opt := range opt {
m[k] = []driverPair{{driverIndex: 0, platforms: opt.Platforms}}
}
clients, err := ensureBooted(ctx, nodes, driverIndexes(m), pw)
if err != nil {
return nil, nil, err
}
return m, clients, nil
}

// map based on existing platforms
if !undetectedPlatform {
m := splitToDriverPairs(availablePlatforms, opt)
clients, err := ensureBooted(ctx, nodes, driverIndexes(m), pw)
if err != nil {
return nil, nil, err
}
return m, clients, nil
}

// boot all drivers in k
clients, err := ensureBooted(ctx, nodes, allIndexes(len(nodes)), pw)
if err != nil {
return nil, nil, err
}

eg, ctx := errgroup.WithContext(ctx)
workers := make([][]*client.WorkerInfo, len(clients))

for i, c := range clients {
if c == nil {
continue
}
func(i int) {
eg.Go(func() error {
ww, err := clients[i].ListWorkers(ctx)
if err != nil {
return errors.Wrap(err, "listing workers")
}
workers[i] = ww
return nil
})

}(i)
}

if err := eg.Wait(); err != nil {
return nil, nil, err
}

for i, ww := range workers {
for _, w := range ww {
for _, p := range w.Platforms {
p = platforms.Normalize(p)
ps := platforms.Format(p)

if _, ok := availablePlatforms[ps]; !ok {
availablePlatforms[ps] = i
}
}
}
}

return splitToDriverPairs(availablePlatforms, opt), clients, nil
}

func toRepoOnly(in string) (string, error) {
m := map[string]struct{}{}
p := strings.Split(in, ",")
Expand Down Expand Up @@ -717,10 +505,14 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
}
}

m, clients, err := resolveDrivers(ctx, nodes, opt, w)
drivers, err := resolveDrivers(ctx, nodes, opt, w)
if err != nil {
return nil, err
}
driversSolveOpts := make(map[string][]*client.SolveOpt, len(drivers))
for k, dps := range drivers {
driversSolveOpts[k] = make([]*client.SolveOpt, len(dps))
}

defers := make([]func(), 0, 2)
defer func() {
Expand All @@ -734,19 +526,22 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
eg, ctx := errgroup.WithContext(ctx)

for k, opt := range opt {
multiDriver := len(m[k]) > 1
multiDriver := len(drivers[k]) > 1
hasMobyDriver := false
gitattrs, err := getGitAttributes(ctx, opt.Inputs.ContextPath, opt.Inputs.DockerfilePath)
if err != nil {
logrus.WithError(err).Warn("current commit information was not captured by the build")
}
for i, np := range m[k] {
node := nodes[np.driverIndex]
if node.Driver.IsMobyDriver() {
for i, np := range drivers[k] {
if np.Node().Driver.IsMobyDriver() {
hasMobyDriver = true
}
opt.Platforms = np.platforms
so, release, err := toSolveOpt(ctx, node, multiDriver, opt, np.bopts, configDir, w, docker)
gatewayOpts, err := np.BuildOpts(ctx)
if err != nil {
return nil, err
}
so, release, err := toSolveOpt(ctx, np.Node(), multiDriver, opt, gatewayOpts, configDir, w, docker)
if err != nil {
return nil, err
}
Expand All @@ -757,7 +552,7 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
so.FrontendAttrs[k] = v
}
defers = append(defers, release)
m[k][i].so = so
driversSolveOpts[k][i] = so
}
for _, at := range opt.Session {
if s, ok := at.(interface {
Expand All @@ -771,8 +566,8 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s

// validate for multi-node push
if hasMobyDriver && multiDriver {
for _, dp := range m[k] {
for _, e := range dp.so.Exports {
for _, so := range driversSolveOpts[k] {
for _, e := range so.Exports {
if e.Type == "moby" {
if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok {
return nil, errors.Errorf("multi-node push can't currently be performed with the docker driver, please switch to a different driver")
Expand All @@ -785,12 +580,13 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s

// validate that all links between targets use same drivers
for name := range opt {
dps := m[name]
for _, dp := range dps {
for k, v := range dp.so.FrontendAttrs {
dps := drivers[name]
for i, dp := range dps {
so := driversSolveOpts[name][i]
for k, v := range so.FrontendAttrs {
if strings.HasPrefix(k, "context:") && strings.HasPrefix(v, "target:") {
k2 := strings.TrimPrefix(v, "target:")
dps2, ok := m[k2]
dps2, ok := drivers[k2]
if !ok {
return nil, errors.Errorf("failed to find target %s for context %s", k2, strings.TrimPrefix(k, "context:")) // should be validated before already
}
Expand Down Expand Up @@ -818,8 +614,8 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
for k, opt := range opt {
err := func(k string) error {
opt := opt
dps := m[k]
multiDriver := len(m[k]) > 1
dps := drivers[k]
multiDriver := len(drivers[k]) > 1

var span trace.Span
ctx := ctx
Expand All @@ -835,8 +631,9 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
var insecurePush bool

for i, dp := range dps {
i, dp, so := i, dp, *dp.so
node := nodes[dp.driverIndex]
i, dp := i, dp
node := dp.Node()
so := driversSolveOpts[k][i]
if multiDriver {
for i, e := range so.Exports {
switch e.Type {
Expand Down Expand Up @@ -867,11 +664,14 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s

pw := progress.WithPrefix(w, k, multiTarget)

c := clients[dp.driverIndex]
c, err := dp.Client(ctx)
if err != nil {
return err
}
eg2.Go(func() error {
pw = progress.ResetTime(pw)

if err := waitContextDeps(ctx, dp.driverIndex, results, &so); err != nil {
if err := waitContextDeps(ctx, dp.driverIndex, results, so); err != nil {
return err
}

Expand Down Expand Up @@ -950,10 +750,10 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
var rr *client.SolveResponse
if resultHandleFunc != nil {
var resultHandle *ResultHandle
resultHandle, rr, err = NewResultHandle(ctx, cc, so, "buildx", buildFunc, ch)
resultHandle, rr, err = NewResultHandle(ctx, cc, *so, "buildx", buildFunc, ch)
resultHandleFunc(dp.driverIndex, resultHandle)
} else {
rr, err = c.Build(ctx, so, "buildx", buildFunc, ch)
rr, err = c.Build(ctx, *so, "buildx", buildFunc, ch)
}
if desktop.BuildBackendEnabled() && node.Driver.HistoryAPISupported(ctx) {
buildRef := fmt.Sprintf("%s/%s/%s", node.Builder, node.Name, so.Ref)
Expand All @@ -977,7 +777,7 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
rr.ExporterResponse[k] = string(v)
}

node := nodes[dp.driverIndex].Driver
node := dp.Node().Driver
if node.IsMobyDriver() {
for _, e := range so.Exports {
if e.Type == "moby" && e.Attrs["push"] != "" {
Expand Down Expand Up @@ -1069,7 +869,7 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
if len(descs) > 0 {
var imageopt imagetools.Opt
for _, dp := range dps {
imageopt = nodes[dp.driverIndex].ImageOpt
imageopt = dp.Node().ImageOpt
break
}
names := strings.Split(pushNames, ",")
Expand Down
Loading

0 comments on commit 629b555

Please sign in to comment.