View Javadoc

1   /*
2    * Copyright 2008 fastConnect.
3    *    
4    * This file is part of Composite BulkDataPersister.  
5    *
6    * This is free software; you can redistribute it and/or modify it
7    * under the terms of the GNU Lesser General Public License as
8    * published by the Free Software Foundation; either version 3 of
9    * the License, or (at your option) any later version.
10   *
11   * This software is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this software. If not, see <http://www.gnu.org/licenses/>.
18   */
19  package fr.fastconnect.gigaspaces.datasource;
20  
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.ObjectOutputStream;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Properties;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.logging.Level;
41  import java.util.logging.Logger;
42  
43  import com.gigaspaces.datasource.BulkDataPersister;
44  import com.gigaspaces.datasource.BulkItem;
45  import com.gigaspaces.datasource.DataIterator;
46  import com.gigaspaces.datasource.DataSourceException;
47  import com.gigaspaces.datasource.ManagedDataSource;
48  
49  /**
50   *Provides the capacity to use several implementation of BulkDataPersister using a Mirror.
51   *
52   *@param <T>
53   */
54  public class CompositeBulkDataPersister<T> implements BulkDataPersister, ManagedDataSource<T> {
55  	
56  	/**
57  	 * Simple implementation of {@link Callable} calling {@link BulkDataPersister#executeBulk(List)} on provided {@link BulkDataPersister}.
58  	 */
59  	private static class BulkDataPersisterExecutor implements Callable<Void> {
60  
61  		private final BulkDataPersister bulkDataPersister;
62  		private final List<BulkItem> bulkItems;
63  		
64  		public BulkDataPersisterExecutor(final BulkDataPersister bulkDataPersister, final List<BulkItem> bulkItems, final ExecutionFailureStrategy executionFailureStrategy) {
65  			this.bulkDataPersister = bulkDataPersister;
66  			this.bulkItems = bulkItems;
67  		}
68  		
69  		public Void call() throws Exception {
70  			this.bulkDataPersister.executeBulk(this.bulkItems);
71  			return null;
72  		}
73  		
74  	}
75  
76  	private static final Logger LOGGER = Logger.getLogger(CompositeBulkDataPersister.class.getName());
77  	
78  	public static final String BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY = "delegate-classnames";
79  	public static final String BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY = "delegate-classname-delimiter";
80  	public static final String BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY = "executionfailurestrategy-classname";
81  	
82  	private List<BulkDataPersister> bulkDataPersisterDelegates = null;
83  	private ExecutionFailureStrategy executionFailureStrategy;
84  	
85  	private ExecutorService executorService;
86  	
87  	public CompositeBulkDataPersister() {
88  	}
89  	
90  	public CompositeBulkDataPersister(final BulkDataPersister[] bulkDataPersisterDelegates) {
91  		this.bulkDataPersisterDelegates = Arrays.asList(bulkDataPersisterDelegates);
92  	}
93  	
94  	/** 
95  	 * {@inheritDoc}
96  	 */
97  	public void init(final Properties properties) throws DataSourceException {
98  		if (this.bulkDataPersisterDelegates == null) {
99  			//get configured BulkDataPersister class names
100 			if (properties.containsKey(BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY)) {
101 				final String bulkDataPersisterClassNames = properties.getProperty(BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY);
102 				if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
103 					CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterClassNames+"> as "+BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY);
104 				}
105 				
106 				final String bulkDataPersisterClassNamesDelimiter = properties.getProperty(BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY, ";");
107 				if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
108 					CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterClassNamesDelimiter+"> as "+BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY);
109 				}
110 				
111 				//try to instantiate ExecutionFailureStrategy
112 				final String bulkDataPersisterExecutionExceptionStrategyClassName = properties.getProperty(BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY);
113 				if (bulkDataPersisterExecutionExceptionStrategyClassName != null) {
114 					if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
115 						CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterExecutionExceptionStrategyClassName+"> as "+BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY);
116 					}
117 					
118 					try {
119 						final Class<?> bulkDataPersisterExecutionExceptionStrategyClass = Class.forName(bulkDataPersisterExecutionExceptionStrategyClassName.trim());
120 						try {
121 							final Object bulkDataPersisterExecutionExceptionStrategy = bulkDataPersisterExecutionExceptionStrategyClass.newInstance();
122 							if (bulkDataPersisterExecutionExceptionStrategy instanceof ExecutionFailureStrategy) {
123 								this.executionFailureStrategy = ExecutionFailureStrategy.class.cast(bulkDataPersisterExecutionExceptionStrategy);
124 								this.executionFailureStrategy.init(properties);
125 							} else {
126 								final String message = "<"+bulkDataPersisterExecutionExceptionStrategy+"> is not a "+BulkDataPersister.class.getSimpleName();
127 								if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
128 									CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message);
129 								}
130 								throw new DataSourceException(message);
131 							}
132 						} catch (InstantiationException e) {
133 							final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClass.getName()+"> has no default constructor; skipping";
134 							if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
135 								CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
136 							}
137 							throw new DataSourceException(message, e);
138 						} catch (IllegalAccessException e) {
139 							final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClass.getName()+"> has no visible default constructor; skipping";
140 							if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
141 								CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
142 							}
143 							throw new DataSourceException(message, e);
144 						}
145 					} catch (ClassNotFoundException e) {
146 						final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClassName+"> cannot be found";
147 						if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
148 							CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
149 						}
150 						throw new DataSourceException(message, e);
151 					}
152 				}
153 				
154 				this.bulkDataPersisterDelegates = new ArrayList<BulkDataPersister>(bulkDataPersisterClassNames.length());
155 				
156 				//try to instantiate all defined BulkDataPersister classes
157 				for (final String bulkDataPersisterClassName : bulkDataPersisterClassNames.split(bulkDataPersisterClassNamesDelimiter)) {
158 					try {
159 						final Class<?> bulkDataPersisterClass = Class.forName(bulkDataPersisterClassName.trim());
160 						try {
161 							final Object bulkDataPersister = bulkDataPersisterClass.newInstance();
162 							if (bulkDataPersister instanceof BulkDataPersister) {
163 								this.bulkDataPersisterDelegates.add(BulkDataPersister.class.cast(bulkDataPersister));
164 							} else {
165 								if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
166 									CompositeBulkDataPersister.LOGGER.warning("<"+bulkDataPersister+"> is not a "+BulkDataPersister.class.getSimpleName()+"; skipping");
167 								}
168 							}
169 						} catch (InstantiationException e) {
170 							if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
171 								CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClass.getName()+"> has no default constructor; skipping", e);
172 							}
173 						} catch (IllegalAccessException e) {
174 							if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
175 								CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClass.getName()+"> has no visible default constructor; skipping", e);
176 							}
177 						}
178 					} catch (ClassNotFoundException e) {
179 						if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
180 							CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClassName+"> cannot be found; skipping", e);
181 						}
182 					}
183 					
184 				}
185 			} else {
186 				final String message = "No "+BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY+" defined";
187 				if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
188 					CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message);
189 				}
190 				throw new DataSourceException(message);
191 			}
192 		} else {
193 			if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
194 				CompositeBulkDataPersister.LOGGER.log(Level.CONFIG, CompositeBulkDataPersister.class.getSimpleName()+" configured manually; ignoring properties");
195 			}
196 		}
197 		
198 		if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
199 			CompositeBulkDataPersister.LOGGER.log(Level.CONFIG, "Delegating to <"+this.bulkDataPersisterDelegates+"> using <"+this.executionFailureStrategy+"> execution exception strategy");
200 		}
201 		if (this.executionFailureStrategy == null) {
202 			if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
203 				CompositeBulkDataPersister.LOGGER.warning("No "+ExecutionFailureStrategy.class.getSimpleName()+" defined; execution exception will be ignored");
204 			}
205 		}
206 	}
207 	
208 	/** 
209 	 * {@inheritDoc}
210 	 */
211 	public DataIterator<T> initialLoad() throws DataSourceException {
212 		return null;
213 	}
214 	
215 	/** 
216 	 * {@inheritDoc}
217 	 */
218 	public void executeBulk(final List<BulkItem> bulkItems) throws DataSourceException {		
219 		final ExecutorService executorService = getExecutorService();
220 		final Map<Future<Void>, BulkDataPersister> bulkDataPersisterExecutions = new HashMap<Future<Void>, BulkDataPersister>(this.bulkDataPersisterDelegates.size());
221 		for (final BulkDataPersister bulkDataPersister : this.bulkDataPersisterDelegates) {
222 			bulkDataPersisterExecutions.put(executorService.submit(new BulkDataPersisterExecutor(bulkDataPersister, bulkItems, this.executionFailureStrategy)), bulkDataPersister);
223 		}
224 		
225 		//wait for end of execution of all threads
226 		for (final Map.Entry<Future<Void>, BulkDataPersister> result : bulkDataPersisterExecutions.entrySet()) {
227 			try {
228 				//TODO do we have to handle a timeout here? 
229 				result.getKey().get();
230 			} catch (InterruptedException e) {
231 				if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
232 					CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Thread <"+Thread.currentThread().getName()+"> has been interrupted", e);
233 				}
234 				
235 				Thread.currentThread().interrupt();
236 			} catch (ExecutionException e) {
237 				final BulkDataPersister bulkDataPersister = result.getValue();
238 				if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
239 					CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Execution of <"+bulkDataPersister+"> has thrown an exception", e);
240 				}
241 
242 				//if exception thrown is a DataSourceException and we have an executionFailureStrategy defined then delegate handling to it
243 				if (e.getCause() != null && e.getCause() instanceof DataSourceException && this.executionFailureStrategy != null) {
244 					if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.INFO)) {
245 						CompositeBulkDataPersister.LOGGER.log(Level.INFO, "Handling execution exception using <"+this.executionFailureStrategy+">");
246 					}
247 					
248 					try {
249 						this.executionFailureStrategy.handleExecutionFailure(bulkDataPersister, bulkItems, DataSourceException.class.cast(e.getCause()));
250 					} catch (Exception unrecoverableException) {
251 						if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
252 							CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception trying to handle execution exception", unrecoverableException);
253 						}
254 						
255 						handleUnrecoverableException(bulkDataPersister, bulkItems, unrecoverableException);
256 					}
257 				} else {
258 					handleUnrecoverableException(bulkDataPersister, bulkItems, e);
259 				}
260 			}
261 		}
262 	}
263 	
264 	/**
265 	 * Constructs the {@link ExecutorService} implementation used to execute concurrently all {@link BulkDataPersister} delegates.
266 	 * Called each time {@link CompositeBulkDataPersister#executeBulk(List)} is called.
267 	 * <br />
268 	 * <br />
269 	 * Default implementation returns a {@link ThreadPoolExecutor} using one named thread (CompositeBulkDataPersisterThread-X) per {@link BulkDataPersister} delegate.
270 	 *  
271 	 * @return the executorService
272 	 */
273 	protected synchronized ExecutorService getExecutorService() {
274 		if (this.executorService == null) {
275 			this.executorService = new ThreadPoolExecutor(this.bulkDataPersisterDelegates.size(), this.bulkDataPersisterDelegates.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
276 
277 				private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
278 				private final AtomicInteger threadCount = new AtomicInteger(0);
279 				
280 				public Thread newThread(final Runnable runnable) {
281 					final Thread thread = this.defaultThreadFactory.newThread(runnable);
282 					thread.setName(CompositeBulkDataPersister.class.getSimpleName()+"Thread-"+String.valueOf(this.threadCount.incrementAndGet()));
283 					return thread;
284 				}
285 				
286 			});
287 		}
288 		return this.executorService;
289 	}
290 	
291 	/**
292 	 * Last chance to handle an exception thrown by {@link BulkDataPersister#executeBulk(List)}.
293 	 * <br />
294 	 * <br />
295 	 * Is called when:
296 	 * -exception thrown is a {@link DataSourceException} and provided execution exception strategy throws an exception
297 	 * -exception thrown is a {@link DataSourceException} and there is no provided execution exception strategy
298 	 * -exception thrown is not a {@link DataSourceException}
299 	 * <br />
300 	 * Is NOT called when:
301 	 * -exception thrown is a {@link DataSourceException} and provided execution exception strategy DOES NOT throw an exception
302 	 * <br />
303 	 * <br />
304 	 * Default implementation use a {@link ObjectOutputStream} to dump bulkItems.
305 	 * 
306 	 * @param bulkDataPersister the persister used
307 	 * @param bulkItems the information received
308 	 * @param exception the exception generated by {@link BulkDataPersister#executeBulk(List)}
309 	 * @throws DataSourceException if thrown propagated to mirror which triggers redo-log mechanism
310 	 */
311 	protected void handleUnrecoverableException(final BulkDataPersister bulkDataPersister, final List<BulkItem> bulkItems, final Exception exception) throws DataSourceException {
312 		ObjectOutputStream objectOutputStream = null;
313 		final String fileName = "unrecoverable-exception-"+bulkDataPersister.getClass().getName()+".txt";
314 		try {
315 			objectOutputStream = new ObjectOutputStream(new FileOutputStream(fileName, false));
316 			objectOutputStream.writeObject(bulkItems);
317 		} catch (IOException e) {
318 			throw new DataSourceException("Got exception dealing with unrecoverable exception; propagating", exception);
319 		} finally {
320 			if (objectOutputStream != null) {
321 				try {
322 					objectOutputStream.close();
323 				} catch (IOException e) {
324 					if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
325 						CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception closing <"+fileName+">", e);
326 					}
327 				}
328 			}
329 		}
330 	}
331 
332 	/** 
333 	 * {@inheritDoc}
334 	 */
335 	public void shutdown() throws DataSourceException {
336 		//if one of provided BulkDataPersister is a ManagedDataSource safely shutdown it
337 		for (final BulkDataPersister bulkDataPersister : this.bulkDataPersisterDelegates) {
338 			if (bulkDataPersister instanceof ManagedDataSource) {
339 				try {
340 					ManagedDataSource.class.cast(bulkDataPersister).shutdown();
341 				} catch (DataSourceException e) {
342 					if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
343 						CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception trying to shutdown <"+bulkDataPersister+">", e);
344 					}
345 				}
346 			}
347 		}
348 	}
349 	
350 	public void setExecutionFailureStrategy(final ExecutionFailureStrategy executionFailureStrategy) {
351 		this.executionFailureStrategy = executionFailureStrategy;
352 	}
353 
354 }