00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #ifndef _ONE_PHASE_PULL_HH_
00044 #define _ONE_PHASE_PULL_HH_
00045
00046 #ifdef HAVE_CONFIG_H
00047 #include "config.h"
00048 #endif // HAVE_CONFIG_H
00049
00050 #include <algorithm>
00051 #include "diffapp.hh"
00052
00053 #ifdef NS_DIFFUSION
00054 #include <tcl.h>
00055 #include "diffagent.h"
00056 #else
00057 #include "main/hashutils.hh"
00058 #endif // NS_DIFFUSION
00059
00060 #define ONE_PHASE_PULL_FILTER_PRIORITY 80
00061
00062 class OPPGradientEntry {
00063 public:
00064 OPPGradientEntry(int32_t node_id) : node_id_(node_id)
00065 {
00066 GetTime(&tv_);
00067 };
00068
00069 int32_t node_id_;
00070 struct timeval tv_;
00071 };
00072
00073 typedef list<OPPGradientEntry *> GradientList;
00074
00075 class SinkEntry {
00076 public:
00077 SinkEntry(u_int16_t port) : port_(port)
00078 {
00079 GetTime(&tv_);
00080 };
00081
00082 u_int16_t port_;
00083 struct timeval tv_;
00084 };
00085
00086 typedef list<SinkEntry *> SinkList;
00087
00088 class OPPDataNeighborEntry {
00089 public:
00090 OPPDataNeighborEntry(int32_t node_id) : node_id_(node_id)
00091 {
00092 messages_ = 1;
00093 };
00094
00095 int32_t node_id_;
00096 int messages_;
00097 bool new_messages_;
00098 };
00099
00100 typedef list<OPPDataNeighborEntry *> DataNeighborList;
00101
00102 class SubscriptionEntry {
00103 public:
00104 SubscriptionEntry(NRAttrVec *attrs) : attrs_(attrs)
00105 {
00106 GetTime(&tv_);
00107 };
00108
00109 ~SubscriptionEntry()
00110 {
00111 ClearAttrs(attrs_);
00112 delete attrs_;
00113 };
00114
00115 struct timeval tv_;
00116 NRAttrVec *attrs_;
00117 };
00118
00119 typedef list<SubscriptionEntry *> SubscriptionList;
00120
00121 typedef list<int> FlowIdList;
00122
00123 class RoundIdEntry {
00124 public:
00125 RoundIdEntry(int32_t round_id) : round_id_(round_id)
00126 {
00127 GetTime(&tv_);
00128 };
00129
00130 ~RoundIdEntry()
00131 {
00132 GradientList::iterator gradient_itr;
00133 SinkList::iterator sink_itr;
00134
00135
00136 for (gradient_itr = gradients_.begin();
00137 gradient_itr != gradients_.end(); gradient_itr++){
00138 delete (*gradient_itr);
00139 }
00140 gradients_.clear();
00141
00142
00143 for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); sink_itr++){
00144 delete (*sink_itr);
00145 }
00146 sinks_.clear();
00147 };
00148
00149 OPPGradientEntry * findGradient(int32_t node_id);
00150 void deleteGradient(int32_t node_id);
00151 void addGradient(int32_t node_id);
00152 void updateSink(u_int16_t sink_id);
00153 void deleteExpiredSinks();
00154 void deleteExpiredGradients();
00155
00156 int32_t round_id_;
00157 struct timeval tv_;
00158 GradientList gradients_;
00159 SinkList sinks_;
00160 };
00161
00162 typedef list<RoundIdEntry *> RoundIdList;
00163
00164 class RoutingEntry {
00165 public:
00166 RoutingEntry() {
00167 GetTime(&tv_);
00168 };
00169
00170 ~RoutingEntry() {
00171 DataNeighborList::iterator data_neighbor_itr;
00172 RoundIdList::iterator round_id_itr;
00173 SubscriptionList::iterator subscription_itr;
00174
00175
00176 ClearAttrs(attrs_);
00177 delete attrs_;
00178
00179
00180 for (subscription_itr = subscription_list_.begin();
00181 subscription_itr != subscription_list_.end();
00182 subscription_itr++){
00183 delete (*subscription_itr);
00184 }
00185 subscription_list_.clear();
00186
00187
00188 for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){
00189 delete (*round_id_itr);
00190 }
00191 round_ids_.clear();
00192
00193
00194 for (data_neighbor_itr = data_neighbors_.begin(); data_neighbor_itr != data_neighbors_.end(); data_neighbor_itr++){
00195 delete (*data_neighbor_itr);
00196 }
00197 data_neighbors_.clear();
00198 };
00199
00200 RoundIdEntry * findRoundIdEntry(int32_t round_id);
00201 RoundIdEntry * addRoundIdEntry(int32_t round_id);
00202 void updateNeighborDataInfo(int32_t node_id, bool new_message);
00203 void addGradient(int32_t last_hop, int32_t round_id, bool new_gradient);
00204 void updateSink(u_int16_t sink_id, int32_t round_id);
00205 void deleteExpiredRoundIds();
00206 void getSinksFromList(FlowIdList *msg_list, FlowIdList *sink_list);
00207 void getFlowsFromList(FlowIdList *msg_list, FlowIdList *flow_list);
00208 int32_t getNeighborFromFlow(int32_t flow_id);
00209
00210 struct timeval tv_;
00211 NRAttrVec *attrs_;
00212 RoundIdList round_ids_;
00213 SubscriptionList subscription_list_;
00214 DataNeighborList data_neighbors_;
00215 };
00216
00217 typedef list<RoutingEntry *> RoutingTable;
00218 class OnePhasePullFilter;
00219
00220 class OnePhasePullFilterReceive : public FilterCallback {
00221 public:
00222 OnePhasePullFilterReceive(OnePhasePullFilter *filter) : filter_(filter) {};
00223 void recv(Message *msg, handle h);
00224
00225 OnePhasePullFilter *filter_;
00226 };
00227
00228 class DataForwardingHistory {
00229 public:
00230 DataForwardingHistory()
00231 {
00232 data_reinforced_ = false;
00233 };
00234
00235 ~DataForwardingHistory()
00236 {
00237 node_list_.clear();
00238 sink_list_.clear();
00239 };
00240
00241 bool alreadyForwardedToNetwork(int32_t node_id)
00242 {
00243 list<int32_t>::iterator list_itr;
00244
00245 list_itr = find(node_list_.begin(), node_list_.end(), node_id);
00246 if (list_itr == node_list_.end())
00247 return false;
00248 return true;
00249 };
00250
00251 bool alreadyForwardedToLibrary(u_int16_t sink_id)
00252 {
00253 list<u_int16_t>::iterator list_itr;
00254
00255 list_itr = find(sink_list_.begin(), sink_list_.end(), sink_id);
00256 if (list_itr == sink_list_.end())
00257 return false;
00258 return true;
00259 };
00260
00261 bool alreadyReinforced()
00262 {
00263 return data_reinforced_;
00264 };
00265
00266 void sendingReinforcement()
00267 {
00268 data_reinforced_ = true;
00269 };
00270
00271 void forwardingToNetwork(int32_t node_id)
00272 {
00273 node_list_.push_back(node_id);
00274 };
00275
00276 void forwardingToLibrary(u_int16_t sink_id)
00277 {
00278 sink_list_.push_back(sink_id);
00279 };
00280
00281 private:
00282 list<int32_t> node_list_;
00283 list<u_int16_t> sink_list_;
00284 bool data_reinforced_;
00285 };
00286
00287 class OnePhasePullFilter : public DiffApp {
00288 public:
00289 #ifdef NS_DIFFUSION
00290 OnePhasePullFilter(const char *dr);
00291 int command(int argc, const char*const* argv);
00292 void run() {}
00293 #else
00294 OnePhasePullFilter(int argc, char **argv);
00295 void run();
00296 #endif // NS_DIFFUSION
00297
00298 virtual ~OnePhasePullFilter()
00299 {
00300
00301 };
00302
00303 void recv(Message *msg, handle h);
00304
00305
00306 void messageTimeout(Message *msg);
00307 void interestTimeout(Message *msg);
00308 void gradientTimeout();
00309 void reinforcementTimeout();
00310 int subscriptionTimeout(NRAttrVec *attrs);
00311
00312 protected:
00313
00314
00315 handle filter_handle_;
00316 int pkt_count_;
00317 int random_id_;
00318
00319
00320 OnePhasePullFilterReceive *filter_callback_;
00321
00322
00323 RoutingTable routing_list_;
00324
00325
00326 handle setupFilter();
00327
00328
00329 RoutingEntry * findRoutingEntry(NRAttrVec *attrs);
00330 void deleteRoutingEntry(RoutingEntry *routing_entry);
00331 RoutingEntry * matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place);
00332 SubscriptionEntry * findMatchingSubscription(RoutingEntry *routing_entry, NRAttrVec *attrs);
00333
00334
00335 void sendInterest(NRAttrVec *attrs, RoutingEntry *routing_entry);
00336 void sendDisinterest(NRAttrVec *attrs, RoutingEntry *routing_entry);
00337 void forwardData(Message *msg, RoutingEntry *routing_entry,
00338 DataForwardingHistory *forwarding_history);
00339
00340
00341 void processOldMessage(Message *msg);
00342 void processNewMessage(Message *msg);
00343
00344
00345 void addLocalFlowsToMessage(Message *msg);
00346 void readFlowsFromList(int number_of_flows, FlowIdList *flow_list,
00347 void *source_blob);
00348 int * writeFlowsToList(FlowIdList *flow_list);
00349 bool removeFlowFromList(FlowIdList *flow_list, int32_t flow);
00350 };
00351
00352 class OppGradientExpirationCheckTimer : public TimerCallback {
00353 public:
00354 OppGradientExpirationCheckTimer(OnePhasePullFilter *agent) : agent_(agent) {};
00355 ~OppGradientExpirationCheckTimer() {};
00356 int expire();
00357
00358 OnePhasePullFilter *agent_;
00359 };
00360
00361 class OppReinforcementCheckTimer : public TimerCallback {
00362 public:
00363 OppReinforcementCheckTimer(OnePhasePullFilter *agent) : agent_(agent) {};
00364 ~OppReinforcementCheckTimer() {};
00365 int expire();
00366
00367 OnePhasePullFilter *agent_;
00368 };
00369
00370 class OppMessageSendTimer : public TimerCallback {
00371 public:
00372 OppMessageSendTimer(OnePhasePullFilter *agent, Message *msg) :
00373 agent_(agent), msg_(msg) {};
00374 ~OppMessageSendTimer()
00375 {
00376 delete msg_;
00377 };
00378 int expire();
00379
00380 OnePhasePullFilter *agent_;
00381 Message *msg_;
00382 };
00383
00384 class OppInterestForwardTimer : public TimerCallback {
00385 public:
00386 OppInterestForwardTimer(OnePhasePullFilter *agent, Message *msg) :
00387 agent_(agent), msg_(msg) {};
00388 ~OppInterestForwardTimer()
00389 {
00390 delete msg_;
00391 };
00392 int expire();
00393
00394 OnePhasePullFilter *agent_;
00395 Message *msg_;
00396 };
00397
00398 class OppSubscriptionExpirationTimer : public TimerCallback {
00399 public:
00400 OppSubscriptionExpirationTimer(OnePhasePullFilter *agent, NRAttrVec *attrs) :
00401 agent_(agent), attrs_(attrs) {};
00402 ~OppSubscriptionExpirationTimer()
00403 {
00404 ClearAttrs(attrs_);
00405 delete attrs_;
00406 };
00407 int expire();
00408
00409 OnePhasePullFilter *agent_;
00410 NRAttrVec *attrs_;
00411 };
00412
00413 #endif // !_ONE_PHASE_PULL_HH_