-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathlevel4.java
641 lines (541 loc) · 19.2 KB
/
level4.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
/*
* level4.java
*
* Copyright (c) 1999, 2017, Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Universal Permissive License v 1.0 as shown
* at http://oss.oracle.com/licenses/upl
*
* Do some simple operations on a TimesTen database
* using the TimesTen JDBC driver (JDK 1.4 and newer)
* 1. Using multiple threads each with a connection to the database
* 2. Process all orders (one thread per order) in INPUTFILE3
* a. Inserts will be done into the ORDERS and ORDER_ITEM tables and
* b. Updates will be done to the INVENTORY table.
* 3. Disconnect from the data store
*
* Prerequisite:
* Run the following command every time before running this demo:
* ttIsql -f input0.dat
*/
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.*;
import javax.sql.*;
import com.timesten.jdbc.TimesTenDataSource;
public class level4
{
// Private static final defines
/**
* Number of worker threads, and of database connections, that
* runexample() creates. Orders are processed in batches of this size.
*/
private static final int NUM_THREADS = 2;
/**
* URL prefix for a TimesTen direct connection. runexample() appends
* the connection string to it and passes the result to
* TimesTenDataSource.setUrl().
*/
private static final String directURLPrefix = "jdbc:timesten:direct:";
/**
* URL prefix for a TimesTen client connection. runexample() appends
* the connection string to it and passes the result to
* TimesTenDataSource.setUrl().
*/
private static final String clientURLPrefix = "jdbc:timesten:client:";
/**
* Name of the input file containing the orders to process.
* The path is relative, so it is resolved against the current
* working directory of the JVM.
*/
private static final String INPUTFILE = "datfiles/input3.dat";
/**
* Canned SQL statements. Each one is prepared once per connection
* in runexample() and executed by ProcessOrderThread.
*/
/*
* Insert a new order into the ORDERS table. The order id is taken from
* orderID.nextval; the placeholders are bound, in order, to
* cust_num, when_placed, when_shipped and notes.
*/
private static final String INSERTORDERSTMT = "insert into orders values (orderID.nextval,?,?,?,?)";
/*
* Insert one ordered product into the ORDER_ITEM table. The order id is
* taken from orderID.currval; the placeholders are prod_num and quantity.
*/
private static final String INSERTOITEMSTMT = "insert into order_item values (orderID.currval,?,?)";
/*
* Check inventory: read the quantity for one prod_num. The statement
* ends in "for update".
*/
private static final String SELECTINVSTMT = "select quantity from inventory where prod_num = ? for update";
/*
* Update inventory. First placeholder: the amount subtracted from
* quantity. Second placeholder: the prod_num of the row to update.
*/
private static final String UPDATEINVSTMT = "update inventory set quantity = (quantity - ?) where prod_num = ?";
// Private static variables
/**
* Streams used for all program output: outStream for progress
* messages, errStream for errors and warnings.
*/
private static PrintStream errStream = System.err;
private static PrintStream outStream = System.out;
/**
* Variables for passing values from options parsing.
* NOTE(review): nothing in the code visible here reads these fields;
* main() takes the parsed options from the IOLibrary instance
* (myLib.opt_*) instead. Confirm they are unused before removing them.
*/
private static String opt_connstr = "";
private static boolean opt_doClient = false;
private static boolean opt_doTrace = false;
/**
 * Parameter block shared between runexample() and one worker thread.
 * The spawning thread fills in the connection, the prepared statements,
 * the thread id and the order line; the worker stores its result code
 * in returnValue before it terminates.
 *
 * @see ProcessOrderThread
 */
class ThreadParameter
{
    // JDBC resources reserved for the worker that uses this slot
    Connection con;
    PreparedStatement pInsOrd;   // INSERTORDERSTMT
    PreparedStatement pInsItm;   // INSERTOITEMSTMT
    PreparedStatement pUpdInv;   // UPDATEINVSTMT
    PreparedStatement pSelInv;   // SELECTINVSTMT

    // Input: index of the slot and the order line to process
    int tid;
    String orderLine;

    // Output: result code written by the worker
    int returnValue;
}
// package static variables
/**
 * Flag indicating whether the shutdown hook thread should wait
 * and hold up the JVM exit so that the running example can
 * implement a clean exit first.
 * This flag is set only while the example is being run.
 * It is read and written only inside synchronized (stopMonitor) blocks.
 */
static boolean shouldWait = false;
/**
 * A synchronization object used by the shutdown hook thread to
 * wait for an orderly database disconnect before exiting.
 * Created in main() before the shutdown hook is registered.
 */
static Object stopMonitor;
/**
 * Flag indicating whether a "signal" has been received.
 * This flag is set by our shutdown hook thread and polled, without
 * holding any lock, by the main thread and the worker threads.
 * It is declared volatile so that those unsynchronized readers are
 * guaranteed to observe the write made by the shutdown hook.
 */
static volatile boolean hasReceivedSignal = false;
// Instance variables for the instantiated level4 object
/**
* JDBC connection URL. Starts out empty; runexample() sets it to the
* direct or client URL prefix followed by the connection string.
*/
private String URL = "";
/**
 * main: Start of application.
 *
 * 1. Parse the arguments.
 * 2. Register a JVM shutdown hook that holds up the exit until the
 *    example has finished cleaning up.
 * 3. Instantiate and run the level4 example.
 *
 * The process exit status is the value returned by runexample(), or 1
 * when the options cannot be parsed.
 *
 * @param args command-line arguments, passed to IOLibrary.parseOpts()
 */
public static void main(String[] args)
{
    int retcode = 1; // Return code: Assume failure

    // Parse options
    IOLibrary myLib = new IOLibrary(errStream);
    String className = "level4";
    String usageString = myLib.getUsageString(className);
    if (myLib.parseOpts(args, usageString) == false)
    {
        System.exit(retcode);
    }

    // Add a thread to the JVM shutdown hook to cleanly exit the
    // application in the event of a JVM shutdown
    stopMonitor = new Object();
    Runtime.getRuntime().addShutdownHook(new Thread("Shutdown thread")
    {
        public void run() {
            synchronized (stopMonitor) {
                if (shouldWait) {
                    // Tell the example that a stop was requested ...
                    hasReceivedSignal = true;
                    // ... and wait until it reports a clean exit.
                    // The condition is re-tested after every wakeup
                    // because Object.wait() may return spuriously.
                    while (shouldWait) {
                        try {
                            stopMonitor.wait(); // Might want have time limit.
                        } catch (InterruptedException e) {
                            // Interrupted: stop holding up the shutdown
                            e.printStackTrace();
                            break;
                        }
                    }
                }
            }
        }
    });

    // Construct the level4 object
    level4 lvl4 = new level4();

    try {
        // Tell the shutdown hook to wait for a clean exit
        synchronized (stopMonitor) {
            shouldWait = true;
        }

        // Run the level4 example.
        retcode = lvl4.runexample(myLib.opt_doTrace, myLib.opt_doClient, myLib.opt_connstr);
    } finally {
        // Unblock the shutdown hook
        synchronized (stopMonitor) {
            shouldWait = false;
            stopMonitor.notifyAll();
        }
    }

    // exit program
    System.exit(retcode);
}
/**
* Constructor for class level4.
* Performs no initialization; the connection URL is built later,
* in runexample().
*/
public level4()
{
}
/**
 * Run the level4 example.
 *
 * Opens one connection for InitializeDatabase plus NUM_THREADS worker
 * connections, prepares the four canned statements on each worker
 * connection, then reads INPUTFILE and processes every non-blank,
 * non-comment line in its own ProcessOrderThread. Threads are started
 * in batches of NUM_THREADS and each batch, including a final partial
 * one, is joined before the next one is started.
 *
 * Whatever the outcome, no worker connection is closed while its thread
 * is still running (unless the wait itself is interrupted); on failure
 * every open worker connection is rolled back before it is closed.
 *
 * @param doTrace  true to send JDBC tracing to the output stream
 * @param doClient true for a client connection, false for a direct one
 * @param connStr  connection string appended to the URL prefix
 * @return 0 on success, 1 on any failure or when a stop was requested
 */
public int runexample(boolean doTrace, boolean doClient, String connStr)
{
    int retcode = 1; // Return code: Assume failure

    // Construct the connection URL
    if (doClient) {
        outStream.println("\nUsing client connection");
        URL = clientURLPrefix + connStr;
    } else {
        outStream.println("\nUsing direct connection");
        URL = directURLPrefix + connStr;
    }

    // An array to hold the threads that will be created.
    ProcessOrderThread[] threads = new ProcessOrderThread[NUM_THREADS];

    // An array of thread parameter objects
    ThreadParameter[] threadParms = new ThreadParameter[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++) {
        threadParms[i] = new ThreadParameter();
    }

    // Use the TimesTenDataSource object to connect to the datastore.
    // See level1.java for example of using the DriverManager interface.
    TimesTenDataSource ds = new TimesTenDataSource();

    // Declared outside the try block so that the finally clause can
    // release them on every path.
    Connection conInit = null;
    BufferedReader in = null;

    try {
        // Turn on tracing if requested
        if (doTrace) {
            outStream.println("\nEnabling JDBC Tracing");
            ds.setLogWriter(new PrintWriter(outStream, true));
        }

        // Open all connections to TimesTen
        ds.setUrl(URL);

        // Prompt for the Username and Password
        AccessControl ac = new AccessControl();
        String username = ac.getUsername();
        String password = ac.getPassword();

        conInit = ds.getConnection(username, password);
        InitializeDatabase idb = new InitializeDatabase();
        idb.initialize(conInit);

        for (int i = 0; i < NUM_THREADS; i++)
        {
            outStream.println("\nOpening connection [" + i + "] to " + URL);
            threadParms[i].tid = i;
            threadParms[i].con = ds.getConnection(username, password);
            reportSQLWarnings(threadParms[i].con.getWarnings());
            CheckIfStopIsRequested();

            // Explicitly turn off auto-commit
            threadParms[i].con.setAutoCommit(false);

            // Prepare all Statements ahead of time
            threadParms[i].pInsOrd = threadParms[i].con.prepareStatement(INSERTORDERSTMT);
            threadParms[i].pInsItm = threadParms[i].con.prepareStatement(INSERTOITEMSTMT);
            threadParms[i].pUpdInv = threadParms[i].con.prepareStatement(UPDATEINVSTMT);
            threadParms[i].pSelInv = threadParms[i].con.prepareStatement(SELECTINVSTMT);

            // Prepare is a transaction; must commit to release locks
            threadParms[i].con.commit();
        }

        // Process the orders
        // Open file and read the input line by line
        in = new BufferedReader(new FileReader(INPUTFILE));
        int threadCount = 0;
        boolean allSucceeded = true;
        String line;
        while (allSucceeded && (line = in.readLine()) != null) {
            // Skip blank lines and comments
            if (line.equals("") || line.charAt(0) == '#') {
                continue;
            }

            /*
             * 1. Spawn as many simultaneous threads as we have connections
             * 2. Wait for them all to finish
             * 3. Repeat
             */
            threadParms[threadCount].orderLine = line;
            threads[threadCount] = new ProcessOrderThread(threadParms[threadCount]);
            threads[threadCount].start();
            threadCount++;
            CheckIfStopIsRequested();

            // Wait for all threads of a full batch
            if (threadCount >= NUM_THREADS) {
                allSucceeded = joinBatch(threads, threadParms, threadCount);
                threadCount = 0;
            }
        } // end of while()

        // The number of orders need not be a multiple of NUM_THREADS:
        // wait for the last, partial batch as well.
        if (allSucceeded && threadCount > 0) {
            allSucceeded = joinBatch(threads, threadParms, threadCount);
        }

        if (!allSucceeded) {
            return retcode; // still 1; the finally clause rolls back and cleans up
        }

        // Close all prepared statements - we're done with them.
        for (int i = 0; i < NUM_THREADS; i++) {
            threadParms[i].pInsOrd.close();
            threadParms[i].pInsItm.close();
            threadParms[i].pUpdInv.close();
            threadParms[i].pSelInv.close();

            // Report any SQLWarnings on the connection
            reportSQLWarnings(threadParms[i].con.getWarnings());
        }
        CheckIfStopIsRequested();

        retcode = 0; // If we reached here - success.
        // Fall through to finally clause
    } catch (IOException ex) {
        ex.printStackTrace();
        // Fall through to finally clause
    } catch (SQLException ex) {
        // getSQLState() may return null, so compare from the literal side
        if ("S0002".equalsIgnoreCase(ex.getSQLState())) {
            errStream.println("\nError: The table does not exist.\n\tPlease run ttIsql -f input0.dat to initialize the database.");
        } else if (ex.getErrorCode() == 907) {
            errStream.println("\nError: Attempting to insert a row with a duplicate primary key.\n\tPlease rerun ttIsql -f input0.dat to reinitialize the database.");
        } else {
            reportSQLExceptions(ex);
        }
        // Fall through to finally clause
    } catch (InterruptedException ex) {
        errStream.println(ex.getMessage());
        // Fall through to finally clause, which waits for the threads
    } finally {
        // Never touch a connection while its worker may still be using
        // it: wait for every thread that has not been joined yet.
        boolean interrupted = waitForWorkers(threads, threadParms);

        if (in != null) {
            try {
                in.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        // Each connection is handled on its own so that a failure on
        // one of them does not leave the others open.
        for (int i = 0; i < NUM_THREADS; i++) {
            closeWorkerConnection(i, threadParms[i], retcode != 0);
        }

        // Release the initialization connection in case
        // InitializeDatabase.initialize() left it open.
        if (conInit != null) {
            try {
                if (!conInit.isClosed()) {
                    conInit.close();
                }
            } catch (SQLException ex) {
                reportSQLExceptions(ex);
            }
        }

        // Restore the interrupt status only after the JDBC cleanup is done
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    return retcode;
}

/**
 * Joins the first count threads of a batch and reports their results.
 * Every thread of the batch is joined, even after one has failed.
 *
 * @return true if every joined thread returned 0
 * @exception InterruptedException if the join is interrupted or a stop
 *            was requested
 */
private boolean joinBatch(ProcessOrderThread[] threads, ThreadParameter[] threadParms, int count)
throws InterruptedException
{
    boolean allSucceeded = true;
    for (int i = 0; i < count; i++) {
        threads[i].join();
        CheckIfStopIsRequested();
        outStream.println("\nthread [" + i + "] returned: " +
                          threadParms[i].returnValue);
        threads[i] = null;
        if (threadParms[i].returnValue != 0) {
            allSucceeded = false;
        }
    }
    return allSucceeded;
}

/**
 * Joins every thread that is still registered in the array and reports
 * its result.
 *
 * @return true if the wait was cut short by an interrupt
 */
private boolean waitForWorkers(ProcessOrderThread[] threads, ThreadParameter[] threadParms)
{
    for (int i = 0; i < threads.length; i++) {
        if (threads[i] == null) {
            continue;
        }
        try {
            threads[i].join();
        } catch (InterruptedException ex) {
            return true; // caller restores the interrupt status
        }
        threads[i] = null;
        outStream.println("thread [" + i + "] returned: " +
                          threadParms[i].returnValue);
    }
    return false;
}

/**
 * Closes one worker connection if it is open, rolling it back first
 * when the run has failed. SQL errors are reported, not propagated.
 */
private void closeWorkerConnection(int idx, ThreadParameter parm, boolean failed)
{
    try {
        if (parm.con == null || parm.con.isClosed()) {
            return;
        }

        // Rollback any transactions in case of errors
        if (failed) {
            try {
                outStream.println("\nEncountered error on a connection. Rolling back all transactions");
                parm.con.rollback();
            } catch (SQLException ex) {
                reportSQLExceptions(ex);
            }
        }

        outStream.println("\nClosing connection [" + idx + "]");
        parm.con.close();
        parm.con = null;
    } catch (SQLException ex) {
        reportSQLExceptions(ex);
    }
}
/**
* This class is a thread to process a single order.
* Through the threadParm object, it returns
* 0 on success, 1 on failure, 2 on abort signal received
*/
class ProcessOrderThread extends Thread
{
private int tid;
private Connection con;
private ThreadParameter threadParm;
private PreparedStatement pInsOrd;
private PreparedStatement pInsItm;
private PreparedStatement pUpdInv;
private PreparedStatement pSelInv;
/**
* Constructs the <code>ProcessOrderThread</code>
*
* @param threadParm the connection to the data store and its
* associated prepared statements.
*/
ProcessOrderThread(ThreadParameter threadParm)
{
this.threadParm = threadParm;
this.tid = threadParm.tid;
this.con = threadParm.con;
this.pInsOrd = threadParm.pInsOrd;
this.pInsItm = threadParm.pInsItm;
this.pUpdInv = threadParm.pUpdInv;
this.pSelInv = threadParm.pSelInv;
}
/**
* Processes a single order
*/
public void run()
{
int retcode = 1; // Return code: Assume failure
try {
outStream.println("\n[" + tid + "] Processing data: " + threadParm.orderLine);
String[] fields = threadParm.orderLine.split(",");
pInsOrd.setInt(1, Integer.parseInt(fields[0])); // cust_num
pInsOrd.setTimestamp(2, Timestamp.valueOf(fields[1])); // when_placed
pInsOrd.setNull(3, java.sql.Types.TIMESTAMP); // when_shipped
pInsOrd.setNull(4, java.sql.Types.VARCHAR); // notes
pInsOrd.executeUpdate();
reportSQLWarnings(pInsOrd.getWarnings());
// Process all the products that were ordered on the single line.
int numOrders = 0;
boolean skipToNextOrder = false;
for (int i = 2; i < fields.length; i += 2) {
int prodNum = Integer.parseInt(fields[i]);
int itemCount = Integer.parseInt(fields[i + 1]);
// Get the current inventory count for this product
pSelInv.setInt(1, prodNum);
ResultSet rs = pSelInv.executeQuery();
int invCount = 0;
if (rs.next()) {
invCount = rs.getInt(1);
} else {
errStream.println("[" + tid + "] Error: No inventory for pdocut number " +
prodNum);
skipToNextOrder = true;
break; // Break out of inner for loop
}
sleep(1000);
// Is there enough to fill the order?
if (itemCount <= invCount) {
// Yes - insert the row into the ORDER_ITEM table
pInsItm.setInt(1, prodNum); // prod_num
pInsItm.setInt(2, itemCount); // quantity
pInsItm.executeUpdate();
// Report any SQLWarnings on the connection
reportSQLWarnings(pInsItm.getWarnings());
CheckIfStopIsRequested();
// Debit the inventory
pUpdInv.setInt(1, prodNum); // quantity
pUpdInv.setInt(2, itemCount); // prod_num
pUpdInv.executeUpdate();
// Report any SQLWarnings on the connection
reportSQLWarnings(pInsItm.getWarnings());
} else {
errStream.println("[" + tid + "] Warning: Inventory for product number " +
prodNum + " is " + invCount + " Count. " +
"\n\tCannot fill order for " + itemCount);
skipToNextOrder = true;
break; // Break out of for loop
}
} // end of for()
numOrders++;
/*
* Finished processing a single line.
* Now, rollback or commit?
*/
if (skipToNextOrder || numOrders == 0) {
if (skipToNextOrder == false) {
errStream.println("[" + tid + "] Warning: No orders for any products were given.");
}
retcode = 1;
// Rollback the order line
outStream.println("[" + tid + "] Rolling back last transaction due to inability to process order");
con.rollback();
} else {
// Commit the order
con.commit();
}
retcode = 0;
} catch (SQLException ex) {
if (ex.getSQLState().equalsIgnoreCase("S0002")) {
errStream.println("\n[" + tid + "] Error: The table customer does not exists.\n\tPlease run ttIsql -f input0.dat to initialize the database.");
} else if (ex.getErrorCode() == 907) {
errStream.println("\n[" + tid + "] Error: Attempting to insert a row with a duplicate primary key.\n\tPlease rerun ttIsql -f input0.dat to reinitialize the database.");
} else {
reportSQLExceptions(ex);
}
// Fall through to exiting thread
} catch (InterruptedException ex) {
retcode = 2;
errStream.println(ex.getMessage());
// Fall through to exiting thread
} finally {
threadParm.returnValue = retcode;
}
} // end of run()
}
/**
 * Prints a chain of SQLException objects to the error stream:
 * a banner and the stack trace of the first exception, followed by
 * the SQL state, message and vendor code of every exception in the
 * chain.
 *
 * @param ex first SQLException of the chain, may be null
 *
 * @return the number of exceptions in the chain, 0 for null
 */
static int reportSQLExceptions(SQLException ex)
{
    if (ex == null) {
        return 0;
    }

    errStream.println("\n--- SQLException caught ---");
    ex.printStackTrace();

    int seen = 0;
    for (SQLException cur = ex; cur != null; cur = cur.getNextException()) {
        errStream.println("SQL State: " + cur.getSQLState());
        errStream.println("Message: " + cur.getMessage());
        errStream.println("Vendor Code: " + cur.getErrorCode());
        errStream.println();
        seen++;
    }
    return seen;
}
/**
 * Prints a chain of SQLWarning objects to the error stream. For each
 * warning the SQL state, message and vendor code are printed; a
 * DataTruncation additionally gets the index of the affected column.
 *
 * @param wn first SQLWarning of the chain, may be null
 *
 * @return the number of warnings in the chain, 0 for null
 */
static int reportSQLWarnings(SQLWarning wn)
{
    int seen = 0;
    for (SQLWarning cur = wn; cur != null; cur = cur.getNextWarning()) {
        errStream.println("\n--- SQL Warning ---");
        errStream.println("SQL State: " + cur.getSQLState());
        errStream.println("Message: " + cur.getMessage());
        errStream.println("Vendor Code: " + cur.getErrorCode());

        // A DataTruncation is a SQLWarning that also names a column
        if (cur instanceof DataTruncation) {
            errStream.println("Truncation error in column: " +
                              ((DataTruncation) cur).getIndex());
        }

        errStream.println();
        seen++;
    }
    return seen;
}
/**
 * Tells whether a "signal" is pending, i.e. whether the shutdown hook
 * thread has set the hasReceivedSignal flag.
 * @return true if a "signal" is pending.
 */
private static boolean IsSignalPending()
{
    return hasReceivedSignal;
}
/**
 * Polls IsSignalPending() and aborts the caller with an exception
 * when a stop has been requested.
 * @return false - the only value ever returned, meaning that no
 * "signal" is pending.
 * @exception InterruptedException if a signal is pending
 */
private static boolean CheckIfStopIsRequested()
throws InterruptedException
{
    if (!IsSignalPending()) {
        return false;
    }
    throw new InterruptedException("\nWarning: Stop Requested. Aborting!");
}
}
/* Emacs variable settings */
/* Local Variables: */
/* tab-width:8 */
/* indent-tabs-mode:nil */
/* c-basic-offset:2 */
/* End: */