108 {
109
110 boost::timer::cpu_timer timer;
111
112 LOG(
"MultiThreadedValuationEngine::buildCube() was called");
113
114
115
116 LOG(
"Extract pricing stats and clear them in the current portfolio");
117
118 std::map<std::string, std::pair<std::size_t, boost::timer::nanosecond_type>> pricingStats;
119 for (auto const& [tid, t] : portfolio->trades())
120 pricingStats[tid] = std::make_pair(t->getNumberOfPricings(), t->getCumulativePricingTime());
121
122
123
124 LOG(
"Reset and build portfolio against init market to produce pricing stats from a single pricing. Using pricing "
125 "configuration '"
127
128 QuantLib::ext::shared_ptr<ore::data::Market> initMarket = QuantLib::ext::make_shared<ore::data::TodaysMarket>(
131
132 auto engineFactory = QuantLib::ext::make_shared<ore::data::EngineFactory>(
136
137 portfolio->build(engineFactory,
context_,
true);
138
139 for (auto const& [tid, t] : portfolio->trades()) {
140 TLOG(
"got npv for " << tid <<
": " << std::setprecision(12) << t->instrument()->NPV() <<
" "
141 << t->npvCurrency());
142 }
143
144
145
146 Size eff_nThreads = std::min(portfolio->size(),
nThreads_);
147
148 LOG(
"Splitting portfolio.");
149
150 LOG(
"portfolio size = " << portfolio->size());
152 LOG(
"eff nThreads = " << eff_nThreads);
153
154 QL_REQUIRE(eff_nThreads > 0, "effective threads are zero, this is not allowed.");
155
156 std::vector<QuantLib::ext::shared_ptr<ore::data::Portfolio>> portfolios;
157 for (Size i = 0; i < eff_nThreads; ++i)
158 portfolios.push_back(QuantLib::ext::make_shared<ore::data::Portfolio>());
159
160 double totalAvgPricingTime = 0.0;
161 std::vector<std::pair<std::string, double>> timings;
162 for (auto const& [tid, t] : portfolio->trades()) {
163 if (t->getNumberOfPricings() != 0) {
164 double dt = t->getCumulativePricingTime() / static_cast<double>(t->getNumberOfPricings());
165 timings.push_back(std::make_pair(tid, dt));
166 totalAvgPricingTime += dt;
167 } else {
168
169 timings.push_back(std::make_pair(tid, 0.0));
170 }
171 }
172
173 std::sort(timings.begin(), timings.end(),
174 [](const std::pair<std::string, double>& p1, const std::pair<std::string, double> p2) {
175 if (p1.second == p2.second)
176 return p1.first < p2.first;
177 else
178 return p1.second > p2.second;
179 });
180
181 std::vector<double> portfolioTotalAvgPricingTime(portfolios.size());
182 Size portfolioIndex = 0;
183 for (auto const& t : timings) {
184 portfolios[portfolioIndex]->add(portfolio->get(t.first));
185 portfolioTotalAvgPricingTime[portfolioIndex] += t.second;
186 if (++portfolioIndex >= eff_nThreads)
187 portfolioIndex = 0;
188 }
189
190
191
192 std::vector<std::string> portfoliosAsString;
193 for (auto const& p : portfolios) {
194 portfoliosAsString.emplace_back(p->toXMLString());
195 }
196
197
198
199 LOG(
"Total avg pricing time : " << totalAvgPricingTime / 1E6 <<
" ms");
200 for (Size i = 0; i < eff_nThreads; ++i) {
201 LOG(
"Portfolio #" << i <<
" number of trades : " << portfolios[i]->
size());
202 LOG(
"Portfolio #" << i <<
" total avg pricing time : " << portfolioTotalAvgPricingTime[i] / 1E6 <<
" ms");
203 }
204
205
206
207 LOG(
"Cloning scenario generators for " << eff_nThreads <<
" threads...");
208 std::vector<QuantLib::ext::shared_ptr<ore::analytics::ScenarioGenerator>> scenarioGenerators;
209 auto tmp =
211 scenarioGenerators.push_back(tmp);
212 DLOG(
"generator for thread 1 cloned.");
213 for (Size i = 1; i < eff_nThreads; ++i) {
214 scenarioGenerators.push_back(QuantLib::ext::make_shared<ore::analytics::ClonedScenarioGenerator>(*tmp));
215 DLOG(
"generator for thread " << (i + 1) <<
" cloned.");
216 }
217
218
219
220 LOG(
"Cloning loaders for " << eff_nThreads <<
" threads...");
221 std::vector<QuantLib::ext::shared_ptr<ore::data::ClonedLoader>> loaders;
222 for (Size i = 0; i < eff_nThreads; ++i)
223 loaders.push_back(QuantLib::ext::make_shared<ore::data::ClonedLoader>(
today_,
loader_));
224
225
226
227 LOG(
"Build " << eff_nThreads <<
" mini result cubes...");
231 for (Size i = 0; i < eff_nThreads; ++i) {
236 }
237
238
239
240 auto progressIndicator =
241 QuantLib::ext::make_shared<ore::analytics::MultiThreadedProgressIndicator>(this->
progressIndicators());
242
243
244
245
246
247
248
249
250 using resultType = int;
251 std::vector<std::future<resultType>> results(eff_nThreads);
252
253 std::vector<std::thread> jobs;
254
255
256 std::vector<std::map<std::string, std::pair<std::size_t, boost::timer::nanosecond_type>>> workerPricingStats(
257 eff_nThreads);
258
259
261
262 for (Size i = 0; i < eff_nThreads; ++i) {
263
264 auto job = [this, obsMode, dryRun, &calculators, &cptyCalculators, mporStickyDate, &portfoliosAsString,
265 &scenarioGenerators, &loaders, &workerPricingStats, &progressIndicator](int id) -> resultType {
266
267
268 QuantLib::Settings::instance().evaluationDate() =
today_;
269 ore::analytics::ObservationMode::instance().setMode(obsMode);
270
271 LOG(
"Start thread " <<
id);
272
273 int rc;
274
275 try {
276
277
278
279 QuantLib::ext::shared_ptr<ore::data::Market> initMarket = QuantLib::ext::make_shared<ore::data::TodaysMarket>(
282
283
284
285 QuantLib::ext::shared_ptr<ore::analytics::ScenarioSimMarket> simMarket =
286 QuantLib::ext::make_shared<ore::analytics::ScenarioSimMarket>(
290
291
292
295
296
297
298 simMarket->scenarioGenerator() = scenarioGenerators[id];
299
300
301
304
305
306
307 auto portfolio = QuantLib::ext::make_shared<ore::data::Portfolio>();
308 portfolio->fromXMLString(portfoliosAsString[id]);
309 auto engineFactory = QuantLib::ext::make_shared<ore::data::EngineFactory>(
312
313 portfolio->build(engineFactory,
context_,
true);
314
315
316
317 auto valEngine = QuantLib::ext::make_shared<ore::analytics::ValuationEngine>(
320 : std::set<std::pair<std::string, QuantLib::ext::shared_ptr<QuantExt::ModelBuilder>>>());
321 valEngine->registerProgressIndicator(progressIndicator);
322
323
324
327 cptyCalculators ? cptyCalculators()
328 : std::vector<QuantLib::ext::shared_ptr<CounterpartyCalculator>>(),
329 dryRun);
330
331
332
333 for (auto const& [tid, t] : portfolio->trades())
334 workerPricingStats[id][tid] =
335 std::make_pair(t->getNumberOfPricings(), t->getCumulativePricingTime());
336
337
338
339 LOG(
"Thread " <<
id <<
" successfully finished.");
340
341 rc = 0;
342
343 } catch (const std::exception& e) {
344
345
346
348 rc = 1;
349 }
350
351
352
353 return rc;
354 };
355
356
357
358
359 std::packaged_task<resultType(int)> task(job);
360 results[i] = task.get_future();
361 std::thread thread(std::move(task), i);
362 jobs.emplace_back(std::move(thread));
363 }
364
365
366
367
368 for (auto& t : jobs)
369 t.join();
370
371 for (Size i = 0; i < results.size(); ++i) {
372 results[i].wait();
373 }
374
375 for (Size i = 0; i < results.size(); ++i) {
376 QL_REQUIRE(results[i].valid(), "internal error: did not get a valid result");
377 int rc = results[i].get();
378 QL_REQUIRE(rc == 0, "error: thread " << i << " exited with return code " << rc
379 << ". Check for structured errors from 'MultiThreaded Valuation Engine'.");
380 }
381
382
383
384
385
386
387
388
389 LOG(
"Update pricing stats of trades.");
390
391 for (auto const& [tid, t] : portfolio->trades()) {
392 auto p = pricingStats[tid];
393 std::size_t n = p.first;
394 boost::timer::nanosecond_type d = p.second;
395 for (auto const& w : workerPricingStats) {
396 auto p = w.find(tid);
397 if (p != w.end()) {
398 n += p->second.first;
399 d += p->second.second;
400 }
401 }
402 t->resetPricingStats(n, d);
403 }
404
405
406
407 LOG(
"MultiThreadedValuationEngine::buildCube() successfully finished, timings: "
408 << static_cast<double>(timer.elapsed().wall) / 1.0E9 << "s Wall, "
409 << static_cast<double>(timer.elapsed().user) / 1.0E9 << "s User, "
410 << static_cast<double>(timer.elapsed().system) / 1.0E9 << "s System.");
411}
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniNettingSetCubes_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniCubes_
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniCptyCubes_
const std::set< QuantLib::ext::shared_ptr< ProgressIndicator > > & progressIndicators() const
Size size(const ValueType &v)