Getting started with OpenJMS on Unix
This FlashGuide™ will get you started with OpenJMS, an implementation of the JMS 1.0.2 Specification. OpenJMS is sponsored by the Exolab Project and is released under that project's license.
1. Installing OpenJMS on UNIX
- Download the latest OpenJMS binary from http://openjms.sourceforge.net/downloads.html. Currently, the version is 0.7.6.1.
- Install OpenJMS by unpacking the downloaded archive into the desired directory (I used /usr/local):
cd /usr/local
tar zxf ./openjms-0.7.6.1.tar.gz
- Note the location of your OpenJMS installation - we will refer to this as $OPENJMS_HOME
- Optionally, save time on typing by creating a symbolic link like this:
ln -s openjms-0.7.6.1 openjms
2. Configuring OpenJMS
- In this exercise, we are going to configure OpenJMS for PostgreSQL-based persistence, TCP connectivity, and set up our own topic for testing publish/subscribe functions.
- Cd to the OpenJMS installation directory:
cd $OPENJMS_HOME
- Edit the OpenJMS configuration file, config/openjms.xml. Here is an example:
<?xml version="1.0"?>
<Configuration>
  <ServerConfiguration host="localhost" embeddedJNDI="true" />
  <Connectors>
    <Connector scheme="tcp">
      <ConnectionFactories>
        <QueueConnectionFactory name="JmsQueueConnectionFactory" />
        <TopicConnectionFactory name="JmsTopicConnectionFactory" />
      </ConnectionFactories>
    </Connector>
    <Connector scheme="rmi">
      <ConnectionFactories>
        <QueueConnectionFactory name="JmsQueueConnectionFactory" />
        <TopicConnectionFactory name="JmsTopicConnectionFactory" />
      </ConnectionFactories>
    </Connector>
  </Connectors>
  <DatabaseConfiguration>
    <RdbmsDatabaseConfiguration driver="org.postgresql.Driver"
                                user="postgres" password=""
                                url="jdbc:postgresql://localhost:5432/mydb"/>
  </DatabaseConfiguration>
  <AdminConfiguration script="${openjms.home}\bin\startup.bat" />
  <AdministeredDestinations>
    <AdministeredTopic name="test-topic">
      <Subscriber name="test-client1" />
      <Subscriber name="test-client2" />
    </AdministeredTopic>
  </AdministeredDestinations>
  <Users>
    <User name="admin" password="openjms" />
  </Users>
</Configuration>
This configuration file adds a TCP connector in the <Connectors> section. This means we can test connecting with TCP as well as RMI.
You may have to change the PostgreSQL parameters to match your PostgreSQL installation. If you don't have PostgreSQL installed, you can use my FlashGuide: "Getting started with PostgreSQL on Linux".
Instead of the default topics & queues that come preconfigured in OpenJMS, we have defined a topic called "test-topic" with two subscribers: "test-client1" and "test-client2".
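Before starting the server, it can help to confirm that the edited file is still well-formed and lists the connectors and subscribers you expect. The following is a minimal sketch that uses only the JDK's XML parser. The class ConfigSummary and its methods are hypothetical helpers, not part of OpenJMS, and the element and attribute names are simply those from the example file above.

```java
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper (not part of OpenJMS): summarizes an openjms.xml file.
final class ConfigSummary {

    // Parses the stream; fails with IllegalArgumentException if it is not well-formed XML.
    static Document parse(InputStream in) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
        } catch (Exception e) {
            throw new IllegalArgumentException("Not well-formed XML: " + e.getMessage(), e);
        }
    }

    // Convenience overload for XML held in a string.
    static Document parse(String xml) {
        return parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    }

    // Collects one attribute from every element with the given tag name, in document order.
    static List<String> attributeValues(Document doc, String tag, String attr) {
        List<String> values = new ArrayList<>();
        NodeList nodes = doc.getElementsByTagName(tag);
        for (int i = 0; i < nodes.getLength(); i++) {
            values.add(((Element) nodes.item(i)).getAttribute(attr));
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the path to openjms.xml is passed as the first argument.
        try (InputStream in = new FileInputStream(args[0])) {
            Document doc = parse(in);
            System.out.println("Connectors:  " + attributeValues(doc, "Connector", "scheme"));
            System.out.println("Topics:      " + attributeValues(doc, "AdministeredTopic", "name"));
            System.out.println("Subscribers: " + attributeValues(doc, "Subscriber", "name"));
            System.out.println("JDBC URL:    " + attributeValues(doc, "RdbmsDatabaseConfiguration", "url"));
        }
    }
}
```

Run it with the path to config/openjms.xml as the only argument. A parse error here usually points at a typo introduced while editing the file.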
- Copy the PostgreSQL JDBC driver jar to $OPENJMS_HOME/lib:
cp $POSTGRESQL_HOME/share/java/postgresql.jar $OPENJMS_HOME/lib
- Add the PostgreSQL JDBC driver jar to the OpenJMS classpath by editing $OPENJMS_HOME/bin/setenv.sh. Find the line for setting the JDBC driver, which is commented out by default, uncomment it, and point it at the jar you just copied:
# Configure the JDBC driver
#
CLASSPATH=../lib/postgresql.jar
- Create the OpenJMS tables in the database using OpenJMS's dbtool:
cd $OPENJMS_HOME/bin
./dbtool.sh -create -config ../config/openjms.xml
This command should return a success message. If it does not, check whether the tables exist ("messages", "message_handles", "consumers", "destinations", "seeds" and "users"). If they are missing, find the SQL script in $OPENJMS_HOME/config/db and run it manually.
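The table check described above can also be scripted. The following is a minimal sketch, assuming the PostgreSQL JDBC driver is on the classpath and that the URL, user, and password match the example openjms.xml. The class OpenJmsTableCheck is a hypothetical helper, and the table list is simply the one named in this guide.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper (not part of OpenJMS): reports which expected tables are absent.
final class OpenJmsTableCheck {

    // Table names as listed in this guide.
    static final List<String> EXPECTED = Arrays.asList(
        "messages", "message_handles", "consumers",
        "destinations", "seeds", "users");

    // Returns the expected tables that do not appear in 'existing' (case-insensitive).
    static List<String> missing(Set<String> existing) {
        Set<String> lower = new HashSet<>();
        for (String name : existing) {
            lower.add(name.toLowerCase(Locale.ROOT));
        }
        List<String> result = new ArrayList<>();
        for (String name : EXPECTED) {
            if (!lower.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }

    // Reads the names of all plain tables visible through this connection.
    static Set<String> listTables(Connection conn) throws SQLException {
        Set<String> names = new HashSet<>();
        DatabaseMetaData meta = conn.getMetaData();
        try (ResultSet rs = meta.getTables(null, null, "%", new String[] {"TABLE"})) {
            while (rs.next()) {
                names.add(rs.getString("TABLE_NAME"));
            }
        }
        return names;
    }

    public static void main(String[] args) throws SQLException {
        // Assumption: same connection settings as in the example openjms.xml.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "postgres", "")) {
            System.out.println("Missing tables: " + missing(listTables(conn)));
        }
    }
}
```

An empty list means dbtool created everything this guide expects. Any names printed are the ones to look for in the SQL script.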
3. Testing OpenJMS
- Check your prerequisites:
- Make sure you have a Java Development Kit (JDK) installed, and the environment variable $JAVA_HOME set to the JDK installation directory. You should also have the $JAVA_HOME/bin directory in your $PATH. (For Windows users, make sure you have %JAVA_HOME% set and %JAVA_HOME%\bin in your %PATH%.)
- Make sure you have Ant 1.5.1 or greater installed and $ANT_HOME set. If not, install Ant before continuing.
Set up a build environment for the test. We will be creating three source files, one configuration file and one shell script. We'll call the development directory that contains all our files $DEVEL_HOME.
Create the following directory layout in some build area:
cd ~
mkdir openjmstest
export DEVEL_HOME=~/openjmstest
cd openjmstest
mkdir classes etc lib src
Create an Ant buildfile as $DEVEL_HOME/build.xml:
<project default="build" basedir=".">
  <property name="openjms_home" value="/usr/local/openjms"/>
  <path id="cp">
    <pathelement path="${java.class.path}"/>
    <fileset dir="${openjms_home}/lib">
      <include name="*.jar"/>
    </fileset>
    <fileset dir="lib">
      <include name="*.jar"/>
    </fileset>
  </path>
  <target name="build">
    <javac srcdir="src" destdir="classes"
           debug="on" optimize="off" deprecation="off">
      <classpath refid="cp"/>
    </javac>
    <jar jarfile="lib/jmstests.jar">
      <fileset dir="classes"
               includes="**/*.class"/>
    </jar>
  </target>
</project>
You may have to change the value of the "openjms_home" property to match your installation.
Create the OpenJMS property file, as $DEVEL_HOME/etc/openjms.props:
tcp-url=tcp://localhost:3035
rmi-url=rmi://localhost:1099
ctx-factory=org.exolab.jms.jndi.InitialContextFactory
jms-factory=JmsTopicConnectionFactory
mode=topic
access-mode=tcp
transact-mode=false
ack-mode=1
durability=true
subject=test-topic
default-client-id=test-client1
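The numeric ack-mode value is passed straight to the JMS session, where 1, 2 and 3 correspond to the javax.jms.Session constants AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. The following is a minimal sketch of a sanity check for the property file. The class PropsCheck and its list of required keys are assumptions made for illustration, based on the keys the test classes in this guide read.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: sanity-checks the settings in etc/openjms.props.
final class PropsCheck {

    // Keys read by the test classes in this guide (an assumption of this sketch).
    static final String[] REQUIRED = {
        "ctx-factory", "jms-factory", "access-mode",
        "transact-mode", "ack-mode", "durability", "subject"
    };

    // Maps the numeric ack-mode to the name of the matching javax.jms.Session constant.
    static String ackModeName(int mode) {
        switch (mode) {
            case 1:  return "AUTO_ACKNOWLEDGE";
            case 2:  return "CLIENT_ACKNOWLEDGE";
            case 3:  return "DUPS_OK_ACKNOWLEDGE";
            default: return "UNKNOWN(" + mode + ")";
        }
    }

    // Returns the required keys that are absent from the given properties.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Parses property-file text, for example the contents of openjms.props.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }
}
```

With the file shown above, missingKeys returns an empty list and ackModeName(1) reports AUTO_ACKNOWLEDGE, meaning the session acknowledges each message automatically.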
Create a class that encapsulates a connection to OpenJMS. This class will be used by both the sender and receiver. Save the following as $DEVEL_HOME/src/OpenJMSConnection.java:
package test.jms;

import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.jms.Connection;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

public class OpenJMSConnection {
    private TopicConnection connection = null;
    private TopicSession session = null;
    private Topic topic = null;
    private boolean ready = false;

    public OpenJMSConnection() {
    }

    public boolean init(Properties props) {
        try {
            // Build the JNDI environment for the chosen access mode
            Hashtable ctxProps = new Hashtable();
            ctxProps.put(Context.INITIAL_CONTEXT_FACTORY,
                         props.getProperty("ctx-factory"));
            String access = props.getProperty("access-mode");
            if ("rmi".equals(access)) {
                ctxProps.put(Context.PROVIDER_URL,
                             props.getProperty("rmi-url"));
            } else if ("tcp".equals(access)) {
                ctxProps.put(Context.PROVIDER_URL,
                             props.getProperty("tcp-url"));
            } else {
                System.err.println("Unknown access mode: " + access);
                return false;
            }

            // Look up the connection factory before using it
            Context context = new InitialContext(ctxProps);
            TopicConnectionFactory factory =
                (TopicConnectionFactory) context.lookup(
                    props.getProperty("jms-factory"));
            if (factory == null) {
                System.err.println("Unable to retrieve connection factory");
                return false;
            }
            connection = factory.createTopicConnection();
            connection.start();
            session = connection.createTopicSession(
                Boolean.valueOf(props.getProperty("transact-mode")).booleanValue(),
                Integer.parseInt(props.getProperty("ack-mode")));

            // Use the administered topic, or create one if the lookup returns nothing
            topic = (Topic) context.lookup(props.getProperty("subject"));
            if (topic == null)
                topic = session.createTopic(props.getProperty("subject"));
            System.out.println("Topic is " + topic.getTopicName());
            ready = true;
        } catch (Exception e) {
            // Report the failure instead of silently returning false
            System.err.println("Unable to initialize connection: " + e);
        }
        return ready;
    }

    public void close() {
        try {
            System.err.println("Before close session");
            if (session != null) session.close();
        } catch (Exception e1) {
            // Ignore errors while closing
        }
        try {
            System.err.println("Before close connection");
            if (connection != null) connection.close();
        } catch (Exception e2) {
            // Ignore errors while closing
        }
    }

    public Connection getConnection() {
        return connection;
    }

    public TopicSession getSession() {
        return session;
    }

    public Topic getTopic() {
        return topic;
    }
}
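The first part of init() only decides which provider URL to hand to JNDI, so it can be tried out without a running server. The following is a minimal sketch of that step in isolation. The class JndiEnv is a hypothetical helper, while the property names are the ones from etc/openjms.props.

```java
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper: builds the JNDI environment that init() passes to InitialContext.
final class JndiEnv {

    static Hashtable<String, String> build(Properties props) {
        // Pick the URL property that matches the access mode
        String access = props.getProperty("access-mode");
        String urlKey;
        if ("rmi".equals(access)) {
            urlKey = "rmi-url";
        } else if ("tcp".equals(access)) {
            urlKey = "tcp-url";
        } else {
            throw new IllegalArgumentException("Unknown access mode: " + access);
        }

        // Hashtable rejects null values, so check explicitly for a clearer message
        String url = props.getProperty(urlKey);
        String factory = props.getProperty("ctx-factory");
        if (url == null || factory == null) {
            throw new IllegalArgumentException(
                "Missing property: " + (url == null ? urlKey : "ctx-factory"));
        }

        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, factory);
        env.put(Context.PROVIDER_URL, url);
        return env;
    }
}
```

The result can be passed to new InitialContext(env). Switching access-mode from tcp to rmi changes only the provider URL, which is why the rest of the connection code is the same for both connectors.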
Create a class for the sender (publisher). Save the following as $DEVEL_HOME/src/OpenJMSTestSender.java:
package test.jms;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import javax.jms.DeliveryMode;
import javax.jms.TextMessage;
import javax.jms.TopicSession;
import javax.jms.TopicPublisher;

public class OpenJMSTestSender {
    private OpenJMSConnection conn = null;
    private TopicPublisher publisher;
    private Properties props = new Properties();

    public OpenJMSTestSender() {
    }

    public boolean init() {
        // The property file is passed with -Dconfig=...
        String propsFile = System.getProperty("config");
        if (propsFile == null || propsFile.equals("")) {
            System.err.println("Error: no configuration file specified");
            return false;
        }
        try {
            props.load(new FileInputStream(propsFile));
        } catch (Exception e) {
            System.err.println("Unable to load configuration from: " +
                               propsFile);
            return false;
        }
        conn = new OpenJMSConnection();
        if (!conn.init(props)) return false;
        try {
            TopicSession session = conn.getSession();
            publisher = session.createPublisher(conn.getTopic());
            // Persistent delivery stores messages for durable subscribers
            boolean durable =
                Boolean.valueOf(props.getProperty("durability")).booleanValue();
            if (durable) {
                publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
        } catch (Exception e2) {
            System.err.println("Unable to start publisher: " +
                               e2.toString());
            return false;
        }
        System.out.println("OpenJMSTestSender is waiting ...\n");
        return true;
    }

    public void run() {
        try {
            String input = "";
            BufferedReader is = new BufferedReader(
                new InputStreamReader(System.in));
            while (true) {
                System.out.println();
                System.out.print("Enter message: ");
                input = is.readLine();
                // Stop on end of input, a blank line, or "quit"
                if (input == null || input.equals("")
                        || input.equalsIgnoreCase("quit"))
                    break;
                TextMessage msg = conn.getSession().createTextMessage();
                msg.setText(input);
                publisher.publish(msg);
            }
        } catch (Exception e) {
            System.err.println("Error while publishing: " + e);
        }
        try {
            publisher.close();
        } catch (Exception e2) {
            // Ignore errors while closing
        }
        conn.close();
    }

    public static void main(String[] args) {
        OpenJMSTestSender ts = new OpenJMSTestSender();
        if (ts.init()) ts.run();
    }
}
Create a class for the receiver (consumer). Save the following as $DEVEL_HOME/src/OpenJMSTestReceiver.java:
package test.jms;

import java.io.FileInputStream;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ExceptionListener;
import javax.jms.TextMessage;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

public class OpenJMSTestReceiver implements MessageListener, ExceptionListener {
    private Properties props = new Properties();
    private OpenJMSConnection conn = null;
    private TopicSubscriber subscriber;

    public OpenJMSTestReceiver() {
    }

    public boolean init() {
        // The property file is passed with -Dconfig=...
        String propsFile = System.getProperty("config");
        if (propsFile == null || propsFile.equals("")) {
            System.err.println("Error: no configuration file specified");
            return false;
        }
        try {
            props.load(new FileInputStream(propsFile));
        } catch (Exception e) {
            System.err.println("Unable to load configuration from: " +
                               propsFile);
            return false;
        }
        conn = new OpenJMSConnection();
        // The subscriber name comes from -Dname=..., else from the property file
        String name = System.getProperty("name");
        if (name == null || name.equals(""))
            name = props.getProperty("default-client-id");
        System.out.println("My name is " + name);
        if (!conn.init(props)) return false;
        try {
            TopicSession session = conn.getSession();
            boolean durable =
                Boolean.valueOf(props.getProperty("durability")).booleanValue();
            if (durable) {
                subscriber = session.createDurableSubscriber(
                    conn.getTopic(), name, null, false);
            } else {
                subscriber = session.createSubscriber(conn.getTopic(), null,
                                                      false);
            }
            subscriber.setMessageListener(this);
            conn.getConnection().setExceptionListener(this);
        } catch (Exception e2) {
            System.err.println("Unable to register listeners: " +
                               e2.toString());
            return false;
        }
        System.out.println("OpenJMSTestReceiver is waiting ...");
        return true;
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                String txt = ((TextMessage) message).getText();
                System.out.println("RECEIVED: " + txt);
                if (txt.indexOf("SHUTDOWN") > -1) {
                    System.err.println("Received shutdown message ...");
                    subscriber.setMessageListener(null);
                    // Close from a separate thread, not from the listener itself
                    Thread shutdown = new Thread() {
                        public void run() {
                            OpenJMSTestReceiver.this.shutdown();
                        }
                    };
                    shutdown.start();
                }
            }
        } catch (Exception e) {
            System.err.println("Error while handling message: " + e);
        }
    }

    public void onException(JMSException exception) {
        System.err.println("Received onException notification");
        shutdown();
    }

    public void shutdown() {
        System.err.println("Shutting down receiver ...");
        conn.close();
        System.exit(1);
    }

    public static void main(String[] args) {
        OpenJMSTestReceiver tr = new OpenJMSTestReceiver();
        // Exit explicitly if setup fails, so no client threads keep the JVM alive
        if (!tr.init()) System.exit(1);
    }
}
Notice that a message containing the string "SHUTDOWN" causes the receiver to stop. Closing the session or connection from within onMessage is not safe, because close waits for active listeners to return, so we create a separate shutdown thread to do the job for us.
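The shutdown thread is one way to move the close call out of the listener. An alternative, shown here as a minimal sketch rather than as what the receiver above does, is to let the listener only raise a signal and have the main thread wait for it and then close the connection. The class ShutdownSignal is a hypothetical helper built on the JDK's CountDownLatch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: lets a listener thread ask the main thread to shut down.
final class ShutdownSignal {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Call from the listener (for example inside onMessage) with the message text.
    // Returns true if the text requested a shutdown.
    boolean onText(String txt) {
        if (txt != null && txt.indexOf("SHUTDOWN") > -1) {
            latch.countDown();   // never blocks, so it is safe inside a callback
            return true;
        }
        return false;
    }

    // Call from the main thread. Returns true once shutdown has been requested,
    // or false if the timeout elapsed or the wait was interrupted.
    boolean await(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this variant, main would loop on await after a successful init() and call conn.close() once it returns true, so the connection is always closed from a thread that is not delivering messages.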
Create the startup script that we'll use for both the sender and receiver. Save the following as $DEVEL_HOME/openjmstest.sh:
#!/bin/sh
_OPENJMS_HOME=/usr/local/openjms
_CLASSPATH=
_CLIENT_NAME="$2"

# Add the OpenJMS jars and our own test jar to the classpath
for i in "$_OPENJMS_HOME"/lib/*.jar; do
    _CLASSPATH="$_CLASSPATH":"$i"
done
for i in lib/*.jar; do
    _CLASSPATH="$_CLASSPATH":"$i"
done

send() {
    java -Dconfig=etc/openjms.props -classpath "$_CLASSPATH" \
        test.jms.OpenJMSTestSender
}

receive() {
    java -Dconfig=etc/openjms.props -Dname="$_CLIENT_NAME" -classpath "$_CLASSPATH" \
        test.jms.OpenJMSTestReceiver
}

case "$1" in
    'send')
        send
        ;;
    'receive')
        receive
        ;;
    *)
        echo "Usage: $0 { send | receive [client-name] }"
        exit 1
        ;;
esac
Change the location of _OPENJMS_HOME if necessary.
Make sure the script is executable (for example, chmod +x openjmstest.sh).
Compile the sources. This will create the file $DEVEL_HOME/lib/jmstests.jar:
$ cd $DEVEL_HOME
$ ant build
Start OpenJMS:
$ cd $OPENJMS_HOME/bin
$ ./openjms.sh start &
Publish some messages. Start the sender and keep entering text. If you enter the string "SHUTDOWN", the receiver will shut itself down gracefully after receiving all messages (this is not recommended in production, of course). A blank line or the word "quit" will stop the sender.
$ cd $DEVEL_HOME
$ ./openjmstest.sh send
Topic is test-topic
OpenJMSTestSender is waiting ...
Enter message: Hi there
Enter message: Hope you get this!
Enter message: Bye now
Enter message: SHUTDOWN
Enter message:
Before close session
Before close connection
$
Since we have created durable subscriptions, the messages you send are stored in the database tables. Go ahead and look for yourself in "messages" and "message_handles". Remember that we defined two subscribers in $OPENJMS_HOME/config/openjms.xml. The messages remain in the database until BOTH clients have received them.
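To make the retention rule concrete, here is a small in-memory model of it. This is only an illustrative sketch of the behavior described above, not how OpenJMS stores messages internally, and the class DurableTopicModel and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: a message is kept until every durable subscriber has received it.
final class DurableTopicModel {
    private final Set<String> subscribers = new HashSet<>();
    // message id -> subscribers that still have to receive that message
    private final Map<Integer, Set<String>> pending = new HashMap<>();
    private int nextId = 0;

    DurableTopicModel(String... names) {
        subscribers.addAll(Arrays.asList(names));
    }

    // Stores a new message with one outstanding handle per subscriber; returns its id.
    int publish() {
        int id = nextId++;
        pending.put(id, new HashSet<>(subscribers));
        return id;
    }

    // Records delivery to one subscriber and drops the message once nobody is waiting.
    void deliver(int id, String subscriber) {
        Set<String> waiting = pending.get(id);
        if (waiting == null) {
            return;   // unknown or already removed
        }
        waiting.remove(subscriber);
        if (waiting.isEmpty()) {
            pending.remove(id);
        }
    }

    // Number of messages still held for at least one subscriber.
    int storedMessages() {
        return pending.size();
    }
}
```

With subscribers test-client1 and test-client2, publishing four messages and delivering them only to test-client1 leaves four messages stored. They disappear once test-client2 has received them as well, which mirrors what you should see in the database during the next two steps.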
Now run the first client and get some messages:
$ cd $DEVEL_HOME
$ ./openjmstest.sh receive test-client1
My name is test-client1
Topic is test-topic
OpenJMSTestReceiver is waiting ...
RECEIVED: Hi there
RECEIVED: Hope you get this!
RECEIVED: Bye now
RECEIVED: SHUTDOWN
Received shutdown message ...
Shutting down receiver ...
Before close session
Before close connection
$
Now run the second client and get some messages:
$ cd $DEVEL_HOME
$ ./openjmstest.sh receive test-client2
My name is test-client2
Topic is test-topic
OpenJMSTestReceiver is waiting ...
RECEIVED: Hi there
RECEIVED: Hope you get this!
RECEIVED: Bye now
RECEIVED: SHUTDOWN
Received shutdown message ...
Shutting down receiver ...
Before close session
Before close connection
$
Now, if you are curious, you can go to the database and verify that there are no more rows in "messages" or "message_handles".
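By modifying the properties in $DEVEL_HOME/etc/openjms.props, you can switch to the RMI connection (access-mode=rmi) or to non-durable subscriptions (durability=false). Queues would need queue-based versions of these classes, since the code above uses only the topic API. Enjoy!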
When you are done, shut down OpenJMS:
$ cd $OPENJMS_HOME/bin
$ ./openjms.sh stop