Interacting with JMS Queue and Topic from Java SE image53

Interacting with JMS Queue and Topic from Java SE

This article is just a quick post of some code I want to have easy access to. It runs in Java SE – outside any container, in a stand-alone JVM – and connects to JMS Queues. One class sends messages to one Queue; the other class registers as a listener and consumes messages from a different Queue.

I have created the code in JDeveloper. It runs stand-alone and connects to a WebLogic Server where the JMS Queues (and JMS Server, JMS Module and JMS Connection Factory) have been created. (The blog article http://blog.soasuitehandbook.org/setup-for-jms-resources-in-weblogic-chapter-6/ provides an example of how JMS resources are configured on WebLogic.)

image

The project has two libraries associated with it: Java EE and WebLogic Remote Client.

image

 

The JDeveloper application TemperatureMonitoring (created for a Stream Explorer/Event Processing demonstration) contains two projects that each contain a single class. One project is HotRoomAlertProcessor with class HotRoomAlertProcessor that registers as a listener to the HotRooms queue. Any message received on that queue is reported to the console.

The second project is TemperatureSensors. It contains class TemperatureSensorSignalPublisher. This class generates temperature values (in Celsius!) for a number of rooms, and publishes these to the queue temperatureMeasurements. At some random point, the class will start a fire in a randomly selected room. In this room, temperatures will soon be over 100 degrees.

Class TemperatureSensorSignalPublisher, publishing to the JMS Queue:

package nl.amis.temperature;

import java.util.Hashtable;

import java.util.Random;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


public class TemperatureSensorSignalPublisher {
    public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
    public final static String JMS_FACTORY = "jms/handson-jms-connectionFactory";
    public final static String QUEUE = "jndi/temperatureMeasurements";
    private QueueConnectionFactory qconFactory;
    private QueueConnection qcon;
    private QueueSession qsession;
    private MessageProducer qproducer;
    private Queue queue;

    private static final int SLEEP_MILLIS = 100;
       private static Random rand = new Random();
       private boolean suspended;
       private int index = 0;

       public static int randInt(int min, int max) {
           // NOTE: Usually this should be a field rather than a method
           // variable so that it is not re-seeded every call.     
           // nextInt is normally exclusive of the top value,
           // so add 1 to make it inclusive
           int randomNum = rand.nextInt((max - min) + 1) + min;
           return randomNum;
       }

        public void run() {
            System.out.println("Started Producing Temperature Signals to "+QUEUE);
            suspended = false;
            while (!isSuspended()) { // Generate messages forever...
                generateTemperatureSensorSignal();
                try {
                    synchronized (this) {
                        wait(randInt(SLEEP_MILLIS/2, SLEEP_MILLIS*2));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /* (non-Javadoc)
         * @see com.bea.wlevs.ede.api.SuspendableBean#suspend()
         */
        public synchronized void suspend() {
            suspended = true;
        }

        private synchronized boolean isSuspended() {
            return suspended;
        }

    String[] rooms = new String[]{"Cafeteria","Kitchen","Reception","Meetingroom One","CEO Office","Lounge Area","Office Floor A"};
    boolean onFire=false;
    int roomOnFireIndex ;

    private void generateTemperatureSensorSignal() {
        // determine roomId
        int roomIndex = randInt(1, rooms.length)-1;
        
        // determine if one room should be set on fire
        if (!onFire) {
            // chance of 1:500 that a fire is started
            onFire = randInt(1,50) < 2;
            if (onFire){
              roomOnFireIndex = roomIndex;
              System.out.println("Fire has started in room "+ rooms&#91;roomOnFireIndex&#93;);
            }
        }        
        // determine temperatureValue
        float temperature = randInt(160, 230)/11;
        if (onFire && roomIndex == roomOnFireIndex) {           
            temperature = temperature + randInt(90, 150);
        }
        publish(rooms&#91;roomIndex&#93;, temperature);        
    }


    public void publish(String roomId, Float temperature) {
        try {
            MapMessage message = qsession.createMapMessage();
            message.setString("RoomId", roomId);
            message.setFloat("Temperature", temperature);
            qproducer.send(message);
            //System.out.println("- Delivered: "+temperature+" in "+roomId);
        } catch (JMSException jmse) {
            System.err.println("An exception occurred: " + jmse.getMessage());
        }
    }

    public void init(Context ctx, String queueName)
        throws NamingException, JMSException
    {
        qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
        qcon = qconFactory.createQueueConnection();
        qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        queue = (Queue) ctx.lookup(queueName);
        qproducer = qsession.createProducer(queue);
    }

    public void close() throws JMSException {
        qsession.close();
        qcon.close();
    }

    public static void main(String&#91;&#93; args) throws Exception {
        InitialContext ic = getInitialContext();
        TemperatureSensorSignalPublisher qr = new TemperatureSensorSignalPublisher();
        qr.init(ic, QUEUE);
        qr.run();
        qr.close();
    }

    private static InitialContext getInitialContext()
        throws NamingException    {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "t3://localhost:7101");
        return new InitialContext(env);

    }
    
}

Class HotRoomAlertProcessor consumes messages from a second JMS Queue:

package nl.amis.temperature;

import java.util.Enumeration;
import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


public class HotRoomAlertProcessor implements MessageListener {
    public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
    public final static String JMS_FACTORY = "jms/handson-jms-connectionFactory";
    public final static String QUEUE = "jndi/hotRooms";
    private QueueConnectionFactory qconFactory;
    private QueueConnection qcon;
    private QueueSession qsession;
    private QueueReceiver qreceiver;
    private Queue queue;
    private boolean quit = false;

    public void onMessage(Message msg)     {
        try {
            if (msg instanceof MapMessage) {
                MapMessage mess = ((MapMessage) msg);
//                Enumeration enumeration = mess.getMapNames();
//                while (enumeration.hasMoreElements()) {
//                    System.out.println(enumeration.nextElement());
//                }
                System.out.println("Room On Fire: " + mess.getString("RoomId"));
                System.out.println("Last Measured Temperature: " + mess.getFloat("AverageTemperature"));
            }
        } catch (JMSException jmse) {
            System.err.println("An exception occurred: " + jmse.getMessage());
        }
    }

    public void init(Context ctx, String queueName)
        throws NamingException, JMSException     {
        qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
        qcon = qconFactory.createQueueConnection();
        qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        queue = (Queue) ctx.lookup(queueName);
        qreceiver = qsession.createReceiver(queue);
        qreceiver.setMessageListener((MessageListener) this);
        qcon.start();
    }

    public void close() throws JMSException    {
        qreceiver.close();
        qsession.close();
        qcon.close();
    }

    public static void main(String[] args) throws Exception {
        InitialContext ic = getInitialContext();
        HotRoomAlertProcessor qr = new HotRoomAlertProcessor();
        qr.init(ic, QUEUE);
        System.out.println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");
        synchronized (qr) {
            while (!qr.quit) {
                try {
                    qr.wait();
                } catch (InterruptedException ie) {
                }
            }
        }
        qr.close();
    }

    private static InitialContext getInitialContext()
        throws NamingException
    {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "t3://localhost:7101/");
        return new InitialContext(env);
    }
}

Here is some output from the second class:

image