001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import javax.annotation.PostConstruct;
020import javax.annotation.PreDestroy;
021import javax.jms.Connection;
022import javax.jms.ConnectionFactory;
023import javax.jms.Destination;
024import javax.jms.ExceptionListener;
025import javax.jms.JMSException;
026import javax.jms.MessageConsumer;
027import javax.jms.Session;
028import org.apache.activemq.ActiveMQConnectionFactory;
029import org.apache.activemq.Service;
030import org.apache.activemq.advisory.AdvisorySupport;
031import org.apache.activemq.util.ServiceStopper;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * An agent which listens to commands on a JMS destination
037 * 
038 * 
039 * @org.apache.xbean.XBean
040 */
041public class CommandAgent implements Service, ExceptionListener {
042    private static final Logger LOG = LoggerFactory.getLogger(CommandAgent.class);
043
044    private String brokerUrl = "vm://localhost";
045    private String username;
046    private String password;
047    private ConnectionFactory connectionFactory;
048    private Connection connection;
049    private Destination commandDestination;
050    private CommandMessageListener listener;
051    private Session session;
052    private MessageConsumer consumer;
053
054    /**
055     *
056     * @throws Exception
057     * @org.apache.xbean.InitMethod
058     */
059    @PostConstruct
060    public void start() throws Exception {
061        session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
062        listener = new CommandMessageListener(session);
063        Destination destination = getCommandDestination();
064        if (LOG.isDebugEnabled()) {
065            LOG.debug("Agent subscribing to control destination: " + destination);
066        }
067        consumer = session.createConsumer(destination);
068        consumer.setMessageListener(listener);
069    }
070
071    /**
072     *
073     * @throws Exception
074     * @org.apache.xbean.DestroyMethod
075     */
076    @PreDestroy
077    public void stop() throws Exception {
078        ServiceStopper stopper = new ServiceStopper();
079        if (consumer != null) {
080            try {
081                consumer.close();
082                consumer = null;
083            } catch (JMSException e) {
084                stopper.onException(this, e);
085            }
086        }
087        if (session != null) {
088            try {
089                session.close();
090                session = null;
091            } catch (JMSException e) {
092                stopper.onException(this, e);
093            }
094        }
095        if (connection != null) {
096            try {
097                connection.close();
098                connection = null;
099            } catch (JMSException e) {
100                stopper.onException(this, e);
101            }
102        }
103        stopper.throwFirstException();
104    }
105
106    // Properties
107    // -------------------------------------------------------------------------
108    public String getBrokerUrl() {
109        return brokerUrl;
110    }
111
112    public void setBrokerUrl(String brokerUrl) {
113        this.brokerUrl = brokerUrl;
114    }    
115
116    public String getUsername() {
117                return username;
118        }
119
120        public void setUsername(String username) {
121                this.username = username;
122        }
123
124        public String getPassword() {
125                return password;
126        }
127
128        public void setPassword(String password) {
129                this.password = password;
130        }
131
132        public ConnectionFactory getConnectionFactory() {
133        if (connectionFactory == null) {
134            connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
135        }
136        return connectionFactory;
137    }
138
139    public void setConnectionFactory(ConnectionFactory connectionFactory) {
140        this.connectionFactory = connectionFactory;
141    }
142
143    public Connection getConnection() throws JMSException {
144        if (connection == null) {
145            connection = createConnection();
146            connection.setExceptionListener(this);
147            connection.start();
148        }
149        return connection;
150    }
151
152    public void setConnection(Connection connection) {
153        this.connection = connection;
154    }
155
156    public Destination getCommandDestination() {
157        if (commandDestination == null) {
158            commandDestination = createCommandDestination();
159        }
160        return commandDestination;
161    }
162
163    public void setCommandDestination(Destination commandDestination) {
164        this.commandDestination = commandDestination;
165    }
166
167    protected Connection createConnection() throws JMSException {
168        return getConnectionFactory().createConnection(username, password);
169    }
170
171    protected Destination createCommandDestination() {
172        return AdvisorySupport.getAgentDestination();
173    }
174
175    public void onException(JMSException exception) {
176        try {
177            stop();
178        } catch (Exception e) {
179        }
180    }
181}