001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc.adapter;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.io.OutputStream;
022 import java.sql.Blob;
023 import java.sql.Connection;
024 import java.sql.PreparedStatement;
025 import java.sql.ResultSet;
026 import java.sql.SQLException;
027
028 import javax.jms.JMSException;
029
030 import org.apache.activemq.store.jdbc.TransactionContext;
031 import org.apache.activemq.util.ByteArrayOutputStream;
032
033 /**
034 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
035 * operations. This is a little more involved since to insert a blob you have
036 * to:
037 *
038 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
039 * value.
040 *
041 * The databases/JDBC drivers that use this adapter are:
042 * <ul>
043 * <li></li>
044 * </ul>
045 *
046 * @org.apache.xbean.XBean element="blobJDBCAdapter"
047 *
048 *
049 */
050 public class BlobJDBCAdapter extends DefaultJDBCAdapter {
051
052 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
053 throws SQLException, JMSException {
054 PreparedStatement s = null;
055 ResultSet rs = null;
056 try {
057
058 // Add the Blob record.
059 s = c.prepareStatement(statements.getAddMessageStatement());
060 s.setLong(1, seq);
061 s.setString(2, destinationName);
062 s.setString(3, messageID);
063 s.setString(4, " ");
064
065 if (s.executeUpdate() != 1) {
066 throw new JMSException("Failed to broker message: " + messageID + " in container.");
067 }
068 s.close();
069
070 // Select the blob record so that we can update it.
071 s = c.prepareStatement(statements.getFindMessageStatement());
072 s.setLong(1, seq);
073 rs = s.executeQuery();
074 if (!rs.next()) {
075 throw new JMSException("Failed to broker message: " + messageID + " in container.");
076 }
077
078 // Update the blob
079 Blob blob = rs.getBlob(1);
080 OutputStream stream = blob.setBinaryStream(data.length);
081 stream.write(data);
082 stream.close();
083 s.close();
084
085 // Update the row with the updated blob
086 s = c.prepareStatement(statements.getUpdateMessageStatement());
087 s.setBlob(1, blob);
088 s.setLong(2, seq);
089
090 } catch (IOException e) {
091 throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
092 } finally {
093 try {
094 rs.close();
095 } catch (Throwable ignore) {
096 }
097 try {
098 s.close();
099 } catch (Throwable ignore) {
100 }
101 }
102 }
103
104 public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
105 PreparedStatement s = null;
106 ResultSet rs = null;
107 try {
108
109 s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
110 s.setLong(1, seq);
111 rs = s.executeQuery();
112
113 if (!rs.next()) {
114 return null;
115 }
116 Blob blob = rs.getBlob(1);
117 InputStream is = blob.getBinaryStream();
118
119 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
120 int ch;
121 while ((ch = is.read()) >= 0) {
122 os.write(ch);
123 }
124 is.close();
125 os.close();
126
127 return os.toByteArray();
128
129 } catch (IOException e) {
130 throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
131 } finally {
132 try {
133 rs.close();
134 } catch (Throwable ignore) {
135 }
136 try {
137 s.close();
138 } catch (Throwable ignore) {
139 }
140 }
141 }
142
143 }