diff options
author | Patrick Ohly <patrick.ohly@intel.com> | 2013-02-06 10:59:30 +0100 |
---|---|---|
committer | Patrick Ohly <patrick.ohly@intel.com> | 2013-02-26 12:03:42 +0100 |
commit | fecfd3f69d7c014386a8c96b9f9da9aa29f62626 (patch) | |
tree | e4abef27538b3ffd608b9576b660d2006042b5b0 | |
parent | 501c32c06d6e8cee1eb06bb5f5ffd55ce8c1e85b (diff) |
PIM Manager: make some of the D-Bus methods thread-safe
This adds the infrastructure for shifting the work of the D-Bus
methods into the main thread if called by another thread, which may
happen when we add other bindings for them.
Work is shifted to the main thread via a GAsyncQueue +
g_main_context_wakeup(). The calling thread then blocks waiting on a
condition variable until signaled by the main thread. Results are
stored for the calling thread as part of the operation that it
provides. Exceptions in the main thread are caught+serialized as
string and deserialized+rethrown in the calling thread - a bit crude,
but should work and reuses existing code.
-rw-r--r-- | src/dbus/server/pim/manager.cpp | 117 | ||||
-rw-r--r-- | src/dbus/server/pim/manager.h | 27 | ||||
-rw-r--r-- | src/syncevo/GLibSupport.h | 1 |
3 files changed, 144 insertions, 1 deletions
diff --git a/src/dbus/server/pim/manager.cpp b/src/dbus/server/pim/manager.cpp index b84a384f..eb940e8b 100644 --- a/src/dbus/server/pim/manager.cpp +++ b/src/dbus/server/pim/manager.cpp @@ -76,6 +76,8 @@ Manager::Manager(const boost::shared_ptr<Server> &server) : DBusObjectHelper(server->getConnection(), MANAGER_PATH, MANAGER_IFACE), + m_mainThread(g_thread_self()), + m_taskQueue(GAsyncQueueStealCXX(g_async_queue_new())), m_server(server), m_locale(LocaleFactory::createFactory()) { @@ -93,6 +95,8 @@ Manager::~Manager() void Manager::init() { + m_taskQueueFlush.activate(-1, boost::bind(&Manager::checkTaskQueueOnIdle, this)); + // Restore sort order and active databases. m_configNode.reset(new IniFileConfigNode(SubstEnvironment("${XDG_CONFIG_HOME}/syncevolution"), "pim-manager.ini", @@ -140,6 +144,89 @@ void Manager::init() boost::function<void (bool)>()); } +struct TaskForMain +{ + GMutex m_mutex; + GCond m_cond; + bool m_done; + boost::function<void ()> m_operation; + boost::function<void ()> m_rethrow; +}; + +template <class R> void AssignResult(const boost::function<R ()> &operation, + R &res) +{ + res = operation(); +} + +template <class R> R Manager::runInMainRes(const boost::function<R ()> &operation) +{ + // Prepare task. + R res; + TaskForMain task; + g_mutex_init(&task.m_mutex); + g_cond_init(&task.m_cond); + task.m_done = false; + task.m_operation = boost::bind(&AssignResult<R>, boost::cref(operation), boost::ref(res)); + + // Run in main. + g_async_queue_push(m_taskQueue, &task); + g_main_context_wakeup(NULL); + g_mutex_lock(&task.m_mutex); + while (!task.m_done) { + g_cond_wait(&task.m_cond, &task.m_mutex); + } + g_mutex_unlock(&task.m_mutex); + + // Rethrow exceptions (optional) and return result. + g_cond_clear(&task.m_cond); + g_mutex_clear(&task.m_mutex); + if (task.m_rethrow) { + task.m_rethrow(); + } + return res; +} + +static int Return1(const boost::function<void ()> &operation) +{ + operation(); + return 1; +} + +void Manager::runInMainVoid(const boost::function<void ()> &operation) +{ + runInMainRes<int>(boost::bind(Return1, boost::cref(operation))); +} + +bool Manager::checkTaskQueueOnIdle() +{ + TaskForMain *task; + + while ((task = static_cast<TaskForMain *>(g_async_queue_try_pop(m_taskQueue))) != NULL) { + g_mutex_lock(&task->m_mutex); + + // Exceptions must be reported back to the original thread. + // This is done by serializing them as string, then using the + // existing Exception::tryRethrow() to turn that string back + // into an instance of the right class. + try { + task->m_operation(); + } catch (...) { + std::string explanation; + Exception::handle(explanation); + task->m_rethrow = boost::bind(Exception::tryRethrow, explanation, true); + } + + // Wake up task. + task->m_done = true; + g_cond_signal(&task->m_cond); + g_mutex_unlock(&task->m_mutex); + } + + // Keep checking. + return true; +} + void Manager::initFolks() { m_folks = IndividualAggregator::create(m_locale); @@ -177,6 +264,11 @@ boost::shared_ptr<GDBusCXX::DBusObjectHelper> CreateContactManager(const boost:: void Manager::start() { + if (!isMain()) { + runInMainV(&Manager::start); + return; + } + if (!m_preventingAutoTerm) { // Prevent automatic shut down during idle times, because we need // to keep our unified address book available. @@ -188,6 +280,11 @@ void Manager::start() void Manager::stop() { + if (!isMain()) { + runInMainV(&Manager::stop); + return; + } + // If there are no active searches, then recreate aggregator. // Instead of tracking open views, use the knowledge that an // idle server has only two references to the main view: @@ -208,11 +305,20 @@ void Manager::stop() bool Manager::isRunning() { + if (!isMain()) { + return runInMainR(&Manager::isRunning); + } + return m_folks->isRunning(); } void Manager::setSortOrder(const std::string &order) { + if (!isMain()) { + runInMainV(&Manager::setSortOrder, order); + return; + } + if (order == getSortOrder()) { // Nothing to do. return; @@ -226,6 +332,15 @@ void Manager::setSortOrder(const std::string &order) m_sortOrder = order; } +std::string Manager::getSortOrder() +{ + if (!isMain()) { + return runInMainR(&Manager::getSortOrder); + } + + return m_sortOrder; +} + /** * Connects a normal IndividualView to a D-Bus client. * Provides the org.01.pim.contacts.ViewControl API. @@ -651,6 +766,8 @@ void Manager::search(const boost::shared_ptr< GDBusCXX::Result1<GDBusCXX::DBusOb const LocaleFactory::Filter_t &filter, const GDBusCXX::DBusObject_t &agentPath) { + // TODO: figure out a native, thread-safe API for this. + // Start folks in parallel with asking for an ESourceRegistry. start(); diff --git a/src/dbus/server/pim/manager.h b/src/dbus/server/pim/manager.h index 121cad05..d7176358 100644 --- a/src/dbus/server/pim/manager.h +++ b/src/dbus/server/pim/manager.h @@ -38,6 +38,9 @@ SE_BEGIN_CXX */ class Manager : public GDBusCXX::DBusObjectHelper { + GThread *m_mainThread; + GAsyncQueueCXX m_taskQueue; + Timeout m_taskQueueFlush; boost::weak_ptr<Manager> m_self; boost::shared_ptr<Server> m_server; boost::shared_ptr<IndividualAggregator> m_folks; @@ -73,7 +76,7 @@ class Manager : public GDBusCXX::DBusObjectHelper /** Manager.SetSortOrder() */ void setSortOrder(const std::string &order); /** Manager.GetSortOrder() */ - std::string getSortOrder() { return m_sortOrder; } + std::string getSortOrder(); /** Manager.Search() */ void search(const boost::shared_ptr< GDBusCXX::Result1<GDBusCXX::DBusObject_t> > &result, const GDBusCXX::Caller_t &ID, @@ -173,6 +176,28 @@ class Manager : public GDBusCXX::DBusObjectHelper const boost::shared_ptr<GDBusCXX::Result> &result, const boost::function<void (const boost::shared_ptr<Session> &session)> &callback); + /** true if the current thread is the one handling the event loop and running all operations */ + bool isMain() { return g_thread_self() == m_mainThread; } + + /** + * Runs the operation inside the main thread and returns once the + * main thread is done with it. + */ + void runInMainVoid(const boost::function<void ()> &operation); + template <class R> R runInMainRes(const boost::function<R ()> &operation); + + void runInMainV(void (Manager::*method)()) { runInMainVoid(boost::bind(method, this)); } + template <class R> R runInMainR(R (Manager::*method)()) { return runInMainRes<R>(boost::bind(method, this)); } + template <class A1, class B1> void runInMainV(void (Manager::*method)(B1), A1 a1) { runInMainVoid(boost::bind(method, this, a1)); } + template <class R, class A1, class B1> R runInMainR(R (Manager::*method)(B1), A1 a1) { return runInMainRes<R>(boost::bind(method, this, a1)); } + + /** + * An idle callback which checks the task queue. runInMainV() + * wakes up the context to ensure that the idle callback is + * invoked. + */ + bool checkTaskQueueOnIdle(); + public: /** * Creates an instance of the Manager which runs as part diff --git a/src/syncevo/GLibSupport.h b/src/syncevo/GLibSupport.h index 8991aa33..f6224192 100644 --- a/src/syncevo/GLibSupport.h +++ b/src/syncevo/GLibSupport.h @@ -301,6 +301,7 @@ SE_END_CXX SE_GOBJECT_TYPE(GFile) SE_GOBJECT_TYPE(GFileMonitor) SE_GLIB_TYPE(GMainLoop, g_main_loop) +SE_GLIB_TYPE(GAsyncQueue, g_async_queue) SE_BEGIN_CXX |