diff --git a/source/java/org/alfresco/repo/search/impl/lucene/index/IndexEntry.java b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexEntry.java new file mode 100644 index 0000000000..56c8942ce8 --- /dev/null +++ b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexEntry.java @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2005 Alfresco, Inc. + * + * Licensed under the Mozilla Public License version 1.1 + * with a permitted attribution clause. You may obtain a + * copy of the License at + * + * http://www.alfresco.org/legal/license.txt + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. + */ +package org.alfresco.repo.search.impl.lucene.index; + +/** + * Describes an entry in an index + * + * @author Andy Hind + */ +class IndexEntry +{ + /** + * The type of the index entry + */ + IndexType type; + + /** + * The unique name of the index entry + */ + String name; + + /** + * The preceeding index name. + * Allows deltas etc to apply to the index or an overlay for example. 
+ */ + String parentName; + + /** + * The status of the inedx entry + */ + TransactionStatus status; + + /** + * If merging, the id where the result is going + */ + String mergeId; + + IndexEntry(IndexType type, String name, String parentName, TransactionStatus status, String mergeId) + { + this.type = type; + this.name = name; + this.parentName = parentName; + this.status = status; + this.mergeId = mergeId; + } + + public String getMergeId() + { + return mergeId; + } + + public void setMergeId(String mergeId) + { + this.mergeId = mergeId; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String getParentName() + { + return parentName; + } + + public void setParentName(String parentName) + { + this.parentName = parentName; + } + + public TransactionStatus getStatus() + { + return status; + } + + public void setStatus(TransactionStatus status) + { + this.status = status; + } + + public IndexType getType() + { + return type; + } + + public void setType(IndexType type) + { + this.type = type; + } + + +} \ No newline at end of file diff --git a/source/java/org/alfresco/repo/search/impl/lucene/index/IndexInfo.java b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexInfo.java new file mode 100644 index 0000000000..8120d2608d --- /dev/null +++ b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexInfo.java @@ -0,0 +1,650 @@ +/* + * Copyright (C) 2005 Alfresco, Inc. + * + * Licensed under the Mozilla Public License version 1.1 + * with a permitted attribution clause. You may obtain a + * copy of the License at + * + * http://www.alfresco.org/legal/license.txt + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. 
See the License for the specific + * language governing permissions and limitations under the + * License. + */ +package org.alfresco.repo.search.impl.lucene.index; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.FileChannel.MapMode; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.zip.CRC32; + +import org.alfresco.error.AlfrescoRuntimeException; +import org.alfresco.repo.search.impl.lucene.LuceneAnalyser; +import org.alfresco.service.cmr.dictionary.DictionaryService; +import org.alfresco.util.GUID; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.Term; + +/** + * The information that makes up an index. + * + * IndexInfoVersion + * + * Repeated information of the form + *
    + *
  1. Index Type. + *
  2. sub-directory name. + *
  3. Status + *
      + *
    1. Indexes, sub indexes, and overlays must be committed. Status is ACTIVE, MERGING, COMPLETING_INDEX + *
    2. Delta: Transaction status + *
    3. Overlay: Transaction status + *
    + *
+ * + * Merges always take place to new indexes so we can detect merge failure or partial merges. Or we do not know what has merged. + * + * Incomplete delete merging does not matter - the overlay would still exist and be treated as such. So a document may be deleted in the index as well as in the applied overlay. It + * is still correctly deleted. + * + * TODO: Write element status into individual directories. This would be enough for recovery if both index files are lost or corrupted. + * + * TODO: Tidy up index status at start up or after some time. How long would you leave a merge to run? + * + * @author Andy Hind + */ +public class IndexInfo +{ + + private static final boolean useNIOMemoryMapping = true; + + /** + * The default name for the file that holds the index information + */ + private static String INDEX_INFO = "IndexInfo"; + + /** + * The default name for the back up file that holds the index information + */ + private static String INDEX_INFO_BACKUP = "IndexInfoBackup"; + + /** + * The directory that holds the index + */ + private File indexDirectory; + + /** + * The file holding the index information + */ + private RandomAccessFile indexInfoRAF; + + /** + * And its file channel + */ + private FileChannel indexInfoChannel; + + /** + * The file holding the backup index information. + */ + + private RandomAccessFile indexInfoBackupRAF; + + /** + * And its file channel + */ + private FileChannel indexInfoBackupChannel; + + /** + * The file version. Negative is not yet written. + */ + private long version = -1; + + /** + * The index entries that make up this index. Map entries are looked up by name. These are maintained in order so document order is maintained. 
+ */ + private LinkedHashMap indexEntries = new LinkedHashMap(); + + /** + * Lock for the index entries + */ + ReentrantReadWriteLock entriesReadWriteLock = new ReentrantReadWriteLock(); + + /** + * Lock for switching over the main cached reader + */ + ReentrantReadWriteLock mainIndexReaderReadWriteLock = new ReentrantReadWriteLock(); + + /** + * Read only index readers that also do reference counting. + */ + private HashMap referenceCountingReadOnlyIndexReaders = new HashMap(); + + /** + * Index writers + */ + private HashMap indexWriters = new HashMap(); + + /** + * Index Readers + */ + private HashMap indexReaders = new HashMap(); + + private DictionaryService dictionaryService; + + public IndexInfo(File indexDirectory) + { + super(); + this.indexDirectory = indexDirectory; + + if (!this.indexDirectory.exists()) + { + if (this.indexDirectory.mkdirs()) + { + throw new AlfrescoRuntimeException("Failed to create index directory"); + } + } + if (!this.indexDirectory.isDirectory()) + { + throw new AlfrescoRuntimeException("The index must be held in a directory"); + } + + File indexInfoFile = new File(this.indexDirectory, INDEX_INFO); + File indexInfoBackupFile = new File(this.indexDirectory, INDEX_INFO_BACKUP); + if (createFile(indexInfoFile) && createFile(indexInfoBackupFile)) + { + version = 0; + } + + this.indexInfoRAF = openFile(indexInfoFile); + this.indexInfoChannel = this.indexInfoRAF.getChannel(); + + this.indexInfoBackupRAF = openFile(indexInfoBackupFile); + this.indexInfoBackupChannel = this.indexInfoBackupRAF.getChannel(); + + if (version == -1) + { + entriesReadWriteLock.writeLock().lock(); + try + { + doWithFileLock(new LockWork() + { + public Object doWork() throws Exception + { + checkStatus(); + return null; + } + + }); + } + finally + { + entriesReadWriteLock.writeLock().unlock(); + } + } + } + + public void setDictionaryService(DictionaryService dictionaryService) + { + this.dictionaryService = dictionaryService; + } + + public DictionaryService 
getDictionaryService() + { + return dictionaryService; + } + + /** + * This method should only be called from one thread. + * + * @param id + * @return + * @throws IOException + */ + public IndexReader getDeltaIndexReader(String id) throws IOException + { + IndexReader reader = indexReaders.get(id); + if (reader == null) + { + // close index writer if required + closeDeltaIndexWriter(id); + File location = ensureDeltaExistsAndIsRegistered(id); + reader = IndexReader.open(location); + indexReaders.put(id, reader); + } + return reader; + } + + private File ensureDeltaExistsAndIsRegistered(String id) throws IOException + { + File location = new File(indexDirectory, id); + if (!IndexReader.indexExists(location)) + { + IndexWriter creator = new IndexWriter(location, new LuceneAnalyser(dictionaryService), true); + creator.setUseCompoundFile(true); + creator.close(); + } + entriesReadWriteLock.readLock().lock(); + try + { + if (!indexEntries.containsKey(id)) + { + entriesReadWriteLock.writeLock().lock(); + try + { + indexEntries.put(id, new IndexEntry(IndexType.DELTA, id, "", TransactionStatus.ACTIVE, "")); + } + finally + { + entriesReadWriteLock.writeLock().unlock(); + } + } + } + finally + { + entriesReadWriteLock.readLock().unlock(); + } + return location; + } + + public IndexWriter getDeltaIndexWriter(String id) throws IOException + { + IndexWriter writer = indexWriters.get(id); + if (writer == null) + { + // close index writer if required + closeDeltaIndexReader(id); + File location = ensureDeltaExistsAndIsRegistered(id); + writer = new IndexWriter(location, new LuceneAnalyser(dictionaryService), false); + indexWriters.put(id, writer); + } + return writer; + } + + public void closeDeltaIndexReader(String id) throws IOException + { + IndexReader reader = indexReaders.get(id); + if (reader != null) + { + reader.close(); + indexReaders.remove(id); + } + } + + public void closeDeltaIndexWriter(String id) throws IOException + { + IndexWriter writer = 
indexWriters.get(id); + if (writer != null) + { + writer.close(); + indexWriters.remove(id); + } + } + + public IndexReader getMainIndexReferenceCountingReadOnlyIndexReader() + { + return null; + } + + public IndexReader getMainIndexReferenceCountingReadOnlyIndexReader(String id) + { + return null; + } + + public void setStatus(String id, TransactionStatus status, Set toDelete) + { + + } + + private static boolean createFile(File file) + { + + if (!file.exists()) + { + try + { + file.createNewFile(); + return true; + } + catch (IOException e) + { + throw new AlfrescoRuntimeException("Failed to create info file", e); + } + } + return false; + } + + private static RandomAccessFile openFile(File file) + { + try + { + if (useNIOMemoryMapping) + { + return new RandomAccessFile(file, "rw"); + } + else + { + return new RandomAccessFile(file, "rws"); + } + } + catch (FileNotFoundException e) + { + throw new AlfrescoRuntimeException("Failed to open index info file", e); + } + } + + /** + * Check status must be called holding the file lock. 
+ * + * @param raf + * @throws IOException + */ + private void checkStatus() throws IOException + { + try + { + setStatusFromFile(indexInfoChannel); + } + catch (IOException e) + { + // The first data file is corrupt so we fall back to the back up + System.out.println("BACKUP"); + setStatusFromFile(indexInfoBackupChannel); + } + } + + private void setStatusFromFile(FileChannel channel) throws IOException + { + if (channel.size() > 0) + { + channel.position(0); + ByteBuffer buffer; + + if (useNIOMemoryMapping) + { + MappedByteBuffer mbb = channel.map(MapMode.READ_ONLY, 0, channel.size()); + mbb.load(); + buffer = mbb; + } + else + { + buffer = ByteBuffer.wrap(new byte[(int) channel.size()]); + channel.read(buffer); + buffer.position(0); + } + + buffer.position(0); + long onDiskVersion = buffer.getLong(); + if (version != onDiskVersion) + { + CRC32 crc32 = new CRC32(); + crc32.update((int) (onDiskVersion >>> 32) & 0xFFFFFFFF); + crc32.update((int) (onDiskVersion >>> 0) & 0xFFFFFFFF); + int size = buffer.getInt(); + crc32.update(size); + LinkedHashMap newIndexEntries = new LinkedHashMap(); + for (int i = 0; i < size; i++) + { + String indexTypeString = readString(buffer, crc32); + IndexType indexType; + try + { + indexType = IndexType.valueOf(indexTypeString); + } + catch (IllegalArgumentException e) + { + throw new IOException("Invalid type " + indexTypeString); + } + + String name = readString(buffer, crc32); + + String parentName = readString(buffer, crc32); + + String txStatus = readString(buffer, crc32); + TransactionStatus status; + try + { + status = TransactionStatus.valueOf(txStatus); + } + catch (IllegalArgumentException e) + { + throw new IOException("Invalid status " + txStatus); + } + + String mergeId = readString(buffer, crc32); + + newIndexEntries.put(name, new IndexEntry(indexType, name, parentName, status, mergeId)); + } + long onDiskCRC32 = buffer.getLong(); + if (crc32.getValue() == onDiskCRC32) + { + version = onDiskVersion; + indexEntries = 
newIndexEntries; + } + else + { + throw new IOException("Invalid file check sum"); + } + } + } + + } + + private String readString(ByteBuffer buffer, CRC32 crc32) throws UnsupportedEncodingException + { + int size = buffer.getInt(); + char[] chars = new char[size]; + for (int i = 0; i < size; i++) + { + chars[i] = buffer.getChar(); + } + String string = new String(chars); + + crc32.update(string.getBytes("UTF-8")); + return string; + } + + private void writeString(ByteBuffer buffer, CRC32 crc32, String string) throws UnsupportedEncodingException + { + char[] chars = string.toCharArray(); + buffer.putInt(chars.length); + + for (int i = 0; i < chars.length; i++) + { + buffer.putChar(chars[i]); + } + crc32.update(string.getBytes("UTF-8")); + } + + private void writeStatus() throws IOException + { + version++; + writeStatusToFile(indexInfoChannel); + writeStatusToFile(indexInfoBackupChannel); + } + + private void writeStatusToFile(FileChannel channel) throws IOException + { + long size = getBufferSize(); + + ByteBuffer buffer; + if (useNIOMemoryMapping) + { + MappedByteBuffer mbb = channel.map(MapMode.READ_WRITE, 0, size); + mbb.load(); + buffer = mbb; + } + else + { + channel.truncate(size); + buffer = ByteBuffer.wrap(new byte[(int) size]); + } + + buffer.position(0); + + buffer.putLong(version); + CRC32 crc32 = new CRC32(); + crc32.update((int) (version >>> 32) & 0xFFFFFFFF); + crc32.update((int) (version >>> 0) & 0xFFFFFFFF); + + buffer.putInt(indexEntries.size()); + crc32.update(indexEntries.size()); + + for (IndexEntry entry : indexEntries.values()) + { + String entryType = entry.getType().toString(); + writeString(buffer, crc32, entryType); + + writeString(buffer, crc32, entry.getName()); + + writeString(buffer, crc32, entry.getParentName()); + + String entryStatus = entry.getStatus().toString(); + writeString(buffer, crc32, entryStatus); + + writeString(buffer, crc32, entry.getMergeId()); + } + buffer.putLong(crc32.getValue()); + + if (useNIOMemoryMapping) + { + 
((MappedByteBuffer) buffer).force(); + } + else + { + buffer.rewind(); + channel.position(0); + channel.write(buffer); + } + } + + private long getBufferSize() + { + long size = 0; + size += 8; + size += 4; + for (IndexEntry entry : indexEntries.values()) + { + String entryType = entry.getType().toString(); + size += (entryType.length() * 2) + 4; + size += (entry.getName().length() * 2) + 4; + size += (entry.getParentName().length() * 2) + 4; + String entryStatus = entry.getStatus().toString(); + size += (entryStatus.length() * 2) + 4; + size += (entry.getMergeId().length() * 2) + 4; + } + size += 8; + return size; + } + + public interface LockWork + { + public Result doWork() throws Exception; + } + + public R doWithFileLock(LockWork lockWork) + { + FileLock fileLock = null; + R result = null; + try + { + fileLock = indexInfoChannel.lock(); + result = lockWork.doWork(); + return result; + } + catch (Throwable exception) + { + + // Re-throw the exception + if (exception instanceof RuntimeException) + { + throw (RuntimeException) exception; + } + else + { + throw new RuntimeException("Error during run with lock.", exception); + } + } + finally + { + if (fileLock != null) + { + try + { + fileLock.release(); + } + catch (IOException e) + { + } + } + } + } + + public static void main(String[] args) + { + int repeat = 100; + final IndexInfo ii = new IndexInfo(new File("c:\\indexTest")); + ii.indexEntries.clear(); + + for (int i = 0; i < 100; i++) + { + String guid = GUID.generate(); + ii.indexEntries.put(guid, new IndexEntry(IndexType.INDEX, guid, GUID.generate(), + TransactionStatus.COMMITTED, "")); + } + + long totalTime = 0; + long count = 0; + + while (true) + { + long start = System.nanoTime(); + + for (int i = 0; i < repeat; i++) + { + ii.entriesReadWriteLock.writeLock().lock(); + try + { + ii.doWithFileLock(new LockWork() + { + public Object doWork() throws Exception + { + + ii.version = 0; + ii.checkStatus(); + ii.writeStatus(); + return null; + } + }); + } + 
finally + { + ii.entriesReadWriteLock.writeLock().unlock(); + } + } + + long end = System.nanoTime(); + + totalTime += (end - start); + count += repeat; + float average = count * 1000000000f / totalTime; + + System.out.println("Repeated " + + repeat + " in " + ((end - start) / 1000000000.0) + " average = " + average); + } + } +} diff --git a/source/java/org/alfresco/repo/search/impl/lucene/index/IndexType.java b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexType.java new file mode 100644 index 0000000000..e690641b13 --- /dev/null +++ b/source/java/org/alfresco/repo/search/impl/lucene/index/IndexType.java @@ -0,0 +1,41 @@ +package org.alfresco.repo.search.impl.lucene.index; + +/** + * The type of an entry in this index. + * + * @author Andy Hind + */ +public enum IndexType +{ + /** + * Identifies the main index. This is always a fully optimised index. + */ + INDEX, + + /** + * Identifies a subindex. This will also be optimised. Sub-indexes are periodically merged into the index. + */ + SUBINDEX, + + /** + * An overlay. This is an optimised index with a deletion list. To commit an overlay requires no deletions against other indexes. Deletions are done when an overlay turns + * into or is merged into a subindex. Overlays are periodically merged into a sub index. An overlay can require or have background properties indexed + */ + INDEX_OVERLAY, + + /** + * A long running overlay defintion against the index. Not yet supported. + * This, itself, may have transactional additions. + */ + OVERLAY, + + /** + * A delta is a transactional change set. This commits to an overlay index. 
/**
 * The type of an entry in this index.
 *
 * @author Andy Hind
 */
public enum IndexType
{
    /**
     * The main index: always a fully optimised index.
     */
    INDEX,

    /**
     * A sub-index: also optimised. Sub-indexes are periodically merged into the main index.
     */
    SUBINDEX,

    /**
     * An overlay: an optimised index plus a deletion list. Committing an overlay requires no
     * deletions against other indexes; deletions are applied when an overlay turns into, or is
     * merged into, a sub-index. Overlays are periodically merged into a sub-index and may
     * require or have background properties indexed.
     */
    INDEX_OVERLAY,

    /**
     * A long-running overlay definition against the index (not yet supported). This, itself,
     * may have transactional additions.
     */
    OVERLAY,

    /**
     * A delta: a transactional change set that commits to an overlay index.
     */
    DELTA,

    /**
     * A delta to an overlay.
     */
    OVERLAY_DELTA;
}
package org.alfresco.repo.search.impl.lucene.index;

import java.io.IOException;

import org.aopalliance.intercept.MethodInvocation;
import org.apache.lucene.index.IndexReader;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.framework.adapter.AdvisorAdapterRegistry;
import org.springframework.aop.framework.adapter.GlobalAdvisorAdapterRegistry;
import org.springframework.aop.support.DelegatingIntroductionInterceptor;
import org.springframework.aop.target.SingletonTargetSource;

/**
 * Wraps a Lucene {@link IndexReader} in a Spring AOP proxy that makes it read-only
 * and reference counted: delete calls are rejected, and close() decrements the
 * reference count instead of closing - the underlying reader is only closed when
 * the count reaches zero AND {@link RefCounting#setClosable()} has been called.
 */
public class ReferenceCountingReadOnlyIndexReaderFactory
{
    /**
     * Introduced interface: lets callers manage the reference count on the proxied
     * reader (the proxy implements this in addition to IndexReader's type).
     */
    public interface RefCounting
    {
        public void incrementRefCount();

        public void decrementRefCount() throws IOException;

        // true while at least one reference is outstanding
        public boolean isUsed();

        // mark the reader closable; closes it immediately if no references remain
        public void setClosable() throws IOException;
    }

    /**
     * Build the reference-counting read-only proxy around the given reader.
     *
     * @param indexReader the real reader; it becomes the proxy's singleton target
     * @return a proxy that implements IndexReader's interfaces plus RefCounting
     */
    public static IndexReader createReader(IndexReader indexReader)
    {
        AdvisorAdapterRegistry advisorAdapterRegistry = GlobalAdvisorAdapterRegistry.getInstance();
        ProxyFactory proxyFactory = new ProxyFactory();
        proxyFactory.addAdvisor(advisorAdapterRegistry.wrap(new Interceptor(indexReader)));
        proxyFactory.setTargetSource(new SingletonTargetSource(indexReader));
        IndexReader proxy = (IndexReader) proxyFactory.getProxy();
        return proxy;
    }

    /**
     * The interceptor that rewrites delete/close calls and carries the reference
     * count. Extends DelegatingIntroductionInterceptor so the proxy also exposes
     * RefCounting.
     */
    public static class Interceptor extends DelegatingIntroductionInterceptor implements RefCounting
    {

        private static final long serialVersionUID = 7693185658022810428L;

        // the real, underlying reader - closed at most once, via closeIfRequired()
        IndexReader indexReader;

        // outstanding references; guarded by synchronized methods below
        int refCount = 0;

        // set by setClosable(); close only happens once this is true
        boolean shouldClose = false;

        Interceptor(IndexReader indexReader)
        {
            this.indexReader = indexReader;
        }

        /**
         * Method-name based dispatch: reject deletes, turn close() into a
         * reference-count decrement, and pass everything else through.
         */
        public Object invoke(MethodInvocation mi) throws Throwable
        {
            // Read only
            String methodName = mi.getMethod().getName();
            if (methodName.equals("delete") || methodName.equals("doDelete"))
            {
                throw new UnsupportedOperationException("Delete is not supported by read only index readers");
            }
            // Close
            else if (methodName.equals("close"))
            {
                // NOTE(review): an unbalanced close (refCount already 0) drives the
                // count negative and the reader can then never auto-close - callers
                // are presumably expected to pair increment/close; confirm.
                decrementRefCount();
                return null;
            }
            else
            {
                return super.invoke(mi);
            }
        }

        public synchronized void incrementRefCount()
        {
            refCount++;
        }

        public synchronized void decrementRefCount() throws IOException
        {
            refCount--;
            closeIfRequired();
        }

        // Close the real reader once it is both unreferenced and marked closable.
        // Only called from synchronized methods, so the check is thread-safe.
        private void closeIfRequired() throws IOException
        {
            if ((refCount == 0) && shouldClose)
            {
                indexReader.close();
            }
        }

        public synchronized boolean isUsed()
        {
            return (refCount > 0);
        }

        public synchronized void setClosable() throws IOException
        {
            shouldClose = true;
            closeIfRequired();
        }
    }
}
 * @author Andy Hind
 */

public enum TransactionStatus
{
    // Match the order in javax.transaction.Status so ordinal values are correct
    /** Transaction in progress: the delta may still be written to. */
    ACTIVE,

    /** Transaction may only be rolled back. */
    MARKED_ROLLBACK,

    /** Prepare completed successfully. */
    PREPARED,

    /** Committed with no outstanding reindexing. */
    COMMITTED,

    /** Rolled back. */
    ROLLEDBACK,

    /** Status could not be determined. */
    UNKNOWN,

    /** No transaction associated. */
    NO_TRANSACTION,

    /** Prepare phase is running. */
    PREPARING,

    /** Commit phase is running. */
    COMMITTING,

    /** Rollback is running. */
    ROLLINGBACK,

    /*
     * This entry is the source for an active merge.
     * The result will be in a new index.
     */
    MERGING,

    /*
     * A new index element that is being made by a merge.
     */
    MERGING_TARGET,

    /*
     * These index overlays require reindexing.
     */
    COMMITTED_REQUIRES_REINDEX,

    /*
     * These index overlays are reindexing.
     */
    COMMITTED_REINDEXING,

    /*
     * These index overlays have been reindexed.
     */
    COMMITTED_REINDEXED,

    /*
     * An entry that may be deleted.
     */
    DELETABLE;
}