Mirror of https://github.com/Alfresco/alfresco-community-repo.git
MNT-23043 - Property Tables Cleaner Job V3 (#1330)
* Implementation of V3
* Correct commit placement. If done while cursors are still open, it can result in lost cursors - see MNT-23127
* Correct formatting
* Unit test
* Add test to test suite
* Corrections on the unit test
* Correct typo and add documentation
* Fix typos
* Set the default values as constants
* Remove initialization of rowsProcessed
* Improve comments
* Optimizations regarding retrieving min and max values
* Fix PostgreSQL sql script for v3
@@ -46,6 +46,7 @@ public class PropTablesCleaner
{
    private static final String PROPERTY_PROP_TABLE_CLEANER_ALG = "system.prop_table_cleaner.algorithm";
    private static final String PROP_TABLE_CLEANER_ALG_V2 = "V2";
    private static final String PROP_TABLE_CLEANER_ALG_V3 = "V3";

    private PropertyValueDAO propertyValueDAO;
    private JobLockService jobLockService;

@@ -100,6 +101,10 @@ public class PropTablesCleaner
        {
            propertyValueDAO.cleanupUnusedValuesV2();
        }
        else if (PROP_TABLE_CLEANER_ALG_V3.equalsIgnoreCase(getAlgorithm()))
        {
            propertyValueDAO.cleanupUnusedValuesV3();
        }
        else
        {
            propertyValueDAO.cleanupUnusedValues();
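The algorithm is selected at runtime from the system.prop_table_cleaner.algorithm property read by getAlgorithm(). A minimal override to switch the cleaner to the new code path, assuming the usual alfresco-global.properties mechanism for overriding the repository.properties defaults shown further down (V2 remains the shipped default):

system.prop_table_cleaner.algorithm=V3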
@@ -377,4 +377,6 @@ public interface PropertyValueDAO
    void cleanupUnusedValues();

    void cleanupUnusedValuesV2();

    void cleanupUnusedValuesV3();
}
@@ -758,4 +758,23 @@ public class PropertyValueDAOImpl extends AbstractPropertyValueDAOImpl
            clearCaches();
        }
    }

    @Override
    public void cleanupUnusedValuesV3()
    {
        // Run the main script
        try
        {
            scriptExecutor.exec(false, "alfresco/dbscripts/utility/${db.script.dialect}", "CleanAlfPropTablesV3.sql");
        }
        catch (RuntimeException e)
        {
            logger.error("The cleanup script failed: ", e);
            throw e;
        }
        finally
        {
            clearCaches();
        }
    }
}
@@ -73,7 +73,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
    private String sql;
    private int line;
    private File scriptFile;
    private Properties globalProperties;
    protected Properties globalProperties;

    protected boolean readOnly;
    protected int deleteBatchSize;
@@ -0,0 +1,501 @@
/*
 * #%L
 * Alfresco Repository
 * %%
 * Copyright (C) 2005 - 2022 Alfresco Software Limited
 * %%
 * This file is part of the Alfresco software.
 * If the software was purchased under a paid Alfresco license, the terms of
 * the paid license agreement will prevail. Otherwise, the software is
 * provided under the following open source license terms:
 *
 * Alfresco is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Alfresco is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
 * #L%
 */

package org.alfresco.repo.domain.schema.script;

import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;

import javax.sql.DataSource;

import org.alfresco.repo.domain.dialect.Dialect;
import org.alfresco.repo.domain.dialect.MySQLInnoDBDialect;
import org.alfresco.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Same logic as DeleteNotExistsExecutor with the following changes:
 * <p/>
 * - filters the queries by unique values
 * <p/>
 * - eager close of result sets
 * <p/>
 * - we store all the ids in memory and process them from there - the secondary ids are stored in a single set without
 * duplicate values.
 * <p/>
 * - we only cross 2 sets (the potential ids to delete from the primary table with the set of all secondary ids in that
 * range), removing every element of the second set from the first set
 * <p/>
 * - every {pauseAndRecoverBatchSize} rows deleted, we close all prepared statements, close the connection and sleep
 * for {pauseAndRecoverTime} milliseconds. This is necessary to allow the DBMS to perform its background tasks without
 * load from ACS. Without this pause, when we are performing millions of deletes, the connection eventually gets
 * aborted.
 *
 * @author Eva Vasques
 */
public class DeleteNotExistsV3Executor extends DeleteNotExistsExecutor
{
    private static Log logger = LogFactory.getLog(DeleteNotExistsV3Executor.class);

    public static final String PROPERTY_PAUSE_AND_RECOVER_BATCHSIZE = "system.delete_not_exists.pauseAndRecoverBatchSize";
    public static final String PROPERTY_PAUSE_AND_RECOVER_TIME = "system.delete_not_exists.pauseAndRecoverTime";
    public static final long DEFAULT_PAUSE_AND_RECOVER_BATCHSIZE = 500000;
    public static final long DEFAULT_PAUSE_AND_RECOVER_TIME = 300000;

    private Dialect dialect;
    private final DataSource dataSource;
    private long pauseAndRecoverTime;
    private long pauseAndRecoverBatchSize;
    private boolean pauseAndRecover = false;
    private int processedCounter = 0;

    public DeleteNotExistsV3Executor(Dialect dialect, Connection connection, String sql, int line, File scriptFile,
            Properties globalProperties, DataSource dataSource)
    {
        super(connection, sql, line, scriptFile, globalProperties);
        this.dialect = dialect;
        this.dataSource = dataSource;
    }

    @Override
    public void execute() throws Exception
    {
        checkProperties();

        String pauseAndRecoverBatchSizeString = globalProperties.getProperty(PROPERTY_PAUSE_AND_RECOVER_BATCHSIZE);
        pauseAndRecoverBatchSize = pauseAndRecoverBatchSizeString == null ? DEFAULT_PAUSE_AND_RECOVER_BATCHSIZE
                : Long.parseLong(pauseAndRecoverBatchSizeString);

        String pauseAndRecoverTimeString = globalProperties.getProperty(PROPERTY_PAUSE_AND_RECOVER_TIME);
        pauseAndRecoverTime = pauseAndRecoverTimeString == null ? DEFAULT_PAUSE_AND_RECOVER_TIME
                : Long.parseLong(pauseAndRecoverTimeString);

        super.execute();
    }

    @Override
    protected void process(Pair<String, String>[] tableColumn, Long[] tableUpperLimits, String[] optionalWhereClauses)
            throws SQLException
    {
        String primaryTableName = tableColumn[0].getFirst();
        String primaryColumnName = tableColumn[0].getSecond();
        String primaryWhereClause = optionalWhereClauses[0];

        Long primaryId = 0L;

        deletedCount = 0L;
        startTime = new Date();

        processBatch(primaryTableName, primaryColumnName, primaryWhereClause, primaryId, tableColumn, tableUpperLimits,
                optionalWhereClauses);

        if (logger.isDebugEnabled())
        {
            String msg = ((readOnly) ? "Script would have" : "Script") + " deleted a total of " + deletedCount
                    + " items from table " + primaryTableName + ".";
            logger.debug(msg);
        }
    }

    private void processBatch(String primaryTableName, String primaryColumnName, String primaryWhereClause, Long primaryId,
            Pair<String, String>[] tableColumn, Long[] tableUpperLimits, String[] optionalWhereClauses) throws SQLException
    {
        PreparedStatement primaryPrepStmt = null;
        PreparedStatement[] secondaryPrepStmts = null;
        PreparedStatement deletePrepStmt = null;
        Set<Long> deleteIds = new HashSet<>();
        pauseAndRecover = false;

        try
        {
            connection.setAutoCommit(false);

            primaryPrepStmt = connection
                    .prepareStatement(createPreparedSelectStatement(primaryTableName, primaryColumnName, primaryWhereClause));
            primaryPrepStmt.setFetchSize(batchSize);
            primaryPrepStmt.setLong(1, primaryId);
            primaryPrepStmt.setLong(2, tableUpperLimits[0]);

            boolean hasResults = primaryPrepStmt.execute();

            if (hasResults)
            {
                // Prepared statements for secondary tables for the next batch
                secondaryPrepStmts = new PreparedStatement[tableColumn.length];
                for (int i = 1; i < tableColumn.length; i++)
                {
                    PreparedStatement secStmt = connection.prepareStatement(createPreparedSelectStatement(
                            tableColumn[i].getFirst(), tableColumn[i].getSecond(), optionalWhereClauses[i]));
                    secStmt.setFetchSize(batchSize);
                    secondaryPrepStmts[i] = secStmt;
                }

                deletePrepStmt = connection.prepareStatement(
                        createPreparedDeleteStatement(primaryTableName, primaryColumnName, deleteBatchSize, primaryWhereClause));

                // Timeout is only checked at each batch start.
                // It can be further refined by being verified at each primary row processing.
                while (hasResults && !isTimeoutExceeded())
                {
                    // Process batch
                    primaryId = processPrimaryTableResultSet(primaryPrepStmt, secondaryPrepStmts, deletePrepStmt, deleteIds,
                            primaryTableName, primaryColumnName, tableColumn);
                    connection.commit();

                    // If we have no more results (next primaryId is null) or the job is marked for pause and recover,
                    // do not start the next batch
                    if (primaryId == null || pauseAndRecover)
                    {
                        break;
                    }

                    // Prepare for next batch
                    primaryPrepStmt.setLong(1, primaryId + 1);
                    primaryPrepStmt.setLong(2, tableUpperLimits[0]);

                    // Query the primary table for the next batch
                    hasResults = primaryPrepStmt.execute();
                }

                // Check if we have any more ids to delete
                if (!deleteIds.isEmpty())
                {
                    deleteFromPrimaryTable(deletePrepStmt, deleteIds, primaryTableName);
                    connection.commit();
                }
            }
        }
        finally
        {
            closeQuietly(deletePrepStmt);
            closeQuietly(secondaryPrepStmts);
            closeQuietly(primaryPrepStmt);

            closeQuietly(connection);
        }

        if (pauseAndRecover)
        {
            pauseAndRecoverJob(dataSource);
            logger.info("Resuming the job on primary table " + primaryTableName + " picking up after id " + primaryId);
            processBatch(primaryTableName, primaryColumnName, primaryWhereClause, primaryId, tableColumn, tableUpperLimits,
                    optionalWhereClauses);
        }
    }

    @Override
    protected String createPreparedSelectStatement(String tableName, String columnName, String whereClause)
    {
        StringBuilder sqlBuilder = new StringBuilder("SELECT " + columnName + " FROM " + tableName + " WHERE ");

        if (whereClause != null && !whereClause.isEmpty())
        {
            sqlBuilder.append(whereClause + " AND ");
        }

        sqlBuilder.append(
                columnName + " >= ? AND " + columnName + " <= ? GROUP BY " + columnName + " ORDER BY " + columnName + " ASC ");

        if (dialect instanceof MySQLInnoDBDialect)
        {
            sqlBuilder.append(" LIMIT " + batchSize);
        }
        else
        {
            sqlBuilder.append(" OFFSET 0 ROWS FETCH FIRST " + batchSize + " ROWS ONLY");
        }

        return sqlBuilder.toString();
    }

    @Override
    protected Long processPrimaryTableResultSet(PreparedStatement primaryPrepStmt, PreparedStatement[] secondaryPrepStmts,
            PreparedStatement deletePrepStmt, Set<Long> deleteIds, String primaryTableName, String primaryColumnName,
            Pair<String, String>[] tableColumn) throws SQLException
    {
        Long primaryId = null;
        Long minSecId = 0L;
        Long maxSecId = 0L;

        // Set all rows retrieved from the primary table as our potential ids to delete
        Set<Long> potentialIdsToDelete = new HashSet<Long>();
        Long minPotentialId = 0L;
        Long maxPotentialId = 0L;
        try (ResultSet resultSet = primaryPrepStmt.getResultSet())
        {
            while (resultSet.next())
            {
                primaryId = resultSet.getLong(primaryColumnName);
                potentialIdsToDelete.add(primaryId);

                minPotentialId = (minPotentialId == 0L || primaryId < minPotentialId) ? primaryId : minPotentialId;
                maxPotentialId = primaryId > maxPotentialId ? primaryId : maxPotentialId;
            }
        }

        if (potentialIdsToDelete.size() == 0)
        {
            // Nothing more to do
            return primaryId;
        }

        int rowsInBatch = potentialIdsToDelete.size();
        processedCounter = processedCounter + rowsInBatch;

        // Get a combined list of the ids present in the secondary tables
        SecondaryResultsInfo secondaryResultsInfo = getSecondaryResults(secondaryPrepStmts, tableColumn, minPotentialId,
                maxPotentialId);

        Set<Long> secondaryResults = secondaryResultsInfo.getValues();

        if (secondaryResultsInfo.getSize() > 0)
        {
            minSecId = secondaryResultsInfo.getMinValue();
            maxSecId = secondaryResultsInfo.getMaxValue();

            // From our potentialIdsToDelete list, remove all non-eligible ids: any id that is in a secondary table or
            // any id past the last id we were able to access in the secondary tables (maxSecId)
            Iterator<Long> it = potentialIdsToDelete.iterator();
            while (it.hasNext())
            {
                Long id = it.next();
                if (id > maxSecId || secondaryResults.contains(id))
                {
                    it.remove();
                }
            }

            // The next starting primary id for the next batch will either be the last id evaluated from the
            // primary table or, in case the secondary queries did not get that far, the last secondary table id
            // evaluated (maxSecId)
            primaryId = primaryId < maxSecId ? primaryId : maxSecId;
        }

        // Delete the ids that are eligible from the primary table
        if (potentialIdsToDelete.size() > 0)
        {
            deleteInBatches(potentialIdsToDelete, deleteIds, primaryTableName, deletePrepStmt);
        }

        if (logger.isTraceEnabled())
        {
            logger.trace("Rows processed " + rowsInBatch + " from primary table " + primaryTableName + ". Primary: ["
                    + minPotentialId + "," + maxPotentialId + "] Secondary rows processed: " + secondaryResultsInfo.getSize()
                    + " [" + minSecId + "," + maxSecId + "] Total Deleted: " + deletedCount);
        }

        // If the total rows processed from all batches so far is greater than the defined pauseAndRecoverBatchSize,
        // mark the job to pause and recover after completing this batch
        if (processedCounter >= pauseAndRecoverBatchSize)
        {
            pauseAndRecover = true;
        }

        // Return the last primary id processed for the next batch
        return primaryId;
    }

    private void deleteInBatches(Set<Long> potentialIdsToDelete, Set<Long> deleteIds, String primaryTableName,
            PreparedStatement deletePrepStmt) throws SQLException
    {
        Iterator<Long> potentialIdsIt = potentialIdsToDelete.iterator();
        while (potentialIdsIt.hasNext())
        {
            Long idToDelete = (Long) potentialIdsIt.next();
            deleteIds.add(idToDelete);

            if (deleteIds.size() == deleteBatchSize)
            {
                deleteFromPrimaryTable(deletePrepStmt, deleteIds, primaryTableName);
            }
        }
    }

    /*
     * Get a combined list of the ids present in all the secondary tables
     */
    private SecondaryResultsInfo getSecondaryResults(PreparedStatement[] preparedStatements, Pair<String, String>[] tableColumn,
            Long minPotentialId, Long maxPotentialId) throws SQLException
    {
        Set<Long> secondaryResultValues = new HashSet<Long>();
        Long lowestUpperValue = 0L;
        for (int i = 1; i < preparedStatements.length; i++)
        {
            String columnId = tableColumn[i].getSecond();
            PreparedStatement secStmt = preparedStatements[i];
            secStmt.setLong(1, minPotentialId);
            secStmt.setLong(2, maxPotentialId);

            // Execute the query on each secondary table
            boolean secHasResults = secStmt.execute();
            if (secHasResults)
            {
                try (ResultSet secResultSet = secStmt.getResultSet())
                {
                    Long thisId = 0L;
                    Long resultSize = 0L;
                    Long upperValue = 0L;
                    while (secResultSet.next())
                    {
                        resultSize++;
                        thisId = secResultSet.getLong(columnId);

                        // Add to the list if it's not there yet
                        if (!secondaryResultValues.contains(thisId))
                        {
                            secondaryResultValues.add(thisId);
                        }

                        upperValue = thisId > upperValue ? thisId : upperValue;
                    }

                    // Track the lowest upper value. We need to gather the last id processed, so on the next batch on the
                    // primary table we can resume from there. We only need to do this if the number of results of the
                    // secondary table matches the batch size (if not, it means that there aren't more results up to
                    // maxPotentialId). Example of why this is needed: the primary table batch has 100k results from ids 1
                    // to 250000. The secondary table on that interval returns 100k results from id 3 to 210000. The next
                    // batch needs to start on id 210001.
                    if (upperValue > 0 && resultSize == batchSize)
                    {
                        lowestUpperValue = (lowestUpperValue == 0L || upperValue < lowestUpperValue) ? upperValue
                                : lowestUpperValue;
                    }
                }
            }
        }

        // If lowestUpperValue is still 0 (because a secondary table never had more or the same number of results as the
        // primary table), the next id should be the last max id evaluated from the primary table (maxPotentialId)
        lowestUpperValue = lowestUpperValue == 0 ? maxPotentialId : lowestUpperValue;

        // Remove all values after the lowest upper value of a secondary table
        long minSecId = 0L;
        Iterator<Long> it = secondaryResultValues.iterator();
        while (it.hasNext())
        {
            long secondaryId = it.next();
            if (secondaryId > lowestUpperValue)
            {
                it.remove();
            }
            else
            {
                minSecId = (minSecId == 0L || secondaryId < minSecId) ? secondaryId : minSecId;
            }
        }

        // Return a combined list of the ids present in all the secondary tables
        return new SecondaryResultsInfo(secondaryResultValues, minSecId, lowestUpperValue);
    }

    private class SecondaryResultsInfo
    {
        Set<Long> values;
        long minValue;
        long maxValue;
        long size;

        public SecondaryResultsInfo(Set<Long> values, long minValue, long maxValue)
        {
            super();
            this.values = values;
            this.minValue = minValue;
            this.maxValue = maxValue;
            this.size = values.size();
        }

        public Set<Long> getValues()
        {
            return values;
        }

        public long getMinValue()
        {
            return minValue;
        }

        public long getMaxValue()
        {
            return maxValue;
        }

        public long getSize()
        {
            return size;
        }
    }

    /*
     * Sleep for {pauseAndRecoverTime} before opening a new connection and continuing to process new batches
     */
    private void pauseAndRecoverJob(DataSource dataSource) throws SQLException
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Reached batch size for pause and recovery. Job will resume in " + pauseAndRecoverTime + " ms");
        }
        // Wait
        try
        {
            Thread.sleep(pauseAndRecoverTime);
        }
        catch (InterruptedException e)
        {
            // Do nothing
        }
        // Start another connection and continue where we left off
        connection = dataSource.getConnection();
        pauseAndRecover = false;
        processedCounter = 0;
    }

    protected void closeQuietly(Connection connection)
    {
        try
        {
            connection.close();
        }
        catch (SQLException e)
        {
            // Do nothing
        }
        finally
        {
            connection = null;
        }
    }
}
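The heart of the V3 algorithm is the set-crossing step in processPrimaryTableResultSet above: each batch loads the candidate ids from the primary table into memory, gathers the ids referenced by any secondary table over the same id range, and deletes the difference, keeping any id beyond the highest secondary id actually scanned for the next batch. A minimal standalone sketch of just that step (eligibleForDelete and SetCrossingSketch are hypothetical names, not part of the patch):

import java.util.HashSet;
import java.util.Set;

class SetCrossingSketch
{
    // candidates: ids selected from the primary table in this batch.
    // referenced: ids found in any secondary table over the same range.
    // maxSecId:   highest secondary id scanned so far; ids beyond it cannot
    //             be judged yet, so they are kept and revisited next batch.
    static Set<Long> eligibleForDelete(Set<Long> candidates, Set<Long> referenced, long maxSecId)
    {
        Set<Long> eligible = new HashSet<>(candidates);
        eligible.removeIf(id -> id > maxSecId || referenced.contains(id));
        return eligible;
    }
}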
@@ -196,7 +196,6 @@ public class MySQLDeleteNotExistsExecutor extends DeleteNotExistsExecutor
                    if (deleteIds.size() == deleteBatchSize)
                    {
                        deleteFromPrimaryTable(deletePrepStmt, deleteIds, primaryTableName);
                        connection.commit();
                    }

                    if (!resultSet.next())
@@ -208,13 +207,13 @@ public class MySQLDeleteNotExistsExecutor extends DeleteNotExistsExecutor
                        primaryId = resultSet.getLong(primaryColumnName);
                    }

                    if (logger.isTraceEnabled())
                    {
                        logger.trace("RowsProcessed " + rowsProcessed + " from primary table " + primaryTableName);
                    }

                    updateSecondaryIds(primaryId, secondaryIds, secondaryPrepStmts, secondaryOffsets, secondaryResultSets, tableColumn);
                }

                if (logger.isTraceEnabled())
                {
                    logger.trace("RowsProcessed " + rowsProcessed + " from primary table " + primaryTableName);
                }
            }
            finally
            {
@@ -274,6 +274,8 @@ public class ScriptExecutorImpl implements ScriptExecutor

            while(true)
            {
                connection = refreshConnection(connection);

                String sqlOriginal = reader.readLine();
                line++;

@@ -348,6 +350,24 @@ public class ScriptExecutorImpl implements ScriptExecutor
                    }
                    continue;
                }
                else if (sql.startsWith("--DELETE_NOT_EXISTS_V3"))
                {
                    DeleteNotExistsV3Executor deleteNotExistsFiltered = createDeleteNotExistV3Executor(dialect, connection, sql,
                            line, scriptFile);
                    deleteNotExistsFiltered.execute();

                    // Reset
                    sb.setLength(0);
                    fetchVarName = null;
                    fetchColumnName = null;
                    defaultFetchValue = null;
                    batchTableName = null;
                    doBatch = false;
                    batchUpperLimit = 0;
                    batchSize = 1;

                    continue;
                }
                else if (sql.startsWith("--DELETE_NOT_EXISTS"))
                {
                    DeleteNotExistsExecutor deleteNotExists = createDeleteNotExistsExecutor(dialect, connection, sql, line, scriptFile);
@@ -538,6 +558,17 @@ public class ScriptExecutorImpl implements ScriptExecutor
        }
    }

    private Connection refreshConnection(Connection connection) throws SQLException
    {
        if (connection == null || connection.isClosed())
        {
            connection = dataSource.getConnection();
            connection.setAutoCommit(true);
        }

        return connection;
    }

    private DeleteNotExistsExecutor createDeleteNotExistsExecutor(Dialect dialect, Connection connection, String sql, int line, File scriptFile)
    {
        if (dialect instanceof MySQLInnoDBDialect)
@@ -548,6 +579,12 @@ public class ScriptExecutorImpl implements ScriptExecutor
        return new DeleteNotExistsExecutor(connection, sql, line, scriptFile, globalProperties);
    }

    private DeleteNotExistsV3Executor createDeleteNotExistV3Executor(Dialect dialect, Connection connection, String sql, int line,
            File scriptFile)
    {
        return new DeleteNotExistsV3Executor(dialect, connection, sql, line, scriptFile, globalProperties, dataSource);
    }

    /**
     * Execute the given SQL statement, absorbing exceptions that we expect during
     * schema creation or upgrade.
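One detail worth noting in the dispatch above: "--DELETE_NOT_EXISTS" is a string prefix of "--DELETE_NOT_EXISTS_V3", so the V3 branch has to be tested first, exactly as the hunk orders the else-if chain. A minimal sketch of that prefix-dispatch rule (a hypothetical helper, not part of the patch):

// Returns which executor a script line would select; the more specific
// marker must be checked before the marker that is its prefix.
static String dispatch(String sql)
{
    if (sql.startsWith("--DELETE_NOT_EXISTS_V3"))
    {
        return "DeleteNotExistsV3Executor";
    }
    if (sql.startsWith("--DELETE_NOT_EXISTS"))
    {
        return "DeleteNotExistsExecutor";
    }
    return "plain SQL";
}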
@@ -0,0 +1,9 @@
--DELETE_NOT_EXISTS_V3 alf_prop_root.id,alf_audit_app.disabled_paths_id,alf_audit_entry.audit_values_id,alf_prop_unique_ctx.prop1_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_value.id,alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_string_value.id,alf_prop_value.long_value."persisted_type in (3, 5, 6)",alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_serializable_value.id,alf_prop_value.long_value.persisted_type=4,alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_double_value.id,alf_prop_value.long_value.persisted_type=2,alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize
@@ -0,0 +1,9 @@
--DELETE_NOT_EXISTS_V3 alf_prop_root.id,alf_audit_app.disabled_paths_id,alf_audit_entry.audit_values_id,alf_prop_unique_ctx.prop1_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_value.id,alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_string_value.id,alf_prop_value.long_value."persisted_type in (3, 5, 6)",alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_serializable_value.id,alf_prop_value.long_value.persisted_type=4,alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize

--DELETE_NOT_EXISTS_V3 alf_prop_double_value.id,alf_prop_value.long_value.persisted_type=2,alf_audit_app.app_name_id,alf_audit_entry.audit_user_id,alf_prop_link.key_prop_id,alf_prop_link.value_prop_id,alf_prop_unique_ctx.value1_prop_id,alf_prop_unique_ctx.value2_prop_id,alf_prop_unique_ctx.value3_prop_id system.delete_not_exists.batchsize
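The two hunks above are, by all appearances, the per-dialect copies of CleanAlfPropTablesV3.sql invoked by cleanupUnusedValuesV3() (the file paths were lost in extraction; the commit message notes a PostgreSQL-specific fix). Judging from the parsing in DeleteNotExistsExecutor, each directive reads as: the first table.column pair is the primary table whose unreferenced rows are deleted, every further pair is a secondary reference that keeps a row alive, any pair may carry a quoted filter (as in alf_prop_value.long_value."persisted_type in (3, 5, 6)"), and the final token names the property supplying the batch size. An annotated reading of the first directive, for orientation:

--DELETE_NOT_EXISTS_V3
--   primary:    alf_prop_root.id                     (rows deleted when no secondary reference exists)
--   secondary:  alf_audit_app.disabled_paths_id
--               alf_audit_entry.audit_values_id
--               alf_prop_unique_ctx.prop1_id         (any match keeps the row)
--   batch size: system.delete_not_exists.batchsize   (resolved from global properties)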
@@ -1246,6 +1246,12 @@ system.delete_not_exists.read_only=false
system.delete_not_exists.timeout_seconds=-1
system.prop_table_cleaner.algorithm=V2

#Additional options for algorithm V3
#After how many processed rows the job pauses to allow the DB to recover
system.delete_not_exists.pauseAndRecoverBatchSize=500000
#Duration of the pause in milliseconds (10s by default)
system.delete_not_exists.pauseAndRecoverTime=10000

# --Node cleanup batch - default settings
system.node_cleanup.delete_batchSize=1000
system.node_table_cleaner.algorithm=V1
@@ -88,6 +88,7 @@ import org.junit.runners.Suite;
    org.alfresco.repo.security.person.GetPeopleCannedQueryTest.class,

    org.alfresco.repo.domain.schema.script.DeleteNotExistsExecutorTest.class,
    org.alfresco.repo.domain.schema.script.DeleteNotExistsV3ExecutorTest.class,
    org.alfresco.repo.node.cleanup.DeletedNodeBatchCleanupTest.class
})
public class AllDBTestsTestSuite
@@ -0,0 +1,201 @@
/*
 * #%L
 * Alfresco Repository
 * %%
 * Copyright (C) 2005 - 2022 Alfresco Software Limited
 * %%
 * This file is part of the Alfresco software.
 * If the software was purchased under a paid Alfresco license, the terms of
 * the paid license agreement will prevail. Otherwise, the software is
 * provided under the following open source license terms:
 *
 * Alfresco is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Alfresco is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
 * #L%
 */
package org.alfresco.repo.domain.schema.script;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

import java.io.File;
import java.sql.Connection;
import java.util.List;
import java.util.Properties;

import javax.sql.DataSource;

import org.alfresco.repo.domain.dialect.Dialect;
import org.alfresco.util.ApplicationContextHelper;
import org.alfresco.util.testing.category.DBTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * Integration tests for the {@link DeleteNotExistsV3Executor} class.
 *
 * @author Eva Vasques
 */
@Category({DBTests.class})
public class DeleteNotExistsV3ExecutorTest
{
    private static ApplicationContext ctx;
    private ScriptExecutor scriptExecutor;
    private DataSource dataSource;
    private Dialect dialect;
    private JdbcTemplate jdbcTmpl;

    @BeforeClass
    public static void setUpBeforeClass()
    {
        String[] config = new String[] { "classpath:alfresco/application-context.xml", "classpath:scriptexec/script-exec-test.xml" };
        ctx = ApplicationContextHelper.getApplicationContext(config);
    }

    @Before
    public void setUp() throws Exception
    {
        scriptExecutor = ctx.getBean("simpleScriptExecutor", ScriptExecutorImpl.class);
        dataSource = ctx.getBean("dataSource", DataSource.class);
        dialect = ctx.getBean("dialect", Dialect.class);
        jdbcTmpl = new JdbcTemplate(dataSource);
    }

    private DeleteNotExistsV3Executor createDeleteNotExistsV3Executor(Dialect dialect, Connection connection, String sql, int line, File scriptFile, Properties properties)
    {
        return new DeleteNotExistsV3Executor(dialect, connection, sql, line, scriptFile, properties, dataSource);
    }

    @Test()
    public void testDefaultBehaviour() throws Exception
    {
        scriptExecutor.executeScriptUrl("scriptexec/${db.script.dialect}/delete-not-exists/test-data1.sql");

        String sql = "--DELETE_NOT_EXISTS_V3 temp_tst_tbl_1.id,temp_tst_tbl_2.tbl_2_id,temp_tst_tbl_3.tbl_3_id,temp_tst_tbl_4.tbl_4_id system.delete_not_exists.batchsize";
        int line = 1;
        File scriptFile = Mockito.mock(File.class);
        Properties properties = Mockito.mock(Properties.class);

        String select = "select id from temp_tst_tbl_1 order by id ASC";

        try (Connection connection = dataSource.getConnection())
        {
            connection.setAutoCommit(true);

            // Test read only
            {
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_READ_ONLY)).thenReturn("true");
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_TIMEOUT_SECONDS)).thenReturn("-1");
                DeleteNotExistsV3Executor deleteNotExistsV3Executor = createDeleteNotExistsV3Executor(dialect, connection, sql, line, scriptFile, properties);
                deleteNotExistsV3Executor.execute();

                List<String> res = jdbcTmpl.queryForList(select, String.class);
                assertEquals(7, res.size());
            }
        }

        try (Connection connection = dataSource.getConnection())
        {
            connection.setAutoCommit(true);

            // Test with delete
            {
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_READ_ONLY)).thenReturn("false");
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_TIMEOUT_SECONDS)).thenReturn("-1");
                DeleteNotExistsV3Executor deleteNotExistsV3Executor = createDeleteNotExistsV3Executor(dialect, connection, sql, line, scriptFile, properties);
                deleteNotExistsV3Executor.execute();

                List<String> res = jdbcTmpl.queryForList(select, String.class);
                assertEquals(5, res.size());

                assertEquals("1", res.get(0));
                assertEquals("2", res.get(1));
                assertEquals("4", res.get(2));
                assertEquals("10", res.get(3));
                assertEquals("11", res.get(4));
            }
        }
    }

    @Test()
    public void testDeleteBatch() throws Exception
    {
        scriptExecutor.executeScriptUrl("scriptexec/${db.script.dialect}/delete-not-exists/test-data1.sql");

        String sql = "--DELETE_NOT_EXISTS_V3 temp_tst_tbl_1.id,temp_tst_tbl_2.tbl_2_id,temp_tst_tbl_3.tbl_3_id,temp_tst_tbl_4.tbl_4_id system.delete_not_exists.batchsize";
        int line = 1;
        File scriptFile = Mockito.mock(File.class);
        Properties properties = Mockito.mock(Properties.class);

        String select = "select id from temp_tst_tbl_1 order by id ASC";

        try (Connection connection = dataSource.getConnection())
        {
            connection.setAutoCommit(true);
            {
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_DELETE_BATCH_SIZE)).thenReturn("1");
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_READ_ONLY)).thenReturn("false");
                DeleteNotExistsV3Executor deleteNotExistsV3Executor = createDeleteNotExistsV3Executor(dialect, connection, sql, line, scriptFile, properties);
                deleteNotExistsV3Executor.execute();

                List<String> res = jdbcTmpl.queryForList(select, String.class);
                assertEquals(5, res.size());

                assertEquals("1", res.get(0));
                assertEquals("2", res.get(1));
                assertEquals("4", res.get(2));
                assertEquals("10", res.get(3));
                assertEquals("11", res.get(4));
            }
        }
    }

    @Test()
    public void testBatchExecute() throws Exception
    {
        scriptExecutor.executeScriptUrl("scriptexec/${db.script.dialect}/delete-not-exists/test-data1.sql");

        String sql = "--DELETE_NOT_EXISTS_V3 temp_tst_tbl_1.id,temp_tst_tbl_2.tbl_2_id,temp_tst_tbl_3.tbl_3_id,temp_tst_tbl_4.tbl_4_id system.delete_not_exists.batchsize";
        int line = 1;
        File scriptFile = Mockito.mock(File.class);
        Properties properties = Mockito.mock(Properties.class);

        String select = "select id from temp_tst_tbl_1 order by id ASC";

        try (Connection connection = dataSource.getConnection())
        {
            connection.setAutoCommit(true);
            {
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_BATCH_SIZE)).thenReturn("2");
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_READ_ONLY)).thenReturn("false");
                when(properties.getProperty(DeleteNotExistsV3Executor.PROPERTY_TIMEOUT_SECONDS)).thenReturn("-1");
                DeleteNotExistsV3Executor deleteNotExistsV3Executor = createDeleteNotExistsV3Executor(dialect, connection, sql, line, scriptFile, properties);
                deleteNotExistsV3Executor.execute();

                List<String> res = jdbcTmpl.queryForList(select, String.class);
                assertEquals(5, res.size());

                assertEquals("1", res.get(0));
                assertEquals("2", res.get(1));
                assertEquals("4", res.get(2));
                assertEquals("10", res.get(3));
                assertEquals("11", res.get(4));
            }
        }
    }
}