Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tool to find likely stored block from deflate streams of a known file #99

Open
kimci86 opened this issue May 17, 2023 · 2 comments
Open

Comments

@kimci86
Copy link
Owner

kimci86 commented May 17, 2023

[...] to look for a large stored block that appears in deflate output for various tools and parameters, hoping that it also appears in encrypted data, and bruteforce the offset. I don't know if some tool can help finding such stored block based on several deflate streams. Maybe I can make such a tool in the future.

Originally posted by @kimci86 in #95 (comment)

@G-lacier
Copy link

 import zlib
import struct
import argparse
from collections import defaultdict

# Helper class to read bits from byte data
class BitReader:
    def __init__(self, data):
        self.data = data
        self.pos = 0
        self.bit_pos = 0

    def read_bits(self, num_bits):
        result = 0
        for _ in range(num_bits):
            if self.bit_pos == 0:
                self.current_byte = self.data[self.pos]
                self.pos += 1
            result |= ((self.current_byte >> self.bit_pos) & 1) << _
            self.bit_pos = (self.bit_pos + 1) % 8
        return result

    def read_bytes(self, num_bytes):
        result = self.data[self.pos:self.pos + num_bytes]
        self.pos += num_bytes
        return result

# Helper functions for Huffman decoding
def build_huffman_tree(codes_lengths):
    """Builds a Huffman tree for given code lengths."""
    code_to_symbol = {}
    max_bits = max(codes_lengths)
    bl_count = [0] * (max_bits + 1)
    next_code = [0] * (max_bits + 1)

    for length in codes_lengths:
        bl_count[length] += 1

    code = 0
    for bits in range(1, max_bits + 1):
        code = (code + bl_count[bits - 1]) << 1
        next_code[bits] = code

    for symbol, length in enumerate(codes_lengths):
        if length != 0:
            code_to_symbol[next_code[length]] = symbol
            next_code[length] += 1

    return code_to_symbol

def decode_huffman_code(bit_reader, huffman_tree):
    """Decodes a symbol using the provided Huffman tree."""
    code = 0
    while True:
        code = (code << 1) | bit_reader.read_bits(1)
        if code in huffman_tree:
            return huffman_tree[code]

def parse_deflate_stream(data):
    """Parse a deflate stream and return offsets of stored blocks."""
    offsets = []
    bit_reader = BitReader(data)
    while bit_reader.pos < len(data):
        byte = bit_reader.read_bits(8)
        if byte & 0x01:  # Check if the block is the last block
            last_block = True
        else:
            last_block = False
        block_type = (byte >> 1) & 0x03
        if block_type == 0:  # Stored block
            if bit_reader.pos + 4 > len(data):
                raise ValueError("Unexpected end of data")
            length, nlength = struct.unpack_from("<HH", data, bit_reader.pos)
            bit_reader.pos += 4
            if length != (~nlength & 0xFFFF):
                raise ValueError("Stored block length does not match its one's complement")
            offsets.append(bit_reader.pos)
            bit_reader.pos += length
        elif block_type == 1:  # Fixed Huffman
            parse_fixed_huffman_block(bit_reader)
        elif block_type == 2:  # Dynamic Huffman
            parse_dynamic_huffman_block(bit_reader)
        else:
            raise ValueError("Reserved block type encountered")
        if last_block:
            break
    return offsets

def parse_fixed_huffman_block(bit_reader):
    """Parse fixed Huffman blocks."""
    lit_len_tree = build_fixed_huffman_tree()
    dist_tree = build_fixed_distance_tree()
    while True:
        symbol = decode_huffman_code(bit_reader, lit_len_tree)
        if symbol < 256:  # Literal byte
            continue
        elif symbol == 256:  # End of block
            break
        else:  # Length/Distance pair
            length = decode_length(bit_reader, symbol)
            distance = decode_distance(bit_reader, dist_tree)
            bit_reader.pos += length  # Skip over the length of data
    return

def parse_dynamic_huffman_block(bit_reader):
    """Parse dynamic Huffman blocks."""
    hlit = bit_reader.read_bits(5) + 257
    hdist = bit_reader.read_bits(5) + 1
    hclen = bit_reader.read_bits(4) + 4

    code_length_order = [16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15]
    code_lengths = [0] * 19
    for i in range(hclen):
        code_lengths[code_length_order[i]] = bit_reader.read_bits(3)
    
    code_length_tree = build_huffman_tree(code_lengths)

    lit_len_lengths = []
    dist_lengths = []

    num_lengths = hlit + hdist
    while len(lit_len_lengths) + len(dist_lengths) < num_lengths:
        symbol = decode_huffman_code(bit_reader, code_length_tree)
        if symbol <= 15:
            lit_len_lengths.append(symbol)
        elif symbol == 16:
            repeat = bit_reader.read_bits(2) + 3
            lit_len_lengths.extend([lit_len_lengths[-1]] * repeat)
        elif symbol == 17:
            repeat = bit_reader.read_bits(3) + 3
            lit_len_lengths.extend([0] * repeat)
        elif symbol == 18:
            repeat = bit_reader.read_bits(7) + 11
            lit_len_lengths.extend([0] * repeat)

    lit_len_tree = build_huffman_tree(lit_len_lengths[:hlit])
    dist_tree = build_huffman_tree(lit_len_lengths[hlit:hlit + hdist])

    while True:
        symbol = decode_huffman_code(bit_reader, lit_len_tree)
        if symbol < 256:  # Literal byte
            continue
        elif symbol == 256:  # End of block
            break
        else:  # Length/Distance pair
            length = decode_length(bit_reader, symbol)
            distance = decode_distance(bit_reader, dist_tree)
            bit_reader.pos += length  # Skip over the length of data
    return

def build_fixed_huffman_tree():
    """Builds a fixed Huffman tree for literals/lengths."""
    lit_len_lengths = [8] * 144 + [9] * 112 + [7] * 24 + [8] * 8
    return build_huffman_tree(lit_len_lengths)

def build_fixed_distance_tree():
    """Builds a fixed Huffman tree for distances."""
    dist_lengths = [5] * 32
    return build_huffman_tree(dist_lengths)

def decode_length(bit_reader, symbol):
    """Decodes the length based on the symbol."""
    if symbol <= 264:
        return symbol - 254
    elif symbol == 285:
        return 258
    else:
        extra_bits = (symbol - 265) // 4 + 1
        base_length = (symbol - 261) * 4 + 11
        return base_length + bit_reader.read_bits(extra_bits)

def decode_distance(bit_reader, dist_tree):
    """Decodes the distance based on the Huffman tree."""
    symbol = decode_huffman_code(bit_reader, dist_tree)
    if symbol < 4:
        return symbol + 1
    else:
        extra_bits = (symbol // 2) - 1
        base_distance = (2 + (symbol % 2)) << extra_bits
        return base_distance + bit_reader.read_bits(extra_bits)

def brute_force_offsets(deflate_streams):
    """Brute force the offset of stored blocks in multiple deflate streams."""
    common_offsets = None
    for stream in deflate_streams:
        offsets = parse_deflate_stream(stream)
        if common_offsets is None:
            common_offsets = set(offsets)
        else:
            common_offsets &= set(offsets)
    return common_offsets

def main():
    parser = argparse.ArgumentParser(description="Find common stored block offsets in deflate streams.")
    parser.add_argument('files', metavar='F', type=str, nargs='+', help='Paths to deflate stream files')
    args = parser.parse_args()

    deflate_streams = []
    for file in args.files:
        with open(file, 'rb') as f:
            deflate_streams.append(f.read())

    common_offsets = brute_force_offsets(deflate_streams)
    print("Common stored block offsets:", common_offsets)

if __name__ == "__main__":
    main()
    `

python script.py file1.deflate file2.deflate file3.deflate

@kimci86
Copy link
Owner Author

kimci86 commented May 20, 2024

Thank you for sharing this.
Unfortunately it seems it is not working. It runs into index out of bounds error. I think the huffman decoding function not accounting for leading zeros is a problem.
Anyways it can give inspiration to write a working solution.

An idea to make the tool more useful: it would be better to list overlapping intervals (offset and length) of stored data relative to the original file, and their corresponding position in deflate streams.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants