package org.alfresco.repo.activities.feed; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.alfresco.model.ContentModel; import org.alfresco.query.PagingRequest; import org.alfresco.query.PagingResults; import org.alfresco.repo.admin.SysAdminParams; import org.alfresco.repo.batch.BatchProcessWorkProvider; import org.alfresco.repo.batch.BatchProcessor; import org.alfresco.repo.dictionary.RepositoryLocation; import org.alfresco.repo.lock.JobLockService; import org.alfresco.repo.lock.LockAcquisitionException; import org.alfresco.repo.security.authentication.AuthenticationUtil; import org.alfresco.repo.transaction.RetryingTransactionHelper; import org.alfresco.service.cmr.admin.RepoAdminService; import org.alfresco.service.cmr.model.FileFolderService; import org.alfresco.service.cmr.repository.InvalidNodeRefException; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeService; import org.alfresco.service.cmr.repository.StoreRef; import org.alfresco.service.cmr.search.SearchService; import org.alfresco.service.cmr.security.PersonService; import org.alfresco.service.cmr.security.PersonService.PersonInfo; import org.alfresco.service.namespace.NamespaceService; import org.alfresco.service.namespace.QName; import org.alfresco.service.transaction.TransactionService; import org.alfresco.util.ModelUtil; import org.alfresco.util.Pair; import org.alfresco.util.PropertyCheck; import org.alfresco.util.UrlUtil; import org.alfresco.util.VmShutdownListener; import org.alfresco.util.VmShutdownListener.VmShutdownException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.extensions.surf.util.I18NUtil; /** * Implementation of the Activity Feed Notifier component * * Note: currently implemented to email activities stored in JSON format * * @since 4.0 */ public class FeedNotifierImpl implements FeedNotifier, ApplicationContextAware { protected static Log logger = LogFactory.getLog(FeedNotifier.class); private static final QName LOCK_QNAME = QName.createQName(NamespaceService.SYSTEM_MODEL_1_0_URI, "ActivityFeedNotifier"); private static final long LOCK_TTL = 30000L; private static VmShutdownListener vmShutdownListener = new VmShutdownListener(FeedNotifierImpl.class.getName()); private static final String MSG_EMAIL_SUBJECT = "activities.feed.notifier.email.subject"; private NamespaceService namespaceService; private FileFolderService fileFolderService; private SearchService searchService; private PersonService personService; private NodeService nodeService; private JobLockService jobLockService; private TransactionService transactionService; private SysAdminParams sysAdminParams; private RepoAdminService repoAdminService; private UserNotifier userNotifier; private ApplicationContext applicationContext; private RepositoryLocation feedEmailTemplateLocation; private int numThreads = 4; private int batchSize = 200; public void setNumThreads(int numThreads) { this.numThreads = numThreads; } public void setBatchSize(int batchSize) { this.batchSize = batchSize; } public void setUserNotifier(UserNotifier userNotifier) { this.userNotifier = userNotifier; } public void setFileFolderService(FileFolderService fileFolderService) { this.fileFolderService = fileFolderService; } public void setSearchService(SearchService searchService) { this.searchService = searchService; } public void setNamespaceService(NamespaceService namespaceService) { this.namespaceService = namespaceService; } public void setPersonService(PersonService personService) { this.personService = personService; } public void setNodeService(NodeService nodeService) { this.nodeService = nodeService; } public void setJobLockService(JobLockService jobLockService) { this.jobLockService = jobLockService; } public void setTransactionService(TransactionService transactionService) { this.transactionService = transactionService; } public void setSysAdminParams(SysAdminParams sysAdminParams) { this.sysAdminParams = sysAdminParams; } public void setRepoAdminService(RepoAdminService repoAdminService) { this.repoAdminService = repoAdminService; } /** * Perform basic checks to ensure that the necessary dependencies were injected. */ protected void checkProperties() { PropertyCheck.mandatory(this, "personService", personService); PropertyCheck.mandatory(this, "nodeService", nodeService); PropertyCheck.mandatory(this, "jobLockService", jobLockService); PropertyCheck.mandatory(this, "transactionService", transactionService); PropertyCheck.mandatory(this, "sysAdminParams", sysAdminParams); } public void execute(int repeatIntervalMins) { checkProperties(); // Bypass if the system is in read-only mode if (transactionService.isReadOnly()) { if (logger.isDebugEnabled()) { logger.debug("Activities email notification bypassed; the system is read-only"); } return; } String lockToken = getLock(LOCK_TTL); try { if (logger.isTraceEnabled()) { logger.trace("Activities email notification started"); } executeInternal(lockToken, repeatIntervalMins); // Done if (logger.isTraceEnabled()) { logger.trace("Activities email notification completed"); } } catch (LockAcquisitionException e) { // Job being done by another process if (logger.isDebugEnabled()) { logger.debug("Activities email notification already underway"); } } catch (VmShutdownException e) { // Aborted if (logger.isDebugEnabled()) { logger.debug("Activities email notification aborted"); } } finally { releaseLock(lockToken); } } public void setFeedEmailTemplateLocation(RepositoryLocation feedEmailTemplateLocation) { this.feedEmailTemplateLocation = feedEmailTemplateLocation; } private NodeRef getEmailTemplateRef() { StoreRef store = feedEmailTemplateLocation.getStoreRef(); String xpath = feedEmailTemplateLocation.getPath(); if (! feedEmailTemplateLocation.getQueryLanguage().equals(SearchService.LANGUAGE_XPATH)) { logger.warn("Cannot find the activities email template - repository location query language is not 'xpath': "+feedEmailTemplateLocation.getQueryLanguage()); return null; } List nodeRefs = searchService.selectNodes(nodeService.getRootNode(store), xpath, null, namespaceService, false); if (nodeRefs.size() != 1) { logger.warn("Cannot find the activities email template: "+xpath); return null; } return fileFolderService.getLocalizedSibling(nodeRefs.get(0)); } private void executeInternal(final String lockToken, final int repeatIntervalMins) { final NodeRef emailTemplateRef = getEmailTemplateRef(); if (emailTemplateRef == null) { return; } final String shareUrl = UrlUtil.getShareUrl(sysAdminParams); if (logger.isDebugEnabled()) { logger.debug("Share URL configured as: "+shareUrl); } final AtomicInteger userCnt = new AtomicInteger(0); final AtomicInteger feedEntryCnt = new AtomicInteger(0); long startTime = System.currentTimeMillis(); // local cache for this execution final Map siteNames = new ConcurrentHashMap(10); try { final String subjectText = buildSubjectText(startTime); final String currentUser = AuthenticationUtil.getFullyAuthenticatedUser(); // process the feeds using the batch processor {@link BatchProcessor} BatchProcessor.BatchProcessWorker worker = new BatchProcessor.BatchProcessWorker() { public String getIdentifier(final PersonInfo person) { StringBuilder sb = new StringBuilder("Person "); sb.append(person.getUserName()); return sb.toString(); } public void beforeProcess() throws Throwable { AuthenticationUtil.setRunAsUser(currentUser); refreshLock(lockToken, batchSize * 500L); } public void afterProcess() throws Throwable { } public void process(final PersonInfo person) throws Throwable { final NodeRef personNodeRef = person.getNodeRef(); try { Pair result = userNotifier.notifyUser(personNodeRef, subjectText, siteNames, shareUrl, repeatIntervalMins, emailTemplateRef); if (result != null) { int entryCnt = result.getFirst(); final long maxFeedId = result.getSecond(); Long currentMaxFeedId = (Long)nodeService.getProperty(personNodeRef, ContentModel.PROP_EMAIL_FEED_ID); if ((currentMaxFeedId == null) || (currentMaxFeedId < maxFeedId)) { nodeService.setProperty(personNodeRef, ContentModel.PROP_EMAIL_FEED_ID, maxFeedId); } userCnt.incrementAndGet(); feedEntryCnt.addAndGet(entryCnt); } } catch (InvalidNodeRefException inre) { // skip this person - eg. no longer exists ? logger.warn("Skip feed notification for user ("+personNodeRef+"): " + inre.getMessage()); } } }; // grab people for the batch processor in chunks of size batchSize BatchProcessWorkProvider provider = new BatchProcessWorkProvider() { private int skip = 0; private int maxItems = batchSize; @Override public int getTotalEstimatedWorkSize() { return personService.countPeople(); } @Override public Collection getNextWork() { PagingResults people = personService.getPeople(null, true, null, new PagingRequest(skip, maxItems)); skip += maxItems; return people.getPage(); } }; final RetryingTransactionHelper txHelper = transactionService.getRetryingTransactionHelper(); txHelper.setMaxRetries(0); new BatchProcessor( "FeedNotifier", txHelper, provider, numThreads, batchSize, applicationContext, logger, 100).process(worker, true); } catch (Throwable e) { // If the VM is shutting down, then ignore if (vmShutdownListener.isVmShuttingDown()) { // Ignore } else { logger.error("Exception during notification of feeds", e); } } finally { int count = userCnt.get(); int entryCount = feedEntryCnt.get(); // assume sends are synchronous - hence bump up to last max feed id if (count > 0) { if (logger.isInfoEnabled()) { // TODO i18n of info message StringBuilder sb = new StringBuilder(); sb.append("Notified ").append(userCnt).append(" user").append(count != 1 ? "s" : ""); sb.append(" of ").append(feedEntryCnt).append(" activity feed entr").append(entryCount != 1 ? "ies" : "y"); sb.append(" (in ").append(System.currentTimeMillis()-startTime).append(" msecs)"); logger.info(sb.toString()); } } else { if (logger.isTraceEnabled()) { logger.trace("Nothing to send since no new user activities found"); } } } } protected String buildSubjectText(long currentTime) { return I18NUtil.getMessage(MSG_EMAIL_SUBJECT, ModelUtil.getProductName(repoAdminService)); } private String getLock(long time) { try { return jobLockService.getLock(LOCK_QNAME, time); } catch (LockAcquisitionException e) { return null; } } protected void refreshLock(String lockToken, long time) { if (lockToken == null) { throw new IllegalArgumentException("Must provide existing lockToken"); } jobLockService.refreshLock(lockToken, LOCK_QNAME, time); } /** * Release the lock after the job completes */ private void releaseLock(String lockToken) { if (lockToken == null) { throw new IllegalArgumentException("Must provide existing lockToken"); } jobLockService.releaseLock(lockToken, LOCK_QNAME); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }