<p> 大型的连锁店有一个大问题。每天,在每家商店会发生数千次交易。公司执行官希望对这些数据进行挖掘。哪些产品卖得好?哪些不好?有机产品在哪里卖得好?冰淇淋的销售情况怎么样?</p><p> 为了捕捉这些数据,组织必须将所有事务性数据装载进一个数据模型,以便更适合生成公司所需的报告类型。但是,这很花费时间,而且随着连锁规模的增长,处理一天的数据可能要花费一天以上的时间。因此,这是个大问题。</p><p> 现在,您的 Web 应用程序可能不需要处理这么多数据,但是任何站点的处理时间都有可能超过客户愿意等待的时间。一般来说,客户愿意等待的时间是 200 毫秒,如果超过这个时间,客户就会觉得过程 “缓慢”。这个数字基于桌面应用程序,而 Web 使我们更有耐心了。但无论如何,不应该让客户等待的时间超过几秒。所以,要采用一些策略来处理 PHP 中的批处理作业。</p><p> 分散的方式与 cron</p><p> 在 UNIX® 机器上,执行批处理的核心程序是 cron 守护进程。这个守护进程读取一个配置文件,这个文件会告诉它要运行哪些命令行以及运行的频率。然后,这个守护进程就按照配置执行它们。在遇到错误时,它甚至能够向指定的电子邮件地址发送错误输出,从而帮助对问题进行调试。</p><p> 我知道一些工程师强烈主张使用线程技术。“线程!线程才是进行后台处理的真正方法。cron 守护进程太过时了。”</p><p> 我不这么认为。</p><p> 这两种方法我都用过,我认为 cron 具备 “Keep It Simple, Stupid(KISS,简单就是美)” 原则的优点。它使后台处理保持简单。不需要编写一直运行的多线程的作业处理应用程序(因此不会有内存泄漏),而是由 cron 启动一个简单的批处理脚本。这个脚本判断是否有作业要处理,执行作业,然后退出。不需要担心内存泄漏。也不需要担心线程停止或陷入无限循环。</p>
<p> </p>
<p> 那么,cron 是如何工作的?这依赖于您所处的系统环境。我只讨论老式简单的 cron 的 UNIX 命令行版本,您可以向系统管理员咨询如何在自己的 Web 应用程序中实现它。</p><p> 下面是一个简单的 cron 配置,它在每天晚上 11 点运行一个 PHP 脚本:</p><code>0 23 * * * jack /usr/bin/php /users/home/jack/myscript.php</code></p><p> 前 5 个字段定义应该启动脚本的时间。然后是应该用来运行这个脚本的用户名。其余的命令是要执行的命令行。时间字段分别是分、小时、月中的日、月和周中的日。下面是几个示例。</p><p> 命令:</p><code>15 * * * * jack /usr/bin/php /users/home/jack/myscript.php</code></p><p> 在每个小时的第 15 分钟运行脚本。</p><p> 命令:</p><code>15,45 * * * * jack /usr/bin/php /users/home/jack/myscript.php</code></p><p> 在每个小时的第 15 和第 45 分钟运行脚本。</p><p> 命令:</p><code>*/1 3-23 * * * jack /usr/bin/php /users/home/jack/myscript.php</code></p><p> 在早上 3 点到晚上 11 点之间的每分钟运行脚本。</p><p> 命令</p><code>30 23 * * 6 jack /usr/bin/php /users/home/jack/myscript.php</code></p><p> 在每星期六的晚上 11:30 运行脚本(星期六由 6 指定)。</p><p> 可以看到,组合的数量是无限的。可以根据需要控制运行脚本的时间。还可以指定多个要运行的脚本,这样的话,一些脚本可以每分钟都运行,而其他脚本(比如备份脚本)可以每天只运行一次。</p><p> 为了指定将报告的错误发送到哪个电子邮件地址,可以使用 MAILTO 指令,如下所示:</p><code>MAILTO=jherr@pobox.com</code></p><p> 注意:对于 Microsoft® Windows® 用户,有一个等效的 Scheduled Tasks 系统可以用来定期启动命令行进程(比如 PHP 脚本)。</p>
<p> </p>
<p> 批处理体系结构的基础知识</p><p> 批处理是相当简单的。在大多数情况下,采用两个工作流之一。第一个工作流用于进行报告;脚本每天运行一次,它生成报告并将报告发送给一组用户。第二个工作流是在响应某种请求时创建的批作业。例如,我登录进 Web 应用程序中,并要求它向系统中注册的所有用户发送一个消息,将一个新的特性告诉他们。这个操作必须进行批处理,因为系统中有 10,000 个用户。PHP 要花费一段时间才能完成这样的任务,所以它必须由浏览器之外的一个作业来执行。</p><p> 在第二个工作流中,Web 应用程序只需将信息放在某个位置,让批处理应用程序共享它。这些信息指定作业的性质(例如,“Send this e-mail to all the people on the system”。)批处理程序运行这个作业,然后删除作业。另一种方法是,处理程序将作业标为已完成。无论用哪种方法,作业都应该识别为已完成,这样就不会再次运行它。</p><p> 本文的其余部分演示在 Web 应用程序前端和批处理后端之间共享数据的各种方法。</p><p> 邮件队列</p><p> 第一种方法是使用专用的邮件队列系统。在这种模型中,数据库中的一个表包含应该发送给各个用户的电子邮件消息。Web 界面使用 mailouts 类将电子邮件添加到队列中。电子邮件处理程序使用 mailouts 类检索未处理的电子邮件,然后再次使用它从队列中删除未处理的电子邮件。</p><p> 这个模型首先需要 MySQL 模式。</p><p> 清单 1. mailout.sql<code>DROP TABLE IF EXISTS mailouts;<br />CREATE TABLE mailouts (<br /> id MEDIUMINT NOT NULL AUTO_INCREMENT,<br /> from_address TEXT NOT NULL,<br /> to_address TEXT NOT NULL,<br /> subject TEXT NOT NULL,<br /> content TEXT NOT NULL,<br /> PRIMARY KEY ( id )<br />);</code></p>
<p> </p>
<p> 这个模式非常简单。每行中有一个 from 和一个 to 地址,以及电子邮件的主题和内容。</p><p> 对数据库中的 mailouts 表进行处理的是 PHP mailouts 类。</p><p> 清单 2. mailouts.php<code><?php<br />require_once('DB.php');<br />class Mailouts<br />{<br /> public static function get_db()<br /> {<br /> $dsn = 'mysql://root:@localhost/mailout';<br /> $db =& DB::Connect( $dsn, array() );<br /> if (PEAR::isError($db)) { die($db->getMessage()); }<br /> return $db;<br /> }<br /> public static function delete( $id )<br /> {<br /> $db = Mailouts::get_db();<br /> $sth = $db->prepare( 'DELETE FROM mailouts WHERE id=?' );<br /> $db->execute( $sth, $id );<br /> return true;<br /> }<br /> public static function add( $from, $to, $subject, $content )<br /> {<br /> $db = Mailouts::get_db();<br /> $sth = $db->prepare( 'INSERT INTO mailouts VALUES (null,?,?,?,?)' );<br /> $db->execute( $sth, array( $from, $to, $subject, $content ) );<br /> return true;<br /> }<br /> public static function get_all()<br /> {<br /> $db = Mailouts::get_db();<br /> $res = $db->query( "SELECT * FROM mailouts" );<br /> $rows = array();<br /> while( $res->fetchInto( $row ) ) { $rows []= $row; }<br /> return $rows;<br /> }<br />}<br />?></code></p><p> 这个脚本包含 Pear::DB 数据库访问类。然后定义 mailouts 类,其中包含三个主要的静态函数:add、delete 和 get_all。add() 方法向队列中添加一个电子邮件,这个方法由前端使用。get_all() 方法从表中返回所有数据。delete() 方法删除一个电子邮件。</p>
<p> </p>
<p> 您可能会问,我为什么不只在脚本末尾调用 delete_all() 方法。不这么做有两个原因:如果在发送每个消息之后删除它,那么即使脚本在出现问题之后重新运行,消息也不可能发送两次;在批作业的启动和完成之间可能会添加新的消息。</p><p> 下一步是编写一个简单的测试脚本,这个脚本将一个条目添加到队列中。</p><p> 清单 3. mailout_test_add.php<code><?php<br />require 'mailout.php';<br />Mailouts::add( 'donotreply@mydomain.com',<br /> 'molly@nocompany.com.org',<br /> 'Test Subject',<br /> 'This is a test of the batch mail sendout' );<br />?></code></p><p> 在这个示例中,我添加一个 mailout,这个消息要发送给某公司的 Molly,其中包括主题 “Test Subject” 和电子邮件主体。可以在命令行上运行这个脚本:php mailout_test_add.php。</p><p> 为了发送电子邮件,需要另一个脚本,这个脚本作为作业处理程序。</p><p> 清单 4. mailout_send.php<code><?php<br />require_once 'mailout.php';<br />function process( $from, $to, $subject, $email ) {<br /> mail( $to, $subject, $email, "From: $from" );<br />}<br />$messages = Mailouts::get_all();<br />foreach( $messages as $msg ) {<br /> process( $msg[1], $msg[2], $msg[3], $msg[4] );<br /> Mailouts::delete( $msg[0] );<br />}<br />?></code></p><p> 这个脚本使用 get_all() 方法检索所有电子邮件消息,然后使用 PHP 的 mail() 方法逐一发送消息。在每次成功发送电子邮件之后,调用 delete() 方法从队列中删除对应的记录。</p><p> 使用 cron 守护进程定期运行这个脚本。运行这个脚本的频率取决于您的应用程序的需要。</p>
<p> </p>
<p> 更通用的方法</p><p> 专门用来发送电子邮件的解决方案是很不错,但是是否有更通用的方法?我们需要能够发送电子邮件、生成报告或者执行其他耗费时间的处理,而不必在浏览器中等待处理完成。</p><p> 为此,可以利用一个事实:PHP 是一种解释型语言。可以将 PHP 代码存储在数据库中的队列中,以后再执行它。这需要两个表,见清单 5。</p><p> 清单 5. generic.sql<code>DROP TABLE IF EXISTS processing_items;<br />CREATE TABLE processing_items (<br /> id MEDIUMINT NOT NULL AUTO_INCREMENT,<br /> function TEXT NOT NULL,<br /> PRIMARY KEY ( id )<br />);<br />DROP TABLE IF EXISTS processing_args;<br />CREATE TABLE processing_args (<br /> id MEDIUMINT NOT NULL AUTO_INCREMENT,<br /> item_id MEDIUMINT NOT NULL,<br /> key_name TEXT NOT NULL,<br /> value TEXT NOT NULL,<br /> PRIMARY KEY ( id )<br />);</code></p><p> 第一个表 processing_items 包含作业处理程序调用的函数。第二个表 processing_args 包含要发送给函数的参数,采用的形式是由键/值对组成的 hash 表。</p><p> 与 mailouts 表一样,这两个表也由 PHP 类包装,这个类称为 ProcessingItems。</p><p> 清单 6. generic.php<code><?php<br />require_once('DB.php');<br />class ProcessingItems<br />{<br /> public static function get_db() { ... }<br /> public static function delete( $id )<br /> {<br /> $db = ProcessingItems::get_db();<br /> $sth = $db->prepare( 'DELETE FROM processing_args WHERE item_id=?' );<br /> $db->execute( $sth, $id );<br /> $sth = $db->prepare( 'DELETE FROM processing_items WHERE id=?' );<br /> $db->execute( $sth, $id );<br /> return true;<br /> }<br /> public static function add( $function, $args )<br /> {<br /> $db = ProcessingItems::get_db();<br /> $sth = $db->prepare( 'INSERT INTO processing_items VALUES (null,?)' );<br /> $db->execute( $sth, array( $function ) );<br /> $res = $db->query( "SELECT last_insert_id()" );<br /> $id = null;<br /> while( $res->fetchInto( $row ) ) { $id = $row[0]; }<br /> foreach( $args as $key => $value )<br /> {<br /> $sth = $db->prepare( 'INSERT INTO processing_args<br /> VALUES (null,?,?,?)' );<br /> $db->execute( $sth, array( $id, $key, $value ) );<br /> }<br /> return true;<br /> }<br /> public static function get_all()<br /> {<br /> $db = ProcessingItems::get_db();<br /> $res = $db->query( "SELECT * FROM processing_items" );<br /> $rows = array();<br /> while( $res->fetchInto( $row ) )<br /> {<br /> $item = array();<br /> $item['id'] = $row[0];<br /> $item['function'] = $row[1];<br /> $item['args'] = array();<br /> $ares = $db->query( "SELECT key_name, value FROM<br /> processing_args WHERE item_id=?", $item['id'] );<br /> while( $ares->fetchInto( $arow ) )<br /> $item['args'][ $arow[0] ] = $arow[1];<br /> $rows []= $item;<br /> }<br /> return $rows;<br /> }<br />}<br />?></code></p>
<p> </p>
<p> 这个类包含三个重要的方法:add()、get_all() 和 delete()。与 mailouts 系统一样,前端使用 add(),处理引擎使用 get_all() 和 delete()。</p><p> 清单 7 所示的测试脚本将一个条目添加到处理队列中。</p><p> 清单 7. generic_test_add.php<code><?php<br />require_once 'generic.php';<br />ProcessingItems::add( 'printvalue', array( 'value' => 'foo' ) );<br />?></code></p><p> 在这个示例中,添加了一个对 printvalue 函数的调用,并将 value 参数设置为 foo。我使用 PHP 命令行解释器运行这个脚本,并将这个方法调用放进队列中。然后使用以下处理脚本运行这个方法。</p><p> 清单 8. generic_process.php<code><?php<br />require_once 'generic.php';<br />function printvalue( $args ) {<br /> echo 'Printing: '.$args['value']."<br />";<br />}<br />foreach( ProcessingItems::get_all() as $item ) {<br /> call_user_func_array( $item['function'],<br /> array( $item['args'] ) );<br /> ProcessingItems::delete( $item['id'] );<br />}<br />?></code></p><p> 这个脚本非常简单。它获得 get_all() 返回的处理条目,然后使用 call_user_func_array(一个 PHP 内部函数)用给定的参数动态地调用这个方法。在这个示例中,调用本地的 printvalue 函数。</p><p> 为了演示这种功能,我们看看在命令行上发生了什么:</p><code>% php generic_test_add.php<br />% php generic_process.php<br />Printing: foo<br />%</code></p><p> 输出并不多,但是您能够看出要点。通过这种机制,可以将任何 PHP 函数的处理推迟。</p>
<p> </p>
<p> 现在,如果您不喜欢将 PHP 函数名和参数放进数据库中,那么另一种方法是在 PHP 代码中建立数据库中的 “处理作业类型” 名称和实际 PHP 处理函数之间的映射。按照这种方式,如果以后决定修改 PHP 后端,那么只要 “处理作业类型” 字符串匹配,系统就仍然可以工作。</p><p> 放弃数据库</p><p> 最后,我演示另一种稍有不同的解决方案,它使用一个目录中的文件来存储批作业,而不是使用数据库。在这里提供这个思路并不是建议您 “采用这种方式,而不使用数据库”,这只是一种可供选择的方式,是否采用它由您决定。</p><p> 显然,这个解决方案中没有模式,因为我们不使用数据库。所以先编写一个类,它包含与前面示例中相似的 add()、get_all() 和 delete() 方法。</p><p> 清单 9. batch_by_file.php<code><?php<br />define( 'BATCH_DIRECTORY', 'batch_items/' );<br />class BatchFiles<br />{<br /> public static function delete( $id )<br /> {<br /> unlink( $id );<br /> return true;<br /> }<br /> public static function add( $function, $args )<br /> {<br /> $path = '';<br /> while( true )<br /> {<br /> $path = BATCH_DIRECTORY.time();<br /> if ( file_exists( $path ) == false )<br /> break;<br /> }<br /> $fh = fopen( $path, "w" );<br /> fprintf( $fh, $function."<br />" );<br /> foreach( $args as $k => $v )<br /> {<br /> fprintf( $fh, $k.":".$v."<br />" );<br /> }<br /> fclose( $fh );<br /> return true;<br /> }<br /> public static function get_all()<br /> {<br /> $rows = array();<br /> if (is_dir(BATCH_DIRECTORY)) {<br /> if ($dh = opendir(BATCH_DIRECTORY)) {<br /> while (($file = readdir($dh)) !== false) {<br /> $path = BATCH_DIRECTORY.$file;<br /> if ( is_dir( $path ) == false )<br /> {<br /> $item = array();<br /> $item['id'] = $path;<br /> $fh = fopen( $path, 'r' );<br /> if ( $fh )<br /> {<br /> $item['function'] = trim(fgets( $fh ));<br /> $item['args'] = array();<br /> while( ( $line = fgets( $fh ) ) != null )<br /> {<br /> $args = split( ':', trim($line) );<br /> $item['args'][$args[0]] = $args[1];<br /> }<br /> $rows []= $item;<br /> fclose( $fh );<br /> }<br /> }<br /> }<br /> closedir($dh);<br /> }<br /> }<br /> return $rows;<br /> }<br />}<br />?></code></p></p><p> BatchFiles 类有三个主要方法:add()、get_all() 和 delete()。这个类不访问数据库,而是读写 batch_items 目录中的文件。</p><p> 使用以下测试代码添加新的批处理条目。</p><p> 清单 10. batch_by_file_test_add.php<code><?php<br />require_once 'batch_by_file.php';<br />BatchFiles::add( "printvalue", array( 'value' => 'foo' ) );<br />?></code></p><p> 有一点需要注意:除了类名(BatchFiles)之外,实际上没有任何迹象能够说明作业是如何存储的。所以,以后很容易将它改为数据库风格的存储方式,而不需要修改接口。</p><p> 最后是处理程序的代码。</p><p> 清单 11. batch_by_file_processor.php<code><?php<br />require_once 'batch_by_file.php';<br />function printvalue( $args ) {<br /> echo 'Printing: '.$args['value']."<br />";<br />}<br />foreach( BatchFiles::get_all() as $item ) {<br /> call_user_func_array( $item['function'], array( $item['args'] ) );<br /> BatchFiles::delete( $item['id'] );<br />}<br />?></code></p><p> 这段代码几乎与数据库版本完全相同,只是修改了文件名和类名。</p><p> 结束语</p><p> 正如前面提到的,服务器对线程提供了许多支持,可以进行后台批处理。在某些情况下,使用辅助线程处理小作业肯定比较容易。但是,也可以使用传统工具(cron、MySQL、标准的面向对象的 PHP 和 Pear::DB)在 PHP 应用程序中创建批作业,这很容易实现、部署和维护。</p></p>