[MNT-23509] Add lock callback to refreshLock method (#2030)

* [MNT-23509] Add lock callback to refreshLock method

* [MNT-23509] Added unit test

* [MNT-23509] PMD scan improvements
This commit is contained in:
tiagosalvado10
2023-08-16 17:34:54 +01:00
committed by GitHub
parent 653fb08a3a
commit 27186a56b8
6 changed files with 527 additions and 37 deletions

View File

@@ -25,16 +25,19 @@
*/ */
package org.alfresco.schedule; package org.alfresco.schedule;
import java.util.concurrent.atomic.AtomicBoolean;
import org.alfresco.repo.lock.JobLockService; import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.lock.JobLockService.JobLockRefreshCallback;
import org.alfresco.repo.lock.LockAcquisitionException; import org.alfresco.repo.lock.LockAcquisitionException;
import org.alfresco.service.namespace.NamespaceService; import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName; import org.alfresco.service.namespace.QName;
import org.alfresco.util.Pair; import org.alfresco.util.Pair;
import org.alfresco.util.VmShutdownListener.VmShutdownException; import org.alfresco.util.VmShutdownListener.VmShutdownException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This class encapsulates the {@link org.alfresco.repo.lock.JobLockService JobLockService} * This class encapsulates the {@link org.alfresco.repo.lock.JobLockService JobLockService}
@@ -56,7 +59,7 @@ public class ScheduledJobLockExecuter
{ {
private static final long LOCK_TTL = 30000L; private static final long LOCK_TTL = 30000L;
private static Log logger = LogFactory.getLog(ScheduledJobLockExecuter.class.getName()); private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledJobLockExecuter.class);
private static ThreadLocal<Pair<Long, String>> lockThreadLocal = new ThreadLocal<Pair<Long, String>>(); private static ThreadLocal<Pair<Long, String>> lockThreadLocal = new ThreadLocal<Pair<Long, String>>();
private final JobLockService jobLockService; private final JobLockService jobLockService;
@@ -84,51 +87,42 @@ public class ScheduledJobLockExecuter
*/ */
public void execute(JobExecutionContext jobContext) throws JobExecutionException public void execute(JobExecutionContext jobContext) throws JobExecutionException
{ {
LockCallback lockCallback = new LockCallback();
String lockName = lockQName.getLocalName();
try try
{ {
if (logger.isDebugEnabled()) LOGGER.debug(" Job {} started.", lockName);
{ refreshLock(lockCallback);
logger.debug(String.format(" Job %s started.", lockQName.getLocalName()));
}
refreshLock();
job.executeJob(jobContext); job.executeJob(jobContext);
if (logger.isDebugEnabled()) LOGGER.debug(" Job {} completed.", lockName);
{
logger.debug(String.format(" Job %s completed.", lockQName.getLocalName()));
}
} }
catch (LockAcquisitionException e) catch (LockAcquisitionException e)
{ {
// Job being done by another process // Job being done by another process
if (logger.isDebugEnabled()) LOGGER.debug(" Job {} already underway.", lockName);
{
logger.debug(String.format(" Job %s already underway.", lockQName.getLocalName()));
}
} }
catch (VmShutdownException e) catch (VmShutdownException e)
{ {
// Aborted // Aborted
if (logger.isDebugEnabled()) LOGGER.debug(" Job {} aborted.", lockName);
{
logger.debug(String.format(" Job %s aborted.", lockQName.getLocalName()));
}
} }
finally finally
{ {
releaseLock(); releaseLock(lockCallback);
} }
} }
/** /**
* Lazily update the job lock * Lazily update the job lock
*/ */
private void refreshLock() private void refreshLock(LockCallback lockCallback)
{ {
Pair<Long, String> lockPair = lockThreadLocal.get(); Pair<Long, String> lockPair = lockThreadLocal.get();
if (lockPair == null) if (lockPair == null)
{ {
String lockToken = jobLockService.getLock(lockQName, LOCK_TTL); String lockToken = jobLockService.getLock(lockQName, LOCK_TTL);
Long lastLock = new Long(System.currentTimeMillis()); jobLockService.refreshLock(lockToken, lockQName, LOCK_TTL, lockCallback);
Long lastLock = Long.valueOf(System.currentTimeMillis());
// We have not locked before // We have not locked before
lockPair = new Pair<Long, String>(lastLock, lockToken); lockPair = new Pair<Long, String>(lastLock, lockToken);
lockThreadLocal.set(lockPair); lockThreadLocal.set(lockPair);
@@ -141,7 +135,7 @@ public class ScheduledJobLockExecuter
// Only refresh the lock if we are past a threshold // Only refresh the lock if we are past a threshold
if (now - lastLock > (long) (LOCK_TTL / 2L)) if (now - lastLock > (long) (LOCK_TTL / 2L))
{ {
jobLockService.refreshLock(lockToken, lockQName, LOCK_TTL); jobLockService.refreshLock(lockToken, lockQName, LOCK_TTL, lockCallback);
lastLock = System.currentTimeMillis(); lastLock = System.currentTimeMillis();
lockPair = new Pair<Long, String>(lastLock, lockToken); lockPair = new Pair<Long, String>(lastLock, lockToken);
lockThreadLocal.set(lockPair); lockThreadLocal.set(lockPair);
@@ -152,8 +146,13 @@ public class ScheduledJobLockExecuter
/** /**
* Release the lock after the job completes * Release the lock after the job completes
*/ */
private void releaseLock() private void releaseLock(LockCallback lockCallback)
{ {
if (lockCallback != null)
{
lockCallback.running.set(false);
}
Pair<Long, String> lockPair = lockThreadLocal.get(); Pair<Long, String> lockPair = lockThreadLocal.get();
if (lockPair != null) if (lockPair != null)
{ {
@@ -170,4 +169,22 @@ public class ScheduledJobLockExecuter
} }
// else: We can't release without a token // else: We can't release without a token
} }
private class LockCallback implements JobLockRefreshCallback
{
final AtomicBoolean running = new AtomicBoolean(true);
@Override
public boolean isActive()
{
return running.get();
}
@Override
public void lockReleased()
{
running.set(false);
LOGGER.debug("Lock release notification: {}", lockQName);
}
}
} }

View File

@@ -38,6 +38,15 @@ import org.junit.runners.Suite;
@RunWith(Categories.class) @RunWith(Categories.class)
@Categories.ExcludeCategory({DBTests.class, NonBuildTests.class}) @Categories.ExcludeCategory({DBTests.class, NonBuildTests.class})
@Suite.SuiteClasses({ @Suite.SuiteClasses({
// ----------------------------------------------------------------------
// testScheduleContext [classpath:alfresco/application-context.xml, classpath:alfresco/schedule/test-schedule-context.xml]
//
// This test needs to be first as it will clean nodes from trashcan, if order is changed, then it will take lot of time
// to remove all the nodes from previous tests
// ----------------------------------------------------------------------
org.alfresco.schedule.AbstractScheduledLockedJobTest.class,
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
// globalIntegrationTestContext [classpath:alfresco/application-context.xml, classpath:alfresco/test/global-integration-test-context.xml] // globalIntegrationTestContext [classpath:alfresco/application-context.xml, classpath:alfresco/test/global-integration-test-context.xml]
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------

View File

@@ -0,0 +1,209 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.schedule;
import java.util.UUID;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.model.Repository;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.transaction.RetryingTransactionHelper;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.namespace.RegexQNamePattern;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.BaseSpringTest;
import org.junit.Before;
import org.junit.Test;
import org.quartz.JobDetail;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.SchedulerAccessorBean;
import org.springframework.test.context.ContextConfiguration;
import com.google.common.collect.ImmutableMap;
/**
*
* @author Tiago Salvado
*/
@ContextConfiguration({"classpath:alfresco/application-context.xml", "classpath:alfresco/schedule/test-schedule-context.xml"})
public class AbstractScheduledLockedJobTest extends BaseSpringTest
{
private static final int TOTAL_NODES = 9;
private static final int NUM_THREADS = 2;
private static final long JOB_EXECUTER_LOCK_TTL = 30000L;
private static final String ARCHIVE_STORE_URL = "archive://SpacesStore";
private NodeService nodeService;
private TransactionService transactionService;
private Repository repository;
private SchedulerAccessorBean testCleanerAccessor;
private JobDetail testCleanerJobDetail;
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledLockedJobTest.class);
/**
* Sets services and job beans
*/
@Before
public void setUp()
{
nodeService = (NodeService) applicationContext.getBean("nodeService");
transactionService = (TransactionService) applicationContext.getBean("transactionComponent");
repository = (Repository) applicationContext.getBean("repositoryHelper");
}
@Test
public void test() throws SchedulerException, InterruptedException
{
createAndDeleteNodes(TOTAL_NODES);
assertTrue("Expected nodes haven't been created", getNumberOfNodesInTrashcan() >= TOTAL_NODES);
CleanerThread[] threads = new CleanerThread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++)
{
CleanerThread t = new CleanerThread(i);
threads[i] = t;
t.start();
Thread.sleep(JOB_EXECUTER_LOCK_TTL);
}
for (Thread t : threads)
{
t.join();
}
while (getNumberOfNodesInTrashcan() > 0)
{
Thread.sleep(2000);
}
for (CleanerThread t : threads)
{
if (t.hasErrors())
{
fail("An error has occurred when executing multiple cleaner jobs at the same time");
}
}
}
/**
* Creates and deletes the specified number of nodes.
*
* @param archivedNodes
* Number of nodes to be created and added to trashcan
*/
private void createAndDeleteNodes(int archivedNodes)
{
AuthenticationUtil.runAsSystem(() -> {
RetryingTransactionHelper.RetryingTransactionCallback<Void> txnWork = () -> {
for (int i = 0; i < archivedNodes; i++)
{
addNodeToTrashcan();
}
return null;
};
return transactionService.getRetryingTransactionHelper().doInTransaction(txnWork);
});
}
/**
* Creates and deletes nodes
*/
private void addNodeToTrashcan()
{
NodeRef companyHome = repository.getCompanyHome();
String name = "Sample (" + UUID.randomUUID().toString() + ")";
ChildAssociationRef association = nodeService.createNode(companyHome, ContentModel.ASSOC_CONTAINS,
QName.createQName(NamespaceService.CONTENT_MODEL_PREFIX, name), ContentModel.TYPE_CONTENT,
ImmutableMap.of(ContentModel.PROP_NAME, name));
NodeRef parent = association.getChildRef();
nodeService.deleteNode(parent);
}
/**
* It returns the number of nodes present on trashcan.
*
* @return
*/
private long getNumberOfNodesInTrashcan()
{
StoreRef storeRef = new StoreRef(ARCHIVE_STORE_URL);
NodeRef archiveRoot = nodeService.getRootNode(storeRef);
return nodeService.getChildAssocs(archiveRoot, ContentModel.ASSOC_CHILDREN, RegexQNamePattern.MATCH_ALL).size();
}
/**
* Thread to start the cleaner job for the test.
*/
private class CleanerThread extends Thread
{
private int threadNum;
private boolean started;
private Cleaner testCleaner;
CleanerThread(int threadNum)
{
super(CleanerThread.class.getSimpleName() + "-" + threadNum);
this.threadNum = threadNum;
}
@Override
public void run()
{
try
{
testCleanerAccessor = (SchedulerAccessorBean) applicationContext.getBean("testSchedulerAccessor");
testCleanerJobDetail = (JobDetail) applicationContext.getBean("testCleanerJobDetail");
testCleaner = (Cleaner) testCleanerJobDetail.getJobDataMap().get("testCleaner");
testCleanerAccessor.getScheduler().triggerJob(testCleanerJobDetail.getKey());
LOGGER.info("Thread {} has started", this.threadNum);
this.started = true;
}
catch (SchedulerException e)
{
this.started = false;
}
}
public boolean hasErrors()
{
return !started || testCleaner != null && testCleaner.hasErrors();
}
}
}

View File

@@ -0,0 +1,174 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.schedule;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.alfresco.model.ContentModel;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.service.cmr.repository.ChildAssociationRef;
import org.alfresco.service.cmr.repository.InvalidNodeRefException;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.namespace.RegexQNamePattern;
import org.alfresco.service.transaction.TransactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A test implementation similar to the trash can cleaner job implementation that will be used in
* {@link AbstractScheduledLockedJobTest}
*
* @author Tiago Salvado
*/
public class Cleaner
{
private static final Logger LOGGER = LoggerFactory.getLogger(Cleaner.class);
private final NodeService nodeService;
private final TransactionService transactionService;
private static final String ARCHIVE_STORE_URL = "archive://SpacesStore";
private final int deleteBatchCount;
private List<NodeRef> nodesToClean;
private int numErrors;
private static final int REMOVAL_WAIT_TIME_MS = 5000;
/**
*
* @param nodeService
* @param transactionService
* @param deleteBatchCount
*/
public Cleaner(NodeService nodeService, TransactionService transactionService, int deleteBatchCount)
{
this.nodeService = nodeService;
this.transactionService = transactionService;
this.deleteBatchCount = deleteBatchCount;
}
/**
*
* It deletes the {@link java.util.List List} of {@link org.alfresco.service.cmr.repository.NodeRef NodeRef}
* received as argument.
*
* @param nodes
*
* return The number of deleted nodes
*/
private int deleteNodes(List<NodeRef> nodes)
{
AtomicInteger deletedNodes = new AtomicInteger();
for (NodeRef nodeRef : nodes)
{
// create a new transaction for each deletion so the transactions are smaller and the progress of the
// cleaner is not lost in case of any problems encountered during the job execution
AuthenticationUtil.runAsSystem(() -> {
RetryingTransactionCallback<Void> txnWork = () -> {
try
{
nodeService.deleteNode(nodeRef);
}
catch (InvalidNodeRefException inre)
{
numErrors++;
}
deletedNodes.getAndIncrement();
// Waiting REMOVAL_WAIT_TIME_MS seconds for next deletion so we don't need to have many nodes on the trash can
Thread.sleep(REMOVAL_WAIT_TIME_MS);
return null;
};
return transactionService.getRetryingTransactionHelper().doInTransaction(txnWork, false, true);
});
}
return deletedNodes.get();
}
/**
*
* It returns the {@link java.util.List List} of {@link org.alfresco.service.cmr.repository.NodeRef NodeRef} of the
* archive store set to be deleted according to configuration for <b>deleteBatchCount</b>.
*
* @return
*/
private List<NodeRef> getBatchToDelete()
{
return getChildAssocs().stream().map(ChildAssociationRef::getChildRef).collect(Collectors.toList());
}
/**
*
* It will return the first {@link #deleteBatchCount}
* {@link org.alfresco.service.cmr.repository.ChildAssociationRef}s of type {@link ContentModel}.ASSOC_CHILDREN from
* the archive store set.
*
* @return
*/
private List<ChildAssociationRef> getChildAssocs()
{
StoreRef archiveStore = new StoreRef(ARCHIVE_STORE_URL);
NodeRef archiveRoot = nodeService.getRootNode(archiveStore);
return nodeService.getChildAssocs(archiveRoot, ContentModel.ASSOC_CHILDREN, RegexQNamePattern.MATCH_ALL, deleteBatchCount,
false);
}
/**
*
* The method that will clean the specified <b>archiveStoreUrl</b> to the limits defined by the values set for
* <b>deleteBatchCount</b>.
*/
public void clean()
{
LOGGER.info("Running TestCleaner");
// Retrieve in a new read-only transaction the list of nodes to be deleted by the TestCleaner
AuthenticationUtil.runAsSystem(() -> {
RetryingTransactionCallback<Void> txnWork = () -> {
nodesToClean = getBatchToDelete();
LOGGER.info(String.format("Number of nodes to delete: %s", nodesToClean.size()));
return null;
};
return transactionService.getRetryingTransactionHelper().doInTransaction(txnWork, true, true);
});
int deletedNodes = deleteNodes(nodesToClean);
LOGGER.info("TestCleaner finished. Number of deleted nodes: {}", deletedNodes);
}
public boolean hasErrors()
{
return numErrors > 0;
}
}

View File

@@ -0,0 +1,45 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2023 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.schedule;
import org.quartz.JobExecutionContext;
/**
* Test job that will execute {@link Cleaner}
*
* @author Tiago Salvado
*
* @see AbstractScheduledLockedJob
*/
public class CleanerJob extends AbstractScheduledLockedJob
{
@Override
public void executeJob(JobExecutionContext jobContext)
{
Cleaner testCleaner = (Cleaner) jobContext.getJobDetail().getJobDataMap().get("testCleaner");
testCleaner.clean();
}
}

View File

@@ -0,0 +1,36 @@
<?xml version='1.0' encoding='UTF-8'?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="testCleaner" class="org.alfresco.schedule.Cleaner">
<constructor-arg ref="nodeService" />
<constructor-arg ref="transactionService" />
<constructor-arg value="100" />
</bean>
<bean id="testSchedulerAccessor" class="org.springframework.scheduling.quartz.SchedulerAccessorBean">
<property name="scheduler" ref="schedulerFactory"/>
<property name="triggers">
<list>
<ref bean="testCleanerTrigger"/>
</list>
</property>
</bean>
<bean id="testCleanerTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="cronExpression" value="* * * * * ? 2099"/>
<property name="jobDetail" ref="testCleanerJobDetail"/>
</bean>
<bean id="testCleanerJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="jobClass" value="org.alfresco.schedule.CleanerJob"/>
<property name="jobDataAsMap">
<map>
<entry key="testCleaner" value-ref="testCleaner" />
<entry key="jobLockService" value-ref="jobLockService" />
</map>
</property>
</bean>
</beans>