Java中使用ActiveMQ作为消息中间件的实战经验

2025-04发布8次浏览

Java中使用ActiveMQ作为消息中间件的实战经验

一、什么是ActiveMQ

ActiveMQ 是 Apache 开源的一款功能强大的消息中间件,支持多种协议(如 JMS、AMQP 等),广泛应用于分布式系统中的异步通信和解耦场景。通过 ActiveMQ,可以实现生产者和消费者之间的消息传递,从而提高系统的可扩展性和可靠性。

二、ActiveMQ 的核心概念

  1. Broker:ActiveMQ 的核心组件,负责接收、存储和转发消息。
  2. Producer:消息生产者,负责向 Broker 发送消息。
  3. Consumer:消息消费者,负责从 Broker 接收消息。
  4. Queue 和 Topic
    • Queue:点对点模式,每条消息只有一个消费者。
    • Topic:发布/订阅模式,每条消息可以被多个消费者接收。

三、ActiveMQ 的安装与配置

  1. 下载与启动

    • 下载地址:ActiveMQ 官网
    • 解压后进入 bin 目录,运行 activemq start 启动服务,默认监听端口为 61616(用于消息通信)和 8161(管理控制台)。
    • 访问管理界面:http://localhost:8161/admin,默认用户名和密码为 admin/admin
  2. 依赖引入 在 Maven 项目中添加以下依赖:

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.16.5</version>
    </dependency>
    

四、Java 实战代码示例

1. Queue 模式

生产者代码

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 获取连接
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Destination destination = session.createQueue("test.queue");
            MessageProducer producer = session.createProducer(destination);
            // 发送消息
            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("Message " + i);
                producer.send(message);
                System.out.println("Sent: " + message.getText());
            }
        }
    }
}

消费者代码

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test.queue");
            MessageConsumer consumer = session.createConsumer(destination);
            // 接收消息
            while (true) {
                TextMessage message = (TextMessage) consumer.receive(1000);
                if (message != null) {
                    System.out.println("Received: " + message.getText());
                } else {
                    break;
                }
            }
        }
    }
}
2. Topic 模式

生产者代码(与 Queue 模式类似,只需将 createQueue 替换为 createTopic):

Destination destination = session.createTopic("test.topic");

消费者代码

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try (Connection connection = factory.createConnection()) {
            connection.start();
            // 设置客户端ID(Topic模式需要)
            connection.setClientID("consumer1");
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createTopic("test.topic");
            // 创建持久化订阅者
            MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriptionName");
            while (true) {
                TextMessage message = (TextMessage) consumer.receive(1000);
                if (message != null) {
                    System.out.println("Received: " + message.getText());
                    message.acknowledge(); // 手动确认
                } else {
                    break;
                }
            }
        }
    }
}

五、注意事项

  1. 性能优化:在高并发场景下,可以通过调整线程池、缓存连接等方式提升性能。
  2. 持久化:确保消息不会因 Broker 崩溃而丢失,可通过设置 DeliveryMode.PERSISTENT 实现。
  3. 监控与管理:利用 ActiveMQ 提供的 Web 控制台或 JMX 接口进行实时监控。

六、总结

ActiveMQ 是一款成熟的消息中间件,适用于多种业务场景。通过本文的实战案例,读者可以快速上手并掌握其基本用法。结合实际需求,还可以进一步探索其高级特性,如事务支持、集群部署等。