Coverage Report - fr.fastconnect.gigaspaces.datasource.CompositeBulkDataPersister
 
Classes in this File Line Coverage Branch Coverage Complexity
CompositeBulkDataPersister
0%
0/138
0%
0/74
0
CompositeBulkDataPersister$1
0%
0/6
N/A
0
CompositeBulkDataPersister$BulkDataPersisterExecutor
0%
0/7
N/A
0
 
 1  
 /*
 2  
  * Copyright 2008 fastConnect.
 3  
  *    
 4  
  * This file is part of Composite BulkDataPersister.  
 5  
  *
 6  
  * This is free software; you can redistribute it and/or modify it
 7  
  * under the terms of the GNU Lesser General Public License as
 8  
  * published by the Free Software Foundation; either version 3 of
 9  
  * the License, or (at your option) any later version.
 10  
  *
 11  
  * This software is distributed in the hope that it will be useful,
 12  
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 13  
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 14  
  * Lesser General Public License for more details.
 15  
  *
 16  
  * You should have received a copy of the GNU Lesser General Public
 17  
  * License along with this software. If not, see <http://www.gnu.org/licenses/>.
 18  
  */
 19  
 package fr.fastconnect.gigaspaces.datasource;
 20  
 
 21  
 import java.io.FileOutputStream;
 22  
 import java.io.IOException;
 23  
 import java.io.ObjectOutputStream;
 24  
 import java.util.ArrayList;
 25  
 import java.util.Arrays;
 26  
 import java.util.HashMap;
 27  
 import java.util.List;
 28  
 import java.util.Map;
 29  
 import java.util.Properties;
 30  
 import java.util.concurrent.Callable;
 31  
 import java.util.concurrent.ExecutionException;
 32  
 import java.util.concurrent.ExecutorService;
 33  
 import java.util.concurrent.Executors;
 34  
 import java.util.concurrent.Future;
 35  
 import java.util.concurrent.LinkedBlockingQueue;
 36  
 import java.util.concurrent.ThreadFactory;
 37  
 import java.util.concurrent.ThreadPoolExecutor;
 38  
 import java.util.concurrent.TimeUnit;
 39  
 import java.util.concurrent.atomic.AtomicInteger;
 40  
 import java.util.logging.Level;
 41  
 import java.util.logging.Logger;
 42  
 
 43  
 import com.gigaspaces.datasource.BulkDataPersister;
 44  
 import com.gigaspaces.datasource.BulkItem;
 45  
 import com.gigaspaces.datasource.DataIterator;
 46  
 import com.gigaspaces.datasource.DataSourceException;
 47  
 import com.gigaspaces.datasource.ManagedDataSource;
 48  
 
 49  
 /**
 50  
  *Provides the capacity to use several implementation of BulkDataPersister using a Mirror.
 51  
  *
 52  
  *@param <T>
 53  
  */
 54  
 public class CompositeBulkDataPersister<T> implements BulkDataPersister, ManagedDataSource<T> {
 55  
         
 56  
         /**
 57  
          * Simple implementation of {@link Callable} calling {@link BulkDataPersister#executeBulk(List)} on provided {@link BulkDataPersister}.
 58  
          */
 59  0
         private static class BulkDataPersisterExecutor implements Callable<Void> {
 60  
 
 61  
                 private final BulkDataPersister bulkDataPersister;
 62  
                 private final List<BulkItem> bulkItems;
 63  
                 
 64  0
                 public BulkDataPersisterExecutor(final BulkDataPersister bulkDataPersister, final List<BulkItem> bulkItems, final ExecutionFailureStrategy executionFailureStrategy) {
 65  0
                         this.bulkDataPersister = bulkDataPersister;
 66  0
                         this.bulkItems = bulkItems;
 67  0
                 }
 68  
                 
 69  
                 public Void call() throws Exception {
 70  0
                         this.bulkDataPersister.executeBulk(this.bulkItems);
 71  0
                         return null;
 72  
                 }
 73  
                 
 74  
         }
 75  
 
 76  0
         private static final Logger LOGGER = Logger.getLogger(CompositeBulkDataPersister.class.getName());
 77  
         
 78  
         public static final String BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY = "delegate-classnames";
 79  
         public static final String BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY = "delegate-classname-delimiter";
 80  
         public static final String BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY = "executionfailurestrategy-classname";
 81  
         
 82  0
         private List<BulkDataPersister> bulkDataPersisterDelegates = null;
 83  
         private ExecutionFailureStrategy executionFailureStrategy;
 84  
         
 85  
         private ExecutorService executorService;
 86  
         
 87  0
         public CompositeBulkDataPersister() {
 88  0
         }
 89  
         
 90  0
         public CompositeBulkDataPersister(final BulkDataPersister[] bulkDataPersisterDelegates) {
 91  0
                 this.bulkDataPersisterDelegates = Arrays.asList(bulkDataPersisterDelegates);
 92  0
         }
 93  
         
 94  
         /** 
 95  
          * {@inheritDoc}
 96  
          */
 97  
         public void init(final Properties properties) throws DataSourceException {
 98  0
                 if (this.bulkDataPersisterDelegates == null) {
 99  
                         //get configured BulkDataPersister class names
 100  0
                         if (properties.containsKey(BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY)) {
 101  0
                                 final String bulkDataPersisterClassNames = properties.getProperty(BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY);
 102  0
                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
 103  0
                                         CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterClassNames+"> as "+BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY);
 104  
                                 }
 105  
                                 
 106  0
                                 final String bulkDataPersisterClassNamesDelimiter = properties.getProperty(BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY, ";");
 107  0
                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
 108  0
                                         CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterClassNamesDelimiter+"> as "+BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY);
 109  
                                 }
 110  
                                 
 111  
                                 //try to instantiate ExecutionFailureStrategy
 112  0
                                 final String bulkDataPersisterExecutionExceptionStrategyClassName = properties.getProperty(BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY);
 113  0
                                 if (bulkDataPersisterExecutionExceptionStrategyClassName != null) {
 114  0
                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
 115  0
                                                 CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterExecutionExceptionStrategyClassName+"> as "+BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY);
 116  
                                         }
 117  
                                         
 118  
                                         try {
 119  0
                                                 final Class<?> bulkDataPersisterExecutionExceptionStrategyClass = Class.forName(bulkDataPersisterExecutionExceptionStrategyClassName.trim());
 120  
                                                 try {
 121  0
                                                         final Object bulkDataPersisterExecutionExceptionStrategy = bulkDataPersisterExecutionExceptionStrategyClass.newInstance();
 122  0
                                                         if (bulkDataPersisterExecutionExceptionStrategy instanceof ExecutionFailureStrategy) {
 123  0
                                                                 this.executionFailureStrategy = ExecutionFailureStrategy.class.cast(bulkDataPersisterExecutionExceptionStrategy);
 124  0
                                                                 this.executionFailureStrategy.init(properties);
 125  0
                                                         } else {
 126  0
                                                                 final String message = "<"+bulkDataPersisterExecutionExceptionStrategy+"> is not a "+BulkDataPersister.class.getSimpleName();
 127  0
                                                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 128  0
                                                                         CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message);
 129  
                                                                 }
 130  0
                                                                 throw new DataSourceException(message);
 131  
                                                         }
 132  0
                                                 } catch (InstantiationException e) {
 133  0
                                                         final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClass.getName()+"> has no default constructor; skipping";
 134  0
                                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 135  0
                                                                 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
 136  
                                                         }
 137  0
                                                         throw new DataSourceException(message, e);
 138  0
                                                 } catch (IllegalAccessException e) {
 139  0
                                                         final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClass.getName()+"> has no visible default constructor; skipping";
 140  0
                                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 141  0
                                                                 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
 142  
                                                         }
 143  0
                                                         throw new DataSourceException(message, e);
 144  0
                                                 }
 145  0
                                         } catch (ClassNotFoundException e) {
 146  0
                                                 final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClassName+"> cannot be found";
 147  0
                                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 148  0
                                                         CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
 149  
                                                 }
 150  0
                                                 throw new DataSourceException(message, e);
 151  0
                                         }
 152  
                                 }
 153  
                                 
 154  0
                                 this.bulkDataPersisterDelegates = new ArrayList<BulkDataPersister>(bulkDataPersisterClassNames.length());
 155  
                                 
 156  
                                 //try to instantiate all defined BulkDataPersister classes
 157  0
                                 for (final String bulkDataPersisterClassName : bulkDataPersisterClassNames.split(bulkDataPersisterClassNamesDelimiter)) {
 158  
                                         try {
 159  0
                                                 final Class<?> bulkDataPersisterClass = Class.forName(bulkDataPersisterClassName.trim());
 160  
                                                 try {
 161  0
                                                         final Object bulkDataPersister = bulkDataPersisterClass.newInstance();
 162  0
                                                         if (bulkDataPersister instanceof BulkDataPersister) {
 163  0
                                                                 this.bulkDataPersisterDelegates.add(BulkDataPersister.class.cast(bulkDataPersister));
 164  0
                                                         } else {
 165  0
                                                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
 166  0
                                                                         CompositeBulkDataPersister.LOGGER.warning("<"+bulkDataPersister+"> is not a "+BulkDataPersister.class.getSimpleName()+"; skipping");
 167  
                                                                 }
 168  
                                                         }
 169  0
                                                 } catch (InstantiationException e) {
 170  0
                                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
 171  0
                                                                 CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClass.getName()+"> has no default constructor; skipping", e);
 172  
                                                         }
 173  0
                                                 } catch (IllegalAccessException e) {
 174  0
                                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
 175  0
                                                                 CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClass.getName()+"> has no visible default constructor; skipping", e);
 176  
                                                         }
 177  0
                                                 }
 178  0
                                         } catch (ClassNotFoundException e) {
 179  0
                                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
 180  0
                                                         CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClassName+"> cannot be found; skipping", e);
 181  
                                                 }
 182  0
                                         }
 183  
                                         
 184  
                                 }
 185  0
                         } else {
 186  0
                                 final String message = "No "+BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY+" defined";
 187  0
                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 188  0
                                         CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message);
 189  
                                 }
 190  0
                                 throw new DataSourceException(message);
 191  
                         }
 192  
                 } else {
 193  0
                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
 194  0
                                 CompositeBulkDataPersister.LOGGER.log(Level.CONFIG, CompositeBulkDataPersister.class.getSimpleName()+" configured manually; ignoring properties");
 195  
                         }
 196  
                 }
 197  
                 
 198  0
                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
 199  0
                         CompositeBulkDataPersister.LOGGER.log(Level.CONFIG, "Delegating to <"+this.bulkDataPersisterDelegates+"> using <"+this.executionFailureStrategy+"> execution exception strategy");
 200  
                 }
 201  0
                 if (this.executionFailureStrategy == null) {
 202  0
                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
 203  0
                                 CompositeBulkDataPersister.LOGGER.warning("No "+ExecutionFailureStrategy.class.getSimpleName()+" defined; execution exception will be ignored");
 204  
                         }
 205  
                 }
 206  0
         }
 207  
         
 208  
         /** 
 209  
          * {@inheritDoc}
 210  
          */
 211  
         public DataIterator<T> initialLoad() throws DataSourceException {
 212  0
                 return null;
 213  
         }
 214  
         
 215  
         /** 
 216  
          * {@inheritDoc}
 217  
          */
 218  
         public void executeBulk(final List<BulkItem> bulkItems) throws DataSourceException {                
 219  0
                 final ExecutorService executorService = getExecutorService();
 220  0
                 final Map<Future<Void>, BulkDataPersister> bulkDataPersisterExecutions = new HashMap<Future<Void>, BulkDataPersister>(this.bulkDataPersisterDelegates.size());
 221  0
                 for (final BulkDataPersister bulkDataPersister : this.bulkDataPersisterDelegates) {
 222  0
                         bulkDataPersisterExecutions.put(executorService.submit(new BulkDataPersisterExecutor(bulkDataPersister, bulkItems, this.executionFailureStrategy)), bulkDataPersister);
 223  0
                 }
 224  
                 
 225  
                 //wait for end of execution of all threads
 226  0
                 for (final Map.Entry<Future<Void>, BulkDataPersister> result : bulkDataPersisterExecutions.entrySet()) {
 227  
                         try {
 228  
                                 //TODO do we have to handle a timeout here? 
 229  0
                                 result.getKey().get();
 230  0
                         } catch (InterruptedException e) {
 231  0
                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 232  0
                                         CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Thread <"+Thread.currentThread().getName()+"> has been interrupted", e);
 233  
                                 }
 234  
                                 
 235  0
                                 Thread.currentThread().interrupt();
 236  0
                         } catch (ExecutionException e) {
 237  0
                                 final BulkDataPersister bulkDataPersister = result.getValue();
 238  0
                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 239  0
                                         CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Execution of <"+bulkDataPersister+"> has thrown an exception", e);
 240  
                                 }
 241  
 
 242  
                                 //if exception thrown is a DataSourceException and we have an executionFailureStrategy defined then delegate handling to it
 243  0
                                 if (e.getCause() != null && e.getCause() instanceof DataSourceException && this.executionFailureStrategy != null) {
 244  0
                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.INFO)) {
 245  0
                                                 CompositeBulkDataPersister.LOGGER.log(Level.INFO, "Handling execution exception using <"+this.executionFailureStrategy+">");
 246  
                                         }
 247  
                                         
 248  
                                         try {
 249  0
                                                 this.executionFailureStrategy.handleExecutionFailure(bulkDataPersister, bulkItems, DataSourceException.class.cast(e.getCause()));
 250  0
                                         } catch (Exception unrecoverableException) {
 251  0
                                                 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 252  0
                                                         CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception trying to handle execution exception", unrecoverableException);
 253  
                                                 }
 254  
                                                 
 255  0
                                                 handleUnrecoverableException(bulkDataPersister, bulkItems, unrecoverableException);
 256  0
                                         }
 257  0
                                 } else {
 258  0
                                         handleUnrecoverableException(bulkDataPersister, bulkItems, e);
 259  
                                 }
 260  0
                         }
 261  0
                 }
 262  0
         }
 263  
         
 264  
         /**
 265  
          * Constructs the {@link ExecutorService} implementation used to execute concurrently all {@link BulkDataPersister} delegates.
 266  
          * Called each time {@link CompositeBulkDataPersister#executeBulk(List)} is called.
 267  
          * <br />
 268  
          * <br />
 269  
          * Default implementation returns a {@link ThreadPoolExecutor} using one named thread (CompositeBulkDataPersisterThread-X) per {@link BulkDataPersister} delegate.
 270  
          *  
 271  
          * @return the executorService
 272  
          */
 273  
         protected synchronized ExecutorService getExecutorService() {
 274  0
                 if (this.executorService == null) {
 275  0
                         this.executorService = new ThreadPoolExecutor(this.bulkDataPersisterDelegates.size(), this.bulkDataPersisterDelegates.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
 276  
 
 277  0
                                 private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
 278  0
                                 private final AtomicInteger threadCount = new AtomicInteger(0);
 279  
                                 
 280  0
                                 public Thread newThread(final Runnable runnable) {
 281  0
                                         final Thread thread = this.defaultThreadFactory.newThread(runnable);
 282  0
                                         thread.setName(CompositeBulkDataPersister.class.getSimpleName()+"Thread-"+String.valueOf(this.threadCount.incrementAndGet()));
 283  0
                                         return thread;
 284  
                                 }
 285  
                                 
 286  
                         });
 287  
                 }
 288  0
                 return this.executorService;
 289  
         }
 290  
         
 291  
         /**
 292  
          * Last chance to handle an exception thrown by {@link BulkDataPersister#executeBulk(List)}.
 293  
          * <br />
 294  
          * <br />
 295  
          * Is called when:
 296  
          * -exception thrown is a {@link DataSourceException} and provided execution exception strategy throws an exception
 297  
          * -exception thrown is a {@link DataSourceException} and there is no provided execution exception strategy
 298  
          * -exception thrown is not a {@link DataSourceException}
 299  
          * <br />
 300  
          * Is NOT called when:
 301  
          * -exception thrown is a {@link DataSourceException} and provided execution exception strategy DOES NOT throw an exception
 302  
          * <br />
 303  
          * <br />
 304  
          * Default implementation use a {@link ObjectOutputStream} to dump bulkItems.
 305  
          * 
 306  
          * @param bulkDataPersister the persister used
 307  
          * @param bulkItems the information received
 308  
          * @param exception the exception generated by {@link BulkDataPersister#executeBulk(List)}
 309  
          * @throws DataSourceException if thrown propagated to mirror which triggers redo-log mechanism
 310  
          */
 311  
         protected void handleUnrecoverableException(final BulkDataPersister bulkDataPersister, final List<BulkItem> bulkItems, final Exception exception) throws DataSourceException {
 312  0
                 ObjectOutputStream objectOutputStream = null;
 313  0
                 final String fileName = "unrecoverable-exception-"+bulkDataPersister.getClass().getName()+".txt";
 314  
                 try {
 315  0
                         objectOutputStream = new ObjectOutputStream(new FileOutputStream(fileName, false));
 316  0
                         objectOutputStream.writeObject(bulkItems);
 317  0
                 } catch (IOException e) {
 318  0
                         throw new DataSourceException("Got exception dealing with unrecoverable exception; propagating", exception);
 319  
                 } finally {
 320  0
                         if (objectOutputStream != null) {
 321  
                                 try {
 322  0
                                         objectOutputStream.close();
 323  0
                                 } catch (IOException e) {
 324  0
                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 325  0
                                                 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception closing <"+fileName+">", e);
 326  
                                         }
 327  0
                                 }
 328  0
                         }
 329  0
                 }
 330  0
         }
 331  
 
 332  
         /** 
 333  
          * {@inheritDoc}
 334  
          */
 335  
         public void shutdown() throws DataSourceException {
 336  
                 //if one of provided BulkDataPersister is a ManagedDataSource safely shutdown it
 337  0
                 for (final BulkDataPersister bulkDataPersister : this.bulkDataPersisterDelegates) {
 338  0
                         if (bulkDataPersister instanceof ManagedDataSource) {
 339  
                                 try {
 340  0
                                         ManagedDataSource.class.cast(bulkDataPersister).shutdown();
 341  0
                                 } catch (DataSourceException e) {
 342  0
                                         if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
 343  0
                                                 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception trying to shutdown <"+bulkDataPersister+">", e);
 344  
                                         }
 345  0
                                 }
 346  
                         }
 347  0
                 }
 348  0
         }
 349  
         
 350  
         public void setExecutionFailureStrategy(final ExecutionFailureStrategy executionFailureStrategy) {
 351  0
                 this.executionFailureStrategy = executionFailureStrategy;
 352  0
         }
 353  
 
 354  
 }