00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef _PASSENGER_SAFE_LIBEV_H_
00026 #define _PASSENGER_SAFE_LIBEV_H_
00027
00028 #include <ev++.h>
00029 #include <vector>
00030 #include <boost/thread.hpp>
00031 #include <boost/function.hpp>
00032 #include <boost/bind.hpp>
00033
00034 namespace Passenger {
00035
00036 using namespace std;
00037 using namespace boost;
00038
00039
00040
00041
00042
00043 class SafeLibev {
00044 private:
00045 typedef function<void ()> Callback;
00046
00047 struct ev_loop *loop;
00048 pthread_t loopThread;
00049 ev_async async;
00050
00051 boost::mutex syncher;
00052 condition_variable cond;
00053 vector<Callback> commands;
00054
00055 static void asyncHandler(EV_P_ ev_async *w, int revents) {
00056 SafeLibev *self = (SafeLibev *) w->data;
00057 unique_lock<boost::mutex> l(self->syncher);
00058 vector<Callback> commands = self->commands;
00059 self->commands.clear();
00060 l.unlock();
00061
00062 vector<Callback>::const_iterator it, end = commands.end();
00063 for (it = commands.begin(); it != commands.end(); it++) {
00064 (*it)();
00065 }
00066 }
00067
00068 template<typename Watcher>
00069 void startWatcherAndNotify(Watcher *watcher, bool *done) {
00070 watcher->set(loop);
00071 watcher->start();
00072 unique_lock<boost::mutex> l(syncher);
00073 *done = true;
00074 cond.notify_all();
00075 }
00076
00077 template<typename Watcher>
00078 void stopWatcherAndNotify(Watcher *watcher, bool *done) {
00079 watcher->stop();
00080 unique_lock<boost::mutex> l(syncher);
00081 *done = true;
00082 cond.notify_all();
00083 }
00084
00085 public:
00086 SafeLibev(struct ev_loop *loop) {
00087 this->loop = loop;
00088 loopThread = pthread_self();
00089 ev_async_init(&async, asyncHandler);
00090 async.data = this;
00091 ev_async_start(loop, &async);
00092 }
00093
00094 ~SafeLibev() {
00095 ev_async_stop(loop, &async);
00096 }
00097
00098 struct ev_loop *getLoop() const {
00099 return loop;
00100 }
00101
00102 template<typename Watcher>
00103 void start(Watcher &watcher) {
00104 if (pthread_equal(pthread_self(), loopThread)) {
00105 watcher.set(loop);
00106 watcher.start();
00107 } else {
00108 unique_lock<boost::mutex> l(syncher);
00109 bool done = false;
00110 commands.push_back(boost::bind(&SafeLibev::startWatcherAndNotify<Watcher>,
00111 this, &watcher, &done));
00112 ev_async_send(loop, &async);
00113 while (!done) {
00114 cond.wait(l);
00115 }
00116 }
00117 }
00118
00119 template<typename Watcher>
00120 void stop(Watcher &watcher) {
00121 if (pthread_equal(pthread_self(), loopThread)) {
00122 watcher.stop();
00123 } else {
00124 unique_lock<boost::mutex> l(syncher);
00125 bool done = false;
00126 commands.push_back(boost::bind(&SafeLibev::stopWatcherAndNotify<Watcher>,
00127 this, &watcher, &done));
00128 ev_async_send(loop, &async);
00129 while (!done) {
00130 cond.wait(l);
00131 }
00132 }
00133 }
00134
00135 void run(const Callback &callback) {
00136 if (pthread_equal(pthread_self(), loopThread)) {
00137 callback();
00138 } else {
00139 unique_lock<boost::mutex> l(syncher);
00140 commands.push_back(callback);
00141 ev_async_send(loop, &async);
00142 }
00143 }
00144 };
00145
00146
00147 }
00148
00149 #endif