00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef _PASSENGER_LOGGING_H_
00026 #define _PASSENGER_LOGGING_H_
00027
00028 #include <boost/shared_ptr.hpp>
00029 #include <oxt/system_calls.hpp>
00030 #include <oxt/backtrace.hpp>
00031
00032 #include <sys/types.h>
00033 #include <sys/time.h>
00034 #include <sys/file.h>
00035 #include <sys/resource.h>
00036 #include <unistd.h>
00037 #include <fcntl.h>
00038 #include <pthread.h>
00039 #include <string>
00040 #include <map>
00041 #include <ostream>
00042 #include <sstream>
00043 #include <cstdio>
00044 #include <ctime>
00045
00046 #include "RandomGenerator.h"
00047 #include "FileDescriptor.h"
00048 #include "MessageClient.h"
00049 #include "StaticString.h"
00050 #include "Exceptions.h"
00051 #include "Utils.h"
00052 #include "Utils/StrIntUtils.h"
00053 #include "Utils/MD5.h"
00054 #include "Utils/SystemTime.h"
00055
00056
00057 namespace Passenger {
00058
00059 using namespace std;
00060 using namespace boost;
00061 using namespace oxt;
00062
00063
00064
00065
/**
 * Verbosity threshold consulted by the logging macros in this header:
 * a message is emitted only when this value is greater than or equal to
 * the level the message was logged with.
 */
extern int _logLevel;
/**
 * Stream that P_LOG() and the macros built on it write to. The macros skip
 * logging entirely while this pointer is NULL.
 */
extern ostream *_logStream;

/* The three functions below are only declared here; their definitions live
 * outside this header.
 * NOTE(review): presumably getLogLevel()/setLogLevel() read and write
 * _logLevel, and setDebugFile() redirects _logStream to the given file
 * (NULL meaning the default destination) -- confirm against the
 * implementation. */
int getLogLevel();
void setLogLevel(int value);
void setDebugFile(const char *logFile = NULL);
00072
00073
00074
00075
00076
00077
00078
/**
 * Write a single log line to the given stream.
 *
 * The line has the form
 *   [ pid=<pid> thr=<thread> file=<file>:<line> time=<date> <time>.<msec> ]: <expr>
 * and is assembled in a temporary buffer first so that it reaches the stream
 * in one write, after which the stream is flushed.
 *
 * @param level  Level of this message. Nothing is written unless
 *               Passenger::_logLevel >= level.
 * @param expr   Anything that can follow "stream <<", e.g. "foo=" << foo.
 * @param stream Pointer to a std::ostream; nothing is written when it is NULL.
 *               The argument is evaluated more than once.
 *
 * Implementation notes:
 *  - The clock is read once with gettimeofday() and both the date/time and
 *    the millisecond field are derived from that single reading, so the two
 *    can never disagree across a second boundary.
 *  - The millisecond field is zero-padded to three digits (5 ms is ".005").
 *  - All locals carry a p_log_ prefix so that names used inside expr
 *    (such as "tv" or "sstream") cannot accidentally bind to them.
 */
#define P_LOG_TO(level, expr, stream) \
	do { \
		if ((stream) != 0 && Passenger::_logLevel >= (level)) { \
			struct timeval p_log_tv; \
			time_t p_log_time; \
			struct tm p_log_tm; \
			char p_log_datetime[60]; \
			char p_log_msec[24]; \
			std::stringstream p_log_sstream; \
			\
			gettimeofday(&p_log_tv, NULL); \
			p_log_time = p_log_tv.tv_sec; \
			localtime_r(&p_log_time, &p_log_tm); \
			strftime(p_log_datetime, sizeof(p_log_datetime), \
				"%F %H:%M:%S", &p_log_tm); \
			snprintf(p_log_msec, sizeof(p_log_msec), "%03lu", \
				(unsigned long) (p_log_tv.tv_usec / 1000)); \
			p_log_sstream << \
				"[ pid=" << ((unsigned long) getpid()) << \
				" thr=" << pthread_self() << \
				" file=" << __FILE__ << ":" << (unsigned long) __LINE__ << \
				" time=" << p_log_datetime << "." << p_log_msec << \
				" ]: " << \
				expr << std::endl; \
			*(stream) << p_log_sstream.str(); \
			(stream)->flush(); \
		} \
	} while (false)
00103
00104
00105
00106
/**
 * Write the given expression to the default log stream
 * (Passenger::_logStream) at the given level.
 */
#define P_LOG(level, expr) P_LOG_TO(level, expr, Passenger::_logStream)

/** Log a message at level 0. */
#define P_WARN(expr) P_LOG(0, expr)

/** Log a message at level -1. */
#define P_ERROR(expr) P_LOG(-1, expr)

#ifndef PASSENGER_DEBUG
	/* Non-debug build: tracing and assertions expand to empty statements,
	 * so none of their arguments are evaluated. */
	#define P_TRACE(level, expr) do { /* disabled */ } while (false)

	#define P_ASSERT(expr, result_if_failed, message) \
		do { /* disabled */ } while (false)
	#define P_ASSERT_WITH_VOID_RETURN(expr, message) \
		do { /* disabled */ } while (false)
#else
	/** Log a message at the given level; only active in debug builds. */
	#define P_TRACE(level, expr) P_LOG(level, expr)

	/**
	 * If expr is false, log "Assertion failed: <message>" through P_ERROR
	 * and return result_if_failed from the enclosing function.
	 */
	#define P_ASSERT(expr, result_if_failed, message) \
		do { \
			if (!(expr)) { \
				P_ERROR("Assertion failed: " << message); \
				return result_if_failed; \
			} \
		} while (false)

	/** Same as P_ASSERT, for enclosing functions that return void. */
	#define P_ASSERT_WITH_VOID_RETURN(expr, message) \
		do { \
			if (!(expr)) { \
				P_ERROR("Assertion failed: " << message); \
				return; \
			} \
		} while (false)
#endif

/** Shorthand for P_TRACE at level 1. */
#define P_DEBUG(expr) P_TRACE(1, expr)
00150
00151
00152
00153
00154 struct AnalyticsLoggerSharedData {
00155 boost::mutex lock;
00156 MessageClient client;
00157
00158 void disconnect(bool checkErrorResponse = false) {
00159 if (checkErrorResponse && client.connected()) {
00160
00161
00162 TRACE_POINT();
00163 vector<string> args;
00164 bool hasData = true;
00165
00166 try {
00167 hasData = client.read(args);
00168 } catch (const SystemException &e) {
00169 if (e.code() != ECONNRESET) {
00170 throw;
00171 }
00172 }
00173
00174 UPDATE_TRACE_POINT();
00175 client.disconnect();
00176 if (hasData) {
00177 if (args[0] == "error") {
00178 throw IOException("The logging server responded with an error: " + args[1]);
00179 } else {
00180 throw IOException("The logging server sent an unexpected reply.");
00181 }
00182 }
00183 } else {
00184 client.disconnect();
00185 }
00186 }
00187 };
00188 typedef shared_ptr<AnalyticsLoggerSharedData> AnalyticsLoggerSharedDataPtr;
00189
00190 class AnalyticsLog {
00191 private:
00192 static const int INT64_STR_BUFSIZE = 22;
00193
00194 AnalyticsLoggerSharedDataPtr sharedData;
00195 string txnId;
00196 string groupName;
00197 string category;
00198 string unionStationKey;
00199 bool shouldFlushToDiskAfterClose;
00200
00201
00202
00203
00204 char *insertTxnIdAndTimestamp(char *buffer) {
00205 int size;
00206
00207
00208 memcpy(buffer, txnId.c_str(), txnId.size());
00209 buffer += txnId.size();
00210
00211
00212 *buffer = ' ';
00213 buffer++;
00214
00215
00216 size = snprintf(buffer, INT64_STR_BUFSIZE, "%llu", SystemTime::getUsec());
00217 if (size >= INT64_STR_BUFSIZE) {
00218
00219 throw IOException("Cannot format a new transaction log message timestamp.");
00220 }
00221 buffer += size;
00222
00223
00224 *buffer = ' ';
00225
00226 return buffer + 1;
00227 }
00228
00229 public:
00230 AnalyticsLog() { }
00231
00232 AnalyticsLog(const AnalyticsLoggerSharedDataPtr &sharedData, const string &txnId,
00233 const string &groupName, const string &category, const string &unionStationKey)
00234 {
00235 this->sharedData = sharedData;
00236 this->txnId = txnId;
00237 this->groupName = groupName;
00238 this->category = category;
00239 this->unionStationKey = unionStationKey;
00240 shouldFlushToDiskAfterClose = false;
00241 }
00242
00243 ~AnalyticsLog() {
00244 if (sharedData != NULL) {
00245 lock_guard<boost::mutex> l(sharedData->lock);
00246 if (sharedData->client.connected()) {
00247 try {
00248 char timestamp[2 * sizeof(unsigned long long) + 1];
00249 integerToHexatri<unsigned long long>(SystemTime::getUsec(),
00250 timestamp);
00251 sharedData->client.write("closeTransaction",
00252 txnId.c_str(), timestamp, NULL);
00253 } catch (const SystemException &e) {
00254 if (e.code() == EPIPE || e.code() == ECONNRESET) {
00255 TRACE_POINT();
00256 sharedData->disconnect(true);
00257 } else {
00258 throw;
00259 }
00260 }
00261
00262 if (shouldFlushToDiskAfterClose) {
00263 vector<string> args;
00264 sharedData->client.write("flush", NULL);
00265 sharedData->client.read(args);
00266 }
00267 }
00268 }
00269 }
00270
00271 void message(const StaticString &text) {
00272 if (sharedData != NULL) {
00273 lock_guard<boost::mutex> l(sharedData->lock);
00274 if (sharedData->client.connected()) {
00275 char timestamp[2 * sizeof(unsigned long long) + 1];
00276 integerToHexatri<unsigned long long>(SystemTime::getUsec(), timestamp);
00277 sharedData->client.write("log", txnId.c_str(),
00278 timestamp, NULL);
00279 sharedData->client.writeScalar(text);
00280 }
00281 }
00282 }
00283
00284 void abort(const StaticString &text) {
00285 if (sharedData != NULL) {
00286 lock_guard<boost::mutex> l(sharedData->lock);
00287 if (sharedData->client.connected()) {
00288 message("ABORT");
00289 }
00290 }
00291 }
00292
00293 void flushToDiskAfterClose(bool value) {
00294 shouldFlushToDiskAfterClose = value;
00295 }
00296
00297 bool isNull() const {
00298 return sharedData == NULL;
00299 }
00300
00301 string getTxnId() const {
00302 return txnId;
00303 }
00304
00305 string getGroupName() const {
00306 return groupName;
00307 }
00308
00309 string getCategory() const {
00310 return category;
00311 }
00312
00313 string getUnionStationKey() const {
00314 return unionStationKey;
00315 }
00316 };
00317
00318 typedef shared_ptr<AnalyticsLog> AnalyticsLogPtr;
00319
00320 class AnalyticsScopeLog {
00321 private:
00322 AnalyticsLog *log;
00323 enum {
00324 NAME,
00325 GRANULAR
00326 } type;
00327 union {
00328 const char *name;
00329 struct {
00330 const char *endMessage;
00331 const char *abortMessage;
00332 } granular;
00333 } data;
00334 bool ok;
00335
00336 static string timevalToString(struct timeval &tv) {
00337 unsigned long long i = (unsigned long long) tv.tv_sec * 1000000 + tv.tv_usec;
00338 return usecToString(i);
00339 }
00340
00341 static string usecToString(unsigned long long usec) {
00342 char timestamp[2 * sizeof(unsigned long long) + 1];
00343 integerToHexatri<unsigned long long>(usec, timestamp);
00344 return timestamp;
00345 }
00346
00347 public:
00348 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *name) {
00349 this->log = log.get();
00350 type = NAME;
00351 data.name = name;
00352 ok = false;
00353 if (log != NULL && !log->isNull()) {
00354 string message;
00355 struct rusage usage;
00356
00357 message.reserve(150);
00358 message.append("BEGIN: ");
00359 message.append(name);
00360 message.append(" (");
00361 message.append(usecToString(SystemTime::getUsec()));
00362 message.append(",");
00363 if (getrusage(RUSAGE_SELF, &usage) == -1) {
00364 int e = errno;
00365 throw SystemException("getrusage() failed", e);
00366 }
00367 message.append(timevalToString(usage.ru_utime));
00368 message.append(",");
00369 message.append(timevalToString(usage.ru_stime));
00370 message.append(") ");
00371 log->message(message);
00372 }
00373 }
00374
00375 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *beginMessage,
00376 const char *endMessage, const char *abortMessage = NULL
00377 ) {
00378 this->log = log.get();
00379 if (log != NULL) {
00380 type = GRANULAR;
00381 data.granular.endMessage = endMessage;
00382 data.granular.abortMessage = abortMessage;
00383 ok = abortMessage == NULL;
00384 log->message(beginMessage);
00385 }
00386 }
00387
00388 ~AnalyticsScopeLog() {
00389 if (log == NULL) {
00390 return;
00391 }
00392 if (type == NAME) {
00393 if (!log->isNull()) {
00394 string message;
00395 struct rusage usage;
00396
00397 message.reserve(150);
00398 if (ok) {
00399 message.append("END: ");
00400 } else {
00401 message.append("FAIL: ");
00402 }
00403 message.append(data.name);
00404 message.append(" (");
00405 message.append(usecToString(SystemTime::getUsec()));
00406 message.append(",");
00407 if (getrusage(RUSAGE_SELF, &usage) == -1) {
00408 int e = errno;
00409 throw SystemException("getrusage() failed", e);
00410 }
00411 message.append(timevalToString(usage.ru_utime));
00412 message.append(",");
00413 message.append(timevalToString(usage.ru_stime));
00414 message.append(")");
00415 log->message(message);
00416 }
00417 } else {
00418 if (ok) {
00419 log->message(data.granular.endMessage);
00420 } else {
00421 log->message(data.granular.abortMessage);
00422 }
00423 }
00424 }
00425
00426 void success() {
00427 ok = true;
00428 }
00429 };
00430
00431 class AnalyticsLogger {
00432 private:
00433
00434
00435
00436
00437 struct SharedDataLock {
00438 AnalyticsLoggerSharedDataPtr sharedData;
00439 bool locked;
00440
00441 SharedDataLock(const AnalyticsLoggerSharedDataPtr &d)
00442 : sharedData(d)
00443 {
00444 d->lock.lock();
00445 locked = true;
00446 }
00447
00448 ~SharedDataLock() {
00449 if (locked) {
00450 sharedData->lock.unlock();
00451 }
00452 }
00453
00454 void reset(const AnalyticsLoggerSharedDataPtr &d, bool lockNow = true) {
00455 if (locked) {
00456 sharedData->lock.unlock();
00457 }
00458 sharedData = d;
00459 if (lockNow) {
00460 sharedData->lock.lock();
00461 locked = true;
00462 } else {
00463 locked = false;
00464 }
00465 }
00466
00467 void lock() {
00468 assert(!locked);
00469 sharedData->lock.lock();
00470 locked = true;
00471 }
00472 };
00473
00474 static const int RETRY_SLEEP = 200000;
00475
00476 string serverAddress;
00477 string username;
00478 string password;
00479 string nodeName;
00480 RandomGenerator randomGenerator;
00481
00482
00483 mutable boost::mutex lock;
00484
00485 unsigned int maxConnectTries;
00486 unsigned long long reconnectTimeout;
00487 unsigned long long nextReconnectTime;
00488
00489 AnalyticsLoggerSharedDataPtr sharedData;
00490
00491 bool connected() const {
00492 return sharedData->client.connected();
00493 }
00494
00495 void connect() {
00496 TRACE_POINT();
00497 vector<string> args;
00498
00499 sharedData->client.connect(serverAddress, username, password);
00500 sharedData->client.write("init", nodeName.c_str(), NULL);
00501 if (!sharedData->client.read(args)) {
00502 throw SystemException("Cannot connect to logging server", ECONNREFUSED);
00503 } else if (args.size() != 1) {
00504 throw IOException("Logging server returned an invalid reply for the 'init' command");
00505 } else if (args[0] == "server shutting down") {
00506 throw SystemException("Cannot connect to server", ECONNREFUSED);
00507 } else if (args[0] != "ok") {
00508 throw IOException("Logging server returned an invalid reply for the 'init' command");
00509 }
00510
00511
00512
00513 sharedData->client.setAutoDisconnect(false);
00514 }
00515
00516 void disconnect(bool checkErrorResponse = false) {
00517 sharedData->disconnect(checkErrorResponse);
00518
00519
00520
00521 sharedData.reset(new AnalyticsLoggerSharedData());
00522 }
00523
00524 bool isNetworkError(int code) const {
00525 return code == EPIPE || code == ECONNREFUSED || code == ECONNRESET
00526 || code == EHOSTUNREACH || code == ENETDOWN || code == ENETUNREACH
00527 || code == ETIMEDOUT;
00528 }
00529
00530 public:
00531 AnalyticsLogger() { }
00532
00533 AnalyticsLogger(const string &serverAddress, const string &username,
00534 const string &password, const string &nodeName = "")
00535 {
00536 this->serverAddress = serverAddress;
00537 this->username = username;
00538 this->password = password;
00539 if (nodeName.empty()) {
00540 this->nodeName = getHostName();
00541 } else {
00542 this->nodeName = nodeName;
00543 }
00544 if (!serverAddress.empty()) {
00545 sharedData.reset(new AnalyticsLoggerSharedData());
00546 }
00547 if (isLocalSocketAddress(serverAddress)) {
00548 maxConnectTries = 10;
00549 } else {
00550 maxConnectTries = 1;
00551 }
00552 maxConnectTries = 10;
00553 reconnectTimeout = 1000000;
00554 nextReconnectTime = 0;
00555 }
00556
00557 AnalyticsLogPtr newTransaction(const string &groupName, const string &category = "requests",
00558 const string &unionStationKey = string(),
00559 const string &filters = string())
00560 {
00561 if (serverAddress.empty()) {
00562 return ptr(new AnalyticsLog());
00563 }
00564
00565 unsigned long long timestamp = SystemTime::getUsec();
00566 char txnId[
00567 2 * sizeof(unsigned int) +
00568 11 +
00569 1
00570 ];
00571 char *end;
00572 unsigned int timestampSize;
00573 char timestampStr[2 * sizeof(unsigned long long) + 1];
00574
00575
00576
00577
00578
00579 timestampSize = integerToHexatri<unsigned int>(timestamp / 1000000 / 60,
00580 txnId);
00581 end = txnId + timestampSize;
00582
00583
00584 *end = '-';
00585 end++;
00586
00587
00588 randomGenerator.generateAsciiString(end, 11);
00589 end += 11;
00590 *end = '\0';
00591
00592 integerToHexatri<unsigned long long>(timestamp, timestampStr);
00593
00594 unique_lock<boost::mutex> l(lock);
00595 SharedDataLock sl(sharedData);
00596
00597 if (SystemTime::getUsec() >= nextReconnectTime) {
00598 unsigned int tryCount = 0;
00599
00600 while (tryCount < maxConnectTries) {
00601 try {
00602 if (!connected()) {
00603 TRACE_POINT();
00604 connect();
00605 }
00606 sharedData->client.write("openTransaction",
00607 txnId,
00608 groupName.c_str(),
00609 "",
00610 category.c_str(),
00611 timestampStr,
00612 unionStationKey.c_str(),
00613 "true",
00614 "true",
00615 filters.c_str(),
00616 NULL);
00617
00618 vector<string> args;
00619 sharedData->client.read(args);
00620 if (args.size() == 2 && args[0] == "error") {
00621 disconnect();
00622 throw IOException("The logging server responded with an error: " + args[1]);
00623 } else if (args.empty() || args[0] != "ok") {
00624 disconnect();
00625 throw IOException("The logging server sent an unexpected reply.");
00626 }
00627
00628 return ptr(new AnalyticsLog(sharedData,
00629 string(txnId, end - txnId),
00630 groupName, category,
00631 unionStationKey));
00632 } catch (const SystemException &e) {
00633 TRACE_POINT();
00634 if (e.code() == ENOENT || isNetworkError(e.code())) {
00635 tryCount++;
00636 disconnect(true);
00637 sl.reset(sharedData, false);
00638 l.unlock();
00639 if (tryCount < maxConnectTries) {
00640 syscalls::usleep(RETRY_SLEEP);
00641 }
00642 l.lock();
00643 sl.lock();
00644 } else {
00645 disconnect();
00646 throw;
00647 }
00648 }
00649
00650
00651 P_WARN("Cannot connect to the logging agent (" << serverAddress << "); " <<
00652 "retrying in " << reconnectTimeout / 1000000 << " second(s).");
00653 nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
00654 }
00655 }
00656 return ptr(new AnalyticsLog());
00657 }
00658
00659 AnalyticsLogPtr continueTransaction(const string &txnId, const string &groupName,
00660 const string &category = "requests", const string &unionStationKey = string())
00661 {
00662 if (serverAddress.empty() || txnId.empty()) {
00663 return ptr(new AnalyticsLog());
00664 }
00665
00666 char timestampStr[2 * sizeof(unsigned long long) + 1];
00667 integerToHexatri<unsigned long long>(SystemTime::getUsec(), timestampStr);
00668
00669 unique_lock<boost::mutex> l(lock);
00670 SharedDataLock sl(sharedData);
00671
00672 if (SystemTime::getUsec() >= nextReconnectTime) {
00673 unsigned int tryCount = 0;
00674
00675 while (tryCount < maxConnectTries) {
00676 try {
00677 if (!connected()) {
00678 TRACE_POINT();
00679 connect();
00680 }
00681 sharedData->client.write("openTransaction",
00682 txnId.c_str(),
00683 groupName.c_str(),
00684 "",
00685 category.c_str(),
00686 timestampStr,
00687 unionStationKey.c_str(),
00688 "true",
00689 NULL);
00690 return ptr(new AnalyticsLog(sharedData,
00691 txnId, groupName, category,
00692 unionStationKey));
00693 } catch (const SystemException &e) {
00694 TRACE_POINT();
00695 if (e.code() == EPIPE || isNetworkError(e.code())) {
00696 tryCount++;
00697 disconnect(true);
00698 sl.reset(sharedData, false);
00699 l.unlock();
00700 if (tryCount < maxConnectTries) {
00701 syscalls::usleep(RETRY_SLEEP);
00702 }
00703 l.lock();
00704 sl.lock();
00705 } else {
00706 disconnect();
00707 throw;
00708 }
00709 }
00710 }
00711
00712
00713 P_WARN("Cannot connect to the logging agent (" << serverAddress << "); " <<
00714 "retrying in " << reconnectTimeout / 1000000 << " second(s).");
00715 nextReconnectTime = SystemTime::getUsec() + reconnectTimeout;
00716 }
00717 return ptr(new AnalyticsLog());
00718 }
00719
00720 void setMaxConnectTries(unsigned int value) {
00721 lock_guard<boost::mutex> l(lock);
00722 maxConnectTries = value;
00723 }
00724
00725 void setReconnectTimeout(unsigned long long usec) {
00726 lock_guard<boost::mutex> l(lock);
00727 reconnectTimeout = usec;
00728 }
00729
00730 bool isNull() const {
00731 return serverAddress.empty();
00732 }
00733
00734 string getAddress() const {
00735 return serverAddress;
00736 }
00737
00738 string getUsername() const {
00739 return username;
00740 }
00741
00742 string getPassword() const {
00743 return password;
00744 }
00745
00746 FileDescriptor getConnection() const {
00747 lock_guard<boost::mutex> l(lock);
00748 lock_guard<boost::mutex> l2(sharedData->lock);
00749 return sharedData->client.getConnection();
00750 }
00751
00752
00753
00754
00755 string getNodeName() const {
00756 return nodeName;
00757 }
00758 };
00759
00760 typedef shared_ptr<AnalyticsLogger> AnalyticsLoggerPtr;
00761
00762 }
00763
00764 #endif
00765