試験運用中なLinux備忘録・旧記事

はてなダイアリーで公開していた2007年5月-2015年3月の記事を保存しています。

CP932のエンコーディングに対応したZIPファイル展開スクリプトのPython 3対応版

Pythonのバージョン3系では内部の仕様に色々と変更が加えられ、バージョン2系向けのスクリプトはバージョン3系ではそのままでは動かないことも多く、移行支援目的のPythonスクリプト変換ツール2to3を用いても正しく実行できない場合がある。
PythonでZIPファイルを展開する(コード例)」のスクリプトはその例で、文字列とエンコーディングに関する仕様が変わったことに伴うzipfileモジュール内の変更によってZIPファイルの中のファイル名のデータ(エンコードされたバイト列データ)を(Unicodeのデータとして保持される)文字列へうまくデコードできなくなっている。
幸いzipfileのモジュールはバージョン3.1系の時点ではC言語ではなくPythonで書かれており、ZIPファイルはZipFileというクラスで扱うことになっているため、このクラスを継承してエンコーディング関係の処理を調整したものを使うようにしつつ試行錯誤することで、うまくUTF-8/CP932のどちらのエンコーディングの名前のファイルを含んだZIPファイルでも正しく展開することができるようになった(継承したクラスにおける変更点が適切かどうかは不明)。
その他、細かい部分を色々と調整したものを下に貼り付ける。
[任意]ファイル名: unzip-py3.py ライセンス: GPL-3 (or lator)(JpZipFileクラスの定義以外), Python Software Foundation License(JpZipFileクラスの定義のみ)

#! /usr/bin/python
# -*- coding: utf-8 -*-

# ZIPファイルを展開するPythonスクリプト(Python 3用)
# CP932エンコーディングのファイル名を含むZIPファイルに対応/タイムスタンプ復元機能
# version 20101117
# (C) 2009-2010 kakurasan
# Licensed under GPLv3+(main), Python Software Foundation License(JpZipFile)

from zipfile import ZipFile, _EndRecData, _ECD_SIZE, _ECD_OFFSET, _ECD_COMMENT, _ECD_LOCATION, _ECD_SIGNATURE, stringEndArchive64, sizeCentralDir, stringCentralDir, structCentralDir, _CD_FILENAME_LENGTH, ZipInfo, _CD_EXTRA_FIELD_LENGTH, _CD_COMMENT_LENGTH, _CD_LOCAL_HEADER_OFFSET, sizeFileHeader, stringFileHeader, structFileHeader, _FH_FILENAME_LENGTH, _FH_EXTRA_FIELD_LENGTH, ZipExtFile, BadZipfile
from optparse import OptionParser
import locale
import struct
import errno
import time
import sys
import os
import io


# Python 3.1.2のzipfile.pyからZipFileクラスを継承して展開に関係したメンバ関数を再定義
##### Python Software Foundation License from here #####
class JpZipFile(ZipFile):
    def __init__(self, *args, **kwargs):
      ZipFile.__init__(self, *args, **kwargs)
    def open(self, name, mode="r", pwd=None):
        """Return file-like object for 'name'."""
        if mode not in ("r", "U", "rU"):
            raise RuntimeError('open() requires mode "r", "U", or "rU"')
        if not self.fp:
            raise RuntimeError(
                  "Attempt to read ZIP archive that was already closed")

        # Only open a new file for instances where we were not
        # given a file object in the constructor
        if self._filePassed:
            zef_file = self.fp
        else:
            zef_file = io.open(self.filename, 'rb')

        # Make sure we have an info object
        if isinstance(name, ZipInfo):
            # 'name' is already an info object
            zinfo = name
        else:
            # Get info object for name
            zinfo = self.getinfo(name)

        zef_file.seek(zinfo.header_offset, 0)

        # Skip the file header:
        fheader = zef_file.read(sizeFileHeader)
        if fheader[0:4] != stringFileHeader:
            raise BadZipfile("Bad magic number for file header")

        fheader = struct.unpack(structFileHeader, fheader)
        fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
        if fheader[_FH_EXTRA_FIELD_LENGTH]:
            zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])

        # modified for Japanese
        if fname != zinfo.orig_filename.encode("utf-8") and \
          fname != zinfo.orig_filename.encode("cp932"):
            raise BadZipfile(
                  'File name in directory {!r} and header {!r} differ.'.format(zinfo.orig_filename, fname))

        # check for encrypted flag & handle password
        is_encrypted = zinfo.flag_bits & 0x1
        zd = None
        if is_encrypted:
            if not pwd:
                pwd = self.pwd
            if not pwd:
                raise RuntimeError("File {0} is encrypted, "
                                   "password required for extraction".format(name))

            zd = _ZipDecrypter(pwd)
            # The first 12 bytes in the cypher stream is an encryption header
            #  used to strengthen the algorithm. The first 11 bytes are
            #  completely random, while the 12th contains the MSB of the CRC,
            #  or the MSB of the file time depending on the header type
            #  and is used to check the correctness of the password.
            bytes = zef_file.read(12)
            h = list(map(zd, bytes[0:12]))
            if zinfo.flag_bits & 0x8:
                # compare against the file type from extended local headers
                check_byte = (zinfo._raw_time >> 8) & 0xff
            else:
                # compare against the CRC otherwise
                check_byte = (zinfo.CRC >> 24) & 0xff
            if h[11] != check_byte:
                raise RuntimeError("Bad password for file", name)

        # build and return a ZipExtFile
        if zd is None:
            zef = ZipExtFile(zef_file, zinfo)
        else:
            zef = ZipExtFile(zef_file, zinfo, zd)

        # set universal newlines on ZipExtFile if necessary
        if "U" in mode:
            zef.set_univ_newlines(True)
        return zef
    def _RealGetContents(self):
        """Read in the table of contents for the ZIP file."""
        fp = self.fp
        endrec = _EndRecData(fp)
        if not endrec:
            raise BadZipfile("File is not a zip file")
        if self.debug > 1:
            print(endrec)
        size_cd = endrec[_ECD_SIZE]             # bytes in central directory
        offset_cd = endrec[_ECD_OFFSET]         # offset of central directory
        self.comment = endrec[_ECD_COMMENT]     # archive comment

        # "concat" is zero, unless zip was concatenated to another file
        concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
        if endrec[_ECD_SIGNATURE] == stringEndArchive64:
            # If Zip64 extension structures are present, account for them
            concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator)

        if self.debug > 2:
            inferred = concat + offset_cd
            print("given, inferred, offset", offset_cd, inferred, concat)
        # self.start_dir:  Position of start of central directory
        self.start_dir = offset_cd + concat
        fp.seek(self.start_dir, 0)
        data = fp.read(size_cd)
        fp = io.BytesIO(data)
        total = 0
        while total < size_cd:
            centdir = fp.read(sizeCentralDir)
            if centdir[0:4] != stringCentralDir:
                raise BadZipfile("Bad magic number for central directory")
            centdir = struct.unpack(structCentralDir, centdir)
            if self.debug > 2:
                print(centdir)
            filename = fp.read(centdir[_CD_FILENAME_LENGTH])
            flags = centdir[5]
            # modified for Japanese
            try:
                filename = filename.decode('utf-8')
            except UnicodeDecodeError:
                try:
                    filename = filename.decode('cp932')
                except UnicodeDecodeError:
                    filename = filename.decode('cp437')
          # Create ZipInfo instance to store file information
            x = ZipInfo(filename)
            x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
            x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
            x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
            (x.create_version, x.create_system, x.extract_version, x.reserved,
                x.flag_bits, x.compress_type, t, d,
                x.CRC, x.compress_size, x.file_size) = centdir[1:12]
            x.volume, x.internal_attr, x.external_attr = centdir[15:18]
            # Convert date/time code to (year, month, day, hour, min, sec)
            x._raw_time = t
            x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F,
                                     t>>11, (t>>5)&0x3F, (t&0x1F) * 2 )

            x._decodeExtra()
            x.header_offset = x.header_offset + concat
            self.filelist.append(x)
            self.NameToInfo[x.filename] = x

            # update total bytes read from central directory
            total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH]
                     + centdir[_CD_EXTRA_FIELD_LENGTH]
                     + centdir[_CD_COMMENT_LENGTH])

            if self.debug > 2:
                print("total", total)
##### Python Software Foundation License ends here #####

##### GPLv3+ from here #####

def main ():
  # ロケール設定
  # エラー発生時のメッセージのロケールに影響
  locale.setlocale (locale.LC_ALL, '')

  # オプション解析(-dオプションで展開先指定を可能に)
  parser = OptionParser (usage='%prog ( -d ) [zipfile]')
  parser.set_defaults (outdir=os.getcwd ())
  parser.add_option ('-d', '--output-directory',
                     dest='outdir',
                     action='store',
                     type='string',
                     help='set output directory',
                     metavar='DIR')
  (options, args) = parser.parse_args ()

  # 入力ファイルなし
  if len (args) < 1:
    parser.error ('no input file specified')

  # 入力ファイルと展開先接頭辞を決定
  infile = args[0]
  prefix = options.outdir
  # 出力先ディレクトリが無ければ掘っておく
  try:
    os.makedirs (prefix)
  except:
    # 既にある場合と書き込み失敗の場合とがあるが
    # 失敗した場合は後でファイル書き込み時にもエラーが出るので一緒に扱うことにする
    pass

  # ZIPファイルを開いて展開
  (filesl, dirsl) = ([], [])
  try:
    f_zip = JpZipFile (infile, 'r')
    print ('[OK] open: {0}'.format (infile))
  except IOError as e:
    (no, msg) = e.args
    print ('*NG* open: {0}: IOError[{1}]: {2}'.format (infile, no, msg), file=sys.stderr)
    return 1
  (namel, infol) = (f_zip.namelist (), f_zip.infolist ())
  try:
    # 分別
    # namelist()の名前とinfolist()の情報は同じ順番なのでzip()で同時に回す
    for (item, info) in zip (namel, infol):
      # ZipInfoオブジェクトのメンバdate_timeは使いにくく
      # 文字列を介してtime.strptime()とtime.mktime()したものをos.utime()へ
      timestamp = time.mktime (time.strptime ('%d/%02d/%02d %02d:%02d:%02d' % info.date_time, '%Y/%m/%d %H:%M:%S'))
      if item.endswith ('/'):
        # ディレクトリ
        dirsl.append ((item, timestamp))
      else:
        # ファイル
        filesl.append ((item, timestamp))
        # ディレクトリのない書庫向けに親ディレクトリもディレクトリ一覧に追加
        parent_in_list = False
        parent = item
        while True:
          # 階層を1つずつ上がっていき、一番上までたどったら抜ける
          parent = os.path.dirname (parent)
          if parent == '':
            break
          # 重複しないように、現在のディレクトリ一覧と照らし合わせて
          # ない場合にのみ追加する
          for (i, t) in dirsl:
            if parent == i:
              parent_in_list = True
          if not parent_in_list:
            dirsl.append ((parent, None))
    # ディレクトリを先に作成
    for (item, timestamp) in dirsl:
      outpath = os.path.join (prefix, item)
      try:
        os.makedirs (outpath)
      except OSError as e:
        (no, msg) = e.args
        # 存在することによる失敗は無視
        if no != errno.EEXIST:
          print ('*NG* makedirs: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr)
          return 1
    # ファイルを展開
    for (item, timestamp) in filesl:
      outpath = os.path.join (prefix, item)
      try:
        f_out = open (outpath, 'wb')  # バイナリモード指定必須
        print ('[OK] open: {0}'.format (outpath))
      except IOError as e:
        (no, msg) = e.args
        print ('*NG* open: {0}: IOError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr)
        return 1
      try:
        f_out.write (f_zip.read (item))
        print ('[OK] write: {0}'.format (outpath))
      except IOError as e:
        (no, msg) = e.args
        print ('*NG* write: {0}: IOError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr)
        return 1
      finally:
        f_out.close ()
      # タイムスタンプ設定
      try:
        os.utime (outpath, (timestamp, timestamp))
        print ('[OK] utime: {0}: '.format (timestamp) + outpath)
      except OSError as e:
        (no, msg) = e.args
        print ('*NG* utime: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr)
        return 1
    # 最後にディレクトリのタイムスタンプを設定
    for (item, timestamp) in dirsl:
      outpath = os.path.join (prefix, item)
      if timestamp:
        try:
          os.utime (outpath, (timestamp, timestamp))
          print ('[OK] utime: {0}: '.format (timestamp) + outpath)
        except OSError as e:
          (no, msg) = e.args
          print ('*NG* utime: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr)
          return 1
      else:
        # ファイル一覧からこのディレクトリ中の項目の中で最新のものにする
        timestamp = 0
        for (n, i) in zip (namel, infol):
          if n.startswith (item):
            t = time.mktime (time.strptime ('%d/%02d/%02d %02d:%02d:%02d' % i.date_time, '%Y/%m/%d %H:%M:%S'))
            if timestamp < t:
              timestamp = t
        try:
          os.utime (outpath, (timestamp, timestamp))
          print ('[OK] utime: {0}: '.format (timestamp) + outpath)
        except OSError as e:
          (no, msg) = e.args
          print ('*NG* utime: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr)
          return 1
  except IOError as e:
    (no, msg) = e.args
    print('*NG* IOError[{0}]: {1}'.format(no, msg), file=sys.stderr)
    return 1
  finally:
    f_zip.close()
  return 0

if __name__ == '__main__':
  sys.exit (main ())

関連記事:

使用したバージョン: