listeners)
{
this.listeners = listeners;
}
/**
 * Fires the channel-closed listener callbacks when the supplied method is
 * named <code>close</code>.  A call for any other method name does nothing.
 *
 * @param returnValue not used
 * @param method the method whose name is inspected
 * @param args not used
 * @param target not used
 */
public void afterReturning(Object returnValue, Method method, Object[] args, Object target) throws Throwable
{
    final String invokedName = method.getName();
    if (!"close".equals(invokedName))
    {
        // not an event that requires a callback
        return;
    }
    fireChannelClosed();
}
/**
 * Calls {@link ContentStreamListener#contentStreamClosed()} on each listener.
 * <p>
 * Returns immediately when there are no listeners.  If a transaction helper
 * is present the notifications are handed to it; otherwise they are executed
 * directly and any exception is rethrown wrapped in a {@link ContentIOException}.
 */
private void fireChannelClosed()
{
    if (listeners.size() == 0)
    {
        // no listeners, so no callbacks to make
        return;
    }

    // the unit of work: tell each listener that the stream has closed
    RetryingTransactionHelper.Callback closeNotifier = new RetryingTransactionHelper.Callback()
    {
        public Object execute()
        {
            for (ContentStreamListener streamListener : listeners)
            {
                streamListener.contentStreamClosed();
            }
            return null;
        }
    };

    if (transactionHelper == null)
    {
        // no helper available: run the callbacks directly
        try
        {
            closeNotifier.execute();
        }
        catch (Exception e)
        {
            throw new ContentIOException("Failed to executed channel close callbacks", e);
        }
    }
    else
    {
        // let the helper run the callbacks in a transaction
        transactionHelper.doInTransaction(closeNotifier, false);
    }

    if (logger.isDebugEnabled())
    {
        logger.debug("" + listeners.size() + " content listeners called: close");
    }
}
}
/**
 * Wraps a <code>FileChannel</code> to provide callbacks to listeners when the
 * channel is {@link java.nio.channels.Channel#close() closed}.
 * <p>
 * This class is unfortunately necessary as the {@link FileChannel} doesn't have
 * a single interface defining its methods, making it difficult to put an
 * advice around the methods that require overriding.
 * <p>
 * Every operation other than closing is passed straight through to the
 * wrapped channel.
 *
 * @author Derek Hulley
 */
protected class CallbackFileChannel extends FileChannel
{
    /** the channel to route all calls to */
    private final FileChannel delegate;
    /** listeners waiting for the stream close */
    private final List<ContentStreamListener> listeners;

    /**
     * @param delegate the channel that will perform the work
     * @param listeners listeners for events coming from this channel
     *      (not checked for <code>null</code> here)
     * @throws IllegalArgumentException if the delegate is <code>null</code> or is
     *      itself a <code>CallbackFileChannel</code>
     */
    public CallbackFileChannel(
            FileChannel delegate,
            List<ContentStreamListener> listeners)
    {
        if (delegate == null)
        {
            throw new IllegalArgumentException("FileChannel delegate is required");
        }
        if (delegate instanceof CallbackFileChannel)
        {
            throw new IllegalArgumentException("FileChannel delegate may not be a CallbackFileChannel");
        }
        this.delegate = delegate;
        this.listeners = listeners;
    }

    /**
     * Closes the channel and makes the callbacks to the listeners.
     * <p>
     * The delegate is closed first; if that throws, the listeners are not called.
     */
    @Override
    protected void implCloseChannel() throws IOException
    {
        delegate.close();
        fireChannelClosed();
    }

    /**
     * Helper method to notify stream listeners.
     * <p>
     * Does nothing when there are no listeners.  If a transaction helper is
     * present the notifications are handed to it; otherwise they are executed
     * directly and any exception is rethrown wrapped in a {@link ContentIOException}.
     */
    private void fireChannelClosed()
    {
        if (listeners.isEmpty())
        {
            // nothing to do
            return;
        }
        // This may be run in a retrying transaction, which means
        // that the body of execute() must be idempotent.
        RetryingTransactionHelper.Callback cb =
            new RetryingTransactionHelper.Callback()
            {
                public Object execute()
                {
                    for (ContentStreamListener listener : listeners)
                    {
                        listener.contentStreamClosed();
                    }
                    return null;
                }
            };
        if (transactionHelper != null)
        {
            // let the helper run the callbacks in a transaction
            transactionHelper.doInTransaction(cb, false);
        }
        else
        {
            // no helper available: run the callbacks directly
            try
            {
                cb.execute();
            }
            catch (Exception e)
            {
                throw new ContentIOException("Failed to executed channel close callbacks", e);
            }
        }
        // done
        if (logger.isDebugEnabled())
        {
            logger.debug("" + listeners.size() + " content listeners called: close");
        }
    }

    /** Passes the call to the delegate. */
    @Override
    public void force(boolean metaData) throws IOException
    {
        delegate.force(metaData);
    }

    /** Passes the call to the delegate. */
    @Override
    public FileLock lock(long position, long size, boolean shared) throws IOException
    {
        return delegate.lock(position, size, shared);
    }

    /** Passes the call to the delegate. */
    @Override
    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
    {
        return delegate.map(mode, position, size);
    }

    /** Passes the call to the delegate. */
    @Override
    public long position() throws IOException
    {
        return delegate.position();
    }

    /**
     * Sets the position on the delegate.
     *
     * @return this channel, never the delegate
     */
    @Override
    public FileChannel position(long newPosition) throws IOException
    {
        delegate.position(newPosition);
        // Return the wrapper: handing out the delegate would let a caller close
        // it directly and so bypass the listener callbacks.
        return this;
    }

    /** Passes the call to the delegate. */
    @Override
    public int read(ByteBuffer dst) throws IOException
    {
        return delegate.read(dst);
    }

    /** Passes the call to the delegate. */
    @Override
    public int read(ByteBuffer dst, long position) throws IOException
    {
        return delegate.read(dst, position);
    }

    /** Passes the call to the delegate. */
    @Override
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException
    {
        return delegate.read(dsts, offset, length);
    }

    /** Passes the call to the delegate. */
    @Override
    public long size() throws IOException
    {
        return delegate.size();
    }

    /** Passes the call to the delegate. */
    @Override
    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException
    {
        return delegate.transferFrom(src, position, count);
    }

    /** Passes the call to the delegate. */
    @Override
    public long transferTo(long position, long count, WritableByteChannel target) throws IOException
    {
        return delegate.transferTo(position, count, target);
    }

    /**
     * Truncates the delegate.
     *
     * @return this channel, never the delegate
     */
    @Override
    public FileChannel truncate(long size) throws IOException
    {
        delegate.truncate(size);
        // Return the wrapper for the same reason as position(long).
        return this;
    }

    /** Passes the call to the delegate. */
    @Override
    public FileLock tryLock(long position, long size, boolean shared) throws IOException
    {
        return delegate.tryLock(position, size, shared);
    }

    /** Passes the call to the delegate. */
    @Override
    public int write(ByteBuffer src) throws IOException
    {
        return delegate.write(src);
    }

    /** Passes the call to the delegate. */
    @Override
    public int write(ByteBuffer src, long position) throws IOException
    {
        return delegate.write(src, position);
    }

    /** Passes the call to the delegate. */
    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
    {
        return delegate.write(srcs, offset, length);
    }
}
}