Getting started with ActiveMQ on Unix
This FlashGuideTM will get you started with ActiveMQ. ActiveMQ implements the JMS 1.1 Specification. It is sponsored by Protique and is licensed under the Apache 2.0 license.
1. Installing ActiveMQ on UNIX
- Download the latest ActiveMQ binary from http://activemq.codehaus.org/Download. The current version is 2.0.
- Install ActiveMQ by unzipping/untaring the download file and placing in the desired directory (I used /usr/local)
cd /usr/local
tar zxf ./activemq-release-2.0.tar.gz
- Note the location of your ActiveMQ installation - we will refer to this as $ACTIVEMQ_HOME
- Optionally, save time on typing by creating a symbolic link like this:
ln -s activemq-release-2.0 activemq
2. Configuring ActiveMQ
- In this exercise, we are going to configure ActiveMQ for Berkeley DB Java Edition persistence and set up our own topic for testing publish/subscribe functions.
- Cd to the ActiveMQ installation directory:
cd $ACTIVEMQ_HOME
- Edit the ActiveMQ configuration file, conf/activemq.xml. Here is an example:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//ACTIVEMQ//DTD//EN" "http://activemq.codehaus.org/dtd/activemq.dtd">
<beans>
<broker>
<connector>
<tcpServerTransport uri="tcp://localhost:61616" backlog="1000" useAsyncSend="true" maxOutstandingMessages="50"/>
</connector>
<persistence>
<berkeleyDbPersistence directory="db/bdb"/>
<!--jdbmPersistence directory="db/jdbm"/-->
</persistence>
</broker>
</beans>
Note that the default JDBM persistence is commented out. You can always use that instead and use Apache Derby for persistence. Apache Derby comes built-in to ActiveMQ.
The "directory" attribute in either persistence tag is relative to the $ACTIVEMQ_HOME/bin directory.
- If you don't have it, download Berkely DB Java Edition from http://www.sleepycat.com/products/je.shtml.
- Copy the Berkeley DB jar to $ACTIVEMQ_HOME/lib:
cp $JE_HOME/lib/je.jar $ACTIVEMQ_HOME/lib
- If you want, it might be a good idea to turn on debugging for ActiveMQ. To do this, edit $ACTIVEMQ_HOME/conf/log4j.properties and change the first line to:
log4j.rootLogger=DEBUG, stdout, logfile
3. Testing ActiveMQ
- Check your prerequisites:
- Make sure you have a Java Development Kit (JDK) installed, and the environment variable $JAVA_HOME set to the JDK installation directory. You should also have the $JAVA_HOME/bin directory in your $PATH. (For Windows users, make sure you have %JAVA_HOME set and %JAVA_HOME\bin in your %PATH
- Make sure you have Ant 1.5.1 or greater installed and $ANT_HOME set. If not, follow these steps:
Set up a build environment for the test. We will be creating three source files, one configuration file and one shell script. We'll call the development directory that contains all our files $DEVEL_HOME.
Create the following, in some build area:
cd ~
mkdir activemqtest
export DEVEL_HOME=~/activemqtest
cd activemqtest
mkdir classes etc lib src
Create an Ant buildfile a $DEVEL_HOME/build.xml:
<project default="build" basedir=".">
<property name="activemq_home" value="/usr/local/activemq"/>
<path id="cp">
<pathelement path="${java.class.path}"/>
<fileset dir="${activemq_home}/lib">
<include name="*.jar"/>
</fileset>
<fileset dir="lib">
<include name="*.jar"/>
</fileset>
</path>
<target name="build">
<javac srcdir="src" destdir="classes"
debug="on" optimize="off" deprecation="off">
<classpath refid="cp"/>
</javac>
<jar jarfile="lib/activemqtest.jar">
<fileset dir="classes"
includes="**/*.class"/>
</jar>
</target>
</project>
You may have to change the location of the "activemq_home" property.
Create the ActiveMQ property file, as $DEVEL_HOME/etc/activemq.props:
jms-user=defaultUser
jms-pwd=defaultPassword
jms-url=tcp://localhost:61616
mode=topic
transact-mode=false
ack-mode=1
durability=true
subject=test-topic
consumer-id=ActiveMQClient
producer-id=ActiveMQProducer
consumer-name=Test
Create a class that encapsulates a connection to ActiveMQ. This class will be used by both the sender and receiver. Save the following as $DEVEL_HOME/src/AMQConnection.java:
package test.jms;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
public class AMQConnection {
private Connection connection = null;
private Session session = null;
private Destination destination;
private boolean ready = false;
public AMQConnection() {
}
public boolean init(Properties props, String clientId) {
try {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(props.getProperty("jms-user"),
props.getProperty("jms-pwd"),
props.getProperty("jms-url"));
Connection connection = connectionFactory.createConnection();
boolean durable =
Boolean.valueOf(props.getProperty("durability")).booleanValue();
if (durable) {
connection.setClientID(clientId);
}
connection.start();
session = connection.createSession(
Boolean.valueOf(props.getProperty("transact-mode")).booleanValue(),
Integer.parseInt(props.getProperty("ack-mode")));
String mode = props.getProperty("mode");
if (mode.equals("topic"))
destination = session.createTopic(props.getProperty("subject"));
else
destination = session.createQueue(props.getProperty("subject"));
ready = true;
} catch (Exception e) {
}
return ready;
}
public void close() {
try {
System.out.println("Closing session ...");
if (session != null) session.close();
} catch (Exception e1) {
System.err.println("AMQConnection.close: " + e1.toString());
}
try {
System.out.println("Closing connection ...");
if (connection != null) connection.close();
} catch (Exception e2) {
System.err.println("AMQConnection.close: " + e2.toString());
}
}
public Session getSession() {
return session;
}
public Destination getDestination() {
return destination;
}
}
Create a class for the sender (publisher). Save the following as $DEVEL_HOME/src/AMQTestSender.java:
package test.jms;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Properties;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.Session;
public class AMQTestSender {
private AMQConnection conn = null;
private MessageProducer producer;
public AMQTestSender() {
}
public boolean init() {
String propsFile = System.getProperty("config");
if (propsFile == null || propsFile.equals("")) {
System.err.println("Error: no configuration file specified");
return false;
}
Properties props = new Properties();
try {
props.load(new FileInputStream(propsFile));
} catch (Exception e) {
System.err.println("Unable to load configuration from: " +
propsFile);
return false;
}
conn = new AMQConnection();
if (!conn.init(props, props.getProperty("producer-id"))) return false;
try {
Session session = conn.getSession();
producer = session.createProducer(conn.getDestination());
boolean durable =
Boolean.valueOf(props.getProperty("durability")).booleanValue();
if (durable) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
} catch (Exception e2) {
System.err.println("Unable to start publisher: " +
e2.toString());
return false;
}
System.out.println("AMQTestSender is waiting ...\n");
return true;
}
public void run() {
BufferedReader is = null;
try {
String input = "";
is = new BufferedReader(new InputStreamReader(System.in));
while (true) {
System.out.println();
System.out.print("Enter message: ");
input = is.readLine();
if (input.equals("") || input.compareToIgnoreCase("quit") == 0 || input.compareToIgnoreCase("exit") == 0)
break;
TextMessage msg = conn.getSession().createTextMessage();
msg.setText(input);
producer.send(msg);
//Thread.sleep(0L);
}
} catch (Exception e) {
System.err.println("AMQTestSender.run: " + e.toString());
} finally {
try {
if (is != null) is.close();
} catch (Exception e2) { }
}
conn.close();
System.exit(1);
}
public static void main(String[] args) {
AMQTestSender ts = new AMQTestSender();
if (ts.init()) ts.run();
}
}
Create a class for the receiver (consumer). Save the following as $DEVEL_HOME/src/AMQTestReceiver.java:
package test.jms;
import java.io.FileInputStream;
import java.util.Hashtable;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Session;
import javax.jms.Topic;
public class AMQTestReceiver implements MessageListener {
private Properties props = new Properties();
private AMQConnection conn = null;
private MessageConsumer consumer = null;
private ShutdownHook shutdownHook = null;
public AMQTestReceiver() {
}
public boolean init() {
String propsFile = System.getProperty("config");
if (propsFile == null || propsFile.equals("")) {
System.err.println("Error: no configuration file specified");
return false;
}
try {
props.load(new FileInputStream(propsFile));
} catch (Exception e) {
System.err.println("Unable to load configuration from: " +
propsFile);
return false;
}
conn = new AMQConnection();
if (!conn.init(props, props.getProperty("consumer-id"))) return false;
try {
Session session = conn.getSession();
boolean durable =
Boolean.valueOf(props.getProperty("durability")).booleanValue();
String mode = props.getProperty("mode");
if (durable && mode.equals("topic")) {
consumer = session.createDurableSubscriber(
(Topic)conn.getDestination(),
props.getProperty("consumer-name"));
} else {
consumer = session.createConsumer(conn.getDestination());
}
consumer.setMessageListener(this);
} catch (Exception e2) {
System.err.println("Unable to register listeners: " +
e2.toString());
return false;
}
System.out.println("AMQTestReceiver is waiting ...");
return true;
}
public void onMessage(Message message) {
try {
System.out.println("AMQTestReceiver: received " +
message.getJMSMessageID());
if (message instanceof TextMessage) {
String txt = ((TextMessage)message).getText();
System.out.println("\n" + txt);
if (txt.indexOf("SHUTDOWN") > -1) {
System.err.println("Received shutdown message ...");
consumer.setMessageListener(null);
Thread shutdown = new Thread() {
public void run() {
AMQTestReceiver.this.shutdown();
}
};
shutdown.start();
}
}
} catch (Exception e) {
}
}
public void shutdown() {
System.out.println("Shutting down receiver ...");
try {
if (shutdownHook != null)
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (Throwable t) { }
try {
consumer.close();
} catch (Exception e) { }
conn.close();
System.exit(1);
}
public static void main(String[] args) {
AMQTestReceiver tr = new AMQTestReceiver();
tr.init();
}
protected class ShutdownHook extends Thread {
public void run() {
AMQTestReceiver.this.shutdown();
}
}
}
Notice that the special message containing the string "SHUTDOWN" will cause the client to stop. Typically, you cannot stop the receiver from the onMessage method, and so we create a shutdown Thread to do the job for us.
Create the startup script that we'll use for both the sender and receiver. Save the following as $DEVEL_HOME/activemqtest.sh:
#!/bin/sh
_ACTIVEMQ_HOME=/usr/local/activemq
_CLASSPATH=
for i in "$_ACTIVEMQ_HOME"/lib/*.jar; do
_CLASSPATH="$_CLASSPATH":"$i"
done
for i in lib/*.jar; do
_CLASSPATH="$_CLASSPATH":"$i"
done
send() {
java -Dconfig=etc/activemq.props -classpath "$_CLASSPATH" test.jms.AMQTestSender
}
receive() {
java -Dconfig=etc/activemq.props -classpath "$_CLASSPATH" test.jms.AMQTestReceiver
}
case "$1" in
'send')
send
;;
'receive')
receive
;;
*)
echo "Usage: $0 { send | receive }"
exit 1
;;
esac
Change the location of _ACTIVEMQ_HOME if necessary.
Make sure the script is executable.
Compile the sources. This will create the file $DEVEL_HOME/lib/activemqtest.jar:
$ cd $DEVEL_HOME
$ ant build
Start ActiveMQ:
$ cd $ACTIVEMQ_HOME/bin
$ ./activemq &
If this is your first time running this example, you must start the subscriber client first so that it can identify itself to ActiveMQ. Then you can shut it down immediately.
$ cd $DEVEL_HOME
$ ./activemqtest.sh receive
CTRL-C
$
Publish some messages. Start the sender and keep entering text. If you enter the string "SHUTDOWN", the receiver will shut itself down gracefully after receiving all messages (this is not recommended in production, of course). A blank line, or the word "quit" or "exit" will stop the sender.
$ cd $DEVEL_HOME
$ ./activemqtest.sh send
AMQTestSender is waiting ...
Enter message: Hi there
Enter message: Hope you get this!
Enter message: Bye now
Enter message: SHUTDOWN
Enter message:
Before close session
Before close connection
$
Now run the client and get the messages:
$ cd $DEVEL_HOME
$ ./activemqtest.sh receive
Before create durable subscriber
AMQTestReceiver is waiting ...
AMQTestReceiver: received ID:galadriel-39668-1109644140293-7:0
Hi there
AMQTestReceiver: received ID:galadriel-39668-1109644140293-7:1
Hope you get this!
AMQTestReceiver: received ID:galadriel-39668-1109644140293-7:2
Bye now
AMQTestReceiver: received ID:galadriel-39668-1109644140293-7:3
SHUTDOWN
Received shutdown message ...
Shutting down receiver ...
Closing session ...
Closing connection ...
$
Back to Table of Contents
|