新聞中心
本文轉(zhuǎn)載自微信公眾號「JavaKeeper」,作者海星。轉(zhuǎn)載本文請聯(lián)系JavaKeeper公眾號。

目前創(chuàng)新互聯(lián)公司已為成百上千的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬主機、網(wǎng)站運營、企業(yè)網(wǎng)站設(shè)計、龍游網(wǎng)站維護等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
管道模式,不屬于23種設(shè)計模式之一(是責(zé)任鏈模式的一種變體),但是在我們實際業(yè)務(wù)架構(gòu)中還是有很多場景適用的,主要用于將復(fù)雜的進(jìn)程分解成多個獨立的子任務(wù),像流水線一樣去執(zhí)行,了解一下唄
一、開場
假設(shè)我們有這樣的一個需求,讀取文件內(nèi)容,并過濾包含 “hello” 的字符串,然后將其反轉(zhuǎn)
Linux 一行搞定
- cat hello.txt | grep "hello" | rev
用世界上最好語言 Java 實現(xiàn)也很簡單
- File file = new File("/Users/starfish/Documents/hello.txt");
- String content = FileUtils.readFileToString(file,"UTF-8");
- List
helloStr = Stream.of(content).filter(s -> s.contains("hello")).collect(Collectors.toList()); - System.out.println(new StringBuilder(String.join("",helloStr)).reverse().toString());
再假設(shè)我們上邊的場景是在一個大型系統(tǒng)中,有這樣的數(shù)據(jù)流需要多次進(jìn)行復(fù)雜的邏輯處理,還是簡單粗暴的把一系列流程像上邊那樣放在一個大組件中嗎?
這樣的設(shè)計完全違背了單一職責(zé)原則,我們在增改,或者減少一些處理邏輯的時候,就必須對整個組件進(jìn)行改動??蓴U展性和可重用性幾乎沒有~~
那有沒有一種模式可以將整個處理流程進(jìn)行詳細(xì)劃分,劃分出的每個小模塊互相獨立且各自負(fù)責(zé)一小段邏輯處理,這些小模塊可以按順序連起來,前一模塊的輸出作為后一模塊的輸入,最后一個模塊的輸出為最終的處理結(jié)果呢?
如此一來修改邏輯時只針對某個模塊修改,添加或減少處理邏輯也可細(xì)化到某個模塊顆粒度,并且每個模塊可重復(fù)利用,可重用性大大增強。
恩,這就是我們要說的管道模式
二、定義
管道模式(Pipeline Pattern) 是責(zé)任鏈模式(Chain of Responsibility Pattern)的常用變體之一。
顧名思義,管道模式就像一條管道把多個對象連接起來,整體看起來就像若干個閥門嵌套在管道中,而處理邏輯就放在閥門上,需要處理的對象進(jìn)入管道后,分別經(jīng)過各個閥門,每個閥門都會對進(jìn)入的對象進(jìn)行一些邏輯處理,經(jīng)過一層層的處理后從管道尾出來,此時的對象就是已完成處理的目標(biāo)對象。
管道模式用于將復(fù)雜的進(jìn)程分解成多個獨立的子任務(wù)。每個獨立的任務(wù)都是可復(fù)用的,因此這些任務(wù)可以被組合成復(fù)雜的進(jìn)程。
PS:純的責(zé)任鏈模式在鏈上只會有一個處理器用于處理數(shù)據(jù),而管道模式上多個處理器都會處理數(shù)據(jù)。
三、角色
管道模式:對于管道模式來說,有 3 個對象:
- 閥門:處理數(shù)據(jù)的節(jié)點,或者叫過濾器、階段
- 管道:組織各個閥門
- 客戶端:構(gòu)造管道,并調(diào)用
四、實例
程序員還是看代碼消化才快些,我們用管道模式實現(xiàn)下文章開頭的小需求
1、處理器(管道的各個階段)
- public interface Handler {
- O process(I input);
- }
2、定義具體的處理器(閥門)
- public class FileProcessHandler implements Handler
{ - @Override
- public String process(File file) {
- System.out.println("===文件處理===");
- try{
- return FileUtils.readFileToString(file,"UTF-8");
- }catch (IOException e){
- e.printStackTrace();
- }
- return null;
- }
- }
- public class CharacterFilterHandler implements Handler
{ - @Override
- public String process(String input) {
- System.out.println("===字符過濾===");
- List
hello = Stream.of(input).filter(s -> s.contains("hello")).collect(Collectors.toList()); - return String.join("",hello);
- }
- }
- public class CharacterReverseHandler implements Handler
{ - @Override
- public String process(String input) {
- System.out.println("===反轉(zhuǎn)字符串===");
- return new StringBuilder(input).reverse().toString();
- }
- }
3、管道
- public class Pipeline {
- private final Handler currentHandler;
- Pipeline(Handler currentHandler) {
- this.currentHandler = currentHandler;
- }
Pipeline addHandler(Handler newHandler) { - return new Pipeline<>(input -> newHandler.process(currentHandler.process(input)));
- }
- O execute(I input) {
- return currentHandler.process(input);
- }
- }
4、 客戶端使用
- import lombok.val;
- public class ClientTest {
- public static void main(String[] args) {
- File file = new File("/Users/apple/Documents/hello.txt");
- val filters = new Pipeline<>(new FileProcessHandler())
- .addHandler(new CharacterFilterHandler())
- .addHandler(new CharacterReverseHandler());
- System.out.println(filters.execute(file));
- }
- }
5、結(jié)果
UML 類圖
產(chǎn)品他么的又來了,這次是刪除 hello.txt 中的 world 字符
三下五除二,精通 shell 編程的我搞定了
- cat hello.txt |grep hello |rev | tr -d 'world'
Java 怎么搞,你應(yīng)該很清晰了吧
五、優(yōu)缺點
Pipeline 模式的核心思想是將一個任務(wù)處理分解為若干個處理階段(Stage),其中每個處理階段的輸出作為下一個處理階段的輸入,并且各個處理階段都有相應(yīng)的工作者線程去執(zhí)行相應(yīng)的計算。因此,處理一批任務(wù)時,各個任務(wù)的各個處理階段是并行(Parallel)的。通過并行計算,Pipeline 模式使應(yīng)用程序能夠充分利用多核 CPU 資源,提高其計算效率。 ——《Java 多線程編程實戰(zhàn)指南》
優(yōu)點
- 將復(fù)雜的處理流程分解成獨立的子任務(wù),解耦上下游處理邏輯,也方便您對每個子任務(wù)的測試
- 被分解的子任務(wù)還可以被不同的處理進(jìn)程復(fù)用
- 在復(fù)雜進(jìn)程中添加、移除和替換子任務(wù)非常輕松,對已存在的進(jìn)程沒有任何影響,這就加大了該模式的擴展性和靈活性
- 對于每個處理單元又可以打補丁,做監(jiān)聽。(這就是切面編程了)
模式需要注意的東西
- Pipeline的深度:Pipeline 中 Pipe 的個數(shù)被稱作 Pipeline 的深度。所以我們在用 Pipeline 的深度與 JVM 宿主機的 CPU 個數(shù)間的關(guān)系。如果 Pipeline 實例所處的任務(wù)多屬于 CPU 密集型,那么深度最好不超過 Ncpu。如果 Pipeline 所處理的任務(wù)多屬于 I/O 密集型,那么 Pipeline 的深度最好不要超過 2*Ncpu。
- 基于線程池的 Pipe:如果 Pipe 實例使用線程池,由于有多個 Pipe 實例,更容易出現(xiàn)線程死鎖的問題,需要仔細(xì)考慮。
- 錯誤處理:Pipe 實例對其任務(wù)進(jìn)行過程中跑出的異??赡苄枰鄳?yīng) Pipe 實例之外進(jìn)行處理。
- 此時,處理方法通常有兩種:一是各個 Pipe 實例捕獲到異常后調(diào)用 PipeContext 實例的 handleError 進(jìn)行錯誤處理。另一個是創(chuàng)建一個專門負(fù)責(zé)錯我處理的 Pipe 實例,其他 Pipe 實例捕獲異常后提交相關(guān)數(shù)據(jù)給該 Pipe 實例處理。
- 可配置的 Pipeline:Pipeline 模式可以用代碼的方式將若干個 Pipe 實例添加,也可以用配置文件的方式實現(xiàn)動態(tài)方式添加 Pipe。
六、Java Function
如果,你的管道邏輯真的很簡單,也直接用 Java8 提供的 Function 就,具體實現(xiàn)如下這樣
- File file = new File("/Users/apple/Documents/hello.txt");
- Function
readFile = input -> { - System.out.println("===文件處理===");
- try{
- return FileUtils.readFileToString(input,"UTF-8");
- }catch (IOException e){
- e.printStackTrace();
- }
- return null;
- };
- Function
filterCharacter = input -> { - System.out.println("===字符過濾===");
- List
hello = Stream.of(input).filter(s -> s.contains("hello")).collect(Collectors.toList()); - return String.join("",hello);
- };
- Function
reverseCharacter = input -> { - System.out.println("===反轉(zhuǎn)字符串===");
- return new StringBuilder(input).reverse().toString();
- };
- final Function
pipe = readFile - .andThen(filterCharacter)
- .andThen(reverseCharacter);
- System.out.println(pipe.apply(file));
最后
但是,并不是一碰到這種類似流式處理的任務(wù)就需要用管道,Pipeline 模式中各個處理階段所用的工作者線程或者線程池,表示各個階段的輸入/輸出對象的創(chuàng)建和一定(進(jìn)出隊列)都有其自身的時間和空間開銷,所以使用 Pipeline 模式的時候需要考慮它所付出的代價。建議處理規(guī)模較大的任務(wù),否則可能得不償失。
參考
https://java-design-patterns.com/patterns/pipeline/
https://developer.aliyun.com/article/778865
https://yasinshaw.com/articles/108
《Java多線程編程實戰(zhàn)指南(設(shè)計模式篇)》
當(dāng)前名稱:讓我們一起學(xué)習(xí)管道模式,你會了嗎?
標(biāo)題URL:http://www.fisionsoft.com.cn/article/cdghcpe.html


咨詢
建站咨詢
