Skip to content

Commit

Permalink
refactored data clumps
Browse files Browse the repository at this point in the history
  • Loading branch information
compf committed Apr 12, 2024
1 parent 00db8d5 commit 00afcc5
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 78 deletions.
3 changes: 2 additions & 1 deletion flyway-core/src/main/java/org/flywaydb/core/Flyway.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ public MigrateResult migrate() throws FlywayException {
public MigrationInfoService info() {

return flywayExecutor.execute((migrationResolver, schemaHistory, database, defaultSchema, schemas, callbackExecutor, statementInterceptor) -> {
MigrationInfoService migrationInfoService = new DbInfo(migrationResolver, schemaHistory, configuration, database, callbackExecutor, schemas).info();
var flywayCommandSupport = new FlywayCommandSupport(migrationResolver, schemaHistory, configuration, database, callbackExecutor);
MigrationInfoService migrationInfoService = new DbInfo(schemas,flywayCommandSupport).info();

callbackExecutor.onOperationFinishEvent(Event.AFTER_INFO_OPERATION_FINISH, migrationInfoService.getInfoResult());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,26 @@

@RequiredArgsConstructor
public class DbInfo {
private final CompositeMigrationResolver migrationResolver;
private final SchemaHistory schemaHistory;
private final Configuration configuration;
private final Database database;
private final CallbackExecutor callbackExecutor;
private final Schema[] schemas;
private final FlywayCommandSupport flywayCommandSupport;

public MigrationInfoService info() {

callbackExecutor.onEvent(Event.BEFORE_INFO);
flywayCommandSupport.getCallbackExecutor().onEvent(Event.BEFORE_INFO);

MigrationInfoServiceImpl migrationInfoService;
try {
migrationInfoService =
new MigrationInfoServiceImpl(migrationResolver, schemaHistory, database, configuration,
configuration.getTarget(), configuration.isOutOfOrder(), ValidatePatternUtils.getIgnoreAllPattern(), configuration.getCherryPick());
new MigrationInfoServiceImpl(flywayCommandSupport.getMigrationResolver(), flywayCommandSupport.getSchemaHistory(), flywayCommandSupport.getDatabase(), flywayCommandSupport.getConfiguration(),
flywayCommandSupport.getConfiguration().getTarget(), flywayCommandSupport.getConfiguration().isOutOfOrder(), ValidatePatternUtils.getIgnoreAllPattern(), flywayCommandSupport.getConfiguration().getCherryPick());
migrationInfoService.refresh();
migrationInfoService.setAllSchemasEmpty(schemas);
} catch (FlywayException e) {
callbackExecutor.onEvent(Event.AFTER_INFO_ERROR);
flywayCommandSupport.getCallbackExecutor().onEvent(Event.AFTER_INFO_ERROR);
throw e;
}

callbackExecutor.onEvent(Event.AFTER_INFO);
flywayCommandSupport.getCallbackExecutor().onEvent(Event.AFTER_INFO);

return migrationInfoService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@
@CustomLog
public class DbMigrate {

private final Database database;
private final SchemaHistory schemaHistory;
/**
* The schema containing the schema history table.
*/
private final Schema schema;
private final CompositeMigrationResolver migrationResolver;
private final Configuration configuration;
private final CallbackExecutor callbackExecutor;
/**
* The connection to use to perform the actual database migrations.
*/
Expand All @@ -66,34 +61,35 @@ public class DbMigrate {
*/
private boolean isPreviousVersioned;
private final List<ResolvedMigration> appliedResolvedMigrations = new ArrayList<>();
private FlywayCommandSupport flywayCommandSupport = new FlywayCommandSupport(null, null, null, null, null);

public DbMigrate(Database database,
SchemaHistory schemaHistory, Schema schema, CompositeMigrationResolver migrationResolver,
Configuration configuration, CallbackExecutor callbackExecutor) {
this.database = database;
this.flywayCommandSupport.setDatabase(database);
this.connectionUserObjects = database.getMigrationConnection();
this.schemaHistory = schemaHistory;
this.flywayCommandSupport.setSchemaHistory(schemaHistory);
this.schema = schema;
this.migrationResolver = migrationResolver;
this.configuration = configuration;
this.callbackExecutor = callbackExecutor;
this.flywayCommandSupport.setMigrationResolver(migrationResolver);
this.flywayCommandSupport.setConfiguration(configuration);
this.flywayCommandSupport.setCallbackExecutor(callbackExecutor);
}

/**
* Starts the actual migration.
*/
public MigrateResult migrate() throws FlywayException {
callbackExecutor.onMigrateOrUndoEvent(Event.BEFORE_MIGRATE);
flywayCommandSupport.getCallbackExecutor().onMigrateOrUndoEvent(Event.BEFORE_MIGRATE);

migrateResult = CommandResultFactory.createMigrateResult(database.getCatalog(), configuration);
migrateResult = CommandResultFactory.createMigrateResult(flywayCommandSupport.getDatabase().getCatalog(), flywayCommandSupport.getConfiguration());

int count;
try {

count = configuration.isGroup() ?
count = flywayCommandSupport.getConfiguration().isGroup() ?
// When group is active, start the transaction boundary early to
// ensure that all changes to the schema history table are either committed or rolled back atomically.
schemaHistory.lock(this::migrateAll) :
flywayCommandSupport.getSchemaHistory().lock(this::migrateAll) :
// For all regular cases, proceed with the migration as usual.
migrateAll();

Expand All @@ -103,14 +99,14 @@ public MigrateResult migrate() throws FlywayException {
logSummary(count, migrateResult.getTotalMigrationTime(), migrateResult.targetSchemaVersion);

} catch (FlywayException e) {
callbackExecutor.onMigrateOrUndoEvent(Event.AFTER_MIGRATE_ERROR);
flywayCommandSupport.getCallbackExecutor().onMigrateOrUndoEvent(Event.AFTER_MIGRATE_ERROR);
throw e;
}

if (count > 0) {
callbackExecutor.onMigrateOrUndoEvent(Event.AFTER_MIGRATE_APPLIED);
flywayCommandSupport.getCallbackExecutor().onMigrateOrUndoEvent(Event.AFTER_MIGRATE_APPLIED);
}
callbackExecutor.onMigrateOrUndoEvent(Event.AFTER_MIGRATE);
flywayCommandSupport.getCallbackExecutor().onMigrateOrUndoEvent(Event.AFTER_MIGRATE);

return migrateResult;
}
Expand All @@ -133,26 +129,26 @@ private int migrateAll() {

while (true) {
final boolean firstRun = total == 0;
int count = configuration.isGroup()
int count = flywayCommandSupport.getConfiguration().isGroup()
// With group active a lock on the schema history table has already been acquired.
? migrateGroup(firstRun)
// Otherwise acquire the lock now. The lock will be released at the end of each migration.
: schemaHistory.lock(() -> migrateGroup(firstRun));
: flywayCommandSupport.getSchemaHistory().lock(() -> migrateGroup(firstRun));

migrateResult.migrationsExecuted += count;

total += count;
if (count == 0) {
// No further migrations available
break;
} else if (configuration.getTarget() == MigrationVersion.NEXT) {
} else if (flywayCommandSupport.getConfiguration().getTarget() == MigrationVersion.NEXT) {
// With target=next we only execute one migration
break;
}
}

if (isPreviousVersioned) {
callbackExecutor.onMigrateOrUndoEvent(Event.AFTER_VERSIONED);
flywayCommandSupport.getCallbackExecutor().onMigrateOrUndoEvent(Event.AFTER_VERSIONED);
}

return total;
Expand All @@ -166,8 +162,8 @@ private int migrateAll() {
*/
private Integer migrateGroup(boolean firstRun) {
MigrationInfoServiceImpl infoService =
new MigrationInfoServiceImpl(migrationResolver, schemaHistory, database, configuration,
configuration.getTarget(), configuration.isOutOfOrder(), ValidatePatternUtils.getIgnoreAllPattern(), configuration.getCherryPick());
new MigrationInfoServiceImpl(flywayCommandSupport.getMigrationResolver(), flywayCommandSupport.getSchemaHistory(), flywayCommandSupport.getDatabase(), flywayCommandSupport.getConfiguration(),
flywayCommandSupport.getConfiguration().getTarget(), flywayCommandSupport.getConfiguration().isOutOfOrder(), ValidatePatternUtils.getIgnoreAllPattern(), flywayCommandSupport.getConfiguration().getCherryPick());
infoService.refresh();

MigrationInfo current = infoService.current();
Expand All @@ -178,7 +174,7 @@ private Integer migrateGroup(boolean firstRun) {
MigrationVersion schemaVersionToOutput = currentSchemaVersion == null ? MigrationVersion.EMPTY : currentSchemaVersion;
migrateResult.initialSchemaVersion = schemaVersionToOutput.getVersion();

if (configuration.isOutOfOrder()) {
if (flywayCommandSupport.getConfiguration().isOutOfOrder()) {
String outOfOrderWarning = "outOfOrder mode is active. Migration of schema " + schema + " may not be reproducible.";
LOG.warn(outOfOrderWarning);
migrateResult.addWarning(outOfOrderWarning);
Expand Down Expand Up @@ -209,7 +205,7 @@ private Integer migrateGroup(boolean firstRun) {
if (failed.length > 0) {
if ((failed.length == 1)
&& (failed[0].getState() == MigrationState.FUTURE_FAILED)
&& ValidatePatternUtils.isFutureIgnored(configuration.getIgnoreMigrationPatterns())) {
&& ValidatePatternUtils.isFutureIgnored(flywayCommandSupport.getConfiguration().getIgnoreMigrationPatterns())) {
LOG.warn("Schema " + schema + " contains a failed future migration to version " + failed[0].getVersion() + " !");
} else {
final boolean inTransaction = failed[0].canExecuteInTransaction();
Expand All @@ -231,7 +227,7 @@ private Integer migrateGroup(boolean firstRun) {

group.put(pendingMigration, isOutOfOrder);

if (!configuration.isGroup()) {
if (!flywayCommandSupport.getConfiguration().isGroup()) {
// Only include one pending migration if group is disabled
break;
}
Expand Down Expand Up @@ -269,7 +265,7 @@ private void applyMigrations(final LinkedHashMap<MigrationInfoImpl, Boolean> gro
final StopWatch stopWatch = new StopWatch();
try {
if (executeGroupInTransaction) {
ExecutionTemplateFactory.createExecutionTemplate(connectionUserObjects.getJdbcConnection(), database).execute(() -> {
ExecutionTemplateFactory.createExecutionTemplate(connectionUserObjects.getJdbcConnection(), flywayCommandSupport.getDatabase()).execute(() -> {
doMigrateGroup(group, stopWatch, skipExecutingMigrations, true);
return null;
});
Expand All @@ -280,14 +276,14 @@ private void applyMigrations(final LinkedHashMap<MigrationInfoImpl, Boolean> gro
MigrationInfo migration = e.getMigration();

String failedMsg = "Migration of " + toMigrationText(migration, e.isExecutableInTransaction(), e.isOutOfOrder()) + " failed!";
if (database.supportsDdlTransactions() && executeGroupInTransaction) {
if (flywayCommandSupport.getDatabase().supportsDdlTransactions() && executeGroupInTransaction) {
LOG.error(failedMsg + " Changes successfully rolled back.");
} else {
LOG.error(failedMsg + " Please restore backups and roll back database and code!");

stopWatch.stop();
int executionTime = (int) stopWatch.getTotalTimeMillis();
schemaHistory.addAppliedMigration(migration.getVersion(), migration.getDescription(),
flywayCommandSupport.getSchemaHistory().addAppliedMigration(migration.getVersion(), migration.getDescription(),
migration.getType(), migration.getScript(), migration.getChecksum(), executionTime, false);
}
throw e;
Expand All @@ -308,7 +304,7 @@ private boolean isExecuteGroupInTransaction(LinkedHashMap<MigrationInfoImpl, Boo
continue;
}

if (!configuration.isMixed() && executeGroupInTransaction != inTransaction) {
if (!flywayCommandSupport.getConfiguration().isMixed() && executeGroupInTransaction != inTransaction) {
throw new FlywayMigrateException(entry.getKey(),
"Detected both transactional and non-transactional migrations within the same migration group"
+ " (even though mixed is false). First offending migration: "
Expand All @@ -329,7 +325,7 @@ private void doMigrateGroup(LinkedHashMap<MigrationInfoImpl, Boolean> group, Sto
Context context = new Context() {
@Override
public Configuration getConfiguration() {
return configuration;
return flywayCommandSupport.getConfiguration();
}

@Override
Expand All @@ -347,8 +343,8 @@ public java.sql.Connection getConnection() {
stopWatch.start();

if (isPreviousVersioned && migration.getVersion() == null) {
callbackExecutor.onMigrateOrUndoEvent(Event.AFTER_VERSIONED);
callbackExecutor.onMigrateOrUndoEvent(Event.BEFORE_REPEATABLES);
flywayCommandSupport.getCallbackExecutor().onMigrateOrUndoEvent(Event.AFTER_VERSIONED);
flywayCommandSupport.getCallbackExecutor().onMigrateOrUndoEvent(Event.BEFORE_REPEATABLES);
isPreviousVersioned = false;
}

Expand All @@ -361,35 +357,35 @@ public java.sql.Connection getConnection() {
connectionUserObjects.changeCurrentSchemaTo(schema);

try {
callbackExecutor.setMigrationInfo(migration);
callbackExecutor.onEachMigrateOrUndoEvent(Event.BEFORE_EACH_MIGRATE);
flywayCommandSupport.getCallbackExecutor().setMigrationInfo(migration);
flywayCommandSupport.getCallbackExecutor().onEachMigrateOrUndoEvent(Event.BEFORE_EACH_MIGRATE);
try {
LOG.info("Migrating " + migrationText);

// With single connection databases we need to manually disable the transaction for the
// migration as it is turned on for schema history changes
boolean oldAutoCommit = context.getConnection().getAutoCommit();
if (database.useSingleConnection() && !isExecuteInTransaction) {
if (flywayCommandSupport.getDatabase().useSingleConnection() && !isExecuteInTransaction) {
context.getConnection().setAutoCommit(true);
}
migration.getResolvedMigration().getExecutor().execute(context);
if (database.useSingleConnection() && !isExecuteInTransaction) {
if (flywayCommandSupport.getDatabase().useSingleConnection() && !isExecuteInTransaction) {
context.getConnection().setAutoCommit(oldAutoCommit);
}

appliedResolvedMigrations.add(migration.getResolvedMigration());
} catch (FlywayException e) {
callbackExecutor.onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE_ERROR);
flywayCommandSupport.getCallbackExecutor().onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE_ERROR);
throw new FlywayMigrateException(migration, isOutOfOrder, e, migration.canExecuteInTransaction(), migrateResult);
} catch (SQLException e) {
callbackExecutor.onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE_ERROR);
flywayCommandSupport.getCallbackExecutor().onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE_ERROR);
throw new FlywayMigrateException(migration, isOutOfOrder, e, migration.canExecuteInTransaction(), migrateResult);
}

LOG.debug("Successfully completed migration of " + migrationText);
callbackExecutor.onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE);
flywayCommandSupport.getCallbackExecutor().onEachMigrateOrUndoEvent(Event.AFTER_EACH_MIGRATE);
} finally {
callbackExecutor.setMigrationInfo(null);
flywayCommandSupport.getCallbackExecutor().setMigrationInfo(null);
}
}

Expand All @@ -398,7 +394,7 @@ public java.sql.Connection getConnection() {

migrateResult.migrations.add(CommandResultFactory.createMigrateOutput(migration, executionTime));

schemaHistory.addAppliedMigration(migration.getVersion(), migration.getDescription(), migration.getType(),
flywayCommandSupport.getSchemaHistory().addAppliedMigration(migration.getVersion(), migration.getDescription(), migration.getType(),
migration.getScript(), migration.getResolvedMigration().getChecksum(), executionTime, true);
}
}
Expand Down

0 comments on commit 00afcc5

Please sign in to comment.