00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef _PASSENGER_MESSAGE_READERS_WRITERS_H_
00026 #define _PASSENGER_MESSAGE_READERS_WRITERS_H_
00027
00028 #include <boost/cstdint.hpp>
00029 #include <oxt/macros.hpp>
00030 #include <algorithm>
00031 #include <vector>
00032 #include <string>
00033 #include <sys/types.h>
00034 #include <cstring>
00035 #include <arpa/inet.h>
00036 #include "StaticString.h"
00037 #include "Exceptions.h"
00038 #include "Utils/MemZeroGuard.h"
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113 namespace Passenger {
00114
00115 using namespace std;
00116
00117
00118
00119
00120 class Uint16Message {
00121 private:
00122 uint16_t val;
00123 uint8_t consumed;
00124
00125 public:
00126 Uint16Message() {
00127 consumed = 0;
00128 }
00129
00130 void reset() {
00131 consumed = 0;
00132 }
00133
00134 size_t feed(const char *data, size_t size) {
00135 size_t locallyConsumed;
00136
00137 locallyConsumed = std::min(size, sizeof(uint16_t) - consumed);
00138 memcpy((char *) &val + consumed, data, locallyConsumed);
00139 consumed += locallyConsumed;
00140 if (locallyConsumed > 0 && done()) {
00141 val = ntohs(val);
00142 }
00143 return locallyConsumed;
00144 }
00145
00146 bool done() const {
00147 return consumed == sizeof(uint16_t);
00148 }
00149
00150 uint16_t value() const {
00151 return val;
00152 }
00153
00154 static void generate(void *buf, uint16_t val) {
00155 val = htons(val);
00156 memcpy(buf, &val, sizeof(val));
00157 }
00158 };
00159
00160
00161
00162
00163 class Uint32Message {
00164 private:
00165 uint32_t val;
00166 uint8_t consumed;
00167
00168 public:
00169 Uint32Message() {
00170 consumed = 0;
00171 }
00172
00173 void reset() {
00174 consumed = 0;
00175 }
00176
00177 size_t feed(const char *data, size_t size) {
00178 size_t locallyConsumed;
00179
00180 locallyConsumed = std::min(size, sizeof(uint32_t) - consumed);
00181 memcpy((char *) &val + consumed, data, locallyConsumed);
00182 consumed += locallyConsumed;
00183 if (locallyConsumed > 0 && done()) {
00184 val = ntohl(val);
00185 }
00186 return locallyConsumed;
00187 }
00188
00189 bool done() const {
00190 return consumed == sizeof(uint32_t);
00191 }
00192
00193 uint32_t value() const {
00194 return val;
00195 }
00196
00197 static void generate(void *buf, uint32_t val) {
00198 val = htonl(val);
00199 memcpy(buf, &val, sizeof(val));
00200 }
00201 };
00202
00203
00204
00205
00206 class ArrayMessage {
00207 public:
00208 enum Error {
00209 TOO_LARGE
00210 };
00211
00212 private:
00213 enum State {
00214 READING_HEADER,
00215 READING_BODY,
00216 DONE,
00217 ERROR
00218 };
00219
00220 uint16_t toReserve;
00221 uint16_t maxSize;
00222 Uint16Message headerReader;
00223 uint8_t state;
00224 uint8_t error;
00225 string buffer;
00226 vector<StaticString> result;
00227
00228 void parseBody(const char *data, size_t size) {
00229 const char *start = data;
00230 const char *terminator;
00231 size_t rest = size;
00232
00233 while ((terminator = (const char *) memchr(start, '\0', rest)) != NULL) {
00234 size_t len = terminator - start;
00235 result.push_back(StaticString(start, len));
00236 start = terminator + 1;
00237 rest = size - (start - data);
00238 }
00239 }
00240
00241 public:
00242 ArrayMessage() {
00243 state = READING_HEADER;
00244 toReserve = 0;
00245 maxSize = 0;
00246 }
00247
00248 void reserve(uint16_t size) {
00249 toReserve = size;
00250 result.reserve(size);
00251 }
00252
00253 void setMaxSize(uint16_t size) {
00254 maxSize = size;
00255 }
00256
00257 void reset() {
00258 state = READING_HEADER;
00259 headerReader.reset();
00260 buffer.clear();
00261 result.clear();
00262 if (toReserve > 0) {
00263 result.reserve(toReserve);
00264 }
00265 }
00266
00267 size_t feed(const char *data, size_t size) {
00268 size_t consumed = 0;
00269
00270 while (consumed < size && !done()) {
00271 const char *current = data + consumed;
00272 size_t rest = size - consumed;
00273
00274 switch (state) {
00275 case READING_HEADER:
00276 consumed += headerReader.feed(current, rest);
00277 if (headerReader.done()) {
00278 if (maxSize > 0 && headerReader.value() > maxSize) {
00279 state = ERROR;
00280 error = TOO_LARGE;
00281 } else if (headerReader.value() == 0) {
00282 state = DONE;
00283 } else {
00284 state = READING_BODY;
00285 }
00286 }
00287 break;
00288 case READING_BODY:
00289 if (buffer.empty() && rest >= headerReader.value()) {
00290 parseBody(current, headerReader.value());
00291 state = DONE;
00292 consumed += headerReader.value();
00293 } else {
00294 size_t toConsume = std::min(rest,
00295 headerReader.value() - buffer.size());
00296 if (buffer.capacity() < headerReader.value()) {
00297 buffer.reserve(headerReader.value());
00298 }
00299 buffer.append(current, toConsume);
00300 consumed += toConsume;
00301 if (buffer.size() == headerReader.value()) {
00302 parseBody(buffer.data(), buffer.size());
00303 state = DONE;
00304 }
00305 }
00306 break;
00307 default:
00308
00309 abort();
00310 }
00311 }
00312 return consumed;
00313 }
00314
00315 bool done() const {
00316 return state == DONE || state == ERROR;
00317 }
00318
00319 bool hasError() const {
00320 return state == ERROR;
00321 }
00322
00323 Error errorCode() const {
00324 return (Error) error;
00325 }
00326
00327 const vector<StaticString> &value() const {
00328 return result;
00329 }
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348 static void generate(StaticString args[], unsigned int argsCount,
00349 char headerBuf[sizeof(uint16_t)], StaticString *out, unsigned int outCount)
00350 {
00351 if (OXT_UNLIKELY(outCount < outputSize(argsCount))) {
00352 throw ArgumentException("outCount too small.");
00353 }
00354
00355 unsigned int size = 0;
00356 unsigned int i;
00357
00358 for (i = 0; i < argsCount; i++) {
00359 size += args[i].size() + 1;
00360 }
00361 if (OXT_UNLIKELY(size > 0xFFFF)) {
00362 throw ArgumentException("Data size exceeds maximum size for array messages.");
00363 }
00364
00365 Uint16Message::generate(headerBuf, size);
00366 out[0] = StaticString(headerBuf, sizeof(uint16_t));
00367 for (i = 0; i < argsCount; i++) {
00368 out[1 + 2 * i] = args[i];
00369 out[1 + 2 * i + 1] = StaticString("\0", 1);
00370 }
00371 }
00372
00373 static unsigned int outputSize(unsigned int argsCount) {
00374 return argsCount * 2 + 1;
00375 }
00376 };
00377
00378
00379
00380
00381 class ScalarMessage {
00382 public:
00383 enum Error {
00384 TOO_LARGE
00385 };
00386
00387 private:
00388 enum State {
00389 READING_HEADER,
00390 READING_BODY,
00391 DONE,
00392 ERROR
00393 };
00394
00395 uint8_t state;
00396 uint8_t error;
00397 uint32_t maxSize;
00398 Uint32Message headerReader;
00399 string buffer;
00400 StaticString result;
00401
00402 public:
00403 ScalarMessage(uint32_t maxSize = 0) {
00404 state = READING_HEADER;
00405 this->maxSize = maxSize;
00406 }
00407
00408 void setMaxSize(uint32_t maxSize) {
00409 this->maxSize = maxSize;
00410 }
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420 void reset(bool zeroBuffer = false) {
00421 state = READING_HEADER;
00422 if (zeroBuffer) {
00423 MemZeroGuard guard(buffer);
00424 }
00425 headerReader.reset();
00426 buffer.clear();
00427 }
00428
00429 size_t feed(const char *data, size_t size) {
00430 size_t consumed = 0;
00431
00432 while (consumed < size && !done()) {
00433 const char *current = data + consumed;
00434 size_t rest = size - consumed;
00435
00436 switch (state) {
00437 case READING_HEADER:
00438 consumed += headerReader.feed(current, rest);
00439 if (headerReader.done()) {
00440 if (maxSize > 0 && headerReader.value() > maxSize) {
00441 state = ERROR;
00442 error = TOO_LARGE;
00443 } else if (headerReader.value() == 0) {
00444 state = DONE;
00445 } else {
00446 state = READING_BODY;
00447 }
00448 }
00449 break;
00450 case READING_BODY:
00451 if (buffer.empty() && rest >= headerReader.value()) {
00452 result = StaticString(current, headerReader.value());
00453 state = DONE;
00454 consumed += headerReader.value();
00455 } else {
00456 size_t toConsume = std::min(rest,
00457 headerReader.value() - buffer.size());
00458 if (buffer.capacity() < headerReader.value()) {
00459 buffer.reserve(headerReader.value());
00460 }
00461 buffer.append(current, toConsume);
00462 consumed += toConsume;
00463 if (buffer.size() == headerReader.value()) {
00464 result = StaticString(buffer);
00465 state = DONE;
00466 }
00467 }
00468 break;
00469 default:
00470
00471 abort();
00472 };
00473 }
00474 return consumed;
00475 }
00476
00477 bool done() const {
00478 return state == DONE || state == ERROR;
00479 }
00480
00481 bool hasError() const {
00482 return state == ERROR;
00483 }
00484
00485 Error errorCode() const {
00486 return (Error) error;
00487 }
00488
00489 const StaticString &value() const {
00490 return result;
00491 }
00492
00493 static void generate(const StaticString &data, char headerBuf[sizeof(uint32_t)],
00494 StaticString output[2])
00495 {
00496 if (OXT_UNLIKELY(data.size() > 0xFFFFFFFF)) {
00497 throw ArgumentException("Data size exceeds maximum size for scalar messages.");
00498 }
00499
00500 Uint32Message::generate(headerBuf, data.size());
00501 output[0] = StaticString(headerBuf, sizeof(uint32_t));
00502 output[1] = data;
00503 }
00504 };
00505
00506 }
00507
00508 #endif