MNT-22186: propTablesCleanupJobDetail v2 can cause Out of Memory erro… (#402)

-  MySQLDeleteNotExistsExecutor (by extending DeleteNotExistsExecutor) to cope with MySQLspecific fetch size limitation and restrictions
- updated tests
- moved tests to AllDBTestsTestSuite from AppContextExtraTestSuite
This commit is contained in:
Cristian Turlica
2021-04-23 11:20:48 +03:00
committed by GitHub
parent 9e2a0bd899
commit f201f35ec1
6 changed files with 255 additions and 25 deletions

View File

@@ -69,19 +69,19 @@ public class DeleteNotExistsExecutor implements StatementExecutor
public static final String PROPERTY_READ_ONLY = "system.delete_not_exists.read_only";
public static final String PROPERTY_TIMEOUT_SECONDS = "system.delete_not_exists.timeout_seconds";
private Connection connection;
protected Connection connection;
private String sql;
private int line;
private File scriptFile;
private Properties globalProperties;
private boolean readOnly;
private int deleteBatchSize;
private int batchSize;
protected boolean readOnly;
protected int deleteBatchSize;
protected int batchSize;
private long timeoutSec;
private long deletedCount;
private Date startTime;
protected long deletedCount;
protected Date startTime;
public DeleteNotExistsExecutor(Connection connection, String sql, int line, File scriptFile, Properties globalProperties)
{
@@ -164,7 +164,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
}
}
private void process(Pair<String, String>[] tableColumn, Long[] tableUpperLimits, String[] optionalWhereClauses) throws SQLException
protected void process(Pair<String, String>[] tableColumn, Long[] tableUpperLimits, String[] optionalWhereClauses) throws SQLException
{
// The approach is to fetch ordered row ids from all referencer/secondary (e.g.
// alf_audit_app, alf_audit_entry, alf_prop_unique_ctx) tables and
@@ -190,6 +190,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
try
{
connection.setAutoCommit(false);
primaryPrepStmt = connection.prepareStatement(createPreparedSelectStatement(primaryTableName, primaryColumnName, primaryWhereClause));
primaryPrepStmt.setFetchSize(batchSize);
primaryPrepStmt.setLong(1, primaryId);
@@ -264,7 +265,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
}
}
private boolean isTimeoutExceeded()
protected boolean isTimeoutExceeded()
{
if (timeoutSec <= 0)
{
@@ -275,7 +276,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
return (now.getTime() > startTime.getTime() + (timeoutSec * 1000));
}
private Long processPrimaryTableResultSet(PreparedStatement primaryPrepStmt, PreparedStatement[] secondaryPrepStmts, PreparedStatement deletePrepStmt, Set<Long> deleteIds, String primaryTableName,
protected Long processPrimaryTableResultSet(PreparedStatement primaryPrepStmt, PreparedStatement[] secondaryPrepStmts, PreparedStatement deletePrepStmt, Set<Long> deleteIds, String primaryTableName,
String primaryColumnName, Pair<String, String>[] tableColumn) throws SQLException
{
int rowsProcessed = 0;
@@ -336,7 +337,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
return primaryId;
}
private void deleteFromPrimaryTable(PreparedStatement deletePrepStmt, Set<Long> deleteIds, String primaryTableName) throws SQLException
protected void deleteFromPrimaryTable(PreparedStatement deletePrepStmt, Set<Long> deleteIds, String primaryTableName) throws SQLException
{
int deletedBatchCount = deleteIds.size();
if (!readOnly && !deleteIds.isEmpty())
@@ -447,8 +448,8 @@ public class DeleteNotExistsExecutor implements StatementExecutor
return quotedString.replace("\"", "");
}
private String createPreparedSelectStatement(String tableName, String columnName, String whereClause)
protected String createPreparedSelectStatement(String tableName, String columnName, String whereClause)
{
StringBuilder sqlBuilder = new StringBuilder("SELECT " + columnName + " FROM " + tableName + " WHERE ");
@@ -461,7 +462,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
return sqlBuilder.toString();
}
private String createPreparedDeleteStatement(String tableName, String idColumnName, int deleteBatchSize, String whereClause)
protected String createPreparedDeleteStatement(String tableName, String idColumnName, int deleteBatchSize, String whereClause)
{
StringBuilder stmtBuilder = new StringBuilder("DELETE FROM " + tableName + " WHERE ");
@@ -571,7 +572,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
}
}
private void closeQuietly(Statement statement)
protected void closeQuietly(Statement statement)
{
if (statement != null)
{
@@ -586,7 +587,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
}
}
private void closeQuietly(Statement[] statements)
protected void closeQuietly(Statement[] statements)
{
if (statements != null)
{
@@ -597,7 +598,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
}
}
private void closeQuietly(ResultSet resultSet)
protected void closeQuietly(ResultSet resultSet)
{
if (resultSet != null)
{
@@ -612,7 +613,7 @@ public class DeleteNotExistsExecutor implements StatementExecutor
}
}
private void closeQuietly(ResultSet[] resultSets)
protected void closeQuietly(ResultSet[] resultSets)
{
if (resultSets != null)
{

View File

@@ -0,0 +1,201 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2020 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.repo.domain.schema.script;
import org.alfresco.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.sql.DataSource;
import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
/**
* Extends <code>{@link DeleteNotExistsExecutor}</code> to cope with MySQL
* specific fetch size limitation and restrictions.
*/
public class MySQLDeleteNotExistsExecutor extends DeleteNotExistsExecutor
{
private static final Log logger = LogFactory.getLog(MySQLDeleteNotExistsExecutor.class);
private final DataSource dataSource;
public MySQLDeleteNotExistsExecutor(Connection connection, String sql, int line, File scriptFile, Properties globalProperties, DataSource dataSource)
{
super(connection, sql, line, scriptFile, globalProperties);
this.dataSource = dataSource;
}
@Override
protected void process(Pair<String, String>[] tableColumn, Long[] tableUpperLimits, String[] optionalWhereClauses) throws SQLException
{
// The approach is to fetch ordered row ids from all referencer/secondary (e.g.
// alf_audit_app, alf_audit_entry, alf_prop_unique_ctx) tables and
// referenced/primary table (e.g. alf_prop_root) concurrently, so that it is
// possible skip over id gaps efficiently while at the same time being able to
// work out which ids are obsolete and delete them in batches.
// The algorithm can be further improved by iterating over the rows in descending order.
// This is due to the fact that older data should be more stable in time.
String primaryTableName = tableColumn[0].getFirst();
String primaryColumnName = tableColumn[0].getSecond();
String primaryWhereClause = optionalWhereClauses[0];
Long primaryId = 0L;
// There are some caveats with MySQL specific fetch size limitation. You must
// read all of the rows in the result set (or close it) before you can issue any
// other queries on the connection, or an exception will be thrown.
Connection primaryConnection = null;
Connection[] secondaryConnections = null;
PreparedStatement primaryPrepStmt = null;
PreparedStatement[] secondaryPrepStmts = null;
PreparedStatement deletePrepStmt = null;
Set<Long> deleteIds = new HashSet<>();
deletedCount = 0L;
startTime = new Date();
try
{
connection.setAutoCommit(false);
primaryConnection = dataSource.getConnection();
primaryPrepStmt = primaryConnection.prepareStatement(createPreparedSelectStatement(primaryTableName, primaryColumnName, primaryWhereClause));
// Note the MySQL specific fetch size limitation (Integer.MIN_VALUE). fetchSize
// activates result set streaming.
primaryPrepStmt.setFetchSize(Integer.MIN_VALUE);
primaryPrepStmt.setLong(1, primaryId);
primaryPrepStmt.setLong(2, tableUpperLimits[0]);
boolean hasResults = primaryPrepStmt.execute();
if (hasResults)
{
secondaryPrepStmts = new PreparedStatement[tableColumn.length];
secondaryConnections = new Connection[tableColumn.length];
for (int i = 1; i < tableColumn.length; i++)
{
secondaryConnections[i] = dataSource.getConnection();
PreparedStatement secStmt = secondaryConnections[i].prepareStatement(createPreparedSelectStatement(tableColumn[i].getFirst(), tableColumn[i].getSecond(), optionalWhereClauses[i]));
// Note the MySQL specific fetch size limitation (Integer.MIN_VALUE). fetchSize
// activates result set streaming.
secStmt.setFetchSize(Integer.MIN_VALUE);
secStmt.setLong(1, primaryId);
secStmt.setLong(2, tableUpperLimits[i]);
secondaryPrepStmts[i] = secStmt;
}
deletePrepStmt = connection.prepareStatement(createPreparedDeleteStatement(primaryTableName, primaryColumnName, deleteBatchSize, primaryWhereClause));
// Timeout is only checked at each bach start.
// It can be further refined by being verified at each primary row processing.
while (hasResults && !isTimeoutExceeded())
{
// Process batch
primaryId = processPrimaryTableResultSet(primaryPrepStmt, secondaryPrepStmts, deletePrepStmt, deleteIds, primaryTableName, primaryColumnName, tableColumn);
connection.commit();
if (primaryId == null)
{
break;
}
// Prepare for next batch
primaryPrepStmt.setLong(1, primaryId);
primaryPrepStmt.setLong(2, tableUpperLimits[0]);
for (int i = 1; i < tableColumn.length; i++)
{
PreparedStatement secStmt = secondaryPrepStmts[i];
secStmt.setLong(1, primaryId);
secStmt.setLong(2, tableUpperLimits[i]);
}
hasResults = primaryPrepStmt.execute();
}
}
// Check if we have any more ids to delete
if (!deleteIds.isEmpty())
{
deleteFromPrimaryTable(deletePrepStmt, deleteIds, primaryTableName);
connection.commit();
}
if (logger.isDebugEnabled())
{
String msg = ((readOnly) ? "Script would have" : "Script") + " deleted a total of " + deletedCount + " items from table " + primaryTableName + ".";
logger.debug(msg);
}
}
finally
{
closeQuietly(deletePrepStmt);
closeQuietly(secondaryPrepStmts);
closeQuietly(primaryPrepStmt);
closeQuietly(secondaryConnections);
closeQuietly(primaryConnection);
connection.setAutoCommit(true);
}
}
protected void closeQuietly(Connection connection)
{
if (connection != null)
{
try
{
connection.close();
}
catch (Exception e)
{
logger.warn("Error closing DB connection: " + e.getMessage());
}
}
}
protected void closeQuietly(Connection[] connections)
{
if (connections != null)
{
for (Connection connection : connections)
{
closeQuietly(connection);
}
}
}
}

View File

@@ -350,7 +350,7 @@ public class ScriptExecutorImpl implements ScriptExecutor
}
else if (sql.startsWith("--DELETE_NOT_EXISTS"))
{
DeleteNotExistsExecutor deleteNotExists = new DeleteNotExistsExecutor(connection, sql, line, scriptFile, globalProperties);
DeleteNotExistsExecutor deleteNotExists = createDeleteNotExistsExecutor(dialect, connection, sql, line, scriptFile);
deleteNotExists.execute();
// Reset
@@ -537,7 +537,17 @@ public class ScriptExecutorImpl implements ScriptExecutor
try { scriptInputStream.close(); } catch (Throwable e) {}
}
}
private DeleteNotExistsExecutor createDeleteNotExistsExecutor(Dialect dialect, Connection connection, String sql, int line, File scriptFile)
{
if (dialect instanceof MySQLInnoDBDialect)
{
return new MySQLDeleteNotExistsExecutor(connection, sql, line, scriptFile, globalProperties, dataSource);
}
return new DeleteNotExistsExecutor(connection, sql, line, scriptFile, globalProperties);
}
/**
* Execute the given SQL statement, absorbing exceptions that we expect during
* schema creation or upgrade.

View File

@@ -82,7 +82,9 @@ import org.junit.runners.Suite;
org.alfresco.repo.node.db.DbNodeServiceImplTest.class,
org.alfresco.repo.node.cleanup.TransactionCleanupTest.class,
org.alfresco.repo.security.person.GetPeopleCannedQueryTest.class
org.alfresco.repo.security.person.GetPeopleCannedQueryTest.class,
org.alfresco.repo.domain.schema.script.DeleteNotExistsExecutorTest.class
})
public class AllDBTestsTestSuite
{

View File

@@ -75,7 +75,6 @@ import org.junit.runners.Suite;
org.alfresco.repo.site.SiteServiceImplTest.class,
// [classpath:alfresco/application-context.xml, classpath:scriptexec/script-exec-test.xml]
org.alfresco.repo.domain.schema.script.DeleteNotExistsExecutorTest.class,
org.alfresco.repo.domain.schema.script.ScriptExecutorImplIntegrationTest.class,
org.alfresco.repo.domain.schema.script.ScriptBundleExecutorImplIntegrationTest.class,

View File

@@ -35,10 +35,14 @@ import java.util.Properties;
import javax.sql.DataSource;
import org.alfresco.repo.domain.dialect.Dialect;
import org.alfresco.repo.domain.dialect.MySQLInnoDBDialect;
import org.alfresco.util.ApplicationContextHelper;
import org.alfresco.util.testing.category.DBTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -48,11 +52,13 @@ import org.springframework.jdbc.core.JdbcTemplate;
*
* @author Cristian Turlica
*/
@Category({DBTests.class})
public class DeleteNotExistsExecutorTest
{
private static ApplicationContext ctx;
private ScriptExecutor scriptExecutor;
private DataSource dataSource;
private Dialect dialect;
private JdbcTemplate jdbcTmpl;
@BeforeClass
@@ -67,9 +73,20 @@ public class DeleteNotExistsExecutorTest
{
scriptExecutor = ctx.getBean("simpleScriptExecutor", ScriptExecutorImpl.class);
dataSource = ctx.getBean("dataSource", DataSource.class);
dialect = ctx.getBean("dialect", Dialect.class);
jdbcTmpl = new JdbcTemplate(dataSource);
}
private DeleteNotExistsExecutor createDeleteNotExistsExecutor(Connection connection, String sql, int line, File scriptFile, Properties properties)
{
if (dialect instanceof MySQLInnoDBDialect)
{
return new MySQLDeleteNotExistsExecutor(connection, sql, line, scriptFile, properties, dataSource);
}
return new DeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
}
@Test()
public void testDefaultBehaviour() throws Exception
{
@@ -90,7 +107,7 @@ public class DeleteNotExistsExecutorTest
{
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_READ_ONLY)).thenReturn("true");
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_TIMEOUT_SECONDS)).thenReturn("-1");
DeleteNotExistsExecutor deleteNotExistsExecutor = new DeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
DeleteNotExistsExecutor deleteNotExistsExecutor = createDeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
deleteNotExistsExecutor.execute();
List<String> res = jdbcTmpl.queryForList(select, String.class);
@@ -100,7 +117,7 @@ public class DeleteNotExistsExecutorTest
{
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_READ_ONLY)).thenReturn("false");
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_TIMEOUT_SECONDS)).thenReturn("-1");
DeleteNotExistsExecutor deleteNotExistsExecutor = new DeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
DeleteNotExistsExecutor deleteNotExistsExecutor = createDeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
deleteNotExistsExecutor.execute();
List<String> res = jdbcTmpl.queryForList(select, String.class);
@@ -133,7 +150,7 @@ public class DeleteNotExistsExecutorTest
{
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_DELETE_BATCH_SIZE)).thenReturn("1");
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_READ_ONLY)).thenReturn("false");
DeleteNotExistsExecutor deleteNotExistsExecutor = new DeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
DeleteNotExistsExecutor deleteNotExistsExecutor = createDeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
deleteNotExistsExecutor.execute();
List<String> res = jdbcTmpl.queryForList(select, String.class);
@@ -167,7 +184,7 @@ public class DeleteNotExistsExecutorTest
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_BATCH_SIZE)).thenReturn("2");
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_READ_ONLY)).thenReturn("false");
when(properties.getProperty(DeleteNotExistsExecutor.PROPERTY_TIMEOUT_SECONDS)).thenReturn("-1");
DeleteNotExistsExecutor deleteNotExistsExecutor = new DeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
DeleteNotExistsExecutor deleteNotExistsExecutor = createDeleteNotExistsExecutor(connection, sql, line, scriptFile, properties);
deleteNotExistsExecutor.execute();
List<String> res = jdbcTmpl.queryForList(select, String.class);