REPO-3745 Fix tx retry for CMIS Service (#164)

REPO-3745: HotFix: MNT-19881: CLONE - Multi-threaded check-in and check-out throwing exception using CMIS API
   Remove temp file caching in CMIS service
   Add tx aware wrapper for CMIS Holder
   Correct names and add a comment
   Add synchronization check before binding listener
This commit is contained in:
Alex Mukha
2018-09-03 08:34:28 +01:00
committed by Alexandru-Eusebiu Epure
parent 6a0eb891fc
commit 701addecb2
6 changed files with 343 additions and 163 deletions

View File

@@ -55,6 +55,7 @@ public class AlfrescoCmisServiceFactory extends AbstractServiceFactory
private AlfrescoCmisExceptionInterceptor cmisExceptions;
private AlfrescoCmisServiceInterceptor cmisControl;
private AlfrescoCmisStreamInterceptor cmisStreams;
private CMISTransactionAwareHolderInterceptor cmisHolder;
private AuthorityService authorityService;
/**
@@ -133,6 +134,11 @@ public class AlfrescoCmisServiceFactory extends AbstractServiceFactory
this.cmisStreams = cmisStreams;
}
public void setCmisHolder(CMISTransactionAwareHolderInterceptor cmisHolder)
{
this.cmisHolder = cmisHolder;
}
@Override
public void init(Map<String, String> parameters)
{
@@ -195,6 +201,7 @@ public class AlfrescoCmisServiceFactory extends AbstractServiceFactory
proxyFactory.addAdvice(cmisControl);
proxyFactory.addAdvice(cmisStreams);
proxyFactory.addAdvice(cmisTransactions);
proxyFactory.addAdvice(cmisHolder);
AlfrescoCmisService cmisService = (AlfrescoCmisService) proxyFactory.getProxy();
ConformanceCmisServiceWrapper wrapperService = new ConformanceCmisServiceWrapper(

View File

@@ -26,13 +26,10 @@
package org.alfresco.opencmis;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -87,7 +84,6 @@ import org.alfresco.service.namespace.NamespaceService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.namespace.RegexQNamePattern;
import org.alfresco.util.Pair;
import org.alfresco.util.TempFileProvider;
import org.apache.chemistry.opencmis.commons.PropertyIds;
import org.apache.chemistry.opencmis.commons.data.Acl;
import org.apache.chemistry.opencmis.commons.data.AllowableActions;
@@ -1299,31 +1295,15 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
connector.applyACL(nodeRef, type, addAces, removeAces);
// handle content
File tempFile = null;
try
{
if (contentStream != null)
{
// write content
String mimeType = parseMimeType(contentStream);
// copy stream to temp file
// OpenCMIS does this for us ....
tempFile = copyToTempFile(contentStream);
String encoding = getEncoding(tempFile, mimeType);
String encoding = getEncoding(contentStream.getStream(), mimeType);
ContentWriter writer = connector.getFileFolderService().getWriter(nodeRef);
writer.setMimetype(mimeType);
writer.setEncoding(encoding);
writer.putContent(tempFile);
}
}
finally
{
if(tempFile != null)
{
removeTempFile(tempFile);
}
writer.putContent(contentStream.getStream());
}
connector.extractMetadata(nodeRef);
@@ -1333,8 +1313,6 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
connector.applyVersioningState(nodeRef, versioningState);
removeTempFile(tempFile);
String objectId = connector.createObjectId(nodeRef);
connector.getActivityPoster().postFileFolderAdded(nodeRef);
@@ -1597,21 +1575,11 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
public Void execute() throws Throwable
{
String mimeType = parseMimeType(contentStream);
final File tempFile = copyToTempFile(contentStream);
String encoding = getEncoding(tempFile, mimeType);
try
{
String encoding = getEncoding(contentStream.getStream(), mimeType);
ContentWriter writer = connector.getFileFolderService().getWriter(nodeRef);
writer.setMimetype(mimeType);
writer.setEncoding(encoding);
writer.putContent(tempFile);
}
finally
{
removeTempFile(tempFile);
}
writer.putContent(contentStream.getStream());
connector.getActivityPoster().postFileFolderUpdated(info.isFolder(), nodeRef);
return null;
}
@@ -2358,9 +2326,6 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
final NodeRef nodeRef = info.getNodeRef();
final TypeDefinitionWrapper type = info.getType();
// copy stream to temp file
final File tempFile = copyToTempFile(contentStream);
// check in
// update PWC
connector.setProperties(nodeRef, type, properties,
@@ -2372,12 +2337,12 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
if (contentStream != null)
{
String mimeType = parseMimeType(contentStream);
String encoding = getEncoding(tempFile, mimeType);
String encoding = getEncoding(contentStream.getStream(), mimeType);
// write content
ContentWriter writer = connector.getFileFolderService().getWriter(nodeRef);
writer.setMimetype(mimeType);
writer.setEncoding(encoding);
writer.putContent(tempFile);
writer.putContent(contentStream.getStream());
}
// create version properties
@@ -2395,8 +2360,6 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
connector.getActivityPoster().postFileFolderUpdated(info.isFolder(), newNodeRef);
objectId.setValue(connector.createObjectId(newNodeRef));
removeTempFile(tempFile);
}
@Override
@@ -2893,7 +2856,6 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
}
}
}
return info;
}
@@ -3165,14 +3127,14 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
/**
* Inspired from NodesImpl.guessEncoding method.
*
* @param tempFile can be null;
* @param inputStream can be null;
* @param mimeType can be null;
* @return the encoding detected. never null;
*/
private String getEncoding(File tempFile, String mimeType)
private String getEncoding(InputStream inputStream, String mimeType)
{
String defaultEncoding = "UTF-8";
if (tempFile == null)
if (inputStream == null)
{
return defaultEncoding;
}
@@ -3180,7 +3142,7 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
InputStream tfis = null;
try
{
tfis = new BufferedInputStream(new FileInputStream(tempFile));
tfis = new BufferedInputStream(inputStream);
ContentCharsetFinder charsetFinder = connector.getMimetypeService().getContentCharsetFinder();
return charsetFinder.getCharset(tfis, mimeType).name();
}
@@ -3189,12 +3151,6 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
throw new CmisStorageException("Unable to read content: " + e.getMessage(), e);
}
finally
{
closeInputStream(tfis);
}
}
private void closeInputStream(InputStream tfis)
{
if (tfis != null)
{
@@ -3208,56 +3164,6 @@ public class AlfrescoCmisServiceImpl extends AbstractCmisService implements Alfr
}
}
}
private File copyToTempFile(ContentStream contentStream)
{
if (contentStream == null)
{
return null;
}
int bufferSize = 40 * 1024;
File result = null;
try
{
InputStream in = null;
if (contentStream.getStream() != null)
{
in = new BufferedInputStream(contentStream.getStream(), bufferSize);
}
result = TempFileProvider.createTempFile(in, "cmis", "content");
}
catch (Exception e)
{
throw new CmisStorageException("Unable to store content: " + e.getMessage(), e);
}
if ((contentStream.getLength() > -1) && (result == null || contentStream.getLength() != result.length()))
{
removeTempFile(result);
throw new CmisStorageException("Expected " + contentStream.getLength() + " bytes but retrieved " +
(result == null ? -1 :result.length()) + " bytes!");
}
return result;
}
private void removeTempFile(File tempFile)
{
if (tempFile == null)
{
return;
}
try
{
tempFile.delete();
}
catch (Exception e)
{
// ignore - file will be removed by TempFileProvider
}
}
@Override

View File

@@ -25,20 +25,25 @@
*/
package org.alfresco.opencmis;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.alfresco.error.AlfrescoRuntimeException;
import org.alfresco.repo.content.filestore.FileContentReader;
import org.alfresco.service.cmr.repository.MimetypeService;
import org.alfresco.util.TempFileProvider;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.chemistry.opencmis.commons.data.ContentStream;
import org.apache.chemistry.opencmis.commons.impl.dataobjects.ContentStreamImpl;
/**
* Interceptor to deal with ContentStreams, determining mime type if appropriate.
*
* Note: this used to cache content stream to a local file so that retrying transaction behaviour
* worked properly; this is now done in the Chemsitry OpenCMIS layer so no need to do it again
* here.
*
* @author steveglover
* @author Alan Davis
* @since 4.0.2.26 / 4.1.4
@@ -56,26 +61,103 @@ public class AlfrescoCmisStreamInterceptor implements MethodInterceptor
}
public Object invoke(MethodInvocation mi) throws Throwable
{
List<ReusableContentStream> reusableContentStreams = null;
try
{
Class<?>[] parameterTypes = mi.getMethod().getParameterTypes();
Object[] arguments = mi.getArguments();
for (int i=0; i<parameterTypes.length; i++)
{
if (arguments[i] instanceof ContentStreamImpl)
if (ContentStream.class.isAssignableFrom(parameterTypes[i]))
{
ContentStreamImpl contentStream = (ContentStreamImpl) arguments[i];
ContentStream contentStream = (ContentStream) arguments[i];
if (contentStream != null)
{
if (reusableContentStreams == null)
{
reusableContentStreams = new ArrayList<ReusableContentStream>();
}
// reusable streams are required for buffering in case of tx retry
ReusableContentStream reusableContentStream = new ReusableContentStream(contentStream);
// ALF-18006
if (contentStream.getMimeType() == null)
{
InputStream stream = contentStream.getStream();
String mimeType = mimetypeService.guessMimetype(contentStream.getFileName(), stream);
contentStream.setMimeType(mimeType);
String mimeType = mimetypeService.guessMimetype(reusableContentStream.getFileName(), new FileContentReader(reusableContentStream.file));
reusableContentStream.setMimeType(mimeType);
}
reusableContentStreams.add(reusableContentStream);
arguments[i] = reusableContentStream;
}
}
}
return mi.proceed();
}
finally
{
if (reusableContentStreams != null)
{
for (ReusableContentStream contentStream: reusableContentStreams)
{
contentStream.close();
}
}
}
}
private static class ReusableContentStream extends ContentStreamImpl
{
private static final long serialVersionUID = 8992465629472248502L;
private File file;
public ReusableContentStream(ContentStream contentStream) throws Exception
{
setLength(contentStream.getBigLength());
setMimeType(contentStream.getMimeType());
setFileName(contentStream.getFileName());
file = TempFileProvider.createTempFile(contentStream.getStream(), "cmis", "contentStream");
}
@Override
public InputStream getStream() {
InputStream stream = super.getStream();
if (stream == null && file != null)
{
try
{
stream = new FileInputStream(file)
{
@Override
public void close() throws IOException
{
setStream(null);
super.close();
}
};
}
catch (Exception e)
{
throw new AlfrescoRuntimeException("Expected to be able to reopen temporary file", e);
}
setStream(stream);
}
return stream;
}
public void close()
{
try
{
file.delete();
}
finally
{
file = null;
}
}
}
}

View File

@@ -0,0 +1,56 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2018 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.opencmis;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.chemistry.opencmis.commons.spi.Holder;
/**
* The interceptor wraps {@link Holder} class in {@link TransactionAwareHolder}.
* This is designed specifically for CMIS Service.
*
* @author alex.mukha
*/
public class CMISTransactionAwareHolderInterceptor implements MethodInterceptor
{
@Override
@SuppressWarnings("unchecked")
public Object invoke(MethodInvocation invocation) throws Throwable
{
Class<?>[] parameterTypes = invocation.getMethod().getParameterTypes();
Object[] arguments = invocation.getArguments();
for (int i = 0; i < parameterTypes.length; i++)
{
if (Holder.class.isAssignableFrom(parameterTypes[i]) && arguments[i] != null)
{
TransactionAwareHolder txnHolder = new TransactionAwareHolder(((Holder) arguments[i]));
arguments[i] = txnHolder;
}
}
return invocation.proceed();
}
}

View File

@@ -0,0 +1,126 @@
/*
* #%L
* Alfresco Repository
* %%
* Copyright (C) 2005 - 2018 Alfresco Software Limited
* %%
* This file is part of the Alfresco software.
* If the software was purchased under a paid Alfresco license, the terms of
* the paid license agreement will prevail. Otherwise, the software is
* provided under the following open source license terms:
*
* Alfresco is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Alfresco is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Alfresco. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.alfresco.opencmis;
import org.alfresco.repo.transaction.AlfrescoTransactionSupport;
import org.alfresco.repo.transaction.RetryingTransactionInterceptor;
import org.alfresco.util.transaction.TransactionListenerAdapter;
import org.apache.chemistry.opencmis.commons.spi.Holder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* A Tx aware wrapper around {@link Holder}.
*
* <p>
* This wrapper is created in {@link CMISTransactionAwareHolderInterceptor}.
* It is designed to handle the state of the {@link Holder} in case of tx retries which are handled by
* {@link RetryingTransactionInterceptor}.
* </p>
* <p>
* There are a few things that influenced the implementation of this wrapper and need to be taken into account:
* <ul>
* <li>
* The wrapper is created in {@link CMISTransactionAwareHolderInterceptor} and is replacing the incoming
* parameter ({@link Holder}) in the call to {@link AlfrescoCmisServiceImpl}.
* </li>
* <li>
* The calls to {@link AlfrescoCmisServiceImpl} generally return nothing, therefore the state
* of {@link Holder} or it's wrapper ({@link TransactionAwareHolder}) is modified inside
* the {@link AlfrescoCmisServiceImpl} and then read in CMIS layer.
* </li>
* <li>
* The {@link CMISTransactionAwareHolderInterceptor} is called after {@link RetryingTransactionInterceptor}
* but due to internal counter in Spring AOP it is not called again if the tx is retried.
* The proxied service ({@link AlfrescoCmisServiceImpl}) is called straight away.
* Fortunately the parameter replacing is not required for the second time.
* The wrapper ({@link TransactionAwareHolder}) will still be used.
* </li>
* <li>
* The {@link TxAwareHolderListener} is bound to the tx when the internal value is read.
* This is done this way because once the tx is rolled backed the listener list is cleared.
* The {@link TxAwareHolderListener} is still required to be called once the retry succeeds with a commit.
* The {@link CMISTransactionAwareHolderInterceptor} cannot recreate the {@link TransactionAwareHolder}
* as the interceptor is called only once.
* It is safe to bind the same listener many times as it is always the same object.
* </li>
* </ul>
* </p>
*
* @author alex.mukha
*/
public class TransactionAwareHolder<T> extends Holder<T>
{
private Holder<T> internalHolder;
private T value;
private TxAwareHolderListener txListener;
TransactionAwareHolder(Holder<T> internalHolder)
{
this.internalHolder = internalHolder;
this.value = internalHolder.getValue();
txListener = new TxAwareHolderListener();
}
@Override
public T getValue()
{
if (TransactionSynchronizationManager.isSynchronizationActive())
{
AlfrescoTransactionSupport.bindListener(txListener);
}
return this.value;
}
@Override
public void setValue(T value)
{
this.value = value;
}
@Override
public String toString()
{
return "TransactionAwareHolder{" +
"internalHolder=" + internalHolder +
", value=" + value +
'}';
}
private class TxAwareHolderListener extends TransactionListenerAdapter
{
@Override
public void afterCommit()
{
internalHolder.setValue(getValue());
}
@Override
public void afterRollback()
{
setValue(internalHolder.getValue());
}
}
}

View File

@@ -84,9 +84,12 @@
<property name="cmisExceptions" ref="CMISService_Exceptions" />
<property name="cmisControl" ref="CMISService_Control" />
<property name="cmisStreams" ref="CMISService_Streams" />
<property name="cmisHolder" ref="CMISTransactionAwareHolderInterceptor" />
<property name="authorityService" ref="AuthorityService" />
</bean>
<bean id="CMISTransactionAwareHolderInterceptor" class="org.alfresco.opencmis.CMISTransactionAwareHolderInterceptor" />
<bean id="CMISService_Transactions" class="org.alfresco.repo.transaction.RetryingTransactionInterceptor">
<property name="transactionService" ref="TransactionService" />
<property name="transactionManager" ref="transactionManager" />