1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package fr.fastconnect.gigaspaces.datasource;
20
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.ObjectOutputStream;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.ThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.logging.Level;
41 import java.util.logging.Logger;
42
43 import com.gigaspaces.datasource.BulkDataPersister;
44 import com.gigaspaces.datasource.BulkItem;
45 import com.gigaspaces.datasource.DataIterator;
46 import com.gigaspaces.datasource.DataSourceException;
47 import com.gigaspaces.datasource.ManagedDataSource;
48
49
50
51
52
53
54 public class CompositeBulkDataPersister<T> implements BulkDataPersister, ManagedDataSource<T> {
55
56
57
58
59 private static class BulkDataPersisterExecutor implements Callable<Void> {
60
61 private final BulkDataPersister bulkDataPersister;
62 private final List<BulkItem> bulkItems;
63
64 public BulkDataPersisterExecutor(final BulkDataPersister bulkDataPersister, final List<BulkItem> bulkItems, final ExecutionFailureStrategy executionFailureStrategy) {
65 this.bulkDataPersister = bulkDataPersister;
66 this.bulkItems = bulkItems;
67 }
68
69 public Void call() throws Exception {
70 this.bulkDataPersister.executeBulk(this.bulkItems);
71 return null;
72 }
73
74 }
75
76 private static final Logger LOGGER = Logger.getLogger(CompositeBulkDataPersister.class.getName());
77
78 public static final String BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY = "delegate-classnames";
79 public static final String BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY = "delegate-classname-delimiter";
80 public static final String BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY = "executionfailurestrategy-classname";
81
82 private List<BulkDataPersister> bulkDataPersisterDelegates = null;
83 private ExecutionFailureStrategy executionFailureStrategy;
84
85 private ExecutorService executorService;
86
87 public CompositeBulkDataPersister() {
88 }
89
90 public CompositeBulkDataPersister(final BulkDataPersister[] bulkDataPersisterDelegates) {
91 this.bulkDataPersisterDelegates = Arrays.asList(bulkDataPersisterDelegates);
92 }
93
94
95
96
97 public void init(final Properties properties) throws DataSourceException {
98 if (this.bulkDataPersisterDelegates == null) {
99
100 if (properties.containsKey(BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY)) {
101 final String bulkDataPersisterClassNames = properties.getProperty(BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY);
102 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
103 CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterClassNames+"> as "+BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY);
104 }
105
106 final String bulkDataPersisterClassNamesDelimiter = properties.getProperty(BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY, ";");
107 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
108 CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterClassNamesDelimiter+"> as "+BULK_DATA_PERSISTER_CLASS_NAMES_DELIMITER_PROPERTY);
109 }
110
111
112 final String bulkDataPersisterExecutionExceptionStrategyClassName = properties.getProperty(BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY);
113 if (bulkDataPersisterExecutionExceptionStrategyClassName != null) {
114 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
115 CompositeBulkDataPersister.LOGGER.config("Using <"+bulkDataPersisterExecutionExceptionStrategyClassName+"> as "+BULK_DATA_PERSISTER_EXECUTION_EXCEPTION_STRATEGY_CLASS_NAME_PROPERTY);
116 }
117
118 try {
119 final Class<?> bulkDataPersisterExecutionExceptionStrategyClass = Class.forName(bulkDataPersisterExecutionExceptionStrategyClassName.trim());
120 try {
121 final Object bulkDataPersisterExecutionExceptionStrategy = bulkDataPersisterExecutionExceptionStrategyClass.newInstance();
122 if (bulkDataPersisterExecutionExceptionStrategy instanceof ExecutionFailureStrategy) {
123 this.executionFailureStrategy = ExecutionFailureStrategy.class.cast(bulkDataPersisterExecutionExceptionStrategy);
124 this.executionFailureStrategy.init(properties);
125 } else {
126 final String message = "<"+bulkDataPersisterExecutionExceptionStrategy+"> is not a "+BulkDataPersister.class.getSimpleName();
127 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
128 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message);
129 }
130 throw new DataSourceException(message);
131 }
132 } catch (InstantiationException e) {
133 final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClass.getName()+"> has no default constructor; skipping";
134 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
135 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
136 }
137 throw new DataSourceException(message, e);
138 } catch (IllegalAccessException e) {
139 final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClass.getName()+"> has no visible default constructor; skipping";
140 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
141 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
142 }
143 throw new DataSourceException(message, e);
144 }
145 } catch (ClassNotFoundException e) {
146 final String message = "<"+bulkDataPersisterExecutionExceptionStrategyClassName+"> cannot be found";
147 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
148 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message, e);
149 }
150 throw new DataSourceException(message, e);
151 }
152 }
153
154 this.bulkDataPersisterDelegates = new ArrayList<BulkDataPersister>(bulkDataPersisterClassNames.length());
155
156
157 for (final String bulkDataPersisterClassName : bulkDataPersisterClassNames.split(bulkDataPersisterClassNamesDelimiter)) {
158 try {
159 final Class<?> bulkDataPersisterClass = Class.forName(bulkDataPersisterClassName.trim());
160 try {
161 final Object bulkDataPersister = bulkDataPersisterClass.newInstance();
162 if (bulkDataPersister instanceof BulkDataPersister) {
163 this.bulkDataPersisterDelegates.add(BulkDataPersister.class.cast(bulkDataPersister));
164 } else {
165 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
166 CompositeBulkDataPersister.LOGGER.warning("<"+bulkDataPersister+"> is not a "+BulkDataPersister.class.getSimpleName()+"; skipping");
167 }
168 }
169 } catch (InstantiationException e) {
170 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
171 CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClass.getName()+"> has no default constructor; skipping", e);
172 }
173 } catch (IllegalAccessException e) {
174 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
175 CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClass.getName()+"> has no visible default constructor; skipping", e);
176 }
177 }
178 } catch (ClassNotFoundException e) {
179 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
180 CompositeBulkDataPersister.LOGGER.log(Level.WARNING, "<"+bulkDataPersisterClassName+"> cannot be found; skipping", e);
181 }
182 }
183
184 }
185 } else {
186 final String message = "No "+BULK_DATA_PERSISTER_CLASS_NAMES_PROPERTY+" defined";
187 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
188 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, message);
189 }
190 throw new DataSourceException(message);
191 }
192 } else {
193 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
194 CompositeBulkDataPersister.LOGGER.log(Level.CONFIG, CompositeBulkDataPersister.class.getSimpleName()+" configured manually; ignoring properties");
195 }
196 }
197
198 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.CONFIG)) {
199 CompositeBulkDataPersister.LOGGER.log(Level.CONFIG, "Delegating to <"+this.bulkDataPersisterDelegates+"> using <"+this.executionFailureStrategy+"> execution exception strategy");
200 }
201 if (this.executionFailureStrategy == null) {
202 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.WARNING)) {
203 CompositeBulkDataPersister.LOGGER.warning("No "+ExecutionFailureStrategy.class.getSimpleName()+" defined; execution exception will be ignored");
204 }
205 }
206 }
207
208
209
210
211 public DataIterator<T> initialLoad() throws DataSourceException {
212 return null;
213 }
214
215
216
217
218 public void executeBulk(final List<BulkItem> bulkItems) throws DataSourceException {
219 final ExecutorService executorService = getExecutorService();
220 final Map<Future<Void>, BulkDataPersister> bulkDataPersisterExecutions = new HashMap<Future<Void>, BulkDataPersister>(this.bulkDataPersisterDelegates.size());
221 for (final BulkDataPersister bulkDataPersister : this.bulkDataPersisterDelegates) {
222 bulkDataPersisterExecutions.put(executorService.submit(new BulkDataPersisterExecutor(bulkDataPersister, bulkItems, this.executionFailureStrategy)), bulkDataPersister);
223 }
224
225
226 for (final Map.Entry<Future<Void>, BulkDataPersister> result : bulkDataPersisterExecutions.entrySet()) {
227 try {
228
229 result.getKey().get();
230 } catch (InterruptedException e) {
231 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
232 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Thread <"+Thread.currentThread().getName()+"> has been interrupted", e);
233 }
234
235 Thread.currentThread().interrupt();
236 } catch (ExecutionException e) {
237 final BulkDataPersister bulkDataPersister = result.getValue();
238 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
239 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Execution of <"+bulkDataPersister+"> has thrown an exception", e);
240 }
241
242
243 if (e.getCause() != null && e.getCause() instanceof DataSourceException && this.executionFailureStrategy != null) {
244 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.INFO)) {
245 CompositeBulkDataPersister.LOGGER.log(Level.INFO, "Handling execution exception using <"+this.executionFailureStrategy+">");
246 }
247
248 try {
249 this.executionFailureStrategy.handleExecutionFailure(bulkDataPersister, bulkItems, DataSourceException.class.cast(e.getCause()));
250 } catch (Exception unrecoverableException) {
251 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
252 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception trying to handle execution exception", unrecoverableException);
253 }
254
255 handleUnrecoverableException(bulkDataPersister, bulkItems, unrecoverableException);
256 }
257 } else {
258 handleUnrecoverableException(bulkDataPersister, bulkItems, e);
259 }
260 }
261 }
262 }
263
264
265
266
267
268
269
270
271
272
273 protected synchronized ExecutorService getExecutorService() {
274 if (this.executorService == null) {
275 this.executorService = new ThreadPoolExecutor(this.bulkDataPersisterDelegates.size(), this.bulkDataPersisterDelegates.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
276
277 private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
278 private final AtomicInteger threadCount = new AtomicInteger(0);
279
280 public Thread newThread(final Runnable runnable) {
281 final Thread thread = this.defaultThreadFactory.newThread(runnable);
282 thread.setName(CompositeBulkDataPersister.class.getSimpleName()+"Thread-"+String.valueOf(this.threadCount.incrementAndGet()));
283 return thread;
284 }
285
286 });
287 }
288 return this.executorService;
289 }
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311 protected void handleUnrecoverableException(final BulkDataPersister bulkDataPersister, final List<BulkItem> bulkItems, final Exception exception) throws DataSourceException {
312 ObjectOutputStream objectOutputStream = null;
313 final String fileName = "unrecoverable-exception-"+bulkDataPersister.getClass().getName()+".txt";
314 try {
315 objectOutputStream = new ObjectOutputStream(new FileOutputStream(fileName, false));
316 objectOutputStream.writeObject(bulkItems);
317 } catch (IOException e) {
318 throw new DataSourceException("Got exception dealing with unrecoverable exception; propagating", exception);
319 } finally {
320 if (objectOutputStream != null) {
321 try {
322 objectOutputStream.close();
323 } catch (IOException e) {
324 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
325 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception closing <"+fileName+">", e);
326 }
327 }
328 }
329 }
330 }
331
332
333
334
335 public void shutdown() throws DataSourceException {
336
337 for (final BulkDataPersister bulkDataPersister : this.bulkDataPersisterDelegates) {
338 if (bulkDataPersister instanceof ManagedDataSource) {
339 try {
340 ManagedDataSource.class.cast(bulkDataPersister).shutdown();
341 } catch (DataSourceException e) {
342 if (CompositeBulkDataPersister.LOGGER.isLoggable(Level.SEVERE)) {
343 CompositeBulkDataPersister.LOGGER.log(Level.SEVERE, "Exception trying to shutdown <"+bulkDataPersister+">", e);
344 }
345 }
346 }
347 }
348 }
349
350 public void setExecutionFailureStrategy(final ExecutionFailureStrategy executionFailureStrategy) {
351 this.executionFailureStrategy = executionFailureStrategy;
352 }
353
354 }