/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.peer;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.BrokerFactoryHandler;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;

/**
 * Transport factory that connects through a {@link VMTransportFactory} whose
 * broker is created on demand by this class.
 * <p>
 * The host part of the location is used as the group name and the path part
 * as the broker name. The created {@link BrokerService} gets a TCP connector
 * on {@code tcp://0.0.0.0:0} with a multicast discovery URI for the group, and
 * a network connector on the same multicast URI.
 * <p>
 * Binding is not supported; see {@link #doBind(URI)}.
 */
public class PeerTransportFactory extends TransportFactory {

    // NOTE(review): these three public maps are not read or written anywhere in
    // this class. They are kept, with their original raw types, because they
    // are part of the public API and may be referenced from other classes.
    public static final ConcurrentHashMap BROKERS = new ConcurrentHashMap();
    public static final ConcurrentHashMap CONNECTORS = new ConcurrentHashMap();
    public static final ConcurrentHashMap SERVERS = new ConcurrentHashMap();

    /** Source of broker names for locations that do not carry one. */
    private static final IdGenerator ID_GENERATOR = new IdGenerator("peer-");

    /**
     * Connects via a VM transport factory configured for the given location.
     *
     * @param location the peer location (host = group, path = broker name)
     * @return the transport returned by the underlying VM transport factory
     * @throws Exception if the factory cannot be created or the connect fails
     */
    public Transport doConnect(URI location) throws Exception {
        VMTransportFactory vmTransportFactory = createTransportFactory(location);
        return vmTransportFactory.doConnect(location);
    }

    /**
     * Composite-connects via a VM transport factory configured for the given
     * location.
     *
     * @param location the peer location (host = group, path = broker name)
     * @return the transport returned by the underlying VM transport factory
     * @throws Exception if the factory cannot be created or the connect fails
     */
    public Transport doCompositeConnect(URI location) throws Exception {
        VMTransportFactory vmTransportFactory = createTransportFactory(location);
        return vmTransportFactory.doCompositeConnect(location);
    }

    /**
     * Builds a {@link VMTransportFactory} that always connects to
     * {@code vm://<broker>} and that creates the broker through a
     * {@link BrokerFactoryHandler} defined here.
     *
     * @param location the peer location; its host is the group name (falling
     *        back to {@code "default"}), its path is the broker name (falling
     *        back to a generated id), and its parameters are applied to the
     *        broker as properties
     * @return the configured VM transport factory
     * @throws IOException if the derived {@code vm://} URI is not valid
     */
    private VMTransportFactory createTransportFactory(URI location) throws IOException {
        try {
            String group = location.getHost();
            if (group == null) {
                group = "default";
            }

            // URI.getPath() is null for opaque URIs (e.g. "peer:foo"); guard it
            // here so the generated-id fallback below is reached instead of
            // handing a null to stripPrefix.
            String path = location.getPath();
            String broker = path == null ? null : URISupport.stripPrefix(path, "/");
            if (broker == null || broker.length() == 0) {
                broker = ID_GENERATOR.generateSanitizedId();
            }

            final Map<String, String> brokerOptions =
                new HashMap<String, String>(URISupport.parseParameters(location));
            // Brokers are non-persistent unless the location says otherwise.
            if (!brokerOptions.containsKey("persistent")) {
                brokerOptions.put("persistent", "false");
            }

            final URI finalLocation = new URI("vm://" + broker);
            final String finalBroker = broker;
            final String discoveryUri = "multicast://default?group=" + group;

            // The URI handed to the overrides is deliberately ignored: every
            // connect is redirected to the vm:// location computed above.
            VMTransportFactory rc = new VMTransportFactory() {
                @Override
                public Transport doConnect(URI ignore) throws Exception {
                    return super.doConnect(finalLocation);
                }

                @Override
                public Transport doCompositeConnect(URI ignore) throws Exception {
                    return super.doCompositeConnect(finalLocation);
                }
            };
            rc.setBrokerFactoryHandler(new BrokerFactoryHandler() {
                public BrokerService createBroker(URI brokerURI) throws Exception {
                    BrokerService service = new BrokerService();
                    IntrospectionSupport.setProperties(service, brokerOptions);
                    service.setBrokerName(finalBroker);
                    TransportConnector c = service.addConnector("tcp://0.0.0.0:0");
                    c.setDiscoveryUri(new URI(discoveryUri));
                    service.addNetworkConnector(discoveryUri);
                    return service;
                }
            });
            return rc;

        } catch (URISyntaxException e) {
            throw IOExceptionSupport.create(e);
        }
    }

    /**
     * Always fails: this transport cannot be bound.
     *
     * @param location ignored
     * @return never returns normally
     * @throws IOException always
     */
    public TransportServer doBind(URI location) throws IOException {
        throw new IOException("This protocol does not support being bound.");
    }

}