遷移桌面程序到MS Store(12)——WPF使用UWP InkToolbar和InkCanvas

我們在提到了對Win10 API的調用,但仍存在無法在WPF中使用UWP控件的問題,雖然都是XAML控件,但卻是兩套命名空間下的同名類型,無法混用。
人總會被現實打敗,強大如某軟也得向生活低頭,UWP一直沒有起色,某軟的老大又一心去搞Azure。Windows平台的重振,似乎想走回頭路,從1903版本開始,支持在.NET Framwork的WPF和WinForm工程中,直接使用部分的UWP控件了。首當其沖的,就是有點騷包的InkToolbar和InkCanvas。

接下來我們就來試試如何在WPF工程中,使用UWP的InkToolbar和InkCanvas。
首先創建一個空的WPF工程,完成后,在Nuget的搜索界面填入 Microsoft.Toolkit.Wpf.UI.Controls ,選中第一個進行安裝。

完成安裝后,打開MainWindow.xaml,添加對命名空間的引用xmlns:Controls=”clr-namespace:Microsoft.Toolkit.Wpf.UI.Controls;assembly=Microsoft.Toolkit.Wpf.UI.Controls”。接着就可以在<Grid>節點中添加UWP版本的InkToolbar和InkCanvas控件了。

<Window x:Class="WPFInkSample.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        xmlns:local="clr-namespace:WPFInkSample"
        xmlns:Controls="clr-namespace:Microsoft.Toolkit.Wpf.UI.Controls;assembly=Microsoft.Toolkit.Wpf.UI.Controls"
        mc:Ignorable="d"
        Title="MainWindow" Height="450" Width="800">
    <Grid >
        <Grid.RowDefinitions>
            <RowDefinition Height="Auto"/>
            <RowDefinition Height="*"/>
        </Grid.RowDefinitions>
        <Controls:InkToolbar TargetInkCanvas="{x:Reference myInkCanvas}" Grid.Row="0" />
        <Controls:InkCanvas x:Name="myInkCanvas" Grid.Row="1" />
    </Grid>
</Window>

同時我們還需要在MainWindow.xaml.cs中設置InputDeviceTypes。

    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
            this.myInkCanvas.InkPresenter.InputDeviceTypes = CoreInputDeviceTypes.Mouse | CoreInputDeviceTypes.Pen;
        }
    }

然後按下F5運行,某軟的騷操作來了……因為僅在1903以後的版本才支持這種騷操作(10.0.18226是稍早的preview版),所以需要做額外的處理才可以。

我們這裡有兩種選擇,一是通過來打包這個WPF程序,然後在Packaging工程的屬性里,將Target version和Minimum version同時設置為Windows 10, version 1903 (10.0.18362) 。這是MSDN上推薦的標準做法,這樣做的好處在於,打包好的程序可以直接上傳MS Store。
如果我們想保持exe的可執行文件形式,還有另一種做法,在Project文件上右鍵點擊Add->New Item,添加一個manifest文件。
在這個文件中,找到<!–Windows 10–>,然後做如下編輯:

  <compatibility xmlns="urn:schemas-microsoft-com:compatibility.v1">
    <application>
      <!-- A list of the Windows versions that this application has been tested on
           and is designed to work with. Uncomment the appropriate elements
           and Windows will automatically select the most compatible environment. -->
  
      <!-- Windows Vista -->
      <!--<supportedOS Id="{e2011457-1546-43c5-a5fe-008deee3d3f0}" />-->
  
      <!-- Windows 7 -->
      <!--<supportedOS Id="{35138b9a-5d96-4fbd-8e2d-a2440225f93a}" />-->
  
      <!-- Windows 8 -->
      <!--<supportedOS Id="{4a2f28e3-53b9-4441-ba9c-d69d4a4a6e38}" />-->
  
      <!-- Windows 8.1 -->
      <!--<supportedOS Id="{1f676c76-80e1-4239-95bb-83d0f6d0da78}" />-->
  
      <!-- Windows 10 -->
      <maxversiontested Id="10.0.18362.0"/>
      <supportedOS Id="{8e0f7a12-bfb3-4fe8-b9a5-48fd50a15a9a}" />
  
    </application>
  </compatibility>

保存后,再通過F5運行,即發現一切正常,不在出現之前的運行時錯誤了。
本篇我們介紹了如何在WPF工程中使用UWP InkToolbar和InkCavas。因為這個功能僅在1903后的版本支持,所以下一篇我們會介紹如何簡單地判斷Win10 API 版本,在運行時判斷是否執行對應版本的代碼。
Github:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

地理文本處理技術在高德的演進(上)

一、背景

地圖App的功能可以簡單概括為定位,搜索,導航三部分,分別解決在哪裡,去哪裡,和怎麼去的問題。高德地圖的搜索場景下,輸入的是,地理相關的檢索query,用戶位置,App圖面等信息,輸出的是,用戶想要的POI。如何能夠更加精準地找到用戶想要的POI,提高滿意度,是評價搜索效果的最關鍵指標。

一個搜索引擎通常可以拆分成query分析、召回、排序三個部分,query分析主要是嘗試理解query表達的含義,為召回和排序給予指導。

地圖搜索的query分析不僅包括通用搜索下的分詞,成分分析,同義詞,糾錯等通用NLP技術,還包括城市分析,wherewhat分析,路徑規劃分析等特定的意圖理解方式。

常見的一些地圖場景下的query意圖表達如下:

query分析是搜索引擎中策略密集的場景,通常會應用NLP領域的各種技術。地圖場景下的query分析,只需要處理地理相關的文本,多樣性不如網頁搜索,看起來會簡單一些。但是,地理文本通常比較短,並且用戶大部分的需求是唯一少量結果,要求精準度非常高,如何能夠做好地圖場景下的文本分析,並提升搜索結果的質量,是充滿挑戰的。

二、整體技術架構

搜索架構

類似於通用檢索的架構,地圖的檢索架構包括query分析,召回,排序三個主要部分。先驗的,用戶的輸入信息可以理解為多種意圖的表達,同時下發請求嘗試獲取檢索結果。后驗的,拿到每種意圖的檢索結果時,進行綜合判斷,選擇效果最好的那個。

query分析流程

具體的意圖理解可分為基礎query分析和應用query分析兩部分,基礎query分析主要是使用一些通用的NLP技術對query進行理解,包括分析,成分分析,省略,同義詞,糾錯等。應用query分析主要是針對地圖場景里的特定問題,包括分析用戶目標城市,是否是where+what表達,是否是從A到B的路徑規劃需求表達等。

整體技術演進

在地里文本處理上整體的技術演進經歷了規則為主,到逐步引入機器學習,到機器學習全面應用的過程。由於搜索模塊是一個高併發的線上服務,對於深度模型的引入有比較苛刻的條件,但隨着性能問題逐漸被解決,我們從各個子方向逐步引入深度學習的技術,進行新一輪的效果提升。

NLP領域技術在最近幾年取得了日新月異的發展,bert,XLNet等模型相繼霸榜,我們逐步統一化各個query分析子任務,使用統一的向量表示對進行用戶需求進行表達,同時進行seq2seq的多任務學習,在效果進一步提升的基礎上,也能夠保證系統不會過於臃腫。

本文就高德地圖搜索的地理文本處理,介紹相關的技術在過去幾年的演進。我們將選取一些點分上下兩篇進行介紹,上篇主要介紹搜索引擎中一些通用的query分析技術,包括糾錯,改寫和省略。下篇着重介紹地圖場景中特有query分析技術,包括城市分析,wherewhat分析,路徑規劃。

三、通用query分析技術演進

3.1 糾錯

在搜索引擎中,用戶輸入的檢索詞(query)經常會出現拼寫錯誤。如果直接對錯誤的query進行檢索,往往不會得到用戶想要的結果。因此不管是通用搜索引擎還是垂直搜索引擎,都會對用戶的query進行糾錯,最大概率獲得用戶想搜的query。

在目前的地圖搜索中,約有6%-10%的用戶請求會輸入錯誤,所以query糾錯在地圖搜索中是一個很重要的模塊,能夠極大的提升用戶搜索體驗。

在搜索引擎中,低頻和中長尾問題往往比較難解決,也是糾錯模塊面臨的主要問題。另外,地圖搜索和通用搜索,存在一個明顯的差異,地圖搜索query結構化比較突出,query中的片段往往包含一定的位置信息,如何利用好query中的結構化信息,更好地識別用戶意圖,是地圖糾錯獨有的挑戰。

常見錯誤分類

(1) 拼音相同或者相近,例如: 盤橋物流園-潘橋物流園
(2) 字形相近,例如: 河北冒黎-河北昌黎
(3) 多字或者漏字,例如: 泉州州頂街-泉州頂街

糾錯現狀

原始糾錯模塊包括多種召回方式,如:

拼音糾錯:主要解決短query的拼音糾錯問題,拼音完全相同或者模糊音作為糾錯候選。
拼寫糾錯:也叫形近字糾錯,通過遍歷替換形近字,用query熱度過濾,加入候選。
組合糾錯:通過翻譯模型進行糾錯替換,資源主要是通過query對齊挖掘的各種替換資源。

組合糾錯翻譯模型計算公式:

其中p(f)是語言模型,p(f|e)是替換模型。

問題1:召回方式存在缺陷。目前query糾錯模塊主要召回策略包括拼音召回、形近字召回,以及替換資源召回。對於低頻case,解決能力有限。

問題2:排序方式不合理。糾錯按照召回方式分為幾個獨立的模塊,分別完成相應的召回和排序,不合理。

技術改造

改造1:基於空間關係的實體糾錯
原始的糾錯主要是基於用戶session挖掘片段替換資源,所以對於低頻問題解決能力有限。但是長尾問題往往集中在低頻,所以低頻問題是當前的痛點。

地圖搜索與通用搜索引擎有個很大的區別在於,地圖搜索query比較結構化,例如北京市朝陽區阜榮街10號首開廣場。我們可以對query進行結構化切分(也就是地圖中成分分析的工作),得到這樣一種帶有類別的結構化描述,北京市【城市】朝陽區【區縣】阜榮街【道路】10號【門址後綴】首開廣場【通用實體】。

同時,我們擁有權威的地理知識數據,利用權威化的地理實體庫進行前綴樹+後綴樹的索引建庫,提取疑似糾錯的部分在索引庫中進行拉鏈召回,同時利用實體庫中的邏輯隸屬關係對糾錯結果進行過濾。實踐表明,這種方式對低頻的區劃或者實體的錯誤有着明顯的作用。

基於字根的字形相似度計算

上文提到的排序策略裏面通過字形的編輯距離作為排序的重要特徵,這裏我們開發了一個基於字根的字形相似度計算策略,對於編輯距離的計算更為細化和準確。漢字信息有漢字的字根拆分詞表和漢字的筆畫數。

將一個漢字拆分成多個字根,尋找兩個字的公共字根,根據公共字根筆畫數來計算連個字的相似度。

改造2:排序策略重構

原始的策略召回和排序策略耦合,導致不同的召回鏈路,存在顧此失彼的情況。為了能夠充分發揮各種召回方式的優勢,急需要對召回和排序進行解耦並進行全局排序優化。為此我們增加了排序模塊,將流程分為召回和排序兩階段。

模型選擇

對於這個排序問題,這裏我們參考業界的實踐,使用了基於pair-wise的gbrank進行模型訓練。

樣本建設

通過線上輸出結合人工review的方式構造樣本。

特徵建設
(1) 語義特徵。如統計語言模型。
(2) 熱度特徵。pv,點擊等。
(3) 基礎特徵。編輯距離,切詞和成分特徵,累積分佈特徵等。

這裏解決了糾錯模塊兩個痛點問題,一個是在地圖場景下的大部分低頻糾錯問題。另一個是重構了模塊流程,將召回和排序解耦,充分發揮各個召回鏈路的作用,召回方式更新后只需要重訓排序模型即可,使得模塊更加合理,為後面的深度模型升級打下良好的基礎。後面在這個框架下,我們通過深度模型進行seq2seq的糾錯召回,取得了進一步的收益。

3.2 改寫

糾錯作為query變換的一種方式的召回策略存在諸多限制,對於一些非典型的query變換表達,存在策略的空白。比如query=永城市新農合辦,目標POI是永城市新農合服務大廳。用戶的低頻query,往往得不到較好搜索效果,但其實用戶描述的語義與主poi的高頻query是相似的。

這裏我們提出一種query改寫的思路,可以將低頻query改寫成語義相似的高頻query,以更好地滿足用戶需求多樣性的表達。

這是一個從無到有的實現。用戶表達的query是多樣的,使用規則表達顯然是難以窮盡的,直觀的思路是通過向量的方式召回,但是向量召回的方式很可能出現泛化過多,不適應地圖場景的檢索的問題,這些都是要在實踐過程中需要考慮的問題。

方案

整體看,方案包括召回,排序,過濾,三個階段。

召回階段

我們調研了句子向量表示的幾種方法,選擇了算法簡單,效果和性能可以和CNN,RNN媲美的SIF(Smooth Inverse Frequency)。向量召回可以使用開源的Faiss向量搜索引擎,這裏我們使用了阿里內部的性能更好的的向量檢索引擎。

排序階段
樣本構建
原query與高頻query候選集合,計算語義相似度,選取語義相似度的TOPK,人工標註的訓練樣本。

特徵建設

1.基礎文本特徵
2.編輯距離
3.組合特徵

模型選擇

使用xgboost進行分數回歸

過濾階段
通過向量召回的query過度泛化非常嚴重,為了能夠在地圖場景下進行應用,增加了對齊模型。使用了兩種統計對齊模型giza和fastalign,實驗證明二者效果幾乎一致,但fastalign在性能上好於giza,所以選擇fastalign。

通過對齊概率和非對齊概率,對召回的結果進行進一步過濾,得到精度比較高的結果。

query改寫填補了原始query分析模塊中一些低頻表達無法滿足的空白,區別於同義詞或者糾錯的顯式query變換表達,句子的向量表示是相似query的一種隱式的表達,有其相應的優勢。

向量表示和召回也是深度學習模型逐步開始應用的嘗試。同義詞,改寫,糾錯,作為地圖中query變換主要的三種方式,以往在地圖模塊里比較分散,各司其職,也會有互相重疊的部分。在後續的迭代升級中,我們引入了統一的query變換模型進行改造,在取得收益的同時,也擺脫掉了過去很多規則以及模型耦合造成的歷史包袱。

3.2 省略

在地圖搜索場景里,有很多query包含無效詞,如果用全部query嘗試去召回很可能不能召回有效結果。如廈門市搜”湖裡區縣后高新技術園新捷創運營中心11樓1101室 縣后brt站”。這就需要一種檢索意圖,在不明顯轉義下,使用核心term進行召回目標poi候選集合,當搜索結果無果或者召回較差時起到補充召回的作用。

在省略判斷的過程中存在先驗后驗平衡的問題。省略意圖是一個先驗的判斷,但是期望的結果是能夠進行POI有效召回,和POI的召回字段的現狀密切相關。如何能夠在策略設計的過程中保持先驗的一致性,同時能夠在後驗POI中拿到相對好的效果,是做好省略模塊比較困難的地方。

原始的省略模塊主要是基於規則進行的,規則依賴的主要特徵是上游的成分分析特徵。由於基於規則擬合,模型效果存在比較大的優化空間。另外,由於強依賴成分分析,模型的魯棒性並不好。

技術改造

省略模塊的改造主要完成了規則到crf模型的升級,其中也離線應用了深度學習模型輔助樣本生成。

模型選擇

識別出來query哪些部分是核心哪些部分是可以省略的,是一個序列標註問題。在淺層模型的選型中,顯而易見地,我們使用了crf模型。

特徵建設

term特徵。使用了賦權特徵,詞性,先驗詞典特徵等。
成分特徵。仍然使用成分分析的特徵。
統計特徵。統計片段的左右邊界熵,城市分佈熵等,通過分箱進行離散化。

樣本建設

項目一期我們使用了使用線上策略粗標,外包細標的方式,構造了萬級的樣本供crf模型訓練。

但是省略query的多樣性很高,使用萬級的樣本是不夠的,在線上模型無法快速應用深度模型的情況下,我們使用了boostraping的方式,藉助深度模型的泛化能力,離線構造了大量樣本。

使用了這種方式,樣本從萬級很容易擴充到百萬級,我們仍然使用crf模型進行訓練和線上應用。

在省略模塊,我們完成了規則到機器學習的升級,引入了成分以外的其他特徵,提升了模型的魯棒性。同時並且利用離線深度學習的方式進行樣本構造的循環,提升了樣本的多樣性,使得模型能夠更加接近crf的天花板。

在後續深度模型的建模中,我們逐步擺脫了對成分分析特徵的依賴,對query到命中poi核心直接進行建模,構建大量樣本,取得了進一步的收益。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?

生產者-消費者模型在Hudi中的應用

介紹

生產者-消費者模型用於解耦生產者與消費者,平衡兩者之間的能力不平衡,該模型廣泛應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至隊列中,然後由消費者從隊列中消費,更具體一點,對於更新操作,生產者會將文件中老的記錄放入隊列中等待消費者消費,消費后交由HoodieMergeHandle處理;對於插入操作,生產者會將新記錄放入隊列中等待消費者消費,消費后交由HandleCreateHandle處理。

入口

前面的文章中提到過無論是HoodieCopyOnWriteTable#handleUpdate處理更新時直接生成了一個SparkBoundedInMemoryExecutor對象,還是HoodieCopyOnWriteTable#handleInsert處理插入時生成了一個CopyOnWriteLazyInsertIterable對象,再迭代時調用該對象的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor對象。最後兩者均會調用SparkBoundedInMemoryExecutor#execute開始記錄的處理,該方法核心代碼如下

  public E execute() {
    try {
      ExecutorCompletionService<Boolean> producerService = startProducers();
      Future<E> future = startConsumer();
      // Wait for consumer to be done
      return future.get();
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

該方法會啟動所有生產者和單個消費者進行處理。

Hudi定義了BoundedInMemoryQueueProducer接口表示生產者,其子類實現如下

  • FunctionBasedQueueProducer,基於Function來生產記錄,在合併日誌log文件和數據parquet文件時使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基於迭代器來生產記錄,在插入更新時使用。

定義了BoundedInMemoryQueueConsumer類表示消費者,其主要子類實現如下

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要處理CopyOnWrite表類型時的插入。
    • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要處理MergeOnRead

表類型時的插入,其為CopyOnWriteInsertHandler的子類。

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要處理CopyOnWrite表類型時的更新。

整個生產消費相關的類繼承結構非常清晰。

對於生產者的啟動,startProducers方法核心代碼如下

  public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(executorService);
    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecute();
          producer.produce(queue);
        } catch (Exception e) {
          logger.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

該方法使用CountDownLatch來協調生產者線程與消費者線程的退出動作,然後調用produce方法開始生產,對於插入更新時的IteratorBasedQueueProducer而言,其核心代碼如下

  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    ...
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    ...
  }

可以看到只要迭代器還有記錄(可能為插入時的新記錄或者更新時的舊記錄),就會往隊列中不斷寫入。

對於消費者的啟動,startConsumer方法的核心代碼如下

  private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return executorService.submit(() -> {
        ...
        preExecute();
        try {
          E result = consumer.consume(queue);
          return result;
        } catch (Exception e) {
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

消費時會先進行執行前的準備,然後開始消費,其中consume方法的核心代碼如下

  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();

    while (iterator.hasNext()) {
      consumeOneRecord(iterator.next());
    }

    // Notifies done
    finish();

    return getResult();
  }

可以看到只要隊列中還有記錄,就可以獲取該記錄,然後調用不同BoundedInMemoryQueueConsumer子類的consumeOneRecord進行更新插入處理。

值得一提的是Hudi對隊列進行了流控,生產者不能無限制地將記錄寫入隊列中,隊列緩存的大小由用戶配置,隊列能放入記錄的條數由採樣的記錄大小和隊列緩存大小控制。

在生產時,會調用BoundedInMemoryQueue#insertRecord將記錄寫入隊列,其核心代碼如下

  public void insertRecord(I t) throws Exception {
    ...
    rateLimiter.acquire();
    // We are retrieving insert value in the record queueing thread to offload computation
    // around schema validation
    // and record creation to it.
    final O payload = transformFunction.apply(t);
    adjustBufferSizeIfNeeded(payload);
    queue.put(Option.of(payload));
  }

首先獲取一個許可(Semaphore),未成功獲取會被阻塞直至成功獲取,然後獲取記錄的負載以便調整隊列,然後放入內部隊列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心代碼如下

  private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
    if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
      return;
    }

    final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
    final long newAvgRecordSizeInBytes =
        Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
    final int newRateLimit =
        (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));

    // If there is any change in number of records to cache then we will either release (if it increased) or acquire
    // (if it decreased) to adjust rate limiting to newly computed value.
    if (newRateLimit > currentRateLimit) {
      rateLimiter.release(newRateLimit - currentRateLimit);
    } else if (newRateLimit < currentRateLimit) {
      rateLimiter.acquire(currentRateLimit - newRateLimit);
    }
    currentRateLimit = newRateLimit;
    avgRecordSizeInBytes = newAvgRecordSizeInBytes;
    numSamples++;
  }

首先看是否已經達到採樣頻率,然後計算新的記錄平均大小和限流速率,如果新的限流速率大於當前速率,則可釋放一些許可(供阻塞的生產者獲取後繼續生產),否則需要獲取(回收)一些許可(許可變少後生產速率自然就降低了)。該操作可根據採樣的記錄大小動態調節速率,不至於在記錄負載太大和記錄負載太小時,放入同等個數,從而起到動態調節作用。

在消費時,會調用BoundedInMemoryQueue#readNextRecord讀取記錄,其核心代碼如下

  private Option<O> readNextRecord() {
    ...
    rateLimiter.release();
    Option<O> newRecord = Option.empty();
    while (expectMoreRecords()) {
      try {
        throwExceptionIfFailed();
        newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
        if (newRecord != null) {
          break;
        }
      } catch (InterruptedException e) {
        throw new HoodieException(e);
      }
    }
    ...

    if (newRecord != null && newRecord.isPresent()) {
      return newRecord;
    } else {
      // We are done reading all the records from internal iterator.
      this.isReadDone.set(true);
      return Option.empty();
    }
  }

可以看到首先會釋放一個許可,然後判斷是否還可以讀取記錄(還在生產或者停止生產但隊列不為空都可讀取),然後從內部隊列獲取記錄或返回。

上述便是生產者-消費者在Hudi中應用的分析。

總結

Hudi採用了生產者-消費者模型來控制記錄的處理,與傳統多生產者-多消費者模型不同的是,Hudi現在只支持多生產者-單消費者模型,單消費者意味着Hudi暫時不支持文件的併發寫入。而對於生產消費的隊列的實現,Hudi並未僅僅只是基於LinkedBlockingQueue,而是採用了更精細化的速率控制,保證速率會隨着記錄負載大小的變化和配置的隊列緩存大小而動態變化,這也降低了系統發生OOM的概率。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

Alibaba Nacos 學習(五):K8S Nacos搭建,使用nfs

 

準備環境

Centos7  192.168.50.21 k8s-master 2G
Centos7  192.168.50.22 k8s-node01 2G
Centos7  192.168.50.23 k8s-node02 2G

K8S集群搭建參考 

 

master安裝好Git ,yum install git

master,node01,node02  安裝 nfs-utils

yum install nfs-utils

master,node01,node02添加nfs exports配置,為了解決後續的nfs報錯異常

/data/mysql-slave *(insecure,fsid=0,rw,async,no_root_squash)
/data/mysql-master *(insecure,fsid=0,rw,async,no_root_squash)
/data/nfs-share *(rw,fsid=0,sync,no_root_squash)
mysql-slave 數據庫從庫 
mysql-master 數據庫主庫
nfs-share nocas文件掛在目錄

後面的yml中會提到
master,node01,node02創建目錄
mkdir /data/mysql-slave
mkdir /data/mysql-master
mkdir /data/nfs-share 

 master 克隆代碼

   git clone https://github.com/nacos-group/nacos-k8s.git

克隆完成進入以下目錄

 cd /opt/nacos-k8s/deploy/

 

1.nfs安裝

kubectl create -f nfs/rbac.yaml 
kubectl create -f nfs/class.yaml 

修改nfs/deployment.yaml IP配置

 

 

 

kubectl create -f nfs/deployment.yaml

查看安裝狀態

kubectl get pod -l app=nfs-client-provisioner

 

 

 

2.mysql部署

cd /opt/nacos-k8s/deploy/mysql/

修改數據配置文件ip

vi mysql-master-nfs.yaml

 

 

 部署主庫

kubectl create -f mysql-master-nfs.yaml 

修改存庫ip

vi mysql-slave-nfs.yaml
kubectl create -f mysql-slave-nfs.yaml 

主從部署非常慢 耐心等待,如果報nfs相關的錯,重啟nfs即可

service nfs restart

 

 

3. 部署nacos

cd /opt/nacos-k8s/deploy/nacos/

 

 

 

 

 

kubectl create -f nacos-pvc-nfs.yaml 

 查看訪問端口

kubectl get svc|grep nacos

 

 

 

 

 查看K8S集群狀態

 

 Failed to pull image “nacos/nacos-server:latest”: rpc error: code = Unknown desc = context canceled

進去對應節點機器 ,拉取鏡像后,重新應用即可

kubectl apply -f

 4. 部署問題

部署過程中大部分都是NFS問題

可以參考

mount.nfs: No route to host
Warning FailedMount 100s (x5 over 10m) kubelet, node2 Unable to mount volumes for pod “nfs-client-provisioner-594f778474-whhb5_default(56aef93a-9d31-11e9-a4c4-00163e069f44)”: timeout expired waiting for volumes to attach or mount for pod “default”/”nfs-client-provisioner-594f778474-whhb5”. list of unmounted volumes=[nfs-client-root]. list of unattached volumes=[nfs-client-root nfs-client-provisioner-token-8dcrx]

修改deployment.yaml中server的IP地址為某個node節點的內網IP地址,圖1已標註

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

如何使用偽類選擇器

偽類選擇器介紹

  • 偽類選擇器就是用來給超級鏈接設置不同的狀態樣式。
  • 超級鏈接分為4種狀態如:正常狀態、訪問過後狀態、鼠標放上狀態、激活狀態。

偽類選擇器說明表

選擇器 描述
:link 向未被訪問的超級鏈接添加樣式,正常狀態。
:visited 向已經被訪問的超級鏈接添加樣式,訪問過後狀態。
:hover 當鼠標懸浮在超級鏈接上方時,向超級鏈接添加樣式,鼠標放上狀態。
:active 鼠標放在超級鏈接上並且點擊的一瞬間,向超級鏈接添加樣式,激活狀態。

偽類選擇器實踐

  • 讓我們進入偽類選擇器實踐,實踐內容將超級鏈接4種狀態進行演示,演示效果如:將向未被訪問的超級鏈接文本顏色設置為紅色、已經被訪問的超級鏈接文本顏色設置為綠色、當鼠標懸浮在超級鏈接上文本顏色設置為紫色、用鼠標點擊超級鏈接的一瞬間文本顏色設置為藍色

  • 代碼塊

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>偽類選擇器</title>
    <style>
        a:link{
            color:red;
        }
        a:visited{
            color: lime;
        }
        a:hover{
            color: purple;
        }
        a:active{
            color: blue;
        }
    </style>
</head>
  
<body>
    <a href="https://www.cnblogs.com/lq000122/">微笑是最初的信仰</a>
</body>
</html>
  • 正常狀態結果圖

  • 鼠標放上狀態結果圖

  • 激活狀態結果圖

  • 訪問過後狀態

總結

  • 超級鏈接的不同狀態他其實是由順序,也就是說偽類選擇器設置其實是順序的,如果按照偽類選擇器的順序,那麼設置的樣式就不會被渲染。
  • 順序:linkvisitedhoveractive

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?

PHP 的 self 關鍵字用法

之前有人詢問 self 關鍵字的用法,答案是比較明顯的:靜態成員函數內不能用 this 調用非成員函數,但可以用 self 調用靜態成員函數/變量/常量;其他成員函數可以用 self 調用靜態成員函數以及非靜態成員函數。隨着討論的深入,發現 self 並沒有那麼簡單。鑒於此,本文先對幾個關鍵字做對比和區分,再總結 self 的用法。

parentstatic 以及 this 的區別

要想將徹底搞懂 self ,要與 parentstatic 以及 this 區分開。以下分別做對比。

parent

selfparent 的區分比較容易: parent 引用父類/基類被隱蓋的方法(或變量), self則引用自身方法(或變量)。例如構造函數中調用父類構造函數:

class Base {
    public function __construct() {
        echo "Base contructor!", PHP_EOL;
    }
}

class Child {
    public function __construct() {
        parent::__construct();
        echo "Child contructor!", PHP_EOL;
    }
}

new Child;
// 輸出:
// Base contructor!
// Child contructor!

 

static

static 常規用途是修飾函數或變量使其成為類函數和類變量,也可以修飾函數內變量延長其生命周期至整個應用程序的生命周期。但是其與 self 關聯上是PHP 5.3以來引入的新用途:靜態延遲綁定。

有了 static 的靜態延遲綁定功能,可以在運行時動態確定歸屬的類。例如:

class Base {
    public function __construct() {
        echo "Base constructor!", PHP_EOL;
    }

    public static function getSelf() {
        return new self();
    }

    public static function getInstance() {
        return new static();
    }

    public function selfFoo() {
        return self::foo();
    }

    public function staticFoo() {
        return static::foo();
    }

    public function thisFoo() {
        return $this->foo();
    }

    public function foo() {
        echo  "Base Foo!", PHP_EOL;
    }
}

class Child extends Base {
    public function __construct() {
        echo "Child constructor!", PHP_EOL;
    }

    public function foo() {
        echo "Child Foo!", PHP_EOL;
    }
}

$base = Child::getSelf();
$child = Child::getInstance();

$child->selfFoo();
$child->staticFoo();
$child->thisFoo();

 

程序輸出結果如下:

Base constructor!
Child constructor!
Base Foo!
Child Foo!
Child Foo!

 

在函數引用上, selfstatic 的區別是:對於靜態成員函數, self 指向代碼當前類, static 指向調用類;對於非靜態成員函數, self 抑制多態,指向當前類的成員函數, static 等同於 this ,動態指向調用類的函數。

parentselfstatic 三個關鍵字聯合在一起看挺有意思,分別指向父類、當前類、子類,有點“過去、現在、未來”的味道。

this

selfthis 是被討論最多,也是最容易引起誤用的組合。兩者的主要區別如下:

  1. this 不能用在靜態成員函數中, self 可以;
  2. 對靜態成員函數/變量的訪問, 建議 用 self ,不要用 $this::$this-> 的形式;
  3. 對非靜態成員變量的訪問,不能用 self ,只能用 this ;
  4. this 要在對象已經實例化的情況下使用, self 沒有此限制;
  5. 在非靜態成員函數內使用, self 抑制多態行為,引用當前類的函數;而 this 引用調用類的重寫(override)函數(如果有的話)。

self 的用途

看完與上述三個關鍵字的區別, self 的用途是不是呼之即出?一句話總結,那就是: self總是指向“當前類(及類實例)”。詳細說則是:

  1. 替代類名,引用當前類的靜態成員變量和靜態函數;
  2. 抑制多態行為,引用當前類的函數而非子類中覆蓋的實現;

槽點

  1. 這幾個關鍵字中,只有 this 要加 $ 符號且必須加,強迫症表示很難受;
  2. 靜態成員函數中不能通過 $this-> 調用非靜態成員函數,但是可以通過 self:: 調用,且在調用函數中未使用 $this-> 的情況下還能順暢運行。此行為貌似在不同PHP版本中表現不同,在當前的7.3中ok;
  3. 在靜態函數和非靜態函數中輸出 self ,猜猜結果是什麼?都是 string(4) "self" ,迷之輸出;
  4. return $this instanceof static::class; 會有語法錯誤,但是以下兩種寫法就正常:
    $class = static::class;
    return $this instanceof $class;
    // 或者這樣:
    return $this instanceof static;

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

串燒 JavaCAS相關知識

JMM與問題引入

為啥先說JMM,因為CAS的實現類中維護的變量都被volatile修飾, 這個volatile 是遵循JMM規範(不是百分百遵循,下文會說)實現的保證多線程併發訪問某個變量實現線程安全的手段

一連串的知識點慢慢縷

首先說什麼是JMM, JMM就是大家所說的java的內存模型, 它是人們在邏輯上做出的劃分, 或者可以將JMM當成是一種規範, 有哪些規範呢? 如下

  1. 可見性: 某一個線程對內存中的變量做出改動后,要求其他的線程在第一事件內馬上馬得到通知,在CAS的實現中, 可見性其實是通過不斷的while循環讀取而得到的通知, 而不是被動的得到通知
  2. 原子性: 線程在執行某個操作的時,要麼一起成功,要麼就一起失敗
  3. 有序性: 為了提高性能, 編譯器處理器會進行指令的重排序, 源碼-> 編譯器優化重排 -> 處理器優化重排 -> 內存系統重排 -> 最終執行的命令

JVM運行的實體是線程, 每一個線程在創建之後JVM都會為其創建一個工作空間, 這個工作空間是每一個線程之間的私有空間, 並且任何兩條線程之間的都不能直接訪問到對方的工作空間, 線程之間的通信,必須通過共享空間來中轉完成

JMM規定所有的變量全部存在主內存中,主內存是一塊共享空間,那麼如果某個線程相對主內存中共享變量做出修改怎麼辦呢? 像下面這樣:

  1. 將共享變量的副本拷貝到工作空間中
  2. 對變量進行賦值修改
  3. 將工作空間中的變量寫回到內存中

JMM還規定如下:

  • 任何線程在解鎖前必須將工作空間的共享變量立即刷新進內存中
  • 線程在加鎖前必須讀取主內存中的值更新到自己的工作空間中
  • 加鎖和解鎖是同一把鎖

問題引入

這時候如果多個線程併發按照上面的三步走去訪問主內存中的共享變量的話就會出現線程安全性的問題, 比如說 現在主內存中的共享變量是c=1, 有AB兩個線程去併發訪問這個c變量, 都想進行c++, 現在A將c拷貝到自己的工作空間進行c++, 於是c=2 , 於此同時線程B也進行c++, c在B的工作空間中=2, AB線程將結果寫回工作空間最終的結果就是2, 而不是我們預期的3

相信怎麼解決大家都知道, 就是使用JUC,中的原子類就能規避這個問題

而原子類的底層實現使用的就是CAS技術

什麼是CAS

CAS(compare and swap) 顧名思義: 比較和交換,在JUC中原子類的底層使用的都是CAS無鎖實現線程安全,是一門很炫的技術

如下面兩行代碼, 先比較再交換, 即: 如果從主內存中讀取到的值為4就將它更新為2019

        AtomicInteger atomicInteger = new AtomicInteger(4);
        atomicInteger.compareAndSet(4,2019);

跟進AtomicInteger的源碼如下, 底層維護着一個int 類型的 變量, (當然是因為我選擇的原來類是AtomicInteger類型), 並且這個int類型的值被 volatile 修飾

    private volatile int value;

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

什麼是volatile

volatile是JVM提供的輕量的同步機制, 為什麼是輕量界別呢? , 剛才在上面說了JMM規範中提到了三條特性, 而JVM提供的volatile僅僅滿足上面的規範中的 2/3, 如下:

  1. 保證可見性
  2. 不保證原子性
  3. 禁止指令重排序

單獨的volatile是不能滿足原子性的,即如下代碼在多線程併發訪問的情況下依然會出現線程安全性問題

private volatile int value;
 
public void add(){
  value++;   
}

那麼JUC的原子類是如何實現的 可以滿足原子性呢? 於是就不得不說本片博文的主角, CAS

CAS源碼跟進

我們跟進AtomicInteger中的先遞增再獲取的方法 incrementAndGet()

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

通過代碼我們看到調用了Unsafe類來實現

什麼是Unsafe類?

進入Unsafe類,可以看到他裏面存在大量的 native方法,這些native方法全部是空方法,

這個unsafe類其實相當於一個後門,他是java去訪問調用系統上 C C++ 函數類庫的方法 如下圖

繼續跟進這個方法incrementAndGet() 於是我們就來到了我們的主角方法, 關於這個方法倒是不難理解,主要是搞清楚方法中的var12345到底代表什麼就行, 如下代碼+註釋

var1: 上一個方法傳遞進來的: this,即當前對象
var2: 上一個方法傳遞進來的valueOffset, 就是內存地址偏移量
      通過這個內存地址偏移量我能精確的找到要操作的變量在內存中的地址
      
var4: 上一個方法傳遞進來的1, 就是每次增長的值
var5: 通過this和內存地址偏移量讀取出來的當前內存中的目標值
public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

注意它用的是while循環, 相對if(flag){} 這種寫法會多一次判斷, 整體的思路就是 在進行修改之前先進行一次比較,如果讀取到的當前值和預期值是相同的,就自增,否則的話就繼續輪詢修改

小總結

通過上面的過程, 其實就能總結出CAS的底層實現原理

  • volatile
  • 自旋鎖
  • unsafe類

補充: CAS通過Native方法的底層實現,本質上是操作系統層面上的CPU的併發原語,JVM會直接實現出彙編層面的指令,依賴於硬件去實現, 此外, 對於CPU的原語來說, 有兩條特性1,必定連續, 2.不被中斷

CAS的優缺點

優點:

它的底層我們看到了通過do-while 實現的自旋鎖來實現, 就省去了在多個線程之間進行切換所帶來的額外的上下文切換的開銷

缺點:

  1. 通過while循環不斷的嘗試獲取, 省去了上下文切換的開銷,但是佔用cpu的資源
  2. CAS只能保證一個共享變量的原子性, 如果存在多個共享變量的話不得不加鎖實現
  3. 存在ABA問題

ABA問題

什麼是ABA問題

我們這樣玩, 還是AB兩個線程, 給AtomicInteger賦初始值0

A線程中的代碼如下:

        Thread.sleep(3000);
        atomicInteger.compareAndSet(0,2019);

B線程中的代碼如下:

        atomicInteger.compareAndSet(0,1);
        atomicInteger.compareAndSet(1,0);

AB線程同時啟動, 雖然最終的結果A線程能成果的將值修改成2019,,但是它不能感知到在他睡眠過程中B線程對數據進行過改變, 換句話說就是A線程被B線程欺騙了

ABA問題的解決— AtomicStampedRefernce.java

帶時間戳的原子引用, 實現的機制就是通過 原子引用+版本號來完成, 每次對指定值的修改相應的版本號會加1, 實例如下

        // 0表示初始化, 1表示初始版本號
        AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(0, 1);
        reference.getStamp(); // 獲取版本號
        reference.attemptStamp(1,2); // 期待是1, 如果是1就更新為2

原子引用

JUC中我們可以找到像AtomicInteger這樣已經定義好了實現類, 但是JUC沒有給我們提供類似這樣 AtomicUser或者 AtomicProduct 這樣自定義類型的原子引用類型啊, 不過java仍然是提供了後門就是 原子引用類型

使用實例:

        User user  = getUserById(1);
        AtomicReference<User> userAtomicReference = new AtomicReference<User>();
        user.setUsername("張三");
        userAtomicReference.compareAndSet(user,user);

歡迎關注我, 會繼續更新筆記

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?

帶你漲姿勢的認識一下 Kafka 消費者

之前我們介紹過了 Kafka 整體架構,Kafka 生產者,Kafka 生產的消息最終流向哪裡呢?當然是需要消費了,要不只產生一系列數據沒有任何作用啊,如果把 Kafka 比作餐廳的話,那麼生產者就是廚師的角色,消費者就是客人,只有廚師的話,那麼炒出來的菜沒有人吃也沒有意義,如果只有客人沒有廚師的話,誰會去這個店吃飯呢?!所以如果你看完前面的文章意猶未盡的話,可以繼續讓你爽一爽。如果你沒看過前面的文章,那就從現在開始讓你爽。

Kafka 消費者概念

應用程序使用 KafkaConsumer 從 Kafka 中訂閱主題並接收來自這些主題的消息,然後再把他們保存起來。應用程序首先需要創建一個 KafkaConsumer 對象,訂閱主題並開始接受消息,驗證消息並保存結果。一段時間后,生產者往主題寫入的速度超過了應用程序驗證數據的速度,這時候該如何處理?如果只使用單個消費者的話,應用程序會跟不上消息生成的速度,就像多個生產者像相同的主題寫入消息一樣,這時候就需要多個消費者共同參与消費主題中的消息,對消息進行分流處理。

Kafka 消費者從屬於消費者群組。一個群組中的消費者訂閱的都是相同的主題,每個消費者接收主題一部分分區的消息。下面是一個 Kafka 分區消費示意圖

上圖中的主題 T1 有四個分區,分別是分區0、分區1、分區2、分區3,我們創建一個消費者群組1,消費者群組中只有一個消費者,它訂閱主題T1,接收到 T1 中的全部消息。由於一個消費者處理四個生產者發送到分區的消息,壓力有些大,需要幫手來幫忙分擔任務,於是就演變為下圖

這樣一來,消費者的消費能力就大大提高了,但是在某些環境下比如用戶產生消息特別多的時候,生產者產生的消息仍舊讓消費者吃不消,那就繼續增加消費者。

如上圖所示,每個分區所產生的消息能夠被每個消費者群組中的消費者消費,如果向消費者群組中增加更多的消費者,那麼多餘的消費者將會閑置,如下圖所示

向群組中增加消費者是橫向伸縮消費能力的主要方式。總而言之,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。這也是為什麼建議創建主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數量不應該比分區數多,因為多出來的消費者是空閑的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對於上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼就演變為下圖這樣

在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬於不同的應用。

總結起來就是如果應用需要讀取全量消息,那麼請為該應用設置一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裡增加消費者

消費者組和分區重平衡

消費者組是什麼

消費者組(Consumer Group)是由一個或多個消費者實例(Consumer Instance)組成的群組,具有可擴展性和可容錯性的一種機制。消費者組內的消費者共享一個消費者組ID,這個ID 也叫做 Group ID,組內的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區的消息,多餘的消費者會閑置,派不上用場。

我們在上面提到了兩種消費方式

  • 一個消費者群組消費一個主題中的消息,這種消費模式又稱為點對點的消費方式,點對點的消費方式又被稱為消息隊列
  • 一個主題中的消息被多個消費者群組共同消費,這種消費模式又稱為發布-訂閱模式

消費者重平衡

我們從上面的消費者演變圖中可以知道這麼一個過程:最初是一個消費者訂閱一個主題並消費其全部分區的消息,後來有一個消費者加入群組,隨後又有更多的消費者加入群組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區的所有權通過一個消費者轉到其他消費者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示

重平衡非常重要,它為消費者群組帶來了高可用性伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們並不希望發生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當分區被重新分配給另一個消費者時,消息當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。

消費者通過向組織協調者(Kafka Broker)發送心跳來維護自己是消費者組的一員並確認其擁有的分區。對於不同不的消費群體來說,其組織協調者可以是不同的。只要消費者定期發送心跳,就會認為消費者是存活的並處理其分區中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。

如果過了一段時間 Kafka 停止發送心跳了,會話(Session)就會過期,組織協調者就會認為這個 Consumer 已經死亡,就會觸發一次重平衡。如果消費者宕機並且停止發送消息,組織協調者會等待幾秒鐘,確認它死亡了才會觸發重平衡。在這段時間里,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協調者它要離開群組,組織協調者會觸發一次重平衡,盡量降低處理停頓。

重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現在社區還無法修改。

重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關於 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結束。Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機在後台自動發起並完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應用來說都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢……

創建消費者

上面的理論說的有點多,下面就通過代碼來講解一下消費者是如何消費的

在讀取消息之前,需要先創建一個 KafkaConsumer 對象。創建 KafkaConsumer 對象與創建 KafkaProducer 對象十分相似 — 把需要傳遞給消費者的屬性放在 properties 對象中,後面我們會着重討論 Kafka 的一些配置,這裏我們先簡單的創建一下,使用3個屬性就足矣,分別是 bootstrap.serverkey.deserializervalue.deserializer

這三個屬性我們已經用過很多次了,如果你還不是很清楚的話,可以參考

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬於哪個消費者群組。創建不屬於任何一個群組的消費者也是可以的

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主題訂閱

創建好消費者之後,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作為參數,使用起來比較簡單

consumer.subscribe(Collections.singletonList("customerTopic"));

為了簡單我們只訂閱了一個主題 customerTopic,參數傳入的是一個正則表達式,正則表達式可以匹配多個主題,如果有人創建了新的主題,並且主題的名字與正則表達式相匹配,那麼會立即觸發一次重平衡,消費者就可以讀取新的主題。

要訂閱所有與 test 相關的主題,可以這樣做

consumer.subscribe("test.*");

輪詢

我們知道,Kafka 是支持訂閱/發布模式的,生產者發送數據給 Kafka Broker,那麼消費者是如何知道生產者發送了數據呢?其實生產者產生的數據消費者是不知道的,KafkaConsumer 採用輪詢的方式定期去 Kafka Broker 中進行數據的檢索,如果有數據就用來消費,如果沒有就再繼續輪詢等待,下面是輪詢等待的具體實現

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}
  • 這是一個無限循環。消費者實際上是一個長期運行的應用程序,它通過輪詢的方式向 Kafka 請求數據。
  • 第三行代碼非常重要,Kafka 必須定期循環請求數據,否則就會認為該 Consumer 已經掛了,會觸發重平衡,它的分區會移交給群組中的其它消費者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,如果該參數被設置為 0 ,poll() 方法會立刻返回,否則就會在指定的毫秒數內一直等待 broker 返回數據。
  • poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區的信息、記錄在分區中的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理每條記錄。
  • 在退出應用程序之前使用 close() 方法關閉消費者。網絡連接和 socket 也會隨之關閉,並立即觸發一次重平衡,而不是等待群組協調器發現它不再發送心跳並認定它已經死亡。

線程安全性

在同一個群組中,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全的共享一個消費者。按照規則,一個消費者使用一個線程,如果一個消費者群組中多個消費者都想要運行的話,那麼必須讓每個消費者在自己的線程中運行,可以使用 Java 中的 ExecutorService 啟動多個消費者進行進行處理。

消費者配置

到目前為止,我們學習了如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費者相關的配置說明。大部分參數都有合理的默認值,一般不需要修改它們,下面我們就來介紹一下這些參數。

  • fetch.min.bytes

該屬性指定了消費者從服務器獲取記錄的最小字節數。broker 在收到消費者的數據請求時,如果可用的數據量小於 fetch.min.bytes 指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數據,但消費者的 CPU 使用率很高,那麼就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值調大可以降低 broker 的工作負載。

  • fetch.max.wait.ms

我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時才會把它返回給消費者。而 fetch.max.wait.ms 則用於指定 broker 的等待時間,默認是 500 毫秒。如果沒有足夠的數據流入 kafka 的話,消費者獲取的最小數據量要求就得不到滿足,最終導致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數值設置的小一些。如果 fetch.max.wait.ms 被設置為 100 毫秒的延遲,而 fetch.min.bytes 的值設置為 1MB,那麼 Kafka 在收到消費者請求后,要麼返回 1MB 的數據,要麼在 100 ms 后返回所有可用的數據。就看哪個條件首先被滿足。

  • max.partition.fetch.bytes

該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節。如果一個主題有20個分區和5個消費者,那麼每個消費者需要至少4 MB的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組裡有消費者發生崩潰,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節數(通過 max.message.size 屬性配置大),否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。 在設置該屬性時,另外一個考量的因素是消費者處理數據的時間。消費者需要頻繁的調用 poll() 方法來避免會話過期和發生分區再平衡,如果單次調用poll() 返回的數據太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

  • session.timeout.ms

這個屬性指定了消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內發送心跳給群組協調器,就會被認定為死亡,協調器就會觸發重平衡。把它的分區分配給消費者群組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向群組協調器發送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設置的比默認值小,可以更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能導致非預期的重平衡。把該屬性的值設置得大一些,可以減少意外的重平衡,不過檢測節點崩潰需要更長的時間。

  • auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下的該如何處理。它的默認值是 latest,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分區的記錄。

  • enable.auto.commit

我們稍後將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,為了盡量避免出現重複數據和數據丟失,可以把它設置為 false,由自己控制何時提交偏移量。如果把它設置為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率

  • partition.assignment.strategy

我們知道,分區會分配給群組中的消費者。PartitionAssignor 會根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者,Kafka 有兩個默認的分配策略RangeRoundRobin

  • client.id

該屬性可以是任意字符串,broker 用他來標識從客戶端發送過來的消息,通常被用在日誌、度量指標和配額中

  • max.poll.records

該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢中需要處理的數據量。

  • receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數據時用到的 TCP 緩衝區也可以設置大小。如果它們被設置為 -1,就使用操作系統默認值。如果生產者或消費者與 broker 處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費者在每次調用poll() 方法進行定時輪詢的時候,會返回由生產者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被群組裡的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤消息在分區中的位置(偏移量)

消費者會向一個叫做 _consumer_offset 的特殊主題中發送消息,這個主題會保存每次所發送消息中的分區偏移量,這個主題的主要作用就是消費者觸發重平衡後記錄偏移使用的,消費者每次向這個主題發送消息,正常情況下不觸發重平衡,這個主題是不起作用的,當觸發重平衡后,消費者停止工作,每個消費者可能會分到對應的分區,這個主題就是讓消費者能夠繼續處理消息所設置的。

如果提交的偏移量小於客戶端最後一次處理的偏移量,那麼位於兩個偏移量之間的消息就會被重複處理

如果提交的偏移量大於最後一次消費時的偏移量,那麼處於兩個偏移量中間的消息將會丟失

既然_consumer_offset 如此重要,那麼它的提交方式是怎樣的呢?下面我們就來說一下

提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設置為true,那麼每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者里的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那麼就會提交從上一次輪詢中返回的偏移量。

提交當前偏移量

auto.commit.offset 設置為 false,可以讓應用程序決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄后要確保調用了 commitSync(),否則還是會有丟失消息的風險,如果發生了在均衡,從最近一批消息到發生在均衡之間的所有消息都將被重複處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區別在於異步提交不會進行重試,同步提交會一致進行重試。

同步和異步組合提交

一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。但是如果在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。

因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量

提交特定的偏移量

消費者API允許調用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

文章參考:

《極客時間-Kafka核心技術與實戰》

《Kafka 權威指南》

關注公眾號獲取更多優質电子書,關注一下你就知道資源是有多好了

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

特斯拉發表電動卡車 Cybertruck,一台從科幻電影走出來的鋼鐵車

今天的 Elon Musk 看起來不像鋼鐵人,更像蝙蝠俠,因為他們的新車 Cybertruck,不但外型酷似蝙蝠車,同時還能防彈防撞,並且擁有超越保時捷的加速度,跟勝過市面上卡車的拖吊能力,更驚人的是,售價只要 39,900 美元起。

眾所期待的特斯拉新車 Cybertruck 今日正式發表,和之前流出的影像不同,Cybertruck 酷似隱形戰機 F-117 的設計,讓人聯想到蝙蝠車,甚至懷疑這是不是一台防雷達偵測的戰車?

Tesla Cybertruck 全車採用冷鑄鋼板,能夠抵擋 9mm 口徑手槍的射擊,現場展示用重鎚敲擊也毫髮無傷;車窗玻璃同樣採用防彈設計,然而有趣的是,現場展示時,被大鐵球砸出了一片雪花。「至少,它沒被打穿,你坐在裡面很安全。」Elon Musk 笑著說。

Cybertruck 為了因應負重,搭載了適應性氣壓懸吊系統,針對高速公路,或是越野泥巴路,能夠自動調整懸吊高度,同時也順便使用這個氣壓系統,做了一個高壓出力裝置,使用者可以自行加裝不同氣壓工具,像是高壓水槍或是電鑽等。

當重裝電動機車開上後廂時,懸吊系統會自動調整車尾高度,讓車身保持平衡。

車尾與其他皮卡車開放式貨斗不同,Cybertruck 採用封閉式貨斗,並有升降式尾門,現場展示時,將這台電動機車 ATV 直接騎上貨斗後,還能直接充電,顯然是在致敬蝙蝠車跟蝙蝠機車。

Cybertruck 如同其他皮卡車,車尾裝有釣鉤,能夠充當拖車使用,而歸功於它的強力馬達,拖車能力屌打了皮卡車霸主 Ford F-150,在現場展示的影片中,特斯拉讓 Cybertruck 跟 F-150 互相拖住對方,進行拔河測試,結果 F-150 整台被 Cybertruck 拖走。

F-150 慘遭 Cybertruck 拖走。

馬斯克強調,一般皮卡車需要另外裝載發電機才能使用電動工具,Cybertruck 直接提供了電源,因此省下不少空間,同時還提供強大的拖力。

此外,做為一台卡車,Cybertruck 莫名其妙地擁有超越保時捷的加速度,根據現場公布數據,最頂級版的 0-100 公里加速時間不到 3 秒。現場展示了 Cybertruck 與 Porsche 911 賽跑的影片,起步雖然小輸一點,但隨後就超越了 911。

現場展示競速影片,大約 1 秒後,Cybertruck 就超過了 911。

Tesla Cybertruck 共有 3 種版本,依照馬達數量來分別,最低價 39,900 美元起,最高 69,900 美元。Cybertruck 從今天起在美國開放預購,實際交車時間預計要等到 2021 年底。頂級的三馬達款,更預計要等到 2022 年底才會開始生產。

如同馬斯克開場所說,卡車在過去幾十年來都長得差不多,特斯拉要打造一台完全不一樣的卡車,同時還要保持零排放,跟超高性能,從今天的現場展示來看,特斯拉再次完成一個不可能的任務。在興奮之餘也別忘了,這一切都是現場展示,實際上如何,就有待實際交車後驗證了!

(合作媒體:。圖片來源:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

自己動手實現分佈式任務調度框架(續)

  之前寫過一篇:本來是用來閑來分享一下自己的思維方式,時至今日發現居然有些人正在使用了,本着對代碼負責人的態度,對代碼部分已知bug進行了修改,並增加了若干功能,如立即啟動,實時停止等功能,新增加的功能會在這一篇做詳細的說明。

  提到分佈式任務調度,市面上本身已經有一些框架工具可以使用,但是個人覺得功能做的都太豐富,架構都過於複雜,所以才有了我重複造輪子。個人喜歡把複雜的問題簡單化,利用有限的資源實現竟可能多的功能。因為有幾個朋友問部署方式,這裏再次強調下:我的這個服務可以直接打成jar放在自己本地倉庫,然後依賴進去,或者直接copy代碼過去,當成自己項目的一部分就可以了。也就是說跟隨你們自己的項目啟動,所以我這裏也沒有寫界面。下面先談談怎麼基於上次的代碼實現任務立即啟動吧!

  調度和自己服務整合後部署圖抽象成如下:

  

 

 

   用戶在前端點擊立即請求按鈕,通過各種負載均衡軟件或者設備,到達某台機器的某個帶有本調度框架的服務,然後進行具體的執行,也就是說這個立即啟動就是一個最常見最簡單的請求,沒有過多複雜的問題(比如多節點會不會重複執行這些)。最簡單的辦法,當用戶請求過來直接用一個線程或者線程池執行用戶點的那個任務的邏輯代碼就行了,當然我這裏沒有那麼粗暴,現有的調度代碼資源如下:

package com.rdpaas.task.scheduler;

import com.rdpaas.task.common.Invocation;
import com.rdpaas.task.common.Node;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.common.Task;
import com.rdpaas.task.common.TaskDetail;
import com.rdpaas.task.common.TaskStatus;
import com.rdpaas.task.config.EasyJobConfig;
import com.rdpaas.task.repository.NodeRepository;
import com.rdpaas.task.repository.TaskRepository;
import com.rdpaas.task.strategy.Strategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 任務調度器
 * @author rongdi
 * @date 2019-03-13 21:15
 */
@Component
public class TaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private NodeRepository nodeRepository;

    @Autowired
    private EasyJobConfig config;

    /**
     * 創建任務到期延時隊列
      */
    private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();

    /**
     * 可以明確知道最多只會運行2個線程,直接使用系統自帶工具就可以了
     */
    private ExecutorService bossPool = Executors.newFixedThreadPool(2);

    /**
     * 正在執行的任務的Future
     */
    private Map<Long,Future> doingFutures = new HashMap<>();

    /**
     * 聲明工作線程池
     */
    private ThreadPoolExecutor workerPool;
    
    /**
     * 獲取任務的策略
     */
    private Strategy strategy;


    @PostConstruct
    public void init() {
        /**
         * 根據配置選擇一個節點獲取任務的策略
         */
        strategy = Strategy.choose(config.getNodeStrategy());
        /**
         * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,並且等待隊列滿后
         * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閑60s后自動回收.自定義線程池是因為Executors那幾個線程工具
         * 各有各的弊端,不適合生產使用
         */
        workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
        /**
         * 執行待處理任務加載線程
         */
        bossPool.execute(new Loader());
        /**
         * 執行任務調度線程
         */
        bossPool.execute(new Boss());
    
    }

    class Loader implements Runnable {

        @Override
        public void run() {
            for(;;) {
                try { 
                    /**
                     * 先獲取可用的節點列表
                     */
                    List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                    if(nodes == null || nodes.isEmpty()) {
                        continue;
                    }
                    /**
                     * 查找還有指定時間(單位秒)才開始的主任務列表
                     */
                    List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration());
                    if(tasks == null || tasks.isEmpty()) {
                        continue;
                    }
                    for(Task task:tasks) {
                        
                        boolean accept = strategy.accept(nodes, task, config.getNodeId());
                        /**
                         * 不該自己拿就不要搶
                         */
                        if(!accept) {
                            continue;
                        }
                        /**
                         * 先設置成待執行
                         */
                        task.setStatus(TaskStatus.PENDING);
                        task.setNodeId(config.getNodeId());
                        /**
                         * 使用樂觀鎖嘗試更新狀態,如果更新成功,其他節點就不會更新成功。如果其它節點也正在查詢未完成的
                         * 任務列表和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不一樣了,這裏更新
                         * 必然會返回0了
                         */
                        int n = taskRepository.updateWithVersion(task);
                        Date nextStartTime = task.getNextStartTime();
                        if(n == 0 || nextStartTime == null) {
                            continue;
                        }
                        /**
                         * 封裝成延時對象放入延時隊列,這裏再查一次是因為上面樂觀鎖已經更新了版本,會導致後面結束任務更新不成功
                         */
                        task = taskRepository.get(task.getId());
                        DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                        taskQueue.offer(delayItem);
                        
                    }
                    Thread.sleep(config.getFetchPeriod());
                } catch(Exception e) {
                    logger.error("fetch task list failed,cause by:{}", e);
                }
            }
        }
        
    }
    
    class Boss implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                     /**
                     * 時間到了就可以從延時隊列拿出任務對象,然後交給worker線程池去執行
                     */
                    DelayItem<Task> item = taskQueue.take();
                    if(item != null && item.getItem() != null) {
                        Task task = item.getItem();
                        /**
                         * 真正開始執行了設置成執行中
                         */
                        task.setStatus(TaskStatus.DOING);
                        /**
                         * loader線程中已經使用樂觀鎖控制了,這裏沒必要了
                         */
                        taskRepository.update(task);
                        /**
                         * 提交到線程池
                         */
                        Future future = workerPool.submit(new Worker(task));
                        /**
                         * 暫存在doingFutures
                         */
                        doingFutures.put(task.getId(),future);
                    }
                     
                } catch (Exception e) {
                    logger.error("fetch task failed,cause by:{}", e);
                }
            }
        }

    }

    class Worker implements Callable<String> {

        private Task task;

        public Worker(Task task) {
            this.task = task;
        }

        @Override
        public String call() {
            logger.info("Begin to execute task:{}",task.getId());
            TaskDetail detail = null;
            try {
                //開始任務
                detail = taskRepository.start(task);
                if(detail == null) return null;
                //執行任務
                task.getInvokor().invoke();
                //完成任務
                finish(task,detail);
                logger.info("finished execute task:{}",task.getId());
                /**
                 * 執行完后刪了
                 */
                doingFutures.remove(task.getId());
            } catch (Exception e) {
                logger.error("execute task:{} error,cause by:{}",task.getId(), e);
                try {
                    taskRepository.fail(task,detail,e.getCause().getMessage());
                } catch(Exception e1) {
                    logger.error("fail task:{} error,cause by:{}",task.getId(), e);
                }
            }
            return null;
        }

    }

    /**
     * 完成子任務,如果父任務失敗了,子任務不會執行
     * @param task
     * @param detail
     * @throws Exception
     */
    private void finish(Task task,TaskDetail detail) throws Exception {

        //查看是否有子類任務
        List<Task> childTasks = taskRepository.getChilds(task.getId());
        if(childTasks == null || childTasks.isEmpty()) {
            //當沒有子任務時完成父任務
            taskRepository.finish(task,detail);
            return;
        } else {
            for (Task childTask : childTasks) {
                //開始任務
                TaskDetail childDetail = null;
                try {
                    //將子任務狀態改成執行中
                    childTask.setStatus(TaskStatus.DOING);
                    childTask.setNodeId(config.getNodeId());
                    //開始子任務
                    childDetail = taskRepository.startChild(childTask,detail);
                    //使用樂觀鎖更新下狀態,不然這裏可能和恢複線程產生併發問題
                    int n = taskRepository.updateWithVersion(childTask);
                    if (n > 0) {
                        //再從數據庫取一下,避免上面update修改后version不同步
                        childTask = taskRepository.get(childTask.getId());
                        //執行子任務
                        childTask.getInvokor().invoke();
                        //完成子任務
                        finish(childTask, childDetail);
                    }
                } catch (Exception e) {
                    logger.error("execute child task error,cause by:{}", e);
                    try {
                        taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
                    } catch (Exception e1) {
                        logger.error("fail child task error,cause by:{}", e);
                    }
                }
            }
            /**
             * 當有子任務時完成子任務后再完成父任務
             */
            taskRepository.finish(task,detail);

        }

    }

    /**
     * 添加任務
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addTask(String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        return taskRepository.insert(task);
    }

    /**
     * 添加子任務
     * @param pid
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        task.setPid(pid);
        return taskRepository.insert(task);
    }

   
}

  上面主要就是三組線程,Loader負責加載將要執行的任務放入本地的任務隊列,Boss線程負責取出任務隊列的任務,然後分配Worker線程池的一個線程去執行。由上面的代碼可以看到如果要立即執行,其實只需要把一個延時為0的任務放入任務隊列,等着Boss線程去取然後分配給worker執行就可以實現了,代碼如下:

    /**
     * 立即執行任務,就是設置一下延時為0加入任務隊列就好了,這個可以外部直接調用
     * @param taskId
     * @return
     */
    public boolean startNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        task.setStatus(TaskStatus.DOING);
        taskRepository.update(task);
        DelayItem<Task> delayItem = new DelayItem<Task>(0L, task);
        return taskQueue.offer(delayItem);
    }

  啟動不用再多說,下面介紹一下停止任務,根據面向對象的思維,用戶要想停止一個任務,最終執行停止任務的就是正在執行任務的那個節點。停止任務有兩種情況,第一種任務沒有正在運行如何停止,第二種是任務正在運行如何停止。第一種其實直接改變一下任務對象的狀態為停止就行了,不必多說。下面主要考慮如何停止正在運行的任務,細心的朋友可能已經發現上面代碼和之前那一篇代碼有點區別,之前用的Runnble作為線程實現接口,這個用了Callable,其實在java中停止線程池中正在運行的線程最常用的就是直接調用future的cancel方法了,要想獲取到這個future對象就需要將以前實現Runnbale改成實現Callable,然後提交到線程池由execute改成submit就可以了,然後每次提交到線程池得到的future對象使用taskId一起保存在一個map中,方便根據taskId隨時找到。當然任務執行完后要及時刪除這個map里的任務,以免常駐其中導致內存溢出。停止任務的請求流程如下

  

 

 

  圖還是原來的圖,但是這時候情況不一樣了,因為停止任務的時候假如當前正在執行這個任務的節點處於服務1,負載均衡是不知道要去把你引到服務1的,他可能會引入到服務2,那就悲劇了,所以通用的做法就是停止請求過來不管落到哪個節點上,那個節點就往一個公用的mq上發一個帶有停止任務業務含義的消息,各個節點訂閱這個消息,然後判斷都判斷任務在不在自己這裏執行,如果在就執行停止操作。但是這樣勢必讓我們的調度服務又要依賴一個外部的消息隊列服務,就算很方便的就可以引入一個外部的消息隊列,但是你真的可以駕馭的了嗎,消息丟了咋辦,重複發送了咋辦,消息服務掛了咋辦,網絡斷了咋辦,又引入了一大堆問題,那我是不是又要寫n篇文章來分別解決這些問題。往往現實卻是就是這麼殘酷,你解決了一個問題,引入了更多的問題,這就是為什麼bug永遠改不完的道理了。當然這不是我的風格,我的風格是利用有限的資源做盡可能多的事情(可能是由於我工作的企業都是那種資源貧瘠的,養成了我這種習慣,土豪公司的程序員請繞道,哈哈)。

  簡化一下問題:目前的問題就是如何讓正在執行任務的節點知道,然後停止正在執行的這個任務,其實就是這個停止通知如何實現。這不免讓我想起了12306網站上買票,其實我們作為老百姓多麼希望12306可以在有票的時候發個短信通知一下我們,然後我們上去搶,但是現實卻是,你要麼使用軟件一直刷,要麼是自己隔一段時間上去瞄一下有沒有票。如果把有票了給我們發短信通知定義為異步通知,那麼這種我們要隔一段時間自己去瞄一下的方式就是同步輪訓。這兩種方式都能達到告知的目的,關鍵的區別在於你到底有沒有時間去一直去瞄,不過相比於可以回家,這些時間都是值得的。個人認為軟件的設計其實就是一個權衡是否值得的過程。如果約定了不使用外部消息隊列這種異步通知的方式,那麼我們只能使用同步輪訓的方式了。不過正好我們的任務調度本身已經有一個心跳機制,沒隔一段時間就去更新一下節點狀態,如果我們把用戶的停止請求作為命令信息更新到每個節點的上,然後隨着心跳獲取到這個節點的信息,然後判斷這個命令,做相應的處理是不是就可以完美解決這個問題。值得嗎?很明顯是值得的,我們只是在心跳邏輯上加一個小小的副作用就實現了通知功能了。代碼如下

package com.rdpaas.task.common;

/**
 * @author rongdi
 * @date 2019/11/26
 */
public enum NotifyCmd {

    //沒有通知,默認狀態
    NO_NOTIFY(0),
    //開啟任務(Task)
    START_TASK(1),
    //修改任務(Task)
    EDIT_TASK(2),
    //停止任務(Task)
    STOP_TASK(3);

    int id;

    NotifyCmd(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public static NotifyCmd valueOf(int id) {
        switch (id) {
            case 1:
                return START_TASK;
            case 2:
                return EDIT_TASK;
            case 3:
                return STOP_TASK;
            default:
                return NO_NOTIFY;
        }
    }

}
package com.rdpaas.task.handles;

import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.utils.SpringContextUtil;

/**
 * @author: rongdi
 * @date:
 */
public interface NotifyHandler<T> {

    static NotifyHandler chooseHandler(NotifyCmd notifyCmd) {
        return SpringContextUtil.getByTypeAndName(NotifyHandler.class,notifyCmd.toString());
    }

    public void update(T t);

}
package com.rdpaas.task.handles;

import com.rdpaas.task.scheduler.TaskExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: rongdi
 * @date:
 */
@Component("STOP_TASK")
public class StopTaskHandler implements NotifyHandler<Long> {

    @Autowired
    private TaskExecutor taskExecutor;

    @Override
    public void update(Long taskId) {
        taskExecutor.stop(taskId);
    }

}
class HeartBeat implements Runnable {
        @Override
        public void run() {
            for(;;) {
                try {
                    /**
                     * 時間到了就可以從延時隊列拿出節點對象,然後更新時間和序號,
                     * 最後再新建一個超時時間為心跳時間的節點對象放入延時隊列,形成循環的心跳
                     */
                    DelayItem<Node> item = heartBeatQueue.take();
                    if(item != null && item.getItem() != null) {
                        Node node = item.getItem();
                        handHeartBeat(node);
                    }
                    heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId())));
                } catch (Exception e) {
                    logger.error("task heart beat error,cause by:{} ",e);
                }
            }
        }
    }

    /**
     * 處理節點心跳
     * @param node
     */
    private void handHeartBeat(Node node) {
        if(node == null) {
            return;
        }
        /**
         * 先看看數據庫是否存在這個節點
         * 如果不存在:先查找下一個序號,然後設置到node對象中,最後插入
         * 如果存在:直接根據nodeId更新當前節點的序號和時間
         */
        Node currNode= nodeRepository.getByNodeId(node.getNodeId());
        if(currNode == null) {
            node.setRownum(nodeRepository.getNextRownum());
            nodeRepository.insert(node);
        } else  {
            nodeRepository.updateHeartBeat(node.getNodeId());
            NotifyCmd cmd = currNode.getNotifyCmd();
            String notifyValue = currNode.getNotifyValue();
            if(cmd != null && cmd != NotifyCmd.NO_NOTIFY) {
                /**
                 * 藉助心跳做一下通知的事情,比如及時停止正在執行的任務
                 * 根據指令名稱查找Handler
                 */
                NotifyHandler handler = NotifyHandler.chooseHandler(currNode.getNotifyCmd());
                if(handler == null || StringUtils.isEmpty(notifyValue)) {
                    return;
                }
                /**
                 * 執行操作
                 */
                handler.update(Long.valueOf(notifyValue));
            }
            
        }


    }

  最終的任務調度代碼如下:

package com.rdpaas.task.scheduler;

import com.rdpaas.task.common.Invocation;
import com.rdpaas.task.common.Node;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.common.Task;
import com.rdpaas.task.common.TaskDetail;
import com.rdpaas.task.common.TaskStatus;
import com.rdpaas.task.config.EasyJobConfig;
import com.rdpaas.task.repository.NodeRepository;
import com.rdpaas.task.repository.TaskRepository;
import com.rdpaas.task.strategy.Strategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 任務調度器
 * @author rongdi
 * @date 2019-03-13 21:15
 */
@Component
public class TaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private NodeRepository nodeRepository;

    @Autowired
    private EasyJobConfig config;

    /**
     * 創建任務到期延時隊列
      */
    private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();

    /**
     * 可以明確知道最多只會運行2個線程,直接使用系統自帶工具就可以了
     */
    private ExecutorService bossPool = Executors.newFixedThreadPool(2);

    /**
     * 正在執行的任務的Future
     */
    private Map<Long,Future> doingFutures = new HashMap<>();

    /**
     * 聲明工作線程池
     */
    private ThreadPoolExecutor workerPool;
    
    /**
     * 獲取任務的策略
     */
    private Strategy strategy;


    @PostConstruct
    public void init() {
        /**
         * 根據配置選擇一個節點獲取任務的策略
         */
        strategy = Strategy.choose(config.getNodeStrategy());
        /**
         * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,並且等待隊列滿后
         * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閑60s后自動回收.自定義線程池是因為Executors那幾個線程工具
         * 各有各的弊端,不適合生產使用
         */
        workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
        /**
         * 執行待處理任務加載線程
         */
        bossPool.execute(new Loader());
        /**
         * 執行任務調度線程
         */
        bossPool.execute(new Boss());
    
    }

    class Loader implements Runnable {

        @Override
        public void run() {
            for(;;) {
                try { 
                    /**
                     * 先獲取可用的節點列表
                     */
                    List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                    if(nodes == null || nodes.isEmpty()) {
                        continue;
                    }
                    /**
                     * 查找還有指定時間(單位秒)才開始的主任務列表
                     */
                    List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration());
                    if(tasks == null || tasks.isEmpty()) {
                        continue;
                    }
                    for(Task task:tasks) {
                        
                        boolean accept = strategy.accept(nodes, task, config.getNodeId());
                        /**
                         * 不該自己拿就不要搶
                         */
                        if(!accept) {
                            continue;
                        }
                        /**
                         * 先設置成待執行
                         */
                        task.setStatus(TaskStatus.PENDING);
                        task.setNodeId(config.getNodeId());
                        /**
                         * 使用樂觀鎖嘗試更新狀態,如果更新成功,其他節點就不會更新成功。如果其它節點也正在查詢未完成的
                         * 任務列表和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不一樣了,這裏更新
                         * 必然會返回0了
                         */
                        int n = taskRepository.updateWithVersion(task);
                        Date nextStartTime = task.getNextStartTime();
                        if(n == 0 || nextStartTime == null) {
                            continue;
                        }
                        /**
                         * 封裝成延時對象放入延時隊列,這裏再查一次是因為上面樂觀鎖已經更新了版本,會導致後面結束任務更新不成功
                         */
                        task = taskRepository.get(task.getId());
                        DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                        taskQueue.offer(delayItem);
                        
                    }
                    Thread.sleep(config.getFetchPeriod());
                } catch(Exception e) {
                    logger.error("fetch task list failed,cause by:{}", e);
                }
            }
        }
        
    }
    
    class Boss implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                     /**
                     * 時間到了就可以從延時隊列拿出任務對象,然後交給worker線程池去執行
                     */
                    DelayItem<Task> item = taskQueue.take();
                    if(item != null && item.getItem() != null) {
                        Task task = item.getItem();
                        /**
                         * 真正開始執行了設置成執行中
                         */
                        task.setStatus(TaskStatus.DOING);
                        /**
                         * loader線程中已經使用樂觀鎖控制了,這裏沒必要了
                         */
                        taskRepository.update(task);
                        /**
                         * 提交到線程池
                         */
                        Future future = workerPool.submit(new Worker(task));
                        /**
                         * 暫存在doingFutures
                         */
                        doingFutures.put(task.getId(),future);
                    }
                     
                } catch (Exception e) {
                    logger.error("fetch task failed,cause by:{}", e);
                }
            }
        }

    }

    class Worker implements Callable<String> {

        private Task task;

        public Worker(Task task) {
            this.task = task;
        }

        @Override
        public String call() {
            logger.info("Begin to execute task:{}",task.getId());
            TaskDetail detail = null;
            try {
                //開始任務
                detail = taskRepository.start(task);
                if(detail == null) return null;
                //執行任務
                task.getInvokor().invoke();
                //完成任務
                finish(task,detail);
                logger.info("finished execute task:{}",task.getId());
                /**
                 * 執行完后刪了
                 */
                doingFutures.remove(task.getId());
            } catch (Exception e) {
                logger.error("execute task:{} error,cause by:{}",task.getId(), e);
                try {
                    taskRepository.fail(task,detail,e.getCause().getMessage());
                } catch(Exception e1) {
                    logger.error("fail task:{} error,cause by:{}",task.getId(), e);
                }
            }
            return null;
        }

    }

    /**
     * 完成子任務,如果父任務失敗了,子任務不會執行
     * @param task
     * @param detail
     * @throws Exception
     */
    private void finish(Task task,TaskDetail detail) throws Exception {

        //查看是否有子類任務
        List<Task> childTasks = taskRepository.getChilds(task.getId());
        if(childTasks == null || childTasks.isEmpty()) {
            //當沒有子任務時完成父任務
            taskRepository.finish(task,detail);
            return;
        } else {
            for (Task childTask : childTasks) {
                //開始任務
                TaskDetail childDetail = null;
                try {
                    //將子任務狀態改成執行中
                    childTask.setStatus(TaskStatus.DOING);
                    childTask.setNodeId(config.getNodeId());
                    //開始子任務
                    childDetail = taskRepository.startChild(childTask,detail);
                    //使用樂觀鎖更新下狀態,不然這裏可能和恢複線程產生併發問題
                    int n = taskRepository.updateWithVersion(childTask);
                    if (n > 0) {
                        //再從數據庫取一下,避免上面update修改后version不同步
                        childTask = taskRepository.get(childTask.getId());
                        //執行子任務
                        childTask.getInvokor().invoke();
                        //完成子任務
                        finish(childTask, childDetail);
                    }
                } catch (Exception e) {
                    logger.error("execute child task error,cause by:{}", e);
                    try {
                        taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
                    } catch (Exception e1) {
                        logger.error("fail child task error,cause by:{}", e);
                    }
                }
            }
            /**
             * 當有子任務時完成子任務后再完成父任務
             */
            taskRepository.finish(task,detail);

        }

    }

    /**
     * 添加任務
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addTask(String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        return taskRepository.insert(task);
    }

    /**
     * 添加子任務
     * @param pid
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        task.setPid(pid);
        return taskRepository.insert(task);
    }

    /**
     * 立即執行任務,就是設置一下延時為0加入任務隊列就好了,這個可以外部直接調用
     * @param taskId
     * @return
     */
    public boolean startNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        task.setStatus(TaskStatus.DOING);
        taskRepository.update(task);
        DelayItem<Task> delayItem = new DelayItem<Task>(0L, task);
        return taskQueue.offer(delayItem);
    }

    /**
     * 立即停止正在執行的任務,留給外部調用的方法
     * @param taskId
     * @return
     */
    public boolean stopNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        if(task == null) {
            return false;
        }
        /**
         * 該任務不是正在執行,直接修改task狀態為已完成即可
         */
        if(task.getStatus() != TaskStatus.DOING) {
            task.setStatus(TaskStatus.STOP);
            taskRepository.update(task);
            return true;
        }
        /**
         * 該任務正在執行,使用節點配合心跳發布停用通知
         */
        int n = nodeRepository.updateNotifyInfo(NotifyCmd.STOP_TASK,String.valueOf(taskId));
        return n > 0;
    }

    /**
     * 立即停止正在執行的任務,這個不需要自己調用,是給心跳線程調用
     * @param taskId
     * @return
     */
    public boolean stop(Long taskId) {
        Task task = taskRepository.get(taskId);
        /**
         * 不是自己節點的任務,本節點不能執行停用
         */
        if(task == null || !config.getNodeId().equals(task.getNodeId())) {
            return false;
        }
        /**
         * 拿到正在執行任務的future,然後強制停用,並刪除doingFutures的任務
         */
        Future future = doingFutures.get(taskId);
        boolean flag =  future.cancel(true);
        if(flag) {
            doingFutures.remove(taskId);
            /**
             * 修改狀態為已停用
             */
            task.setStatus(TaskStatus.STOP);
            taskRepository.update(task);
        }
        /**
         * 重置通知信息,避免重複執行停用通知
         */
        nodeRepository.resetNotifyInfo(NotifyCmd.STOP_TASK);
        return flag;
    }
}

  好吧,其實實現很簡單,關鍵在於思路,不BB了,詳細代碼見: 在下告辭!

  

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?