- Apache ActiveMQ 教程
- Apache ActiveMQ - 主页
- Apache ActiveMQ - 概述
- Apache ActiveMQ - 环境设置
- Apache ActiveMQ - 功能
- Apache ActiveMQ - 运行代理服务器
- Apache ActiveMQ - 管理控制台
- 基于 Apache ActiveMQ 队列的示例
- Apache ActiveMQ - 生产者应用程序
- Apache ActiveMQ - 消费者应用程序
- Apache ActiveMQ - 测试应用程序
- Apache ActiveMQ 基于主题的示例
- Apache ActiveMQ - 发布者应用程序
- Apache ActiveMQ - 订阅者应用程序
- Apache ActiveMQ - 测试应用程序
- Apache ActiveMQ 有用资源
- Apache ActiveMQ - 快速指南
- Apache ActiveMQ - 有用的资源
- Apache ActiveMQ - 讨论
Apache ActiveMQ - 快速指南
Apache ActiveMQ - 概述
什么是 ActiveMQ?
ActiveMQ 是一个用 Java 编写的开源消息代理。它完全符合 JMS 1.1 标准。它由 Apache Software Foundation 开发和维护,并根据 Apache 许可证获得许可。它为企业级消息传递应用程序提供高可用性、可扩展性、可靠性、性能和安全性。
JMS 是一种允许开发基于消息的系统的规范。ActiveMQ 充当应用程序之间的消息代理,并允许它们以异步且可靠的方式进行通信。
消息传递类型
为了更好地理解,下面解释了两种类型的消息传递选项。
点对点
在这种类型的通信中,代理仅向一个消费者发送消息,而其他消费者将等待,直到从代理获取消息。没有消费者会收到相同的消息。
如果没有消费者,Broker 将保留消息,直到找到消费者。这种类型的通信也称为基于队列的通信,其中生产者将消息发送到队列,并且只有一个消费者从队列中获取一条消息。如果有多个消费者,他们可能会收到下一条消息,但不会收到与其他消费者相同的消息。
发布/订阅
在这种类型的通信中,代理将相同的消息副本发送给所有活跃的消费者。这种类型的通信也称为基于主题的通信,其中代理向订阅特定主题的所有活跃消费者发送相同的消息。该模型支持单向通信,无需验证传输的消息。
Apache ActiveMQ - 环境设置
本章将指导您如何准备开发环境以开始使用 ActiveMQ。它还将教您如何在设置 ActiveMQ 之前在计算机上设置 JDK、Maven 和 Eclipse -
设置 Java 开发工具包 (JDK)
您可以从 Oracle 的 Java 站点 - Java SE 下载下载最新版本的 SDK 。您将在下载的文件中找到安装 JDK 的说明,按照给定的说明进行安装和配置设置。最后设置 PATH 和 JAVA_HOME 环境变量以引用包含 java 和 javac 的目录,通常分别为 java_install_dir/bin 和 java_install_dir。
如果您运行的是 Windows 并已将 JDK 安装在 C:\jdk-11.0.11 中,则必须将以下行放入 C:\autoexec.bat 文件中。
set PATH=C:\jdk-11.0.11;%PATH% set JAVA_HOME=C:\jdk-11.0.11
或者,在 Windows NT/2000/XP 上,您必须右键单击“我的电脑”,选择“属性”→“高级”→“环境变量”。然后,您必须更新 PATH 值并单击“确定”按钮。
在 Unix(Solaris、Linux 等)上,如果 SDK 安装在 /usr/local/jdk-11.0.11 中并且您使用 C shell,则必须将以下内容放入 .cshrc 文件中。
setenv PATH /usr/local/jdk-11.0.11/bin:$PATH setenv JAVA_HOME /usr/local/jdk-11.0.11
或者,如果您使用集成开发环境 (IDE),如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio,则必须编译并运行一个简单的程序来确认 IDE 知道您安装了 Java 的位置。否则,您将必须按照 IDE 文档中的规定进行正确的设置。
设置 Eclipse IDE
本教程中的所有示例都是使用 Eclipse IDE 编写的。因此,我们建议您应该在计算机上安装最新版本的 Eclipse。
要安装 Eclipse IDE,请从www.eclipse.org/downloads/下载最新的 Eclipse 二进制文件。下载安装后,将二进制发行版解压到一个方便的位置。例如,在 Windows 上的 C:\eclipse 中,或在 Linux/Unix 上的 /usr/local/eclipse 中,最后适当地设置 PATH 变量。
Eclipse 可以通过在 Windows 机器上执行以下命令来启动,或者只需双击 eclipse.exe
%C:\eclipse\eclipse.exe
可以通过在 Unix(Solaris、Linux 等)机器上执行以下命令来启动 Eclipse -
$/usr/local/eclipse/eclipse
成功启动后,如果一切正常,那么它应该显示以下结果 -
设置Maven
在本教程中,我们使用 Maven 来运行和构建基于 Spring 的示例,以运行基于 ActiveMQ 的应用程序。按照Maven - 环境设置安装 maven。
下载ActiveMQ
您可以从其官方页面下载ActiveMQ的最新稳定版本。按照下载 ActivMQ下载 ActiveMQ。我们使用的是 2022 年 2 月 15 日发布的 5.13.4。将存档内容提取到您选择的文件夹中。我们已经解压到F:/ → Apache → apache-activemq-5.16.4。
Apache ActiveMQ - 功能
ActiveMQ 旨在为企业级消息应用程序提供高可用性、可扩展性、可靠性、性能和安全性。以下是 ActiveMQ 的一些显着特性。
符合 JMS - ActiveMQ 完全符合 JMS 1.1 标准。JMS 规范提供了同步或异步消息传递、一次性消息传递、订阅者消息持久性等的标准机制。
连接选项- ActiveMQ 支持 HTTP/S、多播、SSL、Stomp、TCP、UDP、XMPP,从而提供广泛的连接选项,并允许各种系统使用其选择的协议进行通信。
可插拔架构- ActiveMQ 允许选择持久性机制,还提供根据应用程序需求定制身份验证和授权安全性的选项。
多平台- ActiveMQ 为许多流行语言(如 Java、C、C++、.NET、Perl、PHP、Python、Ruby 等)提供客户端 API。ActiveMQ Broker 将在 JVM 中运行,但客户端可以使用任何支持的语言编写。
代理集群- ActiveMQ 允许准备代理网络以实现可扩展性,并且可以支持不同类型的拓扑。
功能丰富- ActiveMQ 为代理和客户端提供许多高级功能,并支持 Apache Camel。
简单的管理界面- ActiveMQ 管理控制台易于使用,但仍然提供许多强大的管理功能。
Apache ActiveMQ - 运行代理服务器
我们已经下载了ActiveMQ - 环境设置中提到的最新版本的 ActiveMQ 。现在转到文件夹F:/ → Apache → apache-activemq-5.16.4/bin并输入以下命令。
例子
F:\Apache\apache-activemq-5.16.4\bin>activemq start
输出
您将看到类似的输出,并且 ActiveMQ 将开始运行。
Java Runtime: Oracle Corporation 11.0.11 C:\Program Files\Java\jdk-11.0.11 Heap sizes: current=1048576k free=1041918k max=1048576k JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=F:\Apache\apache-activemq-5.16.4\bin\..\conf\login.config -Dactivemq.classpath=F:\Apache\apache-activemq-5.16.4\bin\..\conf;F:\Apache\apache-activemq-5.16.4\bin\../conf;F:\Apache\apache-activemq-5.16.4\bin\../conf; -Dactivemq.home=F:\Apache\apache-activemq-5.16.4\bin\.. -Dactivemq.base=F:\Apache\apache-activemq-5.16.4\bin\.. -Dactivemq.conf=F:\Apache\apache-activemq-5.16.4\bin\..\conf -Dactivemq.data=F:\Apache\apache-activemq-5.16.4\bin\..\data -Djava.io.tmpdir=F:\Apache\apache-activemq-5.16.4\bin\..\data\tmp Extensions classpath: [F:\Apache\apache-activemq-5.16.4\bin\..\lib,F:\Apache\apache-activemq-5.16.4\bin\..\lib\camel,F:\Apache\apache-activemq-5.16.4\bin\..\lib\optional,F:\Apache\apache-activemq-5.16.4\bin\..\lib\web,F:\Apache\apache-activemq-5.16.4\bin\..\lib\extra] ACTIVEMQ_HOME: F:\Apache\apache-activemq-5.16.4\bin\.. ACTIVEMQ_BASE: F:\Apache\apache-activemq-5.16.4\bin\.. ACTIVEMQ_CONF: F:\Apache\apache-activemq-5.16.4\bin\..\conf ACTIVEMQ_DATA: F:\Apache\apache-activemq-5.16.4\bin\..\data Loading message broker from: xbean:activemq.xml INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@53fe15ff: startup date [Sat Feb 26 12:50:18 IST 2022]; root of context hierarchy INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[F:\Apache\apache-activemq-5.16.4\bin\..\data\kahadb] INFO | PListStore:[F:\Apache\apache-activemq-5.16.4\bin\..\data\localhost\tmp_storage] started INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) is starting INFO | Listening for connections at: tcp://DESKTOP-86KD9FC:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector openwire started INFO | Listening for connections at: amqp://DESKTOP-86KD9FC:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector amqp started INFO | Listening for connections at: stomp://DESKTOP-86KD9FC:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector stomp started INFO | Listening for connections at: mqtt://DESKTOP-86KD9FC:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector mqtt started INFO | Starting Jetty server INFO | Creating Jetty connector WARN | ServletContext@o.e.j.s.ServletContextHandler@4f966719{/,null,STARTING} has uncovered http methods for path: / INFO | Listening for connections at ws://DESKTOP-86KD9FC:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector ws started INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started INFO | For help or more information please see: http://activemq.apache.org INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/ INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/
确认
现在在浏览器中打开http://127.0.0.1:8161/admin/ 。它会要求提供凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列、主题、连接等。
Apache ActiveMQ - 管理控制台
ActiveMQ 服务器启动并运行后。您可以使用管理控制台来管理队列、主题、订阅者、连接、网络等。
在浏览器中打开http://127.0.0.1:8161/admin/ 。它会要求提供凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列、主题、连接等。
队列
单击队列选项卡,输入队列名称 testQueue,然后单击创建按钮。现在您可以在列表中看到队列。
话题
同样,您可以创建主题并在“主题”选项卡中查看主题。
其他的
以同样的方式,您可以探索订阅者、连接、网桥、调度程序详细信息。
发送
“发送”选项卡允许通过指定目标和其他详细信息将 JMS 消息发送到特定队列或主题。
Apache ActiveMQ - 生产者应用程序
现在让我们创建一个生产者应用程序,它将向 ActiveMQ 队列发送消息。
创建项目
使用 eclipse,选择File → New → Maven Project。勾选“创建一个简单项目(跳过原型选择)”,然后单击“下一步”。
输入详细信息,如下所示 -
groupId - com.tutorialspoint
artifactId - 生产者
版本- 0.0.1-SNAPSHOT
名称- ActiveMQ 生产者
单击完成按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Producer</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个 Producer 类,它将向 ActiveMQ 队列发送消息。
package com.tutorialspoint.activemq; import java.io.Console; import java.util.Scanner; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue Destination destination = session.createQueue("MyFirstQueue"); // Create a producer specific to queue MessageProducer producer = session.createProducer(destination); Scanner input = new Scanner(System.in); String response; do { System.out.println("Enter message: "); response = input.nextLine(); // Create a message object TextMessage msg = session.createTextMessage(response); // Send the message to the queue producer.send(msg); } while (!response.equalsIgnoreCase("Quit")); input.close(); // Close the connection connection.close(); } }
生产者类创建一个连接,启动会话,创建一个生产者,然后要求用户输入消息。如果用户输入 quit,则应用程序终止,否则它将向队列发送消息。
我们将在ActiveMQ - 测试应用程序一章中运行此应用程序。
Apache ActiveMQ - 消费者应用程序
现在让我们创建一个将从 ActiveMQ 队列接收消息的消费者应用程序。
创建项目
使用 eclipse,选择File → New → Maven Project。勾选“创建一个简单项目(跳过原型选择)”,然后单击“下一步”。
输入详细信息,如下所示 -
groupId - com.tutorialspoint
artifactId - 消费者
版本- 0.0.1-SNAPSHOT
名称- ActiveMQ 消费者
单击完成按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Consumer</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个 Consumer 类,它将从 ActiveMQ 队列接收消息。
package com.tutorialspoint.activemq; import java.io.Console; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Consumer { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue Destination destination = session.createQueue("MyFirstQueue"); // Create a consumer specific to queue MessageConsumer consumer = session.createConsumer(destination); Console c = System.console(); String response; do { // Receive the message Message msg = consumer.receive(); response = ((TextMessage) msg).getText(); System.out.println("Received = "+response); } while (!response.equalsIgnoreCase("Quit")); // Close the connection connection.close(); } }
Consumer 类创建连接,启动会话,创建消费者,然后从队列接收消息(如果有)。如果队列包含退出消息,则应用程序终止,否则它将继续轮询队列中的消息。
我们将在ActiveMQ - 测试应用程序一章中运行此应用程序。
Apache ActiveMQ - 测试应用程序
启动ActiveMQ服务器
现在让我们启动 ActiveMQ 服务器。转到文件夹F:/ → Apache → apache-activemq-5.16.4/bin并输入以下命令。
例子
F:\Apache\apache-activemq-5.16.4\bin>activemq start
输出
您将看到类似的输出,并且 ActiveMQ 将开始运行。
... INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started INFO | For help or more information please see: http://activemq.apache.org INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/ INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/
启动生产者应用程序
在 Eclipse 中,右键单击 Producer.java 源,然后选择 Run As > Java Application。生产者应用程序将开始运行,您将看到如下输出 -
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message:
启动消费者应用程序
在 Eclipse 中,右键单击 Consumer.java 源,然后选择 Run As > Java Application。消费者应用程序将开始运行,您将看到如下输出 -
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
发信息
在 Producer 控制台窗口中,输入 Hi 并按 Enter 按钮发送消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message: Hi
接收消息
在消费者控制台窗口中验证是否收到消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Received = Hi
将 Quit 作为消息发送以终止生产者和消费者控制台窗口会话。
确认
现在在浏览器中打开http://127.0.0.1:8161/admin/ 。它会要求提供凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列以检查状态。它将显示 2 条已排队并已发送的消息。
Apache ActiveMQ - 发布者应用程序
现在让我们创建一个发布者应用程序,它将向 ActiveMQ 队列发送消息。
创建项目
使用 eclipse,选择File → New → Maven Project。勾选“创建一个简单项目(跳过原型选择)”,然后单击“下一步”。
输入详细信息,如下所示 -
groupId - com.tutorialspoint
artifactId - 发布者
版本- 0.0.1-SNAPSHOT
名称- ActiveMQ 发布者
单击完成按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>publisher</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Publisher</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个 Publisher 类,它将向 ActiveMQ 主题发送消息,以将其广播给所有订阅者。
package com.tutorialspoint.activemq; import java.io.Console; import java.util.Scanner; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Publisher { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a topic Destination destination = session.createTopic("MyFirstTopic"); // Create a publisher specific to topic MessageProducer publisher = session.createProducer(destination); Scanner input = new Scanner(System.in); String response; do { System.out.println("Enter message: "); response = input.nextLine(); // Create a message object TextMessage msg = session.createTextMessage(response); // Send the message to the topic publisher.send(msg); } while (!response.equalsIgnoreCase("Quit")); input.close(); // Close the connection connection.close(); } }
生产者类创建一个连接,启动会话,创建一个生产者,然后要求用户输入消息。如果用户输入 quit,则应用程序终止,否则它将向主题发送消息。
我们将在ActiveMQ - 测试应用程序一章中运行此应用程序。
Apache ActiveMQ - 订阅者应用程序
现在让我们创建一个订阅者应用程序,它将从 ActiveMQ 主题接收消息。
创建项目
使用 eclipse,选择File → New → Maven Project。勾选“创建一个简单项目(跳过原型选择)”,然后单击“下一步”。
输入详细信息,如下所示 -
groupId - com.tutorialspoint
artifactId - 订阅者
版本- 0.0.1-SNAPSHOT
名称- ActiveMQ 订阅者
单击完成按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>subscriber</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Subscriber</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个订阅者类,它将从 ActiveMQ 队列接收消息。
package com.tutorialspoint.activemq; import java.io.Console; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Subscriber { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a topic Destination destination = session.createTopic("MyFirstTopic"); // Create a subscriber specific to topic MessageConsumer subscriber = session.createConsumer(destination); Console c = System.console(); String response; do { // Receive the message Message msg = subscriber.receive(); response = ((TextMessage) msg).getText(); System.out.println("Received = "+response); } while (!response.equalsIgnoreCase("Quit")); // Close the connection connection.close(); } }
订阅者类创建一个连接,启动会话,创建一个消费者,然后从主题接收消息(如果有)。如果主题包含 quit 作为消息,则应用程序终止,否则它将继续轮询队列中的消息。
我们将在ActiveMQ - 测试应用程序一章中多次运行此应用程序来创建多个订阅者。
Apache ActiveMQ - 测试应用程序主题
启动ActiveMQ服务器
现在让我们启动 ActiveMQ 服务器。转到文件夹F:/ → Apache → apache-activemq-5.16.4/bin并输入以下命令。
例子
F:\Apache\apache-activemq-5.16.4\bin>activemq start
输出
您将看到类似的输出,并且 ActiveMQ 将开始运行。
... INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started INFO | For help or more information please see: http://activemq.apache.org INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/ INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/
启动发布者应用程序
在 Eclipse 中,右键单击 Publisher.java 源,然后选择 Run As → Java Application。发布者应用程序将开始运行,您将看到如下输出 -
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message:
启动订阅者应用程序
在 Eclipse 中,右键单击 Subscriber.java 源,然后选择 Run As → Java Application。订阅者应用程序将开始运行,您将看到如下输出 -
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
启动另一个订阅者应用程序
在 Eclipse 中,再次右键单击 Subscriber.java 源,然后选择 Run As → Java Application。另一个订阅者应用程序将开始运行,您将看到如下输出 -
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
发信息
在发布者控制台窗口中,键入 Hi 并按 Enter 按钮发送消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message: Hi
接收消息
在订阅者控制台窗口中验证是否在每个窗口中都收到了消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Received = Hi
将 Quit 作为消息发送以终止所有发布者和订阅者控制台窗口会话。
确认
现在在浏览器中打开http://127.0.0.1:8161/admin/ 。它会要求提供凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查主题以检查状态。它将显示多条已排队和已传递的消息。