類加載器 – 類的加載、連接與初始化

類的加載、連接與初始化

概述

在Java代碼中,類型的加載、連接與初始化過程都是在程序運行期間完成的

  • 類型:可以理解為一個class
  • 加載:查找並加載類的二進制數據,最常見的情況是將已經編譯完成的類的class文件從磁盤加載到內存中
  • 連接:確定類型與類型之間的關係,對於字節碼的相關處理
    • 驗證:確保被加載的類的正確性
    • 準備:為類的靜態變量分配內存,並將其初始化為默認值。但是在到達初始化之前,類變量都沒有初始化為真正的初始值
    • 解析:在類型的常量池中尋找類、接口、字段和方法的符號引用,把這些符號引用轉換為直接引用的過程
  • 初始化:為類的靜態變量賦予正確的初始值
  • 使用:比如創建對象,調用類的方法等
  • 卸載:類從內存中銷毀

理解:public static int number = 666;

上面這段代碼,在類加載的連接階段,為對象number分配內存,並初始化為0;然後再初始化階段在賦予正確的初始值:666

類的使用方式

Java程序對類的使用方式可分為兩種

  • 主動使用
    • 創建類的實例
    • 訪問某個類或接口的靜態變量,或者對靜態變量賦值
    • 調用類的靜態方法
    • 反射
    • 初始化類的子類
    • Java虛擬機啟動時被標明為啟動類的類(包含main方法)
    • JDK1.7開始提供的動態語言支持(java.lang.invoke.MethodHandle實例的解析結果REF_getStatic,REF_putStatic,REF_invokeStatic句柄對應的類沒有初始化,則初始化)
  • 被動使用
    • 除了主動使用的七種情況之外,其他使用Java類的方法都被看作是對類的被動使用,都不會導致類的初始化

所有的Java虛擬機實現必須在每個類或接口被Java程序“首次主動使用”時才初始化他們

代碼理解

示例一:類的加載連接和初始化過程

代碼一

public class Test01 {
    public static void main(String[] args) {
        System.out.println(Child01.str);
    }
}

class Father01 {
    public static String str = "做一個好人!";
    static {
        System.out.println("Father01 static block");
    }
}

class Child01 extends Father01 {
    static {
        System.out.println("Child01 static block");
    }
}

運行結果做一個好人!

Father01 static block
做一個好人!

代碼二

public class Test01 {
    public static void main(String[] args) {
        System.out.println(Child01.str2);
    }
}

class Father01 {
    public static String str = "做一個好人!";

    static {
        System.out.println("Father01 static block");
    }
}

class Child01 extends Father01 {
    public static String str2 = "做一個好人!";
    static {
        System.out.println("Child01 static block");
    }
}

運行結果

Father01 static block
Child01 static block
做一個好人!

分析:

  • 代碼一中,我們通過子類調用父類中的str,這個str是在父類被定義的,對Father01主動使用,沒有主動使用Child01,故Child01的靜態代碼塊沒有執行,父類的靜態代碼塊被執行了。 -> 對於靜態字段來說,只有直接定義了該字段的類才會被初始化。
  • 代碼二中,對Child01主動使用;根據主動使用的7種情況,調動類的子類時,其所有的父類都會被先初始化,所以Father01會被初始化。 -> 當一個類初始化時,要求其父類全部都已經初始化完畢了。

以上驗證的是類的初始化情況,那麼如何驗證類的加載情況呢,可以通過在啟動的時候配置虛擬機參數:-XX:+TraceClassLoading查看

運行代碼一,查看輸出結果,可以看見控制台打印了very多的日誌,第一個加載的是java.lang.Object類(不管加載哪個類,他的父類一定是Object類),後面是加載的一系列jdk的類,他們都位於rt包下。往下查看,可以看見Loaded classloader.Child01,說明即使沒有初始化Child01,但是程序依然加載了Child01類。

[Opened /usr/local/jdk1.8/jre/lib/rt.jar]
[Loaded java.lang.Object from /usr/local/jdk1.8/jre/lib/rt.jar]
...
[Loaded java.lang.Void from /usr/local/jdk1.8/jre/lib/rt.jar]
[Loaded classloader.Father01 from file:/home/fanxuan/Study/java/jvmStudy/out/production/jvmStudy/]
[Loaded classloader.Child01 from file:/home/fanxuan/Study/java/jvmStudy/out/production/jvmStudy/]
Father01 static block
做一個好人!
[Loaded java.lang.Shutdown from /usr/local/jdk1.8/jre/lib/rt.jar]
[Loaded java.lang.Shutdown$Lock from /usr/local/jdk1.8/jre/lib/rt.jar]

拓展:JVM參數介紹

因為前一章節使用了JVM參數,所以對其做一下簡單的介紹

  • 所有的JVM參數都是以-XX:開頭的
  • 如果形式是:-XX:+<option>,表示開啟option選項
  • 如果形式是:-XX:-<option>,表示關閉option選項
  • 如果形式是:-XX:<option>=<value>,表示將option選項的值設置為value

示例二:常量的本質含義

public class Test02 {
    public static void main(String[] args) {
        System.out.println(Father02.str);
    }
}

class Father02{
    public static final String str = "做一個好人!";

    static {
        System.out.println("Father02 static block");
    }
}

執行結果

做一個好人!

分析

可以看見,此段代碼並沒有初始化Father02類。這是因為final表示的是一個常量,在編譯階段常量就會被存入到調用這個常量的方法所在的類的常量池當中,本質上,調用類並沒有直接引用到定義常量的類,因此並不會觸發定義常量的類的初始化。在本代碼中,常量str會被存入到Test02的常量池中,之後Test02與Father02沒有任何關係,甚至可以刪除Father02的class文件。

我們反編譯一下Test02類

Compiled from "Test02.java"
public class classloader.Test02 {
  public classloader.Test02();
    Code:
       0: aload_0
       1: invokespecial #1                  // Method java/lang/Object."<init>":()V
       4: return

  public static void main(java.lang.String[]);
    Code:
       0: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
       3: ldc           #4                  // String 做一個好人!
       5: invokevirtual #5                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
       8: return
}

第一塊是Test02類的構造方法,第二塊是我們要看的main方法。可以看見3: ldc #4 // String 做一個好人!,此時這個值已經是確定無疑的做一個好人!了,而不是Father02.str,證實了上面說的在編譯階段常量就會被存入到調用這個常量的方法所在的類的常量池當中

拓展:助記符

因前一章節涉及到了助記符,所以介紹下本章節涉及到的助記符及擴展

  • ldc:表示將int、float或String類型的常量值常量池中推送至棧頂
  • bipush:表示將單字節(-128 -至 127)的常量推送至棧頂
  • sipush:表示將一個短整形(-32768 至 32767)的常量推送至棧頂
  • iconst_1:表示將int類型的1推送至棧頂(這類助記符只有iconst_m1 – iconst_5七個)

示例三:編譯期常量與運行期常量的區別

public class Test03 {
    public static void main(String[] args) {
        System.out.println(Father03.str);
    }
}

class Father03 {
    public static final String str = UUID.randomUUID().toString();

    static {
        System.out.println("Father03 static block");
    }
}

運行結果

Father03 static block
a60c5db4-2673-4ffc-a9f0-2dbe53fae583

分析

本代碼與示例二的區別在於str的值是在運行時確認的,而不是編譯時就確定好的,屬於運行期常量,而不是編譯期常量。當一個常量的值並非編譯期間確定的,那麼其值就不會被放到調用類的常量池中,這時在程序運行時,會導致主動使用這個常量所在的類,導致這個類被初始化。

示例四:數組創建本質

代碼一

public class Test04 {
    public static void main(String[] args) {
         Father04 father04_1 = new Father04();
        System.out.println("-----------");
        Father04 father04_2 = new Father04();
    }
}

class Father04 {
    static {
        System.out.println("Father04 static block");
    }
}

運行結果

Father04 static block
-----------

分析

  • 創建類的實例時,會初始化類
  • 所有的Java虛擬機實現必須在每個類或接口被Java程序“首次主動使用”時才初始化他們

代碼二

public class Test04 {
    public static void main(String[] args) {
        Father04[] father04s = new Father04[1];
        System.out.println(father04s.getClass());
    }
}

運行結果

class [Lclassloader.Father04;

分析

  • 創建數組對象不再主動使用的7種情況內,屬於被動使用,故不會初始化Father04
  • 打印father04s的類型為[Lclassloader.Father04,這是虛擬機在運行期生成的。 -> 對於數組示例來說,其類型是有JVM在運行期動態生成的,表示為[Lclassloader.Father04這種形式,動態生成的類型,其父類就是Object。
  • 對於數組來說,JavaDoc經常將構成數組的元素為Component,實際上就是將數組降低一個維度后的類型

反編譯一下:

  public static void main(java.lang.String[]);
    Code:
       0: iconst_1
       1: anewarray     #2                  // class classloader/Father04
       4: astore_1
       5: getstatic     #3                  // Field java/lang/System.out:Ljava/io/PrintStream;
       8: aload_1
       9: invokevirtual #4                  // Method java/lang/Object.getClass:()Ljava/lang/Class;
      12: invokevirtual #5                  // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
      15: return
  • anewarray:表示創建一個引用類型(如類、接口、數組)的數組,並將其引用值值壓入棧頂
  • newarray:表示創建一個指定的原始類型(如int、float、char等)的數組,並將其引用值壓入棧頂

示例五:接口的加載與初始化

代碼一

public class Test05 {
    public static void main(String[] args) {
        System.out.println(Child05.j);
    }
}

interface Father05 {
    int i = 5;
}

interface Child05 extends Father05 {
    int j = 6;
}

運行結果

6

分析

  • 接口中定義的常量本身就是public、static、final的
  • 結果顯而易見,這時我們刪除掉Father05.class文件和Child05.class文件,程序依然可以正常運行
    • 接口中的常量本身就是final常量,會被加載到Test05的常量池中
    • 此時,Father05和Child05都不會被加載

代碼二

public class Test05 {
    public static void main(String[] args) {
        System.out.println(Child05.j);
    }
}

interface Father05 {
    int i = 5;
}

interface Child05 extends Father05 {
    int j = new Random().nextInt(8);
}

運行結果

6

將Father05.class文件刪除,運行結果

Exception in thread "main" java.lang.NoClassDefFoundError: classloader/Father05
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at classloader.Test05.main(Test05.java:15)
Caused by: java.lang.ClassNotFoundException: classloader.Father05
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 13 more

分析

  • 只有在真正使用到父接口的時候(如引用接口中所定義的常量時),才會加載初始化

代碼三

public class Test05 {
    public static void main(String[] args) {
        System.out.println(Child05.j);
    }
}

interface Father05 {
    Thread thread = new Thread() {
        {
            System.out.println("Father05 code block");
        }
    };
}

class Child05 implements Father05 {
    public static int j = 8;
}

運行結果

8

分析

  • 在初始化一個類時,並不會先初始化他所實現的接口

代碼四

public class Test05 {
    public static void main(String[] args) {
        System.out.println(Father05.thread);
    }
}

interface GrandFather {
    Thread thread = new Thread() {
        {
            System.out.println("GrandFather code block");
        }
    };
}

interface Father05 extends GrandFather{
    Thread thread = new Thread() {
        {
            System.out.println("Father05 code block");
        }
    };
}

運行結果

Father05 code block
Thread[Thread-0,5,main]

分析

  • 在初始化一個接口時,並不會先初始化他的父接口

示例六:類加載器準備階段和初始化階段

代碼一

public class Test06 {
    public static void main(String[] args) {
        Singleton singleton = Singleton.getInstance();

        System.out.println("i:" + Singleton.i);
        System.out.println("j:" + Singleton.j);
    }
}

class Singleton {
    public static int i;

    public static int j = 0;

    private static Singleton singleton = new Singleton();

    private Singleton() {
        i ++;
        j ++;
    }

    public static Singleton getInstance() {
        return singleton;
    }
}

運行結果

i:1
j:1

分析

首先Singleton.getInstance();進入SingletongetInstance方法,getInstance會返回Singleton的實例,Singleton的實例是new Singleton();出來的,因此調用了自定義的私有構造方法。在調用構造方法之前,給靜態變量賦值,i默認賦值為0,j顯式的賦值為0,經過構造函數之後,值都為1。

代碼二

public class Test06 {
    public static void main(String[] args) {
        Singleton singleton = Singleton.getInstance();

        System.out.println("i:" + Singleton.i);
        System.out.println("j:" + Singleton.j);
    }
}

class Singleton {
    public static int i;

    private static Singleton singleton = new Singleton();

    private Singleton() {
        i ++;
        j ++;
    }

    public static int j = 0;

    public static Singleton getInstance() {
        return singleton;
    }
}

運行結果

i:1
j:0

分析

程序主動使用了Singleton類,準備階段對類的靜態變量分配內存,賦予默認值,下面給出類在連接及初始化階段常量的值的變化

  • i : 0
  • singleton:null
  • j : 0
  • getInstance:初始化
    • i:0
    • singleton:調用構造函數
      • i:1
      • j:1
    • j:0【覆蓋了之前的1】

故返回的值i為1,j為0

深入解析類的加載、連接與初始化

類的加載

  • 將類的.class文件中的二進制數據讀入到內存中,將其放在運行時數據區的方法區內,然後再內存中創建一個java.lang.Class對象(規範並未說明Class對象位於哪裡,HotSpot虛擬機將其放在了方法區中)用來封裝類在方法區的數據結構
  • 加載.class文件的方式
    • 從本地系統直接加載
    • 通過網絡下載.class文件
    • 從zip等歸檔文件中加載.class文件
    • 從專有數據庫中提取.class文件
    • 將Java源代碼動態編譯為.class文件
  • 類的加載的最終產品是位於內存中的Class對象
  • Calss對象封裝了類在方法區內的數據結構,並且向Java程序員提供了訪問方法區內的數據結構的接口
  • 有兩種類型的類加載器
    • Java虛擬機自帶的類加載器
      • 根類加載器(Bootstrap):該加載器沒有父加載器。他負責加載虛擬機的核心類庫,如java.lang.*等。根類加載器從系統屬性sun.boot.class.path所指定的目錄中加載類庫。根類加載器的實現依賴於底層操作系統,呀沒有繼承java.lang.CalssLoader類
      • 擴展類加載器(Extension):父加載器為根加載器。從java.ext.dirs系統屬性所指定的目錄中加載類庫,或者從JDK的安裝目錄的jre\lib\ext子目錄(擴展目錄)下加載類庫,如果用戶創建的JAR文件放在這個目錄下,也會自動由擴展類加載器加載。擴展類加載器是純Java類,是java.lang.ClassLoader類的子類
      • 系統(應用)類加載器(System):父加載器為擴展加載器。從環境變量classpath或者系統屬性java.class.path所指定的目錄中加載類,是用戶自定義類加載器的默認父加載器。系統類加載器是純Java類,是java.lang.ClassLoader類的子類
    • 用戶自定義的類加載器
      • java.lang.ClassLoader的子類
      • 用戶可以定製類的加載方式
  • 類加載器並不需要等到某個類被“首次使用”時再加載他
  • JVM規範允許類加載器在預料某個類將要被使用時就預先加載他,如果在預先加載的過程中遇到了.class文件缺失或存在錯誤,類加載器必須在程序首次主動使用時才報告錯誤(LinkageError錯誤)。如果這個類一直沒有被程序主動使用,那麼類加載器就不會報告錯誤

類的連接

類被加載后,就進入連接階段。連接就是將已經讀入到內存中的類的二進制數據合併到虛擬機的運行時環境中去

類的驗證

類的驗證的內容

  • 類文件的結構檢查
  • 語義檢查
  • 字節碼驗證
  • 二進制兼容性驗證

類的準備

在準備階段,Java虛擬機為類的靜態變量分配內存,並設置默認的初始值。例如對於下面的Sample類,在準備階段,將為int類型的靜態變量i分配4個字節的內存空間,並且賦默認值0;為long類型的靜態變量j分配8個字節的內存空間,並賦予默認值0

public class Sample {
    private static int i = 8;
    private static long j = 8L;
    ......
}

類的初始化

在初始化階段,Java虛擬機執行類的初始化語句,為類的靜態變量賦予初始值。在程序中,靜態變量的初始化有兩種途徑:

  • 在靜態變量的聲明處初始化
  • 在靜態代碼塊中初始化

靜態變量的聲明語句,預計靜態代碼塊都被看作類的初始化語句,Java虛擬機會按照初始化語句在類文件中的先後順序來依次執行他們

類的初始化步驟

  • 假如這個類還沒有被加載和連接誒,需要先進行加載和連接
  • 假如類存在直接父類,並且這個父類還沒有被初始化,需要先初始化直接父類
  • 假如類中存在初始化語句,需要依次執行這些初始化語句

類的初始化時機

  • 當Java虛擬機初始化一個類時,要求他的所有父類都已經被初始化,但是這條規則並不適用於接口

    • 在初始化一個類時,並不會先初始化他所實現的接口
    • 在初始化一個接口時,並不會先初始化他的父接口

    因此,一個父接口並不會因為他的子接口或實現類的初始化而初始化,只有當程序首次使用特定接口的靜態變量時,才會導致該接口的初始化。代碼參照代碼理解-接口的初始化

  • 只有當程序訪問的靜態變量或者靜態方法確實在當前類或者當前接口中定義時,才認為是對類或接口的主動使用

  • 調用ClassLoader類的loadClass方法加載一個類,並不是對類的主動使用,不會導致類的初始化

拓展部分

類實例化

類的生命周期除了前文提到的加載、連接、初始化之外,還有類示例化,垃圾回收和對象終結

  • 為新的對象分配內存
  • 為實例變量賦予默認值
  • 為實例變量賦予正確的初始值
  • Java編譯器為它編譯的每一個類都至少生成一個實例初始化方法,在Java的class文件中,這個實例初始化方法被稱為<init>。針對源代碼中每一個類的構造方法,Java編譯器都產生一個<init>方法

類的卸載

  • 當一個類被加載、連接和初始化后,它的生命周期就開始了。當代表該類的Class對象不再被引用,即不可觸及時,Class對象就會結束生命周期,這個類在方法區內的數據也會被卸載,從而結束自己的生命周期
  • 一個類何時結束生命周期,取決於代表它的Class對象何時結束生命周期
  • 由Java虛擬機自帶的類加載器所加載的類,在虛擬機的生命周期中,始終不會被卸載。Java虛擬機本身會始終引用這些類加載器,而這類加載器則會始終引用它們所加載的類的Class對象,因此這些Class對象始終是可觸及的;由用戶自定義的類加載器所加載的類是可以被卸載的

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

go中的關鍵字-defer

1. defer的使用

  defer 延遲調用。我們先來看一下,有defer關鍵字的代碼執行順序:

1 func main() {
2     defer func() {
3         fmt.Println("1號輸出")
4     }()
5     defer func() {
6         fmt.Println("2號輸出")
7     }()
8 }

  輸出結果:

1 2號出來
2 1號出來

  結論:多個defer的執行順序是倒序執行(同入棧先進后出)。

  由例子可以看出來,defer有延遲生效的作用,先使用defer的語句延遲到最後執行。

1.1 defer與返回值之間的順序

 1 func defertest() int
 2 
 3 func main() {
 4     fmt.Println("main:", defertest())
 5 }
 6 
 7 func defertest() int {
 8     var i int
 9     defer func() {
10         i++
11         fmt.Println("defer2的值:", i)
12     }()
13     defer func() {
14         i++
15         fmt.Println("defer1的值:", i)
16     }()
17     return i
18 }

  輸出結果:

1 defer1的值: 1
2 defer2的值: 2
3 main: 0

  結論:return最先執行->return負責將結果寫入返回值中->接着defer開始執行一些收尾工作->最後函數攜帶當前返回值退出

   return的時候已經先將返回值給定義下來了,就是0,由於i是在函數內部聲明所以即使在defer中進行了++操作,也不會影響return的時候做的決定。

 1 func test() (i int)
 2 
 3 func main() {
 4     fmt.Println("main:", test())
 5 }
 6 
 7 func test() (i int) {
 8     defer func() {
 9         i++
10         fmt.Println("defer2的值:", i)
11     }()
12     defer func() {
13         i++
14         fmt.Println("defer1的值:", i)
15     }()
16     return i
17 }

  詳解:由於返回值提前聲明了,所以在return的時候決定的返回值還是0,但是後面兩個defer執行後進行了兩次++,將i的值變為2,待defer執行完后,函數將i值進行了返回。

2. defer定義和執行

 1 func test(i *int) int {
 2     return *i
 3 }
 4 
 5 func main(){
 6     var i = 1
 7 
 8     // defer定義的時候test(&i)的值就已經定了,是1,後面就不會變了
 9     defer fmt.Println("i1 ="  , test(&i))
10     i++
11 
12     // defer定義的時候test(&i)的值就已經定了,是2,後面就不會變了
13     defer fmt.Println("i2 ="  , test(&i))
14 
15     // defer定義的時候,i就已經確定了是一個指針類型,地址上的值變了,這裏跟着變
16     defer func(i *int) {
17         fmt.Println("i3 ="  , *i)
18     }(&i)
19 
20     // defer定義的時候i的值就已經定了,是2,後面就不會變了
21     defer func(i int) {
22         //defer 在定義的時候就定了
23         fmt.Println("i4 ="  , i)
24     }(i)
25 
26     defer func() {
27         // 地址,所以後續跟着變
28         var c = &i
29         fmt.Println("i5 ="  , *c)
30     }()
31 
32     // 執行了 i=11 后才調用,此時i值已是11
33     defer func() {
34         fmt.Println("i6 ="  , i)
35     }()
36 
37     i = 11
38 }

  結論:會先將defer后函數的參數部分的值(或者地址)給先下來【你可以理解為()裡頭的會先確定】,後面函數執行完,才會執行defer后函數的{}中的邏輯。

例題分析

 1 //例子1
 2 func f() (result int) {
 3     defer func() {
 4         result++
 5     }()
 6     return 0
 7 }
 8 //例子2
 9 func f() (r int) {
10      t := 5
11      defer func() {
12        t = t + 5
13      }()
14      return t
15 }
16 //例子3
17 func f() (r int) {
18     defer func(r int) {
19           r = r + 5
20     }(r)
21     return 1
22 }

  例1的正確答案不是0,例2的正確答案不是10,例3的正確答案不是6……

  這裏先說一下返回值。defer是在return之前執行的。這條規則毋庸置疑,但最重要的一點是要明白,return xxx這一條語句並不是一條原子指令!

  函數返回的過程:先給返回值賦值,然後調用defer表達式,最後才是返回到調用函數中。defer表達式可能會在設置函數返回值之後,且在返回到調用函數之前去修改返回值,使最終的函數返回值與你想象的不一致。

  return xxx 可被改寫成:

1 返回值 = xxx
2 調用defer函數
3 空的return

  所以例子也可以改寫成:

 1 //例1
 2 func f() (result int) {
 3      result = 0  //return語句不是一條原子調用,return xxx其實是賦值+ret指令
 4      func() { //defer被插入到return之前執行,也就是賦返回值和ret指令之間
 5          result++
 6      }()
 7      return
 8 }
 9 //例2
10 func f() (r int) {
11      t := 5
12      r = t //賦值指令
13      func() {        //defer被插入到賦值與返回之間執行,這個例子中返回值r沒被修改過
14          t = t + 5
15      }
16      return        //空的return指令
17 }
18 例3
19 func f() (r int) {
20      r = 1  //給返回值賦值
21      func(r int) {        //這裏改的r是傳值傳進去的r,不會改變要返回的那個r值
22           r = r + 5
23      }(r)
24      return        //空的return
25 }

  所以例1的結果是1,例2的結果是5,例3的結果是1.

3. defer內部原理

  從例子開始看:

1 packmage main
2 
3 import()
4 
5 func main() {
6   defer println("這是一個測試")
7 }

  反編譯一下看看:

 1   src $ go build -o test test.go
 2   src $ go tool objdump -s "main\.main" test
 1 TEXT main.main(SB) /Users/tushanshan/go/src/test3.go
 2   test3.go:5        0x104ea70        65488b0c2530000000      MOVQ GS:0x30, CX
 3   test3.go:5        0x104ea79        483b6110                CMPQ 0x10(CX), SP
 4   test3.go:5        0x104ea7d        765f                    JBE 0x104eade
 5   test3.go:5        0x104ea7f        4883ec28                SUBQ $0x28, SP
 6   test3.go:5        0x104ea83        48896c2420              MOVQ BP, 0x20(SP)
 7   test3.go:5        0x104ea88        488d6c2420              LEAQ 0x20(SP), BP
 8   test3.go:6        0x104ea8d        c7042410000000          MOVL $0x10, 0(SP)
 9   test3.go:6        0x104ea94        488d05e5290200          LEAQ go.func.*+57(SB), AX
10   test3.go:6        0x104ea9b        4889442408              MOVQ AX, 0x8(SP)
11   test3.go:6        0x104eaa0        488d05e6e50100          LEAQ go.string.*+173(SB), AX
12   test3.go:6        0x104eaa7        4889442410              MOVQ AX, 0x10(SP)
13   test3.go:6        0x104eaac        48c744241804000000      MOVQ $0x4, 0x18(SP)
14   test3.go:6        0x104eab5        e8b631fdff              CALL runtime.deferproc(SB)
15   test3.go:6        0x104eaba        85c0                    TESTL AX, AX
16   test3.go:6        0x104eabc        7510                    JNE 0x104eace
17   test3.go:7        0x104eabe        90                      NOPL
18   test3.go:7        0x104eabf        e83c3afdff              CALL runtime.deferreturn(SB)
19   test3.go:7        0x104eac4        488b6c2420              MOVQ 0x20(SP), BP
20   test3.go:7        0x104eac9        4883c428                ADDQ $0x28, SP
21   test3.go:7        0x104eacd        c3                      RET
22   test3.go:6        0x104eace        90                      NOPL
23   test3.go:6        0x104eacf        e82c3afdff              CALL runtime.deferreturn(SB)
24   test3.go:6        0x104ead4        488b6c2420              MOVQ 0x20(SP), BP
25   test3.go:6        0x104ead9        4883c428                ADDQ $0x28, SP
26   test3.go:6        0x104eadd        c3                      RET
27   test3.go:5        0x104eade        e8cd84ffff              CALL runtime.morestack_noctxt(SB)
28   test3.go:5        0x104eae3        eb8b                    JMP main.main(SB)
29   :-1               0x104eae5        cc                      INT $0x3
30   :-1               0x104eae6        cc                      INT $0x3
31   :-1               0x104eae7        cc                      INT $0x3

   編譯器將defer處理成兩個函數調用 deferproc 定義一個延遲調用對象,然後在函數結束前通過 deferreturn 完成最終調用。在defer出現的地方,插入了指令call runtime.deferproc,然後在函數返回之前的地方,插入指令call runtime.deferreturn。

內部結構

 1 //defer
 2 type _defer struct {
 3    siz     int32   // 參數的大小
 4    started bool    // 是否執行過了
 5    sp      uintptr // sp at time of defer
 6    pc      uintptr
 7    fn      *funcval 
 8    _panic  *_panic // defer中的panic
 9    link    *_defer // defer鏈表,函數執行流程中的defer,會通過 link這個 屬性進行串聯
10 }
11 //panic
12 type _panic struct {
13    argp      unsafe.Pointer // pointer to arguments of deferred call run during panic; cannot move - known to liblink
14    arg       interface{}    // argument to panic
15    link      *_panic        // link to earlier panic
16    recovered bool           // whether this panic is over
17    aborted   bool           // the panic was aborted
18 }
19 //g
20 type g struct {
21    _panic         *_panic // panic組成的鏈表
22    _defer         *_defer // defer組成的先進后出的鏈表,同棧
23 }

  因為 defer panic 都是綁定在運行的g上的,這裏也說一下g中與 defer panic相關的屬性

  再把defer, panic, recover放一起看一下:

1 func main() {
2     defer func() {
3         recover()
4     }()
5     panic("error")
6 }

  反編譯結果:

1 go build -gcflags=all="-N -l" main.go
2 go tool objdump -s "main.main" main
1 go tool objdump -s "main\.main" main | grep CALL
2   main.go:4             0x4548d0                e81b00fdff              CALL runtime.deferproc(SB)              
3   main.go:7             0x4548f2                e8b90cfdff              CALL runtime.gopanic(SB)                
4   main.go:4             0x4548fa                e88108fdff              CALL runtime.deferreturn(SB)            
5   main.go:3             0x454909                e85282ffff              CALL runtime.morestack_noctxt(SB)       
6   main.go:5             0x4549a6                e8d511fdff              CALL runtime.gorecover(SB)              
7   main.go:4             0x4549b5                e8a681ffff              CALL runtime.morestack_noctxt(SB)

  defer 關鍵字首先會調用 runtime.deferproc 定義一個延遲調用對象,然後再函數結束前,調用 runtime.deferreturn 來完成 defer 定義的函數的調用

  panic 函數就會調用 runtime.gopanic 來實現相關的邏輯

  recover 則調用 runtime.gorecover 來實現 recover 的功能

deferproc

  根據 defer 關鍵字後面定義的函數 fn 以及 參數的size,來創建一個延遲執行的 函數,並將這個延遲函數,掛在到當前g的 _defer 的鏈表上,下面是deferproc的實現:

 1 func deferproc(siz int32, fn *funcval) { // arguments of fn follow fn
 2    sp := getcallersp()
 3    argp := uintptr(unsafe.Pointer(&fn)) + unsafe.Sizeof(fn)
 4    callerpc := getcallerpc()
 5    // 獲取一個_defer對象, 並放入g._defer鏈表的頭部
 6    d := newdefer(siz)
 7      // 設置defer的fn pc sp等,後面調用
 8    d.fn = fn
 9    d.pc = callerpc
10    d.sp = sp
11    switch siz {
12    case 0:
13       // Do nothing.
14    case sys.PtrSize:
15       // _defer 後面的內存 存儲 argp的地址信息
16       *(*uintptr)(deferArgs(d)) = *(*uintptr)(unsafe.Pointer(argp))
17    default:
18       // 如果不是指針類型的參數,把參數拷貝到 _defer 的後面的內存空間
19       memmove(deferArgs(d), unsafe.Pointer(argp), uintptr(siz))
20    }
21    return0()
22 }

  通過newproc 獲取一個 _defer 的對象,並加入到當前g的 _defer 鏈表的頭部,然後再把參數或參數的指針拷貝到 獲取到的 _defer對象的後面的內存空間。

  再看看newdefer 的實現:

 1 func newdefer(siz int32) *_defer {
 2    var d *_defer
 3    // 根據 size 通過deferclass判斷應該分配的 sizeclass,就類似於 內存分配預先確定好幾個sizeclass,然後根據size確定sizeclass,找對應的緩存的內存塊
 4    sc := deferclass(uintptr(siz))
 5    gp := getg()
 6    // 如果sizeclass在既定的sizeclass範圍內,去g綁定的p上找
 7    if sc < uintptr(len(p{}.deferpool)) {
 8       pp := gp.m.p.ptr()
 9       if len(pp.deferpool[sc]) == 0 && sched.deferpool[sc] != nil {
10          // 當前sizeclass的緩存數量==0,且不為nil,從sched上獲取一批緩存
11          systemstack(func() {
12             lock(&sched.deferlock)
13             for len(pp.deferpool[sc]) < cap(pp.deferpool[sc])/2 && sched.deferpool[sc] != nil {
14                d := sched.deferpool[sc]
15                sched.deferpool[sc] = d.link
16                d.link = nil
17                pp.deferpool[sc] = append(pp.deferpool[sc], d)
18             }
19             unlock(&sched.deferlock)
20          })
21       }
22       // 如果從sched獲取之後,sizeclass對應的緩存不為空,分配
23       if n := len(pp.deferpool[sc]); n > 0 {
24          d = pp.deferpool[sc][n-1]
25          pp.deferpool[sc][n-1] = nil
26          pp.deferpool[sc] = pp.deferpool[sc][:n-1]
27       }
28    }
29    // p和sched都沒有找到 或者 沒有對應的sizeclass,直接分配
30    if d == nil {
31       // Allocate new defer+args.
32       systemstack(func() {
33          total := roundupsize(totaldefersize(uintptr(siz)))
34          d = (*_defer)(mallocgc(total, deferType, true))
35       })
36    }
37    d.siz = siz
38    // 插入到g._defer的鏈表頭
39    d.link = gp._defer
40    gp._defer = d
41    return d
42 }

  newdefer的作用是獲取一個_defer對象, 並推入 g._defer鏈表的頭部。根據size獲取sizeclass,對sizeclass進行分類緩存,這是內存分配時的思想,先去p上分配,然後批量從全局 sched上獲取到本地緩存,這種二級緩存的思想真的在go源碼的各個部分都有。

deferreturn

 1 func deferreturn(arg0 uintptr) {
 2    gp := getg()
 3    // 獲取g defer鏈表的第一個defer,也是最後一個聲明的defer
 4    d := gp._defer
 5    // 沒有defer,就不需要干什麼事了
 6    if d == nil {
 7       return
 8    }
 9    sp := getcallersp()
10    // 如果defer的sp與callersp不匹配,說明defer不對應,有可能是調用了其他棧幀的延遲函數
11    if d.sp != sp {
12       return
13    }
14    // 根據d.siz,把原先存儲的參數信息獲取並存儲到arg0裏面
15    switch d.siz {
16    case 0:
17       // Do nothing.
18    case sys.PtrSize:
19       *(*uintptr)(unsafe.Pointer(&arg0)) = *(*uintptr)(deferArgs(d))
20    default:
21       memmove(unsafe.Pointer(&arg0), deferArgs(d), uintptr(d.siz))
22    }
23    fn := d.fn
24    d.fn = nil
25    // defer用過了就釋放了,
26    gp._defer = d.link
27    freedefer(d)
28    // 跳轉到執行defer
29    jmpdefer(fn, uintptr(unsafe.Pointer(&arg0)))
30 }

freedefer

  釋放defer用到的函數,應該跟調度器、內存分配的思想是一樣的。

 1 func freedefer(d *_defer) {
 2    // 判斷defer的sizeclass
 3    sc := deferclass(uintptr(d.siz))
 4    // 超出既定的sizeclass範圍的話,就是直接分配的內存,那就不管了
 5    if sc >= uintptr(len(p{}.deferpool)) {
 6       return
 7    }
 8    pp := getg().m.p.ptr()
 9    // p本地sizeclass對應的緩衝區滿了,批量轉移一半到全局sched
10    if len(pp.deferpool[sc]) == cap(pp.deferpool[sc]) {
11       // 使用g0來轉移
12       systemstack(func() {
13          var first, last *_defer
14          for len(pp.deferpool[sc]) > cap(pp.deferpool[sc])/2 {
15             n := len(pp.deferpool[sc])
16             d := pp.deferpool[sc][n-1]
17             pp.deferpool[sc][n-1] = nil
18             pp.deferpool[sc] = pp.deferpool[sc][:n-1]
19             // 先將需要轉移的那批defer對象串成一個鏈表
20             if first == nil {
21                first = d
22             } else {
23                last.link = d
24             }
25             last = d
26          }
27          lock(&sched.deferlock)
28          // 把這個鏈表放到sched.deferpool對應sizeclass的鏈表頭
29          last.link = sched.deferpool[sc]
30          sched.deferpool[sc] = first
31          unlock(&sched.deferlock)
32       })
33    }
34    // 清空當前要釋放的defer的屬性
35    d.siz = 0
36    d.started = false
37    d.sp = 0
38    d.pc = 0
39    d.link = nil
40 
41    pp.deferpool[sc] = append(pp.deferpool[sc], d)
42 }

gopanic

 1 func gopanic(e interface{}) {
 2    gp := getg()
 3 
 4    var p _panic
 5    p.arg = e
 6    p.link = gp._panic
 7    gp._panic = (*_panic)(noescape(unsafe.Pointer(&p)))
 8 
 9    atomic.Xadd(&runningPanicDefers, 1)
10    // 依次執行 g._defer鏈表的defer對象
11    for {
12       d := gp._defer
13       if d == nil {
14          break
15       }
16 
17       // If defer was started by earlier panic or Goexit (and, since we're back here, that triggered a new panic),
18       // take defer off list. The earlier panic or Goexit will not continue running.
19       // 正常情況下,defer執行完成之後都會被移除,既然這個defer沒有移除,原因只有兩種: 1. 這個defer裏面引發了panic 2. 這個defer裏面引發了 runtime.Goexit,但是這個defer已經執行過了,需要移除,如果引發這個defer沒有被移除是第一個原因,那麼這個panic也需要移除,因為這個panic也執行過了,這裏給panic增加標誌位,以待後續移除
20       if d.started {
21          if d._panic != nil {
22             d._panic.aborted = true
23          }
24          d._panic = nil
25          d.fn = nil
26          gp._defer = d.link
27          freedefer(d)
28          continue
29       }
30       d.started = true
31 
32       // Record the panic that is running the defer.
33       // If there is a new panic during the deferred call, that panic
34       // will find d in the list and will mark d._panic (this panic) aborted.
35       // 把當前的panic 綁定到這個defer上面,defer裏面有可能panic,這種情況下就會進入到 上面d.started 的邏輯裏面,然後把當前的panic終止掉,因為已經執行過了 
36       d._panic = (*_panic)(noescape(unsafe.Pointer(&p)))
37       // 執行defer.fn
38       p.argp = unsafe.Pointer(getargp(0))
39       reflectcall(nil, unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
40       p.argp = nil
41 
42       // reflectcall did not panic. Remove d.
43       if gp._defer != d {
44          throw("bad defer entry in panic")
45       }
46       // 解決defer與panic的綁定關係,因為 defer函數已經執行完了,如果有panic或Goexit就不會執行到這裏了
47       d._panic = nil
48       d.fn = nil
49       gp._defer = d.link
50 
51       // trigger shrinkage to test stack copy. See stack_test.go:TestStackPanic
52       //GC()
53 
54       pc := d.pc
55       sp := unsafe.Pointer(d.sp) // must be pointer so it gets adjusted during stack copy
56       freedefer(d)
57       // panic被recover了,就不需要繼續panic了,繼續執行剩餘的代碼
58       if p.recovered {
59          atomic.Xadd(&runningPanicDefers, -1)
60 
61          gp._panic = p.link
62          // Aborted panics are marked but remain on the g.panic list.
63          // Remove them from the list.
64          // 從panic鏈表中移除aborted的panic,下面解釋
65          for gp._panic != nil && gp._panic.aborted {
66             gp._panic = gp._panic.link
67          }
68          if gp._panic == nil { // must be done with signal
69             gp.sig = 0
70          }
71          // Pass information about recovering frame to recovery.
72          gp.sigcode0 = uintptr(sp)
73          gp.sigcode1 = pc
74          // 調用recovery, 恢復當前g的調度執行
75          mcall(recovery)
76          throw("recovery failed") // mcall should not return
77       }
78    }
79      // 打印panic信息
80    preprintpanics(gp._panic)
81      // panic
82    fatalpanic(gp._panic) // should not return
83    *(*int)(nil) = 0      // not reached
84 }

  看下裏面gp._panic.aborted 的作用:

 1 func main() {
 2    defer func() { // defer1
 3       recover()
 4    }()
 5    panic1()
 6 }
 7 
 8 func panic1() {
 9    defer func() {  // defer2
10       panic("error1") // panic2
11    }()
12    panic("error")  // panic1
13 }

  執行順序詳解:

  • 當執行到 panic("error") 時

  g._defer鏈表: g._defer->defer2->defer1

  g._panic鏈表:g._panic->panic1 

  • 當執行到 panic("error1") 時 

  g._defer鏈表: g._defer->defer2->defer1

  g._panic鏈表:g._panic->panic2->panic1

  • 繼續執行到 defer1 函數內部,進行recover()
    此時會去恢復 panic2 引起的 panic, panic2.recovered = true,應該順着g._panic鏈表繼續處理下一個panic了,但是我們可以發現 panic1 已經執行過了,這也就是下面的代碼的邏輯了,去掉已經執行過的panic
1 for gp._panic != nil && gp._panic.aborted {
2    gp._panic = gp._panic.link
3 }

panic的邏輯:

  程序在遇到panic的時候,就不再繼續執行下去了,先把當前panic 掛載到 g._panic 鏈表上,開始遍歷當前g的g._defer鏈表,然後執行_defer對象定義的函數等,如果 defer函數在調用過程中又發生了 panic,則又執行到了 gopanic函數,最後,循環打印所有panic的信息,並退出當前g。然而,如果調用defer的過程中,遇到了recover,則繼續進行調度(mcall(recovery))。

recovery

 1 func recovery(gp *g) {
 2    // Info about defer passed in G struct.
 3    sp := gp.sigcode0
 4    pc := gp.sigcode1
 5    // Make the deferproc for this d return again,
 6    // this time returning 1.  The calling function will
 7    // jump to the standard return epilogue.
 8    // 記錄defer返回的sp pc
 9    gp.sched.sp = sp
10    gp.sched.pc = pc
11    gp.sched.lr = 0
12    gp.sched.ret = 1
13    // 重新恢復執行調度
14    gogo(&gp.sched)
15 }

gorecover

  gorecovery 僅僅只是設置了 g._panic.recovered 的標誌位

 1 func gorecover(argp uintptr) interface{} {
 2    gp := getg()
 3    p := gp._panic
 4    // 需要根據 argp的地址,判斷是否在defer函數中被調用
 5    if p != nil && !p.recovered && argp == uintptr(p.argp) {
 6       // 設置標誌位,上面gopanic中會對這個標誌位做判斷
 7       p.recovered = true
 8       return p.arg
 9    }
10    return nil
11 }

goexit

  當手動調用 runtime.Goexit() 退出的時候,defer函數也會執行:

 1 func Goexit() {
 2     // Run all deferred functions for the current goroutine.
 3     // This code is similar to gopanic, see that implementation
 4     // for detailed comments.
 5     gp := getg()
 6   // 遍歷defer鏈表
 7     for {
 8         d := gp._defer
 9         if d == nil {
10             break
11         }
12     // 如果 defer已經執行過了,與defer綁定的panic 終止掉
13         if d.started {
14             if d._panic != nil {
15                 d._panic.aborted = true
16                 d._panic = nil
17             }
18             d.fn = nil
19       // 從defer鏈表中移除
20             gp._defer = d.link
21       // 釋放defer
22             freedefer(d)
23             continue
24         }
25     // 調用defer內部函數
26         d.started = true
27         reflectcall(nil, unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
28         if gp._defer != d {
29             throw("bad defer entry in Goexit")
30         }
31         d._panic = nil
32         d.fn = nil
33         gp._defer = d.link
34         freedefer(d)
35         // Note: we ignore recovers here because Goexit isn't a panic
36     }
37   // 調用goexit0,清除當前g的屬性,重新進入調度
38     goexit1()
39 }

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

前蘋果資深副總 傳接手電動車計畫

日前國外媒體《The Information》才報導,蘋果的電動車計畫可能順延至 2021 年;而《華爾街日報》引述知情人士的說法,曾任蘋果資深副總裁的Bob Mansfield 將接掌電動車計畫。

熟知蘋果電動車計畫Project Titan 的內部人士向《華爾街日報》透露,曾任技術部門資深副總裁的Bob Mansfield 將接手這項計畫。不過,目前蘋果官方與Bob Mansfield 本人都不願對於報導內容發表任何評論。

Bob Mansfield 於1999 年加入蘋果,在已故執行長Steve Jobs 底下出任管理階層;過去他曾主導硬體開發工程,產品包括MacBook Air、iMac、iPad 等。蘋果曾在2012 年宣布Bob Mansfield 將要退休,但後來又被執行長Tim Cook 說服、高薪留任公司;2013 年開始他從經營團隊名單隱身,官方僅表示他負責特別專案項目,並直接向Tim Cook 彙報。

Bob Mansfield 與像是設計長Jonathan Ive 等高層一向較少在公開場合露面,而在產品發表會或精心製作的產品影片當中可見他的身影。不過本月份開始,蘋果員工發現負責電動車計畫的資深經理轉而向Bob Mansfield 彙報。

當重點產品iPhone 銷售成長放緩、影響營收與獲利後,新的發展計畫成了蘋果迫切需要的項目。該公司雖未公開承認正在開發一款電動車,不過國外媒體陸續報導,已有數百名員工加入專案,並且聘請來自汽車產業的電池技術與自動駕駛專家,這項計畫可望成為未來推動蘋果成長的發展項目之一。

(本文授權自《》──〈〉)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

中國國產電動汽車續航超500km 遠超特斯拉

由中國中能東道集團自主研發,與四川野馬汽車公司聯合生產的「野馬u能純電動汽車」在大連進行了路演。行駛502公里,仍有餘電22%,這遠遠優於領先國際先進水準的特斯拉(400公里)和比亞迪(260公里),其續航能力是目前中國國內普通純電動汽車的3倍。  
  該純電動汽車不依賴專用充電站或充電樁,220v民用電即可完成充電。該車已進入國家工信部目錄並實現量產。中能東道集團已規劃建設5個電動汽車「動力總成核新部件產業基地」,可實現年產50萬套目標。   2014年大連市政府專門出臺檔要求2016年全市公交、公務、郵政、環衛、出租等公共領域新能源汽車置換率不低於30%;個人購買使用新能源汽車,除享受由中央和地方財政分別給予總車款60%左右的政策性補貼外,還可享受免車輛購置稅和車輛不限行、電價優惠等鼓勵政策。2016年底大連市區將實現充電樁網路化建設,基本可滿足新能源汽車充電需求。   中能東道集團已完成全國31個省管理服務中心佈局,並在大連設立運營中心。野馬u能純電動汽車即將落戶大連。   文章來源:新浪新聞看點

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

眾泰汽車被舉報騙補 借殼金馬股份或生變

昨日一則關於眾泰汽車涉嫌騙取國家補貼的舉報,在網路上被迅速傳播:江蘇宿遷眾泰經銷商舉報眾泰汽車為了拿到新能源補貼,在國家大力查處騙補之時,大量生產眾泰新能源雲100早產車。由於生產日期與全身玻璃相差3-5個月,車管所拒絕過戶導致大量車主無法上牌,經銷商無法幫車主解決此問題於是舉報廠家。  
 
專項嚴打   恰恰是面臨著2016年新能源補貼將降低的時間節點,也激發了新能源汽車銷量出現井噴。據報導,2015年11月我國的新能源商用車單月產量超過全球其他國家總和。新能源汽車產銷超常增速背後的“虛火”,也引起了管理部門的關注。   2016年1月初,國家資訊中心資源開發部主任徐長明公開表達了對新能源汽車的質疑, 正是在這樣的質疑聲中,2016年1月21日,四部委聯合發佈《關於開展新能源汽車推廣應用核查工作的通知》,隨後這一“騙補核查行動”更是上升為由國務院牽頭。5月28日,財政部官網消息稱,關於新能源汽車推廣騙補的現場核查已經完成,目前處於會審階段,但並未透露任何核查細節。  
眾泰之危   值得關注的是,上市公司金馬股份已經於今年3月28日發佈公告,計畫以116億元的估值收購眾泰汽車全部股權。然而金馬股份卻在7月6日撤回了收購方案申報,眾泰汽車的借殼上市計畫也戛然而止。恰在這樣一個敏感的時點,關於眾泰汽車騙取新能源汽車補貼的舉報現於網路,並被快速傳播。   也正是由於眾泰汽車的盈利對於新能源汽車補貼的高度依賴,也使得針對該公司騙取新能源補貼的舉報備受關注,相關人士表示,如果這個舉報最終被證實,那麼對於眾泰汽車的借殼上市而言,就是絕殺,不僅是經營層面違法違規、可能面臨高額處罰,而且從最終的財務影響來看也會涉及到財務造假。   文章來源:環球網

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

大眾汽車集團「以人為本」戰略 再推15款電動汽車

大眾汽車集團攜手旗下大眾汽車、奧迪、斯柯達、西雅特、保時捷、賓利六大品牌及合資企業一汽大眾、上海大眾,聯合發佈了』以人為本」的中國戰略。  
  該戰略基於三個重要支柱——客戶、員工與社會,這三大支柱將促成創新且滿足消費者需求的車型。   作為該戰略的首項活動,大眾汽車集團攜旗下各品牌與合資公司共同向中國家庭捐贈5000多個安全坐椅。   作為中國戰略的一部分,大眾汽車集團將在未來三年中不斷推進節能環保車型的生產,加強技術創新,推出一系列低排放車型。據大眾汽車集團總裁兼CEO海茲曼介紹,大眾未來每一款新車型的燃油效率都將比其上一代車型提高20%。   大眾汽車集團已宣佈其新能源汽車戰略,計畫在2018年之前再推出超過15款電動車新車型,並從2016年開始國產其中多款車型。到2018年,大眾汽車集團在中國的國產車型將超過35款。   文章來源:新京報

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

特斯拉將推純電動MPV 與MODEL X共用平臺打造

據海外媒體報導,未來特斯拉有望推出一款全新純電動MPV車型。由於特斯拉MODEL X的生產平臺具備良好的可擴展性,特斯拉純電動MPV將與MODEL X共用平臺打造,在外觀方面有可能借鑒大眾BUDD-e的設計風格。  
  雖然目前關於這款MPV的消息十分有限,但外媒根據埃隆•馬斯克上月底公佈的特斯拉未來戰略規劃推測,特斯拉的MPV在車內空間設計方面將以靈活、多用途為主,除了能容納較多乘客外,還要便於用戶放置嬰兒車、自行車、輪椅等大型物件。更多有關這款MPV的消息,我們要等到2017年其概念車型亮相後才能知曉。   文章來源:汽車之家

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

新加坡贈比亞迪100張電動計程車牌照 打造智慧交通

近日,新加坡政府向比亞迪頒發100個電動計程車牌照,意味著比亞迪正式成為有史以來進入新加坡計程車市場的第一家中國企業。  
  據悉,此次頒發的100個計程車牌照將用於e6純電動車。宏達同新加坡計程車私人有限公司將負責這100台e6計程車的日常運營和管理,首批車輛預計在今年9月上路運營,100台計程車計畫于明年一季度前全部投入服務。屆時,比亞迪將與其新加坡合作夥伴對這支e6車隊進行即時資料採集、分析和處理,聯合研發自動駕駛電動汽車技術和智慧交通管理系統,以支援新加坡政府打造智慧交通。   目前,比亞迪在新加坡的車型由最初的e6逐步擴展到電動巴士、叉車和電動商用車等多種車型,並與GrabCar等交通出行領軍企業展開合作。今年5月30日,比亞迪還攜手SMART集團在新加坡開啟了全球首家純電動車體驗中心,並簽署在新加坡聯合推廣1,000輛e6的合作諒解備忘錄。   文章來源:環球網

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

2020即將到來,看完這篇幫你詳細了解數據策略

 

隨着企業與数字科技融合程度的逐漸加深,越來越多的企業在数字化轉型之路上感到前所未有的焦慮。他們相信組織已經擁有了具有經濟價值的數據資源,期望它能像其他資產那樣為組織帶來更多的未來利益。

諸如員工資產,與數據一樣,員工也是企業資產的重要組成部分。只不過相較於數據資產管理在國內企業中仍然處於早期階段。大部分企業在員工的選育用留上已經形成了一套標準化流程和相對成熟的管理策略。基於此,我們是不是應該考慮像管理員工資產那樣,以相同的方式處理數據呢?這時,數據策略顯得尤為重要。

 

一、數據策略的定義

 

數據策略是用於獲取、集成、存儲、保護、管理、監控、分析、使用和操作數據的方法。現代而全面的數據策略所涉及的不僅僅是數據,它是定義人員、流程和技術的路線圖,闡明了數據將如何實現和激發業務策略。

 

具體包括:闡明目標願景和實現該願景的實用指南,明確闡述成功標準和關鍵績效指標,這些指標可用於評估和合理化所有後續數據計劃。而不包含針對特定技術問題的詳細解決方案。隨着企業目標的發展,數據戰略並非一成不變,需要緊跟企業的技術創新和運作方式。

 

二、為什麼企業要建立數據策略

 

鑒於前面闡述的背景,數據策略與員工的選育用留策略一樣,都是為了更科學地管理企業資產。如果沒有策略,組織將被動應對各業務部門提出的數據要求。從前台業務、市場,到中後台的財務、供應鏈、人力資源,都會向一個部門提需求,可能涉及到某種數據分析、主數據管理、商業智能、數據治理、數據質量計劃等。

 

導致數據部門每天對外要理解業務數據需求的內涵,竭力排期滿足,對內要運維所使用的陳舊工具和系統,保證其正常運行,每天不堪重負。一旦出現數據質量、元數據等問題,就會被挑戰得體無完膚,甚至會升級到能力和信任的高度。更為關鍵的是,如果無法及時正確地支持這些計劃,很容易導致企業做出錯誤的商業決策。

 

三、誰來制定、推動數據策略

 

請不要將企業範圍的數據策略僅僅交給首席信息官(CIO),為什麼?數據不僅僅是IT資產,而是一種企業資產,數據策略在一定程度上是一種企業戰略。

DataPipeline認為,CEO和董事會需要深刻理解快速將數據戰略落地的意義和風險,並着手構建下述組織架構,鼓勵相應的文化和創新。

 

CEO相較於其他企業角色,既要關注生存,也要關注發展,而數據很難做到立竿見影,所以平衡短期收益與長期發展考驗的是CEO的智慧。如果CEO在公布決策時都是引用數據,並對企業內部的數據創新非常熟悉,那麼數據策略已經成功了一半,否則其他人的努力有極大概率會付諸東流。

 

CDO由CEO領導,直接負責公司組織內部數據發展策略落地的詳細路徑和整體節奏,根據業務模式確定合規要求、需求滿足的價值、速度、流程、以及自動化、智能化技術路線的選擇。這裏一定要注意滿足業務需求的速度和質量,由於數據需求的挑戰較大,太多CDO無法在一定時間,一定業務範圍內快速達成CEO、董事會、業務部門希望看到的效果。沒有一個好的起點,首席數據官的工作就會喪失前進的節奏,陷於和業務部門就數據的上收、使用等流程長期討論和拉鋸的泥潭中,造成惡性循環,使這個崗位變成高危職位。據DataPipeline觀察,很多企業開始設立CDO的崗位,並嘗試通過數據帶來業務增長,客觀來說,這和其他高管職位一樣,是一個機遇與挑戰並存的情況。

 

數據合規與標準委員會由CEO領導,並由公司的業務線領導、法務領導、首席數據官組成,詳細制定出數據使用的邊界、自由度和數據質量標準。負責隨着業務的發展保持最高頻率(一般是一周一次)的討論更新,同時使用自動化的工具將規則同步至數據系統中。如果業務的變化無法從合規層面保持一致,就會逐步成為限制數據使用的瓶頸。這裏的挑戰在於不讓規則討論過於大而全,要儘快在一定範圍內達成共識,逐步推動部分範圍內規則地快速落地,否則會使願景的落地失去前進節奏。

 

數據部門由首席數據官領導,包括數據工程師,分析師和數據科學家。數據工程師負責使用符合時代挑戰的自研或者商業的工具,確保業務用戶可以自助式地完成數據全生命周期的使用和管理。同時負責企業內外的數據源能自動高效地集成融合,快速滿足業務取數、用數需求,另外通過保證元數據、主數據、數據血緣與業務發展時刻保持一致,讓業務準確無誤地理解數據語義。

他們不僅要確保大數據平台的負載均衡、穩定性,可以隨時響應業務對數據模型的計算和查詢需求。還要遵循標準委員制定的標準,通過手工制定規則和各種算法確保數據質量並盡可能做到前置預警。最後,也是非常重要的一點,在應對業務部門的需求時,需要有一套“定價體系”。因為數據支持業務的發展探索是存在成本的,但目前業務部門對此並無感知,更核算不出ROI。在成本面前,很容易篩選出真需求,排出優先級,並且在後續服務中理清ROI。這條路舉步維艱,但又勢在必行,否則數據部門的業務價值困境始終會存在。 有時數據部門在沒有設立首席數據官的情況下也由CIO領導,這時有一個職責劃分藝術,每個企業的情況都不同,但CDO的重點職責是在合適的企業內帶領數據組用數據快速產生業務價值。CIO的職責範圍更廣,但專精的領域不在該點上。

 

業務部門中應當擁有能深入理解業務的分析師和科學家,自助使用數據部門提供的工具,這時使用門檻會不斷降低,取數用數的難度和周期也會大幅下降,技能的要求一般是SQL級別。因此業務部門需要更加理解數據,並構思數據可以應用到自身業務發展的角度,再通過管理數據使用的全生命周期,在實踐中不斷總結。挑戰在於如何能快速用數據高效地帶來業務價值,通過解耦來擺脫髮展受到數據部門效率制約的現狀。

 

四、數據策略中的數據融合策略

 

1. 為什麼要制定數據融合策略

企業希望在未來以數據支持數據應用和數據業務,但前提是能夠隨時隨心快速提取使用數據。在此過程中存在一些壁壘,諸如技術、組織、文化等。如果沒有提前思考這些難點,在後續朝目標努力時,會遇到很大的阻力,甚至有可能讓項目流產。

2. 制定數據融合策略時需要思考回答哪些問題

在搞清楚為什麼制定數據融合策略之後,接下來我們將從以下幾方面展開:

Who:組織中,誰將參与數據融合?是否有具備編程知識的IT專家解決所有數據融合任務?還是需要使業務部門的員工能夠自己使用數據融合工具?一旦實施,潛在的技術風險和實施周期是什麼?這些問題將對您選擇購買的數據融合解決方案的類型產生重大影響。

What:哪些數據可能需要集成?在DataPipeline看來,我們反對大而全,支持企業按需制定非常靈活的數據融合策略。企業需要反向從業務角度去思考,從全局角度梳理清楚在未來一到三年或更長期的時間內,有哪些業務目標希望通過數據去驅動,這些數據包括哪些數據,這些數據都存在於哪裡,避免重複建設。

如果只存在幾個數據孤島,對企業來說,最具成本效益的策略可能是選擇可以滿足特定需求的基本數據交換或ETL工具。但是,如果需要集成許多不同的孤島(或者不同類型的數據),那麼最好使用功能更為全面的數據融合平台。

When:何時進行數據融合?企業一旦制定了數據策略,接下來數據融合策略將會被擺在一個非常高的位置上。因為,如果不做數據融合,其餘事情將會舉步維艱,但從時間上要做通盤考慮,進行一個頂層設計,然後再按階段逐步進行。

如果要創建數據倉庫,則數據融合可能會在分析之前進行。如果要創建基於Hadoop或類似技術的數據湖,以原始未更改的形式存儲數據,則將在運行分析工作負載之前進行一些數據融合。目前許多企業都有數據倉庫和數據湖,選擇何種體繫結構將影響數據融合所需的技術類型。

Where:數據融合將在哪裡進行?雲和本地並不是一個矛盾,因為現在有許多公司既有本地的IDC(數據中心),又有雲上,甚至是多雲的架構。企業需要選擇一種能夠支持本地部署、雲上部署、Docker,在容器大的平台上多管齊下的數據融合工具。另外,該工具的現代化和智能化程度還應該適應企業當中非常複雜多變的環境。

How:如何進行數據融合?這個問題最為複雜,因為它涉及到工具、文化和流程。關於員工,作為企業和組織的戰略性資源,我們並非要將每個人都訓練成高水平、有經驗、訓練有素的數據工程師。而應注重培養其數據意識,知道如何使用數據,如何去發揮數據的價值。

同時企業應該採取靈活易用、穩定的數據融合工具。面對再厲害的人才,如果沒有匹配相應的工具和方法論,在工作中也很難游刃有餘。未來企業需要多思考何種工具,何種平台,何種工作方式能讓組織儘快釋放數據效能。

 

3. 選擇數據融合平台需要做哪些考量

公司需要結合自身的發展階段、信息化建設水平以及人員的素養等情況,來選擇自己的解決方案。

如果研發人員較多,可選擇的範圍相對會比較廣泛。面對開源和商業的數據融合工具,您需要根據企業的需求和對生產的要求進行選擇。開源是很好的方式,可以先從這裏嘗試做起,但是對於業務的連續性要求和有些生產級別的要求,可能就需要商業工具。

另外,關於一些技術考量點,可以看這篇文章:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

【Flume】Flume基礎之安裝與使用

1、Flume簡介

​ (1) Flume提供一個分佈式的,可靠的,對大數據量的日誌進行高效收集、聚集、移動的服務,Flume只能在Unix環境下運行。

​ (2) Flume基於流式架構,容錯性強,也很靈活簡單。

​ (3) Flume、Kafka用來實時進行數據收集,Spark、Flink用來實時處理數據,impala用來實時查詢。

2、Flume角色

2.1 Source

​ 用於採集數據,Source是產生數據流的地方,同時Source會將產生的數據流傳輸到Channel,這個有點類似於Java IO部分的Channel。

2.2 Channel

​ 用於橋接Sources和Sinks,類似於一個隊列。

2.3 Sink

​ 從Channel收集數據,將數據寫到目標源(可以是下一個Source,也可以是HDFS或者HBase)。

2.4 Event

​ 傳輸單元,Flume數據傳輸的基本單元,以事件的形式將數據從源頭送至目的地。

3、Flume傳輸過程

​ source監控某個文件或數據流,數據源產生新的數據,拿到該數據后,將數據封裝在一個Event中,並put到channel后commit提交,channel隊列先進先出,sink去channel隊列中拉取數據,然後寫入到HDFS或其他目標源中。

4、Flume安裝與部署

4.1 上傳包

​ 將flume的gz包上傳到/opt/soft/目錄下;

[root@bigdata111 conf]# rz

​ 若不支持rz命令,則用yum安裝lrzsz命令:

​ 查詢含有rz的yum源,由結果可見,yum源中含有lrzsz.x86_64包;

[root@bigdata111 soft]# yum search rzsz
已加載插件:fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.tuna.tsinghua.edu.cn
 * extras: mirrors.aliyun.com
 * updates: mirrors.aliyun.com
============================================================================================================================= N/S matched: rzsz ==============================================================================================================================
lrzsz.x86_64 : The lrz and lsz modem communications programs

  名稱和簡介匹配 only,使用“search all”試試。

​ 安裝rz命令

[root@bigdata111 soft]# yum -y install lrzsz

4.2 解壓包

​ 將flume解壓到/opt/module/目錄下,並改短名字flume-1.8.0:

[root@bigdata111 soft]# tar -zvxf apache-flume-1.8.0-bin.tar.gz -C /opt/module
[root@bigdata111 module]# mv apache-flume-1.8.0-bin flume-1.8.0

4.3 配置參數

​ 切換到/opt/module/flume-1.8.0/conf目錄,將flume-env.sh.template文件名改為:flume-env.sh

[root@bigdata111 module]# mv flume-env.sh.template flume-env.sh

​ 查詢JAVA_HOME的值;

[root@bigdata111 conf]# echo $JAVA_HOME
/opt/module/jdk1.8.0_144

​ 編輯flume-env.sh,將文件內容中的JAVA_HOME的值修改為上面查到的;

export JAVA_HOME=/opt/module/jdk1.8.0_144

4.4 配置環境變量

​ 在/etc/profile末尾添加flume的家路徑

export FLUME_HOME=/opt/module/flume-1.8.0
export PATH=$PATH:$FLUME_HOME/bin

4.5 驗證flume成功與否

​ 在xshell客戶端下,輸入flu,按tab鍵,看是否能夠自動補全:flume-ng

​ 如果可以自動補全,則代表安裝flume成功,否則失敗。

[root@bigdata112 opt]# flume-ng
Error: Unknown or unspecified command ''

Usage: /opt/module/flume-1.8.0/bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info
............

4.6 配置其他兩台機器

​ 利用scp命令,配置其他兩台機器;

​ 首先,將flume目錄分發到bigdata112,bigdata113

[root@bigdata111 ~]# scp -r /opt/module/flume-1.8.0/ root@bigdata112:/opt/module/
[root@bigdata111 ~]# scp -r /opt/module/flume-1.8.0/ root@bigdata113:/opt/module/

​ 其次,將/etc/profile環境變量文件分發到bigdata112,bigdata113

[root@bigdata111 ~]# scp -r /etc/profile root@bigdata112:/etc/
[root@bigdata111 ~]# scp -r /etc/profile root@bigdata113:/etc/

​ 最後,在bigdata112,bigdata113上分別刷新環境變量

[root@bigdata112 opt]# source /etc/profile
[root@bigdata113 opt]# source /etc/profile

5、Flume案例

5.1 監控端口數據

目標:Flume監控一端Console,另一端Console發送消息,使被監控端實時显示。

5.1.1 安裝telnet命令

[root@bigdata111 conf]# yum -y install telnet

5.1.2 創建Agent配置文件

​ 在flume根目錄下,新建一個myconf目錄,用於存放自定義conf配置文件;

​ 新建flume-telnet.conf文件,文件內容如下:

# 定義agent
# <自定義agent名>.sources=<自定義source名稱>
a1.sources = r1
# <自定義agent名>.sinks=<自定義sink名稱>
a1.sinks = k1
# <自定義agent名>.channels=<自定義channel名稱>
a1.channels = c1

# 定義source
# <agent名>.sources.<source名稱>.type = 源類型
a1.sources.r1.type = netcat
# <agent名>.sources.<source名稱>.bind = 數據來源服務器
a1.sources.r1.bind = bigdata111
# <agent名>.sources.<source名稱>.port = 自定義未被佔用的端口
a1.sources.r1.port = 44445

# 定義sink
# <agent名>.sinks.<sink名稱>.type = 下沉到目標源的類型
a1.sinks.k1.type = logger

# 定義channel
# <agent名>.channels.<channel名稱>.type = channel的類型
a1.channels.c1.type = memory
# <agent名>.channels.<channel名稱>.capacity = 最大容量
a1.channels.c1.capacity = 1000
# transactionCapacity<=capacity
a1.channels.c1.transactionCapacity = 1000                  

# 雙向鏈接
# <agent名>.sources.<source名稱>.channels = channel名稱
a1.sources.r1.channels = c1
# <agent名>.sinks.<sink名稱>.channel = channel名稱
a1.sinks.k1.channel = c1

5.1.3 啟動flume配置文件

[root@bigdata111 conf]# flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/module/flume-1.8.0/conf/flume-telnet.conf -Dflume.root.logger==INFO,console

​ 可以簡寫為:

[root@bigdata111 conf]# flume-ng agent --c /opt/module/flume-1.8.0/conf/ --n a1 --f /opt/module/flume-1.8.0/conf/flume-telnet.conf -Dflume.root.logger==INFO,console

5.1.4 發送測試數據

​ 通過其他機器向bigdata111的44445端口發送數據

[root@bigdata112 ~]# telnet bigdata111 44445
Trying 192.168.1.111...
Connected to bigdata111.
Escape character is '^]'.
echo aaaa
OK
echo aaaa
OK
echo bbbbbbbbb
OK

運行結果如圖:

5.2 實時讀取本地文件到HDFS

5.2.1 創建Agent配置文件

​ 創建flume-hdfs配置文件

# 1 agent  若同時運行兩個agent,則agent名字需要改變,比如下面a2
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# 2 source 
# 因監控linux本地文件,執行shell命令,所以type為exec;
a2.sources.r2.type = exec
# 監控的文件路徑
a2.sources.r2.command = tail -F /opt/test.log
a2.sources.r2.shell = /bin/bash -c

# 3 sink
# 數據下沉到目標源hdfs
a2.sinks.k2.type = hdfs
# 如果集群為HA模式,則路徑為active的namenode地址,普通分佈式集群,直接寫namenode所在地址即可。
a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#設置每個文件的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k2.hdfs.rollCount = 0
#最小副本數
a2.sinks.k2.hdfs.minBlockReplicas = 1

# 定義channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 1000

# 雙向鏈接綁定
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

5.2.2 啟動flume配置文件

[root@bigdata111 flume-1.8.0]# flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a2 --conf-file /opt/module/flume-1.8.0/myconf/flume-hdfs.conf 

5.2.3 發送文件內容

[root@bigdata111 opt]# echo kjalksdjglkajsdg2333333333333333asdgasdgasdg >> test.log
[root@bigdata111 opt]# echo kjalksdjglkajsdg2333333333333333asdgasdgasdg >> test.log
[root@bigdata111 opt]# echo kjalksdjglkajsdg2333333333333333asdgasdgasdg >> test.log
[root@bigdata111 opt]# echo kjalksdjglkajsdg2333333333333333asdgasdgasdg >> test.log

​ 運行結果:

5.3 實時讀取目錄文件到HDFS

目標:使用flume監聽整個目錄的文件

5.3.1 創建Agent配置文件

​ 創建agent配置文件,命名為:flume-dir.conf,文件內容如下:

#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

#2 source
#監控目錄的類型
a3.sources.r3.type = spooldir
#監控目錄的路徑
a3.sources.r3.spoolDir = /opt/module/flume1.8.0/upload
#哪個文件上傳hdfs,然後給這個文件添加一個後綴
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp結尾的文件,不上傳(可選)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H
#上傳文件的前綴
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照時間滾動文件夾
a3.sinks.k3.hdfs.round = true
#多少時間單位創建一個新的文件夾
a3.sinks.k3.hdfs.roundValue = 1
#重新定義時間單位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#設置文件類型,可支持壓縮
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一個新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#設置每個文件的滾動大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a3.sinks.k3.hdfs.rollCount = 0
#最小副本數
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

​ 溫馨提示:

​ 1) 不要在監控目錄中創建並持續修改文件

​ 2) 上傳完成的文件會以.COMPLETED結尾

​ 3) 被監控文件夾每500毫秒掃描一次文件變動

5.3.2 啟動flume配置文件

[root@bigdata111 myconf]# flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a3 --conf-file /opt/module/flume-1.8.0/myconf/flume-dir.conf

5.3.3 上傳文件到upload目錄

[root@bigdata111 opt]# mkdir upload
[root@bigdata111 opt]# ls
module  soft  test.log  upload
[root@bigdata111 opt]# mv test.log upload/
[root@bigdata111 opt]# ls
module  soft  upload
[root@bigdata111 opt]# vi test1.log
[root@bigdata111 opt]# mv test1.log upload/

​ 運行如圖:

5.4 扇出例子01

扇出:數據用於多個地方。(簡單理解:一個數據源對應多個channel,sink,並且輸出到多個目標源)

例子01示意圖:

目標:在flume1裏面接收數據,然後數據下沉到兩個不同目標源(控制台和HDFS)

5.4.1 創建Agent配置文件

​ 在myconf目錄下,新建一個flume-fanout1.conf文件,內容配置如下:

# 定義agent
a1.sources=c1
a1.channels=k1 k2
a1.sinks=s1 s2

# 定義source
a1.sources.c1.type=exec
a1.sources.c1.command=tail -F /opt/test.log
a1.sources.c1.shell=/bin/bash -c

# 將數據流複製給多個channel
a1.sources.r1.selector.type=replicating

# 定義channel1
a1.channels.k1.type=memory
a1.channels.k1.capacity = 1000
a1.channels.k1.transactionCapacity=1000

# 定義channel2
a1.channels.k2.type=memory
a1.channels.k2.capacity = 1000
a1.channels.k2.transactionCapacity=1000

# 定義sink1
a1.sinks.s1.type=logger

# 定義sink2
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
# 上傳文件的前綴
a1.sinks.s2.hdfs.filePrefix = logs-
# 是否按照時間滾動文件夾
a1.sinks.s2.hdfs.round = true
# 多少時間單位創建一個新的文件夾
a1.sinks.s2.hdfs.roundValue = 1
# 重新定義時間單位
a1.sinks.s2.hdfs.roundUnit = hour
# 是否使用本地時間戳
a1.sinks.s2.hdfs.useLocalTimeStamp = true
# 積攢多少個Event才flush到HDFS一次
a1.sinks.s2.hdfs.batchSize = 1000
# 設置文件類型,可支持壓縮
a1.sinks.s2.hdfs.fileType = DataStream
# 多久生成一個新的文件
a1.sinks.s2.hdfs.rollInterval = 600
# 設置每個文件的滾動大小
a1.sinks.s2.hdfs.rollSize = 134217700
# 文件的滾動與Event數量無關
a1.sinks.s2.hdfs.rollCount = 0
# 最小副本數
a1.sinks.s2.hdfs.minBlockReplicas = 1

# 雙向鏈接
a1.sources.c1.channels = k1 k2
a1.sinks.s1.channel=k1
a1.sinks.s2.channel=k2

5.4.2 啟動flume配置文件

[root@bigdata111 myconf]# flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/module/flume-1.8.0/myconf/flume-fanout1.conf -Dflume.root.logger==INFO,console

5.4.3 向文件添加內容

​ 切換到/opt/目錄下,新建test.log文件,然後動態添加內容,觀察控制台輸出以及web的hdfs文件

[root@bigdata111 opt]# touch test.log
[root@bigdata111 opt]# touch test.log
[root@bigdata111 opt]# echo 'china' >>test.log 
[root@bigdata111 opt]# echo 'hello world' >>test.log
[root@bigdata111 opt]# echo 'nihao' >> test.log

​ 控制台輸出如下:

​ web頁面結果:

5.5 扇出例子02

目標:flume1監控文件,然後將變動數據分別傳給flume2和flume3,flume2的數據下沉到HDFS;flume3的數據下沉到本地文件;

5.5.1 創建flume1配置文件

​ 在bigdata111上的myconf目錄下,新建agent配置文件:flume-fanout1.conf;

​ flume1用於監控某文件的變動,同時產生兩個channel和兩個sink,分別輸送給flume2,flume3;

​ 文件內容如下:

# 配置agent
a1.sources = c1
a1.channels = k1 k2
a1.sinks = s1 s2

# 定義source
a1.sources.c1.type=exec
a1.sources.c1.command=tail -F /opt/test.log
a1.sources.c1.shell=/bin/bash -c

# 將數據流複製給多個channel
a1.sources.c1.selector.type=replicating

# 定義channel1
a1.channels.k1.type=memory
a1.channels.k1.capacity = 1000
a1.channels.k1.transactionCapacity=1000

# 定義channel2
a1.channels.k2.type=memory
a1.channels.k2.capacity = 1000
a1.channels.k2.transactionCapacity=1000

# 定義sink1
a1.sinks.s1.type = avro
a1.sinks.s1.hostname = bigdata112
a1.sinks.s1.port = 4402

# 定義sink2
a1.sinks.s2.type = avro
a1.sinks.s2.hostname = bigdata113
a1.sinks.s2.port = 4402

# 雙向鏈接
a1.sources.c1.channels = k1 k2
a1.sinks.s1.channel=k1
a1.sinks.s2.channel=k2

5.5.2 創建flume2配置文件

​ 在bigdata112的myconf目錄下,新建agent配置文件:flume-fanout2.conf

​ 接收flume1的event數據,然後產生一個channel和一個sink,最後將數據下沉到hdfs

​ 文件內容如下:

# 配置agent 不同agent之間,agent名不相同,但是source,channel,sink名可以相同
a2.sources = c2
a2.channels = k2
a2.sinks = s2

# 定義source
a2.sources.c2.type=avro
a2.sources.c2.bind = bigdata112
a2.sources.c2.port = 4402

# 定義channel
a2.channels.k2.type=memory
a2.channels.k2.capacity = 1000
a2.channels.k2.transactionCapacity=1000

# 定義sink
a2.sinks.s2.type = hdfs
a2.sinks.s2.hdfs.path=hdfs://bigdata111:9000/flume2/%H
#上傳文件的前綴
a2.sinks.s2.hdfs.filePrefix = flume2-
#是否按照時間滾動文件夾
a2.sinks.s2.hdfs.round = true
#多少時間單位創建一個新的文件夾
a2.sinks.s2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.s2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.s2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.s2.hdfs.batchSize = 100
#設置文件類型,可支持壓縮
a2.sinks.s2.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.s2.hdfs.rollInterval = 600
#設置每個文件的滾動大小大概是128M
a2.sinks.s2.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.s2.hdfs.rollCount = 0
#最小副本數
a2.sinks.s2.hdfs.minBlockReplicas = 1

# 雙向鏈接
a2.sources.c2.channels = k2
a2.sinks.s2.channel=k2

5.5.3 創建flume3配置文件

​ 在bigdata113的myconf目錄下,新建agent配置文件:flume-fanout3.conf

​ 接收flume1的event數據,然後產生一個channel和一個sink,最後將數據下沉到本地/opt/flume3

​ 文件內容如下:

# 配置agent
a3.sources = c3
a3.channels = k3
a3.sinks = s3

# 定義source
a3.sources.c3.type=avro
a3.sources.c3.bind = bigdata113
a3.sources.c3.port = 4402

# 定義channel
a3.channels.k3.type=memory
a3.channels.k3.capacity = 1000
a3.channels.k3.transactionCapacity=1000

# 定義sink
a3.sinks.s3.type = file_roll
# 提示:本地此目錄必須先建好,程序不會自動創建該目錄
a3.sinks.s3.sink.directory=/opt/flume3

# 雙向鏈接
a3.sources.c3.channels = k3
a3.sinks.s3.channel=k3

5.5.4 啟動三台機器配置文件

bigdata111:

[root@bigdata111 myconf]# flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/module/flume-1.8.0/myconf/flume-fanout1.conf -Dflume.root.logger==INFO,console

bigdata112:

[root@bigdata112 myconf]# flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a2 --conf-file /opt/module/flume-1.8.0/myconf/flume-fanout2.conf 

bigdata113:

[root@bigdata113 myconf]# flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a3 --conf-file /opt/module/flume-1.8.0/myconf/flume-fanout3.conf 

​ 運行結果如圖:

​ bigdata112:

​ bigdata113:

5.6 扇入例子

5.6.1 創建flume1配置文件

​ flume1(agent1)監控端口數據變化,將數據sink到flume3(agent3);

​ 在myconf目錄下新建agent文件:flume-fanin-1.conf

​ 配置內容如下:

# 配置agent
a1.sources = c1
a1.channels = k1
a1.sinks = s1

# 配置source
a1.sources.c1.type = netcat
a1.sources.c1.bind = bigdata111
a1.sources.c1.port = 6666

# 配置sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=bigdata113
a1.sinks.s1.port=5008

# 配置channel
a1.channels.k1.type=memory
a1.channels.k1.capacity=1000
a1.channels.k1.transactionCapacity=1000

# 雙向綁定
a1.sources.c1.channels = k1
a1.sinks.s1.channel = k1

5.6.2 創建flume2配置文件

​ flume2(agent2)監控本地文件變化,將數據sink到flume3(agent3);

​ 在myconf目錄下新建agent文件:flume-fanin-2.conf

​ 配置內容如下:

# 配置agent
a2.sources = c1
a2.channels = k1
a2.sinks = s1

# 配置source
a2.sources.c1.type = exec
a2.sources.c1.command = tail -F /opt/ceshi.log
a2.sources.c1.shell=/bin/bash -c

# 配置sink
a2.sinks.s1.type=avro
a2.sinks.s1.hostname=bigdata113
a2.sinks.s1.port=5008

# 配置channel
a2.channels.k1.type=memory
a2.channels.k1.capacity=1000
a2.channels.k1.transactionCapacity=1000

# 雙向綁定
a2.sources.c1.channels = k1
a2.sinks.s1.channel = k1

5.6.3 創建flume3配置文件

​ flume3(agent3)接收flume1和flume2的數據,將數據sink到HDFS ;

​ 在myconf目錄下新建agent文件:flume-fanin-3.conf

​ 配置內容如下:

# 配置agent
a3.sources = c1
a3.channels = k1
a3.sinks = s1

# 配置source
a3.sources.c1.type = avro
a3.sources.c1.bind = bigdata113
a3.sources.c1.port = 5008

# 配置sink
a3.sinks.s1.type=hdfs
a3.sinks.s1.hdfs.path=hdfs://bigdata111:9000/flume3/%H
# 上傳文件的前綴
a3.sinks.s1.hdfs.filePrefix = flume3-
# 是否按照時間滾動文件夾
a3.sinks.s1.hdfs.round = true
# 多少時間單位創建一個新的文件夾
a3.sinks.s1.hdfs.roundValue = 1
# 重新定義時間單位
a3.sinks.s1.hdfs.roundUnit = hour
# 是否使用本地時間戳
a3.sinks.s1.hdfs.useLocalTimeStamp = true
# 積攢多少個Event才flush到HDFS一次
a3.sinks.s1.hdfs.batchSize = 1000
# 設置文件類型,可支持壓縮
a3.sinks.s1.hdfs.fileType = DataStream
# 多久生成一個新的文件
a3.sinks.s1.hdfs.rollInterval = 600
# 設置每個文件的滾動大小大概是128M
a3.sinks.s1.hdfs.rollSize = 134217700
# 文件的滾動與Event數量無關
a3.sinks.s1.hdfs.rollCount = 0
# 最小冗餘數
a3.sinks.s1.hdfs.minBlockReplicas = 1

# 配置channel
a3.channels.k1.type=memory
a3.channels.k1.capacity=1000
a3.channels.k1.transactionCapacity=1000

# 雙向綁定
a3.sources.c1.channels = k1
a3.sinks.s1.channel = k1

5.6.4 啟動三個flume配置文件

flume1:

[root@bigdata111 myconf]# flume-ng agent -c ../conf/ -n a1 -f flume-fanout1.conf -Dflume.root.logger==INFO,console

flume2:

[root@bigdata112 myconf]# flume-ng agent -c ../conf/ -n a2 -f flume-fanout2.conf 

flume3:

[root@bigdata113 myconf]# flume-ng agent -c ../conf/ -n a3 -f flume-fanout3.conf 

5.6.5 操作端口與文件

新開xshell選項卡,鏈接bigdata111服務器,然後執行telnet命令:

[root@bigdata111 ~]# telnet bigdata111 6666
Trying 192.168.1.111...
Connected to bigdata111.
Escape character is '^]'.
english
OK
chinese
OK
hello
OK
.net
OK
php
OK
java
OK

新開xshell選項卡,鏈接bigdata112服務器,然後向/opt/ceshi.log添加新內容:

[root@bigdata112 ~]# cd /opt/
[root@bigdata112 opt]# ls
ceshi.log  ha  module  soft  zookeeper.out
[root@bigdata112 opt]# cat ceshi.log
start-log-in
end-log
[root@bigdata112 opt]# echo `date` >> ceshi.log
[root@bigdata112 opt]# echo "end-log" >> ceshi.log
[root@bigdata112 opt]# cat ceshi.log 
start-log-in
end-log
2019年 09月 07日 星期六 23:36:03 CST
end-log

5.6.6 显示運行結果

​ web頁面結果:

​ hdfs的文件內容:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選