@ -5,7 +5,7 @@
| This file is part of the Roundcube Webmail client |
| This file is part of the Roundcube Webmail client |
| |
| |
| Copyright (C) 2013, The Roundcube Dev Team |
| Copyright (C) 2013, The Roundcube Dev Team |
| Copyright (C) 2013 , Kolab Systems AG |
| Copyright (C) 2014 , Kolab Systems AG |
| |
| |
| Licensed under the GNU General Public License version 3 or |
| Licensed under the GNU General Public License version 3 or |
| any later version with exceptions for skins & plugins. |
| any later version with exceptions for skins & plugins. |
@ -18,15 +18,8 @@
+-----------------------------------------------------------------------+
+-----------------------------------------------------------------------+
*/
*/
// create classes defined by the pthreads module if that isn't installed
if (!defined('PTHREADS_INHERIT_ALL')) {
class Worker { }
class Stackable { }
}
/**
/**
* Class to control search jobs on multiple IMAP folders.
* Class to control search jobs on multiple IMAP folders.
* This implement a simple threads pool using the pthreads extension.
*
*
* @package Framework
* @package Framework
* @subpackage Storage
* @subpackage Storage
@ -36,12 +29,10 @@ class rcube_imap_search
{
{
public $options = array();
public $options = array();
private $size = 10;
protected $jobs = array();
private $next = 0;
protected $timelimit = 0;
private $workers = array();
protected $results;
private $states = array();
protected $conn;
private $jobs = array();
private $conn;
/**
/**
* Default constructor
* Default constructor
@ -63,28 +54,32 @@ class rcube_imap_search
*/
*/
public function exec($folders, $str, $charset = null, $sort_field = null, $threading=null)
public function exec($folders, $str, $charset = null, $sort_field = null, $threading=null)
{
{
$pthreads = defined('PTHREADS_INHERIT_ALL');
$start = floor(microtime(true));
$results = new rcube_result_multifolder($folders);
$results = new rcube_result_multifolder($folders);
// start a search job for every folder to search in
// start a search job for every folder to search in
foreach ($folders as $folder) {
foreach ($folders as $folder) {
$job = new rcube_imap_search_job($folder, $str, $charset, $sort_field, $threading);
// a complete result for this folder already exists
if ($pthreads & & $this->submit($job)) {
$result = $this->results ? $this->results->get_set($folder) : false;
$this->jobs[] = $job;
if ($result & & !$result->incomplete) {
$results->add($result);
}
}
else {
else {
$job = new rcube_imap_search_job($folder, $str, $charset, $sort_field, $threading);
$job->worker = $this;
$job->worker = $this;
$job->run();
$this->jobs[] = $job;
$this->jobs[] = $job;
}
}
}
}
// wait for all workers to be done
// execute jobs and gather results
$this->shutdown();
// gather results
foreach ($this->jobs as $job) {
foreach ($this->jobs as $job) {
// only run search if within the configured time limit
// TODO: try to estimate the required time based on folder size and previous search performance
if (!$this->timelimit || floor(microtime(true)) - $start < $this->timelimit) {
$job->run();
}
// add result (may have ->incomplete flag set)
$results->add($job->get_result());
$results->add($job->get_result());
}
}
@ -92,51 +87,21 @@ class rcube_imap_search
}
}
/**
/**
* Assign the given job object to one of the worker threads for execution
* Setter for timelimt property
*/
*/
public function submit(Stackable $job )
public function set_timelimit($seconds )
{
{
if (count($this->workers) < $this->size) {
$this->timelimit = $seconds;
$id = count($this->workers);
$this->workers[$id] = new rcube_imap_search_worker($id, $this->options);
$this->workers[$id]->start(PTHREADS_INHERIT_ALL);
if ($this->workers[$id]->stack($job)) {
return $job;
}
else {
// trigger_error(sprintf("Failed to push Stackable onto %s", $id), E_USER_WARNING);
}
}
if (($worker = $this->workers[$this->next])) {
$this->next = ($this->next+1) % $this->size;
if ($worker->stack($job)) {
return $job;
}
else {
// trigger_error(sprintf("Failed to stack onto selected worker %s", $worker->id), E_USER_WARNING);
}
}
else {
// trigger_error(sprintf("Failed to select a worker for Stackable"), E_USER_WARNING);
}
return false;
}
}
/**
/**
* Shutdown the pool of threads cleanly, retaining exit status locally
* Setter for previous (potentially incomplete) search results
*/
*/
public function shutdown( )
public function set_results($res)
{
{
foreach ($this->workers as $worker) {
$this->results = $res;
$this->states[$worker->getThreadId()] = $worker->shutdown();
$worker->close();
}
# console('shutdown', $this->states);
}
}
/**
/**
* Get connection to the IMAP server
* Get connection to the IMAP server
* (used for single-thread mode)
* (used for single-thread mode)
@ -151,7 +116,7 @@ class rcube_imap_search
/**
/**
* Stackable item to run the search on a specific IMAP folder
* Stackable item to run the search on a specific IMAP folder
*/
*/
class rcube_imap_search_job extends Stackable
class rcube_imap_search_job /* extends Stackable */
{
{
private $folder;
private $folder;
private $search;
private $search;
@ -169,13 +134,14 @@ class rcube_imap_search_job extends Stackable
$this->charset = $charset;
$this->charset = $charset;
$this->sort_field = $sort_field;
$this->sort_field = $sort_field;
$this->threading = $threading;
$this->threading = $threading;
$this->result = new rcube_result_index($folder);
$this->result->incomplete = true;
}
}
public function run()
public function run()
{
{
// trigger_error("Start search $this->folder", E_USER_NOTICE);
$this->result = $this->search_index();
$this->result = $this->search_index();
// trigger_error("End search $this->folder: " . $this->result->count(), E_USER_NOTICE);
}
}
/**
/**
@ -183,7 +149,6 @@ class rcube_imap_search_job extends Stackable
*/
*/
protected function search_index()
protected function search_index()
{
{
$pthreads = defined('PTHREADS_INHERIT_ALL');
$criteria = $this->search;
$criteria = $this->search;
$charset = $this->charset;
$charset = $this->charset;
@ -193,10 +158,10 @@ class rcube_imap_search_job extends Stackable
trigger_error("No IMAP connection for $this->folder", E_USER_WARNING);
trigger_error("No IMAP connection for $this->folder", E_USER_WARNING);
if ($this->threading) {
if ($this->threading) {
return new rcube_result_thread();
return new rcube_result_thread($this->folder );
}
}
else {
else {
return new rcube_result_index();
return new rcube_result_index($this->folder );
}
}
}
}
@ -220,10 +185,6 @@ class rcube_imap_search_job extends Stackable
rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII');
rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII');
}
}
// close IMAP connection again
if ($pthreads)
$imap->closeConnection();
return $threads;
return $threads;
}
}
@ -249,10 +210,6 @@ class rcube_imap_search_job extends Stackable
}
}
}
}
// close IMAP connection again
if ($pthreads)
$imap->closeConnection();
return $messages;
return $messages;
}
}
@ -271,67 +228,7 @@ class rcube_imap_search_job extends Stackable
{
{
return $this->result;
return $this->result;
}
}
}
/**
* Worker thread to run search jobs while maintaining a common context
*/
class rcube_imap_search_worker extends Worker
{
public $id;
public $options;
private $conn;
private $counts = 0;
/**
* Default constructor
*/
public function __construct($id, $options)
{
$this->id = $id;
$this->options = $options;
}
/**
* Get a dedicated connection to the IMAP server
*/
public function get_imap()
{
// TODO: make this connection persistent for several jobs
// This doesn't seem to work. Socket connections don't survive serialization which is used in pthreads
$conn = new rcube_imap_generic();
# $conn->setDebug(true, function($conn, $message){ trigger_error($message, E_USER_NOTICE); });
if ($this->options['user'] & & $this->options['password']) {
$this->options['ident']['command'] = 'search-' . $this->id . 't' . ++$this->counts;
$conn->connect($this->options['host'], $this->options['user'], $this->options['password'], $this->options);
}
if ($conn->error)
trigger_error($conn->error, E_USER_WARNING);
return $conn;
}
/**
* @override
*/
public function run()
{
}
/**
* Close IMAP connection
*/
public function close()
{
if ($this->conn) {
$this->conn->close();
}
}
}
}