Skip to content

Commit

Permalink
[prism] Universal runner improvements. (apache#27570)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
2 people authored and bullet03 committed Aug 11, 2023
1 parent a62a110 commit ade733b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
7 changes: 6 additions & 1 deletion sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
presult := &universalPipelineResult{}

bin := opt.Worker
if bin == "" {
if bin == "" && !opt.Loopback {
if self, ok := IsWorkerCompatibleBinary(); ok {
bin = self
log.Infof(ctx, "Using running binary as worker binary: '%v'", bin)
Expand All @@ -56,6 +56,11 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO

bin = worker
}
} else if opt.Loopback {
// TODO(https://github.com/apache/beam/issues/27569): determine the canonical location for Beam temp files.
// In loopback mode, the binary is unused, so we can avoid an unnecessary compile step.
f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*")
bin = f.Name()
} else {
log.Infof(ctx, "Using specified worker binary: '%v'", bin)
}
Expand Down
30 changes: 26 additions & 4 deletions sdks/go/pkg/beam/runners/universal/runnerlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ type JobOptions struct {
// Experiments are additional experiments.
Experiments []string

// TODO(herohde) 3/17/2018: add further parametrization as needed

// Worker is the worker binary override.
Worker string

// RetainDocker is an option to pass to the runner.
// RetainDocker is an option to pass to the runner indicating the docker containers should be cached.
RetainDocker bool

// Indicates a limit on parallelism the runner should impose.
Parallelism int

// Loopback indicates this job is running in loopback mode and will reconnect to the local process.
Loopback bool
}

// Prepare prepares a job to the given job service. It returns the preparation id
Expand Down Expand Up @@ -101,10 +103,17 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
return errors.Wrap(err, "failed to get job stream")
}

mostRecentError := errors.New("<no error received, see runner logs>")
var errReceived, jobFailed bool

for {
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
if jobFailed {
// Connection finished with a failed status, so produce what we have.
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
}
return nil
}
return err
Expand All @@ -120,7 +129,11 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
return nil
case jobpb.JobState_FAILED:
return errors.Errorf("job %v failed", jobID)
jobFailed = true
if errReceived {
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
}
// Otherwise, wait for at least one error log from the runner, or the connection to close.
}

case msg.GetMessageResponse() != nil:
Expand All @@ -129,6 +142,15 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), resp.GetMessageId(), resp.GetMessageText())
log.Output(ctx, messageSeverity(resp.GetImportance()), 1, text)

if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR {
errReceived = true
mostRecentError = errors.New(resp.GetMessageText())

if jobFailed {
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
}
}

default:
return errors.Errorf("unexpected job update: %v", proto.MarshalTextString(msg))
}
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/universal/universal.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
Worker: *jobopts.WorkerBinary,
RetainDocker: *jobopts.RetainDockerContainers,
Parallelism: *jobopts.Parallelism,
Loopback: jobopts.IsLoopback(),
}
return runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
}

0 comments on commit ade733b

Please sign in to comment.