将合用(pooling)与循环组合起来使用,这样可以确保很少创建新线程,而是重用线程。但是,那个过程相当复杂,过多的说明会分散本文的中心主题 ― JMS。)
在独立线程中,轮询程序接着从连接池中获取 JMSConnection,用它来创建一个 BytesMessage,并将这个文件的二进制内容放入那个消息中。最后这个消息查找到接收器,并发送到 JMS 服务器,接着将 JMSConnection 返回给 ConnectionPool。这个发送过程的部分步骤显示在下面的图 2 中。
图 2. 发送器过程
接收器是一个较简单的组件;它启动一些 FileListener 来等待将要放置在接收器队列中的消息。下面的清单 4 中的代码显示了 FileListener 设置处理过程。图 6 中的类实际上负责从队列中检索消息并对它们进行存档。JMS 保证队列发送每个消息的次数仅一次,所以我们可以安全启动许多不同的 FileListener 线程并且知道每个消息(因此每个文件)只处理一次。这个保证是使用基于 JMS 解决方案的另一个重要优点。在自己设计的解决方案中开发这样的功能(比如基于 FTP 的功能),花销很大且易出错。
清单 4:来自类 ar.jms.file.receive.FileListener
public void startOn(Queue queue) {
setQueue(queue);
createConnection();
try {
createSession();
createReceiver();
getConnection().start(); // this starts
the queue listener
} catch (JMSException exception) {
// Handle the exception
}
}
public void createReceiver() throws javax.jms.JMSException {
try {
QueueReceiver receiver = getSession().
createReceiver(getQueue());
receiver.setMessageListener(this);
} catch (JMSException exception) {
// Handle the exception
}
}
public void createSession() throws JMSException {
setSession(getConnection().
createQueueSession(false, Session.AUTO_ACKNOWLEDGE));
}
public void createConnection() {
while (!hasConnection()) {
try {
setConnection(getClient().createConnection());
} catch (JMSException exception) {
// Connections drop periodically on the
internet, log and try again.
try {
Thread.sleep(2000);
} catch
(java.lang.InterruptedException ignored) {
}
}
}
}
如何获取消息?使用JMS技术作为数据复制的解决方案(5)
时间:2011-01-25 IBM Daniel Drasin
以回调的方式编写消息处理代码,回调是当将消息传递给 FileListener 时,JMS 自动调用的方法。这个消息的代码显示在下面的清单 5 中。
清单 5. 来自类 ar.jms.file.receive.FileListener
public void onMessage(Message message) {
BytesMessage byteMessage = ((BytesMessage) message);
OutputStream stream =
new BufferedOutputStream(
new FileOutputStream(getFilenameFor(message)));
byte[] buffer = new byte[getFileBufferSize()];
int length = 0;
try {
while ((length = byteMessage.readBytes(buffer)) != -1) {
stream.write(buffer, 0, length);
}
stream.close();
} catch (JMSException exception) {
// Handle the JMSException
} catch (IOException
|