diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go index 68db9b0ee76a5..5b49d2f947397 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go @@ -41,7 +41,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO presult := &universalPipelineResult{} bin := opt.Worker - if bin == "" { + if bin == "" && !opt.Loopback { if self, ok := IsWorkerCompatibleBinary(); ok { bin = self log.Infof(ctx, "Using running binary as worker binary: '%v'", bin) @@ -56,6 +56,11 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO bin = worker } + } else if opt.Loopback { + // TODO(https://github.com/apache/beam/issues/27569: determine the canonical location for Beam temp files. + // In loopback mode, the binary is unused, so we can avoid an unnecessary compile step. + f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*") + bin = f.Name() } else { log.Infof(ctx, "Using specified worker binary: '%v'", bin) } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go index daa6896da406a..5752b33892bb4 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go @@ -39,15 +39,17 @@ type JobOptions struct { // Experiments are additional experiments. Experiments []string - // TODO(herohde) 3/17/2018: add further parametrization as needed - // Worker is the worker binary override. Worker string - // RetainDocker is an option to pass to the runner. + // RetainDocker is an option to pass to the runner indicating the docker containers should be cached. RetainDocker bool + // Indicates a limit on parallelism the runner should impose. Parallelism int + + // Loopback indicates this job is running in loopback mode and will reconnect to the local process. + Loopback bool } // Prepare prepares a job to the given job service. It returns the preparation id @@ -101,10 +103,17 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID return errors.Wrap(err, "failed to get job stream") } + mostRecentError := errors.New("") + var errReceived, jobFailed bool + for { msg, err := stream.Recv() if err != nil { if err == io.EOF { + if jobFailed { + // Connection finished with a failed status, so produce what we have. + return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + } return nil } return err @@ -120,7 +129,11 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID case jobpb.JobState_DONE, jobpb.JobState_CANCELLED: return nil case jobpb.JobState_FAILED: - return errors.Errorf("job %v failed", jobID) + jobFailed = true + if errReceived { + return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + } + // Otherwise, wait for at least one error log from the runner, or the connection to close. } case msg.GetMessageResponse() != nil: @@ -129,6 +142,15 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), resp.GetMessageId(), resp.GetMessageText()) log.Output(ctx, messageSeverity(resp.GetImportance()), 1, text) + if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR { + errReceived = true + mostRecentError = errors.New(resp.GetMessageText()) + + if jobFailed { + return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + } + } + default: return errors.Errorf("unexpected job update: %v", proto.MarshalTextString(msg)) } diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go index 299a64acdd691..8af9e91e1e15e 100644 --- a/sdks/go/pkg/beam/runners/universal/universal.go +++ b/sdks/go/pkg/beam/runners/universal/universal.go @@ -101,6 +101,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) Worker: *jobopts.WorkerBinary, RetainDocker: *jobopts.RetainDockerContainers, Parallelism: *jobopts.Parallelism, + Loopback: jobopts.IsLoopback(), } return runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async) }