001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020
021import org.apache.activemq.broker.region.DurableTopicSubscription;
022import org.apache.activemq.broker.region.RegionBroker;
023import org.apache.activemq.broker.region.Subscription;
024import org.apache.activemq.broker.region.TopicRegion;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ConsumerId;
027import org.apache.activemq.command.ConsumerInfo;
028import org.apache.activemq.command.RemoveSubscriptionInfo;
029import org.apache.activemq.filter.DestinationFilter;
030import org.apache.activemq.transport.Transport;
031import org.apache.activemq.util.NetworkBridgeUtils;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Consolidates subscriptions
037 */
038public class DurableConduitBridge extends ConduitBridge {
039    private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
040
041    @Override
042    public String toString() {
043        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
044    }
045    /**
046     * Constructor
047     *
048     * @param configuration
049     *
050     * @param localBroker
051     * @param remoteBroker
052     */
053    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
054                                Transport remoteBroker) {
055        super(configuration, localBroker, remoteBroker);
056    }
057
058    /**
059     * Subscriptions for these destinations are always created
060     *
061     */
062    @Override
063    protected void setupStaticDestinations() {
064        super.setupStaticDestinations();
065        ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
066        if (dests != null) {
067            for (ActiveMQDestination dest : dests) {
068                if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
069                    try {
070                        //Filtering by non-empty subscriptions, see AMQ-5875
071                        if (dest.isTopic()) {
072                            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
073                            TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
074
075                            String candidateSubName = getSubscriberName(dest);
076                            for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
077                                String subName = subscription.getConsumerInfo().getSubscriptionName();
078                                if (subName != null && subName.equals(candidateSubName)) {
079                                    DemandSubscription sub = createDemandSubscription(dest, subName);
080                                    sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
081                                    sub.setStaticallyIncluded(true);
082                                    addSubscription(sub);
083                                    break;
084                                }
085                            }
086                        }
087                    } catch (IOException e) {
088                        LOG.error("Failed to add static destination {}", dest, e);
089                    }
090                    LOG.trace("Forwarding messages for durable destination: {}", dest);
091                } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) {
092                    if (dest.isTopic()) {
093                        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
094                        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
095
096                        String candidateSubName = getSubscriberName(dest);
097                        for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
098                            String subName = subscription.getConsumerInfo().getSubscriptionName();
099                            if (subName != null && subName.equals(candidateSubName) &&
100                                    subscription instanceof DurableTopicSubscription) {
101                               try {
102                                    DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription;
103                                    //check the clientId so we only remove subs for the matching bridge
104                                    if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) {
105                                        // remove the NC subscription as it is no longer for a permissible dest
106                                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
107                                        sending.setClientId(localClientId);
108                                        sending.setSubscriptionName(subName);
109                                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
110                                        localBroker.oneway(sending);
111                                    }
112                                } catch (IOException e) {
113                                    LOG.debug("Exception removing NC durable subscription: {}", subName, e);
114                                    serviceRemoteException(e);
115                                }
116                                break;
117                            }
118                        }
119                    }
120                }
121            }
122        }
123    }
124
125    @Override
126    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
127        boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info,
128                dynamicallyIncludedDestinations, staticallyIncludedDestinations);
129
130        if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
131            return null; // don't want this subscription added
132        }
133        //add our original id to ourselves
134        info.addNetworkConsumerId(info.getConsumerId());
135        ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;
136
137        if(info.isDurable() || isForcedDurable) {
138            // set the subscriber name to something reproducible
139            info.setSubscriptionName(getSubscriberName(info.getDestination()));
140            // and override the consumerId with something unique so that it won't
141            // be removed if the durable subscriber (at the other end) goes away
142            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
143                               consumerIdGenerator.getNextSequenceId()));
144        }
145        info.setSelector(null);
146        DemandSubscription demandSubscription = doCreateDemandSubscription(info);
147        if (forcedDurableId != null) {
148            demandSubscription.addForcedDurableConsumer(forcedDurableId);
149            forcedDurableRemoteId.add(forcedDurableId);
150        }
151        return demandSubscription;
152    }
153
154    protected String getSubscriberName(ActiveMQDestination dest) {
155        String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
156        return subscriberName;
157    }
158
159    protected boolean doesConsumerExist(ActiveMQDestination dest) {
160        DestinationFilter filter = DestinationFilter.parseFilter(dest);
161        for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
162            if (filter.matches(ds.getLocalInfo().getDestination())) {
163                return true;
164            }
165        }
166        return false;
167    }
168}