-
-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tool to find likely stored blocks in deflate streams of a known file #99
Comments
import zlib
import struct
import argparse
from collections import defaultdict
# Helper class to read bits from byte data
class BitReader:
    """Read bits, least-significant bit first, from a bytes-like object.

    Attributes:
        data: The underlying byte sequence.
        pos: Index of the next byte that will be fetched from ``data``.
        bit_pos: Index (0-7) of the next bit to take from ``current_byte``;
            0 means the next bit read fetches a fresh byte from ``data``.
        current_byte: The byte that bits are currently being taken from.
    """

    def __init__(self, data):
        self.data = data
        self.pos = 0
        self.bit_pos = 0
        # Initialised up front so the attribute always exists, even before
        # the first call to read_bits().
        self.current_byte = 0

    def read_bits(self, num_bits):
        """Return the next ``num_bits`` bits as an integer.

        The first bit read becomes bit 0 of the result, which is the packing
        DEFLATE uses for header fields and extra bits.

        Raises:
            IndexError: If the data runs out before all bits are read.
        """
        result = 0
        for shift in range(num_bits):
            if self.bit_pos == 0:
                self.current_byte = self.data[self.pos]
                self.pos += 1
            result |= ((self.current_byte >> self.bit_pos) & 1) << shift
            self.bit_pos = (self.bit_pos + 1) % 8
        return result

    def read_bytes(self, num_bytes):
        """Return the next ``num_bytes`` whole bytes.

        Any unread bits of a partially consumed byte are discarded first, so
        that later read_bits() calls continue after the returned bytes rather
        than inside the stale partial byte.

        The result is a slice of ``data`` and may be shorter than requested
        near the end of the data; ``pos`` still advances by ``num_bytes``.
        """
        self.bit_pos = 0
        result = self.data[self.pos:self.pos + num_bytes]
        self.pos += num_bytes
        return result
# Helper functions for Huffman decoding
# Longest Huffman code DEFLATE permits (RFC 1951); bounds the decode loop so
# corrupt input fails fast instead of consuming the rest of the stream.
_MAX_CODE_BITS = 15


def build_huffman_tree(codes_lengths):
    """Build a canonical Huffman decoding table from per-symbol code lengths.

    Args:
        codes_lengths: Sequence where item ``i`` is the code length in bits
            of symbol ``i``; 0 means the symbol is unused.

    Returns:
        A dict mapping ``(length, code)`` to the symbol. The length is part
        of the key because codes of different lengths can share the same
        numeric value (for example ``0`` and ``00``). The table is meant to
        be passed to decode_huffman_code().
    """
    code_to_symbol = {}
    max_bits = max(codes_lengths, default=0)
    bl_count = [0] * (max_bits + 1)
    for length in codes_lengths:
        bl_count[length] += 1
    # Unused symbols (length 0) must not take part in code assignment.
    bl_count[0] = 0

    # Smallest code value for each length (RFC 1951, section 3.2.2).
    next_code = [0] * (max_bits + 1)
    code = 0
    for bits in range(1, max_bits + 1):
        code = (code + bl_count[bits - 1]) << 1
        next_code[bits] = code

    # Symbols of equal length get consecutive codes in symbol order.
    for symbol, length in enumerate(codes_lengths):
        if length != 0:
            code_to_symbol[(length, next_code[length])] = symbol
            next_code[length] += 1
    return code_to_symbol


def decode_huffman_code(bit_reader, huffman_tree):
    """Decode one symbol using a table from build_huffman_tree().

    Bits are read one at a time and accumulated most-significant bit first,
    which is how DEFLATE packs Huffman codes.

    Args:
        bit_reader: Object providing ``read_bits(n)``.
        huffman_tree: Table returned by build_huffman_tree().

    Returns:
        The decoded symbol.

    Raises:
        ValueError: If no code matches within 15 bits.
    """
    code = 0
    for length in range(1, _MAX_CODE_BITS + 1):
        code = (code << 1) | bit_reader.read_bits(1)
        symbol = huffman_tree.get((length, code))
        if symbol is not None:
            return symbol
    raise ValueError("Invalid Huffman code in stream")
def parse_deflate_stream(data):
    """Parse a raw deflate stream and return offsets of stored blocks.

    Args:
        data: Bytes of a raw deflate stream.

    Returns:
        A list with, for each stored block, the byte offset within ``data``
        of the first byte of that block's payload (just after LEN/NLEN).

    Raises:
        ValueError: If a stored block header is truncated, LEN does not
            match the one's complement of NLEN, or the reserved block type
            is encountered.
    """
    offsets = []
    bit_reader = BitReader(data)
    while bit_reader.pos < len(data):
        # A block header is only 3 bits: BFINAL (1 bit), then BTYPE (2 bits).
        # Reading a whole byte would eat the first 5 bits of Huffman data.
        last_block = bool(bit_reader.read_bits(1))
        block_type = bit_reader.read_bits(2)
        if block_type == 0:  # Stored block
            # Stored data starts at the next byte boundary: discard the
            # remaining bits of the partially consumed byte.
            bit_reader.bit_pos = 0
            if bit_reader.pos + 4 > len(data):
                raise ValueError("Unexpected end of data")
            length, nlength = struct.unpack_from("<HH", data, bit_reader.pos)
            bit_reader.pos += 4
            if length != (~nlength & 0xFFFF):
                raise ValueError("Stored block length does not match its one's complement")
            offsets.append(bit_reader.pos)
            bit_reader.pos += length
        elif block_type == 1:  # Fixed Huffman
            parse_fixed_huffman_block(bit_reader)
        elif block_type == 2:  # Dynamic Huffman
            parse_dynamic_huffman_block(bit_reader)
        else:
            raise ValueError("Reserved block type encountered")
        if last_block:
            break
    return offsets
def parse_fixed_huffman_block(bit_reader):
    """Consume one fixed-Huffman block, leaving the reader just past it.

    Nothing is decompressed: symbols are decoded only to find where the
    block ends.

    Args:
        bit_reader: Reader positioned right after the 3-bit block header.
    """
    lit_len_tree = build_fixed_huffman_tree()
    dist_tree = build_fixed_distance_tree()
    while True:
        symbol = decode_huffman_code(bit_reader, lit_len_tree)
        if symbol == 256:  # End of block
            return
        if symbol > 256:  # Length/Distance pair
            # A back-reference copies already-decoded output; it occupies no
            # input beyond its own symbols and extra bits, so the reader
            # position must not be advanced by the match length.
            decode_length(bit_reader, symbol)
            decode_distance(bit_reader, dist_tree)
        # symbol < 256 is a literal byte: nothing further to consume.
def parse_dynamic_huffman_block(bit_reader):
    """Consume one dynamic-Huffman block, leaving the reader just past it.

    Reads the block's code-length tables, builds the literal/length and
    distance tables, then decodes symbols until the end-of-block marker.
    Nothing is decompressed.

    Args:
        bit_reader: Reader positioned right after the 3-bit block header.

    Raises:
        ValueError: If the code-length data is malformed (a repeat with no
            previous length, or a repeat running past the declared number
            of code lengths).
    """
    hlit = bit_reader.read_bits(5) + 257
    hdist = bit_reader.read_bits(5) + 1
    hclen = bit_reader.read_bits(4) + 4

    # Order in which code lengths for the code-length alphabet are stored.
    code_length_order = [16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15]
    code_lengths = [0] * 19
    for position in code_length_order[:hclen]:
        code_lengths[position] = bit_reader.read_bits(3)
    code_length_tree = build_huffman_tree(code_lengths)

    # Literal/length and distance code lengths form one sequence, so repeat
    # codes may run across the boundary between the two tables.
    num_lengths = hlit + hdist
    lengths = []
    while len(lengths) < num_lengths:
        symbol = decode_huffman_code(bit_reader, code_length_tree)
        if symbol <= 15:
            lengths.append(symbol)
        elif symbol == 16:
            if not lengths:
                raise ValueError("Repeat code with no previous code length")
            repeat = bit_reader.read_bits(2) + 3
            lengths.extend([lengths[-1]] * repeat)
        elif symbol == 17:
            repeat = bit_reader.read_bits(3) + 3
            lengths.extend([0] * repeat)
        elif symbol == 18:
            repeat = bit_reader.read_bits(7) + 11
            lengths.extend([0] * repeat)
        else:
            raise ValueError("Invalid code length symbol")
    if len(lengths) > num_lengths:
        raise ValueError("Code length repeat exceeds the declared number of codes")

    lit_len_tree = build_huffman_tree(lengths[:hlit])
    dist_tree = build_huffman_tree(lengths[hlit:])

    while True:
        symbol = decode_huffman_code(bit_reader, lit_len_tree)
        if symbol == 256:  # End of block
            return
        if symbol > 256:  # Length/Distance pair
            # A back-reference occupies no input beyond its symbols and
            # extra bits; the reader must not skip ahead by the match length.
            decode_length(bit_reader, symbol)
            decode_distance(bit_reader, dist_tree)
        # symbol < 256 is a literal byte: nothing further to consume.
def build_fixed_huffman_tree():
    """Return the decoding table for the fixed literal/length code.

    Code lengths by symbol range: 0-143 use 8 bits, 144-255 use 9 bits,
    256-279 use 7 bits and 280-287 use 8 bits.
    """
    lengths = []
    for count, bits in ((144, 8), (112, 9), (24, 7), (8, 8)):
        lengths.extend([bits] * count)
    return build_huffman_tree(lengths)
def build_fixed_distance_tree():
    """Return the decoding table for the fixed distance code.

    All 32 distance symbols use a 5-bit code.
    """
    return build_huffman_tree([5 for _ in range(32)])
def decode_length(bit_reader, symbol):
    """Decode a match length from a literal/length symbol (257-285).

    Args:
        bit_reader: Object providing ``read_bits(n)``; used to read the
            symbol's extra bits, if any.
        symbol: Length symbol in the range 257-285.

    Returns:
        The match length, 3-258.

    Raises:
        ValueError: If ``symbol`` is not a valid length symbol.
    """
    if not 257 <= symbol <= 285:
        raise ValueError("Invalid length symbol")
    if symbol <= 264:
        # Symbols 257-264 encode lengths 3-10 directly, with no extra bits.
        return symbol - 254
    if symbol == 285:
        return 258
    # Symbols 265-284 come in groups of four sharing an extra-bit count
    # (1 for 265-268, 2 for 269-272, ... 5 for 281-284). Within a group the
    # base lengths are spaced 2**extra_bits apart, e.g. 11, 13, 15, 17.
    extra_bits = (symbol - 265) // 4 + 1
    base_length = ((4 + (symbol - 265) % 4) << extra_bits) + 3
    return base_length + bit_reader.read_bits(extra_bits)
def decode_distance(bit_reader, dist_tree):
    """Decode a match distance: one distance symbol plus its extra bits.

    Args:
        bit_reader: Reader positioned at the distance code.
        dist_tree: Distance decoding table, passed to decode_huffman_code().

    Returns:
        The match distance, 1-32768.

    Raises:
        ValueError: If the decoded symbol is not a valid distance symbol
            (only 0-29 are defined).
    """
    symbol = decode_huffman_code(bit_reader, dist_tree)
    if symbol > 29:
        raise ValueError("Invalid distance symbol")
    if symbol < 4:
        # Symbols 0-3 encode distances 1-4 directly, with no extra bits.
        return symbol + 1
    # From symbol 4 on, each pair of symbols shares an extra-bit count and
    # base distances are one more than a multiple of 2**extra_bits
    # (5, 7, 9, 13, 17, 25, ...).
    extra_bits = (symbol // 2) - 1
    base_distance = ((2 + (symbol % 2)) << extra_bits) + 1
    return base_distance + bit_reader.read_bits(extra_bits)
def brute_force_offsets(deflate_streams):
    """Return the stored-block offsets shared by every given deflate stream.

    Each stream is parsed with parse_deflate_stream() and the resulting
    offsets are intersected. The result is a set, or None when
    ``deflate_streams`` yields no streams at all.
    """
    shared = None
    for stream in deflate_streams:
        found = set(parse_deflate_stream(stream))
        shared = found if shared is None else shared & found
    return shared
def main():
    """Command-line entry point.

    Reads every file named on the command line as a deflate stream and
    prints the stored-block offsets common to all of them.
    """
    arg_parser = argparse.ArgumentParser(
        description="Find common stored block offsets in deflate streams."
    )
    arg_parser.add_argument(
        'files',
        metavar='F',
        type=str,
        nargs='+',
        help='Paths to deflate stream files',
    )
    paths = arg_parser.parse_args().files

    streams = []
    for path in paths:
        with open(path, 'rb') as handle:
            streams.append(handle.read())

    print("Common stored block offsets:", brute_force_offsets(streams))
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
`
|
Thank you for sharing this. An idea to make the tool more useful: it would be better to list overlapping intervals (offset and length) of stored data relative to the original file, and their corresponding positions in the deflate streams.
Originally posted by @kimci86 in #95 (comment)
The text was updated successfully, but these errors were encountered: