CP932のエンコーディングに対応したZIPファイル展開スクリプトのPython 3対応版
Pythonのバージョン3系では内部の仕様に色々と変更が加えられ、バージョン2系向けのスクリプトはバージョン3系ではそのままでは動かないことも多く、移行支援目的のPythonスクリプト変換ツール2to3を用いても正しく実行できない場合がある。
「PythonでZIPファイルを展開する(コード例)」のスクリプトはその例で、文字列とエンコーディングに関する仕様が変わったことに伴うzipfileモジュール内の変更によってZIPファイルの中のファイル名のデータ(エンコードされたバイト列データ)を(Unicodeのデータとして保持される)文字列へうまくデコードできなくなっている。
幸いzipfileのモジュールはバージョン3.1系の時点ではC言語ではなくPythonで書かれており、ZIPファイルはZipFileというクラスで扱うことになっているため、このクラスを継承してエンコーディング関係の処理を調整したものを使うようにしつつ試行錯誤することで、うまくUTF-8/CP932のどちらのエンコーディングの名前のファイルを含んだZIPファイルでも正しく展開することができるようになった(継承したクラスにおける変更点が適切かどうかは不明)。
その他、細かい部分を色々と調整したものを下に貼り付ける。
[任意]ファイル名: unzip-py3.py ライセンス: GPL-3 (or lator)(JpZipFileクラスの定義以外), Python Software Foundation License(JpZipFileクラスの定義のみ)
#! /usr/bin/python # -*- coding: utf-8 -*- # ZIPファイルを展開するPythonスクリプト(Python 3用) # CP932エンコーディングのファイル名を含むZIPファイルに対応/タイムスタンプ復元機能 # version 20101117 # (C) 2009-2010 kakurasan # Licensed under GPLv3+(main), Python Software Foundation License(JpZipFile) from zipfile import ZipFile, _EndRecData, _ECD_SIZE, _ECD_OFFSET, _ECD_COMMENT, _ECD_LOCATION, _ECD_SIGNATURE, stringEndArchive64, sizeCentralDir, stringCentralDir, structCentralDir, _CD_FILENAME_LENGTH, ZipInfo, _CD_EXTRA_FIELD_LENGTH, _CD_COMMENT_LENGTH, _CD_LOCAL_HEADER_OFFSET, sizeFileHeader, stringFileHeader, structFileHeader, _FH_FILENAME_LENGTH, _FH_EXTRA_FIELD_LENGTH, ZipExtFile, BadZipfile from optparse import OptionParser import locale import struct import errno import time import sys import os import io # Python 3.1.2のzipfile.pyからZipFileクラスを継承して展開に関係したメンバ関数を再定義 ##### Python Software Foundation License from here ##### class JpZipFile(ZipFile): def __init__(self, *args, **kwargs): ZipFile.__init__(self, *args, **kwargs) def open(self, name, mode="r", pwd=None): """Return file-like object for 'name'.""" if mode not in ("r", "U", "rU"): raise RuntimeError('open() requires mode "r", "U", or "rU"') if not self.fp: raise RuntimeError( "Attempt to read ZIP archive that was already closed") # Only open a new file for instances where we were not # given a file object in the constructor if self._filePassed: zef_file = self.fp else: zef_file = io.open(self.filename, 'rb') # Make sure we have an info object if isinstance(name, ZipInfo): # 'name' is already an info object zinfo = name else: # Get info object for name zinfo = self.getinfo(name) zef_file.seek(zinfo.header_offset, 0) # Skip the file header: fheader = zef_file.read(sizeFileHeader) if fheader[0:4] != stringFileHeader: raise BadZipfile("Bad magic number for file header") fheader = struct.unpack(structFileHeader, fheader) fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) if fheader[_FH_EXTRA_FIELD_LENGTH]: zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) # modified for Japanese if fname != zinfo.orig_filename.encode("utf-8") and \ fname != zinfo.orig_filename.encode("cp932"): raise BadZipfile( 'File name in directory {!r} and header {!r} differ.'.format(zinfo.orig_filename, fname)) # check for encrypted flag & handle password is_encrypted = zinfo.flag_bits & 0x1 zd = None if is_encrypted: if not pwd: pwd = self.pwd if not pwd: raise RuntimeError("File {0} is encrypted, " "password required for extraction".format(name)) zd = _ZipDecrypter(pwd) # The first 12 bytes in the cypher stream is an encryption header # used to strengthen the algorithm. The first 11 bytes are # completely random, while the 12th contains the MSB of the CRC, # or the MSB of the file time depending on the header type # and is used to check the correctness of the password. bytes = zef_file.read(12) h = list(map(zd, bytes[0:12])) if zinfo.flag_bits & 0x8: # compare against the file type from extended local headers check_byte = (zinfo._raw_time >> 8) & 0xff else: # compare against the CRC otherwise check_byte = (zinfo.CRC >> 24) & 0xff if h[11] != check_byte: raise RuntimeError("Bad password for file", name) # build and return a ZipExtFile if zd is None: zef = ZipExtFile(zef_file, zinfo) else: zef = ZipExtFile(zef_file, zinfo, zd) # set universal newlines on ZipExtFile if necessary if "U" in mode: zef.set_univ_newlines(True) return zef def _RealGetContents(self): """Read in the table of contents for the ZIP file.""" fp = self.fp endrec = _EndRecData(fp) if not endrec: raise BadZipfile("File is not a zip file") if self.debug > 1: print(endrec) size_cd = endrec[_ECD_SIZE] # bytes in central directory offset_cd = endrec[_ECD_OFFSET] # offset of central directory self.comment = endrec[_ECD_COMMENT] # archive comment # "concat" is zero, unless zip was concatenated to another file concat = endrec[_ECD_LOCATION] - size_cd - offset_cd if endrec[_ECD_SIGNATURE] == stringEndArchive64: # If Zip64 extension structures are present, account for them concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2: inferred = concat + offset_cd print("given, inferred, offset", offset_cd, inferred, concat) # self.start_dir: Position of start of central directory self.start_dir = offset_cd + concat fp.seek(self.start_dir, 0) data = fp.read(size_cd) fp = io.BytesIO(data) total = 0 while total < size_cd: centdir = fp.read(sizeCentralDir) if centdir[0:4] != stringCentralDir: raise BadZipfile("Bad magic number for central directory") centdir = struct.unpack(structCentralDir, centdir) if self.debug > 2: print(centdir) filename = fp.read(centdir[_CD_FILENAME_LENGTH]) flags = centdir[5] # modified for Japanese try: filename = filename.decode('utf-8') except UnicodeDecodeError: try: filename = filename.decode('cp932') except UnicodeDecodeError: filename = filename.decode('cp437') # Create ZipInfo instance to store file information x = ZipInfo(filename) x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET] (x.create_version, x.create_system, x.extract_version, x.reserved, x.flag_bits, x.compress_type, t, d, x.CRC, x.compress_size, x.file_size) = centdir[1:12] x.volume, x.internal_attr, x.external_attr = centdir[15:18] # Convert date/time code to (year, month, day, hour, min, sec) x._raw_time = t x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F, t>>11, (t>>5)&0x3F, (t&0x1F) * 2 ) x._decodeExtra() x.header_offset = x.header_offset + concat self.filelist.append(x) self.NameToInfo[x.filename] = x # update total bytes read from central directory total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH] + centdir[_CD_EXTRA_FIELD_LENGTH] + centdir[_CD_COMMENT_LENGTH]) if self.debug > 2: print("total", total) ##### Python Software Foundation License ends here ##### ##### GPLv3+ from here ##### def main (): # ロケール設定 # エラー発生時のメッセージのロケールに影響 locale.setlocale (locale.LC_ALL, '') # オプション解析(-dオプションで展開先指定を可能に) parser = OptionParser (usage='%prog ( -d ) [zipfile]') parser.set_defaults (outdir=os.getcwd ()) parser.add_option ('-d', '--output-directory', dest='outdir', action='store', type='string', help='set output directory', metavar='DIR') (options, args) = parser.parse_args () # 入力ファイルなし if len (args) < 1: parser.error ('no input file specified') # 入力ファイルと展開先接頭辞を決定 infile = args[0] prefix = options.outdir # 出力先ディレクトリが無ければ掘っておく try: os.makedirs (prefix) except: # 既にある場合と書き込み失敗の場合とがあるが # 失敗した場合は後でファイル書き込み時にもエラーが出るので一緒に扱うことにする pass # ZIPファイルを開いて展開 (filesl, dirsl) = ([], []) try: f_zip = JpZipFile (infile, 'r') print ('[OK] open: {0}'.format (infile)) except IOError as e: (no, msg) = e.args print ('*NG* open: {0}: IOError[{1}]: {2}'.format (infile, no, msg), file=sys.stderr) return 1 (namel, infol) = (f_zip.namelist (), f_zip.infolist ()) try: # 分別 # namelist()の名前とinfolist()の情報は同じ順番なのでzip()で同時に回す for (item, info) in zip (namel, infol): # ZipInfoオブジェクトのメンバdate_timeは使いにくく # 文字列を介してtime.strptime()とtime.mktime()したものをos.utime()へ timestamp = time.mktime (time.strptime ('%d/%02d/%02d %02d:%02d:%02d' % info.date_time, '%Y/%m/%d %H:%M:%S')) if item.endswith ('/'): # ディレクトリ dirsl.append ((item, timestamp)) else: # ファイル filesl.append ((item, timestamp)) # ディレクトリのない書庫向けに親ディレクトリもディレクトリ一覧に追加 parent_in_list = False parent = item while True: # 階層を1つずつ上がっていき、一番上までたどったら抜ける parent = os.path.dirname (parent) if parent == '': break # 重複しないように、現在のディレクトリ一覧と照らし合わせて # ない場合にのみ追加する for (i, t) in dirsl: if parent == i: parent_in_list = True if not parent_in_list: dirsl.append ((parent, None)) # ディレクトリを先に作成 for (item, timestamp) in dirsl: outpath = os.path.join (prefix, item) try: os.makedirs (outpath) except OSError as e: (no, msg) = e.args # 存在することによる失敗は無視 if no != errno.EEXIST: print ('*NG* makedirs: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr) return 1 # ファイルを展開 for (item, timestamp) in filesl: outpath = os.path.join (prefix, item) try: f_out = open (outpath, 'wb') # バイナリモード指定必須 print ('[OK] open: {0}'.format (outpath)) except IOError as e: (no, msg) = e.args print ('*NG* open: {0}: IOError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr) return 1 try: f_out.write (f_zip.read (item)) print ('[OK] write: {0}'.format (outpath)) except IOError as e: (no, msg) = e.args print ('*NG* write: {0}: IOError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr) return 1 finally: f_out.close () # タイムスタンプ設定 try: os.utime (outpath, (timestamp, timestamp)) print ('[OK] utime: {0}: '.format (timestamp) + outpath) except OSError as e: (no, msg) = e.args print ('*NG* utime: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr) return 1 # 最後にディレクトリのタイムスタンプを設定 for (item, timestamp) in dirsl: outpath = os.path.join (prefix, item) if timestamp: try: os.utime (outpath, (timestamp, timestamp)) print ('[OK] utime: {0}: '.format (timestamp) + outpath) except OSError as e: (no, msg) = e.args print ('*NG* utime: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr) return 1 else: # ファイル一覧からこのディレクトリ中の項目の中で最新のものにする timestamp = 0 for (n, i) in zip (namel, infol): if n.startswith (item): t = time.mktime (time.strptime ('%d/%02d/%02d %02d:%02d:%02d' % i.date_time, '%Y/%m/%d %H:%M:%S')) if timestamp < t: timestamp = t try: os.utime (outpath, (timestamp, timestamp)) print ('[OK] utime: {0}: '.format (timestamp) + outpath) except OSError as e: (no, msg) = e.args print ('*NG* utime: {0}: OSError[{1}] {2}'.format (outpath, no, msg), file=sys.stderr) return 1 except IOError as e: (no, msg) = e.args print('*NG* IOError[{0}]: {1}'.format(no, msg), file=sys.stderr) return 1 finally: f_zip.close() return 0 if __name__ == '__main__': sys.exit (main ())
関連記事:
- PythonでZIPファイルを展開する(コード例)
- バージョン3系のPythonにおける文字列とそのエンコーディングに関する覚え書き(文字列型とバイト列型)
- バージョン3系のPythonにおける文字列とそのエンコーディングに関する覚え書き(ファイル入出力とエンコーディング)
使用したバージョン:
- Python 3.1.2