/* * Copyright (C) 2005 Alfresco, Inc. * * Licensed under the Mozilla Public License version 1.1 * with a permitted attribution clause. You may obtain a * copy of the License at * * http://www.alfresco.org/legal/license.txt * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific * language governing permissions and limitations under the * License. */ package org.alfresco.repo.content; import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.Reader; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.List; import org.alfresco.error.AlfrescoRuntimeException; import org.alfresco.repo.content.filestore.FileContentWriter; import org.alfresco.service.cmr.repository.ContentAccessor; import org.alfresco.service.cmr.repository.ContentIOException; import org.alfresco.service.cmr.repository.ContentReader; import org.alfresco.service.cmr.repository.ContentStreamListener; import org.alfresco.util.TempFileProvider; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.aop.framework.ProxyFactory; import org.springframework.util.FileCopyUtils; /** * Implements all the convenience methods of the interface. The only methods * that need to be implemented, i.e. provide low-level content access are: * * * @author Derek Hulley */ public abstract class AbstractContentReader extends AbstractContentAccessor implements ContentReader { private static final Log logger = LogFactory.getLog(AbstractContentReader.class); private List listeners; private ReadableByteChannel channel; /** * @param contentUrl the content URL - this should be relative to the root of the store * and not absolute: to enable moving of the stores */ protected AbstractContentReader(String contentUrl) { super(contentUrl); listeners = new ArrayList(2); } /** * Adds the listener after checking that the output stream isn't already in * use. */ public synchronized void addListener(ContentStreamListener listener) { if (channel != null) { throw new RuntimeException("Channel is already in use"); } listeners.add(listener); } /** * A factory method for subclasses to implement that will ensure the proper * implementation of the {@link ContentReader#getReader()} method. *

* Only the instance need be constructed. The required mimetype, encoding, etc * will be copied across by this class. * * @return Returns a reader onto the location referenced by this instance. * The instance must always be a new instance. * @throws ContentIOException */ protected abstract ContentReader createReader() throws ContentIOException; /** * Performs checks and copies required reader attributes */ public final ContentReader getReader() throws ContentIOException { ContentReader reader = createReader(); if (reader == null) { throw new AlfrescoRuntimeException("ContentReader failed to create new reader: \n" + " reader: " + this); } else if (reader.getContentUrl() == null || !reader.getContentUrl().equals(getContentUrl())) { throw new AlfrescoRuntimeException("ContentReader has different URL: \n" + " reader: " + this + "\n" + " new reader: " + reader); } // copy across common attributes reader.setMimetype(this.getMimetype()); reader.setEncoding(this.getEncoding()); // done if (logger.isDebugEnabled()) { logger.debug("Reader spawned new reader: \n" + " reader: " + this + "\n" + " new reader: " + reader); } return reader; } /** * An automatically created listener sets the flag */ public synchronized final boolean isClosed() { if (channel != null) { return !channel.isOpen(); } else { return false; } } /** helper implementation for base class */ protected boolean isChannelOpen() { if (channel != null) { return channel.isOpen(); } else { return false; } } /** * Provides low-level access to read content from the repository. *

* This is the only of the content reading methods that needs to be implemented * by derived classes. All other content access methods make use of this in their * underlying implementations. * * @return Returns a channel from which content can be read * @throws ContentIOException if the channel could not be opened or the underlying content * has disappeared */ protected abstract ReadableByteChannel getDirectReadableChannel() throws ContentIOException; /** * Create a channel that performs callbacks to the given listeners. * * @param directChannel the result of {@link #getDirectReadableChannel()} * @param listeners the listeners to call * @return Returns a channel * @throws ContentIOException */ private ReadableByteChannel getCallbackReadableChannel( ReadableByteChannel directChannel, List listeners) throws ContentIOException { ReadableByteChannel callbackChannel = null; if (directChannel instanceof FileChannel) { callbackChannel = getCallbackFileChannel((FileChannel) directChannel, listeners); } else { // introduce an advistor to handle the callbacks to the listeners ChannelCloseCallbackAdvise advise = new ChannelCloseCallbackAdvise(listeners); ProxyFactory proxyFactory = new ProxyFactory(directChannel); proxyFactory.addAdvice(advise); callbackChannel = (ReadableByteChannel) proxyFactory.getProxy(); } // done if (logger.isDebugEnabled()) { logger.debug("Created callback byte channel: \n" + " original: " + directChannel + "\n" + " new: " + callbackChannel); } return callbackChannel; } /** * @see #getDirectReadableChannel() * @see #getCallbackReadableChannel(ReadableByteChannel, List) */ public synchronized final ReadableByteChannel getReadableChannel() throws ContentIOException { // this is a use-once object if (channel != null) { throw new RuntimeException("A channel has already been opened"); } ReadableByteChannel directChannel = getDirectReadableChannel(); channel = getCallbackReadableChannel(directChannel, listeners); // notify that the channel was opened super.channelOpened(); // done if (logger.isDebugEnabled()) { logger.debug("Opened channel onto content: " + this); } return channel; } /** * @inheritDoc */ public FileChannel getFileChannel() throws ContentIOException { /* * Where the underlying support is not present for this method, a temporary * file will be used as a substitute. When the write is complete, the * results are copied directly to the underlying channel. */ // get the underlying implementation's best readable channel channel = getReadableChannel(); // now use this channel if it can provide the random access, otherwise spoof it FileChannel clientFileChannel = null; if (channel instanceof FileChannel) { // all the support is provided by the underlying implementation clientFileChannel = (FileChannel) channel; // debug if (logger.isDebugEnabled()) { logger.debug("Content reader provided direct support for FileChannel: \n" + " reader: " + this); } } else { // No random access support is provided by the implementation. // Spoof it by providing a 2-stage read from a temp file File tempFile = TempFileProvider.createTempFile("random_read_spoof_", ".bin"); FileContentWriter spoofWriter = new FileContentWriter(tempFile); // pull the content in from the underlying channel FileChannel spoofWriterChannel = spoofWriter.getFileChannel(false); try { long spoofFileSize = this.getSize(); spoofWriterChannel.transferFrom(channel, 0, spoofFileSize); } catch (IOException e) { throw new ContentIOException("Failed to copy from permanent channel to spoofed temporary channel: \n" + " reader: " + this + "\n" + " temp: " + spoofWriter, e); } finally { try { spoofWriterChannel.close(); } catch (IOException e) {} } // get a reader onto the spoofed content final ContentReader spoofReader = spoofWriter.getReader(); // Attach a listener // - ensure that the close call gets propogated to the underlying channel ContentStreamListener spoofListener = new ContentStreamListener() { public void contentStreamClosed() throws ContentIOException { try { channel.close(); } catch (IOException e) { throw new ContentIOException("Failed to close underlying channel", e); } } }; spoofReader.addListener(spoofListener); // we now have the spoofed up channel that the client can work with clientFileChannel = spoofReader.getFileChannel(); // debug if (logger.isDebugEnabled()) { logger.debug("Content writer provided indirect support for FileChannel: \n" + " writer: " + this + "\n" + " temp writer: " + spoofWriter); } } // the file is now available for random access return clientFileChannel; } /** * @see Channels#newInputStream(java.nio.channels.ReadableByteChannel) */ public InputStream getContentInputStream() throws ContentIOException { try { ReadableByteChannel channel = getReadableChannel(); InputStream is = new BufferedInputStream(Channels.newInputStream(channel)); // done return is; } catch (Throwable e) { throw new ContentIOException("Failed to open stream onto channel: \n" + " accessor: " + this, e); } } /** * Copies the {@link #getContentInputStream() input stream} to the given * OutputStream */ public final void getContent(OutputStream os) throws ContentIOException { try { InputStream is = getContentInputStream(); FileCopyUtils.copy(is, os); // both streams are closed // done } catch (IOException e) { throw new ContentIOException("Failed to copy content to output stream: \n" + " accessor: " + this, e); } } public final void getContent(File file) throws ContentIOException { try { InputStream is = getContentInputStream(); FileOutputStream os = new FileOutputStream(file); FileCopyUtils.copy(is, os); // both streams are closed // done } catch (IOException e) { throw new ContentIOException("Failed to copy content to file: \n" + " accessor: " + this + "\n" + " file: " + file, e); } } public final String getContentString(int length) throws ContentIOException { if (length < 0 || length > Integer.MAX_VALUE) { throw new IllegalArgumentException("Character count must be positive and within range"); } Reader reader = null; try { // just create buffer of the required size char[] buffer = new char[length]; String encoding = getEncoding(); // create a reader from the input stream if (encoding == null) { reader = new InputStreamReader(getContentInputStream()); } else { reader = new InputStreamReader(getContentInputStream(), encoding); } // read it all, if possible int count = reader.read(buffer, 0, length); // there may have been fewer characters - create a new string String result = new String(buffer, 0, count); // done return result; } catch (IOException e) { throw new ContentIOException("Failed to copy content to string: \n" + " accessor: " + this + "\n" + " length: " + length, e); } finally { if (reader != null) { try { reader.close(); } catch (Throwable e) { logger.error(e); } } } } /** * Makes use of the encoding, if available, to convert bytes to a string. *

* All the content is streamed into memory. So, like the interface said, * be careful with this method. * * @see ContentAccessor#getEncoding() */ public final String getContentString() throws ContentIOException { try { // read from the stream into a byte[] InputStream is = getContentInputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream(); FileCopyUtils.copy(is, os); // both streams are closed byte[] bytes = os.toByteArray(); // get the encoding for the string String encoding = getEncoding(); // create the string from the byte[] using encoding if necessary String content = (encoding == null) ? new String(bytes) : new String(bytes, encoding); // done return content; } catch (IOException e) { throw new ContentIOException("Failed to copy content to string: \n" + " accessor: " + this, e); } } }