Hatena::ブログ(Diary)

ablog このページをアンテナに追加 RSSフィード

2018-10-20

Presto で Parquet にクエリするとファイル中の必要な Column chunk のみを読んでいるか

f:id:yohei-a:20181021053908j:image:w360 f:id:yohei-a:20181021053904j:image:w360 f:id:yohei-a:20181021053900j:image:w360

Column chunks

Column chunks are composed of pages written back to back. The pages share a common header and readers can skip over page they are not interested in. The data for the page follows the header and can be compressed and/or encoded. The compression and encoding is specified in the page metadata.

https://parquet.apache.org/documentation/latest/

f:id:yohei-a:20181021052338p:image:w640

f:id:yohei-a:20181021052335p:image:w640

https://events.static.linuxfound.org/sites/events/files/slides/Presto.pdf

(1) read only required columns in Parquet and build columnar blocks on the fly, saving CPU and memory to transform row-based Parquet records into columnar blocks, and (2) evaluate the predicate using columnar blocks in the Presto engine.

no title

New Hive Parquet Reader

We have added a new Parquet reader implementation. The new reader supports vectorized reads, lazy loading, and predicate push down, all of which make the reader more efficient and typically reduces wall clock time for a query. Although the new reader has been heavily tested, it is an extensive rewrite of the Apache Hive Parquet reader, and may have some latent issues, so it is not enabled by default. If you are using Parquet we suggest you test out the new reader on a per-query basis by setting the <hive-catalog>.parquet_optimized_reader_enabled session property, or you can enable the reader by default by setting the Hive catalog property hive.parquet-optimized-reader.enabled=true. To enable Parquet predicate push down there is a separate session property <hive-catalog>.parquet_predicate_pushdown_enabled and configuration property hive.parquet-predicate-pushdown.enabled=true.

https://prestodb.io/docs/current/release/release-0.138.html

f:id:yohei-a:20181021051335p:image

Hadoop Internals for Oracle Developers and DBAs: Strata Conference + Hadoop World 2013 - O'Reilly Conferences, October 28 - 30, 2013, New York, NY
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.parquet.reader;

import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
import parquet.io.api.Binary;

import static com.facebook.presto.spi.type.Chars.isCharType;
import static com.facebook.presto.spi.type.Chars.truncateToLengthAndTrimSpaces;
import static com.facebook.presto.spi.type.Varchars.isVarcharType;
import static com.facebook.presto.spi.type.Varchars.truncateToLength;
import static io.airlift.slice.Slices.EMPTY_SLICE;
import static io.airlift.slice.Slices.wrappedBuffer;

public class BinaryColumnReader
        extends PrimitiveColumnReader
{
    public BinaryColumnReader(RichColumnDescriptor descriptor)
    {
        super(descriptor);
    }

    @Override
    protected void readValue(BlockBuilder blockBuilder, Type type)
    {
        if (definitionLevel == columnDescriptor.getMaxDefinitionLevel()) {
            Binary binary = valuesReader.readBytes();
            Slice value;
            if (binary.length() == 0) {
                value = EMPTY_SLICE;
            }
            else {
                value = wrappedBuffer(binary.getBytes());
            }
            if (isVarcharType(type)) {
                value = truncateToLength(value, type);
            }
            if (isCharType(type)) {
                value = truncateToLengthAndTrimSpaces(value, type);
            }
            type.writeSlice(blockBuilder, value);
        }
        else if (isValueNull()) {
            blockBuilder.appendNull();
        }
    }

    @Override
    protected void skipValue()
    {
        if (definitionLevel == columnDescriptor.getMaxDefinitionLevel()) {
            valuesReader.readBytes();
        }
    }
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.parquet.reader;

import com.facebook.presto.parquet.DataPage;
import com.facebook.presto.parquet.DataPageV1;
import com.facebook.presto.parquet.DataPageV2;
import com.facebook.presto.parquet.DictionaryPage;
import com.facebook.presto.parquet.Field;
import com.facebook.presto.parquet.ParquetEncoding;
import com.facebook.presto.parquet.ParquetTypeUtils;
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.dictionary.Dictionary;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import parquet.bytes.BytesUtils;
import parquet.column.ColumnDescriptor;
import parquet.column.values.ValuesReader;
import parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
import parquet.io.ParquetDecodingException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Consumer;

import static com.facebook.presto.parquet.ParquetTypeUtils.createDecimalType;
import static com.facebook.presto.parquet.ValuesType.DEFINITION_LEVEL;
import static com.facebook.presto.parquet.ValuesType.REPETITION_LEVEL;
import static com.facebook.presto.parquet.ValuesType.VALUES;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;

public abstract class PrimitiveColumnReader
{
    private static final int EMPTY_LEVEL_VALUE = -1;
    protected final RichColumnDescriptor columnDescriptor;

    protected int definitionLevel = EMPTY_LEVEL_VALUE;
    protected int repetitionLevel = EMPTY_LEVEL_VALUE;
    protected ValuesReader valuesReader;

    private int nextBatchSize;
    private LevelReader repetitionReader;
    private LevelReader definitionReader;
    private long totalValueCount;
    private PageReader pageReader;
    private Dictionary dictionary;
    private int currentValueCount;
    private DataPage page;
    private int remainingValueCountInPage;
    private int readOffset;

    protected abstract void readValue(BlockBuilder blockBuilder, Type type);

    protected abstract void skipValue();

    protected boolean isValueNull()
    {
        return ParquetTypeUtils.isValueNull(columnDescriptor.isRequired(), definitionLevel, columnDescriptor.getMaxDefinitionLevel());
    }

    public static PrimitiveColumnReader createReader(RichColumnDescriptor descriptor)
    {
        switch (descriptor.getType()) {
            case BOOLEAN:
                return new BooleanColumnReader(descriptor);
            case INT32:
                return createDecimalColumnReader(descriptor).orElse(new IntColumnReader(descriptor));
            case INT64:
                return createDecimalColumnReader(descriptor).orElse(new LongColumnReader(descriptor));
            case INT96:
                return new TimestampColumnReader(descriptor);
            case FLOAT:
                return new FloatColumnReader(descriptor);
            case DOUBLE:
                return new DoubleColumnReader(descriptor);
            case BINARY:
                return createDecimalColumnReader(descriptor).orElse(new BinaryColumnReader(descriptor));
            case FIXED_LEN_BYTE_ARRAY:
                return createDecimalColumnReader(descriptor)
                        .orElseThrow(() -> new PrestoException(NOT_SUPPORTED, " type FIXED_LEN_BYTE_ARRAY supported as DECIMAL; got " + descriptor.getPrimitiveType().getOriginalType()));
            default:
                throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType());
        }
    }

    private static Optional<PrimitiveColumnReader> createDecimalColumnReader(RichColumnDescriptor descriptor)
    {
        Optional<Type> type = createDecimalType(descriptor);
        if (type.isPresent()) {
            DecimalType decimalType = (DecimalType) type.get();
            return Optional.of(DecimalColumnReaderFactory.createReader(descriptor, decimalType.getPrecision(), decimalType.getScale()));
        }
        return Optional.empty();
    }

    public PrimitiveColumnReader(RichColumnDescriptor columnDescriptor)
    {
        this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor");
        pageReader = null;
    }

    public PageReader getPageReader()
    {
        return pageReader;
    }

    public void setPageReader(PageReader pageReader)
    {
        this.pageReader = requireNonNull(pageReader, "pageReader");
        DictionaryPage dictionaryPage = pageReader.readDictionaryPage();

        if (dictionaryPage != null) {
            try {
                dictionary = dictionaryPage.getEncoding().initDictionary(columnDescriptor, dictionaryPage);
            }
            catch (IOException e) {
                throw new ParquetDecodingException("could not decode the dictionary for " + columnDescriptor, e);
            }
        }
        else {
            dictionary = null;
        }
        checkArgument(pageReader.getTotalValueCount() > 0, "page is empty");
        totalValueCount = pageReader.getTotalValueCount();
    }

    public void prepareNextRead(int batchSize)
    {
        readOffset = readOffset + nextBatchSize;
        nextBatchSize = batchSize;
    }

    public ColumnDescriptor getDescriptor()
    {
        return columnDescriptor;
    }

    public ColumnChunk readPrimitive(Field field)
            throws IOException
    {
        IntList definitionLevels = new IntArrayList();
        IntList repetitionLevels = new IntArrayList();
        seek();
        BlockBuilder blockBuilder = field.getType().createBlockBuilder(null, nextBatchSize);
        int valueCount = 0;
        while (valueCount < nextBatchSize) {
            if (page == null) {
                readNextPage();
            }
            int valuesToRead = Math.min(remainingValueCountInPage, nextBatchSize - valueCount);
            readValues(blockBuilder, valuesToRead, field.getType(), definitionLevels, repetitionLevels);
            valueCount += valuesToRead;
        }
        checkArgument(valueCount == nextBatchSize, "valueCount %s not equals to batchSize %s", valueCount, nextBatchSize);

        readOffset = 0;
        nextBatchSize = 0;
        return new ColumnChunk(blockBuilder.build(), definitionLevels.toIntArray(), repetitionLevels.toIntArray());
    }

    private void readValues(BlockBuilder blockBuilder, int valuesToRead, Type type, IntList definitionLevels, IntList repetitionLevels)
    {
        processValues(valuesToRead, ignored -> {
            readValue(blockBuilder, type);
            definitionLevels.add(definitionLevel);
            repetitionLevels.add(repetitionLevel);
        });
    }

    private void skipValues(int valuesToRead)
    {
        processValues(valuesToRead, ignored -> skipValue());
    }

    private void processValues(int valuesToRead, Consumer<Void> valueConsumer)
    {
        if (definitionLevel == EMPTY_LEVEL_VALUE && repetitionLevel == EMPTY_LEVEL_VALUE) {
            definitionLevel = definitionReader.readLevel();
            repetitionLevel = repetitionReader.readLevel();
        }
        int valueCount = 0;
        for (int i = 0; i < valuesToRead; i++) {
            do {
                valueConsumer.accept(null);
                valueCount++;
                if (valueCount == remainingValueCountInPage) {
                    updateValueCounts(valueCount);
                    if (!readNextPage()) {
                        return;
                    }
                    valueCount = 0;
                }
                repetitionLevel = repetitionReader.readLevel();
                definitionLevel = definitionReader.readLevel();
            }
            while (repetitionLevel != 0);
        }
        updateValueCounts(valueCount);
    }

    private void seek()
    {
        checkArgument(currentValueCount <= totalValueCount, "Already read all values in column chunk");
        if (readOffset == 0) {
            return;
        }
        int valuePosition = 0;
        while (valuePosition < readOffset) {
            if (page == null) {
                readNextPage();
            }
            int offset = Math.min(remainingValueCountInPage, readOffset - valuePosition);
            skipValues(offset);
            valuePosition = valuePosition + offset;
        }
        checkArgument(valuePosition == readOffset, "valuePosition %s must be equal to readOffset %s", valuePosition, readOffset);
    }

    private boolean readNextPage()
    {
        verify(page == null, "readNextPage has to be called when page is null");
        page = pageReader.readPage();
        if (page == null) {
            // we have read all pages
            return false;
        }
        remainingValueCountInPage = page.getValueCount();
        if (page instanceof DataPageV1) {
            valuesReader = readPageV1((DataPageV1) page);
        }
        else {
            valuesReader = readPageV2((DataPageV2) page);
        }
        return true;
    }

    private void updateValueCounts(int valuesRead)
    {
        if (valuesRead == remainingValueCountInPage) {
            page = null;
            valuesReader = null;
        }
        remainingValueCountInPage -= valuesRead;
        currentValueCount += valuesRead;
    }

    private ValuesReader readPageV1(DataPageV1 page)
    {
        ValuesReader rlReader = page.getRepetitionLevelEncoding().getValuesReader(columnDescriptor, REPETITION_LEVEL);
        ValuesReader dlReader = page.getDefinitionLevelEncoding().getValuesReader(columnDescriptor, DEFINITION_LEVEL);
        repetitionReader = new LevelValuesReader(rlReader);
        definitionReader = new LevelValuesReader(dlReader);
        try {
            byte[] bytes = page.getSlice().getBytes();
            rlReader.initFromPage(page.getValueCount(), bytes, 0);
            int offset = rlReader.getNextOffset();
            dlReader.initFromPage(page.getValueCount(), bytes, offset);
            offset = dlReader.getNextOffset();
            return initDataReader(page.getValueEncoding(), bytes, offset, page.getValueCount());
        }
        catch (IOException e) {
            throw new ParquetDecodingException("Error reading parquet page " + page + " in column " + columnDescriptor, e);
        }
    }

    private ValuesReader readPageV2(DataPageV2 page)
    {
        repetitionReader = buildLevelRLEReader(columnDescriptor.getMaxRepetitionLevel(), page.getRepetitionLevels());
        definitionReader = buildLevelRLEReader(columnDescriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
        return initDataReader(page.getDataEncoding(), page.getSlice().getBytes(), 0, page.getValueCount());
    }

    private LevelReader buildLevelRLEReader(int maxLevel, Slice slice)
    {
        if (maxLevel == 0) {
            return new LevelNullReader();
        }
        return new LevelRLEReader(new RunLengthBitPackingHybridDecoder(BytesUtils.getWidthFromMaxInt(maxLevel), new ByteArrayInputStream(slice.getBytes())));
    }

    private ValuesReader initDataReader(ParquetEncoding dataEncoding, byte[] bytes, int offset, int valueCount)
    {
        ValuesReader valuesReader;
        if (dataEncoding.usesDictionary()) {
            if (dictionary == null) {
                throw new ParquetDecodingException("Dictionary is missing for Page");
            }
            valuesReader = dataEncoding.getDictionaryBasedValuesReader(columnDescriptor, VALUES, dictionary);
        }
        else {
            valuesReader = dataEncoding.getValuesReader(columnDescriptor, VALUES);
        }

        try {
            valuesReader.initFromPage(valueCount, bytes, offset);
            return valuesReader;
        }
        catch (IOException e) {
            throw new ParquetDecodingException("Error reading parquet page in column " + columnDescriptor, e);
        }
    }
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive.parquet;

import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.parquet.Field;
import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.reader.ParquetReader;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.LazyBlock;
import com.facebook.presto.spi.block.LazyBlockLoader;
import com.facebook.presto.spi.block.RunLengthEncodedBlock;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.collect.ImmutableList;
import parquet.io.MessageColumnIO;
import parquet.schema.MessageType;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static com.facebook.presto.hive.parquet.ParquetPageSourceFactory.getParquetType;
import static com.facebook.presto.parquet.ParquetTypeUtils.getFieldIndex;
import static com.facebook.presto.parquet.ParquetTypeUtils.lookupColumnByName;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static parquet.io.ColumnIOConverter.constructField;

public class ParquetPageSource
        implements ConnectorPageSource
{
    private static final int MAX_VECTOR_LENGTH = 1024;

    private final ParquetReader parquetReader;
    private final MessageType fileSchema;
    // for debugging heap dump
    private final List<String> columnNames;
    private final List<Type> types;
    private final List<Optional<Field>> fields;

    private final Block[] constantBlocks;
    private final int[] hiveColumnIndexes;

    private int batchId;
    private boolean closed;
    private long readTimeNanos;
    private final boolean useParquetColumnNames;

    public ParquetPageSource(
            ParquetReader parquetReader,
            MessageType fileSchema,
            MessageColumnIO messageColumnIO,
            TypeManager typeManager,
            Properties splitSchema,
            List<HiveColumnHandle> columns,
            TupleDomain<HiveColumnHandle> effectivePredicate,
            boolean useParquetColumnNames)
    {
        requireNonNull(splitSchema, "splitSchema is null");
        requireNonNull(columns, "columns is null");
        requireNonNull(effectivePredicate, "effectivePredicate is null");
        this.parquetReader = requireNonNull(parquetReader, "parquetReader is null");
        this.fileSchema = requireNonNull(fileSchema, "fileSchema is null");
        this.useParquetColumnNames = useParquetColumnNames;

        int size = columns.size();
        this.constantBlocks = new Block[size];
        this.hiveColumnIndexes = new int[size];

        ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
        ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
        ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
        for (int columnIndex = 0; columnIndex < size; columnIndex++) {
            HiveColumnHandle column = columns.get(columnIndex);
            checkState(column.getColumnType() == REGULAR, "column type must be regular");

            String name = column.getName();
            Type type = typeManager.getType(column.getTypeSignature());

            namesBuilder.add(name);
            typesBuilder.add(type);
            hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex();

            if (getParquetType(column, fileSchema, useParquetColumnNames) == null) {
                constantBlocks[columnIndex] = RunLengthEncodedBlock.create(type, null, MAX_VECTOR_LENGTH);
                fieldsBuilder.add(Optional.empty());
            }
            else {
                String columnName = useParquetColumnNames ? name : fileSchema.getFields().get(column.getHiveColumnIndex()).getName();
                fieldsBuilder.add(constructField(type, lookupColumnByName(messageColumnIO, columnName)));
            }
        }
        types = typesBuilder.build();
        fields = fieldsBuilder.build();
        columnNames = namesBuilder.build();
    }

    @Override
    public long getCompletedBytes()
    {
        return parquetReader.getDataSource().getReadBytes();
    }

    @Override
    public long getReadTimeNanos()
    {
        return readTimeNanos;
    }

    @Override
    public boolean isFinished()
    {
        return closed;
    }

    @Override
    public long getSystemMemoryUsage()
    {
        return parquetReader.getSystemMemoryContext().getBytes();
    }

    @Override
    public Page getNextPage()
    {
        try {
            batchId++;
            long start = System.nanoTime();

            int batchSize = parquetReader.nextBatch();

            readTimeNanos += System.nanoTime() - start;

            if (closed || batchSize <= 0) {
                close();
                return null;
            }

            Block[] blocks = new Block[hiveColumnIndexes.length];
            for (int fieldId = 0; fieldId < blocks.length; fieldId++) {
                if (constantBlocks[fieldId] != null) {
                    blocks[fieldId] = constantBlocks[fieldId].getRegion(0, batchSize);
                }
                else {
                    Type type = types.get(fieldId);
                    Optional<Field> field = fields.get(fieldId);
                    int fieldIndex;
                    if (useParquetColumnNames) {
                        fieldIndex = getFieldIndex(fileSchema, columnNames.get(fieldId));
                    }
                    else {
                        fieldIndex = hiveColumnIndexes[fieldId];
                    }
                    if (fieldIndex != -1 && field.isPresent()) {
                        blocks[fieldId] = new LazyBlock(batchSize, new ParquetBlockLoader(field.get()));
                    }
                    else {
                        blocks[fieldId] = RunLengthEncodedBlock.create(type, null, batchSize);
                    }
                }
            }
            return new Page(batchSize, blocks);
        }
        catch (PrestoException e) {
            closeWithSuppression(e);
            throw e;
        }
        catch (RuntimeException e) {
            closeWithSuppression(e);
            throw new PrestoException(HIVE_CURSOR_ERROR, e);
        }
    }

    private void closeWithSuppression(Throwable throwable)
    {
        requireNonNull(throwable, "throwable is null");
        try {
            close();
        }
        catch (RuntimeException e) {
            // Self-suppression not permitted
            if (e != throwable) {
                throwable.addSuppressed(e);
            }
        }
    }

    @Override
    public void close()
    {
        if (closed) {
            return;
        }
        closed = true;

        try {
            parquetReader.close();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private final class ParquetBlockLoader
            implements LazyBlockLoader<LazyBlock>
    {
        private final int expectedBatchId = batchId;
        private final Field field;
        private boolean loaded;

        public ParquetBlockLoader(Field field)
        {
            this.field = requireNonNull(field, "field is null");
        }

        @Override
        public final void load(LazyBlock lazyBlock)
        {
            if (loaded) {
                return;
            }

            checkState(batchId == expectedBatchId);

            try {
                Block block = parquetReader.readBlock(field);
                lazyBlock.setBlock(block);
            }
            catch (ParquetCorruptionException e) {
                throw new PrestoException(HIVE_BAD_DATA, e);
            }
            catch (IOException e) {
                throw new PrestoException(HIVE_CURSOR_ERROR, e);
            }
            loaded = true;
        }
    }
}

2018-09-24

HDFS キャッシング

HDFS のブロックはファイルシステムに保存されるため、Linux カーネルのページキャッシュを自然に使っていたが、ユーザー空間から制御できないため、HDFSキャッシングHadoop 2.3.0 以降)という機能がある。

HDFS上のデータの読み書きの際には、ディスクから読み出されたデータは、Linuxカーネル内のページキャッシュ(原文ではBuffer cacheとなってます)にキャッシュされます。(これにより毎回ディスクアクセスを避けることが期待できます)

HDFSが高速に?キャッシュメカニズムの追加 | Tech Blog

Hadoop 2.3.0 以降には「HDFSキャッシング」と呼ばれる、HDFSキャッシュ機構が搭載されています。

(中略)

HDFSの中央キャッシュ管理は、ユーザが明示的に指定したパスを、HDFSによって明示的にキャッシュする仕組みです。ネームノードはブロックをディスクに持つデータノード通信して、そのブロックを「オフピーク (off-heap)」キャッシュキャッシュします。

オフピークキャッシュは各データノードにある、JVMVMヒープ対象外のメモリ領域です。ユーザーがコマンドからキャッシュに登録するパスを指定することにより、ブロックがこの領域にキャッシュされます。

HDFSの新しい機能 - HDFSキャッシング | Tech Blog

https://www.ibm.com/support/knowledgecenter/ja/SSPT3X_4.1.0/com.ibm.swg.im.infosphere.biginsights.dev.doc/doc/biga_hdfscache.html を参考に手元の EMR(emr-5.17.0) のマスターノードで、hdfs-site.xmldfs.client.mmap.enabled や dfs.datanode.max.locked.memory のエントリはなく、キャッシュ・プールも無かったので、意図的に使わないと使われない模様。

$ hdfs cacheadmin -listPools
Found 0 results.

2018-09-23

HDFS の下の OS レイヤーを覗いてみる

Big Data Forensics: Learning Hadoop Investigations

Big Data Forensics: Learning Hadoop Investigations

  • HDFS collections through the host operating system

Targeted collection from a Hadoop client

The third method for collecting HDFS data from the host operating system is a targeted collection. The HDFS data is stored in defined locations within the host operating system. This data can be collected on a per-node basis through logical file copies. Every node needs to be collected to ensure the HDFS files can be reconstructed in the analysis phase.

The same process is conducted for both targeted collections and imaging collections, except for a couple of differences. With imaging collections, entire disk volumes are collected and hashed. Targeted collections involve the copying of individual files and directories. In both methods, the investigator collects the data, documents the process, and computes MD5/SHA-1 hash values. However, there are differences. In targeted collections, MD5/SHA-1 is computed on the files but not the volumes, the collection process requires multiple copies rather than a single image file, and certain metadata is not preserved. Also, investigators typically perform the targeted collection using scripts rather than manually typing the commands at runtime.

The first step for performing the targeted collection is to identify the location where the host operating system stores the HDFS files. For Linux, Unix, OS X, and other Unix variants, this can be found in the hdfs-site.xml file. While typically stored in the /etc/hadoop directory, it can be stored in other locations, so the investigator first needs to find this location before beginning. In Windows, this information is typically located in the Windows Hadoop installation directory c:\hadoop. To find the directory location from the command line, run the following command:


(中略)


The investigator should collect the entire DataNode tree structure. The structure is comprised of the following directories and files:

  • BP-<integer>-<IP Address>-<creation time>: This directory is the block pool that collects the blocks of data belonging to that DataNode.
  • finalized/rbw: The actual data blocks are stored in these directories. The finalized directory stores the blocks that have been completely written to disk. The rbw directory stands for replica being written and stores the blocks that are currently being written to HDFS.
  • VERSION: This text file stores property information. Each DataNode has a DataNode-wide VERSION file and also VERSION files for each block pool.
  • blk_<block ID>: The binary data blocks content files.
  • blk_<block ID>.meta: The binary data blocks metadata files.
  • dncp_block_verification: This file tracks the times in which the block was last verified via checksum.
  • in_use.lock: This is a lock file used by the DataNode process to prevent multiple DataNode processes from modifying the directory.

実際にちょっと見てみた。

(中略)

  <property>
    <name>dfs.name.dir</name>
    <value>/mnt/namenode</value>
  </property>

  <property>
    <name>dfs.data.dir</name>
    <value>/mnt/hdfs</value>
  </property>

(中略)
[root@ip-***-**-*-133 hdfs]# tree -d /mnt/hdfs
/mnt/hdfs
└── current
    └── BP-747367826-172.31.6.167-1537719042716
        ├── current
        │&#160;&#160; ├── finalized&#160;&#160; │&#160;&#160; └── subdir0&#160;&#160; │&#160;&#160;     ├── subdir0&#160;&#160; │&#160;&#160;     ├── subdir1&#160;&#160; │&#160;&#160;     ├── subdir3&#160;&#160; │&#160;&#160;     ├── subdir4&#160;&#160; │&#160;&#160;     ├── subdir5&#160;&#160; │&#160;&#160;     ├── subdir6&#160;&#160; │&#160;&#160;     ├── subdir7&#160;&#160; │&#160;&#160;     └── subdir8&#160;&#160; └── rbw
        └── tmp

15 directories
  • ファイルを確認する
[root@ip-***-**-*-133 subdir7]# pwd
/mnt/hdfs/current/BP-747367826-***.**.*.167-1537719042716/current/finalized/subdir0/subdir7
[root@ip-***-**-*-133 subdir7]# ls -lh|head
total 15G
-rw-r--r-- 1 hdfs hdfs  128M Sep 23 16:23 blk_1073743618
-rw-r--r-- 1 hdfs hdfs  1.1M Sep 23 16:23 blk_1073743618_2794.meta
-rw-r--r-- 1 hdfs hdfs  128M Sep 23 16:23 blk_1073743619
-rw-r--r-- 1 hdfs hdfs  1.1M Sep 23 16:23 blk_1073743619_2795.meta
-rw-r--r-- 1 hdfs hdfs  128M Sep 23 16:23 blk_1073743620
-rw-r--r-- 1 hdfs hdfs  1.1M Sep 23 16:23 blk_1073743620_2796.meta
-rw-r--r-- 1 hdfs hdfs  128M Sep 23 16:23 blk_1073743622
-rw-r--r-- 1 hdfs hdfs  1.1M Sep 23 16:23 blk_1073743622_2798.meta
-rw-r--r-- 1 hdfs hdfs  128M Sep 23 16:24 blk_1073743624
  • /mnt は HDFS のデータが保存されているのでサイズが大きい。
[root@ip-***-**-*-133 hdfs]# df
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs         16G   76K   16G   1% /dev
tmpfs            16G     0   16G   0% /dev/shm
/dev/xvda1       99G  3.7G   95G   4% /
/dev/xvdb1      5.0G   37M  5.0G   1% /emr
/dev/xvdb2      495G   43G  452G   9% /mnt ★
[root@ip-***-**-*-133 hdfs]# mount
proc on /proc type proc (rw,relatime)
sysfs on /sys type sysfs (rw,relatime)
devtmpfs on /dev type devtmpfs (rw,relatime,size=16460148k,nr_inodes=4115037,mode=755)
devpts on /dev/pts type devpts (rw,relatime,gid=5,mode=620,ptmxmode=000)
tmpfs on /dev/shm type tmpfs (rw,relatime)
/dev/xvda1 on / type ext4 (rw,noatime,data=ordered)
devpts on /dev/pts type devpts (rw,relatime,gid=5,mode=620,ptmxmode=000)
none on /proc/sys/fs/binfmt_misc type binfmt_misc (rw,relatime)
/dev/xvdb1 on /emr type xfs (rw,relatime,attr2,inode64,noquota)
/dev/xvdb2 on /mnt type xfs★ (rw,relatime,attr2,inode64,noquota)
cgroup on /cgroup/blkio type cgroup (rw,relatime,blkio)
cgroup on /cgroup/cpu type cgroup (rw,relatime,cpu)
cgroup on /cgroup/cpuacct type cgroup (rw,relatime,cpuacct)
cgroup on /cgroup/cpuset type cgroup (rw,relatime,cpuset)
cgroup on /cgroup/devices type cgroup (rw,relatime,devices)
cgroup on /cgroup/freezer type cgroup (rw,relatime,freezer)
cgroup on /cgroup/hugetlb type cgroup (rw,relatime,hugetlb)
cgroup on /cgroup/memory type cgroup (rw,relatime,memory)
cgroup on /cgroup/perf_event type cgroup (rw,relatime,perf_event)

環境

2018-09-22

Parquet

検証結果

  • Athena
#クエリ実行時間I/O
1select count(*) from amazon_reviews_parquet 5.6秒0KB
2select count(year) from amazon_reviews_parquet 6.63秒2.58MB
3select count(review_body) from amazon_reviews_parquet 5.7秒34.05GB
4select * from amazon_reviews_parquet limit 100002.05秒455.11MB
5select year from amazon_reviews_parquet limit 100000.92秒163.79KB
6select review_body from amazon_reviews_parquet limit 100000.54秒3.25MB

f:id:yohei-a:20180923180926p:image

準備手順

$ aws s3 mb s3://amazon-reviews-pds-az
$ aws s3 cp --recursive s3://amazon-reviews-pds/ s3://amazon-reviews-pds-az
CREATE EXTERNAL TABLE amazon_reviews_parquet(
  marketplace string, 
  customer_id string, 
  review_id string, 
  product_id string, 
  product_parent string, 
  product_title string, 
  star_rating int, 
  helpful_votes int, 
  total_votes int, 
  vine string, 
  verified_purchase string, 
  review_headline string, 
  review_body string, 
  review_date bigint, 
  year int)
PARTITIONED BY (product_category string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://amazon-reviews-pds-az/parquet/'
MSCK REPAIR TABLE  amazon_reviews_parquet

情報採取

  • select count(*) from amazon_reviews_parquet
select eventname, count(eventname) as apicall_count from default.cloudtrail_logs_cloudtrail_269419664770_do_not_delete 
where eventsource = 's3.amazonaws.com' 
and useragent = 'athena.amazonaws.com' 
and awsregion = 'us-east-1'
and eventtime > '2018-09-23T07:12:00Z'
and eventtime < '2018-09-23T07:35:00Z'
group by eventname
order by apicall_count desc
eventnameapicall_count
GetObject2230
HeadObject2203
HeadBucket176
ListObjects43
PutObject2
  • select count(year) from amazon_reviews_parquet
select eventname, count(eventname) as apicall_count from default.cloudtrail_logs_cloudtrail_269419664770_do_not_delete 
where eventsource = 's3.amazonaws.com' 
and useragent = 'athena.amazonaws.com' 
and awsregion = 'us-east-1'
and eventtime > '2018-09-23T07:35:00Z'
and eventtime < '2018-09-23T07:55:00Z'
group by eventname
order by apicall_count desc
eventnameapicall_count
GetObject2894
HeadObject2238
HeadBucket229
ListObjects43
PutObject2
  • select count(review_body) from amazon_reviews_parquet
select eventname, count(eventname) as apicall_count from default.cloudtrail_logs_cloudtrail_269419664770_do_not_delete 
where eventsource = 's3.amazonaws.com' 
and useragent = 'athena.amazonaws.com' 
and awsregion = 'us-east-1'
and eventtime > '2018-09-23T07:50:00Z'
and eventtime < '2018-09-23T08:00:00Z'
group by eventname
order by apicall_count desc
eventnameapicall_count
GetObject3027
HeadObject2248
HeadBucket668
ListObjects43
PutObject2
  • select * from amazon_reviews_parquet limit 10000
select eventname, count(eventname) as apicall_count from default.cloudtrail_logs_cloudtrail_269419664770_do_not_delete 
where eventsource = 's3.amazonaws.com' 
and useragent = 'athena.amazonaws.com' 
and awsregion = 'us-east-1'
and eventtime > '2018-09-23T08:10:00Z'
and eventtime < '2018-09-23T08:20:00Z'
group by eventname
order by apicall_count desc
eventnameapicall_count
GetObject725
HeadObject607
HeadBucket105
ListObjects43
PutObject2
  • select year from amazon_reviews_parquet limit 10000
select eventname, count(eventname) as apicall_count from default.cloudtrail_logs_cloudtrail_269419664770_do_not_delete 
where eventsource = 's3.amazonaws.com' 
and useragent = 'athena.amazonaws.com' 
and awsregion = 'us-east-1'
and eventtime > '2018-09-23T08:20:00Z'
and eventtime < '2018-09-23T08:30:00Z'
group by eventname
order by apicall_count desc
eventnameapicall_count
HeadObject2095
GetObject1934
HeadBucket327
ListObjects43
PutObject2
  • select review_body from amazon_reviews_parquet limit 10000
select eventname, count(eventname) as apicall_count from default.cloudtrail_logs_cloudtrail_269419664770_do_not_delete 
where eventsource = 's3.amazonaws.com' 
and useragent = 'athena.amazonaws.com' 
and awsregion = 'us-east-1'
and eventtime > '2018-09-23T08:40:00Z'
and eventtime < '2018-09-23T08:59:00Z'
group by eventname
order by apicall_count desc
eventnameapicall_count
GetObject1153
HeadObject1117
HeadBucket500
ListObjects43
PutObject2

検証パターン

参考

2018-09-21

db tech showcase 2018 Day 3

2018/9/21(金)に開催された db tech showcase 2018 Day 3 のメモ。


Pythonから使える列指向ファイルフォーマット・Parquetを使おう

f:id:yohei-a:20180921093930j:image:w640

概要
  • 講師: 玉川 竜司さん(Sky株式会社)
  • 講師略歴: 本職はセキュリティソフトの開発。Pythonは2000年くらいから使用し始めている。db tech showcaseでは、MongoDBの人としてデビュー。本業の傍ら、オライリージャパンから「SRE サイトリライアビリティエンジニアリング」「初めてのSpark」「ヘルシープログラマ」「Google BigQuery」「Sparkによる実践データ解析」など技術翻訳書を多数発刊。
  • 内容: 大量のデータをCSVで保存するのは非効率です。そのデータを分析に利用するなら、列指向のフォーマットでデータを保存することで、保存に必要なストレージ容量や処理に必要なCPUパワーを大幅に削減できます。本セッションでは、Pythonから使える列指向のファイルフォーマットであるParquetについて、実例と共に説明します。
スライド
  • To be uploaded
メモ
  • 「指定したフィールドだけを読み取ることによるI/O削減」はファイルシステムからブロック単位で読むという意味だろうか?同じファイルに複数列の値が入っているが。
  • 主にfastparquetとPyArrowの2つのライブラリがある。Hadoop エコシステムの親和性では PyArrow のほうが優れているかも。

質疑応答
  • Parquet は Date 型が使えないが、それも考慮に入れた上で、ORC と Parquet でどちらが良いか。
  • メモリ空間を効率的に利用できてると思うが数値的に調べたことがありますか?
    • そこまでは調べてない

Deep Dive on the Amazon Aurora PostgreSQL-compatible Edition

f:id:yohei-a:20180921103132j:image:w640

概要
  • 講師: 江川 大地さん(アマゾン ウェブ サービス ジャパン 株式会社 - 技術統括本部 エンタープライズソリューション部 ソリューションアーキテクト)
  • 講師略歴: ソリューションアーキテクトとして、Amazon Web Services (AWS)を利用するお客様へ技術支援を行なっています。クラウドのメリットを活かしたシステムが増えるよう、日々活動しています。
  • 概要: Amazon Auroraは、クラウド時代にAmazonが再設計したRDBMSです。本セッションでは昨年リリースされたPostgreSQLと互換性を持つエンジンについて、そのアーキテクチャや特徴をご紹介します。
スライド
  • To be uploaded

MVCCにおけるw-w/w-r/r-wのあり方とcommit orderのあり方の再検討〜Sundial: Harmonizing Concurrency Control and Caching in a Distributed OLTP Database Management Systemを題材に

f:id:yohei-a:20180921133839j:image:w640

概要
  • 講師: 神林 飛志(株式会社ノーチラステクノロジーズ - 代表取締役会長)
  • 講師略歴: 2011年〜ノーチラステクノロジーズ代表取締役 Hadoopでの分散処理フレームワークAsakusaの開発・導入に従事 各社の原価計算システムの構築にも従事
  • 内容: サーバアーキテクチャの変更は、そのままデータベース・アーキテクチャへの否が応でもの変革を促します。特に、MVCCはP. Bernstein以降の理論的な枠組みのまま、現在のOCCの流れを無理矢理合流させたところもあり、その理論的な難易度と実装のリソース逼迫から一度見送られた風潮がありました。しかし、近年のサーバアーキテクチャの大幅な高進はMVCCに必要なリソースを提供できるだけの状態になり、MVCCは再検討/再実装の中で無視できないうねりになっています。他方、その理論的な難易度から「見てみないふりをした実装」も散見されるようになり、ユーザサイドではややもすれば「anomalyだだ漏れのバグというかこれは仕様ですDB」に直面することになります。今回はこのような状況をうまく捌くために、避けることのできないMVCCの理論的な枠組みについて、その内容を丁寧に後追いし、今後のあり方について模索を行う。
スライド
  • 非公開

Amazon Aurora - Latest innovations and updates behind Aurora’s torrid growth

f:id:yohei-a:20180921141628j:image:w640

概要
  • 講師: 星野 豊さん(アマゾン ウェブ サービス ジャパン 株式会社 - Aurora/RDS Specialist SA)
  • 講師略歴: Amazon AuroraやAmazon Relational Database Serviceのパフォーマンス・チューニングや新機能の活用など技術的な支援を行っています。新技術・ハイボリュームなトラフィックを扱うシステムが大好きです。
  • 内容: システムを構築する上で切り離すことはできないデータベース。 本セッションでは、Amazon Aurora がリリースされてから行ってきた機能追加や安定性向上に対する取り組みと、その内部アーキテクチャをご紹介し、実環境で運用する際に注意する点などの Tips もご紹介します。
スライド
  • to be uploaded
メモ
  • backtrack は最大72時間前まで戻せる。今どこにいるかの LSN を変えるだけなので戻しが速い。実データを書き換えているわけではない。Actual Backtrack Window で barck track できる実際の時間を確認できる。
  • Aurora Serverless は Ci/CD などテスト環境に適している。25〜30秒でスケールアウト/スケールダウンする。NLBの後ろにインスタンスがある。Warm Pool からインスタンスを取るのでスケールアウトが速い。
  • Performance insights はAPIでデータ取得することもできる。過去分のデータも参照できる。
  • 本日 Parallel Query が Aurora が利用できる全リージョンで GA した。EXPLAIN で実行計画が Parallel Query になっているかどうか確認できる。
  • Multi Master はWriter を複数立てておいて1つの Writer だけ更新用途で使うと F/O が速い。後は複数の Writer に別のページを更新する(conflictしない)処理を流してスループットを上げる。

進化を続ける Amazon Redshift のパフォーマンスチューニングテクニックと最新アップデート

f:id:yohei-a:20180923042703j:image:w640

概要
  • 講師: 大薗 純平さん(アマゾン ウェブ サービス ジャパン 株式会社 - 技術統括本部 レディネスソリューション本部 ソリューションアーキテクト)
  • 講師略歴: AWS のソリューションアーキテクト。データウェアハウス/ビッグデータアナリティクスの領域に関して、お客様の技術支援を担当しています。
  • 内容: スケーラブルで高速な AWS のマネージドデータウェアハウスである Amazon Redshift は常に進化を続けています。本セッションでは、Redshift の現在地におけるパフォーマンスチューニングテクニックと、最新のアップデート情報についてお話します。
スライド
  • to be uploaded

Platinumホルダーが選ぶ! 現場で役立つOracle Database18c新機能

f:id:yohei-a:20180923043305j:image:w640

概要
  • 講師: 五十嵐 一俊(株式会社コーソル/Japan Oracle User Group - Oracleサービスグループ)
  • 講師略歴: コーソル入社後、Oracle製品のサポート/コンサルティング業務を経て、Oracle ExadataのDBA業務に従事する。自社技術力向上にも取り組んでおり、その成果として3年連続ORACLE MASTER Platinum取得者数No.1を達成した。
  • 内容: Oracle Databaseを日々愛用(酷使)するコーソルのORACLE MASTER Platinum 12c保持エンジニアが、非常に多くの18c新機能から厳選した、必ず知っておくべき新機能、地味ながら絶対に現場の役に立つ新機能をご紹介します。
スライド
  • to be uploaded

Oracle Databaseバージョン選択における考察'18

f:id:yohei-a:20180923043324j:image:w640

概要
  • 講師:諸橋 渉さん(日本ヒューレット・パッカード株式会社/Japan Oracle User Group)
  • 講師略歴: データと分析にまつわる技術支援業務に従事。ささやかにデータ管理関連の様々な仕事を続けている。Oracle ACEのひとり。Japan Oracle User Groupボードメンバのひとり。
  • 内容: 年次リリースになったOracle Database。各社の関連クラウドサービス等の最新動向もふまえて、バージョン選定と採用時期を決めるための ひとつの考察。
スライド


P.S.

講師控室が同窓会のようで楽しい。

f:id:yohei-a:20180923044512j:image:w640

夜は 過橋米線 秋葉原店 で打ち上げ。美味しかったのでまた行きたい。