Merge DM-DM_deployment to HEAD

18665 : Switch over to using new surf <formdata multipart-processing="false" /> configuration option.
   - now the PostContentCommandProcessor and PostSnapshotCommandProcessor handle their own MimePart processing.
  18683 : SAIL-288 Implementation of TransferService client side cancelAsync.
  18716 : Adding TransferEventBegin missed from asyncCancel work.
  18734 : Transfer format : implementation of null properties and Serialized base64 Java objects for type d:any
  18749 : SAIL-290: Added features that provide asynchronous commit on the receiver end and the ability to query commit status, as well as code that writes a progress report on the server side (currently only plain text) and the functionality to allow a transfer to be cancelled
  18750 : New files that should have been checked in with previous commit but weren't...
  18770 : Various transfer service work.
   - correction to transfer report name.
   - use surf Base64 Encoder
   - implementation of async commit to the TransferServiceImpl
   - implementation of the statusCommand through the HttpClientTransmitter.
  18773 : transferId was null.
  18780 : Changed the server-side commit to occur asynchronously.
	Added two test actions to transfer a single node or a tree of nodes.
	Tweaked TransferDefinition to provide varargs version of setNodes.
  18793 : SAIL-290:    Added a couple of test actions.
    	Added varargs versions of transfer and transferAsync on TransferService.
  18794 : SAIL-290: Added "targetExists" operation to the TransferService interface.
  18804 : SAIL-36: Fixed an issue where transfer could fail if numerous nodes with the same cm:name value are transferred.
  18805 : SAIL-36: Added model file that should have been with last commit.
  18808 Continuing work on transfer report.
  18825 TransferServiceImplTest green line.
  18836 : Added a little more output to the server-side transfer report. 
  18848 : More work on transfer report.

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@18865 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Mark Rogers
2010-02-25 23:02:27 +00:00
parent 29bfaff367
commit 14a4f808b8
49 changed files with 4884 additions and 1536 deletions

View File

@@ -30,6 +30,10 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -80,6 +84,7 @@ import org.alfresco.service.cmr.search.SearchService;
import org.alfresco.service.cmr.transfer.TransferCallback;
import org.alfresco.service.cmr.transfer.TransferDefinition;
import org.alfresco.service.cmr.transfer.TransferException;
import org.alfresco.service.cmr.transfer.TransferProgress;
import org.alfresco.service.cmr.transfer.TransferService;
import org.alfresco.service.cmr.transfer.TransferTarget;
import org.alfresco.service.namespace.NamespaceService;
@@ -99,8 +104,14 @@ public class TransferServiceImpl implements TransferService
private static final String MSG_NO_GROUP = "transfer_service.unable_to_find_transfer_group";
private static final String MSG_NO_TARGET = "transfer_service.unable_to_find_transfer_target";
private static final String MSG_TARGET_EXISTS = "transfer_service.target_exists";
private static final String MSG_CANCELLED = "transfer_service.cancelled";
private static final String MSG_NO_NODES = "transfer_service.no_nodes";
/**
* The synchronised list of transfers in progress.
*/
private Map<String, TransferStatus> transferMonitoring = Collections.synchronizedMap(new HashMap<String,TransferStatus>());
private static Log logger = LogFactory.getLog(TransferServiceImpl.class);
public void init()
@@ -125,6 +136,11 @@ public class TransferServiceImpl implements TransferService
private TransferManifestNodeFactory transferManifestNodeFactory;
private TransferReporter transferReporter;
/**
* How long to delay while polling for commit status.
*/
private long commitPollDelay = 2000;
/**
* create transfer target
*/
@@ -268,6 +284,11 @@ public class TransferServiceImpl implements TransferService
NodeRef nodeRef = lookupTransferTarget(name);
nodeService.setProperty(nodeRef, TransferModel.PROP_ENABLED, new Boolean(enable));
}
public boolean targetExists(String name)
{
return (lookupTransferTarget(name) != null);
}
/**
*
@@ -321,7 +342,7 @@ public class TransferServiceImpl implements TransferService
}
/**
*
* Transfer sync without callbacks.
*/
public NodeRef transfer(String targetName, TransferDefinition definition)
{
@@ -335,8 +356,22 @@ public class TransferServiceImpl implements TransferService
* @param targetName
* @param definition
* @param callbacks
*
*/
public void transferAsync(String targetName, TransferDefinition definition, Set<TransferCallback> callbacks)
public void transferAsync(String targetName, TransferDefinition definition, TransferCallback... callbacks)
{
transferAsync(targetName, definition, Arrays.asList(callbacks));
}
/**
* Transfer async.
*
* @param targetName
* @param definition
* @param callbacks
*
*/
public void transferAsync(String targetName, TransferDefinition definition, Collection<TransferCallback> callbacks)
{
/**
* Event processor for this transfer instance
@@ -399,7 +434,20 @@ public class TransferServiceImpl implements TransferService
* @param definition
* @param callbacks
*/
public NodeRef transfer(String targetName, TransferDefinition definition, Set<TransferCallback> callbacks)
public NodeRef transfer(String targetName, TransferDefinition definition, TransferCallback... callbacks)
{
return transfer(targetName, definition, Arrays.asList(callbacks));
}
/**
* Transfer Synchronous
*
* @param targetName
* @param definition
* @param callbacks
*/
public NodeRef transfer(String targetName, TransferDefinition definition, Collection<TransferCallback> callbacks)
{
/**
* Event processor for this transfer instance
@@ -424,16 +472,14 @@ public class TransferServiceImpl implements TransferService
* @param eventProcessor
*/
private NodeRef transferImpl(String targetName, final TransferDefinition definition, final TransferEventProcessor eventProcessor)
{
NodeRef reportNode = null;
{
if(logger.isDebugEnabled())
{
logger.debug("transfer started to :" + targetName);
}
/**
* Wire in the transferReport
* Wire in the transferReport - so any callbacks are stored in transferReport
*/
final List<TransferEvent> transferReport = new LinkedList<TransferEvent>();
TransferCallback reportCallback = new TransferCallback()
@@ -467,9 +513,7 @@ public class TransferServiceImpl implements TransferService
*/
String prefix = "TRX-SNAP";
String suffix = ".xml";
long numberOfNodes = 0;
logger.debug("create snapshot");
// where to put snapshot ?
@@ -481,13 +525,13 @@ public class TransferServiceImpl implements TransferService
TransferManifestWriter formatter = new XMLTransferManifestWriter();
TransferManifestHeader header = new TransferManifestHeader();
header.setCreatedDate(new Date());
header.setNodeCount(nodes.size());
formatter.startTransferManifest(snapshotWriter);
formatter.writeTransferManifestHeader(header);
for(NodeRef nodeRef : nodes)
{
TransferManifestNode node = transferManifestNodeFactory.createTransferManifestNode(nodeRef);
formatter.writeTransferManifestNode(node);
numberOfNodes++;
}
formatter.endTransferManifest();
snapshotWriter.close();
@@ -500,8 +544,10 @@ public class TransferServiceImpl implements TransferService
{
outputFile(snapshotFile);
}
catch (Exception error)
catch (IOException error)
{
// This is debug code - so an exception thrown while debugging
logger.debug("error while outputting snapshotFile");
error.printStackTrace();
}
}
@@ -509,15 +555,22 @@ public class TransferServiceImpl implements TransferService
/**
* Begin
*/
logger.debug("transfer begin");
eventProcessor.start();
final Transfer transfer = transmitter.begin(target);
if(transfer != null)
{
logger.debug("transfer begin");
String transferId = transfer.getTransferId();
TransferStatus status = new TransferStatus(transferId);
transferMonitoring.put(transferId, status);
logger.debug("transfer begun transferId:" + transferId);
boolean prepared = false;
try
{
eventProcessor.begin(transferId);
checkCancel(transferId);
/**
* send Manifest
*/
@@ -528,16 +581,19 @@ public class TransferServiceImpl implements TransferService
/**
* Parse the manifest file and transfer chunks over
*
* ManifestFile -> Manifest Processor -> Chunker -> Transmitter
*
* Step 1: Create a chunker and wire it up to the transmitter
*/
// create a chunker and wire it up to the transmitter
final ContentChunker chunker = new ContentChunkerImpl();
final Long fRange = Long.valueOf(numberOfNodes);
final Long fRange = Long.valueOf(nodes.size());
chunker.setHandler(
new ContentChunkProcessor(){
private long counter = 0;
public void processChunk(Set<ContentData> data)
{
checkCancel(transfer.getTransferId());
logger.debug("send chunk to transmitter");
for(ContentData file : data)
{
@@ -548,9 +604,10 @@ public class TransferServiceImpl implements TransferService
}
}
);
// create a manifest processor and wire it up to the chunker
/**
* Step 2 : create a manifest processor and wire it up to the chunker
*/
TransferManifestProcessor processor = new TransferManifestProcessor()
{
public void processTransferManifestNode(TransferManifestNormalNode node)
@@ -558,6 +615,7 @@ public class TransferServiceImpl implements TransferService
Set<ContentData> data = TransferManifestNodeHelper.getContentData(node);
for(ContentData d : data)
{
checkCancel(transfer.getTransferId());
logger.debug("add content to chunker");
chunker.addContent(d);
}
@@ -571,68 +629,103 @@ public class TransferServiceImpl implements TransferService
}
};
// wire up the manifest parser to a manifest processor
/**
* Step 3: wire up the manifest reader to a manifest processor
*/
SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
SAXParser parser;
parser = saxParserFactory.newSAXParser();
XMLTransferManifestReader reader = new XMLTransferManifestReader(processor);
// start the magic
/**
* Step 4: start the magic Give the manifest file to the manifest reader
*/
parser.parse(snapshotFile, reader);
chunker.flush();
/**
* Content all sent over
*/
logger.debug("content sending finished");
checkCancel(transfer.getTransferId());
/**
* prepare
*/
eventProcessor.prepare();
transmitter.prepare(transfer);
logger.debug("prepared");
logger.debug("prepared transferId:" + transferId);
checkCancel(transfer.getTransferId());
/**
* committing
*/
eventProcessor.committing(100, 0);
eventProcessor.commit();
transmitter.commit(transfer);
logger.debug("committing transferId:" + transferId);
checkCancel(transfer.getTransferId());
/**
* need to poll for status
* need to poll for committed status
*/
// transmitter.getCommitStatus(transfer);
eventProcessor.committing(100, 50);
eventProcessor.committing(100, 100);
TransferProgress progress = null;
int position = -1;
for(int retries = 0; retries < 3; retries++)
{
checkCancel(transfer.getTransferId());
try
{
progress = transmitter.getStatus(transfer);
if(progress.getCurrentPosition() != position)
{
position = progress.getCurrentPosition();
eventProcessor.committing(progress.getEndPosition(), position);
}
if(progress.isFinished())
{
logger.debug("isFinished=true");
break;
}
retries = 0;
}
catch (TransferException te)
{
logger.debug("error while committing - retrying", te);
}
/**
* Now sleep for a while.
*/
Thread.sleep(commitPollDelay);
}
logger.debug("Finished transferId:" + transferId);
checkCancel(transfer.getTransferId());
/**
* committed
*/
//eventProcessor.serverMessage("Committed");
*/
eventProcessor.success();
checkCancel(transfer.getTransferId());
prepared = true;
logger.debug("committed");
logger.debug("committed - write transfer report transferId:" + transferId);
// Write the Successful transfer report if we get here
// in its own transaction so it cannot be rolled back
final TransferTarget fTarget = target;
reportNode = transactionService.getRetryingTransactionHelper().doInTransaction(
new RetryingTransactionHelper.RetryingTransactionCallback<NodeRef>()
{
public NodeRef execute() throws Throwable
{
logger.debug("transfer report starting");
NodeRef reportNode = transferReporter.createTransferReport(transfer, fTarget, definition, transferReport);
logger.debug("transfer report done");
return reportNode;
}
});
/**
* Write the Successful transfer report if we get here
*/
NodeRef reportNode = persistTransferReport(transfer, target, definition, transferReport, snapshotFile);
logger.debug("success - at end of method transferId:" + transferId);
return reportNode;
}
finally
{
logger.debug("remove monitoring for transferId:" + transferId);
transferMonitoring.remove(transferId);
logger.debug("removed monitoring for transferId:" + transferId);
if(!prepared)
{
logger.debug("abort incomplete transfer");
@@ -640,33 +733,42 @@ public class TransferServiceImpl implements TransferService
}
}
}
//TODO Do we ever get here ?
logger.debug("returning null - unable lock target");
return null;
}
catch (TransferException t)
{
logger.debug("TransferException - unable to transfer", t);
eventProcessor.error(t);
/**
* Write the transfer report. This is an error report so needs to be out
*/
if(target != null && reportNode!= null)
if(target != null )
{
// reportNode = transferReporter.createTransferReport(target, definition, transferReport);
persistTransferReport(t, target, definition, transferReport, snapshotFile);
}
throw t;
}
catch (Exception t)
{
// Wrap any other exception as a transfer exception
logger.debug("unable to transfer", t);
logger.debug("Exception - unable to transfer", t);
eventProcessor.error(t);
/**
* Write the transfer report. This is an error report so needs to be out
*/
if(target != null && reportNode!= null)
if(target != null )
{
// reportNode = transferReporter.createTransferReport(target, definition, transferReport);
persistTransferReport(t, target, definition, transferReport, snapshotFile);
}
/**
* Wrap the exception as a transfer exception
*/
throw new TransferException("unable to transfer:" + t.toString(), t);
}
finally
@@ -678,10 +780,39 @@ public class TransferServiceImpl implements TransferService
{
snapshotFile.delete();
}
}
return reportNode;
logger.debug("snapshot file deleted");
}
} // end of transferImpl
/**
* CancelAsync
*/
public void cancelAsync(String transferHandle)
{
TransferStatus status = transferMonitoring.get(transferHandle);
if(status != null)
{
logger.debug("canceling transfer :" + transferHandle);
status.cancelMe = true;
}
}
/**
* Check whether the specified transfer should be cancelled.
* @param transferHandle
* @throws TransferException - the transfer has been cancelled.
*/
private void checkCancel(String transferHandle) throws TransferException
{
TransferStatus status = transferMonitoring.get(transferHandle);
if(status != null)
{
if(status.cancelMe)
{
throw new TransferException(MSG_CANCELLED);
}
}
}
public void setNodeService(NodeService nodeService)
{
@@ -836,7 +967,7 @@ public class TransferServiceImpl implements TransferService
* Utility to dump the contents of a file to the console
* @param file
*/
private static void outputFile(File file) throws Exception
private static void outputFile(File file) throws IOException
{
BufferedReader reader = new BufferedReader(new FileReader(file));
String s = reader.readLine();
@@ -847,6 +978,48 @@ public class TransferServiceImpl implements TransferService
}
}
/**
* Success transfer report
*/
private NodeRef persistTransferReport(final Transfer transfer, final TransferTarget target, final TransferDefinition definition, final List<TransferEvent> events, final File snapshotFile)
{
/**
* persist the transfer report in its own transaction so it cannot be rolled back
*/
NodeRef reportNode = transactionService.getRetryingTransactionHelper().doInTransaction(
new RetryingTransactionHelper.RetryingTransactionCallback<NodeRef>()
{
public NodeRef execute() throws Throwable
{
logger.debug("transfer report starting");
NodeRef reportNode = transferReporter.createTransferReport(transfer, target, definition, events, snapshotFile);
logger.debug("transfer report done");
return reportNode;
}
}, false, true);
return reportNode;
}
/**
* Error Transfer report
*/
private NodeRef persistTransferReport(final Exception t, final TransferTarget target, final TransferDefinition definition, final List<TransferEvent> events, final File snapshotFile)
{
// in its own transaction so it cannot be rolled back
NodeRef reportNode = transactionService.getRetryingTransactionHelper().doInTransaction(
new RetryingTransactionHelper.RetryingTransactionCallback<NodeRef>()
{
public NodeRef execute() throws Throwable
{
logger.debug("transfer report starting");
NodeRef reportNode = transferReporter.createTransferReport(t, target, definition, events, snapshotFile);
logger.debug("transfer report done");
return reportNode;
}
}, false, true);
return reportNode;
}
public void setTransferManifestNodeFactory(TransferManifestNodeFactory transferManifestNodeFactory)
{
this.transferManifestNodeFactory = transferManifestNodeFactory;
@@ -886,4 +1059,24 @@ public class TransferServiceImpl implements TransferService
{
return transferReporter;
}
public void setCommitPollDelay(long commitPollDelay)
{
this.commitPollDelay = commitPollDelay;
}
public long getCommitPollDelay()
{
return commitPollDelay;
}
private class TransferStatus
{
TransferStatus(String transferId)
{
this.transferId = transferId;
}
String transferId;
boolean cancelMe = false;
}
}