[daisy-commits] [daisy] commit 3872: /trunk/daisy/

bruno at cocoondev.org bruno at cocoondev.org
Fri Mar 23 13:06:54 CDT 2007


User: bruno
Date: 2007/03/23 01:06 PM

Added:
 /trunk/daisy/services/jms/impl/src/java/org/outerj/daisy/jms/impl/
  JmsClientImpl.xinfo

Removed:
 /trunk/daisy/services/jms/impl/
  maven.xml

Modified:
 /trunk/daisy/repository/server/src/conf/
  block.xml
 /trunk/daisy/repository/server/src/java/org/outerj/daisy/event/
  EventDispatcherImpl.java
 /trunk/daisy/repository/server/src/java/org/outerj/daisy/ftindex/
  FullTextIndexImpl.java, FullTextIndexUpdater.java
 /trunk/daisy/services/jms/impl/
  project.xml
 /trunk/daisy/services/jms/impl/src/java/org/outerj/daisy/jms/impl/
  JmsClientImpl.java
 /trunk/daisy/services/jms/api/src/java/org/outerj/daisy/jms/
  Sender.java
 /trunk/daisy/repository/server/
  activemq-conf.xml.template, project.xml
 /trunk/daisy/repository/server/src/java/org/outerj/daisy/repository/serverimpl/
  LocalDocumentStrategy.java, LocalRepositoryManager.java
 /trunk/daisy/services/doctaskrunner/server-impl/src/java/org/outerj/daisy/doctaskrunner/serverimpl/
  CommonDocumentTaskManager.java
 /trunk/daisy/services/emailer/server-impl/src/java/org/outerj/daisy/emailer/serverimpl/
  CommonEmailer.java
 /trunk/daisy/services/emailnotifier/server-impl/src/java/org/outerj/daisy/emailnotifier/serverimpl/
  EmailNotifierImpl.java
 /trunk/daisy/repository/test/
  myconfig.xml.template, project.xml

Log:
 * Fulltext indexing:
    - upgraded to Lucene 2.1
    - Lucene 2.1 allows deleting an index entry through the IndexWriter, which seriously improves performance. Previously, deleting could only be done through an IndexReader, and it was impossible to have both a modifying reader and an IndexWriter open on the same index at the same time, so these had to be opened and closed all the time.
    - Related to the previous item, the index writer is now flushed by a background thread at a configurable interval (indexFlushInterval, default 5 seconds) instead of after each document, further improving fulltext indexing performance. (More room for improvement: allow configuring the related Lucene parameters on the IndexWriter.)
    - The format of the fulltext index changed: deleting and rebuilding the index is necessary.
    - Still to look at (Monday): the indexExists check doesn't work, but it might not be needed anymore.
 
 * Brought some consistency to how background worker threads are handled, especially on shutdown: logging is now done similarly everywhere, and some possible wait-forever situations on shutdown are fixed. (Could use some extra work, like a utility class to enforce consistency or a component managing these threads.)
     -> still to do: look into shutting down document task threads at shutdown; it seems this currently doesn't happen.
 
 * Unregister listeners from JMS, to avoid (not really harmful but untidy) errors on shutdown.
 
 * ActiveMQ configuration: disabled the use of prepared statement pooling in the database connection pool configuration. This fixes the problem with ActiveMQ using tons of MySQL connections (which was also the reason for ActiveMQ shutting down automatically). Also removed the extra connection pool config since ActiveMQ never seems to use more than 2 connections now.
    -> in case you're wondering: yes, the usageManager config is still needed too.
 
 * Fixed an SQL locking/transaction issue in LocalDocumentStrategy.storeSummary: the pattern of trying an update and doing an insert when no records were updated doesn't work.
 
 * EmailNotifier: the JMS listener was registered too early, causing errors if messages were delivered before all required initialization was done.
 
 * JmsClientImpl: various bugfixes and cleanup:
     - start at most one connection establishing thread
     - remove some potential deadlocks
     - fixed unregistering of listeners
     - ...

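For illustration, the "start at most one connection establishing thread" item above can be sketched roughly as follows. This is a minimal stand-alone sketch, not the JmsClientImpl code: the class SingleReconnectThreadSketch and all its members are hypothetical names, and the retry loop is replaced by a sleep loop that never succeeds.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; the names here are illustrative, not Daisy's. */
class SingleReconnectThreadSketch {
    private static final long RETRY_INTERVAL_MILLIS = 10000;

    private final Object reconnectThreadLock = new Object();
    private Thread reconnectThread; // guarded by reconnectThreadLock
    private final AtomicInteger startedThreads = new AtomicInteger();

    /**
     * Called for every connection error. Starts a reconnect thread only if
     * none is running, and reports whether it did so.
     */
    boolean onConnectionError() {
        synchronized (reconnectThreadLock) {
            if (reconnectThread != null)
                return false;
            reconnectThread = new Thread(this::reconnectLoop, "reconnect sketch");
            reconnectThread.setDaemon(true);
            startedThreads.incrementAndGet();
            reconnectThread.start();
            return true;
        }
    }

    /** Stands in for a retry loop whose connection attempts never succeed. */
    private void reconnectLoop() {
        try {
            while (true)
                Thread.sleep(RETRY_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
            // interrupted by shutdown(): end the thread
        } finally {
            synchronized (reconnectThreadLock) {
                reconnectThread = null;
            }
        }
    }

    /** Interrupts the reconnect thread, if any, and waits for it to end. */
    void shutdown() {
        Thread thread;
        synchronized (reconnectThreadLock) {
            thread = reconnectThread;
        }
        if (thread == null)
            return;
        thread.interrupt();
        try {
            // Join outside the lock: the thread needs the lock to clear the field.
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int startedThreads() {
        return startedThreads.get();
    }

    boolean isReconnecting() {
        synchronized (reconnectThreadLock) {
            return reconnectThread != null;
        }
    }
}
```

In this sketch, repeated calls to onConnectionError() while a reconnect thread is alive are no-ops, and shutdown() deliberately joins without holding the lock so the ending thread can still clear the field.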
File Changes:

Directory: /trunk/daisy/repository/server/src/conf/
===================================================

File [modified]: block.xml
Delta lines: +2 -2
===================================================================
--- trunk/daisy/repository/server/src/conf/block.xml	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/src/conf/block.xml	2007-03-23 18:06:30 UTC (rev 3872)
@@ -164,8 +164,8 @@
         <repository>
           <resource id="daisy:daisy-repository-common-impl" version="2.0-dev"/>
           <resource id="daisy:daisy-repository-server-impl" version="2.0-dev"/>
-          <resource id="lucene:lucene-core" version="2.0.0"/>
-          <resource id="lucene:lucene-highlighter" version="2.0.0"/>
+          <resource id="lucene:lucene-core" version="2.1.0"/>
+          <resource id="lucene:lucene-highlighter" version="2.1.0"/>
           <resource id="jcifs:jcifs" version="1.1.11"/>
         </repository>
       </classpath>

Directory: /trunk/daisy/repository/server/src/java/org/outerj/daisy/event/
==========================================================================

File [modified]: EventDispatcherImpl.java
Delta lines: +13 -9
===================================================================
--- trunk/daisy/repository/server/src/java/org/outerj/daisy/event/EventDispatcherImpl.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/src/java/org/outerj/daisy/event/EventDispatcherImpl.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -46,7 +46,6 @@
     private String jmsTopicName;
     private JmsClient jmsClient;
     private Sender topicSender;
-    private boolean stopping = false;
 
     /**
      * @avalon.dependency key="datasource" type="javax.sql.DataSource"
@@ -76,11 +75,14 @@
     }
 
     public void stop() throws Exception {
-        stopping = true;
+        getLogger().info("Waiting for event dispatcher thread to end.");
         eventDispatchThread.interrupt();
         try {
             eventDispatchThread.join();
-        } catch (InterruptedException e) {}
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        jmsClient.unregisterSender(topicSender);
     }
 
     private class EventDispatchThread extends Thread {
@@ -113,7 +115,7 @@
                         Iterator seqnrsToProcessIt = seqnrsToProcess.iterator();
                         while (seqnrsToProcessIt.hasNext()) {
                             // Check if we don't want to stop
-                            if (stopping)
+                            if (Thread.interrupted())
                                 return;
                             long seqnr = ((Long)seqnrsToProcessIt.next()).longValue();
 
@@ -132,7 +134,7 @@
 
                             // Again check if we don't want to stop, in an attempt to avoid a forever-wait
                             // condition in ActiveMQ when trying to send a message while the VM is shutting down.
-                            if (stopping)
+                            if (Thread.interrupted())
                                 return;
                             topicSender.send(jmsMessage);
 
@@ -140,10 +142,10 @@
                             removeEventStmt.execute();
                         }
                     } catch (Throwable e) {
-                        if (stopping) {
+                        if (e instanceof InterruptedException) {
                             return;
                         } else {
-                            EventDispatcherImpl.this.getLogger().error("Exception in event dispatcher.", e);
+                            getLogger().error("Exception in event dispatcher.", e);
                         }
                     } finally {
                         closeStatement(stmt);
@@ -156,12 +158,14 @@
                             getLogger().error("Failed to close database connection.", e);
                         }
                     }
-                    if (stopping)
+                    if (Thread.interrupted())
                         return;
                     wait(5000);
                 }
             } catch (InterruptedException e) {
-                EventDispatcherImpl.this.getLogger().info("Event dispatcher thread interrupted.");
+                // ignore
+            } finally {
+                getLogger().info("Event dispatcher thread ended.");
             }
         }
 
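For illustration, the shutdown pattern in the diff above (interrupt the worker and join it, rather than polling a separate "stopping" flag) can be sketched in isolation. This is a minimal sketch under assumptions: InterruptibleWorkerSketch and its in-memory log list are hypothetical stand-ins for the component and its logger, and the actual work is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of an interrupt-driven background worker. */
class InterruptibleWorkerSketch {
    // Stand-in for a logger, so the order of messages can be inspected.
    private final List<String> log = Collections.synchronizedList(new ArrayList<String>());
    private final Thread worker = new Thread(this::run, "worker sketch");

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    /** Logs, interrupts the worker, and waits until it has ended. */
    void stop() {
        log.add("Waiting for worker thread to end.");
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        try {
            while (true) {
                // Thread.interrupted() clears the flag, so return right away.
                if (Thread.interrupted())
                    return;
                // A batch of work would go here.
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            // interrupted while sleeping: normal shutdown path
        } finally {
            log.add("Worker thread ended.");
        }
    }

    List<String> log() {
        synchronized (log) {
            return new ArrayList<String>(log);
        }
    }

    boolean isAlive() {
        return worker.isAlive();
    }
}
```

Because the interrupt both wakes a sleeping thread and is visible to the explicit check, the sketch needs no extra shared flag, and the "ended" message in the finally block is logged on every exit path.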

Directory: /trunk/daisy/repository/server/src/java/org/outerj/daisy/ftindex/
============================================================================

File [modified]: FullTextIndexImpl.java
Delta lines: +115 -35
===================================================================
--- trunk/daisy/repository/server/src/java/org/outerj/daisy/ftindex/FullTextIndexImpl.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/src/java/org/outerj/daisy/ftindex/FullTextIndexImpl.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -17,6 +17,7 @@
 
 import org.apache.avalon.framework.activity.Initializable;
 import org.apache.avalon.framework.activity.Disposable;
+import org.apache.avalon.framework.activity.Startable;
 import org.apache.avalon.framework.configuration.Configuration;
 import org.apache.avalon.framework.configuration.ConfigurationException;
 import org.apache.avalon.framework.configuration.Configurable;
@@ -60,12 +61,16 @@
  * @avalon.service type="org.outerj.daisy.ftindex.FullTextIndex"
  */
 public class FullTextIndexImpl extends AbstractLogEnabled implements FullTextIndex, Initializable, Serviceable,
-        Configurable, FullTextIndexImplMBean, SuspendableProcess, Disposable {
+        Configurable, FullTextIndexImplMBean, SuspendableProcess, Disposable, Startable {
+    private ServiceManager serviceManager;
     private File indexDirectory;
-    private ServiceManager serviceManager;
+    private IndexWriter indexWriter;
+    private boolean indexWriterDirty = false;
     private String indexerStatus;
+    private final Lock indexWriteLock = new ReentrantLock();
+    private int indexFlushInterval = 5000;
+    private Thread indexFlushThread = null;
     private IndexOptimizeThread indexOptimizeThread = null;
-    private final Lock indexWriteLock = new ReentrantLock();
     /**
      * The indexSearchLock is to control that not more than one party can modify
      * the indexSearchObjects instance variable at the same time.
@@ -76,6 +81,10 @@
     private SuspendForBackupRegistrar suspendForBackupRegistrar;
     private ReadWriteLock suspendIndexUpdatesLock = new ReentrantReadWriteLock(true);
 
+    private static final String FIELD_DOCUMENTID = "DocID";
+    private static final String FIELD_BRANCHID = "BranchID";
+    private static final String FIELD_LANGID = "LangID";
+    private static final String FIELD_VARIANTKEY = "VariantKey";
 
     public void configure(Configuration configuration) throws ConfigurationException {
         String directoryName = PropertyResolver.resolveProperties(configuration.getChild("indexDirectory").getValue());
@@ -85,6 +94,8 @@
         if (!indexDirectory.isDirectory())
             throw new ConfigurationException("The specified directory is not a directory: " + directoryName);
         getLogger().debug("Using the following as directory to store indexes: " + indexDirectory);
+
+        this.indexFlushInterval = configuration.getChild("indexFlushInterval").getValueAsInteger(indexFlushInterval);
     }
 
     /**
@@ -97,6 +108,10 @@
     }
 
     public void initialize() throws Exception {
+        // Make the initial IndexWriter instance
+        updateWriter();
+
+        // register with the mbean server
         MBeanServer mbeanServer = (MBeanServer)serviceManager.lookup("mbeanserver");
         try {
             mbeanServer.registerMBean(this, new ObjectName("Daisy:name=FullTextIndexer"));
@@ -109,12 +124,44 @@
         suspendForBackupRegistrar.register("Fulltext index", this);
     }
 
+    public void start() throws Exception {
+        indexFlushThread = new Thread(new IndexFlusher(), "Daisy index flusher");
+        indexFlushThread.setDaemon(true);
+        indexFlushThread.start();
+    }
+
+    public void stop() throws Exception {
+        if (indexOptimizeThread != null) {
+           try {
+               getLogger().info("Waiting for index optimalization thread to end.");
+               indexOptimizeThread.join();
+           } catch (InterruptedException e) {
+               // ignore
+           }
+        }
+
+        if (indexFlushThread != null) {
+            try {
+                getLogger().info("Waiting for index flush thread to end.");
+                indexFlushThread.interrupt();
+                indexFlushThread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
     public synchronized void dispose() {
+        indexWriteLock.lock();
+        try {
+            closeIndexWriter();
+        } catch (IOException e) {
+            getLogger().error("Error closing fulltext index writer on shutdown.", e);
+        }
+        indexWriteLock.unlock();
+
         suspendForBackupRegistrar.unregister(this);
         serviceManager.release(suspendForBackupRegistrar);
-        if (indexOptimizeThread != null) {
-           try { indexOptimizeThread.join(); } catch (InterruptedException e) {}
-        }
     }
 
     public boolean suspendExecution(long msecs) throws InterruptedException {
@@ -202,8 +249,6 @@
     public void index(String documentId, long branchId, long languageId, String documentName, String content, String fields) throws Exception {
         suspendIndexUpdatesLock.readLock().lockInterruptibly();
         try {
-            boolean indexExists = indexExists();
-
             indexWriteLock.lockInterruptibly();
             try {
                 indexerStatus = "Updating index for document ID " + documentId + ", branch ID " + branchId + ", language ID " + languageId;
@@ -211,34 +256,15 @@
                 if (getLogger().isDebugEnabled())
                     getLogger().debug("Indexing content for document ID " + documentId + ", branch ID " + branchId + ", language ID " + languageId);
 
-                if (indexExists) {
-                    // first delete possible old indexed document parts
-                    IndexReader indexReader = IndexReader.open(indexDirectory);
-                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
-                    BooleanQuery booleanQuery = new BooleanQuery();
-                    booleanQuery.add(new TermQuery(new Term("DocID", documentId)), BooleanClause.Occur.MUST);
-                    booleanQuery.add(new TermQuery(new Term("BranchID", String.valueOf(branchId))), BooleanClause.Occur.MUST);
-                    booleanQuery.add(new TermQuery(new Term("LangID", String.valueOf(languageId))), BooleanClause.Occur.MUST);
-                    org.apache.lucene.search.Hits hits = indexSearcher.search(booleanQuery);
-                    if (hits.length() == 1) {
-                        indexReader.deleteDocument(hits.id(0));
-                    } else if (hits.length() > 1) {
-                        getLogger().error("More then one match in index for document ID " + documentId + ", branch ID " + branchId + ", language ID " + languageId + ". I will delete them all but this is *highly* suspicious.");
-                        for (int i = 0; i < hits.length(); i++) {
-                            indexReader.deleteDocument(hits.id(i));
-                        }
-                    }
-                    indexReader.close();
-                }
-
                 if (documentName != null || content != null || fields != null) {
                     // now index new stuff
-                    IndexWriter indexWriter = new IndexWriter(indexDirectory, new StandardAnalyzer(), !indexExists);
 
                     Document luceneDocument = new Document();
-                    luceneDocument.add(new Field("DocID", documentId, Field.Store.YES, Field.Index.UN_TOKENIZED));
-                    luceneDocument.add(new Field("BranchID", String.valueOf(branchId), Field.Store.YES, Field.Index.UN_TOKENIZED));
-                    luceneDocument.add(new Field("LangID", String.valueOf(languageId), Field.Store.YES, Field.Index.UN_TOKENIZED));
+                    luceneDocument.add(new Field(FIELD_DOCUMENTID, documentId, Field.Store.YES, Field.Index.UN_TOKENIZED));
+                    luceneDocument.add(new Field(FIELD_BRANCHID, String.valueOf(branchId), Field.Store.YES, Field.Index.UN_TOKENIZED));
+                    luceneDocument.add(new Field(FIELD_LANGID, String.valueOf(languageId), Field.Store.YES, Field.Index.UN_TOKENIZED));
+                    String variantString = documentId + "~" + branchId + "~" + languageId;
+                    luceneDocument.add(new Field(FIELD_VARIANTKEY, variantString, Field.Store.YES, Field.Index.UN_TOKENIZED));
 
                     if (documentName != null) {
                         luceneDocument.add(new Field("name", documentName, Field.Store.NO, Field.Index.TOKENIZED));
@@ -252,8 +278,8 @@
                         luceneDocument.add(new Field("fields", fields, Field.Store.NO, Field.Index.TOKENIZED));
                     }
 
-                    indexWriter.addDocument(luceneDocument);
-                    indexWriter.close();
+                    indexWriterDirty = true;
+                    indexWriter.updateDocument(new Term(FIELD_VARIANTKEY, variantString), luceneDocument);
                 }
             } finally {
                 indexerStatus = INDEXER_INACTIVE_MSG;
@@ -268,6 +294,28 @@
             throw new Exception("Error while indexing content for document " + documentId, e);
         } finally {
             suspendIndexUpdatesLock.readLock().unlock();
+        }
+    }
+
+    private void updateWriter() throws IOException {
+        getLogger().debug("Closing and reopening lucene index writer in order to flush changes.");
+        closeIndexWriter();
+        this.indexWriter = constructIndexWriter();
+    }
+
+    private IndexWriter constructIndexWriter() throws IOException {
+        return new IndexWriter(indexDirectory, new StandardAnalyzer());
+    }
+
+    /**
+     * Should only be called by methods that have the indexWriteLock.
+     */
+    private void closeIndexWriter() throws IOException {
+        if (this.indexWriter != null) {
+            this.indexWriter.close();
+            this.indexWriter = null;
+            this.indexWriterDirty = false;
+
             synchronized (indexSearchLock) {
                 if (indexSearchObjects != null) {
                     indexSearchObjects.close();
@@ -297,6 +345,37 @@
         return indexerStatus;
     }
 
+    public class IndexFlusher implements Runnable {
+        public void run() {
+            try {
+                while (true) {
+                    if (Thread.interrupted())
+                        return;
+
+                    Thread.sleep(indexFlushInterval);
+
+                    indexWriteLock.lockInterruptibly();
+                    try {
+                        if (indexWriterDirty) {
+                            try {
+                                updateWriter();
+                            } catch (Throwable e) {
+                                getLogger().error("Error updating fulltext index writer.", e);
+                            }
+                        }
+                    } finally {
+                        indexWriteLock.unlock();
+                    }
+                }
+            } catch (InterruptedException e) {
+                // ingore
+            } finally {
+                getLogger().info("Index flush thread ended.");
+
+            }
+        }
+    }
+
     public class IndexOptimizeThread extends Thread {
         public void run() {
             try {
@@ -318,7 +397,8 @@
                 try {
                     indexerStatus = "Running index optimization.";
                     getLogger().info("Starting index optimization.");
-                    indexWriter = new IndexWriter(indexDirectory, new StandardAnalyzer(), false);
+                    closeIndexWriter();
+                    indexWriter = constructIndexWriter();
                     indexWriter.optimize();
                 } catch (IOException e) {
                     getLogger().error("Error optimizing index.", e);

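For illustration, the dirty-flag flushing introduced in the diff above can be sketched without Lucene. This is a minimal sketch under assumptions: PeriodicFlusherSketch is a hypothetical class, a counter of pending writes stands in for the IndexWriter, and "flushing" just resets that counter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: writes mark the state dirty, a background thread flushes. */
class PeriodicFlusherSketch {
    private final Lock writeLock = new ReentrantLock();
    private final long flushIntervalMillis;
    private boolean dirty = false;   // guarded by writeLock
    private int pendingWrites = 0;   // guarded by writeLock
    private final AtomicInteger flushCount = new AtomicInteger();
    private Thread flushThread;

    PeriodicFlusherSketch(long flushIntervalMillis) {
        this.flushIntervalMillis = flushIntervalMillis;
    }

    /** Records a write without flushing; the flusher picks it up later. */
    void write() {
        writeLock.lock();
        try {
            pendingWrites++;
            dirty = true;
        } finally {
            writeLock.unlock();
        }
    }

    void start() {
        flushThread = new Thread(this::flushLoop, "flusher sketch");
        flushThread.setDaemon(true);
        flushThread.start();
    }

    /** Interrupts the flusher, waits for it to end, then does a final flush. */
    void stop() {
        flushThread.interrupt();
        try {
            flushThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        writeLock.lock();
        try {
            flushIfDirty();
        } finally {
            writeLock.unlock();
        }
    }

    private void flushLoop() {
        try {
            while (true) {
                Thread.sleep(flushIntervalMillis);
                writeLock.lockInterruptibly();
                try {
                    flushIfDirty();
                } finally {
                    writeLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            // interrupted by stop(): end the thread
        }
    }

    /** Caller must hold writeLock. Skips the flush when nothing changed. */
    private void flushIfDirty() {
        if (dirty) {
            pendingWrites = 0;
            dirty = false;
            flushCount.incrementAndGet();
        }
    }

    int flushCount() {
        return flushCount.get();
    }

    int pendingWrites() {
        writeLock.lock();
        try {
            return pendingWrites;
        } finally {
            writeLock.unlock();
        }
    }
}
```

The sketch releases the lock in a finally block on every path, and the dirty check keeps an idle flusher from doing any work between writes.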
File [modified]: FullTextIndexUpdater.java
Delta lines: +5 -1
===================================================================
--- trunk/daisy/repository/server/src/java/org/outerj/daisy/ftindex/FullTextIndexUpdater.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/src/java/org/outerj/daisy/ftindex/FullTextIndexUpdater.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -126,10 +126,14 @@
     }
 
     public void dispose() {
+        jmsClient.unregisterListener(eventListener);
+        jmsClient.unregisterListener(fullTextQueueListener);
+        jmsClient.unregisterSender(fullTextQueueSender);
+
         if (reindexThread != null && reindexThread.isAlive()) {
+            getLogger().info("Waiting for reindex thread to end.");
             reindexThread.interrupt();
             try {
-                getLogger().info("Waiting for reindex thread to end.");
                 reindexThread.join(0);
             } catch (InterruptedException e) {
                 // ignore

Directory: /trunk/daisy/services/jms/impl/
==========================================

File [removed]: maven.xml
Delta lines: +0 -20
===================================================================
--- trunk/daisy/services/jms/impl/maven.xml	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/jms/impl/maven.xml	2007-03-23 18:06:30 UTC (rev 3872)
@@ -1,20 +0,0 @@
-<!--
-  Copyright 2004 Outerthought bvba and Schaubroeck nv
-
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project default="jar:jar" xmlns:maven="jelly:maven" xmlns:j="jelly:core" xmlns:util="jelly:util">
-  <preGoal name="java:compile">
-    <attainGoal name="avalon:meta"/>
-  </preGoal>
-</project>
\ No newline at end of file

File [modified]: project.xml
Delta lines: +6 -0
===================================================================
--- trunk/daisy/services/jms/impl/project.xml	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/jms/impl/project.xml	2007-03-23 18:06:30 UTC (rev 3872)
@@ -59,6 +59,12 @@
           <include>block.xml</include>
         </includes>
       </resource>
+      <resource>
+        <directory>${basedir}/src/java/</directory>
+        <includes>
+          <include>**/*.xinfo</include>
+        </includes>
+      </resource>
     </resources>
   </build>
 

Directory: /trunk/daisy/services/jms/impl/src/java/org/outerj/daisy/jms/impl/
=============================================================================

File [modified]: JmsClientImpl.java
Delta lines: +107 -81
===================================================================
--- trunk/daisy/services/jms/impl/src/java/org/outerj/daisy/jms/impl/JmsClientImpl.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/jms/impl/src/java/org/outerj/daisy/jms/impl/JmsClientImpl.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -17,7 +17,6 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -44,10 +43,6 @@
 import org.outerj.daisy.jms.Sender;
 import org.outerj.daisy.configutil.PropertyResolver;
 
-/**
- * @avalon.component version="1.0" name="jsmclient" lifestyle="singleton"
- * @avalon.service type="org.outerj.daisy.jms.JmsClient"
- */
 public class JmsClientImpl extends AbstractLogEnabled implements JmsClient, Configurable, Initializable, Disposable, ThreadSafe, Serviceable {
     private Properties contextProperties;
     private String jmsUserName;
@@ -57,9 +52,12 @@
     private Connection jmsConnection;
     private static final int CONN_RETRY_INTERVAL = 10000;
     private boolean stopping = false;
-    private List consumers = new ArrayList();
-    private List senders = new ArrayList();
-    private List runningThreads = Collections.synchronizedList(new ArrayList());
+    private List<MyJmsMessageListener> consumers = new ArrayList<MyJmsMessageListener>();
+    private final Object consumersLock = new Object();
+    private List<SenderImpl> senders = new ArrayList<SenderImpl>();
+    private final Object sendersLock = new Object();
+    private Thread connectionEstablishThread;
+    private final Object connectionEstablishThreadLock = new Object();
     private ReadWriteLock suspendLock = new ReentrantReadWriteLock();
 
     public JmsClientImpl() {
@@ -87,12 +85,8 @@
     public void resume() {
         this.suspendLock.writeLock().unlock();
     }
-    
-    
-    /**
-     * @avalon.dependency key="driverregistrar" type="org.outerj.daisy.datasource.DriverRegistrar"
-     */
-    public void service(ServiceManager serviceManager) throws ServiceException {      
+
+    public void service(ServiceManager serviceManager) throws ServiceException {
         // used to set a dependency
     }
 
@@ -105,7 +99,7 @@
                 String value = PropertyResolver.resolveProperties(propertiesConf[i].getAttribute("value"));
                 // special hack for ActiveMQ on Windows: broker config should be an URL,
                 // but substitution of ${daisy.datadir} might contain backslashes, so
-                // convert backslashes to slashes 
+                // convert backslashes to slashes
                 if (value.indexOf("brokerConfig=xbean:file:") != -1) {
                     value = value.replaceAll("\\\\", "/");
                 }
@@ -118,7 +112,7 @@
             jmsUserName = jmsCredentials.getAttribute("username");
             jmsPassword = jmsCredentials.getAttribute("password");
         }
-        
+
         clientId = jmsConf.getChild("clientId").getValue();
 
         connectionFactoryName = jmsConf.getChild("connectionFactoryName").getValue();
@@ -132,7 +126,10 @@
     }
 
     protected void initializeJmsConnection(boolean failOnError) throws Exception {
-        while (jmsConnection == null) {
+        while (jmsConnection == null && !stopping) {
+            if (Thread.interrupted())
+                throw new InterruptedException();
+
             try {
                 getLogger().debug("Trying to establish JMS connection...");
                 Context context = getContext();
@@ -150,27 +147,40 @@
             } catch (Exception e) {
                 if (failOnError)
                     throw e;
-                try {
-                    Thread.sleep(CONN_RETRY_INTERVAL);
-                } catch (InterruptedException e1) {
-                    if (stopping)
-                        throw e1;
-                }
+                Thread.sleep(CONN_RETRY_INTERVAL);
             }
         }
         getLogger().info("JMS connection established.");
     }
 
     private class MyJmsExceptionListener implements ExceptionListener {
+        private boolean gotError = false;
+
         public void onException(JMSException e) {
             if (stopping)
                 return;
-            getLogger().error("Error with the JMS connection. Will automatically try to re-establish connection every " + CONN_RETRY_INTERVAL + " ms.", e);
-            connectionDown();
-            jmsConnection = null;
-            Thread thread = new ConnectionEstablisherThread();
-            runningThreads.add(thread);
-            thread.start();
+
+            synchronized (connectionEstablishThreadLock) {
+                try {
+                    jmsConnection.close();
+                } catch (Throwable t) {
+                    // ignore
+                }
+
+                if (gotError) {
+                    getLogger().error("Got another error on a JMS connection on which we got an error before.", e);
+                } else if (connectionEstablishThread == null) {
+                    gotError = true;
+                    getLogger().error("Error with the JMS connection. Will automatically try to re-establish connection every " + CONN_RETRY_INTERVAL + " ms.", e);
+                    connectionDown();
+                    jmsConnection = null;
+                    connectionEstablishThread = new ConnectionEstablisherThread();
+                    connectionEstablishThread.start();
+                } else {
+                    gotError = true;
+                    getLogger().error("Strange situation: got first error but there is already a connection establisher thread?", e);
+                }
+            }
         }
     }
 
@@ -184,10 +194,12 @@
             try {
                 initializeJmsConnection(false);
             } catch (Exception e2) {
-                // can probably never occur since the initializeJmsConnection method catches all exceptions
                 getLogger().error("Error trying to establish JMS topic connection, giving up.", e2);
             }
-            runningThreads.remove(this);
+            synchronized (connectionEstablishThreadLock) {
+                connectionEstablishThread = null;
+                getLogger().info("JMS connection re-establish thread ended.");
+            }
         }
     }
 
@@ -204,12 +216,16 @@
     public void dispose() {
         stopping = true;
 
-        if (runningThreads.size() > 0 && getLogger().isDebugEnabled())
-            getLogger().debug("Will interrupt " + runningThreads.size() + " JMS connection-establishing threads.");
-        Iterator runningThreadIt = runningThreads.iterator();
-        while (runningThreadIt.hasNext()) {
-            Thread thread = (Thread)runningThreadIt.next();
-            thread.interrupt();
+        synchronized (connectionEstablishThreadLock) {
+            if (connectionEstablishThread != null) {
+                getLogger().info("Waiting for JMS connection re-establish thread to end");
+                connectionEstablishThread.interrupt();
+                try {
+                    connectionEstablishThread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
         }
 
         try {
@@ -222,78 +238,90 @@
         }
     }
 
-    private synchronized void connectionUp() {
+    private void connectionUp() {
         bringUp(consumers);
         bringUp(senders);
     }
 
-    private synchronized void connectionDown() {
+    private void connectionDown() {
         bringDown(consumers);
         bringDown(senders);
     }
 
-    private void bringUp(List list) {
-        Iterator it = list.iterator();
-        while (it.hasNext()) {
+    private void bringUp(List<? extends Reconnectable> list) {
+        for (Reconnectable reconnectable : list) {
             try {
-                ((Reconnectable)it.next()).connectionUp();
+                reconnectable.connectionUp();
             } catch (Throwable e) {
-                getLogger().error("Error 'upping' a JMS session.", e);
+
+                getLogger().error("Error 'upping' a JMS session. When you see this, it is recommended to restart the application.", e);
             }
         }
     }
 
-    private void bringDown(List list) {
-        Iterator it = list.iterator();
-        while (it.hasNext()) {
+    private void bringDown(List<? extends Reconnectable> list) {
+        for (Reconnectable reconnectable : list) {
             try {
-                ((Reconnectable)it.next()).connectionDown();
+                reconnectable.connectionDown();
             } catch (Throwable e) {
                 getLogger().error("Error 'downing' a JMS session.", e);
             }
         }
     }
 
-    public synchronized void registerDurableTopicListener(String topicName, String subscriptionName, MessageListener listener) throws Exception {
+    public void registerDurableTopicListener(String topicName, String subscriptionName, MessageListener listener) throws Exception {
         MyJmsMessageListener theListener = new MyJmsMessageListener(topicName, subscriptionName, listener);
         theListener.connectionUp();
-        consumers.add(theListener);
+        synchronized (consumersLock) {
+            consumers.add(theListener);
+        }
     }
 
-    public synchronized void registerListener(String destinationName, MessageListener listener) throws Exception {
+    public void registerListener(String destinationName, MessageListener listener) throws Exception {
         MyJmsMessageListener theListener = new MyJmsMessageListener(destinationName, null, listener);
         theListener.connectionUp();
-        consumers.add(theListener);
+        synchronized (consumersLock) {
+            consumers.add(theListener);
+        }
     }
 
-    public synchronized void unregisterListener(MessageListener listener) {
-        if (listener instanceof MyJmsMessageListener) {
-            consumers.remove(listener);
-            ((MyJmsMessageListener)listener).dispose();
-        } else {
-            throw new RuntimeException("Unexpected object: " + listener);
+    public void unregisterListener(MessageListener listener) {
+        synchronized (consumersLock) {
+            for (MyJmsMessageListener myListener : consumers) {
+                if (myListener.getDelegate() == listener) {
+                    consumers.remove(myListener);
+                    myListener.dispose();
+                    return;
+                }
+            }
         }
+        throw new RuntimeException("The specified listener is currently not registered.");
     }
 
     public Sender getSender(String destinationName) {
         return getSender(destinationName, false);
     }
 
-    public synchronized Sender getSender(String destinationName, boolean transacted) {
+    public Sender getSender(String destinationName, boolean transacted) {
         SenderImpl sender = new SenderImpl(destinationName, transacted);
         try {
             sender.connectionUp();
         } catch (Exception e) {
             getLogger().warn("Sender could not be initialized after initial retrieval, meaning the JMS connection is probably down.", e);
         }
-        senders.add(sender);
+        synchronized (sendersLock) {
+            senders.add(sender);
+        }
         return sender;
     }
 
-    public synchronized void unregisterSender(Sender sender) {
+    public void unregisterSender(Sender sender) {
         if (sender instanceof SenderImpl) {
-            senders.remove(sender);
-            ((SenderImpl)sender).dispose();
+            SenderImpl senderImpl = (SenderImpl)sender;
+            synchronized (sendersLock) {
+                senders.remove(senderImpl);
+            }
+            senderImpl.dispose();
         } else {
             throw new RuntimeException("Unexpected object: " + sender);
         }
@@ -354,10 +382,14 @@
                 getLogger().error("Error closing JMS session.", e);
             }
         }
+
+        public MessageListener getDelegate() {
+            return delegate;
+        }
     }
 
     class SenderImpl implements Sender, Reconnectable {
-        private boolean connectionUp = false;
+        private volatile boolean connectionUp = false;
         private Session session;
         private String destinationName;
         private MessageProducer messageProducer;
@@ -376,7 +408,7 @@
             });
         }
 
-        public TextMessage createTextMessage(final String text) throws JMSException {
+        public TextMessage createTextMessage(final String text) throws JMSException, InterruptedException {
             final TextMessage[] message = new TextMessage[1];
             executeWhenConnectionIsUp(new JMSAction() {
                 public void run() throws Exception {
@@ -386,7 +418,7 @@
             return message[0];
         }
 
-        public MapMessage createMapMessage() throws JMSException {
+        public MapMessage createMapMessage() throws JMSException, InterruptedException {
             final MapMessage[] message = new MapMessage[1];
             executeWhenConnectionIsUp(new JMSAction() {
                 public void run() throws Exception {
@@ -426,17 +458,12 @@
             }
         }
 
-        protected synchronized void executeWhenConnectionIsUp(JMSAction action) {
+        protected void executeWhenConnectionIsUp(JMSAction action) throws InterruptedException {
             stoppingLoop: while (!stopping) {
                 // wait till connection is back up
-                while (!connectionUp) {
+                while (!connectionUp && !stopping) {
                     getLogger().debug("JMS connection is down...");
-                    try {
-                        Thread.sleep(CONN_RETRY_INTERVAL);
-                    } catch (InterruptedException e) {
-                        getLogger().debug("Got interruptedexception while sleeping to wait for JMS connection to re-appear.", e);
-                        break stoppingLoop;
-                    }
+                    Thread.sleep(CONN_RETRY_INTERVAL);
                 }
 
                 // connection is back, try to send message
@@ -452,6 +479,8 @@
                         action.run();
                         return;
                     } catch (Exception e) {
+                        if (e instanceof InterruptedException)
+                            throw (InterruptedException)e;
                         if (!connectionUp) {
                             // connection was just lost (again), go waiting till it is back up
                             break;
@@ -462,13 +491,10 @@
                             i++;
                             if (i >= 3) {
                                 throw new RuntimeException("Failed to execute JMS action, giving up.", e);
+                            } else if (stopping) {
+                                throw new InterruptedException("JMS client is shutting down.");
                             } else {
-                                try {
-                                    Thread.sleep(CONN_RETRY_INTERVAL);
-                                } catch (InterruptedException e2) {
-                                    getLogger().debug("Got interruptedexception while sleeping before retrying JMS action.", e);
-                                    break stoppingLoop;
-                                }
+                                Thread.sleep(CONN_RETRY_INTERVAL);
                             }
                         }
                     } finally {
@@ -476,7 +502,7 @@
                     }
                 }
             }
-            throw new RuntimeException("Failed to execute JMS action and now server is going down...");
+            throw new InterruptedException("JMS client is shutting down.");
         }
     }
 

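Note on the executeWhenConnectionIsUp change above: the wait loop no longer catches InterruptedException itself but lets it reach the caller, and it also stops waiting once the stopping flag is set. A minimal, self-contained sketch of that idea follows. It is not the Daisy class: ConnectionWaitSketch, retryIntervalMillis and demoInterruptEndsWait are hypothetical names, and the three-attempt retry logic of the real method is left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the sender's wait loop; not the Daisy implementation. */
final class ConnectionWaitSketch {
    // volatile so that updates made by another thread are seen by the polling loop
    private volatile boolean connectionUp = false;
    private volatile boolean stopping = false;
    private final long retryIntervalMillis;

    ConnectionWaitSketch(long retryIntervalMillis) {
        this.retryIntervalMillis = retryIntervalMillis;
    }

    void setConnectionUp(boolean up) { connectionUp = up; }

    void stop() { stopping = true; }

    /** Runs the action once the connection is up; an interrupt or a shutdown ends the wait. */
    void executeWhenConnectionIsUp(Runnable action) throws InterruptedException {
        while (!connectionUp && !stopping) {
            // No local catch: an interrupt propagates to the caller.
            Thread.sleep(retryIntervalMillis);
        }
        if (stopping) {
            throw new InterruptedException("JMS client is shutting down.");
        }
        action.run();
    }

    /** Starts a waiting thread, interrupts it, and reports whether the wait was ended. */
    static boolean demoInterruptEndsWait() {
        ConnectionWaitSketch sketch = new ConnectionWaitSketch(10);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                sketch.executeWhenConnectionIsUp(() -> { });
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sawInterrupt.get();
    }
}
```

Because the interrupt is no longer swallowed inside the loop, a shutdown that calls interrupt() followed by join() on a waiting thread should not hang on a connection that never comes back.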
File [added]: JmsClientImpl.xinfo
Delta lines: +17 -0
===================================================================
--- trunk/daisy/services/jms/impl/src/java/org/outerj/daisy/jms/impl/JmsClientImpl.xinfo	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/jms/impl/src/java/org/outerj/daisy/jms/impl/JmsClientImpl.xinfo	2007-03-23 18:06:30 UTC (rev 3872)
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!DOCTYPE type PUBLIC "-//AVALON/Type DTD Version 1.0//EN" "http://avalon.apache.org/dtds/meta/type_1_1.dtd" >
+
+<type>
+  <info>
+    <name>jsmclient</name>
+    <version>1.0.0</version>
+    <lifestyle>singleton</lifestyle>
+    <collection>hard</collection>
+  </info>
+  <services>
+    <service type="org.outerj.daisy.jms.JmsClient"/>
+  </services>
+  <dependencies>
+    <dependency key="driverregistrar" type="org.outerj.daisy.datasource.DriverRegistrar"/>
+  </dependencies>
+</type>
\ No newline at end of file

Directory: /trunk/daisy/services/jms/api/src/java/org/outerj/daisy/jms/
=======================================================================

File [modified]: Sender.java
Delta lines: +2 -2
===================================================================
--- trunk/daisy/services/jms/api/src/java/org/outerj/daisy/jms/Sender.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/jms/api/src/java/org/outerj/daisy/jms/Sender.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -23,9 +23,9 @@
 public interface Sender {
     void send(Message message) throws Exception;
 
-    TextMessage createTextMessage(String message) throws JMSException;
+    TextMessage createTextMessage(String message) throws JMSException, InterruptedException;
 
-    MapMessage createMapMessage() throws JMSException;
+    MapMessage createMapMessage() throws JMSException, InterruptedException;
 
     void commit() throws JMSException;
 }
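
Note on the Sender.java change above: createTextMessage and createMapMessage now declare InterruptedException, so callers have to deal with it. The commit does not show how callers were adapted; one common way, sketched below under that assumption, is to restore the thread's interrupt status and give up on the message. TextSender and SenderCallerSketch are hypothetical, simplified stand-ins (a String replaces javax.jms.TextMessage so the example needs no JMS library).

```java
/** Hypothetical, simplified stand-in for the Sender interface in this commit. */
interface TextSender {
    String createTextMessage(String text) throws InterruptedException;
}

final class SenderCallerSketch {
    /**
     * Returns the created message, or null if the calling thread was interrupted
     * while waiting. The interrupt status is restored so that code further up
     * the call stack can still notice the shutdown request.
     */
    static String createOrNull(TextSender sender, String text) {
        try {
            return sender.createTextMessage(text);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```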

Directory: /trunk/daisy/repository/server/
==========================================

File [modified]: activemq-conf.xml.template
Delta lines: +0 -6
===================================================================
--- trunk/daisy/repository/server/activemq-conf.xml.template	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/activemq-conf.xml.template	2007-03-23 18:06:30 UTC (rev 3872)
@@ -87,12 +87,6 @@
     <property name="username" value="activemq"/>
     <property name="password" value="activemq"/>
     <property name="defaultTransactionIsolation" value="2"/> <!-- 2 = TRANSACTION_READ_COMMITTED -->
-    <property name="poolPreparedStatements" value="true"/>
-    <property name="initialSize" value="5"/>
-    <property name="maxActive" value="20"/>
-    <property name="maxIdle" value="8"/>
-    <property name="minIdle" value="0"/>
-    <property name="maxWait" value="5000"/>
   </bean>
   
   <bean id="statements" class="org.apache.activemq.store.jdbc.Statements">

File [modified]: project.xml
Delta lines: +2 -2
===================================================================
--- trunk/daisy/repository/server/project.xml	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/project.xml	2007-03-23 18:06:30 UTC (rev 3872)
@@ -128,12 +128,12 @@
     <dependency>
       <groupId>lucene</groupId>
       <artifactId>lucene-core</artifactId>
-      <version>2.0.0</version>
+      <version>2.1.0</version>
     </dependency>
     <dependency>
       <groupId>lucene</groupId>
       <artifactId>lucene-highlighter</artifactId>
-      <version>2.0.0</version>
+      <version>2.1.0</version>
     </dependency>
     <dependency>
       <groupId>mx4j</groupId>

Directory: /trunk/daisy/repository/server/src/java/org/outerj/daisy/repository/serverimpl/
==========================================================================================

File [modified]: LocalDocumentStrategy.java
Delta lines: +23 -14
===================================================================
--- trunk/daisy/repository/server/src/java/org/outerj/daisy/repository/serverimpl/LocalDocumentStrategy.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/src/java/org/outerj/daisy/repository/serverimpl/LocalDocumentStrategy.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -794,27 +794,36 @@
             } catch (Exception e) {
                 logger.error("Error creating summary for document ID " + docId + ", branch " + getBranchLabel(document.getBranchId()) + ", language " + getLanguageLabel(document.getLanguageId()), e);
             }
-            if (summary != null) {
+
+            // check if a summary record exists
+            stmt = conn.prepareStatement("select 1 from summaries where doc_id = ? and ns_id = ? and branch_id = ? and lang_id = ?");
+            stmt.setLong(1, docId.getSeqId());
+            stmt.setLong(2, docId.getNsId());
+            stmt.setLong(3, document.getBranchId());
+            stmt.setLong(4, document.getLanguageId());
+            ResultSet rs = stmt.executeQuery();
+            boolean storedSummaryExists = rs.next();
+            stmt.close();
+
+            if (summary != null && storedSummaryExists) {
                 stmt = conn.prepareStatement("update summaries set summary=? where doc_id = ? and ns_id = ? and branch_id = ? and lang_id = ?");
                 stmt.setString(1, summary);
                 stmt.setLong(2, docId.getSeqId());
                 stmt.setLong(3, docId.getNsId());
                 stmt.setLong(4, document.getBranchId());
                 stmt.setLong(5, document.getLanguageId());
-                long updateCount = stmt.executeUpdate();
+                stmt.executeUpdate();
                 stmt.close();
-
-                if (updateCount == 0) {
-                    stmt = conn.prepareStatement("insert into summaries(doc_id, ns_id, branch_id, lang_id, summary) values(?,?,?,?,?)");
-                    stmt.setLong(1, docId.getSeqId());
-                    stmt.setLong(2, docId.getNsId());
-                    stmt.setLong(3, document.getBranchId());
-                    stmt.setLong(4, document.getLanguageId());
-                    stmt.setString(5, summary);
-                    stmt.execute();
-                    stmt.close();
-                }
-            } else {
+            } else if (summary != null && !storedSummaryExists) {
+                stmt = conn.prepareStatement("insert into summaries(doc_id, ns_id, branch_id, lang_id, summary) values(?,?,?,?,?)");
+                stmt.setLong(1, docId.getSeqId());
+                stmt.setLong(2, docId.getNsId());
+                stmt.setLong(3, document.getBranchId());
+                stmt.setLong(4, document.getLanguageId());
+                stmt.setString(5, summary);
+                stmt.execute();
+                stmt.close();
+            } else if (summary == null && storedSummaryExists) {
                 stmt = conn.prepareStatement("delete from summaries where doc_id = ? and ns_id = ? and branch_id = ? and lang_id = ?");
                 stmt.setLong(1, docId.getSeqId());
                 stmt.setLong(2, docId.getNsId());
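
Note on the LocalDocumentStrategy change above: the new code first checks whether a summaries row exists and then picks update, insert or delete from two booleans, instead of relying on the update count. The decision can be sketched as a small pure function. SummaryActionSketch and its Action enum are hypothetical names, and treating the fourth case (no new summary, no stored row) as a no-op is an assumption, since the hunk ends before showing what follows the delete branch.

```java
/** Hypothetical sketch of the branch selection; performs no database access. */
final class SummaryActionSketch {
    enum Action { UPDATE, INSERT, DELETE, NONE }

    /**
     * @param hasNewSummary       true if a summary could be generated (summary != null)
     * @param storedSummaryExists true if the "select 1 from summaries ..." query returned a row
     */
    static Action decide(boolean hasNewSummary, boolean storedSummaryExists) {
        if (hasNewSummary && storedSummaryExists) {
            return Action.UPDATE;
        } else if (hasNewSummary) {
            return Action.INSERT;
        } else if (storedSummaryExists) {
            return Action.DELETE;
        }
        return Action.NONE; // assumed: nothing stored and nothing to store
    }
}
```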

File [modified]: LocalRepositoryManager.java
Delta lines: +9 -4
===================================================================
--- trunk/daisy/repository/server/src/java/org/outerj/daisy/repository/serverimpl/LocalRepositoryManager.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/server/src/java/org/outerj/daisy/repository/serverimpl/LocalRepositoryManager.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -146,8 +146,13 @@
     }
 
     public void stop() throws Exception {
+        getLogger().info("Waiting for expired lock janitor thread to end.");
         expiredLockJanitorThread.interrupt();
-        try { expiredLockJanitorThread.join(); } catch (InterruptedException e) { /* ignore */ }
+        try {
+            expiredLockJanitorThread.join();
+        } catch (InterruptedException e) {
+            // ignore
+        }
     }
 
     public Repository getRepository(final Credentials credentials) throws RepositoryException {
@@ -382,7 +387,6 @@
     }
 
     private class ExpiredLockJanitor implements Runnable {
-        private final String SHUTDOWN_MESSAGE = "ExpiredLockJanitor shutting down.";
         public void run() {
             try {
                 while (true) {
@@ -396,7 +400,6 @@
                         ResultSet rs = stmt.executeQuery();
                         while (rs.next()) {
                             if (Thread.interrupted()) {
-                                getLogger().info(SHUTDOWN_MESSAGE);
                                 return;
                             }
                             String documentId = rs.getLong("doc_id") + "-" + rs.getString("ns_name");
@@ -420,7 +423,9 @@
                     }
                 }
             } catch (InterruptedException e) {
-                getLogger().info(SHUTDOWN_MESSAGE);
+                // ignore
+            } finally {
+                getLogger().info("Expired lock janitor thread ended.");
             }
         }
     }

Directory: /trunk/daisy/services/doctaskrunner/server-impl/src/java/org/outerj/daisy/doctaskrunner/serverimpl/
==============================================================================================================

File [modified]: CommonDocumentTaskManager.java
Delta lines: +44 -34
===================================================================
--- trunk/daisy/services/doctaskrunner/server-impl/src/java/org/outerj/daisy/doctaskrunner/serverimpl/CommonDocumentTaskManager.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/doctaskrunner/server-impl/src/java/org/outerj/daisy/doctaskrunner/serverimpl/CommonDocumentTaskManager.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -115,8 +115,13 @@
     }
 
     public void stop() throws Exception {
+        logger.info("Waiting for document task janitor thread to end.");
         janitorThread.interrupt();
-        try { janitorThread.join(); } catch (InterruptedException e) {}
+        try {
+            janitorThread.join();
+        } catch (InterruptedException e) {
+            // ignore
+        }
     }
 
     class MyExtensionProvider implements ExtensionProvider {
@@ -523,44 +528,49 @@
 
     class ExpiredTasksJanitor implements Runnable {
         public void run() {
-            while (true) {
-                try {
+            try {
+                while (true) {
+                    if (Thread.interrupted())
+                        return;
+
                     Thread.sleep(taskJanitorRunInterval);
-                } catch (InterruptedException e) {
-                    logger.debug("ExpiredTaskJanitor thread was interrupted.");
-                    return;
-                }
-                Connection conn = null;
-                PreparedStatement stmt = null;
-                try {
-                    conn = dataSource.getConnection();
-                    jdbcHelper.startTransaction(conn);
 
-                    // Note: the search is performed on started_at and not on finished_at because finished_at may
-                    // not always have a value (e.g. when the task was interrupted by shutdown)
-                    stmt = conn.prepareStatement("select id from document_tasks where started_at < ? and state not in ('" + TaskState.INITIALISING.getCode() + "', '" + TaskState.RUNNING.getCode() + "') " + jdbcHelper.getSharedLockClause());
-                    stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - taskJanitorTaskMaxAge));
-                    ResultSet rs = stmt.executeQuery();
-                    ArrayList taskIds = new ArrayList();
-                    while (rs.next()) {
-                        taskIds.add(new Long(rs.getLong(1)));
-                    }
-                    stmt.close();
+                    Connection conn = null;
+                    PreparedStatement stmt = null;
+                    try {
+                        conn = dataSource.getConnection();
+                        jdbcHelper.startTransaction(conn);
 
-                    Iterator taskIdsIt = taskIds.iterator();
-                    while (taskIdsIt.hasNext()) {
-                        long taskId = ((Long)taskIdsIt.next()).longValue();
-                        deleteTask(taskId, conn);
+                        // Note: the search is performed on started_at and not on finished_at because finished_at may
+                        // not always have a value (e.g. when the task was interrupted by shutdown)
+                        stmt = conn.prepareStatement("select id from document_tasks where started_at < ? and state not in ('" + TaskState.INITIALISING.getCode() + "', '" + TaskState.RUNNING.getCode() + "') " + jdbcHelper.getSharedLockClause());
+                        stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - taskJanitorTaskMaxAge));
+                        ResultSet rs = stmt.executeQuery();
+                        ArrayList taskIds = new ArrayList();
+                        while (rs.next()) {
+                            taskIds.add(new Long(rs.getLong(1)));
+                        }
+                        stmt.close();
+
+                        Iterator taskIdsIt = taskIds.iterator();
+                        while (taskIdsIt.hasNext()) {
+                            long taskId = ((Long)taskIdsIt.next()).longValue();
+                            deleteTask(taskId, conn);
+                        }
+
+                        conn.commit();
+                    } catch (Throwable e) {
+                        jdbcHelper.rollback(conn);
+                        logger.error("Expired tasks janitor: error while performing my job.", e);
+                    } finally {
+                        jdbcHelper.closeStatement(stmt);
+                        jdbcHelper.closeConnection(conn);
                     }
-
-                    conn.commit();
-                } catch (Throwable e) {
-                    jdbcHelper.rollback(conn);
-                    logger.error("Expired tasks janitor: error while performing my job.", e);
-                } finally {
-                    jdbcHelper.closeStatement(stmt);
-                    jdbcHelper.closeConnection(conn);
                 }
+            } catch (InterruptedException e) {
+                // ignore
+            } finally {
+                logger.debug("Expired document task janitor thread ended.");
             }
         }
     }
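
Note on the janitor thread changes above: the run() methods now wrap the whole loop in one try block, treat InterruptedException as the normal exit path, and log the end of the thread in a finally clause, while stop() interrupts the thread and joins it. A minimal sketch of that structure follows. JanitorSketch, its counters and startAndStop are hypothetical, and the periodic job is a placeholder. Unlike stop() in the diff, which ignores an interrupt during join(), the sketch restores the interrupt status.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of an interruptible periodic background job. */
final class JanitorSketch implements Runnable {
    private final long intervalMillis;
    final AtomicInteger runs = new AtomicInteger();
    final AtomicBoolean ended = new AtomicBoolean(false);

    JanitorSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    public void run() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    return;
                }
                Thread.sleep(intervalMillis);
                try {
                    runs.incrementAndGet(); // placeholder for the periodic job
                } catch (Throwable t) {
                    // a real job would log the error and continue with the next round
                }
            }
        } catch (InterruptedException e) {
            // normal shutdown path
        } finally {
            ended.set(true); // stands in for the "thread ended" log statement
        }
    }

    /** Starts the janitor, requests shutdown, and waits for the thread to end. */
    static boolean startAndStop() {
        JanitorSketch janitor = new JanitorSketch(5);
        Thread thread = new Thread(janitor, "janitor-sketch");
        thread.start();
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return janitor.ended.get();
    }
}
```

Placing the "ended" marker in finally means it is reached on every exit path: the early return, the InterruptedException, or an unexpected error escaping the loop.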

Directory: /trunk/daisy/services/emailer/server-impl/src/java/org/outerj/daisy/emailer/serverimpl/
==================================================================================================

File [modified]: CommonEmailer.java
Delta lines: +83 -71
===================================================================
--- trunk/daisy/services/emailer/server-impl/src/java/org/outerj/daisy/emailer/serverimpl/CommonEmailer.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/emailer/server-impl/src/java/org/outerj/daisy/emailer/serverimpl/CommonEmailer.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -88,8 +88,13 @@
     }
 
     public void stop() throws Exception {
+        getLogger().info("Waiting for emailer thread to end.");
         emailerThread.interrupt();
-        try { emailerThread.join(); } catch (InterruptedException e) {}
+        try {
+            emailerThread.join();
+        } catch (InterruptedException e) {
+            // ignore
+        }
     }
 
     /**
@@ -170,88 +175,95 @@
 
     class EmailerThread implements Runnable {
         public void run() {
-            long lastInvocationTime = System.currentTimeMillis();
-            bigLoop: while (true) {
-                try {
-                    lastInvocationTime = System.currentTimeMillis();
-
-                    Connection conn = null;
-                    PreparedStatement stmt = null;
-                    PreparedStatement stmtUpdate = null;
-                    PreparedStatement stmtDelete = null;
+            try {
+                long lastInvocationTime = System.currentTimeMillis();
+                while (true) {
                     try {
-                        conn = dataSource.getConnection();
-                        stmt = conn.prepareStatement("select id,from_address,to_address,subject,message,retry_count,created from email_queue where retry_count < ? and (last_try_time is null or last_try_time < ?) order by created");
-                        stmt.setLong(1, maxTryCount);
-                        stmt.setTimestamp(2, new java.sql.Timestamp(System.currentTimeMillis() - (retryInterval * 60000)));
-                        ResultSet rs = stmt.executeQuery();
+                        lastInvocationTime = System.currentTimeMillis();
 
-                        while (rs.next()) {
-                            String from = rs.getString("from_address");
-                            if (from == null)
-                                from = defaultFrom;
-                            String to = rs.getString("to_address");
-                            String subject = rs.getString("subject");
-                            String message = rs.getString("message");
-                            int retryCount = rs.getInt("retry_count");
-                            long id = rs.getLong("id");
-                            boolean success = false;
+                        Connection conn = null;
+                        PreparedStatement stmt = null;
+                        PreparedStatement stmtUpdate = null;
+                        PreparedStatement stmtDelete = null;
+                        try {
+                            conn = dataSource.getConnection();
+                            stmt = conn.prepareStatement("select id,from_address,to_address,subject,message,retry_count,created from email_queue where retry_count < ? and (last_try_time is null or last_try_time < ?) order by created");
+                            stmt.setLong(1, maxTryCount);
+                            stmt.setTimestamp(2, new java.sql.Timestamp(System.currentTimeMillis() - (retryInterval * 60000)));
+                            ResultSet rs = stmt.executeQuery();
 
-                            try {
-                                sendEmail(from, to, subject, message);
-                                success = true;
-                            } catch (Throwable e) {
-                                // update DB record
-                                if (stmtUpdate == null)
-                                    stmtUpdate = conn.prepareStatement("update email_queue set retry_count = ?, last_try_time = ?, error = ? where id = ?");
-                                stmtUpdate.setInt(1, retryCount + 1);
-                                stmtUpdate.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
-                                stmtUpdate.setString(3, e.toString());
-                                stmtUpdate.setLong(4, id);
-                                stmtUpdate.execute();
-                            }
+                            while (rs.next()) {
+                                String from = rs.getString("from_address");
+                                if (from == null)
+                                    from = defaultFrom;
+                                String to = rs.getString("to_address");
+                                String subject = rs.getString("subject");
+                                String message = rs.getString("message");
+                                int retryCount = rs.getInt("retry_count");
+                                long id = rs.getLong("id");
+                                boolean success = false;
 
-                            if (success) {
-                                if (stmtDelete == null)
-                                    stmtDelete = conn.prepareStatement("delete from email_queue where id = ?");
-                                stmtDelete.setLong(1, id);
-                                stmtDelete.execute();
+                                try {
+                                    sendEmail(from, to, subject, message);
+                                    success = true;
+                                } catch (Throwable e) {
+                                    // update DB record
+                                    if (stmtUpdate == null)
+                                        stmtUpdate = conn.prepareStatement("update email_queue set retry_count = ?, last_try_time = ?, error = ? where id = ?");
+                                    stmtUpdate.setInt(1, retryCount + 1);
+                                    stmtUpdate.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
+                                    stmtUpdate.setString(3, e.toString());
+                                    stmtUpdate.setLong(4, id);
+                                    stmtUpdate.execute();
+                                }
+
+                                if (success) {
+                                    if (stmtDelete == null)
+                                        stmtDelete = conn.prepareStatement("delete from email_queue where id = ?");
+                                    stmtDelete.setLong(1, id);
+                                    stmtDelete.execute();
+                                }
                             }
-                        }
 
-                        stmt.close();
+                            stmt.close();
 
-                        // cleanup expired messages
-                        stmt = conn.prepareStatement("delete from email_queue where retry_count >= ? and last_try_time < ?");
-                        stmt.setLong(1, maxTryCount);
-                        stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis() - maxAge));
-                        int messagesDeleted = stmt.executeUpdate();
-                        if (messagesDeleted > 0)
-                            getLogger().warn("Removed " + messagesDeleted + " expired unsent messages from the email queue.");
-                    } catch (SQLException e) {
-                        throw new RuntimeException("Database-related problem in emailer-thread.", e);
-                    } finally {
-                        closeStatement(stmt);
-                        closeStatement(stmtUpdate);
-                        closeStatement(stmtDelete);
-                        closeConnection(conn);
+                            // cleanup expired messages
+                            stmt = conn.prepareStatement("delete from email_queue where retry_count >= ? and last_try_time < ?");
+                            stmt.setLong(1, maxTryCount);
+                            stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis() - maxAge));
+                            int messagesDeleted = stmt.executeUpdate();
+                            if (messagesDeleted > 0)
+                                getLogger().warn("Removed " + messagesDeleted + " expired unsent messages from the email queue.");
+                        } catch (SQLException e) {
+                            throw new RuntimeException("Database-related problem in emailer-thread.", e);
+                        } finally {
+                            closeStatement(stmt);
+                            closeStatement(stmtUpdate);
+                            closeStatement(stmtDelete);
+                            closeConnection(conn);
+                        }
+                    } catch (Throwable e) {
+                        if (e instanceof InterruptedException)
+                            return;
+                        else
+                            getLogger().error("Error in the emailer thread.", e);
                     }
-                } catch (Throwable e) {
-                    getLogger().error("Error in the emailer thread.", e);
-                }
 
-                // sleeping is performed after the try-catch block, so that in case of an exception
-                // we also sleep (the remaining exceptions will probably be problems connecting
-                // to the database, in which case we better wait a bit before trying again)
-                long sleepTime = emailThreadInterval - (System.currentTimeMillis() - lastInvocationTime);
-                if (sleepTime > 0) {
-                    try {
+                    if (Thread.interrupted())
+                        return;
+                    
+                    // sleeping is performed after the try-catch block, so that in case of an exception
+                    // we also sleep (the remaining exceptions will probably be problems connecting
+                    // to the database, in which case we better wait a bit before trying again)
+                    long sleepTime = emailThreadInterval - (System.currentTimeMillis() - lastInvocationTime);
+                    if (sleepTime > 0) {
                         Thread.sleep(sleepTime);
-                    } catch (InterruptedException e) {
-                        getLogger().info("Emailer shutting down.");
-                        break bigLoop;
                     }
                 }
+            } catch (InterruptedException e) {
+                // ignore
+            } finally {
+                getLogger().info("Emailer thread ended.");
             }
         }
     }
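
Note on the emailer thread change above: the loop is now wrapped in a single outer try/catch for InterruptedException, checks Thread.interrupted() before sleeping, and logs once in a finally block. A minimal standalone sketch of that shutdown pattern follows. The class name, the startWorker helper and its parameters are hypothetical and not part of Daisy; the real CommonEmailer code differs in its details.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class InterruptibleWorkerSketch {
    /**
     * Hypothetical helper: runs 'work' roughly every intervalMillis until the
     * thread is interrupted. endedCount is incremented exactly once on exit.
     */
    static Thread startWorker(final Runnable work, final long intervalMillis,
                              final AtomicInteger endedCount) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        long start = System.currentTimeMillis();
                        try {
                            work.run();
                        } catch (Throwable e) {
                            // A failing iteration is reported but does not stop the loop.
                            System.err.println("Error in worker thread: " + e);
                        }

                        // An interrupt that arrived during the work ends the loop here.
                        if (Thread.interrupted())
                            return;

                        // Sleep after the inner try/catch, so a failed iteration also waits.
                        long sleepTime = intervalMillis - (System.currentTimeMillis() - start);
                        if (sleepTime > 0)
                            Thread.sleep(sleepTime);
                    }
                } catch (InterruptedException e) {
                    // Interrupted while sleeping: treated as a normal shutdown request.
                } finally {
                    endedCount.incrementAndGet();
                }
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger runs = new AtomicInteger();
        AtomicInteger ended = new AtomicInteger();
        Thread t = startWorker(() -> runs.incrementAndGet(), 10, ended);
        Thread.sleep(100);
        t.interrupt();   // request shutdown
        t.join(2000);    // wait for the worker to finish
        System.out.println("alive=" + t.isAlive() + ", ended=" + ended.get());
    }
}
```

Whichever path ends the loop (the explicit interrupt check or an interrupted sleep), the finally block runs once, which is why the "thread ended" log line can live there.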

Directory: /trunk/daisy/services/emailnotifier/server-impl/src/java/org/outerj/daisy/emailnotifier/serverimpl/
==============================================================================================================

File [modified]: EmailNotifierImpl.java
Delta lines: +10 -3
===================================================================
--- trunk/daisy/services/emailnotifier/server-impl/src/java/org/outerj/daisy/emailnotifier/serverimpl/EmailNotifierImpl.java	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/services/emailnotifier/server-impl/src/java/org/outerj/daisy/emailnotifier/serverimpl/EmailNotifierImpl.java	2007-03-23 18:06:30 UTC (rev 3872)
@@ -20,6 +20,7 @@
 import org.apache.avalon.framework.service.ServiceManager;
 import org.apache.avalon.framework.service.ServiceException;
 import org.apache.avalon.framework.activity.Initializable;
+import org.apache.avalon.framework.activity.Disposable;
 import org.apache.avalon.framework.configuration.Configurable;
 import org.apache.avalon.framework.configuration.Configuration;
 import org.apache.avalon.framework.configuration.ConfigurationException;
@@ -50,7 +51,7 @@
 /**
  * @avalon.component version="1.0" name="emailnotifier" lifestyle="singleton"
  */
-public class EmailNotifierImpl extends AbstractLogEnabled implements Serviceable, Initializable, Configurable {
+public class EmailNotifierImpl extends AbstractLogEnabled implements Serviceable, Initializable, Configurable, Disposable {
     private ServiceManager serviceManager;
     private Repository repository;
     private String repoUser;
@@ -118,8 +119,6 @@
     }
 
     public void initialize() throws Exception {
-        jmsClient.registerDurableTopicListener(jmsTopicName, subscriptionName, eventListener);
-
         RepositoryManager repositoryManager = (RepositoryManager)serviceManager.lookup("repository-manager");
         try {
             repository = repositoryManager.getRepository(new Credentials(repoUser, repoPassword));
@@ -147,8 +146,16 @@
         } finally {
             serviceManager.release(mbeanServer);
         }
+
+        // Start listening to the JMS events
+        jmsClient.registerDurableTopicListener(jmsTopicName, subscriptionName, eventListener);
     }
 
+    public void dispose() {
+        jmsClient.unregisterListener(eventListener);
+        serviceManager.release(jmsClient);
+    }
+
     class EventListener implements MessageListener {
         public void onMessage(Message aMessage) {
             try {
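
Note on the EmailNotifierImpl change above: the listener registration moves to the end of initialize(), and a dispose() method unregisters it again. The sketch below illustrates why that ordering matters, on the assumption that events may be delivered as soon as a listener is registered. EventSource and Component are hypothetical stand-ins, not the Daisy JmsClient or Avalon interfaces.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerLifecycleSketch {
    /** Hypothetical stand-in for a message source such as a JMS client. */
    static class EventSource {
        private final List<Runnable> listeners = new ArrayList<Runnable>();
        void register(Runnable l) { listeners.add(l); }
        void unregister(Runnable l) { listeners.remove(l); }
        void fire() {
            // Iterate over a copy so listeners may unregister while being called.
            for (Runnable l : new ArrayList<Runnable>(listeners)) l.run();
        }
    }

    /** Hypothetical component that needs 'resource' before it can handle events. */
    static class Component {
        private final EventSource source;
        private String resource;
        int handled = 0;

        private final Runnable listener = new Runnable() {
            public void run() {
                if (resource == null)
                    throw new IllegalStateException("event received before initialization finished");
                handled++;
            }
        };

        Component(EventSource source) { this.source = source; }

        void initialize() {
            resource = "ready";          // set up everything the listener depends on first
            source.register(listener);   // start listening only as the last step
        }

        void dispose() {
            source.unregister(listener); // stop receiving events on shutdown
        }
    }

    public static void main(String[] args) {
        EventSource source = new EventSource();
        Component c = new Component(source);
        source.fire();     // no listener yet, nothing happens
        c.initialize();
        source.fire();     // handled
        c.dispose();
        source.fire();     // listener removed, nothing happens
        System.out.println(c.handled); // prints 1
    }
}
```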

Directory: /trunk/daisy/repository/test/
========================================

File [modified]: myconfig.xml.template
Delta lines: +3 -0
===================================================================
--- trunk/daisy/repository/test/myconfig.xml.template	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/test/myconfig.xml.template	2007-03-23 18:06:30 UTC (rev 3872)
@@ -19,6 +19,9 @@
     <configuration>
       <!-- directory where index files should be stored -->
       <indexDirectory>@testsupport.fulltextindexstore@</indexDirectory>
+      <!-- How regularly should the full text index be flushed?
+           Updates to the fulltext index only have effect after flushing. -->
+      <indexFlushInterval>5000</indexFlushInterval>
     </configuration>
   </target>
 

File [modified]: project.xml
Delta lines: +2 -2
===================================================================
--- trunk/daisy/repository/test/project.xml	2007-03-22 15:15:48 UTC (rev 3871)
+++ trunk/daisy/repository/test/project.xml	2007-03-23 18:06:30 UTC (rev 3872)
@@ -203,12 +203,12 @@
     <dependency>
       <groupId>lucene</groupId>
       <artifactId>lucene-core</artifactId>
-      <version>2.0.0</version>
+      <version>2.1.0</version>
     </dependency>
     <dependency>
       <groupId>lucene</groupId>
       <artifactId>lucene-highlighter</artifactId>
-      <version>2.0.0</version>
+      <version>2.1.0</version>
     </dependency>
     <dependency>
       <groupId>mx4j</groupId>


