如何創(chuàng)建長時間運行的作業(yè)
如果 Web 應(yīng)用程序中的一個特性需要超過 1 秒或 2 秒才能完成,那么應(yīng)該怎么辦?需要某種離線處理解決方案。學(xué)習(xí)幾種對 PHP 應(yīng)用程序中長時間運行的作業(yè)進行離線服務(wù)的方法。
大型的連鎖店有一個大問題。每天,在每家商店會發(fā)生數(shù)千次交易。公司執(zhí)行官希望對這些數(shù)據(jù)進行挖掘。哪些產(chǎn)品賣得好?哪些不好?有機產(chǎn)品在哪里賣得好?冰淇淋的銷售情況怎么樣?
為了捕捉這些數(shù)據(jù),組織必須將所有事務(wù)性數(shù)據(jù)裝載進一個數(shù)據(jù)模型,以便更適合生成公司所需的報告類型。但是,這很花費時間,而且隨著連鎖規(guī)模的增長,處理一天的數(shù)據(jù)可能要花費一天以上的時間。因此,這是個大問題。
現(xiàn)在,您的 Web 應(yīng)用程序可能不需要處理這么多數(shù)據(jù),但是任何站點的處理時間都有可能超過客戶愿意等待的時間。一般來說,客戶愿意等待的時間是 200 毫秒,如果超過這個時間,客戶就會覺得過程 “緩慢”。這個數(shù)字基于桌面應(yīng)用程序,而 Web 使我們更有耐心了。但無論如何,不應(yīng)該讓客戶等待的時間超過幾秒。所以,要采用一些策略來處理 PHP 中的批處理作業(yè)。
分散的方式與 cron
在 UNIX® 機器上,執(zhí)行批處理的核心程序是 cron 守護進程。這個守護進程讀取一個配置文件,這個文件會告訴它要運行哪些命令行以及運行的頻率。然后,這個守護進程就按照配置執(zhí)行它們。在遇到錯誤時,它甚至能夠向指定的電子郵件地址發(fā)送錯誤輸出,從而幫助對問題進行調(diào)試。
我知道一些工程師強烈主張使用線程技術(shù)。“線程!線程才是進行后臺處理的真正方法。cron 守護進程太過時了。”
我不這么認(rèn)為。
這兩種方法我都用過,我認(rèn)為 cron 具備 “Keep It Simple, Stupid(KISS,簡單就是美)” 原則的優(yōu)點。它使后臺處理保持簡單。不需要編寫一直運行的多線程的作業(yè)處理應(yīng)用程序(因此不會有內(nèi)存泄漏),而是由 cron 啟動一個簡單的批處理腳本。這個腳本判斷是否有作業(yè)要處理,執(zhí)行作業(yè),然后退出。不需要擔(dān)心內(nèi)存泄漏。也不需要擔(dān)心線程停止或陷入無限循環(huán)。
那么,cron 是如何工作的?這依賴于您所處的系統(tǒng)環(huán)境。我只討論老式簡單的 cron 的 UNIX 命令行版本,您可以向系統(tǒng)管理員咨詢?nèi)绾卧谧约旱?Web 應(yīng)用程序中實現(xiàn)它。
下面是一個簡單的 cron 配置,它在每天晚上 11 點運行一個 PHP 腳本:
0 23 * * * jack /usr/bin/php /users/home/jack/myscript.php
前 5 個字段定義應(yīng)該啟動腳本的時間。然后是應(yīng)該用來運行這個腳本的用戶名。其余的命令是要執(zhí)行的命令行。時間字段分別是分、小時、月中的日、月和周中的日。下面是幾個示例。
命令:
15 * * * * jack /usr/bin/php /users/home/jack/myscript.php
在每個小時的第 15 分鐘運行腳本。
命令:
15,45 * * * * jack /usr/bin/php /users/home/jack/myscript.php
在每個小時的第 15 和第 45 分鐘運行腳本。
命令:
*/1 3-23 * * * jack /usr/bin/php /users/home/jack/myscript.php
在早上 3 點到晚上 11 點之間的每分鐘運行腳本。
命令
30 23 * * 6 jack /usr/bin/php /users/home/jack/myscript.php
在每星期六的晚上 11:30 運行腳本(星期六由 6 指定)。
可以看到,組合的數(shù)量是無限的。可以根據(jù)需要控制運行腳本的時間。還可以指定多個要運行的腳本,這樣的話,一些腳本可以每分鐘都運行,而其他腳本(比如備份腳本)可以每天只運行一次。
為了指定將報告的錯誤發(fā)送到哪個電子郵件地址,可以使用 MAILTO 指令,如下所示:
MAILTO=jherr@pobox.com
注意:對于 Microsoft® Windows® 用戶,有一個等效的 Scheduled Tasks 系統(tǒng)可以用來定期啟動命令行進程(比如 PHP 腳本)。
批處理體系結(jié)構(gòu)的基礎(chǔ)知識
批處理是相當(dāng)簡單的。在大多數(shù)情況下,采用兩個工作流之一。第一個工作流用于進行報告;腳本每天運行一次,它生成報告并將報告發(fā)送給一組用戶。第二個工作流是在響應(yīng)某種請求時創(chuàng)建的批作業(yè)。例如,我登錄進 Web 應(yīng)用程序中,并要求它向系統(tǒng)中注冊的所有用戶發(fā)送一個消息,將一個新的特性告訴他們。這個操作必須進行批處理,因為系統(tǒng)中有 10,000 個用戶。PHP 要花費一段時間才能完成這樣的任務(wù),所以它必須由瀏覽器之外的一個作業(yè)來執(zhí)行。
在第二個工作流中,Web 應(yīng)用程序只需將信息放在某個位置,讓批處理應(yīng)用程序共享它。這些信息指定作業(yè)的性質(zhì)(例如,“Send this e-mail to all the people on the system”。)批處理程序運行這個作業(yè),然后刪除作業(yè)。另一種方法是,處理程序?qū)⒆鳂I(yè)標(biāo)為已完成。無論用哪種方法,作業(yè)都應(yīng)該識別為已完成,這樣就不會再次運行它。
本文的其余部分演示在 Web 應(yīng)用程序前端和批處理后端之間共享數(shù)據(jù)的各種方法。
郵件隊列
第一種方法是使用專用的郵件隊列系統(tǒng)。在這種模型中,數(shù)據(jù)庫中的一個表包含應(yīng)該發(fā)送給各個用戶的電子郵件消息。Web 界面使用mailouts 類將電子郵件添加到隊列中。電子郵件處理程序使用 mailouts 類檢索未處理的電子郵件,然后再次使用它從隊列中刪除未處理的電子郵件。
這個模型首先需要 MySQL 模式。
清單 1. mailout.sql
DROP TABLE IF EXISTS mailouts; CREATE TABLE mailouts ( id MEDIUMINT NOT NULL AUTO_INCREMENT, from_address TEXT NOT NULL, to_address TEXT NOT NULL, subject TEXT NOT NULL, content TEXT NOT NULL, PRIMARY KEY ( id ) );
這個模式非常簡單。每行中有一個 from 和一個 to 地址,以及電子郵件的主題和內(nèi)容。
對數(shù)據(jù)庫中的 mailouts 表進行處理的是 PHP mailouts 類。
清單 2. mailouts.php
getMessage()); } return $db; } public static function delete( $id ) { $db = Mailouts::get_db(); $sth = $db->prepare( 'DELETE FROM mailouts WHERE id=?' ); $db->execute( $sth, $id ); return true; } public static function add( $from, $to, $subject, $content ) { $db = Mailouts::get_db(); $sth = $db->prepare( 'INSERT INTO mailouts VALUES (null,?,?,?,?)' ); $db->execute( $sth, array( $from, $to, $subject, $content ) ); return true; } public static function get_all() { $db = Mailouts::get_db(); $res = $db->query( "SELECT * FROM mailouts" ); $rows = array(); while( $res->fetchInto( $row ) ) { $rows []= $row; } return $rows; } } ?>
這個腳本包含 Pear::DB 數(shù)據(jù)庫訪問類。然后定義 mailouts 類,其中包含三個主要的靜態(tài)函數(shù):add、delete 和 get_all。add() 方法向隊列中添加一個電子郵件,這個方法由前端使用。get_all() 方法從表中返回所有數(shù)據(jù)。delete() 方法刪除一個電子郵件。
您可能會問,我為什么不只在腳本末尾調(diào)用 delete_all() 方法。不這么做有兩個原因:如果在發(fā)送每個消息之后刪除它,那么即使腳本在出現(xiàn)問題之后重新運行,消息也不可能發(fā)送兩次;在批作業(yè)的啟動和完成之間可能會添加新的消息。
下一步是編寫一個簡單的測試腳本,這個腳本將一個條目添加到隊列中。
清單 3. mailout_test_add.php
在這個示例中,我添加一個 mailout,這個消息要發(fā)送給某公司的 Molly,其中包括主題 “Test Subject” 和電子郵件主體??梢栽诿钚猩线\行這個腳本:php mailout_test_add.php。
為了發(fā)送電子郵件,需要另一個腳本,這個腳本作為作業(yè)處理程序。
清單 4. mailout_send.php
這個腳本使用 get_all() 方法檢索所有電子郵件消息,然后使用 PHP 的 mail() 方法逐一發(fā)送消息。在每次成功發(fā)送電子郵件之后,調(diào)用delete() 方法從隊列中刪除對應(yīng)的記錄。
使用 cron 守護進程定期運行這個腳本。運行這個腳本的頻率取決于您的應(yīng)用程序的需要。
注意:PHP Extension and Application Repository(PEAR)存儲庫包含一個出色的 郵件隊列系統(tǒng) 實現(xiàn),可以免費下載。
更通用的方法
專門用來發(fā)送電子郵件的解決方案是很不錯,但是是否有更通用的方法?我們需要能夠發(fā)送電子郵件、生成報告或者執(zhí)行其他耗費時間的處理,而不必在瀏覽器中等待處理完成。
為此,可以利用一個事實:PHP 是一種解釋型語言??梢詫?PHP 代碼存儲在數(shù)據(jù)庫中的隊列中,以后再執(zhí)行它。這需要兩個表,見清單 5。
清單 5. generic.sql
DROP TABLE IF EXISTS processing_items; CREATE TABLE processing_items ( id MEDIUMINT NOT NULL AUTO_INCREMENT, function TEXT NOT NULL, PRIMARY KEY ( id ) ); DROP TABLE IF EXISTS processing_args; CREATE TABLE processing_args ( id MEDIUMINT NOT NULL AUTO_INCREMENT, item_id MEDIUMINT NOT NULL, key_name TEXT NOT NULL, value TEXT NOT NULL, PRIMARY KEY ( id ) );
第一個表 processing_items 包含作業(yè)處理程序調(diào)用的函數(shù)。第二個表 processing_args 包含要發(fā)送給函數(shù)的參數(shù),采用的形式是由鍵/值對組成的 hash 表。
與 mailouts 表一樣,這兩個表也由 PHP 類包裝,這個類稱為 ProcessingItems。
清單 6. generic.php
prepare( 'DELETE FROM processing_args WHERE item_id=?' ); $db->execute( $sth, $id ); $sth = $db->prepare( 'DELETE FROM processing_items WHERE id=?' ); $db->execute( $sth, $id ); return true; } public static function add( $function, $args ) { $db = ProcessingItems::get_db(); $sth = $db->prepare( 'INSERT INTO processing_items VALUES (null,?)' ); $db->execute( $sth, array( $function ) ); $res = $db->query( "SELECT last_insert_id()" ); $id = null; while( $res->fetchInto( $row ) ) { $id = $row[0]; } foreach( $args as $key => $value ) { $sth = $db->prepare( 'INSERT INTO processing_args VALUES (null,?,?,?)' ); $db->execute( $sth, array( $id, $key, $value ) ); } return true; } public static function get_all() { $db = ProcessingItems::get_db(); $res = $db->query( "SELECT * FROM processing_items" ); $rows = array(); while( $res->fetchInto( $row ) ) { $item = array(); $item['id'] = $row[0]; $item['function'] = $row[1]; $item['args'] = array(); $ares = $db->query( "SELECT key_name, value FROM processing_args WHERE item_id=?", $item['id'] ); while( $ares->fetchInto( $arow ) ) $item['args'][ $arow[0] ] = $arow[1]; $rows []= $item; } return $rows; } } ?>
這個類包含三個重要的方法:add()、get_all() 和 delete()。與 mailouts 系統(tǒng)一樣,前端使用 add(),處理引擎使用 get_all() 和delete()。
清單 7 所示的測試腳本將一個條目添加到處理隊列中。
清單 7. generic_test_add.php
'foo' ) ); ?>
在這個示例中,添加了一個對 printvalue 函數(shù)的調(diào)用,并將 value 參數(shù)設(shè)置為 foo。我使用 PHP 命令行解釋器運行這個腳本,并將這個方法調(diào)用放進隊列中。然后使用以下處理腳本運行這個方法。
清單 8. generic_process.php
這個腳本非常簡單。它獲得 get_all() 返回的處理條目,然后使用 call_user_func_array(一個 PHP 內(nèi)部函數(shù))用給定的參數(shù)動態(tài)地調(diào)用這個方法。在這個示例中,調(diào)用本地的 printvalue 函數(shù)。
為了演示這種功能,我們看看在命令行上發(fā)生了什么:
% php generic_test_add.php % php generic_process.php Printing: foo %
輸出并不多,但是您能夠看出要點。通過這種機制,可以將任何 PHP 函數(shù)的處理推遲。
現(xiàn)在,如果您不喜歡將 PHP 函數(shù)名和參數(shù)放進數(shù)據(jù)庫中,那么另一種方法是在 PHP 代碼中建立數(shù)據(jù)庫中的 “處理作業(yè)類型” 名稱和實際 PHP 處理函數(shù)之間的映射。按照這種方式,如果以后決定修改 PHP 后端,那么只要 “處理作業(yè)類型” 字符串匹配,系統(tǒng)就仍然可以工作。
放棄數(shù)據(jù)庫
最后,我演示另一種稍有不同的解決方案,它使用一個目錄中的文件來存儲批作業(yè),而不是使用數(shù)據(jù)庫。在這里提供這個思路并不是建議您 “采用這種方式,而不使用數(shù)據(jù)庫”,這只是一種可供選擇的方式,是否采用它由您決定。
顯然,這個解決方案中沒有模式,因為我們不使用數(shù)據(jù)庫。所以先編寫一個類,它包含與前面示例中相似的 add()、get_all() 和 delete()方法。
清單 9. batch_by_file.php
$v ) { fprintf( $fh, $k.":".$v."\n" ); } fclose( $fh ); return true; } public static function get_all() { $rows = array(); if (is_dir(BATCH_DIRECTORY)) { if ($dh = opendir(BATCH_DIRECTORY)) { while (($file = readdir($dh)) !== false) { $path = BATCH_DIRECTORY.$file; if ( is_dir( $path ) == false ) { $item = array(); $item['id'] = $path; $fh = fopen( $path, 'r' ); if ( $fh ) { $item['function'] = trim(fgets( $fh )); $item['args'] = array(); while( ( $line = fgets( $fh ) ) != null ) { $args = split( ':', trim($line) ); $item['args'][$args[0]] = $args[1]; } $rows []= $item; fclose( $fh ); } } } closedir($dh); } } return $rows; } } ?>
BatchFiles 類有三個主要方法:add()、get_all() 和 delete()。這個類不訪問數(shù)據(jù)庫,而是讀寫 batch_items 目錄中的文件。
使用以下測試代碼添加新的批處理條目。
清單 10. batch_by_file_test_add.php
'foo' ) ); ?>
有一點需要注意:除了類名(BatchFiles)之外,實際上沒有任何跡象能夠說明作業(yè)是如何存儲的。所以,以后很容易將它改為數(shù)據(jù)庫風(fēng)格的存儲方式,而不需要修改接口。
最后是處理程序的代碼。
清單 11. batch_by_file_processor.php
這段代碼幾乎與數(shù)據(jù)庫版本完全相同,只是修改了文件名和類名。
結(jié)語
正如前面提到的,服務(wù)器對線程提供了許多支持,可以進行后臺批處理。在某些情況下,使用輔助線程處理小作業(yè)肯定比較容易。但是,也可以使用傳統(tǒng)工具(cron、MySQL、標(biāo)準(zhǔn)的面向?qū)ο蟮?PHP 和 Pear::DB)在 PHP 應(yīng)用程序中創(chuàng)建批作業(yè),這很容易實現(xiàn)、部署和維護。