Skip to content

Commit

Permalink
Multithreading improvement and submodule reference support
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem V. Navrotskiy committed Aug 20, 2015
1 parent dbac21c commit 36f2fab
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 34 deletions.
65 changes: 39 additions & 26 deletions src/main/java/git/lfs/migrate/GitConverter.java
Expand Up @@ -26,10 +26,6 @@
public class GitConverter {
@NotNull
private static final String GIT_ATTRIBUTES = ".gitattributes";
@NotNull
private final Repository srcRepo;
@NotNull
private final RevWalk revWalk;
@Nullable
private final URL lfs;
@NotNull
Expand All @@ -38,10 +34,8 @@ public class GitConverter {
private final File basePath;
private final File tempPath;

public GitConverter(@NotNull Repository srcRepo, @NotNull File basePath, @Nullable URL lfs, @NotNull String[] suffixes) {
this.srcRepo = srcRepo;
public GitConverter(@NotNull File basePath, @Nullable URL lfs, @NotNull String[] suffixes) {
this.basePath = basePath;
this.revWalk = new RevWalk(srcRepo);
this.suffixes = suffixes.clone();
this.lfs = lfs;

Expand All @@ -50,40 +44,59 @@ public GitConverter(@NotNull Repository srcRepo, @NotNull File basePath, @Nullab
}

@NotNull
public ConvertTask convertTask(@NotNull TaskKey key) throws IOException {
public ConvertTask convertTask(@NotNull ObjectReader reader, @NotNull TaskKey key) throws IOException {
switch (key.getType()) {
case Simple: {
final RevObject revObject = revWalk.parseAny(key.getObjectId());
if (!reader.has(key.getObjectId())) {
return keepMissingTask(key.getObjectId());
}
final RevObject revObject = new RevWalk(reader).parseAny(key.getObjectId());
if (revObject instanceof RevCommit) {
return convertCommitTask((RevCommit) revObject);
}
if (revObject instanceof RevTree) {
return convertTreeTask(revObject, false);
return convertTreeTask(reader, revObject, false);
}
if (revObject instanceof RevBlob) {
return copyTask(revObject);
return copyTask(reader, revObject);
}
if (revObject instanceof RevTag) {
return convertTagTask((RevTag) revObject);
}
throw new IllegalStateException("Unsupported object type: " + key + " (" + revObject.getClass().getName() + ")");
}
case Root: {
final RevObject revObject = revWalk.parseAny(key.getObjectId());
final RevObject revObject = new RevWalk(reader).parseAny(key.getObjectId());
if (revObject instanceof RevTree) {
return convertTreeTask(revObject, true);
return convertTreeTask(reader, revObject, true);
}
throw new IllegalStateException("Unsupported object type: " + key + " (" + revObject.getClass().getName() + ")");
}
case Attribute:
return createAttributesTask(key.getObjectId());
return createAttributesTask(reader, key.getObjectId());
case UploadLfs:
return convertLfsTask(key.getObjectId());
return convertLfsTask(reader, key.getObjectId());
default:
throw new IllegalStateException("Unknwon task key type: " + key.getType());
}
}

/**
 * Creates a pass-through task for an object that is kept as is.
 * <p>
 * The returned task declares no dependencies, and its {@code convert} step returns
 * {@code objectId} unchanged without writing anything through the inserter.
 * {@code convertTask} uses it for {@code Simple} keys whose object the source reader
 * does not contain.
 *
 * @param objectId identifier to hand back as the conversion result.
 * @return task that maps {@code objectId} to itself.
 */
@NotNull
private ConvertTask keepMissingTask(@NotNull ObjectId objectId) {
  return new ConvertTask() {
    @NotNull
    @Override
    public Iterable<TaskKey> depends() throws IOException {
      // Nothing has to be converted before this task can run.
      return Collections.emptyList();
    }

    @NotNull
    @Override
    public ObjectId convert(@NotNull ObjectInserter inserter, @NotNull ConvertResolver resolver) throws IOException {
      // The identifier is returned as is; neither the inserter nor the resolver is used.
      return objectId;
    }
  };
}

@NotNull
private ConvertTask convertTagTask(@NotNull RevTag revObject) throws IOException {
return new ConvertTask() {
Expand Down Expand Up @@ -143,12 +156,12 @@ public ObjectId convert(@NotNull ObjectInserter inserter, @NotNull ConvertResolv
}

@NotNull
private ConvertTask convertTreeTask(@NotNull ObjectId id, boolean rootTree) {
private ConvertTask convertTreeTask(@NotNull ObjectReader reader, @NotNull ObjectId id, boolean rootTree) {
return new ConvertTask() {
@NotNull
private List<GitTreeEntry> getEntries() throws IOException {
final List<GitTreeEntry> entries = new ArrayList<>();
final CanonicalTreeParser treeParser = new CanonicalTreeParser(null, srcRepo.newObjectReader(), id);
final CanonicalTreeParser treeParser = new CanonicalTreeParser(null, reader, id);
boolean needAttributes = rootTree;
while (!treeParser.eof()) {
final FileMode fileMode = treeParser.getEntryFileMode();
Expand Down Expand Up @@ -206,7 +219,7 @@ private boolean matchFilename(@NotNull String fileName) {
}

@NotNull
private ConvertTask convertLfsTask(@Nullable ObjectId id) throws IOException {
private ConvertTask convertLfsTask(@NotNull ObjectReader reader, @Nullable ObjectId id) throws IOException {
return new ConvertTask() {
@NotNull
@Override
Expand All @@ -225,7 +238,7 @@ public ObjectId convert(@NotNull ObjectInserter inserter, @NotNull ConvertResolv
}
// Create LFS stream.
final File tmpFile = new File(tempPath, id.getName());
final ObjectLoader loader = srcRepo.open(id, Constants.OBJ_BLOB);
final ObjectLoader loader = reader.open(id, Constants.OBJ_BLOB);
try (InputStream istream = loader.openStream();
OutputStream ostream = new FileOutputStream(tmpFile)) {
byte[] buffer = new byte[0x10000];
Expand Down Expand Up @@ -311,7 +324,7 @@ private void upload(@NotNull String hash, long size, @NotNull File file) throws
}

@NotNull
private ConvertTask createAttributesTask(@Nullable ObjectId id) throws IOException {
private ConvertTask createAttributesTask(@NotNull final ObjectReader reader, @Nullable ObjectId id) throws IOException {
return new ConvertTask() {
@NotNull
@Override
Expand All @@ -327,9 +340,9 @@ public ObjectId convert(@NotNull ObjectInserter inserter, @NotNull ConvertResolv
attributes.add("*" + suffix + "\tfilter=lfs diff=lfs merge=lfs -crlf");
}
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(openAttributes(id), StandardCharsets.UTF_8))) {
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(openAttributes(reader, id), StandardCharsets.UTF_8))) {
while (true) {
String line = reader.readLine();
String line = bufferedReader.readLine();
if (line == null) break;
if (!attributes.remove(line)) {
blob.write(line.getBytes(StandardCharsets.UTF_8));
Expand All @@ -346,7 +359,7 @@ public ObjectId convert(@NotNull ObjectInserter inserter, @NotNull ConvertResolv
};
}

private ConvertTask copyTask(@NotNull ObjectId id) throws IOException {
private ConvertTask copyTask(@NotNull ObjectReader reader, @NotNull ObjectId id) throws IOException {
return new ConvertTask() {
@NotNull
@Override
Expand All @@ -357,7 +370,7 @@ public Iterable<TaskKey> depends() throws IOException {
@NotNull
@Override
public ObjectId convert(@NotNull ObjectInserter inserter, @NotNull ConvertResolver resolver) throws IOException {
final ObjectLoader loader = srcRepo.open(id);
final ObjectLoader loader = reader.open(id);
try (ObjectStream stream = loader.openStream()) {
inserter.insert(loader.getType(), loader.getSize(), stream);
}
Expand All @@ -367,11 +380,11 @@ public ObjectId convert(@NotNull ObjectInserter inserter, @NotNull ConvertResolv
}

@NotNull
private InputStream openAttributes(@Nullable ObjectId id) throws IOException {
private InputStream openAttributes(@NotNull ObjectReader reader, @Nullable ObjectId id) throws IOException {
if (ObjectId.zeroId().equals(id)) {
return new ByteArrayInputStream(new byte[0]);
}
return srcRepo.open(id, Constants.OBJ_BLOB).openStream();
return reader.open(id, Constants.OBJ_BLOB).openStream();
}

public enum TaskType {
Expand Down
19 changes: 11 additions & 8 deletions src/main/java/git/lfs/migrate/Main.java
Expand Up @@ -55,7 +55,7 @@ public static void processRepository(@NotNull File srcPath, @NotNull File dstPat
final Repository dstRepo = new FileRepositoryBuilder()
.setMustExist(false)
.setGitDir(dstPath).build();
final GitConverter converter = new GitConverter(srcRepo, dstPath, lfs, suffixes);
final GitConverter converter = new GitConverter(dstPath, lfs, suffixes);
try {
dstRepo.create(true);
// Load all revision list.
Expand All @@ -66,10 +66,10 @@ public static void processRepository(@NotNull File srcPath, @NotNull File dstPat

final ConcurrentMap<TaskKey, ObjectId> converted = new ConcurrentHashMap<>();
log.info("Converting object without dependencies in " + threads + " threads...", totalObjects);
processMultipleThreads(converter, graph, dstRepo, converted, threads);
processMultipleThreads(converter, graph, srcRepo, dstRepo, converted, threads);

log.info("Converting graph in single thread...");
processSingleThread(converter, graph, dstRepo, converted);
processSingleThread(converter, graph, srcRepo, dstRepo, converted);

// Validate result
if (converted.size() != totalObjects) {
Expand All @@ -91,7 +91,7 @@ public static void processRepository(@NotNull File srcPath, @NotNull File dstPat
}
}

private static void processMultipleThreads(@NotNull GitConverter converter, @NotNull SimpleDirectedGraph<TaskKey, DefaultEdge> graph, Repository dstRepo, @NotNull ConcurrentMap<TaskKey, ObjectId> converted, int threads) throws IOException, InterruptedException {
private static void processMultipleThreads(@NotNull GitConverter converter, @NotNull SimpleDirectedGraph<TaskKey, DefaultEdge> graph, @NotNull Repository srcRepo, @NotNull Repository dstRepo, @NotNull ConcurrentMap<TaskKey, ObjectId> converted, int threads) throws IOException, InterruptedException {
final Deque<TaskKey> queue = new ConcurrentLinkedDeque<>();
for (TaskKey vertex : graph.vertexSet()) {
if (graph.outgoingEdgesOf(vertex).isEmpty()) {
Expand All @@ -106,9 +106,10 @@ private static void processMultipleThreads(@NotNull GitConverter converter, @Not
jobs.add(pool.submit(() -> {
try {
final ObjectInserter inserter = dstRepo.newObjectInserter();
final ObjectReader reader = srcRepo.newObjectReader();
while (!queue.isEmpty()) {
final TaskKey taskKey = queue.poll();
final ObjectId objectId = converter.convertTask(taskKey).convert(inserter, converted::get);
final ObjectId objectId = converter.convertTask(reader, taskKey).convert(inserter, converted::get);
converted.put(taskKey, objectId);
reporter.increment();
}
Expand All @@ -130,7 +131,7 @@ private static void processMultipleThreads(@NotNull GitConverter converter, @Not
}
}

private static void processSingleThread(@NotNull GitConverter converter, @NotNull SimpleDirectedGraph<TaskKey, DefaultEdge> graph, Repository dstRepo, @NotNull Map<TaskKey, ObjectId> converted) throws IOException {
private static void processSingleThread(@NotNull GitConverter converter, @NotNull SimpleDirectedGraph<TaskKey, DefaultEdge> graph, @NotNull Repository srcRepo, @NotNull Repository dstRepo, @NotNull Map<TaskKey, ObjectId> converted) throws IOException {
try (ProgressReporter reporter = new ProgressReporter("completed", graph.vertexSet().size())) {
final Deque<TaskKey> queue = new ArrayDeque<>();
for (TaskKey vertex : graph.vertexSet()) {
Expand All @@ -139,9 +140,10 @@ private static void processSingleThread(@NotNull GitConverter converter, @NotNul
}
}
final ObjectInserter inserter = dstRepo.newObjectInserter();
final ObjectReader reader = srcRepo.newObjectReader();
while (!queue.isEmpty()) {
final TaskKey taskKey = queue.pop();
final ObjectId objectId = converter.convertTask(taskKey).convert(inserter, converted::get);
final ObjectId objectId = converter.convertTask(reader, taskKey).convert(inserter, converted::get);
converted.put(taskKey, objectId);

final List<TaskKey> sources = new ArrayList<>();
Expand Down Expand Up @@ -192,9 +194,10 @@ private static SimpleDirectedGraph<TaskKey, DefaultEdge> loadTaskGraph(@NotNull
reporter.increment();
}
}
final ObjectReader reader = repository.newObjectReader();
while (!queue.isEmpty()) {
final TaskKey taskKey = queue.pop();
for (TaskKey depend : converter.convertTask(taskKey).depends()) {
for (TaskKey depend : converter.convertTask(reader, taskKey).depends()) {
if (graph.addVertex(depend)) {
queue.add(depend);
reporter.increment();
Expand Down

0 comments on commit 36f2fab

Please sign in to comment.