精華區beta MATLAB 關於我們 聯絡資訊
一開始寫分散式處理程式的時候, 常常覺得會把 code 弄得很亂 因為要連線到 job-manager, 要指定 job 和裡面的 tasks 工作丟進去算, 算完還不一定對, 有錯也不會自動重算 整體來講, Matlab提供的 Distributed Computing Toolbox 直接要拿來用, 真的不怎麼好用 ^^" 經過一陣子嘗試, 我把要丟給 Distributed Computing 的任務 用一個 wrapper (包裝器) 包裝起來, 上面第一段提到的細節全部都寫在 wrapper 程式碼也變得比較精簡 wrapper的內容, 對於需要跟 Dist-Computing toolbox 打交道的朋友 應該也是有幫助 ---- 要丟到 wrapper 的程式, 必須寫成特定的形式 一般有三個步驟. 1. 準備傳給 function 的變數 2. 呼叫 wrapper 計算 3. 處理計算結果 首先, 假如我要運算的 function 是 ret = fun(a, b); 那麼 fun() 有兩個輸入和一個輸出, 假設我希望計算下列的結果... ret(1) = fun(1, 10); ret(2) = fun(2, 10); ... ret(10) = fun(10, 10); ---- step.1 準備傳給 function 的變數 把要傳入 fun() 的傳入值用 cell-array 包裝起來 input = { {[1] [10]}, {[2] [10]}, ... {10, 10} }; * input 本身是一個大的 cell array * input 裡面有很多個小的 cell-array, eg. { [1] [10] } * 小 cell-array 裡, 每個元素, 分別對應到 function 的輸入參數 * 以 { [1] [10] } 來說, 到時候會算 fun([1], [10]) 概念上, 先把所有的 input 全部都準備好 用 Matlab 的術語來說, input 變數在描述 "job" input 變數裡每個小 cell, 在描述每個 "task" step2. 呼叫 wrapper 計算 fhandle = @fun; % 告訴 wrapper 要算的 function-name num_of_outputs = 1; % 告訴 wrapper 函數輸出的數目 is_destroy = 1; % 要不要清除在 jobmanager 上的運算結果 ret = dist_computing_wrapper_jobs( ... fhandle, num_of_outputs, input, is_destroy); step3. 處理計算結果 下面舉例, 把10次算完的結果畫成 10 張圖, 其他 case 依樣畫葫蘆即可 for ret_index=1:10 figure; plot( ret{ret_index} ); end ---- 已知問題. 1. 這個 wrapper 會自動把算錯的 task 重新執行, 如果原本程式就寫錯 執行過程中本來就會錯誤, 那套用這個 wrapper 會引發無窮迴圈 2. 下面標示藍色的程式區塊, 請按照需要設定 ---- function ret = dist_computing_wrapper_jobs(fhandle, num_of_outputs, input, is_destroy) %% 連線到 job manager jobmanager_name = 'jobmanager_xeon'; jobmanager_host = 'xeon'; jm = findResource( ... 'scheduler', 'type', 'jobmanager', ... 'name',jobmanager_name, ... 'LookupURL',jobmanager_host); %% 建立 jobs 和 tasks [len_job len_task] = size(input); jobs = []; for job_index=1:len_job % 準備 jobs 並且丟去給他算 %% 建立 job 和裡面的 task j = createJob(jm); % 描述 job 裡的 tasks for index_task = 1:len_task createTask(j, fhandle, num_of_outputs, input(job_index, index_task)); end % 設定檔案分享 set(j, 'FileDependencies', {'.' '../delsig'}); set(j, 'PathDependencies', {'.' '../delsig'}); % 設定 worker 在第一次執行 job 裡的 task 時, reset set(j, 'RestartWorker', true); % DEBUG 看 錯誤訊息 alltasks = get(j, 'Tasks'); set(alltasks, 'CaptureCommandWindowOutput', true); % 提交 job 並執行 submit(j); jobs = [jobs; j]; fprintf('Job #%d submitted\n', job_index); end %% 等所有jobs執行結束 & 取回執行結果 ret = []; for job_index=1:len_job waitForState(jobs(job_index)); % 檢查是不是所有 job 都正確 tasks = findTask(jobs(job_index)); error_task = []; for task_index=1:length(tasks) if(isempty(tasks(task_index).ErrorIdentifier) == 0) % Error task error_task = [error_task task_index]; end end ret_currjob = getAllOutputArguments(jobs(job_index)); % 取回執行結果 if(isempty(error_task) == 0) % 有錯誤的結果 fprintf('執行有碰到錯誤, 自動重算\n'); input_redo = input(error_task); ret_currjob_redo = dist_computing_wrapper_jobs(fhandle, num_of_outputs, input_redo, is_destroy); for error_task_index=1:length(error_task) ret_currjob(error_task(error_task_index)) = ret_currjob_redo(error_task_index); end end fprintf('Job #%d completed ^+++^\n', job_index); ret = [ret; ret_currjob]; end if(is_destroy) destroy(jobs) end -- ※ 發信站: 批踢踢實業坊(ptt.cc) ◆ From: 140.113.236.3 ※ 編輯: lihgong 來自: 140.113.236.3 (07/14 10:57)