2019.10.08

Hive3のトランザクションを試してみる

こんにちは。次世代システム研究室のデータベース と Hadoop を担当している M.K. です。

前々回前回のブログでLLAPとDruidとスナップショット機能を試しましたが、今回はHive3系で新しくなったACIDトランザクションを試してみました。

結論から先に言うと、今までのHiveとは別物みたいになって、これからの運用がガラッと変わりそうです。

目次

  1. 準備
    1. クラウド環境とHadoop環境について
    2. 使用したデータとテーブル
    3. Hadoopクラスタのパラメータ設定(YARNとHive/LLAP)
  2. 検証
    1. UPDATE文を色々試す
    2. 同じ行を繰り返し更新し続けるとどうなるか?
    3. 手動でコンパクションしてみる
    4. ロックの動きを見る
    5. おまけ(Primary Key検索)
  3. まとめ
    1. 検証した結果
    2. 今後の課題

1. 準備

1. クラウド環境とHadoop環境について

前回のブログの環境を使って検証したので、GMOアプリクラウド&HDP3.0.1です。

  • クラウド環境
    • GMOアプリクラウド
      • マスターノード (仮想CPU:4個、メモリ:16GB) × 4
      • スレーブノード (仮想CPU:6個、メモリ:30GB) × 5
  • Hadoop環境
    • HDP 3.0.1.0-187
      • HDFS 3.1.1
      • YARN 3.1.1
      • Hive 3.1.0

 

2. 使用したデータとテーブル

トランザクションを試すにあたり、手ごろなサイズということもあって、Kaggleで提供されていたアイオワ州のお酒販売のデータを使うことにしました。

ただ、面倒なことにカラムの値に改行が入っていて、残念ながらHiveはカラムの値に改行が入っているCSVを簡単に読み取ることができないので、今回は改行をタブに変換しています。

awkでレコードを切り分けるための改行に印をつけて、カラムの中の改行を消すようなことをしました。参考までに載せておきます。

awk '/^[A-Z][A-Z_-]*[0-9][0-9]*,/ {print "ZZZ\n"$0; next}
{print}
END{print "ZZZ"}' Iowa_Liquor_Sales.csv > Iowa_Liquor_Sales.csv2

awk 'BEGIN{RS="\nZZZ\n"; ORF="\n"}
{gsub("\n", "\t"); print}' Iowa_Liquor_Sales.csv2 > Iowa_Liquor_Sales.csv3

CSVデータをHiveで読めるようにするために、テーブルを作成します。テキストフォーマットでフィールドをカンマ区切りにしてもCSVは読めますが、ダブルクオートで囲まれたカラムには対応してないので、org.apache.hadoop.hive.serde2.OpenCSVSerdeを使いました。

CREATE TABLE iowa_liquor_sales_csv (
  invoice_item_number  varchar(50)
 ,sale_date  varchar(10)
 ,store_number  integer
 ,store_name  varchar(100)
 ,address  varchar(100)
 ,city  varchar(100)
 ,zip_code  integer
 ,store_location  string
 ,county_number  integer
 ,county  varchar(50)
 ,category  varchar(50)
 ,category_name  varchar(50)
 ,vendor_number  integer
 ,vendor_name  varchar(50)
 ,item_number  integer
 ,item_description  varchar(100)
 ,pack  integer
 ,bottle_volume  integer
 ,state_bottle_cost  varchar(50)
 ,state_bottle_retail  varchar(50)
 ,bottles_sold  integer
 ,sale  varchar(50)
 ,volume_sold_liters  integer
 ,volume_sold_gallons  integer
)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   'separatorChar' = ',',
   'quoteChar'     = '"',
   'escapeChar'    = '\\'
)
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
;

文字列カラムは適当にデータ長を推測して上記のようにしました。このカラム定義を使ってORCテーブルも後で作ります。

余談ですが、org.apache.hadoop.hive.serde2.OpenCSVSerdeを使ってテーブルを作ると、どのカラムも全部string型に、transactional_propertiesがinsert_onlyに自動的になりました。transactional_propertiesはトランザクションの種類をテーブルごとに指定できるもので、insert_onlyは文字通りINSERT文のみでUPDATE文には対応しなくなります。

show create table文で定義を見ると以下のようになりました。

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `iowa_liquor_sales_csv`(              |
|   `invoice_item_number` string COMMENT 'from deserializer',  |
|   `sale_date` string COMMENT 'from deserializer',  |
|   `store_number` string COMMENT 'from deserializer',  |
|   `store_name` string COMMENT 'from deserializer',  |
|   `address` string COMMENT 'from deserializer',    |
|   `city` string COMMENT 'from deserializer',       |
|   `zip_code` string COMMENT 'from deserializer',   |
・・・(中略)・・・
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'last_modified_by'='hive',                       |
|   'last_modified_time'='1567842216',               |
|   'skip.header.line.count'='1',                    |
|   'transactional'='true',                          |
|   'transactional_properties'='insert_only',        |
|   'transient_lastDdlTime'='1567842216')            |
+----------------------------------------------------+

テーブルを作ったら、CSVファイルをロードします。

LOAD DATA LOCAL INPATH '/var/tmp/load_data/Iowa_Liquor_Sales.csv3'
OVERWRITE INTO TABLE Iowa_liquor_sales_csv
;

CSVフォーマットのテーブルからORCフォーマットのテーブルにデータを移行します。Hive3のACIDトランザクションは今のところORCテーブルにしか対応していません。TBLPROPERTIES句のtransactionalをtrueにするとトランザクション対応になります。デフォルトでtrueになっています。つまりHiveはもうトランザクションが前提ということですね。

また、NO_AUTO_COMPACTIONをtrueにするとバックエンドで行われるコンパクション処理が自動で行われなくなります。Hive3にはコンパクションというこれまでHiveになかった概念・仕掛けが出てきます。デフォルトはfalseで、falseのまま検証しました。

CREATE TABLE iowa_liquor_sales (
  invoice_item_number  varchar(50)
 ,sale_date  date
 ,store_number  integer
 ,store_name  varchar(100)
 ,address  varchar(100)
 ,city  varchar(100)
 ,zip_code  integer
 ,store_location  string
 ,county_number  integer
 ,county  varchar(50)
 ,category  varchar(50)
 ,category_name  varchar(50)
 ,vendor_number  integer
 ,vendor_name  varchar(50)
 ,item_number  integer
 ,item_description  varchar(100)
 ,pack  integer
 ,bottle_volume  integer
 ,state_bottle_cost  varchar(50)
 ,state_bottle_retail  varchar(50)
 ,bottles_sold  integer
 ,sale  varchar(50)
 ,volume_sold_liters  integer
 ,volume_sold_gallons  integer
)
STORED AS ORC
TBLPROPERTIES (
   'transactional'='true',
   'transactional_properties'='default',
   'NO_AUTO_COMPACTION'='false'
);

日付のカラムについては、Hiveで日付演算がちゃんとできるように文字列型から日付型に変換を加えてINSERTしています。CSVの1行目はヘッダー情報なので除外しました。

INSERT OVERWRITE TABLE iowa_liquor_sales
SELECT
  invoice_item_number
 ,FROM_UNIXTIME(UNIX_TIMESTAMP(sale_date ,'MM/dd/yyyy'), 'yyyy-MM-dd') AS sale_date
 ,store_number
 ,store_name
 ,address
 ,city
 ,zip_code
 ,store_location
 ,county_number
 ,county
 ,category
 ,category_name
 ,vendor_number
 ,vendor_name
 ,item_number
 ,item_description
 ,pack
 ,bottle_volume
 ,state_bottle_cost
 ,state_bottle_retail
 ,bottles_sold
 ,sale
 ,volume_sold_liters
 ,volume_sold_gallons
FROM iowa_liquor_sales_csv
WHERE invoice_item_number != 'Invoice/Item Number'
;

これでテーブルの準備ができました。

 

3. Hadoopクラスタのパラメータ設定(YARNとHive/LLAP)

今回の検証はLLAP利用を前提にしていたこともあって、YARNとHive(特にLLAP)のパラメータとYARNキューの設定を新たに見直しました。ポイントとなる設定がたくさんあって、メモリ計算を間違ったりするとLLAPが再起動できなくなるので、何かを変えるときは面倒でも毎回ちゃんと計算する必要があります。

LLAPに関するパラメータの設定方針については、こちらのLLAPサイジングのドキュメントを参考にします。

https://community.cloudera.com/t5/Community-Articles/LLAP-sizing-and-setup/ta-p/247425

理解を深めたいときはさらにこちらのDeep Diveのドキュメントを読むのをお薦めします。

https://community.cloudera.com/t5/Community-Articles/Hive-LLAP-deep-dive/ta-p/248893

 

メインとなるLLAPデーモンサイズばかりを気にして、他に必要なメモリ量の計算を誤って、YARNキューのメモリ割り当てが足りずに起動できなかったりすることが何回かありました。それでも今回一番ハマったのは、メモリ計算が合っているのに起動できなかったり、一度起動した設定なのに再起動ができなくなったりする現象に悩まされたことでした。

結局、Node Managerが立ち上がるスレーブノードにおいて、Druidサービスなどが数GBメモリを使っており、Node Managerに物理メモリの多くを割り当てすぎてメモリがスワップしたのが原因のようでした。色々試行錯誤して以下の設定にしたら安定して起動できるようになりました。加えて、今回の検証ではDruidは使わないのでサービスを停止しました。

 

大事な設定ポイントは、1つのexecutorに4GBのメモリを確保したこと(hive.tez.container.size、tez.task.resource.memory.mb、LLAP daemon size、Number of executors per LLAP Daemonなど)と、LLAPのキャッシュを利用できるようにしたこと(hive.llap.io.enabled、hive.llap.io.memory.modeなど)です。

 

パラメータ設定

  • YARN > CONFIGS > SETTINGS
     [Memory]
      Node: Memory allocated for all YARN containers on a node →25(GB)
      Container: Minimum Container Size (Memory) →2048(MB)
      Container: Maximum Container Size (Memory) →22528(MB)
     [CPU]
      Node: Number of virtual cores →4
      Container: Maximum Container Size (VCores) 4
  • YARN > CONFIGS > ADVANCED
     [Node Manager]
      NodeManager Java heap size →2048(MB)
  • Hive > CONFIGS > SETTINGS
     [Interactive Query]
      Number of nodes used by Hive’s LLAP →3
      Maximum Total Concurrent Queries →3
      Memory per Daemon (=LLAP daemon size) →22528(MB)
      In-Memory Cache per Daemon (=Cache size) →4096(MB)
      Number of executors per LLAP Daemon →4
     [Optimization]
      Tez > Tez Container Size →4096(MB)
  • Hive > CONFIGS > ADVANCED
     [Advanced hive-interactive-env]
      HiveServer Interactive Heap Size →2048(MB)
      LLAP Daemon Container Max Headroom →1024MB
      LLAP Daemon Heap Size (MB) →16384
      Number of Node(s) for running Hive LLAP daemon →3
     [Advanced hive-interactive-site]
      hive.llap.execution.mode only →all
      hive.llap.io.memory.mode →cache
      hive.llap.io.enabled →true
      hive.llap.io.threadpool.size →4
      hive.tez.container.size →4096(MB)
     [Advanced tez-interactive-site]
      tez.am.resource.memory.mb →2048(MB)
  • Tez > CONFIGS
     [General]
      tez.am.resource.memory.mb →2048(MB)
      tez.task.resource.memory.mb →4096(MB)

YARNキュー設定

  • llapキュー
      Capacity →70%、Max Capacity →70%
      Priority →1
  • defaultキュー
      Capacity →30%、Max Capacity →30%

(参考)LLAPが再起動できないときのエラー

LLAPSTATUS WatchMode with timeout=400 s
--------------------------------------------------------------------------------
LLAP status unknown
--------------------------------------------------------------------------------
WARN cli.LlapStatusServiceDriver: Watch mode enabled and got YARN error. Retrying..
WARN cli.LlapStatusServiceDriver: Watch mode enabled and got YARN error. Retrying..
WARN cli.LlapStatusServiceDriver: Watch mode enabled and got YARN error. Retrying..

 

2. 検証

Hive3のACIDトランザクションは、まだSTART TRANSACTIONやROLLBACKがありません。その意味ではまだ発展途上と言えます。そこで今回はUPDATE文を中心に検証してみました。

1. UPDATE文を色々試す

UPDATE文を試す前に、まず用意したiowa_liquor_salesテーブルのデータファイルの状態を確認してみます。

dfs -ls /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/*;
+----------------------------------------------------+
|                     DFS Output                     |
+----------------------------------------------------+
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 11:49 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/_orc_acid_version |
| -rw-rw----+  3 hive hadoop  329673920 2019-09-22 11:58 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/bucket_00000 |
+----------------------------------------------------+

このファイルの状態が一番きれいな状態と考えられます。3GBくらいのそこまで大きくないデータサイズなので、実体は1ファイルですね。もっとデータサイズが大きいと分割されると思います。

ファイル名にbucketが付いている通り、テーブル定義に宣言しなくても自動的にbucket分割をしてくれました。

 

1行更新のUPDATE

次に任意の1行だけにUPDATEを試してみます。何でもよかったので適当にinvoice_item_numberがS13775700110のレコードを選びました。

SELECT state_bottle_cost, state_bottle_retail, bottles_sold, sale FROM iowa_liquor_sales WHERE invoice_item_number = 'S13775700110';
+--------------------+----------------------+---------------+---------+
| state_bottle_cost  | state_bottle_retail  | bottles_sold  |  sale   |
+--------------------+----------------------+---------------+---------+
| $9.50              | $14.24               | 2             | $28.48  |
+--------------------+----------------------+---------------+---------+

単純な以下のようなUPDATE文を実施しました。

UPDATE iowa_liquor_sales
SET
  state_bottle_cost = '$10.00',
  state_bottle_retail = '$14.75',
  sale = '$29.50'
WHERE 
  invoice_item_number = 'S13775700110'
;
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........      llap     SUCCEEDED      2          2        0        0       0       0
Reducer 2 ......      llap     SUCCEEDED      2          2        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 12.11 s
----------------------------------------------------------------------------------------------

当たり前といえば当たり前ですが、ちゃんと1行更新できました!

SELECT state_bottle_cost, state_bottle_retail, bottles_sold, sale FROM iowa_liquor_sales WHERE invoice_item_number = 'S13775700110';
+--------------------+----------------------+---------------+---------+
| state_bottle_cost  | state_bottle_retail  | bottles_sold  |  sale   |
+--------------------+----------------------+---------------+---------+
| $10.00             | $14.75               | 2             | $29.50  |
+--------------------+----------------------+---------------+---------+

今回の環境とテーブルでは10数秒かかっていますが、パーティション単位でINSERTで書き戻すしかなかった従来のHiveのことを考えると、これくらいでさっと1行更新できるのは結果便利と思います。 1行更新するとデータファイルがどうなるかを見てみます。

dfs -ls /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/*;
+----------------------------------------------------+
|                     DFS Output                     |
+----------------------------------------------------+
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 11:49 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/_orc_acid_version |
| -rw-rw----+  3 hive hadoop  329673920 2019-09-22 11:58 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000002_0000002_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop       1540 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000002_0000002_0000/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000002_0000002_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop       3065 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000002_0000002_0000/bucket_00000 |
+----------------------------------------------------+

もとの1ファイルだった状態から、delete_delteとdeltaの別のディレクトリができて差分ファイルが追加されていました。更新を繰り返すと差分がどんどん増えていって性能劣化するのが推測されますね。この差分が溜まった状態をきれいにして性能劣化を防ぐのが新たな仕掛けのコンパクションだと思います。

今度は、同じ1行更新でも、カラムの元の値を加工して更新するUPDATE文を試してみました。

UPDATE iowa_liquor_sales
SET
  state_bottle_cost = CONCAT('$', CAST(REPLACE(state_bottle_cost, '$', '') AS double) + 0.5),
  state_bottle_retail = CONCAT('$', CAST(REPLACE(state_bottle_retail, '$', '') AS double) + 0.5),
  bottles_sold = bottles_sold + 1,
  sale = CONCAT('$', (CAST(REPLACE(state_bottle_retail, '$', '') AS double) + 0.5) * (bottles_sold + 1) )
WHERE 
  invoice_item_number = 'S13775700110'
;
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........      llap     SUCCEEDED      3          3        0        0       0       0
Reducer 2 ......      llap     SUCCEEDED      2          2        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.79 s
----------------------------------------------------------------------------------------------

カラムの元の値を加工して更新するUPDATE文もちゃんと更新されました。

SELECT state_bottle_cost, state_bottle_retail, bottles_sold, sale FROM iowa_liquor_sales WHERE invoice_item_number = 'S13775700110';
+--------------------+----------------------+---------------+---------+
| state_bottle_cost  | state_bottle_retail  | bottles_sold  |  sale   |
+--------------------+----------------------+---------------+---------+
| $10.5              | $15.25               | 3             | $45.75  |
+--------------------+----------------------+---------------+---------+

一度にたくさんの行を更新するUPDATE

1行更新の次は一度にたくさんの行を更新するUPDATE文を試してみます。

UPDATE iowa_liquor_sales
SET
  sale_date = CURRENT_DATE
WHERE sale_date = '2015-12-01'
;
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........      llap     SUCCEEDED      4          4        0        0       0       0
Reducer 2 ......      llap     SUCCEEDED      2          2        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 19.15 s
----------------------------------------------------------------------------------------------

20秒くらいで終わりました。

dfs -ls /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/*;
+----------------------------------------------------+
|                     DFS Output                     |
+----------------------------------------------------+
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 11:49 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/_orc_acid_version |
| -rw-rw----+  3 hive hadoop  329673920 2019-09-22 11:58 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000002_0000002_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop       1540 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000002_0000002_0000/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:20 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000003_0000003_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop       1496 2019-09-22 12:20 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000003_0000003_0000/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:23 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000004_0000004_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop      10868 2019-09-22 12:23 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000004_0000004_0000/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000002_0000002_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop       3065 2019-09-22 12:18 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000002_0000002_0000/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:20 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000003_0000003_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop       3064 2019-09-22 12:20 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000003_0000003_0000/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 12:23 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000004_0000004_0000/_orc_acid_version |
| -rw-rw----+  3 hive hadoop     304483 2019-09-22 12:23 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000004_0000004_0000/bucket_00000 |
+----------------------------------------------------+

更新した行数でなくて、更新回数に比例してdelteの差分ファイルが増えていくことがわかりました。従来のHiveのようにパーティション単位でのINSERT文がメインで、ときどき一部のレコードにUPDATE文を行うようなワークロードであれば差分ファイルもそこまでできないので、あまり性能劣化を気にせずに使えそうです。

 

2. 同じ行を繰り返し更新し続けるとどうなるか?

同じ行に対するUPDATE文を350回繰り返す

同じ複数の行に対しカラムの元の値を加工して更新するUPDATE文を350回繰り返して、データファイルの状態とSELECT文の実行速度がどれくらい影響を受けるか試してみます。先にsale_dateをCURRENT_DATE(2019-09-22)に更新してますが、その対象行に対して更新を繰り返しました。

UPDATE iowa_liquor_sales
SET
  state_bottle_cost = CONCAT('$', CAST(REPLACE(state_bottle_cost, '$', '') AS double) + 0.5),
  state_bottle_retail = CONCAT('$', CAST(REPLACE(state_bottle_retail, '$', '') AS double) + 0.5),
  bottles_sold = bottles_sold + 1,
  sale = CONCAT('$', (CAST(REPLACE(state_bottle_retail, '$', '') AS double) + 0.5) * (bottles_sold + 1) )
WHERE
  sale_date = '2019-09-22';

結果、上記のUPDATE文を350回やり終えるのに、1日以上かかりました。相当遅いですね。最初の30回くらいまでは一回のUPDATE文で20秒ほどだったのが、回数を重ねるごとに遅くなっていき、100回超えると1分近くになり、200回を超えると2分半近く、300回を超えると5分を超え350回あたりでは7分以上かかっていました。 それからデータファイルも1000個以上できていました。

 

NO_AUTO_COMPACTION=falseでテーブルを作ったこともあり、コンパクション処理が適宜発生して差分のファイル数が抑えられるのかと思いましたが、今回のケースではそうでもなく、比較検証としてNO_AUTO_COMPACTION=trueにした以外全く同じテーブルを作って同様のUPDATE文を繰り返してみましたが、どちらも同じ1000個以上のデータファイル数でした。 コンパクション処理の設定も色々あるので、このあたりはもっと探求する必要がありそうです。

 

UPDATEを繰り返した後にSELECT文の実行速度を見る

更新した行に対するSELECT文と、更新していない行に対するSELECT文で比較しました。

前者のSELECT文

SELECT MAX(CAST(REPLACE(sale, '$', '') AS double)) AS sale, bottles_sold
FROM iowa_liquor_sales
WHERE sale_date = '2015-12-28' AND bottles_sold >= 2
GROUP BY bottles_sold
;
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........      llap     SUCCEEDED      3          3        0        0       0       0
Reducer 2 ......      llap     SUCCEEDED     12         12        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 403.84 s
----------------------------------------------------------------------------------------------

後者のSELECT文

SELECT MAX(CAST(REPLACE(sale, '$', '') AS double)) AS sale, bottles_sold
FROM iowa_liquor_sales
WHERE sale_date = '2019-09-22' AND bottles_sold >= 352
GROUP BY bottles_sold
;
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........      llap     SUCCEEDED      3          3        0        0       0       0
Reducer 2 ......      llap     SUCCEEDED     12         12        0        0       0       1
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 402.25 s
----------------------------------------------------------------------------------------------

どちらも400秒近くかかり、あまり差が出ませんでした。どうやら更新した行だけ遅くなるわけではないようです。 実行速度がどれくらい遅くなったかを調べるため、UPDATE文による更新を全くしていない状態の別テーブルを作り、前者のSELECT文と同じ内容のクエリを発行したときは1~2秒ほどでした。 コンパクション処理をしないととても遅くなることが改めてわかった結果です。

ちなみに、LLAPでキャッシュを使う設定にしている場合、全く同じ内容のSELECT文を投げるとクエリキャッシュが働いて0.2秒未満で結果が返ってくるため、上記のSELECT文は一回目の結果です。

 

3. 手動でコンパクションしてみる

明らかに性能が劣化したのと、当初想定したようなコンパクション処理が行われなかった感じなので、手動でコンパクションを行ってみました。

Hiveのコンパクション処理には、コンパクションを行う他の主要なプロダクトと同じようにマイナーとメジャーの2種類があります。

 

マイナーコンパクション

まずマイナーコンパクションをやってみます。以下のようなクエリを発行します。ALTER TABLE文の一種ですね。

ALTER TABLE iowa_liquor_sales COMPACT 'minor';

このALTER TABLE文はわずか0.2秒ほどで返ってきました。そんなに速くコンパクション処理が終わることはなくバックエンドでは処理が続行しており、データファイルをちょくちょく見ると一時的なファイルやデータファイルを集約しているような挙動が見られました。 バックエンドでの処理が落ち着いたのは大体5分後くらいです。 データファイルを見ると、1000個以上あったものが以下のようにすっきりしていました。

dfs -ls /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/*;
+----------------------------------------------------+
|                     DFS Output                     |
+----------------------------------------------------+
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-09-22 11:49 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/_orc_acid_version |
| -rw-rw----+  3 hive hadoop  329673920 2019-09-22 11:58 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000001/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-10-02 14:55 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000002_0000358/_orc_acid_version |
| -rw-rw----+  3 hive hadoop     685524 2019-10-02 14:55 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delete_delta_0000002_0000358/bucket_00000 |
| Found 2 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-10-02 14:55 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000002_0000358/_orc_acid_version |
| -rw-rw----+  3 hive hadoop  146044206 2019-10-02 14:55 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/delta_0000002_0000358/bucket_00000 |
+----------------------------------------------------+

delete_delteとdeltaの差分ファイルが残っていますが、それぞれ一つのファイルにまとまっています。これを見るとマイナーコンパクションはその時点のdeltaをマージする処理に見えます。 マイナーコンパクション後に、先ほど性能を見るために投げた更新していない行に対するSELECT文と同じようなクエリを投げてみると、2.5秒くらいでした。

 

メジャーコンパクション

マイナーコンパクションの後で今度はメジャーコンパクションを行ってみます。

ALTER TABLE iowa_liquor_sales COMPACT 'major';

マイナーコンパクションのときと同様、0.2秒ほどで返ってきます。 コンパクション処理中に、SELECT文を投げるとどうなるかをここで試してみました。マイナーコンパクション後に投げたSELECT文と同じようなクエリを投げてみると、2秒弱でした。 この結果を見ると、コンパクション中でも大きな性能劣化なく実行できることがわかります。

 

また、バックエンドでコンパクション処理を行っているYARNアプリケーションを確認すると、compactorの文字が入ったMAPREDUCEアプリケーションがdefaultキューで稼働していました。マイナーコンパクションのときも同様です。このときに利用するメモリは8GBで、デフォルト設定を変えない限り、defaultキューで8GBのメモリをコンパクション処理で使うようです。 メジャーコンパクションのバックエンドで動く処理が落ち着いたのも大体5分後くらいでした。

メジャーコンパクション後のデータファイルを確認します。

dfs -ls /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/*;
+----------------------------------------------------+
|                     DFS Output                     |
+----------------------------------------------------+
| Found 3 items                                      |
| -rw-rw----+  3 hive hadoop         48 2019-10-02 15:15 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000358/_metadata_acid |
| -rw-rw----+  3 hive hadoop          1 2019-10-02 15:15 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000358/_orc_acid_version |
| -rw-rw----+  3 hive hadoop  329731684 2019-10-02 15:15 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales/base_0000358/bucket_00000 |
+----------------------------------------------------+

deltaの差分ファイルがなくなってすべてbeseのディレクトリにまとめられています。メジャーコンパクションを行うと、_metadata_acidというファイルが新たにできるようです。 マイナーコンパクション後に投げたのと全く同じSELECT文を投げると、クエリキャッシュが働き0.2秒未満で返ってきました。メジャーコンパクションをしてもクエリキャッシュは破棄されないようです。 条件を少し変えた同じようなクエリを投げると、2秒弱でした。マイナーコンパクション後より若干速くなった印象です。

 

4. ロックの動きを見る

最後にトランザクションの検証でもっとも大事なロックの検証を試してみます。beeline(Hiveクライアント)で二つ接続し、同じ1行に対する別々のUPDATE文を片方が投げているときにもう片方も投げたらどうなるかを検証してみました。

トランザクション①

UPDATE iowa_liquor_sales
SET
  item_description = concat('[A] ', item_description)
WHERE
  invoice_item_number = 'S10532700017'
;

トランザクション②

UPDATE iowa_liquor_sales
SET
  item_description = concat('[Z] ', item_description)
WHERE
  invoice_item_number = 'S10532700017'
;

トランザクション①のUPDATE文を投げた直後に、トランザクション②を投げて、それぞれどうなるかを確認しました。 S10532700017の行に対して①が先にロックを取るので、②が待機されて①が更新し終わったら②も更新が終わり、item_descriptionカラムの先頭が[Z][A]となることを期待して検証したのですが、なんと①が終わると②がエラーを返しました!!!

たしかに整合性は保証されていますが、先勝ちで後のトランザクションがエラーになるのはちょっと扱いずらいと思います。もっと詳しくロックの仕様を知る必要はありますが、重要なポイントであることには違いないです。 参考までにトランザクション②で返ってきたエラーを載せておきます。

ERROR : FAILED: Hive Internal Error: org.apache.hadoop.hive.ql.lockmgr.LockException(Transaction manager has aborted the transaction txnid:3558.  Reason: Aborting [txnid:3558,3558] due to a write conflict on hivedb/iowa_liquor_sales committed by [txnid:3557,3558] u/u)
org.apache.hadoop.hive.ql.lockmgr.LockException: Transaction manager has aborted the transaction txnid:3558.  Reason: Aborting [txnid:3558,3558] due to a write conflict on hivedb/iowa_liquor_sales committed by [txnid:3557,3558] u/u
        at org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.commitTxn(DbTxnManager.java:511)
        at org.apache.hadoop.hive.ql.Driver.releaseLocksAndCommitOrRollback(Driver.java:1703)
        at org.apache.hadoop.hive.ql.Driver.releaseLocksAndCommitOrRollback(Driver.java:1669)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:2058)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1746)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1740)
        at org.apache.hadoop.hive.ql.reexec.ReExecDriver.run(ReExecDriver.java:157)
        at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:226)
        at org.apache.hive.service.cli.operation.SQLOperation.access$700(SQLOperation.java:87)
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:318)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:331)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: TxnAbortedException(message:Aborting [txnid:3558,3558] due to a write conflict on hivedb/iowa_liquor_sales committed by [txnid:3557,3558] u/u)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$commit_txn_result$commit_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$commit_txn_result$commit_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$commit_txn_result.read(ThriftHiveMetastore.java)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_commit_txn(ThriftHiveMetastore.java:5403)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.commit_txn(ThriftHiveMetastore.java:5390)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.commitTxn(HiveMetaStoreClient.java:2648)
        at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:212)
        at com.sun.proxy.$Proxy37.commitTxn(Unknown Source)
        at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2934)
        at com.sun.proxy.$Proxy37.commitTxn(Unknown Source)
        at org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.commitTxn(DbTxnManager.java:506)
        ... 20 more

Error: Error while processing statement: FAILED: Hive Internal Error: org.apache.hadoop.hive.ql.lockmgr.LockException(Transaction manager has aborted the transaction txnid:3558.  Reason: Aborting [txnid:3558,3558] due to a write conflict on hivedb/iowa_liquor_sales committed by [txnid:3557,3558] u/u) (state=42000,code=12)

以前のブログで書いたように、Hiveのメタデータを格納するデータベース(Percona XtraDB Cluster)のテーブル定義に手を入れているのですが(Primary KeyがないテーブルにPrimary Keyを追加)、 ここまででデータベース側で特にエラーは出ていません。なので、そのときのPrimary Key追加のやり方はある程度大丈夫と言えそうです。

 

5. おまけ(Primary Key検索)

Hive3ではテーブルを作る際にPrimary Keyを作ることができます。1行だけのUPDATE文をもっと速くするために、Primary Keyを張ったらどうなるかをおまけで試してみました。

CREATE TABLE iowa_liquor_sales_pk (
  invoice_item_number  varchar(50)
 ,sale_date  date
 ,store_number  integer
 ,store_name  varchar(100)
 ,address  varchar(100)
 ,city  varchar(100)
 ,zip_code  integer
 ,store_location  string
 ,county_number  integer
 ,county  varchar(50)
 ,category  varchar(50)
 ,category_name  varchar(50)
 ,vendor_number  integer
 ,vendor_name  varchar(50)
 ,item_number  integer
 ,item_description  varchar(100)
 ,pack  integer
 ,bottle_volume  integer
 ,state_bottle_cost  varchar(50)
 ,state_bottle_retail  varchar(50)
 ,bottles_sold  integer
 ,sale  varchar(50)
 ,volume_sold_liters  integer
 ,volume_sold_gallons  integer
, PRIMARY KEY (invoice_item_number) DISABLE NOVALIDATE
)
STORED AS ORC
TBLPROPERTIES (
   'transactional'='true',
   'transactional_properties'='default',
   'NO_AUTO_COMPACTION'='false'
);

Primary Keyなどの制約については発展途上のようで、Primary KeyもDISABLE NOVALIDATEオプションでしか作成できません。 テーブル作成して、データをINSERTしデータファイルを見てみると以下のようになっていました。

dfs -ls /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales_pk/*;
+----------------------------------------------------+
|                     DFS Output                     |
+----------------------------------------------------+
| Found 3 items                                      |
| -rw-rw----+  3 hive hadoop          1 2019-10-02 21:05 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales_pk/base_0000001/_orc_acid_version |
| -rw-rw----+  3 hive hadoop  273008362 2019-10-02 21:12 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales_pk/base_0000001/bucket_00000 |
| -rw-rw----+  3 hive hadoop   56736282 2019-10-02 21:07 /warehouse/tablespace/managed/hive/hivedb.db/iowa_liquor_sales_pk/base_0000001/bucket_00001 |
+----------------------------------------------------+

Primary Keyなしのときと比較すると、データファイルが一つ多くなっていることがわかります。bucket_00001がPrimary Keyの何かしらのファイルに見えます。 単純なPrimary Keyで1行検索する以下のようなSELECT文を使って、Primary Keyあり/なしで速度を比較してみました。

select sale_date, item_description from iowa_liquor_sales_pk where invoice_item_number = 'S10532700017';

何かしら速くなるのかと思っていたら、Primary Keyがあってもなくても30秒ほどかかりました(つまりPrimary Keyで速くなっていない)。 Hive3のPrimary Keyについてはもっとウオッチする必要がありそうです。

 

3. まとめ

1. 検証結果

今回UPDATE文を色々検証しましたが、Hive3のACIDトランザクションは、従来のHiveを使っていた者からするとかなり進化した印象を受けました。コンパクションの運用をうまくするためのノウハウは必須ですが、これからHiveを使うにあたってUPDATE文は前提になってきそうです。

 

LLAPとACIDトランザクションが前提になってくると、ノード1台あたりのメモリ容量がますます必要で、Hiveに関わるパラメータ設定もたくさん増えています。Hadoopクラスタが進化すればするほど、ビッグデータを扱うハードルが上がってしまうのにジレンマがありそうです。

それから、HortonworksとClouderaが統合して、HadoopクラスタのパッケージがCDPという製品に統一されるようなので、Hive3やLLAPがどうなっていくのかも注目です。

 

2. 今後やりたいこと

今回やり切れなかったことで今後やってみたい検証は、

  • 激しいOLTPワークロードの検証
  • 1行に対するUPDATEをもっと速くする(Primary KeyやIndex?)
  • Spark2とHive3の連携(SparkからHiveのトランザクション対応テーブルへの読み書き)

あたりです。SparkとHiveの使い分けはとても強力なので、Spark連携はぜひ試してみたいです。

 

最後に



次世代システム研究室では、データサイエンティスト/データ分析エンジニアやアーキテクトを募集しています。ビッグデータの解析業務など次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。

 

皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事