summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Ohly <patrick.ohly@intel.com>2013-02-06 10:59:30 +0100
committerPatrick Ohly <patrick.ohly@intel.com>2013-02-26 12:03:42 +0100
commitfecfd3f69d7c014386a8c96b9f9da9aa29f62626 (patch)
treee4abef27538b3ffd608b9576b660d2006042b5b0
parent501c32c06d6e8cee1eb06bb5f5ffd55ce8c1e85b (diff)
PIM Manager: make some of the D-Bus methods thread-safe
This adds the infrastructure for shifting the work of the D-Bus methods into the main thread if called by another thread, which may happen when we add other bindings for them. Work is shifted to the main thread via a GAsyncQueue + g_main_context_wakeup(). The calling thread then blocks waiting on a condition variable until signaled by the main thread. Results are stored for the calling thread as part of the operation that it provides. Exceptions in the main thread are caught+serialized as string and deserialized+rethrown in the calling thread - a bit crude, but should work and reuses existing code.
-rw-r--r--src/dbus/server/pim/manager.cpp117
-rw-r--r--src/dbus/server/pim/manager.h27
-rw-r--r--src/syncevo/GLibSupport.h1
3 files changed, 144 insertions, 1 deletions
diff --git a/src/dbus/server/pim/manager.cpp b/src/dbus/server/pim/manager.cpp
index b84a384f..eb940e8b 100644
--- a/src/dbus/server/pim/manager.cpp
+++ b/src/dbus/server/pim/manager.cpp
@@ -76,6 +76,8 @@ Manager::Manager(const boost::shared_ptr<Server> &server) :
DBusObjectHelper(server->getConnection(),
MANAGER_PATH,
MANAGER_IFACE),
+ m_mainThread(g_thread_self()),
+ m_taskQueue(GAsyncQueueStealCXX(g_async_queue_new())),
m_server(server),
m_locale(LocaleFactory::createFactory())
{
@@ -93,6 +95,8 @@ Manager::~Manager()
void Manager::init()
{
+ m_taskQueueFlush.activate(-1, boost::bind(&Manager::checkTaskQueueOnIdle, this));
+
// Restore sort order and active databases.
m_configNode.reset(new IniFileConfigNode(SubstEnvironment("${XDG_CONFIG_HOME}/syncevolution"),
"pim-manager.ini",
@@ -140,6 +144,89 @@ void Manager::init()
boost::function<void (bool)>());
}
+struct TaskForMain
+{
+ GMutex m_mutex;
+ GCond m_cond;
+ bool m_done;
+ boost::function<void ()> m_operation;
+ boost::function<void ()> m_rethrow;
+};
+
+template <class R> void AssignResult(const boost::function<R ()> &operation,
+ R &res)
+{
+ res = operation();
+}
+
+template <class R> R Manager::runInMainRes(const boost::function<R ()> &operation)
+{
+ // Prepare task.
+ R res;
+ TaskForMain task;
+ g_mutex_init(&task.m_mutex);
+ g_cond_init(&task.m_cond);
+ task.m_done = false;
+ task.m_operation = boost::bind(&AssignResult<R>, boost::cref(operation), boost::ref(res));
+
+ // Run in main.
+ g_async_queue_push(m_taskQueue, &task);
+ g_main_context_wakeup(NULL);
+ g_mutex_lock(&task.m_mutex);
+ while (!task.m_done) {
+ g_cond_wait(&task.m_cond, &task.m_mutex);
+ }
+ g_mutex_unlock(&task.m_mutex);
+
+ // Rethrow exceptions (optional) and return result.
+ g_cond_clear(&task.m_cond);
+ g_mutex_clear(&task.m_mutex);
+ if (task.m_rethrow) {
+ task.m_rethrow();
+ }
+ return res;
+}
+
+static int Return1(const boost::function<void ()> &operation)
+{
+ operation();
+ return 1;
+}
+
+void Manager::runInMainVoid(const boost::function<void ()> &operation)
+{
+ runInMainRes<int>(boost::bind(Return1, boost::cref(operation)));
+}
+
+bool Manager::checkTaskQueueOnIdle()
+{
+ TaskForMain *task;
+
+ while ((task = static_cast<TaskForMain *>(g_async_queue_try_pop(m_taskQueue))) != NULL) {
+ g_mutex_lock(&task->m_mutex);
+
+ // Exceptions must be reported back to the original thread.
+ // This is done by serializing them as string, then using the
+ // existing Exception::tryRethrow() to turn that string back
+ // into an instance of the right class.
+ try {
+ task->m_operation();
+ } catch (...) {
+ std::string explanation;
+ Exception::handle(explanation);
+ task->m_rethrow = boost::bind(Exception::tryRethrow, explanation, true);
+ }
+
+ // Wake up task.
+ task->m_done = true;
+ g_cond_signal(&task->m_cond);
+ g_mutex_unlock(&task->m_mutex);
+ }
+
+ // Keep checking.
+ return true;
+}
+
void Manager::initFolks()
{
m_folks = IndividualAggregator::create(m_locale);
@@ -177,6 +264,11 @@ boost::shared_ptr<GDBusCXX::DBusObjectHelper> CreateContactManager(const boost::
void Manager::start()
{
+ if (!isMain()) {
+ runInMainV(&Manager::start);
+ return;
+ }
+
if (!m_preventingAutoTerm) {
// Prevent automatic shut down during idle times, because we need
// to keep our unified address book available.
@@ -188,6 +280,11 @@ void Manager::start()
void Manager::stop()
{
+ if (!isMain()) {
+ runInMainV(&Manager::stop);
+ return;
+ }
+
// If there are no active searches, then recreate aggregator.
// Instead of tracking open views, use the knowledge that an
// idle server has only two references to the main view:
@@ -208,11 +305,20 @@ void Manager::stop()
bool Manager::isRunning()
{
+ if (!isMain()) {
+ return runInMainR(&Manager::isRunning);
+ }
+
return m_folks->isRunning();
}
void Manager::setSortOrder(const std::string &order)
{
+ if (!isMain()) {
+ runInMainV(&Manager::setSortOrder, order);
+ return;
+ }
+
if (order == getSortOrder()) {
// Nothing to do.
return;
@@ -226,6 +332,15 @@ void Manager::setSortOrder(const std::string &order)
m_sortOrder = order;
}
+std::string Manager::getSortOrder()
+{
+ if (!isMain()) {
+ return runInMainR(&Manager::getSortOrder);
+ }
+
+ return m_sortOrder;
+}
+
/**
* Connects a normal IndividualView to a D-Bus client.
* Provides the org.01.pim.contacts.ViewControl API.
@@ -651,6 +766,8 @@ void Manager::search(const boost::shared_ptr< GDBusCXX::Result1<GDBusCXX::DBusOb
const LocaleFactory::Filter_t &filter,
const GDBusCXX::DBusObject_t &agentPath)
{
+ // TODO: figure out a native, thread-safe API for this.
+
// Start folks in parallel with asking for an ESourceRegistry.
start();
diff --git a/src/dbus/server/pim/manager.h b/src/dbus/server/pim/manager.h
index 121cad05..d7176358 100644
--- a/src/dbus/server/pim/manager.h
+++ b/src/dbus/server/pim/manager.h
@@ -38,6 +38,9 @@ SE_BEGIN_CXX
*/
class Manager : public GDBusCXX::DBusObjectHelper
{
+ GThread *m_mainThread;
+ GAsyncQueueCXX m_taskQueue;
+ Timeout m_taskQueueFlush;
boost::weak_ptr<Manager> m_self;
boost::shared_ptr<Server> m_server;
boost::shared_ptr<IndividualAggregator> m_folks;
@@ -73,7 +76,7 @@ class Manager : public GDBusCXX::DBusObjectHelper
/** Manager.SetSortOrder() */
void setSortOrder(const std::string &order);
/** Manager.GetSortOrder() */
- std::string getSortOrder() { return m_sortOrder; }
+ std::string getSortOrder();
/** Manager.Search() */
void search(const boost::shared_ptr< GDBusCXX::Result1<GDBusCXX::DBusObject_t> > &result,
const GDBusCXX::Caller_t &ID,
@@ -173,6 +176,28 @@ class Manager : public GDBusCXX::DBusObjectHelper
const boost::shared_ptr<GDBusCXX::Result> &result,
const boost::function<void (const boost::shared_ptr<Session> &session)> &callback);
+ /** true if the current thread is the one handling the event loop and running all operations */
+ bool isMain() { return g_thread_self() == m_mainThread; }
+
+ /**
+ * Runs the operation inside the main thread and returns once the
+ * main thread is done with it.
+ */
+ void runInMainVoid(const boost::function<void ()> &operation);
+ template <class R> R runInMainRes(const boost::function<R ()> &operation);
+
+ void runInMainV(void (Manager::*method)()) { runInMainVoid(boost::bind(method, this)); }
+ template <class R> R runInMainR(R (Manager::*method)()) { return runInMainRes<R>(boost::bind(method, this)); }
+ template <class A1, class B1> void runInMainV(void (Manager::*method)(B1), A1 a1) { runInMainVoid(boost::bind(method, this, a1)); }
+ template <class R, class A1, class B1> R runInMainR(R (Manager::*method)(B1), A1 a1) { return runInMainRes<R>(boost::bind(method, this, a1)); }
+
+ /**
+ * An idle callback which checks the task queue. runInMainV()
+ * wakes up the context to ensure that the idle callback is
+ * invoked.
+ */
+ bool checkTaskQueueOnIdle();
+
public:
/**
* Creates an instance of the Manager which runs as part
diff --git a/src/syncevo/GLibSupport.h b/src/syncevo/GLibSupport.h
index 8991aa33..f6224192 100644
--- a/src/syncevo/GLibSupport.h
+++ b/src/syncevo/GLibSupport.h
@@ -301,6 +301,7 @@ SE_END_CXX
SE_GOBJECT_TYPE(GFile)
SE_GOBJECT_TYPE(GFileMonitor)
SE_GLIB_TYPE(GMainLoop, g_main_loop)
+SE_GLIB_TYPE(GAsyncQueue, g_async_queue)
SE_BEGIN_CXX