001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import java.io.IOException;
020
021import javax.jms.Destination;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageListener;
025import javax.jms.MessageProducer;
026import javax.jms.Session;
027import javax.jms.TextMessage;
028
029import org.apache.activemq.command.ActiveMQTextMessage;
030import org.apache.activemq.util.FactoryFinder;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * 
036 */
037public class CommandMessageListener implements MessageListener {
038    private static final Logger LOG = LoggerFactory.getLogger(CommandMessageListener.class);
039
040    private Session session;
041    private MessageProducer producer;
042    private CommandHandler handler;
043
044    public CommandMessageListener(Session session) {
045        this.session = session;
046    }
047
048    public void onMessage(Message message) {
049        if (LOG.isDebugEnabled()) {
050            LOG.debug("Received command: " + message);
051        }
052        if (message instanceof TextMessage) {
053            TextMessage request = (TextMessage)message;
054            try {
055                Destination replyTo = message.getJMSReplyTo();
056                if (replyTo == null) {
057                    LOG.warn("Ignored message as no JMSReplyTo set: " + message);
058                    return;
059                }
060                Message response = processCommand(request);
061                addReplyHeaders(request, response);
062                getProducer().send(replyTo, response);
063            } catch (Exception e) {
064                LOG.error("Failed to process message due to: " + e + ". Message: " + message, e);
065            }
066        } else {
067            LOG.warn("Ignoring invalid message: " + message);
068        }
069    }
070
071    protected void addReplyHeaders(TextMessage request, Message response) throws JMSException {
072        String correlationID = request.getJMSCorrelationID();
073        if (correlationID != null) {
074            response.setJMSCorrelationID(correlationID);
075        }
076    }
077
078    /**
079     * Processes an incoming JMS message returning the response message
080     */
081    public Message processCommand(TextMessage request) throws Exception {
082        TextMessage response = session.createTextMessage();
083        getHandler().processCommand(request, response);
084        return response;
085    }
086
087    /**
088     * Processes an incoming command from a console and returning the text to
089     * output
090     */
091    public String processCommandText(String line) throws Exception {
092        TextMessage request = new ActiveMQTextMessage();
093        request.setText(line);
094        TextMessage response = new ActiveMQTextMessage();
095        getHandler().processCommand(request, response);
096        return response.getText();
097    }
098
099    public Session getSession() {
100        return session;
101    }
102
103    public MessageProducer getProducer() throws JMSException {
104        if (producer == null) {
105            producer = getSession().createProducer(null);
106        }
107        return producer;
108    }
109
110    public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException {
111        if (handler == null) {
112            handler = createHandler();
113        }
114        return handler;
115    }
116
117    private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException {
118        FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
119        return (CommandHandler)factoryFinder.newInstance("agent");
120    }
121}