SourceXtractorPlusPlus 0.18
SourceXtractor++, the next generation SExtractor
SourceXtractor.cpp
Go to the documentation of this file.
1
23#include <dlfcn.h>
24#include <iomanip>
25#include <map>
26#include <string>
27#include <typeinfo>
28
29#include <boost/program_options.hpp>
30#include <boost/algorithm/string/predicate.hpp>
32
33#include "ElementsKernel/Main.h"
36
38#include "Configuration/Utils.h"
39
41
49
51
54
78
80#include "SEMain/PluginConfig.h"
81#include "SEMain/Sorter.h"
82
83
84namespace po = boost::program_options;
85namespace fs = boost::filesystem;
86using namespace SourceXtractor;
87using namespace Euclid::Configuration;
88
90
91static const std::string LIST_OUTPUT_PROPERTIES {"list-output-properties"};
92static const std::string PROPERTY_COLUMN_MAPPING_ALL {"property-column-mapping-all"};
93static const std::string PROPERTY_COLUMN_MAPPING {"property-column-mapping"};
94static const std::string DUMP_CONFIG {"dump-default-config"};
95
96class GroupObserver : public Observer<std::shared_ptr<SourceGroupInterface>> {
97public:
98 virtual void handleMessage(const std::shared_ptr<SourceGroupInterface>& group) override {
99 m_list.push_back(group);
100 }
101
103};
104
105class SourceObserver : public Observer<std::shared_ptr<SourceWithOnDemandProperties>> {
106public:
108 m_list.push_back(source);
109 }
110
112};
113
115
116static void setupEnvironment(void) {
117 // Some parts of boost (including boost::filesystem) can throw an exception when the
118 // locale as configured in the environment is invalid.
119 // We work around that overriding the locale if we find an invalid one.
120 // See https://svn.boost.org/trac10/ticket/10205
121 try {
122 std::locale("");
123 }
124 catch (...) {
125 ::setenv("LC_ALL", "C", 1);
126 }
127}
128
136 bool omp_env_present = getenv("OMP_NUM_THREADS") || getenv("OMP_DYNAMIC");
137 bool mkl_env_present = getenv("MKL_NUM_THREADS") || getenv("MKL_DYNAMIC");
138 if (!omp_env_present && !mkl_env_present) {
139 // Despite the documentation, the methods following C ABI are capitalized
140 void (*set_num_threads)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Num_Threads"));
141 void (*set_dynamic)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Dynamic"));
142 if (set_num_threads) {
143 logger.debug() << "Disabling multithreading";
144 set_num_threads(1);
145 }
146 if (set_dynamic) {
147 logger.debug() << "Disabling dynamic multithreading";
148 set_dynamic(0);
149 }
150 }
151}
152
153class SEMain : public Elements::Program {
154
155 std::shared_ptr<TaskFactoryRegistry> task_factory_registry = std::make_shared<TaskFactoryRegistry>();
157 std::shared_ptr<OutputRegistry> output_registry = std::make_shared<OutputRegistry>();
162 std::make_shared<SourceWithOnDemandPropertiesFactory>(task_provider);
164 std::make_shared<SourceGroupWithOnDemandPropertiesFactory>(task_provider);
170
171 bool config_initialized = false;
172 po::options_description config_parameters;
173
174public:
175
176 SEMain(const std::string& plugin_path, const std::vector<std::string>& plugin_list)
178 }
179
183 po::options_description getConfigParameters() {
184 if (!config_initialized) {
185 auto& config_manager = ConfigManager::getInstance(config_manager_id);
186 config_manager.registerConfiguration<SourceXtractorConfig>();
187 config_manager.registerConfiguration<BackgroundConfig>();
188 config_manager.registerConfiguration<SE2BackgroundConfig>();
189 config_manager.registerConfiguration<MemoryConfig>();
190 config_manager.registerConfiguration<BackgroundAnalyzerFactory>();
191 config_manager.registerConfiguration<SamplingConfig>();
192 config_manager.registerConfiguration<DetectionFrameConfig>();
193
194 CheckImages::getInstance().reportConfigDependencies(config_manager);
195
196 //plugins need to be registered before reportConfigDependencies()
205
206 config_parameters.add(config_manager.closeRegistration());
207 config_initialized = true;
208 }
209 return config_parameters;
210 }
211
214 auto options = getConfigParameters();
215
216 options.add_options() (LIST_OUTPUT_PROPERTIES.c_str(), po::bool_switch(),
217 "List the possible output properties for the given input parameters and exit");
218 options.add_options() (PROPERTY_COLUMN_MAPPING_ALL.c_str(), po::bool_switch(),
219 "Show the columns created for each property");
220 options.add_options() (PROPERTY_COLUMN_MAPPING.c_str(), po::bool_switch(),
221 "Show the columns created for each property, for the given configuration");
222 options.add_options() (DUMP_CONFIG.c_str(), po::bool_switch(),
223 "Dump parameters with default values into a configuration file");
225
226 // Allow to pass Python options as positional following --
227 po::positional_options_description p;
228 p.add("python-arg", -1);
229
230 return {options, p};
231 }
232
234 template <typename T>
235 static void writeDefault(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
236 out << opt.long_name() << '=' << boost::any_cast<T>(default_value) << std::endl;
237 }
238
240 template <typename T>
241 static void writeDefaultMultiple(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
242 auto values = boost::any_cast<std::vector<T>>(default_value);
243 if (values.empty()) {
244 out << "# " << opt.long_name() << '=' << std::endl;
245 }
246 else {
247 for (const auto& v : values)
248 out << opt.long_name() << '=' << v << std::endl;
249 }
250 }
251
254 typedef std::function<void(std::ostream&, const po::option_description&, const boost::any&)> PrinterFunction;
256 {typeid(bool), &writeDefault<bool>},
257 {typeid(int), &writeDefault<int>},
258 {typeid(double), &writeDefault<double>},
259 {typeid(std::string), &writeDefault<std::string>},
260 {typeid(std::vector<std::string>), &writeDefaultMultiple<std::string>}
261 };
262 decltype(printers)::const_iterator printer;
263
265 for (const auto& p : config_parameters.options()) {
266 boost::any default_value;
267
268 std::cout << "# " << p->description() << std::endl;
269 if (!p->semantic()->apply_default(default_value)) {
270 std::cout << '#' << p->long_name() << "=" << std::endl;
271 }
272 else if ((printer = printers.find(default_value.type())) == printers.end()) {
273 std::cout << '#' << p->long_name() << "=<Unknown type " << default_value.type().name() << '>' << std::endl;
274 }
275 else {
276 printer->second(std::cout, *p, default_value);
277 }
279 }
280
281 // We need to print the log options manually, as that is set up by Elements
282 std::cout << "# Log level: FATAL, ERROR, WARN, INFO, DEBUG" << std::endl;
283 std::cout << "log-level=INFO" << std::endl;
284 std::cout << "# Log file" << std::endl;
285 std::cout << "#log-file" << std::endl;
286 }
287
289
290 // If the user just requested to see the possible output columns we show
291 // them and we do nothing else
292
293 if (args.at(LIST_OUTPUT_PROPERTIES).as<bool>()) {
294 for (auto& name : output_registry->getOutputPropertyNames()) {
295 std::cout << name << std::endl;
296 }
298 }
299
300 if (args.at(PROPERTY_COLUMN_MAPPING_ALL).as<bool>()) {
303 }
304
305 if (args.at(DUMP_CONFIG).as<bool>()) {
308 }
309
310 // Make sure the BLAS multithreading does not interfere
312
313 // Elements does not verify that the config-file exists. It will just not read it.
314 // We verify that it does exist here.
315 if (args.find("config-file") != args.end()) {
316 auto cfg_file = args.at("config-file").as<fs::path>();
317 if (cfg_file != "" && !fs::exists(cfg_file)) {
318 throw Elements::Exception() << "The configuration file '" << cfg_file << "' does not exist";
319 }
320 }
321
322 // Create the progress listener and printer ASAP
324 auto progress_mediator = progress_printer_factory.createProgressMediator();
325
326 // Initialize the rest of the components
327 auto& config_manager = ConfigManager::getInstance(config_manager_id);
328 config_manager.initialize(args);
329
330 // Configure TileManager
331 auto memory_config = config_manager.getConfiguration<MemoryConfig>();
332 TileManager::getInstance()->setOptions(memory_config.getTileSize(),
333 memory_config.getTileSize(), memory_config.getTileMaxMemory());
334
335 CheckImages::getInstance().configure(config_manager);
336
337 task_factory_registry->configure(config_manager);
339
340 segmentation_factory.configure(config_manager);
341 partition_factory.configure(config_manager);
342 grouping_factory.configure(config_manager);
343 deblending_factory.configure(config_manager);
344 measurement_factory.configure(config_manager);
345 output_factory.configure(config_manager);
346
347 if (args.at(PROPERTY_COLUMN_MAPPING).as<bool>()) {
348 output_registry->printPropertyColumnMap(config_manager.getConfiguration<OutputConfig>().getOutputProperties());
350 }
351
352 auto segmentation = segmentation_factory.createSegmentation();
353
354 // Multithreading
355 auto multithreading_config = config_manager.getConfiguration<MultiThreadingConfig>();
356 auto thread_pool = multithreading_config.getThreadPool();
357
358 // Rest of the stages
360 auto source_grouping = grouping_factory.createGrouping();
361
365
366 // Prefetcher
368 if (thread_pool) {
369 auto prefetch = source_grouping->requiredProperties();
370 auto deblending_prefetch = deblending->requiredProperties();
371 prefetch.insert(deblending_prefetch.begin(), deblending_prefetch.end());
372 if (!prefetch.empty()) {
373 prefetcher = std::make_shared<Prefetcher>(thread_pool, multithreading_config.getMaxQueueSize());
374 prefetcher->requestProperties(prefetch);
375 }
376 }
377
378 // Link together the pipeline's steps
379 segmentation->setNextStage(partition);
380
381 if (prefetcher) {
382 partition->setNextStage(prefetcher);
383 prefetcher->setNextStage(source_grouping);
384 }
385 else {
386 partition->setNextStage(source_grouping);
387 }
388
389 source_grouping->setNextStage(deblending);
390 deblending->setNextStage(measurement);
391
392 if (config_manager.getConfiguration<OutputConfig>().getOutputUnsorted()) {
393 logger.info() << "Writing output following measure order";
394 measurement->setNextStage(output);
395 } else {
396 logger.info() << "Writing output following segmentation order";
397 auto sorter = std::make_shared<Sorter>();
398 measurement->setNextStage(sorter);
399 sorter->setNextStage(output);
400 }
401
402 segmentation->Observable<SegmentationProgress>::addObserver(progress_mediator->getSegmentationObserver());
403 segmentation->Observable<SourceInterface>::addObserver(progress_mediator->getDetectionObserver());
404 deblending->Observable<SourceGroupInterface>::addObserver(progress_mediator->getDeblendingObserver());
405 measurement->Observable<SourceGroupInterface>::addObserver(progress_mediator->getMeasurementObserver());
406
407 // Add observers for CheckImages
408 if (CheckImages::getInstance().getSegmentationImage(0) != nullptr) {
409 segmentation->Observable<SourceInterface>::addObserver(std::make_shared<DetectionIdCheckImage>());
410 }
411 if (CheckImages::getInstance().getPartitionImage(0) != nullptr) {
412 measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<SourceIdCheckImage>());
413 }
414 if (CheckImages::getInstance().getGroupImage(0) != nullptr) {
415 measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<GroupIdCheckImage>());
416 }
417 if (CheckImages::getInstance().getMoffatImage(0) != nullptr) {
418 measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<MoffatCheckImage>());
419 }
420 const auto& detection_frames = config_manager.getConfiguration<DetectionFrameConfig>().getDetectionFrames();
421
422 // Perform measurements (multi-threaded part)
423 measurement->startThreads();
424
425 size_t prev_writen_rows = 0;
426 size_t frame_number = 0;
427 for (auto& detection_frame : detection_frames) {
428 frame_number++;
429 try {
430 // Process the image
431 logger.info() << "Processing frame "
432 << frame_number << " / " << detection_frames.size() << " : " << detection_frame->getLabel();
433 segmentation->processFrame(detection_frame);
434 }
435 catch (const std::exception &e) {
436 logger.error() << "Failed to process the frame! " << e.what();
437 measurement->stopThreads();
439 }
440
441 if (prefetcher) {
442 prefetcher->synchronize();
443 }
444 measurement->synchronizeThreads();
445
446 size_t nb_writen_rows = output->flush();
447 output->nextPart();
448
449 logger.info() << (nb_writen_rows - prev_writen_rows) << " sources detected in frame, " << nb_writen_rows << " total";
450
451 prev_writen_rows = nb_writen_rows;
452 }
453
454 if (prefetcher) {
455 prefetcher->wait();
456 }
457 measurement->stopThreads();
458
459 CheckImages::getInstance().saveImages();
460 TileManager::getInstance()->flush();
461 progress_mediator->done();
462
463 if (prev_writen_rows > 0) {
464 logger.info() << "total " << prev_writen_rows << " sources detected";
465 } else {
466 logger.info() << "NO SOURCES DETECTED";
467 }
468
470 }
471};
472
473
475
476public:
478 m_plugin_path(plugin_path), m_plugin_list(plugin_list) {
479 }
480
481 virtual ~PluginOptionsMain() = default;
482
483 boost::program_options::options_description defineSpecificProgramOptions() override {
484 auto& config_manager = ConfigManager::getInstance(conf_man_id);
485 config_manager.registerConfiguration<PluginConfig>();
486 auto options = config_manager.closeRegistration();
487 // The following will consume any extra options in the configuration file
488 options.add_options()("*", po::value<std::vector<std::string>>());
489 return options;
490 }
491
493 auto& config_manager = ConfigManager::getInstance(conf_man_id);
494 config_manager.initialize(args);
495 auto& conf = config_manager.getConfiguration<PluginConfig>();
496 m_plugin_path = conf.getPluginPath();
497 m_plugin_list = conf.getPluginList();
499 }
500
501private:
502
506
507};
508
509
510static void forwardOptions(int argc, char *const *argv, std::vector<std::string>& plugin_options_input) {
511 for (int i = 0; i < argc; ++i) {
512 std::string option{argv[i]};
513 if (option == "--config-file") {
514 plugin_options_input.emplace_back("--config-file");
515 plugin_options_input.emplace_back(std::string{argv[i + 1]});
516 }
517 if (boost::starts_with(option, "--config-file=")) {
518 plugin_options_input.emplace_back(option);
519 }
520 if (option == "--plugin-directory") {
521 plugin_options_input.emplace_back("--plugin-directory");
522 plugin_options_input.emplace_back(std::string{argv[i + 1]});
523 }
524 if (boost::starts_with(option, "--plugin-directory=")) {
525 plugin_options_input.emplace_back(option);
526 }
527 if (option == "--plugin") {
528 plugin_options_input.emplace_back("--plugin");
529 plugin_options_input.emplace_back(std::string{argv[i + 1]});
530 }
531 if (boost::starts_with(option, "--plugin=")) {
532 plugin_options_input.emplace_back(option);
533 }
534 }
535}
536
537
538ELEMENTS_API int main(int argc, char* argv[]) {
539 std::string plugin_path {};
540 std::vector<std::string> plugin_list {};
541
542 // This adds the current directory as a valid location for the default "sourcextractor++.conf" configuration
543 Elements::TempEnv local_env;
544 if (local_env["ELEMENTS_CONF_PATH"].empty()) {
545 local_env["ELEMENTS_CONF_PATH"] = ".:/etc";
546 } else {
547 local_env["ELEMENTS_CONF_PATH"] = ".:" + local_env["ELEMENTS_CONF_PATH"] + ":/etc";
548 }
549
551
552 // Try to be reasonably graceful with unhandled exceptions
554
555 try {
556 // First we create a program which has a sole purpose to get the options for
557 // the plugin paths. Note that we do not want to have this helper program
558 // to handle any other options except of the plugin-directory and plugin, so
559 // we create a subset of the given options with only the necessary ones. We
560 // also turn off the the logging.
561 std::vector<int> masked_indices{};
562 std::vector<std::string> plugin_options_input{};
563 plugin_options_input.emplace_back("DummyProgram");
564 plugin_options_input.emplace_back("--log-level");
565 plugin_options_input.emplace_back("ERROR");
566 forwardOptions(argc, argv, plugin_options_input);
567
568 int argc_tmp = plugin_options_input.size();
569 std::vector<const char *> argv_tmp(argc_tmp);
570 for (unsigned int i = 0; i < plugin_options_input.size(); ++i) {
571 auto& option_str = plugin_options_input[i];
572 argv_tmp[i] = option_str.data();
573 }
574
575 CREATE_MANAGER_WITH_ARGS(plugin_options_program, PluginOptionsMain, plugin_path, plugin_list);
576 plugin_options_program.run(argc_tmp, const_cast<char **>(argv_tmp.data()));
577
578 CREATE_MANAGER_WITH_ARGS(main, SEMain, plugin_path, plugin_list);
579 Elements::ExitCode exit_code = main.run(argc, argv);
580 return static_cast<Elements::ExitCodeType>(exit_code);
581 }
582 catch (const std::exception &e) {
583 logger.fatal() << e.what();
585 }
586 catch (...) {
587 logger.fatal() << "Unknown exception type!";
588 logger.fatal() << "Please, report this as a bug";
590 }
591}
static const std::string PROPERTY_COLUMN_MAPPING
static void setupEnvironment(void)
static void disableBlasMultithreading()
static Elements::Logging logger
static const std::string LIST_OUTPUT_PROPERTIES
static void forwardOptions(int argc, char *const *argv, std::vector< std::string > &plugin_options_input)
static const std::string PROPERTY_COLUMN_MAPPING_ALL
static const std::string DUMP_CONFIG
static long config_manager_id
ELEMENTS_API int main(int argc, char *argv[])
T at(T... args)
T c_str(T... args)
void error(const std::string &logMessage)
void debug(const std::string &logMessage)
static Logging getLogger(const std::string &name="")
void fatal(const std::string &logMessage)
void info(const std::string &logMessage)
static void onTerminate() noexcept
virtual void handleMessage(const std::shared_ptr< SourceGroupInterface > &group) override
std::list< std::shared_ptr< SourceGroupInterface > > m_list
boost::program_options::options_description defineSpecificProgramOptions() override
Elements::ExitCode mainMethod(std::map< std::string, boost::program_options::variable_value > &args) override
std::string & m_plugin_path
virtual ~PluginOptionsMain()=default
PluginOptionsMain(std::string &plugin_path, std::vector< std::string > &plugin_list)
std::vector< std::string > & m_plugin_list
DeblendingFactory deblending_factory
std::shared_ptr< OutputRegistry > output_registry
std::pair< po::options_description, po::positional_options_description > defineProgramArguments() override
Return the arguments that the program accepts.
po::options_description getConfigParameters()
PartitionFactory partition_factory
SegmentationFactory segmentation_factory
void printDefaults()
Print a configuration file populated with defaults.
std::shared_ptr< SourceGroupFactory > group_factory
ProgressReporterFactory progress_printer_factory
GroupingFactory grouping_factory
std::shared_ptr< SourceFactory > source_factory
std::shared_ptr< TaskFactoryRegistry > task_factory_registry
SEMain(const std::string &plugin_path, const std::vector< std::string > &plugin_list)
OutputFactory output_factory
Elements::ExitCode mainMethod(std::map< std::string, po::variable_value > &args) override
static void writeDefaultMultiple(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a multiple-value option.
bool config_initialized
static void writeDefault(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a simple option.
MeasurementFactory measurement_factory
PluginManager plugin_manager
std::shared_ptr< TaskProvider > task_provider
po::options_description config_parameters
std::list< std::shared_ptr< SourceWithOnDemandProperties > > m_list
virtual void handleMessage(const std::shared_ptr< SourceWithOnDemandProperties > &source) override
std::unique_ptr< Deblending > createDeblending() const
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
Provides combined detection frame.
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
std::shared_ptr< SourceGrouping > createGrouping() const
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
std::unique_ptr< Measurement > getMeasurement() const
const std::shared_ptr< Euclid::ThreadPool > & getThreadPool() const
Observer interface to be used with Observable to implement the Observer pattern.
Definition: Observable.h:38
const std::vector< std::string > getOutputProperties()
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
std::shared_ptr< Output > createOutput() const
std::set< std::string > getOutputPropertyNames()
void printPropertyColumnMap(const std::vector< std::string > &properties={})
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
std::shared_ptr< Partition > getPartition() const
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
PluginManager handles the loading of plugins and calls their registration function,...
Definition: PluginManager.h:53
void loadPlugins()
loads all the available plugins. Both those linked at compile-time and those loaded at run-time
std::shared_ptr< ProgressMediator > createProgressMediator(void) const
void addOptions(boost::program_options::options_description &options) const
void configure(const std::map< std::string, boost::program_options::variable_value > &args)
The SegmentationFactory will provide a Segmentation implementation based on the current configuration...
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
std::shared_ptr< Segmentation > createSegmentation() const
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
void registerPropertyInstances(OutputRegistry &output_registry)
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
T data(T... args)
T emplace_back(T... args)
T end(T... args)
T endl(T... args)
T find(T... args)
T getenv(T... args)
#define ELEMENTS_API
#define CREATE_MANAGER_WITH_ARGS(MANAGER, ELEMENTS_PROGRAM,...)
constexpr double e
std::underlying_type< ExitCode >::type ExitCodeType
long getUniqueManagerId() noexcept
Definition: conf.py:1
T partition(T... args)
T push_back(T... args)
T set_terminate(T... args)