48 #include "BESInterface.h" 50 #include "TheBESKeys.h" 51 #include "BESResponseHandler.h" 52 #include "BESAggFactory.h" 53 #include "BESAggregationServer.h" 54 #include "BESReporterList.h" 55 #include "BESContextManager.h" 57 #include "BESExceptionManager.h" 59 #include "BESDataNames.h" 62 #include "BESStopWatch.h" 63 #include "BESTimeoutError.h" 64 #include "BESInternalError.h" 65 #include "BESInternalFatalError.h" 71 list<p_bes_init> BESInterface::_init_list;
// Callbacks appended by add_end_callback(); walked by the "Ending request" code and by dump().
72 list<p_bes_end> BESInterface::_end_list;
// Jump target armed with setjmp() in execute_request(); catch_sig_alarm() longjmp()s to it.
74 static jmp_buf timeout_jump;
// Set to true just before execute_data_request_plan() is called from execute_request() and
// reset to false afterwards (including in the catch handlers). catch_sig_alarm() only
// performs the longjmp() while this flag is true.
75 static bool timeout_jump_valid =
false;
// Request timeout, reported in the log messages as seconds. Starts at 0 and is assigned
// from the "bes_timeout" context or from _timeout_from_keys further down in this file.
86 static volatile int timeout = 0;
// NOTE(review): in this listing the BES_TIMEOUT_KEY definition and the first line of
// catch_sig_alarm() share one physical line, and the function's braces are not shown.
//
// catch_sig_alarm(sig): handler that register_signal_handler() installs for SIGALRM.
// For SIGALRM it logs the timeout and, when timeout_jump_valid is set, longjmp()s to
// timeout_jump with value 1 (the non-zero branch of the setjmp() in execute_request()).
//
// NOTE(review): streaming to the log from inside a signal handler is presumably not
// async-signal-safe -- confirm what LOG expands to.
88 #define BES_TIMEOUT_KEY "BES.TimeOutInSeconds" 99 static void catch_sig_alarm(
int sig)
101 if (sig == SIGALRM) {
102 LOG(
"Child listener timeout after " << timeout <<
" seconds, exiting." << endl);
// Only jump when execute_request() has armed the jump buffer.
107 if (timeout_jump_valid)
108 longjmp(timeout_jump, 1);
// Restores the default disposition for SIGTERM.
// NOTE(review): original lines 109-112 are not visible here, so the surrounding
// control flow (else-branch? follow-up raise/kill?) cannot be confirmed.
113 signal(SIGTERM, SIG_DFL);
/**
 * Install catch_sig_alarm() as the SIGALRM handler using sigaction().
 *
 * @throws BESInternalFatalError when sigaction() returns non-zero.
 */
119 static void register_signal_handler()
121 struct sigaction act;
// sa_mask lists the signals blocked while the handler runs; here only SIGALRM is added.
122 sigemptyset(&act.sa_mask);
123 sigaddset(&act.sa_mask, SIGALRM);
// NOTE(review): original lines 124-128 are not shown in this listing; confirm that
// act.sa_flags is initialized there before sigaction() is called.
129 act.sa_handler = catch_sig_alarm;
130 if (sigaction(SIGALRM, &act, 0))
131 throw BESInternalFatalError(
"Could not register a handler to catch alarm/timeout.", __FILE__, __LINE__);
// Thread handle written by pthread_create() in wait_for_timeout().
162 static pthread_t alarm_thread;
/**
 * Thread start routine handed to pthread_create() in wait_for_timeout().
 * Blocks SIGALRM and then waits for it synchronously with sigwait().
 *
 * The unnamed void* argument is not used by the visible code.
 *
 * NOTE(review): the declarations of sigset/sig/oss, the condition guarding the throw,
 * the braces and the return statement are missing from this listing.
 *
 * @throws BESInternalFatalError carrying strerror(result); the guarding condition is
 *         not visible -- presumably a non-zero sigwait() result.
 */
164 static void* alarm_wait(
void * )
166 BESDEBUG(
"bes",
"Starting: " << __PRETTY_FUNCTION__ << endl);
170 sigemptyset(&sigset);
171 sigaddset(&sigset, SIGALRM);
// NOTE(review): POSIX leaves sigprocmask() unspecified in a multi-threaded process;
// pthread_sigmask() is the per-thread call -- consider switching.
172 sigprocmask(SIG_BLOCK, &sigset, NULL);
// sigwait() returns 0 on success and stores the received signal number in sig.
177 int result = sigwait(&sigset, &sig);
179 BESDEBUG(
"bes",
"Fatal error establishing timeout: " << strerror(result) << endl);
180 throw BESInternalFatalError(
string(
"Fatal error establishing timeout: ") + strerror(result), __FILE__, __LINE__);
182 else if (result == 0 && sig == SIGALRM) {
183 BESDEBUG(
"bes",
"Timeout found in " << __PRETTY_FUNCTION__ << endl);
// NOTE(review): the message says "found signal" but streams `result` (sigwait's return
// code), not `sig` -- looks unintended, confirm. Also, `ends` appends a NUL character to
// the ostringstream contents.
188 oss <<
"While waiting for a timeout, found signal '" << result <<
"' in " << __PRETTY_FUNCTION__ << ends;
189 BESDEBUG(
"bes", oss.str() << endl);
/**
 * Start a detached thread that runs alarm_wait(), storing its handle in alarm_thread.
 *
 * NOTE(review): original line 201 (the statement controlled by the pthread_attr_init()
 * check) and everything after line 205 (handling of `status`) are missing from this
 * listing.
 *
 * @throws BESInternalFatalError when pthread_attr_setdetachstate() returns non-zero.
 */
194 static void wait_for_timeout()
196 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
198 pthread_attr_t thread_attr;
200 if (pthread_attr_init(&thread_attr) != 0)
// Ask for a detached thread: nobody joins alarm_thread in the visible code.
202 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED ) != 0)
203 throw BESInternalFatalError(
"Failed to complete pthread attribute initialization.", __FILE__, __LINE__);
// pthread_create() returns 0 on success, an error number otherwise.
205 int status = pthread_create(&alarm_thread, &thread_attr, alarm_wait, NULL);
/**
 * Construct the interface around an output stream.
 *
 * Initializes _timeout_from_keys, _dhi and _transmitter to 0, parses a timeout value
 * into _timeout_from_keys and installs the SIGALRM handler.
 *
 * @param output_stream stream stored in _strm; must not be null
 * @throws BESInternalError when output_stream is null
 * @throws BESInternalFatalError propagated from register_signal_handler()
 */
211 BESInterface::BESInterface(ostream *output_stream) :
212 _strm(output_stream), _timeout_from_keys(0), _dhi(0), _transmitter(0)
214 if (!output_stream) {
215 throw BESInternalError(
"output stream must be set in order to output responses", __FILE__, __LINE__);
// NOTE(review): the lines that fill timeout_key_value (original 224-225) are not shown;
// presumably a key lookup using BES_TIMEOUT_KEY -- confirm.
223 string timeout_key_value;
// Stream extraction leaves _timeout_from_keys unchanged or zero on unparsable text,
// depending on the language standard in use; no error is reported either way.
226 istringstream iss(timeout_key_value);
227 iss >> _timeout_from_keys;
231 register_signal_handler();
// Destructor. NOTE(review): its body is not part of this listing.
238 BESInterface::~BESInterface()
// Stopwatch pointers that execute_request() allocates when TIMING_LOG debugging is
// enabled and deletes/zeroes at its end.
280 extern BESStopWatch *bes_timing::elapsedTimeToReadStart;
281 extern BESStopWatch *bes_timing::elapsedTimeToTransmitStart;
/**
 * Drive a single request: record the sender and server PID in _dhi->data, log receipt,
 * validate the request, build the plan and run execute_data_request_plan() under the
 * setjmp()/longjmp() timeout guard.
 *
 * NOTE(review): braces, several conditions, the try keyword, the timeout branch's
 * error construction and the final return are missing from this listing.
 *
 * @param from stored in _dhi->data[REQUEST_FROM]
 * @return on the visible error paths, the value returned by exception_manager();
 *         the normal-path return value is not visible here
 * @throws BESInternalError "DataHandlerInterface can not be null" and
 *         "Unable to transmit the response, no transmitter" (guard conditions not shown)
 */
283 int BESInterface::execute_request(
const string &from)
285 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
288 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
// Timing instrumentation, active only when the TIMING_LOG debug context is enabled.
292 if (BESISDEBUG(TIMING_LOG)) {
293 sw.
start(
"BESInterface::execute_request", _dhi->data[REQUEST_ID]);
// The two global stopwatches are heap-allocated here and released at the end of
// this function.
295 bes_timing::elapsedTimeToReadStart =
new BESStopWatch();
296 bes_timing::elapsedTimeToReadStart->
start(
"TIME_TO_READ_START", _dhi->data[REQUEST_ID]);
298 bes_timing::elapsedTimeToTransmitStart =
new BESStopWatch();
299 bes_timing::elapsedTimeToTransmitStart->
start(
"TIME_TO_TRANSMIT_START", _dhi->data[REQUEST_ID]);
// NOTE(review): same throw as at original line 288; _dhi has already been dereferenced
// above, so this second check looks redundant -- confirm the hidden conditions.
303 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
306 _dhi->set_output_stream(_strm);
307 _dhi->data[REQUEST_FROM] = from;
// The PID is stored as text (via the stream `ss`, whose declaration is not shown).
309 pid_t thepid = getpid();
312 _dhi->data[SERVER_PID] = ss.str();
324 *(BESLog::TheLog()) << _dhi->data[SERVER_PID] <<
" from " << _dhi->data[REQUEST_FROM] <<
" request received" 329 validate_data_request();
331 build_data_request_plan();
334 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
// setjmp() returns 0 when arming the buffer; it "returns" non-zero when
// catch_sig_alarm() longjmp()s here after a timeout.
// NOTE(review): longjmp() across C++ frames with non-trivial destructors is undefined
// behaviour -- confirm this is acceptable for the objects live in the plan execution.
343 if (setjmp(timeout_jump) == 0) {
344 timeout_jump_valid =
true;
345 execute_data_request_plan();
347 timeout_jump_valid =
false;
// Timeout branch. `ends` appends a NUL character to the message held by oss.
351 oss <<
"BES listener timeout after " << timeout <<
" seconds." << ends;
355 _dhi->executed =
true;
// Every handler below disarms the jump buffer before delegating to exception_manager().
358 timeout_jump_valid =
false;
359 return exception_manager(ex);
361 catch (bad_alloc &e) {
362 timeout_jump_valid =
false;
364 return exception_manager(ex);
366 catch (exception &e) {
367 timeout_jump_valid =
false;
369 return exception_manager(ex);
372 timeout_jump_valid =
false;
373 BESInternalError ex(
"An undefined exception has been thrown", __FILE__, __LINE__);
374 return exception_manager(ex);
// NOTE(review): the early returns above appear to bypass these deletes, which would
// leave the stopwatches allocated on error paths -- confirm against the full source.
377 delete bes_timing::elapsedTimeToReadStart;
378 bes_timing::elapsedTimeToReadStart = 0;
380 delete bes_timing::elapsedTimeToTransmitStart;
381 bes_timing::elapsedTimeToTransmitStart = 0;
/**
 * Wrap up a request: deal with any pending _dhi->error_info, then run the logging,
 * reporting and request-ending steps, each guarded so that a failure is written to the
 * BES log instead of propagating.
 *
 * NOTE(review): the parameter name, the try/catch keywords, braces, the calls made in
 * each guarded step and the return statement are missing from this listing.
 *
 * @return not visible here; `status` is reassigned from exception_manager() on the
 *         visible error paths
 */
389 int BESInterface::finish(
int )
391 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ <<
" ***" << endl);
// error_info is owned here: it is deleted and the pointer cleared once handled.
400 if (_dhi->error_info) {
402 delete _dhi->error_info;
403 _dhi->error_info = 0;
407 status = exception_manager(ex);
409 catch (bad_alloc &) {
410 string serr =
"BES out of memory";
412 status = exception_manager(ex);
415 string serr =
"An undefined exception has been thrown";
417 status = exception_manager(ex);
// NOTE(review): the error is printed to cout rather than to _strm -- confirm intended.
424 if (_dhi->error_info) {
425 _dhi->error_info->print(cout);
426 delete _dhi->error_info;
427 _dhi->error_info = 0;
// Failures in the three closing steps are only logged.
437 (*BESLog::TheLog()) <<
"Problem logging status: " << ex.
get_message() << endl;
440 (*BESLog::TheLog()) <<
"Unknown problem logging status" << endl;
447 (*BESLog::TheLog()) <<
"Problem reporting request: " << ex.
get_message() << endl;
450 (*BESLog::TheLog()) <<
"Unknown problem reporting request" << endl;
457 (*BESLog::TheLog()) <<
"Problem ending request: " << ex.
get_message() << endl;
460 (*BESLog::TheLog()) <<
"Unknown problem ending request" << endl;
/**
 * Finish a request that ended in an error.
 *
 * When _dhi->error_info is not set, an error is built from the message below and routed
 * through exception_manager(), whose result replaces `status`; finish() is then called.
 *
 * NOTE(review): the construction of `ex` (original line 471) and the braces are not
 * shown in this listing.
 *
 * @param status status code passed on to finish()
 * @return the value returned by finish(status)
 */
466 int BESInterface::finish_with_error(
int status)
468 if (!_dhi->error_info) {
470 string serr =
"Finish_with_error called with no error object";
472 status = exception_manager(ex);
475 return finish(status);
/**
 * Append an initialization callback to the static _init_list.
 * Callbacks are invoked in insertion order by the "Initializing request" loop.
 *
 * @param init callback to register
 */
478 void BESInterface::add_init_callback(p_bes_init init)
480 _init_list.push_back(init);
// Fragment of BESInterface::initialize() (name taken from the stopwatch label below).
// NOTE(review): the signature line, braces, the fetch of callback `p`, the failure
// condition and the throw are missing from this listing.
//
// Runs the registered init callbacks in order and stops at the first one that
// returns false.
491 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::initialize", _dhi->data[REQUEST_ID]);
493 BESDEBUG(
"bes",
"Initializing request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
494 bool do_continue =
true;
495 init_iter i = _init_list.begin();
// The loop condition ends the walk as soon as a callback reports false.
497 for (; i != _init_list.end() && do_continue ==
true; i++) {
499 do_continue = p(*_dhi);
503 BESDEBUG(
"bes",
"FAILED" << endl);
504 string se =
"Initialization callback failed, exiting";
508 BESDEBUG(
"bes",
"OK" << endl);
// Fragment of BESInterface::execute_data_request_plan() (name taken from the stopwatch
// label below).
// NOTE(review): the signature, braces, the `found` declaration and test, whatever arms
// the alarm, the response-handler call and the throw are missing from this listing.
//
// Picks the request timeout (context value first, otherwise the value from the keys
// when non-zero), runs the response handler and then invoke_aggregation().
537 if (BESISDEBUG(TIMING_LOG))
538 sw.
start(
"BESInterface::execute_data_request_plan(\"" + _dhi->data[DATA_REQUEST] +
"\")",
539 _dhi->data[REQUEST_ID]);
544 string context = BESContextManager::TheManager()->
get_context(
"bes_timeout", found);
// Base-10 parse with no end-pointer check: unparsable text yields 0.
546 timeout = strtol(context.c_str(), NULL, 10);
547 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from context)." << endl);
550 else if (_timeout_from_keys != 0) {
551 timeout = _timeout_from_keys;
552 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from keys)." << endl);
557 BESDEBUG(
"bes",
"Executing request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
563 BESDEBUG(
"bes",
"FAILED" << endl);
564 string se =
"The response handler \"" + _dhi->action +
"\" does not exist";
567 BESDEBUG(
"bes",
"OK" << endl);
570 invoke_aggregation();
// Fragment of BESInterface::invoke_aggregation() (name taken from the stopwatch label
// below).
// NOTE(review): the signature, braces, the handler lookup, the aggregate call and the
// throw are missing from this listing.
//
// Aggregation is attempted only when _dhi->data[AGG_CMD] is non-empty.
587 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::invoke_aggregation", _dhi->data[REQUEST_ID]);
589 if (_dhi->data[AGG_CMD] !=
"") {
590 BESDEBUG(
"bes",
"aggregating with: " << _dhi->data[AGG_CMD] <<
" ... "<< endl);
596 BESDEBUG(
"bes",
"FAILED" << endl);
// NOTE(review): there is no space between the handler name and "does not exist", so the
// message reads "...handler <name>does not exist" -- likely a typo in the literal.
597 string se =
"The aggregation handler " + _dhi->data[AGG_HANDLER] +
"does not exist";
600 BESDEBUG(
"bes",
"OK" << endl);
// Fragment of BESInterface::transmit_data() (name taken from the stopwatch label below).
// NOTE(review): the signature, braces, the declaration of `strm` and the conditions that
// separate the "transmitter available" and "no transmitter" paths are missing from this
// listing.
//
// Visible behaviour: pending error info is logged and sent through _transmitter,
// otherwise the response handler transmits; on the other path the error is printed to
// cout and dropped, or BESInternalError is thrown.
620 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::transmit_data", _dhi->data[REQUEST_ID]);
622 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting request: " << _dhi->data[DATA_REQUEST] << endl);
// Error case: render the error once into `strm` for the log and debug output, then
// transmit it.
626 if (_dhi->error_info) {
628 _dhi->error_info->print(strm);
629 (*BESLog::TheLog()) << strm.str() << endl;
630 BESDEBUG(
"bes",
" transmitting error info using transmitter ... " << endl << strm.str() << endl);
632 _dhi->error_info->transmit(_transmitter, *_dhi);
634 else if (_dhi->response_handler) {
635 BESDEBUG(
"bes",
" BESInterface::transmit_data() - Response handler " << _dhi->response_handler->get_name() << endl);
637 _dhi->response_handler->transmit(_transmitter, *_dhi);
// Fallback path: the error goes to cout and error_info is deleted and cleared.
643 if (_dhi->error_info) {
644 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting error info using cout ... " << endl);
645 _dhi->error_info->print(cout);
646 delete _dhi->error_info;
647 _dhi->error_info = 0;
650 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Unable to transmit the response ... FAILED " << endl);
652 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
657 BESDEBUG(
"bes",
"BESInterface::transmit_data() - OK" << endl);
// Fragment that hands the request to the reporter list; presumably the body of
// BESInterface::report_request() -- the signature line is not part of this listing.
679 BESDEBUG(
"bes",
"Reporting on request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
// All reporting is delegated to BESReporterList's report().
681 BESReporterList::TheList()->report(*_dhi);
683 BESDEBUG(
"bes",
"OK" << endl);
/**
 * Append an end-of-request callback to the static _end_list.
 *
 * @param end callback to register
 */
686 void BESInterface::add_end_callback(p_bes_end end)
688 _end_list.push_back(end);
// Fragment that ends a request; presumably the body of BESInterface::end_request() --
// the signature, braces and the body of the callback loop are not part of this listing.
//
// Walks _end_list, then releases every container reachable through _dhi.
698 BESDEBUG(
"bes",
"Ending request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
699 end_iter i = _end_list.begin();
700 for (; i != _end_list.end(); i++) {
// Iterate the containers: first_container()/next_container() advance _dhi->container,
// and the loop stops when it becomes null.
707 _dhi->first_container();
708 while (_dhi->container) {
709 BESDEBUG(
"bes",
"Calling BESContainer::release()" << endl);
710 _dhi->container->release();
711 _dhi->next_container();
714 BESDEBUG(
"bes",
"OK" << endl);
// Fragment, presumably the body of BESInterface::clean() (signature not shown):
// forwards to _dhi->clean() when a data handler interface is set.
721 if (_dhi) _dhi->clean();
// Fragment of BESInterface::dump() (name taken from the first string literal below).
// NOTE(review): the signature, braces, the Indent() calls, the else keywords and the
// _dhi dump call are missing from this listing.
//
// Writes this object's address, the init and end callback lists, the data handler
// interface and the transmitter to `strm`.
751 strm << BESIndent::LMarg <<
"BESInterface::dump - (" << (
void *)
this <<
")" << endl;
// NOTE(review): this section lists _init_list but is labelled "termination functions",
// the same label used for _end_list below -- looks like a copy/paste slip; an
// "initialization functions" label would match the data. Confirm before changing output.
754 if (_init_list.size()) {
755 strm << BESIndent::LMarg <<
"termination functions:" << endl;
757 init_iter i = _init_list.begin();
758 for (; i != _init_list.end(); i++) {
// Each callback is printed as its address.
761 strm << BESIndent::LMarg << (
void *) (*i) << endl;
763 BESIndent::UnIndent();
766 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
769 if (_end_list.size()) {
770 strm << BESIndent::LMarg <<
"termination functions:" << endl;
772 end_iter i = _end_list.begin();
773 for (; i != _end_list.end(); i++) {
774 strm << BESIndent::LMarg << (
void *) (*i) << endl;
776 BESIndent::UnIndent();
779 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
782 strm << BESIndent::LMarg <<
"data handler interface:" << endl;
785 BESIndent::UnIndent();
788 strm << BESIndent::LMarg <<
"transmitter:" << endl;
790 _transmitter->dump(strm);
791 BESIndent::UnIndent();
794 strm << BESIndent::LMarg <<
"transmitter: not set" << endl;
796 BESIndent::UnIndent();
error thrown if there is a user syntax error in the request or any other user error ...
exception thrown if an internal error is found and is fatal to the BES
exception thrown if internal error encountered
virtual void initialize()
Initialize the BES object.
virtual std::string get_message()
get the error message for this exception
virtual void execute(BESDataHandlerInterface &dhi)=0
knows how to build a requested response object
virtual int exception_manager(BESError &e)
Manage any exceptions thrown during the whole process.
virtual void aggregate(BESDataHandlerInterface &dhi)=0
aggregate the response object
virtual string get_context(const string &name, bool &found)
retrieve the value of the specified context from the BES
virtual bool start(string name)
virtual void transmit_data()
Transmit the resulting response object.
handler object that knows how to create a specific response object
Abstract exception class for the BES with basic string message.
virtual void report_request()
Report the request and status of the request to BESReporterList::TheList()
virtual void validate_data_request()
Validate the incoming request information.
virtual void clean()
Clean up after the request.
void get_value(const string &s, string &val, bool &found)
Retrieve the value of a given key, if set.
virtual void invoke_aggregation()
Aggregate the resulting response object.
virtual void end_request()
End the BES request.
virtual void dump(ostream &strm) const
dumps information about this object
virtual BESAggregationServer * find_handler(const string &handler_name)
returns the aggregation handler with the given name in the list
virtual void log_status()
Log the status of the request.
virtual int handle_exception(BESError &e, BESDataHandlerInterface &dhi)
Manage any exceptions thrown during the handling of a request.
Abstraction representing mechanism for aggregating data.
virtual void execute_data_request_plan()
Execute the data request plan.
static BESKeys * TheKeys()