704 {
705 LOG(
"Starting multi-threaded AMCValuationEngine for "
707 << " dates.");
708
709 QL_REQUIRE(
useMultithreading_,
"AMCValuationEngine::buildCube() method was called with signature for "
710 "multi-threaded run, but engine was constructed for single-threaded runs");
711
712 QL_REQUIRE(portfolio->size() > 0, "AMCValuationEngine::buildCube: empty portfolio");
713
714
715
716 LOG(
"Splitting portfolio.");
717
718 Size eff_nThreads = std::min(portfolio->size(),
nThreads_);
719
720 LOG(
"portfolio size = " << portfolio->size());
722 LOG(
"eff nThreads = " << eff_nThreads);
723
724 QL_REQUIRE(eff_nThreads > 0, "effective threads are zero, this is not allowed.");
725
726 std::vector<QuantLib::ext::shared_ptr<ore::data::Portfolio>> portfolios;
727 for (Size i = 0; i < eff_nThreads; ++i)
728 portfolios.push_back(QuantLib::ext::make_shared<ore::data::Portfolio>());
729
730 Size portfolioIndex = 0;
731 for (auto const& t : portfolio->trades()) {
732 portfolios[portfolioIndex]->add(t.second);
733 if (++portfolioIndex >= eff_nThreads)
734 portfolioIndex = 0;
735 }
736
737
738
739 std::vector<std::string> portfoliosAsString;
740 for (auto const& p : portfolios) {
741 portfoliosAsString.emplace_back(p->toXMLString());
742 }
743
744
745
746 for (Size i = 0; i < eff_nThreads; ++i) {
747 LOG(
"Portfolio #" << i <<
" number of trades : " << portfolios[i]->
size());
748 }
749
750
751
752 LOG(
"Cloning loaders for " << eff_nThreads <<
" threads...");
753 std::vector<QuantLib::ext::shared_ptr<ore::data::ClonedLoader>> loaders;
754 for (Size i = 0; i < eff_nThreads; ++i)
755 loaders.push_back(QuantLib::ext::make_shared<ore::data::ClonedLoader>(
today_,
loader_));
756
757
758
759 LOG(
"Build " << eff_nThreads <<
" mini result cubes...");
761 for (Size i = 0; i < eff_nThreads; ++i) {
764 }
765
766
767
768 std::vector<Date> simDates =
772
773
774
775 auto progressIndicator =
776 QuantLib::ext::make_shared<ore::analytics::MultiThreadedProgressIndicator>(this->
progressIndicators());
777
778
779
780
781
782
783
784
785 using resultType = int;
786 std::vector<std::future<resultType>> results(eff_nThreads);
787
788 std::vector<std::thread> jobs;
789
790
791
793
794 for (Size i = 0; i < eff_nThreads; ++i) {
795
796 auto job = [this, obsMode, &portfoliosAsString, &loaders, &simDates, &progressIndicator](int id) -> resultType {
797
798
799 QuantLib::Settings::instance().evaluationDate() =
today_;
800 ore::analytics::ObservationMode::instance().setMode(obsMode);
801
802 LOG(
"Start thread " <<
id);
803
804 int rc;
805
806 try {
807
808
809
810 QuantLib::ext::shared_ptr<ore::data::Market> initMarket = QuantLib::ext::make_shared<ore::data::TodaysMarket>(
813
814 QuantLib::ext::shared_ptr<ore::data::Market> market = initMarket;
815
818 "AMC Valuation Engine can not build simMarket without simMarketParam");
819 bool continueOnError = true;
821 market = QuantLib::ext::make_shared<ScenarioSimMarket>(
825 }
826
827
832
833 auto cam = *modelBuilder.model();
834
835
836
837 auto portfolio = QuantLib::ext::make_shared<ore::data::Portfolio>();
838 portfolio->fromXMLString(portfoliosAsString[id]);
839
840 QuantLib::ext::shared_ptr<EngineData> edCopy = QuantLib::ext::make_shared<EngineData>(*
engineData_);
841 edCopy->globalParameters()["GenerateAdditionalResults"] = "false";
842 edCopy->globalParameters()["RunType"] = "NPV";
846
847 auto engineFactory = QuantLib::ext::make_shared<EngineFactory>(
849 EngineBuilderFactory::instance().generateAmcEngineBuilders(cam, simDates), true);
850
851 portfolio->build(engineFactory, "amc-val-engine", true);
852
853
854
857
858
859
860 LOG(
"Thread " <<
id <<
" successfully finished.");
861
862 rc = 0;
863
864 } catch (const std::exception& e) {
865
866
867
869 e.what())
871 rc = 1;
872 }
873
874
875
876 return rc;
877 };
878
879
880
881
882 std::packaged_task<resultType(int)> task(job);
883 results[i] = task.get_future();
884 std::thread thread(std::move(task), i);
885 jobs.emplace_back(std::move(thread));
886 }
887
888
889 for (auto& t : jobs)
890 t.join();
891
892 for (Size i = 0; i < results.size(); ++i) {
893 results[i].wait();
894 }
895
896 for (Size i = 0; i < results.size(); ++i) {
897 QL_REQUIRE(results[i].valid(), "internal error: did not get a valid result");
898 int rc = results[i].get();
899 QL_REQUIRE(rc == 0, "error: thread " << i << " exited with return code " << rc
900 << ". Check for structured errors from 'AMCValuationEngine'.");
901 }
902
903
904
905
906
907
908 LOG(
"Finished multi-threaded AMCValuationEngine run.");
909}
std::vector< QuantLib::ext::shared_ptr< ore::analytics::NPVCube > > miniCubes_
QuantLib::ext::shared_ptr< ore::analytics::AggregationScenarioData > asd_
const std::set< QuantLib::ext::shared_ptr< ProgressIndicator > > & progressIndicators() const
Size size(const ValueType &v)