ActiveMQ 是 Apache 开源的一款功能强大的消息中间件,支持多种协议(如 JMS、AMQP 等),广泛应用于分布式系统中的异步通信和解耦场景。通过 ActiveMQ,可以实现生产者和消费者之间的消息传递,从而提高系统的可扩展性和可靠性。
下载与启动
bin
目录,运行 activemq start
启动服务,默认监听端口为 61616
(用于消息通信)和 8161
(管理控制台)。http://localhost:8161/admin
,默认用户名和密码为 admin/admin
。依赖引入 在 Maven 项目中添加以下依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.5</version>
</dependency>
生产者代码:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 获取连接
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(destination);
// 发送消息
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("Message " + i);
producer.send(message);
System.out.println("Sent: " + message.getText());
}
}
}
}
消费者代码:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test.queue");
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (message != null) {
System.out.println("Received: " + message.getText());
} else {
break;
}
}
}
}
}
生产者代码(与 Queue 模式类似,只需将 createQueue
替换为 createTopic
):
Destination destination = session.createTopic("test.topic");
消费者代码:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
connection.start();
// 设置客户端ID(Topic模式需要)
connection.setClientID("consumer1");
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createTopic("test.topic");
// 创建持久化订阅者
MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriptionName");
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (message != null) {
System.out.println("Received: " + message.getText());
message.acknowledge(); // 手动确认
} else {
break;
}
}
}
}
}
DeliveryMode.PERSISTENT
实现。ActiveMQ 是一款成熟的消息中间件,适用于多种业务场景。通过本文的实战案例,读者可以快速上手并掌握其基本用法。结合实际需求,还可以进一步探索其高级特性,如事务支持、集群部署等。