001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.ft;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.activemq.Service;
026import org.apache.activemq.broker.BrokerService;
027import org.apache.activemq.broker.BrokerServiceAware;
028import org.apache.activemq.broker.TransportConnector;
029import org.apache.activemq.command.BrokerInfo;
030import org.apache.activemq.command.Command;
031import org.apache.activemq.command.CommandTypes;
032import org.apache.activemq.command.ConnectionId;
033import org.apache.activemq.command.ConnectionInfo;
034import org.apache.activemq.command.MessageDispatch;
035import org.apache.activemq.command.ProducerInfo;
036import org.apache.activemq.command.Response;
037import org.apache.activemq.command.SessionInfo;
038import org.apache.activemq.command.ShutdownInfo;
039import org.apache.activemq.transport.DefaultTransportListener;
040import org.apache.activemq.transport.Transport;
041import org.apache.activemq.transport.TransportDisposedIOException;
042import org.apache.activemq.transport.TransportFactory;
043import org.apache.activemq.util.IdGenerator;
044import org.apache.activemq.util.ServiceStopper;
045import org.apache.activemq.util.ServiceSupport;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Connects a Slave Broker to a Master when using <a
051 * href="http://activemq.apache.org/masterslave.html">Master Slave</a> for High
052 * Availability of messages.
053 * 
054 * @org.apache.xbean.XBean
055 * 
056 */
057public class MasterConnector implements Service, BrokerServiceAware {
058
059    private static final Logger LOG = LoggerFactory.getLogger(MasterConnector.class);
060    private BrokerService broker;
061    private URI remoteURI;
062    private URI localURI;
063    private Transport localBroker;
064    private Transport remoteBroker;
065    private TransportConnector connector;
066    private AtomicBoolean started = new AtomicBoolean(false);
067    private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
068    private final IdGenerator idGenerator = new IdGenerator();
069    private String userName;
070    private String password;
071    private ConnectionInfo connectionInfo;
072    private SessionInfo sessionInfo;
073    private ProducerInfo producerInfo;
074    private final AtomicBoolean masterActive = new AtomicBoolean();
075    private BrokerInfo brokerInfo;
076    private boolean firstConnection=true;
077    private boolean failedToStart;
078
079    public MasterConnector() {
080    }
081
082    public MasterConnector(String remoteUri) throws URISyntaxException {
083        remoteURI = new URI(remoteUri);
084    }
085
086    public void setBrokerService(BrokerService broker) {
087        this.broker = broker;
088        if (localURI == null) {
089            localURI = broker.getVmConnectorURI();
090        }
091        if (connector == null) {
092            List transportConnectors = broker.getTransportConnectors();
093            if (!transportConnectors.isEmpty()) {
094                connector = (TransportConnector)transportConnectors.get(0);
095            }
096        }
097    }
098
099    public boolean isSlave() {
100        return masterActive.get();
101    }
102
103    protected void restartBridge() throws Exception {
104        localBroker.oneway(connectionInfo);
105        remoteBroker.oneway(connectionInfo);
106        localBroker.oneway(sessionInfo);
107        remoteBroker.oneway(sessionInfo);
108        remoteBroker.oneway(producerInfo);
109        remoteBroker.oneway(brokerInfo);
110    }
111    
112    public void start() throws Exception {
113        if (!started.compareAndSet(false, true)) {
114            return;
115        }
116        if (remoteURI == null) {
117            throw new IllegalArgumentException("You must specify a remoteURI");
118        }
119        localBroker = TransportFactory.connect(localURI);
120        remoteBroker = TransportFactory.connect(remoteURI);
121        LOG.info("Starting a slave connection between " + localBroker + " and " + remoteBroker);
122        localBroker.setTransportListener(new DefaultTransportListener() {
123
124            public void onCommand(Object command) {
125            }
126
127            public void onException(IOException error) {
128                if (started.get()) {
129                    serviceLocalException(error);
130                }
131            }
132        });
133        remoteBroker.setTransportListener(new DefaultTransportListener() {
134
135            public void onCommand(Object o) {
136                Command command = (Command)o;
137                if (started.get()) {
138                    serviceRemoteCommand(command);
139                }
140            }
141
142            public void onException(IOException error) {
143                if (started.get()) {
144                    serviceRemoteException(error);
145                }
146            }
147            
148            public void transportResumed() {
149                try{
150                        if(!firstConnection){
151                                localBroker = TransportFactory.connect(localURI);
152                                localBroker.setTransportListener(new DefaultTransportListener() {
153        
154                                public void onCommand(Object command) {
155                                }
156        
157                                public void onException(IOException error) {
158                                    if (started.get()) {
159                                        serviceLocalException(error);
160                                    }
161                                }
162                            });
163                                localBroker.start();
164                                restartBridge();
165                                LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished.");
166                        }else{
167                                firstConnection=false;
168                        }
169                }catch(IOException e){
170                        LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e);
171                }catch(Exception e){
172                        LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e);
173                }
174                
175            }
176        });
177        try {
178            localBroker.start();
179            remoteBroker.start();
180            startBridge();
181            masterActive.set(true);
182        } catch (Exception e) {
183            masterActive.set(false);
184            if(!stoppedBeforeStart.get()){
185                LOG.error("Failed to start network bridge: " + e, e);
186            }else{
187                LOG.info("Slave stopped before connected to the master.");
188            }
189            setFailedToStart(true);
190        }    
191    }
192
193    protected void startBridge() throws Exception {
194        connectionInfo = new ConnectionInfo();
195        connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
196        connectionInfo.setClientId(idGenerator.generateId());
197        connectionInfo.setUserName(userName);
198        connectionInfo.setPassword(password);
199        connectionInfo.setBrokerMasterConnector(true);
200        sessionInfo = new SessionInfo(connectionInfo, 1);
201        producerInfo = new ProducerInfo(sessionInfo, 1);
202        producerInfo.setResponseRequired(false);
203        if (connector != null) {
204            brokerInfo = connector.getBrokerInfo();
205        } else {
206            brokerInfo = new BrokerInfo();
207        }
208        brokerInfo.setBrokerName(broker.getBrokerName());
209        brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
210        brokerInfo.setSlaveBroker(true);
211        brokerInfo.setPassiveSlave(broker.isPassiveSlave());
212        restartBridge();
213        LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
214    }
215
216    public void stop() throws Exception {
217        if (!started.compareAndSet(true, false)||!masterActive.get()) {
218            return;
219        }
220        masterActive.set(false);
221        try {
222            // if (connectionInfo!=null){
223            // localBroker.request(connectionInfo.createRemoveCommand());
224            // }
225            // localBroker.setTransportListener(null);
226            // remoteBroker.setTransportListener(null);
227            remoteBroker.oneway(new ShutdownInfo());
228            localBroker.oneway(new ShutdownInfo());
229        } catch (IOException e) {
230            LOG.debug("Caught exception stopping", e);
231        } finally {
232            ServiceStopper ss = new ServiceStopper();
233            ss.stop(localBroker);
234            ss.stop(remoteBroker);
235            ss.throwFirstException();
236        }
237    }
238    
239    public void stopBeforeConnected()throws Exception{
240        masterActive.set(false);
241        started.set(false);
242        stoppedBeforeStart.set(true);
243        ServiceStopper ss = new ServiceStopper();
244        ss.stop(localBroker);
245        ss.stop(remoteBroker);
246    }
247
248    protected void serviceRemoteException(IOException error) {
249        LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
250        shutDown();
251    }
252
253    protected void serviceRemoteCommand(Command command) {
254        try {
255            if (command.isMessageDispatch()) {
256                MessageDispatch md = (MessageDispatch)command;
257                command = md.getMessage();
258            }
259            if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
260                LOG.warn("The Master has shutdown");
261                shutDown();
262            } else {
263                boolean responseRequired = command.isResponseRequired();
264                int commandId = command.getCommandId();
265                if (responseRequired) {
266                    Response response = (Response)localBroker.request(command);
267                    response.setCorrelationId(commandId);
268                    remoteBroker.oneway(response);
269                } else {
270                    localBroker.oneway(command);
271                }
272            }
273        } catch (IOException e) {
274            serviceRemoteException(e);
275        }
276    }
277
278    protected void serviceLocalException(Throwable error) {
279        if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){
280                LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
281                ServiceSupport.dispose(this);
282        }else{
283                LOG.info(error.getMessage());
284        }
285    }
286
287    /**
288     * @return Returns the localURI.
289     */
290    public URI getLocalURI() {
291        return localURI;
292    }
293
294    /**
295     * @param localURI The localURI to set.
296     */
297    public void setLocalURI(URI localURI) {
298        this.localURI = localURI;
299    }
300
301    /**
302     * @return Returns the remoteURI.
303     */
304    public URI getRemoteURI() {
305        return remoteURI;
306    }
307
308    /**
309     * @param remoteURI The remoteURI to set.
310     */
311    public void setRemoteURI(URI remoteURI) {
312        this.remoteURI = remoteURI;
313    }
314
315    /**
316     * @return Returns the password.
317     */
318    public String getPassword() {
319        return password;
320    }
321
322    /**
323     * @param password The password to set.
324     */
325    public void setPassword(String password) {
326        this.password = password;
327    }
328
329    /**
330     * @return Returns the userName.
331     */
332    public String getUserName() {
333        return userName;
334    }
335
336    /**
337     * @param userName The userName to set.
338     */
339    public void setUserName(String userName) {
340        this.userName = userName;
341    }
342
343    private void shutDown() {
344        masterActive.set(false);
345        broker.masterFailed();
346        ServiceSupport.dispose(this);
347    }
348
349        public boolean isStoppedBeforeStart() {
350                return stoppedBeforeStart.get();
351        }
352
353    /**
354     * Get the failedToStart
355     * @return the failedToStart
356     */
357    public boolean isFailedToStart() {
358        return this.failedToStart;
359    }
360
361    /**
362     * Set the failedToStart
363     * @param failedToStart the failedToStart to set
364     */
365    public void setFailedToStart(boolean failedToStart) {
366        this.failedToStart = failedToStart;
367    }
368
369}