001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.util; 018 019import java.io.IOException; 020 021import javax.jms.Destination; 022import javax.jms.JMSException; 023import javax.jms.Message; 024import javax.jms.MessageListener; 025import javax.jms.MessageProducer; 026import javax.jms.Session; 027import javax.jms.TextMessage; 028 029import org.apache.activemq.command.ActiveMQTextMessage; 030import org.apache.activemq.util.FactoryFinder; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * 036 */ 037public class CommandMessageListener implements MessageListener { 038 private static final Logger LOG = LoggerFactory.getLogger(CommandMessageListener.class); 039 040 private Session session; 041 private MessageProducer producer; 042 private CommandHandler handler; 043 044 public CommandMessageListener(Session session) { 045 this.session = session; 046 } 047 048 public void onMessage(Message message) { 049 if (LOG.isDebugEnabled()) { 050 LOG.debug("Received command: " + message); 051 } 052 if (message instanceof TextMessage) { 053 TextMessage request = (TextMessage)message; 054 try { 055 Destination replyTo = message.getJMSReplyTo(); 056 if (replyTo == null) { 057 LOG.warn("Ignored message as no JMSReplyTo set: " + message); 058 return; 059 } 060 Message response = processCommand(request); 061 addReplyHeaders(request, response); 062 getProducer().send(replyTo, response); 063 } catch (Exception e) { 064 LOG.error("Failed to process message due to: " + e + ". Message: " + message, e); 065 } 066 } else { 067 LOG.warn("Ignoring invalid message: " + message); 068 } 069 } 070 071 protected void addReplyHeaders(TextMessage request, Message response) throws JMSException { 072 String correlationID = request.getJMSCorrelationID(); 073 if (correlationID != null) { 074 response.setJMSCorrelationID(correlationID); 075 } 076 } 077 078 /** 079 * Processes an incoming JMS message returning the response message 080 */ 081 public Message processCommand(TextMessage request) throws Exception { 082 TextMessage response = session.createTextMessage(); 083 getHandler().processCommand(request, response); 084 return response; 085 } 086 087 /** 088 * Processes an incoming command from a console and returning the text to 089 * output 090 */ 091 public String processCommandText(String line) throws Exception { 092 TextMessage request = new ActiveMQTextMessage(); 093 request.setText(line); 094 TextMessage response = new ActiveMQTextMessage(); 095 getHandler().processCommand(request, response); 096 return response.getText(); 097 } 098 099 public Session getSession() { 100 return session; 101 } 102 103 public MessageProducer getProducer() throws JMSException { 104 if (producer == null) { 105 producer = getSession().createProducer(null); 106 } 107 return producer; 108 } 109 110 public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException { 111 if (handler == null) { 112 handler = createHandler(); 113 } 114 return handler; 115 } 116 117 private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException { 118 FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/"); 119 return (CommandHandler)factoryFinder.newInstance("agent"); 120 } 121}