Logo
Fully annotated reference manual - version 1.8.12
Loading...
Searching...
No Matches
multithreadedvaluationengine.cpp
Go to the documentation of this file.
1/*
2 Copyright (C) 2022 Quaternion Risk Management Ltd
3 All rights reserved.
4
5 This file is part of ORE, a free-software/open-source library
6 for transparent pricing and risk analysis - http://opensourcerisk.org
7
8 ORE is free software: you can redistribute it and/or modify it
9 under the terms of the Modified BSD License. You should have received a
10 copy of the license along with this program.
11 The license is also available online at <http://opensourcerisk.org>
12
13 This program is distributed on the basis that it will form a useful
14 contribution to risk analytics and model standardisation, but WITHOUT
15 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
16 FITNESS FOR A PARTICULAR PURPOSE. See the license for more details.
17*/
18
24
30
31#include <boost/timer/timer.hpp>
32
33#include <future>
34
35// #include <ctpl_stl.h>
36
37namespace ore {
38namespace analytics {
39
40using QuantLib::Size;
41
43 const Size nThreads, const QuantLib::Date& today, const QuantLib::ext::shared_ptr<ore::data::DateGrid>& dateGrid,
44 const Size nSamples, const QuantLib::ext::shared_ptr<ore::data::Loader>& loader,
45 const QuantLib::ext::shared_ptr<ore::analytics::ScenarioGenerator>& scenarioGenerator,
46 const QuantLib::ext::shared_ptr<ore::data::EngineData>& engineData,
47 const QuantLib::ext::shared_ptr<ore::data::CurveConfigurations>& curveConfigs,
48 const QuantLib::ext::shared_ptr<ore::data::TodaysMarketParameters>& todaysMarketParams, const std::string& configuration,
49 const QuantLib::ext::shared_ptr<ore::analytics::ScenarioSimMarketParameters>& simMarketData,
50 const bool useSpreadedTermStructures, const bool cacheSimData,
51 const QuantLib::ext::shared_ptr<ore::analytics::ScenarioFilter>& scenarioFilter,
52 const QuantLib::ext::shared_ptr<ore::data::ReferenceDataManager>& referenceData,
53 const ore::data::IborFallbackConfig& iborFallbackConfig, const bool handlePseudoCurrenciesTodaysMarket,
54 const bool handlePseudoCurrenciesSimMarket, const bool recalibrateModels,
55 const std::function<QuantLib::ext::shared_ptr<ore::analytics::NPVCube>(const QuantLib::Date&, const std::set<std::string>&,
56 const std::vector<QuantLib::Date>&,
57 const QuantLib::Size)>& cubeFactory,
58 const std::function<QuantLib::ext::shared_ptr<ore::analytics::NPVCube>(
59 const QuantLib::Date&, const std::vector<QuantLib::Date>&, const QuantLib::Size)>& nettingSetCubeFactory,
60 const std::function<QuantLib::ext::shared_ptr<ore::analytics::NPVCube>(const QuantLib::Date&, const std::set<std::string>&,
61 const std::vector<QuantLib::Date>&,
62 const QuantLib::Size)>& cptyCubeFactory,
63 const std::string& context, const QuantLib::ext::shared_ptr<ore::analytics::Scenario>& offSetScenario)
64 : nThreads_(nThreads), today_(today), dateGrid_(dateGrid), nSamples_(nSamples), loader_(loader),
65 scenarioGenerator_(scenarioGenerator), engineData_(engineData), curveConfigs_(curveConfigs),
66 todaysMarketParams_(todaysMarketParams), configuration_(configuration), simMarketData_(simMarketData),
67 useSpreadedTermStructures_(useSpreadedTermStructures), cacheSimData_(cacheSimData),
68 scenarioFilter_(scenarioFilter), referenceData_(referenceData), iborFallbackConfig_(iborFallbackConfig),
69 handlePseudoCurrenciesTodaysMarket_(handlePseudoCurrenciesTodaysMarket),
70 handlePseudoCurrenciesSimMarket_(handlePseudoCurrenciesSimMarket), recalibrateModels_(recalibrateModels),
71 cubeFactory_(cubeFactory), nettingSetCubeFactory_(nettingSetCubeFactory), cptyCubeFactory_(cptyCubeFactory),
72 context_(context), offsetScenario_(offSetScenario){
73
74 QL_REQUIRE(nThreads_ != 0, "MultiThreadedValuationEngine: nThreads must be > 0");
75
76 // check whether sessions are enabled, if not exit with an error
77
78#ifndef QL_ENABLE_SESSIONS
79 QL_FAIL("MultiThreadedValuationEngine requires a build with QL_ENABLE_SESSIONS = ON.");
80#endif
81
82 // if no cube factory is given, create a default one
83
84 if (!cubeFactory_)
85 cubeFactory_ = [](const QuantLib::Date& asof, const std::set<std::string>& ids,
86 const std::vector<QuantLib::Date>& dates, const Size samples) {
87 return QuantLib::ext::make_shared<ore::analytics::DoublePrecisionInMemoryCube>(asof, ids, dates, samples);
88 };
89
91 nettingSetCubeFactory_ = [](const QuantLib::Date& asof, const std::vector<QuantLib::Date>& dates,
92 const Size samples) { return nullptr; };
93
95 cptyCubeFactory_ = [](const QuantLib::Date& asof, const std::set<std::string>& ids,
96 const std::vector<QuantLib::Date>& dates, const Size samples) { return nullptr; };
97}
98
100 const QuantLib::ext::shared_ptr<AggregationScenarioData>& aggregationScenarioData) {
101 aggregationScenarioData_ = aggregationScenarioData;
102}
103
105 const QuantLib::ext::shared_ptr<ore::data::Portfolio>& portfolio,
106 const std::function<std::vector<QuantLib::ext::shared_ptr<ore::analytics::ValuationCalculator>>()>& calculators,
107 const std::function<std::vector<QuantLib::ext::shared_ptr<ore::analytics::CounterpartyCalculator>>()>& cptyCalculators,
108 bool mporStickyDate, bool dryRun) {
109
110 boost::timer::cpu_timer timer;
111
112 LOG("MultiThreadedValuationEngine::buildCube() was called");
113
114 // extract pricing stats accumulated so far and clear them
115
116 LOG("Extract pricing stats and clear them in the current portfolio");
117
118 std::map<std::string, std::pair<std::size_t, boost::timer::nanosecond_type>> pricingStats;
119 for (auto const& [tid, t] : portfolio->trades())
120 pricingStats[tid] = std::make_pair(t->getNumberOfPricings(), t->getCumulativePricingTime());
121
122 // build portfolio against init market and trigger single pricing to generate pricing stats
123
124 LOG("Reset and build portfolio against init market to produce pricing stats from a single pricing. Using pricing "
125 "configuration '"
126 << configuration_ << "'.");
127
128 QuantLib::ext::shared_ptr<ore::data::Market> initMarket = QuantLib::ext::make_shared<ore::data::TodaysMarket>(
131
132 auto engineFactory = QuantLib::ext::make_shared<ore::data::EngineFactory>(
133 engineData_, initMarket,
134 std::map<ore::data::MarketContext, string>{{ore::data::MarketContext::pricing, configuration_}}, referenceData_,
136
137 portfolio->build(engineFactory, context_, true);
138
139 for (auto const& [tid, t] : portfolio->trades()) {
140 TLOG("got npv for " << tid << ": " << std::setprecision(12) << t->instrument()->NPV() << " "
141 << t->npvCurrency());
142 }
143
144 // split portfolio into nThreads parts such that each part has an approximately similar total avg pricing time
145
146 Size eff_nThreads = std::min(portfolio->size(), nThreads_);
147
148 LOG("Splitting portfolio.");
149
150 LOG("portfolio size = " << portfolio->size());
151 LOG("nThreads = " << nThreads_);
152 LOG("eff nThreads = " << eff_nThreads);
153
154 QL_REQUIRE(eff_nThreads > 0, "effective threads are zero, this is not allowed.");
155
156 std::vector<QuantLib::ext::shared_ptr<ore::data::Portfolio>> portfolios;
157 for (Size i = 0; i < eff_nThreads; ++i)
158 portfolios.push_back(QuantLib::ext::make_shared<ore::data::Portfolio>());
159
160 double totalAvgPricingTime = 0.0;
161 std::vector<std::pair<std::string, double>> timings;
162 for (auto const& [tid, t] : portfolio->trades()) {
163 if (t->getNumberOfPricings() != 0) {
164 double dt = t->getCumulativePricingTime() / static_cast<double>(t->getNumberOfPricings());
165 timings.push_back(std::make_pair(tid, dt));
166 totalAvgPricingTime += dt;
167 } else {
168 // trade might be a failed trade
169 timings.push_back(std::make_pair(tid, 0.0));
170 }
171 }
172
173 std::sort(timings.begin(), timings.end(),
174 [](const std::pair<std::string, double>& p1, const std::pair<std::string, double> p2) {
175 if (p1.second == p2.second)
176 return p1.first < p2.first;
177 else
178 return p1.second > p2.second;
179 });
180
181 std::vector<double> portfolioTotalAvgPricingTime(portfolios.size());
182 Size portfolioIndex = 0;
183 for (auto const& t : timings) {
184 portfolios[portfolioIndex]->add(portfolio->get(t.first));
185 portfolioTotalAvgPricingTime[portfolioIndex] += t.second;
186 if (++portfolioIndex >= eff_nThreads)
187 portfolioIndex = 0;
188 }
189
190 // output the portfolios into strings so that the worker threads can load them from there
191
192 std::vector<std::string> portfoliosAsString;
193 for (auto const& p : portfolios) {
194 portfoliosAsString.emplace_back(p->toXMLString());
195 }
196
197 // log info on the portfolio split
198
199 LOG("Total avg pricing time : " << totalAvgPricingTime / 1E6 << " ms");
200 for (Size i = 0; i < eff_nThreads; ++i) {
201 LOG("Portfolio #" << i << " number of trades : " << portfolios[i]->size());
202 LOG("Portfolio #" << i << " total avg pricing time : " << portfolioTotalAvgPricingTime[i] / 1E6 << " ms");
203 }
204
205 // build scenario generators for each thread as clones of the original one
206
207 LOG("Cloning scenario generators for " << eff_nThreads << " threads...");
208 std::vector<QuantLib::ext::shared_ptr<ore::analytics::ScenarioGenerator>> scenarioGenerators;
209 auto tmp =
210 QuantLib::ext::make_shared<ore::analytics::ClonedScenarioGenerator>(scenarioGenerator_, dateGrid_->dates(), nSamples_);
211 scenarioGenerators.push_back(tmp);
212 DLOG("generator for thread 1 cloned.");
213 for (Size i = 1; i < eff_nThreads; ++i) {
214 scenarioGenerators.push_back(QuantLib::ext::make_shared<ore::analytics::ClonedScenarioGenerator>(*tmp));
215 DLOG("generator for thread " << (i + 1) << " cloned.");
216 }
217
218 // build loaders for each thread as clones of the original one
219
220 LOG("Cloning loaders for " << eff_nThreads << " threads...");
221 std::vector<QuantLib::ext::shared_ptr<ore::data::ClonedLoader>> loaders;
222 for (Size i = 0; i < eff_nThreads; ++i)
223 loaders.push_back(QuantLib::ext::make_shared<ore::data::ClonedLoader>(today_, loader_));
224
225 // build nThreads mini-cubes to which each thread writes its results
226
227 LOG("Build " << eff_nThreads << " mini result cubes...");
228 miniCubes_.clear();
229 miniNettingSetCubes_.clear();
230 miniCptyCubes_.clear();
231 for (Size i = 0; i < eff_nThreads; ++i) {
232 miniCubes_.push_back(cubeFactory_(today_, portfolios[i]->ids(), dateGrid_->valuationDates(), nSamples_));
234 miniCptyCubes_.push_back(
235 cptyCubeFactory_(today_, portfolios[i]->counterparties(), dateGrid_->valuationDates(), nSamples_));
236 }
237
238 // build progress indicator consolidating the results from the threads
239
240 auto progressIndicator =
241 QuantLib::ext::make_shared<ore::analytics::MultiThreadedProgressIndicator>(this->progressIndicators());
242
243 // create the thread pool with eff_nThreads and queue size = eff_nThreads as well
244
245 // LOG("Create thread pool with " << eff_nThreads);
246 // ctpl::thread_pool threadPool(eff_nThreads);
247
248 // create the jobs and push them to the pool
249
250 using resultType = int;
251 std::vector<std::future<resultType>> results(eff_nThreads);
252
253 std::vector<std::thread> jobs; // not needed if thread pool is used
254
255 // pricing stats accumulated in worker threads
256 std::vector<std::map<std::string, std::pair<std::size_t, boost::timer::nanosecond_type>>> workerPricingStats(
257 eff_nThreads);
258
259 // get obs mode of main thread, so that we can set this mode in the worker threads below
260 ore::analytics::ObservationMode::Mode obsMode = ore::analytics::ObservationMode::instance().mode();
261
262 for (Size i = 0; i < eff_nThreads; ++i) {
263
264 auto job = [this, obsMode, dryRun, &calculators, &cptyCalculators, mporStickyDate, &portfoliosAsString,
265 &scenarioGenerators, &loaders, &workerPricingStats, &progressIndicator](int id) -> resultType {
266 // set thread local singletons
267
268 QuantLib::Settings::instance().evaluationDate() = today_;
269 ore::analytics::ObservationMode::instance().setMode(obsMode);
270
271 LOG("Start thread " << id);
272
273 int rc;
274
275 try {
276
277 // build todays market using cloned market data
278
279 QuantLib::ext::shared_ptr<ore::data::Market> initMarket = QuantLib::ext::make_shared<ore::data::TodaysMarket>(
280 today_, todaysMarketParams_, loaders[id], curveConfigs_, true, true, true, referenceData_, false,
282
283 // build sim market
284
285 QuantLib::ext::shared_ptr<ore::analytics::ScenarioSimMarket> simMarket =
286 QuantLib::ext::make_shared<ore::analytics::ScenarioSimMarket>(
290
291 // set aggregation scenario data, but only in one of the sim markets, that's sufficient to populate it
292
293 if (id == 0 && aggregationScenarioData_ != nullptr)
294 simMarket->aggregationScenarioData() = aggregationScenarioData_;
295
296 // link scenario generator to sim market
297
298 simMarket->scenarioGenerator() = scenarioGenerators[id];
299
300 // set scenario filter
301
302 if (scenarioFilter_)
303 simMarket->filter() = scenarioFilter_;
304
305 // build portfolio against sim market
306
307 auto portfolio = QuantLib::ext::make_shared<ore::data::Portfolio>();
308 portfolio->fromXMLString(portfoliosAsString[id]);
309 auto engineFactory = QuantLib::ext::make_shared<ore::data::EngineFactory>(
310 engineData_, simMarket, std::map<ore::data::MarketContext, string>(), referenceData_,
312
313 portfolio->build(engineFactory, context_, true);
314
315 // build valuation engine
316
317 auto valEngine = QuantLib::ext::make_shared<ore::analytics::ValuationEngine>(
318 today_, dateGrid_, simMarket,
319 recalibrateModels_ ? engineFactory->modelBuilders()
320 : std::set<std::pair<std::string, QuantLib::ext::shared_ptr<QuantExt::ModelBuilder>>>());
321 valEngine->registerProgressIndicator(progressIndicator);
322
323 // build mini-cube
324
325 valEngine->buildCube(portfolio, miniCubes_[id], calculators(), mporStickyDate, miniNettingSetCubes_[id],
326 miniCptyCubes_[id],
327 cptyCalculators ? cptyCalculators()
328 : std::vector<QuantLib::ext::shared_ptr<CounterpartyCalculator>>(),
329 dryRun);
330
331 // set pricing stats for val engine run
332
333 for (auto const& [tid, t] : portfolio->trades())
334 workerPricingStats[id][tid] =
335 std::make_pair(t->getNumberOfPricings(), t->getCumulativePricingTime());
336
337 // return code 0 = ok
338
339 LOG("Thread " << id << " successfully finished.");
340
341 rc = 0;
342
343 } catch (const std::exception& e) {
344
345 // log error and return code 1 = not ok
346
347 ore::analytics::StructuredAnalyticsErrorMessage("Multithreaded Valuation Engine", "", e.what()).log();
348 rc = 1;
349 }
350
351 // exit
352
353 return rc;
354 };
355
356 // results[i] = threadPool.push(job);
357
358 // not needed if thread pool is used
359 std::packaged_task<resultType(int)> task(job);
360 results[i] = task.get_future();
361 std::thread thread(std::move(task), i);
362 jobs.emplace_back(std::move(thread));
363 }
364
365 // check return codes from jobs
366
367 // not needed if thread pool is used
368 for (auto& t : jobs)
369 t.join();
370
371 for (Size i = 0; i < results.size(); ++i) {
372 results[i].wait();
373 }
374
375 for (Size i = 0; i < results.size(); ++i) {
376 QL_REQUIRE(results[i].valid(), "internal error: did not get a valid result");
377 int rc = results[i].get();
378 QL_REQUIRE(rc == 0, "error: thread " << i << " exited with return code " << rc
379 << ". Check for structured errors from 'MultiThreaded Valuation Engine'.");
380 }
381
382 // stop the thread pool, wait for unfinished jobs
383
384 // LOG("Stop thread pool");
385 // threadPool.stop(true);
386
387 // set updated pricing stats in original portfolio
388
389 LOG("Update pricing stats of trades.");
390
391 for (auto const& [tid, t] : portfolio->trades()) {
392 auto p = pricingStats[tid];
393 std::size_t n = p.first;
394 boost::timer::nanosecond_type d = p.second;
395 for (auto const& w : workerPricingStats) {
396 auto p = w.find(tid);
397 if (p != w.end()) {
398 n += p->second.first;
399 d += p->second.second;
400 }
401 }
402 t->resetPricingStats(n, d);
403 }
404
405 // log timings and return the result mini-cubes
406
407 LOG("MultiThreadedValuationEngine::buildCube() successfully finished, timings: "
408 << static_cast<double>(timer.elapsed().wall) / 1.0E9 << "s Wall, "
409 << static_cast<double>(timer.elapsed().user) / 1.0E9 << "s User, "
410 << static_cast<double>(timer.elapsed().system) / 1.0E9 << "s System.");
411}
412
413} // namespace analytics
414} // namespace ore
QuantLib::ext::shared_ptr< ore::data::CurveConfigurations > curveConfigs_
QuantLib::ext::shared_ptr< ore::data::TodaysMarketParameters > todaysMarketParams_
QuantLib::ext::shared_ptr< ore::analytics::Scenario > offsetScenario_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniNettingSetCubes_
std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> nettingSetCubeFactory_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniCubes_
QuantLib::ext::shared_ptr< ore::analytics::ScenarioSimMarketParameters > simMarketData_
void setAggregationScenarioData(const QuantLib::ext::shared_ptr< AggregationScenarioData > &aggregationScenarioData)
QuantLib::ext::shared_ptr< ore::data::DateGrid > dateGrid_
QuantLib::ext::shared_ptr< AggregationScenarioData > aggregationScenarioData_
QuantLib::ext::shared_ptr< ore::data::ReferenceDataManager > referenceData_
QuantLib::ext::shared_ptr< ore::analytics::ScenarioFilter > scenarioFilter_
std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> cptyCubeFactory_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniCptyCubes_
QuantLib::ext::shared_ptr< ore::data::Loader > loader_
void buildCube(const QuantLib::ext::shared_ptr< ore::data::Portfolio > &portfolio, const std::function< std::vector< QuantLib::ext::shared_ptr< ore::analytics::ValuationCalculator > >()> &calculators, const std::function< std::vector< QuantLib::ext::shared_ptr< ore::analytics::CounterpartyCalculator > >()> &cptyCalculators={}, bool mporStickyDate=true, bool dryRun=false)
std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> cubeFactory_
MultiThreadedValuationEngine(const QuantLib::Size nThreads, const QuantLib::Date &today, const QuantLib::ext::shared_ptr< ore::analytics::DateGrid > &dateGrid, const QuantLib::Size nSamples, const QuantLib::ext::shared_ptr< ore::data::Loader > &loader, const QuantLib::ext::shared_ptr< ore::analytics::ScenarioGenerator > &scenarioGenerator, const QuantLib::ext::shared_ptr< ore::data::EngineData > &engineData, const QuantLib::ext::shared_ptr< ore::data::CurveConfigurations > &curveConfigs, const QuantLib::ext::shared_ptr< ore::data::TodaysMarketParameters > &todaysMarketParams, const std::string &configuration, const QuantLib::ext::shared_ptr< ore::analytics::ScenarioSimMarketParameters > &simMarketData, const bool useSpreadedTermStructures=false, const bool cacheSimData=false, const QuantLib::ext::shared_ptr< ore::analytics::ScenarioFilter > &scenarioFilter=QuantLib::ext::make_shared< ore::analytics::ScenarioFilter >(), const QuantLib::ext::shared_ptr< ore::data::ReferenceDataManager > &referenceData=nullptr, const ore::data::IborFallbackConfig &iborFallbackConfig=ore::data::IborFallbackConfig::defaultConfig(), const bool handlePseudoCurrenciesTodaysMarket=true, const bool handlePseudoCurrenciesSimMarket=true, const bool recalibrateModels=true, const std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> &cubeFactory={}, const std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> &nettingSetCubeFactory={}, const std::function< QuantLib::ext::shared_ptr< ore::analytics::NPVCube >(const QuantLib::Date &, const std::set< std::string > &, const std::vector< QuantLib::Date > &, const QuantLib::Size)> &cptyCubeFactory={}, const std::string &context="unspecified", const QuantLib::ext::shared_ptr< ore::analytics::Scenario > &offSetScenario=nullptr)
QuantLib::ext::shared_ptr< ore::analytics::ScenarioGenerator > scenarioGenerator_
QuantLib::ext::shared_ptr< ore::data::EngineData > engineData_
const std::set< QuantLib::ext::shared_ptr< ProgressIndicator > > & progressIndicators() const
Context & context_
A cube implementation that stores the cube in memory.
#define LOG(text)
#define DLOG(text)
#define TLOG(text)
multi-threaded valuation engine
Size size(const ValueType &v)
Singleton class to hold global Observation Mode.
Structured analytics error.
vector< string > curveConfigs
Date asof(14, Jun, 2018)