mirror of
https://github.com/moparisthebest/SickRage
synced 2024-12-15 04:22:17 -05:00
402 lines
15 KiB
Python
402 lines
15 KiB
Python
|
"""
|
||
|
7zip file parser
|
||
|
|
||
|
Informations:
|
||
|
- File 7zformat.txt of 7-zip SDK:
|
||
|
http://www.7-zip.org/sdk.html
|
||
|
|
||
|
Author: Olivier SCHWAB
|
||
|
Creation date: 6 december 2006
|
||
|
"""
|
||
|
|
||
|
from lib.hachoir_parser import Parser
|
||
|
from lib.hachoir_core.field import (Field, FieldSet, ParserError,
|
||
|
GenericVector,
|
||
|
Enum, UInt8, UInt32, UInt64,
|
||
|
Bytes, RawBytes)
|
||
|
from lib.hachoir_core.endian import LITTLE_ENDIAN
|
||
|
from lib.hachoir_core.text_handler import textHandler, hexadecimal, filesizeHandler
|
||
|
|
||
|
class SZUInt64(Field):
|
||
|
"""
|
||
|
Variable length UInt64, where the first byte gives both the number of bytes
|
||
|
needed and the upper byte value.
|
||
|
"""
|
||
|
def __init__(self, parent, name, max_size=None, description=None):
|
||
|
Field.__init__(self, parent, name, size=8, description=description)
|
||
|
value = 0
|
||
|
addr = self.absolute_address
|
||
|
mask = 0x80
|
||
|
firstByte = parent.stream.readBits(addr, 8, LITTLE_ENDIAN)
|
||
|
for i in xrange(8):
|
||
|
addr += 8
|
||
|
if not (firstByte & mask):
|
||
|
value += ((firstByte & (mask-1)) << (8*i))
|
||
|
break
|
||
|
value |= (parent.stream.readBits(addr, 8, LITTLE_ENDIAN) << (8*i))
|
||
|
mask >>= 1
|
||
|
self._size += 8
|
||
|
self.createValue = lambda: value
|
||
|
|
||
|
ID_END, ID_HEADER, ID_ARCHIVE_PROPS, ID_ADD_STREAM_INFO, ID_MAIN_STREAM_INFO, \
|
||
|
ID_FILES_INFO, ID_PACK_INFO, ID_UNPACK_INFO, ID_SUBSTREAMS_INFO, ID_SIZE, \
|
||
|
ID_CRC, ID_FOLDER, ID_CODERS_UNPACK_SIZE, ID_NUM_UNPACK_STREAMS, \
|
||
|
ID_EMPTY_STREAM, ID_EMPTY_FILE, ID_ANTI, ID_NAME, ID_CREATION_TIME, \
|
||
|
ID_LAST_ACCESS_TIME, ID_LAST_WRITE_TIME, ID_WIN_ATTR, ID_COMMENT, \
|
||
|
ID_ENCODED_HEADER = xrange(24)
|
||
|
|
||
|
ID_INFO = {
|
||
|
ID_END : "End",
|
||
|
ID_HEADER : "Header embedding another one",
|
||
|
ID_ARCHIVE_PROPS : "Archive Properties",
|
||
|
ID_ADD_STREAM_INFO : "Additional Streams Info",
|
||
|
ID_MAIN_STREAM_INFO : "Main Streams Info",
|
||
|
ID_FILES_INFO : "Files Info",
|
||
|
ID_PACK_INFO : "Pack Info",
|
||
|
ID_UNPACK_INFO : "Unpack Info",
|
||
|
ID_SUBSTREAMS_INFO : "Substreams Info",
|
||
|
ID_SIZE : "Size",
|
||
|
ID_CRC : "CRC",
|
||
|
ID_FOLDER : "Folder",
|
||
|
ID_CODERS_UNPACK_SIZE: "Coders Unpacked size",
|
||
|
ID_NUM_UNPACK_STREAMS: "Number of Unpacked Streams",
|
||
|
ID_EMPTY_STREAM : "Empty Stream",
|
||
|
ID_EMPTY_FILE : "Empty File",
|
||
|
ID_ANTI : "Anti",
|
||
|
ID_NAME : "Name",
|
||
|
ID_CREATION_TIME : "Creation Time",
|
||
|
ID_LAST_ACCESS_TIME : "Last Access Time",
|
||
|
ID_LAST_WRITE_TIME : "Last Write Time",
|
||
|
ID_WIN_ATTR : "Win Attributes",
|
||
|
ID_COMMENT : "Comment",
|
||
|
ID_ENCODED_HEADER : "Header holding encoded data info",
|
||
|
}
|
||
|
|
||
|
class SkippedData(FieldSet):
|
||
|
def createFields(self):
|
||
|
yield Enum(UInt8(self, "id[]"), ID_INFO)
|
||
|
size = SZUInt64(self, "size")
|
||
|
yield size
|
||
|
if size.value > 0:
|
||
|
yield RawBytes(self, "data", size.value)
|
||
|
|
||
|
def waitForID(s, wait_id, wait_name="waited_id[]"):
|
||
|
while not s.eof:
|
||
|
addr = s.absolute_address+s.current_size
|
||
|
uid = s.stream.readBits(addr, 8, LITTLE_ENDIAN)
|
||
|
if uid == wait_id:
|
||
|
yield Enum(UInt8(s, wait_name), ID_INFO)
|
||
|
s.info("Found ID %s (%u)" % (ID_INFO[uid], uid))
|
||
|
return
|
||
|
s.info("Skipping ID %u!=%u" % (uid, wait_id))
|
||
|
yield SkippedData(s, "skipped_id[]", "%u != %u" % (uid, wait_id))
|
||
|
|
||
|
class HashDigest(FieldSet):
|
||
|
def __init__(self, parent, name, num_digests, desc=None):
|
||
|
FieldSet.__init__(self, parent, name, desc)
|
||
|
self.num_digests = num_digests
|
||
|
def createFields(self):
|
||
|
yield Enum(UInt8(self, "id"), ID_INFO)
|
||
|
bytes = self.stream.readBytes(self.absolute_address, self.num_digests)
|
||
|
if self.num_digests > 0:
|
||
|
yield GenericVector(self, "defined[]", self.num_digests, UInt8, "bool")
|
||
|
for index in xrange(self.num_digests):
|
||
|
if bytes[index]:
|
||
|
yield textHandler(UInt32(self, "hash[]",
|
||
|
"Hash for digest %u" % index), hexadecimal)
|
||
|
|
||
|
class PackInfo(FieldSet):
|
||
|
def createFields(self):
|
||
|
yield Enum(UInt8(self, "id"), ID_INFO)
|
||
|
# Very important, helps determine where the data is
|
||
|
yield SZUInt64(self, "pack_pos", "Position of the packs")
|
||
|
num = SZUInt64(self, "num_pack_streams")
|
||
|
yield num
|
||
|
num = num.value
|
||
|
|
||
|
for field in waitForID(self, ID_SIZE, "size_marker"):
|
||
|
yield field
|
||
|
|
||
|
for size in xrange(num):
|
||
|
yield SZUInt64(self, "pack_size[]")
|
||
|
|
||
|
while not self.eof:
|
||
|
addr = self.absolute_address+self.current_size
|
||
|
uid = self.stream.readBits(addr, 8, LITTLE_ENDIAN)
|
||
|
if uid == ID_END:
|
||
|
yield Enum(UInt8(self, "end_marker"), ID_INFO)
|
||
|
break
|
||
|
elif uid == ID_CRC:
|
||
|
yield HashDigest(self, "hash_digest", size)
|
||
|
else:
|
||
|
yield SkippedData(self, "skipped_data")
|
||
|
|
||
|
def lzmaParams(value):
|
||
|
param = value.value
|
||
|
remainder = param / 9
|
||
|
# Literal coder context bits
|
||
|
lc = param % 9
|
||
|
# Position state bits
|
||
|
pb = remainder / 5
|
||
|
# Literal coder position bits
|
||
|
lp = remainder % 5
|
||
|
return "lc=%u pb=%u lp=%u" % (lc, lp, pb)
|
||
|
|
||
|
class CoderID(FieldSet):
|
||
|
CODECS = {
|
||
|
# Only 2 methods ... and what about PPMD ?
|
||
|
"\0" : "copy",
|
||
|
"\3\1\1": "lzma",
|
||
|
}
|
||
|
def createFields(self):
|
||
|
byte = UInt8(self, "id_size")
|
||
|
yield byte
|
||
|
byte = byte.value
|
||
|
self.info("ID=%u" % byte)
|
||
|
size = byte & 0xF
|
||
|
if size > 0:
|
||
|
name = self.stream.readBytes(self.absolute_address+self.current_size, size)
|
||
|
if name in self.CODECS:
|
||
|
name = self.CODECS[name]
|
||
|
self.info("Codec is %s" % name)
|
||
|
else:
|
||
|
self.info("Undetermined codec %s" % name)
|
||
|
name = "unknown"
|
||
|
yield RawBytes(self, name, size)
|
||
|
#yield textHandler(Bytes(self, "id", size), lambda: name)
|
||
|
if byte & 0x10:
|
||
|
yield SZUInt64(self, "num_stream_in")
|
||
|
yield SZUInt64(self, "num_stream_out")
|
||
|
self.info("Streams: IN=%u OUT=%u" % \
|
||
|
(self["num_stream_in"].value, self["num_stream_out"].value))
|
||
|
if byte & 0x20:
|
||
|
size = SZUInt64(self, "properties_size[]")
|
||
|
yield size
|
||
|
if size.value == 5:
|
||
|
#LzmaDecodeProperties@LZMAStateDecode.c
|
||
|
yield textHandler(UInt8(self, "parameters"), lzmaParams)
|
||
|
yield filesizeHandler(UInt32(self, "dictionary_size"))
|
||
|
elif size.value > 0:
|
||
|
yield RawBytes(self, "properties[]", size.value)
|
||
|
|
||
|
class CoderInfo(FieldSet):
|
||
|
def __init__(self, parent, name, desc=None):
|
||
|
FieldSet.__init__(self, parent, name, desc)
|
||
|
self.in_streams = 1
|
||
|
self.out_streams = 1
|
||
|
def createFields(self):
|
||
|
# The real ID
|
||
|
addr = self.absolute_address + self.current_size
|
||
|
b = self.parent.stream.readBits(addr, 8, LITTLE_ENDIAN)
|
||
|
cid = CoderID(self, "coder_id")
|
||
|
yield cid
|
||
|
if b&0x10: # Work repeated, ...
|
||
|
self.in_streams = cid["num_stream_in"].value
|
||
|
self.out_streams = cid["num_stream_out"].value
|
||
|
|
||
|
# Skip other IDs
|
||
|
while b&0x80:
|
||
|
addr = self.absolute_address + self.current_size
|
||
|
b = self.parent.stream.readBits(addr, 8, LITTLE_ENDIAN)
|
||
|
yield CoderID(self, "unused_codec_id[]")
|
||
|
|
||
|
class BindPairInfo(FieldSet):
|
||
|
def createFields(self):
|
||
|
# 64 bits values then cast to 32 in fact
|
||
|
yield SZUInt64(self, "in_index")
|
||
|
yield SZUInt64(self, "out_index")
|
||
|
self.info("Indexes: IN=%u OUT=%u" % \
|
||
|
(self["in_index"].value, self["out_index"].value))
|
||
|
|
||
|
class FolderItem(FieldSet):
|
||
|
def __init__(self, parent, name, desc=None):
|
||
|
FieldSet.__init__(self, parent, name, desc)
|
||
|
self.in_streams = 0
|
||
|
self.out_streams = 0
|
||
|
|
||
|
def createFields(self):
|
||
|
yield SZUInt64(self, "num_coders")
|
||
|
num = self["num_coders"].value
|
||
|
self.info("Folder: %u codecs" % num)
|
||
|
|
||
|
# Coders info
|
||
|
for index in xrange(num):
|
||
|
ci = CoderInfo(self, "coder_info[]")
|
||
|
yield ci
|
||
|
self.in_streams += ci.in_streams
|
||
|
self.out_streams += ci.out_streams
|
||
|
|
||
|
# Bin pairs
|
||
|
self.info("out streams: %u" % self.out_streams)
|
||
|
for index in xrange(self.out_streams-1):
|
||
|
yield BindPairInfo(self, "bind_pair[]")
|
||
|
|
||
|
# Packed streams
|
||
|
# @todo: Actually find mapping
|
||
|
packed_streams = self.in_streams - self.out_streams + 1
|
||
|
if packed_streams == 1:
|
||
|
pass
|
||
|
else:
|
||
|
for index in xrange(packed_streams):
|
||
|
yield SZUInt64(self, "pack_stream[]")
|
||
|
|
||
|
|
||
|
class UnpackInfo(FieldSet):
|
||
|
def createFields(self):
|
||
|
yield Enum(UInt8(self, "id"), ID_INFO)
|
||
|
# Wait for synch
|
||
|
for field in waitForID(self, ID_FOLDER, "folder_marker"):
|
||
|
yield field
|
||
|
yield SZUInt64(self, "num_folders")
|
||
|
|
||
|
# Get generic info
|
||
|
num = self["num_folders"].value
|
||
|
self.info("%u folders" % num)
|
||
|
yield UInt8(self, "is_external")
|
||
|
|
||
|
# Read folder items
|
||
|
for folder_index in xrange(num):
|
||
|
yield FolderItem(self, "folder_item[]")
|
||
|
|
||
|
# Get unpack sizes for each coder of each folder
|
||
|
for field in waitForID(self, ID_CODERS_UNPACK_SIZE, "coders_unpsize_marker"):
|
||
|
yield field
|
||
|
for folder_index in xrange(num):
|
||
|
folder_item = self["folder_item[%u]" % folder_index]
|
||
|
for index in xrange(folder_item.out_streams):
|
||
|
#yield UInt8(self, "unpack_size[]")
|
||
|
yield SZUInt64(self, "unpack_size[]")
|
||
|
|
||
|
# Extract digests
|
||
|
while not self.eof:
|
||
|
addr = self.absolute_address+self.current_size
|
||
|
uid = self.stream.readBits(addr, 8, LITTLE_ENDIAN)
|
||
|
if uid == ID_END:
|
||
|
yield Enum(UInt8(self, "end_marker"), ID_INFO)
|
||
|
break
|
||
|
elif uid == ID_CRC:
|
||
|
yield HashDigest(self, "hash_digest", num)
|
||
|
else:
|
||
|
yield SkippedData(self, "skip_data")
|
||
|
|
||
|
class SubStreamInfo(FieldSet):
|
||
|
def createFields(self):
|
||
|
yield Enum(UInt8(self, "id"), ID_INFO)
|
||
|
raise ParserError("SubStreamInfo not implemented yet")
|
||
|
|
||
|
class EncodedHeader(FieldSet):
|
||
|
def createFields(self):
|
||
|
yield Enum(UInt8(self, "id"), ID_INFO)
|
||
|
while not self.eof:
|
||
|
addr = self.absolute_address+self.current_size
|
||
|
uid = self.stream.readBits(addr, 8, LITTLE_ENDIAN)
|
||
|
if uid == ID_END:
|
||
|
yield Enum(UInt8(self, "end_marker"), ID_INFO)
|
||
|
break
|
||
|
elif uid == ID_PACK_INFO:
|
||
|
yield PackInfo(self, "pack_info", ID_INFO[ID_PACK_INFO])
|
||
|
elif uid == ID_UNPACK_INFO:
|
||
|
yield UnpackInfo(self, "unpack_info", ID_INFO[ID_UNPACK_INFO])
|
||
|
elif uid == ID_SUBSTREAMS_INFO:
|
||
|
yield SubStreamInfo(self, "substreams_info", ID_INFO[ID_SUBSTREAMS_INFO])
|
||
|
else:
|
||
|
self.info("Unexpected ID (%i)" % uid)
|
||
|
break
|
||
|
|
||
|
class IDHeader(FieldSet):
|
||
|
def createFields(self):
|
||
|
yield Enum(UInt8(self, "id"), ID_INFO)
|
||
|
ParserError("IDHeader not implemented")
|
||
|
|
||
|
class NextHeader(FieldSet):
|
||
|
def __init__(self, parent, name, desc="Next header"):
|
||
|
FieldSet.__init__(self, parent, name, desc)
|
||
|
self._size = 8*self["/signature/start_hdr/next_hdr_size"].value
|
||
|
# Less work, as much interpretable information as the other
|
||
|
# version... what an obnoxious format
|
||
|
def createFields2(self):
|
||
|
yield Enum(UInt8(self, "header_type"), ID_INFO)
|
||
|
yield RawBytes(self, "header_data", self._size-1)
|
||
|
def createFields(self):
|
||
|
uid = self.stream.readBits(self.absolute_address, 8, LITTLE_ENDIAN)
|
||
|
if uid == ID_HEADER:
|
||
|
yield IDHeader(self, "header", ID_INFO[ID_HEADER])
|
||
|
elif uid == ID_ENCODED_HEADER:
|
||
|
yield EncodedHeader(self, "encoded_hdr", ID_INFO[ID_ENCODED_HEADER])
|
||
|
# Game Over: this is usually encoded using LZMA, not copy
|
||
|
# See SzReadAndDecodePackedStreams/SzDecode being called with the
|
||
|
# data position from "/next_hdr/encoded_hdr/pack_info/pack_pos"
|
||
|
# We should process further, yet we can't...
|
||
|
else:
|
||
|
ParserError("Unexpected ID %u" % uid)
|
||
|
size = self._size - self.current_size
|
||
|
if size > 0:
|
||
|
yield RawBytes(self, "next_hdr_data", size//8, "Next header's data")
|
||
|
|
||
|
class Body(FieldSet):
|
||
|
def __init__(self, parent, name, desc="Body data"):
|
||
|
FieldSet.__init__(self, parent, name, desc)
|
||
|
self._size = 8*self["/signature/start_hdr/next_hdr_offset"].value
|
||
|
def createFields(self):
|
||
|
if "encoded_hdr" in self["/next_hdr/"]:
|
||
|
pack_size = sum([s.value for s in self.array("/next_hdr/encoded_hdr/pack_info/pack_size")])
|
||
|
body_size = self["/next_hdr/encoded_hdr/pack_info/pack_pos"].value
|
||
|
yield RawBytes(self, "compressed_data", body_size, "Compressed data")
|
||
|
# Here we could check if copy method was used to "compress" it,
|
||
|
# but this never happens, so just output "compressed file info"
|
||
|
yield RawBytes(self, "compressed_file_info", pack_size,
|
||
|
"Compressed file information")
|
||
|
size = (self._size//8) - pack_size - body_size
|
||
|
if size > 0:
|
||
|
yield RawBytes(self, "unknown_data", size)
|
||
|
elif "header" in self["/next_hdr"]:
|
||
|
yield RawBytes(self, "compressed_data", self._size//8, "Compressed data")
|
||
|
|
||
|
class StartHeader(FieldSet):
|
||
|
static_size = 160
|
||
|
def createFields(self):
|
||
|
yield textHandler(UInt64(self, "next_hdr_offset",
|
||
|
"Next header offset"), hexadecimal)
|
||
|
yield UInt64(self, "next_hdr_size", "Next header size")
|
||
|
yield textHandler(UInt32(self, "next_hdr_crc",
|
||
|
"Next header CRC"), hexadecimal)
|
||
|
|
||
|
class SignatureHeader(FieldSet):
|
||
|
static_size = 96 + StartHeader.static_size
|
||
|
def createFields(self):
|
||
|
yield Bytes(self, "signature", 6, "Signature Header")
|
||
|
yield UInt8(self, "major_ver", "Archive major version")
|
||
|
yield UInt8(self, "minor_ver", "Archive minor version")
|
||
|
yield textHandler(UInt32(self, "start_hdr_crc",
|
||
|
"Start header CRC"), hexadecimal)
|
||
|
yield StartHeader(self, "start_hdr", "Start header")
|
||
|
|
||
|
class SevenZipParser(Parser):
|
||
|
PARSER_TAGS = {
|
||
|
"id": "7zip",
|
||
|
"category": "archive",
|
||
|
"file_ext": ("7z",),
|
||
|
"mime": (u"application/x-7z-compressed",),
|
||
|
"min_size": 32*8,
|
||
|
"magic": (("7z\xbc\xaf\x27\x1c", 0),),
|
||
|
"description": "Compressed archive in 7z format"
|
||
|
}
|
||
|
endian = LITTLE_ENDIAN
|
||
|
|
||
|
def createFields(self):
|
||
|
yield SignatureHeader(self, "signature", "Signature Header")
|
||
|
yield Body(self, "body_data")
|
||
|
yield NextHeader(self, "next_hdr")
|
||
|
|
||
|
def validate(self):
|
||
|
if self.stream.readBytes(0,6) != "7z\xbc\xaf'\x1c":
|
||
|
return "Invalid signature"
|
||
|
return True
|
||
|
|
||
|
def createContentSize(self):
|
||
|
size = self["/signature/start_hdr/next_hdr_offset"].value
|
||
|
size += self["/signature/start_hdr/next_hdr_size"].value
|
||
|
size += 12 # Signature size
|
||
|
size += 20 # Start header size
|
||
|
return size*8
|