001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.nio.channels.CancelledKeyException;
020import java.nio.channels.ClosedChannelException;
021import java.nio.channels.SelectionKey;
022import java.nio.channels.SocketChannel;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.activemq.transport.nio.SelectorManager.Listener;
026
027/**
028 * @author chirino
029 */
030public final class SelectorSelection {
031
032    private final SelectorWorker worker;
033    private final Listener listener;
034    private int interest;
035    private SelectionKey key;
036    private AtomicBoolean closed = new AtomicBoolean();
037
038    public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
039        this.worker = worker;
040        this.listener = listener;
041        worker.addIoTask(new Runnable() {
042            public void run() {
043                try {
044                    SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this);
045                } catch (Exception e) {
046                    e.printStackTrace();
047                }
048            }
049        });
050    }
051
052    public void setInterestOps(int ops) {
053        interest = ops;
054    }
055
056    public void enable() {
057        worker.addIoTask(new Runnable() {
058            public void run() {
059                try {
060                    key.interestOps(interest);
061                } catch (CancelledKeyException e) {
062                }
063            }
064        });        
065    }
066
067    public void disable() {
068        worker.addIoTask(new Runnable() {
069            public void run() {
070                try {
071                    key.interestOps(0);
072                } catch (CancelledKeyException e) {
073                }
074            }
075        });        
076    }
077
078    public void close() {
079        // guard against multiple closes.
080        if( closed.compareAndSet(false, true) ) {
081            worker.addIoTask(new Runnable() {
082                public void run() {
083                    try {
084                        key.cancel();
085                    } catch (CancelledKeyException e) {
086                    }
087                    worker.release();
088                }
089            });        
090        }
091    }
092
093    public void onSelect() {
094        listener.onSelect(this);
095    }
096
097    public void onError(Throwable e) {
098        listener.onError(this, e);
099    }
100
101}