Merge WCM_WDR_MER TO HEAD

MOB-399 Implementation of Deploymemnt Engine
     MOB-669
     MOB-630
     Revisions 14181 - 14539 merged,,

git-svn-id: https://svn.alfresco.com/repos/alfresco-enterprise/alfresco/HEAD/root@14543 c4b6b30b-aa2e-2d43-bbcb-ca4b014f7261
This commit is contained in:
Mark Rogers
2009-06-04 20:57:18 +00:00
parent 5cd055bbd7
commit eeb22722a8
15 changed files with 759 additions and 337 deletions

View File

@@ -53,14 +53,23 @@
<!-- Deployment Service -->
<bean id="deploymentService" class="org.alfresco.repo.deploy.DeploymentServiceImpl">
<bean id="deploymentService" class="org.alfresco.repo.deploy.DeploymentServiceImpl" init-method="init">
<property name="avmService">
<ref bean="indexingAVMService"/>
</property>
<property name="avmNodeService">
<ref bean="avmNodeService"/>
</property>
<property name="transactionService">
<ref bean="transactionService"/>
</property>
<property name="jobLockService">
<ref bean="jobLockService"/>
</property>
<!-- how many files to send in parallel -->
<property name="numberOfSendingThreads">
<value>5</value>

View File

@@ -0,0 +1,90 @@
<?xml version='1.0' encoding='UTF-8'?>
<!DOCTYPE beans PUBLIC '-//SPRING//DTD BEAN//EN' 'http://www.springframework.org/dtd/spring-beans.dtd'>
<!-- Built in deployment server target -->
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="ignoreUnresolvablePlaceholders">
<value>true</value>
</property>
<property name="locations">
<list>
<value>classpath:alfresco/deployment/filesystem.properties</value>
</list>
</property>
</bean>
<!-- This is the common configuration for all file system receivers -->
<bean id="fileSystemReceiverService" class="org.alfresco.deployment.impl.fsr.FileSystemReceiverServiceImpl"
init-method="init">
<!-- Will an error be thrown if the FSR overwrites files outside its control -->
<property name="errorOnOverwrite"><value>${deployment.filesystem.errorOnOverwrite}</value></property>
<!-- Where to store temporary data -->
<property name="dataDirectory">
<value>${deployment.filesystem.datadir}</value>
</property>
<!-- Where to log information -->
<property name="logDirectory">
<value>${deployment.filesystem.logdir}</value>
</property>
<!-- where to store meta data -->
<property name="metaDataDirectory">
<value>${deployment.filesystem.metadatadir}</value>
</property>
<property name="commandQueue"><ref bean="deploymentReceiverCommandQueue" /></property>
</bean>
<!--
Defines and registers the deployment target with the name "default"
Which is a FileSystemDeploymentTarget with the "deploymentReceiverEngine"
-->
<bean class="org.alfresco.deployment.impl.server.DeploymentTargetRegistrationBean"
init-method="register" >
<property name="name"><value>${deployment.filesystem.default.name}</value></property>
<property name="registry"><ref bean="deploymentReceiverEngine" /></property>
<property name="target">
<bean class="org.alfresco.deployment.impl.fsr.FileSystemDeploymentTarget" init-method="init">
<property name="rootDirectory"><value>${deployment.filesystem.default.rootdir}</value></property>
<property name="autoFix"><value>${deployment.filesystem.autofix}</value></property>
<property name="fileSystemReceiverService"><ref bean="fileSystemReceiverService"/></property>
<!-- how to authenticate for this particular target -->
<property name="authenticator">
<bean
class="org.alfresco.deployment.impl.DeploymentReceiverAuthenticatorAuthenticationService"
init-method="init">
<property name="authenticationService">
<ref bean="AuthenticationService" />
</property>
</bean>
</property>
<!-- Target Event Handlers
<property name="prepare">
<list>
<bean class="org.alfresco.deployment.SampleRunnable"/>
</list>
</property>
<property name="postCommit">
<list>
<bean class="org.alfresco.deployment.SampleRunnable"/>
</list>
</property>
-->
</bean>
</property>
</bean>
</beans>

View File

@@ -0,0 +1,18 @@
; Built in deployment receiver properties for the default
; filesystem receiver
; filesystem receiver configuration
deployment.filesystem.datadir=./depdata
deployment.filesystem.logdir=./deplog
deployment.filesystem.metadatadir=./depmetadata
deployment.filesystem.autofix=true
deployment.filesystem.errorOnOverwrite=false
; default filesystem target configuration
deployment.filesystem.default.rootdir=./www
deployment.filesystem.default.name=filesystem

View File

@@ -309,6 +309,7 @@ attribute.rmi.service.port=50503
authentication.rmi.service.port=50504
repo.rmi.service.port=50505
action.rmi.service.port=50506
deployment.rmi.service.port=50507
# External executable locations
ooo.exe=soffice

View File

@@ -0,0 +1,4 @@
# WCM Deployment - Receiver Properties
wcm-deployment-receiver.rmi.service.port=${deployment.rmi.service.port}
wcm-deployment-receiver.poll.delay=5000

View File

@@ -0,0 +1,71 @@
<?xml version='1.0' encoding='UTF-8'?>
<!DOCTYPE beans PUBLIC '-//SPRING//DTD BEAN//EN' 'http://www.springframework.org/dtd/spring-beans.dtd'>
<beans>
<!-- Repository Embedded Deployment Receiver Configuration -->
<!-- command queue for deployment service -->
<bean id="deploymentReceiverCommandQueue" class="org.alfresco.deployment.impl.server.DeploymentCommandQueueImpl" >
</bean>
<!-- Housekeeper for the deployment queue -->
<bean id="commandQueueHousekeeper" class="org.alfresco.deployment.impl.server.DeploymentCommandQueueHousekeeper"
init-method="init">
<property name="commandQueue"><ref bean="deploymentReceiverCommandQueue" /></property>
</bean>
<!-- Management for reader threads -->
<bean id="deploymentReaderManagement" class="org.alfresco.deployment.impl.server.ReaderManagementPool" >
</bean>
<!-- This is the deployment engine which is built into the repository -->
<bean id="deploymentReceiverEngine"
class="org.alfresco.deployment.impl.server.DeploymentReceiverEngineImpl"
init-method="init">
<!-- How long to wait before polling housekeeping -->
<property name="pollDelay"><value>${wcm-deployment-receiver.poll.delay}</value></property>
<property name="commandQueue"><ref bean="deploymentReceiverCommandQueue" /></property>
<property name="readerManagement"><ref bean="deploymentReaderManagement" /></property>
<!-- Define your content transformers here
<property name="transformers">
<list>
</list>
</property>
-->
<!-- Define your housekeepers here -->
<property name="housekeepers">
<set>
<ref bean="commandQueueHousekeeper"/>
</set>
</property>
</bean>
<!-- The remote interface for the deployment receiver engine -->
<bean id="deploymentReceiverTransportRMI" class="org.springframework.remoting.rmi.RmiServiceExporter">
<property name="service">
<ref bean="deploymentReceiverEngine"/>
</property>
<property name="serviceInterface">
<value>org.alfresco.deployment.DeploymentReceiverTransport</value>
</property>
<property name="serviceName">
<value>deployment</value>
</property>
<property name="registryPort">
<value>${alfresco.rmi.services.port}</value>
</property>
<property name="servicePort">
<value>${wcm-deployment-receiver.rmi.service.port}</value>
</property>
</bean>
<!-- Now import the deployment targets -->
<import resource="classpath*:alfresco/deployment/*-target.xml" />
</beans>

View File

@@ -418,8 +418,18 @@ public class AVMDeployWebsiteAction extends ActionExecuterAbstractBase
logger.debug("Performing Alfresco deployment to " + host + ":" + port +
" using deploymentserver: " + serverProps);
this.deployService.deployDifference(version, path, host, port,
remoteUsername, remotePassword, targetPath, regexMatcher, true, false, false, callbacks);
this.deployService.deployDifference(version,
path,
host,
port,
remoteUsername,
remotePassword,
targetPath,
regexMatcher,
true,
false,
false,
callbacks);
}
}
catch (Throwable err)

View File

@@ -126,7 +126,7 @@ public class BulkLoader
catch (IOException e)
{
e.printStackTrace(System.err);
throw new AVMException("I/O Error");
throw new AVMException("I/O Error", e);
}
}
}

View File

@@ -119,10 +119,10 @@ public class ASRDeploymentTest extends AVMServiceTestBase
String buffyText = "This is test data: Buffy the Vampire Slayer is an Emmy Award-winning and Golden Globe-nominated American cult television series that aired from March 10, 1997 until May 20, 2003. The series was created in 1997 by writer-director Joss Whedon under his production tag, Mutant Enemy Productions with later co-executive producers being Jane Espenson, David Fury, and Marti Noxon. The series narrative follows Buffy Summers (played by Sarah Michelle Gellar), the latest in a line of young women chosen by fate to battle against vampires, demons, and the forces of darkness as the Slayer. Like previous Slayers, Buffy is aided by a Watcher, who guides and trains her. Unlike her predecessors, Buffy surrounds herself with a circle of loyal friends who become known as the Scooby Gang.";
fService.createFile("main:/a/b", "buffy").close();
writer = fService.getContentWriter("main:/a/b/buffy");
// Force a conversion
writer.setEncoding("UTF-16");
writer.setMimetype(MimetypeMap.MIMETYPE_TEXT_PLAIN);
writer.putContent(buffyText);
fService.createFile("main:/a/b", "fudge.bak").close();
@@ -144,14 +144,14 @@ public class ASRDeploymentTest extends AVMServiceTestBase
assertTrue("first deployment no start", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.START, null, destRef)));
assertTrue("first deployment no finish", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.END, null, destRef)));
assertTrue("first deployment wrong size", firstDeployment.size() == 11);
assertTrue("Update missing: /a", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/a")));
assertTrue("Update missing: /a/b", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/a/b")));
assertTrue("Update missing: /a/b/c", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/a/b/c")));
assertTrue("Update missing: /d/e", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/d/e")));
assertTrue("Update missing: /a/b/c/foo", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/a/b/c/foo")));
assertTrue("Update missing: /a/b/c/bar", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/a/b/c/bar")));
assertTrue("Update missing: /a/b/buffy", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/a/b/buffy")));
assertFalse("Fudge has not been excluded", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, destRef + "/a/b/fudge.bak")));
assertTrue("Update missing: /a", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/a")));
assertTrue("Update missing: /a/b", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/a/b")));
assertTrue("Update missing: /a/b/c", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/a/b/c")));
assertTrue("Update missing: /d/e", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/d/e")));
assertTrue("Update missing: /a/b/c/foo", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/a/b/c/foo")));
assertTrue("Update missing: /a/b/c/bar", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/a/b/c/bar")));
assertTrue("Update missing: /a/b/buffy", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/a/b/buffy")));
assertFalse("Fudge has not been excluded", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, destRef + "/a/b/fudge.bak")));
// Check that files exist in the destination AVM Store
{

View File

@@ -43,6 +43,8 @@ public class DeploymentClientOutputStream extends OutputStream
private String fOutputToken;
private boolean open = true;
/**
* Make one up.
* @param transport
@@ -75,7 +77,11 @@ public class DeploymentClientOutputStream extends OutputStream
@Override
public void close() throws IOException
{
// NO OP
if(open)
{
fTransport.finishSend(fTicket, fOutputToken);
}
open = false;
}
/* (non-Javadoc)

View File

@@ -27,9 +27,13 @@ package org.alfresco.repo.deploy;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.io.Serializable;
import org.alfresco.deployment.DeploymentReceiverService;
import org.alfresco.deployment.DeploymentReceiverTransport;
import org.alfresco.deployment.DeploymentToken;
import org.alfresco.deployment.FileDescriptor;
/**
@@ -68,9 +72,17 @@ public class DeploymentReceiverServiceClient implements
/* (non-Javadoc)
* @see org.alfresco.deployment.DeploymentReceiverService#begin(java.lang.String, java.lang.String, java.lang.String)
*/
public String begin(String target, String user, String password)
public DeploymentToken begin(String target, String storeName, int version, String user, String password)
{
return fTransport.begin(target, user, password);
return fTransport.begin(target, storeName, version, user, password);
}
/* (non-Javadoc)
* @see org.alfresco.deployment.DeploymentReceiverService#commit(java.lang.String)
*/
public void prepare(String ticket)
{
fTransport.prepare(ticket);
}
/* (non-Javadoc)
@@ -109,17 +121,17 @@ public class DeploymentReceiverServiceClient implements
/* (non-Javadoc)
* @see org.alfresco.deployment.DeploymentReceiverService#mkdir(java.lang.String, java.lang.String, java.lang.String)
*/
public void mkdir(String ticket, String path, String guid)
public void createDirectory(String ticket, String path, String guid, Set<String>aspects, Map<String, Serializable> properties)
{
fTransport.mkdir(ticket, path, guid);
fTransport.createDirectory(ticket, path, guid, aspects, properties);
}
/* (non-Javadoc)
* @see org.alfresco.deployment.DeploymentReceiverService#send(java.lang.String, java.lang.String, java.lang.String)
*/
public OutputStream send(String ticket, String path, String guid)
public OutputStream send(String ticket, String path, String guid, String encoding, String mimeType, Set<String>aspects, Map<String, Serializable> props)
{
String outputToken = fTransport.getSendToken(ticket, path, guid);
String outputToken = fTransport.getSendToken(ticket, path, guid, encoding, mimeType, aspects, props);
return new DeploymentClientOutputStream(fTransport, ticket, outputToken);
}
@@ -131,8 +143,8 @@ public class DeploymentReceiverServiceClient implements
fTransport.shutDown(user, password);
}
public void setGuid(String ticket, String path, String guid)
public void updateDirectory(String ticket, String path, String guid, Set<String>aspects, Map<String, Serializable> props)
{
fTransport.setGuid(ticket, path, guid);
fTransport.updateDirectory(ticket, path, guid, aspects, props);
}
}

View File

@@ -29,9 +29,11 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -45,13 +47,16 @@ import java.util.concurrent.TimeUnit;
import org.alfresco.deployment.DeploymentReceiverService;
import org.alfresco.deployment.DeploymentReceiverTransport;
import org.alfresco.deployment.DeploymentToken;
import org.alfresco.deployment.DeploymentTransportOutputFilter;
import org.alfresco.deployment.FileDescriptor;
import org.alfresco.deployment.FileType;
import org.alfresco.repo.action.ActionServiceRemote;
import org.alfresco.repo.avm.AVMNodeConverter;
import org.alfresco.repo.avm.AVMNodeService;
import org.alfresco.repo.avm.util.SimplePath;
import org.alfresco.repo.domain.PropertyValue;
import org.alfresco.repo.lock.JobLockService;
import org.alfresco.repo.remote.AVMRemoteImpl;
import org.alfresco.repo.remote.AVMSyncServiceRemote;
import org.alfresco.repo.remote.ClientTicketHolder;
@@ -76,11 +81,13 @@ import org.alfresco.service.cmr.remote.AVMRemote;
import org.alfresco.service.cmr.remote.AVMRemoteTransport;
import org.alfresco.service.cmr.remote.AVMSyncServiceTransport;
import org.alfresco.service.cmr.repository.ContentData;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.security.AuthenticationService;
import org.alfresco.service.namespace.QName;
import org.alfresco.service.transaction.TransactionService;
import org.alfresco.util.NameMatcher;
import org.alfresco.util.Pair;
import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.remoting.rmi.RmiProxyFactoryBean;
@@ -103,11 +110,18 @@ public class DeploymentServiceImpl implements DeploymentService
*/
private AVMService fAVMService;
private AVMNodeService fAVMNodeService;
/**
* The local Transaction Service Instance
*/
TransactionService trxService;
/**
* The jobLockService
*/
private JobLockService jobLockService;
/**
* The Ticket holder.
*/
@@ -116,7 +130,37 @@ public class DeploymentServiceImpl implements DeploymentService
/**
* number of concurrent sending threads
*/
private int numberOfSendingThreads = 3;
private int numberOfSendingThreads = 4;
/**
* Hold the deployment lock for 3600 seconds (1 hour)
*/
private long targetLockTimeToLive = 3600000;
/**
* Retry for target lock every 10 seconds
*/
private long targetLockRetryWait = 10000;
/**
* Retry 10000 times before giving up
*/
private int targetLockRetryCount = 10000;
/**
* The size of the output buffers
*/
private int OUTPUT_BUFFER_SIZE = 20000;
private int outputBufferSize = OUTPUT_BUFFER_SIZE;
public void init()
{
PropertyCheck.mandatory(this, "jobLockService", jobLockService);
PropertyCheck.mandatory(this, "transactionService", trxService);
PropertyCheck.mandatory(this, "avmService", fAVMService);
PropertyCheck.mandatory(this, "avmNodeService", fAVMNodeService);
}
/**
* Default constructor.
@@ -163,13 +207,25 @@ public class DeploymentServiceImpl implements DeploymentService
final boolean dontDo,
final List<DeploymentCallback> callbacks)
{
DeploymentDestination dest = getLock(hostName, port);
synchronized (dest)
{
final String storeName = srcPath.substring(0, srcPath.indexOf(":"));
/**
* Lock the cluster for the remote target
*/
String lockStr = hostName + "." + "asr." + storeName;
QName lockQName = QName.createQName("{org.alfresco.deployment.lock}" + lockStr);
Lock lock = new Lock(lockQName);
lock.makeLock();
/**
* Got the lock - now do a deployment
*/
if (fgLogger.isDebugEnabled())
{
fgLogger.debug("Deploying to Remote Alfresco at " + dest);
fgLogger.debug("Deploying to Remote Alfresco at " + hostName);
}
try
{
RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper();
@@ -182,19 +238,18 @@ public class DeploymentServiceImpl implements DeploymentService
* If version is -1, Create a local snapshot to deploy
*/
fgLogger.debug("creating snapshot of local version");
final String storeName = srcPath.substring(0, srcPath.indexOf(":"));
version = snapshotLocal(storeName);
// RetryingTransactionCallback<Integer> localSnapshot = new RetryingTransactionCallback<Integer>()
// {
// public Integer execute() throws Throwable
// {
// int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName);
// return new Integer(newVersion);
// }
// };
// version = trn.doInTransaction(localSnapshot, false, true).intValue();
// fgLogger.debug("snapshot local created " + storeName + ", " + version);
RetryingTransactionCallback<Integer> localSnapshot = new RetryingTransactionCallback<Integer>()
{
public Integer execute() throws Throwable
{
int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName);
return new Integer(newVersion);
}
};
version = trn.doInTransaction(localSnapshot, false, true).intValue();
fgLogger.debug("snapshot local created " + storeName + ", " + version);
}
{
@@ -373,7 +428,6 @@ public class DeploymentServiceImpl implements DeploymentService
fTicketHolder.setTicket(null);
}
}
}
/**
* Deploy all the children of corresponding directories. (ASR version)
@@ -467,7 +521,7 @@ public class DeploymentServiceImpl implements DeploymentService
Pair<Integer, String> source =
new Pair<Integer, String>(version, src.getPath());
String destination = AVMNodeConverter.ExtendAVMPath(dstParent.getPath(), src.getName());
DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.COPIED,
DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.CREATED,
source,
destination);
processEvent(event, callbacks);
@@ -478,10 +532,12 @@ public class DeploymentServiceImpl implements DeploymentService
copyDirectory(version, src, dstParent, remote, matcher, callbacks);
return;
}
// here when src is a file
Pair<Integer, String> source =
new Pair<Integer, String>(version, src.getPath());
String destination = AVMNodeConverter.ExtendAVMPath(dstParent.getPath(), src.getName());
DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.COPIED,
DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.CREATED,
source,
destination);
processEvent(event, callbacks);
@@ -491,12 +547,28 @@ public class DeploymentServiceImpl implements DeploymentService
}
// Copy a source file.
OutputStream out = remote.createFile(dstParent.getPath(), src.getName());
try
{
InputStream in = fAVMService.getFileInputStream(src);
copyStream(in, out);
copyMetadata(version, src, remote.lookup(-1, dstParent.getPath() + '/' + src.getName()), remote);
}
finally
{
try
{
out.close();
}
catch (IOException e)
{
throw new AVMException("I/O Exception", e);
}
out = null;
}
return;
}
// Destination exists.
// Destination exists and is a directory.
if (src.isDirectory())
{
// If the destination is also a directory, recursively deploy.
@@ -508,7 +580,7 @@ public class DeploymentServiceImpl implements DeploymentService
Pair<Integer, String> source =
new Pair<Integer, String>(version, src.getPath());
String destination = dst.getPath();
DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.COPIED,
DeploymentEvent event = new DeploymentEvent(DeploymentEvent.Type.CREATED,
source, destination);
processEvent(event, callbacks);
@@ -522,7 +594,7 @@ public class DeploymentServiceImpl implements DeploymentService
copyDirectory(version, src, dstParent, remote, matcher, callbacks);
return;
}
// Source is a file.
// Source exists and is a file.
if (dst.isFile())
{
// Destination is also a file. Overwrite if the GUIDS are different.
@@ -593,7 +665,7 @@ public class DeploymentServiceImpl implements DeploymentService
if (child.isFile())
{
DeploymentEvent event =
new DeploymentEvent(DeploymentEvent.Type.COPIED,
new DeploymentEvent(DeploymentEvent.Type.CREATED,
new Pair<Integer, String>(version, src.getPath() + '/' + child.getName()),
newParent.getPath() + '/' + child.getName());
processEvent(event, callbacks);
@@ -607,7 +679,7 @@ public class DeploymentServiceImpl implements DeploymentService
{
// is a directory
DeploymentEvent event =
new DeploymentEvent(DeploymentEvent.Type.COPIED,
new DeploymentEvent(DeploymentEvent.Type.CREATED,
new Pair<Integer, String>(version, src.getPath() + '/' + child.getName() ),
newParent.getPath() + '/' + child.getName());
processEvent(event, callbacks);
@@ -634,7 +706,6 @@ public class DeploymentServiceImpl implements DeploymentService
out.write(buff, 0, read);
}
in.close();
out.close();
}
catch (IOException e)
{
@@ -856,32 +927,29 @@ public class DeploymentServiceImpl implements DeploymentService
}
}
/**
* Create a new local snapshot
* @param storeName
* @return the version
*/
private int snapshotLocal(final String storeName)
private Set<String>getAspects(AVMService avmService, AVMNodeDescriptor src)
{
RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper();
/**
* If version is -1, Create a local snapshot to deploy
*/
fgLogger.debug("creating snapshot of local version");
RetryingTransactionCallback<Integer> localSnapshot = new RetryingTransactionCallback<Integer>()
Set<QName>aspects = avmService.getAspects(src);
Set<String>stringAspects = new HashSet<String>();
for (QName aspect : aspects)
{
public Integer execute() throws Throwable
{
int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName);
return new Integer(newVersion);
stringAspects.add(aspect.toString());
}
return stringAspects;
}
};
int version = trn.doInTransaction(localSnapshot, false, true).intValue();
fgLogger.debug("snapshot local created " + storeName + ", " + version);
return version;
private Map<String, Serializable> getProperties(AVMNodeDescriptor src)
{
Map<QName, PropertyValue> properties = fAVMService.getNodeProperties(src);
NodeRef nodeRef = AVMNodeConverter.ToNodeRef(src.getVersionID(), src.getPath());
Map<String, Serializable> stringProperties = new HashMap<String, Serializable>();
for(QName key : properties.keySet())
{
stringProperties.put(key.toString(), fAVMNodeService.getProperty(nodeRef, key));
}
return stringProperties;
}
/**
@@ -901,6 +969,8 @@ public class DeploymentServiceImpl implements DeploymentService
* @param dontDo Not implemented
* @param callbacks Event callbacks when a deployment Starts, Ends, Adds, Deletes etc.
*
* @throws AVMException
*
* @see org.alfresco.service.cmr.avm.deploy.DeploymentService#deployDifferenceFS(int, java.lang.String, java.lang.String, int, java.lang.String, java.lang.String, java.lang.String, boolean, boolean)
*/
public void deployDifferenceFS(int version,
@@ -917,9 +987,25 @@ public class DeploymentServiceImpl implements DeploymentService
boolean dontDo,
List<DeploymentCallback> callbacks)
{
/**
* Lock cluster for the remote target
*/
String lockStr = hostName + "." + target;
QName lockQName = QName.createQName("{org.alfresco.deployment.lock}" + lockStr);
Lock lock = new Lock(lockQName);
lock.makeLock();
/**
* Cluster Lock held here
*/
if (fgLogger.isDebugEnabled())
{
fgLogger.debug("Deploying To File System Reciever on " + hostName + " to target " + target);
Object[] objs = {version, srcPath, adapterName, hostName, port, target};
MessageFormat f = new MessageFormat("Deployment Lock Held: version {0}, srcPath {1}, adapterName {2}, hostName {3}, port {4}, target {5}");
fgLogger.debug(f.format(objs));
}
DeploymentReceiverService service = null;
@@ -939,14 +1025,23 @@ public class DeploymentServiceImpl implements DeploymentService
try
{
final String storeName = srcPath.substring(0, srcPath.indexOf(':'));
try {
if (version < 0)
{
String storeName = srcPath.substring(0, srcPath.indexOf(':'));
version = snapshotLocal(storeName);
//version = fAVMService.createSnapshot(storeName, null, null).get(storeName);
RetryingTransactionHelper trn = trxService.getRetryingTransactionHelper();
RetryingTransactionCallback<Integer> localSnapshot = new RetryingTransactionCallback<Integer>()
{
public Integer execute() throws Throwable
{
int newVersion = fAVMService.createSnapshot(storeName, null, null).get(storeName);
return new Integer(newVersion);
}
};
version = trn.doInTransaction(localSnapshot, false, true).intValue();
fgLogger.debug("snapshot local created " + storeName + ", " + version);
}
transformers = getTransformers(adapterName);
@@ -985,8 +1080,9 @@ public class DeploymentServiceImpl implements DeploymentService
try
{
ticket = service.begin(target, userName, password);
deployDirectoryPushFSR(service, ticket, version, srcPath, "/", matcher, eventQueue, sendQueue, errors);
DeploymentToken token = service.begin(target, storeName, version, userName, password);
ticket = token.getTicket();
deployDirectoryPushFSR(service, ticket, version, srcPath, "/", matcher, eventQueue, sendQueue, errors, lock);
}
catch (Exception e)
{
@@ -1010,6 +1106,8 @@ public class DeploymentServiceImpl implements DeploymentService
{
try
{
fgLogger.debug("no errors - prepare and commit");
service.prepare(ticket);
service.commit(ticket);
}
catch (Exception e)
@@ -1020,6 +1118,7 @@ public class DeploymentServiceImpl implements DeploymentService
if(errors.size() > 0)
{
fgLogger.debug("errors on deployment workers");
Exception firstError = errors.get(0);
eventQueue.add(new DeploymentEvent(DeploymentEvent.Type.FAILED,
@@ -1093,13 +1192,16 @@ public class DeploymentServiceImpl implements DeploymentService
NameMatcher matcher,
BlockingQueue<DeploymentEvent> eventQueue,
BlockingQueue<DeploymentWork> sendQueue,
List<Exception> errors)
List<Exception> errors,
Lock lock)
{
Map<String, AVMNodeDescriptor> srcListing = fAVMService.getDirectoryListing(version, srcPath);
List<FileDescriptor> dstListing = service.getListing(ticket, dstPath);
Iterator<AVMNodeDescriptor> srcIter = srcListing.values().iterator();
Iterator<FileDescriptor> dstIter = dstListing.iterator();
lock.refreshLock();
// Here with two sorted directory listings
AVMNodeDescriptor src = null;
FileDescriptor dst = null;
@@ -1125,6 +1227,9 @@ public class DeploymentServiceImpl implements DeploymentService
{
fgLogger.debug("comparing src:" + src + " dst:"+ dst);
}
lock.refreshLock();
// This means no entry on src so delete what is on dst.
if (src == null)
{
@@ -1186,9 +1291,6 @@ public class DeploymentServiceImpl implements DeploymentService
new DeploymentEvent(DeploymentEvent.Type.UPDATED,
new Pair<Integer, String>(version, src.getPath()),
extendedPath), ticket, src));
// Work in progress
// copyFileToFSR(service, ticket, version, src,
// extendedPath, false);
}
src = null;
dst = null;
@@ -1198,11 +1300,20 @@ public class DeploymentServiceImpl implements DeploymentService
if (dst.getType() == FileType.DIR)
{
String extendedPath = extendPath(dstPath, dst.getName());
Set<String>stringAspects = getAspects(fAVMService, src);
Map<String, Serializable> stringProperties = getProperties(src);
/**
* Update the directory before any children
*/
service.updateDirectory(ticket, extendedPath, src.getGuid(), stringAspects, stringProperties);
if (!excluded(matcher, src.getPath(), extendedPath))
{
deployDirectoryPushFSR(service, ticket, version, src.getPath(), extendedPath, matcher, eventQueue, sendQueue, errors);
deployDirectoryPushFSR(service, ticket, version, src.getPath(), extendedPath, matcher, eventQueue, sendQueue, errors, lock);
}
service.setGuid(ticket, extendedPath, src.getGuid());
src = null;
dst = null;
continue;
@@ -1256,21 +1367,25 @@ public class DeploymentServiceImpl implements DeploymentService
{
String dstPath = extendPath(parentPath, src.getName());
// Need to queue the request to copy file or dir to remote.
sendQueue.add(new DeploymentWork(
new DeploymentEvent(DeploymentEvent.Type.COPIED,
new DeploymentEvent(DeploymentEvent.Type.CREATED,
new Pair<Integer, String>(version, src.getPath()),
dstPath), ticket, src));
if (src.isFile())
{
// copyFileToFSR(service, ticket, version, src, dstPath, true, transformers);
return;
}
// Need to create directories in controlling thread since then need to be BEFORE any children are written
// here if src is a directory.
service.mkdir(ticket, dstPath, src.getGuid());
// Need to create directories in controlling thread since it needs to be created
// BEFORE any children are written
Set<String>stringAspects = getAspects(fAVMService, src);
Map<String, Serializable> stringProperties = getProperties(src);
service.createDirectory(ticket, dstPath, src.getGuid(), stringAspects, stringProperties);
// now copy the children over
Map<String, AVMNodeDescriptor> listing = fAVMService.getDirectoryListing(src);
@@ -1325,25 +1440,6 @@ public class DeploymentServiceImpl implements DeploymentService
return matcher != null && ((srcPath != null && matcher.matches(srcPath)) || (dstPath != null && matcher.matches(dstPath)));
}
/**
* Get the object to lock for an alfresco->alfresco target.
* @param host
* @param port
* @return the lock
*/
private synchronized DeploymentDestination getLock(String host, int port)
{
DeploymentDestination newDest = new DeploymentDestination(host, port);
DeploymentDestination dest = fDestinations.get(newDest);
if (dest == null)
{
dest = newDest;
fDestinations.put(dest, dest);
}
return dest;
}
private Map<String, DeploymentReceiverTransportAdapter> deploymentReceiverTransportAdapters;
/**
* The deployment transport adapters provide the factories used to connect to a remote file system receiver.
@@ -1377,6 +1473,54 @@ public class DeploymentServiceImpl implements DeploymentService
return numberOfSendingThreads;
}
public void setJobLockService(JobLockService jobLockService) {
this.jobLockService = jobLockService;
}
public JobLockService getJobLockService() {
return jobLockService;
}
public void setTargetLockTimeToLive(long targetLockTimeToLive) {
this.targetLockTimeToLive = targetLockTimeToLive;
}
public long getTargetLockTimeToLive() {
return targetLockTimeToLive;
}
public void setTargetLockRetryWait(long targetLockRetryWait) {
this.targetLockRetryWait = targetLockRetryWait;
}
public long getTargetLockRetryWait() {
return targetLockRetryWait;
}
public void setTargetLockRetryCount(int targetLockRetryCount) {
this.targetLockRetryCount = targetLockRetryCount;
}
public int getTargetLockRetryCount() {
return targetLockRetryCount;
}
public void setAvmNodeService(AVMNodeService fAVMNodeService) {
this.fAVMNodeService = fAVMNodeService;
}
public AVMNodeService getAvmNodeService() {
return fAVMNodeService;
}
public void setOutputBufferSize(int outputBufferSize) {
this.outputBufferSize = outputBufferSize;
}
public int getOutputBufferSize() {
return outputBufferSize;
}
/**
* This thread processes the event queue to do the callbacks
* @author mrogers
@@ -1439,6 +1583,47 @@ public class DeploymentServiceImpl implements DeploymentService
stopMe = true;
}
}
/**
* Inner Class to Decorate the jobLockService to add control over the refreshLock behaviour to
* reduce the number of calls to the underlying lock service.
*/
private class Lock
{
QName lockQName;
long lockTime;
public Lock(QName lockQName)
{
this.lockQName = lockQName;
}
/**
* @throws LockAquisitionException
*/
public void makeLock()
{
jobLockService.getTransacionalLock(lockQName, getTargetLockTimeToLive(), getTargetLockRetryWait(), getTargetLockRetryCount());
lockTime = new Date().getTime();
fgLogger.debug("lock taken" + lockQName);
}
public void refreshLock()
{
/**
* Optimisation to stop the lock being refreshed thousands of times, refresh lock only after half lock time has expired
*/
Date now = new Date();
if(now.getTime() - lockTime > (targetLockTimeToLive / 2))
{
fgLogger.debug("lock refreshed" + lockQName);
jobLockService.getTransacionalLock(lockQName, getTargetLockTimeToLive(), getTargetLockRetryWait(), getTargetLockRetryCount());
lockTime = new Date().getTime();
}
}
}
/**
@@ -1516,7 +1701,7 @@ public class DeploymentServiceImpl implements DeploymentService
{
service.delete(ticket, event.getDestination());
}
else if (event.getType().equals(DeploymentEvent.Type.COPIED))
else if (event.getType().equals(DeploymentEvent.Type.CREATED))
{
AVMNodeDescriptor src = work.getSrc();
if(src.isFile())
@@ -1571,25 +1756,40 @@ public class DeploymentServiceImpl implements DeploymentService
{
public Boolean execute() throws Exception
{
ContentData data = avmService.getContentDataForRead(src);
InputStream in = avmService.getFileInputStream(src);
String encoding = data.getEncoding();
String mimeType = data.getMimetype();
OutputStream out = service.send(ticket, dstPath, src.getGuid());
OutputStream baseStream = out; // finish send needs out, not a decorated stream
Set<String>stringAspects = getAspects(avmService, src);
Map<String, Serializable> stringProperties = getProperties(src);
OutputStream out = service.send(ticket, dstPath, src.getGuid(), encoding, mimeType, stringAspects, stringProperties);
try
{
// Buffer the output, we don't want to send lots of small packets
out = new BufferedOutputStream(out, 10000);
out = new BufferedOutputStream(out, outputBufferSize);
// Call content transformers here to transform from local to network format
if(transformers != null && transformers.size() > 0) {
// yes we have pay-load transformers
for(DeploymentTransportOutputFilter transformer : transformers)
{
out = transformer.addFilter(out, src.getPath());
out = transformer.addFilter(out, src.getPath(), encoding, mimeType);
}
}
copyStream(in, out);
service.finishSend(ticket, baseStream);
}
finally
{
// whatever happens close the output stream
if(out != null)
{
out.close();
out = null;
}
}
return true;
}
}, true);

View File

@@ -180,14 +180,14 @@ public class FSDeploymentTest extends AVMServiceTestBase
assertTrue("first deployment no start", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.START, null, TEST_TARGET)));
assertTrue("first deployment no finish", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.END, null, TEST_TARGET)));
assertTrue("first deployment wrong size", firstDeployment.size() == 11);
assertTrue("Update missing: /a", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a")));
assertTrue("Update missing: /a/b", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b")));
assertTrue("Update missing: /a/b/c", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b/c")));
assertTrue("Update missing: /d/e", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/d/e")));
assertTrue("Update missing: /a/b/c/foo", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b/c/foo")));
assertTrue("Update missing: /a/b/c/bar", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b/c/bar")));
assertTrue("Update missing: /a/b/buffy", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b/buffy")));
assertFalse("Fudge has not been excluded", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b/fudge.bak")));
assertTrue("Update missing: /a", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a")));
assertTrue("Update missing: /a/b", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b")));
assertTrue("Update missing: /a/b/c", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b/c")));
assertTrue("Update missing: /d/e", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/d/e")));
assertTrue("Update missing: /a/b/c/foo", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b/c/foo")));
assertTrue("Update missing: /a/b/c/bar", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b/c/bar")));
assertTrue("Update missing: /a/b/buffy", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b/buffy")));
assertFalse("Fudge has not been excluded", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b/fudge.bak")));
// Check that files exist on target
File target = new File("target");
@@ -342,7 +342,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
Set<DeploymentEvent> firstDeployment = new HashSet<DeploymentEvent>();
firstDeployment.addAll(report.getEvents());
assertTrue("Update missing: /a/test02.html", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/test02.html")));
assertTrue("Update missing: /a/test02.html", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/test02.html")));
assertTrue("delete missing: /a/test03.html", firstDeployment.contains(new DeploymentEvent(DeploymentEvent.Type.DELETED, null, "/a/test03.html")));
@@ -409,7 +409,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
{
System.out.println(event);
}
assertTrue("Update missing", smallUpdate.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b/fudge.bak")));
assertTrue("Update missing", smallUpdate.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b/fudge.bak")));
assertEquals(5, smallUpdate.size());
}
@@ -455,7 +455,7 @@ public class FSDeploymentTest extends AVMServiceTestBase
{
System.out.println(event);
}
assertTrue("Update missing", smallUpdate.contains(new DeploymentEvent(DeploymentEvent.Type.COPIED, null, "/a/b/Zander")));
assertTrue("Update missing", smallUpdate.contains(new DeploymentEvent(DeploymentEvent.Type.CREATED, null, "/a/b/Zander")));
assertTrue("Update missing", smallUpdate.contains(new DeploymentEvent(DeploymentEvent.Type.DELETED, null, "/a/b/Drusilla")));
assertTrue("Update missing", smallUpdate.contains(new DeploymentEvent(DeploymentEvent.Type.DELETED, null, "/a/b/Master")));
assertEquals(5, smallUpdate.size());

View File

@@ -78,6 +78,7 @@ public interface JobLockService
* make use of retrying; the lock is actually being refreshed and will therefore never
* become valid if it doesn't refresh directly.
*
* @param timeToLive the time (in milliseconds) for the lock to remain valid
* @param retryWait the time (in milliseconds) to wait before trying again
* @param retryCount the maximum number of times to attempt the lock acquisition
* @throws LockAcquisitionException if the lock could not be acquired

View File

@@ -43,7 +43,7 @@ public class DeploymentEvent implements Serializable
*/
public static enum Type implements Serializable
{
COPIED, // Copied a source node that did not exist on the destination.
CREATED, // Copied a source node that did not exist on the destination.
UPDATED, // Overwrote the destination.
DELETED, // Deleted the destination node.
START, // A Deployment has begun.