package ejava.messagech22.jms;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
import java.util.Properties;
import java.util.Hashtable;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.Topic;
import javax.jms.TextMessage;
import javax.jms.ObjectMessage;
import javax.jms.JMSException;
import javax.jms.Session;
import java.io.Serializable;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.StreamMessage;
import javax.jms.BytesMessage;
import java.io.IOException;
import java.io.FileInputStream;
import javax.jms.DeliveryMode;
import java.util.Date;
/**
* This example shows how to use JMS in publish/subscribe (topic) mode:
* it subscribes to a Topic and receives the messages published to it.
*/
public class TopicConsumer implements MessageListener
{
private TopicConnectionFactory topicConnectionFactory;
private TopicConnection topicConnection;
private TopicSession topicSession;
private TopicSubscriber topicSubscriber;
private Topic topic;
private Properties properties;
public boolean quitFromReceiving = false;
/**
* Constructor. Creates all the JMS objects needed to receive
* messages.
*/
public TopicConsumer(Properties defaultProperties, Context context)
throws NamingException, JMSException
{
properties = defaultProperties;
String jmsFactoryName = (String)properties.get("JMS_FACTORY_FOR_TOPIC");
// Look up the topic connection factory in JNDI
topicConnectionFactory = (TopicConnectionFactory)
context.lookup(jmsFactoryName);
// Create a topic connection from the factory
topicConnection = topicConnectionFactory.createTopicConnection();
// Create a non-transacted session on the connection
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
String topicName = (String)properties.get("TOPIC_NAME");
try {
topic = (Topic) context.lookup(topicName);
} catch (NamingException namingException) {
// The topic is not bound in JNDI yet, so create it through the session
topic = topicSession.createTopic(topicName);
// Bind the topic under its name so that later lookups succeed
context.bind(topicName, topic);
}
// Create the subscriber for the topic
topicSubscriber = topicSession.createSubscriber(topic);
// Register this TopicConsumer as the message listener
topicSubscriber.setMessageListener(this);
// Start the connection so that message delivery begins
topicConnection.start();
}
/**
* Implements MessageListener.onMessage. The JMS provider invokes this
* method whenever a message published to the topic is delivered to this subscriber.
*/
public void onMessage(Message message)
{
try {
// Print the message header information
System.out.println("MessageID :" + message.getJMSMessageID() +
" for " + message.getJMSDestination());
System.out.print("Message expires : ");
if (message.getJMSExpiration() > 0) {
System.out.println(new Date(message.getJMSExpiration()));
} else {
System.out.println("never");
}
System.out.println("Priority :" + message.getJMSPriority());
System.out.println("Mode : " + (
message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ?
"PERSISTENT" : "NON_PERSISTENT"));
System.out.println("Reply to : " + message.getJMSReplyTo());
System.out.println("Message type : " + message.getJMSType());
if (message instanceof TextMessage) {
String receivedMessage = ((TextMessage)message).getText();
System.out.println("Received Message :" + receivedMessage);
} else if(message instanceof ObjectMessage){
String receivedMessage = message.toString();
System.out.println("Received Message :" + receivedMessage);
} else if(message instanceof StreamMessage){
String receivedMessage = message.toString();
System.out.println("Received Message :" + receivedMessage);
} else if(message instanceof BytesMessage){
String receivedMessage = message.toString();
System.out.println("Received Message :" + receivedMessage);
}
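// Wake up any thread waiting on this consumer (for example the main
// thread) so that it can leave the program. notifyAll() must be called
// while holding this object's monitor; otherwise it throws
// IllegalMonitorStateException.
synchronized (this) {
quitFromReceiving = true;
notifyAll();
}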
} catch (JMSException jmsException) {
jmsException.printStackTrace();
}
}
/**
* Closes the JMS-related objects: subscriber, session and connection.
*/
public void close()
throws JMSException
{
topicSubscriber.close();
this.topicSession.close();
this.topicConnection.close();
}
/**
* Returns the initial Context for the JMS service.
*/
pr