Skip to content

Commit

Permalink
clean up from testing
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Nelson <jesse@swirldslabs.com>
  • Loading branch information
jnels124 committed Apr 25, 2024
1 parent b10f022 commit a47381b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ASYNC_TABLE_SPLITS=
CONCURRENCY=
CONCURRENT_COPIES_PER_TABLE=
CREATE_IDEXES_BEFORE_MIGRATION=
EXCLUDED_TABLES=("'entity_state_start'","'entity_transaction'","'flyway_schema_history'","'transaction_hash'","'topic_message'")
EXCLUDED_TABLES=("'entity_state_start'","'entity_transaction'","'flyway_schema_history'","'transaction_hash'")
FLYWAY_URL=https://repo1.maven.org/maven2/org/flywaydb/flyway-commandline/9.22.3/flyway-commandline-9.22.3-linux-x64.tar.gz
MAX_TIMESTAMP=
SOURCE_DB_HOST=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ insertCursors() {
local conditions
getCursorConditions conditions

cursorId=1
local cursorId=1
local insertSql="\\copy async_migration_status(the_table, cursor_id, cursor_range) from program 'cat ${sqlTemp}' with csv delimiter ',';"
local values=()
for value in "${conditions[@]}"; do
Expand Down Expand Up @@ -144,7 +144,7 @@ migrateTableBinary() {
fi

local where
if [[ "${table}" != "topic_message" && "${columns}" =~ $TS_COLUMN_REGEX ]]; then
if [[ "${columns}" =~ $TS_COLUMN_REGEX ]]; then
where="where consensus_timestamp \<= ${MAX_TIMESTAMP}"
# handle address book.
elif [[ "${columns}" =~ \<consensus_timestamp_start\> ]]; then
Expand Down Expand Up @@ -182,7 +182,7 @@ migrateTableAsyncBinary() {

local tableCursors=$(PGPASSWORD="${TARGET_DB_PASSWORD}" psql -q --csv -t -h "${TARGET_DB_HOST}" -d "${TARGET_DB_NAME}" -p "${TARGET_DB_PORT}" -U "${TARGET_DB_USER}" -c "${cursorQuery}")

while IFS="," read -r cursor_lower_bound cursor_upper_bound lower_inc upper_inc; do
while IFS="," read -r cursor_id cursor_lower_bound cursor_upper_bound lower_inc upper_inc; do
ACTIVE_COPIES=$(find . -maxdepth 1 -name "process-${table}*" -printf '.' | wc -m)
while [[ "${ACTIVE_COPIES}" -ge ${CONCURRENT_COPIES_PER_TABLE} ]]; do
sleep .5
Expand All @@ -202,9 +202,10 @@ migrateTableAsyncBinary() {
else
upperOperator="<"
fi

local where
if [[ -z "${cursor_lower_bound}" ]]; then
where="where consensus_timestamp \<= ${cursor_upper_bound}"
where="where consensus_timestamp \\${upperOperator} ${cursor_upper_bound}"
else
where="where consensus_timestamp \\${upperOperator} ${cursor_upper_bound} and consensus_timestamp \\${lowerOperator} ${cursor_lower_bound}"
fi
Expand Down Expand Up @@ -314,15 +315,14 @@ log "Installing Flyway"
wget -qO- "${FLYWAY_URL}" | tar -xz && mv flyway-*/* .
log "Flyway installed"

log "Copying Flyway configuration"

CREATE_INDEXES_BEFORE_MIGRATION=${CREATE_INDEXES_BEFORE_MIGRATION:-true}
if [[ "${CREATE_INDEXES_BEFORE_MIGRATION}" = "true" ]]; then
cp "${MIGRATIONS_DIR}/V2.0."* "${FLYWAY_DIR}"/sql/
else
cp "${MIGRATIONS_DIR}/V2.0."[0-2]* "${FLYWAY_DIR}"/sql/
fi

log "Copying Flyway configuration"
cat >"${FLYWAY_DIR}/conf/flyway.conf" <<EOF
flyway.password=${TARGET_DB_PASSWORD}
flyway.placeholders.hashShardCount=6
Expand Down Expand Up @@ -353,17 +353,14 @@ ASYNC_TABLES=$(echo "${ASYNC_TABLES//[\']/}" | tr "," "\n")

declare tablesArray
IFS=" " readarray -t tablesArray <<< "${TABLES}"
tablesArray+=("topic_message")
tablesArray+=("${ASYNC_TABLES[@]}")

COUNT="${#tablesArray[@]}"

insertCursors

log "Migrating ${COUNT} tables from ${SOURCE_DB_HOST}:${SOURCE_DB_PORT} to ${TARGET_DB_HOST}:${TARGET_DB_PORT}. Tables: ${TABLES[*]}"

echo "${tablesArray[*]}" | tr " " "\n" |ASYNC_TABLES=$ASYNC_TABLES xargs -n 1 -P "${CONCURRENCY}" -I {} bash -c 'migrateTable "$@"' _ {}

log "migration completed in $SECONDS seconds."

popd || die "Couldn't change directory back to ${SCRIPTS_DIR}"
exit 0

0 comments on commit a47381b

Please sign in to comment.