`

Java与Flex通信

阅读更多

Flex连接Java Server

Flex首先向Server发送如下的策略文件请求:

 

// Policy-file request string that the Flex client sends to the server first
final static String security_req = "<policy-file-request/>"; 

   

继而Server端返回

 

// Cross-domain policy returned by the server: allows access from any domain to any port
public final String flexString = "<cross-domain-policy>\n" + " <allow-access-from domain=\"*\" to-ports=\"*\" />\n" + "</cross-domain-policy>";

 

可以开始正常通信。Client端和Server端输出如下:

 

Client:9099

> <policy-file-request/>
...sending <policy-file-request/>...
<cross-domain-policy>
  <allow-access-from  domain="*"  to-ports="*"  />
</cross-domain-policy>
> 123
...sending 123...
alarm

'''
Created on 2009-3-18

@author: Administrator
'''

from twisted.internet import protocol, reactor
from time import ctime

# Address and port of the server this client connects to
HOST = '192.168.0.93'
PORT = 9099

class TSClntProtocol(protocol.Protocol):
    """Interactive client protocol: prompt for a line, send it, print the reply."""

    def sendData(self):
        """Read one line from the console and send it.

        An empty line drops the connection instead of sending anything.
        """
        line = raw_input('> ')
        if not line:
            self.transport.loseConnection()
            return
        print('...sending %s...' % line)
        self.transport.write(line)

    def connectionMade(self):
        """Start the prompt/send cycle as soon as the connection is made."""
        self.sendData()

    def dataReceived(self, data):
        """Print the received data, then prompt for the next line to send."""
        print(data)
        self.sendData()
        
class TSClntFactory(protocol.ClientFactory):
    """Client factory that stops the reactor when the connection fails or is lost."""
    protocol = TSClntProtocol

    def clientConnectionFailed(self, connector, reason):
        """Stop the reactor; the connector and reason are ignored."""
        return reactor.stop()

    # A lost connection is handled exactly like a failed one.
    clientConnectionLost = clientConnectionFailed
    
# Connect to the server with a fresh client factory, then start the reactor
reactor.connectTCP(HOST, PORT, TSClntFactory())
reactor.run()

 

 

 

Client:9090

telnet 至9090端口并输入即可。

 

Server

package server.tcp.receive;

// $Id: ServerImpCopy.java,v 1.3 2009/03/27 03:59:05 cvsjyy Exp $

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import server.alarm.NBlockingServer;
import server.log.LoggerFactory;

/**
 * Selector-based, non-blocking TCP server.
 *
 * <p>All work happens on the thread that calls {@link #run()}: it accepts
 * incoming connections, reads whatever a client sends, echoes the bytes back
 * to the sender, writes the same bytes to every channel registered for
 * reading, and finally reports an "alarm" through {@link NBlockingServer}.
 */
public class ServerImpCopy implements IServer {
	/** Capacity in bytes of the shared read buffer. */
	private static final int READ_BUFFER_SIZE = 2048;

	// The port we will listen on
	private int port;

	// Pre-allocated buffer that every read goes through; it is cleared and
	// refilled on each call to processInput
	private ByteBuffer btBffr;

	// Listening channel, registered with the selector for OP_ACCEPT
	private ServerSocketChannel srvrScktChnnl;

	// Socket view of the listening channel, used to bind the port
	private ServerSocket srvrSckt;

	private InetSocketAddress isa;

	private Selector selector;

	private Logger logger = LoggerFactory.initLogger();

	// Stored by the constructor; not read anywhere in this class
	private Queue queue = null;

	/**
	 * @param queue
	 *            queue handed in by the caller; it is only stored.
	 */
	public ServerImpCopy(Queue queue) {
		this.queue = queue;
	}

	/**
	 * @param port
	 *            The port to set.
	 */
	public void setPort(int port) {
		this.port = port;
	}

	/**
	 * Allocates the read buffer, opens the listening channel in non-blocking
	 * mode, binds it to the configured port and registers it with a new
	 * selector for accept events.
	 *
	 * <p>If the port cannot be bound the error is logged and the JVM exits
	 * with status 1.
	 *
	 * @throws IOException
	 *             if the channel or the selector cannot be set up.
	 */
	private void init() throws IOException {
		this.btBffr = ByteBuffer.allocate(READ_BUFFER_SIZE);
		this.srvrScktChnnl = ServerSocketChannel.open();

		// Set it to non-blocking, so we can use select
		this.srvrScktChnnl.configureBlocking(false);

		// Get the Socket connected to this channel, and bind it
		// to the listening port
		this.srvrSckt = srvrScktChnnl.socket();
		this.isa = new InetSocketAddress(this.port);
		try {
			srvrSckt.bind(isa);
		} catch (BindException e) {
			// Keep the cause in the log so the reason for the failure is visible
			logger.error("Unable to bind to port " + isa, e);
			System.exit(1);
		}

		// Create a new Selector for selecting
		selector = Selector.open();

		// Register the ServerSocketChannel, so we can
		// listen for incoming connections
		srvrScktChnnl.register(selector, SelectionKey.OP_ACCEPT,
				new ChannelBuffer());
	}

	/**
	 * Initialises the server and then loops forever, dispatching every
	 * selected key to the accept or read handler.
	 *
	 * @throws IOException
	 *             if initialisation, selection or accepting a connection
	 *             fails; errors on an individual client connection are
	 *             handled by closing that connection instead.
	 */
	public void run() throws IOException {
		init();
		while (true) {
			// Block until there is an incoming connection or incoming data
			// on an existing connection
			int num = selector.select();
			if (num == 0) {
				continue;
			}

			Set<SelectionKey> keys = selector.selectedKeys();
			Iterator<SelectionKey> it = keys.iterator();
			while (it.hasNext()) {
				SelectionKey key = it.next();
				// The key is dealt with in this pass, so take it out of the
				// selected set right away
				it.remove();

				// A key may have been cancelled earlier in this same pass
				// (for example by a failed broadcast write); querying its
				// ready set would then throw CancelledKeyException
				if (!key.isValid()) {
					continue;
				}

				if (key.isAcceptable()) {
					acceptConnection();
				} else if (key.isReadable()) {
					handleRead(key);
				}
			}
		}
	}

	/**
	 * Accepts one pending connection, switches it to non-blocking mode and
	 * registers it with the selector for reading.
	 *
	 * @throws IOException
	 *             if accepting or registering the connection fails.
	 */
	private void acceptConnection() throws IOException {
		logger.log(Level.INFO, "accept");

		// Accept through the channel: on a non-blocking channel this returns
		// null when nothing is pending, whereas the ServerSocket adaptor
		// throws IllegalBlockingModeException in that situation
		SocketChannel client = srvrScktChnnl.accept();
		if (client == null) {
			return;
		}
		logger.log(Level.INFO, "Got connection from " + client.socket());

		// Make sure to make it non-blocking, so we can use a selector on it
		client.configureBlocking(false);
		client.register(selector, SelectionKey.OP_READ, new ChannelBuffer());
	}

	/**
	 * Handles readable data on one connection: reads it, echoes it to the
	 * sender, broadcasts it and reports an alarm. A connection that yields no
	 * data, or fails with an I/O error, is closed and deregistered.
	 *
	 * @param key
	 *            selection key of the readable connection.
	 */
	private void handleRead(SelectionKey key) {
		SocketChannel sender = (SocketChannel) key.channel();
		try {
			ChannelBuffer channelBuffer = (ChannelBuffer) key.attachment();
			byte[] result = processInput(sender, channelBuffer);

			// No data means the connection is dead: close it and stop here
			// rather than trying to echo and broadcast on a closed channel
			if (result.length == 0) {
				logger.info("release the attachement 1.");
				closeConnection(key, sender);
				return;
			}

			/*
			 * echo
			 */
			ByteBuffer buf = ByteBuffer.wrap(result);
			int nbytes = sender.write(buf);
			logger.info("nbytes is: " + nbytes);
			if (buf.hasRemaining()) {
				// NOTE: a non-blocking write may accept only part of the
				// buffer; the remainder is not retried here
				logger.warn("echo truncated, " + buf.remaining()
						+ " bytes not written to " + sender);
			}

			/*
			 * broadcast
			 */
			broadcast(result);

			/* send to flex */
			NBlockingServer ns = NBlockingServer.getNBlockingServer();
			ns.reportAlarm("alarm");
		} catch (IOException ie) {
			// On exception, remove this channel from the selector
			logger.info("release the attachement 2.");
			closeConnection(key, sender);
			logger.error("Closed " + sender);
		}
	}

	/**
	 * Writes the payload to every channel currently registered for reading.
	 * The sender is registered for reading too, so it receives the payload
	 * here in addition to the echo. A recipient whose write fails is closed
	 * on its own; the other recipients are unaffected.
	 *
	 * @param payload
	 *            bytes to send to each recipient.
	 */
	private void broadcast(byte[] payload) {
		int i = 0;
		for (SelectionKey bKey : selector.keys()) {
			logger.info("flex connection:");
			logger.info(i);
			i++;

			// Skip cancelled keys (interestOps() would throw) and keys that
			// are not reading, such as the listening channel
			if (!bKey.isValid()
					|| (bKey.interestOps() & SelectionKey.OP_READ) == 0) {
				continue;
			}

			SocketChannel recipient = (SocketChannel) bKey.channel();
			try {
				// Fresh buffer per recipient so each write starts at position 0
				int tmpnbytes = recipient.write(ByteBuffer.wrap(payload));
				logger.info("tmpnbytes is: " + tmpnbytes);
				logger.info("ContentsAlarm send to Flex!");
			} catch (IOException ie) {
				// Close the recipient that failed, not the sender
				closeConnection(bKey, recipient);
				logger.error("Closed " + recipient);
			}
		}
	}

	/**
	 * Detaches the attachment, cancels the key and closes the channel. A
	 * failure while closing is logged and otherwise ignored.
	 *
	 * @param key
	 *            selection key to cancel.
	 * @param channel
	 *            channel to close.
	 */
	private void closeConnection(SelectionKey key, SocketChannel channel) {
		key.attach(null);
		key.cancel();
		try {
			channel.close();
		} catch (IOException ie) {
			logger.error("Error closing " + channel, ie);
		}
	}

	/**
	 * Reads the bytes currently available on the channel into the shared
	 * buffer and returns a copy of them.
	 *
	 * @param sc
	 *            channel to read from.
	 * @param channelBuffer
	 *            attachment of the channel's key; currently unused.
	 * @return the bytes read, or an empty array when nothing was read, which
	 *         the caller treats as a dead connection.
	 * @throws IOException
	 *             if the read fails.
	 */
	private byte[] processInput(SocketChannel sc, ChannelBuffer channelBuffer)
			throws IOException {
		logger.info("method of processInput start");

		btBffr.clear();
		int read = sc.read(btBffr);

		// If no data, report that so the caller closes the connection
		if (read <= 0) {
			return new byte[0];
		}

		btBffr.flip();
		byte[] result = new byte[btBffr.remaining()];
		btBffr.get(result);

		logger.info("Processed " + result.length + " from " + sc);
		logger.info("method of processInput end ");
		return result;
	}

	/**
	 * Starts the {@link NBlockingServer} on its own thread and then runs this
	 * server on port 9090 on the main thread.
	 */
	public static void main(String args[]) throws Exception {
		// send alarm to flex
		Thread t1 = new Thread(NBlockingServer.getNBlockingServer());
		t1.start();

		// main thread
		int port = 9090;
		ServerImpCopy tcpServer = new ServerImpCopy(Queue.getQueue());
		tcpServer.setPort(port);
		tcpServer.run();
	}

}

 

 

 

Server执行其他逻辑,在得到某些信息后,取得Socket的句柄,向Flex发送信息。另外,回想我们以前写过的NIO客户端回显及广播的程序:我们尝试将9090端口收到的数据回显给发送端,并向9090端口的所有Client广播;此外,还会向Flex(即9099端口的Client)发送一个alarm通知。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics