微服務中的Kafka與Micronaut

今天,我們將通過Apache Kafka主題構建一些彼此異步通信的微服務。我們使用Micronaut框架,它為與Kafka集成提供專門的庫。讓我們簡要介紹一下示例係統的體繫結構。我們有四個微型服務:訂單服務行程服務司機服務乘客服務這些應用程序的實現非常簡單。它們都有內存存儲,並連接到同一個Kafka實例。

我們系統的主要目標是為客戶安排行程。訂單服務應用程序還充當網關。它接收來自客戶的請求,保存歷史記錄並將事件發送到orders主題。所有其他微服務都在監聽orders這個主題,並處理order-service發送的訂單。每個微服務都有自己的專用主題,其中發送包含更改信息的事件。此類事件由其他一些微服務接收。架構如下圖所示。

在閱讀本文之前,有必要熟悉一下Micronaut框架。您可以閱讀之前的一篇文章,該文章描述了通過REST API構建微服務通信的過程:。

1 運行Kafka

要在本地機器上運行Apache Kafka,我們可以使用它的Docker映像。最新的鏡像是由共享的。在啟動Kafka容器之前,我們必須啟動kafka所用使用的ZooKeeper服務器。如果在Windows上運行Docker,其虛擬機的默認地址是192.168.99.100它還必須設置為Kafka容器的環境。

ZookeeperKafka容器都將在同一個網絡中啟動。在docker中運行Zookeeper以zookeeper的名稱提供服務,並在暴露2181端口。Kafka容器需要在環境變量使用KAFKA_ZOOKEEPER_CONNECT的地址。

$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka

2 添加micronaut-kafka依賴

使用Kafka構建的microaut應用程序可以在HTTP服務器存在的情況下啟動,也可以在不存在HTTP服務器的情況下啟動。要啟用Micronaut Kafka,需要添加micronaut-kafka庫到依賴項。如果您想暴露HTTP API,您還應該添加micronaut-http-server-netty:

<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

3 構建訂單微服務

訂單微服務是唯一一個啟動嵌入式HTTP服務器並暴露REST API的應用程序。這就是為什麼我們可以為Kafka提供內置Micronaut健康檢查。要做到這一點,我們首先應該添加micronaut-management依賴:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-management</artifactId>
</dependency>

為了方便起見,我們將通過在application.yml中定義以下配置來啟用所有管理端點並禁用它們的HTTP身份驗證。

endpoints:
  all:
    enabled: true
    sensitive: false

現在,可以在地址欄下使用health check我們的示例應用程序還將暴露添加新訂單列出所有以前創建的訂單的簡單REST API下面是暴露這些端點的Micronaut控制器實現:

@Controller("orders")
public class OrderController {

    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderClient client;

    @Post
    public Order add(@Body Order order) {
        order = repository.add(order);
        client.send(order);
        return order;
    }

    @Get
    public Set<Order> findAll() {
        return repository.findAll();
    }

}

每個微服務都使用內存存儲庫實現。以下是訂單微服務(Order-Service)中的存儲庫實現:

@Singleton
public class OrderInMemoryRepository {

    private Set<Order> orders = new HashSet<>();

    public Order add(Order order) {
        order.setId((long) (orders.size() + 1));
        orders.add(order);
        return order;
    }

    public void update(Order order) {
        orders.remove(order);
        orders.add(order);
    }

    public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
        return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
    }

    public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
        return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
                .max(Comparator.comparing(Order::getId));
    }

    public Set<Order> findAll() {
        return orders;
    }

}

內存存儲庫存儲Order對象實例。Order對象還被發送到名為orders的Kafka主題。下面是Order類的實現:

public class Order {

    private Long id;
    private LocalDateTime createdAt;
    private OrderType type;
    private Long userId;
    private Long tripId;
    private float currentLocationX;
    private float currentLocationY;
    private OrderStatus status;
    
    // ... GETTERS AND SETTERS
}

4 使用Kafka異步通信

現在,讓我們想一個可以通過示例係統實現的用例—— 添加新的行程

我們創建了OrderType.NEW_TRIP類型的新訂單。在此之後,(1) 訂單服務創建一個訂單並將其發送到orders主題。訂單由三個微服務接收: 司機服務乘客服務行程服務
(2)所有這些應用程序都處理這個新訂單。乘客服務應用程序檢查乘客帳戶上是否有足夠的資金。如果沒有,它就取消了行程,否則它什麼也做不了。司機服務正在尋找最近可用的司機,(3) 行程服務創建和存儲新的行程。司機服務行程服務都將事件發送到它們的主題( drivers, trips),其中包含相關更改的信息。

每一個事件可以被其他microservices訪問,例如,(4) 行程服務偵聽來自司機服務的事件,以便為行程分配一個新的司機

下圖說明了在添加新的行程時,我們的微服務之間的通信過程。

現在,讓我們繼續討論實現細節。

4.1 發送訂單

首先,我們需要創建Kafka客戶端,負責向主題發送消息。我們創建的一個接口,命名為OrderClient,為它添加@KafkaClient並聲明用於發送消息的一個或多個方法。每個方法都應該通過@Topic註解設置目標主題名稱。對於方法參數,我們可以使用三個註解@KafkaKey@Body@Header@KafkaKey用於分區,這是我們的示例應用程序所需要的。在下面可用的客戶端實現中,我們只使用@Body註解。

@KafkaClient
public interface OrderClient {

    @Topic("orders")
    void send(@Body Order order);

}

4.2 接收訂單

一旦客戶端發送了一個訂單,它就會被監聽orders主題的所有其他微服務接收。下面是司機服務中的監聽器實現。監聽器類OrderListener應該添加@KafkaListener註解。我們可以聲明groupId作為一個註解參數,以防止單個應用程序的多個實例接收相同的消息。然後,我們聲明用於處理傳入消息的方法。與客戶端方法相同,應該通過@Topic註解設置目標主題名稱,因為我們正在監聽Order對象,所以應該使用@Body註解——與對應的客戶端方法相同。

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }

}

4.3 發送到其他主題

現在,讓我們看一下司機服務中的processNewTripOrder方法。DriverService注入兩個不同的Kafka Client
bean: OrderClientDriverClient當處理新訂單時,它將試圖尋找與發送訂單的乘客最近的司機。找到他之後,將該司機的狀態更改為UNAVAILABLE,並將帶有Driver對象的事件發送到drivers主題。

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        driver.ifPresent(driverLocal -> {
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        });
    }
    
    // ...
}

這是Kafka Client司機服務中的實現,用於向driver主題發送消息。因為我們需要將DriverOrder關聯起來,所以我們使用@Header註解的orderId參數。沒有必要把它包括到Driver類中,將其分配給監聽器端的正確行程。

@KafkaClient
public interface DriverClient {

    @Topic("drivers")
    void send(@Body Driver driver, @Header("Order-Id") String orderId);

}

4.4 服務間通信

DriverListener收到@KafkaListener行程服務中聲明。它監聽傳入到trip主題。接收方法的參數和客戶端發送方法的類似,如下所示:

@KafkaListener(groupId = "trip")
public class DriverListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private TripService service;

    public DriverListener(TripService service) {
        this.service = service;
    }

    @Topic("drivers")
    public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
        LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
        service.processNewDriver(driver);
    }

}

最後一步,將orderId查詢到的行程TripdriverId關聯,這樣整個流程就結束。

@Singleton
public class TripService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);

    private TripInMemoryRepository repository;
    private TripClient client;

    public TripService(TripInMemoryRepository repository, TripClient client) {
        this.repository = repository;
        this.client = client;
    }


    public void processNewDriver(Driver driver, String orderId) {
        LOGGER.info("Processing: {}", driver);
        Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
        trip.ifPresent(tripLocal -> {
            tripLocal.setDriverId(driver.getId());
            repository.update(tripLocal);
        });
    }
    
    // ... OTHER METHODS

}

5 跟蹤

我們可以使用Micronaut Kafka輕鬆地啟用分佈式跟蹤。首先,我們需要啟用和配置Micronaut跟蹤。要做到這一點,首先應該添加一些依賴項:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-http</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-reporter</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.opentracing.brave</groupId>
    <artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

我們還需要在application.yml配置文件中,配置Zipkin的追蹤的地址等

tracing:
  zipkin:
    enabled: true
    http:
      url: http://192.168.99.100:9411
    sampler:
      probability: 1

在啟動應用程序之前,我們必須運行Zipkin容器:

$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

6 總結

在本文中,您將了解通過Apache Kafka使用異步通信構建微服務架構的過程。我已經向大家展示了Microaut Kafka庫最重要的特性,它允許您輕鬆地聲明Kafka主題的生產者和消費者,為您的微服務啟用健康檢查分佈式跟蹤我已經為我們的系統描述了一個簡單的場景的實現,包括根據客戶的請求添加一個新的行程。本示例係統的整體實現,請查看GitHub上的

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※公開收購3c價格,不怕被賤賣!

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

RNN-LSTM講解-基於tensorflow實現

cnn卷積神經網絡在前面已經有所了解了,目前博主也使用它進行了一個圖像分類問題,基於kaggle裏面的food-101進行的圖像識別,識別率有點感人,基於數據集的關係,大致來說還可行。
下面我就繼續學習rnn神經網絡。

rnn神經網絡(遞歸/循環神經網絡)模式如下:

我們在處理文字等問題的時候,我們的輸入會把上一個時間輸出的數據作為下一個時間的輸入數據進行處理。
例如:我們有一段話,我們將其分詞,得到t個數據,我們分別將每一個詞傳入到x0,x1….xt裏面,當x0傳入后,會得到一個結果h0,同時我們會將處理后的數據傳入到下個時間,到下個時間的時候,我們會再傳入一個數據x1,同時還有上一個時間處理后的數據,將這兩個數據進行整合計算,然後再向下傳輸,一直到結束。
rnn本質來說還是一個bp迴路,不過他只是比bp網絡多一個環節,即它可以反饋上一時間點處理后的數據。

上圖細化如下:

rnn實際上還是存在梯度消失的問題,因此如上圖所示,當我們在第一個時間輸入的數據,可能在很久之後他就已經梯度消失了(影響很小),因此我們使用lstm(long short trem memory)

上圖有三個門:輸入門    忘記門   輸出門
1.輸入門:通過input * g 來判斷是否輸入,如果不輸入就為0,輸入就是0,以此判斷信號是否輸入
2.忘記門:這個信號是否需要衰減多少,可能為50%,衰減是根據信號來判斷。
3.輸入門:通過判斷是否輸出,或者輸出多少,例如輸出50%。
因此上述圖可化為:

可以看出,這三個門,所有得影響都是關於輸入和上一個數據得輸出來進行計算的。

可以看下圖:

我們使用lstm得話,通過三個門決定信號是否向下傳輸,傳輸多少都可以控制,是否傳入信號,輸出信息都進行控制。

下面我們還是用tensorflow實現,數據集還是手寫数字,雖然rnn主要是用在文字和語言上,但是它依舊可以用在圖片上。
下面給出代碼:

```python
import tensorflow as tf
from tensorflow.contrib import rnn
from tensorflow.examples.tutorials.mnist import  input_data
mnist=input_data.read_data_sets("MNNIST_data",one_hot=True)

#輸入圖片為 28*28
n_inputs=28#輸入一行,一行有28個像素
max_time=28#一共28行,所以為28*28
lstm_size=100#100個隱藏單元
batch_size=50
n_classes=10
n_batch=mnist.train.num_examples//batch_size#計算一共多少批次

#這裏none表示第一個維度可以是任意長度
x=tf.placeholder(tf.float32,[None,784])

y=tf.placeholder(tf.float32,[None,10])

#初始化權值
weights=tf.Variable(tf.truncated_normal([lstm_size,n_classes],stddev=0.1))
#初始化偏置值
biases=tf.Variable(tf.constant(0.1,shape=[n_classes]))

##定義Rnn 網絡
def RNN(X,weights,biases):
    inputs=tf.reshape(X,[-1,max_time,n_inputs])
    #定義lstm基本cell
    lstm_cell = rnn.BasicLSTMCell(lstm_size)
    #lstm_cell=tf.contrib.rnn.core_rnn_cell.BasicLSTMCell(lstm_size)
    outputs,final_state=tf.nn.dynamic_rnn(lstm_cell,inputs,dtype=tf.float32)
    results=tf.nn.softmax(tf.matmul(final_state[1],weights)+biases)
    return results
prediction=RNN(x,weights,biases)
#損失函數
cross_entropy=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=prediction,labels=y))
#優化器
train_step=tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
#保存結果
correct_prediction=tf.equal(tf.argmax(y,1),tf.argmax(prediction,1))

accuracy=tf.reduce_mean(tf.cast(correct_prediction,tf.float32))

init=tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init)
    for epoch in range(6):
        for batch in range(n_batch):
            batch_xs,batch_ys=mnist.train.next_batch(batch_size)
            sess.run(train_step,feed_dict={x:batch_xs,y:batch_ys})

        acc=sess.run(accuracy,feed_dict={x:mnist.test.images,y:mnist.test.labels})
        print("iter:"+str(epoch)+"testing accuracy"+str(acc))

 

“`
運行結果如下:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※公開收購3c價格,不怕被賤賣!

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

WPF 修改屏幕DPI,會觸發控件重新加載Unload/Load

修改屏幕DPI,會觸發控件的Unloaded/Loaded

現象/重現案例

這裏簡單介紹下,修改屏幕DPI,觸發Unloaded/Loaded的神奇案例

1. 我們新建一個窗口,添加一個UserControl1,然後在UserControl1中添加UserControl2

 1 <Window x:Class="WPFUnloadedTriggerTest.MainWindow"
 2         xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
 3         xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
 4         xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
 5         xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
 6         xmlns:local="clr-namespace:WPFUnloadedTriggerTest"
 7         mc:Ignorable="d"
 8         Title="MainWindow" Height="450" Width="800">
 9     <local:UserControl1></local:UserControl1>
10 </Window>
11 ------------------------------我是分隔線-----------------------------------
12 <UserControl x:Class="WPFUnloadedTriggerTest.UserControl1"
13              xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
14              xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
15              xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" 
16              xmlns:d="http://schemas.microsoft.com/expression/blend/2008" 
17              xmlns:local="clr-namespace:WPFUnloadedTriggerTest"
18              mc:Ignorable="d" 
19              d:DesignHeight="450" d:DesignWidth="800">
20     <local:UserControl2></local:UserControl2>
21 </UserControl>

View Code

2. 显示窗口后,修改DPI比例

3. 設置完后,會觸發Unloaded/Loaded重新加載

Unloaded的觸發順序是UserControl1–>UserControl2,Window並不會觸發Unloaded事件!

是不是詭異?我們繼續。。。

 4. Window我們添加一個ControlTemplate模塊

1     <Window.Template>
2         <ControlTemplate TargetType="Window">
3             <Border>
4                 <AdornerDecorator>
5                     <ContentPresenter />
6                 </AdornerDecorator>
7             </Border>
8         </ControlTemplate>
9     </Window.Template>

 再重複2、3步驟,Unloaded的觸發順序變了:

觸發UserControl2的Unloaded,Window、UserControl1並不會觸發Unloaded事件!

問題分析

第2步驟中修改DPI后,Unloaded事件不一定觸發。如何必現呢?

將窗口靠近到任務欄上方,再修改文本比例。

 我們查看調用堆棧,貌似是系統給窗口發送消息然後調用BroadcastUnloadedEvent事件,觸發Unload

 所以應該是修改DPI,窗口寬高超出了當前屏幕尺寸範圍,系統對UserControl的視覺樹進行重新加載布局。

至於窗口沒有觸發Unloaded、以及在窗口添加以上模塊後下一級子控件也沒有觸發Unloaded事件的原因,暫不了解

而對WPF-Unloaded/Loaded的已知情況如下:

  • FrameworkElement, 第一次加載显示時,會觸發Loaded。元素被釋放時,會觸發Unloaded。窗口Show/Close時,視覺樹變化都會觸發加載事件
  • MenuItem, 在FrameworkElement基礎上,每次和隱藏MenuItem時,會額外觸發Load/Unloaded
  • TabControl,當你選中一個tabItem時會觸發Loaded,當你取消選中一個tabItem時會觸發Unloaded,所以切換Tab時必定有一個Loaded一個Unloaded。
  • Expander,每次被Expanded擴展時會引發Loaded,但當隱藏時不會引發Unloaded。

 以上問題的解決方案?暫時沒有解決方案,只有規避措施,不要過於依賴於Unload/Loaded,而且使用了Unload/Loaded時也要添加註銷機制,防止重入

我在github提了個issue:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※公開收購3c價格,不怕被賤賣!

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

Java學習筆記 線程池使用及詳解

有點笨,參考了好幾篇大佬們寫的文章才整理出來的筆記….

字面意思上解釋,線程池就是裝有線程的池,我們可以把要執行的多線程交給線程池來處理,和連接池的概念一樣,通過維護一定數量的線程池來達到多個線程的復用。

好處

多線程產生的問題

一般我們使用到多線程的編程的時候,需要通過new Thread(xxRunnable).start()創建並開啟線程,我們可以使用多線程來達到最優效率(如多線程下載)。

但是,線程不是越多就越好,線程過多,創建和銷毀就會消耗系統的資源,也不方便管理。

除此之外,多線程還會造成併發問題,線程併發數量過多,搶佔系統資源從而導致阻塞。

線程池優點

我們將線程放入線程池,由線程池對線程進行管理,可以對線程池中緩衝的線程進行復用,這樣,就不會經常去創建和銷毀線程了,從而省下了系統的資源。

線程池能夠有效的控制線程併發的數量,能夠解決多線程造成的併發問題。

除此之外,線程池還能夠對線程進行一定的管理,如延時執行、定時循環執行的策略等

線程池實現

線程池的實現,主要是通過這個類ThreadPoolExecutor,其的構造參數非常長,我們先大概了解,之後再進行詳細的介紹。

public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,long keepAliveTime,
    TimeUnit unit,BlockingQueue workQueue,
    RejectedExecutionHandler handler)
  • corePoolSize:線程池核心線程數量
  • maximumPoolSize:線程池最大線程數量
  • keepAliverTime:當活躍線程數大於核心線程數時,空閑的多餘線程最大存活時間
  • unit:存活時間的單位
  • workQueue:存放線程的工作隊列
  • handler:超出線程範圍和隊列容量的任務的處理程序(拒絕策略)

這裏大概簡單說明一下線程池的運行流程:

當線程被添加到線程池中,如果線程池中的當前的線程數量等於線程池定義的最大核心線程數量(corePoolSize)了,此線程就會別放入線程的工作隊列(workQueue)中,等待線程池的調用。

Java提供了一個工具類Excutors,方便我們快速創建線程池,其底層也是調用了ThreadPoolExecutor

不過阿里巴巴Java規範中強制要求我們應該通過ThreadPoolExecutor來創建自己的線程池,使用Excutors容易造成OOM問題。

所以,我們先從Excutors開始學習,之後在對ThreadPoolExecutor進行詳細的講解

Excutors

由於Excutors是工具類,所以下面的介紹的都是其的靜態方法,如果是比較線程數目比較少的小項目,可以使用此工具類來創建線程池

PS:把線程提交給線程池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道線程的結果就使用execute方法,性能會好很多。
  2. submit返回一個Future對象,如果想知道線程結果就使用submit提交,而且它能在主線程中通過Future的get方法捕獲線程中的異常

線程池可以接收兩種的參數,一個為Runnable對象,另外則是Callable對象

Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有返回值,允許拋出異常。

主要的幾個靜態方法:

方法 說明
newFixedThreadPool(int nThreads) 創建固定大小的線程池
newSingleThreadExecutor() 創建只有一個線程的線程池
newCachedThreadPool() 創建一個不限線程數上限的線程池,任何提交的任務都將立即執行
newScheduledThreadPool(int nThreads) 創建一個支持定時、周期性或延時任務的限定線程數目的線程池
newSingleThreadScheduledExecutor() 創建一個支持定時、周期性或延時任務的單個線程的線程池

1.newSingleThreadExecutor

創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行,我們可以使用它來達到控制線程順序執行。

控制進程順序執行:

Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("這是線程1");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
Thread thread2 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是線程2");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
Thread thread3 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是線程3");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
//創建線程池對象
ExecutorService executorService = Executors.newSingleThreadExecutor();
//把線程添加到線程池中
executorService.submit(thread1);
executorService.submit(thread2);
executorService.submit(thread3);

之後出現的結果就是按照順序輸出

2.newFixedThreadPool

創建一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

3.newCachedThreadPool

創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程,線程池為無限大,當執行第二個任務時第一個任務已經完成,會復用執行第一個任務的線程,而不用每次新建線程。

代碼:

//創建了一個自定義的線程
public class MyThread extends Thread {
    private int index;

    public MyThread(int index) {
        this.index = index;
    }

    @Override
    public void run() {
        System.out.println(index+" 當前線程"+Thread.currentThread().getName());
    }
}

//創建緩存線程池
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    executorService.execute(new MyThread(i));
    try {
        //這裏模擬等待時間,等待線程池復用回收線程
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

可以看到結果都是使用的同一個線程

4.newScheduledThreadPool

創建一個定長線程池,支持定時、周期性或延時任務執行

延遲1s后啟動線程:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.schedule(new MyThread(1),1, TimeUnit.SECONDS);

ThreadPoolExecutor

構造方法

上面提到的那個構造方法其實只是ThreadPoolExecutor類中的一個,ThreadPoolExecutor類中存在有四種不同的構造方法,主要區別就是參數不同。

//五個參數的構造函數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六個參數的構造函數-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六個參數的構造函數-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七個參數的構造函數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

首先,有個概念需要明白,線程池的最大線程數(線程總數,maximumPoolSize)= 核心線程數(corePoolSize)+非核心線程數

  • corePoolSize:線程池核心線程數量
  • maximumPoolSize:線程池最大線程數量
  • keepAliverTime:當活躍線程數大於核心線程數時,空閑的多餘線程最大存活時間
  • unit:存活時間的單位
  • workQueue:存放線程的工作隊列
  • handler:超出線程範圍和隊列容量的任務的處理程序(拒絕策略)

核心線程和非核心線程有什麼區別呢?

核心線程是永遠不會被線程池丟棄回收(即使核心線程沒有工作),非核心線程則是超過一定時間(keepAliverTime)則就會被丟棄

workQueue

當所有的核心線程都在工作時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執行任務

1.SynchronousQueue:這個隊列接收到任務的時候,會直接提交給線程處理,而不保留它,如果所有線程都在工作怎麼辦?那就新建一個線程來處理這個任務!所以為了保證不出現線程數達到了maximumPoolSize而不能新建線程的錯誤,使用這個類型隊列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大

2.LinkedBlockingQueue:這個隊列接收到任務的時候,如果當前線程數小於核心線程數,則新建線程(核心線程)處理任務;如果當前線程數等於核心線程數,則進入隊列等待。由於這個隊列沒有最大值限制,即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了maximumPoolSize的設定失效,因為總線程數永遠不會超過corePoolSize

3.ArrayBlockingQueue:可以限定隊列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建線程(核心線程)執行任務,如果達到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執行任務,又如果總線程數到了maximumPoolSize,並且隊列也滿了,則發生錯誤

4.DelayQueue:隊列內元素必須實現Delayed接口,這就意味着你傳進去的任務必須先實現Delayed接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務

拒絕策略:

拒絕策略 拒絕行為
AbortPolicy 拋出RejectedExecutionException異常(默認)
DiscardPolicy 不處理,丟棄掉
DiscardOldestPolicy 丟棄執行隊列中等待最久的一個任務,嘗試為新來的任務騰出位置
CallerRunsPolicy 直接由提交任務者執行這個任務

兩種方法設置拒絕策略:

//ThreadPoolExecutor對象的setRejectedExecutionHandler方法設置
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//構造方法進行設置,省略

線程池默認的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。

如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多餘的任務會悄悄的被忽略。

ThreadFactory

一個接口類,用來對線程進行設置,需要實現newThread(Runnable r)方法

官方的文檔說明:

newThread此方法一般來初始化線程的優先級(priority),名字(name),守護進程(daemon)或線程組(ThreadGroup)

簡單的例子(讓某個類實現ThreadFactory接口):

@Override
public Thread newThread(Runnable r) {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    return thread;
}

線程池獲取執行結果

PS:把線程提交給線程池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道線程的結果就使用execute方法,性能會好很多。
  2. submit返回一個Future對象,如果想知道線程結果就使用submit提交,而且它能在主線程中通過Future的get方法捕獲線程中的異常

線程池可以接收兩種的參數,一個為Runnable對象,另外則是Callable對象

Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有返回值,允許拋出異常。

線程池的處理結果、以及處理過程中的異常都被包裝到Future中,並在調用Future.get()方法時獲取,執行過程中的異常會被包裝成ExecutionException,submit()方法本身不會傳遞結果和任務執行過程中的異常。

獲取執行結果的代碼可以這樣寫:

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            //該異常會在調用Future.get()時傳遞給調用者
            throw new RuntimeException("exception in call~");
        }
    });
    
try {
    //獲得返回結果
    Object result = future.get();
    
    
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

線程池運行流程

一個形象的比喻說明線程池的流程:

規定:

  1. 線程池比作成一家公司
  2. 公司的最大員工數為maximumPoolSize
  3. 最大正式員工數為coolPoolSize(核心線程的總數)
  4. 最大員工數(maximumPoolSize) = 最大正式員工(coolPoolSize)和臨時工(非核心線程)
  5. 單子(任務)可看做為線程
  6. 隊列使用的是ArrayBlockingQueue
  7. 一個員工只能幹一個任務

最開始的時候,公司是沒有一名員工。之後,公司接到了單子(任務),這個時候,公司才去找員工(創建核心線程並讓線程開始執行),這個時候找到的員工就是正式員工了。

公司的聲譽越來越好,於是來了更多的單子,公司繼續招人,直到正式員工數量達到最大的正式員工的數量(核心線程數量已達到最大)

於是,多出來的單子就暫時地存放在了隊列中,都在排隊,等待正式員工們把手頭的工作做完之後,就從隊列中依次取出單子繼續工作。

某天,來了一個新單子,但是這個時候隊列已經滿了,公司為了自己的信譽和聲譽着想,不得已只能去找臨時工(創建非核心線程)來幫忙開始進行工作(負責新單子)

在此之後,又來了新單子,公司繼續去招臨時工為新來的單子工作,直到正式工和臨時工的數量已經達到了公司最大員工數。

這個時候,公司沒有辦法了,只能拒絕新來的單子了(拒絕策略)

此時,正式工和臨時工都是在加班加點去從隊列中取出任務來工作,終於某一天,隊列的已經沒有單子了,市場發展不好,單子越來越少,臨時工很久都不工作了(非核心線程超過了最大存活時間keepAliveTime),公司就把這些臨時工解僱了,直到剩下只有正式員工。

PS:如果也想要解僱正式員工(銷毀核心線程),可以設置ThreadPoolExecutor對象的的allowCoreThreadTimeOut這個屬性為true

個人理解,可能不是很正確,僅供參考!

線程池關閉

方法 說明
shutdown() 不再接受新的任務,之前提交的任務等執行結束再關閉線程池
shutdownNow() 不再接受新的任務,試圖停止池中的任務再關閉線程池,返回所有未處理的線程list列表。

總結

如果是小的Java程序,可以使用Excutors,如果是服務器程序,則使用ThreadPoolExecutor進行自定義線程池的創建

參考鏈接:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

USB CONNECTOR 掌控什麼技術要點? 帶您認識其相關發展及效能

※高價3c回收,收購空拍機,收購鏡頭,收購 MACBOOK-更多收購平台討論專區

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

收購3c瘋!各款手機、筆電、相機、平板,歡迎來詢價!

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

.NET Core 3.0 使用Nswag生成Api文檔和客戶端代碼

摘要

在前後端分離、Restful API盛行的年代,完美的接口文檔,成了交流的紐帶。在項目中引入Swagger (也稱為OpenAPI),是種不錯的選擇,它可以讓接口數據可視化。下文將會演示

  • 利用Nswag如何生成Api文檔

  • 利用NSwagStudio如何生成客戶端代碼,並且進行測試

什麼是 Swagger/OpenAPI?

Swagger 是一個與語言無關的規範,用於描述 REST API。Swagger 項目已捐贈給 OpenAPI 計劃,現在它被稱為開放 API。這兩個名稱可互換使用,但 OpenAPI 是首選。它允許計算機和人員了解服務的功能,而無需直接訪問實現(源代碼、網絡訪問、文檔)。其中一個目標是盡量減少連接取消關聯的服務所需的工作量。另一個目標是減少準確記錄服務所需的時間。

Nswag VS Swashbuckle?

.NET Swagger 實現類庫有兩個比較流行:

  • Swashbuckle.AspNetCore 是一個開源項目,用於生成 ASP.NET Core Web API 的 Swagger 文檔。

  • NSwag 是另一個用於生成 Swagger 文檔並將 Swagger UI 或 ReDoc 集成到 ASP.NET Core Web API 中的開源項目。此外,NSwag 還提供了為 API 生成 C# 和 TypeScript 客戶端代碼的方法。

 

為什麼我在.NET core3.0中選擇NSwag呢,NSwag比較活躍,一直在更新,功能也很強大,可以完美的代替Swashbuckle.AspNetCore具體可以參考:https://github.com/aspnet/AspNetCore.Docs/issues/4258

一、利用Nswag生成Api文檔

步驟
  1. 創建Asp.NET Core Api項目,並且集成NSwag

  2. 配置項目

  3. 運行項目

創建Asp.NET Core Api項目,並且集成NSwag

我們將簡單的創建一個ASP.NET core API項目。將其命名為“WebAPIwithSwg”。基於.NETcore3.0

安裝nuget包NSwag.AspNetCore

接下來,在Startup.cs文件中配置Nswag服務和中間件。

在ConfigureServices方法中添加服務

  public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddSwaggerDocument(); //註冊Swagger 服務
        }
在Configure方法中添加Nswag中間件
 public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            app.UseRouting();
            app.UseAuthorization();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
            app.UseOpenApi(); //添加swagger生成api文檔(默認路由文檔 /swagger/v1/swagger.json)
            app.UseSwaggerUi3();//添加Swagger UI到請求管道中(默認路由: /swagger).
        }
配置項目

運行項目

右鍵項目在瀏覽器中查看,查看swagger UI需要在url後面添加“/swagger”。本示例http://localhost:54117/swagger

二、利用NSwagStudio如何生成客戶端代碼,並且進行測試
提供GUI界面是NSwag的一大特點,只需要下載安裝NSwagStudio,即可生成客戶端代碼。
步驟
  1. 現在安裝NSwagStudio

  2. NSwagStudio配置,生成客戶端代碼

  3. 創建測試客戶端項目

下載安裝NSwagStudio

下載NSwag Studio http://rsuter.com/Projects/NSwagStudio/installer.php 安裝之後打開 NSwag Studio 如圖

NSwagStudio配置,生成客戶端代碼

選擇runtime,我選擇的是NETCore30,切換OpenAPI/Swagger Specification ,在Specification UR輸入你的Swagger.json路徑,本示例:http://localhost:54117/swagger/v1/swagger.json輸入路徑之後,點擊 create local copy 按鈕獲取json。

接下配置來生成客戶端代碼。我們首先選擇csharp client”複選框,然後勾選掉 “Inject Http Client via Constructor (life cycle is managed by caller)” ,最後設置下輸出路徑 點擊生成文件(Generate Files)。步驟如下

到此客戶端代碼已經自動生成。

查看生成的部分代碼


public async System.Threading.Tasks.Task<System.Collections.Generic.ICollection<WeatherForecast>> GetAsync(System.Threading.CancellationToken cancellationToken)
        {
            var urlBuilder_ = new System.Text.StringBuilder();
            urlBuilder_.Append(BaseUrl != null ? BaseUrl.TrimEnd('/') : "").Append("/WeatherForecast");
    
            var client_ = new System.Net.Http.HttpClient();
            try
            {
                using (var request_ = new System.Net.Http.HttpRequestMessage())
                {
                    request_.Method = new System.Net.Http.HttpMethod("GET");
                    request_.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));
    
                    PrepareRequest(client_, request_, urlBuilder_);
                    var url_ = urlBuilder_.ToString();
                    request_.RequestUri = new System.Uri(url_, System.UriKind.RelativeOrAbsolute);
                    PrepareRequest(client_, request_, url_);
    
                    var response_ = await client_.SendAsync(request_, System.Net.Http.HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
                    try
                    {
                        var headers_ = System.Linq.Enumerable.ToDictionary(response_.Headers, h_ => h_.Key, h_ => h_.Value);
                        if (response_.Content != null && response_.Content.Headers != null)
                        {
                            foreach (var item_ in response_.Content.Headers)
                                headers_[item_.Key] = item_.Value;
                        }
    
                        ProcessResponse(client_, response_);
    
                        var status_ = ((int)response_.StatusCode).ToString();
                        if (status_ == "200") 
                        {
                            var objectResponse_ = await ReadObjectResponseAsync<System.Collections.Generic.ICollection<WeatherForecast>>(response_, headers_).ConfigureAwait(false);
                            return objectResponse_.Object;
                        }
                        else
                        if (status_ != "200" && status_ != "204")
                        {
                            var responseData_ = response_.Content == null ? null : await response_.Content.ReadAsStringAsync().ConfigureAwait(false); 
                            throw new ApiException("The HTTP status code of the response was not expected (" + (int)response_.StatusCode + ").", (int)response_.StatusCode, responseData_, headers_, null);
                        }
            
                        return default(System.Collections.Generic.ICollection<WeatherForecast>);
                    }
                    finally
                    {
                        if (response_ != null)
                            response_.Dispose();
                    }
                }
            }
            finally
            {
                if (client_ != null)
                    client_.Dispose();
            }
        }
創建測試客戶端項目

創建一個控製程序項目,命名“WebApiClient”。

把自動生成的類“WeatherForecastClient”添加到客戶端項目中,然後安裝Newtonsoft

最後在Main函數中添加測試代碼,開始使用Api。

 static async System.Threading.Tasks.Task Main(string[] args)
        {

            var weatherForecastClient = new WeatherForecastClient();
            //gets all values from the API
            var allValues = await weatherForecastClient.GetAsync();
            Console.WriteLine("Hello World!");
        }

運行客戶端應用程序,進行調用api

當然如果需要調試api項目內部代碼,可以設置斷點,進入一步一步的調試

小結:NSwag 功能遠不止這些,本篇文章演示了如何生成api文檔和自動生成的api客戶端代碼方便我們調試,也可以作為對應的sdk。

參考:微軟官方文檔—https://docs.microsoft.com/zh-cn/aspnet/core/tutorials/getting-started-with-nswag?view=aspnetcore-2.2&tabs=visual-studio

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※公開收購3c價格,不怕被賤賣!

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享