00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 #ifndef ns_srm_h
00056 #define ns_srm_h
00057
00058 #include <math.h>
00059 #include <tcl.h>
00060
00061 #include "config.h"
00062
00063 #include "srm-state.h"
00064 #include "srm-headers.h"
00065
00066 class SRMAgent : public Agent {
00067 protected:
00068 int dataCtr_;
00069 int sessCtr_;
00070 int packetSize_;
00071 SRMinfo* sip_;
00072 Tcl_HashTable* siphash_;
00073 int groupSize_;
00074 int seqno_;
00075 int app_fid_;
00076 packet_t app_type_;
00077
00078 virtual void start() {
00079 int new_entry = 0;
00080 long key = addr();
00081
00082 sip_->sender_ = addr();
00083 sip_->distance_ = 0.0;
00084 sip_->next_ = NULL;
00085
00086 siphash_ = new Tcl_HashTable;
00087 Tcl_InitHashTable(siphash_, TCL_ONE_WORD_KEYS);
00088 Tcl_HashEntry* he = Tcl_CreateHashEntry(siphash_,
00089 (char*) key,
00090 &new_entry);
00091 Tcl_SetHashValue(he, (ClientData*)sip_);
00092 groupSize_++;
00093 }
00094 SRMinfo* get_state(int sender) {
00095 assert(siphash_);
00096
00097 int new_entry = 0;
00098 long key = sender;
00099 Tcl_HashEntry* he = Tcl_CreateHashEntry(siphash_,
00100 (char*) key,
00101 &new_entry);
00102 if (new_entry) {
00103 groupSize_++;
00104 SRMinfo* tmp = new SRMinfo(sender);
00105 tmp->next_ = sip_->next_;
00106 sip_->next_ = tmp;
00107 Tcl_SetHashValue(he, (ClientData*)tmp);
00108 }
00109 return (SRMinfo*)Tcl_GetHashValue(he);
00110 }
00111 virtual void cleanup () {
00112 Tcl_DeleteHashTable(siphash_);
00113 }
00114
00115 virtual void addExtendedHeaders(Packet*) {}
00116 virtual void parseExtendedHeaders(Packet*) {}
00117 virtual int request(SRMinfo* sp, int hi) {
00118 int miss = 0;
00119 if (sp->ldata_ >= hi)
00120 return miss;
00121
00122 int maxsize = ((int)log10(hi + 1) + 2) * (hi - sp->ldata_);
00123
00124
00125
00126
00127 char* msgids = new char[maxsize + 1];
00128 *msgids = '\0';
00129 for (int i = sp->ldata_ + 1; i <= hi; i++)
00130 if (! sp->ifReceived(i)) {
00131 (void) sprintf(msgids, "%s %d", msgids, i);
00132 miss++;
00133 }
00134 assert(miss);
00135 Tcl::instance().evalf("%s request %d %s", name_,
00136 sp->sender_, msgids);
00137 delete[] msgids;
00138 return miss;
00139 }
00140
00141 virtual void recv_data(int sender, int msgid, u_char* data);
00142 virtual void recv_repr(int round, int sender, int msgid, u_char* data);
00143 virtual void recv_rqst(int requestr, int round, int sender, int msgid);
00144 virtual void recv_sess(Packet*, int sessCtr, int* data);
00145
00146 virtual void send_ctrl(int typ, int rnd, int sndr, int msgid, int sz);
00147 virtual void send_sess();
00148 public:
00149 SRMAgent();
00150 virtual ~SRMAgent();
00151 virtual int command(int argc, const char*const* argv);
00152 virtual void recv(Packet* p, Handler* h);
00153 virtual void sendmsg(int nbytes, const char *flags = 0);
00154 virtual void send(int nbytes) { sendmsg(nbytes); }
00155 };
00156
00157 class ASRMAgent : public SRMAgent {
00158 double pdistance_;
00159 int requestor_;
00160 public:
00161 ASRMAgent() {
00162 bind("pdistance_", &pdistance_);
00163 bind("requestor_", &requestor_);
00164 }
00165 protected:
00166 virtual void addExtendedHeaders(Packet* p) {
00167 SRMinfo* sp;
00168 hdr_srm* sh = hdr_srm::access(p);
00169 hdr_asrm* seh = hdr_asrm::access(p);
00170 switch (sh->type()) {
00171 case SRM_RQST:
00172 sp = get_state(sh->sender());
00173 seh->distance() = sp->distance_;
00174 break;
00175 case SRM_REPR:
00176 sp = get_state(requestor_);
00177 seh->distance() = sp->distance_;
00178 break;
00179 case SRM_DATA:
00180 case SRM_SESS:
00181 seh->distance() = 0.;
00182 break;
00183 default:
00184 assert(0);
00185
00186 }
00187 SRMAgent::addExtendedHeaders(p);
00188 }
00189 virtual void parseExtendedHeaders(Packet* p) {
00190 SRMAgent::parseExtendedHeaders(p);
00191 hdr_asrm* seh = hdr_asrm::access(p);
00192 pdistance_ = seh->distance();
00193 }
00194 };
00195
00196 #endif