Skip to content

Commit

Permalink
Improve behavior after exception in begin/end transitions
Browse files Browse the repository at this point in the history
This PR covers the job, stream, and ProcessBlock begin/end transitions.
  • Loading branch information
wddgit committed Jul 11, 2024
1 parent e232a9b commit 0a92808
Show file tree
Hide file tree
Showing 43 changed files with 1,611 additions and 552 deletions.
9 changes: 5 additions & 4 deletions FWCore/Framework/bin/cmsRun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,26 +267,27 @@ int main(int argc, const char* argv[]) {
TaskCleanupSentry sentry{proc.get()};

alwaysAddContext = false;

proc.on();
context = "Calling beginJob";
proc->beginJob();

// EventSetupsController uses pointers to the ParameterSet
// owned by ProcessDesc while it is dealing with sharing of
// ESProducers among the top-level process and the
// SubProcesses. Therefore the ProcessDesc needs to be kept
// alive until the beginJob transition has finished.
processDesc.reset();

alwaysAddContext = false;
context =
"Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)";
proc.on();
auto status = proc->runToCompletion();
if (status == edm::EventProcessor::epSignal) {
returnCode = edm::errors::CaughtSignal;
}
proc.off();

context = "Calling endJob";
proc.off();
context = "Calling endJob and endStream";
proc->endJob();
});
return returnCode;
Expand Down
9 changes: 9 additions & 0 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ configured in the user's main() function, and is set running.
#include "FWCore/Utilities/interface/get_underlying_safe.h"
#include "FWCore/Utilities/interface/propagate_const.h"

#include "oneapi/tbb/task_group.h"

#include <atomic>
#include <map>
#include <memory>
Expand All @@ -46,6 +48,7 @@ configured in the user's main() function, and is set running.

namespace edm {

class ExceptionCollector;
class ExceptionToActionTable;
class BranchIDListHelper;
class MergeableRunProductMetadata;
Expand Down Expand Up @@ -120,11 +123,15 @@ namespace edm {
*/
void beginJob();

void beginStream();

/**This should be called before the EventProcessor is destroyed.
Throws if any module's endJob throws an exception.
*/
void endJob();

void endStream(ExceptionCollector&) noexcept;

// -------------

// Same as runToCompletion(false) but since it was used extensively
Expand Down Expand Up @@ -351,6 +358,8 @@ namespace edm {
std::shared_ptr<std::recursive_mutex> sourceMutex_;
PrincipalCache principalCache_;
bool beginJobCalled_;
bool beginJobStartedModules_ = false;
bool beginJobSucceeded_ = false;
bool shouldWeStop_;
bool fileModeNoMerge_;
std::string exceptionMessageFiles_;
Expand Down
12 changes: 9 additions & 3 deletions FWCore/Framework/interface/GlobalSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/ServiceRegistry/interface/GlobalContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
Expand All @@ -38,9 +39,7 @@

namespace edm {

class ActivityRegistry;
class ExceptionCollector;
class ProcessContext;
class PreallocationConfiguration;
class ModuleRegistry;
class TriggerResultInserter;
Expand Down Expand Up @@ -76,7 +75,9 @@ namespace edm {

void beginJob(ProductRegistry const&,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
ProcessBlockHelperBase const&,
PathsAndConsumesOfModulesBase const&,
ProcessContext const&);
void endJob(ExceptionCollector& collector);

/// Return a vector allowing const access to all the
Expand Down Expand Up @@ -119,6 +120,9 @@ namespace edm {
std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
ProcessContext const* processContext_;
unsigned int numberOfConcurrentLumis_;
unsigned int numberOfConcurrentRuns_;
unsigned int numberOfConcurrentProcessBlocks_ = 1;
unsigned int numberOfConcurrentJobs_ = 1;
};

template <typename T>
Expand Down Expand Up @@ -155,6 +159,8 @@ namespace edm {
unsigned int managerIndex = principal.index();
if constexpr (T::branchType_ == InRun) {
managerIndex += numberOfConcurrentLumis_;
} else if constexpr (T::branchType_ == InProcess) {
managerIndex += (numberOfConcurrentLumis_ + numberOfConcurrentRuns_);
}
WorkerManager& workerManager = workerManagers_[managerIndex];
workerManager.resetAll();
Expand Down
12 changes: 7 additions & 5 deletions FWCore/Framework/interface/Schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
#include "FWCore/Utilities/interface/ConvertException.h"
Expand All @@ -85,6 +86,7 @@
#include <array>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
Expand All @@ -100,13 +102,11 @@ namespace edm {
class ESRecordsToProductResolverIndices;
}

class ActivityRegistry;
class BranchIDListHelper;
class EventTransitionInfo;
class ExceptionCollector;
class MergeableRunProductMetadata;
class OutputModuleCommunicator;
class ProcessContext;
class ProductRegistry;
class PreallocationConfiguration;
class StreamSchedule;
Expand Down Expand Up @@ -171,11 +171,13 @@ namespace edm {

void beginJob(ProductRegistry const&,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
ProcessBlockHelperBase const&,
PathsAndConsumesOfModulesBase const&,
ProcessContext const&);
void endJob(ExceptionCollector& collector);

void beginStream(unsigned int);
void endStream(unsigned int);
void beginStream(unsigned int streamID);
void endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;

// Write the luminosity block
void writeLumiAsync(WaitingTaskHolder iTask,
Expand Down
38 changes: 18 additions & 20 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
Expand All @@ -88,8 +90,10 @@
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
Expand All @@ -111,8 +115,6 @@ namespace edm {
class PathStatusInserter;
class EndPathStatusInserter;
class PreallocationConfiguration;
class WaitingTaskHolder;

class ConditionalTaskHelper;

namespace service {
Expand All @@ -123,7 +125,6 @@ namespace edm {
public:
typedef std::vector<std::string> vstring;
typedef std::vector<Path> TrigPaths;
typedef std::vector<Path> NonTrigPaths;
typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
typedef std::shared_ptr<Worker> WorkerPtr;
Expand Down Expand Up @@ -162,7 +163,7 @@ namespace edm {
bool cleaningUpAfterException = false);

void beginStream();
void endStream();
void endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;

StreamID streamID() const { return streamID_; }

Expand Down Expand Up @@ -306,12 +307,9 @@ namespace edm {
void preScheduleSignal(StreamContext const*) const;

template <typename T>
void postScheduleSignal(StreamContext const*, ServiceWeakToken const&, std::exception_ptr&) const noexcept;
void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;

void handleException(StreamContext const&,
ServiceWeakToken const&,
bool cleaningUpAfterException,
std::exception_ptr&) const noexcept;
void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;

WorkerManager workerManagerBeginEnd_;
WorkerManager workerManagerRuns_;
Expand Down Expand Up @@ -370,11 +368,15 @@ namespace edm {
auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(&streamContext_, weakToken, excpt);
{
ServiceRegistry::Operate op(weakToken.lock());

if (iPtr) {
excpt = *iPtr;
handleException(streamContext_, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(&streamContext_, excpt);
} // release service token before calling doneWaiting
iHolder.doneWaiting(excpt);
});

Expand All @@ -391,7 +393,7 @@ namespace edm {
preScheduleSignal<T>(&streamContext_);
workerManager->resetAll();
} catch (...) {
h.doneWaiting(std::current_exception());
h.presetTaskAsFailed(std::current_exception());
return;
}

Expand Down Expand Up @@ -430,13 +432,9 @@ namespace edm {

template <typename T>
void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
ServiceWeakToken const& weakToken,
std::exception_ptr& excpt) const noexcept {
try {
convertException::wrap([this, &weakToken, streamContext]() {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), streamContext);
});
convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
} catch (cms::Exception& ex) {
if (not excpt) {
std::ostringstream ost;
Expand Down
11 changes: 7 additions & 4 deletions FWCore/Framework/interface/SubProcess.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
Expand All @@ -23,6 +24,7 @@

#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <vector>

Expand All @@ -32,6 +34,7 @@ namespace edm {
class BranchIDListHelper;
class EventPrincipal;
class EventSetupImpl;
class ExceptionCollector;
class HistoryAppender;
class LuminosityBlockPrincipal;
class LumiTransitionInfo;
Expand Down Expand Up @@ -86,7 +89,7 @@ namespace edm {
std::vector<ModuleProcessName> keepOnlyConsumedUnscheduledModules(bool deleteModules);

void doBeginJob();
void doEndJob();
void doEndJob(ExceptionCollector&);

void doEventAsync(WaitingTaskHolder iHolder,
EventPrincipal const& principal,
Expand All @@ -113,8 +116,8 @@ namespace edm {
LumiTransitionInfo const& iTransitionInfo,
bool cleaningUpAfterException);

void doBeginStream(unsigned int);
void doEndStream(unsigned int);
void doBeginStream(unsigned int streamID);
void doEndStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const&);

void doStreamEndRunAsync(WaitingTaskHolder iHolder,
Expand Down Expand Up @@ -238,7 +241,7 @@ namespace edm {

private:
void beginJob();
void endJob();
void endJob(ExceptionCollector&);
void processAsync(WaitingTaskHolder iHolder,
EventPrincipal const& e,
std::vector<std::shared_ptr<const EventSetupImpl>> const*);
Expand Down
12 changes: 7 additions & 5 deletions FWCore/Framework/interface/WorkerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/UnscheduledCallProducer.h"
#include "FWCore/Framework/interface/WorkerRegistry.h"
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
Expand Down Expand Up @@ -70,12 +72,12 @@ namespace edm {

void beginJob(ProductRegistry const& iRegistry,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
void endJob();
void endJob(ExceptionCollector& collector);
ProcessBlockHelperBase const&,
GlobalContext const&);
void endJob(ExceptionCollector&, GlobalContext const&);

void beginStream(StreamID iID, StreamContext& streamContext);
void endStream(StreamID iID, StreamContext& streamContext);
void beginStream(StreamID, StreamContext const&);
void endStream(StreamID, StreamContext const&, ExceptionCollector&, std::mutex& collectorMutex) noexcept;

AllWorkers const& allWorkers() const { return allWorkers_; }
AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
Expand Down
Loading

0 comments on commit 0a92808

Please sign in to comment.