Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1831 05/03/31 00:35:21 tomas (AT) poseidon (DOT) ndb.mysql.com +19 -0
reengineered suma for performance
added test case for node restart and events
ndb/test/src/HugoTransactions.cpp
1.24 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +0 -222
added node restart tests for events
updated event test
ndb/test/ndbapi/test_event_multi_table.cpp
1.5 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +93 -16
added node restart tests for events
updated event test
ndb/test/ndbapi/test_event.cpp
1.9 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +233 -10
added node restart tests for events
updated event test
ndb/test/include/HugoTransactions.hpp
1.7 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +0 -2
added node restart tests for events
updated event test
ndb/src/ndbapi/ndberror.c
1.32 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +1 -0
reengineered suma for performance
ndb/src/ndbapi/Ndbif.cpp
1.32 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +5 -5
reengineered suma for performance
ndb/src/ndbapi/NdbEventOperationImpl.cpp
1.26 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +1 -1
dbug printouts
ndb/src/kernel/blocks/trix/Trix.hpp
1.4 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +0 -2
reengineered suma for performance
ndb/src/kernel/blocks/trix/Trix.cpp
1.8 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +68 -62
reengineered suma for performance
ndb/src/kernel/blocks/suma/SumaInit.cpp
1.8 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +7 -12
reengineered suma for performance
ndb/src/kernel/blocks/suma/Suma.hpp
1.6 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +131 -126
reengineered suma for performance
ndb/src/kernel/blocks/suma/Suma.cpp
1.22 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +1519 -1821
reengineered suma for performance
ndb/src/kernel/blocks/dbdict/Dbdict.hpp
1.11 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +1 -3
reengineered suma for performance
ndb/src/kernel/blocks/dbdict/Dbdict.cpp
1.42 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +74 -113
reengineered suma for performance
ndb/src/kernel/blocks/Makefile.am
1.3 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +0 -1
removed grep block from code
ndb/src/common/debugger/signaldata/SumaImpl.cpp
1.3 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +20 -30
reengineered suma for performance
ndb/include/kernel/signaldata/SumaImpl.hpp
1.5 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +46 -102
reengineered suma for performance
ndb/include/kernel/signaldata/DropTab.hpp
1.3 05/03/31 00:35:16 tomas (AT) poseidon (DOT) ndb.mysql.com +1 -0
drop tab forwarded to suma
ndb/include/kernel/signaldata/CreateTab.hpp
1.2 05/03/31 00:35:15 tomas (AT) poseidon (DOT) ndb.mysql.com +1 -0
create tab forwarded to suma
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-wl2325
--- 1.1/ndb/include/kernel/signaldata/CreateTab.hpp 2004-04-14 10:23:53 +02:00
+++ 1.2/ndb/include/kernel/signaldata/CreateTab.hpp 2005-03-31 00:35:15 +02:00
@@ -91,6 +91,7 @@
* Sender(s) / Reciver(s)
*/
friend class Dbdict;
+ friend class SumaParticipant;
/**
* For printing
--- 1.2/ndb/include/kernel/signaldata/DropTab.hpp 2004-05-26 13:23:33 +02:00
+++ 1.3/ndb/include/kernel/signaldata/DropTab.hpp 2005-03-31 00:35:16 +02:00
@@ -61,6 +61,7 @@
friend class Dbtup;
friend class Dbtux;
friend class Dbdih;
+ friend class SumaParticipant;
/**
* Receiver(s)
--- 1.4/ndb/include/kernel/signaldata/SumaImpl.hpp 2005-03-21 21:59:41 +01:00
+++ 1.5/ndb/include/kernel/signaldata/SumaImpl.hpp 2005-03-31 00:35:16 +02:00
@@ -30,7 +30,7 @@
friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 5 );
+ STATIC_CONST( SignalLength = 6 );
enum SubscriptionType {
SingleTableScan = 1, //
@@ -43,17 +43,12 @@
RestartFlag = 0x2 << 16
};
- Uint32 subscriberRef;
- Uint32 subscriberData;
+ Uint32 senderRef;
+ Uint32 senderData;
Uint32 subscriptionId;
Uint32 subscriptionKey;
Uint32 subscriptionType;
- union {
- Uint32 tableId; // Used when doing SingelTableScan
- };
- SECTION( ATTRIBUTE_LIST = 0); // Used when doing SingelTableScan
- SECTION( TABLE_LIST = 1 );
-
+ Uint32 tableId;
};
class SubCreateRef {
@@ -65,19 +60,11 @@
friend bool printSUB_CREATE_REF(FILE *, const Uint32 *, Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 6 );
-
- Uint32 subscriberRef;
- Uint32 subscriberData;
- Uint32 subscriptionId;
- Uint32 subscriptionKey;
- Uint32 subscriptionType;
- Uint32 err;
+ STATIC_CONST( SignalLength = 3 );
- SECTION( ATTRIBUTE_LIST = 0); // Used when doing SingelTableScan
- union {
- Uint32 tableId; // Used when doing SingelTableScan
- };
+ Uint32 senderRef;
+ Uint32 senderData;
+ Uint32 errorCode;
};
class SubCreateConf {
@@ -89,11 +76,10 @@
friend bool printSUB_CREATE_CONF(FILE *, const Uint32 *, Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 3 );
+ STATIC_CONST( SignalLength = 2 );
- Uint32 subscriberData;
- Uint32 subscriptionId;
- Uint32 subscriptionKey;
+ Uint32 senderRef;
+ Uint32 senderData;
};
class SubscriptionData {
@@ -147,10 +133,8 @@
Uint32 subscriptionKey;
Uint32 part; // SubscriptionData::Part
Uint32 subscriberData;
- union { // do not change the order here!
- Uint32 err;
- Uint32 errorCode;
- };
+ // do not change the order here!
+ Uint32 errorCode;
// with SignalLength2
Uint32 subscriberRef;
};
@@ -218,10 +202,7 @@
Uint32 part; // SubscriptionData::Part
Uint32 subscriberData;
Uint32 subscriberRef;
- union {
- Uint32 err;
- Uint32 errorCode;
- };
+ Uint32 errorCode;
};
class SubStopConf {
@@ -252,13 +233,17 @@
friend bool printSUB_SYNC_REQ(FILE *, const Uint32 *, Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 5 );
public:
+ Uint32 senderRef;
+ Uint32 senderData;
Uint32 subscriptionId;
Uint32 subscriptionKey;
- Uint32 subscriberData;
Uint32 part; // SubscriptionData::Part
+
+ SECTION( ATTRIBUTE_LIST = 0); // Used when doing SingelTableScan
+ SECTION( TABLE_LIST = 1 );
};
class SubSyncRef {
@@ -273,16 +258,11 @@
enum ErrorCode {
Undefined = 1
};
- STATIC_CONST( SignalLength = 5 );
+ STATIC_CONST( SignalLength = 3 );
- Uint32 subscriptionId;
- Uint32 subscriptionKey;
- Uint32 part; // SubscriptionData::Part
- Uint32 subscriberData;
- union {
- Uint32 errorCode;
- Uint32 err;
- };
+ Uint32 senderRef;
+ Uint32 senderData;
+ Uint32 errorCode;
};
class SubSyncConf {
@@ -295,12 +275,10 @@
friend bool printSUB_SYNC_CONF(FILE *, const Uint32 *, Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 2 );
- Uint32 subscriptionId;
- Uint32 subscriptionKey;
- Uint32 part; // SubscriptionData::Part
- Uint32 subscriberData;
+ Uint32 senderRef;
+ Uint32 senderData;
};
class SubMetaData {
@@ -316,13 +294,8 @@
SECTION( DICT_TAB_INFO = 0 );
Uint32 gci;
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
- union {
- Uint32 tableId;
- };
+ Uint32 senderData;
+ Uint32 tableId;
};
class SubTableData {
@@ -347,10 +320,7 @@
bool isGCIConsistent()
{ return (logType & (Uint32)GCINOTCONSISTENT) == 0 ? true : false; };
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
+ Uint32 senderData;
Uint32 gci;
Uint32 tableId;
Uint32 operation;
@@ -371,10 +341,7 @@
public:
STATIC_CONST( SignalLength = 2 );
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
+ Uint32 subscriberData;
Uint32 noOfRowsSent;
};
@@ -426,10 +393,7 @@
Uint32 gci;
Uint32 senderRef;
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
+ Uint32 senderData;
};
class SubGcpCompleteAcc {
@@ -478,14 +442,8 @@
Uint32 senderRef;
Uint32 subscriptionId;
Uint32 subscriptionKey;
- union {
- Uint32 err;
- Uint32 errorCode;
- };
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
+ Uint32 errorCode;
+ Uint32 senderData;
};
class SubRemoveConf {
@@ -502,12 +460,8 @@
Uint32 senderRef;
Uint32 subscriptionId;
Uint32 subscriptionKey;
- Uint32 err;
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
-
+ Uint32 errorCode;
+ Uint32 senderData;
};
@@ -518,14 +472,10 @@
friend bool printCREATE_SUBSCRIPTION_ID_REQ(FILE *, const Uint32 *,
Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 3 );
+ STATIC_CONST( SignalLength = 2 );
- Uint32 subscriptionId;
- Uint32 subscriptionKey;
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
+ Uint32 senderRef;
+ Uint32 senderData;
};
@@ -536,14 +486,12 @@
friend bool printCREATE_SUBSCRIPTION_ID_CONF(FILE *, const Uint32 *,
Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 3 );
+ STATIC_CONST( SignalLength = 4 );
+ Uint32 senderRef;
+ Uint32 senderData;
Uint32 subscriptionId;
Uint32 subscriptionKey;
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
};
@@ -554,15 +502,11 @@
friend bool printCREATE_SUBSCRIPTION_ID_REF(FILE *, const Uint32 *,
Uint32, Uint16);
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 3 );
- Uint32 subscriptionId;
- Uint32 subscriptionKey;
- union { // Haven't decide what to call it
- Uint32 senderData;
- Uint32 subscriberData;
- };
- Uint32 err;
+ Uint32 senderRef;
+ Uint32 senderData;
+ Uint32 errorCode;
};
class SumaStartMe {
--- 1.2/ndb/src/common/debugger/signaldata/SumaImpl.cpp 2005-01-12 22:09:34 +01:00
+++ 1.3/ndb/src/common/debugger/signaldata/SumaImpl.cpp 2005-03-31 00:35:16 +02:00
@@ -20,8 +20,8 @@
printSUB_CREATE_REQ(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
const SubCreateReq * const sig = (SubCreateReq *)theData;
- fprintf(output, " subscriberRef: %x\n", sig->subscriberRef);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderRef: %x\n", sig->senderRef);
+ fprintf(output, " senderData: %x\n", sig->senderData);
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
fprintf(output, " subscriptionType: %x\n", sig->subscriptionType);
@@ -33,9 +33,7 @@
printSUB_CREATE_CONF(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
const SubCreateConf * const sig = (SubCreateConf *)theData;
- fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
- fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -43,9 +41,7 @@
printSUB_CREATE_REF(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
const SubCreateRef * const sig = (SubCreateRef *)theData;
- fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
- fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -66,7 +62,7 @@
const SubRemoveConf * const sig = (SubRemoveConf *)theData;
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -77,8 +73,8 @@
const SubRemoveRef * const sig = (SubRemoveRef *)theData;
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
- fprintf(output, " err: %x\n", sig->err);
+ fprintf(output, " senderData: %x\n", sig->senderData);
+ fprintf(output, " errorCode: %x\n", sig->errorCode);
return false;
}
@@ -88,7 +84,7 @@
const SubStartReq * const sig = (SubStartReq *)theData;
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -99,8 +95,8 @@
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
fprintf(output, " startPart: %x\n", sig->part);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
- fprintf(output, " err: %x\n", sig->err);
+ fprintf(output, " senderData: %x\n", sig->senderData);
+ fprintf(output, " errorCode: %x\n", sig->errorCode);
return false;
}
@@ -111,7 +107,7 @@
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
fprintf(output, " startPart: %x\n", sig->part);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -121,7 +117,7 @@
const SubStopReq * const sig = (SubStopReq *)theData;
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -131,8 +127,8 @@
const SubStopRef * const sig = (SubStopRef *)theData;
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
- fprintf(output, " err: %x\n", sig->err);
+ fprintf(output, " senderData: %x\n", sig->senderData);
+ fprintf(output, " errorCode: %x\n", sig->errorCode);
return false;
}
@@ -142,7 +138,7 @@
const SubStopConf * const sig = (SubStopConf *)theData;
fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -160,11 +156,8 @@
printSUB_SYNC_REF(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
const SubSyncRef * const sig = (SubSyncRef *)theData;
- fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
- fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " syncPart: %x\n", sig->part);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
- fprintf(output, " err: %x\n", sig->err);
+ fprintf(output, " senderData: %x\n", sig->senderData);
+ fprintf(output, " errorCode: %x\n", sig->errorCode);
return false;
}
@@ -172,10 +165,7 @@
printSUB_SYNC_CONF(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
const SubSyncConf * const sig = (SubSyncConf *)theData;
- fprintf(output, " subscriptionId: %x\n", sig->subscriptionId);
- fprintf(output, " subscriptionKey: %x\n", sig->subscriptionKey);
- fprintf(output, " syncPart: %x\n", sig->part);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
return false;
}
@@ -185,7 +175,7 @@
const SubMetaData * const sig = (SubMetaData *)theData;
fprintf(output, " gci: %x\n", sig->gci);
fprintf(output, " senderData: %x\n", sig->senderData);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
fprintf(output, " tableId: %x\n", sig->tableId);
return false;
}
@@ -195,7 +185,7 @@
Uint32 len, Uint16 receiverBlockNo) {
const SubTableData * const sig = (SubTableData *)theData;
fprintf(output, " senderData: %x\n", sig->senderData);
- fprintf(output, " subscriberData: %x\n", sig->subscriberData);
+ fprintf(output, " senderData: %x\n", sig->senderData);
fprintf(output, " gci: %x\n", sig->gci);
fprintf(output, " tableId: %x\n", sig->tableId);
fprintf(output, " operation: %x\n", sig->operation);
--- 1.41/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2005-03-22 02:39:37 +01:00
+++ 1.42/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2005-03-31 00:35:16 +02:00
@@ -1143,9 +1143,6 @@
addRecSignal(GSN_SUB_STOP_CONF, &Dbdict::execSUB_STOP_CONF);
addRecSignal(GSN_SUB_STOP_REF, &Dbdict::execSUB_STOP_REF);
- addRecSignal(GSN_SUB_SYNC_CONF, &Dbdict::execSUB_SYNC_CONF);
- addRecSignal(GSN_SUB_SYNC_REF, &Dbdict::execSUB_SYNC_REF);
-
addRecSignal(GSN_DROP_EVNT_REQ, &Dbdict::execDROP_EVNT_REQ);
addRecSignal(GSN_SUB_REMOVE_REQ, &Dbdict::execSUB_REMOVE_REQ);
@@ -2505,13 +2502,27 @@
c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_tablePtrI);
tabPtr.p->tabState = TableRecord::DEFINED;
- c_opCreateTable.release(createTabPtr);
+ releaseCreateTableOp(signal,createTabPtr);
c_restartRecord.activeTable++;
checkSchemaStatus(signal);
}
void
+Dbdict::releaseCreateTableOp(Signal* signal, CreateTableRecordPtr createTabPtr)
+{
+ if (createTabPtr.p->m_tabInfoPtrI != RNIL)
+ {
+ jam();
+ SegmentedSectionPtr tabInfoPtr;
+ getSection(tabInfoPtr, createTabPtr.p->m_tabInfoPtrI);
+ signal->setSection(tabInfoPtr, 0);
+ releaseSections(signal);
+ }
+ c_opCreateTable.release(createTabPtr);
+}
+
+void
Dbdict::restartDropTab(Signal* signal, Uint32 tableId){
const Uint32 key = ++c_opRecordSequence;
@@ -3458,7 +3469,7 @@
TableRecordPtr tabPtr;
c_tableRecordPool.getPtr(tabPtr, regAlterTabPtr->m_tablePtrI);
releaseTableObject(tabPtr.i, false);
- c_opCreateTable.release(alterTabPtr);
+ releaseCreateTableOp(signal,alterTabPtr);
c_blockState = BS_IDLE;
}
else {
@@ -3573,6 +3584,7 @@
writeTableFile(signal, tableId, tabInfoPtr, &callback);
+ regAlterTabPtr->m_tabInfoPtrI = RNIL;
signal->setSection(tabInfoPtr, 0);
releaseSections(signal);
}
@@ -3605,7 +3617,7 @@
// Release resources
c_tableRecordPool.getPtr(tabPtr, regAlterTabPtr->m_tablePtrI);
releaseTableObject(tabPtr.i, false);
- c_opCreateTable.release(alterTabPtr);
+ releaseCreateTableOp(signal,alterTabPtr);
c_blockState = BS_IDLE;
}
}
@@ -3796,7 +3808,7 @@
//@todo check api failed
sendSignal(createTabPtr.p->m_senderRef, GSN_CREATE_TABLE_REF, signal,
CreateTableRef::SignalLength, JBB);
- c_opCreateTable.release(createTabPtr);
+ releaseCreateTableOp(signal,createTabPtr);
c_blockState = BS_IDLE;
return;
}
@@ -3855,7 +3867,7 @@
//@todo check api failed
sendSignal(createTabPtr.p->m_senderRef, GSN_CREATE_TABLE_CONF, signal,
CreateTableConf::SignalLength, JBB);
- c_opCreateTable.release(createTabPtr);
+ releaseCreateTableOp(signal,createTabPtr);
c_blockState = BS_IDLE;
return;
}
@@ -3986,10 +3998,11 @@
SegmentedSectionPtr tabInfoPtr;
getSection(tabInfoPtr, createTabPtr.p->m_tabInfoPtrI);
writeTableFile(signal, createTabPtr.p->m_tablePtrI, tabInfoPtr, &callback);
-
+#if 0
createTabPtr.p->m_tabInfoPtrI = RNIL;
signal->setSection(tabInfoPtr, 0);
releaseSections(signal);
+#endif
}
void
@@ -4457,12 +4470,28 @@
CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr();
conf->senderRef = reference();
conf->senderData = createTabPtr.p->key;
+ {
+ CreateTabConf tmp= *conf;
+ conf->senderData = createTabPtr.p->m_tablePtrI;
+#if 0
+ signal->header.m_noOfSections = 1;
+ SegmentedSectionPtr tabInfoPtr;
+ getSection(tabInfoPtr, createTabPtr.p->m_tabInfoPtrI);
+ signal->setSection(tabInfoPtr, 0);
+#endif
+ sendSignal(SUMA_REF, GSN_CREATE_TAB_CONF, signal,
+ CreateTabConf::SignalLength, JBB);
+ *conf= tmp;
+#if 0
+ signal->header.m_noOfSections = 0;
+#endif
+ }
sendSignal(createTabPtr.p->m_coordinatorRef, GSN_CREATE_TAB_CONF,
signal, CreateTabConf::SignalLength, JBB);
if(createTabPtr.p->m_coordinatorRef != reference()){
jam();
- c_opCreateTable.release(createTabPtr);
+ releaseCreateTableOp(signal,createTabPtr);
}
}
@@ -4531,7 +4560,7 @@
if(createTabPtr.p->m_coordinatorRef != reference()){
jam();
- c_opCreateTable.release(createTabPtr);
+ releaseCreateTableOp(signal,createTabPtr);
}
c_opDropTable.release(dropTabPtr);
@@ -5303,12 +5332,6 @@
conf->senderData = dropTabPtr.p->m_request.senderData;
conf->tableId = dropTabPtr.p->m_request.tableId;
conf->tableVersion = dropTabPtr.p->m_request.tableVersion;
- {
- DropTableConf tmp= *conf;
- EXECUTE_DIRECT(SUMA, GSN_DROP_TABLE_CONF, signal,
- DropTableConf::SignalLength);
- *conf= tmp;
- }
Uint32 ref = dropTabPtr.p->m_request.senderRef;
sendSignal(ref, GSN_DROP_TABLE_CONF, signal,
DropTableConf::SignalLength, JBB);
@@ -5568,7 +5591,13 @@
conf->senderRef = reference();
conf->senderData = dropTabPtrI;
conf->tableId = dropTabPtr.p->m_request.tableId;
-
+ {
+ DropTabConf tmp= *conf;
+ EXECUTE_DIRECT(SUMA, GSN_DROP_TAB_CONF, signal,
+ DropTabConf::SignalLength);
+ jamEntry();
+ *conf= tmp;
+ }
dropTabPtr.p->m_participantData.m_gsn = GSN_DROP_TAB_CONF;
sendSignal(dropTabPtr.p->m_coordinatorRef, GSN_DROP_TAB_CONF, signal,
DropTabConf::SignalLength, JBB);
@@ -5715,7 +5744,7 @@
* The format of GetTabInfo Req/Ref is the same
*/
BlockReference retRef = req->senderRef;
- ref->err = errorCode;
+ ref->err = errorCode;
sendSignal(retRef, GSN_GET_TABLEID_REF, signal,
GetTableIdRef::SignalLength, JBB);
}//sendGET_TABINFOREF()
@@ -7485,6 +7514,7 @@
(CreateSubscriptionIdReq *)signal->getDataPtrSend();
// make sure we save the original sender for later
+ sumaIdReq->senderRef = reference();
sumaIdReq->senderData = evntRecPtr.i;
#ifdef EVENT_DEBUG
ndbout << "sumaIdReq->senderData = " << sumaIdReq->senderData << endl;
@@ -7506,8 +7536,8 @@
evntRecPtr.i = ref->senderData;
ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
- if (ref->err)
- evntRecPtr.p->m_errorCode = ref->err;
+ if (ref->errorCode)
+ evntRecPtr.p->m_errorCode = ref->errorCode;
else
evntRecPtr.p->m_errorCode = 1;
evntRecPtr.p->m_errorLine = __LINE__;
@@ -8085,8 +8115,8 @@
SubCreateReq * sumaReq = (SubCreateReq *)signal->getDataPtrSend();
- sumaReq->subscriberRef = reference(); // reference to DICT
- sumaReq->subscriberData = evntRecPtr.i;
+ sumaReq->senderRef = reference(); // reference to DICT
+ sumaReq->senderData = evntRecPtr.i;
sumaReq->subscriptionId = evntRecPtr.p->m_request.getEventId();
sumaReq->subscriptionKey = evntRecPtr.p->m_request.getEventKey();
sumaReq->subscriptionType = SubCreateReq::TableEvent;
@@ -8097,112 +8127,51 @@
#endif
sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, signal,
- SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
+ SubCreateReq::SignalLength, JBB);
}
void Dbdict::execSUB_CREATE_REF(Signal* signal)
{
jamEntry();
- EVENT_TRACE;
+ DBUG_ENTER("Dbdict::execSUB_CREATE_REF");
+
SubCreateRef * const ref = (SubCreateRef *)signal->getDataPtr();
OpCreateEventPtr evntRecPtr;
- evntRecPtr.i = ref->subscriberData;
+ evntRecPtr.i = ref->senderData;
ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
-#ifdef EVENT_PH2_DEBUG
- ndbout_c("DBDICT(Participant) got SUB_CREATE_REF evntRecPtr.i = (%d)", evntRecPtr.i);
-#endif
-
- if (ref->err == 1415) {
+ if (ref->errorCode == 1415) {
jam();
-#ifdef EVENT_PH2_DEBUG
- ndbout_c("SUBSCRIPTION_ID_NOT_UNIQUE");
-#endif
createEvent_sendReply(signal, evntRecPtr);
- return;
+ DBUG_VOID_RETURN;
}
-#ifdef EVENT_PH2_DEBUG
- ndbout_c("Other error");
-#endif
-
- if (ref->err)
- evntRecPtr.p->m_errorCode = ref->err;
+ if (ref->errorCode)
+ evntRecPtr.p->m_errorCode = ref->errorCode;
else
evntRecPtr.p->m_errorCode = 1;
evntRecPtr.p->m_errorLine = __LINE__;
evntRecPtr.p->m_errorNode = reference();
createEvent_sendReply(signal, evntRecPtr);
+ DBUG_VOID_RETURN;
}
void Dbdict::execSUB_CREATE_CONF(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Dbdict::execSUB_CREATE_CONF");
EVENT_TRACE;
SubCreateConf * const sumaConf = (SubCreateConf *)signal->getDataPtr();
-
- const Uint32 subscriptionId = sumaConf->subscriptionId;
- const Uint32 subscriptionKey = sumaConf->subscriptionKey;
- const Uint32 evntRecId = sumaConf->subscriberData;
-
- OpCreateEvent *evntRec;
- ndbrequire((evntRec = c_opCreateEvent.getPtr(evntRecId)) != NULL);
-
-#ifdef EVENT_PH2_DEBUG
- ndbout_c("DBDICT(Participant) got SUB_CREATE_CONF evntRecPtr.i = (%d)", evntRecId);
-#endif
-
- SubSyncReq *sumaSync = (SubSyncReq *)signal->getDataPtrSend();
-
- sumaSync->subscriptionId = subscriptionId;
- sumaSync->subscriptionKey = subscriptionKey;
- sumaSync->part = (Uint32) SubscriptionData::MetaData;
- sumaSync->subscriberData = evntRecId;
-
- sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal,
- SubSyncReq::SignalLength, JBB);
-}
-
-void Dbdict::execSUB_SYNC_REF(Signal* signal)
-{
- jamEntry();
- EVENT_TRACE;
- SubSyncRef * const ref = (SubSyncRef *)signal->getDataPtr();
OpCreateEventPtr evntRecPtr;
-
- evntRecPtr.i = ref->subscriberData;
+ evntRecPtr.i = sumaConf->senderData;
ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
- if (ref->err)
- evntRecPtr.p->m_errorCode = ref->err;
- else
- evntRecPtr.p->m_errorCode = 1;
- evntRecPtr.p->m_errorLine = __LINE__;
- evntRecPtr.p->m_errorNode = reference();
-
createEvent_sendReply(signal, evntRecPtr);
-}
-
-void Dbdict::execSUB_SYNC_CONF(Signal* signal)
-{
- jamEntry();
- EVENT_TRACE;
-
- SubSyncConf * const sumaSyncConf = (SubSyncConf *)signal->getDataPtr();
- // Uint32 subscriptionId = sumaSyncConf->subscriptionId;
- // Uint32 subscriptionKey = sumaSyncConf->subscriptionKey;
- OpCreateEventPtr evntRecPtr;
-
- evntRecPtr.i = sumaSyncConf->subscriberData;
- ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL);
-
- ndbrequire(sumaSyncConf->part == (Uint32)SubscriptionData::MetaData);
-
- createEvent_sendReply(signal, evntRecPtr);
+ DBUG_VOID_RETURN;
}
/****************************************************
@@ -8326,7 +8295,7 @@
// ret->setErrorLine(__LINE__);
// ret->setErrorNode(reference());
ref->senderRef = reference();
- ref->err= SubStartRef::Busy;
+ ref->errorCode = SubStartRef::Busy;
sendSignal(origSenderRef, GSN_SUB_START_REF, signal,
SubStartRef::SignalLength2, JBB);
@@ -8387,7 +8356,7 @@
const SubStartRef* ref = (SubStartRef*) signal->getDataPtr();
Uint32 senderRef = ref->senderRef;
- Uint32 err = ref->err;
+ Uint32 err = ref->errorCode;
OpSubEventPtr subbPtr;
c_opSubEvent.getPtr(subbPtr, ref->senderData);
@@ -8529,7 +8498,7 @@
// ret->setErrorLine(__LINE__);
// ret->setErrorNode(reference());
ref->senderRef = reference();
- ref->err = SubStopRef::Busy;
+ ref->errorCode = SubStopRef::Busy;
sendSignal(origSenderRef, GSN_SUB_STOP_REF, signal,
SubStopRef::SignalLength, JBB);
@@ -8586,7 +8555,7 @@
jamEntry();
const SubStopRef* ref = (SubStopRef*) signal->getDataPtr();
Uint32 senderRef = ref->senderRef;
- Uint32 err = ref->err;
+ Uint32 err = ref->errorCode;
OpSubEventPtr subbPtr;
c_opSubEvent.getPtr(subbPtr, ref->senderData);
@@ -8676,17 +8645,9 @@
#endif
SubStopRef* ref = (SubStopRef*)signal->getDataPtrSend();
- ref->senderRef = reference();
- ref->senderData = subbPtr.p->m_senderData;
- /*
- ref->subscriptionId = subbPtr.p->m_senderData;
- ref->subscriptionKey = subbPtr.p->m_senderData;
- ref->part = subbPtr.p->m_part; // SubscriptionData::Part
- ref->subscriberData = subbPtr.p->m_subscriberData;
- ref->subscriberRef = subbPtr.p->m_subscriberRef;
- */
- ref->errorCode = subbPtr.p->m_errorCode;
-
+ ref->senderRef = reference();
+ ref->senderData = subbPtr.p->m_senderData;
+ ref->errorCode = subbPtr.p->m_errorCode;
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_REF,
signal, SubStopRef::SignalLength, JBB);
@@ -8868,7 +8829,7 @@
SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend();
jam();
ref->senderRef = reference();
- ref->err = SubRemoveRef::Busy;
+ ref->errorCode = SubRemoveRef::Busy;
sendSignal(origSenderRef, GSN_SUB_REMOVE_REF, signal,
SubRemoveRef::SignalLength, JBB);
@@ -8902,7 +8863,7 @@
const SubRemoveRef* ref = (SubRemoveRef*) signal->getDataPtr();
Uint32 senderRef = ref->senderRef;
- Uint32 err= ref->err;
+ Uint32 err= ref->errorCode;
if (refToBlock(senderRef) == SUMA) {
/*
--- 1.10/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2005-03-21 21:59:41 +01:00
+++ 1.11/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2005-03-31 00:35:16 +02:00
@@ -536,9 +536,6 @@
void execSUB_CREATE_CONF(Signal* signal);
void execSUB_CREATE_REF (Signal* signal);
- void execSUB_SYNC_CONF(Signal* signal);
- void execSUB_SYNC_REF (Signal* signal);
-
void execSUB_REMOVE_REQ(Signal* signal);
void execSUB_REMOVE_CONF(Signal* signal);
void execSUB_REMOVE_REF(Signal* signal);
@@ -1616,6 +1613,7 @@
/* ------------------------------------------------------------ */
// Add Table Handling
/* ------------------------------------------------------------ */
+ void releaseCreateTableOp(Signal* signal, CreateTableRecordPtr createTabPtr);
/* ------------------------------------------------------------ */
// Add Fragment Handling
--- 1.21/ndb/src/kernel/blocks/suma/Suma.cpp 2005-03-22 02:39:37 +01:00
+++ 1.22/ndb/src/kernel/blocks/suma/Suma.cpp 2005-03-31 00:35:16 +02:00
@@ -39,7 +39,8 @@
#include <signaldata/TrigAttrInfo.hpp>
#include <signaldata/CheckNodeGroups.hpp>
#include <signaldata/GCPSave.hpp>
-#include <signaldata/DropTable.hpp>
+#include <signaldata/CreateTab.hpp>
+#include <signaldata/DropTab.hpp>
#include <DebuggerNames.hpp>
@@ -80,11 +81,12 @@
*/
#define PRINT_ONLY 0
-static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
void
-Suma::getNodeGroupMembers(Signal* signal) {
+Suma::getNodeGroupMembers(Signal* signal)
+{
jam();
+ DBUG_ENTER("Suma::getNodeGroupMembers");
/**
* Ask DIH for nodeGroupMembers
*/
@@ -111,13 +113,16 @@
// ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup);
ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup
-#ifdef NODEFAIL_DEBUG
+#ifndef DBUG_OFF
for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
- ndbout_c ("Suma: NodeGroup %u, me %u, me in group %u, member[%u] %u",
- c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
- i, c_nodesInGroup[i]);
+ DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, me in group %u, "
+ "member[%u] %u",
+ c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
+ i, c_nodesInGroup[i]));
}
#endif
+
+ DBUG_VOID_RETURN
}
void
@@ -128,63 +133,54 @@
const Uint32 startphase = signal->theData[1];
const Uint32 typeOfStart = signal->theData[7];
- DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart));
+ DBUG_PRINT("info",("startphase = %u, typeOfStart = %u",
+ startphase, typeOfStart));
- if(startphase == 1){
+ if(startphase == 1)
+ {
jam();
c_restartLock = true;
}
- if(startphase == 3){
+ if(startphase == 3)
+ {
jam();
- g_TypeOfStart = typeOfStart;
signal->theData[0] = reference();
sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
-
-#if 0
-
- /**
- * Debug
- */
-
-
- SubscriptionPtr subPtr;
- Ptr<SyncRecord> syncPtr;
- ndbrequire(c_subscriptions.seize(subPtr));
- ndbrequire(c_syncPool.seize(syncPtr));
-
-
- ndbout_c("Suma: subPtr.i = %d syncPtr.i = %d", subPtr.i, syncPtr.i);
-
- subPtr.p->m_syncPtrI = syncPtr.i;
- subPtr.p->m_subscriptionType = SubCreateReq::DatabaseSnapshot;
- syncPtr.p->m_subscriptionPtrI = subPtr.i;
- syncPtr.p->ptrI = syncPtr.i;
- g_subPtrI = subPtr.i;
- // sendSTTORRY(signal);
-#endif
DBUG_VOID_RETURN;
}
- if(startphase == 5) {
+ if(startphase == 5)
+ {
getNodeGroupMembers(signal);
- if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
+ if (typeOfStart == NodeState::ST_NODE_RESTART ||
+ typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
+ {
jam();
- for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
+ for (Uint32 i = 0; i < c_noNodesInGroup; i++)
+ {
Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
if (ref != reference())
+ {
+ DBUG_PRINT("info",("sent SUMA_START_ME to %u", refToNode(ref)));
sendSignal(ref, GSN_SUMA_START_ME, signal,
1 /*SumaStartMe::SignalLength*/, JBB);
+ }
}
}
}
- if(startphase == 7) {
+ if(startphase == 7)
+ {
c_restartLock = false; // may be set false earlier with HANDOVER_REQ
- if (g_TypeOfStart != NodeState::ST_NODE_RESTART) {
- for( int i = 0; i < NO_OF_BUCKETS; i++) {
- if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
+ if (typeOfStart != NodeState::ST_NODE_RESTART &&
+ typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
+ {
+ for( int i = 0; i < NO_OF_BUCKETS; i++)
+ {
+ if (getResponsibleSumaNodeId(i) == refToNode(reference()))
+ {
// I'm running this bucket
DBUG_PRINT("info",("bucket %u set to true", i));
c_buckets[i].active = true;
@@ -192,15 +188,24 @@
}
}
- if(g_TypeOfStart == NodeState::ST_INITIAL_START &&
- c_masterNodeId == getOwnNodeId()) {
+ if(typeOfStart == NodeState::ST_INITIAL_START &&
+ c_masterNodeId == getOwnNodeId())
+ {
jam();
createSequence(signal);
DBUG_VOID_RETURN;
}//if
}//if
-
+ if(startphase == 9)
+ {
+ if (typeOfStart == NodeState::ST_NODE_RESTART ||
+ typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
+ {
+ // waiting for HANDOVER_REQ
+ DBUG_VOID_RETURN;
+ }
+ }
sendSTTORRY(signal);
DBUG_VOID_RETURN;
@@ -313,8 +318,9 @@
signal->theData[4] = 3;
signal->theData[5] = 5;
signal->theData[6] = 7;
- signal->theData[7] = 255; // No more start phases from missra
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
+ signal->theData[7] = 9;
+ signal->theData[8] = 255; // No more start phases from missra
+ sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 9, JBB);
}
void
@@ -363,17 +369,26 @@
DBUG_ENTER("SumaParticipant::removeSubscribersOnNode");
bool found = false;
- SubscriberPtr i_subbPtr;
- c_dataSubscribers.first(i_subbPtr);
- while(!i_subbPtr.isNull()){
- SubscriberPtr subbPtr = i_subbPtr;
- c_dataSubscribers.next(i_subbPtr);
- jam();
- if (refToNode(subbPtr.p->m_subscriberRef) == nodeId) {
+ KeyTable<Table>::Iterator it;
+ for(c_tables.first(it);!it.isNull();c_tables.next(it))
+ {
+ LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers);
+ SubscriberPtr i_subbPtr;
+ for(subbs.first(i_subbPtr);!i_subbPtr.isNull();)
+ {
+ SubscriberPtr subbPtr = i_subbPtr;
+ subbs.next(i_subbPtr);
jam();
- c_dataSubscribers.remove(subbPtr);
- c_removeDataSubscribers.add(subbPtr);
- found = true;
+ if (refToNode(subbPtr.p->m_senderRef) == nodeId) {
+ jam();
+ subbs.remove(subbPtr);
+ c_removeDataSubscribers.add(subbPtr);
+ found = true;
+ }
+ }
+ if (subbs.isEmpty())
+ {
+ // ToDo handle this
}
}
if(found){
@@ -416,8 +431,8 @@
SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
req->senderRef = reference();
req->senderData = subbPtr.i;
- req->subscriberRef = subbPtr.p->m_subscriberRef;
- req->subscriberData = subbPtr.p->m_subscriberData;
+ req->subscriberRef = subbPtr.p->m_senderRef;
+ req->subscriberData = subbPtr.p->m_senderData;
req->subscriptionId = subPtr.p->m_subscriptionId;
req->subscriptionKey = subPtr.p->m_subscriptionKey;
req->part = SubscriptionData::TableData;
@@ -430,31 +445,7 @@
SumaParticipant::execSUB_STOP_CONF(Signal* signal){
jamEntry();
DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF");
-
- SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();
-
- // Uint32 subscriberData = conf->subscriberData;
- // Uint32 subscriberRef = conf->subscriberRef;
-
- Subscription key;
- key.m_subscriptionId = conf->subscriptionId;
- key.m_subscriptionKey = conf->subscriptionKey;
-
- SubscriptionPtr subPtr;
- if(c_subscriptions.find(subPtr, key)) {
- jam();
- if (subPtr.p->m_markRemove) {
- jam();
- ndbrequire(false);
- ndbrequire(subPtr.p->m_nSubscribers > 0);
- subPtr.p->m_nSubscribers--;
- if (subPtr.p->m_nSubscribers == 0){
- jam();
- completeSubRemoveReq(signal, subPtr);
- }
- }
- }
-
+ ndbassert(signal->getNoOfSections() == 0);
sendSubStopReq(signal,true);
DBUG_VOID_RETURN;
}
@@ -463,6 +454,7 @@
SumaParticipant::execSUB_STOP_REF(Signal* signal){
jamEntry();
DBUG_ENTER("SumaParticipant::execSUB_STOP_REF");
+ ndbassert(signal->getNoOfSections() == 0);
SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
@@ -473,7 +465,7 @@
Uint32 subscriberData = ref->subscriberData;
Uint32 subscriberRef = ref->subscriberRef;
- if(ref->err != 1411){
+ if(ref->errorCode != 1411){
ndbrequire(false);
}
@@ -495,6 +487,7 @@
Suma::execNODE_FAILREP(Signal* signal){
jamEntry();
DBUG_ENTER("Suma::execNODE_FAILREP");
+ ndbassert(signal->getNoOfSections() == 0);
NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr();
@@ -648,13 +641,14 @@
jamEntry();
Uint32 tCase = signal->theData[0];
+#if 0
if(tCase >= 8000 && tCase <= 8003){
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, g_subPtrI);
Ptr<SyncRecord> syncPtr;
c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
-
+
if(tCase == 8000){
syncPtr.p->startMeta(signal);
}
@@ -676,7 +670,7 @@
attrs.append(att, 3);
}
}
-
+#endif
if(tCase == 8004){
infoEvent("Suma: c_subscriberPool size: %d free: %d",
c_subscriberPool.getSize(),
@@ -700,117 +694,17 @@
infoEvent("Suma: c_metaSubscribers count: %d",
count_subscribers(c_metaSubscribers));
+#if 0
infoEvent("Suma: c_dataSubscribers count: %d",
count_subscribers(c_dataSubscribers));
infoEvent("Suma: c_prepDataSubscribers count: %d",
count_subscribers(c_prepDataSubscribers));
+#endif
infoEvent("Suma: c_removeDataSubscribers count: %d",
count_subscribers(c_removeDataSubscribers));
}
}
-/********************************************************************
- *
- * Convert a table name (db+schema+tablename) to tableId
- *
- */
-
-#if 0
-void
-SumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal)
-{
- jam();
- if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) {
- jam();
-
- GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend();
- char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable];
- const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated
- req->senderRef = reference();
- req->senderData = subPtr.i;
- req->len = strLen;
-
- LinearSectionPtr ptr[1];
- ptr[0].p = (Uint32*)tableName;
- ptr[0].sz = strLen;
-
- sendSignal(DBDICT_REF,
- GSN_GET_TABLEID_REQ,
- signal,
- GetTableIdReq::SignalLength,
- JBB,
- ptr,
- 1);
- } else {
- jam();
- sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr);
- }
-}
-#endif
-
-
-void
-SumaParticipant::addTableId(Uint32 tableId,
- SubscriptionPtr subPtr, SyncRecord *psyncRec)
-{
-#ifdef NODEFAIL_DEBUG
- ndbout_c("SumaParticipant::addTableId(%u,%u,%u), current_table=%u",
- tableId, subPtr.i, psyncRec, subPtr.p->m_currentTable);
-#endif
- subPtr.p->m_tables.set(tableId);
- subPtr.p->m_currentTable++;
- if(psyncRec != NULL)
- psyncRec->m_tableList.append(&tableId, 1);
-}
-
-#if 0
-void
-SumaParticipant::execGET_TABLEID_CONF(Signal * signal)
-{
- jamEntry();
-
- GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr();
- Uint32 tableId = conf->tableId;
- //Uint32 schemaVersion = conf->schemaVersion;
- Uint32 senderData = conf->senderData;
-
- SubscriptionPtr subPtr;
- Ptr<SyncRecord> syncPtr;
-
- c_subscriptions.getPtr(subPtr, senderData);
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
-
- /*
- * add to m_tableList
- */
- addTableId(tableId, subPtr, syncPtr.p);
-
- convertNameToId(subPtr, signal);
-}
-
-void
-SumaParticipant::execGET_TABLEID_REF(Signal * signal)
-{
- jamEntry();
- GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr();
- Uint32 senderData = ref->senderData;
- Uint32 err = ref->err;
-
- SubscriptionPtr subPtr;
- c_subscriptions.getPtr(subPtr, senderData);
- Uint32 subData = subPtr.p->m_subscriberData;
- SubCreateRef * reff = (SubCreateRef*)ref;
- reff->err = err;
- reff->subscriberData = subData;
- sendSignal(subPtr.p->m_subscriberRef,
- GSN_SUB_CREATE_REF,
- signal,
- SubCreateRef::SignalLength,
- JBB);
-}
-#endif
-
-
/*************************************************************
*
* Creation of subscription id's
@@ -821,7 +715,8 @@
Suma::execCREATE_SUBID_REQ(Signal* signal)
{
jamEntry();
-
+ DBUG_ENTER("Suma::execCREATE_SUBID_REQ");
+ ndbassert(signal->getNoOfSections() == 0);
CRASH_INSERTION(13001);
CreateSubscriptionIdReq const * req =
@@ -829,29 +724,32 @@
SubscriberPtr subbPtr;
if(!c_subscriberPool.seize(subbPtr)){
jam();
- sendSubIdRef(signal, 1412);
- return;
+ sendSubIdRef(signal, req->senderRef, req->senderData, 1412);
+ DBUG_VOID_RETURN;
}
+ DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
+ c_subscriberPool.getSize(),
+ c_subscriberPool.getNoOfFree()));
- subbPtr.p->m_subscriberRef = signal->getSendersBlockRef();
- subbPtr.p->m_senderData = req->senderData;
- subbPtr.p->m_subscriberData = subbPtr.i;
+ subbPtr.p->m_senderRef = req->senderRef;
+ subbPtr.p->m_senderData = req->senderData;
UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
-
- utilReq->senderData = subbPtr.p->m_subscriberData;
+ utilReq->senderData = subbPtr.i;
utilReq->sequenceId = SUMA_SEQUENCE;
utilReq->requestType = UtilSequenceReq::NextVal;
sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
signal, UtilSequenceReq::SignalLength, JBB);
+
+ DBUG_VOID_RETURN;
}
void
Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
{
jamEntry();
-
DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
+ ndbassert(signal->getNoOfSections() == 0);
CRASH_INSERTION(13002);
UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
@@ -863,22 +761,22 @@
Uint64 subId;
memcpy(&subId,conf->sequenceValue,8);
- Uint32 subData = conf->senderData;
-
SubscriberPtr subbPtr;
- c_subscriberPool.getPtr(subbPtr,subData);
-
+ c_subscriberPool.getPtr(subbPtr,conf->senderData);
CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;
+ subconf->senderRef = reference();
+ subconf->senderData = subbPtr.p->m_senderData;
subconf->subscriptionId = (Uint32)subId;
subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
- subconf->subscriberData = subbPtr.p->m_senderData;
- sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal,
+ sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal,
CreateSubscriptionIdConf::SignalLength, JBB);
c_subscriberPool.release(subbPtr);
-
+ DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
+ c_subscriberPool.getSize(),
+ c_subscriberPool.getNoOfFree()));
DBUG_VOID_RETURN;
}
@@ -887,6 +785,7 @@
{
jamEntry();
DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
+ ndbassert(signal->getNoOfSections() == 0);
UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
Uint32 err= ref->errorCode;
@@ -900,27 +799,35 @@
SubscriberPtr subbPtr;
c_subscriberPool.getPtr(subbPtr,subData);
- sendSubIdRef(signal, err);
+ sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err);
c_subscriberPool.release(subbPtr);
+ DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
+ c_subscriberPool.getSize(),
+ c_subscriberPool.getNoOfFree()));
DBUG_VOID_RETURN;
}//execUTIL_SEQUENCE_REF()
void
-SumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){
+SumaParticipant::sendSubIdRef(Signal* signal,
+ Uint32 senderRef, Uint32 senderData, Uint32 errCode)
+{
jam();
+ DBUG_ENTER("SumaParticipant::sendSubIdRef");
CreateSubscriptionIdRef * ref =
(CreateSubscriptionIdRef *)signal->getDataPtrSend();
- ref->err = errCode;
- sendSignal(signal->getSendersBlockRef(),
+ ref->senderRef = reference();
+ ref->senderData = senderData;
+ ref->errorCode = errCode;
+ sendSignal(senderRef,
GSN_CREATE_SUBID_REF,
signal,
CreateSubscriptionIdRef::SignalLength,
JBB);
- releaseSections(signal);
- return;
+ releaseSections(signal);
+ DBUG_VOID_RETURN;
}
/**********************************************************
@@ -929,218 +836,109 @@
* Creation of subscriptions
*/
+void
+SumaParticipant::addTableId(Uint32 tableId,
+ SubscriptionPtr subPtr, SyncRecord *psyncRec)
+{
+ DBUG_ENTER("SumaParticipant::addTableId");
+ DBUG_PRINT("enter",("tableId: %u subPtr.i: %u", tableId, subPtr.i));
+ subPtr.p->m_tableId= tableId;
+ if(psyncRec != NULL)
+ psyncRec->m_tableList.append(&tableId, 1);
+ DBUG_VOID_RETURN;
+}
+
void
-SumaParticipant::execSUB_CREATE_REQ(Signal* signal) {
-#ifdef NODEFAIL_DEBUG
- ndbout_c("SumaParticipant::execSUB_CREATE_REQ");
-#endif
+SumaParticipant::execSUB_CREATE_REQ(Signal* signal)
+{
jamEntry();
-
+ DBUG_ENTER("SumaParticipant::execSUB_CREATE_REQ");
+ ndbassert(signal->getNoOfSections() == 0);
CRASH_INSERTION(13003);
const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();
+ const Uint32 subRef = req.senderRef;
+ const Uint32 subData = req.senderData;
const Uint32 subId = req.subscriptionId;
const Uint32 subKey = req.subscriptionKey;
- const Uint32 subRef = req.subscriberRef;
- const Uint32 subData = req.subscriberData;
const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags;
const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags;
const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
const bool restartFlag = (flags & SubCreateReq::RestartFlag) != 0;
-
- const Uint32 sender = signal->getSendersBlockRef();
+ const Uint32 tableId = req.tableId;
Subscription key;
key.m_subscriptionId = subId;
key.m_subscriptionKey = subKey;
SubscriptionPtr subPtr;
- Ptr<SyncRecord> syncPtr;
-
+
if (addTableFlag) {
ndbrequire(restartFlag); //TODO remove this
if(!c_subscriptions.find(subPtr, key)) {
jam();
- sendSubCreateRef(signal, req, 1407);
- return;
+ sendSubCreateRef(signal, 1407);
+ DBUG_VOID_RETURN;
}
jam();
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ if (restartFlag)
+ {
+ ndbrequire(type != SubCreateReq::SingleTableScan);
+ ndbrequire(req.tableId != subPtr.p->m_tableId);
+ ndbrequire(type != SubCreateReq::TableEvent);
+ addTableId(req.tableId, subPtr, 0);
+ }
} else {
// Check that id/key is unique
if(c_subscriptions.find(subPtr, key)) {
jam();
- sendSubCreateRef(signal, req, 1415);
- return;
+ sendSubCreateRef(signal, 1415);
+ DBUG_VOID_RETURN;
}
if(!c_subscriptions.seize(subPtr)) {
jam();
- sendSubCreateRef(signal, req, 1412);
- return;
- }
- if(!c_syncPool.seize(syncPtr)) {
- jam();
- sendSubCreateRef(signal, req, 1416);
- return;
+ sendSubCreateRef(signal, 1412);
+ DBUG_VOID_RETURN;
}
+ DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
+ c_subscriptionPool.getSize(),
+ c_subscriptionPool.getNoOfFree()));
jam();
- subPtr.p->m_subscriberRef = subRef;
- subPtr.p->m_subscriberData = subData;
+ subPtr.p->m_senderRef = subRef;
+ subPtr.p->m_senderData = subData;
subPtr.p->m_subscriptionId = subId;
subPtr.p->m_subscriptionKey = subKey;
subPtr.p->m_subscriptionType = type;
-
- /**
- * ok to memset? Support on all compilers
- * @todo find out if memset is supported by all compilers
- */
- subPtr.p->m_tables.clear();
- subPtr.p->m_maxTables = 0;
- subPtr.p->m_currentTable = 0;
- subPtr.p->m_syncPtrI = syncPtr.i;
- subPtr.p->m_markRemove = false;
- subPtr.p->m_nSubscribers = 0;
-
- c_subscriptions.add(subPtr);
+ subPtr.p->m_tableId = tableId;
+ subPtr.p->m_state = Subscription::DEFINED;
+ subPtr.p->n_subscribers = 0;
- syncPtr.p->m_subscriptionPtrI = subPtr.i;
- syncPtr.p->m_doSendSyncData = true;
- syncPtr.p->ptrI = syncPtr.i;
- syncPtr.p->m_locked = false;
- syncPtr.p->m_error = 0;
- }
-
- if (restartFlag ||
- type == SubCreateReq::TableEvent) {
-
- syncPtr.p->m_doSendSyncData = false;
-
- ndbrequire(type != SubCreateReq::SingleTableScan);
- jam();
-
- if (subPtr.p->m_tables.get(req.tableId)) {
- ndbrequire(false); //TODO remove
- jam();
- sendSubCreateRef(signal, req, /** Error code */0);
- return;
- }
- if (addTableFlag) {
- ndbrequire(type != SubCreateReq::TableEvent);
- jam();
- }
- subPtr.p->m_maxTables++;
- addTableId(req.tableId, subPtr, syncPtr.p);
- } else {
- switch(type){
- case SubCreateReq::SingleTableScan:
- {
- jam();
- syncPtr.p->m_tableList.append(&req.tableId, 1);
- if(signal->getNoOfSections() > 0){
- SegmentedSectionPtr ptr;
- signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST);
- LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);
- append(attrBuf, ptr, getSectionSegmentPool());
- }
- }
- break;
-#if 0
- case SubCreateReq::SelectiveTableSnapshot:
- /**
- * Tables specified by the user that does not exist
- * in the database are just ignored. No error message
- * is given, nor does the db nodes crash
- * @todo: Memory is not release here (used tableBuf)
- */
- {
- if(signal->getNoOfSections() == 0 ){
- jam();
- sendSubCreateRef(signal, req, /** Error code */);
- return;
- }
+ DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
+ key.m_subscriptionId, key.m_subscriptionKey));
- jam();
- SegmentedSectionPtr ptr;
- signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST);
- SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool());
- Uint32 i=0;
- char table[MAX_TAB_NAME_SIZE];
- r0.reset();
- r0.first();
- while(true){
- if ((r0.getValueType() != SimpleProperties::StringValue) ||
- (r0.getValueLen() <= 0)) {
- releaseSections(signal);
- ndbrequire(false);
- }
- r0.getString(table);
- strcpy(subPtr.p->m_tableNames[i],table);
- i++;
- if(!r0.next())
- break;
- }
- releaseSections(signal);
- subPtr.p->m_maxTables = i;
- subPtr.p->m_currentTable = 0;
- releaseSections(signal);
- convertNameToId(subPtr, signal);
- return;
- }
- break;
-#endif
- case SubCreateReq::DatabaseSnapshot:
- {
- jam();
- }
- break;
- default:
- ndbrequire(false);
- }
+ c_subscriptions.add(subPtr);
}
- sendSubCreateConf(signal, sender, subPtr);
-
- return;
+ SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = subPtr.p->m_senderData;
+ sendSignal(subRef, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB);
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender,
- SubscriptionPtr subPtr)
+SumaParticipant::sendSubCreateRef(Signal* signal, Uint32 errCode)
{
- SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
- conf->subscriptionId = subPtr.p->m_subscriptionId;
- conf->subscriptionKey = subPtr.p->m_subscriptionKey;
- conf->subscriberData = subPtr.p->m_subscriberData;
- sendSignal(sender, GSN_SUB_CREATE_CONF, signal,
- SubCreateConf::SignalLength, JBB);
-}
-
-void
-SumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){
jam();
SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
- ref->subscriberRef = reference();
- ref->subscriberData = req.subscriberData;
- ref->err = errCode;
- releaseSections(signal);
+ ref->errorCode = errCode;
sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal,
SubCreateRef::SignalLength, JBB);
return;
}
-
-
-
-
-
-
-
-
-
-
-
Uint32
SumaParticipant::getFirstGCI(Signal* signal) {
if (c_lastCompleteGCI == RNIL) {
@@ -1157,13 +955,12 @@
*/
void
-SumaParticipant::execSUB_SYNC_REQ(Signal* signal) {
+SumaParticipant::execSUB_SYNC_REQ(Signal* signal)
+{
jamEntry();
-
+ DBUG_ENTER("SumaParticipant::execSUB_SYNC_REQ");
+ ndbassert(signal->getNoOfSections() <= 1);
CRASH_INSERTION(13004);
-#ifdef EVENT_PH3_DEBUG
- ndbout_c("SumaParticipant::execSUB_SYNC_REQ");
-#endif
SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
@@ -1171,28 +968,61 @@
Subscription key;
key.m_subscriptionId = req->subscriptionId;
key.m_subscriptionKey = req->subscriptionKey;
-
- if(!c_subscriptions.find(subPtr, key)){
- jam();
- sendSubSyncRef(signal, 1407);
- return;
- }
- /**
- * @todo Tomas, do you really need to do this?
- */
- if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
+ DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
+ key.m_subscriptionId, key.m_subscriptionKey));
+
+ if(!c_subscriptions.find(subPtr, key))
+ {
jam();
- subPtr.p->m_subscriberData = req->subscriberData;
+ DBUG_PRINT("info",("Not found"));
+ sendSubSyncRef(signal, 1407);
+ DBUG_VOID_RETURN;
}
bool ok = false;
SubscriptionData::Part part = (SubscriptionData::Part)req->part;
Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ if(!c_syncPool.seize(syncPtr))
+ {
+ jam();
+ sendSubSyncRef(signal, 1416);
+ DBUG_VOID_RETURN;
+ }
+ DBUG_PRINT("info",("c_syncPool size: %d free: %d",
+ c_syncPool.getSize(),
+ c_syncPool.getNoOfFree()));
+ new (syncPtr.p) Ptr<SyncRecord>;
+ syncPtr.p->m_senderRef = req->senderRef;
+ syncPtr.p->m_senderData = req->senderData;
+ syncPtr.p->m_subscriptionPtrI = subPtr.i;
+ syncPtr.p->ptrI = syncPtr.i;
+ syncPtr.p->m_error = 0;
+
+ {
+ jam();
+ syncPtr.p->m_tableList.append(&subPtr.p->m_tableId, 1);
+ if(signal->getNoOfSections() > 0){
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
+ LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);
+ append(attrBuf, ptr, getSectionSegmentPool());
+ releaseSections(signal);
+ }
+ }
+
+ TablePtr tabPtr;
+ initTable(signal,subPtr.p->m_tableId,tabPtr,syncPtr);
+ tabPtr.p->n_subscribers++;
+ DBUG_PRINT("info",("SumaParticipant::Table[%u]::n_subscribers: %u",
+ tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
+ DBUG_VOID_RETURN;
+
switch(part){
case SubscriptionData::MetaData:
+ ndbrequire(false);
+#if 0
ok = true;
jam();
if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {
@@ -1218,6 +1048,7 @@
}
syncPtr.p->startMeta(signal);
+#endif
break;
case SubscriptionData::TableData: {
ok = true;
@@ -1227,21 +1058,20 @@
}
}
ndbrequire(ok);
+ DBUG_VOID_RETURN;
}
void
SumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){
jam();
- SubSyncRef * ref =
- (SubSyncRef *)signal->getDataPtrSend();
- ref->err = errCode;
+ SubSyncRef * ref= (SubSyncRef *)signal->getDataPtrSend();
+ ref->errorCode = errCode;
+ releaseSections(signal);
sendSignal(signal->getSendersBlockRef(),
GSN_SUB_SYNC_REF,
signal,
SubSyncRef::SignalLength,
JBB);
-
- releaseSections(signal);
return;
}
@@ -1249,6 +1079,7 @@
* Dict interface
*/
+#if 0
void
SumaParticipant::execLIST_TABLES_CONF(Signal* signal){
jamEntry();
@@ -1257,14 +1088,243 @@
SyncRecord* tmp = c_syncPool.getPtr(conf->senderData);
tmp->runLIST_TABLES_CONF(signal);
}
+#endif
+
+
+/*************************************************************************
+ *
+ *
+ */
+#if 0
+void
+SumaParticipant::Table::runLIST_TABLES_CONF(Signal* signal){
+ jam();
+
+ ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
+ const Uint32 len = signal->length() - ListTablesConf::HeaderLength;
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+
+ for (unsigned i = 0; i < len; i++) {
+ subPtr.p->m_maxTables++;
+ suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);
+ }
+
+ // for (unsigned i = 0; i < len; i++)
+ // conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
+ // m_tableList.append(&conf->tableData[0], len);
+
+#if 0
+ TableList::DataBufferIterator it;
+ int i = 0;
+ for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
+ ndbout_c("%u listtableconf tableid %d", i++, *it.data);
+ }
+#endif
+
+ if(len == ListTablesConf::DataLength){
+ jam();
+ // we expect more LIST_TABLE_CONF
+ return;
+ }
+
+#if 0
+ subPtr.p->m_currentTable = 0;
+ subPtr.p->m_maxTables = 0;
+
+ TableList::DataBufferIterator it;
+ for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
+ subPtr.p->m_maxTables++;
+ suma.addTableId(*it.data, subPtr, NULL);
+#ifdef NODEFAIL_DEBUG
+ ndbout_c(" listtableconf tableid %d",*it.data);
+#endif
+ }
+#endif
+
+ startMeta(signal);
+}
+#endif
+
+
+int
+SumaParticipant::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
+ SubscriberPtr subbPtr)
+{
+ DBUG_ENTER("SumaParticipant::initTable SubscriberPtr");
+ DBUG_PRINT("enter",("tableId: %d", tableId));
+
+ int r= initTable(signal,tableId,tabPtr);
+
+ LocalDLList<Subscriber> subscribers(c_subscriberPool,
+ tabPtr.p->c_subscribers);
+ subscribers.add(subbPtr);
+
+ DBUG_PRINT("info",("added subscriber: %i", subbPtr.i));
+
+ if (r)
+ {
+ // we have to wait getting tab info
+ DBUG_RETURN(1);
+ }
+
+ if (tabPtr.p->setupTrigger(signal, *this))
+ {
+ // we have to wait for triggers to be setup
+ DBUG_RETURN(1);
+ }
+
+ completeInitTable(signal, tabPtr);
+ DBUG_RETURN(0);
+}
+
+int
+SumaParticipant::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
+ Ptr<SyncRecord> syncPtr)
+{
+ jam();
+ DBUG_ENTER("SumaParticipant::initTable Ptr<SyncRecord>");
+ DBUG_PRINT("enter",("tableId: %d", tableId));
+
+ int r= initTable(signal,tableId,tabPtr);
+
+ LocalDLList<SyncRecord> syncRecords(c_syncPool,tabPtr.p->c_syncRecords);
+ syncRecords.add(syncPtr);
+
+ if (r)
+ {
+ // we have to wait getting tab info
+ DBUG_RETURN(1);
+ }
+ completeInitTable(signal, tabPtr);
+ DBUG_RETURN(0);
+}
+
+int
+SumaParticipant::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr)
+{
+ jam();
+ DBUG_ENTER("SumaParticipant::initTable");
+
+ if (!c_tables.find(tabPtr, tableId))
+ {
+ // table not being prepared
+ // seize a new table, initialize and add to c_tables
+ ndbrequire(c_tablePool_.seize(tabPtr));
+ DBUG_PRINT("info",("c_tablePool size: %d free: %d",
+ c_tablePool_.getSize(),
+ c_tablePool_.getNoOfFree()));
+ new (tabPtr.p) Table;
+
+ tabPtr.p->m_tableId= tableId;
+ tabPtr.p->m_ptrI= tabPtr.i;
+ tabPtr.p->n_subscribers = 0;
+ DBUG_PRINT("info",("SumaParticipant::Table[%u]::n_subscribers: %u",
+ tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
+
+ tabPtr.p->m_error = 0;
+ tabPtr.p->m_schemaVersion = RNIL;
+ tabPtr.p->m_state = Table::DEFINING;
+ tabPtr.p->m_hasTriggerDefined[0] = 0;
+ tabPtr.p->m_hasTriggerDefined[1] = 0;
+ tabPtr.p->m_hasTriggerDefined[2] = 0;
+ tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
+ tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
+ tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
+
+ c_tables.add(tabPtr);
+
+ GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = tabPtr.i;
+ req->requestType =
+ GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+ req->tableId = tableId;
+
+ DBUG_PRINT("info",("GET_TABINFOREQ id %d", req->tableId));
+ sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
+ GetTabInfoReq::SignalLength, JBB);
+ DBUG_RETURN(1);
+ }
+ if (tabPtr.p->m_state == Table::DEFINING)
+ {
+ DBUG_RETURN(1);
+ }
+ ndbrequire(tabPtr.p->m_state == Table::DEFINED);
+ DBUG_RETURN(0);
+}
+
+void
+SumaParticipant::completeInitTable(Signal *signal, TablePtr tabPtr)
+{
+ jam();
+ DBUG_ENTER("SumaParticipant::completeInitTable");
+
+ // handle all subscribers
+ LocalDLList<Subscriber> subscribers(c_subscriberPool,
+ tabPtr.p->c_subscribers);
+ SubscriberPtr subbPtr;
+ for(subscribers.first(subbPtr);
+ !subbPtr.isNull();
+ subscribers.next(subbPtr))
+ {
+ if (tabPtr.p->m_error)
+ {
+ sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
+ SubscriptionData::TableData);
+ tabPtr.p->n_subscribers--;
+ }
+ else
+ {
+ Uint32 gci= getFirstGCI(signal);
+ subbPtr.p->m_firstGCI = gci;
+ sendSubStartComplete(signal,subbPtr,gci,
+ SubscriptionData::TableData);
+ }
+ }
+
+ // handle all syncRecords
+ LocalDLList<SyncRecord> syncRecords(c_syncPool,
+ tabPtr.p->c_syncRecords);
+ Ptr<SyncRecord> syncPtr;
+ while(syncRecords.first(syncPtr))
+ {
+ syncRecords.remove(syncPtr);
+ syncPtr.p->ptrI = syncPtr.i;
+ if (tabPtr.p->m_error == 0)
+ {
+ jam();
+ syncPtr.p->startScan(signal);
+ }
+ else
+ {
+ jam();
+ syncPtr.p->completeScan(signal, tabPtr.p->m_error);
+ tabPtr.p->n_subscribers--;
+ }
+ }
+
+ if (tabPtr.p->m_error)
+ {
+ DBUG_PRINT("info",("SumaParticipant::Table[%u]::n_subscribers: %u",
+ tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
+ tabPtr.p->checkRelease(*this);
+ }
+ else
+ {
+ tabPtr.p->m_state = Table::DEFINED;
+ }
+
+ DBUG_VOID_RETURN;
+}
void
SumaParticipant::execGET_TABINFOREF(Signal* signal){
jamEntry();
- GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr();
- SyncRecord* tmp = c_syncPool.getPtr(ref->senderData);
- tmp->runGET_TABINFOREF(signal);
+ /* ToDo handle this */
+ ndbrequire(false);
}
void
@@ -1278,23 +1338,30 @@
}
GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
-
Uint32 tableId = conf->tableId;
- Uint32 senderData = conf->senderData;
-
- SyncRecord* tmp = c_syncPool.getPtr(senderData);
- ndbrequire(parseTable(signal, conf, tableId, tmp));
- tmp->runGET_TABINFO_CONF(signal);
+ TablePtr tabPtr;
+ c_tablePool_.getPtr(tabPtr, conf->senderData);
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
+ ndbrequire(tabPtr.p->parseTable(ptr, *this));
+ releaseSections(signal);
+ /**
+ * We need to gather fragment info
+ */
+ jam();
+ signal->theData[0] = RNIL;
+ signal->theData[1] = tableId;
+ signal->theData[2] = tabPtr.i;
+ sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
}
bool
-SumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId,
- SyncRecord* syncPtr_p){
-
- SegmentedSectionPtr ptr;
- signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
+SumaParticipant::Table::parseTable(SegmentedSectionPtr ptr,
+ SumaParticipant &suma)
+{
+ DBUG_ENTER("SumaParticipant::Table::parseTable");
- SimplePropertiesSectionReader it(ptr, getSectionSegmentPool());
+ SimplePropertiesSectionReader it(ptr, suma.getSectionSegmentPool());
SimpleProperties::UnpackStatus s;
DictTabInfo::Table tableDesc; tableDesc.init();
@@ -1303,22 +1370,23 @@
DictTabInfo::TableMappingSize,
true, true);
- ndbrequire(s == SimpleProperties::Break);
+ suma.suma_ndbrequire(s == SimpleProperties::Break);
- TablePtr tabPtr;
- c_tables.find(tabPtr, tableId);
-
- if(!tabPtr.isNull() &&
- tabPtr.p->m_schemaVersion != tableDesc.TableVersion){
+#if 0
+ToDo handle this
+ if(m_schemaVersion != tableDesc.TableVersion){
jam();
- tabPtr.p->release(* this);
+ release(* this);
// oops wrong schema version in stored tabledesc
// we need to find all subscriptions with old table desc
// and all subscribers to this
// hopefully none
c_tables.release(tabPtr);
+ DBUG_PRINT("info",("c_tablePool size: %d free: %d",
+ suma.c_tablePool_.getSize(),
+ suma.c_tablePool_.getNoOfFree()));
tabPtr.setNull();
DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr;
c_subscriptions.first(i_subPtr);
@@ -1353,31 +1421,11 @@
}
}
}
-
- if (tabPtr.isNull()) {
- jam();
- /**
- * Uninitialized table record
- */
- ndbrequire(c_tables.seize(tabPtr));
- new (tabPtr.p) Table;
- tabPtr.p->m_schemaVersion = RNIL;
- tabPtr.p->m_tableId = tableId;
- tabPtr.p->m_hasTriggerDefined[0] = 0;
- tabPtr.p->m_hasTriggerDefined[1] = 0;
- tabPtr.p->m_hasTriggerDefined[2] = 0;
- tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
- tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
- tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
-#if 0
- ndbout_c("Get tab info conf %d", tableId);
#endif
- c_tables.add(tabPtr);
- }
- if(tabPtr.p->m_attributes.getSize() != 0){
+ if(m_attributes.getSize() != 0){
jam();
- return true;
+ DBUG_RETURN(true);
}
/**
@@ -1385,22 +1433,22 @@
*/
Uint32 noAttribs = tableDesc.NoOfAttributes;
Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable);
- tabPtr.p->m_schemaVersion = tableDesc.TableVersion;
+ m_schemaVersion = tableDesc.TableVersion;
// The attribute buffer
- LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes);
+ LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
// Temporary buffer
- DataBuffer<15> theRest(c_dataBufferPool);
+ DataBuffer<15> theRest(suma.c_dataBufferPool);
if(!attrBuf.seize(noAttribs)){
- ndbrequire(false);
- return false;
+ suma.suma_ndbrequire(false);
+ DBUG_RETURN(false);
}
if(!theRest.seize(notFixed)){
- ndbrequire(false);
- return false;
+ suma.suma_ndbrequire(false);
+ DBUG_RETURN(false);
}
DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable
@@ -1414,7 +1462,7 @@
DictTabInfo::AttributeMapping,
DictTabInfo::AttributeMappingSize,
true, true);
- ndbrequire(s == SimpleProperties::Break);
+ suma.suma_ndbrequire(s == SimpleProperties::Break);
if (!attrDesc.AttributeNullableFlag
/* && !attrDesc.AttributeVariableFlag */) {
@@ -1442,310 +1490,15 @@
theRest.release();
- return true;
-}
-
-void
-SumaParticipant::execDI_FCOUNTCONF(Signal* signal){
- jamEntry();
-
- CRASH_INSERTION(13007);
-
- const Uint32 senderData = signal->theData[3];
- SyncRecord* tmp = c_syncPool.getPtr(senderData);
- tmp->runDI_FCOUNTCONF(signal);
+ DBUG_RETURN(true);
}
void
-SumaParticipant::execDIGETPRIMCONF(Signal* signal){
- jamEntry();
-
- CRASH_INSERTION(13008);
-
- const Uint32 senderData = signal->theData[1];
- SyncRecord* tmp = c_syncPool.getPtr(senderData);
- tmp->runDIGETPRIMCONF(signal);
-}
-
-void
-SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){
- jamEntry();
- DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF");
- CRASH_INSERTION(13009);
-
- CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
-
- const Uint32 senderData = conf->getConnectionPtr();
- SyncRecord* tmp = c_syncPool.getPtr(senderData);
- tmp->runCREATE_TRIG_CONF(signal);
-
- /**
- * dodido
- * @todo: I (Johan) dont know what to do here. Jonas, what do you mean?
- */
- DBUG_VOID_RETURN;
-}
-
-void
-SumaParticipant::execCREATE_TRIG_REF(Signal* signal){
- jamEntry();
- DBUG_ENTER("SumaParticipant::execDROP_TRIG_REF");
- CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr();
-
- const Uint32 senderData = ref->getConnectionPtr();
- SyncRecord* tmp = c_syncPool.getPtr(senderData);
-
- tmp->runCREATE_TRIG_REF(signal);
-
- DBUG_VOID_RETURN;
-}
-
-void
-SumaParticipant::execDROP_TRIG_CONF(Signal* signal){
- jamEntry();
- DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");
- CRASH_INSERTION(13010);
-
- DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
-
- const Uint32 senderData = conf->getConnectionPtr();
- SyncRecord* tmp = c_syncPool.getPtr(senderData);
- tmp->runDROP_TRIG_CONF(signal);
- DBUG_VOID_RETURN;
-}
-
-void
-SumaParticipant::execDROP_TRIG_REF(Signal* signal){
- jamEntry();
- DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");
- DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
-
- const Uint32 senderData = ref->getConnectionPtr();
- SyncRecord* tmp = c_syncPool.getPtr(senderData);
- tmp->runDROP_TRIG_REF(signal);
- DBUG_VOID_RETURN;
-}
-
-/*************************************************************************
- *
- *
- */
-
-void
-SumaParticipant::SyncRecord::runLIST_TABLES_CONF(Signal* signal){
- jam();
-
- ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
- const Uint32 len = signal->length() - ListTablesConf::HeaderLength;
-
- SubscriptionPtr subPtr;
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
-
- for (unsigned i = 0; i < len; i++) {
- subPtr.p->m_maxTables++;
- suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);
- }
-
- // for (unsigned i = 0; i < len; i++)
- // conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
- // m_tableList.append(&conf->tableData[0], len);
-
-#if 0
- TableList::DataBufferIterator it;
- int i = 0;
- for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
- ndbout_c("%u listtableconf tableid %d", i++, *it.data);
- }
-#endif
-
- if(len == ListTablesConf::DataLength){
- jam();
- // we expect more LIST_TABLE_CONF
- return;
- }
-
-#if 0
- subPtr.p->m_currentTable = 0;
- subPtr.p->m_maxTables = 0;
-
- TableList::DataBufferIterator it;
- for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
- subPtr.p->m_maxTables++;
- suma.addTableId(*it.data, subPtr, NULL);
-#ifdef NODEFAIL_DEBUG
- ndbout_c(" listtableconf tableid %d",*it.data);
-#endif
- }
-#endif
-
- startMeta(signal);
-}
-
-void
-SumaParticipant::SyncRecord::startMeta(Signal* signal){
- jam();
- m_currentTable = 0;
- nextMeta(signal);
-}
-
-/**
- * m_tableList only contains UserTables
- */
-void
-SumaParticipant::SyncRecord::nextMeta(Signal* signal){
- jam();
-
- TableList::DataBufferIterator it;
- if(!m_tableList.position(it, m_currentTable)){
- completeMeta(signal);
- return;
- }
-
- GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
- req->senderRef = suma.reference();
- req->senderData = ptrI;
- req->requestType =
- GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
- req->tableId = * it.data;
-
-#if 0
- ndbout_c("GET_TABINFOREQ id %d", req->tableId);
-#endif
- suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
- GetTabInfoReq::SignalLength, JBB);
-}
-
-void
-SumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal)
+SumaParticipant::execDI_FCOUNTCONF(Signal* signal)
{
- jam();
-
- SubscriptionPtr subPtr;
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
-
- Uint32 type = subPtr.p->m_subscriptionType;
-
- bool do_continue = false;
- switch (type) {
- case SubCreateReq::TableEvent:
- jam();
- break;
- case SubCreateReq::DatabaseSnapshot:
- jam();
- do_continue = true;
- break;
- case SubCreateReq::SelectiveTableSnapshot:
- jam();
- do_continue = true;
- break;
- case SubCreateReq::SingleTableScan:
- jam();
- break;
- default:
- ndbrequire(false);
- break;
- }
-
- if (! do_continue) {
- m_error = 1;
- completeMeta(signal);
- return;
- }
-
- m_currentTable++;
- nextMeta(signal);
- return;
-
- // now we need to clean-up
-}
-
-
-void
-SumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){
- jam();
-
- GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr();
- // const Uint32 gci = conf->gci;
- const Uint32 tableId = conf->tableId;
- TableList::DataBufferIterator it;
-
- ndbrequire(m_tableList.position(it, m_currentTable));
- ndbrequire(* it.data == tableId);
-
- SubscriptionPtr subPtr;
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
-
- SegmentedSectionPtr ptr;
- signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
-
- SubMetaData * data = (SubMetaData*)signal->getDataPtrSend();
- /**
- * sending lastCompleteGCI. Used by Lars in interval calculations
- * incremenet by one, since last_CompleteGCI is the not the current gci.
- */
- data->gci = suma.c_lastCompleteGCI + 1;
- data->tableId = tableId;
- data->senderData = subPtr.p->m_subscriberData;
-#if PRINT_ONLY
- ndbout_c("GSN_SUB_META_DATA Table %d", tableId);
-#else
-
- bool okToSend = m_doSendSyncData;
-
- /*
- * If it is a selectivetablesnapshot and the table is not part of the
- * subscription, then do not send anything, just continue.
- * If it is a tablevent, don't send regardless since the APIs are not
- * interested in meta data.
- */
- if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)
- if(!subPtr.p->m_tables.get(tableId))
- okToSend = false;
-
- if(okToSend) {
- if(refToNode(subPtr.p->m_subscriberRef) == 0){
- jam();
- suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef),
- GSN_SUB_META_DATA,
- signal,
- SubMetaData::SignalLength);
- jamEntry();
- suma.releaseSections(signal);
- } else {
- jam();
- suma.sendSignal(subPtr.p->m_subscriberRef,
- GSN_SUB_META_DATA,
- signal,
- SubMetaData::SignalLength, JBB);
- }
- }
-#endif
-
- TablePtr tabPtr;
- ndbrequire(suma.c_tables.find(tabPtr, tableId));
-
- LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
- if(fragBuf.getSize() == 0){
- /**
- * We need to gather fragment info
- */
- jam();
- signal->theData[0] = RNIL;
- signal->theData[1] = tableId;
- signal->theData[2] = ptrI;
- suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
- return;
- }
-
- m_currentTable++;
- nextMeta(signal);
-}
-
-void
-SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){
- jam();
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execDI_FCOUNTCONF");
+ ndbassert(signal->getNoOfSections() == 0);
const Uint32 userPtr = signal->theData[0];
const Uint32 fragCount = signal->theData[1];
@@ -1754,25 +1507,31 @@
ndbrequire(userPtr == RNIL && signal->length() == 5);
TablePtr tabPtr;
- ndbrequire(suma.c_tables.find(tabPtr, tableId));
-
- LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
+ tabPtr.i= signal->theData[3];
+ ndbrequire(tabPtr.p= c_tablePool_.getPtr(tabPtr.i));
+ ndbrequire(tabPtr.p->m_tableId == tableId);
+
+ LocalDataBuffer<15> fragBuf(c_dataBufferPool, tabPtr.p->m_fragments);
ndbrequire(fragBuf.getSize() == 0);
- m_currentFragment = fragCount;
+ tabPtr.p->m_fragCount = fragCount;
+
signal->theData[0] = RNIL;
- signal->theData[1] = ptrI;
+ signal->theData[1] = tabPtr.i;
signal->theData[2] = tableId;
signal->theData[3] = 0; // Frag no
- suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
+ sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
+
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){
- jam();
+SumaParticipant::execDIGETPRIMCONF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execDIGETPRIMCONF");
+ ndbassert(signal->getNoOfSections() == 0);
const Uint32 userPtr = signal->theData[0];
- //const Uint32 senderData = signal->theData[1];
const Uint32 nodeCount = signal->theData[6];
const Uint32 tableId = signal->theData[7];
const Uint32 fragNo = signal->theData[8];
@@ -1781,8 +1540,11 @@
ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
TablePtr tabPtr;
- ndbrequire(suma.c_tables.find(tabPtr, tableId));
- LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
+ tabPtr.i= signal->theData[1];
+ ndbrequire(tabPtr.p= c_tablePool_.getPtr(tabPtr.i));
+ ndbrequire(tabPtr.p->m_tableId == tableId);
+
+ LocalDataBuffer<15> fragBuf(c_dataBufferPool,tabPtr.p->m_fragments);
/**
* Add primary node for fragment to list
@@ -1794,27 +1556,39 @@
fragBuf.append(&signal->theData[2], 1);
const Uint32 nextFrag = fragNo + 1;
- if(nextFrag == m_currentFragment){
+ if(nextFrag == tabPtr.p->m_fragCount)
+ {
/**
* Complete frag info for table
+ * table is not up to date
*/
- m_currentTable++;
- nextMeta(signal);
- return;
+
+ LocalDLList<Subscriber> subscribers(c_subscriberPool,
+ tabPtr.p->c_subscribers);
+ if (subscribers.isEmpty())
+ {
+ completeInitTable(signal,tabPtr);
+ DBUG_VOID_RETURN;
+ }
+ tabPtr.p->setupTrigger(signal, *this);
+ DBUG_VOID_RETURN;
}
signal->theData[0] = RNIL;
- signal->theData[1] = ptrI;
+ signal->theData[1] = tabPtr.i;
signal->theData[2] = tableId;
signal->theData[3] = nextFrag; // Frag no
- suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
+ sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
+
+ DBUG_VOID_RETURN;
}
+#if 0
void
-SumaParticipant::SyncRecord::completeMeta(Signal* signal){
+SumaParticipant::SyncRecord::completeTableInit(Signal* signal)
+{
jam();
SubscriptionPtr subPtr;
suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
#if PRINT_ONLY
ndbout_c("GSN_SUB_SYNC_CONF (meta)");
@@ -1824,24 +1598,21 @@
if (m_error) {
SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
- ref->subscriptionId = subPtr.p->m_subscriptionId;
- ref->subscriptionKey = subPtr.p->m_subscriptionKey;
- ref->part = SubscriptionData::MetaData;
- ref->subscriberData = subPtr.p->m_subscriberData;
+ ref->senderRef = suma.reference();
+ ref->senderData = subPtr.p->m_senderData;
ref->errorCode = SubSyncRef::Undefined;
- suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal,
+ suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_REF, signal,
SubSyncRef::SignalLength, JBB);
} else {
SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
- conf->subscriptionId = subPtr.p->m_subscriptionId;
- conf->subscriptionKey = subPtr.p->m_subscriptionKey;
- conf->part = SubscriptionData::MetaData;
- conf->subscriberData = subPtr.p->m_subscriberData;
- suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
+ conf->senderRef = suma.reference();
+ conf->senderData = subPtr.p->m_senderData;
+ suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_CONF, signal,
SubSyncConf::SignalLength, JBB);
}
#endif
}
+#endif
/**********************************************************
*
@@ -1850,21 +1621,24 @@
*/
void
-SumaParticipant::SyncRecord::startScan(Signal* signal){
+SumaParticipant::SyncRecord::startScan(Signal* signal)
+{
jam();
+ DBUG_ENTER("SumaParticipant::SyncRecord::startScan");
/**
* Get fraginfo
*/
m_currentTable = 0;
m_currentFragment = 0;
-
nextScan(signal);
+ DBUG_VOID_RETURN;
}
bool
SumaParticipant::SyncRecord::getNextFragment(TablePtr * tab,
- FragmentDescriptor * fd){
+ FragmentDescriptor * fd)
+{
jam();
SubscriptionPtr subPtr;
suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
@@ -1872,20 +1646,15 @@
DataBuffer<15>::DataBufferIterator fragIt;
m_tableList.position(tabIt, m_currentTable);
- for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){
+ for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++)
+ {
TablePtr tabPtr;
ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));
- if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)
- {
- if(!subPtr.p->m_tables.get(tabPtr.p->m_tableId)) {
- *tab = tabPtr;
- return true;
- }
- }
LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
fragBuf.position(fragIt, m_currentFragment);
- for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){
+ for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++)
+ {
FragmentDescriptor tmp;
tmp.m_dummy = * fragIt.data;
if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
@@ -1895,36 +1664,30 @@
}
}
m_currentFragment = 0;
+
+ tabPtr.p->n_subscribers--;
+ DBUG_PRINT("info",("SumaParticipant::Table[%u]::n_subscribers: %u",
+ tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
+ tabPtr.p->checkRelease(suma);
}
return false;
}
void
-SumaParticipant::SyncRecord::nextScan(Signal* signal){
+SumaParticipant::SyncRecord::nextScan(Signal* signal)
+{
jam();
+ DBUG_ENTER("SumaParticipant::SyncRecord::nextScan");
TablePtr tabPtr;
FragmentDescriptor fd;
SubscriptionPtr subPtr;
if(!getNextFragment(&tabPtr, &fd)){
jam();
completeScan(signal);
- return;
+ DBUG_VOID_RETURN;
}
suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
- if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) {
- jam();
- if(!subPtr.p->m_tables.get(tabPtr.p->m_tableId)) {
- /*
- * table is not part of the subscription. Check next table
- */
- m_currentTable++;
- nextScan(signal);
- return;
- }
- }
-
DataBuffer<15>::Head head = m_attributeList;
if(head.getSize() == 0){
head = tabPtr.p->m_attributes;
@@ -1935,7 +1698,7 @@
const Uint32 parallelism = 16;
const Uint32 attrLen = 5 + attrBuf.getSize();
- req->senderData = m_subscriptionPtrI;
+ req->senderData = ptrI;
req->resultRef = suma.reference();
req->tableId = tabPtr.p->m_tableId;
req->requestInfo = 0;
@@ -1980,6 +1743,8 @@
m_currentTableId = tabPtr.p->m_tableId;
m_currentNoOfAttributes = attrBuf.getSize();
+
+ DBUG_VOID_RETURN;
}
@@ -1994,7 +1759,8 @@
void
SumaParticipant::execSCAN_FRAGCONF(Signal* signal){
jamEntry();
-
+ DBUG_ENTER("SumaParticipant::execSCAN_FRAGCONF");
+ ndbassert(signal->getNoOfSections() == 0);
CRASH_INSERTION(13011);
ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
@@ -2003,8 +1769,8 @@
const Uint32 senderData = conf->senderData;
const Uint32 completedOps = conf->completedOps;
- SubscriptionPtr subPtr;
- c_subscriptions.getPtr(subPtr, senderData);
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, senderData);
if(completed != 2){
jam();
@@ -2017,25 +1783,25 @@
execSUB_SYNC_CONTINUE_CONF(signal);
#else
SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
- req->subscriberData = subPtr.p->m_subscriberData;
+ req->subscriberData = syncPtr.p->m_senderData;
req->noOfRowsSent = completedOps;
- sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
+ sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
SubSyncContinueReq::SignalLength, JBB);
#endif
- return;
+ DBUG_VOID_RETURN;
}
ndbrequire(completedOps == 0);
- SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
-
- tmp->m_currentFragment++;
- tmp->nextScan(signal);
+ syncPtr.p->m_currentFragment++;
+ syncPtr.p->nextScan(signal);
+ DBUG_VOID_RETURN;
}
void
SumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
jamEntry();
+ ndbassert(signal->getNoOfSections() == 0);
CRASH_INSERTION(13012);
@@ -2061,25 +1827,39 @@
}
void
-SumaParticipant::SyncRecord::completeScan(Signal* signal){
+SumaParticipant::SyncRecord::completeScan(Signal* signal, int error)
+{
jam();
+ DBUG_ENTER("SumaParticipant::SyncRecord::completeScan");
// m_tableList.release();
- SubscriptionPtr subPtr;
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
-
#if PRINT_ONLY
ndbout_c("GSN_SUB_SYNC_CONF (data)");
#else
- SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
- conf->subscriptionId = subPtr.p->m_subscriptionId;
- conf->subscriptionKey = subPtr.p->m_subscriptionKey;
- conf->part = SubscriptionData::TableData;
- conf->subscriberData = subPtr.p->m_subscriberData;
- suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
- SubSyncConf::SignalLength, JBB);
+ if (error == 0)
+ {
+ SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
+ conf->senderRef = suma.reference();
+ conf->senderData = m_senderData;
+ suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal,
+ SubSyncConf::SignalLength, JBB);
+ }
+ else
+ {
+ SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
+ ref->senderRef = suma.reference();
+ ref->senderData = m_senderData;
+ suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal,
+ SubSyncRef::SignalLength, JBB);
+ }
#endif
+
+ release();
+ suma.c_syncPool.release(ptrI);
+ DBUG_PRINT("info",("c_syncPool size: %d free: %d",
+ suma.c_syncPool.getSize(),
+ suma.c_syncPool.getNoOfFree()));
+ DBUG_VOID_RETURN;
}
void
@@ -2107,6 +1887,7 @@
void
SumaParticipant::execSUB_START_REQ(Signal* signal){
jamEntry();
+ ndbassert(signal->getNoOfSections() == 0);
DBUG_ENTER("SumaParticipant::execSUB_START_REQ");
CRASH_INSERTION(13013);
@@ -2141,151 +1922,109 @@
DBUG_VOID_RETURN;
}
- Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
- if (syncPtr.p->m_locked) {
+ if (subPtr.p->m_state != Subscription::DEFINED) {
jam();
-#if 0
- ndbout_c("Locked");
-#endif
+ DBUG_PRINT("info",("Locked"));
sendSubStartRef(signal, 1411);
DBUG_VOID_RETURN;
}
- syncPtr.p->m_locked = true;
SubscriberPtr subbPtr;
if(!c_subscriberPool.seize(subbPtr)){
jam();
- syncPtr.p->m_locked = false;
sendSubStartRef(signal, 1412);
DBUG_VOID_RETURN;
}
+ DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
+ c_subscriberPool.getSize(),
+ c_subscriberPool.getNoOfFree()));
+
+ // setup subscription record
+ subPtr.p->m_state = Subscription::LOCKED;
+ // store these here for later use
+ subPtr.p->m_senderRef = senderRef;
+ subPtr.p->m_senderData = senderData;
- Uint32 type = subPtr.p->m_subscriptionType;
-
- subbPtr.p->m_senderRef = senderRef;
- subbPtr.p->m_senderData = senderData;
-
- switch (type) {
- case SubCreateReq::TableEvent:
- jam();
- // we want the data to return to the API not DICT
- subbPtr.p->m_subscriberRef = subscriberRef;
- // ndbout_c("start ref = %u", signal->getSendersBlockRef());
- // ndbout_c("ref = %u", subbPtr.p->m_subscriberRef);
- // we use the subscription id for now, should really be API choice
- subbPtr.p->m_subscriberData = subscriberData;
-
-#if 0
- if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
- jam();
- for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
- Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
- if (ref != reference()) {
- jam();
- sendSubStartReq(subPtr, subbPtr, signal, ref);
- } else
- jam();
- }
- }
-#endif
- break;
- case SubCreateReq::DatabaseSnapshot:
- case SubCreateReq::SelectiveTableSnapshot:
- jam();
- subbPtr.p->m_subscriberRef = GREP_REF;
- subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
- break;
- case SubCreateReq::SingleTableScan:
- jam();
- subbPtr.p->m_subscriberRef = subPtr.p->m_subscriberRef;
- subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
- }
-
- subbPtr.p->m_subPtrI = subPtr.i;
+ // setup subscriber record
+ subbPtr.p->m_senderRef = subscriberRef;
+ subbPtr.p->m_senderData = subscriberData;
subbPtr.p->m_firstGCI = RNIL;
- if (type == SubCreateReq::TableEvent)
+ if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
subbPtr.p->m_lastGCI = 0;
else
subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI
- bool ok = false;
-
+ subbPtr.p->m_subPtrI= subPtr.i;
+
+ DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
+ "tableId: %u id: %u key: %u",
+ subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
+ subPtr.i, subPtr.p->m_senderRef, subPtr.p->m_senderData,
+ subPtr.p->m_tableId,
+ subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
+
+ TablePtr tabPtr;
switch(part){
case SubscriptionData::MetaData:
- ok = true;
jam();
c_metaSubscribers.add(subbPtr);
sendSubStartComplete(signal, subbPtr, 0, part);
- break;
+ DBUG_VOID_RETURN;
case SubscriptionData::TableData:
- ok = true;
jam();
- c_prepDataSubscribers.add(subbPtr);
- syncPtr.p->startTrigger(signal);
- break;
+ initTable(signal,subPtr.p->m_tableId,tabPtr,subbPtr);
+ tabPtr.p->n_subscribers++;
+ DBUG_PRINT("info",("SumaParticipant::Table[%u]::n_subscribers: %u",
+ tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
+ DBUG_VOID_RETURN;
}
- ndbrequire(ok);
- DBUG_VOID_RETURN;
+ ndbrequire(false);
}
void
SumaParticipant::sendSubStartComplete(Signal* signal,
SubscriberPtr subbPtr,
Uint32 firstGCI,
- SubscriptionData::Part part){
+ SubscriptionData::Part part)
+{
jam();
+ DBUG_ENTER("SumaParticipant::sendSubStartComplete");
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+ ndbrequire( subPtr.p->m_state == Subscription::LOCKED )
+ subPtr.p->m_state = Subscription::DEFINED;
+ subPtr.p->n_subscribers++;
+
+ DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
+ "tableId: %u id: %u key: %u",
+ subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
+ subPtr.i, subPtr.p->m_senderRef, subPtr.p->m_senderData,
+ subPtr.p->m_tableId,
+ subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
- Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
- syncPtr.p->m_locked = false;
-
- SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();
+ SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();
conf->senderRef = reference();
- conf->senderData = subbPtr.p->m_senderData;
+ conf->senderData = subPtr.p->m_senderData;
conf->subscriptionId = subPtr.p->m_subscriptionId;
conf->subscriptionKey = subPtr.p->m_subscriptionKey;
- conf->firstGCI = firstGCI;
- conf->part = (Uint32) part;
-
- conf->subscriberData = subPtr.p->m_subscriberData;
- sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_CONF, signal,
+ conf->firstGCI = firstGCI;
+ conf->part = (Uint32) part;
+
+ DBUG_PRINT("info",("subscriber: %u id: %u key: %u", subbPtr.i,
+ subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
+ sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_CONF, signal,
SubStartConf::SignalLength, JBB);
+ DBUG_VOID_RETURN;
}
-#if 0
-void
-SumaParticipant::sendSubStartRef(SubscriptionPtr subPtr,
- Signal* signal, Uint32 errCode,
- bool temporary){
- jam();
- SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
- xxx ref->senderRef = reference();
- xxx ref->senderData = subPtr.p->m_senderData;
- ref->subscriptionId = subPtr.p->m_subscriptionId;
- ref->subscriptionKey = subPtr.p->m_subscriptionKey;
- ref->part = (Uint32) subPtr.p->m_subscriptionType;
- ref->subscriberData = subPtr.p->m_subscriberData;
- ref->err = errCode;
- if (temporary) {
- jam();
- ref->setTemporary();
- }
- releaseSections(signal);
- sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal,
- SubStartRef::SignalLength, JBB);
-}
-#endif
void
SumaParticipant::sendSubStartRef(Signal* signal, Uint32 errCode)
{
jam();
SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
- ref->senderRef = reference();
- ref->err = errCode;
+ ref->senderRef = reference();
+ ref->errorCode = errCode;
releaseSections(signal);
sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal,
SubStartRef::SignalLength, JBB);
@@ -2300,354 +2039,508 @@
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
- Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ ndbrequire( subPtr.p->m_state == Subscription::LOCKED );
+ subPtr.p->m_state = Subscription::DEFINED;
SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
ref->senderRef = reference();
- ref->senderData = subbPtr.p->m_senderData;
+ ref->senderData = subPtr.p->m_senderData;
ref->subscriptionId = subPtr.p->m_subscriptionId;
ref->subscriptionKey = subPtr.p->m_subscriptionKey;
ref->part = (Uint32) part;
- ref->err = error;
+ ref->errorCode = error;
- sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal,
+ sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_REF, signal,
SubStartRef::SignalLength, JBB);
}
/**********************************************************
+ * Suma participant interface
*
- * Trigger admin interface
+ * Stopping and removing of subscriber
*
*/
void
-SumaParticipant::SyncRecord::startTrigger(Signal* signal){
- jam();
- m_currentTable = 0;
- m_latestTriggerId = RNIL;
- nextTrigger(signal);
+SumaParticipant::execSUB_STOP_REQ(Signal* signal){
+ jamEntry();
+ ndbassert(signal->getNoOfSections() == 0);
+ DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ");
+
+ CRASH_INSERTION(13019);
+
+ SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
+ Uint32 senderRef = req->senderRef;
+ Uint32 senderData = req->senderData;
+ Uint32 subscriberRef = req->subscriberRef;
+ Uint32 subscriberData = req->subscriberData;
+ SubscriptionPtr subPtr;
+ Subscription key;
+ key.m_subscriptionId = req->subscriptionId;
+ key.m_subscriptionKey = req->subscriptionKey;
+ Uint32 part = req->part;
+
+ if (key.m_subscriptionKey == 0 &&
+ key.m_subscriptionId == 0 &&
+ subscriberData == 0)
+ {
+ SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
+
+ conf->senderRef = reference();
+ conf->senderData = senderData;
+ conf->subscriptionId = key.m_subscriptionId;
+ conf->subscriptionKey = key.m_subscriptionKey;
+ conf->subscriberData = subscriberData;
+
+ sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
+ SubStopConf::SignalLength, JBB);
+
+ removeSubscribersOnNode(signal, refToNode(senderRef));
+ DBUG_VOID_RETURN;
+ }
+
+ if(!c_subscriptions.find(subPtr, key)){
+ jam();
+ DBUG_PRINT("error", ("not found"));
+ sendSubStopRef(signal, 1407);
+ DBUG_VOID_RETURN;
+ }
+
+ if (subPtr.p->m_state == Subscription::LOCKED) {
+ jam();
+ DBUG_PRINT("error", ("locked"));
+ sendSubStopRef(signal, 1411);
+ DBUG_VOID_RETURN;
+ }
+
+ ndbrequire(part == SubscriptionData::TableData);
+
+ TablePtr tabPtr;
+ if (!c_tables.find(tabPtr, subPtr.p->m_tableId))
+ {
+ jam();
+ DBUG_PRINT("error", ("no such table id %u", subPtr.p->m_tableId));
+ sendSubStopRef(signal, 1417);
+ DBUG_VOID_RETURN;
+ }
+
+ DBUG_PRINT("info",("subscription: %u tableId: %u id: %u key: %u",
+ subPtr.i, subPtr.p->m_tableId,
+ subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
+
+ SubscriberPtr subbPtr;
+ if (senderRef == reference()){
+ jam();
+ c_subscriberPool.getPtr(subbPtr, senderData);
+ ndbrequire(subbPtr.p->m_subPtrI == subPtr.i &&
+ subbPtr.p->m_senderRef == subscriberRef &&
+ subbPtr.p->m_senderData == subscriberData);
+ c_removeDataSubscribers.remove(subbPtr);
+ }
+ else
+ {
+ jam();
+ LocalDLList<Subscriber>
+ subscribers(c_subscriberPool,tabPtr.p->c_subscribers);
+
+ DBUG_PRINT("info",("search: subscription: %u, ref: %u, data: %d",
+ subPtr.i, subscriberRef, subscriberData));
+ for (subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr))
+ {
+ jam();
+ DBUG_PRINT("info",
+ ("search: subscription: %u, ref: %u, data: %u, subscriber %u",
+ subbPtr.p->m_subPtrI, subbPtr.p->m_senderRef,
+ subbPtr.p->m_senderData, subbPtr.i));
+ if (subbPtr.p->m_subPtrI == subPtr.i &&
+ subbPtr.p->m_senderRef == subscriberRef &&
+ subbPtr.p->m_senderData == subscriberData)
+ {
+ jam();
+ DBUG_PRINT("info",("found"));
+ break;
+ }
+ }
+ /**
+ * If we didn't find anyone, send ref
+ */
+ if (subbPtr.isNull()) {
+ jam();
+ DBUG_PRINT("error", ("subscriber not found"));
+ sendSubStopRef(signal, 1407);
+ DBUG_VOID_RETURN;
+ }
+ subscribers.remove(subbPtr);
+ }
+
+ subbPtr.p->m_senderRef = senderRef; // store ref to requestor
+ subbPtr.p->m_senderData = senderData; // store ref to requestor
+
+ tabPtr.p->m_drop_subbPtr= subbPtr;
+
+ if (subPtr.p->m_state == Subscription::DEFINED)
+ {
+ jam();
+ subPtr.p->m_state = Subscription::LOCKED;
+ }
+
+ if (tabPtr.p->m_state == Table::DROPPED)
+ {
+ jam();
+ SubscriberPtr subbPtr= tabPtr.p->m_drop_subbPtr;
+ tabPtr.p->n_subscribers--;
+ DBUG_PRINT("info",("SumaParticipant::Table[%u]::n_subscribers: %u",
+ tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
+ tabPtr.p->checkRelease(*this);
+ sendSubStopComplete(signal, subbPtr);
+ }
+ else
+ {
+ jam();
+ tabPtr.p->dropTrigger(signal,*this);
+ }
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::SyncRecord::nextTrigger(Signal* signal){
+SumaParticipant::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr)
+{
jam();
+ DBUG_ENTER("SumaParticipant::sendSubStopComplete");
+ CRASH_INSERTION(13020);
- TableList::DataBufferIterator it;
-
- if(!m_tableList.position(it, m_currentTable)){
- completeTrigger(signal);
- return;
- }
+ DBUG_PRINT("info",("removed subscriber: %i", subbPtr.i));
SubscriptionPtr subPtr;
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
- const Uint32 RT_BREAK = 48;
- Uint32 latestTriggerId = 0;
- for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){
- TablePtr tabPtr;
-#if 0
- ndbout_c("nextTrigger tableid %u", *it.data);
-#endif
- ndbrequire(suma.c_tables.find(tabPtr, *it.data));
+ c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
- AttributeMask attrMask;
- createAttributeMask(attrMask, tabPtr.p);
+ subPtr.p->n_subscribers--;
+ ndbrequire( subPtr.p->m_state == Subscription::LOCKED ||
+ subPtr.p->m_state == Subscription::DROPPED);
+ if ( subPtr.p->m_state == Subscription::LOCKED )
+ {
+ jam();
+ subPtr.p->m_state = Subscription::DEFINED;
+ }
+ else if ( subPtr.p->n_subscribers == 0 )
+ {
+ // subscription is marked to be removed
+ // and there are no subscribers left
+ jam();
+ completeSubRemove(subPtr);
+ }
- for(Uint32 j = 0; j<3; j++){
- i++;
- latestTriggerId = (tabPtr.p->m_schemaVersion << 18) |
- (j << 16) | tabPtr.p->m_tableId;
- if(tabPtr.p->m_hasTriggerDefined[j] == 0) {
- ndbrequire(tabPtr.p->m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
-#if 0
- ndbout_c("DEFINING trigger on table %u[%u]", tabPtr.p->m_tableId, j);
-#endif
- CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
- req->setUserRef(SUMA_REF);
- req->setConnectionPtr(ptrI);
- req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
- req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
- req->setMonitorReplicas(true);
- req->setMonitorAllAttributes(false);
- req->setReceiverRef(SUMA_REF);
- req->setTriggerId(latestTriggerId);
- req->setTriggerEvent((TriggerEvent::Value)j);
- req->setTableId(tabPtr.p->m_tableId);
- req->setAttributeMask(attrMask);
- suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
- signal, CreateTrigReq::SignalLength, JBB);
+ SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
+
+ conf->senderRef = reference();
+ conf->senderData = subbPtr.p->m_senderData;
- } else {
- /**
- * Faking that a trigger has been created in order to
- * simulate the proper behaviour.
- * Perhaps this should be a dummy signal instead of
- * (ab)using CREATE_TRIG_CONF.
- */
- CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtrSend();
- conf->setConnectionPtr(ptrI);
- conf->setTableId(tabPtr.p->m_tableId);
- conf->setTriggerId(latestTriggerId);
- suma.sendSignal(SUMA_REF,GSN_CREATE_TRIG_CONF,
- signal, CreateTrigConf::SignalLength, JBB);
-
- }
+ sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_CONF, signal,
+ SubStopConf::SignalLength, JBB);
+ c_subscriberPool.release(subbPtr);
+ DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
+ c_subscriberPool.getSize(),
+ c_subscriberPool.getNoOfFree()));
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::sendSubStopRef(Signal* signal, Uint32 errCode)
+{
+ jam();
+ DBUG_ENTER("SumaParticipant::sendSubStopRef");
+ SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->errorCode = errCode;
+ sendSignal(signal->getSendersBlockRef(),
+ GSN_SUB_STOP_REF,
+ signal,
+ SubStopRef::SignalLength,
+ JBB);
+ DBUG_VOID_RETURN;
+}
+
+/**********************************************************
+ *
+ * Trigger admin interface
+ *
+ */
+
+int
+SumaParticipant::Table::setupTrigger(Signal* signal,
+ SumaParticipant &suma)
+{
+ jam();
+ DBUG_ENTER("SumaParticipant::Table::setupTrigger");
+
+ int ret= 0;
+
+ AttributeMask attrMask;
+ createAttributeMask(attrMask, suma);
+
+ for(Uint32 j = 0; j<3; j++)
+ {
+ Uint32 triggerId = (m_schemaVersion << 18) | (j << 16) | m_ptrI;
+ if(m_hasTriggerDefined[j] == 0)
+ {
+ suma.suma_ndbrequire(m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
+ DBUG_PRINT("info",("DEFINING trigger on table %u[%u]", m_tableId, j));
+ CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
+ req->setUserRef(SUMA_REF);
+ req->setConnectionPtr(m_ptrI);
+ req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+ req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
+ req->setMonitorReplicas(true);
+ req->setMonitorAllAttributes(false);
+ req->setReceiverRef(SUMA_REF);
+ req->setTriggerId(triggerId);
+ req->setTriggerEvent((TriggerEvent::Value)j);
+ req->setTableId(m_tableId);
+ req->setAttributeMask(attrMask);
+ suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
+ signal, CreateTrigReq::SignalLength, JBB);
+ ret= 1;
+ }
+ else
+ {
+ m_hasTriggerDefined[j]++;
+ DBUG_PRINT("info",("REFCOUNT trigger on table %u[%u] %u",
+ m_tableId, j, m_hasTriggerDefined[j]));
}
- m_currentTable++;
}
- m_latestTriggerId = latestTriggerId;
+ DBUG_RETURN(ret);
}
void
-SumaParticipant::SyncRecord::createAttributeMask(AttributeMask& mask,
- Table * table){
+SumaParticipant::Table::createAttributeMask(AttributeMask& mask,
+ SumaParticipant &suma)
+{
jam();
mask.clear();
DataBuffer<15>::DataBufferIterator it;
- LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, table->m_attributes);
+ LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
mask.set(* it.data);
}
}
void
-SumaParticipant::SyncRecord::runCREATE_TRIG_CONF(Signal* signal){
- jam();
-
+SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF");
+ ndbassert(signal->getNoOfSections() == 0);
CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
const Uint32 triggerId = conf->getTriggerId();
Uint32 type = (triggerId >> 16) & 0x3;
Uint32 tableId = conf->getTableId();
TablePtr tabPtr;
- ndbrequire(suma.c_tables.find(tabPtr, tableId));
+ c_tables.getPtr(tabPtr, conf->getConnectionPtr());
+ ndbrequire(tabPtr.p->m_tableId == tableId);
+ ndbrequire(tabPtr.p->m_state == Table::DEFINING);
ndbrequire(type < 3);
tabPtr.p->m_triggerIds[type] = triggerId;
- tabPtr.p->m_hasTriggerDefined[type]++;
+ ndbrequire(tabPtr.p->m_hasTriggerDefined[type] == 0);
+ tabPtr.p->m_hasTriggerDefined[type] = 1;
- if(triggerId == m_latestTriggerId){
- jam();
- nextTrigger(signal);
+ if (type == 2)
+ {
+ completeInitTable(signal,tabPtr);
+ DBUG_VOID_RETURN;
}
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::SyncRecord::runCREATE_TRIG_REF(Signal* signal){
- jam();
-
+SumaParticipant::execCREATE_TRIG_REF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF");
+ ndbassert(signal->getNoOfSections() == 0);
CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr();
const Uint32 triggerId = ref->getTriggerId();
Uint32 type = (triggerId >> 16) & 0x3;
Uint32 tableId = ref->getTableId();
TablePtr tabPtr;
- ndbrequire(suma.c_tables.find(tabPtr, tableId));
+ c_tables.getPtr(tabPtr, ref->getConnectionPtr());
+ ndbrequire(tabPtr.p->m_tableId == tableId);
+ ndbrequire(tabPtr.p->m_state == Table::DEFINING);
- m_error= ref->getErrorCode();
+ tabPtr.p->m_error= ref->getErrorCode();
ndbrequire(type < 3);
- if(triggerId == m_latestTriggerId){
- jam();
- nextTrigger(signal);
- }
-}
-
-void
-SumaParticipant::SyncRecord::completeTrigger(Signal* signal){
- jam();
- SubscriptionPtr subPtr;
- CRASH_INSERTION(13013);
-#ifdef EVENT_PH3_DEBUG
- ndbout_c("SumaParticipant: trigger completed");
-#endif
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
-
- SubscriberPtr subbPtr;
- {
- bool found = false;
-
- for(suma.c_prepDataSubscribers.first(subbPtr);
- !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
- jam();
- if(subbPtr.p->m_subPtrI == subPtr.i) {
- jam();
- found = true;
- break;
- }
- }
- ndbrequire(found);
- suma.c_prepDataSubscribers.remove(subbPtr);
- }
-
- if (m_error)
+ if (type == 2)
{
- suma.sendSubStartRef(signal,subbPtr,m_error,SubscriptionData::TableData);
- return;
+ completeInitTable(signal,tabPtr);
+ DBUG_VOID_RETURN;
}
- Uint32 gci= suma.getFirstGCI(signal);
- subbPtr.p->m_firstGCI = gci;
- suma.c_dataSubscribers.add(subbPtr);
- suma.sendSubStartComplete(signal,subbPtr,gci,SubscriptionData::TableData);
-}
-
-void
-SumaParticipant::SyncRecord::startDropTrigger(Signal* signal){
- jam();
- m_currentTable = 0;
- m_latestTriggerId = RNIL;
- nextDropTrigger(signal);
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::SyncRecord::nextDropTrigger(Signal* signal){
+SumaParticipant::Table::dropTrigger(Signal* signal,SumaParticipant& suma)
+{
jam();
-
- TableList::DataBufferIterator it;
+ DBUG_ENTER("SumaParticipant::dropTrigger");
- if(!m_tableList.position(it, m_currentTable)){
- completeDropTrigger(signal);
- return;
- }
-
- SubscriptionPtr subPtr;
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
-
- const Uint32 RT_BREAK = 48;
- Uint32 latestTriggerId = 0;
- for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){
+ for(Uint32 j = 0; j<3; j++){
jam();
- TablePtr tabPtr;
-#if 0
- ndbout_c("nextDropTrigger tableid %u", *it.data);
-#endif
- ndbrequire(suma.c_tables.find(tabPtr, * it.data));
-
- for(Uint32 j = 0; j<3; j++){
+ suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
+ if(m_hasTriggerDefined[j] == 1) {
jam();
- ndbrequire(tabPtr.p->m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
- i++;
- latestTriggerId = tabPtr.p->m_triggerIds[j];
- if(tabPtr.p->m_hasTriggerDefined[j] == 1) {
- jam();
- DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
- req->setConnectionPtr(ptrI);
- req->setUserRef(SUMA_REF); // Sending to myself
- req->setRequestType(DropTrigReq::RT_USER);
- req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
- req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
- req->setIndexId(RNIL);
-
- req->setTableId(tabPtr.p->m_tableId);
- req->setTriggerId(latestTriggerId);
- req->setTriggerEvent((TriggerEvent::Value)j);
+ DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
+ req->setConnectionPtr(m_ptrI);
+ req->setUserRef(SUMA_REF); // Sending to myself
+ req->setRequestType(DropTrigReq::RT_USER);
+ req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+ req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
+ req->setIndexId(RNIL);
-#if 0
- ndbout_c("DROPPING trigger %u = %u %u %u on table %u[%u]",
- latestTriggerId,TriggerType::SUBSCRIPTION_BEFORE,
- TriggerActionTime::TA_DETACHED, j, tabPtr.p->m_tableId, j);
-#endif
- suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
- signal, DropTrigReq::SignalLength, JBB);
- } else {
- jam();
- ndbrequire(tabPtr.p->m_hasTriggerDefined[j] > 1);
- /**
- * Faking that a trigger has been dropped in order to
- * simulate the proper behaviour.
- * Perhaps this should be a dummy signal instead of
- * (ab)using DROP_TRIG_CONF.
- */
- DropTrigConf * conf = (DropTrigConf*)signal->getDataPtrSend();
- conf->setConnectionPtr(ptrI);
- conf->setTableId(tabPtr.p->m_tableId);
- conf->setTriggerId(latestTriggerId);
- suma.sendSignal(SUMA_REF,GSN_DROP_TRIG_CONF,
- signal, DropTrigConf::SignalLength, JBB);
- }
+ req->setTableId(m_tableId);
+ req->setTriggerId(m_triggerIds[j]);
+ req->setTriggerEvent((TriggerEvent::Value)j);
+
+ DBUG_PRINT("info",("DROPPING trigger %u = %u %u %u on table %u[%u]",
+ m_triggerIds[j],
+ TriggerType::SUBSCRIPTION_BEFORE,
+ TriggerActionTime::TA_DETACHED,
+ j,
+ m_tableId, j));
+ suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
+ signal, DropTrigReq::SignalLength, JBB);
+ } else {
+ jam();
+ suma.suma_ndbrequire(m_hasTriggerDefined[j] > 1);
+ runDropTrigger(signal,m_triggerIds[j],suma);
}
- m_currentTable++;
}
- m_latestTriggerId = latestTriggerId;
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::SyncRecord::runDROP_TRIG_REF(Signal* signal){
- jam();
+SumaParticipant::execDROP_TRIG_REF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execDROP_TRIG_REF");
+ ndbassert(signal->getNoOfSections() == 0);
DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
- if (ref->getErrorCode() != DropTrigRef::TriggerNotFound){
+ if (ref->getErrorCode() != DropTrigRef::TriggerNotFound)
+ {
ndbrequire(false);
}
- const Uint32 triggerId = ref->getTriggerId();
- Uint32 tableId = ref->getTableId();
- runDropTrig(signal, triggerId, tableId);
+ TablePtr tabPtr;
+ c_tables.getPtr(tabPtr, ref->getConnectionPtr());
+ ndbrequire(ref->getTableId() == tabPtr.p->m_tableId);
+
+ tabPtr.p->runDropTrigger(signal, ref->getTriggerId(), *this);
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::SyncRecord::runDROP_TRIG_CONF(Signal* signal){
- jam();
-
+SumaParticipant::execDROP_TRIG_CONF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");
+ ndbassert(signal->getNoOfSections() == 0);
+
DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
- const Uint32 triggerId = conf->getTriggerId();
- Uint32 tableId = conf->getTableId();
- runDropTrig(signal, triggerId, tableId);
+ TablePtr tabPtr;
+ c_tables.getPtr(tabPtr, conf->getConnectionPtr());
+ ndbrequire(conf->getTableId() == tabPtr.p->m_tableId);
+
+ tabPtr.p->runDropTrigger(signal, conf->getTriggerId(),*this);
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::SyncRecord::runDropTrig(Signal* signal,
- Uint32 triggerId,
- Uint32 tableId){
+SumaParticipant::Table::runDropTrigger(Signal* signal,
+ Uint32 triggerId,
+ SumaParticipant &suma)
+{
+ jam();
Uint32 type = (triggerId >> 16) & 0x3;
-
- TablePtr tabPtr;
- ndbrequire(suma.c_tables.find(tabPtr, tableId));
- ndbrequire(type < 3);
- ndbrequire(tabPtr.p->m_triggerIds[type] == triggerId);
- tabPtr.p->m_hasTriggerDefined[type]--;
- if (tabPtr.p->m_hasTriggerDefined[type] == 0) {
+ suma.suma_ndbrequire(type < 3);
+ suma.suma_ndbrequire(m_triggerIds[type] == triggerId);
+ m_hasTriggerDefined[type]--;
+ if (m_hasTriggerDefined[type] == 0)
+ {
jam();
- tabPtr.p->m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
+ m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
}
- if(triggerId == m_latestTriggerId){
+ if( m_hasTriggerDefined[0] != m_hasTriggerDefined[1] ||
+ m_hasTriggerDefined[0] != m_hasTriggerDefined[2])
+ {
+ // more to come
jam();
- nextDropTrigger(signal);
+ return;
}
-}
-void
-SumaParticipant::SyncRecord::completeDropTrigger(Signal* signal){
- jam();
- SubscriptionPtr subPtr;
- CRASH_INSERTION(13014);
#if 0
ndbout_c("trigger completed");
#endif
+ n_subscribers--;
+ DBUG_PRINT("info",("SumaParticipant::Table[%u]::n_subscribers: %u",
+ m_tableId, n_subscribers));
+ checkRelease(suma);
- suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
- ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+ suma.sendSubStopComplete(signal, m_drop_subbPtr);
+}
- bool found = false;
- SubscriberPtr subbPtr;
- for(suma.c_prepDataSubscribers.first(subbPtr);
- !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
+void SumaParticipant::suma_ndbrequire(bool v) { ndbrequire(v); }
+
+void
+SumaParticipant::Table::checkRelease(SumaParticipant &suma)
+{
+ jam();
+ DBUG_ENTER("SumaParticipant::Table::checkRelease");
+ if (n_subscribers == 0)
+ {
jam();
- if(subbPtr.p->m_subPtrI == subPtr.i) {
- jam();
- found = true;
- break;
+ suma.suma_ndbrequire(m_hasTriggerDefined[0] == 0);
+ suma.suma_ndbrequire(m_hasTriggerDefined[1] == 0);
+ suma.suma_ndbrequire(m_hasTriggerDefined[2] == 0);
+ LocalDLList<Subscriber>
+ subscribers(suma.c_subscriberPool,c_subscribers);
+ if (!subscribers.isEmpty())
+ {
+ SubscriberPtr subbPtr;
+ for (subscribers.first(subbPtr);!subbPtr.isNull(); subscribers.next(subbPtr))
+ {
+ jam();
+ DBUG_PRINT("info",("subscriber: %u", subbPtr.i));
+ }
+ suma.suma_ndbrequire(false);
+ }
+#if 0
+ LocalDLList<SyncRecord>
+ syncRecords(suma.c_syncPool,c_syncRecords);
+ if (!syncRecords.isEmpty())
+ {
+ Ptr<SyncRecord> syncPtr;
+ for (syncRecords.first(syncPtr);!syncPtr.isNull(); syncRecords.next(syncPtr))
+ {
+ jam();
+ DBUG_PRINT("info",("syncRecord: %u", syncPtr.i));
+ }
+ suma.suma_ndbrequire(false);
}
+#endif
+ release(suma);
+ suma.c_tables.remove(m_ptrI);
+ suma.c_tablePool_.release(m_ptrI);
+ DBUG_PRINT("info",("c_tablePool size: %d free: %d",
+ suma.c_tablePool_.getSize(),
+ suma.c_tablePool_.getNoOfFree()));
}
- ndbrequire(found);
- suma.sendSubStopComplete(signal, subbPtr);
+ else
+ {
+ DBUG_PRINT("info",("n_subscribers: %d", n_subscribers));
+ }
+ DBUG_VOID_RETURN;
}
/**********************************************************
@@ -2722,9 +2615,9 @@
* Initialize signal
*/
SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
- Uint32 ref = subPtr.p->m_subscriberRef;
+ Uint32 ref = subPtr.p->m_senderRef;
sdata->tableId = syncPtr.p->m_currentTableId;
- sdata->senderData = subPtr.p->m_subscriberData;
+ sdata->senderData = subPtr.p->m_senderData;
sdata->operation = 3; // Scan
sdata->gci = 1; // Undefined
#if PRINT_ONLY
@@ -2896,20 +2789,35 @@
}
void
-SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
+SumaParticipant::execFIRE_TRIG_ORD(Signal* signal)
+{
jamEntry();
DBUG_ENTER("SumaParticipant::execFIRE_TRIG_ORD");
+ ndbassert(signal->getNoOfSections() == 0);
+
CRASH_INSERTION(13016);
FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
const Uint32 trigId = trg->getTriggerId();
const Uint32 hashValue = trg->getHashValue();
const Uint32 gci = trg->getGCI();
const Uint32 event = trg->getTriggerEvent();
- const Uint32 triggerId = trg->getTriggerId();
- Uint32 tableId = triggerId & 0xFFFF;
+ TablePtr tabPtr;
+ tabPtr.i = trigId & 0xFFFF;
ndbrequire(f_bufferLock == trigId);
-
+ /**
+ * Reset f_bufferLock
+ */
+ f_bufferLock = 0;
+ b_bufferLock = 0;
+
+ bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci);
+ if (replicaFlag) {
+ jam();
+ c_failoverBuffer.subTableData(gci,NULL,0);
+ DBUG_VOID_RETURN;
+ }
+
Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
ndbrequire(sz == f_trigBufferSize);
@@ -2956,22 +2864,27 @@
nptr = 2;
}
- // right now only for tableEvent
- bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci);
-
/**
* Signal to subscriber(s)
*/
+ ndbrequire(tabPtr.p = c_tablePool_.getPtr(tabPtr.i));
+
SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
data->gci = gci;
- data->tableId = tableId;
+ data->tableId = tabPtr.p->m_tableId;
data->operation = event;
data->noOfAttributes = noOfAttrs;
- data->dataSize = dataLen;
-
+ data->dataSize = dataLen;
+ data->logType = 0;
+ if (c_lastInconsistentGCI == data->gci) {
+ data->setGCINotConsistent();
+ }
+
+ LocalDLList<Subscriber>
+ subscribers(c_subscriberPool,tabPtr.p->c_subscribers);
SubscriberPtr subbPtr;
- for(c_dataSubscribers.first(subbPtr); !subbPtr.isNull();
- c_dataSubscribers.next(subbPtr)){
+ for(subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr))
+ {
if (c_handoverToDo && subbPtr.p->m_firstGCI > gci)
{
DBUG_PRINT("info",("c_handoverToDo: %d, m_firstGCI = %d, gci = %d",
@@ -2980,87 +2893,22 @@
// we're restarting and waiting for the right gci
continue;
}
-
- jam();
-
- /*
- * get subscription ptr for this subscriber
- */
- SubscriptionPtr subPtr;
- c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-
- if(!subPtr.p->m_tables.get(tableId)) {
- jam();
- continue;
- //continue in for-loop if the table is not part of
- //the subscription. Otherwise, send data to subscriber.
- }
-
- const Uint32 ref = subbPtr.p->m_subscriberRef;
- data->senderData= subbPtr.p->m_subscriberData;
-
- if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
- if (replicaFlag) {
- jam();
- c_failoverBuffer.subTableData(gci,NULL,0);
- continue;
- }
- jam();
- Uint32 tmp = data->logType;
- if (c_lastInconsistentGCI == data->gci) {
- data->setGCINotConsistent();
- }
-
-#ifdef HANDOVER_DEBUG
- {
- static int aLongGCIName = 0;
- if (data->gci != aLongGCIName) {
- aLongGCIName = data->gci;
- ndbout_c("sent from GCI = %u", aLongGCIName);
- }
- }
-#endif
- DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref)));
- sendSignal(ref, GSN_SUB_TABLE_DATA, signal,
- SubTableData::SignalLength, JBB, ptr, nptr);
- data->logType = tmp;
- } else {
- ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
- jam();
-#if PRINT_ONLY
- ndbout_c("GSN_SUB_TABLE_DATA to %s: op: %d #attr: %d len: %d",
- getBlockName(refToBlock(ref)),
- noOfAttrs, dataLen);
-#else
-#ifdef HANDOVER_DEBUG
- {
- static int aLongGCIName2 = 0;
- if (data->gci != aLongGCIName2) {
- aLongGCIName2 = data->gci;
- ndbout_c("(EXECUTE_DIRECT) sent from GCI = %u to %u", aLongGCIName2, ref);
- }
- }
-#endif
- EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_TABLE_DATA, signal,
- SubTableData::SignalLength);
- jamEntry();
-#endif
- }
+ DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
+ refToNode(subbPtr.p->m_senderRef)));
+ data->senderData = subbPtr.p->m_senderData;
+ sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
+ SubTableData::SignalLength, JBB, ptr, nptr);
}
-
- /**
- * Reset f_bufferLock
- */
- f_bufferLock = 0;
- b_bufferLock = 0;
DBUG_VOID_RETURN;
}
void
-SumaParticipant::execSUB_GCP_COMPLETE_REP(Signal* signal){
+SumaParticipant::execSUB_GCP_COMPLETE_REP(Signal* signal)
+{
jamEntry();
+ ndbassert(signal->getNoOfSections() == 0);
SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
@@ -3068,56 +2916,46 @@
c_lastCompleteGCI = gci;
/**
- * always send SUB_GCP_COMPLETE_REP to Grep (so
- * Lars can do funky stuff calculating intervals,
- * even before the subscription is started
- */
- rep->senderRef = reference();
- rep->senderData = 0; //ignored in grep
- EXECUTE_DIRECT(refToBlock(GREP_REF), GSN_SUB_GCP_COMPLETE_REP, signal,
- SubGcpCompleteRep::SignalLength);
-
- /**
* Signal to subscriber(s)
*/
- SubscriberPtr subbPtr;
- SubscriptionPtr subPtr;
- c_dataSubscribers.first(subbPtr);
- for(; !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
-
- if (subbPtr.p->m_firstGCI > gci) {
- jam();
- // we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's
+ KeyTable<Table>::Iterator it;
+ for(c_tables.first(it);!it.isNull();c_tables.next(it))
+ {
+ if (it.curr.p->m_state != Table::DEFINED)
continue;
- }
-
- const Uint32 ref = subbPtr.p->m_subscriberRef;
- rep->senderRef = ref;
- rep->senderData = subbPtr.p->m_subscriberData;
- c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+ LocalDLList<Subscriber>
+ subbs(c_subscriberPool,it.curr.p->c_subscribers);
+ SubscriberPtr subbPtr;
+ for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+ {
+ if (subbPtr.p->m_firstGCI > gci) {
+ jam();
+ // we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's
+ continue;
+ }
+
+ const Uint32 ref = subbPtr.p->m_senderRef;
+ rep->senderRef = ref;
+ rep->senderData = subbPtr.p->m_senderData;
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
#if PRINT_ONLY
- ndbout_c("GSN_SUB_GCP_COMPLETE_REP to %s:",
- getBlockName(refToBlock(ref)));
+ ndbout_c("GSN_SUB_GCP_COMPLETE_REP to %s:",
+ getBlockName(refToBlock(ref)));
#else
- /**
- * Ignore sending to GREP (since we sent earlier)
- */
- if (ref == GREP_REF) {
- jam();
- continue;
- }
-
- CRASH_INSERTION(13018);
-
- if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
+
+ CRASH_INSERTION(13018);
+
+ if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
{
jam();
sendSignal(ref, GSN_SUB_GCP_COMPLETE_REP, signal,
SubGcpCompleteRep::SignalLength, JBB);
}
- else
+ else
{
jam();
ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
@@ -3126,8 +2964,9 @@
jamEntry();
}
#endif
+ }
}
-
+
if (c_handoverToDo) {
jam();
c_handoverToDo = false;
@@ -3162,24 +3001,62 @@
}
void
-SumaParticipant::execDROP_TABLE_CONF(Signal *signal)
+SumaParticipant::execCREATE_TAB_CONF(Signal *signal)
{
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execCREATE_TAB_CONF");
+
+ CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr();
+ Uint32 tableId = conf->senderData;
+#if 0
+ TablePtr tabPtr;
+ initTable(signal,tableId,tabPtr);
+#endif
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::execDROP_TAB_CONF(Signal *signal)
+{
+ jamEntry();
DBUG_ENTER("SumaParticipant::execDROP_TABLE_CONF");
+ ndbassert(signal->getNoOfSections() == 0);
- DropTableConf * const conf = (DropTableConf*)signal->getDataPtr();
+ DropTabConf * const conf = (DropTabConf*)signal->getDataPtr();
Uint32 tableId= conf->tableId;
- Uint32 tableVersion= conf->tableVersion;
+ TablePtr tabPtr;
+ if (!c_tables.find(tabPtr, tableId))
+ {
+ DBUG_VOID_RETURN;
+ }
+
+ DBUG_PRINT("info",("drop table id: %d", tableId));
+
+ tabPtr.p->m_state = Table::DROPPED;
+ tabPtr.p->m_hasTriggerDefined[0] = 0;
+ tabPtr.p->m_hasTriggerDefined[1] = 0;
+ tabPtr.p->m_hasTriggerDefined[2] = 0;
+ tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
+ tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
+ tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
+
+ if (getResponsibleSumaNodeId(0) != refToNode(reference()))
+ {
+ DBUG_VOID_RETURN;
+ }
+ // responsible for bucket 0 sends info to API
+
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = 0;
data->tableId = tableId;
data->operation = 3;
data->noOfAttributes = 0;
data->dataSize = 0;
-
+
+ LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
SubscriberPtr subbPtr;
- for(c_dataSubscribers.first(subbPtr); !subbPtr.isNull();
- c_dataSubscribers.next(subbPtr))
+ for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
{
jam();
/*
@@ -3187,19 +3064,18 @@
*/
SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
- if(!subPtr.p->m_tables.get(tableId) ||
- subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
+ if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
jam();
continue;
//continue in for-loop if the table is not part of
//the subscription. Otherwise, send data to subscriber.
}
- data->senderData= subbPtr.p->m_subscriberData;
- sendSignal(subbPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, signal,
+ data->senderData= subbPtr.p->m_senderData;
+ sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
+ DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
}
-
DBUG_VOID_RETURN;
}
@@ -3211,7 +3087,8 @@
*/
void
-SumaParticipant::runSUB_GCP_COMPLETE_ACC(Signal* signal){
+SumaParticipant::runSUB_GCP_COMPLETE_ACC(Signal* signal)
+{
jam();
SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();
@@ -3226,8 +3103,10 @@
}
void
-Suma::execSUB_GCP_COMPLETE_ACC(Signal* signal){
+Suma::execSUB_GCP_COMPLETE_ACC(Signal* signal)
+{
jamEntry();
+ ndbassert(signal->getNoOfSections() == 0);
if (RtoI(signal->getSendersBlockRef(), false) != RNIL) {
jam();
@@ -3242,39 +3121,13 @@
SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();
Uint32 gci = acc->rep.gci;
Uint32 senderRef = acc->rep.senderRef;
- Uint32 subscriberData = acc->rep.subscriberData;
+ Uint32 senderData = acc->rep.senderData;
-#ifdef EVENT_DEBUG
- ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = %u", gci);
-#endif
- bool moreToCome = false;
+ static Uint32 last_gci= 0;
+ if (last_gci < gci)
+ {
+ last_gci= gci;
- SubscriberPtr subbPtr;
- for(c_dataSubscribers.first(subbPtr);
- !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
-#ifdef EVENT_DEBUG
- ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC %u == %u && %u == %u",
- subbPtr.p->m_subscriberRef,
- senderRef,
- subbPtr.p->m_subscriberData,
- subscriberData);
-#endif
- if (subbPtr.p->m_subscriberRef == senderRef &&
- subbPtr.p->m_subscriberData == subscriberData) {
- jam();
-#ifdef EVENT_DEBUG
- ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = FOUND SUBSCRIBER");
-#endif
- subbPtr.p->m_lastGCI = gci;
- } else if (subbPtr.p->m_lastGCI < gci) {
- jam();
- if (subbPtr.p->m_firstGCI <= gci)
- moreToCome = true;
- } else
- jam();
- }
-
- if (!moreToCome) {
// tell the other SUMA's that I'm done with this GCI
jam();
for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
@@ -3353,150 +3206,6 @@
return ok;
}
-/**********************************************************
- * Suma participant interface
- *
- * Stopping and removing of subscriber
- *
- */
-
-void
-SumaParticipant::execSUB_STOP_REQ(Signal* signal){
- jamEntry();
- DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ");
-
- CRASH_INSERTION(13019);
-
- SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
- Uint32 senderRef = signal->getSendersBlockRef();
- Uint32 senderData = req->senderData;
- Uint32 subscriberRef = req->subscriberRef;
- Uint32 subscriberData = req->subscriberData;
- SubscriptionPtr subPtr;
- Subscription key;
- key.m_subscriptionId = req->subscriptionId;
- key.m_subscriptionKey = req->subscriptionKey;
- Uint32 part = req->part;
-
- if (key.m_subscriptionKey == 0 &&
- key.m_subscriptionId == 0 &&
- subscriberData == 0) {
- SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
-
- conf->senderRef = reference();
- conf->senderData = senderData;
- conf->subscriptionId = key.m_subscriptionId;
- conf->subscriptionKey = key.m_subscriptionKey;
- conf->subscriberData = subscriberData;
-
- sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
- SubStopConf::SignalLength, JBB);
-
- removeSubscribersOnNode(signal, refToNode(subscriberRef));
- DBUG_VOID_RETURN;
- }
-
- if(!c_subscriptions.find(subPtr, key)){
- jam();
- sendSubStopRef(signal, 1407);
- return;
- }
-
- ndbrequire(part == SubscriptionData::TableData);
-
- SubscriberPtr subbPtr;
- if (senderRef == reference()){
- jam();
- c_subscriberPool.getPtr(subbPtr, senderData);
- ndbrequire(subbPtr.p->m_subPtrI == subPtr.i &&
- subbPtr.p->m_subscriberRef == subscriberRef &&
- subbPtr.p->m_subscriberData == subscriberData);
- c_removeDataSubscribers.remove(subbPtr);
- } else {
- bool found = false;
- jam();
- c_dataSubscribers.first(subbPtr);
- for (;!subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
- jam();
- if (subbPtr.p->m_subPtrI == subPtr.i &&
- refToNode(subbPtr.p->m_subscriberRef) == refToNode(subscriberRef) &&
- subbPtr.p->m_subscriberData == subscriberData){
- // ndbout_c("STOP_REQ: before c_dataSubscribers.release");
- jam();
- c_dataSubscribers.remove(subbPtr);
- found = true;
- break;
- }
- }
- /**
- * If we didn't find anyone, send ref
- */
- if (!found) {
- jam();
- sendSubStopRef(signal, 1407);
- DBUG_VOID_RETURN;
- }
- }
-
- subbPtr.p->m_senderRef = senderRef; // store ref to requestor
- subbPtr.p->m_senderData = senderData; // store ref to requestor
- c_prepDataSubscribers.add(subbPtr);
-
- Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
- if (syncPtr.p->m_locked) {
- jam();
- sendSubStopRef(signal, 1411);
- DBUG_VOID_RETURN;
- }
- syncPtr.p->m_locked = true;
-
- syncPtr.p->startDropTrigger(signal);
- DBUG_VOID_RETURN;
-}
-
-void
-SumaParticipant::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr){
- jam();
-
- CRASH_INSERTION(13020);
-
- SubscriptionPtr subPtr;
- c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
-
- Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
- syncPtr.p->m_locked = false;
-
- SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
-
- conf->senderRef = reference();
- conf->senderData = subbPtr.p->m_senderData;
- conf->subscriptionId = subPtr.p->m_subscriptionId;
- conf->subscriptionKey = subPtr.p->m_subscriptionKey;
- conf->subscriberData = subbPtr.p->m_subscriberData;
- Uint32 senderRef = subbPtr.p->m_senderRef;
-
- c_prepDataSubscribers.release(subbPtr);
- sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
- SubStopConf::SignalLength, JBB);
-}
-
-void
-SumaParticipant::sendSubStopRef(Signal* signal, Uint32 errCode)
-{
- jam();
- SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
- ref->senderRef = reference();
- ref->errorCode = errCode;
- sendSignal(signal->getSendersBlockRef(),
- GSN_SUB_STOP_REF,
- signal,
- SubStopRef::SignalLength,
- JBB);
- return;
-}
-
/**************************************************************
*
* Removing subscription
@@ -3504,11 +3213,11 @@
*/
void
-SumaParticipant::execSUB_REMOVE_REQ(Signal* signal) {
+SumaParticipant::execSUB_REMOVE_REQ(Signal* signal)
+{
jamEntry();
DBUG_ENTER("SumaParticipant::execSUB_REMOVE_REQ");
-
- Uint32 senderRef = signal->getSendersBlockRef();
+ ndbassert(signal->getNoOfSections() == 0);
CRASH_INSERTION(13021);
@@ -3517,114 +3226,86 @@
Subscription key;
key.m_subscriptionId = req.subscriptionId;
key.m_subscriptionKey = req.subscriptionKey;
- bool drop_table= 1;
- if(!c_subscriptions.find(subPtr, key)) {
+ DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
+ key.m_subscriptionId, key.m_subscriptionKey));
+
+ if(!c_subscriptions.find(subPtr, key))
+ {
jam();
+ DBUG_PRINT("info",("Not found"));
sendSubRemoveRef(signal, req, 1407);
DBUG_VOID_RETURN;
}
+ if (subPtr.p->m_state == Subscription::LOCKED)
+ {
+ /**
+ * we are currently setting up triggers etc. for this event
+ */
+ jam();
+ sendSubRemoveRef(signal, req, 1413);
+ DBUG_VOID_RETURN;
+ }
- int count = 0;
+ DBUG_PRINT("info",("table id: %u", subPtr.p->m_tableId));
+
+ unsigned count= 0;
+ TablePtr tabPtr;
+ if (c_tables.find(tabPtr, subPtr.p->m_tableId))
{
jam();
- SubscriberPtr i_subbPtr;
- for(c_prepDataSubscribers.first(i_subbPtr);
- !i_subbPtr.isNull(); c_prepDataSubscribers.next(i_subbPtr)){
- jam();
- if( i_subbPtr.p->m_subPtrI == subPtr.i ) {
- /**
- * we are currently setting up triggers etc. for this event
- */
- jam();
- sendSubRemoveRef(signal, req, 1413);
- DBUG_VOID_RETURN;
- // c_prepDataSubscribers.release(subbPtr);
- }
- }
- c_dataSubscribers.first(i_subbPtr);
- while(!i_subbPtr.isNull()){
- jam();
- SubscriberPtr subbPtr = i_subbPtr;
- c_dataSubscribers.next(i_subbPtr);
- if( subbPtr.p->m_subPtrI == subPtr.i ) {
- if (drop_table)
- {
- jam();
- /**
- * just remove it.
- * triggers will be released by the drop table
- */
- // ToDo send a signal to subscribers that it's dropped
- c_dataSubscribers.remove(subbPtr);
- }
- else
- {
- jam();
- // don't allow remove when there already is a subscription
- sendSubRemoveRef(signal, req, 1414);
- DBUG_VOID_RETURN;
- // future...
- /* Unfinished/untested code. If remove should be possible
- * even if subscribers are left these have to be stopped
- * first. See m_markRemove, m_nSubscribers. We need also to
- * block remove for this subscription so that multiple
- * removes is not possible...
- */
- c_dataSubscribers.remove(subbPtr);
- c_removeDataSubscribers.add(subbPtr);
- count++;
- }
- }
- }
- c_metaSubscribers.first(i_subbPtr);
- while(!i_subbPtr.isNull()){
+ LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
+ SubscriberPtr subbPtr;
+ for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
+ {
jam();
- SubscriberPtr subbPtr = i_subbPtr;
- c_metaSubscribers.next(i_subbPtr);
- if( subbPtr.p->m_subPtrI == subPtr.i ){
+ if( subbPtr.p->m_subPtrI == subPtr.i )
+ {
jam();
- c_metaSubscribers.release(subbPtr);
+ count++;
}
}
}
- subPtr.p->m_senderRef = senderRef;
- subPtr.p->m_senderData = req.senderData;
-
- if (count > 0){
+ if (count == 0)
+ {
+ // no subscribers on the subscription
+ // remove it
jam();
- ndbrequire(false); // code not finalized
- subPtr.p->m_markRemove = true;
- subPtr.p->m_nSubscribers = count;
- sendSubStopReq(signal);
- DBUG_VOID_RETURN;
- } else {
- completeSubRemoveReq(signal, subPtr);
- DBUG_VOID_RETURN;
+ completeSubRemove(subPtr);
}
+ else
+ {
+ // subscribers left on the subscription
+ // mark it to be removed once all subscribers
+ // are removed
+ jam();
+ subPtr.p->m_state = Subscription::DROPPED;
+ }
+
+ SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = req.senderData;
+ conf->subscriptionId = req.subscriptionId;
+ conf->subscriptionKey = req.subscriptionKey;
+
+ sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
+ SubRemoveConf::SignalLength, JBB);
+
+ DBUG_VOID_RETURN;
}
void
-SumaParticipant::completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr) {
+SumaParticipant::completeSubRemove(SubscriptionPtr subPtr)
+{
+ DBUG_ENTER("SumaParticipant::completeSubRemove");
Uint32 subscriptionId = subPtr.p->m_subscriptionId;
Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
- Uint32 senderRef = subPtr.p->m_senderRef;
- Uint32 senderData = subPtr.p->m_senderData;
- {
- Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
-
- syncPtr.p->release();
- c_syncPool.release(syncPtr);
- }
-
- // if (subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
- // jam();
- // senderRef = subPtr.p->m_subscriberRef;
- // }
c_subscriptions.release(subPtr);
+ DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
+ c_subscriptionPool.getSize(),
+ c_subscriptionPool.getNoOfFree()));
/**
* I was the last subscription to be remove so clear c_tables
@@ -3640,25 +3321,21 @@
ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
#endif
KeyTable<Table>::Iterator it;
- for(c_tables.first(it); !it.isNull(); ){
-
+ for(c_tables.first(it); !it.isNull(); )
+ {
+ ndbrequire(false);
+
it.curr.p->release(* this);
-
TablePtr tabPtr = it.curr;
-
c_tables.next(it);
- c_tables.release(tabPtr);
+ c_tables.remove(tabPtr);
+ c_tablePool_.release(tabPtr);
+ DBUG_PRINT("info",("c_tablePool size: %d free: %d",
+ c_tablePool_.getSize(),
+ c_tablePool_.getNoOfFree()));
}
}
-
- SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
- conf->senderRef = reference();
- conf->senderData = senderData;
- conf->subscriptionId = subscriptionId;
- conf->subscriptionKey = subscriptionKey;
-
- sendSignal(senderRef, GSN_SUB_REMOVE_CONF, signal,
- SubRemoveConf::SignalLength, JBB);
+ DBUG_VOID_RETURN;
}
void
@@ -3669,10 +3346,10 @@
DBUG_ENTER("SumaParticipant::sendSubRemoveRef");
SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend();
ref->senderRef = reference();
+ ref->senderData = req.senderData;
ref->subscriptionId = req.subscriptionId;
ref->subscriptionKey = req.subscriptionKey;
- ref->senderData = req.senderData;
- ref->err = errCode;
+ ref->errorCode = errCode;
releaseSections(signal);
sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF,
signal, SubRemoveRef::SignalLength, JBB);
@@ -3688,6 +3365,12 @@
LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
fragBuf.release();
+
+ m_state = UNDEFINED;
+#ifndef DBUG_OFF
+ if (n_subscribers != 0)
+ abort();
+#endif
}
void
@@ -3710,59 +3393,139 @@
*
*/
-Suma::Restart::Restart(Suma& s) : suma(s) {
- for (int i = 0; i < MAX_REPLICAS; i++) {
+void
+Suma::execSUMA_START_ME(Signal* signal) {
+ jamEntry();
+ DBUG_ENTER("Suma::execSUMA_START_ME");
+ ndbassert(signal->getNoOfSections() == 0);
+ Restart.runSUMA_START_ME(signal, signal->getSendersBlockRef());
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::execSUB_CREATE_REF(Signal* signal) {
+ jamEntry();
+ DBUG_ENTER("Suma::execSUB_CREATE_REF");
+ ndbassert(signal->getNoOfSections() == 0);
+ ndbrequire(false);
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::execSUB_CREATE_CONF(Signal* signal)
+{
+ jamEntry();
+ DBUG_ENTER("Suma::execSUB_CREATE_CONF");
+ ndbassert(signal->getNoOfSections() == 0);
+ Restart.runSUB_CREATE_CONF(signal);
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::execSUB_START_CONF(Signal* signal)
+{
+ jamEntry();
+ DBUG_ENTER("Suma::execSUB_START_CONF");
+ ndbassert(signal->getNoOfSections() == 0);
+ Restart.runSUB_START_CONF(signal);
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::execSUB_START_REF(Signal* signal) {
+ jamEntry();
+ DBUG_ENTER("Suma::execSUB_START_REF");
+ ndbassert(signal->getNoOfSections() == 0);
+ ndbrequire(false);
+ DBUG_VOID_RETURN;
+}
+
+Suma::Restart::Restart(Suma& s) : suma(s)
+{
+ for (int i = 0; i < MAX_REPLICAS; i++)
+ {
c_okToStart[i] = false;
c_waitingToStart[i] = false;
}
}
void
+Suma::Restart::runSUMA_START_ME(Signal* signal, Uint32 sumaRef)
+{
+ jam();
+ DBUG_ENTER("Suma::Restart::runSUMA_START_ME");
+
+ int I = suma.RtoI(sumaRef);
+
+ // restarting Suma is ready for SUB_START_REQ
+ if (c_waitingToStart[I]) {
+ // we've waited with startSubscriber since restarting suma was not ready
+ c_waitingToStart[I] = false;
+ startSubscriber(signal, sumaRef);
+ } else {
+ // do startSubscriber as soon as its time
+ c_okToStart[I] = true;
+ }
+ DBUG_VOID_RETURN;
+}
+
+void
Suma::Restart::resetNode(Uint32 sumaRef)
{
jam();
+ DBUG_ENTER("Suma::Restart::resetNode");
int I = suma.RtoI(sumaRef);
c_okToStart[I] = false;
c_waitingToStart[I] = false;
+ DBUG_VOID_RETURN;
}
void
Suma::Restart::startNode(Signal* signal, Uint32 sumaRef)
{
jam();
+ DBUG_ENTER("Suma::Restart::startNode");
resetNode(sumaRef);
// right now we can only handle restarting one node
// at a time in a node group
createSubscription(signal, sumaRef);
+ DBUG_VOID_RETURN;
}
void
-Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef)
+{
jam();
- suma.c_subscriptions.first(c_subPtr);
+ DBUG_ENTER("Suma::Restart::createSubscription");
+ suma.c_subscriptions.first(c_subIt);
nextSubscription(signal, sumaRef);
+ DBUG_VOID_RETURN;
}
void
-Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef)
+{
jam();
- if (c_subPtr.isNull()) {
+ DBUG_ENTER("Suma::Restart::nextSubscription");
+
+ if (c_subIt.isNull())
+ {
jam();
completeSubscription(signal, sumaRef);
- return;
+ DBUG_VOID_RETURN;
}
SubscriptionPtr subPtr;
- subPtr.i = c_subPtr.curr.i;
+ subPtr.i = c_subIt.curr.i;
subPtr.p = suma.c_subscriptions.getPtr(subPtr.i);
- suma.c_subscriptions.next(c_subPtr);
+ suma.c_subscriptions.next(c_subIt);
SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
- req->subscriberRef = suma.reference();
- req->subscriberData = subPtr.i;
+ req->senderRef = suma.reference();
+ req->senderData = subPtr.i;
req->subscriptionId = subPtr.p->m_subscriptionId;
req->subscriptionKey = subPtr.p->m_subscriptionKey;
req->subscriptionType = subPtr.p->m_subscriptionType |
@@ -3770,196 +3533,146 @@
switch (subPtr.p->m_subscriptionType) {
case SubCreateReq::TableEvent:
- case SubCreateReq::SelectiveTableSnapshot:
- case SubCreateReq::DatabaseSnapshot: {
jam();
-
- Ptr<SyncRecord> syncPtr;
- suma.c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
- syncPtr.p->m_tableList.first(syncPtr.p->m_tableList_it);
-
- ndbrequire(!syncPtr.p->m_tableList_it.isNull());
-
- req->tableId = *syncPtr.p->m_tableList_it.data;
-
-#if 0
- for (int i = 0; i < MAX_TABLES; i++)
- if (subPtr.p->m_tables.get(i)) {
- req->tableId = i;
- break;
- }
-#endif
-
+ req->tableId = subPtr.p->m_tableId;
suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
- SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
- return;
- }
- case SubCreateReq::SingleTableScan :
- // TODO
+ SubCreateReq::SignalLength, JBB);
+ DBUG_VOID_RETURN;
+ case SubCreateReq::SingleTableScan:
jam();
- return;
+ nextSubscription(signal, sumaRef);
+ DBUG_VOID_RETURN;
+ case SubCreateReq::SelectiveTableSnapshot:
+ case SubCreateReq::DatabaseSnapshot:
+ ndbrequire(false);
}
ndbrequire(false);
}
-void
-Suma::execSUB_CREATE_CONF(Signal* signal) {
- jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUB_CREATE_CONF");
-#endif
+void
+Suma::Restart::runSUB_CREATE_CONF(Signal* signal)
+{
+ jam();
+ DBUG_ENTER("Suma::Restart::runSUB_CREATE_CONF");
const Uint32 senderRef = signal->senderBlockRef();
+ Uint32 sumaRef = signal->getSendersBlockRef();
SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
- Subscription key;
- const Uint32 subscriberData = conf->subscriberData;
- key.m_subscriptionId = conf->subscriptionId;
- key.m_subscriptionKey = conf->subscriptionKey;
-
SubscriptionPtr subPtr;
- ndbrequire(c_subscriptions.find(subPtr, key));
+ suma.c_subscriptions.getPtr(subPtr,conf->senderData);
switch(subPtr.p->m_subscriptionType) {
case SubCreateReq::TableEvent:
- case SubCreateReq::SelectiveTableSnapshot:
- case SubCreateReq::DatabaseSnapshot:
+ if (1)
{
- Ptr<SyncRecord> syncPtr;
- c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
-
- syncPtr.p->m_tableList.next(syncPtr.p->m_tableList_it);
- if (syncPtr.p->m_tableList_it.isNull()) {
- jam();
- SubSyncReq *req = (SubSyncReq *)signal->getDataPtrSend();
-
- req->subscriptionId = key.m_subscriptionId;
- req->subscriptionKey = key.m_subscriptionKey;
- req->subscriberData = subscriberData;
- req->part = (Uint32) SubscriptionData::MetaData;
-
- sendSignal(senderRef, GSN_SUB_SYNC_REQ, signal,
- SubSyncReq::SignalLength, JBB);
- } else {
- jam();
- SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
+ jam();
+ nextSubscription(signal, sumaRef);
+ } else {
+ jam();
+ SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
- req->subscriberRef = reference();
- req->subscriberData = subPtr.i;
- req->subscriptionId = subPtr.p->m_subscriptionId;
- req->subscriptionKey = subPtr.p->m_subscriptionKey;
- req->subscriptionType = subPtr.p->m_subscriptionType |
- SubCreateReq::RestartFlag |
- SubCreateReq::AddTableFlag;
+ req->senderRef = suma.reference();
+ req->senderData = subPtr.i;
+ req->subscriptionId = subPtr.p->m_subscriptionId;
+ req->subscriptionKey = subPtr.p->m_subscriptionKey;
+ req->subscriptionType = subPtr.p->m_subscriptionType |
+ SubCreateReq::RestartFlag |
+ SubCreateReq::AddTableFlag;
- req->tableId = *syncPtr.p->m_tableList_it.data;
+ req->tableId = 0;
- sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
- SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
- }
+ suma.sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
+ SubCreateReq::SignalLength, JBB);
}
- return;
+ DBUG_VOID_RETURN;
case SubCreateReq::SingleTableScan:
- ndbrequire(false);
- }
- ndbrequire(false);
-}
-
-void
-Suma::execSUB_CREATE_REF(Signal* signal) {
- jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUB_CREATE_REF");
-#endif
- //ndbrequire(false);
-}
-
-void
-Suma::execSUB_SYNC_CONF(Signal* signal) {
- jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUB_SYNC_CONF");
-#endif
- Uint32 sumaRef = signal->getSendersBlockRef();
-
- SubSyncConf *conf = (SubSyncConf *)signal->getDataPtr();
- Subscription key;
-
- key.m_subscriptionId = conf->subscriptionId;
- key.m_subscriptionKey = conf->subscriptionKey;
- // SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
- // const Uint32 subscriberData = conf->subscriberData;
-
- SubscriptionPtr subPtr;
- c_subscriptions.find(subPtr, key);
-
- switch(subPtr.p->m_subscriptionType) {
- case SubCreateReq::TableEvent:
case SubCreateReq::SelectiveTableSnapshot:
case SubCreateReq::DatabaseSnapshot:
- jam();
- Restart.nextSubscription(signal, sumaRef);
- return;
- case SubCreateReq::SingleTableScan:
ndbrequire(false);
- return;
}
ndbrequire(false);
}
void
-Suma::execSUB_SYNC_REF(Signal* signal) {
- jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUB_SYNC_REF");
-#endif
- //ndbrequire(false);
-}
-
-void
-Suma::execSUMA_START_ME(Signal* signal) {
- jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUMA_START_ME");
-#endif
-
- Restart.runSUMA_START_ME(signal, signal->getSendersBlockRef());
-}
-
-void
-Suma::Restart::runSUMA_START_ME(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef)
+{
+ jam();
+ DBUG_ENTER("Suma::Restart::completeSubscription");
int I = suma.RtoI(sumaRef);
- // restarting Suma is ready for SUB_START_REQ
- if (c_waitingToStart[I]) {
- // we've waited with startSubscriber since restarting suma was not ready
- c_waitingToStart[I] = false;
+ if (c_okToStart[I])
+ {// otherwise will start when START_ME comes
+ c_okToStart[I] = false;
startSubscriber(signal, sumaRef);
- } else {
- // do startSubscriber as soon as its time
- c_okToStart[I] = true;
}
+ else
+ {
+ c_waitingToStart[I] = true;
+ }
+ DBUG_VOID_RETURN;
}
void
-Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef)
+{
jam();
- int I = suma.RtoI(sumaRef);
-
- if (c_okToStart[I]) {// otherwise will start when START_ME comes
- c_okToStart[I] = false;
- startSubscriber(signal, sumaRef);
- } else {
- c_waitingToStart[I] = true;
+ DBUG_ENTER("Suma::Restart::startSubscriber");
+ suma.c_tables.first(c_tabIt);
+ if (c_tabIt.isNull())
+ {
+ completeSubscriber(signal, sumaRef);
+ DBUG_VOID_RETURN;
}
+ LocalDLList<Subscriber>
+ subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
+ SubscriberPtr subbPtr;
+ subbs.first(subbPtr);
+ nextSubscriber(signal, sumaRef, subbPtr);
+ DBUG_VOID_RETURN;
}
void
-Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef,
+ SubscriberPtr subbPtr)
+{
jam();
- suma.c_dataSubscribers.first(c_subbPtr);
- nextSubscriber(signal, sumaRef);
+ DBUG_ENTER("Suma::Restart::nextSubscriber");
+ while (subbPtr.isNull())
+ {
+ jam();
+ DBUG_PRINT("info",("prev tableId %u",c_tabIt.curr.p->m_tableId));
+ suma.c_tables.next(c_tabIt);
+ if (c_tabIt.isNull())
+ {
+ completeSubscriber(signal, sumaRef);
+ DBUG_VOID_RETURN;
+ }
+ DBUG_PRINT("info",("next tableId %u",c_tabIt.curr.p->m_tableId));
+
+ LocalDLList<Subscriber>
+ subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
+ subbs.first(subbPtr);
+ }
+
+ /*
+ * get subscription ptr for this subscriber
+ */
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+ switch (subPtr.p->m_subscriptionType) {
+ case SubCreateReq::TableEvent:
+ jam();
+ sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
+ DBUG_VOID_RETURN;
+ case SubCreateReq::SelectiveTableSnapshot:
+ case SubCreateReq::DatabaseSnapshot:
+ case SubCreateReq::SingleTableScan:
+ ndbrequire(false);
+ }
+ ndbrequire(false);
}
void
@@ -3967,113 +3680,84 @@
Signal* signal, Uint32 sumaRef)
{
jam();
+ DBUG_ENTER("Suma::Restart::sendSubStartReq");
SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
-
+
req->senderRef = suma.reference();
- req->senderData = subbPtr.p->m_senderData;
+ req->senderData = subbPtr.i;
req->subscriptionId = subPtr.p->m_subscriptionId;
req->subscriptionKey = subPtr.p->m_subscriptionKey;
req->part = SubscriptionData::TableData;
- req->subscriberData = subbPtr.p->m_subscriberData;
- req->subscriberRef = subbPtr.p->m_subscriberRef;
-
+ req->subscriberData = subbPtr.p->m_senderData;
+ req->subscriberRef = subbPtr.p->m_senderRef;
+
// restarting suma will not respond to this until startphase 5
// since it is not until then data copying has been completed
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::Restart::sendSubStartReq sending GSN_SUB_START_REQ id=%u key=%u",
- req->subscriptionId, req->subscriptionKey);
-#endif
+ DBUG_PRINT("info",("Restarting subscriber: %u on key: [%u,%u]",
+ subbPtr.i,
+ subPtr.p->m_subscriptionId,
+ subPtr.p->m_subscriptionKey,
+ subPtr.p->m_tableId));
+
suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
signal, SubStartReq::SignalLength2, JBB);
+ DBUG_VOID_RETURN;
}
void
-Suma::execSUB_START_CONF(Signal* signal) {
- jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUB_START_CONF");
-#endif
- Uint32 sumaRef = signal->getSendersBlockRef();
- Restart.nextSubscriber(signal, sumaRef);
-}
-
-void
-Suma::execSUB_START_REF(Signal* signal) {
- jamEntry();
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::execSUB_START_REF");
-#endif
- //ndbrequire(false);
-}
-
-void
-Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::runSUB_START_CONF(Signal* signal)
+{
jam();
- if (c_subbPtr.isNull()) {
- jam();
- completeSubscriber(signal, sumaRef);
- return;
- }
-
- SubscriberPtr subbPtr = c_subbPtr;
- suma.c_dataSubscribers.next(c_subbPtr);
+ DBUG_ENTER("Suma::Restart::runSUB_START_CONF");
- /*
- * get subscription ptr for this subscriber
- */
+ SubStartConf * const conf = (SubStartConf*)signal->getDataPtr();
+ Subscription key;
SubscriptionPtr subPtr;
- suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
- switch (subPtr.p->m_subscriptionType) {
- case SubCreateReq::TableEvent:
- case SubCreateReq::SelectiveTableSnapshot:
- case SubCreateReq::DatabaseSnapshot:
- {
- jam();
- sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
-#if 0
- SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
-
- req->senderRef = reference();
- req->senderData = subbPtr.p->m_senderData;
- req->subscriptionId = subPtr.p->m_subscriptionId;
- req->subscriptionKey = subPtr.p->m_subscriptionKey;
- req->part = SubscriptionData::TableData;
- req->subscriberData = subbPtr.p->m_subscriberData;
- req->subscriberRef = subbPtr.p->m_subscriberRef;
-
- // restarting suma will not respond to this until startphase 5
- // since it is not until then data copying has been completed
-#ifdef NODEFAIL_DEBUG
- ndbout_c("Suma::nextSubscriber sending GSN_SUB_START_REQ id=%u key=%u",
- req->subscriptionId, req->subscriptionKey);
-#endif
- suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
- signal, SubStartReq::SignalLength2, JBB);
-#endif
- }
- return;
- case SubCreateReq::SingleTableScan:
- ndbrequire(false);
- return;
- }
- ndbrequire(false);
+ key.m_subscriptionId = conf->subscriptionId;
+ key.m_subscriptionKey = conf->subscriptionKey;
+ ndbrequire(suma.c_subscriptions.find(subPtr, key));
+
+ TablePtr tabPtr;
+ ndbrequire(suma.c_tables.find(tabPtr, subPtr.p->m_tableId));
+
+ LocalDLList<Subscriber>
+ subbs(suma.c_subscriberPool,tabPtr.p->c_subscribers);
+ SubscriberPtr subbPtr;
+ subbs.getPtr(subbPtr, conf->senderData);
+
+ DBUG_PRINT("info",("Restarted subscriber: %u on key: [%u,%u] table: %u",
+ subbPtr.i,key.m_subscriptionId,key.m_subscriptionKey,
+ subPtr.p->m_tableId));
+
+ subbs.next(subbPtr);
+
+ Uint32 sumaRef = signal->getSendersBlockRef();
+ nextSubscriber(signal, sumaRef, subbPtr);
+
+ DBUG_VOID_RETURN;
}
void
-Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef)
+{
+ DBUG_ENTER("Suma::Restart::completeSubscriber");
completeRestartingNode(signal, sumaRef);
+ DBUG_VOID_RETURN;
}
void
-Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef) {
+Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef)
+{
jam();
+ DBUG_ENTER("Suma::Restart::completeRestartingNode");
SumaHandoverReq * req = (SumaHandoverReq *)signal->getDataPtrSend();
req->gci = suma.getFirstGCI(signal);
suma.sendSignal(sumaRef, GSN_SUMA_HANDOVER_REQ, signal,
SumaHandoverReq::SignalLength, JBB);
+ DBUG_VOID_RETURN;
}
// only run on restarting suma
@@ -4082,22 +3766,31 @@
Suma::execSUMA_HANDOVER_REQ(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ");
// Uint32 sumaRef = signal->getSendersBlockRef();
SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr();
Uint32 gci = req->gci;
Uint32 new_gci = getFirstGCI(signal);
- if (new_gci > gci) {
+ if (new_gci > gci)
+ {
gci = new_gci;
}
{ // all recreated subscribers at restarting SUMA start at same GCI
- SubscriberPtr subbPtr;
- for(c_dataSubscribers.first(subbPtr);
- !subbPtr.isNull();
- c_dataSubscribers.next(subbPtr)){
- subbPtr.p->m_firstGCI = gci;
+ KeyTable<Table>::Iterator it;
+ for(c_tables.first(it);!it.isNull();c_tables.next(it))
+ {
+ LocalDLList<Subscriber>
+ subscribers(c_subscriberPool,it.curr.p->c_subscribers);
+ SubscriberPtr subbPtr;
+ for(subscribers.first(subbPtr);
+ !subbPtr.isNull();
+ subscribers.next(subbPtr))
+ {
+ subbPtr.p->m_firstGCI = gci;
+ }
}
}
@@ -4139,12 +3832,16 @@
SumaHandoverConf::SignalLength, JBB);
}//if
}
+ sendSTTORRY(signal);
+ DBUG_VOID_RETURN;
}
// only run on all but restarting suma
void
Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
jamEntry();
+ DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
+
Uint32 sumaRef = signal->getSendersBlockRef();
SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr();
@@ -4178,6 +3875,7 @@
}
}
}
+ DBUG_VOID_RETURN;
}
template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);
--- 1.5/ndb/src/kernel/blocks/suma/Suma.hpp 2005-03-22 02:39:37 +01:00
+++ 1.6/ndb/src/kernel/blocks/suma/Suma.hpp 2005-03-31 00:35:16 +02:00
@@ -58,16 +58,19 @@
/**
* Dict interface
*/
+#if 0
void execLIST_TABLES_REF(Signal* signal);
void execLIST_TABLES_CONF(Signal* signal);
+#endif
void execGET_TABINFOREF(Signal* signal);
void execGET_TABINFO_CONF(Signal* signal);
-#if 0
+
void execGET_TABLEID_CONF(Signal* signal);
void execGET_TABLEID_REF(Signal* signal);
-#endif
- void execDROP_TABLE_CONF(Signal* signal);
+
+ void execDROP_TAB_CONF(Signal* signal);
+ void execCREATE_TAB_CONF(Signal* signal);
/**
* Scan interface
*/
@@ -108,6 +111,9 @@
void execCONTINUEB(Signal* signal);
public:
+
+ void suma_ndbrequire(bool v);
+
typedef DataBuffer<15> TableList;
union FragmentDescriptor {
@@ -129,47 +135,27 @@
Uint32 m_dummy;
};
- struct Table {
- Table() { m_tableId = ~0; }
- void release(SumaParticipant&);
+ struct Subscriber {
+ Uint32 m_senderRef;
+ Uint32 m_senderData;
+ Uint32 m_subPtrI; //reference to subscription
+ Uint32 m_firstGCI; // first GCI to send
+ Uint32 m_lastGCI; // last acnowledged GCI
+ Uint32 nextList;
- union { Uint32 m_tableId; Uint32 key; };
- Uint32 m_schemaVersion;
- Uint32 m_hasTriggerDefined[3]; // Insert/Update/Delete
- Uint32 m_triggerIds[3]; // Insert/Update/Delete
-
- /**
- * Default order in which to ask for attributes during scan
- * 1) Fixed, not nullable
- * 2) Rest
- */
- DataBuffer<15>::Head m_attributes; // Attribute id's
-
- /**
- * Fragments
- */
- DataBuffer<15>::Head m_fragments; // Fragment descriptors
-
- /**
- * Hash table stuff
- */
- Uint32 nextHash;
- union { Uint32 prevHash; Uint32 nextPool; };
- Uint32 hashValue() const {
- return m_tableId;
- }
- bool equal(const Table& rec) const {
- return m_tableId == rec.m_tableId;
- }
+ union { Uint32 nextPool; Uint32 prevList; };
};
- typedef Ptr<Table> TablePtr;
+ typedef Ptr<Subscriber> SubscriberPtr;
/**
* Subscriptions
*/
+ class Table;
+ typedef Ptr<Table> TablePtr;
+
struct SyncRecord {
SyncRecord(SumaParticipant& s, DataBuffer<15>::DataBufferPool & p)
- : m_locked(false), m_tableList(p), suma(s)
+ : m_tableList(p), suma(s)
#ifdef ERROR_INSERT
, cerrorInsert(s.cerrorInsert)
#endif
@@ -177,50 +163,29 @@
void release();
+ Uint32 m_senderRef;
+ Uint32 m_senderData;
+
Uint32 m_subscriptionPtrI;
- bool m_locked;
- bool m_doSendSyncData;
Uint32 m_error;
- TableList m_tableList; // Tables to sync (snapshoted at beginning)
+ Uint32 m_currentTable;
+ TableList m_tableList; // Tables to sync
TableList::DataBufferIterator m_tableList_it;
/**
- * Sync meta
- */
- void startMeta(Signal*);
- void nextMeta(Signal*);
- void completeMeta(Signal*);
-
- /**
- * Create triggers
- */
- Uint32 m_latestTriggerId;
- void startTrigger(Signal* signal);
- void nextTrigger(Signal* signal);
- void completeTrigger(Signal* signal);
- void createAttributeMask(AttributeMask&, Table*);
-
- /**
- * Drop triggers
- */
- void startDropTrigger(Signal* signal);
- void nextDropTrigger(Signal* signal);
- void completeDropTrigger(Signal* signal);
-
- /**
* Sync data
*/
- Uint32 m_currentTable; // Index in m_tableList
Uint32 m_currentFragment; // Index in tabPtr.p->m_fragments
DataBuffer<15>::Head m_attributeList; // Attribute if other than default
DataBuffer<15>::Head m_tabList; // tables if other than default
Uint32 m_currentTableId; // Current table
Uint32 m_currentNoOfAttributes; // No of attributes for current table
+
void startScan(Signal*);
void nextScan(Signal*);
bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
- void completeScan(Signal*);
+ void completeScan(Signal*, int error= 0);
SumaParticipant & suma;
#ifdef ERROR_INSERT
@@ -231,35 +196,107 @@
suma.progError(line, cause, extra);
}
- void runLIST_TABLES_CONF(Signal* signal);
- void runGET_TABINFO_CONF(Signal* signal);
- void runGET_TABINFOREF(Signal* signal);
+ union { Uint32 nextPool; Uint32 nextList; Uint32 prevList; Uint32 ptrI; };
+ };
+ friend struct SyncRecord;
+
+ int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
+ Ptr<SyncRecord> syncPtr);
+ int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr,
+ SubscriberPtr subbPtr);
+ int initTable(Signal *signal,Uint32 tableId, TablePtr &tabPtr);
+
+ void completeInitTable(Signal* signal, TablePtr tabPtr);
+
+ struct Table {
+ Table() { m_tableId = ~0; n_subscribers = 0; }
+ void release(SumaParticipant&);
+ void checkRelease(SumaParticipant &suma);
+
+ DLList<Subscriber>::Head c_subscribers;
+ DLList<SyncRecord>::Head c_syncRecords;
+
+ enum State {
+ UNDEFINED,
+ DEFINING,
+ DEFINED,
+ DROPPED
+ };
+ State m_state;
+
+ Uint32 m_ptrI;
+ SubscriberPtr m_drop_subbPtr;
+
+ Uint32 n_subscribers;
+
+ bool parseTable(SegmentedSectionPtr ptr, SumaParticipant &suma);
+ /**
+ * Create triggers
+ */
+ int setupTrigger(Signal* signal, SumaParticipant &suma);
+ void completeTrigger(Signal* signal);
+ void createAttributeMask(AttributeMask&, SumaParticipant &suma);
- void runDI_FCOUNTCONF(Signal* signal);
- void runDIGETPRIMCONF(Signal* signal);
+ /**
+ * Drop triggers
+ */
+ void dropTrigger(Signal* signal,SumaParticipant&);
+ void runDropTrigger(Signal* signal, Uint32 triggerId,SumaParticipant&);
- void runCREATE_TRIG_CONF(Signal* signal);
- void runCREATE_TRIG_REF(Signal* signal);
- void runDROP_TRIG_CONF(Signal* signal);
- void runDROP_TRIG_REF(Signal* signal);
- void runDropTrig(Signal* signal, Uint32 triggerId, Uint32 tableId);
+ /**
+ * Sync meta
+ */
+#if 0
+ void runLIST_TABLES_CONF(Signal* signal);
+#endif
+
+ union { Uint32 m_tableId; Uint32 key; };
+ Uint32 m_schemaVersion;
+ Uint32 m_hasTriggerDefined[3]; // Insert/Update/Delete
+ Uint32 m_triggerIds[3]; // Insert/Update/Delete
- union { Uint32 nextPool; Uint32 nextList; Uint32 ptrI; };
+ Uint32 m_error;
+ /**
+ * Default order in which to ask for attributes during scan
+ * 1) Fixed, not nullable
+ * 2) Rest
+ */
+ DataBuffer<15>::Head m_attributes; // Attribute id's
+
+ /**
+ * Fragments
+ */
+ Uint32 m_fragCount;
+ DataBuffer<15>::Head m_fragments; // Fragment descriptors
+
+ /**
+ * Hash table stuff
+ */
+ Uint32 nextHash;
+ union { Uint32 prevHash; Uint32 nextPool; };
+ Uint32 hashValue() const {
+ return m_tableId;
+ }
+ bool equal(const Table& rec) const {
+ return m_tableId == rec.m_tableId;
+ }
};
- friend struct SyncRecord;
-
+
struct Subscription {
- Uint32 m_subscriberRef;
- Uint32 m_subscriberData;
Uint32 m_senderRef;
Uint32 m_senderData;
Uint32 m_subscriptionId;
Uint32 m_subscriptionKey;
Uint32 m_subscriptionType;
- Uint32 m_coordinatorRef;
- Uint32 m_syncPtrI; // Active sync operation
- Uint32 m_nSubscribers;
- bool m_markRemove;
+
+ enum State {
+ UNDEFINED,
+ LOCKED,
+ DEFINED,
+ DROPPED
+ };
+ State m_state;
+ Uint32 n_subscribers;
Uint32 nextHash;
union { Uint32 prevHash; Uint32 nextPool; };
@@ -274,35 +311,13 @@
m_subscriptionKey == s.m_subscriptionKey;
}
/**
- * The following holds the table names of tables included
+ * The following holds the tables included
* in the subscription.
*/
- // TODO we've got to fix this, this is to inefficient. Tomas
- Bitmask<(unsigned int)(MAX_TABLES+3)/4> m_tables;
-#if 0
- char m_tableNames[MAX_TABLES][MAX_TAB_NAME_SIZE];
-#endif
- /**
- * "Iterator" used to iterate through m_tableNames
- */
- Uint32 m_maxTables;
- Uint32 m_currentTable;
+ Uint32 m_tableId;
};
typedef Ptr<Subscription> SubscriptionPtr;
- struct Subscriber {
- Uint32 m_senderRef;
- Uint32 m_senderData;
- Uint32 m_subscriberRef;
- Uint32 m_subscriberData;
- Uint32 m_subPtrI; //reference to subscription
- Uint32 m_firstGCI; // first GCI to send
- Uint32 m_lastGCI; // last acnowledged GCI
- Uint32 nextList;
- union { Uint32 nextPool; Uint32 prevList; };
- };
- typedef Ptr<Subscriber> SubscriberPtr;
-
struct Bucket {
bool active;
bool handover;
@@ -318,8 +333,6 @@
*
*/
DLList<Subscriber> c_metaSubscribers;
- DLList<Subscriber> c_dataSubscribers;
- DLList<Subscriber> c_prepDataSubscribers;
DLList<Subscriber> c_removeDataSubscribers;
/**
@@ -357,16 +370,13 @@
*/
bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
- bool parseTable(Signal* signal, class GetTabInfoConf* conf, Uint32 tableId,
- SyncRecord* syncPtr_p);
bool checkTableTriggers(SegmentedSectionPtr ptr);
void addTableId(Uint32 TableId,
SubscriptionPtr subPtr, SyncRecord *psyncRec);
- void sendSubIdRef(Signal* signal, Uint32 errorCode);
- void sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr);
- void sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errorCode);
+ void sendSubIdRef(Signal* signal,Uint32 senderRef,Uint32 senderData,Uint32 errorCode);
+ void sendSubCreateRef(Signal* signal, Uint32 errorCode);
void sendSubStartRef(Signal*, SubscriberPtr, Uint32 errorCode, SubscriptionData::Part);
void sendSubStartRef(Signal* signal, Uint32 errorCode);
void sendSubStopRef(Signal* signal, Uint32 errorCode);
@@ -378,7 +388,7 @@
void sendSubStopComplete(Signal*, SubscriberPtr);
void sendSubStopReq(Signal* signal, bool unlock= false);
- void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr);
+ void completeSubRemove(SubscriptionPtr subPtr);
Uint32 getFirstGCI(Signal* signal);
Uint32 decideWhoToSend(Uint32 nBucket, Uint32 gci);
@@ -470,9 +480,6 @@
void execSUB_STOP_REF(Signal* signal);
void execSUB_STOP_CONF(Signal* signal);
- void execSUB_SYNC_REF(Signal* signal);
- void execSUB_SYNC_CONF(Signal* signal);
-
void execSUB_ABORT_SYNC_REF(Signal* signal);
void execSUB_ABORT_SYNC_CONF(Signal* signal);
@@ -506,8 +513,8 @@
bool c_okToStart[MAX_REPLICAS];
bool c_waitingToStart[MAX_REPLICAS];
- DLHashTable<SumaParticipant::Subscription>::Iterator c_subPtr; // TODO [MAX_REPLICAS]
- SubscriberPtr c_subbPtr; // TODO [MAX_REPLICAS]
+ DLHashTable<SumaParticipant::Subscription>::Iterator c_subIt;
+ KeyTable<Table>::Iterator c_tabIt;
void progError(int line, int cause, const char * extra) {
suma.progError(line, cause, extra);
@@ -519,16 +526,14 @@
void createSubscription(Signal* signal, Uint32 sumaRef);
void nextSubscription(Signal* signal, Uint32 sumaRef);
+ void runSUB_CREATE_CONF(Signal* signal);
void completeSubscription(Signal* signal, Uint32 sumaRef);
- void startSync(Signal* signal, Uint32 sumaRef);
- void nextSync(Signal* signal, Uint32 sumaRef);
- void completeSync(Signal* signal, Uint32 sumaRef);
-
+ void startSubscriber(Signal* signal, Uint32 sumaRef);
+ void nextSubscriber(Signal* signal, Uint32 sumaRef, SubscriberPtr subbPtr);
void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
Signal* signal, Uint32 sumaRef);
- void startSubscriber(Signal* signal, Uint32 sumaRef);
- void nextSubscriber(Signal* signal, Uint32 sumaRef);
+ void runSUB_START_CONF(Signal* signal);
void completeSubscriber(Signal* signal, Uint32 sumaRef);
void completeRestartingNode(Signal* signal, Uint32 sumaRef);
--- 1.7/ndb/src/kernel/blocks/suma/SumaInit.cpp 2005-03-22 02:39:37 +01:00
+++ 1.8/ndb/src/kernel/blocks/suma/SumaInit.cpp 2005-03-31 00:35:16 +02:00
@@ -22,8 +22,6 @@
SumaParticipant::SumaParticipant(const Configuration & conf) :
SimulatedBlock(SUMA, conf),
c_metaSubscribers(c_subscriberPool),
- c_dataSubscribers(c_subscriberPool),
- c_prepDataSubscribers(c_subscriberPool),
c_removeDataSubscribers(c_subscriberPool),
c_tables(c_tablePool_),
c_subscriptions(c_subscriptionPool)
@@ -37,19 +35,19 @@
addRecSignal(GSN_SUB_REMOVE_REQ, &SumaParticipant::execSUB_REMOVE_REQ);
addRecSignal(GSN_SUB_START_REQ, &SumaParticipant::execSUB_START_REQ);
addRecSignal(GSN_SUB_STOP_REQ, &SumaParticipant::execSUB_STOP_REQ);
- addRecSignal(GSN_SUB_SYNC_REQ, &SumaParticipant::execSUB_SYNC_REQ);
-
- addRecSignal(GSN_SUB_STOP_CONF, &SumaParticipant::execSUB_STOP_CONF);
addRecSignal(GSN_SUB_STOP_REF, &SumaParticipant::execSUB_STOP_REF);
+ addRecSignal(GSN_SUB_STOP_CONF, &SumaParticipant::execSUB_STOP_CONF);
+ addRecSignal(GSN_SUB_SYNC_REQ, &SumaParticipant::execSUB_SYNC_REQ);
/**
* Dict interface
*/
- addRecSignal(GSN_DROP_TABLE_CONF, &SumaParticipant::execDROP_TABLE_CONF);
+ addRecSignal(GSN_DROP_TAB_CONF, &SumaParticipant::execDROP_TAB_CONF);
+ addRecSignal(GSN_CREATE_TAB_CONF, &SumaParticipant::execCREATE_TAB_CONF);
- //addRecSignal(GSN_LIST_TABLES_REF, &SumaParticipant::execLIST_TABLES_REF);
+#if 0
addRecSignal(GSN_LIST_TABLES_CONF, &SumaParticipant::execLIST_TABLES_CONF);
- //addRecSignal(GSN_GET_TABINFOREF, &SumaParticipant::execGET_TABINFO_REF);
+#endif
addRecSignal(GSN_GET_TABINFO_CONF, &SumaParticipant::execGET_TABINFO_CONF);
addRecSignal(GSN_GET_TABINFOREF, &SumaParticipant::execGET_TABINFOREF);
#if 0
@@ -59,9 +57,7 @@
/**
* Dih interface
*/
- //addRecSignal(GSN_DI_FCOUNTREF, &SumaParticipant::execDI_FCOUNTREF);
addRecSignal(GSN_DI_FCOUNTCONF, &SumaParticipant::execDI_FCOUNTCONF);
- //addRecSignal(GSN_DIGETPRIMREF, &SumaParticipant::execDIGETPRIMREF);
addRecSignal(GSN_DIGETPRIMCONF, &SumaParticipant::execDIGETPRIMCONF);
/**
@@ -172,8 +168,7 @@
addRecSignal(GSN_SUB_CREATE_CONF, &Suma::execSUB_CREATE_CONF);
addRecSignal(GSN_SUB_CREATE_REF, &Suma::execSUB_CREATE_REF);
- addRecSignal(GSN_SUB_SYNC_CONF, &Suma::execSUB_SYNC_CONF);
- addRecSignal(GSN_SUB_SYNC_REF, &Suma::execSUB_SYNC_REF);
+
addRecSignal(GSN_SUB_START_CONF, &Suma::execSUB_START_CONF);
addRecSignal(GSN_SUB_START_REF, &Suma::execSUB_START_REF);
--- 1.7/ndb/src/kernel/blocks/trix/Trix.cpp 2005-03-10 07:48:43 +01:00
+++ 1.8/ndb/src/kernel/blocks/trix/Trix.cpp 2005-03-31 00:35:16 +02:00
@@ -83,7 +83,6 @@
addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
- addRecSignal(GSN_SUB_META_DATA, &Trix::execSUB_META_DATA);
addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
// Allocate pool sizes
@@ -424,6 +423,8 @@
void Trix:: execBUILDINDXREQ(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Trix:: execBUILDINDXREQ");
+
BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtr();
// Seize a subscription record
@@ -438,7 +439,7 @@
releaseSections(signal);
sendSignal(buildIndxReq->getUserRef(),
GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength, JBB);
- return;
+ DBUG_VOID_RETURN;
}
subRec = subRecPtr.p;
subRec->errorCode = BuildIndxRef::NoError;
@@ -476,6 +477,7 @@
#endif
releaseSections(signal);
prepareInsertTransactions(signal, subRecPtr);
+ DBUG_VOID_RETURN;
}
void Trix:: execBUILDINDXCONF(Signal* signal)
@@ -563,25 +565,31 @@
void Trix::execSUB_CREATE_CONF(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Trix::execSUB_CREATE_CONF");
SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
SubscriptionRecPtr subRecPtr;
SubscriptionRecord* subRec;
- subRecPtr.i = subCreateConf->subscriberData;
+ subRecPtr.i = subCreateConf->senderData;
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
- return;
+ DBUG_VOID_RETURN;
}
- ndbrequire(subRec->subscriptionId == subCreateConf->subscriptionId);
- ndbrequire(subRec->subscriptionKey == subCreateConf->subscriptionKey);
subRec->subscriptionCreated = true;
subRecPtr.p = subRec;
- setupTableScan(signal, subRecPtr);
+
+ DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
+ subRecPtr.i, subRecPtr.p->subscriptionId,
+ subRecPtr.p->subscriptionKey));
+
+ startTableScan(signal, subRecPtr);
+ DBUG_VOID_RETURN;
}
void Trix::execSUB_CREATE_REF(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Trix::execSUB_CREATE_REF");
// THIS SIGNAL IS NEVER SENT FROM SUMA?
/*
SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
@@ -596,47 +604,48 @@
subRecPtr.p = subRec;
buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
*/
+ DBUG_VOID_RETURN;
}
void Trix::execSUB_SYNC_CONF(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Trix::execSUB_SYNC_CONF");
SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
SubscriptionRecPtr subRecPtr;
SubscriptionRecord* subRec;
- subRecPtr.i = subSyncConf->subscriberData;
+ subRecPtr.i = subSyncConf->senderData;
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
- printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n", subRecPtr.i);
- return;
+ printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n",
+ subRecPtr.i);
+ DBUG_VOID_RETURN;
}
- ndbrequire(subRec->subscriptionId == subSyncConf->subscriptionId);
- ndbrequire(subRec->subscriptionKey == subSyncConf->subscriptionKey);
+
subRecPtr.p = subRec;
- if(subSyncConf->part == SubscriptionData::MetaData)
- startTableScan(signal, subRecPtr);
- else {
- subRec->expectedConf--;
- checkParallelism(signal, subRec);
- if (subRec->expectedConf == 0)
- buildComplete(signal, subRecPtr);
- }
+ subRec->expectedConf--;
+ checkParallelism(signal, subRec);
+ if (subRec->expectedConf == 0)
+ buildComplete(signal, subRecPtr);
+ DBUG_VOID_RETURN;
}
void Trix::execSUB_SYNC_REF(Signal* signal)
{
jamEntry();
+ DBUG_ENTER("Trix::execSUB_SYNC_REF");
SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
SubscriptionRecPtr subRecPtr;
SubscriptionRecord* subRec;
- subRecPtr.i = subSyncRef->subscriberData;
+ subRecPtr.i = subSyncRef->senderData;
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
- return;
+ DBUG_VOID_RETURN;
}
subRecPtr.p = subRec;
buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
+ DBUG_VOID_RETURN;
}
void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
@@ -656,18 +665,13 @@
checkParallelism(signal, subRec);
}
-void Trix::execSUB_META_DATA(Signal* signal)
-{
- jamEntry();
-}
-
void Trix::execSUB_TABLE_DATA(Signal* signal)
{
jamEntry();
SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
SubscriptionRecPtr subRecPtr;
SubscriptionRecord* subRec;
- subRecPtr.i = subTableData->subscriberData;
+ subRecPtr.i = subTableData->senderData;
if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
return;
@@ -685,14 +689,36 @@
void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
{
- Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
- SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
+ jam();
+ DBUG_ENTER("Trix::setupSubscription");
SubscriptionRecord* subRec = subRecPtr.p;
+ SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
// Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
+ subCreateReq->senderRef = reference();
+ subCreateReq->senderData = subRecPtr.i;
+ subCreateReq->subscriptionId = subRec->subscriptionId;
+ subCreateReq->subscriptionKey = subRec->subscriptionKey;
+ subCreateReq->tableId = subRec->sourceTableId;
+ subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
+
+ DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
+ subRecPtr.i, subCreateReq->subscriptionId,
+ subCreateReq->subscriptionKey));
+
+ sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
+ signal, SubCreateReq::SignalLength, JBB);
+ DBUG_VOID_RETURN;
+}
+
+void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
+{
+ jam();
+
+ Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
+ SubscriptionRecord* subRec = subRecPtr.p;
AttrOrderBuffer::DataBufferIterator iter;
Uint32 i = 0;
- jam();
bool moreAttributes = subRec->attributeOrder.first(iter);
while (moreAttributes) {
attributeList[i++] = *iter.data;
@@ -703,41 +729,21 @@
orderPtr[0].p = attributeList;
orderPtr[0].sz = subRec->attributeOrder.getSize();
-
- subCreateReq->subscriberRef = reference();
- subCreateReq->subscriberData = subRecPtr.i;
- subCreateReq->subscriptionId = subRec->subscriptionId;
- subCreateReq->subscriptionKey = subRec->subscriptionKey;
- subCreateReq->tableId = subRec->sourceTableId;
- subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
-
- sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
- signal, SubCreateReq::SignalLength+1, JBB, orderPtr, 1);
-}
-
-void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
-{
SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
+ subSyncReq->senderRef = reference();
+ subSyncReq->senderData = subRecPtr.i;
+ subSyncReq->subscriptionId = subRec->subscriptionId;
+ subSyncReq->subscriptionKey = subRec->subscriptionKey;
+ subSyncReq->part = SubscriptionData::TableData;
- jam();
- subSyncReq->subscriptionId = subRecPtr.i;
- subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
- subSyncReq->part = SubscriptionData::MetaData;
- sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
- signal, SubSyncReq::SignalLength, JBB);
-}
-
-void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
-{
- jam();
subRecPtr.p->expectedConf = 1;
- SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
- subSyncReq->subscriptionId = subRecPtr.i;
- subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
- subSyncReq->part = SubscriptionData::TableData;
- sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
- signal, SubSyncReq::SignalLength, JBB);
+ DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
+ subRecPtr.i, subSyncReq->subscriptionId,
+ subSyncReq->subscriptionKey));
+
+ sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
+ signal, SubSyncReq::SignalLength, JBB, orderPtr, 1);
}
void Trix::prepareInsertTransactions(Signal* signal,
--- 1.3/ndb/src/kernel/blocks/trix/Trix.hpp 2004-08-23 12:57:20 +02:00
+++ 1.4/ndb/src/kernel/blocks/trix/Trix.hpp 2005-03-31 00:35:16 +02:00
@@ -170,12 +170,10 @@
void execSUB_SYNC_CONF(Signal* signal);
void execSUB_SYNC_REF(Signal* signal);
void execSUB_SYNC_CONTINUE_REQ(Signal* signal);
- void execSUB_META_DATA(Signal* signal);
void execSUB_TABLE_DATA(Signal* signal);
// Utility functions
void setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr);
- void setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr);
void startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr);
void prepareInsertTransactions(Signal* signal, SubscriptionRecPtr subRecPtr);
void executeInsertTransaction(Signal* signal, SubscriptionRecPtr subRecPtr,
--- 1.25/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-03-22 02:39:37 +01:00
+++ 1.26/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2005-03-31 00:35:16 +02:00
@@ -123,7 +123,7 @@
}
m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this);
- DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid));
+ DBUG_PRINT("exit",("this: 0x%x/0x%x oid: %u", this, m_facade, m_oid));
DBUG_VOID_RETURN;
}
--- 1.31/ndb/src/ndbapi/Ndbif.cpp 2005-03-21 21:59:42 +01:00
+++ 1.32/ndb/src/ndbapi/Ndbif.cpp 2005-03-31 00:35:16 +02:00
@@ -702,7 +702,7 @@
CAST_CONSTPTR(SubGcpCompleteRep, aSignal->getDataPtr());
const Uint32 gci= rep->gci;
// const Uint32 senderRef= rep->senderRef;
- const Uint32 oid= rep->subscriberData;
+ const Uint32 oid= rep->senderData;
const Uint32 ref= aSignal->theSendersBlockRef;
// send acnowledge
@@ -730,15 +730,15 @@
{
const SubTableData * const sdata=
CAST_CONSTPTR(SubTableData, aSignal->getDataPtr());
- const Uint32 oid = sdata->subscriberData;
+ const Uint32 oid = sdata->senderData;
for (int i= aSignal->m_noOfSections;i < 3; i++) {
ptr[i].p = NULL;
ptr[i].sz = 0;
}
- DBUG_PRINT("info",("oid=subscriberData %d, gci %d, operation %d, "
- "tableId %d",
- sdata->subscriberData, sdata->gci, sdata->operation,
+ DBUG_PRINT("info",("oid=senderData: %d, gci: %d, operation: %d, "
+ "tableId: %d",
+ sdata->senderData, sdata->gci, sdata->operation,
sdata->tableId));
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
--- 1.6/ndb/test/include/HugoTransactions.hpp 2004-12-14 22:30:30 +01:00
+++ 1.7/ndb/test/include/HugoTransactions.hpp 2005-03-31 00:35:16 +02:00
@@ -29,8 +29,6 @@
const NdbDictionary::Index* idx = 0);
~HugoTransactions();
int createEvent(Ndb*);
- int eventOperation(Ndb*, void* stats,
- int records);
int loadTable(Ndb*,
int records,
int batch = 512,
--- 1.8/ndb/test/ndbapi/test_event.cpp 2005-03-09 01:22:06 +01:00
+++ 1.9/ndb/test/ndbapi/test_event.cpp 2005-03-31 00:35:16 +02:00
@@ -19,6 +19,7 @@
#include <HugoTransactions.hpp>
#include <UtilTransactions.hpp>
#include <TestNdbEventOperation.hpp>
+#include <NdbAutoPtr.hpp>
#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()
@@ -32,6 +33,223 @@
return NDBT_OK;
}
+struct receivedEvent {
+ Uint32 pk;
+ Uint32 count;
+ Uint32 event;
+};
+
+static NdbEventOperation*
+createEventOperation(Ndb* pNdb, const NdbDictionary::Table &tab)
+{
+}
+
+static int
+dropEventOperation(Ndb* pNdb, NdbEventOperation *pOp)
+{
+}
+
+static int
+eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int records)
+{
+ Uint32 i;
+ const char function[] = "HugoTransactions::eventOperation: ";
+ struct receivedEvent* recInsertEvent;
+ NdbAutoObjArrayPtr<struct receivedEvent>
+ p00( recInsertEvent = new struct receivedEvent[3*records] );
+ struct receivedEvent* recUpdateEvent = &recInsertEvent[records];
+ struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records];
+
+ EventOperationStats &stats = *(EventOperationStats*)pstats;
+
+ stats.n_inserts = 0;
+ stats.n_deletes = 0;
+ stats.n_updates = 0;
+ stats.n_consecutive = 0;
+ stats.n_duplicates = 0;
+ stats.n_inconsistent_gcis = 0;
+
+ for (i = 0; i < records; i++) {
+ recInsertEvent[i].pk = 0xFFFFFFFF;
+ recInsertEvent[i].count = 0;
+ recInsertEvent[i].event = 0xFFFFFFFF;
+
+ recUpdateEvent[i].pk = 0xFFFFFFFF;
+ recUpdateEvent[i].count = 0;
+ recUpdateEvent[i].event = 0xFFFFFFFF;
+
+ recDeleteEvent[i].pk = 0xFFFFFFFF;
+ recDeleteEvent[i].count = 0;
+ recDeleteEvent[i].event = 0xFFFFFFFF;
+ }
+
+ NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
+
+ if (!myDict) {
+ g_err << function << "Event Creation failedDictionary not found\n";
+ return NDBT_FAILED;
+ }
+
+ int r = 0;
+ NdbEventOperation *pOp;
+
+ char eventName[1024];
+ sprintf(eventName,"%s_EVENT",tab.getName());
+ int noEventColumnName = tab.getNoOfColumns();
+
+ g_info << function << "create EventOperation\n";
+ pOp = pNdb->createEventOperation(eventName, 100);
+ if ( pOp == NULL ) {
+ g_err << function << "Event operation creation failed\n";
+ return NDBT_FAILED;
+ }
+
+ g_info << function << "get values\n";
+ NdbRecAttr* recAttr[1024];
+ NdbRecAttr* recAttrPre[1024];
+
+ const NdbDictionary::Table *_table = myDict->getTable(tab.getName());
+
+ for (int a = 0; a < noEventColumnName; a++) {
+ recAttr[a] = pOp->getValue(_table->getColumn(a)->getName());
+ recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName());
+ }
+
+ // set up the callbacks
+ g_info << function << "execute\n";
+ if (pOp->execute()) { // This starts changes to "start flowing"
+ g_err << function << "operation execution failed: \n";
+ g_err << pOp->getNdbError().code << " "
+ << pOp->getNdbError().message << endl;
+ return NDBT_FAILED;
+ }
+
+ g_info << function << "ok\n";
+
+ int count = 0;
+ Uint32 last_inconsitant_gci = 0xEFFFFFF0;
+
+ while (r < records){
+ //printf("now waiting for event...\n");
+ int res = pNdb->pollEvents(1000); // wait for event or 1000 ms
+
+ if (res > 0) {
+ //printf("got data! %d\n", r);
+ int overrun;
+ while (pOp->next(&overrun) > 0) {
+ r++;
+ r += overrun;
+ count++;
+
+ Uint32 gci = pOp->getGCI();
+ Uint32 pk = recAttr[0]->u_32_value();
+
+ if (!pOp->isConsistent()) {
+ if (last_inconsitant_gci != gci) {
+ last_inconsitant_gci = gci;
+ stats.n_inconsistent_gcis++;
+ }
+ g_warning << "A node failure has occured and events might be missing\n";
+ }
+ g_info << function << "GCI " << gci << ": " << count;
+ struct receivedEvent* recEvent;
+ switch (pOp->getEventType()) {
+ case NdbDictionary::Event::TE_INSERT:
+ stats.n_inserts++;
+ g_info << " INSERT: ";
+ recEvent = recInsertEvent;
+ break;
+ case NdbDictionary::Event::TE_DELETE:
+ stats.n_deletes++;
+ g_info << " DELETE: ";
+ recEvent = recDeleteEvent;
+ break;
+ case NdbDictionary::Event::TE_UPDATE:
+ stats.n_updates++;
+ g_info << " UPDATE: ";
+ recEvent = recUpdateEvent;
+ break;
+ case NdbDictionary::Event::TE_ALL:
+ abort();
+ }
+
+ if ((int)pk < records) {
+ recEvent[pk].pk = pk;
+ recEvent[pk].count++;
+ }
+
+ g_info << "overrun " << overrun << " pk " << pk;
+ for (i = 1; i < noEventColumnName; i++) {
+ if (recAttr[i]->isNULL() >= 0) { // we have a value
+ g_info << " post[" << i << "]=";
+ if (recAttr[i]->isNULL() == 0) // we have a non-null value
+ g_info << recAttr[i]->u_32_value();
+ else // we have a null value
+ g_info << "NULL";
+ }
+ if (recAttrPre[i]->isNULL() >= 0) { // we have a value
+ g_info << " pre[" << i << "]=";
+ if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
+ g_info << recAttrPre[i]->u_32_value();
+ else // we have a null value
+ g_info << "NULL";
+ }
+ }
+ g_info << endl;
+ }
+ } else
+ ;//printf("timed out\n");
+ }
+
+ g_info << "dropping event operation" << endl;
+
+ int res = pNdb->dropEventOperation(pOp);
+ if (res != 0) {
+ g_err << "operation execution failed\n";
+ return NDBT_FAILED;
+ }
+
+ g_info << " ok" << endl;
+
+ if (stats.n_inserts > 0) {
+ stats.n_consecutive++;
+ }
+ if (stats.n_deletes > 0) {
+ stats.n_consecutive++;
+ }
+ if (stats.n_updates > 0) {
+ stats.n_consecutive++;
+ }
+ for (i = 0; i < (Uint32)records/3; i++) {
+ if (recInsertEvent[i].pk != i) {
+ stats.n_consecutive ++;
+ ndbout << "missing insert pk " << i << endl;
+ } else if (recInsertEvent[i].count > 1) {
+ ndbout << "duplicates insert pk " << i
+ << " count " << recInsertEvent[i].count << endl;
+ stats.n_duplicates += recInsertEvent[i].count-1;
+ }
+ if (recUpdateEvent[i].pk != i) {
+ stats.n_consecutive ++;
+ ndbout << "missing update pk " << i << endl;
+ } else if (recUpdateEvent[i].count > 1) {
+ ndbout << "duplicates update pk " << i
+ << " count " << recUpdateEvent[i].count << endl;
+ stats.n_duplicates += recUpdateEvent[i].count-1;
+ }
+ if (recDeleteEvent[i].pk != i) {
+ stats.n_consecutive ++;
+ ndbout << "missing delete pk " << i << endl;
+ } else if (recDeleteEvent[i].count > 1) {
+ ndbout << "duplicates delete pk " << i
+ << " count " << recDeleteEvent[i].count << endl;
+ stats.n_duplicates += recDeleteEvent[i].count-1;
+ }
+ }
+
+ return NDBT_OK;
+}
+
int runCreateShadowTable(NDBT_Context* ctx, NDBT_Step* step)
{
const NdbDictionary::Table *table= ctx->getTab();
@@ -72,7 +290,7 @@
for (int i= 0; i < loops; i++)
{
#if 1
- if (hugoTrans.eventOperation(GETNDB(step), (void*)&stats, 0) != 0){
+ if (eventOperation(GETNDB(step), tab, (void*)&stats, 0) != 0){
return NDBT_FAILED;
}
#else
@@ -110,7 +328,7 @@
// sleep(tId);
- if (hugoTrans.eventOperation(GETNDB(step), (void*)&stats, 3*records) != 0){
+ if (eventOperation(GETNDB(step), *ctx->getTab(), (void*)&stats, 3*records) != 0){
return NDBT_FAILED;
}
@@ -143,9 +361,11 @@
int records = ctx->getNumRecords();
HugoTransactions hugoTrans(*ctx->getTab());
+ sleep(1);
+#if 0
sleep(5);
sleep(theThreadIdCounter);
-
+#endif
if (hugoTrans.loadTable(GETNDB(step), records, 1, true, loops) != 0){
return NDBT_FAILED;
}
@@ -163,9 +383,10 @@
int loops = ctx->getNumLoops();
int records = ctx->getNumRecords();
HugoTransactions hugoTrans(*ctx->getTab());
-
+ sleep(1);
+#if 0
sleep(5);
-
+#endif
if (hugoTrans.loadTable(GETNDB(step), 3*records, 1, true, 1) != 0){
return NDBT_FAILED;
}
@@ -439,15 +660,17 @@
DBUG_RETURN(NDBT_OK);
}
-// INITIALIZER(runInsert);
-// STEP(runPkRead);
-// VERIFIER(runVerifyInsert);
-// FINALIZER(runClearTable);
-
NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation",
"Verify that we can listen to Events"
"NOTE! No errors are allowed!" ){
+#if 0
+ TABLE("T1");
+ TABLE("T3");
+ TABLE("T5");
+ TABLE("T6");
+ TABLE("T8");
+#endif
INITIALIZER(runCreateEvent);
STEP(runEventOperation);
STEP(runEventLoad);
--- 1.23/ndb/test/src/HugoTransactions.cpp 2005-02-12 15:17:39 +01:00
+++ 1.24/ndb/test/src/HugoTransactions.cpp 2005-03-31 00:35:16 +02:00
@@ -784,13 +784,7 @@
NdbDictionary::Event myEvent(eventName);
myEvent.setTable(tab.getName());
myEvent.addTableEvent(NdbDictionary::Event::TE_ALL );
- // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
- // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
- // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
-
- // const NdbDictionary::Table *_table = myDict->getTable(tab.getName());
for(int a = 0; a < tab.getNoOfColumns(); a++){
- // myEvent.addEventColumn(_table->getColumn(a)->getName());
myEvent.addEventColumn(a);
}
@@ -826,222 +820,6 @@
return NDBT_FAILED;
}
- return NDBT_OK;
-}
-
-#include <NdbEventOperation.hpp>
-#include "TestNdbEventOperation.hpp"
-#include <NdbAutoPtr.hpp>
-
-struct receivedEvent {
- Uint32 pk;
- Uint32 count;
- Uint32 event;
-};
-
-int XXXXX = 0;
-
-int
-HugoTransactions::eventOperation(Ndb* pNdb, void* pstats,
- int records) {
- int myXXXXX = XXXXX++;
- Uint32 i;
- const char function[] = "HugoTransactions::eventOperation: ";
- struct receivedEvent* recInsertEvent;
- NdbAutoObjArrayPtr<struct receivedEvent>
- p00( recInsertEvent = new struct receivedEvent[3*records] );
- struct receivedEvent* recUpdateEvent = &recInsertEvent[records];
- struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records];
-
- EventOperationStats &stats = *(EventOperationStats*)pstats;
-
- stats.n_inserts = 0;
- stats.n_deletes = 0;
- stats.n_updates = 0;
- stats.n_consecutive = 0;
- stats.n_duplicates = 0;
- stats.n_inconsistent_gcis = 0;
-
- for (i = 0; i < records; i++) {
- recInsertEvent[i].pk = 0xFFFFFFFF;
- recInsertEvent[i].count = 0;
- recInsertEvent[i].event = 0xFFFFFFFF;
-
- recUpdateEvent[i].pk = 0xFFFFFFFF;
- recUpdateEvent[i].count = 0;
- recUpdateEvent[i].event = 0xFFFFFFFF;
-
- recDeleteEvent[i].pk = 0xFFFFFFFF;
- recDeleteEvent[i].count = 0;
- recDeleteEvent[i].event = 0xFFFFFFFF;
- }
-
- NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
-
- if (!myDict) {
- g_err << function << "Event Creation failedDictionary not found\n";
- return NDBT_FAILED;
- }
-
- int r = 0;
- NdbEventOperation *pOp;
-
- char eventName[1024];
- sprintf(eventName,"%s_EVENT",tab.getName());
- int noEventColumnName = tab.getNoOfColumns();
-
- g_info << function << "create EventOperation\n";
- pOp = pNdb->createEventOperation(eventName, 100);
- if ( pOp == NULL ) {
- g_err << function << "Event operation creation failed\n";
- return NDBT_FAILED;
- }
-
- g_info << function << "get values\n";
- NdbRecAttr* recAttr[1024];
- NdbRecAttr* recAttrPre[1024];
-
- const NdbDictionary::Table *_table = myDict->getTable(tab.getName());
-
- for (int a = 0; a < noEventColumnName; a++) {
- recAttr[a] = pOp->getValue(_table->getColumn(a)->getName());
- recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName());
- }
-
- // set up the callbacks
- g_info << function << "execute\n";
- if (pOp->execute()) { // This starts changes to "start flowing"
- g_err << function << "operation execution failed: \n";
- g_err << pOp->getNdbError().code << " "
- << pOp->getNdbError().message << endl;
- return NDBT_FAILED;
- }
-
- g_info << function << "ok\n";
-
- int count = 0;
- Uint32 last_inconsitant_gci = 0xEFFFFFF0;
-
- while (r < records){
- //printf("now waiting for event...\n");
- int res = pNdb->pollEvents(1000); // wait for event or 1000 ms
-
- if (res > 0) {
- //printf("got data! %d\n", r);
- int overrun;
- while (pOp->next(&overrun) > 0) {
- r++;
- r += overrun;
- count++;
-
- Uint32 gci = pOp->getGCI();
- Uint32 pk = recAttr[0]->u_32_value();
-
- if (!pOp->isConsistent()) {
- if (last_inconsitant_gci != gci) {
- last_inconsitant_gci = gci;
- stats.n_inconsistent_gcis++;
- }
- g_warning << "A node failure has occured and events might be missing\n";
- }
- g_info << function << "GCI " << gci << ": " << count;
- struct receivedEvent* recEvent;
- switch (pOp->getEventType()) {
- case NdbDictionary::Event::TE_INSERT:
- stats.n_inserts++;
- g_info << " INSERT: ";
- recEvent = recInsertEvent;
- break;
- case NdbDictionary::Event::TE_DELETE:
- stats.n_deletes++;
- g_info << " DELETE: ";
- recEvent = recDeleteEvent;
- break;
- case NdbDictionary::Event::TE_UPDATE:
- stats.n_updates++;
- g_info << " UPDATE: ";
- recEvent = recUpdateEvent;
- break;
- case NdbDictionary::Event::TE_ALL:
- abort();
- }
-
- if ((int)pk < records) {
- recEvent[pk].pk = pk;
- recEvent[pk].count++;
- }
-
- g_info << "overrun " << overrun << " pk " << pk;
- for (i = 1; i < noEventColumnName; i++) {
- if (recAttr[i]->isNULL() >= 0) { // we have a value
- g_info << " post[" << i << "]=";
- if (recAttr[i]->isNULL() == 0) // we have a non-null value
- g_info << recAttr[i]->u_32_value();
- else // we have a null value
- g_info << "NULL";
- }
- if (recAttrPre[i]->isNULL() >= 0) { // we have a value
- g_info << " pre[" << i << "]=";
- if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
- g_info << recAttrPre[i]->u_32_value();
- else // we have a null value
- g_info << "NULL";
- }
- }
- g_info << endl;
- }
- } else
- ;//printf("timed out\n");
- }
-
- // sleep ((XXXXX-myXXXXX)*2);
-
- g_info << myXXXXX << "dropping event operation" << endl;
-
- int res = pNdb->dropEventOperation(pOp);
- if (res != 0) {
- g_err << "operation execution failed\n";
- return NDBT_FAILED;
- }
-
- g_info << myXXXXX << " ok" << endl;
-
- if (stats.n_inserts > 0) {
- stats.n_consecutive++;
- }
- if (stats.n_deletes > 0) {
- stats.n_consecutive++;
- }
- if (stats.n_updates > 0) {
- stats.n_consecutive++;
- }
- for (i = 0; i < (Uint32)records/3; i++) {
- if (recInsertEvent[i].pk != i) {
- stats.n_consecutive ++;
- ndbout << "missing insert pk " << i << endl;
- } else if (recInsertEvent[i].count > 1) {
- ndbout << "duplicates insert pk " << i
- << " count " << recInsertEvent[i].count << endl;
- stats.n_duplicates += recInsertEvent[i].count-1;
- }
- if (recUpdateEvent[i].pk != i) {
- stats.n_consecutive ++;
- ndbout << "missing update pk " << i << endl;
- } else if (recUpdateEvent[i].count > 1) {
- ndbout << "duplicates update pk " << i
- << " count " << recUpdateEvent[i].count << endl;
- stats.n_duplicates += recUpdateEvent[i].count-1;
- }
- if (recDeleteEvent[i].pk != i) {
- stats.n_consecutive ++;
- ndbout << "missing delete pk " << i << endl;
- } else if (recDeleteEvent[i].count > 1) {
- ndbout << "duplicates delete pk " << i
- << " count " << recDeleteEvent[i].count << endl;
- stats.n_duplicates += recDeleteEvent[i].count-1;
- }
- }
-
return NDBT_OK;
}
--- 1.31/ndb/src/ndbapi/ndberror.c 2005-03-21 21:59:42 +01:00
+++ 1.32/ndb/src/ndbapi/ndberror.c 2005-03-31 00:35:16 +02:00
@@ -407,6 +407,7 @@
{ 1414, TR, "Subscriber manager has subscribers on this subscription" },
{ 1415, SE, "Subscription not unique in subscriber manager" },
{ 1416, IS, "Can't accept more subscriptions, out of space in pool" },
+ { 1417, SE, "Table in suscription not defined, probably dropped" },
{ 4004, AE, "Attribute name not found in the Table" },
--- 1.2/ndb/src/kernel/blocks/Makefile.am 2004-11-10 00:02:57 +01:00
+++ 1.3/ndb/src/kernel/blocks/Makefile.am 2005-03-31 00:35:16 +02:00
@@ -13,7 +13,6 @@
backup \
dbutil \
suma \
- grep \
dbtux
windoze-dsp:
--- 1.4/ndb/test/ndbapi/test_event_multi_table.cpp 2005-03-16 01:18:03 +01:00
+++ 1.5/ndb/test/ndbapi/test_event_multi_table.cpp 2005-03-31 00:35:16 +02:00
@@ -21,6 +21,8 @@
#include <HugoTransactions.hpp>
#include <UtilTransactions.hpp>
#include <TestNdbEventOperation.hpp>
+#include <NdbRestarter.hpp>
+#include <NdbRestarts.hpp>
static void usage()
{
@@ -295,19 +297,39 @@
// create all tables
Vector<const NdbDictionary::Table*> pTabs;
- for (i= 0; no_error && argc; argc--, i++)
+ if (argc == 0)
{
- dict->dropTable(argv[i]);
- NDBT_Tables::createTable(&ndb, argv[i]);
- const NdbDictionary::Table *pTab= dict->getTable(argv[i]);
- if (pTab == 0)
+ NDBT_Tables::dropAllTables(&ndb);
+ NDBT_Tables::createAllTables(&ndb);
+ for (i= 0; no_error && i < NDBT_Tables::getNumTables(); i++)
{
- ndbout << "Failed to create table" << endl;
- ndbout << dict->getNdbError() << endl;
- no_error= 0;
- break;
+ const NdbDictionary::Table *pTab= dict->getTable(NDBT_Tables::getTable(i)->getName());
+ if (pTab == 0)
+ {
+ ndbout << "Failed to create table" << endl;
+ ndbout << dict->getNdbError() << endl;
+ no_error= 0;
+ break;
+ }
+ pTabs.push_back(pTab);
+ }
+ }
+ else
+ {
+ for (i= 0; no_error && argc; argc--, i++)
+ {
+ dict->dropTable(argv[i]);
+ NDBT_Tables::createTable(&ndb, argv[i]);
+ const NdbDictionary::Table *pTab= dict->getTable(argv[i]);
+ if (pTab == 0)
+ {
+ ndbout << "Failed to create table" << endl;
+ ndbout << dict->getNdbError() << endl;
+ no_error= 0;
+ break;
+ }
+ pTabs.push_back(pTab);
}
- pTabs.push_back(pTab);
}
pTabs.push_back(NULL);
@@ -388,9 +410,8 @@
hugo_ops.push_back(new HugoOperations(*pTabs[i]));
}
- sleep(5);
-
- // insert 3 records per table
+ int n_records= 3;
+ // insert n_records records per table
do {
if (start_transaction(&ndb, hugo_ops))
{
@@ -399,7 +420,7 @@
}
for (i= 0; no_error && pTabs[i]; i++)
{
- hugo_ops[i]->pkInsertRecord(&ndb, 0, 3);
+ hugo_ops[i]->pkInsertRecord(&ndb, 0, n_records);
}
if (execute_commit(&ndb, hugo_ops))
{
@@ -427,7 +448,7 @@
}
} while (0);
- // update 2 records in first table
+ // update n_records-1 records in first table
do {
if (start_transaction(&ndb, hugo_ops))
{
@@ -435,7 +456,7 @@
break;
}
- hugo_ops[0]->pkUpdateRecord(&ndb, 2);
+ hugo_ops[0]->pkUpdateRecord(&ndb, n_records-1);
if (execute_commit(&ndb, hugo_ops))
{
@@ -463,6 +484,62 @@
}
} while (0);
+
+ {
+ NdbRestarts restarts;
+ for (int j= 0; j < 10; j++)
+ {
+ // restart a node
+ if (no_error)
+ {
+ int timeout = 240;
+ if (restarts.executeRestart("RestartRandomNodeAbort", timeout))
+ {
+ no_error= 0;
+ break;
+ }
+ }
+
+ // update all n_records records on all tables
+ if (start_transaction(&ndb, hugo_ops))
+ {
+ no_error= 0;
+ break;
+ }
+
+ for (int r= 0; r < n_records; r++)
+ {
+ for (i= 0; pTabs[i]; i++)
+ {
+ hugo_ops[i]->pkUpdateRecord(&ndb, r);
+ }
+ }
+ if (execute_commit(&ndb, hugo_ops))
+ {
+ no_error= 0;
+ break;
+ }
+ if(close_transaction(&ndb, hugo_ops))
+ {
+ no_error= 0;
+ break;
+ }
+
+ // copy events and verify
+ if (copy_events(&ndb) < 0)
+ {
+ no_error= 0;
+ break;
+ }
+ if (verify_copy(&ndb, pTabs, pShadowTabs))
+ {
+ no_error= 0;
+ break;
+ }
+ }
+ }
+
+ // drop the event operations
for (i= 0; i < (int)pOps.size(); i++)
{
if (ndb.dropEventOperation(pOps[i]))
--
MySQL Internals Mailing List
For list archives: http://lists.mysql.com/internals
To unsubscribe: http://lists.mysql.com/internals?uns...ie.nctu.edu.tw