28
28
29
29
#include " ../dsql/dsql_proto.h"
30
30
#include " ../dsql/DsqlCursor.h"
31
+ #include " ../dsql/StmtNodes.h"
31
32
32
33
using namespace Firebird ;
33
34
using namespace Jrd ;
@@ -36,7 +37,11 @@ static const char* const SCRATCH = "fb_cursor_";
36
37
static const ULONG PREFETCH_SIZE = 65536 ; // 64 KB
37
38
38
39
DsqlCursor::DsqlCursor (DsqlDmlRequest* req, ULONG flags)
39
- : m_dsqlRequest(req), m_message(req->getDsqlStatement ()->getReceiveMsg()),
40
+ : m_dsqlRequest(req),
41
+ m_keyBuffer(nullptr ),
42
+ m_keyBufferLength(0 ),
43
+ m_message(req->getDsqlStatement ()->getReceiveMsg()->msg_number),
44
+ m_messageLength(0 ),
40
45
m_resultSet(NULL ), m_flags(flags),
41
46
m_space(req->getPool (), SCRATCH),
42
47
m_state(BOS), m_eof(false ), m_position(0 ), m_cachedCount(0 )
@@ -48,6 +53,11 @@ DsqlCursor::~DsqlCursor()
48
53
{
49
54
if (m_resultSet)
50
55
m_resultSet->resetHandle ();
56
+ if (m_keyBuffer)
57
+ {
58
+ delete[] m_keyBuffer;
59
+ m_keyBuffer = nullptr ;
60
+ }
51
61
}
52
62
53
63
jrd_tra* DsqlCursor::getTransaction () const
@@ -66,6 +76,17 @@ void DsqlCursor::setInterfacePtr(JResultSet* interfacePtr) noexcept
66
76
m_resultSet = interfacePtr;
67
77
}
68
78
79
+ bool DsqlCursor::getCurrentRecordKey (USHORT context, RecordKey& key) const
80
+ {
81
+ if (context * sizeof (RecordKey) >= m_keyBufferLength)
82
+ {
83
+ return false ;
84
+ }
85
+
86
+ key = m_keyBuffer[context];
87
+ return key.recordNumber .bid_relation_id != 0 ;
88
+ }
89
+
69
90
void DsqlCursor::close (thread_db* tdbb, DsqlCursor* cursor)
70
91
{
71
92
if (!cursor)
@@ -88,7 +109,7 @@ void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)
88
109
89
110
if (dsqlRequest->req_traced && TraceManager::need_dsql_free (attachment))
90
111
{
91
- TraceSQLStatementImpl stmt (dsqlRequest, NULL );
112
+ TraceSQLStatementImpl stmt (dsqlRequest, nullptr , nullptr );
92
113
TraceManager::event_dsql_free (attachment, &stmt, DSQL_close);
93
114
}
94
115
@@ -115,6 +136,15 @@ int DsqlCursor::fetchNext(thread_db* tdbb, UCHAR* buffer)
115
136
return 1 ;
116
137
}
117
138
139
+ if (m_keyBufferLength == 0 )
140
+ {
141
+ Request* req = m_dsqlRequest->getRequest ();
142
+ m_keyBufferLength = req->req_rpb .getCount () * sizeof (RecordKey);
143
+ fb_assert (m_keyBufferLength > 0 );
144
+ m_keyBuffer = new RecordKey[req->req_rpb .getCount ()];
145
+ }
146
+
147
+ m_dsqlRequest->gatherRecordKey (m_keyBuffer);
118
148
m_state = POSITIONED;
119
149
return 0 ;
120
150
}
@@ -163,7 +193,7 @@ int DsqlCursor::fetchAbsolute(thread_db* tdbb, UCHAR* buffer, SLONG position)
163
193
{
164
194
if (!m_eof)
165
195
{
166
- cacheInput (tdbb);
196
+ cacheInput (tdbb, buffer );
167
197
fb_assert (m_eof);
168
198
}
169
199
@@ -248,7 +278,7 @@ void DsqlCursor::getInfo(thread_db* tdbb,
248
278
case IResultSet::INF_RECORD_COUNT:
249
279
if (isScrollable && !m_eof)
250
280
{
251
- cacheInput (tdbb);
281
+ cacheInput (tdbb, nullptr );
252
282
fb_assert (m_eof);
253
283
}
254
284
response.insertInt (tag, isScrollable ? m_cachedCount : -1 );
@@ -291,48 +321,71 @@ int DsqlCursor::fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 positio
291
321
{
292
322
if (position >= m_cachedCount)
293
323
{
294
- if (m_eof || !cacheInput (tdbb, position))
324
+ if (m_eof || !cacheInput (tdbb, buffer, position))
295
325
{
296
326
m_state = EOS;
297
327
return 1 ;
298
328
}
299
329
}
300
330
301
331
fb_assert (position < m_cachedCount);
332
+ fb_assert (m_messageLength > 0 ); // At this point m_messageLength must be set by cacheInput
302
333
303
- UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers [m_message->msg_buffer_number ];
304
-
305
- const FB_UINT64 offset = position * m_message->msg_length ;
306
- const FB_UINT64 readBytes = m_space.read (offset, msgBuffer, m_message->msg_length );
307
- fb_assert (readBytes == m_message->msg_length );
308
-
309
- m_dsqlRequest->mapInOut (tdbb, true , m_message, NULL , buffer);
334
+ FB_UINT64 offset = position * (m_messageLength + m_keyBufferLength);
335
+ FB_UINT64 readBytes = m_space.read (offset, buffer, m_messageLength);
336
+ offset += m_messageLength;
337
+ readBytes += m_space.read (offset, m_keyBuffer, m_keyBufferLength);
338
+ fb_assert (readBytes == m_messageLength + m_keyBufferLength);
310
339
311
340
m_position = position;
312
341
m_state = POSITIONED;
313
342
return 0 ;
314
343
}
315
344
316
- bool DsqlCursor::cacheInput (thread_db* tdbb, FB_UINT64 position)
345
+ bool DsqlCursor::cacheInput (thread_db* tdbb, UCHAR* buffer, FB_UINT64 position)
317
346
{
318
347
fb_assert (!m_eof);
319
348
320
- const ULONG prefetchCount = MAX (PREFETCH_SIZE / m_message->msg_length , 1 );
321
- const UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers [m_message->msg_buffer_number ];
349
+ // It could not be done before: user buffer length may be unknown until call setDelayedOutputMetadata()
350
+ if (m_messageLength == 0 )
351
+ {
352
+ Request* req = m_dsqlRequest->getRequest ();
353
+ const MessageNode* msg = req->getStatement ()->getMessage (m_message);
354
+ m_messageLength = msg->getFormat (req)->fmt_length ;
355
+ // Save record key unconditionally because setCursorName() can be called after openCursor()
356
+ m_keyBufferLength = req->req_rpb .getCount () * sizeof (RecordKey);
357
+ m_keyBuffer = new RecordKey[req->req_rpb .getCount ()];
358
+ }
359
+
360
+ std::unique_ptr<UCHAR[]> ownBuffer;
361
+ if (buffer == nullptr )
362
+ {
363
+ // We are called from getInfo() and there is no user-provided buffer for data.
364
+ // Create a temporary one.
365
+ // This code cannot be moved into getInfo() itself because it is most likely called before fetch()
366
+ // so m_messageLength is still unknown there.
367
+ ownBuffer.reset (buffer = new UCHAR[m_messageLength]);
368
+ }
369
+
370
+ const ULONG prefetchCount = MAX (PREFETCH_SIZE / (m_messageLength + m_keyBufferLength), 1 );
322
371
323
372
while (position >= m_cachedCount)
324
373
{
325
374
for (ULONG count = 0 ; count < prefetchCount; count++)
326
375
{
327
- if (!m_dsqlRequest->fetch (tdbb, NULL ))
376
+ if (!m_dsqlRequest->fetch (tdbb, buffer ))
328
377
{
329
378
m_eof = true ;
330
379
break ;
331
380
}
332
381
333
- const FB_UINT64 offset = m_cachedCount * m_message->msg_length ;
334
- const FB_UINT64 writtenBytes = m_space.write (offset, msgBuffer, m_message->msg_length );
335
- fb_assert (writtenBytes == m_message->msg_length );
382
+ m_dsqlRequest->gatherRecordKey (m_keyBuffer);
383
+
384
+ FB_UINT64 offset = m_cachedCount * (m_messageLength + m_keyBufferLength);
385
+ FB_UINT64 writtenBytes = m_space.write (offset, buffer, m_messageLength);
386
+ offset += m_messageLength;
387
+ writtenBytes += m_space.write (offset, m_keyBuffer, m_keyBufferLength);
388
+ fb_assert (writtenBytes == m_messageLength + m_keyBufferLength);
336
389
m_cachedCount++;
337
390
}
338
391
0 commit comments