001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020
021import org.apache.activemq.command.WireFormatInfo;
022import org.apache.activemq.wireformat.WireFormat;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026/**
027 * Used to make sure that commands are arriving periodically from the peer of
028 * the transport.
029 */
030public class InactivityMonitor extends AbstractInactivityMonitor {
031
032    private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
033
034    private WireFormatInfo localWireFormatInfo;
035    private WireFormatInfo remoteWireFormatInfo;
036
037    private boolean ignoreRemoteWireFormat = false;
038    private boolean ignoreAllWireFormatInfo = false;
039
040    public InactivityMonitor(Transport next, WireFormat wireFormat) {
041        super(next, wireFormat);
042        if (this.wireFormat == null) {
043            this.ignoreAllWireFormatInfo = true;
044        }
045    }
046
047    protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
048        IOException error = null;
049        remoteWireFormatInfo = info;
050        try {
051            startMonitorThreads();
052        } catch (IOException e) {
053            error = e;
054        }
055        if (error != null) {
056            onException(error);
057        }
058    }
059
060    protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
061        localWireFormatInfo = info;
062        startMonitorThreads();
063    }
064
065    @Override
066    protected synchronized void startMonitorThreads() throws IOException {
067        if (isMonitorStarted()) {
068            return;
069        }
070
071        long readCheckTime = getReadCheckTime();
072
073        if (readCheckTime > 0) {
074            setWriteCheckTime(writeCheckValueFromReadCheck(readCheckTime));
075        }
076
077        super.startMonitorThreads();
078    }
079
080    private long writeCheckValueFromReadCheck(long readCheckTime) {
081        return readCheckTime>3 ? readCheckTime/3 : readCheckTime;
082    }
083
084    @Override
085    protected boolean configuredOk() throws IOException {
086        boolean configured = false;
087        if (ignoreAllWireFormatInfo) {
088            configured = true;
089        } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
090            if (!ignoreRemoteWireFormat) {
091                if (LOG.isDebugEnabled()) {
092                    LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
093                }
094
095                long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
096                long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
097
098                setReadCheckTime(readCheckTime);
099                setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
100                setWriteCheckTime(writeCheckTime);
101
102            } else {
103                if (LOG.isDebugEnabled()) {
104                    LOG.debug("Using local: " + localWireFormatInfo);
105                }
106
107                long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
108                long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
109
110                setReadCheckTime(readCheckTime);
111                setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());
112                setWriteCheckTime(writeCheckTime);
113            }
114            configured = true;
115        }
116
117        return configured;
118    }
119
120    public boolean isIgnoreAllWireFormatInfo() {
121        return ignoreAllWireFormatInfo;
122    }
123
124    public void setIgnoreAllWireFormatInfo(boolean ignoreAllWireFormatInfo) {
125        this.ignoreAllWireFormatInfo = ignoreAllWireFormatInfo;
126    }
127
128    public boolean isIgnoreRemoteWireFormat() {
129        return ignoreRemoteWireFormat;
130    }
131
132    public void setIgnoreRemoteWireFormat(boolean ignoreRemoteWireFormat) {
133        this.ignoreRemoteWireFormat = ignoreRemoteWireFormat;
134    }
135}