31#include <boost/timer/timer.hpp>
43 const Size nThreads,
const QuantLib::Date& today,
const QuantLib::ext::shared_ptr<ore::data::DateGrid>& dateGrid,
44 const Size nSamples,
const QuantLib::ext::shared_ptr<ore::data::Loader>& loader,
45 const QuantLib::ext::shared_ptr<ore::analytics::ScenarioGenerator>& scenarioGenerator,
46 const QuantLib::ext::shared_ptr<ore::data::EngineData>& engineData,
47 const QuantLib::ext::shared_ptr<ore::data::CurveConfigurations>& curveConfigs,
48 const QuantLib::ext::shared_ptr<ore::data::TodaysMarketParameters>& todaysMarketParams,
const std::string& configuration,
49 const QuantLib::ext::shared_ptr<ore::analytics::ScenarioSimMarketParameters>& simMarketData,
50 const bool useSpreadedTermStructures,
const bool cacheSimData,
51 const QuantLib::ext::shared_ptr<ore::analytics::ScenarioFilter>& scenarioFilter,
52 const QuantLib::ext::shared_ptr<ore::data::ReferenceDataManager>& referenceData,
54 const bool handlePseudoCurrenciesSimMarket,
const bool recalibrateModels,
55 const std::function<QuantLib::ext::shared_ptr<ore::analytics::NPVCube>(
const QuantLib::Date&,
const std::set<std::string>&,
56 const std::vector<QuantLib::Date>&,
57 const QuantLib::Size)>& cubeFactory,
58 const std::function<QuantLib::ext::shared_ptr<ore::analytics::NPVCube>(
59 const QuantLib::Date&,
const std::vector<QuantLib::Date>&,
const QuantLib::Size)>& nettingSetCubeFactory,
60 const std::function<QuantLib::ext::shared_ptr<ore::analytics::NPVCube>(
const QuantLib::Date&,
const std::set<std::string>&,
61 const std::vector<QuantLib::Date>&,
62 const QuantLib::Size)>& cptyCubeFactory,
63 const std::string& context,
const QuantLib::ext::shared_ptr<ore::analytics::Scenario>& offSetScenario)
64 : nThreads_(nThreads), today_(today), dateGrid_(dateGrid), nSamples_(nSamples), loader_(loader),
65 scenarioGenerator_(scenarioGenerator), engineData_(engineData), curveConfigs_(
curveConfigs),
66 todaysMarketParams_(todaysMarketParams), configuration_(configuration), simMarketData_(simMarketData),
67 useSpreadedTermStructures_(useSpreadedTermStructures), cacheSimData_(cacheSimData),
68 scenarioFilter_(scenarioFilter), referenceData_(referenceData), iborFallbackConfig_(iborFallbackConfig),
69 handlePseudoCurrenciesTodaysMarket_(handlePseudoCurrenciesTodaysMarket),
70 handlePseudoCurrenciesSimMarket_(handlePseudoCurrenciesSimMarket), recalibrateModels_(recalibrateModels),
71 cubeFactory_(cubeFactory), nettingSetCubeFactory_(nettingSetCubeFactory), cptyCubeFactory_(cptyCubeFactory),
72 context_(context), offsetScenario_(offSetScenario){
74 QL_REQUIRE(
nThreads_ != 0,
"MultiThreadedValuationEngine: nThreads must be > 0");
78#ifndef QL_ENABLE_SESSIONS
79 QL_FAIL(
"MultiThreadedValuationEngine requires a build with QL_ENABLE_SESSIONS = ON.");
85 cubeFactory_ = [](
const QuantLib::Date&
asof,
const std::set<std::string>& ids,
86 const std::vector<QuantLib::Date>& dates,
const Size samples) {
87 return QuantLib::ext::make_shared<ore::analytics::DoublePrecisionInMemoryCube>(
asof, ids, dates, samples);
92 const Size samples) {
return nullptr; };
96 const std::vector<QuantLib::Date>& dates,
const Size samples) {
return nullptr; };
100 const QuantLib::ext::shared_ptr<AggregationScenarioData>& aggregationScenarioData) {
105 const QuantLib::ext::shared_ptr<ore::data::Portfolio>& portfolio,
106 const std::function<std::vector<QuantLib::ext::shared_ptr<ore::analytics::ValuationCalculator>>()>& calculators,
107 const std::function<std::vector<QuantLib::ext::shared_ptr<ore::analytics::CounterpartyCalculator>>()>& cptyCalculators,
108 bool mporStickyDate,
bool dryRun) {
110 boost::timer::cpu_timer timer;
112 LOG(
"MultiThreadedValuationEngine::buildCube() was called");
116 LOG(
"Extract pricing stats and clear them in the current portfolio");
118 std::map<std::string, std::pair<std::size_t, boost::timer::nanosecond_type>> pricingStats;
119 for (
auto const& [tid, t] : portfolio->trades())
120 pricingStats[tid] = std::make_pair(t->getNumberOfPricings(), t->getCumulativePricingTime());
124 LOG(
"Reset and build portfolio against init market to produce pricing stats from a single pricing. Using pricing "
128 QuantLib::ext::shared_ptr<ore::data::Market> initMarket = QuantLib::ext::make_shared<ore::data::TodaysMarket>(
132 auto engineFactory = QuantLib::ext::make_shared<ore::data::EngineFactory>(
137 portfolio->build(engineFactory,
context_,
true);
139 for (
auto const& [tid, t] : portfolio->trades()) {
140 TLOG(
"got npv for " << tid <<
": " << std::setprecision(12) << t->instrument()->NPV() <<
" "
141 << t->npvCurrency());
146 Size eff_nThreads = std::min(portfolio->size(),
nThreads_);
148 LOG(
"Splitting portfolio.");
150 LOG(
"portfolio size = " << portfolio->size());
152 LOG(
"eff nThreads = " << eff_nThreads);
154 QL_REQUIRE(eff_nThreads > 0,
"effective threads are zero, this is not allowed.");
156 std::vector<QuantLib::ext::shared_ptr<ore::data::Portfolio>> portfolios;
157 for (Size i = 0; i < eff_nThreads; ++i)
158 portfolios.push_back(QuantLib::ext::make_shared<ore::data::Portfolio>());
160 double totalAvgPricingTime = 0.0;
161 std::vector<std::pair<std::string, double>> timings;
162 for (
auto const& [tid, t] : portfolio->trades()) {
163 if (t->getNumberOfPricings() != 0) {
164 double dt = t->getCumulativePricingTime() /
static_cast<double>(t->getNumberOfPricings());
165 timings.push_back(std::make_pair(tid, dt));
166 totalAvgPricingTime += dt;
169 timings.push_back(std::make_pair(tid, 0.0));
173 std::sort(timings.begin(), timings.end(),
174 [](
const std::pair<std::string, double>& p1,
const std::pair<std::string, double> p2) {
175 if (p1.second == p2.second)
176 return p1.first < p2.first;
178 return p1.second > p2.second;
181 std::vector<double> portfolioTotalAvgPricingTime(portfolios.size());
182 Size portfolioIndex = 0;
183 for (
auto const& t : timings) {
184 portfolios[portfolioIndex]->add(portfolio->get(t.first));
185 portfolioTotalAvgPricingTime[portfolioIndex] += t.second;
186 if (++portfolioIndex >= eff_nThreads)
192 std::vector<std::string> portfoliosAsString;
193 for (
auto const& p : portfolios) {
194 portfoliosAsString.emplace_back(p->toXMLString());
199 LOG(
"Total avg pricing time : " << totalAvgPricingTime / 1E6 <<
" ms");
200 for (Size i = 0; i < eff_nThreads; ++i) {
201 LOG(
"Portfolio #" << i <<
" number of trades : " << portfolios[i]->
size());
202 LOG(
"Portfolio #" << i <<
" total avg pricing time : " << portfolioTotalAvgPricingTime[i] / 1E6 <<
" ms");
207 LOG(
"Cloning scenario generators for " << eff_nThreads <<
" threads...");
208 std::vector<QuantLib::ext::shared_ptr<ore::analytics::ScenarioGenerator>> scenarioGenerators;
211 scenarioGenerators.push_back(tmp);
212 DLOG(
"generator for thread 1 cloned.");
213 for (Size i = 1; i < eff_nThreads; ++i) {
214 scenarioGenerators.push_back(QuantLib::ext::make_shared<ore::analytics::ClonedScenarioGenerator>(*tmp));
215 DLOG(
"generator for thread " << (i + 1) <<
" cloned.");
220 LOG(
"Cloning loaders for " << eff_nThreads <<
" threads...");
221 std::vector<QuantLib::ext::shared_ptr<ore::data::ClonedLoader>> loaders;
222 for (Size i = 0; i < eff_nThreads; ++i)
223 loaders.push_back(QuantLib::ext::make_shared<ore::data::ClonedLoader>(
today_,
loader_));
227 LOG(
"Build " << eff_nThreads <<
" mini result cubes...");
231 for (Size i = 0; i < eff_nThreads; ++i) {
240 auto progressIndicator =
241 QuantLib::ext::make_shared<ore::analytics::MultiThreadedProgressIndicator>(this->
progressIndicators());
250 using resultType = int;
251 std::vector<std::future<resultType>> results(eff_nThreads);
253 std::vector<std::thread> jobs;
256 std::vector<std::map<std::string, std::pair<std::size_t, boost::timer::nanosecond_type>>> workerPricingStats(
262 for (Size i = 0; i < eff_nThreads; ++i) {
264 auto job = [
this, obsMode, dryRun, &calculators, &cptyCalculators, mporStickyDate, &portfoliosAsString,
265 &scenarioGenerators, &loaders, &workerPricingStats, &progressIndicator](
int id) -> resultType {
268 QuantLib::Settings::instance().evaluationDate() =
today_;
269 ore::analytics::ObservationMode::instance().setMode(obsMode);
271 LOG(
"Start thread " <<
id);
279 QuantLib::ext::shared_ptr<ore::data::Market> initMarket = QuantLib::ext::make_shared<ore::data::TodaysMarket>(
285 QuantLib::ext::shared_ptr<ore::analytics::ScenarioSimMarket> simMarket =
286 QuantLib::ext::make_shared<ore::analytics::ScenarioSimMarket>(
298 simMarket->scenarioGenerator() = scenarioGenerators[id];
307 auto portfolio = QuantLib::ext::make_shared<ore::data::Portfolio>();
308 portfolio->fromXMLString(portfoliosAsString[
id]);
309 auto engineFactory = QuantLib::ext::make_shared<ore::data::EngineFactory>(
313 portfolio->build(engineFactory,
context_,
true);
317 auto valEngine = QuantLib::ext::make_shared<ore::analytics::ValuationEngine>(
320 : std::set<std::pair<std::string, QuantLib::ext::shared_ptr<QuantExt::ModelBuilder>>>());
321 valEngine->registerProgressIndicator(progressIndicator);
327 cptyCalculators ? cptyCalculators()
328 : std::vector<QuantLib::ext::shared_ptr<CounterpartyCalculator>>(),
333 for (
auto const& [tid, t] : portfolio->trades())
334 workerPricingStats[id][tid] =
335 std::make_pair(t->getNumberOfPricings(), t->getCumulativePricingTime());
339 LOG(
"Thread " <<
id <<
" successfully finished.");
343 }
catch (
const std::exception& e) {
359 std::packaged_task<resultType(
int)> task(job);
360 results[i] = task.get_future();
361 std::thread thread(std::move(task), i);
362 jobs.emplace_back(std::move(thread));
371 for (Size i = 0; i < results.size(); ++i) {
375 for (Size i = 0; i < results.size(); ++i) {
376 QL_REQUIRE(results[i].valid(),
"internal error: did not get a valid result");
377 int rc = results[i].get();
378 QL_REQUIRE(rc == 0,
"error: thread " << i <<
" exited with return code " << rc
379 <<
". Check for structured errors from 'MultiThreaded Valuation Engine'.");
389 LOG(
"Update pricing stats of trades.");
391 for (
auto const& [tid, t] : portfolio->trades()) {
392 auto p = pricingStats[tid];
393 std::size_t n = p.first;
394 boost::timer::nanosecond_type d = p.second;
395 for (
auto const& w : workerPricingStats) {
396 auto p = w.find(tid);
398 n += p->second.first;
399 d += p->second.second;
402 t->resetPricingStats(n, d);
407 LOG(
"MultiThreadedValuationEngine::buildCube() successfully finished, timings: "
408 <<
static_cast<double>(timer.elapsed().wall) / 1.0E9 <<
"s Wall, "
409 <<
static_cast<double>(timer.elapsed().user) / 1.0E9 <<
"s User, "
410 <<
static_cast<double>(timer.elapsed().system) / 1.0E9 <<
"s System.");
bool handlePseudoCurrenciesSimMarket_
QuantLib::ext::shared_ptr< ore::data::CurveConfigurations > curveConfigs_
std::string configuration_
QuantLib::ext::shared_ptr< ore::data::TodaysMarketParameters > todaysMarketParams_
QuantLib::ext::shared_ptr< ore::analytics::Scenario > offsetScenario_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniNettingSetCubes_
std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> nettingSetCubeFactory_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniCubes_
bool useSpreadedTermStructures_
QuantLib::ext::shared_ptr< ore::analytics::ScenarioSimMarketParameters > simMarketData_
void setAggregationScenarioData(const QuantLib::ext::shared_ptr< AggregationScenarioData > &aggregationScenarioData)
QuantLib::ext::shared_ptr< ore::data::DateGrid > dateGrid_
QuantLib::ext::shared_ptr< AggregationScenarioData > aggregationScenarioData_
QuantLib::ext::shared_ptr< ore::data::ReferenceDataManager > referenceData_
QuantLib::ext::shared_ptr< ore::analytics::ScenarioFilter > scenarioFilter_
std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> cptyCubeFactory_
bool handlePseudoCurrenciesTodaysMarket_
ore::data::IborFallbackConfig iborFallbackConfig_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniCptyCubes_
QuantLib::ext::shared_ptr< ore::data::Loader > loader_
void buildCube(const QuantLib::ext::shared_ptr< ore::data::Portfolio > &portfolio, const std::function< std::vector< QuantLib::ext::shared_ptr< ore::analytics::ValuationCalculator > >()> &calculators, const std::function< std::vector< QuantLib::ext::shared_ptr< ore::analytics::CounterpartyCalculator > >()> &cptyCalculators={}, bool mporStickyDate=true, bool dryRun=false)
std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> cubeFactory_
MultiThreadedValuationEngine(const QuantLib::Size nThreads, const QuantLib::Date &today, const QuantLib::ext::shared_ptr< ore::analytics::DateGrid > &dateGrid, const QuantLib::Size nSamples, const QuantLib::ext::shared_ptr< ore::data::Loader > &loader, const QuantLib::ext::shared_ptr< ore::analytics::ScenarioGenerator > &scenarioGenerator, const QuantLib::ext::shared_ptr< ore::data::EngineData > &engineData, const QuantLib::ext::shared_ptr< ore::data::CurveConfigurations > &curveConfigs, const QuantLib::ext::shared_ptr< ore::data::TodaysMarketParameters > &todaysMarketParams, const std::string &configuration, const QuantLib::ext::shared_ptr< ore::analytics::ScenarioSimMarketParameters > &simMarketData, const bool useSpreadedTermStructures=false, const bool cacheSimData=false, const QuantLib::ext::shared_ptr< ore::analytics::ScenarioFilter > &scenarioFilter=QuantLib::ext::make_shared< ore::analytics::ScenarioFilter >(), const QuantLib::ext::shared_ptr< ore::data::ReferenceDataManager > &referenceData=nullptr, const ore::data::IborFallbackConfig &iborFallbackConfig=ore::data::IborFallbackConfig::defaultConfig(), const bool handlePseudoCurrenciesTodaysMarket=true, const bool handlePseudoCurrenciesSimMarket=true, const bool recalibrateModels=true, const std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> &cubeFactory={}, const std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> &nettingSetCubeFactory={}, const std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> &cptyCubeFactory={}, const std::string &context="unspecified", const QuantLib::ext::shared_ptr< ore::analytics::Scenario > &offSetScenario=nullptr)
QuantLib::ext::shared_ptr< ore::analytics::ScenarioGenerator > scenarioGenerator_
QuantLib::ext::shared_ptr< ore::data::EngineData > engineData_
const std::set< QuantLib::ext::shared_ptr< ProgressIndicator > > & progressIndicators() const
A cube implementation that stores the cube in memory.
multi-threaded valuation engine
Size size(const ValueType &v)
Singleton class to hold global Observation Mode.
Structured analytics error.
vector< string > curveConfigs