
Usage of InputStream instead of file path #16

Open
mykolapolonskyi opened this issue Apr 2, 2019 · 2 comments

Comments

@mykolapolonskyi

Maybe the generated code could be more flexible by accepting an InputStream instead of a file path.

For example, a file currently has to be passed by its path:

Stl.fromFile(ClassPathResource("stl/bottom_shell_ONLY.stl").file.absolutePath) 

This makes it impossible to process files that come from the network (e.g. Google Drive) without first saving them to disk.
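One workaround that needs no changes to the generated code is to read the whole InputStream into memory first. Generated parsers take a KaitaiStream in their constructor, and ByteBufferKaitaiStream can wrap a ByteBuffer, so something like `new Stl(new ByteBufferKaitaiStream(buffer))` should work once the bytes are buffered. The sketch below shows only the JDK-side buffering step; `InMemorySource.readFully` is a hypothetical helper name, and the approach assumes the whole file fits comfortably in RAM.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

class InMemorySource {
  /**
   * Hypothetical helper: drains an InputStream (e.g. a network download) into a heap
   * ByteBuffer so a seekable, buffer-based parser can use it without a temp file.
   */
  static ByteBuffer readFully(InputStream in) {
    try (in) {
      return ByteBuffer.wrap(in.readAllBytes()); // Java 9+; the whole payload is held in memory
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    ByteBuffer buf = readFully(new ByteArrayInputStream(new byte[] {1, 2, 3, 4}));
    // With the Kaitai runtime on the classpath this could be handed to a parser, e.g.
    //   new Stl(new ByteBufferKaitaiStream(buf))
    System.out.println(buf.remaining()); // prints 4
    buf.position(2); // random access is possible once the data is in memory
    System.out.println(buf.get()); // prints 3
  }
}
```

The cost is memory proportional to the file size, which is the trade-off discussed in the reply below.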

@GreyCat
Member

GreyCat commented Apr 4, 2019

Note that InputStream has no way to seek (i.e. reposition) within a stream, and seeking is vital for many formats.
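The closest InputStream comes to repositioning is the optional mark()/reset() pair: on streams where markSupported() returns true (BufferedInputStream, for example), reset() rewinds to the last mark as long as no more than the readLimit passed to mark() has been read since. That allows only bounded rewinds, and every byte between mark and reset has to stay buffered. A minimal illustration, with hypothetical class and method names:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class MarkResetPeek {
  /** Reads up to n bytes at the current position, then rewinds so they are read again. */
  static byte[] peek(InputStream in, int n) throws IOException {
    if (!in.markSupported()) {
      throw new IOException("mark/reset not supported by " + in.getClass().getName());
    }
    in.mark(n);                     // remember this position for the next n bytes
    byte[] head = in.readNBytes(n); // Java 11+; may return fewer bytes at end of stream
    in.reset();                     // rewind to the mark; only valid within the readLimit
    return head;
  }

  public static void main(String[] args) throws IOException {
    InputStream in = new BufferedInputStream(new ByteArrayInputStream(new byte[] {7, 8, 9}));
    byte[] head = peek(in, 2); // {7, 8}; the stream is still positioned at 7
    System.out.println(head.length + " " + in.read()); // prints "2 7"
  }
}
```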

You're welcome to contribute an implementation of KaitaiStream that uses an InputStream (for example, it could throw RuntimeExceptions on attempts to seek, or try some magic with mark/reset). Without seeks, though, it will probably be useful only for a few simple formats, and with seek emulation it will be relatively slow and memory-hogging.
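The first option (failing on seeks that cannot be honoured) could look roughly like the sketch below. It is a hypothetical, JDK-only class rather than a real KaitaiStream subclass: it tracks the absolute position, implements forward seek() by skipping bytes, and throws on any backward seek. InputStream.skip() may skip fewer bytes than requested, hence the loop with a read() fallback.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical forward-only reader: emulates seek() by skipping, rejects backward seeks. */
class ForwardOnlyReader {
  private final InputStream in;
  private long pos; // absolute offset of the next unread byte

  ForwardOnlyReader(InputStream in) {
    this.in = in;
  }

  long pos() {
    return pos;
  }

  int readU1() {
    try {
      int b = in.read();
      if (b < 0) {
        throw new EOFException("End of stream at offset " + pos);
      }
      pos++;
      return b;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Moves to an absolute offset; only the current or a later offset is allowed. */
  void seek(long target) {
    if (target < pos) {
      throw new UnsupportedOperationException(
          "Cannot seek back from " + pos + " to " + target + " on an InputStream");
    }
    try {
      while (pos < target) {
        long skipped = in.skip(target - pos); // may skip fewer bytes than requested
        if (skipped > 0) {
          pos += skipped;
        } else if (in.read() >= 0) { // skip() made no progress: fall back to read()
          pos++;
        } else {
          throw new EOFException("Cannot seek to " + target + ": stream ended at " + pos);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    var r = new ForwardOnlyReader(new ByteArrayInputStream(new byte[] {10, 20, 30, 40, 50}));
    System.out.println(r.readU1()); // prints 10
    r.seek(3);
    System.out.println(r.readU1()); // prints 40
    try {
      r.seek(1);
    } catch (UnsupportedOperationException e) {
      System.out.println("backward seek rejected");
    }
  }
}
```

A real implementation would extend KaitaiStream and apply the same position bookkeeping to every read method.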

GreyCat changed the title from "Usage of InputStram instead of file path" to "Usage of InputStream instead of file path" on Apr 4, 2019
@ratcashdev

Consider the code below:

import io.kaitai.struct.ByteBufferKaitaiStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/**
 * Forward-only KaitaiStream over an InputStream. Only the bytes currently held in a
 * fixed-size window can be addressed: seeking back before the window, or requesting
 * more than BUFFER_SIZE bytes in one read, throws. Read methods that are not
 * overridden below go straight to ByteBufferKaitaiStream without refilling the window.
 */
public class KaitaiInputStream extends ByteBufferKaitaiStream {
  private static final int BUFFER_SIZE = 32 * 1024;

  private final ReadableByteChannel channel;
  private final ByteBuffer buffer;
  private long bufferStart; // absolute stream offset of buffer index 0

  public static KaitaiInputStream fromStream(InputStream is) {
    final var channel = Channels.newChannel(is);
    final var buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    return new KaitaiInputStream(channel, buffer);
  }

  private KaitaiInputStream(ReadableByteChannel ch, ByteBuffer buffer) {
    super(buffer); // the superclass reads from this same buffer
    this.channel = ch;
    this.buffer = buffer;
    this.buffer.limit(0); // the window starts empty
  }

  /**
   * Tries to make at least byteCount unread bytes available in the window.
   * Returns false if the input ends first.
   */
  private boolean fill(int byteCount) {
    if (buffer.remaining() >= byteCount) {
      return true;
    }
    if (byteCount > buffer.capacity()) {
      throw new UnsupportedOperationException(
          byteCount + " bytes requested, but the buffer holds only " + buffer.capacity());
    }
    bufferStart += buffer.position(); // consumed bytes are discarded
    buffer.compact(); // move the unread tail to the front; buffer is now in write mode
    try {
      while (buffer.position() < byteCount) {
        if (channel.read(buffer) == -1) {
          return false;
        }
      }
      return true;
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    } finally {
      buffer.flip(); // back to read mode: position 0, limit = bytes available
    }
  }

  private void ensureBytes(int byteCount) {
    if (!fill(byteCount)) {
      throw new UncheckedIOException(new EOFException("Unexpected end of stream"));
    }
  }

  @Override
  public void close() throws IOException {
    channel.close();
  }

  @Override
  public boolean isEof() {
    // The channel stays open after its last byte, so probe for more data instead.
    return !fill(1);
  }

  @Override
  public void seek(int newPos) {
    seek((long) newPos);
  }

  @Override
  public void seek(long newPos) {
    // KaitaiStream.seek() takes an absolute position, not a relative offset.
    if (newPos < bufferStart) {
      throw new UnsupportedOperationException(
          "Cannot seek back to " + newPos + ": data before " + bufferStart + " was discarded");
    }
    // Skip forward one window at a time until the target lies inside the window.
    while (newPos - bufferStart > buffer.limit()) {
      buffer.position(buffer.limit());
      ensureBytes(1);
    }
    buffer.position((int) (newPos - bufferStart));
  }

  @Override
  public int pos() {
    return (int) (bufferStart + buffer.position());
  }

  @Override
  public long size() {
    // Placeholder: the real length of a stream is unknown, so anything relying on
    // size() (e.g. size-eos fields) will not behave correctly.
    return Integer.MAX_VALUE;
  }

  @Override
  public byte readS1() {
    ensureBytes(1);
    return super.readS1();
  }

  @Override
  public short readS2be() {
    ensureBytes(2);
    return super.readS2be();
  }

  @Override
  public int readS4be() {
    ensureBytes(4);
    return super.readS4be();
  }

  @Override
  public long readS8be() {
    ensureBytes(8);
    return super.readS8be();
  }

  @Override
  public short readS2le() {
    ensureBytes(2);
    return super.readS2le();
  }

  @Override
  public int readS4le() {
    ensureBytes(4);
    return super.readS4le();
  }

  @Override
  public long readS8le() {
    ensureBytes(8);
    return super.readS8le();
  }

  @Override
  public int readU1() {
    ensureBytes(1);
    return super.readU1();
  }

  @Override
  public int readU2be() {
    ensureBytes(2);
    return super.readU2be();
  }

  @Override
  public long readU4be() {
    ensureBytes(4);
    return super.readU4be();
  }

  @Override
  public int readU2le() {
    ensureBytes(2);
    return super.readU2le();
  }

  @Override
  public long readU4le() {
    ensureBytes(4);
    return super.readU4le();
  }

  @Override
  public float readF4be() {
    ensureBytes(4);
    return super.readF4be();
  }

  @Override
  public double readF8be() {
    ensureBytes(8);
    return super.readF8be();
  }

  @Override
  public float readF4le() {
    ensureBytes(4);
    return super.readF4le();
  }

  @Override
  public double readF8le() {
    ensureBytes(8);
    return super.readF8le();
  }

  @Override
  public byte[] readBytes(long n) {
    ensureBytes(toByteArrayLength(n));
    return super.readBytes(n);
  }

  @Override
  public byte[] readBytesFull() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public byte[] readBytesTerm(
      byte term, boolean includeTerm, boolean consumeTerm, boolean eosError) {
    throw new UnsupportedOperationException("Not supported yet.");
  }
}
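The heart of this class is the refill performed by ensureBytes: when the window runs short, consumed bytes are dropped, the unread tail is moved to the front of the window, and new bytes from the channel are appended behind it. The stand-alone sketch below isolates that sliding-window technique so it can be tried without the Kaitai runtime. `SlidingWindow` and its methods are hypothetical names, ByteBuffer.compact() does the move-to-front step, and the 3-byte window in main is deliberately tiny so that reads cross refills.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical fixed-size read window over a channel, refilled with compact()/flip(). */
class SlidingWindow {
  private final ReadableByteChannel channel;
  private final ByteBuffer buf;
  private long windowStart; // absolute stream offset of buf index 0

  SlidingWindow(ReadableByteChannel channel, int capacity) {
    this.channel = channel;
    this.buf = ByteBuffer.allocate(capacity);
    this.buf.limit(0); // start empty, in read mode
  }

  /** Returns false if the input ends before n unread bytes are available. */
  boolean ensure(int n) {
    if (buf.remaining() >= n) {
      return true;
    }
    if (n > buf.capacity()) {
      throw new IllegalArgumentException("window too small for " + n + " bytes");
    }
    windowStart += buf.position(); // consumed bytes are discarded
    buf.compact();                 // unread tail to index 0, switch to write mode
    try {
      while (buf.position() < n) {
        if (channel.read(buf) < 0) {
          return false;
        }
      }
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      buf.flip(); // back to read mode
    }
  }

  int readU2be() {
    if (!ensure(2)) {
      throw new IllegalStateException("end of stream");
    }
    return buf.getShort() & 0xffff; // ByteBuffer defaults to big-endian order
  }

  long pos() {
    return windowStart + buf.position();
  }

  public static void main(String[] args) {
    byte[] data = {0, 1, 0, 2, 0, 3, 0, 4};
    var w = new SlidingWindow(Channels.newChannel(new ByteArrayInputStream(data)), 3);
    while (w.ensure(2)) {
      System.out.print(w.readU2be() + " ");
    }
    System.out.println("| end at offset " + w.pos()); // prints "1 2 3 4 | end at offset 8"
  }
}
```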

With the above, we can build a streaming PCAP parser on top of the generated Pcap class, as follows:

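import io.kaitai.struct.KaitaiStream;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Pcap is the Kaitai-generated parser, assumed to live in the same package.
public class StreamingPCap extends Pcap implements Iterable<Pcap.Packet> {
  private final Pcap.Header hdr;

  public StreamingPCap(KaitaiStream io) {
    super(io);
    // Parse only the global header; packets are parsed lazily by the iterator.
    hdr = new Pcap.Header(this._io, this, this);
  }

  @Override
  public Header hdr() {
    return hdr;
  }

  // Assumption: the generated Pcap must declare an overridable _read(). Check the
  // generated code; if _read() is private there, this override does not compile and
  // the base constructor still parses the whole file eagerly.
  @Override
  protected void _read() {
    // do nothing: packets are read on demand
  }

  public Stream<Packet> packetStream() {
    return StreamSupport.stream(this.spliterator(), false);
  }

  @Override
  public Iterator<Packet> iterator() {
    return new PacketIterator(_io, this);
  }

  private static class PacketIterator implements Iterator<Packet> {
    private final KaitaiStream io;
    private final Pcap parent;

    private PacketIterator(KaitaiStream io, Pcap parent) {
      this.io = io;
      this.parent = parent;
    }

    @Override
    public boolean hasNext() {
      return !io.isEof();
    }

    @Override
    public Packet next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      // Each call consumes one packet record from the underlying stream.
      return new Packet(this.io, parent, parent);
    }
  }
}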
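The iterator above follows a common pull-based pattern: hasNext() probes for end of input, next() parses exactly one record, and StreamSupport.stream(spliterator(), false) turns the Iterable into a sequential Stream. The JDK-only sketch below applies the same pattern to a hypothetical record format (a 2-byte big-endian length followed by that many payload bytes), using a PushbackInputStream to peek one byte for the end-of-input check. `LazyRecords` is an illustrative name, not part of any library.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Lazily parses hypothetical length-prefixed records from an InputStream. */
class LazyRecords implements Iterable<byte[]> {
  private final PushbackInputStream in;

  LazyRecords(InputStream src) {
    this.in = new PushbackInputStream(src, 1);
  }

  Stream<byte[]> stream() {
    return StreamSupport.stream(spliterator(), false);
  }

  @Override
  public Iterator<byte[]> iterator() {
    return new Iterator<>() {
      @Override
      public boolean hasNext() {
        try {
          int b = in.read(); // peek one byte...
          if (b < 0) {
            return false;
          }
          in.unread(b); // ...and push it back so next() sees it
          return true;
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }

      @Override
      public byte[] next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        try {
          DataInputStream data = new DataInputStream(in);
          byte[] payload = new byte[data.readUnsignedShort()]; // big-endian length prefix
          data.readFully(payload);
          return payload;
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    };
  }

  public static void main(String[] args) {
    byte[] input = {0, 2, 'h', 'i', 0, 3, 'a', 'b', 'c'};
    new LazyRecords(new ByteArrayInputStream(input))
        .stream()
        .map(p -> new String(p, StandardCharsets.US_ASCII))
        .forEach(System.out::println); // prints "hi", then "abc"
  }
}
```

Because records are only parsed when the stream pulls them, memory use stays bounded by the largest single record rather than the whole input.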


3 participants