mirror of
https://github.com/Alfresco/alfresco-community-repo.git
synced 2025-08-07 17:49:17 +00:00
ACE-2780: Merged HEAD-BUG-FIX (5.0/Cloud) to HEAD (5.0/Cloud)
85504: Reverse merged HEAD-BUG-FIX (5.0/Cloud) << Build failure was fixed >> 84534: Reverse merged HEAD-BUG-FIX (5.0/Cloud) << Causes build problem with Cloud >> 84123: Merged V4.2-BUG-FIX (4.2.4) to HEAD-BUG-FIX (5.0/Cloud) 83401: Merged V4.1-BUG-FIX (4.1.10) to V4.2-BUG-FIX (4.2.4) 83366: Merged DEV to V4.1-BUG-FIX (4.1.10) 82671 : MNT-12278 : Activity processing is single-threaded - Refactoring for multithreading. 83343 : MNT-12278 : Activity processing is single-threaded 84124: Merged V4.2-BUG-FIX (4.2.4) to HEAD-BUG-FIX (5.0/Cloud) 83678: MNT-12278 : Activity processing is single-threaded - Fixed build failure git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@85531 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
@@ -90,6 +90,8 @@
|
||||
|
||||
<bean id="feedGenerator" class="org.alfresco.repo.activities.feed.local.LocalFeedGenerator" parent="baseFeedGenerator">
|
||||
<property name="feedTaskProcessor" ref="FeedTaskProcessor"/>
|
||||
<property name="batchSize" value="${activities.feed.generator.batchSize}"/>
|
||||
<property name="numThreads" value="${activities.feed.generator.numThreads}"/>
|
||||
</bean>
|
||||
|
||||
<bean id="feedTaskProcessor" class="org.alfresco.repo.activities.feed.local.LocalFeedTaskProcessor">
|
||||
|
@@ -21,6 +21,8 @@ activities.feed.generator.startDelayMins=${system.cronJob.startDelayMinutes}
|
||||
activities.feed.generator.cronExpression=0/30 * * * * ?
|
||||
activities.feed.generator.maxItemsPerCycle=100
|
||||
activities.feed.generator.enabled=true
|
||||
activities.feed.generator.batchSize=1000
|
||||
activities.feed.generator.numThreads=4
|
||||
|
||||
# activities feed cleaner
|
||||
activities.feed.cleaner.startDelayMins=${system.cronJob.startDelayMinutes}
|
||||
|
@@ -122,6 +122,11 @@ public abstract class AbstractFeedGenerator implements FeedGenerator
|
||||
this.sysAdminParams = sysAdminParams;
|
||||
}
|
||||
|
||||
public TransactionService getTransactionService()
|
||||
{
|
||||
return transactionService;
|
||||
}
|
||||
|
||||
public void setTransactionService(TransactionService transactionService)
|
||||
{
|
||||
this.transactionService = transactionService;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (C) 2005-2011 Alfresco Software Limited.
|
||||
* Copyright (C) 2005-2014 Alfresco Software Limited.
|
||||
*
|
||||
* This file is part of Alfresco
|
||||
*
|
||||
@@ -18,14 +18,21 @@
|
||||
*/
|
||||
package org.alfresco.repo.activities.feed.local;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.alfresco.repo.activities.feed.AbstractFeedGenerator;
|
||||
import org.alfresco.repo.activities.feed.FeedGridJob;
|
||||
import org.alfresco.repo.activities.feed.FeedTaskProcessor;
|
||||
import org.alfresco.repo.activities.feed.JobSettings;
|
||||
import org.alfresco.repo.activities.feed.RepoCtx;
|
||||
import org.alfresco.repo.batch.BatchProcessWorkProvider;
|
||||
import org.alfresco.repo.batch.BatchProcessor;
|
||||
import org.alfresco.repo.security.authentication.AuthenticationUtil;
|
||||
import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork;
|
||||
import org.alfresco.repo.transaction.RetryingTransactionHelper;
|
||||
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
@@ -37,12 +44,25 @@ public class LocalFeedGenerator extends AbstractFeedGenerator
|
||||
private static Log logger = LogFactory.getLog(LocalFeedGenerator.class);
|
||||
|
||||
private FeedTaskProcessor feedTaskProcessor;
|
||||
|
||||
private int batchSize = 1000;
|
||||
private int numThreads = 4;
|
||||
|
||||
public void setFeedTaskProcessor(FeedTaskProcessor feedTaskProcessor)
|
||||
{
|
||||
this.feedTaskProcessor = feedTaskProcessor;
|
||||
}
|
||||
|
||||
public void setBatchSize(int batchSize)
|
||||
{
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
public void setNumThreads(int numThreads)
|
||||
{
|
||||
this.numThreads = numThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEstimatedGridSize()
|
||||
{
|
||||
@@ -56,59 +76,124 @@ public class LocalFeedGenerator extends AbstractFeedGenerator
|
||||
|
||||
protected boolean generate() throws Exception
|
||||
{
|
||||
Long maxSequence = getPostDaoService().getMaxActivitySeq();
|
||||
Integer maxNodeHash = getPostDaoService().getMaxNodeHash();
|
||||
|
||||
String gridName = "local";
|
||||
|
||||
if ((maxSequence != null) && (maxNodeHash != null))
|
||||
final Long maxSequence = getPostDaoService().getMaxActivitySeq();
|
||||
final Long minSequence = getPostDaoService().getMinActivitySeq();
|
||||
final Integer maxNodeHash = getPostDaoService().getMaxNodeHash();
|
||||
|
||||
if ((maxSequence == null) || (minSequence == null) || (maxNodeHash == null))
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug(">>> Execute job cycle: " + gridName + " (maxSeq: " + maxSequence + ")");
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// TODO ... or push this upto to job scheduler ... ?
|
||||
AuthenticationUtil.runAs(new RunAsWork<Object>()
|
||||
{
|
||||
public Object doWork()
|
||||
{
|
||||
getWebScriptsCtx().setTicket(getAuthenticationService().getCurrentTicket());
|
||||
return null;
|
||||
}
|
||||
}, AuthenticationUtil.getSystemUserName()); // need web scripts to support System-level authentication ... see RepositoryContainer !
|
||||
|
||||
JobSettings js = new JobSettings();
|
||||
js.setMaxSeq(maxSequence);
|
||||
js.setJobTaskNode(maxNodeHash);
|
||||
js.setWebScriptsCtx(getWebScriptsCtx());
|
||||
js.setMaxItemsPerCycle(getMaxItemsPerCycle());
|
||||
|
||||
LocalFeedTaskSplitter splitter = new LocalFeedTaskSplitter();
|
||||
splitter.setFeedTaskProcessor(feedTaskProcessor);
|
||||
|
||||
Collection<FeedGridJob> jobs = splitter.split(getEstimatedGridSize(), js);
|
||||
|
||||
for (FeedGridJob job : jobs)
|
||||
{
|
||||
job.execute();
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug(">>> Finish job cycle: " + gridName + " (in " + (System.currentTimeMillis() - startTime) + " msecs)");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug(">>> No work to be done for this job cycle: " + gridName);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO ... or push this upto to job scheduler ... ?
|
||||
AuthenticationUtil.runAs(new RunAsWork<Object>()
|
||||
{
|
||||
public Object doWork()
|
||||
{
|
||||
getWebScriptsCtx().setTicket(getAuthenticationService().getCurrentTicket());
|
||||
return null;
|
||||
}
|
||||
}, AuthenticationUtil.getSystemUserName()); // need web scripts to support System-level authentication ... see RepositoryContainer !
|
||||
|
||||
// process the activity posts using the batch processor {@link BatchProcessor}
|
||||
BatchProcessor.BatchProcessWorker<JobSettings> worker = new BatchProcessor.BatchProcessWorker<JobSettings>()
|
||||
{
|
||||
@Override
|
||||
public String getIdentifier(final JobSettings js)
|
||||
{
|
||||
// TODO
|
||||
StringBuilder sb = new StringBuilder("JobSettings ");
|
||||
sb.append(js);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeProcess() throws Throwable
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterProcess() throws Throwable
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final JobSettings js) throws Throwable
|
||||
{
|
||||
final RetryingTransactionHelper txHelper = getTransactionService().getRetryingTransactionHelper();
|
||||
txHelper.setMaxRetries(0);
|
||||
|
||||
txHelper.doInTransaction(new RetryingTransactionCallback<Void>()
|
||||
{
|
||||
public Void execute() throws Throwable
|
||||
{
|
||||
int jobTaskNode = js.getJobTaskNode();
|
||||
long minSeq = js.getMinSeq();
|
||||
long maxSeq = js.getMaxSeq();
|
||||
RepoCtx webScriptsCtx = js.getWebScriptsCtx();
|
||||
|
||||
// FeedTaskProcessor takes JobSettings parameters instead collection of ActivityPost. FeedTaskProcessor can be refactored.
|
||||
feedTaskProcessor.process(jobTaskNode , minSeq , maxSeq , webScriptsCtx );
|
||||
return null;
|
||||
}
|
||||
}, false, true);
|
||||
}
|
||||
};
|
||||
|
||||
// provides a JobSettings object
|
||||
BatchProcessWorkProvider<JobSettings> provider = new BatchProcessWorkProvider<JobSettings>()
|
||||
{
|
||||
private Long skip = minSequence;
|
||||
private boolean hasMore = true;
|
||||
|
||||
@Override
|
||||
public int getTotalEstimatedWorkSize()
|
||||
{
|
||||
long size = maxSequence - minSequence + 1;
|
||||
long remain = size % batchSize;
|
||||
long workSize = (remain == 0) ? (size / batchSize) : (size / batchSize + 1);
|
||||
return (int) workSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JobSettings> getNextWork()
|
||||
{
|
||||
if (!hasMore)
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
JobSettings js = new JobSettings();
|
||||
js.setMinSeq(skip);
|
||||
js.setMaxSeq(skip + batchSize - 1);
|
||||
js.setJobTaskNode(maxNodeHash);
|
||||
js.setWebScriptsCtx(getWebScriptsCtx());
|
||||
|
||||
skip += batchSize;
|
||||
hasMore = skip > maxSequence ? false : true;
|
||||
|
||||
// One JobSettings object will be returned. Because FeedTaskProcessor fetches list activity posts by itself before processing.
|
||||
List<JobSettings> result = new ArrayList<JobSettings>(1);
|
||||
result.add(js);
|
||||
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
final RetryingTransactionHelper txHelper = getTransactionService().getRetryingTransactionHelper();
|
||||
txHelper.setMaxRetries(0);
|
||||
|
||||
// batchSize and loggingInterval parameters are equal 1 because provider always will provide collection with one JobSettings object.
|
||||
// FeedTaskProcessor fetches list activity posts by itself before processing. It needs only JobSettings parameters. FeedTaskProcessor can be refactored.
|
||||
new BatchProcessor<JobSettings>(
|
||||
"LocalFeedGenerator",
|
||||
txHelper,
|
||||
provider,
|
||||
numThreads, 1,
|
||||
null,
|
||||
logger, 1).process(worker, true);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -261,11 +261,18 @@ public class SubscriptionServiceActivitiesTest
|
||||
|
||||
feed = activityService.getUserFeedEntries(USER_TWO_NAME, null, false, false, null, null);
|
||||
assertEquals(USER_TWO_NAME + " had wrong feed size.", 0, feed.size());
|
||||
|
||||
// userId1 + 5, userId2 + 0
|
||||
generateFeed();
|
||||
|
||||
feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
// userId1 + 5, userId2 + 0
|
||||
generateFeed();
|
||||
doWorkAs(ADMIN, new RetryingTransactionCallback<Void>()
|
||||
{
|
||||
@Override public Void execute() throws Throwable
|
||||
{
|
||||
List<String> feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null);
|
||||
log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed));
|
||||
assertEquals(USER_ONE_NAME + " had wrong feed size", 5, feed.size());
|
||||
|
||||
@@ -299,13 +306,12 @@ public class SubscriptionServiceActivitiesTest
|
||||
log.debug("And " + USER_TWO_NAME + " is now following " + USER_ONE_NAME);
|
||||
|
||||
|
||||
// userId1 + 5, userId2 + 2
|
||||
generateFeed();
|
||||
doWorkAs(ADMIN, new RetryingTransactionCallback<Void>()
|
||||
{
|
||||
@Override public Void execute() throws Throwable
|
||||
{
|
||||
// userId1 + 5, userId2 + 2
|
||||
generateFeed();
|
||||
|
||||
List<String> feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null);
|
||||
log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed));
|
||||
assertEquals(USER_ONE_NAME + "'s feed was wrong size", 7, feed.size());
|
||||
@@ -333,13 +339,12 @@ public class SubscriptionServiceActivitiesTest
|
||||
log.debug(USER_ONE_NAME + " added some content across the sites.");
|
||||
|
||||
|
||||
// userId1 + 5, userId2 + 1
|
||||
generateFeed();
|
||||
doWorkAs(ADMIN, new RetryingTransactionCallback<Void>()
|
||||
{
|
||||
@Override public Void execute() throws Throwable
|
||||
{
|
||||
// userId1 + 5, userId2 + 1
|
||||
generateFeed();
|
||||
|
||||
List <String> feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null);
|
||||
log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed));
|
||||
assertEquals(USER_ONE_NAME + "'s feed was wrong size", 12, feed.size());
|
||||
@@ -353,11 +358,18 @@ public class SubscriptionServiceActivitiesTest
|
||||
siteService.setMembership(modSite2.getShortName(), USER_TWO_NAME, SiteModel.SITE_MANAGER);
|
||||
|
||||
log.debug(USER_TWO_NAME + "'s role changed on some sites.");
|
||||
|
||||
// userId1 + 2, userId2 + 2
|
||||
generateFeed();
|
||||
|
||||
feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
// userId1 + 2, userId2 + 2
|
||||
generateFeed();
|
||||
doWorkAs(ADMIN, new RetryingTransactionCallback<Void>()
|
||||
{
|
||||
@Override public Void execute() throws Throwable
|
||||
{
|
||||
List <String> feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null);
|
||||
log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed));
|
||||
assertEquals(USER_ONE_NAME + "'s feed was wrong size", 16, feed.size());
|
||||
|
||||
@@ -385,14 +397,12 @@ public class SubscriptionServiceActivitiesTest
|
||||
log.debug(USER_ONE_NAME + " has added some more content...");
|
||||
|
||||
|
||||
|
||||
// userId1 + 5, userId2 + 3
|
||||
generateFeed();
|
||||
doWorkAs(ADMIN, new RetryingTransactionCallback<Void>()
|
||||
{
|
||||
@Override public Void execute() throws Throwable
|
||||
{
|
||||
// userId1 + 5, userId2 + 3
|
||||
generateFeed();
|
||||
|
||||
List<String> feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null);
|
||||
assertEquals("User's feed was wrong size", 21, feed.size());
|
||||
|
||||
|
Reference in New Issue
Block a user