/* * Copyright (C) 2005-2007 Alfresco Software Limited. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * As a special exception to the terms and conditions of version 2.0 of * the GPL, you may redistribute this Program in connection with Free/Libre * and Open Source Software ("FLOSS") applications as described in Alfresco's * FLOSS exception. You should have recieved a copy of the text describing * the FLOSS exception, and it is also available here: * http://www.alfresco.com/legal/licensing" */ package org.alfresco.filesys.server.oncrpc; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * ONC/RPC Request Thread Pool Class * *

Processes RPC requests using a pool of worker threads. * * @author GKSpencer */ public class RpcRequestThreadPool { // Debug logging private static final Log logger = LogFactory.getLog(RpcRequestThreadPool.class); // Default/minimum/maximum number of worker threads to use public static final int DefaultWorkerThreads = 8; public static final int MinimumWorkerThreads = 4; public static final int MaximumWorkerThreads = 50; // Queue of RPC requests private RpcRequestQueue m_queue; // Worker threads private ThreadWorker[] m_workers; // RPC dispatcher private RpcProcessor m_rpcProcessor; /** * Thread Worker Inner Class */ protected class ThreadWorker implements Runnable { // Worker thread private Thread mi_thread; // Worker unique id private int mi_id; // Shutdown flag private boolean mi_shutdown = false; /** * Class constructor * * @param name String * @param id int */ public ThreadWorker(String name, int id) { // Save the thread id mi_id = id; // Create the worker thread mi_thread = new Thread(this); mi_thread.setName(name); mi_thread.setDaemon(true); mi_thread.start(); } /** * Request the worker thread to shutdown */ public final void shutdownRequest() { mi_shutdown = true; try { mi_thread.interrupt(); } catch (Exception ex) { } } /** * Run the thread */ public void run() { // Loop until shutdown RpcPacket rpc = null; RpcPacket response = null; while (mi_shutdown == false) { try { // Wait for an RPC request to be queued rpc = m_queue.removeRequest(); } catch (InterruptedException ex) { // Check for shutdown if (mi_shutdown == true) break; } // If the request is valid process it if (rpc != null) { try { // Process the request response = m_rpcProcessor.processRpc(rpc); if (response != null) response.getPacketHandler().sendRpcResponse(response); } catch (Throwable ex) { // Do not display errors if shutting down if (mi_shutdown == false) { if ( logger.isDebugEnabled()) { logger.debug("Worker " + Thread.currentThread().getName() + ":"); logger.debug(ex); } } } finally { // Release the RPC packet(s) back to the packet pool if (rpc.getClientProtocol() == Rpc.TCP && rpc.isAllocatedFromPool()) rpc.getOwnerPacketPool().releasePacket(rpc); if (response != null && response.getClientProtocol() == Rpc.TCP && response.getBuffer() != rpc.getBuffer() && response.isAllocatedFromPool()) response.getOwnerPacketPool().releasePacket(response); } } } } }; /** * Class constructor * * @param threadName String * @param rpcServer RpcProcessor * @param pktHandler PacketHandlerInterface */ public RpcRequestThreadPool(String threadName, RpcProcessor rpcServer) { this(threadName, DefaultWorkerThreads, rpcServer); } /** * Class constructor * * @param threadName String * @param poolSize int * @param rpcServer RpcProcessor */ public RpcRequestThreadPool(String threadName, int poolSize, RpcProcessor rpcServer) { // Save the RPC handler m_rpcProcessor = rpcServer; // Create the request queue m_queue = new RpcRequestQueue(); // Check that we have at least minimum worker threads if (poolSize < MinimumWorkerThreads) poolSize = MinimumWorkerThreads; // Create the worker threads m_workers = new ThreadWorker[poolSize]; for (int i = 0; i < m_workers.length; i++) m_workers[i] = new ThreadWorker(threadName + (i + 1), i); } /** * Return the number of requests in the queue * * @return int */ public final int getNumberOfRequests() { return m_queue.numberOfRequests(); } /** * Queue an RPC request to the thread pool for processing * * @param rpc RpcPacket */ public final void queueRpcRequest(RpcPacket pkt) { m_queue.addRequest(pkt); } /** * Shutdown the thread pool and release all resources */ public void shutdownThreadPool() { // Shutdown the worker threads if (m_workers != null) { for (int i = 0; i < m_workers.length; i++) m_workers[i].shutdownRequest(); } } }