Made the ThreadPoolExecutorFactory more configurable, but also with more sensible defaults.

- The queue for background jobs is unlimited
 - A smaller number of core threads is maintained
 - It is possible to set the threads' priority (default low)
Added hasAspect() method to XPath functions.
Various other cosmetic changes.


git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@6146 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Derek Hulley 2007-07-04 00:51:54 +00:00
parent 4b0491c232
commit d1a9347c09
9 changed files with 256 additions and 42 deletions

View File

@ -811,15 +811,9 @@
<!-- Thread Pool --> <!-- Thread Pool -->
<!-- --> <!-- -->
<bean id="threadPoolExecutor" class="org.alfresco.util.ThreadPoolExecutorFactoryBean" singleton="true" > <bean id="threadPoolExecutor" class="org.alfresco.util.ThreadPoolExecutorFactoryBean" singleton="true">
<property name="corePoolSize"> <property name="corePoolSize">
<value>5</value> <value>2</value>
</property>
<property name="maximumPoolSize">
<value>20</value>
</property>
<property name="keepAliveTime">
<value>60</value>
</property> </property>
</bean> </bean>

View File

@ -2117,7 +2117,7 @@ public class ContentDiskDriver extends AlfrescoDiskDriver implements DiskInterfa
* @param sess Server session * @param sess Server session
* @param tree Tree connection * @param tree Tree connection
* @param file Network file details * @param file Network file details
* @param siz New file length * @param size New file length
* @exception java.io.IOException The exception description. * @exception java.io.IOException The exception description.
*/ */
public void truncateFile(SrvSession sess, TreeConnection tree, NetworkFile file, long size) throws IOException public void truncateFile(SrvSession sess, TreeConnection tree, NetworkFile file, long size) throws IOException

View File

@ -24,7 +24,6 @@
*/ */
package org.alfresco.repo.domain; package org.alfresco.repo.domain;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -71,8 +70,6 @@ public interface Node
public Set<QName> getAspects(); public Set<QName> getAspects();
// public Collection<ChildAssoc> getParentAssocs();
public Map<QName, PropertyValue> getProperties(); public Map<QName, PropertyValue> getProperties();
public DbAccessControlList getAccessControlList(); public DbAccessControlList getAccessControlList();

View File

@ -153,19 +153,6 @@ public class NodeImpl extends LifecycleAdapter implements Node, Serializable
return getUuid().hashCode(); return getUuid().hashCode();
} }
// @Override
// public boolean onDelete(Session session) throws CallbackException
// {
// // check if there is an access control list
// DbAccessControlList acl = getAccessControlList();
// if (acl != null)
// {
// session.delete(acl);
// }
// return NO_VETO;
// }
//
public Long getId() public Long getId()
{ {
return id; return id;

View File

@ -559,4 +559,9 @@ public class DocumentNavigator extends DefaultNavigator implements NamedAccessNa
QName nodeTypeQName = nodeService.getType(nodeRef); QName nodeTypeQName = nodeService.getType(nodeRef);
return dictionaryService.isSubClass(nodeTypeQName, typeQName); return dictionaryService.isSubClass(nodeTypeQName, typeQName);
} }
public Boolean hasAspect(NodeRef nodeRef, QName typeQName)
{
return nodeService.hasAspect(nodeRef, typeQName);
}
} }

View File

@ -225,8 +225,7 @@ public class NodeServiceXPath extends BaseXPath
} }
/** /**
* A boolean function to determine if a node type is a subtype of another * A boolean function to determine if a node type is a subtype of another type
* type
*/ */
static class SubTypeOf implements Function static class SubTypeOf implements Function
{ {
@ -278,6 +277,59 @@ public class NodeServiceXPath extends BaseXPath
} }
} }
/**
* A boolean function to determine if a node has a given aspect
*/
static class HasAspect implements Function
{
public Object call(Context context, List args) throws FunctionCallException
{
if (args.size() != 1)
{
throw new FunctionCallException("hasAspect() requires one argument: hasAspect(QName typeQName)");
}
return evaluate(context.getNodeSet(), args.get(0), context.getNavigator());
}
public Object evaluate(List nodes, Object qnameObj, Navigator nav)
{
if (nodes.size() != 1)
{
return false;
}
// resolve the qname of the type we are checking for
String qnameStr = StringFunction.evaluate(qnameObj, nav);
if (qnameStr.equals("*"))
{
return true;
}
QName typeQName;
if (qnameStr.startsWith("{"))
{
typeQName = QName.createQName(qnameStr);
}
else
{
typeQName = QName.createQName(qnameStr, ((DocumentNavigator) nav).getNamespacePrefixResolver());
}
// resolve the noderef
NodeRef nodeRef = null;
if (nav.isElement(nodes.get(0)))
{
nodeRef = ((ChildAssociationRef) nodes.get(0)).getChildRef();
}
else if (nav.isAttribute(nodes.get(0)))
{
nodeRef = ((DocumentNavigator.Property) nodes.get(0)).parent;
}
DocumentNavigator dNav = (DocumentNavigator) nav;
boolean result = dNav.hasAspect(nodeRef, typeQName);
return result;
}
}
static class Deref implements Function static class Deref implements Function
{ {
@ -643,6 +695,7 @@ public class NodeServiceXPath extends BaseXPath
"ends-with", new EndsWithFunction()); "ends-with", new EndsWithFunction());
registerFunction("", "subtypeOf", new SubTypeOf()); registerFunction("", "subtypeOf", new SubTypeOf());
registerFunction("", "hasAspect", new HasAspect());
registerFunction("", "deref", new Deref()); registerFunction("", "deref", new Deref());
registerFunction("", "like", new Like()); registerFunction("", "like", new Like());
registerFunction("", "contains", new Contains()); registerFunction("", "contains", new Contains());

View File

@ -50,7 +50,7 @@ public abstract class AuthenticationUtil
Result doWork() throws Exception; Result doWork() throws Exception;
} }
private static final String SYSTEM_USER_NAME = "System"; public static final String SYSTEM_USER_NAME = "System";
private AuthenticationUtil() private AuthenticationUtil()
{ {

View File

@ -24,8 +24,9 @@
*/ */
package org.alfresco.util; package org.alfresco.util;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,34 +36,64 @@ import org.springframework.beans.factory.InitializingBean;
/** /**
* Factory for {@link java.util.concurrent.ThreadPoolExecutor} instances, * Factory for {@link java.util.concurrent.ThreadPoolExecutor} instances,
* which cannot easily be constructed using constructor injection. * which cannot easily be constructed using constructor injection. This instance
* also allows the setting of the thread-specific properties that would otherwise
* require setting a <code>ThreadFactory</code>.
* <p> * <p>
* This factory provides the a singleton instance of the pool. * This factory provides the a singleton instance of the pool.
* <p>
* Defaults are:
* <ul>
* <li><b>{@link #setCorePoolSize(int) corePoolSize}: </b>
* 20</li>
* <li><b>{@link #setMaximumPoolSize(int) maximumPoolSize}: </b>
* Equal to the {@link #setCorePoolSize(int)} at the time of instance creation</li>
* <li><b>{@link #setKeepAliveTime(int) keepAliveTime}: </b>
* 90 seconds</li>
* <li><b>{@link #setThreadPriority(int) threadPriority}: </b>
* 1 (LOWEST)</li>
* <li><b>{@link #setThreadDaemon(boolean) threadDaemon}: </b>
* true</li>
* <li><b>{@link #setWorkQueue(BlockingQueue) workQueue}: </b>
* An unbounded <code>LinkedBlockingQueue</code></li>
* <li><b>{@link #setRejectedExecutionHandler(RejectedExecutionHandler) rejectedExecutionHandler: </b>
* <code>ThreadPoolExecutor.CallerRunsPolicy</code></li>
* </ul>
* *
* @author Derek Hulley * @author Derek Hulley
*/ */
public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingBean public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingBean
{ {
private static final int DEFAULT_CORE_POOL_SIZE = 20;
private static final int DEFAULT_MAXIMUM_POOL_SIZE = -1; // -1 is a sign that it must match the core pool size
private static final int DEFAULT_KEEP_ALIVE_TIME = 90; // seconds
private static final int DEFAULT_THREAD_PRIORITY = Thread.MIN_PRIORITY;
private static final boolean DEFAULT_THREAD_DAEMON = Boolean.TRUE;
private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new LinkedBlockingQueue<Runnable>();
private static final RejectedExecutionHandler DEFAULT_REJECTED_EXECUTION_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();
private int corePoolSize; private int corePoolSize;
private int maximumPoolSize; private int maximumPoolSize;
private int keepAliveTime; private int keepAliveTime;
private int threadPriority;
private boolean threadDaemon;
private BlockingQueue<Runnable> workQueue; private BlockingQueue<Runnable> workQueue;
private RejectedExecutionHandler rejectedExecutionHandler;
/** the instance that will be given out by the factory */
private ThreadPoolExecutor instance; private ThreadPoolExecutor instance;
/** /**
* Constructor setting default properties: * Constructor setting default properties:
* <ul>
* <li>corePoolSize: 5</li>
* <li>maximumPoolSize: 20</li>
* <li>keepAliveTime: 60s</li>
* <li>workQueue: {@link ArrayBlockingQueue}</li>
* </ul>
*/ */
public ThreadPoolExecutorFactoryBean() public ThreadPoolExecutorFactoryBean()
{ {
corePoolSize = 5; corePoolSize = DEFAULT_CORE_POOL_SIZE;
maximumPoolSize = 20; maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
keepAliveTime = 30; keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
threadPriority = DEFAULT_THREAD_PRIORITY;
threadDaemon = DEFAULT_THREAD_DAEMON;
workQueue = DEFAULT_WORK_QUEUE;
rejectedExecutionHandler = DEFAULT_REJECTED_EXECUTION_HANDLER;
} }
/** /**
@ -95,6 +126,27 @@ public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingB
this.keepAliveTime = keepAliveTime; this.keepAliveTime = keepAliveTime;
} }
/**
* The priority that all threads must have on the scale of 1 to 10,
* where 1 has the lowest priority and 10 has the highest priority.
*
* @param threadPriority the thread priority
*/
public void setThreadPriority(int threadPriority)
{
this.threadPriority = threadPriority;
}
/**
* Set whether the threads run as daemon threads or not.
*
* @param threadDaemon <tt>true</tt> to run as daemon
*/
public void setThreadDaemon(boolean threadDaemon)
{
this.threadDaemon = threadDaemon;
}
/** /**
* The optional queue instance to use * The optional queue instance to use
* *
@ -104,15 +156,40 @@ public class ThreadPoolExecutorFactoryBean implements FactoryBean, InitializingB
{ {
this.workQueue = workQueue; this.workQueue = workQueue;
} }
/**
* The optional handler for when tasks cannot be submitted to the queue.
* The default is the <code>CallerRunsPolicy</code>.
*
* @param rejectedExecutionHandler the handler to use
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler)
{
this.rejectedExecutionHandler = rejectedExecutionHandler;
}
public void afterPropertiesSet() throws Exception public void afterPropertiesSet() throws Exception
{ {
if (workQueue == null) // if the maximum pool size has not been set, change it to match the core pool size
if (maximumPoolSize == DEFAULT_MAXIMUM_POOL_SIZE)
{ {
workQueue = new ArrayBlockingQueue<Runnable>(corePoolSize); maximumPoolSize = corePoolSize;
} }
// We need a thread factory
TraceableThreadFactory threadFactory = new TraceableThreadFactory();
threadFactory.setThreadDaemon(threadDaemon);
threadFactory.setThreadPriority(threadPriority);
// construct the instance // construct the instance
instance = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue); instance = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectedExecutionHandler);
} }
/** /**

View File

@ -0,0 +1,101 @@
/*
* Copyright (C) 2005-2007 Alfresco Software Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* As a special exception to the terms and conditions of version 2.0 of
* the GPL, you may redistribute this Program in connection with Free/Libre
* and Open Source Software ("FLOSS") applications as described in Alfresco's
* FLOSS exception. You should have recieved a copy of the text describing
* the FLOSS exception, and it is also available here:
* http://www.alfresco.com/legal/licensing"
*/
package org.alfresco.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A thread factory that spawns threads that are statically visible. Each factory uses a unique
* thread group. All the groups that have been used can be fetched using
* {@link #getActiveThreadGroups()}, allowing iteration of the the threads in the group.
*
* @since 2.1
* @author Derek Hulley
*/
public class TraceableThreadFactory implements ThreadFactory
{
private static final AtomicInteger factoryNumber = new AtomicInteger(1);
private static List<ThreadGroup> activeThreadGroups = Collections.synchronizedList(new ArrayList<ThreadGroup>(1));
/**
* Get a list of thread groups registered by the factory.
*
* @return Returns a snapshot of thread groups
*/
public static List<ThreadGroup> getActiveThreadGroups()
{
return activeThreadGroups;
}
private final ThreadGroup group;
private final String namePrefix;
private final AtomicInteger threadNumber;
private boolean threadDaemon;
private int threadPriority;
TraceableThreadFactory()
{
this.group = new ThreadGroup("TraceableThreadGroup-" + factoryNumber.getAndIncrement());
TraceableThreadFactory.activeThreadGroups.add(this.group);
this.namePrefix = "TraceableThread-" + factoryNumber.getAndIncrement() + "-thread-";
this.threadNumber = new AtomicInteger(1);
}
/**
* @param daemon <tt>true</tt> if all threads created must be daemon threads
*/
public void setThreadDaemon(boolean daemon)
{
this.threadDaemon = daemon;
}
/**
*
* @param threadPriority the threads priority from 1 (lowest) to 10 (highest)
*/
public void setThreadPriority(int threadPriority)
{
this.threadPriority = threadPriority;
}
public Thread newThread(Runnable r)
{
Thread thread = new Thread(
group,
r,
namePrefix + threadNumber.getAndIncrement(),
0);
thread.setDaemon(threadDaemon);
thread.setPriority(threadPriority);
return thread;
}
}