#include "ConnectionFactory.h"

#include <unistd.h>
00002
00006 string * attemptHeader(BufferedFile * file);
00010 int server(int port);
00011
00012 ConnectionFactory::ConnectionFactory(BufferedFileFactory * bf,int start) {
00013 init(bf,start,start+1,start+2,start+3);
00014 }
00015 ConnectionFactory::ConnectionFactory(BufferedFileFactory * bf,int inputPort,int outputPort,int filterPort,int configPort) {
00016 init(bf,inputPort,outputPort,filterPort,configPort);
00017 }
00018
00019 bool ConnectionFactory::isThereANewConnection() {
00020
00021
00022
00023
00024
00025 int newConnections = 0;
00026
00027 if (inputBF->readReady()) {
00028 newConnections++;
00029 }
00030 if (outputBF->readReady()) {
00031 newConnections++;
00032 }
00033 if (filterBF->readReady()) {
00034 newConnections++;
00035 }
00036 if (configBF->readReady()) {
00037 newConnections++;
00038 }
00039 return (newConnections > 0);
00040 }
00041
00042 #define MAXHEADERSIZE 4096
00043
00044 string * attemptHeader(BufferedFile * file) {
00045 char a[MAXHEADERSIZE+1];
00046 char * data = NULL;
00047 string work;
00048 string hStart = "<header>";
00049 string hClose = "</header>";
00050 int oldWait = file->getWait();
00051 string * buff = NULL;
00052 file->setWait(10000);
00053
00054 if (file->ready(BUFFER_READ)) {
00055 int count = file->Read((char*)a,MAXHEADERSIZE);
00056 if (count > 0) {
00057 a[count] = '\0';
00058 work = string(a);
00059 int headerStart = work.find("<header>",0);
00060 int headerEnd = work.find("</header>",0);
00061 if (headerStart!= (int)string::npos && headerEnd != (int)string::npos && headerStart < headerEnd && headerEnd) {
00062 buff = new string(work.substr(headerStart,headerEnd + hClose.size()));
00063 headerEnd = headerEnd+hClose.size();
00064 } else {
00065 headerStart = headerEnd = 0;
00066 }
00067 if (headerEnd < count) {
00068 data = new char[count-headerEnd];
00069 memcpy(data,(char*)a+(headerEnd),count-headerEnd);
00070 file->pushBack(data,count-headerEnd);
00071 }
00072 }
00073 }
00074 file->setWait(oldWait);
00075 return buff;
00076 }
00077 BufferedFile * ConnectionFactory::acceptAndGetFile(int fdin) {
00078 struct sockaddr_in clientname;
00079 int size = sizeof (struct sockaddr);
00080 int fd = accept(fdin, (struct sockaddr *) &clientname, &(socklen_t)size);
00081 cerr << "FileDescriptor: " << fd << "\n";
00082 return bf->getNewBufferedFile(fd);
00083 }
/**
 * Returns the larger of two integers (b when they are equal).
 */
int max(int a, int b) {
    return (a > b) ? a : b;
}
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102 ConnectionWrap * ConnectionFactory::processConnection() {
00103 BufferedFile * file = NULL;
00104 string * header = NULL;
00105 Connection<char> * charConn = NULL;
00106 Connection<short> * shortConn = NULL;
00107 Connection<float> * floatConn = NULL;
00108 Connection<double> * doubleConn = NULL;
00109
00110 if (configBF->readReady()) {
00111
00112 file = acceptAndGetFile(configBF->getFileHandle());
00113 ConfigConnection * config = new ConfigConnection(file);
00114 ConnectionWrap * wrap = new ConnectionWrap(config,CONFIGCONN);
00115 return wrap;
00116 }
00117 bool input,output,filter;
00118 input = output = filter = false;
00119 string name="";
00120
00121 if (inputBF->readReady()) {
00122 file = acceptAndGetFile(inputfd);
00123 input = true;
00124 name+="input";
00125
00126 } else if (outputBF->readReady()) {
00127 file = acceptAndGetFile(outputfd);
00128 output = true;
00129 name+="ouput";
00130
00131 } else if (filterBF->readReady()) {
00132 file = acceptAndGetFile(filterfd);
00133 filter = true;
00134 name+="filter";
00135 } else {
00136 return NULL;
00137 }
00138 char number[12];
00139 snprintf(number,12,"%d",file->getFileHandle());
00140 name+=string(number);
00141 cerr << "FD: " << file->getFileHandle() << "\n";
00142 header = attemptHeader(file);
00143 string type = "short";
00144 if (header!=NULL) {
00145 Message * msg = new Message(*header);
00146 delete header;
00147 TreeNode * node = msg->getTree();
00148 try {
00149 if (node->existChild("header")) {
00150 node = node->getChild("header");
00151 if (node->existChild("name")) {
00152 name = node->getChild("name")->getValue();
00153 }
00154 if (node->existChild("type")) {
00155 type = node->getChild("type")->getValue();
00156 }
00157 }
00158 } catch (string * err) {}
00159 delete msg;
00160 }
00161
00162
00163
00164 ConnectionWrap * wrap = new ConnectionWrap();
00165 if (input) {
00166 file->setReadOnly();
00167 } else if (output) {
00168 file->setWriteOnly();
00169 }
00170 if (type == "short") {
00171 if (input) { (shortConn) = (Connection<short>*)(new InputConnection<short>(file,name)); shortConn->setType("short");}
00172 else if (output) { (shortConn) = (Connection<short>*)(new OutputConnection<short>(file,name));shortConn->setType("short");}
00173 else if (filter) { (shortConn) = (Connection<short>*)(new FilterConnection<short>(file,name));shortConn->setType("short");}
00174
00175 wrap->type = SHORTCONN;
00176 wrap->conn.shortConn = shortConn;
00177 } else if (type == "byte") {
00178 if (input) { (charConn) = (Connection<char>*)(new InputConnection<short>(file,name)); charConn->setType("byte");}
00179 else if (output) { (charConn) = (Connection<char>*)(new OutputConnection<short>(file,name));charConn->setType("byte");}
00180 else if (filter) { (charConn) = (Connection<char>*)(new FilterConnection<short>(file,name));charConn->setType("byte");}
00181
00182 wrap->type = CHARCONN;
00183 wrap->conn.charConn = charConn;
00184 } else if (type == "float") {
00185 if (input) { (floatConn) = (Connection<float>*)new InputConnection<short>(file,name); floatConn->setType("float");}
00186 else if (output) { (floatConn) = (Connection<float>*)new OutputConnection<short>(file,name);floatConn->setType("float");}
00187 else if (filter) { (floatConn) = (Connection<float>*)new FilterConnection<short>(file,name);floatConn->setType("float");}
00188
00189 wrap->type = FLOATCONN;
00190 wrap->conn.floatConn = floatConn;
00191 } else if (type == "double") {
00192 if (input) { (doubleConn) = (Connection<double>*) new InputConnection<short>(file,name); doubleConn->setType("double");}
00193 else if (output) { (doubleConn) = (Connection<double>*) new OutputConnection<short>(file,name);doubleConn->setType("double");}
00194 else if (filter) { (doubleConn) = (Connection<double>*) new FilterConnection<short>(file,name);doubleConn->setType("double");}
00195
00196 wrap->type = DOUBLECONN;
00197 wrap->conn.doubleConn = doubleConn;
00198 }
00199 if (wrap == NULL || wrap->conn.voidStar == NULL) {
00200 return NULL;
00201 }
00202 return wrap;
00203 }
00204
00205 void ConnectionFactory::init(BufferedFileFactory * bff,int inputPort,int outputPort,int filterPort,int configPort) {
00206 if (inputPort) {
00207 inputfd = server(inputPort);
00208 }
00209 if (outputPort) {
00210 outputfd = server(outputPort);
00211 }
00212 if (filterPort) {
00213 filterfd = server(filterPort);
00214 }
00215 if (configPort) {
00216 configfd = server(configPort);
00217 }
00218
00219
00220
00221
00222
00223
00224
00225 bf = bff;
00226 if (inputPort) {
00227 inputBF = bf->getNewBufferedFile(inputfd);
00228 inputBF->setReadOnly();
00229 } else {
00230 inputBF = new BufferedFile(-1);
00231 }
00232 if (outputPort) {
00233 outputBF = bf->getNewBufferedFile(outputfd);
00234 outputBF->setReadOnly();
00235 } else {
00236 outputBF = new BufferedFile(-1);
00237 }
00238 if (filterPort) {
00239 filterBF = bf->getNewBufferedFile(filterfd);
00240 filterBF->setReadOnly();
00241 } else {
00242 filterBF = new BufferedFile(-1);
00243 }
00244 if (configPort) {
00245 configBF = bf->getNewBufferedFile(configfd);
00246 configBF->setReadOnly();
00247 } else {
00248 configBF = new BufferedFile(-1);
00249 }
00250 }
00251
/**
 * Opens a TCP socket listening on the given port on all local addresses.
 *
 * The listen backlog is 30. On success a message naming the port is written
 * to stderr. When socket(), bind() or listen() fails, the error is reported
 * with perror(), the socket (if any) is closed and -1 is returned.
 *
 * @param port TCP port to listen on, in host byte order
 * @return listening socket descriptor, or -1 on failure
 */
int server(int port) {
    int serverfd = socket(AF_INET,SOCK_STREAM,0);
    if (serverfd < 0) {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr,0,sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(port);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(serverfd, (struct sockaddr *)&serveraddr,sizeof(serveraddr)) < 0) {
        perror("bind");
        close(serverfd);
        return -1;
    }
    if (listen(serverfd,30) < 0) {
        perror("listen");
        close(serverfd);
        return -1;
    }

    fprintf(stderr,"Waiting for connection on port %d\n",port);
    return serverfd;
}
00265 ConnectionFactory::~ConnectionFactory() {
00266 inputBF->Close();
00267 outputBF->Close();
00268 filterBF->Close();
00269 configBF->Close();
00270 delete inputBF;
00271 delete outputBF;
00272 delete filterBF;
00273 delete configBF;
00274 }