/* * Copyright (C) 2005-2006 Alfresco, Inc. * * Licensed under the Mozilla Public License version 1.1 * with a permitted attribution clause. You may obtain a * copy of the License at * * http://www.alfresco.org/legal/license.txt * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific * language governing permissions and limitations under the * License. */ package org.alfresco.repo.node.index; import java.util.ArrayList; import java.util.List; import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.model.ContentModel; import org.alfresco.repo.domain.NodeStatus; import org.alfresco.repo.search.Indexer; import org.alfresco.repo.search.impl.lucene.LuceneIndexerImpl; import org.alfresco.repo.search.impl.lucene.fts.FullTextSearchIndexer; import org.alfresco.repo.transaction.TransactionUtil; import org.alfresco.repo.transaction.TransactionUtil.TransactionWork; import org.alfresco.service.cmr.repository.ChildAssociationRef; import org.alfresco.service.cmr.repository.NodeRef; import org.alfresco.service.cmr.repository.NodeService; import org.alfresco.service.cmr.repository.StoreRef; import org.alfresco.service.cmr.search.ResultSet; import org.alfresco.service.cmr.search.SearchParameters; import org.alfresco.service.cmr.search.SearchService; import org.alfresco.service.transaction.TransactionService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hibernate.CacheMode; import org.hibernate.Query; import org.hibernate.Session; import org.springframework.orm.hibernate3.HibernateCallback; import org.springframework.orm.hibernate3.support.HibernateDaoSupport; /** * Ensures that the FTS indexing picks up on any outstanding documents that * require indexing. *

* This component must be used as a singleton (one per VM) and may only be * called to reindex once. It will start a thread that processes all available * transactions and keeps checking to ensure that the index is up to date with * the latest database changes. *

* The following points are important: *

*
 * @author Derek Hulley
 */
public class FullIndexRecoveryComponent extends HibernateDaoSupport implements IndexRecovery {
    public static final String QUERY_GET_NEXT_CHANGE_TXN_IDS = "node.GetNextChangeTxnIds";
    public static final String QUERY_GET_CHANGED_NODE_STATUSES = "node.GetChangedNodeStatuses";
    public static final String QUERY_GET_DELETED_NODE_STATUSES = "node.GetDeletedNodeStatuses";
    public static final String QUERY_GET_CHANGED_NODE_STATUSES_COUNT = "node.GetChangedNodeStatusesCount";

    /** the transaction ID from which a full recovery starts */
    private static final String START_TXN_ID = "000";

    private static final Log logger = LogFactory.getLog(FullIndexRecoveryComponent.class);

    /** ensures that this process is kicked off once per VM */
    private static volatile boolean started = false;
    /**
     * The current transaction ID being processed. Written by the reindex thread and
     * read by arbitrary callers of {@link #getCurrentTransactionId()}, hence volatile.
     */
    private static volatile String currentTxnId = START_TXN_ID;

    /**
     * kept to notify the thread that it should quit. Set by the VM shutdown hook thread
     * and polled by the reindex thread, hence volatile.
     */
    private volatile boolean killThread = false;

    /** provides transactions to atomically index each missed transaction */
    private TransactionService transactionService;
    /** the component to index the node hierarchy */
    private Indexer indexer;
    /** the FTS indexer that we will prompt to pick up on any un-indexed text */
    private FullTextSearchIndexer ftsIndexer;
    /** the component providing searches of the indexed nodes */
    private SearchService searcher;
    /** the component giving direct access to node instances */
    private NodeService nodeService;
    /** set this to run the index recovery component */
    private boolean executeFullRecovery;
    /** set this on to keep checking for new transactions and never stop */
    private boolean runContinuously;
    /** set the time to wait between checking indexes */
    private long waitTime;
    /** controls how the L2 cache is used */
    private CacheMode l2CacheMode;

    /**
     * @return Returns the ID of the current (or last) transaction processed
     */
    public static String getCurrentTransactionId() {
        return currentTxnId;
    }

    /**
     * Creates the component with full recovery and continuous running switched off,
     * a wait time of 1000ms and an L2 cache mode of {@link CacheMode#REFRESH}.
     * A VM shutdown hook is registered that flags the reindex thread to stop.
     */
    public FullIndexRecoveryComponent() {
        this.killThread = false;
        this.executeFullRecovery = false;
        this.runContinuously = false;
        this.waitTime = 1000L;
        this.l2CacheMode = CacheMode.REFRESH;

        // ensure that we kill the thread when the VM is shutting down
        Runnable shutdownRunnable = new Runnable() {
            public void run() {
                killThread = true;
            }
        };
        Thread shutdownThread = new Thread(shutdownRunnable);
        Runtime.getRuntime().addShutdownHook(shutdownThread);
    }

    /**
     * @return Returns true if the component has already been started
     */
    public static boolean isStarted() {
        return started;
    }

    /**
     * @param transactionService provide transactions to index each missed transaction
     */
    public void setTransactionService(TransactionService transactionService) {
        this.transactionService = transactionService;
    }

    /**
     * @param indexer the indexer that will be used to index the nodes
     */
    public void setIndexer(Indexer indexer) {
        this.indexer = indexer;
    }

    /**
     * @param ftsIndexer the FTS background indexer
     */
    public void setFtsIndexer(FullTextSearchIndexer ftsIndexer) {
        this.ftsIndexer = ftsIndexer;
    }

    /**
     * @param searcher component providing index searches
     */
    public void setSearcher(SearchService searcher) {
        this.searcher = searcher;
    }

    /**
     * @param nodeService provides information about nodes for indexing
     */
    public void setNodeService(NodeService nodeService) {
        this.nodeService = nodeService;
    }

    /**
     * Set this to true to initiate the full index recovery.
     * <p>
     * This used to default to true but is now false. Set this
     * if the potentially long-running process of checking and fixing the
     * indexes must be started.
     *
     * @param executeFullRecovery true to start the recovery thread when {@link #reindex()} is called
     */
    public void setExecuteFullRecovery(boolean executeFullRecovery) {
        this.executeFullRecovery = executeFullRecovery;
    }

    /**
     * Set this to ensure that the process continuously checks for new transactions.
     * If not, it will permanently terminate once it catches up with the current
     * transactions.
     *
     * @param runContinuously true to never cease looking for new transactions
     */
    public void setRunContinuously(boolean runContinuously) {
        this.runContinuously = runContinuously;
    }

    /**
     * Set the time to wait between checking for new transaction changes in the database.
     * <p>
     * The value is handed directly to {@link Object#wait(long)} by the reindex thread, so
     * a value of <tt>0</tt> means "wait until notified" rather than "do not wait".
     *
     * @param waitTime the time to wait in milliseconds
     */
    public void setWaitTime(long waitTime) {
        this.waitTime = waitTime;
    }

    /**
     * Set the hibernate cache mode by name. Recognised names are
     * <tt>GET</tt>, <tt>IGNORE</tt>, <tt>NORMAL</tt>, <tt>PUT</tt> and <tt>REFRESH</tt>.
     *
     * @param l2CacheModeStr the name of the cache mode
     * @throws IllegalArgumentException if the name is <tt>null</tt> or not recognised
     *
     * @see org.hibernate.CacheMode
     */
    public void setL2CacheMode(String l2CacheModeStr) {
        // constant-first comparisons so that a null name is reported as an illegal argument
        if ("GET".equals(l2CacheModeStr)) {
            l2CacheMode = CacheMode.GET;
        } else if ("IGNORE".equals(l2CacheModeStr)) {
            l2CacheMode = CacheMode.IGNORE;
        } else if ("NORMAL".equals(l2CacheModeStr)) {
            l2CacheMode = CacheMode.NORMAL;
        } else if ("PUT".equals(l2CacheModeStr)) {
            l2CacheMode = CacheMode.PUT;
        } else if ("REFRESH".equals(l2CacheModeStr)) {
            l2CacheMode = CacheMode.REFRESH;
        } else {
            throw new IllegalArgumentException("Unrecognised Hibernate L2 cache mode: " + l2CacheModeStr);
        }
    }

    /**
     * Ensure that the index is up to date with the current state of the persistence layer.
     * <p>
     * Every existing store is first flagged with the FTS indexer, within its own transaction.
     * If full recovery has been enabled, a low-priority daemon thread is then started that
     * works through the change transactions, beginning from the start transaction ID.
     *
     * @throws AlfrescoRuntimeException if a reindex has already been started in this VM
     */
    public synchronized void reindex() {
        if (FullIndexRecoveryComponent.started) {
            throw new AlfrescoRuntimeException
                    ("Only one FullIndexRecoveryComponent may be used per VM and it may only be called once");
        }

        // ensure that we don't redo this work
        FullIndexRecoveryComponent.started = true;

        // work to mark the stores for full text reindexing
        TransactionWork ftsReindexWork = new TransactionWork() {
            public Object doWork() {
                List<StoreRef> storeRefs = nodeService.getStores();
                // reindex each store
                for (StoreRef storeRef : storeRefs) {
                    // check if the store exists
                    if (!nodeService.exists(storeRef)) {
                        // store does not exist
                        if (logger.isDebugEnabled()) {
                            logger.debug("Skipping reindex of non-existent store: " + storeRef);
                        }
                        continue;
                    }

                    // prompt FTS to reindex the store
                    ftsIndexer.requiresIndex(storeRef);
                }
                // done
                if (logger.isDebugEnabled()) {
                    logger.debug("Prompted FTS index on stores: " + storeRefs);
                }
                return null;
            }
        };
        TransactionUtil.executeInNonPropagatingUserTransaction(transactionService, ftsReindexWork);

        // start full index recovery, if necessary
        if (!this.executeFullRecovery) {
            if (logger.isDebugEnabled()) {
                logger.debug("Full index recovery is off - quitting");
            }
        } else {
            // set the state of the reindex
            FullIndexRecoveryComponent.currentTxnId = START_TXN_ID;

            // start a stateful thread that will begin processing the reindexing the transactions
            Runnable runnable = new ReindexRunner();
            Thread reindexThread = new Thread(runnable);
            // make it a daemon thread
            reindexThread.setDaemon(true);
            // it should not be a high priority
            reindexThread.setPriority(Thread.MIN_PRIORITY);
            // start it
            reindexThread.start();

            if (logger.isDebugEnabled()) {
                logger.debug("Full index recovery thread started: \n" +
                        " continuous: " + runContinuously);
            }
        }
    }

    /**
     * Stateful thread runnable that executes reindex calls.
     *
     * @see FullIndexRecoveryComponent#reindexNodes()
     *
     * @author Derek Hulley
     */
    private class ReindexRunner implements Runnable {
        /**
         * Repeatedly reindexes the next batch of transactions, pausing for the configured
         * wait time between batches, until the kill flag is set or, when not running
         * continuously, until a batch comes back empty.
         */
        public void run() {
            // keep this thread going permanently
            while (!killThread) {
                try {
                    // reindex nodes
                    List<String> txnsIndexed = FullIndexRecoveryComponent.this.reindexNodes();
                    // reindex missing content
                    // @SuppressWarnings("unused")
                    // int missingContentCount = FullIndexRecoveryComponent.this.reindexMissingContent();
                    // check if the process should terminate
                    if (txnsIndexed.size() == 0 && !runContinuously) {
                        // the thread has caught up with all the available work and should not
                        // run continuously
                        if (logger.isDebugEnabled()) {
                            logger.debug("Thread quitting - no more available indexing to do: \n" +
                                    " last txn: " + FullIndexRecoveryComponent.getCurrentTransactionId());
                        }
                        break;
                    }
                    // brief pause
                    synchronized (FullIndexRecoveryComponent.this) {
                        FullIndexRecoveryComponent.this.wait(waitTime);
                    }
                } catch (InterruptedException ignored) {
                    // ignore - the loop condition decides whether the thread carries on
                } catch (Throwable e) {
                    if (killThread) {
                        // the shutdown may have caused the exception - ignore it
                    } else {
                        // we are still a go; report it
                        logger.error("Reindex failure", e);
                    }
                }
            }
        }
    }

    /**
     * Fetches the next batch of change transaction IDs following the current one and
     * reindexes each of them in turn.
     *
     * @return Returns the transaction IDs that were checked in this pass; empty when
     *      there was nothing further to process
     */
    private List<String> reindexNodes() {
        // get a list of all transactions still requiring a check
        List<String> txnsToCheck = getNextChangeTxnIds(FullIndexRecoveryComponent.currentTxnId);

        // loop over each transaction
        for (String changeTxnId : txnsToCheck) {
            reindexNodes(changeTxnId);
        }

        // done
        return txnsToCheck;
    }

    /**
     * Reindexes changes specific to the change transaction ID.
     * <p>
     * All exceptions are absorbed (and logged); the current transaction ID is advanced
     * regardless of the outcome.
     *
     * @param changeTxnId the change transaction to reindex
     */
    private void reindexNodes(final String changeTxnId) {
        /*
         * This must execute each within its own transaction.
         * The cache size is therefore not an issue.
         */
        TransactionWork reindexWork = new TransactionWork() {
            public Object doWork() throws Exception {
                // perform the work in a Hibernate callback
                HibernateCallback callback = new ReindexCallback(changeTxnId);
                getHibernateTemplate().execute(callback);
                // done
                return null;
            }
        };
        try {
            TransactionUtil.executeInNonPropagatingUserTransaction(transactionService, reindexWork);
        } catch (Throwable e) {
            logger.error("Transaction reindex failed: \n" +
                    " txn: " + changeTxnId,
                    e);
        } finally {
            // Up the current transaction now, in case the process fails at this point.
            // This will prevent the transaction from being processed again.
            // This applies to failures as well, which should be dealt with externally
            // and having the entire process start again, e.g. such as a system reboot
            currentTxnId = changeTxnId;
        }
    }

    /**
     * Stateful inner class that implements a single reindex call for a given store
     * and transaction.
     * <p>
     * It must be called within its own transaction.
     *
     * @author Derek Hulley
     */
    private class ReindexCallback implements HibernateCallback {
        private final String changeTxnId;

        public ReindexCallback(String changeTxnId) {
            this.changeTxnId = changeTxnId;
        }

        /**
         * Changes the L2 cache usage of the callback's session.
         * <p>
         * NOTE: the per-store reindexing that used to follow is currently commented out,
         * so this callback does not index anything itself.
         *
         * @param session the session supplied by the Hibernate template
         * @return Returns <tt>null</tt> always
         *
         * @see #reindexNodes(StoreRef, String)
         */
        public Object doInHibernate(Session session) {
            // set the way the L2 cache is used, on the session this callback was given
            session.setCacheMode(l2CacheMode);

            // reindex each store
            // for (StoreRef storeRef : storeRefs)
            // {
            //     if (!nodeService.exists(storeRef))
            //     {
            //         // the store is not present
            //         continue;
            //     }
            //     // reindex for store
            //     reindexNodes(storeRef, changeTxnId);
            // }
            // done
            return null;
        }

        /**
         * Brings the index of a single store up to date with one change transaction.
         * <p>
         * Nothing is done if the index already holds an entry for the transaction, or if
         * the transaction changed no nodes in the store. Otherwise the deleted nodes
         * are removed from the index and the changed nodes are indexed afresh.
         *
         * @param storeRef the store to reindex
         * @param changeTxnId the change transaction to look for
         */
        private void reindexNodes(StoreRef storeRef, String changeTxnId) {
            // check if we need to perform this operation
            SearchParameters sp = new SearchParameters();
            sp.addStore(storeRef);

            // search for it in the index
            String query = "TX:\"" + changeTxnId + "\"";
            sp.setLanguage(SearchService.LANGUAGE_LUCENE);
            sp.setQuery(query);
            ResultSet results = null;
            try {
                results = searcher.query(sp);
                // did the index have any of these changes?
                if (results.length() > 0) {
                    // the transaction has an entry in the index - assume that it was
                    // atomically correct
                    if (logger.isDebugEnabled()) {
                        logger.debug("Transaction present in index - no indexing required: \n" +
                                " store: " + storeRef + "\n" +
                                " txn: " + changeTxnId);
                    }
                    return;
                }
            } finally {
                if (results != null) {
                    results.close();
                }
            }
            // the index has no record of this
            // were there any changes, or is it all just deletions?
            int changedCount = getChangedNodeStatusesCount(storeRef, changeTxnId);
            if (changedCount == 0) {
                // no nodes were changed in the transaction, i.e. they are only deletions
                // the index is quite right not to have any entries for the transaction
                if (logger.isDebugEnabled()) {
                    logger.debug("Transaction only has deletions - no indexing required: \n" +
                            " store: " + storeRef + "\n" +
                            " txn: " + changeTxnId);
                }
                return;
            }

            // process the deletions relevant to the txn and the store
            List<NodeStatus> deletedNodeStatuses = getDeletedNodeStatuses(storeRef, changeTxnId);
            for (NodeStatus status : deletedNodeStatuses) {
                NodeRef nodeRef = new NodeRef(storeRef, status.getKey().getGuid());
                // only the child node ref is relevant
                ChildAssociationRef assocRef = new ChildAssociationRef(
                        ContentModel.ASSOC_CHILDREN,
                        null,
                        null,
                        nodeRef);
                indexer.deleteNode(assocRef);
            }

            // process additions
            List<NodeStatus> changedNodeStatuses = getChangedNodeStatuses(storeRef, changeTxnId);
            for (NodeStatus status : changedNodeStatuses) {
                NodeRef nodeRef = new NodeRef(storeRef, status.getKey().getGuid());
                // get the primary assoc for the node
                ChildAssociationRef primaryAssocRef = nodeService.getPrimaryParent(nodeRef);
                // reindex
                indexer.createNode(primaryAssocRef);
            }

            // done
            if (logger.isDebugEnabled()) {
                logger.debug("Transaction reindexed: \n" +
                        " store: " + storeRef + "\n" +
                        " txn: " + changeTxnId + "\n" +
                        " deletions: " + deletedNodeStatuses.size() + "\n" +
                        " modifications: " + changedNodeStatuses.size());
            }
        }
    }

    /**
     * Retrieve the next transaction IDs (at most 50) following the given transaction ID,
     * as selected by the <tt>node.GetNextChangeTxnIds</tt> named query.
     *
     * @param currentTxnId the transaction ID bound to the query's <tt>currentTxnId</tt>
     *      parameter; intended to be less than all returned results
     * @return Returns a list of up to 50 transaction IDs, in the order produced by the query
     */
    @SuppressWarnings("unchecked")
    public List<String> getNextChangeTxnIds(final String currentTxnId) {
        HibernateCallback callback = new HibernateCallback() {
            public Object doInHibernate(Session session) {
                Query query = session.getNamedQuery(QUERY_GET_NEXT_CHANGE_TXN_IDS);
                query.setString("currentTxnId", currentTxnId)
                     .setMaxResults(50)
                     .setReadOnly(true);
                return query.list();
            }
        };
        List<String> queryResults = (List<String>) getHibernateTemplate().execute(callback);
        // done
        return queryResults;
    }

    /**
     * Count the node statuses selected by the <tt>node.GetChangedNodeStatusesCount</tt>
     * named query for a store and change transaction.
     *
     * @param storeRef the store to which the count is restricted
     * @param changeTxnId the change transaction of interest
     * @return Returns the number reported by the query
     */
    @SuppressWarnings("unchecked")
    public int getChangedNodeStatusesCount(final StoreRef storeRef, final String changeTxnId) {
        HibernateCallback callback = new HibernateCallback() {
            public Object doInHibernate(Session session) {
                Query query = session.getNamedQuery(QUERY_GET_CHANGED_NODE_STATUSES_COUNT);
                query.setString("storeProtocol", storeRef.getProtocol())
                     .setString("storeIdentifier", storeRef.getIdentifier())
                     .setString("changeTxnId", changeTxnId)
                     .setReadOnly(true);
                return query.uniqueResult();
            }
        };
        // the numeric type of a count result differs between Hibernate versions,
        // so only rely on it being a Number
        Number changeCount = (Number) getHibernateTemplate().execute(callback);
        // done
        return changeCount.intValue();
    }

    /**
     * Fetch the node statuses selected by the <tt>node.GetChangedNodeStatuses</tt>
     * named query for a store and change transaction.
     *
     * @param storeRef the store to which the results are restricted
     * @param changeTxnId the change transaction of interest
     * @return Returns the statuses produced by the query
     */
    @SuppressWarnings("unchecked")
    public List<NodeStatus> getChangedNodeStatuses(final StoreRef storeRef, final String changeTxnId) {
        HibernateCallback callback = new HibernateCallback() {
            public Object doInHibernate(Session session) {
                Query query = session.getNamedQuery(QUERY_GET_CHANGED_NODE_STATUSES);
                query.setString("storeProtocol", storeRef.getProtocol())
                     .setString("storeIdentifier", storeRef.getIdentifier())
                     .setString("changeTxnId", changeTxnId)
                     .setReadOnly(true);
                return query.list();
            }
        };
        List<NodeStatus> queryResults = (List<NodeStatus>) getHibernateTemplate().execute(callback);
        // done
        return queryResults;
    }

    /**
     * Fetch the node statuses selected by the <tt>node.GetDeletedNodeStatuses</tt>
     * named query for a store and change transaction.
     *
     * @param storeRef the store to which the results are restricted
     * @param changeTxnId the change transaction of interest
     * @return Returns the statuses produced by the query
     */
    @SuppressWarnings("unchecked")
    public List<NodeStatus> getDeletedNodeStatuses(final StoreRef storeRef, final String changeTxnId) {
        HibernateCallback callback = new HibernateCallback() {
            public Object doInHibernate(Session session) {
                Query query = session.getNamedQuery(QUERY_GET_DELETED_NODE_STATUSES);
                query.setString("storeProtocol", storeRef.getProtocol())
                     .setString("storeIdentifier", storeRef.getIdentifier())
                     .setString("changeTxnId", changeTxnId)
                     .setReadOnly(true);
                return query.list();
            }
        };
        List<NodeStatus> queryResults = (List<NodeStatus>) getHibernateTemplate().execute(callback);
        // done
        return queryResults;
    }
}