Hatena::ブログ(Diary)

ablog このページをアンテナに追加 RSSフィード

2018-09-13

AWS Glue とは

カタログ

データベース
テーブル
分類子(Classifier)

ETL

ジョブ
トリガー
開発エンドポイント

セキュリティ


Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルタ/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

DataFrames は、強力で広く使用されていますが、抽出、変換、およびロード (ETL) 操作に関しては制限があります。最も重要なのは、データをロードする前にスキーマを指定する必要があることです。SparkSQL は、データに対してパスを 2 つ作ることでこれを解決します。1 つ目はスキーマを推測し、2 つ目はデータをロードします。ただし、この推測は限定されており、実際の煩雑なデータには対応しません。たとえば、同じフィールドが異なるレコードの異なるタイプである可能性があります。Apache Spark は、多くの場合、作業を中断して、元のフィールドテキストを使用してタイプを string として報告します。これは正しくない可能性があり、スキーマの不一致を解決する方法を細かく制御する必要があります。また、大規模なデータセットの場合、ソースデータに対する追加パスが非常に高価になる可能性があります。

これらの制限に対応するために、AWS Glue により DynamicFrame が導入されました。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。

同様に、DynamicRecord は DynamicFrame 内の論理レコードを表します。これは、Spark DataFrame の行と似ていますが、自己記述型であり、固定スキーマに適合しないデータに使用できます。

DynamicFrame クラス - AWS Glue

上記のドキュメントでは、Crawlerがテーブルを作成する際はデータの先頭2MBを見て判断すると記載されています。

AWS GlueのDynamicFrameの動きを見てみる | DevelopersIO

  • DPU

ETL ジョブの実行に使用された DPU (Data Processing Unit) の数に基づいて時間あたりの課金が発生します。1 つの DPU (Data Processing Unit) では 4 つの vCPU と 16 GB のメモリが提供されます。Glue の ETL ジョブには最低で 2 個の DPU が必要です。AWS Glue のデフォルトでは、各 ETL ジョブに 10 個の DPU が割り当てられます。DPU 時間あたり 0.44 ドルが 1 秒単位で課金され、最も近い秒単位に切り上げられます。ETL ジョブごとに 10 分の最小期間が設定されます。

AWS Glue の料金 ? アマゾン ウェブ サービス (AWS)
  • クローラー
    • DPU(データ処理ユニット)の時間単位で課金
    • 1DPU = 4vCPUと16GBメモリ
    • クローラーには2DPUが割り当て
    • 1DPU 0.44ドル/時
    • 最低10分とし、分単位で切り上げ請求
  • ETLジョブ
    • DPU(データ処理ユニット)の時間単位で課金
      • 1DPU = 4vCPUと16GBメモリ
      • Glue ETL処理には最低2DPUが必要
      • 本番環境のデフォルトはジョブごとに10DPUを割り当て
      • 開発エンドポイントのデフォルトは開発エンドポイントごとに5DPUを割り当て
    • 1DPU 0.44ドル/時
    • 最低10分とし、分単位で切り上げ請求
【新サービス】AWS上でフルマネージドなデータカタログとETLを実現するサービス『AWS Glue』がリリースされました!使い始めの準備をご紹介 | DevelopersIO
  • Zeppelinとは

ブラウザ上でプログラムインタラクティブに記述できるノートブックというものを作成するツール一種です。 有名なところではJupyter notebook(IPython notebook)があるかと思いますが、それと似たツールだと想像していただければと思います。

Apache Zeppelin入門 | Hadoop Advent Calendar 2016 #19 | DevelopersIO

ファイルの最初の方にデータの定義があり、その後にデータが連続して登録されています。jsonxmlのように定義名が何度も繰り返すような冗長さもなく、ファイルの最初を読み込んだ段階でデータ構造が理解できる「マシン・リーダブル」なフォーマット

(中略)

{
"name": "users",
"type": "record",
"fields": [
{"name": "id", "type": "int"},
{"name": "guid", "type": "string"},
{"name": "name", "type": "string"},
{"name": "address", "type": "string"}
]}
Amazon EMR の Avro フォーマットのデータを Amazon Redshift にロードする | DevelopersIO

AWS Glue では、開発エンドポイントを作成してから、REPL (Read-Evaluate-Print Loop) シェルを呼び出して PySpark コードを増分的に実行し、ETL スクリプトデプロイする前にインタラクティブデバッグできるようにします。

チュートリアル: 開発エンドポイントで REPL シェルを使用する - AWS Glue

2018-09-12

「AWS Cloudtrail Logs を AWS Glue と Amazon Quicksight 使って可視化する」をやってみた

AWS Cloudtrail Logs を AWS Glue と Amazon Quicksight 使って可視化する | Amazon Web Services ブログ を試してみた。


Lambda用ロールの作成
  • 名前: CloudTrailWatchLogs
  • インラインポリシー
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::cloudtrail-do-not-delete/*",
                "arn:aws:s3:::cloudtrail-do-not-delete"
            ],
            "Effect": "Allow"
        }
    ]
}
Glue用ロールの作成
  • 名前: AWSGlueServiceRole-Default
  • 以下のポリシーをアタッチ
    • AmazonS3FullAccess
    • AWSGlueServiceRole
Lambda関数の作成
from __future__ import print_function
import json
import urllib
import boto3
import gzip

s3 = boto3.resource('s3')
client = boto3.client('s3')

def convertColumntoLowwerCaps(obj):
    for key in obj.keys():
        new_key = key.lower()
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(bucket)
    print(key)
    try:
        newKey = 'flatfiles/' + key.replace("/", "")
        client.download_file(bucket, key, '/tmp/file.json.gz')
        with gzip.open('/tmp/out.json.gz', 'w') as output, gzip.open('/tmp/file.json.gz', 'rb') as file:
            i = 0
            for line in file: 
                for record in json.loads(line,object_hook=convertColumntoLowwerCaps)['records']:
            		if i != 0:
            		    output.write("\n")
            		output.write(json.dumps(record))
            		i += 1
        client.upload_file('/tmp/out.json.gz', bucket,newKey)
        return "success"
    except Exception as e:
        print(e)
        print('Error processing object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
Glue クローラの作成(Flatfile用)
  • 名前: cloudTrailFlatfiles
  • Choose a data store: S3
  • インクルードパス: s3://cloudtrail-do-not-delete/flatfiles
  • IAM ロール: AWSGlueServiceRole-Default
  • データベース: cloudtrail
  • 頻度: 毎時
Glue クローラの作成(Parquet用)
  • 名前: cloudtrailParquetFiles
  • Choose a data store: S3
  • インクルードパス: s3://cloudtrail-do-not-delete/parquettrails 
  • IAM ロール: AWSGlueServiceRole-Default
  • データベース: cloudtrail
  • 頻度: 毎時
Glueジョブの作成
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudtrail", table_name = "flatfiles", transformation_ctx = "datasource0")
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
relationalized1 = resolvechoice1.relationalize("trail", args["TempDir"]).select("trail")
datasink = glueContext.write_dynamic_frame.from_options(frame = relationalized1, connection_type = "s3", connection_options = {"path": "s3://cloudtrail-do-not-delete/parquettrails"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
Athena でクエリ実行
select *
from cloudtrail.parquettrails
where eventtime > '2017-10-23T12:00:00Z' AND eventtime < '2017-10-23T13:00:00Z'
order by eventtime asc;

2018-09-05

EMR のブートストラップアクションはマスター・コア・タスクノードで実行される

EMR のブートストラップアクションはマスター・コア・タスクノードで実行されることを確認したメモ。


検証結果

起動時に実行するスクリプトを作成する
  • s3://az-test/bootstrap.sh
#!/bin/bash
set -e
wget -S -T 10 -t 5 http://elasticmapreduce.s3.amazonaws.com/bootstrap-actions/file.tar.gz
mkdir -p /home/hadoop/contents
tar -xzf file.tar.gz -C /home/hadoop/contents 

EMRクラスタを作成する

確認する
[hadoop@ip-172-31-14-243 ~]$ find . -name README -ls
524311    4 -rw-r--r--   1 hadoop   hadoop         15 Oct 22  2009 ./contents/README
[hadoop@ip-172-31-12-2 ~]$ find . -name README -ls
524311    4 -rw-r--r--   1 hadoop   hadoop         15 Oct 22  2009 ./contents/README
[hadoop@ip-172-31-5-49 ~]$ find . -name README -ls
524311    4 -rw-r--r--   1 hadoop   hadoop         15 Oct 22  2009 ./contents/README

参考

デフォルトでは、ブートストラップアクションは Hadoop ユーザーとして実行されます。ブートストラップアクションは、sudo を使用し、ルート権限で実行できます。

すべての Amazon EMR 管理インターフェイスでブートストラップアクションがサポートされています。コンソール、AWS CLI、または API から複数の bootstrap-action パラメータを指定すると、クラスターごとに最大 16 個のブートストラップアクションを指定できます。

クラスターの作成時に、Amazon EMR コンソールからオプションでブートストラップアクションを指定できます。

CLI を使用する場合、create-cluster コマンドを使用してクラスターを作成するときに --bootstrap-actions パラメータを追加して、Amazon EMR にブートストラップアクションスクリプトへの参照を渡すことができます。--bootstrap-actions パラメータシンタックスは次のとおりです。

AWS CLI

--bootstrap-actions Path=s3://mybucket/filename",Args=[arg1,arg2]
追加のソフトウェアをインストールするためのブートストラップアクションの作成 - Amazon EMR

--bootstrap-actions (list)

Specifies a list of bootstrap actions to run on each EC2 instance when a cluster is created. Bootstrap actions run on each instance immediately after Amazon EMR provisions the EC2 instance and before Amazon EMR installs specified applications.

You can specify a bootstrap action as an inline JSON structure enclosed in single quotation marks, or you can use a shorthand syntax, specifying multiple bootstrap actions, each separated by a space. When using the shorthand syntax, each bootstrap action takes the following parameters, separated by commas with no trailing space. Optional parameters are shown in [square brackets].

  • Path - The path and file name of the script to run, which must be accessible to each instance in the cluster. For example, Path=s3://mybucket/myscript.sh .
  • [Name] - A friendly name to help you identify the bootstrap action. For example, Name=BootstrapAction1
  • [Args] - A comma-separated list of arguments to pass to the bootstrap action script. Arguments can be either a list of values (Args=arg1,arg2,arg3 ) or a list of key-value pairs, as well as optional values, enclosed in square brackets (Args=[arg1,arg2=arg2value,arg3]) .
create-cluster — AWS CLI 1.16.35 Command Reference

*1:コア・タスクノードへはマスターノード経由で接続してもよい

2018-07-30

sysbench で MySQL にカスタムクエリを同時多重実行して一時ファイルを大量に使ってみる

インストール

curl -s https://packagecloud.io/install/repositories/akopytov/sysbench/script.rpm.sh | sudo bash
sudo yum -y install sysbench mysql

準備

  • 初期データロード
$ sysbench /usr/share/sysbench/oltp_read_write.lua \
 --db-driver=mysql \
 --table-size=100000 \
 --mysql-host=aurora01.*********.ap-northeast-1.rds.amazonaws.com \
 --mysql-user=awsuser \
 --mysql-password=********* \
 --mysql-db=mydb \
 --db-ps-mode=disable \
 prepare
  • カスタムスクリプト(/usr/share/sysbench/select_sort.lua)を作成する
function event(thread_id)
        db_query("select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad")
end

負荷をかける

  • 負荷をかける
$ sysbench /usr/share/sysbench/select_sort.lua \
 --db-driver=mysql \
 --mysql-db=mydb \
 --mysql-host=aurora01.*********.ap-northeast-1.rds.amazonaws.com \
 --mysql-user=awsuser \
 --mysql-password=********* \
 --time=300 \
 --db-ps-mode=disable \
 --threads=30 \
 run
  • 実行結果
sysbench 1.0.15 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 30
Initializing random number generator from current time


Initializing worker threads...

Threads started!

FATAL: mysql_store_result() returned error 3 (Error writing file '/rdsdbdata/tmp/MYz8AJRr' (Errcode: 28 - No space left on device))
FATAL: `thread_run' function failed: /usr/share/sysbench/select_sort.lua:2: db_query() failed
FATAL: mysql_store_result() returned error 3 (Error writing file '/rdsdbdata/tmp/MY4EuUqw' (Errcode: 28 - No space left on device))
FATAL: `thread_run' function failed: /usr/share/sysbench/select_sort.lua:2: db_query() failed
(中略)
FATAL: mysql_store_result() returned error 3 (Error writing file '/rdsdbdata/tmp/MYoI30Js' (Errcode: 28 - No space left on device))
FATAL: `thread_run' function failed: /usr/share/sysbench/select_sort.lua:2: db_query() failed
FATAL: mysql_store_result() returned error 3 (Error writing file '/rdsdbdata/tmp/MYgWJgNX' (Errcode: 28 - No space left on device))

性能統計情報を確認する

  • performance_schema.events_statements_current を確認する
mysql> select thread_id, sql_text, sort_range, sort_rows, sort_scan,created_tmp_disk_tables, created_tmp_tables from performance_schema.events_statements_current where sql_text = 'select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad';
+-----------+-------------------------------------------------------------------------------------------+------------+-----------+-----------+-------------------------+--------------------+
| thread_id | sql_text                                                                                  | sort_range | sort_rows | sort_scan | created_tmp_disk_tables | created_tmp_tables |
+-----------+-------------------------------------------------------------------------------------------+------------+-----------+-----------+-------------------------+--------------------+
|       533 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       534 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       535 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       536 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       537 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       538 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       539 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       540 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       541 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       542 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       543 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       544 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       545 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       546 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       547 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       548 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       549 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       550 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       551 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       552 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       553 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       554 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       555 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       556 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       557 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       558 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       559 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       560 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       561 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
|       562 | select c.* from (select a.* from mydb.sbtest1 a cross join sbtest1 b) as c order by c.pad |          0 |         0 |         0 |                       1 |                  1 |
+-----------+-------------------------------------------------------------------------------------------+------------+-----------+-----------+-------------------------+--------------------+
30 rows in set (0.42 sec)

参考

MySQLトラブルシューティング

MySQLトラブルシューティング

2018-07-17

Amazon Redshift で awslabs の amazon-redshift-utils/AdminViews を一括作成する

awslabs/amazon-redshift-utils の AdminViews を一括作成する手順をメモ。


インストール

$ sudo yum -y install git
$ sudo yum -y install postgresql

GitHub からスクリプトを入手する

$ git clone https://github.com/awslabs/amazon-redshift-utils.git

ビューを作成する

$ cd amazon-redshift-utils/src/AdminViews
$ ls -tr v_*.sql|perl -nle 'print qq/\\i $_/' > create_all_views.sql
  • Redshift にスーパーユーザーで接続する
$ psql "host=ds28xl4n.********.us-west-2.redshift.amazonaws.com user=awsuser dbname=mydb port=5439"
# create schema admin authorization awsuser;
  • ビューを一括作成する
# \i create_all_views.sql