diff --git a/config/alfresco/subsystems/ActivitiesFeed/default/activities-feed-context.xml b/config/alfresco/subsystems/ActivitiesFeed/default/activities-feed-context.xml index 2ecc34bb01..17e1d62dbc 100644 --- a/config/alfresco/subsystems/ActivitiesFeed/default/activities-feed-context.xml +++ b/config/alfresco/subsystems/ActivitiesFeed/default/activities-feed-context.xml @@ -90,6 +90,8 @@ + + diff --git a/config/alfresco/subsystems/ActivitiesFeed/default/activities-jobs.properties b/config/alfresco/subsystems/ActivitiesFeed/default/activities-jobs.properties index c18bd217e0..6081aabf53 100644 --- a/config/alfresco/subsystems/ActivitiesFeed/default/activities-jobs.properties +++ b/config/alfresco/subsystems/ActivitiesFeed/default/activities-jobs.properties @@ -21,6 +21,8 @@ activities.feed.generator.startDelayMins=${system.cronJob.startDelayMinutes} activities.feed.generator.cronExpression=0/30 * * * * ? activities.feed.generator.maxItemsPerCycle=100 activities.feed.generator.enabled=true +activities.feed.generator.batchSize=1000 +activities.feed.generator.numThreads=4 # activities feed cleaner activities.feed.cleaner.startDelayMins=${system.cronJob.startDelayMinutes} diff --git a/source/java/org/alfresco/repo/activities/feed/AbstractFeedGenerator.java b/source/java/org/alfresco/repo/activities/feed/AbstractFeedGenerator.java index d157bac9b3..39f586c9e9 100644 --- a/source/java/org/alfresco/repo/activities/feed/AbstractFeedGenerator.java +++ b/source/java/org/alfresco/repo/activities/feed/AbstractFeedGenerator.java @@ -122,6 +122,11 @@ public abstract class AbstractFeedGenerator implements FeedGenerator this.sysAdminParams = sysAdminParams; } + public TransactionService getTransactionService() + { + return transactionService; + } + public void setTransactionService(TransactionService transactionService) { this.transactionService = transactionService; diff --git a/source/java/org/alfresco/repo/activities/feed/local/LocalFeedGenerator.java b/source/java/org/alfresco/repo/activities/feed/local/LocalFeedGenerator.java index cb43b4dd6c..cd01f38fa7 100644 --- a/source/java/org/alfresco/repo/activities/feed/local/LocalFeedGenerator.java +++ b/source/java/org/alfresco/repo/activities/feed/local/LocalFeedGenerator.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2005-2011 Alfresco Software Limited. + * Copyright (C) 2005-2014 Alfresco Software Limited. * * This file is part of Alfresco * @@ -18,14 +18,21 @@ */ package org.alfresco.repo.activities.feed.local; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import org.alfresco.repo.activities.feed.AbstractFeedGenerator; -import org.alfresco.repo.activities.feed.FeedGridJob; import org.alfresco.repo.activities.feed.FeedTaskProcessor; import org.alfresco.repo.activities.feed.JobSettings; +import org.alfresco.repo.activities.feed.RepoCtx; +import org.alfresco.repo.batch.BatchProcessWorkProvider; +import org.alfresco.repo.batch.BatchProcessor; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.security.authentication.AuthenticationUtil.RunAsWork; +import org.alfresco.repo.transaction.RetryingTransactionHelper; +import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,12 +44,25 @@ public class LocalFeedGenerator extends AbstractFeedGenerator private static Log logger = LogFactory.getLog(LocalFeedGenerator.class); private FeedTaskProcessor feedTaskProcessor; + + private int batchSize = 1000; + private int numThreads = 4; public void setFeedTaskProcessor(FeedTaskProcessor feedTaskProcessor) { this.feedTaskProcessor = feedTaskProcessor; } + public void setBatchSize(int batchSize) + { + this.batchSize = batchSize; + } + + public void setNumThreads(int numThreads) + { + this.numThreads = numThreads; + } + @Override public int getEstimatedGridSize() { @@ -56,59 +76,124 @@ public class LocalFeedGenerator extends AbstractFeedGenerator protected boolean generate() throws Exception { - Long maxSequence = getPostDaoService().getMaxActivitySeq(); - Integer maxNodeHash = getPostDaoService().getMaxNodeHash(); - - String gridName = "local"; - - if ((maxSequence != null) && (maxNodeHash != null)) + final Long maxSequence = getPostDaoService().getMaxActivitySeq(); + final Long minSequence = getPostDaoService().getMinActivitySeq(); + final Integer maxNodeHash = getPostDaoService().getMaxNodeHash(); + + if ((maxSequence == null) || (minSequence == null) || (maxNodeHash == null)) { - if (logger.isDebugEnabled()) - { - logger.debug(">>> Execute job cycle: " + gridName + " (maxSeq: " + maxSequence + ")"); - } - - long startTime = System.currentTimeMillis(); - - // TODO ... or push this upto to job scheduler ... ? - AuthenticationUtil.runAs(new RunAsWork() - { - public Object doWork() - { - getWebScriptsCtx().setTicket(getAuthenticationService().getCurrentTicket()); - return null; - } - }, AuthenticationUtil.getSystemUserName()); // need web scripts to support System-level authentication ... see RepositoryContainer ! - - JobSettings js = new JobSettings(); - js.setMaxSeq(maxSequence); - js.setJobTaskNode(maxNodeHash); - js.setWebScriptsCtx(getWebScriptsCtx()); - js.setMaxItemsPerCycle(getMaxItemsPerCycle()); - - LocalFeedTaskSplitter splitter = new LocalFeedTaskSplitter(); - splitter.setFeedTaskProcessor(feedTaskProcessor); - - Collection jobs = splitter.split(getEstimatedGridSize(), js); - - for (FeedGridJob job : jobs) - { - job.execute(); - } - - if (logger.isDebugEnabled()) - { - logger.debug(">>> Finish job cycle: " + gridName + " (in " + (System.currentTimeMillis() - startTime) + " msecs)"); - } - return true; - } - else - { - if (logger.isDebugEnabled()) - { - logger.debug(">>> No work to be done for this job cycle: " + gridName); - } return false; } + + // TODO ... or push this upto to job scheduler ... ? + AuthenticationUtil.runAs(new RunAsWork() + { + public Object doWork() + { + getWebScriptsCtx().setTicket(getAuthenticationService().getCurrentTicket()); + return null; + } + }, AuthenticationUtil.getSystemUserName()); // need web scripts to support System-level authentication ... see RepositoryContainer ! + + // process the activity posts using the batch processor {@link BatchProcessor} + BatchProcessor.BatchProcessWorker worker = new BatchProcessor.BatchProcessWorker() + { + @Override + public String getIdentifier(final JobSettings js) + { + // TODO + StringBuilder sb = new StringBuilder("JobSettings "); + sb.append(js); + return sb.toString(); + } + + @Override + public void beforeProcess() throws Throwable + { + } + + @Override + public void afterProcess() throws Throwable + { + } + + @Override + public void process(final JobSettings js) throws Throwable + { + final RetryingTransactionHelper txHelper = getTransactionService().getRetryingTransactionHelper(); + txHelper.setMaxRetries(0); + + txHelper.doInTransaction(new RetryingTransactionCallback() + { + public Void execute() throws Throwable + { + int jobTaskNode = js.getJobTaskNode(); + long minSeq = js.getMinSeq(); + long maxSeq = js.getMaxSeq(); + RepoCtx webScriptsCtx = js.getWebScriptsCtx(); + + // FeedTaskProcessor takes JobSettings parameters instead collection of ActivityPost. FeedTaskProcessor can be refactored. + feedTaskProcessor.process(jobTaskNode , minSeq , maxSeq , webScriptsCtx ); + return null; + } + }, false, true); + } + }; + + // provides a JobSettings object + BatchProcessWorkProvider provider = new BatchProcessWorkProvider() + { + private Long skip = minSequence; + private boolean hasMore = true; + + @Override + public int getTotalEstimatedWorkSize() + { + long size = maxSequence - minSequence + 1; + long remain = size % batchSize; + long workSize = (remain == 0) ? (size / batchSize) : (size / batchSize + 1); + return (int) workSize; + } + + @Override + public Collection getNextWork() + { + if (!hasMore) + { + return Collections.emptyList(); + } + + JobSettings js = new JobSettings(); + js.setMinSeq(skip); + js.setMaxSeq(skip + batchSize - 1); + js.setJobTaskNode(maxNodeHash); + js.setWebScriptsCtx(getWebScriptsCtx()); + + skip += batchSize; + hasMore = skip > maxSequence ? false : true; + + // One JobSettings object will be returned. Because FeedTaskProcessor fetches list activity posts by itself before processing. + List result = new ArrayList(1); + result.add(js); + + return result; + } + }; + + final RetryingTransactionHelper txHelper = getTransactionService().getRetryingTransactionHelper(); + txHelper.setMaxRetries(0); + + // batchSize and loggingInterval parameters are equal 1 because provider always will provide collection with one JobSettings object. + // FeedTaskProcessor fetches list activity posts by itself before processing. It needs only JobSettings parameters. FeedTaskProcessor can be refactored. + new BatchProcessor( + "LocalFeedGenerator", + txHelper, + provider, + numThreads, 1, + null, + logger, 1).process(worker, true); + + return true; } + } diff --git a/source/test-java/org/alfresco/repo/subscriptions/SubscriptionServiceActivitiesTest.java b/source/test-java/org/alfresco/repo/subscriptions/SubscriptionServiceActivitiesTest.java index a001d9683a..2ff48fe41d 100644 --- a/source/test-java/org/alfresco/repo/subscriptions/SubscriptionServiceActivitiesTest.java +++ b/source/test-java/org/alfresco/repo/subscriptions/SubscriptionServiceActivitiesTest.java @@ -261,11 +261,18 @@ public class SubscriptionServiceActivitiesTest feed = activityService.getUserFeedEntries(USER_TWO_NAME, null, false, false, null, null); assertEquals(USER_TWO_NAME + " had wrong feed size.", 0, feed.size()); - - // userId1 + 5, userId2 + 0 - generateFeed(); - - feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null); + return null; + } + }); + + + // userId1 + 5, userId2 + 0 + generateFeed(); + doWorkAs(ADMIN, new RetryingTransactionCallback() + { + @Override public Void execute() throws Throwable + { + List feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null); log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed)); assertEquals(USER_ONE_NAME + " had wrong feed size", 5, feed.size()); @@ -299,13 +306,12 @@ public class SubscriptionServiceActivitiesTest log.debug("And " + USER_TWO_NAME + " is now following " + USER_ONE_NAME); + // userId1 + 5, userId2 + 2 + generateFeed(); doWorkAs(ADMIN, new RetryingTransactionCallback() { @Override public Void execute() throws Throwable { - // userId1 + 5, userId2 + 2 - generateFeed(); - List feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null); log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed)); assertEquals(USER_ONE_NAME + "'s feed was wrong size", 7, feed.size()); @@ -333,13 +339,12 @@ public class SubscriptionServiceActivitiesTest log.debug(USER_ONE_NAME + " added some content across the sites."); + // userId1 + 5, userId2 + 1 + generateFeed(); doWorkAs(ADMIN, new RetryingTransactionCallback() { @Override public Void execute() throws Throwable { - // userId1 + 5, userId2 + 1 - generateFeed(); - List feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null); log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed)); assertEquals(USER_ONE_NAME + "'s feed was wrong size", 12, feed.size()); @@ -353,11 +358,18 @@ public class SubscriptionServiceActivitiesTest siteService.setMembership(modSite2.getShortName(), USER_TWO_NAME, SiteModel.SITE_MANAGER); log.debug(USER_TWO_NAME + "'s role changed on some sites."); - - // userId1 + 2, userId2 + 2 - generateFeed(); - - feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null); + return null; + } + }); + + + // userId1 + 2, userId2 + 2 + generateFeed(); + doWorkAs(ADMIN, new RetryingTransactionCallback() + { + @Override public Void execute() throws Throwable + { + List feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null); log.debug(USER_ONE_NAME + "'s feed: " + prettyJson(feed)); assertEquals(USER_ONE_NAME + "'s feed was wrong size", 16, feed.size()); @@ -385,14 +397,12 @@ public class SubscriptionServiceActivitiesTest log.debug(USER_ONE_NAME + " has added some more content..."); - + // userId1 + 5, userId2 + 3 + generateFeed(); doWorkAs(ADMIN, new RetryingTransactionCallback() { @Override public Void execute() throws Throwable { - // userId1 + 5, userId2 + 3 - generateFeed(); - List feed = activityService.getUserFeedEntries(USER_ONE_NAME, null, false, false, null, null); assertEquals("User's feed was wrong size", 21, feed.size());